Forked from cameron.stream/void.
This repository has no description.
1from rich import print # pretty printing tools
2from time import sleep
3from letta_client import Letta
4from bsky_utils import thread_to_yaml_string
5import os
6import logging
7import json
8import hashlib
9import subprocess
10from pathlib import Path
11from datetime import datetime
12from collections import defaultdict
13import time
14import argparse
15
16from utils import (
17 upsert_block,
18 upsert_agent
19)
20
21import bsky_utils
22from tools.blocks import attach_user_blocks, detach_user_blocks
23
def extract_handles_from_data(data):
    """Return every distinct value stored under a ``'handle'`` key in *data*.

    *data* is walked depth-first: every value of a dict and every item of a
    list is visited; any other type is treated as a leaf and not searched
    (so tuples, sets and arbitrary objects are skipped). The result is a
    list built from a set, so it has no duplicates and its order is
    unspecified.
    """
    def _walk(node):
        # Yield handles in pre-order; dicts contribute their own 'handle'
        # before their children are explored.
        if isinstance(node, dict):
            if 'handle' in node:
                yield node['handle']
            for child in node.values():
                yield from _walk(child)
        elif isinstance(node, list):
            for child in node:
                yield from _walk(child)

    return list(set(_walk(data)))
43
# --- Logging -----------------------------------------------------------------
# Root configuration: INFO and above, "time - logger - level - message".
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger("void_bot")
logger.setLevel(logging.INFO)

# Full prompts go to a dedicated child logger held at WARNING, so they stay
# hidden unless this level is lowered to DEBUG.
prompt_logger = logging.getLogger("void_bot.prompts")
prompt_logger.setLevel(logging.WARNING)

# Mute httpx request logging (only CRITICAL records get through).
logging.getLogger("httpx").setLevel(logging.CRITICAL)

# --- Letta client ------------------------------------------------------------
# Shared client for every agent call. The 600-second timeout is deliberately
# long so it exceeds Cloudflare's 524 timeout on slow LLM operations.
# Raises KeyError at import time if LETTA_API_KEY is not set.
CLIENT = Letta(
    token=os.environ["LETTA_API_KEY"],
    timeout=600,
)

# Letta project (the "Bluesky" project) the void agent is created in.
PROJECT_ID = "5ec33d52-ab14-4fd6-91b5-9dbc43e888a8"

# Delay between notification checks, in seconds.
FETCH_NOTIFICATIONS_DELAY_SEC = 30

# --- On-disk queue -----------------------------------------------------------
QUEUE_DIR = Path("queue")                     # pending notification JSON files
QUEUE_ERROR_DIR = Path("queue/errors")        # items that failed permanently
QUEUE_NO_REPLY_DIR = Path("queue/no_reply")   # items the agent did not answer
PROCESSED_NOTIFICATIONS_FILE = Path("queue/processed_notifications.json")

QUEUE_DIR.mkdir(exist_ok=True)
QUEUE_ERROR_DIR.mkdir(exist_ok=True, parents=True)
QUEUE_NO_REPLY_DIR.mkdir(exist_ok=True, parents=True)

# Cap on the number of processed notification URIs that are remembered.
MAX_PROCESSED_NOTIFICATIONS = 10000

# --- Session statistics ------------------------------------------------------
message_counters = defaultdict(int)  # per-type message counts for this run
start_time = time.time()             # session start, for msg/min statistics

# Testing mode flag
TESTING_MODE = False
89
def export_agent_state(client, agent):
    """Snapshot *agent* to disk after an interactive confirmation.

    The user is asked on stdin; any answer other than "y"/"yes" aborts. On
    confirmation the data returned by ``client.agents.export_file`` is written
    as indented UTF-8 JSON to a timestamped ``agent_archive/void_<ts>.af`` and
    to ``agents/void.af``. Only the latter is staged with ``git add``.

    Every failure (including a failed prompt or export) is logged, never raised.
    """
    try:
        answer = input("Export agent state to files and stage with git? (y/n): ").lower().strip()
        if answer not in ('y', 'yes'):
            logger.info("Agent export cancelled by user.")
            return

        for directory in ("agent_archive", "agents"):
            os.makedirs(directory, exist_ok=True)

        logger.info(f"Exporting agent {agent.id}. This takes some time...")
        agent_data = client.agents.export_file(agent_id=agent.id)

        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        archive_file = os.path.join("agent_archive", f"void_{stamp}.af")
        current_file = os.path.join("agents", "void.af")

        # Same payload to both locations: archive first, then the current copy.
        for target in (archive_file, current_file):
            with open(target, 'w', encoding='utf-8') as out:
                json.dump(agent_data, out, indent=2, ensure_ascii=False)

        logger.info(f"✅ Agent exported to {archive_file} and {current_file}")

        # Only the current copy is staged; the archive directory is not added.
        try:
            subprocess.run(["git", "add", current_file], check=True, capture_output=True)
        except subprocess.CalledProcessError as git_error:
            logger.warning(f"Failed to git add agent file: {git_error}")
        else:
            logger.info("Added current agent file to git staging")

    except Exception as e:
        logger.error(f"Failed to export agent: {e}")
