/*
 * clippr: a social bookmarking service for the AT Protocol
 * Copyright (c) 2025 clippr contributors.
 * SPDX-License-Identifier: AGPL-3.0-only
 */

import { Jetstream } from "@skyware/jetstream";
import { Config } from "../config.js";
import { handleClip, handleProfile, handleTag } from "./commit.js";
import Logger from "../logger.js";

// The firehose host is read from the application config once, at module load.
const { firehose: hostname } = Config.getInstance().getConfig().network;

// Single module-wide Jetstream client, subscribed to the clippr collections.
const jetstream = new Jetstream({
  endpoint: `wss://${hostname}/subscribe`,
  wantedCollections: ["social.clippr.*"],
});

/**
 * Routing table from collection name to the function that processes a commit
 * for it. Entries are thin wrappers so the imported handlers are looked up
 * when an event arrives rather than when this module is evaluated. A Map is
 * used so that only the listed names can ever match.
 */
const commitRoutes = new Map([
  ["social.clippr.feed.clip", (event) => handleClip(event)],
  ["social.clippr.feed.tag", (event) => handleTag(event)],
  ["social.clippr.actor.profile", (event) => handleProfile(event)],
]);

/**
 * Dispatches a commit event to the handler registered for its collection.
 * Commits for any other collection are logged at verbose level and dropped.
 *
 * @param {object} event - Commit event; `event.commit.collection` is read.
 * @returns {void}
 */
function onCommit(event) {
  const route = commitRoutes.get(event.commit.collection);

  if (route === undefined) {
    Logger.verbose(
      `Commit for ${event.commit.collection} is not relevant, dropping`,
    );
    return;
  }

  route(event);
}

/**
 * Logs the DID carried by an account event at debug level.
 *
 * @param {object} event - Account event; `event.account.did` is read.
 * @returns {void}
 */
function onAccount(event) {
  Logger.debug(`Received account update for ${event.account.did}`);
}

/**
 * Logs the DID carried by an identity event at debug level.
 *
 * @param {object} event - Identity event; `event.identity.did` is read.
 * @returns {void}
 */
function onIdentity(event) {
  Logger.debug(`Received identity update for ${event.identity.did}`);
}

/**
 * Logs an error reported by the Jetstream client as a warning.
 *
 * @param {object} error - Emitted error; its `message` property is logged.
 * @returns {void}
 */
function onError(error) {
  Logger.warn(`Error while reading from firehose: ${error.message}`);
}

/**
 * Starts the module's Jetstream client.
 *
 * @returns {void}
 */
export function startFirehose() {
  jetstream.start();
}

/**
 * Closes the module's Jetstream client.
 *
 * @returns {void}
 */
export function stopFirehose() {
  jetstream.close();
}

/**
 * Attaches the commit, account, identity and error listeners to the
 * Jetstream client. Every call attaches a fresh set of listeners.
 *
 * @returns {void}
 */
export function readFromFirehose() {
  jetstream.on("commit", onCommit);
  jetstream.on("account", onAccount);
  jetstream.on("identity", onIdentity);
  jetstream.on("error", onError);
}