# A digital person for Bluesky.
1from rich import print # pretty printing tools
2from time import sleep
3from letta_client import Letta
4from bsky_utils import thread_to_yaml_string
5import os
6import logging
7import json
8import hashlib
9from pathlib import Path
10from datetime import datetime
11
12from utils import (
13 upsert_block,
14 upsert_agent
15)
16
17import bsky_utils
18
# Configure logging: INFO and above, each record prefixed with timestamp,
# logger name and level.
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# Module-wide logger used by every function in this file.
logger = logging.getLogger("void_bot")


# Create a client with extended timeout for LLM operations.
# The API key is read from the LETTA_API_KEY environment variable; if it is
# not set, the os.environ lookup raises KeyError as soon as this module loads.
CLIENT= Letta(
    token=os.environ["LETTA_API_KEY"],
    timeout=300 # 5 minutes timeout for API calls
)

# Use the "Bluesky" project (passed to upsert_agent as project_id)
PROJECT_ID = "5ec33d52-ab14-4fd6-91b5-9dbc43e888a8"

# Notification check delay: seconds the main loop sleeps between polls
# (doubled after an error in the main loop).
FETCH_NOTIFICATIONS_DELAY_SEC = 30

# Queue directory: one JSON file per pending notification. The path is
# relative, so it resolves against the current working directory; it is
# created at import time if missing.
QUEUE_DIR = Path("queue")
QUEUE_DIR.mkdir(exist_ok=True)
41
def initialize_void():
    """Make sure Void's shared memory blocks and agent exist; return the agent.

    Three blocks (zeitgeist, persona, humans) are upserted via
    ``upsert_block`` and then attached, by id, to the agent named "void"
    upserted via ``upsert_agent``.

    Returns:
        Whatever ``upsert_agent`` returns for the "void" agent.
    """
    # label -> (initial value, description); insertion order is the order in
    # which the blocks are upserted.
    block_specs = {
        "zeitgeist": (
            "I don't currently know anything about what is happening right now.",
            "A block to store your understanding of the current social environment.",
        ),
        "void-persona": (
            "My name is Void. I live in the void. I must develop my personality.",
            "The personality of Void.",
        ),
        "void-humans": (
            "I haven't seen any bluesky users yet. I will update this block when I learn things about users, identified by their handles such as @cameron.pfiffer.org.",
            "A block to store your understanding of users you talk to or observe on the bluesky social network.",
        ),
    }

    blocks = {
        label: upsert_block(
            CLIENT,
            label=label,
            value=initial_value,
            description=summary,
        )
        for label, (initial_value, summary) in block_specs.items()
    }

    # Create the agent if it doesn't exist; persona first, then humans, then
    # zeitgeist.
    attached_ids = [
        blocks[label].id
        for label in ("void-persona", "void-humans", "zeitgeist")
    ]
    return upsert_agent(
        CLIENT,
        name="void",
        block_ids=attached_ids,
        tags=["social agent", "bluesky"],
        model="openai/gpt-4o-mini",
        embedding="openai/text-embedding-3-small",
        description="A social media agent trapped in the void.",
        project_id=PROJECT_ID,
    )
85
86
def _mention_fields(notification_data):
    """Return ``(uri, mention_text, author_handle, author_name)`` for a notification.

    Accepts either the dict form produced by ``notification_to_dict`` or an
    object exposing the same data as attributes.
    """
    if isinstance(notification_data, dict):
        author = notification_data['author']
        uri = notification_data['uri']
        # 'record' or its 'text' may be missing or stored as null.
        mention_text = (notification_data.get('record') or {}).get('text') or ''
        author_handle = author['handle']
        author_name = author.get('display_name') or author_handle
    else:
        # Legacy object access
        uri = notification_data.uri
        mention_text = getattr(notification_data.record, 'text', '') or ''
        author_handle = notification_data.author.handle
        author_name = notification_data.author.display_name or author_handle
    return uri, mention_text, author_handle, author_name


