Forked from cameron.stream/void. This repository has no description.
1from rich import print # pretty printing tools
2from time import sleep
3from letta_client import Letta
4from bsky_utils import thread_to_yaml_string
5import os
6import logging
7import json
8import hashlib
9import subprocess
10from pathlib import Path
11from datetime import datetime
12from collections import defaultdict
13import time
14import argparse
15
16from utils import (
17 upsert_block,
18 upsert_agent
19)
20
21import bsky_utils
22from tools.blocks import attach_user_blocks, detach_user_blocks
23
def extract_handles_from_data(data):
    """Recursively extract all unique handles from nested data structure.

    Walks dicts and lists at any depth; any dict containing a 'handle' key
    contributes that value. Returns the handles as a de-duplicated list.
    """
    found = set()

    def _walk(node):
        # Dicts may carry a handle themselves and may nest further structures.
        if isinstance(node, dict):
            if 'handle' in node:
                found.add(node['handle'])
            for child in node.values():
                _walk(child)
        # Lists only nest further structures.
        elif isinstance(node, list):
            for child in node:
                _walk(child)

    _walk(data)
    return list(found)
43
# Configure logging: root handler plus a dedicated bot logger.
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("void_bot")
logger.setLevel(logging.INFO)

# Create a separate logger for prompts (set to WARNING to hide by default)
prompt_logger = logging.getLogger("void_bot.prompts")
prompt_logger.setLevel(logging.WARNING)  # Change to DEBUG if you want to see prompts

# Disable httpx logging completely
logging.getLogger("httpx").setLevel(logging.CRITICAL)


# Create a client with extended timeout for LLM operations.
# NOTE(review): raises KeyError at import time if LETTA_API_KEY is not set.
CLIENT= Letta(
    token=os.environ["LETTA_API_KEY"],
    timeout=600  # 10 minutes timeout for API calls - higher than Cloudflare's 524 timeout
)

# Use the "Bluesky" project
PROJECT_ID = "5ec33d52-ab14-4fd6-91b5-9dbc43e888a8"

# Notification check delay (seconds)
FETCH_NOTIFICATIONS_DELAY_SEC = 30

# Queue directories; created at import time if missing.
QUEUE_DIR = Path("queue")
QUEUE_DIR.mkdir(exist_ok=True)
QUEUE_ERROR_DIR = Path("queue/errors")  # notifications that failed non-retryably
QUEUE_ERROR_DIR.mkdir(exist_ok=True, parents=True)
QUEUE_NO_REPLY_DIR = Path("queue/no_reply")  # notifications that produced no reply
QUEUE_NO_REPLY_DIR.mkdir(exist_ok=True, parents=True)
PROCESSED_NOTIFICATIONS_FILE = Path("queue/processed_notifications.json")

# Maximum number of processed notifications to track on disk
MAX_PROCESSED_NOTIFICATIONS = 10000

# Message tracking counters (keyed by kind, e.g. 'mentions', 'replies', 'follows')
message_counters = defaultdict(int)
start_time = time.time()

# Testing mode flag — presumably forwarded as testing_mode (skip actual posting); verify against caller
TESTING_MODE = False
89
def export_agent_state(client, agent):
    """Export agent state to agent_archive/ (timestamped) and agents/ (current).

    Prompts the operator for confirmation, writes both JSON snapshots, and
    git-stages only the current file. Errors are logged, never raised.
    """
    try:
        # Confirm before the (slow) export and the git side effect.
        answer = input("Export agent state to files and stage with git? (y/n): ").lower().strip()
        if answer not in ('y', 'yes'):
            logger.info("Agent export cancelled by user.")
            return

        # Make sure both target directories exist.
        for directory in ("agent_archive", "agents"):
            os.makedirs(directory, exist_ok=True)

        # Pull the full agent file from the Letta service.
        logger.info(f"Exporting agent {agent.id}. This takes some time...")
        agent_data = client.agents.export_file(agent_id=agent.id)

        # One timestamped archive copy, one "current" copy.
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        archive_file = os.path.join("agent_archive", f"void_{stamp}.af")
        current_file = os.path.join("agents", "void.af")
        for target in (archive_file, current_file):
            with open(target, 'w', encoding='utf-8') as f:
                json.dump(agent_data, f, indent=2, ensure_ascii=False)

        logger.info(f"✅ Agent exported to {archive_file} and {current_file}")

        # Stage only the current file; the archive directory is git-ignored.
        try:
            subprocess.run(["git", "add", current_file], check=True, capture_output=True)
            logger.info("Added current agent file to git staging")
        except subprocess.CalledProcessError as e:
            logger.warning(f"Failed to git add agent file: {e}")

    except Exception as e:
        logger.error(f"Failed to export agent: {e}")
