# Forked from cameron.stream/void
# (upstream repository has no description)
1from rich import print # pretty printing tools
2from time import sleep
3from letta_client import Letta
4from bsky_utils import thread_to_yaml_string
5import os
6import logging
7import json
8import hashlib
9from pathlib import Path
10from datetime import datetime
11
12from utils import (
13 upsert_block,
14 upsert_agent
15)
16
17import bsky_utils
18
# Configure logging: timestamped records for every module logger.
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("void_bot")


# Create a client with extended timeout for LLM operations.
# Reads LETTA_API_KEY from the environment; raises KeyError at import
# time if it is unset, which fails fast before the bot starts.
CLIENT= Letta(
    token=os.environ["LETTA_API_KEY"],
    timeout=300 # 5 minutes timeout for API calls
)

# Use the "Bluesky" project
PROJECT_ID = "5ec33d52-ab14-4fd6-91b5-9dbc43e888a8"

# Notification check delay (seconds between polling cycles in main())
FETCH_NOTIFICATIONS_DELAY_SEC = 30

# Queue directory: notifications are persisted here as JSON files so
# unprocessed items survive restarts; created at import if missing.
QUEUE_DIR = Path("queue")
QUEUE_DIR.mkdir(exist_ok=True)
41
def initialize_void():
    """Create or fetch the shared memory blocks and the void agent.

    Ensures the zeitgeist, persona, and human-knowledge blocks exist,
    then upserts the "void" agent wired to all three.

    Returns:
        The upserted Letta agent object.
    """
    # Shared understanding of the current social environment.
    zeitgeist_block = upsert_block(
        CLIENT,
        label="zeitgeist",
        value="I don't currently know anything about what is happening right now.",
        description="A block to store your understanding of the current social environment.",
    )

    # The agent's evolving personality.
    persona_block = upsert_block(
        CLIENT,
        label="void-persona",
        value="My name is Void. I live in the void. I must develop my personality.",
        description="The personality of Void.",
    )

    # Accumulated knowledge about Bluesky users the agent has observed.
    human_block = upsert_block(
        CLIENT,
        label="void-humans",
        value="I haven't seen any bluesky users yet. I will update this block when I learn things about users, identified by their handles such as @cameron.pfiffer.org.",
        description="A block to store your understanding of users you talk to or observe on the bluesky social network.",
    )

    # Upsert the agent itself, attached to the three shared blocks.
    return upsert_agent(
        CLIENT,
        name="void",
        block_ids=[
            persona_block.id,
            human_block.id,
            zeitgeist_block.id,
        ],
        tags=["social agent", "bluesky"],
        model="openai/gpt-4o-mini",
        embedding="openai/text-embedding-3-small",
        description="A social media agent trapped in the void.",
        project_id=PROJECT_ID,
    )
