this repo has no description
1use crate::api::repo::record::utils::{commit_and_log, RecordOp};
2use crate::repo::tracking::TrackingBlockStore;
3use crate::state::AppState;
4use axum::{
5 extract::State,
6 http::StatusCode,
7 response::{IntoResponse, Response},
8 Json,
9};
10use chrono::Utc;
11use cid::Cid;
12use jacquard::types::string::Nsid;
13use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
14use serde::{Deserialize, Serialize};
15use serde_json::json;
16use std::str::FromStr;
17use std::sync::Arc;
18use tracing::error;
19
20const MAX_BATCH_WRITES: usize = 200;
21
22#[derive(Deserialize)]
23#[serde(tag = "$type")]
24pub enum WriteOp {
25 #[serde(rename = "com.atproto.repo.applyWrites#create")]
26 Create {
27 collection: String,
28 rkey: Option<String>,
29 value: serde_json::Value,
30 },
31 #[serde(rename = "com.atproto.repo.applyWrites#update")]
32 Update {
33 collection: String,
34 rkey: String,
35 value: serde_json::Value,
36 },
37 #[serde(rename = "com.atproto.repo.applyWrites#delete")]
38 Delete { collection: String, rkey: String },
39}
40
41#[derive(Deserialize)]
42#[serde(rename_all = "camelCase")]
43pub struct ApplyWritesInput {
44 pub repo: String,
45 pub validate: Option<bool>,
46 pub writes: Vec<WriteOp>,
47 pub swap_commit: Option<String>,
48}
49
50#[derive(Serialize)]
51#[serde(tag = "$type")]
52pub enum WriteResult {
53 #[serde(rename = "com.atproto.repo.applyWrites#createResult")]
54 CreateResult { uri: String, cid: String },
55 #[serde(rename = "com.atproto.repo.applyWrites#updateResult")]
56 UpdateResult { uri: String, cid: String },
57 #[serde(rename = "com.atproto.repo.applyWrites#deleteResult")]
58 DeleteResult {},
59}
60
61#[derive(Serialize)]
62pub struct ApplyWritesOutput {
63 pub commit: CommitInfo,
64 pub results: Vec<WriteResult>,
65}
66
67#[derive(Serialize)]
68pub struct CommitInfo {
69 pub cid: String,
70 pub rev: String,
71}
72
73pub async fn apply_writes(
74 State(state): State<AppState>,
75 headers: axum::http::HeaderMap,
76 Json(input): Json<ApplyWritesInput>,
77) -> Response {
78 let token = match crate::auth::extract_bearer_token_from_header(
79 headers.get("Authorization").and_then(|h| h.to_str().ok())
80 ) {
81 Some(t) => t,
82 None => {
83 return (
84 StatusCode::UNAUTHORIZED,
85 Json(json!({"error": "AuthenticationRequired"})),
86 )
87 .into_response();
88 }
89 };
90
91 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await {
92 Ok(user) => user,
93 Err(_) => {
94 return (
95 StatusCode::UNAUTHORIZED,
96 Json(json!({"error": "AuthenticationFailed"})),
97 )
98 .into_response();
99 }
100 };
101
102 let did = auth_user.did;
103
104 if input.repo != did {
105 return (
106 StatusCode::FORBIDDEN,
107 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})),
108 )
109 .into_response();
110 }
111
112 if input.writes.is_empty() {
113 return (
114 StatusCode::BAD_REQUEST,
115 Json(json!({"error": "InvalidRequest", "message": "writes array is empty"})),
116 )
117 .into_response();
118 }
119
120 if input.writes.len() > MAX_BATCH_WRITES {
121 return (
122 StatusCode::BAD_REQUEST,
123 Json(json!({"error": "InvalidRequest", "message": format!("Too many writes (max {})", MAX_BATCH_WRITES)})),
124 )
125 .into_response();
126 }
127
128 let user_id: uuid::Uuid = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
129 .fetch_optional(&state.db)
130 .await
131 {
132 Ok(Some(id)) => id,
133 _ => {
134 return (
135 StatusCode::INTERNAL_SERVER_ERROR,
136 Json(json!({"error": "InternalError", "message": "User not found"})),
137 )
138 .into_response();
139 }
140 };
141
142 let root_cid_str: String =
143 match sqlx::query_scalar!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id)
144 .fetch_optional(&state.db)
145 .await
146 {
147 Ok(Some(cid_str)) => cid_str,
148 _ => {
149 return (
150 StatusCode::INTERNAL_SERVER_ERROR,
151 Json(json!({"error": "InternalError", "message": "Repo root not found"})),
152 )
153 .into_response();
154 }
155 };
156
157 let current_root_cid = match Cid::from_str(&root_cid_str) {
158 Ok(c) => c,
159 Err(_) => {
160 return (
161 StatusCode::INTERNAL_SERVER_ERROR,
162 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})),
163 )
164 .into_response();
165 }
166 };
167
168 if let Some(swap_commit) = &input.swap_commit {
169 if Cid::from_str(swap_commit).ok() != Some(current_root_cid) {
170 return (
171 StatusCode::CONFLICT,
172 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})),
173 )
174 .into_response();
175 }
176 }
177
178 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
179
180 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
181 Ok(Some(b)) => b,
182 _ => {
183 return (
184 StatusCode::INTERNAL_SERVER_ERROR,
185 Json(json!({"error": "InternalError", "message": "Commit block not found"})),
186 )
187 .into_response()
188 }
189 };
190
191 let commit = match Commit::from_cbor(&commit_bytes) {
192 Ok(c) => c,
193 _ => {
194 return (
195 StatusCode::INTERNAL_SERVER_ERROR,
196 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})),
197 )
198 .into_response()
199 }
200 };
201
202 let mut mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
203
204 let mut results: Vec<WriteResult> = Vec::new();
205 let mut ops: Vec<RecordOp> = Vec::new();
206
207 for write in &input.writes {
208 match write {
209 WriteOp::Create {
210 collection,
211 rkey,
212 value,
213 } => {
214 let rkey = rkey
215 .clone()
216 .unwrap_or_else(|| Utc::now().format("%Y%m%d%H%M%S%f").to_string());
217 let mut record_bytes = Vec::new();
218 if serde_ipld_dagcbor::to_writer(&mut record_bytes, value).is_err() {
219 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response();
220 }
221 let record_cid = match tracking_store.put(&record_bytes).await {
222 Ok(c) => c,
223 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to store record"}))).into_response(),
224 };
225
226 let collection_nsid = match collection.parse::<Nsid>() {
227 Ok(n) => n,
228 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection", "message": "Invalid collection NSID"}))).into_response(),
229 };
230 let key = format!("{}/{}", collection_nsid, rkey);
231 mst = match mst.add(&key, record_cid).await {
232 Ok(m) => m,
233 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to add to MST"}))).into_response(),
234 };
235
236 let uri = format!("at://{}/{}/{}", did, collection, rkey);
237 results.push(WriteResult::CreateResult {
238 uri,
239 cid: record_cid.to_string(),
240 });
241 ops.push(RecordOp::Create {
242 collection: collection.clone(),
243 rkey,
244 cid: record_cid,
245 });
246 }
247 WriteOp::Update {
248 collection,
249 rkey,
250 value,
251 } => {
252 let mut record_bytes = Vec::new();
253 if serde_ipld_dagcbor::to_writer(&mut record_bytes, value).is_err() {
254 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response();
255 }
256 let record_cid = match tracking_store.put(&record_bytes).await {
257 Ok(c) => c,
258 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to store record"}))).into_response(),
259 };
260
261 let collection_nsid = match collection.parse::<Nsid>() {
262 Ok(n) => n,
263 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection", "message": "Invalid collection NSID"}))).into_response(),
264 };
265 let key = format!("{}/{}", collection_nsid, rkey);
266 mst = match mst.update(&key, record_cid).await {
267 Ok(m) => m,
268 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to update MST"}))).into_response(),
269 };
270
271 let uri = format!("at://{}/{}/{}", did, collection, rkey);
272 results.push(WriteResult::UpdateResult {
273 uri,
274 cid: record_cid.to_string(),
275 });
276 ops.push(RecordOp::Update {
277 collection: collection.clone(),
278 rkey: rkey.clone(),
279 cid: record_cid,
280 });
281 }
282 WriteOp::Delete { collection, rkey } => {
283 let collection_nsid = match collection.parse::<Nsid>() {
284 Ok(n) => n,
285 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection", "message": "Invalid collection NSID"}))).into_response(),
286 };
287 let key = format!("{}/{}", collection_nsid, rkey);
288 mst = match mst.delete(&key).await {
289 Ok(m) => m,
290 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to delete from MST"}))).into_response(),
291 };
292
293 results.push(WriteResult::DeleteResult {});
294 ops.push(RecordOp::Delete {
295 collection: collection.clone(),
296 rkey: rkey.clone(),
297 });
298 }
299 }
300 }
301
302 let new_mst_root = match mst.persist().await {
303 Ok(c) => c,
304 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to persist MST"}))).into_response(),
305 };
306 let written_cids = tracking_store.get_written_cids();
307 let written_cids_str = written_cids
308 .iter()
309 .map(|c| c.to_string())
310 .collect::<Vec<_>>();
311
312 let commit_res = match commit_and_log(
313 &state,
314 &did,
315 user_id,
316 Some(current_root_cid),
317 new_mst_root,
318 ops,
319 &written_cids_str,
320 )
321 .await
322 {
323 Ok(res) => res,
324 Err(e) => {
325 error!("Commit failed: {}", e);
326 return (
327 StatusCode::INTERNAL_SERVER_ERROR,
328 Json(json!({"error": "InternalError", "message": "Failed to commit changes"})),
329 )
330 .into_response();
331 }
332 };
333
334 (
335 StatusCode::OK,
336 Json(ApplyWritesOutput {
337 commit: CommitInfo {
338 cid: commit_res.commit_cid.to_string(),
339 rev: commit_res.rev,
340 },
341 results,
342 }),
343 )
344 .into_response()
345}