this repo has no description
1use crate::state::AppState; 2use axum::{ 3 Json, 4 extract::State, 5 http::StatusCode, 6 response::{IntoResponse, Response}, 7}; 8use chrono::Utc; 9use cid::Cid; 10use jacquard::types::{ 11 did::Did, 12 integer::LimitedU32, 13 string::{Nsid, Tid}, 14}; 15use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 16use serde::{Deserialize, Serialize}; 17use serde_json::json; 18use std::str::FromStr; 19use std::sync::Arc; 20use tracing::error; 21 22#[derive(Deserialize)] 23#[serde(tag = "$type")] 24pub enum WriteOp { 25 #[serde(rename = "com.atproto.repo.applyWrites#create")] 26 Create { 27 collection: String, 28 rkey: Option<String>, 29 value: serde_json::Value, 30 }, 31 #[serde(rename = "com.atproto.repo.applyWrites#update")] 32 Update { 33 collection: String, 34 rkey: String, 35 value: serde_json::Value, 36 }, 37 #[serde(rename = "com.atproto.repo.applyWrites#delete")] 38 Delete { collection: String, rkey: String }, 39} 40 41#[derive(Deserialize)] 42#[serde(rename_all = "camelCase")] 43pub struct ApplyWritesInput { 44 pub repo: String, 45 pub validate: Option<bool>, 46 pub writes: Vec<WriteOp>, 47 pub swap_commit: Option<String>, 48} 49 50#[derive(Serialize)] 51#[serde(tag = "$type")] 52pub enum WriteResult { 53 #[serde(rename = "com.atproto.repo.applyWrites#createResult")] 54 CreateResult { uri: String, cid: String }, 55 #[serde(rename = "com.atproto.repo.applyWrites#updateResult")] 56 UpdateResult { uri: String, cid: String }, 57 #[serde(rename = "com.atproto.repo.applyWrites#deleteResult")] 58 DeleteResult {}, 59} 60 61#[derive(Serialize)] 62pub struct ApplyWritesOutput { 63 pub commit: CommitInfo, 64 pub results: Vec<WriteResult>, 65} 66 67#[derive(Serialize)] 68pub struct CommitInfo { 69 pub cid: String, 70 pub rev: String, 71} 72 73pub async fn apply_writes( 74 State(state): State<AppState>, 75 headers: axum::http::HeaderMap, 76 Json(input): Json<ApplyWritesInput>, 77) -> Response { 78 let auth_header = headers.get("Authorization"); 79 if auth_header.is_none() { 80 return ( 81 StatusCode::UNAUTHORIZED, 82 Json(json!({"error": "AuthenticationRequired"})), 83 ) 84 .into_response(); 85 } 86 let token = auth_header 87 .unwrap() 88 .to_str() 89 .unwrap_or("") 90 .replace("Bearer ", ""); 91 92 let session = sqlx::query!( 93 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1", 94 token 95 ) 96 .fetch_optional(&state.db) 97 .await 98 .unwrap_or(None); 99 100 let (did, key_bytes) = match session { 101 Some(row) => ( 102 row.did, 103 row.key_bytes, 104 ), 105 None => { 106 return ( 107 StatusCode::UNAUTHORIZED, 108 Json(json!({"error": "AuthenticationFailed"})), 109 ) 110 .into_response(); 111 } 112 }; 113 114 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) { 115 return ( 116 StatusCode::UNAUTHORIZED, 117 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})), 118 ) 119 .into_response(); 120 } 121 122 if input.repo != did { 123 return ( 124 StatusCode::FORBIDDEN, 125 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})), 126 ) 127 .into_response(); 128 } 129 130 if input.writes.is_empty() { 131 return ( 132 StatusCode::BAD_REQUEST, 133 Json(json!({"error": "InvalidRequest", "message": "writes array is empty"})), 134 ) 135 .into_response(); 136 } 137 138 if input.writes.len() > 200 { 139 return ( 140 StatusCode::BAD_REQUEST, 141 Json(json!({"error": "InvalidRequest", "message": "Too many writes (max 200)"})), 142 ) 143 .into_response(); 144 } 145 146 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 147 .fetch_optional(&state.db) 148 .await; 149 150 let user_id: uuid::Uuid = match user_query { 151 Ok(Some(row)) => row.id, 152 _ => { 153 return ( 154 StatusCode::INTERNAL_SERVER_ERROR, 155 Json(json!({"error": "InternalError", "message": "User not found"})), 156 ) 157 .into_response(); 158 } 159 }; 160 161 let repo_root_query = sqlx::query!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id) 162 .fetch_optional(&state.db) 163 .await; 164 165 let current_root_cid = match repo_root_query { 166 Ok(Some(row)) => { 167 let cid_str: String = row.repo_root_cid; 168 match Cid::from_str(&cid_str) { 169 Ok(c) => c, 170 Err(_) => { 171 return ( 172 StatusCode::INTERNAL_SERVER_ERROR, 173 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})), 174 ) 175 .into_response(); 176 } 177 } 178 } 179 _ => { 180 return ( 181 StatusCode::INTERNAL_SERVER_ERROR, 182 Json(json!({"error": "InternalError", "message": "Repo root not found"})), 183 ) 184 .into_response(); 185 } 186 }; 187 188 if let Some(swap_commit) = &input.swap_commit { 189 let swap_cid = match Cid::from_str(swap_commit) { 190 Ok(c) => c, 191 Err(_) => { 192 return ( 193 StatusCode::BAD_REQUEST, 194 Json(json!({"error": "InvalidSwap", "message": "Invalid swapCommit CID"})), 195 ) 196 .into_response(); 197 } 198 }; 199 if swap_cid != current_root_cid { 200 return ( 201 StatusCode::CONFLICT, 202 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})), 203 ) 204 .into_response(); 205 } 206 } 207 208 let commit_bytes = match state.block_store.get(&current_root_cid).await { 209 Ok(Some(b)) => b, 210 Ok(None) => { 211 return ( 212 StatusCode::INTERNAL_SERVER_ERROR, 213 Json(json!({"error": "InternalError", "message": "Commit block not found"})), 214 ) 215 .into_response(); 216 } 217 Err(e) => { 218 error!("Failed to load commit block: {:?}", e); 219 return ( 220 StatusCode::INTERNAL_SERVER_ERROR, 221 Json(json!({"error": "InternalError"})), 222 ) 223 .into_response(); 224 } 225 }; 226 227 let commit = match Commit::from_cbor(&commit_bytes) { 228 Ok(c) => c, 229 Err(e) => { 230 error!("Failed to parse commit: {:?}", e); 231 return ( 232 StatusCode::INTERNAL_SERVER_ERROR, 233 Json(json!({"error": "InternalError"})), 234 ) 235 .into_response(); 236 } 237 }; 238 239 let mst_root = commit.data; 240 let store = Arc::new(state.block_store.clone()); 241 let mut mst = Mst::load(store.clone(), mst_root, None); 242 243 let mut results: Vec<WriteResult> = Vec::new(); 244 let mut record_ops: Vec<(String, String, Option<String>)> = Vec::new(); 245 246 for write in &input.writes { 247 match write { 248 WriteOp::Create { 249 collection, 250 rkey, 251 value, 252 } => { 253 let collection_nsid = match collection.parse::<Nsid>() { 254 Ok(n) => n, 255 Err(_) => { 256 return ( 257 StatusCode::BAD_REQUEST, 258 Json(json!({"error": "InvalidCollection"})), 259 ) 260 .into_response(); 261 } 262 }; 263 264 let rkey = rkey 265 .clone() 266 .unwrap_or_else(|| Utc::now().format("%Y%m%d%H%M%S%f").to_string()); 267 268 let mut record_bytes = Vec::new(); 269 if let Err(e) = serde_ipld_dagcbor::to_writer(&mut record_bytes, value) { 270 error!("Error serializing record: {:?}", e); 271 return ( 272 StatusCode::BAD_REQUEST, 273 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})), 274 ) 275 .into_response(); 276 } 277 278 let record_cid = match state.block_store.put(&record_bytes).await { 279 Ok(c) => c, 280 Err(e) => { 281 error!("Failed to save record block: {:?}", e); 282 return ( 283 StatusCode::INTERNAL_SERVER_ERROR, 284 Json(json!({"error": "InternalError"})), 285 ) 286 .into_response(); 287 } 288 }; 289 290 let key = format!("{}/{}", collection_nsid, rkey); 291 mst = match mst.add(&key, record_cid).await { 292 Ok(m) => m, 293 Err(e) => { 294 error!("Failed to add to MST: {:?}", e); 295 return ( 296 StatusCode::INTERNAL_SERVER_ERROR, 297 Json(json!({"error": "InternalError"})), 298 ) 299 .into_response(); 300 } 301 }; 302 303 let uri = format!("at://{}/{}/{}", did, collection, rkey); 304 results.push(WriteResult::CreateResult { 305 uri: uri.clone(), 306 cid: record_cid.to_string(), 307 }); 308 record_ops.push((collection.clone(), rkey, Some(record_cid.to_string()))); 309 } 310 WriteOp::Update { 311 collection, 312 rkey, 313 value, 314 } => { 315 let collection_nsid = match collection.parse::<Nsid>() { 316 Ok(n) => n, 317 Err(_) => { 318 return ( 319 StatusCode::BAD_REQUEST, 320 Json(json!({"error": "InvalidCollection"})), 321 ) 322 .into_response(); 323 } 324 }; 325 326 let mut record_bytes = Vec::new(); 327 if let Err(e) = serde_ipld_dagcbor::to_writer(&mut record_bytes, value) { 328 error!("Error serializing record: {:?}", e); 329 return ( 330 StatusCode::BAD_REQUEST, 331 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})), 332 ) 333 .into_response(); 334 } 335 336 let record_cid = match state.block_store.put(&record_bytes).await { 337 Ok(c) => c, 338 Err(e) => { 339 error!("Failed to save record block: {:?}", e); 340 return ( 341 StatusCode::INTERNAL_SERVER_ERROR, 342 Json(json!({"error": "InternalError"})), 343 ) 344 .into_response(); 345 } 346 }; 347 348 let key = format!("{}/{}", collection_nsid, rkey); 349 mst = match mst.update(&key, record_cid).await { 350 Ok(m) => m, 351 Err(e) => { 352 error!("Failed to update MST: {:?}", e); 353 return ( 354 StatusCode::INTERNAL_SERVER_ERROR, 355 Json(json!({"error": "InternalError"})), 356 ) 357 .into_response(); 358 } 359 }; 360 361 let uri = format!("at://{}/{}/{}", did, collection, rkey); 362 results.push(WriteResult::UpdateResult { 363 uri: uri.clone(), 364 cid: record_cid.to_string(), 365 }); 366 record_ops.push((collection.clone(), rkey.clone(), Some(record_cid.to_string()))); 367 } 368 WriteOp::Delete { collection, rkey } => { 369 let collection_nsid = match collection.parse::<Nsid>() { 370 Ok(n) => n, 371 Err(_) => { 372 return ( 373 StatusCode::BAD_REQUEST, 374 Json(json!({"error": "InvalidCollection"})), 375 ) 376 .into_response(); 377 } 378 }; 379 380 let key = format!("{}/{}", collection_nsid, rkey); 381 mst = match mst.delete(&key).await { 382 Ok(m) => m, 383 Err(e) => { 384 error!("Failed to delete from MST: {:?}", e); 385 return ( 386 StatusCode::INTERNAL_SERVER_ERROR, 387 Json(json!({"error": "InternalError"})), 388 ) 389 .into_response(); 390 } 391 }; 392 393 results.push(WriteResult::DeleteResult {}); 394 record_ops.push((collection.clone(), rkey.clone(), None)); 395 } 396 } 397 } 398 399 let new_mst_root = match mst.persist().await { 400 Ok(c) => c, 401 Err(e) => { 402 error!("Failed to persist MST: {:?}", e); 403 return ( 404 StatusCode::INTERNAL_SERVER_ERROR, 405 Json(json!({"error": "InternalError"})), 406 ) 407 .into_response(); 408 } 409 }; 410 411 let did_obj = match Did::new(&did) { 412 Ok(d) => d, 413 Err(_) => { 414 return ( 415 StatusCode::INTERNAL_SERVER_ERROR, 416 Json(json!({"error": "InternalError", "message": "Invalid DID"})), 417 ) 418 .into_response(); 419 } 420 }; 421 422 let rev = Tid::now(LimitedU32::MIN); 423 let new_commit = Commit::new_unsigned(did_obj, new_mst_root, rev.clone(), Some(current_root_cid)); 424 425 let new_commit_bytes = match new_commit.to_cbor() { 426 Ok(b) => b, 427 Err(e) => { 428 error!("Failed to serialize new commit: {:?}", e); 429 return ( 430 StatusCode::INTERNAL_SERVER_ERROR, 431 Json(json!({"error": "InternalError"})), 432 ) 433 .into_response(); 434 } 435 }; 436 437 let new_root_cid = match state.block_store.put(&new_commit_bytes).await { 438 Ok(c) => c, 439 Err(e) => { 440 error!("Failed to save new commit: {:?}", e); 441 return ( 442 StatusCode::INTERNAL_SERVER_ERROR, 443 Json(json!({"error": "InternalError"})), 444 ) 445 .into_response(); 446 } 447 }; 448 449 let update_repo = sqlx::query!("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2", new_root_cid.to_string(), user_id) 450 .execute(&state.db) 451 .await; 452 453 if let Err(e) = update_repo { 454 error!("Failed to update repo root in DB: {:?}", e); 455 return ( 456 StatusCode::INTERNAL_SERVER_ERROR, 457 Json(json!({"error": "InternalError"})), 458 ) 459 .into_response(); 460 } 461 462 for (collection, rkey, record_cid) in record_ops { 463 match record_cid { 464 Some(cid) => { 465 let _ = sqlx::query!( 466 "INSERT INTO records (repo_id, collection, rkey, record_cid) VALUES ($1, $2, $3, $4) 467 ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4, created_at = NOW()", 468 user_id, 469 collection, 470 rkey, 471 cid 472 ) 473 .execute(&state.db) 474 .await; 475 } 476 None => { 477 let _ = sqlx::query!( 478 "DELETE FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3", 479 user_id, 480 collection, 481 rkey 482 ) 483 .execute(&state.db) 484 .await; 485 } 486 } 487 } 488 489 ( 490 StatusCode::OK, 491 Json(ApplyWritesOutput { 492 commit: CommitInfo { 493 cid: new_root_cid.to_string(), 494 rev: rev.to_string(), 495 }, 496 results, 497 }), 498 ) 499 .into_response() 500}