social bookmarking for atproto
1/*
2 * clippr: a social bookmarking service for the AT Protocol
3 * Copyright (c) 2025 clippr contributors.
4 * SPDX-License-Identifier: AGPL-3.0-only
5 */
6
7import { Jetstream } from "@skyware/jetstream";
8import { Config } from "../config.js";
9import { handleClip, handleProfile, handleTag } from "./commit.js";
10import Logger from "../logger.js";
11
// Read the firehose host from the loaded application configuration.
const config = Config.getInstance().getConfig();
const { firehose: hostname } = config.network;

// Module-wide Jetstream client. It connects to the configured host's
// /subscribe endpoint and asks only for collections matching social.clippr.*.
const jetstream = new Jetstream({
  endpoint: `wss://${hostname}/subscribe`,
  wantedCollections: ["social.clippr.*"],
});
19
/**
 * Starts the module's shared Jetstream client by calling its `start()` method.
 *
 * This function does not attach any event listeners itself.
 */
export function startFirehose() {
  jetstream.start();
}
23
/**
 * Stops the module's shared Jetstream client by calling its `close()` method.
 */
export function stopFirehose() {
  jetstream.close();
}
27
/**
 * Registers the firehose event listeners on the shared Jetstream client.
 *
 * - "commit": routed by collection NSID to `handleClip`, `handleTag` or
 *   `handleProfile`; commits for any other collection are logged at verbose
 *   level and dropped.
 * - "account" / "identity": logged at debug level only.
 * - "error": logged as a warning.
 *
 * Each call invokes `jetstream.on` again for all four events, so call this
 * once (assuming the client keeps every listener it is given, as a standard
 * event emitter does).
 */
export function readFromFirehose() {
  // Collection NSID -> record handler. A Map is used so that only these exact
  // keys can ever match.
  const commitHandlers = new Map([
    ["social.clippr.feed.clip", handleClip],
    ["social.clippr.feed.tag", handleTag],
    ["social.clippr.actor.profile", handleProfile],
  ]);

  jetstream.on("commit", (event) => {
    const { collection } = event.commit;
    const handler = commitHandlers.get(collection);

    if (handler === undefined) {
      Logger.verbose(`Commit for ${collection} is not relevant, dropping`);
      return;
    }

    // The handlers are defined in another module and may return a promise.
    // Attach a rejection handler so one failing record is logged instead of
    // surfacing as an unhandled promise rejection. Synchronous handlers are
    // unaffected: Promise.resolve() simply wraps their return value.
    Promise.resolve(handler(event)).catch((err) => {
      const reason = err instanceof Error ? err.message : String(err);
      Logger.warn(`Failed to handle commit for ${collection}: ${reason}`);
    });
  });

  jetstream.on("account", (event) => {
    Logger.debug(`Received account update for ${event.account.did}`);
  });

  jetstream.on("identity", (event) => {
    Logger.debug(`Received identity update for ${event.identity.did}`);
  });

  jetstream.on("error", (err) => {
    Logger.warn(`Error while reading from firehose: ${err.message}`);
  });
}
59}