//! CAR-file repository import: parse a CAR archive, walk its Merkle Search
//! Tree, and transactionally apply the imported blocks and records to the DB.
1use bytes::Bytes;
2use cid::Cid;
3use ipld_core::ipld::Ipld;
4use iroh_car::CarReader;
5use serde_json::Value as JsonValue;
6use sqlx::PgPool;
7use std::collections::HashMap;
8use std::io::Cursor;
9use thiserror::Error;
10use tracing::debug;
11use uuid::Uuid;
12
/// Errors that can occur while parsing a CAR file or applying an import.
#[derive(Error, Debug)]
pub enum ImportError {
    /// The CAR container (header or block stream) could not be decoded.
    #[error("CAR parsing error: {0}")]
    CarParse(String),
    /// A repo CAR must have exactly one root: the signed commit.
    #[error("Expected exactly one root in CAR file")]
    InvalidRootCount,
    /// A CID referenced by the tree/commit is absent from the archive.
    #[error("Block not found: {0}")]
    BlockNotFound(String),
    /// A block's DAG-CBOR payload failed to decode.
    #[error("Invalid CBOR: {0}")]
    InvalidCbor(String),
    /// Underlying database failure (propagated from sqlx).
    #[error("Database error: {0}")]
    Database(#[from] sqlx::Error),
    /// Failure writing to the block store.
    #[error("Block store error: {0}")]
    BlockStore(String),
    /// The import exceeded the configured block-count limit.
    #[error("Import size limit exceeded")]
    SizeLimitExceeded,
    /// No repo row exists for the importing user.
    #[error("Repo not found")]
    RepoNotFound,
    /// Another writer holds the repo row lock (Postgres 55P03).
    #[error("Concurrent modification detected")]
    ConcurrentModification,
    /// The commit object at the CAR root is structurally invalid.
    #[error("Invalid commit structure: {0}")]
    InvalidCommit(String),
    /// Signature/structure verification of the imported repo failed.
    #[error("Verification failed: {0}")]
    VerificationFailed(#[from] super::verify::VerifyError),
    /// The CAR belongs to a different DID than the authenticated user.
    #[error("DID mismatch: CAR is for {car_did}, but authenticated as {auth_did}")]
    DidMismatch { car_did: String, auth_did: String },
}
40
/// A reference to a blob (e.g. an image) discovered inside a record body.
#[derive(Debug, Clone)]
pub struct BlobRef {
    /// The blob's CID, rendered as a string.
    pub cid: String,
    /// MIME type as declared by the record, if any.
    pub mime_type: Option<String>,
}
46
47pub async fn parse_car(data: &[u8]) -> Result<(Cid, HashMap<Cid, Bytes>), ImportError> {
48 let cursor = Cursor::new(data);
49 let mut reader = CarReader::new(cursor)
50 .await
51 .map_err(|e| ImportError::CarParse(e.to_string()))?;
52
53 let header = reader.header();
54 let roots = header.roots();
55
56 if roots.len() != 1 {
57 return Err(ImportError::InvalidRootCount);
58 }
59
60 let root = roots[0];
61 let mut blocks = HashMap::new();
62
63 while let Ok(Some((cid, block))) = reader.next_block().await {
64 blocks.insert(cid, Bytes::from(block));
65 }
66
67 if !blocks.contains_key(&root) {
68 return Err(ImportError::BlockNotFound(root.to_string()));
69 }
70
71 Ok((root, blocks))
72}
73
74pub fn find_blob_refs_ipld(value: &Ipld, depth: usize) -> Vec<BlobRef> {
75 if depth > 32 {
76 return vec![];
77 }
78
79 match value {
80 Ipld::List(arr) => arr
81 .iter()
82 .flat_map(|v| find_blob_refs_ipld(v, depth + 1))
83 .collect(),
84 Ipld::Map(obj) => {
85 if let Some(Ipld::String(type_str)) = obj.get("$type") {
86 if type_str == "blob" {
87 if let Some(Ipld::Link(link_cid)) = obj.get("ref") {
88 let mime = obj
89 .get("mimeType")
90 .and_then(|v| if let Ipld::String(s) = v { Some(s.clone()) } else { None });
91 return vec![BlobRef {
92 cid: link_cid.to_string(),
93 mime_type: mime,
94 }];
95 }
96 }
97 }
98
99 obj.values()
100 .flat_map(|v| find_blob_refs_ipld(v, depth + 1))
101 .collect()
102 }
103 _ => vec![],
104 }
105}
106
107pub fn find_blob_refs(value: &JsonValue, depth: usize) -> Vec<BlobRef> {
108 if depth > 32 {
109 return vec![];
110 }
111
112 match value {
113 JsonValue::Array(arr) => arr
114 .iter()
115 .flat_map(|v| find_blob_refs(v, depth + 1))
116 .collect(),
117 JsonValue::Object(obj) => {
118 if let Some(JsonValue::String(type_str)) = obj.get("$type") {
119 if type_str == "blob" {
120 if let Some(JsonValue::Object(ref_obj)) = obj.get("ref") {
121 if let Some(JsonValue::String(link)) = ref_obj.get("$link") {
122 let mime = obj
123 .get("mimeType")
124 .and_then(|v| v.as_str())
125 .map(String::from);
126 return vec![BlobRef {
127 cid: link.clone(),
128 mime_type: mime,
129 }];
130 }
131 }
132 }
133 }
134
135 obj.values()
136 .flat_map(|v| find_blob_refs(v, depth + 1))
137 .collect()
138 }
139 _ => vec![],
140 }
141}
142
143pub fn extract_links(value: &Ipld, links: &mut Vec<Cid>) {
144 match value {
145 Ipld::Link(cid) => {
146 links.push(*cid);
147 }
148 Ipld::Map(map) => {
149 for v in map.values() {
150 extract_links(v, links);
151 }
152 }
153 Ipld::List(arr) => {
154 for v in arr {
155 extract_links(v, links);
156 }
157 }
158 _ => {}
159 }
160}
161
/// One record recovered from the imported repo's Merkle Search Tree.
#[derive(Debug)]
pub struct ImportedRecord {
    /// NSID of the collection, e.g. "app.bsky.feed.post".
    pub collection: String,
    /// Record key within the collection.
    pub rkey: String,
    /// CID of the record's DAG-CBOR block.
    pub cid: Cid,
    /// Blob references found inside the record body.
    pub blob_refs: Vec<BlobRef>,
}
169
170pub fn walk_mst(
171 blocks: &HashMap<Cid, Bytes>,
172 root_cid: &Cid,
173) -> Result<Vec<ImportedRecord>, ImportError> {
174 let mut records = Vec::new();
175 let mut stack = vec![*root_cid];
176 let mut visited = std::collections::HashSet::new();
177
178 while let Some(cid) = stack.pop() {
179 if visited.contains(&cid) {
180 continue;
181 }
182 visited.insert(cid);
183
184 let block = blocks
185 .get(&cid)
186 .ok_or_else(|| ImportError::BlockNotFound(cid.to_string()))?;
187
188 let value: Ipld = serde_ipld_dagcbor::from_slice(block)
189 .map_err(|e| ImportError::InvalidCbor(e.to_string()))?;
190
191 if let Ipld::Map(ref obj) = value {
192 if let Some(Ipld::List(entries)) = obj.get("e") {
193 for entry in entries {
194 if let Ipld::Map(entry_obj) = entry {
195 let key = entry_obj.get("k").and_then(|k| {
196 if let Ipld::Bytes(b) = k {
197 String::from_utf8(b.clone()).ok()
198 } else if let Ipld::String(s) = k {
199 Some(s.clone())
200 } else {
201 None
202 }
203 });
204
205 let record_cid = entry_obj.get("v").and_then(|v| {
206 if let Ipld::Link(cid) = v {
207 Some(*cid)
208 } else {
209 None
210 }
211 });
212
213 if let (Some(key), Some(record_cid)) = (key, record_cid) {
214 if let Some(record_block) = blocks.get(&record_cid) {
215 if let Ok(record_value) =
216 serde_ipld_dagcbor::from_slice::<Ipld>(record_block)
217 {
218 let blob_refs = find_blob_refs_ipld(&record_value, 0);
219
220 let parts: Vec<&str> = key.split('/').collect();
221 if parts.len() >= 2 {
222 let collection = parts[..parts.len() - 1].join("/");
223 let rkey = parts[parts.len() - 1].to_string();
224
225 records.push(ImportedRecord {
226 collection,
227 rkey,
228 cid: record_cid,
229 blob_refs,
230 });
231 }
232 }
233 }
234 }
235
236 if let Some(Ipld::Link(tree_cid)) = entry_obj.get("t") {
237 stack.push(*tree_cid);
238 }
239 }
240 }
241 }
242
243 if let Some(Ipld::Link(left_cid)) = obj.get("l") {
244 stack.push(*left_cid);
245 }
246 }
247 }
248
249 Ok(records)
250}
251
/// Optional metadata extracted from a repo commit object.
pub struct CommitInfo {
    /// Revision string ("rev" field), if present.
    pub rev: Option<String>,
    /// CID of the previous commit ("prev" field), if present and non-null.
    pub prev: Option<String>,
}
256
257fn extract_commit_info(commit: &Ipld) -> Result<(Cid, CommitInfo), ImportError> {
258 let obj = match commit {
259 Ipld::Map(m) => m,
260 _ => return Err(ImportError::InvalidCommit("Commit must be a map".to_string())),
261 };
262
263 let data_cid = obj
264 .get("data")
265 .and_then(|d| if let Ipld::Link(cid) = d { Some(*cid) } else { None })
266 .ok_or_else(|| ImportError::InvalidCommit("Missing data field".to_string()))?;
267
268 let rev = obj.get("rev").and_then(|r| {
269 if let Ipld::String(s) = r {
270 Some(s.clone())
271 } else {
272 None
273 }
274 });
275
276 let prev = obj.get("prev").and_then(|p| {
277 if let Ipld::Link(cid) = p {
278 Some(cid.to_string())
279 } else if let Ipld::Null = p {
280 None
281 } else {
282 None
283 }
284 });
285
286 Ok((data_cid, CommitInfo { rev, prev }))
287}
288
289pub async fn apply_import(
290 db: &PgPool,
291 user_id: Uuid,
292 root: Cid,
293 blocks: HashMap<Cid, Bytes>,
294 max_blocks: usize,
295) -> Result<Vec<ImportedRecord>, ImportError> {
296 if blocks.len() > max_blocks {
297 return Err(ImportError::SizeLimitExceeded);
298 }
299
300 let root_block = blocks
301 .get(&root)
302 .ok_or_else(|| ImportError::BlockNotFound(root.to_string()))?;
303 let commit: Ipld = serde_ipld_dagcbor::from_slice(root_block)
304 .map_err(|e| ImportError::InvalidCbor(e.to_string()))?;
305
306 let (data_cid, _commit_info) = extract_commit_info(&commit)?;
307
308 let records = walk_mst(&blocks, &data_cid)?;
309
310 debug!(
311 "Importing {} blocks and {} records for user {}",
312 blocks.len(),
313 records.len(),
314 user_id
315 );
316
317 let mut tx = db.begin().await?;
318
319 let repo = sqlx::query!(
320 "SELECT repo_root_cid FROM repos WHERE user_id = $1 FOR UPDATE NOWAIT",
321 user_id
322 )
323 .fetch_optional(&mut *tx)
324 .await
325 .map_err(|e| {
326 if let sqlx::Error::Database(ref db_err) = e {
327 if db_err.code().as_deref() == Some("55P03") {
328 return ImportError::ConcurrentModification;
329 }
330 }
331 ImportError::Database(e)
332 })?;
333
334 if repo.is_none() {
335 return Err(ImportError::RepoNotFound);
336 }
337
338 let block_chunks: Vec<Vec<(&Cid, &Bytes)>> = blocks
339 .iter()
340 .collect::<Vec<_>>()
341 .chunks(100)
342 .map(|c| c.to_vec())
343 .collect();
344
345 for chunk in block_chunks {
346 for (cid, data) in chunk {
347 let cid_bytes = cid.to_bytes();
348 sqlx::query!(
349 "INSERT INTO blocks (cid, data) VALUES ($1, $2) ON CONFLICT (cid) DO NOTHING",
350 &cid_bytes,
351 data.as_ref()
352 )
353 .execute(&mut *tx)
354 .await?;
355 }
356 }
357
358 let root_str = root.to_string();
359 sqlx::query!(
360 "UPDATE repos SET repo_root_cid = $1, updated_at = NOW() WHERE user_id = $2",
361 root_str,
362 user_id
363 )
364 .execute(&mut *tx)
365 .await?;
366
367 sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id)
368 .execute(&mut *tx)
369 .await?;
370
371 for record in &records {
372 let record_cid_str = record.cid.to_string();
373 sqlx::query!(
374 r#"
375 INSERT INTO records (repo_id, collection, rkey, record_cid)
376 VALUES ($1, $2, $3, $4)
377 ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4
378 "#,
379 user_id,
380 record.collection,
381 record.rkey,
382 record_cid_str
383 )
384 .execute(&mut *tx)
385 .await?;
386 }
387
388 tx.commit().await?;
389
390 debug!(
391 "Successfully imported {} blocks and {} records",
392 blocks.len(),
393 records.len()
394 );
395
396 Ok(records)
397}
398
#[cfg(test)]
mod tests {
    use super::*;

