this repo has no description
1use super::validation::validate_record_with_status; 2use super::write::has_verified_comms_channel; 3use crate::api::error::ApiError; 4use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log, extract_blob_cids}; 5use crate::delegation::{self, DelegationActionType}; 6use crate::repo::tracking::TrackingBlockStore; 7use crate::state::AppState; 8use crate::types::{AtIdentifier, AtUri, Nsid, Rkey}; 9use axum::{ 10 Json, 11 extract::State, 12 http::StatusCode, 13 response::{IntoResponse, Response}, 14}; 15use cid::Cid; 16use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 17use serde::{Deserialize, Serialize}; 18use serde_json::json; 19use std::str::FromStr; 20use std::sync::Arc; 21use tracing::{error, info}; 22 23const MAX_BATCH_WRITES: usize = 200; 24 25#[derive(Deserialize)] 26#[serde(tag = "$type")] 27pub enum WriteOp { 28 #[serde(rename = "com.atproto.repo.applyWrites#create")] 29 Create { 30 collection: Nsid, 31 rkey: Option<Rkey>, 32 value: serde_json::Value, 33 }, 34 #[serde(rename = "com.atproto.repo.applyWrites#update")] 35 Update { 36 collection: Nsid, 37 rkey: Rkey, 38 value: serde_json::Value, 39 }, 40 #[serde(rename = "com.atproto.repo.applyWrites#delete")] 41 Delete { collection: Nsid, rkey: Rkey }, 42} 43 44#[derive(Deserialize)] 45#[serde(rename_all = "camelCase")] 46pub struct ApplyWritesInput { 47 pub repo: AtIdentifier, 48 pub validate: Option<bool>, 49 pub writes: Vec<WriteOp>, 50 pub swap_commit: Option<String>, 51} 52 53#[derive(Serialize)] 54#[serde(tag = "$type")] 55pub enum WriteResult { 56 #[serde(rename = "com.atproto.repo.applyWrites#createResult")] 57 CreateResult { 58 uri: AtUri, 59 cid: String, 60 #[serde(rename = "validationStatus", skip_serializing_if = "Option::is_none")] 61 validation_status: Option<String>, 62 }, 63 #[serde(rename = "com.atproto.repo.applyWrites#updateResult")] 64 UpdateResult { 65 uri: AtUri, 66 cid: String, 67 #[serde(rename = "validationStatus", skip_serializing_if = "Option::is_none")] 68 validation_status: Option<String>, 69 }, 70 #[serde(rename = "com.atproto.repo.applyWrites#deleteResult")] 71 DeleteResult {}, 72} 73 74#[derive(Serialize)] 75pub struct ApplyWritesOutput { 76 pub commit: CommitInfo, 77 pub results: Vec<WriteResult>, 78} 79 80#[derive(Serialize)] 81pub struct CommitInfo { 82 pub cid: String, 83 pub rev: String, 84} 85 86pub async fn apply_writes( 87 State(state): State<AppState>, 88 headers: axum::http::HeaderMap, 89 Json(input): Json<ApplyWritesInput>, 90) -> Response { 91 info!( 92 "apply_writes called: repo={}, writes={}", 93 input.repo, 94 input.writes.len() 95 ); 96 let Some(token) = crate::auth::extract_bearer_token_from_header( 97 headers.get("Authorization").and_then(|h| h.to_str().ok()), 98 ) else { 99 return ApiError::AuthenticationRequired.into_response(); 100 }; 101 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await { 102 Ok(user) => user, 103 Err(_) => return ApiError::AuthenticationFailed(None).into_response(), 104 }; 105 let did = auth_user.did.clone(); 106 let is_oauth = auth_user.is_oauth; 107 let scope = auth_user.scope; 108 let controller_did = auth_user.controller_did.clone(); 109 if input.repo.as_str() != did { 110 return ApiError::InvalidRepo("Repo does not match authenticated user".into()) 111 .into_response(); 112 } 113 if crate::util::is_account_migrated(&state.db, &did) 114 .await 115 .unwrap_or(false) 116 { 117 return ApiError::AccountMigrated.into_response(); 118 } 119 let is_verified = has_verified_comms_channel(&state.db, &did) 120 .await 121 .unwrap_or(false); 122 let is_delegated = crate::delegation::is_delegated_account(&state.db, &did) 123 .await 124 .unwrap_or(false); 125 if !is_verified && !is_delegated { 126 return ApiError::AccountNotVerified.into_response(); 127 } 128 if input.writes.is_empty() { 129 return ApiError::InvalidRequest("writes array is empty".into()).into_response(); 130 } 131 if input.writes.len() > MAX_BATCH_WRITES { 132 return ApiError::InvalidRequest(format!("Too many writes (max {})", MAX_BATCH_WRITES)) 133 .into_response(); 134 } 135 136 let has_custom_scope = scope 137 .as_ref() 138 .map(|s| s != "com.atproto.access") 139 .unwrap_or(false); 140 if is_oauth || has_custom_scope { 141 use std::collections::HashSet; 142 let create_collections: HashSet<&Nsid> = input 143 .writes 144 .iter() 145 .filter_map(|w| { 146 if let WriteOp::Create { collection, .. } = w { 147 Some(collection) 148 } else { 149 None 150 } 151 }) 152 .collect(); 153 let update_collections: HashSet<&Nsid> = input 154 .writes 155 .iter() 156 .filter_map(|w| { 157 if let WriteOp::Update { collection, .. } = w { 158 Some(collection) 159 } else { 160 None 161 } 162 }) 163 .collect(); 164 let delete_collections: HashSet<&Nsid> = input 165 .writes 166 .iter() 167 .filter_map(|w| { 168 if let WriteOp::Delete { collection, .. } = w { 169 Some(collection) 170 } else { 171 None 172 } 173 }) 174 .collect(); 175 176 for collection in create_collections { 177 if let Err(e) = crate::auth::scope_check::check_repo_scope( 178 is_oauth, 179 scope.as_deref(), 180 crate::oauth::RepoAction::Create, 181 collection, 182 ) { 183 return e; 184 } 185 } 186 for collection in update_collections { 187 if let Err(e) = crate::auth::scope_check::check_repo_scope( 188 is_oauth, 189 scope.as_deref(), 190 crate::oauth::RepoAction::Update, 191 collection, 192 ) { 193 return e; 194 } 195 } 196 for collection in delete_collections { 197 if let Err(e) = crate::auth::scope_check::check_repo_scope( 198 is_oauth, 199 scope.as_deref(), 200 crate::oauth::RepoAction::Delete, 201 collection, 202 ) { 203 return e; 204 } 205 } 206 } 207 208 let user_id: uuid::Uuid = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did.as_str()) 209 .fetch_optional(&state.db) 210 .await 211 { 212 Ok(Some(id)) => id, 213 _ => return ApiError::InternalError(Some("User not found".into())).into_response(), 214 }; 215 let root_cid_str: String = match sqlx::query_scalar!( 216 "SELECT repo_root_cid FROM repos WHERE user_id = $1", 217 user_id 218 ) 219 .fetch_optional(&state.db) 220 .await 221 { 222 Ok(Some(cid_str)) => cid_str, 223 _ => return ApiError::InternalError(Some("Repo root not found".into())).into_response(), 224 }; 225 let current_root_cid = match Cid::from_str(&root_cid_str) { 226 Ok(c) => c, 227 Err(_) => { 228 return ApiError::InternalError(Some("Invalid repo root CID".into())).into_response() 229 } 230 }; 231 if let Some(swap_commit) = &input.swap_commit 232 && Cid::from_str(swap_commit).ok() != Some(current_root_cid) 233 { 234 return ApiError::InvalidSwap(Some("Repo has been modified".into())).into_response(); 235 } 236 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 237 let commit_bytes = match tracking_store.get(&current_root_cid).await { 238 Ok(Some(b)) => b, 239 _ => return ApiError::InternalError(Some("Commit block not found".into())).into_response(), 240 }; 241 let commit = match Commit::from_cbor(&commit_bytes) { 242 Ok(c) => c, 243 _ => return ApiError::InternalError(Some("Failed to parse commit".into())).into_response(), 244 }; 245 let original_mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 246 let mut mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 247 let mut results: Vec<WriteResult> = Vec::new(); 248 let mut ops: Vec<RecordOp> = Vec::new(); 249 let mut modified_keys: Vec<String> = Vec::new(); 250 let mut all_blob_cids: Vec<String> = Vec::new(); 251 for write in &input.writes { 252 match write { 253 WriteOp::Create { 254 collection, 255 rkey, 256 value, 257 } => { 258 let validation_status = if input.validate == Some(false) { 259 None 260 } else { 261 let require_lexicon = input.validate == Some(true); 262 match validate_record_with_status( 263 value, 264 collection, 265 rkey.as_ref().map(|r| r.as_str()), 266 require_lexicon, 267 ) { 268 Ok(status) => Some(status), 269 Err(err_response) => return *err_response, 270 } 271 }; 272 all_blob_cids.extend(extract_blob_cids(value)); 273 let rkey = rkey.clone().unwrap_or_else(Rkey::generate); 274 let record_ipld = crate::util::json_to_ipld(value); 275 let mut record_bytes = Vec::new(); 276 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld).is_err() { 277 return ApiError::InvalidRecord("Failed to serialize record".into()) 278 .into_response(); 279 } 280 let record_cid = match tracking_store.put(&record_bytes).await { 281 Ok(c) => c, 282 Err(_) => { 283 return ApiError::InternalError(Some("Failed to store record".into())) 284 .into_response() 285 } 286 }; 287 let key = format!("{}/{}", collection, rkey); 288 modified_keys.push(key.clone()); 289 mst = match mst.add(&key, record_cid).await { 290 Ok(m) => m, 291 Err(_) => { 292 return ApiError::InternalError(Some("Failed to add to MST".into())) 293 .into_response() 294 } 295 }; 296 let uri = AtUri::from_parts(&did, collection, &rkey); 297 results.push(WriteResult::CreateResult { 298 uri, 299 cid: record_cid.to_string(), 300 validation_status: validation_status.map(|s| s.to_string()), 301 }); 302 ops.push(RecordOp::Create { 303 collection: collection.to_string(), 304 rkey: rkey.to_string(), 305 cid: record_cid, 306 }); 307 } 308 WriteOp::Update { 309 collection, 310 rkey, 311 value, 312 } => { 313 let validation_status = if input.validate == Some(false) { 314 None 315 } else { 316 let require_lexicon = input.validate == Some(true); 317 match validate_record_with_status( 318 value, 319 collection, 320 Some(rkey.as_str()), 321 require_lexicon, 322 ) { 323 Ok(status) => Some(status), 324 Err(err_response) => return *err_response, 325 } 326 }; 327 all_blob_cids.extend(extract_blob_cids(value)); 328 let record_ipld = crate::util::json_to_ipld(value); 329 let mut record_bytes = Vec::new(); 330 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld).is_err() { 331 return ApiError::InvalidRecord("Failed to serialize record".into()) 332 .into_response(); 333 } 334 let record_cid = match tracking_store.put(&record_bytes).await { 335 Ok(c) => c, 336 Err(_) => { 337 return ApiError::InternalError(Some("Failed to store record".into())) 338 .into_response() 339 } 340 }; 341 let key = format!("{}/{}", collection, rkey); 342 modified_keys.push(key.clone()); 343 let prev_record_cid = mst.get(&key).await.ok().flatten(); 344 mst = match mst.update(&key, record_cid).await { 345 Ok(m) => m, 346 Err(_) => { 347 return ApiError::InternalError(Some("Failed to update MST".into())) 348 .into_response() 349 } 350 }; 351 let uri = AtUri::from_parts(&did, collection, rkey); 352 results.push(WriteResult::UpdateResult { 353 uri, 354 cid: record_cid.to_string(), 355 validation_status: validation_status.map(|s| s.to_string()), 356 }); 357 ops.push(RecordOp::Update { 358 collection: collection.to_string(), 359 rkey: rkey.to_string(), 360 cid: record_cid, 361 prev: prev_record_cid, 362 }); 363 } 364 WriteOp::Delete { collection, rkey } => { 365 let key = format!("{}/{}", collection, rkey); 366 modified_keys.push(key.clone()); 367 let prev_record_cid = mst.get(&key).await.ok().flatten(); 368 mst = match mst.delete(&key).await { 369 Ok(m) => m, 370 Err(_) => { 371 return ApiError::InternalError(Some("Failed to delete from MST".into())) 372 .into_response() 373 } 374 }; 375 results.push(WriteResult::DeleteResult {}); 376 ops.push(RecordOp::Delete { 377 collection: collection.to_string(), 378 rkey: rkey.to_string(), 379 prev: prev_record_cid, 380 }); 381 } 382 } 383 } 384 let new_mst_root = match mst.persist().await { 385 Ok(c) => c, 386 Err(_) => return ApiError::InternalError(Some("Failed to persist MST".into())).into_response(), 387 }; 388 let mut relevant_blocks = std::collections::BTreeMap::new(); 389 for key in &modified_keys { 390 if mst 391 .blocks_for_path(key, &mut relevant_blocks) 392 .await 393 .is_err() 394 { 395 return ApiError::InternalError(Some("Failed to get new MST blocks for path".into())) 396 .into_response(); 397 } 398 if original_mst 399 .blocks_for_path(key, &mut relevant_blocks) 400 .await 401 .is_err() 402 { 403 return ApiError::InternalError(Some("Failed to get old MST blocks for path".into())) 404 .into_response(); 405 } 406 } 407 let mut written_cids = tracking_store.get_all_relevant_cids(); 408 for cid in relevant_blocks.keys() { 409 if !written_cids.contains(cid) { 410 written_cids.push(*cid); 411 } 412 } 413 let written_cids_str = written_cids 414 .iter() 415 .map(|c| c.to_string()) 416 .collect::<Vec<_>>(); 417 let commit_res = match commit_and_log( 418 &state, 419 CommitParams { 420 did: &did, 421 user_id, 422 current_root_cid: Some(current_root_cid), 423 prev_data_cid: Some(commit.data), 424 new_mst_root, 425 ops, 426 blocks_cids: &written_cids_str, 427 blobs: &all_blob_cids, 428 }, 429 ) 430 .await 431 { 432 Ok(res) => res, 433 Err(e) => { 434 error!("Commit failed: {}", e); 435 return ApiError::InternalError(Some("Failed to commit changes".into())).into_response(); 436 } 437 }; 438 439 if let Some(ref controller) = controller_did { 440 let write_summary: Vec<serde_json::Value> = input 441 .writes 442 .iter() 443 .map(|w| match w { 444 WriteOp::Create { 445 collection, rkey, .. 446 } => json!({ 447 "action": "create", 448 "collection": collection, 449 "rkey": rkey 450 }), 451 WriteOp::Update { 452 collection, rkey, .. 453 } => json!({ 454 "action": "update", 455 "collection": collection, 456 "rkey": rkey 457 }), 458 WriteOp::Delete { collection, rkey } => json!({ 459 "action": "delete", 460 "collection": collection, 461 "rkey": rkey 462 }), 463 }) 464 .collect(); 465 466 let _ = delegation::log_delegation_action( 467 &state.db, 468 &did, 469 controller, 470 Some(controller), 471 DelegationActionType::RepoWrite, 472 Some(json!({ 473 "action": "apply_writes", 474 "count": input.writes.len(), 475 "writes": write_summary 476 })), 477 None, 478 None, 479 ) 480 .await; 481 } 482 483 ( 484 StatusCode::OK, 485 Json(ApplyWritesOutput { 486 commit: CommitInfo { 487 cid: commit_res.commit_cid.to_string(), 488 rev: commit_res.rev, 489 }, 490 results, 491 }), 492 ) 493 .into_response() 494}