//! relay frame validator — DID key resolution + real signature verification
//!
//! validates firehose commit frames by verifying the commit signature against
//! the pre-resolved signing key for the DID. accepts pre-decoded CBOR payload
//! from the subscriber (decoded via zat SDK). on cache miss, skips validation
//! and queues background resolution. no frame is ever blocked on network I/O.

const std = @import("std");
const zat = @import("zat");
const broadcaster = @import("broadcaster.zig");
const event_log_mod = @import("event_log.zig");
const lru = @import("lru.zig");

const Allocator = std.mem.Allocator;
const log = std.log.scoped(.relay);

/// decoded and cached signing key for a DID.
/// fixed-size value type so the LRU cache stores it inline, with no
/// per-entry heap allocation or pointer chasing at verification time.
const CachedKey = struct {
    key_type: zat.multicodec.KeyType,
    raw: [33]u8, // compressed public key (secp256k1 or p256)
    len: u8, // number of valid bytes in `raw`
    resolve_time: i64 = 0, // epoch seconds when resolved
};

/// outcome of validating one firehose frame.
/// `valid=true, skipped=true` means "not verified, but do not drop the frame"
/// (e.g. signing key not cached yet); `valid=false` means drop.
pub const ValidationResult = struct {
    valid: bool,
    skipped: bool,
    data_cid: ?[]const u8 = null, // MST root CID from verified commit
    commit_rev: ?[]const u8 = null, // rev from verified commit
};

/// configuration for commit validation checks
pub const ValidatorConfig = struct {
    /// verify MST structure during signature verification
    verify_mst: bool = false, // off by default for relay throughput
    /// verify commit diffs via MST inversion (sync 1.1)
    verify_commit_diff: bool = false,
    /// max allowed operations per commit
    max_ops: usize = 200,
    /// max clock skew for rev timestamps (seconds)
    rev_clock_skew: i64 = 300, // 5 minutes
};