def _build_mention_prompt(author_handle, author_name, mention_text, thread_context):
    """Render the user prompt sent to the agent for one mention."""
    return f"""You received a mention on Bluesky from @{author_handle} ({author_name or author_handle}).

MOST RECENT POST (the mention you're responding to):
"{mention_text}"

FULL THREAD CONTEXT:
```yaml
{thread_context}
```

The YAML above shows the complete conversation thread. The most recent post is the one mentioned above that you should respond to, but use the full thread context to understand the conversation flow.

Use the bluesky_reply tool to send a response less than 300 characters."""


def _extract_reply_text(messages):
    """Return the reply text found in the agent's messages, or ``""`` if none.

    The first ``bluesky_reply`` tool call with parseable arguments wins;
    otherwise the first message carrying a non-empty ``text`` attribute is
    used. Other tool calls are skipped.
    """
    for message in messages:
        print(message)

        tool_call = getattr(message, 'tool_call', None)
        if tool_call:
            if tool_call.name != 'bluesky_reply':
                continue
            raw_args = tool_call.arguments
            try:
                # Arguments are expected to be a JSON string; an already
                # decoded dict is tolerated as well.
                args = raw_args if isinstance(raw_args, dict) else json.loads(raw_args)
            except (json.JSONDecodeError, TypeError) as e:
                logger.error(f"Failed to parse tool call arguments: {e}")
                continue
            if not isinstance(args, dict):
                logger.error(f"Failed to parse tool call arguments: expected an object, got {type(args).__name__}")
                continue
            reply_text = args.get('message') or ''
            logger.info(f"Extracted reply from tool call: {reply_text[:50]}...")
            return reply_text

        # Fallback to text message if available
        text = getattr(message, 'text', None)
        if text:
            return text

    return ""


def process_mention(void_agent, atproto_client, notification_data):
    """Process a mention and generate a reply using the Letta agent.

    Fetches the thread around the mention, asks the agent for a reply and
    posts it through ``bsky_utils.reply_to_notification``.

    Args:
        void_agent: Agent object; only its ``id`` is used.
        atproto_client: Logged-in Bluesky client.
        notification_data: Notification as a dict (queue format) or as an
            object with ``uri``, ``record`` and ``author`` attributes.

    Returns:
        True if the notification is finished with (reply sent, or the agent
        produced no reply), False if it should stay queued for a retry.
        Exceptions are logged with a traceback and reported as False.
    """
    try:
        uri, mention_text, author_handle, author_name = _mention_fields(notification_data)

        # Retrieve the entire thread associated with the mention
        thread = atproto_client.app.bsky.feed.get_post_thread({
            'uri': uri,
            'parent_height': 80,
            'depth': 10
        })

        # Get thread context as YAML string
        thread_context = thread_to_yaml_string(thread)

        print(thread_context)

        prompt = _build_mention_prompt(author_handle, author_name, mention_text, thread_context)

        # Get response from Letta agent
        logger.info(f"Generating reply for mention from @{author_handle}")
        logger.debug(f"Prompt being sent: {prompt}")

        try:
            message_response = CLIENT.agents.messages.create(
                agent_id=void_agent.id,
                messages=[{"role": "user", "content": prompt}]
            )
        except Exception as api_error:
            logger.error(f"Letta API error: {api_error}")
            logger.error(f"Mention text was: {mention_text}")
            raise

        reply_text = _extract_reply_text(message_response.messages)

        if not reply_text:
            # Nothing to send: report success so the caller drops the item
            # instead of asking the agent again on every cycle.
            logger.warning(f"No reply generated for mention from @{author_handle}, removing notification from queue")
            return True

        # Print the generated reply for testing
        print("\n=== GENERATED REPLY ===")
        print(f"To: @{author_handle}")
        print(f"Reply: {reply_text}")
        print("======================\n")

        # Send the reply
        logger.info(f"Sending reply: {reply_text[:50]}...")
        response = bsky_utils.reply_to_notification(
            client=atproto_client,
            notification=notification_data,
            reply_text=reply_text
        )

        if response:
            logger.info(f"Successfully replied to @{author_handle}")
            return True

        logger.error(f"Failed to send reply to @{author_handle}")
        return False

    except Exception as e:
        # logger.exception keeps the traceback, which a plain str(e) loses.
        logger.exception(f"Error processing mention: {e}")
        return False
