this repo has no description
1use crate::state::AppState; 2use axum::{ 3 Json, 4 extract::State, 5 http::StatusCode, 6 response::{IntoResponse, Response}, 7}; 8use chrono::Utc; 9use cid::Cid; 10use jacquard::types::{ 11 did::Did, 12 integer::LimitedU32, 13 string::{Nsid, Tid}, 14}; 15use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 16use serde::{Deserialize, Serialize}; 17use serde_json::json; 18use sqlx::Row; 19use std::str::FromStr; 20use std::sync::Arc; 21use tracing::error; 22 23#[derive(Deserialize)] 24#[allow(dead_code)] 25pub struct CreateRecordInput { 26 pub repo: String, 27 pub collection: String, 28 pub rkey: Option<String>, 29 pub validate: Option<bool>, 30 pub record: serde_json::Value, 31 #[serde(rename = "swapCommit")] 32 pub swap_commit: Option<String>, 33} 34 35#[derive(Serialize)] 36#[serde(rename_all = "camelCase")] 37pub struct CreateRecordOutput { 38 pub uri: String, 39 pub cid: String, 40} 41 42pub async fn create_record( 43 State(state): State<AppState>, 44 headers: axum::http::HeaderMap, 45 Json(input): Json<CreateRecordInput>, 46) -> Response { 47 let auth_header = headers.get("Authorization"); 48 if auth_header.is_none() { 49 return ( 50 StatusCode::UNAUTHORIZED, 51 Json(json!({"error": "AuthenticationRequired"})), 52 ) 53 .into_response(); 54 } 55 let token = auth_header 56 .unwrap() 57 .to_str() 58 .unwrap_or("") 59 .replace("Bearer ", ""); 60 61 let session = sqlx::query( 62 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1" 63 ) 64 .bind(&token) 65 .fetch_optional(&state.db) 66 .await 67 .unwrap_or(None); 68 69 let (did, key_bytes) = match session { 70 Some(row) => ( 71 row.get::<String, _>("did"), 72 row.get::<Vec<u8>, _>("key_bytes"), 73 ), 74 None => { 75 return ( 76 StatusCode::UNAUTHORIZED, 77 Json(json!({"error": "AuthenticationFailed"})), 78 ) 79 .into_response(); 80 } 81 }; 82 83 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) { 84 return ( 85 StatusCode::UNAUTHORIZED, 86 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})), 87 ) 88 .into_response(); 89 } 90 91 if input.repo != did { 92 return (StatusCode::FORBIDDEN, Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"}))).into_response(); 93 } 94 95 let user_query = sqlx::query("SELECT id FROM users WHERE did = $1") 96 .bind(&did) 97 .fetch_optional(&state.db) 98 .await; 99 100 let user_id: uuid::Uuid = match user_query { 101 Ok(Some(row)) => row.get("id"), 102 _ => { 103 return ( 104 StatusCode::INTERNAL_SERVER_ERROR, 105 Json(json!({"error": "InternalError", "message": "User not found"})), 106 ) 107 .into_response(); 108 } 109 }; 110 111 let repo_root_query = sqlx::query("SELECT repo_root_cid FROM repos WHERE user_id = $1") 112 .bind(user_id) 113 .fetch_optional(&state.db) 114 .await; 115 116 let current_root_cid = match repo_root_query { 117 Ok(Some(row)) => { 118 let cid_str: String = row.get("repo_root_cid"); 119 Cid::from_str(&cid_str).ok() 120 } 121 _ => None, 122 }; 123 124 if current_root_cid.is_none() { 125 error!("Repo root not found for user {}", did); 126 return ( 127 StatusCode::INTERNAL_SERVER_ERROR, 128 Json(json!({"error": "InternalError", "message": "Repo root not found"})), 129 ) 130 .into_response(); 131 } 132 let current_root_cid = current_root_cid.unwrap(); 133 134 let commit_bytes = match state.block_store.get(&current_root_cid).await { 135 Ok(Some(b)) => b, 136 Ok(None) => { 137 error!("Commit block not found: {}", current_root_cid); 138 return ( 139 StatusCode::INTERNAL_SERVER_ERROR, 140 Json(json!({"error": "InternalError"})), 141 ) 142 .into_response(); 143 } 144 Err(e) => { 145 error!("Failed to load commit block: {:?}", e); 146 return ( 147 StatusCode::INTERNAL_SERVER_ERROR, 148 Json(json!({"error": "InternalError"})), 149 ) 150 .into_response(); 151 } 152 }; 153 154 let commit = match Commit::from_cbor(&commit_bytes) { 155 Ok(c) => c, 156 Err(e) => { 157 error!("Failed to parse commit: {:?}", e); 158 return ( 159 StatusCode::INTERNAL_SERVER_ERROR, 160 Json(json!({"error": "InternalError"})), 161 ) 162 .into_response(); 163 } 164 }; 165 166 let mst_root = commit.data; 167 let store = Arc::new(state.block_store.clone()); 168 let mst = Mst::load(store.clone(), mst_root, None); 169 170 let collection_nsid = match input.collection.parse::<Nsid>() { 171 Ok(n) => n, 172 Err(_) => { 173 return ( 174 StatusCode::BAD_REQUEST, 175 Json(json!({"error": "InvalidCollection"})), 176 ) 177 .into_response(); 178 } 179 }; 180 181 let rkey = input 182 .rkey 183 .unwrap_or_else(|| Utc::now().format("%Y%m%d%H%M%S%f").to_string()); 184 185 let mut record_bytes = Vec::new(); 186 if let Err(e) = serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record) { 187 error!("Error serializing record: {:?}", e); 188 return ( 189 StatusCode::BAD_REQUEST, 190 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})), 191 ) 192 .into_response(); 193 } 194 195 let record_cid = match state.block_store.put(&record_bytes).await { 196 Ok(c) => c, 197 Err(e) => { 198 error!("Failed to save record block: {:?}", e); 199 return ( 200 StatusCode::INTERNAL_SERVER_ERROR, 201 Json(json!({"error": "InternalError"})), 202 ) 203 .into_response(); 204 } 205 }; 206 207 let key = format!("{}/{}", collection_nsid, rkey); 208 if let Err(e) = mst.update(&key, record_cid).await { 209 error!("Failed to update MST: {:?}", e); 210 return ( 211 StatusCode::INTERNAL_SERVER_ERROR, 212 Json(json!({"error": "InternalError"})), 213 ) 214 .into_response(); 215 } 216 217 let new_mst_root = match mst.root().await { 218 Ok(c) => c, 219 Err(e) => { 220 error!("Failed to get new MST root: {:?}", e); 221 return ( 222 StatusCode::INTERNAL_SERVER_ERROR, 223 Json(json!({"error": "InternalError"})), 224 ) 225 .into_response(); 226 } 227 }; 228 229 let did_obj = match Did::new(&did) { 230 Ok(d) => d, 231 Err(_) => { 232 return ( 233 StatusCode::INTERNAL_SERVER_ERROR, 234 Json(json!({"error": "InternalError", "message": "Invalid DID"})), 235 ) 236 .into_response(); 237 } 238 }; 239 240 let rev = Tid::now(LimitedU32::MIN); 241 242 let new_commit = Commit::new_unsigned(did_obj, new_mst_root, rev, Some(current_root_cid)); 243 244 let new_commit_bytes = match new_commit.to_cbor() { 245 Ok(b) => b, 246 Err(e) => { 247 error!("Failed to serialize new commit: {:?}", e); 248 return ( 249 StatusCode::INTERNAL_SERVER_ERROR, 250 Json(json!({"error": "InternalError"})), 251 ) 252 .into_response(); 253 } 254 }; 255 256 let new_root_cid = match state.block_store.put(&new_commit_bytes).await { 257 Ok(c) => c, 258 Err(e) => { 259 error!("Failed to save new commit: {:?}", e); 260 return ( 261 StatusCode::INTERNAL_SERVER_ERROR, 262 Json(json!({"error": "InternalError"})), 263 ) 264 .into_response(); 265 } 266 }; 267 268 let update_repo = sqlx::query("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2") 269 .bind(new_root_cid.to_string()) 270 .bind(user_id) 271 .execute(&state.db) 272 .await; 273 274 if let Err(e) = update_repo { 275 error!("Failed to update repo root in DB: {:?}", e); 276 return ( 277 StatusCode::INTERNAL_SERVER_ERROR, 278 Json(json!({"error": "InternalError"})), 279 ) 280 .into_response(); 281 } 282 283 let record_insert = sqlx::query( 284 "INSERT INTO records (repo_id, collection, rkey, record_cid) VALUES ($1, $2, $3, $4) 285 ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4, created_at = NOW()", 286 ) 287 .bind(user_id) 288 .bind(&input.collection) 289 .bind(&rkey) 290 .bind(record_cid.to_string()) 291 .execute(&state.db) 292 .await; 293 294 if let Err(e) = record_insert { 295 error!("Error inserting record index: {:?}", e); 296 return ( 297 StatusCode::INTERNAL_SERVER_ERROR, 298 Json(json!({"error": "InternalError", "message": "Failed to index record"})), 299 ) 300 .into_response(); 301 } 302 303 let output = CreateRecordOutput { 304 uri: format!("at://{}/{}/{}", input.repo, input.collection, rkey), 305 cid: record_cid.to_string(), 306 }; 307 (StatusCode::OK, Json(output)).into_response() 308} 309 310#[derive(Deserialize)] 311#[allow(dead_code)] 312pub struct PutRecordInput { 313 pub repo: String, 314 pub collection: String, 315 pub rkey: String, 316 pub validate: Option<bool>, 317 pub record: serde_json::Value, 318 #[serde(rename = "swapCommit")] 319 pub swap_commit: Option<String>, 320} 321 322#[derive(Serialize)] 323#[serde(rename_all = "camelCase")] 324pub struct PutRecordOutput { 325 pub uri: String, 326 pub cid: String, 327} 328 329pub async fn put_record( 330 State(state): State<AppState>, 331 headers: axum::http::HeaderMap, 332 Json(input): Json<PutRecordInput>, 333) -> Response { 334 let auth_header = headers.get("Authorization"); 335 if auth_header.is_none() { 336 return ( 337 StatusCode::UNAUTHORIZED, 338 Json(json!({"error": "AuthenticationRequired"})), 339 ) 340 .into_response(); 341 } 342 let token = auth_header 343 .unwrap() 344 .to_str() 345 .unwrap_or("") 346 .replace("Bearer ", ""); 347 348 let session = sqlx::query( 349 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1" 350 ) 351 .bind(&token) 352 .fetch_optional(&state.db) 353 .await 354 .unwrap_or(None); 355 356 let (did, key_bytes) = match session { 357 Some(row) => ( 358 row.get::<String, _>("did"), 359 row.get::<Vec<u8>, _>("key_bytes"), 360 ), 361 None => { 362 return ( 363 StatusCode::UNAUTHORIZED, 364 Json(json!({"error": "AuthenticationFailed"})), 365 ) 366 .into_response(); 367 } 368 }; 369 370 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) { 371 return ( 372 StatusCode::UNAUTHORIZED, 373 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})), 374 ) 375 .into_response(); 376 } 377 378 if input.repo != did { 379 return (StatusCode::FORBIDDEN, Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"}))).into_response(); 380 } 381 382 let user_query = sqlx::query("SELECT id FROM users WHERE did = $1") 383 .bind(&did) 384 .fetch_optional(&state.db) 385 .await; 386 387 let user_id: uuid::Uuid = match user_query { 388 Ok(Some(row)) => row.get("id"), 389 _ => { 390 return ( 391 StatusCode::INTERNAL_SERVER_ERROR, 392 Json(json!({"error": "InternalError", "message": "User not found"})), 393 ) 394 .into_response(); 395 } 396 }; 397 398 let repo_root_query = sqlx::query("SELECT repo_root_cid FROM repos WHERE user_id = $1") 399 .bind(user_id) 400 .fetch_optional(&state.db) 401 .await; 402 403 let current_root_cid = match repo_root_query { 404 Ok(Some(row)) => { 405 let cid_str: String = row.get("repo_root_cid"); 406 Cid::from_str(&cid_str).ok() 407 } 408 _ => None, 409 }; 410 411 if current_root_cid.is_none() { 412 error!("Repo root not found for user {}", did); 413 return ( 414 StatusCode::INTERNAL_SERVER_ERROR, 415 Json(json!({"error": "InternalError", "message": "Repo root not found"})), 416 ) 417 .into_response(); 418 } 419 let current_root_cid = current_root_cid.unwrap(); 420 421 let commit_bytes = match state.block_store.get(&current_root_cid).await { 422 Ok(Some(b)) => b, 423 Ok(None) => { 424 error!("Commit block not found: {}", current_root_cid); 425 return ( 426 StatusCode::INTERNAL_SERVER_ERROR, 427 Json(json!({"error": "InternalError", "message": "Commit block not found"})), 428 ) 429 .into_response(); 430 } 431 Err(e) => { 432 error!("Failed to load commit block: {:?}", e); 433 return ( 434 StatusCode::INTERNAL_SERVER_ERROR, 435 Json(json!({"error": "InternalError", "message": "Failed to load commit block"})), 436 ) 437 .into_response(); 438 } 439 }; 440 441 let commit = match Commit::from_cbor(&commit_bytes) { 442 Ok(c) => c, 443 Err(e) => { 444 error!("Failed to parse commit: {:?}", e); 445 return ( 446 StatusCode::INTERNAL_SERVER_ERROR, 447 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})), 448 ) 449 .into_response(); 450 } 451 }; 452 453 let mst_root = commit.data; 454 let store = Arc::new(state.block_store.clone()); 455 let mst = Mst::load(store.clone(), mst_root, None); 456 457 let collection_nsid = match input.collection.parse::<Nsid>() { 458 Ok(n) => n, 459 Err(_) => { 460 return ( 461 StatusCode::BAD_REQUEST, 462 Json(json!({"error": "InvalidCollection"})), 463 ) 464 .into_response(); 465 } 466 }; 467 468 let rkey = input.rkey.clone(); 469 470 let mut record_bytes = Vec::new(); 471 if let Err(e) = serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record) { 472 error!("Error serializing record: {:?}", e); 473 return ( 474 StatusCode::BAD_REQUEST, 475 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})), 476 ) 477 .into_response(); 478 } 479 480 let record_cid = match state.block_store.put(&record_bytes).await { 481 Ok(c) => c, 482 Err(e) => { 483 error!("Failed to save record block: {:?}", e); 484 return ( 485 StatusCode::INTERNAL_SERVER_ERROR, 486 Json(json!({"error": "InternalError", "message": "Failed to save record block"})), 487 ) 488 .into_response(); 489 } 490 }; 491 492 let key = format!("{}/{}", collection_nsid, rkey); 493 if let Err(e) = mst.update(&key, record_cid).await { 494 error!("Failed to update MST: {:?}", e); 495 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": format!("Failed to update MST: {:?}", e)}))).into_response(); 496 } 497 498 let new_mst_root = match mst.root().await { 499 Ok(c) => c, 500 Err(e) => { 501 error!("Failed to get new MST root: {:?}", e); 502 return ( 503 StatusCode::INTERNAL_SERVER_ERROR, 504 Json(json!({"error": "InternalError", "message": "Failed to get new MST root"})), 505 ) 506 .into_response(); 507 } 508 }; 509 510 let did_obj = match Did::new(&did) { 511 Ok(d) => d, 512 Err(_) => { 513 return ( 514 StatusCode::INTERNAL_SERVER_ERROR, 515 Json(json!({"error": "InternalError", "message": "Invalid DID"})), 516 ) 517 .into_response(); 518 } 519 }; 520 521 let rev = Tid::now(LimitedU32::MIN); 522 523 let new_commit = Commit::new_unsigned(did_obj, new_mst_root, rev, Some(current_root_cid)); 524 525 let new_commit_bytes = match new_commit.to_cbor() { 526 Ok(b) => b, 527 Err(e) => { 528 error!("Failed to serialize new commit: {:?}", e); 529 return ( 530 StatusCode::INTERNAL_SERVER_ERROR, 531 Json( 532 json!({"error": "InternalError", "message": "Failed to serialize new commit"}), 533 ), 534 ) 535 .into_response(); 536 } 537 }; 538 539 let new_root_cid = match state.block_store.put(&new_commit_bytes).await { 540 Ok(c) => c, 541 Err(e) => { 542 error!("Failed to save new commit: {:?}", e); 543 return ( 544 StatusCode::INTERNAL_SERVER_ERROR, 545 Json(json!({"error": "InternalError", "message": "Failed to save new commit"})), 546 ) 547 .into_response(); 548 } 549 }; 550 551 let update_repo = sqlx::query("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2") 552 .bind(new_root_cid.to_string()) 553 .bind(user_id) 554 .execute(&state.db) 555 .await; 556 557 if let Err(e) = update_repo { 558 error!("Failed to update repo root in DB: {:?}", e); 559 return ( 560 StatusCode::INTERNAL_SERVER_ERROR, 561 Json(json!({"error": "InternalError", "message": "Failed to update repo root in DB"})), 562 ) 563 .into_response(); 564 } 565 566 let record_insert = sqlx::query( 567 "INSERT INTO records (repo_id, collection, rkey, record_cid) VALUES ($1, $2, $3, $4) 568 ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4, created_at = NOW()", 569 ) 570 .bind(user_id) 571 .bind(&input.collection) 572 .bind(&rkey) 573 .bind(record_cid.to_string()) 574 .execute(&state.db) 575 .await; 576 577 if let Err(e) = record_insert { 578 error!("Error inserting record index: {:?}", e); 579 return ( 580 StatusCode::INTERNAL_SERVER_ERROR, 581 Json(json!({"error": "InternalError", "message": "Failed to index record"})), 582 ) 583 .into_response(); 584 } 585 586 let output = PutRecordOutput { 587 uri: format!("at://{}/{}/{}", input.repo, input.collection, rkey), 588 cid: record_cid.to_string(), 589 }; 590 (StatusCode::OK, Json(output)).into_response() 591}