this repo has no description
1use crate::api::repo::record::utils::{commit_and_log, RecordOp};
2use crate::repo::tracking::TrackingBlockStore;
3use crate::state::AppState;
4use axum::{
5 extract::State,
6 http::StatusCode,
7 response::{IntoResponse, Response},
8 Json,
9};
10use chrono::Utc;
11use cid::Cid;
12use jacquard::types::string::Nsid;
13use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
14use serde::{Deserialize, Serialize};
15use serde_json::json;
16use std::str::FromStr;
17use std::sync::Arc;
18use tracing::error;
19
20#[derive(Deserialize)]
21#[serde(tag = "$type")]
22pub enum WriteOp {
23 #[serde(rename = "com.atproto.repo.applyWrites#create")]
24 Create {
25 collection: String,
26 rkey: Option<String>,
27 value: serde_json::Value,
28 },
29 #[serde(rename = "com.atproto.repo.applyWrites#update")]
30 Update {
31 collection: String,
32 rkey: String,
33 value: serde_json::Value,
34 },
35 #[serde(rename = "com.atproto.repo.applyWrites#delete")]
36 Delete { collection: String, rkey: String },
37}
38
39#[derive(Deserialize)]
40#[serde(rename_all = "camelCase")]
41pub struct ApplyWritesInput {
42 pub repo: String,
43 pub validate: Option<bool>,
44 pub writes: Vec<WriteOp>,
45 pub swap_commit: Option<String>,
46}
47
48#[derive(Serialize)]
49#[serde(tag = "$type")]
50pub enum WriteResult {
51 #[serde(rename = "com.atproto.repo.applyWrites#createResult")]
52 CreateResult { uri: String, cid: String },
53 #[serde(rename = "com.atproto.repo.applyWrites#updateResult")]
54 UpdateResult { uri: String, cid: String },
55 #[serde(rename = "com.atproto.repo.applyWrites#deleteResult")]
56 DeleteResult {},
57}
58
59#[derive(Serialize)]
60pub struct ApplyWritesOutput {
61 pub commit: CommitInfo,
62 pub results: Vec<WriteResult>,
63}
64
65#[derive(Serialize)]
66pub struct CommitInfo {
67 pub cid: String,
68 pub rev: String,
69}
70
71pub async fn apply_writes(
72 State(state): State<AppState>,
73 headers: axum::http::HeaderMap,
74 Json(input): Json<ApplyWritesInput>,
75) -> Response {
76 let token = match crate::auth::extract_bearer_token_from_header(
77 headers.get("Authorization").and_then(|h| h.to_str().ok())
78 ) {
79 Some(t) => t,
80 None => {
81 return (
82 StatusCode::UNAUTHORIZED,
83 Json(json!({"error": "AuthenticationRequired"})),
84 )
85 .into_response();
86 }
87 };
88
89 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await {
90 Ok(user) => user,
91 Err(_) => {
92 return (
93 StatusCode::UNAUTHORIZED,
94 Json(json!({"error": "AuthenticationFailed"})),
95 )
96 .into_response();
97 }
98 };
99
100 let did = auth_user.did;
101
102 if input.repo != did {
103 return (
104 StatusCode::FORBIDDEN,
105 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})),
106 )
107 .into_response();
108 }
109
110 if input.writes.is_empty() {
111 return (
112 StatusCode::BAD_REQUEST,
113 Json(json!({"error": "InvalidRequest", "message": "writes array is empty"})),
114 )
115 .into_response();
116 }
117
118 if input.writes.len() > 200 {
119 return (
120 StatusCode::BAD_REQUEST,
121 Json(json!({"error": "InvalidRequest", "message": "Too many writes (max 200)"})),
122 )
123 .into_response();
124 }
125
126 let user_id: uuid::Uuid = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
127 .fetch_optional(&state.db)
128 .await
129 {
130 Ok(Some(id)) => id,
131 _ => {
132 return (
133 StatusCode::INTERNAL_SERVER_ERROR,
134 Json(json!({"error": "InternalError", "message": "User not found"})),
135 )
136 .into_response();
137 }
138 };
139
140 let root_cid_str: String =
141 match sqlx::query_scalar!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id)
142 .fetch_optional(&state.db)
143 .await
144 {
145 Ok(Some(cid_str)) => cid_str,
146 _ => {
147 return (
148 StatusCode::INTERNAL_SERVER_ERROR,
149 Json(json!({"error": "InternalError", "message": "Repo root not found"})),
150 )
151 .into_response();
152 }
153 };
154
155 let current_root_cid = match Cid::from_str(&root_cid_str) {
156 Ok(c) => c,
157 Err(_) => {
158 return (
159 StatusCode::INTERNAL_SERVER_ERROR,
160 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})),
161 )
162 .into_response();
163 }
164 };
165
166 if let Some(swap_commit) = &input.swap_commit {
167 if Cid::from_str(swap_commit).ok() != Some(current_root_cid) {
168 return (
169 StatusCode::CONFLICT,
170 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})),
171 )
172 .into_response();
173 }
174 }
175
176 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
177
178 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
179 Ok(Some(b)) => b,
180 _ => {
181 return (
182 StatusCode::INTERNAL_SERVER_ERROR,
183 Json(json!({"error": "InternalError", "message": "Commit block not found"})),
184 )
185 .into_response()
186 }
187 };
188
189 let commit = match Commit::from_cbor(&commit_bytes) {
190 Ok(c) => c,
191 _ => {
192 return (
193 StatusCode::INTERNAL_SERVER_ERROR,
194 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})),
195 )
196 .into_response()
197 }
198 };
199
200 let mut mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
201
202 let mut results: Vec<WriteResult> = Vec::new();
203 let mut ops: Vec<RecordOp> = Vec::new();
204
205 for write in &input.writes {
206 match write {
207 WriteOp::Create {
208 collection,
209 rkey,
210 value,
211 } => {
212 let rkey = rkey
213 .clone()
214 .unwrap_or_else(|| Utc::now().format("%Y%m%d%H%M%S%f").to_string());
215 let mut record_bytes = Vec::new();
216 serde_ipld_dagcbor::to_writer(&mut record_bytes, value).unwrap();
217 let record_cid = tracking_store.put(&record_bytes).await.unwrap();
218
219 let key = format!("{}/{}", collection.parse::<Nsid>().unwrap(), rkey);
220 mst = mst.add(&key, record_cid).await.unwrap();
221
222 let uri = format!("at://{}/{}/{}", did, collection, rkey);
223 results.push(WriteResult::CreateResult {
224 uri,
225 cid: record_cid.to_string(),
226 });
227 ops.push(RecordOp::Create {
228 collection: collection.clone(),
229 rkey,
230 cid: record_cid,
231 });
232 }
233 WriteOp::Update {
234 collection,
235 rkey,
236 value,
237 } => {
238 let mut record_bytes = Vec::new();
239 serde_ipld_dagcbor::to_writer(&mut record_bytes, value).unwrap();
240 let record_cid = tracking_store.put(&record_bytes).await.unwrap();
241
242 let key = format!("{}/{}", collection.parse::<Nsid>().unwrap(), rkey);
243 mst = mst.update(&key, record_cid).await.unwrap();
244
245 let uri = format!("at://{}/{}/{}", did, collection, rkey);
246 results.push(WriteResult::UpdateResult {
247 uri,
248 cid: record_cid.to_string(),
249 });
250 ops.push(RecordOp::Update {
251 collection: collection.clone(),
252 rkey: rkey.clone(),
253 cid: record_cid,
254 });
255 }
256 WriteOp::Delete { collection, rkey } => {
257 let key = format!("{}/{}", collection.parse::<Nsid>().unwrap(), rkey);
258 mst = mst.delete(&key).await.unwrap();
259
260 results.push(WriteResult::DeleteResult {});
261 ops.push(RecordOp::Delete {
262 collection: collection.clone(),
263 rkey: rkey.clone(),
264 });
265 }
266 }
267 }
268
269 let new_mst_root = mst.persist().await.unwrap();
270 let written_cids = tracking_store.get_written_cids();
271 let written_cids_str = written_cids
272 .iter()
273 .map(|c| c.to_string())
274 .collect::<Vec<_>>();
275
276 let commit_res = match commit_and_log(
277 &state,
278 &did,
279 user_id,
280 Some(current_root_cid),
281 new_mst_root,
282 ops,
283 &written_cids_str,
284 )
285 .await
286 {
287 Ok(res) => res,
288 Err(e) => {
289 error!("Commit failed: {}", e);
290 return (
291 StatusCode::INTERNAL_SERVER_ERROR,
292 Json(json!({"error": "InternalError", "message": "Failed to commit changes"})),
293 )
294 .into_response();
295 }
296 };
297
298 (
299 StatusCode::OK,
300 Json(ApplyWritesOutput {
301 commit: CommitInfo {
302 cid: commit_res.commit_cid.to_string(),
303 rev: commit_res.rev,
304 },
305 results,
306 }),
307 )
308 .into_response()
309}