//! Repository CAR import: parse a CAR file, walk the MST to decode records,
//! and atomically apply the imported blocks and records to the database.
1use bytes::Bytes; 2use cid::Cid; 3use ipld_core::ipld::Ipld; 4use iroh_car::CarReader; 5use serde_json::Value as JsonValue; 6use sqlx::PgPool; 7use std::collections::HashMap; 8use std::io::Cursor; 9use thiserror::Error; 10use tracing::debug; 11use uuid::Uuid; 12 13#[derive(Error, Debug)] 14pub enum ImportError { 15 #[error("CAR parsing error: {0}")] 16 CarParse(String), 17 #[error("Expected exactly one root in CAR file")] 18 InvalidRootCount, 19 #[error("Block not found: {0}")] 20 BlockNotFound(String), 21 #[error("Invalid CBOR: {0}")] 22 InvalidCbor(String), 23 #[error("Database error: {0}")] 24 Database(#[from] sqlx::Error), 25 #[error("Block store error: {0}")] 26 BlockStore(String), 27 #[error("Import size limit exceeded")] 28 SizeLimitExceeded, 29 #[error("Repo not found")] 30 RepoNotFound, 31 #[error("Concurrent modification detected")] 32 ConcurrentModification, 33 #[error("Invalid commit structure: {0}")] 34 InvalidCommit(String), 35 #[error("Verification failed: {0}")] 36 VerificationFailed(#[from] super::verify::VerifyError), 37 #[error("DID mismatch: CAR is for {car_did}, but authenticated as {auth_did}")] 38 DidMismatch { car_did: String, auth_did: String }, 39} 40 41#[derive(Debug, Clone)] 42pub struct BlobRef { 43 pub cid: String, 44 pub mime_type: Option<String>, 45} 46 47pub async fn parse_car(data: &[u8]) -> Result<(Cid, HashMap<Cid, Bytes>), ImportError> { 48 let cursor = Cursor::new(data); 49 let mut reader = CarReader::new(cursor) 50 .await 51 .map_err(|e| ImportError::CarParse(e.to_string()))?; 52 let header = reader.header(); 53 let roots = header.roots(); 54 if roots.len() != 1 { 55 return Err(ImportError::InvalidRootCount); 56 } 57 let root = roots[0]; 58 let mut blocks = HashMap::new(); 59 while let Ok(Some((cid, block))) = reader.next_block().await { 60 blocks.insert(cid, Bytes::from(block)); 61 } 62 if !blocks.contains_key(&root) { 63 return Err(ImportError::BlockNotFound(root.to_string())); 64 } 65 Ok((root, blocks)) 66} 67 68pub fn 
find_blob_refs_ipld(value: &Ipld, depth: usize) -> Vec<BlobRef> {
    // Hard cap on recursion depth so hostile input cannot blow the stack.
    if depth > 32 {
        return Vec::new();
    }
    match value {
        Ipld::List(items) => items
            .iter()
            .flat_map(|item| find_blob_refs_ipld(item, depth + 1))
            .collect(),
        Ipld::Map(map) => {
            // A blob reference is a map shaped like
            // { "$type": "blob", "ref": <link>, "mimeType": <string>, ... }.
            let is_blob = matches!(map.get("$type"), Some(Ipld::String(t)) if t == "blob");
            if is_blob {
                if let Some(Ipld::Link(link_cid)) = map.get("ref") {
                    let mime_type = match map.get("mimeType") {
                        Some(Ipld::String(m)) => Some(m.clone()),
                        _ => None,
                    };
                    return vec![BlobRef {
                        cid: link_cid.to_string(),
                        mime_type,
                    }];
                }
            }
            // Not a blob map itself — scan all children.
            map.values()
                .flat_map(|child| find_blob_refs_ipld(child, depth + 1))
                .collect()
        }
        _ => Vec::new(),
    }
}

