//! atproto relay implementation in zig — zlay.waow.tech
//!
1//! relay frame validator — DID key resolution + real signature verification
2//!
3//! validates firehose commit frames by verifying the commit signature against
4//! the pre-resolved signing key for the DID. accepts pre-decoded CBOR payload
5//! from the subscriber (decoded via zat SDK). on cache miss, skips validation
6//! and queues background resolution. no frame is ever blocked on network I/O.
7
8const std = @import("std");
9const zat = @import("zat");
10const broadcaster = @import("broadcaster.zig");
11const event_log_mod = @import("event_log.zig");
12const lru = @import("lru.zig");
13
14const Allocator = std.mem.Allocator;
15const log = std.log.scoped(.relay);
16
17/// decoded and cached signing key for a DID
/// decoded and cached signing key for a DID.
/// stored by value (fixed-size buffer) so the LRU cache needs no pointer chasing.
const CachedKey = struct {
    key_type: zat.multicodec.KeyType,
    raw: [33]u8, // compressed public key (secp256k1 or p256)
    len: u8, // number of valid bytes in `raw`
    resolve_time: i64 = 0, // epoch seconds when resolved
};
24
/// outcome of validating a single firehose frame.
/// `skipped = true` means validation could not run (missing fields, cache miss,
/// or signature failure treated as possible key rotation) — the frame is passed through.
pub const ValidationResult = struct {
    valid: bool, // false only when a structural check rejected the frame
    skipped: bool, // true when no signature verification was performed
    data_cid: ?[]const u8 = null, // MST root CID from verified commit
    commit_rev: ?[]const u8 = null, // rev from verified commit
};
31
/// configuration for commit validation checks
pub const ValidatorConfig = struct {
    /// verify MST structure during signature verification
    verify_mst: bool = false, // off by default for relay throughput
    /// verify commit diffs via MST inversion (sync 1.1)
    verify_commit_diff: bool = false,
    /// max allowed operations per commit
    max_ops: usize = 200,
    /// max clock skew for rev timestamps (seconds)
    /// NOTE(review): not referenced anywhere in this file — confirm it is enforced elsewhere
    rev_clock_skew: i64 = 300, // 5 minutes
};
43
pub const Validator = struct {
    allocator: Allocator,
    stats: *broadcaster.Stats,
    config: ValidatorConfig,
    // optional disk persistence — used by resolver threads for host updates
    persist: ?*event_log_mod.DiskPersist = null,
    // DID → signing key cache (decoded, ready for verification)
    cache: lru.LruCache(CachedKey),
    // background resolve queue (FIFO of heap-duped DID strings, owned by the queue)
    queue: std.ArrayListUnmanaged([]const u8) = .{},
    // in-flight set — prevents duplicate DID entries in the queue
    queued_set: std.StringHashMapUnmanaged(void) = .{},
    // guards `queue` and `queued_set`; `queue_cond` wakes resolver threads
    queue_mutex: std.Thread.Mutex = .{},
    queue_cond: std.Thread.Condition = .{},
    resolver_threads: [max_resolver_threads]?std.Thread = .{null} ** max_resolver_threads,
    // cleared in deinit() to stop resolver threads
    alive: std.atomic.Value(bool) = .{ .raw = true },
    max_cache_size: u32 = 250_000,

    const max_resolver_threads = 8;
    const default_resolver_threads = 4;
    const max_queue_size: usize = 100_000;

    /// init with default config. resolver threads only run after start().
    pub fn init(allocator: Allocator, stats: *broadcaster.Stats) Validator {
        return initWithConfig(allocator, stats, .{});
    }

    /// init with an explicit config.
    /// NOTE(review): the 250_000 here duplicates the `max_cache_size` field default;
    /// start() re-applies the (possibly env-overridden) value to `cache.capacity`.
    pub fn initWithConfig(allocator: Allocator, stats: *broadcaster.Stats, config: ValidatorConfig) Validator {
        return .{
            .allocator = allocator,
            .stats = stats,
            .config = config,
            .cache = lru.LruCache(CachedKey).init(allocator, 250_000),
        };
    }

    /// stop resolver threads, then free the cache and any still-queued DIDs.
    pub fn deinit(self: *Validator) void {
        self.alive.store(false, .release);
        self.queue_cond.broadcast();
        for (&self.resolver_threads) |*t| {
            if (t.*) |thread| {
                thread.join();
                t.* = null;
            }
        }

        self.cache.deinit();

        // free queued DIDs (threads are joined above, so no lock is needed here)
        for (self.queue.items) |did| {
            self.allocator.free(did);
        }
        self.queue.deinit(self.allocator);
        self.queued_set.deinit(self.allocator);
    }

    /// start background resolver threads
    pub fn start(self: *Validator) !void {
        self.max_cache_size = parseEnvInt(u32, "VALIDATOR_CACHE_SIZE", self.max_cache_size);
        self.cache.capacity = self.max_cache_size;
        const n = parseEnvInt(u8, "RESOLVER_THREADS", default_resolver_threads);
        const count = @min(n, max_resolver_threads);
        for (self.resolver_threads[0..count]) |*t| {
            t.* = try std.Thread.spawn(.{ .stack_size = @import("main.zig").default_stack_size }, resolveLoop, .{self});
        }
    }

    /// validate a #sync frame: signature verification only (no ops, no MST).
    /// #sync resets a repo to a new commit state — used for recovery from broken streams.
    /// on cache miss, queues background resolution and skips.
    pub fn validateSync(self: *Validator, payload: zat.cbor.Value) ValidationResult {
        const did = payload.getString("did") orelse {
            _ = self.stats.skipped.fetchAdd(1, .monotonic);
            return .{ .valid = true, .skipped = true };
        };

        // structural rejection: malformed DID
        if (zat.Did.parse(did) == null) {
            _ = self.stats.failed.fetchAdd(1, .monotonic);
            return .{ .valid = false, .skipped = false };
        }

        // check rev is valid TID (if present)
        if (payload.getString("rev")) |rev| {
            if (zat.Tid.parse(rev) == null) {
                _ = self.stats.failed.fetchAdd(1, .monotonic);
                return .{ .valid = false, .skipped = false };
            }
        }

        const blocks = payload.getBytes("blocks") orelse {
            _ = self.stats.failed.fetchAdd(1, .monotonic);
            return .{ .valid = false, .skipped = false };
        };

        // #sync CAR should be small (just the signed commit block)
        // lexicon maxLength: 10000
        if (blocks.len > 10_000) {
            _ = self.stats.failed.fetchAdd(1, .monotonic);
            return .{ .valid = false, .skipped = false };
        }

        // cache lookup
        const cached_key: ?CachedKey = self.cache.get(did);

        if (cached_key == null) {
            // never block a frame on network I/O: pass through, resolve in background
            _ = self.stats.cache_misses.fetchAdd(1, .monotonic);
            _ = self.stats.skipped.fetchAdd(1, .monotonic);
            self.queueResolve(did);
            return .{ .valid = true, .skipped = true };
        }

        _ = self.stats.cache_hits.fetchAdd(1, .monotonic);

        // verify signature (no MST, no ops)
        const public_key = zat.multicodec.PublicKey{
            .key_type = cached_key.?.key_type,
            .raw = cached_key.?.raw[0..cached_key.?.len],
        };

        // arena for CAR parsing temporaries — freed wholesale on return
        var arena = std.heap.ArenaAllocator.init(self.allocator);
        defer arena.deinit();

        const result = zat.verifyCommitCar(arena.allocator(), blocks, public_key, .{
            .verify_mst = false,
            .expected_did = did,
            .max_car_size = 10 * 1024, // 10 KiB ceiling, slightly above the 10_000-byte lexicon cap
        }) catch |err| {
            log.debug("sync verification failed for {s}: {s}", .{ did, @errorName(err) });
            // sync spec: on signature failure, key may have rotated.
            // evict cached key and queue re-resolution. skip this frame.
            self.evictKey(did);
            self.queueResolve(did);
            _ = self.stats.skipped.fetchAdd(1, .monotonic);
            return .{ .valid = true, .skipped = true };
        };

        _ = self.stats.validated.fetchAdd(1, .monotonic);
        return .{
            .valid = true,
            .skipped = false,
            .data_cid = result.commit_cid,
            .commit_rev = result.commit_rev,
        };
    }

    /// validate a commit frame using a pre-decoded CBOR payload (from SDK decoder).
    /// on cache miss, queues background resolution and skips.
    pub fn validateCommit(self: *Validator, payload: zat.cbor.Value) ValidationResult {
        // extract DID from decoded payload
        const did = payload.getString("repo") orelse {
            _ = self.stats.skipped.fetchAdd(1, .monotonic);
            return .{ .valid = true, .skipped = true };
        };

        // check cache for pre-resolved signing key
        const cached_key: ?CachedKey = self.cache.get(did);

        if (cached_key == null) {
            // cache miss — queue for background resolution, skip validation
            _ = self.stats.cache_misses.fetchAdd(1, .monotonic);
            _ = self.stats.skipped.fetchAdd(1, .monotonic);
            self.queueResolve(did);
            return .{ .valid = true, .skipped = true };
        }

        _ = self.stats.cache_hits.fetchAdd(1, .monotonic);

        // cache hit — do structure checks + signature verification
        if (self.verifyCommit(payload, did, cached_key.?)) |vr| {
            _ = self.stats.validated.fetchAdd(1, .monotonic);
            return vr;
        } else |err| {
            log.debug("commit verification failed for {s}: {s}", .{ did, @errorName(err) });
            // sync spec: on signature failure, key may have rotated.
            // evict cached key and queue re-resolution. skip this frame
            // (treat as cache miss). next commit will use the refreshed key.
            // NOTE(review): structural errors (error.InvalidFrame) also land here and
            // are counted as skipped, not failed — confirm that is intended.
            self.evictKey(did);
            self.queueResolve(did);
            _ = self.stats.skipped.fetchAdd(1, .monotonic);
            return .{ .valid = true, .skipped = true };
        }
    }

    /// structure checks + real signature verification for a commit frame.
    /// any error propagates to validateCommit, which treats it as a skip.
    fn verifyCommit(self: *Validator, payload: zat.cbor.Value, expected_did: []const u8, cached_key: CachedKey) !ValidationResult {
        // commit structure checks first (cheap, no allocation)
        try self.checkCommitStructure(payload);

        // extract blocks (raw CAR bytes) from the pre-decoded payload
        const blocks = payload.getBytes("blocks") orelse return error.InvalidFrame;

        // blocks size check — lexicon maxLength: 2000000
        if (blocks.len > 2_000_000) return error.InvalidFrame;

        // build public key for verification
        const public_key = zat.multicodec.PublicKey{
            .key_type = cached_key.key_type,
            .raw = cached_key.raw[0..cached_key.len],
        };

        // run real signature verification (needs its own arena for CAR/MST temporaries)
        var arena = std.heap.ArenaAllocator.init(self.allocator);
        defer arena.deinit();
        const alloc = arena.allocator();

        // try sync 1.1 path: extract ops and use verifyCommitDiff
        if (self.config.verify_commit_diff) {
            if (self.extractOps(alloc, payload)) |msg_ops| {
                // get stored prev_data from payload
                const prev_data: ?[]const u8 = if (payload.get("prevData")) |pd| switch (pd) {
                    .cid => |c| c.raw,
                    .null => null,
                    else => null,
                } else null;

                const diff_result = zat.verifyCommitDiff(alloc, blocks, msg_ops, prev_data, public_key, .{
                    .expected_did = expected_did,
                    // without a prevData CID there is nothing to invert against
                    .skip_inversion = prev_data == null,
                }) catch |err| {
                    return err;
                };

                return .{
                    .valid = true,
                    .skipped = false,
                    .data_cid = diff_result.data_cid,
                    .commit_rev = diff_result.commit_rev,
                };
            }
        }

        // fallback: legacy verification (signature + optional MST walk)
        const result = zat.verifyCommitCar(alloc, blocks, public_key, .{
            .verify_mst = self.config.verify_mst,
            .expected_did = expected_did,
        }) catch |err| {
            return err;
        };

        return .{
            .valid = true,
            .skipped = false,
            .data_cid = result.commit_cid,
            .commit_rev = result.commit_rev,
        };
    }

    /// extract ops from payload and convert to mst.Operation array.
    /// the firehose format uses a single "path" field ("collection/rkey"),
    /// not separate "collection"/"rkey" fields.
    /// returned slice is allocated from `alloc` (caller's arena); returns null
    /// when there are no usable ops (missing array, all ops malformed, or OOM).
    fn extractOps(self: *Validator, alloc: Allocator, payload: zat.cbor.Value) ?[]const zat.MstOperation {
        _ = self;
        const ops_array = payload.getArray("ops") orelse return null;
        var ops: std.ArrayListUnmanaged(zat.MstOperation) = .{};
        for (ops_array) |op| {
            // ops without action/path are silently skipped rather than rejected
            const action = op.getString("action") orelse continue;
            const path = op.getString("path") orelse continue;

            // validate path contains "/" (collection/rkey)
            if (std.mem.indexOfScalar(u8, path, '/') == null) continue;

            // extract CID values
            const cid_value: ?[]const u8 = if (op.get("cid")) |v| switch (v) {
                .cid => |c| c.raw,
                else => null,
            } else null;

            var value: ?[]const u8 = null;
            var prev: ?[]const u8 = null;

            // create: new value only; update: value + prev; delete: prev only
            if (std.mem.eql(u8, action, "create")) {
                value = cid_value;
            } else if (std.mem.eql(u8, action, "update")) {
                value = cid_value;
                prev = if (op.get("prev")) |v| switch (v) {
                    .cid => |c| c.raw,
                    else => null,
                } else null;
            } else if (std.mem.eql(u8, action, "delete")) {
                prev = if (op.get("prev")) |v| switch (v) {
                    .cid => |c| c.raw,
                    else => null,
                } else null;
            } else continue;

            ops.append(alloc, .{
                .path = path,
                .value = value,
                .prev = prev,
            }) catch return null;
        }

        if (ops.items.len == 0) return null;
        return ops.items;
    }

    /// cheap structural checks on a decoded commit payload: DID shape, TID rev,
    /// op count, and per-op path ("nsid/rkey") validity. no allocation.
    fn checkCommitStructure(self: *Validator, payload: zat.cbor.Value) !void {
        // check repo field is a valid DID
        const repo = payload.getString("repo") orelse return error.InvalidFrame;
        if (zat.Did.parse(repo) == null) return error.InvalidFrame;

        // check rev is a valid TID
        if (payload.getString("rev")) |rev| {
            if (zat.Tid.parse(rev) == null) return error.InvalidFrame;
        }

        // check ops count
        if (payload.get("ops")) |ops_value| {
            switch (ops_value) {
                .array => |ops| {
                    if (ops.len > self.config.max_ops) return error.InvalidFrame;
                    // validate each op has valid path (collection/rkey)
                    for (ops) |op| {
                        if (op.getString("path")) |path| {
                            if (std.mem.indexOfScalar(u8, path, '/')) |sep| {
                                const collection = path[0..sep];
                                const rkey = path[sep + 1 ..];
                                if (zat.Nsid.parse(collection) == null) return error.InvalidFrame;
                                if (rkey.len > 0) {
                                    if (zat.Rkey.parse(rkey) == null) return error.InvalidFrame;
                                }
                            } else return error.InvalidFrame; // path must contain '/'
                        }
                    }
                },
                else => return error.InvalidFrame,
            }
        }
    }

    /// dupe a DID and enqueue it for background signing-key resolution.
    /// best-effort: drops silently on OOM, duplicate, or full queue.
    fn queueResolve(self: *Validator, did: []const u8) void {
        // check if already cached (race between validate and resolver)
        if (self.cache.contains(did)) return;

        const duped = self.allocator.dupe(u8, did) catch return;

        self.queue_mutex.lock();
        defer self.queue_mutex.unlock();

        // skip if already queued (prevents unbounded queue growth)
        if (self.queued_set.contains(duped)) {
            self.allocator.free(duped);
            return;
        }

        // cap queue size — drop DID without adding to queued_set so it can be re-queued later
        if (self.queue.items.len >= max_queue_size) {
            self.allocator.free(duped);
            return;
        }

        self.queue.append(self.allocator, duped) catch {
            self.allocator.free(duped);
            return;
        };
        // NOTE(review): if this put fails (OOM), the DID sits in the queue but not
        // in the dedup set — duplicates become possible until it is drained.
        self.queued_set.put(self.allocator, duped, {}) catch {};
        self.queue_cond.signal();
    }

    /// resolver thread body: pop DIDs off the queue, resolve DID documents,
    /// decode signing keys into the cache, and opportunistically update the
    /// account's host mapping. runs until `alive` is cleared.
    fn resolveLoop(self: *Validator) void {
        var resolver = zat.DidResolver.initWithOptions(self.allocator, .{ .keep_alive = true });
        defer resolver.deinit();

        while (self.alive.load(.acquire)) {
            var did: ?[]const u8 = null;
            {
                self.queue_mutex.lock();
                defer self.queue_mutex.unlock();
                // 1s timed wait so shutdown is noticed even if a signal is missed
                while (self.queue.items.len == 0 and self.alive.load(.acquire)) {
                    self.queue_cond.timedWait(&self.queue_mutex, 1 * std.time.ns_per_s) catch {};
                }
                if (self.queue.items.len > 0) {
                    // NOTE(review): orderedRemove(0) shifts the whole backlog — O(n)
                    // per pop. fine for small queues; consider a ring buffer if the
                    // queue ever sits near max_queue_size.
                    did = self.queue.orderedRemove(0);
                    _ = self.queued_set.remove(did.?);
                }
            }

            const d = did orelse continue;
            defer self.allocator.free(d);

            // skip if already cached (resolved while queued)
            if (self.cache.contains(d)) continue;

            // resolve DID → signing key
            const parsed = zat.Did.parse(d) orelse continue;
            var doc = resolver.resolve(parsed) catch |err| {
                log.debug("DID resolve failed for {s}: {s}", .{ d, @errorName(err) });
                continue;
            };
            defer doc.deinit();

            // extract and decode signing key
            const vm = doc.signingKey() orelse continue;
            const key_bytes = zat.multibase.decode(self.allocator, vm.public_key_multibase) catch continue;
            defer self.allocator.free(key_bytes);
            const public_key = zat.multicodec.parsePublicKey(key_bytes) catch continue;

            // store decoded key in cache (fixed-size, no pointer chasing)
            var cached = CachedKey{
                .key_type = public_key.key_type,
                .raw = undefined,
                .len = @intCast(public_key.raw.len),
                .resolve_time = std.time.timestamp(),
            };
            @memcpy(cached.raw[0..public_key.raw.len], public_key.raw);

            self.cache.put(d, cached) catch continue;

            // --- host validation (merged from migration queue) ---
            // while we have the DID doc, check PDS endpoint and update host if needed.
            // best-effort: failures don't prevent signing key caching.
            if (self.persist) |persist| {
                if (doc.pdsEndpoint()) |pds_endpoint| {
                    if (extractHostFromUrl(pds_endpoint)) |pds_host| {
                        const pds_host_id = (persist.getHostIdForHostname(pds_host) catch null) orelse continue;
                        const uid = persist.uidForDid(d) catch continue;
                        const current_host = persist.getAccountHostId(uid) catch continue;
                        // only migrate accounts with a known (non-zero) current host
                        if (current_host != 0 and current_host != pds_host_id) {
                            persist.setAccountHostId(uid, pds_host_id) catch {};
                            log.info("host updated via DID doc: {s} -> host {d}", .{ d, pds_host_id });
                        }
                    }
                }
            }
        }
    }

    /// evict a DID's cached signing key (e.g. on #identity event).
    /// the next commit from this DID will trigger a fresh resolution.
    pub fn evictKey(self: *Validator, did: []const u8) void {
        _ = self.cache.remove(did);
    }

    /// cache size (for diagnostics)
    pub fn cacheSize(self: *Validator) u32 {
        return self.cache.count();
    }

    /// resolve queue length (for diagnostics — non-blocking; returns 0 if the lock is contended)
    pub fn resolveQueueLen(self: *Validator) usize {
        if (!self.queue_mutex.tryLock()) return 0;
        defer self.queue_mutex.unlock();
        return self.queue.items.len;
    }

    /// resolve dedup set size (for diagnostics — non-blocking; returns 0 if the lock is contended)
    pub fn resolveQueuedSetCount(self: *Validator) u32 {
        if (!self.queue_mutex.tryLock()) return 0;
        defer self.queue_mutex.unlock();
        return self.queued_set.count();
    }

    /// signing key cache hashmap backing capacity (for memory attribution)
    pub fn cacheMapCapacity(self: *Validator) u32 {
        return self.cache.mapCapacity();
    }

    /// resolver dedup set hashmap backing capacity (for memory attribution — non-blocking)
    pub fn resolveQueuedSetCapacity(self: *Validator) u32 {
        if (!self.queue_mutex.tryLock()) return 0;
        defer self.queue_mutex.unlock();
        return self.queued_set.capacity();
    }
};
505
506/// extract hostname from a URL like "https://pds.example.com" or "https://pds.example.com:443/path"
/// extract hostname from a URL like "https://pds.example.com" or "https://pds.example.com:443/path".
/// also handles bracketed IPv6 literals: "https://[::1]:8080/x" → "[::1]"
/// (previously the port strip cut at the first ':' inside the brackets, yielding "[").
/// returns a slice into `url`; null when nothing remains after stripping.
fn extractHostFromUrl(url: []const u8) ?[]const u8 {
    // strip scheme
    var rest = url;
    if (std.mem.startsWith(u8, rest, "https://")) {
        rest = rest["https://".len..];
    } else if (std.mem.startsWith(u8, rest, "http://")) {
        rest = rest["http://".len..];
    }
    // strip path
    if (std.mem.indexOfScalar(u8, rest, '/')) |i| {
        rest = rest[0..i];
    }
    // bracketed IPv6 literal: keep "[...]" intact, drop any ":port" after it
    if (rest.len > 0 and rest[0] == '[') {
        if (std.mem.indexOfScalar(u8, rest, ']')) |end| {
            return rest[0 .. end + 1];
        }
        return null; // unterminated bracket — malformed
    }
    // strip port
    if (std.mem.indexOfScalar(u8, rest, ':')) |i| {
        rest = rest[0..i];
    }
    if (rest.len == 0) return null;
    return rest;
}
526
/// read a base-10 integer of type `T` from the environment.
/// returns `default` when the variable is unset or fails to parse.
fn parseEnvInt(comptime T: type, key: []const u8, default: T) T {
    if (std.posix.getenv(key)) |raw| {
        return std.fmt.parseInt(T, raw, 10) catch default;
    }
    return default;
}
531
532// --- tests ---
533
test "validateCommit skips on cache miss" {
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    defer v.deinit();

    // build a commit payload using SDK
    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "seq", .value = .{ .unsigned = 42 } },
        .{ .key = "rev", .value = .{ .text = "3k2abc000000" } },
        .{ .key = "time", .value = .{ .text = "2024-01-15T10:30:00Z" } },
    } };

    // no key cached for this DID → frame passes through as skipped, miss counted
    const result = v.validateCommit(payload);
    try std.testing.expect(result.valid);
    try std.testing.expect(result.skipped);
    try std.testing.expectEqual(@as(u64, 1), stats.cache_misses.load(.acquire));
}

