this repo has no description
1use crate::state::AppState; 2use bytes::Bytes; 3use cid::Cid; 4use jacquard::types::{integer::LimitedU32, string::Tid}; 5use jacquard_repo::storage::BlockStore; 6use k256::ecdsa::{Signature, SigningKey, signature::Signer}; 7use serde::Serialize; 8use serde_json::json; 9use uuid::Uuid; 10/* 11 * Why am I making custom commit objects instead of jacquard's Commit::sign(), you ask? 12 * 13 * At time of writing, jacquard has a bug in how it creates unsigned bytes for signing. 14 * Jacquard sets sig to empty bytes and serializes (6-field CBOR map) 15 * Indigo/ATProto creates a struct *without* the sig field (5-field CBOR map) 16 * 17 * These produce different CBOR bytes, so signatures created with jacquard 18 * don't verify with the relay's algorithm. The relay silently rejects commits 19 * with invalid signatures. 20 * 21 * If you have it downloaded, see: reference-relay-indigo/atproto/repo/commit.go UnsignedBytes() 22 */ 23#[derive(Serialize)] 24struct UnsignedCommit<'a> { 25 data: Cid, 26 did: &'a str, 27 prev: Option<Cid>, 28 rev: &'a str, 29 version: i64, 30} 31 32fn create_signed_commit( 33 did: &str, 34 data: Cid, 35 rev: &str, 36 prev: Option<Cid>, 37 signing_key: &SigningKey, 38) -> Result<(Vec<u8>, Bytes), String> { 39 let unsigned = UnsignedCommit { 40 data, 41 did, 42 prev, 43 rev, 44 version: 3, 45 }; 46 let unsigned_bytes = serde_ipld_dagcbor::to_vec(&unsigned) 47 .map_err(|e| format!("Failed to serialize unsigned commit: {:?}", e))?; 48 let sig: Signature = signing_key.sign(&unsigned_bytes); 49 let sig_bytes = Bytes::copy_from_slice(&sig.to_bytes()); 50 #[derive(Serialize)] 51 struct SignedCommit<'a> { 52 data: Cid, 53 did: &'a str, 54 prev: Option<Cid>, 55 rev: &'a str, 56 #[serde(with = "serde_bytes")] 57 sig: &'a [u8], 58 version: i64, 59 } 60 let signed = SignedCommit { 61 data, 62 did, 63 prev, 64 rev, 65 sig: &sig_bytes, 66 version: 3, 67 }; 68 let signed_bytes = serde_ipld_dagcbor::to_vec(&signed) 69 .map_err(|e| format!("Failed to serialize signed commit: {:?}", e))?; 70 Ok((signed_bytes, sig_bytes)) 71} 72 73pub enum RecordOp { 74 Create { 75 collection: String, 76 rkey: String, 77 cid: Cid, 78 }, 79 Update { 80 collection: String, 81 rkey: String, 82 cid: Cid, 83 prev: Option<Cid>, 84 }, 85 Delete { 86 collection: String, 87 rkey: String, 88 prev: Option<Cid>, 89 }, 90} 91 92pub struct CommitResult { 93 pub commit_cid: Cid, 94 pub rev: String, 95} 96 97pub struct CommitParams<'a> { 98 pub did: &'a str, 99 pub user_id: Uuid, 100 pub current_root_cid: Option<Cid>, 101 pub prev_data_cid: Option<Cid>, 102 pub new_mst_root: Cid, 103 pub ops: Vec<RecordOp>, 104 pub blocks_cids: &'a [String], 105} 106 107pub async fn commit_and_log( 108 state: &AppState, 109 params: CommitParams<'_>, 110) -> Result<CommitResult, String> { 111 let CommitParams { 112 did, 113 user_id, 114 current_root_cid, 115 prev_data_cid, 116 new_mst_root, 117 ops, 118 blocks_cids, 119 } = params; 120 let key_row = sqlx::query!( 121 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", 122 user_id 123 ) 124 .fetch_one(&state.db) 125 .await 126 .map_err(|e| format!("Failed to fetch signing key: {}", e))?; 127 let key_bytes = crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version) 128 .map_err(|e| format!("Failed to decrypt signing key: {}", e))?; 129 let signing_key = 130 SigningKey::from_slice(&key_bytes).map_err(|e| format!("Invalid signing key: {}", e))?; 131 let rev = Tid::now(LimitedU32::MIN); 132 let rev_str = rev.to_string(); 133 let (new_commit_bytes, _sig) = 134 create_signed_commit(did, new_mst_root, &rev_str, current_root_cid, &signing_key)?; 135 let new_root_cid = state 136 .block_store 137 .put(&new_commit_bytes) 138 .await 139 .map_err(|e| format!("Failed to save commit block: {:?}", e))?; 140 let mut tx = state 141 .db 142 .begin() 143 .await 144 .map_err(|e| format!("Failed to begin transaction: {}", e))?; 145 let lock_result = sqlx::query!( 146 "SELECT repo_root_cid FROM repos WHERE user_id = $1 FOR UPDATE NOWAIT", 147 user_id 148 ) 149 .fetch_optional(&mut *tx) 150 .await; 151 match lock_result { 152 Err(e) => { 153 if let Some(db_err) = e.as_database_error() 154 && db_err.code().as_deref() == Some("55P03") 155 { 156 return Err( 157 "ConcurrentModification: Another request is modifying this repo".to_string(), 158 ); 159 } 160 return Err(format!("Failed to acquire repo lock: {}", e)); 161 } 162 Ok(Some(row)) => { 163 if let Some(expected_root) = &current_root_cid 164 && row.repo_root_cid != expected_root.to_string() 165 { 166 return Err( 167 "ConcurrentModification: Repo has been modified since last read".to_string(), 168 ); 169 } 170 } 171 Ok(None) => { 172 return Err("Repo not found".to_string()); 173 } 174 } 175 let is_account_active = sqlx::query_scalar!( 176 "SELECT deactivated_at IS NULL FROM users WHERE id = $1", 177 user_id 178 ) 179 .fetch_optional(&mut *tx) 180 .await 181 .map_err(|e| format!("Failed to check account status: {}", e))? 182 .flatten() 183 .unwrap_or(false); 184 sqlx::query!( 185 "UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2", 186 new_root_cid.to_string(), 187 user_id 188 ) 189 .execute(&mut *tx) 190 .await 191 .map_err(|e| format!("DB Error (repos): {}", e))?; 192 let mut upsert_collections: Vec<String> = Vec::new(); 193 let mut upsert_rkeys: Vec<String> = Vec::new(); 194 let mut upsert_cids: Vec<String> = Vec::new(); 195 let mut delete_collections: Vec<String> = Vec::new(); 196 let mut delete_rkeys: Vec<String> = Vec::new(); 197 for op in &ops { 198 match op { 199 RecordOp::Create { 200 collection, 201 rkey, 202 cid, 203 } 204 | RecordOp::Update { 205 collection, 206 rkey, 207 cid, 208 .. 209 } => { 210 upsert_collections.push(collection.clone()); 211 upsert_rkeys.push(rkey.clone()); 212 upsert_cids.push(cid.to_string()); 213 } 214 RecordOp::Delete { 215 collection, rkey, .. 216 } => { 217 delete_collections.push(collection.clone()); 218 delete_rkeys.push(rkey.clone()); 219 } 220 } 221 } 222 if !upsert_collections.is_empty() { 223 sqlx::query!( 224 r#" 225 INSERT INTO records (repo_id, collection, rkey, record_cid, repo_rev) 226 SELECT $1, collection, rkey, record_cid, $5 227 FROM UNNEST($2::text[], $3::text[], $4::text[]) AS t(collection, rkey, record_cid) 228 ON CONFLICT (repo_id, collection, rkey) DO UPDATE 229 SET record_cid = EXCLUDED.record_cid, repo_rev = EXCLUDED.repo_rev, created_at = NOW() 230 "#, 231 user_id, 232 &upsert_collections, 233 &upsert_rkeys, 234 &upsert_cids, 235 rev_str 236 ) 237 .execute(&mut *tx) 238 .await 239 .map_err(|e| format!("DB Error (records batch upsert): {}", e))?; 240 } 241 if !delete_collections.is_empty() { 242 sqlx::query!( 243 r#" 244 DELETE FROM records 245 WHERE repo_id = $1 246 AND (collection, rkey) IN (SELECT * FROM UNNEST($2::text[], $3::text[])) 247 "#, 248 user_id, 249 &delete_collections, 250 &delete_rkeys 251 ) 252 .execute(&mut *tx) 253 .await 254 .map_err(|e| format!("DB Error (records batch delete): {}", e))?; 255 } 256 let ops_json = ops 257 .iter() 258 .map(|op| match op { 259 RecordOp::Create { 260 collection, 261 rkey, 262 cid, 263 } => json!({ 264 "action": "create", 265 "path": format!("{}/{}", collection, rkey), 266 "cid": cid.to_string() 267 }), 268 RecordOp::Update { 269 collection, 270 rkey, 271 cid, 272 prev, 273 } => { 274 let mut obj = json!({ 275 "action": "update", 276 "path": format!("{}/{}", collection, rkey), 277 "cid": cid.to_string() 278 }); 279 if let Some(prev_cid) = prev { 280 obj["prev"] = json!(prev_cid.to_string()); 281 } 282 obj 283 } 284 RecordOp::Delete { 285 collection, 286 rkey, 287 prev, 288 } => { 289 let mut obj = json!({ 290 "action": "delete", 291 "path": format!("{}/{}", collection, rkey), 292 "cid": null 293 }); 294 if let Some(prev_cid) = prev { 295 obj["prev"] = json!(prev_cid.to_string()); 296 } 297 obj 298 } 299 }) 300 .collect::<Vec<_>>(); 301 if is_account_active { 302 let event_type = "commit"; 303 let prev_cid_str = current_root_cid.map(|c| c.to_string()); 304 let prev_data_cid_str = prev_data_cid.map(|c| c.to_string()); 305 let seq_row = sqlx::query!( 306 r#" 307 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids, prev_data_cid) 308 VALUES ($1, $2, $3, $4, $5, $6, $7, $8) 309 RETURNING seq 310 "#, 311 did, 312 event_type, 313 new_root_cid.to_string(), 314 prev_cid_str, 315 json!(ops_json), 316 &[] as &[String], 317 blocks_cids, 318 prev_data_cid_str, 319 ) 320 .fetch_one(&mut *tx) 321 .await 322 .map_err(|e| format!("DB Error (repo_seq): {}", e))?; 323 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 324 .execute(&mut *tx) 325 .await 326 .map_err(|e| format!("DB Error (notify): {}", e))?; 327 } 328 tx.commit() 329 .await 330 .map_err(|e| format!("Failed to commit transaction: {}", e))?; 331 if is_account_active { 332 let _ = sequence_sync_event(state, did, &new_root_cid.to_string()).await; 333 } 334 Ok(CommitResult { 335 commit_cid: new_root_cid, 336 rev: rev_str, 337 }) 338} 339pub async fn create_record_internal( 340 state: &AppState, 341 did: &str, 342 collection: &str, 343 rkey: &str, 344 record: &serde_json::Value, 345) -> Result<(String, Cid), String> { 346 use crate::repo::tracking::TrackingBlockStore; 347 use jacquard_repo::mst::Mst; 348 use std::sync::Arc; 349 let user_id: Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did) 350 .fetch_optional(&state.db) 351 .await 352 .map_err(|e| format!("DB error: {}", e))? 353 .ok_or_else(|| "User not found".to_string())?; 354 let root_cid_str: String = sqlx::query_scalar!( 355 "SELECT repo_root_cid FROM repos WHERE user_id = $1", 356 user_id 357 ) 358 .fetch_optional(&state.db) 359 .await 360 .map_err(|e| format!("DB error: {}", e))? 361 .ok_or_else(|| "Repo not found".to_string())?; 362 let current_root_cid = 363 Cid::from_str(&root_cid_str).map_err(|_| "Invalid repo root CID".to_string())?; 364 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 365 let commit_bytes = tracking_store 366 .get(&current_root_cid) 367 .await 368 .map_err(|e| format!("Failed to fetch commit: {:?}", e))? 369 .ok_or_else(|| "Commit block not found".to_string())?; 370 let commit = jacquard_repo::commit::Commit::from_cbor(&commit_bytes) 371 .map_err(|e| format!("Failed to parse commit: {:?}", e))?; 372 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 373 let mut record_bytes = Vec::new(); 374 serde_ipld_dagcbor::to_writer(&mut record_bytes, record) 375 .map_err(|e| format!("Failed to serialize record: {:?}", e))?; 376 let record_cid = tracking_store 377 .put(&record_bytes) 378 .await 379 .map_err(|e| format!("Failed to save record block: {:?}", e))?; 380 let key = format!("{}/{}", collection, rkey); 381 let new_mst = mst 382 .add(&key, record_cid) 383 .await 384 .map_err(|e| format!("Failed to add to MST: {:?}", e))?; 385 let new_mst_root = new_mst 386 .persist() 387 .await 388 .map_err(|e| format!("Failed to persist MST: {:?}", e))?; 389 let op = RecordOp::Create { 390 collection: collection.to_string(), 391 rkey: rkey.to_string(), 392 cid: record_cid, 393 }; 394 let mut relevant_blocks = std::collections::BTreeMap::new(); 395 new_mst 396 .blocks_for_path(&key, &mut relevant_blocks) 397 .await 398 .map_err(|e| format!("Failed to get new MST blocks for path: {:?}", e))?; 399 mst.blocks_for_path(&key, &mut relevant_blocks) 400 .await 401 .map_err(|e| format!("Failed to get old MST blocks for path: {:?}", e))?; 402 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes)); 403 let mut written_cids = tracking_store.get_all_relevant_cids(); 404 for cid in relevant_blocks.keys() { 405 if !written_cids.contains(cid) { 406 written_cids.push(*cid); 407 } 408 } 409 let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect(); 410 let result = commit_and_log( 411 state, 412 CommitParams { 413 did, 414 user_id, 415 current_root_cid: Some(current_root_cid), 416 prev_data_cid: Some(commit.data), 417 new_mst_root, 418 ops: vec![op], 419 blocks_cids: &written_cids_str, 420 }, 421 ) 422 .await?; 423 let uri = format!("at://{}/{}/{}", did, collection, rkey); 424 Ok((uri, result.commit_cid)) 425} 426use std::str::FromStr; 427pub async fn sequence_identity_event( 428 state: &AppState, 429 did: &str, 430 handle: Option<&str>, 431) -> Result<i64, String> { 432 let seq_row = sqlx::query!( 433 r#" 434 INSERT INTO repo_seq (did, event_type, handle) 435 VALUES ($1, 'identity', $2) 436 RETURNING seq 437 "#, 438 did, 439 handle, 440 ) 441 .fetch_one(&state.db) 442 .await 443 .map_err(|e| format!("DB Error (repo_seq identity): {}", e))?; 444 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 445 .execute(&state.db) 446 .await 447 .map_err(|e| format!("DB Error (notify): {}", e))?; 448 Ok(seq_row.seq) 449} 450pub async fn sequence_account_event( 451 state: &AppState, 452 did: &str, 453 active: bool, 454 status: Option<&str>, 455) -> Result<i64, String> { 456 let seq_row = sqlx::query!( 457 r#" 458 INSERT INTO repo_seq (did, event_type, active, status) 459 VALUES ($1, 'account', $2, $3) 460 RETURNING seq 461 "#, 462 did, 463 active, 464 status, 465 ) 466 .fetch_one(&state.db) 467 .await 468 .map_err(|e| format!("DB Error (repo_seq account): {}", e))?; 469 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 470 .execute(&state.db) 471 .await 472 .map_err(|e| format!("DB Error (notify): {}", e))?; 473 Ok(seq_row.seq) 474} 475pub async fn sequence_sync_event( 476 state: &AppState, 477 did: &str, 478 commit_cid: &str, 479) -> Result<i64, String> { 480 let seq_row = sqlx::query!( 481 r#" 482 INSERT INTO repo_seq (did, event_type, commit_cid) 483 VALUES ($1, 'sync', $2) 484 RETURNING seq 485 "#, 486 did, 487 commit_cid, 488 ) 489 .fetch_one(&state.db) 490 .await 491 .map_err(|e| format!("DB Error (repo_seq sync): {}", e))?; 492 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 493 .execute(&state.db) 494 .await 495 .map_err(|e| format!("DB Error (notify): {}", e))?; 496 Ok(seq_row.seq) 497} 498 499pub async fn sequence_empty_commit_event(state: &AppState, did: &str) -> Result<i64, String> { 500 let repo_root = sqlx::query_scalar!( 501 "SELECT r.repo_root_cid FROM repos r JOIN users u ON r.user_id = u.id WHERE u.did = $1", 502 did 503 ) 504 .fetch_optional(&state.db) 505 .await 506 .map_err(|e| format!("DB Error fetching repo root: {}", e))? 507 .ok_or_else(|| "Repo not found".to_string())?; 508 let ops = serde_json::json!([]); 509 let blobs: Vec<String> = vec![]; 510 let blocks_cids: Vec<String> = vec![]; 511 let seq_row = sqlx::query!( 512 r#" 513 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids) 514 VALUES ($1, 'commit', $2, $2, $3, $4, $5) 515 RETURNING seq 516 "#, 517 did, 518 repo_root, 519 ops, 520 &blobs, 521 &blocks_cids 522 ) 523 .fetch_one(&state.db) 524 .await 525 .map_err(|e| format!("DB Error (repo_seq empty commit): {}", e))?; 526 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 527 .execute(&state.db) 528 .await 529 .map_err(|e| format!("DB Error (notify): {}", e))?; 530 Ok(seq_row.seq) 531}