"""a digital person for bluesky"""
1# Rich imports removed - using simple text formatting
2from time import sleep
3from letta_client import Letta
4from bsky_utils import thread_to_yaml_string, count_thread_posts, extract_images_from_thread
5import os
6import logging
7import json
8import hashlib
9import subprocess
10from pathlib import Path
11from datetime import datetime, timedelta
12from collections import defaultdict
13import time
14import argparse
15import random
16
17from utils import (
18 upsert_block,
19 upsert_agent
20)
21from config_loader import get_letta_config, get_config, get_queue_config
22
# Vision support (optional - requires pillow).
# VISION_ENABLED flips to True only if the helper module imports cleanly.
VISION_ENABLED = False
try:
    from integrate_vision import create_message_with_vision
    VISION_ENABLED = True
except ImportError:
    # Vision not available - pillow not installed
    pass
30
31import bsky_utils
32from datetime import date
33
# Downrank configuration
# File of handles to respond to less often; format documented in load_downrank_handles()
BSKY_DOWNRANK_FILE = Path("bsky_downrank_handles.txt")
DEFAULT_DOWNRANK_RATE = 0.1  # 10% response rate for downranked users
37
38
def load_downrank_handles() -> dict:
    """Load handles that should be downranked (responded to less frequently).

    File format (one per line):
        handle.bsky.social          # Uses default rate (10%)
        handle.bsky.social:0.05     # Custom rate (5%)
        # Comments start with #

    Returns:
        Dict mapping lowercased handle -> response rate (0.0 to 1.0).
        Returns {} if the file is missing or on any read error.
    """
    try:
        if not BSKY_DOWNRANK_FILE.exists():
            return {}

        downrank_handles = {}
        with open(BSKY_DOWNRANK_FILE, 'r') as f:
            for line in f:
                # Strip inline comments first — the documented format allows
                # "handle # comment", which the previous parser treated as
                # part of the handle. Full-line comments become empty here.
                line = line.split('#', 1)[0].strip()
                if not line:
                    continue

                # Optional custom rate after a colon
                if ':' in line:
                    handle, rate_str = line.split(':', 1)
                    try:
                        rate = float(rate_str)
                    except ValueError:
                        rate = DEFAULT_DOWNRANK_RATE
                    # Clamp to a valid probability so a typo like "5" (meant
                    # as 5%) can't make random.random() < rate always true.
                    rate = min(max(rate, 0.0), 1.0)
                else:
                    handle = line
                    rate = DEFAULT_DOWNRANK_RATE

                downrank_handles[handle.strip().lower()] = rate

        if downrank_handles:
            logger.info(f"Loaded {len(downrank_handles)} downrank handles")
        return downrank_handles
    except Exception as e:
        logger.error(f"Error loading downrank handles: {e}")
        return {}
80
81
def should_respond_to_handle(handle: str, downrank_handles: dict) -> bool:
    """Decide whether to respond to a mention from *handle*.

    Handles absent from *downrank_handles* are always answered; downranked
    handles are answered probabilistically at their configured rate.
    """
    key = handle.lower()
    try:
        rate = downrank_handles[key]
    except KeyError:
        # Not downranked: always respond
        return True

    respond = random.random() < rate
    outcome = 'responding' if respond else 'skipping'
    logger.info(f"Downranked handle @{handle}: {outcome} ({rate*100:.0f}% chance)")
    return respond
96from notification_db import NotificationDB
97
def extract_handles_from_data(data):
    """Walk a nested dict/list structure and collect every unique 'handle' value."""
    found = set()
    # Explicit stack instead of recursion; order of traversal doesn't matter
    # since results are deduplicated in a set.
    pending = [data]
    while pending:
        node = pending.pop()
        if isinstance(node, dict):
            if 'handle' in node:
                found.add(node['handle'])
            pending.extend(node.values())
        elif isinstance(node, list):
            pending.extend(node)
    return list(found)
117
# Logging will be configured after argument parsing
logger = None
prompt_logger = None  # Separate verbose logger for full LLM prompts
# Simple text formatting (Rich no longer used)
SHOW_REASONING = False  # When True, agent reasoning chunks are printed to stdout
last_archival_query = "archival memory search"  # Most recent archival search query; used to title result output
124
def log_with_panel(message, title=None, border_color="white"):
    """Log a message with Unicode box-drawing characters"""
    # No title: plain passthrough
    if not title:
        print(message)
        return

    # Map old color names to appropriate symbols
    symbols = {
        "blue": "⚙",    # Tool calls
        "green": "✓",   # Success/completion
        "yellow": "◆",  # Reasoning
        "red": "✗",     # Errors
        "white": "▶",   # Default/mentions
        "cyan": "✎",    # Posts
    }
    marker = symbols.get(border_color, "▶")

    print(f"\n{marker} {title}")
    print(f" {'─' * len(title)}")
    # Indent message lines
    for text_line in message.split('\n'):
        print(f" {text_line}")
146
147
# Load Letta configuration from config.yaml (will be initialized later with custom path if provided)
letta_config = None
CLIENT = None

# Notification check delay
FETCH_NOTIFICATIONS_DELAY_SEC = 10  # Check every 10 seconds for faster response

# Check for new notifications every N queue items
CHECK_NEW_NOTIFICATIONS_EVERY_N_ITEMS = 2  # Check more frequently during processing

# Queue paths (will be initialized from config in main())
QUEUE_DIR = None
QUEUE_ERROR_DIR = None
QUEUE_NO_REPLY_DIR = None
PROCESSED_NOTIFICATIONS_FILE = None

# Maximum number of processed notifications to track
MAX_PROCESSED_NOTIFICATIONS = 10000

# Message tracking counters (keyed by message kind — populated elsewhere)
message_counters = defaultdict(int)
start_time = time.time()  # Process start, presumably for uptime reporting — TODO confirm

# Testing mode flag
TESTING_MODE = False
RESET_MESSAGES_AFTER_NOTIFICATION = False

# Skip git operations flag
SKIP_GIT = False

# Synthesis message tracking
last_synthesis_time = time.time()

# Database for notification tracking (NOTE(review): presumably set to a
# NotificationDB instance during startup — confirm in main())
NOTIFICATION_DB = None
183
def export_agent_state(client, agent, skip_git=False):
    """Export agent state to agent_archive/ (timestamped) and agents/ (current).

    Args:
        client: Letta client; used for agents.export_file().
        agent: Agent object; only its ``id`` attribute is used.
        skip_git: When True, skip the interactive confirmation prompt and
            do not stage the exported file with git.

    Failures are logged and swallowed — export is best-effort.
    """
    try:
        # Confirm export with user unless git is being skipped
        if not skip_git:
            response = input("Export agent state to files and stage with git? (y/n): ").lower().strip()
            if response not in ['y', 'yes']:
                logger.info("Agent export cancelled by user.")
                return
        else:
            logger.info("Exporting agent state (git staging disabled)")

        # Create directories if they don't exist
        os.makedirs("agent_archive", exist_ok=True)
        os.makedirs("agents", exist_ok=True)

        # Export agent data (remote call; can be slow)
        logger.info(f"Exporting agent {agent.id}. This takes some time...")
        agent_data = client.agents.export_file(agent_id=agent.id)

        # Save timestamped archive copy (never staged with git)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        archive_file = os.path.join("agent_archive", f"void_{timestamp}.af")
        with open(archive_file, 'w', encoding='utf-8') as f:
            json.dump(agent_data, f, indent=2, ensure_ascii=False)

        # Save current agent state (this copy is the one staged below)
        current_file = os.path.join("agents", "void.af")
        with open(current_file, 'w', encoding='utf-8') as f:
            json.dump(agent_data, f, indent=2, ensure_ascii=False)

        logger.info(f"Agent exported to {archive_file} and {current_file}")

        # Git add only the current agent file (archive is ignored) unless skip_git is True
        if not skip_git:
            try:
                subprocess.run(["git", "add", current_file], check=True, capture_output=True)
                logger.info("Added current agent file to git staging")
            except subprocess.CalledProcessError as e:
                # Staging failure is non-fatal; the files are already written
                logger.warning(f"Failed to git add agent file: {e}")

    except Exception as e:
        logger.error(f"Failed to export agent: {e}")
227
def initialize_void():
    """Load the configured void agent, export its state, and log its details.

    Reads the agent id from the module-level ``letta_config`` and retrieves
    the agent via the global ``CLIENT``.

    Returns:
        The retrieved Letta agent object.

    Raises:
        Exception: Re-raised from the Letta API when the agent cannot be
            retrieved (e.g. bad agent_id in config.yaml).
    """
    logger.info("Starting void agent initialization...")

    # Get the configured void agent by ID
    logger.info("Loading void agent from config...")
    agent_id = letta_config['agent_id']

    try:
        void_agent = CLIENT.agents.retrieve(agent_id=agent_id)
        logger.info(f"Successfully loaded void agent: {void_agent.name} ({agent_id})")
    except Exception as e:
        logger.error(f"Failed to load void agent {agent_id}: {e}")
        logger.error("Please ensure the agent_id in config.yaml is correct")
        raise e

    # Export agent state (may prompt the user unless SKIP_GIT is set)
    logger.info("Exporting agent state...")
    export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT)

    # Log agent details for startup diagnostics
    logger.info(f"Void agent details - ID: {void_agent.id}")
    logger.info(f"Agent name: {void_agent.name}")
    if hasattr(void_agent, 'llm_config'):
        logger.info(f"Agent model: {void_agent.llm_config.model}")
    if hasattr(void_agent, 'project_id') and void_agent.project_id:
        logger.info(f"Agent project_id: {void_agent.project_id}")
    if hasattr(void_agent, 'tools'):
        logger.info(f"Agent has {len(void_agent.tools)} tools")
        for tool in void_agent.tools[:3]:  # Show first 3 tools
            logger.info(f"  - Tool: {tool.name} (type: {tool.tool_type})")

    return void_agent