pub const Validator = struct {
    allocator: Allocator,
    stats: *broadcaster.Stats,
    config: ValidatorConfig,
    // optional DB handle; enables PDS-host reconciliation in the resolver loop
    persist: ?*event_log_mod.DiskPersist = null,
    // DID → signing key cache (decoded, ready for verification)
    cache: lru.LruCache(CachedKey),
    // background resolve queue (guarded by queue_mutex)
    queue: std.ArrayListUnmanaged([]const u8) = .{},
    // in-flight set — prevents duplicate DID entries in the queue.
    // keys alias the heap slices held in `queue` (guarded by queue_mutex).
    queued_set: std.StringHashMapUnmanaged(void) = .{},
    queue_mutex: std.Thread.Mutex = .{},
    queue_cond: std.Thread.Condition = .{},
    resolver_threads: [max_resolver_threads]?std.Thread = .{null} ** max_resolver_threads,
    // cleared by deinit() to stop resolver threads
    alive: std.atomic.Value(bool) = .{ .raw = true },
    max_cache_size: u32 = 250_000,

    const max_resolver_threads = 8;
    const default_resolver_threads = 4;
    const max_queue_size: usize = 100_000;

    pub fn init(allocator: Allocator, stats: *broadcaster.Stats) Validator {
        return initWithConfig(allocator, stats, .{});
    }

    pub fn initWithConfig(allocator: Allocator, stats: *broadcaster.Stats, config: ValidatorConfig) Validator {
        return .{
            .allocator = allocator,
            .stats = stats,
            .config = config,
            // NOTE(review): capacity literal duplicates the max_cache_size
            // field default — keep the two in sync (start() re-applies the
            // env-configured value anyway).
            .cache = lru.LruCache(CachedKey).init(allocator, 250_000),
        };
    }

    /// stop resolver threads and free all owned memory.
    pub fn deinit(self: *Validator) void {
        self.alive.store(false, .release);
        self.queue_cond.broadcast();
        for (&self.resolver_threads) |*t| {
            if (t.*) |thread| {
                thread.join();
                t.* = null;
            }
        }
        self.cache.deinit();
        // free queued DIDs (queued_set keys alias these same slices, so the
        // set itself holds no separately-owned memory)
        for (self.queue.items) |did| {
            self.allocator.free(did);
        }
        self.queue.deinit(self.allocator);
        self.queued_set.deinit(self.allocator);
    }

    /// start background resolver threads
    pub fn start(self: *Validator) !void {
        self.max_cache_size = parseEnvInt(u32, "VALIDATOR_CACHE_SIZE", self.max_cache_size);
        self.cache.capacity = self.max_cache_size;
        const n = parseEnvInt(u8, "RESOLVER_THREADS", default_resolver_threads);
        const count = @min(n, max_resolver_threads);
        for (self.resolver_threads[0..count]) |*t| {
            t.* = try std.Thread.spawn(.{ .stack_size = @import("main.zig").default_stack_size }, resolveLoop, .{self});
        }
    }

    /// validate a #sync frame: signature verification only (no ops, no MST).
    /// #sync resets a repo to a new commit state — used for recovery from broken streams.
    /// on cache miss, queues background resolution and skips.
pub fn validateSync(self: *Validator, payload: zat.cbor.Value) ValidationResult {
    // no DID → cannot validate; pass the frame through as skipped
    const did = payload.getString("did") orelse {
        _ = self.stats.skipped.fetchAdd(1, .monotonic);
        return .{ .valid = true, .skipped = true };
    };
    if (zat.Did.parse(did) == null) {
        _ = self.stats.failed.fetchAdd(1, .monotonic);
        _ = self.stats.failed_bad_did.fetchAdd(1, .monotonic);
        return .{ .valid = false, .skipped = false };
    }
    // check rev is a valid TID (if present)
    if (payload.getString("rev")) |rev| {
        if (zat.Tid.parse(rev) == null) {
            _ = self.stats.failed.fetchAdd(1, .monotonic);
            _ = self.stats.failed_bad_rev.fetchAdd(1, .monotonic);
            return .{ .valid = false, .skipped = false };
        }
    }
    const blocks = payload.getBytes("blocks") orelse {
        _ = self.stats.failed.fetchAdd(1, .monotonic);
        _ = self.stats.failed_missing_blocks.fetchAdd(1, .monotonic);
        return .{ .valid = false, .skipped = false };
    };
    // #sync CAR should be small (just the signed commit block)
    // lexicon maxLength: 10000
    if (blocks.len > 10_000) {
        _ = self.stats.failed.fetchAdd(1, .monotonic);
        _ = self.stats.failed_oversized_blocks.fetchAdd(1, .monotonic);
        return .{ .valid = false, .skipped = false };
    }
    // cache lookup — on miss, queue background resolution and skip.
    // no frame ever blocks on network I/O.
    const cached_key: ?CachedKey = self.cache.get(did);
    if (cached_key == null) {
        _ = self.stats.cache_misses.fetchAdd(1, .monotonic);
        _ = self.stats.skipped.fetchAdd(1, .monotonic);
        self.queueResolve(did);
        return .{ .valid = true, .skipped = true };
    }
    _ = self.stats.cache_hits.fetchAdd(1, .monotonic);
    // verify signature (no MST walk, no ops)
    const public_key = zat.multicodec.PublicKey{
        .key_type = cached_key.?.key_type,
        .raw = cached_key.?.raw[0..cached_key.?.len],
    };
    var arena = std.heap.ArenaAllocator.init(self.allocator);
    defer arena.deinit();
    const result = zat.verifyCommitCar(arena.allocator(), blocks, public_key, .{
        .verify_mst = false,
        .expected_did = did,
        .max_car_size = 10 * 1024,
    }) catch |err| {
        log.debug("sync verification failed for {s}: {s}", .{ did, @errorName(err) });
        // sync spec: on signature failure, key may have rotated.
        // evict cached key and queue re-resolution. skip this frame.
        self.evictKey(did);
        self.queueResolve(did);
        _ = self.stats.skipped.fetchAdd(1, .monotonic);
        return .{ .valid = true, .skipped = true };
    };
    _ = self.stats.validated.fetchAdd(1, .monotonic);
    return .{
        .valid = true,
        .skipped = false,
        .data_cid = result.commit_cid,
        .commit_rev = result.commit_rev,
    };
}

/// validate a commit frame using a pre-decoded CBOR payload (from SDK decoder).
/// on cache miss, queues background resolution and skips.
pub fn validateCommit(self: *Validator, payload: zat.cbor.Value) ValidationResult {
    // extract DID from decoded payload
    const did = payload.getString("repo") orelse {
        _ = self.stats.skipped.fetchAdd(1, .monotonic);
        return .{ .valid = true, .skipped = true };
    };
    // check cache for pre-resolved signing key
    const cached_key: ?CachedKey = self.cache.get(did);
    if (cached_key == null) {
        // cache miss — queue for background resolution, skip validation
        _ = self.stats.cache_misses.fetchAdd(1, .monotonic);
        _ = self.stats.skipped.fetchAdd(1, .monotonic);
        self.queueResolve(did);
        return .{ .valid = true, .skipped = true };
    }
    _ = self.stats.cache_hits.fetchAdd(1, .monotonic);
    // cache hit — do structure checks + signature verification
    if (self.verifyCommit(payload, did, cached_key.?)) |vr| {
        _ = self.stats.validated.fetchAdd(1, .monotonic);
        return vr;
    } else |err| {
        log.debug("commit verification failed for {s}: {s}", .{ did, @errorName(err) });
        // sync spec: on signature failure, key may have rotated.
        // evict cached key and queue re-resolution. skip this frame
        // (treat as cache miss). next commit will use the refreshed key.
        self.evictKey(did);
        self.queueResolve(did);
        _ = self.stats.skipped.fetchAdd(1, .monotonic);
        return .{ .valid = true, .skipped = true };
    }
}

/// structure checks + real signature verification for one commit.
/// returns error.InvalidFrame on structural problems, or whatever error
/// the zat verifier produces on cryptographic/CAR failures.
fn verifyCommit(self: *Validator, payload: zat.cbor.Value, expected_did: []const u8, cached_key: CachedKey) !ValidationResult {
    // commit structure checks first (cheap, no allocation)
    self.checkCommitStructure(payload) catch {
        _ = self.stats.failed_bad_structure.fetchAdd(1, .monotonic);
        return error.InvalidFrame;
    };
    // extract blocks (raw CAR bytes) from the pre-decoded payload
    const blocks = payload.getBytes("blocks") orelse return error.InvalidFrame;
    // blocks size check — lexicon maxLength: 2000000
    if (blocks.len > 2_000_000) return error.InvalidFrame;
    // build public key for verification
    const public_key = zat.multicodec.PublicKey{
        .key_type = cached_key.key_type,
        .raw = cached_key.raw[0..cached_key.len],
    };
    // run real signature verification (needs its own arena for CAR/MST temporaries)
    var arena = std.heap.ArenaAllocator.init(self.allocator);
    defer arena.deinit();
    const alloc = arena.allocator();
    // try sync 1.1 path: extract ops and use verifyCommitDiff
    if (self.config.verify_commit_diff) {
        if (self.extractOps(alloc, payload)) |msg_ops| {
            // get stored prev_data from payload; null → inversion is skipped below
            const prev_data: ?[]const u8 = if (payload.get("prevData")) |pd| switch (pd) {
                .cid => |c| c.raw,
                .null => null,
                else => null,
            } else null;
            const diff_result = try zat.verifyCommitDiff(alloc, blocks, msg_ops, prev_data, public_key, .{
                .expected_did = expected_did,
                .skip_inversion = prev_data == null,
            });
            return .{
                .valid = true,
                .skipped = false,
                .data_cid = diff_result.data_cid,
                .commit_rev = diff_result.commit_rev,
            };
        }
    }
    // fallback: legacy verification (signature + optional MST walk)
    const result = try zat.verifyCommitCar(alloc, blocks, public_key, .{
        .verify_mst = self.config.verify_mst,
        .expected_did = expected_did,
    });
    return .{
        .valid = true,
        .skipped = false,
        .data_cid = result.commit_cid,
        .commit_rev = result.commit_rev,
    };
}

/// extract ops from payload and convert to mst.Operation array.
/// the firehose format uses a single "path" field ("collection/rkey"),
/// not separate "collection"/"rkey" fields.
/// returns null when no usable ops exist. allocations come from `alloc`
/// (the caller's arena), so the result needs no individual freeing.
fn extractOps(self: *Validator, alloc: Allocator, payload: zat.cbor.Value) ?[]const zat.MstOperation {
    _ = self;
    const ops_array = payload.getArray("ops") orelse return null;
    var ops: std.ArrayListUnmanaged(zat.MstOperation) = .{};
    for (ops_array) |op| {
        const action = op.getString("action") orelse continue;
        const path = op.getString("path") orelse continue;
        // validate path contains "/" (collection/rkey)
        if (std.mem.indexOfScalar(u8, path, '/') == null) continue;
        // extract CID values
        const cid_value: ?[]const u8 = if (op.get("cid")) |v| switch (v) {
            .cid => |c| c.raw,
            else => null,
        } else null;
        var value: ?[]const u8 = null;
        var prev: ?[]const u8 = null;
        if (std.mem.eql(u8, action, "create")) {
            value = cid_value;
        } else if (std.mem.eql(u8, action, "update")) {
            value = cid_value;
            prev = if (op.get("prev")) |v| switch (v) {
                .cid => |c| c.raw,
                else => null,
            } else null;
        } else if (std.mem.eql(u8, action, "delete")) {
            prev = if (op.get("prev")) |v| switch (v) {
                .cid => |c| c.raw,
                else => null,
            } else null;
        } else continue; // unknown action — skip the op
        ops.append(alloc, .{
            .path = path,
            .value = value,
            .prev = prev,
        }) catch return null;
    }
    if (ops.items.len == 0) return null;
    return ops.items;
}

/// cheap structural validation of a commit payload: repo DID, rev TID,
/// ops count, and per-op "collection/rkey" path shape. no allocation.
fn checkCommitStructure(self: *Validator, payload: zat.cbor.Value) !void {
    // check repo field is a valid DID
    const repo = payload.getString("repo") orelse return error.InvalidFrame;
    if (zat.Did.parse(repo) == null) return error.InvalidFrame;
    // check rev is a valid TID
    if (payload.getString("rev")) |rev| {
        if (zat.Tid.parse(rev) == null) return error.InvalidFrame;
    }
    // check ops count
    if (payload.get("ops")) |ops_value| {
        switch (ops_value) {
            .array => |ops| {
                if (ops.len > self.config.max_ops) return error.InvalidFrame;
                // validate each op has valid path (collection/rkey)
                for (ops) |op| {
                    if (op.getString("path")) |path| {
                        if (std.mem.indexOfScalar(u8, path, '/')) |sep| {
                            const collection = path[0..sep];
                            const rkey = path[sep + 1 ..];
                            if (zat.Nsid.parse(collection) == null) return error.InvalidFrame;
                            // NOTE(review): an empty rkey is tolerated here —
                            // confirm against the lexicon (record keys are
                            // required, 1–512 chars).
                            if (rkey.len > 0) {
                                if (zat.Rkey.parse(rkey) == null) return error.InvalidFrame;
                            }
                        } else return error.InvalidFrame; // path must contain '/'
                    }
                }
            },
            else => return error.InvalidFrame,
        }
    }
}

/// queue a DID for background key resolution. non-blocking, best-effort:
/// duplicates and queue overflow are dropped silently. the DID is copied,
/// so the caller's slice only needs to live for this call.
fn queueResolve(self: *Validator, did: []const u8) void {
    // check if already cached (race between validate and resolver)
    if (self.cache.contains(did)) return;
    self.queue_mutex.lock();
    defer self.queue_mutex.unlock();
    // skip if already queued (prevents unbounded queue growth).
    // checked BEFORE duplicating: repeats are the common case on a busy
    // firehose, and StringHashMap hashes key contents, so probing with
    // the caller's slice is equivalent and avoids an alloc+free per call.
    if (self.queued_set.contains(did)) return;
    // cap queue size — drop DID without adding to queued_set so it can be re-queued later
    if (self.queue.items.len >= max_queue_size) return;
    const duped = self.allocator.dupe(u8, did) catch return;
    self.queue.append(self.allocator, duped) catch {
        self.allocator.free(duped);
        return;
    };
    // best-effort dedup entry; the key aliases the queued slice
    self.queued_set.put(self.allocator, duped, {}) catch {};
    self.queue_cond.signal();
}

/// resolver thread body: pop queued DIDs, resolve DID documents over the
/// network, decode and cache the signing key, and opportunistically
/// reconcile the account's PDS host. runs until `alive` is cleared.
fn resolveLoop(self: *Validator) void {
    var resolver = zat.DidResolver.initWithOptions(self.allocator, .{ .keep_alive = true });
    defer resolver.deinit();
    while (self.alive.load(.acquire)) {
        var did: ?[]const u8 = null;
        {
            self.queue_mutex.lock();
            defer self.queue_mutex.unlock();
            // timed wait so shutdown is noticed within ~1s even if the
            // wakeup signal races with entering the wait
            while (self.queue.items.len == 0 and self.alive.load(.acquire)) {
                self.queue_cond.timedWait(&self.queue_mutex, 1 * std.time.ns_per_s) catch {};
            }
            if (self.queue.items.len > 0) {
                // FIFO pop; O(n) shift, acceptable at this queue depth
                did = self.queue.orderedRemove(0);
                _ = self.queued_set.remove(did.?);
            }
        }
        const d = did orelse continue;
        defer self.allocator.free(d);
        // skip if already cached (resolved while queued)
        if (self.cache.contains(d)) continue;
        // resolve DID → signing key
        const parsed = zat.Did.parse(d) orelse continue;
        var doc = resolver.resolve(parsed) catch |err| {
            log.debug("DID resolve failed for {s}: {s}", .{ d, @errorName(err) });
            continue;
        };
        defer doc.deinit();
        // extract and decode signing key
        const vm = doc.signingKey() orelse continue;
        const key_bytes = zat.multibase.decode(self.allocator, vm.public_key_multibase) catch continue;
        defer self.allocator.free(key_bytes);
        const public_key = zat.multicodec.parsePublicKey(key_bytes) catch continue;
        // store decoded key in cache (fixed-size, no pointer chasing).
        // NOTE(review): `d` is freed at the end of this iteration — this
        // assumes LruCache copies its key; verify in lru.zig.
        var cached = CachedKey{
            .key_type = public_key.key_type,
            .raw = undefined,
            .len = @intCast(public_key.raw.len),
            .resolve_time = std.time.timestamp(),
        };
        @memcpy(cached.raw[0..public_key.raw.len], public_key.raw);
        self.cache.put(d, cached) catch continue;
        // --- host validation (merged from migration queue) ---
        // while we have the DID doc, check PDS endpoint and update host if needed.
        // best-effort: failures don't prevent signing key caching.
        if (self.persist) |persist| {
            if (doc.pdsEndpoint()) |pds_endpoint| {
                if (extractHostFromUrl(pds_endpoint)) |pds_host| {
                    const pds_host_id = (persist.getHostIdForHostname(pds_host) catch null) orelse continue;
                    const uid = persist.uidForDid(d) catch continue;
                    const current_host = persist.getAccountHostId(uid) catch continue;
                    if (current_host != 0 and current_host != pds_host_id) {
                        persist.setAccountHostId(uid, pds_host_id) catch {};
                        log.info("host updated via DID doc: {s} -> host {d}", .{ d, pds_host_id });
                    }
                }
            }
        }
    }
}

/// evict a DID's cached signing key (e.g. on #identity event).
/// the next commit from this DID will trigger a fresh resolution.
pub fn evictKey(self: *Validator, did: []const u8) void {
    _ = self.cache.remove(did);
}

/// cache size (for diagnostics)
pub fn cacheSize(self: *Validator) u32 {
    return self.cache.count();
}

/// resolve queue length (for diagnostics — non-blocking: returns 0 when the
/// queue lock is contended rather than stalling the caller)
pub fn resolveQueueLen(self: *Validator) usize {
    if (!self.queue_mutex.tryLock()) return 0;
    defer self.queue_mutex.unlock();
    return self.queue.items.len;
}

/// resolve dedup set size (for diagnostics — non-blocking)
pub fn resolveQueuedSetCount(self: *Validator) u32 {
    if (!self.queue_mutex.tryLock()) return 0;
    defer self.queue_mutex.unlock();
    return self.queued_set.count();
}

/// signing key cache hashmap backing capacity (for memory attribution)
pub fn cacheMapCapacity(self: *Validator) u32 {
    return self.cache.mapCapacity();
}

/// resolver dedup set hashmap backing capacity (for memory attribution — non-blocking)
pub fn resolveQueuedSetCapacity(self: *Validator) u32 {
    if (!self.queue_mutex.tryLock()) return 0;
    defer self.queue_mutex.unlock();
    return self.queued_set.capacity();
}

pub const HostAuthority = enum { accept, migrate, reject };

/// synchronous host authority check. called on first-seen DIDs (is_new)
/// and host migrations (host_changed). resolves the DID doc to verify the
/// PDS endpoint matches the incoming host. retries once on failure to
/// handle transient network errors.
///
/// returns:
/// .accept — should not happen (caller should only call on new/mismatch)
/// .migrate — DID doc confirms this host, caller should update host_id
/// .reject — DID doc does not confirm, caller should drop the event
pub fn resolveHostAuthority(self: *Validator, did: []const u8, incoming_host_id: u64) HostAuthority {
    const persist = self.persist orelse return .migrate; // no DB — can't check
    var resolver = zat.DidResolver.initWithOptions(self.allocator, .{});
    defer resolver.deinit();
    const parsed = zat.Did.parse(did) orelse return .reject;
    // first resolve attempt
    var doc = resolver.resolve(parsed) catch {
        // retry once on network failure
        var doc2 = resolver.resolve(parsed) catch return .reject;
        defer doc2.deinit();
        return self.checkPdsHost(&doc2, persist, incoming_host_id);
    };
    defer doc.deinit();
    return self.checkPdsHost(&doc, persist, incoming_host_id);
}

/// compare the DID doc's PDS endpoint against the incoming host id.
/// .migrate when they match, .reject otherwise (including unknown hosts).
fn checkPdsHost(self: *Validator, doc: *zat.DidDocument, persist: *event_log_mod.DiskPersist, incoming_host_id: u64) HostAuthority {
    _ = self;
    const pds_endpoint = doc.pdsEndpoint() orelse return .reject;
    const pds_host = extractHostFromUrl(pds_endpoint) orelse return .reject;
    const pds_host_id = (persist.getHostIdForHostname(pds_host) catch null) orelse return .reject;
    if (pds_host_id == incoming_host_id) return .migrate;
    return .reject;
}
};

