this repo has no description
1use crate::state::AppState;
2use axum::{
3 Json,
4 extract::State,
5 http::StatusCode,
6 response::{IntoResponse, Response},
7};
8use chrono::Utc;
9use cid::Cid;
10use jacquard::types::{
11 did::Did,
12 integer::LimitedU32,
13 string::{Nsid, Tid},
14};
15use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
16use serde::{Deserialize, Serialize};
17use serde_json::json;
18use sqlx::Row;
19use std::str::FromStr;
20use std::sync::Arc;
21use tracing::error;
22
23#[derive(Deserialize)]
24#[serde(tag = "$type")]
25pub enum WriteOp {
26 #[serde(rename = "com.atproto.repo.applyWrites#create")]
27 Create {
28 collection: String,
29 rkey: Option<String>,
30 value: serde_json::Value,
31 },
32 #[serde(rename = "com.atproto.repo.applyWrites#update")]
33 Update {
34 collection: String,
35 rkey: String,
36 value: serde_json::Value,
37 },
38 #[serde(rename = "com.atproto.repo.applyWrites#delete")]
39 Delete { collection: String, rkey: String },
40}
41
42#[derive(Deserialize)]
43#[serde(rename_all = "camelCase")]
44pub struct ApplyWritesInput {
45 pub repo: String,
46 pub validate: Option<bool>,
47 pub writes: Vec<WriteOp>,
48 pub swap_commit: Option<String>,
49}
50
51#[derive(Serialize)]
52#[serde(tag = "$type")]
53pub enum WriteResult {
54 #[serde(rename = "com.atproto.repo.applyWrites#createResult")]
55 CreateResult { uri: String, cid: String },
56 #[serde(rename = "com.atproto.repo.applyWrites#updateResult")]
57 UpdateResult { uri: String, cid: String },
58 #[serde(rename = "com.atproto.repo.applyWrites#deleteResult")]
59 DeleteResult {},
60}
61
62#[derive(Serialize)]
63pub struct ApplyWritesOutput {
64 pub commit: CommitInfo,
65 pub results: Vec<WriteResult>,
66}
67
68#[derive(Serialize)]
69pub struct CommitInfo {
70 pub cid: String,
71 pub rev: String,
72}
73
74pub async fn apply_writes(
75 State(state): State<AppState>,
76 headers: axum::http::HeaderMap,
77 Json(input): Json<ApplyWritesInput>,
78) -> Response {
79 let auth_header = headers.get("Authorization");
80 if auth_header.is_none() {
81 return (
82 StatusCode::UNAUTHORIZED,
83 Json(json!({"error": "AuthenticationRequired"})),
84 )
85 .into_response();
86 }
87 let token = auth_header
88 .unwrap()
89 .to_str()
90 .unwrap_or("")
91 .replace("Bearer ", "");
92
93 let session = sqlx::query(
94 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1"
95 )
96 .bind(&token)
97 .fetch_optional(&state.db)
98 .await
99 .unwrap_or(None);
100
101 let (did, key_bytes) = match session {
102 Some(row) => (
103 row.get::<String, _>("did"),
104 row.get::<Vec<u8>, _>("key_bytes"),
105 ),
106 None => {
107 return (
108 StatusCode::UNAUTHORIZED,
109 Json(json!({"error": "AuthenticationFailed"})),
110 )
111 .into_response();
112 }
113 };
114
115 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
116 return (
117 StatusCode::UNAUTHORIZED,
118 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})),
119 )
120 .into_response();
121 }
122
123 if input.repo != did {
124 return (
125 StatusCode::FORBIDDEN,
126 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})),
127 )
128 .into_response();
129 }
130
131 if input.writes.is_empty() {
132 return (
133 StatusCode::BAD_REQUEST,
134 Json(json!({"error": "InvalidRequest", "message": "writes array is empty"})),
135 )
136 .into_response();
137 }
138
139 if input.writes.len() > 200 {
140 return (
141 StatusCode::BAD_REQUEST,
142 Json(json!({"error": "InvalidRequest", "message": "Too many writes (max 200)"})),
143 )
144 .into_response();
145 }
146
147 let user_query = sqlx::query("SELECT id FROM users WHERE did = $1")
148 .bind(&did)
149 .fetch_optional(&state.db)
150 .await;
151
152 let user_id: uuid::Uuid = match user_query {
153 Ok(Some(row)) => row.get("id"),
154 _ => {
155 return (
156 StatusCode::INTERNAL_SERVER_ERROR,
157 Json(json!({"error": "InternalError", "message": "User not found"})),
158 )
159 .into_response();
160 }
161 };
162
163 let repo_root_query = sqlx::query("SELECT repo_root_cid FROM repos WHERE user_id = $1")
164 .bind(user_id)
165 .fetch_optional(&state.db)
166 .await;
167
168 let current_root_cid = match repo_root_query {
169 Ok(Some(row)) => {
170 let cid_str: String = row.get("repo_root_cid");
171 match Cid::from_str(&cid_str) {
172 Ok(c) => c,
173 Err(_) => {
174 return (
175 StatusCode::INTERNAL_SERVER_ERROR,
176 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})),
177 )
178 .into_response();
179 }
180 }
181 }
182 _ => {
183 return (
184 StatusCode::INTERNAL_SERVER_ERROR,
185 Json(json!({"error": "InternalError", "message": "Repo root not found"})),
186 )
187 .into_response();
188 }
189 };
190
191 if let Some(swap_commit) = &input.swap_commit {
192 let swap_cid = match Cid::from_str(swap_commit) {
193 Ok(c) => c,
194 Err(_) => {
195 return (
196 StatusCode::BAD_REQUEST,
197 Json(json!({"error": "InvalidSwap", "message": "Invalid swapCommit CID"})),
198 )
199 .into_response();
200 }
201 };
202 if swap_cid != current_root_cid {
203 return (
204 StatusCode::CONFLICT,
205 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})),
206 )
207 .into_response();
208 }
209 }
210
211 let commit_bytes = match state.block_store.get(¤t_root_cid).await {
212 Ok(Some(b)) => b,
213 Ok(None) => {
214 return (
215 StatusCode::INTERNAL_SERVER_ERROR,
216 Json(json!({"error": "InternalError", "message": "Commit block not found"})),
217 )
218 .into_response();
219 }
220 Err(e) => {
221 error!("Failed to load commit block: {:?}", e);
222 return (
223 StatusCode::INTERNAL_SERVER_ERROR,
224 Json(json!({"error": "InternalError"})),
225 )
226 .into_response();
227 }
228 };
229
230 let commit = match Commit::from_cbor(&commit_bytes) {
231 Ok(c) => c,
232 Err(e) => {
233 error!("Failed to parse commit: {:?}", e);
234 return (
235 StatusCode::INTERNAL_SERVER_ERROR,
236 Json(json!({"error": "InternalError"})),
237 )
238 .into_response();
239 }
240 };
241
242 let mst_root = commit.data;
243 let store = Arc::new(state.block_store.clone());
244 let mut mst = Mst::load(store.clone(), mst_root, None);
245
246 let mut results: Vec<WriteResult> = Vec::new();
247 let mut record_ops: Vec<(String, String, Option<String>)> = Vec::new();
248
249 for write in &input.writes {
250 match write {
251 WriteOp::Create {
252 collection,
253 rkey,
254 value,
255 } => {
256 let collection_nsid = match collection.parse::<Nsid>() {
257 Ok(n) => n,
258 Err(_) => {
259 return (
260 StatusCode::BAD_REQUEST,
261 Json(json!({"error": "InvalidCollection"})),
262 )
263 .into_response();
264 }
265 };
266
267 let rkey = rkey
268 .clone()
269 .unwrap_or_else(|| Utc::now().format("%Y%m%d%H%M%S%f").to_string());
270
271 let mut record_bytes = Vec::new();
272 if let Err(e) = serde_ipld_dagcbor::to_writer(&mut record_bytes, value) {
273 error!("Error serializing record: {:?}", e);
274 return (
275 StatusCode::BAD_REQUEST,
276 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})),
277 )
278 .into_response();
279 }
280
281 let record_cid = match state.block_store.put(&record_bytes).await {
282 Ok(c) => c,
283 Err(e) => {
284 error!("Failed to save record block: {:?}", e);
285 return (
286 StatusCode::INTERNAL_SERVER_ERROR,
287 Json(json!({"error": "InternalError"})),
288 )
289 .into_response();
290 }
291 };
292
293 let key = format!("{}/{}", collection_nsid, rkey);
294 mst = match mst.add(&key, record_cid).await {
295 Ok(m) => m,
296 Err(e) => {
297 error!("Failed to add to MST: {:?}", e);
298 return (
299 StatusCode::INTERNAL_SERVER_ERROR,
300 Json(json!({"error": "InternalError"})),
301 )
302 .into_response();
303 }
304 };
305
306 let uri = format!("at://{}/{}/{}", did, collection, rkey);
307 results.push(WriteResult::CreateResult {
308 uri: uri.clone(),
309 cid: record_cid.to_string(),
310 });
311 record_ops.push((collection.clone(), rkey, Some(record_cid.to_string())));
312 }
313 WriteOp::Update {
314 collection,
315 rkey,
316 value,
317 } => {
318 let collection_nsid = match collection.parse::<Nsid>() {
319 Ok(n) => n,
320 Err(_) => {
321 return (
322 StatusCode::BAD_REQUEST,
323 Json(json!({"error": "InvalidCollection"})),
324 )
325 .into_response();
326 }
327 };
328
329 let mut record_bytes = Vec::new();
330 if let Err(e) = serde_ipld_dagcbor::to_writer(&mut record_bytes, value) {
331 error!("Error serializing record: {:?}", e);
332 return (
333 StatusCode::BAD_REQUEST,
334 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})),
335 )
336 .into_response();
337 }
338
339 let record_cid = match state.block_store.put(&record_bytes).await {
340 Ok(c) => c,
341 Err(e) => {
342 error!("Failed to save record block: {:?}", e);
343 return (
344 StatusCode::INTERNAL_SERVER_ERROR,
345 Json(json!({"error": "InternalError"})),
346 )
347 .into_response();
348 }
349 };
350
351 let key = format!("{}/{}", collection_nsid, rkey);
352 mst = match mst.update(&key, record_cid).await {
353 Ok(m) => m,
354 Err(e) => {
355 error!("Failed to update MST: {:?}", e);
356 return (
357 StatusCode::INTERNAL_SERVER_ERROR,
358 Json(json!({"error": "InternalError"})),
359 )
360 .into_response();
361 }
362 };
363
364 let uri = format!("at://{}/{}/{}", did, collection, rkey);
365 results.push(WriteResult::UpdateResult {
366 uri: uri.clone(),
367 cid: record_cid.to_string(),
368 });
369 record_ops.push((collection.clone(), rkey.clone(), Some(record_cid.to_string())));
370 }
371 WriteOp::Delete { collection, rkey } => {
372 let collection_nsid = match collection.parse::<Nsid>() {
373 Ok(n) => n,
374 Err(_) => {
375 return (
376 StatusCode::BAD_REQUEST,
377 Json(json!({"error": "InvalidCollection"})),
378 )
379 .into_response();
380 }
381 };
382
383 let key = format!("{}/{}", collection_nsid, rkey);
384 mst = match mst.delete(&key).await {
385 Ok(m) => m,
386 Err(e) => {
387 error!("Failed to delete from MST: {:?}", e);
388 return (
389 StatusCode::INTERNAL_SERVER_ERROR,
390 Json(json!({"error": "InternalError"})),
391 )
392 .into_response();
393 }
394 };
395
396 results.push(WriteResult::DeleteResult {});
397 record_ops.push((collection.clone(), rkey.clone(), None));
398 }
399 }
400 }
401
402 let new_mst_root = match mst.persist().await {
403 Ok(c) => c,
404 Err(e) => {
405 error!("Failed to persist MST: {:?}", e);
406 return (
407 StatusCode::INTERNAL_SERVER_ERROR,
408 Json(json!({"error": "InternalError"})),
409 )
410 .into_response();
411 }
412 };
413
414 let did_obj = match Did::new(&did) {
415 Ok(d) => d,
416 Err(_) => {
417 return (
418 StatusCode::INTERNAL_SERVER_ERROR,
419 Json(json!({"error": "InternalError", "message": "Invalid DID"})),
420 )
421 .into_response();
422 }
423 };
424
425 let rev = Tid::now(LimitedU32::MIN);
426 let new_commit = Commit::new_unsigned(did_obj, new_mst_root, rev.clone(), Some(current_root_cid));
427
428 let new_commit_bytes = match new_commit.to_cbor() {
429 Ok(b) => b,
430 Err(e) => {
431 error!("Failed to serialize new commit: {:?}", e);
432 return (
433 StatusCode::INTERNAL_SERVER_ERROR,
434 Json(json!({"error": "InternalError"})),
435 )
436 .into_response();
437 }
438 };
439
440 let new_root_cid = match state.block_store.put(&new_commit_bytes).await {
441 Ok(c) => c,
442 Err(e) => {
443 error!("Failed to save new commit: {:?}", e);
444 return (
445 StatusCode::INTERNAL_SERVER_ERROR,
446 Json(json!({"error": "InternalError"})),
447 )
448 .into_response();
449 }
450 };
451
452 let update_repo = sqlx::query("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2")
453 .bind(new_root_cid.to_string())
454 .bind(user_id)
455 .execute(&state.db)
456 .await;
457
458 if let Err(e) = update_repo {
459 error!("Failed to update repo root in DB: {:?}", e);
460 return (
461 StatusCode::INTERNAL_SERVER_ERROR,
462 Json(json!({"error": "InternalError"})),
463 )
464 .into_response();
465 }
466
467 for (collection, rkey, record_cid) in record_ops {
468 match record_cid {
469 Some(cid) => {
470 let _ = sqlx::query(
471 "INSERT INTO records (repo_id, collection, rkey, record_cid) VALUES ($1, $2, $3, $4)
472 ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4, created_at = NOW()",
473 )
474 .bind(user_id)
475 .bind(&collection)
476 .bind(&rkey)
477 .bind(&cid)
478 .execute(&state.db)
479 .await;
480 }
481 None => {
482 let _ = sqlx::query(
483 "DELETE FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3",
484 )
485 .bind(user_id)
486 .bind(&collection)
487 .bind(&rkey)
488 .execute(&state.db)
489 .await;
490 }
491 }
492 }
493
494 (
495 StatusCode::OK,
496 Json(ApplyWritesOutput {
497 commit: CommitInfo {
498 cid: new_root_cid.to_string(),
499 rev: rev.to_string(),
500 },
501 results,
502 }),
503 )
504 .into_response()
505}