"""Client for publishing and managing Bluesky records on a PDS."""

import logging
from datetime import UTC, datetime
from typing import Any, cast

import httpx

from atproto.models import (
    AtUri,
    CreateRecordResponse,
    Facet,
    ImageEmbed,
    PostGate,
    PostRecord,
    RecordEmbed,
    RecordWithMediaEmbed,
    ReplyRef,
    RepostRecord,
    SelfLabels,
    StrongRef,
    ThreadGate,
    VideoEmbed,
)
from atproto.store import AtprotoStore
from atproto.xrpc import XRPCClient, XRPCError, resolve_identity
from util.util import normalize_service_url

logger = logging.getLogger(__name__)

# Number of images taken from the input of ``send_images``; anything past
# this is not uploaded.
_MAX_IMAGES_PER_POST = 4

# Caller-facing gate names accepted by ``create_threadgate`` mapped to the
# ``$type`` of the rule object written into the threadgate record.
_THREADGATE_RULE_TYPES: dict[str, str] = {
    "mentioned": "app.bsky.feed.threadgate#mentionRule",
    "following": "app.bsky.feed.threadgate#followingRule",
    "followers": "app.bsky.feed.threadgate#followerRule",
}


class BlueskyClient:
    """Create, delete and fetch Bluesky records through an ``XRPCClient``.

    Attributes are set in ``__init__``:
        pds_url: The PDS URL after ``normalize_service_url``.
        store: The ``AtprotoStore`` handed to identity resolution and XRPC.
        did: DID of the resolved identity; used as the ``repo`` of every
            record this client writes or deletes.
        xrpc: The ``XRPCClient`` all calls go through.
    """

    def __init__(
        self,
        pds_url: str,
        store: AtprotoStore,
        identifier: str,
        http: httpx.Client | None = None,
        password: str | None = None,
    ) -> None:
        """Resolve ``identifier`` and build the XRPC client.

        Args:
            pds_url: URL of the PDS to talk to.
            store: Store passed to ``resolve_identity`` and ``XRPCClient``.
            identifier: Account identifier passed to ``resolve_identity``.
            http: Optional HTTP client forwarded to identity resolution and
                the XRPC client.
            password: Optional password forwarded to the XRPC client.
        """
        self.pds_url: str = normalize_service_url(pds_url)
        self.store: AtprotoStore = store
        identity = resolve_identity(identifier, store, http)
        self.did: str = identity.did
        # NOTE(review): the XRPC client receives the raw ``pds_url`` while
        # ``self.pds_url`` holds the normalized form. Left as-is because
        # XRPCClient is not visible here - confirm whether it normalizes the
        # URL itself or should be given ``self.pds_url``.
        self.xrpc: XRPCClient = XRPCClient(pds_url, store, http, self.did, password)

    def _get_timestamp(self, time_iso: str | None = None) -> str:
        """Return ``time_iso`` if truthy, else the current UTC time.

        The generated value is ISO 8601 with the ``+00:00`` offset rewritten
        to ``Z``.
        """
        if time_iso:
            return time_iso
        return datetime.now(UTC).isoformat().replace("+00:00", "Z")

    def _upload_blob(self, data: bytes, content_type: str) -> dict[str, Any]:
        """Upload ``data`` via the XRPC client and return its response dict.

        Callers in this class read the ``"blob"`` key of the result.
        """
        return self.xrpc.upload_blob(data, content_type, self.did)

    def _create_record(
        self, collection: str, record: dict[str, Any], **extra: Any
    ) -> CreateRecordResponse:
        """Call ``com.atproto.repo.createRecord`` in this client's repo.

        Args:
            collection: NSID of the collection to write to.
            record: Serialized record body.
            **extra: Additional request fields (e.g. ``rkey``) appended to
                the request data after ``repo``/``collection``/``record``.

        Returns:
            The parsed ``CreateRecordResponse``.
        """
        data: dict[str, Any] = {
            "repo": self.did,
            "collection": collection,
            "record": record,
            **extra,
        }
        response = self.xrpc.call(
            "com.atproto.repo.createRecord",
            data=data,
            did=self.did,
        )
        return CreateRecordResponse.from_dict(response.data)

    def _delete_record(self, collection: str, uri: str) -> None:
        """Call ``com.atproto.repo.deleteRecord`` for the rkey in ``uri``.

        Only the record key is taken from ``uri``; the repo is always
        ``self.did`` and the collection is the one passed in.
        """
        _, _, rkey = AtUri.record_uri(uri)
        self.xrpc.call(
            "com.atproto.repo.deleteRecord",
            data={
                "repo": self.did,
                "collection": collection,
                "rkey": rkey,
            },
            did=self.did,
        )

    @staticmethod
    def _wrap_media(
        media: dict[str, Any], record: dict[str, Any] | None
    ) -> dict[str, Any]:
        """Combine ``media`` with an optional record embed.

        Returns ``media`` unchanged when ``record`` is falsy, otherwise an
        ``app.bsky.embed.recordWithMedia`` dict holding both.
        """
        if not record:
            return media
        return {
            "$type": "app.bsky.embed.recordWithMedia",
            "record": record,
            "media": media,
        }

    def send_post(
        self,
        text: str,
        facets: list[Facet] | None = None,
        embed: dict[str, Any] | None = None,
        reply_to: ReplyRef | None = None,
        labels: SelfLabels | None = None,
        langs: list[str] | None = None,
        time_iso: str | None = None,
    ) -> CreateRecordResponse:
        """Create an ``app.bsky.feed.post`` record.

        Args:
            text: Post text.
            facets: Optional facets for ``text``.
            embed: Optional, already-serialized embed dict.
            reply_to: Optional reply reference.
            labels: Optional self-labels.
            langs: Optional language tags.
            time_iso: Optional ``createdAt`` override; the current UTC time
                is used when omitted or empty.

        Returns:
            The parsed ``CreateRecordResponse``.
        """
        record = PostRecord(
            text=text,
            facets=facets,
            embed=embed,
            reply=reply_to,
            labels=labels,
            langs=langs,
            created_at=self._get_timestamp(time_iso),
        )
        return self._create_record("app.bsky.feed.post", record.to_dict())

    def send_images(
        self,
        text: str,
        images: list[ImageEmbed],
        facets: list[Facet] | None = None,
        embed: dict[str, Any] | None = None,
        reply_to: ReplyRef | None = None,
        labels: SelfLabels | None = None,
        langs: list[str] | None = None,
        time_iso: str | None = None,
    ) -> CreateRecordResponse:
        """Upload images and create a post embedding them.

        Only the first four entries of ``images`` are uploaded; a warning is
        logged when more are supplied. If ``embed`` is truthy the images are
        wrapped together with it in an ``app.bsky.embed.recordWithMedia``.

        Args:
            text: Post text.
            images: Images to attach.
            facets: Optional facets for ``text``.
            embed: Optional record embed to combine with the images.
            reply_to: Optional reply reference.
            labels: Optional self-labels.
            langs: Optional language tags.
            time_iso: Optional ``createdAt`` override.

        Returns:
            The parsed ``CreateRecordResponse`` of the created post.
        """
        if len(images) > _MAX_IMAGES_PER_POST:
            logger.warning(
                "send_images received %d images; only the first %d will be posted",
                len(images),
                _MAX_IMAGES_PER_POST,
            )

        # NOTE(review): every image is uploaded as "image/jpeg" regardless of
        # its real format - confirm callers always supply JPEG data.
        image_refs: list[dict[str, Any]] = [
            img.to_dict(self._upload_blob(img.image, "image/jpeg")["blob"])
            for img in images[:_MAX_IMAGES_PER_POST]
        ]
        image_embed: dict[str, Any] = {
            "$type": "app.bsky.embed.images",
            "images": image_refs,
        }
        return self.send_post(
            text=text,
            facets=facets,
            embed=self._wrap_media(image_embed, embed),
            reply_to=reply_to,
            labels=labels,
            langs=langs,
            time_iso=time_iso,
        )

    def send_video(
        self,
        text: str,
        video: bytes,
        alt: str | None = None,
        aspect_ratio: tuple[int, int] | None = None,
        facets: list[Facet] | None = None,
        embed: dict[str, Any] | None = None,
        reply_to: ReplyRef | None = None,
        labels: SelfLabels | None = None,
        langs: list[str] | None = None,
        time_iso: str | None = None,
    ) -> CreateRecordResponse:
        """Upload a video and create a post embedding it.

        The video is uploaded with content type ``video/mp4``. If ``embed``
        is truthy the video is wrapped together with it in an
        ``app.bsky.embed.recordWithMedia``.

        Args:
            text: Post text.
            video: Raw video bytes.
            alt: Optional alt text.
            aspect_ratio: Optional aspect ratio passed to ``VideoEmbed``.
            facets: Optional facets for ``text``.
            embed: Optional record embed to combine with the video.
            reply_to: Optional reply reference.
            labels: Optional self-labels.
            langs: Optional language tags.
            time_iso: Optional ``createdAt`` override.

        Returns:
            The parsed ``CreateRecordResponse`` of the created post.
        """
        blob_ref = self._upload_blob(video, "video/mp4")
        video_embed = VideoEmbed(
            video=video,
            alt=alt,
            aspect_ratio=aspect_ratio,
        )
        video_embed_dict = video_embed.to_dict(blob_ref["blob"])
        return self.send_post(
            text=text,
            facets=facets,
            embed=self._wrap_media(video_embed_dict, embed),
            reply_to=reply_to,
            labels=labels,
            langs=langs,
            time_iso=time_iso,
        )

    def send_quote(
        self,
        text: str,
        quoted_uri: str,
        quoted_cid: str,
        facets: list[Facet] | None = None,
        embed_media: ImageEmbed | VideoEmbed | None = None,
        embed_blob_ref: dict[str, Any] | None = None,
        reply_to: ReplyRef | None = None,
        labels: SelfLabels | None = None,
        langs: list[str] | None = None,
        time_iso: str | None = None,
    ) -> CreateRecordResponse:
        """Create a post that quotes another record.

        Media is attached only when both ``embed_media`` and
        ``embed_blob_ref`` are truthy; otherwise a plain record embed is used
        and a warning is logged if just one of the two was supplied.

        Args:
            text: Post text.
            quoted_uri: URI of the quoted record.
            quoted_cid: CID of the quoted record.
            facets: Optional facets for ``text``.
            embed_media: Optional media to attach alongside the quote.
            embed_blob_ref: Blob reference belonging to ``embed_media``.
            reply_to: Optional reply reference.
            labels: Optional self-labels.
            langs: Optional language tags.
            time_iso: Optional ``createdAt`` override.

        Returns:
            The parsed ``CreateRecordResponse`` of the created post.
        """
        quoted_ref = StrongRef(uri=quoted_uri, cid=quoted_cid)
        if embed_media and embed_blob_ref:
            embed: dict[str, Any] = RecordWithMediaEmbed(
                record=quoted_ref,
                media=embed_media,
                media_blob_ref=embed_blob_ref,
            ).to_dict()
        else:
            if embed_media or embed_blob_ref:
                # Previously dropped without any trace; make it visible.
                logger.warning(
                    "send_quote needs both embed_media and embed_blob_ref; "
                    "posting quote of %s without media",
                    quoted_uri,
                )
            embed = RecordEmbed(record=quoted_ref).to_dict()
        return self.send_post(
            text=text,
            facets=facets,
            embed=embed,
            reply_to=reply_to,
            labels=labels,
            langs=langs,
            time_iso=time_iso,
        )

    def repost(
        self, subject_uri: str, subject_cid: str, time_iso: str | None = None
    ) -> CreateRecordResponse:
        """Create an ``app.bsky.feed.repost`` record for the given subject.

        Args:
            subject_uri: URI of the record being reposted.
            subject_cid: CID of the record being reposted.
            time_iso: Optional ``createdAt`` override.

        Returns:
            The parsed ``CreateRecordResponse``.
        """
        record = RepostRecord(
            subject=StrongRef(uri=subject_uri, cid=subject_cid),
            created_at=self._get_timestamp(time_iso),
        )
        return self._create_record("app.bsky.feed.repost", record.to_dict())

    def delete_post(self, post_uri: str) -> None:
        """Delete the ``app.bsky.feed.post`` record whose rkey is in ``post_uri``."""
        self._delete_record("app.bsky.feed.post", post_uri)

    def delete_repost(self, repost_uri: str) -> None:
        """Delete the ``app.bsky.feed.repost`` record whose rkey is in ``repost_uri``."""
        self._delete_record("app.bsky.feed.repost", repost_uri)

    def create_threadgate(
        self, post_uri: str, allow_gates: list[str] | None
    ) -> CreateRecordResponse:
        """Create an ``app.bsky.feed.threadgate`` record for ``post_uri``.

        The gate is written under the same rkey as the post. Recognized gate
        names are ``"mentioned"``, ``"following"`` and ``"followers"``;
        unknown names are skipped with a warning. ``None`` or an empty list
        produces a record with an empty ``allow`` list.

        Args:
            post_uri: URI of the post to gate.
            allow_gates: Names of the reply rules to allow.

        Returns:
            The parsed ``CreateRecordResponse``.
        """
        allow: list[dict[str, Any]] = []
        for gate in allow_gates or []:
            rule_type = _THREADGATE_RULE_TYPES.get(gate)
            if rule_type is None:
                logger.warning(
                    "Ignoring unknown threadgate rule %r for %s", gate, post_uri
                )
                continue
            allow.append({"$type": rule_type})

        threadgate = ThreadGate(
            allow=allow, post=post_uri, created_at=self._get_timestamp()
        )
        _, _, rkey = AtUri.record_uri(post_uri)
        return self._create_record(
            "app.bsky.feed.threadgate", threadgate.to_dict(), rkey=rkey
        )

    def create_postgate(
        self, post_uri: str, quote_gate: bool = True
    ) -> CreateRecordResponse:
        """Create an ``app.bsky.feed.postgate`` record for ``post_uri``.

        The gate is written under the same rkey as the post. When
        ``quote_gate`` is true the record carries a single
        ``app.bsky.feed.postgate#disableRule`` embedding rule; otherwise
        ``embedding_rules`` is ``None``.

        Args:
            post_uri: URI of the post to gate.
            quote_gate: Whether to add the disable rule.

        Returns:
            The parsed ``CreateRecordResponse``.
        """
        embedding_rules = (
            [{"$type": "app.bsky.feed.postgate#disableRule"}] if quote_gate else None
        )
        postgate = PostGate(
            post=post_uri,
            created_at=self._get_timestamp(),
            embedding_rules=embedding_rules,
        )
        _, _, rkey = AtUri.record_uri(post_uri)
        return self._create_record(
            "app.bsky.feed.postgate", postgate.to_dict(), rkey=rkey
        )

    def create_gates(
        self,
        post_uri: str,
        thread_gate: list[str] | None,
        quote_gate: bool,
    ) -> tuple[CreateRecordResponse | None, CreateRecordResponse | None]:
        """Create the thread gate and/or post gate requested for a post.

        Args:
            post_uri: URI of the post to gate.
            thread_gate: Reply rules for the thread gate. ``None`` skips the
                thread gate; an empty list still creates one.
            quote_gate: When true, a post gate is created.

        Returns:
            ``(threadgate_response, postgate_response)``; each element is
            ``None`` when that gate was not created.
        """
        threadgate_response: CreateRecordResponse | None = None
        postgate_response: CreateRecordResponse | None = None

        if thread_gate is not None:
            threadgate_response = self.create_threadgate(post_uri, thread_gate)
        if quote_gate:
            postgate_response = self.create_postgate(post_uri, quote_gate)

        return threadgate_response, postgate_response

    def get_post(self, uri: str) -> dict[str, Any] | None:
        """Fetch a single post via ``app.bsky.feed.getPosts``.

        This is best-effort: no exception escapes. The call is made without
        a ``did`` argument.

        Args:
            uri: URI of the post to fetch.

        Returns:
            The first entry of the response's ``posts`` list, or ``None``
            when the list is empty or missing, or when any error occurs.
            A 404 ``XRPCError`` is silent; other errors are logged as
            warnings.
        """
        try:
            response = self.xrpc.call(
                "app.bsky.feed.getPosts",
                params={"uris": [uri]},
            )
            posts = cast(list[dict[str, Any]], response.data.get("posts", []))
            return posts[0] if posts else None
        except XRPCError as e:
            # A 404 is an expected "not found"; anything else is worth a log line.
            if e.status_code != 404:
                logger.warning("Failed to get post %s: %s", uri, e)
            return None
        except Exception as e:
            logger.warning("Unexpected error getting post %s: %s", uri, e)
            return None