this repo has no description
1use crate::api::repo::record::utils::{commit_and_log, RecordOp}; 2use crate::repo::tracking::TrackingBlockStore; 3use crate::state::AppState; 4use axum::{ 5 extract::State, 6 http::StatusCode, 7 response::{IntoResponse, Response}, 8 Json, 9}; 10use chrono::Utc; 11use cid::Cid; 12use jacquard::types::string::Nsid; 13use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 14use serde::{Deserialize, Serialize}; 15use serde_json::json; 16use std::str::FromStr; 17use std::sync::Arc; 18use tracing::error; 19 20const MAX_BATCH_WRITES: usize = 200; 21 22#[derive(Deserialize)] 23#[serde(tag = "$type")] 24pub enum WriteOp { 25 #[serde(rename = "com.atproto.repo.applyWrites#create")] 26 Create { 27 collection: String, 28 rkey: Option<String>, 29 value: serde_json::Value, 30 }, 31 #[serde(rename = "com.atproto.repo.applyWrites#update")] 32 Update { 33 collection: String, 34 rkey: String, 35 value: serde_json::Value, 36 }, 37 #[serde(rename = "com.atproto.repo.applyWrites#delete")] 38 Delete { collection: String, rkey: String }, 39} 40 41#[derive(Deserialize)] 42#[serde(rename_all = "camelCase")] 43pub struct ApplyWritesInput { 44 pub repo: String, 45 pub validate: Option<bool>, 46 pub writes: Vec<WriteOp>, 47 pub swap_commit: Option<String>, 48} 49 50#[derive(Serialize)] 51#[serde(tag = "$type")] 52pub enum WriteResult { 53 #[serde(rename = "com.atproto.repo.applyWrites#createResult")] 54 CreateResult { uri: String, cid: String }, 55 #[serde(rename = "com.atproto.repo.applyWrites#updateResult")] 56 UpdateResult { uri: String, cid: String }, 57 #[serde(rename = "com.atproto.repo.applyWrites#deleteResult")] 58 DeleteResult {}, 59} 60 61#[derive(Serialize)] 62pub struct ApplyWritesOutput { 63 pub commit: CommitInfo, 64 pub results: Vec<WriteResult>, 65} 66 67#[derive(Serialize)] 68pub struct CommitInfo { 69 pub cid: String, 70 pub rev: String, 71} 72 73pub async fn apply_writes( 74 State(state): State<AppState>, 75 headers: axum::http::HeaderMap, 76 Json(input): Json<ApplyWritesInput>, 77) -> Response { 78 let token = match crate::auth::extract_bearer_token_from_header( 79 headers.get("Authorization").and_then(|h| h.to_str().ok()) 80 ) { 81 Some(t) => t, 82 None => { 83 return ( 84 StatusCode::UNAUTHORIZED, 85 Json(json!({"error": "AuthenticationRequired"})), 86 ) 87 .into_response(); 88 } 89 }; 90 91 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await { 92 Ok(user) => user, 93 Err(_) => { 94 return ( 95 StatusCode::UNAUTHORIZED, 96 Json(json!({"error": "AuthenticationFailed"})), 97 ) 98 .into_response(); 99 } 100 }; 101 102 let did = auth_user.did; 103 104 if input.repo != did { 105 return ( 106 StatusCode::FORBIDDEN, 107 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})), 108 ) 109 .into_response(); 110 } 111 112 if input.writes.is_empty() { 113 return ( 114 StatusCode::BAD_REQUEST, 115 Json(json!({"error": "InvalidRequest", "message": "writes array is empty"})), 116 ) 117 .into_response(); 118 } 119 120 if input.writes.len() > MAX_BATCH_WRITES { 121 return ( 122 StatusCode::BAD_REQUEST, 123 Json(json!({"error": "InvalidRequest", "message": format!("Too many writes (max {})", MAX_BATCH_WRITES)})), 124 ) 125 .into_response(); 126 } 127 128 let user_id: uuid::Uuid = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did) 129 .fetch_optional(&state.db) 130 .await 131 { 132 Ok(Some(id)) => id, 133 _ => { 134 return ( 135 StatusCode::INTERNAL_SERVER_ERROR, 136 Json(json!({"error": "InternalError", "message": "User not found"})), 137 ) 138 .into_response(); 139 } 140 }; 141 142 let root_cid_str: String = 143 match sqlx::query_scalar!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id) 144 .fetch_optional(&state.db) 145 .await 146 { 147 Ok(Some(cid_str)) => cid_str, 148 _ => { 149 return ( 150 StatusCode::INTERNAL_SERVER_ERROR, 151 Json(json!({"error": "InternalError", "message": "Repo root not found"})), 152 ) 153 .into_response(); 154 } 155 }; 156 157 let current_root_cid = match Cid::from_str(&root_cid_str) { 158 Ok(c) => c, 159 Err(_) => { 160 return ( 161 StatusCode::INTERNAL_SERVER_ERROR, 162 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})), 163 ) 164 .into_response(); 165 } 166 }; 167 168 if let Some(swap_commit) = &input.swap_commit { 169 if Cid::from_str(swap_commit).ok() != Some(current_root_cid) { 170 return ( 171 StatusCode::CONFLICT, 172 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})), 173 ) 174 .into_response(); 175 } 176 } 177 178 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 179 180 let commit_bytes = match tracking_store.get(&current_root_cid).await { 181 Ok(Some(b)) => b, 182 _ => { 183 return ( 184 StatusCode::INTERNAL_SERVER_ERROR, 185 Json(json!({"error": "InternalError", "message": "Commit block not found"})), 186 ) 187 .into_response() 188 } 189 }; 190 191 let commit = match Commit::from_cbor(&commit_bytes) { 192 Ok(c) => c, 193 _ => { 194 return ( 195 StatusCode::INTERNAL_SERVER_ERROR, 196 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})), 197 ) 198 .into_response() 199 } 200 }; 201 202 let mut mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 203 204 let mut results: Vec<WriteResult> = Vec::new(); 205 let mut ops: Vec<RecordOp> = Vec::new(); 206 207 for write in &input.writes { 208 match write { 209 WriteOp::Create { 210 collection, 211 rkey, 212 value, 213 } => { 214 let rkey = rkey 215 .clone() 216 .unwrap_or_else(|| Utc::now().format("%Y%m%d%H%M%S%f").to_string()); 217 let mut record_bytes = Vec::new(); 218 if serde_ipld_dagcbor::to_writer(&mut record_bytes, value).is_err() { 219 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response(); 220 } 221 let record_cid = match tracking_store.put(&record_bytes).await { 222 Ok(c) => c, 223 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to store record"}))).into_response(), 224 }; 225 226 let collection_nsid = match collection.parse::<Nsid>() { 227 Ok(n) => n, 228 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection", "message": "Invalid collection NSID"}))).into_response(), 229 }; 230 let key = format!("{}/{}", collection_nsid, rkey); 231 mst = match mst.add(&key, record_cid).await { 232 Ok(m) => m, 233 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to add to MST"}))).into_response(), 234 }; 235 236 let uri = format!("at://{}/{}/{}", did, collection, rkey); 237 results.push(WriteResult::CreateResult { 238 uri, 239 cid: record_cid.to_string(), 240 }); 241 ops.push(RecordOp::Create { 242 collection: collection.clone(), 243 rkey, 244 cid: record_cid, 245 }); 246 } 247 WriteOp::Update { 248 collection, 249 rkey, 250 value, 251 } => { 252 let mut record_bytes = Vec::new(); 253 if serde_ipld_dagcbor::to_writer(&mut record_bytes, value).is_err() { 254 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response(); 255 } 256 let record_cid = match tracking_store.put(&record_bytes).await { 257 Ok(c) => c, 258 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to store record"}))).into_response(), 259 }; 260 261 let collection_nsid = match collection.parse::<Nsid>() { 262 Ok(n) => n, 263 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection", "message": "Invalid collection NSID"}))).into_response(), 264 }; 265 let key = format!("{}/{}", collection_nsid, rkey); 266 mst = match mst.update(&key, record_cid).await { 267 Ok(m) => m, 268 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to update MST"}))).into_response(), 269 }; 270 271 let uri = format!("at://{}/{}/{}", did, collection, rkey); 272 results.push(WriteResult::UpdateResult { 273 uri, 274 cid: record_cid.to_string(), 275 }); 276 ops.push(RecordOp::Update { 277 collection: collection.clone(), 278 rkey: rkey.clone(), 279 cid: record_cid, 280 }); 281 } 282 WriteOp::Delete { collection, rkey } => { 283 let collection_nsid = match collection.parse::<Nsid>() { 284 Ok(n) => n, 285 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection", "message": "Invalid collection NSID"}))).into_response(), 286 }; 287 let key = format!("{}/{}", collection_nsid, rkey); 288 mst = match mst.delete(&key).await { 289 Ok(m) => m, 290 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to delete from MST"}))).into_response(), 291 }; 292 293 results.push(WriteResult::DeleteResult {}); 294 ops.push(RecordOp::Delete { 295 collection: collection.clone(), 296 rkey: rkey.clone(), 297 }); 298 } 299 } 300 } 301 302 let new_mst_root = match mst.persist().await { 303 Ok(c) => c, 304 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to persist MST"}))).into_response(), 305 }; 306 let written_cids = tracking_store.get_written_cids(); 307 let written_cids_str = written_cids 308 .iter() 309 .map(|c| c.to_string()) 310 .collect::<Vec<_>>(); 311 312 let commit_res = match commit_and_log( 313 &state, 314 &did, 315 user_id, 316 Some(current_root_cid), 317 new_mst_root, 318 ops, 319 &written_cids_str, 320 ) 321 .await 322 { 323 Ok(res) => res, 324 Err(e) => { 325 error!("Commit failed: {}", e); 326 return ( 327 StatusCode::INTERNAL_SERVER_ERROR, 328 Json(json!({"error": "InternalError", "message": "Failed to commit changes"})), 329 ) 330 .into_response(); 331 } 332 }; 333 334 ( 335 StatusCode::OK, 336 Json(ApplyWritesOutput { 337 commit: CommitInfo { 338 cid: commit_res.commit_cid.to_string(), 339 rev: commit_res.rev, 340 }, 341 results, 342 }), 343 ) 344 .into_response() 345}