this repo has no description
1use crate::api::repo::record::utils::{commit_and_log, RecordOp}; 2use crate::repo::tracking::TrackingBlockStore; 3use crate::state::AppState; 4use axum::{ 5 extract::State, 6 http::{HeaderMap, StatusCode}, 7 response::{IntoResponse, Response}, 8 Json, 9}; 10use chrono::Utc; 11use cid::Cid; 12use jacquard::types::string::Nsid; 13use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 14use serde::{Deserialize, Serialize}; 15use serde_json::json; 16use std::str::FromStr; 17use std::sync::Arc; 18use tracing::error; 19use uuid::Uuid; 20 21pub async fn prepare_repo_write( 22 state: &AppState, 23 headers: &HeaderMap, 24 repo_did: &str, 25) -> Result<(String, Uuid, Cid), Response> { 26 let token = crate::auth::extract_bearer_token_from_header( 27 headers.get("Authorization").and_then(|h| h.to_str().ok()) 28 ).ok_or_else(|| { 29 ( 30 StatusCode::UNAUTHORIZED, 31 Json(json!({"error": "AuthenticationRequired"})), 32 ) 33 .into_response() 34 })?; 35 36 let auth_user = crate::auth::validate_bearer_token(&state.db, &token) 37 .await 38 .map_err(|_| { 39 ( 40 StatusCode::UNAUTHORIZED, 41 Json(json!({"error": "AuthenticationFailed"})), 42 ) 43 .into_response() 44 })?; 45 46 if repo_did != auth_user.did { 47 return Err(( 48 StatusCode::FORBIDDEN, 49 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})), 50 ) 51 .into_response()); 52 } 53 54 let user_id = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", auth_user.did) 55 .fetch_optional(&state.db) 56 .await 57 .map_err(|e| { 58 error!("DB error fetching user: {}", e); 59 (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response() 60 })? 61 .ok_or_else(|| { 62 ( 63 StatusCode::INTERNAL_SERVER_ERROR, 64 Json(json!({"error": "InternalError", "message": "User not found"})), 65 ) 66 .into_response() 67 })?; 68 69 let root_cid_str: String = 70 sqlx::query_scalar!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id) 71 .fetch_optional(&state.db) 72 .await 73 .map_err(|e| { 74 error!("DB error fetching repo root: {}", e); 75 (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response() 76 })? 77 .ok_or_else(|| { 78 ( 79 StatusCode::INTERNAL_SERVER_ERROR, 80 Json(json!({"error": "InternalError", "message": "Repo root not found"})), 81 ) 82 .into_response() 83 })?; 84 85 let current_root_cid = Cid::from_str(&root_cid_str).map_err(|_| { 86 ( 87 StatusCode::INTERNAL_SERVER_ERROR, 88 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})), 89 ) 90 .into_response() 91 })?; 92 93 Ok((auth_user.did, user_id, current_root_cid)) 94} 95 96#[derive(Deserialize)] 97#[allow(dead_code)] 98pub struct CreateRecordInput { 99 pub repo: String, 100 pub collection: String, 101 pub rkey: Option<String>, 102 pub validate: Option<bool>, 103 pub record: serde_json::Value, 104 #[serde(rename = "swapCommit")] 105 pub swap_commit: Option<String>, 106} 107 108#[derive(Serialize)] 109#[serde(rename_all = "camelCase")] 110pub struct CreateRecordOutput { 111 pub uri: String, 112 pub cid: String, 113} 114 115pub async fn create_record( 116 State(state): State<AppState>, 117 headers: HeaderMap, 118 Json(input): Json<CreateRecordInput>, 119) -> Response { 120 let (did, user_id, current_root_cid) = 121 match prepare_repo_write(&state, &headers, &input.repo).await { 122 Ok(res) => res, 123 Err(err_res) => return err_res, 124 }; 125 126 if let Some(swap_commit) = &input.swap_commit { 127 if Cid::from_str(swap_commit).ok() != Some(current_root_cid) { 128 return ( 129 StatusCode::CONFLICT, 130 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})), 131 ) 132 .into_response(); 133 } 134 } 135 136 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 137 138 let commit_bytes = match tracking_store.get(&current_root_cid).await { 139 Ok(Some(b)) => b, 140 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Commit block not found"}))).into_response(), 141 }; 142 let commit = match Commit::from_cbor(&commit_bytes) { 143 Ok(c) => c, 144 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to parse commit"}))).into_response(), 145 }; 146 147 let mst = Mst::load( 148 Arc::new(tracking_store.clone()), 149 commit.data, 150 None, 151 ); 152 153 let collection_nsid = match input.collection.parse::<Nsid>() { 154 Ok(n) => n, 155 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection"}))).into_response(), 156 }; 157 158 if input.validate.unwrap_or(true) { 159 if input.collection == "app.bsky.feed.post" { 160 if input.record.get("text").is_none() || input.record.get("createdAt").is_none() { 161 return ( 162 StatusCode::BAD_REQUEST, 163 Json(json!({"error": "InvalidRecord", "message": "Record validation failed"})), 164 ) 165 .into_response(); 166 } 167 } 168 } 169 170 let rkey = input.rkey.unwrap_or_else(|| Utc::now().format("%Y%m%d%H%M%S%f").to_string()); 171 172 let mut record_bytes = Vec::new(); 173 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() { 174 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response(); 175 } 176 let record_cid = match tracking_store.put(&record_bytes).await { 177 Ok(c) => c, 178 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to save record block"}))).into_response(), 179 }; 180 181 let key = format!("{}/{}", collection_nsid, rkey); 182 let new_mst = match mst.add(&key, record_cid).await { 183 Ok(m) => m, 184 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to add to MST"}))).into_response(), 185 }; 186 let new_mst_root = match new_mst.persist().await { 187 Ok(c) => c, 188 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to persist MST"}))).into_response(), 189 }; 190 191 let op = RecordOp::Create { collection: input.collection.clone(), rkey: rkey.clone(), cid: record_cid }; 192 let written_cids = tracking_store.get_written_cids(); 193 let written_cids_str = written_cids.iter().map(|c| c.to_string()).collect::<Vec<_>>(); 194 195 if let Err(e) = commit_and_log(&state, &did, user_id, Some(current_root_cid), new_mst_root, vec![op], &written_cids_str).await { 196 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": e}))).into_response(); 197 }; 198 199 (StatusCode::OK, Json(CreateRecordOutput { 200 uri: format!("at://{}/{}/{}", did, input.collection, rkey), 201 cid: record_cid.to_string(), 202 })).into_response() 203} 204 205#[derive(Deserialize)] 206#[allow(dead_code)] 207pub struct PutRecordInput { 208 pub repo: String, 209 pub collection: String, 210 pub rkey: String, 211 pub validate: Option<bool>, 212 pub record: serde_json::Value, 213 #[serde(rename = "swapCommit")] 214 pub swap_commit: Option<String>, 215 #[serde(rename = "swapRecord")] 216 pub swap_record: Option<String>, 217} 218 219#[derive(Serialize)] 220#[serde(rename_all = "camelCase")] 221pub struct PutRecordOutput { 222 pub uri: String, 223 pub cid: String, 224} 225 226pub async fn put_record( 227 State(state): State<AppState>, 228 headers: HeaderMap, 229 Json(input): Json<PutRecordInput>, 230) -> Response { 231 let (did, user_id, current_root_cid) = 232 match prepare_repo_write(&state, &headers, &input.repo).await { 233 Ok(res) => res, 234 Err(err_res) => return err_res, 235 }; 236 237 if let Some(swap_commit) = &input.swap_commit { 238 if Cid::from_str(swap_commit).ok() != Some(current_root_cid) { 239 return (StatusCode::CONFLICT, Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"}))).into_response(); 240 } 241 } 242 243 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 244 245 let commit_bytes = match tracking_store.get(&current_root_cid).await { 246 Ok(Some(b)) => b, 247 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Commit block not found"}))).into_response(), 248 }; 249 let commit = match Commit::from_cbor(&commit_bytes) { 250 Ok(c) => c, 251 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to parse commit"}))).into_response(), 252 }; 253 254 let mst = Mst::load( 255 Arc::new(tracking_store.clone()), 256 commit.data, 257 None, 258 ); 259 let collection_nsid = match input.collection.parse::<Nsid>() { 260 Ok(n) => n, 261 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection"}))).into_response(), 262 }; 263 let key = format!("{}/{}", collection_nsid, input.rkey); 264 265 if input.validate.unwrap_or(true) { 266 if input.collection == "app.bsky.feed.post" { 267 if input.record.get("text").is_none() || input.record.get("createdAt").is_none() { 268 return ( 269 StatusCode::BAD_REQUEST, 270 Json(json!({"error": "InvalidRecord", "message": "Record validation failed"})), 271 ) 272 .into_response(); 273 } 274 } 275 } 276 277 if let Some(swap_record_str) = &input.swap_record { 278 let expected_cid = Cid::from_str(swap_record_str).ok(); 279 let actual_cid = mst.get(&key).await.ok().flatten(); 280 if expected_cid != actual_cid { 281 return (StatusCode::CONFLICT, Json(json!({"error": "InvalidSwap", "message": "Record has been modified or does not exist"}))).into_response(); 282 } 283 } 284 285 let existing_cid = mst.get(&key).await.ok().flatten(); 286 287 let mut record_bytes = Vec::new(); 288 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() { 289 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response(); 290 } 291 let record_cid = match tracking_store.put(&record_bytes).await { 292 Ok(c) => c, 293 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to save record block"}))).into_response(), 294 }; 295 296 let new_mst = if existing_cid.is_some() { 297 mst.update(&key, record_cid).await.unwrap() 298 } else { 299 mst.add(&key, record_cid).await.unwrap() 300 }; 301 let new_mst_root = new_mst.persist().await.unwrap(); 302 303 let op = if existing_cid.is_some() { 304 RecordOp::Update { collection: input.collection.clone(), rkey: input.rkey.clone(), cid: record_cid } 305 } else { 306 RecordOp::Create { collection: input.collection.clone(), rkey: input.rkey.clone(), cid: record_cid } 307 }; 308 309 let written_cids = tracking_store.get_written_cids(); 310 let written_cids_str = written_cids.iter().map(|c| c.to_string()).collect::<Vec<_>>(); 311 312 if let Err(e) = commit_and_log(&state, &did, user_id, Some(current_root_cid), new_mst_root, vec![op], &written_cids_str).await { 313 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": e}))).into_response(); 314 }; 315 316 (StatusCode::OK, Json(PutRecordOutput { 317 uri: format!("at://{}/{}/{}", did, input.collection, input.rkey), 318 cid: record_cid.to_string(), 319 })).into_response() 320}