"""Social media crossposting tool (third attempt).

Supported platforms: Mastodon, Misskey, Bluesky.
"""
1from atproto.models import (
2 Facet,
3 FacetFeature,
4 FacetIndex,
5 LinkFeature,
6 MentionFeature,
7 TagFeature,
8)
9from cross.tokens import LinkToken, MentionToken, TagToken, TextToken, Token
10from util.splitter import canonical_label
11
12
13def richtext_to_tokens(text: str, facets: list[dict]) -> list[Token]:
14 if not text:
15 return []
16 ut8_text = text.encode("utf-8")
17 if not facets:
18 return [TextToken(text=ut8_text.decode("utf-8"))]
19
20 slices: list[tuple[int, int, str, str]] = []
21 for facet in facets:
22 features: list[dict] = facet.get("features", [])
23 if not features:
24 continue
25 feature = features[0]
26 feature_type = feature["$type"]
27 index = facet["index"]
28 match feature_type:
29 case "app.bsky.richtext.facet#tag":
30 slices.append(
31 (index["byteStart"], index["byteEnd"], "tag", feature["tag"])
32 )
33 case "app.bsky.richtext.facet#link":
34 slices.append(
35 (index["byteStart"], index["byteEnd"], "link", feature["uri"])
36 )
37 case "app.bsky.richtext.facet#mention":
38 slices.append(
39 (index["byteStart"], index["byteEnd"], "mention", feature["did"])
40 )
41
42 if not slices:
43 return [TextToken(text=ut8_text.decode("utf-8"))]
44
45 slices.sort(key=lambda s: s[0])
46 unique: list[tuple[int, int, str, str]] = []
47 current_end = 0
48 for start, end, ttype, val in slices:
49 if start >= current_end:
50 unique.append((start, end, ttype, val))
51 current_end = end
52
53 if not unique:
54 return [TextToken(text=ut8_text.decode("utf-8"))]
55
56 tokens: list[Token] = []
57 prev = 0
58
59 for start, end, ttype, val in unique:
60 if start > prev:
61 tokens.append(TextToken(text=ut8_text[prev:start].decode("utf-8")))
62 match ttype:
63 case "link":
64 label = ut8_text[start:end].decode("utf-8")
65 split = val.split("://", 1)
66 if (
67 len(split) > 1
68 and split[1].startswith(label)
69 or (label.endswith("...") and split[1].startswith(label[:-3]))
70 ):
71 tokens.append(LinkToken(href=val))
72 prev = end
73 continue
74 tokens.append(LinkToken(href=val, label=label))
75 case "tag":
76 tag = ut8_text[start:end].decode("utf-8")
77 tokens.append(TagToken(tag=tag[1:] if tag.startswith("#") else tag))
78 case "mention":
79 mention = ut8_text[start:end].decode("utf-8")
80 tokens.append(
81 MentionToken(
82 username=mention[1:] if mention.startswith("@") else mention,
83 uri=val,
84 )
85 )
86 prev = end
87
88 if prev < len(ut8_text):
89 tokens.append(TextToken(text=ut8_text[prev:].decode("utf-8")))
90
91 return tokens
92
93
94def tokens_to_richtext(tokens: list[Token]) -> tuple[str, list[Facet]] | None:
95 segments: list[tuple[str, FacetFeature | None]] = []
96 byte_offset = 0
97
98 for token in tokens:
99 match token:
100 case TextToken():
101 text_bytes = token.text.encode("utf-8")
102 segments.append((token.text, None))
103 byte_offset += len(text_bytes)
104
105 case TagToken():
106 tag_text = f"#{token.tag}"
107 tag_bytes = tag_text.encode("utf-8")
108 segments.append(
109 (
110 tag_text,
111 TagFeature(tag=token.tag),
112 )
113 )
114 byte_offset += len(tag_bytes)
115
116 case MentionToken():
117 mention_text = f"@{token.username}"
118 mention_bytes = mention_text.encode("utf-8")
119 segments.append(
120 (
121 mention_text,
122 MentionFeature(did=token.uri)
123 if token.uri
124 else MentionFeature(did=""),
125 )
126 )
127 byte_offset += len(mention_bytes)
128
129 case LinkToken():
130 href = token.href
131 label = token.label if token.label else href
132
133 if canonical_label(token.label, token.href):
134 max_label_len = 30
135 label_bytes = label.encode("utf-8")
136 if len(label_bytes) > max_label_len:
137 label = label[: max_label_len - 1] + "…"
138 label_bytes = label.encode("utf-8")
139 else:
140 label_bytes = label.encode("utf-8")
141
142 segments.append(
143 (
144 label,
145 LinkFeature(uri=href),
146 )
147 )
148 byte_offset += len(label_bytes)
149
150 case _:
151 return None
152
153 text = "".join(seg[0] for seg in segments)
154 facets: list[Facet] = []
155
156 current_offset = 0
157 for seg_text, seg_feature in segments:
158 if seg_feature:
159 seg_bytes = seg_text.encode("utf-8")
160 facets.append(
161 Facet(
162 index=FacetIndex(
163 byte_start=current_offset,
164 byte_end=current_offset + len(seg_bytes),
165 ),
166 features=[seg_feature],
167 )
168 )
169 current_offset += len(seg_text.encode("utf-8"))
170
171 return text, facets