📂🌊 An event stream of ATProto blobs
blobstream.zio.blue
atproto
deno
jetstream
1import SubscribeFilters from "./classes/SubscribeFilters.ts";
2import Meta from "./classes/Meta.ts";
3import { args, didPdsCache, stats } from "./store.ts";
4
/**
 * Decodes one upstream message and returns the blob descriptors found in it.
 *
 * The message payload (`e.data`) is parsed as JSON. Its `time_us` field is
 * used as the cursor; a missing or falsy `time_us` is passed on as `null`.
 *
 * @param e Message event whose `data` is a JSON string.
 * @returns The array produced by `findBlobObjects` for the decoded payload.
 * @throws SyntaxError if `e.data` is not valid JSON.
 */
export async function parseEvent(e: MessageEvent<any>) {
  const payload = JSON.parse(e.data);
  const eventCursor = payload.time_us ? payload.time_us : null;

  return await findBlobObjects(payload, eventCursor);
}
12
/**
 * Builds the HTML landing page: an ASCII-art banner, live counters read from
 * `stats`, the upstream host taken from `args.endpointJetstream`, usage notes
 * for the `/subscribe` WebSocket endpoint and the version from `Meta`.
 *
 * The page body is styled `white-space: pre-wrap`, so line breaks and spaces
 * inside the template below are part of the rendered layout.
 *
 * `stats.currentCursor` is divided by 1000 before being handed to `Date`;
 * presumably the cursor is a microsecond timestamp (it is fed from the
 * event's `time_us` field) — confirm against the store.
 *
 * Interpolated values are not HTML-escaped; they are counters, dates and
 * operator-supplied configuration rather than client input.
 *
 * @returns The complete HTML document as a string.
 */
export function printLandingHtml(): string {
  const header = ` ____ _ _ _
| __ )| | ___ | |__ ___| |_ _ __ ___ __ _ _ __ ___
| _ \\| |/ _ \\| '_ \\/ __| __| '__/ _ \\/ _\` | '_ \` _ \\
| |_) | | (_) | |_) \\__ | |_| | | __| (_| | | | | | |
|____/|_|\\___/|_.__/|___/\\__|_| \\___|\\__,_|_| |_| |_|`;

  const outputHtml = `
 <!DOCTYPE html>
 <html>
 <head>
 <style>
 :root {
 color-scheme: light dark;
 }

 body {
 color: light-dark(#000000, #ffffff);
 background-color: light-dark(#ffffff, #18191b);
 font-family: monospace;
 margin: 25px;
 white-space: pre-wrap;
 word-wrap: break-word;
 }

 a {
 color: light-dark(#0a5dbd, #a4cefe);
 }

 .subtle {
 opacity: 0.8;
 }
 </style>
 <title>Blobstream</title>
 </head>
 <body>${header}

✨ Events: ${stats.events.total.toLocaleString()}
 ↳ Dropped: ${stats.events.dropped.toLocaleString()}
 ↳ Failed: ${stats.events.failures.toLocaleString()}
 ↳ <span title="Average Process Time">APT</span>: ${
    stats.events.averageProcessTime.toFixed(5)
  }ms
🕗 Uptime: ${getTimeSinceStart()}
📍 Cursor: ${stats.currentCursor}
 ↳ Latest: ${parseDate(new Date(stats.currentCursor / 1000))}
 ↳ Current: ${parseDate(new Date())}
📇 DIDs: ${stats.didCache.cached.toLocaleString()}
 ↳ Added: +${stats.didCache.added.toLocaleString()}
 ↳ Dropped: -${stats.didCache.popped.toLocaleString()}
👤 Clients: ${stats.totalClients.toLocaleString()}
⬆️ Upstream: ${new URL(args.endpointJetstream as string).host}

WebSocket endpoint at <strong>/subscribe</strong>. Filter with queries:
* <strong>wantedCollections</strong> — Collection(s) to filter by
 <em class="subtle">(e.g. app.bsky.feed.post, blue.zio.atfile.upload)</em>
* <strong>wantedDids</strong> — DID(s) to filter by
 <em class="subtle">(e.g. did:plc:z72i7hdynmk6r22z27h6tvur, did:web:didd.uk)</em>
* <strong>wantedMimeTypes</strong> — MimeType(s) to filter by
 <em class="subtle">(e.g. image/jpeg, video/mp4)</em>
* <strong>wantedPdses</strong> — PDS(es) to filter by (without protocol)
 <em class="subtle">(e.g. shimeji.us-east.host.bsky.network, zio.blue)</em>

<a href="https://tangled.org/@zio.sh/blobstream">🐑 @zio.sh/blobstream</a> ・ ${Meta.version}
 </body>
 </html>
 `;

  return outputHtml;
}
83
/**
 * Logs a one-line notice when a client socket opens or closes.
 *
 * Output shape: `<Opening|Closing> (<client count>): <ip> (<filters>)`, where
 * the filter summary is `filters.toString()` with every ", " separator turned
 * into "; ".
 *
 * @param closing `true` when the socket is closing, `false` when it opens.
 * @param ip Client address to include in the log line.
 * @param filters Subscription filters of the client.
 */
