# a digital person for bluesky
# snapshot 2ef0ea18928e4e1b9dfdfd7a7af2683b50d6e259 — 1835 lines, 78 kB (view raw)
"""X (Twitter) API client for the void agent.

Fetches mentions, reconstructs conversation threads, queues mentions to disk
for async processing, caches thread context, and (with user-context auth)
posts replies. Mirrors the structure of the Bluesky client (see bsky_utils).
"""
import os
import logging
import requests
import yaml
import json
import hashlib
from typing import Optional, Dict, Any, List
from datetime import datetime
from pathlib import Path
from requests_oauthlib import OAuth1
from rich import print as rprint
from rich.panel import Panel
from rich.text import Text

import bsky_utils


# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("x_client")

# X-specific file paths (queue + cache live next to the process's CWD)
X_QUEUE_DIR = Path("x_queue")
X_CACHE_DIR = Path("x_cache")
X_PROCESSED_MENTIONS_FILE = Path("x_queue/processed_mentions.json")
X_LAST_SEEN_FILE = Path("x_queue/last_seen_id.json")

class XClient:
    """X (Twitter) API client for fetching mentions and managing interactions."""

    def __init__(self, api_key: str, user_id: str, access_token: Optional[str] = None,
                 consumer_key: Optional[str] = None, consumer_secret: Optional[str] = None,
                 access_token_secret: Optional[str] = None):
        """Select an auth strategy based on which credentials are supplied.

        Priority: full OAuth 1.0a keyset > OAuth 2.0 user access token >
        application-only bearer token (api_key). Only user-context auth
        (the first two) can post; app-only bearer is read-only.
        """
        self.api_key = api_key
        self.access_token = access_token
        self.user_id = user_id
        self.base_url = "https://api.x.com/2"

        # Check if we have OAuth 1.0a credentials
        if (consumer_key and consumer_secret and access_token and access_token_secret):
            # Use OAuth 1.0a for User Context
            self.oauth = OAuth1(
                consumer_key,
                client_secret=consumer_secret,
                resource_owner_key=access_token,
                resource_owner_secret=access_token_secret
            )
            self.headers = {"Content-Type": "application/json"}
            self.auth_method = "oauth1a"
            logger.info("Using OAuth 1.0a User Context authentication for X API")
        elif access_token:
            # Use OAuth 2.0 Bearer token for User Context
            self.oauth = None
            self.headers = {
                "Authorization": f"Bearer {access_token}",
                "Content-Type": "application/json"
            }
            self.auth_method = "oauth2_user"
            logger.info("Using OAuth 2.0 User Context access token for X API")
        else:
            # Use Application-Only Bearer token
            self.oauth = None
            self.headers = {
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json"
            }
            self.auth_method = "bearer"
            logger.info("Using Application-Only Bearer token for X API")

    def _make_request(self, endpoint: str, params: Optional[Dict] = None, method: str = "GET", data: Optional[Dict] = None) -> Optional[Dict]:
        """Make a request to the X API with proper error handling.

        Returns the decoded JSON body on success, or None on any failure
        (HTTP error, unsupported method, network/JSON error). Errors are
        logged, never raised to the caller.
        """
        url = f"{self.base_url}{endpoint}"

        try:
            if method.upper() == "GET":
                if self.oauth:
                    response = requests.get(url, headers=self.headers, params=params, auth=self.oauth)
                else:
                    response = requests.get(url, headers=self.headers, params=params)
            elif method.upper() == "POST":
                if self.oauth:
                    response = requests.post(url, headers=self.headers, json=data, auth=self.oauth)
                else:
                    response = requests.post(url, headers=self.headers, json=data)
            else:
                raise ValueError(f"Unsupported HTTP method: {method}")

            response.raise_for_status()
            return response.json()
        except requests.exceptions.HTTPError as e:
            # `response` is always bound here: HTTPError only comes from raise_for_status().
            if response.status_code == 401:
                logger.error(f"X API authentication failed with {self.auth_method} - check your credentials")
                logger.error(f"Response: {response.text}")
            elif response.status_code == 403:
                logger.error(f"X API forbidden with {self.auth_method} - check app permissions")
                logger.error(f"Response: {response.text}")
            elif response.status_code == 429:
                logger.error("X API rate limit exceeded")
                logger.error(f"Response: {response.text}")
            else:
                logger.error(f"X API request failed: {e}")
                logger.error(f"Response: {response.text}")
            return None
        except Exception as e:
            # Covers connection errors, JSON decode errors, and the ValueError above.
            logger.error(f"Unexpected error making X API request: {e}")
            return None

    def get_mentions(self, since_id: Optional[str] = None, max_results: int = 10) -> List[Dict]:
        """
        Fetch mentions for the configured user.

        Args:
            since_id: Minimum Post ID to include (for getting newer mentions)
            max_results: Number of results to return (5-100)

        Returns:
            List of mention objects (empty list if none found or the request failed)
        """
        endpoint = f"/users/{self.user_id}/mentions"
        params = {
            "max_results": min(max(max_results, 5), 100),  # Ensure within API limits
            "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets",
            "user.fields": "id,name,username",
            "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id"
        }

        if since_id:
            params["since_id"] = since_id

        logger.info(f"Fetching mentions for user {self.user_id}")
        response = self._make_request(endpoint, params)

        if response:
            logger.debug(f"X API response: {response}")

        if response and "data" in response:
            mentions = response["data"]
            logger.info(f"Retrieved {len(mentions)} mentions")
            return mentions
        else:
            if response:
                logger.info(f"No mentions in response. Full response: {response}")
            else:
                logger.warning("Request failed - no response received")
            return []

    def get_user_info(self, user_id: str) -> Optional[Dict]:
        """Get information about a specific user (or None on failure)."""
        endpoint = f"/users/{user_id}"
        params = {
            "user.fields": "id,name,username,description,public_metrics"
        }

        response = self._make_request(endpoint, params)
        return response.get("data") if response else None

    def search_mentions(self, username: str, max_results: int = 10, since_id: Optional[str] = None) -> List[Dict]:
        """
        Search for mentions using the search endpoint instead of mentions endpoint.
        This might have better rate limits than the direct mentions endpoint.

        Args:
            username: Username to search for mentions of (without @)
            max_results: Number of results to return (10-100)
            since_id: Only return results newer than this tweet ID

        Returns:
            List of tweets mentioning the username (empty list on failure)
        """
        endpoint = "/tweets/search/recent"

        # Search for mentions of the username
        query = f"@{username}"

        params = {
            "query": query,
            "max_results": min(max(max_results, 10), 100),
            "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
            "user.fields": "id,name,username",
            "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id"
        }

        if since_id:
            params["since_id"] = since_id

        logger.info(f"Searching for mentions of @{username}")
        response = self._make_request(endpoint, params)

        if response and "data" in response:
            tweets = response["data"]
            logger.info(f"Found {len(tweets)} mentions via search")
            return tweets
        else:
            if response:
                logger.info(f"No mentions found via search. Response: {response}")
            else:
                logger.warning("Search request failed")
            return []

    def get_thread_context(self, conversation_id: str, use_cache: bool = True, until_id: Optional[str] = None) -> Optional[Dict]:
        """
        Get all tweets in a conversation thread up to a specific tweet ID.

        Args:
            conversation_id: The conversation ID to fetch (should be the original tweet ID)
            use_cache: Whether to use cached data if available
            until_id: Optional tweet ID to use as upper bound (excludes posts after this ID)

        Returns:
            Dict with 'tweets' (chronologically ordered list) and 'users'
            (author_id -> user object) keys, or None if nothing was found.
        """
        # Check cache first if enabled
        if use_cache:
            cached_data = get_cached_thread_context(conversation_id)
            if cached_data:
                return cached_data

        # First, get the original tweet directly since it might not appear in conversation search
        original_tweet = None
        try:
            endpoint = f"/tweets/{conversation_id}"
            params = {
                "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
                "user.fields": "id,name,username",
                "expansions": "author_id"
            }
            response = self._make_request(endpoint, params)
            if response and "data" in response:
                original_tweet = response["data"]
                logger.info(f"Retrieved original tweet: {original_tweet.get('id')}")
        except Exception as e:
            logger.warning(f"Could not fetch original tweet {conversation_id}: {e}")

        # Then search for all tweets in this conversation
        endpoint = "/tweets/search/recent"
        params = {
            "query": f"conversation_id:{conversation_id}",
            "max_results": 100,  # Get as many as possible
            "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
            "user.fields": "id,name,username",
            "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id",
            "sort_order": "recency"  # Get newest first, we'll reverse later
        }

        # Add until_id parameter to exclude tweets after the mention being processed
        if until_id:
            params["until_id"] = until_id
            logger.info(f"Using until_id={until_id} to exclude future tweets")

        logger.info(f"Fetching thread context for conversation {conversation_id}")
        response = self._make_request(endpoint, params)

        tweets = []
        users_data = {}

        # Collect tweets from search
        if response and "data" in response:
            tweets.extend(response["data"])
            # Store user data for reference
            if "includes" in response and "users" in response["includes"]:
                for user in response["includes"]["users"]:
                    users_data[user["id"]] = user

        # Add original tweet if we got it and it's not already in the list
        if original_tweet:
            tweet_ids = [t.get('id') for t in tweets]
            if original_tweet.get('id') not in tweet_ids:
                tweets.append(original_tweet)
                logger.info("Added original tweet to thread context")

        # Attempt to fill gaps by fetching referenced tweets that are missing
        # This helps with X API's incomplete conversation search results
        tweet_ids = set(t.get('id') for t in tweets)
        missing_tweet_ids = set()

        # Collect all referenced tweet IDs that aren't in our current set
        for tweet in tweets:
            referenced_tweets = tweet.get('referenced_tweets', [])
            for ref in referenced_tweets:
                ref_id = ref.get('id')
                if ref_id and ref_id not in tweet_ids:
                    missing_tweet_ids.add(ref_id)

        # Fetch missing referenced tweets individually
        # NOTE(review): one API call per missing tweet — can be slow / rate-limit
        # heavy for long threads; acceptable because results are cached below.
        for missing_id in missing_tweet_ids:
            try:
                endpoint = f"/tweets/{missing_id}"
                params = {
                    "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
                    "user.fields": "id,name,username",
                    "expansions": "author_id"
                }
                response = self._make_request(endpoint, params)
                if response and "data" in response:
                    missing_tweet = response["data"]
                    # Only add if it's actually part of this conversation
                    if missing_tweet.get('conversation_id') == conversation_id:
                        tweets.append(missing_tweet)
                        tweet_ids.add(missing_id)
                        logger.info(f"Retrieved missing referenced tweet: {missing_id}")

                        # Also add user data if available
                        if "includes" in response and "users" in response["includes"]:
                            for user in response["includes"]["users"]:
                                users_data[user["id"]] = user
            except Exception as e:
                logger.warning(f"Could not fetch missing tweet {missing_id}: {e}")

        if tweets:
            # Filter out tweets that occur after until_id (if specified)
            if until_id:
                original_count = len(tweets)
                # Convert until_id to int for comparison (Twitter IDs are sequential)
                until_id_int = int(until_id)
                tweets = [t for t in tweets if int(t.get('id', '0')) <= until_id_int]
                filtered_count = len(tweets)
                if original_count != filtered_count:
                    logger.info(f"Filtered out {original_count - filtered_count} tweets after until_id {until_id}")

            # Sort chronologically (oldest first)
            tweets.sort(key=lambda x: x.get('created_at', ''))
            logger.info(f"Retrieved {len(tweets)} tweets in thread")

            thread_data = {"tweets": tweets, "users": users_data}

            # Cache the result
            if use_cache:
                save_cached_thread_context(conversation_id, thread_data)

            return thread_data
        else:
            logger.warning("No tweets found for thread context")
            return None

    def post_reply(self, reply_text: str, in_reply_to_tweet_id: str) -> Optional[Dict]:
        """
        Post a reply to a specific tweet.

        Requires user-context authentication (OAuth 1.0a or OAuth 2.0 user
        token); application-only bearer auth cannot post.

        Args:
            reply_text: The text content of the reply
            in_reply_to_tweet_id: The ID of the tweet to reply to

        Returns:
            Response data if successful, None if failed
        """
        endpoint = "/tweets"

        payload = {
            "text": reply_text,
            "reply": {
                "in_reply_to_tweet_id": in_reply_to_tweet_id
            }
        }

        logger.info(f"Attempting to post reply with {self.auth_method} authentication")
        result = self._make_request(endpoint, method="POST", data=payload)

        if result:
            logger.info(f"Successfully posted reply to tweet {in_reply_to_tweet_id}")
            return result
        else:
            logger.error("Failed to post reply")
            return None

