this repo has no description
1use crate::state::AppState; 2use axum::{ 3 Json, 4 extract::State, 5 http::StatusCode, 6 response::{IntoResponse, Response}, 7}; 8use chrono::Utc; 9use cid::Cid; 10use jacquard::types::{ 11 did::Did, 12 integer::LimitedU32, 13 string::{Nsid, Tid}, 14}; 15use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 16use serde::{Deserialize, Serialize}; 17use serde_json::json; 18use std::str::FromStr; 19use std::sync::Arc; 20use tracing::error; 21 22#[derive(Deserialize)] 23#[allow(dead_code)] 24pub struct CreateRecordInput { 25 pub repo: String, 26 pub collection: String, 27 pub rkey: Option<String>, 28 pub validate: Option<bool>, 29 pub record: serde_json::Value, 30 #[serde(rename = "swapCommit")] 31 pub swap_commit: Option<String>, 32} 33 34#[derive(Serialize)] 35#[serde(rename_all = "camelCase")] 36pub struct CreateRecordOutput { 37 pub uri: String, 38 pub cid: String, 39} 40 41pub async fn create_record( 42 State(state): State<AppState>, 43 headers: axum::http::HeaderMap, 44 Json(input): Json<CreateRecordInput>, 45) -> Response { 46 let auth_header = headers.get("Authorization"); 47 if auth_header.is_none() { 48 return ( 49 StatusCode::UNAUTHORIZED, 50 Json(json!({"error": "AuthenticationRequired"})), 51 ) 52 .into_response(); 53 } 54 let token = auth_header 55 .unwrap() 56 .to_str() 57 .unwrap_or("") 58 .replace("Bearer ", ""); 59 60 let session = sqlx::query!( 61 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1", 62 token 63 ) 64 .fetch_optional(&state.db) 65 .await 66 .unwrap_or(None); 67 68 let (did, key_bytes) = match session { 69 Some(row) => ( 70 row.did, 71 row.key_bytes, 72 ), 73 None => { 74 return ( 75 StatusCode::UNAUTHORIZED, 76 Json(json!({"error": "AuthenticationFailed"})), 77 ) 78 .into_response(); 79 } 80 }; 81 82 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) { 83 return ( 84 StatusCode::UNAUTHORIZED, 85 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})), 86 ) 87 .into_response(); 88 } 89 90 if input.repo != did { 91 return (StatusCode::FORBIDDEN, Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"}))).into_response(); 92 } 93 94 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 95 .fetch_optional(&state.db) 96 .await; 97 98 let user_id: uuid::Uuid = match user_query { 99 Ok(Some(row)) => row.id, 100 _ => { 101 return ( 102 StatusCode::INTERNAL_SERVER_ERROR, 103 Json(json!({"error": "InternalError", "message": "User not found"})), 104 ) 105 .into_response(); 106 } 107 }; 108 109 let repo_root_query = sqlx::query!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id) 110 .fetch_optional(&state.db) 111 .await; 112 113 let current_root_cid = match repo_root_query { 114 Ok(Some(row)) => { 115 let cid_str: String = row.repo_root_cid; 116 Cid::from_str(&cid_str).ok() 117 } 118 _ => None, 119 }; 120 121 if current_root_cid.is_none() { 122 error!("Repo root not found for user {}", did); 123 return ( 124 StatusCode::INTERNAL_SERVER_ERROR, 125 Json(json!({"error": "InternalError", "message": "Repo root not found"})), 126 ) 127 .into_response(); 128 } 129 let current_root_cid = current_root_cid.unwrap(); 130 131 let commit_bytes = match state.block_store.get(&current_root_cid).await { 132 Ok(Some(b)) => b, 133 Ok(None) => { 134 error!("Commit block not found: {}", current_root_cid); 135 return ( 136 StatusCode::INTERNAL_SERVER_ERROR, 137 Json(json!({"error": "InternalError"})), 138 ) 139 .into_response(); 140 } 141 Err(e) => { 142 error!("Failed to load commit block: {:?}", e); 143 return ( 144 StatusCode::INTERNAL_SERVER_ERROR, 145 Json(json!({"error": "InternalError"})), 146 ) 147 .into_response(); 148 } 149 }; 150 151 let commit = match Commit::from_cbor(&commit_bytes) { 152 Ok(c) => c, 153 Err(e) => { 154 error!("Failed to parse commit: {:?}", e); 155 return ( 156 StatusCode::INTERNAL_SERVER_ERROR, 157 Json(json!({"error": "InternalError"})), 158 ) 159 .into_response(); 160 } 161 }; 162 163 let mst_root = commit.data; 164 let store = Arc::new(state.block_store.clone()); 165 let mst = Mst::load(store.clone(), mst_root, None); 166 167 let collection_nsid = match input.collection.parse::<Nsid>() { 168 Ok(n) => n, 169 Err(_) => { 170 return ( 171 StatusCode::BAD_REQUEST, 172 Json(json!({"error": "InvalidCollection"})), 173 ) 174 .into_response(); 175 } 176 }; 177 178 let rkey = input 179 .rkey 180 .unwrap_or_else(|| Utc::now().format("%Y%m%d%H%M%S%f").to_string()); 181 182 if input.validate.unwrap_or(true) { 183 if input.collection == "app.bsky.feed.post" { 184 if input.record.get("text").is_none() || input.record.get("createdAt").is_none() { 185 return ( 186 StatusCode::BAD_REQUEST, 187 Json(json!({"error": "InvalidRecord", "message": "Record validation failed"})), 188 ) 189 .into_response(); 190 } 191 } 192 } 193 194 let mut record_bytes = Vec::new(); 195 if let Err(e) = serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record) { 196 error!("Error serializing record: {:?}", e); 197 return ( 198 StatusCode::BAD_REQUEST, 199 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})), 200 ) 201 .into_response(); 202 } 203 204 let record_cid = match state.block_store.put(&record_bytes).await { 205 Ok(c) => c, 206 Err(e) => { 207 error!("Failed to save record block: {:?}", e); 208 return ( 209 StatusCode::INTERNAL_SERVER_ERROR, 210 Json(json!({"error": "InternalError"})), 211 ) 212 .into_response(); 213 } 214 }; 215 216 let key = format!("{}/{}", collection_nsid, rkey); 217 let new_mst = match mst.add(&key, record_cid).await { 218 Ok(m) => m, 219 Err(e) => { 220 error!("Failed to add to MST: {:?}", e); 221 return ( 222 StatusCode::INTERNAL_SERVER_ERROR, 223 Json(json!({"error": "InternalError"})), 224 ) 225 .into_response(); 226 } 227 }; 228 229 let new_mst_root = match new_mst.persist().await { 230 Ok(c) => c, 231 Err(e) => { 232 error!("Failed to persist MST: {:?}", e); 233 return ( 234 StatusCode::INTERNAL_SERVER_ERROR, 235 Json(json!({"error": "InternalError"})), 236 ) 237 .into_response(); 238 } 239 }; 240 241 let did_obj = match Did::new(&did) { 242 Ok(d) => d, 243 Err(_) => { 244 return ( 245 StatusCode::INTERNAL_SERVER_ERROR, 246 Json(json!({"error": "InternalError", "message": "Invalid DID"})), 247 ) 248 .into_response(); 249 } 250 }; 251 252 let rev = Tid::now(LimitedU32::MIN); 253 254 let new_commit = Commit::new_unsigned(did_obj, new_mst_root, rev, Some(current_root_cid)); 255 256 let new_commit_bytes = match new_commit.to_cbor() { 257 Ok(b) => b, 258 Err(e) => { 259 error!("Failed to serialize new commit: {:?}", e); 260 return ( 261 StatusCode::INTERNAL_SERVER_ERROR, 262 Json(json!({"error": "InternalError"})), 263 ) 264 .into_response(); 265 } 266 }; 267 268 let new_root_cid = match state.block_store.put(&new_commit_bytes).await { 269 Ok(c) => c, 270 Err(e) => { 271 error!("Failed to save new commit: {:?}", e); 272 return ( 273 StatusCode::INTERNAL_SERVER_ERROR, 274 Json(json!({"error": "InternalError"})), 275 ) 276 .into_response(); 277 } 278 }; 279 280 let update_repo = sqlx::query!("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2", new_root_cid.to_string(), user_id) 281 .execute(&state.db) 282 .await; 283 284 if let Err(e) = update_repo { 285 error!("Failed to update repo root in DB: {:?}", e); 286 return ( 287 StatusCode::INTERNAL_SERVER_ERROR, 288 Json(json!({"error": "InternalError"})), 289 ) 290 .into_response(); 291 } 292 293 let record_insert = sqlx::query!( 294 "INSERT INTO records (repo_id, collection, rkey, record_cid) VALUES ($1, $2, $3, $4) 295 ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4, created_at = NOW()", 296 user_id, 297 input.collection, 298 rkey, 299 record_cid.to_string() 300 ) 301 .execute(&state.db) 302 .await; 303 304 if let Err(e) = record_insert { 305 error!("Error inserting record index: {:?}", e); 306 return ( 307 StatusCode::INTERNAL_SERVER_ERROR, 308 Json(json!({"error": "InternalError", "message": "Failed to index record"})), 309 ) 310 .into_response(); 311 } 312 313 let output = CreateRecordOutput { 314 uri: format!("at://{}/{}/{}", input.repo, input.collection, rkey), 315 cid: record_cid.to_string(), 316 }; 317 (StatusCode::OK, Json(output)).into_response() 318} 319 320#[derive(Deserialize)] 321#[allow(dead_code)] 322pub struct PutRecordInput { 323 pub repo: String, 324 pub collection: String, 325 pub rkey: String, 326 pub validate: Option<bool>, 327 pub record: serde_json::Value, 328 #[serde(rename = "swapCommit")] 329 pub swap_commit: Option<String>, 330 #[serde(rename = "swapRecord")] 331 pub swap_record: Option<String>, 332} 333 334#[derive(Serialize)] 335#[serde(rename_all = "camelCase")] 336pub struct PutRecordOutput { 337 pub uri: String, 338 pub cid: String, 339} 340 341pub async fn put_record( 342 State(state): State<AppState>, 343 headers: axum::http::HeaderMap, 344 Json(input): Json<PutRecordInput>, 345) -> Response { 346 let auth_header = headers.get("Authorization"); 347 if auth_header.is_none() { 348 return ( 349 StatusCode::UNAUTHORIZED, 350 Json(json!({"error": "AuthenticationRequired"})), 351 ) 352 .into_response(); 353 } 354 let token = auth_header 355 .unwrap() 356 .to_str() 357 .unwrap_or("") 358 .replace("Bearer ", ""); 359 360 let session = sqlx::query!( 361 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1", 362 token 363 ) 364 .fetch_optional(&state.db) 365 .await 366 .unwrap_or(None); 367 368 let (did, key_bytes) = match session { 369 Some(row) => ( 370 row.did, 371 row.key_bytes, 372 ), 373 None => { 374 return ( 375 StatusCode::UNAUTHORIZED, 376 Json(json!({"error": "AuthenticationFailed"})), 377 ) 378 .into_response(); 379 } 380 }; 381 382 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) { 383 return ( 384 StatusCode::UNAUTHORIZED, 385 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})), 386 ) 387 .into_response(); 388 } 389 390 if input.repo != did { 391 return (StatusCode::FORBIDDEN, Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"}))).into_response(); 392 } 393 394 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 395 .fetch_optional(&state.db) 396 .await; 397 398 let user_id: uuid::Uuid = match user_query { 399 Ok(Some(row)) => row.id, 400 _ => { 401 return ( 402 StatusCode::INTERNAL_SERVER_ERROR, 403 Json(json!({"error": "InternalError", "message": "User not found"})), 404 ) 405 .into_response(); 406 } 407 }; 408 409 let repo_root_query = sqlx::query!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id) 410 .fetch_optional(&state.db) 411 .await; 412 413 let current_root_cid = match repo_root_query { 414 Ok(Some(row)) => { 415 let cid_str: String = row.repo_root_cid; 416 Cid::from_str(&cid_str).ok() 417 } 418 _ => None, 419 }; 420 421 if current_root_cid.is_none() { 422 error!("Repo root not found for user {}", did); 423 return ( 424 StatusCode::INTERNAL_SERVER_ERROR, 425 Json(json!({"error": "InternalError", "message": "Repo root not found"})), 426 ) 427 .into_response(); 428 } 429 let current_root_cid = current_root_cid.unwrap(); 430 431 let commit_bytes = match state.block_store.get(&current_root_cid).await { 432 Ok(Some(b)) => b, 433 Ok(None) => { 434 error!("Commit block not found: {}", current_root_cid); 435 return ( 436 StatusCode::INTERNAL_SERVER_ERROR, 437 Json(json!({"error": "InternalError", "message": "Commit block not found"})), 438 ) 439 .into_response(); 440 } 441 Err(e) => { 442 error!("Failed to load commit block: {:?}", e); 443 return ( 444 StatusCode::INTERNAL_SERVER_ERROR, 445 Json(json!({"error": "InternalError", "message": "Failed to load commit block"})), 446 ) 447 .into_response(); 448 } 449 }; 450 451 let commit = match Commit::from_cbor(&commit_bytes) { 452 Ok(c) => c, 453 Err(e) => { 454 error!("Failed to parse commit: {:?}", e); 455 return ( 456 StatusCode::INTERNAL_SERVER_ERROR, 457 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})), 458 ) 459 .into_response(); 460 } 461 }; 462 463 let mst_root = commit.data; 464 let store = Arc::new(state.block_store.clone()); 465 let mst = Mst::load(store.clone(), mst_root, None); 466 467 let collection_nsid = match input.collection.parse::<Nsid>() { 468 Ok(n) => n, 469 Err(_) => { 470 return ( 471 StatusCode::BAD_REQUEST, 472 Json(json!({"error": "InvalidCollection"})), 473 ) 474 .into_response(); 475 } 476 }; 477 478 let rkey = input.rkey.clone(); 479 480 if input.validate.unwrap_or(true) { 481 if input.collection == "app.bsky.feed.post" { 482 if input.record.get("text").is_none() || input.record.get("createdAt").is_none() { 483 return ( 484 StatusCode::BAD_REQUEST, 485 Json(json!({"error": "InvalidRecord", "message": "Record validation failed"})), 486 ) 487 .into_response(); 488 } 489 } 490 } 491 492 let mut record_bytes = Vec::new(); 493 if let Err(e) = serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record) { 494 error!("Error serializing record: {:?}", e); 495 return ( 496 StatusCode::BAD_REQUEST, 497 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})), 498 ) 499 .into_response(); 500 } 501 502 let record_cid = match state.block_store.put(&record_bytes).await { 503 Ok(c) => c, 504 Err(e) => { 505 error!("Failed to save record block: {:?}", e); 506 return ( 507 StatusCode::INTERNAL_SERVER_ERROR, 508 Json(json!({"error": "InternalError", "message": "Failed to save record block"})), 509 ) 510 .into_response(); 511 } 512 }; 513 514 let key = format!("{}/{}", collection_nsid, rkey); 515 516 let existing = match mst.get(&key).await { 517 Ok(v) => v, 518 Err(e) => { 519 error!("Failed to check MST key: {:?}", e); 520 return ( 521 StatusCode::INTERNAL_SERVER_ERROR, 522 Json( 523 json!({"error": "InternalError", "message": "Failed to check existing record"}), 524 ), 525 ) 526 .into_response(); 527 } 528 }; 529 530 if let Some(swap_record_str) = &input.swap_record { 531 let swap_record_cid = match Cid::from_str(swap_record_str) { 532 Ok(c) => c, 533 Err(_) => { 534 return ( 535 StatusCode::BAD_REQUEST, 536 Json( 537 json!({"error": "InvalidSwapRecord", "message": "Invalid swapRecord CID"}), 538 ), 539 ) 540 .into_response(); 541 } 542 }; 543 match &existing { 544 Some(current_cid) if *current_cid != swap_record_cid => { 545 return ( 546 StatusCode::CONFLICT, 547 Json(json!({"error": "InvalidSwap", "message": "Record has been modified"})), 548 ) 549 .into_response(); 550 } 551 None => { 552 return ( 553 StatusCode::CONFLICT, 554 Json(json!({"error": "InvalidSwap", "message": "Record does not exist"})), 555 ) 556 .into_response(); 557 } 558 _ => {} 559 } 560 } 561 562 let new_mst = if existing.is_some() { 563 match mst.update(&key, record_cid).await { 564 Ok(m) => m, 565 Err(e) => { 566 error!("Failed to update MST: {:?}", e); 567 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": format!("Failed to update MST: {:?}", e)}))).into_response(); 568 } 569 } 570 } else { 571 match mst.add(&key, record_cid).await { 572 Ok(m) => m, 573 Err(e) => { 574 error!("Failed to add to MST: {:?}", e); 575 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": format!("Failed to add to MST: {:?}", e)}))).into_response(); 576 } 577 } 578 }; 579 580 let new_mst_root = match new_mst.persist().await { 581 Ok(c) => c, 582 Err(e) => { 583 error!("Failed to persist MST: {:?}", e); 584 return ( 585 StatusCode::INTERNAL_SERVER_ERROR, 586 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})), 587 ) 588 .into_response(); 589 } 590 }; 591 592 let did_obj = match Did::new(&did) { 593 Ok(d) => d, 594 Err(_) => { 595 return ( 596 StatusCode::INTERNAL_SERVER_ERROR, 597 Json(json!({"error": "InternalError", "message": "Invalid DID"})), 598 ) 599 .into_response(); 600 } 601 }; 602 603 let rev = Tid::now(LimitedU32::MIN); 604 605 let new_commit = Commit::new_unsigned(did_obj, new_mst_root, rev, Some(current_root_cid)); 606 607 let new_commit_bytes = match new_commit.to_cbor() { 608 Ok(b) => b, 609 Err(e) => { 610 error!("Failed to serialize new commit: {:?}", e); 611 return ( 612 StatusCode::INTERNAL_SERVER_ERROR, 613 Json( 614 json!({"error": "InternalError", "message": "Failed to serialize new commit"}), 615 ), 616 ) 617 .into_response(); 618 } 619 }; 620 621 let new_root_cid = match state.block_store.put(&new_commit_bytes).await { 622 Ok(c) => c, 623 Err(e) => { 624 error!("Failed to save new commit: {:?}", e); 625 return ( 626 StatusCode::INTERNAL_SERVER_ERROR, 627 Json(json!({"error": "InternalError", "message": "Failed to save new commit"})), 628 ) 629 .into_response(); 630 } 631 }; 632 633 let update_repo = sqlx::query!("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2", new_root_cid.to_string(), user_id) 634 .execute(&state.db) 635 .await; 636 637 if let Err(e) = update_repo { 638 error!("Failed to update repo root in DB: {:?}", e); 639 return ( 640 StatusCode::INTERNAL_SERVER_ERROR, 641 Json(json!({"error": "InternalError", "message": "Failed to update repo root in DB"})), 642 ) 643 .into_response(); 644 } 645 646 let record_insert = sqlx::query!( 647 "INSERT INTO records (repo_id, collection, rkey, record_cid) VALUES ($1, $2, $3, $4) 648 ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4, created_at = NOW()", 649 user_id, 650 input.collection, 651 rkey, 652 record_cid.to_string() 653 ) 654 .execute(&state.db) 655 .await; 656 657 if let Err(e) = record_insert { 658 error!("Error inserting record index: {:?}", e); 659 return ( 660 StatusCode::INTERNAL_SERVER_ERROR, 661 Json(json!({"error": "InternalError", "message": "Failed to index record"})), 662 ) 663 .into_response(); 664 } 665 666 let output = PutRecordOutput { 667 uri: format!("at://{}/{}/{}", input.repo, input.collection, rkey), 668 cid: record_cid.to_string(), 669 }; 670 (StatusCode::OK, Json(output)).into_response() 671}