this repo has no description
1use crate::state::AppState;
2use cid::Cid;
3use jacquard::types::{did::Did, integer::LimitedU32, string::Tid};
4use jacquard_repo::commit::Commit;
5use jacquard_repo::storage::BlockStore;
6use k256::ecdsa::SigningKey;
7use serde_json::json;
8use uuid::Uuid;
9
10pub enum RecordOp {
11 Create { collection: String, rkey: String, cid: Cid },
12 Update { collection: String, rkey: String, cid: Cid },
13 Delete { collection: String, rkey: String },
14}
15
16pub struct CommitResult {
17 pub commit_cid: Cid,
18 pub rev: String,
19}
20
21pub async fn commit_and_log(
22 state: &AppState,
23 did: &str,
24 user_id: Uuid,
25 current_root_cid: Option<Cid>,
26 new_mst_root: Cid,
27 ops: Vec<RecordOp>,
28 blocks_cids: &Vec<String>,
29) -> Result<CommitResult, String> {
30 let key_row = sqlx::query!(
31 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
32 user_id
33 )
34 .fetch_one(&state.db)
35 .await
36 .map_err(|e| format!("Failed to fetch signing key: {}", e))?;
37
38 let key_bytes = crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version)
39 .map_err(|e| format!("Failed to decrypt signing key: {}", e))?;
40
41 let signing_key = SigningKey::from_slice(&key_bytes)
42 .map_err(|e| format!("Invalid signing key: {}", e))?;
43
44 let did_obj = Did::new(did).map_err(|e| format!("Invalid DID: {}", e))?;
45 let rev = Tid::now(LimitedU32::MIN);
46
47 let unsigned_commit = Commit::new_unsigned(did_obj, new_mst_root, rev.clone(), current_root_cid);
48
49 let signed_commit = unsigned_commit
50 .sign(&signing_key)
51 .map_err(|e| format!("Failed to sign commit: {:?}", e))?;
52
53 let new_commit_bytes = signed_commit.to_cbor().map_err(|e| format!("Failed to serialize commit: {:?}", e))?;
54
55 let new_root_cid = state.block_store.put(&new_commit_bytes).await
56 .map_err(|e| format!("Failed to save commit block: {:?}", e))?;
57
58 let mut tx = state.db.begin().await
59 .map_err(|e| format!("Failed to begin transaction: {}", e))?;
60
61 sqlx::query!("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2", new_root_cid.to_string(), user_id)
62 .execute(&mut *tx)
63 .await
64 .map_err(|e| format!("DB Error (repos): {}", e))?;
65
66 for op in &ops {
67 match op {
68 RecordOp::Create { collection, rkey, cid } | RecordOp::Update { collection, rkey, cid } => {
69 sqlx::query!(
70 "INSERT INTO records (repo_id, collection, rkey, record_cid) VALUES ($1, $2, $3, $4)
71 ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4, created_at = NOW()",
72 user_id,
73 collection,
74 rkey,
75 cid.to_string()
76 )
77 .execute(&mut *tx)
78 .await
79 .map_err(|e| format!("DB Error (records): {}", e))?;
80 }
81 RecordOp::Delete { collection, rkey } => {
82 sqlx::query!(
83 "DELETE FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3",
84 user_id,
85 collection,
86 rkey
87 )
88 .execute(&mut *tx)
89 .await
90 .map_err(|e| format!("DB Error (records): {}", e))?;
91 }
92 }
93 }
94
95 let ops_json = ops.iter().map(|op| {
96 match op {
97 RecordOp::Create { collection, rkey, cid } => json!({
98 "action": "create",
99 "path": format!("{}/{}", collection, rkey),
100 "cid": cid.to_string()
101 }),
102 RecordOp::Update { collection, rkey, cid } => json!({
103 "action": "update",
104 "path": format!("{}/{}", collection, rkey),
105 "cid": cid.to_string()
106 }),
107 RecordOp::Delete { collection, rkey } => json!({
108 "action": "delete",
109 "path": format!("{}/{}", collection, rkey),
110 "cid": null
111 }),
112 }
113 }).collect::<Vec<_>>();
114
115 let event_type = "commit";
116 let prev_cid_str = current_root_cid.map(|c| c.to_string());
117
118 let seq_row = sqlx::query!(
119 r#"
120 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids)
121 VALUES ($1, $2, $3, $4, $5, $6, $7)
122 RETURNING seq
123 "#,
124 did,
125 event_type,
126 new_root_cid.to_string(),
127 prev_cid_str,
128 json!(ops_json),
129 &[] as &[String],
130 blocks_cids,
131 )
132 .fetch_one(&mut *tx)
133 .await
134 .map_err(|e| format!("DB Error (repo_seq): {}", e))?;
135
136 sqlx::query(
137 &format!("NOTIFY repo_updates, '{}'", seq_row.seq)
138 )
139 .execute(&mut *tx)
140 .await
141 .map_err(|e| format!("DB Error (notify): {}", e))?;
142
143 tx.commit().await
144 .map_err(|e| format!("Failed to commit transaction: {}", e))?;
145
146 Ok(CommitResult {
147 commit_cid: new_root_cid,
148 rev: rev.to_string(),
149 })
150}