this repo has no description
1use super::validation::validate_record_with_rkey; 2use super::write::has_verified_comms_channel; 3use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log, extract_blob_cids}; 4use crate::delegation::{self, DelegationActionType}; 5use crate::repo::tracking::TrackingBlockStore; 6use crate::state::AppState; 7use axum::{ 8 Json, 9 extract::State, 10 http::StatusCode, 11 response::{IntoResponse, Response}, 12}; 13use cid::Cid; 14use jacquard::types::{ 15 integer::LimitedU32, 16 string::{Nsid, Tid}, 17}; 18use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 19use serde::{Deserialize, Serialize}; 20use serde_json::json; 21use std::str::FromStr; 22use std::sync::Arc; 23use tracing::{error, info}; 24 25const MAX_BATCH_WRITES: usize = 200; 26 27#[derive(Deserialize)] 28#[serde(tag = "$type")] 29pub enum WriteOp { 30 #[serde(rename = "com.atproto.repo.applyWrites#create")] 31 Create { 32 collection: String, 33 rkey: Option<String>, 34 value: serde_json::Value, 35 }, 36 #[serde(rename = "com.atproto.repo.applyWrites#update")] 37 Update { 38 collection: String, 39 rkey: String, 40 value: serde_json::Value, 41 }, 42 #[serde(rename = "com.atproto.repo.applyWrites#delete")] 43 Delete { collection: String, rkey: String }, 44} 45 46#[derive(Deserialize)] 47#[serde(rename_all = "camelCase")] 48pub struct ApplyWritesInput { 49 pub repo: String, 50 pub validate: Option<bool>, 51 pub writes: Vec<WriteOp>, 52 pub swap_commit: Option<String>, 53} 54 55#[derive(Serialize)] 56#[serde(tag = "$type")] 57pub enum WriteResult { 58 #[serde(rename = "com.atproto.repo.applyWrites#createResult")] 59 CreateResult { uri: String, cid: String }, 60 #[serde(rename = "com.atproto.repo.applyWrites#updateResult")] 61 UpdateResult { uri: String, cid: String }, 62 #[serde(rename = "com.atproto.repo.applyWrites#deleteResult")] 63 DeleteResult {}, 64} 65 66#[derive(Serialize)] 67pub struct ApplyWritesOutput { 68 pub commit: CommitInfo, 69 pub results: Vec<WriteResult>, 70} 71 72#[derive(Serialize)] 73pub struct CommitInfo { 74 pub cid: String, 75 pub rev: String, 76} 77 78pub async fn apply_writes( 79 State(state): State<AppState>, 80 headers: axum::http::HeaderMap, 81 Json(input): Json<ApplyWritesInput>, 82) -> Response { 83 info!( 84 "apply_writes called: repo={}, writes={}", 85 input.repo, 86 input.writes.len() 87 ); 88 let token = match crate::auth::extract_bearer_token_from_header( 89 headers.get("Authorization").and_then(|h| h.to_str().ok()), 90 ) { 91 Some(t) => t, 92 None => { 93 return ( 94 StatusCode::UNAUTHORIZED, 95 Json(json!({"error": "AuthenticationRequired"})), 96 ) 97 .into_response(); 98 } 99 }; 100 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await { 101 Ok(user) => user, 102 Err(_) => { 103 return ( 104 StatusCode::UNAUTHORIZED, 105 Json(json!({"error": "AuthenticationFailed"})), 106 ) 107 .into_response(); 108 } 109 }; 110 let did = auth_user.did.clone(); 111 let is_oauth = auth_user.is_oauth; 112 let scope = auth_user.scope; 113 let controller_did = auth_user.controller_did.clone(); 114 if input.repo != did { 115 return ( 116 StatusCode::FORBIDDEN, 117 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})), 118 ) 119 .into_response(); 120 } 121 let is_verified = has_verified_comms_channel(&state.db, &did) 122 .await 123 .unwrap_or(false); 124 let is_delegated = crate::delegation::is_delegated_account(&state.db, &did) 125 .await 126 .unwrap_or(false); 127 if !is_verified && !is_delegated { 128 return ( 129 StatusCode::FORBIDDEN, 130 Json(json!({ 131 "error": "AccountNotVerified", 132 "message": "You must verify at least one notification channel (email, Discord, Telegram, or Signal) before creating records" 133 })), 134 ) 135 .into_response(); 136 } 137 if input.writes.is_empty() { 138 return ( 139 StatusCode::BAD_REQUEST, 140 Json(json!({"error": "InvalidRequest", "message": "writes array is empty"})), 141 ) 142 .into_response(); 143 } 144 if input.writes.len() > MAX_BATCH_WRITES { 145 return ( 146 StatusCode::BAD_REQUEST, 147 Json(json!({"error": "InvalidRequest", "message": format!("Too many writes (max {})", MAX_BATCH_WRITES)})), 148 ) 149 .into_response(); 150 } 151 152 let has_custom_scope = scope 153 .as_ref() 154 .map(|s| s != "com.atproto.access") 155 .unwrap_or(false); 156 if is_oauth || has_custom_scope { 157 use std::collections::HashSet; 158 let create_collections: HashSet<&str> = input 159 .writes 160 .iter() 161 .filter_map(|w| { 162 if let WriteOp::Create { collection, .. } = w { 163 Some(collection.as_str()) 164 } else { 165 None 166 } 167 }) 168 .collect(); 169 let update_collections: HashSet<&str> = input 170 .writes 171 .iter() 172 .filter_map(|w| { 173 if let WriteOp::Update { collection, .. } = w { 174 Some(collection.as_str()) 175 } else { 176 None 177 } 178 }) 179 .collect(); 180 let delete_collections: HashSet<&str> = input 181 .writes 182 .iter() 183 .filter_map(|w| { 184 if let WriteOp::Delete { collection, .. } = w { 185 Some(collection.as_str()) 186 } else { 187 None 188 } 189 }) 190 .collect(); 191 192 for collection in create_collections { 193 if let Err(e) = crate::auth::scope_check::check_repo_scope( 194 is_oauth, 195 scope.as_deref(), 196 crate::oauth::RepoAction::Create, 197 collection, 198 ) { 199 return e; 200 } 201 } 202 for collection in update_collections { 203 if let Err(e) = crate::auth::scope_check::check_repo_scope( 204 is_oauth, 205 scope.as_deref(), 206 crate::oauth::RepoAction::Update, 207 collection, 208 ) { 209 return e; 210 } 211 } 212 for collection in delete_collections { 213 if let Err(e) = crate::auth::scope_check::check_repo_scope( 214 is_oauth, 215 scope.as_deref(), 216 crate::oauth::RepoAction::Delete, 217 collection, 218 ) { 219 return e; 220 } 221 } 222 } 223 224 let user_id: uuid::Uuid = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did) 225 .fetch_optional(&state.db) 226 .await 227 { 228 Ok(Some(id)) => id, 229 _ => { 230 return ( 231 StatusCode::INTERNAL_SERVER_ERROR, 232 Json(json!({"error": "InternalError", "message": "User not found"})), 233 ) 234 .into_response(); 235 } 236 }; 237 let root_cid_str: String = match sqlx::query_scalar!( 238 "SELECT repo_root_cid FROM repos WHERE user_id = $1", 239 user_id 240 ) 241 .fetch_optional(&state.db) 242 .await 243 { 244 Ok(Some(cid_str)) => cid_str, 245 _ => { 246 return ( 247 StatusCode::INTERNAL_SERVER_ERROR, 248 Json(json!({"error": "InternalError", "message": "Repo root not found"})), 249 ) 250 .into_response(); 251 } 252 }; 253 let current_root_cid = match Cid::from_str(&root_cid_str) { 254 Ok(c) => c, 255 Err(_) => { 256 return ( 257 StatusCode::INTERNAL_SERVER_ERROR, 258 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})), 259 ) 260 .into_response(); 261 } 262 }; 263 if let Some(swap_commit) = &input.swap_commit 264 && Cid::from_str(swap_commit).ok() != Some(current_root_cid) 265 { 266 return ( 267 StatusCode::CONFLICT, 268 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})), 269 ) 270 .into_response(); 271 } 272 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 273 let commit_bytes = match tracking_store.get(&current_root_cid).await { 274 Ok(Some(b)) => b, 275 _ => { 276 return ( 277 StatusCode::INTERNAL_SERVER_ERROR, 278 Json(json!({"error": "InternalError", "message": "Commit block not found"})), 279 ) 280 .into_response(); 281 } 282 }; 283 let commit = match Commit::from_cbor(&commit_bytes) { 284 Ok(c) => c, 285 _ => { 286 return ( 287 StatusCode::INTERNAL_SERVER_ERROR, 288 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})), 289 ) 290 .into_response(); 291 } 292 }; 293 let original_mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 294 let mut mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 295 let mut results: Vec<WriteResult> = Vec::new(); 296 let mut ops: Vec<RecordOp> = Vec::new(); 297 let mut modified_keys: Vec<String> = Vec::new(); 298 let mut all_blob_cids: Vec<String> = Vec::new(); 299 for write in &input.writes { 300 match write { 301 WriteOp::Create { 302 collection, 303 rkey, 304 value, 305 } => { 306 if input.validate.unwrap_or(true) 307 && let Err(err_response) = 308 validate_record_with_rkey(value, collection, rkey.as_deref()) 309 { 310 return *err_response; 311 } 312 all_blob_cids.extend(extract_blob_cids(value)); 313 let rkey = rkey 314 .clone() 315 .unwrap_or_else(|| Tid::now(LimitedU32::MIN).to_string()); 316 let mut record_bytes = Vec::new(); 317 if serde_ipld_dagcbor::to_writer(&mut record_bytes, value).is_err() { 318 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response(); 319 } 320 let record_cid = match tracking_store.put(&record_bytes).await { 321 Ok(c) => c, 322 Err(_) => return ( 323 StatusCode::INTERNAL_SERVER_ERROR, 324 Json( 325 json!({"error": "InternalError", "message": "Failed to store record"}), 326 ), 327 ) 328 .into_response(), 329 }; 330 let collection_nsid = match collection.parse::<Nsid>() { 331 Ok(n) => n, 332 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection", "message": "Invalid collection NSID"}))).into_response(), 333 }; 334 let key = format!("{}/{}", collection_nsid, rkey); 335 modified_keys.push(key.clone()); 336 mst = match mst.add(&key, record_cid).await { 337 Ok(m) => m, 338 Err(_) => return ( 339 StatusCode::INTERNAL_SERVER_ERROR, 340 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})), 341 ) 342 .into_response(), 343 }; 344 let uri = format!("at://{}/{}/{}", did, collection, rkey); 345 results.push(WriteResult::CreateResult { 346 uri, 347 cid: record_cid.to_string(), 348 }); 349 ops.push(RecordOp::Create { 350 collection: collection.clone(), 351 rkey, 352 cid: record_cid, 353 }); 354 } 355 WriteOp::Update { 356 collection, 357 rkey, 358 value, 359 } => { 360 if input.validate.unwrap_or(true) 361 && let Err(err_response) = 362 validate_record_with_rkey(value, collection, Some(rkey)) 363 { 364 return *err_response; 365 } 366 all_blob_cids.extend(extract_blob_cids(value)); 367 let mut record_bytes = Vec::new(); 368 if serde_ipld_dagcbor::to_writer(&mut record_bytes, value).is_err() { 369 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response(); 370 } 371 let record_cid = match tracking_store.put(&record_bytes).await { 372 Ok(c) => c, 373 Err(_) => return ( 374 StatusCode::INTERNAL_SERVER_ERROR, 375 Json( 376 json!({"error": "InternalError", "message": "Failed to store record"}), 377 ), 378 ) 379 .into_response(), 380 }; 381 let collection_nsid = match collection.parse::<Nsid>() { 382 Ok(n) => n, 383 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection", "message": "Invalid collection NSID"}))).into_response(), 384 }; 385 let key = format!("{}/{}", collection_nsid, rkey); 386 modified_keys.push(key.clone()); 387 let prev_record_cid = mst.get(&key).await.ok().flatten(); 388 mst = match mst.update(&key, record_cid).await { 389 Ok(m) => m, 390 Err(_) => return ( 391 StatusCode::INTERNAL_SERVER_ERROR, 392 Json(json!({"error": "InternalError", "message": "Failed to update MST"})), 393 ) 394 .into_response(), 395 }; 396 let uri = format!("at://{}/{}/{}", did, collection, rkey); 397 results.push(WriteResult::UpdateResult { 398 uri, 399 cid: record_cid.to_string(), 400 }); 401 ops.push(RecordOp::Update { 402 collection: collection.clone(), 403 rkey: rkey.clone(), 404 cid: record_cid, 405 prev: prev_record_cid, 406 }); 407 } 408 WriteOp::Delete { collection, rkey } => { 409 let collection_nsid = match collection.parse::<Nsid>() { 410 Ok(n) => n, 411 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection", "message": "Invalid collection NSID"}))).into_response(), 412 }; 413 let key = format!("{}/{}", collection_nsid, rkey); 414 modified_keys.push(key.clone()); 415 let prev_record_cid = mst.get(&key).await.ok().flatten(); 416 mst = match mst.delete(&key).await { 417 Ok(m) => m, 418 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to delete from MST"}))).into_response(), 419 }; 420 results.push(WriteResult::DeleteResult {}); 421 ops.push(RecordOp::Delete { 422 collection: collection.clone(), 423 rkey: rkey.clone(), 424 prev: prev_record_cid, 425 }); 426 } 427 } 428 } 429 let new_mst_root = match mst.persist().await { 430 Ok(c) => c, 431 Err(_) => { 432 return ( 433 StatusCode::INTERNAL_SERVER_ERROR, 434 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})), 435 ) 436 .into_response(); 437 } 438 }; 439 let mut relevant_blocks = std::collections::BTreeMap::new(); 440 for key in &modified_keys { 441 if mst 442 .blocks_for_path(key, &mut relevant_blocks) 443 .await 444 .is_err() 445 { 446 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response(); 447 } 448 if original_mst 449 .blocks_for_path(key, &mut relevant_blocks) 450 .await 451 .is_err() 452 { 453 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response(); 454 } 455 } 456 let mut written_cids = tracking_store.get_all_relevant_cids(); 457 for cid in relevant_blocks.keys() { 458 if !written_cids.contains(cid) { 459 written_cids.push(*cid); 460 } 461 } 462 let written_cids_str = written_cids 463 .iter() 464 .map(|c| c.to_string()) 465 .collect::<Vec<_>>(); 466 let commit_res = match commit_and_log( 467 &state, 468 CommitParams { 469 did: &did, 470 user_id, 471 current_root_cid: Some(current_root_cid), 472 prev_data_cid: Some(commit.data), 473 new_mst_root, 474 ops, 475 blocks_cids: &written_cids_str, 476 blobs: &all_blob_cids, 477 }, 478 ) 479 .await 480 { 481 Ok(res) => res, 482 Err(e) => { 483 error!("Commit failed: {}", e); 484 return ( 485 StatusCode::INTERNAL_SERVER_ERROR, 486 Json(json!({"error": "InternalError", "message": "Failed to commit changes"})), 487 ) 488 .into_response(); 489 } 490 }; 491 492 if let Some(ref controller) = controller_did { 493 let write_summary: Vec<serde_json::Value> = input 494 .writes 495 .iter() 496 .map(|w| match w { 497 WriteOp::Create { 498 collection, rkey, .. 499 } => json!({ 500 "action": "create", 501 "collection": collection, 502 "rkey": rkey 503 }), 504 WriteOp::Update { 505 collection, rkey, .. 506 } => json!({ 507 "action": "update", 508 "collection": collection, 509 "rkey": rkey 510 }), 511 WriteOp::Delete { collection, rkey } => json!({ 512 "action": "delete", 513 "collection": collection, 514 "rkey": rkey 515 }), 516 }) 517 .collect(); 518 519 let _ = delegation::log_delegation_action( 520 &state.db, 521 &did, 522 controller, 523 Some(controller), 524 DelegationActionType::RepoWrite, 525 Some(json!({ 526 "action": "apply_writes", 527 "count": input.writes.len(), 528 "writes": write_summary 529 })), 530 None, 531 None, 532 ) 533 .await; 534 } 535 536 ( 537 StatusCode::OK, 538 Json(ApplyWritesOutput { 539 commit: CommitInfo { 540 cid: commit_res.commit_cid.to_string(), 541 rev: commit_res.rev, 542 }, 543 results, 544 }), 545 ) 546 .into_response() 547}