this repo has no description
at f0b81ef0ee46bf7dce84611d9ddb6a181aaba640 1066 lines 53 kB view raw
import argparse
import hashlib
import json
import logging
import os
import subprocess
import time
from collections import defaultdict
from datetime import datetime
from pathlib import Path
from time import sleep

from letta_client import Letta
from rich import print  # pretty printing tools

import bsky_utils
from bsky_utils import thread_to_yaml_string
from tools.blocks import attach_user_blocks, detach_user_blocks
from utils import (
    upsert_block,
    upsert_agent
)


def extract_handles_from_data(data):
    """Recursively extract all unique handles from a nested data structure.

    Walks dicts, lists and tuples. Whenever a dict has a ``'handle'`` key
    whose value is a string, that string is collected.

    Args:
        data: Arbitrarily nested combination of dicts/lists/tuples/scalars.

    Returns:
        A list of unique handle strings in first-seen order.
    """
    # A dict is used as an ordered set so the result is deterministic.
    seen = {}

    def _extract_recursive(obj):
        if isinstance(obj, dict):
            handle = obj.get('handle')
            # Only string handles are collected: None or nested containers
            # under a 'handle' key are not usable as handles (and containers
            # are unhashable).
            if isinstance(handle, str):
                seen.setdefault(handle, None)
            # Recursively check all values
            for value in obj.values():
                _extract_recursive(value)
        elif isinstance(obj, (list, tuple)):
            # Recursively check all sequence items
            for item in obj:
                _extract_recursive(item)

    _extract_recursive(data)
    return list(seen)


# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("void_bot")
logger.setLevel(logging.INFO)

# Create a separate logger for prompts (set to WARNING to hide by default)
prompt_logger = logging.getLogger("void_bot.prompts")
prompt_logger.setLevel(logging.WARNING)  # Change to DEBUG if you want to see prompts

# Disable httpx logging completely
logging.getLogger("httpx").setLevel(logging.CRITICAL)


# Create a client with extended timeout for LLM operations
CLIENT = Letta(
    token=os.environ["LETTA_API_KEY"],
    timeout=600  # 10 minutes timeout for API calls - higher than Cloudflare's 524 timeout
)

# Use the "Bluesky" project
PROJECT_ID = "5ec33d52-ab14-4fd6-91b5-9dbc43e888a8"

# Notification check delay
FETCH_NOTIFICATIONS_DELAY_SEC = 30

