this repo has no description
1use super::validation::validate_record_with_status; 2use crate::api::error::ApiError; 3use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log, extract_blob_cids}; 4use crate::delegation::{self, DelegationActionType}; 5use crate::repo::tracking::TrackingBlockStore; 6use crate::state::AppState; 7use crate::types::{AtIdentifier, AtUri, Did, Nsid, Rkey}; 8use axum::{ 9 Json, 10 extract::State, 11 http::{HeaderMap, StatusCode}, 12 response::{IntoResponse, Response}, 13}; 14use cid::Cid; 15use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 16use serde::{Deserialize, Serialize}; 17use serde_json::json; 18use sqlx::{PgPool, Row}; 19use std::str::FromStr; 20use std::sync::Arc; 21use tracing::error; 22use uuid::Uuid; 23 24pub async fn has_verified_comms_channel(db: &PgPool, did: &Did) -> Result<bool, sqlx::Error> { 25 let row = sqlx::query( 26 r#" 27 SELECT 28 email_verified, 29 discord_verified, 30 telegram_verified, 31 signal_verified 32 FROM users 33 WHERE did = $1 34 "#, 35 ) 36 .bind(did.as_str()) 37 .fetch_optional(db) 38 .await?; 39 match row { 40 Some(r) => { 41 let email_verified: bool = r.get("email_verified"); 42 let discord_verified: bool = r.get("discord_verified"); 43 let telegram_verified: bool = r.get("telegram_verified"); 44 let signal_verified: bool = r.get("signal_verified"); 45 Ok(email_verified || discord_verified || telegram_verified || signal_verified) 46 } 47 None => Ok(false), 48 } 49} 50 51pub struct RepoWriteAuth { 52 pub did: Did, 53 pub user_id: Uuid, 54 pub current_root_cid: Cid, 55 pub is_oauth: bool, 56 pub scope: Option<String>, 57 pub controller_did: Option<Did>, 58} 59 60pub async fn prepare_repo_write( 61 state: &AppState, 62 headers: &HeaderMap, 63 repo: &AtIdentifier, 64 http_method: &str, 65 http_uri: &str, 66) -> Result<RepoWriteAuth, Response> { 67 let extracted = crate::auth::extract_auth_token_from_header( 68 headers.get("Authorization").and_then(|h| h.to_str().ok()), 69 ) 70 .ok_or_else(|| ApiError::AuthenticationRequired.into_response())?; 71 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 72 let auth_user = crate::auth::validate_token_with_dpop( 73 &state.db, 74 &extracted.token, 75 extracted.is_dpop, 76 dpop_proof, 77 http_method, 78 http_uri, 79 false, 80 false, 81 ) 82 .await 83 .map_err(|e| { 84 tracing::warn!(error = ?e, is_dpop = extracted.is_dpop, "Token validation failed in prepare_repo_write"); 85 let mut response = ApiError::from(e).into_response(); 86 if matches!(e, crate::auth::TokenValidationError::TokenExpired) && extracted.is_dpop { 87 *response.status_mut() = axum::http::StatusCode::UNAUTHORIZED; 88 let www_auth = 89 "DPoP error=\"invalid_token\", error_description=\"Token has expired\""; 90 response.headers_mut().insert( 91 "WWW-Authenticate", 92 www_auth.parse().unwrap(), 93 ); 94 let nonce = crate::oauth::verify::generate_dpop_nonce(); 95 response.headers_mut().insert("DPoP-Nonce", nonce.parse().unwrap()); 96 } 97 response 98 })?; 99 if repo.as_str() != auth_user.did.as_str() { 100 return Err( 101 ApiError::InvalidRepo("Repo does not match authenticated user".into()).into_response(), 102 ); 103 } 104 if crate::util::is_account_migrated(&state.db, &auth_user.did) 105 .await 106 .unwrap_or(false) 107 { 108 return Err(ApiError::AccountMigrated.into_response()); 109 } 110 let is_verified = has_verified_comms_channel(&state.db, &auth_user.did) 111 .await 112 .unwrap_or(false); 113 let is_delegated = crate::delegation::is_delegated_account(&state.db, &auth_user.did) 114 .await 115 .unwrap_or(false); 116 if !is_verified && !is_delegated { 117 return Err(ApiError::AccountNotVerified.into_response()); 118 } 119 let user_id = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", &auth_user.did) 120 .fetch_optional(&state.db) 121 .await 122 .map_err(|e| { 123 error!("DB error fetching user: {}", e); 124 ApiError::InternalError(None).into_response() 125 })? 126 .ok_or_else(|| ApiError::InternalError(Some("User not found".into())).into_response())?; 127 let root_cid_str: String = sqlx::query_scalar!( 128 "SELECT repo_root_cid FROM repos WHERE user_id = $1", 129 user_id 130 ) 131 .fetch_optional(&state.db) 132 .await 133 .map_err(|e| { 134 error!("DB error fetching repo root: {}", e); 135 ApiError::InternalError(None).into_response() 136 })? 137 .ok_or_else(|| ApiError::InternalError(Some("Repo root not found".into())).into_response())?; 138 let current_root_cid = Cid::from_str(&root_cid_str).map_err(|_| { 139 ApiError::InternalError(Some("Invalid repo root CID".into())).into_response() 140 })?; 141 Ok(RepoWriteAuth { 142 did: auth_user.did.clone(), 143 user_id, 144 current_root_cid, 145 is_oauth: auth_user.is_oauth, 146 scope: auth_user.scope, 147 controller_did: auth_user.controller_did.clone(), 148 }) 149} 150#[derive(Deserialize)] 151#[allow(dead_code)] 152pub struct CreateRecordInput { 153 pub repo: AtIdentifier, 154 pub collection: Nsid, 155 pub rkey: Option<Rkey>, 156 pub validate: Option<bool>, 157 pub record: serde_json::Value, 158 #[serde(rename = "swapCommit")] 159 pub swap_commit: Option<String>, 160} 161#[derive(Serialize)] 162#[serde(rename_all = "camelCase")] 163pub struct CommitInfo { 164 pub cid: String, 165 pub rev: String, 166} 167 168#[derive(Serialize)] 169#[serde(rename_all = "camelCase")] 170pub struct CreateRecordOutput { 171 pub uri: AtUri, 172 pub cid: String, 173 pub commit: CommitInfo, 174 #[serde(skip_serializing_if = "Option::is_none")] 175 pub validation_status: Option<String>, 176} 177pub async fn create_record( 178 State(state): State<AppState>, 179 headers: HeaderMap, 180 axum::extract::OriginalUri(uri): axum::extract::OriginalUri, 181 Json(input): Json<CreateRecordInput>, 182) -> Response { 183 let auth = match prepare_repo_write( 184 &state, 185 &headers, 186 &input.repo, 187 "POST", 188 &crate::util::build_full_url(&uri.to_string()), 189 ) 190 .await 191 { 192 Ok(res) => res, 193 Err(err_res) => return err_res, 194 }; 195 196 if let Err(e) = crate::auth::scope_check::check_repo_scope( 197 auth.is_oauth, 198 auth.scope.as_deref(), 199 crate::oauth::RepoAction::Create, 200 &input.collection, 201 ) { 202 return e; 203 } 204 205 let did = auth.did; 206 let user_id = auth.user_id; 207 let current_root_cid = auth.current_root_cid; 208 let controller_did = auth.controller_did; 209 210 if let Some(swap_commit) = &input.swap_commit 211 && Cid::from_str(swap_commit).ok() != Some(current_root_cid) 212 { 213 return ApiError::InvalidSwap(Some("Repo has been modified".into())).into_response(); 214 } 215 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 216 let commit_bytes = match tracking_store.get(&current_root_cid).await { 217 Ok(Some(b)) => b, 218 _ => return ApiError::InternalError(Some("Commit block not found".into())).into_response(), 219 }; 220 let commit = match Commit::from_cbor(&commit_bytes) { 221 Ok(c) => c, 222 _ => return ApiError::InternalError(Some("Failed to parse commit".into())).into_response(), 223 }; 224 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 225 let validation_status = if input.validate == Some(false) { 226 None 227 } else { 228 let require_lexicon = input.validate == Some(true); 229 match validate_record_with_status( 230 &input.record, 231 &input.collection, 232 input.rkey.as_ref(), 233 require_lexicon, 234 ) { 235 Ok(status) => Some(status), 236 Err(err_response) => return *err_response, 237 } 238 }; 239 let rkey = input.rkey.unwrap_or_else(Rkey::generate); 240 let record_ipld = crate::util::json_to_ipld(&input.record); 241 let mut record_bytes = Vec::new(); 242 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld).is_err() { 243 return ApiError::InvalidRecord("Failed to serialize record".into()).into_response(); 244 } 245 let record_cid = match tracking_store.put(&record_bytes).await { 246 Ok(c) => c, 247 _ => { 248 return ApiError::InternalError(Some("Failed to save record block".into())) 249 .into_response(); 250 } 251 }; 252 let key = format!("{}/{}", input.collection, rkey); 253 let new_mst = match mst.add(&key, record_cid).await { 254 Ok(m) => m, 255 _ => return ApiError::InternalError(Some("Failed to add to MST".into())).into_response(), 256 }; 257 let new_mst_root = match new_mst.persist().await { 258 Ok(c) => c, 259 _ => return ApiError::InternalError(Some("Failed to persist MST".into())).into_response(), 260 }; 261 let op = RecordOp::Create { 262 collection: input.collection.clone(), 263 rkey: rkey.clone(), 264 cid: record_cid, 265 }; 266 let mut new_mst_blocks = std::collections::BTreeMap::new(); 267 let mut old_mst_blocks = std::collections::BTreeMap::new(); 268 if new_mst 269 .blocks_for_path(&key, &mut new_mst_blocks) 270 .await 271 .is_err() 272 { 273 return ApiError::InternalError(Some("Failed to get new MST blocks for path".into())) 274 .into_response(); 275 } 276 if mst 277 .blocks_for_path(&key, &mut old_mst_blocks) 278 .await 279 .is_err() 280 { 281 return ApiError::InternalError(Some("Failed to get old MST blocks for path".into())) 282 .into_response(); 283 } 284 let mut relevant_blocks = new_mst_blocks.clone(); 285 relevant_blocks.extend(old_mst_blocks.iter().map(|(k, v)| (*k, v.clone()))); 286 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes)); 287 let written_cids: Vec<Cid> = tracking_store 288 .get_all_relevant_cids() 289 .into_iter() 290 .chain(relevant_blocks.keys().copied()) 291 .collect::<std::collections::HashSet<_>>() 292 .into_iter() 293 .collect(); 294 let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect(); 295 let blob_cids = extract_blob_cids(&input.record); 296 let obsolete_cids: Vec<Cid> = std::iter::once(current_root_cid) 297 .chain( 298 old_mst_blocks 299 .keys() 300 .filter(|cid| !new_mst_blocks.contains_key(*cid)) 301 .copied(), 302 ) 303 .collect(); 304 let commit_result = match commit_and_log( 305 &state, 306 CommitParams { 307 did: &did, 308 user_id, 309 current_root_cid: Some(current_root_cid), 310 prev_data_cid: Some(commit.data), 311 new_mst_root, 312 ops: vec![op], 313 blocks_cids: &written_cids_str, 314 blobs: &blob_cids, 315 obsolete_cids, 316 }, 317 ) 318 .await 319 { 320 Ok(res) => res, 321 Err(e) if e.contains("ConcurrentModification") => { 322 return ApiError::InvalidSwap(Some("Repo has been modified".into())).into_response(); 323 } 324 Err(e) => return ApiError::InternalError(Some(e)).into_response(), 325 }; 326 327 if let Some(ref controller) = controller_did { 328 let _ = delegation::log_delegation_action( 329 &state.db, 330 &did, 331 controller, 332 Some(controller), 333 DelegationActionType::RepoWrite, 334 Some(json!({ 335 "action": "create", 336 "collection": input.collection, 337 "rkey": rkey 338 })), 339 None, 340 None, 341 ) 342 .await; 343 } 344 345 ( 346 StatusCode::OK, 347 Json(CreateRecordOutput { 348 uri: AtUri::from_parts(&did, &input.collection, &rkey), 349 cid: record_cid.to_string(), 350 commit: CommitInfo { 351 cid: commit_result.commit_cid.to_string(), 352 rev: commit_result.rev, 353 }, 354 validation_status: validation_status.map(|s| s.to_string()), 355 }), 356 ) 357 .into_response() 358} 359#[derive(Deserialize)] 360#[allow(dead_code)] 361pub struct PutRecordInput { 362 pub repo: AtIdentifier, 363 pub collection: Nsid, 364 pub rkey: Rkey, 365 pub validate: Option<bool>, 366 pub record: serde_json::Value, 367 #[serde(rename = "swapCommit")] 368 pub swap_commit: Option<String>, 369 #[serde(rename = "swapRecord")] 370 pub swap_record: Option<String>, 371} 372#[derive(Serialize)] 373#[serde(rename_all = "camelCase")] 374pub struct PutRecordOutput { 375 pub uri: AtUri, 376 pub cid: String, 377 #[serde(skip_serializing_if = "Option::is_none")] 378 pub commit: Option<CommitInfo>, 379 #[serde(skip_serializing_if = "Option::is_none")] 380 pub validation_status: Option<String>, 381} 382pub async fn put_record( 383 State(state): State<AppState>, 384 headers: HeaderMap, 385 axum::extract::OriginalUri(uri): axum::extract::OriginalUri, 386 Json(input): Json<PutRecordInput>, 387) -> Response { 388 let auth = match prepare_repo_write( 389 &state, 390 &headers, 391 &input.repo, 392 "POST", 393 &crate::util::build_full_url(&uri.to_string()), 394 ) 395 .await 396 { 397 Ok(res) => res, 398 Err(err_res) => return err_res, 399 }; 400 401 if let Err(e) = crate::auth::scope_check::check_repo_scope( 402 auth.is_oauth, 403 auth.scope.as_deref(), 404 crate::oauth::RepoAction::Create, 405 &input.collection, 406 ) { 407 return e; 408 } 409 if let Err(e) = crate::auth::scope_check::check_repo_scope( 410 auth.is_oauth, 411 auth.scope.as_deref(), 412 crate::oauth::RepoAction::Update, 413 &input.collection, 414 ) { 415 return e; 416 } 417 418 let did = auth.did; 419 let user_id = auth.user_id; 420 let current_root_cid = auth.current_root_cid; 421 let controller_did = auth.controller_did; 422 423 if let Some(swap_commit) = &input.swap_commit 424 && Cid::from_str(swap_commit).ok() != Some(current_root_cid) 425 { 426 return ApiError::InvalidSwap(Some("Repo has been modified".into())).into_response(); 427 } 428 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 429 let commit_bytes = match tracking_store.get(&current_root_cid).await { 430 Ok(Some(b)) => b, 431 _ => return ApiError::InternalError(Some("Commit block not found".into())).into_response(), 432 }; 433 let commit = match Commit::from_cbor(&commit_bytes) { 434 Ok(c) => c, 435 _ => return ApiError::InternalError(Some("Failed to parse commit".into())).into_response(), 436 }; 437 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 438 let key = format!("{}/{}", input.collection, input.rkey); 439 let validation_status = if input.validate == Some(false) { 440 None 441 } else { 442 let require_lexicon = input.validate == Some(true); 443 match validate_record_with_status( 444 &input.record, 445 &input.collection, 446 Some(&input.rkey), 447 require_lexicon, 448 ) { 449 Ok(status) => Some(status), 450 Err(err_response) => return *err_response, 451 } 452 }; 453 if let Some(swap_record_str) = &input.swap_record { 454 let expected_cid = Cid::from_str(swap_record_str).ok(); 455 let actual_cid = mst.get(&key).await.ok().flatten(); 456 if expected_cid != actual_cid { 457 return ApiError::InvalidSwap(Some( 458 "Record has been modified or does not exist".into(), 459 )) 460 .into_response(); 461 } 462 } 463 let existing_cid = mst.get(&key).await.ok().flatten(); 464 let record_ipld = crate::util::json_to_ipld(&input.record); 465 let mut record_bytes = Vec::new(); 466 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld).is_err() { 467 return ApiError::InvalidRecord("Failed to serialize record".into()).into_response(); 468 } 469 let record_cid = match tracking_store.put(&record_bytes).await { 470 Ok(c) => c, 471 _ => { 472 return ApiError::InternalError(Some("Failed to save record block".into())) 473 .into_response(); 474 } 475 }; 476 if existing_cid == Some(record_cid) { 477 return ( 478 StatusCode::OK, 479 Json(PutRecordOutput { 480 uri: AtUri::from_parts(&did, &input.collection, &input.rkey), 481 cid: record_cid.to_string(), 482 commit: None, 483 validation_status: validation_status.map(|s| s.to_string()), 484 }), 485 ) 486 .into_response(); 487 } 488 let new_mst = if existing_cid.is_some() { 489 match mst.update(&key, record_cid).await { 490 Ok(m) => m, 491 Err(_) => { 492 return ApiError::InternalError(Some("Failed to update MST".into())) 493 .into_response(); 494 } 495 } 496 } else { 497 match mst.add(&key, record_cid).await { 498 Ok(m) => m, 499 Err(_) => { 500 return ApiError::InternalError(Some("Failed to add to MST".into())) 501 .into_response(); 502 } 503 } 504 }; 505 let new_mst_root = match new_mst.persist().await { 506 Ok(c) => c, 507 Err(_) => { 508 return ApiError::InternalError(Some("Failed to persist MST".into())).into_response(); 509 } 510 }; 511 let op = if existing_cid.is_some() { 512 RecordOp::Update { 513 collection: input.collection.clone(), 514 rkey: input.rkey.clone(), 515 cid: record_cid, 516 prev: existing_cid, 517 } 518 } else { 519 RecordOp::Create { 520 collection: input.collection.clone(), 521 rkey: input.rkey.clone(), 522 cid: record_cid, 523 } 524 }; 525 let mut new_mst_blocks = std::collections::BTreeMap::new(); 526 let mut old_mst_blocks = std::collections::BTreeMap::new(); 527 if new_mst 528 .blocks_for_path(&key, &mut new_mst_blocks) 529 .await 530 .is_err() 531 { 532 return ApiError::InternalError(Some("Failed to get new MST blocks for path".into())) 533 .into_response(); 534 } 535 if mst 536 .blocks_for_path(&key, &mut old_mst_blocks) 537 .await 538 .is_err() 539 { 540 return ApiError::InternalError(Some("Failed to get old MST blocks for path".into())) 541 .into_response(); 542 } 543 let mut relevant_blocks = new_mst_blocks.clone(); 544 relevant_blocks.extend(old_mst_blocks.iter().map(|(k, v)| (*k, v.clone()))); 545 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes)); 546 let written_cids: Vec<Cid> = tracking_store 547 .get_all_relevant_cids() 548 .into_iter() 549 .chain(relevant_blocks.keys().copied()) 550 .collect::<std::collections::HashSet<_>>() 551 .into_iter() 552 .collect(); 553 let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect(); 554 let is_update = existing_cid.is_some(); 555 let blob_cids = extract_blob_cids(&input.record); 556 let obsolete_cids: Vec<Cid> = std::iter::once(current_root_cid) 557 .chain( 558 old_mst_blocks 559 .keys() 560 .filter(|cid| !new_mst_blocks.contains_key(*cid)) 561 .copied(), 562 ) 563 .chain(existing_cid) 564 .collect(); 565 let commit_result = match commit_and_log( 566 &state, 567 CommitParams { 568 did: &did, 569 user_id, 570 current_root_cid: Some(current_root_cid), 571 prev_data_cid: Some(commit.data), 572 new_mst_root, 573 ops: vec![op], 574 blocks_cids: &written_cids_str, 575 blobs: &blob_cids, 576 obsolete_cids, 577 }, 578 ) 579 .await 580 { 581 Ok(res) => res, 582 Err(e) if e.contains("ConcurrentModification") => { 583 return ApiError::InvalidSwap(Some("Repo has been modified".into())).into_response(); 584 } 585 Err(e) => return ApiError::InternalError(Some(e)).into_response(), 586 }; 587 588 if let Some(ref controller) = controller_did { 589 let _ = delegation::log_delegation_action( 590 &state.db, 591 &did, 592 controller, 593 Some(controller), 594 DelegationActionType::RepoWrite, 595 Some(json!({ 596 "action": if is_update { "update" } else { "create" }, 597 "collection": input.collection, 598 "rkey": input.rkey 599 })), 600 None, 601 None, 602 ) 603 .await; 604 } 605 606 ( 607 StatusCode::OK, 608 Json(PutRecordOutput { 609 uri: AtUri::from_parts(&did, &input.collection, &input.rkey), 610 cid: record_cid.to_string(), 611 commit: Some(CommitInfo { 612 cid: commit_result.commit_cid.to_string(), 613 rev: commit_result.rev, 614 }), 615 validation_status: validation_status.map(|s| s.to_string()), 616 }), 617 ) 618 .into_response() 619}