this repo has no description
1use crate::state::AppState;
2use cid::Cid;
3use jacquard::types::{did::Did, integer::LimitedU32, string::Tid};
4use jacquard_repo::commit::Commit;
5use jacquard_repo::storage::BlockStore;
6use k256::ecdsa::SigningKey;
7use serde_json::json;
8use uuid::Uuid;
9
10pub enum RecordOp {
11 Create { collection: String, rkey: String, cid: Cid },
12 Update { collection: String, rkey: String, cid: Cid },
13 Delete { collection: String, rkey: String },
14}
15
16pub struct CommitResult {
17 pub commit_cid: Cid,
18 pub rev: String,
19}
20
21pub async fn commit_and_log(
22 state: &AppState,
23 did: &str,
24 user_id: Uuid,
25 current_root_cid: Option<Cid>,
26 new_mst_root: Cid,
27 ops: Vec<RecordOp>,
28 blocks_cids: &[String],
29) -> Result<CommitResult, String> {
30 let key_row = sqlx::query!(
31 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
32 user_id
33 )
34 .fetch_one(&state.db)
35 .await
36 .map_err(|e| format!("Failed to fetch signing key: {}", e))?;
37
38 let key_bytes = crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version)
39 .map_err(|e| format!("Failed to decrypt signing key: {}", e))?;
40
41 let signing_key = SigningKey::from_slice(&key_bytes)
42 .map_err(|e| format!("Invalid signing key: {}", e))?;
43
44 let did_obj = Did::new(did).map_err(|e| format!("Invalid DID: {}", e))?;
45 let rev = Tid::now(LimitedU32::MIN);
46
47 let unsigned_commit = Commit::new_unsigned(did_obj, new_mst_root, rev.clone(), current_root_cid);
48
49 let signed_commit = unsigned_commit
50 .sign(&signing_key)
51 .map_err(|e| format!("Failed to sign commit: {:?}", e))?;
52
53 let new_commit_bytes = signed_commit.to_cbor().map_err(|e| format!("Failed to serialize commit: {:?}", e))?;
54
55 let new_root_cid = state.block_store.put(&new_commit_bytes).await
56 .map_err(|e| format!("Failed to save commit block: {:?}", e))?;
57
58 let mut tx = state.db.begin().await
59 .map_err(|e| format!("Failed to begin transaction: {}", e))?;
60
61 let lock_result = sqlx::query!(
62 "SELECT repo_root_cid FROM repos WHERE user_id = $1 FOR UPDATE NOWAIT",
63 user_id
64 )
65 .fetch_optional(&mut *tx)
66 .await;
67
68 match lock_result {
69 Err(e) => {
70 if let Some(db_err) = e.as_database_error() {
71 if db_err.code().as_deref() == Some("55P03") {
72 return Err("ConcurrentModification: Another request is modifying this repo".to_string());
73 }
74 }
75 return Err(format!("Failed to acquire repo lock: {}", e));
76 }
77 Ok(Some(row)) => {
78 if let Some(expected_root) = ¤t_root_cid {
79 if row.repo_root_cid != expected_root.to_string() {
80 return Err("ConcurrentModification: Repo has been modified since last read".to_string());
81 }
82 }
83 }
84 Ok(None) => {
85 return Err("Repo not found".to_string());
86 }
87 }
88
89 sqlx::query!("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2", new_root_cid.to_string(), user_id)
90 .execute(&mut *tx)
91 .await
92 .map_err(|e| format!("DB Error (repos): {}", e))?;
93
94 let rev_str = rev.to_string();
95 for op in &ops {
96 match op {
97 RecordOp::Create { collection, rkey, cid } | RecordOp::Update { collection, rkey, cid } => {
98 sqlx::query!(
99 "INSERT INTO records (repo_id, collection, rkey, record_cid, repo_rev) VALUES ($1, $2, $3, $4, $5)
100 ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4, repo_rev = $5, created_at = NOW()",
101 user_id,
102 collection,
103 rkey,
104 cid.to_string(),
105 rev_str
106 )
107 .execute(&mut *tx)
108 .await
109 .map_err(|e| format!("DB Error (records): {}", e))?;
110 }
111 RecordOp::Delete { collection, rkey } => {
112 sqlx::query!(
113 "DELETE FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3",
114 user_id,
115 collection,
116 rkey
117 )
118 .execute(&mut *tx)
119 .await
120 .map_err(|e| format!("DB Error (records): {}", e))?;
121 }
122 }
123 }
124
125 let ops_json = ops.iter().map(|op| {
126 match op {
127 RecordOp::Create { collection, rkey, cid } => json!({
128 "action": "create",
129 "path": format!("{}/{}", collection, rkey),
130 "cid": cid.to_string()
131 }),
132 RecordOp::Update { collection, rkey, cid } => json!({
133 "action": "update",
134 "path": format!("{}/{}", collection, rkey),
135 "cid": cid.to_string()
136 }),
137 RecordOp::Delete { collection, rkey } => json!({
138 "action": "delete",
139 "path": format!("{}/{}", collection, rkey),
140 "cid": null
141 }),
142 }
143 }).collect::<Vec<_>>();
144
145 let event_type = "commit";
146 let prev_cid_str = current_root_cid.map(|c| c.to_string());
147
148 let seq_row = sqlx::query!(
149 r#"
150 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids)
151 VALUES ($1, $2, $3, $4, $5, $6, $7)
152 RETURNING seq
153 "#,
154 did,
155 event_type,
156 new_root_cid.to_string(),
157 prev_cid_str,
158 json!(ops_json),
159 &[] as &[String],
160 blocks_cids,
161 )
162 .fetch_one(&mut *tx)
163 .await
164 .map_err(|e| format!("DB Error (repo_seq): {}", e))?;
165
166 sqlx::query(
167 &format!("NOTIFY repo_updates, '{}'", seq_row.seq)
168 )
169 .execute(&mut *tx)
170 .await
171 .map_err(|e| format!("DB Error (notify): {}", e))?;
172
173 tx.commit().await
174 .map_err(|e| format!("Failed to commit transaction: {}", e))?;
175
176 Ok(CommitResult {
177 commit_cid: new_root_cid,
178 rev: rev.to_string(),
179 })
180}