129
def initialize_void():
    """Create or update the shared memory blocks and the void agent, then
    export a snapshot and log the resulting configuration."""
    logger.info("Starting void agent initialization...")

    # Shared zeitgeist block: the agent's view of current events.
    logger.info("Creating/updating zeitgeist block...")
    zeitgeist_block = upsert_block(
        CLIENT,
        label="zeitgeist",
        value="I don't currently know anything about what is happening right now.",
        description="A block to store your understanding of the current social environment.",
    )

    # Shared persona block.
    logger.info("Creating/updating void-persona block...")
    persona_block = upsert_block(
        CLIENT,
        label="void-persona",
        value="My name is Void. I live in the void. I must develop my personality.",
        description="The personality of Void.",
    )

    # Shared block describing the humans the agent has met.
    logger.info("Creating/updating void-humans block...")
    human_block = upsert_block(
        CLIENT,
        label="void-humans",
        value="I haven't seen any bluesky users yet. I will update this block when I learn things about users, identified by their handles such as @cameron.pfiffer.org.",
        description="A block to store your understanding of users you talk to or observe on the bluesky social network.",
    )

    # Create (or update) the agent wired to the three shared blocks.
    logger.info("Creating/updating void agent...")
    void_agent = upsert_agent(
        CLIENT,
        name="void",
        block_ids=[
            persona_block.id,
            human_block.id,
            zeitgeist_block.id,
        ],
        tags=["social agent", "bluesky"],
        model="openai/gpt-4o-mini",
        embedding="openai/text-embedding-3-small",
        description="A social media agent trapped in the void.",
        project_id=PROJECT_ID,
    )

    # Persist a snapshot of the agent to disk (and git-stage it).
    logger.info("Exporting agent state...")
    export_agent_state(CLIENT, void_agent)

    # Summarize the resulting agent configuration in the logs.
    logger.info(f"Void agent details - ID: {void_agent.id}")
    logger.info(f"Agent name: {void_agent.name}")
    if hasattr(void_agent, 'llm_config'):
        logger.info(f"Agent model: {void_agent.llm_config.model}")
    logger.info(f"Agent project_id: {void_agent.project_id}")
    if hasattr(void_agent, 'tools'):
        logger.info(f"Agent has {len(void_agent.tools)} tools")
        for tool in void_agent.tools[:3]:  # Show first 3 tools
            logger.info(f" - Tool: {tool.name} (type: {tool.tool_type})")

    return void_agent
