# tap integration

tap is bluesky's official atproto sync utility. pollz uses it to receive real-time events from the firehose.

## what tap provides

- firehose connection with automatic reconnection
- signature verification of repo structure and identity
- automatic backfill when adding new repos
- filtered output by collection
- ordering guarantees - backfill completes before live events
- cursor management - persists automatically, resumes on restart

## pollz tap configuration

```toml
# tap/fly.toml
[env]
  TAP_COLLECTION_FILTERS = "tech.waow.poll,tech.waow.vote"
  TAP_SIGNAL_COLLECTION = "tech.waow.poll"
  TAP_DATABASE_URL = "sqlite:///data/tap.db"
  TAP_DISABLE_ACKS = "true"
```

`TAP_SIGNAL_COLLECTION` makes tap automatically discover and track all repos that have ever created a poll.

## event format

tap delivers events via websocket at `/channel`:

```json
{
  "id": 12345,
  "type": "record",
  "record": {
    "live": true,
    "did": "did:plc:abc123",
    "collection": "tech.waow.poll",
    "rkey": "3kb3fge5lm32x",
    "action": "create",
    "record": {
      "text": "what's your favorite color?",
      "options": ["red", "blue", "green"],
      "$type": "tech.waow.poll",
      "createdAt": "2024-10-07T12:00:00.000Z"
    }
  }
}
```

### action types
- `create` - new record created
- `update` - existing record updated (same rkey)
- `delete` - record deleted

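a consumer could model this payload with a small type and a defensive parser. the sketch below is typescript, not pollz's zig backend. `TapRecordEvent` and `parseTapEvent` are hypothetical names, and the field list comes only from the sample event above, so tap may send fields that are not shown here.

```typescript
// Shape of a tap "record" event, based only on the sample payload above.
type Action = "create" | "update" | "delete";

interface TapRecordEvent {
  id: number;
  type: "record";
  record: {
    live: boolean;
    did: string;
    collection: string;
    rkey: string;
    action: Action;
    // Assumption: the record body may be missing (for example on delete).
    record?: { [key: string]: unknown };
  };
}

const ACTIONS: readonly string[] = ["create", "update", "delete"];

// Returns the typed event, or null for anything that does not match the shape.
function parseTapEvent(raw: string): TapRecordEvent | null {
  let value: unknown;
  try {
    value = JSON.parse(raw);
  } catch {
    return null; // not valid JSON
  }
  if (typeof value !== "object" || value === null) return null;

  const evt = value as { [key: string]: unknown };
  if (typeof evt.id !== "number" || evt.type !== "record") return null;

  const inner = evt.record;
  if (typeof inner !== "object" || inner === null) return null;

  const { live, did, collection, rkey, action } = inner as {
    [key: string]: unknown;
  };
  if (typeof live !== "boolean") return null;
  if (typeof did !== "string" || typeof collection !== "string") return null;
  if (typeof rkey !== "string") return null;
  if (typeof action !== "string" || ACTIONS.indexOf(action) === -1) return null;

  return value as TapRecordEvent;
}
```

returning `null` instead of throwing keeps one malformed message from tearing down the websocket read loop.
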
## backend tap consumer

the backend connects to tap via websocket and processes events:

```zig
// tap.zig
if (mem.eql(u8, action.string, "create") or mem.eql(u8, action.string, "update")) {
    // process poll or vote
} else if (mem.eql(u8, action.string, "delete")) {
    // delete poll or vote
}
```

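the same branching can be sketched in typescript against an in-memory object instead of the backend's database. `applyEvent`, `atUri`, and the `Store` type are hypothetical stand-ins; the real handlers live in `tap.zig` and are not shown in this document.

```typescript
type Action = "create" | "update" | "delete";

interface RecordEvent {
  did: string;
  collection: string;
  rkey: string;
  action: Action;
  record?: { [key: string]: unknown };
}

// Hypothetical in-memory store keyed by AT URI.
type Store = { [uri: string]: { [key: string]: unknown } };

// AT URI of the record an event refers to: at://<did>/<collection>/<rkey>
function atUri(e: RecordEvent): string {
  return `at://${e.did}/${e.collection}/${e.rkey}`;
}

// create and update take the same path: both overwrite whatever is stored
// under the record's URI, so replaying an event is harmless.
function applyEvent(store: Store, e: RecordEvent): void {
  if (e.action === "create" || e.action === "update") {
    if (e.record !== undefined) {
      store[atUri(e)] = e.record;
    }
  } else if (e.action === "delete") {
    delete store[atUri(e)];
  }
}
```

treating `create` and `update` identically means the handler does not need to know whether it has already seen a record.
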
## handling out-of-order events

tap delivers events in firehose order, but the firehose itself can deliver events out of order. for example:

1. user deletes old vote, creates new vote
2. firehose delivers: create (new), delete (old)
3. if the backend processes the delete after the create, the new vote disappears

### solution: use putRecord instead of delete+create

when changing a vote, the frontend uses `putRecord` to update the existing record:

```typescript
// api.ts
if (existingRkey) {
  // update existing vote - single "update" event
  await rpc.post("com.atproto.repo.putRecord", { ... });
} else {
  // create new vote
  await rpc.post("com.atproto.repo.createRecord", { ... });
}
```

this results in a single "update" event instead of separate "delete" and "create" events, eliminating the race condition.

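the request bodies are elided above. as a rough sketch of the choice between the two calls, the helper below builds the body for each. `buildVoteWrite` and the `VoteRecord` fields (`subject`, `option`, `createdAt`) are assumptions inferred from the `votes` table columns, not the actual `tech.waow.vote` lexicon. `repo`, `collection`, `rkey`, and `record` are the standard input fields of `com.atproto.repo.putRecord` and `com.atproto.repo.createRecord`.

```typescript
// Assumed vote record shape; the real tech.waow.vote lexicon may differ.
interface VoteRecord {
  $type: "tech.waow.vote";
  subject: string; // assumed: AT URI of the poll being voted on
  option: number; // assumed: index into the poll's options
  createdAt: string;
}

interface WriteCall {
  nsid: "com.atproto.repo.putRecord" | "com.atproto.repo.createRecord";
  body: {
    repo: string;
    collection: string;
    rkey?: string;
    record: VoteRecord;
  };
}

// Picks putRecord when the voter already has a vote record for this poll,
// so the change reaches consumers as one "update" event.
function buildVoteWrite(
  repo: string,
  record: VoteRecord,
  existingRkey?: string
): WriteCall {
  const base = { repo, collection: "tech.waow.vote", record };
  if (existingRkey) {
    return {
      nsid: "com.atproto.repo.putRecord",
      body: { ...base, rkey: existingRkey },
    };
  }
  // No rkey: the server assigns one for the new record.
  return { nsid: "com.atproto.repo.createRecord", body: base };
}
```

reusing the existing rkey is what keeps the vote's AT URI stable across changes.
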
### backend upsert logic

as additional protection, `insertVote` uses upsert with timestamp comparison:

```sql
INSERT INTO votes (uri, subject, option, voter, created_at)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(subject, voter) DO UPDATE SET
  uri = excluded.uri,
  option = excluded.option,
  created_at = excluded.created_at
WHERE excluded.created_at > votes.created_at OR votes.created_at IS NULL
```

this ensures that if out-of-order events do occur, older events don't overwrite newer ones.

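to see the rule in isolation, here is an in-memory typescript model of the same upsert. it is an illustration, not the backend's code: `upsertVote` and `VoteTable` are hypothetical, and it assumes `created_at` holds ISO 8601 strings in one consistent format, so plain string comparison orders them chronologically.

```typescript
interface Vote {
  uri: string;
  subject: string;
  option: number;
  voter: string;
  createdAt: string | null;
}

// One row per (subject, voter), mirroring ON CONFLICT(subject, voter).
type VoteTable = { [key: string]: Vote };

// Returns true if the row was inserted or updated, false if the incoming
// vote was ignored because the stored row is at least as new.
function upsertVote(table: VoteTable, incoming: Vote): boolean {
  const key = `${incoming.subject} ${incoming.voter}`;
  const existing = table[key];

  if (existing === undefined) {
    table[key] = incoming; // plain INSERT, no conflict
    return true;
  }

  // WHERE excluded.created_at > votes.created_at OR votes.created_at IS NULL
  const newer =
    existing.createdAt === null ||
    (incoming.createdAt !== null && incoming.createdAt > existing.createdAt);

  if (newer) {
    table[key] = incoming;
  }
  return newer;
}
```

if a vote stamped 12:05 arrives before one stamped 12:00 for the same poll and voter, the second call returns `false` and the 12:05 row stays in place.
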
## deployment

tap runs as a separate fly.io app (`pollz-tap`) and communicates with the backend over fly's internal network:

```
pollz-tap.internal:2480 → pollz-backend
```

## further reading

- [tap README](https://github.com/bluesky-social/indigo/blob/main/cmd/tap/README.md)
- [indigo repo](https://github.com/bluesky-social/indigo)
- [bailey's tap guide](https://marvins-guide.leaflet.pub/3m7ttuppfzc23)