this repo has no description
1use super::validation::validate_record_with_status;
2use super::write::has_verified_comms_channel;
3use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log, extract_blob_cids};
4use crate::delegation::{self, DelegationActionType};
5use crate::repo::tracking::TrackingBlockStore;
6use crate::state::AppState;
7use crate::validation::ValidationStatus;
8use axum::{
9 Json,
10 extract::State,
11 http::StatusCode,
12 response::{IntoResponse, Response},
13};
14use cid::Cid;
15use jacquard::types::{
16 integer::LimitedU32,
17 string::{Nsid, Tid},
18};
19use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
20use serde::{Deserialize, Serialize};
21use serde_json::json;
22use std::str::FromStr;
23use std::sync::Arc;
24use tracing::{error, info};
25
/// Upper bound on the number of write operations accepted in one
/// `applyWrites` request; larger batches are rejected with `InvalidRequest`.
const MAX_BATCH_WRITES: usize = 200;
27
28#[derive(Deserialize)]
29#[serde(tag = "$type")]
30pub enum WriteOp {
31 #[serde(rename = "com.atproto.repo.applyWrites#create")]
32 Create {
33 collection: String,
34 rkey: Option<String>,
35 value: serde_json::Value,
36 },
37 #[serde(rename = "com.atproto.repo.applyWrites#update")]
38 Update {
39 collection: String,
40 rkey: String,
41 value: serde_json::Value,
42 },
43 #[serde(rename = "com.atproto.repo.applyWrites#delete")]
44 Delete { collection: String, rkey: String },
45}
46
47#[derive(Deserialize)]
48#[serde(rename_all = "camelCase")]
49pub struct ApplyWritesInput {
50 pub repo: String,
51 pub validate: Option<bool>,
52 pub writes: Vec<WriteOp>,
53 pub swap_commit: Option<String>,
54}
55
56#[derive(Serialize)]
57#[serde(tag = "$type")]
58pub enum WriteResult {
59 #[serde(rename = "com.atproto.repo.applyWrites#createResult")]
60 CreateResult {
61 uri: String,
62 cid: String,
63 #[serde(rename = "validationStatus", skip_serializing_if = "Option::is_none")]
64 validation_status: Option<String>,
65 },
66 #[serde(rename = "com.atproto.repo.applyWrites#updateResult")]
67 UpdateResult {
68 uri: String,
69 cid: String,
70 #[serde(rename = "validationStatus", skip_serializing_if = "Option::is_none")]
71 validation_status: Option<String>,
72 },
73 #[serde(rename = "com.atproto.repo.applyWrites#deleteResult")]
74 DeleteResult {},
75}
76
77#[derive(Serialize)]
78pub struct ApplyWritesOutput {
79 pub commit: CommitInfo,
80 pub results: Vec<WriteResult>,
81}
82
83#[derive(Serialize)]
84pub struct CommitInfo {
85 pub cid: String,
86 pub rev: String,
87}
88
89pub async fn apply_writes(
90 State(state): State<AppState>,
91 headers: axum::http::HeaderMap,
92 Json(input): Json<ApplyWritesInput>,
93) -> Response {
94 info!(
95 "apply_writes called: repo={}, writes={}",
96 input.repo,
97 input.writes.len()
98 );
99 let token = match crate::auth::extract_bearer_token_from_header(
100 headers.get("Authorization").and_then(|h| h.to_str().ok()),
101 ) {
102 Some(t) => t,
103 None => {
104 return (
105 StatusCode::UNAUTHORIZED,
106 Json(json!({"error": "AuthenticationRequired"})),
107 )
108 .into_response();
109 }
110 };
111 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await {
112 Ok(user) => user,
113 Err(_) => {
114 return (
115 StatusCode::UNAUTHORIZED,
116 Json(json!({"error": "AuthenticationFailed"})),
117 )
118 .into_response();
119 }
120 };
121 let did = auth_user.did.clone();
122 let is_oauth = auth_user.is_oauth;
123 let scope = auth_user.scope;
124 let controller_did = auth_user.controller_did.clone();
125 if input.repo != did {
126 return (
127 StatusCode::FORBIDDEN,
128 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})),
129 )
130 .into_response();
131 }
132 if crate::util::is_account_migrated(&state.db, &did)
133 .await
134 .unwrap_or(false)
135 {
136 return (
137 StatusCode::FORBIDDEN,
138 Json(json!({
139 "error": "AccountMigrated",
140 "message": "Account has been migrated to another PDS. Repo operations are not allowed."
141 })),
142 )
143 .into_response();
144 }
145 let is_verified = has_verified_comms_channel(&state.db, &did)
146 .await
147 .unwrap_or(false);
148 let is_delegated = crate::delegation::is_delegated_account(&state.db, &did)
149 .await
150 .unwrap_or(false);
151 if !is_verified && !is_delegated {
152 return (
153 StatusCode::FORBIDDEN,
154 Json(json!({
155 "error": "AccountNotVerified",
156 "message": "You must verify at least one notification channel (email, Discord, Telegram, or Signal) before creating records"
157 })),
158 )
159 .into_response();
160 }
161 if input.writes.is_empty() {
162 return (
163 StatusCode::BAD_REQUEST,
164 Json(json!({"error": "InvalidRequest", "message": "writes array is empty"})),
165 )
166 .into_response();
167 }
168 if input.writes.len() > MAX_BATCH_WRITES {
169 return (
170 StatusCode::BAD_REQUEST,
171 Json(json!({"error": "InvalidRequest", "message": format!("Too many writes (max {})", MAX_BATCH_WRITES)})),
172 )
173 .into_response();
174 }
175
176 let has_custom_scope = scope
177 .as_ref()
178 .map(|s| s != "com.atproto.access")
179 .unwrap_or(false);
180 if is_oauth || has_custom_scope {
181 use std::collections::HashSet;
182 let create_collections: HashSet<&str> = input
183 .writes
184 .iter()
185 .filter_map(|w| {
186 if let WriteOp::Create { collection, .. } = w {
187 Some(collection.as_str())
188 } else {
189 None
190 }
191 })
192 .collect();
193 let update_collections: HashSet<&str> = input
194 .writes
195 .iter()
196 .filter_map(|w| {
197 if let WriteOp::Update { collection, .. } = w {
198 Some(collection.as_str())
199 } else {
200 None
201 }
202 })
203 .collect();
204 let delete_collections: HashSet<&str> = input
205 .writes
206 .iter()
207 .filter_map(|w| {
208 if let WriteOp::Delete { collection, .. } = w {
209 Some(collection.as_str())
210 } else {
211 None
212 }
213 })
214 .collect();
215
216 for collection in create_collections {
217 if let Err(e) = crate::auth::scope_check::check_repo_scope(
218 is_oauth,
219 scope.as_deref(),
220 crate::oauth::RepoAction::Create,
221 collection,
222 ) {
223 return e;
224 }
225 }
226 for collection in update_collections {
227 if let Err(e) = crate::auth::scope_check::check_repo_scope(
228 is_oauth,
229 scope.as_deref(),
230 crate::oauth::RepoAction::Update,
231 collection,
232 ) {
233 return e;
234 }
235 }
236 for collection in delete_collections {
237 if let Err(e) = crate::auth::scope_check::check_repo_scope(
238 is_oauth,
239 scope.as_deref(),
240 crate::oauth::RepoAction::Delete,
241 collection,
242 ) {
243 return e;
244 }
245 }
246 }
247
248 let user_id: uuid::Uuid = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
249 .fetch_optional(&state.db)
250 .await
251 {
252 Ok(Some(id)) => id,
253 _ => {
254 return (
255 StatusCode::INTERNAL_SERVER_ERROR,
256 Json(json!({"error": "InternalError", "message": "User not found"})),
257 )
258 .into_response();
259 }
260 };
261 let root_cid_str: String = match sqlx::query_scalar!(
262 "SELECT repo_root_cid FROM repos WHERE user_id = $1",
263 user_id
264 )
265 .fetch_optional(&state.db)
266 .await
267 {
268 Ok(Some(cid_str)) => cid_str,
269 _ => {
270 return (
271 StatusCode::INTERNAL_SERVER_ERROR,
272 Json(json!({"error": "InternalError", "message": "Repo root not found"})),
273 )
274 .into_response();
275 }
276 };
277 let current_root_cid = match Cid::from_str(&root_cid_str) {
278 Ok(c) => c,
279 Err(_) => {
280 return (
281 StatusCode::INTERNAL_SERVER_ERROR,
282 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})),
283 )
284 .into_response();
285 }
286 };
287 if let Some(swap_commit) = &input.swap_commit
288 && Cid::from_str(swap_commit).ok() != Some(current_root_cid)
289 {
290 return (
291 StatusCode::CONFLICT,
292 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})),
293 )
294 .into_response();
295 }
296 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
297 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
298 Ok(Some(b)) => b,
299 _ => {
300 return (
301 StatusCode::INTERNAL_SERVER_ERROR,
302 Json(json!({"error": "InternalError", "message": "Commit block not found"})),
303 )
304 .into_response();
305 }
306 };
307 let commit = match Commit::from_cbor(&commit_bytes) {
308 Ok(c) => c,
309 _ => {
310 return (
311 StatusCode::INTERNAL_SERVER_ERROR,
312 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})),
313 )
314 .into_response();
315 }
316 };
317 let original_mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
318 let mut mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
319 let mut results: Vec<WriteResult> = Vec::new();
320 let mut ops: Vec<RecordOp> = Vec::new();
321 let mut modified_keys: Vec<String> = Vec::new();
322 let mut all_blob_cids: Vec<String> = Vec::new();
323 for write in &input.writes {
324 match write {
325 WriteOp::Create {
326 collection,
327 rkey,
328 value,
329 } => {
330 let validation_status = if input.validate == Some(false) {
331 None
332 } else {
333 let require_lexicon = input.validate == Some(true);
334 match validate_record_with_status(
335 value,
336 collection,
337 rkey.as_deref(),
338 require_lexicon,
339 ) {
340 Ok(status) => Some(status),
341 Err(err_response) => return *err_response,
342 }
343 };
344 all_blob_cids.extend(extract_blob_cids(value));
345 let rkey = rkey
346 .clone()
347 .unwrap_or_else(|| Tid::now(LimitedU32::MIN).to_string());
348 let record_ipld = crate::util::json_to_ipld(value);
349 let mut record_bytes = Vec::new();
350 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld).is_err() {
351 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response();
352 }
353 let record_cid = match tracking_store.put(&record_bytes).await {
354 Ok(c) => c,
355 Err(_) => return (
356 StatusCode::INTERNAL_SERVER_ERROR,
357 Json(
358 json!({"error": "InternalError", "message": "Failed to store record"}),
359 ),
360 )
361 .into_response(),
362 };
363 let collection_nsid = match collection.parse::<Nsid>() {
364 Ok(n) => n,
365 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection", "message": "Invalid collection NSID"}))).into_response(),
366 };
367 let key = format!("{}/{}", collection_nsid, rkey);
368 modified_keys.push(key.clone());
369 mst = match mst.add(&key, record_cid).await {
370 Ok(m) => m,
371 Err(_) => return (
372 StatusCode::INTERNAL_SERVER_ERROR,
373 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})),
374 )
375 .into_response(),
376 };
377 let uri = format!("at://{}/{}/{}", did, collection, rkey);
378 results.push(WriteResult::CreateResult {
379 uri,
380 cid: record_cid.to_string(),
381 validation_status: validation_status.map(|s| match s {
382 ValidationStatus::Valid => "valid".to_string(),
383 ValidationStatus::Unknown => "unknown".to_string(),
384 ValidationStatus::Invalid => "invalid".to_string(),
385 }),
386 });
387 ops.push(RecordOp::Create {
388 collection: collection.clone(),
389 rkey,
390 cid: record_cid,
391 });
392 }
393 WriteOp::Update {
394 collection,
395 rkey,
396 value,
397 } => {
398 let validation_status = if input.validate == Some(false) {
399 None
400 } else {
401 let require_lexicon = input.validate == Some(true);
402 match validate_record_with_status(
403 value,
404 collection,
405 Some(rkey),
406 require_lexicon,
407 ) {
408 Ok(status) => Some(status),
409 Err(err_response) => return *err_response,
410 }
411 };
412 all_blob_cids.extend(extract_blob_cids(value));
413 let record_ipld = crate::util::json_to_ipld(value);
414 let mut record_bytes = Vec::new();
415 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld).is_err() {
416 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response();
417 }
418 let record_cid = match tracking_store.put(&record_bytes).await {
419 Ok(c) => c,
420 Err(_) => return (
421 StatusCode::INTERNAL_SERVER_ERROR,
422 Json(
423 json!({"error": "InternalError", "message": "Failed to store record"}),
424 ),
425 )
426 .into_response(),
427 };
428 let collection_nsid = match collection.parse::<Nsid>() {
429 Ok(n) => n,
430 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection", "message": "Invalid collection NSID"}))).into_response(),
431 };
432 let key = format!("{}/{}", collection_nsid, rkey);
433 modified_keys.push(key.clone());
434 let prev_record_cid = mst.get(&key).await.ok().flatten();
435 mst = match mst.update(&key, record_cid).await {
436 Ok(m) => m,
437 Err(_) => return (
438 StatusCode::INTERNAL_SERVER_ERROR,
439 Json(json!({"error": "InternalError", "message": "Failed to update MST"})),
440 )
441 .into_response(),
442 };
443 let uri = format!("at://{}/{}/{}", did, collection, rkey);
444 results.push(WriteResult::UpdateResult {
445 uri,
446 cid: record_cid.to_string(),
447 validation_status: validation_status.map(|s| match s {
448 ValidationStatus::Valid => "valid".to_string(),
449 ValidationStatus::Unknown => "unknown".to_string(),
450 ValidationStatus::Invalid => "invalid".to_string(),
451 }),
452 });
453 ops.push(RecordOp::Update {
454 collection: collection.clone(),
455 rkey: rkey.clone(),
456 cid: record_cid,
457 prev: prev_record_cid,
458 });
459 }
460 WriteOp::Delete { collection, rkey } => {
461 let collection_nsid = match collection.parse::<Nsid>() {
462 Ok(n) => n,
463 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection", "message": "Invalid collection NSID"}))).into_response(),
464 };
465 let key = format!("{}/{}", collection_nsid, rkey);
466 modified_keys.push(key.clone());
467 let prev_record_cid = mst.get(&key).await.ok().flatten();
468 mst = match mst.delete(&key).await {
469 Ok(m) => m,
470 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to delete from MST"}))).into_response(),
471 };
472 results.push(WriteResult::DeleteResult {});
473 ops.push(RecordOp::Delete {
474 collection: collection.clone(),
475 rkey: rkey.clone(),
476 prev: prev_record_cid,
477 });
478 }
479 }
480 }
481 let new_mst_root = match mst.persist().await {
482 Ok(c) => c,
483 Err(_) => {
484 return (
485 StatusCode::INTERNAL_SERVER_ERROR,
486 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})),
487 )
488 .into_response();
489 }
490 };
491 let mut relevant_blocks = std::collections::BTreeMap::new();
492 for key in &modified_keys {
493 if mst
494 .blocks_for_path(key, &mut relevant_blocks)
495 .await
496 .is_err()
497 {
498 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response();
499 }
500 if original_mst
501 .blocks_for_path(key, &mut relevant_blocks)
502 .await
503 .is_err()
504 {
505 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response();
506 }
507 }
508 let mut written_cids = tracking_store.get_all_relevant_cids();
509 for cid in relevant_blocks.keys() {
510 if !written_cids.contains(cid) {
511 written_cids.push(*cid);
512 }
513 }
514 let written_cids_str = written_cids
515 .iter()
516 .map(|c| c.to_string())
517 .collect::<Vec<_>>();
518 let commit_res = match commit_and_log(
519 &state,
520 CommitParams {
521 did: &did,
522 user_id,
523 current_root_cid: Some(current_root_cid),
524 prev_data_cid: Some(commit.data),
525 new_mst_root,
526 ops,
527 blocks_cids: &written_cids_str,
528 blobs: &all_blob_cids,
529 },
530 )
531 .await
532 {
533 Ok(res) => res,
534 Err(e) => {
535 error!("Commit failed: {}", e);
536 return (
537 StatusCode::INTERNAL_SERVER_ERROR,
538 Json(json!({"error": "InternalError", "message": "Failed to commit changes"})),
539 )
540 .into_response();
541 }
542 };
543
544 if let Some(ref controller) = controller_did {
545 let write_summary: Vec<serde_json::Value> = input
546 .writes
547 .iter()
548 .map(|w| match w {
549 WriteOp::Create {
550 collection, rkey, ..
551 } => json!({
552 "action": "create",
553 "collection": collection,
554 "rkey": rkey
555 }),
556 WriteOp::Update {
557 collection, rkey, ..
558 } => json!({
559 "action": "update",
560 "collection": collection,
561 "rkey": rkey
562 }),
563 WriteOp::Delete { collection, rkey } => json!({
564 "action": "delete",
565 "collection": collection,
566 "rkey": rkey
567 }),
568 })
569 .collect();
570
571 let _ = delegation::log_delegation_action(
572 &state.db,
573 &did,
574 controller,
575 Some(controller),
576 DelegationActionType::RepoWrite,
577 Some(json!({
578 "action": "apply_writes",
579 "count": input.writes.len(),
580 "writes": write_summary
581 })),
582 None,
583 None,
584 )
585 .await;
586 }
587
588 (
589 StatusCode::OK,
590 Json(ApplyWritesOutput {
591 commit: CommitInfo {
592 cid: commit_res.commit_cid.to_string(),
593 rev: commit_res.rev,
594 },
595 results,
596 }),
597 )
598 .into_response()
599}