tangled
alpha
login
or
join now
bas.sh
/
pdsls
forked from
pds.ls/pdsls
0
fork
atom
atmosphere explorer
0
fork
atom
overview
issues
pulls
pipelines
add stardust streaming
handle.invalid
1 month ago
a7f73a85
143d88d2
verified
This commit was signed with the committer's
known signature
.
handle.invalid
SSH Key Fingerprint:
SHA256:mBrT4x0JdzLpbVR95g1hjI1aaErfC02kmLRkPXwsYCk=
+386
-219
4 changed files
expand all
collapse all
unified
split
src
index.tsx
views
stream
config.ts
index.tsx
stats.tsx
+1
-1
src/index.tsx
···
19
19
() => (
20
20
<Router root={Layout}>
21
21
<Route path="/" component={Home} />
22
22
-
<Route path={["/jetstream", "/firehose"]} component={StreamView} />
22
22
+
<Route path={["/jetstream", "/firehose", "/spacedust"]} component={StreamView} />
23
23
<Route path="/labels" component={LabelView} />
24
24
<Route path="/car" component={CarView} />
25
25
<Route path="/car/explore" component={ExploreToolView} />
+217
src/views/stream/config.ts
···
1
1
+
import { localDateFromTimestamp } from "../../utils/date";
2
2
+
3
3
+
export type StreamType = "jetstream" | "firehose" | "spacedust";
4
4
+
5
5
+
export type FormField = {
6
6
+
name: string;
7
7
+
label: string;
8
8
+
type: "text" | "textarea" | "checkbox";
9
9
+
placeholder?: string;
10
10
+
searchParam: string;
11
11
+
};
12
12
+
13
13
+
export type RecordInfo = {
14
14
+
type: string;
15
15
+
did?: string;
16
16
+
collection?: string;
17
17
+
rkey?: string;
18
18
+
action?: string;
19
19
+
time?: string;
20
20
+
};
21
21
+
22
22
+
export type StreamConfig = {
23
23
+
label: string;
24
24
+
icon: string;
25
25
+
defaultInstance: string;
26
26
+
fields: FormField[];
27
27
+
useFirehoseLib: boolean;
28
28
+
buildUrl: (instance: string, formData: FormData) => string;
29
29
+
parseRecord: (record: any) => RecordInfo;
30
30
+
showEventTypes: boolean;
31
31
+
collectionsLabel: string;
32
32
+
};
33
33
+
34
34
+
export const STREAM_CONFIGS: Record<StreamType, StreamConfig> = {
35
35
+
jetstream: {
36
36
+
label: "Jetstream",
37
37
+
icon: "lucide--radio-tower",
38
38
+
defaultInstance: "wss://jetstream1.us-east.bsky.network/subscribe",
39
39
+
useFirehoseLib: false,
40
40
+
showEventTypes: true,
41
41
+
collectionsLabel: "Top Collections",
42
42
+
fields: [
43
43
+
{
44
44
+
name: "collections",
45
45
+
label: "Collections",
46
46
+
type: "textarea",
47
47
+
placeholder: "Comma-separated list of collections",
48
48
+
searchParam: "collections",
49
49
+
},
50
50
+
{
51
51
+
name: "dids",
52
52
+
label: "DIDs",
53
53
+
type: "textarea",
54
54
+
placeholder: "Comma-separated list of DIDs",
55
55
+
searchParam: "dids",
56
56
+
},
57
57
+
{
58
58
+
name: "cursor",
59
59
+
label: "Cursor",
60
60
+
type: "text",
61
61
+
placeholder: "Leave empty for live-tail",
62
62
+
searchParam: "cursor",
63
63
+
},
64
64
+
{
65
65
+
name: "allEvents",
66
66
+
label: "Show account and identity events",
67
67
+
type: "checkbox",
68
68
+
searchParam: "allEvents",
69
69
+
},
70
70
+
],
71
71
+
buildUrl: (instance, formData) => {
72
72
+
let url = instance + "?";
73
73
+
74
74
+
const collections = formData.get("collections")?.toString().split(",");
75
75
+
collections?.forEach((c) => {
76
76
+
if (c.trim().length) url += `wantedCollections=${c.trim()}&`;
77
77
+
});
78
78
+
79
79
+
const dids = formData.get("dids")?.toString().split(",");
80
80
+
dids?.forEach((d) => {
81
81
+
if (d.trim().length) url += `wantedDids=${d.trim()}&`;
82
82
+
});
83
83
+
84
84
+
const cursor = formData.get("cursor")?.toString();
85
85
+
if (cursor?.length) url += `cursor=${cursor}&`;
86
86
+
87
87
+
return url.replace(/[&?]$/, "");
88
88
+
},
89
89
+
parseRecord: (rec) => {
90
90
+
const collection = rec.commit?.collection || rec.kind;
91
91
+
const rkey = rec.commit?.rkey;
92
92
+
const action = rec.commit?.operation;
93
93
+
const time = rec.time_us ? localDateFromTimestamp(rec.time_us / 1000) : undefined;
94
94
+
return { type: rec.kind, did: rec.did, collection, rkey, action, time };
95
95
+
},
96
96
+
},
97
97
+
98
98
+
firehose: {
99
99
+
label: "Firehose",
100
100
+
icon: "lucide--rss",
101
101
+
defaultInstance: "wss://bsky.network",
102
102
+
useFirehoseLib: true,
103
103
+
showEventTypes: true,
104
104
+
collectionsLabel: "Top Collections",
105
105
+
fields: [
106
106
+
{
107
107
+
name: "cursor",
108
108
+
label: "Cursor",
109
109
+
type: "text",
110
110
+
placeholder: "Leave empty for live-tail",
111
111
+
searchParam: "cursor",
112
112
+
},
113
113
+
],
114
114
+
buildUrl: (instance, _formData) => {
115
115
+
let url = instance;
116
116
+
url = url.replace("/xrpc/com.atproto.sync.subscribeRepos", "");
117
117
+
if (!(url.startsWith("wss://") || url.startsWith("ws://"))) {
118
118
+
url = "wss://" + url;
119
119
+
}
120
120
+
return url;
121
121
+
},
122
122
+
parseRecord: (rec) => {
123
123
+
const type = rec.$type?.split("#").pop() || rec.$type;
124
124
+
const did = rec.repo ?? rec.did;
125
125
+
const pathParts = rec.op?.path?.split("/") || [];
126
126
+
const collection = pathParts[0];
127
127
+
const rkey = pathParts[1];
128
128
+
const time = rec.time ? localDateFromTimestamp(Date.parse(rec.time)) : undefined;
129
129
+
return { type, did, collection, rkey, action: rec.op?.action, time };
130
130
+
},
131
131
+
},
132
132
+
133
133
+
spacedust: {
134
134
+
label: "Spacedust",
135
135
+
icon: "lucide--link",
136
136
+
defaultInstance: "wss://spacedust.microcosm.blue/subscribe",
137
137
+
useFirehoseLib: false,
138
138
+
showEventTypes: false,
139
139
+
collectionsLabel: "Top Sources",
140
140
+
fields: [
141
141
+
{
142
142
+
name: "sources",
143
143
+
label: "Sources",
144
144
+
type: "textarea",
145
145
+
placeholder: "e.g. app.bsky.graph.follow:subject",
146
146
+
searchParam: "sources",
147
147
+
},
148
148
+
{
149
149
+
name: "subjectDids",
150
150
+
label: "Subject DIDs",
151
151
+
type: "textarea",
152
152
+
placeholder: "Comma-separated list of DIDs",
153
153
+
searchParam: "subjectDids",
154
154
+
},
155
155
+
{
156
156
+
name: "subjects",
157
157
+
label: "Subjects",
158
158
+
type: "textarea",
159
159
+
placeholder: "Comma-separated list of AT URIs",
160
160
+
searchParam: "subjects",
161
161
+
},
162
162
+
{
163
163
+
name: "instant",
164
164
+
label: "Instant mode (bypass 21s delay buffer)",
165
165
+
type: "checkbox",
166
166
+
searchParam: "instant",
167
167
+
},
168
168
+
],
169
169
+
buildUrl: (instance, formData) => {
170
170
+
let url = instance + "?";
171
171
+
172
172
+
const sources = formData.get("sources")?.toString().split(",");
173
173
+
sources?.forEach((s) => {
174
174
+
if (s.trim().length) url += `wantedSources=${s.trim()}&`;
175
175
+
});
176
176
+
177
177
+
const subjectDids = formData.get("subjectDids")?.toString().split(",");
178
178
+
subjectDids?.forEach((d) => {
179
179
+
if (d.trim().length) url += `wantedSubjectDids=${d.trim()}&`;
180
180
+
});
181
181
+
182
182
+
const subjects = formData.get("subjects")?.toString().split(",");
183
183
+
subjects?.forEach((s) => {
184
184
+
if (s.trim().length) url += `wantedSubjects=${encodeURIComponent(s.trim())}&`;
185
185
+
});
186
186
+
187
187
+
const instant = formData.get("instant")?.toString();
188
188
+
if (instant === "on") url += `instant=true&`;
189
189
+
190
190
+
return url.replace(/[&?]$/, "");
191
191
+
},
192
192
+
parseRecord: (rec) => {
193
193
+
const source = rec.link?.source;
194
194
+
const sourceRecord = rec.link?.source_record;
195
195
+
const uriParts = sourceRecord?.replace("at://", "").split("/") || [];
196
196
+
const did = uriParts[0];
197
197
+
const collection = uriParts[1];
198
198
+
const rkey = uriParts[2];
199
199
+
return {
200
200
+
type: rec.kind,
201
201
+
did,
202
202
+
collection: source || collection,
203
203
+
rkey,
204
204
+
action: rec.link?.operation,
205
205
+
time: undefined,
206
206
+
};
207
207
+
},
208
208
+
},
209
209
+
};
210
210
+
211
211
+
export const STREAM_TYPES = Object.keys(STREAM_CONFIGS) as StreamType[];
212
212
+
213
213
+
export const getStreamType = (pathname: string): StreamType => {
214
214
+
if (pathname === "/firehose") return "firehose";
215
215
+
if (pathname === "/spacedust") return "spacedust";
216
216
+
return "jetstream";
217
217
+
};
+154
-214
src/views/stream/index.tsx
···
7
7
import { JSONValue } from "../../components/json";
8
8
import { TextInput } from "../../components/text-input";
9
9
import { addToClipboard } from "../../utils/copy";
10
10
-
import { localDateFromTimestamp } from "../../utils/date";
10
10
+
import { getStreamType, STREAM_CONFIGS, STREAM_TYPES, StreamType } from "./config";
11
11
import { StreamStats, StreamStatsPanel } from "./stats";
12
12
13
13
const LIMIT = 20;
14
14
-
type Parameter = { name: string; param: string | string[] | undefined };
15
15
-
16
16
-
const StreamRecordItem = (props: { record: any; streamType: "jetstream" | "firehose" }) => {
17
17
-
const [expanded, setExpanded] = createSignal(false);
18
14
19
19
-
const getBasicInfo = () => {
20
20
-
const rec = props.record;
21
21
-
if (props.streamType === "jetstream") {
22
22
-
const collection = rec.commit?.collection || rec.kind;
23
23
-
const rkey = rec.commit?.rkey;
24
24
-
const action = rec.commit?.operation;
25
25
-
const time = rec.time_us ? localDateFromTimestamp(rec.time_us / 1000) : undefined;
26
26
-
return { type: rec.kind, did: rec.did, collection, rkey, action, time };
27
27
-
} else {
28
28
-
const type = rec.$type?.split("#").pop() || rec.$type;
29
29
-
const did = rec.repo ?? rec.did;
30
30
-
const pathParts = rec.op?.path?.split("/") || [];
31
31
-
const collection = pathParts[0];
32
32
-
const rkey = pathParts[1];
33
33
-
const time = rec.time ? localDateFromTimestamp(Date.parse(rec.time)) : undefined;
34
34
-
return { type, did, collection, rkey, action: rec.op?.action, time };
35
35
-
}
36
36
-
};
15
15
+
const TYPE_COLORS: Record<string, string> = {
16
16
+
create: "bg-green-100 text-green-700 dark:bg-green-900/30 dark:text-green-300",
17
17
+
update: "bg-orange-100 text-orange-700 dark:bg-orange-900/30 dark:text-orange-300",
18
18
+
delete: "bg-red-100 text-red-700 dark:bg-red-900/30 dark:text-red-300",
19
19
+
identity: "bg-purple-100 text-purple-700 dark:bg-purple-900/30 dark:text-purple-300",
20
20
+
account: "bg-blue-100 text-blue-700 dark:bg-blue-900/30 dark:text-blue-300",
21
21
+
sync: "bg-pink-100 text-pink-700 dark:bg-pink-900/30 dark:text-pink-300",
22
22
+
};
37
23
38
38
-
const info = getBasicInfo();
24
24
+
const StreamRecordItem = (props: { record: any; streamType: StreamType }) => {
25
25
+
const [expanded, setExpanded] = createSignal(false);
26
26
+
const config = () => STREAM_CONFIGS[props.streamType];
27
27
+
const info = () => config().parseRecord(props.record);
39
28
40
40
-
const typeColors: Record<string, string> = {
41
41
-
create: "bg-green-100 text-green-700 dark:bg-green-900/30 dark:text-green-300",
42
42
-
update: "bg-orange-100 text-orange-700 dark:bg-orange-900/30 dark:text-orange-300",
43
43
-
delete: "bg-red-100 text-red-700 dark:bg-red-900/30 dark:text-red-300",
44
44
-
identity: "bg-purple-100 text-purple-700 dark:bg-purple-900/30 dark:text-purple-300",
45
45
-
account: "bg-blue-100 text-blue-700 dark:bg-blue-900/30 dark:text-blue-300",
46
46
-
sync: "bg-pink-100 text-pink-700 dark:bg-pink-900/30 dark:text-pink-300",
29
29
+
const displayType = () => {
30
30
+
const i = info();
31
31
+
return i.type === "commit" || i.type === "link" ? i.action : i.type;
47
32
};
48
33
49
34
const copyRecord = (e: MouseEvent) => {
···
65
50
: <span class="iconify lucide--chevron-right"></span>}
66
51
</span>
67
52
<div class="flex min-w-0 flex-1 flex-col gap-0.5">
68
68
-
<div class="flex flex-wrap items-center gap-x-1.5 gap-y-0.5 sm:gap-x-2">
53
53
+
<div class="flex items-center gap-x-1.5 sm:gap-x-2">
69
54
<span
70
70
-
class={`rounded px-1.5 py-0.5 text-xs font-medium ${typeColors[info.type === "commit" ? info.action : info.type] || "bg-neutral-200 text-neutral-700 dark:bg-neutral-700 dark:text-neutral-300"}`}
55
55
+
class={`shrink-0 rounded px-1.5 py-0.5 text-xs font-medium ${TYPE_COLORS[displayType()!] || "bg-neutral-200 text-neutral-700 dark:bg-neutral-700 dark:text-neutral-300"}`}
71
56
>
72
72
-
{info.type === "commit" ? info.action : info.type}
57
57
+
{displayType()}
73
58
</span>
74
74
-
<Show when={info.collection && info.collection !== info.type}>
75
75
-
<span class="text-neutral-600 dark:text-neutral-300">{info.collection}</span>
59
59
+
<Show when={info().collection && info().collection !== info().type}>
60
60
+
<span class="min-w-0 truncate text-neutral-600 dark:text-neutral-300">
61
61
+
{info().collection}
62
62
+
</span>
76
63
</Show>
77
77
-
<Show when={info.rkey}>
78
78
-
<span class="text-neutral-400 dark:text-neutral-500">{info.rkey}</span>
64
64
+
<Show when={info().rkey}>
65
65
+
<span class="shrink-0 text-neutral-400 dark:text-neutral-500">{info().rkey}</span>
79
66
</Show>
80
67
</div>
81
68
<div class="flex flex-col gap-x-2 gap-y-0.5 text-xs text-neutral-500 sm:flex-row sm:items-center dark:text-neutral-400">
82
82
-
<Show when={info.did}>
69
69
+
<Show when={info().did}>
83
70
<span class="w-fit" onclick={(e) => e.stopPropagation()}>
84
84
-
<DidHoverCard newTab did={info.did} />
71
71
+
<DidHoverCard newTab did={info().did!} />
85
72
</span>
86
73
</Show>
87
87
-
<Show when={info.time}>
88
88
-
<span>{info.time}</span>
74
74
+
<Show when={info().time}>
75
75
+
<span>{info().time}</span>
89
76
</Show>
90
77
</div>
91
78
</div>
···
103
90
<Show when={expanded()}>
104
91
<div class="ml-6.5">
105
92
<div class="w-full text-xs wrap-anywhere whitespace-pre-wrap md:w-2xl">
106
106
-
<JSONValue newTab data={props.record} repo={info.did} hideBlobs />
93
93
+
<JSONValue newTab data={props.record} repo={info().did ?? ""} hideBlobs />
107
94
</div>
108
95
</div>
109
96
</Show>
···
111
98
);
112
99
};
113
100
114
114
-
const StreamView = () => {
101
101
+
export const StreamView = () => {
115
102
const [searchParams, setSearchParams] = useSearchParams();
116
116
-
const [parameters, setParameters] = createSignal<Parameter[]>([]);
117
117
-
const streamType = useLocation().pathname === "/firehose" ? "firehose" : "jetstream";
103
103
+
const streamType = getStreamType(useLocation().pathname);
104
104
+
const config = () => STREAM_CONFIGS[streamType];
105
105
+
118
106
const [records, setRecords] = createSignal<any[]>([]);
119
107
const [connected, setConnected] = createSignal(false);
120
108
const [paused, setPaused] = createSignal(false);
121
109
const [notice, setNotice] = createSignal("");
110
110
+
const [parameters, setParameters] = createSignal<{ name: string; value?: string }[]>([]);
122
111
const [stats, setStats] = createSignal<StreamStats>({
123
112
totalEvents: 0,
124
113
eventsPerSecond: 0,
···
126
115
collections: {},
127
116
});
128
117
const [currentTime, setCurrentTime] = createSignal(Date.now());
118
118
+
129
119
let socket: WebSocket;
130
120
let firehose: Firehose;
131
121
let formRef!: HTMLFormElement;
···
133
123
let rafId: number | null = null;
134
124
let statsIntervalId: number | null = null;
135
125
let statsUpdateIntervalId: number | null = null;
136
136
-
let lastSecondEventCount = 0;
137
126
let currentSecondEventCount = 0;
138
138
-
// Track stats in variables for batching
139
127
let totalEventsCount = 0;
140
128
let eventTypesMap: Record<string, number> = {};
141
129
let collectionsMap: Record<string, number> = {};
142
130
143
131
const addRecord = (record: any) => {
144
132
currentSecondEventCount++;
133
133
+
totalEventsCount++;
145
134
146
146
-
// Track statistics in variables (batched update)
147
147
-
totalEventsCount++;
148
148
-
const eventType = record.kind || record.$type || "unknown";
135
135
+
const rawEventType = record.kind || record.$type || "unknown";
136
136
+
const eventType = rawEventType.includes("#") ? rawEventType.split("#").pop() : rawEventType;
149
137
eventTypesMap[eventType] = (eventTypesMap[eventType] || 0) + 1;
138
138
+
150
139
if (eventType !== "account" && eventType !== "identity") {
151
151
-
const collection = record.commit?.collection || record.op?.path?.split("/")[0] || "unknown";
140
140
+
const collection =
141
141
+
record.commit?.collection ||
142
142
+
record.op?.path?.split("/")[0] ||
143
143
+
record.link?.source ||
144
144
+
"unknown";
152
145
collectionsMap[collection] = (collectionsMap[collection] || 0) + 1;
153
146
}
154
147
···
165
158
};
166
159
167
160
const disconnect = () => {
168
168
-
if (streamType === "jetstream") socket?.close();
161
161
+
if (!config().useFirehoseLib) socket?.close();
169
162
else firehose?.close();
163
163
+
170
164
if (rafId !== null) {
171
165
cancelAnimationFrame(rafId);
172
166
rafId = null;
···
179
173
clearInterval(statsUpdateIntervalId);
180
174
statsUpdateIntervalId = null;
181
175
}
176
176
+
182
177
pendingRecords = [];
183
178
totalEventsCount = 0;
184
179
eventTypesMap = {};
185
180
collectionsMap = {};
186
181
setConnected(false);
187
182
setPaused(false);
188
188
-
setStats((prev) => ({
189
189
-
...prev,
190
190
-
eventsPerSecond: 0,
191
191
-
}));
183
183
+
setStats((prev) => ({ ...prev, eventsPerSecond: 0 }));
192
184
};
193
185
194
194
-
const togglePause = () => {
195
195
-
setPaused(!paused());
196
196
-
};
197
197
-
198
198
-
const connectSocket = async (formData: FormData) => {
186
186
+
const connectStream = async (formData: FormData) => {
199
187
setNotice("");
200
188
if (connected()) {
201
189
disconnect();
···
203
191
}
204
192
setRecords([]);
205
193
206
206
-
let url = "";
207
207
-
if (streamType === "jetstream") {
208
208
-
url =
209
209
-
formData.get("instance")?.toString() ?? "wss://jetstream1.us-east.bsky.network/subscribe";
210
210
-
url = url.concat("?");
211
211
-
} else {
212
212
-
url = formData.get("instance")?.toString() ?? "wss://bsky.network";
213
213
-
url = url.replace("/xrpc/com.atproto.sync.subscribeRepos", "");
214
214
-
if (!(url.startsWith("wss://") || url.startsWith("ws://"))) url = "wss://" + url;
215
215
-
}
216
216
-
217
217
-
const collections = formData.get("collections")?.toString().split(",");
218
218
-
collections?.forEach((collection) => {
219
219
-
if (collection.length) url = url.concat(`wantedCollections=${collection}&`);
220
220
-
});
221
221
-
222
222
-
const dids = formData.get("dids")?.toString().split(",");
223
223
-
dids?.forEach((did) => {
224
224
-
if (did.length) url = url.concat(`wantedDids=${did}&`);
225
225
-
});
226
226
-
227
227
-
const cursor = formData.get("cursor")?.toString();
228
228
-
if (streamType === "jetstream") {
229
229
-
if (cursor?.length) url = url.concat(`cursor=${cursor}`);
230
230
-
if (url.endsWith("&")) url = url.slice(0, -1);
231
231
-
}
194
194
+
const instance = formData.get("instance")?.toString() ?? config().defaultInstance;
195
195
+
const url = config().buildUrl(instance, formData);
232
196
233
233
-
setSearchParams({
234
234
-
instance: formData.get("instance")?.toString(),
235
235
-
collections: formData.get("collections")?.toString(),
236
236
-
dids: formData.get("dids")?.toString(),
237
237
-
cursor: formData.get("cursor")?.toString(),
238
238
-
allEvents: formData.get("allEvents")?.toString(),
197
197
+
// Save all form fields to URL params
198
198
+
const params: Record<string, string | undefined> = { instance };
199
199
+
config().fields.forEach((field) => {
200
200
+
params[field.searchParam] = formData.get(field.name)?.toString();
239
201
});
202
202
+
setSearchParams(params);
240
203
204
204
+
// Build parameters display
241
205
setParameters([
242
242
-
{ name: "Instance", param: formData.get("instance")?.toString() },
243
243
-
{ name: "Collections", param: formData.get("collections")?.toString() },
244
244
-
{ name: "DIDs", param: formData.get("dids")?.toString() },
245
245
-
{ name: "Cursor", param: formData.get("cursor")?.toString() },
246
246
-
{ name: "All Events", param: formData.get("allEvents")?.toString() },
206
206
+
{ name: "Instance", value: instance },
207
207
+
...config()
208
208
+
.fields.filter((f) => f.type !== "checkbox")
209
209
+
.map((f) => ({ name: f.label, value: formData.get(f.name)?.toString() })),
210
210
+
...config()
211
211
+
.fields.filter((f) => f.type === "checkbox" && formData.get(f.name) === "on")
212
212
+
.map((f) => ({ name: f.label, value: "on" })),
247
213
]);
248
214
249
215
setConnected(true);
250
216
const now = Date.now();
251
217
setCurrentTime(now);
252
218
253
253
-
// Reset tracking variables
254
219
totalEventsCount = 0;
255
220
eventTypesMap = {};
256
221
collectionsMap = {};
···
272
237
}));
273
238
}, 50);
274
239
275
275
-
// Calculate events/sec every second
276
240
statsIntervalId = window.setInterval(() => {
277
277
-
setStats((prev) => ({
278
278
-
...prev,
279
279
-
eventsPerSecond: currentSecondEventCount,
280
280
-
}));
281
281
-
lastSecondEventCount = currentSecondEventCount;
241
241
+
setStats((prev) => ({ ...prev, eventsPerSecond: currentSecondEventCount }));
282
242
currentSecondEventCount = 0;
283
243
setCurrentTime(Date.now());
284
244
}, 1000);
285
285
-
if (streamType === "jetstream") {
245
245
+
246
246
+
if (!config().useFirehoseLib) {
286
247
socket = new WebSocket(url);
287
248
socket.addEventListener("message", (event) => {
288
249
const rec = JSON.parse(event.data);
289
289
-
if (searchParams.allEvents === "on" || (rec.kind !== "account" && rec.kind !== "identity"))
250
250
+
const isFilteredEvent = rec.kind === "account" || rec.kind === "identity";
251
251
+
if (!isFilteredEvent || streamType !== "jetstream" || searchParams.allEvents === "on")
290
252
addRecord(rec);
291
253
});
292
254
socket.addEventListener("error", () => {
···
294
256
disconnect();
295
257
});
296
258
} else {
259
259
+
const cursor = formData.get("cursor")?.toString();
297
260
firehose = new Firehose({
298
261
relay: url,
299
262
cursor: cursor,
···
307
270
});
308
271
firehose.on("commit", (commit) => {
309
272
for (const op of commit.ops) {
310
310
-
const record = {
273
273
+
addRecord({
311
274
$type: commit.$type,
312
275
repo: commit.repo,
313
276
seq: commit.seq,
···
315
278
rev: commit.rev,
316
279
since: commit.since,
317
280
op: op,
318
318
-
};
319
319
-
addRecord(record);
281
281
+
});
320
282
}
321
283
});
322
322
-
firehose.on("identity", (identity) => {
323
323
-
addRecord(identity);
324
324
-
});
325
325
-
firehose.on("account", (account) => {
326
326
-
addRecord(account);
327
327
-
});
284
284
+
firehose.on("identity", (identity) => addRecord(identity));
285
285
+
firehose.on("account", (account) => addRecord(account));
328
286
firehose.on("sync", (sync) => {
329
329
-
const event = {
287
287
+
addRecord({
330
288
$type: sync.$type,
331
289
did: sync.did,
332
290
rev: sync.rev,
333
291
seq: sync.seq,
334
292
time: sync.time,
335
335
-
};
336
336
-
addRecord(event);
293
293
+
});
337
294
});
338
295
firehose.start();
339
296
}
340
297
};
341
298
342
342
-
onMount(async () => {
343
343
-
const formData = new FormData();
344
344
-
if (searchParams.instance) formData.append("instance", searchParams.instance.toString());
345
345
-
if (searchParams.collections)
346
346
-
formData.append("collections", searchParams.collections.toString());
347
347
-
if (searchParams.dids) formData.append("dids", searchParams.dids.toString());
348
348
-
if (searchParams.cursor) formData.append("cursor", searchParams.cursor.toString());
349
349
-
if (searchParams.allEvents) formData.append("allEvents", searchParams.allEvents.toString());
350
350
-
if (searchParams.instance) connectSocket(formData);
299
299
+
onMount(() => {
300
300
+
if (searchParams.instance) {
301
301
+
const formData = new FormData();
302
302
+
formData.append("instance", searchParams.instance.toString());
303
303
+
config().fields.forEach((field) => {
304
304
+
const value = searchParams[field.searchParam];
305
305
+
if (value) formData.append(field.name, value.toString());
306
306
+
});
307
307
+
connectStream(formData);
308
308
+
}
351
309
});
352
310
353
311
onCleanup(() => {
354
312
socket?.close();
355
355
-
if (rafId !== null) {
356
356
-
cancelAnimationFrame(rafId);
357
357
-
}
358
358
-
if (statsIntervalId !== null) {
359
359
-
clearInterval(statsIntervalId);
360
360
-
}
361
361
-
if (statsUpdateIntervalId !== null) {
362
362
-
clearInterval(statsUpdateIntervalId);
363
363
-
}
313
313
+
firehose?.close();
314
314
+
if (rafId !== null) cancelAnimationFrame(rafId);
315
315
+
if (statsIntervalId !== null) clearInterval(statsIntervalId);
316
316
+
if (statsUpdateIntervalId !== null) clearInterval(statsUpdateIntervalId);
364
317
});
365
318
366
319
return (
367
320
<>
368
368
-
<Title>{streamType === "firehose" ? "Firehose" : "Jetstream"} - PDSls</Title>
321
321
+
<Title>{config().label} - PDSls</Title>
369
322
<div class="flex w-full flex-col items-center gap-2">
323
323
+
{/* Tab Navigation */}
370
324
<div class="flex gap-4 font-medium">
371
371
-
<A
372
372
-
class="flex items-center gap-1 border-b-2"
373
373
-
inactiveClass="border-transparent text-neutral-600 dark:text-neutral-400 hover:border-neutral-400 dark:hover:border-neutral-600"
374
374
-
href="/jetstream"
375
375
-
>
376
376
-
Jetstream
377
377
-
</A>
378
378
-
<A
379
379
-
class="flex items-center gap-1 border-b-2"
380
380
-
inactiveClass="border-transparent text-neutral-600 dark:text-neutral-400 hover:border-neutral-400 dark:hover:border-neutral-600"
381
381
-
href="/firehose"
382
382
-
>
383
383
-
Firehose
384
384
-
</A>
325
325
+
<For each={STREAM_TYPES}>
326
326
+
{(type) => (
327
327
+
<A
328
328
+
class="flex items-center gap-1 border-b-2"
329
329
+
inactiveClass="border-transparent text-neutral-600 dark:text-neutral-400 hover:border-neutral-400 dark:hover:border-neutral-600"
330
330
+
href={`/${type}`}
331
331
+
>
332
332
+
{STREAM_CONFIGS[type].label}
333
333
+
</A>
334
334
+
)}
335
335
+
</For>
385
336
</div>
337
337
+
338
338
+
{/* Connection Form */}
386
339
<Show when={!connected()}>
387
387
-
<form ref={formRef} class="flex w-full flex-col gap-1.5 p-2 text-sm">
340
340
+
<form ref={formRef} class="flex w-full flex-col gap-2 p-2 text-sm">
388
341
<label class="flex items-center justify-end gap-x-1">
389
389
-
<span class="min-w-20">Instance</span>
342
342
+
<span class="min-w-21 select-none">Instance</span>
390
343
<TextInput
391
344
name="instance"
392
392
-
value={
393
393
-
searchParams.instance ??
394
394
-
(streamType === "jetstream" ?
395
395
-
"wss://jetstream1.us-east.bsky.network/subscribe"
396
396
-
: "wss://bsky.network")
397
397
-
}
398
398
-
class="grow"
399
399
-
/>
400
400
-
</label>
401
401
-
<Show when={streamType === "jetstream"}>
402
402
-
<label class="flex items-center justify-end gap-x-1">
403
403
-
<span class="min-w-20">Collections</span>
404
404
-
<textarea
405
405
-
name="collections"
406
406
-
spellcheck={false}
407
407
-
placeholder="Comma-separated list of collections"
408
408
-
value={searchParams.collections ?? ""}
409
409
-
class="dark:bg-dark-100 grow rounded-lg bg-white px-2 py-1 outline-1 outline-neutral-200 focus:outline-[1.5px] focus:outline-neutral-600 dark:outline-neutral-600 dark:focus:outline-neutral-400"
410
410
-
/>
411
411
-
</label>
412
412
-
</Show>
413
413
-
<Show when={streamType === "jetstream"}>
414
414
-
<label class="flex items-center justify-end gap-x-1">
415
415
-
<span class="min-w-20">DIDs</span>
416
416
-
<textarea
417
417
-
name="dids"
418
418
-
spellcheck={false}
419
419
-
placeholder="Comma-separated list of DIDs"
420
420
-
value={searchParams.dids ?? ""}
421
421
-
class="dark:bg-dark-100 grow rounded-lg bg-white px-2 py-1 outline-1 outline-neutral-200 focus:outline-[1.5px] focus:outline-neutral-600 dark:outline-neutral-600 dark:focus:outline-neutral-400"
422
422
-
/>
423
423
-
</label>
424
424
-
</Show>
425
425
-
<label class="flex items-center justify-end gap-x-1">
426
426
-
<span class="min-w-20">Cursor</span>
427
427
-
<TextInput
428
428
-
name="cursor"
429
429
-
placeholder="Leave empty for live-tail"
430
430
-
value={searchParams.cursor ?? ""}
345
345
+
value={searchParams.instance ?? config().defaultInstance}
431
346
class="grow"
432
347
/>
433
348
</label>
434
434
-
<Show when={streamType === "jetstream"}>
435
435
-
<div class="flex items-center justify-end gap-x-1">
436
436
-
<input
437
437
-
type="checkbox"
438
438
-
name="allEvents"
439
439
-
id="allEvents"
440
440
-
checked={searchParams.allEvents === "on" ? true : false}
441
441
-
/>
442
442
-
<label for="allEvents" class="select-none">
443
443
-
Show account and identity events
349
349
+
350
350
+
<For each={config().fields}>
351
351
+
{(field) => (
352
352
+
<label class="flex items-center justify-end gap-x-1">
353
353
+
<Show when={field.type === "checkbox"}>
354
354
+
<input
355
355
+
type="checkbox"
356
356
+
name={field.name}
357
357
+
id={field.name}
358
358
+
checked={searchParams[field.searchParam] === "on"}
359
359
+
/>
360
360
+
</Show>
361
361
+
<span class="min-w-21 select-none">{field.label}</span>
362
362
+
<Show when={field.type === "textarea"}>
363
363
+
<textarea
364
364
+
name={field.name}
365
365
+
spellcheck={false}
366
366
+
placeholder={field.placeholder}
367
367
+
value={(searchParams[field.searchParam] as string) ?? ""}
368
368
+
class="dark:bg-dark-100 grow rounded-lg bg-white px-2 py-1 outline-1 outline-neutral-200 focus:outline-[1.5px] focus:outline-neutral-600 dark:outline-neutral-600 dark:focus:outline-neutral-400"
369
369
+
/>
370
370
+
</Show>
371
371
+
<Show when={field.type === "text"}>
372
372
+
<TextInput
373
373
+
name={field.name}
374
374
+
placeholder={field.placeholder}
375
375
+
value={(searchParams[field.searchParam] as string) ?? ""}
376
376
+
class="grow"
377
377
+
/>
378
378
+
</Show>
444
379
</label>
445
445
-
</div>
446
446
-
</Show>
380
380
+
)}
381
381
+
</For>
382
382
+
447
383
<div class="flex justify-end gap-2">
448
448
-
<Button onClick={() => connectSocket(new FormData(formRef))}>Connect</Button>
384
384
+
<Button onClick={() => connectStream(new FormData(formRef))}>Connect</Button>
449
385
</div>
450
386
</form>
451
387
</Show>
388
388
+
389
389
+
{/* Connected State */}
452
390
<Show when={connected()}>
453
391
<div class="flex w-full flex-col gap-2 p-2">
454
392
<div class="flex flex-col gap-1 text-sm wrap-anywhere">
455
393
<div class="font-semibold">Parameters</div>
456
394
<For each={parameters()}>
457
395
{(param) => (
458
458
-
<Show when={param.param}>
396
396
+
<Show when={param.value}>
459
397
<div class="text-sm">
460
398
<div class="text-xs text-neutral-500 dark:text-neutral-400">{param.name}</div>
461
461
-
<div class="text-neutral-700 dark:text-neutral-300">{param.param}</div>
399
399
+
<div class="text-neutral-700 dark:text-neutral-300">{param.value}</div>
462
400
</div>
463
401
</Show>
464
402
)}
465
403
</For>
466
404
</div>
467
467
-
<StreamStatsPanel stats={stats()} currentTime={currentTime()} />
405
405
+
<StreamStatsPanel stats={stats()} currentTime={currentTime()} streamType={streamType} />
468
406
<div class="flex justify-end gap-2">
469
407
<button
470
408
type="button"
471
409
ontouchstart={(e) => {
472
410
e.preventDefault();
473
473
-
requestAnimationFrame(() => togglePause());
411
411
+
requestAnimationFrame(() => setPaused(!paused()));
474
412
}}
475
475
-
onclick={togglePause}
413
413
+
onclick={() => setPaused(!paused())}
476
414
class="dark:hover:bg-dark-200 dark:shadow-dark-700 dark:active:bg-dark-100 box-border flex h-7 items-center gap-1 rounded-lg border-[0.5px] border-neutral-300 bg-neutral-50 px-2 py-1.5 text-xs shadow-xs select-none hover:bg-neutral-100 active:bg-neutral-200 dark:border-neutral-700 dark:bg-neutral-800"
477
415
>
478
416
{paused() ? "Resume" : "Pause"}
···
491
429
</div>
492
430
</div>
493
431
</Show>
432
432
+
433
433
+
{/* Error Notice */}
494
434
<Show when={notice().length}>
495
435
<div class="text-red-500 dark:text-red-400">{notice()}</div>
496
436
</Show>
437
437
+
438
438
+
{/* Records List */}
497
439
<Show when={connected() || records().length > 0}>
498
440
<div class="flex min-h-280 w-full flex-col gap-2 font-mono text-xs [overflow-anchor:auto] sm:text-sm">
499
441
<For each={records().toReversed()}>
···
510
452
</>
511
453
);
512
454
};
513
513
-
514
514
-
export { StreamView };
+14
-4
src/views/stream/stats.tsx
···
1
1
import { For, Show } from "solid-js";
2
2
+
import { STREAM_CONFIGS, StreamType } from "./config";
2
3
3
4
export type StreamStats = {
4
5
connectedAt?: number;
···
22
23
}
23
24
};
24
25
25
25
-
export const StreamStatsPanel = (props: { stats: StreamStats; currentTime: number }) => {
26
26
+
export const StreamStatsPanel = (props: {
27
27
+
stats: StreamStats;
28
28
+
currentTime: number;
29
29
+
streamType: StreamType;
30
30
+
}) => {
31
31
+
const config = () => STREAM_CONFIGS[props.streamType];
26
32
const uptime = () => (props.stats.connectedAt ? props.currentTime - props.stats.connectedAt : 0);
27
33
28
34
const topCollections = () =>
···
60
66
</div>
61
67
</div>
62
68
63
63
-
<Show when={topEventTypes().length > 0}>
69
69
+
<Show when={topEventTypes().length > 0 && config().showEventTypes}>
64
70
<div class="mt-2">
65
71
<div class="mb-1 text-xs text-neutral-500 dark:text-neutral-400">Event Types</div>
66
72
<div class="grid grid-cols-[1fr_5rem_3rem] gap-x-1 gap-y-0.5 font-mono text-xs sm:gap-x-4">
···
86
92
87
93
<Show when={topCollections().length > 0}>
88
94
<div class="mt-2">
89
89
-
<div class="mb-1 text-xs text-neutral-500 dark:text-neutral-400">Top Collections</div>
95
95
+
<div class="mb-1 text-xs text-neutral-500 dark:text-neutral-400">
96
96
+
{config().collectionsLabel}
97
97
+
</div>
90
98
<div class="grid grid-cols-[1fr_5rem_3rem] gap-x-1 gap-y-0.5 font-mono text-xs sm:gap-x-4">
91
99
<For each={topCollections()}>
92
100
{([collection, count]) => {
93
101
const percentage = ((count / props.stats.totalEvents) * 100).toFixed(1);
94
102
return (
95
103
<>
96
96
-
<span class="text-neutral-700 dark:text-neutral-300">{collection}</span>
104
104
+
<span class="min-w-0 truncate text-neutral-700 dark:text-neutral-300">
105
105
+
{collection}
106
106
+
</span>
97
107
<span class="text-right text-neutral-600 tabular-nums dark:text-neutral-400">
98
108
{count.toLocaleString()}
99
109
</span>