//! CAR file import: parse a repository CAR archive, walk its Merkle Search
//! Tree, and apply the imported blocks and records to the database.
1use bytes::Bytes;
2use cid::Cid;
3use ipld_core::ipld::Ipld;
4use iroh_car::CarReader;
5use serde_json::Value as JsonValue;
6use sqlx::PgPool;
7use std::collections::HashMap;
8use std::io::Cursor;
9use thiserror::Error;
10use tracing::debug;
11use uuid::Uuid;
/// Errors that can occur while parsing or applying a repository CAR import.
#[derive(Error, Debug)]
pub enum ImportError {
    /// The CAR container itself could not be decoded.
    #[error("CAR parsing error: {0}")]
    CarParse(String),
    /// A repo CAR must have exactly one root (the commit block).
    #[error("Expected exactly one root in CAR file")]
    InvalidRootCount,
    /// A CID referenced during the walk was not present in the CAR.
    #[error("Block not found: {0}")]
    BlockNotFound(String),
    /// A block's dag-cbor payload failed to decode.
    #[error("Invalid CBOR: {0}")]
    InvalidCbor(String),
    /// Any sqlx-level database failure (converted via `#[from]`).
    #[error("Database error: {0}")]
    Database(#[from] sqlx::Error),
    /// Failure writing blocks to the block store.
    #[error("Block store error: {0}")]
    BlockStore(String),
    /// The CAR contained more blocks than the configured maximum.
    #[error("Import size limit exceeded")]
    SizeLimitExceeded,
    /// No repo row exists for the authenticated user.
    #[error("Repo not found")]
    RepoNotFound,
    /// The repo row is locked by another in-flight import (PG 55P03).
    #[error("Concurrent modification detected")]
    ConcurrentModification,
    /// The root block is not a structurally valid commit.
    #[error("Invalid commit structure: {0}")]
    InvalidCommit(String),
    /// Signature / structure verification failed (converted via `#[from]`).
    #[error("Verification failed: {0}")]
    VerificationFailed(#[from] super::verify::VerifyError),
    /// The CAR's commit DID does not match the authenticated identity.
    #[error("DID mismatch: CAR is for {car_did}, but authenticated as {auth_did}")]
    DidMismatch { car_did: String, auth_did: String },
}
/// A reference to a blob found inside a record (`$type: "blob"` node).
#[derive(Debug, Clone)]
pub struct BlobRef {
    // String form of the blob's CID (from the `ref` link).
    pub cid: String,
    // Declared `mimeType`, if the blob node carried one.
    pub mime_type: Option<String>,
}
44pub async fn parse_car(data: &[u8]) -> Result<(Cid, HashMap<Cid, Bytes>), ImportError> {
45 let cursor = Cursor::new(data);
46 let mut reader = CarReader::new(cursor)
47 .await
48 .map_err(|e| ImportError::CarParse(e.to_string()))?;
49 let header = reader.header();
50 let roots = header.roots();
51 if roots.len() != 1 {
52 return Err(ImportError::InvalidRootCount);
53 }
54 let root = roots[0];
55 let mut blocks = HashMap::new();
56 while let Ok(Some((cid, block))) = reader.next_block().await {
57 blocks.insert(cid, Bytes::from(block));
58 }
59 if !blocks.contains_key(&root) {
60 return Err(ImportError::BlockNotFound(root.to_string()));
61 }
62 Ok((root, blocks))
63}
64pub fn find_blob_refs_ipld(value: &Ipld, depth: usize) -> Vec<BlobRef> {
65 if depth > 32 {
66 return vec![];
67 }
68 match value {
69 Ipld::List(arr) => arr
70 .iter()
71 .flat_map(|v| find_blob_refs_ipld(v, depth + 1))
72 .collect(),
73 Ipld::Map(obj) => {
74 if let Some(Ipld::String(type_str)) = obj.get("$type") {
75 if type_str == "blob" {
76 if let Some(Ipld::Link(link_cid)) = obj.get("ref") {
77 let mime = obj
78 .get("mimeType")
79 .and_then(|v| if let Ipld::String(s) = v { Some(s.clone()) } else { None });
80 return vec![BlobRef {
81 cid: link_cid.to_string(),
82 mime_type: mime,
83 }];
84 }
85 }
86 }
87 obj.values()
88 .flat_map(|v| find_blob_refs_ipld(v, depth + 1))
89 .collect()
90 }
91 _ => vec![],
92 }
93}
94pub fn find_blob_refs(value: &JsonValue, depth: usize) -> Vec<BlobRef> {
95 if depth > 32 {
96 return vec![];
97 }
98 match value {
99 JsonValue::Array(arr) => arr
100 .iter()
101 .flat_map(|v| find_blob_refs(v, depth + 1))
102 .collect(),
103 JsonValue::Object(obj) => {
104 if let Some(JsonValue::String(type_str)) = obj.get("$type") {
105 if type_str == "blob" {
106 if let Some(JsonValue::Object(ref_obj)) = obj.get("ref") {
107 if let Some(JsonValue::String(link)) = ref_obj.get("$link") {
108 let mime = obj
109 .get("mimeType")
110 .and_then(|v| v.as_str())
111 .map(String::from);
112 return vec![BlobRef {
113 cid: link.clone(),
114 mime_type: mime,
115 }];
116 }
117 }
118 }
119 }
120 obj.values()
121 .flat_map(|v| find_blob_refs(v, depth + 1))
122 .collect()
123 }
124 _ => vec![],
125 }
126}
127pub fn extract_links(value: &Ipld, links: &mut Vec<Cid>) {
128 match value {
129 Ipld::Link(cid) => {
130 links.push(*cid);
131 }
132 Ipld::Map(map) => {
133 for v in map.values() {
134 extract_links(v, links);
135 }
136 }
137 Ipld::List(arr) => {
138 for v in arr {
139 extract_links(v, links);
140 }
141 }
142 _ => {}
143 }
144}
/// A single record discovered while walking the imported repo's MST.
#[derive(Debug)]
pub struct ImportedRecord {
    // Collection NSID, e.g. "app.bsky.feed.post" (everything before the last '/').
    pub collection: String,
    // Record key (the last path segment of the MST key).
    pub rkey: String,
    // CID of the record's dag-cbor block.
    pub cid: Cid,
    // Blob references found inside the record body.
    pub blob_refs: Vec<BlobRef>,
}
/// Walk the Merkle Search Tree rooted at `root_cid`, collecting every record
/// it references.
///
/// Tree node layout (dag-cbor map): `e` is the entry list, `l` an optional
/// left-subtree link. Each entry carries `k` (key), `v` (record CID link),
/// and optionally `t` (right-subtree link).
///
/// NOTE(review): canonical atproto MST entries prefix-compress keys via a `p`
/// field, which this walker ignores — it treats `k` as the full key. Confirm
/// the exporters feeding this code always write uncompressed keys.
///
/// # Errors
/// - `BlockNotFound` if a tree node referenced by the walk is missing.
/// - `InvalidCbor` if a tree node fails to decode. (Record blocks that fail
///   to decode, or keys without a '/', are silently skipped instead.)
pub fn walk_mst(
    blocks: &HashMap<Cid, Bytes>,
    root_cid: &Cid,
) -> Result<Vec<ImportedRecord>, ImportError> {
    let mut records = Vec::new();
    // Iterative DFS; `visited` guards against cycles in a malformed CAR.
    let mut stack = vec![*root_cid];
    let mut visited = std::collections::HashSet::new();
    while let Some(cid) = stack.pop() {
        if visited.contains(&cid) {
            continue;
        }
        visited.insert(cid);
        let block = blocks
            .get(&cid)
            .ok_or_else(|| ImportError::BlockNotFound(cid.to_string()))?;
        let value: Ipld = serde_ipld_dagcbor::from_slice(block)
            .map_err(|e| ImportError::InvalidCbor(e.to_string()))?;
        if let Ipld::Map(ref obj) = value {
            if let Some(Ipld::List(entries)) = obj.get("e") {
                for entry in entries {
                    if let Ipld::Map(entry_obj) = entry {
                        // `k` may be encoded as bytes (must be UTF-8) or a string.
                        let key = entry_obj.get("k").and_then(|k| {
                            if let Ipld::Bytes(b) = k {
                                String::from_utf8(b.clone()).ok()
                            } else if let Ipld::String(s) = k {
                                Some(s.clone())
                            } else {
                                None
                            }
                        });
                        let record_cid = entry_obj.get("v").and_then(|v| {
                            if let Ipld::Link(cid) = v {
                                Some(*cid)
                            } else {
                                None
                            }
                        });
                        if let (Some(key), Some(record_cid)) = (key, record_cid) {
                            if let Some(record_block) = blocks.get(&record_cid) {
                                if let Ok(record_value) =
                                    serde_ipld_dagcbor::from_slice::<Ipld>(record_block)
                                {
                                    let blob_refs = find_blob_refs_ipld(&record_value, 0);
                                    // Keys look like "collection/rkey": the last
                                    // segment is the rkey, everything before it
                                    // joins back into the collection name.
                                    let parts: Vec<&str> = key.split('/').collect();
                                    if parts.len() >= 2 {
                                        let collection = parts[..parts.len() - 1].join("/");
                                        let rkey = parts[parts.len() - 1].to_string();
                                        records.push(ImportedRecord {
                                            collection,
                                            rkey,
                                            cid: record_cid,
                                            blob_refs,
                                        });
                                    }
                                }
                            }
                        }
                        // Right subtree hanging off this entry.
                        if let Some(Ipld::Link(tree_cid)) = entry_obj.get("t") {
                            stack.push(*tree_cid);
                        }
                    }
                }
            }
            // Leftmost subtree of this node.
            if let Some(Ipld::Link(left_cid)) = obj.get("l") {
                stack.push(*left_cid);
            }
        }
    }
    Ok(records)
}
/// Metadata pulled from a repo commit block (besides its `data` MST root).
pub struct CommitInfo {
    // Revision string (`rev` field), if present.
    pub rev: Option<String>,
    // Previous commit CID as a string; `None` for null/absent `prev`.
    pub prev: Option<String>,
}
226fn extract_commit_info(commit: &Ipld) -> Result<(Cid, CommitInfo), ImportError> {
227 let obj = match commit {
228 Ipld::Map(m) => m,
229 _ => return Err(ImportError::InvalidCommit("Commit must be a map".to_string())),
230 };
231 let data_cid = obj
232 .get("data")
233 .and_then(|d| if let Ipld::Link(cid) = d { Some(*cid) } else { None })
234 .ok_or_else(|| ImportError::InvalidCommit("Missing data field".to_string()))?;
235 let rev = obj.get("rev").and_then(|r| {
236 if let Ipld::String(s) = r {
237 Some(s.clone())
238 } else {
239 None
240 }
241 });
242 let prev = obj.get("prev").and_then(|p| {
243 if let Ipld::Link(cid) = p {
244 Some(cid.to_string())
245 } else if let Ipld::Null = p {
246 None
247 } else {
248 None
249 }
250 });
251 Ok((data_cid, CommitInfo { rev, prev }))
252}
/// Replace a user's repository contents with an imported CAR.
///
/// Decodes the root commit, walks its MST to enumerate records, then in one
/// transaction: locks the repo row, inserts all blocks, updates the repo root
/// CID, and fully rewrites the user's `records` rows.
///
/// # Errors
/// - `SizeLimitExceeded` if the CAR has more than `max_blocks` blocks.
/// - `RepoNotFound` if no repo row exists for `user_id`.
/// - `ConcurrentModification` if the repo row is locked by another import.
/// - `BlockNotFound` / `InvalidCbor` / `InvalidCommit` for a malformed CAR.
pub async fn apply_import(
    db: &PgPool,
    user_id: Uuid,
    root: Cid,
    blocks: HashMap<Cid, Bytes>,
    max_blocks: usize,
) -> Result<Vec<ImportedRecord>, ImportError> {
    // Enforce the size limit before doing any decoding or DB work.
    if blocks.len() > max_blocks {
        return Err(ImportError::SizeLimitExceeded);
    }
    let root_block = blocks
        .get(&root)
        .ok_or_else(|| ImportError::BlockNotFound(root.to_string()))?;
    // The root block is the commit; its `data` field links to the MST root.
    let commit: Ipld = serde_ipld_dagcbor::from_slice(root_block)
        .map_err(|e| ImportError::InvalidCbor(e.to_string()))?;
    let (data_cid, _commit_info) = extract_commit_info(&commit)?;
    let records = walk_mst(&blocks, &data_cid)?;
    debug!(
        "Importing {} blocks and {} records for user {}",
        blocks.len(),
        records.len(),
        user_id
    );
    let mut tx = db.begin().await?;
    // Row-lock the repo without waiting; a lock already held means another
    // import is in flight. Postgres reports that as SQLSTATE 55P03.
    let repo = sqlx::query!(
        "SELECT repo_root_cid FROM repos WHERE user_id = $1 FOR UPDATE NOWAIT",
        user_id
    )
    .fetch_optional(&mut *tx)
    .await
    .map_err(|e| {
        if let sqlx::Error::Database(ref db_err) = e {
            if db_err.code().as_deref() == Some("55P03") {
                return ImportError::ConcurrentModification;
            }
        }
        ImportError::Database(e)
    })?;
    if repo.is_none() {
        return Err(ImportError::RepoNotFound);
    }
    // NOTE(review): blocks are still written one INSERT per block; the
    // 100-element chunking only shapes the intermediate Vec, it does not
    // batch the statements.
    let block_chunks: Vec<Vec<(&Cid, &Bytes)>> = blocks
        .iter()
        .collect::<Vec<_>>()
        .chunks(100)
        .map(|c| c.to_vec())
        .collect();
    for chunk in block_chunks {
        for (cid, data) in chunk {
            let cid_bytes = cid.to_bytes();
            // Blocks are content-addressed, so an existing row is identical
            // and can safely be skipped.
            sqlx::query!(
                "INSERT INTO blocks (cid, data) VALUES ($1, $2) ON CONFLICT (cid) DO NOTHING",
                &cid_bytes,
                data.as_ref()
            )
            .execute(&mut *tx)
            .await?;
        }
    }
    let root_str = root.to_string();
    sqlx::query!(
        "UPDATE repos SET repo_root_cid = $1, updated_at = NOW() WHERE user_id = $2",
        root_str,
        user_id
    )
    .execute(&mut *tx)
    .await?;
    // Import fully replaces the record set: clear old rows, then re-insert.
    sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id)
        .execute(&mut *tx)
        .await?;
    for record in &records {
        let record_cid_str = record.cid.to_string();
        sqlx::query!(
            r#"
            INSERT INTO records (repo_id, collection, rkey, record_cid)
            VALUES ($1, $2, $3, $4)
            ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4
            "#,
            user_id,
            record.collection,
            record.rkey,
            record_cid_str
        )
        .execute(&mut *tx)
        .await?;
    }
    tx.commit().await?;
    debug!(
        "Successfully imported {} blocks and {} records",
        blocks.len(),
        records.len()
    );
    Ok(records)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A post embedding one image should yield exactly that blob reference.
    #[test]
    fn test_find_blob_refs() {
        let post = serde_json::json!({
            "$type": "app.bsky.feed.post",
            "text": "Hello world",
            "embed": {
                "$type": "app.bsky.embed.images",
                "images": [
                    {
                        "alt": "Test image",
                        "image": {
                            "$type": "blob",
                            "ref": {
                                "$link": "bafkreihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku"
                            },
                            "mimeType": "image/jpeg",
                            "size": 12345
                        }
                    }
                ]
            }
        });
        let refs = find_blob_refs(&post, 0);
        assert_eq!(refs.len(), 1);
        let only = &refs[0];
        assert_eq!(
            only.cid,
            "bafkreihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku"
        );
        assert_eq!(only.mime_type.as_deref(), Some("image/jpeg"));
    }

    /// A plain text post contains no blob references.
    #[test]
    fn test_find_blob_refs_no_blobs() {
        let post = serde_json::json!({
            "$type": "app.bsky.feed.post",
            "text": "Hello world"
        });
        assert!(find_blob_refs(&post, 0).is_empty());
    }

    /// A blob buried 40 objects deep is past the 32-level cap and ignored.
    #[test]
    fn test_find_blob_refs_depth_limit() {
        fn deeply_nested(depth: usize) -> JsonValue {
            if depth == 0 {
                serde_json::json!({
                    "$type": "blob",
                    "ref": { "$link": "bafkreitest" },
                    "mimeType": "image/png"
                })
            } else {
                serde_json::json!({ "nested": deeply_nested(depth - 1) })
            }
        }
        assert!(find_blob_refs(&deeply_nested(40), 0).is_empty());
    }
}