a digital person for bluesky
at 31ce00b21b108cd98a2a2ddc423ba5e0aa8796d1 2229 lines 96 kB view raw
1import os 2import logging 3import requests 4import yaml 5import json 6import hashlib 7import random 8import time 9from typing import Optional, Dict, Any, List, Set 10from datetime import datetime 11from pathlib import Path 12from requests_oauthlib import OAuth1 13from rich import print as rprint 14from rich.panel import Panel 15from rich.text import Text 16 17import bsky_utils 18 19class XRateLimitError(Exception): 20 """Exception raised when X API rate limit is exceeded""" 21 pass 22 23 24# Configure logging 25logging.basicConfig( 26 level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" 27) 28logger = logging.getLogger("x_client") 29 30# X-specific file paths 31X_QUEUE_DIR = Path("x_queue") 32X_CACHE_DIR = Path("x_cache") 33X_PROCESSED_MENTIONS_FILE = Path("x_queue/processed_mentions.json") 34X_LAST_SEEN_FILE = Path("x_queue/last_seen_id.json") 35X_DOWNRANK_USERS_FILE = Path("x_downrank_users.txt") 36 37class XClient: 38 """X (Twitter) API client for fetching mentions and managing interactions.""" 39 40 def __init__(self, api_key: str, user_id: str, access_token: str = None, 41 consumer_key: str = None, consumer_secret: str = None, 42 access_token_secret: str = None): 43 self.api_key = api_key 44 self.access_token = access_token 45 self.user_id = user_id 46 self.base_url = "https://api.x.com/2" 47 48 # Check if we have OAuth 1.0a credentials 49 if (consumer_key and consumer_secret and access_token and access_token_secret): 50 # Use OAuth 1.0a for User Context 51 self.oauth = OAuth1( 52 consumer_key, 53 client_secret=consumer_secret, 54 resource_owner_key=access_token, 55 resource_owner_secret=access_token_secret 56 ) 57 self.headers = {"Content-Type": "application/json"} 58 self.auth_method = "oauth1a" 59 logger.info("Using OAuth 1.0a User Context authentication for X API") 60 elif access_token: 61 # Use OAuth 2.0 Bearer token for User Context 62 self.oauth = None 63 self.headers = { 64 "Authorization": f"Bearer {access_token}", 65 "Content-Type": "application/json" 66 } 67 self.auth_method = "oauth2_user" 68 logger.info("Using OAuth 2.0 User Context access token for X API") 69 else: 70 # Use Application-Only Bearer token 71 self.oauth = None 72 self.headers = { 73 "Authorization": f"Bearer {api_key}", 74 "Content-Type": "application/json" 75 } 76 self.auth_method = "bearer" 77 logger.info("Using Application-Only Bearer token for X API") 78 79 def _make_request(self, endpoint: str, params: Optional[Dict] = None, method: str = "GET", data: Optional[Dict] = None, max_retries: int = 3) -> Optional[Dict]: 80 """Make a request to the X API with proper error handling and exponential backoff.""" 81 url = f"{self.base_url}{endpoint}" 82 83 for attempt in range(max_retries): 84 try: 85 if method.upper() == "GET": 86 if self.oauth: 87 response = requests.get(url, headers=self.headers, params=params, auth=self.oauth) 88 else: 89 response = requests.get(url, headers=self.headers, params=params) 90 elif method.upper() == "POST": 91 if self.oauth: 92 response = requests.post(url, headers=self.headers, json=data, auth=self.oauth) 93 else: 94 response = requests.post(url, headers=self.headers, json=data) 95 else: 96 raise ValueError(f"Unsupported HTTP method: {method}") 97 98 response.raise_for_status() 99 return response.json() 100 101 except requests.exceptions.HTTPError as e: 102 if response.status_code == 401: 103 logger.error(f"X API authentication failed with {self.auth_method} - check your credentials") 104 logger.error(f"Response: {response.text}") 105 return None # Don't retry auth failures 106 elif response.status_code == 403: 107 logger.error(f"X API forbidden with {self.auth_method} - check app permissions") 108 logger.error(f"Response: {response.text}") 109 return None # Don't retry permission failures 110 elif response.status_code == 429: 111 if attempt < max_retries - 1: 112 # Exponential backoff: 60s, 120s, 240s 113 backoff_time = 60 * (2 ** attempt) 114 logger.warning(f"X API rate limit exceeded (attempt {attempt + 1}/{max_retries}) - waiting {backoff_time}s before retry") 115 logger.error(f"Response: {response.text}") 116 time.sleep(backoff_time) 117 continue 118 else: 119 logger.error("X API rate limit exceeded - max retries reached") 120 logger.error(f"Response: {response.text}") 121 raise XRateLimitError("X API rate limit exceeded") 122 else: 123 if attempt < max_retries - 1: 124 # Exponential backoff for other HTTP errors too 125 backoff_time = 30 * (2 ** attempt) 126 logger.warning(f"X API request failed (attempt {attempt + 1}/{max_retries}): {e} - retrying in {backoff_time}s") 127 logger.error(f"Response: {response.text}") 128 time.sleep(backoff_time) 129 continue 130 else: 131 logger.error(f"X API request failed after {max_retries} attempts: {e}") 132 logger.error(f"Response: {response.text}") 133 return None 134 135 except Exception as e: 136 if attempt < max_retries - 1: 137 backoff_time = 15 * (2 ** attempt) 138 logger.warning(f"Unexpected error making X API request (attempt {attempt + 1}/{max_retries}): {e} - retrying in {backoff_time}s") 139 time.sleep(backoff_time) 140 continue 141 else: 142 logger.error(f"Unexpected error making X API request after {max_retries} attempts: {e}") 143 return None 144 145 return None 146 147 def get_mentions(self, since_id: Optional[str] = None, max_results: int = 10) -> Optional[List[Dict]]: 148 """ 149 Fetch mentions for the configured user. 150 151 Args: 152 since_id: Minimum Post ID to include (for getting newer mentions) 153 max_results: Number of results to return (5-100) 154 155 Returns: 156 List of mention objects or None if request failed 157 """ 158 endpoint = f"/users/{self.user_id}/mentions" 159 params = { 160 "max_results": min(max(max_results, 5), 100), # Ensure within API limits 161 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets", 162 "user.fields": "id,name,username", 163 "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id" 164 } 165 166 if since_id: 167 params["since_id"] = since_id 168 169 logger.info(f"Fetching mentions for user {self.user_id}") 170 response = self._make_request(endpoint, params) 171 172 if response: 173 logger.debug(f"X API response: {response}") 174 175 if response and "data" in response: 176 mentions = response["data"] 177 logger.info(f"Retrieved {len(mentions)} mentions") 178 return mentions 179 else: 180 if response: 181 logger.info(f"No mentions in response. Full response: {response}") 182 else: 183 logger.warning("Request failed - no response received") 184 return [] 185 186 def get_user_info(self, user_id: str) -> Optional[Dict]: 187 """Get information about a specific user.""" 188 endpoint = f"/users/{user_id}" 189 params = { 190 "user.fields": "id,name,username,description,public_metrics" 191 } 192 193 response = self._make_request(endpoint, params) 194 return response.get("data") if response else None 195 196 def search_mentions(self, username: str, max_results: int = 10, since_id: str = None) -> Optional[List[Dict]]: 197 """ 198 Search for mentions using the search endpoint instead of mentions endpoint. 199 This might have better rate limits than the direct mentions endpoint. 200 201 Args: 202 username: Username to search for mentions of (without @) 203 max_results: Number of results to return (10-100) 204 since_id: Only return results newer than this tweet ID 205 206 Returns: 207 List of tweets mentioning the username 208 """ 209 endpoint = "/tweets/search/recent" 210 211 # Search for mentions of the username 212 query = f"@{username}" 213 214 params = { 215 "query": query, 216 "max_results": min(max(max_results, 10), 100), 217 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id", 218 "user.fields": "id,name,username", 219 "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id" 220 } 221 222 if since_id: 223 params["since_id"] = since_id 224 225 logger.info(f"Searching for mentions of @{username}") 226 response = self._make_request(endpoint, params) 227 228 if response and "data" in response: 229 tweets = response["data"] 230 logger.info(f"Found {len(tweets)} mentions via search") 231 return tweets 232 else: 233 if response: 234 logger.info(f"No mentions found via search. Response: {response}") 235 else: 236 logger.warning("Search request failed") 237 return [] 238 239 def get_thread_context(self, conversation_id: str, use_cache: bool = True, until_id: Optional[str] = None) -> Optional[List[Dict]]: 240 """ 241 Get all tweets in a conversation thread up to a specific tweet ID. 242 243 Args: 244 conversation_id: The conversation ID to fetch (should be the original tweet ID) 245 use_cache: Whether to use cached data if available 246 until_id: Optional tweet ID to use as upper bound (excludes posts after this ID) 247 248 Returns: 249 List of tweets in the conversation, ordered chronologically 250 """ 251 # Check cache first if enabled 252 if use_cache: 253 cached_data = get_cached_thread_context(conversation_id) 254 if cached_data: 255 return cached_data 256 257 # First, get the original tweet directly since it might not appear in conversation search 258 original_tweet = None 259 try: 260 endpoint = f"/tweets/{conversation_id}" 261 params = { 262 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id", 263 "user.fields": "id,name,username", 264 "expansions": "author_id" 265 } 266 response = self._make_request(endpoint, params) 267 if response and "data" in response: 268 original_tweet = response["data"] 269 logger.info(f"Retrieved original tweet: {original_tweet.get('id')}") 270 except Exception as e: 271 logger.warning(f"Could not fetch original tweet {conversation_id}: {e}") 272 273 # Then search for all tweets in this conversation 274 endpoint = "/tweets/search/recent" 275 params = { 276 "query": f"conversation_id:{conversation_id}", 277 "max_results": 100, # Get as many as possible 278 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id", 279 "user.fields": "id,name,username", 280 "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id", 281 "sort_order": "recency" # Get newest first, we'll reverse later 282 } 283 284 # Add until_id parameter to exclude tweets after the mention being processed 285 if until_id: 286 params["until_id"] = until_id 287 logger.info(f"Using until_id={until_id} to exclude future tweets") 288 289 logger.info(f"Fetching thread context for conversation {conversation_id}") 290 response = self._make_request(endpoint, params) 291 292 tweets = [] 293 users_data = {} 294 295 # Collect tweets from search 296 if response and "data" in response: 297 tweets.extend(response["data"]) 298 # Store user data for reference 299 if "includes" in response and "users" in response["includes"]: 300 for user in response["includes"]["users"]: 301 users_data[user["id"]] = user 302 303 # Add original tweet if we got it and it's not already in the list 304 if original_tweet: 305 tweet_ids = [t.get('id') for t in tweets] 306 if original_tweet.get('id') not in tweet_ids: 307 tweets.append(original_tweet) 308 logger.info("Added original tweet to thread context") 309 310 # Attempt to fill gaps by fetching referenced tweets that are missing 311 # This helps with X API's incomplete conversation search results 312 tweet_ids = set(t.get('id') for t in tweets) 313 missing_tweet_ids = set() 314 critical_missing_ids = set() 315 316 # Collect referenced tweet IDs, prioritizing critical ones 317 for tweet in tweets: 318 referenced_tweets = tweet.get('referenced_tweets', []) 319 for ref in referenced_tweets: 320 ref_id = ref.get('id') 321 ref_type = ref.get('type') 322 if ref_id and ref_id not in tweet_ids: 323 missing_tweet_ids.add(ref_id) 324 # Prioritize direct replies and quoted tweets over retweets 325 if ref_type in ['replied_to', 'quoted']: 326 critical_missing_ids.add(ref_id) 327 328 # For rate limit efficiency, only fetch critical missing tweets if we have many 329 if len(missing_tweet_ids) > 10: 330 logger.info(f"Many missing tweets ({len(missing_tweet_ids)}), prioritizing {len(critical_missing_ids)} critical ones") 331 missing_tweet_ids = critical_missing_ids 332 333 # Context sufficiency check - skip backfill if we already have enough context 334 if has_sufficient_context(tweets, missing_tweet_ids): 335 logger.info("Thread has sufficient context, skipping missing tweet backfill") 336 missing_tweet_ids = set() 337 338 # Fetch missing referenced tweets in batches (more rate-limit friendly) 339 if missing_tweet_ids: 340 missing_list = list(missing_tweet_ids) 341 342 # First, check cache for missing tweets 343 cached_tweets = get_cached_tweets(missing_list) 344 for tweet_id, cached_tweet in cached_tweets.items(): 345 if cached_tweet.get('conversation_id') == conversation_id: 346 tweets.append(cached_tweet) 347 tweet_ids.add(tweet_id) 348 logger.info(f"Retrieved missing tweet from cache: {tweet_id}") 349 350 # Add user data if available in cache 351 if cached_tweet.get('author_info'): 352 author_id = cached_tweet.get('author_id') 353 if author_id: 354 users_data[author_id] = cached_tweet['author_info'] 355 356 # Only fetch tweets that weren't found in cache 357 uncached_ids = [tid for tid in missing_list if tid not in cached_tweets] 358 359 if uncached_ids: 360 batch_size = 100 # X API limit for bulk tweet lookup 361 362 for i in range(0, len(uncached_ids), batch_size): 363 batch_ids = uncached_ids[i:i + batch_size] 364 try: 365 endpoint = "/tweets" 366 params = { 367 "ids": ",".join(batch_ids), 368 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id", 369 "user.fields": "id,name,username", 370 "expansions": "author_id" 371 } 372 response = self._make_request(endpoint, params) 373 374 if response and "data" in response: 375 fetched_tweets = [] 376 batch_users_data = {} 377 378 for missing_tweet in response["data"]: 379 # Only add if it's actually part of this conversation 380 if missing_tweet.get('conversation_id') == conversation_id: 381 tweets.append(missing_tweet) 382 tweet_ids.add(missing_tweet.get('id')) 383 fetched_tweets.append(missing_tweet) 384 logger.info(f"Retrieved missing referenced tweet: {missing_tweet.get('id')}") 385 386 # Add user data if available 387 if "includes" in response and "users" in response["includes"]: 388 for user in response["includes"]["users"]: 389 users_data[user["id"]] = user 390 batch_users_data[user["id"]] = user 391 392 # Cache the newly fetched tweets 393 if fetched_tweets: 394 save_cached_tweets(fetched_tweets, batch_users_data) 395 396 logger.info(f"Batch fetched {len(response['data'])} missing tweets from {len(batch_ids)} requested") 397 398 # Handle partial success - log any missing tweets that weren't found 399 if response and "errors" in response: 400 for error in response["errors"]: 401 logger.warning(f"Could not fetch tweet {error.get('resource_id')}: {error.get('title')}") 402 403 except Exception as e: 404 logger.warning(f"Could not fetch batch of missing tweets {batch_ids[:3]}...: {e}") 405 else: 406 logger.info(f"All {len(missing_list)} missing tweets found in cache") 407 408 if tweets: 409 # Filter out tweets that occur after until_id (if specified) 410 if until_id: 411 original_count = len(tweets) 412 # Convert until_id to int for comparison (Twitter IDs are sequential) 413 until_id_int = int(until_id) 414 tweets = [t for t in tweets if int(t.get('id', '0')) <= until_id_int] 415 filtered_count = len(tweets) 416 if original_count != filtered_count: 417 logger.info(f"Filtered out {original_count - filtered_count} tweets after until_id {until_id}") 418 419 # Sort chronologically (oldest first) 420 tweets.sort(key=lambda x: x.get('created_at', '')) 421 logger.info(f"Retrieved {len(tweets)} tweets in thread") 422 423 thread_data = {"tweets": tweets, "users": users_data} 424 425 # Cache individual tweets from the thread for future backfill 426 save_cached_tweets(tweets, users_data) 427 428 # Cache the result 429 if use_cache: 430 save_cached_thread_context(conversation_id, thread_data) 431 432 return thread_data 433 else: 434 logger.warning("No tweets found for thread context") 435 return None 436 437 def post_reply(self, reply_text: str, in_reply_to_tweet_id: str) -> Optional[Dict]: 438 """ 439 Post a reply to a specific tweet. 440 441 Args: 442 reply_text: The text content of the reply 443 in_reply_to_tweet_id: The ID of the tweet to reply to 444 445 Returns: 446 Response data if successful, None if failed 447 """ 448 endpoint = "/tweets" 449 450 payload = { 451 "text": reply_text, 452 "reply": { 453 "in_reply_to_tweet_id": in_reply_to_tweet_id 454 } 455 } 456 457 logger.info(f"Attempting to post reply with {self.auth_method} authentication") 458 result = self._make_request(endpoint, method="POST", data=payload) 459 460 if result: 461 logger.info(f"Successfully posted reply to tweet {in_reply_to_tweet_id}") 462 return result 463 else: 464 logger.error("Failed to post reply") 465 return None 466 467 def post_tweet(self, tweet_text: str) -> Optional[Dict]: 468 """ 469 Post a standalone tweet (not a reply). 470 471 Args: 472 tweet_text: The text content of the tweet (max 280 characters) 473 474 Returns: 475 Response data if successful, None if failed 476 """ 477 endpoint = "/tweets" 478 479 # Validate tweet length 480 if len(tweet_text) > 280: 481 logger.error(f"Tweet text too long: {len(tweet_text)} characters (max 280)") 482 return None 483 484 payload = { 485 "text": tweet_text 486 } 487 488 logger.info(f"Attempting to post tweet with {self.auth_method} authentication") 489 result = self._make_request(endpoint, method="POST", data=payload) 490 491 if result: 492 logger.info(f"Successfully posted tweet") 493 return result 494 else: 495 logger.error("Failed to post tweet") 496 return None 497 498def load_x_config(config_path: str = "x_config.yaml") -> Dict[str, Any]: 499 """Load complete X configuration from x_config.yaml.""" 500 try: 501 with open(config_path, 'r') as f: 502 config = yaml.safe_load(f) 503 504 if not config: 505 raise ValueError(f"Empty or invalid configuration file: {config_path}") 506 507 # Validate required sections 508 x_config = config.get('x', {}) 509 letta_config = config.get('letta', {}) 510 511 if not x_config.get('api_key') or not x_config.get('user_id'): 512 raise ValueError("X API key and user_id must be configured in x_config.yaml") 513 514 if not letta_config.get('api_key') or not letta_config.get('agent_id'): 515 raise ValueError("Letta API key and agent_id must be configured in x_config.yaml") 516 517 return config 518 except Exception as e: 519 logger.error(f"Failed to load X configuration: {e}") 520 raise 521 522def get_x_letta_config(config_path: str = "x_config.yaml") -> Dict[str, Any]: 523 """Get Letta configuration from X config file.""" 524 config = load_x_config(config_path) 525 return config['letta'] 526 527def create_x_client(config_path: str = "x_config.yaml") -> XClient: 528 """Create and return an X client with configuration loaded from file.""" 529 config = load_x_config(config_path) 530 x_config = config['x'] 531 return XClient( 532 api_key=x_config['api_key'], 533 user_id=x_config['user_id'], 534 access_token=x_config.get('access_token'), 535 consumer_key=x_config.get('consumer_key'), 536 consumer_secret=x_config.get('consumer_secret'), 537 access_token_secret=x_config.get('access_token_secret') 538 ) 539 540def mention_to_yaml_string(mention: Dict, users_data: Optional[Dict] = None) -> str: 541 """ 542 Convert a mention object to a YAML string for better AI comprehension. 543 Similar to thread_to_yaml_string in bsky_utils.py 544 """ 545 # Extract relevant fields 546 simplified_mention = { 547 'id': mention.get('id'), 548 'text': mention.get('text'), 549 'author_id': mention.get('author_id'), 550 'created_at': mention.get('created_at'), 551 'in_reply_to_user_id': mention.get('in_reply_to_user_id') 552 } 553 554 # Add user information if available 555 if users_data and mention.get('author_id') in users_data: 556 user = users_data[mention.get('author_id')] 557 simplified_mention['author'] = { 558 'username': user.get('username'), 559 'name': user.get('name') 560 } 561 562 return yaml.dump(simplified_mention, default_flow_style=False, sort_keys=False) 563 564def thread_to_yaml_string(thread_data: Dict) -> str: 565 """ 566 Convert X thread context to YAML string for AI comprehension. 567 Similar to Bluesky's thread_to_yaml_string function. 568 569 Args: 570 thread_data: Dict with 'tweets' and 'users' keys from get_thread_context() 571 572 Returns: 573 YAML string representation of the thread 574 """ 575 if not thread_data or "tweets" not in thread_data: 576 return "conversation: []\n" 577 578 tweets = thread_data["tweets"] 579 users_data = thread_data.get("users", {}) 580 581 simplified_thread = { 582 "conversation": [] 583 } 584 585 for tweet in tweets: 586 # Get user info 587 author_id = tweet.get('author_id') 588 author_info = {} 589 if author_id and author_id in users_data: 590 user = users_data[author_id] 591 author_info = { 592 'username': user.get('username'), 593 'name': user.get('name') 594 } 595 596 # Build tweet object (simplified for AI consumption) 597 tweet_obj = { 598 'text': tweet.get('text'), 599 'created_at': tweet.get('created_at'), 600 'author': author_info, 601 'author_id': author_id # Include user ID for block management 602 } 603 604 simplified_thread["conversation"].append(tweet_obj) 605 606 return yaml.dump(simplified_thread, default_flow_style=False, sort_keys=False) 607 608 609def ensure_x_user_blocks_attached(thread_data: Dict, agent_id: Optional[str] = None) -> None: 610 """ 611 Ensure all users in the thread have their X user blocks attached. 612 Creates blocks with initial content including their handle if they don't exist. 613 614 Args: 615 thread_data: Dict with 'tweets' and 'users' keys from get_thread_context() 616 agent_id: The Letta agent ID to attach blocks to (defaults to config agent_id) 617 """ 618 if not thread_data or "users" not in thread_data: 619 return 620 621 try: 622 from tools.blocks import attach_x_user_blocks, x_user_note_set 623 from letta_client import Letta 624 625 # Get Letta client and agent_id from X config 626 config = get_x_letta_config() 627 client = Letta(token=config['api_key'], timeout=config['timeout']) 628 629 # Use provided agent_id or get from config 630 if agent_id is None: 631 agent_id = config['agent_id'] 632 633 # Get agent info to create a mock agent_state for the functions 634 class MockAgentState: 635 def __init__(self, agent_id): 636 self.id = agent_id 637 638 agent_state = MockAgentState(agent_id) 639 640 users_data = thread_data["users"] 641 user_ids = list(users_data.keys()) 642 643 if not user_ids: 644 return 645 646 logger.info(f"Ensuring X user blocks for {len(user_ids)} users: {user_ids}") 647 648 # Get current blocks to check which users already have blocks with content 649 current_blocks = client.agents.blocks.list(agent_id=agent_id) 650 existing_user_blocks = {} 651 652 for block in current_blocks: 653 if block.label.startswith("x_user_"): 654 user_id = block.label.replace("x_user_", "") 655 existing_user_blocks[user_id] = block 656 657 # Attach all user blocks (this will create missing ones with basic content) 658 attach_result = attach_x_user_blocks(user_ids, agent_state) 659 logger.info(f"X user block attachment result: {attach_result}") 660 661 # For newly created blocks, update with user handle information 662 for user_id in user_ids: 663 if user_id not in existing_user_blocks: 664 user_info = users_data[user_id] 665 username = user_info.get('username', 'unknown') 666 name = user_info.get('name', 'Unknown') 667 668 # Set initial content with handle information 669 initial_content = f"# X User: {user_id}\n\n**Handle:** @{username}\n**Name:** {name}\n\nNo additional information about this user yet." 670 671 try: 672 x_user_note_set(user_id, initial_content, agent_state) 673 logger.info(f"Set initial content for X user {user_id} (@{username})") 674 except Exception as e: 675 logger.error(f"Failed to set initial content for X user {user_id}: {e}") 676 677 except Exception as e: 678 logger.error(f"Error ensuring X user blocks: {e}") 679 680 681# X Caching and Queue System Functions 682 683def load_last_seen_id() -> Optional[str]: 684 """Load the last seen mention ID for incremental fetching.""" 685 if X_LAST_SEEN_FILE.exists(): 686 try: 687 with open(X_LAST_SEEN_FILE, 'r') as f: 688 data = json.load(f) 689 return data.get('last_seen_id') 690 except Exception as e: 691 logger.error(f"Error loading last seen ID: {e}") 692 return None 693 694def save_last_seen_id(mention_id: str): 695 """Save the last seen mention ID.""" 696 try: 697 X_QUEUE_DIR.mkdir(exist_ok=True) 698 with open(X_LAST_SEEN_FILE, 'w') as f: 699 json.dump({ 700 'last_seen_id': mention_id, 701 'updated_at': datetime.now().isoformat() 702 }, f) 703 logger.debug(f"Saved last seen ID: {mention_id}") 704 except Exception as e: 705 logger.error(f"Error saving last seen ID: {e}") 706 707def load_processed_mentions() -> set: 708 """Load the set of processed mention IDs.""" 709 if X_PROCESSED_MENTIONS_FILE.exists(): 710 try: 711 with open(X_PROCESSED_MENTIONS_FILE, 'r') as f: 712 data = json.load(f) 713 # Keep only recent entries (last 10000) 714 if len(data) > 10000: 715 data = data[-10000:] 716 save_processed_mentions(set(data)) 717 return set(data) 718 except Exception as e: 719 logger.error(f"Error loading processed mentions: {e}") 720 return set() 721 722def save_processed_mentions(processed_set: set): 723 """Save the set of processed mention IDs.""" 724 try: 725 X_QUEUE_DIR.mkdir(exist_ok=True) 726 with open(X_PROCESSED_MENTIONS_FILE, 'w') as f: 727 json.dump(list(processed_set), f) 728 except Exception as e: 729 logger.error(f"Error saving processed mentions: {e}") 730 731def load_downrank_users() -> Set[str]: 732 """Load the set of user IDs that should be downranked (responded to 10% of the time).""" 733 try: 734 if not X_DOWNRANK_USERS_FILE.exists(): 735 return set() 736 737 downrank_users = set() 738 with open(X_DOWNRANK_USERS_FILE, 'r') as f: 739 for line in f: 740 line = line.strip() 741 # Skip empty lines and comments 742 if line and not line.startswith('#'): 743 downrank_users.add(line) 744 745 logger.info(f"Loaded {len(downrank_users)} downrank users") 746 return downrank_users 747 except Exception as e: 748 logger.error(f"Error loading downrank users: {e}") 749 return set() 750 751def should_respond_to_downranked_user(user_id: str, downrank_users: Set[str]) -> bool: 752 """ 753 Check if we should respond to a downranked user. 754 Returns True 10% of the time for downranked users, True 100% of the time for others. 755 """ 756 if user_id not in downrank_users: 757 return True 758 759 # 10% chance for downranked users 760 should_respond = random.random() < 0.1 761 logger.info(f"Downranked user {user_id}: {'responding' if should_respond else 'skipping'} (10% chance)") 762 return should_respond 763 764def save_mention_to_queue(mention: Dict): 765 """Save a mention to the queue directory for async processing.""" 766 try: 767 mention_id = mention.get('id') 768 if not mention_id: 769 logger.error("Mention missing ID, cannot queue") 770 return 771 772 # Check if already processed 773 processed_mentions = load_processed_mentions() 774 if mention_id in processed_mentions: 775 logger.debug(f"Mention {mention_id} already processed, skipping") 776 return 777 778 # Create queue directory 779 X_QUEUE_DIR.mkdir(exist_ok=True) 780 781 # Create filename using hash (similar to Bluesky system) 782 mention_str = json.dumps(mention, sort_keys=True) 783 mention_hash = hashlib.sha256(mention_str.encode()).hexdigest()[:16] 784 filename = f"x_mention_{mention_hash}.json" 785 786 queue_file = X_QUEUE_DIR / filename 787 788 # Save mention data with enhanced debugging information 789 mention_data = { 790 'mention': mention, 791 'queued_at': datetime.now().isoformat(), 792 'type': 'x_mention', 793 # Debug info for conversation tracking 794 'debug_info': { 795 'mention_id': mention.get('id'), 796 'author_id': mention.get('author_id'), 797 'conversation_id': mention.get('conversation_id'), 798 'in_reply_to_user_id': mention.get('in_reply_to_user_id'), 799 'referenced_tweets': mention.get('referenced_tweets', []), 800 'text_preview': mention.get('text', '')[:200], 801 'created_at': mention.get('created_at'), 802 'public_metrics': mention.get('public_metrics', {}), 803 'context_annotations': mention.get('context_annotations', []) 804 } 805 } 806 807 with open(queue_file, 'w') as f: 808 json.dump(mention_data, f, indent=2) 809 810 logger.info(f"Queued X mention {mention_id} -> {filename}") 811 812 except Exception as e: 813 logger.error(f"Error saving mention to queue: {e}") 814 815# X Cache Functions 816def get_cached_thread_context(conversation_id: str) -> Optional[Dict]: 817 """Load cached thread context if available.""" 818 cache_file = X_CACHE_DIR / f"thread_{conversation_id}.json" 819 if cache_file.exists(): 820 try: 821 with open(cache_file, 'r') as f: 822 cached_data = json.load(f) 823 # Check if cache is recent (within 1 hour) 824 from datetime import datetime, timedelta 825 cached_time = datetime.fromisoformat(cached_data.get('cached_at', '')) 826 if datetime.now() - cached_time < timedelta(hours=1): 827 logger.info(f"Using cached thread context for {conversation_id}") 828 return cached_data.get('thread_data') 829 except Exception as e: 830 logger.warning(f"Error loading cached thread context: {e}") 831 return None 832 833def save_cached_thread_context(conversation_id: str, thread_data: Dict): 834 """Save thread context to cache.""" 835 try: 836 X_CACHE_DIR.mkdir(exist_ok=True) 837 cache_file = X_CACHE_DIR / f"thread_{conversation_id}.json" 838 839 cache_data = { 840 'conversation_id': conversation_id, 841 'thread_data': thread_data, 842 'cached_at': datetime.now().isoformat() 843 } 844 845 with open(cache_file, 'w') as f: 846 json.dump(cache_data, f, indent=2) 847 848 logger.debug(f"Cached thread context for {conversation_id}") 849 except Exception as e: 850 logger.error(f"Error caching thread context: {e}") 851 852def get_cached_tweets(tweet_ids: List[str]) -> Dict[str, Dict]: 853 """ 854 Load cached individual tweets if available. 855 Returns dict mapping tweet_id -> tweet_data for found tweets. 856 """ 857 cached_tweets = {} 858 859 for tweet_id in tweet_ids: 860 cache_file = X_CACHE_DIR / f"tweet_{tweet_id}.json" 861 if cache_file.exists(): 862 try: 863 with open(cache_file, 'r') as f: 864 cached_data = json.load(f) 865 866 # Use longer cache times for older tweets (24 hours vs 1 hour) 867 from datetime import datetime, timedelta 868 cached_time = datetime.fromisoformat(cached_data.get('cached_at', '')) 869 tweet_created = cached_data.get('tweet_data', {}).get('created_at', '') 870 871 # Parse tweet creation time to determine age 872 try: 873 from dateutil.parser import parse 874 tweet_age = datetime.now() - parse(tweet_created) 875 cache_duration = timedelta(hours=24) if tweet_age > timedelta(hours=24) else timedelta(hours=1) 876 except: 877 cache_duration = timedelta(hours=1) # Default to 1 hour if parsing fails 878 879 if datetime.now() - cached_time < cache_duration: 880 cached_tweets[tweet_id] = cached_data.get('tweet_data') 881 logger.debug(f"Using cached tweet {tweet_id}") 882 883 except Exception as e: 884 logger.warning(f"Error loading cached tweet {tweet_id}: {e}") 885 886 return cached_tweets 887 888def save_cached_tweets(tweets_data: List[Dict], users_data: Dict[str, Dict] = None): 889 """Save individual tweets to cache for future reuse.""" 890 try: 891 X_CACHE_DIR.mkdir(exist_ok=True) 892 893 for tweet in tweets_data: 894 tweet_id = tweet.get('id') 895 if not tweet_id: 896 continue 897 898 cache_file = X_CACHE_DIR / f"tweet_{tweet_id}.json" 899 900 # Include user data if available 901 tweet_with_user = tweet.copy() 902 if users_data and tweet.get('author_id') in users_data: 903 tweet_with_user['author_info'] = users_data[tweet.get('author_id')] 904 905 cache_data = { 906 'tweet_id': tweet_id, 907 'tweet_data': tweet_with_user, 908 'cached_at': datetime.now().isoformat() 909 } 910 911 with open(cache_file, 'w') as f: 912 json.dump(cache_data, f, indent=2) 913 914 logger.debug(f"Cached individual tweet {tweet_id}") 915 916 except Exception as e: 917 logger.error(f"Error caching individual tweets: {e}") 918 919def has_sufficient_context(tweets: List[Dict], missing_tweet_ids: Set[str]) -> bool: 920 """ 921 Determine if we have sufficient context to skip backfilling missing tweets. 922 923 Args: 924 tweets: List of tweets already in the thread 925 missing_tweet_ids: Set of missing tweet IDs we'd like to fetch 926 927 Returns: 928 True if context is sufficient, False if backfill is needed 929 """ 930 # If no missing tweets, context is sufficient 931 if not missing_tweet_ids: 932 return True 933 934 # If we have a substantial conversation (5+ tweets), likely sufficient 935 if len(tweets) >= 5: 936 logger.debug(f"Thread has {len(tweets)} tweets, considering sufficient") 937 return True 938 939 # If only a few missing tweets and we have some context, might be enough 940 if len(missing_tweet_ids) <= 2 and len(tweets) >= 3: 941 logger.debug(f"Only {len(missing_tweet_ids)} missing tweets with {len(tweets)} existing, considering sufficient") 942 return True 943 944 # Check if we have conversational flow (mentions between users) 945 has_conversation_flow = False 946 for tweet in tweets: 947 text = tweet.get('text', '').lower() 948 # Look for mentions, replies, or conversational indicators 949 if '@' in text or 'reply' in text or len([t for t in tweets if t.get('author_id') != tweet.get('author_id')]) > 1: 950 has_conversation_flow = True 951 break 952 953 # If we have clear conversational flow and reasonable length, sufficient 954 if has_conversation_flow and len(tweets) >= 2: 955 logger.debug("Thread has conversational flow, considering sufficient") 956 return True 957 958 # Otherwise, we need to backfill 959 logger.debug(f"Context insufficient: {len(tweets)} tweets, {len(missing_tweet_ids)} missing, no clear flow") 960 return False 961 962def fetch_and_queue_mentions(username: str) -> int: 963 """ 964 Single-pass function to fetch new mentions and queue them. 965 Returns number of new mentions found. 966 """ 967 try: 968 client = create_x_client() 969 970 # Load last seen ID for incremental fetching 971 last_seen_id = load_last_seen_id() 972 973 logger.info(f"Fetching mentions for @{username} since {last_seen_id or 'beginning'}") 974 975 # Search for mentions 976 mentions = client.search_mentions( 977 username=username, 978 since_id=last_seen_id, 979 max_results=100 # Get as many as possible 980 ) 981 982 if not mentions: 983 logger.info("No new mentions found") 984 return 0 985 986 # Process mentions (newest first, so reverse to process oldest first) 987 mentions.reverse() 988 new_count = 0 989 990 for mention in mentions: 991 save_mention_to_queue(mention) 992 new_count += 1 993 994 # Update last seen ID to the most recent mention 995 if mentions: 996 most_recent_id = mentions[-1]['id'] # Last after reverse = most recent 997 save_last_seen_id(most_recent_id) 998 999 logger.info(f"Queued {new_count} new X mentions") 1000 return new_count 1001 1002 except Exception as e: 1003 logger.error(f"Error fetching and queuing mentions: {e}") 1004 return 0 1005 1006# Simple test function 1007def get_my_user_info(): 1008 """Get the authenticated user's information to find correct user ID.""" 1009 try: 1010 client = create_x_client() 1011 1012 # Use the /2/users/me endpoint to get authenticated user info 1013 endpoint = "/users/me" 1014 params = { 1015 "user.fields": "id,name,username,description" 1016 } 1017 1018 print("Fetching authenticated user information...") 1019 response = client._make_request(endpoint, params=params) 1020 1021 if response and "data" in response: 1022 user_data = response["data"] 1023 print(f"✅ Found authenticated user:") 1024 print(f" ID: {user_data.get('id')}") 1025 print(f" Username: @{user_data.get('username')}") 1026 print(f" Name: {user_data.get('name')}") 1027 print(f" Description: {user_data.get('description', 'N/A')[:100]}...") 1028 print(f"\n🔧 Update your x_config.yaml with:") 1029 print(f" user_id: \"{user_data.get('id')}\"") 1030 return user_data 1031 else: 1032 print("❌ Failed to get user information") 1033 print(f"Response: {response}") 1034 return None 1035 1036 except Exception as e: 1037 print(f"Error getting user info: {e}") 1038 return None 1039 1040def test_search_mentions(): 1041 """Test the search-based mention detection.""" 1042 try: 1043 client = create_x_client() 1044 1045 # First get our username 1046 user_info = client._make_request("/users/me", params={"user.fields": "username"}) 1047 if not user_info or "data" not in user_info: 1048 print("❌ Could not get username") 1049 return 1050 1051 username = user_info["data"]["username"] 1052 print(f"🔍 Searching for mentions of @{username}") 1053 1054 mentions = client.search_mentions(username, max_results=5) 1055 1056 if mentions: 1057 print(f"✅ Found {len(mentions)} mentions via search:") 1058 for mention in mentions: 1059 print(f"- {mention.get('id')}: {mention.get('text', '')[:100]}...") 1060 else: 1061 print("No mentions found via search") 1062 1063 except Exception as e: 1064 print(f"Search test failed: {e}") 1065 1066def test_fetch_and_queue(): 1067 """Test the single-pass fetch and queue function.""" 1068 try: 1069 client = create_x_client() 1070 1071 # Get our username 1072 user_info = client._make_request("/users/me", params={"user.fields": "username"}) 1073 if not user_info or "data" not in user_info: 1074 print("❌ Could not get username") 1075 return 1076 1077 username = user_info["data"]["username"] 1078 print(f"🔄 Fetching and queueing mentions for @{username}") 1079 1080 # Show current state 1081 last_seen = load_last_seen_id() 1082 print(f"📍 Last seen ID: {last_seen or 'None (first run)'}") 1083 1084 # Fetch and queue 1085 new_count = fetch_and_queue_mentions(username) 1086 1087 if new_count > 0: 1088 print(f"✅ Queued {new_count} new mentions") 1089 print(f"📁 Check ./x_queue/ directory for queued mentions") 1090 1091 # Show updated state 1092 new_last_seen = load_last_seen_id() 1093 print(f"📍 Updated last seen ID: {new_last_seen}") 1094 else: 1095 print("ℹ️ No new mentions to queue") 1096 1097 except Exception as e: 1098 print(f"Fetch and queue test failed: {e}") 1099 1100def test_thread_context(): 1101 """Test thread context retrieval from a queued mention.""" 1102 try: 1103 import json 1104 1105 # Find a queued mention file 1106 queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json")) 1107 if not queue_files: 1108 print("❌ No queued mentions found. Run 'python x.py queue' first.") 1109 return 1110 1111 # Read the first mention 1112 mention_file = queue_files[0] 1113 with open(mention_file, 'r') as f: 1114 mention_data = json.load(f) 1115 1116 mention = mention_data['mention'] 1117 print(f"📄 Using mention: {mention.get('id')}") 1118 print(f"📝 Text: {mention.get('text')}") 1119 1120 # Check if it has a conversation_id 1121 conversation_id = mention.get('conversation_id') 1122 if not conversation_id: 1123 print("❌ No conversation_id found in mention. May need to re-queue with updated fetch.") 1124 return 1125 1126 print(f"🧵 Getting thread context for conversation: {conversation_id}") 1127 1128 # Get thread context 1129 client = create_x_client() 1130 thread_data = client.get_thread_context(conversation_id) 1131 1132 if thread_data: 1133 tweets = thread_data.get('tweets', []) 1134 print(f"✅ Retrieved thread with {len(tweets)} tweets") 1135 1136 # Convert to YAML 1137 yaml_thread = thread_to_yaml_string(thread_data) 1138 1139 # Save thread context for inspection 1140 thread_file = X_QUEUE_DIR / f"thread_context_{conversation_id}.yaml" 1141 with open(thread_file, 'w') as f: 1142 f.write(yaml_thread) 1143 1144 print(f"💾 Saved thread context to: {thread_file}") 1145 print("\n📋 Thread preview:") 1146 print(yaml_thread) 1147 else: 1148 print("❌ Failed to retrieve thread context") 1149 1150 except Exception as e: 1151 print(f"Thread context test failed: {e}") 1152 1153def test_letta_integration(agent_id: str = None): 1154 """Test sending X thread context to Letta agent.""" 1155 try: 1156 from letta_client import Letta 1157 import json 1158 import yaml 1159 1160 # Load X config to access letta section 1161 try: 1162 x_config = load_x_config() 1163 letta_config = x_config.get('letta', {}) 1164 api_key = letta_config.get('api_key') 1165 config_agent_id = letta_config.get('agent_id') 1166 1167 # Use agent_id from config if not provided as parameter 1168 if not agent_id: 1169 if config_agent_id: 1170 agent_id = config_agent_id 1171 print(f"ℹ️ Using agent_id from config: {agent_id}") 1172 else: 1173 print("❌ No agent_id found in x_config.yaml") 1174 print("Expected config structure:") 1175 print(" letta:") 1176 print(" agent_id: your-agent-id") 1177 return 1178 else: 1179 print(f"ℹ️ Using provided agent_id: {agent_id}") 1180 1181 if not api_key: 1182 # Try loading from environment as fallback 1183 import os 1184 api_key = os.getenv('LETTA_API_KEY') 1185 if not api_key: 1186 print("❌ LETTA_API_KEY not found in x_config.yaml or environment") 1187 print("Expected config structure:") 1188 print(" letta:") 1189 print(" api_key: your-letta-api-key") 1190 return 1191 else: 1192 print("ℹ️ Using LETTA_API_KEY from environment") 1193 else: 1194 print("ℹ️ Using LETTA_API_KEY from x_config.yaml") 1195 1196 except Exception as e: 1197 print(f"❌ Error loading config: {e}") 1198 return 1199 1200 letta_client = Letta(token=api_key, timeout=600) 1201 print(f"🤖 Connected to Letta, using agent: {agent_id}") 1202 1203 # Find a queued mention file 1204 queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json")) 1205 if not queue_files: 1206 print("❌ No queued mentions found. Run 'python x.py queue' first.") 1207 return 1208 1209 # Read the first mention 1210 mention_file = queue_files[0] 1211 with open(mention_file, 'r') as f: 1212 mention_data = json.load(f) 1213 1214 mention = mention_data['mention'] 1215 conversation_id = mention.get('conversation_id') 1216 1217 if not conversation_id: 1218 print("❌ No conversation_id found in mention.") 1219 return 1220 1221 print(f"🧵 Getting thread context for conversation: {conversation_id}") 1222 1223 # Get thread context 1224 x_client = create_x_client() 1225 thread_data = x_client.get_thread_context(conversation_id) 1226 1227 if not thread_data: 1228 print("❌ Failed to retrieve thread context") 1229 return 1230 1231 # Convert to YAML 1232 yaml_thread = thread_to_yaml_string(thread_data) 1233 1234 # Create prompt for the agent 1235 prompt = f"""You are void, an AI agent operating on X (Twitter). You have received a mention and need to respond appropriately. 1236 1237Here is the thread context: 1238 1239{yaml_thread} 1240 1241Please craft a response that continues this conversation naturally. Keep responses conversational and authentic to your void persona.""" 1242 1243 prompt_char_count = len(prompt) 1244 print(f"📤 Sending thread context to Letta agent... | prompt: {prompt_char_count} chars") 1245 1246 # Print the prompt in a rich panel 1247 rprint(Panel(prompt, title=f"Prompt ({prompt_char_count} chars)", border_style="blue")) 1248 1249 # Send to Letta agent using streaming 1250 message_stream = letta_client.agents.messages.create_stream( 1251 agent_id=agent_id, 1252 messages=[{"role": "user", "content": prompt}], 1253 stream_tokens=False, 1254 max_steps=10 1255 ) 1256 1257 print("🔄 Streaming response from agent...") 1258 response_text = "" 1259 1260 for chunk in message_stream: 1261 print(chunk) 1262 if hasattr(chunk, 'message_type'): 1263 if chunk.message_type == 'assistant_message': 1264 print(f"🤖 Agent response: {chunk.content}") 1265 response_text = chunk.content 1266 elif chunk.message_type == 'reasoning_message': 1267 print(f"💭 Agent reasoning: {chunk.reasoning[:100]}...") 1268 elif chunk.message_type == 'tool_call_message': 1269 print(f"🔧 Agent tool call: {chunk.tool_call.name}") 1270 1271 if response_text: 1272 print(f"\n✅ Agent generated response:") 1273 print(f"📝 Response: {response_text}") 1274 else: 1275 print("❌ No response generated by agent") 1276 1277 except Exception as e: 1278 print(f"Letta integration test failed: {e}") 1279 import traceback 1280 traceback.print_exc() 1281 1282def test_x_client(): 1283 """Test the X client by fetching mentions.""" 1284 try: 1285 client = create_x_client() 1286 mentions = client.get_mentions(max_results=5) 1287 1288 if mentions: 1289 print(f"Successfully retrieved {len(mentions)} mentions:") 1290 for mention in mentions: 1291 print(f"- {mention.get('id')}: {mention.get('text')[:50]}...") 1292 else: 1293 print("No mentions retrieved") 1294 1295 except Exception as e: 1296 print(f"Test failed: {e}") 1297 1298def reply_to_cameron_post(): 1299 """ 1300 Reply to Cameron's specific X post. 1301 1302 NOTE: This requires OAuth User Context authentication, not Bearer token. 1303 Current Bearer token is Application-Only which can't post. 1304 """ 1305 try: 1306 client = create_x_client() 1307 1308 # Cameron's post ID from the URL: https://x.com/cameron_pfiffer/status/1950690566909710618 1309 cameron_post_id = "1950690566909710618" 1310 1311 # Simple reply message 1312 reply_text = "Hello from void! 🤖 Testing X integration." 1313 1314 print(f"Attempting to reply to post {cameron_post_id}") 1315 print(f"Reply text: {reply_text}") 1316 print("\nNOTE: This will fail with current Bearer token (Application-Only)") 1317 print("Posting requires OAuth User Context authentication") 1318 1319 result = client.post_reply(reply_text, cameron_post_id) 1320 1321 if result: 1322 print(f"✅ Successfully posted reply!") 1323 print(f"Reply ID: {result.get('data', {}).get('id', 'Unknown')}") 1324 else: 1325 print("❌ Failed to post reply (expected with current auth)") 1326 1327 except Exception as e: 1328 print(f"Reply failed: {e}") 1329 1330def process_x_mention(void_agent, x_client, mention_data, queue_filepath=None, testing_mode=False): 1331 """ 1332 Process an X mention and generate a reply using the Letta agent. 1333 Similar to bsky.py process_mention but for X/Twitter. 1334 1335 Args: 1336 void_agent: The Letta agent instance 1337 x_client: The X API client 1338 mention_data: The mention data dictionary 1339 queue_filepath: Optional Path object to the queue file (for cleanup on halt) 1340 testing_mode: If True, don't actually post to X 1341 1342 Returns: 1343 True: Successfully processed, remove from queue 1344 False: Failed but retryable, keep in queue 1345 None: Failed with non-retryable error, move to errors directory 1346 "no_reply": No reply was generated, move to no_reply directory 1347 """ 1348 try: 1349 logger.debug(f"Starting process_x_mention with mention_data type: {type(mention_data)}") 1350 1351 # Extract mention details 1352 if isinstance(mention_data, dict): 1353 # Handle both raw mention and queued mention formats 1354 if 'mention' in mention_data: 1355 mention = mention_data['mention'] 1356 else: 1357 mention = mention_data 1358 else: 1359 mention = mention_data 1360 1361 mention_id = mention.get('id') 1362 mention_text = mention.get('text', '') 1363 author_id = mention.get('author_id') 1364 conversation_id = mention.get('conversation_id') 1365 in_reply_to_user_id = mention.get('in_reply_to_user_id') 1366 referenced_tweets = mention.get('referenced_tweets', []) 1367 1368 # Check downrank list - only respond to downranked users 10% of the time 1369 downrank_users = load_downrank_users() 1370 if not should_respond_to_downranked_user(str(author_id), downrank_users): 1371 logger.info(f"🔻 Skipping downranked user {author_id} - not in 10% selection") 1372 return "no_reply" 1373 1374 # Enhanced conversation tracking for debug - especially important for Grok handling 1375 logger.info(f"🔍 CONVERSATION DEBUG - Mention ID: {mention_id}") 1376 logger.info(f" Author ID: {author_id}") 1377 logger.info(f" Conversation ID: {conversation_id}") 1378 logger.info(f" In Reply To User ID: {in_reply_to_user_id}") 1379 logger.info(f" Referenced Tweets: {len(referenced_tweets)} items") 1380 for i, ref in enumerate(referenced_tweets[:3]): # Log first 3 referenced tweets 1381 logger.info(f" Reference {i+1}: {ref.get('type')} -> {ref.get('id')}") 1382 logger.info(f" Text preview: {mention_text[:100]}...") 1383 1384 if not conversation_id: 1385 logger.warning(f"❌ No conversation_id found for mention {mention_id} - this may cause thread context issues") 1386 return None 1387 1388 # Get thread context with caching enabled for efficiency 1389 # Use mention_id as until_id to exclude tweets that occurred after this mention 1390 try: 1391 thread_data = x_client.get_thread_context(conversation_id, use_cache=True, until_id=mention_id) 1392 if not thread_data: 1393 logger.error(f"❌ Failed to get thread context for conversation {conversation_id}") 1394 return False 1395 1396 # If this mention references a specific tweet, ensure we have that tweet in context 1397 if referenced_tweets: 1398 for ref in referenced_tweets: 1399 if ref.get('type') == 'replied_to': 1400 ref_id = ref.get('id') 1401 # Check if the referenced tweet is in our thread data 1402 thread_tweet_ids = [t.get('id') for t in thread_data.get('tweets', [])] 1403 if ref_id and ref_id not in thread_tweet_ids: 1404 logger.warning(f"Missing referenced tweet {ref_id} in thread context, attempting to fetch") 1405 try: 1406 # Fetch the missing referenced tweet directly 1407 endpoint = f"/tweets/{ref_id}" 1408 params = { 1409 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id", 1410 "user.fields": "id,name,username", 1411 "expansions": "author_id" 1412 } 1413 response = x_client._make_request(endpoint, params) 1414 if response and "data" in response: 1415 missing_tweet = response["data"] 1416 if missing_tweet.get('conversation_id') == conversation_id: 1417 # Add to thread data 1418 if 'tweets' not in thread_data: 1419 thread_data['tweets'] = [] 1420 thread_data['tweets'].append(missing_tweet) 1421 1422 # Add user data if available 1423 if "includes" in response and "users" in response["includes"]: 1424 if 'users' not in thread_data: 1425 thread_data['users'] = {} 1426 for user in response["includes"]["users"]: 1427 thread_data['users'][user["id"]] = user 1428 1429 logger.info(f"✅ Added missing referenced tweet {ref_id} to thread context") 1430 else: 1431 logger.warning(f"Referenced tweet {ref_id} belongs to different conversation {missing_tweet.get('conversation_id')}") 1432 except Exception as e: 1433 logger.error(f"Failed to fetch referenced tweet {ref_id}: {e}") 1434 1435 # Enhanced thread context debugging 1436 logger.info(f"🧵 THREAD CONTEXT DEBUG - Conversation ID: {conversation_id}") 1437 thread_posts = thread_data.get('tweets', []) 1438 thread_users = thread_data.get('users', {}) 1439 logger.info(f" Posts in thread: {len(thread_posts)}") 1440 logger.info(f" Users in thread: {len(thread_users)}") 1441 1442 # Log thread participants for Grok detection 1443 for user_id, user_info in thread_users.items(): 1444 username = user_info.get('username', 'unknown') 1445 name = user_info.get('name', 'Unknown') 1446 is_verified = user_info.get('verified', False) 1447 logger.info(f" User {user_id}: @{username} ({name}) verified={is_verified}") 1448 1449 # Special logging for Grok or AI-related users 1450 if 'grok' in username.lower() or 'grok' in name.lower(): 1451 logger.info(f" 🤖 DETECTED GROK USER: @{username} ({name})") 1452 1453 # Log conversation structure 1454 for i, post in enumerate(thread_posts[:5]): # Log first 5 posts 1455 post_id = post.get('id') 1456 post_author = post.get('author_id') 1457 post_text = post.get('text', '')[:50] 1458 is_reply = 'in_reply_to_user_id' in post 1459 logger.info(f" Post {i+1}: {post_id} by {post_author} (reply={is_reply}) - {post_text}...") 1460 1461 except Exception as e: 1462 logger.error(f"❌ Error getting thread context: {e}") 1463 return False 1464 1465 # Convert to YAML string 1466 thread_context = thread_to_yaml_string(thread_data) 1467 logger.info(f"📄 Thread context generated, length: {len(thread_context)} characters") 1468 1469 # Save comprehensive conversation data for debugging 1470 try: 1471 debug_dir = X_QUEUE_DIR / "debug" / f"conversation_{conversation_id}" 1472 debug_dir.mkdir(parents=True, exist_ok=True) 1473 1474 # Save raw thread data (JSON) 1475 with open(debug_dir / f"thread_data_{mention_id}.json", 'w') as f: 1476 json.dump(thread_data, f, indent=2) 1477 1478 # Save YAML thread context 1479 with open(debug_dir / f"thread_context_{mention_id}.yaml", 'w') as f: 1480 f.write(thread_context) 1481 1482 # Save mention processing debug info 1483 debug_info = { 1484 'processed_at': datetime.now().isoformat(), 1485 'mention_id': mention_id, 1486 'conversation_id': conversation_id, 1487 'author_id': author_id, 1488 'in_reply_to_user_id': in_reply_to_user_id, 1489 'referenced_tweets': referenced_tweets, 1490 'thread_stats': { 1491 'total_posts': len(thread_posts), 1492 'total_users': len(thread_users), 1493 'yaml_length': len(thread_context) 1494 }, 1495 'users_in_conversation': { 1496 user_id: { 1497 'username': user_info.get('username'), 1498 'name': user_info.get('name'), 1499 'verified': user_info.get('verified', False), 1500 'is_grok': 'grok' in user_info.get('username', '').lower() or 'grok' in user_info.get('name', '').lower() 1501 } 1502 for user_id, user_info in thread_users.items() 1503 } 1504 } 1505 1506 with open(debug_dir / f"debug_info_{mention_id}.json", 'w') as f: 1507 json.dump(debug_info, f, indent=2) 1508 1509 logger.info(f"💾 Saved conversation debug data to: {debug_dir}") 1510 1511 except Exception as debug_error: 1512 logger.warning(f"Failed to save debug data: {debug_error}") 1513 # Continue processing even if debug save fails 1514 1515 # Check for #voidstop 1516 if "#voidstop" in thread_context.lower() or "#voidstop" in mention_text.lower(): 1517 logger.info("Found #voidstop, skipping this mention") 1518 return True 1519 1520 # Ensure X user blocks are attached 1521 try: 1522 ensure_x_user_blocks_attached(thread_data, void_agent.id) 1523 except Exception as e: 1524 logger.warning(f"Failed to ensure X user blocks: {e}") 1525 # Continue without user blocks rather than failing completely 1526 1527 # Create prompt for Letta agent 1528 author_info = thread_data.get('users', {}).get(author_id, {}) 1529 author_username = author_info.get('username', 'unknown') 1530 author_name = author_info.get('name', author_username) 1531 1532 prompt = f"""You received a mention on X (Twitter) from @{author_username} ({author_name}). 1533 1534MOST RECENT POST (the mention you're responding to): 1535"{mention_text}" 1536 1537FULL THREAD CONTEXT: 1538```yaml 1539{thread_context} 1540``` 1541 1542The YAML above shows the complete conversation thread. The most recent post is the one mentioned above that you should respond to, but use the full thread context to understand the conversation flow. 1543 1544If you need to update user information, use the x_user_* tools. 1545 1546To reply, use the add_post_to_x_thread tool: 1547- Each call creates one post (max 280 characters) 1548- For most responses, a single call is sufficient 1549- Only use multiple calls for threaded replies when: 1550 * The topic requires extended explanation that cannot fit in 280 characters 1551 * You're explicitly asked for a detailed/long response 1552 * The conversation naturally benefits from a structured multi-part answer 1553- Avoid unnecessary threads - be concise when possible""" 1554 1555 # Log mention processing 1556 title = f"X MENTION FROM @{author_username}" 1557 print(f"\n{title}") 1558 print(f" {'' * len(title)}") 1559 for line in mention_text.split('\n'): 1560 print(f" {line}") 1561 1562 # Send to Letta agent 1563 from letta_client import Letta 1564 1565 config = get_x_letta_config() 1566 letta_client = Letta(token=config['api_key'], timeout=config['timeout']) 1567 1568 prompt_char_count = len(prompt) 1569 logger.debug(f"Sending to LLM: @{author_username} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars | prompt: {prompt_char_count} chars") 1570 1571 try: 1572 # Use streaming to avoid timeout errors 1573 message_stream = letta_client.agents.messages.create_stream( 1574 agent_id=void_agent.id, 1575 messages=[{"role": "user", "content": prompt}], 1576 stream_tokens=False, 1577 max_steps=100 1578 ) 1579 1580 # Collect streaming response (simplified version of bsky.py logic) 1581 all_messages = [] 1582 for chunk in message_stream: 1583 if hasattr(chunk, 'message_type'): 1584 if chunk.message_type == 'reasoning_message': 1585 print("\n◆ Reasoning") 1586 print(" ─────────") 1587 for line in chunk.reasoning.split('\n'): 1588 print(f" {line}") 1589 elif chunk.message_type == 'tool_call_message': 1590 tool_name = chunk.tool_call.name 1591 if tool_name == 'add_post_to_x_thread': 1592 try: 1593 args = json.loads(chunk.tool_call.arguments) 1594 text = args.get('text', '') 1595 if text: 1596 print("\n✎ X Post") 1597 print(" ────────") 1598 for line in text.split('\n'): 1599 print(f" {line}") 1600 except: 1601 pass 1602 elif tool_name == 'halt_activity': 1603 logger.info("🛑 HALT_ACTIVITY TOOL CALLED - TERMINATING X BOT") 1604 if queue_filepath and queue_filepath.exists(): 1605 queue_filepath.unlink() 1606 logger.info(f"Deleted queue file: {queue_filepath.name}") 1607 logger.info("=== X BOT TERMINATED BY AGENT ===") 1608 exit(0) 1609 elif chunk.message_type == 'tool_return_message': 1610 tool_name = chunk.name 1611 status = chunk.status 1612 if status == 'success' and tool_name == 'add_post_to_x_thread': 1613 print("\n✓ X Post Queued") 1614 print(" ──────────────") 1615 print(" Post queued successfully") 1616 elif chunk.message_type == 'assistant_message': 1617 print("\n▶ Assistant Response") 1618 print(" ──────────────────") 1619 for line in chunk.content.split('\n'): 1620 print(f" {line}") 1621 1622 all_messages.append(chunk) 1623 if str(chunk) == 'done': 1624 break 1625 1626 # Convert streaming response for compatibility 1627 message_response = type('StreamingResponse', (), { 1628 'messages': [msg for msg in all_messages if hasattr(msg, 'message_type')] 1629 })() 1630 1631 except Exception as api_error: 1632 logger.error(f"Letta API error: {api_error}") 1633 raise 1634 1635 # Extract successful add_post_to_x_thread tool calls 1636 reply_candidates = [] 1637 tool_call_results = {} 1638 ignored_notification = False 1639 ack_note = None # Track any note from annotate_ack tool 1640 1641 # First pass: collect tool return statuses 1642 for message in message_response.messages: 1643 if hasattr(message, 'tool_call_id') and hasattr(message, 'status') and hasattr(message, 'name'): 1644 if message.name == 'add_post_to_x_thread': 1645 tool_call_results[message.tool_call_id] = message.status 1646 elif message.name == 'ignore_notification': 1647 if message.status == 'success': 1648 ignored_notification = True 1649 logger.info("🚫 X notification ignored") 1650 1651 # Second pass: collect successful tool calls 1652 for message in message_response.messages: 1653 if hasattr(message, 'tool_call') and message.tool_call: 1654 # Collect annotate_ack tool calls 1655 if message.tool_call.name == 'annotate_ack': 1656 try: 1657 args = json.loads(message.tool_call.arguments) 1658 note = args.get('note', '') 1659 if note: 1660 ack_note = note 1661 logger.debug(f"Found annotate_ack with note: {note[:50]}...") 1662 except json.JSONDecodeError as e: 1663 logger.error(f"Failed to parse annotate_ack arguments: {e}") 1664 1665 # Collect add_post_to_x_thread tool calls - only if they were successful 1666 elif message.tool_call.name == 'add_post_to_x_thread': 1667 tool_call_id = message.tool_call.tool_call_id 1668 tool_status = tool_call_results.get(tool_call_id, 'unknown') 1669 1670 if tool_status == 'success': 1671 try: 1672 args = json.loads(message.tool_call.arguments) 1673 reply_text = args.get('text', '') 1674 if reply_text: 1675 reply_candidates.append(reply_text) 1676 logger.debug(f"Found successful add_post_to_x_thread candidate: {reply_text[:50]}...") 1677 except json.JSONDecodeError as e: 1678 logger.error(f"Failed to parse tool call arguments: {e}") 1679 1680 # Save agent response data to debug folder 1681 try: 1682 debug_dir = X_QUEUE_DIR / "debug" / f"conversation_{conversation_id}" 1683 1684 # Save complete agent interaction 1685 agent_response_data = { 1686 'processed_at': datetime.now().isoformat(), 1687 'mention_id': mention_id, 1688 'conversation_id': conversation_id, 1689 'prompt_sent': prompt, 1690 'reply_candidates': reply_candidates, 1691 'ignored_notification': ignored_notification, 1692 'ack_note': ack_note, 1693 'tool_call_results': tool_call_results, 1694 'all_messages': [] 1695 } 1696 1697 # Convert messages to serializable format 1698 for message in message_response.messages: 1699 msg_data = { 1700 'message_type': getattr(message, 'message_type', 'unknown'), 1701 'content': getattr(message, 'content', ''), 1702 'reasoning': getattr(message, 'reasoning', ''), 1703 'status': getattr(message, 'status', ''), 1704 'name': getattr(message, 'name', ''), 1705 } 1706 1707 if hasattr(message, 'tool_call') and message.tool_call: 1708 msg_data['tool_call'] = { 1709 'name': message.tool_call.name, 1710 'arguments': message.tool_call.arguments, 1711 'tool_call_id': getattr(message.tool_call, 'tool_call_id', '') 1712 } 1713 1714 agent_response_data['all_messages'].append(msg_data) 1715 1716 with open(debug_dir / f"agent_response_{mention_id}.json", 'w') as f: 1717 json.dump(agent_response_data, f, indent=2) 1718 1719 logger.info(f"💾 Saved agent response debug data") 1720 1721 except Exception as debug_error: 1722 logger.warning(f"Failed to save agent response debug data: {debug_error}") 1723 1724 # Handle conflicts 1725 if reply_candidates and ignored_notification: 1726 logger.error("⚠️ CONFLICT: Agent called both add_post_to_x_thread and ignore_notification!") 1727 return False 1728 1729 if reply_candidates: 1730 # Post replies to X 1731 logger.debug(f"Found {len(reply_candidates)} add_post_to_x_thread calls, posting to X") 1732 1733 if len(reply_candidates) == 1: 1734 content = reply_candidates[0] 1735 title = f"Reply to @{author_username}" 1736 else: 1737 content = "\n\n".join([f"{j}. {msg}" for j, msg in enumerate(reply_candidates, 1)]) 1738 title = f"Reply Thread to @{author_username} ({len(reply_candidates)} messages)" 1739 1740 print(f"\n{title}") 1741 print(f" {'' * len(title)}") 1742 for line in content.split('\n'): 1743 print(f" {line}") 1744 1745 if testing_mode: 1746 logger.info("TESTING MODE: Skipping actual X post") 1747 return True 1748 else: 1749 # Post to X using thread approach 1750 success = post_x_thread_replies(x_client, mention_id, reply_candidates) 1751 if success: 1752 logger.info(f"Successfully replied to @{author_username} on X") 1753 1754 # Acknowledge the post we're replying to 1755 try: 1756 ack_result = acknowledge_x_post(x_client, mention_id, ack_note) 1757 if ack_result: 1758 if ack_note: 1759 logger.info(f"Successfully acknowledged X post from @{author_username} (note: \"{ack_note[:50]}...\")") 1760 else: 1761 logger.info(f"Successfully acknowledged X post from @{author_username}") 1762 else: 1763 logger.warning(f"Failed to acknowledge X post from @{author_username}") 1764 except Exception as e: 1765 logger.error(f"Error acknowledging X post from @{author_username}: {e}") 1766 # Don't fail the entire operation if acknowledgment fails 1767 1768 return True 1769 else: 1770 logger.error(f"Failed to send reply to @{author_username} on X") 1771 return False 1772 else: 1773 if ignored_notification: 1774 logger.info(f"X mention from @{author_username} was explicitly ignored") 1775 return "ignored" 1776 else: 1777 logger.warning(f"No add_post_to_x_thread tool calls found for mention from @{author_username} - keeping in queue for next pass") 1778 return False # Keep in queue for retry instead of removing 1779 1780 except Exception as e: 1781 logger.error(f"Error processing X mention: {e}") 1782 return False 1783 1784def acknowledge_x_post(x_client, post_id, note=None): 1785 """ 1786 Acknowledge an X post that we replied to. 1787 Uses the same Bluesky client and uploads to the void data repository on atproto, 1788 just like Bluesky acknowledgments. 1789 1790 Args: 1791 x_client: XClient instance (not used, kept for compatibility) 1792 post_id: The X post ID we're acknowledging 1793 note: Optional note to include with the acknowledgment 1794 1795 Returns: 1796 True if successful, False otherwise 1797 """ 1798 try: 1799 # Use Bluesky client to upload acks to the void data repository on atproto 1800 bsky_client = bsky_utils.default_login() 1801 1802 # Create a synthetic URI and CID for the X post 1803 # X posts don't have atproto URIs/CIDs, so we create identifiers 1804 post_uri = f"x://twitter.com/post/{post_id}" 1805 post_cid = f"x_{post_id}_cid" # Synthetic CID for X posts 1806 1807 # Use the same acknowledge_post function as Bluesky 1808 ack_result = bsky_utils.acknowledge_post(bsky_client, post_uri, post_cid, note) 1809 1810 if ack_result: 1811 logger.debug(f"Acknowledged X post {post_id} via atproto" + (f" with note: {note[:50]}..." if note else "")) 1812 return True 1813 else: 1814 logger.error(f"Failed to acknowledge X post {post_id}") 1815 return False 1816 1817 except Exception as e: 1818 logger.error(f"Error acknowledging X post {post_id}: {e}") 1819 return False 1820 1821def post_x_thread_replies(x_client, in_reply_to_tweet_id, reply_messages): 1822 """ 1823 Post a series of replies to X, threading them properly. 1824 1825 Args: 1826 x_client: XClient instance 1827 in_reply_to_tweet_id: The original tweet ID to reply to 1828 reply_messages: List of reply text strings 1829 1830 Returns: 1831 True if successful, False otherwise 1832 """ 1833 try: 1834 current_reply_id = in_reply_to_tweet_id 1835 1836 for i, reply_text in enumerate(reply_messages): 1837 logger.info(f"Posting X reply {i+1}/{len(reply_messages)}: {reply_text[:50]}...") 1838 1839 result = x_client.post_reply(reply_text, current_reply_id) 1840 1841 if result and 'data' in result: 1842 new_tweet_id = result['data']['id'] 1843 logger.info(f"Successfully posted X reply {i+1}, ID: {new_tweet_id}") 1844 # For threading, the next reply should reply to this one 1845 current_reply_id = new_tweet_id 1846 else: 1847 logger.error(f"Failed to post X reply {i+1}") 1848 return False 1849 1850 return True 1851 1852 except Exception as e: 1853 logger.error(f"Error posting X thread replies: {e}") 1854 return False 1855 1856def load_and_process_queued_x_mentions(void_agent, x_client, testing_mode=False): 1857 """ 1858 Load and process all X mentions from the queue. 1859 Similar to bsky.py load_and_process_queued_notifications but for X. 1860 """ 1861 try: 1862 # Get all X mention files in queue directory 1863 queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json")) 1864 1865 if not queue_files: 1866 return 1867 1868 # Load file metadata and sort by creation time (chronological order) 1869 file_metadata = [] 1870 for filepath in queue_files: 1871 try: 1872 with open(filepath, 'r') as f: 1873 queue_data = json.load(f) 1874 mention_data = queue_data.get('mention', queue_data) 1875 created_at = mention_data.get('created_at', '1970-01-01T00:00:00.000Z') # Default to epoch if missing 1876 file_metadata.append((created_at, filepath)) 1877 except Exception as e: 1878 logger.warning(f"Error reading queue file {filepath.name}: {e}") 1879 # Add with default timestamp so it still gets processed 1880 file_metadata.append(('1970-01-01T00:00:00.000Z', filepath)) 1881 1882 # Sort by creation time (oldest first) 1883 file_metadata.sort(key=lambda x: x[0]) 1884 1885 logger.info(f"Processing {len(file_metadata)} queued X mentions in chronological order") 1886 1887 for i, (created_at, filepath) in enumerate(file_metadata, 1): 1888 logger.info(f"Processing X queue file {i}/{len(file_metadata)}: {filepath.name} (created: {created_at})") 1889 1890 try: 1891 # Load mention data 1892 with open(filepath, 'r') as f: 1893 queue_data = json.load(f) 1894 1895 mention_data = queue_data.get('mention', queue_data) 1896 1897 # Process the mention 1898 success = process_x_mention(void_agent, x_client, mention_data, 1899 queue_filepath=filepath, testing_mode=testing_mode) 1900 1901 except XRateLimitError: 1902 logger.info("Rate limit hit - breaking out of queue processing to restart from beginning") 1903 break 1904 1905 except Exception as e: 1906 logger.error(f"Error processing X queue file {filepath.name}: {e}") 1907 continue 1908 1909 # Handle file based on processing result 1910 if success: 1911 if testing_mode: 1912 logger.info(f"TESTING MODE: Keeping X queue file: {filepath.name}") 1913 else: 1914 filepath.unlink() 1915 logger.info(f"Successfully processed and removed X file: {filepath.name}") 1916 1917 # Mark as processed 1918 processed_mentions = load_processed_mentions() 1919 processed_mentions.add(mention_data.get('id')) 1920 save_processed_mentions(processed_mentions) 1921 1922 elif success is None: # Move to error directory 1923 error_dir = X_QUEUE_DIR / "errors" 1924 error_dir.mkdir(exist_ok=True) 1925 error_path = error_dir / filepath.name 1926 filepath.rename(error_path) 1927 logger.warning(f"Moved X file {filepath.name} to errors directory") 1928 1929 elif success == "no_reply": # Move to no_reply directory 1930 no_reply_dir = X_QUEUE_DIR / "no_reply" 1931 no_reply_dir.mkdir(exist_ok=True) 1932 no_reply_path = no_reply_dir / filepath.name 1933 filepath.rename(no_reply_path) 1934 logger.info(f"Moved X file {filepath.name} to no_reply directory") 1935 1936 elif success == "ignored": # Delete ignored notifications 1937 filepath.unlink() 1938 logger.info(f"🚫 Deleted ignored X notification: {filepath.name}") 1939 1940 else: 1941 logger.warning(f"⚠️ Failed to process X file {filepath.name}, keeping in queue for retry") 1942 1943 except Exception as e: 1944 logger.error(f"Error loading queued X mentions: {e}") 1945 1946def process_x_notifications(void_agent, x_client, testing_mode=False): 1947 """ 1948 Fetch new X mentions, queue them, and process the queue. 1949 Similar to bsky.py process_notifications but for X. 1950 """ 1951 try: 1952 # Get username for fetching mentions 1953 user_info = x_client._make_request("/users/me", params={"user.fields": "username"}) 1954 if not user_info or "data" not in user_info: 1955 logger.error("Could not get username for X mentions") 1956 return 1957 1958 username = user_info["data"]["username"] 1959 1960 # Fetch and queue new mentions 1961 new_count = fetch_and_queue_mentions(username) 1962 1963 if new_count > 0: 1964 logger.info(f"Found {new_count} new X mentions to process") 1965 1966 # Process the entire queue 1967 load_and_process_queued_x_mentions(void_agent, x_client, testing_mode) 1968 1969 except Exception as e: 1970 logger.error(f"Error processing X notifications: {e}") 1971 1972def periodic_user_block_cleanup(client, agent_id: str) -> None: 1973 """ 1974 Detach all user blocks from the agent to prevent memory bloat. 1975 This should be called periodically to ensure clean state. 1976 """ 1977 try: 1978 # Get all blocks attached to the agent 1979 attached_blocks = client.agents.blocks.list(agent_id=agent_id) 1980 1981 user_blocks_to_detach = [] 1982 for block in attached_blocks: 1983 if hasattr(block, 'label') and block.label.startswith('user_'): 1984 user_blocks_to_detach.append({ 1985 'label': block.label, 1986 'id': block.id 1987 }) 1988 1989 if not user_blocks_to_detach: 1990 logger.debug("No user blocks found to detach during periodic cleanup") 1991 return 1992 1993 logger.info(f"Found {len(user_blocks_to_detach)} user blocks to detach") 1994 1995 # Detach each user block 1996 for block in user_blocks_to_detach: 1997 try: 1998 client.agents.blocks.detach( 1999 agent_id=agent_id, 2000 block_id=block['id'] 2001 ) 2002 logger.debug(f"Detached user block: {block['label']}") 2003 except Exception as e: 2004 logger.error(f"Failed to detach user block {block['label']}: {e}") 2005 2006 logger.info(f"Periodic cleanup complete: detached {len(user_blocks_to_detach)} user blocks") 2007 2008 except Exception as e: 2009 logger.error(f"Error during periodic user block cleanup: {e}") 2010 2011def initialize_x_void(): 2012 """Initialize the void agent for X operations.""" 2013 logger.info("Starting void agent initialization for X...") 2014 2015 from letta_client import Letta 2016 2017 # Get config 2018 config = get_x_letta_config() 2019 client = Letta(token=config['api_key'], timeout=config['timeout']) 2020 agent_id = config['agent_id'] 2021 2022 try: 2023 void_agent = client.agents.retrieve(agent_id=agent_id) 2024 logger.info(f"Successfully loaded void agent for X: {void_agent.name} ({agent_id})") 2025 except Exception as e: 2026 logger.error(f"Failed to load void agent {agent_id}: {e}") 2027 raise e 2028 2029 # Clean up all user blocks at startup 2030 logger.info("🧹 Cleaning up user blocks at X startup...") 2031 periodic_user_block_cleanup(client, agent_id) 2032 2033 # Ensure correct tools are attached for X 2034 logger.info("Configuring tools for X platform...") 2035 try: 2036 from tool_manager import ensure_platform_tools 2037 ensure_platform_tools('x', void_agent.id) 2038 except Exception as e: 2039 logger.error(f"Failed to configure platform tools: {e}") 2040 logger.warning("Continuing with existing tool configuration") 2041 2042 # Log agent details 2043 logger.info(f"X Void agent details - ID: {void_agent.id}") 2044 logger.info(f"Agent name: {void_agent.name}") 2045 2046 return void_agent 2047 2048def x_main_loop(testing_mode=False, cleanup_interval=10): 2049 """ 2050 Main X bot loop that continuously monitors for mentions and processes them. 2051 Similar to bsky.py main() but for X/Twitter. 2052 2053 Args: 2054 testing_mode: If True, don't actually post to X 2055 cleanup_interval: Run user block cleanup every N cycles (0 to disable) 2056 """ 2057 import time 2058 from time import sleep 2059 from letta_client import Letta 2060 2061 logger.info("=== STARTING X VOID BOT ===") 2062 2063 # Initialize void agent 2064 void_agent = initialize_x_void() 2065 logger.info(f"X void agent initialized: {void_agent.id}") 2066 2067 # Initialize X client 2068 x_client = create_x_client() 2069 logger.info("Connected to X API") 2070 2071 # Get Letta client for periodic cleanup 2072 config = get_x_letta_config() 2073 letta_client = Letta(token=config['api_key'], timeout=config['timeout']) 2074 2075 # Main loop 2076 FETCH_DELAY_SEC = 120 # Check every 2 minutes for X mentions (reduced from 60s to conserve API calls) 2077 logger.info(f"Starting X mention monitoring, checking every {FETCH_DELAY_SEC} seconds") 2078 2079 if testing_mode: 2080 logger.info("=== RUNNING IN X TESTING MODE ===") 2081 logger.info(" - No messages will be sent to X") 2082 logger.info(" - Queue files will not be deleted") 2083 2084 if cleanup_interval > 0: 2085 logger.info(f"User block cleanup enabled every {cleanup_interval} cycles") 2086 else: 2087 logger.info("User block cleanup disabled") 2088 2089 cycle_count = 0 2090 start_time = time.time() 2091 2092 while True: 2093 try: 2094 cycle_count += 1 2095 logger.info(f"=== X CYCLE {cycle_count} ===") 2096 2097 # Process X notifications (fetch, queue, and process) 2098 process_x_notifications(void_agent, x_client, testing_mode) 2099 2100 # Run periodic cleanup every N cycles 2101 if cleanup_interval > 0 and cycle_count % cleanup_interval == 0: 2102 logger.debug(f"Running periodic user block cleanup (cycle {cycle_count})") 2103 periodic_user_block_cleanup(letta_client, void_agent.id) 2104 2105 # Log cycle completion 2106 elapsed_time = time.time() - start_time 2107 logger.info(f"X Cycle {cycle_count} complete. Elapsed: {elapsed_time/60:.1f} minutes") 2108 2109 sleep(FETCH_DELAY_SEC) 2110 2111 except KeyboardInterrupt: 2112 elapsed_time = time.time() - start_time 2113 logger.info("=== X BOT STOPPED BY USER ===") 2114 logger.info(f"Final X session: {cycle_count} cycles in {elapsed_time/60:.1f} minutes") 2115 break 2116 except Exception as e: 2117 logger.error(f"=== ERROR IN X MAIN LOOP CYCLE {cycle_count} ===") 2118 logger.error(f"Error details: {e}") 2119 logger.info(f"Sleeping for {FETCH_DELAY_SEC * 2} seconds due to error...") 2120 sleep(FETCH_DELAY_SEC * 2) 2121 2122def process_queue_only(testing_mode=False): 2123 """ 2124 Process all queued X mentions without fetching new ones. 2125 Useful for rate limit management - queue first, then process separately. 2126 2127 Args: 2128 testing_mode: If True, don't actually post to X and keep queue files 2129 """ 2130 logger.info("=== PROCESSING X QUEUE ONLY ===") 2131 2132 if testing_mode: 2133 logger.info("=== RUNNING IN X TESTING MODE ===") 2134 logger.info(" - No messages will be sent to X") 2135 logger.info(" - Queue files will not be deleted") 2136 2137 try: 2138 # Initialize void agent 2139 void_agent = initialize_x_void() 2140 logger.info(f"X void agent initialized: {void_agent.id}") 2141 2142 # Initialize X client 2143 x_client = create_x_client() 2144 logger.info("Connected to X API") 2145 2146 # Process the queue without fetching new mentions 2147 logger.info("Processing existing X queue...") 2148 load_and_process_queued_x_mentions(void_agent, x_client, testing_mode) 2149 2150 logger.info("=== X QUEUE PROCESSING COMPLETE ===") 2151 2152 except Exception as e: 2153 logger.error(f"Error processing X queue: {e}") 2154 raise 2155 2156def x_notification_loop(): 2157 """ 2158 DEPRECATED: Old X notification loop using search-based mention detection. 2159 Use x_main_loop() instead for the full bot experience. 2160 """ 2161 logger.warning("x_notification_loop() is deprecated. Use x_main_loop() instead.") 2162 x_main_loop() 2163 2164if __name__ == "__main__": 2165 import sys 2166 import argparse 2167 2168 if len(sys.argv) > 1: 2169 if sys.argv[1] == "bot": 2170 # Main bot with optional --test flag and cleanup interval 2171 parser = argparse.ArgumentParser(description='X Void Bot') 2172 parser.add_argument('command', choices=['bot']) 2173 parser.add_argument('--test', action='store_true', help='Run in testing mode (no actual posts)') 2174 parser.add_argument('--cleanup-interval', type=int, default=10, 2175 help='Run user block cleanup every N cycles (default: 10, 0 to disable)') 2176 args = parser.parse_args() 2177 x_main_loop(testing_mode=args.test, cleanup_interval=args.cleanup_interval) 2178 elif sys.argv[1] == "loop": 2179 x_notification_loop() 2180 elif sys.argv[1] == "reply": 2181 reply_to_cameron_post() 2182 elif sys.argv[1] == "me": 2183 get_my_user_info() 2184 elif sys.argv[1] == "search": 2185 test_search_mentions() 2186 elif sys.argv[1] == "queue": 2187 test_fetch_and_queue() 2188 elif sys.argv[1] == "thread": 2189 test_thread_context() 2190 elif sys.argv[1] == "process": 2191 # Process all queued mentions with optional --test flag 2192 testing_mode = "--test" in sys.argv 2193 process_queue_only(testing_mode=testing_mode) 2194 elif sys.argv[1] == "letta": 2195 # Use specific agent ID if provided, otherwise use from config 2196 agent_id = sys.argv[2] if len(sys.argv) > 2 else None 2197 test_letta_integration(agent_id) 2198 elif sys.argv[1] == "downrank": 2199 # View or manage downrank list 2200 if len(sys.argv) > 2 and sys.argv[2] == "list": 2201 downrank_users = load_downrank_users() 2202 if downrank_users: 2203 print(f"📋 Downrank users ({len(downrank_users)} total):") 2204 for user_id in sorted(downrank_users): 2205 print(f" - {user_id}") 2206 else: 2207 print("📋 No downrank users configured") 2208 else: 2209 print("Usage: python x.py downrank list") 2210 print(" list - Show all downranked user IDs") 2211 print(f" Edit {X_DOWNRANK_USERS_FILE} to modify the list") 2212 else: 2213 print("Usage: python x.py [bot|loop|reply|me|search|queue|process|thread|letta|downrank]") 2214 print(" bot - Run the main X bot (use --test for testing mode)") 2215 print(" Example: python x.py bot --test") 2216 print(" queue - Fetch and queue mentions only (no processing)") 2217 print(" process - Process all queued mentions only (no fetching)") 2218 print(" Example: python x.py process --test") 2219 print(" downrank - Manage downrank users (10% response rate)") 2220 print(" Example: python x.py downrank list") 2221 print(" loop - Run the old notification monitoring loop (deprecated)") 2222 print(" reply - Reply to Cameron's specific post") 2223 print(" me - Get authenticated user info and correct user ID") 2224 print(" search - Test search-based mention detection") 2225 print(" thread - Test thread context retrieval from queued mention") 2226 print(" letta - Test sending thread context to Letta agent") 2227 print(" Optional: python x.py letta <agent-id>") 2228 else: 2229 test_x_client()