//! CAR-file repository import: parse a CAR archive, walk its MST to collect
//! records and blob references, and atomically apply the result to Postgres.
1use bytes::Bytes; 2use cid::Cid; 3use ipld_core::ipld::Ipld; 4use iroh_car::CarReader; 5use serde_json::Value as JsonValue; 6use sqlx::PgPool; 7use std::collections::HashMap; 8use std::io::Cursor; 9use thiserror::Error; 10use tracing::debug; 11use uuid::Uuid; 12 13#[derive(Error, Debug)] 14pub enum ImportError { 15 #[error("CAR parsing error: {0}")] 16 CarParse(String), 17 #[error("Expected exactly one root in CAR file")] 18 InvalidRootCount, 19 #[error("Block not found: {0}")] 20 BlockNotFound(String), 21 #[error("Invalid CBOR: {0}")] 22 InvalidCbor(String), 23 #[error("Database error: {0}")] 24 Database(#[from] sqlx::Error), 25 #[error("Block store error: {0}")] 26 BlockStore(String), 27 #[error("Import size limit exceeded")] 28 SizeLimitExceeded, 29 #[error("Repo not found")] 30 RepoNotFound, 31 #[error("Concurrent modification detected")] 32 ConcurrentModification, 33 #[error("Invalid commit structure: {0}")] 34 InvalidCommit(String), 35 #[error("Verification failed: {0}")] 36 VerificationFailed(#[from] super::verify::VerifyError), 37 #[error("DID mismatch: CAR is for {car_did}, but authenticated as {auth_did}")] 38 DidMismatch { car_did: String, auth_did: String }, 39} 40 41#[derive(Debug, Clone)] 42pub struct BlobRef { 43 pub cid: String, 44 pub mime_type: Option<String>, 45} 46 47pub async fn parse_car(data: &[u8]) -> Result<(Cid, HashMap<Cid, Bytes>), ImportError> { 48 let cursor = Cursor::new(data); 49 let mut reader = CarReader::new(cursor) 50 .await 51 .map_err(|e| ImportError::CarParse(e.to_string()))?; 52 53 let header = reader.header(); 54 let roots = header.roots(); 55 56 if roots.len() != 1 { 57 return Err(ImportError::InvalidRootCount); 58 } 59 60 let root = roots[0]; 61 let mut blocks = HashMap::new(); 62 63 while let Ok(Some((cid, block))) = reader.next_block().await { 64 blocks.insert(cid, Bytes::from(block)); 65 } 66 67 if !blocks.contains_key(&root) { 68 return Err(ImportError::BlockNotFound(root.to_string())); 69 } 70 71 Ok((root, blocks)) 
72} 73 74pub fn find_blob_refs_ipld(value: &Ipld, depth: usize) -> Vec<BlobRef> { 75 if depth > 32 { 76 return vec![]; 77 } 78 79 match value { 80 Ipld::List(arr) => arr 81 .iter() 82 .flat_map(|v| find_blob_refs_ipld(v, depth + 1)) 83 .collect(), 84 Ipld::Map(obj) => { 85 if let Some(Ipld::String(type_str)) = obj.get("$type") { 86 if type_str == "blob" { 87 if let Some(Ipld::Link(link_cid)) = obj.get("ref") { 88 let mime = obj 89 .get("mimeType") 90 .and_then(|v| if let Ipld::String(s) = v { Some(s.clone()) } else { None }); 91 return vec![BlobRef { 92 cid: link_cid.to_string(), 93 mime_type: mime, 94 }]; 95 } 96 } 97 } 98 99 obj.values() 100 .flat_map(|v| find_blob_refs_ipld(v, depth + 1)) 101 .collect() 102 } 103 _ => vec![], 104 } 105} 106 107pub fn find_blob_refs(value: &JsonValue, depth: usize) -> Vec<BlobRef> { 108 if depth > 32 { 109 return vec![]; 110 } 111 112 match value { 113 JsonValue::Array(arr) => arr 114 .iter() 115 .flat_map(|v| find_blob_refs(v, depth + 1)) 116 .collect(), 117 JsonValue::Object(obj) => { 118 if let Some(JsonValue::String(type_str)) = obj.get("$type") { 119 if type_str == "blob" { 120 if let Some(JsonValue::Object(ref_obj)) = obj.get("ref") { 121 if let Some(JsonValue::String(link)) = ref_obj.get("$link") { 122 let mime = obj 123 .get("mimeType") 124 .and_then(|v| v.as_str()) 125 .map(String::from); 126 return vec![BlobRef { 127 cid: link.clone(), 128 mime_type: mime, 129 }]; 130 } 131 } 132 } 133 } 134 135 obj.values() 136 .flat_map(|v| find_blob_refs(v, depth + 1)) 137 .collect() 138 } 139 _ => vec![], 140 } 141} 142 143pub fn extract_links(value: &Ipld, links: &mut Vec<Cid>) { 144 match value { 145 Ipld::Link(cid) => { 146 links.push(*cid); 147 } 148 Ipld::Map(map) => { 149 for v in map.values() { 150 extract_links(v, links); 151 } 152 } 153 Ipld::List(arr) => { 154 for v in arr { 155 extract_links(v, links); 156 } 157 } 158 _ => {} 159 } 160} 161 162#[derive(Debug)] 163pub struct ImportedRecord { 164 pub collection: String, 165 
pub rkey: String, 166 pub cid: Cid, 167 pub blob_refs: Vec<BlobRef>, 168} 169 170pub fn walk_mst( 171 blocks: &HashMap<Cid, Bytes>, 172 root_cid: &Cid, 173) -> Result<Vec<ImportedRecord>, ImportError> { 174 let mut records = Vec::new(); 175 let mut stack = vec![*root_cid]; 176 let mut visited = std::collections::HashSet::new(); 177 178 while let Some(cid) = stack.pop() { 179 if visited.contains(&cid) { 180 continue; 181 } 182 visited.insert(cid); 183 184 let block = blocks 185 .get(&cid) 186 .ok_or_else(|| ImportError::BlockNotFound(cid.to_string()))?; 187 188 let value: Ipld = serde_ipld_dagcbor::from_slice(block) 189 .map_err(|e| ImportError::InvalidCbor(e.to_string()))?; 190 191 if let Ipld::Map(ref obj) = value { 192 if let Some(Ipld::List(entries)) = obj.get("e") { 193 for entry in entries { 194 if let Ipld::Map(entry_obj) = entry { 195 let key = entry_obj.get("k").and_then(|k| { 196 if let Ipld::Bytes(b) = k { 197 String::from_utf8(b.clone()).ok() 198 } else if let Ipld::String(s) = k { 199 Some(s.clone()) 200 } else { 201 None 202 } 203 }); 204 205 let record_cid = entry_obj.get("v").and_then(|v| { 206 if let Ipld::Link(cid) = v { 207 Some(*cid) 208 } else { 209 None 210 } 211 }); 212 213 if let (Some(key), Some(record_cid)) = (key, record_cid) { 214 if let Some(record_block) = blocks.get(&record_cid) { 215 if let Ok(record_value) = 216 serde_ipld_dagcbor::from_slice::<Ipld>(record_block) 217 { 218 let blob_refs = find_blob_refs_ipld(&record_value, 0); 219 220 let parts: Vec<&str> = key.split('/').collect(); 221 if parts.len() >= 2 { 222 let collection = parts[..parts.len() - 1].join("/"); 223 let rkey = parts[parts.len() - 1].to_string(); 224 225 records.push(ImportedRecord { 226 collection, 227 rkey, 228 cid: record_cid, 229 blob_refs, 230 }); 231 } 232 } 233 } 234 } 235 236 if let Some(Ipld::Link(tree_cid)) = entry_obj.get("t") { 237 stack.push(*tree_cid); 238 } 239 } 240 } 241 } 242 243 if let Some(Ipld::Link(left_cid)) = obj.get("l") { 244 
stack.push(*left_cid); 245 } 246 } 247 } 248 249 Ok(records) 250} 251 252pub struct CommitInfo { 253 pub rev: Option<String>, 254 pub prev: Option<String>, 255} 256 257fn extract_commit_info(commit: &Ipld) -> Result<(Cid, CommitInfo), ImportError> { 258 let obj = match commit { 259 Ipld::Map(m) => m, 260 _ => return Err(ImportError::InvalidCommit("Commit must be a map".to_string())), 261 }; 262 263 let data_cid = obj 264 .get("data") 265 .and_then(|d| if let Ipld::Link(cid) = d { Some(*cid) } else { None }) 266 .ok_or_else(|| ImportError::InvalidCommit("Missing data field".to_string()))?; 267 268 let rev = obj.get("rev").and_then(|r| { 269 if let Ipld::String(s) = r { 270 Some(s.clone()) 271 } else { 272 None 273 } 274 }); 275 276 let prev = obj.get("prev").and_then(|p| { 277 if let Ipld::Link(cid) = p { 278 Some(cid.to_string()) 279 } else if let Ipld::Null = p { 280 None 281 } else { 282 None 283 } 284 }); 285 286 Ok((data_cid, CommitInfo { rev, prev })) 287} 288 289pub async fn apply_import( 290 db: &PgPool, 291 user_id: Uuid, 292 root: Cid, 293 blocks: HashMap<Cid, Bytes>, 294 max_blocks: usize, 295) -> Result<Vec<ImportedRecord>, ImportError> { 296 if blocks.len() > max_blocks { 297 return Err(ImportError::SizeLimitExceeded); 298 } 299 300 let root_block = blocks 301 .get(&root) 302 .ok_or_else(|| ImportError::BlockNotFound(root.to_string()))?; 303 let commit: Ipld = serde_ipld_dagcbor::from_slice(root_block) 304 .map_err(|e| ImportError::InvalidCbor(e.to_string()))?; 305 306 let (data_cid, _commit_info) = extract_commit_info(&commit)?; 307 308 let records = walk_mst(&blocks, &data_cid)?; 309 310 debug!( 311 "Importing {} blocks and {} records for user {}", 312 blocks.len(), 313 records.len(), 314 user_id 315 ); 316 317 let mut tx = db.begin().await?; 318 319 let repo = sqlx::query!( 320 "SELECT repo_root_cid FROM repos WHERE user_id = $1 FOR UPDATE NOWAIT", 321 user_id 322 ) 323 .fetch_optional(&mut *tx) 324 .await 325 .map_err(|e| { 326 if let 
sqlx::Error::Database(ref db_err) = e { 327 if db_err.code().as_deref() == Some("55P03") { 328 return ImportError::ConcurrentModification; 329 } 330 } 331 ImportError::Database(e) 332 })?; 333 334 if repo.is_none() { 335 return Err(ImportError::RepoNotFound); 336 } 337 338 let block_chunks: Vec<Vec<(&Cid, &Bytes)>> = blocks 339 .iter() 340 .collect::<Vec<_>>() 341 .chunks(100) 342 .map(|c| c.to_vec()) 343 .collect(); 344 345 for chunk in block_chunks { 346 for (cid, data) in chunk { 347 let cid_bytes = cid.to_bytes(); 348 sqlx::query!( 349 "INSERT INTO blocks (cid, data) VALUES ($1, $2) ON CONFLICT (cid) DO NOTHING", 350 &cid_bytes, 351 data.as_ref() 352 ) 353 .execute(&mut *tx) 354 .await?; 355 } 356 } 357 358 let root_str = root.to_string(); 359 sqlx::query!( 360 "UPDATE repos SET repo_root_cid = $1, updated_at = NOW() WHERE user_id = $2", 361 root_str, 362 user_id 363 ) 364 .execute(&mut *tx) 365 .await?; 366 367 sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id) 368 .execute(&mut *tx) 369 .await?; 370 371 for record in &records { 372 let record_cid_str = record.cid.to_string(); 373 sqlx::query!( 374 r#" 375 INSERT INTO records (repo_id, collection, rkey, record_cid) 376 VALUES ($1, $2, $3, $4) 377 ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4 378 "#, 379 user_id, 380 record.collection, 381 record.rkey, 382 record_cid_str 383 ) 384 .execute(&mut *tx) 385 .await?; 386 } 387 388 tx.commit().await?; 389 390 debug!( 391 "Successfully imported {} blocks and {} records", 392 blocks.len(), 393 records.len() 394 ); 395 396 Ok(records) 397} 398 399#[cfg(test)] 400mod tests { 401 use super::*; 402 403 #[test] 404 fn test_find_blob_refs() { 405 let record = serde_json::json!({ 406 "$type": "app.bsky.feed.post", 407 "text": "Hello world", 408 "embed": { 409 "$type": "app.bsky.embed.images", 410 "images": [ 411 { 412 "alt": "Test image", 413 "image": { 414 "$type": "blob", 415 "ref": { 416 "$link": 
"bafkreihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku" 417 }, 418 "mimeType": "image/jpeg", 419 "size": 12345 420 } 421 } 422 ] 423 } 424 }); 425 426 let blob_refs = find_blob_refs(&record, 0); 427 assert_eq!(blob_refs.len(), 1); 428 assert_eq!( 429 blob_refs[0].cid, 430 "bafkreihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku" 431 ); 432 assert_eq!(blob_refs[0].mime_type, Some("image/jpeg".to_string())); 433 } 434 435 #[test] 436 fn test_find_blob_refs_no_blobs() { 437 let record = serde_json::json!({ 438 "$type": "app.bsky.feed.post", 439 "text": "Hello world" 440 }); 441 442 let blob_refs = find_blob_refs(&record, 0); 443 assert!(blob_refs.is_empty()); 444 } 445 446 #[test] 447 fn test_find_blob_refs_depth_limit() { 448 fn deeply_nested(depth: usize) -> JsonValue { 449 if depth == 0 { 450 serde_json::json!({ 451 "$type": "blob", 452 "ref": { "$link": "bafkreitest" }, 453 "mimeType": "image/png" 454 }) 455 } else { 456 serde_json::json!({ "nested": deeply_nested(depth - 1) }) 457 } 458 } 459 460 let deep = deeply_nested(40); 461 let blob_refs = find_blob_refs(&deep, 0); 462 assert!(blob_refs.is_empty()); 463 } 464}