def load_x_config(config_path: str = "config.yaml") -> Dict[str, str]:
    """Load the `x:` section from the YAML config file.

    Raises:
        ValueError: if `api_key` or `user_id` is missing from the section.
        Exception: re-raises any file/YAML error after logging it.
    """
    try:
        with open(config_path, 'r') as f:
            config = yaml.safe_load(f)

        x_config = config.get('x', {})
        if not x_config.get('api_key') or not x_config.get('user_id'):
            raise ValueError("X API key and user_id must be configured in config.yaml")

        return x_config
    except Exception as e:
        logger.error(f"Failed to load X configuration: {e}")
        raise

def create_x_client(config_path: str = "config.yaml") -> XClient:
    """Create and return an X client with configuration loaded from file."""
    config = load_x_config(config_path)
    return XClient(
        api_key=config['api_key'],
        user_id=config['user_id'],
        access_token=config.get('access_token'),
        consumer_key=config.get('consumer_key'),
        consumer_secret=config.get('consumer_secret'),
        access_token_secret=config.get('access_token_secret')
    )

def mention_to_yaml_string(mention: Dict, users_data: Optional[Dict] = None) -> str:
    """
    Convert a mention object to a YAML string for better AI comprehension.
    Similar to thread_to_yaml_string in bsky_utils.py
    """
    # Extract relevant fields
    simplified_mention = {
        'id': mention.get('id'),
        'text': mention.get('text'),
        'author_id': mention.get('author_id'),
        'created_at': mention.get('created_at'),
        'in_reply_to_user_id': mention.get('in_reply_to_user_id')
    }

    # Add user information if available
    if users_data and mention.get('author_id') in users_data:
        user = users_data[mention.get('author_id')]
        simplified_mention['author'] = {
            'username': user.get('username'),
            'name': user.get('name')
        }

    return yaml.dump(simplified_mention, default_flow_style=False, sort_keys=False)

def thread_to_yaml_string(thread_data: Dict) -> str:
    """
    Convert X thread context to YAML string for AI comprehension.
    Similar to Bluesky's thread_to_yaml_string function.

    Args:
        thread_data: Dict with 'tweets' and 'users' keys from get_thread_context()

    Returns:
        YAML string representation of the thread
    """
    if not thread_data or "tweets" not in thread_data:
        return "conversation: []\n"

    tweets = thread_data["tweets"]
    users_data = thread_data.get("users", {})

    simplified_thread = {
        "conversation": []
    }

    for tweet in tweets:
        # Get user info
        author_id = tweet.get('author_id')
        author_info = {}
        if author_id and author_id in users_data:
            user = users_data[author_id]
            author_info = {
                'username': user.get('username'),
                'name': user.get('name')
            }

        # Build tweet object (simplified for AI consumption)
        tweet_obj = {
            'text': tweet.get('text'),
            'created_at': tweet.get('created_at'),
            'author': author_info,
            'author_id': author_id  # Include user ID for block management
        }

        simplified_thread["conversation"].append(tweet_obj)

    return yaml.dump(simplified_thread, default_flow_style=False, sort_keys=False)


