use bytes::Bytes;
use cid::Cid;
use ipld_core::ipld::Ipld;
use iroh_car::CarReader;
use serde_json::Value as JsonValue;
use sqlx::PgPool;
use std::collections::HashMap;
use std::io::Cursor;
use thiserror::Error;
use tracing::debug;
use uuid::Uuid;

#[derive(Error, Debug)]
pub enum ImportError {
    #[error("CAR parsing error: {0}")]
    CarParse(String),
    #[error("Expected exactly one root in CAR file")]
    InvalidRootCount,
    #[error("Block not found: {0}")]
    BlockNotFound(String),
    #[error("Invalid CBOR: {0}")]
    InvalidCbor(String),
    #[error("Database error: {0}")]
    Database(#[from] sqlx::Error),
    #[error("Block store error: {0}")]
    BlockStore(String),
    #[error("Import size limit exceeded")]
    SizeLimitExceeded,
    #[error("Repo not found")]
    RepoNotFound,
    #[error("Concurrent modification detected")]
    ConcurrentModification,
    #[error("Invalid commit structure: {0}")]
    InvalidCommit(String),
    #[error("Verification failed: {0}")]
    VerificationFailed(#[from] super::verify::VerifyError),
    #[error("DID mismatch: CAR is for {car_did}, but authenticated as {auth_did}")]
    DidMismatch { car_did: String, auth_did: String },
}

#[derive(Debug, Clone)]
pub struct BlobRef {
    pub cid: String,
    pub mime_type: Option<String>,
}

/// Parse a CAR file into its single root CID plus a map of every block keyed by CID.
pub async fn parse_car(data: &[u8]) -> Result<(Cid, HashMap<Cid, Bytes>), ImportError> {
    let cursor = Cursor::new(data);
    let mut reader = CarReader::new(cursor)
        .await
        .map_err(|e| ImportError::CarParse(e.to_string()))?;

    let header = reader.header();
    let roots = header.roots();
    if roots.len() != 1 {
        return Err(ImportError::InvalidRootCount);
    }
    let root = roots[0];

    let mut blocks = HashMap::new();
    while let Ok(Some((cid, block))) = reader.next_block().await {
        blocks.insert(cid, Bytes::from(block));
    }

    if !blocks.contains_key(&root) {
        return Err(ImportError::BlockNotFound(root.to_string()));
    }

    Ok((root, blocks))
}

/// Recursively collect blob references (`$type == "blob"`) from a DAG-CBOR value,
/// capped at 32 levels of nesting.
pub fn find_blob_refs_ipld(value: &Ipld, depth: usize) -> Vec<BlobRef> {
    if depth > 32 {
        return vec![];
    }
    match value {
        Ipld::List(arr) => arr
            .iter()
            .flat_map(|v| find_blob_refs_ipld(v, depth + 1))
            .collect(),
        Ipld::Map(obj) => {
            if let Some(Ipld::String(type_str)) = obj.get("$type")
                && type_str == "blob"
                && let Some(Ipld::Link(link_cid)) = obj.get("ref")
            {
                let mime = obj.get("mimeType").and_then(|v| {
                    if let Ipld::String(s) = v {
                        Some(s.clone())
                    } else {
                        None
                    }
                });
                return vec![BlobRef {
                    cid: link_cid.to_string(),
                    mime_type: mime,
                }];
            }
            obj.values()
                .flat_map(|v| find_blob_refs_ipld(v, depth + 1))
                .collect()
        }
        _ => vec![],
    }
}

/// Recursively collect blob references from a JSON record, capped at 32 levels of nesting.
pub fn find_blob_refs(value: &JsonValue, depth: usize) -> Vec<BlobRef> {
    if depth > 32 {
        return vec![];
    }
    match value {
        JsonValue::Array(arr) => arr
            .iter()
            .flat_map(|v| find_blob_refs(v, depth + 1))
            .collect(),
        JsonValue::Object(obj) => {
            if let Some(JsonValue::String(type_str)) = obj.get("$type")
                && type_str == "blob"
                && let Some(JsonValue::Object(ref_obj)) = obj.get("ref")
                && let Some(JsonValue::String(link)) = ref_obj.get("$link")
            {
                let mime = obj
                    .get("mimeType")
                    .and_then(|v| v.as_str())
                    .map(String::from);
                return vec![BlobRef {
                    cid: link.clone(),
                    mime_type: mime,
                }];
            }
            obj.values()
                .flat_map(|v| find_blob_refs(v, depth + 1))
                .collect()
        }
        _ => vec![],
    }
}

/// Recursively collect every CID link contained in an IPLD value.
pub fn extract_links(value: &Ipld, links: &mut Vec<Cid>) {
    match value {
        Ipld::Link(cid) => {
            links.push(*cid);
        }
        Ipld::Map(map) => {
            for v in map.values() {
                extract_links(v, links);
            }
        }
        Ipld::List(arr) => {
            for v in arr {
                extract_links(v, links);
            }
        }
        _ => {}
    }
}

#[derive(Debug)]
pub struct ImportedRecord {
    pub collection: String,
    pub rkey: String,
    pub cid: Cid,
    pub blob_refs: Vec<BlobRef>,
}

/// Walk the MST rooted at `root_cid` and return every record it references.
pub fn walk_mst(
    blocks: &HashMap<Cid, Bytes>,
    root_cid: &Cid,
) -> Result<Vec<ImportedRecord>, ImportError> {
    let mut records = Vec::new();
    walk_mst_node(blocks, root_cid, &[], &mut records)?;
    Ok(records)
}

fn walk_mst_node(
    blocks: &HashMap<Cid, Bytes>,
    cid: &Cid,
    prev_key: &[u8],
    records: &mut Vec<ImportedRecord>,
) -> Result<(), ImportError> {
    let block = blocks
        .get(cid)
        .ok_or_else(|| ImportError::BlockNotFound(cid.to_string()))?;
    let value: Ipld = serde_ipld_dagcbor::from_slice(block)
        .map_err(|e| ImportError::InvalidCbor(e.to_string()))?;

    if let Ipld::Map(ref obj) = value {
        // "l" is the left-most subtree of this node.
        if let Some(Ipld::Link(left_cid)) = obj.get("l") {
            walk_mst_node(blocks, left_cid, prev_key, records)?;
        }

        let mut current_key = prev_key.to_vec();
        if let Some(Ipld::List(entries)) = obj.get("e") {
            for entry in entries {
                if let Ipld::Map(entry_obj) = entry {
                    // Keys are prefix-compressed: "p" is the number of bytes shared
                    // with the previous key, "k" is the remaining suffix.
                    let prefix_len = entry_obj
                        .get("p")
                        .and_then(|p| {
                            if let Ipld::Integer(n) = p {
                                Some(*n as usize)
                            } else {
                                None
                            }
                        })
                        .unwrap_or(0);
                    let key_suffix = entry_obj.get("k").and_then(|k| {
                        if let Ipld::Bytes(b) = k {
                            Some(b.clone())
                        } else {
                            None
                        }
                    });
                    if let Some(suffix) = key_suffix {
                        current_key.truncate(prefix_len);
                        current_key.extend_from_slice(&suffix);
                    }

                    // "t" is the subtree hanging off this entry.
                    if let Some(Ipld::Link(tree_cid)) = entry_obj.get("t") {
                        walk_mst_node(blocks, tree_cid, &current_key, records)?;
                    }

                    let record_cid = entry_obj.get("v").and_then(|v| {
                        if let Ipld::Link(cid) = v {
                            Some(*cid)
                        } else {
                            None
                        }
                    });
                    if let Some(record_cid) = record_cid
                        && let Ok(full_key) = String::from_utf8(current_key.clone())
                        && let Some(record_block) = blocks.get(&record_cid)
                        && let Ok(record_value) =
                            serde_ipld_dagcbor::from_slice::<Ipld>(record_block)
                    {
                        let blob_refs = find_blob_refs_ipld(&record_value, 0);
                        // Keys have the form "<collection>/<rkey>".
                        let parts: Vec<&str> = full_key.split('/').collect();
                        if parts.len() >= 2 {
                            let collection = parts[..parts.len() - 1].join("/");
                            let rkey = parts[parts.len() - 1].to_string();
                            records.push(ImportedRecord {
                                collection,
                                rkey,
                                cid: record_cid,
                                blob_refs,
                            });
                        }
                    }
                }
            }
        }
    }

    Ok(())
}

pub struct CommitInfo {
    pub rev: Option<String>,
    pub prev: Option<String>,
}

pub struct ImportResult {
    pub records: Vec<ImportedRecord>,
    pub data_cid: Cid,
}

/// Extract the MST root (`data`) plus `rev`/`prev` metadata from a commit block.
fn extract_commit_info(commit: &Ipld) -> Result<(Cid, CommitInfo), ImportError> {
    let obj = match commit {
        Ipld::Map(m) => m,
        _ => {
            return Err(ImportError::InvalidCommit(
                "Commit must be a map".to_string(),
            ));
        }
    };

    let data_cid = obj
        .get("data")
        .and_then(|d| {
            if let Ipld::Link(cid) = d {
                Some(*cid)
            } else {
                None
            }
        })
        .ok_or_else(|| ImportError::InvalidCommit("Missing data field".to_string()))?;

    let rev = obj.get("rev").and_then(|r| {
        if let Ipld::String(s) = r {
            Some(s.clone())
        } else {
            None
        }
    });

    // `prev` may be an explicit null (e.g. for the first commit) or absent entirely.
    let prev = obj.get("prev").and_then(|p| {
        if let Ipld::Link(cid) = p {
            Some(cid.to_string())
        } else {
            None
        }
    });

    Ok((data_cid, CommitInfo { rev, prev }))
}

/// Import a parsed CAR into the user's repo: store all blocks and replace the
/// repo's records inside a single transaction.
pub async fn apply_import(
    db: &PgPool,
    user_id: Uuid,
    root: Cid,
    blocks: HashMap<Cid, Bytes>,
    max_blocks: usize,
) -> Result<ImportResult, ImportError> {
    if blocks.len() > max_blocks {
        return Err(ImportError::SizeLimitExceeded);
    }

    let root_block = blocks
        .get(&root)
        .ok_or_else(|| ImportError::BlockNotFound(root.to_string()))?;
    let commit: Ipld = serde_ipld_dagcbor::from_slice(root_block)
        .map_err(|e| ImportError::InvalidCbor(e.to_string()))?;
    let (data_cid, _commit_info) = extract_commit_info(&commit)?;

    let records = walk_mst(&blocks, &data_cid)?;

    debug!(
        "Importing {} blocks and {} records for user {}",
        blocks.len(),
        records.len(),
        user_id
    );

    let mut tx = db.begin().await?;

    let repo = sqlx::query!(
        "SELECT repo_root_cid FROM repos WHERE user_id = $1 FOR UPDATE NOWAIT",
        user_id
    )
    .fetch_optional(&mut *tx)
    .await
    .map_err(|e| {
        // Postgres SQLSTATE 55P03 (lock_not_available) means another import
        // currently holds the row lock taken with FOR UPDATE NOWAIT.
        if let sqlx::Error::Database(ref db_err) = e
            && db_err.code().as_deref() == Some("55P03")
        {
Some("55P03") { return ImportError::ConcurrentModification; } ImportError::Database(e) })?; if repo.is_none() { return Err(ImportError::RepoNotFound); } let block_chunks: Vec> = blocks .iter() .collect::>() .chunks(100) .map(|c| c.to_vec()) .collect(); for chunk in block_chunks { for (cid, data) in chunk { let cid_bytes = cid.to_bytes(); sqlx::query!( "INSERT INTO blocks (cid, data) VALUES ($1, $2) ON CONFLICT (cid) DO NOTHING", &cid_bytes, data.as_ref() ) .execute(&mut *tx) .await?; } } sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id) .execute(&mut *tx) .await?; for record in &records { let record_cid_str = record.cid.to_string(); sqlx::query!( r#" INSERT INTO records (repo_id, collection, rkey, record_cid) VALUES ($1, $2, $3, $4) ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4 "#, user_id, record.collection, record.rkey, record_cid_str ) .execute(&mut *tx) .await?; } tx.commit().await?; debug!( "Successfully imported {} blocks and {} records", blocks.len(), records.len() ); Ok(ImportResult { records, data_cid }) } #[cfg(test)] mod tests { use super::*; #[test] fn test_find_blob_refs() { let record = serde_json::json!({ "$type": "app.bsky.feed.post", "text": "Hello world", "embed": { "$type": "app.bsky.embed.images", "images": [ { "alt": "Test image", "image": { "$type": "blob", "ref": { "$link": "bafkreihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku" }, "mimeType": "image/jpeg", "size": 12345 } } ] } }); let blob_refs = find_blob_refs(&record, 0); assert_eq!(blob_refs.len(), 1); assert_eq!( blob_refs[0].cid, "bafkreihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku" ); assert_eq!(blob_refs[0].mime_type, Some("image/jpeg".to_string())); } #[test] fn test_find_blob_refs_no_blobs() { let record = serde_json::json!({ "$type": "app.bsky.feed.post", "text": "Hello world" }); let blob_refs = find_blob_refs(&record, 0); assert!(blob_refs.is_empty()); } #[test] fn test_find_blob_refs_depth_limit() { fn deeply_nested(depth: usize) -> JsonValue { if depth == 0 { serde_json::json!({ "$type": "blob", "ref": { "$link": "bafkreitest" }, "mimeType": "image/png" }) } else { serde_json::json!({ "nested": deeply_nested(depth - 1) }) } } let deep = deeply_nested(40); let blob_refs = find_blob_refs(&deep, 0); assert!(blob_refs.is_empty()); } }