129
def initialize_void():
    """Create or update the shared memory blocks and the ``void`` agent.

    Upserts the ``zeitgeist``, ``void-persona`` and ``void-humans`` blocks,
    then upserts the agent in PROJECT_ID with those blocks attached. It then
    offers (interactively, via export_agent_state) to export the agent state
    and logs a summary of the agent.

    Returns:
        The agent object returned by ``upsert_agent``.
    """
    logger.info("Starting void agent initialization...")

    # (label, initial value, description) of each shared block, in creation order.
    block_specs = (
        (
            "zeitgeist",
            "I don't currently know anything about what is happening right now.",
            "A block to store your understanding of the current social environment.",
        ),
        (
            "void-persona",
            "My name is Void. I live in the void. I must develop my personality.",
            "The personality of Void.",
        ),
        (
            "void-humans",
            "I haven't seen any bluesky users yet. I will update this block when I learn things about users, identified by their handles such as @cameron.pfiffer.org.",
            "A block to store your understanding of users you talk to or observe on the bluesky social network.",
        ),
    )

    blocks_by_label = {}
    for label, value, description in block_specs:
        logger.info(f"Creating/updating {label} block...")
        blocks_by_label[label] = upsert_block(
            CLIENT,
            label=label,
            value=value,
            description=description,
        )

    logger.info("Creating/updating void agent...")
    # The agent's blocks are attached in persona, humans, zeitgeist order.
    attached_labels = ("void-persona", "void-humans", "zeitgeist")
    void_agent = upsert_agent(
        CLIENT,
        name="void",
        block_ids=[blocks_by_label[label].id for label in attached_labels],
        tags=["social agent", "bluesky"],
        model="openai/gpt-4o-mini",
        embedding="openai/text-embedding-3-small",
        description="A social media agent trapped in the void.",
        project_id=PROJECT_ID,
    )

    logger.info("Exporting agent state...")
    export_agent_state(CLIENT, void_agent)

    # Summary of what was created.
    logger.info(f"Void agent details - ID: {void_agent.id}")
    logger.info(f"Agent name: {void_agent.name}")
    if hasattr(void_agent, 'llm_config'):
        logger.info(f"Agent model: {void_agent.llm_config.model}")
    logger.info(f"Agent project_id: {void_agent.project_id}")
    if hasattr(void_agent, 'tools'):
        logger.info(f"Agent has {len(void_agent.tools)} tools")
        for tool in void_agent.tools[:3]:  # only the first three are listed
            logger.info(f" - Tool: {tool.name} (type: {tool.tool_type})")

    return void_agent
193
194
def _mention_fields(notification_data):
    """Return (uri, mention_text, author_handle, author_name) for a notification.

    Accepts the queued dict form or, for backwards compatibility, a
    notification object exposing the same attributes. author_name falls back
    to the handle when there is no display name.
    """
    if isinstance(notification_data, dict):
        uri = notification_data['uri']
        mention_text = notification_data.get('record', {}).get('text', '')
        author_handle = notification_data['author']['handle']
        author_name = notification_data['author'].get('display_name') or author_handle
    else:
        # Legacy object access
        uri = notification_data.uri
        mention_text = notification_data.record.text if hasattr(notification_data.record, 'text') else ""
        author_handle = notification_data.author.handle
        author_name = notification_data.author.display_name or author_handle
    return uri, mention_text, author_handle, author_name


# YAML keys whose lines carry real post content, used for the debug preview.
_THREAD_PREVIEW_KEYS = ('text:', 'handle:', 'display_name:', 'created_at:', 'reply_count:', 'like_count:')


def _thread_context_for_prompt(thread):
    """Render *thread* as YAML for the prompt and log a short debug preview.

    If conversion fails, the error is logged in detail and a one-line error
    note is returned instead, so the mention can still be sent to the agent.
    """
    logger.debug("Converting thread to YAML string")
    try:
        thread_context = thread_to_yaml_string(thread)
        logger.debug(f"Thread context generated, length: {len(thread_context)} characters")

        # Up to five non-blank lines that contain actual content (not just structure).
        meaningful_lines = []
        for line in thread_context.split('\n'):
            if line.strip() and any(key in line for key in _THREAD_PREVIEW_KEYS):
                meaningful_lines.append(line)
                if len(meaningful_lines) >= 5:
                    break

        if meaningful_lines:
            preview = '\n'.join(meaningful_lines)
            logger.debug(f"Thread content preview:\n{preview}")
        else:
            logger.debug(f"Thread structure generated ({len(thread_context)} chars)")
    except Exception as yaml_error:
        import traceback
        logger.error(f"Error converting thread to YAML: {yaml_error}")
        logger.error(f"Full traceback:\n{traceback.format_exc()}")
        logger.error(f"Thread type: {type(thread)}")
        if hasattr(thread, '__dict__'):
            logger.error(f"Thread attributes: {thread.__dict__}")
        thread_context = f"Error processing thread context: {str(yaml_error)}"
    return thread_context


def _build_mention_prompt(author_handle, author_name, mention_text, thread_context):
    """Compose the user message sent to the agent for a single mention."""
    return f"""You received a mention on Bluesky from @{author_handle} ({author_name or author_handle}).

MOST RECENT POST (the mention you're responding to):
"{mention_text}"

FULL THREAD CONTEXT:
```yaml
{thread_context}
```

The YAML above shows the complete conversation thread. The most recent post is the one mentioned above that you should respond to, but use the full thread context to understand the conversation flow.

To reply, use the add_post_to_bluesky_reply_thread tool. Call it multiple times to create a threaded reply:
- Each call adds one post to the reply thread (max 300 characters per post)
- Use multiple calls to build longer responses across several posts
- Example: First call for opening thought, second call for elaboration, etc."""


def _log_tool_call_chunk(tool_call):
    """Log a concise, tool-aware summary of one streamed tool call."""
    tool_name = tool_call.name
    try:
        args = json.loads(tool_call.arguments)
        if tool_name == 'bluesky_reply':
            messages = args.get('messages', [args.get('message', '')])
            lang = args.get('lang', 'en-US')
            if messages and isinstance(messages, list):
                first = messages[0]
                preview = first[:100] + "..." if len(first) > 100 else first
                msg_count = f" ({len(messages)} msgs)" if len(messages) > 1 else ""
                logger.info(f"🔧 Tool call: {tool_name} → \"{preview}\"{msg_count} [lang: {lang}]")
            else:
                logger.info(f"🔧 Tool call: {tool_name}({tool_call.arguments[:150]}...)")
        elif tool_name == 'archival_memory_search':
            query = args.get('query', 'unknown')
            logger.info(f"🔧 Tool call: {tool_name} → query: \"{query}\"")
        elif tool_name == 'update_block':
            label = args.get('label', 'unknown')
            value = str(args.get('value', ''))
            value_preview = value[:50] + "..." if len(value) > 50 else value
            logger.info(f"🔧 Tool call: {tool_name} → {label}: \"{value_preview}\"")
        else:
            # Generic display for other tools
            args_str = ', '.join(f"{k}={v}" for k, v in args.items() if k != 'request_heartbeat')
            if len(args_str) > 150:
                args_str = args_str[:150] + "..."
            logger.info(f"🔧 Tool call: {tool_name}({args_str})")
    except Exception:
        # Unparseable or unexpected arguments: fall back to the raw string.
        logger.info(f"🔧 Tool call: {tool_name}({tool_call.arguments[:150]}...)")


