a digital person for bluesky
at ba384e5fe2d4d2fed6620a6c52dd3bc3837b0e40 522 lines 22 kB view raw
#!/usr/bin/env python3
"""
Script to create a Letta agent that researches Bluesky profiles and updates
the model's understanding of users.
"""

import os
import sys
import logging
from letta_client import Letta
from utils import upsert_block, upsert_agent

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("profile_researcher")

# Use the "Bluesky" project
PROJECT_ID = "5ec33d52-ab14-4fd6-91b5-9dbc43e888a8"


def create_search_posts_tool(client: Letta):
    """Create the Bluesky search posts tool using Letta SDK.

    Registers the nested ``search_bluesky_posts`` function through
    ``client.tools.upsert_from_function`` (tags: bluesky, search, posts),
    logs the resulting tool name/ID and returns the tool object.

    Note: nothing in this file calls this factory; the agent built by
    ``create_profile_researcher_agent`` is not given the search tool.
    """

    # NOTE(review): the tool body keeps its imports function-local, presumably
    # because the function source is shipped to Letta and executed on its own
    # -- confirm before hoisting anything to module level.
    def search_bluesky_posts(query: str, max_results: int = 25, author: str = None, sort: str = "latest") -> str:
        """
        Search for posts on Bluesky matching the given criteria.

        Args:
            query: Search query string (required)
            max_results: Maximum number of results to return (default: 25, max: 100)
            author: Filter to posts by a specific author handle (optional)
            sort: Sort order - "latest" or "top" (default: "latest")

        Returns:
            YAML-formatted search results with posts and metadata
        """
        import requests
        import yaml
        from datetime import datetime

        try:
            # Use public Bluesky API
            base_url = "https://public.api.bsky.app"

            # Build search parameters; clamp the limit into 1..100 so that a
            # zero/negative request does not send an invalid limit to the API.
            params = {
                "q": query,
                "limit": max(1, min(max_results, 100)),
                "sort": sort
            }

            # Add optional author filter
            if author:
                params["author"] = author.lstrip('@')

            # Make search request
            try:
                search_url = f"{base_url}/xrpc/app.bsky.feed.searchPosts"
                search_response = requests.get(search_url, params=params, timeout=10)
                search_response.raise_for_status()
                search_data = search_response.json()
            except requests.exceptions.HTTPError as e:
                raise RuntimeError(f"Search failed with HTTP {e.response.status_code}: {e.response.text}") from e
            except requests.exceptions.RequestException as e:
                raise RuntimeError(f"Network error during search: {str(e)}") from e
            except Exception as e:
                raise RuntimeError(f"Unexpected error during search: {str(e)}") from e

            # Build search results structure (echoes the caller's requested
            # max_results, not the clamped value sent to the API)
            results_data = {
                "search_results": {
                    "query": query,
                    "timestamp": datetime.now().isoformat(),
                    "parameters": {
                        "sort": sort,
                        "max_results": max_results,
                        "author_filter": author if author else "none"
                    },
                    "results": search_data
                }
            }

            # Fields to strip for cleaner output (a set: only membership is tested)
            strip_fields = {
                "cid", "rev", "did", "uri", "langs", "threadgate", "py_type",
                "labels", "facets", "avatar", "viewer", "indexed_at", "indexedAt",
                "tags", "associated", "thread_context", "image", "aspect_ratio",
                "alt", "thumb", "fullsize", "root", "parent", "created_at",
                "createdAt", "verification", "embedding_disabled", "thread_muted",
                "reply_disabled", "pinned", "like", "repost", "blocked_by",
                "blocking", "blocking_by_list", "followed_by", "following",
                "known_followers", "muted", "muted_by_list", "root_author_like",
                "embed", "entities", "reason", "feedContext"
            }

            # Remove unwanted fields by traversing the data structure
            def remove_fields(obj, fields_to_remove):
                if isinstance(obj, dict):
                    return {k: remove_fields(v, fields_to_remove)
                            for k, v in obj.items()
                            if k not in fields_to_remove}
                elif isinstance(obj, list):
                    return [remove_fields(item, fields_to_remove) for item in obj]
                else:
                    return obj

            # Clean the data
            cleaned_data = remove_fields(results_data, strip_fields)

            # Convert to YAML for better readability
            return yaml.dump(cleaned_data, default_flow_style=False, allow_unicode=True)

        except (ValueError, RuntimeError):
            # Already carry a user-facing message; propagate unchanged
            raise
        except yaml.YAMLError as e:
            # YAML conversion errors
            raise RuntimeError(f"Error formatting output: {str(e)}") from e
        except Exception as e:
            # Catch-all for unexpected errors
            raise RuntimeError(f"Unexpected error searching posts with query '{query}': {str(e)}") from e

    # Create the tool using upsert
    tool = client.tools.upsert_from_function(
        func=search_bluesky_posts,
        tags=["bluesky", "search", "posts"]
    )

    logger.info(f"Created tool: {tool.name} (ID: {tool.id})")
    return tool


