a digital person for bluesky

Add timeout protection and silence ping messages in streaming

- Add 10-minute timeout to prevent infinite hangs in streaming loops
- Silence 'ping' keepalive messages (log at debug level only)
- Apply to both notification processing and synthesis streams
- Prevents logs from being spammed with ping messages
- Stream will break after 600 seconds if agent doesn't send 'done'

This fixes the issue where streaming would hang indefinitely
when agents get stuck or don't complete properly.

๐Ÿพ Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta <noreply@letta.com>

+22 -2
+22 -2
bsky.py
··· 419 max_steps=100 420 ) 421 422 - # Collect the streaming response 423 all_messages = [] 424 for chunk in message_stream: 425 # Log condensed chunk info 426 if hasattr(chunk, 'message_type'): 427 if chunk.message_type == 'reasoning_message': ··· 606 logger.error(f"Agent error (dict): {chunk.model_dump()}") 607 elif hasattr(chunk, '__dict__'): 608 logger.error(f"Agent error (vars): {vars(chunk)}") 609 else: 610 # Filter out verbose message types 611 if chunk.message_type not in ['usage_statistics', 'stop_reason']: ··· 1582 synthesis_posts = [] 1583 ack_note = None 1584 1585 - # Process the streaming response 1586 for chunk in message_stream: 1587 if hasattr(chunk, 'message_type'): 1588 if chunk.message_type == 'reasoning_message': 1589 if SHOW_REASONING: ··· 1669 print(" โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€") 1670 for line in chunk.content.split('\n'): 1671 print(f" {line}") 1672 elif chunk.message_type == 'error_message': 1673 # Dump full error object 1674 logger.error(f"Synthesis error_message: {chunk}")
··· 419 max_steps=100 420 ) 421 422 + # Collect the streaming response with timeout protection 423 all_messages = [] 424 + stream_start_time = time.time() 425 + max_stream_duration = 600 # 10 minutes max 426 + 427 for chunk in message_stream: 428 + # Check for timeout 429 + if time.time() - stream_start_time > max_stream_duration: 430 + logger.warning(f"Stream exceeded {max_stream_duration}s timeout, breaking") 431 + break 432 # Log condensed chunk info 433 if hasattr(chunk, 'message_type'): 434 if chunk.message_type == 'reasoning_message': ··· 613 logger.error(f"Agent error (dict): {chunk.model_dump()}") 614 elif hasattr(chunk, '__dict__'): 615 logger.error(f"Agent error (vars): {vars(chunk)}") 616 + elif chunk.message_type == 'ping': 617 + # Silently ignore ping keepalive messages 618 + logger.debug(f"Received keepalive ping from Letta API") 619 else: 620 # Filter out verbose message types 621 if chunk.message_type not in ['usage_statistics', 'stop_reason']: ··· 1592 synthesis_posts = [] 1593 ack_note = None 1594 1595 + # Process the streaming response with timeout protection 1596 + stream_start_time = time.time() 1597 + max_stream_duration = 600 # 10 minutes max 1598 + 1599 for chunk in message_stream: 1600 + # Check for timeout 1601 + if time.time() - stream_start_time > max_stream_duration: 1602 + logger.warning(f"Synthesis stream exceeded {max_stream_duration}s timeout, breaking") 1603 + break 1604 if hasattr(chunk, 'message_type'): 1605 if chunk.message_type == 'reasoning_message': 1606 if SHOW_REASONING: ··· 1686 print(" โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€") 1687 for line in chunk.content.split('\n'): 1688 print(f" {line}") 1689 + elif chunk.message_type == 'ping': 1690 + # Silently ignore ping keepalive messages 1691 + logger.debug(f"Received keepalive ping from Letta API during synthesis") 1692 elif chunk.message_type == 'error_message': 1693 # Dump full error object 1694 logger.error(f"Synthesis error_message: {chunk}")