this repo has no description
1use super::validation::validate_record_with_status; 2use super::write::has_verified_comms_channel; 3use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log, extract_blob_cids}; 4use crate::delegation::{self, DelegationActionType}; 5use crate::repo::tracking::TrackingBlockStore; 6use crate::state::AppState; 7use crate::validation::ValidationStatus; 8use axum::{ 9 Json, 10 extract::State, 11 http::StatusCode, 12 response::{IntoResponse, Response}, 13}; 14use cid::Cid; 15use jacquard::types::{ 16 integer::LimitedU32, 17 string::{Nsid, Tid}, 18}; 19use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 20use serde::{Deserialize, Serialize}; 21use serde_json::json; 22use std::str::FromStr; 23use std::sync::Arc; 24use tracing::{error, info}; 25 26const MAX_BATCH_WRITES: usize = 200; 27 28#[derive(Deserialize)] 29#[serde(tag = "$type")] 30pub enum WriteOp { 31 #[serde(rename = "com.atproto.repo.applyWrites#create")] 32 Create { 33 collection: String, 34 rkey: Option<String>, 35 value: serde_json::Value, 36 }, 37 #[serde(rename = "com.atproto.repo.applyWrites#update")] 38 Update { 39 collection: String, 40 rkey: String, 41 value: serde_json::Value, 42 }, 43 #[serde(rename = "com.atproto.repo.applyWrites#delete")] 44 Delete { collection: String, rkey: String }, 45} 46 47#[derive(Deserialize)] 48#[serde(rename_all = "camelCase")] 49pub struct ApplyWritesInput { 50 pub repo: String, 51 pub validate: Option<bool>, 52 pub writes: Vec<WriteOp>, 53 pub swap_commit: Option<String>, 54} 55 56#[derive(Serialize)] 57#[serde(tag = "$type")] 58pub enum WriteResult { 59 #[serde(rename = "com.atproto.repo.applyWrites#createResult")] 60 CreateResult { 61 uri: String, 62 cid: String, 63 #[serde(rename = "validationStatus", skip_serializing_if = "Option::is_none")] 64 validation_status: Option<String>, 65 }, 66 #[serde(rename = "com.atproto.repo.applyWrites#updateResult")] 67 UpdateResult { 68 uri: String, 69 cid: String, 70 #[serde(rename = "validationStatus", skip_serializing_if = "Option::is_none")] 71 validation_status: Option<String>, 72 }, 73 #[serde(rename = "com.atproto.repo.applyWrites#deleteResult")] 74 DeleteResult {}, 75} 76 77#[derive(Serialize)] 78pub struct ApplyWritesOutput { 79 pub commit: CommitInfo, 80 pub results: Vec<WriteResult>, 81} 82 83#[derive(Serialize)] 84pub struct CommitInfo { 85 pub cid: String, 86 pub rev: String, 87} 88 89pub async fn apply_writes( 90 State(state): State<AppState>, 91 headers: axum::http::HeaderMap, 92 Json(input): Json<ApplyWritesInput>, 93) -> Response { 94 info!( 95 "apply_writes called: repo={}, writes={}", 96 input.repo, 97 input.writes.len() 98 ); 99 let token = match crate::auth::extract_bearer_token_from_header( 100 headers.get("Authorization").and_then(|h| h.to_str().ok()), 101 ) { 102 Some(t) => t, 103 None => { 104 return ( 105 StatusCode::UNAUTHORIZED, 106 Json(json!({"error": "AuthenticationRequired"})), 107 ) 108 .into_response(); 109 } 110 }; 111 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await { 112 Ok(user) => user, 113 Err(_) => { 114 return ( 115 StatusCode::UNAUTHORIZED, 116 Json(json!({"error": "AuthenticationFailed"})), 117 ) 118 .into_response(); 119 } 120 }; 121 let did = auth_user.did.clone(); 122 let is_oauth = auth_user.is_oauth; 123 let scope = auth_user.scope; 124 let controller_did = auth_user.controller_did.clone(); 125 if input.repo != did { 126 return ( 127 StatusCode::FORBIDDEN, 128 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})), 129 ) 130 .into_response(); 131 } 132 if crate::util::is_account_migrated(&state.db, &did) 133 .await 134 .unwrap_or(false) 135 { 136 return ( 137 StatusCode::FORBIDDEN, 138 Json(json!({ 139 "error": "AccountMigrated", 140 "message": "Account has been migrated to another PDS. Repo operations are not allowed." 141 })), 142 ) 143 .into_response(); 144 } 145 let is_verified = has_verified_comms_channel(&state.db, &did) 146 .await 147 .unwrap_or(false); 148 let is_delegated = crate::delegation::is_delegated_account(&state.db, &did) 149 .await 150 .unwrap_or(false); 151 if !is_verified && !is_delegated { 152 return ( 153 StatusCode::FORBIDDEN, 154 Json(json!({ 155 "error": "AccountNotVerified", 156 "message": "You must verify at least one notification channel (email, Discord, Telegram, or Signal) before creating records" 157 })), 158 ) 159 .into_response(); 160 } 161 if input.writes.is_empty() { 162 return ( 163 StatusCode::BAD_REQUEST, 164 Json(json!({"error": "InvalidRequest", "message": "writes array is empty"})), 165 ) 166 .into_response(); 167 } 168 if input.writes.len() > MAX_BATCH_WRITES { 169 return ( 170 StatusCode::BAD_REQUEST, 171 Json(json!({"error": "InvalidRequest", "message": format!("Too many writes (max {})", MAX_BATCH_WRITES)})), 172 ) 173 .into_response(); 174 } 175 176 let has_custom_scope = scope 177 .as_ref() 178 .map(|s| s != "com.atproto.access") 179 .unwrap_or(false); 180 if is_oauth || has_custom_scope { 181 use std::collections::HashSet; 182 let create_collections: HashSet<&str> = input 183 .writes 184 .iter() 185 .filter_map(|w| { 186 if let WriteOp::Create { collection, .. } = w { 187 Some(collection.as_str()) 188 } else { 189 None 190 } 191 }) 192 .collect(); 193 let update_collections: HashSet<&str> = input 194 .writes 195 .iter() 196 .filter_map(|w| { 197 if let WriteOp::Update { collection, .. } = w { 198 Some(collection.as_str()) 199 } else { 200 None 201 } 202 }) 203 .collect(); 204 let delete_collections: HashSet<&str> = input 205 .writes 206 .iter() 207 .filter_map(|w| { 208 if let WriteOp::Delete { collection, .. } = w { 209 Some(collection.as_str()) 210 } else { 211 None 212 } 213 }) 214 .collect(); 215 216 for collection in create_collections { 217 if let Err(e) = crate::auth::scope_check::check_repo_scope( 218 is_oauth, 219 scope.as_deref(), 220 crate::oauth::RepoAction::Create, 221 collection, 222 ) { 223 return e; 224 } 225 } 226 for collection in update_collections { 227 if let Err(e) = crate::auth::scope_check::check_repo_scope( 228 is_oauth, 229 scope.as_deref(), 230 crate::oauth::RepoAction::Update, 231 collection, 232 ) { 233 return e; 234 } 235 } 236 for collection in delete_collections { 237 if let Err(e) = crate::auth::scope_check::check_repo_scope( 238 is_oauth, 239 scope.as_deref(), 240 crate::oauth::RepoAction::Delete, 241 collection, 242 ) { 243 return e; 244 } 245 } 246 } 247 248 let user_id: uuid::Uuid = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did) 249 .fetch_optional(&state.db) 250 .await 251 { 252 Ok(Some(id)) => id, 253 _ => { 254 return ( 255 StatusCode::INTERNAL_SERVER_ERROR, 256 Json(json!({"error": "InternalError", "message": "User not found"})), 257 ) 258 .into_response(); 259 } 260 }; 261 let root_cid_str: String = match sqlx::query_scalar!( 262 "SELECT repo_root_cid FROM repos WHERE user_id = $1", 263 user_id 264 ) 265 .fetch_optional(&state.db) 266 .await 267 { 268 Ok(Some(cid_str)) => cid_str, 269 _ => { 270 return ( 271 StatusCode::INTERNAL_SERVER_ERROR, 272 Json(json!({"error": "InternalError", "message": "Repo root not found"})), 273 ) 274 .into_response(); 275 } 276 }; 277 let current_root_cid = match Cid::from_str(&root_cid_str) { 278 Ok(c) => c, 279 Err(_) => { 280 return ( 281 StatusCode::INTERNAL_SERVER_ERROR, 282 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})), 283 ) 284 .into_response(); 285 } 286 }; 287 if let Some(swap_commit) = &input.swap_commit 288 && Cid::from_str(swap_commit).ok() != Some(current_root_cid) 289 { 290 return ( 291 StatusCode::CONFLICT, 292 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})), 293 ) 294 .into_response(); 295 } 296 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 297 let commit_bytes = match tracking_store.get(&current_root_cid).await { 298 Ok(Some(b)) => b, 299 _ => { 300 return ( 301 StatusCode::INTERNAL_SERVER_ERROR, 302 Json(json!({"error": "InternalError", "message": "Commit block not found"})), 303 ) 304 .into_response(); 305 } 306 }; 307 let commit = match Commit::from_cbor(&commit_bytes) { 308 Ok(c) => c, 309 _ => { 310 return ( 311 StatusCode::INTERNAL_SERVER_ERROR, 312 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})), 313 ) 314 .into_response(); 315 } 316 }; 317 let original_mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 318 let mut mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 319 let mut results: Vec<WriteResult> = Vec::new(); 320 let mut ops: Vec<RecordOp> = Vec::new(); 321 let mut modified_keys: Vec<String> = Vec::new(); 322 let mut all_blob_cids: Vec<String> = Vec::new(); 323 for write in &input.writes { 324 match write { 325 WriteOp::Create { 326 collection, 327 rkey, 328 value, 329 } => { 330 let validation_status = if input.validate == Some(false) { 331 None 332 } else { 333 let require_lexicon = input.validate == Some(true); 334 match validate_record_with_status( 335 value, 336 collection, 337 rkey.as_deref(), 338 require_lexicon, 339 ) { 340 Ok(status) => Some(status), 341 Err(err_response) => return *err_response, 342 } 343 }; 344 all_blob_cids.extend(extract_blob_cids(value)); 345 let rkey = rkey 346 .clone() 347 .unwrap_or_else(|| Tid::now(LimitedU32::MIN).to_string()); 348 let record_ipld = crate::util::json_to_ipld(value); 349 let mut record_bytes = Vec::new(); 350 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld).is_err() { 351 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response(); 352 } 353 let record_cid = match tracking_store.put(&record_bytes).await { 354 Ok(c) => c, 355 Err(_) => return ( 356 StatusCode::INTERNAL_SERVER_ERROR, 357 Json( 358 json!({"error": "InternalError", "message": "Failed to store record"}), 359 ), 360 ) 361 .into_response(), 362 }; 363 let collection_nsid = match collection.parse::<Nsid>() { 364 Ok(n) => n, 365 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection", "message": "Invalid collection NSID"}))).into_response(), 366 }; 367 let key = format!("{}/{}", collection_nsid, rkey); 368 modified_keys.push(key.clone()); 369 mst = match mst.add(&key, record_cid).await { 370 Ok(m) => m, 371 Err(_) => return ( 372 StatusCode::INTERNAL_SERVER_ERROR, 373 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})), 374 ) 375 .into_response(), 376 }; 377 let uri = format!("at://{}/{}/{}", did, collection, rkey); 378 results.push(WriteResult::CreateResult { 379 uri, 380 cid: record_cid.to_string(), 381 validation_status: validation_status.map(|s| match s { 382 ValidationStatus::Valid => "valid".to_string(), 383 ValidationStatus::Unknown => "unknown".to_string(), 384 ValidationStatus::Invalid => "invalid".to_string(), 385 }), 386 }); 387 ops.push(RecordOp::Create { 388 collection: collection.clone(), 389 rkey, 390 cid: record_cid, 391 }); 392 } 393 WriteOp::Update { 394 collection, 395 rkey, 396 value, 397 } => { 398 let validation_status = if input.validate == Some(false) { 399 None 400 } else { 401 let require_lexicon = input.validate == Some(true); 402 match validate_record_with_status( 403 value, 404 collection, 405 Some(rkey), 406 require_lexicon, 407 ) { 408 Ok(status) => Some(status), 409 Err(err_response) => return *err_response, 410 } 411 }; 412 all_blob_cids.extend(extract_blob_cids(value)); 413 let record_ipld = crate::util::json_to_ipld(value); 414 let mut record_bytes = Vec::new(); 415 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld).is_err() { 416 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response(); 417 } 418 let record_cid = match tracking_store.put(&record_bytes).await { 419 Ok(c) => c, 420 Err(_) => return ( 421 StatusCode::INTERNAL_SERVER_ERROR, 422 Json( 423 json!({"error": "InternalError", "message": "Failed to store record"}), 424 ), 425 ) 426 .into_response(), 427 }; 428 let collection_nsid = match collection.parse::<Nsid>() { 429 Ok(n) => n, 430 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection", "message": "Invalid collection NSID"}))).into_response(), 431 }; 432 let key = format!("{}/{}", collection_nsid, rkey); 433 modified_keys.push(key.clone()); 434 let prev_record_cid = mst.get(&key).await.ok().flatten(); 435 mst = match mst.update(&key, record_cid).await { 436 Ok(m) => m, 437 Err(_) => return ( 438 StatusCode::INTERNAL_SERVER_ERROR, 439 Json(json!({"error": "InternalError", "message": "Failed to update MST"})), 440 ) 441 .into_response(), 442 }; 443 let uri = format!("at://{}/{}/{}", did, collection, rkey); 444 results.push(WriteResult::UpdateResult { 445 uri, 446 cid: record_cid.to_string(), 447 validation_status: validation_status.map(|s| match s { 448 ValidationStatus::Valid => "valid".to_string(), 449 ValidationStatus::Unknown => "unknown".to_string(), 450 ValidationStatus::Invalid => "invalid".to_string(), 451 }), 452 }); 453 ops.push(RecordOp::Update { 454 collection: collection.clone(), 455 rkey: rkey.clone(), 456 cid: record_cid, 457 prev: prev_record_cid, 458 }); 459 } 460 WriteOp::Delete { collection, rkey } => { 461 let collection_nsid = match collection.parse::<Nsid>() { 462 Ok(n) => n, 463 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection", "message": "Invalid collection NSID"}))).into_response(), 464 }; 465 let key = format!("{}/{}", collection_nsid, rkey); 466 modified_keys.push(key.clone()); 467 let prev_record_cid = mst.get(&key).await.ok().flatten(); 468 mst = match mst.delete(&key).await { 469 Ok(m) => m, 470 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to delete from MST"}))).into_response(), 471 }; 472 results.push(WriteResult::DeleteResult {}); 473 ops.push(RecordOp::Delete { 474 collection: collection.clone(), 475 rkey: rkey.clone(), 476 prev: prev_record_cid, 477 }); 478 } 479 } 480 } 481 let new_mst_root = match mst.persist().await { 482 Ok(c) => c, 483 Err(_) => { 484 return ( 485 StatusCode::INTERNAL_SERVER_ERROR, 486 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})), 487 ) 488 .into_response(); 489 } 490 }; 491 let mut relevant_blocks = std::collections::BTreeMap::new(); 492 for key in &modified_keys { 493 if mst 494 .blocks_for_path(key, &mut relevant_blocks) 495 .await 496 .is_err() 497 { 498 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response(); 499 } 500 if original_mst 501 .blocks_for_path(key, &mut relevant_blocks) 502 .await 503 .is_err() 504 { 505 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response(); 506 } 507 } 508 let mut written_cids = tracking_store.get_all_relevant_cids(); 509 for cid in relevant_blocks.keys() { 510 if !written_cids.contains(cid) { 511 written_cids.push(*cid); 512 } 513 } 514 let written_cids_str = written_cids 515 .iter() 516 .map(|c| c.to_string()) 517 .collect::<Vec<_>>(); 518 let commit_res = match commit_and_log( 519 &state, 520 CommitParams { 521 did: &did, 522 user_id, 523 current_root_cid: Some(current_root_cid), 524 prev_data_cid: Some(commit.data), 525 new_mst_root, 526 ops, 527 blocks_cids: &written_cids_str, 528 blobs: &all_blob_cids, 529 }, 530 ) 531 .await 532 { 533 Ok(res) => res, 534 Err(e) => { 535 error!("Commit failed: {}", e); 536 return ( 537 StatusCode::INTERNAL_SERVER_ERROR, 538 Json(json!({"error": "InternalError", "message": "Failed to commit changes"})), 539 ) 540 .into_response(); 541 } 542 }; 543 544 if let Some(ref controller) = controller_did { 545 let write_summary: Vec<serde_json::Value> = input 546 .writes 547 .iter() 548 .map(|w| match w { 549 WriteOp::Create { 550 collection, rkey, .. 551 } => json!({ 552 "action": "create", 553 "collection": collection, 554 "rkey": rkey 555 }), 556 WriteOp::Update { 557 collection, rkey, .. 558 } => json!({ 559 "action": "update", 560 "collection": collection, 561 "rkey": rkey 562 }), 563 WriteOp::Delete { collection, rkey } => json!({ 564 "action": "delete", 565 "collection": collection, 566 "rkey": rkey 567 }), 568 }) 569 .collect(); 570 571 let _ = delegation::log_delegation_action( 572 &state.db, 573 &did, 574 controller, 575 Some(controller), 576 DelegationActionType::RepoWrite, 577 Some(json!({ 578 "action": "apply_writes", 579 "count": input.writes.len(), 580 "writes": write_summary 581 })), 582 None, 583 None, 584 ) 585 .await; 586 } 587 588 ( 589 StatusCode::OK, 590 Json(ApplyWritesOutput { 591 commit: CommitInfo { 592 cid: commit_res.commit_cid.to_string(), 593 rev: commit_res.rev, 594 }, 595 results, 596 }), 597 ) 598 .into_response() 599}