this repo has no description
1use super::validation::validate_record_with_status; 2use super::write::has_verified_comms_channel; 3use crate::api::error::ApiError; 4use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log, extract_blob_cids}; 5use crate::delegation::{self, DelegationActionType}; 6use crate::repo::tracking::TrackingBlockStore; 7use crate::state::AppState; 8use crate::types::{AtIdentifier, AtUri, Nsid, Rkey}; 9use axum::{ 10 Json, 11 extract::State, 12 http::StatusCode, 13 response::{IntoResponse, Response}, 14}; 15use cid::Cid; 16use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 17use serde::{Deserialize, Serialize}; 18use serde_json::json; 19use std::str::FromStr; 20use std::sync::Arc; 21use tracing::{error, info}; 22 23const MAX_BATCH_WRITES: usize = 200; 24 25#[derive(Deserialize)] 26#[serde(tag = "$type")] 27pub enum WriteOp { 28 #[serde(rename = "com.atproto.repo.applyWrites#create")] 29 Create { 30 collection: Nsid, 31 rkey: Option<Rkey>, 32 value: serde_json::Value, 33 }, 34 #[serde(rename = "com.atproto.repo.applyWrites#update")] 35 Update { 36 collection: Nsid, 37 rkey: Rkey, 38 value: serde_json::Value, 39 }, 40 #[serde(rename = "com.atproto.repo.applyWrites#delete")] 41 Delete { collection: Nsid, rkey: Rkey }, 42} 43 44#[derive(Deserialize)] 45#[serde(rename_all = "camelCase")] 46pub struct ApplyWritesInput { 47 pub repo: AtIdentifier, 48 pub validate: Option<bool>, 49 pub writes: Vec<WriteOp>, 50 pub swap_commit: Option<String>, 51} 52 53#[derive(Serialize)] 54#[serde(tag = "$type")] 55pub enum WriteResult { 56 #[serde(rename = "com.atproto.repo.applyWrites#createResult")] 57 CreateResult { 58 uri: AtUri, 59 cid: String, 60 #[serde(rename = "validationStatus", skip_serializing_if = "Option::is_none")] 61 validation_status: Option<String>, 62 }, 63 #[serde(rename = "com.atproto.repo.applyWrites#updateResult")] 64 UpdateResult { 65 uri: AtUri, 66 cid: String, 67 #[serde(rename = "validationStatus", skip_serializing_if = "Option::is_none")] 68 validation_status: Option<String>, 69 }, 70 #[serde(rename = "com.atproto.repo.applyWrites#deleteResult")] 71 DeleteResult {}, 72} 73 74#[derive(Serialize)] 75pub struct ApplyWritesOutput { 76 pub commit: CommitInfo, 77 pub results: Vec<WriteResult>, 78} 79 80#[derive(Serialize)] 81pub struct CommitInfo { 82 pub cid: String, 83 pub rev: String, 84} 85 86pub async fn apply_writes( 87 State(state): State<AppState>, 88 headers: axum::http::HeaderMap, 89 Json(input): Json<ApplyWritesInput>, 90) -> Response { 91 info!( 92 "apply_writes called: repo={}, writes={}", 93 input.repo, 94 input.writes.len() 95 ); 96 let Some(token) = crate::auth::extract_bearer_token_from_header( 97 headers.get("Authorization").and_then(|h| h.to_str().ok()), 98 ) else { 99 return ApiError::AuthenticationRequired.into_response(); 100 }; 101 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await { 102 Ok(user) => user, 103 Err(_) => return ApiError::AuthenticationFailed(None).into_response(), 104 }; 105 let did = auth_user.did.clone(); 106 let is_oauth = auth_user.is_oauth; 107 let scope = auth_user.scope; 108 let controller_did = auth_user.controller_did.clone(); 109 if input.repo.as_str() != did { 110 return ApiError::InvalidRepo("Repo does not match authenticated user".into()) 111 .into_response(); 112 } 113 if crate::util::is_account_migrated(&state.db, &did) 114 .await 115 .unwrap_or(false) 116 { 117 return ApiError::AccountMigrated.into_response(); 118 } 119 let is_verified = has_verified_comms_channel(&state.db, &did) 120 .await 121 .unwrap_or(false); 122 let is_delegated = crate::delegation::is_delegated_account(&state.db, &did) 123 .await 124 .unwrap_or(false); 125 if !is_verified && !is_delegated { 126 return ApiError::AccountNotVerified.into_response(); 127 } 128 if input.writes.is_empty() { 129 return ApiError::InvalidRequest("writes array is empty".into()).into_response(); 130 } 131 if input.writes.len() > MAX_BATCH_WRITES { 132 return ApiError::InvalidRequest(format!("Too many writes (max {})", MAX_BATCH_WRITES)) 133 .into_response(); 134 } 135 136 let has_custom_scope = scope 137 .as_ref() 138 .map(|s| s != "com.atproto.access") 139 .unwrap_or(false); 140 if is_oauth || has_custom_scope { 141 use std::collections::HashSet; 142 let create_collections: HashSet<&Nsid> = input 143 .writes 144 .iter() 145 .filter_map(|w| { 146 if let WriteOp::Create { collection, .. } = w { 147 Some(collection) 148 } else { 149 None 150 } 151 }) 152 .collect(); 153 let update_collections: HashSet<&Nsid> = input 154 .writes 155 .iter() 156 .filter_map(|w| { 157 if let WriteOp::Update { collection, .. } = w { 158 Some(collection) 159 } else { 160 None 161 } 162 }) 163 .collect(); 164 let delete_collections: HashSet<&Nsid> = input 165 .writes 166 .iter() 167 .filter_map(|w| { 168 if let WriteOp::Delete { collection, .. } = w { 169 Some(collection) 170 } else { 171 None 172 } 173 }) 174 .collect(); 175 176 for collection in create_collections { 177 if let Err(e) = crate::auth::scope_check::check_repo_scope( 178 is_oauth, 179 scope.as_deref(), 180 crate::oauth::RepoAction::Create, 181 collection, 182 ) { 183 return e; 184 } 185 } 186 for collection in update_collections { 187 if let Err(e) = crate::auth::scope_check::check_repo_scope( 188 is_oauth, 189 scope.as_deref(), 190 crate::oauth::RepoAction::Update, 191 collection, 192 ) { 193 return e; 194 } 195 } 196 for collection in delete_collections { 197 if let Err(e) = crate::auth::scope_check::check_repo_scope( 198 is_oauth, 199 scope.as_deref(), 200 crate::oauth::RepoAction::Delete, 201 collection, 202 ) { 203 return e; 204 } 205 } 206 } 207 208 let user_id: uuid::Uuid = 209 match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did.as_str()) 210 .fetch_optional(&state.db) 211 .await 212 { 213 Ok(Some(id)) => id, 214 _ => return ApiError::InternalError(Some("User not found".into())).into_response(), 215 }; 216 let root_cid_str: String = match sqlx::query_scalar!( 217 "SELECT repo_root_cid FROM repos WHERE user_id = $1", 218 user_id 219 ) 220 .fetch_optional(&state.db) 221 .await 222 { 223 Ok(Some(cid_str)) => cid_str, 224 _ => return ApiError::InternalError(Some("Repo root not found".into())).into_response(), 225 }; 226 let current_root_cid = match Cid::from_str(&root_cid_str) { 227 Ok(c) => c, 228 Err(_) => { 229 return ApiError::InternalError(Some("Invalid repo root CID".into())).into_response(); 230 } 231 }; 232 if let Some(swap_commit) = &input.swap_commit 233 && Cid::from_str(swap_commit).ok() != Some(current_root_cid) 234 { 235 return ApiError::InvalidSwap(Some("Repo has been modified".into())).into_response(); 236 } 237 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 238 let commit_bytes = match tracking_store.get(&current_root_cid).await { 239 Ok(Some(b)) => b, 240 _ => return ApiError::InternalError(Some("Commit block not found".into())).into_response(), 241 }; 242 let commit = match Commit::from_cbor(&commit_bytes) { 243 Ok(c) => c, 244 _ => return ApiError::InternalError(Some("Failed to parse commit".into())).into_response(), 245 }; 246 let original_mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 247 let mut mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 248 let mut results: Vec<WriteResult> = Vec::new(); 249 let mut ops: Vec<RecordOp> = Vec::new(); 250 let mut modified_keys: Vec<String> = Vec::new(); 251 let mut all_blob_cids: Vec<String> = Vec::new(); 252 for write in &input.writes { 253 match write { 254 WriteOp::Create { 255 collection, 256 rkey, 257 value, 258 } => { 259 let validation_status = if input.validate == Some(false) { 260 None 261 } else { 262 let require_lexicon = input.validate == Some(true); 263 match validate_record_with_status( 264 value, 265 collection, 266 rkey.as_ref().map(|r| r.as_str()), 267 require_lexicon, 268 ) { 269 Ok(status) => Some(status), 270 Err(err_response) => return *err_response, 271 } 272 }; 273 all_blob_cids.extend(extract_blob_cids(value)); 274 let rkey = rkey.clone().unwrap_or_else(Rkey::generate); 275 let record_ipld = crate::util::json_to_ipld(value); 276 let mut record_bytes = Vec::new(); 277 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld).is_err() { 278 return ApiError::InvalidRecord("Failed to serialize record".into()) 279 .into_response(); 280 } 281 let record_cid = match tracking_store.put(&record_bytes).await { 282 Ok(c) => c, 283 Err(_) => { 284 return ApiError::InternalError(Some("Failed to store record".into())) 285 .into_response(); 286 } 287 }; 288 let key = format!("{}/{}", collection, rkey); 289 modified_keys.push(key.clone()); 290 mst = match mst.add(&key, record_cid).await { 291 Ok(m) => m, 292 Err(_) => { 293 return ApiError::InternalError(Some("Failed to add to MST".into())) 294 .into_response(); 295 } 296 }; 297 let uri = AtUri::from_parts(&did, collection, &rkey); 298 results.push(WriteResult::CreateResult { 299 uri, 300 cid: record_cid.to_string(), 301 validation_status: validation_status.map(|s| s.to_string()), 302 }); 303 ops.push(RecordOp::Create { 304 collection: collection.to_string(), 305 rkey: rkey.to_string(), 306 cid: record_cid, 307 }); 308 } 309 WriteOp::Update { 310 collection, 311 rkey, 312 value, 313 } => { 314 let validation_status = if input.validate == Some(false) { 315 None 316 } else { 317 let require_lexicon = input.validate == Some(true); 318 match validate_record_with_status( 319 value, 320 collection, 321 Some(rkey.as_str()), 322 require_lexicon, 323 ) { 324 Ok(status) => Some(status), 325 Err(err_response) => return *err_response, 326 } 327 }; 328 all_blob_cids.extend(extract_blob_cids(value)); 329 let record_ipld = crate::util::json_to_ipld(value); 330 let mut record_bytes = Vec::new(); 331 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld).is_err() { 332 return ApiError::InvalidRecord("Failed to serialize record".into()) 333 .into_response(); 334 } 335 let record_cid = match tracking_store.put(&record_bytes).await { 336 Ok(c) => c, 337 Err(_) => { 338 return ApiError::InternalError(Some("Failed to store record".into())) 339 .into_response(); 340 } 341 }; 342 let key = format!("{}/{}", collection, rkey); 343 modified_keys.push(key.clone()); 344 let prev_record_cid = mst.get(&key).await.ok().flatten(); 345 mst = match mst.update(&key, record_cid).await { 346 Ok(m) => m, 347 Err(_) => { 348 return ApiError::InternalError(Some("Failed to update MST".into())) 349 .into_response(); 350 } 351 }; 352 let uri = AtUri::from_parts(&did, collection, rkey); 353 results.push(WriteResult::UpdateResult { 354 uri, 355 cid: record_cid.to_string(), 356 validation_status: validation_status.map(|s| s.to_string()), 357 }); 358 ops.push(RecordOp::Update { 359 collection: collection.to_string(), 360 rkey: rkey.to_string(), 361 cid: record_cid, 362 prev: prev_record_cid, 363 }); 364 } 365 WriteOp::Delete { collection, rkey } => { 366 let key = format!("{}/{}", collection, rkey); 367 modified_keys.push(key.clone()); 368 let prev_record_cid = mst.get(&key).await.ok().flatten(); 369 mst = match mst.delete(&key).await { 370 Ok(m) => m, 371 Err(_) => { 372 return ApiError::InternalError(Some("Failed to delete from MST".into())) 373 .into_response(); 374 } 375 }; 376 results.push(WriteResult::DeleteResult {}); 377 ops.push(RecordOp::Delete { 378 collection: collection.to_string(), 379 rkey: rkey.to_string(), 380 prev: prev_record_cid, 381 }); 382 } 383 } 384 } 385 let new_mst_root = match mst.persist().await { 386 Ok(c) => c, 387 Err(_) => { 388 return ApiError::InternalError(Some("Failed to persist MST".into())).into_response(); 389 } 390 }; 391 let mut new_mst_blocks = std::collections::BTreeMap::new(); 392 let mut old_mst_blocks = std::collections::BTreeMap::new(); 393 for key in &modified_keys { 394 if mst.blocks_for_path(key, &mut new_mst_blocks).await.is_err() { 395 return ApiError::InternalError(Some("Failed to get new MST blocks for path".into())) 396 .into_response(); 397 } 398 if original_mst 399 .blocks_for_path(key, &mut old_mst_blocks) 400 .await 401 .is_err() 402 { 403 return ApiError::InternalError(Some("Failed to get old MST blocks for path".into())) 404 .into_response(); 405 } 406 } 407 let mut relevant_blocks = new_mst_blocks.clone(); 408 relevant_blocks.extend(old_mst_blocks.iter().map(|(k, v)| (*k, v.clone()))); 409 let written_cids: Vec<Cid> = tracking_store 410 .get_all_relevant_cids() 411 .into_iter() 412 .chain(relevant_blocks.keys().copied()) 413 .collect::<std::collections::HashSet<_>>() 414 .into_iter() 415 .collect(); 416 let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect(); 417 let prev_record_cids = ops.iter().filter_map(|op| match op { 418 RecordOp::Update { 419 prev: Some(cid), .. 420 } 421 | RecordOp::Delete { 422 prev: Some(cid), .. 423 } => Some(*cid), 424 _ => None, 425 }); 426 let obsolete_cids: Vec<Cid> = std::iter::once(current_root_cid) 427 .chain( 428 old_mst_blocks 429 .keys() 430 .filter(|cid| !new_mst_blocks.contains_key(*cid)) 431 .copied(), 432 ) 433 .chain(prev_record_cids) 434 .collect::<std::collections::HashSet<_>>() 435 .into_iter() 436 .collect(); 437 let commit_res = match commit_and_log( 438 &state, 439 CommitParams { 440 did: &did, 441 user_id, 442 current_root_cid: Some(current_root_cid), 443 prev_data_cid: Some(commit.data), 444 new_mst_root, 445 ops, 446 blocks_cids: &written_cids_str, 447 blobs: &all_blob_cids, 448 obsolete_cids, 449 }, 450 ) 451 .await 452 { 453 Ok(res) => res, 454 Err(e) => { 455 error!("Commit failed: {}", e); 456 return ApiError::InternalError(Some("Failed to commit changes".into())) 457 .into_response(); 458 } 459 }; 460 461 if let Some(ref controller) = controller_did { 462 let write_summary: Vec<serde_json::Value> = input 463 .writes 464 .iter() 465 .map(|w| match w { 466 WriteOp::Create { 467 collection, rkey, .. 468 } => json!({ 469 "action": "create", 470 "collection": collection, 471 "rkey": rkey 472 }), 473 WriteOp::Update { 474 collection, rkey, .. 475 } => json!({ 476 "action": "update", 477 "collection": collection, 478 "rkey": rkey 479 }), 480 WriteOp::Delete { collection, rkey } => json!({ 481 "action": "delete", 482 "collection": collection, 483 "rkey": rkey 484 }), 485 }) 486 .collect(); 487 488 let _ = delegation::log_delegation_action( 489 &state.db, 490 &did, 491 controller, 492 Some(controller), 493 DelegationActionType::RepoWrite, 494 Some(json!({ 495 "action": "apply_writes", 496 "count": input.writes.len(), 497 "writes": write_summary 498 })), 499 None, 500 None, 501 ) 502 .await; 503 } 504 505 ( 506 StatusCode::OK, 507 Json(ApplyWritesOutput { 508 commit: CommitInfo { 509 cid: commit_res.commit_cid.to_string(), 510 rev: commit_res.rev, 511 }, 512 results, 513 }), 514 ) 515 .into_response() 516}