this repo has no description
1use crate::state::AppState;
2use bytes::Bytes;
3use cid::Cid;
4use jacquard::types::{integer::LimitedU32, string::Tid};
5use jacquard_repo::commit::Commit;
6use jacquard_repo::storage::BlockStore;
7use k256::ecdsa::SigningKey;
8use serde_json::{Value, json};
9use std::str::FromStr;
10use uuid::Uuid;
11
12pub fn extract_blob_cids(record: &Value) -> Vec<String> {
13 let mut blobs = Vec::new();
14 extract_blob_cids_recursive(record, &mut blobs);
15 blobs
16}
17
18fn extract_blob_cids_recursive(value: &Value, blobs: &mut Vec<String>) {
19 match value {
20 Value::Object(map) => {
21 if map.get("$type").and_then(|v| v.as_str()) == Some("blob")
22 && let Some(ref_obj) = map.get("ref")
23 && let Some(link) = ref_obj.get("$link").and_then(|v| v.as_str())
24 {
25 blobs.push(link.to_string());
26 }
27 for v in map.values() {
28 extract_blob_cids_recursive(v, blobs);
29 }
30 }
31 Value::Array(arr) => {
32 for v in arr {
33 extract_blob_cids_recursive(v, blobs);
34 }
35 }
36 _ => {}
37 }
38}
39
40pub fn create_signed_commit(
41 did: &str,
42 data: Cid,
43 rev: &str,
44 prev: Option<Cid>,
45 signing_key: &SigningKey,
46) -> Result<(Vec<u8>, Bytes), String> {
47 let did =
48 jacquard::types::string::Did::new(did).map_err(|e| format!("Invalid DID: {:?}", e))?;
49 let rev =
50 jacquard::types::string::Tid::from_str(rev).map_err(|e| format!("Invalid TID: {:?}", e))?;
51 let unsigned = Commit::new_unsigned(did, data, rev, prev);
52 let signed = unsigned
53 .sign(signing_key)
54 .map_err(|e| format!("Failed to sign commit: {:?}", e))?;
55 let sig_bytes = signed.sig().clone();
56 let signed_bytes = signed
57 .to_cbor()
58 .map_err(|e| format!("Failed to serialize signed commit: {:?}", e))?;
59 Ok((signed_bytes, sig_bytes))
60}
61
62pub enum RecordOp {
63 Create {
64 collection: String,
65 rkey: String,
66 cid: Cid,
67 },
68 Update {
69 collection: String,
70 rkey: String,
71 cid: Cid,
72 prev: Option<Cid>,
73 },
74 Delete {
75 collection: String,
76 rkey: String,
77 prev: Option<Cid>,
78 },
79}
80
81pub struct CommitResult {
82 pub commit_cid: Cid,
83 pub rev: String,
84}
85
86pub struct CommitParams<'a> {
87 pub did: &'a str,
88 pub user_id: Uuid,
89 pub current_root_cid: Option<Cid>,
90 pub prev_data_cid: Option<Cid>,
91 pub new_mst_root: Cid,
92 pub ops: Vec<RecordOp>,
93 pub blocks_cids: &'a [String],
94 pub blobs: &'a [String],
95}
96
97pub async fn commit_and_log(
98 state: &AppState,
99 params: CommitParams<'_>,
100) -> Result<CommitResult, String> {
101 let CommitParams {
102 did,
103 user_id,
104 current_root_cid,
105 prev_data_cid,
106 new_mst_root,
107 ops,
108 blocks_cids,
109 blobs,
110 } = params;
111 let key_row = sqlx::query!(
112 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
113 user_id
114 )
115 .fetch_one(&state.db)
116 .await
117 .map_err(|e| format!("Failed to fetch signing key: {}", e))?;
118 let key_bytes = crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version)
119 .map_err(|e| format!("Failed to decrypt signing key: {}", e))?;
120 let signing_key =
121 SigningKey::from_slice(&key_bytes).map_err(|e| format!("Invalid signing key: {}", e))?;
122 let rev = Tid::now(LimitedU32::MIN);
123 let rev_str = rev.to_string();
124 let (new_commit_bytes, _sig) =
125 create_signed_commit(did, new_mst_root, &rev_str, current_root_cid, &signing_key)?;
126 let new_root_cid = state
127 .block_store
128 .put(&new_commit_bytes)
129 .await
130 .map_err(|e| format!("Failed to save commit block: {:?}", e))?;
131 let mut tx = state
132 .db
133 .begin()
134 .await
135 .map_err(|e| format!("Failed to begin transaction: {}", e))?;
136 let lock_result = sqlx::query!(
137 "SELECT repo_root_cid FROM repos WHERE user_id = $1 FOR UPDATE NOWAIT",
138 user_id
139 )
140 .fetch_optional(&mut *tx)
141 .await;
142 match lock_result {
143 Err(e) => {
144 if let Some(db_err) = e.as_database_error()
145 && db_err.code().as_deref() == Some("55P03")
146 {
147 return Err(
148 "ConcurrentModification: Another request is modifying this repo".to_string(),
149 );
150 }
151 return Err(format!("Failed to acquire repo lock: {}", e));
152 }
153 Ok(Some(row)) => {
154 if let Some(expected_root) = ¤t_root_cid
155 && row.repo_root_cid != expected_root.to_string()
156 {
157 return Err(
158 "ConcurrentModification: Repo has been modified since last read".to_string(),
159 );
160 }
161 }
162 Ok(None) => {
163 return Err("Repo not found".to_string());
164 }
165 }
166 let is_account_active = sqlx::query_scalar!(
167 "SELECT deactivated_at IS NULL FROM users WHERE id = $1",
168 user_id
169 )
170 .fetch_optional(&mut *tx)
171 .await
172 .map_err(|e| format!("Failed to check account status: {}", e))?
173 .flatten()
174 .unwrap_or(false);
175 sqlx::query!(
176 "UPDATE repos SET repo_root_cid = $1, repo_rev = $2 WHERE user_id = $3",
177 new_root_cid.to_string(),
178 &rev_str,
179 user_id
180 )
181 .execute(&mut *tx)
182 .await
183 .map_err(|e| format!("DB Error (repos): {}", e))?;
184 let mut all_block_cids: Vec<Vec<u8>> = blocks_cids
185 .iter()
186 .filter_map(|s| Cid::from_str(s).ok())
187 .map(|c| c.to_bytes())
188 .collect();
189 all_block_cids.push(new_root_cid.to_bytes());
190 if !all_block_cids.is_empty() {
191 sqlx::query!(
192 r#"
193 INSERT INTO user_blocks (user_id, block_cid)
194 SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)
195 ON CONFLICT (user_id, block_cid) DO NOTHING
196 "#,
197 user_id,
198 &all_block_cids
199 )
200 .execute(&mut *tx)
201 .await
202 .map_err(|e| format!("DB Error (user_blocks): {}", e))?;
203 }
204 let mut upsert_collections: Vec<String> = Vec::new();
205 let mut upsert_rkeys: Vec<String> = Vec::new();
206 let mut upsert_cids: Vec<String> = Vec::new();
207 let mut delete_collections: Vec<String> = Vec::new();
208 let mut delete_rkeys: Vec<String> = Vec::new();
209 for op in &ops {
210 match op {
211 RecordOp::Create {
212 collection,
213 rkey,
214 cid,
215 }
216 | RecordOp::Update {
217 collection,
218 rkey,
219 cid,
220 ..
221 } => {
222 upsert_collections.push(collection.clone());
223 upsert_rkeys.push(rkey.clone());
224 upsert_cids.push(cid.to_string());
225 }
226 RecordOp::Delete {
227 collection, rkey, ..
228 } => {
229 delete_collections.push(collection.clone());
230 delete_rkeys.push(rkey.clone());
231 }
232 }
233 }
234 if !upsert_collections.is_empty() {
235 sqlx::query!(
236 r#"
237 INSERT INTO records (repo_id, collection, rkey, record_cid, repo_rev)
238 SELECT $1, collection, rkey, record_cid, $5
239 FROM UNNEST($2::text[], $3::text[], $4::text[]) AS t(collection, rkey, record_cid)
240 ON CONFLICT (repo_id, collection, rkey) DO UPDATE
241 SET record_cid = EXCLUDED.record_cid, repo_rev = EXCLUDED.repo_rev, created_at = NOW()
242 "#,
243 user_id,
244 &upsert_collections,
245 &upsert_rkeys,
246 &upsert_cids,
247 rev_str
248 )
249 .execute(&mut *tx)
250 .await
251 .map_err(|e| format!("DB Error (records batch upsert): {}", e))?;
252 }
253 if !delete_collections.is_empty() {
254 sqlx::query!(
255 r#"
256 DELETE FROM records
257 WHERE repo_id = $1
258 AND (collection, rkey) IN (SELECT * FROM UNNEST($2::text[], $3::text[]))
259 "#,
260 user_id,
261 &delete_collections,
262 &delete_rkeys
263 )
264 .execute(&mut *tx)
265 .await
266 .map_err(|e| format!("DB Error (records batch delete): {}", e))?;
267 }
268 let ops_json = ops
269 .iter()
270 .map(|op| match op {
271 RecordOp::Create {
272 collection,
273 rkey,
274 cid,
275 } => json!({
276 "action": "create",
277 "path": format!("{}/{}", collection, rkey),
278 "cid": cid.to_string()
279 }),
280 RecordOp::Update {
281 collection,
282 rkey,
283 cid,
284 prev,
285 } => {
286 let mut obj = json!({
287 "action": "update",
288 "path": format!("{}/{}", collection, rkey),
289 "cid": cid.to_string()
290 });
291 if let Some(prev_cid) = prev {
292 obj["prev"] = json!(prev_cid.to_string());
293 }
294 obj
295 }
296 RecordOp::Delete {
297 collection,
298 rkey,
299 prev,
300 } => {
301 let mut obj = json!({
302 "action": "delete",
303 "path": format!("{}/{}", collection, rkey),
304 "cid": null
305 });
306 if let Some(prev_cid) = prev {
307 obj["prev"] = json!(prev_cid.to_string());
308 }
309 obj
310 }
311 })
312 .collect::<Vec<_>>();
313 if is_account_active {
314 let event_type = "commit";
315 let prev_cid_str = current_root_cid.map(|c| c.to_string());
316 let prev_data_cid_str = prev_data_cid.map(|c| c.to_string());
317 let seq_row = sqlx::query!(
318 r#"
319 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids, prev_data_cid)
320 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
321 RETURNING seq
322 "#,
323 did,
324 event_type,
325 new_root_cid.to_string(),
326 prev_cid_str,
327 json!(ops_json),
328 blobs,
329 blocks_cids,
330 prev_data_cid_str,
331 )
332 .fetch_one(&mut *tx)
333 .await
334 .map_err(|e| format!("DB Error (repo_seq): {}", e))?;
335 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
336 .execute(&mut *tx)
337 .await
338 .map_err(|e| format!("DB Error (notify): {}", e))?;
339 }
340 tx.commit()
341 .await
342 .map_err(|e| format!("Failed to commit transaction: {}", e))?;
343 if is_account_active {
344 let _ = sequence_sync_event(state, did, &new_root_cid.to_string(), Some(&rev_str)).await;
345 }
346 Ok(CommitResult {
347 commit_cid: new_root_cid,
348 rev: rev_str,
349 })
350}
351pub async fn create_record_internal(
352 state: &AppState,
353 did: &str,
354 collection: &str,
355 rkey: &str,
356 record: &serde_json::Value,
357) -> Result<(String, Cid), String> {
358 use crate::repo::tracking::TrackingBlockStore;
359 use jacquard_repo::mst::Mst;
360 use std::sync::Arc;
361 let user_id: Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
362 .fetch_optional(&state.db)
363 .await
364 .map_err(|e| format!("DB error: {}", e))?
365 .ok_or_else(|| "User not found".to_string())?;
366 let root_cid_str: String = sqlx::query_scalar!(
367 "SELECT repo_root_cid FROM repos WHERE user_id = $1",
368 user_id
369 )
370 .fetch_optional(&state.db)
371 .await
372 .map_err(|e| format!("DB error: {}", e))?
373 .ok_or_else(|| "Repo not found".to_string())?;
374 let current_root_cid =
375 Cid::from_str(&root_cid_str).map_err(|_| "Invalid repo root CID".to_string())?;
376 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
377 let commit_bytes = tracking_store
378 .get(¤t_root_cid)
379 .await
380 .map_err(|e| format!("Failed to fetch commit: {:?}", e))?
381 .ok_or_else(|| "Commit block not found".to_string())?;
382 let commit = jacquard_repo::commit::Commit::from_cbor(&commit_bytes)
383 .map_err(|e| format!("Failed to parse commit: {:?}", e))?;
384 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
385 let mut record_bytes = Vec::new();
386 serde_ipld_dagcbor::to_writer(&mut record_bytes, record)
387 .map_err(|e| format!("Failed to serialize record: {:?}", e))?;
388 let record_cid = tracking_store
389 .put(&record_bytes)
390 .await
391 .map_err(|e| format!("Failed to save record block: {:?}", e))?;
392 let key = format!("{}/{}", collection, rkey);
393 let new_mst = mst
394 .add(&key, record_cid)
395 .await
396 .map_err(|e| format!("Failed to add to MST: {:?}", e))?;
397 let new_mst_root = new_mst
398 .persist()
399 .await
400 .map_err(|e| format!("Failed to persist MST: {:?}", e))?;
401 let op = RecordOp::Create {
402 collection: collection.to_string(),
403 rkey: rkey.to_string(),
404 cid: record_cid,
405 };
406 let mut relevant_blocks = std::collections::BTreeMap::new();
407 new_mst
408 .blocks_for_path(&key, &mut relevant_blocks)
409 .await
410 .map_err(|e| format!("Failed to get new MST blocks for path: {:?}", e))?;
411 mst.blocks_for_path(&key, &mut relevant_blocks)
412 .await
413 .map_err(|e| format!("Failed to get old MST blocks for path: {:?}", e))?;
414 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
415 let mut written_cids = tracking_store.get_all_relevant_cids();
416 for cid in relevant_blocks.keys() {
417 if !written_cids.contains(cid) {
418 written_cids.push(*cid);
419 }
420 }
421 let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect();
422 let blob_cids = extract_blob_cids(record);
423 let result = commit_and_log(
424 state,
425 CommitParams {
426 did,
427 user_id,
428 current_root_cid: Some(current_root_cid),
429 prev_data_cid: Some(commit.data),
430 new_mst_root,
431 ops: vec![op],
432 blocks_cids: &written_cids_str,
433 blobs: &blob_cids,
434 },
435 )
436 .await?;
437 let uri = format!("at://{}/{}/{}", did, collection, rkey);
438 Ok((uri, result.commit_cid))
439}
440
441pub async fn sequence_identity_event(
442 state: &AppState,
443 did: &str,
444 handle: Option<&str>,
445) -> Result<i64, String> {
446 let seq_row = sqlx::query!(
447 r#"
448 INSERT INTO repo_seq (did, event_type, handle)
449 VALUES ($1, 'identity', $2)
450 RETURNING seq
451 "#,
452 did,
453 handle,
454 )
455 .fetch_one(&state.db)
456 .await
457 .map_err(|e| format!("DB Error (repo_seq identity): {}", e))?;
458 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
459 .execute(&state.db)
460 .await
461 .map_err(|e| format!("DB Error (notify): {}", e))?;
462 Ok(seq_row.seq)
463}
464pub async fn sequence_account_event(
465 state: &AppState,
466 did: &str,
467 active: bool,
468 status: Option<&str>,
469) -> Result<i64, String> {
470 let seq_row = sqlx::query!(
471 r#"
472 INSERT INTO repo_seq (did, event_type, active, status)
473 VALUES ($1, 'account', $2, $3)
474 RETURNING seq
475 "#,
476 did,
477 active,
478 status,
479 )
480 .fetch_one(&state.db)
481 .await
482 .map_err(|e| format!("DB Error (repo_seq account): {}", e))?;
483 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
484 .execute(&state.db)
485 .await
486 .map_err(|e| format!("DB Error (notify): {}", e))?;
487 Ok(seq_row.seq)
488}
489pub async fn sequence_sync_event(
490 state: &AppState,
491 did: &str,
492 commit_cid: &str,
493 rev: Option<&str>,
494) -> Result<i64, String> {
495 let seq_row = sqlx::query!(
496 r#"
497 INSERT INTO repo_seq (did, event_type, commit_cid, rev)
498 VALUES ($1, 'sync', $2, $3)
499 RETURNING seq
500 "#,
501 did,
502 commit_cid,
503 rev,
504 )
505 .fetch_one(&state.db)
506 .await
507 .map_err(|e| format!("DB Error (repo_seq sync): {}", e))?;
508 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
509 .execute(&state.db)
510 .await
511 .map_err(|e| format!("DB Error (notify): {}", e))?;
512 Ok(seq_row.seq)
513}
514
515pub async fn sequence_empty_commit_event(state: &AppState, did: &str) -> Result<i64, String> {
516 let repo_info = sqlx::query!(
517 "SELECT r.repo_root_cid, r.repo_rev FROM repos r JOIN users u ON r.user_id = u.id WHERE u.did = $1",
518 did
519 )
520 .fetch_optional(&state.db)
521 .await
522 .map_err(|e| format!("DB Error fetching repo root: {}", e))?
523 .ok_or_else(|| "Repo not found".to_string())?;
524 let ops = serde_json::json!([]);
525 let blobs: Vec<String> = vec![];
526 let blocks_cids: Vec<String> = vec![];
527 let prev_cid: Option<&str> = None;
528 let seq_row = sqlx::query!(
529 r#"
530 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids, rev)
531 VALUES ($1, 'commit', $2, $3::TEXT, $4, $5, $6, $7)
532 RETURNING seq
533 "#,
534 did,
535 repo_info.repo_root_cid,
536 prev_cid,
537 ops,
538 &blobs,
539 &blocks_cids,
540 repo_info.repo_rev
541 )
542 .fetch_one(&state.db)
543 .await
544 .map_err(|e| format!("DB Error (repo_seq empty commit): {}", e))?;
545 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
546 .execute(&state.db)
547 .await
548 .map_err(|e| format!("DB Error (notify): {}", e))?;
549 Ok(seq_row.seq)
550}