this repo has no description
1use crate::state::AppState; 2use axum::{ 3 Json, 4 extract::State, 5 http::StatusCode, 6 response::{IntoResponse, Response}, 7}; 8use chrono::Utc; 9use cid::Cid; 10use jacquard::types::{ 11 did::Did, 12 integer::LimitedU32, 13 string::{Nsid, Tid}, 14}; 15use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 16use serde::{Deserialize, Serialize}; 17use serde_json::json; 18use sqlx::Row; 19use std::str::FromStr; 20use std::sync::Arc; 21use tracing::error; 22 23#[derive(Deserialize)] 24#[serde(tag = "$type")] 25pub enum WriteOp { 26 #[serde(rename = "com.atproto.repo.applyWrites#create")] 27 Create { 28 collection: String, 29 rkey: Option<String>, 30 value: serde_json::Value, 31 }, 32 #[serde(rename = "com.atproto.repo.applyWrites#update")] 33 Update { 34 collection: String, 35 rkey: String, 36 value: serde_json::Value, 37 }, 38 #[serde(rename = "com.atproto.repo.applyWrites#delete")] 39 Delete { collection: String, rkey: String }, 40} 41 42#[derive(Deserialize)] 43#[serde(rename_all = "camelCase")] 44pub struct ApplyWritesInput { 45 pub repo: String, 46 pub validate: Option<bool>, 47 pub writes: Vec<WriteOp>, 48 pub swap_commit: Option<String>, 49} 50 51#[derive(Serialize)] 52#[serde(tag = "$type")] 53pub enum WriteResult { 54 #[serde(rename = "com.atproto.repo.applyWrites#createResult")] 55 CreateResult { uri: String, cid: String }, 56 #[serde(rename = "com.atproto.repo.applyWrites#updateResult")] 57 UpdateResult { uri: String, cid: String }, 58 #[serde(rename = "com.atproto.repo.applyWrites#deleteResult")] 59 DeleteResult {}, 60} 61 62#[derive(Serialize)] 63pub struct ApplyWritesOutput { 64 pub commit: CommitInfo, 65 pub results: Vec<WriteResult>, 66} 67 68#[derive(Serialize)] 69pub struct CommitInfo { 70 pub cid: String, 71 pub rev: String, 72} 73 74pub async fn apply_writes( 75 State(state): State<AppState>, 76 headers: axum::http::HeaderMap, 77 Json(input): Json<ApplyWritesInput>, 78) -> Response { 79 let auth_header = headers.get("Authorization"); 80 if auth_header.is_none() { 81 return ( 82 StatusCode::UNAUTHORIZED, 83 Json(json!({"error": "AuthenticationRequired"})), 84 ) 85 .into_response(); 86 } 87 let token = auth_header 88 .unwrap() 89 .to_str() 90 .unwrap_or("") 91 .replace("Bearer ", ""); 92 93 let session = sqlx::query( 94 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1" 95 ) 96 .bind(&token) 97 .fetch_optional(&state.db) 98 .await 99 .unwrap_or(None); 100 101 let (did, key_bytes) = match session { 102 Some(row) => ( 103 row.get::<String, _>("did"), 104 row.get::<Vec<u8>, _>("key_bytes"), 105 ), 106 None => { 107 return ( 108 StatusCode::UNAUTHORIZED, 109 Json(json!({"error": "AuthenticationFailed"})), 110 ) 111 .into_response(); 112 } 113 }; 114 115 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) { 116 return ( 117 StatusCode::UNAUTHORIZED, 118 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})), 119 ) 120 .into_response(); 121 } 122 123 if input.repo != did { 124 return ( 125 StatusCode::FORBIDDEN, 126 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})), 127 ) 128 .into_response(); 129 } 130 131 if input.writes.is_empty() { 132 return ( 133 StatusCode::BAD_REQUEST, 134 Json(json!({"error": "InvalidRequest", "message": "writes array is empty"})), 135 ) 136 .into_response(); 137 } 138 139 if input.writes.len() > 200 { 140 return ( 141 StatusCode::BAD_REQUEST, 142 Json(json!({"error": "InvalidRequest", "message": "Too many writes (max 200)"})), 143 ) 144 .into_response(); 145 } 146 147 let user_query = sqlx::query("SELECT id FROM users WHERE did = $1") 148 .bind(&did) 149 .fetch_optional(&state.db) 150 .await; 151 152 let user_id: uuid::Uuid = match user_query { 153 Ok(Some(row)) => row.get("id"), 154 _ => { 155 return ( 156 StatusCode::INTERNAL_SERVER_ERROR, 157 Json(json!({"error": "InternalError", "message": "User not found"})), 158 ) 159 .into_response(); 160 } 161 }; 162 163 let repo_root_query = sqlx::query("SELECT repo_root_cid FROM repos WHERE user_id = $1") 164 .bind(user_id) 165 .fetch_optional(&state.db) 166 .await; 167 168 let current_root_cid = match repo_root_query { 169 Ok(Some(row)) => { 170 let cid_str: String = row.get("repo_root_cid"); 171 match Cid::from_str(&cid_str) { 172 Ok(c) => c, 173 Err(_) => { 174 return ( 175 StatusCode::INTERNAL_SERVER_ERROR, 176 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})), 177 ) 178 .into_response(); 179 } 180 } 181 } 182 _ => { 183 return ( 184 StatusCode::INTERNAL_SERVER_ERROR, 185 Json(json!({"error": "InternalError", "message": "Repo root not found"})), 186 ) 187 .into_response(); 188 } 189 }; 190 191 if let Some(swap_commit) = &input.swap_commit { 192 let swap_cid = match Cid::from_str(swap_commit) { 193 Ok(c) => c, 194 Err(_) => { 195 return ( 196 StatusCode::BAD_REQUEST, 197 Json(json!({"error": "InvalidSwap", "message": "Invalid swapCommit CID"})), 198 ) 199 .into_response(); 200 } 201 }; 202 if swap_cid != current_root_cid { 203 return ( 204 StatusCode::CONFLICT, 205 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})), 206 ) 207 .into_response(); 208 } 209 } 210 211 let commit_bytes = match state.block_store.get(&current_root_cid).await { 212 Ok(Some(b)) => b, 213 Ok(None) => { 214 return ( 215 StatusCode::INTERNAL_SERVER_ERROR, 216 Json(json!({"error": "InternalError", "message": "Commit block not found"})), 217 ) 218 .into_response(); 219 } 220 Err(e) => { 221 error!("Failed to load commit block: {:?}", e); 222 return ( 223 StatusCode::INTERNAL_SERVER_ERROR, 224 Json(json!({"error": "InternalError"})), 225 ) 226 .into_response(); 227 } 228 }; 229 230 let commit = match Commit::from_cbor(&commit_bytes) { 231 Ok(c) => c, 232 Err(e) => { 233 error!("Failed to parse commit: {:?}", e); 234 return ( 235 StatusCode::INTERNAL_SERVER_ERROR, 236 Json(json!({"error": "InternalError"})), 237 ) 238 .into_response(); 239 } 240 }; 241 242 let mst_root = commit.data; 243 let store = Arc::new(state.block_store.clone()); 244 let mut mst = Mst::load(store.clone(), mst_root, None); 245 246 let mut results: Vec<WriteResult> = Vec::new(); 247 let mut record_ops: Vec<(String, String, Option<String>)> = Vec::new(); 248 249 for write in &input.writes { 250 match write { 251 WriteOp::Create { 252 collection, 253 rkey, 254 value, 255 } => { 256 let collection_nsid = match collection.parse::<Nsid>() { 257 Ok(n) => n, 258 Err(_) => { 259 return ( 260 StatusCode::BAD_REQUEST, 261 Json(json!({"error": "InvalidCollection"})), 262 ) 263 .into_response(); 264 } 265 }; 266 267 let rkey = rkey 268 .clone() 269 .unwrap_or_else(|| Utc::now().format("%Y%m%d%H%M%S%f").to_string()); 270 271 let mut record_bytes = Vec::new(); 272 if let Err(e) = serde_ipld_dagcbor::to_writer(&mut record_bytes, value) { 273 error!("Error serializing record: {:?}", e); 274 return ( 275 StatusCode::BAD_REQUEST, 276 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})), 277 ) 278 .into_response(); 279 } 280 281 let record_cid = match state.block_store.put(&record_bytes).await { 282 Ok(c) => c, 283 Err(e) => { 284 error!("Failed to save record block: {:?}", e); 285 return ( 286 StatusCode::INTERNAL_SERVER_ERROR, 287 Json(json!({"error": "InternalError"})), 288 ) 289 .into_response(); 290 } 291 }; 292 293 let key = format!("{}/{}", collection_nsid, rkey); 294 mst = match mst.add(&key, record_cid).await { 295 Ok(m) => m, 296 Err(e) => { 297 error!("Failed to add to MST: {:?}", e); 298 return ( 299 StatusCode::INTERNAL_SERVER_ERROR, 300 Json(json!({"error": "InternalError"})), 301 ) 302 .into_response(); 303 } 304 }; 305 306 let uri = format!("at://{}/{}/{}", did, collection, rkey); 307 results.push(WriteResult::CreateResult { 308 uri: uri.clone(), 309 cid: record_cid.to_string(), 310 }); 311 record_ops.push((collection.clone(), rkey, Some(record_cid.to_string()))); 312 } 313 WriteOp::Update { 314 collection, 315 rkey, 316 value, 317 } => { 318 let collection_nsid = match collection.parse::<Nsid>() { 319 Ok(n) => n, 320 Err(_) => { 321 return ( 322 StatusCode::BAD_REQUEST, 323 Json(json!({"error": "InvalidCollection"})), 324 ) 325 .into_response(); 326 } 327 }; 328 329 let mut record_bytes = Vec::new(); 330 if let Err(e) = serde_ipld_dagcbor::to_writer(&mut record_bytes, value) { 331 error!("Error serializing record: {:?}", e); 332 return ( 333 StatusCode::BAD_REQUEST, 334 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})), 335 ) 336 .into_response(); 337 } 338 339 let record_cid = match state.block_store.put(&record_bytes).await { 340 Ok(c) => c, 341 Err(e) => { 342 error!("Failed to save record block: {:?}", e); 343 return ( 344 StatusCode::INTERNAL_SERVER_ERROR, 345 Json(json!({"error": "InternalError"})), 346 ) 347 .into_response(); 348 } 349 }; 350 351 let key = format!("{}/{}", collection_nsid, rkey); 352 mst = match mst.update(&key, record_cid).await { 353 Ok(m) => m, 354 Err(e) => { 355 error!("Failed to update MST: {:?}", e); 356 return ( 357 StatusCode::INTERNAL_SERVER_ERROR, 358 Json(json!({"error": "InternalError"})), 359 ) 360 .into_response(); 361 } 362 }; 363 364 let uri = format!("at://{}/{}/{}", did, collection, rkey); 365 results.push(WriteResult::UpdateResult { 366 uri: uri.clone(), 367 cid: record_cid.to_string(), 368 }); 369 record_ops.push((collection.clone(), rkey.clone(), Some(record_cid.to_string()))); 370 } 371 WriteOp::Delete { collection, rkey } => { 372 let collection_nsid = match collection.parse::<Nsid>() { 373 Ok(n) => n, 374 Err(_) => { 375 return ( 376 StatusCode::BAD_REQUEST, 377 Json(json!({"error": "InvalidCollection"})), 378 ) 379 .into_response(); 380 } 381 }; 382 383 let key = format!("{}/{}", collection_nsid, rkey); 384 mst = match mst.delete(&key).await { 385 Ok(m) => m, 386 Err(e) => { 387 error!("Failed to delete from MST: {:?}", e); 388 return ( 389 StatusCode::INTERNAL_SERVER_ERROR, 390 Json(json!({"error": "InternalError"})), 391 ) 392 .into_response(); 393 } 394 }; 395 396 results.push(WriteResult::DeleteResult {}); 397 record_ops.push((collection.clone(), rkey.clone(), None)); 398 } 399 } 400 } 401 402 let new_mst_root = match mst.persist().await { 403 Ok(c) => c, 404 Err(e) => { 405 error!("Failed to persist MST: {:?}", e); 406 return ( 407 StatusCode::INTERNAL_SERVER_ERROR, 408 Json(json!({"error": "InternalError"})), 409 ) 410 .into_response(); 411 } 412 }; 413 414 let did_obj = match Did::new(&did) { 415 Ok(d) => d, 416 Err(_) => { 417 return ( 418 StatusCode::INTERNAL_SERVER_ERROR, 419 Json(json!({"error": "InternalError", "message": "Invalid DID"})), 420 ) 421 .into_response(); 422 } 423 }; 424 425 let rev = Tid::now(LimitedU32::MIN); 426 let new_commit = Commit::new_unsigned(did_obj, new_mst_root, rev.clone(), Some(current_root_cid)); 427 428 let new_commit_bytes = match new_commit.to_cbor() { 429 Ok(b) => b, 430 Err(e) => { 431 error!("Failed to serialize new commit: {:?}", e); 432 return ( 433 StatusCode::INTERNAL_SERVER_ERROR, 434 Json(json!({"error": "InternalError"})), 435 ) 436 .into_response(); 437 } 438 }; 439 440 let new_root_cid = match state.block_store.put(&new_commit_bytes).await { 441 Ok(c) => c, 442 Err(e) => { 443 error!("Failed to save new commit: {:?}", e); 444 return ( 445 StatusCode::INTERNAL_SERVER_ERROR, 446 Json(json!({"error": "InternalError"})), 447 ) 448 .into_response(); 449 } 450 }; 451 452 let update_repo = sqlx::query("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2") 453 .bind(new_root_cid.to_string()) 454 .bind(user_id) 455 .execute(&state.db) 456 .await; 457 458 if let Err(e) = update_repo { 459 error!("Failed to update repo root in DB: {:?}", e); 460 return ( 461 StatusCode::INTERNAL_SERVER_ERROR, 462 Json(json!({"error": "InternalError"})), 463 ) 464 .into_response(); 465 } 466 467 for (collection, rkey, record_cid) in record_ops { 468 match record_cid { 469 Some(cid) => { 470 let _ = sqlx::query( 471 "INSERT INTO records (repo_id, collection, rkey, record_cid) VALUES ($1, $2, $3, $4) 472 ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4, created_at = NOW()", 473 ) 474 .bind(user_id) 475 .bind(&collection) 476 .bind(&rkey) 477 .bind(&cid) 478 .execute(&state.db) 479 .await; 480 } 481 None => { 482 let _ = sqlx::query( 483 "DELETE FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3", 484 ) 485 .bind(user_id) 486 .bind(&collection) 487 .bind(&rkey) 488 .execute(&state.db) 489 .await; 490 } 491 } 492 } 493 494 ( 495 StatusCode::OK, 496 Json(ApplyWritesOutput { 497 commit: CommitInfo { 498 cid: new_root_cid.to_string(), 499 rev: rev.to_string(), 500 }, 501 results, 502 }), 503 ) 504 .into_response() 505}