tangled
alpha
login
or
join now
rocksky.app
/
rocksky
96
fork
atom
A decentralized music tracking and discovery platform built on AT Protocol 🎵
rocksky.app
spotify
atproto
lastfm
musicbrainz
scrobbling
listenbrainz
96
fork
atom
overview
issues
7
pulls
pipelines
tap: handle webhook
tsiry-sandratraina.com
1 month ago
bfdb8dab
5c024c9a
+316
-160
8 changed files
expand all
collapse all
unified
split
tap
.env.example
drizzle
0002_reflective_angel.sql
meta
0002_snapshot.json
_journal.json
src
batch.ts
main.ts
schema
event.ts
tap.ts
+1
tap/.env.example
···
1
1
+
TAP_ADMIN_PASSWORD=
+1
tap/drizzle/0002_reflective_angel.sql
···
1
1
+
CREATE INDEX `created_at` ON `events` (`created_at`);
+172
tap/drizzle/meta/0002_snapshot.json
···
1
1
+
{
2
2
+
"version": "6",
3
3
+
"dialect": "sqlite",
4
4
+
"id": "74a3ca11-e1bb-41e1-9fed-b011cc552abf",
5
5
+
"prevId": "e965a6b3-42e8-41a5-8919-11fb478cfa1a",
6
6
+
"tables": {
7
7
+
"events": {
8
8
+
"name": "events",
9
9
+
"columns": {
10
10
+
"id": {
11
11
+
"name": "id",
12
12
+
"type": "integer",
13
13
+
"primaryKey": true,
14
14
+
"notNull": true,
15
15
+
"autoincrement": false
16
16
+
},
17
17
+
"type": {
18
18
+
"name": "type",
19
19
+
"type": "text",
20
20
+
"primaryKey": false,
21
21
+
"notNull": true,
22
22
+
"autoincrement": false
23
23
+
},
24
24
+
"action": {
25
25
+
"name": "action",
26
26
+
"type": "text",
27
27
+
"primaryKey": false,
28
28
+
"notNull": false,
29
29
+
"autoincrement": false
30
30
+
},
31
31
+
"did": {
32
32
+
"name": "did",
33
33
+
"type": "text",
34
34
+
"primaryKey": false,
35
35
+
"notNull": true,
36
36
+
"autoincrement": false
37
37
+
},
38
38
+
"status": {
39
39
+
"name": "status",
40
40
+
"type": "text",
41
41
+
"primaryKey": false,
42
42
+
"notNull": false,
43
43
+
"autoincrement": false
44
44
+
},
45
45
+
"handle": {
46
46
+
"name": "handle",
47
47
+
"type": "text",
48
48
+
"primaryKey": false,
49
49
+
"notNull": false,
50
50
+
"autoincrement": false
51
51
+
},
52
52
+
"is_active": {
53
53
+
"name": "is_active",
54
54
+
"type": "integer",
55
55
+
"primaryKey": false,
56
56
+
"notNull": false,
57
57
+
"autoincrement": false
58
58
+
},
59
59
+
"collection": {
60
60
+
"name": "collection",
61
61
+
"type": "text",
62
62
+
"primaryKey": false,
63
63
+
"notNull": false,
64
64
+
"autoincrement": false
65
65
+
},
66
66
+
"rev": {
67
67
+
"name": "rev",
68
68
+
"type": "text",
69
69
+
"primaryKey": false,
70
70
+
"notNull": false,
71
71
+
"autoincrement": false
72
72
+
},
73
73
+
"rkey": {
74
74
+
"name": "rkey",
75
75
+
"type": "text",
76
76
+
"primaryKey": false,
77
77
+
"notNull": false,
78
78
+
"autoincrement": false
79
79
+
},
80
80
+
"record": {
81
81
+
"name": "record",
82
82
+
"type": "text",
83
83
+
"primaryKey": false,
84
84
+
"notNull": false,
85
85
+
"autoincrement": false
86
86
+
},
87
87
+
"live": {
88
88
+
"name": "live",
89
89
+
"type": "integer",
90
90
+
"primaryKey": false,
91
91
+
"notNull": false,
92
92
+
"autoincrement": false
93
93
+
},
94
94
+
"cid": {
95
95
+
"name": "cid",
96
96
+
"type": "text",
97
97
+
"primaryKey": false,
98
98
+
"notNull": false,
99
99
+
"autoincrement": false
100
100
+
},
101
101
+
"created_at": {
102
102
+
"name": "created_at",
103
103
+
"type": "integer",
104
104
+
"primaryKey": false,
105
105
+
"notNull": true,
106
106
+
"autoincrement": false,
107
107
+
"default": "(unixepoch())"
108
108
+
}
109
109
+
},
110
110
+
"indexes": {
111
111
+
"events_cid_unique": {
112
112
+
"name": "events_cid_unique",
113
113
+
"columns": [
114
114
+
"cid"
115
115
+
],
116
116
+
"isUnique": true
117
117
+
},
118
118
+
"did_idx": {
119
119
+
"name": "did_idx",
120
120
+
"columns": [
121
121
+
"did"
122
122
+
],
123
123
+
"isUnique": false
124
124
+
},
125
125
+
"type_idx": {
126
126
+
"name": "type_idx",
127
127
+
"columns": [
128
128
+
"type"
129
129
+
],
130
130
+
"isUnique": false
131
131
+
},
132
132
+
"collection_idx": {
133
133
+
"name": "collection_idx",
134
134
+
"columns": [
135
135
+
"collection"
136
136
+
],
137
137
+
"isUnique": false
138
138
+
},
139
139
+
"did_collection_rkey_idx": {
140
140
+
"name": "did_collection_rkey_idx",
141
141
+
"columns": [
142
142
+
"did",
143
143
+
"collection",
144
144
+
"rkey"
145
145
+
],
146
146
+
"isUnique": false
147
147
+
},
148
148
+
"created_at": {
149
149
+
"name": "created_at",
150
150
+
"columns": [
151
151
+
"created_at"
152
152
+
],
153
153
+
"isUnique": false
154
154
+
}
155
155
+
},
156
156
+
"foreignKeys": {},
157
157
+
"compositePrimaryKeys": {},
158
158
+
"uniqueConstraints": {},
159
159
+
"checkConstraints": {}
160
160
+
}
161
161
+
},
162
162
+
"views": {},
163
163
+
"enums": {},
164
164
+
"_meta": {
165
165
+
"schemas": {},
166
166
+
"tables": {},
167
167
+
"columns": {}
168
168
+
},
169
169
+
"internal": {
170
170
+
"indexes": {}
171
171
+
}
172
172
+
}
+7
tap/drizzle/meta/_journal.json
···
15
15
"when": 1768622485450,
16
16
"tag": "0001_funny_wrecker",
17
17
"breakpoints": true
18
18
+
},
19
19
+
{
20
20
+
"idx": 2,
21
21
+
"version": "6",
22
22
+
"when": 1768632671860,
23
23
+
"tag": "0002_reflective_angel",
24
24
+
"breakpoints": true
18
25
}
19
26
]
20
27
}
+70
tap/src/batch.ts
···
1
1
+
import { ctx } from "./context.ts";
2
2
+
import schema from "./schema/mod.ts";
3
3
+
import _ from "@es-toolkit/es-toolkit/compat";
4
4
+
import { broadcastEvent } from "./main.ts";
5
5
+
import type { InsertEvent } from "./schema/event.ts";
6
6
+
import logger from "./logger.ts";
7
7
+
8
8
+
const BATCH_SIZE = 100;
9
9
+
const BATCH_TIMEOUT_MS = 100;
10
10
+
11
11
+
let eventBatch: InsertEvent[] = [];
12
12
+
let batchTimer: number | null = null;
13
13
+
let flushPromise: Promise<void> | null = null;
14
14
+
15
15
+
/**
 * Writes every queued event to the database in one INSERT and broadcasts the
 * rows that were actually inserted.
 *
 * Only one flush runs at a time. A caller that arrives while a flush is in
 * flight waits for it and then flushes whatever was queued in the meantime,
 * so events added during an in-flight flush are never left behind without a
 * timer to pick them up.
 *
 * Rows skipped by `onConflictDoNothing()` are not returned by the insert and
 * are therefore not broadcast.
 *
 * If the insert fails, the events are put back at the front of the queue; the
 * returned promise still resolves (the error is logged, not rethrown).
 */