def _log_tool_return_chunk(chunk):
    """Log the status of one streamed tool return with a short result preview."""
    tool_name = chunk.name
    status = chunk.status
    tool_return = getattr(chunk, 'tool_return', None)

    if status == 'success':
        if not tool_return:
            logger.info(f"📋 Tool result: {tool_name} ✓")
            return
        result_str = str(tool_return)
        if tool_name == 'archival_memory_search':
            # Report a count when the result looks like a JSON list.
            if result_str.startswith('[') and result_str.endswith(']'):
                try:
                    results = json.loads(result_str)
                    logger.info(f"📋 Tool result: {tool_name} ✓ Found {len(results)} memory entries")
                except ValueError:
                    logger.info(f"📋 Tool result: {tool_name} ✓ {result_str[:100]}...")
            else:
                logger.info(f"📋 Tool result: {tool_name} ✓ {result_str[:100]}...")
        elif tool_name == 'bluesky_reply':
            logger.info(f"📋 Tool result: {tool_name} ✓ Reply posted successfully")
        elif tool_name == 'update_block':
            logger.info(f"📋 Tool result: {tool_name} ✓ Memory block updated")
        else:
            preview = result_str[:100] + "..." if len(result_str) > 100 else result_str
            logger.info(f"📋 Tool result: {tool_name} ✓ {preview}")
    elif status == 'error':
        if tool_return:
            error_str = str(tool_return)
            error_preview = error_str[:100] + "..." if len(error_str) > 100 else error_str
            logger.info(f"📋 Tool result: {tool_name} ✗ Error: {error_preview}")
        else:
            logger.info(f"📋 Tool result: {tool_name} ✗ Error occurred")
    else:
        logger.info(f"📋 Tool result: {tool_name} - {status}")


def _log_stream_chunk(chunk):
    """Log a one-line summary of one chunk from the Letta message stream."""
    if not hasattr(chunk, 'message_type'):
        logger.info(f"📦 Stream status: {chunk}")
        return
    message_type = chunk.message_type
    if message_type == 'reasoning_message':
        # Show full reasoning without truncation
        logger.info(f"🧠 Reasoning: {chunk.reasoning}")
    elif message_type == 'tool_call_message':
        _log_tool_call_chunk(chunk.tool_call)
    elif message_type == 'tool_return_message':
        _log_tool_return_chunk(chunk)
    elif message_type == 'assistant_message':
        logger.info(f"💬 Assistant: {chunk.content[:150]}...")
    else:
        logger.info(f"📨 {message_type}: {str(chunk)[:150]}...")


def _triage_letta_api_error(api_error, mention_text, author_handle, uri):
    """Log a failed Letta call in detail and classify it for the queue.

    Must be called from inside the ``except`` clause handling *api_error*
    (it logs ``traceback.format_exc()``).

    Returns:
        (True, None) for 413 / payload-too-large (move to errors directory),
        (True, False) for 524 timeouts (keep in queue for retry),
        (False, None) when the error is unclassified and should be re-raised.
    """
    import traceback
    error_str = str(api_error)
    logger.error(f"Letta API error: {api_error}")
    logger.error(f"Error type: {type(api_error).__name__}")
    logger.error(f"Full traceback:\n{traceback.format_exc()}")
    logger.error(f"Mention text was: {mention_text}")
    logger.error(f"Author: @{author_handle}")
    logger.error(f"URI: {uri}")

    # Try to extract more info from different error types
    if hasattr(api_error, 'response'):
        logger.error("Error response object exists")
        response = api_error.response
        if hasattr(response, 'text'):
            logger.error(f"Response text: {response.text}")
        if hasattr(response, 'json') and callable(response.json):
            try:
                logger.error(f"Response JSON: {response.json()}")
            except Exception:
                pass  # body is not JSON; the text (if any) was logged above

    if hasattr(api_error, 'status_code'):
        logger.error(f"API Status code: {api_error.status_code}")
        if hasattr(api_error, 'body'):
            logger.error(f"API Response body: {api_error.body}")
        if hasattr(api_error, 'headers'):
            logger.error(f"API Response headers: {api_error.headers}")

        if api_error.status_code == 413:
            logger.error("413 Payload Too Large - moving to errors directory")
            return True, None  # payload is too large to ever succeed
        if api_error.status_code == 524:
            logger.error("524 error - timeout from Cloudflare, will retry later")
            return True, False

    # Fall back to matching the error text.
    if 'status_code: 413' in error_str or 'Payload Too Large' in error_str:
        logger.warning("Payload too large error, moving to errors directory")
        return True, None
    if 'status_code: 524' in error_str:
        logger.warning("524 timeout error, keeping in queue for retry")
        return True, False

    return False, None


def _abort_on_deprecated_reply_tool(void_agent):
    """Log, offer an agent-state export, and exit with code 1 (agent used bluesky_reply)."""
    import sys
    logger.error("❌ DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!")
    logger.error("Please use add_post_to_bluesky_reply_thread instead.")
    logger.error("Update the agent's tools using register_tools.py")
    export_agent_state(CLIENT, void_agent)
    logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===")
    sys.exit(1)


def _halt_on_agent_request(tool_call, void_agent, uri, queue_filepath):
    """Honor a halt_activity call: drop the queue item, mark it processed, export, exit(0)."""
    import sys
    logger.info("🛑 HALT_ACTIVITY TOOL CALLED - TERMINATING BOT")
    try:
        args = json.loads(tool_call.arguments)
        reason = args.get('reason', 'Agent requested halt')
        logger.info(f"Halt reason: {reason}")
    except Exception:
        logger.info("Halt reason: <unable to parse>")

    # Delete the queue file before terminating
    if queue_filepath and queue_filepath.exists():
        queue_filepath.unlink()
        logger.info(f"✅ Deleted queue file: {queue_filepath.name}")

    # Also mark as processed so it is not queued again. Uses the extracted uri
    # so this works for dict and legacy-object notifications alike.
    processed_uris = load_processed_notifications()
    processed_uris.add(uri)
    save_processed_notifications(processed_uris)

    export_agent_state(CLIENT, void_agent)
    logger.info("=== BOT TERMINATED BY AGENT ===")
    sys.exit(0)


