tangled
alpha
login
or
join now
essem.space
/
pds-feedgen
2
fork
atom
A couple of Bluesky feeds focused around PDSes
2
fork
atom
overview
issues
pulls
pipelines
Move db tasks to worker and run post removal occasionally
essem.space
5 months ago
d9203d3b
5b29c04a
+146
-99
2 changed files
expand all
collapse all
unified
split
ingest
worker.ts
ingest.ts
+34
-99
ingest.ts
···
1
-
import { AppBskyFeedPost } from "@atcute/bluesky";
2
import { Client, ok, simpleFetchHandler } from "@atcute/client";
3
import {
4
CompositeDidDocumentResolver,
···
6
WebDidDocumentResolver,
7
} from "@atcute/identity-resolver";
8
import type { ResourceUri } from "@atcute/lexicons/syntax";
9
-
import { createClient } from "redis";
10
import { Jetstream, CommitType } from "@skyware/jetstream";
11
12
import type {} from "@atcute/atproto";
13
14
import { db } from "./common/db.ts";
15
-
import { Post, type Author, type DID } from "./common/types.ts";
16
-
17
-
type ShallowPost = Omit<Post, "cid" | "indexed_at" | "author">;
18
-
19
-
const postQueue: Post[] = [];
20
-
const delQueue: ShallowPost[] = [];
21
-
22
-
const redis = createClient();
23
-
redis.on("error", (err) => console.log("Redis Client Error", err));
24
-
await redis.connect();
25
-
26
-
const insertPost = db.prepare(
27
-
`INSERT INTO posts ("uri", "cid", "author", "indexed_at") VALUES (?1, ?2, ?3, ?4) ON CONFLICT DO NOTHING;`
28
-
);
29
-
30
-
const insertPosts = db.transaction((posts: Post[]) => {
31
-
for (const post of posts) {
32
-
const changes = insertPost.run(
33
-
post.uri,
34
-
post.cid,
35
-
post.author,
36
-
post.indexed_at
37
-
);
38
-
if (changes > 0) {
39
-
const pdsKey = `posts:${post.pds}`;
40
-
redis
41
-
.lPush(pdsKey, `${post.uri};${post.indexed_at}`)
42
-
.then((length) => {
43
-
if (length > 30000) {
44
-
redis.lTrim(pdsKey, 0, 29999);
45
-
return redis.rPop(pdsKey);
46
-
}
47
-
})
48
-
.then((last) => {
49
-
if (last) {
50
-
const indexTime = last.split(";")[1];
51
-
if (indexTime?.trim()) {
52
-
removePostByPDS.run(post.pds, indexTime);
53
-
}
54
-
}
55
-
});
56
-
}
57
-
}
58
-
});
59
-
60
-
const removePostByURL = db.prepare(
61
-
`DELETE FROM posts WHERE uri = ?1 RETURNING indexed_at, author;`
62
-
);
63
-
const removePostByPDS = db.prepare(
64
-
`DELETE FROM posts WHERE rowid IN (SELECT a.rowid FROM posts a INNER JOIN authors b ON a.author = b.did WHERE b.pds = ?1 AND a.indexed_at < ?2);`
65
-
);
66
-
67
-
const removePosts = db.transaction((posts: ShallowPost[]) => {
68
-
for (const post of posts) {
69
-
const dbResult = removePostByURL.get<Omit<Post, "uri" | "cid" | "pds">>(
70
-
post.uri
71
-
);
72
-
if (dbResult) {
73
-
redis.lRem(`posts:${post.pds}`, 0, `${post.uri};${dbResult.indexed_at}`);
74
-
}
75
-
}
76
-
});
77
-
78
-
const getAuthor = db.prepare("SELECT pds FROM authors WHERE did = ?");
79
-
const upsertAuthor = db.prepare(
80
-
"INSERT OR REPLACE INTO authors (did, pds, pds_base) VALUES (?1, ?2, ?3)"
81
-
);
82
-
83
-
const getCursor = db.prepare("SELECT cursor FROM state WHERE id = 1");
84
-
const updateCursor = db.prepare("UPDATE state SET cursor = ? WHERE id = 1");
85
86
const didResolver = new CompositeDidDocumentResolver({
87
methods: {
···
90
},
91
});
92
0
0
0
0
0
0
93
async function getPDS(did: DID, ignoreCache = false) {
94
let pds: string | undefined;
95
···
105
service.type == "AtprotoPersonalDataServer" &&
106
typeof service.serviceEndpoint === "string"
107
) {
108
-
upsertAuthor.run(
0
109
did,
110
-
service.serviceEndpoint,
111
-
getPDSBase(service.serviceEndpoint)
112
-
);
113
pds = service.serviceEndpoint;
114
}
115
}
···
126
}`;
127
}
128
0
0
129
const dbCursor = getCursor.get<{ cursor?: string }>();
130
const cursor = dbCursor ? Number(dbCursor.cursor) : 0;
131
const jetstream = new Jetstream({
···
140
141
jetstream.on("error", (e, c) => {
142
console.error(e);
143
-
updateCursor.run(c);
0
0
0
144
});
145
146
let count = 0;
···
149
count++;
150
if (count >= 1024) {
151
count = 0;
152
-
updateCursor.run(e.time_us);
0
0
0
153
}
154
155
const atUri: ResourceUri = `at://${e.did}/app.bsky.feed.post/${e.commit.rkey}`;
···
167
}
168
169
if (e.commit.operation === CommitType.Create) {
170
-
const indexed_at = new Date().toISOString();
171
-
postQueue.push({
172
-
uri: atUri,
173
cid: e.commit.cid,
174
-
author: e.did,
175
-
indexed_at,
176
pds,
177
});
178
-
if (postQueue.length > 127) {
179
-
insertPosts.immediate(postQueue.splice(0, 128));
180
-
}
181
} else if (e.commit.operation === CommitType.Delete) {
182
-
delQueue.push({
183
-
uri: atUri,
0
184
pds,
185
});
186
-
if (delQueue.length > 63) {
187
-
removePosts.immediate(delQueue.splice(0, 64));
188
-
}
189
}
190
});
191
···
204
},
205
})
206
);
207
-
const posts = records.map((v) => ({
208
-
uri: v.uri,
209
-
cid: v.cid,
210
-
author: e.did,
211
-
indexed_at:
212
-
(v.value as AppBskyFeedPost.Main).createdAt ?? new Date().toISOString(),
213
pds,
214
-
}));
215
-
insertPosts.immediate(posts);
216
} catch (e) {
217
console.error(`Failed to backfill posts: ${e}`);
218
}
···
0
1
import { Client, ok, simpleFetchHandler } from "@atcute/client";
2
import {
3
CompositeDidDocumentResolver,
···
5
WebDidDocumentResolver,
6
} from "@atcute/identity-resolver";
7
import type { ResourceUri } from "@atcute/lexicons/syntax";
0
8
import { Jetstream, CommitType } from "@skyware/jetstream";
9
10
import type {} from "@atcute/atproto";
11
12
import { db } from "./common/db.ts";
13
+
import type { Author, DID } from "./common/types.ts";
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
14
15
const didResolver = new CompositeDidDocumentResolver({
16
methods: {
···
19
},
20
});
21
22
+
const worker = new Worker(new URL("./ingest/worker.ts", import.meta.url).href, {
23
+
type: "module",
24
+
});
25
+
26
+
const getAuthor = db.prepare("SELECT pds FROM authors WHERE did = ?");
27
+
28
async function getPDS(did: DID, ignoreCache = false) {
29
let pds: string | undefined;
30
···
40
service.type == "AtprotoPersonalDataServer" &&
41
typeof service.serviceEndpoint === "string"
42
) {
43
+
worker.postMessage({
44
+
op: 4,
45
did,
46
+
pds: service.serviceEndpoint,
47
+
pds_base: getPDSBase(service.serviceEndpoint),
48
+
});
49
pds = service.serviceEndpoint;
50
}
51
}
···
62
}`;
63
}
64
65
+
// Resume position for the jetstream connection, persisted in the state table.
const getCursor = db.prepare("SELECT cursor FROM state WHERE id = 1");

const dbCursor = getCursor.get<{ cursor?: string }>();
// Fix: the previous form (`dbCursor ? Number(dbCursor.cursor) : 0`) produced
// NaN when the row exists but `cursor` is undefined/missing, and that NaN
// would be handed to Jetstream as the start cursor. `?? 0` covers both the
// missing-row and missing/NULL-column cases.
const cursor = Number(dbCursor?.cursor ?? 0);
69
const jetstream = new Jetstream({
···
78
79
jetstream.on("error", (e, c) => {
80
console.error(e);
81
+
worker.postMessage({
82
+
op: 3,
83
+
cursor: c,
84
+
});
85
});
86
87
let count = 0;
···
90
count++;
91
if (count >= 1024) {
92
count = 0;
93
+
worker.postMessage({
94
+
op: 3,
95
+
cursor: e.time_us,
96
+
});
97
}
98
99
const atUri: ResourceUri = `at://${e.did}/app.bsky.feed.post/${e.commit.rkey}`;
···
111
}
112
113
if (e.commit.operation === CommitType.Create) {
114
+
worker.postMessage({
115
+
op: 0,
116
+
atUri,
117
cid: e.commit.cid,
118
+
did: e.did,
0
119
pds,
120
});
0
0
0
121
} else if (e.commit.operation === CommitType.Delete) {
122
+
worker.postMessage({
123
+
op: 1,
124
+
atUri,
125
pds,
126
});
0
0
0
127
}
128
});
129
···
142
},
143
})
144
);
145
+
worker.postMessage({
146
+
op: 2,
147
+
records,
148
+
did: e.did,
0
0
149
pds,
150
+
});
0
151
} catch (e) {
152
console.error(`Failed to backfill posts: ${e}`);
153
}
+112
ingest/worker.ts
···
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
···
1
+
import { createClient } from "redis";
2
+
3
+
import { db } from "../common/db.ts";
4
+
import type { Post } from "../common/types.ts";
5
+
6
+
type ShallowPost = Omit<Post, "cid" | "indexed_at" | "author">;
7
+
8
+
const redis = createClient();
9
+
redis.on("error", (err) => console.log("Redis Client Error", err));
10
+
await redis.connect();
11
+
12
+
const insertPost = db.prepare(
13
+
`INSERT INTO posts ("uri", "cid", "author", "indexed_at") VALUES (?1, ?2, ?3, ?4) ON CONFLICT DO NOTHING;`
14
+
);
15
+
16
+
const lastPostTimes = new Map<string, string>();
17
+
18
+
const insertPosts = db.transaction((posts: Post[]) => {
19
+
for (const post of posts) {
20
+
const changes = insertPost.run(
21
+
post.uri,
22
+
post.cid,
23
+
post.author,
24
+
post.indexed_at
25
+
);
26
+
if (changes > 0) {
27
+
const pdsKey = `posts:${post.pds}`;
28
+
redis
29
+
.lPush(pdsKey, `${post.uri};${post.indexed_at}`)
30
+
.then((length) => {
31
+
if (length > 30000) {
32
+
redis.lTrim(pdsKey, 0, 29999);
33
+
return redis.rPop(pdsKey);
34
+
}
35
+
})
36
+
.then((last) => {
37
+
if (last) {
38
+
const indexTime = last.split(";")[1];
39
+
if (indexTime?.trim() && post.pds) {
40
+
lastPostTimes.set(post.pds, indexTime);
41
+
}
42
+
}
43
+
});
44
+
}
45
+
}
46
+
});
47
+
48
+
const removePostByURL = db.prepare(
49
+
`DELETE FROM posts WHERE uri = ?1 RETURNING indexed_at, author;`
50
+
);
51
+
const removePostByPDS = db.prepare(
52
+
`DELETE FROM posts WHERE rowid IN (SELECT a.rowid FROM posts a INNER JOIN authors b ON a.author = b.did WHERE b.pds = ?1 AND a.indexed_at < ?2);`
53
+
);
54
+
55
+
const removePosts = db.transaction((posts: ShallowPost[]) => {
56
+
for (const post of posts) {
57
+
const dbResult = removePostByURL.get<Omit<Post, "uri" | "cid" | "pds">>(
58
+
post.uri
59
+
);
60
+
if (dbResult) {
61
+
redis.lRem(`posts:${post.pds}`, 0, `${post.uri};${dbResult.indexed_at}`);
62
+
}
63
+
}
64
+
});
65
+
66
+
const upsertAuthor = db.prepare(
67
+
"INSERT OR REPLACE INTO authors (did, pds, pds_base) VALUES (?1, ?2, ?3)"
68
+
);
69
+
70
+
const updateCursor = db.prepare("UPDATE state SET cursor = ? WHERE id = 1");
71
+
72
+
setInterval(() => {
73
+
for (const [pds, time] of lastPostTimes) {
74
+
lastPostTimes.delete(pds);
75
+
removePostByPDS.run(pds, time);
76
+
}
77
+
}, 60000);
78
+
79
+
self.onmessage = (e: MessageEvent) => {
80
+
if (e.data.op === 0) {
81
+
const indexed_at = new Date().toISOString();
82
+
insertPosts.immediate([
83
+
{
84
+
uri: e.data.atUri,
85
+
cid: e.data.cid,
86
+
author: e.data.did,
87
+
indexed_at,
88
+
pds: e.data.pds,
89
+
},
90
+
]);
91
+
} else if (e.data.op === 1) {
92
+
removePosts.immediate([
93
+
{
94
+
uri: e.data.atUri,
95
+
pds: e.data.pds,
96
+
},
97
+
]);
98
+
} else if (e.data.op === 2) {
99
+
const posts = e.data.records.map((v) => ({
100
+
uri: v.uri,
101
+
cid: v.cid,
102
+
author: e.data.did,
103
+
indexed_at: v.value.createdAt ?? new Date().toISOString(),
104
+
pds: e.data.pds,
105
+
}));
106
+
insertPosts.immediate(posts);
107
+
} else if (e.data.op === 3) {
108
+
updateCursor.run(e.data.cursor);
109
+
} else if (e.data.op === 4) {
110
+
upsertAuthor.run(e.data.did, e.data.pds, e.data.pds_base);
111
+
}
112
+
};