this repo has no description
1use super::validation::validate_record_with_status; 2use crate::api::error::ApiError; 3use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log, extract_blob_cids}; 4use crate::delegation::{self, DelegationActionType}; 5use crate::repo::tracking::TrackingBlockStore; 6use crate::state::AppState; 7use crate::types::{AtIdentifier, AtUri, Did, Nsid, Rkey}; 8use axum::{ 9 Json, 10 extract::State, 11 http::{HeaderMap, StatusCode}, 12 response::{IntoResponse, Response}, 13}; 14use cid::Cid; 15use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 16use serde::{Deserialize, Serialize}; 17use serde_json::json; 18use sqlx::{PgPool, Row}; 19use std::str::FromStr; 20use std::sync::Arc; 21use tracing::error; 22use uuid::Uuid; 23 24pub async fn has_verified_comms_channel(db: &PgPool, did: &str) -> Result<bool, sqlx::Error> { 25 let row = sqlx::query( 26 r#" 27 SELECT 28 email_verified, 29 discord_verified, 30 telegram_verified, 31 signal_verified 32 FROM users 33 WHERE did = $1 34 "#, 35 ) 36 .bind(did) 37 .fetch_optional(db) 38 .await?; 39 match row { 40 Some(r) => { 41 let email_verified: bool = r.get("email_verified"); 42 let discord_verified: bool = r.get("discord_verified"); 43 let telegram_verified: bool = r.get("telegram_verified"); 44 let signal_verified: bool = r.get("signal_verified"); 45 Ok(email_verified || discord_verified || telegram_verified || signal_verified) 46 } 47 None => Ok(false), 48 } 49} 50 51pub struct RepoWriteAuth { 52 pub did: Did, 53 pub user_id: Uuid, 54 pub current_root_cid: Cid, 55 pub is_oauth: bool, 56 pub scope: Option<String>, 57 pub controller_did: Option<Did>, 58} 59 60pub async fn prepare_repo_write( 61 state: &AppState, 62 headers: &HeaderMap, 63 repo_did: &str, 64 http_method: &str, 65 http_uri: &str, 66) -> Result<RepoWriteAuth, Response> { 67 let extracted = crate::auth::extract_auth_token_from_header( 68 headers.get("Authorization").and_then(|h| h.to_str().ok()), 69 ) 70 .ok_or_else(|| ApiError::AuthenticationRequired.into_response())?; 71 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 72 let auth_user = crate::auth::validate_token_with_dpop( 73 &state.db, 74 &extracted.token, 75 extracted.is_dpop, 76 dpop_proof, 77 http_method, 78 http_uri, 79 false, 80 ) 81 .await 82 .map_err(|e| { 83 tracing::warn!(error = ?e, is_dpop = extracted.is_dpop, "Token validation failed in prepare_repo_write"); 84 let mut response = ApiError::from(e).into_response(); 85 if matches!(e, crate::auth::TokenValidationError::TokenExpired) { 86 let scheme = if extracted.is_dpop { "DPoP" } else { "Bearer" }; 87 let www_auth = format!( 88 "{} error=\"invalid_token\", error_description=\"Token has expired\"", 89 scheme 90 ); 91 response.headers_mut().insert( 92 "WWW-Authenticate", 93 www_auth.parse().unwrap(), 94 ); 95 if extracted.is_dpop { 96 let nonce = crate::oauth::verify::generate_dpop_nonce(); 97 response.headers_mut().insert("DPoP-Nonce", nonce.parse().unwrap()); 98 } 99 } 100 response 101 })?; 102 if repo_did != auth_user.did { 103 return Err( 104 ApiError::InvalidRepo("Repo does not match authenticated user".into()).into_response(), 105 ); 106 } 107 if crate::util::is_account_migrated(&state.db, &auth_user.did) 108 .await 109 .unwrap_or(false) 110 { 111 return Err(ApiError::AccountMigrated.into_response()); 112 } 113 let is_verified = has_verified_comms_channel(&state.db, &auth_user.did) 114 .await 115 .unwrap_or(false); 116 let is_delegated = crate::delegation::is_delegated_account(&state.db, &auth_user.did) 117 .await 118 .unwrap_or(false); 119 if !is_verified && !is_delegated { 120 return Err(ApiError::AccountNotVerified.into_response()); 121 } 122 let user_id = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", &auth_user.did) 123 .fetch_optional(&state.db) 124 .await 125 .map_err(|e| { 126 error!("DB error fetching user: {}", e); 127 ApiError::InternalError(None).into_response() 128 })? 129 .ok_or_else(|| ApiError::InternalError(Some("User not found".into())).into_response())?; 130 let root_cid_str: String = sqlx::query_scalar!( 131 "SELECT repo_root_cid FROM repos WHERE user_id = $1", 132 user_id 133 ) 134 .fetch_optional(&state.db) 135 .await 136 .map_err(|e| { 137 error!("DB error fetching repo root: {}", e); 138 ApiError::InternalError(None).into_response() 139 })? 140 .ok_or_else(|| ApiError::InternalError(Some("Repo root not found".into())).into_response())?; 141 let current_root_cid = Cid::from_str(&root_cid_str).map_err(|_| { 142 ApiError::InternalError(Some("Invalid repo root CID".into())).into_response() 143 })?; 144 Ok(RepoWriteAuth { 145 did: auth_user.did.clone(), 146 user_id, 147 current_root_cid, 148 is_oauth: auth_user.is_oauth, 149 scope: auth_user.scope, 150 controller_did: auth_user.controller_did.clone(), 151 }) 152} 153#[derive(Deserialize)] 154#[allow(dead_code)] 155pub struct CreateRecordInput { 156 pub repo: AtIdentifier, 157 pub collection: Nsid, 158 pub rkey: Option<Rkey>, 159 pub validate: Option<bool>, 160 pub record: serde_json::Value, 161 #[serde(rename = "swapCommit")] 162 pub swap_commit: Option<String>, 163} 164#[derive(Serialize)] 165#[serde(rename_all = "camelCase")] 166pub struct CommitInfo { 167 pub cid: String, 168 pub rev: String, 169} 170 171#[derive(Serialize)] 172#[serde(rename_all = "camelCase")] 173pub struct CreateRecordOutput { 174 pub uri: AtUri, 175 pub cid: String, 176 pub commit: CommitInfo, 177 #[serde(skip_serializing_if = "Option::is_none")] 178 pub validation_status: Option<String>, 179} 180pub async fn create_record( 181 State(state): State<AppState>, 182 headers: HeaderMap, 183 axum::extract::OriginalUri(uri): axum::extract::OriginalUri, 184 Json(input): Json<CreateRecordInput>, 185) -> Response { 186 let auth = match prepare_repo_write( 187 &state, 188 &headers, 189 &input.repo, 190 "POST", 191 &crate::util::build_full_url(&uri.to_string()), 192 ) 193 .await 194 { 195 Ok(res) => res, 196 Err(err_res) => return err_res, 197 }; 198 199 if let Err(e) = crate::auth::scope_check::check_repo_scope( 200 auth.is_oauth, 201 auth.scope.as_deref(), 202 crate::oauth::RepoAction::Create, 203 &input.collection, 204 ) { 205 return e; 206 } 207 208 let did = auth.did; 209 let user_id = auth.user_id; 210 let current_root_cid = auth.current_root_cid; 211 let controller_did = auth.controller_did; 212 213 if let Some(swap_commit) = &input.swap_commit 214 && Cid::from_str(swap_commit).ok() != Some(current_root_cid) 215 { 216 return ApiError::InvalidSwap(Some("Repo has been modified".into())).into_response(); 217 } 218 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 219 let commit_bytes = match tracking_store.get(&current_root_cid).await { 220 Ok(Some(b)) => b, 221 _ => return ApiError::InternalError(Some("Commit block not found".into())).into_response(), 222 }; 223 let commit = match Commit::from_cbor(&commit_bytes) { 224 Ok(c) => c, 225 _ => return ApiError::InternalError(Some("Failed to parse commit".into())).into_response(), 226 }; 227 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 228 let validation_status = if input.validate == Some(false) { 229 None 230 } else { 231 let require_lexicon = input.validate == Some(true); 232 match validate_record_with_status( 233 &input.record, 234 &input.collection, 235 input.rkey.as_ref().map(|r| r.as_str()), 236 require_lexicon, 237 ) { 238 Ok(status) => Some(status), 239 Err(err_response) => return *err_response, 240 } 241 }; 242 let rkey = input.rkey.unwrap_or_else(Rkey::generate); 243 let record_ipld = crate::util::json_to_ipld(&input.record); 244 let mut record_bytes = Vec::new(); 245 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld).is_err() { 246 return ApiError::InvalidRecord("Failed to serialize record".into()).into_response(); 247 } 248 let record_cid = match tracking_store.put(&record_bytes).await { 249 Ok(c) => c, 250 _ => { 251 return ApiError::InternalError(Some("Failed to save record block".into())) 252 .into_response(); 253 } 254 }; 255 let key = format!("{}/{}", input.collection, rkey); 256 let new_mst = match mst.add(&key, record_cid).await { 257 Ok(m) => m, 258 _ => return ApiError::InternalError(Some("Failed to add to MST".into())).into_response(), 259 }; 260 let new_mst_root = match new_mst.persist().await { 261 Ok(c) => c, 262 _ => return ApiError::InternalError(Some("Failed to persist MST".into())).into_response(), 263 }; 264 let op = RecordOp::Create { 265 collection: input.collection.to_string(), 266 rkey: rkey.to_string(), 267 cid: record_cid, 268 }; 269 let mut relevant_blocks = std::collections::BTreeMap::new(); 270 if new_mst 271 .blocks_for_path(&key, &mut relevant_blocks) 272 .await 273 .is_err() 274 { 275 return ApiError::InternalError(Some("Failed to get new MST blocks for path".into())) 276 .into_response(); 277 } 278 if mst 279 .blocks_for_path(&key, &mut relevant_blocks) 280 .await 281 .is_err() 282 { 283 return ApiError::InternalError(Some("Failed to get old MST blocks for path".into())) 284 .into_response(); 285 } 286 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes)); 287 let mut written_cids = tracking_store.get_all_relevant_cids(); 288 for cid in relevant_blocks.keys() { 289 if !written_cids.contains(cid) { 290 written_cids.push(*cid); 291 } 292 } 293 let written_cids_str = written_cids 294 .iter() 295 .map(|c| c.to_string()) 296 .collect::<Vec<_>>(); 297 let blob_cids = extract_blob_cids(&input.record); 298 let commit_result = match commit_and_log( 299 &state, 300 CommitParams { 301 did: &did, 302 user_id, 303 current_root_cid: Some(current_root_cid), 304 prev_data_cid: Some(commit.data), 305 new_mst_root, 306 ops: vec![op], 307 blocks_cids: &written_cids_str, 308 blobs: &blob_cids, 309 }, 310 ) 311 .await 312 { 313 Ok(res) => res, 314 Err(e) => return ApiError::InternalError(Some(e)).into_response(), 315 }; 316 317 if let Some(ref controller) = controller_did { 318 let _ = delegation::log_delegation_action( 319 &state.db, 320 &did, 321 controller, 322 Some(controller), 323 DelegationActionType::RepoWrite, 324 Some(json!({ 325 "action": "create", 326 "collection": input.collection, 327 "rkey": rkey 328 })), 329 None, 330 None, 331 ) 332 .await; 333 } 334 335 ( 336 StatusCode::OK, 337 Json(CreateRecordOutput { 338 uri: AtUri::from_parts(&did, &input.collection, &rkey), 339 cid: record_cid.to_string(), 340 commit: CommitInfo { 341 cid: commit_result.commit_cid.to_string(), 342 rev: commit_result.rev, 343 }, 344 validation_status: validation_status.map(|s| s.to_string()), 345 }), 346 ) 347 .into_response() 348} 349#[derive(Deserialize)] 350#[allow(dead_code)] 351pub struct PutRecordInput { 352 pub repo: AtIdentifier, 353 pub collection: Nsid, 354 pub rkey: Rkey, 355 pub validate: Option<bool>, 356 pub record: serde_json::Value, 357 #[serde(rename = "swapCommit")] 358 pub swap_commit: Option<String>, 359 #[serde(rename = "swapRecord")] 360 pub swap_record: Option<String>, 361} 362#[derive(Serialize)] 363#[serde(rename_all = "camelCase")] 364pub struct PutRecordOutput { 365 pub uri: AtUri, 366 pub cid: String, 367 #[serde(skip_serializing_if = "Option::is_none")] 368 pub commit: Option<CommitInfo>, 369 #[serde(skip_serializing_if = "Option::is_none")] 370 pub validation_status: Option<String>, 371} 372pub async fn put_record( 373 State(state): State<AppState>, 374 headers: HeaderMap, 375 axum::extract::OriginalUri(uri): axum::extract::OriginalUri, 376 Json(input): Json<PutRecordInput>, 377) -> Response { 378 let auth = match prepare_repo_write( 379 &state, 380 &headers, 381 &input.repo, 382 "POST", 383 &crate::util::build_full_url(&uri.to_string()), 384 ) 385 .await 386 { 387 Ok(res) => res, 388 Err(err_res) => return err_res, 389 }; 390 391 if let Err(e) = crate::auth::scope_check::check_repo_scope( 392 auth.is_oauth, 393 auth.scope.as_deref(), 394 crate::oauth::RepoAction::Create, 395 &input.collection, 396 ) { 397 return e; 398 } 399 if let Err(e) = crate::auth::scope_check::check_repo_scope( 400 auth.is_oauth, 401 auth.scope.as_deref(), 402 crate::oauth::RepoAction::Update, 403 &input.collection, 404 ) { 405 return e; 406 } 407 408 let did = auth.did; 409 let user_id = auth.user_id; 410 let current_root_cid = auth.current_root_cid; 411 let controller_did = auth.controller_did; 412 413 if let Some(swap_commit) = &input.swap_commit 414 && Cid::from_str(swap_commit).ok() != Some(current_root_cid) 415 { 416 return ApiError::InvalidSwap(Some("Repo has been modified".into())).into_response(); 417 } 418 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 419 let commit_bytes = match tracking_store.get(&current_root_cid).await { 420 Ok(Some(b)) => b, 421 _ => return ApiError::InternalError(Some("Commit block not found".into())).into_response(), 422 }; 423 let commit = match Commit::from_cbor(&commit_bytes) { 424 Ok(c) => c, 425 _ => return ApiError::InternalError(Some("Failed to parse commit".into())).into_response(), 426 }; 427 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 428 let key = format!("{}/{}", input.collection, input.rkey); 429 let validation_status = if input.validate == Some(false) { 430 None 431 } else { 432 let require_lexicon = input.validate == Some(true); 433 match validate_record_with_status( 434 &input.record, 435 &input.collection, 436 Some(input.rkey.as_str()), 437 require_lexicon, 438 ) { 439 Ok(status) => Some(status), 440 Err(err_response) => return *err_response, 441 } 442 }; 443 if let Some(swap_record_str) = &input.swap_record { 444 let expected_cid = Cid::from_str(swap_record_str).ok(); 445 let actual_cid = mst.get(&key).await.ok().flatten(); 446 if expected_cid != actual_cid { 447 return ApiError::InvalidSwap(Some( 448 "Record has been modified or does not exist".into(), 449 )) 450 .into_response(); 451 } 452 } 453 let existing_cid = mst.get(&key).await.ok().flatten(); 454 let record_ipld = crate::util::json_to_ipld(&input.record); 455 let mut record_bytes = Vec::new(); 456 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld).is_err() { 457 return ApiError::InvalidRecord("Failed to serialize record".into()).into_response(); 458 } 459 let record_cid = match tracking_store.put(&record_bytes).await { 460 Ok(c) => c, 461 _ => { 462 return ApiError::InternalError(Some("Failed to save record block".into())) 463 .into_response(); 464 } 465 }; 466 if existing_cid == Some(record_cid) { 467 return ( 468 StatusCode::OK, 469 Json(PutRecordOutput { 470 uri: AtUri::from_parts(&did, &input.collection, &input.rkey), 471 cid: record_cid.to_string(), 472 commit: None, 473 validation_status: validation_status.map(|s| s.to_string()), 474 }), 475 ) 476 .into_response(); 477 } 478 let new_mst = if existing_cid.is_some() { 479 match mst.update(&key, record_cid).await { 480 Ok(m) => m, 481 Err(_) => { 482 return ApiError::InternalError(Some("Failed to update MST".into())) 483 .into_response(); 484 } 485 } 486 } else { 487 match mst.add(&key, record_cid).await { 488 Ok(m) => m, 489 Err(_) => { 490 return ApiError::InternalError(Some("Failed to add to MST".into())) 491 .into_response(); 492 } 493 } 494 }; 495 let new_mst_root = match new_mst.persist().await { 496 Ok(c) => c, 497 Err(_) => { 498 return ApiError::InternalError(Some("Failed to persist MST".into())).into_response(); 499 } 500 }; 501 let op = if existing_cid.is_some() { 502 RecordOp::Update { 503 collection: input.collection.to_string(), 504 rkey: input.rkey.to_string(), 505 cid: record_cid, 506 prev: existing_cid, 507 } 508 } else { 509 RecordOp::Create { 510 collection: input.collection.to_string(), 511 rkey: input.rkey.to_string(), 512 cid: record_cid, 513 } 514 }; 515 let mut relevant_blocks = std::collections::BTreeMap::new(); 516 if new_mst 517 .blocks_for_path(&key, &mut relevant_blocks) 518 .await 519 .is_err() 520 { 521 return ApiError::InternalError(Some("Failed to get new MST blocks for path".into())) 522 .into_response(); 523 } 524 if mst 525 .blocks_for_path(&key, &mut relevant_blocks) 526 .await 527 .is_err() 528 { 529 return ApiError::InternalError(Some("Failed to get old MST blocks for path".into())) 530 .into_response(); 531 } 532 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes)); 533 let mut written_cids = tracking_store.get_all_relevant_cids(); 534 for cid in relevant_blocks.keys() { 535 if !written_cids.contains(cid) { 536 written_cids.push(*cid); 537 } 538 } 539 let written_cids_str = written_cids 540 .iter() 541 .map(|c| c.to_string()) 542 .collect::<Vec<_>>(); 543 let is_update = existing_cid.is_some(); 544 let blob_cids = extract_blob_cids(&input.record); 545 let commit_result = match commit_and_log( 546 &state, 547 CommitParams { 548 did: &did, 549 user_id, 550 current_root_cid: Some(current_root_cid), 551 prev_data_cid: Some(commit.data), 552 new_mst_root, 553 ops: vec![op], 554 blocks_cids: &written_cids_str, 555 blobs: &blob_cids, 556 }, 557 ) 558 .await 559 { 560 Ok(res) => res, 561 Err(e) => return ApiError::InternalError(Some(e)).into_response(), 562 }; 563 564 if let Some(ref controller) = controller_did { 565 let _ = delegation::log_delegation_action( 566 &state.db, 567 &did, 568 controller, 569 Some(controller), 570 DelegationActionType::RepoWrite, 571 Some(json!({ 572 "action": if is_update { "update" } else { "create" }, 573 "collection": input.collection, 574 "rkey": input.rkey 575 })), 576 None, 577 None, 578 ) 579 .await; 580 } 581 582 ( 583 StatusCode::OK, 584 Json(PutRecordOutput { 585 uri: AtUri::from_parts(&did, &input.collection, &input.rkey), 586 cid: record_cid.to_string(), 587 commit: Some(CommitInfo { 588 cid: commit_result.commit_cid.to_string(), 589 rev: commit_result.rev, 590 }), 591 validation_status: validation_status.map(|s| s.to_string()), 592 }), 593 ) 594 .into_response() 595}