this repo has no description
1use crate::state::AppState; 2use bytes::Bytes; 3use cid::Cid; 4use jacquard::types::{integer::LimitedU32, string::Tid}; 5use jacquard_repo::commit::Commit; 6use jacquard_repo::storage::BlockStore; 7use k256::ecdsa::SigningKey; 8use serde_json::{Value, json}; 9use std::str::FromStr; 10use uuid::Uuid; 11 12pub fn extract_blob_cids(record: &Value) -> Vec<String> { 13 let mut blobs = Vec::new(); 14 extract_blob_cids_recursive(record, &mut blobs); 15 blobs 16} 17 18fn extract_blob_cids_recursive(value: &Value, blobs: &mut Vec<String>) { 19 match value { 20 Value::Object(map) => { 21 if map.get("$type").and_then(|v| v.as_str()) == Some("blob") 22 && let Some(ref_obj) = map.get("ref") 23 && let Some(link) = ref_obj.get("$link").and_then(|v| v.as_str()) 24 { 25 blobs.push(link.to_string()); 26 } 27 for v in map.values() { 28 extract_blob_cids_recursive(v, blobs); 29 } 30 } 31 Value::Array(arr) => { 32 for v in arr { 33 extract_blob_cids_recursive(v, blobs); 34 } 35 } 36 _ => {} 37 } 38} 39 40pub fn create_signed_commit( 41 did: &str, 42 data: Cid, 43 rev: &str, 44 prev: Option<Cid>, 45 signing_key: &SigningKey, 46) -> Result<(Vec<u8>, Bytes), String> { 47 let did = 48 jacquard::types::string::Did::new(did).map_err(|e| format!("Invalid DID: {:?}", e))?; 49 let rev = 50 jacquard::types::string::Tid::from_str(rev).map_err(|e| format!("Invalid TID: {:?}", e))?; 51 let unsigned = Commit::new_unsigned(did, data, rev, prev); 52 let signed = unsigned 53 .sign(signing_key) 54 .map_err(|e| format!("Failed to sign commit: {:?}", e))?; 55 let sig_bytes = signed.sig().clone(); 56 let signed_bytes = signed 57 .to_cbor() 58 .map_err(|e| format!("Failed to serialize signed commit: {:?}", e))?; 59 Ok((signed_bytes, sig_bytes)) 60} 61 62pub enum RecordOp { 63 Create { 64 collection: String, 65 rkey: String, 66 cid: Cid, 67 }, 68 Update { 69 collection: String, 70 rkey: String, 71 cid: Cid, 72 prev: Option<Cid>, 73 }, 74 Delete { 75 collection: String, 76 rkey: String, 77 prev: Option<Cid>, 78 }, 79} 80 81pub struct CommitResult { 82 pub commit_cid: Cid, 83 pub rev: String, 84} 85 86pub struct CommitParams<'a> { 87 pub did: &'a str, 88 pub user_id: Uuid, 89 pub current_root_cid: Option<Cid>, 90 pub prev_data_cid: Option<Cid>, 91 pub new_mst_root: Cid, 92 pub ops: Vec<RecordOp>, 93 pub blocks_cids: &'a [String], 94 pub blobs: &'a [String], 95} 96 97pub async fn commit_and_log( 98 state: &AppState, 99 params: CommitParams<'_>, 100) -> Result<CommitResult, String> { 101 let CommitParams { 102 did, 103 user_id, 104 current_root_cid, 105 prev_data_cid, 106 new_mst_root, 107 ops, 108 blocks_cids, 109 blobs, 110 } = params; 111 let key_row = sqlx::query!( 112 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", 113 user_id 114 ) 115 .fetch_one(&state.db) 116 .await 117 .map_err(|e| format!("Failed to fetch signing key: {}", e))?; 118 let key_bytes = crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version) 119 .map_err(|e| format!("Failed to decrypt signing key: {}", e))?; 120 let signing_key = 121 SigningKey::from_slice(&key_bytes).map_err(|e| format!("Invalid signing key: {}", e))?; 122 let rev = Tid::now(LimitedU32::MIN); 123 let rev_str = rev.to_string(); 124 let (new_commit_bytes, _sig) = 125 create_signed_commit(did, new_mst_root, &rev_str, current_root_cid, &signing_key)?; 126 let new_root_cid = state 127 .block_store 128 .put(&new_commit_bytes) 129 .await 130 .map_err(|e| format!("Failed to save commit block: {:?}", e))?; 131 let mut tx = state 132 .db 133 .begin() 134 .await 135 .map_err(|e| format!("Failed to begin transaction: {}", e))?; 136 let lock_result = sqlx::query!( 137 "SELECT repo_root_cid FROM repos WHERE user_id = $1 FOR UPDATE NOWAIT", 138 user_id 139 ) 140 .fetch_optional(&mut *tx) 141 .await; 142 match lock_result { 143 Err(e) => { 144 if let Some(db_err) = e.as_database_error() 145 && db_err.code().as_deref() == Some("55P03") 146 { 147 return Err( 148 "ConcurrentModification: Another request is modifying this repo".to_string(), 149 ); 150 } 151 return Err(format!("Failed to acquire repo lock: {}", e)); 152 } 153 Ok(Some(row)) => { 154 if let Some(expected_root) = &current_root_cid 155 && row.repo_root_cid != expected_root.to_string() 156 { 157 return Err( 158 "ConcurrentModification: Repo has been modified since last read".to_string(), 159 ); 160 } 161 } 162 Ok(None) => { 163 return Err("Repo not found".to_string()); 164 } 165 } 166 let is_account_active = sqlx::query_scalar!( 167 "SELECT deactivated_at IS NULL FROM users WHERE id = $1", 168 user_id 169 ) 170 .fetch_optional(&mut *tx) 171 .await 172 .map_err(|e| format!("Failed to check account status: {}", e))? 173 .flatten() 174 .unwrap_or(false); 175 sqlx::query!( 176 "UPDATE repos SET repo_root_cid = $1, repo_rev = $2 WHERE user_id = $3", 177 new_root_cid.to_string(), 178 &rev_str, 179 user_id 180 ) 181 .execute(&mut *tx) 182 .await 183 .map_err(|e| format!("DB Error (repos): {}", e))?; 184 let mut all_block_cids: Vec<Vec<u8>> = blocks_cids 185 .iter() 186 .filter_map(|s| Cid::from_str(s).ok()) 187 .map(|c| c.to_bytes()) 188 .collect(); 189 all_block_cids.push(new_root_cid.to_bytes()); 190 if !all_block_cids.is_empty() { 191 sqlx::query!( 192 r#" 193 INSERT INTO user_blocks (user_id, block_cid) 194 SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid) 195 ON CONFLICT (user_id, block_cid) DO NOTHING 196 "#, 197 user_id, 198 &all_block_cids 199 ) 200 .execute(&mut *tx) 201 .await 202 .map_err(|e| format!("DB Error (user_blocks): {}", e))?; 203 } 204 let mut upsert_collections: Vec<String> = Vec::new(); 205 let mut upsert_rkeys: Vec<String> = Vec::new(); 206 let mut upsert_cids: Vec<String> = Vec::new(); 207 let mut delete_collections: Vec<String> = Vec::new(); 208 let mut delete_rkeys: Vec<String> = Vec::new(); 209 for op in &ops { 210 match op { 211 RecordOp::Create { 212 collection, 213 rkey, 214 cid, 215 } 216 | RecordOp::Update { 217 collection, 218 rkey, 219 cid, 220 .. 221 } => { 222 upsert_collections.push(collection.clone()); 223 upsert_rkeys.push(rkey.clone()); 224 upsert_cids.push(cid.to_string()); 225 } 226 RecordOp::Delete { 227 collection, rkey, .. 228 } => { 229 delete_collections.push(collection.clone()); 230 delete_rkeys.push(rkey.clone()); 231 } 232 } 233 } 234 if !upsert_collections.is_empty() { 235 sqlx::query!( 236 r#" 237 INSERT INTO records (repo_id, collection, rkey, record_cid, repo_rev) 238 SELECT $1, collection, rkey, record_cid, $5 239 FROM UNNEST($2::text[], $3::text[], $4::text[]) AS t(collection, rkey, record_cid) 240 ON CONFLICT (repo_id, collection, rkey) DO UPDATE 241 SET record_cid = EXCLUDED.record_cid, repo_rev = EXCLUDED.repo_rev, created_at = NOW() 242 "#, 243 user_id, 244 &upsert_collections, 245 &upsert_rkeys, 246 &upsert_cids, 247 rev_str 248 ) 249 .execute(&mut *tx) 250 .await 251 .map_err(|e| format!("DB Error (records batch upsert): {}", e))?; 252 } 253 if !delete_collections.is_empty() { 254 sqlx::query!( 255 r#" 256 DELETE FROM records 257 WHERE repo_id = $1 258 AND (collection, rkey) IN (SELECT * FROM UNNEST($2::text[], $3::text[])) 259 "#, 260 user_id, 261 &delete_collections, 262 &delete_rkeys 263 ) 264 .execute(&mut *tx) 265 .await 266 .map_err(|e| format!("DB Error (records batch delete): {}", e))?; 267 } 268 let ops_json = ops 269 .iter() 270 .map(|op| match op { 271 RecordOp::Create { 272 collection, 273 rkey, 274 cid, 275 } => json!({ 276 "action": "create", 277 "path": format!("{}/{}", collection, rkey), 278 "cid": cid.to_string() 279 }), 280 RecordOp::Update { 281 collection, 282 rkey, 283 cid, 284 prev, 285 } => { 286 let mut obj = json!({ 287 "action": "update", 288 "path": format!("{}/{}", collection, rkey), 289 "cid": cid.to_string() 290 }); 291 if let Some(prev_cid) = prev { 292 obj["prev"] = json!(prev_cid.to_string()); 293 } 294 obj 295 } 296 RecordOp::Delete { 297 collection, 298 rkey, 299 prev, 300 } => { 301 let mut obj = json!({ 302 "action": "delete", 303 "path": format!("{}/{}", collection, rkey), 304 "cid": null 305 }); 306 if let Some(prev_cid) = prev { 307 obj["prev"] = json!(prev_cid.to_string()); 308 } 309 obj 310 } 311 }) 312 .collect::<Vec<_>>(); 313 if is_account_active { 314 let event_type = "commit"; 315 let prev_cid_str = current_root_cid.map(|c| c.to_string()); 316 let prev_data_cid_str = prev_data_cid.map(|c| c.to_string()); 317 let seq_row = sqlx::query!( 318 r#" 319 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids, prev_data_cid) 320 VALUES ($1, $2, $3, $4, $5, $6, $7, $8) 321 RETURNING seq 322 "#, 323 did, 324 event_type, 325 new_root_cid.to_string(), 326 prev_cid_str, 327 json!(ops_json), 328 blobs, 329 blocks_cids, 330 prev_data_cid_str, 331 ) 332 .fetch_one(&mut *tx) 333 .await 334 .map_err(|e| format!("DB Error (repo_seq): {}", e))?; 335 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 336 .execute(&mut *tx) 337 .await 338 .map_err(|e| format!("DB Error (notify): {}", e))?; 339 } 340 tx.commit() 341 .await 342 .map_err(|e| format!("Failed to commit transaction: {}", e))?; 343 if is_account_active { 344 let _ = sequence_sync_event(state, did, &new_root_cid.to_string(), Some(&rev_str)).await; 345 } 346 Ok(CommitResult { 347 commit_cid: new_root_cid, 348 rev: rev_str, 349 }) 350} 351pub async fn create_record_internal( 352 state: &AppState, 353 did: &str, 354 collection: &str, 355 rkey: &str, 356 record: &serde_json::Value, 357) -> Result<(String, Cid), String> { 358 use crate::repo::tracking::TrackingBlockStore; 359 use jacquard_repo::mst::Mst; 360 use std::sync::Arc; 361 let user_id: Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did) 362 .fetch_optional(&state.db) 363 .await 364 .map_err(|e| format!("DB error: {}", e))? 365 .ok_or_else(|| "User not found".to_string())?; 366 let root_cid_str: String = sqlx::query_scalar!( 367 "SELECT repo_root_cid FROM repos WHERE user_id = $1", 368 user_id 369 ) 370 .fetch_optional(&state.db) 371 .await 372 .map_err(|e| format!("DB error: {}", e))? 373 .ok_or_else(|| "Repo not found".to_string())?; 374 let current_root_cid = 375 Cid::from_str(&root_cid_str).map_err(|_| "Invalid repo root CID".to_string())?; 376 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 377 let commit_bytes = tracking_store 378 .get(&current_root_cid) 379 .await 380 .map_err(|e| format!("Failed to fetch commit: {:?}", e))? 381 .ok_or_else(|| "Commit block not found".to_string())?; 382 let commit = jacquard_repo::commit::Commit::from_cbor(&commit_bytes) 383 .map_err(|e| format!("Failed to parse commit: {:?}", e))?; 384 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 385 let record_ipld = crate::util::json_to_ipld(record); 386 let mut record_bytes = Vec::new(); 387 serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld) 388 .map_err(|e| format!("Failed to serialize record: {:?}", e))?; 389 let record_cid = tracking_store 390 .put(&record_bytes) 391 .await 392 .map_err(|e| format!("Failed to save record block: {:?}", e))?; 393 let key = format!("{}/{}", collection, rkey); 394 let new_mst = mst 395 .add(&key, record_cid) 396 .await 397 .map_err(|e| format!("Failed to add to MST: {:?}", e))?; 398 let new_mst_root = new_mst 399 .persist() 400 .await 401 .map_err(|e| format!("Failed to persist MST: {:?}", e))?; 402 let op = RecordOp::Create { 403 collection: collection.to_string(), 404 rkey: rkey.to_string(), 405 cid: record_cid, 406 }; 407 let mut relevant_blocks = std::collections::BTreeMap::new(); 408 new_mst 409 .blocks_for_path(&key, &mut relevant_blocks) 410 .await 411 .map_err(|e| format!("Failed to get new MST blocks for path: {:?}", e))?; 412 mst.blocks_for_path(&key, &mut relevant_blocks) 413 .await 414 .map_err(|e| format!("Failed to get old MST blocks for path: {:?}", e))?; 415 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes)); 416 let mut written_cids = tracking_store.get_all_relevant_cids(); 417 for cid in relevant_blocks.keys() { 418 if !written_cids.contains(cid) { 419 written_cids.push(*cid); 420 } 421 } 422 let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect(); 423 let blob_cids = extract_blob_cids(record); 424 let result = commit_and_log( 425 state, 426 CommitParams { 427 did, 428 user_id, 429 current_root_cid: Some(current_root_cid), 430 prev_data_cid: Some(commit.data), 431 new_mst_root, 432 ops: vec![op], 433 blocks_cids: &written_cids_str, 434 blobs: &blob_cids, 435 }, 436 ) 437 .await?; 438 let uri = format!("at://{}/{}/{}", did, collection, rkey); 439 Ok((uri, result.commit_cid)) 440} 441 442pub async fn sequence_identity_event( 443 state: &AppState, 444 did: &str, 445 handle: Option<&str>, 446) -> Result<i64, String> { 447 let seq_row = sqlx::query!( 448 r#" 449 INSERT INTO repo_seq (did, event_type, handle) 450 VALUES ($1, 'identity', $2) 451 RETURNING seq 452 "#, 453 did, 454 handle, 455 ) 456 .fetch_one(&state.db) 457 .await 458 .map_err(|e| format!("DB Error (repo_seq identity): {}", e))?; 459 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 460 .execute(&state.db) 461 .await 462 .map_err(|e| format!("DB Error (notify): {}", e))?; 463 Ok(seq_row.seq) 464} 465pub async fn sequence_account_event( 466 state: &AppState, 467 did: &str, 468 active: bool, 469 status: Option<&str>, 470) -> Result<i64, String> { 471 let seq_row = sqlx::query!( 472 r#" 473 INSERT INTO repo_seq (did, event_type, active, status) 474 VALUES ($1, 'account', $2, $3) 475 RETURNING seq 476 "#, 477 did, 478 active, 479 status, 480 ) 481 .fetch_one(&state.db) 482 .await 483 .map_err(|e| format!("DB Error (repo_seq account): {}", e))?; 484 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 485 .execute(&state.db) 486 .await 487 .map_err(|e| format!("DB Error (notify): {}", e))?; 488 Ok(seq_row.seq) 489} 490pub async fn sequence_sync_event( 491 state: &AppState, 492 did: &str, 493 commit_cid: &str, 494 rev: Option<&str>, 495) -> Result<i64, String> { 496 let seq_row = sqlx::query!( 497 r#" 498 INSERT INTO repo_seq (did, event_type, commit_cid, rev) 499 VALUES ($1, 'sync', $2, $3) 500 RETURNING seq 501 "#, 502 did, 503 commit_cid, 504 rev, 505 ) 506 .fetch_one(&state.db) 507 .await 508 .map_err(|e| format!("DB Error (repo_seq sync): {}", e))?; 509 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 510 .execute(&state.db) 511 .await 512 .map_err(|e| format!("DB Error (notify): {}", e))?; 513 Ok(seq_row.seq) 514} 515 516pub async fn sequence_genesis_commit( 517 state: &AppState, 518 did: &str, 519 commit_cid: &Cid, 520 mst_root_cid: &Cid, 521 rev: &str, 522) -> Result<i64, String> { 523 let ops = serde_json::json!([]); 524 let blobs: Vec<String> = vec![]; 525 let blocks_cids: Vec<String> = vec![mst_root_cid.to_string(), commit_cid.to_string()]; 526 let prev_cid: Option<&str> = None; 527 let commit_cid_str = commit_cid.to_string(); 528 let seq_row = sqlx::query!( 529 r#" 530 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids, rev) 531 VALUES ($1, 'commit', $2, $3::TEXT, $4, $5, $6, $7) 532 RETURNING seq 533 "#, 534 did, 535 commit_cid_str, 536 prev_cid, 537 ops, 538 &blobs, 539 &blocks_cids, 540 rev 541 ) 542 .fetch_one(&state.db) 543 .await 544 .map_err(|e| format!("DB Error (repo_seq genesis commit): {}", e))?; 545 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 546 .execute(&state.db) 547 .await 548 .map_err(|e| format!("DB Error (notify): {}", e))?; 549 Ok(seq_row.seq) 550}