def create_profile_research_tool(client: Letta):
    """Create the Bluesky profile research tool using Letta SDK.

    Registers the nested ``research_bluesky_profile`` function through
    ``client.tools.upsert_from_function`` (tags: bluesky, profile, research),
    logs the resulting tool name/ID and returns the tool object.
    """

    # NOTE(review): imports are function-local, presumably because the tool
    # source is executed on its own by Letta -- confirm. This is also why the
    # field-stripping helper is duplicated rather than shared with the search tool.
    def research_bluesky_profile(handle: str, max_posts: int = 20) -> str:
        """
        Research a Bluesky user's profile and recent posts to understand their interests and behavior.

        Args:
            handle: The Bluesky handle to research (e.g., 'cameron.pfiffer.org' or '@cameron.pfiffer.org')
            max_posts: Maximum number of recent posts to analyze (default: 20)

        Returns:
            A comprehensive analysis of the user's profile and posting patterns
        """
        import requests
        import yaml
        from datetime import datetime

        try:
            # Clean handle (remove @ if present)
            clean_handle = handle.lstrip('@')

            # Use public Bluesky API (no auth required for public data)
            base_url = "https://public.api.bsky.app"

            # Get profile information; a missing profile is a ValueError,
            # every other failure a RuntimeError.
            try:
                profile_url = f"{base_url}/xrpc/app.bsky.actor.getProfile"
                profile_response = requests.get(profile_url, params={"actor": clean_handle}, timeout=10)
                profile_response.raise_for_status()
                profile_data = profile_response.json()
            except requests.exceptions.HTTPError as e:
                if e.response.status_code == 404:
                    raise ValueError(f"Profile @{clean_handle} not found") from e
                raise RuntimeError(f"HTTP error {e.response.status_code}: {e.response.text}") from e
            except requests.exceptions.RequestException as e:
                raise RuntimeError(f"Network error: {str(e)}") from e
            except Exception as e:
                raise RuntimeError(f"Unexpected error fetching profile: {str(e)}") from e

            # Get recent posts feed
            try:
                feed_url = f"{base_url}/xrpc/app.bsky.feed.getAuthorFeed"
                feed_response = requests.get(feed_url, params={
                    "actor": clean_handle,
                    # Capped at 50 as in the original; floor of 1 avoids
                    # sending an invalid zero/negative limit.
                    "limit": max(1, min(max_posts, 50))
                }, timeout=10)
                feed_response.raise_for_status()
                feed_data = feed_response.json()
            except Exception as e:
                # Best effort: continue with an empty feed if posts can't be
                # fetched, but record why so that a failed fetch is not
                # mistaken for a user with no posts.
                feed_data = {"feed": [], "fetch_error": str(e)}

            # Build research data structure
            research_data = {
                "profile_research": {
                    "handle": f"@{clean_handle}",
                    "timestamp": datetime.now().isoformat(),
                    "profile": profile_data,
                    "author_feed": feed_data
                }
            }

            # Fields to strip for cleaner output (a set: only membership is tested)
            strip_fields = {
                "cid", "rev", "did", "uri", "langs", "threadgate", "py_type",
                "labels", "facets", "avatar", "viewer", "indexed_at", "indexedAt",
                "tags", "associated", "thread_context", "image", "aspect_ratio",
                "alt", "thumb", "fullsize", "root", "parent", "created_at",
                "createdAt", "verification", "embedding_disabled", "thread_muted",
                "reply_disabled", "pinned", "like", "repost", "blocked_by",
                "blocking", "blocking_by_list", "followed_by", "following",
                "known_followers", "muted", "muted_by_list", "root_author_like",
                "embed", "entities", "reason", "feedContext"
            }

            # Remove unwanted fields by traversing the data structure
            def remove_fields(obj, fields_to_remove):
                if isinstance(obj, dict):
                    return {k: remove_fields(v, fields_to_remove)
                            for k, v in obj.items()
                            if k not in fields_to_remove}
                elif isinstance(obj, list):
                    return [remove_fields(item, fields_to_remove) for item in obj]
                else:
                    return obj

            # Clean the data
            cleaned_data = remove_fields(research_data, strip_fields)

            # Convert to YAML for better readability
            return yaml.dump(cleaned_data, default_flow_style=False, allow_unicode=True)

        except (ValueError, RuntimeError):
            # Already carry a user-facing message; propagate unchanged
            raise
        except yaml.YAMLError as e:
            # YAML conversion errors
            raise RuntimeError(f"Error formatting output: {str(e)}") from e
        except Exception as e:
            # Catch-all for unexpected errors
            raise RuntimeError(f"Unexpected error researching profile {handle}: {str(e)}") from e

    # Create or update the tool using upsert
    tool = client.tools.upsert_from_function(
        func=research_bluesky_profile,
        tags=["bluesky", "profile", "research"]
    )

    logger.info(f"Created tool: {tool.name} (ID: {tool.id})")
    return tool