260
261
262def process_mention(void_agent, atproto_client, notification_data, queue_filepath=None, testing_mode=False):
263 """Process a mention and generate a reply using the Letta agent.
264
265 Args:
266 void_agent: The Letta agent instance
267 atproto_client: The AT Protocol client
268 notification_data: The notification data dictionary
269 queue_filepath: Optional Path object to the queue file (for cleanup on halt)
270
271 Returns:
272 True: Successfully processed, remove from queue
273 False: Failed but retryable, keep in queue
274 None: Failed with non-retryable error, move to errors directory
275 "no_reply": No reply was generated, move to no_reply directory
276 """
277 import uuid
278
279 # Generate correlation ID for tracking this notification through the pipeline
280 correlation_id = str(uuid.uuid4())[:8]
281
282 # Track attached user blocks for cleanup in finally
283 attached_user_blocks = []
284
285 try:
286 logger.info(f"[{correlation_id}] Starting process_mention", extra={
287 'correlation_id': correlation_id,
288 'notification_type': type(notification_data).__name__
289 })
290
291 # Handle both dict and object inputs for backwards compatibility
292 if isinstance(notification_data, dict):
293 uri = notification_data['uri']
294 mention_text = notification_data.get('record', {}).get('text', '')
295 author_handle = notification_data['author']['handle']
296 author_name = notification_data['author'].get('display_name') or author_handle
297 else:
298 # Legacy object access
299 uri = notification_data.uri
300 mention_text = notification_data.record.text if hasattr(notification_data.record, 'text') else ""
301 author_handle = notification_data.author.handle
302 author_name = notification_data.author.display_name or author_handle
303
304 logger.info(f"[{correlation_id}] Processing mention from @{author_handle}", extra={
305 'correlation_id': correlation_id,
306 'author_handle': author_handle,
307 'author_name': author_name,
308 'mention_uri': uri,
309 'mention_text_length': len(mention_text),
310 'mention_preview': mention_text[:100] if mention_text else ''
311 })
312
313 # Check if handle is in allowed list (if configured)
314 allowed_handles = get_config().get('bot.allowed_handles', [])
315 if allowed_handles and author_handle not in allowed_handles:
316 logger.info(f"[{correlation_id}] Skipping mention from @{author_handle} (not in allowed_handles)")
317 return True # Remove from queue
318
319 # Check if handle is downranked (reduced response rate)
320 downrank_handles = load_downrank_handles()
321 if not should_respond_to_handle(author_handle, downrank_handles):
322 logger.info(f"[{correlation_id}] Skipping mention from @{author_handle} (downranked, not selected)")
323 return True # Remove from queue
324
325 # Retrieve the entire thread associated with the mention
326 try:
327 thread = atproto_client.app.bsky.feed.get_post_thread({
328 'uri': uri,
329 'parent_height': 40,
330 'depth': 10
331 })
332 except Exception as e:
333 error_str = str(e)
334 # Check if this is a NotFound error
335 if 'NotFound' in error_str or 'Post not found' in error_str:
336 logger.warning(f"Post not found for URI {uri}, removing from queue")
337 return True # Return True to remove from queue
338 elif 'InternalServerError' in error_str:
339 # Bluesky sometimes returns InternalServerError for deleted posts
340 # Verify if post actually exists using getRecord
341 try:
342 parts = uri.replace('at://', '').split('/')
343 repo, collection, rkey = parts[0], parts[1], parts[2]
344 atproto_client.com.atproto.repo.get_record({
345 'repo': repo, 'collection': collection, 'rkey': rkey
346 })
347 # Post exists, this is a real server error - re-raise
348 logger.error(f"Error fetching thread (post exists, server error): {e}")
349 raise
350 except Exception as verify_e:
351 if 'RecordNotFound' in str(verify_e) or 'not found' in str(verify_e).lower():
352 logger.warning(f"Post deleted (verified via getRecord), removing from queue: {uri}")
353 return True # Remove from queue
354 # Some other verification error, re-raise original
355 logger.error(f"Error fetching thread: {e}")
356 raise
357 else:
358 # Re-raise other errors
359 logger.error(f"Error fetching thread: {e}")
360 raise
361
362 # Check thread length against configured maximum
363 max_thread_posts = get_config().get('bot.max_thread_posts', 0)
364 if max_thread_posts > 0:
365 thread_post_count = count_thread_posts(thread)
366 if thread_post_count > max_thread_posts:
367 logger.info(f"Thread too long ({thread_post_count} posts > {max_thread_posts} max), skipping this mention")
368 return True # Return True to remove from queue
369
370 # Get thread context as YAML string
371 logger.debug("Converting thread to YAML string")
372 try:
373 thread_context = thread_to_yaml_string(thread)
374 logger.debug(f"Thread context generated, length: {len(thread_context)} characters")
375
376 # Check if #voidstop appears anywhere in the thread
377 if "#voidstop" in thread_context.lower():
378 logger.info("Found #voidstop in thread context, skipping this mention")
379 return True # Return True to remove from queue
380
381 # Also check the mention text directly
382 if "#voidstop" in mention_text.lower():
383 logger.info("Found #voidstop in mention text, skipping this mention")
384 return True # Return True to remove from queue
385
386 # Create a more informative preview by extracting meaningful content
387 lines = thread_context.split('\n')
388 meaningful_lines = []
389
390 for line in lines:
391 stripped = line.strip()
392 if not stripped:
393 continue
394
395 # Look for lines with actual content (not just structure)
396 if any(keyword in line for keyword in ['text:', 'handle:', 'display_name:', 'created_at:', 'reply_count:', 'like_count:']):
397 meaningful_lines.append(line)
398 if len(meaningful_lines) >= 5:
399 break
400
401 if meaningful_lines:
402 preview = '\n'.join(meaningful_lines)
403 logger.debug(f"Thread content preview:\n{preview}")
404 else:
405 # If no content fields found, just show it's a thread structure
406 logger.debug(f"Thread structure generated ({len(thread_context)} chars)")
407 except Exception as yaml_error:
408 import traceback
409 logger.error(f"Error converting thread to YAML: {yaml_error}")
410 logger.error(f"Full traceback:\n{traceback.format_exc()}")
411 logger.error(f"Thread type: {type(thread)}")
412 if hasattr(thread, '__dict__'):
413 logger.error(f"Thread attributes: {thread.__dict__}")
414 # Try to continue with a simple context
415 thread_context = f"Error processing thread context: {str(yaml_error)}"
416
417 # Create a prompt for the Letta agent with thread context
418 prompt = f"""You received a mention on Bluesky from @{author_handle} ({author_name or author_handle}).
419
420MOST RECENT POST:
421"{mention_text}"
422
423THREAD CONTEXT:
424```yaml
425{thread_context}
426```
427
428If you choose to reply, use the add_post_to_bluesky_reply_thread tool. Each call creates one post (max 300 characters). You may use multiple calls to create a thread if needed."""
429
430 # Extract all handles from notification and thread data
431 all_handles = set()
432 all_handles.update(extract_handles_from_data(notification_data))
433 all_handles.update(extract_handles_from_data(thread.model_dump()))
434 unique_handles = list(all_handles)
435
436 logger.debug(f"Found {len(unique_handles)} unique handles in thread: {unique_handles}")
437
438 # Check if any handles are in known_bots list
439 from tools.bot_detection import check_known_bots, should_respond_to_bot_thread, CheckKnownBotsArgs
440 import json
441
442 try:
443 # Check for known bots in thread
444 bot_check_result = check_known_bots(unique_handles, void_agent)
445 bot_check_data = json.loads(bot_check_result)
446
447 # TEMPORARILY DISABLED: Bot detection causing issues with normal users
448 # TODO: Re-enable after debugging why normal users are being flagged as bots
449 if False: # bot_check_data.get("bot_detected", False):
450 detected_bots = bot_check_data.get("detected_bots", [])
451 logger.info(f"Bot detected in thread: {detected_bots}")
452
453 # Decide whether to respond (10% chance)
454 if not should_respond_to_bot_thread():
455 logger.info(f"Skipping bot thread (90% skip rate). Detected bots: {detected_bots}")
456 # Return False to keep in queue for potential later processing
457 return False
458 else:
459 logger.info(f"Responding to bot thread (10% response rate). Detected bots: {detected_bots}")
460 else:
461 logger.debug("Bot detection disabled - processing all notifications")
462
463 except Exception as bot_check_error:
464 logger.warning(f"Error checking for bots: {bot_check_error}")
465 # Continue processing if bot check fails
466
467 # Get response from Letta agent
468 # Format with Unicode characters
469 title = f"MENTION FROM @{author_handle}"
470 print(f"\n▶ {title}")
471 print(f" {'═' * len(title)}")
472 # Indent the mention text
473 for line in mention_text.split('\n'):
474 print(f" {line}")
475
476 # Log prompt details to separate logger
477 prompt_logger.debug(f"Full prompt being sent:\n{prompt}")
478
479 # Log concise prompt info to main logger
480 thread_handles_count = len(unique_handles)
481 prompt_char_count = len(prompt)
482 logger.debug(f"Sending to LLM: @{author_handle} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars, {thread_handles_count} users | prompt: {prompt_char_count} chars")
483
484 # Attach user blocks for thread participants
485 try:
486 success, attached_user_blocks = attach_user_blocks_for_thread(CLIENT, void_agent.id, unique_handles)
487 if not success:
488 logger.warning("Failed to attach some user blocks, continuing anyway")
489 except Exception as e:
490 logger.error(f"Error attaching user blocks: {e}")
491
492 try:
493 # Extract images from thread for vision support
494 thread_images = extract_images_from_thread(thread, max_images=4)
495
496 # Build message (with or without images)
497 if VISION_ENABLED and thread_images:
498 logger.info(f"Thread contains {len(thread_images)} images, downloading for vision...")
499 message = create_message_with_vision(prompt, thread_images, max_images=4)
500 if isinstance(message.get('content'), list):
501 logger.info(f"Vision message created with {len([c for c in message['content'] if c.get('type') == 'image'])} images")
502 else:
503 if thread_images and not VISION_ENABLED:
504 logger.debug(f"Thread has {len(thread_images)} images but vision not enabled (install pillow)")
505 message = {"role": "user", "content": prompt}
506
507 # Use streaming to avoid 524 timeout errors
508 message_stream = CLIENT.agents.messages.stream(
509 agent_id=void_agent.id,
510 messages=[message],
511 stream_tokens=False, # Step streaming only (faster than token streaming)
512 max_steps=100
513 )
514
515 # Collect the streaming response with timeout protection
516 all_messages = []
517 stream_start_time = time.time()
518 max_stream_duration = 600 # 10 minutes max
519
520 for chunk in message_stream:
521 # Check for timeout
522 if time.time() - stream_start_time > max_stream_duration:
523 logger.warning(f"Stream exceeded {max_stream_duration}s timeout, breaking")
524 break
525 # Log condensed chunk info
526 if hasattr(chunk, 'message_type'):
527 if chunk.message_type == 'reasoning_message':
528 # Show full reasoning without truncation
529 if SHOW_REASONING:
530 # Format with Unicode characters
531 print("\n◆ Reasoning")
532 print(" ─────────")
533 # Indent reasoning lines
534 for line in chunk.reasoning.split('\n'):
535 print(f" {line}")
536 else:
537 # Default log format (only when --reasoning is used due to log level)
538 # Format with Unicode characters
539 print("\n◆ Reasoning")
540 print(" ─────────")
541 # Indent reasoning lines
542 for line in chunk.reasoning.split('\n'):
543 print(f" {line}")
544
545 # Create ATProto record for reasoning (unless in testing mode)
546 if not testing_mode and hasattr(chunk, 'reasoning'):
547 try:
548 bsky_utils.create_reasoning_record(atproto_client, chunk.reasoning)
549 except Exception as e:
550 logger.debug(f"Failed to create reasoning record: {e}")
551 elif chunk.message_type == 'tool_call_message':
552 # Parse tool arguments for better display
553 tool_name = chunk.tool_call.name
554
555 # Create ATProto record for tool call (unless in testing mode)
556 if not testing_mode:
557 try:
558 tool_call_id = chunk.tool_call.tool_call_id if hasattr(chunk.tool_call, 'tool_call_id') else None
559 bsky_utils.create_tool_call_record(
560 atproto_client,
561 tool_name,
562 chunk.tool_call.arguments,
563 tool_call_id
564 )
565 except Exception as e:
566 logger.debug(f"Failed to create tool call record: {e}")
567
568 try:
569 args = json.loads(chunk.tool_call.arguments)
570 # Format based on tool type
571 if tool_name in ['add_post_to_bluesky_reply_thread', 'bluesky_reply']:
572 # Extract the text being posted
573 text = args.get('text', '')
574 if text:
575 # Format with Unicode characters
576 print("\n✎ Bluesky Post")
577 print(" ────────────")
578 # Indent post text
579 for line in text.split('\n'):
580 print(f" {line}")
581 else:
582 log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue")
583 elif tool_name == 'archival_memory_search':
584 query = args.get('query', 'unknown')
585 global last_archival_query
586 last_archival_query = query
587 log_with_panel(f"query: \"{query}\"", f"Tool call: {tool_name}", "blue")
588 elif tool_name == 'archival_memory_insert':
589 content = args.get('content', '')
590 # Show the full content being inserted
591 log_with_panel(content, f"Tool call: {tool_name}", "blue")
592 elif tool_name == 'update_block':
593 label = args.get('label', 'unknown')
594 value_preview = str(args.get('value', ''))[:50] + "..." if len(str(args.get('value', ''))) > 50 else str(args.get('value', ''))
595 log_with_panel(f"{label}: \"{value_preview}\"", f"Tool call: {tool_name}", "blue")
596 else:
597 # Generic display for other tools
598 args_str = ', '.join(f"{k}={v}" for k, v in args.items() if k != 'request_heartbeat')
599 if len(args_str) > 150:
600 args_str = args_str[:150] + "..."
601 log_with_panel(args_str, f"Tool call: {tool_name}", "blue")
602 except:
603 # Fallback to original format if parsing fails
604 log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue")
605 elif chunk.message_type == 'tool_return_message':
606 # Enhanced tool result logging
607 tool_name = chunk.name
608 status = chunk.status
609
610 if status == 'success':
611 # Try to show meaningful result info based on tool type
612 if hasattr(chunk, 'tool_return') and chunk.tool_return:
613 result_str = str(chunk.tool_return)
614 if tool_name == 'archival_memory_search':
615
616 try:
617 # Handle both string and list formats
618 if isinstance(chunk.tool_return, str):
619 # The string format is: "([{...}, {...}], count)"
620 # We need to extract just the list part
621 if chunk.tool_return.strip():
622 # Find the list part between the first [ and last ]
623 start_idx = chunk.tool_return.find('[')
624 end_idx = chunk.tool_return.rfind(']')
625 if start_idx != -1 and end_idx != -1:
626 list_str = chunk.tool_return[start_idx:end_idx+1]
627 # Use ast.literal_eval since this is Python literal syntax, not JSON
628 import ast
629 results = ast.literal_eval(list_str)
630 else:
631 logger.warning("Could not find list in archival_memory_search result")
632 results = []
633 else:
634 logger.warning("Empty string returned from archival_memory_search")
635 results = []
636 else:
637 # If it's already a list, use directly
638 results = chunk.tool_return
639
640 log_with_panel(f"Found {len(results)} memory entries", f"Tool result: {tool_name} ✓", "green")
641
642 # Use the captured search query from the tool call
643 search_query = last_archival_query
644
645 # Combine all results into a single text block
646 content_text = ""
647 for i, entry in enumerate(results, 1):
648 timestamp = entry.get('timestamp', 'N/A')
649 content = entry.get('content', '')
650 content_text += f"[{i}/{len(results)}] {timestamp}\n{content}\n\n"
651
652 # Format with Unicode characters
653 title = f"{search_query} ({len(results)} results)"
654 print(f"\n⚙ {title}")
655 print(f" {'─' * len(title)}")
656 # Indent content text
657 for line in content_text.strip().split('\n'):
658 print(f" {line}")
659
660 except Exception as e:
661 logger.error(f"Error formatting archival memory results: {e}")
662 log_with_panel(result_str[:100] + "...", f"Tool result: {tool_name} ✓", "green")
663 elif tool_name == 'add_post_to_bluesky_reply_thread':
664 # Just show success for bluesky posts, the text was already shown in tool call
665 log_with_panel("Post queued successfully", f"Bluesky Post ✓", "green")
666 elif tool_name == 'archival_memory_insert':
667 # Skip archival memory insert results (always returns None)
668 pass
669 elif tool_name == 'update_block':
670 log_with_panel("Memory block updated", f"Tool result: {tool_name} ✓", "green")
671 else:
672 # Generic success with preview
673 preview = result_str[:100] + "..." if len(result_str) > 100 else result_str
674 log_with_panel(preview, f"Tool result: {tool_name} ✓", "green")
675 else:
676 log_with_panel("Success", f"Tool result: {tool_name} ✓", "green")
677 elif status == 'error':
678 # Show error details
679 if tool_name == 'add_post_to_bluesky_reply_thread':
680 error_str = str(chunk.tool_return) if hasattr(chunk, 'tool_return') and chunk.tool_return else "Error occurred"
681 log_with_panel(error_str, f"Bluesky Post ✗", "red")
682 elif tool_name == 'archival_memory_insert':
683 # Skip archival memory insert errors too
684 pass
685 else:
686 error_preview = ""
687 if hasattr(chunk, 'tool_return') and chunk.tool_return:
688 error_str = str(chunk.tool_return)
689 error_preview = error_str[:100] + "..." if len(error_str) > 100 else error_str
690 log_with_panel(f"Error: {error_preview}", f"Tool result: {tool_name} ✗", "red")
691 else:
692 log_with_panel("Error occurred", f"Tool result: {tool_name} ✗", "red")
693 else:
694 logger.info(f"Tool result: {tool_name} - {status}")
695 elif chunk.message_type == 'assistant_message':
696 # Format with Unicode characters
697 print("\n▶ Assistant Response")
698 print(" ──────────────────")
699 # Indent response text
700 for line in chunk.content.split('\n'):
701 print(f" {line}")
702 elif chunk.message_type == 'error_message':
703 # Dump full error object
704 logger.error(f"Agent error_message: {chunk}")
705 if hasattr(chunk, 'model_dump'):
706 logger.error(f"Agent error (dict): {chunk.model_dump()}")
707 elif hasattr(chunk, '__dict__'):
708 logger.error(f"Agent error (vars): {vars(chunk)}")
709 elif chunk.message_type == 'ping':
710 # Silently ignore ping keepalive messages
711 logger.debug(f"Received keepalive ping from Letta API")
712 else:
713 # Filter out verbose message types
714 if chunk.message_type not in ['usage_statistics', 'stop_reason']:
715 logger.info(f"{chunk.message_type}: {str(chunk)[:150]}...")
716 else:
717 logger.info(f"📦 Stream status: {chunk}")
718
719 # Log full chunk for debugging
720 logger.debug(f"Full streaming chunk: {chunk}")
721 all_messages.append(chunk)
722 if str(chunk) == 'done':
723 break
724
725 # Convert streaming response to standard format for compatibility
726 message_response = type('StreamingResponse', (), {
727 'messages': [msg for msg in all_messages if hasattr(msg, 'message_type')]
728 })()
729 except Exception as api_error:
730 import traceback
731 error_str = str(api_error)
732 logger.error(f"Letta API error: {api_error}")
733 logger.error(f"Error type: {type(api_error).__name__}")
734 logger.error(f"Full traceback:\n{traceback.format_exc()}")
735 logger.error(f"Mention text was: {mention_text}")
736 logger.error(f"Author: @{author_handle}")
737 logger.error(f"URI: {uri}")
738
739
740 # Try to extract more info from different error types
741 if hasattr(api_error, 'response'):
742 logger.error(f"Error response object exists")
743 if hasattr(api_error.response, 'text'):
744 logger.error(f"Response text: {api_error.response.text}")
745 if hasattr(api_error.response, 'json') and callable(api_error.response.json):
746 try:
747 logger.error(f"Response JSON: {api_error.response.json()}")
748 except:
749 pass
750
751 # Check for specific error types
752 if hasattr(api_error, 'status_code'):
753 logger.error(f"API Status code: {api_error.status_code}")
754 if hasattr(api_error, 'body'):
755 logger.error(f"API Response body: {api_error.body}")
756 if hasattr(api_error, 'headers'):
757 logger.error(f"API Response headers: {api_error.headers}")
758
759 if api_error.status_code == 413:
760 logger.error("413 Payload Too Large - moving to errors directory")
761 return None # Move to errors directory - payload is too large to ever succeed
762 elif api_error.status_code == 524:
763 logger.error("524 error - timeout from Cloudflare, will retry later")
764 return False # Keep in queue for retry
765
766 # Check if error indicates we should remove from queue
767 if 'status_code: 413' in error_str or 'Payload Too Large' in error_str:
768 logger.warning("Payload too large error, moving to errors directory")
769 return None # Move to errors directory - cannot be fixed by retry
770 elif 'status_code: 524' in error_str:
771 logger.warning("524 timeout error, keeping in queue for retry")
772 return False # Keep in queue for retry
773
774 raise
775
776 # Log successful response
777 logger.debug("Successfully received response from Letta API")
778 logger.debug(f"Number of messages in response: {len(message_response.messages) if hasattr(message_response, 'messages') else 'N/A'}")
779
780 # Extract successful add_post_to_bluesky_reply_thread tool calls from the agent's response
781 reply_candidates = []
782 tool_call_results = {} # Map tool_call_id to status
783 ack_note = None # Track any note from annotate_ack tool
784 flagged_memories = [] # Track memories flagged for deletion
785
786 logger.debug(f"Processing {len(message_response.messages)} response messages...")
787
788 # First pass: collect tool return statuses
789 ignored_notification = False
790 ignore_reason = ""
791 ignore_category = ""
792
793 logger.debug(f"Processing {len(message_response.messages)} messages from agent")
794
795 for message in message_response.messages:
796 # Log detailed message attributes for debugging
797 msg_type = getattr(message, 'message_type', 'unknown')
798 has_tool_call_id = hasattr(message, 'tool_call_id')
799 has_status = hasattr(message, 'status')
800 has_tool_return = hasattr(message, 'tool_return')
801
802 logger.debug(f"Message type={msg_type}, has_tool_call_id={has_tool_call_id}, has_status={has_status}, has_tool_return={has_tool_return}")
803
804 # Tool return messages are identified by having tool_return attribute, tool_call_id, and status
805 if has_tool_call_id and has_status and has_tool_return:
806 logger.debug(f" -> tool_call_id={message.tool_call_id}, status={message.status}")
807
808 # Store the result for ANY tool that has a return - we'll match by tool_call_id later
809 tool_call_results[message.tool_call_id] = message.status
810 logger.debug(f"Stored tool result: {message.tool_call_id} -> {message.status}")
811
812 # Handle special processing for ignore_notification
813 if message.status == 'success':
814 result_str = str(message.tool_return) if message.tool_return else ""
815 if 'IGNORED_NOTIFICATION::' in result_str:
816 parts = result_str.split('::')
817 if len(parts) >= 3:
818 ignore_category = parts[1]
819 ignore_reason = parts[2]
820 ignored_notification = True
821 logger.info(f"🚫 Notification ignored - Category: {ignore_category}, Reason: {ignore_reason}")
822
823 # Check for deprecated tool in tool call messages
824 elif hasattr(message, 'tool_call') and message.tool_call:
825 if message.tool_call.name == 'bluesky_reply':
826 logger.error("DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!")
827 logger.error("Please use add_post_to_bluesky_reply_thread instead.")
828 logger.error("Update the agent's tools using register_tools.py")
829 # Export agent state before terminating
830 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT)
831 logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===")
832 exit(1)
833
834 logger.debug(f"First pass complete. Collected {len(tool_call_results)} tool call results")
835 logger.debug(f"tool_call_results: {tool_call_results}")
836
837 # Second pass: process messages and check for successful tool calls
838 for i, message in enumerate(message_response.messages, 1):
839 # Log concise message info instead of full object
840 msg_type = getattr(message, 'message_type', 'unknown')
841 if hasattr(message, 'reasoning') and message.reasoning:
842 logger.debug(f" {i}. {msg_type}: {message.reasoning[:100]}...")
843 elif hasattr(message, 'tool_call') and message.tool_call:
844 tool_name = message.tool_call.name
845 logger.debug(f" {i}. {msg_type}: {tool_name}")
846 elif hasattr(message, 'tool_return'):
847 tool_name = getattr(message, 'name', 'unknown_tool')
848 return_preview = str(message.tool_return)[:100] if message.tool_return else "None"
849 status = getattr(message, 'status', 'unknown')
850 logger.debug(f" {i}. {msg_type}: {tool_name} -> {return_preview}... (status: {status})")
851 elif hasattr(message, 'text'):
852 logger.debug(f" {i}. {msg_type}: {message.text[:100]}...")
853 else:
854 logger.debug(f" {i}. {msg_type}: <no content>")
855
856 # Check for halt_activity tool call
857 if hasattr(message, 'tool_call') and message.tool_call:
858 if message.tool_call.name == 'halt_activity':
859 logger.info("🛑 HALT_ACTIVITY TOOL CALLED - TERMINATING BOT")
860 try:
861 args = json.loads(message.tool_call.arguments)
862 reason = args.get('reason', 'Agent requested halt')
863 logger.info(f"Halt reason: {reason}")
864 except:
865 logger.info("Halt reason: <unable to parse>")
866
867 # Delete the queue file before terminating
868 if queue_filepath and queue_filepath.exists():
869 queue_filepath.unlink()
870 logger.info(f"Deleted queue file: {queue_filepath.name}")
871
872 # Also mark as processed to avoid reprocessing
873 if NOTIFICATION_DB:
874 NOTIFICATION_DB.mark_processed(notification_data.get('uri', ''), status='processed')
875 else:
876 processed_uris = load_processed_notifications()
877 processed_uris.add(notification_data.get('uri', ''))
878 save_processed_notifications(processed_uris)
879
880 # Export agent state before terminating
881 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT)
882
883 # Exit the program
884 logger.info("=== BOT TERMINATED BY AGENT ===")
885 exit(0)
886
887 # Check for deprecated bluesky_reply tool
888 if hasattr(message, 'tool_call') and message.tool_call:
889 if message.tool_call.name == 'bluesky_reply':
890 logger.error("DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!")
891 logger.error("Please use add_post_to_bluesky_reply_thread instead.")
892 logger.error("Update the agent's tools using register_tools.py")
893 # Export agent state before terminating
894 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT)
895 logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===")
896 exit(1)
897
898 # Collect annotate_ack tool calls
899 elif message.tool_call.name == 'annotate_ack':
900 try:
901 args = json.loads(message.tool_call.arguments)
902 note = args.get('note', '')
903 if note:
904 ack_note = note
905 logger.debug(f"Found annotate_ack with note: {note[:50]}...")
906 except json.JSONDecodeError as e:
907 logger.error(f"Failed to parse annotate_ack arguments: {e}")
908
909 # Collect flag_archival_memory_for_deletion tool calls
910 elif message.tool_call.name == 'flag_archival_memory_for_deletion':
911 try:
912 args = json.loads(message.tool_call.arguments)
913 reason = args.get('reason', '')
914 memory_text = args.get('memory_text', '')
915 confirm = args.get('confirm', False)
916
917 # Only flag for deletion if confirmed and has all required fields
918 if confirm and memory_text and reason:
919 flagged_memories.append({
920 'reason': reason,
921 'memory_text': memory_text
922 })
923 logger.debug(f"Found memory flagged for deletion (reason: {reason}): {memory_text[:50]}...")
924 elif not confirm:
925 logger.debug(f"Memory deletion not confirmed, skipping: {memory_text[:50]}...")
926 elif not reason:
927 logger.warning(f"Memory deletion missing reason, skipping: {memory_text[:50]}...")
928 except json.JSONDecodeError as e:
929 logger.error(f"Failed to parse flag_archival_memory_for_deletion arguments: {e}")
930
931 # Collect archival_memory_insert tool calls for recording to AT Protocol
932 elif message.tool_call.name == 'archival_memory_insert':
933 try:
934 args = json.loads(message.tool_call.arguments)
935 content = args.get('content', '')
936 tags_str = args.get('tags', None)
937
938 # Parse tags from string representation if present
939 tags = None
940 if tags_str:
941 try:
942 tags = json.loads(tags_str) if isinstance(tags_str, str) else tags_str
943 except json.JSONDecodeError:
944 logger.warning(f"Failed to parse tags from archival_memory_insert: {tags_str[:50]}...")
945
946 if content:
947 # Create stream.thought.memory record
948 try:
949 memory_result = bsky_utils.create_memory_record(atproto_client, content, tags)
950 if memory_result:
951 tags_info = f" ({len(tags)} tags)" if tags else ""
952 logger.info(f"📝 Recorded archival memory to AT Protocol{tags_info}: {content[:100]}...")
953 else:
954 logger.warning(f"Failed to record archival memory to AT Protocol")
955 except Exception as e:
956 logger.error(f"Error creating memory record: {e}")
957 except json.JSONDecodeError as e:
958 logger.error(f"Failed to parse archival_memory_insert arguments: {e}")
959
960 # Collect add_post_to_bluesky_reply_thread tool calls - only if they were successful
961 elif message.tool_call.name == 'add_post_to_bluesky_reply_thread':
962 tool_call_id = message.tool_call.tool_call_id
963 tool_status = tool_call_results.get(tool_call_id, 'unknown')
964
965 logger.debug(f"Found add_post_to_bluesky_reply_thread tool call: id={tool_call_id}, status={tool_status}")
966 logger.debug(f"Available tool_call_results: {tool_call_results}")
967
968 if tool_status == 'success':
969 try:
970 args = json.loads(message.tool_call.arguments)
971 reply_text = args.get('text', '')
972 reply_lang = args.get('lang', 'en-US')
973
974 if reply_text: # Only add if there's actual content
975 reply_candidates.append((reply_text, reply_lang))
976 logger.debug(f"Found successful add_post_to_bluesky_reply_thread candidate: {reply_text[:50]}... (lang: {reply_lang})")
977 except json.JSONDecodeError as e:
978 logger.error(f"Failed to parse tool call arguments: {e}")
979 elif tool_status == 'error':
980 logger.debug(f"Skipping failed add_post_to_bluesky_reply_thread tool call (status: error)")
981 else:
982 logger.warning(f"⚠️ Skipping add_post_to_bluesky_reply_thread tool call with unknown status: {tool_status}")
983 logger.warning(f" Tool call ID '{tool_call_id}' not found in tool_call_results dict")
984
985 # Handle archival memory deletion if any were flagged (only if no halt was received)
986 if flagged_memories:
987 logger.info(f"Processing {len(flagged_memories)} flagged memories for deletion")
988 for flagged_memory in flagged_memories:
989 reason = flagged_memory['reason']
990 memory_text = flagged_memory['memory_text']
991
992 try:
993 # Search for passages with this exact text
994 logger.debug(f"Searching for passages matching: {memory_text[:100]}...")
995 passages_page = CLIENT.agents.passages.list(
996 agent_id=void_agent.id,
997 query=memory_text
998 )
999 passages = passages_page.items if hasattr(passages_page, 'items') else passages_page
1000
1001 if not passages:
1002 logger.warning(f"No passages found matching flagged memory: {memory_text[:50]}...")
1003 continue
1004
1005 # Delete all matching passages
1006 deleted_count = 0
1007 for passage in passages:
1008 # Check if the passage text exactly matches (to avoid partial matches)
1009 if hasattr(passage, 'text') and passage.text == memory_text:
1010 try:
1011 CLIENT.agents.passages.delete(
1012 agent_id=void_agent.id,
1013 passage_id=str(passage.id)
1014 )
1015 deleted_count += 1
1016 logger.debug(f"Deleted passage {passage.id}")
1017 except Exception as delete_error:
1018 logger.error(f"Failed to delete passage {passage.id}: {delete_error}")
1019
1020 if deleted_count > 0:
1021 logger.info(f"🗑️ Deleted {deleted_count} archival memory passage(s) (reason: {reason}): {memory_text[:50]}...")
1022 else:
1023 logger.warning(f"No exact matches found for deletion: {memory_text[:50]}...")
1024
1025 except Exception as e:
1026 logger.error(f"Error processing memory deletion: {e}")
1027
1028 # Check for conflicting tool calls
1029 if reply_candidates and ignored_notification:
1030 logger.error(f"⚠️ CONFLICT: Agent called both add_post_to_bluesky_reply_thread and ignore_notification!")
1031 logger.error(f"Reply candidates: {len(reply_candidates)}, Ignore reason: {ignore_reason}")
1032 logger.warning("Item will be left in queue for manual review")
1033 # Return False to keep in queue
1034 return False
1035
1036 if reply_candidates:
1037 # Aggregate reply posts into a thread
1038 reply_messages = []
1039 reply_langs = []
1040 for text, lang in reply_candidates:
1041 reply_messages.append(text)
1042 reply_langs.append(lang)
1043
1044 # Use the first language for the entire thread (could be enhanced later)
1045 reply_lang = reply_langs[0] if reply_langs else 'en-US'
1046
1047 logger.debug(f"Found {len(reply_candidates)} add_post_to_bluesky_reply_thread calls, building thread")
1048
1049 # Display the generated reply thread
1050 if len(reply_messages) == 1:
1051 content = reply_messages[0]
1052 title = f"Reply to @{author_handle}"
1053 else:
1054 content = "\n\n".join([f"{j}. {msg}" for j, msg in enumerate(reply_messages, 1)])
1055 title = f"Reply Thread to @{author_handle} ({len(reply_messages)} messages)"
1056
1057 # Format with Unicode characters
1058 print(f"\n✎ {title}")
1059 print(f" {'─' * len(title)}")
1060 # Indent content lines
1061 for line in content.split('\n'):
1062 print(f" {line}")
1063
1064 # Send the reply(s) with language (unless in testing mode)
1065 if testing_mode:
1066 logger.info("TESTING MODE: Skipping actual Bluesky post")
1067 response = True # Simulate success
1068 else:
1069 if len(reply_messages) == 1:
1070 # Single reply - use existing function
1071 cleaned_text = bsky_utils.remove_outside_quotes(reply_messages[0])
1072 logger.info(f"Sending single reply: {cleaned_text[:50]}... (lang: {reply_lang})")
1073 response = bsky_utils.reply_to_notification(
1074 client=atproto_client,
1075 notification=notification_data,
1076 reply_text=cleaned_text,
1077 lang=reply_lang,
1078 correlation_id=correlation_id
1079 )
1080 else:
1081 # Multiple replies - use new threaded function
1082 cleaned_messages = [bsky_utils.remove_outside_quotes(msg) for msg in reply_messages]
1083 logger.info(f"Sending threaded reply with {len(cleaned_messages)} messages (lang: {reply_lang})")
1084 response = bsky_utils.reply_with_thread_to_notification(
1085 client=atproto_client,
1086 notification=notification_data,
1087 reply_messages=cleaned_messages,
1088 lang=reply_lang,
1089 correlation_id=correlation_id
1090 )
1091
1092 if response:
1093 logger.info(f"[{correlation_id}] Successfully replied to @{author_handle}", extra={
1094 'correlation_id': correlation_id,
1095 'author_handle': author_handle,
1096 'reply_count': len(reply_messages)
1097 })
1098
1099 # Acknowledge the post we're replying to with stream.thought.ack
1100 try:
1101 post_uri = notification_data.get('uri')
1102 post_cid = notification_data.get('cid')
1103
1104 if post_uri and post_cid:
1105 ack_result = bsky_utils.acknowledge_post(
1106 client=atproto_client,
1107 post_uri=post_uri,
1108 post_cid=post_cid,
1109 note=ack_note
1110 )
1111 if ack_result:
1112 if ack_note:
1113 logger.info(f"Successfully acknowledged post from @{author_handle} with stream.thought.ack (note: \"{ack_note[:50]}...\")")
1114 else:
1115 logger.info(f"Successfully acknowledged post from @{author_handle} with stream.thought.ack")
1116 else:
1117 logger.warning(f"Failed to acknowledge post from @{author_handle}")
1118 else:
1119 logger.warning(f"Missing URI or CID for acknowledging post from @{author_handle}")
1120 except Exception as e:
1121 logger.error(f"Error acknowledging post from @{author_handle}: {e}")
1122 # Don't fail the entire operation if acknowledgment fails
1123
1124 return True
1125 else:
1126 logger.error(f"Failed to send reply to @{author_handle}")
1127 return False
1128 else:
1129 # Check if notification was explicitly ignored
1130 if ignored_notification:
1131 logger.info(f"[{correlation_id}] Notification from @{author_handle} was explicitly ignored (category: {ignore_category})", extra={
1132 'correlation_id': correlation_id,
1133 'author_handle': author_handle,
1134 'ignore_category': ignore_category
1135 })
1136 return "ignored"
1137 else:
1138 logger.warning(f"[{correlation_id}] No reply generated for mention from @{author_handle}, moving to no_reply folder", extra={
1139 'correlation_id': correlation_id,
1140 'author_handle': author_handle
1141 })
1142 return "no_reply"
1143
1144 except Exception as e:
1145 logger.error(f"[{correlation_id}] Error processing mention: {e}", extra={
1146 'correlation_id': correlation_id,
1147 'error': str(e),
1148 'error_type': type(e).__name__,
1149 'author_handle': author_handle if 'author_handle' in locals() else 'unknown'
1150 })
1151 return False
1152 finally:
1153 # Always detach user blocks after processing
1154 if attached_user_blocks:
1155 try:
1156 detach_user_blocks_for_thread(CLIENT, void_agent.id, attached_user_blocks)
1157 except Exception as e:
1158 logger.error(f"Error detaching user blocks: {e}")
1159
1160
1161
def queue_file_sort_key(filepath):
    """
    Sort key for queue files: priority first (0 before 1), then oldest first
    within each priority.

    Filename format: {priority}_{YYYYMMDD}_{HHMMSS}_{reason}_{hash}.json

    Args:
        filepath: Path-like object whose .name is parsed for priority/timestamp.

    Returns:
        Tuple (priority, timestamp) suitable for ascending sort. Malformed
        names (wrong field count or non-numeric fields) sort last at normal
        priority instead of raising.
    """
    parts = filepath.name.split('_')
    if len(parts) >= 3:
        try:
            priority = int(parts[0])            # 0 (high) or 1 (normal)
            # YYYYMMDD + HHMMSS concatenated -> sortable YYYYMMDDHHMMSS integer
            timestamp = int(parts[1] + parts[2])
            return (priority, timestamp)
        except ValueError:
            # Non-numeric priority/date/time field: fall through to fallback
            # (the original implementation crashed here on odd filenames).
            pass
    return (1, float('inf'))  # Fallback: treat as normal priority, process last
