atproto utils for zig zat.dev
atproto sdk zig

feat: add firehose support with DAG-CBOR, CAR, and CID codecs

full encode/decode for the com.atproto.sync.subscribeRepos firehose:
- DAG-CBOR codec with deterministic key sorting and shortest integer encoding
- CID creation (SHA-256 hashing → CIDv1) and parsing
- CAR v1 reader/writer with root extraction and block lookup
- firehose frame encoder/decoder with WebSocket client
- 47 tests including real-record CID verification against production data

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+2075
+351
src/internal/car.zig
··· 1 + //! CAR v1 codec (Content Addressable aRchive) 2 + //! 3 + //! read and write CAR v1 files used in AT Protocol firehose commit events. 4 + //! the `blocks` field of a #commit payload is a CAR file containing 5 + //! the signed commit, MST nodes, and record data. 6 + //! 7 + //! format: [varint header_len] [DAG-CBOR header] [varint block_len] [CID] [data] ... 8 + //! 9 + //! see: https://ipld.io/specs/transport/car/carv1/ 10 + 11 + const std = @import("std"); 12 + const cbor = @import("cbor.zig"); 13 + 14 + const Allocator = std.mem.Allocator; 15 + 16 + /// a single block from a CAR file 17 + pub const Block = struct { 18 + cid_raw: []const u8, // raw CID bytes (for matching against op CIDs) 19 + data: []const u8, // block content (DAG-CBOR encoded) 20 + }; 21 + 22 + /// parsed CAR file 23 + pub const Car = struct { 24 + roots: []const cbor.Cid, 25 + blocks: []const Block, 26 + }; 27 + 28 + pub const CarError = error{ 29 + InvalidHeader, 30 + InvalidVarint, 31 + InvalidCid, 32 + UnexpectedEof, 33 + OutOfMemory, 34 + }; 35 + 36 + /// parse a CAR v1 file from raw bytes 37 + pub fn read(allocator: Allocator, data: []const u8) CarError!Car { 38 + var pos: usize = 0; 39 + 40 + // read header length (unsigned varint) 41 + const header_len = cbor.readUvarint(data, &pos) orelse return error.InvalidVarint; 42 + const header_len_usize = std.math.cast(usize, header_len) orelse return error.InvalidHeader; 43 + const header_end = pos + header_len_usize; 44 + if (header_end > data.len) return error.UnexpectedEof; 45 + 46 + // decode header (DAG-CBOR map with "version" and "roots") 47 + const header_bytes = data[pos..header_end]; 48 + const header = cbor.decodeAll(allocator, header_bytes) catch return error.InvalidHeader; 49 + 50 + // extract roots (array of CID links) 51 + var roots: std.ArrayList(cbor.Cid) = .{}; 52 + if (header.getArray("roots")) |root_values| { 53 + for (root_values) |root_val| { 54 + switch (root_val) { 55 + .cid => |c| try roots.append(allocator, c), 56 + else => {}, 57 + } 58 + } 59 + } 60 + 61 + pos = header_end; 62 + 63 + // read blocks 64 + var blocks: std.ArrayList(Block) = .{}; 65 + 66 + while (pos < data.len) { 67 + // block: [varint total_len] [CID bytes] [data bytes] 68 + // total_len includes both CID and data 69 + const block_len = cbor.readUvarint(data, &pos) orelse return error.InvalidVarint; 70 + const block_len_usize = std.math.cast(usize, block_len) orelse return error.InvalidHeader; 71 + const block_end = pos + block_len_usize; 72 + if (block_end > data.len) return error.UnexpectedEof; 73 + 74 + const block_data = data[pos..block_end]; 75 + 76 + // parse CID to determine its length, then the rest is block content 77 + const cid_len = cidLength(block_data) orelse return error.InvalidCid; 78 + if (cid_len > block_data.len) return error.InvalidCid; 79 + 80 + try blocks.append(allocator, .{ 81 + .cid_raw = block_data[0..cid_len], 82 + .data = block_data[cid_len..], 83 + }); 84 + 85 + pos = block_end; 86 + } 87 + 88 + return .{ 89 + .roots = try roots.toOwnedSlice(allocator), 90 + .blocks = try blocks.toOwnedSlice(allocator), 91 + }; 92 + } 93 + 94 + /// determine the byte length of a CID at the start of data 95 + fn cidLength(data: []const u8) ?usize { 96 + if (data.len < 2) return null; 97 + 98 + // CIDv0: starts with 0x12 0x20 (sha2-256 multihash, 32 byte digest) 99 + if (data[0] == 0x12 and data[1] == 0x20) { 100 + return 34; // 1 + 1 + 32 101 + } 102 + 103 + // CIDv1: version varint + codec varint + multihash (hash_fn varint + digest_len varint + digest) 104 + var pos: usize = 0; 105 + _ = cbor.readUvarint(data, &pos) orelse return null; // version 106 + _ = cbor.readUvarint(data, &pos) orelse return null; // codec 107 + _ = cbor.readUvarint(data, &pos) orelse return null; // hash function 108 + const digest_len = cbor.readUvarint(data, &pos) orelse return null; 109 + 110 + const digest_len_usize = std.math.cast(usize, digest_len) orelse return null; 111 + return pos + digest_len_usize; 112 + } 113 + 114 + /// find a block by matching CID bytes 115 + pub fn findBlock(c: Car, cid_raw: []const u8) ?[]const u8 { 116 + for (c.blocks) |block| { 117 + if (std.mem.eql(u8, block.cid_raw, cid_raw)) return block.data; 118 + } 119 + return null; 120 + } 121 + 122 + // === writer === 123 + 124 + /// write a CAR v1 file to the given writer. 125 + /// produces: [varint header_len] [DAG-CBOR header] [blocks...] 126 + /// where each block is: [varint block_len] [CID bytes] [data bytes] 127 + pub fn write(allocator: Allocator, writer: anytype, c: Car) !void { 128 + // build header: {"roots": [...CID links...], "version": 1} 129 + var root_values: std.ArrayList(cbor.Value) = .{}; 130 + defer root_values.deinit(allocator); 131 + for (c.roots) |root| { 132 + try root_values.append(allocator, .{ .cid = root }); 133 + } 134 + 135 + const header_value: cbor.Value = .{ .map = &.{ 136 + .{ .key = "roots", .value = .{ .array = root_values.items } }, 137 + .{ .key = "version", .value = .{ .unsigned = 1 } }, 138 + } }; 139 + 140 + // encode header to bytes 141 + const header_bytes = try cbor.encodeAlloc(allocator, header_value); 142 + defer allocator.free(header_bytes); 143 + 144 + // write header length + header 145 + try cbor.writeUvarint(writer, header_bytes.len); 146 + try writer.writeAll(header_bytes); 147 + 148 + // write blocks 149 + for (c.blocks) |block| { 150 + const block_len = block.cid_raw.len + block.data.len; 151 + try cbor.writeUvarint(writer, block_len); 152 + try writer.writeAll(block.cid_raw); 153 + try writer.writeAll(block.data); 154 + } 155 + } 156 + 157 + /// write a CAR v1 file to a freshly allocated byte slice 158 + pub fn writeAlloc(allocator: Allocator, c: Car) ![]u8 { 159 + var list: std.ArrayList(u8) = .{}; 160 + errdefer list.deinit(allocator); 161 + try write(allocator, list.writer(allocator), c); 162 + return try list.toOwnedSlice(allocator); 163 + } 164 + 165 + // === tests === 166 + 167 + test "cidLength CIDv0" { 168 + // sha2-256 multihash: 0x12 0x20 + 32 bytes 169 + var data: [34]u8 = undefined; 170 + data[0] = 0x12; 171 + data[1] = 0x20; 172 + @memset(data[2..], 0xaa); 173 + 174 + try std.testing.expectEqual(@as(?usize, 34), cidLength(&data)); 175 + } 176 + 177 + test "cidLength CIDv1" { 178 + // CIDv1: version=1, codec=0x71 (dag-cbor), sha2-256 (0x12), 32-byte digest 179 + const data = [_]u8{ 180 + 0x01, // version varint 181 + 0x71, // codec varint (dag-cbor) 182 + 0x12, // hash fn varint (sha2-256) 183 + 0x20, // digest len varint (32) 184 + } ++ [_]u8{0xaa} ** 32; 185 + 186 + try std.testing.expectEqual(@as(?usize, 36), cidLength(&data)); 187 + } 188 + 189 + test "read minimal CAR" { 190 + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 191 + defer arena.deinit(); 192 + const alloc = arena.allocator(); 193 + 194 + // construct a minimal CAR v1 file: 195 + // header: DAG-CBOR {"version": 1, "roots": []} 196 + const header_cbor = [_]u8{ 197 + 0xa2, // map(2) 198 + 0x67, 'v', 'e', 'r', 's', 'i', 'o', 'n', 0x01, // "version": 1 199 + 0x65, 'r', 'o', 'o', 't', 's', 0x80, // "roots": [] 200 + }; 201 + 202 + // one block: CIDv1 (dag-cbor, sha2-256) + CBOR data 203 + const cid_prefix = [_]u8{ 204 + 0x01, // version 205 + 0x71, // dag-cbor 206 + 0x12, // sha2-256 207 + 0x20, // 32-byte digest 208 + }; 209 + const digest = [_]u8{0xaa} ** 32; 210 + const block_content = [_]u8{ 211 + 0xa1, // map(1) 212 + 0x64, 't', 'e', 'x', 't', // "text" 213 + 0x62, 'h', 'i', // "hi" 214 + }; 215 + 216 + // assemble the CAR file 217 + var car_buf: [256]u8 = undefined; 218 + var car_pos: usize = 0; 219 + 220 + // header length varint 221 + car_buf[car_pos] = @intCast(header_cbor.len); 222 + car_pos += 1; 223 + 224 + // header 225 + @memcpy(car_buf[car_pos..][0..header_cbor.len], &header_cbor); 226 + car_pos += header_cbor.len; 227 + 228 + // block length varint (CID + content) 229 + const block_total_len = cid_prefix.len + digest.len + block_content.len; 230 + car_buf[car_pos] = @intCast(block_total_len); 231 + car_pos += 1; 232 + 233 + // CID 234 + @memcpy(car_buf[car_pos..][0..cid_prefix.len], &cid_prefix); 235 + car_pos += cid_prefix.len; 236 + @memcpy(car_buf[car_pos..][0..digest.len], &digest); 237 + car_pos += digest.len; 238 + 239 + // block content 240 + @memcpy(car_buf[car_pos..][0..block_content.len], &block_content); 241 + car_pos += block_content.len; 242 + 243 + const car_file = try read(alloc, car_buf[0..car_pos]); 244 + 245 + try std.testing.expectEqual(@as(usize, 1), car_file.blocks.len); 246 + try std.testing.expectEqual(@as(usize, block_content.len), car_file.blocks[0].data.len); 247 + 248 + // decode the block content as CBOR 249 + const val = try cbor.decodeAll(alloc, car_file.blocks[0].data); 250 + try std.testing.expectEqualStrings("hi", val.getString("text").?); 251 + } 252 + 253 + test "read CAR with roots" { 254 + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 255 + defer arena.deinit(); 256 + const alloc = arena.allocator(); 257 + 258 + // create a CID to use as a root 259 + const root_cid = try cbor.Cid.forDagCbor(alloc, "root block data"); 260 + 261 + // build header with a root: {"roots": [CID], "version": 1} 262 + const header_value: cbor.Value = .{ .map = &.{ 263 + .{ .key = "roots", .value = .{ .array = &.{.{ .cid = root_cid }} } }, 264 + .{ .key = "version", .value = .{ .unsigned = 1 } }, 265 + } }; 266 + const header_bytes = try cbor.encodeAlloc(alloc, header_value); 267 + 268 + // assemble minimal CAR: header only, no blocks 269 + var car_buf: std.ArrayList(u8) = .{}; 270 + defer car_buf.deinit(alloc); 271 + try cbor.writeUvarint(car_buf.writer(alloc), header_bytes.len); 272 + try car_buf.appendSlice(alloc, header_bytes); 273 + 274 + const car_file = try read(alloc, car_buf.items); 275 + try std.testing.expectEqual(@as(usize, 1), car_file.roots.len); 276 + try std.testing.expectEqual(root_cid.version, car_file.roots[0].version); 277 + try std.testing.expectEqual(root_cid.codec, car_file.roots[0].codec); 278 + try std.testing.expectEqualSlices(u8, root_cid.digest, car_file.roots[0].digest); 279 + } 280 + 281 + test "write → read round-trip" { 282 + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 283 + defer arena.deinit(); 284 + const alloc = arena.allocator(); 285 + 286 + // create some blocks 287 + const block1_data = "block one data"; 288 + const block2_data = "block two data"; 289 + const cid1 = try cbor.Cid.forDagCbor(alloc, block1_data); 290 + const cid2 = try cbor.Cid.forDagCbor(alloc, block2_data); 291 + 292 + const original = Car{ 293 + .roots = &.{cid1}, 294 + .blocks = &.{ 295 + .{ .cid_raw = cid1.raw, .data = block1_data }, 296 + .{ .cid_raw = cid2.raw, .data = block2_data }, 297 + }, 298 + }; 299 + 300 + // write then read 301 + const car_bytes = try writeAlloc(alloc, original); 302 + const parsed = try read(alloc, car_bytes); 303 + 304 + // verify roots 305 + try std.testing.expectEqual(@as(usize, 1), parsed.roots.len); 306 + try std.testing.expectEqualSlices(u8, cid1.digest, parsed.roots[0].digest); 307 + 308 + // verify blocks 309 + try std.testing.expectEqual(@as(usize, 2), parsed.blocks.len); 310 + try std.testing.expectEqualSlices(u8, block1_data, parsed.blocks[0].data); 311 + try std.testing.expectEqualSlices(u8, block2_data, parsed.blocks[1].data); 312 + 313 + // verify CID matching 314 + try std.testing.expectEqualSlices(u8, cid1.raw, parsed.blocks[0].cid_raw); 315 + try std.testing.expectEqualSlices(u8, cid2.raw, parsed.blocks[1].cid_raw); 316 + } 317 + 318 + test "write → read round-trip with CBOR block content" { 319 + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 320 + defer arena.deinit(); 321 + const alloc = arena.allocator(); 322 + 323 + // encode a record as DAG-CBOR 324 + const record: cbor.Value = .{ .map = &.{ 325 + .{ .key = "$type", .value = .{ .text = "app.bsky.feed.post" } }, 326 + .{ .key = "text", .value = .{ .text = "hello from CAR writer" } }, 327 + } }; 328 + const record_bytes = try cbor.encodeAlloc(alloc, record); 329 + const cid = try cbor.Cid.forDagCbor(alloc, record_bytes); 330 + 331 + const original = Car{ 332 + .roots = &.{cid}, 333 + .blocks = &.{ 334 + .{ .cid_raw = cid.raw, .data = record_bytes }, 335 + }, 336 + }; 337 + 338 + const car_bytes = try writeAlloc(alloc, original); 339 + const parsed = try read(alloc, car_bytes); 340 + 341 + // find the block by CID and decode it 342 + const found = findBlock(parsed, cid.raw).?; 343 + const decoded = try cbor.decodeAll(alloc, found); 344 + try std.testing.expectEqualStrings("hello from CAR writer", decoded.getString("text").?); 345 + try std.testing.expectEqualStrings("app.bsky.feed.post", decoded.getString("$type").?); 346 + } 347 + 348 + test "findBlock returns null for missing CID" { 349 + const c = Car{ .roots = &.{}, .blocks = &.{} }; 350 + try std.testing.expect(findBlock(c, "nonexistent") == null); 351 + }
+1048
src/internal/cbor.zig
··· 1 + //! DAG-CBOR codec 2 + //! 3 + //! encode and decode the DAG-CBOR subset used by AT Protocol. 4 + //! handles: integers, byte/text strings, arrays, maps, tag 42 (CID links), 5 + //! booleans, null. no floats, no indefinite lengths. 6 + //! 7 + //! encoding follows DAG-CBOR deterministic rules: 8 + //! - integers use shortest encoding 9 + //! - map keys sorted by byte length, then lexicographically 10 + //! - CIDs encoded as tag 42 with 0x00 identity multibase prefix 11 + //! 12 + //! see: https://ipld.io/specs/codecs/dag-cbor/spec/ 13 + 14 + const std = @import("std"); 15 + const Allocator = std.mem.Allocator; 16 + 17 + /// CBOR major types (high 3 bits of initial byte) 18 + const MajorType = enum(u3) { 19 + unsigned = 0, 20 + negative = 1, 21 + byte_string = 2, 22 + text_string = 3, 23 + array = 4, 24 + map = 5, 25 + tag = 6, 26 + simple = 7, 27 + }; 28 + 29 + /// decoded CBOR value 30 + pub const Value = union(enum) { 31 + unsigned: u64, 32 + negative: i64, // stored as -(1 + raw), so -1 is stored as -1 33 + bytes: []const u8, 34 + text: []const u8, 35 + array: []const Value, 36 + map: []const MapEntry, 37 + tag: Tag, 38 + boolean: bool, 39 + null, 40 + cid: Cid, 41 + 42 + pub const MapEntry = struct { 43 + key: []const u8, // DAG-CBOR: keys are always text strings 44 + value: Value, 45 + }; 46 + 47 + pub const Tag = struct { 48 + number: u64, 49 + content: *const Value, 50 + }; 51 + 52 + /// look up a key in a map value 53 + pub fn get(self: Value, key: []const u8) ?Value { 54 + return switch (self) { 55 + .map => |entries| { 56 + for (entries) |entry| { 57 + if (std.mem.eql(u8, entry.key, key)) return entry.value; 58 + } 59 + return null; 60 + }, 61 + else => null, 62 + }; 63 + } 64 + 65 + /// get a text string from a map by key 66 + pub fn getString(self: Value, key: []const u8) ?[]const u8 { 67 + const v = self.get(key) orelse return null; 68 + return switch (v) { 69 + .text => |s| s, 70 + else => null, 71 + }; 72 + } 73 + 74 + /// get an integer from a map by key 75 + pub fn getInt(self: Value, key: []const u8) ?i64 { 76 + const v = self.get(key) orelse return null; 77 + return switch (v) { 78 + .unsigned => |u| std.math.cast(i64, u), 79 + .negative => |n| n, 80 + else => null, 81 + }; 82 + } 83 + 84 + /// get a bool from a map by key 85 + pub fn getBool(self: Value, key: []const u8) ?bool { 86 + const v = self.get(key) orelse return null; 87 + return switch (v) { 88 + .boolean => |b| b, 89 + else => null, 90 + }; 91 + } 92 + 93 + /// get a byte string from a map by key 94 + pub fn getBytes(self: Value, key: []const u8) ?[]const u8 { 95 + const v = self.get(key) orelse return null; 96 + return switch (v) { 97 + .bytes => |b| b, 98 + else => null, 99 + }; 100 + } 101 + 102 + /// get an array from a map by key 103 + pub fn getArray(self: Value, key: []const u8) ?[]const Value { 104 + const v = self.get(key) orelse return null; 105 + return switch (v) { 106 + .array => |a| a, 107 + else => null, 108 + }; 109 + } 110 + }; 111 + 112 + /// well-known multicodec values 113 + pub const Codec = struct { 114 + pub const dag_cbor: u64 = 0x71; 115 + pub const dag_pb: u64 = 0x70; 116 + pub const raw: u64 = 0x55; 117 + }; 118 + 119 + /// well-known multihash function codes 120 + pub const HashFn = struct { 121 + pub const sha2_256: u64 = 0x12; 122 + pub const identity: u64 = 0x00; 123 + }; 124 + 125 + /// CID (Content Identifier) parsed from tag 42 126 + pub const Cid = struct { 127 + version: u64, 128 + codec: u64, 129 + hash_fn: u64, 130 + digest: []const u8, 131 + raw: []const u8, // full CID bytes (for matching against CAR block CIDs) 132 + 133 + /// create a CIDv1 by hashing DAG-CBOR encoded data with SHA-256. 134 + /// the returned Cid's raw/digest slices are owned by the allocator. 135 + pub fn forDagCbor(allocator: Allocator, data: []const u8) !Cid { 136 + return create(allocator, 1, Codec.dag_cbor, HashFn.sha2_256, data); 137 + } 138 + 139 + /// create a CIDv1 with the given codec by hashing data with SHA-256. 140 + pub fn create(allocator: Allocator, version: u64, codec: u64, hash_fn_code: u64, data: []const u8) !Cid { 141 + // compute SHA-256 digest 142 + const Sha256 = std.crypto.hash.sha2.Sha256; 143 + var hash: [Sha256.digest_length]u8 = undefined; 144 + Sha256.hash(data, &hash, .{}); 145 + 146 + // build raw CID bytes: version varint + codec varint + hash_fn varint + digest_len varint + digest 147 + var raw_buf: std.ArrayList(u8) = .{}; 148 + errdefer raw_buf.deinit(allocator); 149 + const writer = raw_buf.writer(allocator); 150 + try writeUvarint(writer, version); 151 + try writeUvarint(writer, codec); 152 + try writeUvarint(writer, hash_fn_code); 153 + try writeUvarint(writer, Sha256.digest_length); 154 + try writer.writeAll(&hash); 155 + 156 + const raw = try raw_buf.toOwnedSlice(allocator); 157 + 158 + // locate digest within the raw slice (it's the last 32 bytes) 159 + const digest = raw[raw.len - Sha256.digest_length ..]; 160 + 161 + return .{ 162 + .version = version, 163 + .codec = codec, 164 + .hash_fn = hash_fn_code, 165 + .digest = digest, 166 + .raw = raw, 167 + }; 168 + } 169 + 170 + /// serialize this CID to raw bytes (version varint + codec varint + multihash) 171 + pub fn toBytes(self: Cid, allocator: Allocator) ![]u8 { 172 + // if we already have raw bytes, just duplicate them 173 + if (self.raw.len > 0) { 174 + return try allocator.dupe(u8, self.raw); 175 + } 176 + 177 + var buf: std.ArrayList(u8) = .{}; 178 + errdefer buf.deinit(allocator); 179 + const writer = buf.writer(allocator); 180 + try writeUvarint(writer, self.version); 181 + try writeUvarint(writer, self.codec); 182 + try writeUvarint(writer, self.hash_fn); 183 + try writeUvarint(writer, @as(u64, self.digest.len)); 184 + try writer.writeAll(self.digest); 185 + return try buf.toOwnedSlice(allocator); 186 + } 187 + }; 188 + 189 + pub const DecodeError = error{ 190 + UnexpectedEof, 191 + IndefiniteLength, 192 + UnsupportedSimpleValue, 193 + UnsupportedFloat, 194 + InvalidMapKey, 195 + InvalidCid, 196 + ReservedAdditionalInfo, 197 + Overflow, 198 + OutOfMemory, 199 + }; 200 + 201 + /// decode a single CBOR value from the front of `data`. 202 + /// returns the value and the number of bytes consumed. 203 + pub fn decode(allocator: Allocator, data: []const u8) DecodeError!struct { value: Value, consumed: usize } { 204 + var pos: usize = 0; 205 + const value = try decodeAt(allocator, data, &pos); 206 + return .{ .value = value, .consumed = pos }; 207 + } 208 + 209 + /// decode all bytes as a single CBOR value 210 + pub fn decodeAll(allocator: Allocator, data: []const u8) DecodeError!Value { 211 + var pos: usize = 0; 212 + return try decodeAt(allocator, data, &pos); 213 + } 214 + 215 + fn decodeAt(allocator: Allocator, data: []const u8, pos: *usize) DecodeError!Value { 216 + if (pos.* >= data.len) return error.UnexpectedEof; 217 + 218 + const initial = data[pos.*]; 219 + pos.* += 1; 220 + 221 + const major: MajorType = @enumFromInt(@as(u3, @truncate(initial >> 5))); 222 + const additional: u5 = @truncate(initial); 223 + 224 + return switch (major) { 225 + .unsigned => { 226 + const val = try readArgument(data, pos, additional); 227 + return .{ .unsigned = val }; 228 + }, 229 + .negative => { 230 + const val = try readArgument(data, pos, additional); 231 + // negative CBOR: value is -1 - val 232 + if (val > std.math.maxInt(i64)) return error.Overflow; 233 + return .{ .negative = -1 - @as(i64, @intCast(val)) }; 234 + }, 235 + .byte_string => { 236 + const len = try readArgument(data, pos, additional); 237 + const end = pos.* + @as(usize, @intCast(len)); 238 + if (end > data.len) return error.UnexpectedEof; 239 + const bytes = data[pos.*..end]; 240 + pos.* = end; 241 + return .{ .bytes = bytes }; 242 + }, 243 + .text_string => { 244 + const len = try readArgument(data, pos, additional); 245 + const end = pos.* + @as(usize, @intCast(len)); 246 + if (end > data.len) return error.UnexpectedEof; 247 + const text = data[pos.*..end]; 248 + pos.* = end; 249 + return .{ .text = text }; 250 + }, 251 + .array => { 252 + const count = try readArgument(data, pos, additional); 253 + const items = try allocator.alloc(Value, @intCast(count)); 254 + for (items) |*item| { 255 + item.* = try decodeAt(allocator, data, pos); 256 + } 257 + return .{ .array = items }; 258 + }, 259 + .map => { 260 + const count = try readArgument(data, pos, additional); 261 + const entries = try allocator.alloc(Value.MapEntry, @intCast(count)); 262 + for (entries) |*entry| { 263 + // DAG-CBOR: map keys must be text strings 264 + const key_val = try decodeAt(allocator, data, pos); 265 + const key = switch (key_val) { 266 + .text => |t| t, 267 + else => return error.InvalidMapKey, 268 + }; 269 + entry.* = .{ 270 + .key = key, 271 + .value = try decodeAt(allocator, data, pos), 272 + }; 273 + } 274 + return .{ .map = entries }; 275 + }, 276 + .tag => { 277 + const tag_num = try readArgument(data, pos, additional); 278 + if (tag_num == 42) { 279 + // CID link — content is a byte string with 0x00 prefix 280 + const content = try decodeAt(allocator, data, pos); 281 + const cid_bytes = switch (content) { 282 + .bytes => |b| b, 283 + else => return error.InvalidCid, 284 + }; 285 + if (cid_bytes.len < 1 or cid_bytes[0] != 0x00) return error.InvalidCid; 286 + const raw = cid_bytes[1..]; // skip identity multibase prefix 287 + return .{ .cid = try parseCid(raw) }; 288 + } 289 + // generic tag — allocate content on heap 290 + const content_ptr = try allocator.create(Value); 291 + content_ptr.* = try decodeAt(allocator, data, pos); 292 + return .{ .tag = .{ .number = tag_num, .content = content_ptr } }; 293 + }, 294 + .simple => { 295 + return switch (additional) { 296 + 20 => .{ .boolean = false }, 297 + 21 => .{ .boolean = true }, 298 + 22 => .null, 299 + 25, 26, 27 => error.UnsupportedFloat, // DAG-CBOR forbids floats in AT Protocol 300 + 31 => error.IndefiniteLength, // break code — DAG-CBOR forbids indefinite lengths 301 + else => error.UnsupportedSimpleValue, 302 + }; 303 + }, 304 + }; 305 + } 306 + 307 + /// read the argument value from additional info + following bytes 308 + fn readArgument(data: []const u8, pos: *usize, additional: u5) DecodeError!u64 { 309 + return switch (additional) { 310 + 0...23 => @as(u64, additional), 311 + 24 => { // 1-byte 312 + if (pos.* >= data.len) return error.UnexpectedEof; 313 + const val = data[pos.*]; 314 + pos.* += 1; 315 + return @as(u64, val); 316 + }, 317 + 25 => { // 2-byte big-endian 318 + if (pos.* + 2 > data.len) return error.UnexpectedEof; 319 + const val = std.mem.readInt(u16, data[pos.*..][0..2], .big); 320 + pos.* += 2; 321 + return @as(u64, val); 322 + }, 323 + 26 => { // 4-byte big-endian 324 + if (pos.* + 4 > data.len) return error.UnexpectedEof; 325 + const val = std.mem.readInt(u32, data[pos.*..][0..4], .big); 326 + pos.* += 4; 327 + return @as(u64, val); 328 + }, 329 + 27 => { // 8-byte big-endian 330 + if (pos.* + 8 > data.len) return error.UnexpectedEof; 331 + const val = std.mem.readInt(u64, data[pos.*..][0..8], .big); 332 + pos.* += 8; 333 + return val; 334 + }, 335 + 28, 29, 30 => error.ReservedAdditionalInfo, 336 + 31 => error.IndefiniteLength, 337 + }; 338 + } 339 + 340 + /// parse a CID from raw bytes (after removing the 0x00 multibase prefix) 341 + pub fn parseCid(raw: []const u8) DecodeError!Cid { 342 + if (raw.len < 2) return error.InvalidCid; 343 + 344 + // CIDv0: starts with 0x12 0x20 (sha2-256, 32-byte digest) 345 + if (raw[0] == 0x12 and raw[1] == 0x20) { 346 + if (raw.len < 34) return error.InvalidCid; 347 + return .{ 348 + .version = 0, 349 + .codec = 0x70, // dag-pb (implicit for CIDv0) 350 + .hash_fn = 0x12, // sha2-256 351 + .digest = raw[2..34], 352 + .raw = raw, 353 + }; 354 + } 355 + 356 + // CIDv1: version varint + codec varint + multihash 357 + var pos: usize = 0; 358 + const version = readUvarint(raw, &pos) orelse return error.InvalidCid; 359 + const codec = readUvarint(raw, &pos) orelse return error.InvalidCid; 360 + const hash_fn = readUvarint(raw, &pos) orelse return error.InvalidCid; 361 + const digest_len = readUvarint(raw, &pos) orelse return error.InvalidCid; 362 + 363 + if (pos + digest_len > raw.len) return error.InvalidCid; 364 + 365 + return .{ 366 + .version = version, 367 + .codec = codec, 368 + .hash_fn = hash_fn, 369 + .digest = raw[pos..][0..digest_len], 370 + .raw = raw, 371 + }; 372 + } 373 + 374 + /// read an unsigned varint (LEB128) 375 + pub fn readUvarint(data: []const u8, pos: *usize) ?u64 { 376 + var result: u64 = 0; 377 + var shift: u6 = 0; 378 + while (pos.* < data.len) { 379 + const byte = data[pos.*]; 380 + pos.* += 1; 381 + result |= @as(u64, byte & 0x7f) << shift; 382 + if (byte & 0x80 == 0) return result; 383 + shift +|= 7; 384 + if (shift >= 64) return null; 385 + } 386 + return null; 387 + } 388 + 389 + // === encoder === 390 + 391 + pub const EncodeError = error{ 392 + OutOfMemory, 393 + }; 394 + 395 + /// write the CBOR initial byte + argument using shortest encoding (DAG-CBOR requirement) 396 + fn writeArgument(writer: anytype, major: u3, val: u64) !void { 397 + const prefix: u8 = @as(u8, major) << 5; 398 + if (val < 24) { 399 + try writer.writeByte(prefix | @as(u8, @intCast(val))); 400 + } else if (val <= 0xff) { 401 + try writer.writeByte(prefix | 24); 402 + try writer.writeByte(@as(u8, @intCast(val))); 403 + } else if (val <= 0xffff) { 404 + try writer.writeByte(prefix | 25); 405 + const v: u16 = @intCast(val); 406 + try writer.writeAll(&[2]u8{ @truncate(v >> 8), @truncate(v) }); 407 + } else if (val <= 0xffffffff) { 408 + try writer.writeByte(prefix | 26); 409 + const v: u32 = @intCast(val); 410 + try writer.writeAll(&[4]u8{ 411 + @truncate(v >> 24), @truncate(v >> 16), 412 + @truncate(v >> 8), @truncate(v), 413 + }); 414 + } else { 415 + try writer.writeByte(prefix | 27); 416 + try writer.writeAll(&[8]u8{ 417 + @truncate(val >> 56), @truncate(val >> 48), 418 + @truncate(val >> 40), @truncate(val >> 32), 419 + @truncate(val >> 24), @truncate(val >> 16), 420 + @truncate(val >> 8), @truncate(val), 421 + }); 422 + } 423 + } 424 + 425 + /// DAG-CBOR map key ordering: shorter keys first, then lexicographic 426 + fn dagCborKeyLessThan(_: void, a: Value.MapEntry, b: Value.MapEntry) bool { 427 + if (a.key.len != b.key.len) return a.key.len < b.key.len; 428 + return std.mem.order(u8, a.key, b.key) == .lt; 429 + } 430 + 431 + /// encode a Value to the given writer in DAG-CBOR format. 432 + /// allocator is needed for sorting map keys during encoding. 433 + pub fn encode(allocator: Allocator, writer: anytype, value: Value) !void { 434 + switch (value) { 435 + .unsigned => |v| try writeArgument(writer, 0, v), 436 + .negative => |v| { 437 + // CBOR negative: -1 - n encoded in major type 1 438 + const raw: u64 = @intCast(-1 - v); 439 + try writeArgument(writer, 1, raw); 440 + }, 441 + .bytes => |b| { 442 + try writeArgument(writer, 2, b.len); 443 + try writer.writeAll(b); 444 + }, 445 + .text => |t| { 446 + try writeArgument(writer, 3, t.len); 447 + try writer.writeAll(t); 448 + }, 449 + .array => |items| { 450 + try writeArgument(writer, 4, items.len); 451 + for (items) |item| { 452 + try encode(allocator, writer, item); 453 + } 454 + }, 455 + .map => |entries| { 456 + try writeArgument(writer, 5, entries.len); 457 + // DAG-CBOR: keys sorted by byte length, then lexicographically 458 + const sorted = try allocator.dupe(Value.MapEntry, entries); 459 + defer allocator.free(sorted); 460 + std.mem.sort(Value.MapEntry, sorted, {}, dagCborKeyLessThan); 461 + for (sorted) |entry| { 462 + try encode(allocator, writer, .{ .text = entry.key }); 463 + try encode(allocator, writer, entry.value); 464 + } 465 + }, 466 + .tag => |t| { 467 + try writeArgument(writer, 6, t.number); 468 + try encode(allocator, writer, t.content.*); 469 + }, 470 + .boolean => |b| try writer.writeByte(if (b) @as(u8, 0xf5) else @as(u8, 0xf4)), 471 + .null => try writer.writeByte(0xf6), 472 + .cid => |c| { 473 + // tag 42 + byte string with 0x00 identity multibase prefix + raw CID bytes 474 + try writeArgument(writer, 6, 42); 475 + try writeArgument(writer, 2, 1 + c.raw.len); 476 + try writer.writeByte(0x00); 477 + try writer.writeAll(c.raw); 478 + }, 479 + } 480 + } 481 + 482 + /// encode a Value to a freshly allocated byte slice 483 + pub fn encodeAlloc(allocator: Allocator, value: Value) ![]u8 { 484 + var list: std.ArrayList(u8) = .{}; 485 + errdefer list.deinit(allocator); 486 + try encode(allocator, list.writer(allocator), value); 487 + return try list.toOwnedSlice(allocator); 488 + } 489 + 490 + /// write an unsigned varint (LEB128) — used for CID and CAR serialization 491 + pub fn writeUvarint(writer: anytype, val: u64) !void { 492 + var v = val; 493 + while (v >= 0x80) { 494 + try writer.writeByte(@as(u8, @truncate(v)) | 0x80); 495 + v >>= 7; 496 + } 497 + try writer.writeByte(@as(u8, @truncate(v))); 498 + } 499 + 500 + // === tests === 501 + 502 + test "decode unsigned integers" { 503 + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 504 + defer arena.deinit(); 505 + const alloc = arena.allocator(); 506 + 507 + // 0 508 + try std.testing.expectEqual(@as(u64, 0), (try decode(alloc, &.{0x00})).value.unsigned); 509 + // 1 510 + try std.testing.expectEqual(@as(u64, 1), (try decode(alloc, &.{0x01})).value.unsigned); 511 + // 23 512 + try std.testing.expectEqual(@as(u64, 23), (try decode(alloc, &.{0x17})).value.unsigned); 513 + // 24 (1-byte follows) 514 + try std.testing.expectEqual(@as(u64, 24), (try decode(alloc, &.{ 0x18, 24 })).value.unsigned); 515 + // 1000 (2-byte follows) 516 + try std.testing.expectEqual(@as(u64, 1000), (try decode(alloc, &.{ 0x19, 0x03, 0xe8 })).value.unsigned); 517 + } 518 + 519 + test "decode negative integers" { 520 + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 521 + defer arena.deinit(); 522 + const alloc = arena.allocator(); 523 + 524 + // -1 (major 1, additional 0) 525 + try std.testing.expectEqual(@as(i64, -1), (try decode(alloc, &.{0x20})).value.negative); 526 + // -10 527 + try std.testing.expectEqual(@as(i64, -10), (try decode(alloc, &.{0x29})).value.negative); 528 + } 529 + 530 + test "decode text strings" { 531 + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 532 + defer arena.deinit(); 533 + const alloc = arena.allocator(); 534 + 535 + // empty string 536 + try std.testing.expectEqualStrings("", (try decode(alloc, &.{0x60})).value.text); 537 + // "a" 538 + try std.testing.expectEqualStrings("a", (try decode(alloc, &.{ 0x61, 'a' })).value.text); 539 + // "hello" 540 + try std.testing.expectEqualStrings("hello", (try decode(alloc, &.{ 0x65, 'h', 'e', 'l', 'l', 'o' })).value.text); 541 + } 542 + 543 + test "decode byte strings" { 544 + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 545 + defer arena.deinit(); 546 + const alloc = arena.allocator(); 547 + 548 + // empty bytes 549 + try std.testing.expectEqualSlices(u8, &.{}, (try decode(alloc, &.{0x40})).value.bytes); 550 + // 3 bytes 551 + try std.testing.expectEqualSlices(u8, &.{ 1, 2, 3 }, (try decode(alloc, &.{ 0x43, 1, 2, 3 })).value.bytes); 552 + } 553 + 554 + test "decode booleans and null" { 555 + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 556 + defer arena.deinit(); 557 + const alloc = arena.allocator(); 558 + 559 + try std.testing.expectEqual(false, (try decode(alloc, &.{0xf4})).value.boolean); 560 + try std.testing.expectEqual(true, (try decode(alloc, &.{0xf5})).value.boolean); 561 + try std.testing.expectEqual(Value.null, (try decode(alloc, &.{0xf6})).value); 562 + } 563 + 564 + test "decode array" { 565 + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 566 + defer arena.deinit(); 567 + const alloc = arena.allocator(); 568 + 569 + // [1, 2, 3] 570 + const result = try decode(alloc, &.{ 0x83, 0x01, 0x02, 0x03 }); 571 + const arr = result.value.array; 572 + try std.testing.expectEqual(@as(usize, 3), arr.len); 573 + try std.testing.expectEqual(@as(u64, 1), arr[0].unsigned); 574 + try std.testing.expectEqual(@as(u64, 2), arr[1].unsigned); 575 + try std.testing.expectEqual(@as(u64, 3), arr[2].unsigned); 576 + } 577 + 578 + test "decode map" { 579 + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 580 + defer arena.deinit(); 581 + const alloc = arena.allocator(); 582 + 583 + // {"a": 1, "b": 2} 584 + const result = try decode(alloc, &.{ 585 + 0xa2, // map(2) 586 + 0x61, 'a', 0x01, // "a": 1 587 + 0x61, 'b', 0x02, // "b": 2 588 + }); 589 + const val = result.value; 590 + try std.testing.expectEqual(@as(u64, 1), val.get("a").?.unsigned); 591 + try std.testing.expectEqual(@as(u64, 2), val.get("b").?.unsigned); 592 + } 593 + 594 + test "decode nested map" { 595 + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 596 + defer arena.deinit(); 597 + const alloc = arena.allocator(); 598 + 599 + // {"op": 1, "t": "#commit"} 600 + const result = try decode(alloc, &.{ 601 + 0xa2, // map(2) 602 + 0x62, 'o', 'p', 0x01, // "op": 1 603 + 0x61, 't', 0x67, '#', 'c', 'o', 'm', 'm', 'i', 't', // "t": "#commit" 604 + }); 605 + const val = result.value; 606 + try std.testing.expectEqual(@as(u64, 1), val.get("op").?.unsigned); 607 + try std.testing.expectEqualStrings("#commit", val.getString("t").?); 608 + } 609 + 610 + test "consumed bytes tracking" { 611 + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 612 + defer arena.deinit(); 613 + const alloc = arena.allocator(); 614 + 615 + // two concatenated CBOR values: 1, 2 616 + const data = &[_]u8{ 0x01, 0x02 }; 617 + const first = try decode(alloc, data); 618 + try std.testing.expectEqual(@as(u64, 1), first.value.unsigned); 619 + try std.testing.expectEqual(@as(usize, 1), first.consumed); 620 + 621 + const second = try decode(alloc, data[first.consumed..]); 622 + try std.testing.expectEqual(@as(u64, 2), second.value.unsigned); 623 + } 624 + 625 + test "reject floats" { 626 + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 627 + defer arena.deinit(); 628 + const alloc = arena.allocator(); 629 + 630 + // half-float (f16) 631 + try std.testing.expectError(error.UnsupportedFloat, decode(alloc, &.{ 0xf9, 0x00, 0x00 })); 632 + } 633 + 634 + test "Value helper methods" { 635 + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 636 + defer arena.deinit(); 637 + const alloc = arena.allocator(); 638 + 639 + const result = try decode(alloc, &.{ 640 + 0xa3, // map(3) 641 + 0x64, 'n', 'a', 'm', 'e', 0x65, 'a', 'l', 'i', 'c', 'e', // "name": "alice" 642 + 0x63, 'a', 'g', 'e', 0x18, 30, // "age": 30 643 + 0x66, 'a', 'c', 't', 'i', 'v', 'e', 0xf5, // "active": true 644 + }); 645 + const val = result.value; 646 + try std.testing.expectEqualStrings("alice", val.getString("name").?); 647 + try std.testing.expectEqual(@as(i64, 30), val.getInt("age").?); 648 + try std.testing.expectEqual(true, val.getBool("active").?); 649 + try std.testing.expect(val.getString("missing") == null); 650 + } 651 + 652 + // === encoder tests === 653 + 654 + test "encode unsigned integers" { 655 + var buf: [16]u8 = undefined; 656 + var stream = std.io.fixedBufferStream(&buf); 657 + const alloc = std.testing.allocator; 658 + 659 + // 0 → single byte 660 + try encode(alloc, stream.writer(), .{ .unsigned = 0 }); 661 + try std.testing.expectEqualSlices(u8, &.{0x00}, stream.getWritten()); 662 + 663 + stream.reset(); 664 + try encode(alloc, stream.writer(), .{ .unsigned = 23 }); 665 + try std.testing.expectEqualSlices(u8, &.{0x17}, stream.getWritten()); 666 + 667 + // 24 → 2 bytes (shortest encoding) 668 + stream.reset(); 669 + try encode(alloc, stream.writer(), .{ .unsigned = 24 }); 670 + try std.testing.expectEqualSlices(u8, &.{ 0x18, 24 }, stream.getWritten()); 671 + 672 + // 1000 → 3 bytes 673 + stream.reset(); 674 + try encode(alloc, stream.writer(), .{ .unsigned = 1000 }); 675 + try std.testing.expectEqualSlices(u8, &.{ 0x19, 0x03, 0xe8 }, stream.getWritten()); 676 + } 677 + 678 + test "encode negative integers" { 679 + var buf: [16]u8 = undefined; 680 + var stream = std.io.fixedBufferStream(&buf); 681 + const alloc = std.testing.allocator; 682 + 683 + // -1 → major 1, additional 0 684 + try encode(alloc, stream.writer(), .{ .negative = -1 }); 685 + try std.testing.expectEqualSlices(u8, &.{0x20}, stream.getWritten()); 686 + 687 + stream.reset(); 688 + try encode(alloc, stream.writer(), .{ .negative = -10 }); 689 + try std.testing.expectEqualSlices(u8, &.{0x29}, stream.getWritten()); 690 + } 691 + 692 + test "encode text strings" { 693 + var buf: [64]u8 = undefined; 694 + var stream = std.io.fixedBufferStream(&buf); 695 + const alloc = std.testing.allocator; 696 + 697 + try encode(alloc, stream.writer(), .{ .text = "" }); 698 + try std.testing.expectEqualSlices(u8, &.{0x60}, stream.getWritten()); 699 + 700 + stream.reset(); 701 + try encode(alloc, stream.writer(), .{ .text = "hello" }); 702 + try std.testing.expectEqualSlices(u8, &.{ 0x65, 'h', 'e', 'l', 'l', 'o' }, stream.getWritten()); 703 + } 704 + 705 + test "encode byte strings" { 706 + var buf: [64]u8 = undefined; 707 + var stream = std.io.fixedBufferStream(&buf); 708 + const alloc = std.testing.allocator; 709 + 710 + try encode(alloc, stream.writer(), .{ .bytes = &.{} }); 711 + try std.testing.expectEqualSlices(u8, &.{0x40}, stream.getWritten()); 712 + 713 + stream.reset(); 714 + try encode(alloc, stream.writer(), .{ .bytes = &.{ 1, 2, 3 } }); 715 + try std.testing.expectEqualSlices(u8, &.{ 0x43, 1, 2, 3 }, stream.getWritten()); 716 + } 717 + 718 + test "encode booleans and null" { 719 + var buf: [4]u8 = undefined; 720 + var stream = std.io.fixedBufferStream(&buf); 721 + const alloc = std.testing.allocator; 722 + 723 + try encode(alloc, stream.writer(), .{ .boolean = false }); 724 + try std.testing.expectEqualSlices(u8, &.{0xf4}, stream.getWritten()); 725 + 726 + stream.reset(); 727 + try encode(alloc, stream.writer(), .{ .boolean = true }); 728 + try std.testing.expectEqualSlices(u8, &.{0xf5}, stream.getWritten()); 729 + 730 + stream.reset(); 731 + try encode(alloc, stream.writer(), .null); 732 + try std.testing.expectEqualSlices(u8, &.{0xf6}, stream.getWritten()); 733 + } 734 + 735 + test "encode array" { 736 + var buf: [64]u8 = undefined; 737 + var stream = std.io.fixedBufferStream(&buf); 738 + const alloc = std.testing.allocator; 739 + 740 + // [1, 2, 3] 741 + try encode(alloc, stream.writer(), .{ .array = &.{ 742 + .{ .unsigned = 1 }, 743 + .{ .unsigned = 2 }, 744 + .{ .unsigned = 3 }, 745 + } }); 746 + try std.testing.expectEqualSlices(u8, &.{ 0x83, 0x01, 0x02, 0x03 }, stream.getWritten()); 747 + } 748 + 749 + test "encode map with DAG-CBOR key sorting" { 750 + var buf: [128]u8 = undefined; 751 + var stream = std.io.fixedBufferStream(&buf); 752 + const alloc = std.testing.allocator; 753 + 754 + // keys provided unsorted — encoder must sort by length, then lex 755 + // "bb" (len 2), "a" (len 1), "cc" (len 2) → sorted: "a", "bb", "cc" 756 + try encode(alloc, stream.writer(), .{ .map = &.{ 757 + .{ .key = "bb", .value = .{ .unsigned = 2 } }, 758 + .{ .key = "a", .value = .{ .unsigned = 1 } }, 759 + .{ .key = "cc", .value = .{ .unsigned = 3 } }, 760 + } }); 761 + 762 + const expected = &[_]u8{ 763 + 0xa3, // map(3) 764 + 0x61, 'a', 0x01, // "a": 1 (shortest key first) 765 + 0x62, 'b', 'b', 0x02, // "bb": 2 (same length, lex order) 766 + 0x62, 'c', 'c', 0x03, // "cc": 3 767 + }; 768 + try std.testing.expectEqualSlices(u8, expected, stream.getWritten()); 769 + } 770 + 771 + test "round-trip encode → decode" { 772 + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 773 + defer arena.deinit(); 774 + const alloc = arena.allocator(); 775 + 776 + // build a complex value: {"active": true, "name": "alice", "seq": 42} 777 + const original: Value = .{ .map = &.{ 778 + .{ .key = "name", .value = .{ .text = "alice" } }, 779 + .{ .key = "active", .value = .{ .boolean = true } }, 780 + .{ .key = "seq", .value = .{ .unsigned = 42 } }, 781 + } }; 782 + 783 + const encoded = try encodeAlloc(alloc, original); 784 + const decoded = try decodeAll(alloc, encoded); 785 + 786 + try std.testing.expectEqualStrings("alice", decoded.getString("name").?); 787 + try std.testing.expectEqual(true, decoded.getBool("active").?); 788 + try std.testing.expectEqual(@as(i64, 42), decoded.getInt("seq").?); 789 + } 790 + 791 + test "round-trip nested structures" { 792 + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 793 + defer arena.deinit(); 794 + const alloc = arena.allocator(); 795 + 796 + // {"ops": [{"action": "create"}], "seq": 1} 797 + const original: Value = .{ .map = &.{ 798 + .{ .key = "ops", .value = .{ .array = &.{ 799 + .{ .map = &.{ 800 + .{ .key = "action", .value = .{ .text = "create" } }, 801 + } }, 802 + } } }, 803 + .{ .key = "seq", .value = .{ .unsigned = 1 } }, 804 + } }; 805 + 806 + const encoded = try encodeAlloc(alloc, original); 807 + const decoded = try decodeAll(alloc, encoded); 808 + 809 + const ops = decoded.getArray("ops").?; 810 + try std.testing.expectEqual(@as(usize, 1), ops.len); 811 + try std.testing.expectEqualStrings("create", ops[0].getString("action").?); 812 + try std.testing.expectEqual(@as(i64, 1), decoded.getInt("seq").?); 813 + } 814 + 815 + test "encode CID via tag 42" { 816 + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 817 + defer arena.deinit(); 818 + const alloc = arena.allocator(); 819 + 820 + // create a CIDv1 (dag-cbor, sha2-256, 32-byte digest of 0xaa) 821 + const raw_cid = [_]u8{ 822 + 0x01, // version 823 + 0x71, // dag-cbor 824 + 0x12, // sha2-256 825 + 0x20, // 32-byte digest 826 + } ++ [_]u8{0xaa} ** 32; 827 + 828 + const original: Value = .{ .cid = .{ 829 + .version = 1, 830 + .codec = 0x71, 831 + .hash_fn = 0x12, 832 + .digest = raw_cid[4..], 833 + .raw = &raw_cid, 834 + } }; 835 + 836 + const encoded = try encodeAlloc(alloc, original); 837 + const decoded = try decodeAll(alloc, encoded); 838 + 839 + // should decode back as a CID with the same raw bytes 840 + const cid = decoded.cid; 841 + try std.testing.expectEqual(@as(u64, 1), cid.version); 842 + try std.testing.expectEqual(@as(u64, 0x71), cid.codec); 843 + try std.testing.expectEqual(@as(u64, 0x12), cid.hash_fn); 844 + try std.testing.expectEqualSlices(u8, &raw_cid, cid.raw); 845 + } 846 + 847 + test "writeUvarint round-trip" { 848 + var buf: [16]u8 = undefined; 849 + var stream = std.io.fixedBufferStream(&buf); 850 + 851 + const test_values = [_]u64{ 0, 1, 127, 128, 255, 256, 16384, 0xffffffff }; 852 + for (test_values) |val| { 853 + stream.reset(); 854 + try writeUvarint(stream.writer(), val); 855 + const written = stream.getWritten(); 856 + 857 + var pos: usize = 0; 858 + const decoded = readUvarint(written, &pos).?; 859 + try std.testing.expectEqual(val, decoded); 860 + try std.testing.expectEqual(written.len, pos); 861 + } 862 + } 863 + 864 + test "DAG-CBOR key sort is stable" { 865 + // same-length keys must be lexicographically sorted 866 + var buf: [128]u8 = undefined; 867 + var stream = std.io.fixedBufferStream(&buf); 868 + const alloc = std.testing.allocator; 869 + 870 + try encode(alloc, stream.writer(), .{ .map = &.{ 871 + .{ .key = "op", .value = .{ .unsigned = 1 } }, 872 + .{ .key = "ab", .value = .{ .unsigned = 2 } }, 873 + } }); 874 + 875 + var arena = std.heap.ArenaAllocator.init(alloc); 876 + defer arena.deinit(); 877 + const decoded = try decodeAll(arena.allocator(), stream.getWritten()); 878 + 879 + // "ab" should come before "op" (lex order, same length) 880 + const entries = decoded.map; 881 + try std.testing.expectEqualStrings("ab", entries[0].key); 882 + try std.testing.expectEqualStrings("op", entries[1].key); 883 + } 884 + 885 + // === CID creation tests === 886 + 887 + test "Cid.forDagCbor creates valid CIDv1" { 888 + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 889 + defer arena.deinit(); 890 + const alloc = arena.allocator(); 891 + 892 + // encode some CBOR, then create a CID for it 893 + const value: Value = .{ .map = &.{ 894 + .{ .key = "text", .value = .{ .text = "hello" } }, 895 + } }; 896 + const encoded = try encodeAlloc(alloc, value); 897 + const cid = try Cid.forDagCbor(alloc, encoded); 898 + 899 + try std.testing.expectEqual(@as(u64, 1), cid.version); 900 + try std.testing.expectEqual(Codec.dag_cbor, cid.codec); 901 + try std.testing.expectEqual(HashFn.sha2_256, cid.hash_fn); 902 + try std.testing.expectEqual(@as(usize, 32), cid.digest.len); 903 + // raw should be: version(1) + codec(0x71) + hash_fn(0x12) + digest_len(0x20) + 32 bytes 904 + try std.testing.expectEqual(@as(usize, 36), cid.raw.len); 905 + } 906 + 907 + test "Cid.forDagCbor is deterministic" { 908 + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 909 + defer arena.deinit(); 910 + const alloc = arena.allocator(); 911 + 912 + const data = "identical input"; 913 + const cid1 = try Cid.forDagCbor(alloc, data); 914 + const cid2 = try Cid.forDagCbor(alloc, data); 915 + 916 + try std.testing.expectEqualSlices(u8, cid1.raw, cid2.raw); 917 + try std.testing.expectEqualSlices(u8, cid1.digest, cid2.digest); 918 + } 919 + 920 + test "Cid.forDagCbor different data → different CIDs" { 921 + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 922 + defer arena.deinit(); 923 + const alloc = arena.allocator(); 924 + 925 + const cid1 = try Cid.forDagCbor(alloc, "data A"); 926 + const cid2 = try Cid.forDagCbor(alloc, "data B"); 927 + 928 + try std.testing.expect(!std.mem.eql(u8, cid1.digest, cid2.digest)); 929 + } 930 + 931 + test "Cid.toBytes round-trips through parseCid" { 932 + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 933 + defer arena.deinit(); 934 + const alloc = arena.allocator(); 935 + 936 + const cid = try Cid.forDagCbor(alloc, "test content"); 937 + const bytes = try cid.toBytes(alloc); 938 + const parsed = try parseCid(bytes); 939 + 940 + try std.testing.expectEqual(cid.version, parsed.version); 941 + try std.testing.expectEqual(cid.codec, parsed.codec); 942 + try std.testing.expectEqual(cid.hash_fn, parsed.hash_fn); 943 + try std.testing.expectEqualSlices(u8, cid.digest, parsed.digest); 944 + } 945 + 946 + test "CID round-trip through CBOR encode/decode" { 947 + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 948 + defer arena.deinit(); 949 + const alloc = arena.allocator(); 950 + 951 + // create a CID for some content 952 + const cid = try Cid.forDagCbor(alloc, "block data"); 953 + 954 + // embed in a map and round-trip through CBOR 955 + const original: Value = .{ .map = &.{ 956 + .{ .key = "link", .value = .{ .cid = cid } }, 957 + } }; 958 + const encoded = try encodeAlloc(alloc, original); 959 + const decoded = try decodeAll(alloc, encoded); 960 + 961 + const got = decoded.get("link").?.cid; 962 + try std.testing.expectEqual(cid.version, got.version); 963 + try std.testing.expectEqual(cid.codec, got.codec); 964 + try std.testing.expectEqualSlices(u8, cid.digest, got.digest); 965 + } 966 + 967 + // === verify CIDs against real AT Protocol records === 968 + 969 + test "real record: pfrazee 'First!' post CID matches network" { 970 + // at://did:plc:ragtjsm2j2vknwkz3zp4oxrd/app.bsky.feed.post/3jhnzcfawac27 971 + // CID: bafyreiaqnrahsbvcssf2xe4iqhn2fnjw7utmvrbif2v36tqe3r5iqill7i 972 + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 973 + defer arena.deinit(); 974 + const alloc = arena.allocator(); 975 + 976 + const record: Value = .{ .map = &.{ 977 + .{ .key = "$type", .value = .{ .text = "app.bsky.feed.post" } }, 978 + .{ .key = "createdAt", .value = .{ .text = "2022-11-17T00:39:00.477Z" } }, 979 + .{ .key = "text", .value = .{ .text = "First!" } }, 980 + } }; 981 + 982 + const encoded = try encodeAlloc(alloc, record); 983 + const cid = try Cid.forDagCbor(alloc, encoded); 984 + 985 + // verify against known production digest 986 + const expected_digest = [_]u8{ 987 + 0x10, 0x6c, 0x40, 0x79, 0x06, 0xa2, 0x94, 0x8b, 988 + 0xab, 0x93, 0x88, 0x81, 0xdb, 0xa2, 0xb5, 0x36, 989 + 0xfd, 0x26, 0xca, 0xc4, 0x28, 0x2e, 0xab, 0xbf, 990 + 0x4e, 0x04, 0xdc, 0x7a, 0x88, 0x21, 0x6b, 0xfa, 991 + }; 992 + 993 + try std.testing.expectEqualSlices(u8, &expected_digest, cid.digest); 994 + try std.testing.expectEqual(@as(u64, 1), cid.version); 995 + try std.testing.expectEqual(Codec.dag_cbor, cid.codec); 996 + try std.testing.expectEqual(HashFn.sha2_256, cid.hash_fn); 997 + } 998 + 999 + test "real record: firehose post with emoji/langs/reply is byte-identical after re-encode" { 1000 + // captured from live firehose: app.bsky.feed.post with emoji, langs, and reply 1001 + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 1002 + defer arena.deinit(); 1003 + const alloc = arena.allocator(); 1004 + 1005 + const original_cbor = &[_]u8{ 1006 + 0xa5, 0x64, 0x74, 0x65, 0x78, 0x74, 0x6b, 0xf0, 0x9f, 0xa5, 0xb5, 0x20, 0x6d, 0x65, 0x20, 0x74, 1007 + 0x6f, 0x6f, 0x65, 0x24, 0x74, 0x79, 0x70, 0x65, 0x72, 0x61, 0x70, 0x70, 0x2e, 0x62, 0x73, 0x6b, 1008 + 0x79, 0x2e, 0x66, 0x65, 0x65, 0x64, 0x2e, 0x70, 0x6f, 0x73, 0x74, 0x65, 0x6c, 0x61, 0x6e, 0x67, 1009 + 0x73, 0x81, 0x62, 0x65, 0x6e, 0x65, 0x72, 0x65, 0x70, 0x6c, 0x79, 0xa2, 0x64, 0x72, 0x6f, 0x6f, 1010 + 0x74, 0xa2, 0x63, 0x63, 0x69, 0x64, 0x78, 0x3b, 0x62, 0x61, 0x66, 0x79, 0x72, 0x65, 0x69, 0x62, 1011 + 0x33, 0x70, 0x77, 0x72, 0x66, 0x66, 0x32, 0x79, 0x61, 0x64, 0x7a, 0x6e, 0x6f, 0x70, 0x68, 0x7a, 1012 + 0x66, 0x34, 0x68, 0x63, 0x76, 0x74, 0x79, 0x6f, 0x63, 0x74, 0x77, 0x7a, 0x63, 0x75, 0x6a, 0x76, 1013 + 0x7a, 0x37, 0x78, 0x34, 0x70, 0x6e, 0x67, 0x6b, 0x32, 0x69, 0x73, 0x69, 0x63, 0x7a, 0x37, 0x79, 1014 + 0x73, 0x7a, 0x71, 0x63, 0x75, 0x72, 0x69, 0x78, 0x46, 0x61, 0x74, 0x3a, 0x2f, 0x2f, 0x64, 0x69, 1015 + 0x64, 0x3a, 0x70, 0x6c, 0x63, 0x3a, 0x34, 0x6e, 0x65, 0x6e, 0x64, 0x77, 0x71, 0x72, 0x73, 0x37, 1016 + 0x35, 0x34, 0x67, 0x74, 0x36, 0x71, 0x76, 0x67, 0x72, 0x35, 0x36, 0x6a, 0x6d, 0x6e, 0x2f, 0x61, 1017 + 0x70, 0x70, 0x2e, 0x62, 0x73, 0x6b, 0x79, 0x2e, 0x66, 0x65, 0x65, 0x64, 0x2e, 0x70, 0x6f, 0x73, 1018 + 0x74, 0x2f, 0x33, 0x6d, 0x65, 0x64, 0x67, 0x32, 0x71, 0x76, 0x63, 0x75, 0x63, 0x32, 0x63, 0x66, 1019 + 0x70, 0x61, 0x72, 0x65, 0x6e, 0x74, 0xa2, 0x63, 0x63, 0x69, 0x64, 0x78, 0x3b, 0x62, 0x61, 0x66, 1020 + 0x79, 0x72, 0x65, 0x69, 0x62, 0x33, 0x70, 0x77, 0x72, 0x66, 0x66, 0x32, 0x79, 0x61, 0x64, 0x7a, 1021 + 0x6e, 0x6f, 0x70, 0x68, 0x7a, 0x66, 0x34, 0x68, 0x63, 0x76, 0x74, 0x79, 0x6f, 0x63, 0x74, 0x77, 1022 + 0x7a, 0x63, 0x75, 0x6a, 0x76, 0x7a, 0x37, 0x78, 0x34, 0x70, 0x6e, 0x67, 0x6b, 0x32, 0x69, 0x73, 1023 + 0x69, 0x63, 0x7a, 0x37, 0x79, 0x73, 0x7a, 0x71, 0x63, 0x75, 0x72, 0x69, 0x78, 0x46, 0x61, 0x74, 1024 + 0x3a, 0x2f, 0x2f, 0x64, 0x69, 0x64, 0x3a, 0x70, 0x6c, 0x63, 0x3a, 0x34, 0x6e, 0x65, 0x6e, 0x64, 1025 + 0x77, 0x71, 0x72, 0x73, 0x37, 0x35, 0x34, 0x67, 0x74, 0x36, 0x71, 0x76, 0x67, 0x72, 0x35, 0x36, 1026 + 0x6a, 0x6d, 0x6e, 0x2f, 0x61, 0x70, 0x70, 0x2e, 0x62, 0x73, 0x6b, 0x79, 0x2e, 0x66, 0x65, 0x65, 1027 + 0x64, 0x2e, 0x70, 0x6f, 0x73, 0x74, 0x2f, 0x33, 0x6d, 0x65, 0x64, 0x67, 0x32, 0x71, 0x76, 0x63, 1028 + 0x75, 0x63, 0x32, 0x63, 0x69, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74, 0x78, 0x18, 1029 + 0x32, 0x30, 0x32, 0x36, 0x2d, 0x30, 0x32, 0x2d, 0x30, 0x38, 0x54, 0x30, 0x37, 0x3a, 0x34, 0x39, 1030 + 0x3a, 0x32, 0x30, 0x2e, 0x37, 0x37, 0x32, 0x5a, 1031 + }; 1032 + 1033 + // expected CID digest from the firehose frame 1034 + const expected_digest = [_]u8{ 1035 + 0x80, 0x01, 0x66, 0x46, 0x81, 0x57, 0x18, 0xaf, 0xc9, 0x34, 0xcf, 0xbf, 1036 + 0x3b, 0x3e, 0x57, 0x04, 0x24, 0x17, 0x90, 0x29, 0x2f, 0x7b, 0xc4, 0xe0, 1037 + 0xf4, 0xcf, 0xe6, 0xe6, 0xb5, 0xad, 0x11, 0x28, 1038 + }; 1039 + 1040 + // decode → re-encode → verify byte-identical 1041 + const decoded = try decodeAll(alloc, original_cbor); 1042 + const re_encoded = try encodeAlloc(alloc, decoded); 1043 + try std.testing.expectEqualSlices(u8, original_cbor, re_encoded); 1044 + 1045 + // verify CID matches the production CID 1046 + const cid = try Cid.forDagCbor(alloc, re_encoded); 1047 + try std.testing.expectEqualSlices(u8, &expected_digest, cid.digest); 1048 + }
+669
src/internal/firehose.zig
··· 1 + //! firehose codec - com.atproto.sync.subscribeRepos 2 + //! 3 + //! encode and decode AT Protocol firehose events over WebSocket. messages are 4 + //! DAG-CBOR encoded (unlike jetstream, which is JSON). includes frame encoding/ 5 + //! decoding, CAR block packing, and CID creation for records. 6 + //! 7 + //! wire format per frame: 8 + //! [DAG-CBOR header: {op, t}] [DAG-CBOR payload: {seq, repo, ops, blocks, ...}] 9 + //! 10 + //! see: https://atproto.com/specs/event-stream 11 + 12 + const std = @import("std"); 13 + const websocket = @import("websocket"); 14 + const cbor = @import("cbor.zig"); 15 + const car = @import("car.zig"); 16 + const sync = @import("sync.zig"); 17 + 18 + const mem = std.mem; 19 + const Allocator = mem.Allocator; 20 + const posix = std.posix; 21 + const log = std.log.scoped(.zat); 22 + 23 + pub const CommitAction = sync.CommitAction; 24 + pub const AccountStatus = sync.AccountStatus; 25 + 26 + pub const Options = struct { 27 + host: []const u8 = "bsky.network", 28 + cursor: ?i64 = null, 29 + max_message_size: usize = 5 * 1024 * 1024, // 5MB — firehose frames can be large 30 + }; 31 + 32 + /// decoded firehose event 33 + pub const Event = union(enum) { 34 + commit: CommitEvent, 35 + identity: IdentityEvent, 36 + account: AccountEvent, 37 + info: InfoEvent, 38 + 39 + pub fn seq(self: Event) ?i64 { 40 + return switch (self) { 41 + .commit => |c| c.seq, 42 + .identity => |i| i.seq, 43 + .account => |a| a.seq, 44 + .info => null, 45 + }; 46 + } 47 + }; 48 + 49 + pub const CommitEvent = struct { 50 + seq: i64, 51 + repo: []const u8, // DID 52 + rev: ?[]const u8 = null, 53 + time: ?[]const u8 = null, 54 + ops: []const RepoOp, 55 + too_big: bool = false, 56 + }; 57 + 58 + pub const RepoOp = struct { 59 + action: CommitAction, 60 + collection: []const u8, 61 + rkey: []const u8, 62 + record: ?cbor.Value = null, // decoded DAG-CBOR record from CAR block 63 + }; 64 + 65 + pub const IdentityEvent = struct { 66 + seq: i64, 67 + did: []const u8, 68 + time: ?[]const u8 = null, 69 + handle: ?[]const u8 = null, 70 + }; 71 + 72 + pub const AccountEvent = struct { 73 + seq: i64, 74 + did: []const u8, 75 + time: ?[]const u8 = null, 76 + active: bool = true, 77 + status: ?AccountStatus = null, 78 + }; 79 + 80 + pub const InfoEvent = struct { 81 + name: ?[]const u8 = null, 82 + message: ?[]const u8 = null, 83 + }; 84 + 85 + /// frame header from the wire 86 + const FrameHeader = struct { 87 + op: i64, 88 + t: ?[]const u8 = null, 89 + }; 90 + 91 + pub const FrameOp = enum(i64) { 92 + message = 1, 93 + err = -1, 94 + }; 95 + 96 + pub const DecodeError = error{ 97 + InvalidFrame, 98 + InvalidHeader, 99 + UnexpectedEof, 100 + MissingField, 101 + UnknownOp, 102 + UnknownEventType, 103 + } || cbor.DecodeError || car.CarError; 104 + 105 + /// decode a raw WebSocket binary frame into a firehose Event 106 + pub fn decodeFrame(allocator: Allocator, data: []const u8) DecodeError!Event { 107 + // frame = [CBOR header] [CBOR payload] concatenated 108 + const header_result = try cbor.decode(allocator, data); 109 + const header_val = header_result.value; 110 + const payload_data = data[header_result.consumed..]; 111 + 112 + // parse header 113 + const op = header_val.getInt("op") orelse return error.InvalidHeader; 114 + if (op == -1) return error.UnknownOp; // error frame 115 + 116 + const t = header_val.getString("t") orelse return error.InvalidHeader; 117 + 118 + // decode payload 119 + const payload = try cbor.decodeAll(allocator, payload_data); 120 + 121 + if (mem.eql(u8, t, "#commit")) { 122 + return try decodeCommit(allocator, payload); 123 + } else if (mem.eql(u8, t, "#identity")) { 124 + return decodeIdentity(payload); 125 + } else if (mem.eql(u8, t, "#account")) { 126 + return decodeAccount(payload); 127 + } else if (mem.eql(u8, t, "#info")) { 128 + return .{ .info = .{ 129 + .name = payload.getString("name"), 130 + .message = payload.getString("message"), 131 + } }; 132 + } 133 + 134 + return error.UnknownEventType; 135 + } 136 + 137 + fn decodeCommit(allocator: Allocator, payload: cbor.Value) DecodeError!Event { 138 + const seq_val = payload.getInt("seq") orelse return error.MissingField; 139 + const repo = payload.getString("repo") orelse return error.MissingField; 140 + 141 + // parse CAR blocks 142 + const blocks_bytes = payload.getBytes("blocks"); 143 + var parsed_car: ?car.Car = null; 144 + if (blocks_bytes) |b| { 145 + parsed_car = car.read(allocator, b) catch null; 146 + } 147 + 148 + // parse ops 149 + const ops_array = payload.getArray("ops"); 150 + var ops: std.ArrayList(RepoOp) = .{}; 151 + 152 + if (ops_array) |op_values| { 153 + for (op_values) |op_val| { 154 + const action_str = op_val.getString("action") orelse continue; 155 + const action = CommitAction.parse(action_str) orelse continue; 156 + const path = op_val.getString("path") orelse continue; 157 + 158 + // split path into collection/rkey 159 + const slash = mem.indexOfScalar(u8, path, '/') orelse continue; 160 + const collection = path[0..slash]; 161 + const rkey = path[slash + 1 ..]; 162 + 163 + // look up record from CAR blocks via CID 164 + var record: ?cbor.Value = null; 165 + if (parsed_car) |c| { 166 + if (op_val.get("cid")) |cid_val| { 167 + switch (cid_val) { 168 + .cid => |cid| { 169 + if (car.findBlock(c, cid.raw)) |block_data| { 170 + record = cbor.decodeAll(allocator, block_data) catch null; 171 + } 172 + }, 173 + else => {}, 174 + } 175 + } 176 + } 177 + 178 + try ops.append(allocator, .{ 179 + .action = action, 180 + .collection = collection, 181 + .rkey = rkey, 182 + .record = record, 183 + }); 184 + } 185 + } 186 + 187 + return .{ .commit = .{ 188 + .seq = seq_val, 189 + .repo = repo, 190 + .rev = payload.getString("rev"), 191 + .time = payload.getString("time"), 192 + .ops = try ops.toOwnedSlice(allocator), 193 + .too_big = payload.getBool("tooBig") orelse false, 194 + } }; 195 + } 196 + 197 + fn decodeIdentity(payload: cbor.Value) DecodeError!Event { 198 + return .{ .identity = .{ 199 + .seq = payload.getInt("seq") orelse return error.MissingField, 200 + .did = payload.getString("did") orelse return error.MissingField, 201 + .time = payload.getString("time"), 202 + .handle = payload.getString("handle"), 203 + } }; 204 + } 205 + 206 + fn decodeAccount(payload: cbor.Value) DecodeError!Event { 207 + const status_str = payload.getString("status"); 208 + return .{ .account = .{ 209 + .seq = payload.getInt("seq") orelse return error.MissingField, 210 + .did = payload.getString("did") orelse return error.MissingField, 211 + .time = payload.getString("time"), 212 + .active = payload.getBool("active") orelse true, 213 + .status = if (status_str) |s| AccountStatus.parse(s) else null, 214 + } }; 215 + } 216 + 217 + // === encoder === 218 + 219 + /// encode a firehose Event into a wire frame: [DAG-CBOR header] [DAG-CBOR payload] 220 + pub fn encodeFrame(allocator: Allocator, event: Event) ![]u8 { 221 + var list: std.ArrayList(u8) = .{}; 222 + errdefer list.deinit(allocator); 223 + const writer = list.writer(allocator); 224 + 225 + const tag = switch (event) { 226 + .commit => "#commit", 227 + .identity => "#identity", 228 + .account => "#account", 229 + .info => "#info", 230 + }; 231 + 232 + // encode header: {op: 1, t: "#..."} 233 + const header: cbor.Value = .{ .map = &.{ 234 + .{ .key = "op", .value = .{ .unsigned = 1 } }, 235 + .{ .key = "t", .value = .{ .text = tag } }, 236 + } }; 237 + try cbor.encode(allocator, writer, header); 238 + 239 + // encode payload based on event type 240 + switch (event) { 241 + .commit => |c| try encodeCommitPayload(allocator, writer, c), 242 + .identity => |i| try encodeIdentityPayload(allocator, writer, i), 243 + .account => |a| try encodeAccountPayload(allocator, writer, a), 244 + .info => |inf| try encodeInfoPayload(allocator, writer, inf), 245 + } 246 + 247 + return try list.toOwnedSlice(allocator); 248 + } 249 + 250 + fn encodeCommitPayload(allocator: Allocator, writer: anytype, commit: CommitEvent) !void { 251 + // build ops array and CAR blocks simultaneously 252 + var op_values: std.ArrayList(cbor.Value) = .{}; 253 + defer op_values.deinit(allocator); 254 + var car_blocks: std.ArrayList(car.Block) = .{}; 255 + defer car_blocks.deinit(allocator); 256 + var root_cids: std.ArrayList(cbor.Cid) = .{}; 257 + defer root_cids.deinit(allocator); 258 + 259 + for (commit.ops) |op| { 260 + const action_str: []const u8 = @tagName(op.action); 261 + const path = try std.fmt.allocPrint(allocator, "{s}/{s}", .{ op.collection, op.rkey }); 262 + 263 + if (op.record) |record| { 264 + // encode record, create CID, add to CAR blocks 265 + const record_bytes = try cbor.encodeAlloc(allocator, record); 266 + const cid = try cbor.Cid.forDagCbor(allocator, record_bytes); 267 + 268 + try car_blocks.append(allocator, .{ 269 + .cid_raw = cid.raw, 270 + .data = record_bytes, 271 + }); 272 + 273 + if (root_cids.items.len == 0) { 274 + try root_cids.append(allocator, cid); 275 + } 276 + 277 + try op_values.append(allocator, .{ .map = @constCast(&[_]cbor.Value.MapEntry{ 278 + .{ .key = "action", .value = .{ .text = action_str } }, 279 + .{ .key = "cid", .value = .{ .cid = cid } }, 280 + .{ .key = "path", .value = .{ .text = path } }, 281 + }) }); 282 + } else { 283 + try op_values.append(allocator, .{ .map = @constCast(&[_]cbor.Value.MapEntry{ 284 + .{ .key = "action", .value = .{ .text = action_str } }, 285 + .{ .key = "path", .value = .{ .text = path } }, 286 + }) }); 287 + } 288 + } 289 + 290 + // build CAR file from blocks 291 + const car_data = car.Car{ 292 + .roots = root_cids.items, 293 + .blocks = car_blocks.items, 294 + }; 295 + const blocks_bytes = try car.writeAlloc(allocator, car_data); 296 + 297 + // build payload entries 298 + var entries: std.ArrayList(cbor.Value.MapEntry) = .{}; 299 + defer entries.deinit(allocator); 300 + 301 + if (blocks_bytes.len > 0) { 302 + try entries.append(allocator, .{ .key = "blocks", .value = .{ .bytes = blocks_bytes } }); 303 + } 304 + try entries.append(allocator, .{ .key = "ops", .value = .{ .array = op_values.items } }); 305 + try entries.append(allocator, .{ .key = "repo", .value = .{ .text = commit.repo } }); 306 + if (commit.rev) |rev| { 307 + try entries.append(allocator, .{ .key = "rev", .value = .{ .text = rev } }); 308 + } 309 + try entries.append(allocator, .{ .key = "seq", .value = .{ .unsigned = @intCast(commit.seq) } }); 310 + if (commit.time) |t| { 311 + try entries.append(allocator, .{ .key = "time", .value = .{ .text = t } }); 312 + } 313 + if (commit.too_big) { 314 + try entries.append(allocator, .{ .key = "tooBig", .value = .{ .boolean = true } }); 315 + } 316 + 317 + try cbor.encode(allocator, writer, .{ .map = entries.items }); 318 + } 319 + 320 + fn encodeIdentityPayload(allocator: Allocator, writer: anytype, identity: IdentityEvent) !void { 321 + var entries: std.ArrayList(cbor.Value.MapEntry) = .{}; 322 + defer entries.deinit(allocator); 323 + 324 + try entries.append(allocator, .{ .key = "did", .value = .{ .text = identity.did } }); 325 + if (identity.handle) |h| { 326 + try entries.append(allocator, .{ .key = "handle", .value = .{ .text = h } }); 327 + } 328 + try entries.append(allocator, .{ .key = "seq", .value = .{ .unsigned = @intCast(identity.seq) } }); 329 + if (identity.time) |t| { 330 + try entries.append(allocator, .{ .key = "time", .value = .{ .text = t } }); 331 + } 332 + 333 + try cbor.encode(allocator, writer, .{ .map = entries.items }); 334 + } 335 + 336 + fn encodeAccountPayload(allocator: Allocator, writer: anytype, account: AccountEvent) !void { 337 + var entries: std.ArrayList(cbor.Value.MapEntry) = .{}; 338 + defer entries.deinit(allocator); 339 + 340 + if (!account.active) { 341 + try entries.append(allocator, .{ .key = "active", .value = .{ .boolean = false } }); 342 + } 343 + try entries.append(allocator, .{ .key = "did", .value = .{ .text = account.did } }); 344 + try entries.append(allocator, .{ .key = "seq", .value = .{ .unsigned = @intCast(account.seq) } }); 345 + if (account.status) |s| { 346 + try entries.append(allocator, .{ .key = "status", .value = .{ .text = @tagName(s) } }); 347 + } 348 + if (account.time) |t| { 349 + try entries.append(allocator, .{ .key = "time", .value = .{ .text = t } }); 350 + } 351 + 352 + try cbor.encode(allocator, writer, .{ .map = entries.items }); 353 + } 354 + 355 + fn encodeInfoPayload(allocator: Allocator, writer: anytype, info: InfoEvent) !void { 356 + var entries: std.ArrayList(cbor.Value.MapEntry) = .{}; 357 + defer entries.deinit(allocator); 358 + 359 + if (info.message) |m| { 360 + try entries.append(allocator, .{ .key = "message", .value = .{ .text = m } }); 361 + } 362 + if (info.name) |n| { 363 + try entries.append(allocator, .{ .key = "name", .value = .{ .text = n } }); 364 + } 365 + 366 + try cbor.encode(allocator, writer, .{ .map = entries.items }); 367 + } 368 + 369 + pub const FirehoseClient = struct { 370 + allocator: Allocator, 371 + options: Options, 372 + last_seq: ?i64 = null, 373 + 374 + pub fn init(allocator: Allocator, options: Options) FirehoseClient { 375 + return .{ 376 + .allocator = allocator, 377 + .options = options, 378 + .last_seq = if (options.cursor) |c| c else null, 379 + }; 380 + } 381 + 382 + pub fn deinit(_: *FirehoseClient) void {} 383 + 384 + /// subscribe with a user-provided handler. 385 + /// handler must implement: fn onEvent(*@TypeOf(handler), Event) void 386 + /// optional: fn onError(*@TypeOf(handler), anyerror) void 387 + /// blocks forever — reconnects with exponential backoff on disconnect. 388 + pub fn subscribe(self: *FirehoseClient, handler: anytype) void { 389 + var backoff: u64 = 1; 390 + const max_backoff: u64 = 60; 391 + 392 + while (true) { 393 + self.connectAndRead(handler) catch |err| { 394 + if (comptime @hasDecl(@TypeOf(handler.*), "onError")) { 395 + handler.onError(err); 396 + } else { 397 + log.err("firehose error: {s}, reconnecting in {d}s...", .{ @errorName(err), backoff }); 398 + } 399 + }; 400 + posix.nanosleep(backoff, 0); 401 + backoff = @min(backoff * 2, max_backoff); 402 + } 403 + } 404 + 405 + fn connectAndRead(self: *FirehoseClient, handler: anytype) !void { 406 + var path_buf: [256]u8 = undefined; 407 + var stream = std.io.fixedBufferStream(&path_buf); 408 + const writer = stream.writer(); 409 + 410 + try writer.writeAll("/xrpc/com.atproto.sync.subscribeRepos"); 411 + if (self.last_seq) |cursor| { 412 + try writer.print("?cursor={d}", .{cursor}); 413 + } 414 + const path = stream.getWritten(); 415 + 416 + log.info("connecting to wss://{s}{s}", .{ self.options.host, path }); 417 + 418 + var client = try websocket.Client.init(self.allocator, .{ 419 + .host = self.options.host, 420 + .port = 443, 421 + .tls = true, 422 + .max_size = self.options.max_message_size, 423 + }); 424 + defer client.deinit(); 425 + 426 + var host_header_buf: [256]u8 = undefined; 427 + const host_header = std.fmt.bufPrint(&host_header_buf, "Host: {s}\r\n", .{self.options.host}) catch self.options.host; 428 + 429 + try client.handshake(path, .{ .headers = host_header }); 430 + 431 + log.info("firehose connected", .{}); 432 + 433 + var ws_handler = WsHandler(@TypeOf(handler.*)){ 434 + .allocator = self.allocator, 435 + .handler = handler, 436 + .client_state = self, 437 + }; 438 + try client.readLoop(&ws_handler); 439 + } 440 + }; 441 + 442 + fn WsHandler(comptime H: type) type { 443 + return struct { 444 + allocator: Allocator, 445 + handler: *H, 446 + client_state: *FirehoseClient, 447 + 448 + const Self = @This(); 449 + 450 + pub fn serverMessage(self: *Self, data: []const u8) !void { 451 + var arena = std.heap.ArenaAllocator.init(self.allocator); 452 + defer arena.deinit(); 453 + 454 + const event = decodeFrame(arena.allocator(), data) catch |err| { 455 + log.debug("frame decode error: {s}", .{@errorName(err)}); 456 + return; 457 + }; 458 + 459 + if (event.seq()) |s| { 460 + self.client_state.last_seq = s; 461 + } 462 + 463 + self.handler.onEvent(event); 464 + } 465 + 466 + pub fn close(_: *Self) void { 467 + log.info("firehose connection closed", .{}); 468 + } 469 + }; 470 + } 471 + 472 + // === tests === 473 + 474 + test "decode frame header" { 475 + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 476 + defer arena.deinit(); 477 + const alloc = arena.allocator(); 478 + 479 + // simulate a frame: header {op: 1, t: "#info"} + payload {name: "OutdatedCursor"} 480 + const header_bytes = [_]u8{ 481 + 0xa2, // map(2) 482 + 0x62, 'o', 'p', 0x01, // "op": 1 483 + 0x61, 't', 0x65, '#', 'i', 'n', 'f', 'o', // "t": "#info" 484 + }; 485 + const payload_bytes = [_]u8{ 486 + 0xa1, // map(1) 487 + 0x64, 'n', 'a', 'm', 'e', // "name" 488 + 0x6e, 'O', 'u', 't', 'd', 'a', 't', 'e', 'd', 'C', 'u', 'r', 's', 'o', 'r', // "OutdatedCursor" 489 + }; 490 + 491 + var frame: [header_bytes.len + payload_bytes.len]u8 = undefined; 492 + @memcpy(frame[0..header_bytes.len], &header_bytes); 493 + @memcpy(frame[header_bytes.len..], &payload_bytes); 494 + 495 + const event = try decodeFrame(alloc, &frame); 496 + const info = event.info; 497 + try std.testing.expectEqualStrings("OutdatedCursor", info.name.?); 498 + } 499 + 500 + test "decode identity frame" { 501 + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 502 + defer arena.deinit(); 503 + const alloc = arena.allocator(); 504 + 505 + // header: {op: 1, t: "#identity"} 506 + const header_bytes = [_]u8{ 507 + 0xa2, // map(2) 508 + 0x62, 'o', 'p', 0x01, // "op": 1 509 + 0x61, 't', 0x69, '#', 'i', 'd', 'e', 'n', 't', 'i', 't', 'y', // "t": "#identity" 510 + }; 511 + // payload: {seq: 42, did: "did:plc:test"} 512 + const payload_bytes = [_]u8{ 513 + 0xa2, // map(2) 514 + 0x63, 's', 'e', 'q', 0x18, 42, // "seq": 42 515 + 0x63, 'd', 'i', 'd', 0x6c, 'd', 'i', 'd', ':', 'p', 'l', 'c', ':', 't', 'e', 's', 't', // "did": "did:plc:test" 516 + }; 517 + 518 + var frame: [header_bytes.len + payload_bytes.len]u8 = undefined; 519 + @memcpy(frame[0..header_bytes.len], &header_bytes); 520 + @memcpy(frame[header_bytes.len..], &payload_bytes); 521 + 522 + const event = try decodeFrame(alloc, &frame); 523 + const identity = event.identity; 524 + try std.testing.expectEqual(@as(i64, 42), identity.seq); 525 + try std.testing.expectEqualStrings("did:plc:test", identity.did); 526 + } 527 + 528 + test "Event.seq works" { 529 + const info_event = Event{ .info = .{ .name = "test" } }; 530 + try std.testing.expect(info_event.seq() == null); 531 + 532 + const identity_event = Event{ .identity = .{ 533 + .seq = 42, 534 + .did = "did:plc:test", 535 + } }; 536 + try std.testing.expectEqual(@as(i64, 42), identity_event.seq().?); 537 + } 538 + 539 + // === encoder tests === 540 + 541 + test "encode → decode info frame" { 542 + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 543 + defer arena.deinit(); 544 + const alloc = arena.allocator(); 545 + 546 + const original = Event{ .info = .{ 547 + .name = "OutdatedCursor", 548 + .message = "cursor is behind", 549 + } }; 550 + 551 + const frame = try encodeFrame(alloc, original); 552 + const decoded = try decodeFrame(alloc, frame); 553 + 554 + try std.testing.expectEqualStrings("OutdatedCursor", decoded.info.name.?); 555 + try std.testing.expectEqualStrings("cursor is behind", decoded.info.message.?); 556 + } 557 + 558 + test "encode → decode identity frame" { 559 + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 560 + defer arena.deinit(); 561 + const alloc = arena.allocator(); 562 + 563 + const original = Event{ .identity = .{ 564 + .seq = 42, 565 + .did = "did:plc:test123", 566 + .handle = "alice.bsky.social", 567 + .time = "2024-01-15T10:30:00Z", 568 + } }; 569 + 570 + const frame = try encodeFrame(alloc, original); 571 + const decoded = try decodeFrame(alloc, frame); 572 + 573 + const id = decoded.identity; 574 + try std.testing.expectEqual(@as(i64, 42), id.seq); 575 + try std.testing.expectEqualStrings("did:plc:test123", id.did); 576 + try std.testing.expectEqualStrings("alice.bsky.social", id.handle.?); 577 + try std.testing.expectEqualStrings("2024-01-15T10:30:00Z", id.time.?); 578 + } 579 + 580 + test "encode → decode account frame" { 581 + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 582 + defer arena.deinit(); 583 + const alloc = arena.allocator(); 584 + 585 + const original = Event{ .account = .{ 586 + .seq = 100, 587 + .did = "did:plc:suspended", 588 + .active = false, 589 + .status = .suspended, 590 + .time = "2024-01-15T10:30:00Z", 591 + } }; 592 + 593 + const frame = try encodeFrame(alloc, original); 594 + const decoded = try decodeFrame(alloc, frame); 595 + 596 + const acct = decoded.account; 597 + try std.testing.expectEqual(@as(i64, 100), acct.seq); 598 + try std.testing.expectEqualStrings("did:plc:suspended", acct.did); 599 + try std.testing.expectEqual(false, acct.active); 600 + try std.testing.expectEqual(AccountStatus.suspended, acct.status.?); 601 + } 602 + 603 + test "encode → decode commit frame with record" { 604 + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 605 + defer arena.deinit(); 606 + const alloc = arena.allocator(); 607 + 608 + const record: cbor.Value = .{ .map = &.{ 609 + .{ .key = "$type", .value = .{ .text = "app.bsky.feed.post" } }, 610 + .{ .key = "text", .value = .{ .text = "hello firehose" } }, 611 + } }; 612 + 613 + const original = Event{ .commit = .{ 614 + .seq = 999, 615 + .repo = "did:plc:poster", 616 + .rev = "abc123", 617 + .time = "2024-01-15T10:30:00Z", 618 + .ops = &.{.{ 619 + .action = .create, 620 + .collection = "app.bsky.feed.post", 621 + .rkey = "3k2abc", 622 + .record = record, 623 + }}, 624 + } }; 625 + 626 + const frame = try encodeFrame(alloc, original); 627 + const decoded = try decodeFrame(alloc, frame); 628 + 629 + const commit = decoded.commit; 630 + try std.testing.expectEqual(@as(i64, 999), commit.seq); 631 + try std.testing.expectEqualStrings("did:plc:poster", commit.repo); 632 + try std.testing.expectEqualStrings("abc123", commit.rev.?); 633 + try std.testing.expectEqual(@as(usize, 1), commit.ops.len); 634 + 635 + const op = commit.ops[0]; 636 + try std.testing.expectEqual(CommitAction.create, op.action); 637 + try std.testing.expectEqualStrings("app.bsky.feed.post", op.collection); 638 + try std.testing.expectEqualStrings("3k2abc", op.rkey); 639 + 640 + // record should be decoded from the CAR blocks 641 + const rec = op.record.?; 642 + try std.testing.expectEqualStrings("hello firehose", rec.getString("text").?); 643 + try std.testing.expectEqualStrings("app.bsky.feed.post", rec.getString("$type").?); 644 + } 645 + 646 + test "encode → decode commit with delete (no record)" { 647 + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 648 + defer arena.deinit(); 649 + const alloc = arena.allocator(); 650 + 651 + const original = Event{ .commit = .{ 652 + .seq = 500, 653 + .repo = "did:plc:deleter", 654 + .ops = &.{.{ 655 + .action = .delete, 656 + .collection = "app.bsky.feed.post", 657 + .rkey = "abc123", 658 + .record = null, 659 + }}, 660 + } }; 661 + 662 + const frame = try encodeFrame(alloc, original); 663 + const decoded = try decodeFrame(alloc, frame); 664 + 665 + try std.testing.expectEqual(@as(i64, 500), decoded.commit.seq); 666 + try std.testing.expectEqual(@as(usize, 1), decoded.commit.ops.len); 667 + try std.testing.expectEqual(CommitAction.delete, decoded.commit.ops[0].action); 668 + try std.testing.expect(decoded.commit.ops[0].record == null); 669 + }
+7
src/root.zig
··· 37 37 pub const jetstream = @import("internal/jetstream.zig"); 38 38 pub const JetstreamClient = jetstream.JetstreamClient; 39 39 pub const JetstreamEvent = jetstream.Event; 40 + 41 + // firehose (raw CBOR event stream) 42 + pub const cbor = @import("internal/cbor.zig"); 43 + pub const car = @import("internal/car.zig"); 44 + pub const firehose = @import("internal/firehose.zig"); 45 + pub const FirehoseClient = firehose.FirehoseClient; 46 + pub const FirehoseEvent = firehose.Event;