this repo has no description
1use crate::state::AppState;
2use cid::Cid;
3use jacquard::types::{did::Did, integer::LimitedU32, string::Tid};
4use jacquard_repo::commit::Commit;
5use jacquard_repo::storage::BlockStore;
6use k256::ecdsa::SigningKey;
7use serde_json::json;
8use uuid::Uuid;
9
10pub enum RecordOp {
11 Create { collection: String, rkey: String, cid: Cid },
12 Update { collection: String, rkey: String, cid: Cid },
13 Delete { collection: String, rkey: String },
14}
15
16pub struct CommitResult {
17 pub commit_cid: Cid,
18 pub rev: String,
19}
20
21pub async fn commit_and_log(
22 state: &AppState,
23 did: &str,
24 user_id: Uuid,
25 current_root_cid: Option<Cid>,
26 new_mst_root: Cid,
27 ops: Vec<RecordOp>,
28 blocks_cids: &[String],
29) -> Result<CommitResult, String> {
30 let key_row = sqlx::query!(
31 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
32 user_id
33 )
34 .fetch_one(&state.db)
35 .await
36 .map_err(|e| format!("Failed to fetch signing key: {}", e))?;
37
38 let key_bytes = crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version)
39 .map_err(|e| format!("Failed to decrypt signing key: {}", e))?;
40
41 let signing_key = SigningKey::from_slice(&key_bytes)
42 .map_err(|e| format!("Invalid signing key: {}", e))?;
43
44 let did_obj = Did::new(did).map_err(|e| format!("Invalid DID: {}", e))?;
45 let rev = Tid::now(LimitedU32::MIN);
46
47 let unsigned_commit = Commit::new_unsigned(did_obj, new_mst_root, rev.clone(), current_root_cid);
48
49 let signed_commit = unsigned_commit
50 .sign(&signing_key)
51 .map_err(|e| format!("Failed to sign commit: {:?}", e))?;
52
53 let new_commit_bytes = signed_commit.to_cbor().map_err(|e| format!("Failed to serialize commit: {:?}", e))?;
54
55 let new_root_cid = state.block_store.put(&new_commit_bytes).await
56 .map_err(|e| format!("Failed to save commit block: {:?}", e))?;
57
58 let mut tx = state.db.begin().await
59 .map_err(|e| format!("Failed to begin transaction: {}", e))?;
60
61 sqlx::query!("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2", new_root_cid.to_string(), user_id)
62 .execute(&mut *tx)
63 .await
64 .map_err(|e| format!("DB Error (repos): {}", e))?;
65
66 let rev_str = rev.to_string();
67 for op in &ops {
68 match op {
69 RecordOp::Create { collection, rkey, cid } | RecordOp::Update { collection, rkey, cid } => {
70 sqlx::query!(
71 "INSERT INTO records (repo_id, collection, rkey, record_cid, repo_rev) VALUES ($1, $2, $3, $4, $5)
72 ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4, repo_rev = $5, created_at = NOW()",
73 user_id,
74 collection,
75 rkey,
76 cid.to_string(),
77 rev_str
78 )
79 .execute(&mut *tx)
80 .await
81 .map_err(|e| format!("DB Error (records): {}", e))?;
82 }
83 RecordOp::Delete { collection, rkey } => {
84 sqlx::query!(
85 "DELETE FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3",
86 user_id,
87 collection,
88 rkey
89 )
90 .execute(&mut *tx)
91 .await
92 .map_err(|e| format!("DB Error (records): {}", e))?;
93 }
94 }
95 }
96
97 let ops_json = ops.iter().map(|op| {
98 match op {
99 RecordOp::Create { collection, rkey, cid } => json!({
100 "action": "create",
101 "path": format!("{}/{}", collection, rkey),
102 "cid": cid.to_string()
103 }),
104 RecordOp::Update { collection, rkey, cid } => json!({
105 "action": "update",
106 "path": format!("{}/{}", collection, rkey),
107 "cid": cid.to_string()
108 }),
109 RecordOp::Delete { collection, rkey } => json!({
110 "action": "delete",
111 "path": format!("{}/{}", collection, rkey),
112 "cid": null
113 }),
114 }
115 }).collect::<Vec<_>>();
116
117 let event_type = "commit";
118 let prev_cid_str = current_root_cid.map(|c| c.to_string());
119
120 let seq_row = sqlx::query!(
121 r#"
122 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids)
123 VALUES ($1, $2, $3, $4, $5, $6, $7)
124 RETURNING seq
125 "#,
126 did,
127 event_type,
128 new_root_cid.to_string(),
129 prev_cid_str,
130 json!(ops_json),
131 &[] as &[String],
132 blocks_cids,
133 )
134 .fetch_one(&mut *tx)
135 .await
136 .map_err(|e| format!("DB Error (repo_seq): {}", e))?;
137
138 sqlx::query(
139 &format!("NOTIFY repo_updates, '{}'", seq_row.seq)
140 )
141 .execute(&mut *tx)
142 .await
143 .map_err(|e| format!("DB Error (notify): {}", e))?;
144
145 tx.commit().await
146 .map_err(|e| format!("Failed to commit transaction: {}", e))?;
147
148 Ok(CommitResult {
149 commit_cid: new_root_cid,
150 rev: rev.to_string(),
151 })
152}