this repo has no description
1use super::validation::validate_record_with_status;
2use super::write::has_verified_comms_channel;
3use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log, extract_blob_cids};
4use crate::delegation::{self, DelegationActionType};
5use crate::repo::tracking::TrackingBlockStore;
6use crate::state::AppState;
7use crate::validation::ValidationStatus;
8use axum::{
9 Json,
10 extract::State,
11 http::StatusCode,
12 response::{IntoResponse, Response},
13};
14use cid::Cid;
15use jacquard::types::{
16 integer::LimitedU32,
17 string::{Nsid, Tid},
18};
19use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
20use serde::{Deserialize, Serialize};
21use serde_json::json;
22use std::str::FromStr;
23use std::sync::Arc;
24use tracing::{error, info};
25
26const MAX_BATCH_WRITES: usize = 200;
27
28#[derive(Deserialize)]
29#[serde(tag = "$type")]
30pub enum WriteOp {
31 #[serde(rename = "com.atproto.repo.applyWrites#create")]
32 Create {
33 collection: String,
34 rkey: Option<String>,
35 value: serde_json::Value,
36 },
37 #[serde(rename = "com.atproto.repo.applyWrites#update")]
38 Update {
39 collection: String,
40 rkey: String,
41 value: serde_json::Value,
42 },
43 #[serde(rename = "com.atproto.repo.applyWrites#delete")]
44 Delete { collection: String, rkey: String },
45}
46
47#[derive(Deserialize)]
48#[serde(rename_all = "camelCase")]
49pub struct ApplyWritesInput {
50 pub repo: String,
51 pub validate: Option<bool>,
52 pub writes: Vec<WriteOp>,
53 pub swap_commit: Option<String>,
54}
55
56#[derive(Serialize)]
57#[serde(tag = "$type")]
58pub enum WriteResult {
59 #[serde(rename = "com.atproto.repo.applyWrites#createResult")]
60 CreateResult {
61 uri: String,
62 cid: String,
63 #[serde(rename = "validationStatus", skip_serializing_if = "Option::is_none")]
64 validation_status: Option<String>,
65 },
66 #[serde(rename = "com.atproto.repo.applyWrites#updateResult")]
67 UpdateResult {
68 uri: String,
69 cid: String,
70 #[serde(rename = "validationStatus", skip_serializing_if = "Option::is_none")]
71 validation_status: Option<String>,
72 },
73 #[serde(rename = "com.atproto.repo.applyWrites#deleteResult")]
74 DeleteResult {},
75}
76
77#[derive(Serialize)]
78pub struct ApplyWritesOutput {
79 pub commit: CommitInfo,
80 pub results: Vec<WriteResult>,
81}
82
83#[derive(Serialize)]
84pub struct CommitInfo {
85 pub cid: String,
86 pub rev: String,
87}
88
89pub async fn apply_writes(
90 State(state): State<AppState>,
91 headers: axum::http::HeaderMap,
92 Json(input): Json<ApplyWritesInput>,
93) -> Response {
94 info!(
95 "apply_writes called: repo={}, writes={}",
96 input.repo,
97 input.writes.len()
98 );
99 let token = match crate::auth::extract_bearer_token_from_header(
100 headers.get("Authorization").and_then(|h| h.to_str().ok()),
101 ) {
102 Some(t) => t,
103 None => {
104 return (
105 StatusCode::UNAUTHORIZED,
106 Json(json!({"error": "AuthenticationRequired"})),
107 )
108 .into_response();
109 }
110 };
111 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await {
112 Ok(user) => user,
113 Err(_) => {
114 return (
115 StatusCode::UNAUTHORIZED,
116 Json(json!({"error": "AuthenticationFailed"})),
117 )
118 .into_response();
119 }
120 };
121 let did = auth_user.did.clone();
122 let is_oauth = auth_user.is_oauth;
123 let scope = auth_user.scope;
124 let controller_did = auth_user.controller_did.clone();
125 if input.repo != did {
126 return (
127 StatusCode::FORBIDDEN,
128 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})),
129 )
130 .into_response();
131 }
132 if crate::util::is_account_migrated(&state.db, &did)
133 .await
134 .unwrap_or(false)
135 {
136 return (
137 StatusCode::FORBIDDEN,
138 Json(json!({
139 "error": "AccountMigrated",
140 "message": "Account has been migrated to another PDS. Repo operations are not allowed."
141 })),
142 )
143 .into_response();
144 }
145 let is_verified = has_verified_comms_channel(&state.db, &did)
146 .await
147 .unwrap_or(false);
148 let is_delegated = crate::delegation::is_delegated_account(&state.db, &did)
149 .await
150 .unwrap_or(false);
151 if !is_verified && !is_delegated {
152 return (
153 StatusCode::FORBIDDEN,
154 Json(json!({
155 "error": "AccountNotVerified",
156 "message": "You must verify at least one notification channel (email, Discord, Telegram, or Signal) before creating records"
157 })),
158 )
159 .into_response();
160 }
161 if input.writes.is_empty() {
162 return (
163 StatusCode::BAD_REQUEST,
164 Json(json!({"error": "InvalidRequest", "message": "writes array is empty"})),
165 )
166 .into_response();
167 }
168 if input.writes.len() > MAX_BATCH_WRITES {
169 return (
170 StatusCode::BAD_REQUEST,
171 Json(json!({"error": "InvalidRequest", "message": format!("Too many writes (max {})", MAX_BATCH_WRITES)})),
172 )
173 .into_response();
174 }
175
176 let has_custom_scope = scope
177 .as_ref()
178 .map(|s| s != "com.atproto.access")
179 .unwrap_or(false);
180 if is_oauth || has_custom_scope {
181 use std::collections::HashSet;
182 let create_collections: HashSet<&str> = input
183 .writes
184 .iter()
185 .filter_map(|w| {
186 if let WriteOp::Create { collection, .. } = w {
187 Some(collection.as_str())
188 } else {
189 None
190 }
191 })
192 .collect();
193 let update_collections: HashSet<&str> = input
194 .writes
195 .iter()
196 .filter_map(|w| {
197 if let WriteOp::Update { collection, .. } = w {
198 Some(collection.as_str())
199 } else {
200 None
201 }
202 })
203 .collect();
204 let delete_collections: HashSet<&str> = input
205 .writes
206 .iter()
207 .filter_map(|w| {
208 if let WriteOp::Delete { collection, .. } = w {
209 Some(collection.as_str())
210 } else {
211 None
212 }
213 })
214 .collect();
215
216 for collection in create_collections {
217 if let Err(e) = crate::auth::scope_check::check_repo_scope(
218 is_oauth,
219 scope.as_deref(),
220 crate::oauth::RepoAction::Create,
221 collection,
222 ) {
223 return e;
224 }
225 }
226 for collection in update_collections {
227 if let Err(e) = crate::auth::scope_check::check_repo_scope(
228 is_oauth,
229 scope.as_deref(),
230 crate::oauth::RepoAction::Update,
231 collection,
232 ) {
233 return e;
234 }
235 }
236 for collection in delete_collections {
237 if let Err(e) = crate::auth::scope_check::check_repo_scope(
238 is_oauth,
239 scope.as_deref(),
240 crate::oauth::RepoAction::Delete,
241 collection,
242 ) {
243 return e;
244 }
245 }
246 }
247
248 let user_id: uuid::Uuid = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
249 .fetch_optional(&state.db)
250 .await
251 {
252 Ok(Some(id)) => id,
253 _ => {
254 return (
255 StatusCode::INTERNAL_SERVER_ERROR,
256 Json(json!({"error": "InternalError", "message": "User not found"})),
257 )
258 .into_response();
259 }
260 };
261 let root_cid_str: String = match sqlx::query_scalar!(
262 "SELECT repo_root_cid FROM repos WHERE user_id = $1",
263 user_id
264 )
265 .fetch_optional(&state.db)
266 .await
267 {
268 Ok(Some(cid_str)) => cid_str,
269 _ => {
270 return (
271 StatusCode::INTERNAL_SERVER_ERROR,
272 Json(json!({"error": "InternalError", "message": "Repo root not found"})),
273 )
274 .into_response();
275 }
276 };
277 let current_root_cid = match Cid::from_str(&root_cid_str) {
278 Ok(c) => c,
279 Err(_) => {
280 return (
281 StatusCode::INTERNAL_SERVER_ERROR,
282 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})),
283 )
284 .into_response();
285 }
286 };
287 if let Some(swap_commit) = &input.swap_commit
288 && Cid::from_str(swap_commit).ok() != Some(current_root_cid)
289 {
290 return (
291 StatusCode::CONFLICT,
292 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})),
293 )
294 .into_response();
295 }
296 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
297 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
298 Ok(Some(b)) => b,
299 _ => {
300 return (
301 StatusCode::INTERNAL_SERVER_ERROR,
302 Json(json!({"error": "InternalError", "message": "Commit block not found"})),
303 )
304 .into_response();
305 }
306 };
307 let commit = match Commit::from_cbor(&commit_bytes) {
308 Ok(c) => c,
309 _ => {
310 return (
311 StatusCode::INTERNAL_SERVER_ERROR,
312 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})),
313 )
314 .into_response();
315 }
316 };
317 let original_mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
318 let mut mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
319 let mut results: Vec<WriteResult> = Vec::new();
320 let mut ops: Vec<RecordOp> = Vec::new();
321 let mut modified_keys: Vec<String> = Vec::new();
322 let mut all_blob_cids: Vec<String> = Vec::new();
323 for write in &input.writes {
324 match write {
325 WriteOp::Create {
326 collection,
327 rkey,
328 value,
329 } => {
330 let validation_status = if input.validate == Some(false) {
331 None
332 } else {
333 let require_lexicon = input.validate == Some(true);
334 match validate_record_with_status(
335 value,
336 collection,
337 rkey.as_deref(),
338 require_lexicon,
339 ) {
340 Ok(status) => Some(status),
341 Err(err_response) => return *err_response,
342 }
343 };
344 all_blob_cids.extend(extract_blob_cids(value));
345 let rkey = rkey
346 .clone()
347 .unwrap_or_else(|| Tid::now(LimitedU32::MIN).to_string());
348 let mut record_bytes = Vec::new();
349 if serde_ipld_dagcbor::to_writer(&mut record_bytes, value).is_err() {
350 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response();
351 }
352 let record_cid = match tracking_store.put(&record_bytes).await {
353 Ok(c) => c,
354 Err(_) => return (
355 StatusCode::INTERNAL_SERVER_ERROR,
356 Json(
357 json!({"error": "InternalError", "message": "Failed to store record"}),
358 ),
359 )
360 .into_response(),
361 };
362 let collection_nsid = match collection.parse::<Nsid>() {
363 Ok(n) => n,
364 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection", "message": "Invalid collection NSID"}))).into_response(),
365 };
366 let key = format!("{}/{}", collection_nsid, rkey);
367 modified_keys.push(key.clone());
368 mst = match mst.add(&key, record_cid).await {
369 Ok(m) => m,
370 Err(_) => return (
371 StatusCode::INTERNAL_SERVER_ERROR,
372 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})),
373 )
374 .into_response(),
375 };
376 let uri = format!("at://{}/{}/{}", did, collection, rkey);
377 results.push(WriteResult::CreateResult {
378 uri,
379 cid: record_cid.to_string(),
380 validation_status: validation_status.map(|s| match s {
381 ValidationStatus::Valid => "valid".to_string(),
382 ValidationStatus::Unknown => "unknown".to_string(),
383 ValidationStatus::Invalid => "invalid".to_string(),
384 }),
385 });
386 ops.push(RecordOp::Create {
387 collection: collection.clone(),
388 rkey,
389 cid: record_cid,
390 });
391 }
392 WriteOp::Update {
393 collection,
394 rkey,
395 value,
396 } => {
397 let validation_status = if input.validate == Some(false) {
398 None
399 } else {
400 let require_lexicon = input.validate == Some(true);
401 match validate_record_with_status(
402 value,
403 collection,
404 Some(rkey),
405 require_lexicon,
406 ) {
407 Ok(status) => Some(status),
408 Err(err_response) => return *err_response,
409 }
410 };
411 all_blob_cids.extend(extract_blob_cids(value));
412 let mut record_bytes = Vec::new();
413 if serde_ipld_dagcbor::to_writer(&mut record_bytes, value).is_err() {
414 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response();
415 }
416 let record_cid = match tracking_store.put(&record_bytes).await {
417 Ok(c) => c,
418 Err(_) => return (
419 StatusCode::INTERNAL_SERVER_ERROR,
420 Json(
421 json!({"error": "InternalError", "message": "Failed to store record"}),
422 ),
423 )
424 .into_response(),
425 };
426 let collection_nsid = match collection.parse::<Nsid>() {
427 Ok(n) => n,
428 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection", "message": "Invalid collection NSID"}))).into_response(),
429 };
430 let key = format!("{}/{}", collection_nsid, rkey);
431 modified_keys.push(key.clone());
432 let prev_record_cid = mst.get(&key).await.ok().flatten();
433 mst = match mst.update(&key, record_cid).await {
434 Ok(m) => m,
435 Err(_) => return (
436 StatusCode::INTERNAL_SERVER_ERROR,
437 Json(json!({"error": "InternalError", "message": "Failed to update MST"})),
438 )
439 .into_response(),
440 };
441 let uri = format!("at://{}/{}/{}", did, collection, rkey);
442 results.push(WriteResult::UpdateResult {
443 uri,
444 cid: record_cid.to_string(),
445 validation_status: validation_status.map(|s| match s {
446 ValidationStatus::Valid => "valid".to_string(),
447 ValidationStatus::Unknown => "unknown".to_string(),
448 ValidationStatus::Invalid => "invalid".to_string(),
449 }),
450 });
451 ops.push(RecordOp::Update {
452 collection: collection.clone(),
453 rkey: rkey.clone(),
454 cid: record_cid,
455 prev: prev_record_cid,
456 });
457 }
458 WriteOp::Delete { collection, rkey } => {
459 let collection_nsid = match collection.parse::<Nsid>() {
460 Ok(n) => n,
461 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection", "message": "Invalid collection NSID"}))).into_response(),
462 };
463 let key = format!("{}/{}", collection_nsid, rkey);
464 modified_keys.push(key.clone());
465 let prev_record_cid = mst.get(&key).await.ok().flatten();
466 mst = match mst.delete(&key).await {
467 Ok(m) => m,
468 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to delete from MST"}))).into_response(),
469 };
470 results.push(WriteResult::DeleteResult {});
471 ops.push(RecordOp::Delete {
472 collection: collection.clone(),
473 rkey: rkey.clone(),
474 prev: prev_record_cid,
475 });
476 }
477 }
478 }
479 let new_mst_root = match mst.persist().await {
480 Ok(c) => c,
481 Err(_) => {
482 return (
483 StatusCode::INTERNAL_SERVER_ERROR,
484 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})),
485 )
486 .into_response();
487 }
488 };
489 let mut relevant_blocks = std::collections::BTreeMap::new();
490 for key in &modified_keys {
491 if mst
492 .blocks_for_path(key, &mut relevant_blocks)
493 .await
494 .is_err()
495 {
496 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response();
497 }
498 if original_mst
499 .blocks_for_path(key, &mut relevant_blocks)
500 .await
501 .is_err()
502 {
503 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response();
504 }
505 }
506 let mut written_cids = tracking_store.get_all_relevant_cids();
507 for cid in relevant_blocks.keys() {
508 if !written_cids.contains(cid) {
509 written_cids.push(*cid);
510 }
511 }
512 let written_cids_str = written_cids
513 .iter()
514 .map(|c| c.to_string())
515 .collect::<Vec<_>>();
516 let commit_res = match commit_and_log(
517 &state,
518 CommitParams {
519 did: &did,
520 user_id,
521 current_root_cid: Some(current_root_cid),
522 prev_data_cid: Some(commit.data),
523 new_mst_root,
524 ops,
525 blocks_cids: &written_cids_str,
526 blobs: &all_blob_cids,
527 },
528 )
529 .await
530 {
531 Ok(res) => res,
532 Err(e) => {
533 error!("Commit failed: {}", e);
534 return (
535 StatusCode::INTERNAL_SERVER_ERROR,
536 Json(json!({"error": "InternalError", "message": "Failed to commit changes"})),
537 )
538 .into_response();
539 }
540 };
541
542 if let Some(ref controller) = controller_did {
543 let write_summary: Vec<serde_json::Value> = input
544 .writes
545 .iter()
546 .map(|w| match w {
547 WriteOp::Create {
548 collection, rkey, ..
549 } => json!({
550 "action": "create",
551 "collection": collection,
552 "rkey": rkey
553 }),
554 WriteOp::Update {
555 collection, rkey, ..
556 } => json!({
557 "action": "update",
558 "collection": collection,
559 "rkey": rkey
560 }),
561 WriteOp::Delete { collection, rkey } => json!({
562 "action": "delete",
563 "collection": collection,
564 "rkey": rkey
565 }),
566 })
567 .collect();
568
569 let _ = delegation::log_delegation_action(
570 &state.db,
571 &did,
572 controller,
573 Some(controller),
574 DelegationActionType::RepoWrite,
575 Some(json!({
576 "action": "apply_writes",
577 "count": input.writes.len(),
578 "writes": write_summary
579 })),
580 None,
581 None,
582 )
583 .await;
584 }
585
586 (
587 StatusCode::OK,
588 Json(ApplyWritesOutput {
589 commit: CommitInfo {
590 cid: commit_res.commit_cid.to_string(),
591 rev: commit_res.rev,
592 },
593 results,
594 }),
595 )
596 .into_response()
597}