atproto relay implementation in zig zlay.waow.tech
at main 294 lines 13 kB view raw
//! frame processing worker — heavy frame handling offloaded from reader threads
//!
//! the reader thread (subscriber) does lightweight header decode, cursor tracking,
//! and rate limiting, then submits raw frame bytes here for heavy processing:
//! CBOR decode, DID resolution, signature validation, DB persist, broadcast.
//!
//! double decode (reader + worker both parse CBOR header) is intentional —
//! CBOR decode is ~1-2μs, far cheaper than serializing decoded values across threads.

const std = @import("std");
const zat = @import("zat");
const broadcaster = @import("broadcaster.zig");
const validator_mod = @import("validator.zig");
const event_log_mod = @import("event_log.zig");
const collection_index_mod = @import("collection_index.zig");
const thread_pool = @import("thread_pool.zig");

const Allocator = std.mem.Allocator;
const log = std.log.scoped(.relay);

/// One unit of work submitted by a reader thread: the raw frame bytes plus
/// the shared, thread-safe services needed to process them.
/// Ownership: `data` is heap-duped by the reader and freed by the worker;
/// `hostname` is borrowed from the subscriber (stable lifetime).
pub const FrameWork = struct {
    data: []u8, // raw frame bytes (heap-duped by reader, freed by worker)
    host_id: u64,
    hostname: []const u8, // borrowed from subscriber (stable lifetime)
    allocator: Allocator,
    // shared references (all thread-safe, all outlive the work item)
    bc: *broadcaster.Broadcaster,
    validator: *validator_mod.Validator,
    persist: ?*event_log_mod.DiskPersist,
    collection_index: ?*collection_index_mod.CollectionIndex,
};

