this repo has no description
1use super::validation::validate_record; 2use super::write::has_verified_comms_channel; 3use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log, extract_blob_cids}; 4use crate::delegation::{self, DelegationActionType}; 5use crate::repo::tracking::TrackingBlockStore; 6use crate::state::AppState; 7use axum::{ 8 Json, 9 extract::State, 10 http::StatusCode, 11 response::{IntoResponse, Response}, 12}; 13use cid::Cid; 14use jacquard::types::{ 15 integer::LimitedU32, 16 string::{Nsid, Tid}, 17}; 18use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 19use serde::{Deserialize, Serialize}; 20use serde_json::json; 21use std::str::FromStr; 22use std::sync::Arc; 23use tracing::{error, info}; 24 25const MAX_BATCH_WRITES: usize = 200; 26 27#[derive(Deserialize)] 28#[serde(tag = "$type")] 29pub enum WriteOp { 30 #[serde(rename = "com.atproto.repo.applyWrites#create")] 31 Create { 32 collection: String, 33 rkey: Option<String>, 34 value: serde_json::Value, 35 }, 36 #[serde(rename = "com.atproto.repo.applyWrites#update")] 37 Update { 38 collection: String, 39 rkey: String, 40 value: serde_json::Value, 41 }, 42 #[serde(rename = "com.atproto.repo.applyWrites#delete")] 43 Delete { collection: String, rkey: String }, 44} 45 46#[derive(Deserialize)] 47#[serde(rename_all = "camelCase")] 48pub struct ApplyWritesInput { 49 pub repo: String, 50 pub validate: Option<bool>, 51 pub writes: Vec<WriteOp>, 52 pub swap_commit: Option<String>, 53} 54 55#[derive(Serialize)] 56#[serde(tag = "$type")] 57pub enum WriteResult { 58 #[serde(rename = "com.atproto.repo.applyWrites#createResult")] 59 CreateResult { uri: String, cid: String }, 60 #[serde(rename = "com.atproto.repo.applyWrites#updateResult")] 61 UpdateResult { uri: String, cid: String }, 62 #[serde(rename = "com.atproto.repo.applyWrites#deleteResult")] 63 DeleteResult {}, 64} 65 66#[derive(Serialize)] 67pub struct ApplyWritesOutput { 68 pub commit: CommitInfo, 69 pub results: Vec<WriteResult>, 70} 71 72#[derive(Serialize)] 73pub struct CommitInfo { 74 pub cid: String, 75 pub rev: String, 76} 77 78pub async fn apply_writes( 79 State(state): State<AppState>, 80 headers: axum::http::HeaderMap, 81 Json(input): Json<ApplyWritesInput>, 82) -> Response { 83 info!( 84 "apply_writes called: repo={}, writes={}", 85 input.repo, 86 input.writes.len() 87 ); 88 let token = match crate::auth::extract_bearer_token_from_header( 89 headers.get("Authorization").and_then(|h| h.to_str().ok()), 90 ) { 91 Some(t) => t, 92 None => { 93 return ( 94 StatusCode::UNAUTHORIZED, 95 Json(json!({"error": "AuthenticationRequired"})), 96 ) 97 .into_response(); 98 } 99 }; 100 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await { 101 Ok(user) => user, 102 Err(_) => { 103 return ( 104 StatusCode::UNAUTHORIZED, 105 Json(json!({"error": "AuthenticationFailed"})), 106 ) 107 .into_response(); 108 } 109 }; 110 let did = auth_user.did.clone(); 111 let is_oauth = auth_user.is_oauth; 112 let scope = auth_user.scope; 113 let controller_did = auth_user.controller_did.clone(); 114 if input.repo != did { 115 return ( 116 StatusCode::FORBIDDEN, 117 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})), 118 ) 119 .into_response(); 120 } 121 let is_verified = has_verified_comms_channel(&state.db, &did) 122 .await 123 .unwrap_or(false); 124 let is_delegated = crate::delegation::is_delegated_account(&state.db, &did) 125 .await 126 .unwrap_or(false); 127 if !is_verified && !is_delegated { 128 return ( 129 StatusCode::FORBIDDEN, 130 Json(json!({ 131 "error": "AccountNotVerified", 132 "message": "You must verify at least one notification channel (email, Discord, Telegram, or Signal) before creating records" 133 })), 134 ) 135 .into_response(); 136 } 137 if input.writes.is_empty() { 138 return ( 139 StatusCode::BAD_REQUEST, 140 Json(json!({"error": "InvalidRequest", "message": "writes array is empty"})), 141 ) 142 .into_response(); 143 } 144 if input.writes.len() > MAX_BATCH_WRITES { 145 return ( 146 StatusCode::BAD_REQUEST, 147 Json(json!({"error": "InvalidRequest", "message": format!("Too many writes (max {})", MAX_BATCH_WRITES)})), 148 ) 149 .into_response(); 150 } 151 152 let has_custom_scope = scope 153 .as_ref() 154 .map(|s| s != "com.atproto.access") 155 .unwrap_or(false); 156 if is_oauth || has_custom_scope { 157 use std::collections::HashSet; 158 let create_collections: HashSet<&str> = input 159 .writes 160 .iter() 161 .filter_map(|w| { 162 if let WriteOp::Create { collection, .. } = w { 163 Some(collection.as_str()) 164 } else { 165 None 166 } 167 }) 168 .collect(); 169 let update_collections: HashSet<&str> = input 170 .writes 171 .iter() 172 .filter_map(|w| { 173 if let WriteOp::Update { collection, .. } = w { 174 Some(collection.as_str()) 175 } else { 176 None 177 } 178 }) 179 .collect(); 180 let delete_collections: HashSet<&str> = input 181 .writes 182 .iter() 183 .filter_map(|w| { 184 if let WriteOp::Delete { collection, .. } = w { 185 Some(collection.as_str()) 186 } else { 187 None 188 } 189 }) 190 .collect(); 191 192 for collection in create_collections { 193 if let Err(e) = crate::auth::scope_check::check_repo_scope( 194 is_oauth, 195 scope.as_deref(), 196 crate::oauth::RepoAction::Create, 197 collection, 198 ) { 199 return e; 200 } 201 } 202 for collection in update_collections { 203 if let Err(e) = crate::auth::scope_check::check_repo_scope( 204 is_oauth, 205 scope.as_deref(), 206 crate::oauth::RepoAction::Update, 207 collection, 208 ) { 209 return e; 210 } 211 } 212 for collection in delete_collections { 213 if let Err(e) = crate::auth::scope_check::check_repo_scope( 214 is_oauth, 215 scope.as_deref(), 216 crate::oauth::RepoAction::Delete, 217 collection, 218 ) { 219 return e; 220 } 221 } 222 } 223 224 let user_id: uuid::Uuid = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did) 225 .fetch_optional(&state.db) 226 .await 227 { 228 Ok(Some(id)) => id, 229 _ => { 230 return ( 231 StatusCode::INTERNAL_SERVER_ERROR, 232 Json(json!({"error": "InternalError", "message": "User not found"})), 233 ) 234 .into_response(); 235 } 236 }; 237 let root_cid_str: String = match sqlx::query_scalar!( 238 "SELECT repo_root_cid FROM repos WHERE user_id = $1", 239 user_id 240 ) 241 .fetch_optional(&state.db) 242 .await 243 { 244 Ok(Some(cid_str)) => cid_str, 245 _ => { 246 return ( 247 StatusCode::INTERNAL_SERVER_ERROR, 248 Json(json!({"error": "InternalError", "message": "Repo root not found"})), 249 ) 250 .into_response(); 251 } 252 }; 253 let current_root_cid = match Cid::from_str(&root_cid_str) { 254 Ok(c) => c, 255 Err(_) => { 256 return ( 257 StatusCode::INTERNAL_SERVER_ERROR, 258 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})), 259 ) 260 .into_response(); 261 } 262 }; 263 if let Some(swap_commit) = &input.swap_commit 264 && Cid::from_str(swap_commit).ok() != Some(current_root_cid) 265 { 266 return ( 267 StatusCode::CONFLICT, 268 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})), 269 ) 270 .into_response(); 271 } 272 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 273 let commit_bytes = match tracking_store.get(&current_root_cid).await { 274 Ok(Some(b)) => b, 275 _ => { 276 return ( 277 StatusCode::INTERNAL_SERVER_ERROR, 278 Json(json!({"error": "InternalError", "message": "Commit block not found"})), 279 ) 280 .into_response(); 281 } 282 }; 283 let commit = match Commit::from_cbor(&commit_bytes) { 284 Ok(c) => c, 285 _ => { 286 return ( 287 StatusCode::INTERNAL_SERVER_ERROR, 288 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})), 289 ) 290 .into_response(); 291 } 292 }; 293 let original_mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 294 let mut mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 295 let mut results: Vec<WriteResult> = Vec::new(); 296 let mut ops: Vec<RecordOp> = Vec::new(); 297 let mut modified_keys: Vec<String> = Vec::new(); 298 let mut all_blob_cids: Vec<String> = Vec::new(); 299 for write in &input.writes { 300 match write { 301 WriteOp::Create { 302 collection, 303 rkey, 304 value, 305 } => { 306 if input.validate.unwrap_or(true) 307 && let Err(err_response) = validate_record(value, collection) 308 { 309 return *err_response; 310 } 311 all_blob_cids.extend(extract_blob_cids(value)); 312 let rkey = rkey 313 .clone() 314 .unwrap_or_else(|| Tid::now(LimitedU32::MIN).to_string()); 315 let mut record_bytes = Vec::new(); 316 if serde_ipld_dagcbor::to_writer(&mut record_bytes, value).is_err() { 317 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response(); 318 } 319 let record_cid = match tracking_store.put(&record_bytes).await { 320 Ok(c) => c, 321 Err(_) => return ( 322 StatusCode::INTERNAL_SERVER_ERROR, 323 Json( 324 json!({"error": "InternalError", "message": "Failed to store record"}), 325 ), 326 ) 327 .into_response(), 328 }; 329 let collection_nsid = match collection.parse::<Nsid>() { 330 Ok(n) => n, 331 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection", "message": "Invalid collection NSID"}))).into_response(), 332 }; 333 let key = format!("{}/{}", collection_nsid, rkey); 334 modified_keys.push(key.clone()); 335 mst = match mst.add(&key, record_cid).await { 336 Ok(m) => m, 337 Err(_) => return ( 338 StatusCode::INTERNAL_SERVER_ERROR, 339 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})), 340 ) 341 .into_response(), 342 }; 343 let uri = format!("at://{}/{}/{}", did, collection, rkey); 344 results.push(WriteResult::CreateResult { 345 uri, 346 cid: record_cid.to_string(), 347 }); 348 ops.push(RecordOp::Create { 349 collection: collection.clone(), 350 rkey, 351 cid: record_cid, 352 }); 353 } 354 WriteOp::Update { 355 collection, 356 rkey, 357 value, 358 } => { 359 if input.validate.unwrap_or(true) 360 && let Err(err_response) = validate_record(value, collection) 361 { 362 return *err_response; 363 } 364 all_blob_cids.extend(extract_blob_cids(value)); 365 let mut record_bytes = Vec::new(); 366 if serde_ipld_dagcbor::to_writer(&mut record_bytes, value).is_err() { 367 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response(); 368 } 369 let record_cid = match tracking_store.put(&record_bytes).await { 370 Ok(c) => c, 371 Err(_) => return ( 372 StatusCode::INTERNAL_SERVER_ERROR, 373 Json( 374 json!({"error": "InternalError", "message": "Failed to store record"}), 375 ), 376 ) 377 .into_response(), 378 }; 379 let collection_nsid = match collection.parse::<Nsid>() { 380 Ok(n) => n, 381 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection", "message": "Invalid collection NSID"}))).into_response(), 382 }; 383 let key = format!("{}/{}", collection_nsid, rkey); 384 modified_keys.push(key.clone()); 385 let prev_record_cid = mst.get(&key).await.ok().flatten(); 386 mst = match mst.update(&key, record_cid).await { 387 Ok(m) => m, 388 Err(_) => return ( 389 StatusCode::INTERNAL_SERVER_ERROR, 390 Json(json!({"error": "InternalError", "message": "Failed to update MST"})), 391 ) 392 .into_response(), 393 }; 394 let uri = format!("at://{}/{}/{}", did, collection, rkey); 395 results.push(WriteResult::UpdateResult { 396 uri, 397 cid: record_cid.to_string(), 398 }); 399 ops.push(RecordOp::Update { 400 collection: collection.clone(), 401 rkey: rkey.clone(), 402 cid: record_cid, 403 prev: prev_record_cid, 404 }); 405 } 406 WriteOp::Delete { collection, rkey } => { 407 let collection_nsid = match collection.parse::<Nsid>() { 408 Ok(n) => n, 409 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection", "message": "Invalid collection NSID"}))).into_response(), 410 }; 411 let key = format!("{}/{}", collection_nsid, rkey); 412 modified_keys.push(key.clone()); 413 let prev_record_cid = mst.get(&key).await.ok().flatten(); 414 mst = match mst.delete(&key).await { 415 Ok(m) => m, 416 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to delete from MST"}))).into_response(), 417 }; 418 results.push(WriteResult::DeleteResult {}); 419 ops.push(RecordOp::Delete { 420 collection: collection.clone(), 421 rkey: rkey.clone(), 422 prev: prev_record_cid, 423 }); 424 } 425 } 426 } 427 let new_mst_root = match mst.persist().await { 428 Ok(c) => c, 429 Err(_) => { 430 return ( 431 StatusCode::INTERNAL_SERVER_ERROR, 432 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})), 433 ) 434 .into_response(); 435 } 436 }; 437 let mut relevant_blocks = std::collections::BTreeMap::new(); 438 for key in &modified_keys { 439 if mst 440 .blocks_for_path(key, &mut relevant_blocks) 441 .await 442 .is_err() 443 { 444 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response(); 445 } 446 if original_mst 447 .blocks_for_path(key, &mut relevant_blocks) 448 .await 449 .is_err() 450 { 451 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response(); 452 } 453 } 454 let mut written_cids = tracking_store.get_all_relevant_cids(); 455 for cid in relevant_blocks.keys() { 456 if !written_cids.contains(cid) { 457 written_cids.push(*cid); 458 } 459 } 460 let written_cids_str = written_cids 461 .iter() 462 .map(|c| c.to_string()) 463 .collect::<Vec<_>>(); 464 let commit_res = match commit_and_log( 465 &state, 466 CommitParams { 467 did: &did, 468 user_id, 469 current_root_cid: Some(current_root_cid), 470 prev_data_cid: Some(commit.data), 471 new_mst_root, 472 ops, 473 blocks_cids: &written_cids_str, 474 blobs: &all_blob_cids, 475 }, 476 ) 477 .await 478 { 479 Ok(res) => res, 480 Err(e) => { 481 error!("Commit failed: {}", e); 482 return ( 483 StatusCode::INTERNAL_SERVER_ERROR, 484 Json(json!({"error": "InternalError", "message": "Failed to commit changes"})), 485 ) 486 .into_response(); 487 } 488 }; 489 490 if let Some(ref controller) = controller_did { 491 let write_summary: Vec<serde_json::Value> = input 492 .writes 493 .iter() 494 .map(|w| match w { 495 WriteOp::Create { 496 collection, rkey, .. 497 } => json!({ 498 "action": "create", 499 "collection": collection, 500 "rkey": rkey 501 }), 502 WriteOp::Update { 503 collection, rkey, .. 504 } => json!({ 505 "action": "update", 506 "collection": collection, 507 "rkey": rkey 508 }), 509 WriteOp::Delete { collection, rkey } => json!({ 510 "action": "delete", 511 "collection": collection, 512 "rkey": rkey 513 }), 514 }) 515 .collect(); 516 517 let _ = delegation::log_delegation_action( 518 &state.db, 519 &did, 520 controller, 521 Some(controller), 522 DelegationActionType::RepoWrite, 523 Some(json!({ 524 "action": "apply_writes", 525 "count": input.writes.len(), 526 "writes": write_summary 527 })), 528 None, 529 None, 530 ) 531 .await; 532 } 533 534 ( 535 StatusCode::OK, 536 Json(ApplyWritesOutput { 537 commit: CommitInfo { 538 cid: commit_res.commit_cid.to_string(), 539 rev: commit_res.rev, 540 }, 541 results, 542 }), 543 ) 544 .into_response() 545}