X (Twitter) client utilities for a digital person (companion to its Bluesky client)
1import os
2import logging
3import requests
4import yaml
5import json
6import hashlib
7import random
8import time
9from typing import Optional, Dict, Any, List, Set
10from datetime import datetime
11from pathlib import Path
12from requests_oauthlib import OAuth1
13from rich import print as rprint
14from rich.panel import Panel
15from rich.text import Text
16
17import bsky_utils
18
class XRateLimitError(Exception):
    """Raised when the X API reports a rate limit (HTTP 429) and retries are exhausted."""
22
23
# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# Module-wide logger shared by XClient and all helper functions below.
logger = logging.getLogger("x_client")

# X-specific file paths
X_QUEUE_DIR = Path("x_queue")    # queued mentions awaiting async processing
X_CACHE_DIR = Path("x_cache")    # cached thread contexts and individual tweets
X_PROCESSED_MENTIONS_FILE = Path("x_queue/processed_mentions.json")  # IDs already handled
X_LAST_SEEN_FILE = Path("x_queue/last_seen_id.json")  # high-water mark for incremental mention fetches
X_DOWNRANK_USERS_FILE = Path("x_downrank_users.txt")  # user IDs responded to only ~10% of the time
36
class XClient:
    """X (Twitter) API client for fetching mentions and managing interactions.

    Supports three authentication modes, selected at construction time:
    OAuth 1.0a User Context, OAuth 2.0 User Context, and Application-Only
    Bearer token (in that order of preference).
    """

    def __init__(self, api_key: str, user_id: str, access_token: str = None,
                 consumer_key: str = None, consumer_secret: str = None,
                 access_token_secret: str = None):
        """
        Initialize the client and pick the strongest available auth method.

        Args:
            api_key: Application-Only Bearer token (fallback auth).
            user_id: Numeric X user ID whose mentions are fetched.
            access_token: OAuth access token (2.0 user token on its own, or
                1.0a token when the consumer key/secret pair is also given).
            consumer_key: OAuth 1.0a consumer key.
            consumer_secret: OAuth 1.0a consumer secret.
            access_token_secret: OAuth 1.0a access token secret.
        """
        self.api_key = api_key
        self.access_token = access_token
        self.user_id = user_id
        self.base_url = "https://api.x.com/2"
        # Explicit per-request timeout (seconds). requests has no default
        # timeout, so without this a stalled connection hangs forever.
        self.request_timeout = 30

        # Check if we have OAuth 1.0a credentials
        if (consumer_key and consumer_secret and access_token and access_token_secret):
            # Use OAuth 1.0a for User Context
            self.oauth = OAuth1(
                consumer_key,
                client_secret=consumer_secret,
                resource_owner_key=access_token,
                resource_owner_secret=access_token_secret
            )
            self.headers = {"Content-Type": "application/json"}
            self.auth_method = "oauth1a"
            logger.info("Using OAuth 1.0a User Context authentication for X API")
        elif access_token:
            # Use OAuth 2.0 Bearer token for User Context
            self.oauth = None
            self.headers = {
                "Authorization": f"Bearer {access_token}",
                "Content-Type": "application/json"
            }
            self.auth_method = "oauth2_user"
            logger.info("Using OAuth 2.0 User Context access token for X API")
        else:
            # Use Application-Only Bearer token
            self.oauth = None
            self.headers = {
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json"
            }
            self.auth_method = "bearer"
            logger.info("Using Application-Only Bearer token for X API")

    def _make_request(self, endpoint: str, params: Optional[Dict] = None, method: str = "GET", data: Optional[Dict] = None, max_retries: int = 3) -> Optional[Dict]:
        """Make a request to the X API with proper error handling and exponential backoff.

        Args:
            endpoint: API path appended to the base URL (e.g. "/tweets").
            params: Optional query-string parameters.
            method: HTTP method, "GET" or "POST".
            data: JSON body for POST requests.
            max_retries: Total attempts before giving up.

        Returns:
            Parsed JSON response dict, or None on failure (auth/permission
            errors are never retried).

        Raises:
            XRateLimitError: When rate-limited (429) and retries are exhausted.
        """
        url = f"{self.base_url}{endpoint}"

        for attempt in range(max_retries):
            try:
                # Every call carries a timeout so a dead connection cannot
                # block the polling loop indefinitely.
                if method.upper() == "GET":
                    if self.oauth:
                        response = requests.get(url, headers=self.headers, params=params, auth=self.oauth, timeout=self.request_timeout)
                    else:
                        response = requests.get(url, headers=self.headers, params=params, timeout=self.request_timeout)
                elif method.upper() == "POST":
                    if self.oauth:
                        response = requests.post(url, headers=self.headers, json=data, auth=self.oauth, timeout=self.request_timeout)
                    else:
                        response = requests.post(url, headers=self.headers, json=data, timeout=self.request_timeout)
                else:
                    raise ValueError(f"Unsupported HTTP method: {method}")

                response.raise_for_status()
                return response.json()

            except requests.exceptions.HTTPError as e:
                # `response` is always bound here: HTTPError can only come
                # from raise_for_status() above.
                if response.status_code == 401:
                    logger.error(f"X API authentication failed with {self.auth_method} - check your credentials")
                    logger.error(f"Response: {response.text}")
                    return None  # Don't retry auth failures
                elif response.status_code == 403:
                    logger.error(f"X API forbidden with {self.auth_method} - check app permissions")
                    logger.error(f"Response: {response.text}")
                    return None  # Don't retry permission failures
                elif response.status_code == 429:
                    if attempt < max_retries - 1:
                        # Exponential backoff: 60s, 120s, 240s
                        backoff_time = 60 * (2 ** attempt)
                        logger.warning(f"X API rate limit exceeded (attempt {attempt + 1}/{max_retries}) - waiting {backoff_time}s before retry")
                        logger.error(f"Response: {response.text}")
                        time.sleep(backoff_time)
                        continue
                    else:
                        logger.error("X API rate limit exceeded - max retries reached")
                        logger.error(f"Response: {response.text}")
                        raise XRateLimitError("X API rate limit exceeded")
                else:
                    if attempt < max_retries - 1:
                        # Exponential backoff for other HTTP errors too
                        backoff_time = 30 * (2 ** attempt)
                        logger.warning(f"X API request failed (attempt {attempt + 1}/{max_retries}): {e} - retrying in {backoff_time}s")
                        logger.error(f"Response: {response.text}")
                        time.sleep(backoff_time)
                        continue
                    else:
                        logger.error(f"X API request failed after {max_retries} attempts: {e}")
                        logger.error(f"Response: {response.text}")
                        return None

            except Exception as e:
                # Covers connection errors, timeouts, and JSON decode errors.
                if attempt < max_retries - 1:
                    backoff_time = 15 * (2 ** attempt)
                    logger.warning(f"Unexpected error making X API request (attempt {attempt + 1}/{max_retries}): {e} - retrying in {backoff_time}s")
                    time.sleep(backoff_time)
                    continue
                else:
                    logger.error(f"Unexpected error making X API request after {max_retries} attempts: {e}")
                    return None

        return None

    def get_mentions(self, since_id: Optional[str] = None, max_results: int = 10) -> Optional[List[Dict]]:
        """
        Fetch mentions for the configured user.

        Args:
            since_id: Minimum Post ID to include (for getting newer mentions)
            max_results: Number of results to return (5-100)

        Returns:
            List of mention objects, or [] when the request failed or
            returned no data.
        """
        endpoint = f"/users/{self.user_id}/mentions"
        params = {
            "max_results": min(max(max_results, 5), 100),  # Ensure within API limits
            "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets",
            "user.fields": "id,name,username",
            "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id"
        }

        if since_id:
            params["since_id"] = since_id

        logger.info(f"Fetching mentions for user {self.user_id}")
        response = self._make_request(endpoint, params)

        if response:
            logger.debug(f"X API response: {response}")

        if response and "data" in response:
            mentions = response["data"]
            logger.info(f"Retrieved {len(mentions)} mentions")
            return mentions
        else:
            if response:
                logger.info(f"No mentions in response. Full response: {response}")
            else:
                logger.warning("Request failed - no response received")
            return []

    def get_user_info(self, user_id: str) -> Optional[Dict]:
        """Get information about a specific user, or None if the lookup failed."""
        endpoint = f"/users/{user_id}"
        params = {
            "user.fields": "id,name,username,description,public_metrics"
        }

        response = self._make_request(endpoint, params)
        return response.get("data") if response else None

    def search_mentions(self, username: str, max_results: int = 10, since_id: str = None) -> Optional[List[Dict]]:
        """
        Search for mentions using the search endpoint instead of mentions endpoint.
        This might have better rate limits than the direct mentions endpoint.

        Args:
            username: Username to search for mentions of (without @)
            max_results: Number of results to return (10-100)
            since_id: Only return results newer than this tweet ID

        Returns:
            List of tweets mentioning the username ([] on failure/no data)
        """
        endpoint = "/tweets/search/recent"

        # Search for mentions of the username
        query = f"@{username}"

        params = {
            "query": query,
            "max_results": min(max(max_results, 10), 100),
            "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
            "user.fields": "id,name,username",
            "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id"
        }

        if since_id:
            params["since_id"] = since_id

        logger.info(f"Searching for mentions of @{username}")
        response = self._make_request(endpoint, params)

        if response and "data" in response:
            tweets = response["data"]
            logger.info(f"Found {len(tweets)} mentions via search")
            return tweets
        else:
            if response:
                logger.info(f"No mentions found via search. Response: {response}")
            else:
                logger.warning("Search request failed")
            return []

    def get_thread_context(self, conversation_id: str, use_cache: bool = True, until_id: Optional[str] = None) -> Optional[Dict]:
        """
        Get all tweets in a conversation thread up to a specific tweet ID.

        Args:
            conversation_id: The conversation ID to fetch (should be the original tweet ID)
            use_cache: Whether to use cached data if available
            until_id: Optional tweet ID to use as upper bound (excludes posts after this ID)

        Returns:
            Dict with "tweets" (chronologically ordered list) and "users"
            (author_id -> user object) keys, or None if nothing was found.
            NOTE: return annotation corrected from List[Dict] — this method
            has always returned a dict.
        """
        # Check cache first if enabled
        if use_cache:
            cached_data = get_cached_thread_context(conversation_id)
            if cached_data:
                return cached_data

        # First, get the original tweet directly since it might not appear in conversation search
        original_tweet = None
        try:
            endpoint = f"/tweets/{conversation_id}"
            params = {
                "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
                "user.fields": "id,name,username",
                "expansions": "author_id"
            }
            response = self._make_request(endpoint, params)
            if response and "data" in response:
                original_tweet = response["data"]
                logger.info(f"Retrieved original tweet: {original_tweet.get('id')}")
        except Exception as e:
            logger.warning(f"Could not fetch original tweet {conversation_id}: {e}")

        # Then search for all tweets in this conversation
        endpoint = "/tweets/search/recent"
        params = {
            "query": f"conversation_id:{conversation_id}",
            "max_results": 100,  # Get as many as possible
            "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
            "user.fields": "id,name,username",
            "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id",
            "sort_order": "recency"  # Get newest first, we'll reverse later
        }

        # Add until_id parameter to exclude tweets after the mention being processed
        if until_id:
            params["until_id"] = until_id
            logger.info(f"Using until_id={until_id} to exclude future tweets")

        logger.info(f"Fetching thread context for conversation {conversation_id}")
        response = self._make_request(endpoint, params)

        tweets = []
        users_data = {}

        # Collect tweets from search
        if response and "data" in response:
            tweets.extend(response["data"])
            # Store user data for reference
            if "includes" in response and "users" in response["includes"]:
                for user in response["includes"]["users"]:
                    users_data[user["id"]] = user

        # Add original tweet if we got it and it's not already in the list
        if original_tweet:
            tweet_ids = [t.get('id') for t in tweets]
            if original_tweet.get('id') not in tweet_ids:
                tweets.append(original_tweet)
                logger.info("Added original tweet to thread context")

        # Attempt to fill gaps by fetching referenced tweets that are missing
        # This helps with X API's incomplete conversation search results
        tweet_ids = set(t.get('id') for t in tweets)
        missing_tweet_ids = set()
        critical_missing_ids = set()

        # Collect referenced tweet IDs, prioritizing critical ones
        for tweet in tweets:
            referenced_tweets = tweet.get('referenced_tweets', [])
            for ref in referenced_tweets:
                ref_id = ref.get('id')
                ref_type = ref.get('type')
                if ref_id and ref_id not in tweet_ids:
                    missing_tweet_ids.add(ref_id)
                    # Prioritize direct replies and quoted tweets over retweets
                    if ref_type in ['replied_to', 'quoted']:
                        critical_missing_ids.add(ref_id)

        # For rate limit efficiency, only fetch critical missing tweets if we have many
        if len(missing_tweet_ids) > 10:
            logger.info(f"Many missing tweets ({len(missing_tweet_ids)}), prioritizing {len(critical_missing_ids)} critical ones")
            missing_tweet_ids = critical_missing_ids

        # Context sufficiency check - skip backfill if we already have enough context
        if has_sufficient_context(tweets, missing_tweet_ids):
            logger.info("Thread has sufficient context, skipping missing tweet backfill")
            missing_tweet_ids = set()

        # Fetch missing referenced tweets in batches (more rate-limit friendly)
        if missing_tweet_ids:
            missing_list = list(missing_tweet_ids)

            # First, check cache for missing tweets
            cached_tweets = get_cached_tweets(missing_list)
            for tweet_id, cached_tweet in cached_tweets.items():
                if cached_tweet.get('conversation_id') == conversation_id:
                    tweets.append(cached_tweet)
                    tweet_ids.add(tweet_id)
                    logger.info(f"Retrieved missing tweet from cache: {tweet_id}")

                    # Add user data if available in cache
                    if cached_tweet.get('author_info'):
                        author_id = cached_tweet.get('author_id')
                        if author_id:
                            users_data[author_id] = cached_tweet['author_info']

            # Only fetch tweets that weren't found in cache
            uncached_ids = [tid for tid in missing_list if tid not in cached_tweets]

            if uncached_ids:
                batch_size = 100  # X API limit for bulk tweet lookup

                for i in range(0, len(uncached_ids), batch_size):
                    batch_ids = uncached_ids[i:i + batch_size]
                    try:
                        endpoint = "/tweets"
                        params = {
                            "ids": ",".join(batch_ids),
                            "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
                            "user.fields": "id,name,username",
                            "expansions": "author_id"
                        }
                        response = self._make_request(endpoint, params)

                        if response and "data" in response:
                            fetched_tweets = []
                            batch_users_data = {}

                            for missing_tweet in response["data"]:
                                # Only add if it's actually part of this conversation
                                if missing_tweet.get('conversation_id') == conversation_id:
                                    tweets.append(missing_tweet)
                                    tweet_ids.add(missing_tweet.get('id'))
                                    fetched_tweets.append(missing_tweet)
                                    logger.info(f"Retrieved missing referenced tweet: {missing_tweet.get('id')}")

                            # Add user data if available
                            if "includes" in response and "users" in response["includes"]:
                                for user in response["includes"]["users"]:
                                    users_data[user["id"]] = user
                                    batch_users_data[user["id"]] = user

                            # Cache the newly fetched tweets
                            if fetched_tweets:
                                save_cached_tweets(fetched_tweets, batch_users_data)

                            logger.info(f"Batch fetched {len(response['data'])} missing tweets from {len(batch_ids)} requested")

                        # Handle partial success - log any missing tweets that weren't found
                        if response and "errors" in response:
                            for error in response["errors"]:
                                logger.warning(f"Could not fetch tweet {error.get('resource_id')}: {error.get('title')}")

                    except Exception as e:
                        logger.warning(f"Could not fetch batch of missing tweets {batch_ids[:3]}...: {e}")
            else:
                logger.info(f"All {len(missing_list)} missing tweets found in cache")

        if tweets:
            # Filter out tweets that occur after until_id (if specified)
            if until_id:
                original_count = len(tweets)
                # Convert until_id to int for comparison (Twitter IDs are sequential)
                until_id_int = int(until_id)
                tweets = [t for t in tweets if int(t.get('id', '0')) <= until_id_int]
                filtered_count = len(tweets)
                if original_count != filtered_count:
                    logger.info(f"Filtered out {original_count - filtered_count} tweets after until_id {until_id}")

            # Sort chronologically (oldest first)
            tweets.sort(key=lambda x: x.get('created_at', ''))
            logger.info(f"Retrieved {len(tweets)} tweets in thread")

            thread_data = {"tweets": tweets, "users": users_data}

            # Cache individual tweets from the thread for future backfill
            save_cached_tweets(tweets, users_data)

            # Cache the result
            if use_cache:
                save_cached_thread_context(conversation_id, thread_data)

            return thread_data
        else:
            logger.warning("No tweets found for thread context")
            return None

    def post_reply(self, reply_text: str, in_reply_to_tweet_id: str) -> Optional[Dict]:
        """
        Post a reply to a specific tweet.

        Args:
            reply_text: The text content of the reply
            in_reply_to_tweet_id: The ID of the tweet to reply to

        Returns:
            Response data if successful, None if failed
        """
        endpoint = "/tweets"

        payload = {
            "text": reply_text,
            "reply": {
                "in_reply_to_tweet_id": in_reply_to_tweet_id
            }
        }

        logger.info(f"Attempting to post reply with {self.auth_method} authentication")
        result = self._make_request(endpoint, method="POST", data=payload)

        if result:
            logger.info(f"Successfully posted reply to tweet {in_reply_to_tweet_id}")
            return result
        else:
            logger.error("Failed to post reply")
            return None

    def post_tweet(self, tweet_text: str) -> Optional[Dict]:
        """
        Post a standalone tweet (not a reply).

        Args:
            tweet_text: The text content of the tweet (max 280 characters)

        Returns:
            Response data if successful, None if failed
        """
        endpoint = "/tweets"

        # Validate tweet length before spending an API call
        if len(tweet_text) > 280:
            logger.error(f"Tweet text too long: {len(tweet_text)} characters (max 280)")
            return None

        payload = {
            "text": tweet_text
        }

        logger.info(f"Attempting to post tweet with {self.auth_method} authentication")
        result = self._make_request(endpoint, method="POST", data=payload)

        if result:
            logger.info(f"Successfully posted tweet")
            return result
        else:
            logger.error("Failed to post tweet")
            return None
