this repo has no description
1use super::validation::validate_record_with_status; 2use crate::api::error::ApiError; 3use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log, extract_blob_cids}; 4use crate::delegation::{self, DelegationActionType}; 5use crate::repo::tracking::TrackingBlockStore; 6use crate::state::AppState; 7use crate::types::{AtIdentifier, AtUri, Did, Nsid, Rkey}; 8use axum::{ 9 Json, 10 extract::State, 11 http::{HeaderMap, StatusCode}, 12 response::{IntoResponse, Response}, 13}; 14use cid::Cid; 15use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 16use serde::{Deserialize, Serialize}; 17use serde_json::json; 18use sqlx::{PgPool, Row}; 19use std::str::FromStr; 20use std::sync::Arc; 21use tracing::error; 22use uuid::Uuid; 23 24pub async fn has_verified_comms_channel(db: &PgPool, did: &str) -> Result<bool, sqlx::Error> { 25 let row = sqlx::query( 26 r#" 27 SELECT 28 email_verified, 29 discord_verified, 30 telegram_verified, 31 signal_verified 32 FROM users 33 WHERE did = $1 34 "#, 35 ) 36 .bind(did) 37 .fetch_optional(db) 38 .await?; 39 match row { 40 Some(r) => { 41 let email_verified: bool = r.get("email_verified"); 42 let discord_verified: bool = r.get("discord_verified"); 43 let telegram_verified: bool = r.get("telegram_verified"); 44 let signal_verified: bool = r.get("signal_verified"); 45 Ok(email_verified || discord_verified || telegram_verified || signal_verified) 46 } 47 None => Ok(false), 48 } 49} 50 51pub struct RepoWriteAuth { 52 pub did: Did, 53 pub user_id: Uuid, 54 pub current_root_cid: Cid, 55 pub is_oauth: bool, 56 pub scope: Option<String>, 57 pub controller_did: Option<Did>, 58} 59 60pub async fn prepare_repo_write( 61 state: &AppState, 62 headers: &HeaderMap, 63 repo_did: &str, 64 http_method: &str, 65 http_uri: &str, 66) -> Result<RepoWriteAuth, Response> { 67 let extracted = crate::auth::extract_auth_token_from_header( 68 headers.get("Authorization").and_then(|h| h.to_str().ok()), 69 ) 70 .ok_or_else(|| ApiError::AuthenticationRequired.into_response())?; 71 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 72 let auth_user = crate::auth::validate_token_with_dpop( 73 &state.db, 74 &extracted.token, 75 extracted.is_dpop, 76 dpop_proof, 77 http_method, 78 http_uri, 79 false, 80 ) 81 .await 82 .map_err(|e| { 83 tracing::warn!(error = ?e, is_dpop = extracted.is_dpop, "Token validation failed in prepare_repo_write"); 84 let mut response = ApiError::from(e).into_response(); 85 if matches!(e, crate::auth::TokenValidationError::TokenExpired) { 86 let scheme = if extracted.is_dpop { "DPoP" } else { "Bearer" }; 87 let www_auth = format!( 88 "{} error=\"invalid_token\", error_description=\"Token has expired\"", 89 scheme 90 ); 91 response.headers_mut().insert( 92 "WWW-Authenticate", 93 www_auth.parse().unwrap(), 94 ); 95 if extracted.is_dpop { 96 let nonce = crate::oauth::verify::generate_dpop_nonce(); 97 response.headers_mut().insert("DPoP-Nonce", nonce.parse().unwrap()); 98 } 99 } 100 response 101 })?; 102 if repo_did != auth_user.did { 103 return Err( 104 ApiError::InvalidRepo("Repo does not match authenticated user".into()).into_response(), 105 ); 106 } 107 if crate::util::is_account_migrated(&state.db, &auth_user.did) 108 .await 109 .unwrap_or(false) 110 { 111 return Err(ApiError::AccountMigrated.into_response()); 112 } 113 let is_verified = has_verified_comms_channel(&state.db, &auth_user.did) 114 .await 115 .unwrap_or(false); 116 let is_delegated = crate::delegation::is_delegated_account(&state.db, &auth_user.did) 117 .await 118 .unwrap_or(false); 119 if !is_verified && !is_delegated { 120 return Err(ApiError::AccountNotVerified.into_response()); 121 } 122 let user_id = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", &auth_user.did) 123 .fetch_optional(&state.db) 124 .await 125 .map_err(|e| { 126 error!("DB error fetching user: {}", e); 127 ApiError::InternalError(None).into_response() 128 })? 129 .ok_or_else(|| ApiError::InternalError(Some("User not found".into())).into_response())?; 130 let root_cid_str: String = sqlx::query_scalar!( 131 "SELECT repo_root_cid FROM repos WHERE user_id = $1", 132 user_id 133 ) 134 .fetch_optional(&state.db) 135 .await 136 .map_err(|e| { 137 error!("DB error fetching repo root: {}", e); 138 ApiError::InternalError(None).into_response() 139 })? 140 .ok_or_else(|| ApiError::InternalError(Some("Repo root not found".into())).into_response())?; 141 let current_root_cid = Cid::from_str(&root_cid_str) 142 .map_err(|_| ApiError::InternalError(Some("Invalid repo root CID".into())).into_response())?; 143 Ok(RepoWriteAuth { 144 did: auth_user.did.clone(), 145 user_id, 146 current_root_cid, 147 is_oauth: auth_user.is_oauth, 148 scope: auth_user.scope, 149 controller_did: auth_user.controller_did.clone(), 150 }) 151} 152#[derive(Deserialize)] 153#[allow(dead_code)] 154pub struct CreateRecordInput { 155 pub repo: AtIdentifier, 156 pub collection: Nsid, 157 pub rkey: Option<Rkey>, 158 pub validate: Option<bool>, 159 pub record: serde_json::Value, 160 #[serde(rename = "swapCommit")] 161 pub swap_commit: Option<String>, 162} 163#[derive(Serialize)] 164#[serde(rename_all = "camelCase")] 165pub struct CommitInfo { 166 pub cid: String, 167 pub rev: String, 168} 169 170#[derive(Serialize)] 171#[serde(rename_all = "camelCase")] 172pub struct CreateRecordOutput { 173 pub uri: AtUri, 174 pub cid: String, 175 pub commit: CommitInfo, 176 #[serde(skip_serializing_if = "Option::is_none")] 177 pub validation_status: Option<String>, 178} 179pub async fn create_record( 180 State(state): State<AppState>, 181 headers: HeaderMap, 182 axum::extract::OriginalUri(uri): axum::extract::OriginalUri, 183 Json(input): Json<CreateRecordInput>, 184) -> Response { 185 let auth = match prepare_repo_write( 186 &state, 187 &headers, 188 &input.repo, 189 "POST", 190 &crate::util::build_full_url(&uri.to_string()), 191 ) 192 .await 193 { 194 Ok(res) => res, 195 Err(err_res) => return err_res, 196 }; 197 198 if let Err(e) = crate::auth::scope_check::check_repo_scope( 199 auth.is_oauth, 200 auth.scope.as_deref(), 201 crate::oauth::RepoAction::Create, 202 &input.collection, 203 ) { 204 return e; 205 } 206 207 let did = auth.did; 208 let user_id = auth.user_id; 209 let current_root_cid = auth.current_root_cid; 210 let controller_did = auth.controller_did; 211 212 if let Some(swap_commit) = &input.swap_commit 213 && Cid::from_str(swap_commit).ok() != Some(current_root_cid) 214 { 215 return ApiError::InvalidSwap(Some("Repo has been modified".into())).into_response(); 216 } 217 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 218 let commit_bytes = match tracking_store.get(&current_root_cid).await { 219 Ok(Some(b)) => b, 220 _ => return ApiError::InternalError(Some("Commit block not found".into())).into_response(), 221 }; 222 let commit = match Commit::from_cbor(&commit_bytes) { 223 Ok(c) => c, 224 _ => return ApiError::InternalError(Some("Failed to parse commit".into())).into_response(), 225 }; 226 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 227 let validation_status = if input.validate == Some(false) { 228 None 229 } else { 230 let require_lexicon = input.validate == Some(true); 231 match validate_record_with_status( 232 &input.record, 233 &input.collection, 234 input.rkey.as_ref().map(|r| r.as_str()), 235 require_lexicon, 236 ) { 237 Ok(status) => Some(status), 238 Err(err_response) => return *err_response, 239 } 240 }; 241 let rkey = input.rkey.unwrap_or_else(Rkey::generate); 242 let record_ipld = crate::util::json_to_ipld(&input.record); 243 let mut record_bytes = Vec::new(); 244 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld).is_err() { 245 return ApiError::InvalidRecord("Failed to serialize record".into()).into_response(); 246 } 247 let record_cid = match tracking_store.put(&record_bytes).await { 248 Ok(c) => c, 249 _ => { 250 return ApiError::InternalError(Some("Failed to save record block".into())).into_response() 251 } 252 }; 253 let key = format!("{}/{}", input.collection, rkey); 254 let new_mst = match mst.add(&key, record_cid).await { 255 Ok(m) => m, 256 _ => return ApiError::InternalError(Some("Failed to add to MST".into())).into_response(), 257 }; 258 let new_mst_root = match new_mst.persist().await { 259 Ok(c) => c, 260 _ => return ApiError::InternalError(Some("Failed to persist MST".into())).into_response(), 261 }; 262 let op = RecordOp::Create { 263 collection: input.collection.to_string(), 264 rkey: rkey.to_string(), 265 cid: record_cid, 266 }; 267 let mut relevant_blocks = std::collections::BTreeMap::new(); 268 if new_mst 269 .blocks_for_path(&key, &mut relevant_blocks) 270 .await 271 .is_err() 272 { 273 return ApiError::InternalError(Some("Failed to get new MST blocks for path".into())) 274 .into_response(); 275 } 276 if mst 277 .blocks_for_path(&key, &mut relevant_blocks) 278 .await 279 .is_err() 280 { 281 return ApiError::InternalError(Some("Failed to get old MST blocks for path".into())) 282 .into_response(); 283 } 284 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes)); 285 let mut written_cids = tracking_store.get_all_relevant_cids(); 286 for cid in relevant_blocks.keys() { 287 if !written_cids.contains(cid) { 288 written_cids.push(*cid); 289 } 290 } 291 let written_cids_str = written_cids 292 .iter() 293 .map(|c| c.to_string()) 294 .collect::<Vec<_>>(); 295 let blob_cids = extract_blob_cids(&input.record); 296 let commit_result = match commit_and_log( 297 &state, 298 CommitParams { 299 did: &did, 300 user_id, 301 current_root_cid: Some(current_root_cid), 302 prev_data_cid: Some(commit.data), 303 new_mst_root, 304 ops: vec![op], 305 blocks_cids: &written_cids_str, 306 blobs: &blob_cids, 307 }, 308 ) 309 .await 310 { 311 Ok(res) => res, 312 Err(e) => return ApiError::InternalError(Some(e)).into_response(), 313 }; 314 315 if let Some(ref controller) = controller_did { 316 let _ = delegation::log_delegation_action( 317 &state.db, 318 &did, 319 controller, 320 Some(controller), 321 DelegationActionType::RepoWrite, 322 Some(json!({ 323 "action": "create", 324 "collection": input.collection, 325 "rkey": rkey 326 })), 327 None, 328 None, 329 ) 330 .await; 331 } 332 333 ( 334 StatusCode::OK, 335 Json(CreateRecordOutput { 336 uri: AtUri::from_parts(&did, &input.collection, &rkey), 337 cid: record_cid.to_string(), 338 commit: CommitInfo { 339 cid: commit_result.commit_cid.to_string(), 340 rev: commit_result.rev, 341 }, 342 validation_status: validation_status.map(|s| s.to_string()), 343 }), 344 ) 345 .into_response() 346} 347#[derive(Deserialize)] 348#[allow(dead_code)] 349pub struct PutRecordInput { 350 pub repo: AtIdentifier, 351 pub collection: Nsid, 352 pub rkey: Rkey, 353 pub validate: Option<bool>, 354 pub record: serde_json::Value, 355 #[serde(rename = "swapCommit")] 356 pub swap_commit: Option<String>, 357 #[serde(rename = "swapRecord")] 358 pub swap_record: Option<String>, 359} 360#[derive(Serialize)] 361#[serde(rename_all = "camelCase")] 362pub struct PutRecordOutput { 363 pub uri: AtUri, 364 pub cid: String, 365 #[serde(skip_serializing_if = "Option::is_none")] 366 pub commit: Option<CommitInfo>, 367 #[serde(skip_serializing_if = "Option::is_none")] 368 pub validation_status: Option<String>, 369} 370pub async fn put_record( 371 State(state): State<AppState>, 372 headers: HeaderMap, 373 axum::extract::OriginalUri(uri): axum::extract::OriginalUri, 374 Json(input): Json<PutRecordInput>, 375) -> Response { 376 let auth = match prepare_repo_write( 377 &state, 378 &headers, 379 &input.repo, 380 "POST", 381 &crate::util::build_full_url(&uri.to_string()), 382 ) 383 .await 384 { 385 Ok(res) => res, 386 Err(err_res) => return err_res, 387 }; 388 389 if let Err(e) = crate::auth::scope_check::check_repo_scope( 390 auth.is_oauth, 391 auth.scope.as_deref(), 392 crate::oauth::RepoAction::Create, 393 &input.collection, 394 ) { 395 return e; 396 } 397 if let Err(e) = crate::auth::scope_check::check_repo_scope( 398 auth.is_oauth, 399 auth.scope.as_deref(), 400 crate::oauth::RepoAction::Update, 401 &input.collection, 402 ) { 403 return e; 404 } 405 406 let did = auth.did; 407 let user_id = auth.user_id; 408 let current_root_cid = auth.current_root_cid; 409 let controller_did = auth.controller_did; 410 411 if let Some(swap_commit) = &input.swap_commit 412 && Cid::from_str(swap_commit).ok() != Some(current_root_cid) 413 { 414 return ApiError::InvalidSwap(Some("Repo has been modified".into())).into_response(); 415 } 416 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 417 let commit_bytes = match tracking_store.get(&current_root_cid).await { 418 Ok(Some(b)) => b, 419 _ => return ApiError::InternalError(Some("Commit block not found".into())).into_response(), 420 }; 421 let commit = match Commit::from_cbor(&commit_bytes) { 422 Ok(c) => c, 423 _ => return ApiError::InternalError(Some("Failed to parse commit".into())).into_response(), 424 }; 425 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 426 let key = format!("{}/{}", input.collection, input.rkey); 427 let validation_status = if input.validate == Some(false) { 428 None 429 } else { 430 let require_lexicon = input.validate == Some(true); 431 match validate_record_with_status( 432 &input.record, 433 &input.collection, 434 Some(input.rkey.as_str()), 435 require_lexicon, 436 ) { 437 Ok(status) => Some(status), 438 Err(err_response) => return *err_response, 439 } 440 }; 441 if let Some(swap_record_str) = &input.swap_record { 442 let expected_cid = Cid::from_str(swap_record_str).ok(); 443 let actual_cid = mst.get(&key).await.ok().flatten(); 444 if expected_cid != actual_cid { 445 return ApiError::InvalidSwap(Some("Record has been modified or does not exist".into())) 446 .into_response(); 447 } 448 } 449 let existing_cid = mst.get(&key).await.ok().flatten(); 450 let record_ipld = crate::util::json_to_ipld(&input.record); 451 let mut record_bytes = Vec::new(); 452 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld).is_err() { 453 return ApiError::InvalidRecord("Failed to serialize record".into()).into_response(); 454 } 455 let record_cid = match tracking_store.put(&record_bytes).await { 456 Ok(c) => c, 457 _ => { 458 return ApiError::InternalError(Some("Failed to save record block".into())).into_response() 459 } 460 }; 461 if existing_cid == Some(record_cid) { 462 return ( 463 StatusCode::OK, 464 Json(PutRecordOutput { 465 uri: AtUri::from_parts(&did, &input.collection, &input.rkey), 466 cid: record_cid.to_string(), 467 commit: None, 468 validation_status: validation_status.map(|s| s.to_string()), 469 }), 470 ) 471 .into_response(); 472 } 473 let new_mst = if existing_cid.is_some() { 474 match mst.update(&key, record_cid).await { 475 Ok(m) => m, 476 Err(_) => { 477 return ApiError::InternalError(Some("Failed to update MST".into())).into_response() 478 } 479 } 480 } else { 481 match mst.add(&key, record_cid).await { 482 Ok(m) => m, 483 Err(_) => { 484 return ApiError::InternalError(Some("Failed to add to MST".into())).into_response() 485 } 486 } 487 }; 488 let new_mst_root = match new_mst.persist().await { 489 Ok(c) => c, 490 Err(_) => { 491 return ApiError::InternalError(Some("Failed to persist MST".into())).into_response() 492 } 493 }; 494 let op = if existing_cid.is_some() { 495 RecordOp::Update { 496 collection: input.collection.to_string(), 497 rkey: input.rkey.to_string(), 498 cid: record_cid, 499 prev: existing_cid, 500 } 501 } else { 502 RecordOp::Create { 503 collection: input.collection.to_string(), 504 rkey: input.rkey.to_string(), 505 cid: record_cid, 506 } 507 }; 508 let mut relevant_blocks = std::collections::BTreeMap::new(); 509 if new_mst 510 .blocks_for_path(&key, &mut relevant_blocks) 511 .await 512 .is_err() 513 { 514 return ApiError::InternalError(Some("Failed to get new MST blocks for path".into())) 515 .into_response(); 516 } 517 if mst 518 .blocks_for_path(&key, &mut relevant_blocks) 519 .await 520 .is_err() 521 { 522 return ApiError::InternalError(Some("Failed to get old MST blocks for path".into())) 523 .into_response(); 524 } 525 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes)); 526 let mut written_cids = tracking_store.get_all_relevant_cids(); 527 for cid in relevant_blocks.keys() { 528 if !written_cids.contains(cid) { 529 written_cids.push(*cid); 530 } 531 } 532 let written_cids_str = written_cids 533 .iter() 534 .map(|c| c.to_string()) 535 .collect::<Vec<_>>(); 536 let is_update = existing_cid.is_some(); 537 let blob_cids = extract_blob_cids(&input.record); 538 let commit_result = match commit_and_log( 539 &state, 540 CommitParams { 541 did: &did, 542 user_id, 543 current_root_cid: Some(current_root_cid), 544 prev_data_cid: Some(commit.data), 545 new_mst_root, 546 ops: vec![op], 547 blocks_cids: &written_cids_str, 548 blobs: &blob_cids, 549 }, 550 ) 551 .await 552 { 553 Ok(res) => res, 554 Err(e) => return ApiError::InternalError(Some(e)).into_response(), 555 }; 556 557 if let Some(ref controller) = controller_did { 558 let _ = delegation::log_delegation_action( 559 &state.db, 560 &did, 561 controller, 562 Some(controller), 563 DelegationActionType::RepoWrite, 564 Some(json!({ 565 "action": if is_update { "update" } else { "create" }, 566 "collection": input.collection, 567 "rkey": input.rkey 568 })), 569 None, 570 None, 571 ) 572 .await; 573 } 574 575 ( 576 StatusCode::OK, 577 Json(PutRecordOutput { 578 uri: AtUri::from_parts(&did, &input.collection, &input.rkey), 579 cid: record_cid.to_string(), 580 commit: Some(CommitInfo { 581 cid: commit_result.commit_cid.to_string(), 582 rev: commit_result.rev, 583 }), 584 validation_status: validation_status.map(|s| s.to_string()), 585 }), 586 ) 587 .into_response() 588}