this repo has no description
1use crate::state::AppState; 2use cid::Cid; 3use jacquard::types::{did::Did, integer::LimitedU32, string::Tid}; 4use jacquard_repo::commit::Commit; 5use jacquard_repo::storage::BlockStore; 6use k256::ecdsa::SigningKey; 7use serde_json::json; 8use uuid::Uuid; 9 10pub enum RecordOp { 11 Create { collection: String, rkey: String, cid: Cid }, 12 Update { collection: String, rkey: String, cid: Cid }, 13 Delete { collection: String, rkey: String }, 14} 15 16pub struct CommitResult { 17 pub commit_cid: Cid, 18 pub rev: String, 19} 20 21pub async fn commit_and_log( 22 state: &AppState, 23 did: &str, 24 user_id: Uuid, 25 current_root_cid: Option<Cid>, 26 new_mst_root: Cid, 27 ops: Vec<RecordOp>, 28 blocks_cids: &Vec<String>, 29) -> Result<CommitResult, String> { 30 let key_row = sqlx::query!( 31 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", 32 user_id 33 ) 34 .fetch_one(&state.db) 35 .await 36 .map_err(|e| format!("Failed to fetch signing key: {}", e))?; 37 38 let key_bytes = crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version) 39 .map_err(|e| format!("Failed to decrypt signing key: {}", e))?; 40 41 let signing_key = SigningKey::from_slice(&key_bytes) 42 .map_err(|e| format!("Invalid signing key: {}", e))?; 43 44 let did_obj = Did::new(did).map_err(|e| format!("Invalid DID: {}", e))?; 45 let rev = Tid::now(LimitedU32::MIN); 46 47 let unsigned_commit = Commit::new_unsigned(did_obj, new_mst_root, rev.clone(), current_root_cid); 48 49 let signed_commit = unsigned_commit 50 .sign(&signing_key) 51 .map_err(|e| format!("Failed to sign commit: {:?}", e))?; 52 53 let new_commit_bytes = signed_commit.to_cbor().map_err(|e| format!("Failed to serialize commit: {:?}", e))?; 54 55 let new_root_cid = state.block_store.put(&new_commit_bytes).await 56 .map_err(|e| format!("Failed to save commit block: {:?}", e))?; 57 58 let mut tx = state.db.begin().await 59 .map_err(|e| format!("Failed to begin transaction: {}", e))?; 60 61 sqlx::query!("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2", new_root_cid.to_string(), user_id) 62 .execute(&mut *tx) 63 .await 64 .map_err(|e| format!("DB Error (repos): {}", e))?; 65 66 for op in &ops { 67 match op { 68 RecordOp::Create { collection, rkey, cid } | RecordOp::Update { collection, rkey, cid } => { 69 sqlx::query!( 70 "INSERT INTO records (repo_id, collection, rkey, record_cid) VALUES ($1, $2, $3, $4) 71 ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4, created_at = NOW()", 72 user_id, 73 collection, 74 rkey, 75 cid.to_string() 76 ) 77 .execute(&mut *tx) 78 .await 79 .map_err(|e| format!("DB Error (records): {}", e))?; 80 } 81 RecordOp::Delete { collection, rkey } => { 82 sqlx::query!( 83 "DELETE FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3", 84 user_id, 85 collection, 86 rkey 87 ) 88 .execute(&mut *tx) 89 .await 90 .map_err(|e| format!("DB Error (records): {}", e))?; 91 } 92 } 93 } 94 95 let ops_json = ops.iter().map(|op| { 96 match op { 97 RecordOp::Create { collection, rkey, cid } => json!({ 98 "action": "create", 99 "path": format!("{}/{}", collection, rkey), 100 "cid": cid.to_string() 101 }), 102 RecordOp::Update { collection, rkey, cid } => json!({ 103 "action": "update", 104 "path": format!("{}/{}", collection, rkey), 105 "cid": cid.to_string() 106 }), 107 RecordOp::Delete { collection, rkey } => json!({ 108 "action": "delete", 109 "path": format!("{}/{}", collection, rkey), 110 "cid": null 111 }), 112 } 113 }).collect::<Vec<_>>(); 114 115 let event_type = "commit"; 116 let prev_cid_str = current_root_cid.map(|c| c.to_string()); 117 118 let seq_row = sqlx::query!( 119 r#" 120 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids) 121 VALUES ($1, $2, $3, $4, $5, $6, $7) 122 RETURNING seq 123 "#, 124 did, 125 event_type, 126 new_root_cid.to_string(), 127 prev_cid_str, 128 json!(ops_json), 129 &[] as &[String], 130 blocks_cids, 131 ) 132 .fetch_one(&mut *tx) 133 .await 134 .map_err(|e| format!("DB Error (repo_seq): {}", e))?; 135 136 sqlx::query( 137 &format!("NOTIFY repo_updates, '{}'", seq_row.seq) 138 ) 139 .execute(&mut *tx) 140 .await 141 .map_err(|e| format!("DB Error (notify): {}", e))?; 142 143 tx.commit().await 144 .map_err(|e| format!("Failed to commit transaction: {}", e))?; 145 146 Ok(CommitResult { 147 commit_cid: new_root_cid, 148 rev: rev.to_string(), 149 }) 150}