def ensure_x_user_blocks_attached(thread_data: Dict, agent_id: Optional[str] = None) -> None:
    """
    Ensure all users in the thread have their X user blocks attached.
    Creates blocks with initial content including their handle if they don't exist.

    Best-effort: all errors are logged and swallowed so a Letta/blocks failure
    never aborts mention processing.

    Args:
        thread_data: Dict with 'tweets' and 'users' keys from get_thread_context()
        agent_id: The Letta agent ID to attach blocks to (defaults to config agent_id)
    """
    if not thread_data or "users" not in thread_data:
        return

    try:
        # Imported lazily so the module works without the Letta toolchain installed.
        from tools.blocks import attach_x_user_blocks, x_user_note_set
        from config_loader import get_letta_config
        from letta_client import Letta

        # Get Letta client and agent_id from config
        config = get_letta_config()
        client = Letta(token=config['api_key'], timeout=config['timeout'])

        # Use provided agent_id or get from config
        if agent_id is None:
            agent_id = config['agent_id']

        # Get agent info to create a mock agent_state for the functions
        # (the blocks helpers only need an object exposing `.id`)
        class MockAgentState:
            def __init__(self, agent_id):
                self.id = agent_id

        agent_state = MockAgentState(agent_id)

        users_data = thread_data["users"]
        user_ids = list(users_data.keys())

        if not user_ids:
            return

        logger.info(f"Ensuring X user blocks for {len(user_ids)} users: {user_ids}")

        # Get current blocks to check which users already have blocks with content
        current_blocks = client.agents.blocks.list(agent_id=agent_id)
        existing_user_blocks = {}

        for block in current_blocks:
            if block.label.startswith("x_user_"):
                user_id = block.label.replace("x_user_", "")
                existing_user_blocks[user_id] = block

        # Attach all user blocks (this will create missing ones with basic content)
        attach_result = attach_x_user_blocks(user_ids, agent_state)
        logger.info(f"X user block attachment result: {attach_result}")

        # For newly created blocks, update with user handle information
        for user_id in user_ids:
            if user_id not in existing_user_blocks:
                user_info = users_data[user_id]
                username = user_info.get('username', 'unknown')
                name = user_info.get('name', 'Unknown')

                # Set initial content with handle information
                initial_content = f"# X User: {user_id}\n\n**Handle:** @{username}\n**Name:** {name}\n\nNo additional information about this user yet."

                try:
                    x_user_note_set(user_id, initial_content, agent_state)
                    logger.info(f"Set initial content for X user {user_id} (@{username})")
                except Exception as e:
                    logger.error(f"Failed to set initial content for X user {user_id}: {e}")

    except Exception as e:
        logger.error(f"Error ensuring X user blocks: {e}")


# X Caching and Queue System Functions

def load_last_seen_id() -> Optional[str]:
    """Load the last seen mention ID for incremental fetching."""
    if X_LAST_SEEN_FILE.exists():
        try:
            with open(X_LAST_SEEN_FILE, 'r') as f:
                data = json.load(f)
                return data.get('last_seen_id')
        except Exception as e:
            logger.error(f"Error loading last seen ID: {e}")
    return None

def save_last_seen_id(mention_id: str) -> None:
    """Save the last seen mention ID (with a timestamp for debugging)."""
    try:
        X_QUEUE_DIR.mkdir(exist_ok=True)
        with open(X_LAST_SEEN_FILE, 'w') as f:
            json.dump({
                'last_seen_id': mention_id,
                'updated_at': datetime.now().isoformat()
            }, f)
        logger.debug(f"Saved last seen ID: {mention_id}")
    except Exception as e:
        logger.error(f"Error saving last seen ID: {e}")

def load_processed_mentions() -> set:
    """Load the set of processed mention IDs."""
    if X_PROCESSED_MENTIONS_FILE.exists():
        try:
            with open(X_PROCESSED_MENTIONS_FILE, 'r') as f:
                data = json.load(f)
                # Keep only recent entries (last 10000)
                # NOTE(review): JSON order comes from list(processed_set), so
                # "last 10000" is not strictly chronological — confirm intent.
                if len(data) > 10000:
                    data = data[-10000:]
                    save_processed_mentions(set(data))
                return set(data)
        except Exception as e:
            logger.error(f"Error loading processed mentions: {e}")
    return set()

def save_processed_mentions(processed_set: set) -> None:
    """Save the set of processed mention IDs."""
    try:
        X_QUEUE_DIR.mkdir(exist_ok=True)
        with open(X_PROCESSED_MENTIONS_FILE, 'w') as f:
            json.dump(list(processed_set), f)
    except Exception as e:
        logger.error(f"Error saving processed mentions: {e}")

def save_mention_to_queue(mention: Dict) -> None:
    """Save a mention to the queue directory for async processing.

    Skips mentions without an ID and mentions already recorded as processed.
    The queue filename is derived from a SHA-256 hash of the mention JSON, so
    re-queueing the same payload overwrites rather than duplicates.
    """
    try:
        mention_id = mention.get('id')
        if not mention_id:
            logger.error("Mention missing ID, cannot queue")
            return

        # Check if already processed
        processed_mentions = load_processed_mentions()
        if mention_id in processed_mentions:
            logger.debug(f"Mention {mention_id} already processed, skipping")
            return

        # Create queue directory
        X_QUEUE_DIR.mkdir(exist_ok=True)

        # Create filename using hash (similar to Bluesky system)
        mention_str = json.dumps(mention, sort_keys=True)
        mention_hash = hashlib.sha256(mention_str.encode()).hexdigest()[:16]
        filename = f"x_mention_{mention_hash}.json"

        queue_file = X_QUEUE_DIR / filename

        # Save mention data with enhanced debugging information
        mention_data = {
            'mention': mention,
            'queued_at': datetime.now().isoformat(),
            'type': 'x_mention',
            # Debug info for conversation tracking
            'debug_info': {
                'mention_id': mention.get('id'),
                'author_id': mention.get('author_id'),
                'conversation_id': mention.get('conversation_id'),
                'in_reply_to_user_id': mention.get('in_reply_to_user_id'),
                'referenced_tweets': mention.get('referenced_tweets', []),
                'text_preview': mention.get('text', '')[:200],
                'created_at': mention.get('created_at'),
                'public_metrics': mention.get('public_metrics', {}),
                'context_annotations': mention.get('context_annotations', [])
            }
        }

        with open(queue_file, 'w') as f:
            json.dump(mention_data, f, indent=2)

        # NOTE(review): "(unknown)" looks like a lost interpolation — this log
        # line presumably intended to include {queue_file}; confirm upstream.
        logger.info(f"Queued X mention {mention_id} -> (unknown)")

    except Exception as e:
        logger.error(f"Error saving mention to queue: {e}")

# X Cache Functions
def get_cached_thread_context(conversation_id: str) -> Optional[Dict]:
    """Load cached thread context if available and less than 1 hour old."""
    cache_file = X_CACHE_DIR / f"thread_{conversation_id}.json"
    if cache_file.exists():
        try:
            with open(cache_file, 'r') as f:
                cached_data = json.load(f)
                # Check if cache is recent (within 1 hour)
                from datetime import datetime, timedelta
                cached_time = datetime.fromisoformat(cached_data.get('cached_at', ''))
                if datetime.now() - cached_time < timedelta(hours=1):
                    logger.info(f"Using cached thread context for {conversation_id}")
                    return cached_data.get('thread_data')
        except Exception as e:
            logger.warning(f"Error loading cached thread context: {e}")
    return None

def save_cached_thread_context(conversation_id: str, thread_data: Dict) -> None:
    """Save thread context to cache (timestamped for the 1-hour expiry check)."""
    try:
        X_CACHE_DIR.mkdir(exist_ok=True)
        cache_file = X_CACHE_DIR / f"thread_{conversation_id}.json"

        cache_data = {
            'conversation_id': conversation_id,
            'thread_data': thread_data,
            'cached_at': datetime.now().isoformat()
        }

        with open(cache_file, 'w') as f:
            json.dump(cache_data, f, indent=2)

        logger.debug(f"Cached thread context for {conversation_id}")
    except Exception as e:
        logger.error(f"Error caching thread context: {e}")

def fetch_and_queue_mentions(username: str) -> int:
    """
    Single-pass function to fetch new mentions and queue them.
    Returns number of new mentions found.
    """
    try:
        client = create_x_client()

        # Load last seen ID for incremental fetching
        last_seen_id = load_last_seen_id()

        logger.info(f"Fetching mentions for @{username} since {last_seen_id or 'beginning'}")

        # Search for mentions
        mentions = client.search_mentions(
            username=username,
            since_id=last_seen_id,
            max_results=100  # Get as many as possible
        )

        if not mentions:
            logger.info("No new mentions found")
            return 0

        # Process mentions (newest first, so reverse to process oldest first)
        mentions.reverse()
        new_count = 0

        for mention in mentions:
            save_mention_to_queue(mention)
            new_count += 1

        # Update last seen ID to the most recent mention
        if mentions:
            most_recent_id = mentions[-1]['id']  # Last after reverse = most recent
            save_last_seen_id(most_recent_id)

        logger.info(f"Queued {new_count} new X mentions")
        return new_count

    except Exception as e:
        logger.error(f"Error fetching and queuing mentions: {e}")
        return 0

