this repo has no description
1use super::validation::validate_record_with_status; 2use super::write::has_verified_comms_channel; 3use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log, extract_blob_cids}; 4use crate::delegation::{self, DelegationActionType}; 5use crate::repo::tracking::TrackingBlockStore; 6use crate::state::AppState; 7use crate::validation::ValidationStatus; 8use axum::{ 9 Json, 10 extract::State, 11 http::StatusCode, 12 response::{IntoResponse, Response}, 13}; 14use cid::Cid; 15use jacquard::types::{ 16 integer::LimitedU32, 17 string::{Nsid, Tid}, 18}; 19use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 20use serde::{Deserialize, Serialize}; 21use serde_json::json; 22use std::str::FromStr; 23use std::sync::Arc; 24use tracing::{error, info}; 25 26const MAX_BATCH_WRITES: usize = 200; 27 28#[derive(Deserialize)] 29#[serde(tag = "$type")] 30pub enum WriteOp { 31 #[serde(rename = "com.atproto.repo.applyWrites#create")] 32 Create { 33 collection: String, 34 rkey: Option<String>, 35 value: serde_json::Value, 36 }, 37 #[serde(rename = "com.atproto.repo.applyWrites#update")] 38 Update { 39 collection: String, 40 rkey: String, 41 value: serde_json::Value, 42 }, 43 #[serde(rename = "com.atproto.repo.applyWrites#delete")] 44 Delete { collection: String, rkey: String }, 45} 46 47#[derive(Deserialize)] 48#[serde(rename_all = "camelCase")] 49pub struct ApplyWritesInput { 50 pub repo: String, 51 pub validate: Option<bool>, 52 pub writes: Vec<WriteOp>, 53 pub swap_commit: Option<String>, 54} 55 56#[derive(Serialize)] 57#[serde(tag = "$type")] 58pub enum WriteResult { 59 #[serde(rename = "com.atproto.repo.applyWrites#createResult")] 60 CreateResult { 61 uri: String, 62 cid: String, 63 #[serde(rename = "validationStatus", skip_serializing_if = "Option::is_none")] 64 validation_status: Option<String>, 65 }, 66 #[serde(rename = "com.atproto.repo.applyWrites#updateResult")] 67 UpdateResult { 68 uri: String, 69 cid: String, 70 #[serde(rename = "validationStatus", skip_serializing_if = "Option::is_none")] 71 validation_status: Option<String>, 72 }, 73 #[serde(rename = "com.atproto.repo.applyWrites#deleteResult")] 74 DeleteResult {}, 75} 76 77#[derive(Serialize)] 78pub struct ApplyWritesOutput { 79 pub commit: CommitInfo, 80 pub results: Vec<WriteResult>, 81} 82 83#[derive(Serialize)] 84pub struct CommitInfo { 85 pub cid: String, 86 pub rev: String, 87} 88 89pub async fn apply_writes( 90 State(state): State<AppState>, 91 headers: axum::http::HeaderMap, 92 Json(input): Json<ApplyWritesInput>, 93) -> Response { 94 info!( 95 "apply_writes called: repo={}, writes={}", 96 input.repo, 97 input.writes.len() 98 ); 99 let token = match crate::auth::extract_bearer_token_from_header( 100 headers.get("Authorization").and_then(|h| h.to_str().ok()), 101 ) { 102 Some(t) => t, 103 None => { 104 return ( 105 StatusCode::UNAUTHORIZED, 106 Json(json!({"error": "AuthenticationRequired"})), 107 ) 108 .into_response(); 109 } 110 }; 111 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await { 112 Ok(user) => user, 113 Err(_) => { 114 return ( 115 StatusCode::UNAUTHORIZED, 116 Json(json!({"error": "AuthenticationFailed"})), 117 ) 118 .into_response(); 119 } 120 }; 121 let did = auth_user.did.clone(); 122 let is_oauth = auth_user.is_oauth; 123 let scope = auth_user.scope; 124 let controller_did = auth_user.controller_did.clone(); 125 if input.repo != did { 126 return ( 127 StatusCode::FORBIDDEN, 128 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})), 129 ) 130 .into_response(); 131 } 132 if crate::util::is_account_migrated(&state.db, &did) 133 .await 134 .unwrap_or(false) 135 { 136 return ( 137 StatusCode::FORBIDDEN, 138 Json(json!({ 139 "error": "AccountMigrated", 140 "message": "Account has been migrated to another PDS. Repo operations are not allowed." 141 })), 142 ) 143 .into_response(); 144 } 145 let is_verified = has_verified_comms_channel(&state.db, &did) 146 .await 147 .unwrap_or(false); 148 let is_delegated = crate::delegation::is_delegated_account(&state.db, &did) 149 .await 150 .unwrap_or(false); 151 if !is_verified && !is_delegated { 152 return ( 153 StatusCode::FORBIDDEN, 154 Json(json!({ 155 "error": "AccountNotVerified", 156 "message": "You must verify at least one notification channel (email, Discord, Telegram, or Signal) before creating records" 157 })), 158 ) 159 .into_response(); 160 } 161 if input.writes.is_empty() { 162 return ( 163 StatusCode::BAD_REQUEST, 164 Json(json!({"error": "InvalidRequest", "message": "writes array is empty"})), 165 ) 166 .into_response(); 167 } 168 if input.writes.len() > MAX_BATCH_WRITES { 169 return ( 170 StatusCode::BAD_REQUEST, 171 Json(json!({"error": "InvalidRequest", "message": format!("Too many writes (max {})", MAX_BATCH_WRITES)})), 172 ) 173 .into_response(); 174 } 175 176 let has_custom_scope = scope 177 .as_ref() 178 .map(|s| s != "com.atproto.access") 179 .unwrap_or(false); 180 if is_oauth || has_custom_scope { 181 use std::collections::HashSet; 182 let create_collections: HashSet<&str> = input 183 .writes 184 .iter() 185 .filter_map(|w| { 186 if let WriteOp::Create { collection, .. } = w { 187 Some(collection.as_str()) 188 } else { 189 None 190 } 191 }) 192 .collect(); 193 let update_collections: HashSet<&str> = input 194 .writes 195 .iter() 196 .filter_map(|w| { 197 if let WriteOp::Update { collection, .. } = w { 198 Some(collection.as_str()) 199 } else { 200 None 201 } 202 }) 203 .collect(); 204 let delete_collections: HashSet<&str> = input 205 .writes 206 .iter() 207 .filter_map(|w| { 208 if let WriteOp::Delete { collection, .. } = w { 209 Some(collection.as_str()) 210 } else { 211 None 212 } 213 }) 214 .collect(); 215 216 for collection in create_collections { 217 if let Err(e) = crate::auth::scope_check::check_repo_scope( 218 is_oauth, 219 scope.as_deref(), 220 crate::oauth::RepoAction::Create, 221 collection, 222 ) { 223 return e; 224 } 225 } 226 for collection in update_collections { 227 if let Err(e) = crate::auth::scope_check::check_repo_scope( 228 is_oauth, 229 scope.as_deref(), 230 crate::oauth::RepoAction::Update, 231 collection, 232 ) { 233 return e; 234 } 235 } 236 for collection in delete_collections { 237 if let Err(e) = crate::auth::scope_check::check_repo_scope( 238 is_oauth, 239 scope.as_deref(), 240 crate::oauth::RepoAction::Delete, 241 collection, 242 ) { 243 return e; 244 } 245 } 246 } 247 248 let user_id: uuid::Uuid = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did) 249 .fetch_optional(&state.db) 250 .await 251 { 252 Ok(Some(id)) => id, 253 _ => { 254 return ( 255 StatusCode::INTERNAL_SERVER_ERROR, 256 Json(json!({"error": "InternalError", "message": "User not found"})), 257 ) 258 .into_response(); 259 } 260 }; 261 let root_cid_str: String = match sqlx::query_scalar!( 262 "SELECT repo_root_cid FROM repos WHERE user_id = $1", 263 user_id 264 ) 265 .fetch_optional(&state.db) 266 .await 267 { 268 Ok(Some(cid_str)) => cid_str, 269 _ => { 270 return ( 271 StatusCode::INTERNAL_SERVER_ERROR, 272 Json(json!({"error": "InternalError", "message": "Repo root not found"})), 273 ) 274 .into_response(); 275 } 276 }; 277 let current_root_cid = match Cid::from_str(&root_cid_str) { 278 Ok(c) => c, 279 Err(_) => { 280 return ( 281 StatusCode::INTERNAL_SERVER_ERROR, 282 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})), 283 ) 284 .into_response(); 285 } 286 }; 287 if let Some(swap_commit) = &input.swap_commit 288 && Cid::from_str(swap_commit).ok() != Some(current_root_cid) 289 { 290 return ( 291 StatusCode::CONFLICT, 292 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})), 293 ) 294 .into_response(); 295 } 296 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 297 let commit_bytes = match tracking_store.get(&current_root_cid).await { 298 Ok(Some(b)) => b, 299 _ => { 300 return ( 301 StatusCode::INTERNAL_SERVER_ERROR, 302 Json(json!({"error": "InternalError", "message": "Commit block not found"})), 303 ) 304 .into_response(); 305 } 306 }; 307 let commit = match Commit::from_cbor(&commit_bytes) { 308 Ok(c) => c, 309 _ => { 310 return ( 311 StatusCode::INTERNAL_SERVER_ERROR, 312 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})), 313 ) 314 .into_response(); 315 } 316 }; 317 let original_mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 318 let mut mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 319 let mut results: Vec<WriteResult> = Vec::new(); 320 let mut ops: Vec<RecordOp> = Vec::new(); 321 let mut modified_keys: Vec<String> = Vec::new(); 322 let mut all_blob_cids: Vec<String> = Vec::new(); 323 for write in &input.writes { 324 match write { 325 WriteOp::Create { 326 collection, 327 rkey, 328 value, 329 } => { 330 let validation_status = if input.validate == Some(false) { 331 None 332 } else { 333 let require_lexicon = input.validate == Some(true); 334 match validate_record_with_status( 335 value, 336 collection, 337 rkey.as_deref(), 338 require_lexicon, 339 ) { 340 Ok(status) => Some(status), 341 Err(err_response) => return *err_response, 342 } 343 }; 344 all_blob_cids.extend(extract_blob_cids(value)); 345 let rkey = rkey 346 .clone() 347 .unwrap_or_else(|| Tid::now(LimitedU32::MIN).to_string()); 348 let mut record_bytes = Vec::new(); 349 if serde_ipld_dagcbor::to_writer(&mut record_bytes, value).is_err() { 350 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response(); 351 } 352 let record_cid = match tracking_store.put(&record_bytes).await { 353 Ok(c) => c, 354 Err(_) => return ( 355 StatusCode::INTERNAL_SERVER_ERROR, 356 Json( 357 json!({"error": "InternalError", "message": "Failed to store record"}), 358 ), 359 ) 360 .into_response(), 361 }; 362 let collection_nsid = match collection.parse::<Nsid>() { 363 Ok(n) => n, 364 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection", "message": "Invalid collection NSID"}))).into_response(), 365 }; 366 let key = format!("{}/{}", collection_nsid, rkey); 367 modified_keys.push(key.clone()); 368 mst = match mst.add(&key, record_cid).await { 369 Ok(m) => m, 370 Err(_) => return ( 371 StatusCode::INTERNAL_SERVER_ERROR, 372 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})), 373 ) 374 .into_response(), 375 }; 376 let uri = format!("at://{}/{}/{}", did, collection, rkey); 377 results.push(WriteResult::CreateResult { 378 uri, 379 cid: record_cid.to_string(), 380 validation_status: validation_status.map(|s| match s { 381 ValidationStatus::Valid => "valid".to_string(), 382 ValidationStatus::Unknown => "unknown".to_string(), 383 ValidationStatus::Invalid => "invalid".to_string(), 384 }), 385 }); 386 ops.push(RecordOp::Create { 387 collection: collection.clone(), 388 rkey, 389 cid: record_cid, 390 }); 391 } 392 WriteOp::Update { 393 collection, 394 rkey, 395 value, 396 } => { 397 let validation_status = if input.validate == Some(false) { 398 None 399 } else { 400 let require_lexicon = input.validate == Some(true); 401 match validate_record_with_status( 402 value, 403 collection, 404 Some(rkey), 405 require_lexicon, 406 ) { 407 Ok(status) => Some(status), 408 Err(err_response) => return *err_response, 409 } 410 }; 411 all_blob_cids.extend(extract_blob_cids(value)); 412 let mut record_bytes = Vec::new(); 413 if serde_ipld_dagcbor::to_writer(&mut record_bytes, value).is_err() { 414 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response(); 415 } 416 let record_cid = match tracking_store.put(&record_bytes).await { 417 Ok(c) => c, 418 Err(_) => return ( 419 StatusCode::INTERNAL_SERVER_ERROR, 420 Json( 421 json!({"error": "InternalError", "message": "Failed to store record"}), 422 ), 423 ) 424 .into_response(), 425 }; 426 let collection_nsid = match collection.parse::<Nsid>() { 427 Ok(n) => n, 428 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection", "message": "Invalid collection NSID"}))).into_response(), 429 }; 430 let key = format!("{}/{}", collection_nsid, rkey); 431 modified_keys.push(key.clone()); 432 let prev_record_cid = mst.get(&key).await.ok().flatten(); 433 mst = match mst.update(&key, record_cid).await { 434 Ok(m) => m, 435 Err(_) => return ( 436 StatusCode::INTERNAL_SERVER_ERROR, 437 Json(json!({"error": "InternalError", "message": "Failed to update MST"})), 438 ) 439 .into_response(), 440 }; 441 let uri = format!("at://{}/{}/{}", did, collection, rkey); 442 results.push(WriteResult::UpdateResult { 443 uri, 444 cid: record_cid.to_string(), 445 validation_status: validation_status.map(|s| match s { 446 ValidationStatus::Valid => "valid".to_string(), 447 ValidationStatus::Unknown => "unknown".to_string(), 448 ValidationStatus::Invalid => "invalid".to_string(), 449 }), 450 }); 451 ops.push(RecordOp::Update { 452 collection: collection.clone(), 453 rkey: rkey.clone(), 454 cid: record_cid, 455 prev: prev_record_cid, 456 }); 457 } 458 WriteOp::Delete { collection, rkey } => { 459 let collection_nsid = match collection.parse::<Nsid>() { 460 Ok(n) => n, 461 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection", "message": "Invalid collection NSID"}))).into_response(), 462 }; 463 let key = format!("{}/{}", collection_nsid, rkey); 464 modified_keys.push(key.clone()); 465 let prev_record_cid = mst.get(&key).await.ok().flatten(); 466 mst = match mst.delete(&key).await { 467 Ok(m) => m, 468 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to delete from MST"}))).into_response(), 469 }; 470 results.push(WriteResult::DeleteResult {}); 471 ops.push(RecordOp::Delete { 472 collection: collection.clone(), 473 rkey: rkey.clone(), 474 prev: prev_record_cid, 475 }); 476 } 477 } 478 } 479 let new_mst_root = match mst.persist().await { 480 Ok(c) => c, 481 Err(_) => { 482 return ( 483 StatusCode::INTERNAL_SERVER_ERROR, 484 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})), 485 ) 486 .into_response(); 487 } 488 }; 489 let mut relevant_blocks = std::collections::BTreeMap::new(); 490 for key in &modified_keys { 491 if mst 492 .blocks_for_path(key, &mut relevant_blocks) 493 .await 494 .is_err() 495 { 496 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response(); 497 } 498 if original_mst 499 .blocks_for_path(key, &mut relevant_blocks) 500 .await 501 .is_err() 502 { 503 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response(); 504 } 505 } 506 let mut written_cids = tracking_store.get_all_relevant_cids(); 507 for cid in relevant_blocks.keys() { 508 if !written_cids.contains(cid) { 509 written_cids.push(*cid); 510 } 511 } 512 let written_cids_str = written_cids 513 .iter() 514 .map(|c| c.to_string()) 515 .collect::<Vec<_>>(); 516 let commit_res = match commit_and_log( 517 &state, 518 CommitParams { 519 did: &did, 520 user_id, 521 current_root_cid: Some(current_root_cid), 522 prev_data_cid: Some(commit.data), 523 new_mst_root, 524 ops, 525 blocks_cids: &written_cids_str, 526 blobs: &all_blob_cids, 527 }, 528 ) 529 .await 530 { 531 Ok(res) => res, 532 Err(e) => { 533 error!("Commit failed: {}", e); 534 return ( 535 StatusCode::INTERNAL_SERVER_ERROR, 536 Json(json!({"error": "InternalError", "message": "Failed to commit changes"})), 537 ) 538 .into_response(); 539 } 540 }; 541 542 if let Some(ref controller) = controller_did { 543 let write_summary: Vec<serde_json::Value> = input 544 .writes 545 .iter() 546 .map(|w| match w { 547 WriteOp::Create { 548 collection, rkey, .. 549 } => json!({ 550 "action": "create", 551 "collection": collection, 552 "rkey": rkey 553 }), 554 WriteOp::Update { 555 collection, rkey, .. 556 } => json!({ 557 "action": "update", 558 "collection": collection, 559 "rkey": rkey 560 }), 561 WriteOp::Delete { collection, rkey } => json!({ 562 "action": "delete", 563 "collection": collection, 564 "rkey": rkey 565 }), 566 }) 567 .collect(); 568 569 let _ = delegation::log_delegation_action( 570 &state.db, 571 &did, 572 controller, 573 Some(controller), 574 DelegationActionType::RepoWrite, 575 Some(json!({ 576 "action": "apply_writes", 577 "count": input.writes.len(), 578 "writes": write_summary 579 })), 580 None, 581 None, 582 ) 583 .await; 584 } 585 586 ( 587 StatusCode::OK, 588 Json(ApplyWritesOutput { 589 commit: CommitInfo { 590 cid: commit_res.commit_cid.to_string(), 591 rev: commit_res.rev, 592 }, 593 results, 594 }), 595 ) 596 .into_response() 597}