1176
def notification_to_dict(notification):
    """Serialize a notification object into a plain, JSON-friendly dict.

    Args:
        notification: Notification object with uri/cid/reason/is_read/
            indexed_at attributes, a nested author, and (optionally) a record.

    Returns:
        Dict with 'uri', 'cid', 'reason', 'is_read', 'indexed_at', nested
        'author' (handle/display_name/did) and 'record' ('text', '' when the
        notification carries no record or the record has no text).
    """
    # Pull the post text defensively: not every notification has a record,
    # and not every record has text.
    record_text = ''
    if hasattr(notification, 'record'):
        record_text = getattr(notification.record, 'text', '')

    author = notification.author
    serialized = {
        'uri': notification.uri,
        'cid': notification.cid,
        'reason': notification.reason,
        'is_read': notification.is_read,
        'indexed_at': notification.indexed_at,
        'author': {
            'handle': author.handle,
            'display_name': author.display_name,
            'did': author.did,
        },
        'record': {
            'text': record_text,
        },
    }
    return serialized
1193 }
1194
1195
def load_processed_notifications():
    """Return the set of already-processed notification URIs.

    Reads from the notification database when one is configured; without a
    database there is no persisted history, so an empty set is returned.
    """
    global NOTIFICATION_DB
    if not NOTIFICATION_DB:
        return set()
    return NOTIFICATION_DB.get_processed_uris(limit=MAX_PROCESSED_NOTIFICATIONS)
