atproto relay implementation in zig zlay.waow.tech

inductive proof chain plumbing: extractOps fix, chain checks, conditional (CAS-style) upsert

phase 1: fix extractOps/checkCommitStructure to read firehose "path" field
instead of nonexistent "collection"/"rkey" — verify_commit_diff was dead code.

phase 2: add since/prevData chain continuity checks and future-rev rejection.
log-only + chain_breaks metric, no commits dropped yet.

phase 3: conditional upsert (WHERE rev < EXCLUDED.rev) on updateAccountState
to prevent concurrent workers from rolling back the rev for the same DID.

also: fix deploy docs in CLAUDE.md (justfile module syntax, kubeconfig path).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+229 -27
+2 -2
CLAUDE.md
··· 11 11 12 12 ## deploy 13 13 14 - configs at `../@zzstoatzz.io/relay/` — `just zlay-publish-remote ReleaseSafe` 14 + configs at `../@zzstoatzz.io/relay/` — `just zlay publish-remote ReleaseSafe` 15 15 16 - MUST set `KUBECONFIG="$(pwd)/zlay-kubeconfig.yaml"` — default context is docker-desktop. 16 + KUBECONFIG is set automatically by the zlay module (`zlay/kubeconfig.yaml`). 17 17 18 18 ## docs 19 19
+8 -1
src/broadcaster.zig
··· 36 36 slow_consumers: std.atomic.Value(u64) = .{ .raw = 0 }, 37 37 connected_inbound: std.atomic.Value(u64) = .{ .raw = 0 }, 38 38 cache_evictions: std.atomic.Value(u64) = .{ .raw = 0 }, 39 + chain_breaks: std.atomic.Value(u64) = .{ .raw = 0 }, 39 40 start_time: i64 = 0, 40 41 }; 41 42 ··· 632 633 \\# HELP relay_resolve_queued_set_count DID resolve dedup set size 633 634 \\relay_resolve_queued_set_count {d} 634 635 \\ 636 + \\# TYPE relay_chain_breaks_total counter 637 + \\# HELP relay_chain_breaks_total since/prevData chain continuity failures 638 + \\relay_chain_breaks_total {d} 639 + \\ 635 640 , .{ 636 641 stats.frames_in.load(.acquire), 637 642 stats.frames_out.load(.acquire), ··· 655 660 attribution.did_cache_entries, 656 661 attribution.resolve_queue_len, 657 662 attribution.resolve_queued_set_count, 663 + stats.chain_breaks.load(.acquire), 658 664 }) catch return fbs.getWritten(); 659 665 660 666 // linux-only process metrics from /proc ··· 769 775 770 776 pub fn formatStatsResponse(stats: *const Stats, buf: []u8) []const u8 { 771 777 return std.fmt.bufPrint(buf, 772 - \\{{"seq":{d},"relay_seq":{d},"consumers":{d},"connected_inbound":{d},"frames_in":{d},"frames_out":{d},"validated":{d},"failed":{d},"skipped":{d},"decode_errors":{d},"cache_hits":{d},"cache_misses":{d},"slow_consumers":{d},"uptime_seconds":{d}}} 778 + \\{{"seq":{d},"relay_seq":{d},"consumers":{d},"connected_inbound":{d},"frames_in":{d},"frames_out":{d},"validated":{d},"failed":{d},"skipped":{d},"decode_errors":{d},"cache_hits":{d},"cache_misses":{d},"slow_consumers":{d},"chain_breaks":{d},"uptime_seconds":{d}}} 773 779 , .{ 774 780 stats.seq.load(.acquire), 775 781 stats.relay_seq.load(.acquire), ··· 784 790 stats.cache_hits.load(.acquire), 785 791 stats.cache_misses.load(.acquire), 786 792 stats.slow_consumers.load(.acquire), 793 + stats.chain_breaks.load(.acquire), 787 794 std.time.timestamp() - stats.start_time, 788 795 }) catch ""; 789 796 }
+8 -4
src/event_log.zig
··· 350 350 }; 351 351 } 352 352 353 - /// update stored sync state after a verified commit (upsert into account_repo) 354 - pub fn updateAccountState(self: *DiskPersist, uid: u64, rev: []const u8, data_cid: []const u8) !void { 355 - _ = try self.db.exec( 356 - "INSERT INTO account_repo (uid, rev, commit_data_cid) VALUES ($1, $2, $3) ON CONFLICT (uid) DO UPDATE SET rev = EXCLUDED.rev, commit_data_cid = EXCLUDED.commit_data_cid", 353 + /// update stored sync state after a verified commit (conditional upsert into account_repo). 354 + /// uses WHERE clause to ensure rev only moves forward, preventing race conditions 355 + /// when concurrent workers process commits for the same DID. 356 + /// returns true if the state was updated, false if a newer rev was already stored. 357 + pub fn updateAccountState(self: *DiskPersist, uid: u64, rev: []const u8, data_cid: []const u8) !bool { 358 + const rows_affected = try self.db.exec( 359 + "INSERT INTO account_repo (uid, rev, commit_data_cid) VALUES ($1, $2, $3) ON CONFLICT (uid) DO UPDATE SET rev = EXCLUDED.rev, commit_data_cid = EXCLUDED.commit_data_cid WHERE account_repo.rev < EXCLUDED.rev", 357 360 .{ @as(i64, @intCast(uid)), rev, data_cid }, 358 361 ); 362 + return (rows_affected orelse 0) > 0; 359 363 } 360 364 361 365 // --- account status ---
+49 -3
src/frame_worker.zig
··· 128 128 } 129 129 } 130 130 131 - // stale rev check 131 + // rev checks: stale, future, chain continuity 132 132 if (is_commit and uid > 0) { 133 133 if (payload.getString("rev")) |incoming_rev| { 134 + // future-rev rejection: drop commits with timestamps too far ahead 135 + if (zat.Tid.parse(incoming_rev)) |tid| { 136 + const rev_us: i64 = @intCast(tid.timestamp()); 137 + const now_us = std.time.microTimestamp(); 138 + const skew_us: i64 = work.validator.config.rev_clock_skew * 1_000_000; 139 + if (rev_us > now_us + skew_us) { 140 + log.info("host {s}: dropping future rev uid={d} rev={s}", .{ 141 + work.hostname, uid, incoming_rev, 142 + }); 143 + _ = work.bc.stats.failed.fetchAdd(1, .monotonic); 144 + return; 145 + } 146 + } 147 + 134 148 if (work.persist) |dp| { 135 149 if (dp.getAccountState(uid, alloc) catch null) |prev| { 150 + // stale rev check 136 151 if (std.mem.order(u8, incoming_rev, prev.rev) != .gt) { 137 152 log.debug("host {s}: dropping stale commit uid={d} rev={s} <= {s}", .{ 138 153 work.hostname, uid, incoming_rev, prev.rev, 139 154 }); 140 155 _ = work.bc.stats.skipped.fetchAdd(1, .monotonic); 141 156 return; 157 + } 158 + 159 + // chain continuity: since should match stored rev 160 + if (payload.getString("since")) |since| { 161 + if (!std.mem.eql(u8, since, prev.rev)) { 162 + log.info("host {s}: chain break uid={d} since={s} stored_rev={s}", .{ 163 + work.hostname, uid, since, prev.rev, 164 + }); 165 + _ = work.bc.stats.chain_breaks.fetchAdd(1, .monotonic); 166 + } 167 + } 168 + 169 + // chain continuity: prevData CID should match stored data_cid 170 + if (payload.get("prevData")) |pd| { 171 + if (pd == .cid) { 172 + const prev_data_encoded = zat.multibase.encode(alloc, .base32lower, pd.cid.raw) catch ""; 173 + if (prev_data_encoded.len > 0 and prev.data_cid.len > 0 and 174 + !std.mem.eql(u8, prev_data_encoded, prev.data_cid)) 175 + { 176 + log.info("host {s}: chain break uid={d} prevData mismatch", .{ 177 + work.hostname, uid, 178 + }); 
179 + _ = work.bc.stats.chain_breaks.fetchAdd(1, .monotonic); 180 + } 181 + } 142 182 } 143 183 } 144 184 } ··· 202 242 zat.multibase.encode(alloc, .base32lower, cid_raw) catch "" 203 243 else 204 244 ""; 205 - dp.updateAccountState(uid, rev, cid_str) catch |err| { 245 + if (dp.updateAccountState(uid, rev, cid_str)) |updated| { 246 + if (!updated) { 247 + log.debug("host {s}: stale state update lost race uid={d} rev={s}", .{ 248 + work.hostname, uid, rev, 249 + }); 250 + } 251 + } else |err| { 206 252 log.debug("account state update failed: {s}", .{@errorName(err)}); 207 - }; 253 + } 208 254 } 209 255 } 210 256 } else {
+49 -4
src/subscriber.zig
··· 438 438 } 439 439 } 440 440 441 - // stale rev check (indigo ingest.go:114, rsky utils.rs:77): 442 - // drop commits where rev <= stored rev to prevent duplicates/rollbacks 441 + // rev checks: stale, future, chain continuity 443 442 if (is_commit and uid > 0) { 444 443 if (payload.getString("rev")) |incoming_rev| { 444 + // future-rev rejection 445 + if (zat.Tid.parse(incoming_rev)) |tid| { 446 + const rev_us: i64 = @intCast(tid.timestamp()); 447 + const now_us = std.time.microTimestamp(); 448 + const skew_us: i64 = sub.validator.config.rev_clock_skew * 1_000_000; 449 + if (rev_us > now_us + skew_us) { 450 + log.info("host {s}: dropping future rev uid={d} rev={s}", .{ 451 + sub.options.hostname, uid, incoming_rev, 452 + }); 453 + _ = sub.bc.stats.failed.fetchAdd(1, .monotonic); 454 + return; 455 + } 456 + } 457 + 445 458 if (sub.persist) |dp| { 446 459 if (dp.getAccountState(uid, alloc) catch null) |prev| { 460 + // stale rev check 447 461 if (std.mem.order(u8, incoming_rev, prev.rev) != .gt) { 448 462 log.debug("host {s}: dropping stale commit uid={d} rev={s} <= {s}", .{ 449 463 sub.options.hostname, uid, incoming_rev, prev.rev, 450 464 }); 451 465 _ = sub.bc.stats.skipped.fetchAdd(1, .monotonic); 452 466 return; 467 + } 468 + 469 + // chain continuity: since should match stored rev 470 + if (payload.getString("since")) |since| { 471 + if (!std.mem.eql(u8, since, prev.rev)) { 472 + log.info("host {s}: chain break uid={d} since={s} stored_rev={s}", .{ 473 + sub.options.hostname, uid, since, prev.rev, 474 + }); 475 + _ = sub.bc.stats.chain_breaks.fetchAdd(1, .monotonic); 476 + } 477 + } 478 + 479 + // chain continuity: prevData CID should match stored data_cid 480 + if (payload.get("prevData")) |pd| { 481 + if (pd == .cid) { 482 + const prev_data_encoded = zat.multibase.encode(alloc, .base32lower, pd.cid.raw) catch ""; 483 + if (prev_data_encoded.len > 0 and prev.data_cid.len > 0 and 484 + !std.mem.eql(u8, prev_data_encoded, prev.data_cid)) 485 + { 486 + 
log.info("host {s}: chain break uid={d} prevData mismatch", .{ 487 + sub.options.hostname, uid, 488 + }); 489 + _ = sub.bc.stats.chain_breaks.fetchAdd(1, .monotonic); 490 + } 491 + } 453 492 } 454 493 } 455 494 } ··· 516 555 zat.multibase.encode(alloc, .base32lower, cid_raw) catch "" 517 556 else 518 557 ""; 519 - dp.updateAccountState(uid, rev, cid_str) catch |err| { 558 + if (dp.updateAccountState(uid, rev, cid_str)) |updated| { 559 + if (!updated) { 560 + log.debug("host {s}: stale state update lost race uid={d} rev={s}", .{ 561 + sub.options.hostname, uid, rev, 562 + }); 563 + } 564 + } else |err| { 520 565 log.debug("account state update failed: {s}", .{@errorName(err)}); 521 - }; 566 + } 522 567 } 523 568 } 524 569 } else {
+113 -13
src/validator.zig
··· 285 285 }; 286 286 } 287 287 288 - /// extract ops from payload and convert to mst.Operation array 288 + /// extract ops from payload and convert to mst.Operation array. 289 + /// the firehose format uses a single "path" field ("collection/rkey"), 290 + /// not separate "collection"/"rkey" fields. 289 291 fn extractOps(self: *Validator, alloc: Allocator, payload: zat.cbor.Value) ?[]const zat.MstOperation { 290 292 _ = self; 291 293 const ops_array = payload.getArray("ops") orelse return null; 292 294 var ops: std.ArrayListUnmanaged(zat.MstOperation) = .{}; 293 295 for (ops_array) |op| { 294 296 const action = op.getString("action") orelse continue; 295 - const collection = op.getString("collection") orelse continue; 296 - const rkey = op.getString("rkey") orelse continue; 297 + const path = op.getString("path") orelse continue; 297 298 298 - // build path: "collection/rkey" 299 - const path = std.fmt.allocPrint(alloc, "{s}/{s}", .{ collection, rkey }) catch return null; 299 + // validate path contains "/" (collection/rkey) 300 + if (std.mem.indexOfScalar(u8, path, '/') == null) continue; 300 301 301 302 // extract CID values 302 303 const cid_value: ?[]const u8 = if (op.get("cid")) |v| switch (v) { ··· 311 312 value = cid_value; 312 313 } else if (std.mem.eql(u8, action, "update")) { 313 314 value = cid_value; 314 - // prev is extracted from the MST during inversion, not from payload 315 - // for update ops, we need both value and prev — prev comes from prevData chain 316 315 prev = if (op.get("prev")) |v| switch (v) { 317 316 .cid => |c| c.raw, 318 317 else => null, ··· 350 349 switch (ops_value) { 351 350 .array => |ops| { 352 351 if (ops.len > self.config.max_ops) return error.InvalidFrame; 353 - // validate each op has valid collection/rkey 352 + // validate each op has valid path (collection/rkey) 354 353 for (ops) |op| { 355 - if (op.getString("collection")) |coll| { 356 - if (zat.Nsid.parse(coll) == null) return error.InvalidFrame; 357 - } 358 - if 
(op.getString("rkey")) |rk| { 359 - if (zat.Rkey.parse(rk) == null) return error.InvalidFrame; 354 + if (op.getString("path")) |path| { 355 + if (std.mem.indexOfScalar(u8, path, '/')) |sep| { 356 + const collection = path[0..sep]; 357 + const rkey = path[sep + 1 ..]; 358 + if (zat.Nsid.parse(collection) == null) return error.InvalidFrame; 359 + if (rkey.len > 0) { 360 + if (zat.Rkey.parse(rkey) == null) return error.InvalidFrame; 361 + } 362 + } else return error.InvalidFrame; // path must contain '/' 360 363 } 361 364 } 362 365 }, ··· 782 785 // should pass size check — will be a cache miss → skipped (no cached key) 783 786 try std.testing.expect(result.valid); 784 787 try std.testing.expect(result.skipped); 788 + } 789 + 790 + test "extractOps reads path field from firehose format" { 791 + var stats = broadcaster.Stats{}; 792 + var v = Validator.initWithConfig(std.testing.allocator, &stats, .{ .verify_commit_diff = true }); 793 + defer v.deinit(); 794 + 795 + // use arena since extractOps allocates an ArrayList internally 796 + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 797 + defer arena.deinit(); 798 + 799 + const ops = [_]zat.cbor.Value{ 800 + .{ .map = &.{ 801 + .{ .key = "action", .value = .{ .text = "create" } }, 802 + .{ .key = "path", .value = .{ .text = "app.bsky.feed.post/3k2abc000000" } }, 803 + .{ .key = "cid", .value = .{ .cid = .{ .raw = "fakecid12345" } } }, 804 + } }, 805 + .{ .map = &.{ 806 + .{ .key = "action", .value = .{ .text = "delete" } }, 807 + .{ .key = "path", .value = .{ .text = "app.bsky.feed.like/3k2def000000" } }, 808 + } }, 809 + }; 810 + 811 + const payload: zat.cbor.Value = .{ .map = &.{ 812 + .{ .key = "repo", .value = .{ .text = "did:plc:test123" } }, 813 + .{ .key = "ops", .value = .{ .array = &ops } }, 814 + } }; 815 + 816 + const result = v.extractOps(arena.allocator(), payload); 817 + try std.testing.expect(result != null); 818 + try std.testing.expectEqual(@as(usize, 2), result.?.len); 819 + try 
std.testing.expectEqualStrings("app.bsky.feed.post/3k2abc000000", result.?[0].path); 820 + try std.testing.expectEqualStrings("app.bsky.feed.like/3k2def000000", result.?[1].path); 821 + try std.testing.expect(result.?[0].value != null); // create has cid 822 + try std.testing.expect(result.?[1].value == null); // delete has no cid 823 + } 824 + 825 + test "extractOps rejects malformed path without slash" { 826 + var stats = broadcaster.Stats{}; 827 + var v = Validator.initWithConfig(std.testing.allocator, &stats, .{ .verify_commit_diff = true }); 828 + defer v.deinit(); 829 + 830 + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 831 + defer arena.deinit(); 832 + 833 + const ops = [_]zat.cbor.Value{ 834 + .{ .map = &.{ 835 + .{ .key = "action", .value = .{ .text = "create" } }, 836 + .{ .key = "path", .value = .{ .text = "noslash" } }, 837 + .{ .key = "cid", .value = .{ .cid = .{ .raw = "fakecid12345" } } }, 838 + } }, 839 + }; 840 + 841 + const payload: zat.cbor.Value = .{ .map = &.{ 842 + .{ .key = "repo", .value = .{ .text = "did:plc:test123" } }, 843 + .{ .key = "ops", .value = .{ .array = &ops } }, 844 + } }; 845 + 846 + // malformed path (no slash) → all ops skipped → returns null 847 + const result = v.extractOps(arena.allocator(), payload); 848 + try std.testing.expect(result == null); 849 + } 850 + 851 + test "checkCommitStructure validates path field" { 852 + var stats = broadcaster.Stats{}; 853 + var v = Validator.init(std.testing.allocator, &stats); 854 + defer v.deinit(); 855 + 856 + // valid path 857 + const valid_ops = [_]zat.cbor.Value{ 858 + .{ .map = &.{ 859 + .{ .key = "action", .value = .{ .text = "create" } }, 860 + .{ .key = "path", .value = .{ .text = "app.bsky.feed.post/3k2abc000000" } }, 861 + } }, 862 + }; 863 + 864 + const valid_payload: zat.cbor.Value = .{ .map = &.{ 865 + .{ .key = "repo", .value = .{ .text = "did:plc:test123" } }, 866 + .{ .key = "ops", .value = .{ .array = &valid_ops } }, 867 + } }; 868 + 869 + try 
v.checkCommitStructure(valid_payload); 870 + 871 + // invalid collection in path 872 + const invalid_ops = [_]zat.cbor.Value{ 873 + .{ .map = &.{ 874 + .{ .key = "action", .value = .{ .text = "create" } }, 875 + .{ .key = "path", .value = .{ .text = "not-an-nsid/3k2abc000000" } }, 876 + } }, 877 + }; 878 + 879 + const invalid_payload: zat.cbor.Value = .{ .map = &.{ 880 + .{ .key = "repo", .value = .{ .text = "did:plc:test123" } }, 881 + .{ .key = "ops", .value = .{ .array = &invalid_ops } }, 882 + } }; 883 + 884 + try std.testing.expectError(error.InvalidFrame, v.checkCommitStructure(invalid_payload)); 785 885 } 786 886 787 887 test "queueResolve deduplicates repeated DIDs" {