this repo has no description
1use crate::state::AppState; 2use cid::Cid; 3use jacquard::types::{did::Did, integer::LimitedU32, string::Tid}; 4use jacquard_repo::commit::Commit; 5use jacquard_repo::storage::BlockStore; 6use k256::ecdsa::SigningKey; 7use serde_json::json; 8use uuid::Uuid; 9 10pub enum RecordOp { 11 Create { collection: String, rkey: String, cid: Cid }, 12 Update { collection: String, rkey: String, cid: Cid }, 13 Delete { collection: String, rkey: String }, 14} 15 16pub struct CommitResult { 17 pub commit_cid: Cid, 18 pub rev: String, 19} 20 21pub async fn commit_and_log( 22 state: &AppState, 23 did: &str, 24 user_id: Uuid, 25 current_root_cid: Option<Cid>, 26 new_mst_root: Cid, 27 ops: Vec<RecordOp>, 28 blocks_cids: &[String], 29) -> Result<CommitResult, String> { 30 let key_row = sqlx::query!( 31 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", 32 user_id 33 ) 34 .fetch_one(&state.db) 35 .await 36 .map_err(|e| format!("Failed to fetch signing key: {}", e))?; 37 38 let key_bytes = crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version) 39 .map_err(|e| format!("Failed to decrypt signing key: {}", e))?; 40 41 let signing_key = SigningKey::from_slice(&key_bytes) 42 .map_err(|e| format!("Invalid signing key: {}", e))?; 43 44 let did_obj = Did::new(did).map_err(|e| format!("Invalid DID: {}", e))?; 45 let rev = Tid::now(LimitedU32::MIN); 46 47 let unsigned_commit = Commit::new_unsigned(did_obj, new_mst_root, rev.clone(), current_root_cid); 48 49 let signed_commit = unsigned_commit 50 .sign(&signing_key) 51 .map_err(|e| format!("Failed to sign commit: {:?}", e))?; 52 53 let new_commit_bytes = signed_commit.to_cbor().map_err(|e| format!("Failed to serialize commit: {:?}", e))?; 54 55 let new_root_cid = state.block_store.put(&new_commit_bytes).await 56 .map_err(|e| format!("Failed to save commit block: {:?}", e))?; 57 58 let mut tx = state.db.begin().await 59 .map_err(|e| format!("Failed to begin transaction: {}", e))?; 60 61 let lock_result = sqlx::query!( 62 "SELECT repo_root_cid FROM repos WHERE user_id = $1 FOR UPDATE NOWAIT", 63 user_id 64 ) 65 .fetch_optional(&mut *tx) 66 .await; 67 68 match lock_result { 69 Err(e) => { 70 if let Some(db_err) = e.as_database_error() { 71 if db_err.code().as_deref() == Some("55P03") { 72 return Err("ConcurrentModification: Another request is modifying this repo".to_string()); 73 } 74 } 75 return Err(format!("Failed to acquire repo lock: {}", e)); 76 } 77 Ok(Some(row)) => { 78 if let Some(expected_root) = &current_root_cid { 79 if row.repo_root_cid != expected_root.to_string() { 80 return Err("ConcurrentModification: Repo has been modified since last read".to_string()); 81 } 82 } 83 } 84 Ok(None) => { 85 return Err("Repo not found".to_string()); 86 } 87 } 88 89 sqlx::query!("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2", new_root_cid.to_string(), user_id) 90 .execute(&mut *tx) 91 .await 92 .map_err(|e| format!("DB Error (repos): {}", e))?; 93 94 let rev_str = rev.to_string(); 95 96 let mut upsert_collections: Vec<String> = Vec::new(); 97 let mut upsert_rkeys: Vec<String> = Vec::new(); 98 let mut upsert_cids: Vec<String> = Vec::new(); 99 100 let mut delete_collections: Vec<String> = Vec::new(); 101 let mut delete_rkeys: Vec<String> = Vec::new(); 102 103 for op in &ops { 104 match op { 105 RecordOp::Create { collection, rkey, cid } | RecordOp::Update { collection, rkey, cid } => { 106 upsert_collections.push(collection.clone()); 107 upsert_rkeys.push(rkey.clone()); 108 upsert_cids.push(cid.to_string()); 109 } 110 RecordOp::Delete { collection, rkey } => { 111 delete_collections.push(collection.clone()); 112 delete_rkeys.push(rkey.clone()); 113 } 114 } 115 } 116 117 if !upsert_collections.is_empty() { 118 sqlx::query!( 119 r#" 120 INSERT INTO records (repo_id, collection, rkey, record_cid, repo_rev) 121 SELECT $1, collection, rkey, record_cid, $5 122 FROM UNNEST($2::text[], $3::text[], $4::text[]) AS t(collection, rkey, record_cid) 123 ON CONFLICT (repo_id, collection, rkey) DO UPDATE 124 SET record_cid = EXCLUDED.record_cid, repo_rev = EXCLUDED.repo_rev, created_at = NOW() 125 "#, 126 user_id, 127 &upsert_collections, 128 &upsert_rkeys, 129 &upsert_cids, 130 rev_str 131 ) 132 .execute(&mut *tx) 133 .await 134 .map_err(|e| format!("DB Error (records batch upsert): {}", e))?; 135 } 136 137 if !delete_collections.is_empty() { 138 sqlx::query!( 139 r#" 140 DELETE FROM records 141 WHERE repo_id = $1 142 AND (collection, rkey) IN (SELECT * FROM UNNEST($2::text[], $3::text[])) 143 "#, 144 user_id, 145 &delete_collections, 146 &delete_rkeys 147 ) 148 .execute(&mut *tx) 149 .await 150 .map_err(|e| format!("DB Error (records batch delete): {}", e))?; 151 } 152 153 let ops_json = ops.iter().map(|op| { 154 match op { 155 RecordOp::Create { collection, rkey, cid } => json!({ 156 "action": "create", 157 "path": format!("{}/{}", collection, rkey), 158 "cid": cid.to_string() 159 }), 160 RecordOp::Update { collection, rkey, cid } => json!({ 161 "action": "update", 162 "path": format!("{}/{}", collection, rkey), 163 "cid": cid.to_string() 164 }), 165 RecordOp::Delete { collection, rkey } => json!({ 166 "action": "delete", 167 "path": format!("{}/{}", collection, rkey), 168 "cid": null 169 }), 170 } 171 }).collect::<Vec<_>>(); 172 173 let event_type = "commit"; 174 let prev_cid_str = current_root_cid.map(|c| c.to_string()); 175 176 let seq_row = sqlx::query!( 177 r#" 178 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids) 179 VALUES ($1, $2, $3, $4, $5, $6, $7) 180 RETURNING seq 181 "#, 182 did, 183 event_type, 184 new_root_cid.to_string(), 185 prev_cid_str, 186 json!(ops_json), 187 &[] as &[String], 188 blocks_cids, 189 ) 190 .fetch_one(&mut *tx) 191 .await 192 .map_err(|e| format!("DB Error (repo_seq): {}", e))?; 193 194 sqlx::query( 195 &format!("NOTIFY repo_updates, '{}'", seq_row.seq) 196 ) 197 .execute(&mut *tx) 198 .await 199 .map_err(|e| format!("DB Error (notify): {}", e))?; 200 201 tx.commit().await 202 .map_err(|e| format!("Failed to commit transaction: {}", e))?; 203 204 Ok(CommitResult { 205 commit_cid: new_root_cid, 206 rev: rev.to_string(), 207 }) 208}