this repo has no description
1use super::validation::validate_record;
2use crate::api::repo::record::utils::{commit_and_log, RecordOp};
3use crate::repo::tracking::TrackingBlockStore;
4use crate::state::AppState;
5use axum::{
6 extract::State,
7 http::{HeaderMap, StatusCode},
8 response::{IntoResponse, Response},
9 Json,
10};
11use chrono::Utc;
12use cid::Cid;
13use jacquard::types::string::Nsid;
14use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
15use serde::{Deserialize, Serialize};
16use serde_json::json;
17use std::str::FromStr;
18use std::sync::Arc;
19use tracing::error;
20use uuid::Uuid;
21
22pub async fn prepare_repo_write(
23 state: &AppState,
24 headers: &HeaderMap,
25 repo_did: &str,
26) -> Result<(String, Uuid, Cid), Response> {
27 let token = crate::auth::extract_bearer_token_from_header(
28 headers.get("Authorization").and_then(|h| h.to_str().ok())
29 ).ok_or_else(|| {
30 (
31 StatusCode::UNAUTHORIZED,
32 Json(json!({"error": "AuthenticationRequired"})),
33 )
34 .into_response()
35 })?;
36
37 let auth_user = crate::auth::validate_bearer_token(&state.db, &token)
38 .await
39 .map_err(|_| {
40 (
41 StatusCode::UNAUTHORIZED,
42 Json(json!({"error": "AuthenticationFailed"})),
43 )
44 .into_response()
45 })?;
46
47 if repo_did != auth_user.did {
48 return Err((
49 StatusCode::FORBIDDEN,
50 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})),
51 )
52 .into_response());
53 }
54
55 let user_id = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", auth_user.did)
56 .fetch_optional(&state.db)
57 .await
58 .map_err(|e| {
59 error!("DB error fetching user: {}", e);
60 (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response()
61 })?
62 .ok_or_else(|| {
63 (
64 StatusCode::INTERNAL_SERVER_ERROR,
65 Json(json!({"error": "InternalError", "message": "User not found"})),
66 )
67 .into_response()
68 })?;
69
70 let root_cid_str: String =
71 sqlx::query_scalar!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id)
72 .fetch_optional(&state.db)
73 .await
74 .map_err(|e| {
75 error!("DB error fetching repo root: {}", e);
76 (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response()
77 })?
78 .ok_or_else(|| {
79 (
80 StatusCode::INTERNAL_SERVER_ERROR,
81 Json(json!({"error": "InternalError", "message": "Repo root not found"})),
82 )
83 .into_response()
84 })?;
85
86 let current_root_cid = Cid::from_str(&root_cid_str).map_err(|_| {
87 (
88 StatusCode::INTERNAL_SERVER_ERROR,
89 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})),
90 )
91 .into_response()
92 })?;
93
94 Ok((auth_user.did, user_id, current_root_cid))
95}
96
97#[derive(Deserialize)]
98#[allow(dead_code)]
99pub struct CreateRecordInput {
100 pub repo: String,
101 pub collection: String,
102 pub rkey: Option<String>,
103 pub validate: Option<bool>,
104 pub record: serde_json::Value,
105 #[serde(rename = "swapCommit")]
106 pub swap_commit: Option<String>,
107}
108
109#[derive(Serialize)]
110#[serde(rename_all = "camelCase")]
111pub struct CreateRecordOutput {
112 pub uri: String,
113 pub cid: String,
114}
115
116pub async fn create_record(
117 State(state): State<AppState>,
118 headers: HeaderMap,
119 Json(input): Json<CreateRecordInput>,
120) -> Response {
121 let (did, user_id, current_root_cid) =
122 match prepare_repo_write(&state, &headers, &input.repo).await {
123 Ok(res) => res,
124 Err(err_res) => return err_res,
125 };
126
127 if let Some(swap_commit) = &input.swap_commit {
128 if Cid::from_str(swap_commit).ok() != Some(current_root_cid) {
129 return (
130 StatusCode::CONFLICT,
131 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})),
132 )
133 .into_response();
134 }
135 }
136
137 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
138
139 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
140 Ok(Some(b)) => b,
141 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Commit block not found"}))).into_response(),
142 };
143 let commit = match Commit::from_cbor(&commit_bytes) {
144 Ok(c) => c,
145 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to parse commit"}))).into_response(),
146 };
147
148 let mst = Mst::load(
149 Arc::new(tracking_store.clone()),
150 commit.data,
151 None,
152 );
153
154 let collection_nsid = match input.collection.parse::<Nsid>() {
155 Ok(n) => n,
156 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection"}))).into_response(),
157 };
158
159 if input.validate.unwrap_or(true) {
160 if let Err(err_response) = validate_record(&input.record, &input.collection) {
161 return err_response;
162 }
163 }
164
165 let rkey = input.rkey.unwrap_or_else(|| Utc::now().format("%Y%m%d%H%M%S%f").to_string());
166
167 let mut record_bytes = Vec::new();
168 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() {
169 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response();
170 }
171 let record_cid = match tracking_store.put(&record_bytes).await {
172 Ok(c) => c,
173 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to save record block"}))).into_response(),
174 };
175
176 let key = format!("{}/{}", collection_nsid, rkey);
177 let new_mst = match mst.add(&key, record_cid).await {
178 Ok(m) => m,
179 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to add to MST"}))).into_response(),
180 };
181 let new_mst_root = match new_mst.persist().await {
182 Ok(c) => c,
183 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to persist MST"}))).into_response(),
184 };
185
186 let op = RecordOp::Create { collection: input.collection.clone(), rkey: rkey.clone(), cid: record_cid };
187 let written_cids = tracking_store.get_written_cids();
188 let written_cids_str = written_cids.iter().map(|c| c.to_string()).collect::<Vec<_>>();
189
190 if let Err(e) = commit_and_log(&state, &did, user_id, Some(current_root_cid), new_mst_root, vec![op], &written_cids_str).await {
191 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": e}))).into_response();
192 };
193
194 (StatusCode::OK, Json(CreateRecordOutput {
195 uri: format!("at://{}/{}/{}", did, input.collection, rkey),
196 cid: record_cid.to_string(),
197 })).into_response()
198}
199
200#[derive(Deserialize)]
201#[allow(dead_code)]
202pub struct PutRecordInput {
203 pub repo: String,
204 pub collection: String,
205 pub rkey: String,
206 pub validate: Option<bool>,
207 pub record: serde_json::Value,
208 #[serde(rename = "swapCommit")]
209 pub swap_commit: Option<String>,
210 #[serde(rename = "swapRecord")]
211 pub swap_record: Option<String>,
212}
213
214#[derive(Serialize)]
215#[serde(rename_all = "camelCase")]
216pub struct PutRecordOutput {
217 pub uri: String,
218 pub cid: String,
219}
220
221pub async fn put_record(
222 State(state): State<AppState>,
223 headers: HeaderMap,
224 Json(input): Json<PutRecordInput>,
225) -> Response {
226 let (did, user_id, current_root_cid) =
227 match prepare_repo_write(&state, &headers, &input.repo).await {
228 Ok(res) => res,
229 Err(err_res) => return err_res,
230 };
231
232 if let Some(swap_commit) = &input.swap_commit {
233 if Cid::from_str(swap_commit).ok() != Some(current_root_cid) {
234 return (StatusCode::CONFLICT, Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"}))).into_response();
235 }
236 }
237
238 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
239
240 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
241 Ok(Some(b)) => b,
242 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Commit block not found"}))).into_response(),
243 };
244 let commit = match Commit::from_cbor(&commit_bytes) {
245 Ok(c) => c,
246 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to parse commit"}))).into_response(),
247 };
248
249 let mst = Mst::load(
250 Arc::new(tracking_store.clone()),
251 commit.data,
252 None,
253 );
254 let collection_nsid = match input.collection.parse::<Nsid>() {
255 Ok(n) => n,
256 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection"}))).into_response(),
257 };
258 let key = format!("{}/{}", collection_nsid, input.rkey);
259
260 if input.validate.unwrap_or(true) {
261 if let Err(err_response) = validate_record(&input.record, &input.collection) {
262 return err_response;
263 }
264 }
265
266 if let Some(swap_record_str) = &input.swap_record {
267 let expected_cid = Cid::from_str(swap_record_str).ok();
268 let actual_cid = mst.get(&key).await.ok().flatten();
269 if expected_cid != actual_cid {
270 return (StatusCode::CONFLICT, Json(json!({"error": "InvalidSwap", "message": "Record has been modified or does not exist"}))).into_response();
271 }
272 }
273
274 let existing_cid = mst.get(&key).await.ok().flatten();
275
276 let mut record_bytes = Vec::new();
277 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() {
278 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response();
279 }
280 let record_cid = match tracking_store.put(&record_bytes).await {
281 Ok(c) => c,
282 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to save record block"}))).into_response(),
283 };
284
285 let new_mst = if existing_cid.is_some() {
286 match mst.update(&key, record_cid).await {
287 Ok(m) => m,
288 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to update MST"}))).into_response(),
289 }
290 } else {
291 match mst.add(&key, record_cid).await {
292 Ok(m) => m,
293 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to add to MST"}))).into_response(),
294 }
295 };
296 let new_mst_root = match new_mst.persist().await {
297 Ok(c) => c,
298 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to persist MST"}))).into_response(),
299 };
300
301 let op = if existing_cid.is_some() {
302 RecordOp::Update { collection: input.collection.clone(), rkey: input.rkey.clone(), cid: record_cid }
303 } else {
304 RecordOp::Create { collection: input.collection.clone(), rkey: input.rkey.clone(), cid: record_cid }
305 };
306
307 let written_cids = tracking_store.get_written_cids();
308 let written_cids_str = written_cids.iter().map(|c| c.to_string()).collect::<Vec<_>>();
309
310 if let Err(e) = commit_and_log(&state, &did, user_id, Some(current_root_cid), new_mst_root, vec![op], &written_cids_str).await {
311 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": e}))).into_response();
312 };
313
314 (StatusCode::OK, Json(PutRecordOutput {
315 uri: format!("at://{}/{}/{}", did, input.collection, input.rkey),
316 cid: record_cid.to_string(),
317 })).into_response()
318}