atproto relay implementation in zig zlay.waow.tech
at main 951 lines 37 kB view raw
//! relay frame validator — DID key resolution + real signature verification
//!
//! validates firehose commit frames by verifying the commit signature against
//! the pre-resolved signing key for the DID. accepts pre-decoded CBOR payload
//! from the subscriber (decoded via zat SDK). on cache miss, skips validation
//! and queues background resolution. no frame is ever blocked on network I/O.

const std = @import("std");
const zat = @import("zat");
const broadcaster = @import("broadcaster.zig");
const event_log_mod = @import("event_log.zig");
const lru = @import("lru.zig");

const Allocator = std.mem.Allocator;
const log = std.log.scoped(.relay);

/// decoded and cached signing key for a DID.
/// fixed-size value type so the LRU cache needs no per-entry heap pointers.
const CachedKey = struct {
    key_type: zat.multicodec.KeyType,
    raw: [33]u8, // compressed public key (secp256k1 or p256)
    len: u8, // number of valid bytes in `raw`
    resolve_time: i64 = 0, // epoch seconds when resolved
};

/// outcome of validating one frame.
/// note: `valid = true, skipped = true` means "not checked" (cache miss or
/// soft failure), NOT "verified" — callers must look at both flags.
pub const ValidationResult = struct {
    valid: bool,
    skipped: bool,
    data_cid: ?[]const u8 = null, // MST root CID from verified commit
    commit_rev: ?[]const u8 = null, // rev from verified commit
};

/// configuration for commit validation checks
pub const ValidatorConfig = struct {
    /// verify MST structure during signature verification
    verify_mst: bool = false, // off by default for relay throughput
    /// verify commit diffs via MST inversion (sync 1.1)
    verify_commit_diff: bool = false,
    /// max allowed operations per commit
    max_ops: usize = 200,
    /// max clock skew for rev timestamps (seconds)
    rev_clock_skew: i64 = 300, // 5 minutes
};