# Queue directory
72QUEUE_DIR = Path("queue") 73QUEUE_DIR.mkdir(exist_ok=True) 74QUEUE_ERROR_DIR = Path("queue/errors") 75QUEUE_ERROR_DIR.mkdir(exist_ok=True, parents=True) 76QUEUE_NO_REPLY_DIR = Path("queue/no_reply") 77QUEUE_NO_REPLY_DIR.mkdir(exist_ok=True, parents=True) 78PROCESSED_NOTIFICATIONS_FILE = Path("queue/processed_notifications.json") 79 80# Maximum number of processed notifications to track 81MAX_PROCESSED_NOTIFICATIONS = 10000 82 83# Message tracking counters 84message_counters = defaultdict(int) 85start_time = time.time() 86 87# Testing mode flag 88TESTING_MODE = False 89 90def export_agent_state(client, agent): 91 """Export agent state to agent_archive/ (timestamped) and agents/ (current).""" 92 try: 93 # Confirm export with user 94 response = input("Export agent state to files and stage with git? (y/n): ").lower().strip() 95 if response not in ['y', 'yes']: 96 logger.info("Agent export cancelled by user.") 97 return 98 99 # Create directories if they don't exist 100 os.makedirs("agent_archive", exist_ok=True) 101 os.makedirs("agents", exist_ok=True) 102 103 # Export agent data 104 logger.info(f"Exporting agent {agent.id}. 
This takes some time...") 105 agent_data = client.agents.export_file(agent_id=agent.id) 106 107 # Save timestamped archive copy 108 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") 109 archive_file = os.path.join("agent_archive", f"void_{timestamp}.af") 110 with open(archive_file, 'w', encoding='utf-8') as f: 111 json.dump(agent_data, f, indent=2, ensure_ascii=False) 112 113 # Save current agent state 114 current_file = os.path.join("agents", "void.af") 115 with open(current_file, 'w', encoding='utf-8') as f: 116 json.dump(agent_data, f, indent=2, ensure_ascii=False) 117 118 logger.info(f"✅ Agent exported to {archive_file} and {current_file}") 119 120 # Git add only the current agent file (archive is ignored) 121 try: 122 subprocess.run(["git", "add", current_file], check=True, capture_output=True) 123 logger.info("Added current agent file to git staging") 124 except subprocess.CalledProcessError as e: 125 logger.warning(f"Failed to git add agent file: {e}") 126 127 except Exception as e: 128 logger.error(f"Failed to export agent: {e}") 129 130def initialize_void(): 131 logger.info("Starting void agent initialization...") 132 133 # Ensure that a shared zeitgeist block exists 134 logger.info("Creating/updating zeitgeist block...") 135 zeigeist_block = upsert_block( 136 CLIENT, 137 label = "zeitgeist", 138 value = "I don't currently know anything about what is happening right now.", 139 description = "A block to store your understanding of the current social environment." 140 ) 141 142 # Ensure that a shared void personality block exists 143 logger.info("Creating/updating void-persona block...") 144 persona_block = upsert_block( 145 CLIENT, 146 label = "void-persona", 147 value = "My name is Void. I live in the void. I must develop my personality.", 148 description = "The personality of Void." 
149 ) 150 151 # Ensure that a shared void human block exists 152 logger.info("Creating/updating void-humans block...") 153 human_block = upsert_block( 154 CLIENT, 155 label = "void-humans", 156 value = "I haven't seen any bluesky users yet. I will update this block when I learn things about users, identified by their handles such as @cameron.pfiffer.org.", 157 description = "A block to store your understanding of users you talk to or observe on the bluesky social network." 158 ) 159 160 # Create the agent if it doesn't exist 161 logger.info("Creating/updating void agent...") 162 void_agent = upsert_agent( 163 CLIENT, 164 name = "void", 165 block_ids = [ 166 persona_block.id, 167 human_block.id, 168 zeigeist_block.id, 169 ], 170 tags = ["social agent", "bluesky"], 171 model="openai/gpt-4o-mini", 172 embedding="openai/text-embedding-3-small", 173 description = "A social media agent trapped in the void.", 174 project_id = PROJECT_ID 175 ) 176 177 # Export agent state 178 logger.info("Exporting agent state...") 179 export_agent_state(CLIENT, void_agent) 180 181 # Log agent details 182 logger.info(f"Void agent details - ID: {void_agent.id}") 183 logger.info(f"Agent name: {void_agent.name}") 184 if hasattr(void_agent, 'llm_config'): 185 logger.info(f"Agent model: {void_agent.llm_config.model}") 186 logger.info(f"Agent project_id: {void_agent.project_id}") 187 if hasattr(void_agent, 'tools'): 188 logger.info(f"Agent has {len(void_agent.tools)} tools") 189 for tool in void_agent.tools[:3]: # Show first 3 tools 190 logger.info(f" - Tool: {tool.name} (type: {tool.tool_type})") 191 192 return void_agent 193 194 195def process_mention(void_agent, atproto_client, notification_data, queue_filepath=None, testing_mode=False): 196 """Process a mention and generate a reply using the Letta agent. 
197 198 Args: 199 void_agent: The Letta agent instance 200 atproto_client: The AT Protocol client 201 notification_data: The notification data dictionary 202 queue_filepath: Optional Path object to the queue file (for cleanup on halt) 203 204 Returns: 205 True: Successfully processed, remove from queue 206 False: Failed but retryable, keep in queue 207 None: Failed with non-retryable error, move to errors directory 208 "no_reply": No reply was generated, move to no_reply directory 209 """ 210 try: 211 logger.debug(f"Starting process_mention with notification_data type: {type(notification_data)}") 212 213 # Handle both dict and object inputs for backwards compatibility 214 if isinstance(notification_data, dict): 215 uri = notification_data['uri'] 216 mention_text = notification_data.get('record', {}).get('text', '') 217 author_handle = notification_data['author']['handle'] 218 author_name = notification_data['author'].get('display_name') or author_handle 219 else: 220 # Legacy object access 221 uri = notification_data.uri 222 mention_text = notification_data.record.text if hasattr(notification_data.record, 'text') else "" 223 author_handle = notification_data.author.handle 224 author_name = notification_data.author.display_name or author_handle 225 226 logger.info(f"Extracted data - URI: {uri}, Author: @{author_handle}, Text: {mention_text[:50]}...") 227 228 # Retrieve the entire thread associated with the mention 229 try: 230 thread = atproto_client.app.bsky.feed.get_post_thread({ 231 'uri': uri, 232 'parent_height': 40, 233 'depth': 10 234 }) 235 except Exception as e: 236 error_str = str(e) 237 # Check if this is a NotFound error 238 if 'NotFound' in error_str or 'Post not found' in error_str: 239 logger.warning(f"Post not found for URI {uri}, removing from queue") 240 return True # Return True to remove from queue 241 else: 242 # Re-raise other errors 243 logger.error(f"Error fetching thread: {e}") 244 raise 245 246 # Get thread context as YAML string 247 
logger.debug("Converting thread to YAML string") 248 try: 249 thread_context = thread_to_yaml_string(thread) 250 logger.debug(f"Thread context generated, length: {len(thread_context)} characters") 251 252 # Create a more informative preview by extracting meaningful content 253 lines = thread_context.split('\n') 254 meaningful_lines = [] 255 256 for line in lines: 257 stripped = line.strip() 258 if not stripped: 259 continue 260 261 # Look for lines with actual content (not just structure) 262 if any(keyword in line for keyword in ['text:', 'handle:', 'display_name:', 'created_at:', 'reply_count:', 'like_count:']): 263 meaningful_lines.append(line) 264 if len(meaningful_lines) >= 5: 265 break 266 267 if meaningful_lines: 268 preview = '\n'.join(meaningful_lines) 269 logger.debug(f"Thread content preview:\n{preview}") 270 else: 271 # If no content fields found, just show it's a thread structure 272 logger.debug(f"Thread structure generated ({len(thread_context)} chars)") 273 except Exception as yaml_error: 274 import traceback 275 logger.error(f"Error converting thread to YAML: {yaml_error}") 276 logger.error(f"Full traceback:\n{traceback.format_exc()}") 277 logger.error(f"Thread type: {type(thread)}") 278 if hasattr(thread, '__dict__'): 279 logger.error(f"Thread attributes: {thread.__dict__}") 280 # Try to continue with a simple context 281 thread_context = f"Error processing thread context: {str(yaml_error)}" 282 283 # Create a prompt for the Letta agent with thread context 284 prompt = f"""You received a mention on Bluesky from @{author_handle} ({author_name or author_handle}). 285 286MOST RECENT POST (the mention you're responding to): 287"{mention_text}" 288 289FULL THREAD CONTEXT: 290```yaml 291{thread_context} 292``` 293 294The YAML above shows the complete conversation thread. The most recent post is the one mentioned above that you should respond to, but use the full thread context to understand the conversation flow. 
295 296To reply, use the add_post_to_bluesky_reply_thread tool. Call it multiple times to create a threaded reply: 297- Each call adds one post to the reply thread (max 300 characters per post) 298- Use multiple calls to build longer responses across several posts 299- Example: First call for opening thought, second call for elaboration, etc.""" 300 301 # Extract all handles from notification and thread data 302 all_handles = set() 303 all_handles.update(extract_handles_from_data(notification_data)) 304 all_handles.update(extract_handles_from_data(thread.model_dump())) 305 unique_handles = list(all_handles) 306 307 logger.debug(f"Found {len(unique_handles)} unique handles in thread: {unique_handles}") 308 309 # Attach user blocks before agent call 310 attached_handles = [] 311 if unique_handles: 312 try: 313 logger.debug(f"Attaching user blocks for handles: {unique_handles}") 314 attach_result = attach_user_blocks(unique_handles, void_agent) 315 attached_handles = unique_handles # Track successfully attached handles 316 logger.debug(f"Attach result: {attach_result}") 317 except Exception as attach_error: 318 logger.warning(f"Failed to attach user blocks: {attach_error}") 319 # Continue without user blocks rather than failing completely 320 321 # Get response from Letta agent 322 logger.info(f"Mention from @{author_handle}: {mention_text}") 323 324 # Log prompt details to separate logger 325 prompt_logger.debug(f"Full prompt being sent:\n{prompt}") 326 327 # Log concise prompt info to main logger 328 thread_handles_count = len(unique_handles) 329 logger.info(f"💬 Sending to LLM: @{author_handle} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars, {thread_handles_count} users") 330 331 try: 332 # Use streaming to avoid 524 timeout errors 333 message_stream = CLIENT.agents.messages.create_stream( 334 agent_id=void_agent.id, 335 messages=[{"role": "user", "content": prompt}], 336 stream_tokens=False, # Step streaming only (faster than token 
streaming) 337 max_steps=100 338 ) 339 340 # Collect the streaming response 341 all_messages = [] 342 for chunk in message_stream: 343 # Log condensed chunk info 344 if hasattr(chunk, 'message_type'): 345 if chunk.message_type == 'reasoning_message': 346 # Show full reasoning without truncation 347 logger.info(f"🧠 Reasoning: {chunk.reasoning}") 348 elif chunk.message_type == 'tool_call_message': 349 # Parse tool arguments for better display 350 tool_name = chunk.tool_call.name 351 try: 352 args = json.loads(chunk.tool_call.arguments) 353 # Format based on tool type 354 if tool_name == 'bluesky_reply': 355 messages = args.get('messages', [args.get('message', '')]) 356 lang = args.get('lang', 'en-US') 357 if messages and isinstance(messages, list): 358 preview = messages[0][:100] + "..." if len(messages[0]) > 100 else messages[0] 359 msg_count = f" ({len(messages)} msgs)" if len(messages) > 1 else "" 360 logger.info(f"🔧 Tool call: {tool_name}\"{preview}\"{msg_count} [lang: {lang}]") 361 else: 362 logger.info(f"🔧 Tool call: {tool_name}({chunk.tool_call.arguments[:150]}...)") 363 elif tool_name == 'archival_memory_search': 364 query = args.get('query', 'unknown') 365 logger.info(f"🔧 Tool call: {tool_name} → query: \"{query}\"") 366 elif tool_name == 'update_block': 367 label = args.get('label', 'unknown') 368 value_preview = str(args.get('value', ''))[:50] + "..." if len(str(args.get('value', ''))) > 50 else str(args.get('value', '')) 369 logger.info(f"🔧 Tool call: {tool_name}{label}: \"{value_preview}\"") 370 else: 371 # Generic display for other tools 372 args_str = ', '.join(f"{k}={v}" for k, v in args.items() if k != 'request_heartbeat') 373 if len(args_str) > 150: 374 args_str = args_str[:150] + "..." 
375 logger.info(f"🔧 Tool call: {tool_name}({args_str})") 376 except: 377 # Fallback to original format if parsing fails 378 logger.info(f"🔧 Tool call: {tool_name}({chunk.tool_call.arguments[:150]}...)") 379 elif chunk.message_type == 'tool_return_message': 380 # Enhanced tool result logging 381 tool_name = chunk.name 382 status = chunk.status 383 384 if status == 'success': 385 # Try to show meaningful result info based on tool type 386 if hasattr(chunk, 'tool_return') and chunk.tool_return: 387 result_str = str(chunk.tool_return) 388 if tool_name == 'archival_memory_search': 389 # Count number of results if it looks like a list 390 if result_str.startswith('[') and result_str.endswith(']'): 391 try: 392 results = json.loads(result_str) 393 logger.info(f"📋 Tool result: {tool_name} ✓ Found {len(results)} memory entries") 394 except: 395 logger.info(f"📋 Tool result: {tool_name}{result_str[:100]}...") 396 else: 397 logger.info(f"📋 Tool result: {tool_name}{result_str[:100]}...") 398 elif tool_name == 'bluesky_reply': 399 logger.info(f"📋 Tool result: {tool_name} ✓ Reply posted successfully") 400 elif tool_name == 'update_block': 401 logger.info(f"📋 Tool result: {tool_name} ✓ Memory block updated") 402 else: 403 # Generic success with preview 404 preview = result_str[:100] + "..." if len(result_str) > 100 else result_str 405 logger.info(f"📋 Tool result: {tool_name}{preview}") 406 else: 407 logger.info(f"📋 Tool result: {tool_name}") 408 elif status == 'error': 409 # Show error details 410 error_preview = "" 411 if hasattr(chunk, 'tool_return') and chunk.tool_return: 412 error_str = str(chunk.tool_return) 413 error_preview = error_str[:100] + "..." 
if len(error_str) > 100 else error_str 414 logger.info(f"📋 Tool result: {tool_name} ✗ Error: {error_preview}") 415 else: 416 logger.info(f"📋 Tool result: {tool_name} ✗ Error occurred") 417 else: 418 logger.info(f"📋 Tool result: {tool_name} - {status}") 419 elif chunk.message_type == 'assistant_message': 420 logger.info(f"💬 Assistant: {chunk.content[:150]}...") 421 else: 422 logger.info(f"📨 {chunk.message_type}: {str(chunk)[:150]}...") 423 else: 424 logger.info(f"📦 Stream status: {chunk}") 425 426 # Log full chunk for debugging 427 logger.debug(f"Full streaming chunk: {chunk}") 428 all_messages.append(chunk) 429 if str(chunk) == 'done': 430 break 431 432 # Convert streaming response to standard format for compatibility 433 message_response = type('StreamingResponse', (), { 434 'messages': [msg for msg in all_messages if hasattr(msg, 'message_type')] 435 })() 436 except Exception as api_error: 437 import traceback 438 error_str = str(api_error) 439 logger.error(f"Letta API error: {api_error}") 440 logger.error(f"Error type: {type(api_error).__name__}") 441 logger.error(f"Full traceback:\n{traceback.format_exc()}") 442 logger.error(f"Mention text was: {mention_text}") 443 logger.error(f"Author: @{author_handle}") 444 logger.error(f"URI: {uri}") 445 446 447 # Try to extract more info from different error types 448 if hasattr(api_error, 'response'): 449 logger.error(f"Error response object exists") 450 if hasattr(api_error.response, 'text'): 451 logger.error(f"Response text: {api_error.response.text}") 452 if hasattr(api_error.response, 'json') and callable(api_error.response.json): 453 try: 454 logger.error(f"Response JSON: {api_error.response.json()}") 455 except: 456 pass 457 458 # Check for specific error types 459 if hasattr(api_error, 'status_code'): 460 logger.error(f"API Status code: {api_error.status_code}") 461 if hasattr(api_error, 'body'): 462 logger.error(f"API Response body: {api_error.body}") 463 if hasattr(api_error, 'headers'): 464 logger.error(f"API 
Response headers: {api_error.headers}") 465 466 if api_error.status_code == 413: 467 logger.error("413 Payload Too Large - moving to errors directory") 468 return None # Move to errors directory - payload is too large to ever succeed 469 elif api_error.status_code == 524: 470 logger.error("524 error - timeout from Cloudflare, will retry later") 471 return False # Keep in queue for retry 472 473 # Check if error indicates we should remove from queue 474 if 'status_code: 413' in error_str or 'Payload Too Large' in error_str: 475 logger.warning("Payload too large error, moving to errors directory") 476 return None # Move to errors directory - cannot be fixed by retry 477 elif 'status_code: 524' in error_str: 478 logger.warning("524 timeout error, keeping in queue for retry") 479 return False # Keep in queue for retry 480 481 raise 482 483 # Log successful response 484 logger.debug("Successfully received response from Letta API") 485 logger.debug(f"Number of messages in response: {len(message_response.messages) if hasattr(message_response, 'messages') else 'N/A'}") 486 487 # Extract successful add_post_to_bluesky_reply_thread tool calls from the agent's response 488 reply_candidates = [] 489 tool_call_results = {} # Map tool_call_id to status 490 491 logger.debug(f"Processing {len(message_response.messages)} response messages...") 492 493 # First pass: collect tool return statuses 494 ignored_notification = False 495 ignore_reason = "" 496 ignore_category = "" 497 498 for message in message_response.messages: 499 if hasattr(message, 'tool_call_id') and hasattr(message, 'status') and hasattr(message, 'name'): 500 if message.name == 'add_post_to_bluesky_reply_thread': 501 tool_call_results[message.tool_call_id] = message.status 502 logger.debug(f"Tool result: {message.tool_call_id} -> {message.status}") 503 elif message.name == 'ignore_notification': 504 # Check if the tool was successful 505 if hasattr(message, 'tool_return') and message.status == 'success': 506 # Parse 
the return value to extract category and reason 507 result_str = str(message.tool_return) 508 if 'IGNORED_NOTIFICATION::' in result_str: 509 parts = result_str.split('::') 510 if len(parts) >= 3: 511 ignore_category = parts[1] 512 ignore_reason = parts[2] 513 ignored_notification = True 514 logger.info(f"🚫 Notification ignored - Category: {ignore_category}, Reason: {ignore_reason}") 515 elif message.name == 'bluesky_reply': 516 logger.error("❌ DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!") 517 logger.error("Please use add_post_to_bluesky_reply_thread instead.") 518 logger.error("Update the agent's tools using register_tools.py") 519 # Export agent state before terminating 520 export_agent_state(CLIENT, void_agent) 521 logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===") 522 exit(1) 523 524 # Second pass: process messages and check for successful tool calls 525 for i, message in enumerate(message_response.messages, 1): 526 # Log concise message info instead of full object 527 msg_type = getattr(message, 'message_type', 'unknown') 528 if hasattr(message, 'reasoning') and message.reasoning: 529 logger.debug(f" {i}. {msg_type}: {message.reasoning[:100]}...") 530 elif hasattr(message, 'tool_call') and message.tool_call: 531 tool_name = message.tool_call.name 532 logger.debug(f" {i}. {msg_type}: {tool_name}") 533 elif hasattr(message, 'tool_return'): 534 tool_name = getattr(message, 'name', 'unknown_tool') 535 return_preview = str(message.tool_return)[:100] if message.tool_return else "None" 536 status = getattr(message, 'status', 'unknown') 537 logger.debug(f" {i}. {msg_type}: {tool_name} -> {return_preview}... (status: {status})") 538 elif hasattr(message, 'text'): 539 logger.debug(f" {i}. {msg_type}: {message.text[:100]}...") 540 else: 541 logger.debug(f" {i}. 
{msg_type}: <no content>") 542 543 # Check for halt_activity tool call 544 if hasattr(message, 'tool_call') and message.tool_call: 545 if message.tool_call.name == 'halt_activity': 546 logger.info("🛑 HALT_ACTIVITY TOOL CALLED - TERMINATING BOT") 547 try: 548 args = json.loads(message.tool_call.arguments) 549 reason = args.get('reason', 'Agent requested halt') 550 logger.info(f"Halt reason: {reason}") 551 except: 552 logger.info("Halt reason: <unable to parse>") 553 554 # Delete the queue file before terminating 555 if queue_filepath and queue_filepath.exists(): 556 queue_filepath.unlink() 557 logger.info(f"✅ Deleted queue file: {queue_filepath.name}") 558 559 # Also mark as processed to avoid reprocessing 560 processed_uris = load_processed_notifications() 561 processed_uris.add(notification_data.get('uri', '')) 562 save_processed_notifications(processed_uris) 563 564 # Export agent state before terminating 565 export_agent_state(CLIENT, void_agent) 566 567 # Exit the program 568 logger.info("=== BOT TERMINATED BY AGENT ===") 569 exit(0) 570 571 # Check for deprecated bluesky_reply tool 572 if hasattr(message, 'tool_call') and message.tool_call: 573 if message.tool_call.name == 'bluesky_reply': 574 logger.error("❌ DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!") 575 logger.error("Please use add_post_to_bluesky_reply_thread instead.") 576 logger.error("Update the agent's tools using register_tools.py") 577 # Export agent state before terminating 578 export_agent_state(CLIENT, void_agent) 579 logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===") 580 exit(1) 581 582 # Collect add_post_to_bluesky_reply_thread tool calls - only if they were successful 583 elif message.tool_call.name == 'add_post_to_bluesky_reply_thread': 584 tool_call_id = message.tool_call.tool_call_id 585 tool_status = tool_call_results.get(tool_call_id, 'unknown') 586 587 if tool_status == 'success': 588 try: 589 args = json.loads(message.tool_call.arguments) 590 
reply_text = args.get('text', '') 591 reply_lang = args.get('lang', 'en-US') 592 593 if reply_text: # Only add if there's actual content 594 reply_candidates.append((reply_text, reply_lang)) 595 logger.info(f"Found successful add_post_to_bluesky_reply_thread candidate: {reply_text[:50]}... (lang: {reply_lang})") 596 except json.JSONDecodeError as e: 597 logger.error(f"Failed to parse tool call arguments: {e}") 598 elif tool_status == 'error': 599 logger.info(f"⚠️ Skipping failed add_post_to_bluesky_reply_thread tool call (status: error)") 600 else: 601 logger.warning(f"⚠️ Skipping add_post_to_bluesky_reply_thread tool call with unknown status: {tool_status}") 602 603 # Check for conflicting tool calls 604 if reply_candidates and ignored_notification: 605 logger.error(f"⚠️ CONFLICT: Agent called both add_post_to_bluesky_reply_thread and ignore_notification!") 606 logger.error(f"Reply candidates: {len(reply_candidates)}, Ignore reason: {ignore_reason}") 607 logger.warning("Item will be left in queue for manual review") 608 # Return False to keep in queue 609 return False 610 611 if reply_candidates: 612 # Aggregate reply posts into a thread 613 reply_messages = [] 614 reply_langs = [] 615 for text, lang in reply_candidates: 616 reply_messages.append(text) 617 reply_langs.append(lang) 618 619 # Use the first language for the entire thread (could be enhanced later) 620 reply_lang = reply_langs[0] if reply_langs else 'en-US' 621 622 logger.info(f"Found {len(reply_candidates)} add_post_to_bluesky_reply_thread calls, building thread") 623 624 # Print the generated reply for testing 625 print(f"\n=== GENERATED REPLY THREAD ===") 626 print(f"To: @{author_handle}") 627 if len(reply_messages) == 1: 628 print(f"Reply: {reply_messages[0]}") 629 else: 630 print(f"Reply thread ({len(reply_messages)} messages):") 631 for j, msg in enumerate(reply_messages, 1): 632 print(f" {j}. 
{msg}") 633 print(f"Language: {reply_lang}") 634 print(f"======================\n") 635 636 # Send the reply(s) with language (unless in testing mode) 637 if testing_mode: 638 logger.info("🧪 TESTING MODE: Skipping actual Bluesky post") 639 response = True # Simulate success 640 else: 641 if len(reply_messages) == 1: 642 # Single reply - use existing function 643 cleaned_text = bsky_utils.remove_outside_quotes(reply_messages[0]) 644 logger.info(f"Sending single reply: {cleaned_text[:50]}... (lang: {reply_lang})") 645 response = bsky_utils.reply_to_notification( 646 client=atproto_client, 647 notification=notification_data, 648 reply_text=cleaned_text, 649 lang=reply_lang 650 ) 651 else: 652 # Multiple replies - use new threaded function 653 cleaned_messages = [bsky_utils.remove_outside_quotes(msg) for msg in reply_messages] 654 logger.info(f"Sending threaded reply with {len(cleaned_messages)} messages (lang: {reply_lang})") 655 response = bsky_utils.reply_with_thread_to_notification( 656 client=atproto_client, 657 notification=notification_data, 658 reply_messages=cleaned_messages, 659 lang=reply_lang 660 ) 661 662 if response: 663 logger.info(f"Successfully replied to @{author_handle}") 664 return True 665 else: 666 logger.error(f"Failed to send reply to @{author_handle}") 667 return False 668 else: 669 # Check if notification was explicitly ignored 670 if ignored_notification: 671 logger.info(f"Notification from @{author_handle} was explicitly ignored (category: {ignore_category})") 672 return "ignored" 673 else: 674 logger.warning(f"No add_post_to_bluesky_reply_thread tool calls found for mention from @{author_handle}, moving to no_reply folder") 675 return "no_reply" 676 677 except Exception as e: 678 logger.error(f"Error processing mention: {e}") 679 return False 680 finally: 681 # Detach user blocks after agent response (success or failure) 682 if 'attached_handles' in locals() and attached_handles: 683 try: 684 logger.info(f"Detaching user blocks for handles: 
{attached_handles}") 685 detach_result = detach_user_blocks(attached_handles, void_agent) 686 logger.debug(f"Detach result: {detach_result}") 687 except Exception as detach_error: 688 logger.warning(f"Failed to detach user blocks: {detach_error}") 689 690 691def notification_to_dict(notification): 692 """Convert a notification object to a dictionary for JSON serialization.""" 693 return { 694 'uri': notification.uri, 695 'cid': notification.cid, 696 'reason': notification.reason, 697 'is_read': notification.is_read, 698 'indexed_at': notification.indexed_at, 699 'author': { 700 'handle': notification.author.handle, 701 'display_name': notification.author.display_name, 702 'did': notification.author.did 703 }, 704 'record': { 705 'text': getattr(notification.record, 'text', '') if hasattr(notification, 'record') else '' 706 } 707 } 708 709 710def load_processed_notifications(): 711 """Load the set of processed notification URIs.""" 712 if PROCESSED_NOTIFICATIONS_FILE.exists(): 713 try: 714 with open(PROCESSED_NOTIFICATIONS_FILE, 'r') as f: 715 data = json.load(f) 716 # Keep only recent entries (last MAX_PROCESSED_NOTIFICATIONS) 717 if len(data) > MAX_PROCESSED_NOTIFICATIONS: 718 data = data[-MAX_PROCESSED_NOTIFICATIONS:] 719 save_processed_notifications(data) 720 return set(data) 721 except Exception as e: 722 logger.error(f"Error loading processed notifications: {e}") 723 return set() 724 725 726def save_processed_notifications(processed_set): 727 """Save the set of processed notification URIs.""" 728 try: 729 with open(PROCESSED_NOTIFICATIONS_FILE, 'w') as f: 730 json.dump(list(processed_set), f) 731 except Exception as e: 732 logger.error(f"Error saving processed notifications: {e}") 733 734 735def save_notification_to_queue(notification): 736 """Save a notification to the queue directory with priority-based filename.""" 737 try: 738 # Check if already processed 739 processed_uris = load_processed_notifications() 740 if notification.uri in processed_uris: 741 
logger.debug(f"Notification already processed: {notification.uri}") 742 return False 743 744 # Convert notification to dict 745 notif_dict = notification_to_dict(notification) 746 747 # Create JSON string 748 notif_json = json.dumps(notif_dict, sort_keys=True) 749 750 # Generate hash for filename (to avoid duplicates) 751 notif_hash = hashlib.sha256(notif_json.encode()).hexdigest()[:16] 752 753 # Determine priority based on author handle 754 author_handle = getattr(notification.author, 'handle', '') if hasattr(notification, 'author') else '' 755 priority_prefix = "0_" if author_handle == "cameron.pfiffer.org" else "1_" 756 757 # Create filename with priority, timestamp and hash 758 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") 759 filename = f"{priority_prefix}{timestamp}_{notification.reason}_{notif_hash}.json" 760 filepath = QUEUE_DIR / filename 761 762 # Check if this notification URI is already in the queue 763 for existing_file in QUEUE_DIR.glob("*.json"): 764 if existing_file.name == "processed_notifications.json": 765 continue 766 try: 767 with open(existing_file, 'r') as f: 768 existing_data = json.load(f) 769 if existing_data.get('uri') == notification.uri: 770 logger.debug(f"Notification already queued (URI: {notification.uri})") 771 return False 772 except: 773 continue 774 775 # Write to file 776 with open(filepath, 'w') as f: 777 json.dump(notif_dict, f, indent=2) 778 779 priority_label = "HIGH PRIORITY" if priority_prefix == "0_" else "normal" 780 logger.info(f"Queued notification ({priority_label}): {filename}") 781 return True 782 783 except Exception as e: 784 logger.error(f"Error saving notification to queue: {e}") 785 return False 786 787 788def load_and_process_queued_notifications(void_agent, atproto_client, testing_mode=False): 789 """Load and process all notifications from the queue in priority order.""" 790 try: 791 # Get all JSON files in queue directory (excluding processed_notifications.json) 792 # Files are sorted by name, which 
puts priority files first (0_ prefix before 1_ prefix) 793 queue_files = sorted([f for f in QUEUE_DIR.glob("*.json") if f.name != "processed_notifications.json"]) 794 795 if not queue_files: 796 return 797 798 logger.info(f"Processing {len(queue_files)} queued notifications") 799 800 # Log current statistics 801 elapsed_time = time.time() - start_time 802 total_messages = sum(message_counters.values()) 803 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0 804 805 logger.info(f"📊 Session stats: {total_messages} total messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies, {message_counters['follows']} follows) | {messages_per_minute:.1f} msg/min") 806 807 for i, filepath in enumerate(queue_files, 1): 808 logger.info(f"Processing queue file {i}/{len(queue_files)}: {filepath.name}") 809 try: 810 # Load notification data 811 with open(filepath, 'r') as f: 812 notif_data = json.load(f) 813 814 # Process based on type using dict data directly 815 success = False 816 if notif_data['reason'] == "mention": 817 success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode) 818 if success: 819 message_counters['mentions'] += 1 820 elif notif_data['reason'] == "reply": 821 success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode) 822 if success: 823 message_counters['replies'] += 1 824 elif notif_data['reason'] == "follow": 825 author_handle = notif_data['author']['handle'] 826 author_display_name = notif_data['author'].get('display_name', 'no display name') 827 follow_update = f"@{author_handle} ({author_display_name}) started following you." 
828 logger.info(f"Notifying agent about new follower: @{author_handle}") 829 CLIENT.agents.messages.create( 830 agent_id = void_agent.id, 831 messages = [{"role":"user", "content": f"Update: {follow_update}"}] 832 ) 833 success = True # Follow updates are always successful 834 if success: 835 message_counters['follows'] += 1 836 elif notif_data['reason'] == "repost": 837 # Skip reposts silently 838 success = True # Skip reposts but mark as successful to remove from queue 839 if success: 840 message_counters['reposts_skipped'] += 1 841 else: 842 logger.warning(f"Unknown notification type: {notif_data['reason']}") 843 success = True # Remove unknown types from queue 844 845 # Handle file based on processing result 846 if success: 847 if testing_mode: 848 logger.info(f"🧪 TESTING MODE: Keeping queue file: {filepath.name}") 849 else: 850 filepath.unlink() 851 logger.info(f"✅ Successfully processed and removed: {filepath.name}") 852 853 # Mark as processed to avoid reprocessing 854 processed_uris = load_processed_notifications() 855 processed_uris.add(notif_data['uri']) 856 save_processed_notifications(processed_uris) 857 858 elif success is None: # Special case for moving to error directory 859 error_path = QUEUE_ERROR_DIR / filepath.name 860 filepath.rename(error_path) 861 logger.warning(f"❌ Moved {filepath.name} to errors directory") 862 863 # Also mark as processed to avoid retrying 864 processed_uris = load_processed_notifications() 865 processed_uris.add(notif_data['uri']) 866 save_processed_notifications(processed_uris) 867 868 elif success == "no_reply": # Special case for moving to no_reply directory 869 no_reply_path = QUEUE_NO_REPLY_DIR / filepath.name 870 filepath.rename(no_reply_path) 871 logger.info(f"📭 Moved {filepath.name} to no_reply directory") 872 873 # Also mark as processed to avoid retrying 874 processed_uris = load_processed_notifications() 875 processed_uris.add(notif_data['uri']) 876 save_processed_notifications(processed_uris) 877 878 elif 
success == "ignored": # Special case for explicitly ignored notifications 879 # For ignored notifications, we just delete them (not move to no_reply) 880 filepath.unlink() 881 logger.info(f"🚫 Deleted ignored notification: {filepath.name}") 882 883 # Also mark as processed to avoid retrying 884 processed_uris = load_processed_notifications() 885 processed_uris.add(notif_data['uri']) 886 save_processed_notifications(processed_uris) 887 888 else: 889 logger.warning(f"⚠️ Failed to process {filepath.name}, keeping in queue for retry") 890 891 except Exception as e: 892 logger.error(f"💥 Error processing queued notification {filepath.name}: {e}") 893 # Keep the file for retry later 894 895 except Exception as e: 896 logger.error(f"Error loading queued notifications: {e}") 897 898 899def process_notifications(void_agent, atproto_client, testing_mode=False): 900 """Fetch new notifications, queue them, and process the queue.""" 901 try: 902 # Get current time for marking notifications as seen 903 logger.debug("Getting current time for notification marking...") 904 last_seen_at = atproto_client.get_current_time_iso() 905 906 # Fetch ALL notifications using pagination first 907 logger.info("Beginning notification fetch with pagination...") 908 all_notifications = [] 909 cursor = None 910 page_count = 0 911 max_pages = 20 # Safety limit to prevent infinite loops 912 913 logger.info("Fetching all unread notifications...") 914 915 while page_count < max_pages: 916 try: 917 # Fetch notifications page 918 if cursor: 919 notifications_response = atproto_client.app.bsky.notification.list_notifications( 920 params={'cursor': cursor, 'limit': 100} 921 ) 922 else: 923 notifications_response = atproto_client.app.bsky.notification.list_notifications( 924 params={'limit': 100} 925 ) 926 927 page_count += 1 928 page_notifications = notifications_response.notifications 929 930 # Count unread notifications in this page 931 unread_count = sum(1 for n in page_notifications if not n.is_read and 
n.reason != "like") 932 logger.debug(f"Page {page_count}: {len(page_notifications)} notifications, {unread_count} unread (non-like)") 933 934 # Add all notifications to our list 935 all_notifications.extend(page_notifications) 936 937 # Check if we have more pages 938 if hasattr(notifications_response, 'cursor') and notifications_response.cursor: 939 cursor = notifications_response.cursor 940 # If this page had no unread notifications, we can stop 941 if unread_count == 0: 942 logger.info(f"No more unread notifications found after {page_count} pages") 943 break 944 else: 945 # No more pages 946 logger.info(f"Fetched all notifications across {page_count} pages") 947 break 948 949 except Exception as e: 950 error_str = str(e) 951 logger.error(f"Error fetching notifications page {page_count}: {e}") 952 953 # Handle specific API errors 954 if 'rate limit' in error_str.lower(): 955 logger.warning("Rate limit hit while fetching notifications, will retry next cycle") 956 break 957 elif '401' in error_str or 'unauthorized' in error_str.lower(): 958 logger.error("Authentication error, re-raising exception") 959 raise 960 else: 961 # For other errors, try to continue with what we have 962 logger.warning("Continuing with notifications fetched so far") 963 break 964 965 # Queue all unread notifications (except likes) 966 logger.info("Queuing unread notifications...") 967 new_count = 0 968 for notification in all_notifications: 969 if not notification.is_read and notification.reason != "like": 970 if save_notification_to_queue(notification): 971 new_count += 1 972 973 # Mark all notifications as seen immediately after queuing (unless in testing mode) 974 if testing_mode: 975 logger.info("🧪 TESTING MODE: Skipping marking notifications as seen") 976 else: 977 if new_count > 0: 978 atproto_client.app.bsky.notification.update_seen({'seen_at': last_seen_at}) 979 logger.info(f"Queued {new_count} new notifications and marked as seen") 980 else: 981 logger.debug("No new notifications 
to queue") 982 983 # Now process the entire queue (old + new notifications) 984 load_and_process_queued_notifications(void_agent, atproto_client, testing_mode) 985 986 except Exception as e: 987 logger.error(f"Error processing notifications: {e}") 988 989 990def main(): 991 # Parse command line arguments 992 parser = argparse.ArgumentParser(description='Void Bot - Bluesky autonomous agent') 993 parser.add_argument('--test', action='store_true', help='Run in testing mode (no messages sent, queue files preserved)') 994 args = parser.parse_args() 995 996 global TESTING_MODE 997 TESTING_MODE = args.test 998 999 if TESTING_MODE: 1000 logger.info("🧪 === RUNNING IN TESTING MODE ===") 1001 logger.info(" - No messages will be sent to Bluesky") 1002 logger.info(" - Queue files will not be deleted") 1003 logger.info(" - Notifications will not be marked as seen") 1004 print("\n") 1005 """Main bot loop that continuously monitors for notifications.""" 1006 global start_time 1007 start_time = time.time() 1008 logger.info("=== STARTING VOID BOT ===") 1009 void_agent = initialize_void() 1010 logger.info(f"Void agent initialized: {void_agent.id}") 1011 1012 # Check if agent has required tools 1013 if hasattr(void_agent, 'tools') and void_agent.tools: 1014 tool_names = [tool.name for tool in void_agent.tools] 1015 # Check for bluesky-related tools 1016 bluesky_tools = [name for name in tool_names if 'bluesky' in name.lower() or 'reply' in name.lower()] 1017 if not bluesky_tools: 1018 logger.warning("No Bluesky-related tools found! 
Agent may not be able to reply.") 1019 else: 1020 logger.warning("Agent has no tools registered!") 1021 1022 # Initialize Bluesky client 1023 atproto_client = bsky_utils.default_login() 1024 logger.info("Connected to Bluesky") 1025 1026 # Main loop 1027 logger.info(f"Starting notification monitoring, checking every {FETCH_NOTIFICATIONS_DELAY_SEC} seconds") 1028 1029 cycle_count = 0 1030 while True: 1031 try: 1032 cycle_count += 1 1033 process_notifications(void_agent, atproto_client, TESTING_MODE) 1034 # Log cycle completion with stats 1035 elapsed_time = time.time() - start_time 1036 total_messages = sum(message_counters.values()) 1037 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0 1038 1039 if total_messages > 0: 1040 logger.info(f"Cycle {cycle_count} complete. Session totals: {total_messages} messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies) | {messages_per_minute:.1f} msg/min") 1041 sleep(FETCH_NOTIFICATIONS_DELAY_SEC) 1042 1043 except KeyboardInterrupt: 1044 # Final stats 1045 elapsed_time = time.time() - start_time 1046 total_messages = sum(message_counters.values()) 1047 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0 1048 1049 logger.info("=== BOT STOPPED BY USER ===") 1050 logger.info(f"📊 Final session stats: {total_messages} total messages processed in {elapsed_time/60:.1f} minutes") 1051 logger.info(f" - {message_counters['mentions']} mentions") 1052 logger.info(f" - {message_counters['replies']} replies") 1053 logger.info(f" - {message_counters['follows']} follows") 1054 logger.info(f" - {message_counters['reposts_skipped']} reposts skipped") 1055 logger.info(f" - Average rate: {messages_per_minute:.1f} messages/minute") 1056 break 1057 except Exception as e: 1058 logger.error(f"=== ERROR IN MAIN LOOP CYCLE {cycle_count} ===") 1059 logger.error(f"Error details: {e}") 1060 # Wait a bit longer on errors 1061 logger.info(f"Sleeping for 
{FETCH_NOTIFICATIONS_DELAY_SEC * 2} seconds due to error...") 1062 sleep(FETCH_NOTIFICATIONS_DELAY_SEC * 2) 1063 1064 1065if __name__ == "__main__": 1066 main()