/// extract hostname from a URL like "https://pds.example.com" or "https://pds.example.com:443/path".
/// returns a sub-slice of `url` (no allocation), or null for an empty host.
/// NOTE(review): a bracketed IPv6 literal ("https://[::1]:443") would be cut
/// at the first ':' — confirm PDS endpoints are always DNS hostnames.
pub fn extractHostFromUrl(url: []const u8) ?[]const u8 {
    // strip scheme
    var rest = url;
    if (std.mem.startsWith(u8, rest, "https://")) {
        rest = rest["https://".len..];
    } else if (std.mem.startsWith(u8, rest, "http://")) {
        rest = rest["http://".len..];
    }
    // strip path
    if (std.mem.indexOfScalar(u8, rest, '/')) |i| {
        rest = rest[0..i];
    }
    // strip port
    if (std.mem.indexOfScalar(u8, rest, ':')) |i| {
        rest = rest[0..i];
    }
    if (rest.len == 0) return null;
    return rest;
}

/// read an integer from the environment, falling back to `default` when the
/// variable is missing or unparseable.
fn parseEnvInt(comptime T: type, key: []const u8, default: T) T {
    const val =
std.posix.getenv(key) orelse return default;
    return std.fmt.parseInt(T, val, 10) catch default;
}

// --- tests ---