pub const Validator = struct {
    allocator: Allocator,
    stats: *broadcaster.Stats,
    config: ValidatorConfig,
    persist: ?*event_log_mod.DiskPersist = null,
    // DID → signing key cache (decoded, ready for verification)
    cache: lru.LruCache(CachedKey),
    // background resolve queue — entries are heap-duped DID strings owned by
    // the queue until a resolver thread dequeues (and later frees) them
    queue: std.ArrayListUnmanaged([]const u8) = .{},
    // in-flight set — prevents duplicate DID entries in the queue.
    // keys alias the queue's duped strings (no separate ownership).
    queued_set: std.StringHashMapUnmanaged(void) = .{},
    // guards `queue` and `queued_set`
    queue_mutex: std.Thread.Mutex = .{},
    // wakes resolver threads when work is queued
    queue_cond: std.Thread.Condition = .{},
    // fixed-size slots; only the first `count` (from start()) are spawned
    resolver_threads: [max_resolver_threads]?std.Thread = .{null} ** max_resolver_threads,
    // shutdown flag for resolver threads
    alive: std.atomic.Value(bool) = .{ .raw = true },
    // LRU capacity; may be overridden from env in start()
    max_cache_size: u32 = 250_000,

    const max_resolver_threads = 8;
    const default_resolver_threads = 4;
    const max_queue_size: usize = 100_000;

    /// construct with default config. does not spawn threads — call start().
    pub fn init(allocator: Allocator, stats: *broadcaster.Stats) Validator {
        return initWithConfig(allocator, stats, .{});
    }

    /// construct with explicit config. the 250_000 here mirrors the
    /// `max_cache_size` field default; start() re-applies any env override.
    pub fn initWithConfig(allocator: Allocator, stats: *broadcaster.Stats, config: ValidatorConfig) Validator {
        return .{
            .allocator = allocator,
            .stats = stats,
            .config = config,
            .cache = lru.LruCache(CachedKey).init(allocator, 250_000),
        };
    }

    /// shutdown order matters: clear `alive`, wake all resolvers, join them,
    /// then free shared state. resolvers use a 1s timedWait, so even a wakeup
    /// missed in the store/broadcast window delays shutdown at most ~1s.
    pub fn deinit(self: *Validator) void {
        self.alive.store(false, .release);
        self.queue_cond.broadcast();
        for (&self.resolver_threads) |*t| {
            if (t.*) |thread| {
                thread.join();
                t.* = null;
            }
        }

        self.cache.deinit();

        // free queued DIDs (queue owns them; queued_set keys alias them,
        // so the set is deinit'd without touching its keys)
        for (self.queue.items) |did| {
            self.allocator.free(did);
        }
        self.queue.deinit(self.allocator);
        self.queued_set.deinit(self.allocator);
    }

    /// start background resolver threads.
    /// env overrides: VALIDATOR_CACHE_SIZE (LRU capacity),
    /// RESOLVER_THREADS (thread count, clamped to max_resolver_threads).
    pub fn start(self: *Validator) !void {
        self.max_cache_size = parseEnvInt(u32, "VALIDATOR_CACHE_SIZE", self.max_cache_size);
        self.cache.capacity = self.max_cache_size;
        const n = parseEnvInt(u8, "RESOLVER_THREADS", default_resolver_threads);
        const count = @min(n, max_resolver_threads);
        for (self.resolver_threads[0..count]) |*t| {
            t.* = try std.Thread.spawn(.{ .stack_size = @import("main.zig").default_stack_size }, resolveLoop, .{self});
        }
    }

    /// validate a #sync frame: signature verification only (no ops, no MST).
    /// #sync resets a repo to a new commit state — used for recovery from broken streams.
    /// on cache miss, queues background resolution and skips.
    ///
    /// returns: malformed frames → valid=false (stats.failed);
    /// cache miss or signature failure → valid=true, skipped=true;
    /// verified → valid=true, skipped=false with data_cid/commit_rev.
    pub fn validateSync(self: *Validator, payload: zat.cbor.Value) ValidationResult {
        // no "did" field: cannot attribute the frame, skip rather than fail
        const did = payload.getString("did") orelse {
            _ = self.stats.skipped.fetchAdd(1, .monotonic);
            return .{ .valid = true, .skipped = true };
        };

        if (zat.Did.parse(did) == null) {
            _ = self.stats.failed.fetchAdd(1, .monotonic);
            return .{ .valid = false, .skipped = false };
        }

        // check rev is valid TID (if present)
        if (payload.getString("rev")) |rev| {
            if (zat.Tid.parse(rev) == null) {
                _ = self.stats.failed.fetchAdd(1, .monotonic);
                return .{ .valid = false, .skipped = false };
            }
        }

        const blocks = payload.getBytes("blocks") orelse {
            _ = self.stats.failed.fetchAdd(1, .monotonic);
            return .{ .valid = false, .skipped = false };
        };

        // #sync CAR should be small (just the signed commit block)
        // lexicon maxLength: 10000
        if (blocks.len > 10_000) {
            _ = self.stats.failed.fetchAdd(1, .monotonic);
            return .{ .valid = false, .skipped = false };
        }

        // cache lookup
        const cached_key: ?CachedKey = self.cache.get(did);

        if (cached_key == null) {
            // no signing key yet — never block on network I/O here
            _ = self.stats.cache_misses.fetchAdd(1, .monotonic);
            _ = self.stats.skipped.fetchAdd(1, .monotonic);
            self.queueResolve(did);
            return .{ .valid = true, .skipped = true };
        }

        _ = self.stats.cache_hits.fetchAdd(1, .monotonic);

        // verify signature (no MST, no ops)
        const public_key = zat.multicodec.PublicKey{
            .key_type = cached_key.?.key_type,
            .raw = cached_key.?.raw[0..cached_key.?.len],
        };

        // arena holds CAR decode temporaries; freed wholesale on return
        var arena = std.heap.ArenaAllocator.init(self.allocator);
        defer arena.deinit();

        const result = zat.verifyCommitCar(arena.allocator(), blocks, public_key, .{
            .verify_mst = false,
            .expected_did = did,
            .max_car_size = 10 * 1024,
        }) catch |err| {
            log.debug("sync verification failed for {s}: {s}", .{ did, @errorName(err) });
            // sync spec: on signature failure, key may have rotated.
            // evict cached key and queue re-resolution. skip this frame.
            self.evictKey(did);
            self.queueResolve(did);
            _ = self.stats.skipped.fetchAdd(1, .monotonic);
            return .{ .valid = true, .skipped = true };
        };

        _ = self.stats.validated.fetchAdd(1, .monotonic);
        return .{
            .valid = true,
            .skipped = false,
            .data_cid = result.commit_cid,
            .commit_rev = result.commit_rev,
        };
    }

    /// validate a commit frame using a pre-decoded CBOR payload (from SDK decoder).
    /// on cache miss, queues background resolution and skips.
    /// same skip/fail semantics as validateSync; heavy lifting is in verifyCommit.
    pub fn validateCommit(self: *Validator, payload: zat.cbor.Value) ValidationResult {
        // extract DID from decoded payload ("repo" field for #commit frames)
        const did = payload.getString("repo") orelse {
            _ = self.stats.skipped.fetchAdd(1, .monotonic);
            return .{ .valid = true, .skipped = true };
        };

        // check cache for pre-resolved signing key
        const cached_key: ?CachedKey = self.cache.get(did);

        if (cached_key == null) {
            // cache miss — queue for background resolution, skip validation
            _ = self.stats.cache_misses.fetchAdd(1, .monotonic);
            _ = self.stats.skipped.fetchAdd(1, .monotonic);
            self.queueResolve(did);
            return .{ .valid = true, .skipped = true };
        }

        _ = self.stats.cache_hits.fetchAdd(1, .monotonic);

        // cache hit — do structure checks + signature verification
        if (self.verifyCommit(payload, did, cached_key.?)) |vr| {
            _ = self.stats.validated.fetchAdd(1, .monotonic);
            return vr;
        } else |err| {
            log.debug("commit verification failed for {s}: {s}", .{ did, @errorName(err) });
            // sync spec: on signature failure, key may have rotated.
            // evict cached key and queue re-resolution. skip this frame
            // (treat as cache miss). next commit will use the refreshed key.
            self.evictKey(did);
            self.queueResolve(did);
            _ = self.stats.skipped.fetchAdd(1, .monotonic);
            return .{ .valid = true, .skipped = true };
        }
    }

    /// full commit verification: structure checks, size limit, then signature
    /// verification (sync 1.1 diff path when enabled, legacy CAR path otherwise).
    /// returned slices (data_cid/commit_rev) come from the zat verify calls —
    /// NOTE(review): their lifetime relative to the arena below should be
    /// confirmed against the zat SDK; the arena is freed on return.
    fn verifyCommit(self: *Validator, payload: zat.cbor.Value, expected_did: []const u8, cached_key: CachedKey) !ValidationResult {
        // commit structure checks first (cheap, no allocation)
        try self.checkCommitStructure(payload);

        // extract blocks (raw CAR bytes) from the pre-decoded payload
        const blocks = payload.getBytes("blocks") orelse return error.InvalidFrame;

        // blocks size check — lexicon maxLength: 2000000
        if (blocks.len > 2_000_000) return error.InvalidFrame;

        // build public key for verification
        const public_key = zat.multicodec.PublicKey{
            .key_type = cached_key.key_type,
            .raw = cached_key.raw[0..cached_key.len],
        };

        // run real signature verification (needs its own arena for CAR/MST temporaries)
        var arena = std.heap.ArenaAllocator.init(self.allocator);
        defer arena.deinit();
        const alloc = arena.allocator();

        // try sync 1.1 path: extract ops and use verifyCommitDiff
        if (self.config.verify_commit_diff) {
            if (self.extractOps(alloc, payload)) |msg_ops| {
                // get stored prev_data from payload
                const prev_data: ?[]const u8 = if (payload.get("prevData")) |pd| switch (pd) {
                    .cid => |c| c.raw,
                    .null => null,
                    else => null,
                } else null;

                // without prev_data there is nothing to invert against
                const diff_result = zat.verifyCommitDiff(alloc, blocks, msg_ops, prev_data, public_key, .{
                    .expected_did = expected_did,
                    .skip_inversion = prev_data == null,
                }) catch |err| {
                    return err;
                };

                return .{
                    .valid = true,
                    .skipped = false,
                    .data_cid = diff_result.data_cid,
                    .commit_rev = diff_result.commit_rev,
                };
            }
        }

        // fallback: legacy verification (signature + optional MST walk)
        const result = zat.verifyCommitCar(alloc, blocks, public_key, .{
            .verify_mst = self.config.verify_mst,
            .expected_did = expected_did,
        }) catch |err| {
            return err;
        };

        return .{
            .valid = true,
            .skipped = false,
            .data_cid = result.commit_cid,
            .commit_rev = result.commit_rev,
        };
    }

    /// extract ops from payload and convert to mst.Operation array.
    /// the firehose format uses a single "path" field ("collection/rkey"),
    /// not separate "collection"/"rkey" fields.
    /// returns null when there are no usable ops (caller falls back to the
    /// legacy path). returned slice is allocated from `alloc` (the arena).
    fn extractOps(self: *Validator, alloc: Allocator, payload: zat.cbor.Value) ?[]const zat.MstOperation {
        _ = self;
        const ops_array = payload.getArray("ops") orelse return null;
        var ops: std.ArrayListUnmanaged(zat.MstOperation) = .{};
        for (ops_array) |op| {
            // malformed ops are skipped, not fatal — structure checks run separately
            const action = op.getString("action") orelse continue;
            const path = op.getString("path") orelse continue;

            // validate path contains "/" (collection/rkey)
            if (std.mem.indexOfScalar(u8, path, '/') == null) continue;

            // extract CID values
            const cid_value: ?[]const u8 = if (op.get("cid")) |v| switch (v) {
                .cid => |c| c.raw,
                else => null,
            } else null;

            var value: ?[]const u8 = null;
            var prev: ?[]const u8 = null;

            // create: new value only; update: value + prev; delete: prev only
            if (std.mem.eql(u8, action, "create")) {
                value = cid_value;
            } else if (std.mem.eql(u8, action, "update")) {
                value = cid_value;
                prev = if (op.get("prev")) |v| switch (v) {
                    .cid => |c| c.raw,
                    else => null,
                } else null;
            } else if (std.mem.eql(u8, action, "delete")) {
                prev = if (op.get("prev")) |v| switch (v) {
                    .cid => |c| c.raw,
                    else => null,
                } else null;
            } else continue;

            ops.append(alloc, .{
                .path = path,
                .value = value,
                .prev = prev,
            }) catch return null;
        }

        if (ops.items.len == 0) return null;
        return ops.items;
    }

    /// cheap structural validation of a commit payload: DID, TID rev,
    /// op count limit, and per-op path shape (NSID collection "/" rkey).
    fn checkCommitStructure(self: *Validator, payload: zat.cbor.Value) !void {
        // check repo field is a valid DID
        const repo = payload.getString("repo") orelse return error.InvalidFrame;
        if
 (zat.Did.parse(repo) == null) return error.InvalidFrame;

        // check rev is a valid TID
        if (payload.getString("rev")) |rev| {
            if (zat.Tid.parse(rev) == null) return error.InvalidFrame;
        }

        // check ops count
        if (payload.get("ops")) |ops_value| {
            switch (ops_value) {
                .array => |ops| {
                    if (ops.len > self.config.max_ops) return error.InvalidFrame;
                    // validate each op has valid path (collection/rkey)
                    for (ops) |op| {
                        if (op.getString("path")) |path| {
                            if (std.mem.indexOfScalar(u8, path, '/')) |sep| {
                                const collection = path[0..sep];
                                const rkey = path[sep + 1 ..];
                                if (zat.Nsid.parse(collection) == null) return error.InvalidFrame;
                                if (rkey.len > 0) {
                                    if (zat.Rkey.parse(rkey) == null) return error.InvalidFrame;
                                }
                            } else return error.InvalidFrame; // path must contain '/'
                        }
                    }
                },
                else => return error.InvalidFrame,
            }
        }
    }

    /// enqueue a DID for background key resolution.
    /// best-effort: allocation failure or a full queue silently drops the
    /// request — a later frame from the same DID will re-queue it.
    fn queueResolve(self: *Validator, did: []const u8) void {
        // check if already cached (race between validate and resolver)
        if (self.cache.contains(did)) return;

        // dupe before taking the lock; ownership passes to the queue on append
        const duped = self.allocator.dupe(u8, did) catch return;

        self.queue_mutex.lock();
        defer self.queue_mutex.unlock();

        // skip if already queued (prevents unbounded queue growth)
        if (self.queued_set.contains(duped)) {
            self.allocator.free(duped);
            return;
        }

        // cap queue size — drop DID without adding to queued_set so it can be re-queued later
        if (self.queue.items.len >= max_queue_size) {
            self.allocator.free(duped);
            return;
        }

        self.queue.append(self.allocator, duped) catch {
            self.allocator.free(duped);
            return;
        };
        // set key aliases the queued string; if this put fails the entry is
        // merely not deduped (set remove on dequeue is a harmless no-op)
        self.queued_set.put(self.allocator, duped, {}) catch {};
        self.queue_cond.signal();
    }

    /// resolver thread body: drain the queue, resolve DID docs, decode and
    /// cache signing keys. never touches the hot validate path.
    fn resolveLoop(self: *Validator) void {
        var resolver = zat.DidResolver.initWithOptions(self.allocator, .{ .keep_alive = true });
        defer resolver.deinit();

        while (self.alive.load(.acquire)) {
            var did: ?[]const u8 = null;
            {
                self.queue_mutex.lock();
                defer self.queue_mutex.unlock();
                // timed wait bounds shutdown latency even if a signal is missed
                while (self.queue.items.len == 0 and self.alive.load(.acquire)) {
                    self.queue_cond.timedWait(&self.queue_mutex, 1 * std.time.ns_per_s) catch {};
                }
                if (self.queue.items.len > 0) {
                    // NOTE(review): orderedRemove(0) is O(n) on a queue that can
                    // hold up to 100k entries — consider a ring buffer if this
                    // shows up in profiles
                    did = self.queue.orderedRemove(0);
                    _ = self.queued_set.remove(did.?);
                }
            }

            // this thread now owns the duped DID string
            const d = did orelse continue;
            defer self.allocator.free(d);

            // skip if already cached (resolved while queued)
            if (self.cache.contains(d)) continue;

            // resolve DID → signing key (network I/O happens here, off the hot path)
            const parsed = zat.Did.parse(d) orelse continue;
            var doc = resolver.resolve(parsed) catch |err| {
                log.debug("DID resolve failed for {s}: {s}", .{ d, @errorName(err) });
                continue;
            };
            defer doc.deinit();

            // extract and decode signing key
            const vm = doc.signingKey() orelse continue;
            const key_bytes = zat.multibase.decode(self.allocator, vm.public_key_multibase) catch continue;
            defer self.allocator.free(key_bytes);
            const public_key = zat.multicodec.parsePublicKey(key_bytes) catch continue;

            // store decoded key in cache (fixed-size, no pointer chasing)
            var cached = CachedKey{
                .key_type = public_key.key_type,
                .raw = undefined,
                .len = @intCast(public_key.raw.len),
                .resolve_time = std.time.timestamp(),
            };
            @memcpy(cached.raw[0..public_key.raw.len], public_key.raw);

            self.cache.put(d, cached) catch continue;

            // --- host validation (merged from migration queue) ---
            // while we have the DID doc, check PDS endpoint and update host if needed.
            // best-effort: failures don't prevent signing key caching.
            if (self.persist) |persist| {
                if (doc.pdsEndpoint()) |pds_endpoint| {
                    if (extractHostFromUrl(pds_endpoint)) |pds_host| {
                        const pds_host_id = (persist.getHostIdForHostname(pds_host) catch null) orelse continue;
                        const uid = persist.uidForDid(d) catch continue;
                        const current_host = persist.getAccountHostId(uid) catch continue;
                        if (current_host != 0 and current_host != pds_host_id) {
                            persist.setAccountHostId(uid, pds_host_id) catch {};
                            log.info("host updated via DID doc: {s} -> host {d}", .{ d, pds_host_id });
                        }
                    }
                }
            }
        }
    }

    /// evict a DID's cached signing key (e.g. on #identity event).
    /// the next commit from this DID will trigger a fresh resolution.
    pub fn evictKey(self: *Validator, did: []const u8) void {
        _ = self.cache.remove(did);
    }

    /// cache size (for diagnostics)
    pub fn cacheSize(self: *Validator) u32 {
        return self.cache.count();
    }

    /// resolve queue length (for diagnostics — non-blocking, returns 0 if contended)
    pub fn resolveQueueLen(self: *Validator) usize {
        if (!self.queue_mutex.tryLock()) return 0;
        defer self.queue_mutex.unlock();
        return self.queue.items.len;
    }

    /// resolve dedup set size (for diagnostics — non-blocking, returns 0 if contended)
    pub fn resolveQueuedSetCount(self: *Validator) u32 {
        if (!self.queue_mutex.tryLock()) return 0;
        defer self.queue_mutex.unlock();
        return self.queued_set.count();
    }

    /// signing key cache hashmap backing capacity (for memory attribution)
    pub fn cacheMapCapacity(self: *Validator) u32 {
        return self.cache.mapCapacity();
    }

    /// resolver dedup set hashmap backing capacity (for memory attribution — non-blocking)
    pub fn resolveQueuedSetCapacity(self: *Validator) u32 {
        if (!self.queue_mutex.tryLock()) return 0;
        defer self.queue_mutex.unlock();
        return self.queued_set.capacity();
    }

    pub const HostAuthority = enum { accept, migrate, reject };

    /// synchronous host
authority check. called on first-seen DIDs (is_new) 508 /// and host migrations (host_changed). resolves the DID doc to verify the 509 /// PDS endpoint matches the incoming host. retries once on failure to 510 /// handle transient network errors. 511 /// 512 /// returns: 513 /// .accept — should not happen (caller should only call on new/mismatch) 514 /// .migrate — DID doc confirms this host, caller should update host_id 515 /// .reject — DID doc does not confirm, caller should drop the event 516 pub fn resolveHostAuthority(self: *Validator, did: []const u8, incoming_host_id: u64) HostAuthority { 517 const persist = self.persist orelse return .migrate; // no DB — can't check 518 519 var resolver = zat.DidResolver.initWithOptions(self.allocator, .{}); 520 defer resolver.deinit(); 521 522 const parsed = zat.Did.parse(did) orelse return .reject; 523 524 // first resolve attempt 525 var doc = resolver.resolve(parsed) catch { 526 // retry once on network failure 527 var doc2 = resolver.resolve(parsed) catch return .reject; 528 defer doc2.deinit(); 529 return self.checkPdsHost(&doc2, persist, incoming_host_id); 530 }; 531 defer doc.deinit(); 532 return self.checkPdsHost(&doc, persist, incoming_host_id); 533 } 534 535 fn checkPdsHost(self: *Validator, doc: *zat.DidDocument, persist: *event_log_mod.DiskPersist, incoming_host_id: u64) HostAuthority { 536 _ = self; 537 const pds_endpoint = doc.pdsEndpoint() orelse return .reject; 538 const pds_host = extractHostFromUrl(pds_endpoint) orelse return .reject; 539 const pds_host_id = (persist.getHostIdForHostname(pds_host) catch null) orelse return .reject; 540 if (pds_host_id == incoming_host_id) return .migrate; 541 return .reject; 542 } 543}; 544 545/// extract hostname from a URL like "https://pds.example.com" or "https://pds.example.com:443/path" 546pub fn extractHostFromUrl(url: []const u8) ?[]const u8 { 547 // strip scheme 548 var rest = url; 549 if (std.mem.startsWith(u8, rest, "https://")) { 550 rest = 
rest["https://".len..]; 551 } else if (std.mem.startsWith(u8, rest, "http://")) { 552 rest = rest["http://".len..]; 553 } 554 // strip path 555 if (std.mem.indexOfScalar(u8, rest, '/')) |i| { 556 rest = rest[0..i]; 557 } 558 // strip port 559 if (std.mem.indexOfScalar(u8, rest, ':')) |i| { 560 rest = rest[0..i]; 561 } 562 if (rest.len == 0) return null; 563 return rest; 564} 565 566fn parseEnvInt(comptime T: type, key: []const u8, default: T) T { 567 const val = std.posix.getenv(key) orelse return default; 568 return std.fmt.parseInt(T, val, 10) catch default; 569} 570 571// --- tests --- 572 573test "validateCommit skips on cache miss" { 574 var stats = broadcaster.Stats{}; 575 var v = Validator.init(std.testing.allocator, &stats); 576 defer v.deinit(); 577 578 // build a commit payload using SDK 579 const payload: zat.cbor.Value = .{ .map = &.{ 580 .{ .key = "repo", .value = .{ .text = "did:plc:test123" } }, 581 .{ .key = "seq", .value = .{ .unsigned = 42 } }, 582 .{ .key = "rev", .value = .{ .text = "3k2abc000000" } }, 583 .{ .key = "time", .value = .{ .text = "2024-01-15T10:30:00Z" } }, 584 } }; 585 586 const result = v.validateCommit(payload); 587 try std.testing.expect(result.valid); 588 try std.testing.expect(result.skipped); 589 try std.testing.expectEqual(@as(u64, 1), stats.cache_misses.load(.acquire)); 590} 591 592test "validateCommit skips when no repo field" { 593 var stats = broadcaster.Stats{}; 594 var v = Validator.init(std.testing.allocator, &stats); 595 defer v.deinit(); 596 597 // payload without "repo" field 598 const payload: zat.cbor.Value = .{ .map = &.{ 599 .{ .key = "seq", .value = .{ .unsigned = 42 } }, 600 } }; 601 602 const result = v.validateCommit(payload); 603 try std.testing.expect(result.valid); 604 try std.testing.expect(result.skipped); 605 try std.testing.expectEqual(@as(u64, 1), stats.skipped.load(.acquire)); 606} 607 608test "checkCommitStructure rejects invalid DID" { 609 var stats = broadcaster.Stats{}; 610 var v = 
Validator.init(std.testing.allocator, &stats);
    defer v.deinit();

    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "not-a-did" } },
    } };

    try std.testing.expectError(error.InvalidFrame, v.checkCommitStructure(payload));
}

