atproto relay implementation in zig zlay.waow.tech
at 9a22a23276dcbc1d4465a0c4b7ad246de0cb613d 912 lines 35 kB view raw
//! relay frame validator — DID key resolution + real signature verification
//!
//! validates firehose commit frames by verifying the commit signature against
//! the pre-resolved signing key for the DID. accepts pre-decoded CBOR payload
//! from the subscriber (decoded via zat SDK). on cache miss, skips validation
//! and queues background resolution. no frame is ever blocked on network I/O.

const std = @import("std");
const zat = @import("zat");
const broadcaster = @import("broadcaster.zig");
const event_log_mod = @import("event_log.zig");
const lru = @import("lru.zig");

const Allocator = std.mem.Allocator;
const log = std.log.scoped(.relay);

/// decoded and cached signing key for a DID.
/// fixed-size value type so the LRU cache stores it inline (no per-entry heap data).
const CachedKey = struct {
    key_type: zat.multicodec.KeyType,
    raw: [33]u8, // compressed public key (secp256k1 or p256)
    len: u8, // number of valid bytes in `raw`
    resolve_time: i64 = 0, // epoch seconds when resolved
};

/// outcome of validating a single firehose frame.
/// `valid=true, skipped=true` means "not checked" (cache miss / best-effort skip),
/// NOT "verified" — callers must distinguish via `skipped`.
pub const ValidationResult = struct {
    valid: bool,
    skipped: bool,
    data_cid: ?[]const u8 = null, // MST root CID from verified commit
    commit_rev: ?[]const u8 = null, // rev from verified commit
};

/// configuration for commit validation checks
pub const ValidatorConfig = struct {
    /// verify MST structure during signature verification
    verify_mst: bool = false, // off by default for relay throughput
    /// verify commit diffs via MST inversion (sync 1.1)
    verify_commit_diff: bool = false,
    /// max allowed operations per commit
    max_ops: usize = 200,
    /// max clock skew for rev timestamps (seconds)
    rev_clock_skew: i64 = 300, // 5 minutes — NOTE(review): not referenced in this file; presumably used elsewhere or reserved
};