export function printSocketMessage(closing: boolean, ip: string, filters: SubscribeFilters) {
  const verb = closing ? "Closing" : "Opening";
  const clientCount = stats.totalClients.toLocaleString();
  // A string pattern would only replace the first ", "; the global regex
  // rewrites every separator so the summary is delimited consistently.
  const filterSummary = filters.toString().replace(/, /g, "; ");

  console.log(`${verb} (${clientCount}): ${ip} (${filterSummary})`);
}
92
/**
 * Collects the human-readable strings attached to a record, based on its
 * `$type`.
 *
 * - `app.bsky.actor.profile`: the `description`, when present.
 * - `app.bsky.feed.post`, in this order: alt text of an image or video embed,
 *   self-label values, hashtags (each prefixed with `#`), then the post text.
 *
 * Any other record type yields an empty array.
 *
 * @param record Decoded record object; must not be null or undefined.
 * @returns The extracted strings, in the order described above.
 */
function extractStringsFromRecord(record: any): string[] {
  const associated: string[] = [];

  switch (record.$type) {
    case "app.bsky.actor.profile":
      if (record.description != undefined && record.description != null) {
        associated.push(record.description);
      }
      break;

    case "app.bsky.feed.post": {
      if (record.embed != undefined) {
        switch (record.embed.$type) {
          case "app.bsky.embed.images": {
            const imgs = record.embed.images;
            if (imgs) {
              // Accept a plain array, an object wrapping an `images` array,
              // or a single image value.
              const images: any[] = Array.isArray(imgs)
                ? imgs
                : (Array.isArray(imgs.images) ? imgs.images : [imgs]);

              for (const image of images) {
                if (!image) continue;
                const alt = typeof image === "object"
                  ? (image.alt ?? undefined)
                  : (typeof image === "string" ? image : undefined);
                if (typeof alt === "string" && alt != "") {
                  associated.push(alt);
                }
              }
            }
            break;
          }
          case "app.bsky.embed.video":
            if (record.embed.alt != undefined && record.embed.alt != "") {
              associated.push(record.embed.alt);
            }
            break;
        }
      }

      if (
        record.labels != undefined &&
        record.labels.$type == "com.atproto.label.defs#selfLabels" &&
        Array.isArray(record.labels.values)
      ) {
        // Values may be bare strings or `{ val: string }` objects.
        for (const entry of record.labels.values) {
          const label = typeof entry === "string" ? entry : entry?.val;
          // Push the label value itself; iterating with `for...in` would
          // yield array indices instead.
          if (typeof label === "string" && label !== "") {
            associated.push(label);
          }
        }
      }

      if (Array.isArray(record.tags)) {
        // One "#tag" entry per tag (not the whole array stringified).
        for (const tag of record.tags) {
          associated.push(`#${tag}`);
        }
      }

      if (record.text != undefined && record.text != "") {
        associated.push(record.text);
      }
      break;
    }
  }

  return associated;
}
161
/**
 * Recursively walks a decoded event and returns one descriptor per blob
 * reference found anywhere inside it.
 *
 * While descending, the nearest enclosing truthy `rkey`, `collection`, `did`
 * and `record` values are remembered and attached to each blob found below
 * them. A node counts as a blob when its `$type` is "blob" and it has a
 * truthy `ref.$link`.
 *
 * Side effects on `stats`: every blob increments `events.total` and, when
 * `cursor` is not null, updates `currentCursor`; every visited object or
 * array that is not a blob increments `events.dropped`.
 *
 * @param obj Decoded event to search.
 * @param cursor Cursor value to stamp on each descriptor, or `null`.
 * @returns Descriptors of shape `{ blob, source, associated, cursor }`.
 */
