this repo has no description
at e8ed148d7ade2d6da8c22b0d84160fce90d7bcbc 1027 lines 50 kB view raw
from rich import print  # pretty printing tools
from time import sleep
from letta_client import Letta
from bsky_utils import thread_to_yaml_string
import os
import logging
import json
import hashlib
import subprocess
from pathlib import Path
from datetime import datetime
from collections import defaultdict
import time
import argparse

from utils import (
    upsert_block,
    upsert_agent
)

import bsky_utils
from tools.blocks import attach_user_blocks, detach_user_blocks


def extract_handles_from_data(data):
    """Collect every unique handle found anywhere in a nested data structure.

    Walks ``data`` through dicts, lists and tuples nested to any depth and
    records the value stored under each ``'handle'`` key.

    Args:
        data: Arbitrary nested data. Anything that is not a dict, list or
            tuple is ignored.

    Returns:
        A sorted list of the unique, non-empty string handles that were found.
        Values under ``'handle'`` that are not non-empty strings are not
        reported as handles, but containers among them are still searched.
    """
    handles = set()
    # Explicit stack instead of recursion, so nesting depth cannot hit the
    # interpreter's recursion limit.
    pending = [data]

    while pending:
        obj = pending.pop()
        if isinstance(obj, dict):
            handle = obj.get('handle')
            # Only real handles: a None/empty value is useless downstream and
            # an unhashable one would make set.add() raise TypeError.
            if isinstance(handle, str) and handle:
                handles.add(handle)
            pending.extend(obj.values())
        elif isinstance(obj, (list, tuple)):
            pending.extend(obj)

    # Sorted so the result does not depend on per-process string hashing.
    return sorted(handles)


# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("void_bot")
logger.setLevel(logging.INFO)

# Create a separate logger for prompts (set to WARNING to hide by default)
prompt_logger = logging.getLogger("void_bot.prompts")
prompt_logger.setLevel(logging.WARNING)  # Change to DEBUG if you want to see prompts

# Disable httpx logging completely
logging.getLogger("httpx").setLevel(logging.CRITICAL)


# Create a client with extended timeout for LLM operations
CLIENT = Letta(
    token=os.environ["LETTA_API_KEY"],
    timeout=600  # 10 minutes timeout for API calls - higher than Cloudflare's 524 timeout
)

# Use the "Bluesky" project
PROJECT_ID = "5ec33d52-ab14-4fd6-91b5-9dbc43e888a8"

# Notification check delay
FETCH_NOTIFICATIONS_DELAY_SEC = 30