test "validateCommit skips when no repo field" {
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    defer v.deinit();

    // payload without "repo" field
    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "seq", .value = .{ .unsigned = 42 } },
    } };

    // missing DID → cannot validate, counted as skipped (not failed)
    const result = v.validateCommit(payload);
    try std.testing.expect(result.valid);
    try std.testing.expect(result.skipped);
    try std.testing.expectEqual(@as(u64, 1), stats.skipped.load(.acquire));
}
568
test "checkCommitStructure rejects invalid DID" {
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    defer v.deinit();

    // "repo" must parse as a DID
    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "not-a-did" } },
    } };

    try std.testing.expectError(error.InvalidFrame, v.checkCommitStructure(payload));
}

test "checkCommitStructure accepts valid commit" {
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    defer v.deinit();

    // valid DID + valid TID rev, no ops → passes
    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } },
    } };

    try v.checkCommitStructure(payload);
}
593
test "validateSync skips on cache miss" {
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    defer v.deinit();

    // well-formed #sync payload, but no cached key for the DID
    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "did", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "seq", .value = .{ .unsigned = 42 } },
        .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } },
        .{ .key = "blocks", .value = .{ .bytes = "deadbeef" } },
    } };

    const result = v.validateSync(payload);
    try std.testing.expect(result.valid);
    try std.testing.expect(result.skipped);
    try std.testing.expectEqual(@as(u64, 1), stats.cache_misses.load(.acquire));
}

