use bytes::Bytes;
use cid::Cid;
use ipld_core::ipld::Ipld;
use iroh_car::CarReader;
use serde_json::Value as JsonValue;
use sqlx::PgPool;
use std::collections::HashMap;
use std::io::Cursor;
use thiserror::Error;
use tracing::debug;
use uuid::Uuid;

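/// Errors that can occur while parsing and importing a repository CAR file.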
#[derive(Error, Debug)]
pub enum ImportError {
    #[error("CAR parsing error: {0}")]
    CarParse(String),
    #[error("Expected exactly one root in CAR file")]
    InvalidRootCount,
    #[error("Block not found: {0}")]
    BlockNotFound(String),
    #[error("Invalid CBOR: {0}")]
    InvalidCbor(String),
    #[error("Database error: {0}")]
    Database(#[from] sqlx::Error),
    #[error("Block store error: {0}")]
    BlockStore(String),
    #[error("Import size limit exceeded")]
    SizeLimitExceeded,
    #[error("Repo not found")]
    RepoNotFound,
    #[error("Concurrent modification detected")]
    ConcurrentModification,
    #[error("Invalid commit structure: {0}")]
    InvalidCommit(String),
    #[error("Verification failed: {0}")]
    VerificationFailed(#[from] super::verify::VerifyError),
    #[error("DID mismatch: CAR is for {car_did}, but authenticated as {auth_did}")]
    DidMismatch { car_did: String, auth_did: String },
}

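/// A blob reference found inside a record: the blob's CID as a string plus an
/// optional MIME type.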
#[derive(Debug, Clone)]
pub struct BlobRef {
    pub cid: String,
    pub mime_type: Option<String>,
}

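/// Parses raw CAR bytes into the root CID and a map of all contained blocks.
/// Fails unless the header lists exactly one root and that root's block is
/// present in the archive.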
pub async fn parse_car(data: &[u8]) -> Result<(Cid, HashMap<Cid, Bytes>), ImportError> {
    let cursor = Cursor::new(data);
    let mut reader = CarReader::new(cursor)
        .await
        .map_err(|e| ImportError::CarParse(e.to_string()))?;
    let header = reader.header();
    let roots = header.roots();
    if roots.len() != 1 {
        return Err(ImportError::InvalidRootCount);
    }
    let root = roots[0];
    let mut blocks = HashMap::new();
    while let Ok(Some((cid, block))) = reader.next_block().await {
        blocks.insert(cid, Bytes::from(block));
    }
    if !blocks.contains_key(&root) {
        return Err(ImportError::BlockNotFound(root.to_string()));
    }
    Ok((root, blocks))
}

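/// Recursively collects blob references from an IPLD value, handling both the
/// direct CID-link form and the legacy `{"$link": ...}` map form of `ref`.
/// Recursion is capped at a depth of 32.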
pub fn find_blob_refs_ipld(value: &Ipld, depth: usize) -> Vec<BlobRef> {
    if depth > 32 {
        return vec![];
    }
    match value {
        Ipld::List(arr) => arr
            .iter()
            .flat_map(|v| find_blob_refs_ipld(v, depth + 1))
            .collect(),
        Ipld::Map(obj) => {
            if let Some(Ipld::String(type_str)) = obj.get("$type")
                && type_str == "blob"
            {
                let cid_str = if let Some(Ipld::Link(link_cid)) = obj.get("ref") {
                    Some(link_cid.to_string())
                } else if let Some(Ipld::Map(ref_obj)) = obj.get("ref")
                    && let Some(Ipld::String(link)) = ref_obj.get("$link")
                {
                    Some(link.clone())
                } else {
                    None
                };

                if let Some(cid) = cid_str {
                    let mime = obj.get("mimeType").and_then(|v| {
                        if let Ipld::String(s) = v {
                            Some(s.clone())
                        } else {
                            None
                        }
                    });
                    return vec![BlobRef {
                        cid,
                        mime_type: mime,
                    }];
                }
            }
            obj.values()
                .flat_map(|v| find_blob_refs_ipld(v, depth + 1))
                .collect()
        }
        _ => vec![],
    }
}

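/// JSON counterpart of `find_blob_refs_ipld`: collects `$type: "blob"`
/// references (in the `{"$link": ...}` form) from a JSON value, capped at a
/// depth of 32.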
pub fn find_blob_refs(value: &JsonValue, depth: usize) -> Vec<BlobRef> {
    if depth > 32 {
        return vec![];
    }
    match value {
        JsonValue::Array(arr) => arr
            .iter()
            .flat_map(|v| find_blob_refs(v, depth + 1))
            .collect(),
        JsonValue::Object(obj) => {
            if let Some(JsonValue::String(type_str)) = obj.get("$type")
                && type_str == "blob"
                && let Some(JsonValue::Object(ref_obj)) = obj.get("ref")
                && let Some(JsonValue::String(link)) = ref_obj.get("$link")
            {
                let mime = obj
                    .get("mimeType")
                    .and_then(|v| v.as_str())
                    .map(String::from);
                return vec![BlobRef {
                    cid: link.clone(),
                    mime_type: mime,
                }];
            }
            obj.values()
                .flat_map(|v| find_blob_refs(v, depth + 1))
                .collect()
        }
        _ => vec![],
    }
}

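/// Appends every CID link reachable from `value` to `links`, recursing
/// through maps and lists.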
pub fn extract_links(value: &Ipld, links: &mut Vec<Cid>) {
    match value {
        Ipld::Link(cid) => {
            links.push(*cid);
        }
        Ipld::Map(map) => {
            map.values().for_each(|v| extract_links(v, links));
        }
        Ipld::List(arr) => {
            arr.iter().for_each(|v| extract_links(v, links));
        }
        _ => {}
    }
}

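/// A record discovered while walking the MST, keyed by collection and rkey.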
#[derive(Debug)]
pub struct ImportedRecord {
    pub collection: String,
    pub rkey: String,
    pub cid: Cid,
    pub blob_refs: Vec<BlobRef>,
}

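/// Walks the MST rooted at `root_cid` and returns every record it references.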
pub fn walk_mst(
    blocks: &HashMap<Cid, Bytes>,
    root_cid: &Cid,
) -> Result<Vec<ImportedRecord>, ImportError> {
    let mut records = Vec::new();
    walk_mst_node(blocks, root_cid, &[], &mut records)?;
    Ok(records)
}

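/// Walks a single MST node: visits the left subtree, then each entry in
/// order, reconstructing full keys from the prefix-compressed `p`/`k` fields
/// and descending into `t` subtrees between entries.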
fn walk_mst_node(
    blocks: &HashMap<Cid, Bytes>,
    cid: &Cid,
    prev_key: &[u8],
    records: &mut Vec<ImportedRecord>,
) -> Result<(), ImportError> {
    let block = blocks
        .get(cid)
        .ok_or_else(|| ImportError::BlockNotFound(cid.to_string()))?;
    let value: Ipld = serde_ipld_dagcbor::from_slice(block)
        .map_err(|e| ImportError::InvalidCbor(e.to_string()))?;

    if let Ipld::Map(ref obj) = value {
        if let Some(Ipld::Link(left_cid)) = obj.get("l") {
            walk_mst_node(blocks, left_cid, prev_key, records)?;
        }

        let mut current_key = prev_key.to_vec();

        if let Some(Ipld::List(entries)) = obj.get("e") {
            for entry in entries {
                if let Ipld::Map(entry_obj) = entry {
                    let prefix_len = entry_obj
                        .get("p")
                        .and_then(|p| {
                            if let Ipld::Integer(n) = p {
                                Some(*n as usize)
                            } else {
                                None
                            }
                        })
                        .unwrap_or(0);

                    let key_suffix = entry_obj.get("k").and_then(|k| {
                        if let Ipld::Bytes(b) = k {
                            Some(b.clone())
                        } else {
                            None
                        }
                    });

                    if let Some(suffix) = key_suffix {
                        current_key.truncate(prefix_len);
                        current_key.extend_from_slice(&suffix);
                    }

                    if let Some(Ipld::Link(tree_cid)) = entry_obj.get("t") {
                        walk_mst_node(blocks, tree_cid, &current_key, records)?;
                    }

                    let record_cid = entry_obj.get("v").and_then(|v| {
                        if let Ipld::Link(cid) = v {
                            Some(*cid)
                        } else {
                            None
                        }
                    });

                    if let Some(record_cid) = record_cid
                        && let Ok(full_key) = String::from_utf8(current_key.clone())
                        && let Some(record_block) = blocks.get(&record_cid)
                        && let Ok(record_value) =
                            serde_ipld_dagcbor::from_slice::<Ipld>(record_block)
                    {
                        let blob_refs = find_blob_refs_ipld(&record_value, 0);
                        let parts: Vec<&str> = full_key.split('/').collect();
                        if parts.len() >= 2 {
                            let collection = parts[..parts.len() - 1].join("/");
                            let rkey = parts[parts.len() - 1].to_string();
                            records.push(ImportedRecord {
                                collection,
                                rkey,
                                cid: record_cid,
                                blob_refs,
                            });
                        }
                    }
                }
            }
        }
    }
    Ok(())
}

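/// Revision and previous-commit fields extracted from a signed commit block.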
pub struct CommitInfo {
    pub rev: Option<String>,
    pub prev: Option<String>,
}

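/// The outcome of a successful import: the records found plus the commit's
/// MST data root.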
pub struct ImportResult {
    pub records: Vec<ImportedRecord>,
    pub data_cid: Cid,
}

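/// Extracts the MST data root plus `rev`/`prev` metadata from a commit block,
/// rejecting commits that are not maps or lack a `data` link.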
fn extract_commit_info(commit: &Ipld) -> Result<(Cid, CommitInfo), ImportError> {
    let obj = match commit {
        Ipld::Map(m) => m,
        _ => {
            return Err(ImportError::InvalidCommit(
                "Commit must be a map".to_string(),
            ));
        }
    };
    let data_cid = obj
        .get("data")
        .and_then(|d| {
            if let Ipld::Link(cid) = d {
                Some(*cid)
            } else {
                None
            }
        })
        .ok_or_else(|| ImportError::InvalidCommit("Missing data field".to_string()))?;
    let rev = obj.get("rev").and_then(|r| {
        if let Ipld::String(s) = r {
            Some(s.clone())
        } else {
            None
        }
    });
    let prev = obj.get("prev").and_then(|p| {
        if let Ipld::Link(cid) = p {
            Some(cid.to_string())
        } else {
            // "prev" is null on the first commit; treat anything else as absent too.
            None
        }
    });
    Ok((data_cid, CommitInfo { rev, prev }))
}

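/// Imports a parsed CAR into the database inside one transaction: locks the
/// user's repo row (failing fast on concurrent imports), upserts all blocks,
/// then replaces the repo's records with those found in the MST.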
pub async fn apply_import(
    db: &PgPool,
    user_id: Uuid,
    root: Cid,
    blocks: HashMap<Cid, Bytes>,
    max_blocks: usize,
) -> Result<ImportResult, ImportError> {
    if blocks.len() > max_blocks {
        return Err(ImportError::SizeLimitExceeded);
    }
    let root_block = blocks
        .get(&root)
        .ok_or_else(|| ImportError::BlockNotFound(root.to_string()))?;
    let commit: Ipld = serde_ipld_dagcbor::from_slice(root_block)
        .map_err(|e| ImportError::InvalidCbor(e.to_string()))?;
    let (data_cid, _commit_info) = extract_commit_info(&commit)?;
    let records = walk_mst(&blocks, &data_cid)?;
    debug!(
        "Importing {} blocks and {} records for user {}",
        blocks.len(),
        records.len(),
        user_id
    );
    let mut tx = db.begin().await?;
    let repo = sqlx::query!(
        "SELECT repo_root_cid FROM repos WHERE user_id = $1 FOR UPDATE NOWAIT",
        user_id
    )
    .fetch_optional(&mut *tx)
    .await
    .map_err(|e| {
        if let sqlx::Error::Database(ref db_err) = e
            && db_err.code().as_deref() == Some("55P03")
        {
            return ImportError::ConcurrentModification;
        }
        ImportError::Database(e)
    })?;
    if repo.is_none() {
        return Err(ImportError::RepoNotFound);
    }
    let block_chunks: Vec<Vec<(&Cid, &Bytes)>> = blocks
        .iter()
        .collect::<Vec<_>>()
        .chunks(100)
        .map(|c| c.to_vec())
        .collect();
    for chunk in block_chunks {
        for (cid, data) in chunk {
            let cid_bytes = cid.to_bytes();
            sqlx::query!(
                "INSERT INTO blocks (cid, data) VALUES ($1, $2) ON CONFLICT (cid) DO NOTHING",
                &cid_bytes,
                data.as_ref()
            )
            .execute(&mut *tx)
            .await?;
        }
    }
    sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id)
        .execute(&mut *tx)
        .await?;
    for record in &records {
        let record_cid_str = record.cid.to_string();
        sqlx::query!(
            r#"
            INSERT INTO records (repo_id, collection, rkey, record_cid)
            VALUES ($1, $2, $3, $4)
            ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4
            "#,
            user_id,
            record.collection,
            record.rkey,
            record_cid_str
        )
        .execute(&mut *tx)
        .await?;
    }
    tx.commit().await?;
    debug!(
        "Successfully imported {} blocks and {} records",
        blocks.len(),
        records.len()
    );
    Ok(ImportResult { records, data_cid })
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_find_blob_refs() {
        let record = serde_json::json!({
            "$type": "app.bsky.feed.post",
            "text": "Hello world",
            "embed": {
                "$type": "app.bsky.embed.images",
                "images": [
                    {
                        "alt": "Test image",
                        "image": {
                            "$type": "blob",
                            "ref": {
                                "$link": "bafkreihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku"
                            },
                            "mimeType": "image/jpeg",
                            "size": 12345
                        }
                    }
                ]
            }
        });
        let blob_refs = find_blob_refs(&record, 0);
        assert_eq!(blob_refs.len(), 1);
        assert_eq!(
            blob_refs[0].cid,
            "bafkreihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku"
        );
        assert_eq!(blob_refs[0].mime_type, Some("image/jpeg".to_string()));
    }

    #[test]
    fn test_find_blob_refs_no_blobs() {
        let record = serde_json::json!({
            "$type": "app.bsky.feed.post",
            "text": "Hello world"
        });
        let blob_refs = find_blob_refs(&record, 0);
        assert!(blob_refs.is_empty());
    }

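    // A minimal sketch covering the legacy map-with-"$link" blob form in
    // find_blob_refs_ipld (as opposed to a direct CID link in "ref");
    // "bafkreitest" is a placeholder string, not a parseable CID.
    #[test]
    fn test_find_blob_refs_ipld_link_map() {
        let blob = Ipld::Map(std::collections::BTreeMap::from([
            ("$type".to_string(), Ipld::String("blob".to_string())),
            (
                "ref".to_string(),
                Ipld::Map(std::collections::BTreeMap::from([(
                    "$link".to_string(),
                    Ipld::String("bafkreitest".to_string()),
                )])),
            ),
            (
                "mimeType".to_string(),
                Ipld::String("image/png".to_string()),
            ),
        ]));
        let refs = find_blob_refs_ipld(&blob, 0);
        assert_eq!(refs.len(), 1);
        assert_eq!(refs[0].cid, "bafkreitest");
        assert_eq!(refs[0].mime_type, Some("image/png".to_string()));
    }
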
    #[test]
    fn test_find_blob_refs_depth_limit() {
        fn deeply_nested(depth: usize) -> JsonValue {
            if depth == 0 {
                serde_json::json!({
                    "$type": "blob",
                    "ref": { "$link": "bafkreitest" },
                    "mimeType": "image/png"
                })
            } else {
                serde_json::json!({ "nested": deeply_nested(depth - 1) })
            }
        }
        let deep = deeply_nested(40);
        let blob_refs = find_blob_refs(&deep, 0);
        assert!(blob_refs.is_empty());
    }
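
    // A small sketch for extract_links over a nested IPLD value. It assumes
    // the example CID used in test_find_blob_refs parses as a valid CIDv1
    // string; Cid implements FromStr, so `.parse()` applies here.
    #[test]
    fn test_extract_links_nested() {
        let cid: Cid = "bafkreihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku"
            .parse()
            .expect("example CID should parse");
        let value = Ipld::Map(std::collections::BTreeMap::from([
            ("direct".to_string(), Ipld::Link(cid)),
            ("nested".to_string(), Ipld::List(vec![Ipld::Link(cid)])),
        ]));
        let mut links = Vec::new();
        extract_links(&value, &mut links);
        assert_eq!(links, vec![cid, cid]);
    }
}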
458}