async function findBlobObjects(
  obj: any,
  cursor: number | null,
): Promise<any[]> {
  type Origin = {
    rkey: string | null;
    collection: string | null;
    did: string | null;
    record: any | null;
  };

  const found: any[] = [];

  const visit = async (node: any, inherited: Origin): Promise<void> => {
    // Primitives and null carry nothing to inspect.
    if (!node || typeof node !== "object") return;

    // Values on this node override what was inherited from its ancestors.
    const origin: Origin = {
      rkey: node.rkey || inherited.rkey,
      collection: node.collection || inherited.collection,
      did: node.did || inherited.did,
      record: node.record || inherited.record,
    };

    const isBlob = node.$type === "blob" && node.ref?.["$link"];

    if (!isBlob) {
      stats.events.dropped++;
    } else {
      const associated =
        args.disableAssociatedExtracting != true && origin.record != null
          ? extractStringsFromRecord(origin.record)
          : undefined;
      const pds = args.disablePdsResolving != true
        ? await getPds(origin.did)
        : undefined;

      found.push({
        blob: {
          cid: node.ref["$link"],
          mimeType: node.mimeType,
          size: node.size,
        },
        source: {
          collection: origin.collection,
          did: origin.did,
          rkey: origin.rkey,
          pds: pds,
        },
        associated: associated,
        cursor: cursor,
      });

      if (cursor !== null) stats.currentCursor = cursor;
      stats.events.total++;
    }

    // Keep descending: children are visited whether or not this node matched.
    if (!Array.isArray(node)) {
      for (const key in node) {
        await visit(node[key], origin);
      }
      return;
    }

    for (const child of node) {
      await visit(child, origin);
    }
  };

  await visit(obj, { rkey: null, collection: null, did: null, record: null });
  return found;
}
227
/**
 * Resolves the PDS endpoint for a DID, using `didPdsCache` first and the DID
 * document otherwise.
 *
 * - `did:plc:` DIDs are looked up at `${args.endpointPlcdir}/<did>`.
 * - `did:web:` DIDs are looked up at `https://<host>/.well-known/did.json`.
 * - Any other DID method is not resolved.
 *
 * The endpoint is taken from the service entry whose `id` ends with
 * `#atproto_pds`; if the document has no such entry, the first service entry
 * is used instead. Only string endpoints starting with `https://` are
 * accepted and cached. When the cache grows beyond `args.cacheDid` entries
 * the oldest entry is evicted. Failed lookups are not cached.
 *
 * @param did DID to resolve, or `null`.
 * @returns The PDS endpoint URL, or `undefined` when it cannot be determined.
 * @throws Whatever `fetch` or `Response.json()` throw (network failure,
 *   malformed JSON); these are not caught here.
 */
async function getPds(did: string | null): Promise<string | undefined> {
  if (!did) return undefined;

  const cached = didPdsCache.find((item) => item.did === did);
  if (cached) return cached.pds ?? undefined;

  let didUrl: string | undefined;
  if (did.startsWith("did:plc:")) {
    didUrl = `${args.endpointPlcdir}/${did}`;
  } else if (did.startsWith("did:web:")) {
    didUrl = `https://${did.replace("did:web:", "")}/.well-known/did.json`;
  }
  if (didUrl === undefined) return undefined;

  const didDoc = await fetch(didUrl);
  if (!didDoc.ok) return undefined;
  const didDocJson = await didDoc.json();

  // A DID document may list several services (labelers, feed generators, …);
  // the PDS is the one identified as "#atproto_pds". Fall back to the first
  // entry so documents without that id resolve as before.
  const services: any[] = Array.isArray(didDocJson?.service)
    ? didDocJson.service
    : [];
  const pdsService =
    services.find((service) =>
      typeof service?.id === "string" && service.id.endsWith("#atproto_pds")
    ) ?? services[0];
  const pds = pdsService?.serviceEndpoint;

  // The endpoint is untrusted JSON: it may be missing or not a string.
  if (typeof pds !== "string" || !pds.startsWith("https://")) return undefined;

  // Cache the result for future lookups.
  didPdsCache.push({ did: did, pds: pds });
  stats.didCache.added++;

  if (didPdsCache.length > args.cacheDid) {
    stats.didCache.popped++;
    didPdsCache.shift();
  }

  stats.didCache.cached = didPdsCache.length;

  return pds;
}
268
/**
 * Formats the time elapsed since `stats.startTime` as `"<d>d <h>h <m>m <s>s"`.
 *
 * Hours, minutes and seconds wrap at 24, 60 and 60; days are unbounded.
 *
 * @returns The formatted uptime, or `"0d 0h 0m 0s"` when no start time is set.
 */
function getTimeSinceStart(): string {
  if (!stats.startTime) return "0d 0h 0m 0s";

  const totalSeconds = Math.floor(
    (Date.now() - stats.startTime.getTime()) / 1000,
  );
  const totalMinutes = Math.floor(totalSeconds / 60);
  const totalHours = Math.floor(totalMinutes / 60);

  const parts = [
    `${Math.floor(totalHours / 24)}d`,
    `${totalHours % 24}h`,
    `${totalMinutes % 60}m`,
    `${totalSeconds % 60}s`,
  ];

  return parts.join(" ");
}
286
287
/**
 * Formats a date as its UTC ISO-8601 string cut to 19 characters, with the
 * `T` separator replaced by a space (e.g. `"2024-02-29 13:05:09"`).
 *
 * @param date Date to format.
 * @returns The formatted timestamp.
 * @throws RangeError if `date` is an invalid date (from `toISOString`).
 */
function parseDate(date: Date): string {
  const iso = date.toISOString();
  const toSeconds = iso.substring(0, 19);

  return toSeconds.replace("T", " ");
}