forked from
pds.ls/pdsls
atproto explorer
1import { Firehose } from "@skyware/firehose";
2import { A, useLocation, useSearchParams } from "@solidjs/router";
3import { createSignal, For, onCleanup, onMount, Show } from "solid-js";
4import { Button } from "../components/button";
5import { JSONValue } from "../components/json";
6import { StickyOverlay } from "../components/sticky";
7import { TextInput } from "../components/text-input";
8
9const LIMIT = 25;
10type Parameter = { name: string; param: string | string[] | undefined };
11
12const StreamView = () => {
13 const [searchParams, setSearchParams] = useSearchParams();
14 const [parameters, setParameters] = createSignal<Parameter[]>([]);
15 const streamType = useLocation().pathname === "/firehose" ? "firehose" : "jetstream";
16 const [records, setRecords] = createSignal<Array<any>>([]);
17 const [connected, setConnected] = createSignal(false);
18 const [notice, setNotice] = createSignal("");
19 let socket: WebSocket;
20 let firehose: Firehose;
21 let formRef!: HTMLFormElement;
22
23 const connectSocket = async (formData: FormData) => {
24 setNotice("");
25 if (connected()) {
26 if (streamType === "jetstream") socket?.close();
27 else firehose?.close();
28 setConnected(false);
29 return;
30 }
31 setRecords([]);
32
33 let url = "";
34 if (streamType === "jetstream") {
35 url =
36 formData.get("instance")?.toString() ?? "wss://jetstream1.us-east.bsky.network/subscribe";
37 url = url.concat("?");
38 } else {
39 url = formData.get("instance")?.toString() ?? "wss://bsky.network";
40 }
41
42 const collections = formData.get("collections")?.toString().split(",");
43 collections?.forEach((collection) => {
44 if (collection.length) url = url.concat(`wantedCollections=${collection}&`);
45 });
46
47 const dids = formData.get("dids")?.toString().split(",");
48 dids?.forEach((did) => {
49 if (did.length) url = url.concat(`wantedDids=${did}&`);
50 });
51
52 const cursor = formData.get("cursor")?.toString();
53 if (streamType === "jetstream") {
54 if (cursor?.length) url = url.concat(`cursor=${cursor}`);
55 if (url.endsWith("&")) url = url.slice(0, -1);
56 }
57
58 setSearchParams({
59 instance: formData.get("instance")?.toString(),
60 collections: formData.get("collections")?.toString(),
61 dids: formData.get("dids")?.toString(),
62 cursor: formData.get("cursor")?.toString(),
63 allEvents: formData.get("allEvents")?.toString(),
64 });
65
66 setParameters([
67 { name: "Instance", param: formData.get("instance")?.toString() },
68 { name: "Collections", param: formData.get("collections")?.toString() },
69 { name: "DIDs", param: formData.get("dids")?.toString() },
70 { name: "Cursor", param: formData.get("cursor")?.toString() },
71 { name: "All Events", param: formData.get("allEvents")?.toString() },
72 ]);
73
74 setConnected(true);
75 if (streamType === "jetstream") {
76 socket = new WebSocket(url);
77 socket.addEventListener("message", (event) => {
78 const rec = JSON.parse(event.data);
79 if (searchParams.allEvents === "on" || (rec.kind !== "account" && rec.kind !== "identity"))
80 setRecords(records().concat(rec).slice(-LIMIT));
81 });
82 socket.addEventListener("error", () => {
83 setNotice("Connection error");
84 setConnected(false);
85 });
86 } else {
87 firehose = new Firehose({
88 relay: url,
89 cursor: cursor,
90 autoReconnect: false,
91 });
92 firehose.on("error", (err) => {
93 console.error(err);
94 });
95 firehose.on("commit", (commit) => {
96 for (const op of commit.ops) {
97 const record = {
98 $type: commit.$type,
99 repo: commit.repo,
100 seq: commit.seq,
101 time: commit.time,
102 rev: commit.rev,
103 since: commit.since,
104 op: op,
105 };
106 setRecords(records().concat(record).slice(-LIMIT));
107 }
108 });
109 firehose.on("identity", (identity) => {
110 setRecords(records().concat(identity).slice(-LIMIT));
111 });
112 firehose.on("account", (account) => {
113 setRecords(records().concat(account).slice(-LIMIT));
114 });
115 firehose.on("sync", (sync) => {
116 const event = {
117 $type: sync.$type,
118 did: sync.did,
119 rev: sync.rev,
120 seq: sync.seq,
121 time: sync.time,
122 };
123 setRecords(records().concat(event).slice(-LIMIT));
124 });
125 firehose.start();
126 }
127 };
128
129 onMount(async () => {
130 const formData = new FormData();
131 if (searchParams.instance) formData.append("instance", searchParams.instance.toString());
132 if (searchParams.collections)
133 formData.append("collections", searchParams.collections.toString());
134 if (searchParams.dids) formData.append("dids", searchParams.dids.toString());
135 if (searchParams.cursor) formData.append("cursor", searchParams.cursor.toString());
136 if (searchParams.allEvents) formData.append("allEvents", searchParams.allEvents.toString());
137 if (searchParams.instance) connectSocket(formData);
138 });
139
140 onCleanup(() => socket?.close());
141
142 return (
143 <div class="flex w-full flex-col items-center">
144 <div class="flex gap-2 text-sm">
145 <A
146 class="flex items-center gap-1 border-b-2 p-1"
147 inactiveClass="border-transparent hover:border-neutral-400 dark:hover:border-neutral-600"
148 href="/jetstream"
149 >
150 <span class="iconify lucide--radio-tower"></span>
151 Jetstream
152 </A>
153 <A
154 class="flex items-center gap-1 border-b-2 p-1"
155 inactiveClass="border-transparent hover:border-neutral-400 dark:hover:border-neutral-600"
156 href="/firehose"
157 >
158 <span class="iconify lucide--waves"></span>
159 Firehose
160 </A>
161 </div>
162 <StickyOverlay>
163 <form ref={formRef} class="flex w-full flex-col gap-1 text-sm">
164 <Show when={!connected()}>
165 <label class="flex items-center justify-end gap-x-1">
166 <span class="min-w-[5rem]">Instance</span>
167 <TextInput
168 name="instance"
169 value={
170 searchParams.instance ??
171 (streamType === "jetstream" ?
172 "wss://jetstream1.us-east.bsky.network/subscribe"
173 : "wss://bsky.network")
174 }
175 class="grow"
176 />
177 </label>
178 <Show when={streamType === "jetstream"}>
179 <label class="flex items-center justify-end gap-x-1">
180 <span class="min-w-[5rem]">Collections</span>
181 <textarea
182 name="collections"
183 spellcheck={false}
184 placeholder="Comma-separated list of collections"
185 value={searchParams.collections ?? ""}
186 class="dark:bg-dark-100 dark:shadow-dark-800 grow rounded-lg border-[0.5px] border-neutral-300 bg-white px-2 py-1 shadow-xs focus:outline-[1px] focus:outline-neutral-900 dark:border-neutral-700 dark:focus:outline-neutral-200"
187 />
188 </label>
189 </Show>
190 <Show when={streamType === "jetstream"}>
191 <label class="flex items-center justify-end gap-x-1">
192 <span class="min-w-[5rem]">DIDs</span>
193 <textarea
194 name="dids"
195 spellcheck={false}
196 placeholder="Comma-separated list of DIDs"
197 value={searchParams.dids ?? ""}
198 class="dark:bg-dark-100 dark:shadow-dark-800 grow rounded-lg border-[0.5px] border-neutral-300 bg-white px-2 py-1 shadow-xs focus:outline-[1px] focus:outline-neutral-900 dark:border-neutral-700 dark:focus:outline-neutral-200"
199 />
200 </label>
201 </Show>
202 <label class="flex items-center justify-end gap-x-1">
203 <span class="min-w-[5rem]">Cursor</span>
204 <TextInput
205 name="cursor"
206 placeholder="Leave empty for live-tail"
207 value={searchParams.cursor ?? ""}
208 class="grow"
209 />
210 </label>
211 <Show when={streamType === "jetstream"}>
212 <div class="flex items-center justify-end gap-x-1">
213 <input
214 type="checkbox"
215 name="allEvents"
216 id="allEvents"
217 checked={searchParams.allEvents === "on" ? true : false}
218 />
219 <label for="allEvents" class="select-none">
220 Show account and identity events
221 </label>
222 </div>
223 </Show>
224 </Show>
225 <Show when={connected()}>
226 <div class="flex flex-col gap-1 wrap-anywhere">
227 <For each={parameters()}>
228 {(param) => (
229 <Show when={param.param}>
230 <div class="flex">
231 <div class="min-w-[6rem] font-semibold">{param.name}</div>
232 {param.param}
233 </div>
234 </Show>
235 )}
236 </For>
237 </div>
238 </Show>
239 <div class="flex justify-end">
240 <Button onClick={() => connectSocket(new FormData(formRef))}>
241 {connected() ? "Disconnect" : "Connect"}
242 </Button>
243 </div>
244 </form>
245 </StickyOverlay>
246 <Show when={notice().length}>
247 <div class="text-red-500 dark:text-red-400">{notice()}</div>
248 </Show>
249 <div class="flex w-full flex-col gap-2 divide-y-[0.5px] divide-neutral-500 font-mono text-sm wrap-anywhere whitespace-pre-wrap md:w-[48rem]">
250 <For each={records().toReversed()}>
251 {(rec) => (
252 <div class="pb-2">
253 <JSONValue data={rec} repo={rec.did ?? rec.repo} />
254 </div>
255 )}
256 </For>
257 </div>
258 </div>
259 );
260};
261
262export { StreamView };