this repo has no description
1use crate::state::AppState; 2use bytes::Bytes; 3use cid::Cid; 4use jacquard::types::{integer::LimitedU32, string::Tid}; 5use jacquard_repo::commit::Commit; 6use jacquard_repo::storage::BlockStore; 7use k256::ecdsa::SigningKey; 8use serde_json::{Value, json}; 9use std::str::FromStr; 10use uuid::Uuid; 11 12pub fn extract_blob_cids(record: &Value) -> Vec<String> { 13 let mut blobs = Vec::new(); 14 extract_blob_cids_recursive(record, &mut blobs); 15 blobs 16} 17 18fn extract_blob_cids_recursive(value: &Value, blobs: &mut Vec<String>) { 19 match value { 20 Value::Object(map) => { 21 if map.get("$type").and_then(|v| v.as_str()) == Some("blob") 22 && let Some(ref_obj) = map.get("ref") 23 && let Some(link) = ref_obj.get("$link").and_then(|v| v.as_str()) 24 { 25 blobs.push(link.to_string()); 26 } 27 for v in map.values() { 28 extract_blob_cids_recursive(v, blobs); 29 } 30 } 31 Value::Array(arr) => { 32 for v in arr { 33 extract_blob_cids_recursive(v, blobs); 34 } 35 } 36 _ => {} 37 } 38} 39 40pub fn create_signed_commit( 41 did: &str, 42 data: Cid, 43 rev: &str, 44 prev: Option<Cid>, 45 signing_key: &SigningKey, 46) -> Result<(Vec<u8>, Bytes), String> { 47 let did = 48 jacquard::types::string::Did::new(did).map_err(|e| format!("Invalid DID: {:?}", e))?; 49 let rev = 50 jacquard::types::string::Tid::from_str(rev).map_err(|e| format!("Invalid TID: {:?}", e))?; 51 let unsigned = Commit::new_unsigned(did, data, rev, prev); 52 let signed = unsigned 53 .sign(signing_key) 54 .map_err(|e| format!("Failed to sign commit: {:?}", e))?; 55 let sig_bytes = signed.sig().clone(); 56 let signed_bytes = signed 57 .to_cbor() 58 .map_err(|e| format!("Failed to serialize signed commit: {:?}", e))?; 59 Ok((signed_bytes, sig_bytes)) 60} 61 62pub enum RecordOp { 63 Create { 64 collection: String, 65 rkey: String, 66 cid: Cid, 67 }, 68 Update { 69 collection: String, 70 rkey: String, 71 cid: Cid, 72 prev: Option<Cid>, 73 }, 74 Delete { 75 collection: String, 76 rkey: String, 77 prev: Option<Cid>, 78 }, 79} 80 81pub struct CommitResult { 82 pub commit_cid: Cid, 83 pub rev: String, 84} 85 86pub struct CommitParams<'a> { 87 pub did: &'a str, 88 pub user_id: Uuid, 89 pub current_root_cid: Option<Cid>, 90 pub prev_data_cid: Option<Cid>, 91 pub new_mst_root: Cid, 92 pub ops: Vec<RecordOp>, 93 pub blocks_cids: &'a [String], 94 pub blobs: &'a [String], 95 pub obsolete_cids: Vec<Cid>, 96} 97 98pub async fn commit_and_log( 99 state: &AppState, 100 params: CommitParams<'_>, 101) -> Result<CommitResult, String> { 102 let CommitParams { 103 did, 104 user_id, 105 current_root_cid, 106 prev_data_cid, 107 new_mst_root, 108 ops, 109 blocks_cids, 110 blobs, 111 obsolete_cids, 112 } = params; 113 let key_row = sqlx::query!( 114 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", 115 user_id 116 ) 117 .fetch_one(&state.db) 118 .await 119 .map_err(|e| format!("Failed to fetch signing key: {}", e))?; 120 let key_bytes = crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version) 121 .map_err(|e| format!("Failed to decrypt signing key: {}", e))?; 122 let signing_key = 123 SigningKey::from_slice(&key_bytes).map_err(|e| format!("Invalid signing key: {}", e))?; 124 let rev = Tid::now(LimitedU32::MIN); 125 let rev_str = rev.to_string(); 126 let (new_commit_bytes, _sig) = 127 create_signed_commit(did, new_mst_root, &rev_str, current_root_cid, &signing_key)?; 128 let new_root_cid = state 129 .block_store 130 .put(&new_commit_bytes) 131 .await 132 .map_err(|e| format!("Failed to save commit block: {:?}", e))?; 133 let mut tx = state 134 .db 135 .begin() 136 .await 137 .map_err(|e| format!("Failed to begin transaction: {}", e))?; 138 let lock_result = sqlx::query!( 139 "SELECT repo_root_cid FROM repos WHERE user_id = $1 FOR UPDATE NOWAIT", 140 user_id 141 ) 142 .fetch_optional(&mut *tx) 143 .await; 144 match lock_result { 145 Err(e) => { 146 if let Some(db_err) = e.as_database_error() 147 && db_err.code().as_deref() == Some("55P03") 148 { 149 return Err( 150 "ConcurrentModification: Another request is modifying this repo".to_string(), 151 ); 152 } 153 return Err(format!("Failed to acquire repo lock: {}", e)); 154 } 155 Ok(Some(row)) => { 156 if let Some(expected_root) = &current_root_cid 157 && row.repo_root_cid != expected_root.to_string() 158 { 159 return Err( 160 "ConcurrentModification: Repo has been modified since last read".to_string(), 161 ); 162 } 163 } 164 Ok(None) => { 165 return Err("Repo not found".to_string()); 166 } 167 } 168 let is_account_active = sqlx::query_scalar!( 169 "SELECT deactivated_at IS NULL FROM users WHERE id = $1", 170 user_id 171 ) 172 .fetch_optional(&mut *tx) 173 .await 174 .map_err(|e| format!("Failed to check account status: {}", e))? 175 .flatten() 176 .unwrap_or(false); 177 sqlx::query!( 178 "UPDATE repos SET repo_root_cid = $1, repo_rev = $2 WHERE user_id = $3", 179 new_root_cid.to_string(), 180 &rev_str, 181 user_id 182 ) 183 .execute(&mut *tx) 184 .await 185 .map_err(|e| format!("DB Error (repos): {}", e))?; 186 let mut all_block_cids: Vec<Vec<u8>> = blocks_cids 187 .iter() 188 .filter_map(|s| Cid::from_str(s).ok()) 189 .map(|c| c.to_bytes()) 190 .collect(); 191 all_block_cids.push(new_root_cid.to_bytes()); 192 if !all_block_cids.is_empty() { 193 sqlx::query!( 194 r#" 195 INSERT INTO user_blocks (user_id, block_cid) 196 SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid) 197 ON CONFLICT (user_id, block_cid) DO NOTHING 198 "#, 199 user_id, 200 &all_block_cids 201 ) 202 .execute(&mut *tx) 203 .await 204 .map_err(|e| format!("DB Error (user_blocks): {}", e))?; 205 } 206 if !obsolete_cids.is_empty() { 207 let obsolete_bytes: Vec<Vec<u8>> = obsolete_cids.iter().map(|c| c.to_bytes()).collect(); 208 sqlx::query!( 209 r#" 210 DELETE FROM user_blocks 211 WHERE user_id = $1 212 AND block_cid = ANY($2) 213 "#, 214 user_id, 215 &obsolete_bytes as &[Vec<u8>] 216 ) 217 .execute(&mut *tx) 218 .await 219 .map_err(|e| format!("DB Error (user_blocks delete obsolete): {}", e))?; 220 } 221 let mut upsert_collections: Vec<String> = Vec::new(); 222 let mut upsert_rkeys: Vec<String> = Vec::new(); 223 let mut upsert_cids: Vec<String> = Vec::new(); 224 let mut delete_collections: Vec<String> = Vec::new(); 225 let mut delete_rkeys: Vec<String> = Vec::new(); 226 for op in &ops { 227 match op { 228 RecordOp::Create { 229 collection, 230 rkey, 231 cid, 232 } 233 | RecordOp::Update { 234 collection, 235 rkey, 236 cid, 237 .. 238 } => { 239 upsert_collections.push(collection.clone()); 240 upsert_rkeys.push(rkey.clone()); 241 upsert_cids.push(cid.to_string()); 242 } 243 RecordOp::Delete { 244 collection, rkey, .. 245 } => { 246 delete_collections.push(collection.clone()); 247 delete_rkeys.push(rkey.clone()); 248 } 249 } 250 } 251 if !upsert_collections.is_empty() { 252 sqlx::query!( 253 r#" 254 INSERT INTO records (repo_id, collection, rkey, record_cid, repo_rev) 255 SELECT $1, collection, rkey, record_cid, $5 256 FROM UNNEST($2::text[], $3::text[], $4::text[]) AS t(collection, rkey, record_cid) 257 ON CONFLICT (repo_id, collection, rkey) DO UPDATE 258 SET record_cid = EXCLUDED.record_cid, repo_rev = EXCLUDED.repo_rev, created_at = NOW() 259 "#, 260 user_id, 261 &upsert_collections, 262 &upsert_rkeys, 263 &upsert_cids, 264 rev_str 265 ) 266 .execute(&mut *tx) 267 .await 268 .map_err(|e| format!("DB Error (records batch upsert): {}", e))?; 269 } 270 if !delete_collections.is_empty() { 271 sqlx::query!( 272 r#" 273 DELETE FROM records 274 WHERE repo_id = $1 275 AND (collection, rkey) IN (SELECT * FROM UNNEST($2::text[], $3::text[])) 276 "#, 277 user_id, 278 &delete_collections, 279 &delete_rkeys 280 ) 281 .execute(&mut *tx) 282 .await 283 .map_err(|e| format!("DB Error (records batch delete): {}", e))?; 284 } 285 let ops_json = ops 286 .iter() 287 .map(|op| match op { 288 RecordOp::Create { 289 collection, 290 rkey, 291 cid, 292 } => json!({ 293 "action": "create", 294 "path": format!("{}/{}", collection, rkey), 295 "cid": cid.to_string() 296 }), 297 RecordOp::Update { 298 collection, 299 rkey, 300 cid, 301 prev, 302 } => { 303 let mut obj = json!({ 304 "action": "update", 305 "path": format!("{}/{}", collection, rkey), 306 "cid": cid.to_string() 307 }); 308 if let Some(prev_cid) = prev { 309 obj["prev"] = json!(prev_cid.to_string()); 310 } 311 obj 312 } 313 RecordOp::Delete { 314 collection, 315 rkey, 316 prev, 317 } => { 318 let mut obj = json!({ 319 "action": "delete", 320 "path": format!("{}/{}", collection, rkey), 321 "cid": null 322 }); 323 if let Some(prev_cid) = prev { 324 obj["prev"] = json!(prev_cid.to_string()); 325 } 326 obj 327 } 328 }) 329 .collect::<Vec<_>>(); 330 if is_account_active { 331 let event_type = "commit"; 332 let prev_cid_str = current_root_cid.map(|c| c.to_string()); 333 let prev_data_cid_str = prev_data_cid.map(|c| c.to_string()); 334 let seq_row = sqlx::query!( 335 r#" 336 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids, prev_data_cid) 337 VALUES ($1, $2, $3, $4, $5, $6, $7, $8) 338 RETURNING seq 339 "#, 340 did, 341 event_type, 342 new_root_cid.to_string(), 343 prev_cid_str, 344 json!(ops_json), 345 blobs, 346 blocks_cids, 347 prev_data_cid_str, 348 ) 349 .fetch_one(&mut *tx) 350 .await 351 .map_err(|e| format!("DB Error (repo_seq): {}", e))?; 352 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 353 .execute(&mut *tx) 354 .await 355 .map_err(|e| format!("DB Error (notify): {}", e))?; 356 } 357 tx.commit() 358 .await 359 .map_err(|e| format!("Failed to commit transaction: {}", e))?; 360 if is_account_active { 361 let _ = sequence_sync_event(state, did, &new_root_cid.to_string(), Some(&rev_str)).await; 362 } 363 Ok(CommitResult { 364 commit_cid: new_root_cid, 365 rev: rev_str, 366 }) 367} 368pub async fn create_record_internal( 369 state: &AppState, 370 did: &str, 371 collection: &str, 372 rkey: &str, 373 record: &serde_json::Value, 374) -> Result<(String, Cid), String> { 375 use crate::repo::tracking::TrackingBlockStore; 376 use jacquard_repo::mst::Mst; 377 use std::sync::Arc; 378 let user_id: Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did) 379 .fetch_optional(&state.db) 380 .await 381 .map_err(|e| format!("DB error: {}", e))? 382 .ok_or_else(|| "User not found".to_string())?; 383 let root_cid_str: String = sqlx::query_scalar!( 384 "SELECT repo_root_cid FROM repos WHERE user_id = $1", 385 user_id 386 ) 387 .fetch_optional(&state.db) 388 .await 389 .map_err(|e| format!("DB error: {}", e))? 390 .ok_or_else(|| "Repo not found".to_string())?; 391 let current_root_cid = 392 Cid::from_str(&root_cid_str).map_err(|_| "Invalid repo root CID".to_string())?; 393 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 394 let commit_bytes = tracking_store 395 .get(&current_root_cid) 396 .await 397 .map_err(|e| format!("Failed to fetch commit: {:?}", e))? 398 .ok_or_else(|| "Commit block not found".to_string())?; 399 let commit = jacquard_repo::commit::Commit::from_cbor(&commit_bytes) 400 .map_err(|e| format!("Failed to parse commit: {:?}", e))?; 401 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 402 let record_ipld = crate::util::json_to_ipld(record); 403 let mut record_bytes = Vec::new(); 404 serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld) 405 .map_err(|e| format!("Failed to serialize record: {:?}", e))?; 406 let record_cid = tracking_store 407 .put(&record_bytes) 408 .await 409 .map_err(|e| format!("Failed to save record block: {:?}", e))?; 410 let key = format!("{}/{}", collection, rkey); 411 let new_mst = mst 412 .add(&key, record_cid) 413 .await 414 .map_err(|e| format!("Failed to add to MST: {:?}", e))?; 415 let new_mst_root = new_mst 416 .persist() 417 .await 418 .map_err(|e| format!("Failed to persist MST: {:?}", e))?; 419 let op = RecordOp::Create { 420 collection: collection.to_string(), 421 rkey: rkey.to_string(), 422 cid: record_cid, 423 }; 424 let mut new_mst_blocks = std::collections::BTreeMap::new(); 425 let mut old_mst_blocks = std::collections::BTreeMap::new(); 426 new_mst 427 .blocks_for_path(&key, &mut new_mst_blocks) 428 .await 429 .map_err(|e| format!("Failed to get new MST blocks for path: {:?}", e))?; 430 mst.blocks_for_path(&key, &mut old_mst_blocks) 431 .await 432 .map_err(|e| format!("Failed to get old MST blocks for path: {:?}", e))?; 433 let obsolete_cids: Vec<Cid> = std::iter::once(current_root_cid) 434 .chain( 435 old_mst_blocks 436 .keys() 437 .filter(|cid| !new_mst_blocks.contains_key(*cid)) 438 .copied(), 439 ) 440 .collect(); 441 let mut relevant_blocks = new_mst_blocks; 442 relevant_blocks.extend(old_mst_blocks); 443 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes)); 444 let written_cids: Vec<Cid> = tracking_store 445 .get_all_relevant_cids() 446 .into_iter() 447 .chain(relevant_blocks.keys().copied()) 448 .collect::<std::collections::HashSet<_>>() 449 .into_iter() 450 .collect(); 451 let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect(); 452 let blob_cids = extract_blob_cids(record); 453 let result = commit_and_log( 454 state, 455 CommitParams { 456 did, 457 user_id, 458 current_root_cid: Some(current_root_cid), 459 prev_data_cid: Some(commit.data), 460 new_mst_root, 461 ops: vec![op], 462 blocks_cids: &written_cids_str, 463 blobs: &blob_cids, 464 obsolete_cids, 465 }, 466 ) 467 .await?; 468 let uri = format!("at://{}/{}/{}", did, collection, rkey); 469 Ok((uri, result.commit_cid)) 470} 471 472pub async fn sequence_identity_event( 473 state: &AppState, 474 did: &str, 475 handle: Option<&str>, 476) -> Result<i64, String> { 477 let seq_row = sqlx::query!( 478 r#" 479 INSERT INTO repo_seq (did, event_type, handle) 480 VALUES ($1, 'identity', $2) 481 RETURNING seq 482 "#, 483 did, 484 handle, 485 ) 486 .fetch_one(&state.db) 487 .await 488 .map_err(|e| format!("DB Error (repo_seq identity): {}", e))?; 489 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 490 .execute(&state.db) 491 .await 492 .map_err(|e| format!("DB Error (notify): {}", e))?; 493 Ok(seq_row.seq) 494} 495pub async fn sequence_account_event( 496 state: &AppState, 497 did: &str, 498 active: bool, 499 status: Option<&str>, 500) -> Result<i64, String> { 501 let seq_row = sqlx::query!( 502 r#" 503 INSERT INTO repo_seq (did, event_type, active, status) 504 VALUES ($1, 'account', $2, $3) 505 RETURNING seq 506 "#, 507 did, 508 active, 509 status, 510 ) 511 .fetch_one(&state.db) 512 .await 513 .map_err(|e| format!("DB Error (repo_seq account): {}", e))?; 514 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 515 .execute(&state.db) 516 .await 517 .map_err(|e| format!("DB Error (notify): {}", e))?; 518 Ok(seq_row.seq) 519} 520pub async fn sequence_sync_event( 521 state: &AppState, 522 did: &str, 523 commit_cid: &str, 524 rev: Option<&str>, 525) -> Result<i64, String> { 526 let seq_row = sqlx::query!( 527 r#" 528 INSERT INTO repo_seq (did, event_type, commit_cid, rev) 529 VALUES ($1, 'sync', $2, $3) 530 RETURNING seq 531 "#, 532 did, 533 commit_cid, 534 rev, 535 ) 536 .fetch_one(&state.db) 537 .await 538 .map_err(|e| format!("DB Error (repo_seq sync): {}", e))?; 539 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 540 .execute(&state.db) 541 .await 542 .map_err(|e| format!("DB Error (notify): {}", e))?; 543 Ok(seq_row.seq) 544} 545 546pub async fn sequence_genesis_commit( 547 state: &AppState, 548 did: &str, 549 commit_cid: &Cid, 550 mst_root_cid: &Cid, 551 rev: &str, 552) -> Result<i64, String> { 553 let ops = serde_json::json!([]); 554 let blobs: Vec<String> = vec![]; 555 let blocks_cids: Vec<String> = vec![mst_root_cid.to_string(), commit_cid.to_string()]; 556 let prev_cid: Option<&str> = None; 557 let commit_cid_str = commit_cid.to_string(); 558 let seq_row = sqlx::query!( 559 r#" 560 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids, rev) 561 VALUES ($1, 'commit', $2, $3::TEXT, $4, $5, $6, $7) 562 RETURNING seq 563 "#, 564 did, 565 commit_cid_str, 566 prev_cid, 567 ops, 568 &blobs, 569 &blocks_cids, 570 rev 571 ) 572 .fetch_one(&state.db) 573 .await 574 .map_err(|e| format!("DB Error (repo_seq genesis commit): {}", e))?; 575 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 576 .execute(&state.db) 577 .await 578 .map_err(|e| format!("DB Error (notify): {}", e))?; 579 Ok(seq_row.seq) 580}