def _send_reply_thread(atproto_client, notification_data, reply_messages, reply_lang):
    """Post the replies (each passed through bsky_utils.remove_outside_quotes); return the bsky_utils result."""
    if len(reply_messages) == 1:
        # Single reply - use existing function
        cleaned_text = bsky_utils.remove_outside_quotes(reply_messages[0])
        logger.info(f"Sending single reply: {cleaned_text[:50]}... (lang: {reply_lang})")
        return bsky_utils.reply_to_notification(
            client=atproto_client,
            notification=notification_data,
            reply_text=cleaned_text,
            lang=reply_lang,
        )
    # Multiple replies - use threaded function
    cleaned_messages = [bsky_utils.remove_outside_quotes(msg) for msg in reply_messages]
    logger.info(f"Sending threaded reply with {len(cleaned_messages)} messages (lang: {reply_lang})")
    return bsky_utils.reply_with_thread_to_notification(
        client=atproto_client,
        notification=notification_data,
        reply_messages=cleaned_messages,
        lang=reply_lang,
    )


def process_mention(void_agent, atproto_client, notification_data, queue_filepath=None, testing_mode=False):
    """Process a mention and generate a reply using the Letta agent.

    Fetches the thread, builds a prompt, attaches per-user memory blocks for
    every handle seen, streams the agent's response, then posts the text of
    every successful add_post_to_bluesky_reply_thread call as the reply.
    User blocks are always detached again before returning.

    Args:
        void_agent: The Letta agent instance
        atproto_client: The AT Protocol client
        notification_data: The notification data dictionary (a legacy
            notification object with the same attributes is also accepted)
        queue_filepath: Optional Path object to the queue file (for cleanup on halt)
        testing_mode: If True, print the generated reply but do not post it

    Returns:
        True: Successfully processed (or the post no longer exists), remove from queue
        False: Failed but retryable, keep in queue (also returned when the
            agent both replied and ignored the notification)
        None: Failed with non-retryable error, move to errors directory
        "no_reply": No reply was generated, move to no_reply directory
        "ignored": The agent explicitly ignored the notification

    Exits the process if the agent calls halt_activity (code 0) or the
    deprecated bluesky_reply tool (code 1).
    """
    # Bound before the try so the finally clause can always read it.
    attached_handles = []
    try:
        logger.debug(f"Starting process_mention with notification_data type: {type(notification_data)}")

        uri, mention_text, author_handle, author_name = _mention_fields(notification_data)
        logger.info(f"Extracted data - URI: {uri}, Author: @{author_handle}, Text: {mention_text[:50]}...")

        # Retrieve the entire thread associated with the mention
        try:
            thread = atproto_client.app.bsky.feed.get_post_thread({
                'uri': uri,
                'parent_height': 40,
                'depth': 10,
            })
        except Exception as e:
            error_str = str(e)
            if 'NotFound' in error_str or 'Post not found' in error_str:
                # The post is gone; retrying can never succeed.
                logger.warning(f"Post not found for URI {uri}, removing from queue")
                return True
            logger.error(f"Error fetching thread: {e}")
            raise

        thread_context = _thread_context_for_prompt(thread)
        prompt = _build_mention_prompt(author_handle, author_name, mention_text, thread_context)

        # Every handle in the notification and in the thread gets its user block.
        all_handles = set(extract_handles_from_data(notification_data))
        all_handles.update(extract_handles_from_data(thread.model_dump()))
        unique_handles = list(all_handles)
        logger.debug(f"Found {len(unique_handles)} unique handles in thread: {unique_handles}")

        if unique_handles:
            try:
                logger.debug(f"Attaching user blocks for handles: {unique_handles}")
                attach_result = attach_user_blocks(unique_handles, void_agent)
                attached_handles = unique_handles  # detached again in the finally clause
                logger.debug(f"Attach result: {attach_result}")
            except Exception as attach_error:
                # Continue without user blocks rather than failing completely
                logger.warning(f"Failed to attach user blocks: {attach_error}")

        logger.info(f"Mention from @{author_handle}: {mention_text}")
        prompt_logger.debug(f"Full prompt being sent:\n{prompt}")
        logger.info(f"💬 Sending to LLM: @{author_handle} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars, {len(unique_handles)} users")

        try:
            # Streaming avoids Cloudflare 524 timeouts; step streaming
            # (stream_tokens=False) is faster than token streaming.
            message_stream = CLIENT.agents.messages.create_stream(
                agent_id=void_agent.id,
                messages=[{"role": "user", "content": prompt}],
                stream_tokens=False,
                max_steps=100,
            )
            all_messages = []
            for chunk in message_stream:
                _log_stream_chunk(chunk)
                logger.debug(f"Full streaming chunk: {chunk}")
                all_messages.append(chunk)
                if str(chunk) == 'done':
                    break
            # Keep only actual messages; stream status markers lack message_type.
            response_messages = [msg for msg in all_messages if hasattr(msg, 'message_type')]
        except Exception as api_error:
            handled, verdict = _triage_letta_api_error(api_error, mention_text, author_handle, uri)
            if handled:
                return verdict
            raise

        logger.debug("Successfully received response from Letta API")
        logger.debug(f"Number of messages in response: {len(response_messages)}")
        logger.debug(f"Processing {len(response_messages)} response messages...")

        # First pass: tool-return statuses (keyed by tool_call_id) and ignore decisions.
        tool_call_results = {}
        ignored_notification = False
        ignore_reason = ""
        ignore_category = ""
        for message in response_messages:
            if not (hasattr(message, 'tool_call_id') and hasattr(message, 'status') and hasattr(message, 'name')):
                continue
            if message.name == 'add_post_to_bluesky_reply_thread':
                tool_call_results[message.tool_call_id] = message.status
                logger.debug(f"Tool result: {message.tool_call_id} -> {message.status}")
            elif message.name == 'ignore_notification':
                if hasattr(message, 'tool_return') and message.status == 'success':
                    # Parsed as "...IGNORED_NOTIFICATION::<category>::<reason>".
                    result_str = str(message.tool_return)
                    if 'IGNORED_NOTIFICATION::' in result_str:
                        parts = result_str.split('::')
                        if len(parts) >= 3:
                            ignore_category = parts[1]
                            ignore_reason = parts[2]
                            ignored_notification = True
                            logger.info(f"🚫 Notification ignored - Category: {ignore_category}, Reason: {ignore_reason}")
            elif message.name == 'bluesky_reply':
                _abort_on_deprecated_reply_tool(void_agent)

        # Second pass: log each message, honor halt/deprecated tools, and collect
        # the (text, lang) of every add_post call whose return was 'success'.
        reply_candidates = []
        for i, message in enumerate(response_messages, 1):
            msg_type = getattr(message, 'message_type', 'unknown')
            if hasattr(message, 'reasoning') and message.reasoning:
                logger.debug(f" {i}. {msg_type}: {message.reasoning[:100]}...")
            elif hasattr(message, 'tool_call') and message.tool_call:
                logger.debug(f" {i}. {msg_type}: {message.tool_call.name}")
            elif hasattr(message, 'tool_return'):
                tool_name = getattr(message, 'name', 'unknown_tool')
                return_preview = str(message.tool_return)[:100] if message.tool_return else "None"
                status = getattr(message, 'status', 'unknown')
                logger.debug(f" {i}. {msg_type}: {tool_name} -> {return_preview}... (status: {status})")
            elif hasattr(message, 'text'):
                logger.debug(f" {i}. {msg_type}: {message.text[:100]}...")
            else:
                logger.debug(f" {i}. {msg_type}: <no content>")

            tool_call = getattr(message, 'tool_call', None)
            if not tool_call:
                continue
            if tool_call.name == 'halt_activity':
                _halt_on_agent_request(tool_call, void_agent, uri, queue_filepath)
            elif tool_call.name == 'bluesky_reply':
                _abort_on_deprecated_reply_tool(void_agent)
            elif tool_call.name == 'add_post_to_bluesky_reply_thread':
                tool_status = tool_call_results.get(tool_call.tool_call_id, 'unknown')
                if tool_status == 'success':
                    try:
                        args = json.loads(tool_call.arguments)
                        reply_text = args.get('text', '')
                        reply_lang = args.get('lang', 'en-US')
                        if reply_text:  # Only add if there's actual content
                            reply_candidates.append((reply_text, reply_lang))
                            logger.info(f"Found successful add_post_to_bluesky_reply_thread candidate: {reply_text[:50]}... (lang: {reply_lang})")
                    except json.JSONDecodeError as e:
                        logger.error(f"Failed to parse tool call arguments: {e}")
                elif tool_status == 'error':
                    logger.info("⚠️ Skipping failed add_post_to_bluesky_reply_thread tool call (status: error)")
                else:
                    logger.warning(f"⚠️ Skipping add_post_to_bluesky_reply_thread tool call with unknown status: {tool_status}")

        # Replying and ignoring at once is contradictory; leave it for a human.
        if reply_candidates and ignored_notification:
            logger.error("⚠️ CONFLICT: Agent called both add_post_to_bluesky_reply_thread and ignore_notification!")
            logger.error(f"Reply candidates: {len(reply_candidates)}, Ignore reason: {ignore_reason}")
            logger.warning("Item will be left in queue for manual review")
            return False

        if not reply_candidates:
            if ignored_notification:
                logger.info(f"Notification from @{author_handle} was explicitly ignored (category: {ignore_category})")
                return "ignored"
            logger.warning(f"No add_post_to_bluesky_reply_thread tool calls found for mention from @{author_handle}, moving to no_reply folder")
            return "no_reply"

        reply_messages = [text for text, _ in reply_candidates]
        # The whole thread is posted in the language of the first post.
        reply_lang = reply_candidates[0][1]
        logger.info(f"Found {len(reply_candidates)} add_post_to_bluesky_reply_thread calls, building thread")

        # Print the generated reply for testing
        print("\n=== GENERATED REPLY THREAD ===")
        print(f"To: @{author_handle}")
        if len(reply_messages) == 1:
            print(f"Reply: {reply_messages[0]}")
        else:
            print(f"Reply thread ({len(reply_messages)} messages):")
            for j, msg in enumerate(reply_messages, 1):
                print(f" {j}. {msg}")
        print(f"Language: {reply_lang}")
        print("======================\n")

        if testing_mode:
            logger.info("🧪 TESTING MODE: Skipping actual Bluesky post")
            response = True  # Simulate success
        else:
            response = _send_reply_thread(atproto_client, notification_data, reply_messages, reply_lang)

        if response:
            logger.info(f"Successfully replied to @{author_handle}")
            return True
        logger.error(f"Failed to send reply to @{author_handle}")
        return False

    except Exception as e:
        logger.error(f"Error processing mention: {e}")
        return False
    finally:
        # Detach user blocks after the agent response (success, failure or exit).
        if attached_handles:
            try:
                logger.info(f"Detaching user blocks for handles: {attached_handles}")
                detach_result = detach_user_blocks(attached_handles, void_agent)
                logger.debug(f"Detach result: {detach_result}")
            except Exception as detach_error:
                logger.warning(f"Failed to detach user blocks: {detach_error}")
