this repo has no description
1use crate::state::AppState;
2use crate::types::{Did, Handle, Nsid, Rkey};
3use bytes::Bytes;
4use cid::Cid;
5use jacquard::types::{integer::LimitedU32, string::Tid};
6use jacquard_repo::commit::Commit;
7use jacquard_repo::storage::BlockStore;
8use k256::ecdsa::SigningKey;
9use serde_json::{Value, json};
10use std::str::FromStr;
11use uuid::Uuid;
12
13pub fn extract_blob_cids(record: &Value) -> Vec<String> {
14 let mut blobs = Vec::new();
15 extract_blob_cids_recursive(record, &mut blobs);
16 blobs
17}
18
19fn extract_blob_cids_recursive(value: &Value, blobs: &mut Vec<String>) {
20 match value {
21 Value::Object(map) => {
22 if map.get("$type").and_then(|v| v.as_str()) == Some("blob")
23 && let Some(ref_obj) = map.get("ref")
24 && let Some(link) = ref_obj.get("$link").and_then(|v| v.as_str())
25 {
26 blobs.push(link.to_string());
27 }
28 for v in map.values() {
29 extract_blob_cids_recursive(v, blobs);
30 }
31 }
32 Value::Array(arr) => {
33 for v in arr {
34 extract_blob_cids_recursive(v, blobs);
35 }
36 }
37 _ => {}
38 }
39}
40
41pub fn create_signed_commit(
42 did: &Did,
43 data: Cid,
44 rev: &str,
45 prev: Option<Cid>,
46 signing_key: &SigningKey,
47) -> Result<(Vec<u8>, Bytes), String> {
48 let did = jacquard::types::string::Did::new(did.as_str())
49 .map_err(|e| format!("Invalid DID: {:?}", e))?;
50 let rev =
51 jacquard::types::string::Tid::from_str(rev).map_err(|e| format!("Invalid TID: {:?}", e))?;
52 let unsigned = Commit::new_unsigned(did, data, rev, prev);
53 let signed = unsigned
54 .sign(signing_key)
55 .map_err(|e| format!("Failed to sign commit: {:?}", e))?;
56 let sig_bytes = signed.sig().clone();
57 let signed_bytes = signed
58 .to_cbor()
59 .map_err(|e| format!("Failed to serialize signed commit: {:?}", e))?;
60 Ok((signed_bytes, sig_bytes))
61}
62
63pub enum RecordOp {
64 Create {
65 collection: Nsid,
66 rkey: Rkey,
67 cid: Cid,
68 },
69 Update {
70 collection: Nsid,
71 rkey: Rkey,
72 cid: Cid,
73 prev: Option<Cid>,
74 },
75 Delete {
76 collection: Nsid,
77 rkey: Rkey,
78 prev: Option<Cid>,
79 },
80}
81
82pub struct CommitResult {
83 pub commit_cid: Cid,
84 pub rev: String,
85}
86
87pub struct CommitParams<'a> {
88 pub did: &'a Did,
89 pub user_id: Uuid,
90 pub current_root_cid: Option<Cid>,
91 pub prev_data_cid: Option<Cid>,
92 pub new_mst_root: Cid,
93 pub ops: Vec<RecordOp>,
94 pub blocks_cids: &'a [String],
95 pub blobs: &'a [String],
96 pub obsolete_cids: Vec<Cid>,
97}
98
99pub async fn commit_and_log(
100 state: &AppState,
101 params: CommitParams<'_>,
102) -> Result<CommitResult, String> {
103 let CommitParams {
104 did,
105 user_id,
106 current_root_cid,
107 prev_data_cid,
108 new_mst_root,
109 ops,
110 blocks_cids,
111 blobs,
112 obsolete_cids,
113 } = params;
114 let key_row = sqlx::query!(
115 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
116 user_id
117 )
118 .fetch_one(&state.db)
119 .await
120 .map_err(|e| format!("Failed to fetch signing key: {}", e))?;
121 let key_bytes = crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version)
122 .map_err(|e| format!("Failed to decrypt signing key: {}", e))?;
123 let signing_key =
124 SigningKey::from_slice(&key_bytes).map_err(|e| format!("Invalid signing key: {}", e))?;
125 let rev = Tid::now(LimitedU32::MIN);
126 let rev_str = rev.to_string();
127 let (new_commit_bytes, _sig) =
128 create_signed_commit(did, new_mst_root, &rev_str, current_root_cid, &signing_key)?;
129 let new_root_cid = state
130 .block_store
131 .put(&new_commit_bytes)
132 .await
133 .map_err(|e| format!("Failed to save commit block: {:?}", e))?;
134 let mut tx = state
135 .db
136 .begin()
137 .await
138 .map_err(|e| format!("Failed to begin transaction: {}", e))?;
139 let lock_result = sqlx::query!(
140 "SELECT repo_root_cid FROM repos WHERE user_id = $1 FOR UPDATE NOWAIT",
141 user_id
142 )
143 .fetch_optional(&mut *tx)
144 .await;
145 match lock_result {
146 Err(e) => {
147 if let Some(db_err) = e.as_database_error()
148 && db_err.code().as_deref() == Some("55P03")
149 {
150 return Err(
151 "ConcurrentModification: Another request is modifying this repo".to_string(),
152 );
153 }
154 return Err(format!("Failed to acquire repo lock: {}", e));
155 }
156 Ok(Some(row)) => {
157 if let Some(expected_root) = ¤t_root_cid
158 && row.repo_root_cid != expected_root.to_string()
159 {
160 return Err(
161 "ConcurrentModification: Repo has been modified since last read".to_string(),
162 );
163 }
164 }
165 Ok(None) => {
166 return Err("Repo not found".to_string());
167 }
168 }
169 let is_account_active = sqlx::query_scalar!(
170 "SELECT deactivated_at IS NULL FROM users WHERE id = $1",
171 user_id
172 )
173 .fetch_optional(&mut *tx)
174 .await
175 .map_err(|e| format!("Failed to check account status: {}", e))?
176 .flatten()
177 .unwrap_or(false);
178 sqlx::query!(
179 "UPDATE repos SET repo_root_cid = $1, repo_rev = $2 WHERE user_id = $3",
180 new_root_cid.to_string(),
181 &rev_str,
182 user_id
183 )
184 .execute(&mut *tx)
185 .await
186 .map_err(|e| format!("DB Error (repos): {}", e))?;
187 let mut all_block_cids: Vec<Vec<u8>> = blocks_cids
188 .iter()
189 .filter_map(|s| Cid::from_str(s).ok())
190 .map(|c| c.to_bytes())
191 .collect();
192 all_block_cids.push(new_root_cid.to_bytes());
193 if !all_block_cids.is_empty() {
194 sqlx::query!(
195 r#"
196 INSERT INTO user_blocks (user_id, block_cid)
197 SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)
198 ON CONFLICT (user_id, block_cid) DO NOTHING
199 "#,
200 user_id,
201 &all_block_cids
202 )
203 .execute(&mut *tx)
204 .await
205 .map_err(|e| format!("DB Error (user_blocks): {}", e))?;
206 }
207 if !obsolete_cids.is_empty() {
208 let obsolete_bytes: Vec<Vec<u8>> = obsolete_cids.iter().map(|c| c.to_bytes()).collect();
209 sqlx::query!(
210 r#"
211 DELETE FROM user_blocks
212 WHERE user_id = $1
213 AND block_cid = ANY($2)
214 "#,
215 user_id,
216 &obsolete_bytes as &[Vec<u8>]
217 )
218 .execute(&mut *tx)
219 .await
220 .map_err(|e| format!("DB Error (user_blocks delete obsolete): {}", e))?;
221 }
222 let (upserts, deletes): (Vec<_>, Vec<_>) = ops
223 .iter()
224 .partition(|op| matches!(op, RecordOp::Create { .. } | RecordOp::Update { .. }));
225 let (upsert_collections, upsert_rkeys, upsert_cids): (Vec<String>, Vec<String>, Vec<String>) =
226 upserts
227 .into_iter()
228 .filter_map(|op| match op {
229 RecordOp::Create {
230 collection,
231 rkey,
232 cid,
233 }
234 | RecordOp::Update {
235 collection,
236 rkey,
237 cid,
238 ..
239 } => Some((collection.to_string(), rkey.to_string(), cid.to_string())),
240 _ => None,
241 })
242 .fold(
243 (Vec::new(), Vec::new(), Vec::new()),
244 |(mut cols, mut rkeys, mut cids), (c, r, ci)| {
245 cols.push(c);
246 rkeys.push(r);
247 cids.push(ci);
248 (cols, rkeys, cids)
249 },
250 );
251 let (delete_collections, delete_rkeys): (Vec<String>, Vec<String>) = deletes
252 .into_iter()
253 .filter_map(|op| match op {
254 RecordOp::Delete {
255 collection, rkey, ..
256 } => Some((collection.to_string(), rkey.to_string())),
257 _ => None,
258 })
259 .unzip();
260 if !upsert_collections.is_empty() {
261 sqlx::query!(
262 r#"
263 INSERT INTO records (repo_id, collection, rkey, record_cid, repo_rev)
264 SELECT $1, collection, rkey, record_cid, $5
265 FROM UNNEST($2::text[], $3::text[], $4::text[]) AS t(collection, rkey, record_cid)
266 ON CONFLICT (repo_id, collection, rkey) DO UPDATE
267 SET record_cid = EXCLUDED.record_cid, repo_rev = EXCLUDED.repo_rev, created_at = NOW()
268 "#,
269 user_id,
270 &upsert_collections,
271 &upsert_rkeys,
272 &upsert_cids,
273 rev_str
274 )
275 .execute(&mut *tx)
276 .await
277 .map_err(|e| format!("DB Error (records batch upsert): {}", e))?;
278 }
279 if !delete_collections.is_empty() {
280 sqlx::query!(
281 r#"
282 DELETE FROM records
283 WHERE repo_id = $1
284 AND (collection, rkey) IN (SELECT * FROM UNNEST($2::text[], $3::text[]))
285 "#,
286 user_id,
287 &delete_collections,
288 &delete_rkeys
289 )
290 .execute(&mut *tx)
291 .await
292 .map_err(|e| format!("DB Error (records batch delete): {}", e))?;
293 }
294 let ops_json = ops
295 .iter()
296 .map(|op| match op {
297 RecordOp::Create {
298 collection,
299 rkey,
300 cid,
301 } => json!({
302 "action": "create",
303 "path": format!("{}/{}", collection, rkey),
304 "cid": cid.to_string()
305 }),
306 RecordOp::Update {
307 collection,
308 rkey,
309 cid,
310 prev,
311 } => {
312 let mut obj = json!({
313 "action": "update",
314 "path": format!("{}/{}", collection, rkey),
315 "cid": cid.to_string()
316 });
317 if let Some(prev_cid) = prev {
318 obj["prev"] = json!(prev_cid.to_string());
319 }
320 obj
321 }
322 RecordOp::Delete {
323 collection,
324 rkey,
325 prev,
326 } => {
327 let mut obj = json!({
328 "action": "delete",
329 "path": format!("{}/{}", collection, rkey),
330 "cid": null
331 });
332 if let Some(prev_cid) = prev {
333 obj["prev"] = json!(prev_cid.to_string());
334 }
335 obj
336 }
337 })
338 .collect::<Vec<_>>();
339 if is_account_active {
340 let event_type = "commit";
341 let prev_cid_str = current_root_cid.map(|c| c.to_string());
342 let prev_data_cid_str = prev_data_cid.map(|c| c.to_string());
343 let seq_row = sqlx::query!(
344 r#"
345 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids, prev_data_cid)
346 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
347 RETURNING seq
348 "#,
349 did.as_str(),
350 event_type,
351 new_root_cid.to_string(),
352 prev_cid_str,
353 json!(ops_json),
354 blobs,
355 blocks_cids,
356 prev_data_cid_str,
357 )
358 .fetch_one(&mut *tx)
359 .await
360 .map_err(|e| format!("DB Error (repo_seq): {}", e))?;
361 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
362 .execute(&mut *tx)
363 .await
364 .map_err(|e| format!("DB Error (notify): {}", e))?;
365 }
366 tx.commit()
367 .await
368 .map_err(|e| format!("Failed to commit transaction: {}", e))?;
369 if is_account_active {
370 let _ = sequence_sync_event(state, did, &new_root_cid.to_string(), Some(&rev_str)).await;
371 }
372 Ok(CommitResult {
373 commit_cid: new_root_cid,
374 rev: rev_str,
375 })
376}
377pub async fn create_record_internal(
378 state: &AppState,
379 did: &Did,
380 collection: &Nsid,
381 rkey: &Rkey,
382 record: &serde_json::Value,
383) -> Result<(String, Cid), String> {
384 use crate::repo::tracking::TrackingBlockStore;
385 use jacquard_repo::mst::Mst;
386 use std::sync::Arc;
387 let user_id: Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did.as_str())
388 .fetch_optional(&state.db)
389 .await
390 .map_err(|e| format!("DB error: {}", e))?
391 .ok_or_else(|| "User not found".to_string())?;
392 let root_cid_str: String = sqlx::query_scalar!(
393 "SELECT repo_root_cid FROM repos WHERE user_id = $1",
394 user_id
395 )
396 .fetch_optional(&state.db)
397 .await
398 .map_err(|e| format!("DB error: {}", e))?
399 .ok_or_else(|| "Repo not found".to_string())?;
400 let current_root_cid =
401 Cid::from_str(&root_cid_str).map_err(|_| "Invalid repo root CID".to_string())?;
402 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
403 let commit_bytes = tracking_store
404 .get(¤t_root_cid)
405 .await
406 .map_err(|e| format!("Failed to fetch commit: {:?}", e))?
407 .ok_or_else(|| "Commit block not found".to_string())?;
408 let commit = jacquard_repo::commit::Commit::from_cbor(&commit_bytes)
409 .map_err(|e| format!("Failed to parse commit: {:?}", e))?;
410 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
411 let record_ipld = crate::util::json_to_ipld(record);
412 let mut record_bytes = Vec::new();
413 serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld)
414 .map_err(|e| format!("Failed to serialize record: {:?}", e))?;
415 let record_cid = tracking_store
416 .put(&record_bytes)
417 .await
418 .map_err(|e| format!("Failed to save record block: {:?}", e))?;
419 let key = format!("{}/{}", collection, rkey);
420 let new_mst = mst
421 .add(&key, record_cid)
422 .await
423 .map_err(|e| format!("Failed to add to MST: {:?}", e))?;
424 let new_mst_root = new_mst
425 .persist()
426 .await
427 .map_err(|e| format!("Failed to persist MST: {:?}", e))?;
428 let op = RecordOp::Create {
429 collection: collection.clone(),
430 rkey: rkey.clone(),
431 cid: record_cid,
432 };
433 let mut new_mst_blocks = std::collections::BTreeMap::new();
434 let mut old_mst_blocks = std::collections::BTreeMap::new();
435 new_mst
436 .blocks_for_path(&key, &mut new_mst_blocks)
437 .await
438 .map_err(|e| format!("Failed to get new MST blocks for path: {:?}", e))?;
439 mst.blocks_for_path(&key, &mut old_mst_blocks)
440 .await
441 .map_err(|e| format!("Failed to get old MST blocks for path: {:?}", e))?;
442 let obsolete_cids: Vec<Cid> = std::iter::once(current_root_cid)
443 .chain(
444 old_mst_blocks
445 .keys()
446 .filter(|cid| !new_mst_blocks.contains_key(*cid))
447 .copied(),
448 )
449 .collect();
450 let mut relevant_blocks = new_mst_blocks;
451 relevant_blocks.extend(old_mst_blocks);
452 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
453 let written_cids: Vec<Cid> = tracking_store
454 .get_all_relevant_cids()
455 .into_iter()
456 .chain(relevant_blocks.keys().copied())
457 .collect::<std::collections::HashSet<_>>()
458 .into_iter()
459 .collect();
460 let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect();
461 let blob_cids = extract_blob_cids(record);
462 let result = commit_and_log(
463 state,
464 CommitParams {
465 did,
466 user_id,
467 current_root_cid: Some(current_root_cid),
468 prev_data_cid: Some(commit.data),
469 new_mst_root,
470 ops: vec![op],
471 blocks_cids: &written_cids_str,
472 blobs: &blob_cids,
473 obsolete_cids,
474 },
475 )
476 .await?;
477 let uri = format!("at://{}/{}/{}", did, collection, rkey);
478 Ok((uri, result.commit_cid))
479}
480
481pub async fn sequence_identity_event(
482 state: &AppState,
483 did: &Did,
484 handle: Option<&Handle>,
485) -> Result<i64, String> {
486 let mut tx = state
487 .db
488 .begin()
489 .await
490 .map_err(|e| format!("Failed to begin transaction: {}", e))?;
491 let seq_row = sqlx::query!(
492 r#"
493 INSERT INTO repo_seq (did, event_type, handle)
494 VALUES ($1, 'identity', $2)
495 RETURNING seq
496 "#,
497 did.as_str(),
498 handle.map(|h| h.as_str()),
499 )
500 .fetch_one(&mut *tx)
501 .await
502 .map_err(|e| format!("DB Error (repo_seq identity): {}", e))?;
503 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
504 .execute(&mut *tx)
505 .await
506 .map_err(|e| format!("DB Error (notify): {}", e))?;
507 tx.commit()
508 .await
509 .map_err(|e| format!("Failed to commit transaction: {}", e))?;
510 Ok(seq_row.seq)
511}
512pub async fn sequence_account_event(
513 state: &AppState,
514 did: &Did,
515 active: bool,
516 status: Option<&str>,
517) -> Result<i64, String> {
518 let mut tx = state
519 .db
520 .begin()
521 .await
522 .map_err(|e| format!("Failed to begin transaction: {}", e))?;
523 let seq_row = sqlx::query!(
524 r#"
525 INSERT INTO repo_seq (did, event_type, active, status)
526 VALUES ($1, 'account', $2, $3)
527 RETURNING seq
528 "#,
529 did.as_str(),
530 active,
531 status,
532 )
533 .fetch_one(&mut *tx)
534 .await
535 .map_err(|e| format!("DB Error (repo_seq account): {}", e))?;
536 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
537 .execute(&mut *tx)
538 .await
539 .map_err(|e| format!("DB Error (notify): {}", e))?;
540 tx.commit()
541 .await
542 .map_err(|e| format!("Failed to commit transaction: {}", e))?;
543 Ok(seq_row.seq)
544}
545pub async fn sequence_sync_event(
546 state: &AppState,
547 did: &Did,
548 commit_cid: &str,
549 rev: Option<&str>,
550) -> Result<i64, String> {
551 let mut tx = state
552 .db
553 .begin()
554 .await
555 .map_err(|e| format!("Failed to begin transaction: {}", e))?;
556 let seq_row = sqlx::query!(
557 r#"
558 INSERT INTO repo_seq (did, event_type, commit_cid, rev)
559 VALUES ($1, 'sync', $2, $3)
560 RETURNING seq
561 "#,
562 did.as_str(),
563 commit_cid,
564 rev,
565 )
566 .fetch_one(&mut *tx)
567 .await
568 .map_err(|e| format!("DB Error (repo_seq sync): {}", e))?;
569 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
570 .execute(&mut *tx)
571 .await
572 .map_err(|e| format!("DB Error (notify): {}", e))?;
573 tx.commit()
574 .await
575 .map_err(|e| format!("Failed to commit transaction: {}", e))?;
576 Ok(seq_row.seq)
577}
578
579pub async fn sequence_genesis_commit(
580 state: &AppState,
581 did: &Did,
582 commit_cid: &Cid,
583 mst_root_cid: &Cid,
584 rev: &str,
585) -> Result<i64, String> {
586 let ops = serde_json::json!([]);
587 let blobs: Vec<String> = vec![];
588 let blocks_cids: Vec<String> = vec![mst_root_cid.to_string(), commit_cid.to_string()];
589 let prev_cid: Option<&str> = None;
590 let commit_cid_str = commit_cid.to_string();
591 let mut tx = state
592 .db
593 .begin()
594 .await
595 .map_err(|e| format!("Failed to begin transaction: {}", e))?;
596 let seq_row = sqlx::query!(
597 r#"
598 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids, rev)
599 VALUES ($1, 'commit', $2, $3::TEXT, $4, $5, $6, $7)
600 RETURNING seq
601 "#,
602 did.as_str(),
603 commit_cid_str,
604 prev_cid,
605 ops,
606 &blobs,
607 &blocks_cids,
608 rev
609 )
610 .fetch_one(&mut *tx)
611 .await
612 .map_err(|e| format!("DB Error (repo_seq genesis commit): {}", e))?;
613 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
614 .execute(&mut *tx)
615 .await
616 .map_err(|e| format!("DB Error (notify): {}", e))?;
617 tx.commit()
618 .await
619 .map_err(|e| format!("Failed to commit transaction: {}", e))?;
620 Ok(seq_row.seq)
621}