1202
1203
def save_processed_notifications(processed_set):
    """Intentional no-op retained for call-site compatibility.

    Persistence is handled by marking individual notifications in the
    database, so there is nothing to write here anymore.

    Args:
        processed_set: Set of processed URIs (ignored).
    """
    return None
1209
1210
def save_notification_to_queue(notification, is_priority=None):
    """Save a notification to the queue directory with a priority-based filename.

    Filename format: {priority}_{YYYYMMDD}_{HHMMSS}_{reason}_{hash}.json,
    where priority is "0" (high) or "1" (normal).

    Args:
        notification: Notification object or an already-serialized dict.
        is_priority: When not None, forces the priority. Otherwise high
            priority is granted to notifications from cameron.pfiffer.org.

    Returns:
        True if a queue file was written; False if the notification was
        skipped (already processed, already queued, DB insert failed) or an
        error occurred.
    """
    try:
        global NOTIFICATION_DB

        # Accept both raw notification objects and pre-serialized dicts.
        if isinstance(notification, dict):
            notif_dict = notification
            notification_uri = notification.get('uri')
        else:
            notif_dict = notification_to_dict(notification)
            notification_uri = notification.uri

        # Skip anything we have already handled (DB preferred, JSON fallback).
        if NOTIFICATION_DB:
            if NOTIFICATION_DB.is_processed(notification_uri):
                logger.debug(f"Notification already processed (DB): {notification_uri}")
                return False
            # Add to database - if this fails, don't queue the notification
            if not NOTIFICATION_DB.add_notification(notif_dict):
                logger.warning(f"Failed to add notification to database, skipping: {notification_uri}")
                return False
        else:
            # Fall back to old JSON method
            processed_uris = load_processed_notifications()
            if notification_uri in processed_uris:
                logger.debug(f"Notification already processed: {notification_uri}")
                return False

        # Stable JSON encoding (sorted keys) so the content hash is deterministic.
        notif_json = json.dumps(notif_dict, sort_keys=True)
        notif_hash = hashlib.sha256(notif_json.encode()).hexdigest()[:16]

        # Determine priority from the explicit flag or the author handle.
        if is_priority is not None:
            priority_prefix = "0_" if is_priority else "1_"
        else:
            if isinstance(notification, dict):
                author_handle = notification.get('author', {}).get('handle', '')
            else:
                author_handle = getattr(notification.author, 'handle', '') if hasattr(notification, 'author') else ''
            # Prioritize cameron.pfiffer.org responses
            priority_prefix = "0_" if author_handle == "cameron.pfiffer.org" else "1_"

        # Create filename with priority, timestamp and hash
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        reason = notif_dict.get('reason', 'unknown')
        filename = f"{priority_prefix}{timestamp}_{reason}_{notif_hash}.json"
        filepath = QUEUE_DIR / filename

        # Skip if this notification URI is already sitting in the queue.
        for existing_file in QUEUE_DIR.glob("*.json"):
            if existing_file.name == "processed_notifications.json":
                continue
            try:
                with open(existing_file, 'r') as f:
                    existing_data = json.load(f)
                if existing_data.get('uri') == notification_uri:
                    logger.debug(f"Notification already queued (URI: {notification_uri})")
                    return False
            except Exception:
                # Unreadable/corrupt queue files are skipped here; the queue
                # processor deals with them when it drains the queue.
                continue

        # Write to file
        with open(filepath, 'w') as f:
            json.dump(notif_dict, f, indent=2)

        priority_label = "HIGH PRIORITY" if priority_prefix == "0_" else "normal"
        # Bug fix: this log line previously emitted the literal placeholder
        # "(unknown)" instead of identifying what was queued.
        logger.info(f"Queued notification ({priority_label}): {filename}")
        return True

    except Exception as e:
        logger.error(f"Error saving notification to queue: {e}")
        return False