85
86
def process_mention(void_agent, atproto_client, notification_data):
    """Process a mention and generate a reply using the Letta agent.

    Args:
        void_agent: Letta agent object (must expose ``id``).
        atproto_client: authenticated atproto client used to fetch the
            thread and send the reply.
        notification_data: dict produced by ``notification_to_dict`` or a
            raw notification object (legacy callers).

    Returns:
        True if successfully processed (or the item should be dropped from
        the queue anyway, e.g. deleted post / no reply generated), False if
        it should be kept for retry.
    """
    try:
        # Handle both dict and object inputs for backwards compatibility
        if isinstance(notification_data, dict):
            uri = notification_data['uri']
            mention_text = notification_data.get('record', {}).get('text', '')
            author_handle = notification_data['author']['handle']
            author_name = notification_data['author'].get('display_name') or author_handle
        else:
            # Legacy object access
            uri = notification_data.uri
            mention_text = notification_data.record.text if hasattr(notification_data.record, 'text') else ""
            author_handle = notification_data.author.handle
            author_name = notification_data.author.display_name or author_handle

        # Retrieve the entire thread associated with the mention
        try:
            thread = atproto_client.app.bsky.feed.get_post_thread({
                'uri': uri,
                'parent_height': 80,
                'depth': 10
            })
        except Exception as e:
            error_str = str(e)
            # A deleted/unreachable post can never be processed: report
            # success so the caller removes it from the queue forever.
            if 'NotFound' in error_str or 'Post not found' in error_str:
                logger.warning(f"Post not found for URI {uri}, removing from queue")
                return True  # Return True to remove from queue
            else:
                # Re-raise other errors so the outer handler keeps it queued
                logger.error(f"Error fetching thread: {e}")
                raise

        # Get thread context as YAML string
        thread_context = thread_to_yaml_string(thread)
        # Debug-level instead of a bare print() so stdout stays clean.
        logger.debug("Thread context:\n%s", thread_context)

        # Create a prompt for the Letta agent with thread context.
        # author_name is already guaranteed non-empty above, so no extra
        # fallback is needed inside the f-string.
        prompt = f"""You received a mention on Bluesky from @{author_handle} ({author_name}).

MOST RECENT POST (the mention you're responding to):
"{mention_text}"

FULL THREAD CONTEXT:
```yaml
{thread_context}
```

The YAML above shows the complete conversation thread. The most recent post is the one mentioned above that you should respond to, but use the full thread context to understand the conversation flow.

Use the bluesky_reply tool to send a response less than 300 characters."""

        # Get response from Letta agent
        logger.info(f"Generating reply for mention from @{author_handle}")
        logger.debug(f"Prompt being sent: {prompt}")

        try:
            message_response = CLIENT.agents.messages.create(
                agent_id = void_agent.id,
                messages = [{"role":"user", "content": prompt}]
            )
        except Exception as api_error:
            logger.error(f"Letta API error: {api_error}")
            logger.error(f"Mention text was: {mention_text}")
            raise

        # Extract the reply text from the agent's response
        reply_text = ""
        for message in message_response.messages:
            logger.debug("Agent message: %s", message)

            # Check if this is a ToolCallMessage with bluesky_reply tool
            if hasattr(message, 'tool_call') and message.tool_call:
                if message.tool_call.name == 'bluesky_reply':
                    # Parse the JSON arguments to get the message
                    try:
                        args = json.loads(message.tool_call.arguments)
                        reply_text = args.get('message', '')
                        logger.info(f"Extracted reply from tool call: {reply_text[:50]}...")
                        break
                    except json.JSONDecodeError as e:
                        logger.error(f"Failed to parse tool call arguments: {e}")

            # Fallback to text message if available
            elif hasattr(message, 'text') and message.text:
                reply_text = message.text
                break

        if reply_text:
            # Print the generated reply for testing
            print(f"\n=== GENERATED REPLY ===")
            print(f"To: @{author_handle}")
            print(f"Reply: {reply_text}")
            print(f"======================\n")

            # Send the reply
            logger.info(f"Sending reply: {reply_text[:50]}...")
            response = bsky_utils.reply_to_notification(
                client=atproto_client,
                notification=notification_data,
                reply_text=reply_text
            )

            if response:
                logger.info(f"Successfully replied to @{author_handle}")
                return True
            else:
                logger.error(f"Failed to send reply to @{author_handle}")
                return False
        else:
            logger.warning(f"No reply generated for mention from @{author_handle}, removing notification from queue")
            return True

    except Exception as e:
        logger.error(f"Error processing mention: {e}")
        return False
206
207
def notification_to_dict(notification):
    """Serialize a notification object into a plain, JSON-safe dictionary."""
    author = notification.author
    # Some notification kinds carry no record; default the text to ''.
    if hasattr(notification, 'record'):
        record_text = getattr(notification.record, 'text', '')
    else:
        record_text = ''
    return {
        'uri': notification.uri,
        'cid': notification.cid,
        'reason': notification.reason,
        'is_read': notification.is_read,
        'indexed_at': notification.indexed_at,
        'author': {
            'handle': author.handle,
            'display_name': author.display_name,
            'did': author.did,
        },
        'record': {'text': record_text},
    }
225
226
def save_notification_to_queue(notification):
    """Save a notification to the queue directory with hash-based filename.

    The filename embeds a content hash (for de-duplication) and a timestamp
    (for chronological ordering when the queue is processed sorted by name).

    Returns:
        True if a new file was written, False on duplicate or error.
    """
    try:
        # Convert notification to dict
        notif_dict = notification_to_dict(notification)

        # Canonical JSON (sorted keys) so identical notifications hash equal
        notif_json = json.dumps(notif_dict, sort_keys=True)
        notif_hash = hashlib.sha256(notif_json.encode()).hexdigest()[:16]

        # Create filename with timestamp and hash
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{timestamp}_{notification.reason}_{notif_hash}.json"
        filepath = QUEUE_DIR / filename

        # Duplicate check must ignore the timestamp prefix: an identical
        # notification queued at a different second has a different filename,
        # so match on the content-hash suffix instead of the exact path.
        if any(QUEUE_DIR.glob(f"*_{notif_hash}.json")):
            logger.debug(f"Notification already queued: {filename}")
            return False

        # Write to file
        with open(filepath, 'w') as f:
            json.dump(notif_dict, f, indent=2)

        logger.info(f"Queued notification: {filename}")
        return True

    except Exception as e:
        logger.error(f"Error saving notification to queue: {e}")
        return False
