this repo has no description
1use crate::state::AppState; 2use bytes::Bytes; 3use cid::Cid; 4use jacquard::types::{integer::LimitedU32, string::Tid}; 5use jacquard_repo::storage::BlockStore; 6use k256::ecdsa::{signature::Signer, Signature, SigningKey}; 7use serde::Serialize; 8use serde_json::json; 9use uuid::Uuid; 10/* 11 * Why am I making custom commit objects instead of jacquard's Commit::sign(), you ask? 12 * 13 * At time of writing, jacquard has a bug in how it creates unsigned bytes for signing. 14 * Jacquard sets sig to empty bytes and serializes (6-field CBOR map) 15 * Indigo/ATProto creates a struct *without* the sig field (5-field CBOR map) 16 * 17 * These produce different CBOR bytes, so signatures created with jacquard 18 * don't verify with the relay's algorithm. The relay silently rejects commits 19 * with invalid signatures. 20 * 21 * If you have it downloaded, see: reference-relay-indigo/atproto/repo/commit.go UnsignedBytes() 22 */ 23#[derive(Serialize)] 24struct UnsignedCommit<'a> { 25 data: Cid, 26 did: &'a str, 27 prev: Option<Cid>, 28 rev: &'a str, 29 version: i64, 30} 31 32fn create_signed_commit( 33 did: &str, 34 data: Cid, 35 rev: &str, 36 prev: Option<Cid>, 37 signing_key: &SigningKey, 38) -> Result<(Vec<u8>, Bytes), String> { 39 let unsigned = UnsignedCommit { 40 data, 41 did, 42 prev, 43 rev, 44 version: 3, 45 }; 46 let unsigned_bytes = serde_ipld_dagcbor::to_vec(&unsigned) 47 .map_err(|e| format!("Failed to serialize unsigned commit: {:?}", e))?; 48 let sig: Signature = signing_key.sign(&unsigned_bytes); 49 let sig_bytes = Bytes::copy_from_slice(&sig.to_bytes()); 50 #[derive(Serialize)] 51 struct SignedCommit<'a> { 52 data: Cid, 53 did: &'a str, 54 prev: Option<Cid>, 55 rev: &'a str, 56 #[serde(with = "serde_bytes")] 57 sig: &'a [u8], 58 version: i64, 59 } 60 let signed = SignedCommit { 61 data, 62 did, 63 prev, 64 rev, 65 sig: &sig_bytes, 66 version: 3, 67 }; 68 let signed_bytes = serde_ipld_dagcbor::to_vec(&signed) 69 .map_err(|e| format!("Failed to serialize signed commit: {:?}", e))?; 70 Ok((signed_bytes, sig_bytes)) 71} 72 73pub enum RecordOp { 74 Create { collection: String, rkey: String, cid: Cid }, 75 Update { collection: String, rkey: String, cid: Cid, prev: Option<Cid> }, 76 Delete { collection: String, rkey: String, prev: Option<Cid> }, 77} 78 79pub struct CommitResult { 80 pub commit_cid: Cid, 81 pub rev: String, 82} 83 84pub async fn commit_and_log( 85 state: &AppState, 86 did: &str, 87 user_id: Uuid, 88 current_root_cid: Option<Cid>, 89 prev_data_cid: Option<Cid>, 90 new_mst_root: Cid, 91 ops: Vec<RecordOp>, 92 blocks_cids: &[String], 93) -> Result<CommitResult, String> { 94 let key_row = sqlx::query!( 95 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", 96 user_id 97 ) 98 .fetch_one(&state.db) 99 .await 100 .map_err(|e| format!("Failed to fetch signing key: {}", e))?; 101 let key_bytes = crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version) 102 .map_err(|e| format!("Failed to decrypt signing key: {}", e))?; 103 let signing_key = SigningKey::from_slice(&key_bytes) 104 .map_err(|e| format!("Invalid signing key: {}", e))?; 105 let rev = Tid::now(LimitedU32::MIN); 106 let rev_str = rev.to_string(); 107 let (new_commit_bytes, _sig) = create_signed_commit( 108 did, 109 new_mst_root, 110 &rev_str, 111 current_root_cid, 112 &signing_key, 113 )?; 114 let new_root_cid = state.block_store.put(&new_commit_bytes).await 115 .map_err(|e| format!("Failed to save commit block: {:?}", e))?; 116 let mut tx = state.db.begin().await 117 .map_err(|e| format!("Failed to begin transaction: {}", e))?; 118 let lock_result = sqlx::query!( 119 "SELECT repo_root_cid FROM repos WHERE user_id = $1 FOR UPDATE NOWAIT", 120 user_id 121 ) 122 .fetch_optional(&mut *tx) 123 .await; 124 match lock_result { 125 Err(e) => { 126 if let Some(db_err) = e.as_database_error() { 127 if db_err.code().as_deref() == Some("55P03") { 128 return Err("ConcurrentModification: Another request is modifying this repo".to_string()); 129 } 130 } 131 return Err(format!("Failed to acquire repo lock: {}", e)); 132 } 133 Ok(Some(row)) => { 134 if let Some(expected_root) = &current_root_cid { 135 if row.repo_root_cid != expected_root.to_string() { 136 return Err("ConcurrentModification: Repo has been modified since last read".to_string()); 137 } 138 } 139 } 140 Ok(None) => { 141 return Err("Repo not found".to_string()); 142 } 143 } 144 sqlx::query!("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2", new_root_cid.to_string(), user_id) 145 .execute(&mut *tx) 146 .await 147 .map_err(|e| format!("DB Error (repos): {}", e))?; 148 let mut upsert_collections: Vec<String> = Vec::new(); 149 let mut upsert_rkeys: Vec<String> = Vec::new(); 150 let mut upsert_cids: Vec<String> = Vec::new(); 151 let mut delete_collections: Vec<String> = Vec::new(); 152 let mut delete_rkeys: Vec<String> = Vec::new(); 153 for op in &ops { 154 match op { 155 RecordOp::Create { collection, rkey, cid } | RecordOp::Update { collection, rkey, cid, .. } => { 156 upsert_collections.push(collection.clone()); 157 upsert_rkeys.push(rkey.clone()); 158 upsert_cids.push(cid.to_string()); 159 } 160 RecordOp::Delete { collection, rkey, .. } => { 161 delete_collections.push(collection.clone()); 162 delete_rkeys.push(rkey.clone()); 163 } 164 } 165 } 166 if !upsert_collections.is_empty() { 167 sqlx::query!( 168 r#" 169 INSERT INTO records (repo_id, collection, rkey, record_cid, repo_rev) 170 SELECT $1, collection, rkey, record_cid, $5 171 FROM UNNEST($2::text[], $3::text[], $4::text[]) AS t(collection, rkey, record_cid) 172 ON CONFLICT (repo_id, collection, rkey) DO UPDATE 173 SET record_cid = EXCLUDED.record_cid, repo_rev = EXCLUDED.repo_rev, created_at = NOW() 174 "#, 175 user_id, 176 &upsert_collections, 177 &upsert_rkeys, 178 &upsert_cids, 179 rev_str 180 ) 181 .execute(&mut *tx) 182 .await 183 .map_err(|e| format!("DB Error (records batch upsert): {}", e))?; 184 } 185 if !delete_collections.is_empty() { 186 sqlx::query!( 187 r#" 188 DELETE FROM records 189 WHERE repo_id = $1 190 AND (collection, rkey) IN (SELECT * FROM UNNEST($2::text[], $3::text[])) 191 "#, 192 user_id, 193 &delete_collections, 194 &delete_rkeys 195 ) 196 .execute(&mut *tx) 197 .await 198 .map_err(|e| format!("DB Error (records batch delete): {}", e))?; 199 } 200 let ops_json = ops.iter().map(|op| { 201 match op { 202 RecordOp::Create { collection, rkey, cid } => json!({ 203 "action": "create", 204 "path": format!("{}/{}", collection, rkey), 205 "cid": cid.to_string() 206 }), 207 RecordOp::Update { collection, rkey, cid, prev } => { 208 let mut obj = json!({ 209 "action": "update", 210 "path": format!("{}/{}", collection, rkey), 211 "cid": cid.to_string() 212 }); 213 if let Some(prev_cid) = prev { 214 obj["prev"] = json!(prev_cid.to_string()); 215 } 216 obj 217 }, 218 RecordOp::Delete { collection, rkey, prev } => { 219 let mut obj = json!({ 220 "action": "delete", 221 "path": format!("{}/{}", collection, rkey), 222 "cid": null 223 }); 224 if let Some(prev_cid) = prev { 225 obj["prev"] = json!(prev_cid.to_string()); 226 } 227 obj 228 }, 229 } 230 }).collect::<Vec<_>>(); 231 let event_type = "commit"; 232 let prev_cid_str = current_root_cid.map(|c| c.to_string()); 233 let prev_data_cid_str = prev_data_cid.map(|c| c.to_string()); 234 let seq_row = sqlx::query!( 235 r#" 236 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids, prev_data_cid) 237 VALUES ($1, $2, $3, $4, $5, $6, $7, $8) 238 RETURNING seq 239 "#, 240 did, 241 event_type, 242 new_root_cid.to_string(), 243 prev_cid_str, 244 json!(ops_json), 245 &[] as &[String], 246 blocks_cids, 247 prev_data_cid_str, 248 ) 249 .fetch_one(&mut *tx) 250 .await 251 .map_err(|e| format!("DB Error (repo_seq): {}", e))?; 252 sqlx::query( 253 &format!("NOTIFY repo_updates, '{}'", seq_row.seq) 254 ) 255 .execute(&mut *tx) 256 .await 257 .map_err(|e| format!("DB Error (notify): {}", e))?; 258 tx.commit().await 259 .map_err(|e| format!("Failed to commit transaction: {}", e))?; 260 let _ = sequence_sync_event(state, did, &new_root_cid.to_string()).await; 261 Ok(CommitResult { 262 commit_cid: new_root_cid, 263 rev: rev_str, 264 }) 265} 266pub async fn create_record_internal( 267 state: &AppState, 268 did: &str, 269 collection: &str, 270 rkey: &str, 271 record: &serde_json::Value, 272) -> Result<(String, Cid), String> { 273 use crate::repo::tracking::TrackingBlockStore; 274 use jacquard_repo::mst::Mst; 275 use std::sync::Arc; 276 let user_id: Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did) 277 .fetch_optional(&state.db) 278 .await 279 .map_err(|e| format!("DB error: {}", e))? 280 .ok_or_else(|| "User not found".to_string())?; 281 let root_cid_str: String = 282 sqlx::query_scalar!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id) 283 .fetch_optional(&state.db) 284 .await 285 .map_err(|e| format!("DB error: {}", e))? 286 .ok_or_else(|| "Repo not found".to_string())?; 287 let current_root_cid = Cid::from_str(&root_cid_str) 288 .map_err(|_| "Invalid repo root CID".to_string())?; 289 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 290 let commit_bytes = tracking_store.get(&current_root_cid).await 291 .map_err(|e| format!("Failed to fetch commit: {:?}", e))? 292 .ok_or_else(|| "Commit block not found".to_string())?; 293 let commit = jacquard_repo::commit::Commit::from_cbor(&commit_bytes) 294 .map_err(|e| format!("Failed to parse commit: {:?}", e))?; 295 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 296 let mut record_bytes = Vec::new(); 297 serde_ipld_dagcbor::to_writer(&mut record_bytes, record) 298 .map_err(|e| format!("Failed to serialize record: {:?}", e))?; 299 let record_cid = tracking_store.put(&record_bytes).await 300 .map_err(|e| format!("Failed to save record block: {:?}", e))?; 301 let key = format!("{}/{}", collection, rkey); 302 let new_mst = mst.add(&key, record_cid).await 303 .map_err(|e| format!("Failed to add to MST: {:?}", e))?; 304 let new_mst_root = new_mst.persist().await 305 .map_err(|e| format!("Failed to persist MST: {:?}", e))?; 306 let op = RecordOp::Create { 307 collection: collection.to_string(), 308 rkey: rkey.to_string(), 309 cid: record_cid, 310 }; 311 let mut relevant_blocks = std::collections::BTreeMap::new(); 312 new_mst.blocks_for_path(&key, &mut relevant_blocks).await 313 .map_err(|e| format!("Failed to get new MST blocks for path: {:?}", e))?; 314 mst.blocks_for_path(&key, &mut relevant_blocks).await 315 .map_err(|e| format!("Failed to get old MST blocks for path: {:?}", e))?; 316 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes)); 317 let mut written_cids = tracking_store.get_all_relevant_cids(); 318 for cid in relevant_blocks.keys() { 319 if !written_cids.contains(cid) { 320 written_cids.push(*cid); 321 } 322 } 323 let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect(); 324 let result = commit_and_log( 325 state, 326 did, 327 user_id, 328 Some(current_root_cid), 329 Some(commit.data), 330 new_mst_root, 331 vec![op], 332 &written_cids_str, 333 ).await?; 334 let uri = format!("at://{}/{}/{}", did, collection, rkey); 335 Ok((uri, result.commit_cid)) 336} 337use std::str::FromStr; 338pub async fn sequence_identity_event( 339 state: &AppState, 340 did: &str, 341 handle: Option<&str>, 342) -> Result<i64, String> { 343 let seq_row = sqlx::query!( 344 r#" 345 INSERT INTO repo_seq (did, event_type, handle) 346 VALUES ($1, 'identity', $2) 347 RETURNING seq 348 "#, 349 did, 350 handle, 351 ) 352 .fetch_one(&state.db) 353 .await 354 .map_err(|e| format!("DB Error (repo_seq identity): {}", e))?; 355 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 356 .execute(&state.db) 357 .await 358 .map_err(|e| format!("DB Error (notify): {}", e))?; 359 Ok(seq_row.seq) 360} 361pub async fn sequence_account_event( 362 state: &AppState, 363 did: &str, 364 active: bool, 365 status: Option<&str>, 366) -> Result<i64, String> { 367 let seq_row = sqlx::query!( 368 r#" 369 INSERT INTO repo_seq (did, event_type, active, status) 370 VALUES ($1, 'account', $2, $3) 371 RETURNING seq 372 "#, 373 did, 374 active, 375 status, 376 ) 377 .fetch_one(&state.db) 378 .await 379 .map_err(|e| format!("DB Error (repo_seq account): {}", e))?; 380 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 381 .execute(&state.db) 382 .await 383 .map_err(|e| format!("DB Error (notify): {}", e))?; 384 Ok(seq_row.seq) 385} 386pub async fn sequence_sync_event( 387 state: &AppState, 388 did: &str, 389 commit_cid: &str, 390) -> Result<i64, String> { 391 let seq_row = sqlx::query!( 392 r#" 393 INSERT INTO repo_seq (did, event_type, commit_cid) 394 VALUES ($1, 'sync', $2) 395 RETURNING seq 396 "#, 397 did, 398 commit_cid, 399 ) 400 .fetch_one(&state.db) 401 .await 402 .map_err(|e| format!("DB Error (repo_seq sync): {}", e))?; 403 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 404 .execute(&state.db) 405 .await 406 .map_err(|e| format!("DB Error (notify): {}", e))?; 407 Ok(seq_row.seq) 408}