Social media crossposting tool. Third time's the charm.
mastodon
misskey
crossposting
bluesky
1import json
2import re
3from dataclasses import dataclass
4from typing import Any, override
5
6import httpx
7
8import misskey.mfm as mfm
9from atproto.models import (
10 Facet,
11 ImageEmbed,
12 RecordEmbed,
13 ReplyRef,
14 SelfLabel,
15 SelfLabels,
16 StrongRef,
17 cid_from_json,
18)
19from atproto.store import get_store
20from bluesky.client import BlueskyClient
21from bluesky.info import SERVICE, BlueskyService, validate_and_transform
22from bluesky.richtext import tokens_to_richtext
23from cross.attachments import (
24 LabelsAttachment,
25 LanguagesAttachment,
26 MediaAttachment,
27 QuoteAttachment,
28 RemoteUrlAttachment,
29 SensitiveAttachment,
30)
31from cross.media import Blob, compress_image, convert_to_mp4, get_media_meta
32from cross.post import Post, PostRef
33from cross.service import OutputService
34from cross.tokens import LinkToken, TextToken, Token
35from database.connection import DatabasePool
36from util.splitter import TokenSplitter
37
38
# Values accepted by the ``thread_gate`` output option.
ALLOWED_GATES: list[str] = [
    "mentioned",
    "following",
    "followers",
]