export async function flushBatch() {
  // Wait for any in-flight flush, then fall through instead of returning:
  // events pushed while that flush was running are still sitting in the queue.
  // The loop re-checks because another waiter may have started a new flush
  // before this one resumed.
  while (flushPromise) {
    await flushPromise;
  }

  if (eventBatch.length === 0) return;

  flushPromise = (async () => {
    // Take ownership of the current queue synchronously so events added while
    // the insert is pending go into a fresh array.
    const toInsert = [...eventBatch];
    eventBatch = [];

    try {
      logger.info`🔄 Flushing batch of ${toInsert.length} events...`;

      const results = await ctx.db
        .insert(schema.events)
        .values(toInsert)
        .onConflictDoNothing()
        .returning()
        .execute();

      for (const result of results) {
        // The rows are already persisted at this point. A broadcast failure
        // must not be reported as an insert failure, must not re-queue the
        // batch, and must not prevent the remaining rows from being broadcast.
        try {
          broadcastEvent(result);
        } catch (error) {
          logger.error`Failed to broadcast event ${result.id}: ${error}`;
        }
      }

      logger.info`📝 Batch inserted ${results.length} events`;
    } catch (error) {
      logger.error`Failed to insert batch: ${error}`;
      // Re-add failed events to the front of the batch for retry
      eventBatch = [...toInsert, ...eventBatch];
    } finally {
      // Cleared before the promise settles so waiters see "no flush running".
      flushPromise = null;
    }
  })();

  await flushPromise;
}
53
53
+
54
54
+
/**
 * Queues one event for insertion.
 *
 * Any pending flush timer is cancelled on every call. If the queue has reached
 * `BATCH_SIZE` a flush is started right away; otherwise a new timer is armed
 * to flush after `BATCH_TIMEOUT_MS`. Flush errors are logged, never thrown.
 */
