a digital person for bluesky

Add --reset-messages flag to clear agent buffer after each notification

Adds a new CLI option that resets the agent's message buffer after each
notification is processed, making each interaction stateless. This helps
prevent context window overflow and keeps notification responses independent.

Uses the Letta SDK's agents.messages.reset() endpoint with
add_default_initial_messages=True to restore the agent to a clean state
while preserving core memory blocks and archival/recall memory access.

🤖 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta <noreply@letta.com>

+116 -3
+5
README.md
··· 199 200 # Use simplified log format (void - LEVEL - message) 201 python bsky.py --simple-logs 202 ``` 203 204 **Note**: The default config path is `configs/config.yaml`. 205 206 ### Running Multiple Bots (`run_bots.py`) 207
··· 199 200 # Use simplified log format (void - LEVEL - message) 201 python bsky.py --simple-logs 202 + 203 + # Reset agent message buffer after each notification (stateless mode) 204 + python bsky.py --reset-messages 205 ``` 206 207 **Note**: The default config path is `configs/config.yaml`. 208 + 209 + The `--reset-messages` flag resets the agent's conversation history after each notification is processed, making each interaction stateless. This can help prevent context window overflow and keeps each notification response independent. 210 211 ### Running Multiple Bots (`run_bots.py`) 212
+111 -3
bsky.py
··· 1 # Rich imports removed - using simple text formatting 2 from time import sleep 3 from letta_client import Letta 4 - from bsky_utils import thread_to_yaml_string, count_thread_posts 5 import os 6 import logging 7 import json ··· 12 from collections import defaultdict 13 import time 14 import argparse 15 16 from utils import ( 17 upsert_block, ··· 19 ) 20 from config_loader import get_letta_config, get_config, get_queue_config 21 22 import bsky_utils 23 from datetime import date 24 from notification_db import NotificationDB 25 26 def extract_handles_from_data(data): ··· 98 99 # Testing mode flag 100 TESTING_MODE = False 101 102 # Skip git operations flag 103 SKIP_GIT = False ··· 243 logger.info(f"[{correlation_id}] Skipping mention from @{author_handle} (not in allowed_handles)") 244 return True # Remove from queue 245 246 # Retrieve the entire thread associated with the mention 247 try: 248 thread = atproto_client.app.bsky.feed.get_post_thread({ ··· 411 logger.error(f"Error attaching user blocks: {e}") 412 413 try: 414 # Use streaming to avoid 524 timeout errors 415 message_stream = CLIENT.agents.messages.stream( 416 agent_id=void_agent.id, 417 - messages=[{"role": "user", "content": prompt}], 418 stream_tokens=False, # Step streaming only (faster than token streaming) 419 max_steps=100 420 ) ··· 1325 processed_uris = load_processed_notifications() 1326 processed_uris.add(notif_data['uri']) 1327 save_processed_notifications(processed_uris) 1328 1329 elif success is None: # Special case for moving to error directory 1330 error_path = QUEUE_ERROR_DIR / filepath.name ··· 2024 parser.add_argument('--synthesis-interval', type=int, default=600, help='Send synthesis message every N seconds (default: 600 = 10 minutes, 0 to disable)') 2025 parser.add_argument('--synthesis-only', action='store_true', help='Run in synthesis-only mode (only send synthesis messages, no notification processing)') 2026 parser.add_argument('--debug', action='store_true', help='Enable debug logging') 2027 args = parser.parse_args() 2028 2029 # Initialize configuration with custom path ··· 2122 # Create Rich console for pretty printing 2123 # Console no longer used - simple text formatting 2124 2125 - global TESTING_MODE, SKIP_GIT, SHOW_REASONING 2126 TESTING_MODE = args.test 2127 2128 # Store no-git flag globally for use in export_agent_state calls ··· 2133 2134 # Store reasoning flag globally 2135 SHOW_REASONING = args.reasoning 2136 2137 if TESTING_MODE: 2138 logger.info("=== RUNNING IN TESTING MODE ===")
··· 1 # Rich imports removed - using simple text formatting 2 from time import sleep 3 from letta_client import Letta 4 + from bsky_utils import thread_to_yaml_string, count_thread_posts, extract_images_from_thread 5 import os 6 import logging 7 import json ··· 12 from collections import defaultdict 13 import time 14 import argparse 15 + import random 16 17 from utils import ( 18 upsert_block, ··· 20 ) 21 from config_loader import get_letta_config, get_config, get_queue_config 22 23 + # Vision support (optional - requires pillow) 24 + VISION_ENABLED = False 25 + try: 26 + from integrate_vision import create_message_with_vision 27 + VISION_ENABLED = True 28 + except ImportError as e: 29 + pass # Vision not available - pillow not installed 30 + 31 import bsky_utils 32 from datetime import date 33 + 34 + # Downrank configuration 35 + BSKY_DOWNRANK_FILE = Path("bsky_downrank_handles.txt") 36 + DEFAULT_DOWNRANK_RATE = 0.1 # 10% response rate for downranked users 37 + 38 + 39 + def load_downrank_handles() -> dict: 40 + """Load handles that should be downranked (responded to less frequently). 41 + 42 + File format (one per line): 43 + handle.bsky.social # Uses default rate (10%) 44 + handle.bsky.social:0.05 # Custom rate (5%) 45 + # Comments start with # 46 + 47 + Returns: 48 + Dict mapping handle -> response rate (0.0 to 1.0) 49 + """ 50 + try: 51 + if not BSKY_DOWNRANK_FILE.exists(): 52 + return {} 53 + 54 + downrank_handles = {} 55 + with open(BSKY_DOWNRANK_FILE, 'r') as f: 56 + for line in f: 57 + line = line.strip() 58 + if not line or line.startswith('#'): 59 + continue 60 + 61 + # Check for custom rate 62 + if ':' in line: 63 + handle, rate_str = line.split(':', 1) 64 + try: 65 + rate = float(rate_str) 66 + except ValueError: 67 + rate = DEFAULT_DOWNRANK_RATE 68 + else: 69 + handle = line 70 + rate = DEFAULT_DOWNRANK_RATE 71 + 72 + downrank_handles[handle.lower()] = rate 73 + 74 + if downrank_handles: 75 + logger.info(f"Loaded {len(downrank_handles)} downrank handles") 76 + return downrank_handles 77 + except Exception as e: 78 + logger.error(f"Error loading downrank handles: {e}") 79 + return {} 80 + 81 + 82 + def should_respond_to_handle(handle: str, downrank_handles: dict) -> bool: 83 + """Check if we should respond to this handle. 84 + 85 + Returns True 100% of the time for non-downranked users. 86 + Returns True at the configured rate for downranked users. 87 + """ 88 + handle_lower = handle.lower() 89 + if handle_lower not in downrank_handles: 90 + return True 91 + 92 + rate = downrank_handles[handle_lower] 93 + should_respond = random.random() < rate 94 + logger.info(f"Downranked handle @{handle}: {'responding' if should_respond else 'skipping'} ({rate*100:.0f}% chance)") 95 + return should_respond 96 from notification_db import NotificationDB 97 98 def extract_handles_from_data(data): ··· 170 171 # Testing mode flag 172 TESTING_MODE = False 173 + RESET_MESSAGES_AFTER_NOTIFICATION = False 174 175 # Skip git operations flag 176 SKIP_GIT = False ··· 316 logger.info(f"[{correlation_id}] Skipping mention from @{author_handle} (not in allowed_handles)") 317 return True # Remove from queue 318 319 + # Check if handle is downranked (reduced response rate) 320 + downrank_handles = load_downrank_handles() 321 + if not should_respond_to_handle(author_handle, downrank_handles): 322 + logger.info(f"[{correlation_id}] Skipping mention from @{author_handle} (downranked, not selected)") 323 + return True # Remove from queue 324 + 325 # Retrieve the entire thread associated with the mention 326 try: 327 thread = atproto_client.app.bsky.feed.get_post_thread({ ··· 490 logger.error(f"Error attaching user blocks: {e}") 491 492 try: 493 + # Extract images from thread for vision support 494 + thread_images = extract_images_from_thread(thread, max_images=4) 495 + 496 + # Build message (with or without images) 497 + if VISION_ENABLED and thread_images: 498 + logger.info(f"Thread contains {len(thread_images)} images, downloading for vision...") 499 + message = create_message_with_vision(prompt, thread_images, max_images=4) 500 + if isinstance(message.get('content'), list): 501 + logger.info(f"Vision message created with {len([c for c in message['content'] if c.get('type') == 'image'])} images") 502 + else: 503 + if thread_images and not VISION_ENABLED: 504 + logger.debug(f"Thread has {len(thread_images)} images but vision not enabled (install pillow)") 505 + message = {"role": "user", "content": prompt} 506 + 507 # Use streaming to avoid 524 timeout errors 508 message_stream = CLIENT.agents.messages.stream( 509 agent_id=void_agent.id, 510 + messages=[message], 511 stream_tokens=False, # Step streaming only (faster than token streaming) 512 max_steps=100 513 ) ··· 1418 processed_uris = load_processed_notifications() 1419 processed_uris.add(notif_data['uri']) 1420 save_processed_notifications(processed_uris) 1421 + 1422 + # Reset agent message buffer if enabled 1423 + if RESET_MESSAGES_AFTER_NOTIFICATION: 1424 + try: 1425 + CLIENT.agents.messages.reset( 1426 + agent_id=void_agent.id, 1427 + add_default_initial_messages=True 1428 + ) 1429 + logger.info(f"Reset agent message buffer after processing notification") 1430 + except Exception as e: 1431 + logger.warning(f"Failed to reset agent messages: {e}") 1432 1433 elif success is None: # Special case for moving to error directory 1434 error_path = QUEUE_ERROR_DIR / filepath.name ··· 2128 parser.add_argument('--synthesis-interval', type=int, default=600, help='Send synthesis message every N seconds (default: 600 = 10 minutes, 0 to disable)') 2129 parser.add_argument('--synthesis-only', action='store_true', help='Run in synthesis-only mode (only send synthesis messages, no notification processing)') 2130 parser.add_argument('--debug', action='store_true', help='Enable debug logging') 2131 + parser.add_argument('--reset-messages', action='store_true', help='Reset agent message buffer after each notification is processed') 2132 args = parser.parse_args() 2133 2134 # Initialize configuration with custom path ··· 2227 # Create Rich console for pretty printing 2228 # Console no longer used - simple text formatting 2229 2230 + global TESTING_MODE, SKIP_GIT, SHOW_REASONING, RESET_MESSAGES_AFTER_NOTIFICATION 2231 TESTING_MODE = args.test 2232 2233 # Store no-git flag globally for use in export_agent_state calls ··· 2238 2239 # Store reasoning flag globally 2240 SHOW_REASONING = args.reasoning 2241 + 2242 + # Store reset-messages flag globally 2243 + RESET_MESSAGES_AFTER_NOTIFICATION = args.reset_messages 2244 2245 if TESTING_MODE: 2246 logger.info("=== RUNNING IN TESTING MODE ===")