# Simple test function
def get_my_user_info():
    """Get the authenticated user's information to find correct user ID.

    CLI helper: prints the /2/users/me result and the config.yaml snippet to
    copy. Returns the user data dict, or None on failure.
    """
    try:
        client = create_x_client()

        # Use the /2/users/me endpoint to get authenticated user info
        endpoint = "/users/me"
        params = {
            "user.fields": "id,name,username,description"
        }

        print("Fetching authenticated user information...")
        response = client._make_request(endpoint, params=params)

        if response and "data" in response:
            user_data = response["data"]
            print(f"✅ Found authenticated user:")
            print(f"   ID: {user_data.get('id')}")
            print(f"   Username: @{user_data.get('username')}")
            print(f"   Name: {user_data.get('name')}")
            print(f"   Description: {user_data.get('description', 'N/A')[:100]}...")
            print(f"\n🔧 Update your config.yaml with:")
            print(f"   user_id: \"{user_data.get('id')}\"")
            return user_data
        else:
            print("❌ Failed to get user information")
            print(f"Response: {response}")
            return None

    except Exception as e:
        print(f"Error getting user info: {e}")
        return None

def test_search_mentions():
    """Test the search-based mention detection."""
    try:
        client = create_x_client()

        # First get our username
        user_info = client._make_request("/users/me", params={"user.fields": "username"})
        if not user_info or "data" not in user_info:
            print("❌ Could not get username")
            return

        username = user_info["data"]["username"]
        print(f"🔍 Searching for mentions of @{username}")

        mentions = client.search_mentions(username, max_results=5)

        if mentions:
            print(f"✅ Found {len(mentions)} mentions via search:")
            for mention in mentions:
                print(f"- {mention.get('id')}: {mention.get('text', '')[:100]}...")
        else:
            print("No mentions found via search")

    except Exception as e:
        print(f"Search test failed: {e}")

def test_fetch_and_queue():
    """Test the single-pass fetch and queue function."""
    try:
        client = create_x_client()

        # Get our username
        user_info = client._make_request("/users/me", params={"user.fields": "username"})
        if not user_info or "data" not in user_info:
            print("❌ Could not get username")
            return

        username = user_info["data"]["username"]
        print(f"🔄 Fetching and queueing mentions for @{username}")

        # Show current state
        last_seen = load_last_seen_id()
        print(f"📍 Last seen ID: {last_seen or 'None (first run)'}")

        # Fetch and queue
        new_count = fetch_and_queue_mentions(username)

        if new_count > 0:
            print(f"✅ Queued {new_count} new mentions")
            print(f"📁 Check ./x_queue/ directory for queued mentions")

            # Show updated state
            new_last_seen = load_last_seen_id()
            print(f"📍 Updated last seen ID: {new_last_seen}")
        else:
            print("ℹ️ No new mentions to queue")

    except Exception as e:
        print(f"Fetch and queue test failed: {e}")

def test_thread_context():
    """Test thread context retrieval from a queued mention."""
    try:
        import json

        # Find a queued mention file
        queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json"))
        if not queue_files:
            print("❌ No queued mentions found. Run 'python x.py queue' first.")
            return

        # Read the first mention
        mention_file = queue_files[0]
        with open(mention_file, 'r') as f:
            mention_data = json.load(f)

        mention = mention_data['mention']
        print(f"📄 Using mention: {mention.get('id')}")
        print(f"📝 Text: {mention.get('text')}")

        # Check if it has a conversation_id
        conversation_id = mention.get('conversation_id')
        if not conversation_id:
            print("❌ No conversation_id found in mention. May need to re-queue with updated fetch.")
            return

        print(f"🧵 Getting thread context for conversation: {conversation_id}")

        # Get thread context
        client = create_x_client()
        thread_data = client.get_thread_context(conversation_id)

        if thread_data:
            tweets = thread_data.get('tweets', [])
            print(f"✅ Retrieved thread with {len(tweets)} tweets")

            # Convert to YAML
            yaml_thread = thread_to_yaml_string(thread_data)

            # Save thread context for inspection
            thread_file = X_QUEUE_DIR / f"thread_context_{conversation_id}.yaml"
            with open(thread_file, 'w') as f:
                f.write(yaml_thread)

            print(f"💾 Saved thread context to: {thread_file}")
            print("\n📋 Thread preview:")
            print(yaml_thread)
        else:
            print("❌ Failed to retrieve thread context")

    except Exception as e:
        print(f"Thread context test failed: {e}")

def test_letta_integration(agent_id: Optional[str] = None):
    """Test sending X thread context to Letta agent.

    Resolves agent_id/api_key from config.yaml (with LETTA_API_KEY env
    fallback), loads the first queued mention, builds a thread-context
    prompt, and streams the agent's response.
    """
    try:
        from letta_client import Letta
        import json
        import yaml

        # Load full config to access letta section
        try:
            with open("config.yaml", 'r') as f:
                full_config = yaml.safe_load(f)

            letta_config = full_config.get('letta', {})
            api_key = letta_config.get('api_key')
            config_agent_id = letta_config.get('agent_id')

            # Use agent_id from config if not provided as parameter
            if not agent_id:
                if config_agent_id:
                    agent_id = config_agent_id
                    print(f"ℹ️ Using agent_id from config: {agent_id}")
                else:
                    print("❌ No agent_id found in config.yaml")
                    print("Expected config structure:")
                    print("  letta:")
                    print("    agent_id: your-agent-id")
                    return
            else:
                print(f"ℹ️ Using provided agent_id: {agent_id}")

            if not api_key:
                # Try loading from environment as fallback
                import os
                api_key = os.getenv('LETTA_API_KEY')
                if not api_key:
                    print("❌ LETTA_API_KEY not found in config.yaml or environment")
                    print("Expected config structure:")
                    print("  letta:")
                    print("    api_key: your-letta-api-key")
                    return
                else:
                    print("ℹ️ Using LETTA_API_KEY from environment")
            else:
                print("ℹ️ Using LETTA_API_KEY from config.yaml")

        except Exception as e:
            print(f"❌ Error loading config: {e}")
            return

        letta_client = Letta(token=api_key, timeout=600)
        print(f"🤖 Connected to Letta, using agent: {agent_id}")

        # Find a queued mention file
        queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json"))
        if not queue_files:
            print("❌ No queued mentions found. Run 'python x.py queue' first.")
            return

        # Read the first mention
        mention_file = queue_files[0]
        with open(mention_file, 'r') as f:
            mention_data = json.load(f)

        mention = mention_data['mention']
        conversation_id = mention.get('conversation_id')

        if not conversation_id:
            print("❌ No conversation_id found in mention.")
            return

        print(f"🧵 Getting thread context for conversation: {conversation_id}")

        # Get thread context
        x_client = create_x_client()
        thread_data = x_client.get_thread_context(conversation_id)

        if not thread_data:
            print("❌ Failed to retrieve thread context")
            return

        # Convert to YAML
        yaml_thread = thread_to_yaml_string(thread_data)

        # Create prompt for the agent
        prompt = f"""You are void, an AI agent operating on X (Twitter). You have received a mention and need to respond appropriately.

Here is the thread context:

{yaml_thread}

Please craft a response that continues this conversation naturally. Keep responses conversational and authentic to your void persona."""

        print(f"📤 Sending thread context to Letta agent...")

        # Print the prompt in a rich panel
        rprint(Panel(prompt, title="Prompt", border_style="blue"))

        # Send to Letta agent using streaming
        message_stream = letta_client.agents.messages.create_stream(
            agent_id=agent_id,
            messages=[{"role": "user", "content": prompt}],
            stream_tokens=False,
            max_steps=10
        )

        print("🔄 Streaming response from agent...")
        response_text = ""

        for chunk in message_stream:
            print(chunk)
            if hasattr(chunk, 'message_type'):
                if chunk.message_type == 'assistant_message':
                    print(f"🤖 Agent response: {chunk.content}")
                    response_text = chunk.content
                elif chunk.message_type == 'reasoning_message':
                    print(f"💭 Agent reasoning: {chunk.reasoning[:100]}...")
                elif chunk.message_type == 'tool_call_message':
                    print(f"🔧 Agent tool call: {chunk.tool_call.name}")

        if response_text:
            print(f"\n✅ Agent generated response:")
            print(f"📝 Response: {response_text}")
        else:
            print("❌ No response generated by agent")

    except Exception as e:
        print(f"Letta integration test failed: {e}")
        import traceback
        traceback.print_exc()

