// this repo has no description
1use super::validation::validate_record_with_status;
2use super::write::has_verified_comms_channel;
3use crate::api::error::ApiError;
4use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log, extract_blob_cids};
5use crate::delegation::{self, DelegationActionType};
6use crate::repo::tracking::TrackingBlockStore;
7use crate::state::AppState;
8use crate::types::{AtIdentifier, AtUri, Nsid, Rkey};
9use axum::{
10 Json,
11 extract::State,
12 http::StatusCode,
13 response::{IntoResponse, Response},
14};
15use cid::Cid;
16use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
17use serde::{Deserialize, Serialize};
18use serde_json::json;
19use std::str::FromStr;
20use std::sync::Arc;
21use tracing::{error, info};
22
/// Maximum number of write operations accepted in a single `apply_writes`
/// request; larger batches are rejected as an invalid request.
const MAX_BATCH_WRITES: usize = 200;
24
25#[derive(Deserialize)]
26#[serde(tag = "$type")]
27pub enum WriteOp {
28 #[serde(rename = "com.atproto.repo.applyWrites#create")]
29 Create {
30 collection: Nsid,
31 rkey: Option<Rkey>,
32 value: serde_json::Value,
33 },
34 #[serde(rename = "com.atproto.repo.applyWrites#update")]
35 Update {
36 collection: Nsid,
37 rkey: Rkey,
38 value: serde_json::Value,
39 },
40 #[serde(rename = "com.atproto.repo.applyWrites#delete")]
41 Delete { collection: Nsid, rkey: Rkey },
42}
43
44#[derive(Deserialize)]
45#[serde(rename_all = "camelCase")]
46pub struct ApplyWritesInput {
47 pub repo: AtIdentifier,
48 pub validate: Option<bool>,
49 pub writes: Vec<WriteOp>,
50 pub swap_commit: Option<String>,
51}
52
53#[derive(Serialize)]
54#[serde(tag = "$type")]
55pub enum WriteResult {
56 #[serde(rename = "com.atproto.repo.applyWrites#createResult")]
57 CreateResult {
58 uri: AtUri,
59 cid: String,
60 #[serde(rename = "validationStatus", skip_serializing_if = "Option::is_none")]
61 validation_status: Option<String>,
62 },
63 #[serde(rename = "com.atproto.repo.applyWrites#updateResult")]
64 UpdateResult {
65 uri: AtUri,
66 cid: String,
67 #[serde(rename = "validationStatus", skip_serializing_if = "Option::is_none")]
68 validation_status: Option<String>,
69 },
70 #[serde(rename = "com.atproto.repo.applyWrites#deleteResult")]
71 DeleteResult {},
72}
73
74#[derive(Serialize)]
75pub struct ApplyWritesOutput {
76 pub commit: CommitInfo,
77 pub results: Vec<WriteResult>,
78}
79
80#[derive(Serialize)]
81pub struct CommitInfo {
82 pub cid: String,
83 pub rev: String,
84}
85
86pub async fn apply_writes(
87 State(state): State<AppState>,
88 headers: axum::http::HeaderMap,
89 Json(input): Json<ApplyWritesInput>,
90) -> Response {
91 info!(
92 "apply_writes called: repo={}, writes={}",
93 input.repo,
94 input.writes.len()
95 );
96 let Some(token) = crate::auth::extract_bearer_token_from_header(
97 headers.get("Authorization").and_then(|h| h.to_str().ok()),
98 ) else {
99 return ApiError::AuthenticationRequired.into_response();
100 };
101 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await {
102 Ok(user) => user,
103 Err(_) => return ApiError::AuthenticationFailed(None).into_response(),
104 };
105 let did = auth_user.did.clone();
106 let is_oauth = auth_user.is_oauth;
107 let scope = auth_user.scope;
108 let controller_did = auth_user.controller_did.clone();
109 if input.repo.as_str() != did {
110 return ApiError::InvalidRepo("Repo does not match authenticated user".into())
111 .into_response();
112 }
113 if crate::util::is_account_migrated(&state.db, &did)
114 .await
115 .unwrap_or(false)
116 {
117 return ApiError::AccountMigrated.into_response();
118 }
119 let is_verified = has_verified_comms_channel(&state.db, &did)
120 .await
121 .unwrap_or(false);
122 let is_delegated = crate::delegation::is_delegated_account(&state.db, &did)
123 .await
124 .unwrap_or(false);
125 if !is_verified && !is_delegated {
126 return ApiError::AccountNotVerified.into_response();
127 }
128 if input.writes.is_empty() {
129 return ApiError::InvalidRequest("writes array is empty".into()).into_response();
130 }
131 if input.writes.len() > MAX_BATCH_WRITES {
132 return ApiError::InvalidRequest(format!("Too many writes (max {})", MAX_BATCH_WRITES))
133 .into_response();
134 }
135
136 let has_custom_scope = scope
137 .as_ref()
138 .map(|s| s != "com.atproto.access")
139 .unwrap_or(false);
140 if is_oauth || has_custom_scope {
141 use std::collections::HashSet;
142 let create_collections: HashSet<&Nsid> = input
143 .writes
144 .iter()
145 .filter_map(|w| {
146 if let WriteOp::Create { collection, .. } = w {
147 Some(collection)
148 } else {
149 None
150 }
151 })
152 .collect();
153 let update_collections: HashSet<&Nsid> = input
154 .writes
155 .iter()
156 .filter_map(|w| {
157 if let WriteOp::Update { collection, .. } = w {
158 Some(collection)
159 } else {
160 None
161 }
162 })
163 .collect();
164 let delete_collections: HashSet<&Nsid> = input
165 .writes
166 .iter()
167 .filter_map(|w| {
168 if let WriteOp::Delete { collection, .. } = w {
169 Some(collection)
170 } else {
171 None
172 }
173 })
174 .collect();
175
176 for collection in create_collections {
177 if let Err(e) = crate::auth::scope_check::check_repo_scope(
178 is_oauth,
179 scope.as_deref(),
180 crate::oauth::RepoAction::Create,
181 collection,
182 ) {
183 return e;
184 }
185 }
186 for collection in update_collections {
187 if let Err(e) = crate::auth::scope_check::check_repo_scope(
188 is_oauth,
189 scope.as_deref(),
190 crate::oauth::RepoAction::Update,
191 collection,
192 ) {
193 return e;
194 }
195 }
196 for collection in delete_collections {
197 if let Err(e) = crate::auth::scope_check::check_repo_scope(
198 is_oauth,
199 scope.as_deref(),
200 crate::oauth::RepoAction::Delete,
201 collection,
202 ) {
203 return e;
204 }
205 }
206 }
207
208 let user_id: uuid::Uuid = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did.as_str())
209 .fetch_optional(&state.db)
210 .await
211 {
212 Ok(Some(id)) => id,
213 _ => return ApiError::InternalError(Some("User not found".into())).into_response(),
214 };
215 let root_cid_str: String = match sqlx::query_scalar!(
216 "SELECT repo_root_cid FROM repos WHERE user_id = $1",
217 user_id
218 )
219 .fetch_optional(&state.db)
220 .await
221 {
222 Ok(Some(cid_str)) => cid_str,
223 _ => return ApiError::InternalError(Some("Repo root not found".into())).into_response(),
224 };
225 let current_root_cid = match Cid::from_str(&root_cid_str) {
226 Ok(c) => c,
227 Err(_) => {
228 return ApiError::InternalError(Some("Invalid repo root CID".into())).into_response()
229 }
230 };
231 if let Some(swap_commit) = &input.swap_commit
232 && Cid::from_str(swap_commit).ok() != Some(current_root_cid)
233 {
234 return ApiError::InvalidSwap(Some("Repo has been modified".into())).into_response();
235 }
236 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
237 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
238 Ok(Some(b)) => b,
239 _ => return ApiError::InternalError(Some("Commit block not found".into())).into_response(),
240 };
241 let commit = match Commit::from_cbor(&commit_bytes) {
242 Ok(c) => c,
243 _ => return ApiError::InternalError(Some("Failed to parse commit".into())).into_response(),
244 };
245 let original_mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
246 let mut mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
247 let mut results: Vec<WriteResult> = Vec::new();
248 let mut ops: Vec<RecordOp> = Vec::new();
249 let mut modified_keys: Vec<String> = Vec::new();
250 let mut all_blob_cids: Vec<String> = Vec::new();
251 for write in &input.writes {
252 match write {
253 WriteOp::Create {
254 collection,
255 rkey,
256 value,
257 } => {
258 let validation_status = if input.validate == Some(false) {
259 None
260 } else {
261 let require_lexicon = input.validate == Some(true);
262 match validate_record_with_status(
263 value,
264 collection,
265 rkey.as_ref().map(|r| r.as_str()),
266 require_lexicon,
267 ) {
268 Ok(status) => Some(status),
269 Err(err_response) => return *err_response,
270 }
271 };
272 all_blob_cids.extend(extract_blob_cids(value));
273 let rkey = rkey.clone().unwrap_or_else(Rkey::generate);
274 let record_ipld = crate::util::json_to_ipld(value);
275 let mut record_bytes = Vec::new();
276 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld).is_err() {
277 return ApiError::InvalidRecord("Failed to serialize record".into())
278 .into_response();
279 }
280 let record_cid = match tracking_store.put(&record_bytes).await {
281 Ok(c) => c,
282 Err(_) => {
283 return ApiError::InternalError(Some("Failed to store record".into()))
284 .into_response()
285 }
286 };
287 let key = format!("{}/{}", collection, rkey);
288 modified_keys.push(key.clone());
289 mst = match mst.add(&key, record_cid).await {
290 Ok(m) => m,
291 Err(_) => {
292 return ApiError::InternalError(Some("Failed to add to MST".into()))
293 .into_response()
294 }
295 };
296 let uri = AtUri::from_parts(&did, collection, &rkey);
297 results.push(WriteResult::CreateResult {
298 uri,
299 cid: record_cid.to_string(),
300 validation_status: validation_status.map(|s| s.to_string()),
301 });
302 ops.push(RecordOp::Create {
303 collection: collection.to_string(),
304 rkey: rkey.to_string(),
305 cid: record_cid,
306 });
307 }
308 WriteOp::Update {
309 collection,
310 rkey,
311 value,
312 } => {
313 let validation_status = if input.validate == Some(false) {
314 None
315 } else {
316 let require_lexicon = input.validate == Some(true);
317 match validate_record_with_status(
318 value,
319 collection,
320 Some(rkey.as_str()),
321 require_lexicon,
322 ) {
323 Ok(status) => Some(status),
324 Err(err_response) => return *err_response,
325 }
326 };
327 all_blob_cids.extend(extract_blob_cids(value));
328 let record_ipld = crate::util::json_to_ipld(value);
329 let mut record_bytes = Vec::new();
330 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld).is_err() {
331 return ApiError::InvalidRecord("Failed to serialize record".into())
332 .into_response();
333 }
334 let record_cid = match tracking_store.put(&record_bytes).await {
335 Ok(c) => c,
336 Err(_) => {
337 return ApiError::InternalError(Some("Failed to store record".into()))
338 .into_response()
339 }
340 };
341 let key = format!("{}/{}", collection, rkey);
342 modified_keys.push(key.clone());
343 let prev_record_cid = mst.get(&key).await.ok().flatten();
344 mst = match mst.update(&key, record_cid).await {
345 Ok(m) => m,
346 Err(_) => {
347 return ApiError::InternalError(Some("Failed to update MST".into()))
348 .into_response()
349 }
350 };
351 let uri = AtUri::from_parts(&did, collection, rkey);
352 results.push(WriteResult::UpdateResult {
353 uri,
354 cid: record_cid.to_string(),
355 validation_status: validation_status.map(|s| s.to_string()),
356 });
357 ops.push(RecordOp::Update {
358 collection: collection.to_string(),
359 rkey: rkey.to_string(),
360 cid: record_cid,
361 prev: prev_record_cid,
362 });
363 }
364 WriteOp::Delete { collection, rkey } => {
365 let key = format!("{}/{}", collection, rkey);
366 modified_keys.push(key.clone());
367 let prev_record_cid = mst.get(&key).await.ok().flatten();
368 mst = match mst.delete(&key).await {
369 Ok(m) => m,
370 Err(_) => {
371 return ApiError::InternalError(Some("Failed to delete from MST".into()))
372 .into_response()
373 }
374 };
375 results.push(WriteResult::DeleteResult {});
376 ops.push(RecordOp::Delete {
377 collection: collection.to_string(),
378 rkey: rkey.to_string(),
379 prev: prev_record_cid,
380 });
381 }
382 }
383 }
384 let new_mst_root = match mst.persist().await {
385 Ok(c) => c,
386 Err(_) => return ApiError::InternalError(Some("Failed to persist MST".into())).into_response(),
387 };
388 let mut relevant_blocks = std::collections::BTreeMap::new();
389 for key in &modified_keys {
390 if mst
391 .blocks_for_path(key, &mut relevant_blocks)
392 .await
393 .is_err()
394 {
395 return ApiError::InternalError(Some("Failed to get new MST blocks for path".into()))
396 .into_response();
397 }
398 if original_mst
399 .blocks_for_path(key, &mut relevant_blocks)
400 .await
401 .is_err()
402 {
403 return ApiError::InternalError(Some("Failed to get old MST blocks for path".into()))
404 .into_response();
405 }
406 }
407 let mut written_cids = tracking_store.get_all_relevant_cids();
408 for cid in relevant_blocks.keys() {
409 if !written_cids.contains(cid) {
410 written_cids.push(*cid);
411 }
412 }
413 let written_cids_str = written_cids
414 .iter()
415 .map(|c| c.to_string())
416 .collect::<Vec<_>>();
417 let commit_res = match commit_and_log(
418 &state,
419 CommitParams {
420 did: &did,
421 user_id,
422 current_root_cid: Some(current_root_cid),
423 prev_data_cid: Some(commit.data),
424 new_mst_root,
425 ops,
426 blocks_cids: &written_cids_str,
427 blobs: &all_blob_cids,
428 },
429 )
430 .await
431 {
432 Ok(res) => res,
433 Err(e) => {
434 error!("Commit failed: {}", e);
435 return ApiError::InternalError(Some("Failed to commit changes".into())).into_response();
436 }
437 };
438
439 if let Some(ref controller) = controller_did {
440 let write_summary: Vec<serde_json::Value> = input
441 .writes
442 .iter()
443 .map(|w| match w {
444 WriteOp::Create {
445 collection, rkey, ..
446 } => json!({
447 "action": "create",
448 "collection": collection,
449 "rkey": rkey
450 }),
451 WriteOp::Update {
452 collection, rkey, ..
453 } => json!({
454 "action": "update",
455 "collection": collection,
456 "rkey": rkey
457 }),
458 WriteOp::Delete { collection, rkey } => json!({
459 "action": "delete",
460 "collection": collection,
461 "rkey": rkey
462 }),
463 })
464 .collect();
465
466 let _ = delegation::log_delegation_action(
467 &state.db,
468 &did,
469 controller,
470 Some(controller),
471 DelegationActionType::RepoWrite,
472 Some(json!({
473 "action": "apply_writes",
474 "count": input.writes.len(),
475 "writes": write_summary
476 })),
477 None,
478 None,
479 )
480 .await;
481 }
482
483 (
484 StatusCode::OK,
485 Json(ApplyWritesOutput {
486 commit: CommitInfo {
487 cid: commit_res.commit_cid.to_string(),
488 rev: commit_res.rev,
489 },
490 results,
491 }),
492 )
493 .into_response()
494}