test "validateCommit skips on cache miss" {
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    defer v.deinit();
    // build a commit payload using SDK
    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "seq", .value = .{ .unsigned = 42 } },
        .{ .key = "rev", .value = .{ .text = "3k2abc000000" } },
        .{ .key = "time", .value = .{ .text = "2024-01-15T10:30:00Z" } },
    } };
    const result = v.validateCommit(payload);
    try std.testing.expect(result.valid);
    try std.testing.expect(result.skipped);
    try std.testing.expectEqual(@as(u64, 1), stats.cache_misses.load(.acquire));
}

test "validateCommit skips when no repo field" {
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    defer v.deinit();
    // payload without "repo" field
    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "seq", .value = .{ .unsigned = 42 } },
    } };
    const result = v.validateCommit(payload);
    try std.testing.expect(result.valid);
    try std.testing.expect(result.skipped);
    try std.testing.expectEqual(@as(u64, 1), stats.skipped.load(.acquire));
}

test "checkCommitStructure rejects invalid DID" {
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    defer v.deinit();
    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "not-a-did" } },
    } };
    try std.testing.expectError(error.InvalidFrame, v.checkCommitStructure(payload));
}

test "checkCommitStructure accepts valid commit" {
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    defer v.deinit();
    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } },
    } };
    try v.checkCommitStructure(payload);
}