497
def load_x_config(config_path: str = "x_config.yaml") -> Dict[str, Any]:
    """Load complete X configuration from x_config.yaml.

    Validates that both the 'x' and 'letta' sections carry their required
    keys; logs and re-raises on any failure so callers fail fast.
    """
    try:
        with open(config_path, 'r') as fh:
            config = yaml.safe_load(fh)

        if not config:
            raise ValueError(f"Empty or invalid configuration file: {config_path}")

        # Required sections — missing keys are a hard error.
        x_section = config.get('x', {})
        letta_section = config.get('letta', {})

        if not (x_section.get('api_key') and x_section.get('user_id')):
            raise ValueError("X API key and user_id must be configured in x_config.yaml")

        if not (letta_section.get('api_key') and letta_section.get('agent_id')):
            raise ValueError("Letta API key and agent_id must be configured in x_config.yaml")

        return config
    except Exception as e:
        logger.error(f"Failed to load X configuration: {e}")
        raise
521
def get_x_letta_config(config_path: str = "x_config.yaml") -> Dict[str, Any]:
    """Return the 'letta' section of the X configuration file."""
    return load_x_config(config_path)['letta']
526
def create_x_client(config_path: str = "x_config.yaml") -> XClient:
    """Create and return an X client configured from the given YAML file."""
    x_cfg = load_x_config(config_path)['x']
    # Optional credentials default to None, letting XClient pick its auth mode.
    optional_keys = ('access_token', 'consumer_key', 'consumer_secret', 'access_token_secret')
    return XClient(
        api_key=x_cfg['api_key'],
        user_id=x_cfg['user_id'],
        **{key: x_cfg.get(key) for key in optional_keys}
    )
539
def mention_to_yaml_string(mention: Dict, users_data: Optional[Dict] = None) -> str:
    """
    Render a mention object as a YAML string for better AI comprehension.
    Mirrors thread_to_yaml_string in bsky_utils.py.
    """
    author_id = mention.get('author_id')

    # Only the fields the agent needs — drops raw API noise.
    summary = {
        'id': mention.get('id'),
        'text': mention.get('text'),
        'author_id': author_id,
        'created_at': mention.get('created_at'),
        'in_reply_to_user_id': mention.get('in_reply_to_user_id'),
    }

    # Attach human-readable author details when the expansion data has them.
    if users_data and author_id in users_data:
        profile = users_data[author_id]
        summary['author'] = {
            'username': profile.get('username'),
            'name': profile.get('name'),
        }

    return yaml.dump(summary, default_flow_style=False, sort_keys=False)