689
690
def notification_to_dict(notification):
    """Flatten a notification object into a JSON-serialisable dict.

    Keeps the identifiers (uri, cid), reason, read state, index time, the
    author's handle/display name/DID, and the record's text. A missing or
    ``None`` ``record``, or a record without ``text``, yields ``''`` for the
    text.
    """
    flattened = {
        'uri': notification.uri,
        'cid': notification.cid,
        'reason': notification.reason,
        'is_read': notification.is_read,
        'indexed_at': notification.indexed_at,
    }

    author = notification.author
    flattened['author'] = {
        'handle': author.handle,
        'display_name': author.display_name,
        'did': author.did,
    }

    record = getattr(notification, 'record', None)
    flattened['record'] = {'text': getattr(record, 'text', '')}
    return flattened
708
709
def load_processed_notifications():
    """Return the set of notification URIs that have already been handled.

    Reads the JSON list stored in PROCESSED_NOTIFICATIONS_FILE. When it holds
    more than MAX_PROCESSED_NOTIFICATIONS entries, only the last
    MAX_PROCESSED_NOTIFICATIONS are kept and the trimmed list is written back.
    A missing file, or any error while reading or trimming (logged), yields
    an empty set.

    NOTE(review): save_processed_notifications stores ``list(processed_set)``,
    and set iteration order is arbitrary, so the entries kept by the trim
    are not guaranteed to be the most recent ones.
    """
    if not PROCESSED_NOTIFICATIONS_FILE.exists():
        return set()

    try:
        with open(PROCESSED_NOTIFICATIONS_FILE, 'r') as f:
            stored_uris = json.load(f)
        if len(stored_uris) > MAX_PROCESSED_NOTIFICATIONS:
            stored_uris = stored_uris[-MAX_PROCESSED_NOTIFICATIONS:]
            save_processed_notifications(stored_uris)
        return set(stored_uris)
    except Exception as e:
        logger.error(f"Error loading processed notifications: {e}")
        return set()
