this repo has no description
1use super::validation::validate_record; 2use crate::api::repo::record::utils::{commit_and_log, RecordOp}; 3use crate::repo::tracking::TrackingBlockStore; 4use crate::state::AppState; 5use axum::{ 6 extract::State, 7 http::{HeaderMap, StatusCode}, 8 response::{IntoResponse, Response}, 9 Json, 10}; 11use cid::Cid; 12use jacquard::types::{integer::LimitedU32, string::{Nsid, Tid}}; 13use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 14use serde::{Deserialize, Serialize}; 15use serde_json::json; 16use sqlx::{PgPool, Row}; 17use std::str::FromStr; 18use std::sync::Arc; 19use tracing::error; 20use uuid::Uuid; 21pub async fn has_verified_notification_channel(db: &PgPool, did: &str) -> Result<bool, sqlx::Error> { 22 let row = sqlx::query( 23 r#" 24 SELECT 25 email_confirmed, 26 discord_verified, 27 telegram_verified, 28 signal_verified 29 FROM users 30 WHERE did = $1 31 "# 32 ) 33 .bind(did) 34 .fetch_optional(db) 35 .await?; 36 match row { 37 Some(r) => { 38 let email_confirmed: bool = r.get("email_confirmed"); 39 let discord_verified: bool = r.get("discord_verified"); 40 let telegram_verified: bool = r.get("telegram_verified"); 41 let signal_verified: bool = r.get("signal_verified"); 42 Ok(email_confirmed || discord_verified || telegram_verified || signal_verified) 43 } 44 None => Ok(false), 45 } 46} 47pub async fn prepare_repo_write( 48 state: &AppState, 49 headers: &HeaderMap, 50 repo_did: &str, 51) -> Result<(String, Uuid, Cid), Response> { 52 let token = crate::auth::extract_bearer_token_from_header( 53 headers.get("Authorization").and_then(|h| h.to_str().ok()) 54 ).ok_or_else(|| { 55 ( 56 StatusCode::UNAUTHORIZED, 57 Json(json!({"error": "AuthenticationRequired"})), 58 ) 59 .into_response() 60 })?; 61 let auth_user = crate::auth::validate_bearer_token(&state.db, &token) 62 .await 63 .map_err(|_| { 64 ( 65 StatusCode::UNAUTHORIZED, 66 Json(json!({"error": "AuthenticationFailed"})), 67 ) 68 .into_response() 69 })?; 70 if repo_did != auth_user.did { 71 return Err(( 72 StatusCode::FORBIDDEN, 73 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})), 74 ) 75 .into_response()); 76 } 77 match has_verified_notification_channel(&state.db, &auth_user.did).await { 78 Ok(true) => {} 79 Ok(false) => { 80 return Err(( 81 StatusCode::FORBIDDEN, 82 Json(json!({ 83 "error": "AccountNotVerified", 84 "message": "You must verify at least one notification channel (email, Discord, Telegram, or Signal) before creating records" 85 })), 86 ) 87 .into_response()); 88 } 89 Err(e) => { 90 error!("DB error checking notification channels: {}", e); 91 return Err(( 92 StatusCode::INTERNAL_SERVER_ERROR, 93 Json(json!({"error": "InternalError"})), 94 ) 95 .into_response()); 96 } 97 } 98 let user_id = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", auth_user.did) 99 .fetch_optional(&state.db) 100 .await 101 .map_err(|e| { 102 error!("DB error fetching user: {}", e); 103 (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response() 104 })? 105 .ok_or_else(|| { 106 ( 107 StatusCode::INTERNAL_SERVER_ERROR, 108 Json(json!({"error": "InternalError", "message": "User not found"})), 109 ) 110 .into_response() 111 })?; 112 let root_cid_str: String = 113 sqlx::query_scalar!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id) 114 .fetch_optional(&state.db) 115 .await 116 .map_err(|e| { 117 error!("DB error fetching repo root: {}", e); 118 (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response() 119 })? 120 .ok_or_else(|| { 121 ( 122 StatusCode::INTERNAL_SERVER_ERROR, 123 Json(json!({"error": "InternalError", "message": "Repo root not found"})), 124 ) 125 .into_response() 126 })?; 127 let current_root_cid = Cid::from_str(&root_cid_str).map_err(|_| { 128 ( 129 StatusCode::INTERNAL_SERVER_ERROR, 130 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})), 131 ) 132 .into_response() 133 })?; 134 Ok((auth_user.did, user_id, current_root_cid)) 135} 136#[derive(Deserialize)] 137#[allow(dead_code)] 138pub struct CreateRecordInput { 139 pub repo: String, 140 pub collection: String, 141 pub rkey: Option<String>, 142 pub validate: Option<bool>, 143 pub record: serde_json::Value, 144 #[serde(rename = "swapCommit")] 145 pub swap_commit: Option<String>, 146} 147#[derive(Serialize)] 148#[serde(rename_all = "camelCase")] 149pub struct CreateRecordOutput { 150 pub uri: String, 151 pub cid: String, 152} 153pub async fn create_record( 154 State(state): State<AppState>, 155 headers: HeaderMap, 156 Json(input): Json<CreateRecordInput>, 157) -> Response { 158 let (did, user_id, current_root_cid) = 159 match prepare_repo_write(&state, &headers, &input.repo).await { 160 Ok(res) => res, 161 Err(err_res) => return err_res, 162 }; 163 if let Some(swap_commit) = &input.swap_commit { 164 if Cid::from_str(swap_commit).ok() != Some(current_root_cid) { 165 return ( 166 StatusCode::CONFLICT, 167 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})), 168 ) 169 .into_response(); 170 } 171 } 172 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 173 let commit_bytes = match tracking_store.get(&current_root_cid).await { 174 Ok(Some(b)) => b, 175 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Commit block not found"}))).into_response(), 176 }; 177 let commit = match Commit::from_cbor(&commit_bytes) { 178 Ok(c) => c, 179 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to parse commit"}))).into_response(), 180 }; 181 let mst = Mst::load( 182 Arc::new(tracking_store.clone()), 183 commit.data, 184 None, 185 ); 186 let collection_nsid = match input.collection.parse::<Nsid>() { 187 Ok(n) => n, 188 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection"}))).into_response(), 189 }; 190 if input.validate.unwrap_or(true) { 191 if let Err(err_response) = validate_record(&input.record, &input.collection) { 192 return err_response; 193 } 194 } 195 let rkey = input.rkey.unwrap_or_else(|| Tid::now(LimitedU32::MIN).to_string()); 196 let mut record_bytes = Vec::new(); 197 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() { 198 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response(); 199 } 200 let record_cid = match tracking_store.put(&record_bytes).await { 201 Ok(c) => c, 202 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to save record block"}))).into_response(), 203 }; 204 let key = format!("{}/{}", collection_nsid, rkey); 205 let new_mst = match mst.add(&key, record_cid).await { 206 Ok(m) => m, 207 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to add to MST"}))).into_response(), 208 }; 209 let new_mst_root = match new_mst.persist().await { 210 Ok(c) => c, 211 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to persist MST"}))).into_response(), 212 }; 213 let op = RecordOp::Create { collection: input.collection.clone(), rkey: rkey.clone(), cid: record_cid }; 214 let mut relevant_blocks = std::collections::BTreeMap::new(); 215 if let Err(_) = new_mst.blocks_for_path(&key, &mut relevant_blocks).await { 216 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response(); 217 } 218 if let Err(_) = mst.blocks_for_path(&key, &mut relevant_blocks).await { 219 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response(); 220 } 221 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes)); 222 let mut written_cids = tracking_store.get_all_relevant_cids(); 223 for cid in relevant_blocks.keys() { 224 if !written_cids.contains(cid) { 225 written_cids.push(*cid); 226 } 227 } 228 let written_cids_str = written_cids.iter().map(|c| c.to_string()).collect::<Vec<_>>(); 229 if let Err(e) = commit_and_log(&state, &did, user_id, Some(current_root_cid), Some(commit.data), new_mst_root, vec![op], &written_cids_str).await { 230 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": e}))).into_response(); 231 }; 232 (StatusCode::OK, Json(CreateRecordOutput { 233 uri: format!("at://{}/{}/{}", did, input.collection, rkey), 234 cid: record_cid.to_string(), 235 })).into_response() 236} 237#[derive(Deserialize)] 238#[allow(dead_code)] 239pub struct PutRecordInput { 240 pub repo: String, 241 pub collection: String, 242 pub rkey: String, 243 pub validate: Option<bool>, 244 pub record: serde_json::Value, 245 #[serde(rename = "swapCommit")] 246 pub swap_commit: Option<String>, 247 #[serde(rename = "swapRecord")] 248 pub swap_record: Option<String>, 249} 250#[derive(Serialize)] 251#[serde(rename_all = "camelCase")] 252pub struct PutRecordOutput { 253 pub uri: String, 254 pub cid: String, 255} 256pub async fn put_record( 257 State(state): State<AppState>, 258 headers: HeaderMap, 259 Json(input): Json<PutRecordInput>, 260) -> Response { 261 let (did, user_id, current_root_cid) = 262 match prepare_repo_write(&state, &headers, &input.repo).await { 263 Ok(res) => res, 264 Err(err_res) => return err_res, 265 }; 266 if let Some(swap_commit) = &input.swap_commit { 267 if Cid::from_str(swap_commit).ok() != Some(current_root_cid) { 268 return (StatusCode::CONFLICT, Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"}))).into_response(); 269 } 270 } 271 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 272 let commit_bytes = match tracking_store.get(&current_root_cid).await { 273 Ok(Some(b)) => b, 274 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Commit block not found"}))).into_response(), 275 }; 276 let commit = match Commit::from_cbor(&commit_bytes) { 277 Ok(c) => c, 278 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to parse commit"}))).into_response(), 279 }; 280 let mst = Mst::load( 281 Arc::new(tracking_store.clone()), 282 commit.data, 283 None, 284 ); 285 let collection_nsid = match input.collection.parse::<Nsid>() { 286 Ok(n) => n, 287 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection"}))).into_response(), 288 }; 289 let key = format!("{}/{}", collection_nsid, input.rkey); 290 if input.validate.unwrap_or(true) { 291 if let Err(err_response) = validate_record(&input.record, &input.collection) { 292 return err_response; 293 } 294 } 295 if let Some(swap_record_str) = &input.swap_record { 296 let expected_cid = Cid::from_str(swap_record_str).ok(); 297 let actual_cid = mst.get(&key).await.ok().flatten(); 298 if expected_cid != actual_cid { 299 return (StatusCode::CONFLICT, Json(json!({"error": "InvalidSwap", "message": "Record has been modified or does not exist"}))).into_response(); 300 } 301 } 302 let existing_cid = mst.get(&key).await.ok().flatten(); 303 let mut record_bytes = Vec::new(); 304 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() { 305 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response(); 306 } 307 let record_cid = match tracking_store.put(&record_bytes).await { 308 Ok(c) => c, 309 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to save record block"}))).into_response(), 310 }; 311 let new_mst = if existing_cid.is_some() { 312 match mst.update(&key, record_cid).await { 313 Ok(m) => m, 314 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to update MST"}))).into_response(), 315 } 316 } else { 317 match mst.add(&key, record_cid).await { 318 Ok(m) => m, 319 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to add to MST"}))).into_response(), 320 } 321 }; 322 let new_mst_root = match new_mst.persist().await { 323 Ok(c) => c, 324 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to persist MST"}))).into_response(), 325 }; 326 let op = if existing_cid.is_some() { 327 RecordOp::Update { collection: input.collection.clone(), rkey: input.rkey.clone(), cid: record_cid, prev: existing_cid } 328 } else { 329 RecordOp::Create { collection: input.collection.clone(), rkey: input.rkey.clone(), cid: record_cid } 330 }; 331 let mut relevant_blocks = std::collections::BTreeMap::new(); 332 if let Err(_) = new_mst.blocks_for_path(&key, &mut relevant_blocks).await { 333 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response(); 334 } 335 if let Err(_) = mst.blocks_for_path(&key, &mut relevant_blocks).await { 336 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response(); 337 } 338 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes)); 339 let mut written_cids = tracking_store.get_all_relevant_cids(); 340 for cid in relevant_blocks.keys() { 341 if !written_cids.contains(cid) { 342 written_cids.push(*cid); 343 } 344 } 345 let written_cids_str = written_cids.iter().map(|c| c.to_string()).collect::<Vec<_>>(); 346 if let Err(e) = commit_and_log(&state, &did, user_id, Some(current_root_cid), Some(commit.data), new_mst_root, vec![op], &written_cids_str).await { 347 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": e}))).into_response(); 348 }; 349 (StatusCode::OK, Json(PutRecordOutput { 350 uri: format!("at://{}/{}/{}", did, input.collection, input.rkey), 351 cid: record_cid.to_string(), 352 })).into_response() 353}