tangled
alpha
login
or
join now
nonbinary.computer
/
jacquard
80
fork
atom
A better Rust ATProto crate
80
fork
atom
overview
issues
9
pulls
pipelines
error improvements
Orual
4 months ago
d989e800
3adf93f9
+109
-47
7 changed files
expand all
collapse all
unified
split
crates
jacquard-repo
src
car
reader.rs
writer.rs
commit
firehose.rs
error.rs
mst
diff.rs
tree.rs
repo.rs
+13
-4
crates/jacquard-repo/src/car/reader.rs
···
27
27
/// Returns BTreeMap of CID -> block data (sorted order for determinism).
28
28
/// For large CAR files, consider using `stream_car()` instead.
29
29
pub async fn read_car(path: impl AsRef<Path>) -> Result<BTreeMap<IpldCid, Bytes>> {
30
30
-
let file = File::open(path).await.map_err(|e| RepoError::io(e))?;
30
30
+
let path = path.as_ref();
31
31
+
let file = File::open(path)
32
32
+
.await
33
33
+
.map_err(|e| RepoError::io(e).with_context(format!("opening CAR file: {}", path.display())))?;
31
34
32
35
let reader = CarReader::new(file).await.map_err(|e| RepoError::car(e))?;
33
36
···
47
50
///
48
51
/// Useful for checking roots without loading all blocks.
49
52
pub async fn read_car_header(path: impl AsRef<Path>) -> Result<Vec<IpldCid>> {
50
50
-
let file = File::open(path).await.map_err(|e| RepoError::io(e))?;
53
53
+
let path = path.as_ref();
54
54
+
let file = File::open(path)
55
55
+
.await
56
56
+
.map_err(|e| RepoError::io(e).with_context(format!("opening CAR file: {}", path.display())))?;
51
57
52
58
let reader = CarReader::new(file).await.map_err(|e| RepoError::car(e))?;
53
59
···
67
73
let root = roots
68
74
.first()
69
75
.copied()
70
70
-
.ok_or_else(|| RepoError::invalid("CAR file has no roots"))?;
76
76
+
.ok_or_else(|| RepoError::car_invalid("CAR file has no roots"))?;
71
77
72
78
let mut blocks = BTreeMap::new();
73
79
let stream = reader.stream();
···
85
91
///
86
92
/// Useful for processing large CAR files incrementally.
87
93
pub async fn stream_car(path: impl AsRef<Path>) -> Result<CarBlockStream> {
88
88
-
let file = File::open(path).await.map_err(|e| RepoError::io(e))?;
94
94
+
let path = path.as_ref();
95
95
+
let file = File::open(path)
96
96
+
.await
97
97
+
.map_err(|e| RepoError::io(e).with_context(format!("opening CAR file: {}", path.display())))?;
89
98
90
99
let reader = CarReader::new(file).await.map_err(|e| RepoError::car(e))?;
91
100
+19
-10
crates/jacquard-repo/src/car/writer.rs
···
22
22
roots: Vec<IpldCid>,
23
23
blocks: BTreeMap<IpldCid, Bytes>,
24
24
) -> Result<()> {
25
25
-
let file = File::create(path).await.map_err(|e| RepoError::io(e))?;
25
25
+
let path = path.as_ref();
26
26
+
let file = File::create(path)
27
27
+
.await
28
28
+
.map_err(|e| RepoError::io(e).with_context(format!("creating CAR file: {}", path.display())))?;
26
29
27
30
let header = iroh_car::CarHeader::new_v1(roots);
28
31
let mut writer = CarWriter::new(header, file);
···
31
34
writer
32
35
.write(cid, data.as_ref())
33
36
.await
34
34
-
.map_err(|e| RepoError::car(e))?;
37
37
+
.map_err(|e| RepoError::car(e).with_context(format!("writing block {}", cid)))?;
35
38
}
36
39
37
37
-
writer.finish().await.map_err(|e| RepoError::car(e))?;
40
40
+
writer.finish().await.map_err(|e| RepoError::car(e).with_context("finalizing CAR file"))?;
38
41
39
42
Ok(())
40
43
}
···
52
55
writer
53
56
.write(cid, data.as_ref())
54
57
.await
55
55
-
.map_err(|e| RepoError::car(e))?;
58
58
+
.map_err(|e| RepoError::car(e).with_context(format!("writing block {}", cid)))?;
56
59
}
57
60
58
58
-
writer.finish().await.map_err(|e| RepoError::car(e))?;
61
61
+
writer.finish().await.map_err(|e| RepoError::car(e).with_context("finalizing CAR bytes"))?;
59
62
60
60
-
buffer.flush().await.map_err(|e| RepoError::io(e))?;
63
63
+
buffer.flush().await.map_err(|e| RepoError::io(e).with_context("flushing CAR buffer"))?;
61
64
62
65
Ok(buffer)
63
66
}
···
75
78
commit_cid: IpldCid,
76
79
mst: &Mst<S>,
77
80
) -> Result<()> {
78
78
-
let file = File::create(path).await.map_err(|e| RepoError::io(e))?;
81
81
+
let path = path.as_ref();
82
82
+
let file = File::create(path)
83
83
+
.await
84
84
+
.map_err(|e| RepoError::io(e).with_context(format!("creating CAR export file: {}", path.display())))?;
79
85
80
86
let header = iroh_car::CarHeader::new_v1(vec![commit_cid]);
81
87
let mut writer = CarWriter::new(header, file);
···
85
91
let commit_data = storage
86
92
.get(&commit_cid)
87
93
.await?
88
88
-
.ok_or_else(|| RepoError::not_found("commit", &commit_cid))?;
94
94
+
.ok_or_else(|| {
95
95
+
RepoError::not_found("commit", &commit_cid)
96
96
+
.with_help("Commit must be persisted to storage before exporting - ensure apply_commit() was called")
97
97
+
})?;
89
98
90
99
writer
91
100
.write(commit_cid, &commit_data)
92
101
.await
93
93
-
.map_err(|e| RepoError::car(e))?;
102
102
+
.map_err(|e| RepoError::car(e).with_context("writing commit block"))?;
94
103
95
104
// Stream MST and record blocks
96
105
mst.write_blocks_to_car(&mut writer).await?;
97
106
98
107
// Finish writing
99
99
-
writer.finish().await.map_err(|e| RepoError::car(e))?;
108
108
+
writer.finish().await.map_err(|e| RepoError::car(e).with_context("finalizing CAR export"))?;
100
109
101
110
Ok(())
102
111
}
+9
-7
crates/jacquard-repo/src/commit/firehose.rs
···
190
190
let commit_cid: IpldCid = self
191
191
.commit
192
192
.to_ipld()
193
193
-
.map_err(|e| RepoError::invalid(format!("Invalid commit CID: {}", e)))?;
193
193
+
.map_err(|e| RepoError::invalid_cid_conversion(e, "commit CID"))?;
194
194
let commit_bytes = temp_storage
195
195
.get(&commit_cid)
196
196
.await?
···
204
204
"DID mismatch: commit has {}, message has {}",
205
205
commit.did(),
206
206
self.repo
207
207
-
)));
207
207
+
))
208
208
+
.with_help("DID mismatch indicates the commit was signed by a different identity - verify the commit is from the expected repository"));
208
209
}
209
210
210
211
// Verify signature
···
234
235
let computed_root = computed_mst.get_pointer().await?;
235
236
236
237
if computed_root != expected_root {
237
237
-
return Err(RepoError::invalid_commit(format!(
238
238
+
return Err(RepoError::cid_mismatch(format!(
238
239
"MST root mismatch: expected {}, got {}",
239
240
expected_root, computed_root
240
241
)));
···
270
271
RepoError::invalid_commit("Sync v1.1 validation requires prev_data field")
271
272
})?
272
273
.to_ipld()
273
273
-
.map_err(|e| RepoError::invalid(format!("Invalid prev_data CID: {}", e)))?;
274
274
+
.map_err(|e| RepoError::invalid_cid_conversion(e, "prev_data CID"))?;
274
275
275
276
// 2. Parse CAR blocks from the firehose message into temporary storage
276
277
let parsed = parse_car_bytes(&self.blocks).await?;
···
280
281
let commit_cid: IpldCid = self
281
282
.commit
282
283
.to_ipld()
283
283
-
.map_err(|e| RepoError::invalid(format!("Invalid commit CID: {}", e)))?;
284
284
+
.map_err(|e| RepoError::invalid_cid_conversion(e, "commit CID"))?;
284
285
let commit_bytes = temp_storage
285
286
.get(&commit_cid)
286
287
.await?
···
294
295
"DID mismatch: commit has {}, message has {}",
295
296
commit.did(),
296
297
self.repo
297
297
-
)));
298
298
+
))
299
299
+
.with_help("DID mismatch indicates the commit was signed by a different identity - verify the commit is from the expected repository"));
298
300
}
299
301
300
302
// Verify signature
···
318
320
let computed_root = computed_mst.get_pointer().await?;
319
321
320
322
if computed_root != expected_root {
321
321
-
return Err(RepoError::invalid_commit(format!(
323
323
+
return Err(RepoError::cid_mismatch(format!(
322
324
"MST root mismatch: expected {}, got {}",
323
325
expected_root, computed_root
324
326
)));
+33
-1
crates/jacquard-repo/src/error.rs
···
33
33
InvalidKey,
34
34
/// Invalid CID
35
35
InvalidCid,
36
36
+
/// Invalid CID conversion (string/bytes to CID)
37
37
+
InvalidCidConversion,
38
38
+
/// CID mismatch during validation (prev, data, etc.)
39
39
+
CidMismatch,
36
40
/// Resource not found
37
41
NotFound,
38
42
/// Cryptographic operation failed
···
45
49
Car,
46
50
/// I/O error
47
51
Io,
52
52
+
/// Background task failed (panic or cancellation)
53
53
+
TaskFailed,
48
54
}
49
55
50
56
impl RepoError {
···
152
158
/// Create a generic invalid error
153
159
pub fn invalid(msg: impl Into<String>) -> Self {
154
160
Self::new(RepoErrorKind::InvalidMst, Some(msg.into().into()))
161
161
+
}
162
162
+
163
163
+
/// Create an invalid CID conversion error
164
164
+
pub fn invalid_cid_conversion(source: impl Error + Send + Sync + 'static, context: &str) -> Self {
165
165
+
Self::new(RepoErrorKind::InvalidCidConversion, Some(Box::new(source)))
166
166
+
.with_context(context.to_string())
167
167
+
.with_help("CID conversion failed - check that the source data is a valid CIDv1 string or bytes. Common causes: malformed base32 encoding, incorrect multicodec prefix, or invalid multihash.")
168
168
+
}
169
169
+
170
170
+
/// Create a CID mismatch error (for validation failures)
171
171
+
pub fn cid_mismatch(context: impl Into<String>) -> Self {
172
172
+
Self::new(RepoErrorKind::CidMismatch, None)
173
173
+
.with_context(context.into())
174
174
+
.with_help("CID validation failed - the expected and actual CIDs don't match. This typically indicates: data was modified unexpectedly, incorrect prev CID provided for update/delete, or MST root doesn't match commit data field.")
175
175
+
}
176
176
+
177
177
+
/// Create a task failure error (background operations)
178
178
+
pub fn task_failed(source: impl Error + Send + Sync + 'static) -> Self {
179
179
+
Self::new(RepoErrorKind::TaskFailed, Some(Box::new(source)))
180
180
+
.with_help("Background task failed - this usually indicates a panic in concurrent MST operations or task cancellation. Check for logic errors in tree traversal or storage operations.")
181
181
+
}
182
182
+
183
183
+
/// Create a CAR invalid structure error (without wrapping an error)
184
184
+
pub fn car_invalid(msg: impl Into<String>) -> Self {
185
185
+
Self::new(RepoErrorKind::Car, Some(msg.into().into()))
186
186
+
.with_help("CAR file structure is invalid - check that the file has required root CIDs in header and follows CAR v1 format.")
155
187
}
156
188
}
157
189
···
383
415
impl From<ProofError> for RepoError {
384
416
fn from(e: ProofError) -> Self {
385
417
match &e {
386
386
-
ProofError::NoRoot => RepoError::invalid("CAR file has no root CID"),
418
418
+
ProofError::NoRoot => RepoError::car_invalid("CAR file has no root CID"),
387
419
ProofError::CommitNotFound => {
388
420
RepoError::new(RepoErrorKind::NotFound, Some(Box::new(e)))
389
421
}
+2
-1
crates/jacquard-repo/src/mst/diff.rs
···
409
409
// Serialize the MST node
410
410
let entries = tree.get_entries().await?;
411
411
let node_data = serialize_node_data(&entries).await?;
412
412
-
let cbor = serde_ipld_dagcbor::to_vec(&node_data).map_err(|e| RepoError::serialization(e))?;
412
412
+
let cbor = serde_ipld_dagcbor::to_vec(&node_data)
413
413
+
.map_err(|e| RepoError::serialization(e).with_context(format!("serializing MST node for diff tracking: {}", tree_cid)))?;
413
414
414
415
// Track the serialized block
415
416
diff.new_mst_blocks.insert(tree_cid, Bytes::from(cbor));
+17
-14
crates/jacquard-repo/src/mst/tree.rs
···
220
220
) -> Result<Self> {
221
221
// Serialize and compute CID (don't persist yet)
222
222
let node_data = util::serialize_node_data(&entries).await?;
223
223
-
let cbor =
224
224
-
serde_ipld_dagcbor::to_vec(&node_data).map_err(|e| RepoError::serialization(e))?;
223
223
+
let cbor = serde_ipld_dagcbor::to_vec(&node_data)
224
224
+
.map_err(|e| RepoError::serialization(e).with_context("serializing MST node during creation"))?;
225
225
let cid = util::compute_cid(&cbor)?;
226
226
227
227
let mst = Self {
···
276
276
.storage
277
277
.get(&pointer)
278
278
.await?
279
279
-
.ok_or_else(|| RepoError::not_found("MST node", &pointer))?;
279
279
+
.ok_or_else(|| {
280
280
+
RepoError::not_found("MST node", &pointer)
281
281
+
.with_help("MST node missing from storage - ensure all blocks were properly persisted or that the tree CID is correct")
282
282
+
})?;
280
283
281
281
-
let node_data: super::node::NodeData =
282
282
-
serde_ipld_dagcbor::from_slice(&node_bytes).map_err(|e| RepoError::serialization(e))?;
284
284
+
let node_data: super::node::NodeData = serde_ipld_dagcbor::from_slice(&node_bytes)
285
285
+
.map_err(|e| RepoError::serialization(e).with_context(format!("deserializing MST node from storage: {}", pointer)))?;
283
286
284
287
let entries = util::deserialize_node_data(self.storage.clone(), &node_data, self.layer)?;
285
288
···
322
325
if !outdated_children.is_empty() {
323
326
try_join_all(outdated_children)
324
327
.await
325
325
-
.map_err(|e| RepoError::invalid(format!("Task join error: {}", e)))?;
328
328
+
.map_err(|e| RepoError::task_failed(e))?;
326
329
327
330
// Re-fetch entries with updated child CIDs
328
331
entries = self.get_entries().await?;
···
330
333
331
334
// Now serialize and compute CID with fresh child CIDs
332
335
let node_data = util::serialize_node_data(&entries).await?;
333
333
-
let cbor =
334
334
-
serde_ipld_dagcbor::to_vec(&node_data).map_err(|e| RepoError::serialization(e))?;
336
336
+
let cbor = serde_ipld_dagcbor::to_vec(&node_data)
337
337
+
.map_err(|e| RepoError::serialization(e).with_context("serializing MST node for CID computation"))?;
335
338
let cid = util::compute_cid(&cbor)?;
336
339
337
340
// Update pointer and mark as fresh
···
957
960
.ok_or_else(|| RepoError::not_found("key", key.as_str()))?;
958
961
959
962
if &current != prev {
960
960
-
return Err(RepoError::invalid_mst(format!(
963
963
+
return Err(RepoError::cid_mismatch(format!(
961
964
"Update prev CID mismatch for key {}: expected {}, got {}",
962
965
key, prev, current
963
966
)));
···
974
977
.ok_or_else(|| RepoError::not_found("key", key.as_str()))?;
975
978
976
979
if &current != prev {
977
977
-
return Err(RepoError::invalid_mst(format!(
980
980
+
return Err(RepoError::cid_mismatch(format!(
978
981
"Delete prev CID mismatch for key {}: expected {}, got {}",
979
982
key, prev, current
980
983
)));
···
1035
1038
// Serialize this node
1036
1039
let entries = self.get_entries().await?;
1037
1040
let node_data = util::serialize_node_data(&entries).await?;
1038
1038
-
let cbor =
1039
1039
-
serde_ipld_dagcbor::to_vec(&node_data).map_err(|e| RepoError::serialization(e))?;
1041
1041
+
let cbor = serde_ipld_dagcbor::to_vec(&node_data)
1042
1042
+
.map_err(|e| RepoError::serialization(e).with_context("serializing MST node for block collection"))?;
1040
1043
blocks.insert(pointer, Bytes::from(cbor));
1041
1044
1042
1045
// Recursively collect from subtrees
···
1331
1334
// Serialize this node
1332
1335
let entries = tree.get_entries().await?;
1333
1336
let node_data = util::serialize_node_data(&entries).await?;
1334
1334
-
let cbor =
1335
1335
-
serde_ipld_dagcbor::to_vec(&node_data).map_err(|e| RepoError::serialization(e))?;
1337
1337
+
let cbor = serde_ipld_dagcbor::to_vec(&node_data)
1338
1338
+
.map_err(|e| RepoError::serialization(e).with_context("serializing MST node for parallel block collection"))?;
1336
1339
blocks.insert(pointer, Bytes::from(cbor));
1337
1340
1338
1341
// Spawn tasks for each subtree
+16
-10
crates/jacquard-repo/src/repo.rs
···
145
145
let commit_bytes = storage
146
146
.get(commit_cid)
147
147
.await?
148
148
-
.ok_or_else(|| RepoError::not_found("commit", commit_cid))?;
148
148
+
.ok_or_else(|| {
149
149
+
RepoError::not_found("commit", commit_cid)
150
150
+
.with_help("Commit must be applied to storage before loading repository - use apply_commit() or ensure commit is persisted")
151
151
+
})?;
149
152
150
153
let commit = Commit::from_cbor(&commit_bytes)?;
151
154
let mst_root = commit.data();
···
266
269
267
270
// Serialize record to DAG-CBOR
268
271
let cbor = serde_ipld_dagcbor::to_vec(record)
269
269
-
.map_err(|e| RepoError::serialization(e))?;
272
272
+
.map_err(|e| RepoError::serialization(e).with_context(format!("serializing record data for {}/{}", collection.as_ref(), rkey.as_ref())))?;
270
273
271
274
// Compute CID and store data
272
275
let cid = self.storage.put(&cbor).await?;
···
283
286
284
287
// Serialize record to DAG-CBOR
285
288
let cbor = serde_ipld_dagcbor::to_vec(record)
286
286
-
.map_err(|e| RepoError::serialization(e))?;
289
289
+
.map_err(|e| RepoError::serialization(e).with_context(format!("serializing record data for {}/{}", collection.as_ref(), rkey.as_ref())))?;
287
290
288
291
// Compute CID and store data
289
292
let cid = self.storage.put(&cbor).await?;
···
291
294
// Validate prev if provided
292
295
if let Some(prev_cid) = prev {
293
296
if &cid != prev_cid {
294
294
-
return Err(RepoError::invalid(format!(
297
297
+
return Err(RepoError::cid_mismatch(format!(
295
298
"Update prev CID mismatch for key {}: expected {}, got {}",
296
299
key, prev_cid, cid
297
300
)));
···
317
320
// Validate prev if provided
318
321
if let Some(prev_cid) = prev {
319
322
if &current != prev_cid {
320
320
-
return Err(RepoError::invalid(format!(
323
323
+
return Err(RepoError::cid_mismatch(format!(
321
324
"Delete prev CID mismatch for key {}: expected {}, got {}",
322
325
key, prev_cid, current
323
326
)));
···
371
374
372
375
// Serialize record to DAG-CBOR
373
376
let cbor = serde_ipld_dagcbor::to_vec(record)
374
374
-
.map_err(|e| RepoError::serialization(e))?;
377
377
+
.map_err(|e| RepoError::serialization(e).with_context(format!("serializing record data for {}/{}", collection.as_ref(), rkey.as_ref())))?;
375
378
376
379
// Compute CID and store data
377
380
let cid = self.storage.put(&cbor).await?;
···
388
391
389
392
// Serialize record to DAG-CBOR
390
393
let cbor = serde_ipld_dagcbor::to_vec(record)
391
391
-
.map_err(|e| RepoError::serialization(e))?;
394
394
+
.map_err(|e| RepoError::serialization(e).with_context(format!("serializing record data for {}/{}", collection.as_ref(), rkey.as_ref())))?;
392
395
393
396
// Compute CID and store data
394
397
let cid = self.storage.put(&cbor).await?;
···
396
399
// Validate prev if provided
397
400
if let Some(prev_cid) = prev {
398
401
if &cid != prev_cid {
399
399
-
return Err(RepoError::invalid(format!(
402
402
+
return Err(RepoError::cid_mismatch(format!(
400
403
"Update prev CID mismatch for key {}: expected {}, got {}",
401
404
key, prev_cid, cid
402
405
)));
···
422
425
// Validate prev if provided
423
426
if let Some(prev_cid) = prev {
424
427
if &current != prev_cid {
425
425
-
return Err(RepoError::invalid(format!(
428
428
+
return Err(RepoError::cid_mismatch(format!(
426
429
"Delete prev CID mismatch for key {}: expected {}, got {}",
427
430
key, prev_cid, current
428
431
)));
···
519
522
.storage
520
523
.get(&commit_cid)
521
524
.await?
522
522
-
.ok_or_else(|| RepoError::not_found("commit block", &commit_cid))?;
525
525
+
.ok_or_else(|| {
526
526
+
RepoError::not_found("commit block", &commit_cid)
527
527
+
.with_help("Commit block should have been persisted by apply_commit() - this indicates a storage inconsistency")
528
528
+
})?;
523
529
let commit = Commit::from_cbor(&commit_bytes)?;
524
530
525
531
self.commit = commit.into_static();