social media crossposting tool. 3rd time's the charm
mastodon
misskey
crossposting
bluesky
1import logging
2from datetime import UTC, datetime
3from typing import Any, cast
4
5import httpx
6
7from atproto.models import (
8 AtUri,
9 CreateRecordResponse,
10 Facet,
11 ImageEmbed,
12 PostGate,
13 PostRecord,
14 RecordEmbed,
15 RecordWithMediaEmbed,
16 ReplyRef,
17 RepostRecord,
18 SelfLabels,
19 StrongRef,
20 ThreadGate,
21 VideoEmbed,
22)
23from atproto.store import AtprotoStore
24from atproto.xrpc import XRPCClient, XRPCError, resolve_identity
25from util.util import normalize_service_url
26
27
28logger = logging.getLogger(__name__)
29
30
31class BlueskyClient:
32 def __init__(
33 self,
34 pds_url: str,
35 store: AtprotoStore,
36 identifier: str,
37 http: httpx.Client | None = None,
38 password: str | None = None,
39 ) -> None:
40 self.pds_url: str = normalize_service_url(pds_url)
41 self.store: AtprotoStore = store
42
43 identity = resolve_identity(identifier, store, http)
44 self.did: str = identity.did
45 self.xrpc: XRPCClient = XRPCClient(pds_url, store, http, self.did, password)
46
47 def _get_timestamp(self, time_iso: str | None = None) -> str:
48 if time_iso:
49 return time_iso
50 return datetime.now(UTC).isoformat().replace("+00:00", "Z")
51
52 def _upload_blob(self, data: bytes, content_type: str) -> dict[str, Any]:
53 return self.xrpc.upload_blob(data, content_type, self.did)
54
55 def send_post(
56 self,
57 text: str,
58 facets: list[Facet] | None = None,
59 embed: dict[str, Any] | None = None,
60 reply_to: ReplyRef | None = None,
61 labels: SelfLabels | None = None,
62 langs: list[str] | None = None,
63 time_iso: str | None = None,
64 ) -> CreateRecordResponse:
65 record = PostRecord(
66 text=text,
67 facets=facets,
68 embed=embed,
69 reply=reply_to,
70 labels=labels,
71 langs=langs,
72 created_at=self._get_timestamp(time_iso),
73 )
74
75 response = self.xrpc.call(
76 "com.atproto.repo.createRecord",
77 data={
78 "repo": self.did,
79 "collection": "app.bsky.feed.post",
80 "record": record.to_dict(),
81 },
82 did=self.did,
83 )
84
85 return CreateRecordResponse.from_dict(response.data)
86
87 def send_images(
88 self,
89 text: str,
90 images: list[ImageEmbed],
91 facets: list[Facet] | None = None,
92 embed: dict[str, Any] | None = None,
93 reply_to: ReplyRef | None = None,
94 labels: SelfLabels | None = None,
95 langs: list[str] | None = None,
96 time_iso: str | None = None,
97 ) -> CreateRecordResponse:
98 image_refs: list[dict[str, Any]] = []
99 for img in images[:4]:
100 blob_ref = self._upload_blob(img.image, "image/jpeg")
101 image_data = img.to_dict(blob_ref["blob"])
102 image_refs.append(image_data)
103
104 image_embed: dict[str, Any] = {
105 "$type": "app.bsky.embed.images",
106 "images": image_refs,
107 }
108
109 if embed:
110 combined_embed: dict[str, Any] = {
111 "$type": "app.bsky.embed.recordWithMedia",
112 "record": embed,
113 "media": image_embed,
114 }
115 return self.send_post(
116 text=text,
117 facets=facets,
118 embed=combined_embed,
119 reply_to=reply_to,
120 labels=labels,
121 langs=langs,
122 time_iso=time_iso,
123 )
124
125 return self.send_post(
126 text=text,
127 facets=facets,
128 embed=image_embed,
129 reply_to=reply_to,
130 labels=labels,
131 langs=langs,
132 time_iso=time_iso,
133 )
134
135 def send_video(
136 self,
137 text: str,
138 video: bytes,
139 alt: str | None = None,
140 aspect_ratio: tuple[int, int] | None = None,
141 facets: list[Facet] | None = None,
142 embed: dict[str, Any] | None = None,
143 reply_to: ReplyRef | None = None,
144 labels: SelfLabels | None = None,
145 langs: list[str] | None = None,
146 time_iso: str | None = None,
147 ) -> CreateRecordResponse:
148 blob_ref = self._upload_blob(video, "video/mp4")
149
150 video_embed = VideoEmbed(
151 video=video,
152 alt=alt,
153 aspect_ratio=aspect_ratio,
154 )
155 video_embed_dict = video_embed.to_dict(blob_ref["blob"])
156
157 if embed:
158 combined_embed: dict[str, Any] = {
159 "$type": "app.bsky.embed.recordWithMedia",
160 "record": embed,
161 "media": video_embed_dict,
162 }
163 return self.send_post(
164 text=text,
165 facets=facets,
166 embed=combined_embed,
167 reply_to=reply_to,
168 labels=labels,
169 langs=langs,
170 time_iso=time_iso,
171 )
172
173 return self.send_post(
174 text=text,
175 facets=facets,
176 embed=video_embed_dict,
177 reply_to=reply_to,
178 labels=labels,
179 langs=langs,
180 time_iso=time_iso,
181 )
182
183 def send_quote(
184 self,
185 text: str,
186 quoted_uri: str,
187 quoted_cid: str,
188 facets: list[Facet] | None = None,
189 embed_media: ImageEmbed | VideoEmbed | None = None,
190 embed_blob_ref: dict[str, Any] | None = None,
191 reply_to: ReplyRef | None = None,
192 labels: SelfLabels | None = None,
193 langs: list[str] | None = None,
194 time_iso: str | None = None,
195 ) -> CreateRecordResponse:
196 quoted_ref = StrongRef(uri=quoted_uri, cid=quoted_cid)
197
198 if embed_media and embed_blob_ref:
199 embed: dict[str, Any] = RecordWithMediaEmbed(
200 record=quoted_ref,
201 media=embed_media,
202 media_blob_ref=embed_blob_ref,
203 ).to_dict()
204 else:
205 embed = RecordEmbed(record=quoted_ref).to_dict()
206
207 return self.send_post(
208 text=text,
209 facets=facets,
210 embed=embed,
211 reply_to=reply_to,
212 labels=labels,
213 langs=langs,
214 time_iso=time_iso,
215 )
216
217 def repost(
218 self, subject_uri: str, subject_cid: str, time_iso: str | None = None
219 ) -> CreateRecordResponse:
220 subject = StrongRef(uri=subject_uri, cid=subject_cid)
221 record = RepostRecord(
222 subject=subject,
223 created_at=self._get_timestamp(time_iso),
224 )
225
226 record_dict = record.to_dict()
227
228 response = self.xrpc.call(
229 "com.atproto.repo.createRecord",
230 data={
231 "repo": self.did,
232 "collection": "app.bsky.feed.repost",
233 "record": record_dict,
234 },
235 did=self.did,
236 )
237
238 return CreateRecordResponse.from_dict(response.data)
239
240 def delete_post(self, post_uri: str) -> None:
241 _, _, rkey = AtUri.record_uri(post_uri)
242
243 self.xrpc.call(
244 "com.atproto.repo.deleteRecord",
245 data={
246 "repo": self.did,
247 "collection": "app.bsky.feed.post",
248 "rkey": rkey,
249 },
250 did=self.did,
251 )
252
253 def delete_repost(self, repost_uri: str) -> None:
254 _, _, rkey = AtUri.record_uri(repost_uri)
255
256 self.xrpc.call(
257 "com.atproto.repo.deleteRecord",
258 data={
259 "repo": self.did,
260 "collection": "app.bsky.feed.repost",
261 "rkey": rkey,
262 },
263 did=self.did,
264 )
265
266 def create_threadgate(
267 self, post_uri: str, allow_gates: list[str] | None
268 ) -> CreateRecordResponse:
269 allow: list[dict[str, Any]] = []
270 if allow_gates:
271 for gate in allow_gates:
272 match gate:
273 case "mentioned":
274 allow.append({"$type": "app.bsky.feed.threadgate#mentionRule"})
275 case "following":
276 allow.append(
277 {"$type": "app.bsky.feed.threadgate#followingRule"}
278 )
279 case "followers":
280 allow.append({"$type": "app.bsky.feed.threadgate#followerRule"})
281
282 threadgate = ThreadGate(
283 allow=allow, post=post_uri, created_at=self._get_timestamp()
284 )
285
286 _, _, rkey = AtUri.record_uri(post_uri)
287
288 response = self.xrpc.call(
289 "com.atproto.repo.createRecord",
290 data={
291 "repo": self.did,
292 "collection": "app.bsky.feed.threadgate",
293 "record": threadgate.to_dict(),
294 "rkey": rkey,
295 },
296 did=self.did,
297 )
298
299 return CreateRecordResponse.from_dict(response.data)
300
301 def create_postgate(
302 self, post_uri: str, quote_gate: bool = True
303 ) -> CreateRecordResponse:
304 postgate = PostGate(
305 post=post_uri,
306 created_at=self._get_timestamp(),
307 embedding_rules=[{"$type": "app.bsky.feed.postgate#disableRule"}]
308 if quote_gate
309 else None,
310 )
311
312 _, _, rkey = AtUri.record_uri(post_uri)
313
314 response = self.xrpc.call(
315 "com.atproto.repo.createRecord",
316 data={
317 "repo": self.did,
318 "collection": "app.bsky.feed.postgate",
319 "record": postgate.to_dict(),
320 "rkey": rkey,
321 },
322 did=self.did,
323 )
324
325 return CreateRecordResponse.from_dict(response.data)
326
327 def create_gates(
328 self,
329 post_uri: str,
330 thread_gate: list[str] | None,
331 quote_gate: bool,
332 ) -> tuple[CreateRecordResponse | None, CreateRecordResponse | None]:
333 threadgate_response: CreateRecordResponse | None = None
334 postgate_response: CreateRecordResponse | None = None
335
336 if thread_gate is not None:
337 threadgate_response = self.create_threadgate(post_uri, thread_gate)
338
339 if quote_gate:
340 postgate_response = self.create_postgate(post_uri, quote_gate)
341
342 return threadgate_response, postgate_response
343
344 def get_post(self, uri: str) -> dict[str, Any] | None:
345 try:
346 response = self.xrpc.call(
347 "app.bsky.feed.getPosts",
348 params={"uris": [uri]},
349 )
350 posts = cast(list[dict[str, Any]], response.data.get("posts", []))
351 return posts[0] if posts else None
352 except XRPCError as e:
353 if e.status_code == 404:
354 return None
355 logger.warning("Failed to get post %s: %s", uri, e)
356 return None
357 except Exception as e:
358 logger.warning("Unexpected error getting post %s: %s", uri, e)
359 return None