this repo has no description
1use super::validation::validate_record_with_status; 2use super::write::has_verified_comms_channel; 3use crate::validation::ValidationStatus; 4use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log, extract_blob_cids}; 5use crate::delegation::{self, DelegationActionType}; 6use crate::repo::tracking::TrackingBlockStore; 7use crate::state::AppState; 8use axum::{ 9 Json, 10 extract::State, 11 http::StatusCode, 12 response::{IntoResponse, Response}, 13}; 14use cid::Cid; 15use jacquard::types::{ 16 integer::LimitedU32, 17 string::{Nsid, Tid}, 18}; 19use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 20use serde::{Deserialize, Serialize}; 21use serde_json::json; 22use std::str::FromStr; 23use std::sync::Arc; 24use tracing::{error, info}; 25 26const MAX_BATCH_WRITES: usize = 200; 27 28#[derive(Deserialize)] 29#[serde(tag = "$type")] 30pub enum WriteOp { 31 #[serde(rename = "com.atproto.repo.applyWrites#create")] 32 Create { 33 collection: String, 34 rkey: Option<String>, 35 value: serde_json::Value, 36 }, 37 #[serde(rename = "com.atproto.repo.applyWrites#update")] 38 Update { 39 collection: String, 40 rkey: String, 41 value: serde_json::Value, 42 }, 43 #[serde(rename = "com.atproto.repo.applyWrites#delete")] 44 Delete { collection: String, rkey: String }, 45} 46 47#[derive(Deserialize)] 48#[serde(rename_all = "camelCase")] 49pub struct ApplyWritesInput { 50 pub repo: String, 51 pub validate: Option<bool>, 52 pub writes: Vec<WriteOp>, 53 pub swap_commit: Option<String>, 54} 55 56#[derive(Serialize)] 57#[serde(tag = "$type")] 58pub enum WriteResult { 59 #[serde(rename = "com.atproto.repo.applyWrites#createResult")] 60 CreateResult { 61 uri: String, 62 cid: String, 63 #[serde(rename = "validationStatus", skip_serializing_if = "Option::is_none")] 64 validation_status: Option<String>, 65 }, 66 #[serde(rename = "com.atproto.repo.applyWrites#updateResult")] 67 UpdateResult { 68 uri: String, 69 cid: String, 70 #[serde(rename = "validationStatus", skip_serializing_if = "Option::is_none")] 71 validation_status: Option<String>, 72 }, 73 #[serde(rename = "com.atproto.repo.applyWrites#deleteResult")] 74 DeleteResult {}, 75} 76 77#[derive(Serialize)] 78pub struct ApplyWritesOutput { 79 pub commit: CommitInfo, 80 pub results: Vec<WriteResult>, 81} 82 83#[derive(Serialize)] 84pub struct CommitInfo { 85 pub cid: String, 86 pub rev: String, 87} 88 89pub async fn apply_writes( 90 State(state): State<AppState>, 91 headers: axum::http::HeaderMap, 92 Json(input): Json<ApplyWritesInput>, 93) -> Response { 94 info!( 95 "apply_writes called: repo={}, writes={}", 96 input.repo, 97 input.writes.len() 98 ); 99 let token = match crate::auth::extract_bearer_token_from_header( 100 headers.get("Authorization").and_then(|h| h.to_str().ok()), 101 ) { 102 Some(t) => t, 103 None => { 104 return ( 105 StatusCode::UNAUTHORIZED, 106 Json(json!({"error": "AuthenticationRequired"})), 107 ) 108 .into_response(); 109 } 110 }; 111 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await { 112 Ok(user) => user, 113 Err(_) => { 114 return ( 115 StatusCode::UNAUTHORIZED, 116 Json(json!({"error": "AuthenticationFailed"})), 117 ) 118 .into_response(); 119 } 120 }; 121 let did = auth_user.did.clone(); 122 let is_oauth = auth_user.is_oauth; 123 let scope = auth_user.scope; 124 let controller_did = auth_user.controller_did.clone(); 125 if input.repo != did { 126 return ( 127 StatusCode::FORBIDDEN, 128 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})), 129 ) 130 .into_response(); 131 } 132 let is_verified = has_verified_comms_channel(&state.db, &did) 133 .await 134 .unwrap_or(false); 135 let is_delegated = crate::delegation::is_delegated_account(&state.db, &did) 136 .await 137 .unwrap_or(false); 138 if !is_verified && !is_delegated { 139 return ( 140 StatusCode::FORBIDDEN, 141 Json(json!({ 142 "error": "AccountNotVerified", 143 "message": "You must verify at least one notification channel (email, Discord, Telegram, or Signal) before creating records" 144 })), 145 ) 146 .into_response(); 147 } 148 if input.writes.is_empty() { 149 return ( 150 StatusCode::BAD_REQUEST, 151 Json(json!({"error": "InvalidRequest", "message": "writes array is empty"})), 152 ) 153 .into_response(); 154 } 155 if input.writes.len() > MAX_BATCH_WRITES { 156 return ( 157 StatusCode::BAD_REQUEST, 158 Json(json!({"error": "InvalidRequest", "message": format!("Too many writes (max {})", MAX_BATCH_WRITES)})), 159 ) 160 .into_response(); 161 } 162 163 let has_custom_scope = scope 164 .as_ref() 165 .map(|s| s != "com.atproto.access") 166 .unwrap_or(false); 167 if is_oauth || has_custom_scope { 168 use std::collections::HashSet; 169 let create_collections: HashSet<&str> = input 170 .writes 171 .iter() 172 .filter_map(|w| { 173 if let WriteOp::Create { collection, .. } = w { 174 Some(collection.as_str()) 175 } else { 176 None 177 } 178 }) 179 .collect(); 180 let update_collections: HashSet<&str> = input 181 .writes 182 .iter() 183 .filter_map(|w| { 184 if let WriteOp::Update { collection, .. } = w { 185 Some(collection.as_str()) 186 } else { 187 None 188 } 189 }) 190 .collect(); 191 let delete_collections: HashSet<&str> = input 192 .writes 193 .iter() 194 .filter_map(|w| { 195 if let WriteOp::Delete { collection, .. } = w { 196 Some(collection.as_str()) 197 } else { 198 None 199 } 200 }) 201 .collect(); 202 203 for collection in create_collections { 204 if let Err(e) = crate::auth::scope_check::check_repo_scope( 205 is_oauth, 206 scope.as_deref(), 207 crate::oauth::RepoAction::Create, 208 collection, 209 ) { 210 return e; 211 } 212 } 213 for collection in update_collections { 214 if let Err(e) = crate::auth::scope_check::check_repo_scope( 215 is_oauth, 216 scope.as_deref(), 217 crate::oauth::RepoAction::Update, 218 collection, 219 ) { 220 return e; 221 } 222 } 223 for collection in delete_collections { 224 if let Err(e) = crate::auth::scope_check::check_repo_scope( 225 is_oauth, 226 scope.as_deref(), 227 crate::oauth::RepoAction::Delete, 228 collection, 229 ) { 230 return e; 231 } 232 } 233 } 234 235 let user_id: uuid::Uuid = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did) 236 .fetch_optional(&state.db) 237 .await 238 { 239 Ok(Some(id)) => id, 240 _ => { 241 return ( 242 StatusCode::INTERNAL_SERVER_ERROR, 243 Json(json!({"error": "InternalError", "message": "User not found"})), 244 ) 245 .into_response(); 246 } 247 }; 248 let root_cid_str: String = match sqlx::query_scalar!( 249 "SELECT repo_root_cid FROM repos WHERE user_id = $1", 250 user_id 251 ) 252 .fetch_optional(&state.db) 253 .await 254 { 255 Ok(Some(cid_str)) => cid_str, 256 _ => { 257 return ( 258 StatusCode::INTERNAL_SERVER_ERROR, 259 Json(json!({"error": "InternalError", "message": "Repo root not found"})), 260 ) 261 .into_response(); 262 } 263 }; 264 let current_root_cid = match Cid::from_str(&root_cid_str) { 265 Ok(c) => c, 266 Err(_) => { 267 return ( 268 StatusCode::INTERNAL_SERVER_ERROR, 269 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})), 270 ) 271 .into_response(); 272 } 273 }; 274 if let Some(swap_commit) = &input.swap_commit 275 && Cid::from_str(swap_commit).ok() != Some(current_root_cid) 276 { 277 return ( 278 StatusCode::CONFLICT, 279 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})), 280 ) 281 .into_response(); 282 } 283 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 284 let commit_bytes = match tracking_store.get(&current_root_cid).await { 285 Ok(Some(b)) => b, 286 _ => { 287 return ( 288 StatusCode::INTERNAL_SERVER_ERROR, 289 Json(json!({"error": "InternalError", "message": "Commit block not found"})), 290 ) 291 .into_response(); 292 } 293 }; 294 let commit = match Commit::from_cbor(&commit_bytes) { 295 Ok(c) => c, 296 _ => { 297 return ( 298 StatusCode::INTERNAL_SERVER_ERROR, 299 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})), 300 ) 301 .into_response(); 302 } 303 }; 304 let original_mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 305 let mut mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 306 let mut results: Vec<WriteResult> = Vec::new(); 307 let mut ops: Vec<RecordOp> = Vec::new(); 308 let mut modified_keys: Vec<String> = Vec::new(); 309 let mut all_blob_cids: Vec<String> = Vec::new(); 310 for write in &input.writes { 311 match write { 312 WriteOp::Create { 313 collection, 314 rkey, 315 value, 316 } => { 317 let validation_status = if input.validate == Some(false) { 318 None 319 } else { 320 let require_lexicon = input.validate == Some(true); 321 match validate_record_with_status( 322 value, 323 collection, 324 rkey.as_deref(), 325 require_lexicon, 326 ) { 327 Ok(status) => Some(status), 328 Err(err_response) => return *err_response, 329 } 330 }; 331 all_blob_cids.extend(extract_blob_cids(value)); 332 let rkey = rkey 333 .clone() 334 .unwrap_or_else(|| Tid::now(LimitedU32::MIN).to_string()); 335 let mut record_bytes = Vec::new(); 336 if serde_ipld_dagcbor::to_writer(&mut record_bytes, value).is_err() { 337 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response(); 338 } 339 let record_cid = match tracking_store.put(&record_bytes).await { 340 Ok(c) => c, 341 Err(_) => return ( 342 StatusCode::INTERNAL_SERVER_ERROR, 343 Json( 344 json!({"error": "InternalError", "message": "Failed to store record"}), 345 ), 346 ) 347 .into_response(), 348 }; 349 let collection_nsid = match collection.parse::<Nsid>() { 350 Ok(n) => n, 351 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection", "message": "Invalid collection NSID"}))).into_response(), 352 }; 353 let key = format!("{}/{}", collection_nsid, rkey); 354 modified_keys.push(key.clone()); 355 mst = match mst.add(&key, record_cid).await { 356 Ok(m) => m, 357 Err(_) => return ( 358 StatusCode::INTERNAL_SERVER_ERROR, 359 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})), 360 ) 361 .into_response(), 362 }; 363 let uri = format!("at://{}/{}/{}", did, collection, rkey); 364 results.push(WriteResult::CreateResult { 365 uri, 366 cid: record_cid.to_string(), 367 validation_status: validation_status.map(|s| match s { 368 ValidationStatus::Valid => "valid".to_string(), 369 ValidationStatus::Unknown => "unknown".to_string(), 370 ValidationStatus::Invalid => "invalid".to_string(), 371 }), 372 }); 373 ops.push(RecordOp::Create { 374 collection: collection.clone(), 375 rkey, 376 cid: record_cid, 377 }); 378 } 379 WriteOp::Update { 380 collection, 381 rkey, 382 value, 383 } => { 384 let validation_status = if input.validate == Some(false) { 385 None 386 } else { 387 let require_lexicon = input.validate == Some(true); 388 match validate_record_with_status( 389 value, 390 collection, 391 Some(rkey), 392 require_lexicon, 393 ) { 394 Ok(status) => Some(status), 395 Err(err_response) => return *err_response, 396 } 397 }; 398 all_blob_cids.extend(extract_blob_cids(value)); 399 let mut record_bytes = Vec::new(); 400 if serde_ipld_dagcbor::to_writer(&mut record_bytes, value).is_err() { 401 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response(); 402 } 403 let record_cid = match tracking_store.put(&record_bytes).await { 404 Ok(c) => c, 405 Err(_) => return ( 406 StatusCode::INTERNAL_SERVER_ERROR, 407 Json( 408 json!({"error": "InternalError", "message": "Failed to store record"}), 409 ), 410 ) 411 .into_response(), 412 }; 413 let collection_nsid = match collection.parse::<Nsid>() { 414 Ok(n) => n, 415 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection", "message": "Invalid collection NSID"}))).into_response(), 416 }; 417 let key = format!("{}/{}", collection_nsid, rkey); 418 modified_keys.push(key.clone()); 419 let prev_record_cid = mst.get(&key).await.ok().flatten(); 420 mst = match mst.update(&key, record_cid).await { 421 Ok(m) => m, 422 Err(_) => return ( 423 StatusCode::INTERNAL_SERVER_ERROR, 424 Json(json!({"error": "InternalError", "message": "Failed to update MST"})), 425 ) 426 .into_response(), 427 }; 428 let uri = format!("at://{}/{}/{}", did, collection, rkey); 429 results.push(WriteResult::UpdateResult { 430 uri, 431 cid: record_cid.to_string(), 432 validation_status: validation_status.map(|s| match s { 433 ValidationStatus::Valid => "valid".to_string(), 434 ValidationStatus::Unknown => "unknown".to_string(), 435 ValidationStatus::Invalid => "invalid".to_string(), 436 }), 437 }); 438 ops.push(RecordOp::Update { 439 collection: collection.clone(), 440 rkey: rkey.clone(), 441 cid: record_cid, 442 prev: prev_record_cid, 443 }); 444 } 445 WriteOp::Delete { collection, rkey } => { 446 let collection_nsid = match collection.parse::<Nsid>() { 447 Ok(n) => n, 448 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection", "message": "Invalid collection NSID"}))).into_response(), 449 }; 450 let key = format!("{}/{}", collection_nsid, rkey); 451 modified_keys.push(key.clone()); 452 let prev_record_cid = mst.get(&key).await.ok().flatten(); 453 mst = match mst.delete(&key).await { 454 Ok(m) => m, 455 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to delete from MST"}))).into_response(), 456 }; 457 results.push(WriteResult::DeleteResult {}); 458 ops.push(RecordOp::Delete { 459 collection: collection.clone(), 460 rkey: rkey.clone(), 461 prev: prev_record_cid, 462 }); 463 } 464 } 465 } 466 let new_mst_root = match mst.persist().await { 467 Ok(c) => c, 468 Err(_) => { 469 return ( 470 StatusCode::INTERNAL_SERVER_ERROR, 471 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})), 472 ) 473 .into_response(); 474 } 475 }; 476 let mut relevant_blocks = std::collections::BTreeMap::new(); 477 for key in &modified_keys { 478 if mst 479 .blocks_for_path(key, &mut relevant_blocks) 480 .await 481 .is_err() 482 { 483 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response(); 484 } 485 if original_mst 486 .blocks_for_path(key, &mut relevant_blocks) 487 .await 488 .is_err() 489 { 490 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response(); 491 } 492 } 493 let mut written_cids = tracking_store.get_all_relevant_cids(); 494 for cid in relevant_blocks.keys() { 495 if !written_cids.contains(cid) { 496 written_cids.push(*cid); 497 } 498 } 499 let written_cids_str = written_cids 500 .iter() 501 .map(|c| c.to_string()) 502 .collect::<Vec<_>>(); 503 let commit_res = match commit_and_log( 504 &state, 505 CommitParams { 506 did: &did, 507 user_id, 508 current_root_cid: Some(current_root_cid), 509 prev_data_cid: Some(commit.data), 510 new_mst_root, 511 ops, 512 blocks_cids: &written_cids_str, 513 blobs: &all_blob_cids, 514 }, 515 ) 516 .await 517 { 518 Ok(res) => res, 519 Err(e) => { 520 error!("Commit failed: {}", e); 521 return ( 522 StatusCode::INTERNAL_SERVER_ERROR, 523 Json(json!({"error": "InternalError", "message": "Failed to commit changes"})), 524 ) 525 .into_response(); 526 } 527 }; 528 529 if let Some(ref controller) = controller_did { 530 let write_summary: Vec<serde_json::Value> = input 531 .writes 532 .iter() 533 .map(|w| match w { 534 WriteOp::Create { 535 collection, rkey, .. 536 } => json!({ 537 "action": "create", 538 "collection": collection, 539 "rkey": rkey 540 }), 541 WriteOp::Update { 542 collection, rkey, .. 543 } => json!({ 544 "action": "update", 545 "collection": collection, 546 "rkey": rkey 547 }), 548 WriteOp::Delete { collection, rkey } => json!({ 549 "action": "delete", 550 "collection": collection, 551 "rkey": rkey 552 }), 553 }) 554 .collect(); 555 556 let _ = delegation::log_delegation_action( 557 &state.db, 558 &did, 559 controller, 560 Some(controller), 561 DelegationActionType::RepoWrite, 562 Some(json!({ 563 "action": "apply_writes", 564 "count": input.writes.len(), 565 "writes": write_summary 566 })), 567 None, 568 None, 569 ) 570 .await; 571 } 572 573 ( 574 StatusCode::OK, 575 Json(ApplyWritesOutput { 576 commit: CommitInfo { 577 cid: commit_res.commit_cid.to_string(), 578 rev: commit_res.rev, 579 }, 580 results, 581 }), 582 ) 583 .into_response() 584}