test "checkCommitStructure accepts valid commit" {
    var stats = broadcaster.Stats{};
    var validator = Validator.init(std.testing.allocator, &stats);
    defer validator.deinit();

    // minimal well-formed commit: valid DID plus valid TID rev
    const commit: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } },
    } };

    try validator.checkCommitStructure(commit);
}

test "validateSync skips on cache miss" {
    var stats = broadcaster.Stats{};
    var validator = Validator.init(std.testing.allocator, &stats);
    defer validator.deinit();

    // well-formed #sync frame for a DID with no cached key
    const frame: zat.cbor.Value = .{ .map = &.{
        .{ .key = "did", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "seq", .value = .{ .unsigned = 42 } },
        .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } },
        .{ .key = "blocks", .value = .{ .bytes = "deadbeef" } },
    } };

    const outcome = validator.validateSync(frame);
    try std.testing.expect(outcome.valid);
    try std.testing.expect(outcome.skipped);
    try std.testing.expectEqual(@as(u64, 1), stats.cache_misses.load(.acquire));
}

test "validateSync rejects invalid DID" {
    var stats = broadcaster.Stats{};
    var validator = Validator.init(std.testing.allocator, &stats);
    defer validator.deinit();

    // "did" field present but not parseable as a DID
    const frame: zat.cbor.Value = .{ .map = &.{
        .{ .key = "did", .value = .{ .text = "not-a-did" } },
        .{ .key = "blocks", .value = .{ .bytes = "deadbeef" } },
    } };

    const outcome = validator.validateSync(frame);
    try std.testing.expect(!outcome.valid);
    try std.testing.expect(!outcome.skipped);
    try std.testing.expectEqual(@as(u64, 1), stats.failed.load(.acquire));
}