563
def thread_to_yaml_string(thread_data: Dict) -> str:
    """
    Convert X thread context to a YAML string for AI comprehension.
    Similar to Bluesky's thread_to_yaml_string function.

    Args:
        thread_data: Dict with 'tweets' and 'users' keys from get_thread_context()

    Returns:
        YAML string representation of the thread
    """
    # Empty / malformed input renders as an empty conversation.
    if not thread_data or "tweets" not in thread_data:
        return "conversation: []\n"

    users_by_id = thread_data.get("users", {})

    def _simplify(tweet: Dict) -> Dict:
        """Reduce one tweet to the fields the agent consumes."""
        uid = tweet.get('author_id')
        author = {}
        if uid and uid in users_by_id:
            profile = users_by_id[uid]
            author = {
                'username': profile.get('username'),
                'name': profile.get('name')
            }
        return {
            'text': tweet.get('text'),
            'created_at': tweet.get('created_at'),
            'author': author,
            'author_id': uid  # Include user ID for block management
        }

    simplified = {"conversation": [_simplify(t) for t in thread_data["tweets"]]}
    return yaml.dump(simplified, default_flow_style=False, sort_keys=False)
607
608
def ensure_x_user_blocks_attached(thread_data: Dict, agent_id: Optional[str] = None) -> None:
    """
    Ensure all users in the thread have their X user blocks attached.
    Creates blocks with initial content including their handle if they don't exist.

    Best-effort: any failure is logged and swallowed so mention processing
    continues even when the Letta service is unavailable.

    Args:
        thread_data: Dict with 'tweets' and 'users' keys from get_thread_context()
        agent_id: The Letta agent ID to attach blocks to (defaults to config agent_id)
    """
    if not thread_data or "users" not in thread_data:
        return

    try:
        # Imported lazily so this module loads even without the Letta stack.
        from tools.blocks import attach_x_user_blocks, x_user_note_set
        from letta_client import Letta

        # Get Letta client and agent_id from X config
        # NOTE(review): config['timeout'] is a hard key lookup — raises
        # KeyError (caught below) if the config omits it; confirm intended.
        config = get_x_letta_config()
        client = Letta(token=config['api_key'], timeout=config['timeout'])

        # Use provided agent_id or get from config
        if agent_id is None:
            agent_id = config['agent_id']

        # The block-tool functions expect an agent_state object; a minimal
        # stand-in carrying only the id is sufficient for them here.
        class MockAgentState:
            def __init__(self, agent_id):
                self.id = agent_id

        agent_state = MockAgentState(agent_id)

        users_data = thread_data["users"]
        user_ids = list(users_data.keys())

        if not user_ids:
            return

        logger.info(f"Ensuring X user blocks for {len(user_ids)} users: {user_ids}")

        # Get current blocks to check which users already have blocks with content
        current_blocks = client.agents.blocks.list(agent_id=agent_id)
        existing_user_blocks = {}

        # Block labels follow the "x_user_<id>" convention.
        for block in current_blocks:
            if block.label.startswith("x_user_"):
                user_id = block.label.replace("x_user_", "")
                existing_user_blocks[user_id] = block

        # Attach all user blocks (this will create missing ones with basic content)
        attach_result = attach_x_user_blocks(user_ids, agent_state)
        logger.info(f"X user block attachment result: {attach_result}")

        # For newly created blocks, update with user handle information
        for user_id in user_ids:
            if user_id not in existing_user_blocks:
                user_info = users_data[user_id]
                username = user_info.get('username', 'unknown')
                name = user_info.get('name', 'Unknown')

                # Set initial content with handle information
                initial_content = f"# X User: {user_id}\n\n**Handle:** @{username}\n**Name:** {name}\n\nNo additional information about this user yet."

                try:
                    x_user_note_set(user_id, initial_content, agent_state)
                    logger.info(f"Set initial content for X user {user_id} (@{username})")
                except Exception as e:
                    logger.error(f"Failed to set initial content for X user {user_id}: {e}")

    except Exception as e:
        logger.error(f"Error ensuring X user blocks: {e}")
679
680
681# X Caching and Queue System Functions
682
def load_last_seen_id() -> Optional[str]:
    """Return the last seen mention ID for incremental fetching, or None."""
    if not X_LAST_SEEN_FILE.exists():
        return None
    try:
        with open(X_LAST_SEEN_FILE, 'r') as fh:
            return json.load(fh).get('last_seen_id')
    except Exception as e:
        logger.error(f"Error loading last seen ID: {e}")
        return None
693
def save_last_seen_id(mention_id: str):
    """Persist the last seen mention ID (with an update timestamp) to disk."""
    try:
        X_QUEUE_DIR.mkdir(exist_ok=True)
        payload = {
            'last_seen_id': mention_id,
            'updated_at': datetime.now().isoformat()
        }
        with open(X_LAST_SEEN_FILE, 'w') as fh:
            json.dump(payload, fh)
        logger.debug(f"Saved last seen ID: {mention_id}")
    except Exception as e:
        logger.error(f"Error saving last seen ID: {e}")
706
def load_processed_mentions() -> set:
    """Return the set of processed mention IDs (trimmed to the newest 10000)."""
    if not X_PROCESSED_MENTIONS_FILE.exists():
        return set()
    try:
        with open(X_PROCESSED_MENTIONS_FILE, 'r') as fh:
            ids = json.load(fh)
        # Keep only recent entries (last 10000) and persist the trimmed list.
        if len(ids) > 10000:
            ids = ids[-10000:]
            save_processed_mentions(set(ids))
        return set(ids)
    except Exception as e:
        logger.error(f"Error loading processed mentions: {e}")
        return set()
721
def save_processed_mentions(processed_set: set):
    """Write the set of processed mention IDs to disk as a JSON list."""
    try:
        X_QUEUE_DIR.mkdir(exist_ok=True)
        with open(X_PROCESSED_MENTIONS_FILE, 'w') as fh:
            json.dump(list(processed_set), fh)
    except Exception as e:
        logger.error(f"Error saving processed mentions: {e}")
730
def load_downrank_users() -> Set[str]:
    """Load the set of user IDs that should be downranked (responded to 10% of the time)."""
    try:
        if not X_DOWNRANK_USERS_FILE.exists():
            return set()

        # One ID per line; blank lines and '#' comments are ignored.
        with open(X_DOWNRANK_USERS_FILE, 'r') as fh:
            downrank_users = {
                entry
                for entry in (raw.strip() for raw in fh)
                if entry and not entry.startswith('#')
            }

        logger.info(f"Loaded {len(downrank_users)} downrank users")
        return downrank_users
    except Exception as e:
        logger.error(f"Error loading downrank users: {e}")
        return set()
750
def should_respond_to_downranked_user(user_id: str, downrank_users: Set[str]) -> bool:
    """
    Decide whether to respond to a user, honoring the downrank list.

    Non-downranked users always get a response; downranked users get one
    roughly 10% of the time.
    """
    if user_id not in downrank_users:
        return True

    # Roll the 10% dice for downranked users.
    should_respond = random.random() < 0.1
    logger.info(f"Downranked user {user_id}: {'responding' if should_respond else 'skipping'} (10% chance)")
    return should_respond
763
def save_mention_to_queue(mention: Dict):
    """Save a mention to the queue directory for async processing.

    Skips mentions without an ID and mentions already recorded as processed.
    The queue filename is derived from a content hash so re-queuing the same
    mention is idempotent. Failures are logged, never raised.
    """
    try:
        mention_id = mention.get('id')
        if not mention_id:
            logger.error("Mention missing ID, cannot queue")
            return

        # Check if already processed
        processed_mentions = load_processed_mentions()
        if mention_id in processed_mentions:
            logger.debug(f"Mention {mention_id} already processed, skipping")
            return

        # Create queue directory
        X_QUEUE_DIR.mkdir(exist_ok=True)

        # Create filename using hash (similar to Bluesky system)
        mention_str = json.dumps(mention, sort_keys=True)
        mention_hash = hashlib.sha256(mention_str.encode()).hexdigest()[:16]
        filename = f"x_mention_{mention_hash}.json"

        queue_file = X_QUEUE_DIR / filename

        # Save mention data with enhanced debugging information
        mention_data = {
            'mention': mention,
            'queued_at': datetime.now().isoformat(),
            'type': 'x_mention',
            # Debug info for conversation tracking
            'debug_info': {
                'mention_id': mention.get('id'),
                'author_id': mention.get('author_id'),
                'conversation_id': mention.get('conversation_id'),
                'in_reply_to_user_id': mention.get('in_reply_to_user_id'),
                'referenced_tweets': mention.get('referenced_tweets', []),
                'text_preview': mention.get('text', '')[:200],
                'created_at': mention.get('created_at'),
                'public_metrics': mention.get('public_metrics', {}),
                'context_annotations': mention.get('context_annotations', [])
            }
        }

        with open(queue_file, 'w') as f:
            json.dump(mention_data, f, indent=2)

        # Fix: report the actual queue filename instead of the "(unknown)"
        # placeholder the log line previously contained.
        logger.info(f"Queued X mention {mention_id} -> {filename}")

    except Exception as e:
        logger.error(f"Error saving mention to queue: {e}")
814
815# X Cache Functions
def get_cached_thread_context(conversation_id: str) -> Optional[Dict]:
    """Return a cached thread context if present and less than an hour old."""
    cache_file = X_CACHE_DIR / f"thread_{conversation_id}.json"
    if not cache_file.exists():
        return None

    try:
        with open(cache_file, 'r') as fh:
            cached_data = json.load(fh)

        # A stale or timestamp-less cache entry is treated as a miss.
        from datetime import datetime, timedelta
        cached_time = datetime.fromisoformat(cached_data.get('cached_at', ''))
        if datetime.now() - cached_time < timedelta(hours=1):
            logger.info(f"Using cached thread context for {conversation_id}")
            return cached_data.get('thread_data')
    except Exception as e:
        logger.warning(f"Error loading cached thread context: {e}")

    return None
832
def save_cached_thread_context(conversation_id: str, thread_data: Dict):
    """Write a thread context to the cache, stamped with the current time."""
    try:
        X_CACHE_DIR.mkdir(exist_ok=True)

        entry = {
            'conversation_id': conversation_id,
            'thread_data': thread_data,
            'cached_at': datetime.now().isoformat()
        }

        with open(X_CACHE_DIR / f"thread_{conversation_id}.json", 'w') as fh:
            json.dump(entry, fh, indent=2)

        logger.debug(f"Cached thread context for {conversation_id}")
    except Exception as e:
        logger.error(f"Error caching thread context: {e}")
