forked from
pds.ls/pdsls
atmosphere explorer
1import { localDateFromTimestamp } from "../../utils/date";
2
/** Identifier of a supported event stream; each value is a key of `STREAM_CONFIGS`. */
export type StreamType =
  | "jetstream"
  | "firehose"
  | "spacedust";
4
/**
 * Describes one input of a stream's connection form.
 *
 * NOTE(review): the per-property meanings below are inferred from how
 * `STREAM_CONFIGS` populates and reads these objects — confirm against the
 * component that renders the form.
 */
export type FormField = {
  /** Form control kind. */
  type: "text" | "textarea" | "checkbox";
  /** Key under which the value is looked up in the submitted `FormData`. */
  name: string;
  /** Human-readable caption for the control. */
  label: string;
  /** Optional hint text; the checkbox fields in this file omit it. */
  placeholder?: string;
  /** Presumably the page URL search parameter mirroring this field — TODO confirm. */
  searchParam: string;
};
12
/**
 * Normalised summary of a single stream event, as produced by
 * `StreamConfig.parseRecord`. Only `type` is always present; the remaining
 * properties are filled in when the underlying event carries them.
 */
export type RecordInfo = {
  /** Event kind / type tag reported by the stream. */
  type: string;
  /** Operation reported for the record, when the event has one. */
  action?: string;
  /** DID associated with the event. */
  did?: string;
  /** Collection (or, for Spacedust, link source) the event refers to. */
  collection?: string;
  /** Record key within the collection. */
  rkey?: string;
  /** Display timestamp string, when the event includes a time. */
  time?: string;
};
21
/**
 * Everything needed to present and connect to one kind of event stream:
 * display metadata, the connection form definition, and the functions that
 * turn form input into a URL and raw events into `RecordInfo` summaries.
 */
export type StreamConfig = {
  // --- Display metadata ---
  /** Display name of the stream. */
  label: string;
  /** One-sentence description of the stream. */
  description: string;
  /** Icon identifier (the configs in this file use `lucide--*` names). */
  icon: string;
  /** Heading for the aggregated collections list — presumably shown in the stats UI; TODO confirm. */
  collectionsLabel: string;
  /** NOTE(review): presumably toggles an event-type breakdown in the UI — confirm with the consumer. */
  showEventTypes: boolean;

  // --- Connection ---
  /** Endpoint used when the user does not supply one. */
  defaultInstance: string;
  /** NOTE(review): presumably selects a dedicated firehose client over a plain WebSocket — confirm. */
  useFirehoseLib: boolean;
  /** Inputs offered in the connection form. */
  fields: FormField[];
  /** Builds the connection URL from the chosen instance and the submitted form. */
  buildUrl: (instance: string, formData: FormData) => string;

  // --- Event handling ---
  /** Reduces a raw stream event to a `RecordInfo`; the event shape is stream-specific, hence `any`. */
  parseRecord: (record: any) => RecordInfo;
};
34
/** A single `[key, value]` query parameter, before percent-encoding. */
type QueryPair = readonly [string, string];

/**
 * Reads a comma-separated form field and returns one `[param, entry]` pair per
 * trimmed, non-empty entry. A missing field yields an empty list.
 */
const listFieldPairs = (formData: FormData, field: string, param: string): QueryPair[] =>
  (formData.get(field)?.toString() ?? "")
    .split(",")
    .map((entry) => entry.trim())
    .filter((entry) => entry.length > 0)
    .map((entry): QueryPair => [param, entry]);

/**
 * Appends `pairs` to `base` as a query string, percent-encoding every value so
 * that characters such as `&`, `#`, `=` or spaces in user input cannot corrupt
 * the URL. Returns `base` unchanged when there is nothing to append.
 */
const appendQuery = (base: string, pairs: readonly QueryPair[]): string => {
  if (pairs.length === 0) return base;
  const query = pairs.map(([key, value]) => `${key}=${encodeURIComponent(value)}`).join("&");
  return `${base}?${query}`;
};

/**
 * Per-stream configuration, keyed by `StreamType`: display metadata, the
 * connection form definition, a URL builder and an event parser.
 */
