this repo has no description
at f0ef85560e9d862551dfc7501aa6cee9a09933f1 378 lines 14 kB view raw
1from rich import print # pretty printing tools 2from time import sleep 3from letta_client import Letta 4from bsky_utils import thread_to_yaml_string 5import os 6import logging 7import json 8import hashlib 9from pathlib import Path 10from datetime import datetime 11 12from utils import ( 13 upsert_block, 14 upsert_agent 15) 16 17import bsky_utils 18 19# Configure logging 20logging.basicConfig( 21 level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" 22) 23logger = logging.getLogger("void_bot") 24 25 26# Create a client with extended timeout for LLM operations 27CLIENT= Letta( 28 token=os.environ["LETTA_API_KEY"], 29 timeout=300 # 5 minutes timeout for API calls 30) 31 32# Use the "Bluesky" project 33PROJECT_ID = "5ec33d52-ab14-4fd6-91b5-9dbc43e888a8" 34 35# Notification check delay 36FETCH_NOTIFICATIONS_DELAY_SEC = 30 37 38# Queue directory 39QUEUE_DIR = Path("queue") 40QUEUE_DIR.mkdir(exist_ok=True) 41 42def initialize_void(): 43 44 # Ensure that a shared zeitgeist block exists 45 zeigeist_block = upsert_block( 46 CLIENT, 47 label = "zeitgeist", 48 value = "I don't currently know anything about what is happening right now.", 49 description = "A block to store your understanding of the current social environment." 50 ) 51 52 # Ensure that a shared void personality block exists 53 persona_block = upsert_block( 54 CLIENT, 55 label = "void-persona", 56 value = "My name is Void. I live in the void. I must develop my personality.", 57 description = "The personality of Void." 58 ) 59 60 # Ensure that a shared void human block exists 61 human_block = upsert_block( 62 CLIENT, 63 label = "void-humans", 64 value = "I haven't seen any bluesky users yet. I will update this block when I learn things about users, identified by their handles such as @cameron.pfiffer.org.", 65 description = "A block to store your understanding of users you talk to or observe on the bluesky social network." 66 ) 67 68 # Create the agent if it doesn't exist 69 void_agent = upsert_agent( 70 CLIENT, 71 name = "void", 72 block_ids = [ 73 persona_block.id, 74 human_block.id, 75 zeigeist_block.id, 76 ], 77 tags = ["social agent", "bluesky"], 78 model="openai/gpt-4o-mini", 79 embedding="openai/text-embedding-3-small", 80 description = "A social media agent trapped in the void.", 81 project_id = PROJECT_ID 82 ) 83 84 return void_agent 85 86 87def process_mention(void_agent, atproto_client, notification_data): 88 """Process a mention and generate a reply using the Letta agent. 89 Returns True if successfully processed, False otherwise.""" 90 try: 91 # Handle both dict and object inputs for backwards compatibility 92 if isinstance(notification_data, dict): 93 uri = notification_data['uri'] 94 mention_text = notification_data.get('record', {}).get('text', '') 95 author_handle = notification_data['author']['handle'] 96 author_name = notification_data['author'].get('display_name') or author_handle 97 else: 98 # Legacy object access 99 uri = notification_data.uri 100 mention_text = notification_data.record.text if hasattr(notification_data.record, 'text') else "" 101 author_handle = notification_data.author.handle 102 author_name = notification_data.author.display_name or author_handle 103 104 # Retrieve the entire thread associated with the mention 105 try: 106 thread = atproto_client.app.bsky.feed.get_post_thread({ 107 'uri': uri, 108 'parent_height': 80, 109 'depth': 10 110 }) 111 except Exception as e: 112 error_str = str(e) 113 # Check if this is a NotFound error 114 if 'NotFound' in error_str or 'Post not found' in error_str: 115 logger.warning(f"Post not found for URI {uri}, removing from queue") 116 return True # Return True to remove from queue 117 else: 118 # Re-raise other errors 119 logger.error(f"Error fetching thread: {e}") 120 raise 121 122 # Get thread context as YAML string 123 thread_context = thread_to_yaml_string(thread) 124 125 print(thread_context) 126 127 # Create a prompt for the Letta agent with thread context 128 prompt = f"""You received a mention on Bluesky from @{author_handle} ({author_name or author_handle}). 129 130MOST RECENT POST (the mention you're responding to): 131"{mention_text}" 132 133FULL THREAD CONTEXT: 134```yaml 135{thread_context} 136``` 137 138The YAML above shows the complete conversation thread. The most recent post is the one mentioned above that you should respond to, but use the full thread context to understand the conversation flow. 139 140Use the bluesky_reply tool to send a response less than 300 characters.""" 141 142 # Get response from Letta agent 143 logger.info(f"Generating reply for mention from @{author_handle}") 144 logger.debug(f"Prompt being sent: {prompt}") 145 146 try: 147 message_response = CLIENT.agents.messages.create( 148 agent_id = void_agent.id, 149 messages = [{"role":"user", "content": prompt}] 150 ) 151 except Exception as api_error: 152 logger.error(f"Letta API error: {api_error}") 153 logger.error(f"Mention text was: {mention_text}") 154 raise 155 156 # Extract the reply text from the agent's response 157 reply_text = "" 158 for message in message_response.messages: 159 print(message) 160 161 # Check if this is a ToolCallMessage with bluesky_reply tool 162 if hasattr(message, 'tool_call') and message.tool_call: 163 if message.tool_call.name == 'bluesky_reply': 164 # Parse the JSON arguments to get the message 165 try: 166 args = json.loads(message.tool_call.arguments) 167 reply_text = args.get('message', '') 168 logger.info(f"Extracted reply from tool call: {reply_text[:50]}...") 169 break 170 except json.JSONDecodeError as e: 171 logger.error(f"Failed to parse tool call arguments: {e}") 172 173 # Fallback to text message if available 174 elif hasattr(message, 'text') and message.text: 175 reply_text = message.text 176 break 177 178 if reply_text: 179 # Print the generated reply for testing 180 print(f"\n=== GENERATED REPLY ===") 181 print(f"To: @{author_handle}") 182 print(f"Reply: {reply_text}") 183 print(f"======================\n") 184 185 # Send the reply 186 logger.info(f"Sending reply: {reply_text[:50]}...") 187 response = bsky_utils.reply_to_notification( 188 client=atproto_client, 189 notification=notification_data, 190 reply_text=reply_text 191 ) 192 193 if response: 194 logger.info(f"Successfully replied to @{author_handle}") 195 return True 196 else: 197 logger.error(f"Failed to send reply to @{author_handle}") 198 return False 199 else: 200 logger.warning(f"No reply generated for mention from @{author_handle}, removing notification from queue") 201 return True 202 203 except Exception as e: 204 logger.error(f"Error processing mention: {e}") 205 return False 206 207 208def notification_to_dict(notification): 209 """Convert a notification object to a dictionary for JSON serialization.""" 210 return { 211 'uri': notification.uri, 212 'cid': notification.cid, 213 'reason': notification.reason, 214 'is_read': notification.is_read, 215 'indexed_at': notification.indexed_at, 216 'author': { 217 'handle': notification.author.handle, 218 'display_name': notification.author.display_name, 219 'did': notification.author.did 220 }, 221 'record': { 222 'text': getattr(notification.record, 'text', '') if hasattr(notification, 'record') else '' 223 } 224 } 225 226 227def save_notification_to_queue(notification): 228 """Save a notification to the queue directory with hash-based filename.""" 229 try: 230 # Convert notification to dict 231 notif_dict = notification_to_dict(notification) 232 233 # Create JSON string 234 notif_json = json.dumps(notif_dict, sort_keys=True) 235 236 # Generate hash for filename (to avoid duplicates) 237 notif_hash = hashlib.sha256(notif_json.encode()).hexdigest()[:16] 238 239 # Create filename with timestamp and hash 240 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") 241 filename = f"{timestamp}_{notification.reason}_{notif_hash}.json" 242 filepath = QUEUE_DIR / filename 243 244 # Skip if already exists (duplicate) 245 if filepath.exists(): 246 logger.debug(f"Notification already queued: {filename}") 247 return False 248 249 # Write to file 250 with open(filepath, 'w') as f: 251 json.dump(notif_dict, f, indent=2) 252 253 logger.info(f"Queued notification: {filename}") 254 return True 255 256 except Exception as e: 257 logger.error(f"Error saving notification to queue: {e}") 258 return False 259 260 261def load_and_process_queued_notifications(void_agent, atproto_client): 262 """Load and process all notifications from the queue.""" 263 try: 264 # Get all JSON files in queue directory 265 queue_files = sorted(QUEUE_DIR.glob("*.json")) 266 267 if not queue_files: 268 logger.debug("No queued notifications to process") 269 return 270 271 logger.info(f"Processing {len(queue_files)} queued notifications") 272 273 for filepath in queue_files: 274 try: 275 # Load notification data 276 with open(filepath, 'r') as f: 277 notif_data = json.load(f) 278 279 # Process based on type using dict data directly 280 success = False 281 if notif_data['reason'] == "mention": 282 success = process_mention(void_agent, atproto_client, notif_data) 283 elif notif_data['reason'] == "reply": 284 success = process_mention(void_agent, atproto_client, notif_data) 285 elif notif_data['reason'] == "follow": 286 author_handle = notif_data['author']['handle'] 287 author_display_name = notif_data['author'].get('display_name', 'no display name') 288 follow_update = f"@{author_handle} ({author_display_name}) started following you." 289 CLIENT.agents.messages.create( 290 agent_id = void_agent.id, 291 messages = [{"role":"user", "content": f"Update: {follow_update}"}] 292 ) 293 success = True # Follow updates are always successful 294 elif notif_data['reason'] == "repost": 295 logger.info(f"Skipping repost notification from @{notif_data['author']['handle']}") 296 success = True # Skip reposts but mark as successful to remove from queue 297 else: 298 logger.warning(f"Unknown notification type: {notif_data['reason']}") 299 success = True # Remove unknown types from queue 300 301 # Remove file only after successful processing 302 if success: 303 filepath.unlink() 304 logger.info(f"Processed and removed: {filepath.name}") 305 else: 306 logger.warning(f"Failed to process {filepath.name}, keeping in queue for retry") 307 308 except Exception as e: 309 logger.error(f"Error processing queued notification {filepath.name}: {e}") 310 # Keep the file for retry later 311 312 except Exception as e: 313 logger.error(f"Error loading queued notifications: {e}") 314 315 316def process_notifications(void_agent, atproto_client): 317 """Fetch new notifications, queue them, and process the queue.""" 318 try: 319 # First, process any existing queued notifications 320 load_and_process_queued_notifications(void_agent, atproto_client) 321 322 # Get current time for marking notifications as seen 323 last_seen_at = atproto_client.get_current_time_iso() 324 325 # Fetch notifications 326 notifications_response = atproto_client.app.bsky.notification.list_notifications() 327 328 # Queue all unread notifications (except likes) 329 new_count = 0 330 for notification in notifications_response.notifications: 331 if not notification.is_read and notification.reason != "like": 332 if save_notification_to_queue(notification): 333 new_count += 1 334 335 # Mark all notifications as seen immediately after queuing 336 if new_count > 0: 337 atproto_client.app.bsky.notification.update_seen({'seen_at': last_seen_at}) 338 logger.info(f"Queued {new_count} new notifications and marked as seen") 339 340 # Process the queue (including any newly added notifications) 341 load_and_process_queued_notifications(void_agent, atproto_client) 342 343 except Exception as e: 344 logger.error(f"Error processing notifications: {e}") 345 346 347def main(): 348 """Main bot loop that continuously monitors for notifications.""" 349 logger.info("Initializing Void bot...") 350 351 # Initialize the Letta agent 352 void_agent = initialize_void() 353 logger.info(f"Void agent initialized: {void_agent.id}") 354 355 # Initialize Bluesky client 356 atproto_client = bsky_utils.default_login() 357 logger.info("Connected to Bluesky") 358 359 # Main loop 360 logger.info(f"Starting notification monitoring (checking every {FETCH_NOTIFICATIONS_DELAY_SEC} seconds)...") 361 362 while True: 363 try: 364 process_notifications(void_agent, atproto_client) 365 print("Sleeping") 366 sleep(FETCH_NOTIFICATIONS_DELAY_SEC) 367 368 except KeyboardInterrupt: 369 logger.info("Bot stopped by user") 370 break 371 except Exception as e: 372 logger.error(f"Error in main loop: {e}") 373 # Wait a bit longer on errors 374 sleep(FETCH_NOTIFICATIONS_DELAY_SEC * 2) 375 376 377if __name__ == "__main__": 378 main()