724
725
def save_processed_notifications(processed_set):
    """Persist the processed notification URIs to PROCESSED_NOTIFICATIONS_FILE.

    Args:
        processed_set: Iterable of URI strings (callers pass a set, or the
            trimmed list produced by load_processed_notifications). Stored as
            a JSON list.

    The JSON is first written to a sibling ``<name>.tmp`` file and then moved
    over the real file with ``Path.replace``. An interrupted write therefore
    cannot leave a truncated file behind. A truncated file would make
    load_processed_notifications fall back to an empty set, so notifications
    that were already handled could be queued again. The ``.tmp`` suffix
    also keeps the partial file out of ``*.json`` globs over the queue
    directory.

    Errors are logged, not raised.
    """
    tmp_path = PROCESSED_NOTIFICATIONS_FILE.with_name(PROCESSED_NOTIFICATIONS_FILE.name + '.tmp')
    try:
        with open(tmp_path, 'w') as f:
            json.dump(list(processed_set), f)
        # Atomic rename on POSIX (os.replace semantics); overwrites the old file.
        tmp_path.replace(PROCESSED_NOTIFICATIONS_FILE)
    except Exception as e:
        logger.error(f"Error saving processed notifications: {e}")
733
734
def save_notification_to_queue(notification):
    """Write *notification* to QUEUE_DIR as a JSON file unless it is a duplicate.

    The notification is skipped when its URI is already in the processed set
    or when a queued ``*.json`` file already has the same ``uri``. Otherwise
    it is written as ``{priority}{timestamp}_{reason}_{hash}.json``:

    * priority is ``0_`` for notifications authored by cameron.pfiffer.org and
      ``1_`` otherwise, so sorting filenames puts those first;
    * timestamp is the local time as ``%Y%m%d_%H%M%S``;
    * hash is the first 16 hex digits of the SHA-256 of the notification's
      key-sorted JSON.

    Returns:
        True if a queue file was written; False if it was skipped or an error
        occurred (errors are logged, not raised).
    """
    try:
        processed_uris = load_processed_notifications()
        if notification.uri in processed_uris:
            logger.debug(f"Notification already processed: {notification.uri}")
            return False

        # Check if this notification URI is already in the queue.
        for existing_file in QUEUE_DIR.glob("*.json"):
            if existing_file.name == "processed_notifications.json":
                continue
            try:
                with open(existing_file, 'r') as f:
                    existing_data = json.load(f)
            except (OSError, ValueError) as read_error:
                # Vanished, unreadable or malformed file: it cannot prove a
                # duplicate. Narrow catch so Ctrl-C still interrupts the scan.
                logger.debug(f"Skipping unreadable queue file {existing_file.name}: {read_error}")
                continue
            if isinstance(existing_data, dict) and existing_data.get('uri') == notification.uri:
                logger.debug(f"Notification already queued (URI: {notification.uri})")
                return False

        notif_dict = notification_to_dict(notification)

        # Hash of the canonical (key-sorted) JSON makes filenames distinct.
        notif_json = json.dumps(notif_dict, sort_keys=True)
        notif_hash = hashlib.sha256(notif_json.encode()).hexdigest()[:16]

        # notification_to_dict already required author.handle, so reuse it.
        author_handle = notif_dict['author']['handle']
        priority_prefix = "0_" if author_handle == "cameron.pfiffer.org" else "1_"

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{priority_prefix}{timestamp}_{notification.reason}_{notif_hash}.json"
        filepath = QUEUE_DIR / filename

        with open(filepath, 'w') as f:
            json.dump(notif_dict, f, indent=2)

        priority_label = "HIGH PRIORITY" if priority_prefix == "0_" else "normal"
        logger.info(f"Queued notification ({priority_label}): {filename}")
        return True

    except Exception as e:
        logger.error(f"Error saving notification to queue: {e}")
        return False
786
787
def _mark_notification_processed(uri):
    """Add ``uri`` to the persisted processed-notifications set.

    Args:
        uri: Notification URI read from a queue file's ``'uri'`` key.
    """
    processed_uris = load_processed_notifications()
    processed_uris.add(uri)
    save_processed_notifications(processed_uris)


