# Real-Time Architecture: Firehose → SSE → HTMX

*Deep-dive research conducted 2026-02-07*

---

## The Opportunity

Traditional forums (phpBB, Discourse, Flarum) are built on a request-response model. You load a topic page. It's a snapshot. If someone replies while you're reading, you don't see it until you refresh. Discourse has real-time features via its MessageBus library (long polling rather than WebSockets), but it's a bolt-on: a separate infrastructure layer built on top of their standard Rails app.

atBB is different. The AT Protocol *already has* a real-time event stream (the firehose/Jetstream). The AppView *already needs* to subscribe to it for indexing. The web UI *already uses* HTMX, which has declarative SSE support. The entire pipeline exists; it just needs to be connected.

**The data flow:**

```
User writes post      AT Proto       Jetstream        atBB           Browser
to their PDS     ───▶ Relay     ───▶ (JSON WS)  ───▶  AppView   ───▶ (HTMX SSE)
                                                      indexes +
                                                      broadcasts
```

End-to-end latency: ~1–2 seconds from PDS write to browser update. That's fast enough to feel "live" without feeling like chat.
---

## Architecture Overview

### Three Layers

```
┌──────────────────────────────────────────────────────┐
│                    Browser (HTMX)                    │
│                                                      │
│  ┌────────────┐  ┌───────────┐  ┌────────────────┐   │
│  │ Topic View │  │ Category  │  │ Notification   │   │
│  │ sse-swap=  │  │ View      │  │ Badge          │   │
│  │ "newReply" │  │ sse-swap= │  │ sse-swap=      │   │
│  │            │  │ "newTopic"│  │ "notification" │   │
│  └─────┬──────┘  └─────┬─────┘  └───────┬────────┘   │
│        │               │                │            │
│        └───────────────┴────────────────┘            │
│                        │ SSE                         │
└────────────────────────┼─────────────────────────────┘
                         │
┌────────────────────────┼─────────────────────────────┐
│ @atbb/web (Hono)       │                             │
│                        │                             │
│ GET /sse/thread/:id ───┤                             │
│ GET /sse/category/:id ─┤ SSE                         │
│ GET /sse/global ───────┤ Endpoints                   │
│                        │                             │
│            ┌───────────┘                             │
│            │ Subscribe to AppView event bus          │
│            │ Render HTML fragments                   │
│            │ Stream as SSE events                    │
└────────────┼─────────────────────────────────────────┘
             │
┌────────────┼─────────────────────────────────────────┐
│            │  @atbb/appview (Hono)                   │
│            │                                         │
│  ┌─────────▼──────────┐   ┌────────────────────────┐ │
│  │ Event Bus          │   │ Jetstream Consumer     │ │
│  │ (in-process)       │◀──│                        │ │
│  │                    │   │ space.atbb.*           │ │
│  │ topic:abc →        │   │ filter + index         │ │
│  │   [subscriber1]    │   │                        │ │
│  │   [subscriber2]    │   │ Cursor persistence     │ │
│  │ category:xyz →     │   │ Reconnection logic     │ │
│  │   [subscriber3]    │   └────────────┬───────────┘ │
│  └────────────────────┘                │             │
│                                        │             │
│  ┌─────────────────────────────────────▼───────────┐ │
│  │ PostgreSQL                                      │ │
│  │ posts | categories | users | memberships        │ │
│  └─────────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────┘
                         │
                         │ WebSocket (JSON)
                         ▼
┌──────────────────────────────────────────────────────┐
│                      Jetstream                       │
│  wss://jetstream2.us-east.bsky.network/subscribe     │
│    ?wantedCollections=space.atbb.*                   │
└──────────────────────────────────────────────────────┘
```

### Why SSE Over WebSocket for the Browser Connection

| Factor | SSE | WebSocket |
|--------|-----|-----------|
| Direction | Server → Client only | Bidirectional |
| Forum fit | Good fit: users read far more than they write | Overkill |
| User writes | Standard HTMX POST/PUT (already works) | Would need `ws-send` |
| Infrastructure | Plain HTTP; works through most proxies, CDNs, and load balancers as long as response buffering is off | Needs sticky sessions, special proxy config |
| Reconnection | Browser `EventSource` auto-reconnects natively | Extension handles it |
| Graceful degradation | If SSE breaks, forum still works as normal HTTP | Same |
| HTMX integration | `sse-swap` maps events to DOM targets declaratively | OOB swap by ID only |
| HTTP/2 concern | One HTTP stream per SSE connection; HTTP/2 multiplexes them over a single TCP connection (HTTP/1.1 browsers cap connections per origin at about six) | Separate TCP connection |

**Forum interactions are fundamentally asymmetric.** Users spend 95% of their time reading. SSE handles the high-volume server→client push (new replies, presence, typing indicators). Standard HTMX POST handles the low-volume client→server actions (submitting replies, reacting). WebSocket's bidirectionality is wasted here.

---

## Layer 1: Jetstream Consumer (AppView)

### Connection Setup

Use `@skyware/jetstream` for the MVP: it provides typed JSON events with cursor management. The full `@atproto/sync` firehose (CBOR + signatures) is available for post-MVP hardening.
```typescript
// packages/appview/src/firehose/consumer.ts
import { EventEmitter } from "node:events"; // value import: instantiated below
import { Jetstream } from "@skyware/jetstream";

interface FirehoseConsumer {
  start(): void;
  stop(): void;
  events: EventEmitter; // broadcast bus
}

function createFirehoseConsumer(db: Database): FirehoseConsumer {
  const events = new EventEmitter();
  // Every SSE client adds a listener, so lift the default 10-listener warning
  events.setMaxListeners(0);

  const jetstream = new Jetstream({
    endpoint: "wss://jetstream2.us-east.bsky.network/subscribe",
    wantedCollections: ["space.atbb.*"],
    cursor: loadCursorFromDb(db), // microsecond timestamp
  });

  // Index new posts and broadcast for SSE
  jetstream.onCreate("space.atbb.post", async (event) => {
    const { did, commit, time_us } = event;
    const post = await indexPost(db, did, commit.rkey, commit.cid, commit.record);
    saveCursor(db, time_us);

    // Determine broadcast channel
    if (post.rootPostId) {
      // It's a reply: broadcast to the thread channel
      events.emit(`topic:${post.rootPostId}`, { type: "newReply", post });
    } else {
      // It's a new topic: broadcast to the category channel
      // NOTE: Current schema has `forumUri` not `categoryId`. To route
      // "new topic in category X" events, need to either: (a) resolve
      // category from forum metadata, or (b) add categoryUri to posts table.
      // For now, broadcast to forum-level channel:
      events.emit(`forum:${post.forumUri}`, { type: "newTopic", post });
    }

    // Always broadcast to global (for notification badges, etc.)
    events.emit("global", { type: "newPost", post });
  });

  jetstream.onDelete("space.atbb.post", async (event) => {
    const { did, commit, time_us } = event;
    const post = await softDeletePost(db, did, commit.rkey);
    saveCursor(db, time_us);
    if (post) {
      // NOTE: rootPostId is unset when the deleted post is itself a topic root
      events.emit(`topic:${post.rootPostId}`, { type: "postDeleted", post });
    }
  });

  // Index other record types similarly...
  jetstream.onCreate("space.atbb.forum.category", async (event) => {
    await indexCategory(db, event.did, event.commit.rkey, event.commit.record);
    saveCursor(db, event.time_us);
    events.emit("global", { type: "categoryUpdate" });
  });

  jetstream.onCreate("space.atbb.reaction", async (event) => {
    const reaction = await indexReaction(db, event.did, event.commit);
    saveCursor(db, event.time_us);
    events.emit(`topic:${reaction.topicId}`, { type: "newReaction", reaction });
  });

  return {
    start: () => jetstream.start(),
    stop: () => jetstream.close(),
    events,
  };
}
```

### Jetstream Filtering

Jetstream supports NSID prefix wildcards:

```
?wantedCollections=space.atbb.*
```

This catches `space.atbb.post`, `space.atbb.forum.forum`, `space.atbb.forum.category`, `space.atbb.membership`, `space.atbb.reaction`, `space.atbb.modAction`: everything in the atBB namespace.

**Note:** Wildcard syntax is supported as long as the prefix (`space.atbb`) passes NSID validation, which it does. Jetstream allows up to 100 collection filters per connection. If wildcard filtering proves problematic in practice, the collections listed above can be enumerated explicitly.

### Cursor Management

Jetstream events have a `time_us` field (Unix microseconds). Persist this as a cursor:

```typescript
// Save every 100 events (saving on every event would hammer the DB)
let eventsSinceSave = 0;

function saveCursor(db: Database, cursor: number) {
  eventsSinceSave++;
  if (eventsSinceSave >= 100) {
    db.execute("UPDATE firehose_state SET cursor = $1", [cursor]);
    eventsSinceSave = 0;
  }
}

// On reconnect, rewind 5 seconds for safety (process events idempotently)
function loadCursorFromDb(db: Database): number | undefined {
  const row = db.queryOne("SELECT cursor FROM firehose_state");
  if (!row?.cursor) return undefined;
  return row.cursor - 5_000_000; // 5 seconds in microseconds
}
```

**Backfill window:** Jetstream retains ~24 hours of events. If the AppView is offline longer, fall back to `com.atproto.sync.getRepo` for known DIDs.
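Because the cursor is rewound on startup and only persisted every 100 events, the AppView will see some events twice. The sketch below shows one way to make that replay harmless, under stated assumptions: a `Map` stands in for the posts table, and `CommitEvent`, `PostRow`, `recordUri`, `applyEvent`, and `rewindCursor` are hypothetical names for illustration, not part of `@skyware/jetstream` or the atBB schema. The key idea is to upsert on the record's AT-URI and ignore any event that is not newer than what is already stored, so a replayed create cannot undo a later delete.

```typescript
// Hypothetical sketch: a Map stands in for the posts table.

/** The subset of a Jetstream commit event this sketch needs (assumed shape). */
interface CommitEvent {
  did: string;
  time_us: number;
  commit: {
    operation: "create" | "update" | "delete";
    collection: string;
    rkey: string;
    cid?: string;
    record?: { text: string };
  };
}

interface PostRow {
  uri: string;
  cid: string;
  text: string;
  deleted: boolean;
  indexedAtUs: number; // time_us of the last event applied to this row
}

/** AT-URI of the record, used as the idempotency key. */
function recordUri(e: CommitEvent): string {
  return `at://${e.did}/${e.commit.collection}/${e.commit.rkey}`;
}

/** Apply one event; replaying an already-applied event leaves the table unchanged. */
function applyEvent(table: Map<string, PostRow>, e: CommitEvent): void {
  const uri = recordUri(e);
  const existing = table.get(uri);

  // Skip anything not newer than the stored state (this is the replay guard).
  if (existing && existing.indexedAtUs >= e.time_us) return;

  if (e.commit.operation === "delete") {
    // Soft delete; deleting an unknown record is a no-op.
    if (existing) {
      table.set(uri, { ...existing, deleted: true, indexedAtUs: e.time_us });
    }
    return;
  }

  // create/update: upsert keyed on the URI
  table.set(uri, {
    uri,
    cid: e.commit.cid ?? "",
    text: e.commit.record?.text ?? "",
    deleted: false,
    indexedAtUs: e.time_us,
  });
}

/** Rewind a microsecond cursor by `seconds`, clamping at zero. */
function rewindCursor(cursorUs: number, seconds = 5): number {
  return Math.max(0, cursorUs - seconds * 1_000_000);
}
```

In PostgreSQL the same guard could plausibly be expressed as an `INSERT ... ON CONFLICT (uri) DO UPDATE ... WHERE` clause that compares the stored timestamp with the incoming one; the column names would depend on the actual schema.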
---

## Layer 2: Event Bus → SSE Endpoints (Web Package)

The web package subscribes to the AppView's event bus and renders HTML fragments streamed to the browser.

### Option A: Internal Event Bus (Single-Process)

If appview and web run in the same process (or web calls an appview SSE endpoint):

```typescript
// packages/appview/src/routes/events.ts
import { Hono } from "hono";
import { streamSSE } from "hono/streaming";

// firehoseConsumer, ForumEvent, and renderEventComponent are defined elsewhere
const app = new Hono();

// SSE endpoint for a specific thread
app.get("/api/events/topic/:id", async (c) => {
  const topicId = c.req.param("id");

  return streamSSE(c, async (stream) => {
    // Send initial connection confirmation
    await stream.writeSSE({ event: "connected", data: "ok" });

    // Heartbeat to prevent proxy timeouts
    const heartbeat = setInterval(async () => {
      await stream.writeSSE({ event: "heartbeat", data: "" });
    }, 30_000);

    // Subscribe to topic events
    const handler = async (event: ForumEvent) => {
      // Hono's streamSSE accepts JSX directly in the data field
      await stream.writeSSE({
        event: event.type, // "newReply", "newReaction", "postDeleted"
        data: renderEventComponent(event), // Returns JSX element
      });
    };
    firehoseConsumer.events.on(`topic:${topicId}`, handler);

    // streamSSE closes the response as soon as this callback returns, so
    // stay pending until the client disconnects, then clean up.
    await new Promise<void>((resolve) => {
      stream.onAbort(() => {
        clearInterval(heartbeat);
        firehoseConsumer.events.off(`topic:${topicId}`, handler);
        resolve();
      });
    });
  });
});

// SSE endpoint for a category (new topics)
// NOTE: the consumer currently emits new topics on `forum:<uri>`; nothing is
// emitted on `category:<id>` until category routing is resolved.
app.get("/api/events/category/:id", async (c) => {
  const categoryId = c.req.param("id");

  return streamSSE(c, async (stream) => {
    const heartbeat = setInterval(async () => {
      await stream.writeSSE({ event: "heartbeat", data: "" });
    }, 30_000);

    const handler = async (event: ForumEvent) => {
      await stream.writeSSE({ event: event.type, data: renderEventComponent(event) });
    };
    firehoseConsumer.events.on(`category:${categoryId}`, handler);

    // Keep the stream open until the client disconnects (see above)
    await new Promise<void>((resolve) => {
      stream.onAbort(() => {
        clearInterval(heartbeat);
        firehoseConsumer.events.off(`category:${categoryId}`, handler);
        resolve();
      });
    });
  });
});
```

### Option B: AppView Exposes SSE, Web Proxies It

If appview and web are separate processes, the web package can either:

1. Proxy the SSE stream from appview directly
2. Consume appview SSE internally and re-emit with HTML rendering

Option 1 is simpler: the appview SSE endpoint returns HTML fragments, and the web package's JSX templates include `sse-connect` pointing at the appview.

### HTML Fragment Rendering

The key insight: SSE events carry **pre-rendered HTML fragments**, not JSON. This is what lets HTMX SSE work with zero custom client-side JS.

```typescript
// packages/web/src/components/ReplyCard.tsx
import type { FC } from "hono/jsx";

interface ReplyCardProps {
  author: string;
  authorDid: string;
  text: string;
  createdAt: string;
  replyCount?: number;
}

export const ReplyCard: FC<ReplyCardProps> = (props) => (
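  // Markup is illustrative; element names and classes are assumptions
  <article class="reply-card">
    <header>
      <strong>{props.author}</strong>
      <time datetime={props.createdAt}>{props.createdAt}</time>
    </header>
    <p>{props.text}</p>
  </article>
);

// Render JSX component for SSE
function renderReplyComponent(post: IndexedPost) {
  // Hono's streamSSE accepts JSX directly, so no manual string conversion is needed.
  // Alternative if a string is needed: component.toString()
  // Prop mapping is illustrative; the IndexedPost field names are assumptions.
  return (
    <ReplyCard
      author={post.authorHandle}
      authorDid={post.did}
      text={post.text}
      createdAt={post.createdAt}
    />
  );
}
```

---

## Layer 3: HTMX SSE in the Browser

### Topic View (Thread Page)

```tsx
// packages/web/src/routes/topic.tsx
export const TopicView: FC = ({ topic, replies }) => (
  // SSE connection scoped to this thread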
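  <div hx-ext="sse" sse-connect={`/sse/thread/${topic.id}`}>
    {/* Markup is illustrative: ids, URLs, and event names other than "newReply" are assumptions */}

    {/* Thread header */}
    <article class="topic">
      <h1>{topic.title}</h1>
      <div class="topic-body">{topic.text}</div>
    </article>

    {/* Reply list: new replies streamed in at the bottom */}
    {/* This target receives new replies via SSE */}
    <div id="replies" sse-swap="newReply" hx-swap="beforeend">
      {replies.map((reply) => (
        <ReplyCard {...reply} />
      ))}
    </div>

    {/* Reaction updates swap into specific post elements via OOB */}
    {/* (The SSE "newReaction" event sends OOB-targeted HTML) */}

    {/* Reply count badge, updated in real time */}
    <span id="reply-count" sse-swap="replyCount">{replies.length} replies</span>

    {/* Typing indicator */}
    <div id="typing-indicator" sse-swap="typing"></div>

    {/* Reply form: standard HTMX POST (not SSE) */}
    <form hx-post={`/topics/${topic.id}/replies`} hx-swap="none">
      <textarea name="text" required></textarea>
      <button type="submit">Reply</button>
    </form>
  </div>
);
```

### Category View (Topic List)

```tsx
export const CategoryView: FC = ({ category, topics }) => (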

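  <div hx-ext="sse" sse-connect={`/sse/category/${category.id}`}>
    {/* Markup is illustrative: TopicRow and the swap strategy are assumptions */}
    <h1>{category.name}</h1>
    <p>{category.description}</p>
    <table>
      <thead>
        <tr>
          <th>Topic</th>
          <th>Author</th>
          <th>Replies</th>
          <th>Last Post</th>
        </tr>
      </thead>
      {/* New topics are prepended as "newTopic" events arrive */}
      <tbody sse-swap="newTopic" hx-swap="afterbegin">
        {topics.map((topic) => <TopicRow topic={topic} />)}
      </tbody>
    </table>
  </div>
);
```

### Global Notification Badge

This could be placed in the base layout so it works on every page:

```tsx
// packages/web/src/layouts/base.tsx
export const BaseLayout: FC = (props) => (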