// This repo has no description.
1use crate::api::repo::record::utils::{commit_and_log, RecordOp};
2use crate::repo::tracking::TrackingBlockStore;
3use crate::state::AppState;
4use axum::{
5 extract::State,
6 http::{HeaderMap, StatusCode},
7 response::{IntoResponse, Response},
8 Json,
9};
10use chrono::Utc;
11use cid::Cid;
12use jacquard::types::string::Nsid;
13use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
14use serde::{Deserialize, Serialize};
15use serde_json::json;
16use std::str::FromStr;
17use std::sync::Arc;
18use tracing::error;
19use uuid::Uuid;
20
21pub async fn prepare_repo_write(
22 state: &AppState,
23 headers: &HeaderMap,
24 repo_did: &str,
25) -> Result<(String, Uuid, Cid), Response> {
26 let token = crate::auth::extract_bearer_token_from_header(
27 headers.get("Authorization").and_then(|h| h.to_str().ok())
28 ).ok_or_else(|| {
29 (
30 StatusCode::UNAUTHORIZED,
31 Json(json!({"error": "AuthenticationRequired"})),
32 )
33 .into_response()
34 })?;
35
36 let auth_user = crate::auth::validate_bearer_token(&state.db, &token)
37 .await
38 .map_err(|_| {
39 (
40 StatusCode::UNAUTHORIZED,
41 Json(json!({"error": "AuthenticationFailed"})),
42 )
43 .into_response()
44 })?;
45
46 if repo_did != auth_user.did {
47 return Err((
48 StatusCode::FORBIDDEN,
49 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})),
50 )
51 .into_response());
52 }
53
54 let user_id = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", auth_user.did)
55 .fetch_optional(&state.db)
56 .await
57 .map_err(|e| {
58 error!("DB error fetching user: {}", e);
59 (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response()
60 })?
61 .ok_or_else(|| {
62 (
63 StatusCode::INTERNAL_SERVER_ERROR,
64 Json(json!({"error": "InternalError", "message": "User not found"})),
65 )
66 .into_response()
67 })?;
68
69 let root_cid_str: String =
70 sqlx::query_scalar!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id)
71 .fetch_optional(&state.db)
72 .await
73 .map_err(|e| {
74 error!("DB error fetching repo root: {}", e);
75 (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response()
76 })?
77 .ok_or_else(|| {
78 (
79 StatusCode::INTERNAL_SERVER_ERROR,
80 Json(json!({"error": "InternalError", "message": "Repo root not found"})),
81 )
82 .into_response()
83 })?;
84
85 let current_root_cid = Cid::from_str(&root_cid_str).map_err(|_| {
86 (
87 StatusCode::INTERNAL_SERVER_ERROR,
88 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})),
89 )
90 .into_response()
91 })?;
92
93 Ok((auth_user.did, user_id, current_root_cid))
94}
95
96#[derive(Deserialize)]
97#[allow(dead_code)]
98pub struct CreateRecordInput {
99 pub repo: String,
100 pub collection: String,
101 pub rkey: Option<String>,
102 pub validate: Option<bool>,
103 pub record: serde_json::Value,
104 #[serde(rename = "swapCommit")]
105 pub swap_commit: Option<String>,
106}
107
/// JSON response body returned by `create_record` on success.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CreateRecordOutput {
    /// `at://{did}/{collection}/{rkey}` URI of the created record.
    pub uri: String,
    /// String form of the CID of the stored record block.
    pub cid: String,
}
114
115pub async fn create_record(
116 State(state): State<AppState>,
117 headers: HeaderMap,
118 Json(input): Json<CreateRecordInput>,
119) -> Response {
120 let (did, user_id, current_root_cid) =
121 match prepare_repo_write(&state, &headers, &input.repo).await {
122 Ok(res) => res,
123 Err(err_res) => return err_res,
124 };
125
126 if let Some(swap_commit) = &input.swap_commit {
127 if Cid::from_str(swap_commit).ok() != Some(current_root_cid) {
128 return (
129 StatusCode::CONFLICT,
130 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})),
131 )
132 .into_response();
133 }
134 }
135
136 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
137
138 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
139 Ok(Some(b)) => b,
140 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Commit block not found"}))).into_response(),
141 };
142 let commit = match Commit::from_cbor(&commit_bytes) {
143 Ok(c) => c,
144 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to parse commit"}))).into_response(),
145 };
146
147 let mst = Mst::load(
148 Arc::new(tracking_store.clone()),
149 commit.data,
150 None,
151 );
152
153 let collection_nsid = match input.collection.parse::<Nsid>() {
154 Ok(n) => n,
155 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection"}))).into_response(),
156 };
157
158 if input.validate.unwrap_or(true) {
159 if input.collection == "app.bsky.feed.post" {
160 if input.record.get("text").is_none() || input.record.get("createdAt").is_none() {
161 return (
162 StatusCode::BAD_REQUEST,
163 Json(json!({"error": "InvalidRecord", "message": "Record validation failed"})),
164 )
165 .into_response();
166 }
167 }
168 }
169
170 let rkey = input.rkey.unwrap_or_else(|| Utc::now().format("%Y%m%d%H%M%S%f").to_string());
171
172 let mut record_bytes = Vec::new();
173 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() {
174 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response();
175 }
176 let record_cid = match tracking_store.put(&record_bytes).await {
177 Ok(c) => c,
178 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to save record block"}))).into_response(),
179 };
180
181 let key = format!("{}/{}", collection_nsid, rkey);
182 let new_mst = match mst.add(&key, record_cid).await {
183 Ok(m) => m,
184 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to add to MST"}))).into_response(),
185 };
186 let new_mst_root = match new_mst.persist().await {
187 Ok(c) => c,
188 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to persist MST"}))).into_response(),
189 };
190
191 let op = RecordOp::Create { collection: input.collection.clone(), rkey: rkey.clone(), cid: record_cid };
192 let written_cids = tracking_store.get_written_cids();
193 let written_cids_str = written_cids.iter().map(|c| c.to_string()).collect::<Vec<_>>();
194
195 if let Err(e) = commit_and_log(&state, &did, user_id, Some(current_root_cid), new_mst_root, vec![op], &written_cids_str).await {
196 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": e}))).into_response();
197 };
198
199 (StatusCode::OK, Json(CreateRecordOutput {
200 uri: format!("at://{}/{}/{}", did, input.collection, rkey),
201 cid: record_cid.to_string(),
202 })).into_response()
203}
204
205#[derive(Deserialize)]
206#[allow(dead_code)]
207pub struct PutRecordInput {
208 pub repo: String,
209 pub collection: String,
210 pub rkey: String,
211 pub validate: Option<bool>,
212 pub record: serde_json::Value,
213 #[serde(rename = "swapCommit")]
214 pub swap_commit: Option<String>,
215 #[serde(rename = "swapRecord")]
216 pub swap_record: Option<String>,
217}
218
/// JSON response body returned by `put_record` on success.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct PutRecordOutput {
    /// `at://{did}/{collection}/{rkey}` URI of the written record.
    pub uri: String,
    /// String form of the CID of the stored record block.
    pub cid: String,
}
225
226pub async fn put_record(
227 State(state): State<AppState>,
228 headers: HeaderMap,
229 Json(input): Json<PutRecordInput>,
230) -> Response {
231 let (did, user_id, current_root_cid) =
232 match prepare_repo_write(&state, &headers, &input.repo).await {
233 Ok(res) => res,
234 Err(err_res) => return err_res,
235 };
236
237 if let Some(swap_commit) = &input.swap_commit {
238 if Cid::from_str(swap_commit).ok() != Some(current_root_cid) {
239 return (StatusCode::CONFLICT, Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"}))).into_response();
240 }
241 }
242
243 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
244
245 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
246 Ok(Some(b)) => b,
247 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Commit block not found"}))).into_response(),
248 };
249 let commit = match Commit::from_cbor(&commit_bytes) {
250 Ok(c) => c,
251 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to parse commit"}))).into_response(),
252 };
253
254 let mst = Mst::load(
255 Arc::new(tracking_store.clone()),
256 commit.data,
257 None,
258 );
259 let collection_nsid = match input.collection.parse::<Nsid>() {
260 Ok(n) => n,
261 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection"}))).into_response(),
262 };
263 let key = format!("{}/{}", collection_nsid, input.rkey);
264
265 if input.validate.unwrap_or(true) {
266 if input.collection == "app.bsky.feed.post" {
267 if input.record.get("text").is_none() || input.record.get("createdAt").is_none() {
268 return (
269 StatusCode::BAD_REQUEST,
270 Json(json!({"error": "InvalidRecord", "message": "Record validation failed"})),
271 )
272 .into_response();
273 }
274 }
275 }
276
277 if let Some(swap_record_str) = &input.swap_record {
278 let expected_cid = Cid::from_str(swap_record_str).ok();
279 let actual_cid = mst.get(&key).await.ok().flatten();
280 if expected_cid != actual_cid {
281 return (StatusCode::CONFLICT, Json(json!({"error": "InvalidSwap", "message": "Record has been modified or does not exist"}))).into_response();
282 }
283 }
284
285 let existing_cid = mst.get(&key).await.ok().flatten();
286
287 let mut record_bytes = Vec::new();
288 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() {
289 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response();
290 }
291 let record_cid = match tracking_store.put(&record_bytes).await {
292 Ok(c) => c,
293 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to save record block"}))).into_response(),
294 };
295
296 let new_mst = if existing_cid.is_some() {
297 mst.update(&key, record_cid).await.unwrap()
298 } else {
299 mst.add(&key, record_cid).await.unwrap()
300 };
301 let new_mst_root = new_mst.persist().await.unwrap();
302
303 let op = if existing_cid.is_some() {
304 RecordOp::Update { collection: input.collection.clone(), rkey: input.rkey.clone(), cid: record_cid }
305 } else {
306 RecordOp::Create { collection: input.collection.clone(), rkey: input.rkey.clone(), cid: record_cid }
307 };
308
309 let written_cids = tracking_store.get_written_cids();
310 let written_cids_str = written_cids.iter().map(|c| c.to_string()).collect::<Vec<_>>();
311
312 if let Err(e) = commit_and_log(&state, &did, user_id, Some(current_root_cid), new_mst_root, vec![op], &written_cids_str).await {
313 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": e}))).into_response();
314 };
315
316 (StatusCode::OK, Json(PutRecordOutput {
317 uri: format!("at://{}/{}/{}", did, input.collection, input.rkey),
318 cid: record_cid.to_string(),
319 })).into_response()
320}