/// Recursively collect blob references from a JSON record value.
///
/// Mirrors [`find_blob_refs_ipld`] for the JSON encoding, where the link
/// appears as `{ "ref": { "$link": "<cid>" } }`. Depth is capped at 32.
pub fn find_blob_refs(value: &JsonValue, depth: usize) -> Vec<BlobRef> {
    if depth > 32 {
        return Vec::new();
    }
    match value {
        JsonValue::Array(items) => items
            .iter()
            .flat_map(|item| find_blob_refs(item, depth + 1))
            .collect(),
        JsonValue::Object(map) => {
            let is_blob = map.get("$type").and_then(JsonValue::as_str) == Some("blob");
            if is_blob {
                if let Some(link) = map
                    .get("ref")
                    .and_then(|r| r.get("$link"))
                    .and_then(JsonValue::as_str)
                {
                    let mime_type = map
                        .get("mimeType")
                        .and_then(JsonValue::as_str)
                        .map(str::to_owned);
                    return vec![BlobRef {
                        cid: link.to_string(),
                        mime_type,
                    }];
                }
            }
            map.values()
                .flat_map(|child| find_blob_refs(child, depth + 1))
                .collect()
        }
        _ => Vec::new(),
    }
}

/// Append every CID link reachable from `value` to `links`, depth-first.
pub fn extract_links(value: &Ipld, links: &mut Vec<Cid>) {
    match value {
        Ipld::Link(cid) => links.push(*cid),
        Ipld::Map(map) => map.values().for_each(|child| extract_links(child, links)),
        Ipld::List(items) => items.iter().for_each(|child| extract_links(child, links)),
        _ => {}
    }
}

/// A record decoded from an imported repository MST.
#[derive(Debug)]
pub struct ImportedRecord {
    pub collection: String,
    pub rkey: String,
    pub cid: Cid,
    pub blob_refs:
Vec<BlobRef>,
}

/// Walk the MST rooted at `root_cid`, decoding every record entry.
///
/// ATProto MST nodes are maps `{ "l": <left subtree?>, "e": [entries] }`;
/// each entry carries `p` (number of key bytes shared with the previous
/// entry in the same node), `k` (key suffix), `v` (record CID) and `t`
/// (right subtree CID).
///
/// # Errors
/// `BlockNotFound` if a referenced tree node is missing from `blocks`;
/// `InvalidCbor` if a tree node fails to decode. Records whose blocks are
/// missing or undecodable are skipped, matching prior behavior.
pub fn walk_mst(
    blocks: &HashMap<Cid, Bytes>,
    root_cid: &Cid,
) -> Result<Vec<ImportedRecord>, ImportError> {
    let mut records = Vec::new();
    let mut stack = vec![*root_cid];
    // Guard against cycles in a malicious CAR.
    let mut visited = std::collections::HashSet::new();
    while let Some(cid) = stack.pop() {
        if visited.contains(&cid) {
            continue;
        }
        visited.insert(cid);
        let block = blocks
            .get(&cid)
            .ok_or_else(|| ImportError::BlockNotFound(cid.to_string()))?;
        let value: Ipld = serde_ipld_dagcbor::from_slice(block)
            .map_err(|e| ImportError::InvalidCbor(e.to_string()))?;
        if let Ipld::Map(ref obj) = value {
            if let Some(Ipld::List(entries)) = obj.get("e") {
                // Keys are prefix-compressed within a node: each entry's full
                // key is the first `p` bytes of the previous entry's key plus
                // its own suffix `k`.
                let mut prev_key = String::new();
                for entry in entries {
                    if let Ipld::Map(entry_obj) = entry {
                        // FIX: the previous implementation ignored `p` and
                        // treated `k` as the full key, corrupting every
                        // compressed key. `p` absent or 0 behaves as before.
                        let prefix_len = entry_obj
                            .get("p")
                            .and_then(|p| {
                                if let Ipld::Integer(n) = p {
                                    usize::try_from(*n).ok()
                                } else {
                                    None
                                }
                            })
                            .unwrap_or(0);
                        let suffix = entry_obj.get("k").and_then(|k| {
                            if let Ipld::Bytes(b) = k {
                                String::from_utf8(b.clone()).ok()
                            } else if let Ipld::String(s) = k {
                                Some(s.clone())
                            } else {
                                None
                            }
                        });
                        let key = suffix.map(|s| {
                            // Out-of-range / non-boundary prefix lengths fall
                            // back to the bare suffix rather than panicking.
                            let prefix = prev_key.get(..prefix_len).unwrap_or_default();
                            format!("{prefix}{s}")
                        });
                        let record_cid = entry_obj.get("v").and_then(|v| {
                            if let Ipld::Link(cid) = v {
                                Some(*cid)
                            } else {
                                None
                            }
                        });
                        if let Some(key) = key {
                            if let Some(record_cid) = record_cid
                                && let Some(record_block) = blocks.get(&record_cid)
                                && let Ok(record_value) =
                                    serde_ipld_dagcbor::from_slice::<Ipld>(record_block)
                            {
                                let blob_refs = find_blob_refs_ipld(&record_value, 0);
                                // Keys look like "collection/rkey"; the rkey is
                                // the final path segment.
                                if let Some((collection, rkey)) = key.rsplit_once('/') {
                                    records.push(ImportedRecord {
                                        collection: collection.to_string(),
                                        rkey: rkey.to_string(),
                                        cid: record_cid,
                                        blob_refs,
                                    });
                                }
                            }
                            prev_key = key;
                        }
                        if let Some(Ipld::Link(tree_cid)) = entry_obj.get("t") {
                            stack.push(*tree_cid);
                        }
                    }
                }
            }
            if let Some(Ipld::Link(left_cid)) = obj.get("l") {
                stack.push(*left_cid);
            }
        }
    }
    Ok(records)
}

