this repo has no description
1use crate::state::AppState;
2use bytes::Bytes;
3use cid::Cid;
4use jacquard::types::{integer::LimitedU32, string::Tid};
5use jacquard_repo::commit::Commit;
6use jacquard_repo::storage::BlockStore;
7use k256::ecdsa::SigningKey;
8use serde_json::{Value, json};
9use std::str::FromStr;
10use uuid::Uuid;
11
12pub fn extract_blob_cids(record: &Value) -> Vec<String> {
13 let mut blobs = Vec::new();
14 extract_blob_cids_recursive(record, &mut blobs);
15 blobs
16}
17
18fn extract_blob_cids_recursive(value: &Value, blobs: &mut Vec<String>) {
19 match value {
20 Value::Object(map) => {
21 if map.get("$type").and_then(|v| v.as_str()) == Some("blob")
22 && let Some(ref_obj) = map.get("ref")
23 && let Some(link) = ref_obj.get("$link").and_then(|v| v.as_str())
24 {
25 blobs.push(link.to_string());
26 }
27 for v in map.values() {
28 extract_blob_cids_recursive(v, blobs);
29 }
30 }
31 Value::Array(arr) => {
32 for v in arr {
33 extract_blob_cids_recursive(v, blobs);
34 }
35 }
36 _ => {}
37 }
38}
39
40pub fn create_signed_commit(
41 did: &str,
42 data: Cid,
43 rev: &str,
44 prev: Option<Cid>,
45 signing_key: &SigningKey,
46) -> Result<(Vec<u8>, Bytes), String> {
47 let did =
48 jacquard::types::string::Did::new(did).map_err(|e| format!("Invalid DID: {:?}", e))?;
49 let rev =
50 jacquard::types::string::Tid::from_str(rev).map_err(|e| format!("Invalid TID: {:?}", e))?;
51 let unsigned = Commit::new_unsigned(did, data, rev, prev);
52 let signed = unsigned
53 .sign(signing_key)
54 .map_err(|e| format!("Failed to sign commit: {:?}", e))?;
55 let sig_bytes = signed.sig().clone();
56 let signed_bytes = signed
57 .to_cbor()
58 .map_err(|e| format!("Failed to serialize signed commit: {:?}", e))?;
59 Ok((signed_bytes, sig_bytes))
60}
61
62pub enum RecordOp {
63 Create {
64 collection: String,
65 rkey: String,
66 cid: Cid,
67 },
68 Update {
69 collection: String,
70 rkey: String,
71 cid: Cid,
72 prev: Option<Cid>,
73 },
74 Delete {
75 collection: String,
76 rkey: String,
77 prev: Option<Cid>,
78 },
79}
80
81pub struct CommitResult {
82 pub commit_cid: Cid,
83 pub rev: String,
84}
85
86pub struct CommitParams<'a> {
87 pub did: &'a str,
88 pub user_id: Uuid,
89 pub current_root_cid: Option<Cid>,
90 pub prev_data_cid: Option<Cid>,
91 pub new_mst_root: Cid,
92 pub ops: Vec<RecordOp>,
93 pub blocks_cids: &'a [String],
94 pub blobs: &'a [String],
95 pub obsolete_cids: Vec<Cid>,
96}
97
98pub async fn commit_and_log(
99 state: &AppState,
100 params: CommitParams<'_>,
101) -> Result<CommitResult, String> {
102 let CommitParams {
103 did,
104 user_id,
105 current_root_cid,
106 prev_data_cid,
107 new_mst_root,
108 ops,
109 blocks_cids,
110 blobs,
111 obsolete_cids,
112 } = params;
113 let key_row = sqlx::query!(
114 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
115 user_id
116 )
117 .fetch_one(&state.db)
118 .await
119 .map_err(|e| format!("Failed to fetch signing key: {}", e))?;
120 let key_bytes = crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version)
121 .map_err(|e| format!("Failed to decrypt signing key: {}", e))?;
122 let signing_key =
123 SigningKey::from_slice(&key_bytes).map_err(|e| format!("Invalid signing key: {}", e))?;
124 let rev = Tid::now(LimitedU32::MIN);
125 let rev_str = rev.to_string();
126 let (new_commit_bytes, _sig) =
127 create_signed_commit(did, new_mst_root, &rev_str, current_root_cid, &signing_key)?;
128 let new_root_cid = state
129 .block_store
130 .put(&new_commit_bytes)
131 .await
132 .map_err(|e| format!("Failed to save commit block: {:?}", e))?;
133 let mut tx = state
134 .db
135 .begin()
136 .await
137 .map_err(|e| format!("Failed to begin transaction: {}", e))?;
138 let lock_result = sqlx::query!(
139 "SELECT repo_root_cid FROM repos WHERE user_id = $1 FOR UPDATE NOWAIT",
140 user_id
141 )
142 .fetch_optional(&mut *tx)
143 .await;
144 match lock_result {
145 Err(e) => {
146 if let Some(db_err) = e.as_database_error()
147 && db_err.code().as_deref() == Some("55P03")
148 {
149 return Err(
150 "ConcurrentModification: Another request is modifying this repo".to_string(),
151 );
152 }
153 return Err(format!("Failed to acquire repo lock: {}", e));
154 }
155 Ok(Some(row)) => {
156 if let Some(expected_root) = ¤t_root_cid
157 && row.repo_root_cid != expected_root.to_string()
158 {
159 return Err(
160 "ConcurrentModification: Repo has been modified since last read".to_string(),
161 );
162 }
163 }
164 Ok(None) => {
165 return Err("Repo not found".to_string());
166 }
167 }
168 let is_account_active = sqlx::query_scalar!(
169 "SELECT deactivated_at IS NULL FROM users WHERE id = $1",
170 user_id
171 )
172 .fetch_optional(&mut *tx)
173 .await
174 .map_err(|e| format!("Failed to check account status: {}", e))?
175 .flatten()
176 .unwrap_or(false);
177 sqlx::query!(
178 "UPDATE repos SET repo_root_cid = $1, repo_rev = $2 WHERE user_id = $3",
179 new_root_cid.to_string(),
180 &rev_str,
181 user_id
182 )
183 .execute(&mut *tx)
184 .await
185 .map_err(|e| format!("DB Error (repos): {}", e))?;
186 let mut all_block_cids: Vec<Vec<u8>> = blocks_cids
187 .iter()
188 .filter_map(|s| Cid::from_str(s).ok())
189 .map(|c| c.to_bytes())
190 .collect();
191 all_block_cids.push(new_root_cid.to_bytes());
192 if !all_block_cids.is_empty() {
193 sqlx::query!(
194 r#"
195 INSERT INTO user_blocks (user_id, block_cid)
196 SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)
197 ON CONFLICT (user_id, block_cid) DO NOTHING
198 "#,
199 user_id,
200 &all_block_cids
201 )
202 .execute(&mut *tx)
203 .await
204 .map_err(|e| format!("DB Error (user_blocks): {}", e))?;
205 }
206 if !obsolete_cids.is_empty() {
207 let obsolete_bytes: Vec<Vec<u8>> = obsolete_cids.iter().map(|c| c.to_bytes()).collect();
208 sqlx::query!(
209 r#"
210 DELETE FROM user_blocks
211 WHERE user_id = $1
212 AND block_cid = ANY($2)
213 "#,
214 user_id,
215 &obsolete_bytes as &[Vec<u8>]
216 )
217 .execute(&mut *tx)
218 .await
219 .map_err(|e| format!("DB Error (user_blocks delete obsolete): {}", e))?;
220 }
221 let mut upsert_collections: Vec<String> = Vec::new();
222 let mut upsert_rkeys: Vec<String> = Vec::new();
223 let mut upsert_cids: Vec<String> = Vec::new();
224 let mut delete_collections: Vec<String> = Vec::new();
225 let mut delete_rkeys: Vec<String> = Vec::new();
226 for op in &ops {
227 match op {
228 RecordOp::Create {
229 collection,
230 rkey,
231 cid,
232 }
233 | RecordOp::Update {
234 collection,
235 rkey,
236 cid,
237 ..
238 } => {
239 upsert_collections.push(collection.clone());
240 upsert_rkeys.push(rkey.clone());
241 upsert_cids.push(cid.to_string());
242 }
243 RecordOp::Delete {
244 collection, rkey, ..
245 } => {
246 delete_collections.push(collection.clone());
247 delete_rkeys.push(rkey.clone());
248 }
249 }
250 }
251 if !upsert_collections.is_empty() {
252 sqlx::query!(
253 r#"
254 INSERT INTO records (repo_id, collection, rkey, record_cid, repo_rev)
255 SELECT $1, collection, rkey, record_cid, $5
256 FROM UNNEST($2::text[], $3::text[], $4::text[]) AS t(collection, rkey, record_cid)
257 ON CONFLICT (repo_id, collection, rkey) DO UPDATE
258 SET record_cid = EXCLUDED.record_cid, repo_rev = EXCLUDED.repo_rev, created_at = NOW()
259 "#,
260 user_id,
261 &upsert_collections,
262 &upsert_rkeys,
263 &upsert_cids,
264 rev_str
265 )
266 .execute(&mut *tx)
267 .await
268 .map_err(|e| format!("DB Error (records batch upsert): {}", e))?;
269 }
270 if !delete_collections.is_empty() {
271 sqlx::query!(
272 r#"
273 DELETE FROM records
274 WHERE repo_id = $1
275 AND (collection, rkey) IN (SELECT * FROM UNNEST($2::text[], $3::text[]))
276 "#,
277 user_id,
278 &delete_collections,
279 &delete_rkeys
280 )
281 .execute(&mut *tx)
282 .await
283 .map_err(|e| format!("DB Error (records batch delete): {}", e))?;
284 }
285 let ops_json = ops
286 .iter()
287 .map(|op| match op {
288 RecordOp::Create {
289 collection,
290 rkey,
291 cid,
292 } => json!({
293 "action": "create",
294 "path": format!("{}/{}", collection, rkey),
295 "cid": cid.to_string()
296 }),
297 RecordOp::Update {
298 collection,
299 rkey,
300 cid,
301 prev,
302 } => {
303 let mut obj = json!({
304 "action": "update",
305 "path": format!("{}/{}", collection, rkey),
306 "cid": cid.to_string()
307 });
308 if let Some(prev_cid) = prev {
309 obj["prev"] = json!(prev_cid.to_string());
310 }
311 obj
312 }
313 RecordOp::Delete {
314 collection,
315 rkey,
316 prev,
317 } => {
318 let mut obj = json!({
319 "action": "delete",
320 "path": format!("{}/{}", collection, rkey),
321 "cid": null
322 });
323 if let Some(prev_cid) = prev {
324 obj["prev"] = json!(prev_cid.to_string());
325 }
326 obj
327 }
328 })
329 .collect::<Vec<_>>();
330 if is_account_active {
331 let event_type = "commit";
332 let prev_cid_str = current_root_cid.map(|c| c.to_string());
333 let prev_data_cid_str = prev_data_cid.map(|c| c.to_string());
334 let seq_row = sqlx::query!(
335 r#"
336 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids, prev_data_cid)
337 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
338 RETURNING seq
339 "#,
340 did,
341 event_type,
342 new_root_cid.to_string(),
343 prev_cid_str,
344 json!(ops_json),
345 blobs,
346 blocks_cids,
347 prev_data_cid_str,
348 )
349 .fetch_one(&mut *tx)
350 .await
351 .map_err(|e| format!("DB Error (repo_seq): {}", e))?;
352 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
353 .execute(&mut *tx)
354 .await
355 .map_err(|e| format!("DB Error (notify): {}", e))?;
356 }
357 tx.commit()
358 .await
359 .map_err(|e| format!("Failed to commit transaction: {}", e))?;
360 if is_account_active {
361 let _ = sequence_sync_event(state, did, &new_root_cid.to_string(), Some(&rev_str)).await;
362 }
363 Ok(CommitResult {
364 commit_cid: new_root_cid,
365 rev: rev_str,
366 })
367}
368pub async fn create_record_internal(
369 state: &AppState,
370 did: &str,
371 collection: &str,
372 rkey: &str,
373 record: &serde_json::Value,
374) -> Result<(String, Cid), String> {
375 use crate::repo::tracking::TrackingBlockStore;
376 use jacquard_repo::mst::Mst;
377 use std::sync::Arc;
378 let user_id: Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
379 .fetch_optional(&state.db)
380 .await
381 .map_err(|e| format!("DB error: {}", e))?
382 .ok_or_else(|| "User not found".to_string())?;
383 let root_cid_str: String = sqlx::query_scalar!(
384 "SELECT repo_root_cid FROM repos WHERE user_id = $1",
385 user_id
386 )
387 .fetch_optional(&state.db)
388 .await
389 .map_err(|e| format!("DB error: {}", e))?
390 .ok_or_else(|| "Repo not found".to_string())?;
391 let current_root_cid =
392 Cid::from_str(&root_cid_str).map_err(|_| "Invalid repo root CID".to_string())?;
393 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
394 let commit_bytes = tracking_store
395 .get(¤t_root_cid)
396 .await
397 .map_err(|e| format!("Failed to fetch commit: {:?}", e))?
398 .ok_or_else(|| "Commit block not found".to_string())?;
399 let commit = jacquard_repo::commit::Commit::from_cbor(&commit_bytes)
400 .map_err(|e| format!("Failed to parse commit: {:?}", e))?;
401 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
402 let record_ipld = crate::util::json_to_ipld(record);
403 let mut record_bytes = Vec::new();
404 serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld)
405 .map_err(|e| format!("Failed to serialize record: {:?}", e))?;
406 let record_cid = tracking_store
407 .put(&record_bytes)
408 .await
409 .map_err(|e| format!("Failed to save record block: {:?}", e))?;
410 let key = format!("{}/{}", collection, rkey);
411 let new_mst = mst
412 .add(&key, record_cid)
413 .await
414 .map_err(|e| format!("Failed to add to MST: {:?}", e))?;
415 let new_mst_root = new_mst
416 .persist()
417 .await
418 .map_err(|e| format!("Failed to persist MST: {:?}", e))?;
419 let op = RecordOp::Create {
420 collection: collection.to_string(),
421 rkey: rkey.to_string(),
422 cid: record_cid,
423 };
424 let mut new_mst_blocks = std::collections::BTreeMap::new();
425 let mut old_mst_blocks = std::collections::BTreeMap::new();
426 new_mst
427 .blocks_for_path(&key, &mut new_mst_blocks)
428 .await
429 .map_err(|e| format!("Failed to get new MST blocks for path: {:?}", e))?;
430 mst.blocks_for_path(&key, &mut old_mst_blocks)
431 .await
432 .map_err(|e| format!("Failed to get old MST blocks for path: {:?}", e))?;
433 let obsolete_cids: Vec<Cid> = std::iter::once(current_root_cid)
434 .chain(
435 old_mst_blocks
436 .keys()
437 .filter(|cid| !new_mst_blocks.contains_key(*cid))
438 .copied(),
439 )
440 .collect();
441 let mut relevant_blocks = new_mst_blocks;
442 relevant_blocks.extend(old_mst_blocks);
443 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
444 let written_cids: Vec<Cid> = tracking_store
445 .get_all_relevant_cids()
446 .into_iter()
447 .chain(relevant_blocks.keys().copied())
448 .collect::<std::collections::HashSet<_>>()
449 .into_iter()
450 .collect();
451 let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect();
452 let blob_cids = extract_blob_cids(record);
453 let result = commit_and_log(
454 state,
455 CommitParams {
456 did,
457 user_id,
458 current_root_cid: Some(current_root_cid),
459 prev_data_cid: Some(commit.data),
460 new_mst_root,
461 ops: vec![op],
462 blocks_cids: &written_cids_str,
463 blobs: &blob_cids,
464 obsolete_cids,
465 },
466 )
467 .await?;
468 let uri = format!("at://{}/{}/{}", did, collection, rkey);
469 Ok((uri, result.commit_cid))
470}
471
472pub async fn sequence_identity_event(
473 state: &AppState,
474 did: &str,
475 handle: Option<&str>,
476) -> Result<i64, String> {
477 let seq_row = sqlx::query!(
478 r#"
479 INSERT INTO repo_seq (did, event_type, handle)
480 VALUES ($1, 'identity', $2)
481 RETURNING seq
482 "#,
483 did,
484 handle,
485 )
486 .fetch_one(&state.db)
487 .await
488 .map_err(|e| format!("DB Error (repo_seq identity): {}", e))?;
489 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
490 .execute(&state.db)
491 .await
492 .map_err(|e| format!("DB Error (notify): {}", e))?;
493 Ok(seq_row.seq)
494}
495pub async fn sequence_account_event(
496 state: &AppState,
497 did: &str,
498 active: bool,
499 status: Option<&str>,
500) -> Result<i64, String> {
501 let seq_row = sqlx::query!(
502 r#"
503 INSERT INTO repo_seq (did, event_type, active, status)
504 VALUES ($1, 'account', $2, $3)
505 RETURNING seq
506 "#,
507 did,
508 active,
509 status,
510 )
511 .fetch_one(&state.db)
512 .await
513 .map_err(|e| format!("DB Error (repo_seq account): {}", e))?;
514 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
515 .execute(&state.db)
516 .await
517 .map_err(|e| format!("DB Error (notify): {}", e))?;
518 Ok(seq_row.seq)
519}
520pub async fn sequence_sync_event(
521 state: &AppState,
522 did: &str,
523 commit_cid: &str,
524 rev: Option<&str>,
525) -> Result<i64, String> {
526 let seq_row = sqlx::query!(
527 r#"
528 INSERT INTO repo_seq (did, event_type, commit_cid, rev)
529 VALUES ($1, 'sync', $2, $3)
530 RETURNING seq
531 "#,
532 did,
533 commit_cid,
534 rev,
535 )
536 .fetch_one(&state.db)
537 .await
538 .map_err(|e| format!("DB Error (repo_seq sync): {}", e))?;
539 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
540 .execute(&state.db)
541 .await
542 .map_err(|e| format!("DB Error (notify): {}", e))?;
543 Ok(seq_row.seq)
544}
545
546pub async fn sequence_genesis_commit(
547 state: &AppState,
548 did: &str,
549 commit_cid: &Cid,
550 mst_root_cid: &Cid,
551 rev: &str,
552) -> Result<i64, String> {
553 let ops = serde_json::json!([]);
554 let blobs: Vec<String> = vec![];
555 let blocks_cids: Vec<String> = vec![mst_root_cid.to_string(), commit_cid.to_string()];
556 let prev_cid: Option<&str> = None;
557 let commit_cid_str = commit_cid.to_string();
558 let seq_row = sqlx::query!(
559 r#"
560 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids, rev)
561 VALUES ($1, 'commit', $2, $3::TEXT, $4, $5, $6, $7)
562 RETURNING seq
563 "#,
564 did,
565 commit_cid_str,
566 prev_cid,
567 ops,
568 &blobs,
569 &blocks_cids,
570 rev
571 )
572 .fetch_one(&state.db)
573 .await
574 .map_err(|e| format!("DB Error (repo_seq genesis commit): {}", e))?;
575 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
576 .execute(&state.db)
577 .await
578 .map_err(|e| format!("DB Error (notify): {}", e))?;
579 Ok(seq_row.seq)
580}