this repo has no description
use super::validation::validate_record;
use super::write::has_verified_comms_channel;
use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log};
use crate::delegation::{self, DelegationActionType};
use crate::repo::tracking::TrackingBlockStore;
use crate::state::AppState;
use axum::{
    Json,
    extract::State,
    http::StatusCode,
    response::{IntoResponse, Response},
};
use cid::Cid;
use jacquard::types::{
    integer::LimitedU32,
    string::{Nsid, Tid},
};
use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::str::FromStr;
use std::sync::Arc;
use tracing::{error, info};

/// Upper bound on the number of operations accepted in a single `applyWrites`
/// request; `apply_writes` rejects longer batches with `400 InvalidRequest`.
const MAX_BATCH_WRITES: usize = 200;

/// One operation inside an `applyWrites` batch.
///
/// Deserialized as an internally tagged enum: the JSON `$type` field selects
/// the variant.
#[derive(Deserialize)]
#[serde(tag = "$type")]
pub enum WriteOp {
    /// Create a new record in `collection`.
    #[serde(rename = "com.atproto.repo.applyWrites#create")]
    Create {
        /// Collection the record is created in; `apply_writes` parses it as an NSID.
        collection: String,
        /// Record key. When absent, `apply_writes` generates one from `Tid::now`.
        rkey: Option<String>,
        /// Record body; serialized to DAG-CBOR before being stored.
        value: serde_json::Value,
    },
    /// Replace the record stored at `collection`/`rkey`.
    #[serde(rename = "com.atproto.repo.applyWrites#update")]
    Update {
        /// Collection of the record being replaced; parsed as an NSID.
        collection: String,
        /// Key of the record being replaced.
        rkey: String,
        /// New record body; serialized to DAG-CBOR before being stored.
        value: serde_json::Value,
    },
    /// Remove the record stored at `collection`/`rkey`.
    #[serde(rename = "com.atproto.repo.applyWrites#delete")]
    Delete { collection: String, rkey: String },
}

/// Request body of `apply_writes`. Field names are camelCase on the wire
/// (e.g. `swapCommit`).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ApplyWritesInput {
    /// Target repo; `apply_writes` requires it to equal the authenticated DID.
    pub repo: String,
    /// Whether to run record validation on creates/updates; treated as `true`
    /// when omitted.
    pub validate: Option<bool>,
    /// Operations to apply, in order. Must be non-empty and contain at most
    /// `MAX_BATCH_WRITES` entries.
    pub writes: Vec<WriteOp>,
    /// Optional compare-and-swap guard: when present it must parse to the
    /// repo's current root CID, otherwise the request fails with `InvalidSwap`.
    pub swap_commit: Option<String>,
}

/// Per-operation result returned by `apply_writes`, tagged with `$type` on
/// the wire.
#[derive(Serialize)]
#[serde(tag = "$type")]
pub enum WriteResult {
    /// Result of a create: the record's `at://` URI and its CID.
    #[serde(rename = "com.atproto.repo.applyWrites#createResult")]
    CreateResult { uri: String, cid: String },
    /// Result of an update: the record's `at://` URI and its new CID.
    #[serde(rename = "com.atproto.repo.applyWrites#updateResult")]
    UpdateResult { uri: String, cid: String },
    /// Result of a delete; carries no fields.
    #[serde(rename = "com.atproto.repo.applyWrites#deleteResult")]
    DeleteResult {},
}

/// Successful response body of `apply_writes`.
#[derive(Serialize)]
pub struct ApplyWritesOutput {
    /// The commit produced by the batch.
    pub commit: CommitInfo,
    /// One entry per input write, in the same order as the request.
    pub results: Vec<WriteResult>,
}