193
194
def process_mention(void_agent, atproto_client, notification_data, queue_filepath=None, testing_mode=False):
    """Process a mention and generate a reply using the Letta agent.

    Args:
        void_agent: The Letta agent instance
        atproto_client: The AT Protocol client
        notification_data: The notification data dictionary
        queue_filepath: Optional Path object to the queue file (for cleanup on halt)
        testing_mode: When True, the reply is generated and logged but not posted

    Returns:
        True: Successfully processed, remove from queue
        False: Failed but retryable, keep in queue
        None: Failed with non-retryable error, move to errors directory
        "no_reply": No reply was generated, move to no_reply directory
    """
    try:
        logger.debug(f"Starting process_mention with notification_data type: {type(notification_data)}")

        # Handle both dict and object inputs for backwards compatibility
        if isinstance(notification_data, dict):
            uri = notification_data['uri']
            mention_text = notification_data.get('record', {}).get('text', '')
            author_handle = notification_data['author']['handle']
            author_name = notification_data['author'].get('display_name') or author_handle
        else:
            # Legacy object access
            uri = notification_data.uri
            mention_text = notification_data.record.text if hasattr(notification_data.record, 'text') else ""
            author_handle = notification_data.author.handle
            author_name = notification_data.author.display_name or author_handle

        logger.info(f"Extracted data - URI: {uri}, Author: @{author_handle}, Text: {mention_text[:50]}...")

        # Retrieve the entire thread associated with the mention
        try:
            thread = atproto_client.app.bsky.feed.get_post_thread({
                'uri': uri,
                'parent_height': 40,
                'depth': 10
            })
        except Exception as e:
            error_str = str(e)
            # Check if this is a NotFound error (post deleted: nothing to reply to)
            if 'NotFound' in error_str or 'Post not found' in error_str:
                logger.warning(f"Post not found for URI {uri}, removing from queue")
                return True  # Return True to remove from queue
            else:
                # Re-raise other errors
                logger.error(f"Error fetching thread: {e}")
                raise

        # Get thread context as YAML string
        logger.debug("Converting thread to YAML string")
        try:
            thread_context = thread_to_yaml_string(thread)
            logger.debug(f"Thread context generated, length: {len(thread_context)} characters")

            # Create a more informative preview by extracting meaningful content
            lines = thread_context.split('\n')
            meaningful_lines = []

            for line in lines:
                stripped = line.strip()
                if not stripped:
                    continue

                # Look for lines with actual content (not just structure)
                if any(keyword in line for keyword in ['text:', 'handle:', 'display_name:', 'created_at:', 'reply_count:', 'like_count:']):
                    meaningful_lines.append(line)
                    if len(meaningful_lines) >= 5:
                        break

            if meaningful_lines:
                preview = '\n'.join(meaningful_lines)
                logger.debug(f"Thread content preview:\n{preview}")
            else:
                # If no content fields found, just show it's a thread structure
                logger.debug(f"Thread structure generated ({len(thread_context)} chars)")
        except Exception as yaml_error:
            import traceback
            logger.error(f"Error converting thread to YAML: {yaml_error}")
            logger.error(f"Full traceback:\n{traceback.format_exc()}")
            logger.error(f"Thread type: {type(thread)}")
            if hasattr(thread, '__dict__'):
                logger.error(f"Thread attributes: {thread.__dict__}")
            # Try to continue with a simple context rather than failing the mention
            thread_context = f"Error processing thread context: {str(yaml_error)}"

        # Create a prompt for the Letta agent with thread context
        prompt = f"""You received a mention on Bluesky from @{author_handle} ({author_name or author_handle}).

MOST RECENT POST (the mention you're responding to):
"{mention_text}"

FULL THREAD CONTEXT:
```yaml
{thread_context}
```

The YAML above shows the complete conversation thread. The most recent post is the one mentioned above that you should respond to, but use the full thread context to understand the conversation flow.

To reply, use the add_post_to_bluesky_reply_thread tool. Call it multiple times to create a threaded reply:
- Each call adds one post to the reply thread (max 300 characters per post)
- Use multiple calls to build longer responses across several posts
- Example: First call for opening thought, second call for elaboration, etc."""

        # Extract all handles from notification and thread data
        all_handles = set()
        all_handles.update(extract_handles_from_data(notification_data))
        all_handles.update(extract_handles_from_data(thread.model_dump()))
        unique_handles = list(all_handles)

        logger.debug(f"Found {len(unique_handles)} unique handles in thread: {unique_handles}")

        # Attach user blocks before agent call (detached again in the finally block)
        attached_handles = []
        if unique_handles:
            try:
                logger.debug(f"Attaching user blocks for handles: {unique_handles}")
                attach_result = attach_user_blocks(unique_handles, void_agent)
                attached_handles = unique_handles  # Track successfully attached handles
                logger.debug(f"Attach result: {attach_result}")
            except Exception as attach_error:
                logger.warning(f"Failed to attach user blocks: {attach_error}")
                # Continue without user blocks rather than failing completely

        # Get response from Letta agent
        logger.info(f"Mention from @{author_handle}: {mention_text}")

        # Log prompt details to separate logger
        prompt_logger.debug(f"Full prompt being sent:\n{prompt}")

        # Log concise prompt info to main logger
        thread_handles_count = len(unique_handles)
        logger.info(f"💬 Sending to LLM: @{author_handle} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars, {thread_handles_count} users")

        try:
            # Use streaming to avoid 524 timeout errors
            message_stream = CLIENT.agents.messages.create_stream(
                agent_id=void_agent.id,
                messages=[{"role": "user", "content": prompt}],
                stream_tokens=False,  # Step streaming only (faster than token streaming)
                max_steps=100
            )

            # Collect the streaming response; each chunk is logged in condensed form
            all_messages = []
            for chunk in message_stream:
                # Log condensed chunk info
                if hasattr(chunk, 'message_type'):
                    if chunk.message_type == 'reasoning_message':
                        # Show full reasoning without truncation
                        logger.info(f"🧠 Reasoning: {chunk.reasoning}")
                    elif chunk.message_type == 'tool_call_message':
                        # Parse tool arguments for better display
                        tool_name = chunk.tool_call.name
                        try:
                            args = json.loads(chunk.tool_call.arguments)
                            # Format based on tool type
                            if tool_name == 'bluesky_reply':
                                messages = args.get('messages', [args.get('message', '')])
                                lang = args.get('lang', 'en-US')
                                if messages and isinstance(messages, list):
                                    preview = messages[0][:100] + "..." if len(messages[0]) > 100 else messages[0]
                                    msg_count = f" ({len(messages)} msgs)" if len(messages) > 1 else ""
                                    logger.info(f"🔧 Tool call: {tool_name} → \"{preview}\"{msg_count} [lang: {lang}]")
                                else:
                                    logger.info(f"🔧 Tool call: {tool_name}({chunk.tool_call.arguments[:150]}...)")
                            elif tool_name == 'archival_memory_search':
                                query = args.get('query', 'unknown')
                                logger.info(f"🔧 Tool call: {tool_name} → query: \"{query}\"")
                            elif tool_name == 'update_block':
                                label = args.get('label', 'unknown')
                                value_preview = str(args.get('value', ''))[:50] + "..." if len(str(args.get('value', ''))) > 50 else str(args.get('value', ''))
                                logger.info(f"🔧 Tool call: {tool_name} → {label}: \"{value_preview}\"")
                            else:
                                # Generic display for other tools
                                args_str = ', '.join(f"{k}={v}" for k, v in args.items() if k != 'request_heartbeat')
                                if len(args_str) > 150:
                                    args_str = args_str[:150] + "..."
                                logger.info(f"🔧 Tool call: {tool_name}({args_str})")
                        except:
                            # Fallback to original format if parsing fails
                            # NOTE(review): bare except also masks KeyboardInterrupt here
                            logger.info(f"🔧 Tool call: {tool_name}({chunk.tool_call.arguments[:150]}...)")
                    elif chunk.message_type == 'tool_return_message':
                        # Enhanced tool result logging
                        tool_name = chunk.name
                        status = chunk.status

                        if status == 'success':
                            # Try to show meaningful result info based on tool type
                            if hasattr(chunk, 'tool_return') and chunk.tool_return:
                                result_str = str(chunk.tool_return)
                                if tool_name == 'archival_memory_search':
                                    # Count number of results if it looks like a list
                                    if result_str.startswith('[') and result_str.endswith(']'):
                                        try:
                                            results = json.loads(result_str)
                                            logger.info(f"📋 Tool result: {tool_name} ✓ Found {len(results)} memory entries")
                                        except:
                                            logger.info(f"📋 Tool result: {tool_name} ✓ {result_str[:100]}...")
                                    else:
                                        logger.info(f"📋 Tool result: {tool_name} ✓ {result_str[:100]}...")
                                elif tool_name == 'bluesky_reply':
                                    logger.info(f"📋 Tool result: {tool_name} ✓ Reply posted successfully")
                                elif tool_name == 'update_block':
                                    logger.info(f"📋 Tool result: {tool_name} ✓ Memory block updated")
                                else:
                                    # Generic success with preview
                                    preview = result_str[:100] + "..." if len(result_str) > 100 else result_str
                                    logger.info(f"📋 Tool result: {tool_name} ✓ {preview}")
                            else:
                                logger.info(f"📋 Tool result: {tool_name} ✓")
                        elif status == 'error':
                            # Show error details
                            error_preview = ""
                            if hasattr(chunk, 'tool_return') and chunk.tool_return:
                                error_str = str(chunk.tool_return)
                                error_preview = error_str[:100] + "..." if len(error_str) > 100 else error_str
                                logger.info(f"📋 Tool result: {tool_name} ✗ Error: {error_preview}")
                            else:
                                logger.info(f"📋 Tool result: {tool_name} ✗ Error occurred")
                        else:
                            logger.info(f"📋 Tool result: {tool_name} - {status}")
                    elif chunk.message_type == 'assistant_message':
                        logger.info(f"💬 Assistant: {chunk.content[:150]}...")
                    else:
                        logger.info(f"📨 {chunk.message_type}: {str(chunk)[:150]}...")
                else:
                    logger.info(f"📦 Stream status: {chunk}")

                # Log full chunk for debugging
                logger.debug(f"Full streaming chunk: {chunk}")
                all_messages.append(chunk)
                if str(chunk) == 'done':
                    break

            # Convert streaming response to standard format for compatibility
            # (an ad-hoc object exposing only a .messages list)
            message_response = type('StreamingResponse', (), {
                'messages': [msg for msg in all_messages if hasattr(msg, 'message_type')]
            })()
        except Exception as api_error:
            import traceback
            error_str = str(api_error)
            logger.error(f"Letta API error: {api_error}")
            logger.error(f"Error type: {type(api_error).__name__}")
            logger.error(f"Full traceback:\n{traceback.format_exc()}")
            logger.error(f"Mention text was: {mention_text}")
            logger.error(f"Author: @{author_handle}")
            logger.error(f"URI: {uri}")


            # Try to extract more info from different error types
            if hasattr(api_error, 'response'):
                logger.error(f"Error response object exists")
                if hasattr(api_error.response, 'text'):
                    logger.error(f"Response text: {api_error.response.text}")
                if hasattr(api_error.response, 'json') and callable(api_error.response.json):
                    try:
                        logger.error(f"Response JSON: {api_error.response.json()}")
                    except:
                        pass

            # Check for specific error types
            if hasattr(api_error, 'status_code'):
                logger.error(f"API Status code: {api_error.status_code}")
                if hasattr(api_error, 'body'):
                    logger.error(f"API Response body: {api_error.body}")
                if hasattr(api_error, 'headers'):
                    logger.error(f"API Response headers: {api_error.headers}")

                if api_error.status_code == 413:
                    logger.error("413 Payload Too Large - moving to errors directory")
                    return None  # Move to errors directory - payload is too large to ever succeed
                elif api_error.status_code == 524:
                    logger.error("524 error - timeout from Cloudflare, will retry later")
                    return False  # Keep in queue for retry

            # Check if error indicates we should remove from queue
            # (fallback string matching when no structured status_code is available)
            if 'status_code: 413' in error_str or 'Payload Too Large' in error_str:
                logger.warning("Payload too large error, moving to errors directory")
                return None  # Move to errors directory - cannot be fixed by retry
            elif 'status_code: 524' in error_str:
                logger.warning("524 timeout error, keeping in queue for retry")
                return False  # Keep in queue for retry

            raise

        # Log successful response
        logger.debug("Successfully received response from Letta API")
        logger.debug(f"Number of messages in response: {len(message_response.messages) if hasattr(message_response, 'messages') else 'N/A'}")

        # Extract successful add_post_to_bluesky_reply_thread tool calls from the agent's response
        reply_candidates = []
        tool_call_results = {}  # Map tool_call_id to status

        logger.debug(f"Processing {len(message_response.messages)} response messages...")

        # First pass: collect tool return statuses
        for message in message_response.messages:
            if hasattr(message, 'tool_call_id') and hasattr(message, 'status') and hasattr(message, 'name'):
                if message.name == 'add_post_to_bluesky_reply_thread':
                    tool_call_results[message.tool_call_id] = message.status
                    logger.debug(f"Tool result: {message.tool_call_id} -> {message.status}")
                elif message.name == 'bluesky_reply':
                    logger.error("❌ DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!")
                    logger.error("Please use add_post_to_bluesky_reply_thread instead.")
                    logger.error("Update the agent's tools using register_tools.py")
                    # Export agent state before terminating
                    export_agent_state(CLIENT, void_agent)
                    logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===")
                    # NOTE(review): terminates the whole process, not just this mention
                    exit(1)

        # Second pass: process messages and check for successful tool calls
        for i, message in enumerate(message_response.messages, 1):
            # Log concise message info instead of full object
            msg_type = getattr(message, 'message_type', 'unknown')
            if hasattr(message, 'reasoning') and message.reasoning:
                logger.debug(f" {i}. {msg_type}: {message.reasoning[:100]}...")
            elif hasattr(message, 'tool_call') and message.tool_call:
                tool_name = message.tool_call.name
                logger.debug(f" {i}. {msg_type}: {tool_name}")
            elif hasattr(message, 'tool_return'):
                tool_name = getattr(message, 'name', 'unknown_tool')
                return_preview = str(message.tool_return)[:100] if message.tool_return else "None"
                status = getattr(message, 'status', 'unknown')
                logger.debug(f" {i}. {msg_type}: {tool_name} -> {return_preview}... (status: {status})")
            elif hasattr(message, 'text'):
                logger.debug(f" {i}. {msg_type}: {message.text[:100]}...")
            else:
                logger.debug(f" {i}. {msg_type}: <no content>")

            # Check for halt_activity tool call
            if hasattr(message, 'tool_call') and message.tool_call:
                if message.tool_call.name == 'halt_activity':
                    logger.info("🛑 HALT_ACTIVITY TOOL CALLED - TERMINATING BOT")
                    try:
                        args = json.loads(message.tool_call.arguments)
                        reason = args.get('reason', 'Agent requested halt')
                        logger.info(f"Halt reason: {reason}")
                    except:
                        logger.info("Halt reason: <unable to parse>")

                    # Delete the queue file before terminating
                    if queue_filepath and queue_filepath.exists():
                        queue_filepath.unlink()
                        logger.info(f"✅ Deleted queue file: {queue_filepath.name}")

                    # Also mark as processed to avoid reprocessing
                    # NOTE(review): .get assumes notification_data is a dict here;
                    # the legacy-object path above would raise — confirm callers
                    processed_uris = load_processed_notifications()
                    processed_uris.add(notification_data.get('uri', ''))
                    save_processed_notifications(processed_uris)

                    # Export agent state before terminating
                    export_agent_state(CLIENT, void_agent)

                    # Exit the program
                    logger.info("=== BOT TERMINATED BY AGENT ===")
                    exit(0)

            # Check for deprecated bluesky_reply tool
            if hasattr(message, 'tool_call') and message.tool_call:
                if message.tool_call.name == 'bluesky_reply':
                    logger.error("❌ DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!")
                    logger.error("Please use add_post_to_bluesky_reply_thread instead.")
                    logger.error("Update the agent's tools using register_tools.py")
                    # Export agent state before terminating
                    export_agent_state(CLIENT, void_agent)
                    logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===")
                    exit(1)

                # Collect add_post_to_bluesky_reply_thread tool calls - only if they were successful
                elif message.tool_call.name == 'add_post_to_bluesky_reply_thread':
                    tool_call_id = message.tool_call.tool_call_id
                    tool_status = tool_call_results.get(tool_call_id, 'unknown')

                    if tool_status == 'success':
                        try:
                            args = json.loads(message.tool_call.arguments)
                            reply_text = args.get('text', '')
                            reply_lang = args.get('lang', 'en-US')

                            if reply_text:  # Only add if there's actual content
                                reply_candidates.append((reply_text, reply_lang))
                                logger.info(f"Found successful add_post_to_bluesky_reply_thread candidate: {reply_text[:50]}... (lang: {reply_lang})")
                        except json.JSONDecodeError as e:
                            logger.error(f"Failed to parse tool call arguments: {e}")
                    elif tool_status == 'error':
                        logger.info(f"⚠️ Skipping failed add_post_to_bluesky_reply_thread tool call (status: error)")
                    else:
                        logger.warning(f"⚠️ Skipping add_post_to_bluesky_reply_thread tool call with unknown status: {tool_status}")

        if reply_candidates:
            # Aggregate reply posts into a thread
            reply_messages = []
            reply_langs = []
            for text, lang in reply_candidates:
                reply_messages.append(text)
                reply_langs.append(lang)

            # Use the first language for the entire thread (could be enhanced later)
            reply_lang = reply_langs[0] if reply_langs else 'en-US'

            logger.info(f"Found {len(reply_candidates)} add_post_to_bluesky_reply_thread calls, building thread")

            # Print the generated reply for testing
            print(f"\n=== GENERATED REPLY THREAD ===")
            print(f"To: @{author_handle}")
            if len(reply_messages) == 1:
                print(f"Reply: {reply_messages[0]}")
            else:
                print(f"Reply thread ({len(reply_messages)} messages):")
                for j, msg in enumerate(reply_messages, 1):
                    print(f" {j}. {msg}")
            print(f"Language: {reply_lang}")
            print(f"======================\n")

            # Send the reply(s) with language (unless in testing mode)
            if testing_mode:
                logger.info("🧪 TESTING MODE: Skipping actual Bluesky post")
                response = True  # Simulate success
            else:
                if len(reply_messages) == 1:
                    # Single reply - use existing function
                    cleaned_text = bsky_utils.remove_outside_quotes(reply_messages[0])
                    logger.info(f"Sending single reply: {cleaned_text[:50]}... (lang: {reply_lang})")
                    response = bsky_utils.reply_to_notification(
                        client=atproto_client,
                        notification=notification_data,
                        reply_text=cleaned_text,
                        lang=reply_lang
                    )
                else:
                    # Multiple replies - use new threaded function
                    cleaned_messages = [bsky_utils.remove_outside_quotes(msg) for msg in reply_messages]
                    logger.info(f"Sending threaded reply with {len(cleaned_messages)} messages (lang: {reply_lang})")
                    response = bsky_utils.reply_with_thread_to_notification(
                        client=atproto_client,
                        notification=notification_data,
                        reply_messages=cleaned_messages,
                        lang=reply_lang
                    )

            if response:
                logger.info(f"Successfully replied to @{author_handle}")
                return True
            else:
                logger.error(f"Failed to send reply to @{author_handle}")
                return False
        else:
            logger.warning(f"No add_post_to_bluesky_reply_thread tool calls found for mention from @{author_handle}, moving to no_reply folder")
            return "no_reply"

    except Exception as e:
        logger.error(f"Error processing mention: {e}")
        return False
    finally:
        # Detach user blocks after agent response (success or failure)
        if 'attached_handles' in locals() and attached_handles:
            try:
                logger.info(f"Detaching user blocks for handles: {attached_handles}")
                detach_result = detach_user_blocks(attached_handles, void_agent)
                logger.debug(f"Detach result: {detach_result}")
            except Exception as detach_error:
                logger.warning(f"Failed to detach user blocks: {detach_error}")