851
def get_cached_tweets(tweet_ids: List[str]) -> Dict[str, Dict]:
    """
    Load cached individual tweets if available.

    Cache TTL is age-dependent: tweets older than a day are trusted for 24
    hours, fresh tweets for only 1 hour.

    Args:
        tweet_ids: Tweet IDs to look up in the on-disk cache.

    Returns:
        Dict mapping tweet_id -> tweet_data for the tweets that were found
        and still fresh.
    """
    cached_tweets = {}

    for tweet_id in tweet_ids:
        cache_file = X_CACHE_DIR / f"tweet_{tweet_id}.json"
        if cache_file.exists():
            try:
                with open(cache_file, 'r') as f:
                    cached_data = json.load(f)

                # Use longer cache times for older tweets (24 hours vs 1 hour)
                from datetime import datetime, timedelta
                cached_time = datetime.fromisoformat(cached_data.get('cached_at', ''))
                tweet_created = cached_data.get('tweet_data', {}).get('created_at', '')

                # Parse tweet creation time to determine age
                try:
                    from dateutil.parser import parse
                    tweet_age = datetime.now() - parse(tweet_created)
                    cache_duration = timedelta(hours=24) if tweet_age > timedelta(hours=24) else timedelta(hours=1)
                except Exception:
                    # Fix: was a bare `except:`, which also swallowed
                    # KeyboardInterrupt/SystemExit. Covers missing dateutil
                    # and unparseable timestamps alike.
                    cache_duration = timedelta(hours=1)  # Default to 1 hour if parsing fails

                if datetime.now() - cached_time < cache_duration:
                    cached_tweets[tweet_id] = cached_data.get('tweet_data')
                    logger.debug(f"Using cached tweet {tweet_id}")

            except Exception as e:
                logger.warning(f"Error loading cached tweet {tweet_id}: {e}")

    return cached_tweets
887
def save_cached_tweets(tweets_data: List[Dict], users_data: Dict[str, Dict] = None):
    """Cache individual tweets to disk for future reuse, embedding author info when known."""
    try:
        X_CACHE_DIR.mkdir(exist_ok=True)

        for tweet in tweets_data:
            tweet_id = tweet.get('id')
            if not tweet_id:
                # Tweets without an ID cannot be addressed in the cache.
                continue

            # Embed the author's user object so backfills can restore it.
            enriched = tweet.copy()
            author_id = tweet.get('author_id')
            if users_data and author_id in users_data:
                enriched['author_info'] = users_data[author_id]

            entry = {
                'tweet_id': tweet_id,
                'tweet_data': enriched,
                'cached_at': datetime.now().isoformat()
            }

            with open(X_CACHE_DIR / f"tweet_{tweet_id}.json", 'w') as fh:
                json.dump(entry, fh, indent=2)

            logger.debug(f"Cached individual tweet {tweet_id}")

    except Exception as e:
        logger.error(f"Error caching individual tweets: {e}")
918
def has_sufficient_context(tweets: List[Dict], missing_tweet_ids: Set[str]) -> bool:
    """
    Decide whether the thread already has enough context to skip backfilling.

    Args:
        tweets: List of tweets already in the thread
        missing_tweet_ids: Set of missing tweet IDs we'd like to fetch

    Returns:
        True if context is sufficient, False if backfill is needed
    """
    # Nothing missing means nothing to backfill.
    if not missing_tweet_ids:
        return True

    tweet_count = len(tweets)

    # A substantial conversation (5+ tweets) is treated as sufficient.
    if tweet_count >= 5:
        logger.debug(f"Thread has {tweet_count} tweets, considering sufficient")
        return True

    # A couple of gaps in an otherwise-present thread is acceptable too.
    if len(missing_tweet_ids) <= 2 and tweet_count >= 3:
        logger.debug(f"Only {len(missing_tweet_ids)} missing tweets with {tweet_count} existing, considering sufficient")
        return True

    def _looks_conversational(current: Dict) -> bool:
        # Mentions, the word "reply", or multiple distinct other authors all
        # count as signs of back-and-forth conversation.
        body = current.get('text', '').lower()
        if '@' in body or 'reply' in body:
            return True
        others = [t for t in tweets if t.get('author_id') != current.get('author_id')]
        return len(others) > 1

    has_conversation_flow = any(_looks_conversational(t) for t in tweets)

    if has_conversation_flow and tweet_count >= 2:
        logger.debug("Thread has conversational flow, considering sufficient")
        return True

    # Too sparse and no clear flow: ask the caller to backfill.
    logger.debug(f"Context insufficient: {tweet_count} tweets, {len(missing_tweet_ids)} missing, no clear flow")
    return False
961
def fetch_and_queue_mentions(username: str) -> int:
    """
    Single-pass fetch of new mentions for *username*, queueing each one.

    Uses the persisted last-seen tweet ID for incremental fetching, then
    advances that ID to the newest mention processed.

    Args:
        username: X handle (without '@') whose mentions are searched.

    Returns:
        Number of new mentions queued (0 on error or when none were found).
    """
    try:
        client = create_x_client()

        # Load last seen ID for incremental fetching
        last_seen_id = load_last_seen_id()

        logger.info(f"Fetching mentions for @{username} since {last_seen_id or 'beginning'}")

        # Search for mentions
        mentions = client.search_mentions(
            username=username,
            since_id=last_seen_id,
            max_results=100  # Get as many as possible
        )

        if not mentions:
            logger.info("No new mentions found")
            return 0

        # API returns newest first; reverse so we queue oldest first.
        mentions.reverse()

        for mention in mentions:
            save_mention_to_queue(mention)

        # After the reverse, the last element is the most recent mention;
        # persist its ID so the next run starts from there. (The previous
        # version re-checked `if mentions:` here, which was dead code after
        # the early return above.)
        save_last_seen_id(mentions[-1]['id'])

        new_count = len(mentions)
        logger.info(f"Queued {new_count} new X mentions")
        return new_count

    except Exception as e:
        logger.error(f"Error fetching and queuing mentions: {e}")
        return 0
1005
# Simple test function
def get_my_user_info():
    """Look up the authenticated account via /2/users/me and print its details.

    Returns:
        The user data dict on success, None on failure.
    """
    try:
        client = create_x_client()

        print("Fetching authenticated user information...")
        # /2/users/me resolves the account that owns the credentials.
        response = client._make_request(
            "/users/me",
            params={"user.fields": "id,name,username,description"},
        )

        if not response or "data" not in response:
            print("❌ Failed to get user information")
            print(f"Response: {response}")
            return None

        user_data = response["data"]
        print(f"✅ Found authenticated user:")
        print(f"   ID: {user_data.get('id')}")
        print(f"   Username: @{user_data.get('username')}")
        print(f"   Name: {user_data.get('name')}")
        print(f"   Description: {user_data.get('description', 'N/A')[:100]}...")
        print(f"\n🔧 Update your x_config.yaml with:")
        print(f"   user_id: \"{user_data.get('id')}\"")
        return user_data

    except Exception as e:
        print(f"Error getting user info: {e}")
        return None
1039
def test_search_mentions():
    """Exercise search-based mention detection against the live API."""
    try:
        client = create_x_client()

        # Resolve our own handle first; the search query needs the literal username.
        user_info = client._make_request("/users/me", params={"user.fields": "username"})
        if not user_info or "data" not in user_info:
            print("❌ Could not get username")
            return

        username = user_info["data"]["username"]
        print(f"🔍 Searching for mentions of @{username}")

        mentions = client.search_mentions(username, max_results=5)

        if not mentions:
            print("No mentions found via search")
            return

        print(f"✅ Found {len(mentions)} mentions via search:")
        for mention in mentions:
            print(f"- {mention.get('id')}: {mention.get('text', '')[:100]}...")

    except Exception as e:
        print(f"Search test failed: {e}")
1065
def test_fetch_and_queue():
    """Exercise the single-pass fetch-and-queue pipeline end to end."""
    try:
        client = create_x_client()

        # Resolve the authenticated account's handle.
        user_info = client._make_request("/users/me", params={"user.fields": "username"})
        if not user_info or "data" not in user_info:
            print("❌ Could not get username")
            return

        username = user_info["data"]["username"]
        print(f"🔄 Fetching and queueing mentions for @{username}")

        # Show where the incremental cursor currently points.
        print(f"📍 Last seen ID: {load_last_seen_id() or 'None (first run)'}")

        new_count = fetch_and_queue_mentions(username)

        if new_count > 0:
            print(f"✅ Queued {new_count} new mentions")
            print(f"📁 Check ./x_queue/ directory for queued mentions")
            # Confirm the cursor advanced.
            print(f"📍 Updated last seen ID: {load_last_seen_id()}")
        else:
            print("ℹ️  No new mentions to queue")

    except Exception as e:
        print(f"Fetch and queue test failed: {e}")
1099
def test_thread_context():
    """Retrieve, render, and persist thread context for the first queued mention."""
    try:
        # Pick any queued mention to exercise thread retrieval.
        queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json"))
        if not queue_files:
            print("❌ No queued mentions found. Run 'python x.py queue' first.")
            return

        with open(queue_files[0], 'r') as f:
            mention_data = json.load(f)

        mention = mention_data['mention']
        print(f"📄 Using mention: {mention.get('id')}")
        print(f"📝 Text: {mention.get('text')}")

        # Older queue entries may predate conversation_id being captured.
        conversation_id = mention.get('conversation_id')
        if not conversation_id:
            print("❌ No conversation_id found in mention. May need to re-queue with updated fetch.")
            return

        print(f"🧵 Getting thread context for conversation: {conversation_id}")

        client = create_x_client()
        thread_data = client.get_thread_context(conversation_id)
        if not thread_data:
            print("❌ Failed to retrieve thread context")
            return

        print(f"✅ Retrieved thread with {len(thread_data.get('tweets', []))} tweets")

        yaml_thread = thread_to_yaml_string(thread_data)

        # Persist the YAML rendering so it can be inspected by hand.
        thread_file = X_QUEUE_DIR / f"thread_context_{conversation_id}.yaml"
        with open(thread_file, 'w') as f:
            f.write(yaml_thread)

        print(f"💾 Saved thread context to: {thread_file}")
        print("\n📋 Thread preview:")
        print(yaml_thread)

    except Exception as e:
        print(f"Thread context test failed: {e}")
