this repo has no description
1use crate::api::repo::record::utils::{commit_and_log, RecordOp};
2use crate::repo::tracking::TrackingBlockStore;
3use crate::state::AppState;
4use axum::{
5 extract::State,
6 http::{HeaderMap, StatusCode},
7 response::{IntoResponse, Response},
8 Json,
9};
10use chrono::Utc;
11use cid::Cid;
12use jacquard::types::string::Nsid;
13use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
14use serde::{Deserialize, Serialize};
15use serde_json::json;
16use std::str::FromStr;
17use std::sync::Arc;
18use tracing::error;
19use uuid::Uuid;
20
21pub async fn prepare_repo_write(
22 state: &AppState,
23 headers: &HeaderMap,
24 repo_did: &str,
25) -> Result<(String, Uuid, Cid), Response> {
26 let auth_header = headers.get("Authorization").ok_or_else(|| {
27 (
28 StatusCode::UNAUTHORIZED,
29 Json(json!({"error": "AuthenticationRequired"})),
30 )
31 .into_response()
32 })?;
33 let token = auth_header
34 .to_str()
35 .unwrap_or("")
36 .replace("Bearer ", "");
37
38 let session = sqlx::query!(
39 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1",
40 token
41 )
42 .fetch_optional(&state.db)
43 .await
44 .map_err(|e| {
45 error!("DB error fetching session: {}", e);
46 (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response()
47 })?
48 .ok_or_else(|| {
49 (
50 StatusCode::UNAUTHORIZED,
51 Json(json!({"error": "AuthenticationFailed"})),
52 )
53 .into_response()
54 })?;
55
56 crate::auth::verify_token(&token, &session.key_bytes).map_err(|_| {
57 (
58 StatusCode::UNAUTHORIZED,
59 Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})),
60 )
61 .into_response()
62 })?;
63
64 if repo_did != session.did {
65 return Err((
66 StatusCode::FORBIDDEN,
67 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})),
68 )
69 .into_response());
70 }
71
72 let user_id = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", session.did)
73 .fetch_optional(&state.db)
74 .await
75 .map_err(|e| {
76 error!("DB error fetching user: {}", e);
77 (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response()
78 })?
79 .ok_or_else(|| {
80 (
81 StatusCode::INTERNAL_SERVER_ERROR,
82 Json(json!({"error": "InternalError", "message": "User not found"})),
83 )
84 .into_response()
85 })?;
86
87 let root_cid_str: String =
88 sqlx::query_scalar!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id)
89 .fetch_optional(&state.db)
90 .await
91 .map_err(|e| {
92 error!("DB error fetching repo root: {}", e);
93 (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response()
94 })?
95 .ok_or_else(|| {
96 (
97 StatusCode::INTERNAL_SERVER_ERROR,
98 Json(json!({"error": "InternalError", "message": "Repo root not found"})),
99 )
100 .into_response()
101 })?;
102
103 let current_root_cid = Cid::from_str(&root_cid_str).map_err(|_| {
104 (
105 StatusCode::INTERNAL_SERVER_ERROR,
106 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})),
107 )
108 .into_response()
109 })?;
110
111 Ok((session.did, user_id, current_root_cid))
112}
113
114#[derive(Deserialize)]
115#[allow(dead_code)]
116pub struct CreateRecordInput {
117 pub repo: String,
118 pub collection: String,
119 pub rkey: Option<String>,
120 pub validate: Option<bool>,
121 pub record: serde_json::Value,
122 #[serde(rename = "swapCommit")]
123 pub swap_commit: Option<String>,
124}
125
126#[derive(Serialize)]
127#[serde(rename_all = "camelCase")]
128pub struct CreateRecordOutput {
129 pub uri: String,
130 pub cid: String,
131}
132
133pub async fn create_record(
134 State(state): State<AppState>,
135 headers: HeaderMap,
136 Json(input): Json<CreateRecordInput>,
137) -> Response {
138 let (did, user_id, current_root_cid) =
139 match prepare_repo_write(&state, &headers, &input.repo).await {
140 Ok(res) => res,
141 Err(err_res) => return err_res,
142 };
143
144 if let Some(swap_commit) = &input.swap_commit {
145 if Cid::from_str(swap_commit).ok() != Some(current_root_cid) {
146 return (
147 StatusCode::CONFLICT,
148 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})),
149 )
150 .into_response();
151 }
152 }
153
154 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
155
156 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
157 Ok(Some(b)) => b,
158 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Commit block not found"}))).into_response(),
159 };
160 let commit = match Commit::from_cbor(&commit_bytes) {
161 Ok(c) => c,
162 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to parse commit"}))).into_response(),
163 };
164
165 let mst = Mst::load(
166 Arc::new(tracking_store.clone()),
167 commit.data,
168 None,
169 );
170
171 let collection_nsid = match input.collection.parse::<Nsid>() {
172 Ok(n) => n,
173 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection"}))).into_response(),
174 };
175
176 if input.validate.unwrap_or(true) {
177 if input.collection == "app.bsky.feed.post" {
178 if input.record.get("text").is_none() || input.record.get("createdAt").is_none() {
179 return (
180 StatusCode::BAD_REQUEST,
181 Json(json!({"error": "InvalidRecord", "message": "Record validation failed"})),
182 )
183 .into_response();
184 }
185 }
186 }
187
188 let rkey = input.rkey.unwrap_or_else(|| Utc::now().format("%Y%m%d%H%M%S%f").to_string());
189
190 let mut record_bytes = Vec::new();
191 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() {
192 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response();
193 }
194 let record_cid = match tracking_store.put(&record_bytes).await {
195 Ok(c) => c,
196 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to save record block"}))).into_response(),
197 };
198
199 let key = format!("{}/{}", collection_nsid, rkey);
200 let new_mst = match mst.add(&key, record_cid).await {
201 Ok(m) => m,
202 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to add to MST"}))).into_response(),
203 };
204 let new_mst_root = match new_mst.persist().await {
205 Ok(c) => c,
206 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to persist MST"}))).into_response(),
207 };
208
209 let op = RecordOp::Create { collection: input.collection.clone(), rkey: rkey.clone(), cid: record_cid };
210 let written_cids = tracking_store.get_written_cids();
211 let written_cids_str = written_cids.iter().map(|c| c.to_string()).collect::<Vec<_>>();
212
213 if let Err(e) = commit_and_log(&state, &did, user_id, Some(current_root_cid), new_mst_root, vec![op], &written_cids_str).await {
214 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": e}))).into_response();
215 };
216
217 (StatusCode::OK, Json(CreateRecordOutput {
218 uri: format!("at://{}/{}/{}", did, input.collection, rkey),
219 cid: record_cid.to_string(),
220 })).into_response()
221}
222
223#[derive(Deserialize)]
224#[allow(dead_code)]
225pub struct PutRecordInput {
226 pub repo: String,
227 pub collection: String,
228 pub rkey: String,
229 pub validate: Option<bool>,
230 pub record: serde_json::Value,
231 #[serde(rename = "swapCommit")]
232 pub swap_commit: Option<String>,
233 #[serde(rename = "swapRecord")]
234 pub swap_record: Option<String>,
235}
236
237#[derive(Serialize)]
238#[serde(rename_all = "camelCase")]
239pub struct PutRecordOutput {
240 pub uri: String,
241 pub cid: String,
242}
243
244pub async fn put_record(
245 State(state): State<AppState>,
246 headers: HeaderMap,
247 Json(input): Json<PutRecordInput>,
248) -> Response {
249 let (did, user_id, current_root_cid) =
250 match prepare_repo_write(&state, &headers, &input.repo).await {
251 Ok(res) => res,
252 Err(err_res) => return err_res,
253 };
254
255 if let Some(swap_commit) = &input.swap_commit {
256 if Cid::from_str(swap_commit).ok() != Some(current_root_cid) {
257 return (StatusCode::CONFLICT, Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"}))).into_response();
258 }
259 }
260
261 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
262
263 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
264 Ok(Some(b)) => b,
265 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Commit block not found"}))).into_response(),
266 };
267 let commit = match Commit::from_cbor(&commit_bytes) {
268 Ok(c) => c,
269 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to parse commit"}))).into_response(),
270 };
271
272 let mst = Mst::load(
273 Arc::new(tracking_store.clone()),
274 commit.data,
275 None,
276 );
277 let collection_nsid = match input.collection.parse::<Nsid>() {
278 Ok(n) => n,
279 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection"}))).into_response(),
280 };
281 let key = format!("{}/{}", collection_nsid, input.rkey);
282
283 if input.validate.unwrap_or(true) {
284 if input.collection == "app.bsky.feed.post" {
285 if input.record.get("text").is_none() || input.record.get("createdAt").is_none() {
286 return (
287 StatusCode::BAD_REQUEST,
288 Json(json!({"error": "InvalidRecord", "message": "Record validation failed"})),
289 )
290 .into_response();
291 }
292 }
293 }
294
295 if let Some(swap_record_str) = &input.swap_record {
296 let expected_cid = Cid::from_str(swap_record_str).ok();
297 let actual_cid = mst.get(&key).await.ok().flatten();
298 if expected_cid != actual_cid {
299 return (StatusCode::CONFLICT, Json(json!({"error": "InvalidSwap", "message": "Record has been modified or does not exist"}))).into_response();
300 }
301 }
302
303 let existing_cid = mst.get(&key).await.ok().flatten();
304
305 let mut record_bytes = Vec::new();
306 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() {
307 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response();
308 }
309 let record_cid = match tracking_store.put(&record_bytes).await {
310 Ok(c) => c,
311 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to save record block"}))).into_response(),
312 };
313
314 let new_mst = if existing_cid.is_some() {
315 mst.update(&key, record_cid).await.unwrap()
316 } else {
317 mst.add(&key, record_cid).await.unwrap()
318 };
319 let new_mst_root = new_mst.persist().await.unwrap();
320
321 let op = if existing_cid.is_some() {
322 RecordOp::Update { collection: input.collection.clone(), rkey: input.rkey.clone(), cid: record_cid }
323 } else {
324 RecordOp::Create { collection: input.collection.clone(), rkey: input.rkey.clone(), cid: record_cid }
325 };
326
327 let written_cids = tracking_store.get_written_cids();
328 let written_cids_str = written_cids.iter().map(|c| c.to_string()).collect::<Vec<_>>();
329
330 if let Err(e) = commit_and_log(&state, &did, user_id, Some(current_root_cid), new_mst_root, vec![op], &written_cids_str).await {
331 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": e}))).into_response();
332 };
333
334 (StatusCode::OK, Json(PutRecordOutput {
335 uri: format!("at://{}/{}/{}", did, input.collection, input.rkey),
336 cid: record_cid.to_string(),
337 })).into_response()
338}