social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky
at next 359 lines 11 kB view raw
1import logging 2from datetime import UTC, datetime 3from typing import Any, cast 4 5import httpx 6 7from atproto.models import ( 8 AtUri, 9 CreateRecordResponse, 10 Facet, 11 ImageEmbed, 12 PostGate, 13 PostRecord, 14 RecordEmbed, 15 RecordWithMediaEmbed, 16 ReplyRef, 17 RepostRecord, 18 SelfLabels, 19 StrongRef, 20 ThreadGate, 21 VideoEmbed, 22) 23from atproto.store import AtprotoStore 24from atproto.xrpc import XRPCClient, XRPCError, resolve_identity 25from util.util import normalize_service_url 26 27 28logger = logging.getLogger(__name__) 29 30 31class BlueskyClient: 32 def __init__( 33 self, 34 pds_url: str, 35 store: AtprotoStore, 36 identifier: str, 37 http: httpx.Client | None = None, 38 password: str | None = None, 39 ) -> None: 40 self.pds_url: str = normalize_service_url(pds_url) 41 self.store: AtprotoStore = store 42 43 identity = resolve_identity(identifier, store, http) 44 self.did: str = identity.did 45 self.xrpc: XRPCClient = XRPCClient(pds_url, store, http, self.did, password) 46 47 def _get_timestamp(self, time_iso: str | None = None) -> str: 48 if time_iso: 49 return time_iso 50 return datetime.now(UTC).isoformat().replace("+00:00", "Z") 51 52 def _upload_blob(self, data: bytes, content_type: str) -> dict[str, Any]: 53 return self.xrpc.upload_blob(data, content_type, self.did) 54 55 def send_post( 56 self, 57 text: str, 58 facets: list[Facet] | None = None, 59 embed: dict[str, Any] | None = None, 60 reply_to: ReplyRef | None = None, 61 labels: SelfLabels | None = None, 62 langs: list[str] | None = None, 63 time_iso: str | None = None, 64 ) -> CreateRecordResponse: 65 record = PostRecord( 66 text=text, 67 facets=facets, 68 embed=embed, 69 reply=reply_to, 70 labels=labels, 71 langs=langs, 72 created_at=self._get_timestamp(time_iso), 73 ) 74 75 response = self.xrpc.call( 76 "com.atproto.repo.createRecord", 77 data={ 78 "repo": self.did, 79 "collection": "app.bsky.feed.post", 80 "record": record.to_dict(), 81 }, 82 did=self.did, 83 ) 84 85 return CreateRecordResponse.from_dict(response.data) 86 87 def send_images( 88 self, 89 text: str, 90 images: list[ImageEmbed], 91 facets: list[Facet] | None = None, 92 embed: dict[str, Any] | None = None, 93 reply_to: ReplyRef | None = None, 94 labels: SelfLabels | None = None, 95 langs: list[str] | None = None, 96 time_iso: str | None = None, 97 ) -> CreateRecordResponse: 98 image_refs: list[dict[str, Any]] = [] 99 for img in images[:4]: 100 blob_ref = self._upload_blob(img.image, "image/jpeg") 101 image_data = img.to_dict(blob_ref["blob"]) 102 image_refs.append(image_data) 103 104 image_embed: dict[str, Any] = { 105 "$type": "app.bsky.embed.images", 106 "images": image_refs, 107 } 108 109 if embed: 110 combined_embed: dict[str, Any] = { 111 "$type": "app.bsky.embed.recordWithMedia", 112 "record": embed, 113 "media": image_embed, 114 } 115 return self.send_post( 116 text=text, 117 facets=facets, 118 embed=combined_embed, 119 reply_to=reply_to, 120 labels=labels, 121 langs=langs, 122 time_iso=time_iso, 123 ) 124 125 return self.send_post( 126 text=text, 127 facets=facets, 128 embed=image_embed, 129 reply_to=reply_to, 130 labels=labels, 131 langs=langs, 132 time_iso=time_iso, 133 ) 134 135 def send_video( 136 self, 137 text: str, 138 video: bytes, 139 alt: str | None = None, 140 aspect_ratio: tuple[int, int] | None = None, 141 facets: list[Facet] | None = None, 142 embed: dict[str, Any] | None = None, 143 reply_to: ReplyRef | None = None, 144 labels: SelfLabels | None = None, 145 langs: list[str] | None = None, 146 time_iso: str | None = None, 147 ) -> CreateRecordResponse: 148 blob_ref = self._upload_blob(video, "video/mp4") 149 150 video_embed = VideoEmbed( 151 video=video, 152 alt=alt, 153 aspect_ratio=aspect_ratio, 154 ) 155 video_embed_dict = video_embed.to_dict(blob_ref["blob"]) 156 157 if embed: 158 combined_embed: dict[str, Any] = { 159 "$type": "app.bsky.embed.recordWithMedia", 160 "record": embed, 161 "media": video_embed_dict, 162 } 163 return self.send_post( 164 text=text, 165 facets=facets, 166 embed=combined_embed, 167 reply_to=reply_to, 168 labels=labels, 169 langs=langs, 170 time_iso=time_iso, 171 ) 172 173 return self.send_post( 174 text=text, 175 facets=facets, 176 embed=video_embed_dict, 177 reply_to=reply_to, 178 labels=labels, 179 langs=langs, 180 time_iso=time_iso, 181 ) 182 183 def send_quote( 184 self, 185 text: str, 186 quoted_uri: str, 187 quoted_cid: str, 188 facets: list[Facet] | None = None, 189 embed_media: ImageEmbed | VideoEmbed | None = None, 190 embed_blob_ref: dict[str, Any] | None = None, 191 reply_to: ReplyRef | None = None, 192 labels: SelfLabels | None = None, 193 langs: list[str] | None = None, 194 time_iso: str | None = None, 195 ) -> CreateRecordResponse: 196 quoted_ref = StrongRef(uri=quoted_uri, cid=quoted_cid) 197 198 if embed_media and embed_blob_ref: 199 embed: dict[str, Any] = RecordWithMediaEmbed( 200 record=quoted_ref, 201 media=embed_media, 202 media_blob_ref=embed_blob_ref, 203 ).to_dict() 204 else: 205 embed = RecordEmbed(record=quoted_ref).to_dict() 206 207 return self.send_post( 208 text=text, 209 facets=facets, 210 embed=embed, 211 reply_to=reply_to, 212 labels=labels, 213 langs=langs, 214 time_iso=time_iso, 215 ) 216 217 def repost( 218 self, subject_uri: str, subject_cid: str, time_iso: str | None = None 219 ) -> CreateRecordResponse: 220 subject = StrongRef(uri=subject_uri, cid=subject_cid) 221 record = RepostRecord( 222 subject=subject, 223 created_at=self._get_timestamp(time_iso), 224 ) 225 226 record_dict = record.to_dict() 227 228 response = self.xrpc.call( 229 "com.atproto.repo.createRecord", 230 data={ 231 "repo": self.did, 232 "collection": "app.bsky.feed.repost", 233 "record": record_dict, 234 }, 235 did=self.did, 236 ) 237 238 return CreateRecordResponse.from_dict(response.data) 239 240 def delete_post(self, post_uri: str) -> None: 241 _, _, rkey = AtUri.record_uri(post_uri) 242 243 self.xrpc.call( 244 "com.atproto.repo.deleteRecord", 245 data={ 246 "repo": self.did, 247 "collection": "app.bsky.feed.post", 248 "rkey": rkey, 249 }, 250 did=self.did, 251 ) 252 253 def delete_repost(self, repost_uri: str) -> None: 254 _, _, rkey = AtUri.record_uri(repost_uri) 255 256 self.xrpc.call( 257 "com.atproto.repo.deleteRecord", 258 data={ 259 "repo": self.did, 260 "collection": "app.bsky.feed.repost", 261 "rkey": rkey, 262 }, 263 did=self.did, 264 ) 265 266 def create_threadgate( 267 self, post_uri: str, allow_gates: list[str] | None 268 ) -> CreateRecordResponse: 269 allow: list[dict[str, Any]] = [] 270 if allow_gates: 271 for gate in allow_gates: 272 match gate: 273 case "mentioned": 274 allow.append({"$type": "app.bsky.feed.threadgate#mentionRule"}) 275 case "following": 276 allow.append( 277 {"$type": "app.bsky.feed.threadgate#followingRule"} 278 ) 279 case "followers": 280 allow.append({"$type": "app.bsky.feed.threadgate#followerRule"}) 281 282 threadgate = ThreadGate( 283 allow=allow, post=post_uri, created_at=self._get_timestamp() 284 ) 285 286 _, _, rkey = AtUri.record_uri(post_uri) 287 288 response = self.xrpc.call( 289 "com.atproto.repo.createRecord", 290 data={ 291 "repo": self.did, 292 "collection": "app.bsky.feed.threadgate", 293 "record": threadgate.to_dict(), 294 "rkey": rkey, 295 }, 296 did=self.did, 297 ) 298 299 return CreateRecordResponse.from_dict(response.data) 300 301 def create_postgate( 302 self, post_uri: str, quote_gate: bool = True 303 ) -> CreateRecordResponse: 304 postgate = PostGate( 305 post=post_uri, 306 created_at=self._get_timestamp(), 307 embedding_rules=[{"$type": "app.bsky.feed.postgate#disableRule"}] 308 if quote_gate 309 else None, 310 ) 311 312 _, _, rkey = AtUri.record_uri(post_uri) 313 314 response = self.xrpc.call( 315 "com.atproto.repo.createRecord", 316 data={ 317 "repo": self.did, 318 "collection": "app.bsky.feed.postgate", 319 "record": postgate.to_dict(), 320 "rkey": rkey, 321 }, 322 did=self.did, 323 ) 324 325 return CreateRecordResponse.from_dict(response.data) 326 327 def create_gates( 328 self, 329 post_uri: str, 330 thread_gate: list[str] | None, 331 quote_gate: bool, 332 ) -> tuple[CreateRecordResponse | None, CreateRecordResponse | None]: 333 threadgate_response: CreateRecordResponse | None = None 334 postgate_response: CreateRecordResponse | None = None 335 336 if thread_gate is not None: 337 threadgate_response = self.create_threadgate(post_uri, thread_gate) 338 339 if quote_gate: 340 postgate_response = self.create_postgate(post_uri, quote_gate) 341 342 return threadgate_response, postgate_response 343 344 def get_post(self, uri: str) -> dict[str, Any] | None: 345 try: 346 response = self.xrpc.call( 347 "app.bsky.feed.getPosts", 348 params={"uris": [uri]}, 349 ) 350 posts = cast(list[dict[str, Any]], response.data.get("posts", [])) 351 return posts[0] if posts else None 352 except XRPCError as e: 353 if e.status_code == 404: 354 return None 355 logger.warning("Failed to get post %s: %s", uri, e) 356 return None 357 except Exception as e: 358 logger.warning("Unexpected error getting post %s: %s", uri, e) 359 return None