test "validateSync rejects invalid DID" {
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    defer v.deinit();

    // malformed DID is a structural failure: valid=false, counted as failed
    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "did", .value = .{ .text = "not-a-did" } },
        .{ .key = "blocks", .value = .{ .bytes = "deadbeef" } },
    } };

    const result = v.validateSync(payload);
    try std.testing.expect(!result.valid);
    try std.testing.expect(!result.skipped);
    try std.testing.expectEqual(@as(u64, 1), stats.failed.load(.acquire));
}

test "validateSync rejects missing blocks" {
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    defer v.deinit();

    // #sync without "blocks" cannot be verified → structural failure
    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "did", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } },
    } };

    const result = v.validateSync(payload);
    try std.testing.expect(!result.valid);
    try std.testing.expect(!result.skipped);
    try std.testing.expectEqual(@as(u64, 1), stats.failed.load(.acquire));
}

test "validateSync skips when no did field" {
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    defer v.deinit();

    // missing "did" → skipped, not failed
    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "seq", .value = .{ .unsigned = 42 } },
    } };

    const result = v.validateSync(payload);
    try std.testing.expect(result.valid);
    try std.testing.expect(result.skipped);
    try std.testing.expectEqual(@as(u64, 1), stats.skipped.load(.acquire));
}
658
test "LRU cache evicts least recently used" {
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    v.cache.capacity = 3; // shrink capacity so eviction is observable
    defer v.deinit();

    const mk = CachedKey{ .key_type = .p256, .raw = .{0} ** 33, .len = 33 };

    try v.cache.put("did:plc:aaa", mk);
    try v.cache.put("did:plc:bbb", mk);
    try v.cache.put("did:plc:ccc", mk);

    // access "aaa" to promote it
    _ = v.cache.get("did:plc:aaa");

    // insert "ddd" — should evict "bbb" (LRU)
    try v.cache.put("did:plc:ddd", mk);

    try std.testing.expect(v.cache.get("did:plc:bbb") == null);
    try std.testing.expect(v.cache.get("did:plc:aaa") != null);
    try std.testing.expect(v.cache.get("did:plc:ccc") != null);
    try std.testing.expect(v.cache.get("did:plc:ddd") != null);
    try std.testing.expectEqual(@as(u32, 3), v.cache.count());
}