#[derive(Serialize)]
73pub struct CommitInfo { 74 pub cid: String, 75 pub rev: String, 76} 77 78pub async fn apply_writes( 79 State(state): State<AppState>, 80 headers: axum::http::HeaderMap, 81 Json(input): Json<ApplyWritesInput>, 82) -> Response { 83 info!( 84 "apply_writes called: repo={}, writes={}", 85 input.repo, 86 input.writes.len() 87 ); 88 let token = match crate::auth::extract_bearer_token_from_header( 89 headers.get("Authorization").and_then(|h| h.to_str().ok()), 90 ) { 91 Some(t) => t, 92 None => { 93 return ( 94 StatusCode::UNAUTHORIZED, 95 Json(json!({"error": "AuthenticationRequired"})), 96 ) 97 .into_response(); 98 } 99 }; 100 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await { 101 Ok(user) => user, 102 Err(_) => { 103 return ( 104 StatusCode::UNAUTHORIZED, 105 Json(json!({"error": "AuthenticationFailed"})), 106 ) 107 .into_response(); 108 } 109 }; 110 let did = auth_user.did.clone(); 111 let is_oauth = auth_user.is_oauth; 112 let scope = auth_user.scope; 113 let controller_did = auth_user.controller_did.clone(); 114 if input.repo != did { 115 return ( 116 StatusCode::FORBIDDEN, 117 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})), 118 ) 119 .into_response(); 120 } 121 let is_verified = has_verified_comms_channel(&state.db, &did) 122 .await 123 .unwrap_or(false); 124 let is_delegated = crate::delegation::is_delegated_account(&state.db, &did) 125 .await 126 .unwrap_or(false); 127 if !is_verified && !is_delegated { 128 return ( 129 StatusCode::FORBIDDEN, 130 Json(json!({ 131 "error": "AccountNotVerified", 132 "message": "You must verify at least one notification channel (email, Discord, Telegram, or Signal) before creating records" 133 })), 134 ) 135 .into_response(); 136 } 137 if input.writes.is_empty() { 138 return ( 139 StatusCode::BAD_REQUEST, 140 Json(json!({"error": "InvalidRequest", "message": "writes array is empty"})), 141 ) 142 .into_response(); 143 } 144 if input.writes.len() > 
MAX_BATCH_WRITES { 145 return ( 146 StatusCode::BAD_REQUEST, 147 Json(json!({"error": "InvalidRequest", "message": format!("Too many writes (max {})", MAX_BATCH_WRITES)})), 148 ) 149 .into_response(); 150 } 151 152 let has_custom_scope = scope 153 .as_ref() 154 .map(|s| s != "com.atproto.access") 155 .unwrap_or(false); 156 if is_oauth || has_custom_scope { 157 use std::collections::HashSet; 158 let create_collections: HashSet<&str> = input 159 .writes 160 .iter() 161 .filter_map(|w| { 162 if let WriteOp::Create { collection, .. } = w { 163 Some(collection.as_str()) 164 } else { 165 None 166 } 167 }) 168 .collect(); 169 let update_collections: HashSet<&str> = input 170 .writes 171 .iter() 172 .filter_map(|w| { 173 if let WriteOp::Update { collection, .. } = w { 174 Some(collection.as_str()) 175 } else { 176 None 177 } 178 }) 179 .collect(); 180 let delete_collections: HashSet<&str> = input 181 .writes 182 .iter() 183 .filter_map(|w| { 184 if let WriteOp::Delete { collection, .. } = w { 185 Some(collection.as_str()) 186 } else { 187 None 188 } 189 }) 190 .collect(); 191 192 for collection in create_collections { 193 if let Err(e) = crate::auth::scope_check::check_repo_scope( 194 is_oauth, 195 scope.as_deref(), 196 crate::oauth::RepoAction::Create, 197 collection, 198 ) { 199 return e; 200 } 201 } 202 for collection in update_collections { 203 if let Err(e) = crate::auth::scope_check::check_repo_scope( 204 is_oauth, 205 scope.as_deref(), 206 crate::oauth::RepoAction::Update, 207 collection, 208 ) { 209 return e; 210 } 211 } 212 for collection in delete_collections { 213 if let Err(e) = crate::auth::scope_check::check_repo_scope( 214 is_oauth, 215 scope.as_deref(), 216 crate::oauth::RepoAction::Delete, 217 collection, 218 ) { 219 return e; 220 } 221 } 222 } 223 224 let user_id: uuid::Uuid = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did) 225 .fetch_optional(&state.db) 226 .await 227 { 228 Ok(Some(id)) => id, 229 _ => { 230 return ( 231 
StatusCode::INTERNAL_SERVER_ERROR, 232 Json(json!({"error": "InternalError", "message": "User not found"})), 233 ) 234 .into_response(); 235 } 236 }; 237 let root_cid_str: String = match sqlx::query_scalar!( 238 "SELECT repo_root_cid FROM repos WHERE user_id = $1", 239 user_id 240 ) 241 .fetch_optional(&state.db) 242 .await 243 { 244 Ok(Some(cid_str)) => cid_str, 245 _ => { 246 return ( 247 StatusCode::INTERNAL_SERVER_ERROR, 248 Json(json!({"error": "InternalError", "message": "Repo root not found"})), 249 ) 250 .into_response(); 251 } 252 }; 253 let current_root_cid = match Cid::from_str(&root_cid_str) { 254 Ok(c) => c, 255 Err(_) => { 256 return ( 257 StatusCode::INTERNAL_SERVER_ERROR, 258 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})), 259 ) 260 .into_response(); 261 } 262 }; 263 if let Some(swap_commit) = &input.swap_commit 264 && Cid::from_str(swap_commit).ok() != Some(current_root_cid) 265 { 266 return ( 267 StatusCode::CONFLICT, 268 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})), 269 ) 270 .into_response(); 271 } 272 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 273 let commit_bytes = match tracking_store.get(&current_root_cid).await { 274 Ok(Some(b)) => b, 275 _ => { 276 return ( 277 StatusCode::INTERNAL_SERVER_ERROR, 278 Json(json!({"error": "InternalError", "message": "Commit block not found"})), 279 ) 280 .into_response(); 281 } 282 }; 283 let commit = match Commit::from_cbor(&commit_bytes) { 284 Ok(c) => c, 285 _ => { 286 return ( 287 StatusCode::INTERNAL_SERVER_ERROR, 288 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})), 289 ) 290 .into_response(); 291 } 292 }; 293 let original_mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 294 let mut mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 295 let mut results: Vec<WriteResult> = Vec::new(); 296 let mut ops: Vec<RecordOp> = Vec::new(); 297 let mut 
modified_keys: Vec<String> = Vec::new(); 298 for write in &input.writes { 299 match write { 300 WriteOp::Create { 301 collection, 302 rkey, 303 value, 304 } => { 305 if input.validate.unwrap_or(true) 306 && let Err(err_response) = validate_record(value, collection) 307 { 308 return *err_response; 309 } 310 let rkey = rkey 311 .clone() 312 .unwrap_or_else(|| Tid::now(LimitedU32::MIN).to_string()); 313 let mut record_bytes = Vec::new(); 314 if serde_ipld_dagcbor::to_writer(&mut record_bytes, value).is_err() { 315 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response(); 316 } 317 let record_cid = match tracking_store.put(&record_bytes).await { 318 Ok(c) => c, 319 Err(_) => return ( 320 StatusCode::INTERNAL_SERVER_ERROR, 321 Json( 322 json!({"error": "InternalError", "message": "Failed to store record"}), 323 ), 324 ) 325 .into_response(), 326 }; 327 let collection_nsid = match collection.parse::<Nsid>() { 328 Ok(n) => n, 329 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection", "message": "Invalid collection NSID"}))).into_response(), 330 }; 331 let key = format!("{}/{}", collection_nsid, rkey); 332 modified_keys.push(key.clone()); 333 mst = match mst.add(&key, record_cid).await { 334 Ok(m) => m, 335 Err(_) => return ( 336 StatusCode::INTERNAL_SERVER_ERROR, 337 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})), 338 ) 339 .into_response(), 340 }; 341 let uri = format!("at://{}/{}/{}", did, collection, rkey); 342 results.push(WriteResult::CreateResult { 343 uri, 344 cid: record_cid.to_string(), 345 }); 346 ops.push(RecordOp::Create { 347 collection: collection.clone(), 348 rkey, 349 cid: record_cid, 350 }); 351 } 352 WriteOp::Update { 353 collection, 354 rkey, 355 value, 356 } => { 357 if input.validate.unwrap_or(true) 358 && let Err(err_response) = validate_record(value, collection) 359 { 360 return *err_response; 361 } 362 let mut 
record_bytes = Vec::new(); 363 if serde_ipld_dagcbor::to_writer(&mut record_bytes, value).is_err() { 364 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response(); 365 } 366 let record_cid = match tracking_store.put(&record_bytes).await { 367 Ok(c) => c, 368 Err(_) => return ( 369 StatusCode::INTERNAL_SERVER_ERROR, 370 Json( 371 json!({"error": "InternalError", "message": "Failed to store record"}), 372 ), 373 ) 374 .into_response(), 375 }; 376 let collection_nsid = match collection.parse::<Nsid>() { 377 Ok(n) => n, 378 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection", "message": "Invalid collection NSID"}))).into_response(), 379 }; 380 let key = format!("{}/{}", collection_nsid, rkey); 381 modified_keys.push(key.clone()); 382 let prev_record_cid = mst.get(&key).await.ok().flatten(); 383 mst = match mst.update(&key, record_cid).await { 384 Ok(m) => m, 385 Err(_) => return ( 386 StatusCode::INTERNAL_SERVER_ERROR, 387 Json(json!({"error": "InternalError", "message": "Failed to update MST"})), 388 ) 389 .into_response(), 390 }; 391 let uri = format!("at://{}/{}/{}", did, collection, rkey); 392 results.push(WriteResult::UpdateResult { 393 uri, 394 cid: record_cid.to_string(), 395 }); 396 ops.push(RecordOp::Update { 397 collection: collection.clone(), 398 rkey: rkey.clone(), 399 cid: record_cid, 400 prev: prev_record_cid, 401 }); 402 } 403 WriteOp::Delete { collection, rkey } => { 404 let collection_nsid = match collection.parse::<Nsid>() { 405 Ok(n) => n, 406 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection", "message": "Invalid collection NSID"}))).into_response(), 407 }; 408 let key = format!("{}/{}", collection_nsid, rkey); 409 modified_keys.push(key.clone()); 410 let prev_record_cid = mst.get(&key).await.ok().flatten(); 411 mst = match mst.delete(&key).await { 412 Ok(m) => m, 413 Err(_) => return 
(StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to delete from MST"}))).into_response(), 414 }; 415 results.push(WriteResult::DeleteResult {}); 416 ops.push(RecordOp::Delete { 417 collection: collection.clone(), 418 rkey: rkey.clone(), 419 prev: prev_record_cid, 420 }); 421 } 422 } 423 } 424 let new_mst_root = match mst.persist().await { 425 Ok(c) => c, 426 Err(_) => { 427 return ( 428 StatusCode::INTERNAL_SERVER_ERROR, 429 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})), 430 ) 431 .into_response(); 432 } 433 }; 434 let mut relevant_blocks = std::collections::BTreeMap::new(); 435 for key in &modified_keys { 436 if mst 437 .blocks_for_path(key, &mut relevant_blocks) 438 .await 439 .is_err() 440 { 441 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response(); 442 } 443 if original_mst 444 .blocks_for_path(key, &mut relevant_blocks) 445 .await 446 .is_err() 447 { 448 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response(); 449 } 450 } 451 let mut written_cids = tracking_store.get_all_relevant_cids(); 452 for cid in relevant_blocks.keys() { 453 if !written_cids.contains(cid) { 454 written_cids.push(*cid); 455 } 456 } 457 let written_cids_str = written_cids 458 .iter() 459 .map(|c| c.to_string()) 460 .collect::<Vec<_>>(); 461 let commit_res = match commit_and_log( 462 &state, 463 CommitParams { 464 did: &did, 465 user_id, 466 current_root_cid: Some(current_root_cid), 467 prev_data_cid: Some(commit.data), 468 new_mst_root, 469 ops, 470 blocks_cids: &written_cids_str, 471 }, 472 ) 473 .await 474 { 475 Ok(res) => res, 476 Err(e) => { 477 error!("Commit failed: {}", e); 478 return ( 479 StatusCode::INTERNAL_SERVER_ERROR, 480 Json(json!({"error": "InternalError", "message": "Failed to commit changes"})), 481 ) 482 
.into_response(); 483 } 484 }; 485 486 if let Some(ref controller) = controller_did { 487 let write_summary: Vec<serde_json::Value> = input 488 .writes 489 .iter() 490 .map(|w| match w { 491 WriteOp::Create { 492 collection, rkey, .. 493 } => json!({ 494 "action": "create", 495 "collection": collection, 496 "rkey": rkey 497 }), 498 WriteOp::Update { 499 collection, rkey, .. 500 } => json!({ 501 "action": "update", 502 "collection": collection, 503 "rkey": rkey 504 }), 505 WriteOp::Delete { collection, rkey } => json!({ 506 "action": "delete", 507 "collection": collection, 508 "rkey": rkey 509 }), 510 }) 511 .collect(); 512 513 let _ = delegation::log_delegation_action( 514 &state.db, 515 &did, 516 controller, 517 Some(controller), 518 DelegationActionType::RepoWrite, 519 Some(json!({ 520 "action": "apply_writes", 521 "count": input.writes.len(), 522 "writes": write_summary 523 })), 524 None, 525 None, 526 ) 527 .await; 528 } 529 530 ( 531 StatusCode::OK, 532 Json(ApplyWritesOutput { 533 commit: CommitInfo { 534 cid: commit_res.commit_cid.to_string(), 535 rev: commit_res.rev, 536 }, 537 results, 538 }), 539 ) 540 .into_response() 541}