this repo has no description
1use crate::state::AppState; 2use bytes::Bytes; 3use cid::Cid; 4use jacquard::types::{integer::LimitedU32, string::Tid}; 5use jacquard_repo::commit::Commit; 6use jacquard_repo::storage::BlockStore; 7use k256::ecdsa::SigningKey; 8use serde_json::{json, Value}; 9use std::str::FromStr; 10use uuid::Uuid; 11 12pub fn extract_blob_cids(record: &Value) -> Vec<String> { 13 let mut blobs = Vec::new(); 14 extract_blob_cids_recursive(record, &mut blobs); 15 blobs 16} 17 18fn extract_blob_cids_recursive(value: &Value, blobs: &mut Vec<String>) { 19 match value { 20 Value::Object(map) => { 21 if map.get("$type").and_then(|v| v.as_str()) == Some("blob") { 22 if let Some(ref_obj) = map.get("ref") { 23 if let Some(link) = ref_obj.get("$link").and_then(|v| v.as_str()) { 24 blobs.push(link.to_string()); 25 } 26 } 27 } 28 for v in map.values() { 29 extract_blob_cids_recursive(v, blobs); 30 } 31 } 32 Value::Array(arr) => { 33 for v in arr { 34 extract_blob_cids_recursive(v, blobs); 35 } 36 } 37 _ => {} 38 } 39} 40 41pub fn create_signed_commit( 42 did: &str, 43 data: Cid, 44 rev: &str, 45 prev: Option<Cid>, 46 signing_key: &SigningKey, 47) -> Result<(Vec<u8>, Bytes), String> { 48 let did = 49 jacquard::types::string::Did::new(did).map_err(|e| format!("Invalid DID: {:?}", e))?; 50 let rev = 51 jacquard::types::string::Tid::from_str(rev).map_err(|e| format!("Invalid TID: {:?}", e))?; 52 let unsigned = Commit::new_unsigned(did, data, rev, prev); 53 let signed = unsigned 54 .sign(signing_key) 55 .map_err(|e| format!("Failed to sign commit: {:?}", e))?; 56 let sig_bytes = signed.sig().clone(); 57 let signed_bytes = signed 58 .to_cbor() 59 .map_err(|e| format!("Failed to serialize signed commit: {:?}", e))?; 60 Ok((signed_bytes, sig_bytes)) 61} 62 63pub enum RecordOp { 64 Create { 65 collection: String, 66 rkey: String, 67 cid: Cid, 68 }, 69 Update { 70 collection: String, 71 rkey: String, 72 cid: Cid, 73 prev: Option<Cid>, 74 }, 75 Delete { 76 collection: String, 77 rkey: String, 78 prev: Option<Cid>, 79 }, 80} 81 82pub struct CommitResult { 83 pub commit_cid: Cid, 84 pub rev: String, 85} 86 87pub struct CommitParams<'a> { 88 pub did: &'a str, 89 pub user_id: Uuid, 90 pub current_root_cid: Option<Cid>, 91 pub prev_data_cid: Option<Cid>, 92 pub new_mst_root: Cid, 93 pub ops: Vec<RecordOp>, 94 pub blocks_cids: &'a [String], 95 pub blobs: &'a [String], 96} 97 98pub async fn commit_and_log( 99 state: &AppState, 100 params: CommitParams<'_>, 101) -> Result<CommitResult, String> { 102 let CommitParams { 103 did, 104 user_id, 105 current_root_cid, 106 prev_data_cid, 107 new_mst_root, 108 ops, 109 blocks_cids, 110 blobs, 111 } = params; 112 let key_row = sqlx::query!( 113 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", 114 user_id 115 ) 116 .fetch_one(&state.db) 117 .await 118 .map_err(|e| format!("Failed to fetch signing key: {}", e))?; 119 let key_bytes = crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version) 120 .map_err(|e| format!("Failed to decrypt signing key: {}", e))?; 121 let signing_key = 122 SigningKey::from_slice(&key_bytes).map_err(|e| format!("Invalid signing key: {}", e))?; 123 let rev = Tid::now(LimitedU32::MIN); 124 let rev_str = rev.to_string(); 125 let (new_commit_bytes, _sig) = 126 create_signed_commit(did, new_mst_root, &rev_str, current_root_cid, &signing_key)?; 127 let new_root_cid = state 128 .block_store 129 .put(&new_commit_bytes) 130 .await 131 .map_err(|e| format!("Failed to save commit block: {:?}", e))?; 132 let mut tx = state 133 .db 134 .begin() 135 .await 136 .map_err(|e| format!("Failed to begin transaction: {}", e))?; 137 let lock_result = sqlx::query!( 138 "SELECT repo_root_cid FROM repos WHERE user_id = $1 FOR UPDATE NOWAIT", 139 user_id 140 ) 141 .fetch_optional(&mut *tx) 142 .await; 143 match lock_result { 144 Err(e) => { 145 if let Some(db_err) = e.as_database_error() 146 && db_err.code().as_deref() == Some("55P03") 147 { 148 return Err( 149 "ConcurrentModification: Another request is modifying this repo".to_string(), 150 ); 151 } 152 return Err(format!("Failed to acquire repo lock: {}", e)); 153 } 154 Ok(Some(row)) => { 155 if let Some(expected_root) = &current_root_cid 156 && row.repo_root_cid != expected_root.to_string() 157 { 158 return Err( 159 "ConcurrentModification: Repo has been modified since last read".to_string(), 160 ); 161 } 162 } 163 Ok(None) => { 164 return Err("Repo not found".to_string()); 165 } 166 } 167 let is_account_active = sqlx::query_scalar!( 168 "SELECT deactivated_at IS NULL FROM users WHERE id = $1", 169 user_id 170 ) 171 .fetch_optional(&mut *tx) 172 .await 173 .map_err(|e| format!("Failed to check account status: {}", e))? 174 .flatten() 175 .unwrap_or(false); 176 sqlx::query!( 177 "UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2", 178 new_root_cid.to_string(), 179 user_id 180 ) 181 .execute(&mut *tx) 182 .await 183 .map_err(|e| format!("DB Error (repos): {}", e))?; 184 let mut upsert_collections: Vec<String> = Vec::new(); 185 let mut upsert_rkeys: Vec<String> = Vec::new(); 186 let mut upsert_cids: Vec<String> = Vec::new(); 187 let mut delete_collections: Vec<String> = Vec::new(); 188 let mut delete_rkeys: Vec<String> = Vec::new(); 189 for op in &ops { 190 match op { 191 RecordOp::Create { 192 collection, 193 rkey, 194 cid, 195 } 196 | RecordOp::Update { 197 collection, 198 rkey, 199 cid, 200 .. 201 } => { 202 upsert_collections.push(collection.clone()); 203 upsert_rkeys.push(rkey.clone()); 204 upsert_cids.push(cid.to_string()); 205 } 206 RecordOp::Delete { 207 collection, rkey, .. 208 } => { 209 delete_collections.push(collection.clone()); 210 delete_rkeys.push(rkey.clone()); 211 } 212 } 213 } 214 if !upsert_collections.is_empty() { 215 sqlx::query!( 216 r#" 217 INSERT INTO records (repo_id, collection, rkey, record_cid, repo_rev) 218 SELECT $1, collection, rkey, record_cid, $5 219 FROM UNNEST($2::text[], $3::text[], $4::text[]) AS t(collection, rkey, record_cid) 220 ON CONFLICT (repo_id, collection, rkey) DO UPDATE 221 SET record_cid = EXCLUDED.record_cid, repo_rev = EXCLUDED.repo_rev, created_at = NOW() 222 "#, 223 user_id, 224 &upsert_collections, 225 &upsert_rkeys, 226 &upsert_cids, 227 rev_str 228 ) 229 .execute(&mut *tx) 230 .await 231 .map_err(|e| format!("DB Error (records batch upsert): {}", e))?; 232 } 233 if !delete_collections.is_empty() { 234 sqlx::query!( 235 r#" 236 DELETE FROM records 237 WHERE repo_id = $1 238 AND (collection, rkey) IN (SELECT * FROM UNNEST($2::text[], $3::text[])) 239 "#, 240 user_id, 241 &delete_collections, 242 &delete_rkeys 243 ) 244 .execute(&mut *tx) 245 .await 246 .map_err(|e| format!("DB Error (records batch delete): {}", e))?; 247 } 248 let ops_json = ops 249 .iter() 250 .map(|op| match op { 251 RecordOp::Create { 252 collection, 253 rkey, 254 cid, 255 } => json!({ 256 "action": "create", 257 "path": format!("{}/{}", collection, rkey), 258 "cid": cid.to_string() 259 }), 260 RecordOp::Update { 261 collection, 262 rkey, 263 cid, 264 prev, 265 } => { 266 let mut obj = json!({ 267 "action": "update", 268 "path": format!("{}/{}", collection, rkey), 269 "cid": cid.to_string() 270 }); 271 if let Some(prev_cid) = prev { 272 obj["prev"] = json!(prev_cid.to_string()); 273 } 274 obj 275 } 276 RecordOp::Delete { 277 collection, 278 rkey, 279 prev, 280 } => { 281 let mut obj = json!({ 282 "action": "delete", 283 "path": format!("{}/{}", collection, rkey), 284 "cid": null 285 }); 286 if let Some(prev_cid) = prev { 287 obj["prev"] = json!(prev_cid.to_string()); 288 } 289 obj 290 } 291 }) 292 .collect::<Vec<_>>(); 293 if is_account_active { 294 let event_type = "commit"; 295 let prev_cid_str = current_root_cid.map(|c| c.to_string()); 296 let prev_data_cid_str = prev_data_cid.map(|c| c.to_string()); 297 let seq_row = sqlx::query!( 298 r#" 299 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids, prev_data_cid) 300 VALUES ($1, $2, $3, $4, $5, $6, $7, $8) 301 RETURNING seq 302 "#, 303 did, 304 event_type, 305 new_root_cid.to_string(), 306 prev_cid_str, 307 json!(ops_json), 308 blobs, 309 blocks_cids, 310 prev_data_cid_str, 311 ) 312 .fetch_one(&mut *tx) 313 .await 314 .map_err(|e| format!("DB Error (repo_seq): {}", e))?; 315 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 316 .execute(&mut *tx) 317 .await 318 .map_err(|e| format!("DB Error (notify): {}", e))?; 319 } 320 tx.commit() 321 .await 322 .map_err(|e| format!("Failed to commit transaction: {}", e))?; 323 if is_account_active { 324 let _ = sequence_sync_event(state, did, &new_root_cid.to_string(), Some(&rev_str)).await; 325 } 326 Ok(CommitResult { 327 commit_cid: new_root_cid, 328 rev: rev_str, 329 }) 330} 331pub async fn create_record_internal( 332 state: &AppState, 333 did: &str, 334 collection: &str, 335 rkey: &str, 336 record: &serde_json::Value, 337) -> Result<(String, Cid), String> { 338 use crate::repo::tracking::TrackingBlockStore; 339 use jacquard_repo::mst::Mst; 340 use std::sync::Arc; 341 let user_id: Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did) 342 .fetch_optional(&state.db) 343 .await 344 .map_err(|e| format!("DB error: {}", e))? 345 .ok_or_else(|| "User not found".to_string())?; 346 let root_cid_str: String = sqlx::query_scalar!( 347 "SELECT repo_root_cid FROM repos WHERE user_id = $1", 348 user_id 349 ) 350 .fetch_optional(&state.db) 351 .await 352 .map_err(|e| format!("DB error: {}", e))? 353 .ok_or_else(|| "Repo not found".to_string())?; 354 let current_root_cid = 355 Cid::from_str(&root_cid_str).map_err(|_| "Invalid repo root CID".to_string())?; 356 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 357 let commit_bytes = tracking_store 358 .get(&current_root_cid) 359 .await 360 .map_err(|e| format!("Failed to fetch commit: {:?}", e))? 361 .ok_or_else(|| "Commit block not found".to_string())?; 362 let commit = jacquard_repo::commit::Commit::from_cbor(&commit_bytes) 363 .map_err(|e| format!("Failed to parse commit: {:?}", e))?; 364 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 365 let mut record_bytes = Vec::new(); 366 serde_ipld_dagcbor::to_writer(&mut record_bytes, record) 367 .map_err(|e| format!("Failed to serialize record: {:?}", e))?; 368 let record_cid = tracking_store 369 .put(&record_bytes) 370 .await 371 .map_err(|e| format!("Failed to save record block: {:?}", e))?; 372 let key = format!("{}/{}", collection, rkey); 373 let new_mst = mst 374 .add(&key, record_cid) 375 .await 376 .map_err(|e| format!("Failed to add to MST: {:?}", e))?; 377 let new_mst_root = new_mst 378 .persist() 379 .await 380 .map_err(|e| format!("Failed to persist MST: {:?}", e))?; 381 let op = RecordOp::Create { 382 collection: collection.to_string(), 383 rkey: rkey.to_string(), 384 cid: record_cid, 385 }; 386 let mut relevant_blocks = std::collections::BTreeMap::new(); 387 new_mst 388 .blocks_for_path(&key, &mut relevant_blocks) 389 .await 390 .map_err(|e| format!("Failed to get new MST blocks for path: {:?}", e))?; 391 mst.blocks_for_path(&key, &mut relevant_blocks) 392 .await 393 .map_err(|e| format!("Failed to get old MST blocks for path: {:?}", e))?; 394 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes)); 395 let mut written_cids = tracking_store.get_all_relevant_cids(); 396 for cid in relevant_blocks.keys() { 397 if !written_cids.contains(cid) { 398 written_cids.push(*cid); 399 } 400 } 401 let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect(); 402 let blob_cids = extract_blob_cids(record); 403 let result = commit_and_log( 404 state, 405 CommitParams { 406 did, 407 user_id, 408 current_root_cid: Some(current_root_cid), 409 prev_data_cid: Some(commit.data), 410 new_mst_root, 411 ops: vec![op], 412 blocks_cids: &written_cids_str, 413 blobs: &blob_cids, 414 }, 415 ) 416 .await?; 417 let uri = format!("at://{}/{}/{}", did, collection, rkey); 418 Ok((uri, result.commit_cid)) 419} 420 421pub async fn sequence_identity_event( 422 state: &AppState, 423 did: &str, 424 handle: Option<&str>, 425) -> Result<i64, String> { 426 let seq_row = sqlx::query!( 427 r#" 428 INSERT INTO repo_seq (did, event_type, handle) 429 VALUES ($1, 'identity', $2) 430 RETURNING seq 431 "#, 432 did, 433 handle, 434 ) 435 .fetch_one(&state.db) 436 .await 437 .map_err(|e| format!("DB Error (repo_seq identity): {}", e))?; 438 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 439 .execute(&state.db) 440 .await 441 .map_err(|e| format!("DB Error (notify): {}", e))?; 442 Ok(seq_row.seq) 443} 444pub async fn sequence_account_event( 445 state: &AppState, 446 did: &str, 447 active: bool, 448 status: Option<&str>, 449) -> Result<i64, String> { 450 let seq_row = sqlx::query!( 451 r#" 452 INSERT INTO repo_seq (did, event_type, active, status) 453 VALUES ($1, 'account', $2, $3) 454 RETURNING seq 455 "#, 456 did, 457 active, 458 status, 459 ) 460 .fetch_one(&state.db) 461 .await 462 .map_err(|e| format!("DB Error (repo_seq account): {}", e))?; 463 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 464 .execute(&state.db) 465 .await 466 .map_err(|e| format!("DB Error (notify): {}", e))?; 467 Ok(seq_row.seq) 468} 469pub async fn sequence_sync_event( 470 state: &AppState, 471 did: &str, 472 commit_cid: &str, 473 rev: Option<&str>, 474) -> Result<i64, String> { 475 let seq_row = sqlx::query!( 476 r#" 477 INSERT INTO repo_seq (did, event_type, commit_cid, rev) 478 VALUES ($1, 'sync', $2, $3) 479 RETURNING seq 480 "#, 481 did, 482 commit_cid, 483 rev, 484 ) 485 .fetch_one(&state.db) 486 .await 487 .map_err(|e| format!("DB Error (repo_seq sync): {}", e))?; 488 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 489 .execute(&state.db) 490 .await 491 .map_err(|e| format!("DB Error (notify): {}", e))?; 492 Ok(seq_row.seq) 493} 494 495pub async fn sequence_empty_commit_event(state: &AppState, did: &str) -> Result<i64, String> { 496 let repo_root = sqlx::query_scalar!( 497 "SELECT r.repo_root_cid FROM repos r JOIN users u ON r.user_id = u.id WHERE u.did = $1", 498 did 499 ) 500 .fetch_optional(&state.db) 501 .await 502 .map_err(|e| format!("DB Error fetching repo root: {}", e))? 503 .ok_or_else(|| "Repo not found".to_string())?; 504 let ops = serde_json::json!([]); 505 let blobs: Vec<String> = vec![]; 506 let blocks_cids: Vec<String> = vec![]; 507 let seq_row = sqlx::query!( 508 r#" 509 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids) 510 VALUES ($1, 'commit', $2, $2, $3, $4, $5) 511 RETURNING seq 512 "#, 513 did, 514 repo_root, 515 ops, 516 &blobs, 517 &blocks_cids 518 ) 519 .fetch_one(&state.db) 520 .await 521 .map_err(|e| format!("DB Error (repo_seq empty commit): {}", e))?; 522 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 523 .execute(&state.db) 524 .await 525 .map_err(|e| format!("DB Error (notify): {}", e))?; 526 Ok(seq_row.seq) 527}