this repo has no description
1use crate::state::AppState;
2use axum::{
3 Json,
4 extract::State,
5 http::StatusCode,
6 response::{IntoResponse, Response},
7};
8use chrono::Utc;
9use cid::Cid;
10use jacquard::types::{
11 did::Did,
12 integer::LimitedU32,
13 string::{Nsid, Tid},
14};
15use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
16use serde::{Deserialize, Serialize};
17use serde_json::json;
18use std::str::FromStr;
19use std::sync::Arc;
20use tracing::error;
21
22#[derive(Deserialize)]
23#[serde(tag = "$type")]
24pub enum WriteOp {
25 #[serde(rename = "com.atproto.repo.applyWrites#create")]
26 Create {
27 collection: String,
28 rkey: Option<String>,
29 value: serde_json::Value,
30 },
31 #[serde(rename = "com.atproto.repo.applyWrites#update")]
32 Update {
33 collection: String,
34 rkey: String,
35 value: serde_json::Value,
36 },
37 #[serde(rename = "com.atproto.repo.applyWrites#delete")]
38 Delete { collection: String, rkey: String },
39}
40
41#[derive(Deserialize)]
42#[serde(rename_all = "camelCase")]
43pub struct ApplyWritesInput {
44 pub repo: String,
45 pub validate: Option<bool>,
46 pub writes: Vec<WriteOp>,
47 pub swap_commit: Option<String>,
48}
49
50#[derive(Serialize)]
51#[serde(tag = "$type")]
52pub enum WriteResult {
53 #[serde(rename = "com.atproto.repo.applyWrites#createResult")]
54 CreateResult { uri: String, cid: String },
55 #[serde(rename = "com.atproto.repo.applyWrites#updateResult")]
56 UpdateResult { uri: String, cid: String },
57 #[serde(rename = "com.atproto.repo.applyWrites#deleteResult")]
58 DeleteResult {},
59}
60
61#[derive(Serialize)]
62pub struct ApplyWritesOutput {
63 pub commit: CommitInfo,
64 pub results: Vec<WriteResult>,
65}
66
67#[derive(Serialize)]
68pub struct CommitInfo {
69 pub cid: String,
70 pub rev: String,
71}
72
73pub async fn apply_writes(
74 State(state): State<AppState>,
75 headers: axum::http::HeaderMap,
76 Json(input): Json<ApplyWritesInput>,
77) -> Response {
78 let auth_header = headers.get("Authorization");
79 if auth_header.is_none() {
80 return (
81 StatusCode::UNAUTHORIZED,
82 Json(json!({"error": "AuthenticationRequired"})),
83 )
84 .into_response();
85 }
86 let token = auth_header
87 .unwrap()
88 .to_str()
89 .unwrap_or("")
90 .replace("Bearer ", "");
91
92 let session = sqlx::query!(
93 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1",
94 token
95 )
96 .fetch_optional(&state.db)
97 .await
98 .unwrap_or(None);
99
100 let (did, key_bytes) = match session {
101 Some(row) => (
102 row.did,
103 row.key_bytes,
104 ),
105 None => {
106 return (
107 StatusCode::UNAUTHORIZED,
108 Json(json!({"error": "AuthenticationFailed"})),
109 )
110 .into_response();
111 }
112 };
113
114 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
115 return (
116 StatusCode::UNAUTHORIZED,
117 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})),
118 )
119 .into_response();
120 }
121
122 if input.repo != did {
123 return (
124 StatusCode::FORBIDDEN,
125 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})),
126 )
127 .into_response();
128 }
129
130 if input.writes.is_empty() {
131 return (
132 StatusCode::BAD_REQUEST,
133 Json(json!({"error": "InvalidRequest", "message": "writes array is empty"})),
134 )
135 .into_response();
136 }
137
138 if input.writes.len() > 200 {
139 return (
140 StatusCode::BAD_REQUEST,
141 Json(json!({"error": "InvalidRequest", "message": "Too many writes (max 200)"})),
142 )
143 .into_response();
144 }
145
146 let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
147 .fetch_optional(&state.db)
148 .await;
149
150 let user_id: uuid::Uuid = match user_query {
151 Ok(Some(row)) => row.id,
152 _ => {
153 return (
154 StatusCode::INTERNAL_SERVER_ERROR,
155 Json(json!({"error": "InternalError", "message": "User not found"})),
156 )
157 .into_response();
158 }
159 };
160
161 let repo_root_query = sqlx::query!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id)
162 .fetch_optional(&state.db)
163 .await;
164
165 let current_root_cid = match repo_root_query {
166 Ok(Some(row)) => {
167 let cid_str: String = row.repo_root_cid;
168 match Cid::from_str(&cid_str) {
169 Ok(c) => c,
170 Err(_) => {
171 return (
172 StatusCode::INTERNAL_SERVER_ERROR,
173 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})),
174 )
175 .into_response();
176 }
177 }
178 }
179 _ => {
180 return (
181 StatusCode::INTERNAL_SERVER_ERROR,
182 Json(json!({"error": "InternalError", "message": "Repo root not found"})),
183 )
184 .into_response();
185 }
186 };
187
188 if let Some(swap_commit) = &input.swap_commit {
189 let swap_cid = match Cid::from_str(swap_commit) {
190 Ok(c) => c,
191 Err(_) => {
192 return (
193 StatusCode::BAD_REQUEST,
194 Json(json!({"error": "InvalidSwap", "message": "Invalid swapCommit CID"})),
195 )
196 .into_response();
197 }
198 };
199 if swap_cid != current_root_cid {
200 return (
201 StatusCode::CONFLICT,
202 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})),
203 )
204 .into_response();
205 }
206 }
207
208 let commit_bytes = match state.block_store.get(¤t_root_cid).await {
209 Ok(Some(b)) => b,
210 Ok(None) => {
211 return (
212 StatusCode::INTERNAL_SERVER_ERROR,
213 Json(json!({"error": "InternalError", "message": "Commit block not found"})),
214 )
215 .into_response();
216 }
217 Err(e) => {
218 error!("Failed to load commit block: {:?}", e);
219 return (
220 StatusCode::INTERNAL_SERVER_ERROR,
221 Json(json!({"error": "InternalError"})),
222 )
223 .into_response();
224 }
225 };
226
227 let commit = match Commit::from_cbor(&commit_bytes) {
228 Ok(c) => c,
229 Err(e) => {
230 error!("Failed to parse commit: {:?}", e);
231 return (
232 StatusCode::INTERNAL_SERVER_ERROR,
233 Json(json!({"error": "InternalError"})),
234 )
235 .into_response();
236 }
237 };
238
239 let mst_root = commit.data;
240 let store = Arc::new(state.block_store.clone());
241 let mut mst = Mst::load(store.clone(), mst_root, None);
242
243 let mut results: Vec<WriteResult> = Vec::new();
244 let mut record_ops: Vec<(String, String, Option<String>)> = Vec::new();
245
246 for write in &input.writes {
247 match write {
248 WriteOp::Create {
249 collection,
250 rkey,
251 value,
252 } => {
253 let collection_nsid = match collection.parse::<Nsid>() {
254 Ok(n) => n,
255 Err(_) => {
256 return (
257 StatusCode::BAD_REQUEST,
258 Json(json!({"error": "InvalidCollection"})),
259 )
260 .into_response();
261 }
262 };
263
264 let rkey = rkey
265 .clone()
266 .unwrap_or_else(|| Utc::now().format("%Y%m%d%H%M%S%f").to_string());
267
268 let mut record_bytes = Vec::new();
269 if let Err(e) = serde_ipld_dagcbor::to_writer(&mut record_bytes, value) {
270 error!("Error serializing record: {:?}", e);
271 return (
272 StatusCode::BAD_REQUEST,
273 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})),
274 )
275 .into_response();
276 }
277
278 let record_cid = match state.block_store.put(&record_bytes).await {
279 Ok(c) => c,
280 Err(e) => {
281 error!("Failed to save record block: {:?}", e);
282 return (
283 StatusCode::INTERNAL_SERVER_ERROR,
284 Json(json!({"error": "InternalError"})),
285 )
286 .into_response();
287 }
288 };
289
290 let key = format!("{}/{}", collection_nsid, rkey);
291 mst = match mst.add(&key, record_cid).await {
292 Ok(m) => m,
293 Err(e) => {
294 error!("Failed to add to MST: {:?}", e);
295 return (
296 StatusCode::INTERNAL_SERVER_ERROR,
297 Json(json!({"error": "InternalError"})),
298 )
299 .into_response();
300 }
301 };
302
303 let uri = format!("at://{}/{}/{}", did, collection, rkey);
304 results.push(WriteResult::CreateResult {
305 uri: uri.clone(),
306 cid: record_cid.to_string(),
307 });
308 record_ops.push((collection.clone(), rkey, Some(record_cid.to_string())));
309 }
310 WriteOp::Update {
311 collection,
312 rkey,
313 value,
314 } => {
315 let collection_nsid = match collection.parse::<Nsid>() {
316 Ok(n) => n,
317 Err(_) => {
318 return (
319 StatusCode::BAD_REQUEST,
320 Json(json!({"error": "InvalidCollection"})),
321 )
322 .into_response();
323 }
324 };
325
326 let mut record_bytes = Vec::new();
327 if let Err(e) = serde_ipld_dagcbor::to_writer(&mut record_bytes, value) {
328 error!("Error serializing record: {:?}", e);
329 return (
330 StatusCode::BAD_REQUEST,
331 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})),
332 )
333 .into_response();
334 }
335
336 let record_cid = match state.block_store.put(&record_bytes).await {
337 Ok(c) => c,
338 Err(e) => {
339 error!("Failed to save record block: {:?}", e);
340 return (
341 StatusCode::INTERNAL_SERVER_ERROR,
342 Json(json!({"error": "InternalError"})),
343 )
344 .into_response();
345 }
346 };
347
348 let key = format!("{}/{}", collection_nsid, rkey);
349 mst = match mst.update(&key, record_cid).await {
350 Ok(m) => m,
351 Err(e) => {
352 error!("Failed to update MST: {:?}", e);
353 return (
354 StatusCode::INTERNAL_SERVER_ERROR,
355 Json(json!({"error": "InternalError"})),
356 )
357 .into_response();
358 }
359 };
360
361 let uri = format!("at://{}/{}/{}", did, collection, rkey);
362 results.push(WriteResult::UpdateResult {
363 uri: uri.clone(),
364 cid: record_cid.to_string(),
365 });
366 record_ops.push((collection.clone(), rkey.clone(), Some(record_cid.to_string())));
367 }
368 WriteOp::Delete { collection, rkey } => {
369 let collection_nsid = match collection.parse::<Nsid>() {
370 Ok(n) => n,
371 Err(_) => {
372 return (
373 StatusCode::BAD_REQUEST,
374 Json(json!({"error": "InvalidCollection"})),
375 )
376 .into_response();
377 }
378 };
379
380 let key = format!("{}/{}", collection_nsid, rkey);
381 mst = match mst.delete(&key).await {
382 Ok(m) => m,
383 Err(e) => {
384 error!("Failed to delete from MST: {:?}", e);
385 return (
386 StatusCode::INTERNAL_SERVER_ERROR,
387 Json(json!({"error": "InternalError"})),
388 )
389 .into_response();
390 }
391 };
392
393 results.push(WriteResult::DeleteResult {});
394 record_ops.push((collection.clone(), rkey.clone(), None));
395 }
396 }
397 }
398
399 let new_mst_root = match mst.persist().await {
400 Ok(c) => c,
401 Err(e) => {
402 error!("Failed to persist MST: {:?}", e);
403 return (
404 StatusCode::INTERNAL_SERVER_ERROR,
405 Json(json!({"error": "InternalError"})),
406 )
407 .into_response();
408 }
409 };
410
411 let did_obj = match Did::new(&did) {
412 Ok(d) => d,
413 Err(_) => {
414 return (
415 StatusCode::INTERNAL_SERVER_ERROR,
416 Json(json!({"error": "InternalError", "message": "Invalid DID"})),
417 )
418 .into_response();
419 }
420 };
421
422 let rev = Tid::now(LimitedU32::MIN);
423 let new_commit = Commit::new_unsigned(did_obj, new_mst_root, rev.clone(), Some(current_root_cid));
424
425 let new_commit_bytes = match new_commit.to_cbor() {
426 Ok(b) => b,
427 Err(e) => {
428 error!("Failed to serialize new commit: {:?}", e);
429 return (
430 StatusCode::INTERNAL_SERVER_ERROR,
431 Json(json!({"error": "InternalError"})),
432 )
433 .into_response();
434 }
435 };
436
437 let new_root_cid = match state.block_store.put(&new_commit_bytes).await {
438 Ok(c) => c,
439 Err(e) => {
440 error!("Failed to save new commit: {:?}", e);
441 return (
442 StatusCode::INTERNAL_SERVER_ERROR,
443 Json(json!({"error": "InternalError"})),
444 )
445 .into_response();
446 }
447 };
448
449 let update_repo = sqlx::query!("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2", new_root_cid.to_string(), user_id)
450 .execute(&state.db)
451 .await;
452
453 if let Err(e) = update_repo {
454 error!("Failed to update repo root in DB: {:?}", e);
455 return (
456 StatusCode::INTERNAL_SERVER_ERROR,
457 Json(json!({"error": "InternalError"})),
458 )
459 .into_response();
460 }
461
462 for (collection, rkey, record_cid) in record_ops {
463 match record_cid {
464 Some(cid) => {
465 let _ = sqlx::query!(
466 "INSERT INTO records (repo_id, collection, rkey, record_cid) VALUES ($1, $2, $3, $4)
467 ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4, created_at = NOW()",
468 user_id,
469 collection,
470 rkey,
471 cid
472 )
473 .execute(&state.db)
474 .await;
475 }
476 None => {
477 let _ = sqlx::query!(
478 "DELETE FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3",
479 user_id,
480 collection,
481 rkey
482 )
483 .execute(&state.db)
484 .await;
485 }
486 }
487 }
488
489 (
490 StatusCode::OK,
491 Json(ApplyWritesOutput {
492 commit: CommitInfo {
493 cid: new_root_cid.to_string(),
494 rev: rev.to_string(),
495 },
496 results,
497 }),
498 )
499 .into_response()
500}