this repo has no description
1use crate::state::AppState; 2use bytes::Bytes; 3use cid::Cid; 4use jacquard::types::{integer::LimitedU32, string::Tid}; 5use jacquard_repo::storage::BlockStore; 6use k256::ecdsa::{signature::Signer, Signature, SigningKey}; 7use serde::Serialize; 8use serde_json::json; 9use uuid::Uuid; 10/* 11 * Why am I making custom commit objects instead of jacquard's Commit::sign(), you ask? 12 * 13 * At time of writing, jacquard has a bug in how it creates unsigned bytes for signing. 14 * Jacquard sets sig to empty bytes and serializes (6-field CBOR map) 15 * Indigo/ATProto creates a struct *without* the sig field (5-field CBOR map) 16 * 17 * These produce different CBOR bytes, so signatures created with jacquard 18 * don't verify with the relay's algorithm. The relay silently rejects commits 19 * with invalid signatures. 20 * 21 * If you have it downloaded, see: reference-relay-indigo/atproto/repo/commit.go UnsignedBytes() 22 */ 23#[derive(Serialize)] 24struct UnsignedCommit<'a> { 25 data: Cid, 26 did: &'a str, 27 prev: Option<Cid>, 28 rev: &'a str, 29 version: i64, 30} 31fn create_signed_commit( 32 did: &str, 33 data: Cid, 34 rev: &str, 35 prev: Option<Cid>, 36 signing_key: &SigningKey, 37) -> Result<(Vec<u8>, Bytes), String> { 38 let unsigned = UnsignedCommit { 39 data, 40 did, 41 prev, 42 rev, 43 version: 3, 44 }; 45 let unsigned_bytes = serde_ipld_dagcbor::to_vec(&unsigned) 46 .map_err(|e| format!("Failed to serialize unsigned commit: {:?}", e))?; 47 let sig: Signature = signing_key.sign(&unsigned_bytes); 48 let sig_bytes = Bytes::copy_from_slice(&sig.to_bytes()); 49 #[derive(Serialize)] 50 struct SignedCommit<'a> { 51 data: Cid, 52 did: &'a str, 53 prev: Option<Cid>, 54 rev: &'a str, 55 #[serde(with = "serde_bytes")] 56 sig: &'a [u8], 57 version: i64, 58 } 59 let signed = SignedCommit { 60 data, 61 did, 62 prev, 63 rev, 64 sig: &sig_bytes, 65 version: 3, 66 }; 67 let signed_bytes = serde_ipld_dagcbor::to_vec(&signed) 68 .map_err(|e| format!("Failed to serialize signed commit: {:?}", e))?; 69 Ok((signed_bytes, sig_bytes)) 70} 71pub enum RecordOp { 72 Create { collection: String, rkey: String, cid: Cid }, 73 Update { collection: String, rkey: String, cid: Cid, prev: Option<Cid> }, 74 Delete { collection: String, rkey: String, prev: Option<Cid> }, 75} 76pub struct CommitResult { 77 pub commit_cid: Cid, 78 pub rev: String, 79} 80pub async fn commit_and_log( 81 state: &AppState, 82 did: &str, 83 user_id: Uuid, 84 current_root_cid: Option<Cid>, 85 prev_data_cid: Option<Cid>, 86 new_mst_root: Cid, 87 ops: Vec<RecordOp>, 88 blocks_cids: &[String], 89) -> Result<CommitResult, String> { 90 let key_row = sqlx::query!( 91 "SELECT key_bytes, encryption_version FROM user_keys WHERE user_id = $1", 92 user_id 93 ) 94 .fetch_one(&state.db) 95 .await 96 .map_err(|e| format!("Failed to fetch signing key: {}", e))?; 97 let key_bytes = crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version) 98 .map_err(|e| format!("Failed to decrypt signing key: {}", e))?; 99 let signing_key = SigningKey::from_slice(&key_bytes) 100 .map_err(|e| format!("Invalid signing key: {}", e))?; 101 let rev = Tid::now(LimitedU32::MIN); 102 let rev_str = rev.to_string(); 103 let (new_commit_bytes, _sig) = create_signed_commit( 104 did, 105 new_mst_root, 106 &rev_str, 107 current_root_cid, 108 &signing_key, 109 )?; 110 let new_root_cid = state.block_store.put(&new_commit_bytes).await 111 .map_err(|e| format!("Failed to save commit block: {:?}", e))?; 112 let mut tx = state.db.begin().await 113 .map_err(|e| format!("Failed to begin transaction: {}", e))?; 114 let lock_result = sqlx::query!( 115 "SELECT repo_root_cid FROM repos WHERE user_id = $1 FOR UPDATE NOWAIT", 116 user_id 117 ) 118 .fetch_optional(&mut *tx) 119 .await; 120 match lock_result { 121 Err(e) => { 122 if let Some(db_err) = e.as_database_error() { 123 if db_err.code().as_deref() == Some("55P03") { 124 return Err("ConcurrentModification: Another request is modifying this repo".to_string()); 125 } 126 } 127 return Err(format!("Failed to acquire repo lock: {}", e)); 128 } 129 Ok(Some(row)) => { 130 if let Some(expected_root) = &current_root_cid { 131 if row.repo_root_cid != expected_root.to_string() { 132 return Err("ConcurrentModification: Repo has been modified since last read".to_string()); 133 } 134 } 135 } 136 Ok(None) => { 137 return Err("Repo not found".to_string()); 138 } 139 } 140 sqlx::query!("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2", new_root_cid.to_string(), user_id) 141 .execute(&mut *tx) 142 .await 143 .map_err(|e| format!("DB Error (repos): {}", e))?; 144 let mut upsert_collections: Vec<String> = Vec::new(); 145 let mut upsert_rkeys: Vec<String> = Vec::new(); 146 let mut upsert_cids: Vec<String> = Vec::new(); 147 let mut delete_collections: Vec<String> = Vec::new(); 148 let mut delete_rkeys: Vec<String> = Vec::new(); 149 for op in &ops { 150 match op { 151 RecordOp::Create { collection, rkey, cid } | RecordOp::Update { collection, rkey, cid, .. } => { 152 upsert_collections.push(collection.clone()); 153 upsert_rkeys.push(rkey.clone()); 154 upsert_cids.push(cid.to_string()); 155 } 156 RecordOp::Delete { collection, rkey, .. } => { 157 delete_collections.push(collection.clone()); 158 delete_rkeys.push(rkey.clone()); 159 } 160 } 161 } 162 if !upsert_collections.is_empty() { 163 sqlx::query!( 164 r#" 165 INSERT INTO records (repo_id, collection, rkey, record_cid, repo_rev) 166 SELECT $1, collection, rkey, record_cid, $5 167 FROM UNNEST($2::text[], $3::text[], $4::text[]) AS t(collection, rkey, record_cid) 168 ON CONFLICT (repo_id, collection, rkey) DO UPDATE 169 SET record_cid = EXCLUDED.record_cid, repo_rev = EXCLUDED.repo_rev, created_at = NOW() 170 "#, 171 user_id, 172 &upsert_collections, 173 &upsert_rkeys, 174 &upsert_cids, 175 rev_str 176 ) 177 .execute(&mut *tx) 178 .await 179 .map_err(|e| format!("DB Error (records batch upsert): {}", e))?; 180 } 181 if !delete_collections.is_empty() { 182 sqlx::query!( 183 r#" 184 DELETE FROM records 185 WHERE repo_id = $1 186 AND (collection, rkey) IN (SELECT * FROM UNNEST($2::text[], $3::text[])) 187 "#, 188 user_id, 189 &delete_collections, 190 &delete_rkeys 191 ) 192 .execute(&mut *tx) 193 .await 194 .map_err(|e| format!("DB Error (records batch delete): {}", e))?; 195 } 196 let ops_json = ops.iter().map(|op| { 197 match op { 198 RecordOp::Create { collection, rkey, cid } => json!({ 199 "action": "create", 200 "path": format!("{}/{}", collection, rkey), 201 "cid": cid.to_string() 202 }), 203 RecordOp::Update { collection, rkey, cid, prev } => { 204 let mut obj = json!({ 205 "action": "update", 206 "path": format!("{}/{}", collection, rkey), 207 "cid": cid.to_string() 208 }); 209 if let Some(prev_cid) = prev { 210 obj["prev"] = json!(prev_cid.to_string()); 211 } 212 obj 213 }, 214 RecordOp::Delete { collection, rkey, prev } => { 215 let mut obj = json!({ 216 "action": "delete", 217 "path": format!("{}/{}", collection, rkey), 218 "cid": null 219 }); 220 if let Some(prev_cid) = prev { 221 obj["prev"] = json!(prev_cid.to_string()); 222 } 223 obj 224 }, 225 } 226 }).collect::<Vec<_>>(); 227 let event_type = "commit"; 228 let prev_cid_str = current_root_cid.map(|c| c.to_string()); 229 let prev_data_cid_str = prev_data_cid.map(|c| c.to_string()); 230 let seq_row = sqlx::query!( 231 r#" 232 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids, prev_data_cid) 233 VALUES ($1, $2, $3, $4, $5, $6, $7, $8) 234 RETURNING seq 235 "#, 236 did, 237 event_type, 238 new_root_cid.to_string(), 239 prev_cid_str, 240 json!(ops_json), 241 &[] as &[String], 242 blocks_cids, 243 prev_data_cid_str, 244 ) 245 .fetch_one(&mut *tx) 246 .await 247 .map_err(|e| format!("DB Error (repo_seq): {}", e))?; 248 sqlx::query( 249 &format!("NOTIFY repo_updates, '{}'", seq_row.seq) 250 ) 251 .execute(&mut *tx) 252 .await 253 .map_err(|e| format!("DB Error (notify): {}", e))?; 254 tx.commit().await 255 .map_err(|e| format!("Failed to commit transaction: {}", e))?; 256 let _ = sequence_sync_event(state, did, &new_root_cid.to_string()).await; 257 Ok(CommitResult { 258 commit_cid: new_root_cid, 259 rev: rev_str, 260 }) 261} 262pub async fn create_record_internal( 263 state: &AppState, 264 did: &str, 265 collection: &str, 266 rkey: &str, 267 record: &serde_json::Value, 268) -> Result<(String, Cid), String> { 269 use crate::repo::tracking::TrackingBlockStore; 270 use jacquard_repo::mst::Mst; 271 use std::sync::Arc; 272 let user_id: Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did) 273 .fetch_optional(&state.db) 274 .await 275 .map_err(|e| format!("DB error: {}", e))? 276 .ok_or_else(|| "User not found".to_string())?; 277 let root_cid_str: String = 278 sqlx::query_scalar!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id) 279 .fetch_optional(&state.db) 280 .await 281 .map_err(|e| format!("DB error: {}", e))? 282 .ok_or_else(|| "Repo not found".to_string())?; 283 let current_root_cid = Cid::from_str(&root_cid_str) 284 .map_err(|_| "Invalid repo root CID".to_string())?; 285 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 286 let commit_bytes = tracking_store.get(&current_root_cid).await 287 .map_err(|e| format!("Failed to fetch commit: {:?}", e))? 288 .ok_or_else(|| "Commit block not found".to_string())?; 289 let commit = jacquard_repo::commit::Commit::from_cbor(&commit_bytes) 290 .map_err(|e| format!("Failed to parse commit: {:?}", e))?; 291 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 292 let mut record_bytes = Vec::new(); 293 serde_ipld_dagcbor::to_writer(&mut record_bytes, record) 294 .map_err(|e| format!("Failed to serialize record: {:?}", e))?; 295 let record_cid = tracking_store.put(&record_bytes).await 296 .map_err(|e| format!("Failed to save record block: {:?}", e))?; 297 let key = format!("{}/{}", collection, rkey); 298 let new_mst = mst.add(&key, record_cid).await 299 .map_err(|e| format!("Failed to add to MST: {:?}", e))?; 300 let new_mst_root = new_mst.persist().await 301 .map_err(|e| format!("Failed to persist MST: {:?}", e))?; 302 let op = RecordOp::Create { 303 collection: collection.to_string(), 304 rkey: rkey.to_string(), 305 cid: record_cid, 306 }; 307 let mut relevant_blocks = std::collections::BTreeMap::new(); 308 new_mst.blocks_for_path(&key, &mut relevant_blocks).await 309 .map_err(|e| format!("Failed to get new MST blocks for path: {:?}", e))?; 310 mst.blocks_for_path(&key, &mut relevant_blocks).await 311 .map_err(|e| format!("Failed to get old MST blocks for path: {:?}", e))?; 312 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes)); 313 let mut written_cids = tracking_store.get_all_relevant_cids(); 314 for cid in relevant_blocks.keys() { 315 if !written_cids.contains(cid) { 316 written_cids.push(*cid); 317 } 318 } 319 let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect(); 320 let result = commit_and_log( 321 state, 322 did, 323 user_id, 324 Some(current_root_cid), 325 Some(commit.data), 326 new_mst_root, 327 vec![op], 328 &written_cids_str, 329 ).await?; 330 let uri = format!("at://{}/{}/{}", did, collection, rkey); 331 Ok((uri, result.commit_cid)) 332} 333use std::str::FromStr; 334pub async fn sequence_identity_event( 335 state: &AppState, 336 did: &str, 337 handle: Option<&str>, 338) -> Result<i64, String> { 339 let seq_row = sqlx::query!( 340 r#" 341 INSERT INTO repo_seq (did, event_type, handle) 342 VALUES ($1, 'identity', $2) 343 RETURNING seq 344 "#, 345 did, 346 handle, 347 ) 348 .fetch_one(&state.db) 349 .await 350 .map_err(|e| format!("DB Error (repo_seq identity): {}", e))?; 351 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 352 .execute(&state.db) 353 .await 354 .map_err(|e| format!("DB Error (notify): {}", e))?; 355 Ok(seq_row.seq) 356} 357pub async fn sequence_account_event( 358 state: &AppState, 359 did: &str, 360 active: bool, 361 status: Option<&str>, 362) -> Result<i64, String> { 363 let seq_row = sqlx::query!( 364 r#" 365 INSERT INTO repo_seq (did, event_type, active, status) 366 VALUES ($1, 'account', $2, $3) 367 RETURNING seq 368 "#, 369 did, 370 active, 371 status, 372 ) 373 .fetch_one(&state.db) 374 .await 375 .map_err(|e| format!("DB Error (repo_seq account): {}", e))?; 376 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 377 .execute(&state.db) 378 .await 379 .map_err(|e| format!("DB Error (notify): {}", e))?; 380 Ok(seq_row.seq) 381} 382pub async fn sequence_sync_event( 383 state: &AppState, 384 did: &str, 385 commit_cid: &str, 386) -> Result<i64, String> { 387 let seq_row = sqlx::query!( 388 r#" 389 INSERT INTO repo_seq (did, event_type, commit_cid) 390 VALUES ($1, 'sync', $2) 391 RETURNING seq 392 "#, 393 did, 394 commit_cid, 395 ) 396 .fetch_one(&state.db) 397 .await 398 .map_err(|e| format!("DB Error (repo_seq sync): {}", e))?; 399 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 400 .execute(&state.db) 401 .await 402 .map_err(|e| format!("DB Error (notify): {}", e))?; 403 Ok(seq_row.seq) 404}