test "validateSync rejects missing blocks" {
    var stats = broadcaster.Stats{};
    var validator = Validator.init(std.testing.allocator, &stats);
    defer validator.deinit();

    // no "blocks" field — hard failure, not a skip
    const frame: zat.cbor.Value = .{ .map = &.{
        .{ .key = "did", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } },
    } };

    const outcome = validator.validateSync(frame);
    try std.testing.expect(!outcome.valid);
    try std.testing.expect(!outcome.skipped);
    try std.testing.expectEqual(@as(u64, 1), stats.failed.load(.acquire));
}

test "validateSync skips when no did field" {
    var stats = broadcaster.Stats{};
    var validator = Validator.init(std.testing.allocator, &stats);
    defer validator.deinit();

    // frame with no "did" at all — unattributable, skipped rather than failed
    const frame: zat.cbor.Value = .{ .map = &.{
        .{ .key = "seq", .value = .{ .unsigned = 42 } },
    } };

    const outcome = validator.validateSync(frame);
    try std.testing.expect(outcome.valid);
    try std.testing.expect(outcome.skipped);
    try std.testing.expectEqual(@as(u64, 1), stats.skipped.load(.acquire));
}

test "LRU cache evicts least recently used" {
    var stats = broadcaster.Stats{};
    var validator = Validator.init(std.testing.allocator, &stats);
    validator.cache.capacity = 3;
    defer validator.deinit();

    const dummy_key = CachedKey{ .key_type = .p256, .raw = .{0} ** 33, .len = 33 };

    try validator.cache.put("did:plc:aaa", dummy_key);
    try validator.cache.put("did:plc:bbb", dummy_key);
    try validator.cache.put("did:plc:ccc", dummy_key);

    // touch "aaa" so "bbb" becomes the oldest entry
    _ = validator.cache.get("did:plc:aaa");

    // fourth insert overflows capacity 3 and must evict "bbb"
    try validator.cache.put("did:plc:ddd", dummy_key);

    try std.testing.expect(validator.cache.get("did:plc:bbb") == null);
    try std.testing.expect(validator.cache.get("did:plc:aaa") != null);
    try std.testing.expect(validator.cache.get("did:plc:ccc") != null);
    try std.testing.expect(validator.cache.get("did:plc:ddd") != null);
    try std.testing.expectEqual(@as(u32, 3), validator.cache.count());
}