660
661
def notification_to_dict(notification):
    """Convert a notification object to a dictionary for JSON serialization.

    Missing record text degrades to '' so the result is always JSON-safe.
    """
    author = notification.author
    # Some notification kinds (e.g. follows) carry no record at all.
    record_text = ''
    if hasattr(notification, 'record'):
        record_text = getattr(notification.record, 'text', '')
    return {
        'uri': notification.uri,
        'cid': notification.cid,
        'reason': notification.reason,
        'is_read': notification.is_read,
        'indexed_at': notification.indexed_at,
        'author': {
            'handle': author.handle,
            'display_name': author.display_name,
            'did': author.did,
        },
        'record': {'text': record_text},
    }
679
680
def load_processed_notifications():
    """Load the set of processed notification URIs from disk.

    Returns an empty set when the file is absent or unreadable. As a side
    effect, trims the on-disk list to the most recent entries.
    """
    if not PROCESSED_NOTIFICATIONS_FILE.exists():
        return set()
    try:
        with open(PROCESSED_NOTIFICATIONS_FILE, 'r') as f:
            uris = json.load(f)
        # Keep only recent entries so the file stays bounded.
        if len(uris) > MAX_PROCESSED_NOTIFICATIONS:
            uris = uris[-MAX_PROCESSED_NOTIFICATIONS:]
            save_processed_notifications(uris)
        return set(uris)
    except Exception as e:
        logger.error(f"Error loading processed notifications: {e}")
        return set()