259
260
def load_and_process_queued_notifications(void_agent, atproto_client):
    """Load and process all notifications from the queue, oldest first.

    Each JSON file is dispatched by its ``reason`` and deleted only after
    successful processing; failed items remain queued for the next pass.
    """
    try:
        # Sorted by name = chronological, since filenames start with a timestamp
        queue_files = sorted(QUEUE_DIR.glob("*.json"))

        if not queue_files:
            logger.debug("No queued notifications to process")
            return

        logger.info(f"Processing {len(queue_files)} queued notifications")

        for filepath in queue_files:
            try:
                # Load notification data
                with open(filepath, 'r') as f:
                    notif_data = json.load(f)

                reason = notif_data['reason']
                success = False
                # Mentions and replies share identical handling.
                if reason in ("mention", "reply"):
                    success = process_mention(void_agent, atproto_client, notif_data)
                elif reason == "follow":
                    author_handle = notif_data['author']['handle']
                    # The 'display_name' key always exists but may be None,
                    # so `or` (not a .get default) supplies the fallback.
                    author_display_name = notif_data['author'].get('display_name') or 'no display name'
                    follow_update = f"@{author_handle} ({author_display_name}) started following you."
                    CLIENT.agents.messages.create(
                        agent_id = void_agent.id,
                        messages = [{"role":"user", "content": f"Update: {follow_update}"}]
                    )
                    success = True  # Follow updates are always successful
                elif reason == "repost":
                    logger.info(f"Skipping repost notification from @{notif_data['author']['handle']}")
                    success = True  # Skip reposts but mark as successful to remove from queue
                else:
                    logger.warning(f"Unknown notification type: {reason}")
                    success = True  # Remove unknown types from queue

                # Remove file only after successful processing
                if success:
                    filepath.unlink()
                    logger.info(f"Processed and removed: {filepath.name}")
                else:
                    logger.warning(f"Failed to process {filepath.name}, keeping in queue for retry")

            except Exception as e:
                logger.error(f"Error processing queued notification {filepath.name}: {e}")
                # Keep the file for retry later

    except Exception as e:
        logger.error(f"Error loading queued notifications: {e}")
314
315
def process_notifications(void_agent, atproto_client):
    """Fetch new notifications, queue them, and process the queue."""
    try:
        # Drain anything left over from earlier runs before fetching more.
        load_and_process_queued_notifications(void_agent, atproto_client)

        # Capture the timestamp before fetching so notifications that arrive
        # during the fetch are not accidentally marked as seen.
        last_seen_at = atproto_client.get_current_time_iso()

        response = atproto_client.app.bsky.notification.list_notifications()

        # Persist every unread, non-like notification to the on-disk queue.
        new_count = 0
        for notification in response.notifications:
            if notification.is_read or notification.reason == "like":
                continue
            if save_notification_to_queue(notification):
                new_count += 1

        # NOTE(review): update_seen only fires when something was queued, so
        # unread "like"-only batches are never marked seen — confirm intended.
        if new_count > 0:
            atproto_client.app.bsky.notification.update_seen({'seen_at': last_seen_at})
            logger.info(f"Queued {new_count} new notifications and marked as seen")

        # Second pass picks up the items just queued above.
        load_and_process_queued_notifications(void_agent, atproto_client)

    except Exception as e:
        logger.error(f"Error processing notifications: {e}")
345
346
def main():
    """Main bot loop that continuously monitors for notifications."""
    logger.info("Initializing Void bot...")

    # Set up the Letta agent and its shared memory blocks.
    void_agent = initialize_void()
    logger.info(f"Void agent initialized: {void_agent.id}")

    # Authenticate against Bluesky.
    atproto_client = bsky_utils.default_login()
    logger.info("Connected to Bluesky")

    logger.info(f"Starting notification monitoring (checking every {FETCH_NOTIFICATIONS_DELAY_SEC} seconds)...")

    # Poll forever; Ctrl-C exits cleanly, other errors back off and retry.
    while True:
        try:
            process_notifications(void_agent, atproto_client)
            print("Sleeping")
            sleep(FETCH_NOTIFICATIONS_DELAY_SEC)
        except KeyboardInterrupt:
            logger.info("Bot stopped by user")
            break
        except Exception as e:
            logger.error(f"Error in main loop: {e}")
            # Wait twice as long after an unexpected failure.
            sleep(FETCH_NOTIFICATIONS_DELAY_SEC * 2)
375
376
# Script entry point: run the bot loop when executed directly.
if __name__ == "__main__":
    main()