//! atproto relay implementation in zig
//! zlay.waow.tech
1//! frame processing worker — heavy frame handling offloaded from reader threads
2//!
3//! the reader thread (subscriber) does lightweight header decode, cursor tracking,
4//! and rate limiting, then submits raw frame bytes here for heavy processing:
5//! CBOR decode, DID resolution, signature validation, DB persist, broadcast.
6//!
7//! double decode (reader + worker both parse CBOR header) is intentional —
8//! CBOR decode is ~1-2μs, far cheaper than serializing decoded values across threads.
9
10const std = @import("std");
11const zat = @import("zat");
12const broadcaster = @import("broadcaster.zig");
13const validator_mod = @import("validator.zig");
14const event_log_mod = @import("event_log.zig");
15const collection_index_mod = @import("collection_index.zig");
16const thread_pool = @import("thread_pool.zig");
17
18const Allocator = std.mem.Allocator;
19const log = std.log.scoped(.relay);
20
/// One unit of work for the frame pool: a single raw firehose frame plus the
/// shared services needed to process it. Built by a reader thread; consumed
/// by `processFrame` on a pool worker, which frees `data` when done.
pub const FrameWork = struct {
    data: []u8, // raw frame bytes (heap-duped by reader, freed by worker)
    host_id: u64, // id of the upstream host this frame arrived from (used for uid lookup / host authority)
    hostname: []const u8, // borrowed from subscriber (stable lifetime)
    allocator: Allocator, // owns `data`; also backs the per-frame arena in processFrame
    // shared references (all thread-safe, all outlive the work item)
    bc: *broadcaster.Broadcaster, // downstream fan-out + stats counters
    validator: *validator_mod.Validator, // commit/sync validation, key cache, host authority
    persist: ?*event_log_mod.DiskPersist, // optional durable event log; null = broadcast-only mode
    collection_index: ?*collection_index_mod.CollectionIndex, // optional live collection tracking
};
32
/// Process one raw firehose frame end-to-end: CBOR decode, DID/uid resolution
/// with host-authority enforcement, #account status bookkeeping, rev/chain
/// continuity checks, signature validation, then persist + broadcast.
/// Takes ownership of `work.data` and frees it on every exit path.
pub fn processFrame(work: *FrameWork) void {
    defer work.allocator.free(work.data);

    // per-frame arena: all decode/encode scratch is released together on return
    var arena = std.heap.ArenaAllocator.init(work.allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const data = work.data;

    // re-decode header (cheap — ~1-2μs; intentional, see module doc)
    const header_result = zat.cbor.decode(alloc, data) catch |err| {
        log.debug("worker: frame header decode failed: {s} (len={d})", .{ @errorName(err), data.len });
        _ = work.bc.stats.decode_errors.fetchAdd(1, .monotonic);
        return;
    };
    const header = header_result.value;
    const payload_data = data[header_result.consumed..];

    const op = header.getInt("op") orelse return;
    if (op != 1) return; // only process message frames (error frames handled by reader)

    const frame_type = header.getString("t") orelse return;
    const payload = zat.cbor.decodeAll(alloc, payload_data) catch |err| {
        log.debug("worker: frame payload decode failed: {s} (type={s})", .{ @errorName(err), frame_type });
        _ = work.bc.stats.decode_errors.fetchAdd(1, .monotonic);
        return;
    };

    // route by frame type — unknown types already filtered by reader
    const is_commit = std.mem.eql(u8, frame_type, "#commit");
    const is_sync = std.mem.eql(u8, frame_type, "#sync");
    const is_account = std.mem.eql(u8, frame_type, "#account");
    const is_identity = std.mem.eql(u8, frame_type, "#identity");

    if (!is_commit and !is_sync and !is_account and !is_identity) return;

    // extract DID: "repo" for commits, "did" for identity/account
    const did: ?[]const u8 = if (is_commit)
        payload.getString("repo")
    else
        payload.getString("did");

    // on #identity event, evict cached signing key so next commit re-resolves
    if (is_identity) {
        if (did) |d| work.validator.evictKey(d);
    }

    // resolve DID → numeric UID for event header (host-aware).
    // uid == 0 means "unknown account" and disables all uid-keyed checks below.
    const uid: u64 = if (work.persist) |dp| blk: {
        if (did) |d| {
            const result = dp.uidForDidFromHost(d, work.host_id) catch break :blk @as(u64, 0);

            // host authority enforcement: verify DID doc confirms this host.
            // #identity events are exempt — any PDS can emit them (same as indigo).
            // covers both first-seen DIDs (is_new) and host migrations (host_changed).
            if ((result.is_new or result.host_changed) and !is_identity) {
                switch (work.validator.resolveHostAuthority(d, work.host_id)) {
                    .migrate => {
                        // DID doc confirms the host — record it and continue.
                        // best-effort: a failed write is retried naturally on the
                        // next event (host_changed will fire again), so just log.
                        dp.setAccountHostId(result.uid, work.host_id) catch |err| {
                            log.debug("setAccountHostId failed: {s}", .{@errorName(err)});
                        };
                        if (result.host_changed) {
                            log.info("host {s}: account migrated uid={d} did={s}", .{
                                work.hostname, result.uid, d,
                            });
                        }
                    },
                    .reject => {
                        // DID doc does not confirm — drop the event
                        log.info("host {s}: dropping event, host authority failed uid={d} did={s}", .{
                            work.hostname, result.uid, d,
                        });
                        _ = work.bc.stats.failed.fetchAdd(1, .monotonic);
                        _ = work.bc.stats.failed_host_authority.fetchAdd(1, .monotonic);
                        return;
                    },
                    .accept => {},
                }
            }

            break :blk result.uid;
        } else break :blk @as(u64, 0);
    } else 0;

    // process #account events: update upstream status
    if (is_account) {
        if (work.persist) |dp| {
            if (uid > 0) {
                const is_active = payload.getBool("active") orelse false;
                const status_str = payload.getString("status");
                // active accounts are always "active"; inactive ones take the
                // upstream-provided status, defaulting to "inactive"
                const new_status: []const u8 = if (is_active)
                    "active"
                else
                    (status_str orelse "inactive");
                dp.updateAccountUpstreamStatus(uid, new_status) catch |err| {
                    log.debug("upstream status update failed: {s}", .{@errorName(err)});
                };

                // on account tombstone/deletion, remove all collection index entries
                if (std.mem.eql(u8, new_status, "deleted") or std.mem.eql(u8, new_status, "takendown")) {
                    if (work.collection_index) |ci| {
                        if (did) |d| {
                            ci.removeAll(d) catch |err| {
                                log.debug("collection removeAll failed: {s}", .{@errorName(err)});
                            };
                        }
                    }
                }
            }
        }
    }

    // for commits and syncs: check account is active, validate, extract state
    var commit_data_cid: ?[]const u8 = null;
    var commit_rev: ?[]const u8 = null;
    if (is_commit or is_sync) {
        // drop events for inactive accounts.
        // status self-corrects when the next #account event arrives.
        // TODO: reintroduce PDS re-check via background queue with shared
        // long-lived http client + timeout (not inline on frame workers).
        if (work.persist) |dp| {
            if (uid > 0) {
                // on lookup failure assume active (fail open) rather than drop
                const active = dp.isAccountActive(uid) catch true;
                if (!active) {
                    _ = work.bc.stats.skipped.fetchAdd(1, .monotonic);
                    return;
                }
            }
        }

        // rev checks: stale, future, chain continuity
        if (is_commit and uid > 0) {
            if (payload.getString("rev")) |incoming_rev| {
                // future-rev rejection: drop commits with timestamps too far ahead
                if (zat.Tid.parse(incoming_rev)) |tid| {
                    const rev_us: i64 = @intCast(tid.timestamp());
                    const now_us = std.time.microTimestamp();
                    // config value is in seconds; compare in microseconds
                    const skew_us: i64 = work.validator.config.rev_clock_skew * 1_000_000;
                    if (rev_us > now_us + skew_us) {
                        log.info("host {s}: dropping future rev uid={d} rev={s}", .{
                            work.hostname, uid, incoming_rev,
                        });
                        _ = work.bc.stats.failed.fetchAdd(1, .monotonic);
                        _ = work.bc.stats.failed_future_rev.fetchAdd(1, .monotonic);
                        return;
                    }
                }

                if (work.persist) |dp| {
                    if (dp.getAccountState(uid, alloc) catch null) |prev| {
                        // stale rev check: revs are lexicographically ordered,
                        // so anything not strictly greater is old/duplicate
                        if (std.mem.order(u8, incoming_rev, prev.rev) != .gt) {
                            log.debug("host {s}: dropping stale commit uid={d} rev={s} <= {s}", .{
                                work.hostname, uid, incoming_rev, prev.rev,
                            });
                            _ = work.bc.stats.skipped.fetchAdd(1, .monotonic);
                            return;
                        }

                        // chain continuity: since should match stored rev
                        // (log-only — the event still flows through)
                        if (payload.getString("since")) |since| {
                            if (!std.mem.eql(u8, since, prev.rev)) {
                                log.info("host {s}: chain break uid={d} since={s} stored_rev={s}", .{
                                    work.hostname, uid, since, prev.rev,
                                });
                                _ = work.bc.stats.chain_breaks.fetchAdd(1, .monotonic);
                            }
                        }

                        // chain continuity: prevData CID should match stored data_cid
                        if (payload.get("prevData")) |pd| {
                            if (pd == .cid) {
                                // encode failure yields "" which skips the
                                // comparison below (best-effort check)
                                const prev_data_encoded = zat.multibase.encode(alloc, .base32lower, pd.cid.raw) catch "";
                                if (prev_data_encoded.len > 0 and prev.data_cid.len > 0 and
                                    !std.mem.eql(u8, prev_data_encoded, prev.data_cid))
                                {
                                    log.info("host {s}: chain break uid={d} prevData mismatch", .{
                                        work.hostname, uid,
                                    });
                                    _ = work.bc.stats.chain_breaks.fetchAdd(1, .monotonic);
                                }
                            }
                        }
                    }
                }
            }
        }

        if (is_commit) {
            const result = work.validator.validateCommit(payload);
            if (!result.valid) return;
            commit_data_cid = result.data_cid;
            commit_rev = result.commit_rev;

            // track collections from commit ops (phase 1 live indexing)
            if (work.collection_index) |ci| {
                if (did) |d| {
                    if (payload.get("ops")) |ops| {
                        ci.trackCommitOps(d, ops);
                    }
                }
            }
        } else {
            // #sync: signature verification only, no ops/MST
            const result = work.validator.validateSync(payload);
            if (!result.valid) return;
            commit_data_cid = result.data_cid;
            commit_rev = result.commit_rev;
        }
    }

    // determine event kind for persistence
    const kind: event_log_mod.EvtKind = if (is_commit)
        .commit
    else if (is_sync)
        .sync
    else if (is_account)
        .account
    else
        .identity;

    if (work.persist) |dp| {
        // persist and broadcast under the ordering lock so relay-seq order
        // matches broadcast order across concurrent workers
        {
            work.bc.broadcast_order.lock();
            defer work.bc.broadcast_order.unlock();

            const seq = dp.persist(kind, uid, data) catch |err| {
                log.warn("persist failed: {s}", .{@errorName(err)});
                return;
            };
            work.bc.stats.relay_seq.store(seq, .release);
            // rewrite the frame's seq to our relay-local sequence; fall back
            // to the original bytes if resequencing fails
            const broadcast_data = broadcaster.resequenceFrame(alloc, data, seq) orelse data;
            work.bc.broadcast(seq, broadcast_data);
        }

        // update per-DID state outside the ordering lock (Postgres round-trip)
        if ((is_commit or is_sync) and uid > 0) {
            if (commit_rev) |rev| {
                const cid_str: []const u8 = if (commit_data_cid) |cid_raw|
                    zat.multibase.encode(alloc, .base32lower, cid_raw) catch ""
                else
                    "";
                if (dp.updateAccountState(uid, rev, cid_str)) |updated| {
                    if (!updated) {
                        // another worker advanced the state first — harmless
                        log.debug("host {s}: stale state update lost race uid={d} rev={s}", .{
                            work.hostname, uid, rev,
                        });
                    }
                } else |err| {
                    log.debug("account state update failed: {s}", .{@errorName(err)});
                }
            }
        }
    } else {
        // no persistence configured: pass through the upstream sequence as-is
        const upstream_seq = payload.getUint("seq") orelse 0;
        work.bc.broadcast(upstream_seq, data);
    }
}
293
294pub const FramePool = thread_pool.ThreadPool(FrameWork, processFrame);