this repo has no description
1use super::validation::validate_record_with_status; 2use crate::api::error::ApiError; 3use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log, extract_blob_cids}; 4use crate::delegation::{self, DelegationActionType}; 5use crate::repo::tracking::TrackingBlockStore; 6use crate::state::AppState; 7use crate::types::{AtIdentifier, AtUri, Did, Nsid, Rkey}; 8use axum::{ 9 Json, 10 extract::State, 11 http::{HeaderMap, StatusCode}, 12 response::{IntoResponse, Response}, 13}; 14use cid::Cid; 15use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 16use serde::{Deserialize, Serialize}; 17use serde_json::json; 18use sqlx::{PgPool, Row}; 19use std::str::FromStr; 20use std::sync::Arc; 21use tracing::error; 22use uuid::Uuid; 23 24pub async fn has_verified_comms_channel(db: &PgPool, did: &str) -> Result<bool, sqlx::Error> { 25 let row = sqlx::query( 26 r#" 27 SELECT 28 email_verified, 29 discord_verified, 30 telegram_verified, 31 signal_verified 32 FROM users 33 WHERE did = $1 34 "#, 35 ) 36 .bind(did) 37 .fetch_optional(db) 38 .await?; 39 match row { 40 Some(r) => { 41 let email_verified: bool = r.get("email_verified"); 42 let discord_verified: bool = r.get("discord_verified"); 43 let telegram_verified: bool = r.get("telegram_verified"); 44 let signal_verified: bool = r.get("signal_verified"); 45 Ok(email_verified || discord_verified || telegram_verified || signal_verified) 46 } 47 None => Ok(false), 48 } 49} 50 51pub struct RepoWriteAuth { 52 pub did: Did, 53 pub user_id: Uuid, 54 pub current_root_cid: Cid, 55 pub is_oauth: bool, 56 pub scope: Option<String>, 57 pub controller_did: Option<Did>, 58} 59 60pub async fn prepare_repo_write( 61 state: &AppState, 62 headers: &HeaderMap, 63 repo_did: &str, 64 http_method: &str, 65 http_uri: &str, 66) -> Result<RepoWriteAuth, Response> { 67 let extracted = crate::auth::extract_auth_token_from_header( 68 headers.get("Authorization").and_then(|h| h.to_str().ok()), 69 ) 70 .ok_or_else(|| ApiError::AuthenticationRequired.into_response())?; 71 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 72 let auth_user = crate::auth::validate_token_with_dpop( 73 &state.db, 74 &extracted.token, 75 extracted.is_dpop, 76 dpop_proof, 77 http_method, 78 http_uri, 79 false, 80 false, 81 ) 82 .await 83 .map_err(|e| { 84 tracing::warn!(error = ?e, is_dpop = extracted.is_dpop, "Token validation failed in prepare_repo_write"); 85 let mut response = ApiError::from(e).into_response(); 86 if matches!(e, crate::auth::TokenValidationError::TokenExpired) { 87 let scheme = if extracted.is_dpop { "DPoP" } else { "Bearer" }; 88 let www_auth = format!( 89 "{} error=\"invalid_token\", error_description=\"Token has expired\"", 90 scheme 91 ); 92 response.headers_mut().insert( 93 "WWW-Authenticate", 94 www_auth.parse().unwrap(), 95 ); 96 if extracted.is_dpop { 97 let nonce = crate::oauth::verify::generate_dpop_nonce(); 98 response.headers_mut().insert("DPoP-Nonce", nonce.parse().unwrap()); 99 } 100 } 101 response 102 })?; 103 if repo_did != auth_user.did { 104 return Err( 105 ApiError::InvalidRepo("Repo does not match authenticated user".into()).into_response(), 106 ); 107 } 108 if crate::util::is_account_migrated(&state.db, &auth_user.did) 109 .await 110 .unwrap_or(false) 111 { 112 return Err(ApiError::AccountMigrated.into_response()); 113 } 114 let is_verified = has_verified_comms_channel(&state.db, &auth_user.did) 115 .await 116 .unwrap_or(false); 117 let is_delegated = crate::delegation::is_delegated_account(&state.db, &auth_user.did) 118 .await 119 .unwrap_or(false); 120 if !is_verified && !is_delegated { 121 return Err(ApiError::AccountNotVerified.into_response()); 122 } 123 let user_id = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", &auth_user.did) 124 .fetch_optional(&state.db) 125 .await 126 .map_err(|e| { 127 error!("DB error fetching user: {}", e); 128 ApiError::InternalError(None).into_response() 129 })? 130 .ok_or_else(|| ApiError::InternalError(Some("User not found".into())).into_response())?; 131 let root_cid_str: String = sqlx::query_scalar!( 132 "SELECT repo_root_cid FROM repos WHERE user_id = $1", 133 user_id 134 ) 135 .fetch_optional(&state.db) 136 .await 137 .map_err(|e| { 138 error!("DB error fetching repo root: {}", e); 139 ApiError::InternalError(None).into_response() 140 })? 141 .ok_or_else(|| ApiError::InternalError(Some("Repo root not found".into())).into_response())?; 142 let current_root_cid = Cid::from_str(&root_cid_str).map_err(|_| { 143 ApiError::InternalError(Some("Invalid repo root CID".into())).into_response() 144 })?; 145 Ok(RepoWriteAuth { 146 did: auth_user.did.clone(), 147 user_id, 148 current_root_cid, 149 is_oauth: auth_user.is_oauth, 150 scope: auth_user.scope, 151 controller_did: auth_user.controller_did.clone(), 152 }) 153} 154#[derive(Deserialize)] 155#[allow(dead_code)] 156pub struct CreateRecordInput { 157 pub repo: AtIdentifier, 158 pub collection: Nsid, 159 pub rkey: Option<Rkey>, 160 pub validate: Option<bool>, 161 pub record: serde_json::Value, 162 #[serde(rename = "swapCommit")] 163 pub swap_commit: Option<String>, 164} 165#[derive(Serialize)] 166#[serde(rename_all = "camelCase")] 167pub struct CommitInfo { 168 pub cid: String, 169 pub rev: String, 170} 171 172#[derive(Serialize)] 173#[serde(rename_all = "camelCase")] 174pub struct CreateRecordOutput { 175 pub uri: AtUri, 176 pub cid: String, 177 pub commit: CommitInfo, 178 #[serde(skip_serializing_if = "Option::is_none")] 179 pub validation_status: Option<String>, 180} 181pub async fn create_record( 182 State(state): State<AppState>, 183 headers: HeaderMap, 184 axum::extract::OriginalUri(uri): axum::extract::OriginalUri, 185 Json(input): Json<CreateRecordInput>, 186) -> Response { 187 let auth = match prepare_repo_write( 188 &state, 189 &headers, 190 &input.repo, 191 "POST", 192 &crate::util::build_full_url(&uri.to_string()), 193 ) 194 .await 195 { 196 Ok(res) => res, 197 Err(err_res) => return err_res, 198 }; 199 200 if let Err(e) = crate::auth::scope_check::check_repo_scope( 201 auth.is_oauth, 202 auth.scope.as_deref(), 203 crate::oauth::RepoAction::Create, 204 &input.collection, 205 ) { 206 return e; 207 } 208 209 let did = auth.did; 210 let user_id = auth.user_id; 211 let current_root_cid = auth.current_root_cid; 212 let controller_did = auth.controller_did; 213 214 if let Some(swap_commit) = &input.swap_commit 215 && Cid::from_str(swap_commit).ok() != Some(current_root_cid) 216 { 217 return ApiError::InvalidSwap(Some("Repo has been modified".into())).into_response(); 218 } 219 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 220 let commit_bytes = match tracking_store.get(&current_root_cid).await { 221 Ok(Some(b)) => b, 222 _ => return ApiError::InternalError(Some("Commit block not found".into())).into_response(), 223 }; 224 let commit = match Commit::from_cbor(&commit_bytes) { 225 Ok(c) => c, 226 _ => return ApiError::InternalError(Some("Failed to parse commit".into())).into_response(), 227 }; 228 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 229 let validation_status = if input.validate == Some(false) { 230 None 231 } else { 232 let require_lexicon = input.validate == Some(true); 233 match validate_record_with_status( 234 &input.record, 235 &input.collection, 236 input.rkey.as_ref().map(|r| r.as_str()), 237 require_lexicon, 238 ) { 239 Ok(status) => Some(status), 240 Err(err_response) => return *err_response, 241 } 242 }; 243 let rkey = input.rkey.unwrap_or_else(Rkey::generate); 244 let record_ipld = crate::util::json_to_ipld(&input.record); 245 let mut record_bytes = Vec::new(); 246 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld).is_err() { 247 return ApiError::InvalidRecord("Failed to serialize record".into()).into_response(); 248 } 249 let record_cid = match tracking_store.put(&record_bytes).await { 250 Ok(c) => c, 251 _ => { 252 return ApiError::InternalError(Some("Failed to save record block".into())) 253 .into_response(); 254 } 255 }; 256 let key = format!("{}/{}", input.collection, rkey); 257 let new_mst = match mst.add(&key, record_cid).await { 258 Ok(m) => m, 259 _ => return ApiError::InternalError(Some("Failed to add to MST".into())).into_response(), 260 }; 261 let new_mst_root = match new_mst.persist().await { 262 Ok(c) => c, 263 _ => return ApiError::InternalError(Some("Failed to persist MST".into())).into_response(), 264 }; 265 let op = RecordOp::Create { 266 collection: input.collection.to_string(), 267 rkey: rkey.to_string(), 268 cid: record_cid, 269 }; 270 let mut new_mst_blocks = std::collections::BTreeMap::new(); 271 let mut old_mst_blocks = std::collections::BTreeMap::new(); 272 if new_mst 273 .blocks_for_path(&key, &mut new_mst_blocks) 274 .await 275 .is_err() 276 { 277 return ApiError::InternalError(Some("Failed to get new MST blocks for path".into())) 278 .into_response(); 279 } 280 if mst 281 .blocks_for_path(&key, &mut old_mst_blocks) 282 .await 283 .is_err() 284 { 285 return ApiError::InternalError(Some("Failed to get old MST blocks for path".into())) 286 .into_response(); 287 } 288 let mut relevant_blocks = new_mst_blocks.clone(); 289 relevant_blocks.extend(old_mst_blocks.iter().map(|(k, v)| (*k, v.clone()))); 290 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes)); 291 let written_cids: Vec<Cid> = tracking_store 292 .get_all_relevant_cids() 293 .into_iter() 294 .chain(relevant_blocks.keys().copied()) 295 .collect::<std::collections::HashSet<_>>() 296 .into_iter() 297 .collect(); 298 let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect(); 299 let blob_cids = extract_blob_cids(&input.record); 300 let obsolete_cids: Vec<Cid> = std::iter::once(current_root_cid) 301 .chain( 302 old_mst_blocks 303 .keys() 304 .filter(|cid| !new_mst_blocks.contains_key(*cid)) 305 .copied(), 306 ) 307 .collect(); 308 let commit_result = match commit_and_log( 309 &state, 310 CommitParams { 311 did: &did, 312 user_id, 313 current_root_cid: Some(current_root_cid), 314 prev_data_cid: Some(commit.data), 315 new_mst_root, 316 ops: vec![op], 317 blocks_cids: &written_cids_str, 318 blobs: &blob_cids, 319 obsolete_cids, 320 }, 321 ) 322 .await 323 { 324 Ok(res) => res, 325 Err(e) => return ApiError::InternalError(Some(e)).into_response(), 326 }; 327 328 if let Some(ref controller) = controller_did { 329 let _ = delegation::log_delegation_action( 330 &state.db, 331 &did, 332 controller, 333 Some(controller), 334 DelegationActionType::RepoWrite, 335 Some(json!({ 336 "action": "create", 337 "collection": input.collection, 338 "rkey": rkey 339 })), 340 None, 341 None, 342 ) 343 .await; 344 } 345 346 ( 347 StatusCode::OK, 348 Json(CreateRecordOutput { 349 uri: AtUri::from_parts(&did, &input.collection, &rkey), 350 cid: record_cid.to_string(), 351 commit: CommitInfo { 352 cid: commit_result.commit_cid.to_string(), 353 rev: commit_result.rev, 354 }, 355 validation_status: validation_status.map(|s| s.to_string()), 356 }), 357 ) 358 .into_response() 359} 360#[derive(Deserialize)] 361#[allow(dead_code)] 362pub struct PutRecordInput { 363 pub repo: AtIdentifier, 364 pub collection: Nsid, 365 pub rkey: Rkey, 366 pub validate: Option<bool>, 367 pub record: serde_json::Value, 368 #[serde(rename = "swapCommit")] 369 pub swap_commit: Option<String>, 370 #[serde(rename = "swapRecord")] 371 pub swap_record: Option<String>, 372} 373#[derive(Serialize)] 374#[serde(rename_all = "camelCase")] 375pub struct PutRecordOutput { 376 pub uri: AtUri, 377 pub cid: String, 378 #[serde(skip_serializing_if = "Option::is_none")] 379 pub commit: Option<CommitInfo>, 380 #[serde(skip_serializing_if = "Option::is_none")] 381 pub validation_status: Option<String>, 382} 383pub async fn put_record( 384 State(state): State<AppState>, 385 headers: HeaderMap, 386 axum::extract::OriginalUri(uri): axum::extract::OriginalUri, 387 Json(input): Json<PutRecordInput>, 388) -> Response { 389 let auth = match prepare_repo_write( 390 &state, 391 &headers, 392 &input.repo, 393 "POST", 394 &crate::util::build_full_url(&uri.to_string()), 395 ) 396 .await 397 { 398 Ok(res) => res, 399 Err(err_res) => return err_res, 400 }; 401 402 if let Err(e) = crate::auth::scope_check::check_repo_scope( 403 auth.is_oauth, 404 auth.scope.as_deref(), 405 crate::oauth::RepoAction::Create, 406 &input.collection, 407 ) { 408 return e; 409 } 410 if let Err(e) = crate::auth::scope_check::check_repo_scope( 411 auth.is_oauth, 412 auth.scope.as_deref(), 413 crate::oauth::RepoAction::Update, 414 &input.collection, 415 ) { 416 return e; 417 } 418 419 let did = auth.did; 420 let user_id = auth.user_id; 421 let current_root_cid = auth.current_root_cid; 422 let controller_did = auth.controller_did; 423 424 if let Some(swap_commit) = &input.swap_commit 425 && Cid::from_str(swap_commit).ok() != Some(current_root_cid) 426 { 427 return ApiError::InvalidSwap(Some("Repo has been modified".into())).into_response(); 428 } 429 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 430 let commit_bytes = match tracking_store.get(&current_root_cid).await { 431 Ok(Some(b)) => b, 432 _ => return ApiError::InternalError(Some("Commit block not found".into())).into_response(), 433 }; 434 let commit = match Commit::from_cbor(&commit_bytes) { 435 Ok(c) => c, 436 _ => return ApiError::InternalError(Some("Failed to parse commit".into())).into_response(), 437 }; 438 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 439 let key = format!("{}/{}", input.collection, input.rkey); 440 let validation_status = if input.validate == Some(false) { 441 None 442 } else { 443 let require_lexicon = input.validate == Some(true); 444 match validate_record_with_status( 445 &input.record, 446 &input.collection, 447 Some(input.rkey.as_str()), 448 require_lexicon, 449 ) { 450 Ok(status) => Some(status), 451 Err(err_response) => return *err_response, 452 } 453 }; 454 if let Some(swap_record_str) = &input.swap_record { 455 let expected_cid = Cid::from_str(swap_record_str).ok(); 456 let actual_cid = mst.get(&key).await.ok().flatten(); 457 if expected_cid != actual_cid { 458 return ApiError::InvalidSwap(Some( 459 "Record has been modified or does not exist".into(), 460 )) 461 .into_response(); 462 } 463 } 464 let existing_cid = mst.get(&key).await.ok().flatten(); 465 let record_ipld = crate::util::json_to_ipld(&input.record); 466 let mut record_bytes = Vec::new(); 467 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld).is_err() { 468 return ApiError::InvalidRecord("Failed to serialize record".into()).into_response(); 469 } 470 let record_cid = match tracking_store.put(&record_bytes).await { 471 Ok(c) => c, 472 _ => { 473 return ApiError::InternalError(Some("Failed to save record block".into())) 474 .into_response(); 475 } 476 }; 477 if existing_cid == Some(record_cid) { 478 return ( 479 StatusCode::OK, 480 Json(PutRecordOutput { 481 uri: AtUri::from_parts(&did, &input.collection, &input.rkey), 482 cid: record_cid.to_string(), 483 commit: None, 484 validation_status: validation_status.map(|s| s.to_string()), 485 }), 486 ) 487 .into_response(); 488 } 489 let new_mst = if existing_cid.is_some() { 490 match mst.update(&key, record_cid).await { 491 Ok(m) => m, 492 Err(_) => { 493 return ApiError::InternalError(Some("Failed to update MST".into())) 494 .into_response(); 495 } 496 } 497 } else { 498 match mst.add(&key, record_cid).await { 499 Ok(m) => m, 500 Err(_) => { 501 return ApiError::InternalError(Some("Failed to add to MST".into())) 502 .into_response(); 503 } 504 } 505 }; 506 let new_mst_root = match new_mst.persist().await { 507 Ok(c) => c, 508 Err(_) => { 509 return ApiError::InternalError(Some("Failed to persist MST".into())).into_response(); 510 } 511 }; 512 let op = if existing_cid.is_some() { 513 RecordOp::Update { 514 collection: input.collection.to_string(), 515 rkey: input.rkey.to_string(), 516 cid: record_cid, 517 prev: existing_cid, 518 } 519 } else { 520 RecordOp::Create { 521 collection: input.collection.to_string(), 522 rkey: input.rkey.to_string(), 523 cid: record_cid, 524 } 525 }; 526 let mut new_mst_blocks = std::collections::BTreeMap::new(); 527 let mut old_mst_blocks = std::collections::BTreeMap::new(); 528 if new_mst 529 .blocks_for_path(&key, &mut new_mst_blocks) 530 .await 531 .is_err() 532 { 533 return ApiError::InternalError(Some("Failed to get new MST blocks for path".into())) 534 .into_response(); 535 } 536 if mst 537 .blocks_for_path(&key, &mut old_mst_blocks) 538 .await 539 .is_err() 540 { 541 return ApiError::InternalError(Some("Failed to get old MST blocks for path".into())) 542 .into_response(); 543 } 544 let mut relevant_blocks = new_mst_blocks.clone(); 545 relevant_blocks.extend(old_mst_blocks.iter().map(|(k, v)| (*k, v.clone()))); 546 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes)); 547 let written_cids: Vec<Cid> = tracking_store 548 .get_all_relevant_cids() 549 .into_iter() 550 .chain(relevant_blocks.keys().copied()) 551 .collect::<std::collections::HashSet<_>>() 552 .into_iter() 553 .collect(); 554 let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect(); 555 let is_update = existing_cid.is_some(); 556 let blob_cids = extract_blob_cids(&input.record); 557 let obsolete_cids: Vec<Cid> = std::iter::once(current_root_cid) 558 .chain( 559 old_mst_blocks 560 .keys() 561 .filter(|cid| !new_mst_blocks.contains_key(*cid)) 562 .copied(), 563 ) 564 .chain(existing_cid) 565 .collect(); 566 let commit_result = match commit_and_log( 567 &state, 568 CommitParams { 569 did: &did, 570 user_id, 571 current_root_cid: Some(current_root_cid), 572 prev_data_cid: Some(commit.data), 573 new_mst_root, 574 ops: vec![op], 575 blocks_cids: &written_cids_str, 576 blobs: &blob_cids, 577 obsolete_cids, 578 }, 579 ) 580 .await 581 { 582 Ok(res) => res, 583 Err(e) => return ApiError::InternalError(Some(e)).into_response(), 584 }; 585 586 if let Some(ref controller) = controller_did { 587 let _ = delegation::log_delegation_action( 588 &state.db, 589 &did, 590 controller, 591 Some(controller), 592 DelegationActionType::RepoWrite, 593 Some(json!({ 594 "action": if is_update { "update" } else { "create" }, 595 "collection": input.collection, 596 "rkey": input.rkey 597 })), 598 None, 599 None, 600 ) 601 .await; 602 } 603 604 ( 605 StatusCode::OK, 606 Json(PutRecordOutput { 607 uri: AtUri::from_parts(&did, &input.collection, &input.rkey), 608 cid: record_cid.to_string(), 609 commit: Some(CommitInfo { 610 cid: commit_result.commit_cid.to_string(), 611 rev: commit_result.rev, 612 }), 613 validation_status: validation_status.map(|s| s.to_string()), 614 }), 615 ) 616 .into_response() 617}