import { Firehose } from "@atp/sync";
import { IdResolver } from "@atp/identity";
import { getLogger, Logger } from "@logtape/logtape";
import { Database } from "../db/connection.ts";
import { CollectionHandler, HandlerContext } from "./types.ts";

// collections
import postHandler from "./handlers/post.ts";
// uncomment the following lines if you want to ingest likes, reposts, or follows
// import likeHandler from "./handlers/like.ts";
// import repostHandler from "./handlers/repost.ts";
// import followHandler from "./handlers/follow.ts";

/**
 * Wires a set of per-collection handlers to a `Firehose` instance.
 *
 * Each incoming event that carries a `collection` is routed to the handler
 * registered for that collection: `create` and `update` events go to
 * `handleInsert`, `delete` events go to `handleDelete`. Events for
 * collections without a handler, or for which the handler does not implement
 * the relevant method, are ignored.
 *
 * The constructor only builds the `Firehose`; nothing in this class starts
 * it, so callers are expected to drive `firehose` themselves.
 */
export class Ingester {
  /** Identity resolver handed to the firehose. */
  idResolver: IdResolver;
  /** Logger under the `["feedgen", "ingester"]` category. */
  logger: Logger;
  /** Firehose configured with this ingester's event and error callbacks. */
  firehose: Firehose;
  /** Database shared with every handler through the handler context. */
  db: Database;
  /** Handlers keyed by the collection they are responsible for. */
  handlers: Map<string, CollectionHandler>;

  /**
   * @param db Database passed to every handler via the handler context.
   * @param handlers Collection handlers to register; defaults to the post
   *   handler only. If two handlers share a collection, the later one wins.
   * @param idResolver Optional identity resolver; a fresh `IdResolver` is
   *   created when omitted.
   */
  constructor(
    db: Database,
    handlers: CollectionHandler[] = [
      postHandler,
      // uncomment the following lines to ingest likes, reposts, or follows
      // likeHandler,
      // repostHandler,
      // followHandler,
    ],
    idResolver?: IdResolver,
  ) {
    this.logger = getLogger(["feedgen", "ingester"]);
    this.db = db;
    this.idResolver = idResolver ?? new IdResolver();

    // Build handler map for O(1) lookup by collection.
    this.handlers = new Map<string, CollectionHandler>(
      handlers.map((h) => [h.collection, h]),
    );

    // A single context object is shared by all handler invocations.
    const ctx: HandlerContext = {
      db: this.db,
      logger: this.logger,
    };

    this.firehose = new Firehose({
      idResolver: this.idResolver,
      handleEvent: async (evt) => {
        // Only events that name a collection can be dispatched.
        if (!("collection" in evt)) return;

        const handler = this.handlers.get(evt.collection);
        if (!handler) return;

        if (evt.event === "create" || evt.event === "update") {
          // Creates and updates share the same insert path.
          await handler.handleInsert?.(ctx, evt);
        } else if (evt.event === "delete") {
          await handler.handleDelete?.(ctx, evt);
        }
      },
      onError: (err) => {
        this.logger.error("Firehose error", { error: err });
      },
      // Restrict the firehose to the collections we have handlers for.
      filterCollections: handlers.map((h) => h.collection),
    });
  }
}