this repo has no description
1use crate::state::AppState;
2use cid::Cid;
3use jacquard::types::{did::Did, integer::LimitedU32, string::Tid};
4use jacquard_repo::commit::Commit;
5use jacquard_repo::storage::BlockStore;
6use k256::ecdsa::SigningKey;
7use serde_json::json;
8use uuid::Uuid;
9
10pub enum RecordOp {
11 Create { collection: String, rkey: String, cid: Cid },
12 Update { collection: String, rkey: String, cid: Cid },
13 Delete { collection: String, rkey: String },
14}
15
16pub struct CommitResult {
17 pub commit_cid: Cid,
18 pub rev: String,
19}
20
21pub async fn commit_and_log(
22 state: &AppState,
23 did: &str,
24 user_id: Uuid,
25 current_root_cid: Option<Cid>,
26 new_mst_root: Cid,
27 ops: Vec<RecordOp>,
28 blocks_cids: &[String],
29) -> Result<CommitResult, String> {
30 let key_row = sqlx::query!(
31 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
32 user_id
33 )
34 .fetch_one(&state.db)
35 .await
36 .map_err(|e| format!("Failed to fetch signing key: {}", e))?;
37
38 let key_bytes = crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version)
39 .map_err(|e| format!("Failed to decrypt signing key: {}", e))?;
40
41 let signing_key = SigningKey::from_slice(&key_bytes)
42 .map_err(|e| format!("Invalid signing key: {}", e))?;
43
44 let did_obj = Did::new(did).map_err(|e| format!("Invalid DID: {}", e))?;
45 let rev = Tid::now(LimitedU32::MIN);
46
47 let unsigned_commit = Commit::new_unsigned(did_obj, new_mst_root, rev.clone(), current_root_cid);
48
49 let signed_commit = unsigned_commit
50 .sign(&signing_key)
51 .map_err(|e| format!("Failed to sign commit: {:?}", e))?;
52
53 let new_commit_bytes = signed_commit.to_cbor().map_err(|e| format!("Failed to serialize commit: {:?}", e))?;
54
55 let new_root_cid = state.block_store.put(&new_commit_bytes).await
56 .map_err(|e| format!("Failed to save commit block: {:?}", e))?;
57
58 let mut tx = state.db.begin().await
59 .map_err(|e| format!("Failed to begin transaction: {}", e))?;
60
61 let lock_result = sqlx::query!(
62 "SELECT repo_root_cid FROM repos WHERE user_id = $1 FOR UPDATE NOWAIT",
63 user_id
64 )
65 .fetch_optional(&mut *tx)
66 .await;
67
68 match lock_result {
69 Err(e) => {
70 if let Some(db_err) = e.as_database_error() {
71 if db_err.code().as_deref() == Some("55P03") {
72 return Err("ConcurrentModification: Another request is modifying this repo".to_string());
73 }
74 }
75 return Err(format!("Failed to acquire repo lock: {}", e));
76 }
77 Ok(Some(row)) => {
78 if let Some(expected_root) = ¤t_root_cid {
79 if row.repo_root_cid != expected_root.to_string() {
80 return Err("ConcurrentModification: Repo has been modified since last read".to_string());
81 }
82 }
83 }
84 Ok(None) => {
85 return Err("Repo not found".to_string());
86 }
87 }
88
89 sqlx::query!("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2", new_root_cid.to_string(), user_id)
90 .execute(&mut *tx)
91 .await
92 .map_err(|e| format!("DB Error (repos): {}", e))?;
93
94 let rev_str = rev.to_string();
95
96 let mut upsert_collections: Vec<String> = Vec::new();
97 let mut upsert_rkeys: Vec<String> = Vec::new();
98 let mut upsert_cids: Vec<String> = Vec::new();
99
100 let mut delete_collections: Vec<String> = Vec::new();
101 let mut delete_rkeys: Vec<String> = Vec::new();
102
103 for op in &ops {
104 match op {
105 RecordOp::Create { collection, rkey, cid } | RecordOp::Update { collection, rkey, cid } => {
106 upsert_collections.push(collection.clone());
107 upsert_rkeys.push(rkey.clone());
108 upsert_cids.push(cid.to_string());
109 }
110 RecordOp::Delete { collection, rkey } => {
111 delete_collections.push(collection.clone());
112 delete_rkeys.push(rkey.clone());
113 }
114 }
115 }
116
117 if !upsert_collections.is_empty() {
118 sqlx::query!(
119 r#"
120 INSERT INTO records (repo_id, collection, rkey, record_cid, repo_rev)
121 SELECT $1, collection, rkey, record_cid, $5
122 FROM UNNEST($2::text[], $3::text[], $4::text[]) AS t(collection, rkey, record_cid)
123 ON CONFLICT (repo_id, collection, rkey) DO UPDATE
124 SET record_cid = EXCLUDED.record_cid, repo_rev = EXCLUDED.repo_rev, created_at = NOW()
125 "#,
126 user_id,
127 &upsert_collections,
128 &upsert_rkeys,
129 &upsert_cids,
130 rev_str
131 )
132 .execute(&mut *tx)
133 .await
134 .map_err(|e| format!("DB Error (records batch upsert): {}", e))?;
135 }
136
137 if !delete_collections.is_empty() {
138 sqlx::query!(
139 r#"
140 DELETE FROM records
141 WHERE repo_id = $1
142 AND (collection, rkey) IN (SELECT * FROM UNNEST($2::text[], $3::text[]))
143 "#,
144 user_id,
145 &delete_collections,
146 &delete_rkeys
147 )
148 .execute(&mut *tx)
149 .await
150 .map_err(|e| format!("DB Error (records batch delete): {}", e))?;
151 }
152
153 let ops_json = ops.iter().map(|op| {
154 match op {
155 RecordOp::Create { collection, rkey, cid } => json!({
156 "action": "create",
157 "path": format!("{}/{}", collection, rkey),
158 "cid": cid.to_string()
159 }),
160 RecordOp::Update { collection, rkey, cid } => json!({
161 "action": "update",
162 "path": format!("{}/{}", collection, rkey),
163 "cid": cid.to_string()
164 }),
165 RecordOp::Delete { collection, rkey } => json!({
166 "action": "delete",
167 "path": format!("{}/{}", collection, rkey),
168 "cid": null
169 }),
170 }
171 }).collect::<Vec<_>>();
172
173 let event_type = "commit";
174 let prev_cid_str = current_root_cid.map(|c| c.to_string());
175
176 let seq_row = sqlx::query!(
177 r#"
178 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids)
179 VALUES ($1, $2, $3, $4, $5, $6, $7)
180 RETURNING seq
181 "#,
182 did,
183 event_type,
184 new_root_cid.to_string(),
185 prev_cid_str,
186 json!(ops_json),
187 &[] as &[String],
188 blocks_cids,
189 )
190 .fetch_one(&mut *tx)
191 .await
192 .map_err(|e| format!("DB Error (repo_seq): {}", e))?;
193
194 sqlx::query(
195 &format!("NOTIFY repo_updates, '{}'", seq_row.seq)
196 )
197 .execute(&mut *tx)
198 .await
199 .map_err(|e| format!("DB Error (notify): {}", e))?;
200
201 tx.commit().await
202 .map_err(|e| format!("Failed to commit transaction: {}", e))?;
203
204 Ok(CommitResult {
205 commit_cid: new_root_cid,
206 rev: rev.to_string(),
207 })
208}