this repo has no description
1use super::validation::validate_record; 2use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log}; 3use crate::delegation::{self, DelegationActionType}; 4use crate::repo::tracking::TrackingBlockStore; 5use crate::state::AppState; 6use axum::{ 7 Json, 8 extract::State, 9 http::{HeaderMap, StatusCode}, 10 response::{IntoResponse, Response}, 11}; 12use cid::Cid; 13use jacquard::types::{ 14 integer::LimitedU32, 15 string::{Nsid, Tid}, 16}; 17use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 18use serde::{Deserialize, Serialize}; 19use serde_json::json; 20use sqlx::{PgPool, Row}; 21use std::str::FromStr; 22use std::sync::Arc; 23use tracing::error; 24use uuid::Uuid; 25 26pub async fn has_verified_comms_channel(db: &PgPool, did: &str) -> Result<bool, sqlx::Error> { 27 let row = sqlx::query( 28 r#" 29 SELECT 30 email_verified, 31 discord_verified, 32 telegram_verified, 33 signal_verified 34 FROM users 35 WHERE did = $1 36 "#, 37 ) 38 .bind(did) 39 .fetch_optional(db) 40 .await?; 41 match row { 42 Some(r) => { 43 let email_verified: bool = r.get("email_verified"); 44 let discord_verified: bool = r.get("discord_verified"); 45 let telegram_verified: bool = r.get("telegram_verified"); 46 let signal_verified: bool = r.get("signal_verified"); 47 Ok(email_verified || discord_verified || telegram_verified || signal_verified) 48 } 49 None => Ok(false), 50 } 51} 52 53pub struct RepoWriteAuth { 54 pub did: String, 55 pub user_id: Uuid, 56 pub current_root_cid: Cid, 57 pub is_oauth: bool, 58 pub scope: Option<String>, 59 pub controller_did: Option<String>, 60} 61 62pub async fn prepare_repo_write( 63 state: &AppState, 64 headers: &HeaderMap, 65 repo_did: &str, 66 http_method: &str, 67 http_uri: &str, 68) -> Result<RepoWriteAuth, Response> { 69 let extracted = crate::auth::extract_auth_token_from_header( 70 headers.get("Authorization").and_then(|h| h.to_str().ok()), 71 ) 72 .ok_or_else(|| { 73 ( 74 StatusCode::UNAUTHORIZED, 75 Json(json!({"error": "AuthenticationRequired"})), 76 ) 77 .into_response() 78 })?; 79 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 80 let auth_user = crate::auth::validate_token_with_dpop( 81 &state.db, 82 &extracted.token, 83 extracted.is_dpop, 84 dpop_proof, 85 http_method, 86 http_uri, 87 false, 88 ) 89 .await 90 .map_err(|e| { 91 ( 92 StatusCode::UNAUTHORIZED, 93 Json(json!({"error": e.to_string()})), 94 ) 95 .into_response() 96 })?; 97 if repo_did != auth_user.did { 98 return Err(( 99 StatusCode::FORBIDDEN, 100 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})), 101 ) 102 .into_response()); 103 } 104 let is_verified = has_verified_comms_channel(&state.db, &auth_user.did) 105 .await 106 .unwrap_or(false); 107 let is_delegated = crate::delegation::is_delegated_account(&state.db, &auth_user.did) 108 .await 109 .unwrap_or(false); 110 if !is_verified && !is_delegated { 111 return Err(( 112 StatusCode::FORBIDDEN, 113 Json(json!({ 114 "error": "AccountNotVerified", 115 "message": "You must verify at least one notification channel (email, Discord, Telegram, or Signal) before creating records" 116 })), 117 ) 118 .into_response()); 119 } 120 let user_id = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", auth_user.did) 121 .fetch_optional(&state.db) 122 .await 123 .map_err(|e| { 124 error!("DB error fetching user: {}", e); 125 ( 126 StatusCode::INTERNAL_SERVER_ERROR, 127 Json(json!({"error": "InternalError"})), 128 ) 129 .into_response() 130 })? 131 .ok_or_else(|| { 132 ( 133 StatusCode::INTERNAL_SERVER_ERROR, 134 Json(json!({"error": "InternalError", "message": "User not found"})), 135 ) 136 .into_response() 137 })?; 138 let root_cid_str: String = sqlx::query_scalar!( 139 "SELECT repo_root_cid FROM repos WHERE user_id = $1", 140 user_id 141 ) 142 .fetch_optional(&state.db) 143 .await 144 .map_err(|e| { 145 error!("DB error fetching repo root: {}", e); 146 ( 147 StatusCode::INTERNAL_SERVER_ERROR, 148 Json(json!({"error": "InternalError"})), 149 ) 150 .into_response() 151 })? 152 .ok_or_else(|| { 153 ( 154 StatusCode::INTERNAL_SERVER_ERROR, 155 Json(json!({"error": "InternalError", "message": "Repo root not found"})), 156 ) 157 .into_response() 158 })?; 159 let current_root_cid = Cid::from_str(&root_cid_str).map_err(|_| { 160 ( 161 StatusCode::INTERNAL_SERVER_ERROR, 162 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})), 163 ) 164 .into_response() 165 })?; 166 Ok(RepoWriteAuth { 167 did: auth_user.did, 168 user_id, 169 current_root_cid, 170 is_oauth: auth_user.is_oauth, 171 scope: auth_user.scope, 172 controller_did: auth_user.controller_did, 173 }) 174} 175#[derive(Deserialize)] 176#[allow(dead_code)] 177pub struct CreateRecordInput { 178 pub repo: String, 179 pub collection: String, 180 pub rkey: Option<String>, 181 pub validate: Option<bool>, 182 pub record: serde_json::Value, 183 #[serde(rename = "swapCommit")] 184 pub swap_commit: Option<String>, 185} 186#[derive(Serialize)] 187#[serde(rename_all = "camelCase")] 188pub struct CreateRecordOutput { 189 pub uri: String, 190 pub cid: String, 191} 192pub async fn create_record( 193 State(state): State<AppState>, 194 headers: HeaderMap, 195 axum::extract::OriginalUri(uri): axum::extract::OriginalUri, 196 Json(input): Json<CreateRecordInput>, 197) -> Response { 198 let auth = 199 match prepare_repo_write(&state, &headers, &input.repo, "POST", &uri.to_string()).await { 200 Ok(res) => res, 201 Err(err_res) => return err_res, 202 }; 203 204 if let Err(e) = crate::auth::scope_check::check_repo_scope( 205 auth.is_oauth, 206 auth.scope.as_deref(), 207 crate::oauth::RepoAction::Create, 208 &input.collection, 209 ) { 210 return e; 211 } 212 213 let did = auth.did; 214 let user_id = auth.user_id; 215 let current_root_cid = auth.current_root_cid; 216 let controller_did = auth.controller_did; 217 218 if let Some(swap_commit) = &input.swap_commit 219 && Cid::from_str(swap_commit).ok() != Some(current_root_cid) 220 { 221 return ( 222 StatusCode::CONFLICT, 223 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})), 224 ) 225 .into_response(); 226 } 227 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 228 let commit_bytes = match tracking_store.get(&current_root_cid).await { 229 Ok(Some(b)) => b, 230 _ => { 231 return ( 232 StatusCode::INTERNAL_SERVER_ERROR, 233 Json(json!({"error": "InternalError", "message": "Commit block not found"})), 234 ) 235 .into_response(); 236 } 237 }; 238 let commit = match Commit::from_cbor(&commit_bytes) { 239 Ok(c) => c, 240 _ => { 241 return ( 242 StatusCode::INTERNAL_SERVER_ERROR, 243 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})), 244 ) 245 .into_response(); 246 } 247 }; 248 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 249 let collection_nsid = match input.collection.parse::<Nsid>() { 250 Ok(n) => n, 251 Err(_) => { 252 return ( 253 StatusCode::BAD_REQUEST, 254 Json(json!({"error": "InvalidCollection"})), 255 ) 256 .into_response(); 257 } 258 }; 259 if input.validate.unwrap_or(true) 260 && let Err(err_response) = validate_record(&input.record, &input.collection) 261 { 262 return *err_response; 263 } 264 let rkey = input 265 .rkey 266 .unwrap_or_else(|| Tid::now(LimitedU32::MIN).to_string()); 267 let mut record_bytes = Vec::new(); 268 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() { 269 return ( 270 StatusCode::BAD_REQUEST, 271 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})), 272 ) 273 .into_response(); 274 } 275 let record_cid = match tracking_store.put(&record_bytes).await { 276 Ok(c) => c, 277 _ => { 278 return ( 279 StatusCode::INTERNAL_SERVER_ERROR, 280 Json(json!({"error": "InternalError", "message": "Failed to save record block"})), 281 ) 282 .into_response(); 283 } 284 }; 285 let key = format!("{}/{}", collection_nsid, rkey); 286 let new_mst = match mst.add(&key, record_cid).await { 287 Ok(m) => m, 288 _ => { 289 return ( 290 StatusCode::INTERNAL_SERVER_ERROR, 291 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})), 292 ) 293 .into_response(); 294 } 295 }; 296 let new_mst_root = match new_mst.persist().await { 297 Ok(c) => c, 298 _ => { 299 return ( 300 StatusCode::INTERNAL_SERVER_ERROR, 301 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})), 302 ) 303 .into_response(); 304 } 305 }; 306 let op = RecordOp::Create { 307 collection: input.collection.clone(), 308 rkey: rkey.clone(), 309 cid: record_cid, 310 }; 311 let mut relevant_blocks = std::collections::BTreeMap::new(); 312 if new_mst 313 .blocks_for_path(&key, &mut relevant_blocks) 314 .await 315 .is_err() 316 { 317 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response(); 318 } 319 if mst 320 .blocks_for_path(&key, &mut relevant_blocks) 321 .await 322 .is_err() 323 { 324 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response(); 325 } 326 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes)); 327 let mut written_cids = tracking_store.get_all_relevant_cids(); 328 for cid in relevant_blocks.keys() { 329 if !written_cids.contains(cid) { 330 written_cids.push(*cid); 331 } 332 } 333 let written_cids_str = written_cids 334 .iter() 335 .map(|c| c.to_string()) 336 .collect::<Vec<_>>(); 337 if let Err(e) = commit_and_log( 338 &state, 339 CommitParams { 340 did: &did, 341 user_id, 342 current_root_cid: Some(current_root_cid), 343 prev_data_cid: Some(commit.data), 344 new_mst_root, 345 ops: vec![op], 346 blocks_cids: &written_cids_str, 347 }, 348 ) 349 .await 350 { 351 return ( 352 StatusCode::INTERNAL_SERVER_ERROR, 353 Json(json!({"error": "InternalError", "message": e})), 354 ) 355 .into_response(); 356 }; 357 358 if let Some(ref controller) = controller_did { 359 let _ = delegation::log_delegation_action( 360 &state.db, 361 &did, 362 controller, 363 Some(controller), 364 DelegationActionType::RepoWrite, 365 Some(json!({ 366 "action": "create", 367 "collection": input.collection, 368 "rkey": rkey 369 })), 370 None, 371 None, 372 ) 373 .await; 374 } 375 376 ( 377 StatusCode::OK, 378 Json(CreateRecordOutput { 379 uri: format!("at://{}/{}/{}", did, input.collection, rkey), 380 cid: record_cid.to_string(), 381 }), 382 ) 383 .into_response() 384} 385#[derive(Deserialize)] 386#[allow(dead_code)] 387pub struct PutRecordInput { 388 pub repo: String, 389 pub collection: String, 390 pub rkey: String, 391 pub validate: Option<bool>, 392 pub record: serde_json::Value, 393 #[serde(rename = "swapCommit")] 394 pub swap_commit: Option<String>, 395 #[serde(rename = "swapRecord")] 396 pub swap_record: Option<String>, 397} 398#[derive(Serialize)] 399#[serde(rename_all = "camelCase")] 400pub struct PutRecordOutput { 401 pub uri: String, 402 pub cid: String, 403} 404pub async fn put_record( 405 State(state): State<AppState>, 406 headers: HeaderMap, 407 axum::extract::OriginalUri(uri): axum::extract::OriginalUri, 408 Json(input): Json<PutRecordInput>, 409) -> Response { 410 let auth = 411 match prepare_repo_write(&state, &headers, &input.repo, "POST", &uri.to_string()).await { 412 Ok(res) => res, 413 Err(err_res) => return err_res, 414 }; 415 416 if let Err(e) = crate::auth::scope_check::check_repo_scope( 417 auth.is_oauth, 418 auth.scope.as_deref(), 419 crate::oauth::RepoAction::Create, 420 &input.collection, 421 ) { 422 return e; 423 } 424 if let Err(e) = crate::auth::scope_check::check_repo_scope( 425 auth.is_oauth, 426 auth.scope.as_deref(), 427 crate::oauth::RepoAction::Update, 428 &input.collection, 429 ) { 430 return e; 431 } 432 433 let did = auth.did; 434 let user_id = auth.user_id; 435 let current_root_cid = auth.current_root_cid; 436 let controller_did = auth.controller_did; 437 438 if let Some(swap_commit) = &input.swap_commit 439 && Cid::from_str(swap_commit).ok() != Some(current_root_cid) 440 { 441 return ( 442 StatusCode::CONFLICT, 443 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})), 444 ) 445 .into_response(); 446 } 447 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 448 let commit_bytes = match tracking_store.get(&current_root_cid).await { 449 Ok(Some(b)) => b, 450 _ => { 451 return ( 452 StatusCode::INTERNAL_SERVER_ERROR, 453 Json(json!({"error": "InternalError", "message": "Commit block not found"})), 454 ) 455 .into_response(); 456 } 457 }; 458 let commit = match Commit::from_cbor(&commit_bytes) { 459 Ok(c) => c, 460 _ => { 461 return ( 462 StatusCode::INTERNAL_SERVER_ERROR, 463 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})), 464 ) 465 .into_response(); 466 } 467 }; 468 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 469 let collection_nsid = match input.collection.parse::<Nsid>() { 470 Ok(n) => n, 471 Err(_) => { 472 return ( 473 StatusCode::BAD_REQUEST, 474 Json(json!({"error": "InvalidCollection"})), 475 ) 476 .into_response(); 477 } 478 }; 479 let key = format!("{}/{}", collection_nsid, input.rkey); 480 if input.validate.unwrap_or(true) 481 && let Err(err_response) = validate_record(&input.record, &input.collection) 482 { 483 return *err_response; 484 } 485 if let Some(swap_record_str) = &input.swap_record { 486 let expected_cid = Cid::from_str(swap_record_str).ok(); 487 let actual_cid = mst.get(&key).await.ok().flatten(); 488 if expected_cid != actual_cid { 489 return (StatusCode::CONFLICT, Json(json!({"error": "InvalidSwap", "message": "Record has been modified or does not exist"}))).into_response(); 490 } 491 } 492 let existing_cid = mst.get(&key).await.ok().flatten(); 493 let mut record_bytes = Vec::new(); 494 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() { 495 return ( 496 StatusCode::BAD_REQUEST, 497 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})), 498 ) 499 .into_response(); 500 } 501 let record_cid = match tracking_store.put(&record_bytes).await { 502 Ok(c) => c, 503 _ => { 504 return ( 505 StatusCode::INTERNAL_SERVER_ERROR, 506 Json(json!({"error": "InternalError", "message": "Failed to save record block"})), 507 ) 508 .into_response(); 509 } 510 }; 511 let new_mst = if existing_cid.is_some() { 512 match mst.update(&key, record_cid).await { 513 Ok(m) => m, 514 Err(_) => { 515 return ( 516 StatusCode::INTERNAL_SERVER_ERROR, 517 Json(json!({"error": "InternalError", "message": "Failed to update MST"})), 518 ) 519 .into_response(); 520 } 521 } 522 } else { 523 match mst.add(&key, record_cid).await { 524 Ok(m) => m, 525 Err(_) => { 526 return ( 527 StatusCode::INTERNAL_SERVER_ERROR, 528 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})), 529 ) 530 .into_response(); 531 } 532 } 533 }; 534 let new_mst_root = match new_mst.persist().await { 535 Ok(c) => c, 536 Err(_) => { 537 return ( 538 StatusCode::INTERNAL_SERVER_ERROR, 539 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})), 540 ) 541 .into_response(); 542 } 543 }; 544 let op = if existing_cid.is_some() { 545 RecordOp::Update { 546 collection: input.collection.clone(), 547 rkey: input.rkey.clone(), 548 cid: record_cid, 549 prev: existing_cid, 550 } 551 } else { 552 RecordOp::Create { 553 collection: input.collection.clone(), 554 rkey: input.rkey.clone(), 555 cid: record_cid, 556 } 557 }; 558 let mut relevant_blocks = std::collections::BTreeMap::new(); 559 if new_mst 560 .blocks_for_path(&key, &mut relevant_blocks) 561 .await 562 .is_err() 563 { 564 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response(); 565 } 566 if mst 567 .blocks_for_path(&key, &mut relevant_blocks) 568 .await 569 .is_err() 570 { 571 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response(); 572 } 573 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes)); 574 let mut written_cids = tracking_store.get_all_relevant_cids(); 575 for cid in relevant_blocks.keys() { 576 if !written_cids.contains(cid) { 577 written_cids.push(*cid); 578 } 579 } 580 let written_cids_str = written_cids 581 .iter() 582 .map(|c| c.to_string()) 583 .collect::<Vec<_>>(); 584 let is_update = existing_cid.is_some(); 585 if let Err(e) = commit_and_log( 586 &state, 587 CommitParams { 588 did: &did, 589 user_id, 590 current_root_cid: Some(current_root_cid), 591 prev_data_cid: Some(commit.data), 592 new_mst_root, 593 ops: vec![op], 594 blocks_cids: &written_cids_str, 595 }, 596 ) 597 .await 598 { 599 return ( 600 StatusCode::INTERNAL_SERVER_ERROR, 601 Json(json!({"error": "InternalError", "message": e})), 602 ) 603 .into_response(); 604 }; 605 606 if let Some(ref controller) = controller_did { 607 let _ = delegation::log_delegation_action( 608 &state.db, 609 &did, 610 controller, 611 Some(controller), 612 DelegationActionType::RepoWrite, 613 Some(json!({ 614 "action": if is_update { "update" } else { "create" }, 615 "collection": input.collection, 616 "rkey": input.rkey 617 })), 618 None, 619 None, 620 ) 621 .await; 622 } 623 624 ( 625 StatusCode::OK, 626 Json(PutRecordOutput { 627 uri: format!("at://{}/{}/{}", did, input.collection, input.rkey), 628 cid: record_cid.to_string(), 629 }), 630 ) 631 .into_response() 632}