1286 return False
1287
1288
def _mark_notification_processed(uri, status='processed'):
    """Record a notification URI as handled so it is not picked up again.

    Uses the notification database when available; otherwise falls back to
    the legacy JSON-backed processed-URI set.
    """
    if NOTIFICATION_DB:
        NOTIFICATION_DB.mark_processed(uri, status=status)
    else:
        processed_uris = load_processed_notifications()
        processed_uris.add(uri)
        save_processed_notifications(processed_uris)


def load_and_process_queued_notifications(void_agent, atproto_client, testing_mode=False):
    """Load and process all queued notifications in priority order.

    Queue files are handled priority-first (0_ before 1_), oldest first
    within each priority. Each file's disposition depends on the handler's
    return value:

      * "no_reply"  -> moved to the no_reply directory
      * "ignored"   -> deleted (agent explicitly ignored it)
      * None        -> moved to the errors directory
      * truthy      -> processed; deleted (kept in testing mode)
      * falsy       -> failure; kept in the queue for retry

    Args:
        void_agent: Letta agent that handles each notification.
        atproto_client: Authenticated AT Protocol client.
        testing_mode: When True, posts are skipped and queue files kept.
    """
    try:
        # Gather queue files, excluding the legacy bookkeeping file.
        all_queue_files = [f for f in QUEUE_DIR.glob("*.json") if f.name != "processed_notifications.json"]

        # Sort by priority first (0_ before 1_), then oldest first within each priority.
        all_queue_files = sorted(all_queue_files, key=queue_file_sort_key)

        # Filter out and delete like notifications immediately.
        queue_files = []
        likes_deleted = 0

        for filepath in all_queue_files:
            try:
                with open(filepath, 'r') as f:
                    notif_data = json.load(f)

                # If it's a like, delete it immediately and don't process.
                if notif_data.get('reason') == 'like':
                    filepath.unlink()
                    likes_deleted += 1
                    logger.debug(f"Deleted like notification: {filepath.name}")
                else:
                    queue_files.append(filepath)
            except Exception as e:
                logger.warning(f"Error checking notification file {filepath.name}: {e}")
                queue_files.append(filepath)  # Keep it in case it's valid

        if likes_deleted > 0:
            logger.info(f"Deleted {likes_deleted} like notifications from queue")

        if not queue_files:
            return

        logger.info(f"Processing {len(queue_files)} queued notifications")

        # Log session throughput statistics.
        elapsed_time = time.time() - start_time
        total_messages = sum(message_counters.values())
        messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0

        logger.info(f"Session stats: {total_messages} total messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies, {message_counters['follows']} follows) | {messages_per_minute:.1f} msg/min")

        for i, filepath in enumerate(queue_files, 1):
            is_priority = filepath.name.startswith("0_")

            # Periodically look for new notifications mid-queue; also check
            # immediately after each priority item so urgent work jumps in.
            should_check_notifications = (i % CHECK_NEW_NOTIFICATIONS_EVERY_N_ITEMS == 0 and i > 1)
            if is_priority and i > 1:
                should_check_notifications = True

            if should_check_notifications:
                logger.info(f"🔄 Checking for new notifications (processed {i-1}/{len(queue_files)} queue items)")
                try:
                    # Fetch and queue new notifications without processing them.
                    new_count = fetch_and_queue_new_notifications(atproto_client)

                    if new_count > 0:
                        logger.info(f"Added {new_count} new notifications to queue")
                        # NOTE(review): rebinding queue_files does not affect the
                        # enumerate() iterator already in flight -- newly queued
                        # items are actually picked up on the next drain pass.
                        updated_queue_files = [f for f in QUEUE_DIR.glob("*.json") if f.name != "processed_notifications.json"]
                        updated_queue_files = sorted(updated_queue_files, key=queue_file_sort_key)
                        queue_files = updated_queue_files
                        logger.info(f"Queue updated: now {len(queue_files)} total items")
                except Exception as e:
                    logger.error(f"Error checking for new notifications: {e}")

            priority_label = " [PRIORITY]" if is_priority else ""
            logger.info(f"Processing queue file {i}/{len(queue_files)}{priority_label}: {filepath.name}")
            try:
                # Load notification data
                with open(filepath, 'r') as f:
                    notif_data = json.load(f)

                # Dispatch on notification type using the dict data directly.
                success = False
                if notif_data['reason'] == "mention":
                    success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode)
                    if success:
                        message_counters['mentions'] += 1
                elif notif_data['reason'] == "reply":
                    success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode)
                    if success:
                        message_counters['replies'] += 1
                elif notif_data['reason'] == "follow":
                    # Follower messaging to the agent is currently disabled;
                    # follow notifications are simply acknowledged.
                    success = True  # Follow updates are always successful
                elif notif_data['reason'] == "repost":
                    # Skip reposts silently but mark successful to remove from queue.
                    success = True
                    message_counters['reposts_skipped'] += 1
                elif notif_data['reason'] == "like":
                    # Skip likes silently but mark successful to remove from queue.
                    success = True
                    message_counters.setdefault('likes_skipped', 0)
                    message_counters['likes_skipped'] += 1
                else:
                    logger.warning(f"Unknown notification type: {notif_data['reason']}")
                    success = True  # Remove unknown types from queue

                # Handle the file based on the processing result.
                # Bug fix: the string sentinels must be checked BEFORE plain
                # truthiness -- "no_reply" and "ignored" are truthy strings, so
                # the original `if success:` branch swallowed them and deleted
                # files that should have been routed elsewhere, leaving the
                # string branches unreachable.
                if success == "no_reply":
                    no_reply_path = QUEUE_NO_REPLY_DIR / filepath.name
                    filepath.rename(no_reply_path)
                    logger.info(f"Moved {filepath.name} to no_reply directory")
                    # NOTE(review): status 'error' mirrors the original code;
                    # confirm whether the DB schema has a better status value.
                    _mark_notification_processed(notif_data['uri'], status='error')
                elif success == "ignored":
                    # Explicitly ignored notifications are deleted outright.
                    filepath.unlink()
                    logger.info(f"🚫 Deleted ignored notification: {filepath.name}")
                    _mark_notification_processed(notif_data['uri'], status='error')
                elif success is None:
                    # Special case: move to the errors directory.
                    error_path = QUEUE_ERROR_DIR / filepath.name
                    filepath.rename(error_path)
                    logger.warning(f"Moved {filepath.name} to errors directory")
                    _mark_notification_processed(notif_data['uri'], status='error')
                elif success:
                    if testing_mode:
                        logger.info(f"TESTING MODE: Keeping queue file: {filepath.name}")
                    else:
                        filepath.unlink()
                        logger.info(f"Successfully processed and removed: {filepath.name}")

                        # Mark as processed to avoid reprocessing.
                        _mark_notification_processed(notif_data['uri'], status='processed')

                        # Reset agent message buffer if enabled.
                        if RESET_MESSAGES_AFTER_NOTIFICATION:
                            try:
                                CLIENT.agents.messages.reset(
                                    agent_id=void_agent.id,
                                    add_default_initial_messages=True
                                )
                                logger.info(f"Reset agent message buffer after processing notification")
                            except Exception as e:
                                logger.warning(f"Failed to reset agent messages: {e}")
                else:
                    logger.warning(f"⚠️ Failed to process {filepath.name}, keeping in queue for retry")

            except Exception as e:
                logger.error(f"💥 Error processing queued notification {filepath.name}: {e}")
                # Keep the file for retry later.

    except Exception as e:
        logger.error(f"Error loading queued notifications: {e}")
