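//! CAR import for AT Protocol repositories: parsing the archive, walking
//! the MST for records, extracting blob references, and applying the
//! result to Postgres in a single transaction.
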
use bytes::Bytes;
use cid::Cid;
use ipld_core::ipld::Ipld;
use iroh_car::CarReader;
use serde_json::Value as JsonValue;
use sqlx::PgPool;
use std::collections::HashMap;
use std::io::Cursor;
use thiserror::Error;
use tracing::debug;
use uuid::Uuid;

#[derive(Error, Debug)]
pub enum ImportError {
    #[error("CAR parsing error: {0}")]
    CarParse(String),
    #[error("Expected exactly one root in CAR file")]
    InvalidRootCount,
    #[error("Block not found: {0}")]
    BlockNotFound(String),
    #[error("Invalid CBOR: {0}")]
    InvalidCbor(String),
    #[error("Database error: {0}")]
    Database(#[from] sqlx::Error),
    #[error("Block store error: {0}")]
    BlockStore(String),
    #[error("Import size limit exceeded")]
    SizeLimitExceeded,
    #[error("Repo not found")]
    RepoNotFound,
    #[error("Concurrent modification detected")]
    ConcurrentModification,
    #[error("Invalid commit structure: {0}")]
    InvalidCommit(String),
    #[error("Verification failed: {0}")]
    VerificationFailed(#[from] super::verify::VerifyError),
    #[error("DID mismatch: CAR is for {car_did}, but authenticated as {auth_did}")]
    DidMismatch { car_did: String, auth_did: String },
}

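/// A blob reference extracted from a record: the blob's CID (as a string)
/// and its declared MIME type, when present.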
#[derive(Debug, Clone)]
pub struct BlobRef {
    pub cid: String,
    pub mime_type: Option<String>,
}

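/// Parses a CAR (Content Addressable aRchive) byte stream into its root CID
/// and a map of every contained block, keyed by CID. A repo export carries
/// exactly one root (the commit object), so any other root count is
/// rejected, as is a CAR whose root block is missing from the payload.
/// Blocks are read until the stream ends; a read error mid-stream is
/// treated as end-of-stream rather than propagated.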
pub async fn parse_car(data: &[u8]) -> Result<(Cid, HashMap<Cid, Bytes>), ImportError> {
    let cursor = Cursor::new(data);
    let mut reader = CarReader::new(cursor)
        .await
        .map_err(|e| ImportError::CarParse(e.to_string()))?;
    let header = reader.header();
    let roots = header.roots();
    if roots.len() != 1 {
        return Err(ImportError::InvalidRootCount);
    }
    let root = roots[0];
    let mut blocks = HashMap::new();
    while let Ok(Some((cid, block))) = reader.next_block().await {
        blocks.insert(cid, Bytes::from(block));
    }
    if !blocks.contains_key(&root) {
        return Err(ImportError::BlockNotFound(root.to_string()));
    }
    Ok((root, blocks))
}

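/// Recursively scans an IPLD value for AT Protocol blob references, i.e.
/// maps with `"$type": "blob"`. Both encodings of the `ref` field are
/// accepted: a native dag-cbor link, and the JSON-style `{"$link": "..."}`
/// map. Recursion is bounded at 32 levels to limit work on hostile input.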
pub fn find_blob_refs_ipld(value: &Ipld, depth: usize) -> Vec<BlobRef> {
    if depth > 32 {
        return vec![];
    }
    match value {
        Ipld::List(arr) => arr
            .iter()
            .flat_map(|v| find_blob_refs_ipld(v, depth + 1))
            .collect(),
        Ipld::Map(obj) => {
            if let Some(Ipld::String(type_str)) = obj.get("$type")
                && type_str == "blob"
            {
                let cid_str = if let Some(Ipld::Link(link_cid)) = obj.get("ref") {
                    Some(link_cid.to_string())
                } else if let Some(Ipld::Map(ref_obj)) = obj.get("ref")
                    && let Some(Ipld::String(link)) = ref_obj.get("$link")
                {
                    Some(link.clone())
                } else {
                    None
                };

                if let Some(cid) = cid_str {
                    let mime = obj.get("mimeType").and_then(|v| {
                        if let Ipld::String(s) = v {
                            Some(s.clone())
                        } else {
                            None
                        }
                    });
                    return vec![BlobRef {
                        cid,
                        mime_type: mime,
                    }];
                }
            }
            obj.values()
                .flat_map(|v| find_blob_refs_ipld(v, depth + 1))
                .collect()
        }
        _ => vec![],
    }
}

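/// JSON counterpart of [`find_blob_refs_ipld`]: walks a `serde_json::Value`
/// looking for the `{"$type": "blob", "ref": {"$link": ...}}` shape, with
/// the same 32-level recursion bound.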
pub fn find_blob_refs(value: &JsonValue, depth: usize) -> Vec<BlobRef> {
    if depth > 32 {
        return vec![];
    }
    match value {
        JsonValue::Array(arr) => arr
            .iter()
            .flat_map(|v| find_blob_refs(v, depth + 1))
            .collect(),
        JsonValue::Object(obj) => {
            if let Some(JsonValue::String(type_str)) = obj.get("$type")
                && type_str == "blob"
                && let Some(JsonValue::Object(ref_obj)) = obj.get("ref")
                && let Some(JsonValue::String(link)) = ref_obj.get("$link")
            {
                let mime = obj
                    .get("mimeType")
                    .and_then(|v| v.as_str())
                    .map(String::from);
                return vec![BlobRef {
                    cid: link.clone(),
                    mime_type: mime,
                }];
            }
            obj.values()
                .flat_map(|v| find_blob_refs(v, depth + 1))
                .collect()
        }
        _ => vec![],
    }
}

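/// Recursively collects every CID link reachable from an IPLD value,
/// descending through maps and lists.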
pub fn extract_links(value: &Ipld, links: &mut Vec<Cid>) {
    match value {
        Ipld::Link(cid) => {
            links.push(*cid);
        }
        Ipld::Map(map) => {
            for v in map.values() {
                extract_links(v, links);
            }
        }
        Ipld::List(arr) => {
            for v in arr {
                extract_links(v, links);
            }
        }
        _ => {}
    }
}

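/// A record discovered while walking the MST: its key split into
/// `collection` and `rkey`, the record's CID, and any blob references
/// found in its body.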
#[derive(Debug)]
pub struct ImportedRecord {
    pub collection: String,
    pub rkey: String,
    pub cid: Cid,
    pub blob_refs: Vec<BlobRef>,
}

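/// Walks a Merkle Search Tree (MST) from its root and returns every record
/// it references. MST keys are repo paths of the form `collection/rkey`,
/// and entries within a node are prefix-compressed, so full keys are
/// reconstructed incrementally during the in-order traversal.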
pub fn walk_mst(
    blocks: &HashMap<Cid, Bytes>,
    root_cid: &Cid,
) -> Result<Vec<ImportedRecord>, ImportError> {
    let mut records = Vec::new();
    walk_mst_node(blocks, root_cid, &[], &mut records)?;
    Ok(records)
}

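/// Visits one MST node in order. A node is a map with an optional left
/// subtree link `l` and an entry list `e`; each entry carries `p` (number
/// of key bytes shared with the previous key), `k` (the key suffix bytes),
/// an optional right subtree link `t`, and the record CID `v`.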
fn walk_mst_node(
    blocks: &HashMap<Cid, Bytes>,
    cid: &Cid,
    prev_key: &[u8],
    records: &mut Vec<ImportedRecord>,
) -> Result<(), ImportError> {
    let block = blocks
        .get(cid)
        .ok_or_else(|| ImportError::BlockNotFound(cid.to_string()))?;
    let value: Ipld = serde_ipld_dagcbor::from_slice(block)
        .map_err(|e| ImportError::InvalidCbor(e.to_string()))?;

    if let Ipld::Map(ref obj) = value {
        if let Some(Ipld::Link(left_cid)) = obj.get("l") {
            walk_mst_node(blocks, left_cid, prev_key, records)?;
        }

        let mut current_key = prev_key.to_vec();

        if let Some(Ipld::List(entries)) = obj.get("e") {
            for entry in entries {
                if let Ipld::Map(entry_obj) = entry {
                    let prefix_len = entry_obj
                        .get("p")
                        .and_then(|p| {
                            if let Ipld::Integer(n) = p {
                                Some(*n as usize)
                            } else {
                                None
                            }
                        })
                        .unwrap_or(0);

                    let key_suffix = entry_obj.get("k").and_then(|k| {
                        if let Ipld::Bytes(b) = k {
                            Some(b.clone())
                        } else {
                            None
                        }
                    });

                    if let Some(suffix) = key_suffix {
                        current_key.truncate(prefix_len);
                        current_key.extend_from_slice(&suffix);
                    }

                    if let Some(Ipld::Link(tree_cid)) = entry_obj.get("t") {
                        walk_mst_node(blocks, tree_cid, &current_key, records)?;
                    }

                    let record_cid = entry_obj.get("v").and_then(|v| {
                        if let Ipld::Link(cid) = v {
                            Some(*cid)
                        } else {
                            None
                        }
                    });

                    if let Some(record_cid) = record_cid
                        && let Ok(full_key) = String::from_utf8(current_key.clone())
                        && let Some(record_block) = blocks.get(&record_cid)
                        && let Ok(record_value) =
                            serde_ipld_dagcbor::from_slice::<Ipld>(record_block)
                    {
                        let blob_refs = find_blob_refs_ipld(&record_value, 0);
                        let parts: Vec<&str> = full_key.split('/').collect();
                        if parts.len() >= 2 {
                            let collection = parts[..parts.len() - 1].join("/");
                            let rkey = parts[parts.len() - 1].to_string();
                            records.push(ImportedRecord {
                                collection,
                                rkey,
                                cid: record_cid,
                                blob_refs,
                            });
                        }
                    }
                }
            }
        }
    }
    Ok(())
}

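/// Revision metadata extracted from a commit: its `rev` string and the CID
/// of the previous commit, when one exists.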
pub struct CommitInfo {
    pub rev: Option<String>,
    pub prev: Option<String>,
}

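/// Outcome of a successful import: the records now present in the repo and
/// the MST root CID taken from the commit's `data` field.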
pub struct ImportResult {
    pub records: Vec<ImportedRecord>,
    pub data_cid: Cid,
}

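/// Pulls the MST root (`data`), `rev`, and `prev` fields out of a decoded
/// commit object. `data` is mandatory; `rev` and `prev` are optional, and
/// `prev` is null for a repo's initial commit.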
fn extract_commit_info(commit: &Ipld) -> Result<(Cid, CommitInfo), ImportError> {
    let obj = match commit {
        Ipld::Map(m) => m,
        _ => {
            return Err(ImportError::InvalidCommit(
                "Commit must be a map".to_string(),
            ));
        }
    };
    let data_cid = obj
        .get("data")
        .and_then(|d| {
            if let Ipld::Link(cid) = d {
                Some(*cid)
            } else {
                None
            }
        })
        .ok_or_else(|| ImportError::InvalidCommit("Missing data field".to_string()))?;
    let rev = obj.get("rev").and_then(|r| {
        if let Ipld::String(s) = r {
            Some(s.clone())
        } else {
            None
        }
    });
    let prev = obj.get("prev").and_then(|p| {
        if let Ipld::Link(cid) = p {
            Some(cid.to_string())
        } else {
            None
        }
    });
    Ok((data_cid, CommitInfo { rev, prev }))
}

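/// Applies a parsed CAR to a user's repo inside a single transaction:
/// enforce the block-count limit, decode the commit at `root`, walk the
/// MST for records, take the repo row lock with `FOR UPDATE NOWAIT` so a
/// concurrent import fails fast instead of queueing, upsert all blocks,
/// then replace the repo's record index wholesale.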
pub async fn apply_import(
    db: &PgPool,
    user_id: Uuid,
    root: Cid,
    blocks: HashMap<Cid, Bytes>,
    max_blocks: usize,
) -> Result<ImportResult, ImportError> {
    if blocks.len() > max_blocks {
        return Err(ImportError::SizeLimitExceeded);
    }
    let root_block = blocks
        .get(&root)
        .ok_or_else(|| ImportError::BlockNotFound(root.to_string()))?;
    let commit: Ipld = serde_ipld_dagcbor::from_slice(root_block)
        .map_err(|e| ImportError::InvalidCbor(e.to_string()))?;
    let (data_cid, _commit_info) = extract_commit_info(&commit)?;
    let records = walk_mst(&blocks, &data_cid)?;
    debug!(
        "Importing {} blocks and {} records for user {}",
        blocks.len(),
        records.len(),
        user_id
    );
    let mut tx = db.begin().await?;
    let repo = sqlx::query!(
        "SELECT repo_root_cid FROM repos WHERE user_id = $1 FOR UPDATE NOWAIT",
        user_id
    )
    .fetch_optional(&mut *tx)
    .await
    .map_err(|e| {
        // Postgres error 55P03 (lock_not_available) means another
        // transaction holds the repo row lock, i.e. a concurrent import.
        if let sqlx::Error::Database(ref db_err) = e
            && db_err.code().as_deref() == Some("55P03")
        {
            return ImportError::ConcurrentModification;
        }
        ImportError::Database(e)
    })?;
    if repo.is_none() {
        return Err(ImportError::RepoNotFound);
    }
    for (cid, data) in &blocks {
        let cid_bytes = cid.to_bytes();
        sqlx::query!(
            "INSERT INTO blocks (cid, data) VALUES ($1, $2) ON CONFLICT (cid) DO NOTHING",
            &cid_bytes,
            data.as_ref()
        )
        .execute(&mut *tx)
        .await?;
    }
    sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id)
        .execute(&mut *tx)
        .await?;
    for record in &records {
        let record_cid_str = record.cid.to_string();
        sqlx::query!(
            r#"
            INSERT INTO records (repo_id, collection, rkey, record_cid)
            VALUES ($1, $2, $3, $4)
            ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4
            "#,
            user_id,
            record.collection,
            record.rkey,
            record_cid_str
        )
        .execute(&mut *tx)
        .await?;
    }
    tx.commit().await?;
    debug!(
        "Successfully imported {} blocks and {} records",
        blocks.len(),
        records.len()
    );
    Ok(ImportResult { records, data_cid })
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_find_blob_refs() {
        let record = serde_json::json!({
            "$type": "app.bsky.feed.post",
            "text": "Hello world",
            "embed": {
                "$type": "app.bsky.embed.images",
                "images": [
                    {
                        "alt": "Test image",
                        "image": {
                            "$type": "blob",
                            "ref": {
                                "$link": "bafkreihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku"
                            },
                            "mimeType": "image/jpeg",
                            "size": 12345
                        }
                    }
                ]
            }
        });
        let blob_refs = find_blob_refs(&record, 0);
        assert_eq!(blob_refs.len(), 1);
        assert_eq!(
            blob_refs[0].cid,
            "bafkreihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku"
        );
        assert_eq!(blob_refs[0].mime_type, Some("image/jpeg".to_string()));
    }

    #[test]
    fn test_find_blob_refs_no_blobs() {
        let record = serde_json::json!({
            "$type": "app.bsky.feed.post",
            "text": "Hello world"
        });
        let blob_refs = find_blob_refs(&record, 0);
        assert!(blob_refs.is_empty());
    }

    #[test]
    fn test_find_blob_refs_depth_limit() {
        fn deeply_nested(depth: usize) -> JsonValue {
            if depth == 0 {
                serde_json::json!({
                    "$type": "blob",
                    "ref": { "$link": "bafkreitest" },
                    "mimeType": "image/png"
                })
            } else {
                serde_json::json!({ "nested": deeply_nested(depth - 1) })
            }
        }
        let deep = deeply_nested(40);
        let blob_refs = find_blob_refs(&deep, 0);
        assert!(blob_refs.is_empty());
    }
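
    // Suggested additional coverage (not part of the original suite): a
    // minimal sketch checking that extract_links finds a CID nested inside
    // maps and lists. Reuses the example CID string from the test above.
    #[test]
    fn test_extract_links_nested() {
        use std::collections::BTreeMap;

        let cid = Cid::try_from("bafkreihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku")
            .expect("valid CID");
        let mut inner = BTreeMap::new();
        inner.insert("ref".to_string(), Ipld::Link(cid));
        let value = Ipld::Map(BTreeMap::from([(
            "items".to_string(),
            Ipld::List(vec![Ipld::Map(inner), Ipld::String("noise".to_string())]),
        )]));

        let mut links = Vec::new();
        extract_links(&value, &mut links);
        assert_eq!(links, vec![cid]);
    }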
}