this repo has no description
1use crate::api::repo::record::utils::{commit_and_log, RecordOp}; 2use crate::repo::tracking::TrackingBlockStore; 3use crate::state::AppState; 4use axum::{ 5 extract::State, 6 http::StatusCode, 7 response::{IntoResponse, Response}, 8 Json, 9}; 10use chrono::Utc; 11use cid::Cid; 12use jacquard::types::string::Nsid; 13use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 14use serde::{Deserialize, Serialize}; 15use serde_json::json; 16use std::str::FromStr; 17use std::sync::Arc; 18use tracing::error; 19 20#[derive(Deserialize)] 21#[serde(tag = "$type")] 22pub enum WriteOp { 23 #[serde(rename = "com.atproto.repo.applyWrites#create")] 24 Create { 25 collection: String, 26 rkey: Option<String>, 27 value: serde_json::Value, 28 }, 29 #[serde(rename = "com.atproto.repo.applyWrites#update")] 30 Update { 31 collection: String, 32 rkey: String, 33 value: serde_json::Value, 34 }, 35 #[serde(rename = "com.atproto.repo.applyWrites#delete")] 36 Delete { collection: String, rkey: String }, 37} 38 39#[derive(Deserialize)] 40#[serde(rename_all = "camelCase")] 41pub struct ApplyWritesInput { 42 pub repo: String, 43 pub validate: Option<bool>, 44 pub writes: Vec<WriteOp>, 45 pub swap_commit: Option<String>, 46} 47 48#[derive(Serialize)] 49#[serde(tag = "$type")] 50pub enum WriteResult { 51 #[serde(rename = "com.atproto.repo.applyWrites#createResult")] 52 CreateResult { uri: String, cid: String }, 53 #[serde(rename = "com.atproto.repo.applyWrites#updateResult")] 54 UpdateResult { uri: String, cid: String }, 55 #[serde(rename = "com.atproto.repo.applyWrites#deleteResult")] 56 DeleteResult {}, 57} 58 59#[derive(Serialize)] 60pub struct ApplyWritesOutput { 61 pub commit: CommitInfo, 62 pub results: Vec<WriteResult>, 63} 64 65#[derive(Serialize)] 66pub struct CommitInfo { 67 pub cid: String, 68 pub rev: String, 69} 70 71pub async fn apply_writes( 72 State(state): State<AppState>, 73 headers: axum::http::HeaderMap, 74 Json(input): Json<ApplyWritesInput>, 75) -> Response { 76 let token = match crate::auth::extract_bearer_token_from_header( 77 headers.get("Authorization").and_then(|h| h.to_str().ok()) 78 ) { 79 Some(t) => t, 80 None => { 81 return ( 82 StatusCode::UNAUTHORIZED, 83 Json(json!({"error": "AuthenticationRequired"})), 84 ) 85 .into_response(); 86 } 87 }; 88 89 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await { 90 Ok(user) => user, 91 Err(_) => { 92 return ( 93 StatusCode::UNAUTHORIZED, 94 Json(json!({"error": "AuthenticationFailed"})), 95 ) 96 .into_response(); 97 } 98 }; 99 100 let did = auth_user.did; 101 102 if input.repo != did { 103 return ( 104 StatusCode::FORBIDDEN, 105 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})), 106 ) 107 .into_response(); 108 } 109 110 if input.writes.is_empty() { 111 return ( 112 StatusCode::BAD_REQUEST, 113 Json(json!({"error": "InvalidRequest", "message": "writes array is empty"})), 114 ) 115 .into_response(); 116 } 117 118 if input.writes.len() > 200 { 119 return ( 120 StatusCode::BAD_REQUEST, 121 Json(json!({"error": "InvalidRequest", "message": "Too many writes (max 200)"})), 122 ) 123 .into_response(); 124 } 125 126 let user_id: uuid::Uuid = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did) 127 .fetch_optional(&state.db) 128 .await 129 { 130 Ok(Some(id)) => id, 131 _ => { 132 return ( 133 StatusCode::INTERNAL_SERVER_ERROR, 134 Json(json!({"error": "InternalError", "message": "User not found"})), 135 ) 136 .into_response(); 137 } 138 }; 139 140 let root_cid_str: String = 141 match sqlx::query_scalar!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id) 142 .fetch_optional(&state.db) 143 .await 144 { 145 Ok(Some(cid_str)) => cid_str, 146 _ => { 147 return ( 148 StatusCode::INTERNAL_SERVER_ERROR, 149 Json(json!({"error": "InternalError", "message": "Repo root not found"})), 150 ) 151 .into_response(); 152 } 153 }; 154 155 let current_root_cid = match Cid::from_str(&root_cid_str) { 156 Ok(c) => c, 157 Err(_) => { 158 return ( 159 StatusCode::INTERNAL_SERVER_ERROR, 160 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})), 161 ) 162 .into_response(); 163 } 164 }; 165 166 if let Some(swap_commit) = &input.swap_commit { 167 if Cid::from_str(swap_commit).ok() != Some(current_root_cid) { 168 return ( 169 StatusCode::CONFLICT, 170 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})), 171 ) 172 .into_response(); 173 } 174 } 175 176 let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 177 178 let commit_bytes = match tracking_store.get(&current_root_cid).await { 179 Ok(Some(b)) => b, 180 _ => { 181 return ( 182 StatusCode::INTERNAL_SERVER_ERROR, 183 Json(json!({"error": "InternalError", "message": "Commit block not found"})), 184 ) 185 .into_response() 186 } 187 }; 188 189 let commit = match Commit::from_cbor(&commit_bytes) { 190 Ok(c) => c, 191 _ => { 192 return ( 193 StatusCode::INTERNAL_SERVER_ERROR, 194 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})), 195 ) 196 .into_response() 197 } 198 }; 199 200 let mut mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 201 202 let mut results: Vec<WriteResult> = Vec::new(); 203 let mut ops: Vec<RecordOp> = Vec::new(); 204 205 for write in &input.writes { 206 match write { 207 WriteOp::Create { 208 collection, 209 rkey, 210 value, 211 } => { 212 let rkey = rkey 213 .clone() 214 .unwrap_or_else(|| Utc::now().format("%Y%m%d%H%M%S%f").to_string()); 215 let mut record_bytes = Vec::new(); 216 serde_ipld_dagcbor::to_writer(&mut record_bytes, value).unwrap(); 217 let record_cid = tracking_store.put(&record_bytes).await.unwrap(); 218 219 let key = format!("{}/{}", collection.parse::<Nsid>().unwrap(), rkey); 220 mst = mst.add(&key, record_cid).await.unwrap(); 221 222 let uri = format!("at://{}/{}/{}", did, collection, rkey); 223 results.push(WriteResult::CreateResult { 224 uri, 225 cid: record_cid.to_string(), 226 }); 227 ops.push(RecordOp::Create { 228 collection: collection.clone(), 229 rkey, 230 cid: record_cid, 231 }); 232 } 233 WriteOp::Update { 234 collection, 235 rkey, 236 value, 237 } => { 238 let mut record_bytes = Vec::new(); 239 serde_ipld_dagcbor::to_writer(&mut record_bytes, value).unwrap(); 240 let record_cid = tracking_store.put(&record_bytes).await.unwrap(); 241 242 let key = format!("{}/{}", collection.parse::<Nsid>().unwrap(), rkey); 243 mst = mst.update(&key, record_cid).await.unwrap(); 244 245 let uri = format!("at://{}/{}/{}", did, collection, rkey); 246 results.push(WriteResult::UpdateResult { 247 uri, 248 cid: record_cid.to_string(), 249 }); 250 ops.push(RecordOp::Update { 251 collection: collection.clone(), 252 rkey: rkey.clone(), 253 cid: record_cid, 254 }); 255 } 256 WriteOp::Delete { collection, rkey } => { 257 let key = format!("{}/{}", collection.parse::<Nsid>().unwrap(), rkey); 258 mst = mst.delete(&key).await.unwrap(); 259 260 results.push(WriteResult::DeleteResult {}); 261 ops.push(RecordOp::Delete { 262 collection: collection.clone(), 263 rkey: rkey.clone(), 264 }); 265 } 266 } 267 } 268 269 let new_mst_root = mst.persist().await.unwrap(); 270 let written_cids = tracking_store.get_written_cids(); 271 let written_cids_str = written_cids 272 .iter() 273 .map(|c| c.to_string()) 274 .collect::<Vec<_>>(); 275 276 let commit_res = match commit_and_log( 277 &state, 278 &did, 279 user_id, 280 Some(current_root_cid), 281 new_mst_root, 282 ops, 283 &written_cids_str, 284 ) 285 .await 286 { 287 Ok(res) => res, 288 Err(e) => { 289 error!("Commit failed: {}", e); 290 return ( 291 StatusCode::INTERNAL_SERVER_ERROR, 292 Json(json!({"error": "InternalError", "message": "Failed to commit changes"})), 293 ) 294 .into_response(); 295 } 296 }; 297 298 ( 299 StatusCode::OK, 300 Json(ApplyWritesOutput { 301 commit: CommitInfo { 302 cid: commit_res.commit_cid.to_string(), 303 rev: commit_res.rev, 304 }, 305 results, 306 }), 307 ) 308 .into_response() 309}