AT Protocol repository import module: parses CAR files, walks the Merkle Search Tree, and atomically persists blocks and records to Postgres.
1use bytes::Bytes; 2use cid::Cid; 3use ipld_core::ipld::Ipld; 4use iroh_car::CarReader; 5use serde_json::Value as JsonValue; 6use sqlx::PgPool; 7use std::collections::HashMap; 8use std::io::Cursor; 9use thiserror::Error; 10use tracing::debug; 11use uuid::Uuid; 12 13#[derive(Error, Debug)] 14pub enum ImportError { 15 #[error("CAR parsing error: {0}")] 16 CarParse(String), 17 #[error("Expected exactly one root in CAR file")] 18 InvalidRootCount, 19 #[error("Block not found: {0}")] 20 BlockNotFound(String), 21 #[error("Invalid CBOR: {0}")] 22 InvalidCbor(String), 23 #[error("Database error: {0}")] 24 Database(#[from] sqlx::Error), 25 #[error("Block store error: {0}")] 26 BlockStore(String), 27 #[error("Import size limit exceeded")] 28 SizeLimitExceeded, 29 #[error("Repo not found")] 30 RepoNotFound, 31 #[error("Concurrent modification detected")] 32 ConcurrentModification, 33 #[error("Invalid commit structure: {0}")] 34 InvalidCommit(String), 35 #[error("Verification failed: {0}")] 36 VerificationFailed(#[from] super::verify::VerifyError), 37 #[error("DID mismatch: CAR is for {car_did}, but authenticated as {auth_did}")] 38 DidMismatch { car_did: String, auth_did: String }, 39} 40 41#[derive(Debug, Clone)] 42pub struct BlobRef { 43 pub cid: String, 44 pub mime_type: Option<String>, 45} 46 47pub async fn parse_car(data: &[u8]) -> Result<(Cid, HashMap<Cid, Bytes>), ImportError> { 48 let cursor = Cursor::new(data); 49 let mut reader = CarReader::new(cursor) 50 .await 51 .map_err(|e| ImportError::CarParse(e.to_string()))?; 52 let header = reader.header(); 53 let roots = header.roots(); 54 if roots.len() != 1 { 55 return Err(ImportError::InvalidRootCount); 56 } 57 let root = roots[0]; 58 let mut blocks = HashMap::new(); 59 while let Ok(Some((cid, block))) = reader.next_block().await { 60 blocks.insert(cid, Bytes::from(block)); 61 } 62 if !blocks.contains_key(&root) { 63 return Err(ImportError::BlockNotFound(root.to_string())); 64 } 65 Ok((root, blocks)) 66} 67 68pub fn 
find_blob_refs_ipld(value: &Ipld, depth: usize) -> Vec<BlobRef> { 69 if depth > 32 { 70 return vec![]; 71 } 72 match value { 73 Ipld::List(arr) => arr 74 .iter() 75 .flat_map(|v| find_blob_refs_ipld(v, depth + 1)) 76 .collect(), 77 Ipld::Map(obj) => { 78 if let Some(Ipld::String(type_str)) = obj.get("$type") { 79 if type_str == "blob" { 80 if let Some(Ipld::Link(link_cid)) = obj.get("ref") { 81 let mime = obj 82 .get("mimeType") 83 .and_then(|v| if let Ipld::String(s) = v { Some(s.clone()) } else { None }); 84 return vec![BlobRef { 85 cid: link_cid.to_string(), 86 mime_type: mime, 87 }]; 88 } 89 } 90 } 91 obj.values() 92 .flat_map(|v| find_blob_refs_ipld(v, depth + 1)) 93 .collect() 94 } 95 _ => vec![], 96 } 97} 98 99pub fn find_blob_refs(value: &JsonValue, depth: usize) -> Vec<BlobRef> { 100 if depth > 32 { 101 return vec![]; 102 } 103 match value { 104 JsonValue::Array(arr) => arr 105 .iter() 106 .flat_map(|v| find_blob_refs(v, depth + 1)) 107 .collect(), 108 JsonValue::Object(obj) => { 109 if let Some(JsonValue::String(type_str)) = obj.get("$type") { 110 if type_str == "blob" { 111 if let Some(JsonValue::Object(ref_obj)) = obj.get("ref") { 112 if let Some(JsonValue::String(link)) = ref_obj.get("$link") { 113 let mime = obj 114 .get("mimeType") 115 .and_then(|v| v.as_str()) 116 .map(String::from); 117 return vec![BlobRef { 118 cid: link.clone(), 119 mime_type: mime, 120 }]; 121 } 122 } 123 } 124 } 125 obj.values() 126 .flat_map(|v| find_blob_refs(v, depth + 1)) 127 .collect() 128 } 129 _ => vec![], 130 } 131} 132 133pub fn extract_links(value: &Ipld, links: &mut Vec<Cid>) { 134 match value { 135 Ipld::Link(cid) => { 136 links.push(*cid); 137 } 138 Ipld::Map(map) => { 139 for v in map.values() { 140 extract_links(v, links); 141 } 142 } 143 Ipld::List(arr) => { 144 for v in arr { 145 extract_links(v, links); 146 } 147 } 148 _ => {} 149 } 150} 151 152#[derive(Debug)] 153pub struct ImportedRecord { 154 pub collection: String, 155 pub rkey: String, 156 pub cid: Cid, 
157 pub blob_refs: Vec<BlobRef>, 158} 159 160pub fn walk_mst( 161 blocks: &HashMap<Cid, Bytes>, 162 root_cid: &Cid, 163) -> Result<Vec<ImportedRecord>, ImportError> { 164 let mut records = Vec::new(); 165 let mut stack = vec![*root_cid]; 166 let mut visited = std::collections::HashSet::new(); 167 while let Some(cid) = stack.pop() { 168 if visited.contains(&cid) { 169 continue; 170 } 171 visited.insert(cid); 172 let block = blocks 173 .get(&cid) 174 .ok_or_else(|| ImportError::BlockNotFound(cid.to_string()))?; 175 let value: Ipld = serde_ipld_dagcbor::from_slice(block) 176 .map_err(|e| ImportError::InvalidCbor(e.to_string()))?; 177 if let Ipld::Map(ref obj) = value { 178 if let Some(Ipld::List(entries)) = obj.get("e") { 179 for entry in entries { 180 if let Ipld::Map(entry_obj) = entry { 181 let key = entry_obj.get("k").and_then(|k| { 182 if let Ipld::Bytes(b) = k { 183 String::from_utf8(b.clone()).ok() 184 } else if let Ipld::String(s) = k { 185 Some(s.clone()) 186 } else { 187 None 188 } 189 }); 190 let record_cid = entry_obj.get("v").and_then(|v| { 191 if let Ipld::Link(cid) = v { 192 Some(*cid) 193 } else { 194 None 195 } 196 }); 197 if let (Some(key), Some(record_cid)) = (key, record_cid) { 198 if let Some(record_block) = blocks.get(&record_cid) { 199 if let Ok(record_value) = 200 serde_ipld_dagcbor::from_slice::<Ipld>(record_block) 201 { 202 let blob_refs = find_blob_refs_ipld(&record_value, 0); 203 let parts: Vec<&str> = key.split('/').collect(); 204 if parts.len() >= 2 { 205 let collection = parts[..parts.len() - 1].join("/"); 206 let rkey = parts[parts.len() - 1].to_string(); 207 records.push(ImportedRecord { 208 collection, 209 rkey, 210 cid: record_cid, 211 blob_refs, 212 }); 213 } 214 } 215 } 216 } 217 if let Some(Ipld::Link(tree_cid)) = entry_obj.get("t") { 218 stack.push(*tree_cid); 219 } 220 } 221 } 222 } 223 if let Some(Ipld::Link(left_cid)) = obj.get("l") { 224 stack.push(*left_cid); 225 } 226 } 227 } 228 Ok(records) 229} 230 231pub struct 
CommitInfo { 232 pub rev: Option<String>, 233 pub prev: Option<String>, 234} 235 236fn extract_commit_info(commit: &Ipld) -> Result<(Cid, CommitInfo), ImportError> { 237 let obj = match commit { 238 Ipld::Map(m) => m, 239 _ => return Err(ImportError::InvalidCommit("Commit must be a map".to_string())), 240 }; 241 let data_cid = obj 242 .get("data") 243 .and_then(|d| if let Ipld::Link(cid) = d { Some(*cid) } else { None }) 244 .ok_or_else(|| ImportError::InvalidCommit("Missing data field".to_string()))?; 245 let rev = obj.get("rev").and_then(|r| { 246 if let Ipld::String(s) = r { 247 Some(s.clone()) 248 } else { 249 None 250 } 251 }); 252 let prev = obj.get("prev").and_then(|p| { 253 if let Ipld::Link(cid) = p { 254 Some(cid.to_string()) 255 } else if let Ipld::Null = p { 256 None 257 } else { 258 None 259 } 260 }); 261 Ok((data_cid, CommitInfo { rev, prev })) 262} 263 264pub async fn apply_import( 265 db: &PgPool, 266 user_id: Uuid, 267 root: Cid, 268 blocks: HashMap<Cid, Bytes>, 269 max_blocks: usize, 270) -> Result<Vec<ImportedRecord>, ImportError> { 271 if blocks.len() > max_blocks { 272 return Err(ImportError::SizeLimitExceeded); 273 } 274 let root_block = blocks 275 .get(&root) 276 .ok_or_else(|| ImportError::BlockNotFound(root.to_string()))?; 277 let commit: Ipld = serde_ipld_dagcbor::from_slice(root_block) 278 .map_err(|e| ImportError::InvalidCbor(e.to_string()))?; 279 let (data_cid, _commit_info) = extract_commit_info(&commit)?; 280 let records = walk_mst(&blocks, &data_cid)?; 281 debug!( 282 "Importing {} blocks and {} records for user {}", 283 blocks.len(), 284 records.len(), 285 user_id 286 ); 287 let mut tx = db.begin().await?; 288 let repo = sqlx::query!( 289 "SELECT repo_root_cid FROM repos WHERE user_id = $1 FOR UPDATE NOWAIT", 290 user_id 291 ) 292 .fetch_optional(&mut *tx) 293 .await 294 .map_err(|e| { 295 if let sqlx::Error::Database(ref db_err) = e { 296 if db_err.code().as_deref() == Some("55P03") { 297 return ImportError::ConcurrentModification; 
298 } 299 } 300 ImportError::Database(e) 301 })?; 302 if repo.is_none() { 303 return Err(ImportError::RepoNotFound); 304 } 305 let block_chunks: Vec<Vec<(&Cid, &Bytes)>> = blocks 306 .iter() 307 .collect::<Vec<_>>() 308 .chunks(100) 309 .map(|c| c.to_vec()) 310 .collect(); 311 for chunk in block_chunks { 312 for (cid, data) in chunk { 313 let cid_bytes = cid.to_bytes(); 314 sqlx::query!( 315 "INSERT INTO blocks (cid, data) VALUES ($1, $2) ON CONFLICT (cid) DO NOTHING", 316 &cid_bytes, 317 data.as_ref() 318 ) 319 .execute(&mut *tx) 320 .await?; 321 } 322 } 323 let root_str = root.to_string(); 324 sqlx::query!( 325 "UPDATE repos SET repo_root_cid = $1, updated_at = NOW() WHERE user_id = $2", 326 root_str, 327 user_id 328 ) 329 .execute(&mut *tx) 330 .await?; 331 sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id) 332 .execute(&mut *tx) 333 .await?; 334 for record in &records { 335 let record_cid_str = record.cid.to_string(); 336 sqlx::query!( 337 r#" 338 INSERT INTO records (repo_id, collection, rkey, record_cid) 339 VALUES ($1, $2, $3, $4) 340 ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4 341 "#, 342 user_id, 343 record.collection, 344 record.rkey, 345 record_cid_str 346 ) 347 .execute(&mut *tx) 348 .await?; 349 } 350 tx.commit().await?; 351 debug!( 352 "Successfully imported {} blocks and {} records", 353 blocks.len(), 354 records.len() 355 ); 356 Ok(records) 357} 358 359#[cfg(test)] 360mod tests { 361 use super::*; 362 363 #[test] 364 fn test_find_blob_refs() { 365 let record = serde_json::json!({ 366 "$type": "app.bsky.feed.post", 367 "text": "Hello world", 368 "embed": { 369 "$type": "app.bsky.embed.images", 370 "images": [ 371 { 372 "alt": "Test image", 373 "image": { 374 "$type": "blob", 375 "ref": { 376 "$link": "bafkreihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku" 377 }, 378 "mimeType": "image/jpeg", 379 "size": 12345 380 } 381 } 382 ] 383 } 384 }); 385 let blob_refs = find_blob_refs(&record, 0); 386 
assert_eq!(blob_refs.len(), 1); 387 assert_eq!( 388 blob_refs[0].cid, 389 "bafkreihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku" 390 ); 391 assert_eq!(blob_refs[0].mime_type, Some("image/jpeg".to_string())); 392 } 393 394 #[test] 395 fn test_find_blob_refs_no_blobs() { 396 let record = serde_json::json!({ 397 "$type": "app.bsky.feed.post", 398 "text": "Hello world" 399 }); 400 let blob_refs = find_blob_refs(&record, 0); 401 assert!(blob_refs.is_empty()); 402 } 403 404 #[test] 405 fn test_find_blob_refs_depth_limit() { 406 fn deeply_nested(depth: usize) -> JsonValue { 407 if depth == 0 { 408 serde_json::json!({ 409 "$type": "blob", 410 "ref": { "$link": "bafkreitest" }, 411 "mimeType": "image/png" 412 }) 413 } else { 414 serde_json::json!({ "nested": deeply_nested(depth - 1) }) 415 } 416 } 417 let deep = deeply_nested(40); 418 let blob_refs = find_blob_refs(&deep, 0); 419 assert!(blob_refs.is_empty()); 420 } 421}