this repo has no description
1use super::validation::validate_record;
2use super::write::has_verified_comms_channel;
3use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log, extract_blob_cids};
4use crate::delegation::{self, DelegationActionType};
5use crate::repo::tracking::TrackingBlockStore;
6use crate::state::AppState;
7use axum::{
8 Json,
9 extract::State,
10 http::StatusCode,
11 response::{IntoResponse, Response},
12};
13use cid::Cid;
14use jacquard::types::{
15 integer::LimitedU32,
16 string::{Nsid, Tid},
17};
18use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
19use serde::{Deserialize, Serialize};
20use serde_json::json;
21use std::str::FromStr;
22use std::sync::Arc;
23use tracing::{error, info};
24
25const MAX_BATCH_WRITES: usize = 200;
26
27#[derive(Deserialize)]
28#[serde(tag = "$type")]
29pub enum WriteOp {
30 #[serde(rename = "com.atproto.repo.applyWrites#create")]
31 Create {
32 collection: String,
33 rkey: Option<String>,
34 value: serde_json::Value,
35 },
36 #[serde(rename = "com.atproto.repo.applyWrites#update")]
37 Update {
38 collection: String,
39 rkey: String,
40 value: serde_json::Value,
41 },
42 #[serde(rename = "com.atproto.repo.applyWrites#delete")]
43 Delete { collection: String, rkey: String },
44}
45
46#[derive(Deserialize)]
47#[serde(rename_all = "camelCase")]
48pub struct ApplyWritesInput {
49 pub repo: String,
50 pub validate: Option<bool>,
51 pub writes: Vec<WriteOp>,
52 pub swap_commit: Option<String>,
53}
54
55#[derive(Serialize)]
56#[serde(tag = "$type")]
57pub enum WriteResult {
58 #[serde(rename = "com.atproto.repo.applyWrites#createResult")]
59 CreateResult { uri: String, cid: String },
60 #[serde(rename = "com.atproto.repo.applyWrites#updateResult")]
61 UpdateResult { uri: String, cid: String },
62 #[serde(rename = "com.atproto.repo.applyWrites#deleteResult")]
63 DeleteResult {},
64}
65
66#[derive(Serialize)]
67pub struct ApplyWritesOutput {
68 pub commit: CommitInfo,
69 pub results: Vec<WriteResult>,
70}
71
72#[derive(Serialize)]
73pub struct CommitInfo {
74 pub cid: String,
75 pub rev: String,
76}
77
78pub async fn apply_writes(
79 State(state): State<AppState>,
80 headers: axum::http::HeaderMap,
81 Json(input): Json<ApplyWritesInput>,
82) -> Response {
83 info!(
84 "apply_writes called: repo={}, writes={}",
85 input.repo,
86 input.writes.len()
87 );
88 let token = match crate::auth::extract_bearer_token_from_header(
89 headers.get("Authorization").and_then(|h| h.to_str().ok()),
90 ) {
91 Some(t) => t,
92 None => {
93 return (
94 StatusCode::UNAUTHORIZED,
95 Json(json!({"error": "AuthenticationRequired"})),
96 )
97 .into_response();
98 }
99 };
100 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await {
101 Ok(user) => user,
102 Err(_) => {
103 return (
104 StatusCode::UNAUTHORIZED,
105 Json(json!({"error": "AuthenticationFailed"})),
106 )
107 .into_response();
108 }
109 };
110 let did = auth_user.did.clone();
111 let is_oauth = auth_user.is_oauth;
112 let scope = auth_user.scope;
113 let controller_did = auth_user.controller_did.clone();
114 if input.repo != did {
115 return (
116 StatusCode::FORBIDDEN,
117 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})),
118 )
119 .into_response();
120 }
121 let is_verified = has_verified_comms_channel(&state.db, &did)
122 .await
123 .unwrap_or(false);
124 let is_delegated = crate::delegation::is_delegated_account(&state.db, &did)
125 .await
126 .unwrap_or(false);
127 if !is_verified && !is_delegated {
128 return (
129 StatusCode::FORBIDDEN,
130 Json(json!({
131 "error": "AccountNotVerified",
132 "message": "You must verify at least one notification channel (email, Discord, Telegram, or Signal) before creating records"
133 })),
134 )
135 .into_response();
136 }
137 if input.writes.is_empty() {
138 return (
139 StatusCode::BAD_REQUEST,
140 Json(json!({"error": "InvalidRequest", "message": "writes array is empty"})),
141 )
142 .into_response();
143 }
144 if input.writes.len() > MAX_BATCH_WRITES {
145 return (
146 StatusCode::BAD_REQUEST,
147 Json(json!({"error": "InvalidRequest", "message": format!("Too many writes (max {})", MAX_BATCH_WRITES)})),
148 )
149 .into_response();
150 }
151
152 let has_custom_scope = scope
153 .as_ref()
154 .map(|s| s != "com.atproto.access")
155 .unwrap_or(false);
156 if is_oauth || has_custom_scope {
157 use std::collections::HashSet;
158 let create_collections: HashSet<&str> = input
159 .writes
160 .iter()
161 .filter_map(|w| {
162 if let WriteOp::Create { collection, .. } = w {
163 Some(collection.as_str())
164 } else {
165 None
166 }
167 })
168 .collect();
169 let update_collections: HashSet<&str> = input
170 .writes
171 .iter()
172 .filter_map(|w| {
173 if let WriteOp::Update { collection, .. } = w {
174 Some(collection.as_str())
175 } else {
176 None
177 }
178 })
179 .collect();
180 let delete_collections: HashSet<&str> = input
181 .writes
182 .iter()
183 .filter_map(|w| {
184 if let WriteOp::Delete { collection, .. } = w {
185 Some(collection.as_str())
186 } else {
187 None
188 }
189 })
190 .collect();
191
192 for collection in create_collections {
193 if let Err(e) = crate::auth::scope_check::check_repo_scope(
194 is_oauth,
195 scope.as_deref(),
196 crate::oauth::RepoAction::Create,
197 collection,
198 ) {
199 return e;
200 }
201 }
202 for collection in update_collections {
203 if let Err(e) = crate::auth::scope_check::check_repo_scope(
204 is_oauth,
205 scope.as_deref(),
206 crate::oauth::RepoAction::Update,
207 collection,
208 ) {
209 return e;
210 }
211 }
212 for collection in delete_collections {
213 if let Err(e) = crate::auth::scope_check::check_repo_scope(
214 is_oauth,
215 scope.as_deref(),
216 crate::oauth::RepoAction::Delete,
217 collection,
218 ) {
219 return e;
220 }
221 }
222 }
223
224 let user_id: uuid::Uuid = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
225 .fetch_optional(&state.db)
226 .await
227 {
228 Ok(Some(id)) => id,
229 _ => {
230 return (
231 StatusCode::INTERNAL_SERVER_ERROR,
232 Json(json!({"error": "InternalError", "message": "User not found"})),
233 )
234 .into_response();
235 }
236 };
237 let root_cid_str: String = match sqlx::query_scalar!(
238 "SELECT repo_root_cid FROM repos WHERE user_id = $1",
239 user_id
240 )
241 .fetch_optional(&state.db)
242 .await
243 {
244 Ok(Some(cid_str)) => cid_str,
245 _ => {
246 return (
247 StatusCode::INTERNAL_SERVER_ERROR,
248 Json(json!({"error": "InternalError", "message": "Repo root not found"})),
249 )
250 .into_response();
251 }
252 };
253 let current_root_cid = match Cid::from_str(&root_cid_str) {
254 Ok(c) => c,
255 Err(_) => {
256 return (
257 StatusCode::INTERNAL_SERVER_ERROR,
258 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})),
259 )
260 .into_response();
261 }
262 };
263 if let Some(swap_commit) = &input.swap_commit
264 && Cid::from_str(swap_commit).ok() != Some(current_root_cid)
265 {
266 return (
267 StatusCode::CONFLICT,
268 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})),
269 )
270 .into_response();
271 }
272 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
273 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
274 Ok(Some(b)) => b,
275 _ => {
276 return (
277 StatusCode::INTERNAL_SERVER_ERROR,
278 Json(json!({"error": "InternalError", "message": "Commit block not found"})),
279 )
280 .into_response();
281 }
282 };
283 let commit = match Commit::from_cbor(&commit_bytes) {
284 Ok(c) => c,
285 _ => {
286 return (
287 StatusCode::INTERNAL_SERVER_ERROR,
288 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})),
289 )
290 .into_response();
291 }
292 };
293 let original_mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
294 let mut mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
295 let mut results: Vec<WriteResult> = Vec::new();
296 let mut ops: Vec<RecordOp> = Vec::new();
297 let mut modified_keys: Vec<String> = Vec::new();
298 let mut all_blob_cids: Vec<String> = Vec::new();
299 for write in &input.writes {
300 match write {
301 WriteOp::Create {
302 collection,
303 rkey,
304 value,
305 } => {
306 if input.validate.unwrap_or(true)
307 && let Err(err_response) = validate_record(value, collection)
308 {
309 return *err_response;
310 }
311 all_blob_cids.extend(extract_blob_cids(value));
312 let rkey = rkey
313 .clone()
314 .unwrap_or_else(|| Tid::now(LimitedU32::MIN).to_string());
315 let mut record_bytes = Vec::new();
316 if serde_ipld_dagcbor::to_writer(&mut record_bytes, value).is_err() {
317 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response();
318 }
319 let record_cid = match tracking_store.put(&record_bytes).await {
320 Ok(c) => c,
321 Err(_) => return (
322 StatusCode::INTERNAL_SERVER_ERROR,
323 Json(
324 json!({"error": "InternalError", "message": "Failed to store record"}),
325 ),
326 )
327 .into_response(),
328 };
329 let collection_nsid = match collection.parse::<Nsid>() {
330 Ok(n) => n,
331 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection", "message": "Invalid collection NSID"}))).into_response(),
332 };
333 let key = format!("{}/{}", collection_nsid, rkey);
334 modified_keys.push(key.clone());
335 mst = match mst.add(&key, record_cid).await {
336 Ok(m) => m,
337 Err(_) => return (
338 StatusCode::INTERNAL_SERVER_ERROR,
339 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})),
340 )
341 .into_response(),
342 };
343 let uri = format!("at://{}/{}/{}", did, collection, rkey);
344 results.push(WriteResult::CreateResult {
345 uri,
346 cid: record_cid.to_string(),
347 });
348 ops.push(RecordOp::Create {
349 collection: collection.clone(),
350 rkey,
351 cid: record_cid,
352 });
353 }
354 WriteOp::Update {
355 collection,
356 rkey,
357 value,
358 } => {
359 if input.validate.unwrap_or(true)
360 && let Err(err_response) = validate_record(value, collection)
361 {
362 return *err_response;
363 }
364 all_blob_cids.extend(extract_blob_cids(value));
365 let mut record_bytes = Vec::new();
366 if serde_ipld_dagcbor::to_writer(&mut record_bytes, value).is_err() {
367 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response();
368 }
369 let record_cid = match tracking_store.put(&record_bytes).await {
370 Ok(c) => c,
371 Err(_) => return (
372 StatusCode::INTERNAL_SERVER_ERROR,
373 Json(
374 json!({"error": "InternalError", "message": "Failed to store record"}),
375 ),
376 )
377 .into_response(),
378 };
379 let collection_nsid = match collection.parse::<Nsid>() {
380 Ok(n) => n,
381 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection", "message": "Invalid collection NSID"}))).into_response(),
382 };
383 let key = format!("{}/{}", collection_nsid, rkey);
384 modified_keys.push(key.clone());
385 let prev_record_cid = mst.get(&key).await.ok().flatten();
386 mst = match mst.update(&key, record_cid).await {
387 Ok(m) => m,
388 Err(_) => return (
389 StatusCode::INTERNAL_SERVER_ERROR,
390 Json(json!({"error": "InternalError", "message": "Failed to update MST"})),
391 )
392 .into_response(),
393 };
394 let uri = format!("at://{}/{}/{}", did, collection, rkey);
395 results.push(WriteResult::UpdateResult {
396 uri,
397 cid: record_cid.to_string(),
398 });
399 ops.push(RecordOp::Update {
400 collection: collection.clone(),
401 rkey: rkey.clone(),
402 cid: record_cid,
403 prev: prev_record_cid,
404 });
405 }
406 WriteOp::Delete { collection, rkey } => {
407 let collection_nsid = match collection.parse::<Nsid>() {
408 Ok(n) => n,
409 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection", "message": "Invalid collection NSID"}))).into_response(),
410 };
411 let key = format!("{}/{}", collection_nsid, rkey);
412 modified_keys.push(key.clone());
413 let prev_record_cid = mst.get(&key).await.ok().flatten();
414 mst = match mst.delete(&key).await {
415 Ok(m) => m,
416 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to delete from MST"}))).into_response(),
417 };
418 results.push(WriteResult::DeleteResult {});
419 ops.push(RecordOp::Delete {
420 collection: collection.clone(),
421 rkey: rkey.clone(),
422 prev: prev_record_cid,
423 });
424 }
425 }
426 }
427 let new_mst_root = match mst.persist().await {
428 Ok(c) => c,
429 Err(_) => {
430 return (
431 StatusCode::INTERNAL_SERVER_ERROR,
432 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})),
433 )
434 .into_response();
435 }
436 };
437 let mut relevant_blocks = std::collections::BTreeMap::new();
438 for key in &modified_keys {
439 if mst
440 .blocks_for_path(key, &mut relevant_blocks)
441 .await
442 .is_err()
443 {
444 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response();
445 }
446 if original_mst
447 .blocks_for_path(key, &mut relevant_blocks)
448 .await
449 .is_err()
450 {
451 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response();
452 }
453 }
454 let mut written_cids = tracking_store.get_all_relevant_cids();
455 for cid in relevant_blocks.keys() {
456 if !written_cids.contains(cid) {
457 written_cids.push(*cid);
458 }
459 }
460 let written_cids_str = written_cids
461 .iter()
462 .map(|c| c.to_string())
463 .collect::<Vec<_>>();
464 let commit_res = match commit_and_log(
465 &state,
466 CommitParams {
467 did: &did,
468 user_id,
469 current_root_cid: Some(current_root_cid),
470 prev_data_cid: Some(commit.data),
471 new_mst_root,
472 ops,
473 blocks_cids: &written_cids_str,
474 blobs: &all_blob_cids,
475 },
476 )
477 .await
478 {
479 Ok(res) => res,
480 Err(e) => {
481 error!("Commit failed: {}", e);
482 return (
483 StatusCode::INTERNAL_SERVER_ERROR,
484 Json(json!({"error": "InternalError", "message": "Failed to commit changes"})),
485 )
486 .into_response();
487 }
488 };
489
490 if let Some(ref controller) = controller_did {
491 let write_summary: Vec<serde_json::Value> = input
492 .writes
493 .iter()
494 .map(|w| match w {
495 WriteOp::Create {
496 collection, rkey, ..
497 } => json!({
498 "action": "create",
499 "collection": collection,
500 "rkey": rkey
501 }),
502 WriteOp::Update {
503 collection, rkey, ..
504 } => json!({
505 "action": "update",
506 "collection": collection,
507 "rkey": rkey
508 }),
509 WriteOp::Delete { collection, rkey } => json!({
510 "action": "delete",
511 "collection": collection,
512 "rkey": rkey
513 }),
514 })
515 .collect();
516
517 let _ = delegation::log_delegation_action(
518 &state.db,
519 &did,
520 controller,
521 Some(controller),
522 DelegationActionType::RepoWrite,
523 Some(json!({
524 "action": "apply_writes",
525 "count": input.writes.len(),
526 "writes": write_summary
527 })),
528 None,
529 None,
530 )
531 .await;
532 }
533
534 (
535 StatusCode::OK,
536 Json(ApplyWritesOutput {
537 commit: CommitInfo {
538 cid: commit_res.commit_cid.to_string(),
539 rev: commit_res.rev,
540 },
541 results,
542 }),
543 )
544 .into_response()
545}