// This repo has no description.
1use crate::state::AppState; 2use cid::Cid; 3use jacquard::types::{did::Did, integer::LimitedU32, string::Tid}; 4use jacquard_repo::commit::Commit; 5use jacquard_repo::storage::BlockStore; 6use serde_json::json; 7use uuid::Uuid; 8 9pub enum RecordOp { 10 Create { collection: String, rkey: String, cid: Cid }, 11 Update { collection: String, rkey: String, cid: Cid }, 12 Delete { collection: String, rkey: String }, 13} 14 15pub struct CommitResult { 16 pub commit_cid: Cid, 17 pub rev: String, 18} 19 20pub async fn commit_and_log( 21 state: &AppState, 22 did: &str, 23 user_id: Uuid, 24 current_root_cid: Option<Cid>, 25 new_mst_root: Cid, 26 ops: Vec<RecordOp>, 27 blocks_cids: &Vec<String>, 28) -> Result<CommitResult, String> { 29 let did_obj = Did::new(did).map_err(|e| format!("Invalid DID: {}", e))?; 30 let rev = Tid::now(LimitedU32::MIN); 31 32 let new_commit = Commit::new_unsigned(did_obj, new_mst_root, rev.clone(), current_root_cid); 33 34 let new_commit_bytes = new_commit.to_cbor().map_err(|e| format!("Failed to serialize commit: {:?}", e))?; 35 36 let new_root_cid = state.block_store.put(&new_commit_bytes).await 37 .map_err(|e| format!("Failed to save commit block: {:?}", e))?; 38 39 sqlx::query!("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2", new_root_cid.to_string(), user_id) 40 .execute(&state.db) 41 .await 42 .map_err(|e| format!("DB Error (repos): {}", e))?; 43 44 for op in &ops { 45 match op { 46 RecordOp::Create { collection, rkey, cid } | RecordOp::Update { collection, rkey, cid } => { 47 sqlx::query!( 48 "INSERT INTO records (repo_id, collection, rkey, record_cid) VALUES ($1, $2, $3, $4) 49 ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4, created_at = NOW()", 50 user_id, 51 collection, 52 rkey, 53 cid.to_string() 54 ) 55 .execute(&state.db) 56 .await 57 .map_err(|e| format!("DB Error (records): {}", e))?; 58 } 59 RecordOp::Delete { collection, rkey } => { 60 sqlx::query!( 61 "DELETE FROM records WHERE repo_id = $1 AND 
collection = $2 AND rkey = $3", 62 user_id, 63 collection, 64 rkey 65 ) 66 .execute(&state.db) 67 .await 68 .map_err(|e| format!("DB Error (records): {}", e))?; 69 } 70 } 71 } 72 73 let ops_json = ops.iter().map(|op| { 74 match op { 75 RecordOp::Create { collection, rkey, cid } => json!({ 76 "action": "create", 77 "path": format!("{}/{}", collection, rkey), 78 "cid": cid.to_string() 79 }), 80 RecordOp::Update { collection, rkey, cid } => json!({ 81 "action": "update", 82 "path": format!("{}/{}", collection, rkey), 83 "cid": cid.to_string() 84 }), 85 RecordOp::Delete { collection, rkey } => json!({ 86 "action": "delete", 87 "path": format!("{}/{}", collection, rkey), 88 "cid": null 89 }), 90 } 91 }).collect::<Vec<_>>(); 92 93 let event_type = "commit"; 94 let prev_cid_str = current_root_cid.map(|c| c.to_string()); 95 96 let seq_row = sqlx::query!( 97 r#" 98 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids) 99 VALUES ($1, $2, $3, $4, $5, $6, $7) 100 RETURNING seq 101 "#, 102 did, 103 event_type, 104 new_root_cid.to_string(), 105 prev_cid_str, 106 json!(ops_json), 107 &[] as &[String], 108 blocks_cids, 109 ) 110 .fetch_one(&state.db) 111 .await 112 .map_err(|e| format!("DB Error (repo_seq): {}", e))?; 113 114 sqlx::query( 115 &format!("NOTIFY repo_updates, '{}'", seq_row.seq) 116 ) 117 .execute(&state.db) 118 .await 119 .map_err(|e| format!("DB Error (notify): {}", e))?; 120 121 Ok(CommitResult { 122 commit_cid: new_root_cid, 123 rev: rev.to_string(), 124 }) 125}