Social media crossposting tool. Third time's the charm.
mastodon misskey crossposting bluesky
at next 607 lines 22 kB view raw
import json
import re
from dataclasses import dataclass
from typing import Any, override

import httpx

import misskey.mfm as mfm
from atproto.models import (
    Facet,
    ImageEmbed,
    RecordEmbed,
    ReplyRef,
    SelfLabel,
    SelfLabels,
    StrongRef,
    cid_from_json,
)
from atproto.store import get_store
from bluesky.client import BlueskyClient
from bluesky.info import SERVICE, BlueskyService, validate_and_transform
from bluesky.richtext import tokens_to_richtext
from cross.attachments import (
    LabelsAttachment,
    LanguagesAttachment,
    MediaAttachment,
    QuoteAttachment,
    RemoteUrlAttachment,
    SensitiveAttachment,
)
from cross.media import Blob, compress_image, convert_to_mp4, get_media_meta
from cross.post import Post, PostRef
from cross.service import OutputService
from cross.tokens import LinkToken, TextToken, Token
from database.connection import DatabasePool
from util.splitter import TokenSplitter


# Values accepted in the `thread_gate` option.
ALLOWED_GATES: list[str] = ["mentioned", "following", "followers"]

# Content-warning text matching these patterns adds a stricter self-label.
ADULT_PATTERN = re.compile(r"\b(adult|sexual|nsfw)\b", re.IGNORECASE)
PORN_PATTERN = re.compile(r"\b(porn|explicit)\b", re.IGNORECASE)

# Limits applied before/while posting. These are the values this service has
# always used; NOTE(review): confirm against current Bluesky limits.
MAX_POST_CHARS = 300  # passed to TokenSplitter as max_chars per post
MAX_LINK_LEN = 30  # passed to TokenSplitter as max_link_len
MAX_IMAGES_PER_POST = 4
IMAGE_COMPRESS_THRESHOLD = 1_000_000  # bytes; larger images get compressed
MAX_IMAGE_BYTES = 2_000_000  # bytes; larger images make the post be skipped
MAX_VIDEO_BYTES = 100_000_000  # bytes, checked on the original (unconverted) blob
MAX_VIDEO_SECONDS = 180
MAX_LANGS = 3  # at most this many language tags are forwarded
MAX_ATTACHMENT_NAME_LEN = 28  # label length for links to unsupported files


@dataclass(kw_only=True)
class BlueskyOutputOptions:
    """User-supplied settings for the Bluesky output."""

    handle: str | None = None
    did: str | None = None
    pds: str | None = None
    password: str = ""
    # Passed to BlueskyClient.create_gates for the first post of each thread.
    quote_gate: bool = False
    thread_gate: list[str] | None = None
    # When False, non-mp4 videos cause the post to be skipped instead of
    # being converted to mp4.
    encode_videos: bool = True

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "BlueskyOutputOptions":
        """Validate ``data`` and build options from it.

        Note: ``data`` is normalized in place (booleans coerced, plus whatever
        ``validate_and_transform`` does).

        Raises:
            KeyError: ``password`` is missing.
            ValueError: ``thread_gate`` contains a value not in ALLOWED_GATES.
        """
        validate_and_transform(data)

        if "password" not in data:
            raise KeyError("'password' is required for bluesky")

        if "quote_gate" in data:
            data["quote_gate"] = bool(data["quote_gate"])

        if (
            "thread_gate" in data
            and isinstance(data["thread_gate"], list)
            and any(v not in ALLOWED_GATES for v in data["thread_gate"])
        ):
            raise ValueError(
                f"'thread_gate' only accepts {', '.join(ALLOWED_GATES)}, "
                f"got: {data['thread_gate']}"
            )

        if "encode_videos" in data:
            data["encode_videos"] = bool(data["encode_videos"])

        return BlueskyOutputOptions(**data)


