this repo has no description
1use super::validation::validate_record_with_rkey; 2use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log, extract_blob_cids}; 3use crate::delegation::{self, DelegationActionType}; 4use crate::repo::tracking::TrackingBlockStore; 5use crate::state::AppState; 6use axum::{ 7 Json, 8 extract::State, 9 http::{HeaderMap, StatusCode}, 10 response::{IntoResponse, Response}, 11}; 12use cid::Cid; 13use jacquard::types::{ 14 integer::LimitedU32, 15 string::{Nsid, Tid}, 16}; 17use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 18use serde::{Deserialize, Serialize}; 19use serde_json::json; 20use sqlx::{PgPool, Row}; 21use std::str::FromStr; 22use std::sync::Arc; 23use tracing::error; 24use uuid::Uuid; 25 26pub async fn has_verified_comms_channel(db: &PgPool, did: &str) -> Result<bool, sqlx::Error> { 27 let row = sqlx::query( 28 r#" 29 SELECT 30 email_verified, 31 discord_verified, 32 telegram_verified, 33 signal_verified 34 FROM users 35 WHERE did = $1 36 "#, 37 ) 38 .bind(did) 39 .fetch_optional(db) 40 .await?; 41 match row { 42 Some(r) => { 43 let email_verified: bool = r.get("email_verified"); 44 let discord_verified: bool = r.get("discord_verified"); 45 let telegram_verified: bool = r.get("telegram_verified"); 46 let signal_verified: bool = r.get("signal_verified"); 47 Ok(email_verified || discord_verified || telegram_verified || signal_verified) 48 } 49 None => Ok(false), 50 } 51} 52 53pub struct RepoWriteAuth { 54 pub did: String, 55 pub user_id: Uuid, 56 pub current_root_cid: Cid, 57 pub is_oauth: bool, 58 pub scope: Option<String>, 59 pub controller_did: Option<String>, 60} 61 62pub async fn prepare_repo_write( 63 state: &AppState, 64 headers: &HeaderMap, 65 repo_did: &str, 66 http_method: &str, 67 http_uri: &str, 68) -> Result<RepoWriteAuth, Response> { 69 let extracted = crate::auth::extract_auth_token_from_header( 70 headers.get("Authorization").and_then(|h| h.to_str().ok()), 71 ) 72 .ok_or_else(|| { 73 ( 74 StatusCode::UNAUTHORIZED, 75 Json(json!({"error": "AuthenticationRequired"})), 76 ) 77 .into_response() 78 })?; 79 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 80 let auth_user = crate::auth::validate_token_with_dpop( 81 &state.db, 82 &extracted.token, 83 extracted.is_dpop, 84 dpop_proof, 85 http_method, 86 http_uri, 87 false, 88 ) 89 .await 90 .map_err(|e| { 91 ( 92 StatusCode::UNAUTHORIZED, 93 Json(json!({"error": e.to_string()})), 94 ) 95 .into_response() 96 })?; 97 if repo_did != auth_user.did { 98 return Err(( 99 StatusCode::FORBIDDEN, 100 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})), 101 ) 102 .into_response()); 103 } 104 let is_verified = has_verified_comms_channel(&state.db, &auth_user.did) 105 .await 106 .unwrap_or(false); 107 let is_delegated = crate::delegation::is_delegated_account(&state.db, &auth_user.did) 108 .await 109 .unwrap_or(false); 110 if !is_verified && !is_delegated { 111 return Err(( 112 StatusCode::FORBIDDEN, 113 Json(json!({ 114 "error": "AccountNotVerified", 115 "message": "You must verify at least one notification channel (email, Discord, Telegram, or Signal) before creating records" 116 })), 117 ) 118 .into_response()); 119 } 120 let user_id = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", auth_user.did) 121 .fetch_optional(&state.db) 122 .await 123 .map_err(|e| { 124 error!("DB error fetching user: {}", e); 125 ( 126 StatusCode::INTERNAL_SERVER_ERROR, 127 Json(json!({"error": "InternalError"})), 128 ) 129 .into_response() 130 })? 131 .ok_or_else(|| { 132 ( 133 StatusCode::INTERNAL_SERVER_ERROR, 134 Json(json!({"error": "InternalError", "message": "User not found"})), 135 ) 136 .into_response() 137 })?; 138 let root_cid_str: String = sqlx::query_scalar!( 139 "SELECT repo_root_cid FROM repos WHERE user_id = $1", 140 user_id 141 ) 142 .fetch_optional(&state.db) 143 .await 144 .map_err(|e| { 145 error!("DB error fetching repo root: {}", e); 146 ( 147 StatusCode::INTERNAL_SERVER_ERROR, 148 Json(json!({"error": "InternalError"})), 149 ) 150 .into_response() 151 })? 152 .ok_or_else(|| { 153 ( 154 StatusCode::INTERNAL_SERVER_ERROR, 155 Json(json!({"error": "InternalError", "message": "Repo root not found"})), 156 ) 157 .into_response() 158 })?; 159 let current_root_cid = Cid::from_str(&root_cid_str).map_err(|_| { 160 ( 161 StatusCode::INTERNAL_SERVER_ERROR, 162 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})), 163 ) 164 .into_response() 165 })?; 166 Ok(RepoWriteAuth { 167 did: auth_user.did, 168 user_id, 169 current_root_cid, 170 is_oauth: auth_user.is_oauth, 171 scope: auth_user.scope, 172 controller_did: auth_user.controller_did, 173 }) 174} 175#[derive(Deserialize)] 176#[allow(dead_code)] 177pub struct CreateRecordInput { 178 pub repo: String, 179 pub collection: String, 180 pub rkey: Option<String>, 181 pub validate: Option<bool>, 182 pub record: serde_json::Value, 183 #[serde(rename = "swapCommit")] 184 pub swap_commit: Option<String>, 185} 186#[derive(Serialize)] 187#[serde(rename_all = "camelCase")] 188pub struct CreateRecordOutput { 189 pub uri: String, 190 pub cid: String, 191} 192pub async fn create_record( 193 State(state): State<AppState>, 194 headers: HeaderMap, 195 axum::extract::OriginalUri(uri): axum::extract::OriginalUri, 196 Json(input): Json<CreateRecordInput>, 197) -> Response { 198 let auth = 199 match prepare_repo_write(&state, &headers, &input.repo, "POST", &uri.to_string()).await { 200 Ok(res) => res, 201 Err(err_res) => return err_res, 202 }; 203 204 if let Err(e) = crate::auth::scope_check::check_repo_scope( 205 auth.is_oauth, 206 auth.scope.as_deref(), 207 crate::oauth::RepoAction::Create, 208 &input.collection, 209 ) { 210 return e; 211 } 212 213 let did = auth.did; 214 let user_id = auth.user_id; 215 let current_root_cid = auth.current_root_cid; 216 let controller_did = auth.controller_did; 217 218 if let Some(swap_commit) = &input.swap_commit 219 && Cid::from_str(swap_commit).ok() != Some(current_root_cid) 220 { 221 return ( 222 StatusCode::CONFLICT, 223 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})), 224 ) 225 .into_response(); 226 } 227 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 228 let commit_bytes = match tracking_store.get(&current_root_cid).await { 229 Ok(Some(b)) => b, 230 _ => { 231 return ( 232 StatusCode::INTERNAL_SERVER_ERROR, 233 Json(json!({"error": "InternalError", "message": "Commit block not found"})), 234 ) 235 .into_response(); 236 } 237 }; 238 let commit = match Commit::from_cbor(&commit_bytes) { 239 Ok(c) => c, 240 _ => { 241 return ( 242 StatusCode::INTERNAL_SERVER_ERROR, 243 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})), 244 ) 245 .into_response(); 246 } 247 }; 248 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 249 let collection_nsid = match input.collection.parse::<Nsid>() { 250 Ok(n) => n, 251 Err(_) => { 252 return ( 253 StatusCode::BAD_REQUEST, 254 Json(json!({"error": "InvalidCollection"})), 255 ) 256 .into_response(); 257 } 258 }; 259 if input.validate.unwrap_or(true) 260 && let Err(err_response) = 261 validate_record_with_rkey(&input.record, &input.collection, input.rkey.as_deref()) 262 { 263 return *err_response; 264 } 265 let rkey = input 266 .rkey 267 .unwrap_or_else(|| Tid::now(LimitedU32::MIN).to_string()); 268 let mut record_bytes = Vec::new(); 269 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() { 270 return ( 271 StatusCode::BAD_REQUEST, 272 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})), 273 ) 274 .into_response(); 275 } 276 let record_cid = match tracking_store.put(&record_bytes).await { 277 Ok(c) => c, 278 _ => { 279 return ( 280 StatusCode::INTERNAL_SERVER_ERROR, 281 Json(json!({"error": "InternalError", "message": "Failed to save record block"})), 282 ) 283 .into_response(); 284 } 285 }; 286 let key = format!("{}/{}", collection_nsid, rkey); 287 let new_mst = match mst.add(&key, record_cid).await { 288 Ok(m) => m, 289 _ => { 290 return ( 291 StatusCode::INTERNAL_SERVER_ERROR, 292 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})), 293 ) 294 .into_response(); 295 } 296 }; 297 let new_mst_root = match new_mst.persist().await { 298 Ok(c) => c, 299 _ => { 300 return ( 301 StatusCode::INTERNAL_SERVER_ERROR, 302 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})), 303 ) 304 .into_response(); 305 } 306 }; 307 let op = RecordOp::Create { 308 collection: input.collection.clone(), 309 rkey: rkey.clone(), 310 cid: record_cid, 311 }; 312 let mut relevant_blocks = std::collections::BTreeMap::new(); 313 if new_mst 314 .blocks_for_path(&key, &mut relevant_blocks) 315 .await 316 .is_err() 317 { 318 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response(); 319 } 320 if mst 321 .blocks_for_path(&key, &mut relevant_blocks) 322 .await 323 .is_err() 324 { 325 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response(); 326 } 327 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes)); 328 let mut written_cids = tracking_store.get_all_relevant_cids(); 329 for cid in relevant_blocks.keys() { 330 if !written_cids.contains(cid) { 331 written_cids.push(*cid); 332 } 333 } 334 let written_cids_str = written_cids 335 .iter() 336 .map(|c| c.to_string()) 337 .collect::<Vec<_>>(); 338 let blob_cids = extract_blob_cids(&input.record); 339 if let Err(e) = commit_and_log( 340 &state, 341 CommitParams { 342 did: &did, 343 user_id, 344 current_root_cid: Some(current_root_cid), 345 prev_data_cid: Some(commit.data), 346 new_mst_root, 347 ops: vec![op], 348 blocks_cids: &written_cids_str, 349 blobs: &blob_cids, 350 }, 351 ) 352 .await 353 { 354 return ( 355 StatusCode::INTERNAL_SERVER_ERROR, 356 Json(json!({"error": "InternalError", "message": e})), 357 ) 358 .into_response(); 359 }; 360 361 if let Some(ref controller) = controller_did { 362 let _ = delegation::log_delegation_action( 363 &state.db, 364 &did, 365 controller, 366 Some(controller), 367 DelegationActionType::RepoWrite, 368 Some(json!({ 369 "action": "create", 370 "collection": input.collection, 371 "rkey": rkey 372 })), 373 None, 374 None, 375 ) 376 .await; 377 } 378 379 ( 380 StatusCode::OK, 381 Json(CreateRecordOutput { 382 uri: format!("at://{}/{}/{}", did, input.collection, rkey), 383 cid: record_cid.to_string(), 384 }), 385 ) 386 .into_response() 387} 388#[derive(Deserialize)] 389#[allow(dead_code)] 390pub struct PutRecordInput { 391 pub repo: String, 392 pub collection: String, 393 pub rkey: String, 394 pub validate: Option<bool>, 395 pub record: serde_json::Value, 396 #[serde(rename = "swapCommit")] 397 pub swap_commit: Option<String>, 398 #[serde(rename = "swapRecord")] 399 pub swap_record: Option<String>, 400} 401#[derive(Serialize)] 402#[serde(rename_all = "camelCase")] 403pub struct PutRecordOutput { 404 pub uri: String, 405 pub cid: String, 406} 407pub async fn put_record( 408 State(state): State<AppState>, 409 headers: HeaderMap, 410 axum::extract::OriginalUri(uri): axum::extract::OriginalUri, 411 Json(input): Json<PutRecordInput>, 412) -> Response { 413 let auth = 414 match prepare_repo_write(&state, &headers, &input.repo, "POST", &uri.to_string()).await { 415 Ok(res) => res, 416 Err(err_res) => return err_res, 417 }; 418 419 if let Err(e) = crate::auth::scope_check::check_repo_scope( 420 auth.is_oauth, 421 auth.scope.as_deref(), 422 crate::oauth::RepoAction::Create, 423 &input.collection, 424 ) { 425 return e; 426 } 427 if let Err(e) = crate::auth::scope_check::check_repo_scope( 428 auth.is_oauth, 429 auth.scope.as_deref(), 430 crate::oauth::RepoAction::Update, 431 &input.collection, 432 ) { 433 return e; 434 } 435 436 let did = auth.did; 437 let user_id = auth.user_id; 438 let current_root_cid = auth.current_root_cid; 439 let controller_did = auth.controller_did; 440 441 if let Some(swap_commit) = &input.swap_commit 442 && Cid::from_str(swap_commit).ok() != Some(current_root_cid) 443 { 444 return ( 445 StatusCode::CONFLICT, 446 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})), 447 ) 448 .into_response(); 449 } 450 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 451 let commit_bytes = match tracking_store.get(&current_root_cid).await { 452 Ok(Some(b)) => b, 453 _ => { 454 return ( 455 StatusCode::INTERNAL_SERVER_ERROR, 456 Json(json!({"error": "InternalError", "message": "Commit block not found"})), 457 ) 458 .into_response(); 459 } 460 }; 461 let commit = match Commit::from_cbor(&commit_bytes) { 462 Ok(c) => c, 463 _ => { 464 return ( 465 StatusCode::INTERNAL_SERVER_ERROR, 466 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})), 467 ) 468 .into_response(); 469 } 470 }; 471 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 472 let collection_nsid = match input.collection.parse::<Nsid>() { 473 Ok(n) => n, 474 Err(_) => { 475 return ( 476 StatusCode::BAD_REQUEST, 477 Json(json!({"error": "InvalidCollection"})), 478 ) 479 .into_response(); 480 } 481 }; 482 let key = format!("{}/{}", collection_nsid, input.rkey); 483 if input.validate.unwrap_or(true) 484 && let Err(err_response) = 485 validate_record_with_rkey(&input.record, &input.collection, Some(&input.rkey)) 486 { 487 return *err_response; 488 } 489 if let Some(swap_record_str) = &input.swap_record { 490 let expected_cid = Cid::from_str(swap_record_str).ok(); 491 let actual_cid = mst.get(&key).await.ok().flatten(); 492 if expected_cid != actual_cid { 493 return (StatusCode::CONFLICT, Json(json!({"error": "InvalidSwap", "message": "Record has been modified or does not exist"}))).into_response(); 494 } 495 } 496 let existing_cid = mst.get(&key).await.ok().flatten(); 497 let mut record_bytes = Vec::new(); 498 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() { 499 return ( 500 StatusCode::BAD_REQUEST, 501 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})), 502 ) 503 .into_response(); 504 } 505 let record_cid = match tracking_store.put(&record_bytes).await { 506 Ok(c) => c, 507 _ => { 508 return ( 509 StatusCode::INTERNAL_SERVER_ERROR, 510 Json(json!({"error": "InternalError", "message": "Failed to save record block"})), 511 ) 512 .into_response(); 513 } 514 }; 515 let new_mst = if existing_cid.is_some() { 516 match mst.update(&key, record_cid).await { 517 Ok(m) => m, 518 Err(_) => { 519 return ( 520 StatusCode::INTERNAL_SERVER_ERROR, 521 Json(json!({"error": "InternalError", "message": "Failed to update MST"})), 522 ) 523 .into_response(); 524 } 525 } 526 } else { 527 match mst.add(&key, record_cid).await { 528 Ok(m) => m, 529 Err(_) => { 530 return ( 531 StatusCode::INTERNAL_SERVER_ERROR, 532 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})), 533 ) 534 .into_response(); 535 } 536 } 537 }; 538 let new_mst_root = match new_mst.persist().await { 539 Ok(c) => c, 540 Err(_) => { 541 return ( 542 StatusCode::INTERNAL_SERVER_ERROR, 543 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})), 544 ) 545 .into_response(); 546 } 547 }; 548 let op = if existing_cid.is_some() { 549 RecordOp::Update { 550 collection: input.collection.clone(), 551 rkey: input.rkey.clone(), 552 cid: record_cid, 553 prev: existing_cid, 554 } 555 } else { 556 RecordOp::Create { 557 collection: input.collection.clone(), 558 rkey: input.rkey.clone(), 559 cid: record_cid, 560 } 561 }; 562 let mut relevant_blocks = std::collections::BTreeMap::new(); 563 if new_mst 564 .blocks_for_path(&key, &mut relevant_blocks) 565 .await 566 .is_err() 567 { 568 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response(); 569 } 570 if mst 571 .blocks_for_path(&key, &mut relevant_blocks) 572 .await 573 .is_err() 574 { 575 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response(); 576 } 577 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes)); 578 let mut written_cids = tracking_store.get_all_relevant_cids(); 579 for cid in relevant_blocks.keys() { 580 if !written_cids.contains(cid) { 581 written_cids.push(*cid); 582 } 583 } 584 let written_cids_str = written_cids 585 .iter() 586 .map(|c| c.to_string()) 587 .collect::<Vec<_>>(); 588 let is_update = existing_cid.is_some(); 589 let blob_cids = extract_blob_cids(&input.record); 590 if let Err(e) = commit_and_log( 591 &state, 592 CommitParams { 593 did: &did, 594 user_id, 595 current_root_cid: Some(current_root_cid), 596 prev_data_cid: Some(commit.data), 597 new_mst_root, 598 ops: vec![op], 599 blocks_cids: &written_cids_str, 600 blobs: &blob_cids, 601 }, 602 ) 603 .await 604 { 605 return ( 606 StatusCode::INTERNAL_SERVER_ERROR, 607 Json(json!({"error": "InternalError", "message": e})), 608 ) 609 .into_response(); 610 }; 611 612 if let Some(ref controller) = controller_did { 613 let _ = delegation::log_delegation_action( 614 &state.db, 615 &did, 616 controller, 617 Some(controller), 618 DelegationActionType::RepoWrite, 619 Some(json!({ 620 "action": if is_update { "update" } else { "create" }, 621 "collection": input.collection, 622 "rkey": input.rkey 623 })), 624 None, 625 None, 626 ) 627 .await; 628 } 629 630 ( 631 StatusCode::OK, 632 Json(PutRecordOutput { 633 uri: format!("at://{}/{}/{}", did, input.collection, input.rkey), 634 cid: record_cid.to_string(), 635 }), 636 ) 637 .into_response() 638}