this repo has no description
1use super::validation::validate_record; 2use crate::api::repo::record::utils::{commit_and_log, RecordOp}; 3use crate::repo::tracking::TrackingBlockStore; 4use crate::state::AppState; 5use axum::{ 6 extract::State, 7 http::{HeaderMap, StatusCode}, 8 response::{IntoResponse, Response}, 9 Json, 10}; 11use cid::Cid; 12use jacquard::types::{integer::LimitedU32, string::{Nsid, Tid}}; 13use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 14use serde::{Deserialize, Serialize}; 15use serde_json::json; 16use sqlx::{PgPool, Row}; 17use std::str::FromStr; 18use std::sync::Arc; 19use tracing::error; 20use uuid::Uuid; 21 22pub async fn has_verified_notification_channel(db: &PgPool, did: &str) -> Result<bool, sqlx::Error> { 23 let row = sqlx::query( 24 r#" 25 SELECT 26 email_confirmed, 27 discord_verified, 28 telegram_verified, 29 signal_verified 30 FROM users 31 WHERE did = $1 32 "# 33 ) 34 .bind(did) 35 .fetch_optional(db) 36 .await?; 37 match row { 38 Some(r) => { 39 let email_confirmed: bool = r.get("email_confirmed"); 40 let discord_verified: bool = r.get("discord_verified"); 41 let telegram_verified: bool = r.get("telegram_verified"); 42 let signal_verified: bool = r.get("signal_verified"); 43 Ok(email_confirmed || discord_verified || telegram_verified || signal_verified) 44 } 45 None => Ok(false), 46 } 47} 48 49pub async fn prepare_repo_write( 50 state: &AppState, 51 headers: &HeaderMap, 52 repo_did: &str, 53) -> Result<(String, Uuid, Cid), Response> { 54 let token = crate::auth::extract_bearer_token_from_header( 55 headers.get("Authorization").and_then(|h| h.to_str().ok()) 56 ).ok_or_else(|| { 57 ( 58 StatusCode::UNAUTHORIZED, 59 Json(json!({"error": "AuthenticationRequired"})), 60 ) 61 .into_response() 62 })?; 63 let auth_user = crate::auth::validate_bearer_token(&state.db, &token) 64 .await 65 .map_err(|_| { 66 ( 67 StatusCode::UNAUTHORIZED, 68 Json(json!({"error": "AuthenticationFailed"})), 69 ) 70 .into_response() 71 })?; 72 if repo_did != auth_user.did { 73 return Err(( 74 StatusCode::FORBIDDEN, 75 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})), 76 ) 77 .into_response()); 78 } 79 match has_verified_notification_channel(&state.db, &auth_user.did).await { 80 Ok(true) => {} 81 Ok(false) => { 82 return Err(( 83 StatusCode::FORBIDDEN, 84 Json(json!({ 85 "error": "AccountNotVerified", 86 "message": "You must verify at least one notification channel (email, Discord, Telegram, or Signal) before creating records" 87 })), 88 ) 89 .into_response()); 90 } 91 Err(e) => { 92 error!("DB error checking notification channels: {}", e); 93 return Err(( 94 StatusCode::INTERNAL_SERVER_ERROR, 95 Json(json!({"error": "InternalError"})), 96 ) 97 .into_response()); 98 } 99 } 100 let user_id = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", auth_user.did) 101 .fetch_optional(&state.db) 102 .await 103 .map_err(|e| { 104 error!("DB error fetching user: {}", e); 105 (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response() 106 })? 107 .ok_or_else(|| { 108 ( 109 StatusCode::INTERNAL_SERVER_ERROR, 110 Json(json!({"error": "InternalError", "message": "User not found"})), 111 ) 112 .into_response() 113 })?; 114 let root_cid_str: String = 115 sqlx::query_scalar!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id) 116 .fetch_optional(&state.db) 117 .await 118 .map_err(|e| { 119 error!("DB error fetching repo root: {}", e); 120 (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response() 121 })? 122 .ok_or_else(|| { 123 ( 124 StatusCode::INTERNAL_SERVER_ERROR, 125 Json(json!({"error": "InternalError", "message": "Repo root not found"})), 126 ) 127 .into_response() 128 })?; 129 let current_root_cid = Cid::from_str(&root_cid_str).map_err(|_| { 130 ( 131 StatusCode::INTERNAL_SERVER_ERROR, 132 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})), 133 ) 134 .into_response() 135 })?; 136 Ok((auth_user.did, user_id, current_root_cid)) 137} 138#[derive(Deserialize)] 139#[allow(dead_code)] 140pub struct CreateRecordInput { 141 pub repo: String, 142 pub collection: String, 143 pub rkey: Option<String>, 144 pub validate: Option<bool>, 145 pub record: serde_json::Value, 146 #[serde(rename = "swapCommit")] 147 pub swap_commit: Option<String>, 148} 149#[derive(Serialize)] 150#[serde(rename_all = "camelCase")] 151pub struct CreateRecordOutput { 152 pub uri: String, 153 pub cid: String, 154} 155pub async fn create_record( 156 State(state): State<AppState>, 157 headers: HeaderMap, 158 Json(input): Json<CreateRecordInput>, 159) -> Response { 160 let (did, user_id, current_root_cid) = 161 match prepare_repo_write(&state, &headers, &input.repo).await { 162 Ok(res) => res, 163 Err(err_res) => return err_res, 164 }; 165 if let Some(swap_commit) = &input.swap_commit { 166 if Cid::from_str(swap_commit).ok() != Some(current_root_cid) { 167 return ( 168 StatusCode::CONFLICT, 169 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})), 170 ) 171 .into_response(); 172 } 173 } 174 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 175 let commit_bytes = match tracking_store.get(&current_root_cid).await { 176 Ok(Some(b)) => b, 177 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Commit block not found"}))).into_response(), 178 }; 179 let commit = match Commit::from_cbor(&commit_bytes) { 180 Ok(c) => c, 181 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to parse commit"}))).into_response(), 182 }; 183 let mst = Mst::load( 184 Arc::new(tracking_store.clone()), 185 commit.data, 186 None, 187 ); 188 let collection_nsid = match input.collection.parse::<Nsid>() { 189 Ok(n) => n, 190 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection"}))).into_response(), 191 }; 192 if input.validate.unwrap_or(true) { 193 if let Err(err_response) = validate_record(&input.record, &input.collection) { 194 return err_response; 195 } 196 } 197 let rkey = input.rkey.unwrap_or_else(|| Tid::now(LimitedU32::MIN).to_string()); 198 let mut record_bytes = Vec::new(); 199 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() { 200 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response(); 201 } 202 let record_cid = match tracking_store.put(&record_bytes).await { 203 Ok(c) => c, 204 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to save record block"}))).into_response(), 205 }; 206 let key = format!("{}/{}", collection_nsid, rkey); 207 let new_mst = match mst.add(&key, record_cid).await { 208 Ok(m) => m, 209 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to add to MST"}))).into_response(), 210 }; 211 let new_mst_root = match new_mst.persist().await { 212 Ok(c) => c, 213 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to persist MST"}))).into_response(), 214 }; 215 let op = RecordOp::Create { collection: input.collection.clone(), rkey: rkey.clone(), cid: record_cid }; 216 let mut relevant_blocks = std::collections::BTreeMap::new(); 217 if let Err(_) = new_mst.blocks_for_path(&key, &mut relevant_blocks).await { 218 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response(); 219 } 220 if let Err(_) = mst.blocks_for_path(&key, &mut relevant_blocks).await { 221 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response(); 222 } 223 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes)); 224 let mut written_cids = tracking_store.get_all_relevant_cids(); 225 for cid in relevant_blocks.keys() { 226 if !written_cids.contains(cid) { 227 written_cids.push(*cid); 228 } 229 } 230 let written_cids_str = written_cids.iter().map(|c| c.to_string()).collect::<Vec<_>>(); 231 if let Err(e) = commit_and_log(&state, &did, user_id, Some(current_root_cid), Some(commit.data), new_mst_root, vec![op], &written_cids_str).await { 232 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": e}))).into_response(); 233 }; 234 (StatusCode::OK, Json(CreateRecordOutput { 235 uri: format!("at://{}/{}/{}", did, input.collection, rkey), 236 cid: record_cid.to_string(), 237 })).into_response() 238} 239#[derive(Deserialize)] 240#[allow(dead_code)] 241pub struct PutRecordInput { 242 pub repo: String, 243 pub collection: String, 244 pub rkey: String, 245 pub validate: Option<bool>, 246 pub record: serde_json::Value, 247 #[serde(rename = "swapCommit")] 248 pub swap_commit: Option<String>, 249 #[serde(rename = "swapRecord")] 250 pub swap_record: Option<String>, 251} 252#[derive(Serialize)] 253#[serde(rename_all = "camelCase")] 254pub struct PutRecordOutput { 255 pub uri: String, 256 pub cid: String, 257} 258pub async fn put_record( 259 State(state): State<AppState>, 260 headers: HeaderMap, 261 Json(input): Json<PutRecordInput>, 262) -> Response { 263 let (did, user_id, current_root_cid) = 264 match prepare_repo_write(&state, &headers, &input.repo).await { 265 Ok(res) => res, 266 Err(err_res) => return err_res, 267 }; 268 if let Some(swap_commit) = &input.swap_commit { 269 if Cid::from_str(swap_commit).ok() != Some(current_root_cid) { 270 return (StatusCode::CONFLICT, Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"}))).into_response(); 271 } 272 } 273 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 274 let commit_bytes = match tracking_store.get(&current_root_cid).await { 275 Ok(Some(b)) => b, 276 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Commit block not found"}))).into_response(), 277 }; 278 let commit = match Commit::from_cbor(&commit_bytes) { 279 Ok(c) => c, 280 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to parse commit"}))).into_response(), 281 }; 282 let mst = Mst::load( 283 Arc::new(tracking_store.clone()), 284 commit.data, 285 None, 286 ); 287 let collection_nsid = match input.collection.parse::<Nsid>() { 288 Ok(n) => n, 289 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection"}))).into_response(), 290 }; 291 let key = format!("{}/{}", collection_nsid, input.rkey); 292 if input.validate.unwrap_or(true) { 293 if let Err(err_response) = validate_record(&input.record, &input.collection) { 294 return err_response; 295 } 296 } 297 if let Some(swap_record_str) = &input.swap_record { 298 let expected_cid = Cid::from_str(swap_record_str).ok(); 299 let actual_cid = mst.get(&key).await.ok().flatten(); 300 if expected_cid != actual_cid { 301 return (StatusCode::CONFLICT, Json(json!({"error": "InvalidSwap", "message": "Record has been modified or does not exist"}))).into_response(); 302 } 303 } 304 let existing_cid = mst.get(&key).await.ok().flatten(); 305 let mut record_bytes = Vec::new(); 306 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() { 307 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response(); 308 } 309 let record_cid = match tracking_store.put(&record_bytes).await { 310 Ok(c) => c, 311 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to save record block"}))).into_response(), 312 }; 313 let new_mst = if existing_cid.is_some() { 314 match mst.update(&key, record_cid).await { 315 Ok(m) => m, 316 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to update MST"}))).into_response(), 317 } 318 } else { 319 match mst.add(&key, record_cid).await { 320 Ok(m) => m, 321 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to add to MST"}))).into_response(), 322 } 323 }; 324 let new_mst_root = match new_mst.persist().await { 325 Ok(c) => c, 326 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to persist MST"}))).into_response(), 327 }; 328 let op = if existing_cid.is_some() { 329 RecordOp::Update { collection: input.collection.clone(), rkey: input.rkey.clone(), cid: record_cid, prev: existing_cid } 330 } else { 331 RecordOp::Create { collection: input.collection.clone(), rkey: input.rkey.clone(), cid: record_cid } 332 }; 333 let mut relevant_blocks = std::collections::BTreeMap::new(); 334 if let Err(_) = new_mst.blocks_for_path(&key, &mut relevant_blocks).await { 335 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response(); 336 } 337 if let Err(_) = mst.blocks_for_path(&key, &mut relevant_blocks).await { 338 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response(); 339 } 340 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes)); 341 let mut written_cids = tracking_store.get_all_relevant_cids(); 342 for cid in relevant_blocks.keys() { 343 if !written_cids.contains(cid) { 344 written_cids.push(*cid); 345 } 346 } 347 let written_cids_str = written_cids.iter().map(|c| c.to_string()).collect::<Vec<_>>(); 348 if let Err(e) = commit_and_log(&state, &did, user_id, Some(current_root_cid), Some(commit.data), new_mst_root, vec![op], &written_cids_str).await { 349 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": e}))).into_response(); 350 }; 351 (StatusCode::OK, Json(PutRecordOutput { 352 uri: format!("at://{}/{}/{}", did, input.collection, input.rkey), 353 cid: record_cid.to_string(), 354 })).into_response() 355}