test "checkCommitStructure rejects too many ops" {
    var stats = broadcaster.Stats{};
    var validator = Validator.initWithConfig(std.testing.allocator, &stats, .{ .max_ops = 2 });
    defer validator.deinit();

    // three ops against a configured limit of two
    const op_list = [_]zat.cbor.Value{
        .{ .map = &.{.{ .key = "action", .value = .{ .text = "create" } }} },
        .{ .map = &.{.{ .key = "action", .value = .{ .text = "create" } }} },
        .{ .map = &.{.{ .key = "action", .value = .{ .text = "create" } }} },
    };

    const commit: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "ops", .value = .{ .array = &op_list } },
    } };

    try std.testing.expectError(error.InvalidFrame, validator.checkCommitStructure(commit));
}

// --- spec conformance tests ---

test "spec: #commit blocks > 2,000,000 bytes rejected" {
    // lexicon maxLength for #commit blocks: 2,000,000
    var stats = broadcaster.Stats{};
    var validator = Validator.init(std.testing.allocator, &stats);
    defer validator.deinit();

    // seed a fake cached key so validation reaches the blocks size check
    const did = "did:plc:test123";
    try validator.cache.put(did, .{
        .key_type = .p256,
        .raw = .{0} ** 33,
        .len = 33,
        .resolve_time = 100,
    });

    // one byte over the limit
    const big_blocks = try std.testing.allocator.alloc(u8, 2_000_001);
    defer std.testing.allocator.free(big_blocks);
    @memset(big_blocks, 0);

    const commit: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = did } },
        .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } },
        .{ .key = "blocks", .value = .{ .bytes = big_blocks } },
    } };

    const outcome = validator.validateCommit(commit);
    try std.testing.expect(!outcome.valid or outcome.skipped);
}

