//! frame processing worker — heavy frame handling offloaded from reader threads
//!
//! the reader thread (subscriber) does lightweight header decode, cursor tracking,
//! and rate limiting, then submits raw frame bytes here for heavy processing:
//! CBOR decode, DID resolution, signature validation, DB persist, broadcast.
//!
//! double decode (reader + worker both parse CBOR header) is intentional —
//! CBOR decode is ~1-2μs, far cheaper than serializing decoded values across threads.
const std = @import("std");
const zat = @import("zat");
const broadcaster = @import("broadcaster.zig");
const validator_mod = @import("validator.zig");
const event_log_mod = @import("event_log.zig");
const collection_index_mod = @import("collection_index.zig");
const thread_pool = @import("thread_pool.zig");

const Allocator = std.mem.Allocator;
const log = std.log.scoped(.relay);

/// One unit of work submitted by a reader thread: raw frame bytes plus the
/// shared services needed to process them. Ownership: `data` is heap-duped by
/// the reader and freed by the worker; `hostname` is borrowed and must outlive
/// the work item (it does — it belongs to the subscriber).
pub const FrameWork = struct {
    data: []u8, // raw frame bytes (heap-duped by reader, freed by worker)
    host_id: u64,
    hostname: []const u8, // borrowed from subscriber (stable lifetime)
    allocator: Allocator,
    // shared references (all thread-safe, all outlive the work item)
    bc: *broadcaster.Broadcaster,
    validator: *validator_mod.Validator,
    persist: ?*event_log_mod.DiskPersist,
    collection_index: ?*collection_index_mod.CollectionIndex,
};

