this repo has no description
1use axum::{ 2 extract::{State, Query}, 3 Json, 4 response::{IntoResponse, Response}, 5 http::StatusCode, 6}; 7use serde::{Deserialize, Serialize}; 8use serde_json::json; 9use crate::state::AppState; 10use chrono::Utc; 11use sqlx::Row; 12use cid::Cid; 13use std::str::FromStr; 14use jacquard_repo::{mst::Mst, commit::Commit, storage::BlockStore}; 15use jacquard::types::{string::{Nsid, Tid}, did::Did, integer::LimitedU32}; 16use tracing::error; 17use std::sync::Arc; 18use sha2::{Sha256, Digest}; 19use multihash::Multihash; 20use axum::body::Bytes; 21 22#[derive(Deserialize)] 23#[allow(dead_code)] 24pub struct CreateRecordInput { 25 pub repo: String, 26 pub collection: String, 27 pub rkey: Option<String>, 28 pub validate: Option<bool>, 29 pub record: serde_json::Value, 30 #[serde(rename = "swapCommit")] 31 pub swap_commit: Option<String>, 32} 33 34#[derive(Serialize)] 35#[serde(rename_all = "camelCase")] 36pub struct CreateRecordOutput { 37 pub uri: String, 38 pub cid: String, 39} 40 41pub async fn create_record( 42 State(state): State<AppState>, 43 headers: axum::http::HeaderMap, 44 Json(input): Json<CreateRecordInput>, 45) -> Response { 46 let auth_header = headers.get("Authorization"); 47 if auth_header.is_none() { 48 return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationRequired"}))).into_response(); 49 } 50 let token = auth_header.unwrap().to_str().unwrap_or("").replace("Bearer ", ""); 51 52 let session = sqlx::query( 53 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1" 54 ) 55 .bind(&token) 56 .fetch_optional(&state.db) 57 .await 58 .unwrap_or(None); 59 60 let (did, key_bytes) = match session { 61 Some(row) => (row.get::<String, _>("did"), row.get::<Vec<u8>, _>("key_bytes")), 62 None => return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationFailed"}))).into_response(), 63 }; 64 65 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) { 66 return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"}))).into_response(); 67 } 68 69 if input.repo != did { 70 return (StatusCode::FORBIDDEN, Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"}))).into_response(); 71 } 72 73 let user_query = sqlx::query("SELECT id FROM users WHERE did = $1") 74 .bind(&did) 75 .fetch_optional(&state.db) 76 .await; 77 78 let user_id: uuid::Uuid = match user_query { 79 Ok(Some(row)) => row.get("id"), 80 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "User not found"}))).into_response(), 81 }; 82 83 let repo_root_query = sqlx::query("SELECT repo_root_cid FROM repos WHERE user_id = $1") 84 .bind(user_id) 85 .fetch_optional(&state.db) 86 .await; 87 88 let current_root_cid = match repo_root_query { 89 Ok(Some(row)) => { 90 let cid_str: String = row.get("repo_root_cid"); 91 Cid::from_str(&cid_str).ok() 92 }, 93 _ => None, 94 }; 95 96 if current_root_cid.is_none() { 97 error!("Repo root not found for user {}", did); 98 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Repo root not found"}))).into_response(); 99 } 100 let current_root_cid = current_root_cid.unwrap(); 101 102 let commit_bytes = match state.block_store.get(&current_root_cid).await { 103 Ok(Some(b)) => b, 104 Ok(None) => { 105 error!("Commit block not found: {}", current_root_cid); 106 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response(); 107 }, 108 Err(e) => { 109 error!("Failed to load commit block: {:?}", e); 110 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response(); 111 } 112 }; 113 114 let commit = match Commit::from_cbor(&commit_bytes) { 115 Ok(c) => c, 116 Err(e) => { 117 error!("Failed to parse commit: {:?}", e); 118 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response(); 119 } 120 }; 121 122 let mst_root = commit.data; 123 let store = Arc::new(state.block_store.clone()); 124 let mst = Mst::load(store.clone(), mst_root, None); 125 126 let collection_nsid = match input.collection.parse::<Nsid>() { 127 Ok(n) => n, 128 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection"}))).into_response(), 129 }; 130 131 let rkey = input.rkey.unwrap_or_else(|| { 132 Utc::now().format("%Y%m%d%H%M%S%f").to_string() 133 }); 134 135 let mut record_bytes = Vec::new(); 136 if let Err(e) = serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record) { 137 error!("Error serializing record: {:?}", e); 138 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response(); 139 } 140 141 let record_cid = match state.block_store.put(&record_bytes).await { 142 Ok(c) => c, 143 Err(e) => { 144 error!("Failed to save record block: {:?}", e); 145 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response(); 146 } 147 }; 148 149 let key = format!("{}/{}", collection_nsid, rkey); 150 if let Err(e) = mst.update(&key, record_cid).await { 151 error!("Failed to update MST: {:?}", e); 152 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response(); 153 } 154 155 let new_mst_root = match mst.root().await { 156 Ok(c) => c, 157 Err(e) => { 158 error!("Failed to get new MST root: {:?}", e); 159 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response(); 160 } 161 }; 162 163 let did_obj = match Did::new(&did) { 164 Ok(d) => d, 165 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Invalid DID"}))).into_response(), 166 }; 167 168 let rev = Tid::now(LimitedU32::MIN); 169 170 let new_commit = Commit::new_unsigned( 171 did_obj, 172 new_mst_root, 173 rev, 174 Some(current_root_cid) 175 ); 176 177 let new_commit_bytes = match new_commit.to_cbor() { 178 Ok(b) => b, 179 Err(e) => { 180 error!("Failed to serialize new commit: {:?}", e); 181 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response(); 182 } 183 }; 184 185 let new_root_cid = match state.block_store.put(&new_commit_bytes).await { 186 Ok(c) => c, 187 Err(e) => { 188 error!("Failed to save new commit: {:?}", e); 189 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response(); 190 } 191 }; 192 193 let update_repo = sqlx::query("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2") 194 .bind(new_root_cid.to_string()) 195 .bind(user_id) 196 .execute(&state.db) 197 .await; 198 199 if let Err(e) = update_repo { 200 error!("Failed to update repo root in DB: {:?}", e); 201 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response(); 202 } 203 204 let record_insert = sqlx::query( 205 "INSERT INTO records (repo_id, collection, rkey, record_cid) VALUES ($1, $2, $3, $4) 206 ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4, created_at = NOW()" 207 ) 208 .bind(user_id) 209 .bind(&input.collection) 210 .bind(&rkey) 211 .bind(record_cid.to_string()) 212 .execute(&state.db) 213 .await; 214 215 if let Err(e) = record_insert { 216 error!("Error inserting record index: {:?}", e); 217 } 218 219 let output = CreateRecordOutput { 220 uri: format!("at://{}/{}/{}", input.repo, input.collection, rkey), 221 cid: record_cid.to_string(), 222 }; 223 (StatusCode::OK, Json(output)).into_response() 224} 225 226#[derive(Deserialize)] 227#[allow(dead_code)] 228pub struct PutRecordInput { 229 pub repo: String, 230 pub collection: String, 231 pub rkey: String, 232 pub validate: Option<bool>, 233 pub record: serde_json::Value, 234 #[serde(rename = "swapCommit")] 235 pub swap_commit: Option<String>, 236} 237 238#[derive(Serialize)] 239#[serde(rename_all = "camelCase")] 240pub struct PutRecordOutput { 241 pub uri: String, 242 pub cid: String, 243} 244 245pub async fn put_record( 246 State(state): State<AppState>, 247 headers: axum::http::HeaderMap, 248 Json(input): Json<PutRecordInput>, 249) -> Response { 250 let auth_header = headers.get("Authorization"); 251 if auth_header.is_none() { 252 return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationRequired"}))).into_response(); 253 } 254 let token = auth_header.unwrap().to_str().unwrap_or("").replace("Bearer ", ""); 255 256 let session = sqlx::query( 257 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1" 258 ) 259 .bind(&token) 260 .fetch_optional(&state.db) 261 .await 262 .unwrap_or(None); 263 264 let (did, key_bytes) = match session { 265 Some(row) => (row.get::<String, _>("did"), row.get::<Vec<u8>, _>("key_bytes")), 266 None => return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationFailed"}))).into_response(), 267 }; 268 269 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) { 270 return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"}))).into_response(); 271 } 272 273 if input.repo != did { 274 return (StatusCode::FORBIDDEN, Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"}))).into_response(); 275 } 276 277 let user_query = sqlx::query("SELECT id FROM users WHERE did = $1") 278 .bind(&did) 279 .fetch_optional(&state.db) 280 .await; 281 282 let user_id: uuid::Uuid = match user_query { 283 Ok(Some(row)) => row.get("id"), 284 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "User not found"}))).into_response(), 285 }; 286 287 let repo_root_query = sqlx::query("SELECT repo_root_cid FROM repos WHERE user_id = $1") 288 .bind(user_id) 289 .fetch_optional(&state.db) 290 .await; 291 292 let current_root_cid = match repo_root_query { 293 Ok(Some(row)) => { 294 let cid_str: String = row.get("repo_root_cid"); 295 Cid::from_str(&cid_str).ok() 296 }, 297 _ => None, 298 }; 299 300 if current_root_cid.is_none() { 301 error!("Repo root not found for user {}", did); 302 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Repo root not found"}))).into_response(); 303 } 304 let current_root_cid = current_root_cid.unwrap(); 305 306 let commit_bytes = match state.block_store.get(&current_root_cid).await { 307 Ok(Some(b)) => b, 308 Ok(None) => { 309 error!("Commit block not found: {}", current_root_cid); 310 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Commit block not found"}))).into_response(); 311 }, 312 Err(e) => { 313 error!("Failed to load commit block: {:?}", e); 314 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to load commit block"}))).into_response(); 315 } 316 }; 317 318 let commit = match Commit::from_cbor(&commit_bytes) { 319 Ok(c) => c, 320 Err(e) => { 321 error!("Failed to parse commit: {:?}", e); 322 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to parse commit"}))).into_response(); 323 } 324 }; 325 326 let mst_root = commit.data; 327 let store = Arc::new(state.block_store.clone()); 328 let mst = Mst::load(store.clone(), mst_root, None); 329 330 let collection_nsid = match input.collection.parse::<Nsid>() { 331 Ok(n) => n, 332 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection"}))).into_response(), 333 }; 334 335 let rkey = input.rkey.clone(); 336 337 let mut record_bytes = Vec::new(); 338 if let Err(e) = serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record) { 339 error!("Error serializing record: {:?}", e); 340 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response(); 341 } 342 343 let record_cid = match state.block_store.put(&record_bytes).await { 344 Ok(c) => c, 345 Err(e) => { 346 error!("Failed to save record block: {:?}", e); 347 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to save record block"}))).into_response(); 348 } 349 }; 350 351 let key = format!("{}/{}", collection_nsid, rkey); 352 if let Err(e) = mst.update(&key, record_cid).await { 353 error!("Failed to update MST: {:?}", e); 354 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": format!("Failed to update MST: {:?}", e)}))).into_response(); 355 } 356 357 let new_mst_root = match mst.root().await { 358 Ok(c) => c, 359 Err(e) => { 360 error!("Failed to get new MST root: {:?}", e); 361 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST root"}))).into_response(); 362 } 363 }; 364 365 let did_obj = match Did::new(&did) { 366 Ok(d) => d, 367 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Invalid DID"}))).into_response(), 368 }; 369 370 let rev = Tid::now(LimitedU32::MIN); 371 372 let new_commit = Commit::new_unsigned( 373 did_obj, 374 new_mst_root, 375 rev, 376 Some(current_root_cid) 377 ); 378 379 let new_commit_bytes = match new_commit.to_cbor() { 380 Ok(b) => b, 381 Err(e) => { 382 error!("Failed to serialize new commit: {:?}", e); 383 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to serialize new commit"}))).into_response(); 384 } 385 }; 386 387 let new_root_cid = match state.block_store.put(&new_commit_bytes).await { 388 Ok(c) => c, 389 Err(e) => { 390 error!("Failed to save new commit: {:?}", e); 391 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to save new commit"}))).into_response(); 392 } 393 }; 394 395 let update_repo = sqlx::query("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2") 396 .bind(new_root_cid.to_string()) 397 .bind(user_id) 398 .execute(&state.db) 399 .await; 400 401 if let Err(e) = update_repo { 402 error!("Failed to update repo root in DB: {:?}", e); 403 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to update repo root in DB"}))).into_response(); 404 } 405 406 let record_insert = sqlx::query( 407 "INSERT INTO records (repo_id, collection, rkey, record_cid) VALUES ($1, $2, $3, $4) 408 ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4, created_at = NOW()" 409 ) 410 .bind(user_id) 411 .bind(&input.collection) 412 .bind(&rkey) 413 .bind(record_cid.to_string()) 414 .execute(&state.db) 415 .await; 416 417 if let Err(e) = record_insert { 418 error!("Error inserting record index: {:?}", e); 419 } 420 421 let output = PutRecordOutput { 422 uri: format!("at://{}/{}/{}", input.repo, input.collection, rkey), 423 cid: record_cid.to_string(), 424 }; 425 (StatusCode::OK, Json(output)).into_response() 426} 427 428#[derive(Deserialize)] 429pub struct GetRecordInput { 430 pub repo: String, 431 pub collection: String, 432 pub rkey: String, 433 pub cid: Option<String>, 434} 435 436pub async fn get_record( 437 State(state): State<AppState>, 438 Query(input): Query<GetRecordInput>, 439) -> Response { 440 let user_row = if input.repo.starts_with("did:") { 441 sqlx::query("SELECT id FROM users WHERE did = $1") 442 .bind(&input.repo) 443 .fetch_optional(&state.db) 444 .await 445 } else { 446 sqlx::query("SELECT id FROM users WHERE handle = $1") 447 .bind(&input.repo) 448 .fetch_optional(&state.db) 449 .await 450 }; 451 452 let user_id: uuid::Uuid = match user_row { 453 Ok(Some(row)) => row.get("id"), 454 _ => return (StatusCode::NOT_FOUND, Json(json!({"error": "NotFound", "message": "Repo not found"}))).into_response(), 455 }; 456 457 let record_row = sqlx::query("SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3") 458 .bind(user_id) 459 .bind(&input.collection) 460 .bind(&input.rkey) 461 .fetch_optional(&state.db) 462 .await; 463 464 let record_cid_str: String = match record_row { 465 Ok(Some(row)) => row.get("record_cid"), 466 _ => return (StatusCode::NOT_FOUND, Json(json!({"error": "NotFound", "message": "Record not found"}))).into_response(), 467 }; 468 469 if let Some(expected_cid) = &input.cid { 470 if &record_cid_str != expected_cid { 471 return (StatusCode::NOT_FOUND, Json(json!({"error": "NotFound", "message": "Record CID mismatch"}))).into_response(); 472 } 473 } 474 475 let cid = match Cid::from_str(&record_cid_str) { 476 Ok(c) => c, 477 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Invalid CID in DB"}))).into_response(), 478 }; 479 480 let block = match state.block_store.get(&cid).await { 481 Ok(Some(b)) => b, 482 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Record block not found"}))).into_response(), 483 }; 484 485 let value: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block) { 486 Ok(v) => v, 487 Err(e) => { 488 error!("Failed to deserialize record: {:?}", e); 489 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response(); 490 } 491 }; 492 493 Json(json!({ 494 "uri": format!("at://{}/{}/{}", input.repo, input.collection, input.rkey), 495 "cid": record_cid_str, 496 "value": value 497 })).into_response() 498} 499 500#[derive(Deserialize)] 501pub struct DeleteRecordInput { 502 pub repo: String, 503 pub collection: String, 504 pub rkey: String, 505 #[serde(rename = "swapRecord")] 506 pub swap_record: Option<String>, 507 #[serde(rename = "swapCommit")] 508 pub swap_commit: Option<String>, 509} 510 511pub async fn delete_record( 512 State(state): State<AppState>, 513 headers: axum::http::HeaderMap, 514 Json(input): Json<DeleteRecordInput>, 515) -> Response { 516 let auth_header = headers.get("Authorization"); 517 if auth_header.is_none() { 518 return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationRequired"}))).into_response(); 519 } 520 let token = auth_header.unwrap().to_str().unwrap_or("").replace("Bearer ", ""); 521 522 let session = sqlx::query( 523 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1" 524 ) 525 .bind(&token) 526 .fetch_optional(&state.db) 527 .await 528 .unwrap_or(None); 529 530 let (did, key_bytes) = match session { 531 Some(row) => (row.get::<String, _>("did"), row.get::<Vec<u8>, _>("key_bytes")), 532 None => return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationFailed"}))).into_response(), 533 }; 534 535 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) { 536 return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"}))).into_response(); 537 } 538 539 if input.repo != did { 540 return (StatusCode::FORBIDDEN, Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"}))).into_response(); 541 } 542 543 let user_query = sqlx::query("SELECT id FROM users WHERE did = $1") 544 .bind(&did) 545 .fetch_optional(&state.db) 546 .await; 547 548 let user_id: uuid::Uuid = match user_query { 549 Ok(Some(row)) => row.get("id"), 550 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "User not found"}))).into_response(), 551 }; 552 553 let repo_root_query = sqlx::query("SELECT repo_root_cid FROM repos WHERE user_id = $1") 554 .bind(user_id) 555 .fetch_optional(&state.db) 556 .await; 557 558 let current_root_cid = match repo_root_query { 559 Ok(Some(row)) => { 560 let cid_str: String = row.get("repo_root_cid"); 561 Cid::from_str(&cid_str).ok() 562 }, 563 _ => None, 564 }; 565 566 if current_root_cid.is_none() { 567 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Repo root not found"}))).into_response(); 568 } 569 let current_root_cid = current_root_cid.unwrap(); 570 571 let commit_bytes = match state.block_store.get(&current_root_cid).await { 572 Ok(Some(b)) => b, 573 Ok(None) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Commit block not found"}))).into_response(), 574 Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": format!("Failed to load commit block: {:?}", e)}))).into_response(), 575 }; 576 577 let commit = match Commit::from_cbor(&commit_bytes) { 578 Ok(c) => c, 579 Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": format!("Failed to parse commit: {:?}", e)}))).into_response(), 580 }; 581 582 let mst_root = commit.data; 583 let store = Arc::new(state.block_store.clone()); 584 let mst = Mst::load(store.clone(), mst_root, None); 585 586 let collection_nsid = match input.collection.parse::<Nsid>() { 587 Ok(n) => n, 588 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection"}))).into_response(), 589 }; 590 591 let key = format!("{}/{}", collection_nsid, input.rkey); 592 593 // TODO: Check swapRecord if provided? Skipping for brevity/robustness 594 595 if let Err(e) = mst.delete(&key).await { 596 error!("Failed to delete from MST: {:?}", e); 597 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": format!("Failed to delete from MST: {:?}", e)}))).into_response(); 598 } 599 600 let new_mst_root = match mst.root().await { 601 Ok(c) => c, 602 Err(_e) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST root"}))).into_response(), 603 }; 604 605 let did_obj = match Did::new(&did) { 606 Ok(d) => d, 607 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Invalid DID"}))).into_response(), 608 }; 609 610 let rev = Tid::now(LimitedU32::MIN); 611 612 let new_commit = Commit::new_unsigned( 613 did_obj, 614 new_mst_root, 615 rev, 616 Some(current_root_cid) 617 ); 618 619 let new_commit_bytes = match new_commit.to_cbor() { 620 Ok(b) => b, 621 Err(_e) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to serialize new commit"}))).into_response(), 622 }; 623 624 let new_root_cid = match state.block_store.put(&new_commit_bytes).await { 625 Ok(c) => c, 626 Err(_e) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to save new commit"}))).into_response(), 627 }; 628 629 let update_repo = sqlx::query("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2") 630 .bind(new_root_cid.to_string()) 631 .bind(user_id) 632 .execute(&state.db) 633 .await; 634 635 if let Err(e) = update_repo { 636 error!("Failed to update repo root in DB: {:?}", e); 637 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to update repo root in DB"}))).into_response(); 638 } 639 640 let record_delete = sqlx::query("DELETE FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3") 641 .bind(user_id) 642 .bind(&input.collection) 643 .bind(&input.rkey) 644 .execute(&state.db) 645 .await; 646 647 if let Err(e) = record_delete { 648 error!("Error deleting record index: {:?}", e); 649 } 650 651 (StatusCode::OK, Json(json!({}))).into_response() 652} 653 654#[derive(Deserialize)] 655pub struct ListRecordsInput { 656 pub repo: String, 657 pub collection: String, 658 pub limit: Option<i32>, 659 pub cursor: Option<String>, 660 #[serde(rename = "rkeyStart")] 661 pub rkey_start: Option<String>, 662 #[serde(rename = "rkeyEnd")] 663 pub rkey_end: Option<String>, 664 pub reverse: Option<bool>, 665} 666 667#[derive(Serialize)] 668pub struct ListRecordsOutput { 669 pub cursor: Option<String>, 670 pub records: Vec<serde_json::Value>, 671} 672 673pub async fn list_records( 674 State(state): State<AppState>, 675 Query(input): Query<ListRecordsInput>, 676) -> Response { 677 let user_row = if input.repo.starts_with("did:") { 678 sqlx::query("SELECT id FROM users WHERE did = $1") 679 .bind(&input.repo) 680 .fetch_optional(&state.db) 681 .await 682 } else { 683 sqlx::query("SELECT id FROM users WHERE handle = $1") 684 .bind(&input.repo) 685 .fetch_optional(&state.db) 686 .await 687 }; 688 689 let user_id: uuid::Uuid = match user_row { 690 Ok(Some(row)) => row.get("id"), 691 _ => return (StatusCode::NOT_FOUND, Json(json!({"error": "NotFound", "message": "Repo not found"}))).into_response(), 692 }; 693 694 let limit = input.limit.unwrap_or(50).clamp(1, 100); 695 let reverse = input.reverse.unwrap_or(false); 696 697 // Simplistic query construction - no sophisticated cursor handling or rkey ranges for now, just basic pagination 698 // TODO: Implement rkeyStart/End and correct cursor logic 699 700 let query_str = format!( 701 "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 {} ORDER BY rkey {} LIMIT {}", 702 if let Some(_c) = &input.cursor { 703 if reverse { "AND rkey < $3" } else { "AND rkey > $3" } 704 } else { 705 "" 706 }, 707 if reverse { "DESC" } else { "ASC" }, 708 limit 709 ); 710 711 let mut query = sqlx::query(&query_str) 712 .bind(user_id) 713 .bind(&input.collection); 714 715 if let Some(c) = &input.cursor { 716 query = query.bind(c); 717 } 718 719 let rows = match query.fetch_all(&state.db).await { 720 Ok(r) => r, 721 Err(e) => { 722 error!("Error listing records: {:?}", e); 723 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response(); 724 } 725 }; 726 727 let mut records = Vec::new(); 728 let mut last_rkey = None; 729 730 for row in rows { 731 let rkey: String = row.get("rkey"); 732 let cid_str: String = row.get("record_cid"); 733 last_rkey = Some(rkey.clone()); 734 735 if let Ok(cid) = Cid::from_str(&cid_str) { 736 if let Ok(Some(block)) = state.block_store.get(&cid).await { 737 if let Ok(value) = serde_ipld_dagcbor::from_slice::<serde_json::Value>(&block) { 738 records.push(json!({ 739 "uri": format!("at://{}/{}/{}", input.repo, input.collection, rkey), 740 "cid": cid_str, 741 "value": value 742 })); 743 } 744 } 745 } 746 } 747 748 Json(ListRecordsOutput { 749 cursor: last_rkey, 750 records, 751 }).into_response() 752} 753 754#[derive(Deserialize)] 755pub struct DescribeRepoInput { 756 pub repo: String, 757} 758 759pub async fn describe_repo( 760 State(state): State<AppState>, 761 Query(input): Query<DescribeRepoInput>, 762) -> Response { 763 let user_row = if input.repo.starts_with("did:") { 764 sqlx::query("SELECT id, handle, did FROM users WHERE did = $1") 765 .bind(&input.repo) 766 .fetch_optional(&state.db) 767 .await 768 } else { 769 sqlx::query("SELECT id, handle, did FROM users WHERE handle = $1") 770 .bind(&input.repo) 771 .fetch_optional(&state.db) 772 .await 773 }; 774 775 let (user_id, handle, did) = match user_row { 776 Ok(Some(row)) => (row.get::<uuid::Uuid, _>("id"), row.get::<String, _>("handle"), row.get::<String, _>("did")), 777 _ => return (StatusCode::NOT_FOUND, Json(json!({"error": "NotFound", "message": "Repo not found"}))).into_response(), 778 }; 779 780 let collections_query = sqlx::query("SELECT DISTINCT collection FROM records WHERE repo_id = $1") 781 .bind(user_id) 782 .fetch_all(&state.db) 783 .await; 784 785 let collections: Vec<String> = match collections_query { 786 Ok(rows) => rows.iter().map(|r| r.get("collection")).collect(), 787 Err(_) => Vec::new(), 788 }; 789 790 let did_doc = json!({ 791 "id": did, 792 "alsoKnownAs": [format!("at://{}", handle)] 793 }); 794 795 Json(json!({ 796 "handle": handle, 797 "did": did, 798 "didDoc": did_doc, 799 "collections": collections, 800 "handleIsCorrect": true 801 })).into_response() 802} 803 804pub async fn upload_blob( 805 State(state): State<AppState>, 806 headers: axum::http::HeaderMap, 807 body: Bytes, 808) -> Response { 809 let auth_header = headers.get("Authorization"); 810 if auth_header.is_none() { 811 return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationRequired"}))).into_response(); 812 } 813 let token = auth_header.unwrap().to_str().unwrap_or("").replace("Bearer ", ""); 814 815 let session = sqlx::query( 816 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1" 817 ) 818 .bind(&token) 819 .fetch_optional(&state.db) 820 .await 821 .unwrap_or(None); 822 823 let (did, key_bytes) = match session { 824 Some(row) => (row.get::<String, _>("did"), row.get::<Vec<u8>, _>("key_bytes")), 825 None => return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationFailed"}))).into_response(), 826 }; 827 828 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) { 829 return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"}))).into_response(); 830 } 831 832 let mime_type = headers.get("content-type") 833 .and_then(|h| h.to_str().ok()) 834 .unwrap_or("application/octet-stream") 835 .to_string(); 836 837 let size = body.len() as i64; 838 let data = body.to_vec(); 839 840 let mut hasher = Sha256::new(); 841 hasher.update(&data); 842 let hash = hasher.finalize(); 843 let multihash = Multihash::wrap(0x12, &hash).unwrap(); 844 let cid = Cid::new_v1(0x55, multihash); 845 let cid_str = cid.to_string(); 846 847 let storage_key = format!("blobs/{}", cid_str); 848 849 if let Err(e) = state.blob_store.put(&storage_key, &data).await { 850 error!("Failed to upload blob to storage: {:?}", e); 851 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to store blob"}))).into_response(); 852 } 853 854 let user_query = sqlx::query("SELECT id FROM users WHERE did = $1") 855 .bind(&did) 856 .fetch_optional(&state.db) 857 .await; 858 859 let user_id: uuid::Uuid = match user_query { 860 Ok(Some(row)) => row.get("id"), 861 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response(), 862 }; 863 864 let insert = sqlx::query( 865 "INSERT INTO blobs (cid, mime_type, size_bytes, created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING" 866 ) 867 .bind(&cid_str) 868 .bind(&mime_type) 869 .bind(size) 870 .bind(user_id) 871 .bind(&storage_key) 872 .execute(&state.db) 873 .await; 874 875 if let Err(e) = insert { 876 error!("Failed to insert blob record: {:?}", e); 877 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response(); 878 } 879 880 Json(json!({ 881 "blob": { 882 "ref": { 883 "$link": cid_str 884 }, 885 "mimeType": mime_type, 886 "size": size 887 } 888 })).into_response() 889}