this repo has no description
1use crate::state::AppState;
2use cid::Cid;
3use jacquard::types::{did::Did, integer::LimitedU32, string::Tid};
4use jacquard_repo::commit::Commit;
5use jacquard_repo::storage::BlockStore;
6use serde_json::json;
7use uuid::Uuid;
8
9pub enum RecordOp {
10 Create { collection: String, rkey: String, cid: Cid },
11 Update { collection: String, rkey: String, cid: Cid },
12 Delete { collection: String, rkey: String },
13}
14
15pub struct CommitResult {
16 pub commit_cid: Cid,
17 pub rev: String,
18}
19
20pub async fn commit_and_log(
21 state: &AppState,
22 did: &str,
23 user_id: Uuid,
24 current_root_cid: Option<Cid>,
25 new_mst_root: Cid,
26 ops: Vec<RecordOp>,
27 blocks_cids: &Vec<String>,
28) -> Result<CommitResult, String> {
29 let did_obj = Did::new(did).map_err(|e| format!("Invalid DID: {}", e))?;
30 let rev = Tid::now(LimitedU32::MIN);
31
32 let new_commit = Commit::new_unsigned(did_obj, new_mst_root, rev.clone(), current_root_cid);
33
34 let new_commit_bytes = new_commit.to_cbor().map_err(|e| format!("Failed to serialize commit: {:?}", e))?;
35
36 let new_root_cid = state.block_store.put(&new_commit_bytes).await
37 .map_err(|e| format!("Failed to save commit block: {:?}", e))?;
38
39 sqlx::query!("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2", new_root_cid.to_string(), user_id)
40 .execute(&state.db)
41 .await
42 .map_err(|e| format!("DB Error (repos): {}", e))?;
43
44 for op in &ops {
45 match op {
46 RecordOp::Create { collection, rkey, cid } | RecordOp::Update { collection, rkey, cid } => {
47 sqlx::query!(
48 "INSERT INTO records (repo_id, collection, rkey, record_cid) VALUES ($1, $2, $3, $4)
49 ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4, created_at = NOW()",
50 user_id,
51 collection,
52 rkey,
53 cid.to_string()
54 )
55 .execute(&state.db)
56 .await
57 .map_err(|e| format!("DB Error (records): {}", e))?;
58 }
59 RecordOp::Delete { collection, rkey } => {
60 sqlx::query!(
61 "DELETE FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3",
62 user_id,
63 collection,
64 rkey
65 )
66 .execute(&state.db)
67 .await
68 .map_err(|e| format!("DB Error (records): {}", e))?;
69 }
70 }
71 }
72
73 let ops_json = ops.iter().map(|op| {
74 match op {
75 RecordOp::Create { collection, rkey, cid } => json!({
76 "action": "create",
77 "path": format!("{}/{}", collection, rkey),
78 "cid": cid.to_string()
79 }),
80 RecordOp::Update { collection, rkey, cid } => json!({
81 "action": "update",
82 "path": format!("{}/{}", collection, rkey),
83 "cid": cid.to_string()
84 }),
85 RecordOp::Delete { collection, rkey } => json!({
86 "action": "delete",
87 "path": format!("{}/{}", collection, rkey),
88 "cid": null
89 }),
90 }
91 }).collect::<Vec<_>>();
92
93 let event_type = "commit";
94 let prev_cid_str = current_root_cid.map(|c| c.to_string());
95
96 let seq_row = sqlx::query!(
97 r#"
98 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids)
99 VALUES ($1, $2, $3, $4, $5, $6, $7)
100 RETURNING seq
101 "#,
102 did,
103 event_type,
104 new_root_cid.to_string(),
105 prev_cid_str,
106 json!(ops_json),
107 &[] as &[String],
108 blocks_cids,
109 )
110 .fetch_one(&state.db)
111 .await
112 .map_err(|e| format!("DB Error (repo_seq): {}", e))?;
113
114 sqlx::query(
115 &format!("NOTIFY repo_updates, '{}'", seq_row.seq)
116 )
117 .execute(&state.db)
118 .await
119 .map_err(|e| format!("DB Error (notify): {}", e))?;
120
121 Ok(CommitResult {
122 commit_cid: new_root_cid,
123 rev: rev.to_string(),
124 })
125}