This repository has no description.
1use bytes::Bytes; 2use cid::Cid; 3use ipld_core::ipld::Ipld; 4use iroh_car::CarReader; 5use serde_json::Value as JsonValue; 6use sqlx::PgPool; 7use std::collections::HashMap; 8use std::io::Cursor; 9use thiserror::Error; 10use tracing::debug; 11use uuid::Uuid; 12#[derive(Error, Debug)] 13pub enum ImportError { 14 #[error("CAR parsing error: {0}")] 15 CarParse(String), 16 #[error("Expected exactly one root in CAR file")] 17 InvalidRootCount, 18 #[error("Block not found: {0}")] 19 BlockNotFound(String), 20 #[error("Invalid CBOR: {0}")] 21 InvalidCbor(String), 22 #[error("Database error: {0}")] 23 Database(#[from] sqlx::Error), 24 #[error("Block store error: {0}")] 25 BlockStore(String), 26 #[error("Import size limit exceeded")] 27 SizeLimitExceeded, 28 #[error("Repo not found")] 29 RepoNotFound, 30 #[error("Concurrent modification detected")] 31 ConcurrentModification, 32 #[error("Invalid commit structure: {0}")] 33 InvalidCommit(String), 34 #[error("Verification failed: {0}")] 35 VerificationFailed(#[from] super::verify::VerifyError), 36 #[error("DID mismatch: CAR is for {car_did}, but authenticated as {auth_did}")] 37 DidMismatch { car_did: String, auth_did: String }, 38} 39#[derive(Debug, Clone)] 40pub struct BlobRef { 41 pub cid: String, 42 pub mime_type: Option<String>, 43} 44pub async fn parse_car(data: &[u8]) -> Result<(Cid, HashMap<Cid, Bytes>), ImportError> { 45 let cursor = Cursor::new(data); 46 let mut reader = CarReader::new(cursor) 47 .await 48 .map_err(|e| ImportError::CarParse(e.to_string()))?; 49 let header = reader.header(); 50 let roots = header.roots(); 51 if roots.len() != 1 { 52 return Err(ImportError::InvalidRootCount); 53 } 54 let root = roots[0]; 55 let mut blocks = HashMap::new(); 56 while let Ok(Some((cid, block))) = reader.next_block().await { 57 blocks.insert(cid, Bytes::from(block)); 58 } 59 if !blocks.contains_key(&root) { 60 return Err(ImportError::BlockNotFound(root.to_string())); 61 } 62 Ok((root, blocks)) 63} 64pub fn 
find_blob_refs_ipld(value: &Ipld, depth: usize) -> Vec<BlobRef> { 65 if depth > 32 { 66 return vec![]; 67 } 68 match value { 69 Ipld::List(arr) => arr 70 .iter() 71 .flat_map(|v| find_blob_refs_ipld(v, depth + 1)) 72 .collect(), 73 Ipld::Map(obj) => { 74 if let Some(Ipld::String(type_str)) = obj.get("$type") { 75 if type_str == "blob" { 76 if let Some(Ipld::Link(link_cid)) = obj.get("ref") { 77 let mime = obj 78 .get("mimeType") 79 .and_then(|v| if let Ipld::String(s) = v { Some(s.clone()) } else { None }); 80 return vec![BlobRef { 81 cid: link_cid.to_string(), 82 mime_type: mime, 83 }]; 84 } 85 } 86 } 87 obj.values() 88 .flat_map(|v| find_blob_refs_ipld(v, depth + 1)) 89 .collect() 90 } 91 _ => vec![], 92 } 93} 94pub fn find_blob_refs(value: &JsonValue, depth: usize) -> Vec<BlobRef> { 95 if depth > 32 { 96 return vec![]; 97 } 98 match value { 99 JsonValue::Array(arr) => arr 100 .iter() 101 .flat_map(|v| find_blob_refs(v, depth + 1)) 102 .collect(), 103 JsonValue::Object(obj) => { 104 if let Some(JsonValue::String(type_str)) = obj.get("$type") { 105 if type_str == "blob" { 106 if let Some(JsonValue::Object(ref_obj)) = obj.get("ref") { 107 if let Some(JsonValue::String(link)) = ref_obj.get("$link") { 108 let mime = obj 109 .get("mimeType") 110 .and_then(|v| v.as_str()) 111 .map(String::from); 112 return vec![BlobRef { 113 cid: link.clone(), 114 mime_type: mime, 115 }]; 116 } 117 } 118 } 119 } 120 obj.values() 121 .flat_map(|v| find_blob_refs(v, depth + 1)) 122 .collect() 123 } 124 _ => vec![], 125 } 126} 127pub fn extract_links(value: &Ipld, links: &mut Vec<Cid>) { 128 match value { 129 Ipld::Link(cid) => { 130 links.push(*cid); 131 } 132 Ipld::Map(map) => { 133 for v in map.values() { 134 extract_links(v, links); 135 } 136 } 137 Ipld::List(arr) => { 138 for v in arr { 139 extract_links(v, links); 140 } 141 } 142 _ => {} 143 } 144} 145#[derive(Debug)] 146pub struct ImportedRecord { 147 pub collection: String, 148 pub rkey: String, 149 pub cid: Cid, 150 pub 
blob_refs: Vec<BlobRef>, 151} 152pub fn walk_mst( 153 blocks: &HashMap<Cid, Bytes>, 154 root_cid: &Cid, 155) -> Result<Vec<ImportedRecord>, ImportError> { 156 let mut records = Vec::new(); 157 let mut stack = vec![*root_cid]; 158 let mut visited = std::collections::HashSet::new(); 159 while let Some(cid) = stack.pop() { 160 if visited.contains(&cid) { 161 continue; 162 } 163 visited.insert(cid); 164 let block = blocks 165 .get(&cid) 166 .ok_or_else(|| ImportError::BlockNotFound(cid.to_string()))?; 167 let value: Ipld = serde_ipld_dagcbor::from_slice(block) 168 .map_err(|e| ImportError::InvalidCbor(e.to_string()))?; 169 if let Ipld::Map(ref obj) = value { 170 if let Some(Ipld::List(entries)) = obj.get("e") { 171 for entry in entries { 172 if let Ipld::Map(entry_obj) = entry { 173 let key = entry_obj.get("k").and_then(|k| { 174 if let Ipld::Bytes(b) = k { 175 String::from_utf8(b.clone()).ok() 176 } else if let Ipld::String(s) = k { 177 Some(s.clone()) 178 } else { 179 None 180 } 181 }); 182 let record_cid = entry_obj.get("v").and_then(|v| { 183 if let Ipld::Link(cid) = v { 184 Some(*cid) 185 } else { 186 None 187 } 188 }); 189 if let (Some(key), Some(record_cid)) = (key, record_cid) { 190 if let Some(record_block) = blocks.get(&record_cid) { 191 if let Ok(record_value) = 192 serde_ipld_dagcbor::from_slice::<Ipld>(record_block) 193 { 194 let blob_refs = find_blob_refs_ipld(&record_value, 0); 195 let parts: Vec<&str> = key.split('/').collect(); 196 if parts.len() >= 2 { 197 let collection = parts[..parts.len() - 1].join("/"); 198 let rkey = parts[parts.len() - 1].to_string(); 199 records.push(ImportedRecord { 200 collection, 201 rkey, 202 cid: record_cid, 203 blob_refs, 204 }); 205 } 206 } 207 } 208 } 209 if let Some(Ipld::Link(tree_cid)) = entry_obj.get("t") { 210 stack.push(*tree_cid); 211 } 212 } 213 } 214 } 215 if let Some(Ipld::Link(left_cid)) = obj.get("l") { 216 stack.push(*left_cid); 217 } 218 } 219 } 220 Ok(records) 221} 222pub struct CommitInfo { 223 pub rev: 
Option<String>, 224 pub prev: Option<String>, 225} 226fn extract_commit_info(commit: &Ipld) -> Result<(Cid, CommitInfo), ImportError> { 227 let obj = match commit { 228 Ipld::Map(m) => m, 229 _ => return Err(ImportError::InvalidCommit("Commit must be a map".to_string())), 230 }; 231 let data_cid = obj 232 .get("data") 233 .and_then(|d| if let Ipld::Link(cid) = d { Some(*cid) } else { None }) 234 .ok_or_else(|| ImportError::InvalidCommit("Missing data field".to_string()))?; 235 let rev = obj.get("rev").and_then(|r| { 236 if let Ipld::String(s) = r { 237 Some(s.clone()) 238 } else { 239 None 240 } 241 }); 242 let prev = obj.get("prev").and_then(|p| { 243 if let Ipld::Link(cid) = p { 244 Some(cid.to_string()) 245 } else if let Ipld::Null = p { 246 None 247 } else { 248 None 249 } 250 }); 251 Ok((data_cid, CommitInfo { rev, prev })) 252} 253pub async fn apply_import( 254 db: &PgPool, 255 user_id: Uuid, 256 root: Cid, 257 blocks: HashMap<Cid, Bytes>, 258 max_blocks: usize, 259) -> Result<Vec<ImportedRecord>, ImportError> { 260 if blocks.len() > max_blocks { 261 return Err(ImportError::SizeLimitExceeded); 262 } 263 let root_block = blocks 264 .get(&root) 265 .ok_or_else(|| ImportError::BlockNotFound(root.to_string()))?; 266 let commit: Ipld = serde_ipld_dagcbor::from_slice(root_block) 267 .map_err(|e| ImportError::InvalidCbor(e.to_string()))?; 268 let (data_cid, _commit_info) = extract_commit_info(&commit)?; 269 let records = walk_mst(&blocks, &data_cid)?; 270 debug!( 271 "Importing {} blocks and {} records for user {}", 272 blocks.len(), 273 records.len(), 274 user_id 275 ); 276 let mut tx = db.begin().await?; 277 let repo = sqlx::query!( 278 "SELECT repo_root_cid FROM repos WHERE user_id = $1 FOR UPDATE NOWAIT", 279 user_id 280 ) 281 .fetch_optional(&mut *tx) 282 .await 283 .map_err(|e| { 284 if let sqlx::Error::Database(ref db_err) = e { 285 if db_err.code().as_deref() == Some("55P03") { 286 return ImportError::ConcurrentModification; 287 } 288 } 289 
ImportError::Database(e) 290 })?; 291 if repo.is_none() { 292 return Err(ImportError::RepoNotFound); 293 } 294 let block_chunks: Vec<Vec<(&Cid, &Bytes)>> = blocks 295 .iter() 296 .collect::<Vec<_>>() 297 .chunks(100) 298 .map(|c| c.to_vec()) 299 .collect(); 300 for chunk in block_chunks { 301 for (cid, data) in chunk { 302 let cid_bytes = cid.to_bytes(); 303 sqlx::query!( 304 "INSERT INTO blocks (cid, data) VALUES ($1, $2) ON CONFLICT (cid) DO NOTHING", 305 &cid_bytes, 306 data.as_ref() 307 ) 308 .execute(&mut *tx) 309 .await?; 310 } 311 } 312 let root_str = root.to_string(); 313 sqlx::query!( 314 "UPDATE repos SET repo_root_cid = $1, updated_at = NOW() WHERE user_id = $2", 315 root_str, 316 user_id 317 ) 318 .execute(&mut *tx) 319 .await?; 320 sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id) 321 .execute(&mut *tx) 322 .await?; 323 for record in &records { 324 let record_cid_str = record.cid.to_string(); 325 sqlx::query!( 326 r#" 327 INSERT INTO records (repo_id, collection, rkey, record_cid) 328 VALUES ($1, $2, $3, $4) 329 ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4 330 "#, 331 user_id, 332 record.collection, 333 record.rkey, 334 record_cid_str 335 ) 336 .execute(&mut *tx) 337 .await?; 338 } 339 tx.commit().await?; 340 debug!( 341 "Successfully imported {} blocks and {} records", 342 blocks.len(), 343 records.len() 344 ); 345 Ok(records) 346} 347#[cfg(test)] 348mod tests { 349 use super::*; 350 #[test] 351 fn test_find_blob_refs() { 352 let record = serde_json::json!({ 353 "$type": "app.bsky.feed.post", 354 "text": "Hello world", 355 "embed": { 356 "$type": "app.bsky.embed.images", 357 "images": [ 358 { 359 "alt": "Test image", 360 "image": { 361 "$type": "blob", 362 "ref": { 363 "$link": "bafkreihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku" 364 }, 365 "mimeType": "image/jpeg", 366 "size": 12345 367 } 368 } 369 ] 370 } 371 }); 372 let blob_refs = find_blob_refs(&record, 0); 373 assert_eq!(blob_refs.len(), 1); 374 
assert_eq!( 375 blob_refs[0].cid, 376 "bafkreihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku" 377 ); 378 assert_eq!(blob_refs[0].mime_type, Some("image/jpeg".to_string())); 379 } 380 #[test] 381 fn test_find_blob_refs_no_blobs() { 382 let record = serde_json::json!({ 383 "$type": "app.bsky.feed.post", 384 "text": "Hello world" 385 }); 386 let blob_refs = find_blob_refs(&record, 0); 387 assert!(blob_refs.is_empty()); 388 } 389 #[test] 390 fn test_find_blob_refs_depth_limit() { 391 fn deeply_nested(depth: usize) -> JsonValue { 392 if depth == 0 { 393 serde_json::json!({ 394 "$type": "blob", 395 "ref": { "$link": "bafkreitest" }, 396 "mimeType": "image/png" 397 }) 398 } else { 399 serde_json::json!({ "nested": deeply_nested(depth - 1) }) 400 } 401 } 402 let deep = deeply_nested(40); 403 let blob_refs = find_blob_refs(&deep, 0); 404 assert!(blob_refs.is_empty()); 405 } 406}