test "checkCommitStructure rejects too many ops" {
    var stats = broadcaster.Stats{};
    var v = Validator.initWithConfig(std.testing.allocator, &stats, .{ .max_ops = 2 });
    defer v.deinit();

    // build ops array with 3 items (over limit of 2)
    const ops = [_]zat.cbor.Value{
        .{ .map = &.{.{ .key = "action", .value = .{ .text = "create" } }} },
        .{ .map = &.{.{ .key = "action", .value = .{ .text = "create" } }} },
        .{ .map = &.{.{ .key = "action", .value = .{ .text = "create" } }} },
    };

    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "ops", .value = .{ .array = &ops } },
    } };

    try std.testing.expectError(error.InvalidFrame, v.checkCommitStructure(payload));
}
703
704// --- spec conformance tests ---
705
test "spec: #commit blocks > 2,000,000 bytes rejected" {
    // lexicon maxLength for #commit blocks: 2,000,000
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    defer v.deinit();

    // insert a fake cached key so we reach the blocks size check
    const did = "did:plc:test123";
    try v.cache.put(did, .{
        .key_type = .p256,
        .raw = .{0} ** 33,
        .len = 33,
        .resolve_time = 100,
    });

    // blocks with 2,000,001 bytes (1 byte over limit)
    const oversized_blocks = try std.testing.allocator.alloc(u8, 2_000_001);
    defer std.testing.allocator.free(oversized_blocks);
    @memset(oversized_blocks, 0);

    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = did } },
        .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } },
        .{ .key = "blocks", .value = .{ .bytes = oversized_blocks } },
    } };

    // oversized blocks surface as error.InvalidFrame inside verifyCommit,
    // which validateCommit reports as skipped
    const result = v.validateCommit(payload);
    try std.testing.expect(!result.valid or result.skipped);
}