1152
def test_letta_integration(agent_id: str = None):
    """Test sending X thread context to Letta agent.

    Reads the first queued mention, rebuilds its thread context, and streams a
    prompt through the configured Letta agent, printing the agent's output.

    Args:
        agent_id: Optional Letta agent ID. When omitted, falls back to the
            ``letta.agent_id`` entry in x_config.yaml.
    """
    try:
        from letta_client import Letta
        # NOTE(review): these local json/yaml imports shadow the module-level
        # ones from the top of the file — redundant but harmless.
        import json
        import yaml

        # Load X config to access letta section
        try:
            x_config = load_x_config()
            letta_config = x_config.get('letta', {})
            api_key = letta_config.get('api_key')
            config_agent_id = letta_config.get('agent_id')

            # Use agent_id from config if not provided as parameter
            if not agent_id:
                if config_agent_id:
                    agent_id = config_agent_id
                    print(f"ℹ️ Using agent_id from config: {agent_id}")
                else:
                    print("❌ No agent_id found in x_config.yaml")
                    print("Expected config structure:")
                    print("  letta:")
                    print("    agent_id: your-agent-id")
                    return
            else:
                print(f"ℹ️ Using provided agent_id: {agent_id}")

            if not api_key:
                # Try loading from environment as fallback
                import os
                api_key = os.getenv('LETTA_API_KEY')
                if not api_key:
                    print("❌ LETTA_API_KEY not found in x_config.yaml or environment")
                    print("Expected config structure:")
                    print("  letta:")
                    print("    api_key: your-letta-api-key")
                    return
                else:
                    print("ℹ️ Using LETTA_API_KEY from environment")
            else:
                print("ℹ️ Using LETTA_API_KEY from x_config.yaml")

        except Exception as e:
            print(f"❌ Error loading config: {e}")
            return

        letta_client = Letta(token=api_key, timeout=600)
        print(f"🤖 Connected to Letta, using agent: {agent_id}")

        # Find a queued mention file
        queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json"))
        if not queue_files:
            print("❌ No queued mentions found. Run 'python x.py queue' first.")
            return

        # Read the first mention
        mention_file = queue_files[0]
        with open(mention_file, 'r') as f:
            mention_data = json.load(f)

        mention = mention_data['mention']
        conversation_id = mention.get('conversation_id')

        if not conversation_id:
            print("❌ No conversation_id found in mention.")
            return

        print(f"🧵 Getting thread context for conversation: {conversation_id}")

        # Get thread context
        x_client = create_x_client()
        thread_data = x_client.get_thread_context(conversation_id)

        if not thread_data:
            print("❌ Failed to retrieve thread context")
            return

        # Convert to YAML
        yaml_thread = thread_to_yaml_string(thread_data)

        # Create prompt for the agent
        prompt = f"""You are void, an AI agent operating on X (Twitter). You have received a mention and need to respond appropriately.

Here is the thread context:

{yaml_thread}

Please craft a response that continues this conversation naturally. Keep responses conversational and authentic to your void persona."""

        prompt_char_count = len(prompt)
        print(f"📤 Sending thread context to Letta agent... | prompt: {prompt_char_count} chars")

        # Print the prompt in a rich panel
        rprint(Panel(prompt, title=f"Prompt ({prompt_char_count} chars)", border_style="blue"))

        # Send to Letta agent using streaming
        message_stream = letta_client.agents.messages.create_stream(
            agent_id=agent_id,
            messages=[{"role": "user", "content": prompt}],
            stream_tokens=False,
            max_steps=10
        )

        print("🔄 Streaming response from agent...")
        response_text = ""

        # Echo every chunk; keep the last assistant_message content as the
        # final response text.
        for chunk in message_stream:
            print(chunk)
            if hasattr(chunk, 'message_type'):
                if chunk.message_type == 'assistant_message':
                    print(f"🤖 Agent response: {chunk.content}")
                    response_text = chunk.content
                elif chunk.message_type == 'reasoning_message':
                    print(f"💭 Agent reasoning: {chunk.reasoning[:100]}...")
                elif chunk.message_type == 'tool_call_message':
                    print(f"🔧 Agent tool call: {chunk.tool_call.name}")

        if response_text:
            print(f"\n✅ Agent generated response:")
            print(f"📝 Response: {response_text}")
        else:
            print("❌ No response generated by agent")

    except Exception as e:
        print(f"Letta integration test failed: {e}")
        import traceback
        traceback.print_exc()
1281
def test_x_client():
    """Smoke-test the X client by pulling a handful of mentions."""
    try:
        client = create_x_client()
        mentions = client.get_mentions(max_results=5)

        if not mentions:
            print("No mentions retrieved")
            return

        print(f"Successfully retrieved {len(mentions)} mentions:")
        for mention in mentions:
            print(f"- {mention.get('id')}: {mention.get('text')[:50]}...")

    except Exception as e:
        print(f"Test failed: {e}")
1297
def reply_to_cameron_post():
    """
    Reply to Cameron's specific X post.

    NOTE: This requires OAuth User Context authentication, not Bearer token.
    Current Bearer token is Application-Only which can't post.
    """
    try:
        client = create_x_client()

        # Post ID taken from https://x.com/cameron_pfiffer/status/1950690566909710618
        cameron_post_id = "1950690566909710618"
        reply_text = "Hello from void! 🤖 Testing X integration."

        print(f"Attempting to reply to post {cameron_post_id}")
        print(f"Reply text: {reply_text}")
        print("\nNOTE: This will fail with current Bearer token (Application-Only)")
        print("Posting requires OAuth User Context authentication")

        result = client.post_reply(reply_text, cameron_post_id)

        if not result:
            print("❌ Failed to post reply (expected with current auth)")
            return

        print(f"✅ Successfully posted reply!")
        print(f"Reply ID: {result.get('data', {}).get('id', 'Unknown')}")

    except Exception as e:
        print(f"Reply failed: {e}")
