import json
import re
from dataclasses import dataclass
from typing import Any, override

import httpx

import misskey.mfm as mfm
from atproto.models import (
    Facet,
    ImageEmbed,
    RecordEmbed,
    ReplyRef,
    SelfLabel,
    SelfLabels,
    StrongRef,
    cid_from_json,
)
from atproto.store import get_store
from bluesky.client import BlueskyClient
from bluesky.info import SERVICE, BlueskyService, validate_and_transform
from bluesky.richtext import tokens_to_richtext
from cross.attachments import (
    LabelsAttachment,
    LanguagesAttachment,
    MediaAttachment,
    QuoteAttachment,
    RemoteUrlAttachment,
    SensitiveAttachment,
)
from cross.media import Blob, compress_image, convert_to_mp4, get_media_meta
from cross.post import Post, PostRef
from cross.service import OutputService
from cross.tokens import LinkToken, TextToken, Token
from database.connection import DatabasePool
from util.splitter import TokenSplitter

# Audience selectors accepted for Bluesky thread gates.
ALLOWED_GATES: list[str] = ["mentioned", "following", "followers"]
# Heuristics mapping spoiler/content-warning text to Bluesky self-label values.
ADULT_PATTERN = re.compile(r"\b(adult|sexual|nsfw)\b", re.IGNORECASE)
PORN_PATTERN = re.compile(r"\b(porn|explicit)\b", re.IGNORECASE)


@dataclass(kw_only=True)
class BlueskyOutputOptions:
    """Configuration for a Bluesky crossposting target."""

    handle: str | None = None
    did: str | None = None
    pds: str | None = None
    password: str = ""
    quote_gate: bool = False
    thread_gate: list[str] | None = None
    encode_videos: bool = True

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "BlueskyOutputOptions":
        """Build options from a raw config mapping.

        Raises:
            KeyError: if ``password`` is missing.
            ValueError: if ``thread_gate`` contains an unknown audience.
        """
        validate_and_transform(data)
        if "password" not in data:
            raise KeyError("'password' is required for bluesky")
        if "quote_gate" in data:
            data["quote_gate"] = bool(data["quote_gate"])
        if (
            "thread_gate" in data
            and isinstance(data["thread_gate"], list)
            and any(v not in ALLOWED_GATES for v in data["thread_gate"])
        ):
            raise ValueError(
                f"'thread_gate' only accepts {', '.join(ALLOWED_GATES)}, "
                f"got: {data['thread_gate']}"
            )
        if "encode_videos" in data:
            data["encode_videos"] = bool(data["encode_videos"])
        # Fix: use the classmethod's `cls` instead of hard-coding the class.
        return cls(**data)