test "spec: #commit blocks = 2,000,000 bytes accepted (boundary)" {
    // lexicon maxLength for #commit blocks: 2,000,000 — exactly at limit should pass size check
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    defer v.deinit();

    const did = "did:plc:test123";
    try v.cache.put(did, .{
        .key_type = .p256,
        .raw = .{0} ** 33,
        .len = 33,
        .resolve_time = 100,
    });

    // exactly 2,000,000 bytes — should pass size check (may fail signature verify, that's ok)
    const exact_blocks = try std.testing.allocator.alloc(u8, 2_000_000);
    defer std.testing.allocator.free(exact_blocks);
    @memset(exact_blocks, 0);

    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = did } },
        .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } },
        .{ .key = "blocks", .value = .{ .bytes = exact_blocks } },
    } };

    const result = v.validateCommit(payload);
    // should not be rejected for size — may fail signature verification (that's fine,
    // it means we passed the size check). with P1.1c, sig failure → skipped=true.
    try std.testing.expect(result.valid or result.skipped);
}

test "spec: #sync blocks > 10,000 bytes rejected" {
    // lexicon maxLength for #sync blocks: 10,000
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    defer v.deinit();

    // 10,001 bytes — 1 over the limit → hard structural rejection
    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "did", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } },
        .{ .key = "blocks", .value = .{ .bytes = &([_]u8{0} ** 10_001) } },
    } };

    const result = v.validateSync(payload);
    try std.testing.expect(!result.valid);
    try std.testing.expect(!result.skipped);
}