test "validateSync skips on cache miss" {
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    defer v.deinit();
    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "did", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "seq", .value = .{ .unsigned = 42 } },
        .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } },
        .{ .key = "blocks", .value = .{ .bytes = "deadbeef" } },
    } };
    const result = v.validateSync(payload);
    try std.testing.expect(result.valid);
    try std.testing.expect(result.skipped);
    try std.testing.expectEqual(@as(u64, 1), stats.cache_misses.load(.acquire));
}

test "validateSync rejects invalid DID" {
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    defer v.deinit();
    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "did", .value = .{ .text = "not-a-did" } },
        .{ .key = "blocks", .value = .{ .bytes = "deadbeef" } },
    } };
    const result = v.validateSync(payload);
    try std.testing.expect(!result.valid);
    try std.testing.expect(!result.skipped);
    try std.testing.expectEqual(@as(u64, 1), stats.failed.load(.acquire));
}

test "validateSync rejects missing blocks" {
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    defer v.deinit();
    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "did", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } },
    } };
    const result = v.validateSync(payload);
    try std.testing.expect(!result.valid);
    try std.testing.expect(!result.skipped);
    try std.testing.expectEqual(@as(u64, 1), stats.failed.load(.acquire));
}

test "validateSync skips when no did field" {
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    defer v.deinit();
    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "seq", .value = .{ .unsigned = 42 } },
    } };
    const result = v.validateSync(payload);
    try std.testing.expect(result.valid);
    try std.testing.expect(result.skipped);
    try std.testing.expectEqual(@as(u64, 1), stats.skipped.load(.acquire));
}

