tangled
alpha
login
or
join now
baileytownsend.dev
/
label-watcher
35
fork
atom
PDS Admin tool make it easier to moderate your PDS with labels
35
fork
atom
overview
issues
pulls
pipelines
listen in to that PDS
baileytownsend.dev
3 weeks ago
3eadb197
10bfa567
+85
-7
2 changed files
expand all
collapse all
unified
split
src
handlers
pdsSubscriber.ts
index.ts
+63
src/handlers/pdsSubscriber.ts
···
1
1
+
import type { LibSQLDatabase } from "drizzle-orm/libsql";
2
2
+
import type { PDSConfig } from "../types/settings.js";
3
3
+
import * as schema from "../db/schema.js";
4
4
+
import type PQueue from "p-queue";
5
5
+
import { logger } from "../logger.js";
6
6
+
import { FirehoseSubscription } from "@atcute/firehose";
7
7
+
import { ComAtprotoSyncSubscribeRepos } from "@atcute/atproto";
8
8
+
import { handleNewIdentityEvent } from "./handleNewIdentityEvent.js";
9
9
+
10
10
+
export const pdsSubscriber = (
11
11
+
config: PDSConfig,
12
12
+
db: LibSQLDatabase<typeof schema>,
13
13
+
queue: PQueue,
14
14
+
): (() => void) => {
15
15
+
let cursor: number | undefined;
16
16
+
17
17
+
const subscription = new FirehoseSubscription({
18
18
+
service: `wss://${config.host}`,
19
19
+
nsid: ComAtprotoSyncSubscribeRepos.mainSchema,
20
20
+
params: () => ({ cursor: cursor }),
21
21
+
});
22
22
+
23
23
+
const iterator = subscription[Symbol.asyncIterator]();
24
24
+
25
25
+
const run = async () => {
26
26
+
logger.info({ host: config.host }, "Listening to PDS events");
27
27
+
for await (const message of iterator) {
28
28
+
// Saves the cursor for re connect
29
29
+
if ("seq" in message) {
30
30
+
cursor = message.seq;
31
31
+
}
32
32
+
switch (message.$type) {
33
33
+
case "com.atproto.sync.subscribeRepos#account": {
34
34
+
logger.info(
35
35
+
{
36
36
+
host: config.host,
37
37
+
did: message.did,
38
38
+
status: message.active,
39
39
+
},
40
40
+
"Identity event",
41
41
+
);
42
42
+
queue.add(
43
43
+
async () =>
44
44
+
await handleNewIdentityEvent(
45
45
+
db,
46
46
+
config.host,
47
47
+
message.did,
48
48
+
message.active,
49
49
+
),
50
50
+
);
51
51
+
52
52
+
break;
53
53
+
}
54
54
+
}
55
55
+
}
56
56
+
};
57
57
+
58
58
+
run().catch((err) => logger.error({ err }, "Subscriber error"));
59
59
+
60
60
+
return () => {
61
61
+
iterator.return?.();
62
62
+
};
63
63
+
};
+22
-7
src/index.ts
···
8
8
import { logger } from "./logger.js";
9
9
import { labelerCursor } from "./db/schema.js";
10
10
import { backFillPds } from "./pds.js";
11
11
+
import { pdsSubscriber } from "./handlers/pdsSubscriber.js";
11
12
12
13
const labelQueue = new PQueue({ concurrency: 2 });
13
14
const identityQueue = new PQueue({ concurrency: 2 });
···
35
36
}
36
37
}
37
38
39
39
+
// Waiting for the identity queue to backfill and complete before labler
40
40
+
logger.info("Waiting for identity queue to backfill and complete...");
41
41
+
await identityQueue.onIdle();
42
42
+
logger.info("Identity queue backfill and completion complete.");
43
43
+
38
44
// Gets the last saved cursors for Labelers from db for resume
39
45
const lastCursors = await db.select().from(labelerCursor);
40
46
41
41
-
const labelers = settings.labeler;
42
42
-
43
43
-
const subscribers = Object.entries(labelers).map(([_, config]) => {
47
47
+
// Sets up the subscribers to the labelers
48
48
+
const labelSubscribers = Object.entries(settings.labeler).map(([_, config]) => {
44
49
let lastCursorRow = lastCursors.find(
45
50
(cursor) => cursor.labelerId === config.host,
46
51
);
···
48
53
return labelerSubscriber(config, lastCursor, db, labelQueue);
49
54
});
50
55
56
56
+
const pdsSubscribers = Object.entries(settings.pds)
57
57
+
.map(([_, config]) => {
58
58
+
if (config.listenForNewAccounts) {
59
59
+
return pdsSubscriber(config, db, identityQueue);
60
60
+
}
61
61
+
return null;
62
62
+
})
63
63
+
.filter((x) => x !== null);
64
64
+
51
65
// Graceful shutdown
52
66
async function shutdown(signal: string) {
53
67
logger.info(`Received ${signal}, shutting down...`);
54
68
55
55
-
logger.info("Closing subscriptions...");
56
56
-
subscribers.forEach((close) => close());
69
69
+
logger.info("Closing subscribers...");
70
70
+
labelSubscribers.forEach((close) => close());
71
71
+
pdsSubscribers.forEach((close) => close());
57
72
58
58
-
logger.info("Draining the queue...");
59
59
-
await labelQueue.onIdle();
73
73
+
logger.info("Draining the queues...");
74
74
+
await Promise.all([labelQueue.onIdle(), identityQueue.onIdle()]);
60
75
61
76
logger.info("Clean shutdown complete.");
62
77
process.exit(0);