atproto relay implementation in zig zlay.waow.tech

feat: handle #sync firehose frames (sync 1.1)

previously #sync frames fell through to the identity fallback, getting
persisted with kind=identity. now they're properly routed: validated via
signature-only verification (no MST/ops), persisted with kind=sync (6),
and update account state like commits. rev ordering is intentionally not
enforced since sync is used for rollbacks.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+167 -15
+25 -15
src/subscriber.zig
··· 323 323 324 324 // route by frame type 325 325 const is_commit = std.mem.eql(u8, frame_type, "#commit"); 326 + const is_sync = std.mem.eql(u8, frame_type, "#sync"); 326 327 const is_account = std.mem.eql(u8, frame_type, "#account"); 327 328 328 329 // extract DID: "repo" for commits, "did" for identity/account ··· 373 374 } 374 375 } 375 376 376 - // for commits: check account is active, validate, extract state 377 + // for commits and syncs: check account is active, validate, extract state 377 378 var commit_data_cid: ?[]const u8 = null; 378 379 var commit_rev: ?[]const u8 = null; 379 - if (is_commit) { 380 - // drop commits for inactive accounts 381 - // Go relay: EnsureAccountActive — silently drops 380 + if (is_commit or is_sync) { 381 + // drop for inactive accounts 382 382 if (sub.persist) |dp| { 383 383 if (uid > 0) { 384 384 const active = dp.isAccountActive(uid) catch true; ··· 389 389 } 390 390 } 391 391 392 - const result = sub.validator.validateCommit(payload); 393 - if (!result.valid) return; 394 - commit_data_cid = result.data_cid; 395 - commit_rev = result.commit_rev; 392 + if (is_commit) { 393 + const result = sub.validator.validateCommit(payload); 394 + if (!result.valid) return; 395 + commit_data_cid = result.data_cid; 396 + commit_rev = result.commit_rev; 396 397 397 - // track collections from commit ops (phase 1 live indexing) 398 - if (sub.collection_index) |ci| { 399 - if (did) |d| { 400 - if (payload.get("ops")) |ops| { 401 - ci.trackCommitOps(d, ops); 398 + // track collections from commit ops (phase 1 live indexing) 399 + if (sub.collection_index) |ci| { 400 + if (did) |d| { 401 + if (payload.get("ops")) |ops| { 402 + ci.trackCommitOps(d, ops); 403 + } 402 404 } 403 405 } 406 + } else { 407 + // #sync: signature verification only, no ops/MST 408 + const result = sub.validator.validateSync(payload); 409 + if (!result.valid) return; 410 + commit_data_cid = result.data_cid; 411 + commit_rev = result.commit_rev; 404 412 } 405 413 } 406 414 407 415 // determine event kind for persistence 408 416 const kind: event_log_mod.EvtKind = if (is_commit) 409 417 .commit 418 + else if (is_sync) 419 + .sync 410 420 else if (is_account) 411 421 .account 412 422 else ··· 420 430 return; 421 431 }; 422 432 423 - // update per-DID state after successful commit validation 424 - if (is_commit and uid > 0) { 433 + // update per-DID state after successful commit/sync validation 434 + if ((is_commit or is_sync) and uid > 0) { 425 435 if (commit_rev) |rev| { 426 436 const cid_str: []const u8 = if (commit_data_cid) |cid_raw| 427 437 zat.multibase.encode(alloc, .base32lower, cid_raw) catch ""
+142
src/validator.zig
··· 89 89 self.resolver_thread = try std.Thread.spawn(.{}, resolveLoop, .{self}); 90 90 } 91 91 92 + /// validate a #sync frame: signature verification only (no ops, no MST). 93 + /// #sync resets a repo to a new commit state — used for recovery from broken streams. 94 + /// on cache miss, queues background resolution and skips. 95 + pub fn validateSync(self: *Validator, payload: zat.cbor.Value) ValidationResult { 96 + const did = payload.getString("did") orelse { 97 + _ = self.stats.skipped.fetchAdd(1, .monotonic); 98 + return .{ .valid = true, .skipped = true }; 99 + }; 100 + 101 + if (zat.Did.parse(did) == null) { 102 + _ = self.stats.failed.fetchAdd(1, .monotonic); 103 + return .{ .valid = false, .skipped = false }; 104 + } 105 + 106 + // check rev is valid TID (if present) 107 + if (payload.getString("rev")) |rev| { 108 + if (zat.Tid.parse(rev) == null) { 109 + _ = self.stats.failed.fetchAdd(1, .monotonic); 110 + return .{ .valid = false, .skipped = false }; 111 + } 112 + } 113 + 114 + const blocks = payload.getBytes("blocks") orelse { 115 + _ = self.stats.failed.fetchAdd(1, .monotonic); 116 + return .{ .valid = false, .skipped = false }; 117 + }; 118 + 119 + // #sync CAR should be small (just the signed commit block) 120 + if (blocks.len > 10 * 1024) { 121 + _ = self.stats.failed.fetchAdd(1, .monotonic); 122 + return .{ .valid = false, .skipped = false }; 123 + } 124 + 125 + // cache lookup 126 + const cached_key: ?CachedKey = blk: { 127 + self.cache_mutex.lock(); 128 + defer self.cache_mutex.unlock(); 129 + break :blk self.cache.get(did); 130 + }; 131 + 132 + if (cached_key == null) { 133 + _ = self.stats.cache_misses.fetchAdd(1, .monotonic); 134 + _ = self.stats.skipped.fetchAdd(1, .monotonic); 135 + self.queueResolve(did); 136 + return .{ .valid = true, .skipped = true }; 137 + } 138 + 139 + _ = self.stats.cache_hits.fetchAdd(1, .monotonic); 140 + 141 + // verify signature (no MST, no ops) 142 + const public_key = zat.multicodec.PublicKey{ 143 + .key_type = cached_key.?.key_type, 144 + .raw = cached_key.?.raw[0..cached_key.?.len], 145 + }; 146 + 147 + var arena = std.heap.ArenaAllocator.init(self.allocator); 148 + defer arena.deinit(); 149 + 150 + const result = zat.verifyCommitCar(arena.allocator(), blocks, public_key, .{ 151 + .verify_mst = false, 152 + .expected_did = did, 153 + .max_car_size = 10 * 1024, 154 + }) catch |err| { 155 + log.debug("sync verification failed for {s}: {s}", .{ did, @errorName(err) }); 156 + _ = self.stats.failed.fetchAdd(1, .monotonic); 157 + return .{ .valid = false, .skipped = false }; 158 + }; 159 + 160 + _ = self.stats.validated.fetchAdd(1, .monotonic); 161 + return .{ 162 + .valid = true, 163 + .skipped = false, 164 + .data_cid = result.commit_cid, 165 + .commit_rev = result.commit_rev, 166 + }; 167 + } 168 + 92 169 /// validate a commit frame using a pre-decoded CBOR payload (from SDK decoder). 93 170 /// on cache miss, queues background resolution and skips. 94 171 pub fn validateCommit(self: *Validator, payload: zat.cbor.Value) ValidationResult { ··· 427 504 } }; 428 505 429 506 try v.checkCommitStructure(payload); 507 + } 508 + 509 + test "validateSync skips on cache miss" { 510 + var stats = broadcaster.Stats{}; 511 + var v = Validator.init(std.testing.allocator, &stats); 512 + defer v.deinit(); 513 + 514 + const payload: zat.cbor.Value = .{ .map = &.{ 515 + .{ .key = "did", .value = .{ .text = "did:plc:test123" } }, 516 + .{ .key = "seq", .value = .{ .unsigned = 42 } }, 517 + .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } }, 518 + .{ .key = "blocks", .value = .{ .bytes = "deadbeef" } }, 519 + } }; 520 + 521 + const result = v.validateSync(payload); 522 + try std.testing.expect(result.valid); 523 + try std.testing.expect(result.skipped); 524 + try std.testing.expectEqual(@as(u64, 1), stats.cache_misses.load(.acquire)); 525 + } 526 + 527 + test "validateSync rejects invalid DID" { 528 + var stats = broadcaster.Stats{}; 529 + var v = Validator.init(std.testing.allocator, &stats); 530 + defer v.deinit(); 531 + 532 + const payload: zat.cbor.Value = .{ .map = &.{ 533 + .{ .key = "did", .value = .{ .text = "not-a-did" } }, 534 + .{ .key = "blocks", .value = .{ .bytes = "deadbeef" } }, 535 + } }; 536 + 537 + const result = v.validateSync(payload); 538 + try std.testing.expect(!result.valid); 539 + try std.testing.expect(!result.skipped); 540 + try std.testing.expectEqual(@as(u64, 1), stats.failed.load(.acquire)); 541 + } 542 + 543 + test "validateSync rejects missing blocks" { 544 + var stats = broadcaster.Stats{}; 545 + var v = Validator.init(std.testing.allocator, &stats); 546 + defer v.deinit(); 547 + 548 + const payload: zat.cbor.Value = .{ .map = &.{ 549 + .{ .key = "did", .value = .{ .text = "did:plc:test123" } }, 550 + .{ .key = "rev", .value = .{ .text = "3k2abcdefghij" } }, 551 + } }; 552 + 553 + const result = v.validateSync(payload); 554 + try std.testing.expect(!result.valid); 555 + try std.testing.expect(!result.skipped); 556 + try std.testing.expectEqual(@as(u64, 1), stats.failed.load(.acquire)); 557 + } 558 + 559 + test "validateSync skips when no did field" { 560 + var stats = broadcaster.Stats{}; 561 + var v = Validator.init(std.testing.allocator, &stats); 562 + defer v.deinit(); 563 + 564 + const payload: zat.cbor.Value = .{ .map = &.{ 565 + .{ .key = "seq", .value = .{ .unsigned = 42 } }, 566 + } }; 567 + 568 + const result = v.validateSync(payload); 569 + try std.testing.expect(result.valid); 570 + try std.testing.expect(result.skipped); 571 + try std.testing.expectEqual(@as(u64, 1), stats.skipped.load(.acquire)); 430 572 } 431 573 432 574 test "checkCommitStructure rejects too many ops" {