atproto utils for zig zat.dev
atproto sdk zig

fix: align firehose event types with sync spec

- add missing CommitEvent fields: since, commit CID, blobs
- add cid to RepoOp for downstream CID verification
- make rev, time required on CommitEvent (spec: required)
- make time required on IdentityEvent and AccountEvent (spec: required)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+91 -51
+91 -51
src/internal/firehose.zig
··· 49 pub const CommitEvent = struct { 50 seq: i64, 51 repo: []const u8, // DID 52 - rev: ?[]const u8 = null, 53 - time: ?[]const u8 = null, 54 ops: []const RepoOp, 55 too_big: bool = false, 56 }; 57 ··· 59 action: CommitAction, 60 collection: []const u8, 61 rkey: []const u8, 62 record: ?cbor.Value = null, // decoded DAG-CBOR record from CAR block 63 }; 64 65 pub const IdentityEvent = struct { 66 seq: i64, 67 did: []const u8, 68 - time: ?[]const u8 = null, 69 handle: ?[]const u8 = null, 70 }; 71 72 pub const AccountEvent = struct { 73 seq: i64, 74 did: []const u8, 75 - time: ?[]const u8 = null, 76 active: bool = true, 77 status: ?AccountStatus = null, 78 }; ··· 137 fn decodeCommit(allocator: Allocator, payload: cbor.Value) DecodeError!Event { 138 const seq_val = payload.getInt("seq") orelse return error.MissingField; 139 const repo = payload.getString("repo") orelse return error.MissingField; 140 141 // parse CAR blocks 142 const blocks_bytes = payload.getBytes("blocks"); ··· 160 const collection = path[0..slash]; 161 const rkey = path[slash + 1 ..]; 162 163 - // look up record from CAR blocks via CID 164 var record: ?cbor.Value = null; 165 - if (parsed_car) |c| { 166 - if (op_val.get("cid")) |cid_val| { 167 - switch (cid_val) { 168 - .cid => |cid| { 169 if (car.findBlock(c, cid.raw)) |block_data| { 170 record = cbor.decodeAll(allocator, block_data) catch null; 171 } 172 - }, 173 - else => {}, 174 - } 175 } 176 } 177 ··· 179 .action = action, 180 .collection = collection, 181 .rkey = rkey, 182 .record = record, 183 }); 184 } ··· 187 return .{ .commit = .{ 188 .seq = seq_val, 189 .repo = repo, 190 - .rev = payload.getString("rev"), 191 - .time = payload.getString("time"), 192 .ops = try ops.toOwnedSlice(allocator), 193 .too_big = payload.getBool("tooBig") orelse false, 194 } }; 195 } ··· 198 return .{ .identity = .{ 199 .seq = payload.getInt("seq") orelse return error.MissingField, 200 .did = payload.getString("did") orelse return error.MissingField, 201 - .time = payload.getString("time"), 202 .handle = payload.getString("handle"), 203 } }; 204 } ··· 208 return .{ .account = .{ 209 .seq = payload.getInt("seq") orelse return error.MissingField, 210 .did = payload.getString("did") orelse return error.MissingField, 211 - .time = payload.getString("time"), 212 .active = payload.getBool("active") orelse true, 213 .status = if (status_str) |s| AccountStatus.parse(s) else null, 214 } }; ··· 293 .blocks = car_blocks.items, 294 }; 295 const blocks_bytes = try car.writeAlloc(allocator, car_data); 296 297 // build payload entries 298 var entries: std.ArrayList(cbor.Value.MapEntry) = .{}; 299 defer entries.deinit(allocator); 300 301 - if (blocks_bytes.len > 0) { 302 - try entries.append(allocator, .{ .key = "blocks", .value = .{ .bytes = blocks_bytes } }); 303 } 304 try entries.append(allocator, .{ .key = "ops", .value = .{ .array = op_values.items } }); 305 try entries.append(allocator, .{ .key = "repo", .value = .{ .text = commit.repo } }); 306 - if (commit.rev) |rev| { 307 - try entries.append(allocator, .{ .key = "rev", .value = .{ .text = rev } }); 308 - } 309 try entries.append(allocator, .{ .key = "seq", .value = .{ .unsigned = @intCast(commit.seq) } }); 310 - if (commit.time) |t| { 311 - try entries.append(allocator, .{ .key = "time", .value = .{ .text = t } }); 312 } 313 if (commit.too_big) { 314 try entries.append(allocator, .{ .key = "tooBig", .value = .{ .boolean = true } }); 315 } ··· 326 try entries.append(allocator, .{ .key = "handle", .value = .{ .text = h } }); 327 } 328 try entries.append(allocator, .{ .key = "seq", .value = .{ .unsigned = @intCast(identity.seq) } }); 329 - if (identity.time) |t| { 330 - try entries.append(allocator, .{ .key = "time", .value = .{ .text = t } }); 331 - } 332 333 try cbor.encode(allocator, writer, .{ .map = entries.items }); 334 } ··· 345 if (account.status) |s| { 346 try entries.append(allocator, .{ .key = "status", .value = .{ .text = @tagName(s) } }); 347 } 348 - if (account.time) |t| { 349 - try entries.append(allocator, .{ .key = "time", .value = .{ .text = t } }); 350 - } 351 352 try cbor.encode(allocator, writer, .{ .map = entries.items }); 353 } ··· 502 defer arena.deinit(); 503 const alloc = arena.allocator(); 504 505 - // header: {op: 1, t: "#identity"} 506 - const header_bytes = [_]u8{ 507 - 0xa2, // map(2) 508 - 0x62, 'o', 'p', 0x01, // "op": 1 509 - 0x61, 't', 0x69, '#', 'i', 'd', 'e', 'n', 't', 'i', 't', 'y', // "t": "#identity" 510 - }; 511 - // payload: {seq: 42, did: "did:plc:test"} 512 - const payload_bytes = [_]u8{ 513 - 0xa2, // map(2) 514 - 0x63, 's', 'e', 'q', 0x18, 42, // "seq": 42 515 - 0x63, 'd', 'i', 'd', 0x6c, 'd', 'i', 'd', ':', 'p', 'l', 'c', ':', 't', 'e', 's', 't', // "did": "did:plc:test" 516 - }; 517 518 - var frame: [header_bytes.len + payload_bytes.len]u8 = undefined; 519 - @memcpy(frame[0..header_bytes.len], &header_bytes); 520 - @memcpy(frame[header_bytes.len..], &payload_bytes); 521 - 522 - const event = try decodeFrame(alloc, &frame); 523 const identity = event.identity; 524 try std.testing.expectEqual(@as(i64, 42), identity.seq); 525 try std.testing.expectEqualStrings("did:plc:test", identity.did); 526 } 527 528 test "Event.seq works" { ··· 532 const identity_event = Event{ .identity = .{ 533 .seq = 42, 534 .did = "did:plc:test", 535 } }; 536 try std.testing.expectEqual(@as(i64, 42), identity_event.seq().?); 537 } ··· 563 const original = Event{ .identity = .{ 564 .seq = 42, 565 .did = "did:plc:test123", 566 - .handle = "alice.bsky.social", 567 .time = "2024-01-15T10:30:00Z", 568 } }; 569 570 const frame = try encodeFrame(alloc, original); ··· 573 const id = decoded.identity; 574 try std.testing.expectEqual(@as(i64, 42), id.seq); 575 try std.testing.expectEqualStrings("did:plc:test123", id.did); 576 try std.testing.expectEqualStrings("alice.bsky.social", id.handle.?); 577 - try std.testing.expectEqualStrings("2024-01-15T10:30:00Z", id.time.?); 578 } 579 580 test "encode → decode account frame" { ··· 585 const original = Event{ .account = .{ 586 .seq = 100, 587 .did = "did:plc:suspended", 588 .active = false, 589 .status = .suspended, 590 - .time = "2024-01-15T10:30:00Z", 591 } }; 592 593 const frame = try encodeFrame(alloc, original); ··· 596 const acct = decoded.account; 597 try std.testing.expectEqual(@as(i64, 100), acct.seq); 598 try std.testing.expectEqualStrings("did:plc:suspended", acct.did); 599 try std.testing.expectEqual(false, acct.active); 600 try std.testing.expectEqual(AccountStatus.suspended, acct.status.?); 601 } ··· 613 const original = Event{ .commit = .{ 614 .seq = 999, 615 .repo = "did:plc:poster", 616 - .rev = "abc123", 617 .time = "2024-01-15T10:30:00Z", 618 .ops = &.{.{ 619 .action = .create, 620 .collection = "app.bsky.feed.post", ··· 629 const commit = decoded.commit; 630 try std.testing.expectEqual(@as(i64, 999), commit.seq); 631 try std.testing.expectEqualStrings("did:plc:poster", commit.repo); 632 - try std.testing.expectEqualStrings("abc123", commit.rev.?); 633 try std.testing.expectEqual(@as(usize, 1), commit.ops.len); 634 635 const op = commit.ops[0]; 636 try std.testing.expectEqual(CommitAction.create, op.action); 637 try std.testing.expectEqualStrings("app.bsky.feed.post", op.collection); 638 try std.testing.expectEqualStrings("3k2abc", op.rkey); 639 640 // record should be decoded from the CAR blocks 641 const rec = op.record.?; ··· 651 const original = Event{ .commit = .{ 652 .seq = 500, 653 .repo = "did:plc:deleter", 654 .ops = &.{.{ 655 .action = .delete, 656 .collection = "app.bsky.feed.post", ··· 663 const decoded = try decodeFrame(alloc, frame); 664 665 try std.testing.expectEqual(@as(i64, 500), decoded.commit.seq); 666 try std.testing.expectEqual(@as(usize, 1), decoded.commit.ops.len); 667 try std.testing.expectEqual(CommitAction.delete, decoded.commit.ops[0].action); 668 try std.testing.expect(decoded.commit.ops[0].record == null); 669 }
··· 49 pub const CommitEvent = struct { 50 seq: i64, 51 repo: []const u8, // DID 52 + rev: []const u8, // TID — revision of the commit 53 + time: []const u8, // datetime — when event was received 54 + since: ?[]const u8 = null, // TID — rev of preceding commit (null = full repo export) 55 + commit: ?cbor.Cid = null, // CID of the commit object 56 ops: []const RepoOp, 57 + blobs: []const cbor.Cid = &.{}, // new blobs referenced by records in this commit 58 too_big: bool = false, 59 }; 60 ··· 62 action: CommitAction, 63 collection: []const u8, 64 rkey: []const u8, 65 + cid: ?cbor.Cid = null, // CID of the record (null for deletes) 66 record: ?cbor.Value = null, // decoded DAG-CBOR record from CAR block 67 }; 68 69 pub const IdentityEvent = struct { 70 seq: i64, 71 did: []const u8, 72 + time: []const u8, // datetime — when event was received 73 handle: ?[]const u8 = null, 74 }; 75 76 pub const AccountEvent = struct { 77 seq: i64, 78 did: []const u8, 79 + time: []const u8, // datetime — when event was received 80 active: bool = true, 81 status: ?AccountStatus = null, 82 }; ··· 141 fn decodeCommit(allocator: Allocator, payload: cbor.Value) DecodeError!Event { 142 const seq_val = payload.getInt("seq") orelse return error.MissingField; 143 const repo = payload.getString("repo") orelse return error.MissingField; 144 + const rev = payload.getString("rev") orelse return error.MissingField; 145 + const time = payload.getString("time") orelse return error.MissingField; 146 + 147 + // parse commit CID 148 + var commit_cid: ?cbor.Cid = null; 149 + if (payload.get("commit")) |commit_val| { 150 + switch (commit_val) { 151 + .cid => |c| commit_cid = c, 152 + else => {}, 153 + } 154 + } 155 + 156 + // parse blobs array (array of CID links) 157 + var blobs: std.ArrayList(cbor.Cid) = .{}; 158 + if (payload.getArray("blobs")) |blob_values| { 159 + for (blob_values) |blob_val| { 160 + switch (blob_val) { 161 + .cid => |c| try blobs.append(allocator, c), 162 + else => {}, 163 + } 164 + } 165 + } 166 167 // parse CAR blocks 168 const blocks_bytes = payload.getBytes("blocks"); ··· 186 const collection = path[0..slash]; 187 const rkey = path[slash + 1 ..]; 188 189 + // extract CID from op and look up record from CAR blocks 190 + var op_cid: ?cbor.Cid = null; 191 var record: ?cbor.Value = null; 192 + if (op_val.get("cid")) |cid_val| { 193 + switch (cid_val) { 194 + .cid => |cid| { 195 + op_cid = cid; 196 + if (parsed_car) |c| { 197 if (car.findBlock(c, cid.raw)) |block_data| { 198 record = cbor.decodeAll(allocator, block_data) catch null; 199 } 200 + } 201 + }, 202 + else => {}, 203 } 204 } 205 ··· 207 .action = action, 208 .collection = collection, 209 .rkey = rkey, 210 + .cid = op_cid, 211 .record = record, 212 }); 213 } ··· 216 return .{ .commit = .{ 217 .seq = seq_val, 218 .repo = repo, 219 + .rev = rev, 220 + .time = time, 221 + .since = payload.getString("since"), 222 + .commit = commit_cid, 223 .ops = try ops.toOwnedSlice(allocator), 224 + .blobs = try blobs.toOwnedSlice(allocator), 225 .too_big = payload.getBool("tooBig") orelse false, 226 } }; 227 } ··· 230 return .{ .identity = .{ 231 .seq = payload.getInt("seq") orelse return error.MissingField, 232 .did = payload.getString("did") orelse return error.MissingField, 233 + .time = payload.getString("time") orelse return error.MissingField, 234 .handle = payload.getString("handle"), 235 } }; 236 } ··· 240 return .{ .account = .{ 241 .seq = payload.getInt("seq") orelse return error.MissingField, 242 .did = payload.getString("did") orelse return error.MissingField, 243 + .time = payload.getString("time") orelse return error.MissingField, 244 .active = payload.getBool("active") orelse true, 245 .status = if (status_str) |s| AccountStatus.parse(s) else null, 246 } }; ··· 325 .blocks = car_blocks.items, 326 }; 327 const blocks_bytes = try car.writeAlloc(allocator, car_data); 328 + 329 + // build blobs array 330 + var blob_values: std.ArrayList(cbor.Value) = .{}; 331 + defer blob_values.deinit(allocator); 332 + for (commit.blobs) |blob| { 333 + try blob_values.append(allocator, .{ .cid = blob }); 334 + } 335 336 // build payload entries 337 var entries: std.ArrayList(cbor.Value.MapEntry) = .{}; 338 defer entries.deinit(allocator); 339 340 + try entries.append(allocator, .{ .key = "blocks", .value = .{ .bytes = blocks_bytes } }); 341 + if (commit.commit) |c| { 342 + try entries.append(allocator, .{ .key = "commit", .value = .{ .cid = c } }); 343 } 344 + try entries.append(allocator, .{ .key = "blobs", .value = .{ .array = blob_values.items } }); 345 try entries.append(allocator, .{ .key = "ops", .value = .{ .array = op_values.items } }); 346 try entries.append(allocator, .{ .key = "repo", .value = .{ .text = commit.repo } }); 347 + try entries.append(allocator, .{ .key = "rev", .value = .{ .text = commit.rev } }); 348 try entries.append(allocator, .{ .key = "seq", .value = .{ .unsigned = @intCast(commit.seq) } }); 349 + if (commit.since) |s| { 350 + try entries.append(allocator, .{ .key = "since", .value = .{ .text = s } }); 351 } 352 + try entries.append(allocator, .{ .key = "time", .value = .{ .text = commit.time } }); 353 if (commit.too_big) { 354 try entries.append(allocator, .{ .key = "tooBig", .value = .{ .boolean = true } }); 355 } ··· 366 try entries.append(allocator, .{ .key = "handle", .value = .{ .text = h } }); 367 } 368 try entries.append(allocator, .{ .key = "seq", .value = .{ .unsigned = @intCast(identity.seq) } }); 369 + try entries.append(allocator, .{ .key = "time", .value = .{ .text = identity.time } }); 370 371 try cbor.encode(allocator, writer, .{ .map = entries.items }); 372 } ··· 383 if (account.status) |s| { 384 try entries.append(allocator, .{ .key = "status", .value = .{ .text = @tagName(s) } }); 385 } 386 + try entries.append(allocator, .{ .key = "time", .value = .{ .text = account.time } }); 387 388 try cbor.encode(allocator, writer, .{ .map = entries.items }); 389 } ··· 538 defer arena.deinit(); 539 const alloc = arena.allocator(); 540 541 + // build frame via encoder for cleaner test 542 + const original = Event{ .identity = .{ 543 + .seq = 42, 544 + .did = "did:plc:test", 545 + .time = "2024-01-15T10:30:00Z", 546 + } }; 547 + const frame = try encodeFrame(alloc, original); 548 549 + const event = try decodeFrame(alloc, frame); 550 const identity = event.identity; 551 try std.testing.expectEqual(@as(i64, 42), identity.seq); 552 try std.testing.expectEqualStrings("did:plc:test", identity.did); 553 + try std.testing.expectEqualStrings("2024-01-15T10:30:00Z", identity.time); 554 } 555 556 test "Event.seq works" { ··· 560 const identity_event = Event{ .identity = .{ 561 .seq = 42, 562 .did = "did:plc:test", 563 + .time = "2024-01-15T10:30:00Z", 564 } }; 565 try std.testing.expectEqual(@as(i64, 42), identity_event.seq().?); 566 } ··· 592 const original = Event{ .identity = .{ 593 .seq = 42, 594 .did = "did:plc:test123", 595 .time = "2024-01-15T10:30:00Z", 596 + .handle = "alice.bsky.social", 597 } }; 598 599 const frame = try encodeFrame(alloc, original); ··· 602 const id = decoded.identity; 603 try std.testing.expectEqual(@as(i64, 42), id.seq); 604 try std.testing.expectEqualStrings("did:plc:test123", id.did); 605 + try std.testing.expectEqualStrings("2024-01-15T10:30:00Z", id.time); 606 try std.testing.expectEqualStrings("alice.bsky.social", id.handle.?); 607 } 608 609 test "encode → decode account frame" { ··· 614 const original = Event{ .account = .{ 615 .seq = 100, 616 .did = "did:plc:suspended", 617 + .time = "2024-01-15T10:30:00Z", 618 .active = false, 619 .status = .suspended, 620 } }; 621 622 const frame = try encodeFrame(alloc, original); ··· 625 const acct = decoded.account; 626 try std.testing.expectEqual(@as(i64, 100), acct.seq); 627 try std.testing.expectEqualStrings("did:plc:suspended", acct.did); 628 + try std.testing.expectEqualStrings("2024-01-15T10:30:00Z", acct.time); 629 try std.testing.expectEqual(false, acct.active); 630 try std.testing.expectEqual(AccountStatus.suspended, acct.status.?); 631 } ··· 643 const original = Event{ .commit = .{ 644 .seq = 999, 645 .repo = "did:plc:poster", 646 + .rev = "3k2abc000000", 647 .time = "2024-01-15T10:30:00Z", 648 + .since = "3k2abd000000", 649 .ops = &.{.{ 650 .action = .create, 651 .collection = "app.bsky.feed.post", ··· 660 const commit = decoded.commit; 661 try std.testing.expectEqual(@as(i64, 999), commit.seq); 662 try std.testing.expectEqualStrings("did:plc:poster", commit.repo); 663 + try std.testing.expectEqualStrings("3k2abc000000", commit.rev); 664 + try std.testing.expectEqualStrings("2024-01-15T10:30:00Z", commit.time); 665 + try std.testing.expectEqualStrings("3k2abd000000", commit.since.?); 666 + try std.testing.expectEqual(@as(usize, 0), commit.blobs.len); 667 try std.testing.expectEqual(@as(usize, 1), commit.ops.len); 668 669 const op = commit.ops[0]; 670 try std.testing.expectEqual(CommitAction.create, op.action); 671 try std.testing.expectEqualStrings("app.bsky.feed.post", op.collection); 672 try std.testing.expectEqualStrings("3k2abc", op.rkey); 673 + try std.testing.expect(op.cid != null); 674 675 // record should be decoded from the CAR blocks 676 const rec = op.record.?; ··· 686 const original = Event{ .commit = .{ 687 .seq = 500, 688 .repo = "did:plc:deleter", 689 + .rev = "3k2xyz000000", 690 + .time = "2024-01-15T10:30:00Z", 691 .ops = &.{.{ 692 .action = .delete, 693 .collection = "app.bsky.feed.post", ··· 700 const decoded = try decodeFrame(alloc, frame); 701 702 try std.testing.expectEqual(@as(i64, 500), decoded.commit.seq); 703 + try std.testing.expectEqualStrings("3k2xyz000000", decoded.commit.rev); 704 + try std.testing.expectEqualStrings("2024-01-15T10:30:00Z", decoded.commit.time); 705 try std.testing.expectEqual(@as(usize, 1), decoded.commit.ops.len); 706 try std.testing.expectEqual(CommitAction.delete, decoded.commit.ops[0].action); 707 + try std.testing.expect(decoded.commit.ops[0].cid == null); 708 try std.testing.expect(decoded.commit.ops[0].record == null); 709 }