1329
def process_x_mention(void_agent, x_client, mention_data, queue_filepath=None, testing_mode=False):
    """
    Process an X mention and generate a reply using the Letta agent.
    Similar to bsky.py process_mention but for X/Twitter.

    Pipeline: extract mention fields -> downrank filter -> thread context
    (with backfill of directly referenced tweets) -> debug persistence ->
    prompt construction -> streamed Letta run -> extract successful
    add_post_to_x_thread calls -> post replies and acknowledge.

    Args:
        void_agent: The Letta agent instance
        x_client: The X API client
        mention_data: The mention data dictionary
        queue_filepath: Optional Path object to the queue file (for cleanup on halt)
        testing_mode: If True, don't actually post to X

    Returns:
        True: Successfully processed, remove from queue
        False: Failed but retryable, keep in queue
        None: Failed with non-retryable error, move to errors directory
        "no_reply": No reply was generated, move to no_reply directory
        "ignored": Agent explicitly called ignore_notification
    """
    try:
        logger.debug(f"Starting process_x_mention with mention_data type: {type(mention_data)}")

        # Extract mention details
        if isinstance(mention_data, dict):
            # Handle both raw mention and queued mention formats
            if 'mention' in mention_data:
                mention = mention_data['mention']
            else:
                mention = mention_data
        else:
            mention = mention_data

        mention_id = mention.get('id')
        mention_text = mention.get('text', '')
        author_id = mention.get('author_id')
        conversation_id = mention.get('conversation_id')
        in_reply_to_user_id = mention.get('in_reply_to_user_id')
        referenced_tweets = mention.get('referenced_tweets', [])

        # Check downrank list - only respond to downranked users 10% of the time
        downrank_users = load_downrank_users()
        if not should_respond_to_downranked_user(str(author_id), downrank_users):
            logger.info(f"🔻 Skipping downranked user {author_id} - not in 10% selection")
            return "no_reply"

        # Enhanced conversation tracking for debug - especially important for Grok handling
        logger.info(f"🔍 CONVERSATION DEBUG - Mention ID: {mention_id}")
        logger.info(f"   Author ID: {author_id}")
        logger.info(f"   Conversation ID: {conversation_id}")
        logger.info(f"   In Reply To User ID: {in_reply_to_user_id}")
        logger.info(f"   Referenced Tweets: {len(referenced_tweets)} items")
        for i, ref in enumerate(referenced_tweets[:3]):  # Log first 3 referenced tweets
            logger.info(f"     Reference {i+1}: {ref.get('type')} -> {ref.get('id')}")
        logger.info(f"   Text preview: {mention_text[:100]}...")

        # Without a conversation_id we cannot rebuild the thread; treat as
        # non-retryable (None -> errors directory).
        if not conversation_id:
            logger.warning(f"❌ No conversation_id found for mention {mention_id} - this may cause thread context issues")
            return None

        # Get thread context with caching enabled for efficiency
        # Use mention_id as until_id to exclude tweets that occurred after this mention
        try:
            thread_data = x_client.get_thread_context(conversation_id, use_cache=True, until_id=mention_id)
            if not thread_data:
                logger.error(f"❌ Failed to get thread context for conversation {conversation_id}")
                return False

            # If this mention references a specific tweet, ensure we have that tweet in context
            if referenced_tweets:
                for ref in referenced_tweets:
                    if ref.get('type') == 'replied_to':
                        ref_id = ref.get('id')
                        # Check if the referenced tweet is in our thread data
                        thread_tweet_ids = [t.get('id') for t in thread_data.get('tweets', [])]
                        if ref_id and ref_id not in thread_tweet_ids:
                            logger.warning(f"Missing referenced tweet {ref_id} in thread context, attempting to fetch")
                            try:
                                # Fetch the missing referenced tweet directly
                                endpoint = f"/tweets/{ref_id}"
                                params = {
                                    "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
                                    "user.fields": "id,name,username",
                                    "expansions": "author_id"
                                }
                                response = x_client._make_request(endpoint, params)
                                if response and "data" in response:
                                    missing_tweet = response["data"]
                                    # Only merge the tweet if it actually belongs
                                    # to this conversation.
                                    if missing_tweet.get('conversation_id') == conversation_id:
                                        # Add to thread data
                                        if 'tweets' not in thread_data:
                                            thread_data['tweets'] = []
                                        thread_data['tweets'].append(missing_tweet)

                                        # Add user data if available
                                        if "includes" in response and "users" in response["includes"]:
                                            if 'users' not in thread_data:
                                                thread_data['users'] = {}
                                            for user in response["includes"]["users"]:
                                                thread_data['users'][user["id"]] = user

                                        logger.info(f"✅ Added missing referenced tweet {ref_id} to thread context")
                                    else:
                                        logger.warning(f"Referenced tweet {ref_id} belongs to different conversation {missing_tweet.get('conversation_id')}")
                            except Exception as e:
                                logger.error(f"Failed to fetch referenced tweet {ref_id}: {e}")

            # Enhanced thread context debugging
            logger.info(f"🧵 THREAD CONTEXT DEBUG - Conversation ID: {conversation_id}")
            thread_posts = thread_data.get('tweets', [])
            thread_users = thread_data.get('users', {})
            logger.info(f"   Posts in thread: {len(thread_posts)}")
            logger.info(f"   Users in thread: {len(thread_users)}")

            # Log thread participants for Grok detection
            for user_id, user_info in thread_users.items():
                username = user_info.get('username', 'unknown')
                name = user_info.get('name', 'Unknown')
                is_verified = user_info.get('verified', False)
                logger.info(f"   User {user_id}: @{username} ({name}) verified={is_verified}")

                # Special logging for Grok or AI-related users
                if 'grok' in username.lower() or 'grok' in name.lower():
                    logger.info(f"   🤖 DETECTED GROK USER: @{username} ({name})")

            # Log conversation structure
            for i, post in enumerate(thread_posts[:5]):  # Log first 5 posts
                post_id = post.get('id')
                post_author = post.get('author_id')
                post_text = post.get('text', '')[:50]
                is_reply = 'in_reply_to_user_id' in post
                logger.info(f"   Post {i+1}: {post_id} by {post_author} (reply={is_reply}) - {post_text}...")

        except Exception as e:
            logger.error(f"❌ Error getting thread context: {e}")
            return False

        # Convert to YAML string
        thread_context = thread_to_yaml_string(thread_data)
        logger.info(f"📄 Thread context generated, length: {len(thread_context)} characters")

        # Save comprehensive conversation data for debugging
        try:
            debug_dir = X_QUEUE_DIR / "debug" / f"conversation_{conversation_id}"
            debug_dir.mkdir(parents=True, exist_ok=True)

            # Save raw thread data (JSON)
            with open(debug_dir / f"thread_data_{mention_id}.json", 'w') as f:
                json.dump(thread_data, f, indent=2)

            # Save YAML thread context
            with open(debug_dir / f"thread_context_{mention_id}.yaml", 'w') as f:
                f.write(thread_context)

            # Save mention processing debug info
            debug_info = {
                'processed_at': datetime.now().isoformat(),
                'mention_id': mention_id,
                'conversation_id': conversation_id,
                'author_id': author_id,
                'in_reply_to_user_id': in_reply_to_user_id,
                'referenced_tweets': referenced_tweets,
                'thread_stats': {
                    'total_posts': len(thread_posts),
                    'total_users': len(thread_users),
                    'yaml_length': len(thread_context)
                },
                'users_in_conversation': {
                    user_id: {
                        'username': user_info.get('username'),
                        'name': user_info.get('name'),
                        'verified': user_info.get('verified', False),
                        'is_grok': 'grok' in user_info.get('username', '').lower() or 'grok' in user_info.get('name', '').lower()
                    }
                    for user_id, user_info in thread_users.items()
                }
            }

            with open(debug_dir / f"debug_info_{mention_id}.json", 'w') as f:
                json.dump(debug_info, f, indent=2)

            logger.info(f"💾 Saved conversation debug data to: {debug_dir}")

        except Exception as debug_error:
            logger.warning(f"Failed to save debug data: {debug_error}")
            # Continue processing even if debug save fails

        # Check for #voidstop — opt-out marker anywhere in the thread or the
        # mention itself; return True so the mention leaves the queue.
        if "#voidstop" in thread_context.lower() or "#voidstop" in mention_text.lower():
            logger.info("Found #voidstop, skipping this mention")
            return True

        # Ensure X user blocks are attached
        try:
            ensure_x_user_blocks_attached(thread_data, void_agent.id)
        except Exception as e:
            logger.warning(f"Failed to ensure X user blocks: {e}")
            # Continue without user blocks rather than failing completely

        # Create prompt for Letta agent
        author_info = thread_data.get('users', {}).get(author_id, {})
        author_username = author_info.get('username', 'unknown')
        author_name = author_info.get('name', author_username)

        prompt = f"""You received a mention on X (Twitter) from @{author_username} ({author_name}).

MOST RECENT POST (the mention you're responding to):
"{mention_text}"

FULL THREAD CONTEXT:
```yaml
{thread_context}
```

The YAML above shows the complete conversation thread. The most recent post is the one mentioned above that you should respond to, but use the full thread context to understand the conversation flow.

If you need to update user information, use the x_user_* tools.

To reply, use the add_post_to_x_thread tool:
- Each call creates one post (max 280 characters)
- For most responses, a single call is sufficient
- Only use multiple calls for threaded replies when:
  * The topic requires extended explanation that cannot fit in 280 characters
  * You're explicitly asked for a detailed/long response
  * The conversation naturally benefits from a structured multi-part answer
- Avoid unnecessary threads - be concise when possible"""

        # Log mention processing
        title = f"X MENTION FROM @{author_username}"
        print(f"\n▶ {title}")
        print(f"  {'═' * len(title)}")
        for line in mention_text.split('\n'):
            print(f"  {line}")

        # Send to Letta agent
        from letta_client import Letta

        config = get_x_letta_config()
        letta_client = Letta(token=config['api_key'], timeout=config['timeout'])

        prompt_char_count = len(prompt)
        logger.debug(f"Sending to LLM: @{author_username} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars | prompt: {prompt_char_count} chars")

        try:
            # Use streaming to avoid timeout errors
            message_stream = letta_client.agents.messages.create_stream(
                agent_id=void_agent.id,
                messages=[{"role": "user", "content": prompt}],
                stream_tokens=False,
                max_steps=100
            )

            # Collect streaming response (simplified version of bsky.py logic)
            all_messages = []
            for chunk in message_stream:
                if hasattr(chunk, 'message_type'):
                    if chunk.message_type == 'reasoning_message':
                        print("\n◆ Reasoning")
                        print("  ─────────")
                        for line in chunk.reasoning.split('\n'):
                            print(f"  {line}")
                    elif chunk.message_type == 'tool_call_message':
                        tool_name = chunk.tool_call.name
                        if tool_name == 'add_post_to_x_thread':
                            try:
                                args = json.loads(chunk.tool_call.arguments)
                                text = args.get('text', '')
                                if text:
                                    print("\n✎ X Post")
                                    print("  ────────")
                                    for line in text.split('\n'):
                                        print(f"  {line}")
                            # Display-only: malformed arguments are ignored here
                            # and handled in the extraction pass below.
                            except:
                                pass
                        elif tool_name == 'halt_activity':
                            logger.info("🛑 HALT_ACTIVITY TOOL CALLED - TERMINATING X BOT")
                            if queue_filepath and queue_filepath.exists():
                                queue_filepath.unlink()
                                logger.info(f"Deleted queue file: {queue_filepath.name}")
                            logger.info("=== X BOT TERMINATED BY AGENT ===")
                            # Hard-stop the whole process; the queue file was
                            # already removed above.
                            exit(0)
                    elif chunk.message_type == 'tool_return_message':
                        tool_name = chunk.name
                        status = chunk.status
                        if status == 'success' and tool_name == 'add_post_to_x_thread':
                            print("\n✓ X Post Queued")
                            print("  ──────────────")
                            print("  Post queued successfully")
                    elif chunk.message_type == 'assistant_message':
                        print("\n▶ Assistant Response")
                        print("  ──────────────────")
                        for line in chunk.content.split('\n'):
                            print(f"  {line}")

                all_messages.append(chunk)
                # The stream terminates with a literal 'done' sentinel chunk.
                if str(chunk) == 'done':
                    break

            # Convert streaming response for compatibility: downstream code
            # expects a response object with a .messages attribute.
            message_response = type('StreamingResponse', (), {
                'messages': [msg for msg in all_messages if hasattr(msg, 'message_type')]
            })()

        except Exception as api_error:
            logger.error(f"Letta API error: {api_error}")
            raise

        # Extract successful add_post_to_x_thread tool calls
        reply_candidates = []
        tool_call_results = {}
        ignored_notification = False
        ack_note = None  # Track any note from annotate_ack tool

        # First pass: collect tool return statuses
        for message in message_response.messages:
            if hasattr(message, 'tool_call_id') and hasattr(message, 'status') and hasattr(message, 'name'):
                if message.name == 'add_post_to_x_thread':
                    tool_call_results[message.tool_call_id] = message.status
                elif message.name == 'ignore_notification':
                    if message.status == 'success':
                        ignored_notification = True
                        logger.info("🚫 X notification ignored")

        # Second pass: collect successful tool calls
        for message in message_response.messages:
            if hasattr(message, 'tool_call') and message.tool_call:
                # Collect annotate_ack tool calls
                if message.tool_call.name == 'annotate_ack':
                    try:
                        args = json.loads(message.tool_call.arguments)
                        note = args.get('note', '')
                        if note:
                            ack_note = note
                            logger.debug(f"Found annotate_ack with note: {note[:50]}...")
                    except json.JSONDecodeError as e:
                        logger.error(f"Failed to parse annotate_ack arguments: {e}")

                # Collect add_post_to_x_thread tool calls - only if they were successful
                elif message.tool_call.name == 'add_post_to_x_thread':
                    tool_call_id = message.tool_call.tool_call_id
                    tool_status = tool_call_results.get(tool_call_id, 'unknown')

                    if tool_status == 'success':
                        try:
                            args = json.loads(message.tool_call.arguments)
                            reply_text = args.get('text', '')
                            if reply_text:
                                reply_candidates.append(reply_text)
                                logger.debug(f"Found successful add_post_to_x_thread candidate: {reply_text[:50]}...")
                        except json.JSONDecodeError as e:
                            logger.error(f"Failed to parse tool call arguments: {e}")

        # Save agent response data to debug folder
        try:
            debug_dir = X_QUEUE_DIR / "debug" / f"conversation_{conversation_id}"

            # Save complete agent interaction
            agent_response_data = {
                'processed_at': datetime.now().isoformat(),
                'mention_id': mention_id,
                'conversation_id': conversation_id,
                'prompt_sent': prompt,
                'reply_candidates': reply_candidates,
                'ignored_notification': ignored_notification,
                'ack_note': ack_note,
                'tool_call_results': tool_call_results,
                'all_messages': []
            }

            # Convert messages to serializable format
            for message in message_response.messages:
                msg_data = {
                    'message_type': getattr(message, 'message_type', 'unknown'),
                    'content': getattr(message, 'content', ''),
                    'reasoning': getattr(message, 'reasoning', ''),
                    'status': getattr(message, 'status', ''),
                    'name': getattr(message, 'name', ''),
                }

                if hasattr(message, 'tool_call') and message.tool_call:
                    msg_data['tool_call'] = {
                        'name': message.tool_call.name,
                        'arguments': message.tool_call.arguments,
                        'tool_call_id': getattr(message.tool_call, 'tool_call_id', '')
                    }

                agent_response_data['all_messages'].append(msg_data)

            with open(debug_dir / f"agent_response_{mention_id}.json", 'w') as f:
                json.dump(agent_response_data, f, indent=2)

            logger.info(f"💾 Saved agent response debug data")

        except Exception as debug_error:
            logger.warning(f"Failed to save agent response debug data: {debug_error}")

        # Handle conflicts
        if reply_candidates and ignored_notification:
            logger.error("⚠️ CONFLICT: Agent called both add_post_to_x_thread and ignore_notification!")
            return False

        if reply_candidates:
            # Post replies to X
            logger.debug(f"Found {len(reply_candidates)} add_post_to_x_thread calls, posting to X")

            if len(reply_candidates) == 1:
                content = reply_candidates[0]
                title = f"Reply to @{author_username}"
            else:
                content = "\n\n".join([f"{j}. {msg}" for j, msg in enumerate(reply_candidates, 1)])
                title = f"Reply Thread to @{author_username} ({len(reply_candidates)} messages)"

            print(f"\n✎ {title}")
            print(f"  {'─' * len(title)}")
            for line in content.split('\n'):
                print(f"  {line}")

            if testing_mode:
                logger.info("TESTING MODE: Skipping actual X post")
                return True
            else:
                # Post to X using thread approach
                success = post_x_thread_replies(x_client, mention_id, reply_candidates)
                if success:
                    logger.info(f"Successfully replied to @{author_username} on X")

                    # Acknowledge the post we're replying to
                    try:
                        ack_result = acknowledge_x_post(x_client, mention_id, ack_note)
                        if ack_result:
                            if ack_note:
                                logger.info(f"Successfully acknowledged X post from @{author_username} (note: \"{ack_note[:50]}...\")")
                            else:
                                logger.info(f"Successfully acknowledged X post from @{author_username}")
                        else:
                            logger.warning(f"Failed to acknowledge X post from @{author_username}")
                    except Exception as e:
                        logger.error(f"Error acknowledging X post from @{author_username}: {e}")
                        # Don't fail the entire operation if acknowledgment fails

                    return True
                else:
                    logger.error(f"Failed to send reply to @{author_username} on X")
                    return False
        else:
            if ignored_notification:
                logger.info(f"X mention from @{author_username} was explicitly ignored")
                return "ignored"
            else:
                logger.warning(f"No add_post_to_x_thread tool calls found for mention from @{author_username} - keeping in queue for next pass")
                return False  # Keep in queue for retry instead of removing

    except Exception as e:
        logger.error(f"Error processing X mention: {e}")
        return False
