
Real-Time Architecture: Firehose → SSE → HTMX

Deep-dive research conducted 2026-02-07


The Opportunity

Every traditional forum (phpBB, Discourse, Flarum) works on a request-response model. You load a topic page. It's a snapshot. If someone replies while you're reading, you don't see it until you manually refresh. Discourse has real-time features via its MessageBus library (long polling backed by Redis), but it's a bolt-on: a separate infrastructure layer built on top of their standard Rails app.

atBB is different. The AT Protocol already has a real-time event stream (the firehose/Jetstream). The AppView already needs to subscribe to it for indexing. The web UI already uses HTMX, which has declarative SSE support. The entire pipeline exists — it just needs to be connected.

The data flow:

User writes post    AT Proto      Jetstream       atBB           Browser
to their PDS  ───▶  Relay    ───▶ (JSON WS)  ───▶ AppView  ───▶ (HTMX SSE)
                                                  indexes +
                                                  broadcasts

End-to-end latency: ~1–2 seconds from PDS write to browser update. That's fast enough to feel "live" without feeling like chat.


Architecture Overview

Three Layers

┌─────────────────────────────────────────────────────┐
│                    Browser (HTMX)                    │
│                                                      │
│  ┌────────────┐  ┌──────────┐  ┌─────────────────┐  │
│  │ Topic View │  │ Category │  │ Notification    │  │
│  │ sse-swap=  │  │ View     │  │ Badge           │  │
│  │ "newReply" │  │ sse-swap=│  │ sse-swap=       │  │
│  │            │  │ "newTopic│  │ "notification"  │  │
│  └─────┬──────┘  └────┬─────┘  └───────┬─────────┘  │
│        │              │                │             │
│        └──────────────┴────────────────┘             │
│                       │ SSE                          │
└───────────────────────┼──────────────────────────────┘
                        │
┌───────────────────────┼──────────────────────────────┐
│              @atbb/web (Hono)                        │
│                       │                              │
│            GET /sse/thread/:id ──────┐               │
│            GET /sse/category/:id ────┤  SSE          │
│            GET /sse/global ──────────┤  Endpoints    │
│                                      │               │
│            ┌─────────────────────────┘               │
│            │ Subscribe to AppView event bus          │
│            │ Render HTML fragments                   │
│            │ Stream as SSE events                    │
└────────────┼─────────────────────────────────────────┘
             │
┌────────────┼─────────────────────────────────────────┐
│            │     @atbb/appview (Hono)                │
│            │                                         │
│  ┌─────────▼──────────┐   ┌────────────────────────┐ │
│  │   Event Bus        │   │   Jetstream Consumer   │ │
│  │   (in-process)     │◀──│                        │ │
│  │                    │   │   space.atbb.*          │ │
│  │   topic:abc →      │   │   filter + index       │ │
│  │     [subscriber1]  │   │                        │ │
│  │     [subscriber2]  │   │   Cursor persistence   │ │
│  │   category:xyz →   │   │   Reconnection logic   │ │
│  │     [subscriber3]  │   └────────────┬───────────┘ │
│  └────────────────────┘                │             │
│                                        │             │
│  ┌─────────────────────────────────────▼───────────┐ │
│  │              PostgreSQL                         │ │
│  │   posts | categories | users | memberships      │ │
│  └─────────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────┘
             │
             │ WebSocket (JSON)
             ▼
┌──────────────────────────────────────────────────────┐
│              Jetstream                               │
│   wss://jetstream2.us-east.bsky.network/subscribe    │
│   ?wantedCollections=space.atbb.*                    │
└──────────────────────────────────────────────────────┘

Why SSE Over WebSocket for the Browser Connection

| Factor | SSE | WebSocket |
|---|---|---|
| Direction | Server → Client only | Bidirectional |
| Forum fit | Good fit: users read far more than they write | Overkill |
| User writes | Standard HTMX POST/PUT (already works) | Would need ws-send |
| Infrastructure | Plain HTTP; works through most proxies, CDNs, and load balancers (response buffering must be off) | Needs sticky sessions, special proxy config |
| Reconnection | Browser EventSource auto-reconnects natively | Extension handles it |
| Graceful degradation | If SSE breaks, forum still works as normal HTTP | Same |
| HTMX integration | sse-swap maps events to DOM targets declaratively | OOB swap by ID only |
| HTTP/2 concern | Uses one connection per stream (H/2 multiplexes) | Separate TCP connection |

Forum interactions are fundamentally asymmetric: users spend the vast majority of their time reading. SSE handles the high-volume server→client push (new replies, presence, typing indicators). Standard HTMX POST handles the low-volume client→server actions (submitting replies, reacting). WebSocket's bidirectionality is wasted here.
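
To make the server→client half concrete, here is a minimal sketch of the text/event-stream framing an SSE endpoint writes for each event. The helper name `formatSseFrame` is hypothetical (Hono's `streamSSE` handles framing internally); it only illustrates the wire format: an optional `event:` line, one `data:` line per line of payload, and a blank line that terminates the frame.

```typescript
// Hypothetical helper illustrating the SSE wire format (not part of atBB or Hono).
interface SseMessage {
  event?: string; // event name, matched by sse-swap="..." in the browser
  data: string;   // payload; for HTMX this is an HTML fragment
  id?: string;    // optional event id
}

function formatSseFrame(msg: SseMessage): string {
  const lines: string[] = [];
  if (msg.event) lines.push(`event: ${msg.event}`);
  if (msg.id) lines.push(`id: ${msg.id}`);
  // A payload containing newlines is split into one "data:" line per line
  for (const line of msg.data.split(/\r\n|\r|\n/)) {
    lines.push(`data: ${line}`);
  }
  // A blank line terminates the frame
  return lines.join("\n") + "\n\n";
}

const frame = formatSseFrame({
  event: "newReply",
  data: '<article class="reply">\n  <p>Hello</p>\n</article>',
});
console.log(frame);
```

The browser joins the `data:` lines back together with newlines, so a multi-line HTML fragment arrives intact.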


Layer 1: Jetstream Consumer (AppView)

Connection Setup

Use @skyware/jetstream for MVP — it provides typed JSON events with cursor management. The full @atproto/sync firehose (CBOR + signatures) is available for post-MVP hardening.

// packages/appview/src/firehose/consumer.ts

import { Jetstream } from "@skyware/jetstream";
import { EventEmitter } from "node:events"; // value import: instantiated below

interface FirehoseConsumer {
  start(): void;
  stop(): void;
  events: EventEmitter; // broadcast bus
}

function createFirehoseConsumer(db: Database): FirehoseConsumer {
  const events = new EventEmitter();

  const jetstream = new Jetstream({
    endpoint: "wss://jetstream2.us-east.bsky.network/subscribe",
    wantedCollections: ["space.atbb.*"],
    cursor: loadCursorFromDb(db),  // microsecond timestamp
  });

  // Index new posts and broadcast for SSE
  jetstream.onCreate("space.atbb.post", async (event) => {
    const { did, commit, time_us } = event;
    const post = await indexPost(db, did, commit.rkey, commit.cid, commit.record);
    saveCursor(db, time_us);

    // Determine broadcast channel
    if (post.rootPostId) {
      // It's a reply — broadcast to the thread channel
      events.emit(`topic:${post.rootPostId}`, { type: "newReply", post });
    } else {
      // It's a new topic — broadcast to the category channel
      // NOTE: Current schema has `forumUri` not `categoryId`. To route
      // "new topic in category X" events, need to either: (a) resolve
      // category from forum metadata, or (b) add categoryUri to posts table.
      // For now, broadcast to forum-level channel:
      events.emit(`forum:${post.forumUri}`, { type: "newTopic", post });
    }

    // Always broadcast to global (for notification badges, etc.)
    events.emit("global", { type: "newPost", post });
  });

  jetstream.onDelete("space.atbb.post", async (event) => {
    const { did, commit, time_us } = event;
    const post = await softDeletePost(db, did, commit.rkey);
    saveCursor(db, time_us);

    if (post) {
      events.emit(`topic:${post.rootPostId}`, { type: "postDeleted", post });
    }
  });

  // Index other record types similarly...
  jetstream.onCreate("space.atbb.forum.category", async (event) => {
    await indexCategory(db, event.did, event.commit.rkey, event.commit.record);
    saveCursor(db, event.time_us);
    events.emit("global", { type: "categoryUpdate" });
  });

  jetstream.onCreate("space.atbb.reaction", async (event) => {
    const reaction = await indexReaction(db, event.did, event.commit);
    saveCursor(db, event.time_us);
    events.emit(`topic:${reaction.topicId}`, { type: "newReaction", reaction });
  });

  return {
    start: () => jetstream.start(),
    stop: () => jetstream.close(),
    events,
  };
}
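
The channel routing inside those handlers can be pulled out into pure functions, which makes it testable without a Jetstream connection. The sketch below is illustrative only: `RoutablePost`, `routeCreate`, and `routeDelete` are hypothetical names, and the `id` field is an assumption (an identifier in the same id space as `rootPostId`). It also covers a case the handlers above leave open: deleting a topic starter, whose `rootPostId` is empty.

```typescript
// Hypothetical shapes; field names mirror the consumer code, `id` is an assumption.
interface RoutablePost {
  id: string;                // identifier of this post (assumed field)
  rootPostId: string | null; // null when the post starts a topic
  forumUri: string;
}

interface Route {
  channel: string;
  type: "newReply" | "newTopic" | "newPost" | "postDeleted";
}

function routeCreate(post: RoutablePost): Route[] {
  const routes: Route[] = post.rootPostId
    ? [{ channel: `topic:${post.rootPostId}`, type: "newReply" }]
    : [{ channel: `forum:${post.forumUri}`, type: "newTopic" }];
  // Every new post also goes to the global channel
  routes.push({ channel: "global", type: "newPost" });
  return routes;
}

function routeDelete(post: RoutablePost): Route[] {
  // A deleted topic starter has no rootPostId; fall back to its own id
  // so the event never lands on a "topic:null" channel.
  const topicKey = post.rootPostId ?? post.id;
  return [{ channel: `topic:${topicKey}`, type: "postDeleted" }];
}

const reply: RoutablePost = { id: "p2", rootPostId: "p1", forumUri: "at://forum" };
console.log(routeCreate(reply).map((r) => r.channel)); // topic channel, then global
```

Each handler would then loop over the returned routes and call `events.emit(route.channel, ...)`.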

Jetstream Filtering

Jetstream supports NSID prefix wildcards:

?wantedCollections=space.atbb.*

This catches space.atbb.post, space.atbb.forum.forum, space.atbb.forum.category, space.atbb.membership, space.atbb.reaction, space.atbb.modAction — everything in the atBB namespace.

Note: Wildcard syntax is supported as long as the prefix (space.atbb) passes NSID validation, which it does. Jetstream allows up to 100 collection filters per connection. If wildcard filtering proves problematic in practice, the collections listed above can be enumerated explicitly.
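
As a rough illustration of the two options (wildcard versus explicit enumeration), the sketch below builds the subscribe URL by hand and mimics the prefix match locally. `buildSubscribeUrl` and `matchesFilter` are hypothetical helpers, and the matcher is only an approximation of the behavior described above, not Jetstream's actual implementation.

```typescript
// Local approximation of the wildcard semantics described above (illustrative only).
function matchesFilter(filter: string, nsid: string): boolean {
  if (filter.endsWith(".*")) {
    // "space.atbb.*" -> prefix "space.atbb." (trailing dot kept)
    return nsid.startsWith(filter.slice(0, -1));
  }
  return filter === nsid;
}

const MAX_WANTED_COLLECTIONS = 100; // per-connection limit noted above

function buildSubscribeUrl(endpoint: string, collections: string[], cursorUs?: number): string {
  if (collections.length > MAX_WANTED_COLLECTIONS) {
    throw new Error(`too many collection filters: ${collections.length}`);
  }
  // Each filter is sent as its own repeated wantedCollections parameter
  const params = collections.map((c) => `wantedCollections=${encodeURIComponent(c)}`);
  if (cursorUs !== undefined) params.push(`cursor=${cursorUs}`);
  return `${endpoint}?${params.join("&")}`;
}

const jetstreamEndpoint = "wss://jetstream2.us-east.bsky.network/subscribe";
console.log(buildSubscribeUrl(jetstreamEndpoint, ["space.atbb.*"]));
// Explicit fallback, using two of the collections named in this document
console.log(buildSubscribeUrl(jetstreamEndpoint, ["space.atbb.post", "space.atbb.reaction"]));
console.log(matchesFilter("space.atbb.*", "space.atbb.forum.category")); // true
console.log(matchesFilter("space.atbb.*", "space.atbbx.post"));          // false
```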

Cursor Management

Jetstream events have a time_us field (Unix microseconds). Persist this as a cursor:

// Save every 100 events (not every event — that would hammer the DB)
let eventsSinceSave = 0;
function saveCursor(db: Database, cursor: number) {
  eventsSinceSave++;
  if (eventsSinceSave >= 100) {
    db.execute("UPDATE firehose_state SET cursor = $1", [cursor]);
    eventsSinceSave = 0;
  }
}

// On reconnect, rewind 5 seconds for safety (process events idempotently)
function loadCursorFromDb(db: Database): number | undefined {
  const row = db.queryOne("SELECT cursor FROM firehose_state");
  if (!row?.cursor) return undefined;
  return row.cursor - 5_000_000; // 5 seconds in microseconds
}
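
One consequence of the 5-second rewind is that events inside that window are delivered a second time after a reconnect. Indexing can absorb this with upserts, but rebroadcasting would put duplicate replies into open pages. A minimal sketch of a guard, assuming each event can be keyed by something stable such as DID + collection + rkey (the `RecentlySeen` class and the key format are hypothetical):

```typescript
// Hypothetical bounded "recently seen" set, keyed by a stable record identity.
class RecentlySeen {
  private seen = new Set<string>();
  private capacity: number;

  constructor(capacity: number = 10_000) {
    this.capacity = capacity;
  }

  /** Returns true the first time a key is seen, false on replays. */
  firstSight(key: string): boolean {
    if (this.seen.has(key)) return false;
    this.seen.add(key);
    if (this.seen.size > this.capacity) {
      // Sets iterate in insertion order, so the first value is the oldest
      const oldest = this.seen.values().next().value;
      if (oldest !== undefined) this.seen.delete(oldest);
    }
    return true;
  }
}

const recent = new RecentlySeen();
const key = "did:plc:example/space.atbb.post/3kabc"; // illustrative key format
console.log(recent.firstSight(key)); // true  -> index and broadcast
console.log(recent.firstSight(key)); // false -> replayed event, skip the broadcast
```

The capacity only needs to cover the events that can arrive within the rewind window, so a small in-memory set should be enough for a single forum.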

Backfill window: Jetstream retains ~24 hours of events. If the AppView is offline longer, fall back to com.atproto.sync.getRepo for known DIDs.
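
The startup decision this implies can be sketched as a small pure function. `planResume` and `ResumePlan` are hypothetical names, and the 24-hour constant simply encodes the approximate retention figure above rather than a guaranteed limit.

```typescript
const REWIND_US = 5_000_000;                   // 5 s safety rewind, as above
const RETENTION_US = 24 * 60 * 60 * 1_000_000; // ~24 h, the approximate window stated above

type ResumePlan =
  | { mode: "live" }                      // no saved cursor: start from the live tail
  | { mode: "replay"; cursor: number }    // gap is within the retention window
  | { mode: "backfill"; gapUs: number };  // gap too large: replay alone would miss events

function planResume(savedCursorUs: number | undefined, nowUs: number): ResumePlan {
  if (savedCursorUs === undefined) return { mode: "live" };
  const gapUs = nowUs - savedCursorUs;
  if (gapUs > RETENTION_US) return { mode: "backfill", gapUs };
  return { mode: "replay", cursor: savedCursorUs - REWIND_US };
}

// Example: the AppView was down for about ten minutes
const nowUs = Date.now() * 1000;
console.log(planResume(nowUs - 10 * 60 * 1_000_000, nowUs).mode); // "replay"
```

In the "backfill" case the AppView would fetch repos for known DIDs first, then resume from the live stream.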


Layer 2: Event Bus → SSE Endpoints (Web Package)

The web package subscribes to the AppView's event bus and renders HTML fragments streamed to the browser.

Option A: Internal Event Bus (Single-Process)

If appview and web run in the same process (or web calls an appview SSE endpoint):

// packages/appview/src/routes/events.ts

import { Hono } from "hono";
import { streamSSE } from "hono/streaming";

const app = new Hono();

// SSE endpoint for a specific thread
app.get("/api/events/topic/:id", async (c) => {
  const topicId = c.req.param("id");

  return streamSSE(c, async (stream) => {
    // Send initial connection confirmation
    await stream.writeSSE({ event: "connected", data: "ok" });

    // Heartbeat to prevent proxy timeouts
    const heartbeat = setInterval(() => {
      // Ignore write failures here; the abort handler below does the cleanup
      stream.writeSSE({ event: "heartbeat", data: "" }).catch(() => {});
    }, 30_000);

    // Subscribe to topic events
    const handler = async (event: ForumEvent) => {
      // Hono's streamSSE accepts JSX directly in the data field
      await stream.writeSSE({
        event: event.type,  // "newReply", "newReaction", "postDeleted"
        data: renderEventComponent(event), // Returns JSX element
      });
    };

    firehoseConsumer.events.on(`topic:${topicId}`, handler);

    // streamSSE closes the stream as soon as this callback returns, so keep it
    // pending until the client disconnects, then clean up
    await new Promise<void>((resolve) => {
      stream.onAbort(() => {
        clearInterval(heartbeat);
        firehoseConsumer.events.off(`topic:${topicId}`, handler);
        resolve();
      });
    });
  });
});

// SSE endpoint for a category (new topics)
app.get("/api/events/category/:id", async (c) => {
  const categoryId = c.req.param("id");

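  return streamSSE(c, async (stream) => {
    const heartbeat = setInterval(() => {
      stream.writeSSE({ event: "heartbeat", data: "" }).catch(() => {});
    }, 30_000);

    const handler = async (event: ForumEvent) => {
      await stream.writeSSE({
        event: event.type,
        data: renderEventComponent(event)
      });
    };

    // NOTE: the consumer above currently emits new topics on `forum:${forumUri}`;
    // this channel only receives events once category routing exists.
    firehoseConsumer.events.on(`category:${categoryId}`, handler);

    // Keep the callback pending until disconnect (streamSSE closes on return)
    await new Promise<void>((resolve) => {
      stream.onAbort(() => {
        clearInterval(heartbeat);
        firehoseConsumer.events.off(`category:${categoryId}`, handler);
        resolve();
      });
    });
  });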
});

Option B: AppView Exposes SSE, Web Proxies It

If appview and web are separate processes, the web package can either:

  1. Proxy the SSE stream from appview directly
  2. Consume appview SSE internally and re-emit with HTML rendering

Option 1 is simpler — the appview SSE endpoint returns HTML fragments, and the web package's JSX templates include sse-connect pointing at the appview.

HTML Fragment Rendering

The key insight: SSE events carry pre-rendered HTML fragments, not JSON. This is what lets the HTMX SSE integration work with zero custom client-side JS.

// packages/web/src/components/ReplyCard.tsx

import type { FC } from "hono/jsx";

interface ReplyCardProps {
  author: string;
  authorDid: string;
  text: string;
  createdAt: string;
  replyCount?: number;
}

export const ReplyCard: FC<ReplyCardProps> = (props) => (
  <article class="reply" id={`reply-${props.authorDid}-${props.createdAt}`}>
    <header class="reply-meta">
      <a href={`/user/${props.authorDid}`} class="reply-author">
        {props.author}
      </a>
      <time datetime={props.createdAt}>
        {new Date(props.createdAt).toLocaleString()}
      </time>
    </header>
    <div class="reply-body">
      {props.text}
    </div>
  </article>
);

// Render JSX component for SSE.
// renderEventComponent (used by the SSE endpoints) would presumably dispatch
// on event.type to per-event renderers like this one.
function renderReplyComponent(post: IndexedPost) {
  // Hono's streamSSE accepts JSX directly, so no manual string conversion is needed
  return <ReplyCard
    author={post.authorHandle}
    authorDid={post.authorDid}
    text={post.text}
    createdAt={post.createdAt}
  />;
  // If a plain string is needed instead, call .toString() on the returned element
}

Layer 3: HTMX SSE in the Browser

Topic View (Thread Page)

// packages/web/src/routes/topic.tsx

export const TopicView: FC<TopicViewProps> = ({ topic, replies }) => (
  <BaseLayout title={topic.title}>
    {/* SSE connection scoped to this thread */}
    <div hx-ext="sse" sse-connect={`/api/events/topic/${topic.id}`}>

      {/* Thread header */}
      <article class="topic-op">
        <h1>{topic.title}</h1>
        <div class="post-body">{topic.text}</div>
        <div class="post-meta">
          by <a href={`/user/${topic.authorDid}`}>{topic.author}</a>
          {" · "}
          <time datetime={topic.createdAt}>{topic.createdAt}</time>
        </div>
      </article>

      {/* Reply list — new replies streamed in at the bottom */}
      <section id="replies">
        {replies.map((reply) => (
          <ReplyCard {...reply} />
        ))}

        {/* This target receives new replies via SSE */}
        <div sse-swap="newReply" hx-swap="beforebegin"></div>
      </section>

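      {/* Reaction updates swap into specific post elements via OOB. */}
      {/* The SSE "newReaction" event sends OOB-targeted HTML; htmx only handles */}
      {/* events that some element listens for, so a listener element is needed. */}
      <div sse-swap="newReaction" hx-swap="none"></div>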

      {/* Reply count badge — updated in real-time */}
      <span id="reply-count" sse-swap="replyCount">
        {replies.length} replies
      </span>

      {/* Typing indicator */}
      <div id="typing-indicator" sse-swap="typing"></div>

    </div>

    {/* Reply form — standard HTMX POST (not SSE) */}
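    {/* hx-on--after-request is htmx's JSX-friendly spelling of hx-on::after-request */}
    <form hx-post={`/api/topics/${topic.id}/reply`}
          hx-swap="none"
          hx-on--after-request="if (event.detail.successful) this.reset()">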
      <textarea name="text" placeholder="Write a reply..."
                required minlength="1" maxlength="3000"></textarea>
      <button type="submit">Reply</button>
    </form>
  </BaseLayout>
);
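
For the out-of-band reaction updates mentioned in the topic view, the SSE payload would be a fragment whose root element carries its own target id. The sketch below builds one as a plain string to show the shape. `renderReactionOob` and the `reactions-{postId}` id scheme are assumptions, not part of the existing components, and in practice a JSX component would likely be used instead.

```typescript
// Minimal HTML escaping for text and attribute values
function escapeHtml(value: string): string {
  return value
    .replace(/&/g, "&amp;")
    .replace(/</g, "&lt;")
    .replace(/>/g, "&gt;")
    .replace(/"/g, "&quot;")
    .replace(/'/g, "&#39;");
}

// Hypothetical: assumes each post renders <span id="reactions-{postId}"> somewhere in the thread.
function renderReactionOob(postId: string, count: number): string {
  const id = `reactions-${escapeHtml(postId)}`;
  // hx-swap-oob="true" tells htmx to replace the element with the same id
  return `<span id="${id}" hx-swap-oob="true">${count} reactions</span>`;
}

console.log(renderReactionOob("3kabc", 4));
// <span id="reactions-3kabc" hx-swap-oob="true">4 reactions</span>
```

For htmx to process such a fragment, some element inside the `sse-connect` container has to listen for the event name; an empty element with `sse-swap="newReaction"` and `hx-swap="none"` should be enough, since out-of-band parts are swapped by id regardless of the listener's own swap style.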

Category View (Topic List)

export const CategoryView: FC<CategoryViewProps> = ({ category, topics }) => (
  <BaseLayout title={category.name}>
    <div hx-ext="sse" sse-connect={`/api/events/category/${category.id}`}>

      <h1>{category.name}</h1>
      <p>{category.description}</p>

      <table class="topic-list">
        <thead>
          <tr>
            <th>Topic</th>
            <th>Author</th>
            <th>Replies</th>
            <th>Last Post</th>
          </tr>
        </thead>
        <tbody id="topic-list-body"
              sse-swap="newTopic"
              hx-swap="afterbegin">
          {topics.map((topic) => <TopicRow {...topic} />)}
        </tbody>
      </table>

    </div>
  </BaseLayout>
);

Global Notification Badge

This could be placed in the base layout so it works on every page:

// packages/web/src/layouts/base.tsx

export const BaseLayout: FC<BaseLayoutProps> = (props) => (
  <html>
    <head>
      <script src="https://unpkg.com/htmx.org@2.0.4" />
      <script src="https://unpkg.com/htmx-ext-sse@2.2.3/sse.js" />
    </head>
    <body hx-boost="true">
      <header hx-ext="sse" sse-connect="/api/events/global">
        <nav>
          <a href="/">atBB Forum</a>
          {/* Notification badge — updated live */}
          <span id="notification-badge" sse-swap="notification"></span>
        </nav>
      </header>
      <main>
        {props.children}
      </main>
    </body>
  </html>
);

What This Enables (Concrete Features)

Tier 1: Easy Wins (MVP-compatible)

| Feature | SSE Event | Behavior |
|---|---|---|
| Live replies | newReply | New reply appears at bottom of thread without refresh |
| Live new topics | newTopic | New topic appears at top of category view |
| Reply count | replyCount | Reply count badge updates in real-time |
| Post deletion | postDeleted | Deleted post fades out or shows "[deleted]" |
| Mod actions | modAction | Locked/pinned status updates live |

Tier 2: Enhanced UX (Post-MVP)

| Feature | SSE Event | Behavior |
|---|---|---|
| Typing indicator | typing | "User X is typing..." shown below reply list |
| Online presence | presence | "12 users viewing this topic" |
| Reaction animations | newReaction | Like/upvote count increments with animation |
| Unread badges | notification | Global nav shows unread count for subscribed topics |
| Topic bumping | topicBumped | Topic list reorders when a topic gets a new reply |
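
The presence feature falls out of the SSE connections themselves: count open streams per topic and broadcast the number when it changes. A minimal sketch, with `PresenceTracker` as a hypothetical helper:

```typescript
// Hypothetical per-topic connection counter for the "presence" event.
class PresenceTracker {
  private counts = new Map<string, number>();

  /** Call when an SSE stream for the topic opens; returns the new count. */
  join(topicId: string): number {
    const next = (this.counts.get(topicId) ?? 0) + 1;
    this.counts.set(topicId, next);
    return next;
  }

  /** Call from the stream's abort handler; returns the new count. */
  leave(topicId: string): number {
    const next = Math.max(0, (this.counts.get(topicId) ?? 0) - 1);
    if (next === 0) this.counts.delete(topicId); // avoid leaking idle topics
    else this.counts.set(topicId, next);
    return next;
  }

  viewers(topicId: string): number {
    return this.counts.get(topicId) ?? 0;
  }
}

const presence = new PresenceTracker();
presence.join("abc");
presence.join("abc");
presence.leave("abc");
console.log(presence.viewers("abc")); // 1
```

Note that this counts connections, not people: one user with two tabs open on the same topic is counted twice unless connections are also keyed by session.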

Tier 3: Differentiators (Future)

| Feature | Description |
|---|---|
| Cross-forum activity feed | Since AT Proto identities span forums, show activity from all forums a user participates in |
| Live moderation dashboard | Stream mod queue events in real-time |
| "Someone replied to your post" toasts | Non-intrusive notification popups |

Scaling Considerations

Single Instance (MVP)

For MVP, an in-process EventEmitter is sufficient:

Jetstream → AppView process (index + EventEmitter) → SSE streams to browsers

No external infrastructure needed. The EventEmitter holds subscriber lists in memory.

Capacity: A single Node.js process can comfortably hold 1,000+ SSE connections. For a self-hosted forum, this is more than enough.
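
One practical detail when using a bare EventEmitter this way: Node prints a MaxListenersExceededWarning once more than 10 listeners are attached to the same event name, and every SSE connection adds a listener to its channel. A small sketch of a channel bus that lifts the limit and returns an unsubscribe function (the `subscribe` helper is hypothetical):

```typescript
import { EventEmitter } from "node:events";

const bus = new EventEmitter();
// The default limit is 10 listeners per event name; 0 disables the warning.
bus.setMaxListeners(0);

function subscribe(channel: string, handler: (payload: unknown) => void): () => void {
  bus.on(channel, handler);
  // Call the returned function on disconnect to avoid leaking listeners
  return () => {
    bus.off(channel, handler);
  };
}

// Simulate 50 browsers watching the same topic
let delivered = 0;
const unsubscribers: Array<() => void> = [];
for (let i = 0; i < 50; i++) {
  unsubscribers.push(subscribe("topic:abc", () => { delivered++; }));
}

bus.emit("topic:abc", { type: "newReply" });
console.log(delivered); // 50

unsubscribers.forEach((off) => off());
console.log(bus.listenerCount("topic:abc")); // 0
```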

Multi-Instance (Production)

If atBB needs horizontal scaling:

Jetstream → AppView Instance 1 ──┐
                                  ├── Redis Pub/Sub ──┬── Web Instance 1 → SSE
Jetstream → AppView Instance 2 ──┘                    └── Web Instance 2 → SSE

  • Only one AppView instance should consume Jetstream (use leader election or a single indexer process)
  • That instance publishes events to Redis Pub/Sub
  • All web instances subscribe to Redis and stream to their connected clients
  • Alternative: Use PostgreSQL LISTEN/NOTIFY instead of Redis, which means one fewer dependency
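
Whichever transport is chosen, the message crossing process boundaries can stay small: identifiers only, with each web instance loading the post and rendering HTML itself. This matters most for LISTEN/NOTIFY, where PostgreSQL requires payloads shorter than 8000 bytes in the default configuration. A sketch, with `BusEnvelope` and its fields as hypothetical names:

```typescript
// Hypothetical envelope for cross-process fan-out: identifiers, not content.
interface BusEnvelope {
  channel: string; // e.g. "topic:abc"
  type: string;    // e.g. "newReply"
  postUri: string; // AT URI of the post (assumed field)
}

const MAX_NOTIFY_BYTES = 7999; // NOTIFY payloads must be shorter than 8000 bytes by default

function encodeEnvelope(env: BusEnvelope): string {
  const json = JSON.stringify(env);
  // A UTF-16 code unit encodes to at most 3 UTF-8 bytes, so length * 3 is a safe upper bound
  if (json.length * 3 > MAX_NOTIFY_BYTES) {
    throw new Error("envelope too large for NOTIFY; send identifiers, not content");
  }
  return json;
}

function decodeEnvelope(raw: string): BusEnvelope | null {
  try {
    const v = JSON.parse(raw) as Partial<BusEnvelope> | null;
    if (v && typeof v.channel === "string" && typeof v.type === "string" && typeof v.postUri === "string") {
      return { channel: v.channel, type: v.type, postUri: v.postUri };
    }
  } catch {
    // Malformed payloads are dropped
  }
  return null;
}

const wire = encodeEnvelope({
  channel: "topic:abc",
  type: "newReply",
  postUri: "at://did:plc:example/space.atbb.post/3kabc",
});
console.log(decodeEnvelope(wire)?.channel); // "topic:abc"
```

The receiving web instance would re-emit the decoded envelope on its local event bus, so the SSE endpoints stay unchanged.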

HTTP/2 Requirement

SSE connections hold an HTTP connection open. Under HTTP/1.1, browsers limit to 6 connections per domain — a user with multiple tabs could exhaust this. HTTP/2 multiplexes all streams over a single TCP connection, eliminating this issue.

Action: Ensure the deployment setup (Docker Compose with nginx/Caddy) uses HTTP/2. Caddy enables HTTP/2 by default.
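
The HTTP/1.1 arithmetic is worth spelling out, because a thread page in the layouts above opens two streams (the global one in the base layout plus the topic stream). A tiny sketch, where `freeHttp1Connections` is a hypothetical helper:

```typescript
const HTTP1_MAX_CONNECTIONS_PER_ORIGIN = 6; // the browser limit cited above

// streamsPerTab: e.g. 2 when a page holds the global stream plus a topic stream
function freeHttp1Connections(openTabs: number, streamsPerTab: number): number {
  return HTTP1_MAX_CONNECTIONS_PER_ORIGIN - openTabs * streamsPerTab;
}

console.log(freeHttp1Connections(2, 2)); // 2
console.log(freeHttp1Connections(3, 2)); // 0: further requests to the origin have to wait
```

With three thread tabs open over HTTP/1.1, every connection slot is held by an SSE stream, so ordinary page loads and form posts would stall. Under HTTP/2 the same streams share one connection.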

Proxy Configuration

SSE requires long-lived connections. Configure reverse proxy timeouts:

# nginx — for SSE routes only
location /api/events/ {
    proxy_pass http://appview:3000;
    proxy_http_version 1.1;
    proxy_set_header Connection "";
    proxy_buffering off;
    proxy_cache off;
    proxy_read_timeout 86400s;  # 24 hours
}

With Caddy, no special config is needed: it handles streaming correctly by default.


Implementation Roadmap

Phase 1 (Part of AppView Core milestone)

  1. Add Jetstream consumer (@skyware/jetstream with space.atbb.* filter)
  2. Add in-process EventEmitter — broadcast indexed events by channel
  3. Add SSE endpoint (/api/events/topic/:id using Hono's streamSSE)
  4. Persist cursor — store time_us in PostgreSQL, reload on restart

Phase 2 (Part of Web UI milestone)

  1. Add htmx-ext-sse — include SSE extension alongside HTMX in base layout
  2. Add sse-connect to topic view — connect to thread-specific SSE stream
  3. Render reply HTML fragments — reuse existing JSX components for SSE payloads
  4. Add sse-connect to category view — connect to category-specific SSE stream

Phase 3 (Post-MVP polish)

  1. Typing indicators — debounced POST from textarea keyup, broadcast via SSE
  2. Presence tracking — track SSE connections per topic, broadcast count
  3. Notification badges — global SSE stream for subscribed topic updates
  4. Graceful degradation — ensure forum works perfectly without SSE (progressive enhancement)

Dependencies

| Step | Depends On | New Packages |
|---|---|---|
| Jetstream consumer | Phase 1 AppView Core | @skyware/jetstream, ws |
| EventEmitter | Jetstream consumer | None (Node.js built-in) |
| SSE endpoint | EventEmitter + Hono | None (hono/streaming built-in) |
| HTMX SSE extension | Web UI + SSE endpoint | htmx-ext-sse (CDN) |

Why This Matters

None of the mainstream forum packages compared below has this architecture:

| Forum | Real-Time Approach |
|---|---|
| phpBB | None. Pure request-response. Must refresh to see new replies. |
| Discourse | MessageBus gem (long polling with optional streaming). Bolt-on architecture, requires Redis. |
| Flarum | Pusher.com integration (third-party SaaS). Adds external dependency and cost. |
| NodeBB | Socket.io (WebSocket). Heavy client-side JS framework. |
| atBB | Protocol-native firehose → in-process event bus → SSE → HTMX declarative swap. Zero custom client-side JS (HTMX itself is ~14KB). The real-time stream is architecturally intrinsic, not bolted on. |

The AT Protocol firehose means atBB doesn't add real-time; it is real-time. The firehose consumer that indexes posts is the same component that powers live updates. In a single-instance deployment there's no separate infrastructure, no Redis, no WebSocket server, no client-side framework. Just HTML attributes.

This is atBB's strongest architectural differentiator and should be a first-class feature from day one.

