// this repo has no description
1use crate::state::AppState; 2use axum::{ 3 Json, 4 extract::State, 5 http::StatusCode, 6 response::{IntoResponse, Response}, 7}; 8use chrono::Utc; 9use cid::Cid; 10use jacquard::types::{ 11 did::Did, 12 integer::LimitedU32, 13 string::{Nsid, Tid}, 14}; 15use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 16use serde::{Deserialize, Serialize}; 17use serde_json::json; 18use sqlx::Row; 19use std::str::FromStr; 20use std::sync::Arc; 21use tracing::error; 22 23#[derive(Deserialize)] 24#[allow(dead_code)] 25pub struct CreateRecordInput { 26 pub repo: String, 27 pub collection: String, 28 pub rkey: Option<String>, 29 pub validate: Option<bool>, 30 pub record: serde_json::Value, 31 #[serde(rename = "swapCommit")] 32 pub swap_commit: Option<String>, 33} 34 35#[derive(Serialize)] 36#[serde(rename_all = "camelCase")] 37pub struct CreateRecordOutput { 38 pub uri: String, 39 pub cid: String, 40} 41 42pub async fn create_record( 43 State(state): State<AppState>, 44 headers: axum::http::HeaderMap, 45 Json(input): Json<CreateRecordInput>, 46) -> Response { 47 let auth_header = headers.get("Authorization"); 48 if auth_header.is_none() { 49 return ( 50 StatusCode::UNAUTHORIZED, 51 Json(json!({"error": "AuthenticationRequired"})), 52 ) 53 .into_response(); 54 } 55 let token = auth_header 56 .unwrap() 57 .to_str() 58 .unwrap_or("") 59 .replace("Bearer ", ""); 60 61 let session = sqlx::query( 62 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1" 63 ) 64 .bind(&token) 65 .fetch_optional(&state.db) 66 .await 67 .unwrap_or(None); 68 69 let (did, key_bytes) = match session { 70 Some(row) => ( 71 row.get::<String, _>("did"), 72 row.get::<Vec<u8>, _>("key_bytes"), 73 ), 74 None => { 75 return ( 76 StatusCode::UNAUTHORIZED, 77 Json(json!({"error": "AuthenticationFailed"})), 78 ) 79 .into_response(); 80 } 81 }; 82 83 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) { 84 return ( 
85 StatusCode::UNAUTHORIZED, 86 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})), 87 ) 88 .into_response(); 89 } 90 91 if input.repo != did { 92 return (StatusCode::FORBIDDEN, Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"}))).into_response(); 93 } 94 95 let user_query = sqlx::query("SELECT id FROM users WHERE did = $1") 96 .bind(&did) 97 .fetch_optional(&state.db) 98 .await; 99 100 let user_id: uuid::Uuid = match user_query { 101 Ok(Some(row)) => row.get("id"), 102 _ => { 103 return ( 104 StatusCode::INTERNAL_SERVER_ERROR, 105 Json(json!({"error": "InternalError", "message": "User not found"})), 106 ) 107 .into_response(); 108 } 109 }; 110 111 let repo_root_query = sqlx::query("SELECT repo_root_cid FROM repos WHERE user_id = $1") 112 .bind(user_id) 113 .fetch_optional(&state.db) 114 .await; 115 116 let current_root_cid = match repo_root_query { 117 Ok(Some(row)) => { 118 let cid_str: String = row.get("repo_root_cid"); 119 Cid::from_str(&cid_str).ok() 120 } 121 _ => None, 122 }; 123 124 if current_root_cid.is_none() { 125 error!("Repo root not found for user {}", did); 126 return ( 127 StatusCode::INTERNAL_SERVER_ERROR, 128 Json(json!({"error": "InternalError", "message": "Repo root not found"})), 129 ) 130 .into_response(); 131 } 132 let current_root_cid = current_root_cid.unwrap(); 133 134 let commit_bytes = match state.block_store.get(&current_root_cid).await { 135 Ok(Some(b)) => b, 136 Ok(None) => { 137 error!("Commit block not found: {}", current_root_cid); 138 return ( 139 StatusCode::INTERNAL_SERVER_ERROR, 140 Json(json!({"error": "InternalError"})), 141 ) 142 .into_response(); 143 } 144 Err(e) => { 145 error!("Failed to load commit block: {:?}", e); 146 return ( 147 StatusCode::INTERNAL_SERVER_ERROR, 148 Json(json!({"error": "InternalError"})), 149 ) 150 .into_response(); 151 } 152 }; 153 154 let commit = match Commit::from_cbor(&commit_bytes) { 155 Ok(c) => c, 156 Err(e) => { 157 
error!("Failed to parse commit: {:?}", e); 158 return ( 159 StatusCode::INTERNAL_SERVER_ERROR, 160 Json(json!({"error": "InternalError"})), 161 ) 162 .into_response(); 163 } 164 }; 165 166 let mst_root = commit.data; 167 let store = Arc::new(state.block_store.clone()); 168 let mst = Mst::load(store.clone(), mst_root, None); 169 170 let collection_nsid = match input.collection.parse::<Nsid>() { 171 Ok(n) => n, 172 Err(_) => { 173 return ( 174 StatusCode::BAD_REQUEST, 175 Json(json!({"error": "InvalidCollection"})), 176 ) 177 .into_response(); 178 } 179 }; 180 181 let rkey = input 182 .rkey 183 .unwrap_or_else(|| Utc::now().format("%Y%m%d%H%M%S%f").to_string()); 184 185 let mut record_bytes = Vec::new(); 186 if let Err(e) = serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record) { 187 error!("Error serializing record: {:?}", e); 188 return ( 189 StatusCode::BAD_REQUEST, 190 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})), 191 ) 192 .into_response(); 193 } 194 195 let record_cid = match state.block_store.put(&record_bytes).await { 196 Ok(c) => c, 197 Err(e) => { 198 error!("Failed to save record block: {:?}", e); 199 return ( 200 StatusCode::INTERNAL_SERVER_ERROR, 201 Json(json!({"error": "InternalError"})), 202 ) 203 .into_response(); 204 } 205 }; 206 207 let key = format!("{}/{}", collection_nsid, rkey); 208 let new_mst = match mst.add(&key, record_cid).await { 209 Ok(m) => m, 210 Err(e) => { 211 error!("Failed to add to MST: {:?}", e); 212 return ( 213 StatusCode::INTERNAL_SERVER_ERROR, 214 Json(json!({"error": "InternalError"})), 215 ) 216 .into_response(); 217 } 218 }; 219 220 let new_mst_root = match new_mst.persist().await { 221 Ok(c) => c, 222 Err(e) => { 223 error!("Failed to persist MST: {:?}", e); 224 return ( 225 StatusCode::INTERNAL_SERVER_ERROR, 226 Json(json!({"error": "InternalError"})), 227 ) 228 .into_response(); 229 } 230 }; 231 232 let did_obj = match Did::new(&did) { 233 Ok(d) => d, 234 Err(_) => { 235 
return ( 236 StatusCode::INTERNAL_SERVER_ERROR, 237 Json(json!({"error": "InternalError", "message": "Invalid DID"})), 238 ) 239 .into_response(); 240 } 241 }; 242 243 let rev = Tid::now(LimitedU32::MIN); 244 245 let new_commit = Commit::new_unsigned(did_obj, new_mst_root, rev, Some(current_root_cid)); 246 247 let new_commit_bytes = match new_commit.to_cbor() { 248 Ok(b) => b, 249 Err(e) => { 250 error!("Failed to serialize new commit: {:?}", e); 251 return ( 252 StatusCode::INTERNAL_SERVER_ERROR, 253 Json(json!({"error": "InternalError"})), 254 ) 255 .into_response(); 256 } 257 }; 258 259 let new_root_cid = match state.block_store.put(&new_commit_bytes).await { 260 Ok(c) => c, 261 Err(e) => { 262 error!("Failed to save new commit: {:?}", e); 263 return ( 264 StatusCode::INTERNAL_SERVER_ERROR, 265 Json(json!({"error": "InternalError"})), 266 ) 267 .into_response(); 268 } 269 }; 270 271 let update_repo = sqlx::query("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2") 272 .bind(new_root_cid.to_string()) 273 .bind(user_id) 274 .execute(&state.db) 275 .await; 276 277 if let Err(e) = update_repo { 278 error!("Failed to update repo root in DB: {:?}", e); 279 return ( 280 StatusCode::INTERNAL_SERVER_ERROR, 281 Json(json!({"error": "InternalError"})), 282 ) 283 .into_response(); 284 } 285 286 let record_insert = sqlx::query( 287 "INSERT INTO records (repo_id, collection, rkey, record_cid) VALUES ($1, $2, $3, $4) 288 ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4, created_at = NOW()", 289 ) 290 .bind(user_id) 291 .bind(&input.collection) 292 .bind(&rkey) 293 .bind(record_cid.to_string()) 294 .execute(&state.db) 295 .await; 296 297 if let Err(e) = record_insert { 298 error!("Error inserting record index: {:?}", e); 299 return ( 300 StatusCode::INTERNAL_SERVER_ERROR, 301 Json(json!({"error": "InternalError", "message": "Failed to index record"})), 302 ) 303 .into_response(); 304 } 305 306 let output = CreateRecordOutput { 307 uri: 
format!("at://{}/{}/{}", input.repo, input.collection, rkey), 308 cid: record_cid.to_string(), 309 }; 310 (StatusCode::OK, Json(output)).into_response() 311} 312 313#[derive(Deserialize)] 314#[allow(dead_code)] 315pub struct PutRecordInput { 316 pub repo: String, 317 pub collection: String, 318 pub rkey: String, 319 pub validate: Option<bool>, 320 pub record: serde_json::Value, 321 #[serde(rename = "swapCommit")] 322 pub swap_commit: Option<String>, 323 #[serde(rename = "swapRecord")] 324 pub swap_record: Option<String>, 325} 326 327#[derive(Serialize)] 328#[serde(rename_all = "camelCase")] 329pub struct PutRecordOutput { 330 pub uri: String, 331 pub cid: String, 332} 333 334pub async fn put_record( 335 State(state): State<AppState>, 336 headers: axum::http::HeaderMap, 337 Json(input): Json<PutRecordInput>, 338) -> Response { 339 let auth_header = headers.get("Authorization"); 340 if auth_header.is_none() { 341 return ( 342 StatusCode::UNAUTHORIZED, 343 Json(json!({"error": "AuthenticationRequired"})), 344 ) 345 .into_response(); 346 } 347 let token = auth_header 348 .unwrap() 349 .to_str() 350 .unwrap_or("") 351 .replace("Bearer ", ""); 352 353 let session = sqlx::query( 354 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1" 355 ) 356 .bind(&token) 357 .fetch_optional(&state.db) 358 .await 359 .unwrap_or(None); 360 361 let (did, key_bytes) = match session { 362 Some(row) => ( 363 row.get::<String, _>("did"), 364 row.get::<Vec<u8>, _>("key_bytes"), 365 ), 366 None => { 367 return ( 368 StatusCode::UNAUTHORIZED, 369 Json(json!({"error": "AuthenticationFailed"})), 370 ) 371 .into_response(); 372 } 373 }; 374 375 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) { 376 return ( 377 StatusCode::UNAUTHORIZED, 378 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})), 379 ) 380 .into_response(); 381 } 382 383 if input.repo != did { 384 return 
(StatusCode::FORBIDDEN, Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"}))).into_response(); 385 } 386 387 let user_query = sqlx::query("SELECT id FROM users WHERE did = $1") 388 .bind(&did) 389 .fetch_optional(&state.db) 390 .await; 391 392 let user_id: uuid::Uuid = match user_query { 393 Ok(Some(row)) => row.get("id"), 394 _ => { 395 return ( 396 StatusCode::INTERNAL_SERVER_ERROR, 397 Json(json!({"error": "InternalError", "message": "User not found"})), 398 ) 399 .into_response(); 400 } 401 }; 402 403 let repo_root_query = sqlx::query("SELECT repo_root_cid FROM repos WHERE user_id = $1") 404 .bind(user_id) 405 .fetch_optional(&state.db) 406 .await; 407 408 let current_root_cid = match repo_root_query { 409 Ok(Some(row)) => { 410 let cid_str: String = row.get("repo_root_cid"); 411 Cid::from_str(&cid_str).ok() 412 } 413 _ => None, 414 }; 415 416 if current_root_cid.is_none() { 417 error!("Repo root not found for user {}", did); 418 return ( 419 StatusCode::INTERNAL_SERVER_ERROR, 420 Json(json!({"error": "InternalError", "message": "Repo root not found"})), 421 ) 422 .into_response(); 423 } 424 let current_root_cid = current_root_cid.unwrap(); 425 426 let commit_bytes = match state.block_store.get(&current_root_cid).await { 427 Ok(Some(b)) => b, 428 Ok(None) => { 429 error!("Commit block not found: {}", current_root_cid); 430 return ( 431 StatusCode::INTERNAL_SERVER_ERROR, 432 Json(json!({"error": "InternalError", "message": "Commit block not found"})), 433 ) 434 .into_response(); 435 } 436 Err(e) => { 437 error!("Failed to load commit block: {:?}", e); 438 return ( 439 StatusCode::INTERNAL_SERVER_ERROR, 440 Json(json!({"error": "InternalError", "message": "Failed to load commit block"})), 441 ) 442 .into_response(); 443 } 444 }; 445 446 let commit = match Commit::from_cbor(&commit_bytes) { 447 Ok(c) => c, 448 Err(e) => { 449 error!("Failed to parse commit: {:?}", e); 450 return ( 451 StatusCode::INTERNAL_SERVER_ERROR, 452 
Json(json!({"error": "InternalError", "message": "Failed to parse commit"})), 453 ) 454 .into_response(); 455 } 456 }; 457 458 let mst_root = commit.data; 459 let store = Arc::new(state.block_store.clone()); 460 let mst = Mst::load(store.clone(), mst_root, None); 461 462 let collection_nsid = match input.collection.parse::<Nsid>() { 463 Ok(n) => n, 464 Err(_) => { 465 return ( 466 StatusCode::BAD_REQUEST, 467 Json(json!({"error": "InvalidCollection"})), 468 ) 469 .into_response(); 470 } 471 }; 472 473 let rkey = input.rkey.clone(); 474 475 let mut record_bytes = Vec::new(); 476 if let Err(e) = serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record) { 477 error!("Error serializing record: {:?}", e); 478 return ( 479 StatusCode::BAD_REQUEST, 480 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})), 481 ) 482 .into_response(); 483 } 484 485 let record_cid = match state.block_store.put(&record_bytes).await { 486 Ok(c) => c, 487 Err(e) => { 488 error!("Failed to save record block: {:?}", e); 489 return ( 490 StatusCode::INTERNAL_SERVER_ERROR, 491 Json(json!({"error": "InternalError", "message": "Failed to save record block"})), 492 ) 493 .into_response(); 494 } 495 }; 496 497 let key = format!("{}/{}", collection_nsid, rkey); 498 499 let existing = match mst.get(&key).await { 500 Ok(v) => v, 501 Err(e) => { 502 error!("Failed to check MST key: {:?}", e); 503 return ( 504 StatusCode::INTERNAL_SERVER_ERROR, 505 Json( 506 json!({"error": "InternalError", "message": "Failed to check existing record"}), 507 ), 508 ) 509 .into_response(); 510 } 511 }; 512 513 if let Some(swap_record_str) = &input.swap_record { 514 let swap_record_cid = match Cid::from_str(swap_record_str) { 515 Ok(c) => c, 516 Err(_) => { 517 return ( 518 StatusCode::BAD_REQUEST, 519 Json( 520 json!({"error": "InvalidSwapRecord", "message": "Invalid swapRecord CID"}), 521 ), 522 ) 523 .into_response(); 524 } 525 }; 526 match &existing { 527 Some(current_cid) if *current_cid 
!= swap_record_cid => { 528 return ( 529 StatusCode::CONFLICT, 530 Json(json!({"error": "InvalidSwap", "message": "Record has been modified"})), 531 ) 532 .into_response(); 533 } 534 None => { 535 return ( 536 StatusCode::CONFLICT, 537 Json(json!({"error": "InvalidSwap", "message": "Record does not exist"})), 538 ) 539 .into_response(); 540 } 541 _ => {} 542 } 543 } 544 545 let new_mst = if existing.is_some() { 546 match mst.update(&key, record_cid).await { 547 Ok(m) => m, 548 Err(e) => { 549 error!("Failed to update MST: {:?}", e); 550 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": format!("Failed to update MST: {:?}", e)}))).into_response(); 551 } 552 } 553 } else { 554 match mst.add(&key, record_cid).await { 555 Ok(m) => m, 556 Err(e) => { 557 error!("Failed to add to MST: {:?}", e); 558 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": format!("Failed to add to MST: {:?}", e)}))).into_response(); 559 } 560 } 561 }; 562 563 let new_mst_root = match new_mst.persist().await { 564 Ok(c) => c, 565 Err(e) => { 566 error!("Failed to persist MST: {:?}", e); 567 return ( 568 StatusCode::INTERNAL_SERVER_ERROR, 569 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})), 570 ) 571 .into_response(); 572 } 573 }; 574 575 let did_obj = match Did::new(&did) { 576 Ok(d) => d, 577 Err(_) => { 578 return ( 579 StatusCode::INTERNAL_SERVER_ERROR, 580 Json(json!({"error": "InternalError", "message": "Invalid DID"})), 581 ) 582 .into_response(); 583 } 584 }; 585 586 let rev = Tid::now(LimitedU32::MIN); 587 588 let new_commit = Commit::new_unsigned(did_obj, new_mst_root, rev, Some(current_root_cid)); 589 590 let new_commit_bytes = match new_commit.to_cbor() { 591 Ok(b) => b, 592 Err(e) => { 593 error!("Failed to serialize new commit: {:?}", e); 594 return ( 595 StatusCode::INTERNAL_SERVER_ERROR, 596 Json( 597 json!({"error": "InternalError", "message": "Failed to serialize new 
commit"}), 598 ), 599 ) 600 .into_response(); 601 } 602 }; 603 604 let new_root_cid = match state.block_store.put(&new_commit_bytes).await { 605 Ok(c) => c, 606 Err(e) => { 607 error!("Failed to save new commit: {:?}", e); 608 return ( 609 StatusCode::INTERNAL_SERVER_ERROR, 610 Json(json!({"error": "InternalError", "message": "Failed to save new commit"})), 611 ) 612 .into_response(); 613 } 614 }; 615 616 let update_repo = sqlx::query("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2") 617 .bind(new_root_cid.to_string()) 618 .bind(user_id) 619 .execute(&state.db) 620 .await; 621 622 if let Err(e) = update_repo { 623 error!("Failed to update repo root in DB: {:?}", e); 624 return ( 625 StatusCode::INTERNAL_SERVER_ERROR, 626 Json(json!({"error": "InternalError", "message": "Failed to update repo root in DB"})), 627 ) 628 .into_response(); 629 } 630 631 let record_insert = sqlx::query( 632 "INSERT INTO records (repo_id, collection, rkey, record_cid) VALUES ($1, $2, $3, $4) 633 ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4, created_at = NOW()", 634 ) 635 .bind(user_id) 636 .bind(&input.collection) 637 .bind(&rkey) 638 .bind(record_cid.to_string()) 639 .execute(&state.db) 640 .await; 641 642 if let Err(e) = record_insert { 643 error!("Error inserting record index: {:?}", e); 644 return ( 645 StatusCode::INTERNAL_SERVER_ERROR, 646 Json(json!({"error": "InternalError", "message": "Failed to index record"})), 647 ) 648 .into_response(); 649 } 650 651 let output = PutRecordOutput { 652 uri: format!("at://{}/{}/{}", input.repo, input.collection, rkey), 653 cid: record_cid.to_string(), 654 }; 655 (StatusCode::OK, Json(output)).into_response() 656}