pub const Validator = struct {
    allocator: Allocator,
    stats: *broadcaster.Stats,
    config: ValidatorConfig,
    persist: ?*event_log_mod.DiskPersist = null,
    // DID → signing key cache (decoded, ready for verification)
    cache: lru.LruCache(CachedKey),
    // background resolve queue — guarded by queue_mutex
    queue: std.ArrayListUnmanaged([]const u8) = .{},
    // in-flight set — prevents duplicate DID entries in the queue (guarded by queue_mutex;
    // keys are the same heap slices stored in `queue`)
    queued_set: std.StringHashMapUnmanaged(void) = .{},
    queue_mutex: std.Thread.Mutex = .{},
    queue_cond: std.Thread.Condition = .{},
    resolver_threads: [max_resolver_threads]?std.Thread = .{null} ** max_resolver_threads,
    // shutdown flag checked by resolver threads
    alive: std.atomic.Value(bool) = .{ .raw = true },
    max_cache_size: u32 = 250_000,

    const max_resolver_threads = 8;
    const default_resolver_threads = 4;
    const max_queue_size: usize = 100_000;

    pub fn init(allocator: Allocator, stats: *broadcaster.Stats) Validator {
        return initWithConfig(allocator, stats, .{});
    }

    pub fn initWithConfig(allocator: Allocator, stats: *broadcaster.Stats, config: ValidatorConfig) Validator {
        return .{
            .allocator = allocator,
            .stats = stats,
            .config = config,
            // NOTE(review): 250_000 duplicates the max_cache_size field default;
            // start() overrides capacity from VALIDATOR_CACHE_SIZE anyway.
            .cache = lru.LruCache(CachedKey).init(allocator, 250_000),
        };
    }

    /// stop resolver threads and free all owned memory.
    /// safe to call without start() — resolver_threads entries are null then.
    pub fn deinit(self: *Validator) void {
        self.alive.store(false, .release);
        self.queue_cond.broadcast();
        for (&self.resolver_threads) |*t| {
            if (t.*) |thread| {
                thread.join();
                t.* = null;
            }
        }

        self.cache.deinit();

        // free queued DIDs (queued_set keys alias these slices, so no separate free)
        for (self.queue.items) |did| {
            self.allocator.free(did);
        }
        self.queue.deinit(self.allocator);
        self.queued_set.deinit(self.allocator);
    }

    /// start background resolver threads.
    /// reads VALIDATOR_CACHE_SIZE and RESOLVER_THREADS from the environment;
    /// thread count is clamped to max_resolver_threads.
    pub fn start(self: *Validator) !void {
        self.max_cache_size = parseEnvInt(u32, "VALIDATOR_CACHE_SIZE", self.max_cache_size);
        self.cache.capacity = self.max_cache_size;
        const n = parseEnvInt(u8, "RESOLVER_THREADS", default_resolver_threads);
        const count = @min(n, max_resolver_threads);
        for (self.resolver_threads[0..count]) |*t| {
            t.* = try std.Thread.spawn(.{ .stack_size = @import("main.zig").default_stack_size }, resolveLoop, .{self});
        }
    }

    /// validate a #sync frame: signature verification only (no ops, no MST).
    /// #sync resets a repo to a new commit state — used for recovery from broken streams.
    /// on cache miss, queues background resolution and skips.
    /// returns valid=false only for structurally invalid frames (bad DID/rev,
    /// missing or oversized blocks); signature failures are skipped, not rejected.
    pub fn validateSync(self: *Validator, payload: zat.cbor.Value) ValidationResult {
        // no "did" field — can't validate; count as skipped rather than failed
        const did = payload.getString("did") orelse {
            _ = self.stats.skipped.fetchAdd(1, .monotonic);
            return .{ .valid = true, .skipped = true };
        };

        if (zat.Did.parse(did) == null) {
            _ = self.stats.failed.fetchAdd(1, .monotonic);
            return .{ .valid = false, .skipped = false };
        }

        // check rev is valid TID (if present)
        if (payload.getString("rev")) |rev| {
            if (zat.Tid.parse(rev) == null) {
                _ = self.stats.failed.fetchAdd(1, .monotonic);
                return .{ .valid = false, .skipped = false };
            }
        }

        const blocks = payload.getBytes("blocks") orelse {
            _ = self.stats.failed.fetchAdd(1, .monotonic);
            return .{ .valid = false, .skipped = false };
        };

        // #sync CAR should be small (just the signed commit block)
        // lexicon maxLength: 10000
        if (blocks.len > 10_000) {
            _ = self.stats.failed.fetchAdd(1, .monotonic);
            return .{ .valid = false, .skipped = false };
        }

        // cache lookup
        const cached_key: ?CachedKey = self.cache.get(did);

        if (cached_key == null) {
            _ = self.stats.cache_misses.fetchAdd(1, .monotonic);
            _ = self.stats.skipped.fetchAdd(1, .monotonic);
            self.queueResolve(did);
            return .{ .valid = true, .skipped = true };
        }

        _ = self.stats.cache_hits.fetchAdd(1, .monotonic);

        // verify signature (no MST, no ops)
        const public_key = zat.multicodec.PublicKey{
            .key_type = cached_key.?.key_type,
            .raw = cached_key.?.raw[0..cached_key.?.len],
        };

        // arena for CAR decode temporaries — freed wholesale on return
        var arena = std.heap.ArenaAllocator.init(self.allocator);
        defer arena.deinit();

        const result = zat.verifyCommitCar(arena.allocator(), blocks, public_key, .{
            .verify_mst = false,
            .expected_did = did,
            .max_car_size = 10 * 1024,
        }) catch |err| {
            log.debug("sync verification failed for {s}: {s}", .{ did, @errorName(err) });
            // sync spec: on signature failure, key may have rotated.
            // evict cached key and queue re-resolution. skip this frame.
            self.evictKey(did);
            self.queueResolve(did);
            _ = self.stats.skipped.fetchAdd(1, .monotonic);
            return .{ .valid = true, .skipped = true };
        };

        _ = self.stats.validated.fetchAdd(1, .monotonic);
        return .{
            .valid = true,
            .skipped = false,
            .data_cid = result.commit_cid,
            .commit_rev = result.commit_rev,
        };
    }

    /// validate a commit frame using a pre-decoded CBOR payload (from SDK decoder).
    /// on cache miss, queues background resolution and skips.
    pub fn validateCommit(self: *Validator, payload: zat.cbor.Value) ValidationResult {
        // extract DID from decoded payload
        const did = payload.getString("repo") orelse {
            _ = self.stats.skipped.fetchAdd(1, .monotonic);
            return .{ .valid = true, .skipped = true };
        };

        // check cache for pre-resolved signing key
        const cached_key: ?CachedKey = self.cache.get(did);

        if (cached_key == null) {
            // cache miss — queue for background resolution, skip validation
            _ = self.stats.cache_misses.fetchAdd(1, .monotonic);
            _ = self.stats.skipped.fetchAdd(1, .monotonic);
            self.queueResolve(did);
            return .{ .valid = true, .skipped = true };
        }

        _ = self.stats.cache_hits.fetchAdd(1, .monotonic);

        // cache hit — do structure checks + signature verification
        if (self.verifyCommit(payload, did, cached_key.?)) |vr| {
            _ = self.stats.validated.fetchAdd(1, .monotonic);
            return vr;
        } else |err| {
            log.debug("commit verification failed for {s}: {s}", .{ did, @errorName(err) });
            // sync spec: on signature failure, key may have rotated.
            // evict cached key and queue re-resolution. skip this frame
            // (treat as cache miss). next commit will use the refreshed key.
            self.evictKey(did);
            self.queueResolve(did);
            _ = self.stats.skipped.fetchAdd(1, .monotonic);
            return .{ .valid = true, .skipped = true };
        }
    }

    /// full commit verification: structure checks, blocks size limit, then
    /// signature verification (sync 1.1 diff path when configured, else legacy CAR).
    /// any error here is treated by the caller as "key may have rotated" → skip.
    fn verifyCommit(self: *Validator, payload: zat.cbor.Value, expected_did: []const u8, cached_key: CachedKey) !ValidationResult {
        // commit structure checks first (cheap, no allocation)
        try self.checkCommitStructure(payload);

        // extract blocks (raw CAR bytes) from the pre-decoded payload
        const blocks = payload.getBytes("blocks") orelse return error.InvalidFrame;

        // blocks size check — lexicon maxLength: 2000000
        if (blocks.len > 2_000_000) return error.InvalidFrame;

        // build public key for verification
        const public_key = zat.multicodec.PublicKey{
            .key_type = cached_key.key_type,
            .raw = cached_key.raw[0..cached_key.len],
        };

        // run real signature verification (needs its own arena for CAR/MST temporaries)
        var arena = std.heap.ArenaAllocator.init(self.allocator);
        defer arena.deinit();
        const alloc = arena.allocator();

        // try sync 1.1 path: extract ops and use verifyCommitDiff
        if (self.config.verify_commit_diff) {
            if (self.extractOps(alloc, payload)) |msg_ops| {
                // get stored prev_data from payload
                const prev_data: ?[]const u8 = if (payload.get("prevData")) |pd| switch (pd) {
                    .cid => |c| c.raw,
                    .null => null,
                    else => null,
                } else null;

                const diff_result = zat.verifyCommitDiff(alloc, blocks, msg_ops, prev_data, public_key, .{
                    .expected_did = expected_did,
                    // without prevData there is no base tree to invert against
                    .skip_inversion = prev_data == null,
                }) catch |err| {
                    return err;
                };

                return .{
                    .valid = true,
                    .skipped = false,
                    .data_cid = diff_result.data_cid,
                    .commit_rev = diff_result.commit_rev,
                };
            }
        }

        // fallback: legacy verification (signature + optional MST walk)
        const result = zat.verifyCommitCar(alloc, blocks, public_key, .{
            .verify_mst = self.config.verify_mst,
            .expected_did = expected_did,
        }) catch |err| {
            return err;
        };

        return .{
            .valid = true,
            .skipped = false,
            .data_cid = result.commit_cid,
            .commit_rev = result.commit_rev,
        };
    }

    /// extract ops from payload and convert to mst.Operation array.
    /// the firehose format uses a single "path" field ("collection/rkey"),
    /// not separate "collection"/"rkey" fields.
    /// returned slice is arena-owned (`alloc`); returns null when no usable ops
    /// (malformed ops are silently dropped, and OOM is folded into null — caller
    /// falls back to the legacy verification path).
    fn extractOps(self: *Validator, alloc: Allocator, payload: zat.cbor.Value) ?[]const zat.MstOperation {
        _ = self;
        const ops_array = payload.getArray("ops") orelse return null;
        var ops: std.ArrayListUnmanaged(zat.MstOperation) = .{};
        for (ops_array) |op| {
            const action = op.getString("action") orelse continue;
            const path = op.getString("path") orelse continue;

            // validate path contains "/" (collection/rkey)
            if (std.mem.indexOfScalar(u8, path, '/') == null) continue;

            // extract CID values
            const cid_value: ?[]const u8 = if (op.get("cid")) |v| switch (v) {
                .cid => |c| c.raw,
                else => null,
            } else null;

            var value: ?[]const u8 = null;
            var prev: ?[]const u8 = null;

            if (std.mem.eql(u8, action, "create")) {
                value = cid_value;
            } else if (std.mem.eql(u8, action, "update")) {
                value = cid_value;
                prev = if (op.get("prev")) |v| switch (v) {
                    .cid => |c| c.raw,
                    else => null,
                } else null;
            } else if (std.mem.eql(u8, action, "delete")) {
                prev = if (op.get("prev")) |v| switch (v) {
                    .cid => |c| c.raw,
                    else => null,
                } else null;
            } else continue; // unknown action — drop the op

            ops.append(alloc, .{
                .path = path,
                .value = value,
                .prev = prev,
            }) catch return null;
        }

        if (ops.items.len == 0) return null;
        return ops.items;
    }

    /// cheap structural validation of a commit payload: DID syntax, TID rev,
    /// op count limit, and per-op path syntax. no allocation.
    fn checkCommitStructure(self: *Validator, payload: zat.cbor.Value) !void {
        // check repo field is a valid DID
        const repo = payload.getString("repo") orelse return error.InvalidFrame;
        if
        (zat.Did.parse(repo) == null) return error.InvalidFrame;

        // check rev is a valid TID
        if (payload.getString("rev")) |rev| {
            if (zat.Tid.parse(rev) == null) return error.InvalidFrame;
        }

        // check ops count
        if (payload.get("ops")) |ops_value| {
            switch (ops_value) {
                .array => |ops| {
                    if (ops.len > self.config.max_ops) return error.InvalidFrame;
                    // validate each op has valid path (collection/rkey)
                    for (ops) |op| {
                        if (op.getString("path")) |path| {
                            if (std.mem.indexOfScalar(u8, path, '/')) |sep| {
                                const collection = path[0..sep];
                                const rkey = path[sep + 1 ..];
                                if (zat.Nsid.parse(collection) == null) return error.InvalidFrame;
                                // empty rkey tolerated; non-empty must parse
                                if (rkey.len > 0) {
                                    if (zat.Rkey.parse(rkey) == null) return error.InvalidFrame;
                                }
                            } else return error.InvalidFrame; // path must contain '/'
                        }
                    }
                },
                else => return error.InvalidFrame,
            }
        }
    }

    /// enqueue a DID for background key resolution. best-effort: dupes the DID
    /// (caller's slice is not retained); silently drops on OOM, duplicate, or
    /// full queue. ownership of the dupe transfers to the queue on success.
    fn queueResolve(self: *Validator, did: []const u8) void {
        // check if already cached (race between validate and resolver)
        if (self.cache.contains(did)) return;

        const duped = self.allocator.dupe(u8, did) catch return;

        self.queue_mutex.lock();
        defer self.queue_mutex.unlock();

        // skip if already queued (prevents unbounded queue growth)
        if (self.queued_set.contains(duped)) {
            self.allocator.free(duped);
            return;
        }

        // cap queue size — drop DID without adding to queued_set so it can be re-queued later
        if (self.queue.items.len >= max_queue_size) {
            self.allocator.free(duped);
            return;
        }

        self.queue.append(self.allocator, duped) catch {
            self.allocator.free(duped);
            return;
        };
        // queued_set key aliases the queue entry; put failure (OOM) only loses dedup
        self.queued_set.put(self.allocator, duped, {}) catch {};
        self.queue_cond.signal();
    }

    /// resolver thread body: drain the queue, resolve each DID document,
    /// decode and cache its signing key. all failures are logged-and-skipped —
    /// this loop must never take the relay down.
    fn resolveLoop(self: *Validator) void {
        var resolver = zat.DidResolver.initWithOptions(self.allocator, .{ .keep_alive = true });
        defer resolver.deinit();

        while (self.alive.load(.acquire)) {
            var did: ?[]const u8 = null;
            {
                self.queue_mutex.lock();
                defer self.queue_mutex.unlock();
                // 1s timed wait bounds shutdown latency even if a signal is missed
                while (self.queue.items.len == 0 and self.alive.load(.acquire)) {
                    self.queue_cond.timedWait(&self.queue_mutex, 1 * std.time.ns_per_s) catch {};
                }
                if (self.queue.items.len > 0) {
                    // NOTE(review): orderedRemove(0) is O(n) in queue length
                    // (shifts up to max_queue_size entries) — consider a ring
                    // buffer or swapRemove if ordering is not required.
                    did = self.queue.orderedRemove(0);
                    _ = self.queued_set.remove(did.?);
                }
            }

            // this thread now owns the duped DID slice
            const d = did orelse continue;
            defer self.allocator.free(d);

            // skip if already cached (resolved while queued)
            if (self.cache.contains(d)) continue;

            // resolve DID → signing key
            const parsed = zat.Did.parse(d) orelse continue;
            var doc = resolver.resolve(parsed) catch |err| {
                log.debug("DID resolve failed for {s}: {s}", .{ d, @errorName(err) });
                continue;
            };
            defer doc.deinit();

            // extract and decode signing key
            const vm = doc.signingKey() orelse continue;
            const key_bytes = zat.multibase.decode(self.allocator, vm.public_key_multibase) catch continue;
            defer self.allocator.free(key_bytes);
            const public_key = zat.multicodec.parsePublicKey(key_bytes) catch continue;

            // store decoded key in cache (fixed-size, no pointer chasing)
            var cached = CachedKey{
                .key_type = public_key.key_type,
                .raw = undefined,
                .len = @intCast(public_key.raw.len),
                .resolve_time = std.time.timestamp(),
            };
            @memcpy(cached.raw[0..public_key.raw.len], public_key.raw);

            self.cache.put(d, cached) catch continue;

            // --- host validation (merged from migration queue) ---
            // while we have the DID doc, check PDS endpoint and update host if needed.
            // best-effort: failures don't prevent signing key caching.
            if (self.persist) |persist| {
                if (doc.pdsEndpoint()) |pds_endpoint| {
                    if (extractHostFromUrl(pds_endpoint)) |pds_host| {
                        const pds_host_id = (persist.getHostIdForHostname(pds_host) catch null) orelse continue;
                        const uid = persist.uidForDid(d) catch continue;
                        const current_host = persist.getAccountHostId(uid) catch continue;
                        // current_host == 0 presumably means "unassigned" — TODO confirm against persist
                        if (current_host != 0 and current_host != pds_host_id) {
                            persist.setAccountHostId(uid, pds_host_id) catch {};
                            log.info("host updated via DID doc: {s} -> host {d}", .{ d, pds_host_id });
                        }
                    }
                }
            }
        }
    }

    /// evict a DID's cached signing key (e.g. on #identity event).
    /// the next commit from this DID will trigger a fresh resolution.
    pub fn evictKey(self: *Validator, did: []const u8) void {
        _ = self.cache.remove(did);
    }

    /// cache size (for diagnostics)
    pub fn cacheSize(self: *Validator) u32 {
        return self.cache.count();
    }

    /// resolve queue length (for diagnostics — non-blocking; returns 0 if contended)
    pub fn resolveQueueLen(self: *Validator) usize {
        if (!self.queue_mutex.tryLock()) return 0;
        defer self.queue_mutex.unlock();
        return self.queue.items.len;
    }

    /// resolve dedup set size (for diagnostics — non-blocking; returns 0 if contended)
    pub fn resolveQueuedSetCount(self: *Validator) u32 {
        if (!self.queue_mutex.tryLock()) return 0;
        defer self.queue_mutex.unlock();
        return self.queued_set.count();
    }

    /// signing key cache hashmap backing capacity (for memory attribution)
    pub fn cacheMapCapacity(self: *Validator) u32 {
        return self.cache.mapCapacity();
    }

    /// resolver dedup set hashmap backing capacity (for memory attribution — non-blocking)
    pub fn resolveQueuedSetCapacity(self: *Validator) u32 {
        if (!self.queue_mutex.tryLock()) return 0;
        defer self.queue_mutex.unlock();
        return self.queued_set.capacity();
    }
};

