this repo has no description
1use crate::state::AppState; 2use axum::{ 3 Json, 4 extract::State, 5 http::StatusCode, 6 response::{IntoResponse, Response}, 7}; 8use chrono::Utc; 9use cid::Cid; 10use jacquard::types::{ 11 did::Did, 12 integer::LimitedU32, 13 string::{Nsid, Tid}, 14}; 15use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 16use serde::{Deserialize, Serialize}; 17use serde_json::json; 18use sqlx::Row; 19use std::str::FromStr; 20use std::sync::Arc; 21use tracing::error; 22 23#[derive(Deserialize)] 24#[allow(dead_code)] 25pub struct CreateRecordInput { 26 pub repo: String, 27 pub collection: String, 28 pub rkey: Option<String>, 29 pub validate: Option<bool>, 30 pub record: serde_json::Value, 31 #[serde(rename = "swapCommit")] 32 pub swap_commit: Option<String>, 33} 34 35#[derive(Serialize)] 36#[serde(rename_all = "camelCase")] 37pub struct CreateRecordOutput { 38 pub uri: String, 39 pub cid: String, 40} 41 42pub async fn create_record( 43 State(state): State<AppState>, 44 headers: axum::http::HeaderMap, 45 Json(input): Json<CreateRecordInput>, 46) -> Response { 47 let auth_header = headers.get("Authorization"); 48 if auth_header.is_none() { 49 return ( 50 StatusCode::UNAUTHORIZED, 51 Json(json!({"error": "AuthenticationRequired"})), 52 ) 53 .into_response(); 54 } 55 let token = auth_header 56 .unwrap() 57 .to_str() 58 .unwrap_or("") 59 .replace("Bearer ", ""); 60 61 let session = sqlx::query( 62 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1" 63 ) 64 .bind(&token) 65 .fetch_optional(&state.db) 66 .await 67 .unwrap_or(None); 68 69 let (did, key_bytes) = match session { 70 Some(row) => ( 71 row.get::<String, _>("did"), 72 row.get::<Vec<u8>, _>("key_bytes"), 73 ), 74 None => { 75 return ( 76 StatusCode::UNAUTHORIZED, 77 Json(json!({"error": "AuthenticationFailed"})), 78 ) 79 .into_response(); 80 } 81 }; 82 83 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) { 84 return ( 85 StatusCode::UNAUTHORIZED, 86 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})), 87 ) 88 .into_response(); 89 } 90 91 if input.repo != did { 92 return (StatusCode::FORBIDDEN, Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"}))).into_response(); 93 } 94 95 let user_query = sqlx::query("SELECT id FROM users WHERE did = $1") 96 .bind(&did) 97 .fetch_optional(&state.db) 98 .await; 99 100 let user_id: uuid::Uuid = match user_query { 101 Ok(Some(row)) => row.get("id"), 102 _ => { 103 return ( 104 StatusCode::INTERNAL_SERVER_ERROR, 105 Json(json!({"error": "InternalError", "message": "User not found"})), 106 ) 107 .into_response(); 108 } 109 }; 110 111 let repo_root_query = sqlx::query("SELECT repo_root_cid FROM repos WHERE user_id = $1") 112 .bind(user_id) 113 .fetch_optional(&state.db) 114 .await; 115 116 let current_root_cid = match repo_root_query { 117 Ok(Some(row)) => { 118 let cid_str: String = row.get("repo_root_cid"); 119 Cid::from_str(&cid_str).ok() 120 } 121 _ => None, 122 }; 123 124 if current_root_cid.is_none() { 125 error!("Repo root not found for user {}", did); 126 return ( 127 StatusCode::INTERNAL_SERVER_ERROR, 128 Json(json!({"error": "InternalError", "message": "Repo root not found"})), 129 ) 130 .into_response(); 131 } 132 let current_root_cid = current_root_cid.unwrap(); 133 134 let commit_bytes = match state.block_store.get(&current_root_cid).await { 135 Ok(Some(b)) => b, 136 Ok(None) => { 137 error!("Commit block not found: {}", current_root_cid); 138 return ( 139 StatusCode::INTERNAL_SERVER_ERROR, 140 Json(json!({"error": "InternalError"})), 141 ) 142 .into_response(); 143 } 144 Err(e) => { 145 error!("Failed to load commit block: {:?}", e); 146 return ( 147 StatusCode::INTERNAL_SERVER_ERROR, 148 Json(json!({"error": "InternalError"})), 149 ) 150 .into_response(); 151 } 152 }; 153 154 let commit = match Commit::from_cbor(&commit_bytes) { 155 Ok(c) => c, 156 Err(e) => { 157 error!("Failed to parse commit: {:?}", e); 158 return ( 159 StatusCode::INTERNAL_SERVER_ERROR, 160 Json(json!({"error": "InternalError"})), 161 ) 162 .into_response(); 163 } 164 }; 165 166 let mst_root = commit.data; 167 let store = Arc::new(state.block_store.clone()); 168 let mst = Mst::load(store.clone(), mst_root, None); 169 170 let collection_nsid = match input.collection.parse::<Nsid>() { 171 Ok(n) => n, 172 Err(_) => { 173 return ( 174 StatusCode::BAD_REQUEST, 175 Json(json!({"error": "InvalidCollection"})), 176 ) 177 .into_response(); 178 } 179 }; 180 181 let rkey = input 182 .rkey 183 .unwrap_or_else(|| Utc::now().format("%Y%m%d%H%M%S%f").to_string()); 184 185 if input.validate.unwrap_or(true) { 186 if input.collection == "app.bsky.feed.post" { 187 if input.record.get("text").is_none() || input.record.get("createdAt").is_none() { 188 return ( 189 StatusCode::BAD_REQUEST, 190 Json(json!({"error": "InvalidRecord", "message": "Record validation failed"})), 191 ) 192 .into_response(); 193 } 194 } 195 } 196 197 let mut record_bytes = Vec::new(); 198 if let Err(e) = serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record) { 199 error!("Error serializing record: {:?}", e); 200 return ( 201 StatusCode::BAD_REQUEST, 202 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})), 203 ) 204 .into_response(); 205 } 206 207 let record_cid = match state.block_store.put(&record_bytes).await { 208 Ok(c) => c, 209 Err(e) => { 210 error!("Failed to save record block: {:?}", e); 211 return ( 212 StatusCode::INTERNAL_SERVER_ERROR, 213 Json(json!({"error": "InternalError"})), 214 ) 215 .into_response(); 216 } 217 }; 218 219 let key = format!("{}/{}", collection_nsid, rkey); 220 let new_mst = match mst.add(&key, record_cid).await { 221 Ok(m) => m, 222 Err(e) => { 223 error!("Failed to add to MST: {:?}", e); 224 return ( 225 StatusCode::INTERNAL_SERVER_ERROR, 226 Json(json!({"error": "InternalError"})), 227 ) 228 .into_response(); 229 } 230 }; 231 232 let new_mst_root = match new_mst.persist().await { 233 Ok(c) => c, 234 Err(e) => { 235 error!("Failed to persist MST: {:?}", e); 236 return ( 237 StatusCode::INTERNAL_SERVER_ERROR, 238 Json(json!({"error": "InternalError"})), 239 ) 240 .into_response(); 241 } 242 }; 243 244 let did_obj = match Did::new(&did) { 245 Ok(d) => d, 246 Err(_) => { 247 return ( 248 StatusCode::INTERNAL_SERVER_ERROR, 249 Json(json!({"error": "InternalError", "message": "Invalid DID"})), 250 ) 251 .into_response(); 252 } 253 }; 254 255 let rev = Tid::now(LimitedU32::MIN); 256 257 let new_commit = Commit::new_unsigned(did_obj, new_mst_root, rev, Some(current_root_cid)); 258 259 let new_commit_bytes = match new_commit.to_cbor() { 260 Ok(b) => b, 261 Err(e) => { 262 error!("Failed to serialize new commit: {:?}", e); 263 return ( 264 StatusCode::INTERNAL_SERVER_ERROR, 265 Json(json!({"error": "InternalError"})), 266 ) 267 .into_response(); 268 } 269 }; 270 271 let new_root_cid = match state.block_store.put(&new_commit_bytes).await { 272 Ok(c) => c, 273 Err(e) => { 274 error!("Failed to save new commit: {:?}", e); 275 return ( 276 StatusCode::INTERNAL_SERVER_ERROR, 277 Json(json!({"error": "InternalError"})), 278 ) 279 .into_response(); 280 } 281 }; 282 283 let update_repo = sqlx::query("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2") 284 .bind(new_root_cid.to_string()) 285 .bind(user_id) 286 .execute(&state.db) 287 .await; 288 289 if let Err(e) = update_repo { 290 error!("Failed to update repo root in DB: {:?}", e); 291 return ( 292 StatusCode::INTERNAL_SERVER_ERROR, 293 Json(json!({"error": "InternalError"})), 294 ) 295 .into_response(); 296 } 297 298 let record_insert = sqlx::query( 299 "INSERT INTO records (repo_id, collection, rkey, record_cid) VALUES ($1, $2, $3, $4) 300 ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4, created_at = NOW()", 301 ) 302 .bind(user_id) 303 .bind(&input.collection) 304 .bind(&rkey) 305 .bind(record_cid.to_string()) 306 .execute(&state.db) 307 .await; 308 309 if let Err(e) = record_insert { 310 error!("Error inserting record index: {:?}", e); 311 return ( 312 StatusCode::INTERNAL_SERVER_ERROR, 313 Json(json!({"error": "InternalError", "message": "Failed to index record"})), 314 ) 315 .into_response(); 316 } 317 318 let output = CreateRecordOutput { 319 uri: format!("at://{}/{}/{}", input.repo, input.collection, rkey), 320 cid: record_cid.to_string(), 321 }; 322 (StatusCode::OK, Json(output)).into_response() 323} 324 325#[derive(Deserialize)] 326#[allow(dead_code)] 327pub struct PutRecordInput { 328 pub repo: String, 329 pub collection: String, 330 pub rkey: String, 331 pub validate: Option<bool>, 332 pub record: serde_json::Value, 333 #[serde(rename = "swapCommit")] 334 pub swap_commit: Option<String>, 335 #[serde(rename = "swapRecord")] 336 pub swap_record: Option<String>, 337} 338 339#[derive(Serialize)] 340#[serde(rename_all = "camelCase")] 341pub struct PutRecordOutput { 342 pub uri: String, 343 pub cid: String, 344} 345 346pub async fn put_record( 347 State(state): State<AppState>, 348 headers: axum::http::HeaderMap, 349 Json(input): Json<PutRecordInput>, 350) -> Response { 351 let auth_header = headers.get("Authorization"); 352 if auth_header.is_none() { 353 return ( 354 StatusCode::UNAUTHORIZED, 355 Json(json!({"error": "AuthenticationRequired"})), 356 ) 357 .into_response(); 358 } 359 let token = auth_header 360 .unwrap() 361 .to_str() 362 .unwrap_or("") 363 .replace("Bearer ", ""); 364 365 let session = sqlx::query( 366 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1" 367 ) 368 .bind(&token) 369 .fetch_optional(&state.db) 370 .await 371 .unwrap_or(None); 372 373 let (did, key_bytes) = match session { 374 Some(row) => ( 375 row.get::<String, _>("did"), 376 row.get::<Vec<u8>, _>("key_bytes"), 377 ), 378 None => { 379 return ( 380 StatusCode::UNAUTHORIZED, 381 Json(json!({"error": "AuthenticationFailed"})), 382 ) 383 .into_response(); 384 } 385 }; 386 387 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) { 388 return ( 389 StatusCode::UNAUTHORIZED, 390 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})), 391 ) 392 .into_response(); 393 } 394 395 if input.repo != did { 396 return (StatusCode::FORBIDDEN, Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"}))).into_response(); 397 } 398 399 let user_query = sqlx::query("SELECT id FROM users WHERE did = $1") 400 .bind(&did) 401 .fetch_optional(&state.db) 402 .await; 403 404 let user_id: uuid::Uuid = match user_query { 405 Ok(Some(row)) => row.get("id"), 406 _ => { 407 return ( 408 StatusCode::INTERNAL_SERVER_ERROR, 409 Json(json!({"error": "InternalError", "message": "User not found"})), 410 ) 411 .into_response(); 412 } 413 }; 414 415 let repo_root_query = sqlx::query("SELECT repo_root_cid FROM repos WHERE user_id = $1") 416 .bind(user_id) 417 .fetch_optional(&state.db) 418 .await; 419 420 let current_root_cid = match repo_root_query { 421 Ok(Some(row)) => { 422 let cid_str: String = row.get("repo_root_cid"); 423 Cid::from_str(&cid_str).ok() 424 } 425 _ => None, 426 }; 427 428 if current_root_cid.is_none() { 429 error!("Repo root not found for user {}", did); 430 return ( 431 StatusCode::INTERNAL_SERVER_ERROR, 432 Json(json!({"error": "InternalError", "message": "Repo root not found"})), 433 ) 434 .into_response(); 435 } 436 let current_root_cid = current_root_cid.unwrap(); 437 438 let commit_bytes = match state.block_store.get(&current_root_cid).await { 439 Ok(Some(b)) => b, 440 Ok(None) => { 441 error!("Commit block not found: {}", current_root_cid); 442 return ( 443 StatusCode::INTERNAL_SERVER_ERROR, 444 Json(json!({"error": "InternalError", "message": "Commit block not found"})), 445 ) 446 .into_response(); 447 } 448 Err(e) => { 449 error!("Failed to load commit block: {:?}", e); 450 return ( 451 StatusCode::INTERNAL_SERVER_ERROR, 452 Json(json!({"error": "InternalError", "message": "Failed to load commit block"})), 453 ) 454 .into_response(); 455 } 456 }; 457 458 let commit = match Commit::from_cbor(&commit_bytes) { 459 Ok(c) => c, 460 Err(e) => { 461 error!("Failed to parse commit: {:?}", e); 462 return ( 463 StatusCode::INTERNAL_SERVER_ERROR, 464 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})), 465 ) 466 .into_response(); 467 } 468 }; 469 470 let mst_root = commit.data; 471 let store = Arc::new(state.block_store.clone()); 472 let mst = Mst::load(store.clone(), mst_root, None); 473 474 let collection_nsid = match input.collection.parse::<Nsid>() { 475 Ok(n) => n, 476 Err(_) => { 477 return ( 478 StatusCode::BAD_REQUEST, 479 Json(json!({"error": "InvalidCollection"})), 480 ) 481 .into_response(); 482 } 483 }; 484 485 let rkey = input.rkey.clone(); 486 487 if input.validate.unwrap_or(true) { 488 if input.collection == "app.bsky.feed.post" { 489 if input.record.get("text").is_none() || input.record.get("createdAt").is_none() { 490 return ( 491 StatusCode::BAD_REQUEST, 492 Json(json!({"error": "InvalidRecord", "message": "Record validation failed"})), 493 ) 494 .into_response(); 495 } 496 } 497 } 498 499 let mut record_bytes = Vec::new(); 500 if let Err(e) = serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record) { 501 error!("Error serializing record: {:?}", e); 502 return ( 503 StatusCode::BAD_REQUEST, 504 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})), 505 ) 506 .into_response(); 507 } 508 509 let record_cid = match state.block_store.put(&record_bytes).await { 510 Ok(c) => c, 511 Err(e) => { 512 error!("Failed to save record block: {:?}", e); 513 return ( 514 StatusCode::INTERNAL_SERVER_ERROR, 515 Json(json!({"error": "InternalError", "message": "Failed to save record block"})), 516 ) 517 .into_response(); 518 } 519 }; 520 521 let key = format!("{}/{}", collection_nsid, rkey); 522 523 let existing = match mst.get(&key).await { 524 Ok(v) => v, 525 Err(e) => { 526 error!("Failed to check MST key: {:?}", e); 527 return ( 528 StatusCode::INTERNAL_SERVER_ERROR, 529 Json( 530 json!({"error": "InternalError", "message": "Failed to check existing record"}), 531 ), 532 ) 533 .into_response(); 534 } 535 }; 536 537 if let Some(swap_record_str) = &input.swap_record { 538 let swap_record_cid = match Cid::from_str(swap_record_str) { 539 Ok(c) => c, 540 Err(_) => { 541 return ( 542 StatusCode::BAD_REQUEST, 543 Json( 544 json!({"error": "InvalidSwapRecord", "message": "Invalid swapRecord CID"}), 545 ), 546 ) 547 .into_response(); 548 } 549 }; 550 match &existing { 551 Some(current_cid) if *current_cid != swap_record_cid => { 552 return ( 553 StatusCode::CONFLICT, 554 Json(json!({"error": "InvalidSwap", "message": "Record has been modified"})), 555 ) 556 .into_response(); 557 } 558 None => { 559 return ( 560 StatusCode::CONFLICT, 561 Json(json!({"error": "InvalidSwap", "message": "Record does not exist"})), 562 ) 563 .into_response(); 564 } 565 _ => {} 566 } 567 } 568 569 let new_mst = if existing.is_some() { 570 match mst.update(&key, record_cid).await { 571 Ok(m) => m, 572 Err(e) => { 573 error!("Failed to update MST: {:?}", e); 574 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": format!("Failed to update MST: {:?}", e)}))).into_response(); 575 } 576 } 577 } else { 578 match mst.add(&key, record_cid).await { 579 Ok(m) => m, 580 Err(e) => { 581 error!("Failed to add to MST: {:?}", e); 582 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": format!("Failed to add to MST: {:?}", e)}))).into_response(); 583 } 584 } 585 }; 586 587 let new_mst_root = match new_mst.persist().await { 588 Ok(c) => c, 589 Err(e) => { 590 error!("Failed to persist MST: {:?}", e); 591 return ( 592 StatusCode::INTERNAL_SERVER_ERROR, 593 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})), 594 ) 595 .into_response(); 596 } 597 }; 598 599 let did_obj = match Did::new(&did) { 600 Ok(d) => d, 601 Err(_) => { 602 return ( 603 StatusCode::INTERNAL_SERVER_ERROR, 604 Json(json!({"error": "InternalError", "message": "Invalid DID"})), 605 ) 606 .into_response(); 607 } 608 }; 609 610 let rev = Tid::now(LimitedU32::MIN); 611 612 let new_commit = Commit::new_unsigned(did_obj, new_mst_root, rev, Some(current_root_cid)); 613 614 let new_commit_bytes = match new_commit.to_cbor() { 615 Ok(b) => b, 616 Err(e) => { 617 error!("Failed to serialize new commit: {:?}", e); 618 return ( 619 StatusCode::INTERNAL_SERVER_ERROR, 620 Json( 621 json!({"error": "InternalError", "message": "Failed to serialize new commit"}), 622 ), 623 ) 624 .into_response(); 625 } 626 }; 627 628 let new_root_cid = match state.block_store.put(&new_commit_bytes).await { 629 Ok(c) => c, 630 Err(e) => { 631 error!("Failed to save new commit: {:?}", e); 632 return ( 633 StatusCode::INTERNAL_SERVER_ERROR, 634 Json(json!({"error": "InternalError", "message": "Failed to save new commit"})), 635 ) 636 .into_response(); 637 } 638 }; 639 640 let update_repo = sqlx::query("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2") 641 .bind(new_root_cid.to_string()) 642 .bind(user_id) 643 .execute(&state.db) 644 .await; 645 646 if let Err(e) = update_repo { 647 error!("Failed to update repo root in DB: {:?}", e); 648 return ( 649 StatusCode::INTERNAL_SERVER_ERROR, 650 Json(json!({"error": "InternalError", "message": "Failed to update repo root in DB"})), 651 ) 652 .into_response(); 653 } 654 655 let record_insert = sqlx::query( 656 "INSERT INTO records (repo_id, collection, rkey, record_cid) VALUES ($1, $2, $3, $4) 657 ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4, created_at = NOW()", 658 ) 659 .bind(user_id) 660 .bind(&input.collection) 661 .bind(&rkey) 662 .bind(record_cid.to_string()) 663 .execute(&state.db) 664 .await; 665 666 if let Err(e) = record_insert { 667 error!("Error inserting record index: {:?}", e); 668 return ( 669 StatusCode::INTERNAL_SERVER_ERROR, 670 Json(json!({"error": "InternalError", "message": "Failed to index record"})), 671 ) 672 .into_response(); 673 } 674 675 let output = PutRecordOutput { 676 uri: format!("at://{}/{}/{}", input.repo, input.collection, rkey), 677 cid: record_cid.to_string(), 678 }; 679 (StatusCode::OK, Json(output)).into_response() 680}