class BlueskyOutputService(BlueskyService, OutputService):
    """Mirrors posts, reposts and deletions from other services to a Bluesky PDS."""

    def __init__(
        self, db: DatabasePool, http: httpx.Client, options: BlueskyOutputOptions
    ) -> None:
        super().__init__(SERVICE, db)
        self.http = http
        self.options: BlueskyOutputOptions = options
        self._store = get_store(db)
        self._init_identity()
        self._client = BlueskyClient(
            self.pds,
            self._store,
            self.did,
            http,
            self.options.password,
        )
        # Drop the plaintext password once the client holds a session.
        self.options.password = ""
        self.log.info("Logged in as '%s'", self.did)

    @override
    def get_identity_options(self) -> tuple[str | None, str | None, str | None]:
        """Return the configured (handle, did, pds) triple."""
        return (self.options.handle, self.options.did, self.options.pds)

    def _split_attachments(
        self, attachments: list[Blob]
    ) -> tuple[list[Blob], list[Blob]]:
        """Partition blobs into (supported, unsupported) for Bluesky.

        Only ``image/*`` and ``video/*`` media can be uploaded; everything
        else is later rendered as a link inside the post text.
        """
        supported: list[Blob] = []
        unsupported: list[Blob] = []
        for blob in attachments:
            if blob.mime.startswith(("image/", "video/")):
                supported.append(blob)
            else:
                unsupported.append(blob)
        return supported, unsupported

    def _split_media_per_post(
        self,
        token_blocks: list[list[Any]],
        media: list[Blob],
    ) -> list[tuple[list[Any], list[Blob]]]:
        """Distribute media blobs across the split text blocks.

        A video claims the next block that has no attachments yet, and up to
        four consecutive images share one block.  When the media does not fit
        into the existing text blocks, extra blocks with empty token lists are
        appended (they become media-only thread posts).
        """
        posts: list[dict[str, Any]] = [
            {"tokens": block, "attachments": []} for block in token_blocks
        ]
        available_indices: list[int] = list(range(len(posts)))
        current_image_post_idx: int | None = None

        def make_blank_post() -> dict[str, Any]:
            return {"tokens": [], "attachments": []}

        def pop_next_empty_index() -> int:
            # Reuse an existing attachment-free block first; otherwise grow
            # the thread with a blank post.
            if available_indices:
                return available_indices.pop(0)
            new_idx = len(posts)
            posts.append(make_blank_post())
            return new_idx

        for blob in media:
            if blob.mime.startswith("video/"):
                # A video ends any open image group and takes its own slot.
                current_image_post_idx = None
                idx = pop_next_empty_index()
                posts[idx]["attachments"].append(blob)
            elif blob.mime.startswith("image/"):
                if (
                    current_image_post_idx is not None
                    and len(posts[current_image_post_idx]["attachments"]) < 4
                ):
                    posts[current_image_post_idx]["attachments"].append(blob)
                else:
                    idx = pop_next_empty_index()
                    posts[idx]["attachments"].append(blob)
                    current_image_post_idx = idx

        return [(p["tokens"], p["attachments"]) for p in posts]

    def _build_labels(
        self,
        spoiler: str | None,
        is_sensitive: bool,
    ) -> SelfLabels | None:
        """Map spoiler text / sensitivity flag to Bluesky self-labels.

        Any spoiler or sensitive flag yields ``graphic-media``; the spoiler
        text is additionally scanned for porn/adult keywords.  Returns None
        when no labels apply.
        """
        unique_labels: set[str] = set()
        if spoiler:
            unique_labels.add("graphic-media")
            if PORN_PATTERN.search(spoiler):
                unique_labels.add("porn")
            elif ADULT_PATTERN.search(spoiler):
                unique_labels.add("sexual")
        if is_sensitive:
            unique_labels.add("graphic-media")
        if not unique_labels:
            return None
        return SelfLabels(values=[SelfLabel(val=label) for label in unique_labels])

    @override
    def accept_post(self, post: Post):
        """Crosspost ``post`` to Bluesky, splitting it into a thread as needed."""
        db_post = self._get_post(post.service, post.author, post.id)
        if not db_post:
            # Fix: the original log call had a %s placeholder but no argument.
            self.log.error("Skipping '%s': post not found in db", post.id)
            return

        # Resolve the reply target when this post continues a mapped thread.
        reply_to: ReplyRef | None = None
        new_root_id: int | None = None
        new_parent_id: int | None = None
        if post.parent_id:
            thread = self._find_mapped_thread(
                post.parent_id,
                post.service,
                post.author,
                self.url,
                self.did,
            )
            if not thread:
                self.log.error(
                    "Skipping '%s': parent thread tuple not found in db", post.id
                )
                return
            root_uri, reply_uri, new_root_id, new_parent_id = thread
            root_post = self._get_post_by_id(new_root_id)
            reply_post = self._get_post_by_id(new_parent_id)
            if not root_post or not reply_post:
                self.log.error(
                    "Skipping '%s': failed to fetch parent posts from db", post.id
                )
                return
            root_cid = cid_from_json(root_post["extra_data"])
            reply_cid = cid_from_json(reply_post["extra_data"])
            if not root_cid.is_ok():
                self.log.error(
                    "Skipping '%s': failed to parse CID. %s", post.id, root_cid.error()
                )
                return
            if not reply_cid.is_ok():
                self.log.error(
                    "Skipping '%s': failed to parse CID. %s", post.id, reply_cid.error()
                )
                return
            root_ref = StrongRef(uri=root_uri, cid=root_cid.value())
            reply_ref = StrongRef(uri=reply_uri, cid=reply_cid.value())
            reply_to = ReplyRef(root=root_ref, parent=reply_ref)

        # Spoiler text and sensitivity flag feed the self-labels.
        labels_attachment = post.attachments.get(LabelsAttachment)
        spoiler: str | None = (
            labels_attachment.labels[0]
            if labels_attachment and labels_attachment.labels
            else None
        )
        sensitive_attachment = post.attachments.get(SensitiveAttachment)
        is_sensitive = sensitive_attachment.sensitive if sensitive_attachment else False
        labels = self._build_labels(spoiler, is_sensitive)

        # Bluesky accepts at most three language tags.
        langs: list[str] | None = None
        langs_attachment = post.attachments.get(LanguagesAttachment)
        if langs_attachment and langs_attachment.langs:
            langs = langs_attachment.langs[:3]

        media_attachment = post.attachments.get(MediaAttachment)
        all_media = media_attachment.blobs if media_attachment else []
        supported_media, unsupported_media = self._split_attachments(all_media)

        # Assemble the token stream: spoiler prefix, body, then link tokens
        # for attachments Bluesky cannot host.
        tokens: list[Token] = []
        if spoiler:
            tokens.append(TextToken(text=f"[{spoiler}]\n\n"))
        tokens.extend(post.tokens)
        if unsupported_media:
            tokens.append(TextToken(text="\n"))
            for attachment in unsupported_media:
                url = (
                    attachment.url
                    if hasattr(attachment, "url") and attachment.url
                    else attachment.name or "unknown"
                )
                name = "💾 file"
                if attachment.name:
                    if attachment.mime.startswith("audio/"):
                        name = "🎵 " + attachment.name
                    elif attachment.mime.startswith("text/"):
                        name = "📄 " + attachment.name
                    else:
                        name = "💾 " + attachment.name
                # Truncate long file names to keep the link label compact.
                if len(name) > 28:
                    name = name[: 28 - 1] + "…"
                tokens.append(LinkToken(href=url, label=f"[{name}]"))
                tokens.append(TextToken(text=" "))

        # Strip Misskey-flavored markdown; link to the original when
        # formatting was lost in the process.
        if post.text_type == "text/x.misskeymarkdown":
            tokens, status = mfm.strip_mfm(tokens)
            remote_url = post.attachments.get(RemoteUrlAttachment)
            if status and remote_url and remote_url.url:
                tokens.append(TextToken(text="\n"))
                tokens.append(
                    LinkToken(
                        href=remote_url.url, label="[Post contains MFM, see original]"
                    )
                )

        # Resolve a self-quote to the previously mirrored Bluesky record.
        quote_attachment = post.attachments.get(QuoteAttachment)
        quoted_cid: str | None = None
        quoted_uri: str | None = None
        if quote_attachment:
            if quote_attachment.quoted_user != post.author:
                self.log.info("Skipping '%s': quoted other user", post.id)
                return
            quoted_post = self._get_post(
                post.service, post.author, quote_attachment.quoted_id
            )
            if not quoted_post:
                self.log.error("Skipping '%s': quoted post not found in db!", post.id)
                return
            quoted_mappings = self._get_mappings(quoted_post["id"], self.url, self.did)
            if not quoted_mappings:
                self.log.error(
                    "Skipping '%s': failed to find mappings for quoted post", post.id
                )
                return
            quoted_result = cid_from_json(quoted_mappings[0]["extra_data"])
            if not quoted_result.is_ok():
                self.log.error(
                    "Skipping '%s': failed to parse CID. %s",
                    post.id,
                    quoted_result.error(),
                )
                return
            quoted_cid = quoted_result.value()
            quoted_uri = quoted_mappings[0]["identifier"]

        splitter = TokenSplitter(max_chars=300, max_link_len=30)
        token_blocks = splitter.split(tokens)
        if not token_blocks.is_ok():
            self.log.error("Skipping '%s': %s", post.id, token_blocks.error())
            return

        # Validate all media against size/format limits before uploading
        # anything, so a failure cannot leave a partial thread behind.
        for blob in supported_media:
            if blob.mime.startswith("image/") and len(blob.io) > 2_000_000:
                self.log.error(
                    "Skipping '%s': image too large",
                    post.id,
                )
                return
            if blob.mime.startswith("video/"):
                if blob.mime != "video/mp4" and not self.options.encode_videos:
                    self.log.info(
                        "Skipping '%s': video is not mp4, but encoding is disabled",
                        post.id,
                    )
                    return
                if len(blob.io) > 100_000_000:
                    self.log.error(
                        "Skipping '%s': video too large",
                        post.id,
                    )
                    return

        baked_media = self._split_media_per_post(
            [list(block) for block in token_blocks.value()],
            supported_media,
        )

        # Pre-render rich text so a conversion error aborts before any
        # network side effects happen.
        precomputed_richtexts: list[tuple[str, list[Facet]]] = []
        for block in token_blocks.value():
            result = tokens_to_richtext(block)
            if result is None:
                self.log.error(
                    "Skipping '%s': invalid rich text types",
                    post.id,
                )
                return
            precomputed_richtexts.append(result)

        created_records: list[tuple[str, str]] = []
        post_root_ref: StrongRef | None = None
        previous_reply_ref: StrongRef | None = None
        original_thread_root: StrongRef | None = reply_to.root if reply_to else None

        richtext_index = 0
        for i, (block_tokens, attachments) in enumerate(baked_media):
            if block_tokens and richtext_index < len(precomputed_richtexts):
                text, facets = precomputed_richtexts[richtext_index]
                richtext_index += 1
            else:
                # Media-only overflow post: no text block remains for it.
                text = ""
                facets = []

            # Thread continuation: the first post replies to the mapped
            # parent (if any); later posts chain onto the previous one.
            current_reply_to: ReplyRef | None = None
            if i == 0:
                current_reply_to = reply_to
            elif previous_reply_ref and post_root_ref:
                root_ref = original_thread_root or post_root_ref
                current_reply_to = ReplyRef(root=root_ref, parent=previous_reply_ref)

            # Fix: the original if/else built the identical RecordEmbed in
            # both branches; collapsed into a single statement.
            embed: dict[str, Any] | None = None
            if i == 0 and quoted_uri and quoted_cid:
                embed = RecordEmbed(
                    record=StrongRef(uri=quoted_uri, cid=quoted_cid)
                ).to_dict()

            if not attachments:
                response = self._client.send_post(
                    text=text or " ",
                    facets=facets or None,
                    embed=embed,
                    reply_to=current_reply_to,
                    labels=labels,
                    langs=langs,
                )
            elif attachments[0].mime.startswith("image/"):
                images: list[ImageEmbed] = []
                for img_blob in attachments[:4]:
                    image_io = img_blob.io
                    # Recompress images above Bluesky's ~1MB upload limit.
                    if len(image_io) > 1_000_000:
                        self.log.info("Compressing %s...", img_blob.name or "image")
                        compressed = compress_image(img_blob)
                        image_io = compressed.io
                    try:
                        meta = get_media_meta(image_io)
                        aspect_ratio = (meta.width, meta.height)
                    except Exception as e:
                        # Aspect ratio is optional; upload proceeds without it.
                        self.log.error(e)
                        aspect_ratio = None
                    images.append(
                        ImageEmbed(
                            image=image_io,
                            alt=img_blob.alt,
                            aspect_ratio=aspect_ratio,
                        )
                    )
                response = self._client.send_images(
                    text=text or "",
                    images=images,
                    facets=facets or None,
                    embed=embed,
                    reply_to=current_reply_to,
                    labels=labels,
                    langs=langs,
                )
            else:
                video_blob = attachments[0]
                video_io = video_blob.io
                if video_blob.mime != "video/mp4":
                    self.log.info("Converting %s to mp4...", video_blob.name or "video")
                    converted = convert_to_mp4(video_blob)
                    video_io = converted.io
                try:
                    meta = get_media_meta(video_io)
                    aspect_ratio = (meta.width, meta.height)
                    duration = meta.duration
                except Exception as e:
                    self.log.error(e)
                    aspect_ratio = None
                    duration = None
                if duration and duration > 180:
                    self.log.info(
                        "Skipping '%s': video too long (%.1f > 180s)",
                        post.id,
                        duration,
                    )
                    return
                response = self._client.send_video(
                    text=text or "",
                    video=video_io,
                    alt=video_blob.alt,
                    aspect_ratio=aspect_ratio,
                    embed=embed,
                    reply_to=current_reply_to,
                    labels=labels,
                    langs=langs,
                )

            created_records.append((response.uri, response.cid))
            if post_root_ref is None:
                post_root_ref = StrongRef(uri=response.uri, cid=response.cid)
            previous_reply_ref = StrongRef(uri=response.uri, cid=response.cid)
            if i == 0:
                # Gates are only set on the thread's first record.
                self._client.create_gates(
                    response.uri,
                    self.options.thread_gate,
                    self.options.quote_gate,
                )

        # Persist the created records and map each back to the source post.
        all_records = created_records
        if new_root_id is None or new_parent_id is None:
            # No parent thread: the first created record becomes the root.
            new_root_id = self._insert_post(
                {
                    "user": self.did,
                    "service": self.url,
                    "identifier": created_records[0][0],
                    "parent": None,
                    "root": None,
                    "reposted": None,
                    "extra_data": json.dumps({"cid": created_records[0][1]}),
                    "crossposted": 1,
                }
            )
            new_parent_id = new_root_id
            created_records = created_records[1:]
            self._insert_post_mapping(db_post["id"], new_parent_id)
        for uri, cid in created_records:
            new_parent_id = self._insert_post(
                {
                    "user": self.did,
                    "service": self.url,
                    "identifier": uri,
                    "parent": new_parent_id,
                    "root": new_root_id,
                    "reposted": None,
                    "extra_data": json.dumps({"cid": cid}),
                    "crossposted": 1,
                }
            )
            self._insert_post_mapping(db_post["id"], new_parent_id)

        self.log.info(
            "Post accepted successfully: %s -> %s",
            post.id,
            [r[0] for r in all_records],
        )

    @override
    def delete_post(self, post: PostRef):
        """Delete every Bluesky record previously created for ``post``."""
        db_post = self._get_post(post.service, post.author, post.id)
        if not db_post:
            self.log.warning("Skipping delete '%s': post not found in db", post.id)
            return
        mappings = self._get_mappings(db_post["id"], self.url, self.did)
        # Walk the mappings newest-first so thread tails go before heads.
        for mapping in mappings[::-1]:
            self._client.delete_post(mapping["identifier"])
            self._delete_post_by_id(mapping["id"])
        self.log.info("Deleted '%s'", post.id)

    @override
    def accept_repost(self, repost: PostRef, reposted: PostRef):
        """Mirror a repost, provided the target post was crossposted before."""
        db_repost = self._get_post(repost.service, repost.author, repost.id)
        db_reposted = self._get_post(reposted.service, reposted.author, reposted.id)
        if not db_repost or not db_reposted:
            self.log.info("Skipping repost '%s': post not found in db", repost.id)
            return
        mappings = self._get_mappings(db_reposted["id"], self.url, self.did)
        if not mappings:
            # The reposted post was never mirrored here; nothing to do.
            return
        cid = cid_from_json(mappings[0]["extra_data"])
        if not cid.is_ok():
            # Fix: log.exception is only valid inside an except block (it
            # logs the active exception); use log.error here.
            self.log.error(
                "Skipping repost '%s': failed to parse CID from extra_data", repost.id
            )
            return
        response = self._client.repost(mappings[0]["identifier"], cid.value())
        self._insert_post(
            {
                "user": self.did,
                "service": self.url,
                "identifier": response.uri,
                "parent": None,
                "root": None,
                "reposted": mappings[0]["id"],
                "extra_data": json.dumps({"cid": response.cid}),
                "crossposted": 1,
            }
        )
        inserted = self._get_post(self.url, self.did, response.uri)
        if not inserted:
            raise ValueError("Inserted post not found!")
        self._insert_post_mapping(db_repost["id"], inserted["id"])
        self.log.info("Repost accepted successfully: %s", repost.id)

    @override
    def delete_repost(self, repost: PostRef):
        """Remove the Bluesky repost record mapped to ``repost``."""
        db_repost = self._get_post(repost.service, repost.author, repost.id)
        if not db_repost:
            self.log.warning("Skipping delete '%s': repost not found in db", repost.id)
            return
        mappings = self._get_mappings(db_repost["id"], self.url, self.did)
        if mappings:
            self._client.delete_repost(mappings[0]["identifier"])
            self._delete_post_by_id(mappings[0]["id"])
        self.log.info("Deleted %s", repost.id)