this repo has no description
1use crate::state::AppState;
2use bytes::Bytes;
3use cid::Cid;
4use jacquard::types::{integer::LimitedU32, string::Tid};
5use jacquard_repo::commit::Commit;
6use jacquard_repo::storage::BlockStore;
7use k256::ecdsa::SigningKey;
8use serde_json::{Value, json};
9use std::str::FromStr;
10use uuid::Uuid;
11
12pub fn extract_blob_cids(record: &Value) -> Vec<String> {
13 let mut blobs = Vec::new();
14 extract_blob_cids_recursive(record, &mut blobs);
15 blobs
16}
17
18fn extract_blob_cids_recursive(value: &Value, blobs: &mut Vec<String>) {
19 match value {
20 Value::Object(map) => {
21 if map.get("$type").and_then(|v| v.as_str()) == Some("blob")
22 && let Some(ref_obj) = map.get("ref")
23 && let Some(link) = ref_obj.get("$link").and_then(|v| v.as_str())
24 {
25 blobs.push(link.to_string());
26 }
27 for v in map.values() {
28 extract_blob_cids_recursive(v, blobs);
29 }
30 }
31 Value::Array(arr) => {
32 for v in arr {
33 extract_blob_cids_recursive(v, blobs);
34 }
35 }
36 _ => {}
37 }
38}
39
40pub fn create_signed_commit(
41 did: &str,
42 data: Cid,
43 rev: &str,
44 prev: Option<Cid>,
45 signing_key: &SigningKey,
46) -> Result<(Vec<u8>, Bytes), String> {
47 let did =
48 jacquard::types::string::Did::new(did).map_err(|e| format!("Invalid DID: {:?}", e))?;
49 let rev =
50 jacquard::types::string::Tid::from_str(rev).map_err(|e| format!("Invalid TID: {:?}", e))?;
51 let unsigned = Commit::new_unsigned(did, data, rev, prev);
52 let signed = unsigned
53 .sign(signing_key)
54 .map_err(|e| format!("Failed to sign commit: {:?}", e))?;
55 let sig_bytes = signed.sig().clone();
56 let signed_bytes = signed
57 .to_cbor()
58 .map_err(|e| format!("Failed to serialize signed commit: {:?}", e))?;
59 Ok((signed_bytes, sig_bytes))
60}
61
62pub enum RecordOp {
63 Create {
64 collection: String,
65 rkey: String,
66 cid: Cid,
67 },
68 Update {
69 collection: String,
70 rkey: String,
71 cid: Cid,
72 prev: Option<Cid>,
73 },
74 Delete {
75 collection: String,
76 rkey: String,
77 prev: Option<Cid>,
78 },
79}
80
81pub struct CommitResult {
82 pub commit_cid: Cid,
83 pub rev: String,
84}
85
86pub struct CommitParams<'a> {
87 pub did: &'a str,
88 pub user_id: Uuid,
89 pub current_root_cid: Option<Cid>,
90 pub prev_data_cid: Option<Cid>,
91 pub new_mst_root: Cid,
92 pub ops: Vec<RecordOp>,
93 pub blocks_cids: &'a [String],
94 pub blobs: &'a [String],
95}
96
97pub async fn commit_and_log(
98 state: &AppState,
99 params: CommitParams<'_>,
100) -> Result<CommitResult, String> {
101 let CommitParams {
102 did,
103 user_id,
104 current_root_cid,
105 prev_data_cid,
106 new_mst_root,
107 ops,
108 blocks_cids,
109 blobs,
110 } = params;
111 let key_row = sqlx::query!(
112 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
113 user_id
114 )
115 .fetch_one(&state.db)
116 .await
117 .map_err(|e| format!("Failed to fetch signing key: {}", e))?;
118 let key_bytes = crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version)
119 .map_err(|e| format!("Failed to decrypt signing key: {}", e))?;
120 let signing_key =
121 SigningKey::from_slice(&key_bytes).map_err(|e| format!("Invalid signing key: {}", e))?;
122 let rev = Tid::now(LimitedU32::MIN);
123 let rev_str = rev.to_string();
124 let (new_commit_bytes, _sig) =
125 create_signed_commit(did, new_mst_root, &rev_str, current_root_cid, &signing_key)?;
126 let new_root_cid = state
127 .block_store
128 .put(&new_commit_bytes)
129 .await
130 .map_err(|e| format!("Failed to save commit block: {:?}", e))?;
131 let mut tx = state
132 .db
133 .begin()
134 .await
135 .map_err(|e| format!("Failed to begin transaction: {}", e))?;
136 let lock_result = sqlx::query!(
137 "SELECT repo_root_cid FROM repos WHERE user_id = $1 FOR UPDATE NOWAIT",
138 user_id
139 )
140 .fetch_optional(&mut *tx)
141 .await;
142 match lock_result {
143 Err(e) => {
144 if let Some(db_err) = e.as_database_error()
145 && db_err.code().as_deref() == Some("55P03")
146 {
147 return Err(
148 "ConcurrentModification: Another request is modifying this repo".to_string(),
149 );
150 }
151 return Err(format!("Failed to acquire repo lock: {}", e));
152 }
153 Ok(Some(row)) => {
154 if let Some(expected_root) = ¤t_root_cid
155 && row.repo_root_cid != expected_root.to_string()
156 {
157 return Err(
158 "ConcurrentModification: Repo has been modified since last read".to_string(),
159 );
160 }
161 }
162 Ok(None) => {
163 return Err("Repo not found".to_string());
164 }
165 }
166 let is_account_active = sqlx::query_scalar!(
167 "SELECT deactivated_at IS NULL FROM users WHERE id = $1",
168 user_id
169 )
170 .fetch_optional(&mut *tx)
171 .await
172 .map_err(|e| format!("Failed to check account status: {}", e))?
173 .flatten()
174 .unwrap_or(false);
175 sqlx::query!(
176 "UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2",
177 new_root_cid.to_string(),
178 user_id
179 )
180 .execute(&mut *tx)
181 .await
182 .map_err(|e| format!("DB Error (repos): {}", e))?;
183 let mut upsert_collections: Vec<String> = Vec::new();
184 let mut upsert_rkeys: Vec<String> = Vec::new();
185 let mut upsert_cids: Vec<String> = Vec::new();
186 let mut delete_collections: Vec<String> = Vec::new();
187 let mut delete_rkeys: Vec<String> = Vec::new();
188 for op in &ops {
189 match op {
190 RecordOp::Create {
191 collection,
192 rkey,
193 cid,
194 }
195 | RecordOp::Update {
196 collection,
197 rkey,
198 cid,
199 ..
200 } => {
201 upsert_collections.push(collection.clone());
202 upsert_rkeys.push(rkey.clone());
203 upsert_cids.push(cid.to_string());
204 }
205 RecordOp::Delete {
206 collection, rkey, ..
207 } => {
208 delete_collections.push(collection.clone());
209 delete_rkeys.push(rkey.clone());
210 }
211 }
212 }
213 if !upsert_collections.is_empty() {
214 sqlx::query!(
215 r#"
216 INSERT INTO records (repo_id, collection, rkey, record_cid, repo_rev)
217 SELECT $1, collection, rkey, record_cid, $5
218 FROM UNNEST($2::text[], $3::text[], $4::text[]) AS t(collection, rkey, record_cid)
219 ON CONFLICT (repo_id, collection, rkey) DO UPDATE
220 SET record_cid = EXCLUDED.record_cid, repo_rev = EXCLUDED.repo_rev, created_at = NOW()
221 "#,
222 user_id,
223 &upsert_collections,
224 &upsert_rkeys,
225 &upsert_cids,
226 rev_str
227 )
228 .execute(&mut *tx)
229 .await
230 .map_err(|e| format!("DB Error (records batch upsert): {}", e))?;
231 }
232 if !delete_collections.is_empty() {
233 sqlx::query!(
234 r#"
235 DELETE FROM records
236 WHERE repo_id = $1
237 AND (collection, rkey) IN (SELECT * FROM UNNEST($2::text[], $3::text[]))
238 "#,
239 user_id,
240 &delete_collections,
241 &delete_rkeys
242 )
243 .execute(&mut *tx)
244 .await
245 .map_err(|e| format!("DB Error (records batch delete): {}", e))?;
246 }
247 let ops_json = ops
248 .iter()
249 .map(|op| match op {
250 RecordOp::Create {
251 collection,
252 rkey,
253 cid,
254 } => json!({
255 "action": "create",
256 "path": format!("{}/{}", collection, rkey),
257 "cid": cid.to_string()
258 }),
259 RecordOp::Update {
260 collection,
261 rkey,
262 cid,
263 prev,
264 } => {
265 let mut obj = json!({
266 "action": "update",
267 "path": format!("{}/{}", collection, rkey),
268 "cid": cid.to_string()
269 });
270 if let Some(prev_cid) = prev {
271 obj["prev"] = json!(prev_cid.to_string());
272 }
273 obj
274 }
275 RecordOp::Delete {
276 collection,
277 rkey,
278 prev,
279 } => {
280 let mut obj = json!({
281 "action": "delete",
282 "path": format!("{}/{}", collection, rkey),
283 "cid": null
284 });
285 if let Some(prev_cid) = prev {
286 obj["prev"] = json!(prev_cid.to_string());
287 }
288 obj
289 }
290 })
291 .collect::<Vec<_>>();
292 if is_account_active {
293 let event_type = "commit";
294 let prev_cid_str = current_root_cid.map(|c| c.to_string());
295 let prev_data_cid_str = prev_data_cid.map(|c| c.to_string());
296 let seq_row = sqlx::query!(
297 r#"
298 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids, prev_data_cid)
299 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
300 RETURNING seq
301 "#,
302 did,
303 event_type,
304 new_root_cid.to_string(),
305 prev_cid_str,
306 json!(ops_json),
307 blobs,
308 blocks_cids,
309 prev_data_cid_str,
310 )
311 .fetch_one(&mut *tx)
312 .await
313 .map_err(|e| format!("DB Error (repo_seq): {}", e))?;
314 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
315 .execute(&mut *tx)
316 .await
317 .map_err(|e| format!("DB Error (notify): {}", e))?;
318 }
319 tx.commit()
320 .await
321 .map_err(|e| format!("Failed to commit transaction: {}", e))?;
322 if is_account_active {
323 let _ = sequence_sync_event(state, did, &new_root_cid.to_string(), Some(&rev_str)).await;
324 }
325 Ok(CommitResult {
326 commit_cid: new_root_cid,
327 rev: rev_str,
328 })
329}
330pub async fn create_record_internal(
331 state: &AppState,
332 did: &str,
333 collection: &str,
334 rkey: &str,
335 record: &serde_json::Value,
336) -> Result<(String, Cid), String> {
337 use crate::repo::tracking::TrackingBlockStore;
338 use jacquard_repo::mst::Mst;
339 use std::sync::Arc;
340 let user_id: Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
341 .fetch_optional(&state.db)
342 .await
343 .map_err(|e| format!("DB error: {}", e))?
344 .ok_or_else(|| "User not found".to_string())?;
345 let root_cid_str: String = sqlx::query_scalar!(
346 "SELECT repo_root_cid FROM repos WHERE user_id = $1",
347 user_id
348 )
349 .fetch_optional(&state.db)
350 .await
351 .map_err(|e| format!("DB error: {}", e))?
352 .ok_or_else(|| "Repo not found".to_string())?;
353 let current_root_cid =
354 Cid::from_str(&root_cid_str).map_err(|_| "Invalid repo root CID".to_string())?;
355 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
356 let commit_bytes = tracking_store
357 .get(¤t_root_cid)
358 .await
359 .map_err(|e| format!("Failed to fetch commit: {:?}", e))?
360 .ok_or_else(|| "Commit block not found".to_string())?;
361 let commit = jacquard_repo::commit::Commit::from_cbor(&commit_bytes)
362 .map_err(|e| format!("Failed to parse commit: {:?}", e))?;
363 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
364 let mut record_bytes = Vec::new();
365 serde_ipld_dagcbor::to_writer(&mut record_bytes, record)
366 .map_err(|e| format!("Failed to serialize record: {:?}", e))?;
367 let record_cid = tracking_store
368 .put(&record_bytes)
369 .await
370 .map_err(|e| format!("Failed to save record block: {:?}", e))?;
371 let key = format!("{}/{}", collection, rkey);
372 let new_mst = mst
373 .add(&key, record_cid)
374 .await
375 .map_err(|e| format!("Failed to add to MST: {:?}", e))?;
376 let new_mst_root = new_mst
377 .persist()
378 .await
379 .map_err(|e| format!("Failed to persist MST: {:?}", e))?;
380 let op = RecordOp::Create {
381 collection: collection.to_string(),
382 rkey: rkey.to_string(),
383 cid: record_cid,
384 };
385 let mut relevant_blocks = std::collections::BTreeMap::new();
386 new_mst
387 .blocks_for_path(&key, &mut relevant_blocks)
388 .await
389 .map_err(|e| format!("Failed to get new MST blocks for path: {:?}", e))?;
390 mst.blocks_for_path(&key, &mut relevant_blocks)
391 .await
392 .map_err(|e| format!("Failed to get old MST blocks for path: {:?}", e))?;
393 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
394 let mut written_cids = tracking_store.get_all_relevant_cids();
395 for cid in relevant_blocks.keys() {
396 if !written_cids.contains(cid) {
397 written_cids.push(*cid);
398 }
399 }
400 let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect();
401 let blob_cids = extract_blob_cids(record);
402 let result = commit_and_log(
403 state,
404 CommitParams {
405 did,
406 user_id,
407 current_root_cid: Some(current_root_cid),
408 prev_data_cid: Some(commit.data),
409 new_mst_root,
410 ops: vec![op],
411 blocks_cids: &written_cids_str,
412 blobs: &blob_cids,
413 },
414 )
415 .await?;
416 let uri = format!("at://{}/{}/{}", did, collection, rkey);
417 Ok((uri, result.commit_cid))
418}
419
420pub async fn sequence_identity_event(
421 state: &AppState,
422 did: &str,
423 handle: Option<&str>,
424) -> Result<i64, String> {
425 let seq_row = sqlx::query!(
426 r#"
427 INSERT INTO repo_seq (did, event_type, handle)
428 VALUES ($1, 'identity', $2)
429 RETURNING seq
430 "#,
431 did,
432 handle,
433 )
434 .fetch_one(&state.db)
435 .await
436 .map_err(|e| format!("DB Error (repo_seq identity): {}", e))?;
437 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
438 .execute(&state.db)
439 .await
440 .map_err(|e| format!("DB Error (notify): {}", e))?;
441 Ok(seq_row.seq)
442}
443pub async fn sequence_account_event(
444 state: &AppState,
445 did: &str,
446 active: bool,
447 status: Option<&str>,
448) -> Result<i64, String> {
449 let seq_row = sqlx::query!(
450 r#"
451 INSERT INTO repo_seq (did, event_type, active, status)
452 VALUES ($1, 'account', $2, $3)
453 RETURNING seq
454 "#,
455 did,
456 active,
457 status,
458 )
459 .fetch_one(&state.db)
460 .await
461 .map_err(|e| format!("DB Error (repo_seq account): {}", e))?;
462 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
463 .execute(&state.db)
464 .await
465 .map_err(|e| format!("DB Error (notify): {}", e))?;
466 Ok(seq_row.seq)
467}
468pub async fn sequence_sync_event(
469 state: &AppState,
470 did: &str,
471 commit_cid: &str,
472 rev: Option<&str>,
473) -> Result<i64, String> {
474 let seq_row = sqlx::query!(
475 r#"
476 INSERT INTO repo_seq (did, event_type, commit_cid, rev)
477 VALUES ($1, 'sync', $2, $3)
478 RETURNING seq
479 "#,
480 did,
481 commit_cid,
482 rev,
483 )
484 .fetch_one(&state.db)
485 .await
486 .map_err(|e| format!("DB Error (repo_seq sync): {}", e))?;
487 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
488 .execute(&state.db)
489 .await
490 .map_err(|e| format!("DB Error (notify): {}", e))?;
491 Ok(seq_row.seq)
492}
493
494pub async fn sequence_empty_commit_event(state: &AppState, did: &str) -> Result<i64, String> {
495 let repo_root = sqlx::query_scalar!(
496 "SELECT r.repo_root_cid FROM repos r JOIN users u ON r.user_id = u.id WHERE u.did = $1",
497 did
498 )
499 .fetch_optional(&state.db)
500 .await
501 .map_err(|e| format!("DB Error fetching repo root: {}", e))?
502 .ok_or_else(|| "Repo not found".to_string())?;
503 let ops = serde_json::json!([]);
504 let blobs: Vec<String> = vec![];
505 let blocks_cids: Vec<String> = vec![];
506 let seq_row = sqlx::query!(
507 r#"
508 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids)
509 VALUES ($1, 'commit', $2, $2, $3, $4, $5)
510 RETURNING seq
511 "#,
512 did,
513 repo_root,
514 ops,
515 &blobs,
516 &blocks_cids
517 )
518 .fetch_one(&state.db)
519 .await
520 .map_err(|e| format!("DB Error (repo_seq empty commit): {}", e))?;
521 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
522 .execute(&state.db)
523 .await
524 .map_err(|e| format!("DB Error (notify): {}", e))?;
525 Ok(seq_row.seq)
526}