this repo has no description
at main 20 kB view raw
1use crate::state::AppState; 2use crate::types::{Did, Handle, Nsid, Rkey}; 3use bytes::Bytes; 4use cid::Cid; 5use jacquard::types::{integer::LimitedU32, string::Tid}; 6use jacquard_repo::commit::Commit; 7use jacquard_repo::storage::BlockStore; 8use k256::ecdsa::SigningKey; 9use serde_json::{Value, json}; 10use std::str::FromStr; 11use uuid::Uuid; 12 13pub fn extract_blob_cids(record: &Value) -> Vec<String> { 14 let mut blobs = Vec::new(); 15 extract_blob_cids_recursive(record, &mut blobs); 16 blobs 17} 18 19fn extract_blob_cids_recursive(value: &Value, blobs: &mut Vec<String>) { 20 match value { 21 Value::Object(map) => { 22 if map.get("$type").and_then(|v| v.as_str()) == Some("blob") 23 && let Some(ref_obj) = map.get("ref") 24 && let Some(link) = ref_obj.get("$link").and_then(|v| v.as_str()) 25 { 26 blobs.push(link.to_string()); 27 } 28 map.values() 29 .for_each(|v| extract_blob_cids_recursive(v, blobs)); 30 } 31 Value::Array(arr) => { 32 arr.iter() 33 .for_each(|v| extract_blob_cids_recursive(v, blobs)); 34 } 35 _ => {} 36 } 37} 38 39pub fn create_signed_commit( 40 did: &Did, 41 data: Cid, 42 rev: &str, 43 prev: Option<Cid>, 44 signing_key: &SigningKey, 45) -> Result<(Vec<u8>, Bytes), String> { 46 let did = jacquard::types::string::Did::new(did.as_str()) 47 .map_err(|e| format!("Invalid DID: {:?}", e))?; 48 let rev = 49 jacquard::types::string::Tid::from_str(rev).map_err(|e| format!("Invalid TID: {:?}", e))?; 50 let unsigned = Commit::new_unsigned(did, data, rev, prev); 51 let signed = unsigned 52 .sign(signing_key) 53 .map_err(|e| format!("Failed to sign commit: {:?}", e))?; 54 let sig_bytes = signed.sig().clone(); 55 let signed_bytes = signed 56 .to_cbor() 57 .map_err(|e| format!("Failed to serialize signed commit: {:?}", e))?; 58 Ok((signed_bytes, sig_bytes)) 59} 60 61pub enum RecordOp { 62 Create { 63 collection: Nsid, 64 rkey: Rkey, 65 cid: Cid, 66 }, 67 Update { 68 collection: Nsid, 69 rkey: Rkey, 70 cid: Cid, 71 prev: Option<Cid>, 72 }, 73 Delete { 74 collection: Nsid, 75 rkey: Rkey, 76 prev: Option<Cid>, 77 }, 78} 79 80pub struct CommitResult { 81 pub commit_cid: Cid, 82 pub rev: String, 83} 84 85pub struct CommitParams<'a> { 86 pub did: &'a Did, 87 pub user_id: Uuid, 88 pub current_root_cid: Option<Cid>, 89 pub prev_data_cid: Option<Cid>, 90 pub new_mst_root: Cid, 91 pub ops: Vec<RecordOp>, 92 pub blocks_cids: &'a [String], 93 pub blobs: &'a [String], 94 pub obsolete_cids: Vec<Cid>, 95} 96 97pub async fn commit_and_log( 98 state: &AppState, 99 params: CommitParams<'_>, 100) -> Result<CommitResult, String> { 101 let CommitParams { 102 did, 103 user_id, 104 current_root_cid, 105 prev_data_cid, 106 new_mst_root, 107 ops, 108 blocks_cids, 109 blobs, 110 obsolete_cids, 111 } = params; 112 let key_row = sqlx::query!( 113 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", 114 user_id 115 ) 116 .fetch_one(&state.db) 117 .await 118 .map_err(|e| format!("Failed to fetch signing key: {}", e))?; 119 let key_bytes = crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version) 120 .map_err(|e| format!("Failed to decrypt signing key: {}", e))?; 121 let signing_key = 122 SigningKey::from_slice(&key_bytes).map_err(|e| format!("Invalid signing key: {}", e))?; 123 let rev = Tid::now(LimitedU32::MIN); 124 let rev_str = rev.to_string(); 125 let (new_commit_bytes, _sig) = 126 create_signed_commit(did, new_mst_root, &rev_str, current_root_cid, &signing_key)?; 127 let new_root_cid = state 128 .block_store 129 .put(&new_commit_bytes) 130 .await 131 .map_err(|e| format!("Failed to save commit block: {:?}", e))?; 132 let mut tx = state 133 .db 134 .begin() 135 .await 136 .map_err(|e| format!("Failed to begin transaction: {}", e))?; 137 let lock_result = sqlx::query!( 138 "SELECT repo_root_cid FROM repos WHERE user_id = $1 FOR UPDATE NOWAIT", 139 user_id 140 ) 141 .fetch_optional(&mut *tx) 142 .await; 143 match lock_result { 144 Err(e) => { 145 if let Some(db_err) = e.as_database_error() 146 && db_err.code().as_deref() == Some("55P03") 147 { 148 return Err( 149 "ConcurrentModification: Another request is modifying this repo".to_string(), 150 ); 151 } 152 return Err(format!("Failed to acquire repo lock: {}", e)); 153 } 154 Ok(Some(row)) => { 155 if let Some(expected_root) = &current_root_cid 156 && row.repo_root_cid != expected_root.to_string() 157 { 158 return Err( 159 "ConcurrentModification: Repo has been modified since last read".to_string(), 160 ); 161 } 162 } 163 Ok(None) => { 164 return Err("Repo not found".to_string()); 165 } 166 } 167 let is_account_active = sqlx::query_scalar!( 168 "SELECT deactivated_at IS NULL FROM users WHERE id = $1", 169 user_id 170 ) 171 .fetch_optional(&mut *tx) 172 .await 173 .map_err(|e| format!("Failed to check account status: {}", e))? 174 .flatten() 175 .unwrap_or(false); 176 sqlx::query!( 177 "UPDATE repos SET repo_root_cid = $1, repo_rev = $2 WHERE user_id = $3", 178 new_root_cid.to_string(), 179 &rev_str, 180 user_id 181 ) 182 .execute(&mut *tx) 183 .await 184 .map_err(|e| format!("DB Error (repos): {}", e))?; 185 let mut all_block_cids: Vec<Vec<u8>> = blocks_cids 186 .iter() 187 .filter_map(|s| Cid::from_str(s).ok()) 188 .map(|c| c.to_bytes()) 189 .collect(); 190 all_block_cids.push(new_root_cid.to_bytes()); 191 if !all_block_cids.is_empty() { 192 sqlx::query!( 193 r#" 194 INSERT INTO user_blocks (user_id, block_cid) 195 SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid) 196 ON CONFLICT (user_id, block_cid) DO NOTHING 197 "#, 198 user_id, 199 &all_block_cids 200 ) 201 .execute(&mut *tx) 202 .await 203 .map_err(|e| format!("DB Error (user_blocks): {}", e))?; 204 } 205 if !obsolete_cids.is_empty() { 206 let obsolete_bytes: Vec<Vec<u8>> = obsolete_cids.iter().map(|c| c.to_bytes()).collect(); 207 sqlx::query!( 208 r#" 209 DELETE FROM user_blocks 210 WHERE user_id = $1 211 AND block_cid = ANY($2) 212 "#, 213 user_id, 214 &obsolete_bytes as &[Vec<u8>] 215 ) 216 .execute(&mut *tx) 217 .await 218 .map_err(|e| format!("DB Error (user_blocks delete obsolete): {}", e))?; 219 } 220 let (upserts, deletes): (Vec<_>, Vec<_>) = ops 221 .iter() 222 .partition(|op| matches!(op, RecordOp::Create { .. } | RecordOp::Update { .. })); 223 let (upsert_collections, upsert_rkeys, upsert_cids): (Vec<String>, Vec<String>, Vec<String>) = 224 upserts 225 .into_iter() 226 .filter_map(|op| match op { 227 RecordOp::Create { 228 collection, 229 rkey, 230 cid, 231 } 232 | RecordOp::Update { 233 collection, 234 rkey, 235 cid, 236 .. 237 } => Some((collection.to_string(), rkey.to_string(), cid.to_string())), 238 _ => None, 239 }) 240 .fold( 241 (Vec::new(), Vec::new(), Vec::new()), 242 |(mut cols, mut rkeys, mut cids), (c, r, ci)| { 243 cols.push(c); 244 rkeys.push(r); 245 cids.push(ci); 246 (cols, rkeys, cids) 247 }, 248 ); 249 let (delete_collections, delete_rkeys): (Vec<String>, Vec<String>) = deletes 250 .into_iter() 251 .filter_map(|op| match op { 252 RecordOp::Delete { 253 collection, rkey, .. 254 } => Some((collection.to_string(), rkey.to_string())), 255 _ => None, 256 }) 257 .unzip(); 258 if !upsert_collections.is_empty() { 259 sqlx::query!( 260 r#" 261 INSERT INTO records (repo_id, collection, rkey, record_cid, repo_rev) 262 SELECT $1, collection, rkey, record_cid, $5 263 FROM UNNEST($2::text[], $3::text[], $4::text[]) AS t(collection, rkey, record_cid) 264 ON CONFLICT (repo_id, collection, rkey) DO UPDATE 265 SET record_cid = EXCLUDED.record_cid, repo_rev = EXCLUDED.repo_rev, created_at = NOW() 266 "#, 267 user_id, 268 &upsert_collections, 269 &upsert_rkeys, 270 &upsert_cids, 271 rev_str 272 ) 273 .execute(&mut *tx) 274 .await 275 .map_err(|e| format!("DB Error (records batch upsert): {}", e))?; 276 } 277 if !delete_collections.is_empty() { 278 sqlx::query!( 279 r#" 280 DELETE FROM records 281 WHERE repo_id = $1 282 AND (collection, rkey) IN (SELECT * FROM UNNEST($2::text[], $3::text[])) 283 "#, 284 user_id, 285 &delete_collections, 286 &delete_rkeys 287 ) 288 .execute(&mut *tx) 289 .await 290 .map_err(|e| format!("DB Error (records batch delete): {}", e))?; 291 } 292 let ops_json = ops 293 .iter() 294 .map(|op| match op { 295 RecordOp::Create { 296 collection, 297 rkey, 298 cid, 299 } => json!({ 300 "action": "create", 301 "path": format!("{}/{}", collection, rkey), 302 "cid": cid.to_string() 303 }), 304 RecordOp::Update { 305 collection, 306 rkey, 307 cid, 308 prev, 309 } => { 310 let mut obj = json!({ 311 "action": "update", 312 "path": format!("{}/{}", collection, rkey), 313 "cid": cid.to_string() 314 }); 315 if let Some(prev_cid) = prev { 316 obj["prev"] = json!(prev_cid.to_string()); 317 } 318 obj 319 } 320 RecordOp::Delete { 321 collection, 322 rkey, 323 prev, 324 } => { 325 let mut obj = json!({ 326 "action": "delete", 327 "path": format!("{}/{}", collection, rkey), 328 "cid": null 329 }); 330 if let Some(prev_cid) = prev { 331 obj["prev"] = json!(prev_cid.to_string()); 332 } 333 obj 334 } 335 }) 336 .collect::<Vec<_>>(); 337 if is_account_active { 338 let event_type = "commit"; 339 let prev_cid_str = current_root_cid.map(|c| c.to_string()); 340 let prev_data_cid_str = prev_data_cid.map(|c| c.to_string()); 341 let seq_row = sqlx::query!( 342 r#" 343 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids, prev_data_cid) 344 VALUES ($1, $2, $3, $4, $5, $6, $7, $8) 345 RETURNING seq 346 "#, 347 did.as_str(), 348 event_type, 349 new_root_cid.to_string(), 350 prev_cid_str, 351 json!(ops_json), 352 blobs, 353 blocks_cids, 354 prev_data_cid_str, 355 ) 356 .fetch_one(&mut *tx) 357 .await 358 .map_err(|e| format!("DB Error (repo_seq): {}", e))?; 359 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 360 .execute(&mut *tx) 361 .await 362 .map_err(|e| format!("DB Error (notify): {}", e))?; 363 } 364 tx.commit() 365 .await 366 .map_err(|e| format!("Failed to commit transaction: {}", e))?; 367 if is_account_active { 368 let _ = sequence_sync_event(state, did, &new_root_cid.to_string(), Some(&rev_str)).await; 369 } 370 Ok(CommitResult { 371 commit_cid: new_root_cid, 372 rev: rev_str, 373 }) 374} 375pub async fn create_record_internal( 376 state: &AppState, 377 did: &Did, 378 collection: &Nsid, 379 rkey: &Rkey, 380 record: &serde_json::Value, 381) -> Result<(String, Cid), String> { 382 use crate::repo::tracking::TrackingBlockStore; 383 use jacquard_repo::mst::Mst; 384 use std::sync::Arc; 385 let user_id: Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did.as_str()) 386 .fetch_optional(&state.db) 387 .await 388 .map_err(|e| format!("DB error: {}", e))? 389 .ok_or_else(|| "User not found".to_string())?; 390 let root_cid_str: String = sqlx::query_scalar!( 391 "SELECT repo_root_cid FROM repos WHERE user_id = $1", 392 user_id 393 ) 394 .fetch_optional(&state.db) 395 .await 396 .map_err(|e| format!("DB error: {}", e))? 397 .ok_or_else(|| "Repo not found".to_string())?; 398 let current_root_cid = 399 Cid::from_str(&root_cid_str).map_err(|_| "Invalid repo root CID".to_string())?; 400 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 401 let commit_bytes = tracking_store 402 .get(&current_root_cid) 403 .await 404 .map_err(|e| format!("Failed to fetch commit: {:?}", e))? 405 .ok_or_else(|| "Commit block not found".to_string())?; 406 let commit = jacquard_repo::commit::Commit::from_cbor(&commit_bytes) 407 .map_err(|e| format!("Failed to parse commit: {:?}", e))?; 408 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 409 let record_ipld = crate::util::json_to_ipld(record); 410 let mut record_bytes = Vec::new(); 411 serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld) 412 .map_err(|e| format!("Failed to serialize record: {:?}", e))?; 413 let record_cid = tracking_store 414 .put(&record_bytes) 415 .await 416 .map_err(|e| format!("Failed to save record block: {:?}", e))?; 417 let key = format!("{}/{}", collection, rkey); 418 let new_mst = mst 419 .add(&key, record_cid) 420 .await 421 .map_err(|e| format!("Failed to add to MST: {:?}", e))?; 422 let new_mst_root = new_mst 423 .persist() 424 .await 425 .map_err(|e| format!("Failed to persist MST: {:?}", e))?; 426 let op = RecordOp::Create { 427 collection: collection.clone(), 428 rkey: rkey.clone(), 429 cid: record_cid, 430 }; 431 let mut new_mst_blocks = std::collections::BTreeMap::new(); 432 let mut old_mst_blocks = std::collections::BTreeMap::new(); 433 new_mst 434 .blocks_for_path(&key, &mut new_mst_blocks) 435 .await 436 .map_err(|e| format!("Failed to get new MST blocks for path: {:?}", e))?; 437 mst.blocks_for_path(&key, &mut old_mst_blocks) 438 .await 439 .map_err(|e| format!("Failed to get old MST blocks for path: {:?}", e))?; 440 let obsolete_cids: Vec<Cid> = std::iter::once(current_root_cid) 441 .chain( 442 old_mst_blocks 443 .keys() 444 .filter(|cid| !new_mst_blocks.contains_key(*cid)) 445 .copied(), 446 ) 447 .collect(); 448 let mut relevant_blocks = new_mst_blocks; 449 relevant_blocks.extend(old_mst_blocks); 450 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes)); 451 let written_cids: Vec<Cid> = tracking_store 452 .get_all_relevant_cids() 453 .into_iter() 454 .chain(relevant_blocks.keys().copied()) 455 .collect::<std::collections::HashSet<_>>() 456 .into_iter() 457 .collect(); 458 let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect(); 459 let blob_cids = extract_blob_cids(record); 460 let result = commit_and_log( 461 state, 462 CommitParams { 463 did, 464 user_id, 465 current_root_cid: Some(current_root_cid), 466 prev_data_cid: Some(commit.data), 467 new_mst_root, 468 ops: vec![op], 469 blocks_cids: &written_cids_str, 470 blobs: &blob_cids, 471 obsolete_cids, 472 }, 473 ) 474 .await?; 475 let uri = format!("at://{}/{}/{}", did, collection, rkey); 476 Ok((uri, result.commit_cid)) 477} 478 479pub async fn sequence_identity_event( 480 state: &AppState, 481 did: &Did, 482 handle: Option<&Handle>, 483) -> Result<i64, String> { 484 let mut tx = state 485 .db 486 .begin() 487 .await 488 .map_err(|e| format!("Failed to begin transaction: {}", e))?; 489 let seq_row = sqlx::query!( 490 r#" 491 INSERT INTO repo_seq (did, event_type, handle) 492 VALUES ($1, 'identity', $2) 493 RETURNING seq 494 "#, 495 did.as_str(), 496 handle.map(|h| h.as_str()), 497 ) 498 .fetch_one(&mut *tx) 499 .await 500 .map_err(|e| format!("DB Error (repo_seq identity): {}", e))?; 501 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 502 .execute(&mut *tx) 503 .await 504 .map_err(|e| format!("DB Error (notify): {}", e))?; 505 tx.commit() 506 .await 507 .map_err(|e| format!("Failed to commit transaction: {}", e))?; 508 Ok(seq_row.seq) 509} 510pub async fn sequence_account_event( 511 state: &AppState, 512 did: &Did, 513 active: bool, 514 status: Option<&str>, 515) -> Result<i64, String> { 516 let mut tx = state 517 .db 518 .begin() 519 .await 520 .map_err(|e| format!("Failed to begin transaction: {}", e))?; 521 let seq_row = sqlx::query!( 522 r#" 523 INSERT INTO repo_seq (did, event_type, active, status) 524 VALUES ($1, 'account', $2, $3) 525 RETURNING seq 526 "#, 527 did.as_str(), 528 active, 529 status, 530 ) 531 .fetch_one(&mut *tx) 532 .await 533 .map_err(|e| format!("DB Error (repo_seq account): {}", e))?; 534 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 535 .execute(&mut *tx) 536 .await 537 .map_err(|e| format!("DB Error (notify): {}", e))?; 538 tx.commit() 539 .await 540 .map_err(|e| format!("Failed to commit transaction: {}", e))?; 541 Ok(seq_row.seq) 542} 543pub async fn sequence_sync_event( 544 state: &AppState, 545 did: &Did, 546 commit_cid: &str, 547 rev: Option<&str>, 548) -> Result<i64, String> { 549 let mut tx = state 550 .db 551 .begin() 552 .await 553 .map_err(|e| format!("Failed to begin transaction: {}", e))?; 554 let seq_row = sqlx::query!( 555 r#" 556 INSERT INTO repo_seq (did, event_type, commit_cid, rev) 557 VALUES ($1, 'sync', $2, $3) 558 RETURNING seq 559 "#, 560 did.as_str(), 561 commit_cid, 562 rev, 563 ) 564 .fetch_one(&mut *tx) 565 .await 566 .map_err(|e| format!("DB Error (repo_seq sync): {}", e))?; 567 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 568 .execute(&mut *tx) 569 .await 570 .map_err(|e| format!("DB Error (notify): {}", e))?; 571 tx.commit() 572 .await 573 .map_err(|e| format!("Failed to commit transaction: {}", e))?; 574 Ok(seq_row.seq) 575} 576 577pub async fn sequence_genesis_commit( 578 state: &AppState, 579 did: &Did, 580 commit_cid: &Cid, 581 mst_root_cid: &Cid, 582 rev: &str, 583) -> Result<i64, String> { 584 let ops = serde_json::json!([]); 585 let blobs: Vec<String> = vec![]; 586 let blocks_cids: Vec<String> = vec![mst_root_cid.to_string(), commit_cid.to_string()]; 587 let prev_cid: Option<&str> = None; 588 let commit_cid_str = commit_cid.to_string(); 589 let mut tx = state 590 .db 591 .begin() 592 .await 593 .map_err(|e| format!("Failed to begin transaction: {}", e))?; 594 let seq_row = sqlx::query!( 595 r#" 596 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids, rev) 597 VALUES ($1, 'commit', $2, $3::TEXT, $4, $5, $6, $7) 598 RETURNING seq 599 "#, 600 did.as_str(), 601 commit_cid_str, 602 prev_cid, 603 ops, 604 &blobs, 605 &blocks_cids, 606 rev 607 ) 608 .fetch_one(&mut *tx) 609 .await 610 .map_err(|e| format!("DB Error (repo_seq genesis commit): {}", e))?; 611 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 612 .execute(&mut *tx) 613 .await 614 .map_err(|e| format!("DB Error (notify): {}", e))?; 615 tx.commit() 616 .await 617 .map_err(|e| format!("Failed to commit transaction: {}", e))?; 618 Ok(seq_row.seq) 619}