test "spec: #sync blocks = 10,000 bytes accepted (boundary)" {
    // lexicon maxLength for #sync blocks: 10,000 — exactly at limit should pass size check
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    defer v.deinit();

    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "did", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } },
        .{ .key = "blocks", .value = .{ .bytes = &([_]u8{0} ** 10_000) } },
    } };

    const result = v.validateSync(payload);
    // should pass size check — will be a cache miss → skipped (no cached key)
    try std.testing.expect(result.valid);
    try std.testing.expect(result.skipped);
}
801
test "extractOps reads path field from firehose format" {
    var stats = broadcaster.Stats{};
    var v = Validator.initWithConfig(std.testing.allocator, &stats, .{ .verify_commit_diff = true });
    defer v.deinit();

    // use arena since extractOps allocates an ArrayList internally
    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();

    // one create (with cid) + one delete (no cid)
    const ops = [_]zat.cbor.Value{
        .{ .map = &.{
            .{ .key = "action", .value = .{ .text = "create" } },
            .{ .key = "path", .value = .{ .text = "app.bsky.feed.post/3k2abc000000" } },
            .{ .key = "cid", .value = .{ .cid = .{ .raw = "fakecid12345" } } },
        } },
        .{ .map = &.{
            .{ .key = "action", .value = .{ .text = "delete" } },
            .{ .key = "path", .value = .{ .text = "app.bsky.feed.like/3k2def000000" } },
        } },
    };

    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "ops", .value = .{ .array = &ops } },
    } };

    const result = v.extractOps(arena.allocator(), payload);
    try std.testing.expect(result != null);
    try std.testing.expectEqual(@as(usize, 2), result.?.len);
    try std.testing.expectEqualStrings("app.bsky.feed.post/3k2abc000000", result.?[0].path);
    try std.testing.expectEqualStrings("app.bsky.feed.like/3k2def000000", result.?[1].path);
    try std.testing.expect(result.?[0].value != null); // create has cid
    try std.testing.expect(result.?[1].value == null); // delete has no cid
}