    /// A realistic post with one embedded image blob should yield exactly one
    /// BlobRef carrying the link CID and MIME type.
    #[test]
    fn test_find_blob_refs() {
        let record = serde_json::json!({
            "$type": "app.bsky.feed.post",
            "text": "Hello world",
            "embed": {
                "$type": "app.bsky.embed.images",
                "images": [
                    {
                        "alt": "Test image",
                        "image": {
                            "$type": "blob",
                            "ref": {
                                "$link": "bafkreihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku"
                            },
                            "mimeType": "image/jpeg",
                            "size": 12345
                        }
                    }
                ]
            }
        });

        let blob_refs = find_blob_refs(&record, 0);
        assert_eq!(blob_refs.len(), 1);
        assert_eq!(
            blob_refs[0].cid,
            "bafkreihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku"
        );
        assert_eq!(blob_refs[0].mime_type, Some("image/jpeg".to_string()));
    }

    /// A record with no blob nodes should produce no references.
    #[test]
    fn test_find_blob_refs_no_blobs() {
        let record = serde_json::json!({
            "$type": "app.bsky.feed.post",
            "text": "Hello world"
        });

        let blob_refs = find_blob_refs(&record, 0);
        assert!(blob_refs.is_empty());
    }

    /// A blob buried deeper than the 32-level recursion cap must be ignored.
    #[test]
    fn test_find_blob_refs_depth_limit() {
        // Builds `depth` nested objects with a blob at the bottom.
        fn deeply_nested(depth: usize) -> JsonValue {
            if depth == 0 {
                serde_json::json!({
                    "$type": "blob",
                    "ref": { "$link": "bafkreitest" },
                    "mimeType": "image/png"
                })
            } else {
                serde_json::json!({ "nested": deeply_nested(depth - 1) })
            }
        }

        // 40 levels exceeds the cap of 32, so the blob is unreachable.
        let deep = deeply_nested(40);
        let blob_refs = find_blob_refs(&deep, 0);
        assert!(blob_refs.is_empty());
    }
}