class BlueskyOutputService(BlueskyService, OutputService):
    """Crossposts posts and reposts from other services to a Bluesky account."""

    def __init__(
        self, db: DatabasePool, http: httpx.Client, options: BlueskyOutputOptions
    ) -> None:
        super().__init__(SERVICE, db)
        self.http = http
        self.options: BlueskyOutputOptions = options

        self._store = get_store(db)
        # Presumably populates self.did / self.pds (used just below) from
        # get_identity_options(); defined in BlueskyService — TODO confirm.
        self._init_identity()

        self._client = BlueskyClient(
            self.pds,
            self._store,
            self.did,
            http,
            self.options.password,
        )
        # The client has received the password; drop our plaintext copy.
        self.options.password = ""
        self.log.info("Logged in as '%s'", self.did)

    @override
    def get_identity_options(self) -> tuple[str | None, str | None, str | None]:
        """Return the configured ``(handle, did, pds)``."""
        return (self.options.handle, self.options.did, self.options.pds)

    def _split_attachments(
        self, attachments: list[Blob]
    ) -> tuple[list[Blob], list[Blob]]:
        """Partition blobs into (images/videos, everything else), keeping order."""
        supported: list[Blob] = []
        unsupported: list[Blob] = []

        for blob in attachments:
            if blob.mime.startswith(("image/", "video/")):
                supported.append(blob)
            else:
                unsupported.append(blob)

        return supported, unsupported

    def _split_media_per_post(
        self,
        token_blocks: list[list[Any]],
        media: list[Blob],
    ) -> list[tuple[list[Any], list[Blob]]]:
        """Distribute media over the thread's posts.

        Each token block is one post. Media is placed, in order, into posts
        that have no media yet (text posts first, then new empty posts
        appended to the end):

        * every video gets a post of its own;
        * consecutive images share a post, up to MAX_IMAGES_PER_POST.

        Returns ``(tokens, attachments)`` per post; appended overflow posts
        have empty tokens.
        """
        posts: list[dict[str, Any]] = [
            {"tokens": block, "attachments": []} for block in token_blocks
        ]
        available_indices: list[int] = list(range(len(posts)))
        current_image_post_idx: int | None = None

        def pop_next_empty_index() -> int:
            # Next post without media; create a blank one if none is left.
            if available_indices:
                return available_indices.pop(0)
            posts.append({"tokens": [], "attachments": []})
            return len(posts) - 1

        for blob in media:
            if blob.mime.startswith("video/"):
                current_image_post_idx = None
                posts[pop_next_empty_index()]["attachments"].append(blob)
            elif blob.mime.startswith("image/"):
                if (
                    current_image_post_idx is not None
                    and len(posts[current_image_post_idx]["attachments"])
                    < MAX_IMAGES_PER_POST
                ):
                    posts[current_image_post_idx]["attachments"].append(blob)
                else:
                    idx = pop_next_empty_index()
                    posts[idx]["attachments"].append(blob)
                    current_image_post_idx = idx

        return [(p["tokens"], p["attachments"]) for p in posts]

    def _build_labels(
        self,
        spoiler: str | None,
        is_sensitive: bool,
    ) -> SelfLabels | None:
        """Derive self-labels from a content warning and the sensitive flag.

        Any non-empty ``spoiler`` adds ``graphic-media``. It additionally adds
        ``porn`` when it matches PORN_PATTERN, otherwise ``sexual`` when it
        matches ADULT_PATTERN. ``is_sensitive`` adds ``graphic-media``.
        Returns None when no label applies. Labels are sorted so the output
        is deterministic.
        """
        unique_labels: set[str] = set()

        if spoiler:
            unique_labels.add("graphic-media")

            if PORN_PATTERN.search(spoiler):
                unique_labels.add("porn")
            elif ADULT_PATTERN.search(spoiler):
                unique_labels.add("sexual")

        if is_sensitive:
            unique_labels.add("graphic-media")

        if not unique_labels:
            return None

        return SelfLabels(
            values=[SelfLabel(val=label) for label in sorted(unique_labels)]
        )

    @staticmethod
    def _unsupported_media_tokens(blobs: list[Blob]) -> list[Token]:
        """Render non-image/video attachments as a line of ``[icon name]`` links.

        The link target is the blob's ``url`` when it has a truthy one,
        otherwise its name, otherwise ``"unknown"``. Labels longer than
        MAX_ATTACHMENT_NAME_LEN are cut and end in an ellipsis.
        """
        tokens: list[Token] = [TextToken(text="\n")]
        for blob in blobs:
            url = getattr(blob, "url", None) or blob.name or "unknown"

            name = "💾 file"
            if blob.name:
                if blob.mime.startswith("audio/"):
                    name = "🎵 " + blob.name
                elif blob.mime.startswith("text/"):
                    name = "📄 " + blob.name
                else:
                    name = "💾 " + blob.name

            if len(name) > MAX_ATTACHMENT_NAME_LEN:
                # Keep the result at exactly MAX_ATTACHMENT_NAME_LEN chars.
                name = name[: MAX_ATTACHMENT_NAME_LEN - 1] + "…"

            tokens.append(LinkToken(href=url, label=f"[{name}]"))
            tokens.append(TextToken(text=" "))
        return tokens

    def _prepare_image(self, blob: Blob) -> ImageEmbed:
        """Compress an oversized image if needed and wrap it as an ImageEmbed.

        The aspect ratio is read from the (possibly compressed) data. When the
        metadata cannot be read, the error is logged and no ratio is sent.
        """
        image_io = blob.io
        if len(image_io) > IMAGE_COMPRESS_THRESHOLD:
            self.log.info("Compressing %s...", blob.name or "image")
            image_io = compress_image(blob).io

        try:
            meta = get_media_meta(image_io)
            aspect_ratio = (meta.width, meta.height)
        except Exception as e:
            self.log.error(e)
            aspect_ratio = None

        return ImageEmbed(image=image_io, alt=blob.alt, aspect_ratio=aspect_ratio)

    def _prepare_video(
        self, blob: Blob
    ) -> tuple[Any, tuple[int, int] | None, float | None]:
        """Convert a video to mp4 if needed and read its metadata.

        Returns ``(data, aspect_ratio, duration_seconds)``. Ratio and duration
        are None when the metadata cannot be read (the error is logged).
        """
        video_io = blob.io
        if blob.mime != "video/mp4":
            self.log.info("Converting %s to mp4...", blob.name or "video")
            video_io = convert_to_mp4(blob).io

        try:
            meta = get_media_meta(video_io)
            aspect_ratio = (meta.width, meta.height)
            duration = meta.duration
        except Exception as e:
            self.log.error(e)
            aspect_ratio = None
            duration = None

        return video_io, aspect_ratio, duration

    def _crosspost_row(
        self,
        uri: str,
        cid: str,
        *,
        parent: int | None = None,
        root: int | None = None,
        reposted: int | None = None,
    ) -> dict[str, Any]:
        """Build the database row for a record this service created on Bluesky."""
        return {
            "user": self.did,
            "service": self.url,
            "identifier": uri,
            "parent": parent,
            "root": root,
            "reposted": reposted,
            "extra_data": json.dumps({"cid": cid}),
            "crossposted": 1,
        }

    @override
    def accept_post(self, post: Post):
        """Crosspost ``post`` to Bluesky as one post or a self-reply thread.

        The post is skipped (logged, nothing sent) in any of these cases:
        * it is missing from the database;
        * its parent thread or quoted post has no Bluesky mapping;
        * it quotes another user;
        * its media exceeds the size, duration or encoding limits;
        * its text cannot be turned into rich text.
        All checks run before the first record is created, so a skip never
        leaves a half-published thread behind. On success every created
        record is stored and mapped to the source post.
        """
        db_post = self._get_post(post.service, post.author, post.id)
        if not db_post:
            self.log.error("Skipping '%s': post not found in db", post.id)
            return

        reply_to: ReplyRef | None = None
        new_root_id: int | None = None
        new_parent_id: int | None = None

        if post.parent_id:
            thread = self._find_mapped_thread(
                post.parent_id,
                post.service,
                post.author,
                self.url,
                self.did,
            )
            if not thread:
                self.log.error(
                    "Skipping '%s': parent thread tuple not found in db", post.id
                )
                return

            root_uri, reply_uri, new_root_id, new_parent_id = thread

            root_post = self._get_post_by_id(new_root_id)
            reply_post = self._get_post_by_id(new_parent_id)

            if not root_post or not reply_post:
                self.log.error(
                    "Skipping '%s': failed to fetch parent posts from db", post.id
                )
                return

            root_cid = cid_from_json(root_post["extra_data"])
            reply_cid = cid_from_json(reply_post["extra_data"])

            if not root_cid.is_ok():
                self.log.error(
                    "Skipping '%s': failed to parse CID. %s", post.id, root_cid.error()
                )
                return
            if not reply_cid.is_ok():
                self.log.error(
                    "Skipping '%s': failed to parse CID. %s", post.id, reply_cid.error()
                )
                return

            reply_to = ReplyRef(
                root=StrongRef(uri=root_uri, cid=root_cid.value()),
                parent=StrongRef(uri=reply_uri, cid=reply_cid.value()),
            )

        # The first label, if any, is used as the content warning.
        labels_attachment = post.attachments.get(LabelsAttachment)
        spoiler: str | None = (
            labels_attachment.labels[0]
            if labels_attachment and labels_attachment.labels
            else None
        )
        sensitive_attachment = post.attachments.get(SensitiveAttachment)
        is_sensitive = sensitive_attachment.sensitive if sensitive_attachment else False

        labels = self._build_labels(spoiler, is_sensitive)

        langs: list[str] | None = None
        langs_attachment = post.attachments.get(LanguagesAttachment)
        if langs_attachment and langs_attachment.langs:
            langs = langs_attachment.langs[:MAX_LANGS]

        media_attachment = post.attachments.get(MediaAttachment)
        all_media = media_attachment.blobs if media_attachment else []
        supported_media, unsupported_media = self._split_attachments(all_media)

        tokens: list[Token] = []

        if spoiler:
            tokens.append(TextToken(text=f"[{spoiler}]\n\n"))

        tokens.extend(post.tokens)

        if unsupported_media:
            tokens.extend(self._unsupported_media_tokens(unsupported_media))

        if post.text_type == "text/x.misskeymarkdown":
            tokens, status = mfm.strip_mfm(tokens)
            remote_url = post.attachments.get(RemoteUrlAttachment)
            # `status` is truthy when MFM was stripped; point readers at the
            # original rendering.
            if status and remote_url and remote_url.url:
                tokens.append(TextToken(text="\n"))
                tokens.append(
                    LinkToken(
                        href=remote_url.url, label="[Post contains MFM, see original]"
                    )
                )

        quote_attachment = post.attachments.get(QuoteAttachment)
        quoted_cid: str | None = None
        quoted_uri: str | None = None
        if quote_attachment:
            if quote_attachment.quoted_user != post.author:
                self.log.info("Skipping '%s': quoted other user", post.id)
                return

            quoted_post = self._get_post(
                post.service, post.author, quote_attachment.quoted_id
            )
            if not quoted_post:
                self.log.error("Skipping '%s': quoted post not found in db!", post.id)
                return

            quoted_mappings = self._get_mappings(quoted_post["id"], self.url, self.did)
            if not quoted_mappings:
                self.log.error(
                    "Skipping '%s': failed to find mappings for quoted post", post.id
                )
                return

            quoted_result = cid_from_json(quoted_mappings[0]["extra_data"])
            if not quoted_result.is_ok():
                self.log.error(
                    "Skipping '%s': failed to parse CID. %s",
                    post.id,
                    quoted_result.error(),
                )
                return

            quoted_cid = quoted_result.value()
            quoted_uri = quoted_mappings[0]["identifier"]

        splitter = TokenSplitter(max_chars=MAX_POST_CHARS, max_link_len=MAX_LINK_LEN)
        token_blocks = splitter.split(tokens)

        if not token_blocks.is_ok():
            self.log.error("Skipping '%s': %s", post.id, token_blocks.error())
            return

        for blob in supported_media:
            if blob.mime.startswith("image/") and len(blob.io) > MAX_IMAGE_BYTES:
                self.log.error(
                    "Skipping '%s': image too large",
                    post.id,
                )
                return
            if blob.mime.startswith("video/"):
                if blob.mime != "video/mp4" and not self.options.encode_videos:
                    self.log.info(
                        "Skipping '%s': video is not mp4, but encoding is disabled",
                        post.id,
                    )
                    return
                if len(blob.io) > MAX_VIDEO_BYTES:
                    self.log.error(
                        "Skipping '%s': video too large",
                        post.id,
                    )
                    return

        baked_media = self._split_media_per_post(
            [list(block) for block in token_blocks.value()],
            supported_media,
        )
        if not baked_media:
            self.log.error("Skipping '%s': nothing to post", post.id)
            return

        precomputed_richtexts: list[tuple[str, list[Facet]]] = []
        for block in token_blocks.value():
            result = tokens_to_richtext(block)
            if result is None:
                self.log.error(
                    "Skipping '%s': invalid rich text types",
                    post.id,
                )
                return
            precomputed_richtexts.append(result)

        # Convert and measure every video *before* sending anything, so a
        # too-long video cannot abort the thread halfway through. Keyed by
        # id() because the same blob objects end up in `baked_media`.
        prepared_videos: dict[int, tuple[Any, tuple[int, int] | None]] = {}
        for blob in supported_media:
            if not blob.mime.startswith("video/"):
                continue
            video_io, aspect_ratio, duration = self._prepare_video(blob)
            if duration and duration > MAX_VIDEO_SECONDS:
                self.log.info(
                    "Skipping '%s': video too long (%.1f > %ds)",
                    post.id,
                    duration,
                    MAX_VIDEO_SECONDS,
                )
                return
            prepared_videos[id(blob)] = (video_io, aspect_ratio)

        # Only the first post carries the quote. NOTE(review): with images on
        # that post this is still a plain RecordEmbed (the original had an
        # identical branch for that case); presumably BlueskyClient.send_images
        # combines it with the images — confirm.
        quote_embed: dict[str, Any] | None = None
        if quoted_uri and quoted_cid:
            quote_embed = RecordEmbed(
                record=StrongRef(uri=quoted_uri, cid=quoted_cid)
            ).to_dict()

        created_records: list[tuple[str, str]] = []
        post_root_ref: StrongRef | None = None
        previous_reply_ref: StrongRef | None = None
        original_thread_root: StrongRef | None = reply_to.root if reply_to else None

        for i, (_, attachments) in enumerate(baked_media):
            # Posts 0..len(token_blocks)-1 correspond to the token blocks in
            # order; anything after that is a media-only overflow post.
            if i < len(precomputed_richtexts):
                text, facets = precomputed_richtexts[i]
            else:
                text, facets = "", []

            current_reply_to: ReplyRef | None = None
            if i == 0:
                current_reply_to = reply_to
            elif previous_reply_ref and post_root_ref:
                # Later posts reply to the previous one; the root stays the
                # original thread root when crossposting a reply.
                current_reply_to = ReplyRef(
                    root=original_thread_root or post_root_ref,
                    parent=previous_reply_ref,
                )

            embed = quote_embed if i == 0 else None

            if not attachments:
                response = self._client.send_post(
                    text=text or " ",
                    facets=facets or None,
                    embed=embed,
                    reply_to=current_reply_to,
                    labels=labels,
                    langs=langs,
                )
            elif attachments[0].mime.startswith("image/"):
                images = [
                    self._prepare_image(img_blob)
                    for img_blob in attachments[:MAX_IMAGES_PER_POST]
                ]
                response = self._client.send_images(
                    text=text or "",
                    images=images,
                    facets=facets or None,
                    embed=embed,
                    reply_to=current_reply_to,
                    labels=labels,
                    langs=langs,
                )
            else:
                # Videos always get a post of their own (see
                # _split_media_per_post), so attachments[0] is the video.
                # NOTE(review): facets are not forwarded for video posts;
                # check whether BlueskyClient.send_video supports them.
                video_blob = attachments[0]
                video_io, aspect_ratio = prepared_videos[id(video_blob)]
                response = self._client.send_video(
                    text=text or "",
                    video=video_io,
                    alt=video_blob.alt,
                    aspect_ratio=aspect_ratio,
                    embed=embed,
                    reply_to=current_reply_to,
                    labels=labels,
                    langs=langs,
                )

            created_records.append((response.uri, response.cid))

            if post_root_ref is None:
                post_root_ref = StrongRef(uri=response.uri, cid=response.cid)
            previous_reply_ref = StrongRef(uri=response.uri, cid=response.cid)

            if i == 0:
                self._client.create_gates(
                    response.uri,
                    self.options.thread_gate,
                    self.options.quote_gate,
                )

        # Persist the thread: each record's parent is the previous one. When
        # this is not a reply to an existing mapped thread, the first record
        # becomes the new root.
        remaining = created_records
        if new_root_id is None or new_parent_id is None:
            first_uri, first_cid = created_records[0]
            new_root_id = self._insert_post(self._crosspost_row(first_uri, first_cid))
            new_parent_id = new_root_id
            self._insert_post_mapping(db_post["id"], new_parent_id)
            remaining = created_records[1:]

        for uri, cid in remaining:
            new_parent_id = self._insert_post(
                self._crosspost_row(uri, cid, parent=new_parent_id, root=new_root_id)
            )
            self._insert_post_mapping(db_post["id"], new_parent_id)

        self.log.info(
            "Post accepted successfully: %s -> %s",
            post.id,
            [uri for uri, _ in created_records],
        )

    @override
    def delete_post(self, post: PostRef):
        """Delete every Bluesky record mapped to ``post``, newest mapping first."""
        db_post = self._get_post(post.service, post.author, post.id)
        if not db_post:
            self.log.warning("Skipping delete '%s': post not found in db", post.id)
            return

        mappings = self._get_mappings(db_post["id"], self.url, self.did)
        for mapping in reversed(mappings):
            self._client.delete_post(mapping["identifier"])
            self._delete_post_by_id(mapping["id"])
        self.log.info("Deleted '%s'", post.id)

    @override
    def accept_repost(self, repost: PostRef, reposted: PostRef):
        """Repost on Bluesky the first record mapped to ``reposted``, and store it."""
        db_repost = self._get_post(repost.service, repost.author, repost.id)
        db_reposted = self._get_post(reposted.service, reposted.author, reposted.id)
        if not db_repost or not db_reposted:
            self.log.info("Skipping repost '%s': post not found in db", repost.id)
            return

        mappings = self._get_mappings(db_reposted["id"], self.url, self.did)
        if not mappings:
            self.log.info(
                "Skipping repost '%s': reposted post has no bluesky mapping",
                repost.id,
            )
            return

        cid = cid_from_json(mappings[0]["extra_data"])
        if not cid.is_ok():
            # Not inside an exception handler, so log.error (log.exception
            # would print a meaningless "NoneType: None" traceback).
            self.log.error(
                "Skipping repost '%s': failed to parse CID from extra_data. %s",
                repost.id,
                cid.error(),
            )
            return

        response = self._client.repost(mappings[0]["identifier"], cid.value())

        self._insert_post(
            self._crosspost_row(
                response.uri, response.cid, reposted=mappings[0]["id"]
            )
        )
        inserted = self._get_post(self.url, self.did, response.uri)
        if not inserted:
            raise ValueError("Inserted post not found!")
        self._insert_post_mapping(db_repost["id"], inserted["id"])
        self.log.info("Repost accepted successfully: %s", repost.id)

    @override
    def delete_repost(self, repost: PostRef):
        """Undo the Bluesky repost mapped to ``repost`` and drop its row."""
        db_repost = self._get_post(repost.service, repost.author, repost.id)
        if not db_repost:
            self.log.warning("Skipping delete '%s': repost not found in db", repost.id)
            return

        mappings = self._get_mappings(db_repost["id"], self.url, self.did)
        if mappings:
            self._client.delete_repost(mappings[0]["identifier"])
            self._delete_post_by_id(mappings[0]["id"])
        self.log.info("Deleted %s", repost.id)