tap integration#
tap is bluesky's official atproto sync utility. pollz uses it to receive real-time events from the firehose.
what tap provides#
- firehose connection with automatic reconnection
- signature verification of repo structure and identity
- automatic backfill when adding new repos
- filtered output by collection
- ordering guarantees - backfill completes before live events
- cursor management - persists automatically, resumes on restart
pollz tap configuration#
# tap/fly.toml
[env]
TAP_COLLECTION_FILTERS = "tech.waow.poll,tech.waow.vote"
TAP_SIGNAL_COLLECTION = "tech.waow.poll"
TAP_DATABASE_URL = "sqlite:///data/tap.db"
TAP_DISABLE_ACKS = "true"
TAP_SIGNAL_COLLECTION makes tap automatically discover and track all repos that have ever created a poll.
event format#
tap delivers events via websocket at /channel:
{
"id": 12345,
"type": "record",
"record": {
"live": true,
"did": "did:plc:abc123",
"collection": "tech.waow.poll",
"rkey": "3kb3fge5lm32x",
"action": "create",
"record": {
"text": "what's your favorite color?",
"options": ["red", "blue", "green"],
"$type": "tech.waow.poll",
"createdAt": "2024-10-07T12:00:00.000Z"
}
}
}
action types#
create- new record createdupdate- existing record updated (same rkey)delete- record deleted
backend tap consumer#
the backend connects to tap via websocket and processes events:
// tap.zig
if (mem.eql(u8, action.string, "create") or mem.eql(u8, action.string, "update")) {
// process poll or vote
} else if (mem.eql(u8, action.string, "delete")) {
// delete poll or vote
}
handling out-of-order events#
tap delivers events in firehose order, but the firehose itself can deliver events out of order. example:
- user deletes old vote, creates new vote
- firehose delivers: create (new), delete (old)
- if backend processes delete after create, the new vote disappears
solution: use putRecord instead of delete+create#
when changing a vote, the frontend uses putRecord to update the existing record:
// api.ts
if (existingRkey) {
// update existing vote - single "update" event
await rpc.post("com.atproto.repo.putRecord", { ... });
} else {
// create new vote
await rpc.post("com.atproto.repo.createRecord", { ... });
}
this results in a single "update" event instead of separate "delete" and "create" events, eliminating the race condition.
backend upsert logic#
as additional protection, insertVote uses upsert with timestamp comparison:
INSERT INTO votes (uri, subject, option, voter, created_at)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(subject, voter) DO UPDATE SET
uri = excluded.uri,
option = excluded.option,
created_at = excluded.created_at
WHERE excluded.created_at > votes.created_at OR votes.created_at IS NULL
this ensures that if out-of-order events do occur, older events don't overwrite newer ones.
deployment#
tap runs as a separate fly.io app (pollz-tap) and communicates with the backend over fly's internal network:
pollz-tap.internal:2480 → pollz-backend