polls on atproto pollz.waow.tech
atproto zig

tap integration#

tap is bluesky's official atproto sync utility. pollz uses it to receive real-time events from the firehose.

what tap provides#

  • firehose connection with automatic reconnection
  • signature verification of repo structure and identity
  • automatic backfill when adding new repos
  • filtered output by collection
  • ordering guarantees - backfill completes before live events
  • cursor management - persists automatically, resumes on restart

pollz tap configuration#

# tap/fly.toml
[env]
  TAP_COLLECTION_FILTERS = "tech.waow.poll,tech.waow.vote"
  TAP_SIGNAL_COLLECTION = "tech.waow.poll"
  TAP_DATABASE_URL = "sqlite:///data/tap.db"
  TAP_DISABLE_ACKS = "true"

TAP_SIGNAL_COLLECTION makes tap automatically discover and track all repos that have ever created a poll.

event format#

tap delivers events via websocket at /channel:

{
  "id": 12345,
  "type": "record",
  "record": {
    "live": true,
    "did": "did:plc:abc123",
    "collection": "tech.waow.poll",
    "rkey": "3kb3fge5lm32x",
    "action": "create",
    "record": {
      "text": "what's your favorite color?",
      "options": ["red", "blue", "green"],
      "$type": "tech.waow.poll",
      "createdAt": "2024-10-07T12:00:00.000Z"
    }
  }
}

action types#

  • create - new record created
  • update - existing record updated (same rkey)
  • delete - record deleted

backend tap consumer#

the backend connects to tap via websocket and processes events:

// tap.zig
if (mem.eql(u8, action.string, "create") or mem.eql(u8, action.string, "update")) {
    // process poll or vote
} else if (mem.eql(u8, action.string, "delete")) {
    // delete poll or vote
}

handling out-of-order events#

tap delivers events in firehose order, but the firehose itself can deliver events out of order. example:

  1. user deletes old vote, creates new vote
  2. firehose delivers: create (new), delete (old)
  3. if backend processes delete after create, the new vote disappears

solution: use putRecord instead of delete+create#

when changing a vote, the frontend uses putRecord to update the existing record:

// api.ts
if (existingRkey) {
  // update existing vote - single "update" event
  await rpc.post("com.atproto.repo.putRecord", { ... });
} else {
  // create new vote
  await rpc.post("com.atproto.repo.createRecord", { ... });
}

this results in a single "update" event instead of separate "delete" and "create" events, eliminating the race condition.

backend upsert logic#

as additional protection, insertVote uses upsert with timestamp comparison:

INSERT INTO votes (uri, subject, option, voter, created_at)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(subject, voter) DO UPDATE SET
  uri = excluded.uri,
  option = excluded.option,
  created_at = excluded.created_at
WHERE excluded.created_at > votes.created_at OR votes.created_at IS NULL

this ensures that if out-of-order events do occur, older events don't overwrite newer ones.

deployment#

tap runs as a separate fly.io app (pollz-tap) and communicates with the backend over fly's internal network:

pollz-tap.internal:2480  →  pollz-backend

further reading#