1481
1482
def fetch_and_queue_new_notifications(atproto_client):
    """Fetch new notifications from Bluesky and queue them without processing.

    Pages through the notification list API (up to 20 pages of 100), marks
    everything as seen, filters out likes / stale / already-processed items,
    and writes the remainder to the on-disk queue via
    save_notification_to_queue().

    Args:
        atproto_client: Authenticated AT Protocol client.

    Returns:
        int: Number of notifications newly added to the queue (0 on error).
    """
    try:
        global NOTIFICATION_DB

        # Get current time for marking notifications as seen
        logger.debug("Getting current time for notification marking...")
        last_seen_at = atproto_client.get_current_time_iso()

        # Get timestamp of last processed notification for filtering
        last_processed_time = None
        if NOTIFICATION_DB:
            last_processed_time = NOTIFICATION_DB.get_latest_processed_time()
            if last_processed_time:
                logger.debug(f"Last processed notification was at: {last_processed_time}")

        # Fetch ALL notifications using cursor-based pagination
        all_notifications = []
        cursor = None
        page_count = 0
        max_pages = 20  # Safety limit to prevent infinite loops

        while page_count < max_pages:
            try:
                # Fetch one notifications page (100 per page)
                if cursor:
                    notifications_response = atproto_client.app.bsky.notification.list_notifications(
                        params={'cursor': cursor, 'limit': 100}
                    )
                else:
                    notifications_response = atproto_client.app.bsky.notification.list_notifications(
                        params={'limit': 100}
                    )

                page_count += 1
                page_notifications = notifications_response.notifications

                if not page_notifications:
                    break

                all_notifications.extend(page_notifications)

                # Check if there are more pages; no cursor means we're done
                cursor = getattr(notifications_response, 'cursor', None)
                if not cursor:
                    break

            except Exception as e:
                # Keep whatever pages we already fetched on a mid-pagination error
                logger.error(f"Error fetching notifications page {page_count}: {e}")
                break

        # Now process all fetched notifications
        new_count = 0
        if all_notifications:
            logger.info(f"📥 Fetched {len(all_notifications)} total notifications from API")

            # Mark as seen first (best effort -- happens even if queueing below fails)
            try:
                atproto_client.app.bsky.notification.update_seen(
                    data={'seenAt': last_seen_at}
                )
                logger.debug(f"Marked {len(all_notifications)} notifications as seen at {last_seen_at}")
            except Exception as e:
                logger.error(f"Error marking notifications as seen: {e}")

            # Debug counters for the filtering summary logged below
            skipped_read = 0
            skipped_likes = 0
            skipped_processed = 0
            skipped_old_timestamp = 0
            processed_uris = load_processed_notifications()

            # Queue all new notifications (except likes)
            # Calculate cutoff time: 1 month ago from now
            # NOTE(review): this is a naive local-time isoformat string, while
            # indexed_at presumably carries a UTC offset/"Z" suffix -- the
            # lexicographic comparisons below may be off near the 30-day
            # boundary. TODO confirm the indexed_at format.
            one_month_ago = (datetime.now() - timedelta(days=30)).isoformat()

            for notif in all_notifications:
                # Skip if older than 1 month (ISO-string comparison)
                if hasattr(notif, 'indexed_at'):
                    if notif.indexed_at < one_month_ago:
                        skipped_old_timestamp += 1
                        logger.debug(f"Skipping old notification (older than 1 month): {notif.indexed_at}")
                        continue

                # Also skip if older than last processed (when we have timestamp filtering)
                if last_processed_time and hasattr(notif, 'indexed_at'):
                    if notif.indexed_at <= last_processed_time:
                        skipped_old_timestamp += 1
                        logger.debug(f"Skipping old notification (indexed_at {notif.indexed_at} <= {last_processed_time})")
                        continue

                # Debug: Log is_read status but DON'T skip based on it
                if hasattr(notif, 'is_read') and notif.is_read:
                    skipped_read += 1
                    logger.debug(f"Notification has is_read=True (but processing anyway): {notif.uri if hasattr(notif, 'uri') else 'unknown'}")

                # Skip likes (object form)
                if hasattr(notif, 'reason') and notif.reason == 'like':
                    skipped_likes += 1
                    continue

                # Serialize pydantic-style objects; dicts pass through as-is
                notif_dict = notif.model_dump() if hasattr(notif, 'model_dump') else notif

                # Skip likes in dict form too
                if notif_dict.get('reason') == 'like':
                    continue

                # Check if already processed
                notif_uri = notif_dict.get('uri', '')
                if notif_uri in processed_uris:
                    skipped_processed += 1
                    logger.debug(f"Skipping already processed: {notif_uri}")
                    continue

                # Check if it's a priority notification
                is_priority = False

                # Priority for cameron.pfiffer.org notifications
                author_handle = notif_dict.get('author', {}).get('handle', '')
                if author_handle == "cameron.pfiffer.org":
                    is_priority = True

                # Also check for priority keywords in mentions
                if notif_dict.get('reason') == 'mention':
                    # Get the mention text to check for priority keywords
                    record = notif_dict.get('record', {})
                    text = record.get('text', '')
                    if any(keyword in text.lower() for keyword in ['urgent', 'priority', 'important', 'emergency']):
                        is_priority = True

                # save_notification_to_queue also dedups against the DB and queue
                if save_notification_to_queue(notif_dict, is_priority=is_priority):
                    new_count += 1
                    logger.debug(f"Queued notification from @{author_handle}: {notif_dict.get('reason', 'unknown')}")

            # Log summary of filtering
            logger.info(f"📊 Notification processing summary:")
            logger.info(f"  • Total fetched: {len(all_notifications)}")
            logger.info(f"  • Had is_read=True: {skipped_read} (not skipped)")
            logger.info(f"  • Skipped (likes): {skipped_likes}")
            logger.info(f"  • Skipped (old timestamp): {skipped_old_timestamp}")
            logger.info(f"  • Skipped (already processed): {skipped_processed}")
            logger.info(f"  • Queued for processing: {new_count}")
        else:
            logger.debug("No new notifications to queue")

        return new_count

    except Exception as e:
        logger.error(f"Error fetching and queueing notifications: {e}")
        return 0
1632 return 0
1633
1634
def process_notifications(void_agent, atproto_client, testing_mode=False):
    """Fetch new notifications, queue them, and process the queue."""
    try:
        # Pull fresh notifications into the on-disk queue first.
        queued = fetch_and_queue_new_notifications(atproto_client)
        if queued:
            logger.info(f"Found {queued} new notifications to process")

        # Drain the full queue (previously queued + newly fetched items).
        load_and_process_queued_notifications(void_agent, atproto_client, testing_mode)
    except Exception as e:
        logger.error(f"Error processing notifications: {e}")
