Spark feed generator template
1import { Firehose } from "@atp/sync";
2import { IdResolver } from "@atp/identity";
3import { getLogger, Logger } from "@logtape/logtape";
4import { Database } from "../db/connection.ts";
5import { CollectionHandler, HandlerContext } from "./types.ts";
6
7// collections
8import postHandler from "./handlers/post.ts";
9// uncomment the following lines if you want to ingest likes, reposts, or follows
10// import likeHandler from "./handlers/like.ts";
11// import repostHandler from "./handlers/repost.ts"
12// import followHandler from "./handlers/follow.ts"
13
/**
 * Wires a {@link Firehose} to a set of per-collection handlers.
 *
 * Each incoming event that carries a `collection` is looked up in
 * {@link Ingester.handlers}; `create` and `update` events go to the handler's
 * optional `handleInsert`, `delete` events go to its optional `handleDelete`.
 * Events for collections without a registered handler are ignored.
 *
 * The constructor only builds the `Firehose` instance; this class does not
 * call anything on it afterwards, so running it is up to the owner of the
 * `firehose` field.
 */
export class Ingester {
  /** Resolver handed to the firehose; defaults to a fresh `IdResolver`. */
  idResolver: IdResolver;
  /** Logger under the `["feedgen", "ingester"]` category. */
  logger: Logger;
  /** Firehose instance configured with this ingester's dispatch callback. */
  firehose: Firehose;
  /** Database handle passed through to every handler via the context. */
  db: Database;
  /** Registered handlers keyed by their `collection` value. */
  handlers: Map<string, CollectionHandler>;

  /**
   * @param db Database handle shared with all handlers.
   * @param handlers Collection handlers to register. When two handlers share
   *   a `collection`, the later one wins in the lookup map.
   * @param idResolver Optional resolver; a new `IdResolver` is created when
   *   omitted.
   */
  constructor(
    db: Database,
    handlers: CollectionHandler[] = [
      postHandler,
      // To ingest likes, reposts, or follows, uncomment the matching import
      // at the top of the file and the entry below:
      // likeHandler,
      // repostHandler,
      // followHandler,
    ],
    idResolver?: IdResolver,
  ) {
    this.logger = getLogger(["feedgen", "ingester"]);
    this.db = db;
    this.idResolver = idResolver ?? new IdResolver();

    // Index handlers by collection so each event is a single Map lookup.
    this.handlers = new Map<string, CollectionHandler>();
    for (const registered of handlers) {
      this.handlers.set(registered.collection, registered);
    }

    // One context object is created here and shared by every handler call.
    const handlerContext: HandlerContext = {
      db: this.db,
      logger: this.logger,
    };

    // Only collections that have a handler are requested from the firehose.
    const watchedCollections = handlers.map((registered) => registered.collection);

    this.firehose = new Firehose({
      idResolver: this.idResolver,
      filterCollections: watchedCollections,
      onError: (err) => {
        this.logger.error("Firehose error", { error: err });
      },
      handleEvent: async (evt) => {
        // Events without a collection are not record events; skip them.
        if (!("collection" in evt)) return;

        const target = this.handlers.get(evt.collection);
        if (!target) return;

        switch (evt.event) {
          // Creates and updates share the same insert path.
          case "create":
          case "update":
            if (target.handleInsert) {
              await target.handleInsert(handlerContext, evt);
            }
            break;
          case "delete":
            if (target.handleDelete) {
              await target.handleDelete(handlerContext, evt);
            }
            break;
        }
      },
    });
  }
}