//! CAR import for atproto-style repositories: parse a CAR file, walk the
//! commit's Merkle Search Tree, and apply the resulting blocks and records
//! to Postgres in a single transaction.

use bytes::Bytes;
use cid::Cid;
use ipld_core::ipld::Ipld;
use iroh_car::CarReader;
use serde_json::Value as JsonValue;
use sqlx::PgPool;
use std::collections::HashMap;
use std::io::Cursor;
use thiserror::Error;
use tracing::debug;
use uuid::Uuid;

#[derive(Error, Debug)]
pub enum ImportError {
    #[error("CAR parsing error: {0}")]
    CarParse(String),
    #[error("Expected exactly one root in CAR file")]
    InvalidRootCount,
    #[error("Block not found: {0}")]
    BlockNotFound(String),
    #[error("Invalid CBOR: {0}")]
    InvalidCbor(String),
    #[error("Database error: {0}")]
    Database(#[from] sqlx::Error),
    #[error("Block store error: {0}")]
    BlockStore(String),
    #[error("Import size limit exceeded")]
    SizeLimitExceeded,
    #[error("Repo not found")]
    RepoNotFound,
    #[error("Concurrent modification detected")]
    ConcurrentModification,
    #[error("Invalid commit structure: {0}")]
    InvalidCommit(String),
    #[error("Verification failed: {0}")]
    VerificationFailed(#[from] super::verify::VerifyError),
    #[error("DID mismatch: CAR is for {car_did}, but authenticated as {auth_did}")]
    DidMismatch { car_did: String, auth_did: String },
}

#[derive(Debug, Clone)]
pub struct BlobRef {
    pub cid: String,
    pub mime_type: Option<String>,
}

/// Parses a CAR file into its single root CID and a map of all blocks.
/// Rejects CARs with zero or multiple roots, and CARs whose root block
/// is missing from the payload.
pub async fn parse_car(data: &[u8]) -> Result<(Cid, HashMap<Cid, Bytes>), ImportError> {
    let cursor = Cursor::new(data);
    let mut reader = CarReader::new(cursor)
        .await
        .map_err(|e| ImportError::CarParse(e.to_string()))?;
    let header = reader.header();
    let roots = header.roots();
    if roots.len() != 1 {
        return Err(ImportError::InvalidRootCount);
    }
    let root = roots[0];
    let mut blocks = HashMap::new();
    while let Ok(Some((cid, block))) = reader.next_block().await {
        blocks.insert(cid, Bytes::from(block));
    }
    if !blocks.contains_key(&root) {
        return Err(ImportError::BlockNotFound(root.to_string()));
    }
    Ok((root, blocks))
}

/// Recursively collects blob references (`$type: "blob"` maps) from a decoded
/// IPLD value, bounding recursion depth to guard against adversarial nesting.
pub fn find_blob_refs_ipld(value: &Ipld, depth: usize) -> Vec<BlobRef> {
    if depth > 32 {
        return vec![];
    }
    match value {
        Ipld::List(arr) => arr
            .iter()
            .flat_map(|v| find_blob_refs_ipld(v, depth + 1))
            .collect(),
        Ipld::Map(obj) => {
            if let Some(Ipld::String(type_str)) = obj.get("$type")
                && type_str == "blob"
                && let Some(Ipld::Link(link_cid)) = obj.get("ref")
            {
                let mime = obj.get("mimeType").and_then(|v| {
                    if let Ipld::String(s) = v {
                        Some(s.clone())
                    } else {
                        None
                    }
                });
                return vec![BlobRef {
                    cid: link_cid.to_string(),
                    mime_type: mime,
                }];
            }
            obj.values()
                .flat_map(|v| find_blob_refs_ipld(v, depth + 1))
                .collect()
        }
        _ => vec![],
    }
}

/// JSON counterpart of `find_blob_refs_ipld`, matching the lexicon JSON form
/// where the blob ref appears as `{"ref": {"$link": "..."}}`.
pub fn find_blob_refs(value: &JsonValue, depth: usize) -> Vec<BlobRef> {
    if depth > 32 {
        return vec![];
    }
    match value {
        JsonValue::Array(arr) => arr
            .iter()
            .flat_map(|v| find_blob_refs(v, depth + 1))
            .collect(),
        JsonValue::Object(obj) => {
            if let Some(JsonValue::String(type_str)) = obj.get("$type")
                && type_str == "blob"
                && let Some(JsonValue::Object(ref_obj)) = obj.get("ref")
                && let Some(JsonValue::String(link)) = ref_obj.get("$link")
            {
                let mime = obj
                    .get("mimeType")
                    .and_then(|v| v.as_str())
                    .map(String::from);
                return vec![BlobRef {
                    cid: link.clone(),
                    mime_type: mime,
                }];
            }
            obj.values()
                .flat_map(|v| find_blob_refs(v, depth + 1))
                .collect()
        }
        _ => vec![],
    }
}
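
// Usage sketch (illustrative): a typical caller parses the uploaded CAR and
// hands the result to `apply_import` below. `car_bytes`, `db`, `user_id`,
// and the 100_000 block cap are hypothetical caller-side values.
//
//     let (root, blocks) = parse_car(&car_bytes).await?;
//     let records = apply_import(&db, user_id, root, blocks, 100_000).await?;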

/// Recursively collects every CID link reachable from an IPLD value.
pub fn extract_links(value: &Ipld, links: &mut Vec<Cid>) {
    match value {
        Ipld::Link(cid) => {
            links.push(*cid);
        }
        Ipld::Map(map) => {
            for v in map.values() {
                extract_links(v, links);
            }
        }
        Ipld::List(arr) => {
            for v in arr {
                extract_links(v, links);
            }
        }
        _ => {}
    }
}

#[derive(Debug)]
pub struct ImportedRecord {
    pub collection: String,
    pub rkey: String,
    pub cid: Cid,
    pub blob_refs: Vec<BlobRef>,
}

/// Walks the Merkle Search Tree rooted at `root_cid` and returns every record
/// it references.
pub fn walk_mst(
    blocks: &HashMap<Cid, Bytes>,
    root_cid: &Cid,
) -> Result<Vec<ImportedRecord>, ImportError> {
    let mut records = Vec::new();
    walk_mst_node(blocks, root_cid, &[], &mut records)?;
    Ok(records)
}
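
// MST node layout, as consumed by `walk_mst_node` below (per the atproto
// repository spec): each node is a DAG-CBOR map with an optional left
// subtree link "l" and an entry list "e". Each entry carries "p" (count of
// bytes shared with the previous entry's key), "k" (key suffix bytes), "v"
// (record CID), and an optional right subtree link "t".
//
// Worked example of the prefix compression (illustrative keys):
//   previous key:  "app.bsky.feed.post/3abc"
//   entry:         p = 22, k = b"d"     // reuse the first 22 bytes
//   full key:      "app.bsky.feed.post/3abd"
// The first entry in a node carries p = 0 and the complete key.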

/// Walks one MST node: left subtree first, then each entry's record followed
/// by its right subtree, reconstructing full keys from the prefix-compressed
/// entries along the way.
fn walk_mst_node(
    blocks: &HashMap<Cid, Bytes>,
    cid: &Cid,
    prev_key: &[u8],
    records: &mut Vec<ImportedRecord>,
) -> Result<(), ImportError> {
    let block = blocks
        .get(cid)
        .ok_or_else(|| ImportError::BlockNotFound(cid.to_string()))?;
    let value: Ipld = serde_ipld_dagcbor::from_slice(block)
        .map_err(|e| ImportError::InvalidCbor(e.to_string()))?;

    if let Ipld::Map(ref obj) = value {
        // "l" links to the subtree holding all keys before this node's entries.
        if let Some(Ipld::Link(left_cid)) = obj.get("l") {
            walk_mst_node(blocks, left_cid, prev_key, records)?;
        }

        let mut current_key = prev_key.to_vec();

        if let Some(Ipld::List(entries)) = obj.get("e") {
            for entry in entries {
                if let Ipld::Map(entry_obj) = entry {
                    // Rebuild the full key: keep the first "p" bytes of the
                    // previous key, then append the "k" suffix.
                    let prefix_len = entry_obj
                        .get("p")
                        .and_then(|p| {
                            if let Ipld::Integer(n) = p {
                                Some(*n as usize)
                            } else {
                                None
                            }
                        })
                        .unwrap_or(0);

                    let key_suffix = entry_obj.get("k").and_then(|k| {
                        if let Ipld::Bytes(b) = k {
                            Some(b.clone())
                        } else {
                            None
                        }
                    });

                    if let Some(suffix) = key_suffix {
                        current_key.truncate(prefix_len);
                        current_key.extend_from_slice(&suffix);
                    }

                    let record_cid = entry_obj.get("v").and_then(|v| {
                        if let Ipld::Link(cid) = v {
                            Some(*cid)
                        } else {
                            None
                        }
                    });

                    if let Some(record_cid) = record_cid
                        && let Ok(full_key) = String::from_utf8(current_key.clone())
                        && let Some(record_block) = blocks.get(&record_cid)
                        && let Ok(record_value) =
                            serde_ipld_dagcbor::from_slice::<Ipld>(record_block)
                    {
                        let blob_refs = find_blob_refs_ipld(&record_value, 0);
                        // Keys have the form "collection/rkey": the final
                        // segment is the rkey, everything before it the
                        // collection NSID.
                        let parts: Vec<&str> = full_key.split('/').collect();
                        if parts.len() >= 2 {
                            let collection = parts[..parts.len() - 1].join("/");
                            let rkey = parts[parts.len() - 1].to_string();
                            records.push(ImportedRecord {
                                collection,
                                rkey,
                                cid: record_cid,
                                blob_refs,
                            });
                        }
                    }

                    // "t" links to the subtree holding keys between this entry
                    // and the next, so it is visited after the entry itself to
                    // keep records in key order.
                    if let Some(Ipld::Link(tree_cid)) = entry_obj.get("t") {
                        walk_mst_node(blocks, tree_cid, &current_key, records)?;
                    }
                }
            }
        }
    }
    Ok(())
}

pub struct CommitInfo {
    pub rev: Option<String>,
    pub prev: Option<String>,
}

/// Extracts the MST root ("data") and revision metadata from a decoded
/// commit object.
fn extract_commit_info(commit: &Ipld) -> Result<(Cid, CommitInfo), ImportError> {
    let obj = match commit {
        Ipld::Map(m) => m,
        _ => {
            return Err(ImportError::InvalidCommit(
                "Commit must be a map".to_string(),
            ));
        }
    };
    let data_cid = obj
        .get("data")
        .and_then(|d| {
            if let Ipld::Link(cid) = d {
                Some(*cid)
            } else {
                None
            }
        })
        .ok_or_else(|| ImportError::InvalidCommit("Missing data field".to_string()))?;
    let rev = obj.get("rev").and_then(|r| {
        if let Ipld::String(s) = r {
            Some(s.clone())
        } else {
            None
        }
    });
    // "prev" is null on the first commit; treat anything but a link as absent.
    let prev = obj.get("prev").and_then(|p| {
        if let Ipld::Link(cid) = p {
            Some(cid.to_string())
        } else {
            None
        }
    });
    Ok((data_cid, CommitInfo { rev, prev }))
}

/// Replaces a user's repository with the contents of an imported CAR:
/// stores all blocks, swaps the repo root, and rebuilds the records table
/// inside a single transaction.
pub async fn apply_import(
    db: &PgPool,
    user_id: Uuid,
    root: Cid,
    blocks: HashMap<Cid, Bytes>,
    max_blocks: usize,
) -> Result<Vec<ImportedRecord>, ImportError> {
    if blocks.len() > max_blocks {
        return Err(ImportError::SizeLimitExceeded);
    }
    let root_block = blocks
        .get(&root)
        .ok_or_else(|| ImportError::BlockNotFound(root.to_string()))?;
    let commit: Ipld = serde_ipld_dagcbor::from_slice(root_block)
        .map_err(|e| ImportError::InvalidCbor(e.to_string()))?;
    let (data_cid, _commit_info) = extract_commit_info(&commit)?;
    let records = walk_mst(&blocks, &data_cid)?;
    debug!(
        "Importing {} blocks and {} records for user {}",
        blocks.len(),
        records.len(),
        user_id
    );
    let mut tx = db.begin().await?;
    // NOWAIT turns lock contention into an immediate error (SQLSTATE 55P03)
    // instead of blocking, which we surface as ConcurrentModification.
    let repo = sqlx::query!(
        "SELECT repo_root_cid FROM repos WHERE user_id = $1 FOR UPDATE NOWAIT",
        user_id
    )
    .fetch_optional(&mut *tx)
    .await
    .map_err(|e| {
        if let sqlx::Error::Database(ref db_err) = e
            && db_err.code().as_deref() == Some("55P03")
        {
            return ImportError::ConcurrentModification;
        }
        ImportError::Database(e)
    })?;
    if repo.is_none() {
        return Err(ImportError::RepoNotFound);
    }
    // Store every block, skipping any CID already present.
    for (cid, data) in &blocks {
        let cid_bytes = cid.to_bytes();
        sqlx::query!(
            "INSERT INTO blocks (cid, data) VALUES ($1, $2) ON CONFLICT (cid) DO NOTHING",
            &cid_bytes,
            data.as_ref()
        )
        .execute(&mut *tx)
        .await?;
    }
    let root_str = root.to_string();
    sqlx::query!(
        "UPDATE repos SET repo_root_cid = $1, updated_at = NOW() WHERE user_id = $2",
        root_str,
        user_id
    )
    .execute(&mut *tx)
    .await?;
    // Imports replace the repo wholesale, so drop existing records before
    // re-inserting the freshly walked set.
    sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id)
        .execute(&mut *tx)
        .await?;
    for record in &records {
        let record_cid_str = record.cid.to_string();
        sqlx::query!(
            r#"
            INSERT INTO records (repo_id, collection, rkey, record_cid)
            VALUES ($1, $2, $3, $4)
            ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4
            "#,
            user_id,
            record.collection,
            record.rkey,
            record_cid_str
        )
        .execute(&mut *tx)
        .await?;
    }
    tx.commit().await?;
    debug!(
        "Successfully imported {} blocks and {} records",
        blocks.len(),
        records.len()
    );
    Ok(records)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_find_blob_refs() {
        let record = serde_json::json!({
            "$type": "app.bsky.feed.post",
            "text": "Hello world",
            "embed": {
                "$type": "app.bsky.embed.images",
                "images": [
                    {
                        "alt": "Test image",
                        "image": {
                            "$type": "blob",
                            "ref": {
                                "$link": "bafkreihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku"
                            },
                            "mimeType": "image/jpeg",
                            "size": 12345
                        }
                    }
                ]
            }
        });
        let blob_refs = find_blob_refs(&record, 0);
        assert_eq!(blob_refs.len(), 1);
        assert_eq!(
            blob_refs[0].cid,
            "bafkreihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku"
        );
        assert_eq!(blob_refs[0].mime_type, Some("image/jpeg".to_string()));
    }
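
    // A hedged sketch mirroring test_find_blob_refs for the IPLD variant,
    // where the blob's "ref" is a real IPLD link rather than a {"$link": ...}
    // JSON object. Reuses the CID fixture from the test above.
    #[test]
    fn test_find_blob_refs_ipld_sketch() {
        use std::collections::BTreeMap;
        let cid = Cid::try_from(
            "bafkreihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku",
        )
        .unwrap();
        let mut blob = BTreeMap::new();
        blob.insert("$type".to_string(), Ipld::String("blob".to_string()));
        blob.insert("ref".to_string(), Ipld::Link(cid));
        blob.insert(
            "mimeType".to_string(),
            Ipld::String("image/jpeg".to_string()),
        );
        let mut record = BTreeMap::new();
        record.insert("image".to_string(), Ipld::Map(blob));
        let refs = find_blob_refs_ipld(&Ipld::Map(record), 0);
        assert_eq!(refs.len(), 1);
        assert_eq!(refs[0].cid, cid.to_string());
        assert_eq!(refs[0].mime_type, Some("image/jpeg".to_string()));
    }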

    #[test]
    fn test_find_blob_refs_no_blobs() {
        let record = serde_json::json!({
            "$type": "app.bsky.feed.post",
            "text": "Hello world"
        });
        let blob_refs = find_blob_refs(&record, 0);
        assert!(blob_refs.is_empty());
    }

    #[test]
    fn test_find_blob_refs_depth_limit() {
        fn deeply_nested(depth: usize) -> JsonValue {
            if depth == 0 {
                serde_json::json!({
                    "$type": "blob",
                    "ref": { "$link": "bafkreitest" },
                    "mimeType": "image/png"
                })
            } else {
                serde_json::json!({ "nested": deeply_nested(depth - 1) })
            }
        }
        let deep = deeply_nested(40);
        let blob_refs = find_blob_refs(&deep, 0);
        assert!(blob_refs.is_empty());
    }
}
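
#[cfg(test)]
mod extract_links_tests {
    use super::*;
    use std::collections::BTreeMap;

    // A hedged sketch of link extraction over a nested IPLD value; the CID
    // fixture is reused from the blob tests above.
    #[test]
    fn collects_links_from_nested_values() {
        let cid = Cid::try_from(
            "bafkreihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku",
        )
        .unwrap();
        let mut inner = BTreeMap::new();
        inner.insert("child".to_string(), Ipld::Link(cid));
        let value = Ipld::List(vec![
            Ipld::Map(inner),
            Ipld::String("not a link".to_string()),
        ]);
        let mut links = Vec::new();
        extract_links(&value, &mut links);
        assert_eq!(links, vec![cid]);
    }
}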