export function addToBatch(event: InsertEvent) {
  // Fire-and-forget flush with the error routed to the logger.
  const triggerFlush = () => {
    flushBatch().catch((err) => logger.error`Flush error: ${err}`);
  };

  eventBatch.push(event);

  // Each new event cancels the previously scheduled flush.
  const pendingTimer = batchTimer;
  if (pendingTimer !== null) {
    clearTimeout(pendingTimer);
    batchTimer = null;
  }

  // Queue is full: flush now, no timer needed.
  if (eventBatch.length >= BATCH_SIZE) {
    triggerFlush();
    return;
  }

  // Queue is not full yet: flush once the timeout elapses with no new events.
  batchTimer = setTimeout(() => {
    batchTimer = null;
    triggerFlush();
  }, BATCH_TIMEOUT_MS);
}
+64
-29
tap/src/main.ts
···
1
1
import { ctx } from "./context.ts";
2
2
import logger from "./logger.ts";
3
3
-
import connectToTap from "./tap.ts";
4
3
import schema from "./schema/mod.ts";
5
4
import { asc, inArray } from "drizzle-orm";
6
5
import { omit } from "@es-toolkit/es-toolkit/compat";
7
6
import type { SelectEvent } from "./schema/event.ts";
7
7
+
import { assureAdminAuth, parseTapEvent } from "@atproto/tap";
8
8
+
import { addToBatch, flushBatch } from "./batch.ts";
8
9
9
9
-
const PAGE_SIZE = 100; // Larger batches for faster streaming
10
10
-
const YIELD_EVERY_N_PAGES = 5; // Yield every 5 pages (2500 events)
11
11
-
const MAX_BUFFER_SIZE = 256 * 1024; // 256KB buffer limit
12
12
-
const BACKPRESSURE_CHECK_INTERVAL = 100; // Check every 100 events
13
13
-
const VERBOSE_LOGGING = false; // Set to true for detailed message tracking
10
10
+
const PAGE_SIZE = 100;
11
11
+
const YIELD_EVERY_N_PAGES = 5;
12
12
+
const YIELD_DELAY_MS = 100;
13
13
+
const ADMIN_PASSWORD = Deno.env.get("TAP_ADMIN_PASSWORD")!;
14
14
15
15
interface ClientState {
16
16
socket: WebSocket;
···
29
29
try {
30
30
if (socket.readyState === WebSocket.OPEN) {
31
31
socket.send(message);
32
32
-
if (
33
33
-
VERBOSE_LOGGING &&
34
34
-
eventCount !== undefined &&
35
35
-
eventCount % 50 === 0
36
36
-
) {
32
32
+
if (eventCount !== undefined && eventCount % 50 === 0) {
37
33
logger.info`📤 Sent ${eventCount} events, readyState: ${socket.readyState}`;
38
34
}
39
35
return true;
···
47
43
return false;
48
44
}
49
45
50
50
-
async function waitForBackpressure(socket: WebSocket): Promise<void> {
51
51
-
const bufferedAmount = (socket as unknown as { bufferedAmount?: number })
52
52
-
.bufferedAmount;
53
53
-
if (bufferedAmount && bufferedAmount > MAX_BUFFER_SIZE) {
54
54
-
logger.info`⏸️ Backpressure detected (${bufferedAmount} bytes buffered), waiting...`;
55
55
-
// Wait for buffer to drain
56
56
-
await new Promise((resolve) => setTimeout(resolve, 100));
57
57
-
}
58
58
-
}
59
59
-
60
46
export function broadcastEvent(evt: SelectEvent) {
61
47
const message = JSON.stringify({
62
48
...omit(evt, "createdAt", "record"),
···
84
70
}
85
71
}
86
72
87
87
-
connectToTap();
73
73
+
Deno.serve({ port: parseInt(Deno.env.get("WS_PORT") || "2481") }, (req) => {
74
74
+
if (req.method === "POST") {
75
75
+
try {
76
76
+
assureAdminAuth(ADMIN_PASSWORD, req.headers.get("authorization")!);
77
77
+
} catch {
78
78
+
return new Response(null, { status: 401 });
79
79
+
}
80
80
+
const evt = parseTapEvent(req.body);
81
81
+
switch (evt.type) {
82
82
+
case "identity": {
83
83
+
addToBatch({
84
84
+
id: evt.id,
85
85
+
type: evt.type,
86
86
+
did: evt.did,
87
87
+
handle: evt.handle,
88
88
+
status: evt.status,
89
89
+
isActive: evt.isActive,
90
90
+
action: null,
91
91
+
rev: null,
92
92
+
collection: null,
93
93
+
rkey: null,
94
94
+
record: null,
95
95
+
cid: null,
96
96
+
live: null,
97
97
+
});
98
98
+
logger.info`New identity: ${evt.did} ${evt.handle} ${evt.status}`;
99
99
+
break;
100
100
+
}
101
101
+
case "record": {
102
102
+
addToBatch({
103
103
+
id: evt.id,
104
104
+
type: evt.type,
105
105
+
action: evt.action,
106
106
+
did: evt.did,
107
107
+
rev: evt.rev,
108
108
+
collection: evt.collection,
109
109
+
rkey: evt.rkey,
110
110
+
record: JSON.stringify(evt.record),
111
111
+
cid: evt.cid,
112
112
+
live: evt.live,
113
113
+
handle: null,
114
114
+
status: null,
115
115
+
isActive: null,
116
116
+
});
117
117
+
const uri = `at://${evt.did}/${evt.collection}/${evt.rkey}`;
118
118
+
logger.info`New record: ${uri}`;
119
119
+
break;
120
120
+
}
121
121
+
}
88
122
89
89
-
Deno.serve({ port: parseInt(Deno.env.get("WS_PORT") || "2481") }, (req) => {
123
123
+
return new Response("");
124
124
+
}
125
125
+
90
126
if (req.headers.get("upgrade") != "websocket") {
91
127
return new Response(null, { status: 426 });
92
128
}
···
187
223
logger.error`❌ Failed to send event at index ${totalEvents}, stopping pagination`;
188
224
return;
189
225
}
190
190
-
191
191
-
// Check backpressure periodically (no message delay for speed)
192
192
-
if (totalEvents % BACKPRESSURE_CHECK_INTERVAL === 0) {
193
193
-
await waitForBackpressure(socket);
194
194
-
}
195
226
}
196
227
197
228
hasMore = events.length === PAGE_SIZE;
198
229
page++;
199
230
200
231
if (hasMore && page % YIELD_EVERY_N_PAGES === 0) {
201
201
-
await new Promise((resolve) => setTimeout(resolve, 0));
232
232
+
await new Promise((resolve) => setTimeout(resolve, YIELD_DELAY_MS));
202
233
}
203
234
}
204
235
···
299
330
});
300
331
301
332
return response;
333
333
+
});
334
334
+
335
335
+
globalThis.addEventListener("beforeunload", () => {
336
336
+
flushBatch();
302
337
});
303
338
304
339
const url = `ws://localhost:${Deno.env.get("WS_PORT") || 2481}`;
+1
tap/src/schema/event.ts
···
28
28
index("type_idx").on(t.type),
29
29
index("collection_idx").on(t.collection),
30
30
index("did_collection_rkey_idx").on(t.did, t.collection, t.rkey),
31
31
+
index("created_at").on(t.createdAt),
31
32
],
32
33
);
33
34
-131
tap/src/tap.ts
···
1
1
-
import { Tap, SimpleIndexer } from "@atproto/tap";
2
2
-
import logger from "./logger.ts";
3
3
-
import { ctx } from "./context.ts";
4
4
-
import schema from "./schema/mod.ts";
5
5
-
import _ from "@es-toolkit/es-toolkit/compat";
6
6
-
import { broadcastEvent } from "./main.ts";
7
7
-
import type { InsertEvent } from "./schema/event.ts";
8
8
-
9
9
-
export const TAP_WS_URL = Deno.env.get("TAP_URL") || "http://localhost:2480";
10
10
-
11
11
-
const BATCH_SIZE = 100;
12
12
-
const BATCH_TIMEOUT_MS = 100;
13
13
-
14
14
-
export default function connectToTap() {
15
15
-
const tap = new Tap(TAP_WS_URL);
16
16
-
const indexer = new SimpleIndexer();
17
17
-
18
18
-
let eventBatch: InsertEvent[] = [];
19
19
-
let batchTimer: number | null = null;
20
20
-
let flushPromise: Promise<void> | null = null;
21
21
-
22
22
-
async function flushBatch() {
23
23
-
if (flushPromise) {
24
24
-
await flushPromise;
25
25
-
return;
26
26
-
}
27
27
-
28
28
-
if (eventBatch.length === 0) return;
29
29
-
30
30
-
flushPromise = (async () => {
31
31
-
const toInsert = [...eventBatch];
32
32
-
eventBatch = [];
33
33
-
34
34
-
try {
35
35
-
logger.info`🔄 Flushing batch of ${toInsert.length} events...`;
36
36
-
37
37
-
const results = await ctx.db
38
38
-
.insert(schema.events)
39
39
-
.values(toInsert)
40
40
-
.onConflictDoNothing()
41
41
-
.returning()
42
42
-
.execute();
43
43
-
44
44
-
for (const result of results) {
45
45
-
broadcastEvent(result);
46
46
-
}
47
47
-
48
48
-
logger.info`📝 Batch inserted ${results.length} events`;
49
49
-
} catch (error) {
50
50
-
logger.error`Failed to insert batch: ${error}`;
51
51
-
// Re-add failed events to the front of the batch for retry
52
52
-
eventBatch = [...toInsert, ...eventBatch];
53
53
-
} finally {
54
54
-
flushPromise = null;
55
55
-
}
56
56
-
})();
57
57
-
58
58
-
await flushPromise;
59
59
-
}
60
60
-
61
61
-
function addToBatch(event: InsertEvent) {
62
62
-
eventBatch.push(event);
63
63
-
64
64
-
// Clear existing timer
65
65
-
if (batchTimer !== null) {
66
66
-
clearTimeout(batchTimer);
67
67
-
batchTimer = null;
68
68
-
}
69
69
-
70
70
-
// Flush immediately if batch is full
71
71
-
if (eventBatch.length >= BATCH_SIZE) {
72
72
-
flushBatch().catch((err) => logger.error`Flush error: ${err}`);
73
73
-
} else {
74
74
-
// Set timer to flush after timeout
75
75
-
batchTimer = setTimeout(() => {
76
76
-
batchTimer = null;
77
77
-
flushBatch().catch((err) => logger.error`Flush error: ${err}`);
78
78
-
}, BATCH_TIMEOUT_MS);
79
79
-
}
80
80
-
}
81
81
-
82
82
-
indexer.identity(async (evt) => {
83
83
-
addToBatch({
84
84
-
id: evt.id,
85
85
-
type: evt.type,
86
86
-
did: evt.did,
87
87
-
handle: evt.handle,
88
88
-
status: evt.status,
89
89
-
isActive: evt.isActive,
90
90
-
action: null,
91
91
-
rev: null,
92
92
-
collection: null,
93
93
-
rkey: null,
94
94
-
record: null,
95
95
-
cid: null,
96
96
-
live: null,
97
97
-
});
98
98
-
99
99
-
logger.info`${evt.did} updated identity: ${evt.handle} (${evt.status})`;
100
100
-
});
101
101
-
102
102
-
indexer.record(async (evt) => {
103
103
-
addToBatch({
104
104
-
id: evt.id,
105
105
-
type: evt.type,
106
106
-
action: evt.action,
107
107
-
did: evt.did,
108
108
-
rev: evt.rev,
109
109
-
collection: evt.collection,
110
110
-
rkey: evt.rkey,
111
111
-
record: JSON.stringify(evt.record),
112
112
-
cid: evt.cid,
113
113
-
live: evt.live,
114
114
-
handle: null,
115
115
-
status: null,
116
116
-
isActive: null,
117
117
-
});
118
118
-
119
119
-
const uri = `at://${evt.did}/${evt.collection}/${evt.rkey}`;
120
120
-
logger.info`New record: ${uri}`;
121
121
-
});
122
122
-
123
123
-
indexer.error((err) => logger.error`${err}`);
124
124
-
125
125
-
const channel = tap.channel(indexer);
126
126
-
channel.start();
127
127
-
128
128
-
globalThis.addEventListener("beforeunload", () => {
129
129
-
flushBatch();
130
130
-
});
131
131
-
}