📂🌊 An event stream of ATProto blobs
blobstream.zio.blue
atproto
deno
jetstream
1import { parseArgs } from "jsr:@std/cli/parse-args";
2import { args, stats } from "./store.ts";
3import Meta from "./meta.ts";
4import SubscribeFilters from "./classes/SubscribeFilters.ts";
5import { parseEvent, printLandingHtml, printSocketMessage } from "./services.ts";
6
7const parsedArgs = parseArgs(Deno.args, {
8 string: [
9 "cache-did",
10 "endpoint-jetstream",
11 "endpoint-plcdir",
12 "listen-host",
13 "listen-port",
14 ],
15 boolean: [
16 "disable-associated-extracting",
17 "disable-pds-resolving",
18 "help"
19 ],
20 default: {
21 "cache-did": 10000,
22 "endpoint-jetstream": "wss://jetstream2.us-east.bsky.network/subscribe",
23 "endpoint-plcdir": "https://plc.zio.blue",
24 "listen-host": "0.0.0.0",
25 "listen-port": 8080,
26 },
27});
28
29args.cacheDid = parseInt(parsedArgs["cache-did"] as string);
30args.disableAssociatedExtracting = parsedArgs["disable-associated-extracting"];
31args.disablePdsResolving = parsedArgs["disable-pds-resolving"];
32args.endpointJetstream = parsedArgs["endpoint-jetstream"];
33args.endpointPlcdir = parsedArgs["endpoint-plcdir"];
34args.listenHost = parsedArgs["listen-host"];
35args.listenPort = parseInt(parsedArgs["listen-port"] as string);
36
37const clients = new Map<WebSocket, SubscribeFilters>();
38const jetstream = new WebSocket(args.endpointJetstream);
39
40stats.startTime = new Date();
41
42function printHelpText() {
43 console.log(`Blobstream | 📂🌊
44An event stream of ATProto blobs
45
46 Version ${Meta.version}
47 (c) ${Meta.copyrightYear} zio <${Meta.forgeUrl}>
48 Licensed as MIT License ✨
49
50 🦋 Follow ${Meta.authorDid} on the ATmosphere
51 ↳ Bluesky: https://bsky.app/profile/${Meta.authorDid}
52 ↳ Tangled: https://tangled.sh/${Meta.authorDid}
53
54(Todo)`);
55}
56
57function broadcast(data: any) {
58 for (const [client, filters] of clients) {
59 if (filters.matches(data)) {
60 try {
61 client.send(JSON.stringify(data));
62 } catch {
63 clients.delete(client);
64 }
65 }
66 }
67}
68
69const handler = (req: Request, connInfo: Deno.ConnInfo): Response => {
70 const url = new URL(req.url);
71
72 if (url.pathname === "/subscribe") {
73 const { socket, response } = Deno.upgradeWebSocket(req);
74
75 const filters = SubscribeFilters.fromSearchParams(url.searchParams);
76
77 clients.set(socket, filters);
78
79 const forwarded = req.headers.get("X-Forwarded-For");
80 const ip = forwarded
81 ? forwarded.split(",")[0]
82 : (connInfo.remoteAddr as any).hostname;
83
84 socket.onclose = () => {
85 stats.totalClients--;
86 if(stats.totalClients < 0) stats.totalClients = 0; // HACK
87 clients.delete(socket);
88 printSocketMessage(true, ip, filters);
89 };
90
91 stats.totalClients++;
92 printSocketMessage(false, ip, filters);
93
94 return response;
95 }
96
97 return new Response(printLandingHtml(), {
98 headers: { "content-type": "text/html; charset=utf-8" },
99 status: 200,
100 });
101};
102
103if(parsedArgs["help"] == true) {
104 printHelpText();
105 Deno.exit();
106}
107
108Deno.serve(
109 { port: args.listenPort, hostname: args.listenHost },
110 handler,
111);
112
113jetstream.onmessage = async (e) => {
114 try {
115 const start = performance.now();
116
117 const blobs = await parseEvent(e);
118 blobs.forEach(broadcast);
119
120 const duration = performance.now() - start;
121 const samples = ((stats as any).responseSamples || 0) + 1;
122 (stats as any).responseSamples = samples;
123 stats.events.averageProcessTime =
124 ((stats.events.averageProcessTime * (samples - 1)) + duration) /
125 samples;
126 } catch (err) {
127 stats.events.failures++;
128 console.error("Parse error:", err);
129 }
130};
131
132jetstream.onerror = (e) => console.error("Jetstream error:", e);
133jetstream.onclose = () => console.log("Jetstream closed");
134
135Deno.addSignalListener("SIGINT", () => {
136 console.log("Shutting down...");
137 jetstream.close();
138 Deno.exit(0);
139});