export const STREAM_CONFIGS: Record<StreamType, StreamConfig> = {
  jetstream: {
    label: "Jetstream",
    description: "A simplified event stream with support for collection and DID filtering.",
    icon: "lucide--radio-tower",
    defaultInstance: "wss://jetstream1.us-east.bsky.network/subscribe",
    useFirehoseLib: false,
    showEventTypes: true,
    collectionsLabel: "Top Collections",
    fields: [
      {
        name: "collections",
        label: "Collections",
        type: "textarea",
        placeholder: "Comma-separated list of collections",
        searchParam: "collections",
      },
      {
        name: "dids",
        label: "DIDs",
        type: "textarea",
        placeholder: "Comma-separated list of DIDs",
        searchParam: "dids",
      },
      {
        name: "cursor",
        label: "Cursor",
        type: "text",
        placeholder: "Leave empty for live-tail",
        searchParam: "cursor",
      },
      {
        // Not read by `buildUrl` below; presumably consumed by the caller — TODO confirm.
        name: "allEvents",
        label: "Show account and identity events",
        type: "checkbox",
        searchParam: "allEvents",
      },
    ],
    /**
     * Builds `instance?wantedCollections=…&wantedDids=…&cursor=…`, emitting one
     * repeated parameter per list entry and omitting anything left empty.
     */
    buildUrl: (instance, formData) => {
      const pairs: QueryPair[] = [
        ...listFieldPairs(formData, "collections", "wantedCollections"),
        ...listFieldPairs(formData, "dids", "wantedDids"),
      ];

      // A blank or whitespace-only cursor means "no cursor".
      const cursor = formData.get("cursor")?.toString().trim();
      if (cursor) pairs.push(["cursor", cursor]);

      return appendQuery(instance, pairs);
    },
    /** Summarises a Jetstream event; commit details are absent on non-commit events. */
    parseRecord: (rec) => {
      // Events without a commit fall back to their kind as the "collection".
      const collection = rec.commit?.collection || rec.kind;
      const rkey = rec.commit?.rkey;
      const action = rec.commit?.operation;
      // `time_us` is divided by 1000 before formatting (microseconds -> milliseconds, going by the name).
      const time = rec.time_us ? localDateFromTimestamp(rec.time_us / 1000) : undefined;
      return { type: rec.kind, did: rec.did, collection, rkey, action, time };
    },
  },

  firehose: {
    label: "Firehose",
    description: "The raw event stream from a relay or PDS.",
    icon: "lucide--rss",
    defaultInstance: "wss://bsky.network",
    useFirehoseLib: true,
    showEventTypes: true,
    collectionsLabel: "Top Collections",
    fields: [
      {
        // Not read by `buildUrl` below; presumably passed to the firehose client separately — TODO confirm.
        name: "cursor",
        label: "Cursor",
        type: "text",
        placeholder: "Leave empty for live-tail",
        searchParam: "cursor",
      },
    ],
    /**
     * Normalises the instance to a bare WebSocket origin: strips the
     * `subscribeRepos` XRPC path if present and prefixes `wss://` when no
     * WebSocket scheme was given. The form data is not used.
     */
    buildUrl: (instance, _formData) => {
      let url = instance;
      url = url.replace("/xrpc/com.atproto.sync.subscribeRepos", "");
      if (!(url.startsWith("wss://") || url.startsWith("ws://"))) {
        url = "wss://" + url;
      }
      return url;
    },
    /** Summarises a firehose event; `op.path` is split into collection and record key. */
    parseRecord: (rec) => {
      // Keep only the fragment after "#" of `$type`, falling back to the full value.
      const type = rec.$type?.split("#").pop() || rec.$type;
      const did = rec.repo ?? rec.did;
      const pathParts = rec.op?.path?.split("/") || [];
      const collection = pathParts[0];
      const rkey = pathParts[1];
      const time = rec.time ? localDateFromTimestamp(Date.parse(rec.time)) : undefined;
      return { type, did, collection, rkey, action: rec.op?.action, time };
    },
  },

  spacedust: {
    label: "Spacedust",
    description: "A stream of links showing interactions across the network.",
    icon: "lucide--link",
    defaultInstance: "wss://spacedust.microcosm.blue/subscribe",
    useFirehoseLib: false,
    showEventTypes: false,
    collectionsLabel: "Top Sources",
    fields: [
      {
        name: "sources",
        label: "Sources",
        type: "textarea",
        placeholder: "e.g. app.bsky.graph.follow:subject",
        searchParam: "sources",
      },
      {
        name: "subjectDids",
        label: "Subject DIDs",
        type: "textarea",
        placeholder: "Comma-separated list of DIDs",
        searchParam: "subjectDids",
      },
      {
        name: "subjects",
        label: "Subjects",
        type: "textarea",
        placeholder: "Comma-separated list of AT URIs",
        searchParam: "subjects",
      },
      {
        name: "instant",
        label: "Instant mode (bypass 21s delay buffer)",
        type: "checkbox",
        searchParam: "instant",
      },
    ],
    /**
     * Builds `instance?wantedSources=…&wantedSubjectDids=…&wantedSubjects=…&instant=true`,
     * emitting one repeated parameter per list entry and omitting anything left empty.
     */
    buildUrl: (instance, formData) => {
      const pairs: QueryPair[] = [
        ...listFieldPairs(formData, "sources", "wantedSources"),
        ...listFieldPairs(formData, "subjectDids", "wantedSubjectDids"),
        ...listFieldPairs(formData, "subjects", "wantedSubjects"),
      ];

      // A checked checkbox is submitted with the value "on".
      if (formData.get("instant")?.toString() === "on") pairs.push(["instant", "true"]);

      return appendQuery(instance, pairs);
    },
    /** Summarises a Spacedust link event; the source record AT URI supplies DID, collection and rkey. */
    parseRecord: (rec) => {
      const source = rec.link?.source;
      const sourceRecord = rec.link?.source_record;
      // "at://<did>/<collection>/<rkey>" -> [did, collection, rkey]
      const uriParts = sourceRecord?.replace("at://", "").split("/") || [];
      const did = uriParts[0];
      const collection = uriParts[1];
      const rkey = uriParts[2];
      return {
        type: rec.kind,
        did,
        // Prefer the link source over the bare collection when it is present.
        collection: source || collection,
        rkey,
        action: rec.link?.operation,
        time: undefined,
      };
    },
  },
};
214
/** Every configured stream type, in the key order of `STREAM_CONFIGS`. */
export const STREAM_TYPES: StreamType[] = Object.keys(STREAM_CONFIGS) as StreamType[];
216
/**
 * Resolves the stream type for a route path.
 *
 * Only the exact paths "/firehose" and "/spacedust" are recognised; every
 * other path (including "/") resolves to "jetstream".
 *
 * @param pathname - Path to classify; compared by strict equality.
 * @returns The matching stream type, defaulting to "jetstream".
 */
export const getStreamType = (pathname: string): StreamType => {
  switch (pathname) {
    case "/firehose":
      return "firehose";
    case "/spacedust":
      return "spacedust";
    default:
      return "jetstream";
  }
};