this repo has no description
1use super::validation::validate_record_with_status;
2use super::write::has_verified_comms_channel;
3use crate::validation::ValidationStatus;
4use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log, extract_blob_cids};
5use crate::delegation::{self, DelegationActionType};
6use crate::repo::tracking::TrackingBlockStore;
7use crate::state::AppState;
8use axum::{
9 Json,
10 extract::State,
11 http::StatusCode,
12 response::{IntoResponse, Response},
13};
14use cid::Cid;
15use jacquard::types::{
16 integer::LimitedU32,
17 string::{Nsid, Tid},
18};
19use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
20use serde::{Deserialize, Serialize};
21use serde_json::json;
22use std::str::FromStr;
23use std::sync::Arc;
24use tracing::{error, info};
25
26const MAX_BATCH_WRITES: usize = 200;
27
28#[derive(Deserialize)]
29#[serde(tag = "$type")]
30pub enum WriteOp {
31 #[serde(rename = "com.atproto.repo.applyWrites#create")]
32 Create {
33 collection: String,
34 rkey: Option<String>,
35 value: serde_json::Value,
36 },
37 #[serde(rename = "com.atproto.repo.applyWrites#update")]
38 Update {
39 collection: String,
40 rkey: String,
41 value: serde_json::Value,
42 },
43 #[serde(rename = "com.atproto.repo.applyWrites#delete")]
44 Delete { collection: String, rkey: String },
45}
46
47#[derive(Deserialize)]
48#[serde(rename_all = "camelCase")]
49pub struct ApplyWritesInput {
50 pub repo: String,
51 pub validate: Option<bool>,
52 pub writes: Vec<WriteOp>,
53 pub swap_commit: Option<String>,
54}
55
56#[derive(Serialize)]
57#[serde(tag = "$type")]
58pub enum WriteResult {
59 #[serde(rename = "com.atproto.repo.applyWrites#createResult")]
60 CreateResult {
61 uri: String,
62 cid: String,
63 #[serde(rename = "validationStatus", skip_serializing_if = "Option::is_none")]
64 validation_status: Option<String>,
65 },
66 #[serde(rename = "com.atproto.repo.applyWrites#updateResult")]
67 UpdateResult {
68 uri: String,
69 cid: String,
70 #[serde(rename = "validationStatus", skip_serializing_if = "Option::is_none")]
71 validation_status: Option<String>,
72 },
73 #[serde(rename = "com.atproto.repo.applyWrites#deleteResult")]
74 DeleteResult {},
75}
76
77#[derive(Serialize)]
78pub struct ApplyWritesOutput {
79 pub commit: CommitInfo,
80 pub results: Vec<WriteResult>,
81}
82
83#[derive(Serialize)]
84pub struct CommitInfo {
85 pub cid: String,
86 pub rev: String,
87}
88
89pub async fn apply_writes(
90 State(state): State<AppState>,
91 headers: axum::http::HeaderMap,
92 Json(input): Json<ApplyWritesInput>,
93) -> Response {
94 info!(
95 "apply_writes called: repo={}, writes={}",
96 input.repo,
97 input.writes.len()
98 );
99 let token = match crate::auth::extract_bearer_token_from_header(
100 headers.get("Authorization").and_then(|h| h.to_str().ok()),
101 ) {
102 Some(t) => t,
103 None => {
104 return (
105 StatusCode::UNAUTHORIZED,
106 Json(json!({"error": "AuthenticationRequired"})),
107 )
108 .into_response();
109 }
110 };
111 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await {
112 Ok(user) => user,
113 Err(_) => {
114 return (
115 StatusCode::UNAUTHORIZED,
116 Json(json!({"error": "AuthenticationFailed"})),
117 )
118 .into_response();
119 }
120 };
121 let did = auth_user.did.clone();
122 let is_oauth = auth_user.is_oauth;
123 let scope = auth_user.scope;
124 let controller_did = auth_user.controller_did.clone();
125 if input.repo != did {
126 return (
127 StatusCode::FORBIDDEN,
128 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})),
129 )
130 .into_response();
131 }
132 let is_verified = has_verified_comms_channel(&state.db, &did)
133 .await
134 .unwrap_or(false);
135 let is_delegated = crate::delegation::is_delegated_account(&state.db, &did)
136 .await
137 .unwrap_or(false);
138 if !is_verified && !is_delegated {
139 return (
140 StatusCode::FORBIDDEN,
141 Json(json!({
142 "error": "AccountNotVerified",
143 "message": "You must verify at least one notification channel (email, Discord, Telegram, or Signal) before creating records"
144 })),
145 )
146 .into_response();
147 }
148 if input.writes.is_empty() {
149 return (
150 StatusCode::BAD_REQUEST,
151 Json(json!({"error": "InvalidRequest", "message": "writes array is empty"})),
152 )
153 .into_response();
154 }
155 if input.writes.len() > MAX_BATCH_WRITES {
156 return (
157 StatusCode::BAD_REQUEST,
158 Json(json!({"error": "InvalidRequest", "message": format!("Too many writes (max {})", MAX_BATCH_WRITES)})),
159 )
160 .into_response();
161 }
162
163 let has_custom_scope = scope
164 .as_ref()
165 .map(|s| s != "com.atproto.access")
166 .unwrap_or(false);
167 if is_oauth || has_custom_scope {
168 use std::collections::HashSet;
169 let create_collections: HashSet<&str> = input
170 .writes
171 .iter()
172 .filter_map(|w| {
173 if let WriteOp::Create { collection, .. } = w {
174 Some(collection.as_str())
175 } else {
176 None
177 }
178 })
179 .collect();
180 let update_collections: HashSet<&str> = input
181 .writes
182 .iter()
183 .filter_map(|w| {
184 if let WriteOp::Update { collection, .. } = w {
185 Some(collection.as_str())
186 } else {
187 None
188 }
189 })
190 .collect();
191 let delete_collections: HashSet<&str> = input
192 .writes
193 .iter()
194 .filter_map(|w| {
195 if let WriteOp::Delete { collection, .. } = w {
196 Some(collection.as_str())
197 } else {
198 None
199 }
200 })
201 .collect();
202
203 for collection in create_collections {
204 if let Err(e) = crate::auth::scope_check::check_repo_scope(
205 is_oauth,
206 scope.as_deref(),
207 crate::oauth::RepoAction::Create,
208 collection,
209 ) {
210 return e;
211 }
212 }
213 for collection in update_collections {
214 if let Err(e) = crate::auth::scope_check::check_repo_scope(
215 is_oauth,
216 scope.as_deref(),
217 crate::oauth::RepoAction::Update,
218 collection,
219 ) {
220 return e;
221 }
222 }
223 for collection in delete_collections {
224 if let Err(e) = crate::auth::scope_check::check_repo_scope(
225 is_oauth,
226 scope.as_deref(),
227 crate::oauth::RepoAction::Delete,
228 collection,
229 ) {
230 return e;
231 }
232 }
233 }
234
235 let user_id: uuid::Uuid = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
236 .fetch_optional(&state.db)
237 .await
238 {
239 Ok(Some(id)) => id,
240 _ => {
241 return (
242 StatusCode::INTERNAL_SERVER_ERROR,
243 Json(json!({"error": "InternalError", "message": "User not found"})),
244 )
245 .into_response();
246 }
247 };
248 let root_cid_str: String = match sqlx::query_scalar!(
249 "SELECT repo_root_cid FROM repos WHERE user_id = $1",
250 user_id
251 )
252 .fetch_optional(&state.db)
253 .await
254 {
255 Ok(Some(cid_str)) => cid_str,
256 _ => {
257 return (
258 StatusCode::INTERNAL_SERVER_ERROR,
259 Json(json!({"error": "InternalError", "message": "Repo root not found"})),
260 )
261 .into_response();
262 }
263 };
264 let current_root_cid = match Cid::from_str(&root_cid_str) {
265 Ok(c) => c,
266 Err(_) => {
267 return (
268 StatusCode::INTERNAL_SERVER_ERROR,
269 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})),
270 )
271 .into_response();
272 }
273 };
274 if let Some(swap_commit) = &input.swap_commit
275 && Cid::from_str(swap_commit).ok() != Some(current_root_cid)
276 {
277 return (
278 StatusCode::CONFLICT,
279 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})),
280 )
281 .into_response();
282 }
283 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
284 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
285 Ok(Some(b)) => b,
286 _ => {
287 return (
288 StatusCode::INTERNAL_SERVER_ERROR,
289 Json(json!({"error": "InternalError", "message": "Commit block not found"})),
290 )
291 .into_response();
292 }
293 };
294 let commit = match Commit::from_cbor(&commit_bytes) {
295 Ok(c) => c,
296 _ => {
297 return (
298 StatusCode::INTERNAL_SERVER_ERROR,
299 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})),
300 )
301 .into_response();
302 }
303 };
304 let original_mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
305 let mut mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
306 let mut results: Vec<WriteResult> = Vec::new();
307 let mut ops: Vec<RecordOp> = Vec::new();
308 let mut modified_keys: Vec<String> = Vec::new();
309 let mut all_blob_cids: Vec<String> = Vec::new();
310 for write in &input.writes {
311 match write {
312 WriteOp::Create {
313 collection,
314 rkey,
315 value,
316 } => {
317 let validation_status = if input.validate == Some(false) {
318 None
319 } else {
320 let require_lexicon = input.validate == Some(true);
321 match validate_record_with_status(
322 value,
323 collection,
324 rkey.as_deref(),
325 require_lexicon,
326 ) {
327 Ok(status) => Some(status),
328 Err(err_response) => return *err_response,
329 }
330 };
331 all_blob_cids.extend(extract_blob_cids(value));
332 let rkey = rkey
333 .clone()
334 .unwrap_or_else(|| Tid::now(LimitedU32::MIN).to_string());
335 let mut record_bytes = Vec::new();
336 if serde_ipld_dagcbor::to_writer(&mut record_bytes, value).is_err() {
337 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response();
338 }
339 let record_cid = match tracking_store.put(&record_bytes).await {
340 Ok(c) => c,
341 Err(_) => return (
342 StatusCode::INTERNAL_SERVER_ERROR,
343 Json(
344 json!({"error": "InternalError", "message": "Failed to store record"}),
345 ),
346 )
347 .into_response(),
348 };
349 let collection_nsid = match collection.parse::<Nsid>() {
350 Ok(n) => n,
351 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection", "message": "Invalid collection NSID"}))).into_response(),
352 };
353 let key = format!("{}/{}", collection_nsid, rkey);
354 modified_keys.push(key.clone());
355 mst = match mst.add(&key, record_cid).await {
356 Ok(m) => m,
357 Err(_) => return (
358 StatusCode::INTERNAL_SERVER_ERROR,
359 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})),
360 )
361 .into_response(),
362 };
363 let uri = format!("at://{}/{}/{}", did, collection, rkey);
364 results.push(WriteResult::CreateResult {
365 uri,
366 cid: record_cid.to_string(),
367 validation_status: validation_status.map(|s| match s {
368 ValidationStatus::Valid => "valid".to_string(),
369 ValidationStatus::Unknown => "unknown".to_string(),
370 ValidationStatus::Invalid => "invalid".to_string(),
371 }),
372 });
373 ops.push(RecordOp::Create {
374 collection: collection.clone(),
375 rkey,
376 cid: record_cid,
377 });
378 }
379 WriteOp::Update {
380 collection,
381 rkey,
382 value,
383 } => {
384 let validation_status = if input.validate == Some(false) {
385 None
386 } else {
387 let require_lexicon = input.validate == Some(true);
388 match validate_record_with_status(
389 value,
390 collection,
391 Some(rkey),
392 require_lexicon,
393 ) {
394 Ok(status) => Some(status),
395 Err(err_response) => return *err_response,
396 }
397 };
398 all_blob_cids.extend(extract_blob_cids(value));
399 let mut record_bytes = Vec::new();
400 if serde_ipld_dagcbor::to_writer(&mut record_bytes, value).is_err() {
401 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response();
402 }
403 let record_cid = match tracking_store.put(&record_bytes).await {
404 Ok(c) => c,
405 Err(_) => return (
406 StatusCode::INTERNAL_SERVER_ERROR,
407 Json(
408 json!({"error": "InternalError", "message": "Failed to store record"}),
409 ),
410 )
411 .into_response(),
412 };
413 let collection_nsid = match collection.parse::<Nsid>() {
414 Ok(n) => n,
415 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection", "message": "Invalid collection NSID"}))).into_response(),
416 };
417 let key = format!("{}/{}", collection_nsid, rkey);
418 modified_keys.push(key.clone());
419 let prev_record_cid = mst.get(&key).await.ok().flatten();
420 mst = match mst.update(&key, record_cid).await {
421 Ok(m) => m,
422 Err(_) => return (
423 StatusCode::INTERNAL_SERVER_ERROR,
424 Json(json!({"error": "InternalError", "message": "Failed to update MST"})),
425 )
426 .into_response(),
427 };
428 let uri = format!("at://{}/{}/{}", did, collection, rkey);
429 results.push(WriteResult::UpdateResult {
430 uri,
431 cid: record_cid.to_string(),
432 validation_status: validation_status.map(|s| match s {
433 ValidationStatus::Valid => "valid".to_string(),
434 ValidationStatus::Unknown => "unknown".to_string(),
435 ValidationStatus::Invalid => "invalid".to_string(),
436 }),
437 });
438 ops.push(RecordOp::Update {
439 collection: collection.clone(),
440 rkey: rkey.clone(),
441 cid: record_cid,
442 prev: prev_record_cid,
443 });
444 }
445 WriteOp::Delete { collection, rkey } => {
446 let collection_nsid = match collection.parse::<Nsid>() {
447 Ok(n) => n,
448 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection", "message": "Invalid collection NSID"}))).into_response(),
449 };
450 let key = format!("{}/{}", collection_nsid, rkey);
451 modified_keys.push(key.clone());
452 let prev_record_cid = mst.get(&key).await.ok().flatten();
453 mst = match mst.delete(&key).await {
454 Ok(m) => m,
455 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to delete from MST"}))).into_response(),
456 };
457 results.push(WriteResult::DeleteResult {});
458 ops.push(RecordOp::Delete {
459 collection: collection.clone(),
460 rkey: rkey.clone(),
461 prev: prev_record_cid,
462 });
463 }
464 }
465 }
466 let new_mst_root = match mst.persist().await {
467 Ok(c) => c,
468 Err(_) => {
469 return (
470 StatusCode::INTERNAL_SERVER_ERROR,
471 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})),
472 )
473 .into_response();
474 }
475 };
476 let mut relevant_blocks = std::collections::BTreeMap::new();
477 for key in &modified_keys {
478 if mst
479 .blocks_for_path(key, &mut relevant_blocks)
480 .await
481 .is_err()
482 {
483 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response();
484 }
485 if original_mst
486 .blocks_for_path(key, &mut relevant_blocks)
487 .await
488 .is_err()
489 {
490 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response();
491 }
492 }
493 let mut written_cids = tracking_store.get_all_relevant_cids();
494 for cid in relevant_blocks.keys() {
495 if !written_cids.contains(cid) {
496 written_cids.push(*cid);
497 }
498 }
499 let written_cids_str = written_cids
500 .iter()
501 .map(|c| c.to_string())
502 .collect::<Vec<_>>();
503 let commit_res = match commit_and_log(
504 &state,
505 CommitParams {
506 did: &did,
507 user_id,
508 current_root_cid: Some(current_root_cid),
509 prev_data_cid: Some(commit.data),
510 new_mst_root,
511 ops,
512 blocks_cids: &written_cids_str,
513 blobs: &all_blob_cids,
514 },
515 )
516 .await
517 {
518 Ok(res) => res,
519 Err(e) => {
520 error!("Commit failed: {}", e);
521 return (
522 StatusCode::INTERNAL_SERVER_ERROR,
523 Json(json!({"error": "InternalError", "message": "Failed to commit changes"})),
524 )
525 .into_response();
526 }
527 };
528
529 if let Some(ref controller) = controller_did {
530 let write_summary: Vec<serde_json::Value> = input
531 .writes
532 .iter()
533 .map(|w| match w {
534 WriteOp::Create {
535 collection, rkey, ..
536 } => json!({
537 "action": "create",
538 "collection": collection,
539 "rkey": rkey
540 }),
541 WriteOp::Update {
542 collection, rkey, ..
543 } => json!({
544 "action": "update",
545 "collection": collection,
546 "rkey": rkey
547 }),
548 WriteOp::Delete { collection, rkey } => json!({
549 "action": "delete",
550 "collection": collection,
551 "rkey": rkey
552 }),
553 })
554 .collect();
555
556 let _ = delegation::log_delegation_action(
557 &state.db,
558 &did,
559 controller,
560 Some(controller),
561 DelegationActionType::RepoWrite,
562 Some(json!({
563 "action": "apply_writes",
564 "count": input.writes.len(),
565 "writes": write_summary
566 })),
567 None,
568 None,
569 )
570 .await;
571 }
572
573 (
574 StatusCode::OK,
575 Json(ApplyWritesOutput {
576 commit: CommitInfo {
577 cid: commit_res.commit_cid.to_string(),
578 rev: commit_res.rev,
579 },
580 results,
581 }),
582 )
583 .into_response()
584}