Auto-indexing service and GraphQL API for AT Protocol Records

refactor: extract car/ into standalone atproto_car package

Extract server/src/car/ into a standalone Gleam package at atproto_car/
for eventual publication to Hex.

Changes:
- Create atproto_car package with public API for CAR file parsing
- Move varint, cid, cbor, blockstore modules to internal/
- Add new mst module for MST tree traversal
- Update server to use atproto_car as path dependency
- Remove old server/src/car/ directory

The package provides:
- extract_records_with_paths() for MST-aware record extraction
- record_to_json() for JSON serialization
- cid_to_string() and cid_to_bytes() utilities

+1680 -670
+24
atproto_car/README.md
··· 1 + # atproto_car 2 + 3 + [![Package Version](https://img.shields.io/hexpm/v/atproto_car)](https://hex.pm/packages/atproto_car) 4 + [![Hex Docs](https://img.shields.io/badge/hex-docs-ffaff3)](https://hexdocs.pm/atproto_car/) 5 + 6 + ```sh 7 + gleam add atproto_car 8 + ``` 9 + ```gleam 10 + import atproto_car 11 + 12 + pub fn main() -> Nil { 13 + // atproto_car.extract_records_with_paths(car_bytes, ["app.bsky.feed.post"]) 14 + } 15 + ``` 16 + 17 + Further documentation can be found at <https://hexdocs.pm/atproto_car>. 18 + 19 + ## Development 20 + 21 + ```sh 22 + gleam run # Run the project 23 + gleam test # Run the tests 24 + ```
+11
atproto_car/gleam.toml
··· 1 + name = "atproto_car" 2 + version = "0.1.0" 3 + description = "CAR (Content Addressable aRchive) file parsing for AT Protocol" 4 + 5 + [dependencies] 6 + gleam_stdlib = ">= 0.60.0 and < 1.0.0" 7 + erl_cbor = ">= 2.0.0 and < 3.0.0" 8 + base32 = ">= 1.0.0 and < 2.0.0" 9 + 10 + [dev-dependencies] 11 + gleeunit = ">= 1.0.0 and < 2.0.0"
+15
atproto_car/manifest.toml
··· 1 + # This file was generated by Gleam 2 + # You typically do not need to edit this file 3 + 4 + packages = [ 5 + { name = "base32", version = "1.0.0", build_tools = ["rebar3"], requirements = [], otp_app = "base32", source = "hex", outer_checksum = "0449285348ED0C4CD83C7198E76C5FD5A0451C4EF18695B9FD43792A503E551A" }, 6 + { name = "erl_cbor", version = "2.0.1", build_tools = ["rebar3"], requirements = [], otp_app = "erl_cbor", source = "hex", outer_checksum = "42B3D75E862608FD1884F37F7E136B8EA739B26C139C3135F4E5B57532A02ACA" }, 7 + { name = "gleam_stdlib", version = "0.67.1", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "6CE3E4189A8B8EC2F73AB61A2FBDE49F159D6C9C61C49E3B3082E439F260D3D0" }, 8 + { name = "gleeunit", version = "1.9.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "DA9553CE58B67924B3C631F96FE3370C49EB6D6DC6B384EC4862CC4AAA718F3C" }, 9 + ] 10 + 11 + [requirements] 12 + base32 = { version = ">= 1.0.0 and < 2.0.0" } 13 + erl_cbor = { version = ">= 2.0.0 and < 3.0.0" } 14 + gleam_stdlib = { version = ">= 0.60.0 and < 1.0.0" } 15 + gleeunit = { version = ">= 1.0.0 and < 2.0.0" }
+355
atproto_car/src/atproto_car.gleam
··· 1 + //// CAR (Content Addressable aRchive) file parsing for AT Protocol 2 + //// 3 + //// This library parses CAR files containing AT Protocol repository data, 4 + //// including MST (Merkle Search Tree) traversal for record extraction. 5 + //// 6 + //// ## Example 7 + //// 8 + //// ```gleam 9 + //// import atproto_car 10 + //// 11 + //// let records = atproto_car.extract_records_with_paths( 12 + //// car_bytes, 13 + //// ["app.bsky.feed.post", "app.bsky.actor.profile"], 14 + //// ) 15 + //// ``` 16 + 17 + import atproto_car/internal/blockstore 18 + import atproto_car/internal/cbor 19 + import atproto_car/internal/cid 20 + import atproto_car/internal/mst 21 + import atproto_car/internal/varint 22 + import gleam/dynamic.{type Dynamic} 23 + import gleam/dynamic/decode 24 + import gleam/list 25 + import gleam/result 26 + import gleam/string 27 + 28 + // Re-export Cid type for public API 29 + pub type Cid = 30 + cid.Cid 31 + 32 + /// A parsed CAR block containing CID and raw data 33 + pub type CarBlock { 34 + CarBlock(cid: Cid, data: BitArray) 35 + } 36 + 37 + /// A record extracted from a CAR file (no path) 38 + pub type Record { 39 + Record(type_: String, data: Dynamic, cid: Cid) 40 + } 41 + 42 + /// A record with its full path from the MST 43 + pub type RecordWithPath { 44 + RecordWithPath( 45 + path: String, 46 + collection: String, 47 + rkey: String, 48 + type_: String, 49 + data: Dynamic, 50 + cid: Cid, 51 + ) 52 + } 53 + 54 + /// CAR parsing errors 55 + pub type CarError { 56 + InvalidHeader 57 + InvalidBlock 58 + InvalidVarint 59 + CborDecodeError 60 + CidParseError(cid.CidError) 61 + } 62 + 63 + /// Parse CAR header, return roots and remaining bytes 64 + pub fn parse_header(bytes: BitArray) -> Result(#(List(Cid), BitArray), CarError) { 65 + use #(header_len, rest) <- result.try( 66 + varint.decode(bytes) |> result.replace_error(InvalidVarint), 67 + ) 68 + 69 + case rest { 70 + <<header_bytes:bytes-size(header_len), remaining:bits>> -> { 71 + case 
cbor.decode(header_bytes) { 72 + Ok(_header) -> Ok(#([], remaining)) 73 + Error(_) -> Error(CborDecodeError) 74 + } 75 + } 76 + _ -> Error(InvalidHeader) 77 + } 78 + } 79 + 80 + /// Parse a single block from bytes 81 + fn parse_block(bytes: BitArray) -> Result(#(CarBlock, BitArray), CarError) { 82 + use #(block_len, rest) <- result.try( 83 + varint.decode(bytes) |> result.replace_error(InvalidVarint), 84 + ) 85 + 86 + case rest { 87 + <<block_bytes:bytes-size(block_len), remaining:bits>> -> { 88 + case cid.parse(block_bytes) { 89 + Ok(#(parsed_cid, data)) -> { 90 + Ok(#(CarBlock(cid: parsed_cid, data: data), remaining)) 91 + } 92 + Error(e) -> Error(CidParseError(e)) 93 + } 94 + } 95 + _ -> Error(InvalidBlock) 96 + } 97 + } 98 + 99 + /// Fold over all blocks in a CAR file 100 + pub fn fold_blocks( 101 + bytes: BitArray, 102 + acc: a, 103 + f: fn(a, CarBlock) -> a, 104 + ) -> Result(a, CarError) { 105 + case bytes { 106 + <<>> -> Ok(acc) 107 + _ -> { 108 + use #(block, rest) <- result.try(parse_block(bytes)) 109 + fold_blocks(rest, f(acc, block), f) 110 + } 111 + } 112 + } 113 + 114 + /// Try to decode a block as a record 115 + fn decode_as_record(block: CarBlock) -> Result(Record, Nil) { 116 + case cbor.decode(block.data) { 117 + Ok(decoded) -> { 118 + let type_decoder = { 119 + use type_ <- decode.field("$type", decode.string) 120 + decode.success(type_) 121 + } 122 + case decode.run(decoded, type_decoder) { 123 + Ok(type_) -> Ok(Record(type_: type_, data: decoded, cid: block.cid)) 124 + Error(_) -> Error(Nil) 125 + } 126 + } 127 + Error(_) -> Error(Nil) 128 + } 129 + } 130 + 131 + /// Extract records matching given collections from CAR bytes 132 + pub fn extract_records( 133 + car_bytes: BitArray, 134 + collections: List(String), 135 + ) -> Result(List(Record), CarError) { 136 + use #(_roots, blocks_bytes) <- result.try(parse_header(car_bytes)) 137 + 138 + fold_blocks(blocks_bytes, [], fn(acc, block) { 139 + case decode_as_record(block) { 140 + Ok(record) -> { 
141 + case list.contains(collections, record.type_) { 142 + True -> [record, ..acc] 143 + False -> acc 144 + } 145 + } 146 + Error(_) -> acc 147 + } 148 + }) 149 + } 150 + 151 + /// Extract all records from CAR bytes (no collection filtering) 152 + pub fn extract_all_records( 153 + car_bytes: BitArray, 154 + ) -> Result(List(Record), CarError) { 155 + use #(_roots, blocks_bytes) <- result.try(parse_header(car_bytes)) 156 + 157 + fold_blocks(blocks_bytes, [], fn(acc, block) { 158 + case decode_as_record(block) { 159 + Ok(record) -> [record, ..acc] 160 + Error(_) -> acc 161 + } 162 + }) 163 + } 164 + 165 + /// Extract records with their MST paths from CAR bytes 166 + pub fn extract_records_with_paths( 167 + car_bytes: BitArray, 168 + collections: List(String), 169 + ) -> Result(List(RecordWithPath), CarError) { 170 + use #(roots, header_size, blocks_bytes) <- result.try(parse_header_with_roots( 171 + car_bytes, 172 + )) 173 + 174 + use store <- result.try( 175 + blockstore.from_blocks_bytes(car_bytes, header_size, blocks_bytes) 176 + |> result.map_error(fn(_) { InvalidBlock }), 177 + ) 178 + 179 + case roots { 180 + [] -> Ok([]) 181 + [commit_cid, ..] 
-> { 182 + case blockstore.get(store, commit_cid) { 183 + Error(_) -> Ok([]) 184 + Ok(commit_data) -> { 185 + case get_data_cid(commit_data) { 186 + Error(_) -> Ok([]) 187 + Ok(mst_root_cid) -> { 188 + let mst_entries = mst.walk(store, mst_root_cid) 189 + 190 + let records = 191 + list.filter_map(mst_entries, fn(entry) { 192 + case string.split_once(entry.path, "/") { 193 + Error(_) -> Error(Nil) 194 + Ok(#(collection, rkey)) -> { 195 + case list.contains(collections, collection) { 196 + False -> Error(Nil) 197 + True -> { 198 + case blockstore.get(store, entry.cid) { 199 + Error(_) -> Error(Nil) 200 + Ok(record_data) -> { 201 + case cbor.decode(record_data) { 202 + Error(_) -> Error(Nil) 203 + Ok(decoded) -> { 204 + case get_type_field(decoded) { 205 + Error(_) -> Error(Nil) 206 + Ok(type_) -> 207 + Ok(RecordWithPath( 208 + path: entry.path, 209 + collection: collection, 210 + rkey: rkey, 211 + type_: type_, 212 + data: decoded, 213 + cid: entry.cid, 214 + )) 215 + } 216 + } 217 + } 218 + } 219 + } 220 + } 221 + } 222 + } 223 + } 224 + }) 225 + 226 + Ok(records) 227 + } 228 + } 229 + } 230 + } 231 + } 232 + } 233 + } 234 + 235 + /// Parse header and extract root CIDs 236 + fn parse_header_with_roots( 237 + bytes: BitArray, 238 + ) -> Result(#(List(Cid), Int, BitArray), CarError) { 239 + use #(header_len, rest) <- result.try( 240 + varint.decode(bytes) |> result.replace_error(InvalidVarint), 241 + ) 242 + 243 + let varint_size = varint.encoded_size(header_len) 244 + let header_size = varint_size + header_len 245 + 246 + case rest { 247 + <<header_bytes:bytes-size(header_len), remaining:bits>> -> { 248 + case cbor.decode(header_bytes) { 249 + Ok(header) -> { 250 + let roots = get_roots_from_header(header) 251 + Ok(#(roots, header_size, remaining)) 252 + } 253 + Error(_) -> Error(CborDecodeError) 254 + } 255 + } 256 + _ -> Error(InvalidHeader) 257 + } 258 + } 259 + 260 + /// Extract roots array from CAR header 261 + fn get_roots_from_header(header: Dynamic) -> 
List(Cid) { 262 + let decoder = { 263 + use roots <- decode.field("roots", decode.list(decode.dynamic)) 264 + decode.success(roots) 265 + } 266 + case decode.run(header, decoder) { 267 + Ok(roots) -> list.filter_map(roots, parse_cid_from_dynamic) 268 + Error(_) -> [] 269 + } 270 + } 271 + 272 + /// Parse CID from dynamic (tag 42 tuple) 273 + fn parse_cid_from_dynamic(data: Dynamic) -> Result(Cid, Nil) { 274 + case decode_cid_bytes(data) { 275 + Ok(bytes) -> { 276 + case cid.parse(bytes) { 277 + Ok(#(parsed, _)) -> Ok(parsed) 278 + Error(_) -> Error(Nil) 279 + } 280 + } 281 + Error(_) -> Error(Nil) 282 + } 283 + } 284 + 285 + @external(erlang, "cid_ffi", "decode_cid_bytes") 286 + fn decode_cid_bytes(data: Dynamic) -> Result(BitArray, Nil) 287 + 288 + /// Get "data" CID from commit block (MST root) 289 + fn get_data_cid(commit_data: BitArray) -> Result(Cid, Nil) { 290 + case cbor.decode(commit_data) { 291 + Ok(decoded) -> { 292 + let decoder = { 293 + use data <- decode.field("data", decode.dynamic) 294 + decode.success(data) 295 + } 296 + case decode.run(decoded, decoder) { 297 + Ok(data) -> parse_cid_from_dynamic(data) 298 + Error(_) -> Error(Nil) 299 + } 300 + } 301 + Error(_) -> Error(Nil) 302 + } 303 + } 304 + 305 + /// Get $type field from decoded record 306 + fn get_type_field(data: Dynamic) -> Result(String, Nil) { 307 + let decoder = { 308 + use type_ <- decode.field("$type", decode.string) 309 + decode.success(type_) 310 + } 311 + case decode.run(data, decoder) { 312 + Ok(type_) -> Ok(type_) 313 + Error(_) -> Error(Nil) 314 + } 315 + } 316 + 317 + /// Convert a record's data to JSON string 318 + pub fn record_to_json(record: RecordWithPath) -> String { 319 + dynamic_to_json(record.data) 320 + } 321 + 322 + fn dynamic_to_json(value: Dynamic) -> String { 323 + let sanitized = sanitize_for_json(value) 324 + let iolist = do_json_encode(sanitized) 325 + iolist_to_string(iolist) 326 + } 327 + 328 + @external(erlang, "cbor_ffi", "sanitize_for_json") 329 + fn 
sanitize_for_json(value: Dynamic) -> Dynamic 330 + 331 + @external(erlang, "json", "encode") 332 + fn do_json_encode(value: Dynamic) -> Dynamic 333 + 334 + @external(erlang, "erlang", "iolist_to_binary") 335 + fn iolist_to_binary(iolist: Dynamic) -> Dynamic 336 + 337 + fn iolist_to_string(iolist: Dynamic) -> String { 338 + let binary = iolist_to_binary(iolist) 339 + case decode.run(binary, decode.string) { 340 + Ok(str) -> str 341 + Error(_) -> "" 342 + } 343 + } 344 + 345 + // === CID utilities === 346 + 347 + /// Convert CID to base32lower string representation 348 + pub fn cid_to_string(c: Cid) -> String { 349 + cid.to_string(c) 350 + } 351 + 352 + /// Convert CID to raw bytes 353 + pub fn cid_to_bytes(c: Cid) -> BitArray { 354 + cid.to_bytes(c) 355 + }
+21
atproto_car/src/atproto_car/internal/blockstore_ffi.erl
··· 1 + -module(blockstore_ffi). 2 + -export([ets_new/0, ets_insert/3, ets_get/2]). 3 + 4 + %% Create a new ETS table for blockstore 5 + %% Returns an opaque reference to the table 6 + ets_new() -> 7 + ets:new(blockstore, [set, public, {read_concurrency, true}]). 8 + 9 + %% Insert a key-value pair into ETS table 10 + %% Key is binary (CID bytes), Value is integer (offset) 11 + ets_insert(Table, Key, Value) -> 12 + ets:insert(Table, {Key, Value}), 13 + nil. 14 + 15 + %% Get a value from ETS table by key 16 + %% Returns {ok, Value} or {error, nil} 17 + ets_get(Table, Key) -> 18 + case ets:lookup(Table, Key) of 19 + [{_, Value}] -> {ok, Value}; 20 + [] -> {error, nil} 21 + end.
+184
atproto_car/src/atproto_car/internal/mst.gleam
··· 1 + //// MST (Merkle Search Tree) traversal for AT Protocol repositories 2 + 3 + import atproto_car/internal/blockstore.{type BlockStore} 4 + import atproto_car/internal/cbor 5 + import atproto_car/internal/cid.{type Cid} 6 + import gleam/dynamic.{type Dynamic} 7 + import gleam/dynamic/decode 8 + import gleam/list 9 + import gleam/string 10 + 11 + /// MST entry - path and CID 12 + pub type MstEntry { 13 + MstEntry(path: String, cid: Cid) 14 + } 15 + 16 + /// Parsed MST node 17 + type MstNode { 18 + MstNode(left: Result(Cid, Nil), entries: List(MstNodeEntry)) 19 + } 20 + 21 + /// MST node entry 22 + type MstNodeEntry { 23 + MstNodeEntry( 24 + prefix_len: Int, 25 + key_suffix: BitArray, 26 + val: Cid, 27 + tree: Result(Cid, Nil), 28 + ) 29 + } 30 + 31 + /// Walk MST from root CID, returning all entries with their paths 32 + pub fn walk(store: BlockStore, root_cid: Cid) -> List(MstEntry) { 33 + walk_node(store, root_cid, "") 34 + } 35 + 36 + /// Walk a single MST node recursively 37 + fn walk_node( 38 + store: BlockStore, 39 + node_cid: Cid, 40 + prev_key: String, 41 + ) -> List(MstEntry) { 42 + case blockstore.get(store, node_cid) { 43 + Error(_) -> [] 44 + Ok(node_data) -> { 45 + case parse_node(node_data) { 46 + Error(_) -> [] 47 + Ok(node) -> { 48 + // Process left subtree 49 + let left_entries = case node.left { 50 + Error(_) -> [] 51 + Ok(left_cid) -> walk_node(store, left_cid, prev_key) 52 + } 53 + 54 + // Process entries 55 + let #(entry_results, _) = 56 + list.fold(node.entries, #([], prev_key), fn(acc, entry) { 57 + let #(entries, last_key) = acc 58 + 59 + // Build full key from prefix + suffix 60 + let prefix = string.slice(last_key, 0, entry.prefix_len) 61 + let suffix = bit_array_to_string(entry.key_suffix) 62 + let full_key = prefix <> suffix 63 + 64 + // Add this entry 65 + let this_entry = MstEntry(path: full_key, cid: entry.val) 66 + 67 + // Process right subtree 68 + let right_entries = case entry.tree { 69 + Error(_) -> [] 70 + Ok(tree_cid) -> 
walk_node(store, tree_cid, full_key) 71 + } 72 + 73 + #(list.flatten([entries, [this_entry], right_entries]), full_key) 74 + }) 75 + 76 + list.flatten([left_entries, entry_results]) 77 + } 78 + } 79 + } 80 + } 81 + } 82 + 83 + fn bit_array_to_string(bytes: BitArray) -> String { 84 + case do_bit_array_to_string(bytes) { 85 + Ok(s) -> s 86 + Error(_) -> "" 87 + } 88 + } 89 + 90 + @external(erlang, "cid_ffi", "decode_binary") 91 + fn do_bit_array_to_string(bytes: BitArray) -> Result(String, Nil) 92 + 93 + /// Parse MST node from CBOR data 94 + fn parse_node(data: BitArray) -> Result(MstNode, Nil) { 95 + case cbor.decode(data) { 96 + Error(_) -> Error(Nil) 97 + Ok(decoded) -> { 98 + let left = get_optional_cid_field(decoded, "l") 99 + let entries = get_entries(decoded) 100 + Ok(MstNode(left: left, entries: entries)) 101 + } 102 + } 103 + } 104 + 105 + /// Get optional CID field from dynamic 106 + fn get_optional_cid_field(data: Dynamic, field: String) -> Result(Cid, Nil) { 107 + let decoder = { 108 + use val <- decode.field(field, decode.dynamic) 109 + decode.success(val) 110 + } 111 + case decode.run(data, decoder) { 112 + Ok(val) -> parse_cid_from_dynamic(val) 113 + Error(_) -> Error(Nil) 114 + } 115 + } 116 + 117 + /// Parse CID from dynamic (tag 42 tuple) 118 + fn parse_cid_from_dynamic(data: Dynamic) -> Result(Cid, Nil) { 119 + case decode_cid_bytes(data) { 120 + Ok(bytes) -> { 121 + case cid.parse(bytes) { 122 + Ok(#(parsed, _)) -> Ok(parsed) 123 + Error(_) -> Error(Nil) 124 + } 125 + } 126 + Error(_) -> Error(Nil) 127 + } 128 + } 129 + 130 + @external(erlang, "cid_ffi", "decode_cid_bytes") 131 + fn decode_cid_bytes(data: Dynamic) -> Result(BitArray, Nil) 132 + 133 + /// Get entries array from MST node 134 + fn get_entries(data: Dynamic) -> List(MstNodeEntry) { 135 + let decoder = { 136 + use entries <- decode.field("e", decode.list(decode.dynamic)) 137 + decode.success(entries) 138 + } 139 + case decode.run(data, decoder) { 140 + Ok(entries) -> 
list.filter_map(entries, parse_entry) 141 + Error(_) -> [] 142 + } 143 + } 144 + 145 + /// Parse a single MST entry 146 + fn parse_entry(data: Dynamic) -> Result(MstNodeEntry, Nil) { 147 + let p_decoder = { 148 + use p <- decode.field("p", decode.int) 149 + decode.success(p) 150 + } 151 + 152 + case decode.run(data, p_decoder) { 153 + Error(_) -> Error(Nil) 154 + Ok(p) -> { 155 + case get_binary_field(data, "k") { 156 + Error(_) -> Error(Nil) 157 + Ok(k) -> { 158 + case get_optional_cid_field(data, "v") { 159 + Error(_) -> Error(Nil) 160 + Ok(v) -> { 161 + let t = get_optional_cid_field(data, "t") 162 + Ok(MstNodeEntry(prefix_len: p, key_suffix: k, val: v, tree: t)) 163 + } 164 + } 165 + } 166 + } 167 + } 168 + } 169 + } 170 + 171 + /// Get binary field from dynamic 172 + fn get_binary_field(data: Dynamic, field: String) -> Result(BitArray, Nil) { 173 + let decoder = { 174 + use val <- decode.field(field, decode.dynamic) 175 + decode.success(val) 176 + } 177 + case decode.run(data, decoder) { 178 + Ok(val) -> decode_binary_dyn(val) 179 + Error(_) -> Error(Nil) 180 + } 181 + } 182 + 183 + @external(erlang, "cid_ffi", "decode_binary") 184 + fn decode_binary_dyn(data: Dynamic) -> Result(BitArray, Nil)
+18
atproto_car/test/atproto_car_test.gleam
··· 1 + import atproto_car 2 + import gleeunit 3 + import gleeunit/should 4 + 5 + pub fn main() { 6 + gleeunit.main() 7 + } 8 + 9 + pub fn parse_empty_returns_error_test() { 10 + atproto_car.parse_header(<<>>) 11 + |> should.be_error() 12 + } 13 + 14 + pub fn parse_invalid_header_returns_error_test() { 15 + // Invalid varint followed by garbage 16 + atproto_car.parse_header(<<0xFF, 0xFF, 0xFF>>) 17 + |> should.be_error() 18 + }
+1036
dev-docs/plans/2025-12-10-extract-atproto-car.md
··· 1 + # Extract atproto_car Library Implementation Plan 2 + 3 + > **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. 4 + 5 + **Goal:** Extract `server/src/car/` into a standalone Gleam package `atproto_car` at the repository root for eventual publication to Hex. 6 + 7 + **Architecture:** Single public module `atproto_car` re-exports key types and functions. Internal modules under `atproto_car/internal/` handle implementation details (blockstore, cbor, cid, mst, varint). Erlang FFI files provide ETS and encoding support. 8 + 9 + **Tech Stack:** Gleam, Erlang FFI, erl_cbor, base32 10 + 11 + --- 12 + 13 + ## Task 1: Create Package Skeleton 14 + 15 + **Files:** 16 + - Create: `atproto_car/gleam.toml` 17 + - Create: `atproto_car/src/atproto_car.gleam` 18 + - Create: `atproto_car/test/atproto_car_test.gleam` 19 + 20 + **Step 1: Create directory structure** 21 + 22 + ```bash 23 + mkdir -p atproto_car/src/atproto_car/internal 24 + mkdir -p atproto_car/test 25 + ``` 26 + 27 + **Step 2: Create gleam.toml** 28 + 29 + Create `atproto_car/gleam.toml`: 30 + 31 + ```toml 32 + name = "atproto_car" 33 + version = "0.1.0" 34 + description = "CAR (Content Addressable aRchive) file parsing for AT Protocol" 35 + licences = ["MIT"] 36 + target = "erlang" 37 + 38 + [dependencies] 39 + gleam_stdlib = ">= 0.60.0 and < 1.0.0" 40 + erl_cbor = ">= 2.0.0 and < 3.0.0" 41 + base32 = ">= 1.0.0 and < 2.0.0" 42 + 43 + [dev-dependencies] 44 + gleeunit = ">= 1.0.0 and < 2.0.0" 45 + ``` 46 + 47 + **Step 3: Create stub main module** 48 + 49 + Create `atproto_car/src/atproto_car.gleam`: 50 + 51 + ```gleam 52 + //// CAR (Content Addressable aRchive) file parsing for AT Protocol 53 + //// 54 + //// This library parses CAR files containing AT Protocol repository data, 55 + //// including MST (Merkle Search Tree) traversal for record extraction. 
56 + 57 + pub type Placeholder { 58 + Placeholder 59 + } 60 + ``` 61 + 62 + **Step 4: Create test stub** 63 + 64 + Create `atproto_car/test/atproto_car_test.gleam`: 65 + 66 + ```gleam 67 + import gleeunit 68 + 69 + pub fn main() { 70 + gleeunit.main() 71 + } 72 + ``` 73 + 74 + **Step 5: Verify package builds** 75 + 76 + Run: `cd atproto_car && gleam build` 77 + Expected: Successful build with no errors 78 + 79 + **Step 6: Commit** 80 + 81 + ```bash 82 + git add atproto_car/ 83 + git commit -m "feat(atproto_car): create package skeleton" 84 + ``` 85 + 86 + --- 87 + 88 + ## Task 2: Move varint Module 89 + 90 + **Files:** 91 + - Create: `atproto_car/src/atproto_car/internal/varint.gleam` 92 + - Reference: `server/src/car/varint.gleam` 93 + 94 + **Step 1: Copy varint module** 95 + 96 + Copy `server/src/car/varint.gleam` to `atproto_car/src/atproto_car/internal/varint.gleam`. 97 + 98 + The file should be copied exactly as-is (no changes needed - it has no external dependencies). 99 + 100 + **Step 2: Verify it compiles** 101 + 102 + Run: `cd atproto_car && gleam build` 103 + Expected: Successful build 104 + 105 + **Step 3: Commit** 106 + 107 + ```bash 108 + git add atproto_car/src/atproto_car/internal/varint.gleam 109 + git commit -m "feat(atproto_car): add varint module" 110 + ``` 111 + 112 + --- 113 + 114 + ## Task 3: Move CID Module and FFI 115 + 116 + **Files:** 117 + - Create: `atproto_car/src/atproto_car/internal/cid.gleam` 118 + - Create: `atproto_car/src/atproto_car/internal/cid_ffi.erl` 119 + - Reference: `server/src/car/cid.gleam` 120 + - Reference: `server/src/car/cid_ffi.erl` 121 + 122 + **Step 1: Copy cid.gleam** 123 + 124 + Copy `server/src/car/cid.gleam` to `atproto_car/src/atproto_car/internal/cid.gleam`. 
125 + 126 + **Step 2: Update import in cid.gleam** 127 + 128 + Change line 7: 129 + ```gleam 130 + // Before 131 + import car/varint 132 + 133 + // After 134 + import atproto_car/internal/varint 135 + ``` 136 + 137 + **Step 3: Copy cid_ffi.erl** 138 + 139 + Copy `server/src/car/cid_ffi.erl` to `atproto_car/src/atproto_car/internal/cid_ffi.erl`. 140 + 141 + No changes needed to the Erlang file. 142 + 143 + **Step 4: Verify it compiles** 144 + 145 + Run: `cd atproto_car && gleam build` 146 + Expected: Successful build 147 + 148 + **Step 5: Commit** 149 + 150 + ```bash 151 + git add atproto_car/src/atproto_car/internal/cid.gleam 152 + git add atproto_car/src/atproto_car/internal/cid_ffi.erl 153 + git commit -m "feat(atproto_car): add cid module and FFI" 154 + ``` 155 + 156 + --- 157 + 158 + ## Task 4: Move CBOR Module and FFI 159 + 160 + **Files:** 161 + - Create: `atproto_car/src/atproto_car/internal/cbor.gleam` 162 + - Create: `atproto_car/src/atproto_car/internal/cbor_ffi.erl` 163 + - Reference: `server/src/car/cbor.gleam` 164 + - Reference: `server/src/car/cbor_ffi.erl` 165 + 166 + **Step 1: Copy cbor.gleam** 167 + 168 + Copy `server/src/car/cbor.gleam` to `atproto_car/src/atproto_car/internal/cbor.gleam`. 169 + 170 + No changes needed (no internal imports). 171 + 172 + **Step 2: Copy cbor_ffi.erl** 173 + 174 + Copy `server/src/car/cbor_ffi.erl` to `atproto_car/src/atproto_car/internal/cbor_ffi.erl`. 175 + 176 + No changes needed. 
177 + 178 + **Step 3: Verify it compiles** 179 + 180 + Run: `cd atproto_car && gleam build` 181 + Expected: Successful build 182 + 183 + **Step 4: Commit** 184 + 185 + ```bash 186 + git add atproto_car/src/atproto_car/internal/cbor.gleam 187 + git add atproto_car/src/atproto_car/internal/cbor_ffi.erl 188 + git commit -m "feat(atproto_car): add cbor module and FFI" 189 + ``` 190 + 191 + --- 192 + 193 + ## Task 5: Move Blockstore Module and FFI 194 + 195 + **Files:** 196 + - Create: `atproto_car/src/atproto_car/internal/blockstore.gleam` 197 + - Create: `atproto_car/src/atproto_car/internal/blockstore_ffi.erl` 198 + - Reference: `server/src/car/blockstore.gleam` 199 + - Reference: `server/src/car/car_ffi.erl` 200 + 201 + **Step 1: Copy blockstore.gleam** 202 + 203 + Copy `server/src/car/blockstore.gleam` to `atproto_car/src/atproto_car/internal/blockstore.gleam`. 204 + 205 + **Step 2: Update imports in blockstore.gleam** 206 + 207 + Change lines 7-8: 208 + ```gleam 209 + // Before 210 + import car/cid.{type Cid} 211 + import car/varint 212 + 213 + // After 214 + import atproto_car/internal/cid.{type Cid} 215 + import atproto_car/internal/varint 216 + ``` 217 + 218 + **Step 3: Update FFI module reference in blockstore.gleam** 219 + 220 + Change lines 28-35 to reference new FFI module name: 221 + ```gleam 222 + // Before 223 + @external(erlang, "car_ffi", "ets_new") 224 + fn ets_new() -> EtsTable 225 + 226 + @external(erlang, "car_ffi", "ets_insert") 227 + fn ets_insert(table: EtsTable, key: BitArray, value: Int) -> Nil 228 + 229 + @external(erlang, "car_ffi", "ets_get") 230 + fn ets_get(table: EtsTable, key: BitArray) -> Result(Int, Nil) 231 + 232 + // After 233 + @external(erlang, "blockstore_ffi", "ets_new") 234 + fn ets_new() -> EtsTable 235 + 236 + @external(erlang, "blockstore_ffi", "ets_insert") 237 + fn ets_insert(table: EtsTable, key: BitArray, value: Int) -> Nil 238 + 239 + @external(erlang, "blockstore_ffi", "ets_get") 240 + fn ets_get(table: EtsTable, key: 
BitArray) -> Result(Int, Nil) 241 + ``` 242 + 243 + **Step 4: Create blockstore_ffi.erl** 244 + 245 + Create `atproto_car/src/atproto_car/internal/blockstore_ffi.erl`: 246 + 247 + ```erlang 248 + -module(blockstore_ffi). 249 + -export([ets_new/0, ets_insert/3, ets_get/2]). 250 + 251 + %% Create a new ETS table for blockstore 252 + %% Returns an opaque reference to the table 253 + ets_new() -> 254 + ets:new(blockstore, [set, public, {read_concurrency, true}]). 255 + 256 + %% Insert a key-value pair into ETS table 257 + %% Key is binary (CID bytes), Value is integer (offset) 258 + ets_insert(Table, Key, Value) -> 259 + ets:insert(Table, {Key, Value}), 260 + nil. 261 + 262 + %% Get a value from ETS table by key 263 + %% Returns {ok, Value} or {error, nil} 264 + ets_get(Table, Key) -> 265 + case ets:lookup(Table, Key) of 266 + [{_, Value}] -> {ok, Value}; 267 + [] -> {error, nil} 268 + end. 269 + ``` 270 + 271 + **Step 5: Verify it compiles** 272 + 273 + Run: `cd atproto_car && gleam build` 274 + Expected: Successful build 275 + 276 + **Step 6: Commit** 277 + 278 + ```bash 279 + git add atproto_car/src/atproto_car/internal/blockstore.gleam 280 + git add atproto_car/src/atproto_car/internal/blockstore_ffi.erl 281 + git commit -m "feat(atproto_car): add blockstore module and FFI" 282 + ``` 283 + 284 + --- 285 + 286 + ## Task 6: Create MST Module (Extract from car.gleam) 287 + 288 + **Files:** 289 + - Create: `atproto_car/src/atproto_car/internal/mst.gleam` 290 + - Reference: `server/src/car/car.gleam` (lines 334-596) 291 + 292 + **Step 1: Create mst.gleam with MST types and walking logic** 293 + 294 + Create `atproto_car/src/atproto_car/internal/mst.gleam`: 295 + 296 + ```gleam 297 + //// MST (Merkle Search Tree) traversal for AT Protocol repositories 298 + 299 + import atproto_car/internal/blockstore.{type BlockStore} 300 + import atproto_car/internal/cbor 301 + import atproto_car/internal/cid.{type Cid} 302 + import gleam/dynamic.{type Dynamic} 303 + import 
gleam/dynamic/decode 304 + import gleam/list 305 + import gleam/string 306 + 307 + /// MST entry - path and CID 308 + pub type MstEntry { 309 + MstEntry(path: String, cid: Cid) 310 + } 311 + 312 + /// Parsed MST node 313 + type MstNode { 314 + MstNode(left: Result(Cid, Nil), entries: List(MstNodeEntry)) 315 + } 316 + 317 + /// MST node entry 318 + type MstNodeEntry { 319 + MstNodeEntry( 320 + prefix_len: Int, 321 + key_suffix: BitArray, 322 + val: Cid, 323 + tree: Result(Cid, Nil), 324 + ) 325 + } 326 + 327 + /// Walk MST from root CID, returning all entries with their paths 328 + pub fn walk(store: BlockStore, root_cid: Cid) -> List(MstEntry) { 329 + walk_node(store, root_cid, "") 330 + } 331 + 332 + /// Walk a single MST node recursively 333 + fn walk_node( 334 + store: BlockStore, 335 + node_cid: Cid, 336 + prev_key: String, 337 + ) -> List(MstEntry) { 338 + case blockstore.get(store, node_cid) { 339 + Error(_) -> [] 340 + Ok(node_data) -> { 341 + case parse_node(node_data) { 342 + Error(_) -> [] 343 + Ok(node) -> { 344 + // Process left subtree 345 + let left_entries = case node.left { 346 + Error(_) -> [] 347 + Ok(left_cid) -> walk_node(store, left_cid, prev_key) 348 + } 349 + 350 + // Process entries 351 + let #(entry_results, _) = 352 + list.fold(node.entries, #([], prev_key), fn(acc, entry) { 353 + let #(entries, last_key) = acc 354 + 355 + // Build full key from prefix + suffix 356 + let prefix = string.slice(last_key, 0, entry.prefix_len) 357 + let suffix = bit_array_to_string(entry.key_suffix) 358 + let full_key = prefix <> suffix 359 + 360 + // Add this entry 361 + let this_entry = MstEntry(path: full_key, cid: entry.val) 362 + 363 + // Process right subtree 364 + let right_entries = case entry.tree { 365 + Error(_) -> [] 366 + Ok(tree_cid) -> walk_node(store, tree_cid, full_key) 367 + } 368 + 369 + #(list.flatten([entries, [this_entry], right_entries]), full_key) 370 + }) 371 + 372 + list.flatten([left_entries, entry_results]) 373 + } 374 + } 375 + } 
376 + } 377 + } 378 + 379 + fn bit_array_to_string(bytes: BitArray) -> String { 380 + case do_bit_array_to_string(bytes) { 381 + Ok(s) -> s 382 + Error(_) -> "" 383 + } 384 + } 385 + 386 + @external(erlang, "cid_ffi", "decode_binary") 387 + fn do_bit_array_to_string(bytes: BitArray) -> Result(String, Nil) 388 + 389 + /// Parse MST node from CBOR data 390 + fn parse_node(data: BitArray) -> Result(MstNode, Nil) { 391 + case cbor.decode(data) { 392 + Error(_) -> Error(Nil) 393 + Ok(decoded) -> { 394 + let left = get_optional_cid_field(decoded, "l") 395 + let entries = get_entries(decoded) 396 + Ok(MstNode(left: left, entries: entries)) 397 + } 398 + } 399 + } 400 + 401 + /// Get optional CID field from dynamic 402 + fn get_optional_cid_field(data: Dynamic, field: String) -> Result(Cid, Nil) { 403 + let decoder = { 404 + use val <- decode.field(field, decode.dynamic) 405 + decode.success(val) 406 + } 407 + case decode.run(data, decoder) { 408 + Ok(val) -> parse_cid_from_dynamic(val) 409 + Error(_) -> Error(Nil) 410 + } 411 + } 412 + 413 + /// Parse CID from dynamic (tag 42 tuple) 414 + fn parse_cid_from_dynamic(data: Dynamic) -> Result(Cid, Nil) { 415 + case decode_cid_bytes(data) { 416 + Ok(bytes) -> { 417 + case cid.parse(bytes) { 418 + Ok(#(parsed, _)) -> Ok(parsed) 419 + Error(_) -> Error(Nil) 420 + } 421 + } 422 + Error(_) -> Error(Nil) 423 + } 424 + } 425 + 426 + @external(erlang, "cid_ffi", "decode_cid_bytes") 427 + fn decode_cid_bytes(data: Dynamic) -> Result(BitArray, Nil) 428 + 429 + /// Get entries array from MST node 430 + fn get_entries(data: Dynamic) -> List(MstNodeEntry) { 431 + let decoder = { 432 + use entries <- decode.field("e", decode.list(decode.dynamic)) 433 + decode.success(entries) 434 + } 435 + case decode.run(data, decoder) { 436 + Ok(entries) -> list.filter_map(entries, parse_entry) 437 + Error(_) -> [] 438 + } 439 + } 440 + 441 + /// Parse a single MST entry 442 + fn parse_entry(data: Dynamic) -> Result(MstNodeEntry, Nil) { 443 + let 
p_decoder = { 444 + use p <- decode.field("p", decode.int) 445 + decode.success(p) 446 + } 447 + 448 + case decode.run(data, p_decoder) { 449 + Error(_) -> Error(Nil) 450 + Ok(p) -> { 451 + case get_binary_field(data, "k") { 452 + Error(_) -> Error(Nil) 453 + Ok(k) -> { 454 + case get_optional_cid_field(data, "v") { 455 + Error(_) -> Error(Nil) 456 + Ok(v) -> { 457 + let t = get_optional_cid_field(data, "t") 458 + Ok(MstNodeEntry(prefix_len: p, key_suffix: k, val: v, tree: t)) 459 + } 460 + } 461 + } 462 + } 463 + } 464 + } 465 + } 466 + 467 + /// Get binary field from dynamic 468 + fn get_binary_field(data: Dynamic, field: String) -> Result(BitArray, Nil) { 469 + let decoder = { 470 + use val <- decode.field(field, decode.dynamic) 471 + decode.success(val) 472 + } 473 + case decode.run(data, decoder) { 474 + Ok(val) -> decode_binary_dyn(val) 475 + Error(_) -> Error(Nil) 476 + } 477 + } 478 + 479 + @external(erlang, "cid_ffi", "decode_binary") 480 + fn decode_binary_dyn(data: Dynamic) -> Result(BitArray, Nil) 481 + ``` 482 + 483 + **Step 2: Verify it compiles** 484 + 485 + Run: `cd atproto_car && gleam build` 486 + Expected: Successful build 487 + 488 + **Step 3: Commit** 489 + 490 + ```bash 491 + git add atproto_car/src/atproto_car/internal/mst.gleam 492 + git commit -m "feat(atproto_car): add mst module for tree traversal" 493 + ``` 494 + 495 + --- 496 + 497 + ## Task 7: Create Main atproto_car Module 498 + 499 + **Files:** 500 + - Modify: `atproto_car/src/atproto_car.gleam` 501 + - Reference: `server/src/car/car.gleam` 502 + 503 + **Step 1: Write the main module** 504 + 505 + Replace `atproto_car/src/atproto_car.gleam` with: 506 + 507 + ```gleam 508 + //// CAR (Content Addressable aRchive) file parsing for AT Protocol 509 + //// 510 + //// This library parses CAR files containing AT Protocol repository data, 511 + //// including MST (Merkle Search Tree) traversal for record extraction. 
512 + //// 513 + //// ## Example 514 + //// 515 + //// ```gleam 516 + //// import atproto_car 517 + //// 518 + //// let records = atproto_car.extract_records_with_paths( 519 + //// car_bytes, 520 + //// ["app.bsky.feed.post", "app.bsky.actor.profile"], 521 + //// ) 522 + //// ``` 523 + 524 + import atproto_car/internal/blockstore.{type BlockStore} 525 + import atproto_car/internal/cbor 526 + import atproto_car/internal/cid.{type Cid} 527 + import atproto_car/internal/mst 528 + import atproto_car/internal/varint 529 + import gleam/dynamic.{type Dynamic} 530 + import gleam/dynamic/decode 531 + import gleam/list 532 + import gleam/result 533 + import gleam/string 534 + 535 + // Re-export Cid type 536 + pub type Cid = 537 + cid.Cid 538 + 539 + /// A parsed CAR block containing CID and raw data 540 + pub type CarBlock { 541 + CarBlock(cid: Cid, data: BitArray) 542 + } 543 + 544 + /// A record extracted from a CAR file (no path) 545 + pub type Record { 546 + Record(type_: String, data: Dynamic, cid: Cid) 547 + } 548 + 549 + /// A record with its full path from the MST 550 + pub type RecordWithPath { 551 + RecordWithPath( 552 + path: String, 553 + collection: String, 554 + rkey: String, 555 + type_: String, 556 + data: Dynamic, 557 + cid: Cid, 558 + ) 559 + } 560 + 561 + /// CAR parsing errors 562 + pub type CarError { 563 + InvalidHeader 564 + InvalidBlock 565 + InvalidVarint 566 + CborDecodeError 567 + CidParseError(cid.CidError) 568 + } 569 + 570 + /// Parse CAR header, return roots and remaining bytes 571 + pub fn parse_header(bytes: BitArray) -> Result(#(List(Cid), BitArray), CarError) { 572 + use #(header_len, rest) <- result.try( 573 + varint.decode(bytes) |> result.replace_error(InvalidVarint), 574 + ) 575 + 576 + case rest { 577 + <<header_bytes:bytes-size(header_len), remaining:bits>> -> { 578 + case cbor.decode(header_bytes) { 579 + Ok(_header) -> Ok(#([], remaining)) 580 + Error(_) -> Error(CborDecodeError) 581 + } 582 + } 583 + _ -> Error(InvalidHeader) 584 + 
} 585 + } 586 + 587 + /// Parse a single block from bytes 588 + fn parse_block(bytes: BitArray) -> Result(#(CarBlock, BitArray), CarError) { 589 + use #(block_len, rest) <- result.try( 590 + varint.decode(bytes) |> result.replace_error(InvalidVarint), 591 + ) 592 + 593 + case rest { 594 + <<block_bytes:bytes-size(block_len), remaining:bits>> -> { 595 + case cid.parse(block_bytes) { 596 + Ok(#(parsed_cid, data)) -> { 597 + Ok(#(CarBlock(cid: parsed_cid, data: data), remaining)) 598 + } 599 + Error(e) -> Error(CidParseError(e)) 600 + } 601 + } 602 + _ -> Error(InvalidBlock) 603 + } 604 + } 605 + 606 + /// Fold over all blocks in a CAR file 607 + pub fn fold_blocks( 608 + bytes: BitArray, 609 + acc: a, 610 + f: fn(a, CarBlock) -> a, 611 + ) -> Result(a, CarError) { 612 + case bytes { 613 + <<>> -> Ok(acc) 614 + _ -> { 615 + use #(block, rest) <- result.try(parse_block(bytes)) 616 + fold_blocks(rest, f(acc, block), f) 617 + } 618 + } 619 + } 620 + 621 + /// Try to decode a block as a record 622 + fn decode_as_record(block: CarBlock) -> Result(Record, Nil) { 623 + case cbor.decode(block.data) { 624 + Ok(decoded) -> { 625 + let type_decoder = { 626 + use type_ <- decode.field("$type", decode.string) 627 + decode.success(type_) 628 + } 629 + case decode.run(decoded, type_decoder) { 630 + Ok(type_) -> Ok(Record(type_: type_, data: decoded, cid: block.cid)) 631 + Error(_) -> Error(Nil) 632 + } 633 + } 634 + Error(_) -> Error(Nil) 635 + } 636 + } 637 + 638 + /// Extract records matching given collections from CAR bytes 639 + pub fn extract_records( 640 + car_bytes: BitArray, 641 + collections: List(String), 642 + ) -> Result(List(Record), CarError) { 643 + use #(_roots, blocks_bytes) <- result.try(parse_header(car_bytes)) 644 + 645 + fold_blocks(blocks_bytes, [], fn(acc, block) { 646 + case decode_as_record(block) { 647 + Ok(record) -> { 648 + case list.contains(collections, record.type_) { 649 + True -> [record, ..acc] 650 + False -> acc 651 + } 652 + } 653 + Error(_) -> 
acc 654 + } 655 + }) 656 + } 657 + 658 + /// Extract all records from CAR bytes (no collection filtering) 659 + pub fn extract_all_records( 660 + car_bytes: BitArray, 661 + ) -> Result(List(Record), CarError) { 662 + use #(_roots, blocks_bytes) <- result.try(parse_header(car_bytes)) 663 + 664 + fold_blocks(blocks_bytes, [], fn(acc, block) { 665 + case decode_as_record(block) { 666 + Ok(record) -> [record, ..acc] 667 + Error(_) -> acc 668 + } 669 + }) 670 + } 671 + 672 + /// Extract records with their MST paths from CAR bytes 673 + pub fn extract_records_with_paths( 674 + car_bytes: BitArray, 675 + collections: List(String), 676 + ) -> Result(List(RecordWithPath), CarError) { 677 + use #(roots, header_size, blocks_bytes) <- result.try( 678 + parse_header_with_roots(car_bytes), 679 + ) 680 + 681 + use store <- result.try( 682 + blockstore.from_blocks_bytes(car_bytes, header_size, blocks_bytes) 683 + |> result.map_error(fn(_) { InvalidBlock }), 684 + ) 685 + 686 + case roots { 687 + [] -> Ok([]) 688 + [commit_cid, ..] 
-> { 689 + case blockstore.get(store, commit_cid) { 690 + Error(_) -> Ok([]) 691 + Ok(commit_data) -> { 692 + case get_data_cid(commit_data) { 693 + Error(_) -> Ok([]) 694 + Ok(mst_root_cid) -> { 695 + let mst_entries = mst.walk(store, mst_root_cid) 696 + 697 + let records = 698 + list.filter_map(mst_entries, fn(entry) { 699 + case string.split_once(entry.path, "/") { 700 + Error(_) -> Error(Nil) 701 + Ok(#(collection, rkey)) -> { 702 + case list.contains(collections, collection) { 703 + False -> Error(Nil) 704 + True -> { 705 + case blockstore.get(store, entry.cid) { 706 + Error(_) -> Error(Nil) 707 + Ok(record_data) -> { 708 + case cbor.decode(record_data) { 709 + Error(_) -> Error(Nil) 710 + Ok(decoded) -> { 711 + case get_type_field(decoded) { 712 + Error(_) -> Error(Nil) 713 + Ok(type_) -> 714 + Ok(RecordWithPath( 715 + path: entry.path, 716 + collection: collection, 717 + rkey: rkey, 718 + type_: type_, 719 + data: decoded, 720 + cid: entry.cid, 721 + )) 722 + } 723 + } 724 + } 725 + } 726 + } 727 + } 728 + } 729 + } 730 + } 731 + }) 732 + 733 + Ok(records) 734 + } 735 + } 736 + } 737 + } 738 + } 739 + } 740 + } 741 + 742 + /// Parse header and extract root CIDs 743 + fn parse_header_with_roots( 744 + bytes: BitArray, 745 + ) -> Result(#(List(Cid), Int, BitArray), CarError) { 746 + use #(header_len, rest) <- result.try( 747 + varint.decode(bytes) |> result.replace_error(InvalidVarint), 748 + ) 749 + 750 + let varint_size = varint.encoded_size(header_len) 751 + let header_size = varint_size + header_len 752 + 753 + case rest { 754 + <<header_bytes:bytes-size(header_len), remaining:bits>> -> { 755 + case cbor.decode(header_bytes) { 756 + Ok(header) -> { 757 + let roots = get_roots_from_header(header) 758 + Ok(#(roots, header_size, remaining)) 759 + } 760 + Error(_) -> Error(CborDecodeError) 761 + } 762 + } 763 + _ -> Error(InvalidHeader) 764 + } 765 + } 766 + 767 + /// Extract roots array from CAR header 768 + fn get_roots_from_header(header: Dynamic) -> 
List(Cid) { 769 + let decoder = { 770 + use roots <- decode.field("roots", decode.list(decode.dynamic)) 771 + decode.success(roots) 772 + } 773 + case decode.run(header, decoder) { 774 + Ok(roots) -> list.filter_map(roots, parse_cid_from_dynamic) 775 + Error(_) -> [] 776 + } 777 + } 778 + 779 + /// Parse CID from dynamic (tag 42 tuple) 780 + fn parse_cid_from_dynamic(data: Dynamic) -> Result(Cid, Nil) { 781 + case decode_cid_bytes(data) { 782 + Ok(bytes) -> { 783 + case cid.parse(bytes) { 784 + Ok(#(parsed, _)) -> Ok(parsed) 785 + Error(_) -> Error(Nil) 786 + } 787 + } 788 + Error(_) -> Error(Nil) 789 + } 790 + } 791 + 792 + @external(erlang, "cid_ffi", "decode_cid_bytes") 793 + fn decode_cid_bytes(data: Dynamic) -> Result(BitArray, Nil) 794 + 795 + /// Get "data" CID from commit block (MST root) 796 + fn get_data_cid(commit_data: BitArray) -> Result(Cid, Nil) { 797 + case cbor.decode(commit_data) { 798 + Ok(decoded) -> { 799 + let decoder = { 800 + use data <- decode.field("data", decode.dynamic) 801 + decode.success(data) 802 + } 803 + case decode.run(decoded, decoder) { 804 + Ok(data) -> parse_cid_from_dynamic(data) 805 + Error(_) -> Error(Nil) 806 + } 807 + } 808 + Error(_) -> Error(Nil) 809 + } 810 + } 811 + 812 + /// Get $type field from decoded record 813 + fn get_type_field(data: Dynamic) -> Result(String, Nil) { 814 + let decoder = { 815 + use type_ <- decode.field("$type", decode.string) 816 + decode.success(type_) 817 + } 818 + case decode.run(data, decoder) { 819 + Ok(type_) -> Ok(type_) 820 + Error(_) -> Error(Nil) 821 + } 822 + } 823 + 824 + /// Convert a record's data to JSON string 825 + pub fn record_to_json(record: RecordWithPath) -> String { 826 + dynamic_to_json(record.data) 827 + } 828 + 829 + fn dynamic_to_json(value: Dynamic) -> String { 830 + let sanitized = sanitize_for_json(value) 831 + let iolist = do_json_encode(sanitized) 832 + iolist_to_string(iolist) 833 + } 834 + 835 + @external(erlang, "cbor_ffi", "sanitize_for_json") 836 + fn 
sanitize_for_json(value: Dynamic) -> Dynamic 837 + 838 + @external(erlang, "json", "encode") 839 + fn do_json_encode(value: Dynamic) -> Dynamic 840 + 841 + @external(erlang, "erlang", "iolist_to_binary") 842 + fn iolist_to_binary(iolist: Dynamic) -> Dynamic 843 + 844 + fn iolist_to_string(iolist: Dynamic) -> String { 845 + let binary = iolist_to_binary(iolist) 846 + case decode.run(binary, decode.string) { 847 + Ok(str) -> str 848 + Error(_) -> "" 849 + } 850 + } 851 + 852 + // === CID utilities === 853 + 854 + /// Convert CID to base32lower string representation 855 + pub fn cid_to_string(c: Cid) -> String { 856 + cid.to_string(c) 857 + } 858 + 859 + /// Convert CID to raw bytes 860 + pub fn cid_to_bytes(c: Cid) -> BitArray { 861 + cid.to_bytes(c) 862 + } 863 + ``` 864 + 865 + **Step 2: Verify it compiles** 866 + 867 + Run: `cd atproto_car && gleam build` 868 + Expected: Successful build 869 + 870 + **Step 3: Commit** 871 + 872 + ```bash 873 + git add atproto_car/src/atproto_car.gleam 874 + git commit -m "feat(atproto_car): implement main module with public API" 875 + ``` 876 + 877 + --- 878 + 879 + ## Task 8: Run Tests in New Package 880 + 881 + **Files:** 882 + - Modify: `atproto_car/test/atproto_car_test.gleam` 883 + 884 + **Step 1: Add basic smoke test** 885 + 886 + Replace `atproto_car/test/atproto_car_test.gleam`: 887 + 888 + ```gleam 889 + import atproto_car 890 + import gleeunit 891 + import gleeunit/should 892 + 893 + pub fn main() { 894 + gleeunit.main() 895 + } 896 + 897 + pub fn parse_empty_returns_error_test() { 898 + atproto_car.parse_header(<<>>) 899 + |> should.be_error() 900 + } 901 + 902 + pub fn parse_invalid_header_returns_error_test() { 903 + // Invalid varint followed by garbage 904 + atproto_car.parse_header(<<0xFF, 0xFF, 0xFF>>) 905 + |> should.be_error() 906 + } 907 + ``` 908 + 909 + **Step 2: Run tests** 910 + 911 + Run: `cd atproto_car && gleam test` 912 + Expected: 2 tests pass 913 + 914 + **Step 3: Commit** 915 + 916 + ```bash 917 + 
git add atproto_car/test/atproto_car_test.gleam 918 + git commit -m "test(atproto_car): add basic smoke tests" 919 + ``` 920 + 921 + --- 922 + 923 + ## Task 9: Update Server to Use New Package 924 + 925 + **Files:** 926 + - Modify: `server/gleam.toml` 927 + - Modify: `server/src/backfill.gleam` 928 + 929 + **Step 1: Add path dependency to server/gleam.toml** 930 + 931 + Add after line 16 (after lexicon_graphql): 932 + ```toml 933 + atproto_car = { path = "../atproto_car" } 934 + ``` 935 + 936 + **Step 2: Update imports in backfill.gleam** 937 + 938 + Change lines 1-2: 939 + ```gleam 940 + // Before 941 + import car/car 942 + import car/cid 943 + 944 + // After 945 + import atproto_car 946 + ``` 947 + 948 + **Step 3: Update function calls in backfill.gleam** 949 + 950 + Search and replace throughout the file: 951 + - `car.extract_records_with_paths` → `atproto_car.extract_records_with_paths` 952 + - `car.record_to_json` → `atproto_car.record_to_json` 953 + - `car.RecordWithPath` → `atproto_car.RecordWithPath` 954 + - `cid.to_string` → `atproto_car.cid_to_string` 955 + 956 + **Step 4: Verify server compiles** 957 + 958 + Run: `cd server && gleam build` 959 + Expected: Successful build (may have warnings about unused old car/ modules) 960 + 961 + **Step 5: Commit** 962 + 963 + ```bash 964 + git add server/gleam.toml server/src/backfill.gleam 965 + git commit -m "refactor(server): use atproto_car package instead of internal car/" 966 + ``` 967 + 968 + --- 969 + 970 + ## Task 10: Delete Old car/ Directory 971 + 972 + **Files:** 973 + - Delete: `server/src/car/` (entire directory) 974 + 975 + **Step 1: Verify no other files import from car/** 976 + 977 + Run: `grep -r "import car/" server/src/ --include="*.gleam" | grep -v backfill` 978 + Expected: No output (only backfill.gleam should have imported it, and we fixed that) 979 + 980 + **Step 2: Delete the old directory** 981 + 982 + ```bash 983 + rm -rf server/src/car/ 984 + ``` 985 + 986 + **Step 3: Verify server still 
compiles** 987 + 988 + Run: `cd server && gleam build` 989 + Expected: Successful build 990 + 991 + **Step 4: Run server tests** 992 + 993 + Run: `cd server && gleam test` 994 + Expected: All 314 tests pass 995 + 996 + **Step 5: Commit** 997 + 998 + ```bash 999 + git add -A 1000 + git commit -m "refactor(server): remove old car/ directory (now in atproto_car)" 1001 + ``` 1002 + 1003 + --- 1004 + 1005 + ## Task 11: Final Verification 1006 + 1007 + **Step 1: Run full test suite from repo root** 1008 + 1009 + ```bash 1010 + cd server && gleam test 1011 + cd ../atproto_car && gleam test 1012 + ``` 1013 + 1014 + Expected: All tests pass in both packages 1015 + 1016 + **Step 2: Verify clean git status** 1017 + 1018 + Run: `git status` 1019 + Expected: Clean working tree 1020 + 1021 + **Step 3: Review commits** 1022 + 1023 + Run: `git log --oneline -10` 1024 + Expected: Clear progression of commits showing the extraction 1025 + 1026 + --- 1027 + 1028 + ## Summary 1029 + 1030 + After completing all tasks: 1031 + 1032 + 1. `atproto_car/` exists at repo root as standalone Gleam package 1033 + 2. Server depends on it via path dependency 1034 + 3. Old `server/src/car/` is deleted 1035 + 4. All tests pass 1036 + 5. Package is ready for eventual Hex publication
+1
server/gleam.toml
··· 14 14 15 15 [dependencies] 16 16 lexicon_graphql = { path = "../lexicon_graphql" } 17 + atproto_car = { path = "../atproto_car" } 17 18 gleam_stdlib = ">= 0.60.0 and < 1.0.0" 18 19 mist = ">= 5.0.3 and < 6.0.0" 19 20 wisp = ">= 2.1.0 and < 3.0.0"
+4 -2
server/manifest.toml
··· 3 3 4 4 packages = [ 5 5 { name = "argv", version = "1.0.2", build_tools = ["gleam"], requirements = [], otp_app = "argv", source = "hex", outer_checksum = "BA1FF0929525DEBA1CE67256E5ADF77A7CDDFE729E3E3F57A5BDCAA031DED09D" }, 6 + { name = "atproto_car", version = "0.1.0", build_tools = ["gleam"], requirements = ["base32", "erl_cbor", "gleam_stdlib"], source = "local", path = "../atproto_car" }, 6 7 { name = "base32", version = "1.0.0", build_tools = ["rebar3"], requirements = [], otp_app = "base32", source = "hex", outer_checksum = "0449285348ED0C4CD83C7198E76C5FD5A0451C4EF18695B9FD43792A503E551A" }, 7 8 { name = "certifi", version = "2.15.0", build_tools = ["rebar3"], requirements = [], otp_app = "certifi", source = "hex", outer_checksum = "B147ED22CE71D72EAFDAD94F055165C1C182F61A2FF49DF28BCC71D1D5B94A60" }, 8 9 { name = "directories", version = "1.2.0", build_tools = ["gleam"], requirements = ["envoy", "gleam_stdlib", "platform", "simplifile"], otp_app = "directories", source = "hex", outer_checksum = "D13090CFCDF6759B87217E8DDD73A75903A700148A82C1D33799F333E249BF9E" }, ··· 30 31 { name = "gramps", version = "6.0.0", build_tools = ["gleam"], requirements = ["gleam_crypto", "gleam_erlang", "gleam_http", "gleam_stdlib"], otp_app = "gramps", source = "hex", outer_checksum = "8B7195978FBFD30B43DF791A8A272041B81E45D245314D7A41FC57237AA882A0" }, 31 32 { name = "group_registry", version = "1.0.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_otp", "gleam_stdlib"], otp_app = "group_registry", source = "hex", outer_checksum = "BC798A53D6F2406DB94E27CB45C57052CB56B32ACF7CC16EA20F6BAEC7E36B90" }, 32 33 { name = "hackney", version = "1.25.0", build_tools = ["rebar3"], requirements = ["certifi", "idna", "metrics", "mimerl", "parse_trans", "ssl_verify_fun", "unicode_util_compat"], otp_app = "hackney", source = "hex", outer_checksum = "7209BFD75FD1F42467211FF8F59EA74D6F2A9E81CBCEE95A56711EE79FD6B1D4" }, 33 - { name = "honk", version = "1.2.0", build_tools 
= ["gleam"], requirements = ["argv", "gleam_json", "gleam_regexp", "gleam_stdlib", "gleam_time", "simplifile"], source = "local", path = "../../honk" }, 34 + { name = "honk", version = "1.2.0", build_tools = ["gleam"], requirements = ["argv", "gleam_json", "gleam_regexp", "gleam_stdlib", "gleam_time", "simplifile"], otp_app = "honk", source = "hex", outer_checksum = "1682706E9B16A58DA004655CD8154514E4B1013B0335EE109BD93BBDABB62A41" }, 34 35 { name = "houdini", version = "1.2.0", build_tools = ["gleam"], requirements = [], otp_app = "houdini", source = "hex", outer_checksum = "5DB1053F1AF828049C2B206D4403C18970ABEF5C18671CA3C2D2ED0DD64F6385" }, 35 36 { name = "hpack_erl", version = "0.3.0", build_tools = ["rebar3"], requirements = [], otp_app = "hpack", source = "hex", outer_checksum = "D6137D7079169D8C485C6962DFE261AF5B9EF60FBC557344511C1E65E3D95FB0" }, 36 37 { name = "idna", version = "6.1.1", build_tools = ["rebar3"], requirements = ["unicode_util_compat"], otp_app = "idna", source = "hex", outer_checksum = "92376EB7894412ED19AC475E4A86F7B413C1B9FBB5BD16DCCD57934157944CEA" }, ··· 56 57 57 58 [requirements] 58 59 argv = { version = ">= 1.0.0 and < 2.0.0" } 60 + atproto_car = { path = "../atproto_car" } 59 61 base32 = { version = ">= 1.0.0 and < 2.0.0" } 60 62 dotenv_gleam = { version = ">= 2.0.1 and < 3.0.0" } 61 63 envoy = { version = ">= 1.0.2 and < 2.0.0" } ··· 72 74 gleeunit = { version = ">= 1.0.0 and < 2.0.0" } 73 75 goose = { version = ">= 2.0.0 and < 3.0.0" } 74 76 group_registry = { version = ">= 1.0.0 and < 2.0.0" } 75 - honk = { path = "../../honk" } 77 + honk = { version = ">= 1.0.0 and < 2.0.0" } 76 78 jose = { version = ">= 1.11.10 and < 2.0.0" } 77 79 lexicon_graphql = { path = "../lexicon_graphql" } 78 80 logging = { version = ">= 1.3.0 and < 2.0.0" }
+5 -6
server/src/backfill.gleam
··· 1 - import car/car 2 - import car/cid 1 + import atproto_car 3 2 import database/repositories/actors 4 3 import database/repositories/config as config_repo 5 4 import database/repositories/lexicons ··· 210 209 211 210 /// Convert a CAR record (with proper MST path) to a database Record type 212 211 fn car_record_with_path_to_db_record( 213 - car_record: car.RecordWithPath, 212 + car_record: atproto_car.RecordWithPath, 214 213 did: String, 215 214 ) -> Record { 216 215 let now = ··· 224 223 <> car_record.collection 225 224 <> "/" 226 225 <> car_record.rkey, 227 - cid: cid.to_string(car_record.cid), 226 + cid: atproto_car.cid_to_string(car_record.cid), 228 227 did: did, 229 228 collection: car_record.collection, 230 - json: car.record_to_json(car_record), 229 + json: atproto_car.record_to_json(car_record), 231 230 indexed_at: now, 232 231 ) 233 232 } ··· 336 335 337 336 // Phase 2: Parse CAR and walk MST 338 337 let parse_start = monotonic_now() 339 - case car.extract_records_with_paths(car_bytes, collections) { 338 + case atproto_car.extract_records_with_paths(car_bytes, collections) { 340 339 Ok(car_records) -> { 341 340 let parse_ms = elapsed_ms(parse_start) 342 341
+5 -5
server/src/car/blockstore.gleam atproto_car/src/atproto_car/internal/blockstore.gleam
··· 4 4 /// and reads block data on-demand when requested. 5 5 /// 6 6 /// Uses ETS for O(1) lookups and binary CID keys (no hex conversion). 7 - import car/cid.{type Cid} 8 - import car/varint 7 + import atproto_car/internal/cid.{type Cid} 8 + import atproto_car/internal/varint 9 9 import gleam/result 10 10 11 11 /// Opaque type for ETS table reference ··· 25 25 } 26 26 27 27 /// ETS FFI functions 28 - @external(erlang, "car_ffi", "ets_new") 28 + @external(erlang, "blockstore_ffi", "ets_new") 29 29 fn ets_new() -> EtsTable 30 30 31 - @external(erlang, "car_ffi", "ets_insert") 31 + @external(erlang, "blockstore_ffi", "ets_insert") 32 32 fn ets_insert(table: EtsTable, key: BitArray, value: Int) -> Nil 33 33 34 - @external(erlang, "car_ffi", "ets_get") 34 + @external(erlang, "blockstore_ffi", "ets_get") 35 35 fn ets_get(table: EtsTable, key: BitArray) -> Result(Int, Nil) 36 36 37 37 /// Build a blockstore from CAR bytes (after header has been parsed)
-626
server/src/car/car.gleam
··· 1 - /// CAR (Content Addressable aRchive) file parsing 2 - /// 3 - /// CAR format: 4 - /// - varint(header_length) + CBOR header with {"version": 1, "roots": [CID]} 5 - /// - Sequence of blocks: varint(cid_len + data_len) + CID bytes + data bytes 6 - import car/blockstore.{type BlockStore} 7 - import car/cbor 8 - import car/cid.{type Cid} 9 - import car/varint 10 - import gleam/dynamic.{type Dynamic} 11 - import gleam/dynamic/decode 12 - import gleam/int 13 - import gleam/list 14 - import gleam/result 15 - import gleam/string 16 - import logging 17 - 18 - /// Opaque type for monotonic time 19 - pub type MonotonicTime 20 - 21 - /// Get current monotonic time for timing measurements 22 - @external(erlang, "backfill_ffi", "monotonic_now") 23 - fn monotonic_now() -> MonotonicTime 24 - 25 - /// Get elapsed milliseconds since a start time 26 - @external(erlang, "backfill_ffi", "elapsed_ms") 27 - fn elapsed_ms(start: MonotonicTime) -> Int 28 - 29 - /// A parsed CAR block containing CID and raw data 30 - pub type CarBlock { 31 - CarBlock(cid: Cid, data: BitArray) 32 - } 33 - 34 - /// A record extracted from a CAR file (old style, no path) 35 - pub type Record { 36 - Record(type_: String, data: Dynamic, cid: Cid) 37 - } 38 - 39 - /// A record with its full path from the MST 40 - pub type RecordWithPath { 41 - RecordWithPath( 42 - path: String, 43 - collection: String, 44 - rkey: String, 45 - type_: String, 46 - data: Dynamic, 47 - cid: Cid, 48 - ) 49 - } 50 - 51 - /// CAR parsing errors 52 - pub type CarError { 53 - InvalidHeader 54 - InvalidBlock 55 - InvalidVarint 56 - CborDecodeError 57 - CidParseError(cid.CidError) 58 - } 59 - 60 - /// Parse CAR header, return roots and remaining bytes 61 - pub fn parse_header(bytes: BitArray) -> Result(#(List(Cid), BitArray), CarError) { 62 - // Read header length varint 63 - use #(header_len, rest) <- result.try( 64 - varint.decode(bytes) |> result.replace_error(InvalidVarint), 65 - ) 66 - 67 - // Extract header bytes and decode 
CBOR 68 - case rest { 69 - <<header_bytes:bytes-size(header_len), remaining:bits>> -> { 70 - case cbor.decode(header_bytes) { 71 - Ok(_header) -> { 72 - // Extract roots from header (we don't validate them for now) 73 - // Header is {"version": 1, "roots": [CID, ...]} 74 - // For simple extraction, we just return empty roots and remaining bytes 75 - Ok(#([], remaining)) 76 - } 77 - Error(_) -> Error(CborDecodeError) 78 - } 79 - } 80 - _ -> Error(InvalidHeader) 81 - } 82 - } 83 - 84 - /// Parse a single block from bytes 85 - /// Returns the block and remaining bytes 86 - fn parse_block(bytes: BitArray) -> Result(#(CarBlock, BitArray), CarError) { 87 - // Read block length varint (includes CID + data) 88 - use #(block_len, rest) <- result.try( 89 - varint.decode(bytes) |> result.replace_error(InvalidVarint), 90 - ) 91 - 92 - case rest { 93 - <<block_bytes:bytes-size(block_len), remaining:bits>> -> { 94 - // Parse CID from start of block 95 - case cid.parse(block_bytes) { 96 - Ok(#(parsed_cid, data)) -> { 97 - Ok(#(CarBlock(cid: parsed_cid, data: data), remaining)) 98 - } 99 - Error(e) -> Error(CidParseError(e)) 100 - } 101 - } 102 - _ -> Error(InvalidBlock) 103 - } 104 - } 105 - 106 - /// Fold over all blocks in a CAR file 107 - pub fn fold_blocks( 108 - bytes: BitArray, 109 - acc: a, 110 - f: fn(a, CarBlock) -> a, 111 - ) -> Result(a, CarError) { 112 - case bytes { 113 - <<>> -> Ok(acc) 114 - _ -> { 115 - use #(block, rest) <- result.try(parse_block(bytes)) 116 - fold_blocks(rest, f(acc, block), f) 117 - } 118 - } 119 - } 120 - 121 - /// Try to decode a block as a record 122 - /// Records have a $type field, MST nodes and commits don't 123 - fn decode_as_record(block: CarBlock) -> Result(Record, Nil) { 124 - case cbor.decode(block.data) { 125 - Ok(decoded) -> { 126 - // Try to get $type field - records have it, other blocks don't 127 - // Using the new decode module API 128 - let type_decoder = { 129 - use type_ <- decode.field("$type", decode.string) 130 - 
decode.success(type_) 131 - } 132 - case decode.run(decoded, type_decoder) { 133 - Ok(type_) -> Ok(Record(type_: type_, data: decoded, cid: block.cid)) 134 - Error(_) -> Error(Nil) 135 - } 136 - } 137 - Error(_) -> Error(Nil) 138 - } 139 - } 140 - 141 - /// Extract records matching given collections from CAR bytes 142 - pub fn extract_records( 143 - car_bytes: BitArray, 144 - collections: List(String), 145 - ) -> Result(List(Record), CarError) { 146 - // Parse header to get past it 147 - use #(_roots, blocks_bytes) <- result.try(parse_header(car_bytes)) 148 - 149 - // Fold over blocks, collecting matching records 150 - fold_blocks(blocks_bytes, [], fn(acc, block) { 151 - case decode_as_record(block) { 152 - Ok(record) -> { 153 - case list.contains(collections, record.type_) { 154 - True -> [record, ..acc] 155 - False -> acc 156 - } 157 - } 158 - Error(_) -> acc 159 - } 160 - }) 161 - } 162 - 163 - /// Extract all records from CAR bytes (no collection filtering) 164 - pub fn extract_all_records( 165 - car_bytes: BitArray, 166 - ) -> Result(List(Record), CarError) { 167 - use #(_roots, blocks_bytes) <- result.try(parse_header(car_bytes)) 168 - 169 - fold_blocks(blocks_bytes, [], fn(acc, block) { 170 - case decode_as_record(block) { 171 - Ok(record) -> [record, ..acc] 172 - Error(_) -> acc 173 - } 174 - }) 175 - } 176 - 177 - /// Extract records with their MST paths from CAR bytes 178 - /// This properly walks the MST to get collection/rkey paths 179 - /// Uses lazy blockstore - indexes CID offsets but only loads data on demand 180 - pub fn extract_records_with_paths( 181 - car_bytes: BitArray, 182 - collections: List(String), 183 - ) -> Result(List(RecordWithPath), CarError) { 184 - let total_start = monotonic_now() 185 - 186 - // Phase 1: Parse header and get root CID 187 - let header_start = monotonic_now() 188 - use #(roots, header_size, blocks_bytes) <- result.try(parse_header_with_roots( 189 - car_bytes, 190 - )) 191 - let header_ms = elapsed_ms(header_start) 
192 - 193 - logging.log( 194 - logging.Debug, 195 - "[car] Parsed header, found " 196 - <> string.inspect(list.length(roots)) 197 - <> " roots", 198 - ) 199 - 200 - // Phase 2: Build lazy blockstore - indexes CID -> offset without loading block data 201 - let index_start = monotonic_now() 202 - use store <- result.try( 203 - blockstore.from_blocks_bytes(car_bytes, header_size, blocks_bytes) 204 - |> result.map_error(fn(_) { InvalidBlock }), 205 - ) 206 - let index_ms = elapsed_ms(index_start) 207 - let block_count = blockstore.size(store) 208 - 209 - logging.log( 210 - logging.Debug, 211 - "[car] Built block index with " <> string.inspect(block_count) <> " blocks", 212 - ) 213 - 214 - // Get the commit block (first root) 215 - case roots { 216 - [] -> { 217 - logging.log(logging.Debug, "[car] No roots found in CAR header") 218 - Ok([]) 219 - } 220 - [commit_cid, ..] -> { 221 - // Get commit block and find MST root 222 - logging.log(logging.Debug, "[car] Looking for commit block") 223 - case blockstore.get(store, commit_cid) { 224 - Error(_) -> { 225 - logging.log( 226 - logging.Debug, 227 - "[car] Commit block not found in blockstore", 228 - ) 229 - Ok([]) 230 - } 231 - Ok(commit_data) -> { 232 - logging.log( 233 - logging.Debug, 234 - "[car] Found commit block, looking for data CID", 235 - ) 236 - case get_data_cid(commit_data) { 237 - Error(_) -> { 238 - logging.log( 239 - logging.Debug, 240 - "[car] Could not get data CID from commit", 241 - ) 242 - Ok([]) 243 - } 244 - Ok(mst_root_cid) -> { 245 - logging.log(logging.Debug, "[car] Found MST root, walking tree") 246 - 247 - // Phase 3: Walk MST to get paths (lazy - loads blocks on demand) 248 - let mst_start = monotonic_now() 249 - let mst_entries = walk_mst(store, mst_root_cid) 250 - let mst_ms = elapsed_ms(mst_start) 251 - let mst_count = list.length(mst_entries) 252 - 253 - logging.log( 254 - logging.Debug, 255 - "[car] MST walk returned " 256 - <> string.inspect(mst_count) 257 - <> " entries", 258 - ) 259 - 
260 - // Phase 4: For each MST entry, get the record data and filter by collection 261 - let extract_start = monotonic_now() 262 - let records = 263 - list.filter_map(mst_entries, fn(entry) { 264 - // Split path into collection/rkey 265 - case string.split_once(entry.path, "/") { 266 - Error(_) -> Error(Nil) 267 - Ok(#(collection, rkey)) -> { 268 - // Filter by collection 269 - case list.contains(collections, collection) { 270 - False -> Error(Nil) 271 - True -> { 272 - // Get record data (lazy load from blockstore) 273 - case blockstore.get(store, entry.cid) { 274 - Error(_) -> Error(Nil) 275 - Ok(record_data) -> { 276 - case cbor.decode(record_data) { 277 - Error(_) -> Error(Nil) 278 - Ok(decoded) -> { 279 - // Get $type 280 - case get_type_field(decoded) { 281 - Error(_) -> Error(Nil) 282 - Ok(type_) -> 283 - Ok(RecordWithPath( 284 - path: entry.path, 285 - collection: collection, 286 - rkey: rkey, 287 - type_: type_, 288 - data: decoded, 289 - cid: entry.cid, 290 - )) 291 - } 292 - } 293 - } 294 - } 295 - } 296 - } 297 - } 298 - } 299 - } 300 - }) 301 - let extract_ms = elapsed_ms(extract_start) 302 - let total_ms = elapsed_ms(total_start) 303 - 304 - // Log timing breakdown 305 - logging.log( 306 - logging.Debug, 307 - "[car] parse timing: header=" 308 - <> int.to_string(header_ms) 309 - <> "ms index=" 310 - <> int.to_string(index_ms) 311 - <> "ms mst=" 312 - <> int.to_string(mst_ms) 313 - <> "ms extract=" 314 - <> int.to_string(extract_ms) 315 - <> "ms total=" 316 - <> int.to_string(total_ms) 317 - <> "ms blocks=" 318 - <> int.to_string(block_count) 319 - <> " mst_entries=" 320 - <> int.to_string(mst_count) 321 - <> " records=" 322 - <> int.to_string(list.length(records)), 323 - ) 324 - 325 - Ok(records) 326 - } 327 - } 328 - } 329 - } 330 - } 331 - } 332 - } 333 - 334 - /// MST entry - path and CID 335 - type MstEntry { 336 - MstEntry(path: String, cid: Cid) 337 - } 338 - 339 - /// Parse header and extract root CIDs, also returns header size for blockstore 
340 - fn parse_header_with_roots( 341 - bytes: BitArray, 342 - ) -> Result(#(List(Cid), Int, BitArray), CarError) { 343 - use #(header_len, rest) <- result.try( 344 - varint.decode(bytes) |> result.replace_error(InvalidVarint), 345 - ) 346 - 347 - // Calculate header size: varint size + header content 348 - let varint_size = varint.encoded_size(header_len) 349 - let header_size = varint_size + header_len 350 - 351 - case rest { 352 - <<header_bytes:bytes-size(header_len), remaining:bits>> -> { 353 - case cbor.decode(header_bytes) { 354 - Ok(header) -> { 355 - let roots = get_roots_from_header(header) 356 - Ok(#(roots, header_size, remaining)) 357 - } 358 - Error(_) -> Error(CborDecodeError) 359 - } 360 - } 361 - _ -> Error(InvalidHeader) 362 - } 363 - } 364 - 365 - /// Extract roots array from CAR header 366 - fn get_roots_from_header(header: Dynamic) -> List(Cid) { 367 - logging.log(logging.Debug, "[car] Header data: " <> string.inspect(header)) 368 - let decoder = { 369 - use roots <- decode.field("roots", decode.list(decode.dynamic)) 370 - decode.success(roots) 371 - } 372 - case decode.run(header, decoder) { 373 - Ok(roots) -> { 374 - logging.log( 375 - logging.Debug, 376 - "[car] Found " <> string.inspect(list.length(roots)) <> " root entries", 377 - ) 378 - list.filter_map(roots, fn(r) { 379 - logging.log(logging.Debug, "[car] Root entry: " <> string.inspect(r)) 380 - parse_cid_from_dynamic(r) 381 - }) 382 - } 383 - Error(e) -> { 384 - logging.log( 385 - logging.Debug, 386 - "[car] Failed to decode roots: " <> string.inspect(e), 387 - ) 388 - [] 389 - } 390 - } 391 - } 392 - 393 - /// Parse CID from dynamic (tag 42 tuple) 394 - fn parse_cid_from_dynamic(data: Dynamic) -> Result(Cid, Nil) { 395 - case decode_cid_bytes(data) { 396 - Ok(bytes) -> { 397 - case cid.parse(bytes) { 398 - Ok(#(parsed, _)) -> Ok(parsed) 399 - Error(_) -> Error(Nil) 400 - } 401 - } 402 - Error(_) -> Error(Nil) 403 - } 404 - } 405 - 406 - @external(erlang, "cid_ffi", "decode_cid_bytes") 
407 - fn decode_cid_bytes(data: Dynamic) -> Result(BitArray, Nil) 408 - 409 - /// Get "data" CID from commit block (MST root) 410 - fn get_data_cid(commit_data: BitArray) -> Result(Cid, Nil) { 411 - case cbor.decode(commit_data) { 412 - Ok(decoded) -> { 413 - let decoder = { 414 - use data <- decode.field("data", decode.dynamic) 415 - decode.success(data) 416 - } 417 - case decode.run(decoded, decoder) { 418 - Ok(data) -> parse_cid_from_dynamic(data) 419 - Error(_) -> Error(Nil) 420 - } 421 - } 422 - Error(_) -> Error(Nil) 423 - } 424 - } 425 - 426 - /// Get $type field from decoded record 427 - fn get_type_field(data: Dynamic) -> Result(String, Nil) { 428 - let decoder = { 429 - use type_ <- decode.field("$type", decode.string) 430 - decode.success(type_) 431 - } 432 - case decode.run(data, decoder) { 433 - Ok(type_) -> Ok(type_) 434 - Error(_) -> Error(Nil) 435 - } 436 - } 437 - 438 - /// Walk MST from root CID, returning all entries with their paths 439 - /// Uses lazy blockstore - blocks are loaded on-demand during traversal 440 - fn walk_mst(store: BlockStore, root_cid: Cid) -> List(MstEntry) { 441 - walk_mst_node(store, root_cid, "") 442 - } 443 - 444 - /// Walk a single MST node recursively 445 - /// Loads blocks from blockstore on-demand (lazy) 446 - fn walk_mst_node( 447 - store: BlockStore, 448 - node_cid: Cid, 449 - prev_key: String, 450 - ) -> List(MstEntry) { 451 - case blockstore.get(store, node_cid) { 452 - Error(_) -> [] 453 - Ok(node_data) -> { 454 - case parse_mst_node(node_data) { 455 - Error(_) -> [] 456 - Ok(node) -> { 457 - // Process left subtree 458 - let left_entries = case node.left { 459 - Error(_) -> [] 460 - Ok(left_cid) -> walk_mst_node(store, left_cid, prev_key) 461 - } 462 - 463 - // Process entries 464 - let #(entry_results, _) = 465 - list.fold(node.entries, #([], prev_key), fn(acc, entry) { 466 - let #(entries, last_key) = acc 467 - 468 - // Build full key from prefix + suffix 469 - let prefix = string.slice(last_key, 0, 
entry.prefix_len) 470 - let suffix = bit_array_to_string(entry.key_suffix) 471 - let full_key = prefix <> suffix 472 - 473 - // Add this entry 474 - let this_entry = MstEntry(path: full_key, cid: entry.val) 475 - 476 - // Process right subtree 477 - let right_entries = case entry.tree { 478 - Error(_) -> [] 479 - Ok(tree_cid) -> walk_mst_node(store, tree_cid, full_key) 480 - } 481 - 482 - #(list.flatten([entries, [this_entry], right_entries]), full_key) 483 - }) 484 - 485 - list.flatten([left_entries, entry_results]) 486 - } 487 - } 488 - } 489 - } 490 - } 491 - 492 - fn bit_array_to_string(bytes: BitArray) -> String { 493 - case do_bit_array_to_string(bytes) { 494 - Ok(s) -> s 495 - Error(_) -> "" 496 - } 497 - } 498 - 499 - @external(erlang, "cid_ffi", "decode_binary") 500 - fn do_bit_array_to_string(bytes: BitArray) -> Result(String, Nil) 501 - 502 - /// Parsed MST node 503 - type MstNode { 504 - MstNode(left: Result(Cid, Nil), entries: List(MstNodeEntry)) 505 - } 506 - 507 - /// MST node entry 508 - type MstNodeEntry { 509 - MstNodeEntry( 510 - prefix_len: Int, 511 - key_suffix: BitArray, 512 - val: Cid, 513 - tree: Result(Cid, Nil), 514 - ) 515 - } 516 - 517 - /// Parse MST node from CBOR data 518 - fn parse_mst_node(data: BitArray) -> Result(MstNode, Nil) { 519 - case cbor.decode(data) { 520 - Error(_) -> Error(Nil) 521 - Ok(decoded) -> { 522 - let left = get_optional_cid_field(decoded, "l") 523 - let entries = get_entries(decoded) 524 - Ok(MstNode(left: left, entries: entries)) 525 - } 526 - } 527 - } 528 - 529 - /// Get optional CID field from dynamic 530 - fn get_optional_cid_field(data: Dynamic, field: String) -> Result(Cid, Nil) { 531 - let decoder = { 532 - use val <- decode.field(field, decode.dynamic) 533 - decode.success(val) 534 - } 535 - case decode.run(data, decoder) { 536 - Ok(val) -> parse_cid_from_dynamic(val) 537 - Error(_) -> Error(Nil) 538 - } 539 - } 540 - 541 - /// Get entries array from MST node 542 - fn get_entries(data: Dynamic) -> 
List(MstNodeEntry) { 543 - let decoder = { 544 - use entries <- decode.field("e", decode.list(decode.dynamic)) 545 - decode.success(entries) 546 - } 547 - case decode.run(data, decoder) { 548 - Ok(entries) -> list.filter_map(entries, parse_mst_entry) 549 - Error(_) -> [] 550 - } 551 - } 552 - 553 - /// Parse a single MST entry 554 - fn parse_mst_entry(data: Dynamic) -> Result(MstNodeEntry, Nil) { 555 - // Get prefix length 556 - let p_decoder = { 557 - use p <- decode.field("p", decode.int) 558 - decode.success(p) 559 - } 560 - 561 - case decode.run(data, p_decoder) { 562 - Error(_) -> Error(Nil) 563 - Ok(p) -> { 564 - // Get key suffix 565 - case get_binary_field(data, "k") { 566 - Error(_) -> Error(Nil) 567 - Ok(k) -> { 568 - // Get value CID 569 - case get_optional_cid_field(data, "v") { 570 - Error(_) -> Error(Nil) 571 - Ok(v) -> { 572 - // Get optional tree CID 573 - let t = get_optional_cid_field(data, "t") 574 - Ok(MstNodeEntry(prefix_len: p, key_suffix: k, val: v, tree: t)) 575 - } 576 - } 577 - } 578 - } 579 - } 580 - } 581 - } 582 - 583 - /// Get binary field from dynamic 584 - fn get_binary_field(data: Dynamic, field: String) -> Result(BitArray, Nil) { 585 - let decoder = { 586 - use val <- decode.field(field, decode.dynamic) 587 - decode.success(val) 588 - } 589 - case decode.run(data, decoder) { 590 - Ok(val) -> decode_binary_dyn(val) 591 - Error(_) -> Error(Nil) 592 - } 593 - } 594 - 595 - @external(erlang, "cid_ffi", "decode_binary") 596 - fn decode_binary_dyn(data: Dynamic) -> Result(BitArray, Nil) 597 - 598 - /// Convert a CAR record's CBOR data to JSON string 599 - /// Sanitizes CID links (tag 42) to {$link: "base32cid"} format 600 - pub fn record_to_json(record: RecordWithPath) -> String { 601 - dynamic_to_json(record.data) 602 - } 603 - 604 - /// Convert a Dynamic value (Erlang term) to JSON string 605 - fn dynamic_to_json(value: Dynamic) -> String { 606 - let sanitized = sanitize_for_json(value) 607 - let iolist = do_json_encode(sanitized) 608 
- iolist_to_string(iolist) 609 - } 610 - 611 - @external(erlang, "cbor_ffi", "sanitize_for_json") 612 - fn sanitize_for_json(value: Dynamic) -> Dynamic 613 - 614 - @external(erlang, "json", "encode") 615 - fn do_json_encode(value: Dynamic) -> Dynamic 616 - 617 - @external(erlang, "erlang", "iolist_to_binary") 618 - fn iolist_to_binary(iolist: Dynamic) -> Dynamic 619 - 620 - fn iolist_to_string(iolist: Dynamic) -> String { 621 - let binary = iolist_to_binary(iolist) 622 - case decode.run(binary, decode.string) { 623 - Ok(str) -> str 624 - Error(_) -> "" 625 - } 626 - }
-30
server/src/car/car_ffi.erl
··· 1 - -module(car_ffi). 2 - -export([bytes_to_hex_lower/1, ets_new/0, ets_insert/3, ets_get/2, ets_size/1]). 3 - 4 - %% Convert binary to lowercase hex string using OTP 24+ binary:encode_hex 5 - %% This is O(n) vs O(n²) for string concatenation in a loop 6 - bytes_to_hex_lower(Bytes) -> 7 - binary:encode_hex(Bytes, lowercase). 8 - 9 - %% Create a new ETS table for blockstore 10 - %% Returns an opaque reference to the table 11 - ets_new() -> 12 - ets:new(blockstore, [set, public, {read_concurrency, true}]). 13 - 14 - %% Insert a key-value pair into ETS table 15 - %% Key is binary (CID bytes), Value is integer (offset) 16 - ets_insert(Table, Key, Value) -> 17 - ets:insert(Table, {Key, Value}), 18 - nil. 19 - 20 - %% Get a value from ETS table by key 21 - %% Returns {ok, Value} or {error, nil} 22 - ets_get(Table, Key) -> 23 - case ets:lookup(Table, Key) of 24 - [{_, Value}] -> {ok, Value}; 25 - [] -> {error, nil} 26 - end. 27 - 28 - %% Get the number of entries in the ETS table 29 - ets_size(Table) -> 30 - ets:info(Table, size).
server/src/car/cbor.gleam atproto_car/src/atproto_car/internal/cbor.gleam
server/src/car/cbor_ffi.erl atproto_car/src/atproto_car/internal/cbor_ffi.erl
+1 -1
server/src/car/cid.gleam atproto_car/src/atproto_car/internal/cid.gleam
··· 4 4 /// - version (varint): CIDv0 or CIDv1 5 5 /// - codec (varint): Content codec (0x71 = dag-cbor for AT Protocol) 6 6 /// - multihash: hash_type (varint) + hash_len (varint) + digest (bytes) 7 - import car/varint 7 + import atproto_car/internal/varint 8 8 import gleam/bit_array 9 9 import gleam/result 10 10
server/src/car/cid_ffi.erl atproto_car/src/atproto_car/internal/cid_ffi.erl
server/src/car/varint.gleam atproto_car/src/atproto_car/internal/varint.gleam