test "spec: #commit blocks = 2,000,000 bytes accepted (boundary)" {
    // lexicon maxLength for #commit blocks: 2,000,000 — exactly at limit should pass size check
    var stats = broadcaster.Stats{};
    var validator = Validator.init(std.testing.allocator, &stats);
    defer validator.deinit();

    const did = "did:plc:test123";
    try validator.cache.put(did, .{
        .key_type = .p256,
        .raw = .{0} ** 33,
        .len = 33,
        .resolve_time = 100,
    });

    // exactly at the limit — size check passes; signature verify may still fail
    const boundary_blocks = try std.testing.allocator.alloc(u8, 2_000_000);
    defer std.testing.allocator.free(boundary_blocks);
    @memset(boundary_blocks, 0);

    const commit: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = did } },
        .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } },
        .{ .key = "blocks", .value = .{ .bytes = boundary_blocks } },
    } };

    const outcome = validator.validateCommit(commit);
    // not rejected for size — a signature failure surfaces as skipped=true,
    // which still proves the size check was passed
    try std.testing.expect(outcome.valid or outcome.skipped);
}

test "spec: #sync blocks > 10,000 bytes rejected" {
    // lexicon maxLength for #sync blocks: 10,000
    var stats = broadcaster.Stats{};
    var validator = Validator.init(std.testing.allocator, &stats);
    defer validator.deinit();

    const frame: zat.cbor.Value = .{ .map = &.{
        .{ .key = "did", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } },
        .{ .key = "blocks", .value = .{ .bytes = &([_]u8{0} ** 10_001) } },
    } };

    const outcome = validator.validateSync(frame);
    try std.testing.expect(!outcome.valid);
    try std.testing.expect(!outcome.skipped);
}

