//! CAR-file repository import: CAR parsing, MST traversal, blob-reference
//! extraction, and transactional application to the database.
1use bytes::Bytes;
2use cid::Cid;
3use ipld_core::ipld::Ipld;
4use iroh_car::CarReader;
5use serde_json::Value as JsonValue;
6use sqlx::PgPool;
7use std::collections::HashMap;
8use std::io::Cursor;
9use thiserror::Error;
10use tracing::debug;
11use uuid::Uuid;
12
/// Errors that can occur while parsing a CAR file or applying an import.
#[derive(Error, Debug)]
pub enum ImportError {
    /// The CAR container itself could not be decoded (header or block stream).
    #[error("CAR parsing error: {0}")]
    CarParse(String),
    /// Repo CARs must carry exactly one root (the commit); any other count is rejected.
    #[error("Expected exactly one root in CAR file")]
    InvalidRootCount,
    /// A CID referenced during traversal was not present in the block map.
    #[error("Block not found: {0}")]
    BlockNotFound(String),
    /// A block's DAG-CBOR payload failed to decode.
    #[error("Invalid CBOR: {0}")]
    InvalidCbor(String),
    /// Underlying sqlx/database failure (auto-converted via `#[from]`).
    #[error("Database error: {0}")]
    Database(#[from] sqlx::Error),
    #[error("Block store error: {0}")]
    BlockStore(String),
    /// The CAR contained more blocks than the configured maximum.
    #[error("Import size limit exceeded")]
    SizeLimitExceeded,
    /// No repo row exists for the importing user.
    #[error("Repo not found")]
    RepoNotFound,
    /// Another transaction holds the repo row lock (Postgres 55P03 / NOWAIT).
    #[error("Concurrent modification detected")]
    ConcurrentModification,
    /// The commit object was structurally invalid (not a map, missing fields).
    #[error("Invalid commit structure: {0}")]
    InvalidCommit(String),
    /// Signature/structure verification failed (auto-converted via `#[from]`).
    #[error("Verification failed: {0}")]
    VerificationFailed(#[from] super::verify::VerifyError),
    /// The CAR's commit DID does not match the authenticated caller's DID.
    #[error("DID mismatch: CAR is for {car_did}, but authenticated as {auth_did}")]
    DidMismatch { car_did: String, auth_did: String },
}
40
/// A blob reference found inside a record (`$type: "blob"` object).
#[derive(Debug, Clone)]
pub struct BlobRef {
    // Stringified CID of the blob (from the `ref` link / `$link` field).
    pub cid: String,
    // MIME type from the blob object's `mimeType` field, when present.
    pub mime_type: Option<String>,
}
46
47pub async fn parse_car(data: &[u8]) -> Result<(Cid, HashMap<Cid, Bytes>), ImportError> {
48 let cursor = Cursor::new(data);
49 let mut reader = CarReader::new(cursor)
50 .await
51 .map_err(|e| ImportError::CarParse(e.to_string()))?;
52 let header = reader.header();
53 let roots = header.roots();
54 if roots.len() != 1 {
55 return Err(ImportError::InvalidRootCount);
56 }
57 let root = roots[0];
58 let mut blocks = HashMap::new();
59 while let Ok(Some((cid, block))) = reader.next_block().await {
60 blocks.insert(cid, Bytes::from(block));
61 }
62 if !blocks.contains_key(&root) {
63 return Err(ImportError::BlockNotFound(root.to_string()));
64 }
65 Ok((root, blocks))
66}
67
68pub fn find_blob_refs_ipld(value: &Ipld, depth: usize) -> Vec<BlobRef> {
69 if depth > 32 {
70 return vec![];
71 }
72 match value {
73 Ipld::List(arr) => arr
74 .iter()
75 .flat_map(|v| find_blob_refs_ipld(v, depth + 1))
76 .collect(),
77 Ipld::Map(obj) => {
78 if let Some(Ipld::String(type_str)) = obj.get("$type")
79 && type_str == "blob"
80 && let Some(Ipld::Link(link_cid)) = obj.get("ref") {
81 let mime = obj.get("mimeType").and_then(|v| {
82 if let Ipld::String(s) = v {
83 Some(s.clone())
84 } else {
85 None
86 }
87 });
88 return vec![BlobRef {
89 cid: link_cid.to_string(),
90 mime_type: mime,
91 }];
92 }
93 obj.values()
94 .flat_map(|v| find_blob_refs_ipld(v, depth + 1))
95 .collect()
96 }
97 _ => vec![],
98 }
99}
100
101pub fn find_blob_refs(value: &JsonValue, depth: usize) -> Vec<BlobRef> {
102 if depth > 32 {
103 return vec![];
104 }
105 match value {
106 JsonValue::Array(arr) => arr
107 .iter()
108 .flat_map(|v| find_blob_refs(v, depth + 1))
109 .collect(),
110 JsonValue::Object(obj) => {
111 if let Some(JsonValue::String(type_str)) = obj.get("$type")
112 && type_str == "blob"
113 && let Some(JsonValue::Object(ref_obj)) = obj.get("ref")
114 && let Some(JsonValue::String(link)) = ref_obj.get("$link") {
115 let mime = obj
116 .get("mimeType")
117 .and_then(|v| v.as_str())
118 .map(String::from);
119 return vec![BlobRef {
120 cid: link.clone(),
121 mime_type: mime,
122 }];
123 }
124 obj.values()
125 .flat_map(|v| find_blob_refs(v, depth + 1))
126 .collect()
127 }
128 _ => vec![],
129 }
130}
131
132pub fn extract_links(value: &Ipld, links: &mut Vec<Cid>) {
133 match value {
134 Ipld::Link(cid) => {
135 links.push(*cid);
136 }
137 Ipld::Map(map) => {
138 for v in map.values() {
139 extract_links(v, links);
140 }
141 }
142 Ipld::List(arr) => {
143 for v in arr {
144 extract_links(v, links);
145 }
146 }
147 _ => {}
148 }
149}
150
/// A record discovered while walking the MST of an imported repo.
#[derive(Debug)]
pub struct ImportedRecord {
    // Collection NSID, e.g. "app.bsky.feed.post" (key segments before the rkey).
    pub collection: String,
    // Record key (last segment of the MST key).
    pub rkey: String,
    // CID of the record's block in the CAR.
    pub cid: Cid,
    // Blob references found inside the record value.
    pub blob_refs: Vec<BlobRef>,
}
158
159pub fn walk_mst(
160 blocks: &HashMap<Cid, Bytes>,
161 root_cid: &Cid,
162) -> Result<Vec<ImportedRecord>, ImportError> {
163 let mut records = Vec::new();
164 let mut stack = vec![*root_cid];
165 let mut visited = std::collections::HashSet::new();
166 while let Some(cid) = stack.pop() {
167 if visited.contains(&cid) {
168 continue;
169 }
170 visited.insert(cid);
171 let block = blocks
172 .get(&cid)
173 .ok_or_else(|| ImportError::BlockNotFound(cid.to_string()))?;
174 let value: Ipld = serde_ipld_dagcbor::from_slice(block)
175 .map_err(|e| ImportError::InvalidCbor(e.to_string()))?;
176 if let Ipld::Map(ref obj) = value {
177 if let Some(Ipld::List(entries)) = obj.get("e") {
178 for entry in entries {
179 if let Ipld::Map(entry_obj) = entry {
180 let key = entry_obj.get("k").and_then(|k| {
181 if let Ipld::Bytes(b) = k {
182 String::from_utf8(b.clone()).ok()
183 } else if let Ipld::String(s) = k {
184 Some(s.clone())
185 } else {
186 None
187 }
188 });
189 let record_cid = entry_obj.get("v").and_then(|v| {
190 if let Ipld::Link(cid) = v {
191 Some(*cid)
192 } else {
193 None
194 }
195 });
196 if let (Some(key), Some(record_cid)) = (key, record_cid)
197 && let Some(record_block) = blocks.get(&record_cid)
198 && let Ok(record_value) =
199 serde_ipld_dagcbor::from_slice::<Ipld>(record_block)
200 {
201 let blob_refs = find_blob_refs_ipld(&record_value, 0);
202 let parts: Vec<&str> = key.split('/').collect();
203 if parts.len() >= 2 {
204 let collection = parts[..parts.len() - 1].join("/");
205 let rkey = parts[parts.len() - 1].to_string();
206 records.push(ImportedRecord {
207 collection,
208 rkey,
209 cid: record_cid,
210 blob_refs,
211 });
212 }
213 }
214 if let Some(Ipld::Link(tree_cid)) = entry_obj.get("t") {
215 stack.push(*tree_cid);
216 }
217 }
218 }
219 }
220 if let Some(Ipld::Link(left_cid)) = obj.get("l") {
221 stack.push(*left_cid);
222 }
223 }
224 }
225 Ok(records)
226}
227
/// Metadata extracted from a repo commit object.
pub struct CommitInfo {
    // Commit revision string (`rev` field), when present.
    pub rev: Option<String>,
    // Previous commit CID as a string (`prev` link); None when absent or null.
    pub prev: Option<String>,
}
232
233fn extract_commit_info(commit: &Ipld) -> Result<(Cid, CommitInfo), ImportError> {
234 let obj = match commit {
235 Ipld::Map(m) => m,
236 _ => {
237 return Err(ImportError::InvalidCommit(
238 "Commit must be a map".to_string(),
239 ));
240 }
241 };
242 let data_cid = obj
243 .get("data")
244 .and_then(|d| {
245 if let Ipld::Link(cid) = d {
246 Some(*cid)
247 } else {
248 None
249 }
250 })
251 .ok_or_else(|| ImportError::InvalidCommit("Missing data field".to_string()))?;
252 let rev = obj.get("rev").and_then(|r| {
253 if let Ipld::String(s) = r {
254 Some(s.clone())
255 } else {
256 None
257 }
258 });
259 let prev = obj.get("prev").and_then(|p| {
260 if let Ipld::Link(cid) = p {
261 Some(cid.to_string())
262 } else if let Ipld::Null = p {
263 None
264 } else {
265 None
266 }
267 });
268 Ok((data_cid, CommitInfo { rev, prev }))
269}
270
271pub async fn apply_import(
272 db: &PgPool,
273 user_id: Uuid,
274 root: Cid,
275 blocks: HashMap<Cid, Bytes>,
276 max_blocks: usize,
277) -> Result<Vec<ImportedRecord>, ImportError> {
278 if blocks.len() > max_blocks {
279 return Err(ImportError::SizeLimitExceeded);
280 }
281 let root_block = blocks
282 .get(&root)
283 .ok_or_else(|| ImportError::BlockNotFound(root.to_string()))?;
284 let commit: Ipld = serde_ipld_dagcbor::from_slice(root_block)
285 .map_err(|e| ImportError::InvalidCbor(e.to_string()))?;
286 let (data_cid, _commit_info) = extract_commit_info(&commit)?;
287 let records = walk_mst(&blocks, &data_cid)?;
288 debug!(
289 "Importing {} blocks and {} records for user {}",
290 blocks.len(),
291 records.len(),
292 user_id
293 );
294 let mut tx = db.begin().await?;
295 let repo = sqlx::query!(
296 "SELECT repo_root_cid FROM repos WHERE user_id = $1 FOR UPDATE NOWAIT",
297 user_id
298 )
299 .fetch_optional(&mut *tx)
300 .await
301 .map_err(|e| {
302 if let sqlx::Error::Database(ref db_err) = e
303 && db_err.code().as_deref() == Some("55P03") {
304 return ImportError::ConcurrentModification;
305 }
306 ImportError::Database(e)
307 })?;
308 if repo.is_none() {
309 return Err(ImportError::RepoNotFound);
310 }
311 let block_chunks: Vec<Vec<(&Cid, &Bytes)>> = blocks
312 .iter()
313 .collect::<Vec<_>>()
314 .chunks(100)
315 .map(|c| c.to_vec())
316 .collect();
317 for chunk in block_chunks {
318 for (cid, data) in chunk {
319 let cid_bytes = cid.to_bytes();
320 sqlx::query!(
321 "INSERT INTO blocks (cid, data) VALUES ($1, $2) ON CONFLICT (cid) DO NOTHING",
322 &cid_bytes,
323 data.as_ref()
324 )
325 .execute(&mut *tx)
326 .await?;
327 }
328 }
329 let root_str = root.to_string();
330 sqlx::query!(
331 "UPDATE repos SET repo_root_cid = $1, updated_at = NOW() WHERE user_id = $2",
332 root_str,
333 user_id
334 )
335 .execute(&mut *tx)
336 .await?;
337 sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id)
338 .execute(&mut *tx)
339 .await?;
340 for record in &records {
341 let record_cid_str = record.cid.to_string();
342 sqlx::query!(
343 r#"
344 INSERT INTO records (repo_id, collection, rkey, record_cid)
345 VALUES ($1, $2, $3, $4)
346 ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4
347 "#,
348 user_id,
349 record.collection,
350 record.rkey,
351 record_cid_str
352 )
353 .execute(&mut *tx)
354 .await?;
355 }
356 tx.commit().await?;
357 debug!(
358 "Successfully imported {} blocks and {} records",
359 blocks.len(),
360 records.len()
361 );
362 Ok(records)
363}
364
#[cfg(test)]
mod tests {
    use super::*;

