this repo has no description
1use super::validation::validate_record; 2use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log}; 3use crate::repo::tracking::TrackingBlockStore; 4use crate::state::AppState; 5use axum::{ 6 Json, 7 extract::State, 8 http::{HeaderMap, StatusCode}, 9 response::{IntoResponse, Response}, 10}; 11use cid::Cid; 12use jacquard::types::{ 13 integer::LimitedU32, 14 string::{Nsid, Tid}, 15}; 16use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 17use serde::{Deserialize, Serialize}; 18use serde_json::json; 19use sqlx::{PgPool, Row}; 20use std::str::FromStr; 21use std::sync::Arc; 22use tracing::error; 23use uuid::Uuid; 24 25pub async fn has_verified_notification_channel( 26 db: &PgPool, 27 did: &str, 28) -> Result<bool, sqlx::Error> { 29 let row = sqlx::query( 30 r#" 31 SELECT 32 email_confirmed, 33 discord_verified, 34 telegram_verified, 35 signal_verified 36 FROM users 37 WHERE did = $1 38 "#, 39 ) 40 .bind(did) 41 .fetch_optional(db) 42 .await?; 43 match row { 44 Some(r) => { 45 let email_confirmed: bool = r.get("email_confirmed"); 46 let discord_verified: bool = r.get("discord_verified"); 47 let telegram_verified: bool = r.get("telegram_verified"); 48 let signal_verified: bool = r.get("signal_verified"); 49 Ok(email_confirmed || discord_verified || telegram_verified || signal_verified) 50 } 51 None => Ok(false), 52 } 53} 54 55pub async fn prepare_repo_write( 56 state: &AppState, 57 headers: &HeaderMap, 58 repo_did: &str, 59) -> Result<(String, Uuid, Cid), Response> { 60 let token = crate::auth::extract_bearer_token_from_header( 61 headers.get("Authorization").and_then(|h| h.to_str().ok()), 62 ) 63 .ok_or_else(|| { 64 ( 65 StatusCode::UNAUTHORIZED, 66 Json(json!({"error": "AuthenticationRequired"})), 67 ) 68 .into_response() 69 })?; 70 let auth_user = crate::auth::validate_bearer_token(&state.db, &token) 71 .await 72 .map_err(|_| { 73 ( 74 StatusCode::UNAUTHORIZED, 75 Json(json!({"error": "AuthenticationFailed"})), 76 ) 77 .into_response() 78 })?; 79 if repo_did != auth_user.did { 80 return Err(( 81 StatusCode::FORBIDDEN, 82 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})), 83 ) 84 .into_response()); 85 } 86 match has_verified_notification_channel(&state.db, &auth_user.did).await { 87 Ok(true) => {} 88 Ok(false) => { 89 return Err(( 90 StatusCode::FORBIDDEN, 91 Json(json!({ 92 "error": "AccountNotVerified", 93 "message": "You must verify at least one notification channel (email, Discord, Telegram, or Signal) before creating records" 94 })), 95 ) 96 .into_response()); 97 } 98 Err(e) => { 99 error!("DB error checking notification channels: {}", e); 100 return Err(( 101 StatusCode::INTERNAL_SERVER_ERROR, 102 Json(json!({"error": "InternalError"})), 103 ) 104 .into_response()); 105 } 106 } 107 let user_id = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", auth_user.did) 108 .fetch_optional(&state.db) 109 .await 110 .map_err(|e| { 111 error!("DB error fetching user: {}", e); 112 ( 113 StatusCode::INTERNAL_SERVER_ERROR, 114 Json(json!({"error": "InternalError"})), 115 ) 116 .into_response() 117 })? 118 .ok_or_else(|| { 119 ( 120 StatusCode::INTERNAL_SERVER_ERROR, 121 Json(json!({"error": "InternalError", "message": "User not found"})), 122 ) 123 .into_response() 124 })?; 125 let root_cid_str: String = sqlx::query_scalar!( 126 "SELECT repo_root_cid FROM repos WHERE user_id = $1", 127 user_id 128 ) 129 .fetch_optional(&state.db) 130 .await 131 .map_err(|e| { 132 error!("DB error fetching repo root: {}", e); 133 ( 134 StatusCode::INTERNAL_SERVER_ERROR, 135 Json(json!({"error": "InternalError"})), 136 ) 137 .into_response() 138 })? 139 .ok_or_else(|| { 140 ( 141 StatusCode::INTERNAL_SERVER_ERROR, 142 Json(json!({"error": "InternalError", "message": "Repo root not found"})), 143 ) 144 .into_response() 145 })?; 146 let current_root_cid = Cid::from_str(&root_cid_str).map_err(|_| { 147 ( 148 StatusCode::INTERNAL_SERVER_ERROR, 149 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})), 150 ) 151 .into_response() 152 })?; 153 Ok((auth_user.did, user_id, current_root_cid)) 154} 155#[derive(Deserialize)] 156#[allow(dead_code)] 157pub struct CreateRecordInput { 158 pub repo: String, 159 pub collection: String, 160 pub rkey: Option<String>, 161 pub validate: Option<bool>, 162 pub record: serde_json::Value, 163 #[serde(rename = "swapCommit")] 164 pub swap_commit: Option<String>, 165} 166#[derive(Serialize)] 167#[serde(rename_all = "camelCase")] 168pub struct CreateRecordOutput { 169 pub uri: String, 170 pub cid: String, 171} 172pub async fn create_record( 173 State(state): State<AppState>, 174 headers: HeaderMap, 175 Json(input): Json<CreateRecordInput>, 176) -> Response { 177 let (did, user_id, current_root_cid) = 178 match prepare_repo_write(&state, &headers, &input.repo).await { 179 Ok(res) => res, 180 Err(err_res) => return err_res, 181 }; 182 if let Some(swap_commit) = &input.swap_commit 183 && Cid::from_str(swap_commit).ok() != Some(current_root_cid) { 184 return ( 185 StatusCode::CONFLICT, 186 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})), 187 ) 188 .into_response(); 189 } 190 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 191 let commit_bytes = match tracking_store.get(&current_root_cid).await { 192 Ok(Some(b)) => b, 193 _ => { 194 return ( 195 StatusCode::INTERNAL_SERVER_ERROR, 196 Json(json!({"error": "InternalError", "message": "Commit block not found"})), 197 ) 198 .into_response(); 199 } 200 }; 201 let commit = match Commit::from_cbor(&commit_bytes) { 202 Ok(c) => c, 203 _ => { 204 return ( 205 StatusCode::INTERNAL_SERVER_ERROR, 206 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})), 207 ) 208 .into_response(); 209 } 210 }; 211 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 212 let collection_nsid = match input.collection.parse::<Nsid>() { 213 Ok(n) => n, 214 Err(_) => { 215 return ( 216 StatusCode::BAD_REQUEST, 217 Json(json!({"error": "InvalidCollection"})), 218 ) 219 .into_response(); 220 } 221 }; 222 if input.validate.unwrap_or(true) 223 && let Err(err_response) = validate_record(&input.record, &input.collection) { 224 return *err_response; 225 } 226 let rkey = input 227 .rkey 228 .unwrap_or_else(|| Tid::now(LimitedU32::MIN).to_string()); 229 let mut record_bytes = Vec::new(); 230 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() { 231 return ( 232 StatusCode::BAD_REQUEST, 233 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})), 234 ) 235 .into_response(); 236 } 237 let record_cid = match tracking_store.put(&record_bytes).await { 238 Ok(c) => c, 239 _ => { 240 return ( 241 StatusCode::INTERNAL_SERVER_ERROR, 242 Json(json!({"error": "InternalError", "message": "Failed to save record block"})), 243 ) 244 .into_response(); 245 } 246 }; 247 let key = format!("{}/{}", collection_nsid, rkey); 248 let new_mst = match mst.add(&key, record_cid).await { 249 Ok(m) => m, 250 _ => { 251 return ( 252 StatusCode::INTERNAL_SERVER_ERROR, 253 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})), 254 ) 255 .into_response(); 256 } 257 }; 258 let new_mst_root = match new_mst.persist().await { 259 Ok(c) => c, 260 _ => { 261 return ( 262 StatusCode::INTERNAL_SERVER_ERROR, 263 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})), 264 ) 265 .into_response(); 266 } 267 }; 268 let op = RecordOp::Create { 269 collection: input.collection.clone(), 270 rkey: rkey.clone(), 271 cid: record_cid, 272 }; 273 let mut relevant_blocks = std::collections::BTreeMap::new(); 274 if new_mst.blocks_for_path(&key, &mut relevant_blocks).await.is_err() { 275 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response(); 276 } 277 if mst.blocks_for_path(&key, &mut relevant_blocks).await.is_err() { 278 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response(); 279 } 280 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes)); 281 let mut written_cids = tracking_store.get_all_relevant_cids(); 282 for cid in relevant_blocks.keys() { 283 if !written_cids.contains(cid) { 284 written_cids.push(*cid); 285 } 286 } 287 let written_cids_str = written_cids 288 .iter() 289 .map(|c| c.to_string()) 290 .collect::<Vec<_>>(); 291 if let Err(e) = commit_and_log( 292 &state, 293 CommitParams { 294 did: &did, 295 user_id, 296 current_root_cid: Some(current_root_cid), 297 prev_data_cid: Some(commit.data), 298 new_mst_root, 299 ops: vec![op], 300 blocks_cids: &written_cids_str, 301 }, 302 ) 303 .await 304 { 305 return ( 306 StatusCode::INTERNAL_SERVER_ERROR, 307 Json(json!({"error": "InternalError", "message": e})), 308 ) 309 .into_response(); 310 }; 311 ( 312 StatusCode::OK, 313 Json(CreateRecordOutput { 314 uri: format!("at://{}/{}/{}", did, input.collection, rkey), 315 cid: record_cid.to_string(), 316 }), 317 ) 318 .into_response() 319} 320#[derive(Deserialize)] 321#[allow(dead_code)] 322pub struct PutRecordInput { 323 pub repo: String, 324 pub collection: String, 325 pub rkey: String, 326 pub validate: Option<bool>, 327 pub record: serde_json::Value, 328 #[serde(rename = "swapCommit")] 329 pub swap_commit: Option<String>, 330 #[serde(rename = "swapRecord")] 331 pub swap_record: Option<String>, 332} 333#[derive(Serialize)] 334#[serde(rename_all = "camelCase")] 335pub struct PutRecordOutput { 336 pub uri: String, 337 pub cid: String, 338} 339pub async fn put_record( 340 State(state): State<AppState>, 341 headers: HeaderMap, 342 Json(input): Json<PutRecordInput>, 343) -> Response { 344 let (did, user_id, current_root_cid) = 345 match prepare_repo_write(&state, &headers, &input.repo).await { 346 Ok(res) => res, 347 Err(err_res) => return err_res, 348 }; 349 if let Some(swap_commit) = &input.swap_commit 350 && Cid::from_str(swap_commit).ok() != Some(current_root_cid) { 351 return ( 352 StatusCode::CONFLICT, 353 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})), 354 ) 355 .into_response(); 356 } 357 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 358 let commit_bytes = match tracking_store.get(&current_root_cid).await { 359 Ok(Some(b)) => b, 360 _ => { 361 return ( 362 StatusCode::INTERNAL_SERVER_ERROR, 363 Json(json!({"error": "InternalError", "message": "Commit block not found"})), 364 ) 365 .into_response(); 366 } 367 }; 368 let commit = match Commit::from_cbor(&commit_bytes) { 369 Ok(c) => c, 370 _ => { 371 return ( 372 StatusCode::INTERNAL_SERVER_ERROR, 373 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})), 374 ) 375 .into_response(); 376 } 377 }; 378 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 379 let collection_nsid = match input.collection.parse::<Nsid>() { 380 Ok(n) => n, 381 Err(_) => { 382 return ( 383 StatusCode::BAD_REQUEST, 384 Json(json!({"error": "InvalidCollection"})), 385 ) 386 .into_response(); 387 } 388 }; 389 let key = format!("{}/{}", collection_nsid, input.rkey); 390 if input.validate.unwrap_or(true) 391 && let Err(err_response) = validate_record(&input.record, &input.collection) { 392 return *err_response; 393 } 394 if let Some(swap_record_str) = &input.swap_record { 395 let expected_cid = Cid::from_str(swap_record_str).ok(); 396 let actual_cid = mst.get(&key).await.ok().flatten(); 397 if expected_cid != actual_cid { 398 return (StatusCode::CONFLICT, Json(json!({"error": "InvalidSwap", "message": "Record has been modified or does not exist"}))).into_response(); 399 } 400 } 401 let existing_cid = mst.get(&key).await.ok().flatten(); 402 let mut record_bytes = Vec::new(); 403 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() { 404 return ( 405 StatusCode::BAD_REQUEST, 406 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})), 407 ) 408 .into_response(); 409 } 410 let record_cid = match tracking_store.put(&record_bytes).await { 411 Ok(c) => c, 412 _ => { 413 return ( 414 StatusCode::INTERNAL_SERVER_ERROR, 415 Json(json!({"error": "InternalError", "message": "Failed to save record block"})), 416 ) 417 .into_response(); 418 } 419 }; 420 let new_mst = if existing_cid.is_some() { 421 match mst.update(&key, record_cid).await { 422 Ok(m) => m, 423 Err(_) => { 424 return ( 425 StatusCode::INTERNAL_SERVER_ERROR, 426 Json(json!({"error": "InternalError", "message": "Failed to update MST"})), 427 ) 428 .into_response(); 429 } 430 } 431 } else { 432 match mst.add(&key, record_cid).await { 433 Ok(m) => m, 434 Err(_) => { 435 return ( 436 StatusCode::INTERNAL_SERVER_ERROR, 437 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})), 438 ) 439 .into_response(); 440 } 441 } 442 }; 443 let new_mst_root = match new_mst.persist().await { 444 Ok(c) => c, 445 Err(_) => { 446 return ( 447 StatusCode::INTERNAL_SERVER_ERROR, 448 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})), 449 ) 450 .into_response(); 451 } 452 }; 453 let op = if existing_cid.is_some() { 454 RecordOp::Update { 455 collection: input.collection.clone(), 456 rkey: input.rkey.clone(), 457 cid: record_cid, 458 prev: existing_cid, 459 } 460 } else { 461 RecordOp::Create { 462 collection: input.collection.clone(), 463 rkey: input.rkey.clone(), 464 cid: record_cid, 465 } 466 }; 467 let mut relevant_blocks = std::collections::BTreeMap::new(); 468 if new_mst.blocks_for_path(&key, &mut relevant_blocks).await.is_err() { 469 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response(); 470 } 471 if mst.blocks_for_path(&key, &mut relevant_blocks).await.is_err() { 472 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response(); 473 } 474 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes)); 475 let mut written_cids = tracking_store.get_all_relevant_cids(); 476 for cid in relevant_blocks.keys() { 477 if !written_cids.contains(cid) { 478 written_cids.push(*cid); 479 } 480 } 481 let written_cids_str = written_cids 482 .iter() 483 .map(|c| c.to_string()) 484 .collect::<Vec<_>>(); 485 if let Err(e) = commit_and_log( 486 &state, 487 CommitParams { 488 did: &did, 489 user_id, 490 current_root_cid: Some(current_root_cid), 491 prev_data_cid: Some(commit.data), 492 new_mst_root, 493 ops: vec![op], 494 blocks_cids: &written_cids_str, 495 }, 496 ) 497 .await 498 { 499 return ( 500 StatusCode::INTERNAL_SERVER_ERROR, 501 Json(json!({"error": "InternalError", "message": e})), 502 ) 503 .into_response(); 504 }; 505 ( 506 StatusCode::OK, 507 Json(PutRecordOutput { 508 uri: format!("at://{}/{}/{}", did, input.collection, input.rkey), 509 cid: record_cid.to_string(), 510 }), 511 ) 512 .into_response() 513}