a digital entity named phi that roams bsky
1#!/usr/bin/env python3
2"""Test script to verify posting capabilities"""
3
4import asyncio
5from datetime import datetime
6
7from atproto import Client
8
9from bot.config import settings
10
11
12async def test_post():
13 """Test creating a post on Bluesky"""
14 client = Client(base_url=settings.bluesky_service)
15
16 print(f"Logging in as {settings.bluesky_handle}...")
17 client.login(settings.bluesky_handle, settings.bluesky_password)
18
19 test_text = f"Test post from bot at {datetime.now().isoformat()} 🤖"
20
21 print(f"Creating post: {test_text}")
22 # Use the simpler send_post method
23 response = client.send_post(text=test_text)
24
25 post_uri = response.uri
26 print("✅ Post created successfully!")
27 print(f"URI: {post_uri}")
28 print(
29 f"View at: https://bsky.app/profile/{settings.bluesky_handle}/post/{post_uri.split('/')[-1]}"
30 )
31
32 return post_uri
33
34
35async def test_reply(post_uri: str):
36 """Test replying to a post"""
37 client = Client(base_url=settings.bluesky_service)
38 client.login(settings.bluesky_handle, settings.bluesky_password)
39
40 # Get the post we're replying to
41 parent_post = client.app.bsky.feed.get_posts(params={"uris": [post_uri]})
42 if not parent_post.posts:
43 raise ValueError("Parent post not found")
44
45 # Build reply reference
46 from atproto import models
47
48 parent_cid = parent_post.posts[0].cid
49 parent_ref = models.ComAtprotoRepoStrongRef.Main(uri=post_uri, cid=parent_cid)
50 reply_ref = models.AppBskyFeedPost.ReplyRef(parent=parent_ref, root=parent_ref)
51
52 reply_text = "This is a test reply from the bot 🔄"
53
54 print(f"Creating reply: {reply_text}")
55 # Use send_post with reply_to
56 response = client.send_post(text=reply_text, reply_to=reply_ref)
57
58 print("✅ Reply created successfully!")
59 print(f"URI: {response.uri}")
60
61
62async def main():
63 """Run all tests"""
64 print("🧪 Testing Bluesky posting capabilities...\n")
65
66 try:
67 # Test creating a post
68 post_uri = await test_post()
69
70 print("\nWaiting 2 seconds before replying...")
71 await asyncio.sleep(2)
72
73 # Test replying to the post
74 await test_reply(post_uri)
75
76 print("\n✨ All tests passed!")
77
78 except Exception as e:
79 print(f"\n❌ Error: {e}")
80 import traceback
81
82 traceback.print_exc()
83
84
85if __name__ == "__main__":
86 asyncio.run(main())