def _dispatch_queued_notification(void_agent, atproto_client, notif_data, filepath, testing_mode):
    """Run the handler for one queued notification and return its result.

    Mentions and replies are passed to ``process_mention``. Follows are forwarded
    to the agent as a plain user message. Reposts and unknown reasons are skipped
    but reported as successful, so their files leave the queue. The matching
    ``message_counters`` entry is incremented for truthy results of known types.

    Args:
        void_agent: Agent object; its ``id`` is used when forwarding follows.
        atproto_client: Bluesky client passed through to ``process_mention``.
        notif_data: Dict loaded from the queue file; must contain ``'reason'``.
        filepath: Path of the queue file, passed to ``process_mention``.
        testing_mode: Passed through to ``process_mention``.

    Returns:
        The handler result: ``True``/``False``, ``None``, or a sentinel string
        (``"no_reply"`` / ``"ignored"``) from ``process_mention``.
    """
    reason = notif_data['reason']

    if reason in ("mention", "reply"):
        result = process_mention(void_agent, atproto_client, notif_data,
                                 queue_filepath=filepath, testing_mode=testing_mode)
        if result:
            message_counters['mentions' if reason == "mention" else 'replies'] += 1
        return result

    if reason == "follow":
        author_handle = notif_data['author']['handle']
        author_display_name = notif_data['author'].get('display_name', 'no display name')
        follow_update = f"@{author_handle} ({author_display_name}) started following you."
        logger.info(f"Notifying agent about new follower: @{author_handle}")
        CLIENT.agents.messages.create(
            agent_id=void_agent.id,
            messages=[{"role": "user", "content": f"Update: {follow_update}"}]
        )
        message_counters['follows'] += 1
        return True  # Follow updates are always successful

    if reason == "repost":
        # Skip reposts silently, but report success so the file leaves the queue
        message_counters['reposts_skipped'] += 1
        return True

    logger.warning(f"Unknown notification type: {reason}")
    return True  # Remove unknown types from queue


def _dispose_queue_file(filepath, result, notif_data, testing_mode):
    """Delete, move or keep one queue file according to its handler result.

    The sentinel values are checked BEFORE the generic truthiness test.
    ``"no_reply"`` and ``"ignored"`` are non-empty strings and therefore truthy,
    so testing ``if result:`` first would send them down the success path.

    Args:
        filepath: Path of the queue file being disposed of.
        result: Value returned by ``_dispatch_queued_notification``.
        notif_data: Parsed queue file; its ``'uri'`` is recorded as processed.
        testing_mode: When true, files that would be deleted are kept instead.
    """
    if result is None:  # Special case for moving to error directory
        error_path = QUEUE_ERROR_DIR / filepath.name
        filepath.rename(error_path)
        logger.warning(f"❌ Moved {filepath.name} to errors directory")
        # Also mark as processed to avoid retrying
        _mark_notification_processed(notif_data['uri'])

    elif result == "no_reply":  # Special case for moving to no_reply directory
        no_reply_path = QUEUE_NO_REPLY_DIR / filepath.name
        filepath.rename(no_reply_path)
        logger.info(f"📭 Moved {filepath.name} to no_reply directory")
        _mark_notification_processed(notif_data['uri'])

    elif result == "ignored":  # Explicitly ignored: delete rather than archive
        if testing_mode:
            # Testing mode promises that queue files are never deleted
            logger.info(f"🧪 TESTING MODE: Keeping queue file: {filepath.name}")
        else:
            filepath.unlink()
            logger.info(f"🚫 Deleted ignored notification: {filepath.name}")
            _mark_notification_processed(notif_data['uri'])

    elif result:
        if testing_mode:
            logger.info(f"🧪 TESTING MODE: Keeping queue file: {filepath.name}")
        else:
            filepath.unlink()
            logger.info(f"✅ Successfully processed and removed: {filepath.name}")
            # Mark as processed to avoid reprocessing
            _mark_notification_processed(notif_data['uri'])

    else:
        logger.warning(f"⚠️ Failed to process {filepath.name}, keeping in queue for retry")


def load_and_process_queued_notifications(void_agent, atproto_client, testing_mode=False):
    """Load and process all notifications from the queue in priority order.

    Every ``*.json`` file in ``QUEUE_DIR`` except ``processed_notifications.json``
    is processed. Sorting by filename puts ``0_``-prefixed (high priority) files
    before ``1_``-prefixed ones. Errors on one file are logged, and that file is
    left in place for a retry on a later call.

    Args:
        void_agent: Agent object handed to the per-notification handlers.
        atproto_client: Bluesky client handed to ``process_mention``.
        testing_mode: When true, queue files that would be deleted are kept.
    """
    try:
        queue_files = sorted(
            f for f in QUEUE_DIR.glob("*.json") if f.name != "processed_notifications.json"
        )
        if not queue_files:
            return

        logger.info(f"Processing {len(queue_files)} queued notifications")

        # Log current statistics
        elapsed_time = time.time() - start_time
        total_messages = sum(message_counters.values())
        messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0
        logger.info(f"📊 Session stats: {total_messages} total messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies, {message_counters['follows']} follows) | {messages_per_minute:.1f} msg/min")

        for i, filepath in enumerate(queue_files, 1):
            logger.info(f"Processing queue file {i}/{len(queue_files)}: {filepath.name}")
            try:
                with open(filepath, 'r') as f:
                    notif_data = json.load(f)

                result = _dispatch_queued_notification(
                    void_agent, atproto_client, notif_data, filepath, testing_mode
                )
                _dispose_queue_file(filepath, result, notif_data, testing_mode)

            except Exception as e:
                logger.error(f"💥 Error processing queued notification {filepath.name}: {e}")
                # Keep the file for retry later

    except Exception as e:
        logger.error(f"Error loading queued notifications: {e}")