/// Fully process one raw firehose frame: decode, route by frame type,
/// enforce host authority, validate, persist, and broadcast.
/// Always frees `work.data`; all scratch allocations go through a per-frame
/// arena and are released on return. Errors never propagate — every failure
/// path drops the frame and bumps the relevant stats counter.
pub fn processFrame(work: *FrameWork) void {
    defer work.allocator.free(work.data);

    // per-frame arena: everything decoded/encoded below dies with this frame
    var arena = std.heap.ArenaAllocator.init(work.allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const data = work.data;

    // re-decode header (cheap — ~1-2μs; see module doc for why we decode twice)
    const header_result = zat.cbor.decode(alloc, data) catch |err| {
        log.debug("worker: frame header decode failed: {s} (len={d})", .{ @errorName(err), data.len });
        _ = work.bc.stats.decode_errors.fetchAdd(1, .monotonic);
        return;
    };
    const header = header_result.value;
    const payload_data = data[header_result.consumed..];

    const op = header.getInt("op") orelse return;
    if (op != 1) return; // only process message frames (error frames handled by reader)

    const frame_type = header.getString("t") orelse return;
    const payload = zat.cbor.decodeAll(alloc, payload_data) catch |err| {
        log.debug("worker: frame payload decode failed: {s} (type={s})", .{ @errorName(err), frame_type });
        _ = work.bc.stats.decode_errors.fetchAdd(1, .monotonic);
        return;
    };

    // route by frame type — unknown types already filtered by reader
    const is_commit = std.mem.eql(u8, frame_type, "#commit");
    const is_sync = std.mem.eql(u8, frame_type, "#sync");
    const is_account = std.mem.eql(u8, frame_type, "#account");
    const is_identity = std.mem.eql(u8, frame_type, "#identity");

    if (!is_commit and !is_sync and !is_account and !is_identity) return;

    // extract DID: "repo" for commits, "did" for identity/account
    const did: ?[]const u8 = if (is_commit)
        payload.getString("repo")
    else
        payload.getString("did");

    // on #identity event, evict cached signing key so next commit re-resolves
    if (is_identity) {
        if (did) |d| work.validator.evictKey(d);
    }

    // resolve DID → numeric UID for event header (host-aware).
    // uid == 0 means "no persistence / resolution failed" throughout.
    const uid: u64 = if (work.persist) |dp| blk: {
        if (did) |d| {
            const result = dp.uidForDidFromHost(d, work.host_id) catch break :blk @as(u64, 0);

            // host authority enforcement: verify DID doc confirms this host.
            // #identity events are exempt — any PDS can emit them (same as indigo).
            // covers both first-seen DIDs (is_new) and host migrations (host_changed).
            if ((result.is_new or result.host_changed) and !is_identity) {
                switch (work.validator.resolveHostAuthority(d, work.host_id)) {
                    .migrate => {
                        // DID doc confirms the host — update and continue.
                        // best-effort: a failed update self-corrects on the next
                        // frame from this host, but leave a trace for debugging.
                        dp.setAccountHostId(result.uid, work.host_id) catch |err| {
                            log.debug("setAccountHostId failed uid={d}: {s}", .{ result.uid, @errorName(err) });
                        };
                        if (result.host_changed) {
                            log.info("host {s}: account migrated uid={d} did={s}", .{
                                work.hostname, result.uid, d,
                            });
                        }
                    },
                    .reject => {
                        // DID doc does not confirm — drop the event
                        log.info("host {s}: dropping event, host authority failed uid={d} did={s}", .{
                            work.hostname, result.uid, d,
                        });
                        _ = work.bc.stats.failed.fetchAdd(1, .monotonic);
                        _ = work.bc.stats.failed_host_authority.fetchAdd(1, .monotonic);
                        return;
                    },
                    .accept => {},
                }
            }

            break :blk result.uid;
        } else break :blk @as(u64, 0);
    } else 0;

    // process #account events: update upstream status
    if (is_account) {
        if (work.persist) |dp| {
            if (uid > 0) {
                const is_active = payload.getBool("active") orelse false;
                const status_str = payload.getString("status");
                const new_status: []const u8 = if (is_active)
                    "active"
                else
                    (status_str orelse "inactive");
                dp.updateAccountUpstreamStatus(uid, new_status) catch |err| {
                    log.debug("upstream status update failed: {s}", .{@errorName(err)});
                };

                // on account tombstone/deletion, remove all collection index entries
                if (std.mem.eql(u8, new_status, "deleted") or std.mem.eql(u8, new_status, "takendown")) {
                    if (work.collection_index) |ci| {
                        if (did) |d| {
                            ci.removeAll(d) catch |err| {
                                log.debug("collection removeAll failed: {s}", .{@errorName(err)});
                            };
                        }
                    }
                }
            }
        }
    }

    // for commits and syncs: check account is active, validate, extract state
    var commit_data_cid: ?[]const u8 = null;
    var commit_rev: ?[]const u8 = null;
    if (is_commit or is_sync) {
        // drop events for inactive accounts.
        // status self-corrects when the next #account event arrives.
        // TODO: reintroduce PDS re-check via background queue with shared
        // long-lived http client + timeout (not inline on frame workers).
        if (work.persist) |dp| {
            if (uid > 0) {
                // fail-open on DB error: better to relay than to stall the firehose
                const active = dp.isAccountActive(uid) catch true;
                if (!active) {
                    _ = work.bc.stats.skipped.fetchAdd(1, .monotonic);
                    return;
                }
            }
        }

        // rev checks: stale, future, chain continuity
        if (is_commit and uid > 0) {
            if (payload.getString("rev")) |incoming_rev| {
                // future-rev rejection: drop commits with timestamps too far ahead
                if (zat.Tid.parse(incoming_rev)) |tid| {
                    const rev_us: i64 = @intCast(tid.timestamp());
                    const now_us = std.time.microTimestamp();
                    const skew_us: i64 = work.validator.config.rev_clock_skew * 1_000_000;
                    if (rev_us > now_us + skew_us) {
                        log.info("host {s}: dropping future rev uid={d} rev={s}", .{
                            work.hostname, uid, incoming_rev,
                        });
                        _ = work.bc.stats.failed.fetchAdd(1, .monotonic);
                        _ = work.bc.stats.failed_future_rev.fetchAdd(1, .monotonic);
                        return;
                    }
                }

                if (work.persist) |dp| {
                    if (dp.getAccountState(uid, alloc) catch null) |prev| {
                        // stale rev check: TIDs are lexicographically ordered,
                        // so a byte compare suffices
                        if (std.mem.order(u8, incoming_rev, prev.rev) != .gt) {
                            log.debug("host {s}: dropping stale commit uid={d} rev={s} <= {s}", .{
                                work.hostname, uid, incoming_rev, prev.rev,
                            });
                            _ = work.bc.stats.skipped.fetchAdd(1, .monotonic);
                            return;
                        }

                        // chain continuity: since should match stored rev
                        // (logged + counted, not dropped)
                        if (payload.getString("since")) |since| {
                            if (!std.mem.eql(u8, since, prev.rev)) {
                                log.info("host {s}: chain break uid={d} since={s} stored_rev={s}", .{
                                    work.hostname, uid, since, prev.rev,
                                });
                                _ = work.bc.stats.chain_breaks.fetchAdd(1, .monotonic);
                            }
                        }

                        // chain continuity: prevData CID should match stored data_cid
                        if (payload.get("prevData")) |pd| {
                            if (pd == .cid) {
                                const prev_data_encoded = zat.multibase.encode(alloc, .base32lower, pd.cid.raw) catch "";
                                if (prev_data_encoded.len > 0 and prev.data_cid.len > 0 and
                                    !std.mem.eql(u8, prev_data_encoded, prev.data_cid))
                                {
                                    log.info("host {s}: chain break uid={d} prevData mismatch", .{
                                        work.hostname, uid,
                                    });
                                    _ = work.bc.stats.chain_breaks.fetchAdd(1, .monotonic);
                                }
                            }
                        }
                    }
                }
            }
        }

        if (is_commit) {
            const result = work.validator.validateCommit(payload);
            if (!result.valid) return;
            commit_data_cid = result.data_cid;
            commit_rev = result.commit_rev;

            // track collections from commit ops (phase 1 live indexing)
            if (work.collection_index) |ci| {
                if (did) |d| {
                    if (payload.get("ops")) |ops| {
                        ci.trackCommitOps(d, ops);
                    }
                }
            }
        } else {
            // #sync: signature verification only, no ops/MST
            const result = work.validator.validateSync(payload);
            if (!result.valid) return;
            commit_data_cid = result.data_cid;
            commit_rev = result.commit_rev;
        }
    }

    // determine event kind for persistence
    const kind: event_log_mod.EvtKind = if (is_commit)
        .commit
    else if (is_sync)
        .sync
    else if (is_account)
        .account
    else
        .identity;

    // persist and broadcast under ordering lock
    if (work.persist) |dp| {
        {
            // the lock guarantees seq assignment and broadcast happen in the
            // same order, so subscribers never observe out-of-order sequences
            work.bc.broadcast_order.lock();
            defer work.bc.broadcast_order.unlock();

            const seq = dp.persist(kind, uid, data) catch |err| {
                log.warn("persist failed: {s}", .{@errorName(err)});
                return;
            };
            work.bc.stats.relay_seq.store(seq, .release);
            // rewrite the frame's seq to our relay seq; fall back to the
            // original bytes if resequencing fails
            const broadcast_data = broadcaster.resequenceFrame(alloc, data, seq) orelse data;
            work.bc.broadcast(seq, broadcast_data);
        }

        // update per-DID state outside the ordering lock (Postgres round-trip)
        if ((is_commit or is_sync) and uid > 0) {
            if (commit_rev) |rev| {
                const cid_str: []const u8 = if (commit_data_cid) |cid_raw|
                    zat.multibase.encode(alloc, .base32lower, cid_raw) catch ""
                else
                    "";
                if (dp.updateAccountState(uid, rev, cid_str)) |updated| {
                    if (!updated) {
                        // another worker won the compare-and-set with a newer rev — benign
                        log.debug("host {s}: stale state update lost race uid={d} rev={s}", .{
                            work.hostname, uid, rev,
                        });
                    }
                } else |err| {
                    log.debug("account state update failed: {s}", .{@errorName(err)});
                }
            }
        }
    } else {
        // no persistence configured: pass through with the upstream seq
        const upstream_seq = payload.getUint("seq") orelse 0;
        work.bc.broadcast(upstream_seq, data);
    }
}

pub const FramePool = thread_pool.ThreadPool(FrameWork, processFrame);