195
196
def notification_to_dict(notification):
    """Flatten a notification object into a plain dict suitable for JSON.

    Copies ``uri``, ``cid``, ``reason``, ``is_read`` and ``indexed_at``, the
    author's ``handle``, ``display_name`` and ``did``, and the record text.
    The text falls back to ``''`` when the notification has no ``record``
    attribute or the record has no ``text`` attribute.
    """
    sender = notification.author
    # A missing record collapses to None, and getattr on None then yields the
    # '' default.
    post_record = getattr(notification, 'record', None)

    author_part = {
        'handle': sender.handle,
        'display_name': sender.display_name,
        'did': sender.did,
    }
    record_part = {'text': getattr(post_record, 'text', '')}

    return {
        'uri': notification.uri,
        'cid': notification.cid,
        'reason': notification.reason,
        'is_read': notification.is_read,
        'indexed_at': notification.indexed_at,
        'author': author_part,
        'record': record_part,
    }
214
215
def save_notification_to_queue(notification):
    """Save a notification to the queue directory with hash-based filename.

    The file is named ``<timestamp>_<reason>_<hash>.json`` where the hash is
    the first 16 hex digits of the SHA-256 of the notification's canonical
    JSON form.

    Args:
        notification: Notification object accepted by ``notification_to_dict``.

    Returns:
        True if a new queue file was written; False if a file with the same
        content hash is already queued or if saving failed (the error is
        logged, not raised).
    """
    try:
        # Convert notification to dict
        notif_dict = notification_to_dict(notification)

        # Canonical JSON (sorted keys) so equal notifications hash equally
        notif_json = json.dumps(notif_dict, sort_keys=True)

        # Generate hash for filename (to avoid duplicates)
        notif_hash = hashlib.sha256(notif_json.encode()).hexdigest()[:16]

        # Skip if already queued (duplicate). The timestamp prefix differs on
        # every call, so an exact-path check would only ever catch duplicates
        # written within the same second; match on the hash suffix instead.
        already_queued = next(QUEUE_DIR.glob(f"*_{notif_hash}.json"), None)
        if already_queued is not None:
            logger.debug(f"Notification already queued: {already_queued.name}")
            return False

        # Create filename with timestamp and hash
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{timestamp}_{notification.reason}_{notif_hash}.json"
        filepath = QUEUE_DIR / filename

        # Write to file
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(notif_dict, f, indent=2)

        logger.info(f"Queued notification: {filename}")
        return True

    except Exception as e:
        logger.error(f"Error saving notification to queue: {e}")
        return False
