this repo has no description
1use crate::state::AppState;
2use crate::types::{Did, Handle, Nsid, Rkey};
3use bytes::Bytes;
4use cid::Cid;
5use jacquard::types::{integer::LimitedU32, string::Tid};
6use jacquard_repo::commit::Commit;
7use jacquard_repo::storage::BlockStore;
8use k256::ecdsa::SigningKey;
9use serde_json::{Value, json};
10use std::str::FromStr;
11use uuid::Uuid;
12
13pub fn extract_blob_cids(record: &Value) -> Vec<String> {
14 let mut blobs = Vec::new();
15 extract_blob_cids_recursive(record, &mut blobs);
16 blobs
17}
18
19fn extract_blob_cids_recursive(value: &Value, blobs: &mut Vec<String>) {
20 match value {
21 Value::Object(map) => {
22 if map.get("$type").and_then(|v| v.as_str()) == Some("blob")
23 && let Some(ref_obj) = map.get("ref")
24 && let Some(link) = ref_obj.get("$link").and_then(|v| v.as_str())
25 {
26 blobs.push(link.to_string());
27 }
28 map.values()
29 .for_each(|v| extract_blob_cids_recursive(v, blobs));
30 }
31 Value::Array(arr) => {
32 arr.iter()
33 .for_each(|v| extract_blob_cids_recursive(v, blobs));
34 }
35 _ => {}
36 }
37}
38
39pub fn create_signed_commit(
40 did: &Did,
41 data: Cid,
42 rev: &str,
43 prev: Option<Cid>,
44 signing_key: &SigningKey,
45) -> Result<(Vec<u8>, Bytes), String> {
46 let did = jacquard::types::string::Did::new(did.as_str())
47 .map_err(|e| format!("Invalid DID: {:?}", e))?;
48 let rev =
49 jacquard::types::string::Tid::from_str(rev).map_err(|e| format!("Invalid TID: {:?}", e))?;
50 let unsigned = Commit::new_unsigned(did, data, rev, prev);
51 let signed = unsigned
52 .sign(signing_key)
53 .map_err(|e| format!("Failed to sign commit: {:?}", e))?;
54 let sig_bytes = signed.sig().clone();
55 let signed_bytes = signed
56 .to_cbor()
57 .map_err(|e| format!("Failed to serialize signed commit: {:?}", e))?;
58 Ok((signed_bytes, sig_bytes))
59}
60
61pub enum RecordOp {
62 Create {
63 collection: Nsid,
64 rkey: Rkey,
65 cid: Cid,
66 },
67 Update {
68 collection: Nsid,
69 rkey: Rkey,
70 cid: Cid,
71 prev: Option<Cid>,
72 },
73 Delete {
74 collection: Nsid,
75 rkey: Rkey,
76 prev: Option<Cid>,
77 },
78}
79
80pub struct CommitResult {
81 pub commit_cid: Cid,
82 pub rev: String,
83}
84
85pub struct CommitParams<'a> {
86 pub did: &'a Did,
87 pub user_id: Uuid,
88 pub current_root_cid: Option<Cid>,
89 pub prev_data_cid: Option<Cid>,
90 pub new_mst_root: Cid,
91 pub ops: Vec<RecordOp>,
92 pub blocks_cids: &'a [String],
93 pub blobs: &'a [String],
94 pub obsolete_cids: Vec<Cid>,
95}
96
97pub async fn commit_and_log(
98 state: &AppState,
99 params: CommitParams<'_>,
100) -> Result<CommitResult, String> {
101 let CommitParams {
102 did,
103 user_id,
104 current_root_cid,
105 prev_data_cid,
106 new_mst_root,
107 ops,
108 blocks_cids,
109 blobs,
110 obsolete_cids,
111 } = params;
112 let key_row = sqlx::query!(
113 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
114 user_id
115 )
116 .fetch_one(&state.db)
117 .await
118 .map_err(|e| format!("Failed to fetch signing key: {}", e))?;
119 let key_bytes = crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version)
120 .map_err(|e| format!("Failed to decrypt signing key: {}", e))?;
121 let signing_key =
122 SigningKey::from_slice(&key_bytes).map_err(|e| format!("Invalid signing key: {}", e))?;
123 let rev = Tid::now(LimitedU32::MIN);
124 let rev_str = rev.to_string();
125 let (new_commit_bytes, _sig) =
126 create_signed_commit(did, new_mst_root, &rev_str, current_root_cid, &signing_key)?;
127 let new_root_cid = state
128 .block_store
129 .put(&new_commit_bytes)
130 .await
131 .map_err(|e| format!("Failed to save commit block: {:?}", e))?;
132 let mut tx = state
133 .db
134 .begin()
135 .await
136 .map_err(|e| format!("Failed to begin transaction: {}", e))?;
137 let lock_result = sqlx::query!(
138 "SELECT repo_root_cid FROM repos WHERE user_id = $1 FOR UPDATE NOWAIT",
139 user_id
140 )
141 .fetch_optional(&mut *tx)
142 .await;
143 match lock_result {
144 Err(e) => {
145 if let Some(db_err) = e.as_database_error()
146 && db_err.code().as_deref() == Some("55P03")
147 {
148 return Err(
149 "ConcurrentModification: Another request is modifying this repo".to_string(),
150 );
151 }
152 return Err(format!("Failed to acquire repo lock: {}", e));
153 }
154 Ok(Some(row)) => {
155 if let Some(expected_root) = ¤t_root_cid
156 && row.repo_root_cid != expected_root.to_string()
157 {
158 return Err(
159 "ConcurrentModification: Repo has been modified since last read".to_string(),
160 );
161 }
162 }
163 Ok(None) => {
164 return Err("Repo not found".to_string());
165 }
166 }
167 let is_account_active = sqlx::query_scalar!(
168 "SELECT deactivated_at IS NULL FROM users WHERE id = $1",
169 user_id
170 )
171 .fetch_optional(&mut *tx)
172 .await
173 .map_err(|e| format!("Failed to check account status: {}", e))?
174 .flatten()
175 .unwrap_or(false);
176 sqlx::query!(
177 "UPDATE repos SET repo_root_cid = $1, repo_rev = $2 WHERE user_id = $3",
178 new_root_cid.to_string(),
179 &rev_str,
180 user_id
181 )
182 .execute(&mut *tx)
183 .await
184 .map_err(|e| format!("DB Error (repos): {}", e))?;
185 let mut all_block_cids: Vec<Vec<u8>> = blocks_cids
186 .iter()
187 .filter_map(|s| Cid::from_str(s).ok())
188 .map(|c| c.to_bytes())
189 .collect();
190 all_block_cids.push(new_root_cid.to_bytes());
191 if !all_block_cids.is_empty() {
192 sqlx::query!(
193 r#"
194 INSERT INTO user_blocks (user_id, block_cid)
195 SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)
196 ON CONFLICT (user_id, block_cid) DO NOTHING
197 "#,
198 user_id,
199 &all_block_cids
200 )
201 .execute(&mut *tx)
202 .await
203 .map_err(|e| format!("DB Error (user_blocks): {}", e))?;
204 }
205 if !obsolete_cids.is_empty() {
206 let obsolete_bytes: Vec<Vec<u8>> = obsolete_cids.iter().map(|c| c.to_bytes()).collect();
207 sqlx::query!(
208 r#"
209 DELETE FROM user_blocks
210 WHERE user_id = $1
211 AND block_cid = ANY($2)
212 "#,
213 user_id,
214 &obsolete_bytes as &[Vec<u8>]
215 )
216 .execute(&mut *tx)
217 .await
218 .map_err(|e| format!("DB Error (user_blocks delete obsolete): {}", e))?;
219 }
220 let (upserts, deletes): (Vec<_>, Vec<_>) = ops
221 .iter()
222 .partition(|op| matches!(op, RecordOp::Create { .. } | RecordOp::Update { .. }));
223 let (upsert_collections, upsert_rkeys, upsert_cids): (Vec<String>, Vec<String>, Vec<String>) =
224 upserts
225 .into_iter()
226 .filter_map(|op| match op {
227 RecordOp::Create {
228 collection,
229 rkey,
230 cid,
231 }
232 | RecordOp::Update {
233 collection,
234 rkey,
235 cid,
236 ..
237 } => Some((collection.to_string(), rkey.to_string(), cid.to_string())),
238 _ => None,
239 })
240 .fold(
241 (Vec::new(), Vec::new(), Vec::new()),
242 |(mut cols, mut rkeys, mut cids), (c, r, ci)| {
243 cols.push(c);
244 rkeys.push(r);
245 cids.push(ci);
246 (cols, rkeys, cids)
247 },
248 );
249 let (delete_collections, delete_rkeys): (Vec<String>, Vec<String>) = deletes
250 .into_iter()
251 .filter_map(|op| match op {
252 RecordOp::Delete {
253 collection, rkey, ..
254 } => Some((collection.to_string(), rkey.to_string())),
255 _ => None,
256 })
257 .unzip();
258 if !upsert_collections.is_empty() {
259 sqlx::query!(
260 r#"
261 INSERT INTO records (repo_id, collection, rkey, record_cid, repo_rev)
262 SELECT $1, collection, rkey, record_cid, $5
263 FROM UNNEST($2::text[], $3::text[], $4::text[]) AS t(collection, rkey, record_cid)
264 ON CONFLICT (repo_id, collection, rkey) DO UPDATE
265 SET record_cid = EXCLUDED.record_cid, repo_rev = EXCLUDED.repo_rev, created_at = NOW()
266 "#,
267 user_id,
268 &upsert_collections,
269 &upsert_rkeys,
270 &upsert_cids,
271 rev_str
272 )
273 .execute(&mut *tx)
274 .await
275 .map_err(|e| format!("DB Error (records batch upsert): {}", e))?;
276 }
277 if !delete_collections.is_empty() {
278 sqlx::query!(
279 r#"
280 DELETE FROM records
281 WHERE repo_id = $1
282 AND (collection, rkey) IN (SELECT * FROM UNNEST($2::text[], $3::text[]))
283 "#,
284 user_id,
285 &delete_collections,
286 &delete_rkeys
287 )
288 .execute(&mut *tx)
289 .await
290 .map_err(|e| format!("DB Error (records batch delete): {}", e))?;
291 }
292 let ops_json = ops
293 .iter()
294 .map(|op| match op {
295 RecordOp::Create {
296 collection,
297 rkey,
298 cid,
299 } => json!({
300 "action": "create",
301 "path": format!("{}/{}", collection, rkey),
302 "cid": cid.to_string()
303 }),
304 RecordOp::Update {
305 collection,
306 rkey,
307 cid,
308 prev,
309 } => {
310 let mut obj = json!({
311 "action": "update",
312 "path": format!("{}/{}", collection, rkey),
313 "cid": cid.to_string()
314 });
315 if let Some(prev_cid) = prev {
316 obj["prev"] = json!(prev_cid.to_string());
317 }
318 obj
319 }
320 RecordOp::Delete {
321 collection,
322 rkey,
323 prev,
324 } => {
325 let mut obj = json!({
326 "action": "delete",
327 "path": format!("{}/{}", collection, rkey),
328 "cid": null
329 });
330 if let Some(prev_cid) = prev {
331 obj["prev"] = json!(prev_cid.to_string());
332 }
333 obj
334 }
335 })
336 .collect::<Vec<_>>();
337 if is_account_active {
338 let event_type = "commit";
339 let prev_cid_str = current_root_cid.map(|c| c.to_string());
340 let prev_data_cid_str = prev_data_cid.map(|c| c.to_string());
341 let seq_row = sqlx::query!(
342 r#"
343 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids, prev_data_cid)
344 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
345 RETURNING seq
346 "#,
347 did.as_str(),
348 event_type,
349 new_root_cid.to_string(),
350 prev_cid_str,
351 json!(ops_json),
352 blobs,
353 blocks_cids,
354 prev_data_cid_str,
355 )
356 .fetch_one(&mut *tx)
357 .await
358 .map_err(|e| format!("DB Error (repo_seq): {}", e))?;
359 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
360 .execute(&mut *tx)
361 .await
362 .map_err(|e| format!("DB Error (notify): {}", e))?;
363 }
364 tx.commit()
365 .await
366 .map_err(|e| format!("Failed to commit transaction: {}", e))?;
367 if is_account_active {
368 let _ = sequence_sync_event(state, did, &new_root_cid.to_string(), Some(&rev_str)).await;
369 }
370 Ok(CommitResult {
371 commit_cid: new_root_cid,
372 rev: rev_str,
373 })
374}
375pub async fn create_record_internal(
376 state: &AppState,
377 did: &Did,
378 collection: &Nsid,
379 rkey: &Rkey,
380 record: &serde_json::Value,
381) -> Result<(String, Cid), String> {
382 use crate::repo::tracking::TrackingBlockStore;
383 use jacquard_repo::mst::Mst;
384 use std::sync::Arc;
385 let user_id: Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did.as_str())
386 .fetch_optional(&state.db)
387 .await
388 .map_err(|e| format!("DB error: {}", e))?
389 .ok_or_else(|| "User not found".to_string())?;
390 let root_cid_str: String = sqlx::query_scalar!(
391 "SELECT repo_root_cid FROM repos WHERE user_id = $1",
392 user_id
393 )
394 .fetch_optional(&state.db)
395 .await
396 .map_err(|e| format!("DB error: {}", e))?
397 .ok_or_else(|| "Repo not found".to_string())?;
398 let current_root_cid =
399 Cid::from_str(&root_cid_str).map_err(|_| "Invalid repo root CID".to_string())?;
400 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
401 let commit_bytes = tracking_store
402 .get(¤t_root_cid)
403 .await
404 .map_err(|e| format!("Failed to fetch commit: {:?}", e))?
405 .ok_or_else(|| "Commit block not found".to_string())?;
406 let commit = jacquard_repo::commit::Commit::from_cbor(&commit_bytes)
407 .map_err(|e| format!("Failed to parse commit: {:?}", e))?;
408 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
409 let record_ipld = crate::util::json_to_ipld(record);
410 let mut record_bytes = Vec::new();
411 serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld)
412 .map_err(|e| format!("Failed to serialize record: {:?}", e))?;
413 let record_cid = tracking_store
414 .put(&record_bytes)
415 .await
416 .map_err(|e| format!("Failed to save record block: {:?}", e))?;
417 let key = format!("{}/{}", collection, rkey);
418 let new_mst = mst
419 .add(&key, record_cid)
420 .await
421 .map_err(|e| format!("Failed to add to MST: {:?}", e))?;
422 let new_mst_root = new_mst
423 .persist()
424 .await
425 .map_err(|e| format!("Failed to persist MST: {:?}", e))?;
426 let op = RecordOp::Create {
427 collection: collection.clone(),
428 rkey: rkey.clone(),
429 cid: record_cid,
430 };
431 let mut new_mst_blocks = std::collections::BTreeMap::new();
432 let mut old_mst_blocks = std::collections::BTreeMap::new();
433 new_mst
434 .blocks_for_path(&key, &mut new_mst_blocks)
435 .await
436 .map_err(|e| format!("Failed to get new MST blocks for path: {:?}", e))?;
437 mst.blocks_for_path(&key, &mut old_mst_blocks)
438 .await
439 .map_err(|e| format!("Failed to get old MST blocks for path: {:?}", e))?;
440 let obsolete_cids: Vec<Cid> = std::iter::once(current_root_cid)
441 .chain(
442 old_mst_blocks
443 .keys()
444 .filter(|cid| !new_mst_blocks.contains_key(*cid))
445 .copied(),
446 )
447 .collect();
448 let mut relevant_blocks = new_mst_blocks;
449 relevant_blocks.extend(old_mst_blocks);
450 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
451 let written_cids: Vec<Cid> = tracking_store
452 .get_all_relevant_cids()
453 .into_iter()
454 .chain(relevant_blocks.keys().copied())
455 .collect::<std::collections::HashSet<_>>()
456 .into_iter()
457 .collect();
458 let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect();
459 let blob_cids = extract_blob_cids(record);
460 let result = commit_and_log(
461 state,
462 CommitParams {
463 did,
464 user_id,
465 current_root_cid: Some(current_root_cid),
466 prev_data_cid: Some(commit.data),
467 new_mst_root,
468 ops: vec![op],
469 blocks_cids: &written_cids_str,
470 blobs: &blob_cids,
471 obsolete_cids,
472 },
473 )
474 .await?;
475 let uri = format!("at://{}/{}/{}", did, collection, rkey);
476 Ok((uri, result.commit_cid))
477}
478
479pub async fn sequence_identity_event(
480 state: &AppState,
481 did: &Did,
482 handle: Option<&Handle>,
483) -> Result<i64, String> {
484 let mut tx = state
485 .db
486 .begin()
487 .await
488 .map_err(|e| format!("Failed to begin transaction: {}", e))?;
489 let seq_row = sqlx::query!(
490 r#"
491 INSERT INTO repo_seq (did, event_type, handle)
492 VALUES ($1, 'identity', $2)
493 RETURNING seq
494 "#,
495 did.as_str(),
496 handle.map(|h| h.as_str()),
497 )
498 .fetch_one(&mut *tx)
499 .await
500 .map_err(|e| format!("DB Error (repo_seq identity): {}", e))?;
501 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
502 .execute(&mut *tx)
503 .await
504 .map_err(|e| format!("DB Error (notify): {}", e))?;
505 tx.commit()
506 .await
507 .map_err(|e| format!("Failed to commit transaction: {}", e))?;
508 Ok(seq_row.seq)
509}
510pub async fn sequence_account_event(
511 state: &AppState,
512 did: &Did,
513 active: bool,
514 status: Option<&str>,
515) -> Result<i64, String> {
516 let mut tx = state
517 .db
518 .begin()
519 .await
520 .map_err(|e| format!("Failed to begin transaction: {}", e))?;
521 let seq_row = sqlx::query!(
522 r#"
523 INSERT INTO repo_seq (did, event_type, active, status)
524 VALUES ($1, 'account', $2, $3)
525 RETURNING seq
526 "#,
527 did.as_str(),
528 active,
529 status,
530 )
531 .fetch_one(&mut *tx)
532 .await
533 .map_err(|e| format!("DB Error (repo_seq account): {}", e))?;
534 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
535 .execute(&mut *tx)
536 .await
537 .map_err(|e| format!("DB Error (notify): {}", e))?;
538 tx.commit()
539 .await
540 .map_err(|e| format!("Failed to commit transaction: {}", e))?;
541 Ok(seq_row.seq)
542}
543pub async fn sequence_sync_event(
544 state: &AppState,
545 did: &Did,
546 commit_cid: &str,
547 rev: Option<&str>,
548) -> Result<i64, String> {
549 let mut tx = state
550 .db
551 .begin()
552 .await
553 .map_err(|e| format!("Failed to begin transaction: {}", e))?;
554 let seq_row = sqlx::query!(
555 r#"
556 INSERT INTO repo_seq (did, event_type, commit_cid, rev)
557 VALUES ($1, 'sync', $2, $3)
558 RETURNING seq
559 "#,
560 did.as_str(),
561 commit_cid,
562 rev,
563 )
564 .fetch_one(&mut *tx)
565 .await
566 .map_err(|e| format!("DB Error (repo_seq sync): {}", e))?;
567 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
568 .execute(&mut *tx)
569 .await
570 .map_err(|e| format!("DB Error (notify): {}", e))?;
571 tx.commit()
572 .await
573 .map_err(|e| format!("Failed to commit transaction: {}", e))?;
574 Ok(seq_row.seq)
575}
576
577pub async fn sequence_genesis_commit(
578 state: &AppState,
579 did: &Did,
580 commit_cid: &Cid,
581 mst_root_cid: &Cid,
582 rev: &str,
583) -> Result<i64, String> {
584 let ops = serde_json::json!([]);
585 let blobs: Vec<String> = vec![];
586 let blocks_cids: Vec<String> = vec![mst_root_cid.to_string(), commit_cid.to_string()];
587 let prev_cid: Option<&str> = None;
588 let commit_cid_str = commit_cid.to_string();
589 let mut tx = state
590 .db
591 .begin()
592 .await
593 .map_err(|e| format!("Failed to begin transaction: {}", e))?;
594 let seq_row = sqlx::query!(
595 r#"
596 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids, rev)
597 VALUES ($1, 'commit', $2, $3::TEXT, $4, $5, $6, $7)
598 RETURNING seq
599 "#,
600 did.as_str(),
601 commit_cid_str,
602 prev_cid,
603 ops,
604 &blobs,
605 &blocks_cids,
606 rev
607 )
608 .fetch_one(&mut *tx)
609 .await
610 .map_err(|e| format!("DB Error (repo_seq genesis commit): {}", e))?;
611 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
612 .execute(&mut *tx)
613 .await
614 .map_err(|e| format!("DB Error (notify): {}", e))?;
615 tx.commit()
616 .await
617 .map_err(|e| format!("Failed to commit transaction: {}", e))?;
618 Ok(seq_row.seq)
619}