// This repo has no description.
1use super::validation::validate_record;
2use super::write::has_verified_comms_channel;
3use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log};
4use crate::delegation::{self, DelegationActionType};
5use crate::repo::tracking::TrackingBlockStore;
6use crate::state::AppState;
7use axum::{
8 Json,
9 extract::State,
10 http::StatusCode,
11 response::{IntoResponse, Response},
12};
13use cid::Cid;
14use jacquard::types::{
15 integer::LimitedU32,
16 string::{Nsid, Tid},
17};
18use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
19use serde::{Deserialize, Serialize};
20use serde_json::json;
21use std::str::FromStr;
22use std::sync::Arc;
23use tracing::{error, info};
24
/// Upper bound on the number of write operations accepted in a single
/// `apply_writes` request; larger batches are rejected with `400 InvalidRequest`.
const MAX_BATCH_WRITES: usize = 200;
26
/// One write operation inside an `apply_writes` batch.
///
/// Deserialized as an internally tagged enum: the JSON `$type` field selects
/// the variant.
#[derive(Deserialize)]
#[serde(tag = "$type")]
pub enum WriteOp {
    /// Create a new record. When `rkey` is absent the handler generates a
    /// TID-based record key.
    #[serde(rename = "com.atproto.repo.applyWrites#create")]
    Create {
        collection: String,
        rkey: Option<String>,
        value: serde_json::Value,
    },
    /// Replace the record stored at `collection`/`rkey` with `value`.
    #[serde(rename = "com.atproto.repo.applyWrites#update")]
    Update {
        collection: String,
        rkey: String,
        value: serde_json::Value,
    },
    /// Remove the record stored at `collection`/`rkey`.
    #[serde(rename = "com.atproto.repo.applyWrites#delete")]
    Delete { collection: String, rkey: String },
}
45
/// Request body for `apply_writes`. Field names are camelCase on the wire
/// (e.g. `swapCommit`).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ApplyWritesInput {
    /// Target repo; the handler requires it to equal the authenticated user's DID.
    pub repo: String,
    /// Whether create/update values are run through record validation.
    /// The handler treats an absent value as `true`.
    pub validate: Option<bool>,
    /// Operations to apply, in order, as a single commit.
    pub writes: Vec<WriteOp>,
    /// Optional CID string; when present it must match the repo's current
    /// root CID or the request is rejected with `409 InvalidSwap`.
    pub swap_commit: Option<String>,
}
54
/// Per-operation result returned by `apply_writes`, one entry per input write
/// in the same order.
///
/// Serialized as an internally tagged enum with the variant name in `$type`.
#[derive(Serialize)]
#[serde(tag = "$type")]
pub enum WriteResult {
    /// Result of a create: the record's `at://` URI and its CID as a string.
    #[serde(rename = "com.atproto.repo.applyWrites#createResult")]
    CreateResult { uri: String, cid: String },
    /// Result of an update: the record's `at://` URI and its new CID as a string.
    #[serde(rename = "com.atproto.repo.applyWrites#updateResult")]
    UpdateResult { uri: String, cid: String },
    /// Result of a delete; carries no fields beyond the `$type` tag.
    #[serde(rename = "com.atproto.repo.applyWrites#deleteResult")]
    DeleteResult {},
}
65
/// Successful response body for `apply_writes`.
#[derive(Serialize)]
pub struct ApplyWritesOutput {
    /// Identifies the commit produced by this batch.
    pub commit: CommitInfo,
    /// One result per submitted write, in submission order.
    pub results: Vec<WriteResult>,
}
71
/// Identifies the commit created by a successful `apply_writes` call.
#[derive(Serialize)]
pub struct CommitInfo {
    /// CID of the new commit, rendered as a string.
    pub cid: String,
    /// Revision string reported by the commit step.
    pub rev: String,
}
77
78pub async fn apply_writes(
79 State(state): State<AppState>,
80 headers: axum::http::HeaderMap,
81 Json(input): Json<ApplyWritesInput>,
82) -> Response {
83 info!(
84 "apply_writes called: repo={}, writes={}",
85 input.repo,
86 input.writes.len()
87 );
88 let token = match crate::auth::extract_bearer_token_from_header(
89 headers.get("Authorization").and_then(|h| h.to_str().ok()),
90 ) {
91 Some(t) => t,
92 None => {
93 return (
94 StatusCode::UNAUTHORIZED,
95 Json(json!({"error": "AuthenticationRequired"})),
96 )
97 .into_response();
98 }
99 };
100 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await {
101 Ok(user) => user,
102 Err(_) => {
103 return (
104 StatusCode::UNAUTHORIZED,
105 Json(json!({"error": "AuthenticationFailed"})),
106 )
107 .into_response();
108 }
109 };
110 let did = auth_user.did.clone();
111 let is_oauth = auth_user.is_oauth;
112 let scope = auth_user.scope;
113 let controller_did = auth_user.controller_did.clone();
114 if input.repo != did {
115 return (
116 StatusCode::FORBIDDEN,
117 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})),
118 )
119 .into_response();
120 }
121 let is_verified = has_verified_comms_channel(&state.db, &did)
122 .await
123 .unwrap_or(false);
124 let is_delegated = crate::delegation::is_delegated_account(&state.db, &did)
125 .await
126 .unwrap_or(false);
127 if !is_verified && !is_delegated {
128 return (
129 StatusCode::FORBIDDEN,
130 Json(json!({
131 "error": "AccountNotVerified",
132 "message": "You must verify at least one notification channel (email, Discord, Telegram, or Signal) before creating records"
133 })),
134 )
135 .into_response();
136 }
137 if input.writes.is_empty() {
138 return (
139 StatusCode::BAD_REQUEST,
140 Json(json!({"error": "InvalidRequest", "message": "writes array is empty"})),
141 )
142 .into_response();
143 }
144 if input.writes.len() > MAX_BATCH_WRITES {
145 return (
146 StatusCode::BAD_REQUEST,
147 Json(json!({"error": "InvalidRequest", "message": format!("Too many writes (max {})", MAX_BATCH_WRITES)})),
148 )
149 .into_response();
150 }
151
152 let has_custom_scope = scope
153 .as_ref()
154 .map(|s| s != "com.atproto.access")
155 .unwrap_or(false);
156 if is_oauth || has_custom_scope {
157 use std::collections::HashSet;
158 let create_collections: HashSet<&str> = input
159 .writes
160 .iter()
161 .filter_map(|w| {
162 if let WriteOp::Create { collection, .. } = w {
163 Some(collection.as_str())
164 } else {
165 None
166 }
167 })
168 .collect();
169 let update_collections: HashSet<&str> = input
170 .writes
171 .iter()
172 .filter_map(|w| {
173 if let WriteOp::Update { collection, .. } = w {
174 Some(collection.as_str())
175 } else {
176 None
177 }
178 })
179 .collect();
180 let delete_collections: HashSet<&str> = input
181 .writes
182 .iter()
183 .filter_map(|w| {
184 if let WriteOp::Delete { collection, .. } = w {
185 Some(collection.as_str())
186 } else {
187 None
188 }
189 })
190 .collect();
191
192 for collection in create_collections {
193 if let Err(e) = crate::auth::scope_check::check_repo_scope(
194 is_oauth,
195 scope.as_deref(),
196 crate::oauth::RepoAction::Create,
197 collection,
198 ) {
199 return e;
200 }
201 }
202 for collection in update_collections {
203 if let Err(e) = crate::auth::scope_check::check_repo_scope(
204 is_oauth,
205 scope.as_deref(),
206 crate::oauth::RepoAction::Update,
207 collection,
208 ) {
209 return e;
210 }
211 }
212 for collection in delete_collections {
213 if let Err(e) = crate::auth::scope_check::check_repo_scope(
214 is_oauth,
215 scope.as_deref(),
216 crate::oauth::RepoAction::Delete,
217 collection,
218 ) {
219 return e;
220 }
221 }
222 }
223
224 let user_id: uuid::Uuid = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
225 .fetch_optional(&state.db)
226 .await
227 {
228 Ok(Some(id)) => id,
229 _ => {
230 return (
231 StatusCode::INTERNAL_SERVER_ERROR,
232 Json(json!({"error": "InternalError", "message": "User not found"})),
233 )
234 .into_response();
235 }
236 };
237 let root_cid_str: String = match sqlx::query_scalar!(
238 "SELECT repo_root_cid FROM repos WHERE user_id = $1",
239 user_id
240 )
241 .fetch_optional(&state.db)
242 .await
243 {
244 Ok(Some(cid_str)) => cid_str,
245 _ => {
246 return (
247 StatusCode::INTERNAL_SERVER_ERROR,
248 Json(json!({"error": "InternalError", "message": "Repo root not found"})),
249 )
250 .into_response();
251 }
252 };
253 let current_root_cid = match Cid::from_str(&root_cid_str) {
254 Ok(c) => c,
255 Err(_) => {
256 return (
257 StatusCode::INTERNAL_SERVER_ERROR,
258 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})),
259 )
260 .into_response();
261 }
262 };
263 if let Some(swap_commit) = &input.swap_commit
264 && Cid::from_str(swap_commit).ok() != Some(current_root_cid)
265 {
266 return (
267 StatusCode::CONFLICT,
268 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})),
269 )
270 .into_response();
271 }
272 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
273 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
274 Ok(Some(b)) => b,
275 _ => {
276 return (
277 StatusCode::INTERNAL_SERVER_ERROR,
278 Json(json!({"error": "InternalError", "message": "Commit block not found"})),
279 )
280 .into_response();
281 }
282 };
283 let commit = match Commit::from_cbor(&commit_bytes) {
284 Ok(c) => c,
285 _ => {
286 return (
287 StatusCode::INTERNAL_SERVER_ERROR,
288 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})),
289 )
290 .into_response();
291 }
292 };
293 let original_mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
294 let mut mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
295 let mut results: Vec<WriteResult> = Vec::new();
296 let mut ops: Vec<RecordOp> = Vec::new();
297 let mut modified_keys: Vec<String> = Vec::new();
298 for write in &input.writes {
299 match write {
300 WriteOp::Create {
301 collection,
302 rkey,
303 value,
304 } => {
305 if input.validate.unwrap_or(true)
306 && let Err(err_response) = validate_record(value, collection)
307 {
308 return *err_response;
309 }
310 let rkey = rkey
311 .clone()
312 .unwrap_or_else(|| Tid::now(LimitedU32::MIN).to_string());
313 let mut record_bytes = Vec::new();
314 if serde_ipld_dagcbor::to_writer(&mut record_bytes, value).is_err() {
315 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response();
316 }
317 let record_cid = match tracking_store.put(&record_bytes).await {
318 Ok(c) => c,
319 Err(_) => return (
320 StatusCode::INTERNAL_SERVER_ERROR,
321 Json(
322 json!({"error": "InternalError", "message": "Failed to store record"}),
323 ),
324 )
325 .into_response(),
326 };
327 let collection_nsid = match collection.parse::<Nsid>() {
328 Ok(n) => n,
329 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection", "message": "Invalid collection NSID"}))).into_response(),
330 };
331 let key = format!("{}/{}", collection_nsid, rkey);
332 modified_keys.push(key.clone());
333 mst = match mst.add(&key, record_cid).await {
334 Ok(m) => m,
335 Err(_) => return (
336 StatusCode::INTERNAL_SERVER_ERROR,
337 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})),
338 )
339 .into_response(),
340 };
341 let uri = format!("at://{}/{}/{}", did, collection, rkey);
342 results.push(WriteResult::CreateResult {
343 uri,
344 cid: record_cid.to_string(),
345 });
346 ops.push(RecordOp::Create {
347 collection: collection.clone(),
348 rkey,
349 cid: record_cid,
350 });
351 }
352 WriteOp::Update {
353 collection,
354 rkey,
355 value,
356 } => {
357 if input.validate.unwrap_or(true)
358 && let Err(err_response) = validate_record(value, collection)
359 {
360 return *err_response;
361 }
362 let mut record_bytes = Vec::new();
363 if serde_ipld_dagcbor::to_writer(&mut record_bytes, value).is_err() {
364 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response();
365 }
366 let record_cid = match tracking_store.put(&record_bytes).await {
367 Ok(c) => c,
368 Err(_) => return (
369 StatusCode::INTERNAL_SERVER_ERROR,
370 Json(
371 json!({"error": "InternalError", "message": "Failed to store record"}),
372 ),
373 )
374 .into_response(),
375 };
376 let collection_nsid = match collection.parse::<Nsid>() {
377 Ok(n) => n,
378 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection", "message": "Invalid collection NSID"}))).into_response(),
379 };
380 let key = format!("{}/{}", collection_nsid, rkey);
381 modified_keys.push(key.clone());
382 let prev_record_cid = mst.get(&key).await.ok().flatten();
383 mst = match mst.update(&key, record_cid).await {
384 Ok(m) => m,
385 Err(_) => return (
386 StatusCode::INTERNAL_SERVER_ERROR,
387 Json(json!({"error": "InternalError", "message": "Failed to update MST"})),
388 )
389 .into_response(),
390 };
391 let uri = format!("at://{}/{}/{}", did, collection, rkey);
392 results.push(WriteResult::UpdateResult {
393 uri,
394 cid: record_cid.to_string(),
395 });
396 ops.push(RecordOp::Update {
397 collection: collection.clone(),
398 rkey: rkey.clone(),
399 cid: record_cid,
400 prev: prev_record_cid,
401 });
402 }
403 WriteOp::Delete { collection, rkey } => {
404 let collection_nsid = match collection.parse::<Nsid>() {
405 Ok(n) => n,
406 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection", "message": "Invalid collection NSID"}))).into_response(),
407 };
408 let key = format!("{}/{}", collection_nsid, rkey);
409 modified_keys.push(key.clone());
410 let prev_record_cid = mst.get(&key).await.ok().flatten();
411 mst = match mst.delete(&key).await {
412 Ok(m) => m,
413 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to delete from MST"}))).into_response(),
414 };
415 results.push(WriteResult::DeleteResult {});
416 ops.push(RecordOp::Delete {
417 collection: collection.clone(),
418 rkey: rkey.clone(),
419 prev: prev_record_cid,
420 });
421 }
422 }
423 }
424 let new_mst_root = match mst.persist().await {
425 Ok(c) => c,
426 Err(_) => {
427 return (
428 StatusCode::INTERNAL_SERVER_ERROR,
429 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})),
430 )
431 .into_response();
432 }
433 };
434 let mut relevant_blocks = std::collections::BTreeMap::new();
435 for key in &modified_keys {
436 if mst
437 .blocks_for_path(key, &mut relevant_blocks)
438 .await
439 .is_err()
440 {
441 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response();
442 }
443 if original_mst
444 .blocks_for_path(key, &mut relevant_blocks)
445 .await
446 .is_err()
447 {
448 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response();
449 }
450 }
451 let mut written_cids = tracking_store.get_all_relevant_cids();
452 for cid in relevant_blocks.keys() {
453 if !written_cids.contains(cid) {
454 written_cids.push(*cid);
455 }
456 }
457 let written_cids_str = written_cids
458 .iter()
459 .map(|c| c.to_string())
460 .collect::<Vec<_>>();
461 let commit_res = match commit_and_log(
462 &state,
463 CommitParams {
464 did: &did,
465 user_id,
466 current_root_cid: Some(current_root_cid),
467 prev_data_cid: Some(commit.data),
468 new_mst_root,
469 ops,
470 blocks_cids: &written_cids_str,
471 },
472 )
473 .await
474 {
475 Ok(res) => res,
476 Err(e) => {
477 error!("Commit failed: {}", e);
478 return (
479 StatusCode::INTERNAL_SERVER_ERROR,
480 Json(json!({"error": "InternalError", "message": "Failed to commit changes"})),
481 )
482 .into_response();
483 }
484 };
485
486 if let Some(ref controller) = controller_did {
487 let write_summary: Vec<serde_json::Value> = input
488 .writes
489 .iter()
490 .map(|w| match w {
491 WriteOp::Create {
492 collection, rkey, ..
493 } => json!({
494 "action": "create",
495 "collection": collection,
496 "rkey": rkey
497 }),
498 WriteOp::Update {
499 collection, rkey, ..
500 } => json!({
501 "action": "update",
502 "collection": collection,
503 "rkey": rkey
504 }),
505 WriteOp::Delete { collection, rkey } => json!({
506 "action": "delete",
507 "collection": collection,
508 "rkey": rkey
509 }),
510 })
511 .collect();
512
513 let _ = delegation::log_delegation_action(
514 &state.db,
515 &did,
516 controller,
517 Some(controller),
518 DelegationActionType::RepoWrite,
519 Some(json!({
520 "action": "apply_writes",
521 "count": input.writes.len(),
522 "writes": write_summary
523 })),
524 None,
525 None,
526 )
527 .await;
528 }
529
530 (
531 StatusCode::OK,
532 Json(ApplyWritesOutput {
533 commit: CommitInfo {
534 cid: commit_res.commit_cid.to_string(),
535 rev: commit_res.rev,
536 },
537 results,
538 }),
539 )
540 .into_response()
541}