this repo has no description
1use super::validation::validate_record_with_rkey;
2use super::write::has_verified_comms_channel;
3use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log, extract_blob_cids};
4use crate::delegation::{self, DelegationActionType};
5use crate::repo::tracking::TrackingBlockStore;
6use crate::state::AppState;
7use axum::{
8 Json,
9 extract::State,
10 http::StatusCode,
11 response::{IntoResponse, Response},
12};
13use cid::Cid;
14use jacquard::types::{
15 integer::LimitedU32,
16 string::{Nsid, Tid},
17};
18use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
19use serde::{Deserialize, Serialize};
20use serde_json::json;
21use std::str::FromStr;
22use std::sync::Arc;
23use tracing::{error, info};
24
/// Upper bound on the number of write operations accepted in a single
/// `applyWrites` request; larger batches are rejected before any work is done.
const MAX_BATCH_WRITES: usize = 200;
26
27#[derive(Deserialize)]
28#[serde(tag = "$type")]
29pub enum WriteOp {
30 #[serde(rename = "com.atproto.repo.applyWrites#create")]
31 Create {
32 collection: String,
33 rkey: Option<String>,
34 value: serde_json::Value,
35 },
36 #[serde(rename = "com.atproto.repo.applyWrites#update")]
37 Update {
38 collection: String,
39 rkey: String,
40 value: serde_json::Value,
41 },
42 #[serde(rename = "com.atproto.repo.applyWrites#delete")]
43 Delete { collection: String, rkey: String },
44}
45
46#[derive(Deserialize)]
47#[serde(rename_all = "camelCase")]
48pub struct ApplyWritesInput {
49 pub repo: String,
50 pub validate: Option<bool>,
51 pub writes: Vec<WriteOp>,
52 pub swap_commit: Option<String>,
53}
54
55#[derive(Serialize)]
56#[serde(tag = "$type")]
57pub enum WriteResult {
58 #[serde(rename = "com.atproto.repo.applyWrites#createResult")]
59 CreateResult { uri: String, cid: String },
60 #[serde(rename = "com.atproto.repo.applyWrites#updateResult")]
61 UpdateResult { uri: String, cid: String },
62 #[serde(rename = "com.atproto.repo.applyWrites#deleteResult")]
63 DeleteResult {},
64}
65
66#[derive(Serialize)]
67pub struct ApplyWritesOutput {
68 pub commit: CommitInfo,
69 pub results: Vec<WriteResult>,
70}
71
72#[derive(Serialize)]
73pub struct CommitInfo {
74 pub cid: String,
75 pub rev: String,
76}
77
78pub async fn apply_writes(
79 State(state): State<AppState>,
80 headers: axum::http::HeaderMap,
81 Json(input): Json<ApplyWritesInput>,
82) -> Response {
83 info!(
84 "apply_writes called: repo={}, writes={}",
85 input.repo,
86 input.writes.len()
87 );
88 let token = match crate::auth::extract_bearer_token_from_header(
89 headers.get("Authorization").and_then(|h| h.to_str().ok()),
90 ) {
91 Some(t) => t,
92 None => {
93 return (
94 StatusCode::UNAUTHORIZED,
95 Json(json!({"error": "AuthenticationRequired"})),
96 )
97 .into_response();
98 }
99 };
100 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await {
101 Ok(user) => user,
102 Err(_) => {
103 return (
104 StatusCode::UNAUTHORIZED,
105 Json(json!({"error": "AuthenticationFailed"})),
106 )
107 .into_response();
108 }
109 };
110 let did = auth_user.did.clone();
111 let is_oauth = auth_user.is_oauth;
112 let scope = auth_user.scope;
113 let controller_did = auth_user.controller_did.clone();
114 if input.repo != did {
115 return (
116 StatusCode::FORBIDDEN,
117 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})),
118 )
119 .into_response();
120 }
121 let is_verified = has_verified_comms_channel(&state.db, &did)
122 .await
123 .unwrap_or(false);
124 let is_delegated = crate::delegation::is_delegated_account(&state.db, &did)
125 .await
126 .unwrap_or(false);
127 if !is_verified && !is_delegated {
128 return (
129 StatusCode::FORBIDDEN,
130 Json(json!({
131 "error": "AccountNotVerified",
132 "message": "You must verify at least one notification channel (email, Discord, Telegram, or Signal) before creating records"
133 })),
134 )
135 .into_response();
136 }
137 if input.writes.is_empty() {
138 return (
139 StatusCode::BAD_REQUEST,
140 Json(json!({"error": "InvalidRequest", "message": "writes array is empty"})),
141 )
142 .into_response();
143 }
144 if input.writes.len() > MAX_BATCH_WRITES {
145 return (
146 StatusCode::BAD_REQUEST,
147 Json(json!({"error": "InvalidRequest", "message": format!("Too many writes (max {})", MAX_BATCH_WRITES)})),
148 )
149 .into_response();
150 }
151
152 let has_custom_scope = scope
153 .as_ref()
154 .map(|s| s != "com.atproto.access")
155 .unwrap_or(false);
156 if is_oauth || has_custom_scope {
157 use std::collections::HashSet;
158 let create_collections: HashSet<&str> = input
159 .writes
160 .iter()
161 .filter_map(|w| {
162 if let WriteOp::Create { collection, .. } = w {
163 Some(collection.as_str())
164 } else {
165 None
166 }
167 })
168 .collect();
169 let update_collections: HashSet<&str> = input
170 .writes
171 .iter()
172 .filter_map(|w| {
173 if let WriteOp::Update { collection, .. } = w {
174 Some(collection.as_str())
175 } else {
176 None
177 }
178 })
179 .collect();
180 let delete_collections: HashSet<&str> = input
181 .writes
182 .iter()
183 .filter_map(|w| {
184 if let WriteOp::Delete { collection, .. } = w {
185 Some(collection.as_str())
186 } else {
187 None
188 }
189 })
190 .collect();
191
192 for collection in create_collections {
193 if let Err(e) = crate::auth::scope_check::check_repo_scope(
194 is_oauth,
195 scope.as_deref(),
196 crate::oauth::RepoAction::Create,
197 collection,
198 ) {
199 return e;
200 }
201 }
202 for collection in update_collections {
203 if let Err(e) = crate::auth::scope_check::check_repo_scope(
204 is_oauth,
205 scope.as_deref(),
206 crate::oauth::RepoAction::Update,
207 collection,
208 ) {
209 return e;
210 }
211 }
212 for collection in delete_collections {
213 if let Err(e) = crate::auth::scope_check::check_repo_scope(
214 is_oauth,
215 scope.as_deref(),
216 crate::oauth::RepoAction::Delete,
217 collection,
218 ) {
219 return e;
220 }
221 }
222 }
223
224 let user_id: uuid::Uuid = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
225 .fetch_optional(&state.db)
226 .await
227 {
228 Ok(Some(id)) => id,
229 _ => {
230 return (
231 StatusCode::INTERNAL_SERVER_ERROR,
232 Json(json!({"error": "InternalError", "message": "User not found"})),
233 )
234 .into_response();
235 }
236 };
237 let root_cid_str: String = match sqlx::query_scalar!(
238 "SELECT repo_root_cid FROM repos WHERE user_id = $1",
239 user_id
240 )
241 .fetch_optional(&state.db)
242 .await
243 {
244 Ok(Some(cid_str)) => cid_str,
245 _ => {
246 return (
247 StatusCode::INTERNAL_SERVER_ERROR,
248 Json(json!({"error": "InternalError", "message": "Repo root not found"})),
249 )
250 .into_response();
251 }
252 };
253 let current_root_cid = match Cid::from_str(&root_cid_str) {
254 Ok(c) => c,
255 Err(_) => {
256 return (
257 StatusCode::INTERNAL_SERVER_ERROR,
258 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})),
259 )
260 .into_response();
261 }
262 };
263 if let Some(swap_commit) = &input.swap_commit
264 && Cid::from_str(swap_commit).ok() != Some(current_root_cid)
265 {
266 return (
267 StatusCode::CONFLICT,
268 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})),
269 )
270 .into_response();
271 }
272 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
273 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
274 Ok(Some(b)) => b,
275 _ => {
276 return (
277 StatusCode::INTERNAL_SERVER_ERROR,
278 Json(json!({"error": "InternalError", "message": "Commit block not found"})),
279 )
280 .into_response();
281 }
282 };
283 let commit = match Commit::from_cbor(&commit_bytes) {
284 Ok(c) => c,
285 _ => {
286 return (
287 StatusCode::INTERNAL_SERVER_ERROR,
288 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})),
289 )
290 .into_response();
291 }
292 };
293 let original_mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
294 let mut mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
295 let mut results: Vec<WriteResult> = Vec::new();
296 let mut ops: Vec<RecordOp> = Vec::new();
297 let mut modified_keys: Vec<String> = Vec::new();
298 let mut all_blob_cids: Vec<String> = Vec::new();
299 for write in &input.writes {
300 match write {
301 WriteOp::Create {
302 collection,
303 rkey,
304 value,
305 } => {
306 if input.validate.unwrap_or(true)
307 && let Err(err_response) =
308 validate_record_with_rkey(value, collection, rkey.as_deref())
309 {
310 return *err_response;
311 }
312 all_blob_cids.extend(extract_blob_cids(value));
313 let rkey = rkey
314 .clone()
315 .unwrap_or_else(|| Tid::now(LimitedU32::MIN).to_string());
316 let mut record_bytes = Vec::new();
317 if serde_ipld_dagcbor::to_writer(&mut record_bytes, value).is_err() {
318 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response();
319 }
320 let record_cid = match tracking_store.put(&record_bytes).await {
321 Ok(c) => c,
322 Err(_) => return (
323 StatusCode::INTERNAL_SERVER_ERROR,
324 Json(
325 json!({"error": "InternalError", "message": "Failed to store record"}),
326 ),
327 )
328 .into_response(),
329 };
330 let collection_nsid = match collection.parse::<Nsid>() {
331 Ok(n) => n,
332 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection", "message": "Invalid collection NSID"}))).into_response(),
333 };
334 let key = format!("{}/{}", collection_nsid, rkey);
335 modified_keys.push(key.clone());
336 mst = match mst.add(&key, record_cid).await {
337 Ok(m) => m,
338 Err(_) => return (
339 StatusCode::INTERNAL_SERVER_ERROR,
340 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})),
341 )
342 .into_response(),
343 };
344 let uri = format!("at://{}/{}/{}", did, collection, rkey);
345 results.push(WriteResult::CreateResult {
346 uri,
347 cid: record_cid.to_string(),
348 });
349 ops.push(RecordOp::Create {
350 collection: collection.clone(),
351 rkey,
352 cid: record_cid,
353 });
354 }
355 WriteOp::Update {
356 collection,
357 rkey,
358 value,
359 } => {
360 if input.validate.unwrap_or(true)
361 && let Err(err_response) =
362 validate_record_with_rkey(value, collection, Some(rkey))
363 {
364 return *err_response;
365 }
366 all_blob_cids.extend(extract_blob_cids(value));
367 let mut record_bytes = Vec::new();
368 if serde_ipld_dagcbor::to_writer(&mut record_bytes, value).is_err() {
369 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response();
370 }
371 let record_cid = match tracking_store.put(&record_bytes).await {
372 Ok(c) => c,
373 Err(_) => return (
374 StatusCode::INTERNAL_SERVER_ERROR,
375 Json(
376 json!({"error": "InternalError", "message": "Failed to store record"}),
377 ),
378 )
379 .into_response(),
380 };
381 let collection_nsid = match collection.parse::<Nsid>() {
382 Ok(n) => n,
383 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection", "message": "Invalid collection NSID"}))).into_response(),
384 };
385 let key = format!("{}/{}", collection_nsid, rkey);
386 modified_keys.push(key.clone());
387 let prev_record_cid = mst.get(&key).await.ok().flatten();
388 mst = match mst.update(&key, record_cid).await {
389 Ok(m) => m,
390 Err(_) => return (
391 StatusCode::INTERNAL_SERVER_ERROR,
392 Json(json!({"error": "InternalError", "message": "Failed to update MST"})),
393 )
394 .into_response(),
395 };
396 let uri = format!("at://{}/{}/{}", did, collection, rkey);
397 results.push(WriteResult::UpdateResult {
398 uri,
399 cid: record_cid.to_string(),
400 });
401 ops.push(RecordOp::Update {
402 collection: collection.clone(),
403 rkey: rkey.clone(),
404 cid: record_cid,
405 prev: prev_record_cid,
406 });
407 }
408 WriteOp::Delete { collection, rkey } => {
409 let collection_nsid = match collection.parse::<Nsid>() {
410 Ok(n) => n,
411 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection", "message": "Invalid collection NSID"}))).into_response(),
412 };
413 let key = format!("{}/{}", collection_nsid, rkey);
414 modified_keys.push(key.clone());
415 let prev_record_cid = mst.get(&key).await.ok().flatten();
416 mst = match mst.delete(&key).await {
417 Ok(m) => m,
418 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to delete from MST"}))).into_response(),
419 };
420 results.push(WriteResult::DeleteResult {});
421 ops.push(RecordOp::Delete {
422 collection: collection.clone(),
423 rkey: rkey.clone(),
424 prev: prev_record_cid,
425 });
426 }
427 }
428 }
429 let new_mst_root = match mst.persist().await {
430 Ok(c) => c,
431 Err(_) => {
432 return (
433 StatusCode::INTERNAL_SERVER_ERROR,
434 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})),
435 )
436 .into_response();
437 }
438 };
439 let mut relevant_blocks = std::collections::BTreeMap::new();
440 for key in &modified_keys {
441 if mst
442 .blocks_for_path(key, &mut relevant_blocks)
443 .await
444 .is_err()
445 {
446 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response();
447 }
448 if original_mst
449 .blocks_for_path(key, &mut relevant_blocks)
450 .await
451 .is_err()
452 {
453 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response();
454 }
455 }
456 let mut written_cids = tracking_store.get_all_relevant_cids();
457 for cid in relevant_blocks.keys() {
458 if !written_cids.contains(cid) {
459 written_cids.push(*cid);
460 }
461 }
462 let written_cids_str = written_cids
463 .iter()
464 .map(|c| c.to_string())
465 .collect::<Vec<_>>();
466 let commit_res = match commit_and_log(
467 &state,
468 CommitParams {
469 did: &did,
470 user_id,
471 current_root_cid: Some(current_root_cid),
472 prev_data_cid: Some(commit.data),
473 new_mst_root,
474 ops,
475 blocks_cids: &written_cids_str,
476 blobs: &all_blob_cids,
477 },
478 )
479 .await
480 {
481 Ok(res) => res,
482 Err(e) => {
483 error!("Commit failed: {}", e);
484 return (
485 StatusCode::INTERNAL_SERVER_ERROR,
486 Json(json!({"error": "InternalError", "message": "Failed to commit changes"})),
487 )
488 .into_response();
489 }
490 };
491
492 if let Some(ref controller) = controller_did {
493 let write_summary: Vec<serde_json::Value> = input
494 .writes
495 .iter()
496 .map(|w| match w {
497 WriteOp::Create {
498 collection, rkey, ..
499 } => json!({
500 "action": "create",
501 "collection": collection,
502 "rkey": rkey
503 }),
504 WriteOp::Update {
505 collection, rkey, ..
506 } => json!({
507 "action": "update",
508 "collection": collection,
509 "rkey": rkey
510 }),
511 WriteOp::Delete { collection, rkey } => json!({
512 "action": "delete",
513 "collection": collection,
514 "rkey": rkey
515 }),
516 })
517 .collect();
518
519 let _ = delegation::log_delegation_action(
520 &state.db,
521 &did,
522 controller,
523 Some(controller),
524 DelegationActionType::RepoWrite,
525 Some(json!({
526 "action": "apply_writes",
527 "count": input.writes.len(),
528 "writes": write_summary
529 })),
530 None,
531 None,
532 )
533 .await;
534 }
535
536 (
537 StatusCode::OK,
538 Json(ApplyWritesOutput {
539 commit: CommitInfo {
540 cid: commit_res.commit_cid.to_string(),
541 rev: commit_res.rev,
542 },
543 results,
544 }),
545 )
546 .into_response()
547}