this repo has no description
1use super::validation::validate_record; 2use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log}; 3use crate::repo::tracking::TrackingBlockStore; 4use crate::state::AppState; 5use axum::{ 6 Json, 7 extract::State, 8 http::{HeaderMap, StatusCode}, 9 response::{IntoResponse, Response}, 10}; 11use cid::Cid; 12use jacquard::types::{ 13 integer::LimitedU32, 14 string::{Nsid, Tid}, 15}; 16use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 17use serde::{Deserialize, Serialize}; 18use serde_json::json; 19use sqlx::{PgPool, Row}; 20use std::str::FromStr; 21use std::sync::Arc; 22use tracing::error; 23use uuid::Uuid; 24 25pub async fn has_verified_comms_channel(db: &PgPool, did: &str) -> Result<bool, sqlx::Error> { 26 let row = sqlx::query( 27 r#" 28 SELECT 29 email_verified, 30 discord_verified, 31 telegram_verified, 32 signal_verified 33 FROM users 34 WHERE did = $1 35 "#, 36 ) 37 .bind(did) 38 .fetch_optional(db) 39 .await?; 40 match row { 41 Some(r) => { 42 let email_verified: bool = r.get("email_verified"); 43 let discord_verified: bool = r.get("discord_verified"); 44 let telegram_verified: bool = r.get("telegram_verified"); 45 let signal_verified: bool = r.get("signal_verified"); 46 Ok(email_verified || discord_verified || telegram_verified || signal_verified) 47 } 48 None => Ok(false), 49 } 50} 51 52pub struct RepoWriteAuth { 53 pub did: String, 54 pub user_id: Uuid, 55 pub current_root_cid: Cid, 56 pub is_oauth: bool, 57 pub scope: Option<String>, 58} 59 60pub async fn prepare_repo_write( 61 state: &AppState, 62 headers: &HeaderMap, 63 repo_did: &str, 64 http_method: &str, 65 http_uri: &str, 66) -> Result<RepoWriteAuth, Response> { 67 let extracted = crate::auth::extract_auth_token_from_header( 68 headers.get("Authorization").and_then(|h| h.to_str().ok()), 69 ) 70 .ok_or_else(|| { 71 ( 72 StatusCode::UNAUTHORIZED, 73 Json(json!({"error": "AuthenticationRequired"})), 74 ) 75 .into_response() 76 })?; 77 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok()); 78 let auth_user = crate::auth::validate_token_with_dpop( 79 &state.db, 80 &extracted.token, 81 extracted.is_dpop, 82 dpop_proof, 83 http_method, 84 http_uri, 85 false, 86 ) 87 .await 88 .map_err(|e| { 89 ( 90 StatusCode::UNAUTHORIZED, 91 Json(json!({"error": e.to_string()})), 92 ) 93 .into_response() 94 })?; 95 if repo_did != auth_user.did { 96 return Err(( 97 StatusCode::FORBIDDEN, 98 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})), 99 ) 100 .into_response()); 101 } 102 match has_verified_comms_channel(&state.db, &auth_user.did).await { 103 Ok(true) => {} 104 Ok(false) => { 105 return Err(( 106 StatusCode::FORBIDDEN, 107 Json(json!({ 108 "error": "AccountNotVerified", 109 "message": "You must verify at least one notification channel (email, Discord, Telegram, or Signal) before creating records" 110 })), 111 ) 112 .into_response()); 113 } 114 Err(e) => { 115 error!("DB error checking notification channels: {}", e); 116 return Err(( 117 StatusCode::INTERNAL_SERVER_ERROR, 118 Json(json!({"error": "InternalError"})), 119 ) 120 .into_response()); 121 } 122 } 123 let user_id = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", auth_user.did) 124 .fetch_optional(&state.db) 125 .await 126 .map_err(|e| { 127 error!("DB error fetching user: {}", e); 128 ( 129 StatusCode::INTERNAL_SERVER_ERROR, 130 Json(json!({"error": "InternalError"})), 131 ) 132 .into_response() 133 })? 134 .ok_or_else(|| { 135 ( 136 StatusCode::INTERNAL_SERVER_ERROR, 137 Json(json!({"error": "InternalError", "message": "User not found"})), 138 ) 139 .into_response() 140 })?; 141 let root_cid_str: String = sqlx::query_scalar!( 142 "SELECT repo_root_cid FROM repos WHERE user_id = $1", 143 user_id 144 ) 145 .fetch_optional(&state.db) 146 .await 147 .map_err(|e| { 148 error!("DB error fetching repo root: {}", e); 149 ( 150 StatusCode::INTERNAL_SERVER_ERROR, 151 Json(json!({"error": "InternalError"})), 152 ) 153 .into_response() 154 })? 155 .ok_or_else(|| { 156 ( 157 StatusCode::INTERNAL_SERVER_ERROR, 158 Json(json!({"error": "InternalError", "message": "Repo root not found"})), 159 ) 160 .into_response() 161 })?; 162 let current_root_cid = Cid::from_str(&root_cid_str).map_err(|_| { 163 ( 164 StatusCode::INTERNAL_SERVER_ERROR, 165 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})), 166 ) 167 .into_response() 168 })?; 169 Ok(RepoWriteAuth { 170 did: auth_user.did, 171 user_id, 172 current_root_cid, 173 is_oauth: auth_user.is_oauth, 174 scope: auth_user.scope, 175 }) 176} 177#[derive(Deserialize)] 178#[allow(dead_code)] 179pub struct CreateRecordInput { 180 pub repo: String, 181 pub collection: String, 182 pub rkey: Option<String>, 183 pub validate: Option<bool>, 184 pub record: serde_json::Value, 185 #[serde(rename = "swapCommit")] 186 pub swap_commit: Option<String>, 187} 188#[derive(Serialize)] 189#[serde(rename_all = "camelCase")] 190pub struct CreateRecordOutput { 191 pub uri: String, 192 pub cid: String, 193} 194pub async fn create_record( 195 State(state): State<AppState>, 196 headers: HeaderMap, 197 axum::extract::OriginalUri(uri): axum::extract::OriginalUri, 198 Json(input): Json<CreateRecordInput>, 199) -> Response { 200 let auth = 201 match prepare_repo_write(&state, &headers, &input.repo, "POST", &uri.to_string()).await { 202 Ok(res) => res, 203 Err(err_res) => return err_res, 204 }; 205 206 if let Err(e) = crate::auth::scope_check::check_repo_scope( 207 auth.is_oauth, 208 auth.scope.as_deref(), 209 crate::oauth::RepoAction::Create, 210 &input.collection, 211 ) { 212 return e; 213 } 214 215 let did = auth.did; 216 let user_id = auth.user_id; 217 let current_root_cid = auth.current_root_cid; 218 219 if let Some(swap_commit) = &input.swap_commit 220 && Cid::from_str(swap_commit).ok() != Some(current_root_cid) 221 { 222 return ( 223 StatusCode::CONFLICT, 224 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})), 225 ) 226 .into_response(); 227 } 228 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 229 let commit_bytes = match tracking_store.get(&current_root_cid).await { 230 Ok(Some(b)) => b, 231 _ => { 232 return ( 233 StatusCode::INTERNAL_SERVER_ERROR, 234 Json(json!({"error": "InternalError", "message": "Commit block not found"})), 235 ) 236 .into_response(); 237 } 238 }; 239 let commit = match Commit::from_cbor(&commit_bytes) { 240 Ok(c) => c, 241 _ => { 242 return ( 243 StatusCode::INTERNAL_SERVER_ERROR, 244 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})), 245 ) 246 .into_response(); 247 } 248 }; 249 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 250 let collection_nsid = match input.collection.parse::<Nsid>() { 251 Ok(n) => n, 252 Err(_) => { 253 return ( 254 StatusCode::BAD_REQUEST, 255 Json(json!({"error": "InvalidCollection"})), 256 ) 257 .into_response(); 258 } 259 }; 260 if input.validate.unwrap_or(true) 261 && let Err(err_response) = validate_record(&input.record, &input.collection) 262 { 263 return *err_response; 264 } 265 let rkey = input 266 .rkey 267 .unwrap_or_else(|| Tid::now(LimitedU32::MIN).to_string()); 268 let mut record_bytes = Vec::new(); 269 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() { 270 return ( 271 StatusCode::BAD_REQUEST, 272 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})), 273 ) 274 .into_response(); 275 } 276 let record_cid = match tracking_store.put(&record_bytes).await { 277 Ok(c) => c, 278 _ => { 279 return ( 280 StatusCode::INTERNAL_SERVER_ERROR, 281 Json(json!({"error": "InternalError", "message": "Failed to save record block"})), 282 ) 283 .into_response(); 284 } 285 }; 286 let key = format!("{}/{}", collection_nsid, rkey); 287 let new_mst = match mst.add(&key, record_cid).await { 288 Ok(m) => m, 289 _ => { 290 return ( 291 StatusCode::INTERNAL_SERVER_ERROR, 292 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})), 293 ) 294 .into_response(); 295 } 296 }; 297 let new_mst_root = match new_mst.persist().await { 298 Ok(c) => c, 299 _ => { 300 return ( 301 StatusCode::INTERNAL_SERVER_ERROR, 302 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})), 303 ) 304 .into_response(); 305 } 306 }; 307 let op = RecordOp::Create { 308 collection: input.collection.clone(), 309 rkey: rkey.clone(), 310 cid: record_cid, 311 }; 312 let mut relevant_blocks = std::collections::BTreeMap::new(); 313 if new_mst 314 .blocks_for_path(&key, &mut relevant_blocks) 315 .await 316 .is_err() 317 { 318 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response(); 319 } 320 if mst 321 .blocks_for_path(&key, &mut relevant_blocks) 322 .await 323 .is_err() 324 { 325 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response(); 326 } 327 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes)); 328 let mut written_cids = tracking_store.get_all_relevant_cids(); 329 for cid in relevant_blocks.keys() { 330 if !written_cids.contains(cid) { 331 written_cids.push(*cid); 332 } 333 } 334 let written_cids_str = written_cids 335 .iter() 336 .map(|c| c.to_string()) 337 .collect::<Vec<_>>(); 338 if let Err(e) = commit_and_log( 339 &state, 340 CommitParams { 341 did: &did, 342 user_id, 343 current_root_cid: Some(current_root_cid), 344 prev_data_cid: Some(commit.data), 345 new_mst_root, 346 ops: vec![op], 347 blocks_cids: &written_cids_str, 348 }, 349 ) 350 .await 351 { 352 return ( 353 StatusCode::INTERNAL_SERVER_ERROR, 354 Json(json!({"error": "InternalError", "message": e})), 355 ) 356 .into_response(); 357 }; 358 ( 359 StatusCode::OK, 360 Json(CreateRecordOutput { 361 uri: format!("at://{}/{}/{}", did, input.collection, rkey), 362 cid: record_cid.to_string(), 363 }), 364 ) 365 .into_response() 366} 367#[derive(Deserialize)] 368#[allow(dead_code)] 369pub struct PutRecordInput { 370 pub repo: String, 371 pub collection: String, 372 pub rkey: String, 373 pub validate: Option<bool>, 374 pub record: serde_json::Value, 375 #[serde(rename = "swapCommit")] 376 pub swap_commit: Option<String>, 377 #[serde(rename = "swapRecord")] 378 pub swap_record: Option<String>, 379} 380#[derive(Serialize)] 381#[serde(rename_all = "camelCase")] 382pub struct PutRecordOutput { 383 pub uri: String, 384 pub cid: String, 385} 386pub async fn put_record( 387 State(state): State<AppState>, 388 headers: HeaderMap, 389 axum::extract::OriginalUri(uri): axum::extract::OriginalUri, 390 Json(input): Json<PutRecordInput>, 391) -> Response { 392 let auth = 393 match prepare_repo_write(&state, &headers, &input.repo, "POST", &uri.to_string()).await { 394 Ok(res) => res, 395 Err(err_res) => return err_res, 396 }; 397 398 if let Err(e) = crate::auth::scope_check::check_repo_scope( 399 auth.is_oauth, 400 auth.scope.as_deref(), 401 crate::oauth::RepoAction::Create, 402 &input.collection, 403 ) { 404 return e; 405 } 406 if let Err(e) = crate::auth::scope_check::check_repo_scope( 407 auth.is_oauth, 408 auth.scope.as_deref(), 409 crate::oauth::RepoAction::Update, 410 &input.collection, 411 ) { 412 return e; 413 } 414 415 let did = auth.did; 416 let user_id = auth.user_id; 417 let current_root_cid = auth.current_root_cid; 418 419 if let Some(swap_commit) = &input.swap_commit 420 && Cid::from_str(swap_commit).ok() != Some(current_root_cid) 421 { 422 return ( 423 StatusCode::CONFLICT, 424 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})), 425 ) 426 .into_response(); 427 } 428 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 429 let commit_bytes = match tracking_store.get(&current_root_cid).await { 430 Ok(Some(b)) => b, 431 _ => { 432 return ( 433 StatusCode::INTERNAL_SERVER_ERROR, 434 Json(json!({"error": "InternalError", "message": "Commit block not found"})), 435 ) 436 .into_response(); 437 } 438 }; 439 let commit = match Commit::from_cbor(&commit_bytes) { 440 Ok(c) => c, 441 _ => { 442 return ( 443 StatusCode::INTERNAL_SERVER_ERROR, 444 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})), 445 ) 446 .into_response(); 447 } 448 }; 449 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 450 let collection_nsid = match input.collection.parse::<Nsid>() { 451 Ok(n) => n, 452 Err(_) => { 453 return ( 454 StatusCode::BAD_REQUEST, 455 Json(json!({"error": "InvalidCollection"})), 456 ) 457 .into_response(); 458 } 459 }; 460 let key = format!("{}/{}", collection_nsid, input.rkey); 461 if input.validate.unwrap_or(true) 462 && let Err(err_response) = validate_record(&input.record, &input.collection) 463 { 464 return *err_response; 465 } 466 if let Some(swap_record_str) = &input.swap_record { 467 let expected_cid = Cid::from_str(swap_record_str).ok(); 468 let actual_cid = mst.get(&key).await.ok().flatten(); 469 if expected_cid != actual_cid { 470 return (StatusCode::CONFLICT, Json(json!({"error": "InvalidSwap", "message": "Record has been modified or does not exist"}))).into_response(); 471 } 472 } 473 let existing_cid = mst.get(&key).await.ok().flatten(); 474 let mut record_bytes = Vec::new(); 475 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() { 476 return ( 477 StatusCode::BAD_REQUEST, 478 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})), 479 ) 480 .into_response(); 481 } 482 let record_cid = match tracking_store.put(&record_bytes).await { 483 Ok(c) => c, 484 _ => { 485 return ( 486 StatusCode::INTERNAL_SERVER_ERROR, 487 Json(json!({"error": "InternalError", "message": "Failed to save record block"})), 488 ) 489 .into_response(); 490 } 491 }; 492 let new_mst = if existing_cid.is_some() { 493 match mst.update(&key, record_cid).await { 494 Ok(m) => m, 495 Err(_) => { 496 return ( 497 StatusCode::INTERNAL_SERVER_ERROR, 498 Json(json!({"error": "InternalError", "message": "Failed to update MST"})), 499 ) 500 .into_response(); 501 } 502 } 503 } else { 504 match mst.add(&key, record_cid).await { 505 Ok(m) => m, 506 Err(_) => { 507 return ( 508 StatusCode::INTERNAL_SERVER_ERROR, 509 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})), 510 ) 511 .into_response(); 512 } 513 } 514 }; 515 let new_mst_root = match new_mst.persist().await { 516 Ok(c) => c, 517 Err(_) => { 518 return ( 519 StatusCode::INTERNAL_SERVER_ERROR, 520 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})), 521 ) 522 .into_response(); 523 } 524 }; 525 let op = if existing_cid.is_some() { 526 RecordOp::Update { 527 collection: input.collection.clone(), 528 rkey: input.rkey.clone(), 529 cid: record_cid, 530 prev: existing_cid, 531 } 532 } else { 533 RecordOp::Create { 534 collection: input.collection.clone(), 535 rkey: input.rkey.clone(), 536 cid: record_cid, 537 } 538 }; 539 let mut relevant_blocks = std::collections::BTreeMap::new(); 540 if new_mst 541 .blocks_for_path(&key, &mut relevant_blocks) 542 .await 543 .is_err() 544 { 545 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response(); 546 } 547 if mst 548 .blocks_for_path(&key, &mut relevant_blocks) 549 .await 550 .is_err() 551 { 552 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response(); 553 } 554 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes)); 555 let mut written_cids = tracking_store.get_all_relevant_cids(); 556 for cid in relevant_blocks.keys() { 557 if !written_cids.contains(cid) { 558 written_cids.push(*cid); 559 } 560 } 561 let written_cids_str = written_cids 562 .iter() 563 .map(|c| c.to_string()) 564 .collect::<Vec<_>>(); 565 if let Err(e) = commit_and_log( 566 &state, 567 CommitParams { 568 did: &did, 569 user_id, 570 current_root_cid: Some(current_root_cid), 571 prev_data_cid: Some(commit.data), 572 new_mst_root, 573 ops: vec![op], 574 blocks_cids: &written_cids_str, 575 }, 576 ) 577 .await 578 { 579 return ( 580 StatusCode::INTERNAL_SERVER_ERROR, 581 Json(json!({"error": "InternalError", "message": e})), 582 ) 583 .into_response(); 584 }; 585 ( 586 StatusCode::OK, 587 Json(PutRecordOutput { 588 uri: format!("at://{}/{}/{}", did, input.collection, input.rkey), 589 cid: record_cid.to_string(), 590 }), 591 ) 592 .into_response() 593}