this repo has no description
1use super::validation::validate_record; 2use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log}; 3use crate::repo::tracking::TrackingBlockStore; 4use crate::state::AppState; 5use axum::{ 6 Json, 7 extract::State, 8 http::{HeaderMap, StatusCode}, 9 response::{IntoResponse, Response}, 10}; 11use cid::Cid; 12use jacquard::types::{ 13 integer::LimitedU32, 14 string::{Nsid, Tid}, 15}; 16use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 17use serde::{Deserialize, Serialize}; 18use serde_json::json; 19use sqlx::{PgPool, Row}; 20use std::str::FromStr; 21use std::sync::Arc; 22use tracing::error; 23use uuid::Uuid; 24 25pub async fn has_verified_notification_channel( 26 db: &PgPool, 27 did: &str, 28) -> Result<bool, sqlx::Error> { 29 let row = sqlx::query( 30 r#" 31 SELECT 32 email_confirmed, 33 discord_verified, 34 telegram_verified, 35 signal_verified 36 FROM users 37 WHERE did = $1 38 "#, 39 ) 40 .bind(did) 41 .fetch_optional(db) 42 .await?; 43 match row { 44 Some(r) => { 45 let email_confirmed: bool = r.get("email_confirmed"); 46 let discord_verified: bool = r.get("discord_verified"); 47 let telegram_verified: bool = r.get("telegram_verified"); 48 let signal_verified: bool = r.get("signal_verified"); 49 Ok(email_confirmed || discord_verified || telegram_verified || signal_verified) 50 } 51 None => Ok(false), 52 } 53} 54 55pub async fn prepare_repo_write( 56 state: &AppState, 57 headers: &HeaderMap, 58 repo_did: &str, 59 http_method: &str, 60 http_uri: &str, 61) -> Result<(String, Uuid, Cid), Response> { 62 let extracted = crate::auth::extract_auth_token_from_header( 63 headers.get("Authorization").and_then(|h| h.to_str().ok()), 64 ) 65 .ok_or_else(|| { 66 ( 67 StatusCode::UNAUTHORIZED, 68 Json(json!({"error": "AuthenticationRequired"})), 69 ) 70 .into_response() 71 })?; 72 let dpop_proof = headers 73 .get("DPoP") 74 .and_then(|h| h.to_str().ok()); 75 let auth_user = crate::auth::validate_token_with_dpop( 76 &state.db, 77 &extracted.token, 78 extracted.is_dpop, 79 dpop_proof, 80 http_method, 81 http_uri, 82 false, 83 ) 84 .await 85 .map_err(|e| { 86 ( 87 StatusCode::UNAUTHORIZED, 88 Json(json!({"error": e.to_string()})), 89 ) 90 .into_response() 91 })?; 92 if repo_did != auth_user.did { 93 return Err(( 94 StatusCode::FORBIDDEN, 95 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})), 96 ) 97 .into_response()); 98 } 99 match has_verified_notification_channel(&state.db, &auth_user.did).await { 100 Ok(true) => {} 101 Ok(false) => { 102 return Err(( 103 StatusCode::FORBIDDEN, 104 Json(json!({ 105 "error": "AccountNotVerified", 106 "message": "You must verify at least one notification channel (email, Discord, Telegram, or Signal) before creating records" 107 })), 108 ) 109 .into_response()); 110 } 111 Err(e) => { 112 error!("DB error checking notification channels: {}", e); 113 return Err(( 114 StatusCode::INTERNAL_SERVER_ERROR, 115 Json(json!({"error": "InternalError"})), 116 ) 117 .into_response()); 118 } 119 } 120 let user_id = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", auth_user.did) 121 .fetch_optional(&state.db) 122 .await 123 .map_err(|e| { 124 error!("DB error fetching user: {}", e); 125 ( 126 StatusCode::INTERNAL_SERVER_ERROR, 127 Json(json!({"error": "InternalError"})), 128 ) 129 .into_response() 130 })? 131 .ok_or_else(|| { 132 ( 133 StatusCode::INTERNAL_SERVER_ERROR, 134 Json(json!({"error": "InternalError", "message": "User not found"})), 135 ) 136 .into_response() 137 })?; 138 let root_cid_str: String = sqlx::query_scalar!( 139 "SELECT repo_root_cid FROM repos WHERE user_id = $1", 140 user_id 141 ) 142 .fetch_optional(&state.db) 143 .await 144 .map_err(|e| { 145 error!("DB error fetching repo root: {}", e); 146 ( 147 StatusCode::INTERNAL_SERVER_ERROR, 148 Json(json!({"error": "InternalError"})), 149 ) 150 .into_response() 151 })? 152 .ok_or_else(|| { 153 ( 154 StatusCode::INTERNAL_SERVER_ERROR, 155 Json(json!({"error": "InternalError", "message": "Repo root not found"})), 156 ) 157 .into_response() 158 })?; 159 let current_root_cid = Cid::from_str(&root_cid_str).map_err(|_| { 160 ( 161 StatusCode::INTERNAL_SERVER_ERROR, 162 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})), 163 ) 164 .into_response() 165 })?; 166 Ok((auth_user.did, user_id, current_root_cid)) 167} 168#[derive(Deserialize)] 169#[allow(dead_code)] 170pub struct CreateRecordInput { 171 pub repo: String, 172 pub collection: String, 173 pub rkey: Option<String>, 174 pub validate: Option<bool>, 175 pub record: serde_json::Value, 176 #[serde(rename = "swapCommit")] 177 pub swap_commit: Option<String>, 178} 179#[derive(Serialize)] 180#[serde(rename_all = "camelCase")] 181pub struct CreateRecordOutput { 182 pub uri: String, 183 pub cid: String, 184} 185pub async fn create_record( 186 State(state): State<AppState>, 187 headers: HeaderMap, 188 axum::extract::OriginalUri(uri): axum::extract::OriginalUri, 189 Json(input): Json<CreateRecordInput>, 190) -> Response { 191 let (did, user_id, current_root_cid) = 192 match prepare_repo_write(&state, &headers, &input.repo, "POST", &uri.to_string()).await { 193 Ok(res) => res, 194 Err(err_res) => return err_res, 195 }; 196 if let Some(swap_commit) = &input.swap_commit 197 && Cid::from_str(swap_commit).ok() != Some(current_root_cid) { 198 return ( 199 StatusCode::CONFLICT, 200 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})), 201 ) 202 .into_response(); 203 } 204 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 205 let commit_bytes = match tracking_store.get(&current_root_cid).await { 206 Ok(Some(b)) => b, 207 _ => { 208 return ( 209 StatusCode::INTERNAL_SERVER_ERROR, 210 Json(json!({"error": "InternalError", "message": "Commit block not found"})), 211 ) 212 .into_response(); 213 } 214 }; 215 let commit = match Commit::from_cbor(&commit_bytes) { 216 Ok(c) => c, 217 _ => { 218 return ( 219 StatusCode::INTERNAL_SERVER_ERROR, 220 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})), 221 ) 222 .into_response(); 223 } 224 }; 225 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 226 let collection_nsid = match input.collection.parse::<Nsid>() { 227 Ok(n) => n, 228 Err(_) => { 229 return ( 230 StatusCode::BAD_REQUEST, 231 Json(json!({"error": "InvalidCollection"})), 232 ) 233 .into_response(); 234 } 235 }; 236 if input.validate.unwrap_or(true) 237 && let Err(err_response) = validate_record(&input.record, &input.collection) { 238 return *err_response; 239 } 240 let rkey = input 241 .rkey 242 .unwrap_or_else(|| Tid::now(LimitedU32::MIN).to_string()); 243 let mut record_bytes = Vec::new(); 244 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() { 245 return ( 246 StatusCode::BAD_REQUEST, 247 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})), 248 ) 249 .into_response(); 250 } 251 let record_cid = match tracking_store.put(&record_bytes).await { 252 Ok(c) => c, 253 _ => { 254 return ( 255 StatusCode::INTERNAL_SERVER_ERROR, 256 Json(json!({"error": "InternalError", "message": "Failed to save record block"})), 257 ) 258 .into_response(); 259 } 260 }; 261 let key = format!("{}/{}", collection_nsid, rkey); 262 let new_mst = match mst.add(&key, record_cid).await { 263 Ok(m) => m, 264 _ => { 265 return ( 266 StatusCode::INTERNAL_SERVER_ERROR, 267 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})), 268 ) 269 .into_response(); 270 } 271 }; 272 let new_mst_root = match new_mst.persist().await { 273 Ok(c) => c, 274 _ => { 275 return ( 276 StatusCode::INTERNAL_SERVER_ERROR, 277 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})), 278 ) 279 .into_response(); 280 } 281 }; 282 let op = RecordOp::Create { 283 collection: input.collection.clone(), 284 rkey: rkey.clone(), 285 cid: record_cid, 286 }; 287 let mut relevant_blocks = std::collections::BTreeMap::new(); 288 if new_mst.blocks_for_path(&key, &mut relevant_blocks).await.is_err() { 289 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response(); 290 } 291 if mst.blocks_for_path(&key, &mut relevant_blocks).await.is_err() { 292 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response(); 293 } 294 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes)); 295 let mut written_cids = tracking_store.get_all_relevant_cids(); 296 for cid in relevant_blocks.keys() { 297 if !written_cids.contains(cid) { 298 written_cids.push(*cid); 299 } 300 } 301 let written_cids_str = written_cids 302 .iter() 303 .map(|c| c.to_string()) 304 .collect::<Vec<_>>(); 305 if let Err(e) = commit_and_log( 306 &state, 307 CommitParams { 308 did: &did, 309 user_id, 310 current_root_cid: Some(current_root_cid), 311 prev_data_cid: Some(commit.data), 312 new_mst_root, 313 ops: vec![op], 314 blocks_cids: &written_cids_str, 315 }, 316 ) 317 .await 318 { 319 return ( 320 StatusCode::INTERNAL_SERVER_ERROR, 321 Json(json!({"error": "InternalError", "message": e})), 322 ) 323 .into_response(); 324 }; 325 ( 326 StatusCode::OK, 327 Json(CreateRecordOutput { 328 uri: format!("at://{}/{}/{}", did, input.collection, rkey), 329 cid: record_cid.to_string(), 330 }), 331 ) 332 .into_response() 333} 334#[derive(Deserialize)] 335#[allow(dead_code)] 336pub struct PutRecordInput { 337 pub repo: String, 338 pub collection: String, 339 pub rkey: String, 340 pub validate: Option<bool>, 341 pub record: serde_json::Value, 342 #[serde(rename = "swapCommit")] 343 pub swap_commit: Option<String>, 344 #[serde(rename = "swapRecord")] 345 pub swap_record: Option<String>, 346} 347#[derive(Serialize)] 348#[serde(rename_all = "camelCase")] 349pub struct PutRecordOutput { 350 pub uri: String, 351 pub cid: String, 352} 353pub async fn put_record( 354 State(state): State<AppState>, 355 headers: HeaderMap, 356 axum::extract::OriginalUri(uri): axum::extract::OriginalUri, 357 Json(input): Json<PutRecordInput>, 358) -> Response { 359 let (did, user_id, current_root_cid) = 360 match prepare_repo_write(&state, &headers, &input.repo, "POST", &uri.to_string()).await { 361 Ok(res) => res, 362 Err(err_res) => return err_res, 363 }; 364 if let Some(swap_commit) = &input.swap_commit 365 && Cid::from_str(swap_commit).ok() != Some(current_root_cid) { 366 return ( 367 StatusCode::CONFLICT, 368 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})), 369 ) 370 .into_response(); 371 } 372 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 373 let commit_bytes = match tracking_store.get(&current_root_cid).await { 374 Ok(Some(b)) => b, 375 _ => { 376 return ( 377 StatusCode::INTERNAL_SERVER_ERROR, 378 Json(json!({"error": "InternalError", "message": "Commit block not found"})), 379 ) 380 .into_response(); 381 } 382 }; 383 let commit = match Commit::from_cbor(&commit_bytes) { 384 Ok(c) => c, 385 _ => { 386 return ( 387 StatusCode::INTERNAL_SERVER_ERROR, 388 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})), 389 ) 390 .into_response(); 391 } 392 }; 393 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 394 let collection_nsid = match input.collection.parse::<Nsid>() { 395 Ok(n) => n, 396 Err(_) => { 397 return ( 398 StatusCode::BAD_REQUEST, 399 Json(json!({"error": "InvalidCollection"})), 400 ) 401 .into_response(); 402 } 403 }; 404 let key = format!("{}/{}", collection_nsid, input.rkey); 405 if input.validate.unwrap_or(true) 406 && let Err(err_response) = validate_record(&input.record, &input.collection) { 407 return *err_response; 408 } 409 if let Some(swap_record_str) = &input.swap_record { 410 let expected_cid = Cid::from_str(swap_record_str).ok(); 411 let actual_cid = mst.get(&key).await.ok().flatten(); 412 if expected_cid != actual_cid { 413 return (StatusCode::CONFLICT, Json(json!({"error": "InvalidSwap", "message": "Record has been modified or does not exist"}))).into_response(); 414 } 415 } 416 let existing_cid = mst.get(&key).await.ok().flatten(); 417 let mut record_bytes = Vec::new(); 418 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() { 419 return ( 420 StatusCode::BAD_REQUEST, 421 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})), 422 ) 423 .into_response(); 424 } 425 let record_cid = match tracking_store.put(&record_bytes).await { 426 Ok(c) => c, 427 _ => { 428 return ( 429 StatusCode::INTERNAL_SERVER_ERROR, 430 Json(json!({"error": "InternalError", "message": "Failed to save record block"})), 431 ) 432 .into_response(); 433 } 434 }; 435 let new_mst = if existing_cid.is_some() { 436 match mst.update(&key, record_cid).await { 437 Ok(m) => m, 438 Err(_) => { 439 return ( 440 StatusCode::INTERNAL_SERVER_ERROR, 441 Json(json!({"error": "InternalError", "message": "Failed to update MST"})), 442 ) 443 .into_response(); 444 } 445 } 446 } else { 447 match mst.add(&key, record_cid).await { 448 Ok(m) => m, 449 Err(_) => { 450 return ( 451 StatusCode::INTERNAL_SERVER_ERROR, 452 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})), 453 ) 454 .into_response(); 455 } 456 } 457 }; 458 let new_mst_root = match new_mst.persist().await { 459 Ok(c) => c, 460 Err(_) => { 461 return ( 462 StatusCode::INTERNAL_SERVER_ERROR, 463 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})), 464 ) 465 .into_response(); 466 } 467 }; 468 let op = if existing_cid.is_some() { 469 RecordOp::Update { 470 collection: input.collection.clone(), 471 rkey: input.rkey.clone(), 472 cid: record_cid, 473 prev: existing_cid, 474 } 475 } else { 476 RecordOp::Create { 477 collection: input.collection.clone(), 478 rkey: input.rkey.clone(), 479 cid: record_cid, 480 } 481 }; 482 let mut relevant_blocks = std::collections::BTreeMap::new(); 483 if new_mst.blocks_for_path(&key, &mut relevant_blocks).await.is_err() { 484 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response(); 485 } 486 if mst.blocks_for_path(&key, &mut relevant_blocks).await.is_err() { 487 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response(); 488 } 489 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes)); 490 let mut written_cids = tracking_store.get_all_relevant_cids(); 491 for cid in relevant_blocks.keys() { 492 if !written_cids.contains(cid) { 493 written_cids.push(*cid); 494 } 495 } 496 let written_cids_str = written_cids 497 .iter() 498 .map(|c| c.to_string()) 499 .collect::<Vec<_>>(); 500 if let Err(e) = commit_and_log( 501 &state, 502 CommitParams { 503 did: &did, 504 user_id, 505 current_root_cid: Some(current_root_cid), 506 prev_data_cid: Some(commit.data), 507 new_mst_root, 508 ops: vec![op], 509 blocks_cids: &written_cids_str, 510 }, 511 ) 512 .await 513 { 514 return ( 515 StatusCode::INTERNAL_SERVER_ERROR, 516 Json(json!({"error": "InternalError", "message": e})), 517 ) 518 .into_response(); 519 }; 520 ( 521 StatusCode::OK, 522 Json(PutRecordOutput { 523 uri: format!("at://{}/{}/{}", did, input.collection, input.rkey), 524 cid: record_cid.to_string(), 525 }), 526 ) 527 .into_response() 528}