test "spec: #sync blocks = 10,000 bytes accepted (boundary)" {
    // lexicon maxLength for #sync blocks: 10,000 — exactly at limit should pass size check
    var stats = broadcaster.Stats{};
    var validator = Validator.init(std.testing.allocator, &stats);
    defer validator.deinit();

    const frame: zat.cbor.Value = .{ .map = &.{
        .{ .key = "did", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } },
        .{ .key = "blocks", .value = .{ .bytes = &([_]u8{0} ** 10_000) } },
    } };

    const outcome = validator.validateSync(frame);
    // size check passes; then cache miss → skipped (no key cached)
    try std.testing.expect(outcome.valid);
    try std.testing.expect(outcome.skipped);
}

test "extractOps reads path field from firehose format" {
    var stats = broadcaster.Stats{};
    var validator = Validator.initWithConfig(std.testing.allocator, &stats, .{ .verify_commit_diff = true });
    defer validator.deinit();

    // arena backs the ArrayList that extractOps builds internally
    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();

    const op_list = [_]zat.cbor.Value{
        .{ .map = &.{
            .{ .key = "action", .value = .{ .text = "create" } },
            .{ .key = "path", .value = .{ .text = "app.bsky.feed.post/3k2abc000000" } },
            .{ .key = "cid", .value = .{ .cid = .{ .raw = "fakecid12345" } } },
        } },
        .{ .map = &.{
            .{ .key = "action", .value = .{ .text = "delete" } },
            .{ .key = "path", .value = .{ .text = "app.bsky.feed.like/3k2def000000" } },
        } },
    };

    const commit: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "ops", .value = .{ .array = &op_list } },
    } };

    const extracted = validator.extractOps(arena.allocator(), commit);
    try std.testing.expect(extracted != null);
    try std.testing.expectEqual(@as(usize, 2), extracted.?.len);
    try std.testing.expectEqualStrings("app.bsky.feed.post/3k2abc000000", extracted.?[0].path);
    try std.testing.expectEqualStrings("app.bsky.feed.like/3k2def000000", extracted.?[1].path);
    try std.testing.expect(extracted.?[0].value != null); // create carries a cid
    try std.testing.expect(extracted.?[1].value == null); // delete carries none
}

