📂🌊 An event stream of ATProto blobs blobstream.zio.blue
atproto deno jetstream
at main 139 lines 3.8 kB view raw
1import { parseArgs } from "jsr:@std/cli/parse-args"; 2import { args, stats } from "./store.ts"; 3import Meta from "./meta.ts"; 4import SubscribeFilters from "./classes/SubscribeFilters.ts"; 5import { parseEvent, printLandingHtml, printSocketMessage } from "./services.ts"; 6 7const parsedArgs = parseArgs(Deno.args, { 8 string: [ 9 "cache-did", 10 "endpoint-jetstream", 11 "endpoint-plcdir", 12 "listen-host", 13 "listen-port", 14 ], 15 boolean: [ 16 "disable-associated-extracting", 17 "disable-pds-resolving", 18 "help" 19 ], 20 default: { 21 "cache-did": 10000, 22 "endpoint-jetstream": "wss://jetstream2.us-east.bsky.network/subscribe", 23 "endpoint-plcdir": "https://plc.zio.blue", 24 "listen-host": "0.0.0.0", 25 "listen-port": 8080, 26 }, 27}); 28 29args.cacheDid = parseInt(parsedArgs["cache-did"] as string); 30args.disableAssociatedExtracting = parsedArgs["disable-associated-extracting"]; 31args.disablePdsResolving = parsedArgs["disable-pds-resolving"]; 32args.endpointJetstream = parsedArgs["endpoint-jetstream"]; 33args.endpointPlcdir = parsedArgs["endpoint-plcdir"]; 34args.listenHost = parsedArgs["listen-host"]; 35args.listenPort = parseInt(parsedArgs["listen-port"] as string); 36 37const clients = new Map<WebSocket, SubscribeFilters>(); 38const jetstream = new WebSocket(args.endpointJetstream); 39 40stats.startTime = new Date(); 41 42function printHelpText() { 43 console.log(`Blobstream | 📂🌊 44An event stream of ATProto blobs 45 46 Version ${Meta.version} 47 (c) ${Meta.copyrightYear} zio <${Meta.forgeUrl}> 48 Licensed as MIT License ✨ 49 50 🦋 Follow ${Meta.authorDid} on the ATmosphere 51 ↳ Bluesky: https://bsky.app/profile/${Meta.authorDid} 52 ↳ Tangled: https://tangled.sh/${Meta.authorDid} 53 54(Todo)`); 55} 56 57function broadcast(data: any) { 58 for (const [client, filters] of clients) { 59 if (filters.matches(data)) { 60 try { 61 client.send(JSON.stringify(data)); 62 } catch { 63 clients.delete(client); 64 } 65 } 66 } 67} 68 69const handler = (req: Request, connInfo: Deno.ConnInfo): Response => { 70 const url = new URL(req.url); 71 72 if (url.pathname === "/subscribe") { 73 const { socket, response } = Deno.upgradeWebSocket(req); 74 75 const filters = SubscribeFilters.fromSearchParams(url.searchParams); 76 77 clients.set(socket, filters); 78 79 const forwarded = req.headers.get("X-Forwarded-For"); 80 const ip = forwarded 81 ? forwarded.split(",")[0] 82 : (connInfo.remoteAddr as any).hostname; 83 84 socket.onclose = () => { 85 stats.totalClients--; 86 if(stats.totalClients < 0) stats.totalClients = 0; // HACK 87 clients.delete(socket); 88 printSocketMessage(true, ip, filters); 89 }; 90 91 stats.totalClients++; 92 printSocketMessage(false, ip, filters); 93 94 return response; 95 } 96 97 return new Response(printLandingHtml(), { 98 headers: { "content-type": "text/html; charset=utf-8" }, 99 status: 200, 100 }); 101}; 102 103if(parsedArgs["help"] == true) { 104 printHelpText(); 105 Deno.exit(); 106} 107 108Deno.serve( 109 { port: args.listenPort, hostname: args.listenHost }, 110 handler, 111); 112 113jetstream.onmessage = async (e) => { 114 try { 115 const start = performance.now(); 116 117 const blobs = await parseEvent(e); 118 blobs.forEach(broadcast); 119 120 const duration = performance.now() - start; 121 const samples = ((stats as any).responseSamples || 0) + 1; 122 (stats as any).responseSamples = samples; 123 stats.events.averageProcessTime = 124 ((stats.events.averageProcessTime * (samples - 1)) + duration) / 125 samples; 126 } catch (err) { 127 stats.events.failures++; 128 console.error("Parse error:", err); 129 } 130}; 131 132jetstream.onerror = (e) => console.error("Jetstream error:", e); 133jetstream.onclose = () => console.log("Jetstream closed"); 134 135Deno.addSignalListener("SIGINT", () => { 136 console.log("Shutting down..."); 137 jetstream.close(); 138 Deno.exit(0); 139});