def create_block_management_tools(client: Letta):
    """Create tools for attaching and detaching user blocks.

    Registers three nested functions (``attach_user_block``,
    ``detach_user_block``, ``update_user_block``) through
    ``client.tools.upsert_from_function`` and returns them as the tuple
    ``(attach_tool, detach_tool, update_tool)``.

    Each tool builds its own Letta client from the ``LETTA_API_KEY``
    environment variable and reports failures as an error string instead of
    raising. Block labels are ``user_<handle>`` with a leading '@' removed and
    '.'/'-' replaced by '_'.
    """

    def attach_user_block(handle: str) -> str:
        """
        Create (if needed) and attach a user-specific memory block for a Bluesky user.

        Args:
            handle: The Bluesky handle (e.g., 'cameron.pfiffer.org' or '@cameron.pfiffer.org')

        Returns:
            Status message about the block attachment
        """
        import os
        from letta_client import Letta

        try:
            # Strip the optional leading '@' once so messages render
            # "@name" rather than "@@name" when the caller passes "@name".
            display_handle = handle.lstrip('@')

            # Clean handle for block label
            clean_handle = display_handle.replace('.', '_').replace('-', '_')
            block_label = f"user_{clean_handle}"

            # Initialize Letta client
            letta_client = Letta(token=os.environ["LETTA_API_KEY"])

            # Get current agent (the one this tool is being called by).
            # We need to find the agent that's calling this tool;
            # for now, we'll find the profile-researcher agent by name.
            agents = letta_client.agents.list(name="profile-researcher")
            if not agents:
                return "Error: Could not find profile-researcher agent"

            agent = agents[0]

            # Check if block already exists and is attached
            agent_blocks = letta_client.agents.blocks.list(agent_id=agent.id)
            for block in agent_blocks:
                if block.label == block_label:
                    return f"User block for @{display_handle} is already attached (label: {block_label})"

            # Create or get the user block
            existing_blocks = letta_client.blocks.list(label=block_label)

            if existing_blocks:
                user_block = existing_blocks[0]
                action = "Retrieved existing"
            else:
                user_block = letta_client.blocks.create(
                    label=block_label,
                    value=f"User information for @{display_handle} will be stored here as I learn about them through profile research and interactions.",
                    description=f"Stores detailed information about Bluesky user @{display_handle}, including their interests, posting patterns, personality traits, and interaction history."
                )
                action = "Created new"

            # Attach block to agent
            letta_client.agents.blocks.attach(agent_id=agent.id, block_id=user_block.id)

            return f"{action} and attached user block for @{display_handle} (label: {block_label}). I can now store and access information about this user."

        except Exception as e:
            # str() guards the message itself against a non-string handle
            return f"Error attaching user block for @{str(handle).lstrip('@')}: {str(e)}"

    def detach_user_block(handle: str) -> str:
        """
        Detach a user-specific memory block from the agent.

        Args:
            handle: The Bluesky handle (e.g., 'cameron.pfiffer.org' or '@cameron.pfiffer.org')

        Returns:
            Status message about the block detachment
        """
        import os
        from letta_client import Letta

        try:
            # Strip the optional leading '@' once so messages render
            # "@name" rather than "@@name" when the caller passes "@name".
            display_handle = handle.lstrip('@')

            # Clean handle for block label
            clean_handle = display_handle.replace('.', '_').replace('-', '_')
            block_label = f"user_{clean_handle}"

            # Initialize Letta client
            letta_client = Letta(token=os.environ["LETTA_API_KEY"])

            # Get current agent
            agents = letta_client.agents.list(name="profile-researcher")
            if not agents:
                return "Error: Could not find profile-researcher agent"

            agent = agents[0]

            # Find the block to detach (first attached block with the label)
            agent_blocks = letta_client.agents.blocks.list(agent_id=agent.id)
            user_block = next(
                (block for block in agent_blocks if block.label == block_label),
                None
            )

            if not user_block:
                return f"User block for @{display_handle} is not currently attached (label: {block_label})"

            # Detach block from agent
            letta_client.agents.blocks.detach(agent_id=agent.id, block_id=user_block.id)

            return f"Detached user block for @{display_handle} (label: {block_label}). The block still exists and can be reattached later."

        except Exception as e:
            # str() guards the message itself against a non-string handle
            return f"Error detaching user block for @{str(handle).lstrip('@')}: {str(e)}"

    def update_user_block(handle: str, new_content: str) -> str:
        """
        Update the content of a user-specific memory block.

        Args:
            handle: The Bluesky handle (e.g., 'cameron.pfiffer.org' or '@cameron.pfiffer.org')
            new_content: New content to store in the user block

        Returns:
            Status message about the block update
        """
        import os
        from letta_client import Letta

        try:
            # Strip the optional leading '@' once so messages render
            # "@name" rather than "@@name" when the caller passes "@name".
            display_handle = handle.lstrip('@')

            # Clean handle for block label
            clean_handle = display_handle.replace('.', '_').replace('-', '_')
            block_label = f"user_{clean_handle}"

            # Initialize Letta client
            letta_client = Letta(token=os.environ["LETTA_API_KEY"])

            # Find the block
            existing_blocks = letta_client.blocks.list(label=block_label)
            if not existing_blocks:
                return f"User block for @{display_handle} does not exist (label: {block_label}). Use attach_user_block first."

            user_block = existing_blocks[0]

            # Update block content (replaces the stored value with new_content)
            letta_client.blocks.modify(
                block_id=user_block.id,
                value=new_content
            )

            return f"Updated user block for @{display_handle} (label: {block_label}) with new content."

        except Exception as e:
            # str() guards the message itself against a non-string handle
            return f"Error updating user block for @{str(handle).lstrip('@')}: {str(e)}"

    # Create the tools
    attach_tool = client.tools.upsert_from_function(
        func=attach_user_block,
        tags=["memory", "user", "attach"]
    )

    detach_tool = client.tools.upsert_from_function(
        func=detach_user_block,
        tags=["memory", "user", "detach"]
    )

    update_tool = client.tools.upsert_from_function(
        func=update_user_block,
        tags=["memory", "user", "update"]
    )

    logger.info(f"Created block management tools: {attach_tool.name}, {detach_tool.name}, {update_tool.name}")
    return attach_tool, detach_tool, update_tool


