A digital person for Bluesky.
at 42e4d0055155dada2db9c31df06a3238decf3328 367 lines 14 kB view raw
"""Void: a Bluesky social agent driven by a Letta agent.

Polls Bluesky notifications, persists them to an on-disk queue, and
replies to mentions/replies via the Letta agent's `bluesky_reply` tool.
"""

from rich import print  # pretty printing tools
from time import sleep
from letta_client import Letta
from bsky_utils import thread_to_yaml_string
import os
import logging
import json
import hashlib
from pathlib import Path
from datetime import datetime

from utils import (
    upsert_block,
    upsert_agent
)

import bsky_utils

# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("void_bot")


# Create a client with extended timeout for LLM operations
CLIENT = Letta(
    token=os.environ["LETTA_API_KEY"],
    timeout=300  # 5 minutes timeout for API calls
)

# Use the "Bluesky" project
PROJECT_ID = "5ec33d52-ab14-4fd6-91b5-9dbc43e888a8"

# Notification check delay
FETCH_NOTIFICATIONS_DELAY_SEC = 30

# Queue directory for notifications awaiting processing
QUEUE_DIR = Path("queue")
QUEUE_DIR.mkdir(exist_ok=True)


def initialize_void():
    """Upsert the shared memory blocks and the Void agent; return the agent.

    Idempotent: `upsert_block` / `upsert_agent` reuse existing resources,
    so this is safe to call on every startup.
    """
    # Ensure that a shared zeitgeist block exists
    zeitgeist_block = upsert_block(
        CLIENT,
        label="zeitgeist",
        value="I don't currently know anything about what is happening right now.",
        description="A block to store your understanding of the current social environment."
    )

    # Ensure that a shared void personality block exists
    persona_block = upsert_block(
        CLIENT,
        label="void-persona",
        value="My name is Void. I live in the void. I must develop my personality.",
        description="The personality of Void."
    )

    # Ensure that a shared void human block exists
    human_block = upsert_block(
        CLIENT,
        label="void-humans",
        value="I haven't seen any bluesky users yet. I will update this block when I learn things about users, identified by their handles such as @cameron.pfiffer.org.",
        description="A block to store your understanding of users you talk to or observe on the bluesky social network."
    )

    # Create the agent if it doesn't exist
    void_agent = upsert_agent(
        CLIENT,
        name="void",
        block_ids=[
            persona_block.id,
            human_block.id,
            zeitgeist_block.id,
        ],
        tags=["social agent", "bluesky"],
        model="openai/gpt-4o-mini",
        embedding="openai/text-embedding-3-small",
        description="A social media agent trapped in the void.",
        project_id=PROJECT_ID
    )

    return void_agent