1783
def acknowledge_x_post(x_client, post_id, note=None):
    """
    Acknowledge an X post that we replied to.

    The acknowledgment is stored via the Bluesky client in the void data
    repository on atproto, exactly like Bluesky acknowledgments.

    Args:
        x_client: XClient instance (unused; kept for call-site compatibility)
        post_id: The X post ID we're acknowledging
        note: Optional note to include with the acknowledgment

    Returns:
        True if successful, False otherwise
    """
    try:
        # Acks live on atproto, so we go through the Bluesky client rather
        # than the X API.
        bsky_client = bsky_utils.default_login()

        # X posts have no atproto URI/CID, so synthesize stable identifiers.
        synthetic_uri = f"x://twitter.com/post/{post_id}"
        synthetic_cid = f"x_{post_id}_cid"  # Synthetic CID for X posts

        # Reuse the exact same acknowledgment path as Bluesky posts.
        if not bsky_utils.acknowledge_post(bsky_client, synthetic_uri, synthetic_cid, note):
            logger.error(f"Failed to acknowledge X post {post_id}")
            return False

        suffix = f" with note: {note[:50]}..." if note else ""
        logger.debug(f"Acknowledged X post {post_id} via atproto{suffix}")
        return True

    except Exception as e:
        logger.error(f"Error acknowledging X post {post_id}: {e}")
        return False
1820
def post_x_thread_replies(x_client, in_reply_to_tweet_id, reply_messages):
    """
    Post a series of replies to X, threading them properly.

    Each message after the first replies to the previously posted one, so the
    replies appear as one connected thread.

    Args:
        x_client: XClient instance
        in_reply_to_tweet_id: The original tweet ID to reply to
        reply_messages: List of reply text strings

    Returns:
        True if every reply posted successfully, False otherwise
    """
    try:
        total = len(reply_messages)
        parent_id = in_reply_to_tweet_id

        for index, text in enumerate(reply_messages, start=1):
            logger.info(f"Posting X reply {index}/{total}: {text[:50]}...")

            response = x_client.post_reply(text, parent_id)

            # Bail out of the whole thread on the first failed post.
            if not response or 'data' not in response:
                logger.error(f"Failed to post X reply {index}")
                return False

            new_id = response['data']['id']
            logger.info(f"Successfully posted X reply {index}, ID: {new_id}")
            # Thread the next reply under the one we just created.
            parent_id = new_id

        return True

    except Exception as e:
        logger.error(f"Error posting X thread replies: {e}")
        return False
1855
def load_and_process_queued_x_mentions(void_agent, x_client, testing_mode=False):
    """
    Load and process all X mentions from the queue.
    Similar to bsky.py load_and_process_queued_notifications but for X.

    Queue files are processed oldest-first by the mention's created_at.
    Per-file outcome, keyed on the return value of process_x_mention():
      - True:       processed successfully -> file removed, ID marked processed
      - "ignored":  explicitly ignored     -> file removed
      - "no_reply": no reply generated     -> file moved to no_reply/
      - None:       unrecoverable error    -> file moved to errors/
      - False:      transient failure      -> file kept in queue for retry

    Args:
        void_agent: Letta agent used to generate replies.
        x_client: XClient used to post replies.
        testing_mode: If True, successfully processed queue files are kept.
    """
    try:
        # Get all X mention files in queue directory
        queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json"))

        if not queue_files:
            return

        # Load file metadata and sort by creation time (chronological order)
        file_metadata = []
        for filepath in queue_files:
            try:
                with open(filepath, 'r') as f:
                    queue_data = json.load(f)
                mention_data = queue_data.get('mention', queue_data)
                # Default to epoch if missing so the file still gets processed.
                created_at = mention_data.get('created_at', '1970-01-01T00:00:00.000Z')
                file_metadata.append((created_at, filepath))
            except Exception as e:
                logger.warning(f"Error reading queue file {filepath.name}: {e}")
                # Add with default timestamp so it still gets processed
                file_metadata.append(('1970-01-01T00:00:00.000Z', filepath))

        # Sort by creation time (oldest first)
        file_metadata.sort(key=lambda x: x[0])

        logger.info(f"Processing {len(file_metadata)} queued X mentions in chronological order")

        for i, (created_at, filepath) in enumerate(file_metadata, 1):
            logger.info(f"Processing X queue file {i}/{len(file_metadata)}: {filepath.name} (created: {created_at})")

            try:
                # Load mention data
                with open(filepath, 'r') as f:
                    queue_data = json.load(f)

                mention_data = queue_data.get('mention', queue_data)

                # Process the mention
                success = process_x_mention(void_agent, x_client, mention_data,
                                            queue_filepath=filepath, testing_mode=testing_mode)

            except XRateLimitError:
                logger.info("Rate limit hit - breaking out of queue processing to restart from beginning")
                break

            except Exception as e:
                logger.error(f"Error processing X queue file {filepath.name}: {e}")
                continue

            # Handle file based on processing result.
            # BUG FIX: the string sentinels "ignored"/"no_reply" are truthy, so
            # they must be tested BEFORE the generic truthiness check; the old
            # order made those branches unreachable and deleted no_reply files.
            if success == "ignored":  # Delete ignored notifications
                filepath.unlink()
                logger.info(f"🚫 Deleted ignored X notification: {filepath.name}")

            elif success == "no_reply":  # Move to no_reply directory
                no_reply_dir = X_QUEUE_DIR / "no_reply"
                no_reply_dir.mkdir(exist_ok=True)
                no_reply_path = no_reply_dir / filepath.name
                filepath.rename(no_reply_path)
                logger.info(f"Moved X file {filepath.name} to no_reply directory")

            elif success is None:  # Move to error directory
                error_dir = X_QUEUE_DIR / "errors"
                error_dir.mkdir(exist_ok=True)
                error_path = error_dir / filepath.name
                filepath.rename(error_path)
                logger.warning(f"Moved X file {filepath.name} to errors directory")

            elif success:  # True: fully processed
                if testing_mode:
                    logger.info(f"TESTING MODE: Keeping X queue file: {filepath.name}")
                else:
                    filepath.unlink()
                    logger.info(f"Successfully processed and removed X file: {filepath.name}")

                    # Mark as processed so it is never re-queued
                    processed_mentions = load_processed_mentions()
                    processed_mentions.add(mention_data.get('id'))
                    save_processed_mentions(processed_mentions)

            else:  # False: keep in queue for retry
                logger.warning(f"⚠️ Failed to process X file {filepath.name}, keeping in queue for retry")

    except Exception as e:
        logger.error(f"Error loading queued X mentions: {e}")
1945
def process_x_notifications(void_agent, x_client, testing_mode=False):
    """
    Fetch new X mentions, queue them, and process the queue.
    Similar to bsky.py process_notifications but for X.
    """
    try:
        # Resolve our own handle first; mentions are searched by username.
        me = x_client._make_request("/users/me", params={"user.fields": "username"})
        if me and "data" in me:
            username = me["data"]["username"]
        else:
            logger.error("Could not get username for X mentions")
            return

        # Queue anything new before draining the queue.
        new_count = fetch_and_queue_mentions(username)
        if new_count > 0:
            logger.info(f"Found {new_count} new X mentions to process")

        # Drain the entire queue (old and newly fetched mentions alike).
        load_and_process_queued_x_mentions(void_agent, x_client, testing_mode)

    except Exception as e:
        logger.error(f"Error processing X notifications: {e}")