test "extractOps rejects malformed path without slash" {
    var stats = broadcaster.Stats{};
    var v = Validator.initWithConfig(std.testing.allocator, &stats, .{ .verify_commit_diff = true });
    defer v.deinit();

    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();

    const ops = [_]zat.cbor.Value{
        .{ .map = &.{
            .{ .key = "action", .value = .{ .text = "create" } },
            .{ .key = "path", .value = .{ .text = "noslash" } },
            .{ .key = "cid", .value = .{ .cid = .{ .raw = "fakecid12345" } } },
        } },
    };

    const payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "ops", .value = .{ .array = &ops } },
    } };

    // malformed path (no slash) → all ops skipped → returns null
    const result = v.extractOps(arena.allocator(), payload);
    try std.testing.expect(result == null);
}
862
test "checkCommitStructure validates path field" {
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    defer v.deinit();

    // valid path
    const valid_ops = [_]zat.cbor.Value{
        .{ .map = &.{
            .{ .key = "action", .value = .{ .text = "create" } },
            .{ .key = "path", .value = .{ .text = "app.bsky.feed.post/3k2abc000000" } },
        } },
    };

    const valid_payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "ops", .value = .{ .array = &valid_ops } },
    } };

    try v.checkCommitStructure(valid_payload);

    // invalid collection in path
    const invalid_ops = [_]zat.cbor.Value{
        .{ .map = &.{
            .{ .key = "action", .value = .{ .text = "create" } },
            .{ .key = "path", .value = .{ .text = "not-an-nsid/3k2abc000000" } },
        } },
    };

    const invalid_payload: zat.cbor.Value = .{ .map = &.{
        .{ .key = "repo", .value = .{ .text = "did:plc:test123" } },
        .{ .key = "ops", .value = .{ .array = &invalid_ops } },
    } };

    try std.testing.expectError(error.InvalidFrame, v.checkCommitStructure(invalid_payload));
}

test "queueResolve deduplicates repeated DIDs" {
    var stats = broadcaster.Stats{};
    var v = Validator.init(std.testing.allocator, &stats);
    defer v.deinit();

    // queue the same DID 100 times — queued_set must collapse the duplicates
    for (0..100) |_| {
        v.queueResolve("did:plc:duplicate");
    }

    // should have exactly 1 entry, not 100
    try std.testing.expectEqual(@as(usize, 1), v.queue.items.len);
    try std.testing.expectEqual(@as(u32, 1), v.queued_set.count());
}