/// Metadata pulled out of the repository commit object.
pub struct CommitInfo {
    /// Repository revision of this commit, if present.
    pub rev: Option<String>,
230 pub prev: Option<String>, 231} 232 233fn extract_commit_info(commit: &Ipld) -> Result<(Cid, CommitInfo), ImportError> { 234 let obj = match commit { 235 Ipld::Map(m) => m, 236 _ => { 237 return Err(ImportError::InvalidCommit( 238 "Commit must be a map".to_string(), 239 )); 240 } 241 }; 242 let data_cid = obj 243 .get("data") 244 .and_then(|d| { 245 if let Ipld::Link(cid) = d { 246 Some(*cid) 247 } else { 248 None 249 } 250 }) 251 .ok_or_else(|| ImportError::InvalidCommit("Missing data field".to_string()))?; 252 let rev = obj.get("rev").and_then(|r| { 253 if let Ipld::String(s) = r { 254 Some(s.clone()) 255 } else { 256 None 257 } 258 }); 259 let prev = obj.get("prev").and_then(|p| { 260 if let Ipld::Link(cid) = p { 261 Some(cid.to_string()) 262 } else if let Ipld::Null = p { 263 None 264 } else { 265 None 266 } 267 }); 268 Ok((data_cid, CommitInfo { rev, prev })) 269} 270 271pub async fn apply_import( 272 db: &PgPool, 273 user_id: Uuid, 274 root: Cid, 275 blocks: HashMap<Cid, Bytes>, 276 max_blocks: usize, 277) -> Result<Vec<ImportedRecord>, ImportError> { 278 if blocks.len() > max_blocks { 279 return Err(ImportError::SizeLimitExceeded); 280 } 281 let root_block = blocks 282 .get(&root) 283 .ok_or_else(|| ImportError::BlockNotFound(root.to_string()))?; 284 let commit: Ipld = serde_ipld_dagcbor::from_slice(root_block) 285 .map_err(|e| ImportError::InvalidCbor(e.to_string()))?; 286 let (data_cid, _commit_info) = extract_commit_info(&commit)?; 287 let records = walk_mst(&blocks, &data_cid)?; 288 debug!( 289 "Importing {} blocks and {} records for user {}", 290 blocks.len(), 291 records.len(), 292 user_id 293 ); 294 let mut tx = db.begin().await?; 295 let repo = sqlx::query!( 296 "SELECT repo_root_cid FROM repos WHERE user_id = $1 FOR UPDATE NOWAIT", 297 user_id 298 ) 299 .fetch_optional(&mut *tx) 300 .await 301 .map_err(|e| { 302 if let sqlx::Error::Database(ref db_err) = e 303 && db_err.code().as_deref() == Some("55P03") { 304 return 
ImportError::ConcurrentModification; 305 } 306 ImportError::Database(e) 307 })?; 308 if repo.is_none() { 309 return Err(ImportError::RepoNotFound); 310 } 311 let block_chunks: Vec<Vec<(&Cid, &Bytes)>> = blocks 312 .iter() 313 .collect::<Vec<_>>() 314 .chunks(100) 315 .map(|c| c.to_vec()) 316 .collect(); 317 for chunk in block_chunks { 318 for (cid, data) in chunk { 319 let cid_bytes = cid.to_bytes(); 320 sqlx::query!( 321 "INSERT INTO blocks (cid, data) VALUES ($1, $2) ON CONFLICT (cid) DO NOTHING", 322 &cid_bytes, 323 data.as_ref() 324 ) 325 .execute(&mut *tx) 326 .await?; 327 } 328 } 329 let root_str = root.to_string(); 330 sqlx::query!( 331 "UPDATE repos SET repo_root_cid = $1, updated_at = NOW() WHERE user_id = $2", 332 root_str, 333 user_id 334 ) 335 .execute(&mut *tx) 336 .await?; 337 sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id) 338 .execute(&mut *tx) 339 .await?; 340 for record in &records { 341 let record_cid_str = record.cid.to_string(); 342 sqlx::query!( 343 r#" 344 INSERT INTO records (repo_id, collection, rkey, record_cid) 345 VALUES ($1, $2, $3, $4) 346 ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4 347 "#, 348 user_id, 349 record.collection, 350 record.rkey, 351 record_cid_str 352 ) 353 .execute(&mut *tx) 354 .await?; 355 } 356 tx.commit().await?; 357 debug!( 358 "Successfully imported {} blocks and {} records", 359 blocks.len(), 360 records.len() 361 ); 362 Ok(records) 363} 364 365#[cfg(test)] 366mod tests { 367 use super::*; 368 369 #[test] 370 fn test_find_blob_refs() { 371 let record = serde_json::json!({ 372 "$type": "app.bsky.feed.post", 373 "text": "Hello world", 374 "embed": { 375 "$type": "app.bsky.embed.images", 376 "images": [ 377 { 378 "alt": "Test image", 379 "image": { 380 "$type": "blob", 381 "ref": { 382 "$link": "bafkreihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku" 383 }, 384 "mimeType": "image/jpeg", 385 "size": 12345 386 } 387 } 388 ] 389 } 390 }); 391 let blob_refs = 
find_blob_refs(&record, 0);
        assert_eq!(blob_refs.len(), 1);
        assert_eq!(
            blob_refs[0].cid,
            "bafkreihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku"
        );
        assert_eq!(blob_refs[0].mime_type, Some("image/jpeg".to_string()));
    }

    #[test]
    fn test_find_blob_refs_no_blobs() {
        // A record without any blob maps yields no references.
        let record = serde_json::json!({
            "$type": "app.bsky.feed.post",
            "text": "Hello world"
        });
        assert!(find_blob_refs(&record, 0).is_empty());
    }

    #[test]
    fn test_find_blob_refs_depth_limit() {
        // Build a value nested past the 32-level recursion guard; the blob
        // buried at the bottom must not be reported.
        fn deeply_nested(depth: usize) -> JsonValue {
            if depth == 0 {
                serde_json::json!({
                    "$type": "blob",
                    "ref": { "$link": "bafkreitest" },
                    "mimeType": "image/png"
                })
            } else {
                serde_json::json!({ "nested": deeply_nested(depth - 1) })
            }
        }
        let deep = deeply_nested(40);
        assert!(find_blob_refs(&deep, 0).is_empty());
    }
}