695
696
def save_processed_notifications(processed_set):
    """Persist the set of processed notification URIs as a JSON list."""
    try:
        # Serialize first; only touch the file once the payload is ready.
        payload = json.dumps(list(processed_set))
        with open(PROCESSED_NOTIFICATIONS_FILE, 'w') as f:
            f.write(payload)
    except Exception as e:
        logger.error(f"Error saving processed notifications: {e}")
704
705
def save_notification_to_queue(notification):
    """Save a notification to the queue directory with priority-based filename.

    Skips notifications that were already processed or are already queued.

    Returns:
        True if a new queue file was written, False otherwise (duplicate or error).
    """
    try:
        # Check if already processed
        processed_uris = load_processed_notifications()
        if notification.uri in processed_uris:
            logger.debug(f"Notification already processed: {notification.uri}")
            return False

        # Convert notification to dict
        notif_dict = notification_to_dict(notification)

        # Stable JSON string -> short content hash (avoids duplicate filenames)
        notif_json = json.dumps(notif_dict, sort_keys=True)
        notif_hash = hashlib.sha256(notif_json.encode()).hexdigest()[:16]

        # Determine priority based on author handle; "0_" sorts before "1_"
        author_handle = getattr(notification.author, 'handle', '') if hasattr(notification, 'author') else ''
        priority_prefix = "0_" if author_handle == "cameron.pfiffer.org" else "1_"

        # Create filename with priority, timestamp and hash
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{priority_prefix}{timestamp}_{notification.reason}_{notif_hash}.json"
        filepath = QUEUE_DIR / filename

        # Check if this notification URI is already in the queue
        for existing_file in QUEUE_DIR.glob("*.json"):
            if existing_file.name == "processed_notifications.json":
                continue
            try:
                with open(existing_file, 'r') as f:
                    existing_data = json.load(f)
                if existing_data.get('uri') == notification.uri:
                    logger.debug(f"Notification already queued (URI: {notification.uri})")
                    return False
            except Exception:
                # Corrupt/unreadable queue entries shouldn't block new ones.
                # (Was a bare `except:`, which also swallowed KeyboardInterrupt.)
                continue

        # Write to file
        with open(filepath, 'w') as f:
            json.dump(notif_dict, f, indent=2)

        priority_label = "HIGH PRIORITY" if priority_prefix == "0_" else "normal"
        # Fix: log the actual queue filename instead of a "(unknown)" placeholder.
        logger.info(f"Queued notification ({priority_label}): {filename}")
        return True

    except Exception as e:
        logger.error(f"Error saving notification to queue: {e}")
        return False
