// this repo has no description
1use super::validation::validate_record; 2use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log, extract_blob_cids}; 3use crate::delegation::{self, DelegationActionType}; 4use crate::repo::tracking::TrackingBlockStore; 5use crate::state::AppState; 6use axum::{ 7 Json, 8 extract::State, 9 http::{HeaderMap, StatusCode}, 10 response::{IntoResponse, Response}, 11}; 12use cid::Cid; 13use jacquard::types::{ 14 integer::LimitedU32, 15 string::{Nsid, Tid}, 16}; 17use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 18use serde::{Deserialize, Serialize}; 19use serde_json::json; 20use sqlx::{PgPool, Row}; 21use std::str::FromStr; 22use std::sync::Arc; 23use tracing::error; 24use uuid::Uuid; 25 26pub async fn has_verified_comms_channel(db: &PgPool, did: &str) -> Result<bool, sqlx::Error> { 27 let row = sqlx::query( 28 r#" 29 SELECT 30 email_verified, 31 discord_verified, 32 telegram_verified, 33 signal_verified 34 FROM users 35 WHERE did = $1 36 "#, 37 ) 38 .bind(did) 39 .fetch_optional(db) 40 .await?; 41 match row { 42 Some(r) => { 43 let email_verified: bool = r.get("email_verified"); 44 let discord_verified: bool = r.get("discord_verified"); 45 let telegram_verified: bool = r.get("telegram_verified"); 46 let signal_verified: bool = r.get("signal_verified"); 47 Ok(email_verified || discord_verified || telegram_verified || signal_verified) 48 } 49 None => Ok(false), 50 } 51} 52 53pub struct RepoWriteAuth { 54 pub did: String, 55 pub user_id: Uuid, 56 pub current_root_cid: Cid, 57 pub is_oauth: bool, 58 pub scope: Option<String>, 59 pub controller_did: Option<String>, 60} 61 62pub async fn prepare_repo_write( 63 state: &AppState, 64 headers: &HeaderMap, 65 repo_did: &str, 66 http_method: &str, 67 http_uri: &str, 68) -> Result<RepoWriteAuth, Response> { 69 let extracted = crate::auth::extract_auth_token_from_header( 70 headers.get("Authorization").and_then(|h| h.to_str().ok()), 71 ) 72 .ok_or_else(|| { 73 ( 74 StatusCode::UNAUTHORIZED, 75 
Json(json!({"error": "AuthenticationRequired"})), 76 ) 77 .into_response() 78 })?; 79 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 80 let auth_user = crate::auth::validate_token_with_dpop( 81 &state.db, 82 &extracted.token, 83 extracted.is_dpop, 84 dpop_proof, 85 http_method, 86 http_uri, 87 false, 88 ) 89 .await 90 .map_err(|e| { 91 ( 92 StatusCode::UNAUTHORIZED, 93 Json(json!({"error": e.to_string()})), 94 ) 95 .into_response() 96 })?; 97 if repo_did != auth_user.did { 98 return Err(( 99 StatusCode::FORBIDDEN, 100 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})), 101 ) 102 .into_response()); 103 } 104 let is_verified = has_verified_comms_channel(&state.db, &auth_user.did) 105 .await 106 .unwrap_or(false); 107 let is_delegated = crate::delegation::is_delegated_account(&state.db, &auth_user.did) 108 .await 109 .unwrap_or(false); 110 if !is_verified && !is_delegated { 111 return Err(( 112 StatusCode::FORBIDDEN, 113 Json(json!({ 114 "error": "AccountNotVerified", 115 "message": "You must verify at least one notification channel (email, Discord, Telegram, or Signal) before creating records" 116 })), 117 ) 118 .into_response()); 119 } 120 let user_id = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", auth_user.did) 121 .fetch_optional(&state.db) 122 .await 123 .map_err(|e| { 124 error!("DB error fetching user: {}", e); 125 ( 126 StatusCode::INTERNAL_SERVER_ERROR, 127 Json(json!({"error": "InternalError"})), 128 ) 129 .into_response() 130 })? 
131 .ok_or_else(|| { 132 ( 133 StatusCode::INTERNAL_SERVER_ERROR, 134 Json(json!({"error": "InternalError", "message": "User not found"})), 135 ) 136 .into_response() 137 })?; 138 let root_cid_str: String = sqlx::query_scalar!( 139 "SELECT repo_root_cid FROM repos WHERE user_id = $1", 140 user_id 141 ) 142 .fetch_optional(&state.db) 143 .await 144 .map_err(|e| { 145 error!("DB error fetching repo root: {}", e); 146 ( 147 StatusCode::INTERNAL_SERVER_ERROR, 148 Json(json!({"error": "InternalError"})), 149 ) 150 .into_response() 151 })? 152 .ok_or_else(|| { 153 ( 154 StatusCode::INTERNAL_SERVER_ERROR, 155 Json(json!({"error": "InternalError", "message": "Repo root not found"})), 156 ) 157 .into_response() 158 })?; 159 let current_root_cid = Cid::from_str(&root_cid_str).map_err(|_| { 160 ( 161 StatusCode::INTERNAL_SERVER_ERROR, 162 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})), 163 ) 164 .into_response() 165 })?; 166 Ok(RepoWriteAuth { 167 did: auth_user.did, 168 user_id, 169 current_root_cid, 170 is_oauth: auth_user.is_oauth, 171 scope: auth_user.scope, 172 controller_did: auth_user.controller_did, 173 }) 174} 175#[derive(Deserialize)] 176#[allow(dead_code)] 177pub struct CreateRecordInput { 178 pub repo: String, 179 pub collection: String, 180 pub rkey: Option<String>, 181 pub validate: Option<bool>, 182 pub record: serde_json::Value, 183 #[serde(rename = "swapCommit")] 184 pub swap_commit: Option<String>, 185} 186#[derive(Serialize)] 187#[serde(rename_all = "camelCase")] 188pub struct CreateRecordOutput { 189 pub uri: String, 190 pub cid: String, 191} 192pub async fn create_record( 193 State(state): State<AppState>, 194 headers: HeaderMap, 195 axum::extract::OriginalUri(uri): axum::extract::OriginalUri, 196 Json(input): Json<CreateRecordInput>, 197) -> Response { 198 let auth = 199 match prepare_repo_write(&state, &headers, &input.repo, "POST", &uri.to_string()).await { 200 Ok(res) => res, 201 Err(err_res) => return err_res, 202 }; 203 
204 if let Err(e) = crate::auth::scope_check::check_repo_scope( 205 auth.is_oauth, 206 auth.scope.as_deref(), 207 crate::oauth::RepoAction::Create, 208 &input.collection, 209 ) { 210 return e; 211 } 212 213 let did = auth.did; 214 let user_id = auth.user_id; 215 let current_root_cid = auth.current_root_cid; 216 let controller_did = auth.controller_did; 217 218 if let Some(swap_commit) = &input.swap_commit 219 && Cid::from_str(swap_commit).ok() != Some(current_root_cid) 220 { 221 return ( 222 StatusCode::CONFLICT, 223 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})), 224 ) 225 .into_response(); 226 } 227 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 228 let commit_bytes = match tracking_store.get(&current_root_cid).await { 229 Ok(Some(b)) => b, 230 _ => { 231 return ( 232 StatusCode::INTERNAL_SERVER_ERROR, 233 Json(json!({"error": "InternalError", "message": "Commit block not found"})), 234 ) 235 .into_response(); 236 } 237 }; 238 let commit = match Commit::from_cbor(&commit_bytes) { 239 Ok(c) => c, 240 _ => { 241 return ( 242 StatusCode::INTERNAL_SERVER_ERROR, 243 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})), 244 ) 245 .into_response(); 246 } 247 }; 248 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 249 let collection_nsid = match input.collection.parse::<Nsid>() { 250 Ok(n) => n, 251 Err(_) => { 252 return ( 253 StatusCode::BAD_REQUEST, 254 Json(json!({"error": "InvalidCollection"})), 255 ) 256 .into_response(); 257 } 258 }; 259 if input.validate.unwrap_or(true) 260 && let Err(err_response) = validate_record(&input.record, &input.collection) 261 { 262 return *err_response; 263 } 264 let rkey = input 265 .rkey 266 .unwrap_or_else(|| Tid::now(LimitedU32::MIN).to_string()); 267 let mut record_bytes = Vec::new(); 268 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() { 269 return ( 270 StatusCode::BAD_REQUEST, 271 
Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})), 272 ) 273 .into_response(); 274 } 275 let record_cid = match tracking_store.put(&record_bytes).await { 276 Ok(c) => c, 277 _ => { 278 return ( 279 StatusCode::INTERNAL_SERVER_ERROR, 280 Json(json!({"error": "InternalError", "message": "Failed to save record block"})), 281 ) 282 .into_response(); 283 } 284 }; 285 let key = format!("{}/{}", collection_nsid, rkey); 286 let new_mst = match mst.add(&key, record_cid).await { 287 Ok(m) => m, 288 _ => { 289 return ( 290 StatusCode::INTERNAL_SERVER_ERROR, 291 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})), 292 ) 293 .into_response(); 294 } 295 }; 296 let new_mst_root = match new_mst.persist().await { 297 Ok(c) => c, 298 _ => { 299 return ( 300 StatusCode::INTERNAL_SERVER_ERROR, 301 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})), 302 ) 303 .into_response(); 304 } 305 }; 306 let op = RecordOp::Create { 307 collection: input.collection.clone(), 308 rkey: rkey.clone(), 309 cid: record_cid, 310 }; 311 let mut relevant_blocks = std::collections::BTreeMap::new(); 312 if new_mst 313 .blocks_for_path(&key, &mut relevant_blocks) 314 .await 315 .is_err() 316 { 317 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response(); 318 } 319 if mst 320 .blocks_for_path(&key, &mut relevant_blocks) 321 .await 322 .is_err() 323 { 324 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response(); 325 } 326 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes)); 327 let mut written_cids = tracking_store.get_all_relevant_cids(); 328 for cid in relevant_blocks.keys() { 329 if !written_cids.contains(cid) { 330 written_cids.push(*cid); 331 } 332 } 333 let written_cids_str = written_cids 334 .iter() 335 .map(|c| 
c.to_string()) 336 .collect::<Vec<_>>(); 337 let blob_cids = extract_blob_cids(&input.record); 338 if let Err(e) = commit_and_log( 339 &state, 340 CommitParams { 341 did: &did, 342 user_id, 343 current_root_cid: Some(current_root_cid), 344 prev_data_cid: Some(commit.data), 345 new_mst_root, 346 ops: vec![op], 347 blocks_cids: &written_cids_str, 348 blobs: &blob_cids, 349 }, 350 ) 351 .await 352 { 353 return ( 354 StatusCode::INTERNAL_SERVER_ERROR, 355 Json(json!({"error": "InternalError", "message": e})), 356 ) 357 .into_response(); 358 }; 359 360 if let Some(ref controller) = controller_did { 361 let _ = delegation::log_delegation_action( 362 &state.db, 363 &did, 364 controller, 365 Some(controller), 366 DelegationActionType::RepoWrite, 367 Some(json!({ 368 "action": "create", 369 "collection": input.collection, 370 "rkey": rkey 371 })), 372 None, 373 None, 374 ) 375 .await; 376 } 377 378 ( 379 StatusCode::OK, 380 Json(CreateRecordOutput { 381 uri: format!("at://{}/{}/{}", did, input.collection, rkey), 382 cid: record_cid.to_string(), 383 }), 384 ) 385 .into_response() 386} 387#[derive(Deserialize)] 388#[allow(dead_code)] 389pub struct PutRecordInput { 390 pub repo: String, 391 pub collection: String, 392 pub rkey: String, 393 pub validate: Option<bool>, 394 pub record: serde_json::Value, 395 #[serde(rename = "swapCommit")] 396 pub swap_commit: Option<String>, 397 #[serde(rename = "swapRecord")] 398 pub swap_record: Option<String>, 399} 400#[derive(Serialize)] 401#[serde(rename_all = "camelCase")] 402pub struct PutRecordOutput { 403 pub uri: String, 404 pub cid: String, 405} 406pub async fn put_record( 407 State(state): State<AppState>, 408 headers: HeaderMap, 409 axum::extract::OriginalUri(uri): axum::extract::OriginalUri, 410 Json(input): Json<PutRecordInput>, 411) -> Response { 412 let auth = 413 match prepare_repo_write(&state, &headers, &input.repo, "POST", &uri.to_string()).await { 414 Ok(res) => res, 415 Err(err_res) => return err_res, 416 }; 417 418 if let 
Err(e) = crate::auth::scope_check::check_repo_scope( 419 auth.is_oauth, 420 auth.scope.as_deref(), 421 crate::oauth::RepoAction::Create, 422 &input.collection, 423 ) { 424 return e; 425 } 426 if let Err(e) = crate::auth::scope_check::check_repo_scope( 427 auth.is_oauth, 428 auth.scope.as_deref(), 429 crate::oauth::RepoAction::Update, 430 &input.collection, 431 ) { 432 return e; 433 } 434 435 let did = auth.did; 436 let user_id = auth.user_id; 437 let current_root_cid = auth.current_root_cid; 438 let controller_did = auth.controller_did; 439 440 if let Some(swap_commit) = &input.swap_commit 441 && Cid::from_str(swap_commit).ok() != Some(current_root_cid) 442 { 443 return ( 444 StatusCode::CONFLICT, 445 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})), 446 ) 447 .into_response(); 448 } 449 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 450 let commit_bytes = match tracking_store.get(&current_root_cid).await { 451 Ok(Some(b)) => b, 452 _ => { 453 return ( 454 StatusCode::INTERNAL_SERVER_ERROR, 455 Json(json!({"error": "InternalError", "message": "Commit block not found"})), 456 ) 457 .into_response(); 458 } 459 }; 460 let commit = match Commit::from_cbor(&commit_bytes) { 461 Ok(c) => c, 462 _ => { 463 return ( 464 StatusCode::INTERNAL_SERVER_ERROR, 465 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})), 466 ) 467 .into_response(); 468 } 469 }; 470 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 471 let collection_nsid = match input.collection.parse::<Nsid>() { 472 Ok(n) => n, 473 Err(_) => { 474 return ( 475 StatusCode::BAD_REQUEST, 476 Json(json!({"error": "InvalidCollection"})), 477 ) 478 .into_response(); 479 } 480 }; 481 let key = format!("{}/{}", collection_nsid, input.rkey); 482 if input.validate.unwrap_or(true) 483 && let Err(err_response) = validate_record(&input.record, &input.collection) 484 { 485 return *err_response; 486 } 487 if let 
Some(swap_record_str) = &input.swap_record { 488 let expected_cid = Cid::from_str(swap_record_str).ok(); 489 let actual_cid = mst.get(&key).await.ok().flatten(); 490 if expected_cid != actual_cid { 491 return (StatusCode::CONFLICT, Json(json!({"error": "InvalidSwap", "message": "Record has been modified or does not exist"}))).into_response(); 492 } 493 } 494 let existing_cid = mst.get(&key).await.ok().flatten(); 495 let mut record_bytes = Vec::new(); 496 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() { 497 return ( 498 StatusCode::BAD_REQUEST, 499 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})), 500 ) 501 .into_response(); 502 } 503 let record_cid = match tracking_store.put(&record_bytes).await { 504 Ok(c) => c, 505 _ => { 506 return ( 507 StatusCode::INTERNAL_SERVER_ERROR, 508 Json(json!({"error": "InternalError", "message": "Failed to save record block"})), 509 ) 510 .into_response(); 511 } 512 }; 513 let new_mst = if existing_cid.is_some() { 514 match mst.update(&key, record_cid).await { 515 Ok(m) => m, 516 Err(_) => { 517 return ( 518 StatusCode::INTERNAL_SERVER_ERROR, 519 Json(json!({"error": "InternalError", "message": "Failed to update MST"})), 520 ) 521 .into_response(); 522 } 523 } 524 } else { 525 match mst.add(&key, record_cid).await { 526 Ok(m) => m, 527 Err(_) => { 528 return ( 529 StatusCode::INTERNAL_SERVER_ERROR, 530 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})), 531 ) 532 .into_response(); 533 } 534 } 535 }; 536 let new_mst_root = match new_mst.persist().await { 537 Ok(c) => c, 538 Err(_) => { 539 return ( 540 StatusCode::INTERNAL_SERVER_ERROR, 541 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})), 542 ) 543 .into_response(); 544 } 545 }; 546 let op = if existing_cid.is_some() { 547 RecordOp::Update { 548 collection: input.collection.clone(), 549 rkey: input.rkey.clone(), 550 cid: record_cid, 551 prev: existing_cid, 552 } 553 } else 
{ 554 RecordOp::Create { 555 collection: input.collection.clone(), 556 rkey: input.rkey.clone(), 557 cid: record_cid, 558 } 559 }; 560 let mut relevant_blocks = std::collections::BTreeMap::new(); 561 if new_mst 562 .blocks_for_path(&key, &mut relevant_blocks) 563 .await 564 .is_err() 565 { 566 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response(); 567 } 568 if mst 569 .blocks_for_path(&key, &mut relevant_blocks) 570 .await 571 .is_err() 572 { 573 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response(); 574 } 575 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes)); 576 let mut written_cids = tracking_store.get_all_relevant_cids(); 577 for cid in relevant_blocks.keys() { 578 if !written_cids.contains(cid) { 579 written_cids.push(*cid); 580 } 581 } 582 let written_cids_str = written_cids 583 .iter() 584 .map(|c| c.to_string()) 585 .collect::<Vec<_>>(); 586 let is_update = existing_cid.is_some(); 587 let blob_cids = extract_blob_cids(&input.record); 588 if let Err(e) = commit_and_log( 589 &state, 590 CommitParams { 591 did: &did, 592 user_id, 593 current_root_cid: Some(current_root_cid), 594 prev_data_cid: Some(commit.data), 595 new_mst_root, 596 ops: vec![op], 597 blocks_cids: &written_cids_str, 598 blobs: &blob_cids, 599 }, 600 ) 601 .await 602 { 603 return ( 604 StatusCode::INTERNAL_SERVER_ERROR, 605 Json(json!({"error": "InternalError", "message": e})), 606 ) 607 .into_response(); 608 }; 609 610 if let Some(ref controller) = controller_did { 611 let _ = delegation::log_delegation_action( 612 &state.db, 613 &did, 614 controller, 615 Some(controller), 616 DelegationActionType::RepoWrite, 617 Some(json!({ 618 "action": if is_update { "update" } else { "create" }, 619 "collection": input.collection, 620 "rkey": input.rkey 621 })), 622 None, 623 None, 624 ) 625 
.await; 626 } 627 628 ( 629 StatusCode::OK, 630 Json(PutRecordOutput { 631 uri: format!("at://{}/{}/{}", did, input.collection, input.rkey), 632 cid: record_cid.to_string(), 633 }), 634 ) 635 .into_response() 636}