def process_mention(void_agent, atproto_client, notification_data):
    """Process a mention/reply and post a response generated by the Letta agent.

    Args:
        void_agent: The Letta agent that generates the reply.
        atproto_client: Authenticated AT Protocol (Bluesky) client.
        notification_data: Either a dict (queued JSON form) or a raw
            atproto notification object (legacy form).

    Returns:
        True if handled (replied, or no reply was generated and the item
        should be dropped from the queue); False on failure so the caller
        keeps the notification queued for retry.
    """
    try:
        # Handle both dict and object inputs for backwards compatibility
        if isinstance(notification_data, dict):
            uri = notification_data['uri']
            mention_text = notification_data.get('record', {}).get('text', '')
            author_handle = notification_data['author']['handle']
            author_name = notification_data['author'].get('display_name') or author_handle
        else:
            # Legacy object access
            uri = notification_data.uri
            mention_text = notification_data.record.text if hasattr(notification_data.record, 'text') else ""
            author_handle = notification_data.author.handle
            author_name = notification_data.author.display_name or author_handle

        # Retrieve the entire thread associated with the mention so the
        # agent can see the conversation, not just the single post.
        thread = atproto_client.app.bsky.feed.get_post_thread({
            'uri': uri,
            'parent_height': 80,
            'depth': 10
        })

        # Get thread context as YAML string
        thread_context = thread_to_yaml_string(thread)

        print(thread_context)

        # Create a prompt for the Letta agent with thread context
        prompt = f"""You received a mention on Bluesky from @{author_handle} ({author_name or author_handle}).

MOST RECENT POST (the mention you're responding to):
"{mention_text}"

FULL THREAD CONTEXT:
```yaml
{thread_context}
```

The YAML above shows the complete conversation thread. The most recent post is the one mentioned above that you should respond to, but use the full thread context to understand the conversation flow.

Use the bluesky_reply tool to send a response less than 300 characters."""

        # Get response from Letta agent
        logger.info(f"Generating reply for mention from @{author_handle}")
        logger.debug(f"Prompt being sent: {prompt}")

        try:
            message_response = CLIENT.agents.messages.create(
                agent_id=void_agent.id,
                messages=[{"role": "user", "content": prompt}]
            )
        except Exception as api_error:
            logger.error(f"Letta API error: {api_error}")
            logger.error(f"Mention text was: {mention_text}")
            raise

        # Extract the reply text from the agent's response: prefer the
        # bluesky_reply tool call; fall back to a plain text message.
        reply_text = ""
        for message in message_response.messages:
            print(message)

            # Check if this is a ToolCallMessage with bluesky_reply tool
            if hasattr(message, 'tool_call') and message.tool_call:
                if message.tool_call.name == 'bluesky_reply':
                    # Parse the JSON arguments to get the message
                    try:
                        args = json.loads(message.tool_call.arguments)
                        reply_text = args.get('message', '')
                        logger.info(f"Extracted reply from tool call: {reply_text[:50]}...")
                        break
                    except json.JSONDecodeError as e:
                        logger.error(f"Failed to parse tool call arguments: {e}")

            # Fallback to text message if available
            elif hasattr(message, 'text') and message.text:
                reply_text = message.text
                break

        if reply_text:
            # Print the generated reply for testing
            print(f"\n=== GENERATED REPLY ===")
            print(f"To: @{author_handle}")
            print(f"Reply: {reply_text}")
            print(f"======================\n")

            # Send the reply
            logger.info(f"Sending reply: {reply_text[:50]}...")
            response = bsky_utils.reply_to_notification(
                client=atproto_client,
                notification=notification_data,
                reply_text=reply_text
            )

            if response:
                logger.info(f"Successfully replied to @{author_handle}")
                return True
            else:
                logger.error(f"Failed to send reply to @{author_handle}")
                return False
        else:
            # No reply produced; treat as handled so the queue file is removed.
            logger.warning(f"No reply generated for mention from @{author_handle}, removing notification from queue")
            return True

    except Exception as e:
        logger.error(f"Error processing mention: {e}")
        return False


def notification_to_dict(notification):
    """Convert a notification object to a dictionary for JSON serialization."""
    return {
        'uri': notification.uri,
        'cid': notification.cid,
        'reason': notification.reason,
        'is_read': notification.is_read,
        'indexed_at': notification.indexed_at,
        'author': {
            'handle': notification.author.handle,
            'display_name': notification.author.display_name,
            'did': notification.author.did
        },
        'record': {
            # Not every notification kind carries a post record with text.
            'text': getattr(notification.record, 'text', '') if hasattr(notification, 'record') else ''
        }
    }


def save_notification_to_queue(notification):
    """Save a notification to the queue directory with hash-based filename.

    Returns True when a new queue file was written, False for duplicates
    or on error.
    """
    try:
        # Convert notification to dict
        notif_dict = notification_to_dict(notification)

        # Create JSON string (sorted keys => stable hash for dedup)
        notif_json = json.dumps(notif_dict, sort_keys=True)

        # Generate hash for filename (to avoid duplicates)
        notif_hash = hashlib.sha256(notif_json.encode()).hexdigest()[:16]

        # Create filename with timestamp and hash
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{timestamp}_{notification.reason}_{notif_hash}.json"
        filepath = QUEUE_DIR / filename

        # Skip if already exists (duplicate)
        if filepath.exists():
            # Fix: log the actual file name instead of a "(unknown)" placeholder
            logger.debug(f"Notification already queued: {filepath.name}")
            return False

        # Write to file
        with open(filepath, 'w') as f:
            json.dump(notif_dict, f, indent=2)

        logger.info(f"Queued notification: {filepath.name}")
        return True

    except Exception as e:
        logger.error(f"Error saving notification to queue: {e}")
        return False