test "LRU cache evicts least recently used" {
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    v.cache.capacity = 3;
    defer v.deinit();
    const mk = CachedKey{ .key_type = .p256, .raw = .{0} ** 33, .len = 33 };
    try v.cache.put("did:plc:aaa", mk);
    try v.cache.put("did:plc:bbb", mk);
    try v.cache.put("did:plc:ccc", mk);
    // access "aaa" to promote it
    _ = v.cache.get("did:plc:aaa");
    // insert "ddd" — should evict "bbb" (LRU)
    try v.cache.put("did:plc:ddd", mk);
    try std.testing.expect(v.cache.get("did:plc:bbb") == null);
    try std.testing.expect(v.cache.get("did:plc:aaa") != null);
    try std.testing.expect(v.cache.get("did:plc:ccc") != null);
    try std.testing.expect(v.cache.get("did:plc:ddd") != null);
    try std.testing.expectEqual(@as(u32, 3), v.cache.count());
}

test "checkCommitStructure rejects too many ops" {
    var stats = broadcaster.Stats{};
    var v = Validator.initWithConfig(std.testing.allocator, &stats, .{ .max_ops = 2 });
    defer v.deinit();
    // build ops array with 3 items (over limit of 2)
    const ops = [_]zat.cbor.Value{
        .{ .map = &.{.{ .key = "action", .value = .{ .text = "create" } }} },
        .{ .map = &.{.{ .key = "action", .value = .{ .text = "create" } }} },
        .{ .map = &.{.{ .key = "action", .value = .{ .text = "create" } }} },
    };
    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "ops", .value = .{ .array = &ops } },
    } };
    try std.testing.expectError(error.InvalidFrame, v.checkCommitStructure(payload));
}