/// extract hostname from a URL like "https://pds.example.com" or
"https://pds.example.com:443/path" 507fn extractHostFromUrl(url: []const u8) ?[]const u8 { 508 // strip scheme 509 var rest = url; 510 if (std.mem.startsWith(u8, rest, "https://")) { 511 rest = rest["https://".len..]; 512 } else if (std.mem.startsWith(u8, rest, "http://")) { 513 rest = rest["http://".len..]; 514 } 515 // strip path 516 if (std.mem.indexOfScalar(u8, rest, '/')) |i| { 517 rest = rest[0..i]; 518 } 519 // strip port 520 if (std.mem.indexOfScalar(u8, rest, ':')) |i| { 521 rest = rest[0..i]; 522 } 523 if (rest.len == 0) return null; 524 return rest; 525} 526 527fn parseEnvInt(comptime T: type, key: []const u8, default: T) T { 528 const val = std.posix.getenv(key) orelse return default; 529 return std.fmt.parseInt(T, val, 10) catch default; 530} 531 532// --- tests --- 533 534test "validateCommit skips on cache miss" { 535 var stats = broadcaster.Stats{}; 536 var v = Validator.init(std.testing.allocator, &stats); 537 defer v.deinit(); 538 539 // build a commit payload using SDK 540 const payload: zat.cbor.Value = .{ .map = &.{ 541 .{ .key = "repo", .value = .{ .text = "did:plc:test123" } }, 542 .{ .key = "seq", .value = .{ .unsigned = 42 } }, 543 .{ .key = "rev", .value = .{ .text = "3k2abc000000" } }, 544 .{ .key = "time", .value = .{ .text = "2024-01-15T10:30:00Z" } }, 545 } }; 546 547 const result = v.validateCommit(payload); 548 try std.testing.expect(result.valid); 549 try std.testing.expect(result.skipped); 550 try std.testing.expectEqual(@as(u64, 1), stats.cache_misses.load(.acquire)); 551} 552 553test "validateCommit skips when no repo field" { 554 var stats = broadcaster.Stats{}; 555 var v = Validator.init(std.testing.allocator, &stats); 556 defer v.deinit(); 557 558 // payload without "repo" field 559 const payload: zat.cbor.Value = .{ .map = &.{ 560 .{ .key = "seq", .value = .{ .unsigned = 42 } }, 561 } }; 562 563 const result = v.validateCommit(payload); 564 try std.testing.expect(result.valid); 565 try std.testing.expect(result.skipped); 566 
try std.testing.expectEqual(@as(u64, 1), stats.skipped.load(.acquire)); 567} 568 569test "checkCommitStructure rejects invalid DID" { 570 var stats = broadcaster.Stats{}; 571 var v = Validator.init(std.testing.allocator, &stats); 572 defer v.deinit(); 573 574 const payload: zat.cbor.Value = .{ .map = &.{ 575 .{ .key = "repo", .value = .{ .text = "not-a-did" } }, 576 } }; 577 578 try std.testing.expectError(error.InvalidFrame, v.checkCommitStructure(payload)); 579} 580 581test "checkCommitStructure accepts valid commit" { 582 var stats = broadcaster.Stats{}; 583 var v = Validator.init(std.testing.allocator, &stats); 584 defer v.deinit(); 585 586 const payload: zat.cbor.Value = .{ .map = &.{ 587 .{ .key = "repo", .value = .{ .text = "did:plc:test123" } }, 588 .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } }, 589 } }; 590 591 try v.checkCommitStructure(payload); 592} 593 594test "validateSync skips on cache miss" { 595 var stats = broadcaster.Stats{}; 596 var v = Validator.init(std.testing.allocator, &stats); 597 defer v.deinit(); 598 599 const payload: zat.cbor.Value = .{ .map = &.{ 600 .{ .key = "did", .value = .{ .text = "did:plc:test123" } }, 601 .{ .key = "seq", .value = .{ .unsigned = 42 } }, 602 .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } }, 603 .{ .key = "blocks", .value = .{ .bytes = "deadbeef" } }, 604 } }; 605 606 const result = v.validateSync(payload); 607 try std.testing.expect(result.valid); 608 try std.testing.expect(result.skipped); 609 try std.testing.expectEqual(@as(u64, 1), stats.cache_misses.load(.acquire)); 610} 611 612test "validateSync rejects invalid DID" { 613 var stats = broadcaster.Stats{}; 614 var v = Validator.init(std.testing.allocator, &stats); 615 defer v.deinit(); 616 617 const payload: zat.cbor.Value = .{ .map = &.{ 618 .{ .key = "did", .value = .{ .text = "not-a-did" } }, 619 .{ .key = "blocks", .value = .{ .bytes = "deadbeef" } }, 620 } }; 621 622 const result = v.validateSync(payload); 623 try 
std.testing.expect(!result.valid); 624 try std.testing.expect(!result.skipped); 625 try std.testing.expectEqual(@as(u64, 1), stats.failed.load(.acquire)); 626} 627 628test "validateSync rejects missing blocks" { 629 var stats = broadcaster.Stats{}; 630 var v = Validator.init(std.testing.allocator, &stats); 631 defer v.deinit(); 632 633 const payload: zat.cbor.Value = .{ .map = &.{ 634 .{ .key = "did", .value = .{ .text = "did:plc:test123" } }, 635 .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } }, 636 } }; 637 638 const result = v.validateSync(payload); 639 try std.testing.expect(!result.valid); 640 try std.testing.expect(!result.skipped); 641 try std.testing.expectEqual(@as(u64, 1), stats.failed.load(.acquire)); 642} 643 644test "validateSync skips when no did field" { 645 var stats = broadcaster.Stats{}; 646 var v = Validator.init(std.testing.allocator, &stats); 647 defer v.deinit(); 648 649 const payload: zat.cbor.Value = .{ .map = &.{ 650 .{ .key = "seq", .value = .{ .unsigned = 42 } }, 651 } }; 652 653 const result = v.validateSync(payload); 654 try std.testing.expect(result.valid); 655 try std.testing.expect(result.skipped); 656 try std.testing.expectEqual(@as(u64, 1), stats.skipped.load(.acquire)); 657} 658 659test "LRU cache evicts least recently used" { 660 var stats = broadcaster.Stats{}; 661 var v = Validator.init(std.testing.allocator, &stats); 662 v.cache.capacity = 3; 663 defer v.deinit(); 664 665 const mk = CachedKey{ .key_type = .p256, .raw = .{0} ** 33, .len = 33 }; 666 667 try v.cache.put("did:plc:aaa", mk); 668 try v.cache.put("did:plc:bbb", mk); 669 try v.cache.put("did:plc:ccc", mk); 670 671 // access "aaa" to promote it 672 _ = v.cache.get("did:plc:aaa"); 673 674 // insert "ddd" — should evict "bbb" (LRU) 675 try v.cache.put("did:plc:ddd", mk); 676 677 try std.testing.expect(v.cache.get("did:plc:bbb") == null); 678 try std.testing.expect(v.cache.get("did:plc:aaa") != null); 679 try std.testing.expect(v.cache.get("did:plc:ccc") != null); 
680 try std.testing.expect(v.cache.get("did:plc:ddd") != null); 681 try std.testing.expectEqual(@as(u32, 3), v.cache.count()); 682} 683 684test "checkCommitStructure rejects too many ops" { 685 var stats = broadcaster.Stats{}; 686 var v = Validator.initWithConfig(std.testing.allocator, &stats, .{ .max_ops = 2 }); 687 defer v.deinit(); 688 689 // build ops array with 3 items (over limit of 2) 690 const ops = [_]zat.cbor.Value{ 691 .{ .map = &.{.{ .key = "action", .value = .{ .text = "create" } }} }, 692 .{ .map = &.{.{ .key = "action", .value = .{ .text = "create" } }} }, 693 .{ .map = &.{.{ .key = "action", .value = .{ .text = "create" } }} }, 694 }; 695 696 const payload: zat.cbor.Value = .{ .map = &.{ 697 .{ .key = "repo", .value = .{ .text = "did:plc:test123" } }, 698 .{ .key = "ops", .value = .{ .array = &ops } }, 699 } }; 700 701 try std.testing.expectError(error.InvalidFrame, v.checkCommitStructure(payload)); 702} 703 704// --- spec conformance tests --- 705 706test "spec: #commit blocks > 2,000,000 bytes rejected" { 707 // lexicon maxLength for #commit blocks: 2,000,000 708 var stats = broadcaster.Stats{}; 709 var v = Validator.init(std.testing.allocator, &stats); 710 defer v.deinit(); 711 712 // insert a fake cached key so we reach the blocks size check 713 const did = "did:plc:test123"; 714 try v.cache.put(did, .{ 715 .key_type = .p256, 716 .raw = .{0} ** 33, 717 .len = 33, 718 .resolve_time = 100, 719 }); 720 721 // blocks with 2,000,001 bytes (1 byte over limit) 722 const oversized_blocks = try std.testing.allocator.alloc(u8, 2_000_001); 723 defer std.testing.allocator.free(oversized_blocks); 724 @memset(oversized_blocks, 0); 725 726 const payload: zat.cbor.Value = .{ .map = &.{ 727 .{ .key = "repo", .value = .{ .text = did } }, 728 .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } }, 729 .{ .key = "blocks", .value = .{ .bytes = oversized_blocks } }, 730 } }; 731 732 const result = v.validateCommit(payload); 733 try std.testing.expect(!result.valid 
or result.skipped); 734} 735 736test "spec: #commit blocks = 2,000,000 bytes accepted (boundary)" { 737 // lexicon maxLength for #commit blocks: 2,000,000 — exactly at limit should pass size check 738 var stats = broadcaster.Stats{}; 739 var v = Validator.init(std.testing.allocator, &stats); 740 defer v.deinit(); 741 742 const did = "did:plc:test123"; 743 try v.cache.put(did, .{ 744 .key_type = .p256, 745 .raw = .{0} ** 33, 746 .len = 33, 747 .resolve_time = 100, 748 }); 749 750 // exactly 2,000,000 bytes — should pass size check (may fail signature verify, that's ok) 751 const exact_blocks = try std.testing.allocator.alloc(u8, 2_000_000); 752 defer std.testing.allocator.free(exact_blocks); 753 @memset(exact_blocks, 0); 754 755 const payload: zat.cbor.Value = .{ .map = &.{ 756 .{ .key = "repo", .value = .{ .text = did } }, 757 .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } }, 758 .{ .key = "blocks", .value = .{ .bytes = exact_blocks } }, 759 } }; 760 761 const result = v.validateCommit(payload); 762 // should not be rejected for size — may fail signature verification (that's fine, 763 // it means we passed the size check). with P1.1c, sig failure → skipped=true. 
764 try std.testing.expect(result.valid or result.skipped); 765} 766 767test "spec: #sync blocks > 10,000 bytes rejected" { 768 // lexicon maxLength for #sync blocks: 10,000 769 var stats = broadcaster.Stats{}; 770 var v = Validator.init(std.testing.allocator, &stats); 771 defer v.deinit(); 772 773 const payload: zat.cbor.Value = .{ .map = &.{ 774 .{ .key = "did", .value = .{ .text = "did:plc:test123" } }, 775 .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } }, 776 .{ .key = "blocks", .value = .{ .bytes = &([_]u8{0} ** 10_001) } }, 777 } }; 778 779 const result = v.validateSync(payload); 780 try std.testing.expect(!result.valid); 781 try std.testing.expect(!result.skipped); 782} 783 784test "spec: #sync blocks = 10,000 bytes accepted (boundary)" { 785 // lexicon maxLength for #sync blocks: 10,000 — exactly at limit should pass size check 786 var stats = broadcaster.Stats{}; 787 var v = Validator.init(std.testing.allocator, &stats); 788 defer v.deinit(); 789 790 const payload: zat.cbor.Value = .{ .map = &.{ 791 .{ .key = "did", .value = .{ .text = "did:plc:test123" } }, 792 .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } }, 793 .{ .key = "blocks", .value = .{ .bytes = &([_]u8{0} ** 10_000) } }, 794 } }; 795 796 const result = v.validateSync(payload); 797 // should pass size check — will be a cache miss → skipped (no cached key) 798 try std.testing.expect(result.valid); 799 try std.testing.expect(result.skipped); 800} 801 802test "extractOps reads path field from firehose format" { 803 var stats = broadcaster.Stats{}; 804 var v = Validator.initWithConfig(std.testing.allocator, &stats, .{ .verify_commit_diff = true }); 805 defer v.deinit(); 806 807 // use arena since extractOps allocates an ArrayList internally 808 var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 809 defer arena.deinit(); 810 811 const ops = [_]zat.cbor.Value{ 812 .{ .map = &.{ 813 .{ .key = "action", .value = .{ .text = "create" } }, 814 .{ .key = "path", .value = .{ 
.text = "app.bsky.feed.post/3k2abc000000" } }, 815 .{ .key = "cid", .value = .{ .cid = .{ .raw = "fakecid12345" } } }, 816 } }, 817 .{ .map = &.{ 818 .{ .key = "action", .value = .{ .text = "delete" } }, 819 .{ .key = "path", .value = .{ .text = "app.bsky.feed.like/3k2def000000" } }, 820 } }, 821 }; 822 823 const payload: zat.cbor.Value = .{ .map = &.{ 824 .{ .key = "repo", .value = .{ .text = "did:plc:test123" } }, 825 .{ .key = "ops", .value = .{ .array = &ops } }, 826 } }; 827 828 const result = v.extractOps(arena.allocator(), payload); 829 try std.testing.expect(result != null); 830 try std.testing.expectEqual(@as(usize, 2), result.?.len); 831 try std.testing.expectEqualStrings("app.bsky.feed.post/3k2abc000000", result.?[0].path); 832 try std.testing.expectEqualStrings("app.bsky.feed.like/3k2def000000", result.?[1].path); 833 try std.testing.expect(result.?[0].value != null); // create has cid 834 try std.testing.expect(result.?[1].value == null); // delete has no cid 835} 836 837test "extractOps rejects malformed path without slash" { 838 var stats = broadcaster.Stats{}; 839 var v = Validator.initWithConfig(std.testing.allocator, &stats, .{ .verify_commit_diff = true }); 840 defer v.deinit(); 841 842 var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 843 defer arena.deinit(); 844 845 const ops = [_]zat.cbor.Value{ 846 .{ .map = &.{ 847 .{ .key = "action", .value = .{ .text = "create" } }, 848 .{ .key = "path", .value = .{ .text = "noslash" } }, 849 .{ .key = "cid", .value = .{ .cid = .{ .raw = "fakecid12345" } } }, 850 } }, 851 }; 852 853 const payload: zat.cbor.Value = .{ .map = &.{ 854 .{ .key = "repo", .value = .{ .text = "did:plc:test123" } }, 855 .{ .key = "ops", .value = .{ .array = &ops } }, 856 } }; 857 858 // malformed path (no slash) → all ops skipped → returns null 859 const result = v.extractOps(arena.allocator(), payload); 860 try std.testing.expect(result == null); 861} 862 863test "checkCommitStructure validates path field" { 864 
var stats = broadcaster.Stats{}; 865 var v = Validator.init(std.testing.allocator, &stats); 866 defer v.deinit(); 867 868 // valid path 869 const valid_ops = [_]zat.cbor.Value{ 870 .{ .map = &.{ 871 .{ .key = "action", .value = .{ .text = "create" } }, 872 .{ .key = "path", .value = .{ .text = "app.bsky.feed.post/3k2abc000000" } }, 873 } }, 874 }; 875 876 const valid_payload: zat.cbor.Value = .{ .map = &.{ 877 .{ .key = "repo", .value = .{ .text = "did:plc:test123" } }, 878 .{ .key = "ops", .value = .{ .array = &valid_ops } }, 879 } }; 880 881 try v.checkCommitStructure(valid_payload); 882 883 // invalid collection in path 884 const invalid_ops = [_]zat.cbor.Value{ 885 .{ .map = &.{ 886 .{ .key = "action", .value = .{ .text = "create" } }, 887 .{ .key = "path", .value = .{ .text = "not-an-nsid/3k2abc000000" } }, 888 } }, 889 }; 890 891 const invalid_payload: zat.cbor.Value = .{ .map = &.{ 892 .{ .key = "repo", .value = .{ .text = "did:plc:test123" } }, 893 .{ .key = "ops", .value = .{ .array = &invalid_ops } }, 894 } }; 895 896 try std.testing.expectError(error.InvalidFrame, v.checkCommitStructure(invalid_payload)); 897} 898 899test "queueResolve deduplicates repeated DIDs" { 900 var stats = broadcaster.Stats{}; 901 var v = Validator.init(std.testing.allocator, &stats); 902 defer v.deinit(); 903 904 // queue the same DID 100 times 905 for (0..100) |_| { 906 v.queueResolve("did:plc:duplicate"); 907 } 908 909 // should have exactly 1 entry, not 100 910 try std.testing.expectEqual(@as(usize, 1), v.queue.items.len); 911 try std.testing.expectEqual(@as(u32, 1), v.queued_set.count()); 912}