this repo has no description
1use crate::state::AppState;
2use bytes::Bytes;
3use cid::Cid;
4use jacquard::types::{integer::LimitedU32, string::Tid};
5use jacquard_repo::commit::Commit;
6use jacquard_repo::storage::BlockStore;
7use k256::ecdsa::SigningKey;
8use serde_json::{json, Value};
9use std::str::FromStr;
10use uuid::Uuid;
11
12pub fn extract_blob_cids(record: &Value) -> Vec<String> {
13 let mut blobs = Vec::new();
14 extract_blob_cids_recursive(record, &mut blobs);
15 blobs
16}
17
18fn extract_blob_cids_recursive(value: &Value, blobs: &mut Vec<String>) {
19 match value {
20 Value::Object(map) => {
21 if map.get("$type").and_then(|v| v.as_str()) == Some("blob") {
22 if let Some(ref_obj) = map.get("ref") {
23 if let Some(link) = ref_obj.get("$link").and_then(|v| v.as_str()) {
24 blobs.push(link.to_string());
25 }
26 }
27 }
28 for v in map.values() {
29 extract_blob_cids_recursive(v, blobs);
30 }
31 }
32 Value::Array(arr) => {
33 for v in arr {
34 extract_blob_cids_recursive(v, blobs);
35 }
36 }
37 _ => {}
38 }
39}
40
41pub fn create_signed_commit(
42 did: &str,
43 data: Cid,
44 rev: &str,
45 prev: Option<Cid>,
46 signing_key: &SigningKey,
47) -> Result<(Vec<u8>, Bytes), String> {
48 let did =
49 jacquard::types::string::Did::new(did).map_err(|e| format!("Invalid DID: {:?}", e))?;
50 let rev =
51 jacquard::types::string::Tid::from_str(rev).map_err(|e| format!("Invalid TID: {:?}", e))?;
52 let unsigned = Commit::new_unsigned(did, data, rev, prev);
53 let signed = unsigned
54 .sign(signing_key)
55 .map_err(|e| format!("Failed to sign commit: {:?}", e))?;
56 let sig_bytes = signed.sig().clone();
57 let signed_bytes = signed
58 .to_cbor()
59 .map_err(|e| format!("Failed to serialize signed commit: {:?}", e))?;
60 Ok((signed_bytes, sig_bytes))
61}
62
63pub enum RecordOp {
64 Create {
65 collection: String,
66 rkey: String,
67 cid: Cid,
68 },
69 Update {
70 collection: String,
71 rkey: String,
72 cid: Cid,
73 prev: Option<Cid>,
74 },
75 Delete {
76 collection: String,
77 rkey: String,
78 prev: Option<Cid>,
79 },
80}
81
82pub struct CommitResult {
83 pub commit_cid: Cid,
84 pub rev: String,
85}
86
87pub struct CommitParams<'a> {
88 pub did: &'a str,
89 pub user_id: Uuid,
90 pub current_root_cid: Option<Cid>,
91 pub prev_data_cid: Option<Cid>,
92 pub new_mst_root: Cid,
93 pub ops: Vec<RecordOp>,
94 pub blocks_cids: &'a [String],
95 pub blobs: &'a [String],
96}
97
98pub async fn commit_and_log(
99 state: &AppState,
100 params: CommitParams<'_>,
101) -> Result<CommitResult, String> {
102 let CommitParams {
103 did,
104 user_id,
105 current_root_cid,
106 prev_data_cid,
107 new_mst_root,
108 ops,
109 blocks_cids,
110 blobs,
111 } = params;
112 let key_row = sqlx::query!(
113 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1",
114 user_id
115 )
116 .fetch_one(&state.db)
117 .await
118 .map_err(|e| format!("Failed to fetch signing key: {}", e))?;
119 let key_bytes = crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version)
120 .map_err(|e| format!("Failed to decrypt signing key: {}", e))?;
121 let signing_key =
122 SigningKey::from_slice(&key_bytes).map_err(|e| format!("Invalid signing key: {}", e))?;
123 let rev = Tid::now(LimitedU32::MIN);
124 let rev_str = rev.to_string();
125 let (new_commit_bytes, _sig) =
126 create_signed_commit(did, new_mst_root, &rev_str, current_root_cid, &signing_key)?;
127 let new_root_cid = state
128 .block_store
129 .put(&new_commit_bytes)
130 .await
131 .map_err(|e| format!("Failed to save commit block: {:?}", e))?;
132 let mut tx = state
133 .db
134 .begin()
135 .await
136 .map_err(|e| format!("Failed to begin transaction: {}", e))?;
137 let lock_result = sqlx::query!(
138 "SELECT repo_root_cid FROM repos WHERE user_id = $1 FOR UPDATE NOWAIT",
139 user_id
140 )
141 .fetch_optional(&mut *tx)
142 .await;
143 match lock_result {
144 Err(e) => {
145 if let Some(db_err) = e.as_database_error()
146 && db_err.code().as_deref() == Some("55P03")
147 {
148 return Err(
149 "ConcurrentModification: Another request is modifying this repo".to_string(),
150 );
151 }
152 return Err(format!("Failed to acquire repo lock: {}", e));
153 }
154 Ok(Some(row)) => {
155 if let Some(expected_root) = ¤t_root_cid
156 && row.repo_root_cid != expected_root.to_string()
157 {
158 return Err(
159 "ConcurrentModification: Repo has been modified since last read".to_string(),
160 );
161 }
162 }
163 Ok(None) => {
164 return Err("Repo not found".to_string());
165 }
166 }
167 let is_account_active = sqlx::query_scalar!(
168 "SELECT deactivated_at IS NULL FROM users WHERE id = $1",
169 user_id
170 )
171 .fetch_optional(&mut *tx)
172 .await
173 .map_err(|e| format!("Failed to check account status: {}", e))?
174 .flatten()
175 .unwrap_or(false);
176 sqlx::query!(
177 "UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2",
178 new_root_cid.to_string(),
179 user_id
180 )
181 .execute(&mut *tx)
182 .await
183 .map_err(|e| format!("DB Error (repos): {}", e))?;
184 let mut upsert_collections: Vec<String> = Vec::new();
185 let mut upsert_rkeys: Vec<String> = Vec::new();
186 let mut upsert_cids: Vec<String> = Vec::new();
187 let mut delete_collections: Vec<String> = Vec::new();
188 let mut delete_rkeys: Vec<String> = Vec::new();
189 for op in &ops {
190 match op {
191 RecordOp::Create {
192 collection,
193 rkey,
194 cid,
195 }
196 | RecordOp::Update {
197 collection,
198 rkey,
199 cid,
200 ..
201 } => {
202 upsert_collections.push(collection.clone());
203 upsert_rkeys.push(rkey.clone());
204 upsert_cids.push(cid.to_string());
205 }
206 RecordOp::Delete {
207 collection, rkey, ..
208 } => {
209 delete_collections.push(collection.clone());
210 delete_rkeys.push(rkey.clone());
211 }
212 }
213 }
214 if !upsert_collections.is_empty() {
215 sqlx::query!(
216 r#"
217 INSERT INTO records (repo_id, collection, rkey, record_cid, repo_rev)
218 SELECT $1, collection, rkey, record_cid, $5
219 FROM UNNEST($2::text[], $3::text[], $4::text[]) AS t(collection, rkey, record_cid)
220 ON CONFLICT (repo_id, collection, rkey) DO UPDATE
221 SET record_cid = EXCLUDED.record_cid, repo_rev = EXCLUDED.repo_rev, created_at = NOW()
222 "#,
223 user_id,
224 &upsert_collections,
225 &upsert_rkeys,
226 &upsert_cids,
227 rev_str
228 )
229 .execute(&mut *tx)
230 .await
231 .map_err(|e| format!("DB Error (records batch upsert): {}", e))?;
232 }
233 if !delete_collections.is_empty() {
234 sqlx::query!(
235 r#"
236 DELETE FROM records
237 WHERE repo_id = $1
238 AND (collection, rkey) IN (SELECT * FROM UNNEST($2::text[], $3::text[]))
239 "#,
240 user_id,
241 &delete_collections,
242 &delete_rkeys
243 )
244 .execute(&mut *tx)
245 .await
246 .map_err(|e| format!("DB Error (records batch delete): {}", e))?;
247 }
248 let ops_json = ops
249 .iter()
250 .map(|op| match op {
251 RecordOp::Create {
252 collection,
253 rkey,
254 cid,
255 } => json!({
256 "action": "create",
257 "path": format!("{}/{}", collection, rkey),
258 "cid": cid.to_string()
259 }),
260 RecordOp::Update {
261 collection,
262 rkey,
263 cid,
264 prev,
265 } => {
266 let mut obj = json!({
267 "action": "update",
268 "path": format!("{}/{}", collection, rkey),
269 "cid": cid.to_string()
270 });
271 if let Some(prev_cid) = prev {
272 obj["prev"] = json!(prev_cid.to_string());
273 }
274 obj
275 }
276 RecordOp::Delete {
277 collection,
278 rkey,
279 prev,
280 } => {
281 let mut obj = json!({
282 "action": "delete",
283 "path": format!("{}/{}", collection, rkey),
284 "cid": null
285 });
286 if let Some(prev_cid) = prev {
287 obj["prev"] = json!(prev_cid.to_string());
288 }
289 obj
290 }
291 })
292 .collect::<Vec<_>>();
293 if is_account_active {
294 let event_type = "commit";
295 let prev_cid_str = current_root_cid.map(|c| c.to_string());
296 let prev_data_cid_str = prev_data_cid.map(|c| c.to_string());
297 let seq_row = sqlx::query!(
298 r#"
299 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids, prev_data_cid)
300 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
301 RETURNING seq
302 "#,
303 did,
304 event_type,
305 new_root_cid.to_string(),
306 prev_cid_str,
307 json!(ops_json),
308 blobs,
309 blocks_cids,
310 prev_data_cid_str,
311 )
312 .fetch_one(&mut *tx)
313 .await
314 .map_err(|e| format!("DB Error (repo_seq): {}", e))?;
315 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
316 .execute(&mut *tx)
317 .await
318 .map_err(|e| format!("DB Error (notify): {}", e))?;
319 }
320 tx.commit()
321 .await
322 .map_err(|e| format!("Failed to commit transaction: {}", e))?;
323 if is_account_active {
324 let _ = sequence_sync_event(state, did, &new_root_cid.to_string(), Some(&rev_str)).await;
325 }
326 Ok(CommitResult {
327 commit_cid: new_root_cid,
328 rev: rev_str,
329 })
330}
331pub async fn create_record_internal(
332 state: &AppState,
333 did: &str,
334 collection: &str,
335 rkey: &str,
336 record: &serde_json::Value,
337) -> Result<(String, Cid), String> {
338 use crate::repo::tracking::TrackingBlockStore;
339 use jacquard_repo::mst::Mst;
340 use std::sync::Arc;
341 let user_id: Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
342 .fetch_optional(&state.db)
343 .await
344 .map_err(|e| format!("DB error: {}", e))?
345 .ok_or_else(|| "User not found".to_string())?;
346 let root_cid_str: String = sqlx::query_scalar!(
347 "SELECT repo_root_cid FROM repos WHERE user_id = $1",
348 user_id
349 )
350 .fetch_optional(&state.db)
351 .await
352 .map_err(|e| format!("DB error: {}", e))?
353 .ok_or_else(|| "Repo not found".to_string())?;
354 let current_root_cid =
355 Cid::from_str(&root_cid_str).map_err(|_| "Invalid repo root CID".to_string())?;
356 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
357 let commit_bytes = tracking_store
358 .get(¤t_root_cid)
359 .await
360 .map_err(|e| format!("Failed to fetch commit: {:?}", e))?
361 .ok_or_else(|| "Commit block not found".to_string())?;
362 let commit = jacquard_repo::commit::Commit::from_cbor(&commit_bytes)
363 .map_err(|e| format!("Failed to parse commit: {:?}", e))?;
364 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
365 let mut record_bytes = Vec::new();
366 serde_ipld_dagcbor::to_writer(&mut record_bytes, record)
367 .map_err(|e| format!("Failed to serialize record: {:?}", e))?;
368 let record_cid = tracking_store
369 .put(&record_bytes)
370 .await
371 .map_err(|e| format!("Failed to save record block: {:?}", e))?;
372 let key = format!("{}/{}", collection, rkey);
373 let new_mst = mst
374 .add(&key, record_cid)
375 .await
376 .map_err(|e| format!("Failed to add to MST: {:?}", e))?;
377 let new_mst_root = new_mst
378 .persist()
379 .await
380 .map_err(|e| format!("Failed to persist MST: {:?}", e))?;
381 let op = RecordOp::Create {
382 collection: collection.to_string(),
383 rkey: rkey.to_string(),
384 cid: record_cid,
385 };
386 let mut relevant_blocks = std::collections::BTreeMap::new();
387 new_mst
388 .blocks_for_path(&key, &mut relevant_blocks)
389 .await
390 .map_err(|e| format!("Failed to get new MST blocks for path: {:?}", e))?;
391 mst.blocks_for_path(&key, &mut relevant_blocks)
392 .await
393 .map_err(|e| format!("Failed to get old MST blocks for path: {:?}", e))?;
394 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
395 let mut written_cids = tracking_store.get_all_relevant_cids();
396 for cid in relevant_blocks.keys() {
397 if !written_cids.contains(cid) {
398 written_cids.push(*cid);
399 }
400 }
401 let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect();
402 let blob_cids = extract_blob_cids(record);
403 let result = commit_and_log(
404 state,
405 CommitParams {
406 did,
407 user_id,
408 current_root_cid: Some(current_root_cid),
409 prev_data_cid: Some(commit.data),
410 new_mst_root,
411 ops: vec![op],
412 blocks_cids: &written_cids_str,
413 blobs: &blob_cids,
414 },
415 )
416 .await?;
417 let uri = format!("at://{}/{}/{}", did, collection, rkey);
418 Ok((uri, result.commit_cid))
419}
420
421pub async fn sequence_identity_event(
422 state: &AppState,
423 did: &str,
424 handle: Option<&str>,
425) -> Result<i64, String> {
426 let seq_row = sqlx::query!(
427 r#"
428 INSERT INTO repo_seq (did, event_type, handle)
429 VALUES ($1, 'identity', $2)
430 RETURNING seq
431 "#,
432 did,
433 handle,
434 )
435 .fetch_one(&state.db)
436 .await
437 .map_err(|e| format!("DB Error (repo_seq identity): {}", e))?;
438 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
439 .execute(&state.db)
440 .await
441 .map_err(|e| format!("DB Error (notify): {}", e))?;
442 Ok(seq_row.seq)
443}
444pub async fn sequence_account_event(
445 state: &AppState,
446 did: &str,
447 active: bool,
448 status: Option<&str>,
449) -> Result<i64, String> {
450 let seq_row = sqlx::query!(
451 r#"
452 INSERT INTO repo_seq (did, event_type, active, status)
453 VALUES ($1, 'account', $2, $3)
454 RETURNING seq
455 "#,
456 did,
457 active,
458 status,
459 )
460 .fetch_one(&state.db)
461 .await
462 .map_err(|e| format!("DB Error (repo_seq account): {}", e))?;
463 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
464 .execute(&state.db)
465 .await
466 .map_err(|e| format!("DB Error (notify): {}", e))?;
467 Ok(seq_row.seq)
468}
469pub async fn sequence_sync_event(
470 state: &AppState,
471 did: &str,
472 commit_cid: &str,
473 rev: Option<&str>,
474) -> Result<i64, String> {
475 let seq_row = sqlx::query!(
476 r#"
477 INSERT INTO repo_seq (did, event_type, commit_cid, rev)
478 VALUES ($1, 'sync', $2, $3)
479 RETURNING seq
480 "#,
481 did,
482 commit_cid,
483 rev,
484 )
485 .fetch_one(&state.db)
486 .await
487 .map_err(|e| format!("DB Error (repo_seq sync): {}", e))?;
488 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
489 .execute(&state.db)
490 .await
491 .map_err(|e| format!("DB Error (notify): {}", e))?;
492 Ok(seq_row.seq)
493}
494
495pub async fn sequence_empty_commit_event(state: &AppState, did: &str) -> Result<i64, String> {
496 let repo_root = sqlx::query_scalar!(
497 "SELECT r.repo_root_cid FROM repos r JOIN users u ON r.user_id = u.id WHERE u.did = $1",
498 did
499 )
500 .fetch_optional(&state.db)
501 .await
502 .map_err(|e| format!("DB Error fetching repo root: {}", e))?
503 .ok_or_else(|| "Repo not found".to_string())?;
504 let ops = serde_json::json!([]);
505 let blobs: Vec<String> = vec![];
506 let blocks_cids: Vec<String> = vec![];
507 let seq_row = sqlx::query!(
508 r#"
509 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids)
510 VALUES ($1, 'commit', $2, $2, $3, $4, $5)
511 RETURNING seq
512 "#,
513 did,
514 repo_root,
515 ops,
516 &blobs,
517 &blocks_cids
518 )
519 .fetch_one(&state.db)
520 .await
521 .map_err(|e| format!("DB Error (repo_seq empty commit): {}", e))?;
522 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
523 .execute(&state.db)
524 .await
525 .map_err(|e| format!("DB Error (notify): {}", e))?;
526 Ok(seq_row.seq)
527}