def test_x_client():
    """Test the X client by fetching mentions."""
    try:
        client = create_x_client()
        mentions = client.get_mentions(max_results=5)

        if mentions:
            print(f"Successfully retrieved {len(mentions)} mentions:")
            for mention in mentions:
                print(f"- {mention.get('id')}: {mention.get('text')[:50]}...")
        else:
            print("No mentions retrieved")

    except Exception as e:
        print(f"Test failed: {e}")

def reply_to_cameron_post():
    """
    Reply to Cameron's specific X post.

    NOTE: This requires OAuth User Context authentication, not Bearer token.
    Current Bearer token is Application-Only which can't post.
1017 """ 1018 try: 1019 client = create_x_client() 1020 1021 # Cameron's post ID from the URL: https://x.com/cameron_pfiffer/status/1950690566909710618 1022 cameron_post_id = "1950690566909710618" 1023 1024 # Simple reply message 1025 reply_text = "Hello from void! 🤖 Testing X integration." 1026 1027 print(f"Attempting to reply to post {cameron_post_id}") 1028 print(f"Reply text: {reply_text}") 1029 print("\nNOTE: This will fail with current Bearer token (Application-Only)") 1030 print("Posting requires OAuth User Context authentication") 1031 1032 result = client.post_reply(reply_text, cameron_post_id) 1033 1034 if result: 1035 print(f"✅ Successfully posted reply!") 1036 print(f"Reply ID: {result.get('data', {}).get('id', 'Unknown')}") 1037 else: 1038 print("❌ Failed to post reply (expected with current auth)") 1039 1040 except Exception as e: 1041 print(f"Reply failed: {e}") 1042 1043def process_x_mention(void_agent, x_client, mention_data, queue_filepath=None, testing_mode=False): 1044 """ 1045 Process an X mention and generate a reply using the Letta agent. 1046 Similar to bsky.py process_mention but for X/Twitter. 
1047 1048 Args: 1049 void_agent: The Letta agent instance 1050 x_client: The X API client 1051 mention_data: The mention data dictionary 1052 queue_filepath: Optional Path object to the queue file (for cleanup on halt) 1053 testing_mode: If True, don't actually post to X 1054 1055 Returns: 1056 True: Successfully processed, remove from queue 1057 False: Failed but retryable, keep in queue 1058 None: Failed with non-retryable error, move to errors directory 1059 "no_reply": No reply was generated, move to no_reply directory 1060 """ 1061 try: 1062 logger.debug(f"Starting process_x_mention with mention_data type: {type(mention_data)}") 1063 1064 # Extract mention details 1065 if isinstance(mention_data, dict): 1066 # Handle both raw mention and queued mention formats 1067 if 'mention' in mention_data: 1068 mention = mention_data['mention'] 1069 else: 1070 mention = mention_data 1071 else: 1072 mention = mention_data 1073 1074 mention_id = mention.get('id') 1075 mention_text = mention.get('text', '') 1076 author_id = mention.get('author_id') 1077 conversation_id = mention.get('conversation_id') 1078 in_reply_to_user_id = mention.get('in_reply_to_user_id') 1079 referenced_tweets = mention.get('referenced_tweets', []) 1080 1081 # Enhanced conversation tracking for debug - especially important for Grok handling 1082 logger.info(f"🔍 CONVERSATION DEBUG - Mention ID: {mention_id}") 1083 logger.info(f" Author ID: {author_id}") 1084 logger.info(f" Conversation ID: {conversation_id}") 1085 logger.info(f" In Reply To User ID: {in_reply_to_user_id}") 1086 logger.info(f" Referenced Tweets: {len(referenced_tweets)} items") 1087 for i, ref in enumerate(referenced_tweets[:3]): # Log first 3 referenced tweets 1088 logger.info(f" Reference {i+1}: {ref.get('type')} -> {ref.get('id')}") 1089 logger.info(f" Text preview: {mention_text[:100]}...") 1090 1091 if not conversation_id: 1092 logger.warning(f"❌ No conversation_id found for mention {mention_id} - this may cause thread context 
issues") 1093 return None 1094 1095 # Get thread context (disable cache for missing context issues) 1096 # Use mention_id as until_id to exclude tweets that occurred after this mention 1097 try: 1098 thread_data = x_client.get_thread_context(conversation_id, use_cache=False, until_id=mention_id) 1099 if not thread_data: 1100 logger.error(f"❌ Failed to get thread context for conversation {conversation_id}") 1101 return False 1102 1103 # If this mention references a specific tweet, ensure we have that tweet in context 1104 if referenced_tweets: 1105 for ref in referenced_tweets: 1106 if ref.get('type') == 'replied_to': 1107 ref_id = ref.get('id') 1108 # Check if the referenced tweet is in our thread data 1109 thread_tweet_ids = [t.get('id') for t in thread_data.get('tweets', [])] 1110 if ref_id and ref_id not in thread_tweet_ids: 1111 logger.warning(f"Missing referenced tweet {ref_id} in thread context, attempting to fetch") 1112 try: 1113 # Fetch the missing referenced tweet directly 1114 endpoint = f"/tweets/{ref_id}" 1115 params = { 1116 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id", 1117 "user.fields": "id,name,username", 1118 "expansions": "author_id" 1119 } 1120 response = x_client._make_request(endpoint, params) 1121 if response and "data" in response: 1122 missing_tweet = response["data"] 1123 if missing_tweet.get('conversation_id') == conversation_id: 1124 # Add to thread data 1125 if 'tweets' not in thread_data: 1126 thread_data['tweets'] = [] 1127 thread_data['tweets'].append(missing_tweet) 1128 1129 # Add user data if available 1130 if "includes" in response and "users" in response["includes"]: 1131 if 'users' not in thread_data: 1132 thread_data['users'] = {} 1133 for user in response["includes"]["users"]: 1134 thread_data['users'][user["id"]] = user 1135 1136 logger.info(f"✅ Added missing referenced tweet {ref_id} to thread context") 1137 else: 1138 logger.warning(f"Referenced tweet {ref_id} belongs 
to different conversation {missing_tweet.get('conversation_id')}") 1139 except Exception as e: 1140 logger.error(f"Failed to fetch referenced tweet {ref_id}: {e}") 1141 1142 # Enhanced thread context debugging 1143 logger.info(f"🧵 THREAD CONTEXT DEBUG - Conversation ID: {conversation_id}") 1144 thread_posts = thread_data.get('tweets', []) 1145 thread_users = thread_data.get('users', {}) 1146 logger.info(f" Posts in thread: {len(thread_posts)}") 1147 logger.info(f" Users in thread: {len(thread_users)}") 1148 1149 # Log thread participants for Grok detection 1150 for user_id, user_info in thread_users.items(): 1151 username = user_info.get('username', 'unknown') 1152 name = user_info.get('name', 'Unknown') 1153 is_verified = user_info.get('verified', False) 1154 logger.info(f" User {user_id}: @{username} ({name}) verified={is_verified}") 1155 1156 # Special logging for Grok or AI-related users 1157 if 'grok' in username.lower() or 'grok' in name.lower(): 1158 logger.info(f" 🤖 DETECTED GROK USER: @{username} ({name})") 1159 1160 # Log conversation structure 1161 for i, post in enumerate(thread_posts[:5]): # Log first 5 posts 1162 post_id = post.get('id') 1163 post_author = post.get('author_id') 1164 post_text = post.get('text', '')[:50] 1165 is_reply = 'in_reply_to_user_id' in post 1166 logger.info(f" Post {i+1}: {post_id} by {post_author} (reply={is_reply}) - {post_text}...") 1167 1168 except Exception as e: 1169 logger.error(f"❌ Error getting thread context: {e}") 1170 return False 1171 1172 # Convert to YAML string 1173 thread_context = thread_to_yaml_string(thread_data) 1174 logger.info(f"📄 Thread context generated, length: {len(thread_context)} characters") 1175 1176 # Save comprehensive conversation data for debugging 1177 try: 1178 debug_dir = X_QUEUE_DIR / "debug" / f"conversation_{conversation_id}" 1179 debug_dir.mkdir(parents=True, exist_ok=True) 1180 1181 # Save raw thread data (JSON) 1182 with open(debug_dir / f"thread_data_{mention_id}.json", 'w') as f: 
1183 json.dump(thread_data, f, indent=2) 1184 1185 # Save YAML thread context 1186 with open(debug_dir / f"thread_context_{mention_id}.yaml", 'w') as f: 1187 f.write(thread_context) 1188 1189 # Save mention processing debug info 1190 debug_info = { 1191 'processed_at': datetime.now().isoformat(), 1192 'mention_id': mention_id, 1193 'conversation_id': conversation_id, 1194 'author_id': author_id, 1195 'in_reply_to_user_id': in_reply_to_user_id, 1196 'referenced_tweets': referenced_tweets, 1197 'thread_stats': { 1198 'total_posts': len(thread_posts), 1199 'total_users': len(thread_users), 1200 'yaml_length': len(thread_context) 1201 }, 1202 'users_in_conversation': { 1203 user_id: { 1204 'username': user_info.get('username'), 1205 'name': user_info.get('name'), 1206 'verified': user_info.get('verified', False), 1207 'is_grok': 'grok' in user_info.get('username', '').lower() or 'grok' in user_info.get('name', '').lower() 1208 } 1209 for user_id, user_info in thread_users.items() 1210 } 1211 } 1212 1213 with open(debug_dir / f"debug_info_{mention_id}.json", 'w') as f: 1214 json.dump(debug_info, f, indent=2) 1215 1216 logger.info(f"💾 Saved conversation debug data to: {debug_dir}") 1217 1218 except Exception as debug_error: 1219 logger.warning(f"Failed to save debug data: {debug_error}") 1220 # Continue processing even if debug save fails 1221 1222 # Check for #voidstop 1223 if "#voidstop" in thread_context.lower() or "#voidstop" in mention_text.lower(): 1224 logger.info("Found #voidstop, skipping this mention") 1225 return True 1226 1227 # Ensure X user blocks are attached 1228 try: 1229 ensure_x_user_blocks_attached(thread_data, void_agent.id) 1230 except Exception as e: 1231 logger.warning(f"Failed to ensure X user blocks: {e}") 1232 # Continue without user blocks rather than failing completely 1233 1234 # Create prompt for Letta agent 1235 author_info = thread_data.get('users', {}).get(author_id, {}) 1236 author_username = author_info.get('username', 'unknown') 1237 
author_name = author_info.get('name', author_username) 1238 1239 prompt = f"""You received a mention on X (Twitter) from @{author_username} ({author_name}). 1240 1241MOST RECENT POST (the mention you're responding to): 1242"{mention_text}" 1243 1244FULL THREAD CONTEXT: 1245```yaml 1246{thread_context} 1247``` 1248 1249The YAML above shows the complete conversation thread. The most recent post is the one mentioned above that you should respond to, but use the full thread context to understand the conversation flow. 1250 1251If you need to update user information, use the x_user_* tools. 1252 1253To reply, use the add_post_to_x_thread tool: 1254- Each call creates one post (max 280 characters) 1255- For most responses, a single call is sufficient 1256- Only use multiple calls for threaded replies when: 1257 * The topic requires extended explanation that cannot fit in 280 characters 1258 * You're explicitly asked for a detailed/long response 1259 * The conversation naturally benefits from a structured multi-part answer 1260- Avoid unnecessary threads - be concise when possible""" 1261 1262 # Log mention processing 1263 title = f"X MENTION FROM @{author_username}" 1264 print(f"\n{title}") 1265 print(f" {'' * len(title)}") 1266 for line in mention_text.split('\n'): 1267 print(f" {line}") 1268 1269 # Send to Letta agent 1270 from config_loader import get_letta_config 1271 from letta_client import Letta 1272 1273 config = get_letta_config() 1274 letta_client = Letta(token=config['api_key'], timeout=config['timeout']) 1275 1276 logger.debug(f"Sending to LLM: @{author_username} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars") 1277 1278 try: 1279 # Use streaming to avoid timeout errors 1280 message_stream = letta_client.agents.messages.create_stream( 1281 agent_id=void_agent.id, 1282 messages=[{"role": "user", "content": prompt}], 1283 stream_tokens=False, 1284 max_steps=100 1285 ) 1286 1287 # Collect streaming response (simplified version 
of bsky.py logic) 1288 all_messages = [] 1289 for chunk in message_stream: 1290 if hasattr(chunk, 'message_type'): 1291 if chunk.message_type == 'reasoning_message': 1292 print("\n◆ Reasoning") 1293 print(" ─────────") 1294 for line in chunk.reasoning.split('\n'): 1295 print(f" {line}") 1296 elif chunk.message_type == 'tool_call_message': 1297 tool_name = chunk.tool_call.name 1298 if tool_name == 'add_post_to_x_thread': 1299 try: 1300 args = json.loads(chunk.tool_call.arguments) 1301 text = args.get('text', '') 1302 if text: 1303 print("\n✎ X Post") 1304 print(" ────────") 1305 for line in text.split('\n'): 1306 print(f" {line}") 1307 except: 1308 pass 1309 elif tool_name == 'halt_activity': 1310 logger.info("🛑 HALT_ACTIVITY TOOL CALLED - TERMINATING X BOT") 1311 if queue_filepath and queue_filepath.exists(): 1312 queue_filepath.unlink() 1313 logger.info(f"Deleted queue file: {queue_filepath.name}") 1314 logger.info("=== X BOT TERMINATED BY AGENT ===") 1315 exit(0) 1316 elif chunk.message_type == 'tool_return_message': 1317 tool_name = chunk.name 1318 status = chunk.status 1319 if status == 'success' and tool_name == 'add_post_to_x_thread': 1320 print("\n✓ X Post Queued") 1321 print(" ──────────────") 1322 print(" Post queued successfully") 1323 elif chunk.message_type == 'assistant_message': 1324 print("\n▶ Assistant Response") 1325 print(" ──────────────────") 1326 for line in chunk.content.split('\n'): 1327 print(f" {line}") 1328 1329 all_messages.append(chunk) 1330 if str(chunk) == 'done': 1331 break 1332 1333 # Convert streaming response for compatibility 1334 message_response = type('StreamingResponse', (), { 1335 'messages': [msg for msg in all_messages if hasattr(msg, 'message_type')] 1336 })() 1337 1338 except Exception as api_error: 1339 logger.error(f"Letta API error: {api_error}") 1340 raise 1341 1342 # Extract successful add_post_to_x_thread tool calls 1343 reply_candidates = [] 1344 tool_call_results = {} 1345 ignored_notification = False 1346 
        ack_note = None  # Track any note from annotate_ack tool

        # First pass: collect tool return statuses keyed by tool_call_id, so we
        # only act on add_post_to_x_thread calls that actually succeeded.
        for message in message_response.messages:
            if hasattr(message, 'tool_call_id') and hasattr(message, 'status') and hasattr(message, 'name'):
                if message.name == 'add_post_to_x_thread':
                    tool_call_results[message.tool_call_id] = message.status
                elif message.name == 'ignore_notification':
                    if message.status == 'success':
                        ignored_notification = True
                        logger.info("🚫 X notification ignored")

        # Second pass: collect successful tool calls
        for message in message_response.messages:
            if hasattr(message, 'tool_call') and message.tool_call:
                # Collect annotate_ack tool calls
                if message.tool_call.name == 'annotate_ack':
                    try:
                        args = json.loads(message.tool_call.arguments)
                        note = args.get('note', '')
                        if note:
                            ack_note = note
                            logger.debug(f"Found annotate_ack with note: {note[:50]}...")
                    except json.JSONDecodeError as e:
                        logger.error(f"Failed to parse annotate_ack arguments: {e}")

                # Collect add_post_to_x_thread tool calls - only if they were successful
                elif message.tool_call.name == 'add_post_to_x_thread':
                    tool_call_id = message.tool_call.tool_call_id
                    tool_status = tool_call_results.get(tool_call_id, 'unknown')

                    if tool_status == 'success':
                        try:
                            args = json.loads(message.tool_call.arguments)
                            reply_text = args.get('text', '')
                            if reply_text:
                                reply_candidates.append(reply_text)
                                logger.debug(f"Found successful add_post_to_x_thread candidate: {reply_text[:50]}...")
                        except json.JSONDecodeError as e:
                            logger.error(f"Failed to parse tool call arguments: {e}")

        # Save agent response data to debug folder
        try:
            debug_dir = X_QUEUE_DIR / "debug" / f"conversation_{conversation_id}"

            # Save complete agent interaction
            agent_response_data = {
                'processed_at': datetime.now().isoformat(),
                'mention_id': mention_id,
                'conversation_id': conversation_id,
                'prompt_sent': prompt,
                'reply_candidates': reply_candidates,
                'ignored_notification': ignored_notification,
                'ack_note': ack_note,
                'tool_call_results': tool_call_results,
                'all_messages': []
            }

            # Convert messages to serializable format
            for message in message_response.messages:
                msg_data = {
                    'message_type': getattr(message, 'message_type', 'unknown'),
                    'content': getattr(message, 'content', ''),
                    'reasoning': getattr(message, 'reasoning', ''),
                    'status': getattr(message, 'status', ''),
                    'name': getattr(message, 'name', ''),
                }

                if hasattr(message, 'tool_call') and message.tool_call:
                    msg_data['tool_call'] = {
                        'name': message.tool_call.name,
                        'arguments': message.tool_call.arguments,
                        'tool_call_id': getattr(message.tool_call, 'tool_call_id', '')
                    }

                agent_response_data['all_messages'].append(msg_data)

            with open(debug_dir / f"agent_response_{mention_id}.json", 'w') as f:
                json.dump(agent_response_data, f, indent=2)

            logger.info(f"💾 Saved agent response debug data")

        except Exception as debug_error:
            logger.warning(f"Failed to save agent response debug data: {debug_error}")

        # Handle conflicts: replying and ignoring in the same turn is
        # contradictory, so keep the file queued for retry.
        if reply_candidates and ignored_notification:
            logger.error("⚠️ CONFLICT: Agent called both add_post_to_x_thread and ignore_notification!")
            return False

        if reply_candidates:
            # Post replies to X
            logger.debug(f"Found {len(reply_candidates)} add_post_to_x_thread calls, posting to X")

            if len(reply_candidates) == 1:
                content = reply_candidates[0]
                title = f"Reply to @{author_username}"
            else:
                content = "\n\n".join([f"{j}. {msg}" for j, msg in enumerate(reply_candidates, 1)])
                title = f"Reply Thread to @{author_username} ({len(reply_candidates)} messages)"

            print(f"\n{title}")
            # NOTE(review): '' * len(title) always yields an empty string — the
            # underline character appears lost in this revision; confirm intent.
            print(f" {'' * len(title)}")
            for line in content.split('\n'):
                print(f" {line}")

            if testing_mode:
                logger.info("TESTING MODE: Skipping actual X post")
                return True
            else:
                # Post to X using thread approach
                success = post_x_thread_replies(x_client, mention_id, reply_candidates)
                if success:
                    logger.info(f"Successfully replied to @{author_username} on X")

                    # Acknowledge the post we're replying to
                    try:
                        ack_result = acknowledge_x_post(x_client, mention_id, ack_note)
                        if ack_result:
                            if ack_note:
                                logger.info(f"Successfully acknowledged X post from @{author_username} (note: \"{ack_note[:50]}...\")")
                            else:
                                logger.info(f"Successfully acknowledged X post from @{author_username}")
                        else:
                            logger.warning(f"Failed to acknowledge X post from @{author_username}")
                    except Exception as e:
                        logger.error(f"Error acknowledging X post from @{author_username}: {e}")
                        # Don't fail the entire operation if acknowledgment fails

                    return True
                else:
                    logger.error(f"Failed to send reply to @{author_username} on X")
                    return False
        else:
            if ignored_notification:
                logger.info(f"X mention from @{author_username} was explicitly ignored")
                return "ignored"
            else:
                logger.warning(f"No add_post_to_x_thread tool calls found for mention from @{author_username}")
                return "no_reply"

    except Exception as e:
        logger.error(f"Error processing X mention: {e}")
        return False