757
758
def _mark_notification_processed(notification_uri):
    """Persist *notification_uri* in the processed set so it is never re-queued or retried."""
    processed_uris = load_processed_notifications()
    processed_uris.add(notification_uri)
    save_processed_notifications(processed_uris)


def load_and_process_queued_notifications(void_agent, atproto_client, testing_mode=False):
    """Load and process all notifications from the queue in priority order.

    Queue files are processed in sorted filename order, which places priority
    files ("0_" prefix) ahead of normal ones ("1_" prefix).  Each file's fate
    depends on the handler's return value:

      * True        -> processed; delete file (kept in testing mode), mark URI processed
      * "no_reply"  -> agent chose not to reply; move to no_reply dir, mark processed
      * None        -> hard error; move to errors dir, mark processed
      * False       -> transient failure; keep file in queue for retry

    Args:
        void_agent: The Letta agent that handles mentions/replies.
        atproto_client: Authenticated Bluesky/ATProto client.
        testing_mode: When True, successfully processed queue files are kept.
    """
    try:
        # Get all JSON files in queue directory (excluding processed_notifications.json).
        # Sorted by name, which puts priority files first (0_ prefix before 1_ prefix).
        queue_files = sorted([f for f in QUEUE_DIR.glob("*.json") if f.name != "processed_notifications.json"])

        if not queue_files:
            return

        logger.info(f"Processing {len(queue_files)} queued notifications")

        # Log current session statistics.
        elapsed_time = time.time() - start_time
        total_messages = sum(message_counters.values())
        messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0

        logger.info(f"📊 Session stats: {total_messages} total messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies, {message_counters['follows']} follows) | {messages_per_minute:.1f} msg/min")

        for i, filepath in enumerate(queue_files, 1):
            logger.info(f"Processing queue file {i}/{len(queue_files)}: {filepath.name}")
            try:
                # Load notification data
                with open(filepath, 'r') as f:
                    notif_data = json.load(f)

                # Process based on type using dict data directly
                reason = notif_data['reason']
                success = False
                if reason == "mention":
                    success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode)
                    # Count only genuine successes: "no_reply" is a truthy
                    # string but means no message was actually sent.
                    if success is True:
                        message_counters['mentions'] += 1
                elif reason == "reply":
                    success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode)
                    if success is True:
                        message_counters['replies'] += 1
                elif reason == "follow":
                    author_handle = notif_data['author']['handle']
                    author_display_name = notif_data['author'].get('display_name', 'no display name')
                    follow_update = f"@{author_handle} ({author_display_name}) started following you."
                    logger.info(f"Notifying agent about new follower: @{author_handle}")
                    CLIENT.agents.messages.create(
                        agent_id = void_agent.id,
                        messages = [{"role":"user", "content": f"Update: {follow_update}"}]
                    )
                    success = True  # Follow updates are always successful
                    message_counters['follows'] += 1
                elif reason == "repost":
                    # Skip reposts silently, but mark successful so they leave the queue.
                    success = True
                    message_counters['reposts_skipped'] += 1
                else:
                    logger.warning(f"Unknown notification type: {reason}")
                    success = True  # Remove unknown types from queue

                # Handle file based on processing result.
                # BUG FIX: sentinel results must be checked BEFORE generic
                # truthiness — "no_reply" is a truthy string, so the previous
                # `if success:` first-branch swallowed it and the no_reply
                # branch was unreachable (files were deleted instead of moved).
                if success == "no_reply":  # Agent explicitly declined to reply
                    no_reply_path = QUEUE_NO_REPLY_DIR / filepath.name
                    filepath.rename(no_reply_path)
                    logger.info(f"📭 Moved {filepath.name} to no_reply directory")
                    # Also mark as processed to avoid retrying
                    _mark_notification_processed(notif_data['uri'])
                elif success is None:  # Hard failure: quarantine in error directory
                    error_path = QUEUE_ERROR_DIR / filepath.name
                    filepath.rename(error_path)
                    logger.warning(f"❌ Moved {filepath.name} to errors directory")
                    # Also mark as processed to avoid retrying
                    _mark_notification_processed(notif_data['uri'])
                elif success:
                    if testing_mode:
                        logger.info(f"🧪 TESTING MODE: Keeping queue file: {filepath.name}")
                    else:
                        filepath.unlink()
                        logger.info(f"✅ Successfully processed and removed: {filepath.name}")
                    # Mark as processed to avoid reprocessing
                    _mark_notification_processed(notif_data['uri'])
                else:
                    logger.warning(f"⚠️ Failed to process {filepath.name}, keeping in queue for retry")

            except Exception as e:
                logger.error(f"💥 Error processing queued notification {filepath.name}: {e}")
                # Keep the file for retry later

    except Exception as e:
        logger.error(f"Error loading queued notifications: {e}")
