this repo has no description
1use super::validation::validate_record; 2use crate::api::repo::record::utils::{commit_and_log, RecordOp}; 3use crate::repo::tracking::TrackingBlockStore; 4use crate::state::AppState; 5use axum::{ 6 extract::State, 7 http::{HeaderMap, StatusCode}, 8 response::{IntoResponse, Response}, 9 Json, 10}; 11use chrono::Utc; 12use cid::Cid; 13use jacquard::types::string::Nsid; 14use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 15use serde::{Deserialize, Serialize}; 16use serde_json::json; 17use std::str::FromStr; 18use std::sync::Arc; 19use tracing::error; 20use uuid::Uuid; 21 22pub async fn prepare_repo_write( 23 state: &AppState, 24 headers: &HeaderMap, 25 repo_did: &str, 26) -> Result<(String, Uuid, Cid), Response> { 27 let token = crate::auth::extract_bearer_token_from_header( 28 headers.get("Authorization").and_then(|h| h.to_str().ok()) 29 ).ok_or_else(|| { 30 ( 31 StatusCode::UNAUTHORIZED, 32 Json(json!({"error": "AuthenticationRequired"})), 33 ) 34 .into_response() 35 })?; 36 37 let auth_user = crate::auth::validate_bearer_token(&state.db, &token) 38 .await 39 .map_err(|_| { 40 ( 41 StatusCode::UNAUTHORIZED, 42 Json(json!({"error": "AuthenticationFailed"})), 43 ) 44 .into_response() 45 })?; 46 47 if repo_did != auth_user.did { 48 return Err(( 49 StatusCode::FORBIDDEN, 50 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})), 51 ) 52 .into_response()); 53 } 54 55 let user_id = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", auth_user.did) 56 .fetch_optional(&state.db) 57 .await 58 .map_err(|e| { 59 error!("DB error fetching user: {}", e); 60 (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response() 61 })? 62 .ok_or_else(|| { 63 ( 64 StatusCode::INTERNAL_SERVER_ERROR, 65 Json(json!({"error": "InternalError", "message": "User not found"})), 66 ) 67 .into_response() 68 })?; 69 70 let root_cid_str: String = 71 sqlx::query_scalar!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id) 72 .fetch_optional(&state.db) 73 .await 74 .map_err(|e| { 75 error!("DB error fetching repo root: {}", e); 76 (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response() 77 })? 78 .ok_or_else(|| { 79 ( 80 StatusCode::INTERNAL_SERVER_ERROR, 81 Json(json!({"error": "InternalError", "message": "Repo root not found"})), 82 ) 83 .into_response() 84 })?; 85 86 let current_root_cid = Cid::from_str(&root_cid_str).map_err(|_| { 87 ( 88 StatusCode::INTERNAL_SERVER_ERROR, 89 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})), 90 ) 91 .into_response() 92 })?; 93 94 Ok((auth_user.did, user_id, current_root_cid)) 95} 96 97#[derive(Deserialize)] 98#[allow(dead_code)] 99pub struct CreateRecordInput { 100 pub repo: String, 101 pub collection: String, 102 pub rkey: Option<String>, 103 pub validate: Option<bool>, 104 pub record: serde_json::Value, 105 #[serde(rename = "swapCommit")] 106 pub swap_commit: Option<String>, 107} 108 109#[derive(Serialize)] 110#[serde(rename_all = "camelCase")] 111pub struct CreateRecordOutput { 112 pub uri: String, 113 pub cid: String, 114} 115 116pub async fn create_record( 117 State(state): State<AppState>, 118 headers: HeaderMap, 119 Json(input): Json<CreateRecordInput>, 120) -> Response { 121 let (did, user_id, current_root_cid) = 122 match prepare_repo_write(&state, &headers, &input.repo).await { 123 Ok(res) => res, 124 Err(err_res) => return err_res, 125 }; 126 127 if let Some(swap_commit) = &input.swap_commit { 128 if Cid::from_str(swap_commit).ok() != Some(current_root_cid) { 129 return ( 130 StatusCode::CONFLICT, 131 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})), 132 ) 133 .into_response(); 134 } 135 } 136 137 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 138 139 let commit_bytes = match tracking_store.get(&current_root_cid).await { 140 Ok(Some(b)) => b, 141 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Commit block not found"}))).into_response(), 142 }; 143 let commit = match Commit::from_cbor(&commit_bytes) { 144 Ok(c) => c, 145 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to parse commit"}))).into_response(), 146 }; 147 148 let mst = Mst::load( 149 Arc::new(tracking_store.clone()), 150 commit.data, 151 None, 152 ); 153 154 let collection_nsid = match input.collection.parse::<Nsid>() { 155 Ok(n) => n, 156 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection"}))).into_response(), 157 }; 158 159 if input.validate.unwrap_or(true) { 160 if let Err(err_response) = validate_record(&input.record, &input.collection) { 161 return err_response; 162 } 163 } 164 165 let rkey = input.rkey.unwrap_or_else(|| Utc::now().format("%Y%m%d%H%M%S%f").to_string()); 166 167 let mut record_bytes = Vec::new(); 168 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() { 169 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response(); 170 } 171 let record_cid = match tracking_store.put(&record_bytes).await { 172 Ok(c) => c, 173 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to save record block"}))).into_response(), 174 }; 175 176 let key = format!("{}/{}", collection_nsid, rkey); 177 let new_mst = match mst.add(&key, record_cid).await { 178 Ok(m) => m, 179 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to add to MST"}))).into_response(), 180 }; 181 let new_mst_root = match new_mst.persist().await { 182 Ok(c) => c, 183 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to persist MST"}))).into_response(), 184 }; 185 186 let op = RecordOp::Create { collection: input.collection.clone(), rkey: rkey.clone(), cid: record_cid }; 187 let written_cids = tracking_store.get_written_cids(); 188 let written_cids_str = written_cids.iter().map(|c| c.to_string()).collect::<Vec<_>>(); 189 190 if let Err(e) = commit_and_log(&state, &did, user_id, Some(current_root_cid), new_mst_root, vec![op], &written_cids_str).await { 191 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": e}))).into_response(); 192 }; 193 194 (StatusCode::OK, Json(CreateRecordOutput { 195 uri: format!("at://{}/{}/{}", did, input.collection, rkey), 196 cid: record_cid.to_string(), 197 })).into_response() 198} 199 200#[derive(Deserialize)] 201#[allow(dead_code)] 202pub struct PutRecordInput { 203 pub repo: String, 204 pub collection: String, 205 pub rkey: String, 206 pub validate: Option<bool>, 207 pub record: serde_json::Value, 208 #[serde(rename = "swapCommit")] 209 pub swap_commit: Option<String>, 210 #[serde(rename = "swapRecord")] 211 pub swap_record: Option<String>, 212} 213 214#[derive(Serialize)] 215#[serde(rename_all = "camelCase")] 216pub struct PutRecordOutput { 217 pub uri: String, 218 pub cid: String, 219} 220 221pub async fn put_record( 222 State(state): State<AppState>, 223 headers: HeaderMap, 224 Json(input): Json<PutRecordInput>, 225) -> Response { 226 let (did, user_id, current_root_cid) = 227 match prepare_repo_write(&state, &headers, &input.repo).await { 228 Ok(res) => res, 229 Err(err_res) => return err_res, 230 }; 231 232 if let Some(swap_commit) = &input.swap_commit { 233 if Cid::from_str(swap_commit).ok() != Some(current_root_cid) { 234 return (StatusCode::CONFLICT, Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"}))).into_response(); 235 } 236 } 237 238 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 239 240 let commit_bytes = match tracking_store.get(&current_root_cid).await { 241 Ok(Some(b)) => b, 242 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Commit block not found"}))).into_response(), 243 }; 244 let commit = match Commit::from_cbor(&commit_bytes) { 245 Ok(c) => c, 246 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to parse commit"}))).into_response(), 247 }; 248 249 let mst = Mst::load( 250 Arc::new(tracking_store.clone()), 251 commit.data, 252 None, 253 ); 254 let collection_nsid = match input.collection.parse::<Nsid>() { 255 Ok(n) => n, 256 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection"}))).into_response(), 257 }; 258 let key = format!("{}/{}", collection_nsid, input.rkey); 259 260 if input.validate.unwrap_or(true) { 261 if let Err(err_response) = validate_record(&input.record, &input.collection) { 262 return err_response; 263 } 264 } 265 266 if let Some(swap_record_str) = &input.swap_record { 267 let expected_cid = Cid::from_str(swap_record_str).ok(); 268 let actual_cid = mst.get(&key).await.ok().flatten(); 269 if expected_cid != actual_cid { 270 return (StatusCode::CONFLICT, Json(json!({"error": "InvalidSwap", "message": "Record has been modified or does not exist"}))).into_response(); 271 } 272 } 273 274 let existing_cid = mst.get(&key).await.ok().flatten(); 275 276 let mut record_bytes = Vec::new(); 277 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() { 278 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response(); 279 } 280 let record_cid = match tracking_store.put(&record_bytes).await { 281 Ok(c) => c, 282 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to save record block"}))).into_response(), 283 }; 284 285 let new_mst = if existing_cid.is_some() { 286 match mst.update(&key, record_cid).await { 287 Ok(m) => m, 288 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to update MST"}))).into_response(), 289 } 290 } else { 291 match mst.add(&key, record_cid).await { 292 Ok(m) => m, 293 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to add to MST"}))).into_response(), 294 } 295 }; 296 let new_mst_root = match new_mst.persist().await { 297 Ok(c) => c, 298 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to persist MST"}))).into_response(), 299 }; 300 301 let op = if existing_cid.is_some() { 302 RecordOp::Update { collection: input.collection.clone(), rkey: input.rkey.clone(), cid: record_cid } 303 } else { 304 RecordOp::Create { collection: input.collection.clone(), rkey: input.rkey.clone(), cid: record_cid } 305 }; 306 307 let written_cids = tracking_store.get_written_cids(); 308 let written_cids_str = written_cids.iter().map(|c| c.to_string()).collect::<Vec<_>>(); 309 310 if let Err(e) = commit_and_log(&state, &did, user_id, Some(current_root_cid), new_mst_root, vec![op], &written_cids_str).await { 311 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": e}))).into_response(); 312 }; 313 314 (StatusCode::OK, Json(PutRecordOutput { 315 uri: format!("at://{}/{}/{}", did, input.collection, input.rkey), 316 cid: record_cid.to_string(), 317 })).into_response() 318}