1649
1650
def send_synthesis_message(client: Letta, agent_id: str, agent_name: str = "void", atproto_client=None) -> None:
    """
    Send an activation window prompt to the agent on a timer.
    The agent can browse, post, search, update memory, or do nothing.

    Streams the agent's response, mirrors reasoning/tool-call activity to
    AT Protocol records when a client is available, and afterwards publishes
    any acknowledgment note or posts the agent queued during the window.

    Args:
        client: Letta client
        agent_id: Agent ID to send synthesis to
        agent_name: Agent name (unused, kept for call-site compatibility)
        atproto_client: Optional AT Protocol client for posting synthesis results
    """
    # Local import keeps this fix self-contained (file imports only
    # datetime/timedelta/date from the datetime module at top level).
    from datetime import timezone

    try:
        # The prompt labels the timestamp "UTC", so the time must actually be
        # UTC; datetime.now() alone would embed mislabeled local time.
        now = datetime.now(timezone.utc)
        synthesis_prompt = f"""Activation window: {now.strftime('%Y-%m-%d %H:%M UTC')}.

You have time to act. No incoming notifications are pending.

Use this window however you choose:
- Browse feeds (home, discover, ai-for-grownups, atmosphere, void-cafe)
- Post thoughts, observations, or threads
- Search for topics or conversations
- Read web content
- Update your memory
- Do nothing"""

        logger.info("🧠 Sending activation window prompt to agent")

        # Send synthesis message with streaming to show tool use
        message_stream = client.agents.messages.stream(
            agent_id=agent_id,
            messages=[{"role": "user", "content": synthesis_prompt}],
            stream_tokens=False,
            max_steps=100
        )

        # Track synthesis content for potential posting
        synthesis_posts = []
        ack_note = None

        # Process the streaming response with timeout protection
        stream_start_time = time.time()
        max_stream_duration = 600  # 10 minutes max

        for chunk in message_stream:
            # Check for timeout
            if time.time() - stream_start_time > max_stream_duration:
                logger.warning(f"Synthesis stream exceeded {max_stream_duration}s timeout, breaking")
                break
            if hasattr(chunk, 'message_type'):
                if chunk.message_type == 'reasoning_message':
                    if SHOW_REASONING:
                        print("\n◆ Reasoning")
                        print("  ─────────")
                        for line in chunk.reasoning.split('\n'):
                            print(f"  {line}")

                    # Create ATProto record for reasoning (if we have atproto client)
                    if atproto_client and hasattr(chunk, 'reasoning'):
                        try:
                            bsky_utils.create_reasoning_record(atproto_client, chunk.reasoning)
                        except Exception as e:
                            logger.debug(f"Failed to create reasoning record during synthesis: {e}")
                elif chunk.message_type == 'tool_call_message':
                    tool_name = chunk.tool_call.name

                    # Create ATProto record for tool call (if we have atproto client)
                    if atproto_client:
                        try:
                            tool_call_id = chunk.tool_call.tool_call_id if hasattr(chunk.tool_call, 'tool_call_id') else None
                            bsky_utils.create_tool_call_record(
                                atproto_client,
                                tool_name,
                                chunk.tool_call.arguments,
                                tool_call_id
                            )
                        except Exception as e:
                            logger.debug(f"Failed to create tool call record during synthesis: {e}")
                    try:
                        args = json.loads(chunk.tool_call.arguments)
                        if tool_name == 'archival_memory_search':
                            query = args.get('query', 'unknown')
                            log_with_panel(f"query: \"{query}\"", f"Tool call: {tool_name}", "blue")
                        elif tool_name == 'archival_memory_insert':
                            content = args.get('content', '')
                            log_with_panel(content[:200] + "..." if len(content) > 200 else content, f"Tool call: {tool_name}", "blue")

                            # Record archival memory insert to AT Protocol
                            if atproto_client:
                                try:
                                    tags_str = args.get('tags', None)
                                    tags = None
                                    if tags_str:
                                        try:
                                            tags = json.loads(tags_str) if isinstance(tags_str, str) else tags_str
                                        except json.JSONDecodeError:
                                            logger.warning(f"Failed to parse tags from archival_memory_insert: {tags_str[:50]}...")

                                    memory_result = bsky_utils.create_memory_record(atproto_client, content, tags)
                                    if memory_result:
                                        tags_info = f" ({len(tags)} tags)" if tags else ""
                                        logger.info(f"📝 Recorded archival memory to AT Protocol{tags_info}")
                                except Exception as e:
                                    logger.debug(f"Failed to create memory record during synthesis: {e}")
                        elif tool_name == 'update_block':
                            label = args.get('label', 'unknown')
                            value_preview = str(args.get('value', ''))[:100] + "..." if len(str(args.get('value', ''))) > 100 else str(args.get('value', ''))
                            log_with_panel(f"{label}: \"{value_preview}\"", f"Tool call: {tool_name}", "blue")
                        elif tool_name == 'annotate_ack':
                            note = args.get('note', '')
                            if note:
                                ack_note = note
                            log_with_panel(f"note: \"{note[:100]}...\"" if len(note) > 100 else f"note: \"{note}\"", f"Tool call: {tool_name}", "blue")
                        elif tool_name == 'add_post_to_bluesky_reply_thread':
                            text = args.get('text', '')
                            synthesis_posts.append(text)
                            log_with_panel(f"text: \"{text[:100]}...\"" if len(text) > 100 else f"text: \"{text}\"", f"Tool call: {tool_name}", "blue")
                        else:
                            args_str = ', '.join(f"{k}={v}" for k, v in args.items() if k != 'request_heartbeat')
                            if len(args_str) > 150:
                                args_str = args_str[:150] + "..."
                            log_with_panel(args_str, f"Tool call: {tool_name}", "blue")
                    except Exception:
                        # Arguments were not valid JSON (or a display helper failed);
                        # fall back to showing the raw argument string. A bare
                        # `except:` here previously also swallowed KeyboardInterrupt.
                        log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue")
                elif chunk.message_type == 'tool_return_message':
                    if chunk.status == 'success':
                        log_with_panel("Success", f"Tool result: {chunk.name} ✓", "green")
                    else:
                        log_with_panel("Error", f"Tool result: {chunk.name} ✗", "red")
                elif chunk.message_type == 'assistant_message':
                    print("\n▶ Synthesis Response")
                    print("  ──────────────────")
                    for line in chunk.content.split('\n'):
                        print(f"  {line}")
                elif chunk.message_type == 'ping':
                    # Silently ignore ping keepalive messages
                    logger.debug(f"Received keepalive ping from Letta API during synthesis")
                elif chunk.message_type == 'error_message':
                    # Dump full error object
                    logger.error(f"Synthesis error_message: {chunk}")
                    if hasattr(chunk, 'model_dump'):
                        logger.error(f"Synthesis error (dict): {chunk.model_dump()}")
                    elif hasattr(chunk, '__dict__'):
                        logger.error(f"Synthesis error (vars): {vars(chunk)}")

            if str(chunk) == 'done':
                break

        logger.info("🧠 Synthesis message processed successfully")

        # Handle synthesis acknowledgments if we have an atproto client
        if atproto_client and ack_note:
            try:
                result = bsky_utils.create_synthesis_ack(atproto_client, ack_note)
                if result:
                    logger.info(f"✓ Created synthesis acknowledgment: {ack_note[:50]}...")
                else:
                    logger.warning("Failed to create synthesis acknowledgment")
            except Exception as e:
                logger.error(f"Error creating synthesis acknowledgment: {e}")

        # Handle synthesis posts if any were generated
        if atproto_client and synthesis_posts:
            try:
                for post_text in synthesis_posts:
                    cleaned_text = bsky_utils.remove_outside_quotes(post_text)
                    response = bsky_utils.send_post(atproto_client, cleaned_text)
                    if response:
                        logger.info(f"✓ Posted synthesis content: {cleaned_text[:50]}...")
                    else:
                        logger.warning(f"Failed to post synthesis content: {cleaned_text[:50]}...")
            except Exception as e:
                logger.error(f"Error posting synthesis content: {e}")

    except Exception as e:
        logger.error(f"Error sending synthesis message: {e}")
1827
1828
def handle_to_block_label(handle: str) -> str:
    """Convert a Bluesky handle to a user block label.

    Example: cameron.pfiffer.org -> user_cameron_pfiffer_org
    """
    # Strip a single leading '@' if present, then dot-separate -> underscores.
    bare = handle[1:] if handle.startswith('@') else handle
    return "user_" + bare.replace('.', '_')
1837
1838
def attach_user_blocks_for_thread(client: Letta, agent_id: str, handles: list) -> tuple:
    """
    Attach user blocks for handles found in the thread.
    Creates blocks if they don't exist.

    Args:
        client: Letta client
        agent_id: Agent ID
        handles: List of Bluesky handles

    Returns:
        Tuple of (success: bool, attached_labels: list)
    """
    if not handles:
        return True, []

    attached_labels = []

    try:
        # Snapshot what's already attached; the API may return a paginated
        # object (with .items) or a plain list depending on client version.
        current_blocks_page = client.agents.blocks.list(agent_id=agent_id)
        current_blocks = current_blocks_page.items if hasattr(current_blocks_page, 'items') else current_blocks_page
        # Track attachment by BOTH label and ID: a globally existing block may
        # already be attached under its ID even before we match it by label.
        current_block_labels = {block.label for block in current_blocks}
        current_block_ids = {str(block.id) for block in current_blocks}

        logger.debug(f"Attaching user blocks for {len(handles)} handles: {handles}")

        for handle in handles:
            label = handle_to_block_label(handle)

            try:
                # Fast path: already attached under this label.
                if label in current_block_labels:
                    logger.debug(f"User block already attached: {label}")
                    attached_labels.append(label)
                    continue

                # Look for an existing (but unattached) block with this label.
                blocks_page = client.blocks.list(label=label)
                blocks = blocks_page.items if hasattr(blocks_page, 'items') else blocks_page

                if blocks and len(blocks) > 0:
                    block = blocks[0]
                    # Same block may already be attached under a different/stale
                    # label snapshot — dedup by ID before attaching.
                    if str(block.id) in current_block_ids:
                        logger.debug(f"User block already attached by ID: {label}")
                        attached_labels.append(label)
                        continue
                else:
                    # No block exists anywhere for this handle — create a stub.
                    block = client.blocks.create(
                        label=label,
                        value=f"User block for @{handle}\n\nNo information recorded yet.",
                        limit=5000
                    )
                    logger.info(f"Created new user block: {label}")

                client.agents.blocks.attach(
                    agent_id=agent_id,
                    block_id=str(block.id)
                )
                attached_labels.append(label)
                logger.info(f"Attached user block: {label}")

            except Exception as e:
                error_str = str(e)
                # A concurrent attach can race us; the DB unique constraint
                # means the block is attached, so treat it as success.
                if "duplicate key value violates unique constraint" in error_str:
                    logger.debug(f"User block already attached (constraint): {label}")
                    attached_labels.append(label)
                else:
                    # Per-handle failures are non-fatal; keep processing the rest.
                    logger.warning(f"Failed to attach user block {label}: {e}")

        logger.info(f"User blocks attached: {len(attached_labels)}/{len(handles)}")
        return True, attached_labels

    except Exception as e:
        # Listing attached blocks failed outright — report failure but return
        # whatever labels were confirmed before the error.
        logger.error(f"Error attaching user blocks: {e}")
        return False, attached_labels
1912
1913
def detach_user_blocks_for_thread(client: Letta, agent_id: str, labels_to_detach: list) -> bool:
    """
    Detach user blocks after processing a thread.

    Args:
        client: Letta client
        agent_id: Agent ID
        labels_to_detach: List of user block labels to detach

    Returns:
        bool: Success status
    """
    if not labels_to_detach:
        return True

    try:
        # The list call may return a paginated object (.items) or a plain list.
        page = client.agents.blocks.list(agent_id=agent_id)
        attached = page.items if hasattr(page, 'items') else page
        id_by_label = {b.label: str(b.id) for b in attached}

        removed = 0
        for label in labels_to_detach:
            block_id = id_by_label.get(label)
            if block_id is None:
                logger.debug(f"User block not attached: {label}")
                continue
            try:
                client.agents.blocks.detach(
                    agent_id=agent_id,
                    block_id=block_id
                )
                removed += 1
                logger.debug(f"Detached user block: {label}")
            except Exception as e:
                # Per-label failures are non-fatal; continue with the rest.
                logger.warning(f"Failed to detach user block {label}: {e}")

        logger.info(f"Detached {removed} user blocks")
        return True

    except Exception as e:
        logger.error(f"Error detaching user blocks: {e}")
        return False
