a digital person for bluesky
at main 2288 lines 114 kB view raw
1# Rich imports removed - using simple text formatting 2from time import sleep 3from letta_client import Letta 4from bsky_utils import thread_to_yaml_string, count_thread_posts, extract_images_from_thread 5import os 6import logging 7import json 8import hashlib 9import subprocess 10from pathlib import Path 11from datetime import datetime, timedelta 12from collections import defaultdict 13import time 14import argparse 15import random 16 17from utils import ( 18 upsert_block, 19 upsert_agent 20) 21from config_loader import get_letta_config, get_config, get_queue_config 22 23# Vision support (optional - requires pillow) 24VISION_ENABLED = False 25try: 26 from integrate_vision import create_message_with_vision 27 VISION_ENABLED = True 28except ImportError as e: 29 pass # Vision not available - pillow not installed 30 31import bsky_utils 32from datetime import date 33 34# Downrank configuration 35BSKY_DOWNRANK_FILE = Path("bsky_downrank_handles.txt") 36DEFAULT_DOWNRANK_RATE = 0.1 # 10% response rate for downranked users 37 38 39def load_downrank_handles() -> dict: 40 """Load handles that should be downranked (responded to less frequently). 41 42 File format (one per line): 43 handle.bsky.social # Uses default rate (10%) 44 handle.bsky.social:0.05 # Custom rate (5%) 45 # Comments start with # 46 47 Returns: 48 Dict mapping handle -> response rate (0.0 to 1.0) 49 """ 50 try: 51 if not BSKY_DOWNRANK_FILE.exists(): 52 return {} 53 54 downrank_handles = {} 55 with open(BSKY_DOWNRANK_FILE, 'r') as f: 56 for line in f: 57 line = line.strip() 58 if not line or line.startswith('#'): 59 continue 60 61 # Check for custom rate 62 if ':' in line: 63 handle, rate_str = line.split(':', 1) 64 try: 65 rate = float(rate_str) 66 except ValueError: 67 rate = DEFAULT_DOWNRANK_RATE 68 else: 69 handle = line 70 rate = DEFAULT_DOWNRANK_RATE 71 72 downrank_handles[handle.lower()] = rate 73 74 if downrank_handles: 75 logger.info(f"Loaded {len(downrank_handles)} downrank handles") 76 return downrank_handles 77 except Exception as e: 78 logger.error(f"Error loading downrank handles: {e}") 79 return {} 80 81 82def should_respond_to_handle(handle: str, downrank_handles: dict) -> bool: 83 """Check if we should respond to this handle. 84 85 Returns True 100% of the time for non-downranked users. 86 Returns True at the configured rate for downranked users. 87 """ 88 handle_lower = handle.lower() 89 if handle_lower not in downrank_handles: 90 return True 91 92 rate = downrank_handles[handle_lower] 93 should_respond = random.random() < rate 94 logger.info(f"Downranked handle @{handle}: {'responding' if should_respond else 'skipping'} ({rate*100:.0f}% chance)") 95 return should_respond 96from notification_db import NotificationDB 97 98def extract_handles_from_data(data): 99 """Recursively extract all unique handles from nested data structure.""" 100 handles = set() 101 102 def _extract_recursive(obj): 103 if isinstance(obj, dict): 104 # Check if this dict has a 'handle' key 105 if 'handle' in obj: 106 handles.add(obj['handle']) 107 # Recursively check all values 108 for value in obj.values(): 109 _extract_recursive(value) 110 elif isinstance(obj, list): 111 # Recursively check all list items 112 for item in obj: 113 _extract_recursive(item) 114 115 _extract_recursive(data) 116 return list(handles) 117 118# Logging will be configured after argument parsing 119logger = None 120prompt_logger = None 121# Simple text formatting (Rich no longer used) 122SHOW_REASONING = False 123last_archival_query = "archival memory search" 124 125def log_with_panel(message, title=None, border_color="white"): 126 """Log a message with Unicode box-drawing characters""" 127 if title: 128 # Map old color names to appropriate symbols 129 symbol_map = { 130 "blue": "", # Tool calls 131 "green": "", # Success/completion 132 "yellow": "", # Reasoning 133 "red": "", # Errors 134 "white": "", # Default/mentions 135 "cyan": "", # Posts 136 } 137 symbol = symbol_map.get(border_color, "") 138 139 print(f"\n{symbol} {title}") 140 print(f" {'' * len(title)}") 141 # Indent message lines 142 for line in message.split('\n'): 143 print(f" {line}") 144 else: 145 print(message) 146 147 148# Load Letta configuration from config.yaml (will be initialized later with custom path if provided) 149letta_config = None 150CLIENT = None 151 152# Notification check delay 153FETCH_NOTIFICATIONS_DELAY_SEC = 10 # Check every 10 seconds for faster response 154 155# Check for new notifications every N queue items 156CHECK_NEW_NOTIFICATIONS_EVERY_N_ITEMS = 2 # Check more frequently during processing 157 158# Queue paths (will be initialized from config in main()) 159QUEUE_DIR = None 160QUEUE_ERROR_DIR = None 161QUEUE_NO_REPLY_DIR = None 162PROCESSED_NOTIFICATIONS_FILE = None 163 164# Maximum number of processed notifications to track 165MAX_PROCESSED_NOTIFICATIONS = 10000 166 167# Message tracking counters 168message_counters = defaultdict(int) 169start_time = time.time() 170 171# Testing mode flag 172TESTING_MODE = False 173RESET_MESSAGES_AFTER_NOTIFICATION = False 174 175# Skip git operations flag 176SKIP_GIT = False 177 178# Synthesis message tracking 179last_synthesis_time = time.time() 180 181# Database for notification tracking 182NOTIFICATION_DB = None 183 184def export_agent_state(client, agent, skip_git=False): 185 """Export agent state to agent_archive/ (timestamped) and agents/ (current).""" 186 try: 187 # Confirm export with user unless git is being skipped 188 if not skip_git: 189 response = input("Export agent state to files and stage with git? (y/n): ").lower().strip() 190 if response not in ['y', 'yes']: 191 logger.info("Agent export cancelled by user.") 192 return 193 else: 194 logger.info("Exporting agent state (git staging disabled)") 195 196 # Create directories if they don't exist 197 os.makedirs("agent_archive", exist_ok=True) 198 os.makedirs("agents", exist_ok=True) 199 200 # Export agent data 201 logger.info(f"Exporting agent {agent.id}. This takes some time...") 202 agent_data = client.agents.export_file(agent_id=agent.id) 203 204 # Save timestamped archive copy 205 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") 206 archive_file = os.path.join("agent_archive", f"void_{timestamp}.af") 207 with open(archive_file, 'w', encoding='utf-8') as f: 208 json.dump(agent_data, f, indent=2, ensure_ascii=False) 209 210 # Save current agent state 211 current_file = os.path.join("agents", "void.af") 212 with open(current_file, 'w', encoding='utf-8') as f: 213 json.dump(agent_data, f, indent=2, ensure_ascii=False) 214 215 logger.info(f"Agent exported to {archive_file} and {current_file}") 216 217 # Git add only the current agent file (archive is ignored) unless skip_git is True 218 if not skip_git: 219 try: 220 subprocess.run(["git", "add", current_file], check=True, capture_output=True) 221 logger.info("Added current agent file to git staging") 222 except subprocess.CalledProcessError as e: 223 logger.warning(f"Failed to git add agent file: {e}") 224 225 except Exception as e: 226 logger.error(f"Failed to export agent: {e}") 227 228def initialize_void(): 229 logger.info("Starting void agent initialization...") 230 231 # Get the configured void agent by ID 232 logger.info("Loading void agent from config...") 233 agent_id = letta_config['agent_id'] 234 235 try: 236 void_agent = CLIENT.agents.retrieve(agent_id=agent_id) 237 logger.info(f"Successfully loaded void agent: {void_agent.name} ({agent_id})") 238 except Exception as e: 239 logger.error(f"Failed to load void agent {agent_id}: {e}") 240 logger.error("Please ensure the agent_id in config.yaml is correct") 241 raise e 242 243 # Export agent state 244 logger.info("Exporting agent state...") 245 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 246 247 # Log agent details 248 logger.info(f"Void agent details - ID: {void_agent.id}") 249 logger.info(f"Agent name: {void_agent.name}") 250 if hasattr(void_agent, 'llm_config'): 251 logger.info(f"Agent model: {void_agent.llm_config.model}") 252 if hasattr(void_agent, 'project_id') and void_agent.project_id: 253 logger.info(f"Agent project_id: {void_agent.project_id}") 254 if hasattr(void_agent, 'tools'): 255 logger.info(f"Agent has {len(void_agent.tools)} tools") 256 for tool in void_agent.tools[:3]: # Show first 3 tools 257 logger.info(f" - Tool: {tool.name} (type: {tool.tool_type})") 258 259 return void_agent 260 261 262def process_mention(void_agent, atproto_client, notification_data, queue_filepath=None, testing_mode=False): 263 """Process a mention and generate a reply using the Letta agent. 264 265 Args: 266 void_agent: The Letta agent instance 267 atproto_client: The AT Protocol client 268 notification_data: The notification data dictionary 269 queue_filepath: Optional Path object to the queue file (for cleanup on halt) 270 271 Returns: 272 True: Successfully processed, remove from queue 273 False: Failed but retryable, keep in queue 274 None: Failed with non-retryable error, move to errors directory 275 "no_reply": No reply was generated, move to no_reply directory 276 """ 277 import uuid 278 279 # Generate correlation ID for tracking this notification through the pipeline 280 correlation_id = str(uuid.uuid4())[:8] 281 282 # Track attached user blocks for cleanup in finally 283 attached_user_blocks = [] 284 285 try: 286 logger.info(f"[{correlation_id}] Starting process_mention", extra={ 287 'correlation_id': correlation_id, 288 'notification_type': type(notification_data).__name__ 289 }) 290 291 # Handle both dict and object inputs for backwards compatibility 292 if isinstance(notification_data, dict): 293 uri = notification_data['uri'] 294 mention_text = notification_data.get('record', {}).get('text', '') 295 author_handle = notification_data['author']['handle'] 296 author_name = notification_data['author'].get('display_name') or author_handle 297 else: 298 # Legacy object access 299 uri = notification_data.uri 300 mention_text = notification_data.record.text if hasattr(notification_data.record, 'text') else "" 301 author_handle = notification_data.author.handle 302 author_name = notification_data.author.display_name or author_handle 303 304 logger.info(f"[{correlation_id}] Processing mention from @{author_handle}", extra={ 305 'correlation_id': correlation_id, 306 'author_handle': author_handle, 307 'author_name': author_name, 308 'mention_uri': uri, 309 'mention_text_length': len(mention_text), 310 'mention_preview': mention_text[:100] if mention_text else '' 311 }) 312 313 # Check if handle is in allowed list (if configured) 314 allowed_handles = get_config().get('bot.allowed_handles', []) 315 if allowed_handles and author_handle not in allowed_handles: 316 logger.info(f"[{correlation_id}] Skipping mention from @{author_handle} (not in allowed_handles)") 317 return True # Remove from queue 318 319 # Check if handle is downranked (reduced response rate) 320 downrank_handles = load_downrank_handles() 321 if not should_respond_to_handle(author_handle, downrank_handles): 322 logger.info(f"[{correlation_id}] Skipping mention from @{author_handle} (downranked, not selected)") 323 return True # Remove from queue 324 325 # Retrieve the entire thread associated with the mention 326 try: 327 thread = atproto_client.app.bsky.feed.get_post_thread({ 328 'uri': uri, 329 'parent_height': 40, 330 'depth': 10 331 }) 332 except Exception as e: 333 error_str = str(e) 334 # Check if this is a NotFound error 335 if 'NotFound' in error_str or 'Post not found' in error_str: 336 logger.warning(f"Post not found for URI {uri}, removing from queue") 337 return True # Return True to remove from queue 338 elif 'InternalServerError' in error_str: 339 # Bluesky sometimes returns InternalServerError for deleted posts 340 # Verify if post actually exists using getRecord 341 try: 342 parts = uri.replace('at://', '').split('/') 343 repo, collection, rkey = parts[0], parts[1], parts[2] 344 atproto_client.com.atproto.repo.get_record({ 345 'repo': repo, 'collection': collection, 'rkey': rkey 346 }) 347 # Post exists, this is a real server error - re-raise 348 logger.error(f"Error fetching thread (post exists, server error): {e}") 349 raise 350 except Exception as verify_e: 351 if 'RecordNotFound' in str(verify_e) or 'not found' in str(verify_e).lower(): 352 logger.warning(f"Post deleted (verified via getRecord), removing from queue: {uri}") 353 return True # Remove from queue 354 # Some other verification error, re-raise original 355 logger.error(f"Error fetching thread: {e}") 356 raise 357 else: 358 # Re-raise other errors 359 logger.error(f"Error fetching thread: {e}") 360 raise 361 362 # Check thread length against configured maximum 363 max_thread_posts = get_config().get('bot.max_thread_posts', 0) 364 if max_thread_posts > 0: 365 thread_post_count = count_thread_posts(thread) 366 if thread_post_count > max_thread_posts: 367 logger.info(f"Thread too long ({thread_post_count} posts > {max_thread_posts} max), skipping this mention") 368 return True # Return True to remove from queue 369 370 # Get thread context as YAML string 371 logger.debug("Converting thread to YAML string") 372 try: 373 thread_context = thread_to_yaml_string(thread) 374 logger.debug(f"Thread context generated, length: {len(thread_context)} characters") 375 376 # Check if #voidstop appears anywhere in the thread 377 if "#voidstop" in thread_context.lower(): 378 logger.info("Found #voidstop in thread context, skipping this mention") 379 return True # Return True to remove from queue 380 381 # Also check the mention text directly 382 if "#voidstop" in mention_text.lower(): 383 logger.info("Found #voidstop in mention text, skipping this mention") 384 return True # Return True to remove from queue 385 386 # Create a more informative preview by extracting meaningful content 387 lines = thread_context.split('\n') 388 meaningful_lines = [] 389 390 for line in lines: 391 stripped = line.strip() 392 if not stripped: 393 continue 394 395 # Look for lines with actual content (not just structure) 396 if any(keyword in line for keyword in ['text:', 'handle:', 'display_name:', 'created_at:', 'reply_count:', 'like_count:']): 397 meaningful_lines.append(line) 398 if len(meaningful_lines) >= 5: 399 break 400 401 if meaningful_lines: 402 preview = '\n'.join(meaningful_lines) 403 logger.debug(f"Thread content preview:\n{preview}") 404 else: 405 # If no content fields found, just show it's a thread structure 406 logger.debug(f"Thread structure generated ({len(thread_context)} chars)") 407 except Exception as yaml_error: 408 import traceback 409 logger.error(f"Error converting thread to YAML: {yaml_error}") 410 logger.error(f"Full traceback:\n{traceback.format_exc()}") 411 logger.error(f"Thread type: {type(thread)}") 412 if hasattr(thread, '__dict__'): 413 logger.error(f"Thread attributes: {thread.__dict__}") 414 # Try to continue with a simple context 415 thread_context = f"Error processing thread context: {str(yaml_error)}" 416 417 # Create a prompt for the Letta agent with thread context 418 prompt = f"""You received a mention on Bluesky from @{author_handle} ({author_name or author_handle}). 419 420MOST RECENT POST: 421"{mention_text}" 422 423THREAD CONTEXT: 424```yaml 425{thread_context} 426``` 427 428If you choose to reply, use the add_post_to_bluesky_reply_thread tool. Each call creates one post (max 300 characters). You may use multiple calls to create a thread if needed.""" 429 430 # Extract all handles from notification and thread data 431 all_handles = set() 432 all_handles.update(extract_handles_from_data(notification_data)) 433 all_handles.update(extract_handles_from_data(thread.model_dump())) 434 unique_handles = list(all_handles) 435 436 logger.debug(f"Found {len(unique_handles)} unique handles in thread: {unique_handles}") 437 438 # Check if any handles are in known_bots list 439 from tools.bot_detection import check_known_bots, should_respond_to_bot_thread, CheckKnownBotsArgs 440 import json 441 442 try: 443 # Check for known bots in thread 444 bot_check_result = check_known_bots(unique_handles, void_agent) 445 bot_check_data = json.loads(bot_check_result) 446 447 # TEMPORARILY DISABLED: Bot detection causing issues with normal users 448 # TODO: Re-enable after debugging why normal users are being flagged as bots 449 if False: # bot_check_data.get("bot_detected", False): 450 detected_bots = bot_check_data.get("detected_bots", []) 451 logger.info(f"Bot detected in thread: {detected_bots}") 452 453 # Decide whether to respond (10% chance) 454 if not should_respond_to_bot_thread(): 455 logger.info(f"Skipping bot thread (90% skip rate). Detected bots: {detected_bots}") 456 # Return False to keep in queue for potential later processing 457 return False 458 else: 459 logger.info(f"Responding to bot thread (10% response rate). Detected bots: {detected_bots}") 460 else: 461 logger.debug("Bot detection disabled - processing all notifications") 462 463 except Exception as bot_check_error: 464 logger.warning(f"Error checking for bots: {bot_check_error}") 465 # Continue processing if bot check fails 466 467 # Get response from Letta agent 468 # Format with Unicode characters 469 title = f"MENTION FROM @{author_handle}" 470 print(f"\n{title}") 471 print(f" {'' * len(title)}") 472 # Indent the mention text 473 for line in mention_text.split('\n'): 474 print(f" {line}") 475 476 # Log prompt details to separate logger 477 prompt_logger.debug(f"Full prompt being sent:\n{prompt}") 478 479 # Log concise prompt info to main logger 480 thread_handles_count = len(unique_handles) 481 prompt_char_count = len(prompt) 482 logger.debug(f"Sending to LLM: @{author_handle} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars, {thread_handles_count} users | prompt: {prompt_char_count} chars") 483 484 # Attach user blocks for thread participants 485 try: 486 success, attached_user_blocks = attach_user_blocks_for_thread(CLIENT, void_agent.id, unique_handles) 487 if not success: 488 logger.warning("Failed to attach some user blocks, continuing anyway") 489 except Exception as e: 490 logger.error(f"Error attaching user blocks: {e}") 491 492 try: 493 # Extract images from thread for vision support 494 thread_images = extract_images_from_thread(thread, max_images=4) 495 496 # Build message (with or without images) 497 if VISION_ENABLED and thread_images: 498 logger.info(f"Thread contains {len(thread_images)} images, downloading for vision...") 499 message = create_message_with_vision(prompt, thread_images, max_images=4) 500 if isinstance(message.get('content'), list): 501 logger.info(f"Vision message created with {len([c for c in message['content'] if c.get('type') == 'image'])} images") 502 else: 503 if thread_images and not VISION_ENABLED: 504 logger.debug(f"Thread has {len(thread_images)} images but vision not enabled (install pillow)") 505 message = {"role": "user", "content": prompt} 506 507 # Use streaming to avoid 524 timeout errors 508 message_stream = CLIENT.agents.messages.stream( 509 agent_id=void_agent.id, 510 messages=[message], 511 stream_tokens=False, # Step streaming only (faster than token streaming) 512 max_steps=100 513 ) 514 515 # Collect the streaming response with timeout protection 516 all_messages = [] 517 stream_start_time = time.time() 518 max_stream_duration = 600 # 10 minutes max 519 520 for chunk in message_stream: 521 # Check for timeout 522 if time.time() - stream_start_time > max_stream_duration: 523 logger.warning(f"Stream exceeded {max_stream_duration}s timeout, breaking") 524 break 525 # Log condensed chunk info 526 if hasattr(chunk, 'message_type'): 527 if chunk.message_type == 'reasoning_message': 528 # Show full reasoning without truncation 529 if SHOW_REASONING: 530 # Format with Unicode characters 531 print("\n◆ Reasoning") 532 print(" ─────────") 533 # Indent reasoning lines 534 for line in chunk.reasoning.split('\n'): 535 print(f" {line}") 536 else: 537 # Default log format (only when --reasoning is used due to log level) 538 # Format with Unicode characters 539 print("\n◆ Reasoning") 540 print(" ─────────") 541 # Indent reasoning lines 542 for line in chunk.reasoning.split('\n'): 543 print(f" {line}") 544 545 # Create ATProto record for reasoning (unless in testing mode) 546 if not testing_mode and hasattr(chunk, 'reasoning'): 547 try: 548 bsky_utils.create_reasoning_record(atproto_client, chunk.reasoning) 549 except Exception as e: 550 logger.debug(f"Failed to create reasoning record: {e}") 551 elif chunk.message_type == 'tool_call_message': 552 # Parse tool arguments for better display 553 tool_name = chunk.tool_call.name 554 555 # Create ATProto record for tool call (unless in testing mode) 556 if not testing_mode: 557 try: 558 tool_call_id = chunk.tool_call.tool_call_id if hasattr(chunk.tool_call, 'tool_call_id') else None 559 bsky_utils.create_tool_call_record( 560 atproto_client, 561 tool_name, 562 chunk.tool_call.arguments, 563 tool_call_id 564 ) 565 except Exception as e: 566 logger.debug(f"Failed to create tool call record: {e}") 567 568 try: 569 args = json.loads(chunk.tool_call.arguments) 570 # Format based on tool type 571 if tool_name in ['add_post_to_bluesky_reply_thread', 'bluesky_reply']: 572 # Extract the text being posted 573 text = args.get('text', '') 574 if text: 575 # Format with Unicode characters 576 print("\n✎ Bluesky Post") 577 print(" ────────────") 578 # Indent post text 579 for line in text.split('\n'): 580 print(f" {line}") 581 else: 582 log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue") 583 elif tool_name == 'archival_memory_search': 584 query = args.get('query', 'unknown') 585 global last_archival_query 586 last_archival_query = query 587 log_with_panel(f"query: \"{query}\"", f"Tool call: {tool_name}", "blue") 588 elif tool_name == 'archival_memory_insert': 589 content = args.get('content', '') 590 # Show the full content being inserted 591 log_with_panel(content, f"Tool call: {tool_name}", "blue") 592 elif tool_name == 'update_block': 593 label = args.get('label', 'unknown') 594 value_preview = str(args.get('value', ''))[:50] + "..." if len(str(args.get('value', ''))) > 50 else str(args.get('value', '')) 595 log_with_panel(f"{label}: \"{value_preview}\"", f"Tool call: {tool_name}", "blue") 596 else: 597 # Generic display for other tools 598 args_str = ', '.join(f"{k}={v}" for k, v in args.items() if k != 'request_heartbeat') 599 if len(args_str) > 150: 600 args_str = args_str[:150] + "..." 601 log_with_panel(args_str, f"Tool call: {tool_name}", "blue") 602 except: 603 # Fallback to original format if parsing fails 604 log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue") 605 elif chunk.message_type == 'tool_return_message': 606 # Enhanced tool result logging 607 tool_name = chunk.name 608 status = chunk.status 609 610 if status == 'success': 611 # Try to show meaningful result info based on tool type 612 if hasattr(chunk, 'tool_return') and chunk.tool_return: 613 result_str = str(chunk.tool_return) 614 if tool_name == 'archival_memory_search': 615 616 try: 617 # Handle both string and list formats 618 if isinstance(chunk.tool_return, str): 619 # The string format is: "([{...}, {...}], count)" 620 # We need to extract just the list part 621 if chunk.tool_return.strip(): 622 # Find the list part between the first [ and last ] 623 start_idx = chunk.tool_return.find('[') 624 end_idx = chunk.tool_return.rfind(']') 625 if start_idx != -1 and end_idx != -1: 626 list_str = chunk.tool_return[start_idx:end_idx+1] 627 # Use ast.literal_eval since this is Python literal syntax, not JSON 628 import ast 629 results = ast.literal_eval(list_str) 630 else: 631 logger.warning("Could not find list in archival_memory_search result") 632 results = [] 633 else: 634 logger.warning("Empty string returned from archival_memory_search") 635 results = [] 636 else: 637 # If it's already a list, use directly 638 results = chunk.tool_return 639 640 log_with_panel(f"Found {len(results)} memory entries", f"Tool result: {tool_name}", "green") 641 642 # Use the captured search query from the tool call 643 search_query = last_archival_query 644 645 # Combine all results into a single text block 646 content_text = "" 647 for i, entry in enumerate(results, 1): 648 timestamp = entry.get('timestamp', 'N/A') 649 content = entry.get('content', '') 650 content_text += f"[{i}/{len(results)}] {timestamp}\n{content}\n\n" 651 652 # Format with Unicode characters 653 title = f"{search_query} ({len(results)} results)" 654 print(f"\n{title}") 655 print(f" {'' * len(title)}") 656 # Indent content text 657 for line in content_text.strip().split('\n'): 658 print(f" {line}") 659 660 except Exception as e: 661 logger.error(f"Error formatting archival memory results: {e}") 662 log_with_panel(result_str[:100] + "...", f"Tool result: {tool_name}", "green") 663 elif tool_name == 'add_post_to_bluesky_reply_thread': 664 # Just show success for bluesky posts, the text was already shown in tool call 665 log_with_panel("Post queued successfully", f"Bluesky Post ✓", "green") 666 elif tool_name == 'archival_memory_insert': 667 # Skip archival memory insert results (always returns None) 668 pass 669 elif tool_name == 'update_block': 670 log_with_panel("Memory block updated", f"Tool result: {tool_name}", "green") 671 else: 672 # Generic success with preview 673 preview = result_str[:100] + "..." if len(result_str) > 100 else result_str 674 log_with_panel(preview, f"Tool result: {tool_name}", "green") 675 else: 676 log_with_panel("Success", f"Tool result: {tool_name}", "green") 677 elif status == 'error': 678 # Show error details 679 if tool_name == 'add_post_to_bluesky_reply_thread': 680 error_str = str(chunk.tool_return) if hasattr(chunk, 'tool_return') and chunk.tool_return else "Error occurred" 681 log_with_panel(error_str, f"Bluesky Post ✗", "red") 682 elif tool_name == 'archival_memory_insert': 683 # Skip archival memory insert errors too 684 pass 685 else: 686 error_preview = "" 687 if hasattr(chunk, 'tool_return') and chunk.tool_return: 688 error_str = str(chunk.tool_return) 689 error_preview = error_str[:100] + "..." if len(error_str) > 100 else error_str 690 log_with_panel(f"Error: {error_preview}", f"Tool result: {tool_name}", "red") 691 else: 692 log_with_panel("Error occurred", f"Tool result: {tool_name}", "red") 693 else: 694 logger.info(f"Tool result: {tool_name} - {status}") 695 elif chunk.message_type == 'assistant_message': 696 # Format with Unicode characters 697 print("\n▶ Assistant Response") 698 print(" ──────────────────") 699 # Indent response text 700 for line in chunk.content.split('\n'): 701 print(f" {line}") 702 elif chunk.message_type == 'error_message': 703 # Dump full error object 704 logger.error(f"Agent error_message: {chunk}") 705 if hasattr(chunk, 'model_dump'): 706 logger.error(f"Agent error (dict): {chunk.model_dump()}") 707 elif hasattr(chunk, '__dict__'): 708 logger.error(f"Agent error (vars): {vars(chunk)}") 709 elif chunk.message_type == 'ping': 710 # Silently ignore ping keepalive messages 711 logger.debug(f"Received keepalive ping from Letta API") 712 else: 713 # Filter out verbose message types 714 if chunk.message_type not in ['usage_statistics', 'stop_reason']: 715 logger.info(f"{chunk.message_type}: {str(chunk)[:150]}...") 716 else: 717 logger.info(f"📦 Stream status: {chunk}") 718 719 # Log full chunk for debugging 720 logger.debug(f"Full streaming chunk: {chunk}") 721 all_messages.append(chunk) 722 if str(chunk) == 'done': 723 break 724 725 # Convert streaming response to standard format for compatibility 726 message_response = type('StreamingResponse', (), { 727 'messages': [msg for msg in all_messages if hasattr(msg, 'message_type')] 728 })() 729 except Exception as api_error: 730 import traceback 731 error_str = str(api_error) 732 logger.error(f"Letta API error: {api_error}") 733 logger.error(f"Error type: {type(api_error).__name__}") 734 logger.error(f"Full traceback:\n{traceback.format_exc()}") 735 logger.error(f"Mention text was: {mention_text}") 736 logger.error(f"Author: @{author_handle}") 737 logger.error(f"URI: {uri}") 738 739 740 # Try to extract more info from different error types 741 if hasattr(api_error, 'response'): 742 logger.error(f"Error response object exists") 743 if hasattr(api_error.response, 'text'): 744 logger.error(f"Response text: {api_error.response.text}") 745 if hasattr(api_error.response, 'json') and callable(api_error.response.json): 746 try: 747 logger.error(f"Response JSON: {api_error.response.json()}") 748 except: 749 pass 750 751 # Check for specific error types 752 if hasattr(api_error, 'status_code'): 753 logger.error(f"API Status code: {api_error.status_code}") 754 if hasattr(api_error, 'body'): 755 logger.error(f"API Response body: {api_error.body}") 756 if hasattr(api_error, 'headers'): 757 logger.error(f"API Response headers: {api_error.headers}") 758 759 if api_error.status_code == 413: 760 logger.error("413 Payload Too Large - moving to errors directory") 761 return None # Move to errors directory - payload is too large to ever succeed 762 elif api_error.status_code == 524: 763 logger.error("524 error - timeout from Cloudflare, will retry later") 764 return False # Keep in queue for retry 765 766 # Check if error indicates we should remove from queue 767 if 'status_code: 413' in error_str or 'Payload Too Large' in error_str: 768 logger.warning("Payload too large error, moving to errors directory") 769 return None # Move to errors directory - cannot be fixed by retry 770 elif 'status_code: 524' in error_str: 771 logger.warning("524 timeout error, keeping in queue for retry") 772 return False # Keep in queue for retry 773 774 raise 775 776 # Log successful response 777 logger.debug("Successfully received response from Letta API") 778 logger.debug(f"Number of messages in response: {len(message_response.messages) if hasattr(message_response, 'messages') else 'N/A'}") 779 780 # Extract successful add_post_to_bluesky_reply_thread tool calls from the agent's response 781 reply_candidates = [] 782 tool_call_results = {} # Map tool_call_id to status 783 ack_note = None # Track any note from annotate_ack tool 784 flagged_memories = [] # Track memories flagged for deletion 785 786 logger.debug(f"Processing {len(message_response.messages)} response messages...") 787 788 # First pass: collect tool return statuses 789 ignored_notification = False 790 ignore_reason = "" 791 ignore_category = "" 792 793 logger.debug(f"Processing {len(message_response.messages)} messages from agent") 794 795 for message in message_response.messages: 796 # Log detailed message attributes for debugging 797 msg_type = getattr(message, 'message_type', 'unknown') 798 has_tool_call_id = hasattr(message, 'tool_call_id') 799 has_status = hasattr(message, 'status') 800 has_tool_return = hasattr(message, 'tool_return') 801 802 logger.debug(f"Message type={msg_type}, has_tool_call_id={has_tool_call_id}, has_status={has_status}, has_tool_return={has_tool_return}") 803 804 # Tool return messages are identified by having tool_return attribute, tool_call_id, and status 805 if has_tool_call_id and has_status and has_tool_return: 806 logger.debug(f" -> tool_call_id={message.tool_call_id}, status={message.status}") 807 808 # Store the result for ANY tool that has a return - we'll match by tool_call_id later 809 tool_call_results[message.tool_call_id] = message.status 810 logger.debug(f"Stored tool result: {message.tool_call_id} -> {message.status}") 811 812 # Handle special processing for ignore_notification 813 if message.status == 'success': 814 result_str = str(message.tool_return) if message.tool_return else "" 815 if 'IGNORED_NOTIFICATION::' in result_str: 816 parts = result_str.split('::') 817 if len(parts) >= 3: 818 ignore_category = parts[1] 819 ignore_reason = parts[2] 820 ignored_notification = True 821 logger.info(f"🚫 Notification ignored - Category: {ignore_category}, Reason: {ignore_reason}") 822 823 # Check for deprecated tool in tool call messages 824 elif hasattr(message, 'tool_call') and message.tool_call: 825 if message.tool_call.name == 'bluesky_reply': 826 logger.error("DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!") 827 logger.error("Please use add_post_to_bluesky_reply_thread instead.") 828 logger.error("Update the agent's tools using register_tools.py") 829 # Export agent state before terminating 830 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 831 logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===") 832 exit(1) 833 834 logger.debug(f"First pass complete. Collected {len(tool_call_results)} tool call results") 835 logger.debug(f"tool_call_results: {tool_call_results}") 836 837 # Second pass: process messages and check for successful tool calls 838 for i, message in enumerate(message_response.messages, 1): 839 # Log concise message info instead of full object 840 msg_type = getattr(message, 'message_type', 'unknown') 841 if hasattr(message, 'reasoning') and message.reasoning: 842 logger.debug(f" {i}. {msg_type}: {message.reasoning[:100]}...") 843 elif hasattr(message, 'tool_call') and message.tool_call: 844 tool_name = message.tool_call.name 845 logger.debug(f" {i}. {msg_type}: {tool_name}") 846 elif hasattr(message, 'tool_return'): 847 tool_name = getattr(message, 'name', 'unknown_tool') 848 return_preview = str(message.tool_return)[:100] if message.tool_return else "None" 849 status = getattr(message, 'status', 'unknown') 850 logger.debug(f" {i}. {msg_type}: {tool_name} -> {return_preview}... (status: {status})") 851 elif hasattr(message, 'text'): 852 logger.debug(f" {i}. {msg_type}: {message.text[:100]}...") 853 else: 854 logger.debug(f" {i}. {msg_type}: <no content>") 855 856 # Check for halt_activity tool call 857 if hasattr(message, 'tool_call') and message.tool_call: 858 if message.tool_call.name == 'halt_activity': 859 logger.info("🛑 HALT_ACTIVITY TOOL CALLED - TERMINATING BOT") 860 try: 861 args = json.loads(message.tool_call.arguments) 862 reason = args.get('reason', 'Agent requested halt') 863 logger.info(f"Halt reason: {reason}") 864 except: 865 logger.info("Halt reason: <unable to parse>") 866 867 # Delete the queue file before terminating 868 if queue_filepath and queue_filepath.exists(): 869 queue_filepath.unlink() 870 logger.info(f"Deleted queue file: {queue_filepath.name}") 871 872 # Also mark as processed to avoid reprocessing 873 if NOTIFICATION_DB: 874 NOTIFICATION_DB.mark_processed(notification_data.get('uri', ''), status='processed') 875 else: 876 processed_uris = load_processed_notifications() 877 processed_uris.add(notification_data.get('uri', '')) 878 save_processed_notifications(processed_uris) 879 880 # Export agent state before terminating 881 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 882 883 # Exit the program 884 logger.info("=== BOT TERMINATED BY AGENT ===") 885 exit(0) 886 887 # Check for deprecated bluesky_reply tool 888 if hasattr(message, 'tool_call') and message.tool_call: 889 if message.tool_call.name == 'bluesky_reply': 890 logger.error("DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!") 891 logger.error("Please use add_post_to_bluesky_reply_thread instead.") 892 logger.error("Update the agent's tools using register_tools.py") 893 # Export agent state before terminating 894 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 895 logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===") 896 exit(1) 897 898 # Collect annotate_ack tool calls 899 elif message.tool_call.name == 'annotate_ack': 900 try: 901 args = json.loads(message.tool_call.arguments) 902 note = args.get('note', '') 903 if note: 904 ack_note = note 905 logger.debug(f"Found annotate_ack with note: {note[:50]}...") 906 except json.JSONDecodeError as e: 907 logger.error(f"Failed to parse annotate_ack arguments: {e}") 908 909 # Collect flag_archival_memory_for_deletion tool calls 910 elif message.tool_call.name == 'flag_archival_memory_for_deletion': 911 try: 912 args = json.loads(message.tool_call.arguments) 913 reason = args.get('reason', '') 914 memory_text = args.get('memory_text', '') 915 confirm = args.get('confirm', False) 916 917 # Only flag for deletion if confirmed and has all required fields 918 if confirm and memory_text and reason: 919 flagged_memories.append({ 920 'reason': reason, 921 'memory_text': memory_text 922 }) 923 logger.debug(f"Found memory flagged for deletion (reason: {reason}): {memory_text[:50]}...") 924 elif not confirm: 925 logger.debug(f"Memory deletion not confirmed, skipping: {memory_text[:50]}...") 926 elif not reason: 927 logger.warning(f"Memory deletion missing reason, skipping: {memory_text[:50]}...") 928 except json.JSONDecodeError as e: 929 logger.error(f"Failed to parse flag_archival_memory_for_deletion arguments: {e}") 930 931 # Collect archival_memory_insert tool calls for recording to AT Protocol 932 elif message.tool_call.name == 'archival_memory_insert': 933 try: 934 args = json.loads(message.tool_call.arguments) 935 content = args.get('content', '') 936 tags_str = args.get('tags', None) 937 938 # Parse tags from string representation if present 939 tags = None 940 if tags_str: 941 try: 942 tags = json.loads(tags_str) if isinstance(tags_str, str) else tags_str 943 except json.JSONDecodeError: 944 logger.warning(f"Failed to parse tags from archival_memory_insert: {tags_str[:50]}...") 945 946 if content: 947 # Create stream.thought.memory record 948 try: 949 memory_result = bsky_utils.create_memory_record(atproto_client, content, tags) 950 if memory_result: 951 tags_info = f" ({len(tags)} tags)" if tags else "" 952 logger.info(f"📝 Recorded archival memory to AT Protocol{tags_info}: {content[:100]}...") 953 else: 954 logger.warning(f"Failed to record archival memory to AT Protocol") 955 except Exception as e: 956 logger.error(f"Error creating memory record: {e}") 957 except json.JSONDecodeError as e: 958 logger.error(f"Failed to parse archival_memory_insert arguments: {e}") 959 960 # Collect add_post_to_bluesky_reply_thread tool calls - only if they were successful 961 elif message.tool_call.name == 'add_post_to_bluesky_reply_thread': 962 tool_call_id = message.tool_call.tool_call_id 963 tool_status = tool_call_results.get(tool_call_id, 'unknown') 964 965 logger.debug(f"Found add_post_to_bluesky_reply_thread tool call: id={tool_call_id}, status={tool_status}") 966 logger.debug(f"Available tool_call_results: {tool_call_results}") 967 968 if tool_status == 'success': 969 try: 970 args = json.loads(message.tool_call.arguments) 971 reply_text = args.get('text', '') 972 reply_lang = args.get('lang', 'en-US') 973 974 if reply_text: # Only add if there's actual content 975 reply_candidates.append((reply_text, reply_lang)) 976 logger.debug(f"Found successful add_post_to_bluesky_reply_thread candidate: {reply_text[:50]}... (lang: {reply_lang})") 977 except json.JSONDecodeError as e: 978 logger.error(f"Failed to parse tool call arguments: {e}") 979 elif tool_status == 'error': 980 logger.debug(f"Skipping failed add_post_to_bluesky_reply_thread tool call (status: error)") 981 else: 982 logger.warning(f"⚠️ Skipping add_post_to_bluesky_reply_thread tool call with unknown status: {tool_status}") 983 logger.warning(f" Tool call ID '{tool_call_id}' not found in tool_call_results dict") 984 985 # Handle archival memory deletion if any were flagged (only if no halt was received) 986 if flagged_memories: 987 logger.info(f"Processing {len(flagged_memories)} flagged memories for deletion") 988 for flagged_memory in flagged_memories: 989 reason = flagged_memory['reason'] 990 memory_text = flagged_memory['memory_text'] 991 992 try: 993 # Search for passages with this exact text 994 logger.debug(f"Searching for passages matching: {memory_text[:100]}...") 995 passages_page = CLIENT.agents.passages.list( 996 agent_id=void_agent.id, 997 query=memory_text 998 ) 999 passages = passages_page.items if hasattr(passages_page, 'items') else passages_page 1000 1001 if not passages: 1002 logger.warning(f"No passages found matching flagged memory: {memory_text[:50]}...") 1003 continue 1004 1005 # Delete all matching passages 1006 deleted_count = 0 1007 for passage in passages: 1008 # Check if the passage text exactly matches (to avoid partial matches) 1009 if hasattr(passage, 'text') and passage.text == memory_text: 1010 try: 1011 CLIENT.agents.passages.delete( 1012 agent_id=void_agent.id, 1013 passage_id=str(passage.id) 1014 ) 1015 deleted_count += 1 1016 logger.debug(f"Deleted passage {passage.id}") 1017 except Exception as delete_error: 1018 logger.error(f"Failed to delete passage {passage.id}: {delete_error}") 1019 1020 if deleted_count > 0: 1021 logger.info(f"🗑️ Deleted {deleted_count} archival memory passage(s) (reason: {reason}): {memory_text[:50]}...") 1022 else: 1023 logger.warning(f"No exact matches found for deletion: {memory_text[:50]}...") 1024 1025 except Exception as e: 1026 logger.error(f"Error processing memory deletion: {e}") 1027 1028 # Check for conflicting tool calls 1029 if reply_candidates and ignored_notification: 1030 logger.error(f"⚠️ CONFLICT: Agent called both add_post_to_bluesky_reply_thread and ignore_notification!") 1031 logger.error(f"Reply candidates: {len(reply_candidates)}, Ignore reason: {ignore_reason}") 1032 logger.warning("Item will be left in queue for manual review") 1033 # Return False to keep in queue 1034 return False 1035 1036 if reply_candidates: 1037 # Aggregate reply posts into a thread 1038 reply_messages = [] 1039 reply_langs = [] 1040 for text, lang in reply_candidates: 1041 reply_messages.append(text) 1042 reply_langs.append(lang) 1043 1044 # Use the first language for the entire thread (could be enhanced later) 1045 reply_lang = reply_langs[0] if reply_langs else 'en-US' 1046 1047 logger.debug(f"Found {len(reply_candidates)} add_post_to_bluesky_reply_thread calls, building thread") 1048 1049 # Display the generated reply thread 1050 if len(reply_messages) == 1: 1051 content = reply_messages[0] 1052 title = f"Reply to @{author_handle}" 1053 else: 1054 content = "\n\n".join([f"{j}. {msg}" for j, msg in enumerate(reply_messages, 1)]) 1055 title = f"Reply Thread to @{author_handle} ({len(reply_messages)} messages)" 1056 1057 # Format with Unicode characters 1058 print(f"\n{title}") 1059 print(f" {'' * len(title)}") 1060 # Indent content lines 1061 for line in content.split('\n'): 1062 print(f" {line}") 1063 1064 # Send the reply(s) with language (unless in testing mode) 1065 if testing_mode: 1066 logger.info("TESTING MODE: Skipping actual Bluesky post") 1067 response = True # Simulate success 1068 else: 1069 if len(reply_messages) == 1: 1070 # Single reply - use existing function 1071 cleaned_text = bsky_utils.remove_outside_quotes(reply_messages[0]) 1072 logger.info(f"Sending single reply: {cleaned_text[:50]}... (lang: {reply_lang})") 1073 response = bsky_utils.reply_to_notification( 1074 client=atproto_client, 1075 notification=notification_data, 1076 reply_text=cleaned_text, 1077 lang=reply_lang, 1078 correlation_id=correlation_id 1079 ) 1080 else: 1081 # Multiple replies - use new threaded function 1082 cleaned_messages = [bsky_utils.remove_outside_quotes(msg) for msg in reply_messages] 1083 logger.info(f"Sending threaded reply with {len(cleaned_messages)} messages (lang: {reply_lang})") 1084 response = bsky_utils.reply_with_thread_to_notification( 1085 client=atproto_client, 1086 notification=notification_data, 1087 reply_messages=cleaned_messages, 1088 lang=reply_lang, 1089 correlation_id=correlation_id 1090 ) 1091 1092 if response: 1093 logger.info(f"[{correlation_id}] Successfully replied to @{author_handle}", extra={ 1094 'correlation_id': correlation_id, 1095 'author_handle': author_handle, 1096 'reply_count': len(reply_messages) 1097 }) 1098 1099 # Acknowledge the post we're replying to with stream.thought.ack 1100 try: 1101 post_uri = notification_data.get('uri') 1102 post_cid = notification_data.get('cid') 1103 1104 if post_uri and post_cid: 1105 ack_result = bsky_utils.acknowledge_post( 1106 client=atproto_client, 1107 post_uri=post_uri, 1108 post_cid=post_cid, 1109 note=ack_note 1110 ) 1111 if ack_result: 1112 if ack_note: 1113 logger.info(f"Successfully acknowledged post from @{author_handle} with stream.thought.ack (note: \"{ack_note[:50]}...\")") 1114 else: 1115 logger.info(f"Successfully acknowledged post from @{author_handle} with stream.thought.ack") 1116 else: 1117 logger.warning(f"Failed to acknowledge post from @{author_handle}") 1118 else: 1119 logger.warning(f"Missing URI or CID for acknowledging post from @{author_handle}") 1120 except Exception as e: 1121 logger.error(f"Error acknowledging post from @{author_handle}: {e}") 1122 # Don't fail the entire operation if acknowledgment fails 1123 1124 return True 1125 else: 1126 logger.error(f"Failed to send reply to @{author_handle}") 1127 return False 1128 else: 1129 # Check if notification was explicitly ignored 1130 if ignored_notification: 1131 logger.info(f"[{correlation_id}] Notification from @{author_handle} was explicitly ignored (category: {ignore_category})", extra={ 1132 'correlation_id': correlation_id, 1133 'author_handle': author_handle, 1134 'ignore_category': ignore_category 1135 }) 1136 return "ignored" 1137 else: 1138 logger.warning(f"[{correlation_id}] No reply generated for mention from @{author_handle}, moving to no_reply folder", extra={ 1139 'correlation_id': correlation_id, 1140 'author_handle': author_handle 1141 }) 1142 return "no_reply" 1143 1144 except Exception as e: 1145 logger.error(f"[{correlation_id}] Error processing mention: {e}", extra={ 1146 'correlation_id': correlation_id, 1147 'error': str(e), 1148 'error_type': type(e).__name__, 1149 'author_handle': author_handle if 'author_handle' in locals() else 'unknown' 1150 }) 1151 return False 1152 finally: 1153 # Always detach user blocks after processing 1154 if attached_user_blocks: 1155 try: 1156 detach_user_blocks_for_thread(CLIENT, void_agent.id, attached_user_blocks) 1157 except Exception as e: 1158 logger.error(f"Error detaching user blocks: {e}") 1159 1160 1161 1162def queue_file_sort_key(filepath): 1163 """ 1164 Sort key for queue files: priority first (0 before 1), then oldest first within each priority. 1165 Filename format: {priority}_{YYYYMMDD}_{HHMMSS}_{reason}_{hash}.json 1166 """ 1167 parts = filepath.name.split('_') 1168 if len(parts) >= 3: 1169 priority = int(parts[0]) # 0 or 1 1170 date_part = parts[1] # YYYYMMDD 1171 time_part = parts[2] # HHMMSS 1172 timestamp = int(date_part + time_part) # YYYYMMDDHHMMSS as integer 1173 # Return (priority ascending, timestamp ascending for oldest first) 1174 return (priority, timestamp) 1175 return (1, float('inf')) # Fallback: treat as normal priority, process last 1176 1177def notification_to_dict(notification): 1178 """Convert a notification object to a dictionary for JSON serialization.""" 1179 return { 1180 'uri': notification.uri, 1181 'cid': notification.cid, 1182 'reason': notification.reason, 1183 'is_read': notification.is_read, 1184 'indexed_at': notification.indexed_at, 1185 'author': { 1186 'handle': notification.author.handle, 1187 'display_name': notification.author.display_name, 1188 'did': notification.author.did 1189 }, 1190 'record': { 1191 'text': getattr(notification.record, 'text', '') if hasattr(notification, 'record') else '' 1192 } 1193 } 1194 1195 1196def load_processed_notifications(): 1197 """Load the set of processed notification URIs from database.""" 1198 global NOTIFICATION_DB 1199 if NOTIFICATION_DB: 1200 return NOTIFICATION_DB.get_processed_uris(limit=MAX_PROCESSED_NOTIFICATIONS) 1201 return set() 1202 1203 1204def save_processed_notifications(processed_set): 1205 """Save the set of processed notification URIs to database.""" 1206 # This is now handled by marking individual notifications in the DB 1207 # Keeping function for compatibility but it doesn't need to do anything 1208 pass 1209 1210 1211def save_notification_to_queue(notification, is_priority=None): 1212 """Save a notification to the queue directory with priority-based filename.""" 1213 try: 1214 global NOTIFICATION_DB 1215 1216 # Handle both notification objects and dicts 1217 if isinstance(notification, dict): 1218 notif_dict = notification 1219 notification_uri = notification.get('uri') 1220 else: 1221 notif_dict = notification_to_dict(notification) 1222 notification_uri = notification.uri 1223 1224 # Check if already processed (using database if available) 1225 if NOTIFICATION_DB: 1226 if NOTIFICATION_DB.is_processed(notification_uri): 1227 logger.debug(f"Notification already processed (DB): {notification_uri}") 1228 return False 1229 # Add to database - if this fails, don't queue the notification 1230 if not NOTIFICATION_DB.add_notification(notif_dict): 1231 logger.warning(f"Failed to add notification to database, skipping: {notification_uri}") 1232 return False 1233 else: 1234 # Fall back to old JSON method 1235 processed_uris = load_processed_notifications() 1236 if notification_uri in processed_uris: 1237 logger.debug(f"Notification already processed: {notification_uri}") 1238 return False 1239 1240 # Create JSON string 1241 notif_json = json.dumps(notif_dict, sort_keys=True) 1242 1243 # Generate hash for filename (to avoid duplicates) 1244 notif_hash = hashlib.sha256(notif_json.encode()).hexdigest()[:16] 1245 1246 # Determine priority based on author handle or explicit priority 1247 if is_priority is not None: 1248 priority_prefix = "0_" if is_priority else "1_" 1249 else: 1250 if isinstance(notification, dict): 1251 author_handle = notification.get('author', {}).get('handle', '') 1252 else: 1253 author_handle = getattr(notification.author, 'handle', '') if hasattr(notification, 'author') else '' 1254 # Prioritize cameron.pfiffer.org responses 1255 priority_prefix = "0_" if author_handle == "cameron.pfiffer.org" else "1_" 1256 1257 # Create filename with priority, timestamp and hash 1258 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") 1259 reason = notif_dict.get('reason', 'unknown') 1260 filename = f"{priority_prefix}{timestamp}_{reason}_{notif_hash}.json" 1261 filepath = QUEUE_DIR / filename 1262 1263 # Check if this notification URI is already in the queue 1264 for existing_file in QUEUE_DIR.glob("*.json"): 1265 if existing_file.name == "processed_notifications.json": 1266 continue 1267 try: 1268 with open(existing_file, 'r') as f: 1269 existing_data = json.load(f) 1270 if existing_data.get('uri') == notification_uri: 1271 logger.debug(f"Notification already queued (URI: {notification_uri})") 1272 return False 1273 except: 1274 continue 1275 1276 # Write to file 1277 with open(filepath, 'w') as f: 1278 json.dump(notif_dict, f, indent=2) 1279 1280 priority_label = "HIGH PRIORITY" if priority_prefix == "0_" else "normal" 1281 logger.info(f"Queued notification ({priority_label}): {filename}") 1282 return True 1283 1284 except Exception as e: 1285 logger.error(f"Error saving notification to queue: {e}") 1286 return False 1287 1288 1289def load_and_process_queued_notifications(void_agent, atproto_client, testing_mode=False): 1290 """Load and process all notifications from the queue in priority order (newest first).""" 1291 try: 1292 # Get all JSON files in queue directory (excluding processed_notifications.json) 1293 all_queue_files = [f for f in QUEUE_DIR.glob("*.json") if f.name != "processed_notifications.json"] 1294 1295 # Sort by priority first (0_ before 1_), then by timestamp (oldest first within each priority) 1296 all_queue_files = sorted(all_queue_files, key=queue_file_sort_key) 1297 1298 # Filter out and delete like notifications immediately 1299 queue_files = [] 1300 likes_deleted = 0 1301 1302 for filepath in all_queue_files: 1303 try: 1304 with open(filepath, 'r') as f: 1305 notif_data = json.load(f) 1306 1307 # If it's a like, delete it immediately and don't process 1308 if notif_data.get('reason') == 'like': 1309 filepath.unlink() 1310 likes_deleted += 1 1311 logger.debug(f"Deleted like notification: {filepath.name}") 1312 else: 1313 queue_files.append(filepath) 1314 except Exception as e: 1315 logger.warning(f"Error checking notification file {filepath.name}: {e}") 1316 queue_files.append(filepath) # Keep it in case it's valid 1317 1318 if likes_deleted > 0: 1319 logger.info(f"Deleted {likes_deleted} like notifications from queue") 1320 1321 if not queue_files: 1322 return 1323 1324 logger.info(f"Processing {len(queue_files)} queued notifications") 1325 1326 # Log current statistics 1327 elapsed_time = time.time() - start_time 1328 total_messages = sum(message_counters.values()) 1329 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0 1330 1331 logger.info(f"Session stats: {total_messages} total messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies, {message_counters['follows']} follows) | {messages_per_minute:.1f} msg/min") 1332 1333 for i, filepath in enumerate(queue_files, 1): 1334 # Determine if this is a priority notification 1335 is_priority = filepath.name.startswith("0_") 1336 1337 # Check for new notifications periodically during queue processing 1338 # Also check immediately after processing each priority item 1339 should_check_notifications = (i % CHECK_NEW_NOTIFICATIONS_EVERY_N_ITEMS == 0 and i > 1) 1340 1341 # If we just processed a priority item, immediately check for new priority notifications 1342 if is_priority and i > 1: 1343 should_check_notifications = True 1344 1345 if should_check_notifications: 1346 logger.info(f"🔄 Checking for new notifications (processed {i-1}/{len(queue_files)} queue items)") 1347 try: 1348 # Fetch and queue new notifications without processing them 1349 new_count = fetch_and_queue_new_notifications(atproto_client) 1350 1351 if new_count > 0: 1352 logger.info(f"Added {new_count} new notifications to queue") 1353 # Reload the queue files to include the new items with same sorting 1354 updated_queue_files = [f for f in QUEUE_DIR.glob("*.json") if f.name != "processed_notifications.json"] 1355 updated_queue_files = sorted(updated_queue_files, key=queue_file_sort_key) 1356 queue_files = updated_queue_files 1357 logger.info(f"Queue updated: now {len(queue_files)} total items") 1358 except Exception as e: 1359 logger.error(f"Error checking for new notifications: {e}") 1360 1361 priority_label = " [PRIORITY]" if is_priority else "" 1362 logger.info(f"Processing queue file {i}/{len(queue_files)}{priority_label}: {filepath.name}") 1363 try: 1364 # Load notification data 1365 with open(filepath, 'r') as f: 1366 notif_data = json.load(f) 1367 1368 # Process based on type using dict data directly 1369 success = False 1370 if notif_data['reason'] == "mention": 1371 success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode) 1372 if success: 1373 message_counters['mentions'] += 1 1374 elif notif_data['reason'] == "reply": 1375 success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode) 1376 if success: 1377 message_counters['replies'] += 1 1378 elif notif_data['reason'] == "follow": 1379 # author_handle = notif_data['author']['handle'] 1380 # author_display_name = notif_data['author'].get('display_name', 'no display name') 1381 # follow_update = f"@{author_handle} ({author_display_name}) started following you." 1382 # follow_message = f"Update: {follow_update}" 1383 # logger.info(f"Notifying agent about new follower: @{author_handle} | prompt: {len(follow_message)} chars") 1384 # CLIENT.agents.messages.create( 1385 # agent_id = void_agent.id, 1386 # messages = [{"role":"user", "content": follow_message}] 1387 # ) 1388 success = True # Follow updates are always successful 1389 # if success: 1390 # message_counters['follows'] += 1 1391 elif notif_data['reason'] == "repost": 1392 # Skip reposts silently 1393 success = True # Skip reposts but mark as successful to remove from queue 1394 if success: 1395 message_counters['reposts_skipped'] += 1 1396 elif notif_data['reason'] == "like": 1397 # Skip likes silently 1398 success = True # Skip likes but mark as successful to remove from queue 1399 if success: 1400 message_counters.setdefault('likes_skipped', 0) 1401 message_counters['likes_skipped'] += 1 1402 else: 1403 logger.warning(f"Unknown notification type: {notif_data['reason']}") 1404 success = True # Remove unknown types from queue 1405 1406 # Handle file based on processing result 1407 if success: 1408 if testing_mode: 1409 logger.info(f"TESTING MODE: Keeping queue file: {filepath.name}") 1410 else: 1411 filepath.unlink() 1412 logger.info(f"Successfully processed and removed: {filepath.name}") 1413 1414 # Mark as processed to avoid reprocessing 1415 if NOTIFICATION_DB: 1416 NOTIFICATION_DB.mark_processed(notif_data['uri'], status='processed') 1417 else: 1418 processed_uris = load_processed_notifications() 1419 processed_uris.add(notif_data['uri']) 1420 save_processed_notifications(processed_uris) 1421 1422 # Reset agent message buffer if enabled 1423 if RESET_MESSAGES_AFTER_NOTIFICATION: 1424 try: 1425 CLIENT.agents.messages.reset( 1426 agent_id=void_agent.id, 1427 add_default_initial_messages=True 1428 ) 1429 logger.info(f"Reset agent message buffer after processing notification") 1430 except Exception as e: 1431 logger.warning(f"Failed to reset agent messages: {e}") 1432 1433 elif success is None: # Special case for moving to error directory 1434 error_path = QUEUE_ERROR_DIR / filepath.name 1435 filepath.rename(error_path) 1436 logger.warning(f"Moved {filepath.name} to errors directory") 1437 1438 # Also mark as processed to avoid retrying 1439 if NOTIFICATION_DB: 1440 NOTIFICATION_DB.mark_processed(notif_data['uri'], status='error') 1441 else: 1442 processed_uris = load_processed_notifications() 1443 processed_uris.add(notif_data['uri']) 1444 save_processed_notifications(processed_uris) 1445 1446 elif success == "no_reply": # Special case for moving to no_reply directory 1447 no_reply_path = QUEUE_NO_REPLY_DIR / filepath.name 1448 filepath.rename(no_reply_path) 1449 logger.info(f"Moved {filepath.name} to no_reply directory") 1450 1451 # Also mark as processed to avoid retrying 1452 if NOTIFICATION_DB: 1453 NOTIFICATION_DB.mark_processed(notif_data['uri'], status='error') 1454 else: 1455 processed_uris = load_processed_notifications() 1456 processed_uris.add(notif_data['uri']) 1457 save_processed_notifications(processed_uris) 1458 1459 elif success == "ignored": # Special case for explicitly ignored notifications 1460 # For ignored notifications, we just delete them (not move to no_reply) 1461 filepath.unlink() 1462 logger.info(f"🚫 Deleted ignored notification: {filepath.name}") 1463 1464 # Also mark as processed to avoid retrying 1465 if NOTIFICATION_DB: 1466 NOTIFICATION_DB.mark_processed(notif_data['uri'], status='error') 1467 else: 1468 processed_uris = load_processed_notifications() 1469 processed_uris.add(notif_data['uri']) 1470 save_processed_notifications(processed_uris) 1471 1472 else: 1473 logger.warning(f"⚠️ Failed to process {filepath.name}, keeping in queue for retry") 1474 1475 except Exception as e: 1476 logger.error(f"💥 Error processing queued notification {filepath.name}: {e}") 1477 # Keep the file for retry later 1478 1479 except Exception as e: 1480 logger.error(f"Error loading queued notifications: {e}") 1481 1482 1483def fetch_and_queue_new_notifications(atproto_client): 1484 """Fetch new notifications and queue them without processing.""" 1485 try: 1486 global NOTIFICATION_DB 1487 1488 # Get current time for marking notifications as seen 1489 logger.debug("Getting current time for notification marking...") 1490 last_seen_at = atproto_client.get_current_time_iso() 1491 1492 # Get timestamp of last processed notification for filtering 1493 last_processed_time = None 1494 if NOTIFICATION_DB: 1495 last_processed_time = NOTIFICATION_DB.get_latest_processed_time() 1496 if last_processed_time: 1497 logger.debug(f"Last processed notification was at: {last_processed_time}") 1498 1499 # Fetch ALL notifications using pagination 1500 all_notifications = [] 1501 cursor = None 1502 page_count = 0 1503 max_pages = 20 # Safety limit to prevent infinite loops 1504 1505 while page_count < max_pages: 1506 try: 1507 # Fetch notifications page 1508 if cursor: 1509 notifications_response = atproto_client.app.bsky.notification.list_notifications( 1510 params={'cursor': cursor, 'limit': 100} 1511 ) 1512 else: 1513 notifications_response = atproto_client.app.bsky.notification.list_notifications( 1514 params={'limit': 100} 1515 ) 1516 1517 page_count += 1 1518 page_notifications = notifications_response.notifications 1519 1520 if not page_notifications: 1521 break 1522 1523 all_notifications.extend(page_notifications) 1524 1525 # Check if there are more pages 1526 cursor = getattr(notifications_response, 'cursor', None) 1527 if not cursor: 1528 break 1529 1530 except Exception as e: 1531 logger.error(f"Error fetching notifications page {page_count}: {e}") 1532 break 1533 1534 # Now process all fetched notifications 1535 new_count = 0 1536 if all_notifications: 1537 logger.info(f"📥 Fetched {len(all_notifications)} total notifications from API") 1538 1539 # Mark as seen first 1540 try: 1541 atproto_client.app.bsky.notification.update_seen( 1542 data={'seenAt': last_seen_at} 1543 ) 1544 logger.debug(f"Marked {len(all_notifications)} notifications as seen at {last_seen_at}") 1545 except Exception as e: 1546 logger.error(f"Error marking notifications as seen: {e}") 1547 1548 # Debug counters 1549 skipped_read = 0 1550 skipped_likes = 0 1551 skipped_processed = 0 1552 skipped_old_timestamp = 0 1553 processed_uris = load_processed_notifications() 1554 1555 # Queue all new notifications (except likes) 1556 # Calculate cutoff time: 1 month ago from now 1557 one_month_ago = (datetime.now() - timedelta(days=30)).isoformat() 1558 1559 for notif in all_notifications: 1560 # Skip if older than 1 month 1561 if hasattr(notif, 'indexed_at'): 1562 if notif.indexed_at < one_month_ago: 1563 skipped_old_timestamp += 1 1564 logger.debug(f"Skipping old notification (older than 1 month): {notif.indexed_at}") 1565 continue 1566 1567 # Also skip if older than last processed (when we have timestamp filtering) 1568 if last_processed_time and hasattr(notif, 'indexed_at'): 1569 if notif.indexed_at <= last_processed_time: 1570 skipped_old_timestamp += 1 1571 logger.debug(f"Skipping old notification (indexed_at {notif.indexed_at} <= {last_processed_time})") 1572 continue 1573 1574 # Debug: Log is_read status but DON'T skip based on it 1575 if hasattr(notif, 'is_read') and notif.is_read: 1576 skipped_read += 1 1577 logger.debug(f"Notification has is_read=True (but processing anyway): {notif.uri if hasattr(notif, 'uri') else 'unknown'}") 1578 1579 # Skip likes 1580 if hasattr(notif, 'reason') and notif.reason == 'like': 1581 skipped_likes += 1 1582 continue 1583 1584 notif_dict = notif.model_dump() if hasattr(notif, 'model_dump') else notif 1585 1586 # Skip likes in dict form too 1587 if notif_dict.get('reason') == 'like': 1588 continue 1589 1590 # Check if already processed 1591 notif_uri = notif_dict.get('uri', '') 1592 if notif_uri in processed_uris: 1593 skipped_processed += 1 1594 logger.debug(f"Skipping already processed: {notif_uri}") 1595 continue 1596 1597 # Check if it's a priority notification 1598 is_priority = False 1599 1600 # Priority for cameron.pfiffer.org notifications 1601 author_handle = notif_dict.get('author', {}).get('handle', '') 1602 if author_handle == "cameron.pfiffer.org": 1603 is_priority = True 1604 1605 # Also check for priority keywords in mentions 1606 if notif_dict.get('reason') == 'mention': 1607 # Get the mention text to check for priority keywords 1608 record = notif_dict.get('record', {}) 1609 text = record.get('text', '') 1610 if any(keyword in text.lower() for keyword in ['urgent', 'priority', 'important', 'emergency']): 1611 is_priority = True 1612 1613 if save_notification_to_queue(notif_dict, is_priority=is_priority): 1614 new_count += 1 1615 logger.debug(f"Queued notification from @{author_handle}: {notif_dict.get('reason', 'unknown')}") 1616 1617 # Log summary of filtering 1618 logger.info(f"📊 Notification processing summary:") 1619 logger.info(f" • Total fetched: {len(all_notifications)}") 1620 logger.info(f" • Had is_read=True: {skipped_read} (not skipped)") 1621 logger.info(f" • Skipped (likes): {skipped_likes}") 1622 logger.info(f" • Skipped (old timestamp): {skipped_old_timestamp}") 1623 logger.info(f" • Skipped (already processed): {skipped_processed}") 1624 logger.info(f" • Queued for processing: {new_count}") 1625 else: 1626 logger.debug("No new notifications to queue") 1627 1628 return new_count 1629 1630 except Exception as e: 1631 logger.error(f"Error fetching and queueing notifications: {e}") 1632 return 0 1633 1634 1635def process_notifications(void_agent, atproto_client, testing_mode=False): 1636 """Fetch new notifications, queue them, and process the queue.""" 1637 try: 1638 # Fetch and queue new notifications 1639 new_count = fetch_and_queue_new_notifications(atproto_client) 1640 1641 if new_count > 0: 1642 logger.info(f"Found {new_count} new notifications to process") 1643 1644 # Now process the entire queue (old + new notifications) 1645 load_and_process_queued_notifications(void_agent, atproto_client, testing_mode) 1646 1647 except Exception as e: 1648 logger.error(f"Error processing notifications: {e}") 1649 1650 1651def send_synthesis_message(client: Letta, agent_id: str, agent_name: str = "void", atproto_client=None) -> None: 1652 """ 1653 Send an activation window prompt to the agent on a timer. 1654 The agent can browse, post, search, update memory, or do nothing. 1655 1656 Args: 1657 client: Letta client 1658 agent_id: Agent ID to send synthesis to 1659 agent_name: Agent name (unused, kept for call-site compatibility) 1660 atproto_client: Optional AT Protocol client for posting synthesis results 1661 """ 1662 try: 1663 now = datetime.now() 1664 synthesis_prompt = f"""Activation window: {now.strftime('%Y-%m-%d %H:%M UTC')}. 1665 1666You have time to act. No incoming notifications are pending. 1667 1668Use this window however you choose: 1669- Browse feeds (home, discover, ai-for-grownups, atmosphere, void-cafe) 1670- Post thoughts, observations, or threads 1671- Search for topics or conversations 1672- Read web content 1673- Update your memory 1674- Do nothing""" 1675 1676 logger.info("🧠 Sending activation window prompt to agent") 1677 1678 # Send synthesis message with streaming to show tool use 1679 message_stream = client.agents.messages.stream( 1680 agent_id=agent_id, 1681 messages=[{"role": "user", "content": synthesis_prompt}], 1682 stream_tokens=False, 1683 max_steps=100 1684 ) 1685 1686 # Track synthesis content for potential posting 1687 synthesis_posts = [] 1688 ack_note = None 1689 1690 # Process the streaming response with timeout protection 1691 stream_start_time = time.time() 1692 max_stream_duration = 600 # 10 minutes max 1693 1694 for chunk in message_stream: 1695 # Check for timeout 1696 if time.time() - stream_start_time > max_stream_duration: 1697 logger.warning(f"Synthesis stream exceeded {max_stream_duration}s timeout, breaking") 1698 break 1699 if hasattr(chunk, 'message_type'): 1700 if chunk.message_type == 'reasoning_message': 1701 if SHOW_REASONING: 1702 print("\n◆ Reasoning") 1703 print(" ─────────") 1704 for line in chunk.reasoning.split('\n'): 1705 print(f" {line}") 1706 1707 # Create ATProto record for reasoning (if we have atproto client) 1708 if atproto_client and hasattr(chunk, 'reasoning'): 1709 try: 1710 bsky_utils.create_reasoning_record(atproto_client, chunk.reasoning) 1711 except Exception as e: 1712 logger.debug(f"Failed to create reasoning record during synthesis: {e}") 1713 elif chunk.message_type == 'tool_call_message': 1714 tool_name = chunk.tool_call.name 1715 1716 # Create ATProto record for tool call (if we have atproto client) 1717 if atproto_client: 1718 try: 1719 tool_call_id = chunk.tool_call.tool_call_id if hasattr(chunk.tool_call, 'tool_call_id') else None 1720 bsky_utils.create_tool_call_record( 1721 atproto_client, 1722 tool_name, 1723 chunk.tool_call.arguments, 1724 tool_call_id 1725 ) 1726 except Exception as e: 1727 logger.debug(f"Failed to create tool call record during synthesis: {e}") 1728 try: 1729 args = json.loads(chunk.tool_call.arguments) 1730 if tool_name == 'archival_memory_search': 1731 query = args.get('query', 'unknown') 1732 log_with_panel(f"query: \"{query}\"", f"Tool call: {tool_name}", "blue") 1733 elif tool_name == 'archival_memory_insert': 1734 content = args.get('content', '') 1735 log_with_panel(content[:200] + "..." if len(content) > 200 else content, f"Tool call: {tool_name}", "blue") 1736 1737 # Record archival memory insert to AT Protocol 1738 if atproto_client: 1739 try: 1740 tags_str = args.get('tags', None) 1741 tags = None 1742 if tags_str: 1743 try: 1744 tags = json.loads(tags_str) if isinstance(tags_str, str) else tags_str 1745 except json.JSONDecodeError: 1746 logger.warning(f"Failed to parse tags from archival_memory_insert: {tags_str[:50]}...") 1747 1748 memory_result = bsky_utils.create_memory_record(atproto_client, content, tags) 1749 if memory_result: 1750 tags_info = f" ({len(tags)} tags)" if tags else "" 1751 logger.info(f"📝 Recorded archival memory to AT Protocol{tags_info}") 1752 except Exception as e: 1753 logger.debug(f"Failed to create memory record during synthesis: {e}") 1754 elif tool_name == 'update_block': 1755 label = args.get('label', 'unknown') 1756 value_preview = str(args.get('value', ''))[:100] + "..." if len(str(args.get('value', ''))) > 100 else str(args.get('value', '')) 1757 log_with_panel(f"{label}: \"{value_preview}\"", f"Tool call: {tool_name}", "blue") 1758 elif tool_name == 'annotate_ack': 1759 note = args.get('note', '') 1760 if note: 1761 ack_note = note 1762 log_with_panel(f"note: \"{note[:100]}...\"" if len(note) > 100 else f"note: \"{note}\"", f"Tool call: {tool_name}", "blue") 1763 elif tool_name == 'add_post_to_bluesky_reply_thread': 1764 text = args.get('text', '') 1765 synthesis_posts.append(text) 1766 log_with_panel(f"text: \"{text[:100]}...\"" if len(text) > 100 else f"text: \"{text}\"", f"Tool call: {tool_name}", "blue") 1767 else: 1768 args_str = ', '.join(f"{k}={v}" for k, v in args.items() if k != 'request_heartbeat') 1769 if len(args_str) > 150: 1770 args_str = args_str[:150] + "..." 1771 log_with_panel(args_str, f"Tool call: {tool_name}", "blue") 1772 except: 1773 log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue") 1774 elif chunk.message_type == 'tool_return_message': 1775 if chunk.status == 'success': 1776 log_with_panel("Success", f"Tool result: {chunk.name}", "green") 1777 else: 1778 log_with_panel("Error", f"Tool result: {chunk.name}", "red") 1779 elif chunk.message_type == 'assistant_message': 1780 print("\n▶ Synthesis Response") 1781 print(" ──────────────────") 1782 for line in chunk.content.split('\n'): 1783 print(f" {line}") 1784 elif chunk.message_type == 'ping': 1785 # Silently ignore ping keepalive messages 1786 logger.debug(f"Received keepalive ping from Letta API during synthesis") 1787 elif chunk.message_type == 'error_message': 1788 # Dump full error object 1789 logger.error(f"Synthesis error_message: {chunk}") 1790 if hasattr(chunk, 'model_dump'): 1791 logger.error(f"Synthesis error (dict): {chunk.model_dump()}") 1792 elif hasattr(chunk, '__dict__'): 1793 logger.error(f"Synthesis error (vars): {vars(chunk)}") 1794 1795 if str(chunk) == 'done': 1796 break 1797 1798 logger.info("🧠 Synthesis message processed successfully") 1799 1800 # Handle synthesis acknowledgments if we have an atproto client 1801 if atproto_client and ack_note: 1802 try: 1803 result = bsky_utils.create_synthesis_ack(atproto_client, ack_note) 1804 if result: 1805 logger.info(f"✓ Created synthesis acknowledgment: {ack_note[:50]}...") 1806 else: 1807 logger.warning("Failed to create synthesis acknowledgment") 1808 except Exception as e: 1809 logger.error(f"Error creating synthesis acknowledgment: {e}") 1810 1811 # Handle synthesis posts if any were generated 1812 if atproto_client and synthesis_posts: 1813 try: 1814 for post_text in synthesis_posts: 1815 cleaned_text = bsky_utils.remove_outside_quotes(post_text) 1816 response = bsky_utils.send_post(atproto_client, cleaned_text) 1817 if response: 1818 logger.info(f"✓ Posted synthesis content: {cleaned_text[:50]}...") 1819 else: 1820 logger.warning(f"Failed to post synthesis content: {cleaned_text[:50]}...") 1821 except Exception as e: 1822 logger.error(f"Error posting synthesis content: {e}") 1823 1824 except Exception as e: 1825 logger.error(f"Error sending synthesis message: {e}") 1826 1827 1828 1829def handle_to_block_label(handle: str) -> str: 1830 """Convert a Bluesky handle to a user block label. 1831 1832 Example: cameron.pfiffer.org -> user_cameron_pfiffer_org 1833 """ 1834 if handle.startswith('@'): 1835 handle = handle[1:] 1836 return f"user_{handle.replace('.', '_')}" 1837 1838 1839def attach_user_blocks_for_thread(client: Letta, agent_id: str, handles: list) -> tuple: 1840 """ 1841 Attach user blocks for handles found in the thread. 1842 Creates blocks if they don't exist. 1843 1844 Args: 1845 client: Letta client 1846 agent_id: Agent ID 1847 handles: List of Bluesky handles 1848 1849 Returns: 1850 Tuple of (success: bool, attached_labels: list) 1851 """ 1852 if not handles: 1853 return True, [] 1854 1855 attached_labels = [] 1856 1857 try: 1858 current_blocks_page = client.agents.blocks.list(agent_id=agent_id) 1859 current_blocks = current_blocks_page.items if hasattr(current_blocks_page, 'items') else current_blocks_page 1860 current_block_labels = {block.label for block in current_blocks} 1861 current_block_ids = {str(block.id) for block in current_blocks} 1862 1863 logger.debug(f"Attaching user blocks for {len(handles)} handles: {handles}") 1864 1865 for handle in handles: 1866 label = handle_to_block_label(handle) 1867 1868 try: 1869 if label in current_block_labels: 1870 logger.debug(f"User block already attached: {label}") 1871 attached_labels.append(label) 1872 continue 1873 1874 blocks_page = client.blocks.list(label=label) 1875 blocks = blocks_page.items if hasattr(blocks_page, 'items') else blocks_page 1876 1877 if blocks and len(blocks) > 0: 1878 block = blocks[0] 1879 if str(block.id) in current_block_ids: 1880 logger.debug(f"User block already attached by ID: {label}") 1881 attached_labels.append(label) 1882 continue 1883 else: 1884 block = client.blocks.create( 1885 label=label, 1886 value=f"User block for @{handle}\n\nNo information recorded yet.", 1887 limit=5000 1888 ) 1889 logger.info(f"Created new user block: {label}") 1890 1891 client.agents.blocks.attach( 1892 agent_id=agent_id, 1893 block_id=str(block.id) 1894 ) 1895 attached_labels.append(label) 1896 logger.info(f"Attached user block: {label}") 1897 1898 except Exception as e: 1899 error_str = str(e) 1900 if "duplicate key value violates unique constraint" in error_str: 1901 logger.debug(f"User block already attached (constraint): {label}") 1902 attached_labels.append(label) 1903 else: 1904 logger.warning(f"Failed to attach user block {label}: {e}") 1905 1906 logger.info(f"User blocks attached: {len(attached_labels)}/{len(handles)}") 1907 return True, attached_labels 1908 1909 except Exception as e: 1910 logger.error(f"Error attaching user blocks: {e}") 1911 return False, attached_labels 1912 1913 1914def detach_user_blocks_for_thread(client: Letta, agent_id: str, labels_to_detach: list) -> bool: 1915 """ 1916 Detach user blocks after processing a thread. 1917 1918 Args: 1919 client: Letta client 1920 agent_id: Agent ID 1921 labels_to_detach: List of user block labels to detach 1922 1923 Returns: 1924 bool: Success status 1925 """ 1926 if not labels_to_detach: 1927 return True 1928 1929 try: 1930 current_blocks_page = client.agents.blocks.list(agent_id=agent_id) 1931 current_blocks = current_blocks_page.items if hasattr(current_blocks_page, 'items') else current_blocks_page 1932 block_label_to_id = {block.label: str(block.id) for block in current_blocks} 1933 1934 detached_count = 0 1935 for label in labels_to_detach: 1936 if label in block_label_to_id: 1937 try: 1938 client.agents.blocks.detach( 1939 agent_id=agent_id, 1940 block_id=block_label_to_id[label] 1941 ) 1942 detached_count += 1 1943 logger.debug(f"Detached user block: {label}") 1944 except Exception as e: 1945 logger.warning(f"Failed to detach user block {label}: {e}") 1946 else: 1947 logger.debug(f"User block not attached: {label}") 1948 1949 logger.info(f"Detached {detached_count} user blocks") 1950 return True 1951 1952 except Exception as e: 1953 logger.error(f"Error detaching user blocks: {e}") 1954 return False 1955 1956 1957def main(): 1958 # Parse command line arguments 1959 parser = argparse.ArgumentParser(description='Void Bot - Bluesky autonomous agent') 1960 parser.add_argument('--config', type=str, default='configs/config.yaml', help='Path to config file (default: configs/config.yaml)') 1961 parser.add_argument('--test', action='store_true', help='Run in testing mode (no messages sent, queue files preserved)') 1962 parser.add_argument('--no-git', action='store_true', help='Skip git operations when exporting agent state') 1963 parser.add_argument('--simple-logs', action='store_true', help='Use simplified log format (void - LEVEL - message)') 1964 # --rich option removed as we now use simple text formatting 1965 parser.add_argument('--reasoning', action='store_true', help='Display reasoning in panels and set reasoning log level to INFO') 1966 parser.add_argument('--cleanup-interval', type=int, default=10, help='Run user block cleanup every N cycles (default: 10, 0 to disable)') 1967 parser.add_argument('--synthesis-interval', type=int, default=600, help='Send synthesis message every N seconds (default: 600 = 10 minutes, 0 to disable)') 1968 parser.add_argument('--synthesis-only', action='store_true', help='Run in synthesis-only mode (only send synthesis messages, no notification processing)') 1969 parser.add_argument('--debug', action='store_true', help='Enable debug logging') 1970 parser.add_argument('--reset-messages', action='store_true', help='Reset agent message buffer after each notification is processed') 1971 args = parser.parse_args() 1972 1973 # Initialize configuration with custom path 1974 global letta_config, CLIENT, QUEUE_DIR, QUEUE_ERROR_DIR, QUEUE_NO_REPLY_DIR, PROCESSED_NOTIFICATIONS_FILE, NOTIFICATION_DB 1975 get_config(args.config) # Initialize the global config instance 1976 letta_config = get_letta_config() 1977 1978 # Initialize queue paths from config 1979 queue_config = get_queue_config() 1980 QUEUE_DIR = Path(queue_config['base_dir']) 1981 QUEUE_ERROR_DIR = Path(queue_config['error_dir']) 1982 QUEUE_NO_REPLY_DIR = Path(queue_config['no_reply_dir']) 1983 PROCESSED_NOTIFICATIONS_FILE = Path(queue_config['processed_file']) 1984 1985 # Get bot name for logging 1986 bot_name = queue_config['bot_name'] 1987 1988 # Create queue directories 1989 QUEUE_DIR.mkdir(exist_ok=True) 1990 QUEUE_ERROR_DIR.mkdir(exist_ok=True, parents=True) 1991 QUEUE_NO_REPLY_DIR.mkdir(exist_ok=True, parents=True) 1992 1993 # Create Letta client with configuration 1994 CLIENT_PARAMS = { 1995 'api_key': letta_config['api_key'], # v1.0: token → api_key 1996 'timeout': letta_config['timeout'] 1997 } 1998 if letta_config.get('base_url'): 1999 CLIENT_PARAMS['base_url'] = letta_config['base_url'] 2000 CLIENT = Letta(**CLIENT_PARAMS) 2001 2002 # Configure logging based on command line arguments 2003 if args.simple_logs: 2004 log_format = f"{bot_name} - %(levelname)s - %(message)s" 2005 else: 2006 # Create custom formatter with symbols 2007 class SymbolFormatter(logging.Formatter): 2008 """Custom formatter that adds symbols for different log levels""" 2009 2010 SYMBOLS = { 2011 logging.DEBUG: '', 2012 logging.INFO: '', 2013 logging.WARNING: '', 2014 logging.ERROR: '', 2015 logging.CRITICAL: '' 2016 } 2017 2018 def __init__(self, bot_name): 2019 super().__init__() 2020 self.bot_name = bot_name 2021 2022 def format(self, record): 2023 # Get the symbol for this log level 2024 symbol = self.SYMBOLS.get(record.levelno, '') 2025 2026 # Format time as HH:MM:SS 2027 timestamp = self.formatTime(record, "%H:%M:%S") 2028 2029 # Build the formatted message 2030 level_name = f"{record.levelname:<5}" # Left-align, 5 chars 2031 2032 # Use vertical bar as separator 2033 parts = [symbol, timestamp, '', self.bot_name, '', level_name, '', record.getMessage()] 2034 2035 return ' '.join(parts) 2036 2037 # Reset logging configuration 2038 for handler in logging.root.handlers[:]: 2039 logging.root.removeHandler(handler) 2040 2041 # Create handler with custom formatter 2042 handler = logging.StreamHandler() 2043 if not args.simple_logs: 2044 handler.setFormatter(SymbolFormatter(bot_name)) 2045 else: 2046 handler.setFormatter(logging.Formatter(log_format)) 2047 2048 # Configure root logger 2049 logging.root.setLevel(logging.INFO) 2050 logging.root.addHandler(handler) 2051 2052 global logger, prompt_logger, console 2053 logger = logging.getLogger("void_bot") 2054 logger.setLevel(logging.DEBUG if args.debug else logging.INFO) 2055 2056 # Create a separate logger for prompts (set to WARNING to hide by default) 2057 prompt_logger = logging.getLogger("void_bot.prompts") 2058 if args.reasoning: 2059 prompt_logger.setLevel(logging.INFO) # Show reasoning when --reasoning is used 2060 else: 2061 prompt_logger.setLevel(logging.WARNING) # Hide by default 2062 2063 # Disable httpx logging completely 2064 logging.getLogger("httpx").setLevel(logging.CRITICAL) 2065 2066 # Create Rich console for pretty printing 2067 # Console no longer used - simple text formatting 2068 2069 global TESTING_MODE, SKIP_GIT, SHOW_REASONING, RESET_MESSAGES_AFTER_NOTIFICATION 2070 TESTING_MODE = args.test 2071 2072 # Store no-git flag globally for use in export_agent_state calls 2073 SKIP_GIT = args.no_git 2074 2075 # Store rich flag globally 2076 # Rich formatting no longer used 2077 2078 # Store reasoning flag globally 2079 SHOW_REASONING = args.reasoning 2080 2081 # Store reset-messages flag globally 2082 RESET_MESSAGES_AFTER_NOTIFICATION = args.reset_messages 2083 2084 if TESTING_MODE: 2085 logger.info("=== RUNNING IN TESTING MODE ===") 2086 logger.info(" - No messages will be sent to Bluesky") 2087 logger.info(" - Queue files will not be deleted") 2088 logger.info(" - Notifications will not be marked as seen") 2089 print("\n") 2090 2091 # Check for synthesis-only mode 2092 SYNTHESIS_ONLY = args.synthesis_only 2093 if SYNTHESIS_ONLY: 2094 logger.info("=== RUNNING IN SYNTHESIS-ONLY MODE ===") 2095 logger.info(" - Only synthesis messages will be sent") 2096 logger.info(" - No notification processing") 2097 logger.info(" - No Bluesky client needed") 2098 print("\n") 2099 """Main bot loop that continuously monitors for notifications.""" 2100 global start_time 2101 start_time = time.time() 2102 logger.info("=== STARTING VOID BOT ===") 2103 void_agent = initialize_void() 2104 logger.info(f"Void agent initialized: {void_agent.id}") 2105 2106 # Initialize notification database with config-based path 2107 logger.info("Initializing notification database...") 2108 NOTIFICATION_DB = NotificationDB(db_path=queue_config['db_path']) 2109 2110 # Migrate from old JSON format if it exists 2111 if PROCESSED_NOTIFICATIONS_FILE.exists(): 2112 logger.info("Found old processed_notifications.json, migrating to database...") 2113 NOTIFICATION_DB.migrate_from_json(str(PROCESSED_NOTIFICATIONS_FILE)) 2114 2115 # Log database stats 2116 db_stats = NOTIFICATION_DB.get_stats() 2117 logger.info(f"Database initialized - Total notifications: {db_stats.get('total', 0)}, Recent (24h): {db_stats.get('recent_24h', 0)}") 2118 2119 # Clean up old records 2120 NOTIFICATION_DB.cleanup_old_records(days=7) 2121 2122 # Ensure correct tools are attached for Bluesky 2123 logger.info("Configuring tools for Bluesky platform...") 2124 try: 2125 from tool_manager import ensure_platform_tools 2126 ensure_platform_tools('bluesky', void_agent.id) 2127 except Exception as e: 2128 logger.error(f"Failed to configure platform tools: {e}") 2129 logger.warning("Continuing with existing tool configuration") 2130 2131 # Check if agent has required tools 2132 if hasattr(void_agent, 'tools') and void_agent.tools: 2133 tool_names = [tool.name for tool in void_agent.tools] 2134 # Check for bluesky-related tools 2135 bluesky_tools = [name for name in tool_names if 'bluesky' in name.lower() or 'reply' in name.lower()] 2136 if not bluesky_tools: 2137 logger.warning("No Bluesky-related tools found! Agent may not be able to reply.") 2138 else: 2139 logger.warning("Agent has no tools registered!") 2140 2141 # Initialize Bluesky client (needed for both notification processing and synthesis acks/posts) 2142 if not SYNTHESIS_ONLY: 2143 atproto_client = bsky_utils.default_login() 2144 logger.info("Connected to Bluesky") 2145 else: 2146 # In synthesis-only mode, still connect for acks and posts (unless in test mode) 2147 if not args.test: 2148 atproto_client = bsky_utils.default_login() 2149 logger.info("Connected to Bluesky (for synthesis acks/posts)") 2150 else: 2151 atproto_client = None 2152 logger.info("Skipping Bluesky connection (test mode)") 2153 2154 # Configure intervals 2155 CLEANUP_INTERVAL = args.cleanup_interval 2156 SYNTHESIS_INTERVAL = args.synthesis_interval 2157 2158 # Synthesis-only mode 2159 if SYNTHESIS_ONLY: 2160 if SYNTHESIS_INTERVAL <= 0: 2161 logger.error("Synthesis-only mode requires --synthesis-interval > 0") 2162 return 2163 2164 logger.info(f"Starting synthesis-only mode, interval: {SYNTHESIS_INTERVAL} seconds ({SYNTHESIS_INTERVAL/60:.1f} minutes)") 2165 2166 while True: 2167 try: 2168 # Send synthesis message immediately on first run 2169 logger.info("🧠 Sending synthesis message") 2170 send_synthesis_message(CLIENT, void_agent.id, void_agent.name, atproto_client) 2171 2172 # Wait for next interval 2173 logger.info(f"Waiting {SYNTHESIS_INTERVAL} seconds until next synthesis...") 2174 sleep(SYNTHESIS_INTERVAL) 2175 2176 except KeyboardInterrupt: 2177 logger.info("=== SYNTHESIS MODE STOPPED BY USER ===") 2178 break 2179 except Exception as e: 2180 logger.error(f"Error in synthesis loop: {e}") 2181 logger.info(f"Sleeping for {SYNTHESIS_INTERVAL} seconds due to error...") 2182 sleep(SYNTHESIS_INTERVAL) 2183 2184 # Normal mode with notification processing 2185 logger.info(f"Starting notification monitoring, checking every {FETCH_NOTIFICATIONS_DELAY_SEC} seconds") 2186 2187 cycle_count = 0 2188 2189 if CLEANUP_INTERVAL > 0: 2190 logger.info(f"User block cleanup enabled every {CLEANUP_INTERVAL} cycles") 2191 else: 2192 logger.info("User block cleanup disabled") 2193 2194 if SYNTHESIS_INTERVAL > 0: 2195 logger.info(f"Synthesis messages enabled every {SYNTHESIS_INTERVAL} seconds ({SYNTHESIS_INTERVAL/60:.1f} minutes)") 2196 else: 2197 logger.info("Synthesis messages disabled") 2198 2199 while True: 2200 try: 2201 cycle_count += 1 2202 process_notifications(void_agent, atproto_client, TESTING_MODE) 2203 2204 # Check if synthesis interval has passed 2205 if SYNTHESIS_INTERVAL > 0: 2206 current_time = time.time() 2207 global last_synthesis_time 2208 if current_time - last_synthesis_time >= SYNTHESIS_INTERVAL: 2209 logger.info(f"{SYNTHESIS_INTERVAL/60:.1f} minutes have passed, triggering synthesis") 2210 send_synthesis_message(CLIENT, void_agent.id, void_agent.name, atproto_client) 2211 last_synthesis_time = current_time 2212 2213 # Run periodic cleanup every N cycles 2214 if CLEANUP_INTERVAL > 0 and cycle_count % CLEANUP_INTERVAL == 0: 2215 # Check if autofollow is enabled and sync followers 2216 from config_loader import get_bluesky_config 2217 bluesky_config = get_bluesky_config() 2218 if bluesky_config.get('autofollow', False): 2219 logger.info("🔄 Syncing followers (autofollow enabled)") 2220 try: 2221 sync_result = bsky_utils.sync_followers(atproto_client, dry_run=False) 2222 if 'error' in sync_result: 2223 logger.error(f"Autofollow failed: {sync_result['error']}") 2224 else: 2225 if sync_result['newly_followed']: 2226 logger.info(f"✓ Followed {len(sync_result['newly_followed'])} new users: {', '.join(sync_result['newly_followed'])}") 2227 else: 2228 logger.debug(f"No new followers to follow back ({sync_result['followers_count']} followers, {sync_result['following_count']} following)") 2229 if sync_result.get('errors'): 2230 logger.warning(f"Some follows failed: {len(sync_result['errors'])} errors") 2231 except Exception as e: 2232 logger.error(f"Error during autofollow sync: {e}") 2233 2234 # Also check database health when doing cleanup 2235 if NOTIFICATION_DB: 2236 db_stats = NOTIFICATION_DB.get_stats() 2237 pending = db_stats.get('status_pending', 0) 2238 errors = db_stats.get('status_error', 0) 2239 2240 if pending > 50: 2241 logger.warning(f"⚠️ Queue health check: {pending} pending notifications (may be stuck)") 2242 if errors > 20: 2243 logger.warning(f"⚠️ Queue health check: {errors} error notifications") 2244 2245 # Periodic cleanup of old records 2246 if cycle_count % (CLEANUP_INTERVAL * 10) == 0: # Every 100 cycles 2247 logger.info("Running database cleanup of old records...") 2248 NOTIFICATION_DB.cleanup_old_records(days=7) 2249 2250 # Log cycle completion with stats 2251 elapsed_time = time.time() - start_time 2252 total_messages = sum(message_counters.values()) 2253 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0 2254 2255 if total_messages > 0: 2256 logger.info(f"Cycle {cycle_count} complete. Session totals: {total_messages} messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies) | {messages_per_minute:.1f} msg/min") 2257 sleep(FETCH_NOTIFICATIONS_DELAY_SEC) 2258 2259 except KeyboardInterrupt: 2260 # Final stats 2261 elapsed_time = time.time() - start_time 2262 total_messages = sum(message_counters.values()) 2263 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0 2264 2265 logger.info("=== BOT STOPPED BY USER ===") 2266 logger.info(f"Final session stats: {total_messages} total messages processed in {elapsed_time/60:.1f} minutes") 2267 logger.info(f" - {message_counters['mentions']} mentions") 2268 logger.info(f" - {message_counters['replies']} replies") 2269 logger.info(f" - {message_counters['follows']} follows") 2270 logger.info(f" - {message_counters['reposts_skipped']} reposts skipped") 2271 logger.info(f" - Average rate: {messages_per_minute:.1f} messages/minute") 2272 2273 # Close database connection 2274 if NOTIFICATION_DB: 2275 logger.info("Closing database connection...") 2276 NOTIFICATION_DB.close() 2277 2278 break 2279 except Exception as e: 2280 logger.error(f"=== ERROR IN MAIN LOOP CYCLE {cycle_count} ===") 2281 logger.error(f"Error details: {e}") 2282 # Wait a bit longer on errors 2283 logger.info(f"Sleeping for {FETCH_NOTIFICATIONS_DELAY_SEC * 2} seconds due to error...") 2284 sleep(FETCH_NOTIFICATIONS_DELAY_SEC * 2) 2285 2286 2287if __name__ == "__main__": 2288 main()