a digital entity named phi that roams bsky
at 88bdcec5a6cd31bf44eb1c73bb2bfa5c3e530ae0 86 lines 2.4 kB view raw
1#!/usr/bin/env python3 2"""Test script to verify posting capabilities""" 3 4import asyncio 5from datetime import datetime 6 7from atproto import Client 8 9from bot.config import settings 10 11 12async def test_post(): 13 """Test creating a post on Bluesky""" 14 client = Client(base_url=settings.bluesky_service) 15 16 print(f"Logging in as {settings.bluesky_handle}...") 17 client.login(settings.bluesky_handle, settings.bluesky_password) 18 19 test_text = f"Test post from bot at {datetime.now().isoformat()} 🤖" 20 21 print(f"Creating post: {test_text}") 22 # Use the simpler send_post method 23 response = client.send_post(text=test_text) 24 25 post_uri = response.uri 26 print("✅ Post created successfully!") 27 print(f"URI: {post_uri}") 28 print( 29 f"View at: https://bsky.app/profile/{settings.bluesky_handle}/post/{post_uri.split('/')[-1]}" 30 ) 31 32 return post_uri 33 34 35async def test_reply(post_uri: str): 36 """Test replying to a post""" 37 client = Client(base_url=settings.bluesky_service) 38 client.login(settings.bluesky_handle, settings.bluesky_password) 39 40 # Get the post we're replying to 41 parent_post = client.app.bsky.feed.get_posts(params={"uris": [post_uri]}) 42 if not parent_post.posts: 43 raise ValueError("Parent post not found") 44 45 # Build reply reference 46 from atproto import models 47 48 parent_cid = parent_post.posts[0].cid 49 parent_ref = models.ComAtprotoRepoStrongRef.Main(uri=post_uri, cid=parent_cid) 50 reply_ref = models.AppBskyFeedPost.ReplyRef(parent=parent_ref, root=parent_ref) 51 52 reply_text = "This is a test reply from the bot 🔄" 53 54 print(f"Creating reply: {reply_text}") 55 # Use send_post with reply_to 56 response = client.send_post(text=reply_text, reply_to=reply_ref) 57 58 print("✅ Reply created successfully!") 59 print(f"URI: {response.uri}") 60 61 62async def main(): 63 """Run all tests""" 64 print("🧪 Testing Bluesky posting capabilities...\n") 65 66 try: 67 # Test creating a post 68 post_uri = await test_post() 69 70 print("\nWaiting 2 seconds before replying...") 71 await asyncio.sleep(2) 72 73 # Test replying to the post 74 await test_reply(post_uri) 75 76 print("\n✨ All tests passed!") 77 78 except Exception as e: 79 print(f"\n❌ Error: {e}") 80 import traceback 81 82 traceback.print_exc() 83 84 85if __name__ == "__main__": 86 asyncio.run(main())