def acknowledge_x_post(x_client, post_id, note=None):
    """
    Acknowledge an X post that we replied to.
    Uses the same Bluesky client and uploads to the void data repository on atproto,
    just like Bluesky acknowledgments.

    Args:
        x_client: XClient instance (not used, kept for compatibility)
        post_id: The X post ID we're acknowledging
        note: Optional note to include with the acknowledgment

    Returns:
        True if successful, False otherwise
    """
    try:
        # Use Bluesky client to upload acks to the void data repository on atproto
        bsky_client = bsky_utils.default_login()

        # Create a synthetic URI and CID for the X post
        # X posts don't have atproto URIs/CIDs, so we create identifiers
        post_uri = f"x://twitter.com/post/{post_id}"
        post_cid = f"x_{post_id}_cid"  # Synthetic CID for X posts

        # Use the same acknowledge_post function as Bluesky
        ack_result = bsky_utils.acknowledge_post(bsky_client, post_uri, post_cid, note)

        if ack_result:
            logger.debug(f"Acknowledged X post {post_id} via atproto" + (f" with note: {note[:50]}..." if note else ""))
            return True
        else:
            logger.error(f"Failed to acknowledge X post {post_id}")
            return False

    except Exception as e:
        # Best-effort operation: callers treat a failed ack as non-fatal.
        logger.error(f"Error acknowledging X post {post_id}: {e}")
        return False