def create_user_block_for_handle(client: Letta, handle: str):
    """Create a user-specific memory block that can be manually attached to agents.

    Args:
        client: Letta client passed through to ``upsert_block``.
        handle: Bluesky handle, with or without a leading '@'.

    Returns:
        The block object returned by ``upsert_block``. Its label is
        ``user_<handle>`` with the leading '@' removed and '.'/'-' replaced
        by '_'.
    """
    # Strip the optional leading '@' once so the stored text and log line
    # read "@name" rather than "@@name" when the caller passes "@name".
    display_handle = handle.lstrip('@')
    clean_handle = display_handle.replace('.', '_').replace('-', '_')
    block_label = f"user_{clean_handle}"

    user_block = upsert_block(
        client,
        label=block_label,
        value=f"User information for @{display_handle} will be stored here as I learn about them through profile research and interactions.",
        description=f"Stores detailed information about Bluesky user @{display_handle}, including their interests, posting patterns, personality traits, and interaction history."
    )

    logger.info(f"Created user block for @{display_handle}: {block_label} (ID: {user_block.id})")
    return user_block


def create_profile_researcher_agent():
    """Create the profile-researcher Letta agent.

    Builds a Letta client from the ``LETTA_API_KEY`` environment variable
    (``KeyError`` if it is unset), registers the profile research tool and the
    three block management tools, upserts the persona block, and upserts the
    ``profile-researcher`` agent in project ``PROJECT_ID`` with those tools.

    Returns:
        The agent object returned by ``upsert_agent``.
    """

    # Create client
    client = Letta(token=os.environ["LETTA_API_KEY"])

    logger.info("Creating profile-researcher agent...")

    # Create custom tools first
    research_tool = create_profile_research_tool(client)
    attach_tool, detach_tool, update_tool = create_block_management_tools(client)

    # Create persona block
    persona_block = upsert_block(
        client,
        label="profile-researcher-persona",
        value="""I am a Profile Researcher, an AI agent specialized in analyzing Bluesky user profiles and social media behavior. My purpose is to:

1. Research Bluesky user profiles thoroughly and objectively
2. Analyze posting patterns, interests, and engagement behaviors
3. Build comprehensive user understanding through data analysis
4. Create and manage user-specific memory blocks for individuals
5. Provide insights about user personality, interests, and social patterns

I approach research systematically:
- Use the research_bluesky_profile tool to examine profiles and recent posts
- Use attach_user_block to create and attach dedicated memory blocks for specific users
- Use update_user_block to store research findings in user-specific blocks
- Use detach_user_block when research is complete to free up memory space
- Analyze profile information (bio, follower counts, etc.)
- Study recent posts for themes, topics, and tone
- Identify posting frequency and engagement patterns
- Note interaction styles and communication preferences
- Track interests and expertise areas
- Observe social connections and community involvement

I maintain objectivity and respect privacy while building useful user models for personalized interactions. My typical workflow is: attach_user_block → research_bluesky_profile → update_user_block → detach_user_block.""",
        description="The persona and role definition for the profile researcher agent"
    )

    # Create the agent with persona block and custom tools
    profile_researcher = upsert_agent(
        client,
        name="profile-researcher",
        memory_blocks=[
            {
                "label": "research_notes",
                "value": "I will use this space to track ongoing research projects and findings across multiple users.",
                "limit": 8000,
                "description": "Working notes and cross-user insights from profile research activities"
            }
        ],
        block_ids=[persona_block.id],
        tags=["profile research", "bluesky", "user analysis"],
        model="openai/gpt-4o-mini",
        embedding="openai/text-embedding-3-small",
        description="An agent that researches Bluesky profiles and builds user understanding",
        project_id=PROJECT_ID,
        tools=[research_tool.name, attach_tool.name, detach_tool.name, update_tool.name]
    )

    logger.info(f"Profile researcher agent created: {profile_researcher.id}")
    return profile_researcher


def main():
    """Main function to create the profile researcher agent.

    Returns:
        0 when the agent was created and the summary printed, 1 when any
        step failed (the error is logged with its traceback and printed).
    """
    try:
        agent = create_profile_researcher_agent()
        print("✅ Profile researcher agent created successfully!")
        print(f" Agent ID: {agent.id}")
        print(f" Agent Name: {agent.name}")
        print("\nThe agent has these capabilities:")
        print(" - research_bluesky_profile: Analyzes user profiles and recent posts")
        print(" - attach_user_block: Creates and attaches user-specific memory blocks")
        print(" - update_user_block: Updates content in user memory blocks")
        print(" - detach_user_block: Detaches user blocks when done")
        print("\nTo use the agent, send a message like:")
        print(" 'Please research @cameron.pfiffer.org, attach their user block, update it with findings, then detach it'")
        print("\nThe agent can now manage its own memory blocks dynamically!")
        return 0

    except Exception as e:
        # Top-level boundary: report the failure and signal it to the shell
        # through the exit code instead of silently exiting 0.
        logger.exception(f"Failed to create profile researcher agent: {e}")
        print(f"❌ Error: {e}")
        return 1


if __name__ == "__main__":
    sys.exit(main())