tangled
alpha
login
or
join now
baileytownsend.dev
/
label-watcher
36
fork
atom
PDS Admin tool make it easier to moderate your PDS with labels
36
fork
atom
overview
issues
pulls
pipelines
I think this is the pds backfill
baileytownsend.dev
3 weeks ago
10bfa567
25ba3658
+50
-8
3 changed files
expand all
collapse all
unified
split
src
handlers
handleNewIdentityEvent.ts
index.ts
pds.ts
+25
src/handlers/handleNewIdentityEvent.ts
···
1
1
+
import * as schema from "../db/schema.js";
2
2
+
import type { LibSQLDatabase } from "drizzle-orm/libsql";
3
3
+
4
4
+
export const handleNewIdentityEvent = async (
5
5
+
db: LibSQLDatabase<typeof schema>,
6
6
+
pdsHost: string,
7
7
+
did: string,
8
8
+
active: boolean,
9
9
+
) => {
10
10
+
await db
11
11
+
.insert(schema.watchedRepos)
12
12
+
.values({
13
13
+
did,
14
14
+
pdsHost,
15
15
+
active,
16
16
+
dateFirstSeen: new Date(),
17
17
+
})
18
18
+
.onConflictDoUpdate({
19
19
+
target: schema.watchedRepos.did,
20
20
+
set: {
21
21
+
pdsHost,
22
22
+
active,
23
23
+
},
24
24
+
});
25
25
+
};
+2
-2
src/index.ts
···
4
4
import { parse } from "smol-toml";
5
5
import PQueue from "p-queue";
6
6
import { labelerSubscriber } from "./handlers/lablerSubscriber.js";
7
7
-
import type { PDSConfig, Settings } from "./types/settings.js";
7
7
+
import type { Settings } from "./types/settings.js";
8
8
import { logger } from "./logger.js";
9
9
import { labelerCursor } from "./db/schema.js";
10
10
import { backFillPds } from "./pds.js";
···
31
31
32
32
for (const config of pdsConfigs) {
33
33
if (config.backfillAccounts) {
34
34
-
await backFillPds(config, identityQueue);
34
34
+
await backFillPds(config, db, identityQueue);
35
35
}
36
36
}
37
37
+23
-6
src/pds.ts
···
2
2
import type { PDSConfig } from "./types/settings.js";
3
3
import type {} from "@atcute/atproto";
4
4
import { Client, simpleFetchHandler, ok } from "@atcute/client";
5
5
+
import * as schema from "./db/schema.js";
6
6
+
import type { LibSQLDatabase } from "drizzle-orm/libsql";
7
7
+
import { handleNewIdentityEvent } from "./handlers/handleNewIdentityEvent.js";
8
8
+
import { logger } from "./logger.js";
5
9
6
6
-
export const backFillPds = async (config: PDSConfig, queue: PQueue) => {
10
10
+
export const backFillPds = async (
11
11
+
config: PDSConfig,
12
12
+
db: LibSQLDatabase<typeof schema>,
13
13
+
queue: PQueue,
14
14
+
) => {
15
15
+
logger.info(`Starting backfill process for ${config.host}`);
16
16
+
7
17
const rpc = new Client({
8
18
handler: simpleFetchHandler({ service: `https://${config.host}` }),
9
19
});
10
20
11
11
-
let cursor = undefined;
21
21
+
let cursor: string | undefined;
12
22
13
23
do {
14
14
-
let result = await ok(
24
24
+
const result = await ok(
15
25
rpc.get("com.atproto.sync.listRepos", {
16
26
params: {
17
27
limit: 1000,
···
20
30
}),
21
31
);
22
32
23
23
-
cursor = result.cursor;
24
24
-
for (let repo of result.repos) {
25
25
-
console.log(repo.did);
33
33
+
for (const repo of result.repos) {
34
34
+
if (repo.active) {
35
35
+
queue.add(
36
36
+
async () =>
37
37
+
await handleNewIdentityEvent(db, config.host, repo.did, true),
38
38
+
);
39
39
+
}
26
40
}
41
41
+
42
42
+
cursor = result.cursor;
27
43
} while (cursor);
44
44
+
logger.info(`Backfill process for ${config.host} completed`);
28
45
};