this repo has no description
1use crate::api::repo::record::utils::{commit_and_log, RecordOp}; 2use crate::repo::tracking::TrackingBlockStore; 3use crate::state::AppState; 4use axum::{ 5 extract::State, 6 http::{HeaderMap, StatusCode}, 7 response::{IntoResponse, Response}, 8 Json, 9}; 10use chrono::Utc; 11use cid::Cid; 12use jacquard::types::string::Nsid; 13use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 14use serde::{Deserialize, Serialize}; 15use serde_json::json; 16use std::str::FromStr; 17use std::sync::Arc; 18use tracing::error; 19use uuid::Uuid; 20 21pub async fn prepare_repo_write( 22 state: &AppState, 23 headers: &HeaderMap, 24 repo_did: &str, 25) -> Result<(String, Uuid, Cid), Response> { 26 let auth_header = headers.get("Authorization").ok_or_else(|| { 27 ( 28 StatusCode::UNAUTHORIZED, 29 Json(json!({"error": "AuthenticationRequired"})), 30 ) 31 .into_response() 32 })?; 33 let token = auth_header 34 .to_str() 35 .unwrap_or("") 36 .replace("Bearer ", ""); 37 38 let session = sqlx::query!( 39 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1", 40 token 41 ) 42 .fetch_optional(&state.db) 43 .await 44 .map_err(|e| { 45 error!("DB error fetching session: {}", e); 46 (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response() 47 })? 48 .ok_or_else(|| { 49 ( 50 StatusCode::UNAUTHORIZED, 51 Json(json!({"error": "AuthenticationFailed"})), 52 ) 53 .into_response() 54 })?; 55 56 crate::auth::verify_token(&token, &session.key_bytes).map_err(|_| { 57 ( 58 StatusCode::UNAUTHORIZED, 59 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})), 60 ) 61 .into_response() 62 })?; 63 64 if repo_did != session.did { 65 return Err(( 66 StatusCode::FORBIDDEN, 67 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})), 68 ) 69 .into_response()); 70 } 71 72 let user_id = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", session.did) 73 .fetch_optional(&state.db) 74 .await 75 .map_err(|e| { 76 error!("DB error fetching user: {}", e); 77 (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response() 78 })? 79 .ok_or_else(|| { 80 ( 81 StatusCode::INTERNAL_SERVER_ERROR, 82 Json(json!({"error": "InternalError", "message": "User not found"})), 83 ) 84 .into_response() 85 })?; 86 87 let root_cid_str: String = 88 sqlx::query_scalar!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id) 89 .fetch_optional(&state.db) 90 .await 91 .map_err(|e| { 92 error!("DB error fetching repo root: {}", e); 93 (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response() 94 })? 95 .ok_or_else(|| { 96 ( 97 StatusCode::INTERNAL_SERVER_ERROR, 98 Json(json!({"error": "InternalError", "message": "Repo root not found"})), 99 ) 100 .into_response() 101 })?; 102 103 let current_root_cid = Cid::from_str(&root_cid_str).map_err(|_| { 104 ( 105 StatusCode::INTERNAL_SERVER_ERROR, 106 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})), 107 ) 108 .into_response() 109 })?; 110 111 Ok((session.did, user_id, current_root_cid)) 112} 113 114#[derive(Deserialize)] 115#[allow(dead_code)] 116pub struct CreateRecordInput { 117 pub repo: String, 118 pub collection: String, 119 pub rkey: Option<String>, 120 pub validate: Option<bool>, 121 pub record: serde_json::Value, 122 #[serde(rename = "swapCommit")] 123 pub swap_commit: Option<String>, 124} 125 126#[derive(Serialize)] 127#[serde(rename_all = "camelCase")] 128pub struct CreateRecordOutput { 129 pub uri: String, 130 pub cid: String, 131} 132 133pub async fn create_record( 134 State(state): State<AppState>, 135 headers: HeaderMap, 136 Json(input): Json<CreateRecordInput>, 137) -> Response { 138 let (did, user_id, current_root_cid) = 139 match prepare_repo_write(&state, &headers, &input.repo).await { 140 Ok(res) => res, 141 Err(err_res) => return err_res, 142 }; 143 144 if let Some(swap_commit) = &input.swap_commit { 145 if Cid::from_str(swap_commit).ok() != Some(current_root_cid) { 146 return ( 147 StatusCode::CONFLICT, 148 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})), 149 ) 150 .into_response(); 151 } 152 } 153 154 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 155 156 let commit_bytes = match tracking_store.get(&current_root_cid).await { 157 Ok(Some(b)) => b, 158 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Commit block not found"}))).into_response(), 159 }; 160 let commit = match Commit::from_cbor(&commit_bytes) { 161 Ok(c) => c, 162 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to parse commit"}))).into_response(), 163 }; 164 165 let mst = Mst::load( 166 Arc::new(tracking_store.clone()), 167 commit.data, 168 None, 169 ); 170 171 let collection_nsid = match input.collection.parse::<Nsid>() { 172 Ok(n) => n, 173 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection"}))).into_response(), 174 }; 175 176 if input.validate.unwrap_or(true) { 177 if input.collection == "app.bsky.feed.post" { 178 if input.record.get("text").is_none() || input.record.get("createdAt").is_none() { 179 return ( 180 StatusCode::BAD_REQUEST, 181 Json(json!({"error": "InvalidRecord", "message": "Record validation failed"})), 182 ) 183 .into_response(); 184 } 185 } 186 } 187 188 let rkey = input.rkey.unwrap_or_else(|| Utc::now().format("%Y%m%d%H%M%S%f").to_string()); 189 190 let mut record_bytes = Vec::new(); 191 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() { 192 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response(); 193 } 194 let record_cid = match tracking_store.put(&record_bytes).await { 195 Ok(c) => c, 196 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to save record block"}))).into_response(), 197 }; 198 199 let key = format!("{}/{}", collection_nsid, rkey); 200 let new_mst = match mst.add(&key, record_cid).await { 201 Ok(m) => m, 202 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to add to MST"}))).into_response(), 203 }; 204 let new_mst_root = match new_mst.persist().await { 205 Ok(c) => c, 206 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to persist MST"}))).into_response(), 207 }; 208 209 let op = RecordOp::Create { collection: input.collection.clone(), rkey: rkey.clone(), cid: record_cid }; 210 let written_cids = tracking_store.get_written_cids(); 211 let written_cids_str = written_cids.iter().map(|c| c.to_string()).collect::<Vec<_>>(); 212 213 if let Err(e) = commit_and_log(&state, &did, user_id, Some(current_root_cid), new_mst_root, vec![op], &written_cids_str).await { 214 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": e}))).into_response(); 215 }; 216 217 (StatusCode::OK, Json(CreateRecordOutput { 218 uri: format!("at://{}/{}/{}", did, input.collection, rkey), 219 cid: record_cid.to_string(), 220 })).into_response() 221} 222 223#[derive(Deserialize)] 224#[allow(dead_code)] 225pub struct PutRecordInput { 226 pub repo: String, 227 pub collection: String, 228 pub rkey: String, 229 pub validate: Option<bool>, 230 pub record: serde_json::Value, 231 #[serde(rename = "swapCommit")] 232 pub swap_commit: Option<String>, 233 #[serde(rename = "swapRecord")] 234 pub swap_record: Option<String>, 235} 236 237#[derive(Serialize)] 238#[serde(rename_all = "camelCase")] 239pub struct PutRecordOutput { 240 pub uri: String, 241 pub cid: String, 242} 243 244pub async fn put_record( 245 State(state): State<AppState>, 246 headers: HeaderMap, 247 Json(input): Json<PutRecordInput>, 248) -> Response { 249 let (did, user_id, current_root_cid) = 250 match prepare_repo_write(&state, &headers, &input.repo).await { 251 Ok(res) => res, 252 Err(err_res) => return err_res, 253 }; 254 255 if let Some(swap_commit) = &input.swap_commit { 256 if Cid::from_str(swap_commit).ok() != Some(current_root_cid) { 257 return (StatusCode::CONFLICT, Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"}))).into_response(); 258 } 259 } 260 261 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 262 263 let commit_bytes = match tracking_store.get(&current_root_cid).await { 264 Ok(Some(b)) => b, 265 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Commit block not found"}))).into_response(), 266 }; 267 let commit = match Commit::from_cbor(&commit_bytes) { 268 Ok(c) => c, 269 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to parse commit"}))).into_response(), 270 }; 271 272 let mst = Mst::load( 273 Arc::new(tracking_store.clone()), 274 commit.data, 275 None, 276 ); 277 let collection_nsid = match input.collection.parse::<Nsid>() { 278 Ok(n) => n, 279 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection"}))).into_response(), 280 }; 281 let key = format!("{}/{}", collection_nsid, input.rkey); 282 283 if input.validate.unwrap_or(true) { 284 if input.collection == "app.bsky.feed.post" { 285 if input.record.get("text").is_none() || input.record.get("createdAt").is_none() { 286 return ( 287 StatusCode::BAD_REQUEST, 288 Json(json!({"error": "InvalidRecord", "message": "Record validation failed"})), 289 ) 290 .into_response(); 291 } 292 } 293 } 294 295 if let Some(swap_record_str) = &input.swap_record { 296 let expected_cid = Cid::from_str(swap_record_str).ok(); 297 let actual_cid = mst.get(&key).await.ok().flatten(); 298 if expected_cid != actual_cid { 299 return (StatusCode::CONFLICT, Json(json!({"error": "InvalidSwap", "message": "Record has been modified or does not exist"}))).into_response(); 300 } 301 } 302 303 let existing_cid = mst.get(&key).await.ok().flatten(); 304 305 let mut record_bytes = Vec::new(); 306 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() { 307 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response(); 308 } 309 let record_cid = match tracking_store.put(&record_bytes).await { 310 Ok(c) => c, 311 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to save record block"}))).into_response(), 312 }; 313 314 let new_mst = if existing_cid.is_some() { 315 mst.update(&key, record_cid).await.unwrap() 316 } else { 317 mst.add(&key, record_cid).await.unwrap() 318 }; 319 let new_mst_root = new_mst.persist().await.unwrap(); 320 321 let op = if existing_cid.is_some() { 322 RecordOp::Update { collection: input.collection.clone(), rkey: input.rkey.clone(), cid: record_cid } 323 } else { 324 RecordOp::Create { collection: input.collection.clone(), rkey: input.rkey.clone(), cid: record_cid } 325 }; 326 327 let written_cids = tracking_store.get_written_cids(); 328 let written_cids_str = written_cids.iter().map(|c| c.to_string()).collect::<Vec<_>>(); 329 330 if let Err(e) = commit_and_log(&state, &did, user_id, Some(current_root_cid), new_mst_root, vec![op], &written_cids_str).await { 331 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": e}))).into_response(); 332 }; 333 334 (StatusCode::OK, Json(PutRecordOutput { 335 uri: format!("at://{}/{}/{}", did, input.collection, input.rkey), 336 cid: record_cid.to_string(), 337 })).into_response() 338}