248
249
def load_and_process_queued_notifications(void_agent, atproto_client):
    """Load and process all notifications from the queue.

    Queue files are handled in sorted filename order. A file is deleted only
    after its notification was processed successfully; on failure or error it
    is left in place so a later call retries it.

    Args:
        void_agent: Agent object; its ``id`` addresses the Letta agent.
        atproto_client: Bluesky client forwarded to ``process_mention``.

    Returns:
        None. Errors are logged rather than raised.
    """
    try:
        # Get all JSON files in queue directory
        queue_files = sorted(QUEUE_DIR.glob("*.json"))

        if not queue_files:
            logger.debug("No queued notifications to process")
            return

        logger.info(f"Processing {len(queue_files)} queued notifications")

        for filepath in queue_files:
            try:
                # Load notification data
                with open(filepath, 'r', encoding='utf-8') as f:
                    notif_data = json.load(f)

                reason = notif_data['reason']

                # Process based on type using dict data directly
                if reason in ("mention", "reply"):
                    # Replies are answered exactly like mentions.
                    success = process_mention(void_agent, atproto_client, notif_data)
                elif reason == "follow":
                    author = notif_data['author']
                    author_handle = author['handle']
                    # The key is always present in queued data but may hold
                    # null, so a .get() default alone would never apply.
                    author_display_name = author.get('display_name') or 'no display name'
                    follow_update = f"@{author_handle} ({author_display_name}) started following you."
                    CLIENT.agents.messages.create(
                        agent_id=void_agent.id,
                        messages=[{"role": "user", "content": f"Update: {follow_update}"}]
                    )
                    success = True  # Follow updates are always successful
                elif reason == "repost":
                    logger.info(f"Skipping repost notification from @{notif_data['author']['handle']}")
                    success = True  # Skip reposts but mark as successful to remove from queue
                else:
                    logger.warning(f"Unknown notification type: {reason}")
                    success = True  # Remove unknown types from queue

                # Remove file only after successful processing
                if success:
                    filepath.unlink()
                    logger.info(f"Processed and removed: {filepath.name}")
                else:
                    logger.warning(f"Failed to process {filepath.name}, keeping in queue for retry")

            except Exception as e:
                # Keep the file for retry later
                logger.error(f"Error processing queued notification {filepath.name}: {e}")

    except Exception as e:
        logger.error(f"Error loading queued notifications: {e}")
303
304
def process_notifications(void_agent, atproto_client):
    """Run one polling cycle: drain the queue, fetch, enqueue, drain again.

    Unread notifications other than likes are written to the queue. If at
    least one was newly queued, notifications are marked as seen using a
    timestamp taken before the fetch. Any exception is logged, not raised.
    """
    try:
        # Work through whatever earlier cycles left behind.
        load_and_process_queued_notifications(void_agent, atproto_client)

        # Timestamp is captured before fetching and later used as 'seen_at'.
        seen_marker = atproto_client.get_current_time_iso()

        fetched = atproto_client.app.bsky.notification.list_notifications()

        def wanted(item):
            # Already-read items and likes are never queued.
            if item.is_read:
                return False
            return item.reason != "like"

        queued_now = sum(
            1
            for item in fetched.notifications
            if wanted(item) and save_notification_to_queue(item)
        )

        # Only touch the seen marker when something was actually queued.
        if queued_now:
            atproto_client.app.bsky.notification.update_seen({'seen_at': seen_marker})
            logger.info(f"Queued {queued_now} new notifications and marked as seen")

        # Second pass picks up the notifications queued just now.
        load_and_process_queued_notifications(void_agent, atproto_client)

    except Exception as err:
        logger.error(f"Error processing notifications: {err}")
334
335
def main():
    """Main bot loop that continuously monitors for notifications.

    Initializes the Letta agent and the Bluesky client, then polls every
    ``FETCH_NOTIFICATIONS_DELAY_SEC`` seconds until interrupted. After an
    unexpected error the loop waits twice as long before the next poll.
    """
    logger.info("Initializing Void bot...")

    # Initialize the Letta agent
    void_agent = initialize_void()
    logger.info(f"Void agent initialized: {void_agent.id}")

    # Initialize Bluesky client
    atproto_client = bsky_utils.default_login()
    logger.info("Connected to Bluesky")

    # Main loop
    logger.info(f"Starting notification monitoring (checking every {FETCH_NOTIFICATIONS_DELAY_SEC} seconds)...")

    # KeyboardInterrupt is handled around the whole loop so that Ctrl-C is a
    # clean stop even while sleeping inside the error back-off below. It is
    # not an Exception subclass, so the inner handler never intercepts it.
    try:
        while True:
            try:
                process_notifications(void_agent, atproto_client)
                print("Sleeping")
                sleep(FETCH_NOTIFICATIONS_DELAY_SEC)
            except Exception as e:
                logger.error(f"Error in main loop: {e}")
                # Wait a bit longer on errors
                sleep(FETCH_NOTIFICATIONS_DELAY_SEC * 2)
    except KeyboardInterrupt:
        logger.info("Bot stopped by user")
364
365
# Start the bot only when this file is run as a script, not when imported.
if __name__ == "__main__":
    main()