// --- spec conformance tests ---

test "spec: #commit blocks > 2,000,000 bytes rejected" {
    // lexicon maxLength for #commit blocks: 2,000,000
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    defer v.deinit();
    // insert a fake cached key so we reach the blocks size check
    const did = "did:plc:test123";
    try v.cache.put(did, .{
        .key_type = .p256,
        .raw = .{0} ** 33,
        .len = 33,
        .resolve_time = 100,
    });
    // blocks with 2,000,001 bytes (1 byte over limit)
    const oversized_blocks = try std.testing.allocator.alloc(u8, 2_000_001);
    defer std.testing.allocator.free(oversized_blocks);
    @memset(oversized_blocks, 0);
    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = did } },
        .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } },
        .{ .key = "blocks", .value = .{ .bytes = oversized_blocks } },
    } };
    const result = v.validateCommit(payload);
    try std.testing.expect(!result.valid or result.skipped);
}

test "spec: #commit blocks = 2,000,000 bytes accepted (boundary)" {
    // lexicon maxLength for #commit blocks: 2,000,000 — exactly at limit should pass size check
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    defer v.deinit();
    const did = "did:plc:test123";
    try v.cache.put(did, .{
        .key_type = .p256,
        .raw = .{0} ** 33,
        .len = 33,
        .resolve_time = 100,
    });
    // exactly 2,000,000 bytes — should pass size check (may fail signature verify, that's ok)
    const exact_blocks = try std.testing.allocator.alloc(u8, 2_000_000);
    defer std.testing.allocator.free(exact_blocks);
    @memset(exact_blocks, 0);
    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = did } },
        .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } },
        .{ .key = "blocks", .value = .{ .bytes = exact_blocks } },
    } };
    const result = v.validateCommit(payload);
    // should not be rejected for size — may fail signature verification (that's fine,
    // it means we passed the size check). with P1.1c, sig failure → skipped=true.
    try std.testing.expect(result.valid or result.skipped);
}

