this repo has no description
1use super::validation::validate_record; 2use crate::api::repo::record::utils::{commit_and_log, RecordOp}; 3use crate::repo::tracking::TrackingBlockStore; 4use crate::state::AppState; 5use axum::{ 6 extract::State, 7 http::{HeaderMap, StatusCode}, 8 response::{IntoResponse, Response}, 9 Json, 10}; 11use chrono::Utc; 12use cid::Cid; 13use jacquard::types::string::Nsid; 14use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 15use serde::{Deserialize, Serialize}; 16use serde_json::json; 17use sqlx::{PgPool, Row}; 18use std::str::FromStr; 19use std::sync::Arc; 20use tracing::error; 21use uuid::Uuid; 22 23pub async fn has_verified_notification_channel(db: &PgPool, did: &str) -> Result<bool, sqlx::Error> { 24 let row = sqlx::query( 25 r#" 26 SELECT 27 email_confirmed, 28 discord_verified, 29 telegram_verified, 30 signal_verified 31 FROM users 32 WHERE did = $1 33 "# 34 ) 35 .bind(did) 36 .fetch_optional(db) 37 .await?; 38 39 match row { 40 Some(r) => { 41 let email_confirmed: bool = r.get("email_confirmed"); 42 let discord_verified: bool = r.get("discord_verified"); 43 let telegram_verified: bool = r.get("telegram_verified"); 44 let signal_verified: bool = r.get("signal_verified"); 45 Ok(email_confirmed || discord_verified || telegram_verified || signal_verified) 46 } 47 None => Ok(false), 48 } 49} 50 51pub async fn prepare_repo_write( 52 state: &AppState, 53 headers: &HeaderMap, 54 repo_did: &str, 55) -> Result<(String, Uuid, Cid), Response> { 56 let token = crate::auth::extract_bearer_token_from_header( 57 headers.get("Authorization").and_then(|h| h.to_str().ok()) 58 ).ok_or_else(|| { 59 ( 60 StatusCode::UNAUTHORIZED, 61 Json(json!({"error": "AuthenticationRequired"})), 62 ) 63 .into_response() 64 })?; 65 66 let auth_user = crate::auth::validate_bearer_token(&state.db, &token) 67 .await 68 .map_err(|_| { 69 ( 70 StatusCode::UNAUTHORIZED, 71 Json(json!({"error": "AuthenticationFailed"})), 72 ) 73 .into_response() 74 })?; 75 76 if repo_did != auth_user.did { 77 return Err(( 78 StatusCode::FORBIDDEN, 79 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})), 80 ) 81 .into_response()); 82 } 83 84 match has_verified_notification_channel(&state.db, &auth_user.did).await { 85 Ok(true) => {} 86 Ok(false) => { 87 return Err(( 88 StatusCode::FORBIDDEN, 89 Json(json!({ 90 "error": "AccountNotVerified", 91 "message": "You must verify at least one notification channel (email, Discord, Telegram, or Signal) before creating records" 92 })), 93 ) 94 .into_response()); 95 } 96 Err(e) => { 97 error!("DB error checking notification channels: {}", e); 98 return Err(( 99 StatusCode::INTERNAL_SERVER_ERROR, 100 Json(json!({"error": "InternalError"})), 101 ) 102 .into_response()); 103 } 104 } 105 106 let user_id = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", auth_user.did) 107 .fetch_optional(&state.db) 108 .await 109 .map_err(|e| { 110 error!("DB error fetching user: {}", e); 111 (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response() 112 })? 113 .ok_or_else(|| { 114 ( 115 StatusCode::INTERNAL_SERVER_ERROR, 116 Json(json!({"error": "InternalError", "message": "User not found"})), 117 ) 118 .into_response() 119 })?; 120 121 let root_cid_str: String = 122 sqlx::query_scalar!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id) 123 .fetch_optional(&state.db) 124 .await 125 .map_err(|e| { 126 error!("DB error fetching repo root: {}", e); 127 (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response() 128 })? 129 .ok_or_else(|| { 130 ( 131 StatusCode::INTERNAL_SERVER_ERROR, 132 Json(json!({"error": "InternalError", "message": "Repo root not found"})), 133 ) 134 .into_response() 135 })?; 136 137 let current_root_cid = Cid::from_str(&root_cid_str).map_err(|_| { 138 ( 139 StatusCode::INTERNAL_SERVER_ERROR, 140 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})), 141 ) 142 .into_response() 143 })?; 144 145 Ok((auth_user.did, user_id, current_root_cid)) 146} 147 148#[derive(Deserialize)] 149#[allow(dead_code)] 150pub struct CreateRecordInput { 151 pub repo: String, 152 pub collection: String, 153 pub rkey: Option<String>, 154 pub validate: Option<bool>, 155 pub record: serde_json::Value, 156 #[serde(rename = "swapCommit")] 157 pub swap_commit: Option<String>, 158} 159 160#[derive(Serialize)] 161#[serde(rename_all = "camelCase")] 162pub struct CreateRecordOutput { 163 pub uri: String, 164 pub cid: String, 165} 166 167pub async fn create_record( 168 State(state): State<AppState>, 169 headers: HeaderMap, 170 Json(input): Json<CreateRecordInput>, 171) -> Response { 172 let (did, user_id, current_root_cid) = 173 match prepare_repo_write(&state, &headers, &input.repo).await { 174 Ok(res) => res, 175 Err(err_res) => return err_res, 176 }; 177 178 if let Some(swap_commit) = &input.swap_commit { 179 if Cid::from_str(swap_commit).ok() != Some(current_root_cid) { 180 return ( 181 StatusCode::CONFLICT, 182 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})), 183 ) 184 .into_response(); 185 } 186 } 187 188 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 189 190 let commit_bytes = match tracking_store.get(&current_root_cid).await { 191 Ok(Some(b)) => b, 192 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Commit block not found"}))).into_response(), 193 }; 194 let commit = match Commit::from_cbor(&commit_bytes) { 195 Ok(c) => c, 196 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to parse commit"}))).into_response(), 197 }; 198 199 let mst = Mst::load( 200 Arc::new(tracking_store.clone()), 201 commit.data, 202 None, 203 ); 204 205 let collection_nsid = match input.collection.parse::<Nsid>() { 206 Ok(n) => n, 207 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection"}))).into_response(), 208 }; 209 210 if input.validate.unwrap_or(true) { 211 if let Err(err_response) = validate_record(&input.record, &input.collection) { 212 return err_response; 213 } 214 } 215 216 let rkey = input.rkey.unwrap_or_else(|| Utc::now().format("%Y%m%d%H%M%S%f").to_string()); 217 218 let mut record_bytes = Vec::new(); 219 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() { 220 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response(); 221 } 222 let record_cid = match tracking_store.put(&record_bytes).await { 223 Ok(c) => c, 224 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to save record block"}))).into_response(), 225 }; 226 227 let key = format!("{}/{}", collection_nsid, rkey); 228 let new_mst = match mst.add(&key, record_cid).await { 229 Ok(m) => m, 230 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to add to MST"}))).into_response(), 231 }; 232 let new_mst_root = match new_mst.persist().await { 233 Ok(c) => c, 234 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to persist MST"}))).into_response(), 235 }; 236 237 let op = RecordOp::Create { collection: input.collection.clone(), rkey: rkey.clone(), cid: record_cid }; 238 let written_cids = tracking_store.get_written_cids(); 239 let written_cids_str = written_cids.iter().map(|c| c.to_string()).collect::<Vec<_>>(); 240 241 if let Err(e) = commit_and_log(&state, &did, user_id, Some(current_root_cid), new_mst_root, vec![op], &written_cids_str).await { 242 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": e}))).into_response(); 243 }; 244 245 (StatusCode::OK, Json(CreateRecordOutput { 246 uri: format!("at://{}/{}/{}", did, input.collection, rkey), 247 cid: record_cid.to_string(), 248 })).into_response() 249} 250 251#[derive(Deserialize)] 252#[allow(dead_code)] 253pub struct PutRecordInput { 254 pub repo: String, 255 pub collection: String, 256 pub rkey: String, 257 pub validate: Option<bool>, 258 pub record: serde_json::Value, 259 #[serde(rename = "swapCommit")] 260 pub swap_commit: Option<String>, 261 #[serde(rename = "swapRecord")] 262 pub swap_record: Option<String>, 263} 264 265#[derive(Serialize)] 266#[serde(rename_all = "camelCase")] 267pub struct PutRecordOutput { 268 pub uri: String, 269 pub cid: String, 270} 271 272pub async fn put_record( 273 State(state): State<AppState>, 274 headers: HeaderMap, 275 Json(input): Json<PutRecordInput>, 276) -> Response { 277 let (did, user_id, current_root_cid) = 278 match prepare_repo_write(&state, &headers, &input.repo).await { 279 Ok(res) => res, 280 Err(err_res) => return err_res, 281 }; 282 283 if let Some(swap_commit) = &input.swap_commit { 284 if Cid::from_str(swap_commit).ok() != Some(current_root_cid) { 285 return (StatusCode::CONFLICT, Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"}))).into_response(); 286 } 287 } 288 289 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 290 291 let commit_bytes = match tracking_store.get(&current_root_cid).await { 292 Ok(Some(b)) => b, 293 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Commit block not found"}))).into_response(), 294 }; 295 let commit = match Commit::from_cbor(&commit_bytes) { 296 Ok(c) => c, 297 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to parse commit"}))).into_response(), 298 }; 299 300 let mst = Mst::load( 301 Arc::new(tracking_store.clone()), 302 commit.data, 303 None, 304 ); 305 let collection_nsid = match input.collection.parse::<Nsid>() { 306 Ok(n) => n, 307 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection"}))).into_response(), 308 }; 309 let key = format!("{}/{}", collection_nsid, input.rkey); 310 311 if input.validate.unwrap_or(true) { 312 if let Err(err_response) = validate_record(&input.record, &input.collection) { 313 return err_response; 314 } 315 } 316 317 if let Some(swap_record_str) = &input.swap_record { 318 let expected_cid = Cid::from_str(swap_record_str).ok(); 319 let actual_cid = mst.get(&key).await.ok().flatten(); 320 if expected_cid != actual_cid { 321 return (StatusCode::CONFLICT, Json(json!({"error": "InvalidSwap", "message": "Record has been modified or does not exist"}))).into_response(); 322 } 323 } 324 325 let existing_cid = mst.get(&key).await.ok().flatten(); 326 327 let mut record_bytes = Vec::new(); 328 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() { 329 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response(); 330 } 331 let record_cid = match tracking_store.put(&record_bytes).await { 332 Ok(c) => c, 333 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to save record block"}))).into_response(), 334 }; 335 336 let new_mst = if existing_cid.is_some() { 337 match mst.update(&key, record_cid).await { 338 Ok(m) => m, 339 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to update MST"}))).into_response(), 340 } 341 } else { 342 match mst.add(&key, record_cid).await { 343 Ok(m) => m, 344 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to add to MST"}))).into_response(), 345 } 346 }; 347 let new_mst_root = match new_mst.persist().await { 348 Ok(c) => c, 349 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to persist MST"}))).into_response(), 350 }; 351 352 let op = if existing_cid.is_some() { 353 RecordOp::Update { collection: input.collection.clone(), rkey: input.rkey.clone(), cid: record_cid } 354 } else { 355 RecordOp::Create { collection: input.collection.clone(), rkey: input.rkey.clone(), cid: record_cid } 356 }; 357 358 let written_cids = tracking_store.get_written_cids(); 359 let written_cids_str = written_cids.iter().map(|c| c.to_string()).collect::<Vec<_>>(); 360 361 if let Err(e) = commit_and_log(&state, &did, user_id, Some(current_root_cid), new_mst_root, vec![op], &written_cids_str).await { 362 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": e}))).into_response(); 363 }; 364 365 (StatusCode::OK, Json(PutRecordOutput { 366 uri: format!("at://{}/{}/{}", did, input.collection, input.rkey), 367 cid: record_cid.to_string(), 368 })).into_response() 369}