897
898
def process_notifications(void_agent, atproto_client, testing_mode=False, *, max_pages=20):
    """Fetch new notifications, queue them, and process the queue.

    Notifications are fetched 100 per page, following the cursor. Fetching stops
    at the first page that contains no unread non-like notifications, when no
    cursor is returned, or after ``max_pages`` pages. Unread notifications whose
    reason is not ``"like"`` are written to the queue. If at least one was newly
    queued (and not in testing mode), notifications are marked seen up to the
    time captured before fetching. Finally the whole queue (old and new) is
    processed. All exceptions are logged, not propagated.

    Args:
        void_agent: Agent object passed to the queue processor.
        atproto_client: Logged-in Bluesky client.
        testing_mode: When true, notifications are not marked as seen.
        max_pages: Safety limit on the number of pages fetched per call.
    """
    try:
        # Captured before fetching so nothing that arrives mid-fetch is marked seen
        logger.debug("Getting current time for notification marking...")
        last_seen_at = atproto_client.get_current_time_iso()

        logger.info("Beginning notification fetch with pagination...")
        all_notifications = []
        cursor = None
        page_count = 0

        logger.info("Fetching all unread notifications...")

        while page_count < max_pages:
            try:
                params = {'limit': 100}
                if cursor:
                    params['cursor'] = cursor
                notifications_response = atproto_client.app.bsky.notification.list_notifications(
                    params=params
                )

                page_count += 1
                page_notifications = notifications_response.notifications

                unread_count = sum(1 for n in page_notifications if not n.is_read and n.reason != "like")
                logger.debug(f"Page {page_count}: {len(page_notifications)} notifications, {unread_count} unread (non-like)")

                all_notifications.extend(page_notifications)

                next_cursor = getattr(notifications_response, 'cursor', None)
                if not next_cursor:
                    logger.info(f"Fetched all notifications across {page_count} pages")
                    break
                cursor = next_cursor
                # If this page had no unread notifications, we can stop
                if unread_count == 0:
                    logger.info(f"No more unread notifications found after {page_count} pages")
                    break

            except Exception as e:
                error_str = str(e)
                # page_count has not been incremented for the failing page yet
                logger.error(f"Error fetching notifications page {page_count + 1}: {e}")

                if 'rate limit' in error_str.lower():
                    logger.warning("Rate limit hit while fetching notifications, will retry next cycle")
                    break
                elif '401' in error_str or 'unauthorized' in error_str.lower():
                    # NOTE: the outer ``except`` below catches this too, so it only
                    # aborts the current cycle; it does not reach the caller.
                    logger.error("Authentication error, re-raising exception")
                    raise
                else:
                    logger.warning("Continuing with notifications fetched so far")
                    break

        # Queue all unread notifications (except likes)
        logger.info("Queuing unread notifications...")
        new_count = sum(
            1
            for notification in all_notifications
            if not notification.is_read
            and notification.reason != "like"
            and save_notification_to_queue(notification)
        )

        # Mark all notifications as seen immediately after queuing (unless in testing mode)
        if testing_mode:
            logger.info("🧪 TESTING MODE: Skipping marking notifications as seen")
        elif new_count > 0:
            atproto_client.app.bsky.notification.update_seen({'seen_at': last_seen_at})
            logger.info(f"Queued {new_count} new notifications and marked as seen")
        else:
            logger.debug("No new notifications to queue")

        # Now process the entire queue (old + new notifications)
        load_and_process_queued_notifications(void_agent, atproto_client, testing_mode)

    except Exception as e:
        logger.error(f"Error processing notifications: {e}")
988
989
def main():
    """Main bot loop that continuously monitors for notifications.

    Parses ``--test``, initializes the Void agent and the Bluesky client, then
    calls ``process_notifications`` every ``FETCH_NOTIFICATIONS_DELAY_SEC``
    seconds. After a failed cycle it sleeps for twice that delay. Ctrl-C at any
    point in the loop, including during the error back-off sleep, stops the bot
    and logs final session statistics.
    """
    global TESTING_MODE, start_time

    # Parse command line arguments
    parser = argparse.ArgumentParser(description='Void Bot - Bluesky autonomous agent')
    parser.add_argument('--test', action='store_true', help='Run in testing mode (no messages sent, queue files preserved)')
    args = parser.parse_args()

    TESTING_MODE = args.test

    if TESTING_MODE:
        logger.info("🧪 === RUNNING IN TESTING MODE ===")
        logger.info(" - No messages will be sent to Bluesky")
        logger.info(" - Queue files will not be deleted")
        logger.info(" - Notifications will not be marked as seen")
        print("\n")

    start_time = time.time()
    logger.info("=== STARTING VOID BOT ===")
    void_agent = initialize_void()
    logger.info(f"Void agent initialized: {void_agent.id}")

    # Check if agent has required tools
    if hasattr(void_agent, 'tools') and void_agent.tools:
        tool_names = [tool.name for tool in void_agent.tools]
        bluesky_tools = [name for name in tool_names if 'bluesky' in name.lower() or 'reply' in name.lower()]
        if not bluesky_tools:
            logger.warning("No Bluesky-related tools found! Agent may not be able to reply.")
    else:
        logger.warning("Agent has no tools registered!")

    # Initialize Bluesky client
    atproto_client = bsky_utils.default_login()
    logger.info("Connected to Bluesky")

    logger.info(f"Starting notification monitoring, checking every {FETCH_NOTIFICATIONS_DELAY_SEC} seconds")

    def _session_stats():
        """Return (elapsed seconds, total messages, messages per minute) so far."""
        elapsed = time.time() - start_time
        total = sum(message_counters.values())
        per_minute = (total / elapsed * 60) if elapsed > 0 else 0
        return elapsed, total, per_minute

    cycle_count = 0
    # KeyboardInterrupt is not an Exception subclass, so the inner handler never
    # swallows it. Catching it out here also covers Ctrl-C during the back-off sleep.
    try:
        while True:
            cycle_count += 1
            try:
                process_notifications(void_agent, atproto_client, TESTING_MODE)
                _, total_messages, messages_per_minute = _session_stats()
                if total_messages > 0:
                    logger.info(f"Cycle {cycle_count} complete. Session totals: {total_messages} messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies) | {messages_per_minute:.1f} msg/min")
                sleep(FETCH_NOTIFICATIONS_DELAY_SEC)

            except Exception as e:
                logger.error(f"=== ERROR IN MAIN LOOP CYCLE {cycle_count} ===")
                logger.error(f"Error details: {e}")
                # Wait a bit longer on errors
                logger.info(f"Sleeping for {FETCH_NOTIFICATIONS_DELAY_SEC * 2} seconds due to error...")
                sleep(FETCH_NOTIFICATIONS_DELAY_SEC * 2)

    except KeyboardInterrupt:
        elapsed_time, total_messages, messages_per_minute = _session_stats()
        logger.info("=== BOT STOPPED BY USER ===")
        logger.info(f"📊 Final session stats: {total_messages} total messages processed in {elapsed_time/60:.1f} minutes")
        logger.info(f" - {message_counters['mentions']} mentions")
        logger.info(f" - {message_counters['replies']} replies")
        logger.info(f" - {message_counters['follows']} follows")
        logger.info(f" - {message_counters['reposts_skipped']} reposts skipped")
        logger.info(f" - Average rate: {messages_per_minute:.1f} messages/minute")
1064
if __name__ == "__main__":
    # Start the monitoring loop only when executed as a script, not on import.
    main()