this repo has no description
1use crate::state::AppState; 2use crate::types::{Did, Handle, Nsid, Rkey}; 3use bytes::Bytes; 4use cid::Cid; 5use jacquard::types::{integer::LimitedU32, string::Tid}; 6use jacquard_repo::commit::Commit; 7use jacquard_repo::storage::BlockStore; 8use k256::ecdsa::SigningKey; 9use serde_json::{Value, json}; 10use std::str::FromStr; 11use uuid::Uuid; 12 13pub fn extract_blob_cids(record: &Value) -> Vec<String> { 14 let mut blobs = Vec::new(); 15 extract_blob_cids_recursive(record, &mut blobs); 16 blobs 17} 18 19fn extract_blob_cids_recursive(value: &Value, blobs: &mut Vec<String>) { 20 match value { 21 Value::Object(map) => { 22 if map.get("$type").and_then(|v| v.as_str()) == Some("blob") 23 && let Some(ref_obj) = map.get("ref") 24 && let Some(link) = ref_obj.get("$link").and_then(|v| v.as_str()) 25 { 26 blobs.push(link.to_string()); 27 } 28 for v in map.values() { 29 extract_blob_cids_recursive(v, blobs); 30 } 31 } 32 Value::Array(arr) => { 33 for v in arr { 34 extract_blob_cids_recursive(v, blobs); 35 } 36 } 37 _ => {} 38 } 39} 40 41pub fn create_signed_commit( 42 did: &Did, 43 data: Cid, 44 rev: &str, 45 prev: Option<Cid>, 46 signing_key: &SigningKey, 47) -> Result<(Vec<u8>, Bytes), String> { 48 let did = jacquard::types::string::Did::new(did.as_str()) 49 .map_err(|e| format!("Invalid DID: {:?}", e))?; 50 let rev = 51 jacquard::types::string::Tid::from_str(rev).map_err(|e| format!("Invalid TID: {:?}", e))?; 52 let unsigned = Commit::new_unsigned(did, data, rev, prev); 53 let signed = unsigned 54 .sign(signing_key) 55 .map_err(|e| format!("Failed to sign commit: {:?}", e))?; 56 let sig_bytes = signed.sig().clone(); 57 let signed_bytes = signed 58 .to_cbor() 59 .map_err(|e| format!("Failed to serialize signed commit: {:?}", e))?; 60 Ok((signed_bytes, sig_bytes)) 61} 62 63pub enum RecordOp { 64 Create { 65 collection: Nsid, 66 rkey: Rkey, 67 cid: Cid, 68 }, 69 Update { 70 collection: Nsid, 71 rkey: Rkey, 72 cid: Cid, 73 prev: Option<Cid>, 74 }, 75 Delete { 76 collection: Nsid, 77 rkey: Rkey, 78 prev: Option<Cid>, 79 }, 80} 81 82pub struct CommitResult { 83 pub commit_cid: Cid, 84 pub rev: String, 85} 86 87pub struct CommitParams<'a> { 88 pub did: &'a Did, 89 pub user_id: Uuid, 90 pub current_root_cid: Option<Cid>, 91 pub prev_data_cid: Option<Cid>, 92 pub new_mst_root: Cid, 93 pub ops: Vec<RecordOp>, 94 pub blocks_cids: &'a [String], 95 pub blobs: &'a [String], 96 pub obsolete_cids: Vec<Cid>, 97} 98 99pub async fn commit_and_log( 100 state: &AppState, 101 params: CommitParams<'_>, 102) -> Result<CommitResult, String> { 103 let CommitParams { 104 did, 105 user_id, 106 current_root_cid, 107 prev_data_cid, 108 new_mst_root, 109 ops, 110 blocks_cids, 111 blobs, 112 obsolete_cids, 113 } = params; 114 let key_row = sqlx::query!( 115 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", 116 user_id 117 ) 118 .fetch_one(&state.db) 119 .await 120 .map_err(|e| format!("Failed to fetch signing key: {}", e))?; 121 let key_bytes = crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version) 122 .map_err(|e| format!("Failed to decrypt signing key: {}", e))?; 123 let signing_key = 124 SigningKey::from_slice(&key_bytes).map_err(|e| format!("Invalid signing key: {}", e))?; 125 let rev = Tid::now(LimitedU32::MIN); 126 let rev_str = rev.to_string(); 127 let (new_commit_bytes, _sig) = 128 create_signed_commit(did, new_mst_root, &rev_str, current_root_cid, &signing_key)?; 129 let new_root_cid = state 130 .block_store 131 .put(&new_commit_bytes) 132 .await 133 .map_err(|e| format!("Failed to save commit block: {:?}", e))?; 134 let mut tx = state 135 .db 136 .begin() 137 .await 138 .map_err(|e| format!("Failed to begin transaction: {}", e))?; 139 let lock_result = sqlx::query!( 140 "SELECT repo_root_cid FROM repos WHERE user_id = $1 FOR UPDATE NOWAIT", 141 user_id 142 ) 143 .fetch_optional(&mut *tx) 144 .await; 145 match lock_result { 146 Err(e) => { 147 if let Some(db_err) = e.as_database_error() 148 && db_err.code().as_deref() == Some("55P03") 149 { 150 return Err( 151 "ConcurrentModification: Another request is modifying this repo".to_string(), 152 ); 153 } 154 return Err(format!("Failed to acquire repo lock: {}", e)); 155 } 156 Ok(Some(row)) => { 157 if let Some(expected_root) = &current_root_cid 158 && row.repo_root_cid != expected_root.to_string() 159 { 160 return Err( 161 "ConcurrentModification: Repo has been modified since last read".to_string(), 162 ); 163 } 164 } 165 Ok(None) => { 166 return Err("Repo not found".to_string()); 167 } 168 } 169 let is_account_active = sqlx::query_scalar!( 170 "SELECT deactivated_at IS NULL FROM users WHERE id = $1", 171 user_id 172 ) 173 .fetch_optional(&mut *tx) 174 .await 175 .map_err(|e| format!("Failed to check account status: {}", e))? 176 .flatten() 177 .unwrap_or(false); 178 sqlx::query!( 179 "UPDATE repos SET repo_root_cid = $1, repo_rev = $2 WHERE user_id = $3", 180 new_root_cid.to_string(), 181 &rev_str, 182 user_id 183 ) 184 .execute(&mut *tx) 185 .await 186 .map_err(|e| format!("DB Error (repos): {}", e))?; 187 let mut all_block_cids: Vec<Vec<u8>> = blocks_cids 188 .iter() 189 .filter_map(|s| Cid::from_str(s).ok()) 190 .map(|c| c.to_bytes()) 191 .collect(); 192 all_block_cids.push(new_root_cid.to_bytes()); 193 if !all_block_cids.is_empty() { 194 sqlx::query!( 195 r#" 196 INSERT INTO user_blocks (user_id, block_cid) 197 SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid) 198 ON CONFLICT (user_id, block_cid) DO NOTHING 199 "#, 200 user_id, 201 &all_block_cids 202 ) 203 .execute(&mut *tx) 204 .await 205 .map_err(|e| format!("DB Error (user_blocks): {}", e))?; 206 } 207 if !obsolete_cids.is_empty() { 208 let obsolete_bytes: Vec<Vec<u8>> = obsolete_cids.iter().map(|c| c.to_bytes()).collect(); 209 sqlx::query!( 210 r#" 211 DELETE FROM user_blocks 212 WHERE user_id = $1 213 AND block_cid = ANY($2) 214 "#, 215 user_id, 216 &obsolete_bytes as &[Vec<u8>] 217 ) 218 .execute(&mut *tx) 219 .await 220 .map_err(|e| format!("DB Error (user_blocks delete obsolete): {}", e))?; 221 } 222 let (upserts, deletes): (Vec<_>, Vec<_>) = ops 223 .iter() 224 .partition(|op| matches!(op, RecordOp::Create { .. } | RecordOp::Update { .. })); 225 let (upsert_collections, upsert_rkeys, upsert_cids): (Vec<String>, Vec<String>, Vec<String>) = 226 upserts 227 .into_iter() 228 .filter_map(|op| match op { 229 RecordOp::Create { 230 collection, 231 rkey, 232 cid, 233 } 234 | RecordOp::Update { 235 collection, 236 rkey, 237 cid, 238 .. 239 } => Some((collection.to_string(), rkey.to_string(), cid.to_string())), 240 _ => None, 241 }) 242 .fold( 243 (Vec::new(), Vec::new(), Vec::new()), 244 |(mut cols, mut rkeys, mut cids), (c, r, ci)| { 245 cols.push(c); 246 rkeys.push(r); 247 cids.push(ci); 248 (cols, rkeys, cids) 249 }, 250 ); 251 let (delete_collections, delete_rkeys): (Vec<String>, Vec<String>) = deletes 252 .into_iter() 253 .filter_map(|op| match op { 254 RecordOp::Delete { 255 collection, rkey, .. 256 } => Some((collection.to_string(), rkey.to_string())), 257 _ => None, 258 }) 259 .unzip(); 260 if !upsert_collections.is_empty() { 261 sqlx::query!( 262 r#" 263 INSERT INTO records (repo_id, collection, rkey, record_cid, repo_rev) 264 SELECT $1, collection, rkey, record_cid, $5 265 FROM UNNEST($2::text[], $3::text[], $4::text[]) AS t(collection, rkey, record_cid) 266 ON CONFLICT (repo_id, collection, rkey) DO UPDATE 267 SET record_cid = EXCLUDED.record_cid, repo_rev = EXCLUDED.repo_rev, created_at = NOW() 268 "#, 269 user_id, 270 &upsert_collections, 271 &upsert_rkeys, 272 &upsert_cids, 273 rev_str 274 ) 275 .execute(&mut *tx) 276 .await 277 .map_err(|e| format!("DB Error (records batch upsert): {}", e))?; 278 } 279 if !delete_collections.is_empty() { 280 sqlx::query!( 281 r#" 282 DELETE FROM records 283 WHERE repo_id = $1 284 AND (collection, rkey) IN (SELECT * FROM UNNEST($2::text[], $3::text[])) 285 "#, 286 user_id, 287 &delete_collections, 288 &delete_rkeys 289 ) 290 .execute(&mut *tx) 291 .await 292 .map_err(|e| format!("DB Error (records batch delete): {}", e))?; 293 } 294 let ops_json = ops 295 .iter() 296 .map(|op| match op { 297 RecordOp::Create { 298 collection, 299 rkey, 300 cid, 301 } => json!({ 302 "action": "create", 303 "path": format!("{}/{}", collection, rkey), 304 "cid": cid.to_string() 305 }), 306 RecordOp::Update { 307 collection, 308 rkey, 309 cid, 310 prev, 311 } => { 312 let mut obj = json!({ 313 "action": "update", 314 "path": format!("{}/{}", collection, rkey), 315 "cid": cid.to_string() 316 }); 317 if let Some(prev_cid) = prev { 318 obj["prev"] = json!(prev_cid.to_string()); 319 } 320 obj 321 } 322 RecordOp::Delete { 323 collection, 324 rkey, 325 prev, 326 } => { 327 let mut obj = json!({ 328 "action": "delete", 329 "path": format!("{}/{}", collection, rkey), 330 "cid": null 331 }); 332 if let Some(prev_cid) = prev { 333 obj["prev"] = json!(prev_cid.to_string()); 334 } 335 obj 336 } 337 }) 338 .collect::<Vec<_>>(); 339 if is_account_active { 340 let event_type = "commit"; 341 let prev_cid_str = current_root_cid.map(|c| c.to_string()); 342 let prev_data_cid_str = prev_data_cid.map(|c| c.to_string()); 343 let seq_row = sqlx::query!( 344 r#" 345 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids, prev_data_cid) 346 VALUES ($1, $2, $3, $4, $5, $6, $7, $8) 347 RETURNING seq 348 "#, 349 did.as_str(), 350 event_type, 351 new_root_cid.to_string(), 352 prev_cid_str, 353 json!(ops_json), 354 blobs, 355 blocks_cids, 356 prev_data_cid_str, 357 ) 358 .fetch_one(&mut *tx) 359 .await 360 .map_err(|e| format!("DB Error (repo_seq): {}", e))?; 361 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 362 .execute(&mut *tx) 363 .await 364 .map_err(|e| format!("DB Error (notify): {}", e))?; 365 } 366 tx.commit() 367 .await 368 .map_err(|e| format!("Failed to commit transaction: {}", e))?; 369 if is_account_active { 370 let _ = sequence_sync_event(state, did, &new_root_cid.to_string(), Some(&rev_str)).await; 371 } 372 Ok(CommitResult { 373 commit_cid: new_root_cid, 374 rev: rev_str, 375 }) 376} 377pub async fn create_record_internal( 378 state: &AppState, 379 did: &Did, 380 collection: &Nsid, 381 rkey: &Rkey, 382 record: &serde_json::Value, 383) -> Result<(String, Cid), String> { 384 use crate::repo::tracking::TrackingBlockStore; 385 use jacquard_repo::mst::Mst; 386 use std::sync::Arc; 387 let user_id: Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did.as_str()) 388 .fetch_optional(&state.db) 389 .await 390 .map_err(|e| format!("DB error: {}", e))? 391 .ok_or_else(|| "User not found".to_string())?; 392 let root_cid_str: String = sqlx::query_scalar!( 393 "SELECT repo_root_cid FROM repos WHERE user_id = $1", 394 user_id 395 ) 396 .fetch_optional(&state.db) 397 .await 398 .map_err(|e| format!("DB error: {}", e))? 399 .ok_or_else(|| "Repo not found".to_string())?; 400 let current_root_cid = 401 Cid::from_str(&root_cid_str).map_err(|_| "Invalid repo root CID".to_string())?; 402 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 403 let commit_bytes = tracking_store 404 .get(&current_root_cid) 405 .await 406 .map_err(|e| format!("Failed to fetch commit: {:?}", e))? 407 .ok_or_else(|| "Commit block not found".to_string())?; 408 let commit = jacquard_repo::commit::Commit::from_cbor(&commit_bytes) 409 .map_err(|e| format!("Failed to parse commit: {:?}", e))?; 410 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 411 let record_ipld = crate::util::json_to_ipld(record); 412 let mut record_bytes = Vec::new(); 413 serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld) 414 .map_err(|e| format!("Failed to serialize record: {:?}", e))?; 415 let record_cid = tracking_store 416 .put(&record_bytes) 417 .await 418 .map_err(|e| format!("Failed to save record block: {:?}", e))?; 419 let key = format!("{}/{}", collection, rkey); 420 let new_mst = mst 421 .add(&key, record_cid) 422 .await 423 .map_err(|e| format!("Failed to add to MST: {:?}", e))?; 424 let new_mst_root = new_mst 425 .persist() 426 .await 427 .map_err(|e| format!("Failed to persist MST: {:?}", e))?; 428 let op = RecordOp::Create { 429 collection: collection.clone(), 430 rkey: rkey.clone(), 431 cid: record_cid, 432 }; 433 let mut new_mst_blocks = std::collections::BTreeMap::new(); 434 let mut old_mst_blocks = std::collections::BTreeMap::new(); 435 new_mst 436 .blocks_for_path(&key, &mut new_mst_blocks) 437 .await 438 .map_err(|e| format!("Failed to get new MST blocks for path: {:?}", e))?; 439 mst.blocks_for_path(&key, &mut old_mst_blocks) 440 .await 441 .map_err(|e| format!("Failed to get old MST blocks for path: {:?}", e))?; 442 let obsolete_cids: Vec<Cid> = std::iter::once(current_root_cid) 443 .chain( 444 old_mst_blocks 445 .keys() 446 .filter(|cid| !new_mst_blocks.contains_key(*cid)) 447 .copied(), 448 ) 449 .collect(); 450 let mut relevant_blocks = new_mst_blocks; 451 relevant_blocks.extend(old_mst_blocks); 452 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes)); 453 let written_cids: Vec<Cid> = tracking_store 454 .get_all_relevant_cids() 455 .into_iter() 456 .chain(relevant_blocks.keys().copied()) 457 .collect::<std::collections::HashSet<_>>() 458 .into_iter() 459 .collect(); 460 let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect(); 461 let blob_cids = extract_blob_cids(record); 462 let result = commit_and_log( 463 state, 464 CommitParams { 465 did, 466 user_id, 467 current_root_cid: Some(current_root_cid), 468 prev_data_cid: Some(commit.data), 469 new_mst_root, 470 ops: vec![op], 471 blocks_cids: &written_cids_str, 472 blobs: &blob_cids, 473 obsolete_cids, 474 }, 475 ) 476 .await?; 477 let uri = format!("at://{}/{}/{}", did, collection, rkey); 478 Ok((uri, result.commit_cid)) 479} 480 481pub async fn sequence_identity_event( 482 state: &AppState, 483 did: &Did, 484 handle: Option<&Handle>, 485) -> Result<i64, String> { 486 let mut tx = state 487 .db 488 .begin() 489 .await 490 .map_err(|e| format!("Failed to begin transaction: {}", e))?; 491 let seq_row = sqlx::query!( 492 r#" 493 INSERT INTO repo_seq (did, event_type, handle) 494 VALUES ($1, 'identity', $2) 495 RETURNING seq 496 "#, 497 did.as_str(), 498 handle.map(|h| h.as_str()), 499 ) 500 .fetch_one(&mut *tx) 501 .await 502 .map_err(|e| format!("DB Error (repo_seq identity): {}", e))?; 503 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 504 .execute(&mut *tx) 505 .await 506 .map_err(|e| format!("DB Error (notify): {}", e))?; 507 tx.commit() 508 .await 509 .map_err(|e| format!("Failed to commit transaction: {}", e))?; 510 Ok(seq_row.seq) 511} 512pub async fn sequence_account_event( 513 state: &AppState, 514 did: &Did, 515 active: bool, 516 status: Option<&str>, 517) -> Result<i64, String> { 518 let mut tx = state 519 .db 520 .begin() 521 .await 522 .map_err(|e| format!("Failed to begin transaction: {}", e))?; 523 let seq_row = sqlx::query!( 524 r#" 525 INSERT INTO repo_seq (did, event_type, active, status) 526 VALUES ($1, 'account', $2, $3) 527 RETURNING seq 528 "#, 529 did.as_str(), 530 active, 531 status, 532 ) 533 .fetch_one(&mut *tx) 534 .await 535 .map_err(|e| format!("DB Error (repo_seq account): {}", e))?; 536 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 537 .execute(&mut *tx) 538 .await 539 .map_err(|e| format!("DB Error (notify): {}", e))?; 540 tx.commit() 541 .await 542 .map_err(|e| format!("Failed to commit transaction: {}", e))?; 543 Ok(seq_row.seq) 544} 545pub async fn sequence_sync_event( 546 state: &AppState, 547 did: &Did, 548 commit_cid: &str, 549 rev: Option<&str>, 550) -> Result<i64, String> { 551 let mut tx = state 552 .db 553 .begin() 554 .await 555 .map_err(|e| format!("Failed to begin transaction: {}", e))?; 556 let seq_row = sqlx::query!( 557 r#" 558 INSERT INTO repo_seq (did, event_type, commit_cid, rev) 559 VALUES ($1, 'sync', $2, $3) 560 RETURNING seq 561 "#, 562 did.as_str(), 563 commit_cid, 564 rev, 565 ) 566 .fetch_one(&mut *tx) 567 .await 568 .map_err(|e| format!("DB Error (repo_seq sync): {}", e))?; 569 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 570 .execute(&mut *tx) 571 .await 572 .map_err(|e| format!("DB Error (notify): {}", e))?; 573 tx.commit() 574 .await 575 .map_err(|e| format!("Failed to commit transaction: {}", e))?; 576 Ok(seq_row.seq) 577} 578 579pub async fn sequence_genesis_commit( 580 state: &AppState, 581 did: &Did, 582 commit_cid: &Cid, 583 mst_root_cid: &Cid, 584 rev: &str, 585) -> Result<i64, String> { 586 let ops = serde_json::json!([]); 587 let blobs: Vec<String> = vec![]; 588 let blocks_cids: Vec<String> = vec![mst_root_cid.to_string(), commit_cid.to_string()]; 589 let prev_cid: Option<&str> = None; 590 let commit_cid_str = commit_cid.to_string(); 591 let mut tx = state 592 .db 593 .begin() 594 .await 595 .map_err(|e| format!("Failed to begin transaction: {}", e))?; 596 let seq_row = sqlx::query!( 597 r#" 598 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids, rev) 599 VALUES ($1, 'commit', $2, $3::TEXT, $4, $5, $6, $7) 600 RETURNING seq 601 "#, 602 did.as_str(), 603 commit_cid_str, 604 prev_cid, 605 ops, 606 &blobs, 607 &blocks_cids, 608 rev 609 ) 610 .fetch_one(&mut *tx) 611 .await 612 .map_err(|e| format!("DB Error (repo_seq genesis commit): {}", e))?; 613 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 614 .execute(&mut *tx) 615 .await 616 .map_err(|e| format!("DB Error (notify): {}", e))?; 617 tx.commit() 618 .await 619 .map_err(|e| format!("Failed to commit transaction: {}", e))?; 620 Ok(seq_row.seq) 621}