this repo has no description
at main 14 kB view raw
1use bytes::Bytes; 2use cid::Cid; 3use ipld_core::ipld::Ipld; 4use iroh_car::CarReader; 5use serde_json::Value as JsonValue; 6use sqlx::PgPool; 7use std::collections::HashMap; 8use std::io::Cursor; 9use thiserror::Error; 10use tracing::debug; 11use uuid::Uuid; 12 13#[derive(Error, Debug)] 14pub enum ImportError { 15 #[error("CAR parsing error: {0}")] 16 CarParse(String), 17 #[error("Expected exactly one root in CAR file")] 18 InvalidRootCount, 19 #[error("Block not found: {0}")] 20 BlockNotFound(String), 21 #[error("Invalid CBOR: {0}")] 22 InvalidCbor(String), 23 #[error("Database error: {0}")] 24 Database(#[from] sqlx::Error), 25 #[error("Block store error: {0}")] 26 BlockStore(String), 27 #[error("Import size limit exceeded")] 28 SizeLimitExceeded, 29 #[error("Repo not found")] 30 RepoNotFound, 31 #[error("Concurrent modification detected")] 32 ConcurrentModification, 33 #[error("Invalid commit structure: {0}")] 34 InvalidCommit(String), 35 #[error("Verification failed: {0}")] 36 VerificationFailed(#[from] super::verify::VerifyError), 37 #[error("DID mismatch: CAR is for {car_did}, but authenticated as {auth_did}")] 38 DidMismatch { car_did: String, auth_did: String }, 39} 40 41#[derive(Debug, Clone)] 42pub struct BlobRef { 43 pub cid: String, 44 pub mime_type: Option<String>, 45} 46 47pub async fn parse_car(data: &[u8]) -> Result<(Cid, HashMap<Cid, Bytes>), ImportError> { 48 let cursor = Cursor::new(data); 49 let mut reader = CarReader::new(cursor) 50 .await 51 .map_err(|e| ImportError::CarParse(e.to_string()))?; 52 let header = reader.header(); 53 let roots = header.roots(); 54 if roots.len() != 1 { 55 return Err(ImportError::InvalidRootCount); 56 } 57 let root = roots[0]; 58 let mut blocks = HashMap::new(); 59 while let Ok(Some((cid, block))) = reader.next_block().await { 60 blocks.insert(cid, Bytes::from(block)); 61 } 62 if !blocks.contains_key(&root) { 63 return Err(ImportError::BlockNotFound(root.to_string())); 64 } 65 Ok((root, blocks)) 66} 67 68pub fn find_blob_refs_ipld(value: &Ipld, depth: usize) -> Vec<BlobRef> { 69 if depth > 32 { 70 return vec![]; 71 } 72 match value { 73 Ipld::List(arr) => arr 74 .iter() 75 .flat_map(|v| find_blob_refs_ipld(v, depth + 1)) 76 .collect(), 77 Ipld::Map(obj) => { 78 if let Some(Ipld::String(type_str)) = obj.get("$type") 79 && type_str == "blob" 80 { 81 let cid_str = if let Some(Ipld::Link(link_cid)) = obj.get("ref") { 82 Some(link_cid.to_string()) 83 } else if let Some(Ipld::Map(ref_obj)) = obj.get("ref") 84 && let Some(Ipld::String(link)) = ref_obj.get("$link") 85 { 86 Some(link.clone()) 87 } else { 88 None 89 }; 90 91 if let Some(cid) = cid_str { 92 let mime = obj.get("mimeType").and_then(|v| { 93 if let Ipld::String(s) = v { 94 Some(s.clone()) 95 } else { 96 None 97 } 98 }); 99 return vec![BlobRef { 100 cid, 101 mime_type: mime, 102 }]; 103 } 104 } 105 obj.values() 106 .flat_map(|v| find_blob_refs_ipld(v, depth + 1)) 107 .collect() 108 } 109 _ => vec![], 110 } 111} 112 113pub fn find_blob_refs(value: &JsonValue, depth: usize) -> Vec<BlobRef> { 114 if depth > 32 { 115 return vec![]; 116 } 117 match value { 118 JsonValue::Array(arr) => arr 119 .iter() 120 .flat_map(|v| find_blob_refs(v, depth + 1)) 121 .collect(), 122 JsonValue::Object(obj) => { 123 if let Some(JsonValue::String(type_str)) = obj.get("$type") 124 && type_str == "blob" 125 && let Some(JsonValue::Object(ref_obj)) = obj.get("ref") 126 && let Some(JsonValue::String(link)) = ref_obj.get("$link") 127 { 128 let mime = obj 129 .get("mimeType") 130 .and_then(|v| v.as_str()) 131 .map(String::from); 132 return vec![BlobRef { 133 cid: link.clone(), 134 mime_type: mime, 135 }]; 136 } 137 obj.values() 138 .flat_map(|v| find_blob_refs(v, depth + 1)) 139 .collect() 140 } 141 _ => vec![], 142 } 143} 144 145pub fn extract_links(value: &Ipld, links: &mut Vec<Cid>) { 146 match value { 147 Ipld::Link(cid) => { 148 links.push(*cid); 149 } 150 Ipld::Map(map) => { 151 map.values().for_each(|v| extract_links(v, links)); 152 } 153 Ipld::List(arr) => { 154 arr.iter().for_each(|v| extract_links(v, links)); 155 } 156 _ => {} 157 } 158} 159 160#[derive(Debug)] 161pub struct ImportedRecord { 162 pub collection: String, 163 pub rkey: String, 164 pub cid: Cid, 165 pub blob_refs: Vec<BlobRef>, 166} 167 168pub fn walk_mst( 169 blocks: &HashMap<Cid, Bytes>, 170 root_cid: &Cid, 171) -> Result<Vec<ImportedRecord>, ImportError> { 172 let mut records = Vec::new(); 173 walk_mst_node(blocks, root_cid, &[], &mut records)?; 174 Ok(records) 175} 176 177fn walk_mst_node( 178 blocks: &HashMap<Cid, Bytes>, 179 cid: &Cid, 180 prev_key: &[u8], 181 records: &mut Vec<ImportedRecord>, 182) -> Result<(), ImportError> { 183 let block = blocks 184 .get(cid) 185 .ok_or_else(|| ImportError::BlockNotFound(cid.to_string()))?; 186 let value: Ipld = serde_ipld_dagcbor::from_slice(block) 187 .map_err(|e| ImportError::InvalidCbor(e.to_string()))?; 188 189 if let Ipld::Map(ref obj) = value { 190 if let Some(Ipld::Link(left_cid)) = obj.get("l") { 191 walk_mst_node(blocks, left_cid, prev_key, records)?; 192 } 193 194 let mut current_key = prev_key.to_vec(); 195 196 if let Some(Ipld::List(entries)) = obj.get("e") { 197 for entry in entries { 198 if let Ipld::Map(entry_obj) = entry { 199 let prefix_len = entry_obj 200 .get("p") 201 .and_then(|p| { 202 if let Ipld::Integer(n) = p { 203 Some(*n as usize) 204 } else { 205 None 206 } 207 }) 208 .unwrap_or(0); 209 210 let key_suffix = entry_obj.get("k").and_then(|k| { 211 if let Ipld::Bytes(b) = k { 212 Some(b.clone()) 213 } else { 214 None 215 } 216 }); 217 218 if let Some(suffix) = key_suffix { 219 current_key.truncate(prefix_len); 220 current_key.extend_from_slice(&suffix); 221 } 222 223 if let Some(Ipld::Link(tree_cid)) = entry_obj.get("t") { 224 walk_mst_node(blocks, tree_cid, &current_key, records)?; 225 } 226 227 let record_cid = entry_obj.get("v").and_then(|v| { 228 if let Ipld::Link(cid) = v { 229 Some(*cid) 230 } else { 231 None 232 } 233 }); 234 235 if let Some(record_cid) = record_cid 236 && let Ok(full_key) = String::from_utf8(current_key.clone()) 237 && let Some(record_block) = blocks.get(&record_cid) 238 && let Ok(record_value) = 239 serde_ipld_dagcbor::from_slice::<Ipld>(record_block) 240 { 241 let blob_refs = find_blob_refs_ipld(&record_value, 0); 242 let parts: Vec<&str> = full_key.split('/').collect(); 243 if parts.len() >= 2 { 244 let collection = parts[..parts.len() - 1].join("/"); 245 let rkey = parts[parts.len() - 1].to_string(); 246 records.push(ImportedRecord { 247 collection, 248 rkey, 249 cid: record_cid, 250 blob_refs, 251 }); 252 } 253 } 254 } 255 } 256 } 257 } 258 Ok(()) 259} 260 261pub struct CommitInfo { 262 pub rev: Option<String>, 263 pub prev: Option<String>, 264} 265 266pub struct ImportResult { 267 pub records: Vec<ImportedRecord>, 268 pub data_cid: Cid, 269} 270 271fn extract_commit_info(commit: &Ipld) -> Result<(Cid, CommitInfo), ImportError> { 272 let obj = match commit { 273 Ipld::Map(m) => m, 274 _ => { 275 return Err(ImportError::InvalidCommit( 276 "Commit must be a map".to_string(), 277 )); 278 } 279 }; 280 let data_cid = obj 281 .get("data") 282 .and_then(|d| { 283 if let Ipld::Link(cid) = d { 284 Some(*cid) 285 } else { 286 None 287 } 288 }) 289 .ok_or_else(|| ImportError::InvalidCommit("Missing data field".to_string()))?; 290 let rev = obj.get("rev").and_then(|r| { 291 if let Ipld::String(s) = r { 292 Some(s.clone()) 293 } else { 294 None 295 } 296 }); 297 let prev = obj.get("prev").and_then(|p| { 298 if let Ipld::Link(cid) = p { 299 Some(cid.to_string()) 300 } else if let Ipld::Null = p { 301 None 302 } else { 303 None 304 } 305 }); 306 Ok((data_cid, CommitInfo { rev, prev })) 307} 308 309pub async fn apply_import( 310 db: &PgPool, 311 user_id: Uuid, 312 root: Cid, 313 blocks: HashMap<Cid, Bytes>, 314 max_blocks: usize, 315) -> Result<ImportResult, ImportError> { 316 if blocks.len() > max_blocks { 317 return Err(ImportError::SizeLimitExceeded); 318 } 319 let root_block = blocks 320 .get(&root) 321 .ok_or_else(|| ImportError::BlockNotFound(root.to_string()))?; 322 let commit: Ipld = serde_ipld_dagcbor::from_slice(root_block) 323 .map_err(|e| ImportError::InvalidCbor(e.to_string()))?; 324 let (data_cid, _commit_info) = extract_commit_info(&commit)?; 325 let records = walk_mst(&blocks, &data_cid)?; 326 debug!( 327 "Importing {} blocks and {} records for user {}", 328 blocks.len(), 329 records.len(), 330 user_id 331 ); 332 let mut tx = db.begin().await?; 333 let repo = sqlx::query!( 334 "SELECT repo_root_cid FROM repos WHERE user_id = $1 FOR UPDATE NOWAIT", 335 user_id 336 ) 337 .fetch_optional(&mut *tx) 338 .await 339 .map_err(|e| { 340 if let sqlx::Error::Database(ref db_err) = e 341 && db_err.code().as_deref() == Some("55P03") 342 { 343 return ImportError::ConcurrentModification; 344 } 345 ImportError::Database(e) 346 })?; 347 if repo.is_none() { 348 return Err(ImportError::RepoNotFound); 349 } 350 let block_chunks: Vec<Vec<(&Cid, &Bytes)>> = blocks 351 .iter() 352 .collect::<Vec<_>>() 353 .chunks(100) 354 .map(|c| c.to_vec()) 355 .collect(); 356 for chunk in block_chunks { 357 for (cid, data) in chunk { 358 let cid_bytes = cid.to_bytes(); 359 sqlx::query!( 360 "INSERT INTO blocks (cid, data) VALUES ($1, $2) ON CONFLICT (cid) DO NOTHING", 361 &cid_bytes, 362 data.as_ref() 363 ) 364 .execute(&mut *tx) 365 .await?; 366 } 367 } 368 sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id) 369 .execute(&mut *tx) 370 .await?; 371 for record in &records { 372 let record_cid_str = record.cid.to_string(); 373 sqlx::query!( 374 r#" 375 INSERT INTO records (repo_id, collection, rkey, record_cid) 376 VALUES ($1, $2, $3, $4) 377 ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4 378 "#, 379 user_id, 380 record.collection, 381 record.rkey, 382 record_cid_str 383 ) 384 .execute(&mut *tx) 385 .await?; 386 } 387 tx.commit().await?; 388 debug!( 389 "Successfully imported {} blocks and {} records", 390 blocks.len(), 391 records.len() 392 ); 393 Ok(ImportResult { records, data_cid }) 394} 395 396#[cfg(test)] 397mod tests { 398 use super::*; 399 400 #[test] 401 fn test_find_blob_refs() { 402 let record = serde_json::json!({ 403 "$type": "app.bsky.feed.post", 404 "text": "Hello world", 405 "embed": { 406 "$type": "app.bsky.embed.images", 407 "images": [ 408 { 409 "alt": "Test image", 410 "image": { 411 "$type": "blob", 412 "ref": { 413 "$link": "bafkreihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku" 414 }, 415 "mimeType": "image/jpeg", 416 "size": 12345 417 } 418 } 419 ] 420 } 421 }); 422 let blob_refs = find_blob_refs(&record, 0); 423 assert_eq!(blob_refs.len(), 1); 424 assert_eq!( 425 blob_refs[0].cid, 426 "bafkreihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku" 427 ); 428 assert_eq!(blob_refs[0].mime_type, Some("image/jpeg".to_string())); 429 } 430 431 #[test] 432 fn test_find_blob_refs_no_blobs() { 433 let record = serde_json::json!({ 434 "$type": "app.bsky.feed.post", 435 "text": "Hello world" 436 }); 437 let blob_refs = find_blob_refs(&record, 0); 438 assert!(blob_refs.is_empty()); 439 } 440 441 #[test] 442 fn test_find_blob_refs_depth_limit() { 443 fn deeply_nested(depth: usize) -> JsonValue { 444 if depth == 0 { 445 serde_json::json!({ 446 "$type": "blob", 447 "ref": { "$link": "bafkreitest" }, 448 "mimeType": "image/png" 449 }) 450 } else { 451 serde_json::json!({ "nested": deeply_nested(depth - 1) }) 452 } 453 } 454 let deep = deeply_nested(40); 455 let blob_refs = find_blob_refs(&deep, 0); 456 assert!(blob_refs.is_empty()); 457 } 458}