A digital person for Bluesky

Fix X bot rate limiting issues with improved logging and caching

## Rate Limit Improvements:
- Add specific endpoint logging to identify rate limit sources
- Implement user info caching to reduce /users/me API calls from ~720/day to 1/day
- Add get_username() and get_user_info() methods with 24-hour caching
- Update all /users/me calls to use cached methods

## Enhanced Logging:
- Add endpoint-specific rate limit error messages
- Add configurable logging level from x_config.yaml
- Add debug logging for API operations
- Show exact API endpoints when rate limits occur

## Benefits:
- Prevents rate limiting on bot startup and routine operations
- Better debugging visibility for API issues
- Improved bot reliability and uptime

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

+139 -32
x.py
··· 21 pass 22 23 24 - # Configure logging 25 logging.basicConfig( 26 level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" 27 ) 28 logger = logging.getLogger("x_client") 29 30 # X-specific file paths 31 X_QUEUE_DIR = Path("x_queue") ··· 79 def _make_request(self, endpoint: str, params: Optional[Dict] = None, method: str = "GET", data: Optional[Dict] = None, max_retries: int = 3) -> Optional[Dict]: 80 """Make a request to the X API with proper error handling and exponential backoff.""" 81 url = f"{self.base_url}{endpoint}" 82 - 83 for attempt in range(max_retries): 84 try: 85 if method.upper() == "GET": ··· 111 if attempt < max_retries - 1: 112 # Exponential backoff: 60s, 120s, 240s 113 backoff_time = 60 * (2 ** attempt) 114 - logger.warning(f"X API rate limit exceeded (attempt {attempt + 1}/{max_retries}) - waiting {backoff_time}s before retry") 115 logger.error(f"Response: {response.text}") 116 time.sleep(backoff_time) 117 continue 118 else: 119 - logger.error("X API rate limit exceeded - max retries reached") 120 logger.error(f"Response: {response.text}") 121 - raise XRateLimitError("X API rate limit exceeded") 122 else: 123 if attempt < max_retries - 1: 124 # Exponential backoff for other HTTP errors too ··· 255 return cached_data 256 257 # First, get the original tweet directly since it might not appear in conversation search 258 original_tweet = None 259 try: 260 endpoint = f"/tweets/{conversation_id}" ··· 263 "user.fields": "id,name,username", 264 "expansions": "author_id" 265 } 266 response = self._make_request(endpoint, params) 267 if response and "data" in response: 268 original_tweet = response["data"] ··· 287 logger.info(f"Using until_id={until_id} to exclude future tweets") 288 289 logger.info(f"Fetching thread context for conversation {conversation_id}") 290 response = self._make_request(endpoint, params) 291 292 tweets = [] ··· 369 "user.fields": "id,name,username", 370 "expansions": "author_id" 371 } 372 response = 
self._make_request(endpoint, params) 373 374 if response and "data" in response: ··· 455 } 456 457 logger.info(f"Attempting to post reply with {self.auth_method} authentication") 458 result = self._make_request(endpoint, method="POST", data=payload) 459 460 if result: ··· 486 } 487 488 logger.info(f"Attempting to post tweet with {self.auth_method} authentication") 489 result = self._make_request(endpoint, method="POST", data=payload) 490 491 if result: ··· 495 logger.error("Failed to post tweet") 496 return None 497 498 def load_x_config(config_path: str = "x_config.yaml") -> Dict[str, Any]: 499 """Load complete X configuration from x_config.yaml.""" 500 try: ··· 916 except Exception as e: 917 logger.error(f"Error caching individual tweets: {e}") 918 919 def has_sufficient_context(tweets: List[Dict], missing_tweet_ids: Set[str]) -> bool: 920 """ 921 Determine if we have sufficient context to skip backfilling missing tweets. ··· 971 last_seen_id = load_last_seen_id() 972 973 logger.info(f"Fetching mentions for @{username} since {last_seen_id or 'beginning'}") 974 - 975 - # Search for mentions 976 mentions = client.search_mentions( 977 username=username, 978 since_id=last_seen_id, ··· 1009 try: 1010 client = create_x_client() 1011 1012 - # Use the /2/users/me endpoint to get authenticated user info 1013 - endpoint = "/users/me" 1014 - params = { 1015 - "user.fields": "id,name,username,description" 1016 - } 1017 - 1018 print("Fetching authenticated user information...") 1019 - response = client._make_request(endpoint, params=params) 1020 - 1021 - if response and "data" in response: 1022 - user_data = response["data"] 1023 print(f"✅ Found authenticated user:") 1024 print(f" ID: {user_data.get('id')}") 1025 print(f" Username: @{user_data.get('username')}") ··· 1043 client = create_x_client() 1044 1045 # First get our username 1046 - user_info = client._make_request("/users/me", params={"user.fields": "username"}) 1047 - if not user_info or "data" not in user_info: 1048 
print("❌ Could not get username") 1049 return 1050 - 1051 - username = user_info["data"]["username"] 1052 print(f"🔍 Searching for mentions of @{username}") 1053 1054 mentions = client.search_mentions(username, max_results=5) ··· 1069 client = create_x_client() 1070 1071 # Get our username 1072 - user_info = client._make_request("/users/me", params={"user.fields": "username"}) 1073 - if not user_info or "data" not in user_info: 1074 print("❌ Could not get username") 1075 return 1076 - 1077 - username = user_info["data"]["username"] 1078 print(f"🔄 Fetching and queueing mentions for @{username}") 1079 1080 # Show current state ··· 1410 "user.fields": "id,name,username", 1411 "expansions": "author_id" 1412 } 1413 response = x_client._make_request(endpoint, params) 1414 if response and "data" in response: 1415 missing_tweet = response["data"] ··· 1949 Similar to bsky.py process_notifications but for X. 1950 """ 1951 try: 1952 - # Get username for fetching mentions 1953 - user_info = x_client._make_request("/users/me", params={"user.fields": "username"}) 1954 - if not user_info or "data" not in user_info: 1955 logger.error("Could not get username for X mentions") 1956 return 1957 - 1958 - username = user_info["data"]["username"] 1959 1960 # Fetch and queue new mentions 1961 new_count = fetch_and_queue_mentions(username) ··· 2034 logger.info("Configuring tools for X platform...") 2035 try: 2036 from tool_manager import ensure_platform_tools 2037 - ensure_platform_tools('x', void_agent.id) 2038 except Exception as e: 2039 logger.error(f"Failed to configure platform tools: {e}") 2040 logger.warning("Continuing with existing tool configuration") ··· 2059 from letta_client import Letta 2060 2061 logger.info("=== STARTING X VOID BOT ===") 2062 - 2063 # Initialize void agent 2064 void_agent = initialize_x_void() 2065 logger.info(f"X void agent initialized: {void_agent.id}")
··· 21 pass 22 23 24 + # Configure logging (will be updated by setup_logging_from_config if called) 25 logging.basicConfig( 26 level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" 27 ) 28 logger = logging.getLogger("x_client") 29 + 30 + def setup_logging_from_config(config_path: str = "x_config.yaml"): 31 + """Configure logging based on x_config.yaml settings.""" 32 + try: 33 + config = load_x_config(config_path) 34 + logging_config = config.get('logging', {}) 35 + log_level = logging_config.get('level', 'INFO').upper() 36 + 37 + # Convert string level to logging constant 38 + numeric_level = getattr(logging, log_level, logging.INFO) 39 + 40 + # Update the root logger level 41 + logging.getLogger().setLevel(numeric_level) 42 + # Update our specific logger level 43 + logger.setLevel(numeric_level) 44 + 45 + logger.info(f"Logging level set to {log_level}") 46 + 47 + except Exception as e: 48 + logger.warning(f"Failed to configure logging from config: {e}, using default INFO level") 49 50 # X-specific file paths 51 X_QUEUE_DIR = Path("x_queue") ··· 99 def _make_request(self, endpoint: str, params: Optional[Dict] = None, method: str = "GET", data: Optional[Dict] = None, max_retries: int = 3) -> Optional[Dict]: 100 """Make a request to the X API with proper error handling and exponential backoff.""" 101 url = f"{self.base_url}{endpoint}" 102 + 103 + # Log the specific API call being made 104 + logger.debug(f"Making X API request: {method} {endpoint}") 105 + 106 for attempt in range(max_retries): 107 try: 108 if method.upper() == "GET": ··· 134 if attempt < max_retries - 1: 135 # Exponential backoff: 60s, 120s, 240s 136 backoff_time = 60 * (2 ** attempt) 137 + logger.warning(f"X API rate limit exceeded on {method} {endpoint} (attempt {attempt + 1}/{max_retries}) - waiting {backoff_time}s before retry") 138 logger.error(f"Response: {response.text}") 139 time.sleep(backoff_time) 140 continue 141 else: 142 + logger.error(f"X API rate limit 
exceeded on {method} {endpoint} - max retries reached") 143 logger.error(f"Response: {response.text}") 144 + raise XRateLimitError(f"X API rate limit exceeded on {method} {endpoint}") 145 else: 146 if attempt < max_retries - 1: 147 # Exponential backoff for other HTTP errors too ··· 278 return cached_data 279 280 # First, get the original tweet directly since it might not appear in conversation search 281 + logger.debug(f"Getting thread context for conversation {conversation_id}") 282 original_tweet = None 283 try: 284 endpoint = f"/tweets/{conversation_id}" ··· 287 "user.fields": "id,name,username", 288 "expansions": "author_id" 289 } 290 + logger.debug(f"Fetching original tweet: GET {endpoint}") 291 response = self._make_request(endpoint, params) 292 if response and "data" in response: 293 original_tweet = response["data"] ··· 312 logger.info(f"Using until_id={until_id} to exclude future tweets") 313 314 logger.info(f"Fetching thread context for conversation {conversation_id}") 315 + logger.debug(f"Searching conversation: GET {endpoint} with query={params['query']}") 316 response = self._make_request(endpoint, params) 317 318 tweets = [] ··· 395 "user.fields": "id,name,username", 396 "expansions": "author_id" 397 } 398 + logger.debug(f"Batch fetching missing tweets: GET {endpoint} (ids: {len(batch_ids)} tweets)") 399 response = self._make_request(endpoint, params) 400 401 if response and "data" in response: ··· 482 } 483 484 logger.info(f"Attempting to post reply with {self.auth_method} authentication") 485 + logger.debug(f"Posting reply: POST {endpoint}") 486 result = self._make_request(endpoint, method="POST", data=payload) 487 488 if result: ··· 514 } 515 516 logger.info(f"Attempting to post tweet with {self.auth_method} authentication") 517 + logger.debug(f"Posting tweet: POST {endpoint}") 518 result = self._make_request(endpoint, method="POST", data=payload) 519 520 if result: ··· 524 logger.error("Failed to post tweet") 525 return None 526 527 + def 
get_user_info(self, fields: Optional[str] = None) -> Optional[Dict]: 528 + """ 529 + Get the authenticated user's information, using cached data when available. 530 + This reduces API calls significantly since user info rarely changes. 531 + 532 + Args: 533 + fields: Optional comma-separated list of user fields to fetch 534 + 535 + Returns: 536 + User data dict if successful, None if failed 537 + """ 538 + # First try to get from cache 539 + cached_user_info = get_cached_user_info() 540 + if cached_user_info: 541 + # Check if cached data has all requested fields 542 + requested_fields = set(fields.split(',') if fields else ['id', 'username', 'name']) 543 + cached_fields = set(cached_user_info.keys()) 544 + if requested_fields.issubset(cached_fields): 545 + return cached_user_info 546 + 547 + # Cache miss, expired, or missing requested fields - fetch from API 548 + logger.debug("Fetching fresh user info from /users/me API") 549 + endpoint = "/users/me" 550 + params = {"user.fields": fields or "id,username,name,description"} 551 + 552 + response = self._make_request(endpoint, params=params) 553 + if response and "data" in response: 554 + user_data = response["data"] 555 + # Cache the result for future use 556 + save_cached_user_info(user_data) 557 + return user_data 558 + else: 559 + logger.error("Failed to get user info from /users/me API") 560 + return None 561 + 562 + def get_username(self) -> Optional[str]: 563 + """ 564 + Get the authenticated user's username, using cached data when available. 565 + This reduces API calls significantly since username rarely changes. 
566 + 567 + Returns: 568 + Username string if successful, None if failed 569 + """ 570 + user_info = self.get_user_info("id,username,name") 571 + return user_info.get("username") if user_info else None 572 + 573 def load_x_config(config_path: str = "x_config.yaml") -> Dict[str, Any]: 574 """Load complete X configuration from x_config.yaml.""" 575 try: ··· 991 except Exception as e: 992 logger.error(f"Error caching individual tweets: {e}") 993 994 + def get_cached_user_info() -> Optional[Dict]: 995 + """Load cached user info if available and not expired.""" 996 + cache_file = X_CACHE_DIR / "user_info.json" 997 + if cache_file.exists(): 998 + try: 999 + with open(cache_file, 'r') as f: 1000 + cached_data = json.load(f) 1001 + # Check if cache is recent (within 24 hours) 1002 + from datetime import datetime, timedelta 1003 + cached_time = datetime.fromisoformat(cached_data.get('cached_at', '')) 1004 + if datetime.now() - cached_time < timedelta(hours=24): 1005 + logger.debug("Using cached user info") 1006 + return cached_data.get('data') 1007 + else: 1008 + logger.debug("Cached user info expired (>24 hours old)") 1009 + except Exception as e: 1010 + logger.warning(f"Error loading cached user info: {e}") 1011 + return None 1012 + 1013 + def save_cached_user_info(user_data: Dict): 1014 + """Save user info to cache.""" 1015 + try: 1016 + X_CACHE_DIR.mkdir(exist_ok=True) 1017 + cache_file = X_CACHE_DIR / "user_info.json" 1018 + 1019 + from datetime import datetime 1020 + cache_data = { 1021 + 'data': user_data, 1022 + 'cached_at': datetime.now().isoformat() 1023 + } 1024 + 1025 + with open(cache_file, 'w') as f: 1026 + json.dump(cache_data, f, indent=2) 1027 + 1028 + logger.debug(f"Cached user info: {user_data.get('username')}") 1029 + 1030 + except Exception as e: 1031 + logger.error(f"Error caching user info: {e}") 1032 + 1033 def has_sufficient_context(tweets: List[Dict], missing_tweet_ids: Set[str]) -> bool: 1034 """ 1035 Determine if we have sufficient context to 
skip backfilling missing tweets. ··· 1085 last_seen_id = load_last_seen_id() 1086 1087 logger.info(f"Fetching mentions for @{username} since {last_seen_id or 'beginning'}") 1088 + 1089 + # Search for mentions - this calls GET /2/tweets/search/recent 1090 + logger.debug(f"Calling search_mentions API for @{username}") 1091 mentions = client.search_mentions( 1092 username=username, 1093 since_id=last_seen_id, ··· 1124 try: 1125 client = create_x_client() 1126 1127 + # Get authenticated user info using cached method 1128 print("Fetching authenticated user information...") 1129 + user_data = client.get_user_info("id,name,username,description") 1130 + 1131 + if user_data: 1132 print(f"✅ Found authenticated user:") 1133 print(f" ID: {user_data.get('id')}") 1134 print(f" Username: @{user_data.get('username')}") ··· 1152 client = create_x_client() 1153 1154 # First get our username 1155 + username = client.get_username() 1156 + if not username: 1157 print("❌ Could not get username") 1158 return 1159 print(f"🔍 Searching for mentions of @{username}") 1160 1161 mentions = client.search_mentions(username, max_results=5) ··· 1176 client = create_x_client() 1177 1178 # Get our username 1179 + username = client.get_username() 1180 + if not username: 1181 print("❌ Could not get username") 1182 return 1183 print(f"🔄 Fetching and queueing mentions for @{username}") 1184 1185 # Show current state ··· 1515 "user.fields": "id,name,username", 1516 "expansions": "author_id" 1517 } 1518 + logger.debug(f"Fetching individual missing tweet: GET {endpoint}") 1519 response = x_client._make_request(endpoint, params) 1520 if response and "data" in response: 1521 missing_tweet = response["data"] ··· 2055 Similar to bsky.py process_notifications but for X. 
2056 """ 2057 try: 2058 + # Get username for fetching mentions - uses cached data to avoid rate limits 2059 + username = x_client.get_username() 2060 + if not username: 2061 logger.error("Could not get username for X mentions") 2062 return 2063 2064 # Fetch and queue new mentions 2065 new_count = fetch_and_queue_mentions(username) ··· 2138 logger.info("Configuring tools for X platform...") 2139 try: 2140 from tool_manager import ensure_platform_tools 2141 + ensure_platform_tools('x', void_agent.id, config['api_key']) 2142 except Exception as e: 2143 logger.error(f"Failed to configure platform tools: {e}") 2144 logger.warning("Continuing with existing tool configuration") ··· 2163 from letta_client import Letta 2164 2165 logger.info("=== STARTING X VOID BOT ===") 2166 + 2167 + # Configure logging from config file 2168 + setup_logging_from_config() 2169 + 2170 # Initialize void agent 2171 void_agent = initialize_x_void() 2172 logger.info(f"X void agent initialized: {void_agent.id}")