tangled
alpha
login
or
join now
sprk.so
/
feed-gen
7
fork
atom
Spark feed generator template
7
fork
atom
overview
issues
1
pulls
pipelines
ingester
knotbin.com
3 months ago
f7041e4b
c4a519fb
verified
This commit was signed with the committer's
known signature
.
knotbin.com
SSH Key Fingerprint:
SHA256:cz+cxLxCL/B8cV6riZjeEPSqiRA5+YAQM9XfjxPWTWE=
+325
-6
11 changed files
expand all
collapse all
unified
split
README.md
deno.json
deno.lock
ingester
handlers
follow.ts
like.ts
post.ts
repost.ts
index.ts
types.ts
main.ts
utils
env.ts
+1
-2
README.md
···
46
46
47
47
```
48
48
SPRK_DB_NAME=feed-gen
49
49
-
SPRK_DB_HOST=localhost
50
50
-
SPRK_DB_PORT=27017
49
49
+
SPRK_DB_URI=mongodb://localhost:27017
51
50
SPRK_DB_USER=username
52
51
SPRK_DB_PASSWORD=password
53
52
SPRK_FEEDGEN_DOMAIN=feeds.example.com
+1
deno.json
···
7
7
"@atp/common": "jsr:@atp/common@^0.1.0-alpha.4",
8
8
"@atp/identity": "jsr:@atp/identity@^0.1.0-alpha.1",
9
9
"@atp/lexicon": "jsr:@atp/lexicon@^0.1.0-alpha.2",
10
10
+
"@atp/sync": "jsr:@atp/sync@^0.1.0-alpha.4",
10
11
"@atp/syntax": "jsr:@atp/syntax@^0.1.0-alpha.2",
11
12
"@atp/xrpc-server": "jsr:@atp/xrpc-server@^0.1.0-alpha.3",
12
13
"@logtape/logtape": "jsr:@logtape/logtape@^1.2.2",
+44
deno.lock
···
9
9
"jsr:@atp/identity@~0.1.0-alpha.1": "0.1.0-alpha.1",
10
10
"jsr:@atp/lexicon@~0.1.0-alpha.1": "0.1.0-alpha.2",
11
11
"jsr:@atp/lexicon@~0.1.0-alpha.2": "0.1.0-alpha.2",
12
12
+
"jsr:@atp/repo@~0.1.0-alpha.2": "0.1.0-alpha.2",
13
13
+
"jsr:@atp/sync@~0.1.0-alpha.4": "0.1.0-alpha.4",
12
14
"jsr:@atp/syntax@~0.1.0-alpha.1": "0.1.0-alpha.2",
13
15
"jsr:@atp/syntax@~0.1.0-alpha.2": "0.1.0-alpha.2",
16
16
+
"jsr:@atp/xrpc-server@~0.1.0-alpha.2": "0.1.0-alpha.3",
14
17
"jsr:@atp/xrpc-server@~0.1.0-alpha.3": "0.1.0-alpha.3",
15
18
"jsr:@atp/xrpc@~0.1.0-alpha.2": "0.1.0-alpha.2",
16
19
"jsr:@hono/hono@^4.10.7": "4.10.7",
···
39
42
"npm:jose@^6.1.3": "6.1.3",
40
43
"npm:mongoose@^8.20.1": "8.20.1",
41
44
"npm:multiformats@^13.4.1": "13.4.1",
45
45
+
"npm:p-queue@^8.1.1": "8.1.1",
42
46
"npm:rate-limiter-flexible@^2.4.2": "2.4.2",
43
47
"npm:zod@^4.1.11": "4.1.13"
44
48
},
···
88
92
"npm:zod"
89
93
]
90
94
},
95
95
+
"@atp/repo@0.1.0-alpha.2": {
96
96
+
"integrity": "6da50453bbd527a679237d15bc9569eb2195503189f9be9d3023060f3f89f44a",
97
97
+
"dependencies": [
98
98
+
"jsr:@atp/bytes",
99
99
+
"jsr:@atp/common@~0.1.0-alpha.4",
100
100
+
"jsr:@atp/crypto@~0.1.0-alpha.2",
101
101
+
"jsr:@atp/lexicon@~0.1.0-alpha.2",
102
102
+
"jsr:@std/encoding",
103
103
+
"npm:@ipld/dag-cbor",
104
104
+
"npm:multiformats",
105
105
+
"npm:zod"
106
106
+
]
107
107
+
},
108
108
+
"@atp/sync@0.1.0-alpha.4": {
109
109
+
"integrity": "9b6aa6ccc9447270843272e40bfcb26520eddaf37f98202bcbab6c0bee0a602b",
110
110
+
"dependencies": [
111
111
+
"jsr:@atp/common@~0.1.0-alpha.4",
112
112
+
"jsr:@atp/identity",
113
113
+
"jsr:@atp/lexicon@~0.1.0-alpha.2",
114
114
+
"jsr:@atp/repo",
115
115
+
"jsr:@atp/syntax@~0.1.0-alpha.1",
116
116
+
"jsr:@atp/xrpc-server@~0.1.0-alpha.2",
117
117
+
"npm:multiformats",
118
118
+
"npm:p-queue"
119
119
+
]
120
120
+
},
91
121
"@atp/syntax@0.1.0-alpha.1": {
92
122
"integrity": "9e2055cace77cf63a8c52a4a94c39492215e7135101db7bc2289ebad9bec1991"
93
123
},
···
232
262
"tslib"
233
263
]
234
264
},
265
265
+
"eventemitter3@5.0.1": {
266
266
+
"integrity": "sha512-GWkBvjiSZK87ELrYOSESUYeVIc9mvLLf/nXalMOS5dYrgZq9o5OVkbZAVM06CVxYsCwH9BDZFPlQTlPA1j4ahA=="
267
267
+
},
235
268
"jose@6.1.3": {
236
269
"integrity": "sha512-0TpaTfihd4QMNwrz/ob2Bp7X04yuxJkjRGi4aKmOqwhov54i6u79oCv7T+C7lo70MKH6BesI3vscD1yb/yzKXQ=="
237
270
},
···
283
316
"multiformats@13.4.1": {
284
317
"integrity": "sha512-VqO6OSvLrFVAYYjgsr8tyv62/rCQhPgsZUXLTqoFLSgdkgiUYKYeArbt1uWLlEpkjxQe+P0+sHlbPEte1Bi06Q=="
285
318
},
319
319
+
"p-queue@8.1.1": {
320
320
+
"integrity": "sha512-aNZ+VfjobsWryoiPnEApGGmf5WmNsCo9xu8dfaYamG5qaLP7ClhLN6NgsFe6SwJ2UbLEBK5dv9x8Mn5+RVhMWQ==",
321
321
+
"dependencies": [
322
322
+
"eventemitter3",
323
323
+
"p-timeout"
324
324
+
]
325
325
+
},
326
326
+
"p-timeout@6.1.4": {
327
327
+
"integrity": "sha512-MyIV3ZA/PmyBN/ud8vV9XzwTrNtR4jFrObymZYnZqMmW0zA8Z17vnT0rBgFE/TlohB+YCHqXMgZzb3Csp49vqg=="
328
328
+
},
286
329
"punycode@2.3.1": {
287
330
"integrity": "sha512-vYt7UD1U9Wg6138shLtLOvdAu+8DsC/ilFtEVHcH+wydcSpNE20AfSOduf6MkRFahL5FY7X1oU7nKVZFtfq8Fg=="
288
331
},
···
329
372
"jsr:@atp/common@~0.1.0-alpha.4",
330
373
"jsr:@atp/identity@~0.1.0-alpha.1",
331
374
"jsr:@atp/lexicon@~0.1.0-alpha.2",
375
375
+
"jsr:@atp/sync@~0.1.0-alpha.4",
332
376
"jsr:@atp/syntax@~0.1.0-alpha.2",
333
377
"jsr:@atp/xrpc-server@~0.1.0-alpha.3",
334
378
"jsr:@hono/hono@^4.10.7",
+43
ingester/handlers/follow.ts
···
1
1
+
import { CollectionHandler } from "../types.ts";
2
2
+
import type { Record as FollowRecord } from "../../lex/types/so/sprk/graph/follow.ts";
3
3
+
4
4
+
/** Handler for so.sprk.graph.follow collection */
5
5
+
const followHandler: CollectionHandler = {
6
6
+
collection: "so.sprk.graph.follow",
7
7
+
8
8
+
handleInsert: async (ctx, evt) => {
9
9
+
const record = evt.record as FollowRecord;
10
10
+
const uri = `at://${evt.did}/${evt.collection}/${evt.rkey}`;
11
11
+
12
12
+
try {
13
13
+
await ctx.db.models.Follow.findOneAndUpdate(
14
14
+
{ uri },
15
15
+
{
16
16
+
uri,
17
17
+
cid: evt.cid,
18
18
+
authorDid: evt.did,
19
19
+
subject: record.subject,
20
20
+
createdAt: record.createdAt,
21
21
+
indexedAt: new Date().toISOString(),
22
22
+
},
23
23
+
{ upsert: true, new: true },
24
24
+
);
25
25
+
ctx.logger.debug("Indexed follow", { uri });
26
26
+
} catch (err) {
27
27
+
ctx.logger.error("Failed to index follow", { uri, error: err });
28
28
+
}
29
29
+
},
30
30
+
31
31
+
handleDelete: async (ctx, evt) => {
32
32
+
const uri = `at://${evt.did}/${evt.collection}/${evt.rkey}`;
33
33
+
34
34
+
try {
35
35
+
await ctx.db.models.Follow.deleteOne({ uri });
36
36
+
ctx.logger.debug("Deleted follow", { uri });
37
37
+
} catch (err) {
38
38
+
ctx.logger.error("Failed to delete follow", { uri, error: err });
39
39
+
}
40
40
+
},
41
41
+
};
42
42
+
43
43
+
export default followHandler;
+46
ingester/handlers/like.ts
···
1
1
+
import { CollectionHandler } from "../types.ts";
2
2
+
import type { Record as LikeRecord } from "../../lex/types/so/sprk/feed/like.ts";
3
3
+
4
4
+
/** Handler for so.sprk.feed.like collection */
5
5
+
const likeHandler: CollectionHandler = {
6
6
+
collection: "so.sprk.feed.like",
7
7
+
8
8
+
handleInsert: async (ctx, evt) => {
9
9
+
const record = evt.record as LikeRecord;
10
10
+
const uri = `at://${evt.did}/${evt.collection}/${evt.rkey}`;
11
11
+
12
12
+
try {
13
13
+
await ctx.db.models.Like.findOneAndUpdate(
14
14
+
{ uri },
15
15
+
{
16
16
+
uri,
17
17
+
cid: evt.cid,
18
18
+
authorDid: evt.did,
19
19
+
subject: record.subject.uri,
20
20
+
subjectCid: record.subject.cid,
21
21
+
via: record.via?.uri ?? null,
22
22
+
viaCid: record.via?.cid ?? null,
23
23
+
createdAt: record.createdAt,
24
24
+
indexedAt: new Date().toISOString(),
25
25
+
},
26
26
+
{ upsert: true, new: true },
27
27
+
);
28
28
+
ctx.logger.debug("Indexed like", { uri });
29
29
+
} catch (err) {
30
30
+
ctx.logger.error("Failed to index like", { uri, error: err });
31
31
+
}
32
32
+
},
33
33
+
34
34
+
handleDelete: async (ctx, evt) => {
35
35
+
const uri = `at://${evt.did}/${evt.collection}/${evt.rkey}`;
36
36
+
37
37
+
try {
38
38
+
await ctx.db.models.Like.deleteOne({ uri });
39
39
+
ctx.logger.debug("Deleted like", { uri });
40
40
+
} catch (err) {
41
41
+
ctx.logger.error("Failed to delete like", { uri, error: err });
42
42
+
}
43
43
+
},
44
44
+
};
45
45
+
46
46
+
export default likeHandler;
+50
ingester/handlers/post.ts
···
1
1
+
import { CollectionHandler } from "../types.ts";
2
2
+
import type { Record as PostRecord } from "../../lex/types/so/sprk/feed/post.ts";
3
3
+
4
4
+
/** Handler for so.sprk.feed.post collection */
5
5
+
const postHandler: CollectionHandler = {
6
6
+
collection: "so.sprk.feed.post",
7
7
+
8
8
+
handleInsert: async (ctx, evt) => {
9
9
+
const record = evt.record as PostRecord;
10
10
+
const uri = `at://${evt.did}/${evt.collection}/${evt.rkey}`;
11
11
+
12
12
+
try {
13
13
+
await ctx.db.models.Post.findOneAndUpdate(
14
14
+
{ uri },
15
15
+
{
16
16
+
uri,
17
17
+
cid: evt.cid,
18
18
+
authorDid: evt.did,
19
19
+
caption: record.caption,
20
20
+
media: record.media,
21
21
+
sound: record.sound,
22
22
+
langs: record.langs ?? [],
23
23
+
labels: (record.labels && "values" in record.labels)
24
24
+
? record.labels.values
25
25
+
: [],
26
26
+
tags: record.tags ?? [],
27
27
+
createdAt: record.createdAt,
28
28
+
indexedAt: new Date().toISOString(),
29
29
+
},
30
30
+
{ upsert: true, new: true },
31
31
+
);
32
32
+
ctx.logger.debug("Indexed post", { uri });
33
33
+
} catch (err) {
34
34
+
ctx.logger.error("Failed to index post", { uri, error: err });
35
35
+
}
36
36
+
},
37
37
+
38
38
+
handleDelete: async (ctx, evt) => {
39
39
+
const uri = `at://${evt.did}/${evt.collection}/${evt.rkey}`;
40
40
+
41
41
+
try {
42
42
+
await ctx.db.models.Post.deleteOne({ uri });
43
43
+
ctx.logger.debug("Deleted post", { uri });
44
44
+
} catch (err) {
45
45
+
ctx.logger.error("Failed to delete post", { uri, error: err });
46
46
+
}
47
47
+
},
48
48
+
};
49
49
+
50
50
+
export default postHandler;
+46
ingester/handlers/repost.ts
···
1
1
+
import { CollectionHandler } from "../types.ts";
2
2
+
import type { Record as RepostRecord } from "../../lex/types/so/sprk/feed/repost.ts";
3
3
+
4
4
+
/** Handler for so.sprk.feed.repost collection */
5
5
+
const repostHandler: CollectionHandler = {
6
6
+
collection: "so.sprk.feed.repost",
7
7
+
8
8
+
handleInsert: async (ctx, evt) => {
9
9
+
const record = evt.record as RepostRecord;
10
10
+
const uri = `at://${evt.did}/${evt.collection}/${evt.rkey}`;
11
11
+
12
12
+
try {
13
13
+
await ctx.db.models.Repost.findOneAndUpdate(
14
14
+
{ uri },
15
15
+
{
16
16
+
uri,
17
17
+
cid: evt.cid,
18
18
+
authorDid: evt.did,
19
19
+
subject: record.subject.uri,
20
20
+
subjectCid: record.subject.cid,
21
21
+
via: record.via?.uri ?? null,
22
22
+
viaCid: record.via?.cid ?? null,
23
23
+
createdAt: record.createdAt,
24
24
+
indexedAt: new Date().toISOString(),
25
25
+
},
26
26
+
{ upsert: true, new: true },
27
27
+
);
28
28
+
ctx.logger.debug("Indexed repost", { uri });
29
29
+
} catch (err) {
30
30
+
ctx.logger.error("Failed to index repost", { uri, error: err });
31
31
+
}
32
32
+
},
33
33
+
34
34
+
handleDelete: async (ctx, evt) => {
35
35
+
const uri = `at://${evt.did}/${evt.collection}/${evt.rkey}`;
36
36
+
37
37
+
try {
38
38
+
await ctx.db.models.Repost.deleteOne({ uri });
39
39
+
ctx.logger.debug("Deleted repost", { uri });
40
40
+
} catch (err) {
41
41
+
ctx.logger.error("Failed to delete repost", { uri, error: err });
42
42
+
}
43
43
+
},
44
44
+
};
45
45
+
46
46
+
export default repostHandler;
+66
ingester/index.ts
···
1
1
+
import { Firehose } from "@atp/sync";
2
2
+
import { IdResolver } from "@atp/identity";
3
3
+
import { getLogger, Logger } from "@logtape/logtape";
4
4
+
import { Database } from "../db/connection.ts";
5
5
+
import { CollectionHandler, HandlerContext } from "./types.ts";
6
6
+
7
7
+
// collections
8
8
+
import postHandler from "./handlers/post.ts";
9
9
+
// uncomment the following lines if you want to ingest likes, reposts, or follows
10
10
+
// import likeHandler from "./handlers/like.ts";
11
11
+
// import repostHandler from "./handlers/repost.ts"
12
12
+
// import followHandler from "./handlers/follow.ts"
13
13
+
14
14
+
export class Ingester {
15
15
+
idResolver: IdResolver;
16
16
+
logger: Logger;
17
17
+
firehose: Firehose;
18
18
+
db: Database;
19
19
+
handlers: Map<string, CollectionHandler>;
20
20
+
21
21
+
constructor(
22
22
+
db: Database,
23
23
+
handlers: CollectionHandler[] = [
24
24
+
postHandler,
25
25
+
// uncomment the following lines to ingest likes, reposts, or follows
26
26
+
// likeHandler
27
27
+
// repostHandler
28
28
+
// followHandler
29
29
+
],
30
30
+
idResolver?: IdResolver,
31
31
+
) {
32
32
+
this.logger = getLogger(["feedgen", "ingester"]);
33
33
+
this.db = db;
34
34
+
this.idResolver = idResolver ?? new IdResolver();
35
35
+
36
36
+
// Build handler map for O(1) lookup
37
37
+
this.handlers = new Map(handlers.map((h) => [h.collection, h]));
38
38
+
39
39
+
const ctx: HandlerContext = {
40
40
+
db: this.db,
41
41
+
logger: this.logger,
42
42
+
};
43
43
+
44
44
+
this.firehose = new Firehose({
45
45
+
idResolver: this.idResolver,
46
46
+
handleEvent: async (evt) => {
47
47
+
if (!("collection" in evt)) return;
48
48
+
49
49
+
const handler = this.handlers.get(evt.collection);
50
50
+
if (!handler) return;
51
51
+
52
52
+
if (evt.event === "create" && handler.handleInsert) {
53
53
+
await handler.handleInsert(ctx, evt);
54
54
+
} else if (evt.event === "update" && handler.handleInsert) {
55
55
+
await handler.handleInsert(ctx, evt);
56
56
+
} else if (evt.event === "delete" && handler.handleDelete) {
57
57
+
await handler.handleDelete(ctx, evt);
58
58
+
}
59
59
+
},
60
60
+
onError: (err) => {
61
61
+
this.logger.error("Firehose error", { error: err });
62
62
+
},
63
63
+
filterCollections: handlers.map((h) => h.collection),
64
64
+
});
65
65
+
}
66
66
+
}
+22
ingester/types.ts
···
1
1
+
import { Event } from "@atp/sync";
2
2
+
import { Logger } from "@logtape/logtape";
3
3
+
import { Database } from "../db/connection.ts";
4
4
+
5
5
+
/** Context passed to collection handlers */
6
6
+
export interface HandlerContext {
7
7
+
db: Database;
8
8
+
logger: Logger;
9
9
+
}
10
10
+
11
11
+
/** Handler for a specific collection's events */
12
12
+
export interface CollectionHandler {
13
13
+
collection: string;
14
14
+
handleInsert?: (
15
15
+
ctx: HandlerContext,
16
16
+
evt: Event & { event: "create" | "update" },
17
17
+
) => Promise<void>;
18
18
+
handleDelete?: (
19
19
+
ctx: HandlerContext,
20
20
+
evt: Event & { event: "delete" },
21
21
+
) => Promise<void>;
22
22
+
}
+5
-3
main.ts
···
11
11
import getFeedSkeleton from "./api/getFeedSkeleton.ts";
12
12
import wellKnown from "./api/well-known.ts";
13
13
import health from "./api/health.ts";
14
14
+
import { Ingester } from "./ingester/index.ts";
14
15
15
16
await configure({
16
17
sinks: {
···
94
95
Deno.exit(1);
95
96
}
96
97
98
98
+
const ingester = new Ingester(db);
99
99
+
ingester.firehose.start();
100
100
+
97
101
const { SPRK_HOST, SPRK_PORT } = env;
98
102
Deno.serve({
99
103
hostname: SPRK_HOST,
100
104
port: SPRK_PORT,
101
105
onListen: (info) => {
102
102
-
logger.info(`Server listening on ${info.hostname}:${info.port}`);
106
106
+
logger.info(`Server listening on http://${info.hostname}:${info.port}`);
103
107
},
104
108
}, app.fetch);
105
109
···
125
129
if (import.meta.main) {
126
130
startServer();
127
131
}
128
128
-
129
129
-
export default app;
+1
-1
utils/env.ts
···
8
8
9
9
SPRK_FEEDGEN_DOMAIN: envStr("FEEDGEN_DOMAIN"),
10
10
SPRK_DB_NAME: envStr("SPRK_DB_NAME"),
11
11
-
SPRK_DB_URI: envStr("SPRK_DB_URI") ?? "mongo://localhost:27017",
11
11
+
SPRK_DB_URI: envStr("SPRK_DB_URI") ?? "mongodb://localhost:27017",
12
12
SPRK_DB_USER: envStr("SPRK_DB_USER"),
13
13
SPRK_DB_PASS: envStr("SPRK_DB_PASS"),
14
14