test "extractOps rejects malformed path without slash" {
    var stats = broadcaster.Stats{};
    var validator = Validator.initWithConfig(std.testing.allocator, &stats, .{ .verify_commit_diff = true });
    defer validator.deinit();

    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();

    const op_list = [_]zat.cbor.Value{
        .{ .map = &.{
            .{ .key = "action", .value = .{ .text = "create" } },
            .{ .key = "path", .value = .{ .text = "noslash" } },
            .{ .key = "cid", .value = .{ .cid = .{ .raw = "fakecid12345" } } },
        } },
    };

    const commit: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "ops", .value = .{ .array = &op_list } },
    } };

    // path without '/' is skipped; with every op skipped the result is null
    const extracted = validator.extractOps(arena.allocator(), commit);
    try std.testing.expect(extracted == null);
}

test "checkCommitStructure validates path field" {
    var stats = broadcaster.Stats{};
    var validator = Validator.init(std.testing.allocator, &stats);
    defer validator.deinit();

    // a well-formed path: NSID collection, '/', rkey
    const good_ops = [_]zat.cbor.Value{
        .{ .map = &.{
            .{ .key = "action", .value = .{ .text = "create" } },
            .{ .key = "path", .value = .{ .text = "app.bsky.feed.post/3k2abc000000" } },
        } },
    };

    const good_commit: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "ops", .value = .{ .array = &good_ops } },
    } };

    try validator.checkCommitStructure(good_commit);

    // collection segment that is not a valid NSID
    const bad_ops = [_]zat.cbor.Value{
        .{ .map = &.{
            .{ .key = "action", .value = .{ .text = "create" } },
            .{ .key = "path", .value = .{ .text = "not-an-nsid/3k2abc000000" } },
        } },
    };

    const bad_commit: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "ops", .value = .{ .array = &bad_ops } },
    } };

    try std.testing.expectError(error.InvalidFrame, validator.checkCommitStructure(bad_commit));
}

test "queueResolve deduplicates repeated DIDs" {
    var stats = broadcaster.Stats{};
    var validator = Validator.init(std.testing.allocator, &stats);
    defer validator.deinit();

    // hammer the queue with the same DID
    var i: usize = 0;
    while (i < 100) : (i += 1) {
        validator.queueResolve("did:plc:duplicate");
    }

    // dedup set collapses the repeats to a single queued entry
    try std.testing.expectEqual(@as(usize, 1), validator.queue.items.len);
    try std.testing.expectEqual(@as(u32, 1), validator.queued_set.count());
}