this repo has no description
1use crate::state::AppState;
2use bytes::Bytes;
3use cid::Cid;
4use jacquard::types::{integer::LimitedU32, string::Tid};
5use jacquard_repo::commit::Commit;
6use jacquard_repo::storage::BlockStore;
7use k256::ecdsa::SigningKey;
8use serde_json::json;
9use std::str::FromStr;
10use uuid::Uuid;
11
12pub fn create_signed_commit(
13 did: &str,
14 data: Cid,
15 rev: &str,
16 prev: Option<Cid>,
17 signing_key: &SigningKey,
18) -> Result<(Vec<u8>, Bytes), String> {
19 let did = jacquard::types::string::Did::new(did)
20 .map_err(|e| format!("Invalid DID: {:?}", e))?;
21 let rev = jacquard::types::string::Tid::from_str(rev)
22 .map_err(|e| format!("Invalid TID: {:?}", e))?;
23 let unsigned = Commit::new_unsigned(did, data, rev, prev);
24 let signed = unsigned
25 .sign(signing_key)
26 .map_err(|e| format!("Failed to sign commit: {:?}", e))?;
27 let sig_bytes = signed.sig().clone();
28 let signed_bytes = signed
29 .to_cbor()
30 .map_err(|e| format!("Failed to serialize signed commit: {:?}", e))?;
31 Ok((signed_bytes, sig_bytes))
32}
33
34pub enum RecordOp {
35 Create {
36 collection: String,
37 rkey: String,
38 cid: Cid,
39 },
40 Update {
41 collection: String,
42 rkey: String,
43 cid: Cid,
44 prev: Option<Cid>,
45 },
46 Delete {
47 collection: String,
48 rkey: String,
49 prev: Option<Cid>,
50 },
51}
52
53pub struct CommitResult {
54 pub commit_cid: Cid,
55 pub rev: String,
56}
57
58pub struct CommitParams<'a> {
59 pub did: &'a str,
60 pub user_id: Uuid,
61 pub current_root_cid: Option<Cid>,
62 pub prev_data_cid: Option<Cid>,
63 pub new_mst_root: Cid,
64 pub ops: Vec<RecordOp>,
65 pub blocks_cids: &'a [String],
66}
67
68pub async fn commit_and_log(
69 state: &AppState,
70 params: CommitParams<'_>,
71) -> Result<CommitResult, String> {
72 let CommitParams {
73 did,
74 user_id,
75 current_root_cid,
76 prev_data_cid,
77 new_mst_root,
78 ops,
79 blocks_cids,
80 } = params;
81 let key_row = sqlx::query!(
82 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
83 user_id
84 )
85 .fetch_one(&state.db)
86 .await
87 .map_err(|e| format!("Failed to fetch signing key: {}", e))?;
88 let key_bytes = crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version)
89 .map_err(|e| format!("Failed to decrypt signing key: {}", e))?;
90 let signing_key =
91 SigningKey::from_slice(&key_bytes).map_err(|e| format!("Invalid signing key: {}", e))?;
92 let rev = Tid::now(LimitedU32::MIN);
93 let rev_str = rev.to_string();
94 let (new_commit_bytes, _sig) =
95 create_signed_commit(did, new_mst_root, &rev_str, current_root_cid, &signing_key)?;
96 let new_root_cid = state
97 .block_store
98 .put(&new_commit_bytes)
99 .await
100 .map_err(|e| format!("Failed to save commit block: {:?}", e))?;
101 let mut tx = state
102 .db
103 .begin()
104 .await
105 .map_err(|e| format!("Failed to begin transaction: {}", e))?;
106 let lock_result = sqlx::query!(
107 "SELECT repo_root_cid FROM repos WHERE user_id = $1 FOR UPDATE NOWAIT",
108 user_id
109 )
110 .fetch_optional(&mut *tx)
111 .await;
112 match lock_result {
113 Err(e) => {
114 if let Some(db_err) = e.as_database_error()
115 && db_err.code().as_deref() == Some("55P03")
116 {
117 return Err(
118 "ConcurrentModification: Another request is modifying this repo".to_string(),
119 );
120 }
121 return Err(format!("Failed to acquire repo lock: {}", e));
122 }
123 Ok(Some(row)) => {
124 if let Some(expected_root) = ¤t_root_cid
125 && row.repo_root_cid != expected_root.to_string()
126 {
127 return Err(
128 "ConcurrentModification: Repo has been modified since last read".to_string(),
129 );
130 }
131 }
132 Ok(None) => {
133 return Err("Repo not found".to_string());
134 }
135 }
136 let is_account_active = sqlx::query_scalar!(
137 "SELECT deactivated_at IS NULL FROM users WHERE id = $1",
138 user_id
139 )
140 .fetch_optional(&mut *tx)
141 .await
142 .map_err(|e| format!("Failed to check account status: {}", e))?
143 .flatten()
144 .unwrap_or(false);
145 sqlx::query!(
146 "UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2",
147 new_root_cid.to_string(),
148 user_id
149 )
150 .execute(&mut *tx)
151 .await
152 .map_err(|e| format!("DB Error (repos): {}", e))?;
153 let mut upsert_collections: Vec<String> = Vec::new();
154 let mut upsert_rkeys: Vec<String> = Vec::new();
155 let mut upsert_cids: Vec<String> = Vec::new();
156 let mut delete_collections: Vec<String> = Vec::new();
157 let mut delete_rkeys: Vec<String> = Vec::new();
158 for op in &ops {
159 match op {
160 RecordOp::Create {
161 collection,
162 rkey,
163 cid,
164 }
165 | RecordOp::Update {
166 collection,
167 rkey,
168 cid,
169 ..
170 } => {
171 upsert_collections.push(collection.clone());
172 upsert_rkeys.push(rkey.clone());
173 upsert_cids.push(cid.to_string());
174 }
175 RecordOp::Delete {
176 collection, rkey, ..
177 } => {
178 delete_collections.push(collection.clone());
179 delete_rkeys.push(rkey.clone());
180 }
181 }
182 }
183 if !upsert_collections.is_empty() {
184 sqlx::query!(
185 r#"
186 INSERT INTO records (repo_id, collection, rkey, record_cid, repo_rev)
187 SELECT $1, collection, rkey, record_cid, $5
188 FROM UNNEST($2::text[], $3::text[], $4::text[]) AS t(collection, rkey, record_cid)
189 ON CONFLICT (repo_id, collection, rkey) DO UPDATE
190 SET record_cid = EXCLUDED.record_cid, repo_rev = EXCLUDED.repo_rev, created_at = NOW()
191 "#,
192 user_id,
193 &upsert_collections,
194 &upsert_rkeys,
195 &upsert_cids,
196 rev_str
197 )
198 .execute(&mut *tx)
199 .await
200 .map_err(|e| format!("DB Error (records batch upsert): {}", e))?;
201 }
202 if !delete_collections.is_empty() {
203 sqlx::query!(
204 r#"
205 DELETE FROM records
206 WHERE repo_id = $1
207 AND (collection, rkey) IN (SELECT * FROM UNNEST($2::text[], $3::text[]))
208 "#,
209 user_id,
210 &delete_collections,
211 &delete_rkeys
212 )
213 .execute(&mut *tx)
214 .await
215 .map_err(|e| format!("DB Error (records batch delete): {}", e))?;
216 }
217 let ops_json = ops
218 .iter()
219 .map(|op| match op {
220 RecordOp::Create {
221 collection,
222 rkey,
223 cid,
224 } => json!({
225 "action": "create",
226 "path": format!("{}/{}", collection, rkey),
227 "cid": cid.to_string()
228 }),
229 RecordOp::Update {
230 collection,
231 rkey,
232 cid,
233 prev,
234 } => {
235 let mut obj = json!({
236 "action": "update",
237 "path": format!("{}/{}", collection, rkey),
238 "cid": cid.to_string()
239 });
240 if let Some(prev_cid) = prev {
241 obj["prev"] = json!(prev_cid.to_string());
242 }
243 obj
244 }
245 RecordOp::Delete {
246 collection,
247 rkey,
248 prev,
249 } => {
250 let mut obj = json!({
251 "action": "delete",
252 "path": format!("{}/{}", collection, rkey),
253 "cid": null
254 });
255 if let Some(prev_cid) = prev {
256 obj["prev"] = json!(prev_cid.to_string());
257 }
258 obj
259 }
260 })
261 .collect::<Vec<_>>();
262 if is_account_active {
263 let event_type = "commit";
264 let prev_cid_str = current_root_cid.map(|c| c.to_string());
265 let prev_data_cid_str = prev_data_cid.map(|c| c.to_string());
266 let seq_row = sqlx::query!(
267 r#"
268 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids, prev_data_cid)
269 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
270 RETURNING seq
271 "#,
272 did,
273 event_type,
274 new_root_cid.to_string(),
275 prev_cid_str,
276 json!(ops_json),
277 &[] as &[String],
278 blocks_cids,
279 prev_data_cid_str,
280 )
281 .fetch_one(&mut *tx)
282 .await
283 .map_err(|e| format!("DB Error (repo_seq): {}", e))?;
284 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
285 .execute(&mut *tx)
286 .await
287 .map_err(|e| format!("DB Error (notify): {}", e))?;
288 }
289 tx.commit()
290 .await
291 .map_err(|e| format!("Failed to commit transaction: {}", e))?;
292 if is_account_active {
293 let _ = sequence_sync_event(state, did, &new_root_cid.to_string()).await;
294 }
295 Ok(CommitResult {
296 commit_cid: new_root_cid,
297 rev: rev_str,
298 })
299}
300pub async fn create_record_internal(
301 state: &AppState,
302 did: &str,
303 collection: &str,
304 rkey: &str,
305 record: &serde_json::Value,
306) -> Result<(String, Cid), String> {
307 use crate::repo::tracking::TrackingBlockStore;
308 use jacquard_repo::mst::Mst;
309 use std::sync::Arc;
310 let user_id: Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
311 .fetch_optional(&state.db)
312 .await
313 .map_err(|e| format!("DB error: {}", e))?
314 .ok_or_else(|| "User not found".to_string())?;
315 let root_cid_str: String = sqlx::query_scalar!(
316 "SELECT repo_root_cid FROM repos WHERE user_id = $1",
317 user_id
318 )
319 .fetch_optional(&state.db)
320 .await
321 .map_err(|e| format!("DB error: {}", e))?
322 .ok_or_else(|| "Repo not found".to_string())?;
323 let current_root_cid =
324 Cid::from_str(&root_cid_str).map_err(|_| "Invalid repo root CID".to_string())?;
325 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
326 let commit_bytes = tracking_store
327 .get(¤t_root_cid)
328 .await
329 .map_err(|e| format!("Failed to fetch commit: {:?}", e))?
330 .ok_or_else(|| "Commit block not found".to_string())?;
331 let commit = jacquard_repo::commit::Commit::from_cbor(&commit_bytes)
332 .map_err(|e| format!("Failed to parse commit: {:?}", e))?;
333 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
334 let mut record_bytes = Vec::new();
335 serde_ipld_dagcbor::to_writer(&mut record_bytes, record)
336 .map_err(|e| format!("Failed to serialize record: {:?}", e))?;
337 let record_cid = tracking_store
338 .put(&record_bytes)
339 .await
340 .map_err(|e| format!("Failed to save record block: {:?}", e))?;
341 let key = format!("{}/{}", collection, rkey);
342 let new_mst = mst
343 .add(&key, record_cid)
344 .await
345 .map_err(|e| format!("Failed to add to MST: {:?}", e))?;
346 let new_mst_root = new_mst
347 .persist()
348 .await
349 .map_err(|e| format!("Failed to persist MST: {:?}", e))?;
350 let op = RecordOp::Create {
351 collection: collection.to_string(),
352 rkey: rkey.to_string(),
353 cid: record_cid,
354 };
355 let mut relevant_blocks = std::collections::BTreeMap::new();
356 new_mst
357 .blocks_for_path(&key, &mut relevant_blocks)
358 .await
359 .map_err(|e| format!("Failed to get new MST blocks for path: {:?}", e))?;
360 mst.blocks_for_path(&key, &mut relevant_blocks)
361 .await
362 .map_err(|e| format!("Failed to get old MST blocks for path: {:?}", e))?;
363 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
364 let mut written_cids = tracking_store.get_all_relevant_cids();
365 for cid in relevant_blocks.keys() {
366 if !written_cids.contains(cid) {
367 written_cids.push(*cid);
368 }
369 }
370 let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect();
371 let result = commit_and_log(
372 state,
373 CommitParams {
374 did,
375 user_id,
376 current_root_cid: Some(current_root_cid),
377 prev_data_cid: Some(commit.data),
378 new_mst_root,
379 ops: vec![op],
380 blocks_cids: &written_cids_str,
381 },
382 )
383 .await?;
384 let uri = format!("at://{}/{}/{}", did, collection, rkey);
385 Ok((uri, result.commit_cid))
386}
387
388pub async fn sequence_identity_event(
389 state: &AppState,
390 did: &str,
391 handle: Option<&str>,
392) -> Result<i64, String> {
393 let seq_row = sqlx::query!(
394 r#"
395 INSERT INTO repo_seq (did, event_type, handle)
396 VALUES ($1, 'identity', $2)
397 RETURNING seq
398 "#,
399 did,
400 handle,
401 )
402 .fetch_one(&state.db)
403 .await
404 .map_err(|e| format!("DB Error (repo_seq identity): {}", e))?;
405 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
406 .execute(&state.db)
407 .await
408 .map_err(|e| format!("DB Error (notify): {}", e))?;
409 Ok(seq_row.seq)
410}
411pub async fn sequence_account_event(
412 state: &AppState,
413 did: &str,
414 active: bool,
415 status: Option<&str>,
416) -> Result<i64, String> {
417 let seq_row = sqlx::query!(
418 r#"
419 INSERT INTO repo_seq (did, event_type, active, status)
420 VALUES ($1, 'account', $2, $3)
421 RETURNING seq
422 "#,
423 did,
424 active,
425 status,
426 )
427 .fetch_one(&state.db)
428 .await
429 .map_err(|e| format!("DB Error (repo_seq account): {}", e))?;
430 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
431 .execute(&state.db)
432 .await
433 .map_err(|e| format!("DB Error (notify): {}", e))?;
434 Ok(seq_row.seq)
435}
436pub async fn sequence_sync_event(
437 state: &AppState,
438 did: &str,
439 commit_cid: &str,
440) -> Result<i64, String> {
441 let seq_row = sqlx::query!(
442 r#"
443 INSERT INTO repo_seq (did, event_type, commit_cid)
444 VALUES ($1, 'sync', $2)
445 RETURNING seq
446 "#,
447 did,
448 commit_cid,
449 )
450 .fetch_one(&state.db)
451 .await
452 .map_err(|e| format!("DB Error (repo_seq sync): {}", e))?;
453 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
454 .execute(&state.db)
455 .await
456 .map_err(|e| format!("DB Error (notify): {}", e))?;
457 Ok(seq_row.seq)
458}
459
460pub async fn sequence_empty_commit_event(state: &AppState, did: &str) -> Result<i64, String> {
461 let repo_root = sqlx::query_scalar!(
462 "SELECT r.repo_root_cid FROM repos r JOIN users u ON r.user_id = u.id WHERE u.did = $1",
463 did
464 )
465 .fetch_optional(&state.db)
466 .await
467 .map_err(|e| format!("DB Error fetching repo root: {}", e))?
468 .ok_or_else(|| "Repo not found".to_string())?;
469 let ops = serde_json::json!([]);
470 let blobs: Vec<String> = vec![];
471 let blocks_cids: Vec<String> = vec![];
472 let seq_row = sqlx::query!(
473 r#"
474 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids)
475 VALUES ($1, 'commit', $2, $2, $3, $4, $5)
476 RETURNING seq
477 "#,
478 did,
479 repo_root,
480 ops,
481 &blobs,
482 &blocks_cids
483 )
484 .fetch_one(&state.db)
485 .await
486 .map_err(|e| format!("DB Error (repo_seq empty commit): {}", e))?;
487 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
488 .execute(&state.db)
489 .await
490 .map_err(|e| format!("DB Error (notify): {}", e))?;
491 Ok(seq_row.seq)
492}