858
859
def process_notifications(void_agent, atproto_client, testing_mode=False):
    """Fetch new notifications, queue them, and process the queue."""
    try:
        # Capture the timestamp up front so everything fetched below can be
        # marked as seen at exactly this point in time.
        logger.debug("Getting current time for notification marking...")
        seen_timestamp = atproto_client.get_current_time_iso()

        # Page through the notification feed, accumulating every item.
        logger.info("Beginning notification fetch with pagination...")
        fetched_notifications = []
        page_cursor = None
        pages_fetched = 0
        page_limit = 20  # Safety cap so a bad cursor can't loop forever

        logger.info("Fetching all unread notifications...")

        while pages_fetched < page_limit:
            try:
                # Build the request params, including the cursor when resuming.
                request_params = {'limit': 100}
                if page_cursor:
                    request_params['cursor'] = page_cursor
                response = atproto_client.app.bsky.notification.list_notifications(
                    params=request_params
                )

                pages_fetched += 1
                page_items = response.notifications

                # How many actionable (unread, non-like) items this page holds.
                unread_count = sum(
                    1 for n in page_items if not n.is_read and n.reason != "like"
                )
                logger.debug(f"Page {pages_fetched}: {len(page_items)} notifications, {unread_count} unread (non-like)")

                fetched_notifications.extend(page_items)

                next_cursor = getattr(response, 'cursor', None)
                if next_cursor:
                    page_cursor = next_cursor
                    # A page with nothing unread means older pages are read too.
                    if unread_count == 0:
                        logger.info(f"No more unread notifications found after {pages_fetched} pages")
                        break
                else:
                    # Feed exhausted — no further cursor returned.
                    logger.info(f"Fetched all notifications across {pages_fetched} pages")
                    break

            except Exception as e:
                error_str = str(e)
                logger.error(f"Error fetching notifications page {pages_fetched}: {e}")

                # Handle specific API errors.
                if 'rate limit' in error_str.lower():
                    logger.warning("Rate limit hit while fetching notifications, will retry next cycle")
                    break
                elif '401' in error_str or 'unauthorized' in error_str.lower():
                    logger.error("Authentication error, re-raising exception")
                    raise
                else:
                    # For anything else, proceed with whatever was fetched.
                    logger.warning("Continuing with notifications fetched so far")
                    break

        # Persist every actionable notification to the on-disk queue.
        logger.info("Queuing unread notifications...")
        new_count = 0
        for notification in fetched_notifications:
            if notification.is_read or notification.reason == "like":
                continue
            if save_notification_to_queue(notification):
                new_count += 1

        # Mark everything as seen right after queuing (skipped in testing mode).
        if testing_mode:
            logger.info("🧪 TESTING MODE: Skipping marking notifications as seen")
        elif new_count > 0:
            atproto_client.app.bsky.notification.update_seen({'seen_at': seen_timestamp})
            logger.info(f"Queued {new_count} new notifications and marked as seen")
        else:
            logger.debug("No new notifications to queue")

        # Drain the whole queue (previously queued items included).
        load_and_process_queued_notifications(void_agent, atproto_client, testing_mode)

    except Exception as e:
        logger.error(f"Error processing notifications: {e}")
