this repo has no description
at main 20 kB view raw
1use super::validation::validate_record_with_status; 2use crate::api::error::ApiError; 3use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log, extract_blob_cids}; 4use crate::delegation::{self, DelegationActionType}; 5use crate::repo::tracking::TrackingBlockStore; 6use crate::state::AppState; 7use crate::types::{AtIdentifier, AtUri, Did, Nsid, Rkey}; 8use axum::{ 9 Json, 10 extract::State, 11 http::{HeaderMap, StatusCode}, 12 response::{IntoResponse, Response}, 13}; 14use cid::Cid; 15use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 16use serde::{Deserialize, Serialize}; 17use serde_json::json; 18use sqlx::{PgPool, Row}; 19use std::str::FromStr; 20use std::sync::Arc; 21use tracing::error; 22use uuid::Uuid; 23 24pub async fn has_verified_comms_channel(db: &PgPool, did: &Did) -> Result<bool, sqlx::Error> { 25 let row = sqlx::query( 26 r#" 27 SELECT 28 email_verified, 29 discord_verified, 30 telegram_verified, 31 signal_verified 32 FROM users 33 WHERE did = $1 34 "#, 35 ) 36 .bind(did.as_str()) 37 .fetch_optional(db) 38 .await?; 39 match row { 40 Some(r) => { 41 let email_verified: bool = r.get("email_verified"); 42 let discord_verified: bool = r.get("discord_verified"); 43 let telegram_verified: bool = r.get("telegram_verified"); 44 let signal_verified: bool = r.get("signal_verified"); 45 Ok(email_verified || discord_verified || telegram_verified || signal_verified) 46 } 47 None => Ok(false), 48 } 49} 50 51pub struct RepoWriteAuth { 52 pub did: Did, 53 pub user_id: Uuid, 54 pub current_root_cid: Cid, 55 pub is_oauth: bool, 56 pub scope: Option<String>, 57 pub controller_did: Option<Did>, 58} 59 60pub async fn prepare_repo_write( 61 state: &AppState, 62 headers: &HeaderMap, 63 repo: &AtIdentifier, 64 http_method: &str, 65 http_uri: &str, 66) -> Result<RepoWriteAuth, Response> { 67 let extracted = crate::auth::extract_auth_token_from_header( 68 headers.get("Authorization").and_then(|h| h.to_str().ok()), 69 ) 70 .ok_or_else(|| ApiError::AuthenticationRequired.into_response())?; 71 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 72 let auth_user = crate::auth::validate_token_with_dpop( 73 &state.db, 74 &extracted.token, 75 extracted.is_dpop, 76 dpop_proof, 77 http_method, 78 http_uri, 79 false, 80 false, 81 ) 82 .await 83 .map_err(|e| { 84 tracing::warn!(error = ?e, is_dpop = extracted.is_dpop, "Token validation failed in prepare_repo_write"); 85 ApiError::from(e).into_response() 86 })?; 87 if repo.as_str() != auth_user.did.as_str() { 88 return Err( 89 ApiError::InvalidRepo("Repo does not match authenticated user".into()).into_response(), 90 ); 91 } 92 if crate::util::is_account_migrated(&state.db, &auth_user.did) 93 .await 94 .unwrap_or(false) 95 { 96 return Err(ApiError::AccountMigrated.into_response()); 97 } 98 let is_verified = has_verified_comms_channel(&state.db, &auth_user.did) 99 .await 100 .unwrap_or(false); 101 let is_delegated = crate::delegation::is_delegated_account(&state.db, &auth_user.did) 102 .await 103 .unwrap_or(false); 104 if !is_verified && !is_delegated { 105 return Err(ApiError::AccountNotVerified.into_response()); 106 } 107 let user_id = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", &auth_user.did) 108 .fetch_optional(&state.db) 109 .await 110 .map_err(|e| { 111 error!("DB error fetching user: {}", e); 112 ApiError::InternalError(None).into_response() 113 })? 114 .ok_or_else(|| ApiError::InternalError(Some("User not found".into())).into_response())?; 115 let root_cid_str: String = sqlx::query_scalar!( 116 "SELECT repo_root_cid FROM repos WHERE user_id = $1", 117 user_id 118 ) 119 .fetch_optional(&state.db) 120 .await 121 .map_err(|e| { 122 error!("DB error fetching repo root: {}", e); 123 ApiError::InternalError(None).into_response() 124 })? 125 .ok_or_else(|| ApiError::InternalError(Some("Repo root not found".into())).into_response())?; 126 let current_root_cid = Cid::from_str(&root_cid_str).map_err(|_| { 127 ApiError::InternalError(Some("Invalid repo root CID".into())).into_response() 128 })?; 129 Ok(RepoWriteAuth { 130 did: auth_user.did.clone(), 131 user_id, 132 current_root_cid, 133 is_oauth: auth_user.is_oauth, 134 scope: auth_user.scope, 135 controller_did: auth_user.controller_did.clone(), 136 }) 137} 138#[derive(Deserialize)] 139#[allow(dead_code)] 140pub struct CreateRecordInput { 141 pub repo: AtIdentifier, 142 pub collection: Nsid, 143 pub rkey: Option<Rkey>, 144 pub validate: Option<bool>, 145 pub record: serde_json::Value, 146 #[serde(rename = "swapCommit")] 147 pub swap_commit: Option<String>, 148} 149#[derive(Serialize)] 150#[serde(rename_all = "camelCase")] 151pub struct CommitInfo { 152 pub cid: String, 153 pub rev: String, 154} 155 156#[derive(Serialize)] 157#[serde(rename_all = "camelCase")] 158pub struct CreateRecordOutput { 159 pub uri: AtUri, 160 pub cid: String, 161 pub commit: CommitInfo, 162 #[serde(skip_serializing_if = "Option::is_none")] 163 pub validation_status: Option<String>, 164} 165pub async fn create_record( 166 State(state): State<AppState>, 167 headers: HeaderMap, 168 axum::extract::OriginalUri(uri): axum::extract::OriginalUri, 169 Json(input): Json<CreateRecordInput>, 170) -> Response { 171 let auth = match prepare_repo_write( 172 &state, 173 &headers, 174 &input.repo, 175 "POST", 176 &crate::util::build_full_url(&uri.to_string()), 177 ) 178 .await 179 { 180 Ok(res) => res, 181 Err(err_res) => return err_res, 182 }; 183 184 if let Err(e) = crate::auth::scope_check::check_repo_scope( 185 auth.is_oauth, 186 auth.scope.as_deref(), 187 crate::oauth::RepoAction::Create, 188 &input.collection, 189 ) { 190 return e; 191 } 192 193 let did = auth.did; 194 let user_id = auth.user_id; 195 let current_root_cid = auth.current_root_cid; 196 let controller_did = auth.controller_did; 197 198 if let Some(swap_commit) = &input.swap_commit 199 && Cid::from_str(swap_commit).ok() != Some(current_root_cid) 200 { 201 return ApiError::InvalidSwap(Some("Repo has been modified".into())).into_response(); 202 } 203 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 204 let commit_bytes = match tracking_store.get(&current_root_cid).await { 205 Ok(Some(b)) => b, 206 _ => return ApiError::InternalError(Some("Commit block not found".into())).into_response(), 207 }; 208 let commit = match Commit::from_cbor(&commit_bytes) { 209 Ok(c) => c, 210 _ => return ApiError::InternalError(Some("Failed to parse commit".into())).into_response(), 211 }; 212 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 213 let validation_status = if input.validate == Some(false) { 214 None 215 } else { 216 let require_lexicon = input.validate == Some(true); 217 match validate_record_with_status( 218 &input.record, 219 &input.collection, 220 input.rkey.as_ref(), 221 require_lexicon, 222 ) { 223 Ok(status) => Some(status), 224 Err(err_response) => return *err_response, 225 } 226 }; 227 let rkey = input.rkey.unwrap_or_else(Rkey::generate); 228 let record_ipld = crate::util::json_to_ipld(&input.record); 229 let mut record_bytes = Vec::new(); 230 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld).is_err() { 231 return ApiError::InvalidRecord("Failed to serialize record".into()).into_response(); 232 } 233 let record_cid = match tracking_store.put(&record_bytes).await { 234 Ok(c) => c, 235 _ => { 236 return ApiError::InternalError(Some("Failed to save record block".into())) 237 .into_response(); 238 } 239 }; 240 let key = format!("{}/{}", input.collection, rkey); 241 let new_mst = match mst.add(&key, record_cid).await { 242 Ok(m) => m, 243 _ => return ApiError::InternalError(Some("Failed to add to MST".into())).into_response(), 244 }; 245 let new_mst_root = match new_mst.persist().await { 246 Ok(c) => c, 247 _ => return ApiError::InternalError(Some("Failed to persist MST".into())).into_response(), 248 }; 249 let op = RecordOp::Create { 250 collection: input.collection.clone(), 251 rkey: rkey.clone(), 252 cid: record_cid, 253 }; 254 let mut new_mst_blocks = std::collections::BTreeMap::new(); 255 let mut old_mst_blocks = std::collections::BTreeMap::new(); 256 if new_mst 257 .blocks_for_path(&key, &mut new_mst_blocks) 258 .await 259 .is_err() 260 { 261 return ApiError::InternalError(Some("Failed to get new MST blocks for path".into())) 262 .into_response(); 263 } 264 if mst 265 .blocks_for_path(&key, &mut old_mst_blocks) 266 .await 267 .is_err() 268 { 269 return ApiError::InternalError(Some("Failed to get old MST blocks for path".into())) 270 .into_response(); 271 } 272 let mut relevant_blocks = new_mst_blocks.clone(); 273 relevant_blocks.extend(old_mst_blocks.iter().map(|(k, v)| (*k, v.clone()))); 274 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes)); 275 let written_cids: Vec<Cid> = tracking_store 276 .get_all_relevant_cids() 277 .into_iter() 278 .chain(relevant_blocks.keys().copied()) 279 .collect::<std::collections::HashSet<_>>() 280 .into_iter() 281 .collect(); 282 let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect(); 283 let blob_cids = extract_blob_cids(&input.record); 284 let obsolete_cids: Vec<Cid> = std::iter::once(current_root_cid) 285 .chain( 286 old_mst_blocks 287 .keys() 288 .filter(|cid| !new_mst_blocks.contains_key(*cid)) 289 .copied(), 290 ) 291 .collect(); 292 let commit_result = match commit_and_log( 293 &state, 294 CommitParams { 295 did: &did, 296 user_id, 297 current_root_cid: Some(current_root_cid), 298 prev_data_cid: Some(commit.data), 299 new_mst_root, 300 ops: vec![op], 301 blocks_cids: &written_cids_str, 302 blobs: &blob_cids, 303 obsolete_cids, 304 }, 305 ) 306 .await 307 { 308 Ok(res) => res, 309 Err(e) if e.contains("ConcurrentModification") => { 310 return ApiError::InvalidSwap(Some("Repo has been modified".into())).into_response(); 311 } 312 Err(e) => return ApiError::InternalError(Some(e)).into_response(), 313 }; 314 315 if let Some(ref controller) = controller_did { 316 let _ = delegation::log_delegation_action( 317 &state.db, 318 &did, 319 controller, 320 Some(controller), 321 DelegationActionType::RepoWrite, 322 Some(json!({ 323 "action": "create", 324 "collection": input.collection, 325 "rkey": rkey 326 })), 327 None, 328 None, 329 ) 330 .await; 331 } 332 333 ( 334 StatusCode::OK, 335 Json(CreateRecordOutput { 336 uri: AtUri::from_parts(&did, &input.collection, &rkey), 337 cid: record_cid.to_string(), 338 commit: CommitInfo { 339 cid: commit_result.commit_cid.to_string(), 340 rev: commit_result.rev, 341 }, 342 validation_status: validation_status.map(|s| s.to_string()), 343 }), 344 ) 345 .into_response() 346} 347#[derive(Deserialize)] 348#[allow(dead_code)] 349pub struct PutRecordInput { 350 pub repo: AtIdentifier, 351 pub collection: Nsid, 352 pub rkey: Rkey, 353 pub validate: Option<bool>, 354 pub record: serde_json::Value, 355 #[serde(rename = "swapCommit")] 356 pub swap_commit: Option<String>, 357 #[serde(rename = "swapRecord")] 358 pub swap_record: Option<String>, 359} 360#[derive(Serialize)] 361#[serde(rename_all = "camelCase")] 362pub struct PutRecordOutput { 363 pub uri: AtUri, 364 pub cid: String, 365 #[serde(skip_serializing_if = "Option::is_none")] 366 pub commit: Option<CommitInfo>, 367 #[serde(skip_serializing_if = "Option::is_none")] 368 pub validation_status: Option<String>, 369} 370pub async fn put_record( 371 State(state): State<AppState>, 372 headers: HeaderMap, 373 axum::extract::OriginalUri(uri): axum::extract::OriginalUri, 374 Json(input): Json<PutRecordInput>, 375) -> Response { 376 let auth = match prepare_repo_write( 377 &state, 378 &headers, 379 &input.repo, 380 "POST", 381 &crate::util::build_full_url(&uri.to_string()), 382 ) 383 .await 384 { 385 Ok(res) => res, 386 Err(err_res) => return err_res, 387 }; 388 389 if let Err(e) = crate::auth::scope_check::check_repo_scope( 390 auth.is_oauth, 391 auth.scope.as_deref(), 392 crate::oauth::RepoAction::Create, 393 &input.collection, 394 ) { 395 return e; 396 } 397 if let Err(e) = crate::auth::scope_check::check_repo_scope( 398 auth.is_oauth, 399 auth.scope.as_deref(), 400 crate::oauth::RepoAction::Update, 401 &input.collection, 402 ) { 403 return e; 404 } 405 406 let did = auth.did; 407 let user_id = auth.user_id; 408 let current_root_cid = auth.current_root_cid; 409 let controller_did = auth.controller_did; 410 411 if let Some(swap_commit) = &input.swap_commit 412 && Cid::from_str(swap_commit).ok() != Some(current_root_cid) 413 { 414 return ApiError::InvalidSwap(Some("Repo has been modified".into())).into_response(); 415 } 416 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 417 let commit_bytes = match tracking_store.get(&current_root_cid).await { 418 Ok(Some(b)) => b, 419 _ => return ApiError::InternalError(Some("Commit block not found".into())).into_response(), 420 }; 421 let commit = match Commit::from_cbor(&commit_bytes) { 422 Ok(c) => c, 423 _ => return ApiError::InternalError(Some("Failed to parse commit".into())).into_response(), 424 }; 425 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 426 let key = format!("{}/{}", input.collection, input.rkey); 427 let validation_status = if input.validate == Some(false) { 428 None 429 } else { 430 let require_lexicon = input.validate == Some(true); 431 match validate_record_with_status( 432 &input.record, 433 &input.collection, 434 Some(&input.rkey), 435 require_lexicon, 436 ) { 437 Ok(status) => Some(status), 438 Err(err_response) => return *err_response, 439 } 440 }; 441 if let Some(swap_record_str) = &input.swap_record { 442 let expected_cid = Cid::from_str(swap_record_str).ok(); 443 let actual_cid = mst.get(&key).await.ok().flatten(); 444 if expected_cid != actual_cid { 445 return ApiError::InvalidSwap(Some( 446 "Record has been modified or does not exist".into(), 447 )) 448 .into_response(); 449 } 450 } 451 let existing_cid = mst.get(&key).await.ok().flatten(); 452 let record_ipld = crate::util::json_to_ipld(&input.record); 453 let mut record_bytes = Vec::new(); 454 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld).is_err() { 455 return ApiError::InvalidRecord("Failed to serialize record".into()).into_response(); 456 } 457 let record_cid = match tracking_store.put(&record_bytes).await { 458 Ok(c) => c, 459 _ => { 460 return ApiError::InternalError(Some("Failed to save record block".into())) 461 .into_response(); 462 } 463 }; 464 if existing_cid == Some(record_cid) { 465 return ( 466 StatusCode::OK, 467 Json(PutRecordOutput { 468 uri: AtUri::from_parts(&did, &input.collection, &input.rkey), 469 cid: record_cid.to_string(), 470 commit: None, 471 validation_status: validation_status.map(|s| s.to_string()), 472 }), 473 ) 474 .into_response(); 475 } 476 let new_mst = if existing_cid.is_some() { 477 match mst.update(&key, record_cid).await { 478 Ok(m) => m, 479 Err(_) => { 480 return ApiError::InternalError(Some("Failed to update MST".into())) 481 .into_response(); 482 } 483 } 484 } else { 485 match mst.add(&key, record_cid).await { 486 Ok(m) => m, 487 Err(_) => { 488 return ApiError::InternalError(Some("Failed to add to MST".into())) 489 .into_response(); 490 } 491 } 492 }; 493 let new_mst_root = match new_mst.persist().await { 494 Ok(c) => c, 495 Err(_) => { 496 return ApiError::InternalError(Some("Failed to persist MST".into())).into_response(); 497 } 498 }; 499 let op = if existing_cid.is_some() { 500 RecordOp::Update { 501 collection: input.collection.clone(), 502 rkey: input.rkey.clone(), 503 cid: record_cid, 504 prev: existing_cid, 505 } 506 } else { 507 RecordOp::Create { 508 collection: input.collection.clone(), 509 rkey: input.rkey.clone(), 510 cid: record_cid, 511 } 512 }; 513 let mut new_mst_blocks = std::collections::BTreeMap::new(); 514 let mut old_mst_blocks = std::collections::BTreeMap::new(); 515 if new_mst 516 .blocks_for_path(&key, &mut new_mst_blocks) 517 .await 518 .is_err() 519 { 520 return ApiError::InternalError(Some("Failed to get new MST blocks for path".into())) 521 .into_response(); 522 } 523 if mst 524 .blocks_for_path(&key, &mut old_mst_blocks) 525 .await 526 .is_err() 527 { 528 return ApiError::InternalError(Some("Failed to get old MST blocks for path".into())) 529 .into_response(); 530 } 531 let mut relevant_blocks = new_mst_blocks.clone(); 532 relevant_blocks.extend(old_mst_blocks.iter().map(|(k, v)| (*k, v.clone()))); 533 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes)); 534 let written_cids: Vec<Cid> = tracking_store 535 .get_all_relevant_cids() 536 .into_iter() 537 .chain(relevant_blocks.keys().copied()) 538 .collect::<std::collections::HashSet<_>>() 539 .into_iter() 540 .collect(); 541 let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect(); 542 let is_update = existing_cid.is_some(); 543 let blob_cids = extract_blob_cids(&input.record); 544 let obsolete_cids: Vec<Cid> = std::iter::once(current_root_cid) 545 .chain( 546 old_mst_blocks 547 .keys() 548 .filter(|cid| !new_mst_blocks.contains_key(*cid)) 549 .copied(), 550 ) 551 .chain(existing_cid) 552 .collect(); 553 let commit_result = match commit_and_log( 554 &state, 555 CommitParams { 556 did: &did, 557 user_id, 558 current_root_cid: Some(current_root_cid), 559 prev_data_cid: Some(commit.data), 560 new_mst_root, 561 ops: vec![op], 562 blocks_cids: &written_cids_str, 563 blobs: &blob_cids, 564 obsolete_cids, 565 }, 566 ) 567 .await 568 { 569 Ok(res) => res, 570 Err(e) if e.contains("ConcurrentModification") => { 571 return ApiError::InvalidSwap(Some("Repo has been modified".into())).into_response(); 572 } 573 Err(e) => return ApiError::InternalError(Some(e)).into_response(), 574 }; 575 576 if let Some(ref controller) = controller_did { 577 let _ = delegation::log_delegation_action( 578 &state.db, 579 &did, 580 controller, 581 Some(controller), 582 DelegationActionType::RepoWrite, 583 Some(json!({ 584 "action": if is_update { "update" } else { "create" }, 585 "collection": input.collection, 586 "rkey": input.rkey 587 })), 588 None, 589 None, 590 ) 591 .await; 592 } 593 594 ( 595 StatusCode::OK, 596 Json(PutRecordOutput { 597 uri: AtUri::from_parts(&did, &input.collection, &input.rkey), 598 cid: record_cid.to_string(), 599 commit: Some(CommitInfo { 600 cid: commit_result.commit_cid.to_string(), 601 rev: commit_result.rev, 602 }), 603 validation_status: validation_status.map(|s| s.to_string()), 604 }), 605 ) 606 .into_response() 607}