//! Repository import from CAR files: CAR parsing, MST traversal, blob-ref
//! discovery, and transactional application of the imported repo to Postgres.
1use bytes::Bytes;
2use cid::Cid;
3use ipld_core::ipld::Ipld;
4use iroh_car::CarReader;
5use serde_json::Value as JsonValue;
6use sqlx::PgPool;
7use std::collections::HashMap;
8use std::io::Cursor;
9use thiserror::Error;
10use tracing::debug;
11use uuid::Uuid;
12
13#[derive(Error, Debug)]
14pub enum ImportError {
15 #[error("CAR parsing error: {0}")]
16 CarParse(String),
17 #[error("Expected exactly one root in CAR file")]
18 InvalidRootCount,
19 #[error("Block not found: {0}")]
20 BlockNotFound(String),
21 #[error("Invalid CBOR: {0}")]
22 InvalidCbor(String),
23 #[error("Database error: {0}")]
24 Database(#[from] sqlx::Error),
25 #[error("Block store error: {0}")]
26 BlockStore(String),
27 #[error("Import size limit exceeded")]
28 SizeLimitExceeded,
29 #[error("Repo not found")]
30 RepoNotFound,
31 #[error("Concurrent modification detected")]
32 ConcurrentModification,
33 #[error("Invalid commit structure: {0}")]
34 InvalidCommit(String),
35 #[error("Verification failed: {0}")]
36 VerificationFailed(#[from] super::verify::VerifyError),
37 #[error("DID mismatch: CAR is for {car_did}, but authenticated as {auth_did}")]
38 DidMismatch { car_did: String, auth_did: String },
39}
40
41#[derive(Debug, Clone)]
42pub struct BlobRef {
43 pub cid: String,
44 pub mime_type: Option<String>,
45}
46
47pub async fn parse_car(data: &[u8]) -> Result<(Cid, HashMap<Cid, Bytes>), ImportError> {
48 let cursor = Cursor::new(data);
49 let mut reader = CarReader::new(cursor)
50 .await
51 .map_err(|e| ImportError::CarParse(e.to_string()))?;
52 let header = reader.header();
53 let roots = header.roots();
54 if roots.len() != 1 {
55 return Err(ImportError::InvalidRootCount);
56 }
57 let root = roots[0];
58 let mut blocks = HashMap::new();
59 while let Ok(Some((cid, block))) = reader.next_block().await {
60 blocks.insert(cid, Bytes::from(block));
61 }
62 if !blocks.contains_key(&root) {
63 return Err(ImportError::BlockNotFound(root.to_string()));
64 }
65 Ok((root, blocks))
66}
67
68pub fn find_blob_refs_ipld(value: &Ipld, depth: usize) -> Vec<BlobRef> {
69 if depth > 32 {
70 return vec![];
71 }
72 match value {
73 Ipld::List(arr) => arr
74 .iter()
75 .flat_map(|v| find_blob_refs_ipld(v, depth + 1))
76 .collect(),
77 Ipld::Map(obj) => {
78 if let Some(Ipld::String(type_str)) = obj.get("$type") {
79 if type_str == "blob" {
80 if let Some(Ipld::Link(link_cid)) = obj.get("ref") {
81 let mime = obj
82 .get("mimeType")
83 .and_then(|v| if let Ipld::String(s) = v { Some(s.clone()) } else { None });
84 return vec![BlobRef {
85 cid: link_cid.to_string(),
86 mime_type: mime,
87 }];
88 }
89 }
90 }
91 obj.values()
92 .flat_map(|v| find_blob_refs_ipld(v, depth + 1))
93 .collect()
94 }
95 _ => vec![],
96 }
97}
98
99pub fn find_blob_refs(value: &JsonValue, depth: usize) -> Vec<BlobRef> {
100 if depth > 32 {
101 return vec![];
102 }
103 match value {
104 JsonValue::Array(arr) => arr
105 .iter()
106 .flat_map(|v| find_blob_refs(v, depth + 1))
107 .collect(),
108 JsonValue::Object(obj) => {
109 if let Some(JsonValue::String(type_str)) = obj.get("$type") {
110 if type_str == "blob" {
111 if let Some(JsonValue::Object(ref_obj)) = obj.get("ref") {
112 if let Some(JsonValue::String(link)) = ref_obj.get("$link") {
113 let mime = obj
114 .get("mimeType")
115 .and_then(|v| v.as_str())
116 .map(String::from);
117 return vec![BlobRef {
118 cid: link.clone(),
119 mime_type: mime,
120 }];
121 }
122 }
123 }
124 }
125 obj.values()
126 .flat_map(|v| find_blob_refs(v, depth + 1))
127 .collect()
128 }
129 _ => vec![],
130 }
131}
132
133pub fn extract_links(value: &Ipld, links: &mut Vec<Cid>) {
134 match value {
135 Ipld::Link(cid) => {
136 links.push(*cid);
137 }
138 Ipld::Map(map) => {
139 for v in map.values() {
140 extract_links(v, links);
141 }
142 }
143 Ipld::List(arr) => {
144 for v in arr {
145 extract_links(v, links);
146 }
147 }
148 _ => {}
149 }
150}
151
152#[derive(Debug)]
153pub struct ImportedRecord {
154 pub collection: String,
155 pub rkey: String,
156 pub cid: Cid,
157 pub blob_refs: Vec<BlobRef>,
158}
159
160pub fn walk_mst(
161 blocks: &HashMap<Cid, Bytes>,
162 root_cid: &Cid,
163) -> Result<Vec<ImportedRecord>, ImportError> {
164 let mut records = Vec::new();
165 let mut stack = vec![*root_cid];
166 let mut visited = std::collections::HashSet::new();
167 while let Some(cid) = stack.pop() {
168 if visited.contains(&cid) {
169 continue;
170 }
171 visited.insert(cid);
172 let block = blocks
173 .get(&cid)
174 .ok_or_else(|| ImportError::BlockNotFound(cid.to_string()))?;
175 let value: Ipld = serde_ipld_dagcbor::from_slice(block)
176 .map_err(|e| ImportError::InvalidCbor(e.to_string()))?;
177 if let Ipld::Map(ref obj) = value {
178 if let Some(Ipld::List(entries)) = obj.get("e") {
179 for entry in entries {
180 if let Ipld::Map(entry_obj) = entry {
181 let key = entry_obj.get("k").and_then(|k| {
182 if let Ipld::Bytes(b) = k {
183 String::from_utf8(b.clone()).ok()
184 } else if let Ipld::String(s) = k {
185 Some(s.clone())
186 } else {
187 None
188 }
189 });
190 let record_cid = entry_obj.get("v").and_then(|v| {
191 if let Ipld::Link(cid) = v {
192 Some(*cid)
193 } else {
194 None
195 }
196 });
197 if let (Some(key), Some(record_cid)) = (key, record_cid) {
198 if let Some(record_block) = blocks.get(&record_cid) {
199 if let Ok(record_value) =
200 serde_ipld_dagcbor::from_slice::<Ipld>(record_block)
201 {
202 let blob_refs = find_blob_refs_ipld(&record_value, 0);
203 let parts: Vec<&str> = key.split('/').collect();
204 if parts.len() >= 2 {
205 let collection = parts[..parts.len() - 1].join("/");
206 let rkey = parts[parts.len() - 1].to_string();
207 records.push(ImportedRecord {
208 collection,
209 rkey,
210 cid: record_cid,
211 blob_refs,
212 });
213 }
214 }
215 }
216 }
217 if let Some(Ipld::Link(tree_cid)) = entry_obj.get("t") {
218 stack.push(*tree_cid);
219 }
220 }
221 }
222 }
223 if let Some(Ipld::Link(left_cid)) = obj.get("l") {
224 stack.push(*left_cid);
225 }
226 }
227 }
228 Ok(records)
229}
230
231pub struct CommitInfo {
232 pub rev: Option<String>,
233 pub prev: Option<String>,
234}
235
236fn extract_commit_info(commit: &Ipld) -> Result<(Cid, CommitInfo), ImportError> {
237 let obj = match commit {
238 Ipld::Map(m) => m,
239 _ => return Err(ImportError::InvalidCommit("Commit must be a map".to_string())),
240 };
241 let data_cid = obj
242 .get("data")
243 .and_then(|d| if let Ipld::Link(cid) = d { Some(*cid) } else { None })
244 .ok_or_else(|| ImportError::InvalidCommit("Missing data field".to_string()))?;
245 let rev = obj.get("rev").and_then(|r| {
246 if let Ipld::String(s) = r {
247 Some(s.clone())
248 } else {
249 None
250 }
251 });
252 let prev = obj.get("prev").and_then(|p| {
253 if let Ipld::Link(cid) = p {
254 Some(cid.to_string())
255 } else if let Ipld::Null = p {
256 None
257 } else {
258 None
259 }
260 });
261 Ok((data_cid, CommitInfo { rev, prev }))
262}
263
264pub async fn apply_import(
265 db: &PgPool,
266 user_id: Uuid,
267 root: Cid,
268 blocks: HashMap<Cid, Bytes>,
269 max_blocks: usize,
270) -> Result<Vec<ImportedRecord>, ImportError> {
271 if blocks.len() > max_blocks {
272 return Err(ImportError::SizeLimitExceeded);
273 }
274 let root_block = blocks
275 .get(&root)
276 .ok_or_else(|| ImportError::BlockNotFound(root.to_string()))?;
277 let commit: Ipld = serde_ipld_dagcbor::from_slice(root_block)
278 .map_err(|e| ImportError::InvalidCbor(e.to_string()))?;
279 let (data_cid, _commit_info) = extract_commit_info(&commit)?;
280 let records = walk_mst(&blocks, &data_cid)?;
281 debug!(
282 "Importing {} blocks and {} records for user {}",
283 blocks.len(),
284 records.len(),
285 user_id
286 );
287 let mut tx = db.begin().await?;
288 let repo = sqlx::query!(
289 "SELECT repo_root_cid FROM repos WHERE user_id = $1 FOR UPDATE NOWAIT",
290 user_id
291 )
292 .fetch_optional(&mut *tx)
293 .await
294 .map_err(|e| {
295 if let sqlx::Error::Database(ref db_err) = e {
296 if db_err.code().as_deref() == Some("55P03") {
297 return ImportError::ConcurrentModification;
298 }
299 }
300 ImportError::Database(e)
301 })?;
302 if repo.is_none() {
303 return Err(ImportError::RepoNotFound);
304 }
305 let block_chunks: Vec<Vec<(&Cid, &Bytes)>> = blocks
306 .iter()
307 .collect::<Vec<_>>()
308 .chunks(100)
309 .map(|c| c.to_vec())
310 .collect();
311 for chunk in block_chunks {
312 for (cid, data) in chunk {
313 let cid_bytes = cid.to_bytes();
314 sqlx::query!(
315 "INSERT INTO blocks (cid, data) VALUES ($1, $2) ON CONFLICT (cid) DO NOTHING",
316 &cid_bytes,
317 data.as_ref()
318 )
319 .execute(&mut *tx)
320 .await?;
321 }
322 }
323 let root_str = root.to_string();
324 sqlx::query!(
325 "UPDATE repos SET repo_root_cid = $1, updated_at = NOW() WHERE user_id = $2",
326 root_str,
327 user_id
328 )
329 .execute(&mut *tx)
330 .await?;
331 sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id)
332 .execute(&mut *tx)
333 .await?;
334 for record in &records {
335 let record_cid_str = record.cid.to_string();
336 sqlx::query!(
337 r#"
338 INSERT INTO records (repo_id, collection, rkey, record_cid)
339 VALUES ($1, $2, $3, $4)
340 ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4
341 "#,
342 user_id,
343 record.collection,
344 record.rkey,
345 record_cid_str
346 )
347 .execute(&mut *tx)
348 .await?;
349 }
350 tx.commit().await?;
351 debug!(
352 "Successfully imported {} blocks and {} records",
353 blocks.len(),
354 records.len()
355 );
356 Ok(records)
357}
358
359#[cfg(test)]
360mod tests {
361 use super::*;
362
363 #[test]
364 fn test_find_blob_refs() {
365 let record = serde_json::json!({
366 "$type": "app.bsky.feed.post",
367 "text": "Hello world",
368 "embed": {
369 "$type": "app.bsky.embed.images",
370 "images": [
371 {
372 "alt": "Test image",
373 "image": {
374 "$type": "blob",
375 "ref": {
376 "$link": "bafkreihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku"
377 },
378 "mimeType": "image/jpeg",
379 "size": 12345
380 }
381 }
382 ]
383 }
384 });
385 let blob_refs = find_blob_refs(&record, 0);
386 assert_eq!(blob_refs.len(), 1);
387 assert_eq!(
388 blob_refs[0].cid,
389 "bafkreihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku"
390 );
391 assert_eq!(blob_refs[0].mime_type, Some("image/jpeg".to_string()));
392 }
393
394 #[test]
395 fn test_find_blob_refs_no_blobs() {
396 let record = serde_json::json!({
397 "$type": "app.bsky.feed.post",
398 "text": "Hello world"
399 });
400 let blob_refs = find_blob_refs(&record, 0);
401 assert!(blob_refs.is_empty());
402 }
403
404 #[test]
405 fn test_find_blob_refs_depth_limit() {
406 fn deeply_nested(depth: usize) -> JsonValue {
407 if depth == 0 {
408 serde_json::json!({
409 "$type": "blob",
410 "ref": { "$link": "bafkreitest" },
411 "mimeType": "image/png"
412 })
413 } else {
414 serde_json::json!({ "nested": deeply_nested(depth - 1) })
415 }
416 }
417 let deep = deeply_nested(40);
418 let blob_refs = find_blob_refs(&deep, 0);
419 assert!(blob_refs.is_empty());
420 }
421}