1971
def periodic_user_block_cleanup(client, agent_id: str) -> None:
    """
    Detach all user blocks from the agent to prevent memory bloat.
    This should be called periodically to ensure clean state.
    """
    try:
        # Collect every attached block whose label marks it as a user block.
        attached = client.agents.blocks.list(agent_id=agent_id)
        user_blocks = [
            {'label': blk.label, 'id': blk.id}
            for blk in attached
            if hasattr(blk, 'label') and blk.label.startswith('user_')
        ]

        if not user_blocks:
            logger.debug("No user blocks found to detach during periodic cleanup")
            return

        logger.info(f"Found {len(user_blocks)} user blocks to detach")

        # Detach one at a time; a single failure shouldn't stop the rest.
        for entry in user_blocks:
            try:
                client.agents.blocks.detach(
                    agent_id=agent_id,
                    block_id=entry['id']
                )
                logger.debug(f"Detached user block: {entry['label']}")
            except Exception as e:
                logger.error(f"Failed to detach user block {entry['label']}: {e}")

        logger.info(f"Periodic cleanup complete: detached {len(user_blocks)} user blocks")

    except Exception as e:
        logger.error(f"Error during periodic user block cleanup: {e}")
2010
def initialize_x_void():
    """Initialize the void agent for X operations."""
    logger.info("Starting void agent initialization for X...")

    from letta_client import Letta

    # Build a Letta client from the X-specific configuration.
    cfg = get_x_letta_config()
    client = Letta(token=cfg['api_key'], timeout=cfg['timeout'])
    agent_id = cfg['agent_id']

    # A missing/broken agent is fatal — surface it to the caller.
    try:
        void_agent = client.agents.retrieve(agent_id=agent_id)
    except Exception as e:
        logger.error(f"Failed to load void agent {agent_id}: {e}")
        raise e
    logger.info(f"Successfully loaded void agent for X: {void_agent.name} ({agent_id})")

    # Clean up all user blocks at startup
    logger.info("🧹 Cleaning up user blocks at X startup...")
    periodic_user_block_cleanup(client, agent_id)

    # Ensure correct tools are attached for X; tool config failures are
    # non-fatal — we keep whatever configuration is already in place.
    logger.info("Configuring tools for X platform...")
    try:
        from tool_manager import ensure_platform_tools
        ensure_platform_tools('x', void_agent.id)
    except Exception as e:
        logger.error(f"Failed to configure platform tools: {e}")
        logger.warning("Continuing with existing tool configuration")

    # Log agent details
    logger.info(f"X Void agent details - ID: {void_agent.id}")
    logger.info(f"Agent name: {void_agent.name}")

    return void_agent
2047
def x_main_loop(testing_mode=False, cleanup_interval=10):
    """
    Main X bot loop that continuously monitors for mentions and processes them.
    Similar to bsky.py main() but for X/Twitter.

    Runs forever: each cycle fetches/queues/processes mentions, optionally
    runs user-block cleanup, then sleeps. KeyboardInterrupt exits cleanly;
    any other error backs off for twice the normal delay and continues.

    Args:
        testing_mode: If True, don't actually post to X
        cleanup_interval: Run user block cleanup every N cycles (0 to disable)
    """
    # FIX: the original imported `time` AND `from time import sleep` — a
    # redundant duplicate. One import of the module is enough.
    import time
    from letta_client import Letta

    logger.info("=== STARTING X VOID BOT ===")

    # Initialize void agent
    void_agent = initialize_x_void()
    logger.info(f"X void agent initialized: {void_agent.id}")

    # Initialize X client
    x_client = create_x_client()
    logger.info("Connected to X API")

    # Get Letta client for periodic cleanup
    config = get_x_letta_config()
    letta_client = Letta(token=config['api_key'], timeout=config['timeout'])

    # Check every 2 minutes for X mentions (reduced from 60s to conserve API calls)
    FETCH_DELAY_SEC = 120
    logger.info(f"Starting X mention monitoring, checking every {FETCH_DELAY_SEC} seconds")

    if testing_mode:
        logger.info("=== RUNNING IN X TESTING MODE ===")
        logger.info("  - No messages will be sent to X")
        logger.info("  - Queue files will not be deleted")

    if cleanup_interval > 0:
        logger.info(f"User block cleanup enabled every {cleanup_interval} cycles")
    else:
        logger.info("User block cleanup disabled")

    cycle_count = 0
    start_time = time.time()

    while True:
        try:
            cycle_count += 1
            logger.info(f"=== X CYCLE {cycle_count} ===")

            # Process X notifications (fetch, queue, and process)
            process_x_notifications(void_agent, x_client, testing_mode)

            # Run periodic cleanup every N cycles
            if cleanup_interval > 0 and cycle_count % cleanup_interval == 0:
                logger.debug(f"Running periodic user block cleanup (cycle {cycle_count})")
                periodic_user_block_cleanup(letta_client, void_agent.id)

            # Log cycle completion
            elapsed_time = time.time() - start_time
            logger.info(f"X Cycle {cycle_count} complete. Elapsed: {elapsed_time/60:.1f} minutes")

            time.sleep(FETCH_DELAY_SEC)

        except KeyboardInterrupt:
            elapsed_time = time.time() - start_time
            logger.info("=== X BOT STOPPED BY USER ===")
            logger.info(f"Final X session: {cycle_count} cycles in {elapsed_time/60:.1f} minutes")
            break
        except Exception as e:
            logger.error(f"=== ERROR IN X MAIN LOOP CYCLE {cycle_count} ===")
            logger.error(f"Error details: {e}")
            # Back off harder after an unexpected error before retrying.
            logger.info(f"Sleeping for {FETCH_DELAY_SEC * 2} seconds due to error...")
            time.sleep(FETCH_DELAY_SEC * 2)
2121
def process_queue_only(testing_mode=False):
    """
    Process all queued X mentions without fetching new ones.
    Useful for rate limit management - queue first, then process separately.

    Args:
        testing_mode: If True, don't actually post to X and keep queue files
    """
    logger.info("=== PROCESSING X QUEUE ONLY ===")

    if testing_mode:
        logger.info("=== RUNNING IN X TESTING MODE ===")
        logger.info("  - No messages will be sent to X")
        logger.info("  - Queue files will not be deleted")

    try:
        # Bring up the agent, then the API client, then drain the queue.
        void_agent = initialize_x_void()
        logger.info(f"X void agent initialized: {void_agent.id}")

        x_client = create_x_client()
        logger.info("Connected to X API")

        # Deliberately skip fetch_and_queue_mentions(): this entry point only
        # works through what is already on disk.
        logger.info("Processing existing X queue...")
        load_and_process_queued_x_mentions(void_agent, x_client, testing_mode)

        logger.info("=== X QUEUE PROCESSING COMPLETE ===")

    except Exception as e:
        logger.error(f"Error processing X queue: {e}")
        raise
2155
def x_notification_loop():
    """
    DEPRECATED: Old X notification loop using search-based mention detection.
    Use x_main_loop() instead for the full bot experience.

    Kept only so old call sites keep working; it warns and then delegates to
    x_main_loop() with default settings (live posting, default cleanup).
    """
    logger.warning("x_notification_loop() is deprecated. Use x_main_loop() instead.")
    x_main_loop()
2163
if __name__ == "__main__":
    import sys
    import argparse

    # CLI dispatcher: the first positional argument selects a subcommand.
    # With no arguments at all we fall through to the basic client self-test.
    if len(sys.argv) > 1:
        if sys.argv[1] == "bot":
            # Main bot with optional --test flag and cleanup interval
            parser = argparse.ArgumentParser(description='X Void Bot')
            parser.add_argument('command', choices=['bot'])
            parser.add_argument('--test', action='store_true', help='Run in testing mode (no actual posts)')
            parser.add_argument('--cleanup-interval', type=int, default=10,
                                help='Run user block cleanup every N cycles (default: 10, 0 to disable)')
            args = parser.parse_args()
            x_main_loop(testing_mode=args.test, cleanup_interval=args.cleanup_interval)
        elif sys.argv[1] == "loop":
            # Deprecated search-based loop; delegates to x_main_loop().
            x_notification_loop()
        elif sys.argv[1] == "reply":
            reply_to_cameron_post()
        elif sys.argv[1] == "me":
            get_my_user_info()
        elif sys.argv[1] == "search":
            test_search_mentions()
        elif sys.argv[1] == "queue":
            test_fetch_and_queue()
        elif sys.argv[1] == "thread":
            test_thread_context()
        elif sys.argv[1] == "process":
            # Process all queued mentions with optional --test flag
            # (checked loosely via membership rather than argparse)
            testing_mode = "--test" in sys.argv
            process_queue_only(testing_mode=testing_mode)
        elif sys.argv[1] == "letta":
            # Use specific agent ID if provided, otherwise use from config
            agent_id = sys.argv[2] if len(sys.argv) > 2 else None
            test_letta_integration(agent_id)
        elif sys.argv[1] == "downrank":
            # View or manage downrank list
            if len(sys.argv) > 2 and sys.argv[2] == "list":
                downrank_users = load_downrank_users()
                if downrank_users:
                    print(f"📋 Downrank users ({len(downrank_users)} total):")
                    for user_id in sorted(downrank_users):
                        print(f"  - {user_id}")
                else:
                    print("📋 No downrank users configured")
            else:
                print("Usage: python x.py downrank list")
                print("  list - Show all downranked user IDs")
                print(f"  Edit {X_DOWNRANK_USERS_FILE} to modify the list")
        else:
            # Unknown subcommand: print full usage summary.
            print("Usage: python x.py [bot|loop|reply|me|search|queue|process|thread|letta|downrank]")
            print("  bot      - Run the main X bot (use --test for testing mode)")
            print("             Example: python x.py bot --test")
            print("  queue    - Fetch and queue mentions only (no processing)")
            print("  process  - Process all queued mentions only (no fetching)")
            print("             Example: python x.py process --test")
            print("  downrank - Manage downrank users (10% response rate)")
            print("             Example: python x.py downrank list")
            print("  loop     - Run the old notification monitoring loop (deprecated)")
            print("  reply    - Reply to Cameron's specific post")
            print("  me       - Get authenticated user info and correct user ID")
            print("  search   - Test search-based mention detection")
            print("  thread   - Test thread context retrieval from queued mention")
            print("  letta    - Test sending thread context to Letta agent")
            print("             Optional: python x.py letta <agent-id>")
    else:
        test_x_client()