this repo has no description
1use super::validation::validate_record_with_status; 2use super::write::has_verified_comms_channel; 3use crate::api::error::ApiError; 4use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log, extract_blob_cids}; 5use crate::delegation::{self, DelegationActionType}; 6use crate::repo::tracking::TrackingBlockStore; 7use crate::state::AppState; 8use crate::types::{AtIdentifier, AtUri, Nsid, Rkey}; 9use axum::{ 10 Json, 11 extract::State, 12 http::StatusCode, 13 response::{IntoResponse, Response}, 14}; 15use cid::Cid; 16use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 17use serde::{Deserialize, Serialize}; 18use serde_json::json; 19use std::str::FromStr; 20use std::sync::Arc; 21use tracing::{error, info}; 22 23const MAX_BATCH_WRITES: usize = 200; 24 25#[derive(Deserialize)] 26#[serde(tag = "$type")] 27pub enum WriteOp { 28 #[serde(rename = "com.atproto.repo.applyWrites#create")] 29 Create { 30 collection: Nsid, 31 rkey: Option<Rkey>, 32 value: serde_json::Value, 33 }, 34 #[serde(rename = "com.atproto.repo.applyWrites#update")] 35 Update { 36 collection: Nsid, 37 rkey: Rkey, 38 value: serde_json::Value, 39 }, 40 #[serde(rename = "com.atproto.repo.applyWrites#delete")] 41 Delete { collection: Nsid, rkey: Rkey }, 42} 43 44#[derive(Deserialize)] 45#[serde(rename_all = "camelCase")] 46pub struct ApplyWritesInput { 47 pub repo: AtIdentifier, 48 pub validate: Option<bool>, 49 pub writes: Vec<WriteOp>, 50 pub swap_commit: Option<String>, 51} 52 53#[derive(Serialize)] 54#[serde(tag = "$type")] 55pub enum WriteResult { 56 #[serde(rename = "com.atproto.repo.applyWrites#createResult")] 57 CreateResult { 58 uri: AtUri, 59 cid: String, 60 #[serde(rename = "validationStatus", skip_serializing_if = "Option::is_none")] 61 validation_status: Option<String>, 62 }, 63 #[serde(rename = "com.atproto.repo.applyWrites#updateResult")] 64 UpdateResult { 65 uri: AtUri, 66 cid: String, 67 #[serde(rename = "validationStatus", skip_serializing_if = "Option::is_none")] 68 validation_status: Option<String>, 69 }, 70 #[serde(rename = "com.atproto.repo.applyWrites#deleteResult")] 71 DeleteResult {}, 72} 73 74#[derive(Serialize)] 75pub struct ApplyWritesOutput { 76 pub commit: CommitInfo, 77 pub results: Vec<WriteResult>, 78} 79 80#[derive(Serialize)] 81pub struct CommitInfo { 82 pub cid: String, 83 pub rev: String, 84} 85 86pub async fn apply_writes( 87 State(state): State<AppState>, 88 headers: axum::http::HeaderMap, 89 Json(input): Json<ApplyWritesInput>, 90) -> Response { 91 info!( 92 "apply_writes called: repo={}, writes={}", 93 input.repo, 94 input.writes.len() 95 ); 96 let Some(token) = crate::auth::extract_bearer_token_from_header( 97 headers.get("Authorization").and_then(|h| h.to_str().ok()), 98 ) else { 99 return ApiError::AuthenticationRequired.into_response(); 100 }; 101 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await { 102 Ok(user) => user, 103 Err(_) => return ApiError::AuthenticationFailed(None).into_response(), 104 }; 105 let did = auth_user.did.clone(); 106 let is_oauth = auth_user.is_oauth; 107 let scope = auth_user.scope; 108 let controller_did = auth_user.controller_did.clone(); 109 if input.repo.as_str() != did { 110 return ApiError::InvalidRepo("Repo does not match authenticated user".into()) 111 .into_response(); 112 } 113 if crate::util::is_account_migrated(&state.db, &did) 114 .await 115 .unwrap_or(false) 116 { 117 return ApiError::AccountMigrated.into_response(); 118 } 119 let is_verified = has_verified_comms_channel(&state.db, &did) 120 .await 121 .unwrap_or(false); 122 let is_delegated = crate::delegation::is_delegated_account(&state.db, &did) 123 .await 124 .unwrap_or(false); 125 if !is_verified && !is_delegated { 126 return ApiError::AccountNotVerified.into_response(); 127 } 128 if input.writes.is_empty() { 129 return ApiError::InvalidRequest("writes array is empty".into()).into_response(); 130 } 131 if input.writes.len() > MAX_BATCH_WRITES { 132 return ApiError::InvalidRequest(format!("Too many writes (max {})", MAX_BATCH_WRITES)) 133 .into_response(); 134 } 135 136 let has_custom_scope = scope 137 .as_ref() 138 .map(|s| s != "com.atproto.access") 139 .unwrap_or(false); 140 if is_oauth || has_custom_scope { 141 use std::collections::HashSet; 142 let create_collections: HashSet<&Nsid> = input 143 .writes 144 .iter() 145 .filter_map(|w| { 146 if let WriteOp::Create { collection, .. } = w { 147 Some(collection) 148 } else { 149 None 150 } 151 }) 152 .collect(); 153 let update_collections: HashSet<&Nsid> = input 154 .writes 155 .iter() 156 .filter_map(|w| { 157 if let WriteOp::Update { collection, .. } = w { 158 Some(collection) 159 } else { 160 None 161 } 162 }) 163 .collect(); 164 let delete_collections: HashSet<&Nsid> = input 165 .writes 166 .iter() 167 .filter_map(|w| { 168 if let WriteOp::Delete { collection, .. } = w { 169 Some(collection) 170 } else { 171 None 172 } 173 }) 174 .collect(); 175 176 for collection in create_collections { 177 if let Err(e) = crate::auth::scope_check::check_repo_scope( 178 is_oauth, 179 scope.as_deref(), 180 crate::oauth::RepoAction::Create, 181 collection, 182 ) { 183 return e; 184 } 185 } 186 for collection in update_collections { 187 if let Err(e) = crate::auth::scope_check::check_repo_scope( 188 is_oauth, 189 scope.as_deref(), 190 crate::oauth::RepoAction::Update, 191 collection, 192 ) { 193 return e; 194 } 195 } 196 for collection in delete_collections { 197 if let Err(e) = crate::auth::scope_check::check_repo_scope( 198 is_oauth, 199 scope.as_deref(), 200 crate::oauth::RepoAction::Delete, 201 collection, 202 ) { 203 return e; 204 } 205 } 206 } 207 208 let user_id: uuid::Uuid = 209 match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did.as_str()) 210 .fetch_optional(&state.db) 211 .await 212 { 213 Ok(Some(id)) => id, 214 _ => return ApiError::InternalError(Some("User not found".into())).into_response(), 215 }; 216 let root_cid_str: String = match sqlx::query_scalar!( 217 "SELECT repo_root_cid FROM repos WHERE user_id = $1", 218 user_id 219 ) 220 .fetch_optional(&state.db) 221 .await 222 { 223 Ok(Some(cid_str)) => cid_str, 224 _ => return ApiError::InternalError(Some("Repo root not found".into())).into_response(), 225 }; 226 let current_root_cid = match Cid::from_str(&root_cid_str) { 227 Ok(c) => c, 228 Err(_) => { 229 return ApiError::InternalError(Some("Invalid repo root CID".into())).into_response(); 230 } 231 }; 232 if let Some(swap_commit) = &input.swap_commit 233 && Cid::from_str(swap_commit).ok() != Some(current_root_cid) 234 { 235 return ApiError::InvalidSwap(Some("Repo has been modified".into())).into_response(); 236 } 237 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 238 let commit_bytes = match tracking_store.get(&current_root_cid).await { 239 Ok(Some(b)) => b, 240 _ => return ApiError::InternalError(Some("Commit block not found".into())).into_response(), 241 }; 242 let commit = match Commit::from_cbor(&commit_bytes) { 243 Ok(c) => c, 244 _ => return ApiError::InternalError(Some("Failed to parse commit".into())).into_response(), 245 }; 246 let original_mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 247 let mut mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 248 let mut results: Vec<WriteResult> = Vec::new(); 249 let mut ops: Vec<RecordOp> = Vec::new(); 250 let mut modified_keys: Vec<String> = Vec::new(); 251 let mut all_blob_cids: Vec<String> = Vec::new(); 252 for write in &input.writes { 253 match write { 254 WriteOp::Create { 255 collection, 256 rkey, 257 value, 258 } => { 259 let validation_status = if input.validate == Some(false) { 260 None 261 } else { 262 let require_lexicon = input.validate == Some(true); 263 match validate_record_with_status( 264 value, 265 collection, 266 rkey.as_ref().map(|r| r.as_str()), 267 require_lexicon, 268 ) { 269 Ok(status) => Some(status), 270 Err(err_response) => return *err_response, 271 } 272 }; 273 all_blob_cids.extend(extract_blob_cids(value)); 274 let rkey = rkey.clone().unwrap_or_else(Rkey::generate); 275 let record_ipld = crate::util::json_to_ipld(value); 276 let mut record_bytes = Vec::new(); 277 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld).is_err() { 278 return ApiError::InvalidRecord("Failed to serialize record".into()) 279 .into_response(); 280 } 281 let record_cid = match tracking_store.put(&record_bytes).await { 282 Ok(c) => c, 283 Err(_) => { 284 return ApiError::InternalError(Some("Failed to store record".into())) 285 .into_response(); 286 } 287 }; 288 let key = format!("{}/{}", collection, rkey); 289 modified_keys.push(key.clone()); 290 mst = match mst.add(&key, record_cid).await { 291 Ok(m) => m, 292 Err(_) => { 293 return ApiError::InternalError(Some("Failed to add to MST".into())) 294 .into_response(); 295 } 296 }; 297 let uri = AtUri::from_parts(&did, collection, &rkey); 298 results.push(WriteResult::CreateResult { 299 uri, 300 cid: record_cid.to_string(), 301 validation_status: validation_status.map(|s| s.to_string()), 302 }); 303 ops.push(RecordOp::Create { 304 collection: collection.to_string(), 305 rkey: rkey.to_string(), 306 cid: record_cid, 307 }); 308 } 309 WriteOp::Update { 310 collection, 311 rkey, 312 value, 313 } => { 314 let validation_status = if input.validate == Some(false) { 315 None 316 } else { 317 let require_lexicon = input.validate == Some(true); 318 match validate_record_with_status( 319 value, 320 collection, 321 Some(rkey.as_str()), 322 require_lexicon, 323 ) { 324 Ok(status) => Some(status), 325 Err(err_response) => return *err_response, 326 } 327 }; 328 all_blob_cids.extend(extract_blob_cids(value)); 329 let record_ipld = crate::util::json_to_ipld(value); 330 let mut record_bytes = Vec::new(); 331 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld).is_err() { 332 return ApiError::InvalidRecord("Failed to serialize record".into()) 333 .into_response(); 334 } 335 let record_cid = match tracking_store.put(&record_bytes).await { 336 Ok(c) => c, 337 Err(_) => { 338 return ApiError::InternalError(Some("Failed to store record".into())) 339 .into_response(); 340 } 341 }; 342 let key = format!("{}/{}", collection, rkey); 343 modified_keys.push(key.clone()); 344 let prev_record_cid = mst.get(&key).await.ok().flatten(); 345 mst = match mst.update(&key, record_cid).await { 346 Ok(m) => m, 347 Err(_) => { 348 return ApiError::InternalError(Some("Failed to update MST".into())) 349 .into_response(); 350 } 351 }; 352 let uri = AtUri::from_parts(&did, collection, rkey); 353 results.push(WriteResult::UpdateResult { 354 uri, 355 cid: record_cid.to_string(), 356 validation_status: validation_status.map(|s| s.to_string()), 357 }); 358 ops.push(RecordOp::Update { 359 collection: collection.to_string(), 360 rkey: rkey.to_string(), 361 cid: record_cid, 362 prev: prev_record_cid, 363 }); 364 } 365 WriteOp::Delete { collection, rkey } => { 366 let key = format!("{}/{}", collection, rkey); 367 modified_keys.push(key.clone()); 368 let prev_record_cid = mst.get(&key).await.ok().flatten(); 369 mst = match mst.delete(&key).await { 370 Ok(m) => m, 371 Err(_) => { 372 return ApiError::InternalError(Some("Failed to delete from MST".into())) 373 .into_response(); 374 } 375 }; 376 results.push(WriteResult::DeleteResult {}); 377 ops.push(RecordOp::Delete { 378 collection: collection.to_string(), 379 rkey: rkey.to_string(), 380 prev: prev_record_cid, 381 }); 382 } 383 } 384 } 385 let new_mst_root = match mst.persist().await { 386 Ok(c) => c, 387 Err(_) => { 388 return ApiError::InternalError(Some("Failed to persist MST".into())).into_response(); 389 } 390 }; 391 let mut relevant_blocks = std::collections::BTreeMap::new(); 392 for key in &modified_keys { 393 if mst 394 .blocks_for_path(key, &mut relevant_blocks) 395 .await 396 .is_err() 397 { 398 return ApiError::InternalError(Some("Failed to get new MST blocks for path".into())) 399 .into_response(); 400 } 401 if original_mst 402 .blocks_for_path(key, &mut relevant_blocks) 403 .await 404 .is_err() 405 { 406 return ApiError::InternalError(Some("Failed to get old MST blocks for path".into())) 407 .into_response(); 408 } 409 } 410 let mut written_cids = tracking_store.get_all_relevant_cids(); 411 for cid in relevant_blocks.keys() { 412 if !written_cids.contains(cid) { 413 written_cids.push(*cid); 414 } 415 } 416 let written_cids_str = written_cids 417 .iter() 418 .map(|c| c.to_string()) 419 .collect::<Vec<_>>(); 420 let commit_res = match commit_and_log( 421 &state, 422 CommitParams { 423 did: &did, 424 user_id, 425 current_root_cid: Some(current_root_cid), 426 prev_data_cid: Some(commit.data), 427 new_mst_root, 428 ops, 429 blocks_cids: &written_cids_str, 430 blobs: &all_blob_cids, 431 }, 432 ) 433 .await 434 { 435 Ok(res) => res, 436 Err(e) => { 437 error!("Commit failed: {}", e); 438 return ApiError::InternalError(Some("Failed to commit changes".into())) 439 .into_response(); 440 } 441 }; 442 443 if let Some(ref controller) = controller_did { 444 let write_summary: Vec<serde_json::Value> = input 445 .writes 446 .iter() 447 .map(|w| match w { 448 WriteOp::Create { 449 collection, rkey, .. 450 } => json!({ 451 "action": "create", 452 "collection": collection, 453 "rkey": rkey 454 }), 455 WriteOp::Update { 456 collection, rkey, .. 457 } => json!({ 458 "action": "update", 459 "collection": collection, 460 "rkey": rkey 461 }), 462 WriteOp::Delete { collection, rkey } => json!({ 463 "action": "delete", 464 "collection": collection, 465 "rkey": rkey 466 }), 467 }) 468 .collect(); 469 470 let _ = delegation::log_delegation_action( 471 &state.db, 472 &did, 473 controller, 474 Some(controller), 475 DelegationActionType::RepoWrite, 476 Some(json!({ 477 "action": "apply_writes", 478 "count": input.writes.len(), 479 "writes": write_summary 480 })), 481 None, 482 None, 483 ) 484 .await; 485 } 486 487 ( 488 StatusCode::OK, 489 Json(ApplyWritesOutput { 490 commit: CommitInfo { 491 cid: commit_res.commit_cid.to_string(), 492 rev: commit_res.rev, 493 }, 494 results, 495 }), 496 ) 497 .into_response() 498}