A social media crossposting tool — third time's the charm.
mastodon misskey crossposting bluesky
at next 171 lines 5.7 kB view raw
1from atproto.models import ( 2 Facet, 3 FacetFeature, 4 FacetIndex, 5 LinkFeature, 6 MentionFeature, 7 TagFeature, 8) 9from cross.tokens import LinkToken, MentionToken, TagToken, TextToken, Token 10from util.splitter import canonical_label 11 12 13def richtext_to_tokens(text: str, facets: list[dict]) -> list[Token]: 14 if not text: 15 return [] 16 ut8_text = text.encode("utf-8") 17 if not facets: 18 return [TextToken(text=ut8_text.decode("utf-8"))] 19 20 slices: list[tuple[int, int, str, str]] = [] 21 for facet in facets: 22 features: list[dict] = facet.get("features", []) 23 if not features: 24 continue 25 feature = features[0] 26 feature_type = feature["$type"] 27 index = facet["index"] 28 match feature_type: 29 case "app.bsky.richtext.facet#tag": 30 slices.append( 31 (index["byteStart"], index["byteEnd"], "tag", feature["tag"]) 32 ) 33 case "app.bsky.richtext.facet#link": 34 slices.append( 35 (index["byteStart"], index["byteEnd"], "link", feature["uri"]) 36 ) 37 case "app.bsky.richtext.facet#mention": 38 slices.append( 39 (index["byteStart"], index["byteEnd"], "mention", feature["did"]) 40 ) 41 42 if not slices: 43 return [TextToken(text=ut8_text.decode("utf-8"))] 44 45 slices.sort(key=lambda s: s[0]) 46 unique: list[tuple[int, int, str, str]] = [] 47 current_end = 0 48 for start, end, ttype, val in slices: 49 if start >= current_end: 50 unique.append((start, end, ttype, val)) 51 current_end = end 52 53 if not unique: 54 return [TextToken(text=ut8_text.decode("utf-8"))] 55 56 tokens: list[Token] = [] 57 prev = 0 58 59 for start, end, ttype, val in unique: 60 if start > prev: 61 tokens.append(TextToken(text=ut8_text[prev:start].decode("utf-8"))) 62 match ttype: 63 case "link": 64 label = ut8_text[start:end].decode("utf-8") 65 split = val.split("://", 1) 66 if ( 67 len(split) > 1 68 and split[1].startswith(label) 69 or (label.endswith("...") and split[1].startswith(label[:-3])) 70 ): 71 tokens.append(LinkToken(href=val)) 72 prev = end 73 continue 74 
tokens.append(LinkToken(href=val, label=label)) 75 case "tag": 76 tag = ut8_text[start:end].decode("utf-8") 77 tokens.append(TagToken(tag=tag[1:] if tag.startswith("#") else tag)) 78 case "mention": 79 mention = ut8_text[start:end].decode("utf-8") 80 tokens.append( 81 MentionToken( 82 username=mention[1:] if mention.startswith("@") else mention, 83 uri=val, 84 ) 85 ) 86 prev = end 87 88 if prev < len(ut8_text): 89 tokens.append(TextToken(text=ut8_text[prev:].decode("utf-8"))) 90 91 return tokens 92 93 94def tokens_to_richtext(tokens: list[Token]) -> tuple[str, list[Facet]] | None: 95 segments: list[tuple[str, FacetFeature | None]] = [] 96 byte_offset = 0 97 98 for token in tokens: 99 match token: 100 case TextToken(): 101 text_bytes = token.text.encode("utf-8") 102 segments.append((token.text, None)) 103 byte_offset += len(text_bytes) 104 105 case TagToken(): 106 tag_text = f"#{token.tag}" 107 tag_bytes = tag_text.encode("utf-8") 108 segments.append( 109 ( 110 tag_text, 111 TagFeature(tag=token.tag), 112 ) 113 ) 114 byte_offset += len(tag_bytes) 115 116 case MentionToken(): 117 mention_text = f"@{token.username}" 118 mention_bytes = mention_text.encode("utf-8") 119 segments.append( 120 ( 121 mention_text, 122 MentionFeature(did=token.uri) 123 if token.uri 124 else MentionFeature(did=""), 125 ) 126 ) 127 byte_offset += len(mention_bytes) 128 129 case LinkToken(): 130 href = token.href 131 label = token.label if token.label else href 132 133 if canonical_label(token.label, token.href): 134 max_label_len = 30 135 label_bytes = label.encode("utf-8") 136 if len(label_bytes) > max_label_len: 137 label = label[: max_label_len - 1] + "" 138 label_bytes = label.encode("utf-8") 139 else: 140 label_bytes = label.encode("utf-8") 141 142 segments.append( 143 ( 144 label, 145 LinkFeature(uri=href), 146 ) 147 ) 148 byte_offset += len(label_bytes) 149 150 case _: 151 return None 152 153 text = "".join(seg[0] for seg in segments) 154 facets: list[Facet] = [] 155 156 current_offset 
= 0 157 for seg_text, seg_feature in segments: 158 if seg_feature: 159 seg_bytes = seg_text.encode("utf-8") 160 facets.append( 161 Facet( 162 index=FacetIndex( 163 byte_start=current_offset, 164 byte_end=current_offset + len(seg_bytes), 165 ), 166 features=[seg_feature], 167 ) 168 ) 169 current_offset += len(seg_text.encode("utf-8")) 170 171 return text, facets