test "spec: #sync blocks > 10,000 bytes rejected" {
    // lexicon maxLength for #sync blocks: 10,000
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    defer v.deinit();
    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "did", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } },
        .{ .key = "blocks", .value = .{ .bytes = &([_]u8{0} ** 10_001) } },
    } };
    const result = v.validateSync(payload);
    try std.testing.expect(!result.valid);
    try std.testing.expect(!result.skipped);
}

test "spec: #sync blocks = 10,000 bytes accepted (boundary)" {
    // lexicon maxLength for #sync blocks: 10,000 — exactly at limit should pass size check
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    defer v.deinit();
    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "did", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } },
        .{ .key = "blocks", .value = .{ .bytes = &([_]u8{0} ** 10_000) } },
    } };
    const result = v.validateSync(payload);
    // should pass size check — will be a cache miss → skipped (no cached key)
    try std.testing.expect(result.valid);
    try std.testing.expect(result.skipped);
}

test "extractOps reads path field from firehose format" {
    var stats = broadcaster.Stats{};
    var v = Validator.initWithConfig(std.testing.allocator, &stats, .{ .verify_commit_diff = true });
    defer v.deinit();
    // use arena since extractOps allocates an ArrayList internally
    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();
    const ops = [_]zat.cbor.Value{
        .{ .map = &.{
            .{ .key = "action", .value = .{ .text = "create" } },
            .{ .key = "path", .value = .{ .text = "app.bsky.feed.post/3k2abc000000" } },
            .{ .key = "cid", .value = .{ .cid = .{ .raw = "fakecid12345" } } },
        } },
        .{ .map = &.{
            .{ .key = "action", .value = .{ .text = "delete" } },
            .{ .key = "path", .value = .{ .text = "app.bsky.feed.like/3k2def000000" } },
        } },
    };
    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "ops", .value = .{ .array = &ops } },
    } };
    const result = v.extractOps(arena.allocator(), payload);
    try std.testing.expect(result != null);
    try std.testing.expectEqual(@as(usize, 2), result.?.len);
    try std.testing.expectEqualStrings("app.bsky.feed.post/3k2abc000000", result.?[0].path);
    try std.testing.expectEqualStrings("app.bsky.feed.like/3k2def000000", result.?[1].path);
    try std.testing.expect(result.?[0].value != null); // create has cid
    try std.testing.expect(result.?[1].value == null); // delete has no cid
}

test "extractOps rejects malformed path without slash" {
    var stats = broadcaster.Stats{};
    var v = Validator.initWithConfig(std.testing.allocator, &stats, .{ .verify_commit_diff = true });
    defer v.deinit();
    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();
    const ops = [_]zat.cbor.Value{
        .{ .map = &.{
            .{ .key = "action", .value = .{ .text = "create" } },
            .{ .key = "path", .value = .{ .text = "noslash" } },
            .{ .key = "cid", .value = .{ .cid = .{ .raw = "fakecid12345" } } },
        } },
    };
    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "ops", .value = .{ .array = &ops } },
    } };
    // malformed path (no slash) → all ops skipped → returns null
    const result = v.extractOps(arena.allocator(), payload);
    try std.testing.expect(result == null);
}

test "checkCommitStructure validates path field" {
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    defer v.deinit();
    // valid path
    const valid_ops = [_]zat.cbor.Value{
        .{ .map = &.{
            .{ .key = "action", .value = .{ .text = "create" } },
            .{ .key = "path", .value = .{ .text = "app.bsky.feed.post/3k2abc000000" } },
        } },
    };
    const valid_payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "ops", .value = .{ .array = &valid_ops } },
    } };
    try v.checkCommitStructure(valid_payload);
    // invalid collection in path
    const invalid_ops = [_]zat.cbor.Value{
        .{ .map = &.{
            .{ .key = "action", .value = .{ .text = "create" } },
            .{ .key = "path", .value = .{ .text = "not-an-nsid/3k2abc000000" } },
        } },
    };
    const invalid_payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "ops", .value = .{ .array = &invalid_ops } },
    } };
    try std.testing.expectError(error.InvalidFrame, v.checkCommitStructure(invalid_payload));
}

test "queueResolve deduplicates repeated DIDs" {
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    defer v.deinit();
    // queue the same DID 100 times
    for (0..100) |_| {
        v.queueResolve("did:plc:duplicate");
    }
    // should have exactly 1 entry, not 100
    try std.testing.expectEqual(@as(usize, 1), v.queue.items.len);
    try std.testing.expectEqual(@as(u32, 1), v.queued_set.count());
}