this repo has no description
1use crate::state::AppState; 2use bytes::Bytes; 3use cid::Cid; 4use jacquard::types::{integer::LimitedU32, string::Tid}; 5use jacquard_repo::commit::Commit; 6use jacquard_repo::storage::BlockStore; 7use k256::ecdsa::SigningKey; 8use serde_json::json; 9use std::str::FromStr; 10use uuid::Uuid; 11 12pub fn create_signed_commit( 13 did: &str, 14 data: Cid, 15 rev: &str, 16 prev: Option<Cid>, 17 signing_key: &SigningKey, 18) -> Result<(Vec<u8>, Bytes), String> { 19 let did = 20 jacquard::types::string::Did::new(did).map_err(|e| format!("Invalid DID: {:?}", e))?; 21 let rev = 22 jacquard::types::string::Tid::from_str(rev).map_err(|e| format!("Invalid TID: {:?}", e))?; 23 let unsigned = Commit::new_unsigned(did, data, rev, prev); 24 let signed = unsigned 25 .sign(signing_key) 26 .map_err(|e| format!("Failed to sign commit: {:?}", e))?; 27 let sig_bytes = signed.sig().clone(); 28 let signed_bytes = signed 29 .to_cbor() 30 .map_err(|e| format!("Failed to serialize signed commit: {:?}", e))?; 31 Ok((signed_bytes, sig_bytes)) 32} 33 34pub enum RecordOp { 35 Create { 36 collection: String, 37 rkey: String, 38 cid: Cid, 39 }, 40 Update { 41 collection: String, 42 rkey: String, 43 cid: Cid, 44 prev: Option<Cid>, 45 }, 46 Delete { 47 collection: String, 48 rkey: String, 49 prev: Option<Cid>, 50 }, 51} 52 53pub struct CommitResult { 54 pub commit_cid: Cid, 55 pub rev: String, 56} 57 58pub struct CommitParams<'a> { 59 pub did: &'a str, 60 pub user_id: Uuid, 61 pub current_root_cid: Option<Cid>, 62 pub prev_data_cid: Option<Cid>, 63 pub new_mst_root: Cid, 64 pub ops: Vec<RecordOp>, 65 pub blocks_cids: &'a [String], 66} 67 68pub async fn commit_and_log( 69 state: &AppState, 70 params: CommitParams<'_>, 71) -> Result<CommitResult, String> { 72 let CommitParams { 73 did, 74 user_id, 75 current_root_cid, 76 prev_data_cid, 77 new_mst_root, 78 ops, 79 blocks_cids, 80 } = params; 81 let key_row = sqlx::query!( 82 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", 83 user_id 84 ) 85 .fetch_one(&state.db) 86 .await 87 .map_err(|e| format!("Failed to fetch signing key: {}", e))?; 88 let key_bytes = crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version) 89 .map_err(|e| format!("Failed to decrypt signing key: {}", e))?; 90 let signing_key = 91 SigningKey::from_slice(&key_bytes).map_err(|e| format!("Invalid signing key: {}", e))?; 92 let rev = Tid::now(LimitedU32::MIN); 93 let rev_str = rev.to_string(); 94 let (new_commit_bytes, _sig) = 95 create_signed_commit(did, new_mst_root, &rev_str, current_root_cid, &signing_key)?; 96 let new_root_cid = state 97 .block_store 98 .put(&new_commit_bytes) 99 .await 100 .map_err(|e| format!("Failed to save commit block: {:?}", e))?; 101 let mut tx = state 102 .db 103 .begin() 104 .await 105 .map_err(|e| format!("Failed to begin transaction: {}", e))?; 106 let lock_result = sqlx::query!( 107 "SELECT repo_root_cid FROM repos WHERE user_id = $1 FOR UPDATE NOWAIT", 108 user_id 109 ) 110 .fetch_optional(&mut *tx) 111 .await; 112 match lock_result { 113 Err(e) => { 114 if let Some(db_err) = e.as_database_error() 115 && db_err.code().as_deref() == Some("55P03") 116 { 117 return Err( 118 "ConcurrentModification: Another request is modifying this repo".to_string(), 119 ); 120 } 121 return Err(format!("Failed to acquire repo lock: {}", e)); 122 } 123 Ok(Some(row)) => { 124 if let Some(expected_root) = &current_root_cid 125 && row.repo_root_cid != expected_root.to_string() 126 { 127 return Err( 128 "ConcurrentModification: Repo has been modified since last read".to_string(), 129 ); 130 } 131 } 132 Ok(None) => { 133 return Err("Repo not found".to_string()); 134 } 135 } 136 let is_account_active = sqlx::query_scalar!( 137 "SELECT deactivated_at IS NULL FROM users WHERE id = $1", 138 user_id 139 ) 140 .fetch_optional(&mut *tx) 141 .await 142 .map_err(|e| format!("Failed to check account status: {}", e))? 143 .flatten() 144 .unwrap_or(false); 145 sqlx::query!( 146 "UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2", 147 new_root_cid.to_string(), 148 user_id 149 ) 150 .execute(&mut *tx) 151 .await 152 .map_err(|e| format!("DB Error (repos): {}", e))?; 153 let mut upsert_collections: Vec<String> = Vec::new(); 154 let mut upsert_rkeys: Vec<String> = Vec::new(); 155 let mut upsert_cids: Vec<String> = Vec::new(); 156 let mut delete_collections: Vec<String> = Vec::new(); 157 let mut delete_rkeys: Vec<String> = Vec::new(); 158 for op in &ops { 159 match op { 160 RecordOp::Create { 161 collection, 162 rkey, 163 cid, 164 } 165 | RecordOp::Update { 166 collection, 167 rkey, 168 cid, 169 .. 170 } => { 171 upsert_collections.push(collection.clone()); 172 upsert_rkeys.push(rkey.clone()); 173 upsert_cids.push(cid.to_string()); 174 } 175 RecordOp::Delete { 176 collection, rkey, .. 177 } => { 178 delete_collections.push(collection.clone()); 179 delete_rkeys.push(rkey.clone()); 180 } 181 } 182 } 183 if !upsert_collections.is_empty() { 184 sqlx::query!( 185 r#" 186 INSERT INTO records (repo_id, collection, rkey, record_cid, repo_rev) 187 SELECT $1, collection, rkey, record_cid, $5 188 FROM UNNEST($2::text[], $3::text[], $4::text[]) AS t(collection, rkey, record_cid) 189 ON CONFLICT (repo_id, collection, rkey) DO UPDATE 190 SET record_cid = EXCLUDED.record_cid, repo_rev = EXCLUDED.repo_rev, created_at = NOW() 191 "#, 192 user_id, 193 &upsert_collections, 194 &upsert_rkeys, 195 &upsert_cids, 196 rev_str 197 ) 198 .execute(&mut *tx) 199 .await 200 .map_err(|e| format!("DB Error (records batch upsert): {}", e))?; 201 } 202 if !delete_collections.is_empty() { 203 sqlx::query!( 204 r#" 205 DELETE FROM records 206 WHERE repo_id = $1 207 AND (collection, rkey) IN (SELECT * FROM UNNEST($2::text[], $3::text[])) 208 "#, 209 user_id, 210 &delete_collections, 211 &delete_rkeys 212 ) 213 .execute(&mut *tx) 214 .await 215 .map_err(|e| format!("DB Error (records batch delete): {}", e))?; 216 } 217 let ops_json = ops 218 .iter() 219 .map(|op| match op { 220 RecordOp::Create { 221 collection, 222 rkey, 223 cid, 224 } => json!({ 225 "action": "create", 226 "path": format!("{}/{}", collection, rkey), 227 "cid": cid.to_string() 228 }), 229 RecordOp::Update { 230 collection, 231 rkey, 232 cid, 233 prev, 234 } => { 235 let mut obj = json!({ 236 "action": "update", 237 "path": format!("{}/{}", collection, rkey), 238 "cid": cid.to_string() 239 }); 240 if let Some(prev_cid) = prev { 241 obj["prev"] = json!(prev_cid.to_string()); 242 } 243 obj 244 } 245 RecordOp::Delete { 246 collection, 247 rkey, 248 prev, 249 } => { 250 let mut obj = json!({ 251 "action": "delete", 252 "path": format!("{}/{}", collection, rkey), 253 "cid": null 254 }); 255 if let Some(prev_cid) = prev { 256 obj["prev"] = json!(prev_cid.to_string()); 257 } 258 obj 259 } 260 }) 261 .collect::<Vec<_>>(); 262 if is_account_active { 263 let event_type = "commit"; 264 let prev_cid_str = current_root_cid.map(|c| c.to_string()); 265 let prev_data_cid_str = prev_data_cid.map(|c| c.to_string()); 266 let seq_row = sqlx::query!( 267 r#" 268 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids, prev_data_cid) 269 VALUES ($1, $2, $3, $4, $5, $6, $7, $8) 270 RETURNING seq 271 "#, 272 did, 273 event_type, 274 new_root_cid.to_string(), 275 prev_cid_str, 276 json!(ops_json), 277 &[] as &[String], 278 blocks_cids, 279 prev_data_cid_str, 280 ) 281 .fetch_one(&mut *tx) 282 .await 283 .map_err(|e| format!("DB Error (repo_seq): {}", e))?; 284 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 285 .execute(&mut *tx) 286 .await 287 .map_err(|e| format!("DB Error (notify): {}", e))?; 288 } 289 tx.commit() 290 .await 291 .map_err(|e| format!("Failed to commit transaction: {}", e))?; 292 if is_account_active { 293 let _ = sequence_sync_event(state, did, &new_root_cid.to_string()).await; 294 } 295 Ok(CommitResult { 296 commit_cid: new_root_cid, 297 rev: rev_str, 298 }) 299} 300pub async fn create_record_internal( 301 state: &AppState, 302 did: &str, 303 collection: &str, 304 rkey: &str, 305 record: &serde_json::Value, 306) -> Result<(String, Cid), String> { 307 use crate::repo::tracking::TrackingBlockStore; 308 use jacquard_repo::mst::Mst; 309 use std::sync::Arc; 310 let user_id: Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did) 311 .fetch_optional(&state.db) 312 .await 313 .map_err(|e| format!("DB error: {}", e))? 314 .ok_or_else(|| "User not found".to_string())?; 315 let root_cid_str: String = sqlx::query_scalar!( 316 "SELECT repo_root_cid FROM repos WHERE user_id = $1", 317 user_id 318 ) 319 .fetch_optional(&state.db) 320 .await 321 .map_err(|e| format!("DB error: {}", e))? 322 .ok_or_else(|| "Repo not found".to_string())?; 323 let current_root_cid = 324 Cid::from_str(&root_cid_str).map_err(|_| "Invalid repo root CID".to_string())?; 325 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 326 let commit_bytes = tracking_store 327 .get(&current_root_cid) 328 .await 329 .map_err(|e| format!("Failed to fetch commit: {:?}", e))? 330 .ok_or_else(|| "Commit block not found".to_string())?; 331 let commit = jacquard_repo::commit::Commit::from_cbor(&commit_bytes) 332 .map_err(|e| format!("Failed to parse commit: {:?}", e))?; 333 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 334 let mut record_bytes = Vec::new(); 335 serde_ipld_dagcbor::to_writer(&mut record_bytes, record) 336 .map_err(|e| format!("Failed to serialize record: {:?}", e))?; 337 let record_cid = tracking_store 338 .put(&record_bytes) 339 .await 340 .map_err(|e| format!("Failed to save record block: {:?}", e))?; 341 let key = format!("{}/{}", collection, rkey); 342 let new_mst = mst 343 .add(&key, record_cid) 344 .await 345 .map_err(|e| format!("Failed to add to MST: {:?}", e))?; 346 let new_mst_root = new_mst 347 .persist() 348 .await 349 .map_err(|e| format!("Failed to persist MST: {:?}", e))?; 350 let op = RecordOp::Create { 351 collection: collection.to_string(), 352 rkey: rkey.to_string(), 353 cid: record_cid, 354 }; 355 let mut relevant_blocks = std::collections::BTreeMap::new(); 356 new_mst 357 .blocks_for_path(&key, &mut relevant_blocks) 358 .await 359 .map_err(|e| format!("Failed to get new MST blocks for path: {:?}", e))?; 360 mst.blocks_for_path(&key, &mut relevant_blocks) 361 .await 362 .map_err(|e| format!("Failed to get old MST blocks for path: {:?}", e))?; 363 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes)); 364 let mut written_cids = tracking_store.get_all_relevant_cids(); 365 for cid in relevant_blocks.keys() { 366 if !written_cids.contains(cid) { 367 written_cids.push(*cid); 368 } 369 } 370 let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect(); 371 let result = commit_and_log( 372 state, 373 CommitParams { 374 did, 375 user_id, 376 current_root_cid: Some(current_root_cid), 377 prev_data_cid: Some(commit.data), 378 new_mst_root, 379 ops: vec![op], 380 blocks_cids: &written_cids_str, 381 }, 382 ) 383 .await?; 384 let uri = format!("at://{}/{}/{}", did, collection, rkey); 385 Ok((uri, result.commit_cid)) 386} 387 388pub async fn sequence_identity_event( 389 state: &AppState, 390 did: &str, 391 handle: Option<&str>, 392) -> Result<i64, String> { 393 let seq_row = sqlx::query!( 394 r#" 395 INSERT INTO repo_seq (did, event_type, handle) 396 VALUES ($1, 'identity', $2) 397 RETURNING seq 398 "#, 399 did, 400 handle, 401 ) 402 .fetch_one(&state.db) 403 .await 404 .map_err(|e| format!("DB Error (repo_seq identity): {}", e))?; 405 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 406 .execute(&state.db) 407 .await 408 .map_err(|e| format!("DB Error (notify): {}", e))?; 409 Ok(seq_row.seq) 410} 411pub async fn sequence_account_event( 412 state: &AppState, 413 did: &str, 414 active: bool, 415 status: Option<&str>, 416) -> Result<i64, String> { 417 let seq_row = sqlx::query!( 418 r#" 419 INSERT INTO repo_seq (did, event_type, active, status) 420 VALUES ($1, 'account', $2, $3) 421 RETURNING seq 422 "#, 423 did, 424 active, 425 status, 426 ) 427 .fetch_one(&state.db) 428 .await 429 .map_err(|e| format!("DB Error (repo_seq account): {}", e))?; 430 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 431 .execute(&state.db) 432 .await 433 .map_err(|e| format!("DB Error (notify): {}", e))?; 434 Ok(seq_row.seq) 435} 436pub async fn sequence_sync_event( 437 state: &AppState, 438 did: &str, 439 commit_cid: &str, 440) -> Result<i64, String> { 441 let seq_row = sqlx::query!( 442 r#" 443 INSERT INTO repo_seq (did, event_type, commit_cid) 444 VALUES ($1, 'sync', $2) 445 RETURNING seq 446 "#, 447 did, 448 commit_cid, 449 ) 450 .fetch_one(&state.db) 451 .await 452 .map_err(|e| format!("DB Error (repo_seq sync): {}", e))?; 453 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 454 .execute(&state.db) 455 .await 456 .map_err(|e| format!("DB Error (notify): {}", e))?; 457 Ok(seq_row.seq) 458} 459 460pub async fn sequence_empty_commit_event(state: &AppState, did: &str) -> Result<i64, String> { 461 let repo_root = sqlx::query_scalar!( 462 "SELECT r.repo_root_cid FROM repos r JOIN users u ON r.user_id = u.id WHERE u.did = $1", 463 did 464 ) 465 .fetch_optional(&state.db) 466 .await 467 .map_err(|e| format!("DB Error fetching repo root: {}", e))? 468 .ok_or_else(|| "Repo not found".to_string())?; 469 let ops = serde_json::json!([]); 470 let blobs: Vec<String> = vec![]; 471 let blocks_cids: Vec<String> = vec![]; 472 let seq_row = sqlx::query!( 473 r#" 474 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids) 475 VALUES ($1, 'commit', $2, $2, $3, $4, $5) 476 RETURNING seq 477 "#, 478 did, 479 repo_root, 480 ops, 481 &blobs, 482 &blocks_cids 483 ) 484 .fetch_one(&state.db) 485 .await 486 .map_err(|e| format!("DB Error (repo_seq empty commit): {}", e))?; 487 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 488 .execute(&state.db) 489 .await 490 .map_err(|e| format!("DB Error (notify): {}", e))?; 491 Ok(seq_row.seq) 492}