def post_x_thread_replies(x_client, in_reply_to_tweet_id, reply_messages):
    """
    Post a series of replies to X, threading them properly.

    Args:
        x_client: XClient instance
        in_reply_to_tweet_id: The original tweet ID to reply to
        reply_messages: List of reply text strings

    Returns:
        True if successful, False otherwise
    """
    try:
        # Each reply targets the previously posted tweet so the sequence
        # forms a proper chain starting from the original mention.
        current_reply_id = in_reply_to_tweet_id

        for i, reply_text in enumerate(reply_messages):
            logger.info(f"Posting X reply {i+1}/{len(reply_messages)}: {reply_text[:50]}...")

            result = x_client.post_reply(reply_text, current_reply_id)

            if result and 'data' in result:
                new_tweet_id = result['data']['id']
                logger.info(f"Successfully posted X reply {i+1}, ID: {new_tweet_id}")
                # For threading, the next reply should reply to this one
                current_reply_id = new_tweet_id
            else:
                # Abort the remainder of the thread on the first failure;
                # already-posted replies are left in place.
                logger.error(f"Failed to post X reply {i+1}")
                return False

        return True

    except Exception as e:
        logger.error(f"Error posting X thread replies: {e}")
        return False

def load_and_process_queued_x_mentions(void_agent, x_client, testing_mode=False):
    """
    Load and process all X mentions from the queue.
    Similar to bsky.py load_and_process_queued_notifications but for X.
1567 """ 1568 try: 1569 # Get all X mention files in queue directory 1570 queue_files = sorted(X_QUEUE_DIR.glob("x_mention_*.json")) 1571 1572 if not queue_files: 1573 return 1574 1575 logger.info(f"Processing {len(queue_files)} queued X mentions") 1576 1577 for i, filepath in enumerate(queue_files, 1): 1578 logger.info(f"Processing X queue file {i}/{len(queue_files)}: {filepath.name}") 1579 1580 try: 1581 # Load mention data 1582 with open(filepath, 'r') as f: 1583 queue_data = json.load(f) 1584 1585 mention_data = queue_data.get('mention', queue_data) 1586 1587 # Process the mention 1588 success = process_x_mention(void_agent, x_client, mention_data, 1589 queue_filepath=filepath, testing_mode=testing_mode) 1590 1591 # Handle file based on processing result 1592 if success: 1593 if testing_mode: 1594 logger.info(f"TESTING MODE: Keeping X queue file: {filepath.name}") 1595 else: 1596 filepath.unlink() 1597 logger.info(f"Successfully processed and removed X file: {filepath.name}") 1598 1599 # Mark as processed 1600 processed_mentions = load_processed_mentions() 1601 processed_mentions.add(mention_data.get('id')) 1602 save_processed_mentions(processed_mentions) 1603 1604 elif success is None: # Move to error directory 1605 error_dir = X_QUEUE_DIR / "errors" 1606 error_dir.mkdir(exist_ok=True) 1607 error_path = error_dir / filepath.name 1608 filepath.rename(error_path) 1609 logger.warning(f"Moved X file {filepath.name} to errors directory") 1610 1611 elif success == "no_reply": # Move to no_reply directory 1612 no_reply_dir = X_QUEUE_DIR / "no_reply" 1613 no_reply_dir.mkdir(exist_ok=True) 1614 no_reply_path = no_reply_dir / filepath.name 1615 filepath.rename(no_reply_path) 1616 logger.info(f"Moved X file {filepath.name} to no_reply directory") 1617 1618 elif success == "ignored": # Delete ignored notifications 1619 filepath.unlink() 1620 logger.info(f"🚫 Deleted ignored X notification: {filepath.name}") 1621 1622 else: 1623 logger.warning(f"⚠️ Failed to process X file 
{filepath.name}, keeping in queue for retry") 1624 1625 except Exception as e: 1626 logger.error(f"💥 Error processing queued X mention {filepath.name}: {e}") 1627 1628 except Exception as e: 1629 logger.error(f"Error loading queued X mentions: {e}") 1630 1631def process_x_notifications(void_agent, x_client, testing_mode=False): 1632 """ 1633 Fetch new X mentions, queue them, and process the queue. 1634 Similar to bsky.py process_notifications but for X. 1635 """ 1636 try: 1637 # Get username for fetching mentions 1638 user_info = x_client._make_request("/users/me", params={"user.fields": "username"}) 1639 if not user_info or "data" not in user_info: 1640 logger.error("Could not get username for X mentions") 1641 return 1642 1643 username = user_info["data"]["username"] 1644 1645 # Fetch and queue new mentions 1646 new_count = fetch_and_queue_mentions(username) 1647 1648 if new_count > 0: 1649 logger.info(f"Found {new_count} new X mentions to process") 1650 1651 # Process the entire queue 1652 load_and_process_queued_x_mentions(void_agent, x_client, testing_mode) 1653 1654 except Exception as e: 1655 logger.error(f"Error processing X notifications: {e}") 1656 1657def initialize_x_void(): 1658 """Initialize the void agent for X operations.""" 1659 logger.info("Starting void agent initialization for X...") 1660 1661 from config_loader import get_letta_config 1662 from letta_client import Letta 1663 1664 # Get config 1665 config = get_letta_config() 1666 client = Letta(token=config['api_key'], timeout=config['timeout']) 1667 agent_id = config['agent_id'] 1668 1669 try: 1670 void_agent = client.agents.retrieve(agent_id=agent_id) 1671 logger.info(f"Successfully loaded void agent for X: {void_agent.name} ({agent_id})") 1672 except Exception as e: 1673 logger.error(f"Failed to load void agent {agent_id}: {e}") 1674 raise e 1675 1676 # Ensure correct tools are attached for X 1677 logger.info("Configuring tools for X platform...") 1678 try: 1679 from tool_manager import 
ensure_platform_tools 1680 ensure_platform_tools('x', void_agent.id) 1681 except Exception as e: 1682 logger.error(f"Failed to configure platform tools: {e}") 1683 logger.warning("Continuing with existing tool configuration") 1684 1685 # Log agent details 1686 logger.info(f"X Void agent details - ID: {void_agent.id}") 1687 logger.info(f"Agent name: {void_agent.name}") 1688 1689 return void_agent 1690 1691def x_main_loop(testing_mode=False): 1692 """ 1693 Main X bot loop that continuously monitors for mentions and processes them. 1694 Similar to bsky.py main() but for X/Twitter. 1695 """ 1696 import time 1697 from time import sleep 1698 1699 logger.info("=== STARTING X VOID BOT ===") 1700 1701 # Initialize void agent 1702 void_agent = initialize_x_void() 1703 logger.info(f"X void agent initialized: {void_agent.id}") 1704 1705 # Initialize X client 1706 x_client = create_x_client() 1707 logger.info("Connected to X API") 1708 1709 # Main loop 1710 FETCH_DELAY_SEC = 60 # Check every minute for X mentions 1711 logger.info(f"Starting X mention monitoring, checking every {FETCH_DELAY_SEC} seconds") 1712 1713 if testing_mode: 1714 logger.info("=== RUNNING IN X TESTING MODE ===") 1715 logger.info(" - No messages will be sent to X") 1716 logger.info(" - Queue files will not be deleted") 1717 1718 cycle_count = 0 1719 start_time = time.time() 1720 1721 while True: 1722 try: 1723 cycle_count += 1 1724 logger.info(f"=== X CYCLE {cycle_count} ===") 1725 1726 # Process X notifications (fetch, queue, and process) 1727 process_x_notifications(void_agent, x_client, testing_mode) 1728 1729 # Log cycle completion 1730 elapsed_time = time.time() - start_time 1731 logger.info(f"X Cycle {cycle_count} complete. 
Elapsed: {elapsed_time/60:.1f} minutes") 1732 1733 sleep(FETCH_DELAY_SEC) 1734 1735 except KeyboardInterrupt: 1736 elapsed_time = time.time() - start_time 1737 logger.info("=== X BOT STOPPED BY USER ===") 1738 logger.info(f"Final X session: {cycle_count} cycles in {elapsed_time/60:.1f} minutes") 1739 break 1740 except Exception as e: 1741 logger.error(f"=== ERROR IN X MAIN LOOP CYCLE {cycle_count} ===") 1742 logger.error(f"Error details: {e}") 1743 logger.info(f"Sleeping for {FETCH_DELAY_SEC * 2} seconds due to error...") 1744 sleep(FETCH_DELAY_SEC * 2) 1745 1746def process_queue_only(testing_mode=False): 1747 """ 1748 Process all queued X mentions without fetching new ones. 1749 Useful for rate limit management - queue first, then process separately. 1750 1751 Args: 1752 testing_mode: If True, don't actually post to X and keep queue files 1753 """ 1754 logger.info("=== PROCESSING X QUEUE ONLY ===") 1755 1756 if testing_mode: 1757 logger.info("=== RUNNING IN X TESTING MODE ===") 1758 logger.info(" - No messages will be sent to X") 1759 logger.info(" - Queue files will not be deleted") 1760 1761 try: 1762 # Initialize void agent 1763 void_agent = initialize_x_void() 1764 logger.info(f"X void agent initialized: {void_agent.id}") 1765 1766 # Initialize X client 1767 x_client = create_x_client() 1768 logger.info("Connected to X API") 1769 1770 # Process the queue without fetching new mentions 1771 logger.info("Processing existing X queue...") 1772 load_and_process_queued_x_mentions(void_agent, x_client, testing_mode) 1773 1774 logger.info("=== X QUEUE PROCESSING COMPLETE ===") 1775 1776 except Exception as e: 1777 logger.error(f"Error processing X queue: {e}") 1778 raise 1779 1780def x_notification_loop(): 1781 """ 1782 DEPRECATED: Old X notification loop using search-based mention detection. 1783 Use x_main_loop() instead for the full bot experience. 1784 """ 1785 logger.warning("x_notification_loop() is deprecated. 
Use x_main_loop() instead.") 1786 x_main_loop() 1787 1788if __name__ == "__main__": 1789 import sys 1790 import argparse 1791 1792 if len(sys.argv) > 1: 1793 if sys.argv[1] == "bot": 1794 # Main bot with optional --test flag 1795 parser = argparse.ArgumentParser(description='X Void Bot') 1796 parser.add_argument('command', choices=['bot']) 1797 parser.add_argument('--test', action='store_true', help='Run in testing mode (no actual posts)') 1798 args = parser.parse_args() 1799 x_main_loop(testing_mode=args.test) 1800 elif sys.argv[1] == "loop": 1801 x_notification_loop() 1802 elif sys.argv[1] == "reply": 1803 reply_to_cameron_post() 1804 elif sys.argv[1] == "me": 1805 get_my_user_info() 1806 elif sys.argv[1] == "search": 1807 test_search_mentions() 1808 elif sys.argv[1] == "queue": 1809 test_fetch_and_queue() 1810 elif sys.argv[1] == "thread": 1811 test_thread_context() 1812 elif sys.argv[1] == "process": 1813 # Process all queued mentions with optional --test flag 1814 testing_mode = "--test" in sys.argv 1815 process_queue_only(testing_mode=testing_mode) 1816 elif sys.argv[1] == "letta": 1817 # Use specific agent ID if provided, otherwise use from config 1818 agent_id = sys.argv[2] if len(sys.argv) > 2 else None 1819 test_letta_integration(agent_id) 1820 else: 1821 print("Usage: python x.py [bot|loop|reply|me|search|queue|process|thread|letta]") 1822 print(" bot - Run the main X bot (use --test for testing mode)") 1823 print(" Example: python x.py bot --test") 1824 print(" queue - Fetch and queue mentions only (no processing)") 1825 print(" process - Process all queued mentions only (no fetching)") 1826 print(" Example: python x.py process --test") 1827 print(" loop - Run the old notification monitoring loop (deprecated)") 1828 print(" reply - Reply to Cameron's specific post") 1829 print(" me - Get authenticated user info and correct user ID") 1830 print(" search - Test search-based mention detection") 1831 print(" thread - Test thread context retrieval from 
queued mention") 1832 print(" letta - Test sending thread context to Letta agent") 1833 print(" Optional: python x.py letta <agent-id>") 1834 else: 1835 test_x_client()