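//! Import of user repositories from CAR files: parse the CAR into blocks,
//! walk the commit's MST to collect records and blob references, and apply
//! the result to the database in a single transaction.
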
use bytes::Bytes;
use cid::Cid;
use ipld_core::ipld::Ipld;
use iroh_car::CarReader;
use serde_json::Value as JsonValue;
use sqlx::PgPool;
use std::collections::HashMap;
use std::io::Cursor;
use thiserror::Error;
use tracing::debug;
use uuid::Uuid;

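/// Errors that can occur while importing a repository from a CAR file.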
#[derive(Error, Debug)]
pub enum ImportError {
    #[error("CAR parsing error: {0}")]
    CarParse(String),
    #[error("Expected exactly one root in CAR file")]
    InvalidRootCount,
    #[error("Block not found: {0}")]
    BlockNotFound(String),
    #[error("Invalid CBOR: {0}")]
    InvalidCbor(String),
    #[error("Database error: {0}")]
    Database(#[from] sqlx::Error),
    #[error("Block store error: {0}")]
    BlockStore(String),
    #[error("Import size limit exceeded")]
    SizeLimitExceeded,
    #[error("Repo not found")]
    RepoNotFound,
    #[error("Concurrent modification detected")]
    ConcurrentModification,
    #[error("Invalid commit structure: {0}")]
    InvalidCommit(String),
    #[error("Verification failed: {0}")]
    VerificationFailed(#[from] super::verify::VerifyError),
    #[error("DID mismatch: CAR is for {car_did}, but authenticated as {auth_did}")]
    DidMismatch { car_did: String, auth_did: String },
}

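/// A blob reference found inside a record: the blob's CID plus its declared
/// MIME type, if present.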
#[derive(Debug, Clone)]
pub struct BlobRef {
    pub cid: String,
    pub mime_type: Option<String>,
}

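/// Parse raw CAR bytes into the root CID and a map of all blocks. The CAR
/// must have exactly one root, and the root block must be present.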
pub async fn parse_car(data: &[u8]) -> Result<(Cid, HashMap<Cid, Bytes>), ImportError> {
    let cursor = Cursor::new(data);
    let mut reader = CarReader::new(cursor)
        .await
        .map_err(|e| ImportError::CarParse(e.to_string()))?;
    let header = reader.header();
    let roots = header.roots();
    if roots.len() != 1 {
        return Err(ImportError::InvalidRootCount);
    }
    let root = roots[0];
    let mut blocks = HashMap::new();
    // Propagate read errors instead of silently treating them as end-of-stream.
    loop {
        match reader.next_block().await {
            Ok(Some((cid, block))) => {
                blocks.insert(cid, Bytes::from(block));
            }
            Ok(None) => break,
            Err(e) => return Err(ImportError::CarParse(e.to_string())),
        }
    }
    if !blocks.contains_key(&root) {
        return Err(ImportError::BlockNotFound(root.to_string()));
    }
    Ok((root, blocks))
}

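/// Recursively scan an IPLD value for `$type: "blob"` maps, collecting their
/// `ref` CIDs and MIME types. Recursion depth is capped at 32.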
pub fn find_blob_refs_ipld(value: &Ipld, depth: usize) -> Vec<BlobRef> {
    if depth > 32 {
        return vec![];
    }
    match value {
        Ipld::List(arr) => arr
            .iter()
            .flat_map(|v| find_blob_refs_ipld(v, depth + 1))
            .collect(),
        Ipld::Map(obj) => {
            if let Some(Ipld::String(type_str)) = obj.get("$type")
                && type_str == "blob"
                && let Some(Ipld::Link(link_cid)) = obj.get("ref")
            {
                let mime = obj.get("mimeType").and_then(|v| {
                    if let Ipld::String(s) = v {
                        Some(s.clone())
                    } else {
                        None
                    }
                });
                return vec![BlobRef {
                    cid: link_cid.to_string(),
                    mime_type: mime,
                }];
            }
            obj.values()
                .flat_map(|v| find_blob_refs_ipld(v, depth + 1))
                .collect()
        }
        _ => vec![],
    }
}

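/// JSON counterpart of [`find_blob_refs_ipld`]: in JSON form a blob carries
/// its CID as a `{"ref": {"$link": ...}}` object rather than an IPLD link.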
pub fn find_blob_refs(value: &JsonValue, depth: usize) -> Vec<BlobRef> {
    if depth > 32 {
        return vec![];
    }
    match value {
        JsonValue::Array(arr) => arr
            .iter()
            .flat_map(|v| find_blob_refs(v, depth + 1))
            .collect(),
        JsonValue::Object(obj) => {
            if let Some(JsonValue::String(type_str)) = obj.get("$type")
                && type_str == "blob"
                && let Some(JsonValue::Object(ref_obj)) = obj.get("ref")
                && let Some(JsonValue::String(link)) = ref_obj.get("$link")
            {
                let mime = obj
                    .get("mimeType")
                    .and_then(|v| v.as_str())
                    .map(String::from);
                return vec![BlobRef {
                    cid: link.clone(),
                    mime_type: mime,
                }];
            }
            obj.values()
                .flat_map(|v| find_blob_refs(v, depth + 1))
                .collect()
        }
        _ => vec![],
    }
}

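/// Collect every CID linked from an IPLD value, depth-first.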
pub fn extract_links(value: &Ipld, links: &mut Vec<Cid>) {
    match value {
        Ipld::Link(cid) => {
            links.push(*cid);
        }
        Ipld::Map(map) => {
            for v in map.values() {
                extract_links(v, links);
            }
        }
        Ipld::List(arr) => {
            for v in arr {
                extract_links(v, links);
            }
        }
        _ => {}
    }
}

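/// A record discovered during the MST walk, keyed by collection and rkey.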
#[derive(Debug)]
pub struct ImportedRecord {
    pub collection: String,
    pub rkey: String,
    pub cid: Cid,
    pub blob_refs: Vec<BlobRef>,
}

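/// Walk the MST rooted at `root_cid` and return every record it contains.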
pub fn walk_mst(
    blocks: &HashMap<Cid, Bytes>,
    root_cid: &Cid,
) -> Result<Vec<ImportedRecord>, ImportError> {
    let mut records = Vec::new();
    walk_mst_node(blocks, root_cid, &[], &mut records)?;
    Ok(records)
}

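/// Walk a single MST node. A node is a DAG-CBOR map with an optional left
/// subtree link `l` and an entry list `e`; each entry carries a prefix length
/// `p` (bytes shared with the previous key), a key suffix `k`, a record link
/// `v`, and an optional right subtree link `t`. Full keys are rebuilt
/// incrementally from `p` and `k` as the entries are visited in order.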
fn walk_mst_node(
    blocks: &HashMap<Cid, Bytes>,
    cid: &Cid,
    prev_key: &[u8],
    records: &mut Vec<ImportedRecord>,
) -> Result<(), ImportError> {
    let block = blocks
        .get(cid)
        .ok_or_else(|| ImportError::BlockNotFound(cid.to_string()))?;
    let value: Ipld = serde_ipld_dagcbor::from_slice(block)
        .map_err(|e| ImportError::InvalidCbor(e.to_string()))?;

    if let Ipld::Map(ref obj) = value {
        if let Some(Ipld::Link(left_cid)) = obj.get("l") {
            walk_mst_node(blocks, left_cid, prev_key, records)?;
        }

        let mut current_key = prev_key.to_vec();

        if let Some(Ipld::List(entries)) = obj.get("e") {
            for entry in entries {
                if let Ipld::Map(entry_obj) = entry {
                    let prefix_len = entry_obj
                        .get("p")
                        .and_then(|p| {
                            if let Ipld::Integer(n) = p {
                                Some(*n as usize)
                            } else {
                                None
                            }
                        })
                        .unwrap_or(0);

                    let key_suffix = entry_obj.get("k").and_then(|k| {
                        if let Ipld::Bytes(b) = k {
                            Some(b.clone())
                        } else {
                            None
                        }
                    });

                    // Rebuild the full key: keep the shared prefix, append the suffix.
                    if let Some(suffix) = key_suffix {
                        current_key.truncate(prefix_len);
                        current_key.extend_from_slice(&suffix);
                    }

                    if let Some(Ipld::Link(tree_cid)) = entry_obj.get("t") {
                        walk_mst_node(blocks, tree_cid, &current_key, records)?;
                    }

                    let record_cid = entry_obj.get("v").and_then(|v| {
                        if let Ipld::Link(cid) = v {
                            Some(*cid)
                        } else {
                            None
                        }
                    });

                    if let Some(record_cid) = record_cid {
                        if let Ok(full_key) = String::from_utf8(current_key.clone()) {
                            if let Some(record_block) = blocks.get(&record_cid)
                                && let Ok(record_value) =
                                    serde_ipld_dagcbor::from_slice::<Ipld>(record_block)
                            {
                                let blob_refs = find_blob_refs_ipld(&record_value, 0);
                                // Keys look like "collection/rkey"; the rkey is the last segment.
                                let parts: Vec<&str> = full_key.split('/').collect();
                                if parts.len() >= 2 {
                                    let collection = parts[..parts.len() - 1].join("/");
                                    let rkey = parts[parts.len() - 1].to_string();
                                    records.push(ImportedRecord {
                                        collection,
                                        rkey,
                                        cid: record_cid,
                                        blob_refs,
                                    });
                                }
                            }
                        }
                    }
                }
            }
        }
    }
    Ok(())
}

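/// Revision and previous-commit metadata pulled from a commit block.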
pub struct CommitInfo {
    pub rev: Option<String>,
    pub prev: Option<String>,
}

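/// Extract the MST root (`data`) plus `rev`/`prev` metadata from a commit.
/// The commit must be a map with a `data` link.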
fn extract_commit_info(commit: &Ipld) -> Result<(Cid, CommitInfo), ImportError> {
    let obj = match commit {
        Ipld::Map(m) => m,
        _ => {
            return Err(ImportError::InvalidCommit(
                "Commit must be a map".to_string(),
            ));
        }
    };
    let data_cid = obj
        .get("data")
        .and_then(|d| {
            if let Ipld::Link(cid) = d {
                Some(*cid)
            } else {
                None
            }
        })
        .ok_or_else(|| ImportError::InvalidCommit("Missing data field".to_string()))?;
    let rev = obj.get("rev").and_then(|r| {
        if let Ipld::String(s) = r {
            Some(s.clone())
        } else {
            None
        }
    });
    // `prev` is either a link or null; null and anything else map to None.
    let prev = obj.get("prev").and_then(|p| {
        if let Ipld::Link(cid) = p {
            Some(cid.to_string())
        } else {
            None
        }
    });
    Ok((data_cid, CommitInfo { rev, prev }))
}

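/// Apply a parsed import inside one transaction: lock the repo row, insert
/// all blocks, point the repo at the new root, and replace its records.
/// Fails fast if the block count exceeds `max_blocks` or the repo row is
/// already locked by a concurrent import.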
pub async fn apply_import(
    db: &PgPool,
    user_id: Uuid,
    root: Cid,
    blocks: HashMap<Cid, Bytes>,
    max_blocks: usize,
) -> Result<Vec<ImportedRecord>, ImportError> {
    if blocks.len() > max_blocks {
        return Err(ImportError::SizeLimitExceeded);
    }
    let root_block = blocks
        .get(&root)
        .ok_or_else(|| ImportError::BlockNotFound(root.to_string()))?;
    let commit: Ipld = serde_ipld_dagcbor::from_slice(root_block)
        .map_err(|e| ImportError::InvalidCbor(e.to_string()))?;
    let (data_cid, _commit_info) = extract_commit_info(&commit)?;
    let records = walk_mst(&blocks, &data_cid)?;
    debug!(
        "Importing {} blocks and {} records for user {}",
        blocks.len(),
        records.len(),
        user_id
    );
    let mut tx = db.begin().await?;
    // NOWAIT turns a concurrent import into an immediate, typed error
    // (Postgres lock_not_available, SQLSTATE 55P03).
    let repo = sqlx::query!(
        "SELECT repo_root_cid FROM repos WHERE user_id = $1 FOR UPDATE NOWAIT",
        user_id
    )
    .fetch_optional(&mut *tx)
    .await
    .map_err(|e| {
        if let sqlx::Error::Database(ref db_err) = e
            && db_err.code().as_deref() == Some("55P03")
        {
            return ImportError::ConcurrentModification;
        }
        ImportError::Database(e)
    })?;
    if repo.is_none() {
        return Err(ImportError::RepoNotFound);
    }
    // Blocks are inserted one row at a time; blocks already present are kept.
    for (cid, data) in &blocks {
        let cid_bytes = cid.to_bytes();
        sqlx::query!(
            "INSERT INTO blocks (cid, data) VALUES ($1, $2) ON CONFLICT (cid) DO NOTHING",
            &cid_bytes,
            data.as_ref()
        )
        .execute(&mut *tx)
        .await?;
    }
    let root_str = root.to_string();
    sqlx::query!(
        "UPDATE repos SET repo_root_cid = $1, updated_at = NOW() WHERE user_id = $2",
        root_str,
        user_id
    )
    .execute(&mut *tx)
    .await?;
    // The import fully replaces the record set for this repo.
    sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id)
        .execute(&mut *tx)
        .await?;
    for record in &records {
        let record_cid_str = record.cid.to_string();
        sqlx::query!(
            r#"
            INSERT INTO records (repo_id, collection, rkey, record_cid)
            VALUES ($1, $2, $3, $4)
            ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4
            "#,
            user_id,
            record.collection,
            record.rkey,
            record_cid_str
        )
        .execute(&mut *tx)
        .await?;
    }
    tx.commit().await?;
    debug!(
        "Successfully imported {} blocks and {} records",
        blocks.len(),
        records.len()
    );
    Ok(records)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_find_blob_refs() {
        let record = serde_json::json!({
            "$type": "app.bsky.feed.post",
            "text": "Hello world",
            "embed": {
                "$type": "app.bsky.embed.images",
                "images": [
                    {
                        "alt": "Test image",
                        "image": {
                            "$type": "blob",
                            "ref": {
                                "$link": "bafkreihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku"
                            },
                            "mimeType": "image/jpeg",
                            "size": 12345
                        }
                    }
                ]
            }
        });
        let blob_refs = find_blob_refs(&record, 0);
        assert_eq!(blob_refs.len(), 1);
        assert_eq!(
            blob_refs[0].cid,
            "bafkreihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku"
        );
        assert_eq!(blob_refs[0].mime_type, Some("image/jpeg".to_string()));
    }

    #[test]
    fn test_find_blob_refs_no_blobs() {
        let record = serde_json::json!({
            "$type": "app.bsky.feed.post",
            "text": "Hello world"
        });
        let blob_refs = find_blob_refs(&record, 0);
        assert!(blob_refs.is_empty());
    }
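
    // Added sketch, not part of the original suite: exercises `extract_links`
    // on a nested map/list value, reusing the CID string from the test above.
    #[test]
    fn test_extract_links_nested() {
        let cid = Cid::try_from("bafkreihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku")
            .expect("valid CID");
        let value = Ipld::Map(
            [(
                "inner".to_string(),
                Ipld::List(vec![Ipld::Link(cid), Ipld::String("x".to_string())]),
            )]
            .into_iter()
            .collect(),
        );
        let mut links = Vec::new();
        extract_links(&value, &mut links);
        assert_eq!(links, vec![cid]);
    }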

    #[test]
    fn test_find_blob_refs_depth_limit() {
        fn deeply_nested(depth: usize) -> JsonValue {
            if depth == 0 {
                serde_json::json!({
                    "$type": "blob",
                    "ref": { "$link": "bafkreitest" },
                    "mimeType": "image/png"
                })
            } else {
                serde_json::json!({ "nested": deeply_nested(depth - 1) })
            }
        }
        let deep = deeply_nested(40);
        let blob_refs = find_blob_refs(&deep, 0);
        assert!(blob_refs.is_empty());
    }
}
453}