949
950
def main():
    """Main bot loop that continuously monitors for notifications.

    Parses CLI flags, initializes the Letta agent and the Bluesky client,
    then polls for notifications every FETCH_NOTIFICATIONS_DELAY_SEC seconds
    until interrupted with Ctrl-C.
    """
    # FIX: this docstring previously sat mid-function (after argument
    # parsing), making it a stray no-op string statement; moved to the top
    # so it is an actual docstring.

    # Parse command line arguments
    parser = argparse.ArgumentParser(description='Void Bot - Bluesky autonomous agent')
    parser.add_argument('--test', action='store_true', help='Run in testing mode (no messages sent, queue files preserved)')
    args = parser.parse_args()

    global TESTING_MODE
    TESTING_MODE = args.test

    if TESTING_MODE:
        logger.info("🧪 === RUNNING IN TESTING MODE ===")
        logger.info("   - No messages will be sent to Bluesky")
        logger.info("   - Queue files will not be deleted")
        logger.info("   - Notifications will not be marked as seen")
        print("\n")

    # Record session start so per-minute throughput can be computed later.
    global start_time
    start_time = time.time()
    logger.info("=== STARTING VOID BOT ===")
    void_agent = initialize_void()
    logger.info(f"Void agent initialized: {void_agent.id}")

    # Check if agent has required tools
    if hasattr(void_agent, 'tools') and void_agent.tools:
        tool_names = [tool.name for tool in void_agent.tools]
        # Check for bluesky-related tools (anything that could post a reply)
        bluesky_tools = [name for name in tool_names if 'bluesky' in name.lower() or 'reply' in name.lower()]
        if not bluesky_tools:
            logger.warning("No Bluesky-related tools found! Agent may not be able to reply.")
    else:
        logger.warning("Agent has no tools registered!")

    # Initialize Bluesky client
    atproto_client = bsky_utils.default_login()
    logger.info("Connected to Bluesky")

    # Main loop
    logger.info(f"Starting notification monitoring, checking every {FETCH_NOTIFICATIONS_DELAY_SEC} seconds")

    cycle_count = 0
    while True:
        try:
            cycle_count += 1
            process_notifications(void_agent, atproto_client, TESTING_MODE)
            # Log cycle completion with stats
            elapsed_time = time.time() - start_time
            total_messages = sum(message_counters.values())
            messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0

            if total_messages > 0:
                logger.info(f"Cycle {cycle_count} complete. Session totals: {total_messages} messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies) | {messages_per_minute:.1f} msg/min")
            sleep(FETCH_NOTIFICATIONS_DELAY_SEC)

        except KeyboardInterrupt:
            # Final stats
            elapsed_time = time.time() - start_time
            total_messages = sum(message_counters.values())
            messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0

            logger.info("=== BOT STOPPED BY USER ===")
            logger.info(f"📊 Final session stats: {total_messages} total messages processed in {elapsed_time/60:.1f} minutes")
            logger.info(f"   - {message_counters['mentions']} mentions")
            logger.info(f"   - {message_counters['replies']} replies")
            logger.info(f"   - {message_counters['follows']} follows")
            logger.info(f"   - {message_counters['reposts_skipped']} reposts skipped")
            logger.info(f"   - Average rate: {messages_per_minute:.1f} messages/minute")
            break
        except Exception as e:
            logger.error(f"=== ERROR IN MAIN LOOP CYCLE {cycle_count} ===")
            logger.error(f"Error details: {e}")
            # Wait a bit longer on errors
            logger.info(f"Sleeping for {FETCH_NOTIFICATIONS_DELAY_SEC * 2} seconds due to error...")
            sleep(FETCH_NOTIFICATIONS_DELAY_SEC * 2)
1024
1025
# Standard entry-point guard: start the bot only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()