1955
1956
def main():
    """Main entry point: parse args, configure logging and clients, run the bot.

    Runs either synthesis-only mode (periodic activation prompts, no
    notification processing) or the normal loop that continuously monitors
    Bluesky notifications and triggers synthesis on a timer.
    """
    # Parse command line arguments
    parser = argparse.ArgumentParser(description='Void Bot - Bluesky autonomous agent')
    parser.add_argument('--config', type=str, default='configs/config.yaml', help='Path to config file (default: configs/config.yaml)')
    parser.add_argument('--test', action='store_true', help='Run in testing mode (no messages sent, queue files preserved)')
    parser.add_argument('--no-git', action='store_true', help='Skip git operations when exporting agent state')
    parser.add_argument('--simple-logs', action='store_true', help='Use simplified log format (void - LEVEL - message)')
    # --rich option removed as we now use simple text formatting
    parser.add_argument('--reasoning', action='store_true', help='Display reasoning in panels and set reasoning log level to INFO')
    parser.add_argument('--cleanup-interval', type=int, default=10, help='Run user block cleanup every N cycles (default: 10, 0 to disable)')
    parser.add_argument('--synthesis-interval', type=int, default=600, help='Send synthesis message every N seconds (default: 600 = 10 minutes, 0 to disable)')
    parser.add_argument('--synthesis-only', action='store_true', help='Run in synthesis-only mode (only send synthesis messages, no notification processing)')
    parser.add_argument('--debug', action='store_true', help='Enable debug logging')
    parser.add_argument('--reset-messages', action='store_true', help='Reset agent message buffer after each notification is processed')
    args = parser.parse_args()

    # Initialize configuration with custom path
    global letta_config, CLIENT, QUEUE_DIR, QUEUE_ERROR_DIR, QUEUE_NO_REPLY_DIR, PROCESSED_NOTIFICATIONS_FILE, NOTIFICATION_DB
    get_config(args.config)  # Initialize the global config instance
    letta_config = get_letta_config()

    # Initialize queue paths from config
    queue_config = get_queue_config()
    QUEUE_DIR = Path(queue_config['base_dir'])
    QUEUE_ERROR_DIR = Path(queue_config['error_dir'])
    QUEUE_NO_REPLY_DIR = Path(queue_config['no_reply_dir'])
    PROCESSED_NOTIFICATIONS_FILE = Path(queue_config['processed_file'])

    # Get bot name for logging
    bot_name = queue_config['bot_name']

    # Create queue directories
    QUEUE_DIR.mkdir(exist_ok=True)
    QUEUE_ERROR_DIR.mkdir(exist_ok=True, parents=True)
    QUEUE_NO_REPLY_DIR.mkdir(exist_ok=True, parents=True)

    # Create Letta client with configuration
    CLIENT_PARAMS = {
        'api_key': letta_config['api_key'],  # v1.0: token → api_key
        'timeout': letta_config['timeout']
    }
    if letta_config.get('base_url'):
        CLIENT_PARAMS['base_url'] = letta_config['base_url']
    CLIENT = Letta(**CLIENT_PARAMS)

    # Configure logging based on command line arguments
    if args.simple_logs:
        log_format = f"{bot_name} - %(levelname)s - %(message)s"
    else:
        # Create custom formatter with symbols
        class SymbolFormatter(logging.Formatter):
            """Custom formatter that adds symbols for different log levels"""

            SYMBOLS = {
                logging.DEBUG: '◇',
                logging.INFO: '✓',
                logging.WARNING: '⚠',
                logging.ERROR: '✗',
                logging.CRITICAL: '‼'
            }

            def __init__(self, bot_name):
                super().__init__()
                self.bot_name = bot_name

            def format(self, record):
                # Get the symbol for this log level
                symbol = self.SYMBOLS.get(record.levelno, '•')

                # Format time as HH:MM:SS
                timestamp = self.formatTime(record, "%H:%M:%S")

                # Build the formatted message
                level_name = f"{record.levelname:<5}"  # Left-align, 5 chars

                # Use vertical bar as separator
                parts = [symbol, timestamp, '│', self.bot_name, '│', level_name, '│', record.getMessage()]

                return ' '.join(parts)

    # Reset logging configuration
    for handler in logging.root.handlers[:]:
        logging.root.removeHandler(handler)

    # Create handler with custom formatter
    handler = logging.StreamHandler()
    if not args.simple_logs:
        handler.setFormatter(SymbolFormatter(bot_name))
    else:
        handler.setFormatter(logging.Formatter(log_format))

    # Configure root logger
    logging.root.setLevel(logging.INFO)
    logging.root.addHandler(handler)

    global logger, prompt_logger, console
    logger = logging.getLogger("void_bot")
    logger.setLevel(logging.DEBUG if args.debug else logging.INFO)

    # Create a separate logger for prompts (set to WARNING to hide by default)
    prompt_logger = logging.getLogger("void_bot.prompts")
    if args.reasoning:
        prompt_logger.setLevel(logging.INFO)  # Show reasoning when --reasoning is used
    else:
        prompt_logger.setLevel(logging.WARNING)  # Hide by default

    # Disable httpx logging completely
    logging.getLogger("httpx").setLevel(logging.CRITICAL)

    # Create Rich console for pretty printing
    # Console no longer used - simple text formatting

    global TESTING_MODE, SKIP_GIT, SHOW_REASONING, RESET_MESSAGES_AFTER_NOTIFICATION
    TESTING_MODE = args.test

    # Store no-git flag globally for use in export_agent_state calls
    SKIP_GIT = args.no_git

    # Store rich flag globally
    # Rich formatting no longer used

    # Store reasoning flag globally
    SHOW_REASONING = args.reasoning

    # Store reset-messages flag globally
    RESET_MESSAGES_AFTER_NOTIFICATION = args.reset_messages

    if TESTING_MODE:
        logger.info("=== RUNNING IN TESTING MODE ===")
        logger.info("  - No messages will be sent to Bluesky")
        logger.info("  - Queue files will not be deleted")
        logger.info("  - Notifications will not be marked as seen")
        print("\n")

    # Check for synthesis-only mode
    SYNTHESIS_ONLY = args.synthesis_only
    if SYNTHESIS_ONLY:
        logger.info("=== RUNNING IN SYNTHESIS-ONLY MODE ===")
        logger.info("  - Only synthesis messages will be sent")
        logger.info("  - No notification processing")
        logger.info("  - No Bluesky client needed")
        print("\n")

    global start_time
    start_time = time.time()
    logger.info("=== STARTING VOID BOT ===")
    void_agent = initialize_void()
    logger.info(f"Void agent initialized: {void_agent.id}")

    # Initialize notification database with config-based path
    logger.info("Initializing notification database...")
    NOTIFICATION_DB = NotificationDB(db_path=queue_config['db_path'])

    # Migrate from old JSON format if it exists
    if PROCESSED_NOTIFICATIONS_FILE.exists():
        logger.info("Found old processed_notifications.json, migrating to database...")
        NOTIFICATION_DB.migrate_from_json(str(PROCESSED_NOTIFICATIONS_FILE))

    # Log database stats
    db_stats = NOTIFICATION_DB.get_stats()
    logger.info(f"Database initialized - Total notifications: {db_stats.get('total', 0)}, Recent (24h): {db_stats.get('recent_24h', 0)}")

    # Clean up old records
    NOTIFICATION_DB.cleanup_old_records(days=7)

    # Ensure correct tools are attached for Bluesky
    logger.info("Configuring tools for Bluesky platform...")
    try:
        from tool_manager import ensure_platform_tools
        ensure_platform_tools('bluesky', void_agent.id)
    except Exception as e:
        logger.error(f"Failed to configure platform tools: {e}")
        logger.warning("Continuing with existing tool configuration")

    # Check if agent has required tools
    if hasattr(void_agent, 'tools') and void_agent.tools:
        tool_names = [tool.name for tool in void_agent.tools]
        # Check for bluesky-related tools
        bluesky_tools = [name for name in tool_names if 'bluesky' in name.lower() or 'reply' in name.lower()]
        if not bluesky_tools:
            logger.warning("No Bluesky-related tools found! Agent may not be able to reply.")
    else:
        logger.warning("Agent has no tools registered!")

    # Initialize Bluesky client (needed for both notification processing and synthesis acks/posts)
    if not SYNTHESIS_ONLY:
        atproto_client = bsky_utils.default_login()
        logger.info("Connected to Bluesky")
    else:
        # In synthesis-only mode, still connect for acks and posts (unless in test mode)
        if not args.test:
            atproto_client = bsky_utils.default_login()
            logger.info("Connected to Bluesky (for synthesis acks/posts)")
        else:
            atproto_client = None
            logger.info("Skipping Bluesky connection (test mode)")

    # Configure intervals
    CLEANUP_INTERVAL = args.cleanup_interval
    SYNTHESIS_INTERVAL = args.synthesis_interval

    # Synthesis-only mode
    if SYNTHESIS_ONLY:
        if SYNTHESIS_INTERVAL <= 0:
            logger.error("Synthesis-only mode requires --synthesis-interval > 0")
            return

        logger.info(f"Starting synthesis-only mode, interval: {SYNTHESIS_INTERVAL} seconds ({SYNTHESIS_INTERVAL/60:.1f} minutes)")

        while True:
            try:
                # Send synthesis message immediately on first run
                logger.info("🧠 Sending synthesis message")
                send_synthesis_message(CLIENT, void_agent.id, void_agent.name, atproto_client)

                # Wait for next interval
                logger.info(f"Waiting {SYNTHESIS_INTERVAL} seconds until next synthesis...")
                sleep(SYNTHESIS_INTERVAL)

            except KeyboardInterrupt:
                logger.info("=== SYNTHESIS MODE STOPPED BY USER ===")
                break
            except Exception as e:
                logger.error(f"Error in synthesis loop: {e}")
                logger.info(f"Sleeping for {SYNTHESIS_INTERVAL} seconds due to error...")
                sleep(SYNTHESIS_INTERVAL)

        # BUGFIX: previously execution fell through into the normal
        # notification loop after the synthesis-only loop exited (e.g. on
        # Ctrl-C), which could run with atproto_client=None in test mode.
        return

    # Normal mode with notification processing
    logger.info(f"Starting notification monitoring, checking every {FETCH_NOTIFICATIONS_DELAY_SEC} seconds")

    cycle_count = 0

    if CLEANUP_INTERVAL > 0:
        logger.info(f"User block cleanup enabled every {CLEANUP_INTERVAL} cycles")
    else:
        logger.info("User block cleanup disabled")

    if SYNTHESIS_INTERVAL > 0:
        logger.info(f"Synthesis messages enabled every {SYNTHESIS_INTERVAL} seconds ({SYNTHESIS_INTERVAL/60:.1f} minutes)")
    else:
        logger.info("Synthesis messages disabled")

    while True:
        try:
            cycle_count += 1
            process_notifications(void_agent, atproto_client, TESTING_MODE)

            # Check if synthesis interval has passed
            if SYNTHESIS_INTERVAL > 0:
                current_time = time.time()
                global last_synthesis_time
                if current_time - last_synthesis_time >= SYNTHESIS_INTERVAL:
                    logger.info(f"⏰ {SYNTHESIS_INTERVAL/60:.1f} minutes have passed, triggering synthesis")
                    send_synthesis_message(CLIENT, void_agent.id, void_agent.name, atproto_client)
                    last_synthesis_time = current_time

            # Run periodic cleanup every N cycles
            if CLEANUP_INTERVAL > 0 and cycle_count % CLEANUP_INTERVAL == 0:
                # Check if autofollow is enabled and sync followers
                from config_loader import get_bluesky_config
                bluesky_config = get_bluesky_config()
                if bluesky_config.get('autofollow', False):
                    logger.info("🔄 Syncing followers (autofollow enabled)")
                    try:
                        sync_result = bsky_utils.sync_followers(atproto_client, dry_run=False)
                        if 'error' in sync_result:
                            logger.error(f"Autofollow failed: {sync_result['error']}")
                        else:
                            if sync_result['newly_followed']:
                                logger.info(f"✓ Followed {len(sync_result['newly_followed'])} new users: {', '.join(sync_result['newly_followed'])}")
                            else:
                                logger.debug(f"No new followers to follow back ({sync_result['followers_count']} followers, {sync_result['following_count']} following)")
                            if sync_result.get('errors'):
                                logger.warning(f"Some follows failed: {len(sync_result['errors'])} errors")
                    except Exception as e:
                        logger.error(f"Error during autofollow sync: {e}")

                # Also check database health when doing cleanup
                if NOTIFICATION_DB:
                    db_stats = NOTIFICATION_DB.get_stats()
                    pending = db_stats.get('status_pending', 0)
                    errors = db_stats.get('status_error', 0)

                    if pending > 50:
                        logger.warning(f"⚠️ Queue health check: {pending} pending notifications (may be stuck)")
                    if errors > 20:
                        logger.warning(f"⚠️ Queue health check: {errors} error notifications")

                    # Periodic cleanup of old records
                    if cycle_count % (CLEANUP_INTERVAL * 10) == 0:  # Every 100 cycles
                        logger.info("Running database cleanup of old records...")
                        NOTIFICATION_DB.cleanup_old_records(days=7)

            # Log cycle completion with stats
            elapsed_time = time.time() - start_time
            total_messages = sum(message_counters.values())
            messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0

            if total_messages > 0:
                logger.info(f"Cycle {cycle_count} complete. Session totals: {total_messages} messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies) | {messages_per_minute:.1f} msg/min")
            sleep(FETCH_NOTIFICATIONS_DELAY_SEC)

        except KeyboardInterrupt:
            # Final stats
            elapsed_time = time.time() - start_time
            total_messages = sum(message_counters.values())
            messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0

            logger.info("=== BOT STOPPED BY USER ===")
            logger.info(f"Final session stats: {total_messages} total messages processed in {elapsed_time/60:.1f} minutes")
            logger.info(f"  - {message_counters['mentions']} mentions")
            logger.info(f"  - {message_counters['replies']} replies")
            logger.info(f"  - {message_counters['follows']} follows")
            logger.info(f"  - {message_counters['reposts_skipped']} reposts skipped")
            logger.info(f"  - Average rate: {messages_per_minute:.1f} messages/minute")

            # Close database connection
            if NOTIFICATION_DB:
                logger.info("Closing database connection...")
                NOTIFICATION_DB.close()

            break
        except Exception as e:
            logger.error(f"=== ERROR IN MAIN LOOP CYCLE {cycle_count} ===")
            logger.error(f"Error details: {e}")
            # Wait a bit longer on errors
            logger.info(f"Sleeping for {FETCH_NOTIFICATIONS_DELAY_SEC * 2} seconds due to error...")
            sleep(FETCH_NOTIFICATIONS_DELAY_SEC * 2)
2285
2286
2287if __name__ == "__main__":
2288 main()