/// Process one raw frame: decode, enforce host authority, validate, persist,
/// broadcast. Always frees `work.data` on return. All scratch allocations go
/// through a per-frame arena and are released at once on exit.
pub fn processFrame(work: *FrameWork) void {
    defer work.allocator.free(work.data);
    var arena = std.heap.ArenaAllocator.init(work.allocator);
    defer arena.deinit();
    const alloc = arena.allocator();
    const data = work.data;

    // re-decode header (cheap — ~1-2μs)
    const header_result = zat.cbor.decode(alloc, data) catch |err| {
        log.debug("worker: frame header decode failed: {s} (len={d})", .{ @errorName(err), data.len });
        _ = work.bc.stats.decode_errors.fetchAdd(1, .monotonic);
        return;
    };
    const header = header_result.value;
    const payload_data = data[header_result.consumed..];
    const op = header.getInt("op") orelse return;
    if (op != 1) return; // only process message frames (error frames handled by reader)
    const frame_type = header.getString("t") orelse return;
    const payload = zat.cbor.decodeAll(alloc, payload_data) catch |err| {
        log.debug("worker: frame payload decode failed: {s} (type={s})", .{ @errorName(err), frame_type });
        _ = work.bc.stats.decode_errors.fetchAdd(1, .monotonic);
        return;
    };

    // route by frame type — unknown types already filtered by reader
    const is_commit = std.mem.eql(u8, frame_type, "#commit");
    const is_sync = std.mem.eql(u8, frame_type, "#sync");
    const is_account = std.mem.eql(u8, frame_type, "#account");
    const is_identity = std.mem.eql(u8, frame_type, "#identity");
    if (!is_commit and !is_sync and !is_account and !is_identity) return;

    // extract DID: "repo" for commits, "did" for identity/account
    const did: ?[]const u8 = if (is_commit) payload.getString("repo") else payload.getString("did");

    // on #identity event, evict cached signing key so next commit re-resolves
    if (is_identity) {
        if (did) |d| work.validator.evictKey(d);
    }

    // resolve DID → numeric UID for event header (host-aware).
    // uid 0 means "unresolved" — lookup failed or frame carries no DID.
    const uid: u64 = if (work.persist) |dp| blk: {
        if (did) |d| {
            const result = dp.uidForDidFromHost(d, work.host_id) catch break :blk @as(u64, 0);
            // host authority enforcement: verify DID doc confirms this host.
            // #identity events are exempt — any PDS can emit them (same as indigo).
            // covers both first-seen DIDs (is_new) and host migrations (host_changed).
            if ((result.is_new or result.host_changed) and !is_identity) {
                switch (work.validator.resolveHostAuthority(d, work.host_id)) {
                    .migrate => {
                        // DID doc confirms the host — update and continue.
                        // best-effort: a failed update is retried on the next frame,
                        // but log it so the failure isn't invisible.
                        dp.setAccountHostId(result.uid, work.host_id) catch |err| {
                            log.debug("setAccountHostId failed: {s}", .{@errorName(err)});
                        };
                        if (result.host_changed) {
                            log.info("host {s}: account migrated uid={d} did={s}", .{
                                work.hostname, result.uid, d,
                            });
                        }
                    },
                    .reject => {
                        // DID doc does not confirm — drop the event
                        log.info("host {s}: dropping event, host authority failed uid={d} did={s}", .{
                            work.hostname, result.uid, d,
                        });
                        _ = work.bc.stats.failed.fetchAdd(1, .monotonic);
                        return;
                    },
                    .accept => {},
                }
            }
            break :blk result.uid;
        } else break :blk @as(u64, 0);
    } else 0;

    // process #account events: update upstream status
    if (is_account) {
        if (work.persist) |dp| {
            if (uid > 0) {
                const is_active = payload.getBool("active") orelse false;
                const status_str = payload.getString("status");
                const new_status: []const u8 = if (is_active) "active" else (status_str orelse "inactive");
                dp.updateAccountUpstreamStatus(uid, new_status) catch |err| {
                    log.debug("upstream status update failed: {s}", .{@errorName(err)});
                };
                // on account tombstone/deletion, remove all collection index entries
                if (std.mem.eql(u8, new_status, "deleted") or std.mem.eql(u8, new_status, "takendown")) {
                    if (work.collection_index) |ci| {
                        if (did) |d| {
                            ci.removeAll(d) catch |err| {
                                log.debug("collection removeAll failed: {s}", .{@errorName(err)});
                            };
                        }
                    }
                }
            }
        }
    }

    // for commits and syncs: check account is active, validate, extract state
    var commit_data_cid: ?[]const u8 = null;
    var commit_rev: ?[]const u8 = null;
    if (is_commit or is_sync) {
        // drop for inactive accounts (errors default to active — fail open)
        if (work.persist) |dp| {
            if (uid > 0) {
                const active = dp.isAccountActive(uid) catch true;
                if (!active) {
                    _ = work.bc.stats.skipped.fetchAdd(1, .monotonic);
                    return;
                }
            }
        }
        // rev checks: stale, future, chain continuity
        if (is_commit and uid > 0) {
            if (payload.getString("rev")) |incoming_rev| {
                // future-rev rejection: drop commits with timestamps too far ahead
                if (zat.Tid.parse(incoming_rev)) |tid| {
                    const rev_us: i64 = @intCast(tid.timestamp());
                    const now_us = std.time.microTimestamp();
                    const skew_us: i64 = work.validator.config.rev_clock_skew * 1_000_000;
                    if (rev_us > now_us + skew_us) {
                        log.info("host {s}: dropping future rev uid={d} rev={s}", .{
                            work.hostname, uid, incoming_rev,
                        });
                        _ = work.bc.stats.failed.fetchAdd(1, .monotonic);
                        return;
                    }
                }
                if (work.persist) |dp| {
                    if (dp.getAccountState(uid, alloc) catch null) |prev| {
                        // stale rev check (TIDs compare correctly as byte strings)
                        if (std.mem.order(u8, incoming_rev, prev.rev) != .gt) {
                            log.debug("host {s}: dropping stale commit uid={d} rev={s} <= {s}", .{
                                work.hostname, uid, incoming_rev, prev.rev,
                            });
                            _ = work.bc.stats.skipped.fetchAdd(1, .monotonic);
                            return;
                        }
                        // chain continuity: since should match stored rev (log only, don't drop)
                        if (payload.getString("since")) |since| {
                            if (!std.mem.eql(u8, since, prev.rev)) {
                                log.info("host {s}: chain break uid={d} since={s} stored_rev={s}", .{
                                    work.hostname, uid, since, prev.rev,
                                });
                                _ = work.bc.stats.chain_breaks.fetchAdd(1, .monotonic);
                            }
                        }
                        // chain continuity: prevData CID should match stored data_cid (log only)
                        if (payload.get("prevData")) |pd| {
                            if (pd == .cid) {
                                const prev_data_encoded = zat.multibase.encode(alloc, .base32lower, pd.cid.raw) catch "";
                                if (prev_data_encoded.len > 0 and prev.data_cid.len > 0 and !std.mem.eql(u8, prev_data_encoded, prev.data_cid)) {
                                    log.info("host {s}: chain break uid={d} prevData mismatch", .{
                                        work.hostname, uid,
                                    });
                                    _ = work.bc.stats.chain_breaks.fetchAdd(1, .monotonic);
                                }
                            }
                        }
                    }
                }
            }
        }
        if (is_commit) {
            const result = work.validator.validateCommit(payload);
            if (!result.valid) return;
            commit_data_cid = result.data_cid;
            commit_rev = result.commit_rev;
            // track collections from commit ops (phase 1 live indexing)
            if (work.collection_index) |ci| {
                if (did) |d| {
                    if (payload.get("ops")) |ops| {
                        ci.trackCommitOps(d, ops);
                    }
                }
            }
        } else {
            // #sync: signature verification only, no ops/MST
            const result = work.validator.validateSync(payload);
            if (!result.valid) return;
            commit_data_cid = result.data_cid;
            commit_rev = result.commit_rev;
        }
    }

    // determine event kind for persistence
    const kind: event_log_mod.EvtKind = if (is_commit) .commit else if (is_sync) .sync else if (is_account) .account else .identity;

    // persist and broadcast under ordering lock
    if (work.persist) |dp| {
        {
            work.bc.broadcast_order.lock();
            defer work.bc.broadcast_order.unlock();
            const seq = dp.persist(kind, uid, data) catch |err| {
                log.warn("persist failed: {s}", .{@errorName(err)});
                return;
            };
            work.bc.stats.relay_seq.store(seq, .release);
            // rewrite the frame's seq to our relay sequence; fall back to the
            // original bytes if resequencing fails (e.g. OOM)
            const broadcast_data = broadcaster.resequenceFrame(alloc, data, seq) orelse data;
            work.bc.broadcast(seq, broadcast_data);
        }
        // update per-DID state outside the ordering lock (Postgres round-trip)
        if ((is_commit or is_sync) and uid > 0) {
            if (commit_rev) |rev| {
                const cid_str: []const u8 = if (commit_data_cid) |cid_raw|
                    zat.multibase.encode(alloc, .base32lower, cid_raw) catch ""
                else
                    "";
                if (dp.updateAccountState(uid, rev, cid_str)) |updated| {
                    if (!updated) {
                        log.debug("host {s}: stale state update lost race uid={d} rev={s}", .{
                            work.hostname, uid, rev,
                        });
                    }
                } else |err| {
                    log.debug("account state update failed: {s}", .{@errorName(err)});
                }
            }
        }
    } else {
        // no persistence configured: pass through with the upstream sequence
        const upstream_seq = payload.getUint("seq") orelse 0;
        work.bc.broadcast(upstream_seq, data);
    }
}

/// Thread pool specialized for frame work items; workers run `processFrame`.
pub const FramePool = thread_pool.ThreadPool(FrameWork, processFrame);