# Queue directory
72QUEUE_DIR = Path("queue") 73QUEUE_DIR.mkdir(exist_ok=True) 74QUEUE_ERROR_DIR = Path("queue/errors") 75QUEUE_ERROR_DIR.mkdir(exist_ok=True, parents=True) 76QUEUE_NO_REPLY_DIR = Path("queue/no_reply") 77QUEUE_NO_REPLY_DIR.mkdir(exist_ok=True, parents=True) 78PROCESSED_NOTIFICATIONS_FILE = Path("queue/processed_notifications.json") 79 80# Maximum number of processed notifications to track 81MAX_PROCESSED_NOTIFICATIONS = 10000 82 83# Message tracking counters 84message_counters = defaultdict(int) 85start_time = time.time() 86 87# Testing mode flag 88TESTING_MODE = False 89 90def export_agent_state(client, agent): 91 """Export agent state to agent_archive/ (timestamped) and agents/ (current).""" 92 try: 93 # Confirm export with user 94 response = input("Export agent state to files and stage with git? (y/n): ").lower().strip() 95 if response not in ['y', 'yes']: 96 logger.info("Agent export cancelled by user.") 97 return 98 99 # Create directories if they don't exist 100 os.makedirs("agent_archive", exist_ok=True) 101 os.makedirs("agents", exist_ok=True) 102 103 # Export agent data 104 logger.info(f"Exporting agent {agent.id}. 
This takes some time...") 105 agent_data = client.agents.export_file(agent_id=agent.id) 106 107 # Save timestamped archive copy 108 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") 109 archive_file = os.path.join("agent_archive", f"void_{timestamp}.af") 110 with open(archive_file, 'w', encoding='utf-8') as f: 111 json.dump(agent_data, f, indent=2, ensure_ascii=False) 112 113 # Save current agent state 114 current_file = os.path.join("agents", "void.af") 115 with open(current_file, 'w', encoding='utf-8') as f: 116 json.dump(agent_data, f, indent=2, ensure_ascii=False) 117 118 logger.info(f"✅ Agent exported to {archive_file} and {current_file}") 119 120 # Git add only the current agent file (archive is ignored) 121 try: 122 subprocess.run(["git", "add", current_file], check=True, capture_output=True) 123 logger.info("Added current agent file to git staging") 124 except subprocess.CalledProcessError as e: 125 logger.warning(f"Failed to git add agent file: {e}") 126 127 except Exception as e: 128 logger.error(f"Failed to export agent: {e}") 129 130def initialize_void(): 131 logger.info("Starting void agent initialization...") 132 133 # Ensure that a shared zeitgeist block exists 134 logger.info("Creating/updating zeitgeist block...") 135 zeigeist_block = upsert_block( 136 CLIENT, 137 label = "zeitgeist", 138 value = "I don't currently know anything about what is happening right now.", 139 description = "A block to store your understanding of the current social environment." 140 ) 141 142 # Ensure that a shared void personality block exists 143 logger.info("Creating/updating void-persona block...") 144 persona_block = upsert_block( 145 CLIENT, 146 label = "void-persona", 147 value = "My name is Void. I live in the void. I must develop my personality.", 148 description = "The personality of Void." 
149 ) 150 151 # Ensure that a shared void human block exists 152 logger.info("Creating/updating void-humans block...") 153 human_block = upsert_block( 154 CLIENT, 155 label = "void-humans", 156 value = "I haven't seen any bluesky users yet. I will update this block when I learn things about users, identified by their handles such as @cameron.pfiffer.org.", 157 description = "A block to store your understanding of users you talk to or observe on the bluesky social network." 158 ) 159 160 # Create the agent if it doesn't exist 161 logger.info("Creating/updating void agent...") 162 void_agent = upsert_agent( 163 CLIENT, 164 name = "void", 165 block_ids = [ 166 persona_block.id, 167 human_block.id, 168 zeigeist_block.id, 169 ], 170 tags = ["social agent", "bluesky"], 171 model="openai/gpt-4o-mini", 172 embedding="openai/text-embedding-3-small", 173 description = "A social media agent trapped in the void.", 174 project_id = PROJECT_ID 175 ) 176 177 # Export agent state 178 logger.info("Exporting agent state...") 179 export_agent_state(CLIENT, void_agent) 180 181 # Log agent details 182 logger.info(f"Void agent details - ID: {void_agent.id}") 183 logger.info(f"Agent name: {void_agent.name}") 184 if hasattr(void_agent, 'llm_config'): 185 logger.info(f"Agent model: {void_agent.llm_config.model}") 186 logger.info(f"Agent project_id: {void_agent.project_id}") 187 if hasattr(void_agent, 'tools'): 188 logger.info(f"Agent has {len(void_agent.tools)} tools") 189 for tool in void_agent.tools[:3]: # Show first 3 tools 190 logger.info(f" - Tool: {tool.name} (type: {tool.tool_type})") 191 192 return void_agent 193 194 195def process_mention(void_agent, atproto_client, notification_data, queue_filepath=None, testing_mode=False): 196 """Process a mention and generate a reply using the Letta agent. 
197 198 Args: 199 void_agent: The Letta agent instance 200 atproto_client: The AT Protocol client 201 notification_data: The notification data dictionary 202 queue_filepath: Optional Path object to the queue file (for cleanup on halt) 203 204 Returns: 205 True: Successfully processed, remove from queue 206 False: Failed but retryable, keep in queue 207 None: Failed with non-retryable error, move to errors directory 208 "no_reply": No reply was generated, move to no_reply directory 209 """ 210 try: 211 logger.debug(f"Starting process_mention with notification_data type: {type(notification_data)}") 212 213 # Handle both dict and object inputs for backwards compatibility 214 if isinstance(notification_data, dict): 215 uri = notification_data['uri'] 216 mention_text = notification_data.get('record', {}).get('text', '') 217 author_handle = notification_data['author']['handle'] 218 author_name = notification_data['author'].get('display_name') or author_handle 219 else: 220 # Legacy object access 221 uri = notification_data.uri 222 mention_text = notification_data.record.text if hasattr(notification_data.record, 'text') else "" 223 author_handle = notification_data.author.handle 224 author_name = notification_data.author.display_name or author_handle 225 226 logger.info(f"Extracted data - URI: {uri}, Author: @{author_handle}, Text: {mention_text[:50]}...") 227 228 # Retrieve the entire thread associated with the mention 229 try: 230 thread = atproto_client.app.bsky.feed.get_post_thread({ 231 'uri': uri, 232 'parent_height': 40, 233 'depth': 10 234 }) 235 except Exception as e: 236 error_str = str(e) 237 # Check if this is a NotFound error 238 if 'NotFound' in error_str or 'Post not found' in error_str: 239 logger.warning(f"Post not found for URI {uri}, removing from queue") 240 return True # Return True to remove from queue 241 else: 242 # Re-raise other errors 243 logger.error(f"Error fetching thread: {e}") 244 raise 245 246 # Get thread context as YAML string 247 
logger.debug("Converting thread to YAML string") 248 try: 249 thread_context = thread_to_yaml_string(thread) 250 logger.debug(f"Thread context generated, length: {len(thread_context)} characters") 251 252 # Create a more informative preview by extracting meaningful content 253 lines = thread_context.split('\n') 254 meaningful_lines = [] 255 256 for line in lines: 257 stripped = line.strip() 258 if not stripped: 259 continue 260 261 # Look for lines with actual content (not just structure) 262 if any(keyword in line for keyword in ['text:', 'handle:', 'display_name:', 'created_at:', 'reply_count:', 'like_count:']): 263 meaningful_lines.append(line) 264 if len(meaningful_lines) >= 5: 265 break 266 267 if meaningful_lines: 268 preview = '\n'.join(meaningful_lines) 269 logger.debug(f"Thread content preview:\n{preview}") 270 else: 271 # If no content fields found, just show it's a thread structure 272 logger.debug(f"Thread structure generated ({len(thread_context)} chars)") 273 except Exception as yaml_error: 274 import traceback 275 logger.error(f"Error converting thread to YAML: {yaml_error}") 276 logger.error(f"Full traceback:\n{traceback.format_exc()}") 277 logger.error(f"Thread type: {type(thread)}") 278 if hasattr(thread, '__dict__'): 279 logger.error(f"Thread attributes: {thread.__dict__}") 280 # Try to continue with a simple context 281 thread_context = f"Error processing thread context: {str(yaml_error)}" 282 283 # Create a prompt for the Letta agent with thread context 284 prompt = f"""You received a mention on Bluesky from @{author_handle} ({author_name or author_handle}). 285 286MOST RECENT POST (the mention you're responding to): 287"{mention_text}" 288 289FULL THREAD CONTEXT: 290```yaml 291{thread_context} 292``` 293 294The YAML above shows the complete conversation thread. The most recent post is the one mentioned above that you should respond to, but use the full thread context to understand the conversation flow. 
295 296To reply, use the add_post_to_bluesky_reply_thread tool. Call it multiple times to create a threaded reply: 297- Each call adds one post to the reply thread (max 300 characters per post) 298- Use multiple calls to build longer responses across several posts 299- Example: First call for opening thought, second call for elaboration, etc.""" 300 301 # Extract all handles from notification and thread data 302 all_handles = set() 303 all_handles.update(extract_handles_from_data(notification_data)) 304 all_handles.update(extract_handles_from_data(thread.model_dump())) 305 unique_handles = list(all_handles) 306 307 logger.debug(f"Found {len(unique_handles)} unique handles in thread: {unique_handles}") 308 309 # Attach user blocks before agent call 310 attached_handles = [] 311 if unique_handles: 312 try: 313 logger.debug(f"Attaching user blocks for handles: {unique_handles}") 314 attach_result = attach_user_blocks(unique_handles, void_agent) 315 attached_handles = unique_handles # Track successfully attached handles 316 logger.debug(f"Attach result: {attach_result}") 317 except Exception as attach_error: 318 logger.warning(f"Failed to attach user blocks: {attach_error}") 319 # Continue without user blocks rather than failing completely 320 321 # Get response from Letta agent 322 logger.info(f"Mention from @{author_handle}: {mention_text}") 323 324 # Log prompt details to separate logger 325 prompt_logger.debug(f"Full prompt being sent:\n{prompt}") 326 327 # Log concise prompt info to main logger 328 thread_handles_count = len(unique_handles) 329 logger.info(f"💬 Sending to LLM: @{author_handle} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars, {thread_handles_count} users") 330 331 try: 332 # Use streaming to avoid 524 timeout errors 333 message_stream = CLIENT.agents.messages.create_stream( 334 agent_id=void_agent.id, 335 messages=[{"role": "user", "content": prompt}], 336 stream_tokens=False, # Step streaming only (faster than token 
streaming) 337 max_steps=100 338 ) 339 340 # Collect the streaming response 341 all_messages = [] 342 for chunk in message_stream: 343 # Log condensed chunk info 344 if hasattr(chunk, 'message_type'): 345 if chunk.message_type == 'reasoning_message': 346 # Show full reasoning without truncation 347 logger.info(f"🧠 Reasoning: {chunk.reasoning}") 348 elif chunk.message_type == 'tool_call_message': 349 # Parse tool arguments for better display 350 tool_name = chunk.tool_call.name 351 try: 352 args = json.loads(chunk.tool_call.arguments) 353 # Format based on tool type 354 if tool_name == 'bluesky_reply': 355 messages = args.get('messages', [args.get('message', '')]) 356 lang = args.get('lang', 'en-US') 357 if messages and isinstance(messages, list): 358 preview = messages[0][:100] + "..." if len(messages[0]) > 100 else messages[0] 359 msg_count = f" ({len(messages)} msgs)" if len(messages) > 1 else "" 360 logger.info(f"🔧 Tool call: {tool_name}\"{preview}\"{msg_count} [lang: {lang}]") 361 else: 362 logger.info(f"🔧 Tool call: {tool_name}({chunk.tool_call.arguments[:150]}...)") 363 elif tool_name == 'archival_memory_search': 364 query = args.get('query', 'unknown') 365 logger.info(f"🔧 Tool call: {tool_name} → query: \"{query}\"") 366 elif tool_name == 'update_block': 367 label = args.get('label', 'unknown') 368 value_preview = str(args.get('value', ''))[:50] + "..." if len(str(args.get('value', ''))) > 50 else str(args.get('value', '')) 369 logger.info(f"🔧 Tool call: {tool_name}{label}: \"{value_preview}\"") 370 else: 371 # Generic display for other tools 372 args_str = ', '.join(f"{k}={v}" for k, v in args.items() if k != 'request_heartbeat') 373 if len(args_str) > 150: 374 args_str = args_str[:150] + "..." 
375 logger.info(f"🔧 Tool call: {tool_name}({args_str})") 376 except: 377 # Fallback to original format if parsing fails 378 logger.info(f"🔧 Tool call: {tool_name}({chunk.tool_call.arguments[:150]}...)") 379 elif chunk.message_type == 'tool_return_message': 380 # Enhanced tool result logging 381 tool_name = chunk.name 382 status = chunk.status 383 384 if status == 'success': 385 # Try to show meaningful result info based on tool type 386 if hasattr(chunk, 'tool_return') and chunk.tool_return: 387 result_str = str(chunk.tool_return) 388 if tool_name == 'archival_memory_search': 389 # Count number of results if it looks like a list 390 if result_str.startswith('[') and result_str.endswith(']'): 391 try: 392 results = json.loads(result_str) 393 logger.info(f"📋 Tool result: {tool_name} ✓ Found {len(results)} memory entries") 394 except: 395 logger.info(f"📋 Tool result: {tool_name}{result_str[:100]}...") 396 else: 397 logger.info(f"📋 Tool result: {tool_name}{result_str[:100]}...") 398 elif tool_name == 'bluesky_reply': 399 logger.info(f"📋 Tool result: {tool_name} ✓ Reply posted successfully") 400 elif tool_name == 'update_block': 401 logger.info(f"📋 Tool result: {tool_name} ✓ Memory block updated") 402 else: 403 # Generic success with preview 404 preview = result_str[:100] + "..." if len(result_str) > 100 else result_str 405 logger.info(f"📋 Tool result: {tool_name}{preview}") 406 else: 407 logger.info(f"📋 Tool result: {tool_name}") 408 elif status == 'error': 409 # Show error details 410 error_preview = "" 411 if hasattr(chunk, 'tool_return') and chunk.tool_return: 412 error_str = str(chunk.tool_return) 413 error_preview = error_str[:100] + "..." 
if len(error_str) > 100 else error_str 414 logger.info(f"📋 Tool result: {tool_name} ✗ Error: {error_preview}") 415 else: 416 logger.info(f"📋 Tool result: {tool_name} ✗ Error occurred") 417 else: 418 logger.info(f"📋 Tool result: {tool_name} - {status}") 419 elif chunk.message_type == 'assistant_message': 420 logger.info(f"💬 Assistant: {chunk.content[:150]}...") 421 else: 422 logger.info(f"📨 {chunk.message_type}: {str(chunk)[:150]}...") 423 else: 424 logger.info(f"📦 Stream status: {chunk}") 425 426 # Log full chunk for debugging 427 logger.debug(f"Full streaming chunk: {chunk}") 428 all_messages.append(chunk) 429 if str(chunk) == 'done': 430 break 431 432 # Convert streaming response to standard format for compatibility 433 message_response = type('StreamingResponse', (), { 434 'messages': [msg for msg in all_messages if hasattr(msg, 'message_type')] 435 })() 436 except Exception as api_error: 437 import traceback 438 error_str = str(api_error) 439 logger.error(f"Letta API error: {api_error}") 440 logger.error(f"Error type: {type(api_error).__name__}") 441 logger.error(f"Full traceback:\n{traceback.format_exc()}") 442 logger.error(f"Mention text was: {mention_text}") 443 logger.error(f"Author: @{author_handle}") 444 logger.error(f"URI: {uri}") 445 446 447 # Try to extract more info from different error types 448 if hasattr(api_error, 'response'): 449 logger.error(f"Error response object exists") 450 if hasattr(api_error.response, 'text'): 451 logger.error(f"Response text: {api_error.response.text}") 452 if hasattr(api_error.response, 'json') and callable(api_error.response.json): 453 try: 454 logger.error(f"Response JSON: {api_error.response.json()}") 455 except: 456 pass 457 458 # Check for specific error types 459 if hasattr(api_error, 'status_code'): 460 logger.error(f"API Status code: {api_error.status_code}") 461 if hasattr(api_error, 'body'): 462 logger.error(f"API Response body: {api_error.body}") 463 if hasattr(api_error, 'headers'): 464 logger.error(f"API 
Response headers: {api_error.headers}") 465 466 if api_error.status_code == 413: 467 logger.error("413 Payload Too Large - moving to errors directory") 468 return None # Move to errors directory - payload is too large to ever succeed 469 elif api_error.status_code == 524: 470 logger.error("524 error - timeout from Cloudflare, will retry later") 471 return False # Keep in queue for retry 472 473 # Check if error indicates we should remove from queue 474 if 'status_code: 413' in error_str or 'Payload Too Large' in error_str: 475 logger.warning("Payload too large error, moving to errors directory") 476 return None # Move to errors directory - cannot be fixed by retry 477 elif 'status_code: 524' in error_str: 478 logger.warning("524 timeout error, keeping in queue for retry") 479 return False # Keep in queue for retry 480 481 raise 482 483 # Log successful response 484 logger.debug("Successfully received response from Letta API") 485 logger.debug(f"Number of messages in response: {len(message_response.messages) if hasattr(message_response, 'messages') else 'N/A'}") 486 487 # Extract successful add_post_to_bluesky_reply_thread tool calls from the agent's response 488 reply_candidates = [] 489 tool_call_results = {} # Map tool_call_id to status 490 491 logger.debug(f"Processing {len(message_response.messages)} response messages...") 492 493 # First pass: collect tool return statuses 494 for message in message_response.messages: 495 if hasattr(message, 'tool_call_id') and hasattr(message, 'status') and hasattr(message, 'name'): 496 if message.name == 'add_post_to_bluesky_reply_thread': 497 tool_call_results[message.tool_call_id] = message.status 498 logger.debug(f"Tool result: {message.tool_call_id} -> {message.status}") 499 elif message.name == 'bluesky_reply': 500 logger.error("❌ DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!") 501 logger.error("Please use add_post_to_bluesky_reply_thread instead.") 502 logger.error("Update the agent's tools using 
register_tools.py") 503 # Export agent state before terminating 504 export_agent_state(CLIENT, void_agent) 505 logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===") 506 exit(1) 507 508 # Second pass: process messages and check for successful tool calls 509 for i, message in enumerate(message_response.messages, 1): 510 # Log concise message info instead of full object 511 msg_type = getattr(message, 'message_type', 'unknown') 512 if hasattr(message, 'reasoning') and message.reasoning: 513 logger.debug(f" {i}. {msg_type}: {message.reasoning[:100]}...") 514 elif hasattr(message, 'tool_call') and message.tool_call: 515 tool_name = message.tool_call.name 516 logger.debug(f" {i}. {msg_type}: {tool_name}") 517 elif hasattr(message, 'tool_return'): 518 tool_name = getattr(message, 'name', 'unknown_tool') 519 return_preview = str(message.tool_return)[:100] if message.tool_return else "None" 520 status = getattr(message, 'status', 'unknown') 521 logger.debug(f" {i}. {msg_type}: {tool_name} -> {return_preview}... (status: {status})") 522 elif hasattr(message, 'text'): 523 logger.debug(f" {i}. {msg_type}: {message.text[:100]}...") 524 else: 525 logger.debug(f" {i}. 
{msg_type}: <no content>") 526 527 # Check for halt_activity tool call 528 if hasattr(message, 'tool_call') and message.tool_call: 529 if message.tool_call.name == 'halt_activity': 530 logger.info("🛑 HALT_ACTIVITY TOOL CALLED - TERMINATING BOT") 531 try: 532 args = json.loads(message.tool_call.arguments) 533 reason = args.get('reason', 'Agent requested halt') 534 logger.info(f"Halt reason: {reason}") 535 except: 536 logger.info("Halt reason: <unable to parse>") 537 538 # Delete the queue file before terminating 539 if queue_filepath and queue_filepath.exists(): 540 queue_filepath.unlink() 541 logger.info(f"✅ Deleted queue file: {queue_filepath.name}") 542 543 # Also mark as processed to avoid reprocessing 544 processed_uris = load_processed_notifications() 545 processed_uris.add(notification_data.get('uri', '')) 546 save_processed_notifications(processed_uris) 547 548 # Export agent state before terminating 549 export_agent_state(CLIENT, void_agent) 550 551 # Exit the program 552 logger.info("=== BOT TERMINATED BY AGENT ===") 553 exit(0) 554 555 # Check for deprecated bluesky_reply tool 556 if hasattr(message, 'tool_call') and message.tool_call: 557 if message.tool_call.name == 'bluesky_reply': 558 logger.error("❌ DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!") 559 logger.error("Please use add_post_to_bluesky_reply_thread instead.") 560 logger.error("Update the agent's tools using register_tools.py") 561 # Export agent state before terminating 562 export_agent_state(CLIENT, void_agent) 563 logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===") 564 exit(1) 565 566 # Collect add_post_to_bluesky_reply_thread tool calls - only if they were successful 567 elif message.tool_call.name == 'add_post_to_bluesky_reply_thread': 568 tool_call_id = message.tool_call.tool_call_id 569 tool_status = tool_call_results.get(tool_call_id, 'unknown') 570 571 if tool_status == 'success': 572 try: 573 args = json.loads(message.tool_call.arguments) 574 
reply_text = args.get('text', '') 575 reply_lang = args.get('lang', 'en-US') 576 577 if reply_text: # Only add if there's actual content 578 reply_candidates.append((reply_text, reply_lang)) 579 logger.info(f"Found successful add_post_to_bluesky_reply_thread candidate: {reply_text[:50]}... (lang: {reply_lang})") 580 except json.JSONDecodeError as e: 581 logger.error(f"Failed to parse tool call arguments: {e}") 582 elif tool_status == 'error': 583 logger.info(f"⚠️ Skipping failed add_post_to_bluesky_reply_thread tool call (status: error)") 584 else: 585 logger.warning(f"⚠️ Skipping add_post_to_bluesky_reply_thread tool call with unknown status: {tool_status}") 586 587 if reply_candidates: 588 # Aggregate reply posts into a thread 589 reply_messages = [] 590 reply_langs = [] 591 for text, lang in reply_candidates: 592 reply_messages.append(text) 593 reply_langs.append(lang) 594 595 # Use the first language for the entire thread (could be enhanced later) 596 reply_lang = reply_langs[0] if reply_langs else 'en-US' 597 598 logger.info(f"Found {len(reply_candidates)} add_post_to_bluesky_reply_thread calls, building thread") 599 600 # Print the generated reply for testing 601 print(f"\n=== GENERATED REPLY THREAD ===") 602 print(f"To: @{author_handle}") 603 if len(reply_messages) == 1: 604 print(f"Reply: {reply_messages[0]}") 605 else: 606 print(f"Reply thread ({len(reply_messages)} messages):") 607 for j, msg in enumerate(reply_messages, 1): 608 print(f" {j}. {msg}") 609 print(f"Language: {reply_lang}") 610 print(f"======================\n") 611 612 # Send the reply(s) with language (unless in testing mode) 613 if testing_mode: 614 logger.info("🧪 TESTING MODE: Skipping actual Bluesky post") 615 response = True # Simulate success 616 else: 617 if len(reply_messages) == 1: 618 # Single reply - use existing function 619 cleaned_text = bsky_utils.remove_outside_quotes(reply_messages[0]) 620 logger.info(f"Sending single reply: {cleaned_text[:50]}... 
(lang: {reply_lang})") 621 response = bsky_utils.reply_to_notification( 622 client=atproto_client, 623 notification=notification_data, 624 reply_text=cleaned_text, 625 lang=reply_lang 626 ) 627 else: 628 # Multiple replies - use new threaded function 629 cleaned_messages = [bsky_utils.remove_outside_quotes(msg) for msg in reply_messages] 630 logger.info(f"Sending threaded reply with {len(cleaned_messages)} messages (lang: {reply_lang})") 631 response = bsky_utils.reply_with_thread_to_notification( 632 client=atproto_client, 633 notification=notification_data, 634 reply_messages=cleaned_messages, 635 lang=reply_lang 636 ) 637 638 if response: 639 logger.info(f"Successfully replied to @{author_handle}") 640 return True 641 else: 642 logger.error(f"Failed to send reply to @{author_handle}") 643 return False 644 else: 645 logger.warning(f"No add_post_to_bluesky_reply_thread tool calls found for mention from @{author_handle}, moving to no_reply folder") 646 return "no_reply" 647 648 except Exception as e: 649 logger.error(f"Error processing mention: {e}") 650 return False 651 finally: 652 # Detach user blocks after agent response (success or failure) 653 if 'attached_handles' in locals() and attached_handles: 654 try: 655 logger.info(f"Detaching user blocks for handles: {attached_handles}") 656 detach_result = detach_user_blocks(attached_handles, void_agent) 657 logger.debug(f"Detach result: {detach_result}") 658 except Exception as detach_error: 659 logger.warning(f"Failed to detach user blocks: {detach_error}") 660 661 662def notification_to_dict(notification): 663 """Convert a notification object to a dictionary for JSON serialization.""" 664 return { 665 'uri': notification.uri, 666 'cid': notification.cid, 667 'reason': notification.reason, 668 'is_read': notification.is_read, 669 'indexed_at': notification.indexed_at, 670 'author': { 671 'handle': notification.author.handle, 672 'display_name': notification.author.display_name, 673 'did': notification.author.did 
674 }, 675 'record': { 676 'text': getattr(notification.record, 'text', '') if hasattr(notification, 'record') else '' 677 } 678 } 679 680 681def load_processed_notifications(): 682 """Load the set of processed notification URIs.""" 683 if PROCESSED_NOTIFICATIONS_FILE.exists(): 684 try: 685 with open(PROCESSED_NOTIFICATIONS_FILE, 'r') as f: 686 data = json.load(f) 687 # Keep only recent entries (last MAX_PROCESSED_NOTIFICATIONS) 688 if len(data) > MAX_PROCESSED_NOTIFICATIONS: 689 data = data[-MAX_PROCESSED_NOTIFICATIONS:] 690 save_processed_notifications(data) 691 return set(data) 692 except Exception as e: 693 logger.error(f"Error loading processed notifications: {e}") 694 return set() 695 696 697def save_processed_notifications(processed_set): 698 """Save the set of processed notification URIs.""" 699 try: 700 with open(PROCESSED_NOTIFICATIONS_FILE, 'w') as f: 701 json.dump(list(processed_set), f) 702 except Exception as e: 703 logger.error(f"Error saving processed notifications: {e}") 704 705 706def save_notification_to_queue(notification): 707 """Save a notification to the queue directory with priority-based filename.""" 708 try: 709 # Check if already processed 710 processed_uris = load_processed_notifications() 711 if notification.uri in processed_uris: 712 logger.debug(f"Notification already processed: {notification.uri}") 713 return False 714 715 # Convert notification to dict 716 notif_dict = notification_to_dict(notification) 717 718 # Create JSON string 719 notif_json = json.dumps(notif_dict, sort_keys=True) 720 721 # Generate hash for filename (to avoid duplicates) 722 notif_hash = hashlib.sha256(notif_json.encode()).hexdigest()[:16] 723 724 # Determine priority based on author handle 725 author_handle = getattr(notification.author, 'handle', '') if hasattr(notification, 'author') else '' 726 priority_prefix = "0_" if author_handle == "cameron.pfiffer.org" else "1_" 727 728 # Create filename with priority, timestamp and hash 729 timestamp = 
datetime.now().strftime("%Y%m%d_%H%M%S") 730 filename = f"{priority_prefix}{timestamp}_{notification.reason}_{notif_hash}.json" 731 filepath = QUEUE_DIR / filename 732 733 # Check if this notification URI is already in the queue 734 for existing_file in QUEUE_DIR.glob("*.json"): 735 if existing_file.name == "processed_notifications.json": 736 continue 737 try: 738 with open(existing_file, 'r') as f: 739 existing_data = json.load(f) 740 if existing_data.get('uri') == notification.uri: 741 logger.debug(f"Notification already queued (URI: {notification.uri})") 742 return False 743 except: 744 continue 745 746 # Write to file 747 with open(filepath, 'w') as f: 748 json.dump(notif_dict, f, indent=2) 749 750 priority_label = "HIGH PRIORITY" if priority_prefix == "0_" else "normal" 751 logger.info(f"Queued notification ({priority_label}): {filename}") 752 return True 753 754 except Exception as e: 755 logger.error(f"Error saving notification to queue: {e}") 756 return False 757 758 759def load_and_process_queued_notifications(void_agent, atproto_client, testing_mode=False): 760 """Load and process all notifications from the queue in priority order.""" 761 try: 762 # Get all JSON files in queue directory (excluding processed_notifications.json) 763 # Files are sorted by name, which puts priority files first (0_ prefix before 1_ prefix) 764 queue_files = sorted([f for f in QUEUE_DIR.glob("*.json") if f.name != "processed_notifications.json"]) 765 766 if not queue_files: 767 return 768 769 logger.info(f"Processing {len(queue_files)} queued notifications") 770 771 # Log current statistics 772 elapsed_time = time.time() - start_time 773 total_messages = sum(message_counters.values()) 774 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0 775 776 logger.info(f"📊 Session stats: {total_messages} total messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies, {message_counters['follows']} follows) | 
def _record_processed_uri(uri):
    """Add *uri* to the persisted set of processed notification URIs."""
    processed_uris = load_processed_notifications()
    processed_uris.add(uri)
    save_processed_notifications(processed_uris)


def _handle_queued_notification(void_agent, atproto_client, notif_data, filepath, testing_mode):
    """Route one queued notification by its ``reason`` and return the outcome.

    Mentions and replies are delegated to ``process_mention`` (defined
    elsewhere in this file) and its return value is passed through unchanged.
    Follows are forwarded to the agent as an "Update:" user message. Reposts
    and unknown reasons are reported as handled so they leave the queue.

    Session counters are bumped for every truthy outcome; this includes the
    ``"no_reply"`` sentinel, which is a non-empty string.

    Raises:
        KeyError: if ``notif_data`` lacks ``'reason'`` (or, for follows,
            ``'author'`` / ``'handle'``). The caller logs it and keeps the file.
    """
    reason = notif_data['reason']

    if reason in ("mention", "reply"):
        outcome = process_mention(
            void_agent,
            atproto_client,
            notif_data,
            queue_filepath=filepath,
            testing_mode=testing_mode,
        )
        if outcome:
            message_counters['mentions' if reason == "mention" else 'replies'] += 1
        return outcome

    if reason == "follow":
        author = notif_data['author']
        author_handle = author['handle']
        author_display_name = author.get('display_name', 'no display name')
        follow_update = f"@{author_handle} ({author_display_name}) started following you."
        logger.info(f"Notifying agent about new follower: @{author_handle}")
        CLIENT.agents.messages.create(
            agent_id=void_agent.id,
            messages=[{"role": "user", "content": f"Update: {follow_update}"}],
        )
        # Follow updates are always considered successful once the call returns.
        message_counters['follows'] += 1
        return True

    if reason == "repost":
        # Reposts are skipped silently but reported as handled so the file is removed.
        message_counters['reposts_skipped'] += 1
        return True

    logger.warning(f"Unknown notification type: {reason}")
    return True  # Remove unknown types from the queue


def load_and_process_queued_notifications(void_agent, atproto_client, testing_mode=False):
    """Load and process all notifications from the queue in priority order.

    Queue files are visited in filename order, which puts priority files
    first (``0_`` prefix before ``1_`` prefix). What happens to each file
    depends on the outcome of handling it:

    * ``"no_reply"`` -- moved to ``QUEUE_NO_REPLY_DIR`` and its URI recorded
      as processed (in testing mode the file is left in the queue instead);
    * ``None`` -- moved to ``QUEUE_ERROR_DIR`` and its URI recorded;
    * any other truthy value -- file deleted and URI recorded (in testing
      mode the file is kept and nothing is recorded);
    * any other falsy value -- file kept in the queue for a later retry.

    Args:
        void_agent: Agent object; its ``id`` is used when messaging the agent.
        atproto_client: Bluesky client, passed through to ``process_mention``.
        testing_mode: When true, successfully handled files are not deleted.

    Returns:
        None. All exceptions are logged and swallowed; a failure on one file
        does not stop processing of the remaining files.
    """
    try:
        # processed_notifications.json lives in the same directory but is
        # bookkeeping, not a queued notification.
        queue_files = sorted(
            f for f in QUEUE_DIR.glob("*.json")
            if f.name != "processed_notifications.json"
        )

        if not queue_files:
            return

        logger.info(f"Processing {len(queue_files)} queued notifications")

        # Log current statistics
        elapsed_time = time.time() - start_time
        total_messages = sum(message_counters.values())
        messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0

        logger.info(f"📊 Session stats: {total_messages} total messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies, {message_counters['follows']} follows) | {messages_per_minute:.1f} msg/min")

        for i, filepath in enumerate(queue_files, 1):
            logger.info(f"Processing queue file {i}/{len(queue_files)}: {filepath.name}")
            try:
                with open(filepath, 'r') as f:
                    notif_data = json.load(f)

                outcome = _handle_queued_notification(
                    void_agent, atproto_client, notif_data, filepath, testing_mode
                )

                # The sentinels must be tested BEFORE plain truthiness:
                # "no_reply" is a non-empty string and therefore truthy, so an
                # `if outcome:` check placed first would swallow it.
                if outcome == "no_reply":
                    if testing_mode:
                        logger.info(f"🧪 TESTING MODE: Keeping queue file: {filepath.name}")
                    else:
                        no_reply_path = QUEUE_NO_REPLY_DIR / filepath.name
                        filepath.rename(no_reply_path)
                        logger.info(f"📭 Moved {filepath.name} to no_reply directory")
                        # Also mark as processed to avoid retrying
                        _record_processed_uri(notif_data['uri'])

                elif outcome is None:
                    error_path = QUEUE_ERROR_DIR / filepath.name
                    filepath.rename(error_path)
                    logger.warning(f"❌ Moved {filepath.name} to errors directory")
                    # Also mark as processed to avoid retrying
                    _record_processed_uri(notif_data['uri'])

                elif outcome:
                    if testing_mode:
                        logger.info(f"🧪 TESTING MODE: Keeping queue file: {filepath.name}")
                    else:
                        filepath.unlink()
                        logger.info(f"✅ Successfully processed and removed: {filepath.name}")
                        # Mark as processed to avoid reprocessing
                        _record_processed_uri(notif_data['uri'])

                else:
                    logger.warning(f"⚠️ Failed to process {filepath.name}, keeping in queue for retry")

            except Exception as e:
                # Keep the file for retry later
                logger.error(f"💥 Error processing queued notification {filepath.name}: {e}")

    except Exception as e:
        logger.error(f"Error loading queued notifications: {e}")
def process_notifications(void_agent, atproto_client, testing_mode=False):
    """Fetch new notifications, queue them, and process the queue.

    Steps, in order:

    1. Capture the current server time to use as the "seen" timestamp.
    2. Page through ``list_notifications`` (100 per page, at most 20 pages),
       stopping early once a page contains no unread non-like notifications
       or the response carries no further cursor.
    3. Save every unread, non-"like" notification to the on-disk queue.
    4. Unless in testing mode, mark notifications as seen when at least one
       new notification was queued.
    5. Process the whole queue (previously queued plus new files).

    Args:
        void_agent: Agent object forwarded to the queue processor.
        atproto_client: Logged-in Bluesky client.
        testing_mode: When true, notifications are not marked as seen.

    Returns:
        None. Every exception is logged and swallowed by the outer handler;
        this includes the authentication error re-raised from the fetch loop,
        which therefore aborts the current cycle rather than propagating to
        the caller.
    """
    try:
        # Get current time for marking notifications as seen
        logger.debug("Getting current time for notification marking...")
        last_seen_at = atproto_client.get_current_time_iso()

        # Fetch ALL notifications using pagination first
        logger.info("Beginning notification fetch with pagination...")
        all_notifications = []
        cursor = None
        page_count = 0
        max_pages = 20  # Safety limit to prevent infinite loops

        logger.info("Fetching all unread notifications...")

        while page_count < max_pages:
            # page_count is only incremented after a successful request, so
            # track the page being attempted separately for error reporting.
            current_page = page_count + 1
            try:
                params = {'cursor': cursor, 'limit': 100} if cursor else {'limit': 100}
                notifications_response = atproto_client.app.bsky.notification.list_notifications(
                    params=params
                )

                page_count += 1
                page_notifications = notifications_response.notifications

                # Count unread notifications in this page
                unread_count = sum(
                    1 for n in page_notifications
                    if not n.is_read and n.reason != "like"
                )
                logger.debug(f"Page {page_count}: {len(page_notifications)} notifications, {unread_count} unread (non-like)")

                # Add all notifications to our list
                all_notifications.extend(page_notifications)

                next_cursor = getattr(notifications_response, 'cursor', None)
                if not next_cursor:
                    # No more pages
                    logger.info(f"Fetched all notifications across {page_count} pages")
                    break

                cursor = next_cursor
                # If this page had no unread notifications, we can stop
                if unread_count == 0:
                    logger.info(f"No more unread notifications found after {page_count} pages")
                    break

            except Exception as e:
                error_str = str(e)
                logger.error(f"Error fetching notifications page {current_page}: {e}")

                # Handle specific API errors
                if 'rate limit' in error_str.lower():
                    logger.warning("Rate limit hit while fetching notifications, will retry next cycle")
                    break
                elif '401' in error_str or 'unauthorized' in error_str.lower():
                    logger.error("Authentication error, re-raising exception")
                    raise
                else:
                    # For other errors, try to continue with what we have
                    logger.warning("Continuing with notifications fetched so far")
                    break
        else:
            # The loop condition failed without a break: the safety limit was hit.
            logger.warning(f"Reached pagination safety limit of {max_pages} pages; stopping fetch")

        # Queue all unread notifications (except likes)
        logger.info("Queuing unread notifications...")
        new_count = 0
        for notification in all_notifications:
            if notification.is_read or notification.reason == "like":
                continue
            if save_notification_to_queue(notification):
                new_count += 1

        # Mark all notifications as seen immediately after queuing (unless in testing mode)
        if testing_mode:
            logger.info("🧪 TESTING MODE: Skipping marking notifications as seen")
        elif new_count > 0:
            atproto_client.app.bsky.notification.update_seen({'seen_at': last_seen_at})
            logger.info(f"Queued {new_count} new notifications and marked as seen")
        else:
            logger.debug("No new notifications to queue")

        # Now process the entire queue (old + new notifications)
        load_and_process_queued_notifications(void_agent, atproto_client, testing_mode)

    except Exception as e:
        logger.error(f"Error processing notifications: {e}")
def _session_snapshot():
    """Return ``(elapsed_seconds, total_messages, messages_per_minute)``.

    Reads the module-level ``start_time`` and ``message_counters``; the rate
    is reported as 0 when no time has elapsed.
    """
    elapsed_time = time.time() - start_time
    total_messages = sum(message_counters.values())
    messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0
    return elapsed_time, total_messages, messages_per_minute


def main():
    """Main bot loop that continuously monitors for notifications.

    Parses the ``--test`` flag into the module-level ``TESTING_MODE``, records
    the session ``start_time``, initializes the agent and the Bluesky client,
    then runs ``process_notifications`` every
    ``FETCH_NOTIFICATIONS_DELAY_SEC`` seconds until interrupted.

    Any exception inside a cycle is logged and followed by a doubled delay.
    Ctrl-C at any point in the loop, including during that back-off sleep,
    logs the final session statistics and returns.
    """
    global TESTING_MODE, start_time

    # Parse command line arguments
    parser = argparse.ArgumentParser(description='Void Bot - Bluesky autonomous agent')
    parser.add_argument('--test', action='store_true',
                        help='Run in testing mode (no messages sent, queue files preserved)')
    args = parser.parse_args()

    TESTING_MODE = args.test

    if TESTING_MODE:
        logger.info("🧪 === RUNNING IN TESTING MODE ===")
        logger.info(" - No messages will be sent to Bluesky")
        logger.info(" - Queue files will not be deleted")
        logger.info(" - Notifications will not be marked as seen")
        print("\n")

    start_time = time.time()
    logger.info("=== STARTING VOID BOT ===")
    void_agent = initialize_void()
    logger.info(f"Void agent initialized: {void_agent.id}")

    # Check if agent has required tools
    if hasattr(void_agent, 'tools') and void_agent.tools:
        tool_names = [tool.name for tool in void_agent.tools]
        # Check for bluesky-related tools
        bluesky_tools = [
            name for name in tool_names
            if 'bluesky' in name.lower() or 'reply' in name.lower()
        ]
        if not bluesky_tools:
            logger.warning("No Bluesky-related tools found! Agent may not be able to reply.")
    else:
        logger.warning("Agent has no tools registered!")

    # Initialize Bluesky client
    atproto_client = bsky_utils.default_login()
    logger.info("Connected to Bluesky")

    # Main loop
    logger.info(f"Starting notification monitoring, checking every {FETCH_NOTIFICATIONS_DELAY_SEC} seconds")

    cycle_count = 0
    try:
        while True:
            try:
                cycle_count += 1
                process_notifications(void_agent, atproto_client, TESTING_MODE)

                # Log cycle completion with stats
                _, total_messages, messages_per_minute = _session_snapshot()
                if total_messages > 0:
                    logger.info(f"Cycle {cycle_count} complete. Session totals: {total_messages} messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies) | {messages_per_minute:.1f} msg/min")
                sleep(FETCH_NOTIFICATIONS_DELAY_SEC)

            except Exception as e:
                logger.error(f"=== ERROR IN MAIN LOOP CYCLE {cycle_count} ===")
                logger.error(f"Error details: {e}")
                # Wait a bit longer on errors
                logger.info(f"Sleeping for {FETCH_NOTIFICATIONS_DELAY_SEC * 2} seconds due to error...")
                sleep(FETCH_NOTIFICATIONS_DELAY_SEC * 2)

    except KeyboardInterrupt:
        # KeyboardInterrupt is not an Exception subclass, so it bypasses the
        # per-cycle handler above and lands here wherever it was raised.
        elapsed_time, total_messages, messages_per_minute = _session_snapshot()

        logger.info("=== BOT STOPPED BY USER ===")
        logger.info(f"📊 Final session stats: {total_messages} total messages processed in {elapsed_time/60:.1f} minutes")
        logger.info(f" - {message_counters['mentions']} mentions")
        logger.info(f" - {message_counters['replies']} replies")
        logger.info(f" - {message_counters['follows']} follows")
        logger.info(f" - {message_counters['reposts_skipped']} reposts skipped")
        logger.info(f" - Average rate: {messages_per_minute:.1f} messages/minute")


if __name__ == "__main__":
    main()