this repo has no description

Add comprehensive logging to bsky.py

- Add message counters and statistics tracking
- Create separate logger for prompts to reduce noise
- Add numbered cycle tracking and section markers
- Add progress indicators for queue processing
- Track messages per minute and session totals
- Improve logging of agent responses
- Set httpx logging to CRITICAL to remove noise
- Add detailed logging for initialization and main loop flow

+105 -20
bsky.py
··· 9 import subprocess 10 from pathlib import Path 11 from datetime import datetime 12 13 from utils import ( 14 upsert_block, ··· 43 level=logging.DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" 44 ) 45 logger = logging.getLogger("void_bot") 46 - logger.setLevel(logging.INFO) 47 48 - # Set httpx logging to DEBUG to reduce noise 49 - logging.getLogger("httpx").setLevel(logging.DEBUG) 50 51 52 # Create a client with extended timeout for LLM operations ··· 70 71 # Maximum number of processed notifications to track 72 MAX_PROCESSED_NOTIFICATIONS = 10000 73 74 def export_agent_state(client, agent): 75 """Export agent state to agent_archive/ (timestamped) and agents/ (current).""" ··· 112 logger.error(f"Failed to export agent: {e}") 113 114 def initialize_void(): 115 116 # Ensure that a shared zeitgeist block exists 117 zeigeist_block = upsert_block( 118 CLIENT, 119 label = "zeitgeist", ··· 122 ) 123 124 # Ensure that a shared void personality block exists 125 persona_block = upsert_block( 126 CLIENT, 127 label = "void-persona", ··· 130 ) 131 132 # Ensure that a shared void human block exists 133 human_block = upsert_block( 134 CLIENT, 135 label = "void-humans", ··· 138 ) 139 140 # Create the agent if it doesn't exist 141 void_agent = upsert_agent( 142 CLIENT, 143 name = "void", ··· 154 ) 155 156 # Export agent state 157 export_agent_state(CLIENT, void_agent) 158 159 # Log agent details ··· 229 logger.error(f"Thread attributes: {thread.__dict__}") 230 # Try to continue with a simple context 231 thread_context = f"Error processing thread context: {str(yaml_error)}" 232 - 233 - # print(thread_context) 234 235 # Create a prompt for the Letta agent with thread context 236 prompt = f"""You received a mention on Bluesky from @{author_handle} ({author_name or author_handle}). 
··· 269 270 # Get response from Letta agent 271 logger.info(f"Mention from @{author_handle}: {mention_text}") 272 - logger.debug(f"Prompt being sent: {prompt}") 273 274 - # Log the exact parameters being sent to Letta 275 - logger.debug(f"Calling Letta API with agent_id: {void_agent.id}") 276 - logger.debug(f"Message content length: {len(prompt)} characters") 277 278 try: 279 message_response = CLIENT.agents.messages.create( ··· 333 334 # Extract the reply text from the agent's response 335 reply_text = "" 336 - for message in message_response.messages: 337 - print(message) 338 339 # Check if this is a ToolCallMessage with bluesky_reply tool 340 if hasattr(message, 'tool_call') and message.tool_call: ··· 478 479 def load_and_process_queued_notifications(void_agent, atproto_client): 480 """Load and process all notifications from the queue.""" 481 try: 482 # Get all JSON files in queue directory (excluding processed_notifications.json) 483 queue_files = sorted([f for f in QUEUE_DIR.glob("*.json") if f.name != "processed_notifications.json"]) 484 485 if not queue_files: 486 - logger.debug("No queued notifications to process") 487 return 488 489 logger.info(f"Processing {len(queue_files)} queued notifications") 490 491 - for filepath in queue_files: 492 try: 493 # Load notification data 494 with open(filepath, 'r') as f: 495 notif_data = json.load(f) 496 497 # Process based on type using dict data directly 498 success = False 499 if notif_data['reason'] == "mention": 500 success = process_mention(void_agent, atproto_client, notif_data) 501 elif notif_data['reason'] == "reply": 502 success = process_mention(void_agent, atproto_client, notif_data) 503 elif notif_data['reason'] == "follow": 504 author_handle = notif_data['author']['handle'] 505 author_display_name = notif_data['author'].get('display_name', 'no display name') 506 follow_update = f"@{author_handle} ({author_display_name}) started following you." 
507 CLIENT.agents.messages.create( 508 agent_id = void_agent.id, 509 messages = [{"role":"user", "content": f"Update: {follow_update}"}] 510 ) 511 success = True # Follow updates are always successful 512 elif notif_data['reason'] == "repost": 513 logger.info(f"Skipping repost notification from @{notif_data['author']['handle']}") 514 success = True # Skip reposts but mark as successful to remove from queue 515 else: 516 logger.warning(f"Unknown notification type: {notif_data['reason']}") 517 success = True # Remove unknown types from queue ··· 519 # Handle file based on processing result 520 if success: 521 filepath.unlink() 522 - logger.info(f"Processed and removed: {filepath.name}") 523 524 # Mark as processed to avoid reprocessing 525 processed_uris = load_processed_notifications() ··· 529 elif success is None: # Special case for moving to error directory 530 error_path = QUEUE_ERROR_DIR / filepath.name 531 filepath.rename(error_path) 532 - logger.warning(f"Moved {filepath.name} to errors directory") 533 534 # Also mark as processed to avoid retrying 535 processed_uris = load_processed_notifications() ··· 537 save_processed_notifications(processed_uris) 538 539 else: 540 - logger.warning(f"Failed to process {filepath.name}, keeping in queue for retry") 541 542 except Exception as e: 543 - logger.error(f"Error processing queued notification {filepath.name}: {e}") 544 # Keep the file for retry later 545 546 except Exception as e: ··· 549 550 def process_notifications(void_agent, atproto_client): 551 """Fetch new notifications, queue them, and process the queue.""" 552 try: 553 # First, process any existing queued notifications 554 load_and_process_queued_notifications(void_agent, atproto_client) 555 556 # Get current time for marking notifications as seen 557 last_seen_at = atproto_client.get_current_time_iso() 558 559 # Fetch ALL notifications using pagination 560 all_notifications = [] 561 cursor = None 562 page_count = 0 ··· 615 break 616 617 # Queue all unread 
notifications (except likes) 618 new_count = 0 619 for notification in all_notifications: 620 if not notification.is_read and notification.reason != "like": ··· 629 logger.debug("No new notifications to queue") 630 631 # Process the queue (including any newly added notifications) 632 load_and_process_queued_notifications(void_agent, atproto_client) 633 634 except Exception as e: ··· 637 638 def main(): 639 """Main bot loop that continuously monitors for notifications.""" 640 logger.info("Initializing Void bot...") 641 642 # Initialize the Letta agent 643 void_agent = initialize_void() 644 logger.info(f"Void agent initialized: {void_agent.id}") 645 ··· 658 logger.warning("Agent has no tools registered!") 659 660 # Initialize Bluesky client 661 atproto_client = bsky_utils.default_login() 662 logger.info("Connected to Bluesky") 663 664 # Main loop 665 logger.info(f"Starting notification monitoring, checking every {FETCH_NOTIFICATIONS_DELAY_SEC} seconds") 666 667 while True: 668 try: 669 process_notifications(void_agent, atproto_client) 670 - logger.debug("Sleeping, no notifications were detected") 671 sleep(FETCH_NOTIFICATIONS_DELAY_SEC) 672 673 except KeyboardInterrupt: 674 - logger.info("Bot stopped by user") 675 break 676 except Exception as e: 677 - logger.error(f"Error in main loop: {e}") 678 # Wait a bit longer on errors 679 sleep(FETCH_NOTIFICATIONS_DELAY_SEC * 2) 680 681
··· 9 import subprocess 10 from pathlib import Path 11 from datetime import datetime 12 + from collections import defaultdict 13 + import time 14 15 from utils import ( 16 upsert_block, ··· 45 level=logging.DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" 46 ) 47 logger = logging.getLogger("void_bot") 48 + logger.setLevel(logging.DEBUG) 49 50 + # Create a separate logger for prompts (set to WARNING to hide by default) 51 + prompt_logger = logging.getLogger("void_bot.prompts") 52 + prompt_logger.setLevel(logging.WARNING) # Change to DEBUG if you want to see prompts 53 + 54 + # Disable httpx logging completely 55 + logging.getLogger("httpx").setLevel(logging.CRITICAL) 56 57 58 # Create a client with extended timeout for LLM operations ··· 76 77 # Maximum number of processed notifications to track 78 MAX_PROCESSED_NOTIFICATIONS = 10000 79 + 80 + # Message tracking counters 81 + message_counters = defaultdict(int) 82 + start_time = time.time() 83 84 def export_agent_state(client, agent): 85 """Export agent state to agent_archive/ (timestamped) and agents/ (current).""" ··· 122 logger.error(f"Failed to export agent: {e}") 123 124 def initialize_void(): 125 + logger.info("Starting void agent initialization...") 126 127 # Ensure that a shared zeitgeist block exists 128 + logger.info("Creating/updating zeitgeist block...") 129 zeigeist_block = upsert_block( 130 CLIENT, 131 label = "zeitgeist", ··· 134 ) 135 136 # Ensure that a shared void personality block exists 137 + logger.info("Creating/updating void-persona block...") 138 persona_block = upsert_block( 139 CLIENT, 140 label = "void-persona", ··· 143 ) 144 145 # Ensure that a shared void human block exists 146 + logger.info("Creating/updating void-humans block...") 147 human_block = upsert_block( 148 CLIENT, 149 label = "void-humans", ··· 152 ) 153 154 # Create the agent if it doesn't exist 155 + logger.info("Creating/updating void agent...") 156 void_agent = upsert_agent( 157 CLIENT, 158 name = 
"void", ··· 169 ) 170 171 # Export agent state 172 + logger.info("Exporting agent state...") 173 export_agent_state(CLIENT, void_agent) 174 175 # Log agent details ··· 245 logger.error(f"Thread attributes: {thread.__dict__}") 246 # Try to continue with a simple context 247 thread_context = f"Error processing thread context: {str(yaml_error)}" 248 249 # Create a prompt for the Letta agent with thread context 250 prompt = f"""You received a mention on Bluesky from @{author_handle} ({author_name or author_handle}). ··· 283 284 # Get response from Letta agent 285 logger.info(f"Mention from @{author_handle}: {mention_text}") 286 287 + # Log prompt details to separate logger 288 + prompt_logger.debug(f"Full prompt being sent:\n{prompt}") 289 + 290 + # Log concise prompt info to main logger 291 + thread_handles_count = len(unique_handles) 292 + logger.info(f"💬 Sending to LLM: @{author_handle} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars, {thread_handles_count} users") 293 294 try: 295 message_response = CLIENT.agents.messages.create( ··· 349 350 # Extract the reply text from the agent's response 351 reply_text = "" 352 + logger.debug(f"Processing {len(message_response.messages)} response messages...") 353 + 354 + for i, message in enumerate(message_response.messages, 1): 355 + # Log concise message info instead of full object 356 + msg_type = getattr(message, 'message_type', 'unknown') 357 + if hasattr(message, 'reasoning') and message.reasoning: 358 + logger.debug(f" {i}. {msg_type}: {message.reasoning[:100]}...") 359 + elif hasattr(message, 'tool_call') and message.tool_call: 360 + tool_name = message.tool_call.name 361 + logger.debug(f" {i}. {msg_type}: {tool_name}") 362 + elif hasattr(message, 'tool_return'): 363 + tool_name = getattr(message, 'name', 'unknown_tool') 364 + return_preview = str(message.tool_return)[:100] if message.tool_return else "None" 365 + logger.debug(f" {i}. 
{msg_type}: {tool_name} -> {return_preview}...") 366 + elif hasattr(message, 'text'): 367 + logger.debug(f" {i}. {msg_type}: {message.text[:100]}...") 368 + else: 369 + logger.debug(f" {i}. {msg_type}: <no content>") 370 371 # Check if this is a ToolCallMessage with bluesky_reply tool 372 if hasattr(message, 'tool_call') and message.tool_call: ··· 510 511 def load_and_process_queued_notifications(void_agent, atproto_client): 512 """Load and process all notifications from the queue.""" 513 + logger.info("Loading queued notifications from disk...") 514 try: 515 # Get all JSON files in queue directory (excluding processed_notifications.json) 516 queue_files = sorted([f for f in QUEUE_DIR.glob("*.json") if f.name != "processed_notifications.json"]) 517 518 if not queue_files: 519 + logger.info("No queued notifications found") 520 return 521 522 logger.info(f"Processing {len(queue_files)} queued notifications") 523 + 524 + # Log current statistics 525 + elapsed_time = time.time() - start_time 526 + total_messages = sum(message_counters.values()) 527 + messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0 528 + 529 + logger.info(f"📊 Session stats: {total_messages} total messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies, {message_counters['follows']} follows) | {messages_per_minute:.1f} msg/min") 530 531 + for i, filepath in enumerate(queue_files, 1): 532 + logger.info(f"Processing queue file {i}/{len(queue_files)}: {filepath.name}") 533 try: 534 # Load notification data 535 with open(filepath, 'r') as f: 536 notif_data = json.load(f) 537 538 # Process based on type using dict data directly 539 + logger.info(f"Processing {notif_data['reason']} from @{notif_data['author']['handle']}") 540 success = False 541 if notif_data['reason'] == "mention": 542 success = process_mention(void_agent, atproto_client, notif_data) 543 + if success: 544 + message_counters['mentions'] += 1 545 elif notif_data['reason'] == 
"reply": 546 success = process_mention(void_agent, atproto_client, notif_data) 547 + if success: 548 + message_counters['replies'] += 1 549 elif notif_data['reason'] == "follow": 550 author_handle = notif_data['author']['handle'] 551 author_display_name = notif_data['author'].get('display_name', 'no display name') 552 follow_update = f"@{author_handle} ({author_display_name}) started following you." 553 + logger.info(f"Notifying agent about new follower: @{author_handle}") 554 CLIENT.agents.messages.create( 555 agent_id = void_agent.id, 556 messages = [{"role":"user", "content": f"Update: {follow_update}"}] 557 ) 558 success = True # Follow updates are always successful 559 + if success: 560 + message_counters['follows'] += 1 561 elif notif_data['reason'] == "repost": 562 logger.info(f"Skipping repost notification from @{notif_data['author']['handle']}") 563 success = True # Skip reposts but mark as successful to remove from queue 564 + if success: 565 + message_counters['reposts_skipped'] += 1 566 else: 567 logger.warning(f"Unknown notification type: {notif_data['reason']}") 568 success = True # Remove unknown types from queue ··· 570 # Handle file based on processing result 571 if success: 572 filepath.unlink() 573 + logger.info(f"✅ Successfully processed and removed: {filepath.name}") 574 575 # Mark as processed to avoid reprocessing 576 processed_uris = load_processed_notifications() ··· 580 elif success is None: # Special case for moving to error directory 581 error_path = QUEUE_ERROR_DIR / filepath.name 582 filepath.rename(error_path) 583 + logger.warning(f"❌ Moved {filepath.name} to errors directory") 584 585 # Also mark as processed to avoid retrying 586 processed_uris = load_processed_notifications() ··· 588 save_processed_notifications(processed_uris) 589 590 else: 591 + logger.warning(f"⚠️ Failed to process {filepath.name}, keeping in queue for retry") 592 593 except Exception as e: 594 + logger.error(f"💥 Error processing queued notification 
{filepath.name}: {e}") 595 # Keep the file for retry later 596 597 except Exception as e: ··· 600 601 def process_notifications(void_agent, atproto_client): 602 """Fetch new notifications, queue them, and process the queue.""" 603 + logger.info("Starting notification processing cycle...") 604 try: 605 # First, process any existing queued notifications 606 + logger.info("Processing existing queued notifications...") 607 load_and_process_queued_notifications(void_agent, atproto_client) 608 609 # Get current time for marking notifications as seen 610 + logger.debug("Getting current time for notification marking...") 611 last_seen_at = atproto_client.get_current_time_iso() 612 613 # Fetch ALL notifications using pagination 614 + logger.info("Beginning notification fetch with pagination...") 615 all_notifications = [] 616 cursor = None 617 page_count = 0 ··· 670 break 671 672 # Queue all unread notifications (except likes) 673 + logger.info("Queuing unread notifications...") 674 new_count = 0 675 for notification in all_notifications: 676 if not notification.is_read and notification.reason != "like": ··· 685 logger.debug("No new notifications to queue") 686 687 # Process the queue (including any newly added notifications) 688 + logger.info("Processing notification queue after fetching...") 689 load_and_process_queued_notifications(void_agent, atproto_client) 690 691 except Exception as e: ··· 694 695 def main(): 696 """Main bot loop that continuously monitors for notifications.""" 697 + global start_time 698 + start_time = time.time() 699 + logger.info("=== STARTING VOID BOT ===") 700 logger.info("Initializing Void bot...") 701 702 # Initialize the Letta agent 703 + logger.info("Calling initialize_void()...") 704 void_agent = initialize_void() 705 logger.info(f"Void agent initialized: {void_agent.id}") 706 ··· 719 logger.warning("Agent has no tools registered!") 720 721 # Initialize Bluesky client 722 + logger.info("Connecting to Bluesky...") 723 atproto_client = 
bsky_utils.default_login() 724 logger.info("Connected to Bluesky") 725 726 # Main loop 727 + logger.info(f"=== ENTERING MAIN LOOP ===") 728 logger.info(f"Starting notification monitoring, checking every {FETCH_NOTIFICATIONS_DELAY_SEC} seconds") 729 730 + cycle_count = 0 731 while True: 732 try: 733 + cycle_count += 1 734 + logger.info(f"=== MAIN LOOP CYCLE {cycle_count} ===") 735 process_notifications(void_agent, atproto_client) 736 + # Log cycle completion with stats 737 + elapsed_time = time.time() - start_time 738 + total_messages = sum(message_counters.values()) 739 + messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0 740 + 741 + logger.info(f"Cycle {cycle_count} complete. Session totals: {total_messages} messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies) | {messages_per_minute:.1f} msg/min") 742 + logger.info(f"Sleeping for {FETCH_NOTIFICATIONS_DELAY_SEC} seconds...") 743 sleep(FETCH_NOTIFICATIONS_DELAY_SEC) 744 745 except KeyboardInterrupt: 746 + # Final stats 747 + elapsed_time = time.time() - start_time 748 + total_messages = sum(message_counters.values()) 749 + messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0 750 + 751 + logger.info("=== BOT STOPPED BY USER ===") 752 + logger.info(f"📊 Final session stats: {total_messages} total messages processed in {elapsed_time/60:.1f} minutes") 753 + logger.info(f" - {message_counters['mentions']} mentions") 754 + logger.info(f" - {message_counters['replies']} replies") 755 + logger.info(f" - {message_counters['follows']} follows") 756 + logger.info(f" - {message_counters['reposts_skipped']} reposts skipped") 757 + logger.info(f" - Average rate: {messages_per_minute:.1f} messages/minute") 758 break 759 except Exception as e: 760 + logger.error(f"=== ERROR IN MAIN LOOP CYCLE {cycle_count} ===") 761 + logger.error(f"Error details: {e}") 762 # Wait a bit longer on errors 763 + logger.info(f"Sleeping for 
{FETCH_NOTIFICATIONS_DELAY_SEC * 2} seconds due to error...") 764 sleep(FETCH_NOTIFICATIONS_DELAY_SEC * 2) 765 766