๐Ÿ“‚๐ŸŒŠ An event stream of ATProto blobs blobstream.zio.blue
atproto deno jetstream

commit current working code

+592
+8
blobstream.code-workspace
··· 1 + { 2 + "folders": [ 3 + { 4 + "path": "." 5 + } 6 + ], 7 + "settings": {} 8 + }
+9
deno.json
··· 1 + { 2 + "tasks": { 3 + "dev": "deno run --allow-net --watch ./src/main.ts", 4 + "start": "deno run --allow-net ./src/main.ts" 5 + }, 6 + "imports": { 7 + "@std/assert": "jsr:@std/assert@1" 8 + } 9 + }
+16
deno.lock
··· 1 + { 2 + "version": "5", 3 + "specifiers": { 4 + "jsr:@std/cli@*": "1.0.23" 5 + }, 6 + "jsr": { 7 + "@std/cli@1.0.23": { 8 + "integrity": "bf95b7a9425ba2af1ae5a6359daf58c508f2decf711a76ed2993cd352498ccca" 9 + } 10 + }, 11 + "workspace": { 12 + "dependencies": [ 13 + "jsr:@std/assert@1" 14 + ] 15 + } 16 + }
+9
src/classes/Args.ts
··· 1 + export default class Args { 2 + cacheDid: number | 0 = 0; 3 + disableAssociatedExtracting: boolean = false; 4 + disablePdsResolving: boolean = false; 5 + endpointJetstream: string | null = null; 6 + endpointPlcdir: string | null = null; 7 + listenHost: string | null = null; 8 + listenPort: number | 0 = 0; 9 + }
+4
src/classes/DidPdsCache.ts
··· 1 + export default class DidPdsCache { 2 + did: string | null = null; 3 + pds: string | null = null; 4 + }
+26
src/classes/RuntimeStats.ts
··· 1 + export default class RuntimeStats { 2 + currentCursor: number = 0; 3 + didCache: { 4 + added: number; 5 + cached: number; 6 + popped: number; 7 + } = { 8 + added: 0, 9 + cached: 0, 10 + popped: 0, 11 + }; 12 + events: { 13 + averageProcessTime: number; 14 + dropped: number; 15 + failures: number; 16 + total: number; 17 + } = { 18 + averageProcessTime: 0, 19 + dropped: 0, 20 + failures: 0, 21 + total: 0, 22 + }; 23 + startTime: Date | null = null; 24 + totalBytes: number = 0; 25 + totalClients: number = 0; 26 + }
+62
src/classes/SubscribeFilters.ts
··· 1 + export default class SubscribeFilters { 2 + collections?: string[]; 3 + dids?: string[]; 4 + mimeTypes?: string[]; 5 + 6 + constructor( 7 + collections?: string[] | undefined, 8 + dids?: string[] | undefined, 9 + mimeTypes?: string[] | undefined, 10 + ) { 11 + this.collections = collections && collections.length 12 + ? Array.from(new Set(collections)) 13 + : undefined; 14 + this.dids = dids && dids.length ? Array.from(new Set(dids)) : undefined; 15 + this.mimeTypes = mimeTypes && mimeTypes.length 16 + ? Array.from(new Set(mimeTypes)) 17 + : undefined; 18 + } 19 + 20 + private static splitParamValues(values: string[]): string[] { 21 + return values 22 + .flatMap((v) => v.split(",")) 23 + .map((s) => s.trim()) 24 + .filter(Boolean); 25 + } 26 + 27 + static fromSearchParams(params: URLSearchParams): Filters { 28 + const collections = Filters.splitParamValues( 29 + params.getAll("wantedCollections"), 30 + ); 31 + const dids = Filters.splitParamValues(params.getAll("wantedDids")); 32 + const mimeTypes = Filters.splitParamValues( 33 + params.getAll("wantedMimeTypes"), 34 + ); 35 + return new Filters(collections, dids, mimeTypes); 36 + } 37 + 38 + matches(data: any): boolean { 39 + const col = data?.source?.collection; 40 + const did = data?.source?.did; 41 + const mtype = data?.blob?.mimeType; 42 + 43 + const matchesCollection = !this.collections || 44 + (col !== undefined && this.collections.includes(col)); 45 + const matchesDid = !this.dids || 46 + (did !== undefined && this.dids.includes(did)); 47 + const matchesMimeType = !this.mimeTypes || 48 + (mtype !== undefined && this.mimeTypes.includes(mtype)); 49 + 50 + return matchesCollection && matchesDid && matchesMimeType; 51 + } 52 + 53 + toString(): string { 54 + const parts: string[] = []; 55 + if (this.collections) { 56 + parts.push(`collection=${this.collections.join(",")}`); 57 + } 58 + if (this.dids) parts.push(`did=${this.dids.join(",")}`); 59 + if (this.mimeTypes) parts.push(`mimeType=${this.mimeTypes.join(",")}`); 60 + return parts.length ? parts.join(", ") : "any"; 61 + } 62 + }
+138
src/main.ts
··· 1 + import { parseArgs } from "jsr:@std/cli/parse-args"; 2 + import { args, stats } from "./store.ts"; 3 + import Meta from "./meta.ts"; 4 + import SubscribeFilters from "./classes/SubscribeFilters.ts"; 5 + import { parseEvent, printLandingHtml, printSocketMessage } from "./services.ts"; 6 + 7 + const parsedArgs = parseArgs(Deno.args, { 8 + string: [ 9 + "cache-did", 10 + "endpoint-jetstream", 11 + "endpoint-plcdir", 12 + "listen-host", 13 + "listen-port", 14 + ], 15 + boolean: [ 16 + "disable-associated-extracting", 17 + "disable-pds-resolving", 18 + "help" 19 + ], 20 + default: { 21 + "cache-did": 10000, 22 + "endpoint-jetstream": "wss://jetstream2.us-east.bsky.network/subscribe", 23 + "endpoint-plcdir": "https://plc.zio.blue", 24 + "listen-host": "0.0.0.0", 25 + "listen-port": 8080, 26 + }, 27 + }); 28 + 29 + args.cacheDid = parseInt(parsedArgs["cache-did"] as string); 30 + args.disableAssociatedExtracting = parsedArgs["disable-associated-extracting"]; 31 + args.disablePdsResolving = parsedArgs["disable-pds-resolving"]; 32 + args.endpointJetstream = parsedArgs["endpoint-jetstream"]; 33 + args.endpointPlcdir = parsedArgs["endpoint-plcdir"]; 34 + args.listenHost = parsedArgs["listen-host"]; 35 + args.listenPort = parseInt(parsedArgs["listen-port"] as string); 36 + 37 + const clients = new Map<WebSocket, SubscribeFilters>(); 38 + const jetstream = new WebSocket(args.endpointJetstream); 39 + 40 + stats.startTime = new Date(); 41 + 42 + function printHelpText() { 43 + console.log(`Blobstream | ๐Ÿ“‚๐ŸŒŠ 44 + An event stream of ATProto blobs 45 + 46 + Version ${Meta.version} 47 + (c) ${Meta.copyrightYear} zio <${Meta.forgeUrl}> 48 + Licensed as MIT License โœจ 49 + 50 + ๐Ÿฆ‹ Follow ${Meta.authorDid} on the ATmosphere 51 + โ†ณ Bluesky: https://bsky.app/profile/${Meta.authorDid} 52 + โ†ณ Tangled: https://tangled.sh/${Meta.authorDid} 53 + 54 + (Todo)`); 55 + } 56 + 57 + function broadcast(data: any) { 58 + for (const [client, filters] of clients) { 59 + if (filters.matches(data)) { 60 + try { 61 + client.send(JSON.stringify(data)); 62 + } catch { 63 + clients.delete(client); 64 + } 65 + } 66 + } 67 + } 68 + 69 + const handler = (req: Request, connInfo: Deno.ConnInfo): Response => { 70 + const url = new URL(req.url); 71 + 72 + if (url.pathname === "/subscribe") { 73 + const { socket, response } = Deno.upgradeWebSocket(req); 74 + 75 + const filters = SubscribeFilters.fromSearchParams(url.searchParams); 76 + 77 + clients.set(socket, filters); 78 + 79 + const forwarded = req.headers.get("X-Forwarded-For"); 80 + const ip = forwarded 81 + ? forwarded.split(",")[0] 82 + : (connInfo.remoteAddr as any).hostname; 83 + 84 + socket.onclose = () => { 85 + stats.totalClients--; 86 + clients.delete(socket); 87 + printSocketMessage(true, ip, filters); 88 + }; 89 + 90 + stats.totalClients++; 91 + printSocketMessage(false, ip, filters); 92 + 93 + return response; 94 + } 95 + 96 + return new Response(printLandingHtml(), { 97 + headers: { "content-type": "text/html; charset=utf-8" }, 98 + status: 200, 99 + }); 100 + }; 101 + 102 + if(parsedArgs["help"] == true) { 103 + printHelpText(); 104 + Deno.exit(); 105 + } 106 + 107 + Deno.serve( 108 + { port: args.listenPort, hostname: args.listenHost }, 109 + handler, 110 + ); 111 + 112 + jetstream.onmessage = async (e) => { 113 + try { 114 + const start = performance.now(); 115 + 116 + const blobs = await parseEvent(e); 117 + blobs.forEach(broadcast); 118 + 119 + const duration = performance.now() - start; 120 + const samples = ((stats as any).responseSamples || 0) + 1; 121 + (stats as any).responseSamples = samples; 122 + stats.events.averageProcessTime = 123 + ((stats.events.averageProcessTime * (samples - 1)) + duration) / 124 + samples; 125 + } catch (err) { 126 + stats.events.failures++; 127 + console.error("Parse error:", err); 128 + } 129 + }; 130 + 131 + jetstream.onerror = (e) => console.error("Jetstream error:", e); 132 + jetstream.onclose = () => console.log("Jetstream closed"); 133 + 134 + Deno.addSignalListener("SIGINT", () => { 135 + console.log("Shutting down..."); 136 + jetstream.close(); 137 + Deno.exit(0); 138 + });
+7
src/meta.ts
··· 1 + export default class Meta { 2 + static author: string = "zio"; 3 + static authorDid: string = "did:web:zio.sh"; 4 + static copyrightYear: number = 2025; 5 + static forgeUrl: string = "https://tangled.org/@zio.sh/blobstream"; 6 + static version: string = "0.0.0"; 7 + }
+288
src/services.ts
··· 1 + import SubscribeFilters from "./classes/SubscribeFilters.ts"; 2 + import Meta from "./meta.ts"; 3 + import { args, didPdsCache, stats } from "./store.ts"; 4 + 5 + export async function parseEvent(e: MessageEvent<any>) { 6 + const data = JSON.parse(e.data); 7 + const cursor = data.time_us || null; 8 + const blobs = await findBlobObjects(data, cursor); 9 + 10 + return blobs; 11 + } 12 + 13 + export function printLandingHtml(): string { 14 + const header = ` ____ _ _ _ 15 + | __ )| | ___ | |__ ___| |_ _ __ ___ __ _ _ __ ___ 16 + | _ \\| |/ _ \\| '_ \\/ __| __| '__/ _ \\/ _\` | '_ \` _ \\ 17 + | |_) | | (_) | |_) \\__ | |_| | | __| (_| | | | | | | 18 + |____/|_|\\___/|_.__/|___/\\__|_| \\___|\\__,_|_| |_| |_|`; 19 + 20 + const outputHtml = ` 21 + <!DOCTYPE html> 22 + <html> 23 + <head> 24 + <style> 25 + :root { 26 + color-scheme: light dark; 27 + } 28 + 29 + body { 30 + color: light-dark(#000000, #ffffff); 31 + background-color: light-dark(#ffffff, #18191b); 32 + font-family: monospace; 33 + margin: 25px; 34 + white-space: pre-wrap; 35 + word-wrap: break-word; 36 + } 37 + 38 + a { 39 + color: light-dark(#0a5dbd, #a4cefe); 40 + } 41 + 42 + .subtle { 43 + opacity: 0.8; 44 + } 45 + </style> 46 + <title>Blobstream</title> 47 + </head> 48 + <body>${header} 49 + 50 + โœจ Events: ${stats.events.total.toLocaleString()} 51 + โ†ณ Dropped: ${stats.events.dropped.toLocaleString()} 52 + โ†ณ Failed: ${stats.events.failures.toLocaleString()} 53 + โ†ณ <span title="Average Process Time">APT</span>: ${ 54 + stats.events.averageProcessTime.toFixed(5) 55 + }ms 56 + ๐Ÿ•— Uptime: ${getTimeSinceStart()} 57 + ๐Ÿ“ Cursor: ${stats.currentCursor} 58 + โ†ณ Latest: ${parseDate(new Date(stats.currentCursor / 1000))} 59 + โ†ณ Current: ${parseDate(new Date())} 60 + ๐Ÿ“‡ DIDs: ${stats.didCache.cached.toLocaleString()} 61 + โ†ณ Added: +${stats.didCache.added.toLocaleString()} 62 + โ†ณ Dropped: -${stats.didCache.popped.toLocaleString()} 63 + ๐Ÿ‘ค Clients: ${stats.totalClients.toLocaleString()} 64 + โฌ†๏ธ Upstream: ${new URL(args.endpointJetstream as string).host} 65 + 66 + WebSocket endpoint at <strong>/subscribe</strong>. Filter with queries: 67 + * <strong>wantedCollections</strong> &mdash; Collection(s) to filter by 68 + <em class="subtle">(e.g. app.bsky.feed.post, blue.zio.atfile.upload)</em> 69 + * <strong>wantedDids</strong> &mdash; DID(s) to filter by 70 + <em class="subtle">(e.g. did:plc:z72i7hdynmk6r22z27h6tvur, did:web:didd.uk)</em> 71 + * <strong>wantedMimeTypes</strong> &mdash; MimeType(s) to filter by 72 + <em class="subtle">(e.g. image/jpeg, video/mp4)</em> 73 + 74 + <a href="https://tangled.org/@zio.sh/blobstream">๐Ÿ‘ @zio.sh/blobstream</a> ใƒป ${Meta.version} 75 + </body> 76 + </html> 77 + `; 78 + 79 + return outputHtml; 80 + } 81 + 82 + export function printSocketMessage(closing: boolean, ip: string, filters: SubscribeFilters) { 83 + const verb = closing ? "Closing" : "Opening"; 84 + console.log( 85 + `${verb} (${stats.totalClients.toLocaleString()}): ${ip} (${ 86 + filters.toString().replace(", ", "; ") 87 + })`, 88 + ); 89 + } 90 + 91 + function extractStringsFromRecord(record: any) { 92 + let type = record.$type; 93 + let associated: string[] = []; 94 + 95 + switch (type) { 96 + case "app.bsky.actor.profile": 97 + if(record.description != undefined && record.description != null) 98 + associated.push(record.description); 99 + break; 100 + case "app.bsky.feed.post": 101 + if (record.embed != undefined) { 102 + switch (record.embed.$type) { 103 + // deno-lint-ignore no-case-declarations 104 + case "app.bsky.embed.images": 105 + const imgs = record.embed.images; 106 + if (imgs) { 107 + const images = Array.isArray(imgs) 108 + ? imgs 109 + : (Array.isArray((imgs as any).images) 110 + ? (imgs as any).images 111 + : [imgs]); 112 + 113 + for (const image of images) { 114 + if (!image) continue; 115 + const alt = (typeof image === "object") 116 + ? (image.alt ?? undefined) 117 + : (typeof image === "string" ? image : undefined); 118 + if (typeof alt === "string" && alt != "") { 119 + associated.push(alt); 120 + } 121 + } 122 + } 123 + break; 124 + case "app.bsky.embed.video": 125 + if(record.embed.alt != undefined && record.embed.alt != "") 126 + associated.push(record.embed.alt); 127 + break; 128 + } 129 + } 130 + 131 + if ( 132 + record.labels != undefined && 133 + record.labels.$type == "com.atproto.label.defs#selfLabels" && 134 + record.labels.values != undefined && record.labels.values.length != 0 135 + ) { 136 + const labels = (record.labels.values as any[]) 137 + .map((v) => (typeof v === "string" ? v : v?.val)) 138 + .filter(Boolean); 139 + if (labels.length) { 140 + for(const label in labels) { 141 + associated.push(label); 142 + } 143 + } 144 + } 145 + 146 + if (record.tags != undefined && record.tags.length != 0) { 147 + for(const tag in record.tags) { 148 + associated.push(`#${record.tags}`); 149 + } 150 + } 151 + if (record.text != undefined && record.text != "") { 152 + associated.push(record.text); 153 + } 154 + break; 155 + } 156 + 157 + return associated; 158 + } 159 + 160 + async function findBlobObjects( 161 + obj: any, 162 + cursor: number | null, 163 + ): Promise<any[]> { 164 + const blobs: any[] = []; 165 + 166 + async function search( 167 + current: any, 168 + rkey: string | null = null, 169 + collection: string | null = null, 170 + did: string | null = null, 171 + record: any | null = null, 172 + ): Promise<void> { 173 + if (!current || typeof current !== "object") return; 174 + 175 + if (current.rkey) rkey = current.rkey; 176 + if (current.collection) collection = current.collection; 177 + if (current.did) did = current.did; 178 + if (current.record) record = current.record; 179 + 180 + if (current.$type === "blob" && current.ref?.["$link"]) { 181 + let associated; 182 + let pds; 183 + 184 + if (args.disableAssociatedExtracting != true && record != null) { 185 + associated = extractStringsFromRecord(record); 186 + } 187 + if (args.disablePdsResolving != true) pds = await getPds(did); 188 + 189 + blobs.push({ 190 + blob: { 191 + cid: current.ref["$link"], 192 + mimeType: current.mimeType, 193 + size: current.size, 194 + }, 195 + source: { 196 + collection: collection, 197 + did: did, 198 + rkey: rkey, 199 + pds: pds, 200 + }, 201 + associated: associated, 202 + cursor: cursor, 203 + }); 204 + 205 + if (cursor !== null) stats.currentCursor = cursor; 206 + stats.events.total++; 207 + } else { 208 + stats.events.dropped++; 209 + } 210 + 211 + if (Array.isArray(current)) { 212 + for (const item of current) { 213 + await search(item, rkey, collection, did, record); 214 + } 215 + } else { 216 + for (const key in current) { 217 + await search(current[key], rkey, collection, did, record); 218 + } 219 + } 220 + } 221 + 222 + await search(obj); 223 + return blobs; 224 + } 225 + 226 + async function getPds(did: string | null): Promise<string | undefined> { 227 + if (!did) return undefined; 228 + let didUrl = undefined; 229 + 230 + const cached = didPdsCache.find((item) => item.did === did); 231 + if (cached) { 232 + if (cached.pds === null) return undefined; 233 + return cached.pds; 234 + } 235 + 236 + if (did.startsWith("did:plc:")) { 237 + didUrl = `${args.endpointPlcdir}/${did}`; 238 + } else if (did.startsWith("did:web:")) { 239 + didUrl = `https://${did.replace("did:web:", "")}/.well-known/did.json`; 240 + } 241 + 242 + if (didUrl !== undefined) { 243 + const didDoc = await fetch(didUrl); 244 + if (!didDoc.ok) return undefined; 245 + const didDocJson = await didDoc.json(); 246 + const pds = didDocJson?.service?.[0]?.serviceEndpoint; 247 + 248 + if (pds && pds.startsWith("https://")) { 249 + // cache result for future lookups 250 + didPdsCache.push({ did: did, pds: pds }); 251 + stats.didCache.added++; 252 + 253 + if (didPdsCache.length > args.cacheDid) { 254 + stats.didCache.popped++; 255 + didPdsCache.shift(); 256 + } 257 + 258 + stats.didCache.cached = didPdsCache.length; 259 + 260 + return pds; 261 + } 262 + } 263 + 264 + return undefined; 265 + } 266 + 267 + function getTimeSinceStart(): string { 268 + if (!stats.startTime) { 269 + return "0d 0h 0m 0s"; 270 + } 271 + 272 + const elapsedMs = Date.now() - stats.startTime.getTime(); 273 + const seconds = Math.floor(elapsedMs / 1000); 274 + const minutes = Math.floor(seconds / 60); 275 + const hours = Math.floor(minutes / 60); 276 + const days = Math.floor(hours / 24); 277 + 278 + const remainingHours = hours % 24; 279 + const remainingMinutes = minutes % 60; 280 + const remainingSeconds = seconds % 60; 281 + 282 + return `${days}d ${remainingHours}h ${remainingMinutes}m ${remainingSeconds}s`; 283 + } 284 + 285 + 286 + function parseDate(date: Date): string { 287 + return date.toISOString().slice(0, 19).replace("T", " "); 288 + }
+25
src/store.ts
··· 1 + import RuntimeStats from "./classes/RuntimeStats.ts"; 2 + import DidPdsCache from "./classes/DidPdsCache.ts"; 3 + import Args from "./classes/Args.ts"; 4 + 5 + export const args: Args = new Args(); 6 + export const didPdsCache: DidPdsCache[] = []; 7 + export const stats = new RuntimeStats(); 8 + 9 + declare global { 10 + var __args__: Args | undefined; 11 + var __didPdsCache__: DidPdsCache | undefined; 12 + var __runtimeStats__: RuntimeStats | undefined; 13 + } 14 + 15 + if (typeof (globalThis as any).__args__ === 'undefined') { 16 + (globalThis as any).__args__ = args; 17 + } 18 + 19 + if (typeof (globalThis as any).__didPdsCache__ === 'undefined') { 20 + (globalThis as any).__didPdsCache__ = didPdsCache; 21 + } 22 + 23 + if (typeof (globalThis as any).__runtimeStats__ === 'undefined') { 24 + (globalThis as any).__runtimeStats__ = stats; 25 + }