def load_and_process_queued_notifications(void_agent, atproto_client):
    """Load and process all notifications from the queue.

    Files are processed oldest-first (sorted by timestamped name) and are
    removed only after successful handling, so failures are retried.
    """
    try:
        # Get all JSON files in queue directory
        queue_files = sorted(QUEUE_DIR.glob("*.json"))

        if not queue_files:
            logger.debug("No queued notifications to process")
            return

        logger.info(f"Processing {len(queue_files)} queued notifications")

        for filepath in queue_files:
            try:
                # Load notification data
                with open(filepath, 'r') as f:
                    notif_data = json.load(f)

                # Process based on type using dict data directly
                success = False
                reason = notif_data['reason']
                if reason in ("mention", "reply"):
                    # Mentions and replies share the same handling path.
                    success = process_mention(void_agent, atproto_client, notif_data)
                elif reason == "follow":
                    author_handle = notif_data['author']['handle']
                    author_display_name = notif_data['author'].get('display_name', 'no display name')
                    follow_update = f"@{author_handle} ({author_display_name}) started following you."
                    CLIENT.agents.messages.create(
                        agent_id=void_agent.id,
                        messages=[{"role": "user", "content": f"Update: {follow_update}"}]
                    )
                    success = True  # Follow updates are always successful
                elif reason == "repost":
                    logger.info(f"Skipping repost notification from @{notif_data['author']['handle']}")
                    success = True  # Skip reposts but mark as successful to remove from queue
                else:
                    logger.warning(f"Unknown notification type: {reason}")
                    success = True  # Remove unknown types from queue

                # Remove file only after successful processing
                if success:
                    filepath.unlink()
                    logger.info(f"Processed and removed: {filepath.name}")
                else:
                    logger.warning(f"Failed to process {filepath.name}, keeping in queue for retry")

            except Exception as e:
                logger.error(f"Error processing queued notification {filepath.name}: {e}")
                # Keep the file for retry later

    except Exception as e:
        logger.error(f"Error loading queued notifications: {e}")


def process_notifications(void_agent, atproto_client):
    """Fetch new notifications, queue them, and process the queue."""
    try:
        # First, process any existing queued notifications
        load_and_process_queued_notifications(void_agent, atproto_client)

        # Get current time for marking notifications as seen
        last_seen_at = atproto_client.get_current_time_iso()

        # Fetch notifications
        notifications_response = atproto_client.app.bsky.notification.list_notifications()

        # Queue all unread notifications (except likes)
        new_count = 0
        unread_count = 0
        for notification in notifications_response.notifications:
            if not notification.is_read:
                unread_count += 1
                if notification.reason != "like" and save_notification_to_queue(notification):
                    new_count += 1

        # Mark as seen whenever anything unread was fetched. Fix: the old
        # condition (new_count > 0) never marked batches consisting only of
        # likes or already-queued duplicates as seen, so they were refetched
        # on every poll cycle.
        if unread_count > 0:
            atproto_client.app.bsky.notification.update_seen({'seen_at': last_seen_at})
            logger.info(f"Queued {new_count} new notifications and marked as seen")

        # Process the queue (including any newly added notifications)
        load_and_process_queued_notifications(void_agent, atproto_client)

    except Exception as e:
        logger.error(f"Error processing notifications: {e}")


def main():
    """Main bot loop that continuously monitors for notifications."""
    logger.info("Initializing Void bot...")

    # Initialize the Letta agent
    void_agent = initialize_void()
    logger.info(f"Void agent initialized: {void_agent.id}")

    # Initialize Bluesky client
    atproto_client = bsky_utils.default_login()
    logger.info("Connected to Bluesky")

    # Main loop
    logger.info(f"Starting notification monitoring (checking every {FETCH_NOTIFICATIONS_DELAY_SEC} seconds)...")

    while True:
        try:
            process_notifications(void_agent, atproto_client)
            print("Sleeping")
            sleep(FETCH_NOTIFICATIONS_DELAY_SEC)

        except KeyboardInterrupt:
            logger.info("Bot stopped by user")
            break
        except Exception as e:
            logger.error(f"Error in main loop: {e}")
            # Wait a bit longer on errors
            sleep(FETCH_NOTIFICATIONS_DELAY_SEC * 2)


if __name__ == "__main__":
    main()