this repo has no description
1use super::validation::validate_record_with_status;
2use super::write::has_verified_comms_channel;
3use crate::api::error::ApiError;
4use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log, extract_blob_cids};
5use crate::delegation::{self, DelegationActionType};
6use crate::repo::tracking::TrackingBlockStore;
7use crate::state::AppState;
8use crate::types::{AtIdentifier, AtUri, Nsid, Rkey};
9use axum::{
10 Json,
11 extract::State,
12 http::StatusCode,
13 response::{IntoResponse, Response},
14};
15use cid::Cid;
16use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
17use serde::{Deserialize, Serialize};
18use serde_json::json;
19use std::str::FromStr;
20use std::sync::Arc;
21use tracing::{error, info};
22
23const MAX_BATCH_WRITES: usize = 200;
24
25#[derive(Deserialize)]
26#[serde(tag = "$type")]
27pub enum WriteOp {
28 #[serde(rename = "com.atproto.repo.applyWrites#create")]
29 Create {
30 collection: Nsid,
31 rkey: Option<Rkey>,
32 value: serde_json::Value,
33 },
34 #[serde(rename = "com.atproto.repo.applyWrites#update")]
35 Update {
36 collection: Nsid,
37 rkey: Rkey,
38 value: serde_json::Value,
39 },
40 #[serde(rename = "com.atproto.repo.applyWrites#delete")]
41 Delete { collection: Nsid, rkey: Rkey },
42}
43
44#[derive(Deserialize)]
45#[serde(rename_all = "camelCase")]
46pub struct ApplyWritesInput {
47 pub repo: AtIdentifier,
48 pub validate: Option<bool>,
49 pub writes: Vec<WriteOp>,
50 pub swap_commit: Option<String>,
51}
52
53#[derive(Serialize)]
54#[serde(tag = "$type")]
55pub enum WriteResult {
56 #[serde(rename = "com.atproto.repo.applyWrites#createResult")]
57 CreateResult {
58 uri: AtUri,
59 cid: String,
60 #[serde(rename = "validationStatus", skip_serializing_if = "Option::is_none")]
61 validation_status: Option<String>,
62 },
63 #[serde(rename = "com.atproto.repo.applyWrites#updateResult")]
64 UpdateResult {
65 uri: AtUri,
66 cid: String,
67 #[serde(rename = "validationStatus", skip_serializing_if = "Option::is_none")]
68 validation_status: Option<String>,
69 },
70 #[serde(rename = "com.atproto.repo.applyWrites#deleteResult")]
71 DeleteResult {},
72}
73
74#[derive(Serialize)]
75pub struct ApplyWritesOutput {
76 pub commit: CommitInfo,
77 pub results: Vec<WriteResult>,
78}
79
80#[derive(Serialize)]
81pub struct CommitInfo {
82 pub cid: String,
83 pub rev: String,
84}
85
86pub async fn apply_writes(
87 State(state): State<AppState>,
88 headers: axum::http::HeaderMap,
89 Json(input): Json<ApplyWritesInput>,
90) -> Response {
91 info!(
92 "apply_writes called: repo={}, writes={}",
93 input.repo,
94 input.writes.len()
95 );
96 let Some(token) = crate::auth::extract_bearer_token_from_header(
97 headers.get("Authorization").and_then(|h| h.to_str().ok()),
98 ) else {
99 return ApiError::AuthenticationRequired.into_response();
100 };
101 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await {
102 Ok(user) => user,
103 Err(_) => return ApiError::AuthenticationFailed(None).into_response(),
104 };
105 let did = auth_user.did.clone();
106 let is_oauth = auth_user.is_oauth;
107 let scope = auth_user.scope;
108 let controller_did = auth_user.controller_did.clone();
109 if input.repo.as_str() != did {
110 return ApiError::InvalidRepo("Repo does not match authenticated user".into())
111 .into_response();
112 }
113 if crate::util::is_account_migrated(&state.db, &did)
114 .await
115 .unwrap_or(false)
116 {
117 return ApiError::AccountMigrated.into_response();
118 }
119 let is_verified = has_verified_comms_channel(&state.db, &did)
120 .await
121 .unwrap_or(false);
122 let is_delegated = crate::delegation::is_delegated_account(&state.db, &did)
123 .await
124 .unwrap_or(false);
125 if !is_verified && !is_delegated {
126 return ApiError::AccountNotVerified.into_response();
127 }
128 if input.writes.is_empty() {
129 return ApiError::InvalidRequest("writes array is empty".into()).into_response();
130 }
131 if input.writes.len() > MAX_BATCH_WRITES {
132 return ApiError::InvalidRequest(format!("Too many writes (max {})", MAX_BATCH_WRITES))
133 .into_response();
134 }
135
136 let has_custom_scope = scope
137 .as_ref()
138 .map(|s| s != "com.atproto.access")
139 .unwrap_or(false);
140 if is_oauth || has_custom_scope {
141 use std::collections::HashSet;
142 let create_collections: HashSet<&Nsid> = input
143 .writes
144 .iter()
145 .filter_map(|w| {
146 if let WriteOp::Create { collection, .. } = w {
147 Some(collection)
148 } else {
149 None
150 }
151 })
152 .collect();
153 let update_collections: HashSet<&Nsid> = input
154 .writes
155 .iter()
156 .filter_map(|w| {
157 if let WriteOp::Update { collection, .. } = w {
158 Some(collection)
159 } else {
160 None
161 }
162 })
163 .collect();
164 let delete_collections: HashSet<&Nsid> = input
165 .writes
166 .iter()
167 .filter_map(|w| {
168 if let WriteOp::Delete { collection, .. } = w {
169 Some(collection)
170 } else {
171 None
172 }
173 })
174 .collect();
175
176 for collection in create_collections {
177 if let Err(e) = crate::auth::scope_check::check_repo_scope(
178 is_oauth,
179 scope.as_deref(),
180 crate::oauth::RepoAction::Create,
181 collection,
182 ) {
183 return e;
184 }
185 }
186 for collection in update_collections {
187 if let Err(e) = crate::auth::scope_check::check_repo_scope(
188 is_oauth,
189 scope.as_deref(),
190 crate::oauth::RepoAction::Update,
191 collection,
192 ) {
193 return e;
194 }
195 }
196 for collection in delete_collections {
197 if let Err(e) = crate::auth::scope_check::check_repo_scope(
198 is_oauth,
199 scope.as_deref(),
200 crate::oauth::RepoAction::Delete,
201 collection,
202 ) {
203 return e;
204 }
205 }
206 }
207
208 let user_id: uuid::Uuid =
209 match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did.as_str())
210 .fetch_optional(&state.db)
211 .await
212 {
213 Ok(Some(id)) => id,
214 _ => return ApiError::InternalError(Some("User not found".into())).into_response(),
215 };
216 let root_cid_str: String = match sqlx::query_scalar!(
217 "SELECT repo_root_cid FROM repos WHERE user_id = $1",
218 user_id
219 )
220 .fetch_optional(&state.db)
221 .await
222 {
223 Ok(Some(cid_str)) => cid_str,
224 _ => return ApiError::InternalError(Some("Repo root not found".into())).into_response(),
225 };
226 let current_root_cid = match Cid::from_str(&root_cid_str) {
227 Ok(c) => c,
228 Err(_) => {
229 return ApiError::InternalError(Some("Invalid repo root CID".into())).into_response();
230 }
231 };
232 if let Some(swap_commit) = &input.swap_commit
233 && Cid::from_str(swap_commit).ok() != Some(current_root_cid)
234 {
235 return ApiError::InvalidSwap(Some("Repo has been modified".into())).into_response();
236 }
237 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
238 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
239 Ok(Some(b)) => b,
240 _ => return ApiError::InternalError(Some("Commit block not found".into())).into_response(),
241 };
242 let commit = match Commit::from_cbor(&commit_bytes) {
243 Ok(c) => c,
244 _ => return ApiError::InternalError(Some("Failed to parse commit".into())).into_response(),
245 };
246 let original_mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
247 let mut mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
248 let mut results: Vec<WriteResult> = Vec::new();
249 let mut ops: Vec<RecordOp> = Vec::new();
250 let mut modified_keys: Vec<String> = Vec::new();
251 let mut all_blob_cids: Vec<String> = Vec::new();
252 for write in &input.writes {
253 match write {
254 WriteOp::Create {
255 collection,
256 rkey,
257 value,
258 } => {
259 let validation_status = if input.validate == Some(false) {
260 None
261 } else {
262 let require_lexicon = input.validate == Some(true);
263 match validate_record_with_status(
264 value,
265 collection,
266 rkey.as_ref().map(|r| r.as_str()),
267 require_lexicon,
268 ) {
269 Ok(status) => Some(status),
270 Err(err_response) => return *err_response,
271 }
272 };
273 all_blob_cids.extend(extract_blob_cids(value));
274 let rkey = rkey.clone().unwrap_or_else(Rkey::generate);
275 let record_ipld = crate::util::json_to_ipld(value);
276 let mut record_bytes = Vec::new();
277 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld).is_err() {
278 return ApiError::InvalidRecord("Failed to serialize record".into())
279 .into_response();
280 }
281 let record_cid = match tracking_store.put(&record_bytes).await {
282 Ok(c) => c,
283 Err(_) => {
284 return ApiError::InternalError(Some("Failed to store record".into()))
285 .into_response();
286 }
287 };
288 let key = format!("{}/{}", collection, rkey);
289 modified_keys.push(key.clone());
290 mst = match mst.add(&key, record_cid).await {
291 Ok(m) => m,
292 Err(_) => {
293 return ApiError::InternalError(Some("Failed to add to MST".into()))
294 .into_response();
295 }
296 };
297 let uri = AtUri::from_parts(&did, collection, &rkey);
298 results.push(WriteResult::CreateResult {
299 uri,
300 cid: record_cid.to_string(),
301 validation_status: validation_status.map(|s| s.to_string()),
302 });
303 ops.push(RecordOp::Create {
304 collection: collection.to_string(),
305 rkey: rkey.to_string(),
306 cid: record_cid,
307 });
308 }
309 WriteOp::Update {
310 collection,
311 rkey,
312 value,
313 } => {
314 let validation_status = if input.validate == Some(false) {
315 None
316 } else {
317 let require_lexicon = input.validate == Some(true);
318 match validate_record_with_status(
319 value,
320 collection,
321 Some(rkey.as_str()),
322 require_lexicon,
323 ) {
324 Ok(status) => Some(status),
325 Err(err_response) => return *err_response,
326 }
327 };
328 all_blob_cids.extend(extract_blob_cids(value));
329 let record_ipld = crate::util::json_to_ipld(value);
330 let mut record_bytes = Vec::new();
331 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld).is_err() {
332 return ApiError::InvalidRecord("Failed to serialize record".into())
333 .into_response();
334 }
335 let record_cid = match tracking_store.put(&record_bytes).await {
336 Ok(c) => c,
337 Err(_) => {
338 return ApiError::InternalError(Some("Failed to store record".into()))
339 .into_response();
340 }
341 };
342 let key = format!("{}/{}", collection, rkey);
343 modified_keys.push(key.clone());
344 let prev_record_cid = mst.get(&key).await.ok().flatten();
345 mst = match mst.update(&key, record_cid).await {
346 Ok(m) => m,
347 Err(_) => {
348 return ApiError::InternalError(Some("Failed to update MST".into()))
349 .into_response();
350 }
351 };
352 let uri = AtUri::from_parts(&did, collection, rkey);
353 results.push(WriteResult::UpdateResult {
354 uri,
355 cid: record_cid.to_string(),
356 validation_status: validation_status.map(|s| s.to_string()),
357 });
358 ops.push(RecordOp::Update {
359 collection: collection.to_string(),
360 rkey: rkey.to_string(),
361 cid: record_cid,
362 prev: prev_record_cid,
363 });
364 }
365 WriteOp::Delete { collection, rkey } => {
366 let key = format!("{}/{}", collection, rkey);
367 modified_keys.push(key.clone());
368 let prev_record_cid = mst.get(&key).await.ok().flatten();
369 mst = match mst.delete(&key).await {
370 Ok(m) => m,
371 Err(_) => {
372 return ApiError::InternalError(Some("Failed to delete from MST".into()))
373 .into_response();
374 }
375 };
376 results.push(WriteResult::DeleteResult {});
377 ops.push(RecordOp::Delete {
378 collection: collection.to_string(),
379 rkey: rkey.to_string(),
380 prev: prev_record_cid,
381 });
382 }
383 }
384 }
385 let new_mst_root = match mst.persist().await {
386 Ok(c) => c,
387 Err(_) => {
388 return ApiError::InternalError(Some("Failed to persist MST".into())).into_response();
389 }
390 };
391 let mut new_mst_blocks = std::collections::BTreeMap::new();
392 let mut old_mst_blocks = std::collections::BTreeMap::new();
393 for key in &modified_keys {
394 if mst.blocks_for_path(key, &mut new_mst_blocks).await.is_err() {
395 return ApiError::InternalError(Some("Failed to get new MST blocks for path".into()))
396 .into_response();
397 }
398 if original_mst
399 .blocks_for_path(key, &mut old_mst_blocks)
400 .await
401 .is_err()
402 {
403 return ApiError::InternalError(Some("Failed to get old MST blocks for path".into()))
404 .into_response();
405 }
406 }
407 let mut relevant_blocks = new_mst_blocks.clone();
408 relevant_blocks.extend(old_mst_blocks.iter().map(|(k, v)| (*k, v.clone())));
409 let written_cids: Vec<Cid> = tracking_store
410 .get_all_relevant_cids()
411 .into_iter()
412 .chain(relevant_blocks.keys().copied())
413 .collect::<std::collections::HashSet<_>>()
414 .into_iter()
415 .collect();
416 let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect();
417 let prev_record_cids = ops.iter().filter_map(|op| match op {
418 RecordOp::Update {
419 prev: Some(cid), ..
420 }
421 | RecordOp::Delete {
422 prev: Some(cid), ..
423 } => Some(*cid),
424 _ => None,
425 });
426 let obsolete_cids: Vec<Cid> = std::iter::once(current_root_cid)
427 .chain(
428 old_mst_blocks
429 .keys()
430 .filter(|cid| !new_mst_blocks.contains_key(*cid))
431 .copied(),
432 )
433 .chain(prev_record_cids)
434 .collect::<std::collections::HashSet<_>>()
435 .into_iter()
436 .collect();
437 let commit_res = match commit_and_log(
438 &state,
439 CommitParams {
440 did: &did,
441 user_id,
442 current_root_cid: Some(current_root_cid),
443 prev_data_cid: Some(commit.data),
444 new_mst_root,
445 ops,
446 blocks_cids: &written_cids_str,
447 blobs: &all_blob_cids,
448 obsolete_cids,
449 },
450 )
451 .await
452 {
453 Ok(res) => res,
454 Err(e) => {
455 error!("Commit failed: {}", e);
456 return ApiError::InternalError(Some("Failed to commit changes".into()))
457 .into_response();
458 }
459 };
460
461 if let Some(ref controller) = controller_did {
462 let write_summary: Vec<serde_json::Value> = input
463 .writes
464 .iter()
465 .map(|w| match w {
466 WriteOp::Create {
467 collection, rkey, ..
468 } => json!({
469 "action": "create",
470 "collection": collection,
471 "rkey": rkey
472 }),
473 WriteOp::Update {
474 collection, rkey, ..
475 } => json!({
476 "action": "update",
477 "collection": collection,
478 "rkey": rkey
479 }),
480 WriteOp::Delete { collection, rkey } => json!({
481 "action": "delete",
482 "collection": collection,
483 "rkey": rkey
484 }),
485 })
486 .collect();
487
488 let _ = delegation::log_delegation_action(
489 &state.db,
490 &did,
491 controller,
492 Some(controller),
493 DelegationActionType::RepoWrite,
494 Some(json!({
495 "action": "apply_writes",
496 "count": input.writes.len(),
497 "writes": write_summary
498 })),
499 None,
500 None,
501 )
502 .await;
503 }
504
505 (
506 StatusCode::OK,
507 Json(ApplyWritesOutput {
508 commit: CommitInfo {
509 cid: commit_res.commit_cid.to_string(),
510 rev: commit_res.rev,
511 },
512 results,
513 }),
514 )
515 .into_response()
516}