    // A blob nested inside an embed should be found with its CID and MIME type.
    #[test]
    fn test_find_blob_refs() {
        let record = serde_json::json!({
            "$type": "app.bsky.feed.post",
            "text": "Hello world",
            "embed": {
                "$type": "app.bsky.embed.images",
                "images": [
                    {
                        "alt": "Test image",
                        "image": {
                            "$type": "blob",
                            "ref": {
                                "$link": "bafkreihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku"
                            },
                            "mimeType": "image/jpeg",
                            "size": 12345
                        }
                    }
                ]
            }
        });
        let blob_refs = find_blob_refs(&record, 0);
        assert_eq!(blob_refs.len(), 1);
        assert_eq!(
            blob_refs[0].cid,
            "bafkreihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku"
        );
        assert_eq!(blob_refs[0].mime_type, Some("image/jpeg".to_string()));
    }

    // A record with no blob objects yields an empty result.
    #[test]
    fn test_find_blob_refs_no_blobs() {
        let record = serde_json::json!({
            "$type": "app.bsky.feed.post",
            "text": "Hello world"
        });
        let blob_refs = find_blob_refs(&record, 0);
        assert!(blob_refs.is_empty());
    }

    // A blob buried deeper than the depth cap (32) must NOT be found.
    #[test]
    fn test_find_blob_refs_depth_limit() {
        // Builds `depth` levels of {"nested": ...} around a blob object.
        fn deeply_nested(depth: usize) -> JsonValue {
            if depth == 0 {
                serde_json::json!({
                    "$type": "blob",
                    "ref": { "$link": "bafkreitest" },
                    "mimeType": "image/png"
                })
            } else {
                serde_json::json!({ "nested": deeply_nested(depth - 1) })
            }
        }
        let deep = deeply_nested(40);
        let blob_refs = find_blob_refs(&deep, 0);
        assert!(blob_refs.is_empty());
    }
}
427}