# Case-insensitive whole-word keyword matchers applied to a post's content
# warning text; a match adds the "sexual" / "porn" Bluesky self-label.
ADULT_PATTERN = re.compile(r"\b(adult|sexual|nsfw)\b", flags=re.I)
PORN_PATTERN = re.compile(r"\b(porn|explicit)\b", flags=re.I)
43
44
45@dataclass(kw_only=True)
46class BlueskyOutputOptions:
47 handle: str | None = None
48 did: str | None = None
49 pds: str | None = None
50 password: str = ""
51 quote_gate: bool = False
52 thread_gate: list[str] | None = None
53 encode_videos: bool = True
54
55 @classmethod
56 def from_dict(cls, data: dict[str, Any]) -> "BlueskyOutputOptions":
57 validate_and_transform(data)
58
59 if "password" not in data:
60 raise KeyError("'password' is required for bluesky")
61
62 if "quote_gate" in data:
63 data["quote_gate"] = bool(data["quote_gate"])
64
65 if (
66 "thread_gate" in data
67 and isinstance(data["thread_gate"], list)
68 and any(v not in ALLOWED_GATES for v in data["thread_gate"])
69 ):
70 raise ValueError(
71 f"'thread_gate' only accepts {', '.join(ALLOWED_GATES)}, "
72 f"got: {data['thread_gate']}"
73 )
74
75 if "encode_videos" in data:
76 data["encode_videos"] = bool(data["encode_videos"])
77
78 return BlueskyOutputOptions(**data)
79
80
class BlueskyOutputService(BlueskyService, OutputService):
    """Output service that crossposts posts and reposts to a Bluesky account."""

    def __init__(
        self, db: DatabasePool, http: httpx.Client, options: BlueskyOutputOptions
    ) -> None:
        super().__init__(SERVICE, db)
        self.http = http
        self.options: BlueskyOutputOptions = options

        self._store = get_store(db)
        # NOTE(review): presumably resolves self.did / self.pds from
        # get_identity_options(); defined in BlueskyService — confirm there.
        self._init_identity()

        self._client = BlueskyClient(
            self.pds,
            self._store,
            self.did,
            http,
            self.options.password,
        )
        # The password is only needed to construct the client; do not keep it
        # around on the options object.
        self.options.password = ""
        self.log.info("Logged in as '%s'", self.did)

    @override
    def get_identity_options(self) -> tuple[str | None, str | None, str | None]:
        """Return the configured ``(handle, did, pds)`` triple."""
        return (self.options.handle, self.options.did, self.options.pds)

    def _split_attachments(
        self, attachments: list[Blob]
    ) -> tuple[list[Blob], list[Blob]]:
        """Partition blobs into ``(supported, unsupported)``.

        Images and videos are embedded; everything else is linked from the
        post text instead.
        """
        supported: list[Blob] = []
        unsupported: list[Blob] = []

        for blob in attachments:
            if blob.mime.startswith(("image/", "video/")):
                supported.append(blob)
            else:
                unsupported.append(blob)

        return supported, unsupported

    def _split_media_per_post(
        self,
        token_blocks: list[list[Any]],
        media: list[Blob],
    ) -> list[tuple[list[Any], list[Blob]]]:
        """Distribute media over the posts of a thread.

        Each video gets its own post; consecutive images share a post, up to
        four per post. Media fill the text posts in order first, then extra
        text-less posts are appended as needed.

        Returns:
            One ``(tokens, attachments)`` pair per post; the first
            ``len(token_blocks)`` entries correspond to ``token_blocks``.
        """
        posts: list[tuple[list[Any], list[Blob]]] = [
            (block, []) for block in token_blocks
        ]
        next_free = 0
        current_image_post: int | None = None

        def claim_post() -> int:
            # Hand out posts in order, appending blank ones once all text
            # posts already carry media.
            nonlocal next_free
            if next_free == len(posts):
                posts.append(([], []))
            idx = next_free
            next_free += 1
            return idx

        for blob in media:
            if blob.mime.startswith("video/"):
                current_image_post = None
                posts[claim_post()][1].append(blob)
            elif blob.mime.startswith("image/"):
                if (
                    current_image_post is not None
                    and len(posts[current_image_post][1]) < 4
                ):
                    posts[current_image_post][1].append(blob)
                else:
                    current_image_post = claim_post()
                    posts[current_image_post][1].append(blob)

        return posts

    def _build_labels(
        self,
        spoiler: str | None,
        is_sensitive: bool,
    ) -> SelfLabels | None:
        """Map a content warning and sensitive flag to Bluesky self-labels.

        Any spoiler text or the sensitive flag yields ``graphic-media``;
        spoiler text matching PORN_PATTERN adds ``porn``, otherwise a match
        on ADULT_PATTERN adds ``sexual``. Returns None when nothing applies.
        """
        unique_labels: set[str] = set()

        if spoiler:
            unique_labels.add("graphic-media")

            if PORN_PATTERN.search(spoiler):
                unique_labels.add("porn")
            elif ADULT_PATTERN.search(spoiler):
                unique_labels.add("sexual")

        if is_sensitive:
            unique_labels.add("graphic-media")

        if not unique_labels:
            return None

        # Sort so the emitted record is deterministic (set order is not).
        return SelfLabels(
            values=[SelfLabel(val=label) for label in sorted(unique_labels)]
        )

    def _resolve_reply_target(self, post: Post) -> tuple[ReplyRef, int, int] | None:
        """Resolve the Bluesky thread *post* replies into.

        Returns ``(reply_ref, root_db_id, parent_db_id)``, or None (after
        logging) when the parent thread cannot be resolved.
        """
        thread = self._find_mapped_thread(
            post.parent_id,
            post.service,
            post.author,
            self.url,
            self.did,
        )
        if not thread:
            self.log.error(
                "Skipping '%s': parent thread tuple not found in db", post.id
            )
            return None

        root_uri, reply_uri, root_id, parent_id = thread

        root_post = self._get_post_by_id(root_id)
        reply_post = self._get_post_by_id(parent_id)
        if not root_post or not reply_post:
            self.log.error(
                "Skipping '%s': failed to fetch parent posts from db", post.id
            )
            return None

        root_cid = cid_from_json(root_post["extra_data"])
        reply_cid = cid_from_json(reply_post["extra_data"])
        for result in (root_cid, reply_cid):
            if not result.is_ok():
                self.log.error(
                    "Skipping '%s': failed to parse CID. %s", post.id, result.error()
                )
                return None

        reply_ref = ReplyRef(
            root=StrongRef(uri=root_uri, cid=root_cid.value()),
            parent=StrongRef(uri=reply_uri, cid=reply_cid.value()),
        )
        return reply_ref, root_id, parent_id

    @staticmethod
    def _file_label(blob: Blob) -> str:
        """Return a short emoji-prefixed label (max 28 chars) for a linked file."""
        if not blob.name:
            return "💾 file"

        if blob.mime.startswith("audio/"):
            icon = "🎵"
        elif blob.mime.startswith("text/"):
            icon = "📄"
        else:
            icon = "💾"

        name = f"{icon} {blob.name}"
        if len(name) > 28:
            name = name[: 28 - 1] + "…"
        return name

    def _build_tokens(
        self, post: Post, spoiler: str | None, unsupported_media: list[Blob]
    ) -> list[Token]:
        """Assemble the full token stream for *post* before splitting.

        Prepends the content warning, appends links for attachments that
        cannot be embedded, and strips MFM for Misskey posts, adding a link
        to the original when MFM was removed.
        """
        tokens: list[Token] = []

        if spoiler:
            tokens.append(TextToken(text=f"[{spoiler}]\n\n"))

        tokens.extend(post.tokens)

        if unsupported_media:
            tokens.append(TextToken(text="\n"))
            for attachment in unsupported_media:
                url = getattr(attachment, "url", None) or attachment.name or "unknown"
                tokens.append(
                    LinkToken(href=url, label=f"[{self._file_label(attachment)}]")
                )
                tokens.append(TextToken(text=" "))

        if post.text_type == "text/x.misskeymarkdown":
            tokens, status = mfm.strip_mfm(tokens)
            remote_url = post.attachments.get(RemoteUrlAttachment)
            if status and remote_url and remote_url.url:
                tokens.append(TextToken(text="\n"))
                tokens.append(
                    LinkToken(
                        href=remote_url.url, label="[Post contains MFM, see original]"
                    )
                )

        return tokens

    def _resolve_quote(
        self, post: Post, quote_attachment: QuoteAttachment
    ) -> StrongRef | None:
        """Return a strong ref to the crossposted copy of the quoted post.

        Returns None (after logging) when the quote cannot be carried over;
        the caller must then skip the post.
        """
        if quote_attachment.quoted_user != post.author:
            self.log.info("Skipping '%s': quoted other user", post.id)
            return None

        quoted_post = self._get_post(
            post.service, post.author, quote_attachment.quoted_id
        )
        if not quoted_post:
            self.log.error("Skipping '%s': quoted post not found in db!", post.id)
            return None

        quoted_mappings = self._get_mappings(quoted_post["id"], self.url, self.did)
        if not quoted_mappings:
            self.log.error(
                "Skipping '%s': failed to find mappings for quoted post", post.id
            )
            return None

        quoted_result = cid_from_json(quoted_mappings[0]["extra_data"])
        if not quoted_result.is_ok():
            self.log.error(
                "Skipping '%s': failed to parse CID. %s",
                post.id,
                quoted_result.error(),
            )
            return None

        return StrongRef(
            uri=quoted_mappings[0]["identifier"], cid=quoted_result.value()
        )

    def _check_media_limits(self, post: Post, media: list[Blob]) -> bool:
        """Return False (after logging) if any blob exceeds upload limits."""
        for blob in media:
            if blob.mime.startswith("image/") and len(blob.io) > 2_000_000:
                self.log.error("Skipping '%s': image too large", post.id)
                return False
            if blob.mime.startswith("video/"):
                if blob.mime != "video/mp4" and not self.options.encode_videos:
                    self.log.info(
                        "Skipping '%s': video is not mp4, but encoding is disabled",
                        post.id,
                    )
                    return False
                if len(blob.io) > 100_000_000:
                    self.log.error("Skipping '%s': video too large", post.id)
                    return False
        return True

    def _read_media_meta(self, data: Any) -> tuple[Any, Any]:
        """Return ``(aspect_ratio, duration)`` for *data*.

        Best effort: returns ``(None, None)`` and logs if probing fails, so
        the media is still uploaded without metadata.
        """
        try:
            meta = get_media_meta(data)
            return (meta.width, meta.height), getattr(meta, "duration", None)
        except Exception as e:
            self.log.error("Failed to read media metadata: %s", e)
            return None, None

    def _prepare_images(self, blobs: list[Blob]) -> list[ImageEmbed]:
        """Compress (if over 1 MB) and wrap up to four images as embeds."""
        images: list[ImageEmbed] = []
        for img_blob in blobs[:4]:
            image_io = img_blob.io
            if len(image_io) > 1_000_000:
                self.log.info("Compressing %s...", img_blob.name or "image")
                image_io = compress_image(img_blob).io

            aspect_ratio, _ = self._read_media_meta(image_io)
            images.append(
                ImageEmbed(
                    image=image_io,
                    alt=img_blob.alt,
                    aspect_ratio=aspect_ratio,
                )
            )
        return images

    def _prepare_video(self, post: Post, video_blob: Blob) -> tuple[Any, Any, Any] | None:
        """Convert a video to mp4 if needed and return ``(data, alt, aspect_ratio)``.

        Returns None (after logging) when the video is longer than 180s.
        """
        video_io = video_blob.io
        if video_blob.mime != "video/mp4":
            self.log.info("Converting %s to mp4...", video_blob.name or "video")
            video_io = convert_to_mp4(video_blob).io

        aspect_ratio, duration = self._read_media_meta(video_io)
        if duration and duration > 180:
            self.log.info(
                "Skipping '%s': video too long (%.1f > 180s)",
                post.id,
                duration,
            )
            return None

        return video_io, video_blob.alt, aspect_ratio

    def _prepare_payloads(
        self, post: Post, baked_media: list[tuple[list[Any], list[Blob]]]
    ) -> list[tuple[str, Any]] | None:
        """Prepare every post's media up front, before anything is sent.

        Doing this before the first upload means a rejected video cannot
        leave a half-created thread behind. Each payload is ``("text", None)``,
        ``("images", [ImageEmbed, ...])`` or ``("video", (data, alt, ratio))``.
        Returns None when the post must be skipped.
        """
        payloads: list[tuple[str, Any]] = []
        for _, attachments in baked_media:
            if not attachments:
                payloads.append(("text", None))
            elif attachments[0].mime.startswith("image/"):
                payloads.append(("images", self._prepare_images(attachments)))
            else:
                video = self._prepare_video(post, attachments[0])
                if video is None:
                    return None
                payloads.append(("video", video))
        return payloads

    def _send_one(
        self,
        text: str,
        facets: list[Facet],
        payload: tuple[str, Any],
        embed: dict[str, Any] | None,
        reply_to: ReplyRef | None,
        labels: SelfLabels | None,
        langs: list[str] | None,
    ):
        """Send a single thread post and return the client's response."""
        kind, data = payload
        if kind == "images":
            return self._client.send_images(
                text=text or "",
                images=data,
                facets=facets or None,
                embed=embed,
                reply_to=reply_to,
                labels=labels,
                langs=langs,
            )
        if kind == "video":
            video_io, alt, aspect_ratio = data
            # NOTE(review): facets are not passed for video posts; confirm
            # whether BlueskyClient.send_video accepts them.
            return self._client.send_video(
                text=text or "",
                video=video_io,
                alt=alt,
                aspect_ratio=aspect_ratio,
                embed=embed,
                reply_to=reply_to,
                labels=labels,
                langs=langs,
            )
        return self._client.send_post(
            text=text or " ",
            facets=facets or None,
            embed=embed,
            reply_to=reply_to,
            labels=labels,
            langs=langs,
        )

    def _persist_records(
        self,
        db_post_id: Any,
        records: list[tuple[str, str]],
        root_id: int | None,
        parent_id: int | None,
    ) -> None:
        """Store created ``(uri, cid)`` records as a reply chain and map them.

        If *root_id*/*parent_id* are not given, the first record becomes the
        root of a new thread. Every record is mapped to *db_post_id*.
        """
        if not records:
            return

        remaining = records
        if root_id is None or parent_id is None:
            first_uri, first_cid = records[0]
            root_id = self._insert_post(
                {
                    "user": self.did,
                    "service": self.url,
                    "identifier": first_uri,
                    "parent": None,
                    "root": None,
                    "reposted": None,
                    "extra_data": json.dumps({"cid": first_cid}),
                    "crossposted": 1,
                }
            )
            parent_id = root_id
            self._insert_post_mapping(db_post_id, parent_id)
            remaining = records[1:]

        for uri, cid in remaining:
            parent_id = self._insert_post(
                {
                    "user": self.did,
                    "service": self.url,
                    "identifier": uri,
                    "parent": parent_id,
                    "root": root_id,
                    "reposted": None,
                    "extra_data": json.dumps({"cid": cid}),
                    "crossposted": 1,
                }
            )
            self._insert_post_mapping(db_post_id, parent_id)

    @override
    def accept_post(self, post: Post):
        """Crosspost *post* to Bluesky, splitting it into a thread if needed.

        Posts whose db record, reply target or quoted post cannot be
        resolved, or whose text/media violate limits, are skipped with a log
        message. Created records are stored in the db and mapped to the
        source post, including those created before a mid-thread failure
        (the exception is then re-raised).
        """
        db_post = self._get_post(post.service, post.author, post.id)
        if not db_post:
            self.log.error("Skipping '%s': post not found in db", post.id)
            return

        reply_to: ReplyRef | None = None
        new_root_id: int | None = None
        new_parent_id: int | None = None
        if post.parent_id:
            target = self._resolve_reply_target(post)
            if target is None:
                return
            reply_to, new_root_id, new_parent_id = target

        labels_attachment = post.attachments.get(LabelsAttachment)
        spoiler: str | None = (
            labels_attachment.labels[0]
            if labels_attachment and labels_attachment.labels
            else None
        )
        sensitive_attachment = post.attachments.get(SensitiveAttachment)
        is_sensitive = sensitive_attachment.sensitive if sensitive_attachment else False
        labels = self._build_labels(spoiler, is_sensitive)

        langs_attachment = post.attachments.get(LanguagesAttachment)
        langs: list[str] | None = (
            langs_attachment.langs[:3]
            if langs_attachment and langs_attachment.langs
            else None
        )

        media_attachment = post.attachments.get(MediaAttachment)
        all_media = media_attachment.blobs if media_attachment else []
        supported_media, unsupported_media = self._split_attachments(all_media)

        tokens = self._build_tokens(post, spoiler, unsupported_media)

        quote_ref: StrongRef | None = None
        quote_attachment = post.attachments.get(QuoteAttachment)
        if quote_attachment:
            quote_ref = self._resolve_quote(post, quote_attachment)
            if quote_ref is None:
                return

        splitter = TokenSplitter(max_chars=300, max_link_len=30)
        token_blocks = splitter.split(tokens)
        if not token_blocks.is_ok():
            self.log.error("Skipping '%s': %s", post.id, token_blocks.error())
            return
        blocks = [list(block) for block in token_blocks.value()]

        if not self._check_media_limits(post, supported_media):
            return

        baked_media = self._split_media_per_post(blocks, supported_media)
        if not baked_media:
            self.log.info("Skipping '%s': nothing to post", post.id)
            return

        richtexts: list[tuple[str, list[Facet]]] = []
        for block in blocks:
            result = tokens_to_richtext(block)
            if result is None:
                self.log.error("Skipping '%s': invalid rich text types", post.id)
                return
            richtexts.append(result)

        payloads = self._prepare_payloads(post, baked_media)
        if payloads is None:
            return

        created_records: list[tuple[str, str]] = []
        # Replies into an existing thread keep that thread's root throughout.
        thread_root: StrongRef | None = reply_to.root if reply_to else None
        first_ref: StrongRef | None = None
        previous_ref: StrongRef | None = None

        try:
            for i, ((block_tokens, _), payload) in enumerate(
                zip(baked_media, payloads)
            ):
                # Entry i of baked_media carries token block i, so index the
                # rich texts by i to keep text and media aligned.
                if block_tokens and i < len(richtexts):
                    text, facets = richtexts[i]
                else:
                    text, facets = "", []

                current_reply_to: ReplyRef | None = None
                if i == 0:
                    current_reply_to = reply_to
                elif previous_ref is not None and first_ref is not None:
                    current_reply_to = ReplyRef(
                        root=thread_root or first_ref, parent=previous_ref
                    )

                # Only the first post of the thread carries the quote.
                embed: dict[str, Any] | None = (
                    RecordEmbed(record=quote_ref).to_dict()
                    if i == 0 and quote_ref is not None
                    else None
                )

                response = self._send_one(
                    text, facets, payload, embed, current_reply_to, labels, langs
                )
                created_records.append((response.uri, response.cid))

                if first_ref is None:
                    first_ref = StrongRef(uri=response.uri, cid=response.cid)
                previous_ref = StrongRef(uri=response.uri, cid=response.cid)

                if i == 0:
                    self._client.create_gates(
                        response.uri,
                        self.options.thread_gate,
                        self.options.quote_gate,
                    )
        except Exception:
            # Record what already exists remotely so deletes can still find it.
            if created_records:
                self.log.error(
                    "'%s' failed after %d post(s) were created; saving them",
                    post.id,
                    len(created_records),
                )
                self._persist_records(
                    db_post["id"], created_records, new_root_id, new_parent_id
                )
            raise

        self._persist_records(db_post["id"], created_records, new_root_id, new_parent_id)

        self.log.info(
            "Post accepted successfully: %s -> %s",
            post.id,
            [uri for uri, _ in created_records],
        )

    @override
    def delete_post(self, post: PostRef):
        """Delete every Bluesky record mapped to *post*, last created first."""
        db_post = self._get_post(post.service, post.author, post.id)
        if not db_post:
            self.log.warning("Skipping delete '%s': post not found in db", post.id)
            return

        mappings = self._get_mappings(db_post["id"], self.url, self.did)
        for mapping in reversed(mappings):
            self._client.delete_post(mapping["identifier"])
            self._delete_post_by_id(mapping["id"])
        self.log.info("Deleted '%s'", post.id)

    @override
    def accept_repost(self, repost: PostRef, reposted: PostRef):
        """Repost the Bluesky copy of *reposted* and record the new repost."""
        db_repost = self._get_post(repost.service, repost.author, repost.id)
        db_reposted = self._get_post(reposted.service, reposted.author, reposted.id)
        if not db_repost or not db_reposted:
            self.log.info("Skipping repost '%s': post not found in db", repost.id)
            return

        mappings = self._get_mappings(db_reposted["id"], self.url, self.did)
        if not mappings:
            self.log.info(
                "Skipping repost '%s': no Bluesky mapping for reposted post",
                repost.id,
            )
            return

        target = mappings[0]
        cid = cid_from_json(target["extra_data"])
        if not cid.is_ok():
            # Not inside an except block, so log.exception would only print
            # "NoneType: None"; log the parse error explicitly instead.
            self.log.error(
                "Skipping repost '%s': failed to parse CID from extra_data. %s",
                repost.id,
                cid.error(),
            )
            return

        response = self._client.repost(target["identifier"], cid.value())

        # _insert_post returns the new row id (as relied on in accept_post).
        inserted_id = self._insert_post(
            {
                "user": self.did,
                "service": self.url,
                "identifier": response.uri,
                "parent": None,
                "root": None,
                "reposted": target["id"],
                "extra_data": json.dumps({"cid": response.cid}),
                "crossposted": 1,
            }
        )
        self._insert_post_mapping(db_repost["id"], inserted_id)
        self.log.info("Repost accepted successfully: %s", repost.id)

    @override
    def delete_repost(self, repost: PostRef):
        """Undo the Bluesky repost mapped to *repost*, if any."""
        db_repost = self._get_post(repost.service, repost.author, repost.id)
        if not db_repost:
            self.log.warning("Skipping delete '%s': repost not found in db", repost.id)
            return

        mappings = self._get_mappings(db_repost["id"], self.url, self.did)
        if not mappings:
            self.log.info("Skipping delete '%s': no Bluesky repost mapped", repost.id)
            return

        self._client.delete_repost(mappings[0]["identifier"])
        self._delete_post_by_id(mappings[0]["id"])
        self.log.info("Deleted %s", repost.id)