this repo has no description
1use super::validation::validate_record;
2use crate::api::repo::record::utils::{commit_and_log, RecordOp};
3use crate::repo::tracking::TrackingBlockStore;
4use crate::state::AppState;
5use axum::{
6 extract::State,
7 http::{HeaderMap, StatusCode},
8 response::{IntoResponse, Response},
9 Json,
10};
11use cid::Cid;
12use jacquard::types::{integer::LimitedU32, string::{Nsid, Tid}};
13use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
14use serde::{Deserialize, Serialize};
15use serde_json::json;
16use sqlx::{PgPool, Row};
17use std::str::FromStr;
18use std::sync::Arc;
19use tracing::error;
20use uuid::Uuid;
21pub async fn has_verified_notification_channel(db: &PgPool, did: &str) -> Result<bool, sqlx::Error> {
22 let row = sqlx::query(
23 r#"
24 SELECT
25 email_confirmed,
26 discord_verified,
27 telegram_verified,
28 signal_verified
29 FROM users
30 WHERE did = $1
31 "#
32 )
33 .bind(did)
34 .fetch_optional(db)
35 .await?;
36 match row {
37 Some(r) => {
38 let email_confirmed: bool = r.get("email_confirmed");
39 let discord_verified: bool = r.get("discord_verified");
40 let telegram_verified: bool = r.get("telegram_verified");
41 let signal_verified: bool = r.get("signal_verified");
42 Ok(email_confirmed || discord_verified || telegram_verified || signal_verified)
43 }
44 None => Ok(false),
45 }
46}
47pub async fn prepare_repo_write(
48 state: &AppState,
49 headers: &HeaderMap,
50 repo_did: &str,
51) -> Result<(String, Uuid, Cid), Response> {
52 let token = crate::auth::extract_bearer_token_from_header(
53 headers.get("Authorization").and_then(|h| h.to_str().ok())
54 ).ok_or_else(|| {
55 (
56 StatusCode::UNAUTHORIZED,
57 Json(json!({"error": "AuthenticationRequired"})),
58 )
59 .into_response()
60 })?;
61 let auth_user = crate::auth::validate_bearer_token(&state.db, &token)
62 .await
63 .map_err(|_| {
64 (
65 StatusCode::UNAUTHORIZED,
66 Json(json!({"error": "AuthenticationFailed"})),
67 )
68 .into_response()
69 })?;
70 if repo_did != auth_user.did {
71 return Err((
72 StatusCode::FORBIDDEN,
73 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})),
74 )
75 .into_response());
76 }
77 match has_verified_notification_channel(&state.db, &auth_user.did).await {
78 Ok(true) => {}
79 Ok(false) => {
80 return Err((
81 StatusCode::FORBIDDEN,
82 Json(json!({
83 "error": "AccountNotVerified",
84 "message": "You must verify at least one notification channel (email, Discord, Telegram, or Signal) before creating records"
85 })),
86 )
87 .into_response());
88 }
89 Err(e) => {
90 error!("DB error checking notification channels: {}", e);
91 return Err((
92 StatusCode::INTERNAL_SERVER_ERROR,
93 Json(json!({"error": "InternalError"})),
94 )
95 .into_response());
96 }
97 }
98 let user_id = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", auth_user.did)
99 .fetch_optional(&state.db)
100 .await
101 .map_err(|e| {
102 error!("DB error fetching user: {}", e);
103 (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response()
104 })?
105 .ok_or_else(|| {
106 (
107 StatusCode::INTERNAL_SERVER_ERROR,
108 Json(json!({"error": "InternalError", "message": "User not found"})),
109 )
110 .into_response()
111 })?;
112 let root_cid_str: String =
113 sqlx::query_scalar!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id)
114 .fetch_optional(&state.db)
115 .await
116 .map_err(|e| {
117 error!("DB error fetching repo root: {}", e);
118 (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response()
119 })?
120 .ok_or_else(|| {
121 (
122 StatusCode::INTERNAL_SERVER_ERROR,
123 Json(json!({"error": "InternalError", "message": "Repo root not found"})),
124 )
125 .into_response()
126 })?;
127 let current_root_cid = Cid::from_str(&root_cid_str).map_err(|_| {
128 (
129 StatusCode::INTERNAL_SERVER_ERROR,
130 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})),
131 )
132 .into_response()
133 })?;
134 Ok((auth_user.did, user_id, current_root_cid))
135}
136#[derive(Deserialize)]
137#[allow(dead_code)]
138pub struct CreateRecordInput {
139 pub repo: String,
140 pub collection: String,
141 pub rkey: Option<String>,
142 pub validate: Option<bool>,
143 pub record: serde_json::Value,
144 #[serde(rename = "swapCommit")]
145 pub swap_commit: Option<String>,
146}
147#[derive(Serialize)]
148#[serde(rename_all = "camelCase")]
149pub struct CreateRecordOutput {
150 pub uri: String,
151 pub cid: String,
152}
153pub async fn create_record(
154 State(state): State<AppState>,
155 headers: HeaderMap,
156 Json(input): Json<CreateRecordInput>,
157) -> Response {
158 let (did, user_id, current_root_cid) =
159 match prepare_repo_write(&state, &headers, &input.repo).await {
160 Ok(res) => res,
161 Err(err_res) => return err_res,
162 };
163 if let Some(swap_commit) = &input.swap_commit {
164 if Cid::from_str(swap_commit).ok() != Some(current_root_cid) {
165 return (
166 StatusCode::CONFLICT,
167 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})),
168 )
169 .into_response();
170 }
171 }
172 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
173 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
174 Ok(Some(b)) => b,
175 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Commit block not found"}))).into_response(),
176 };
177 let commit = match Commit::from_cbor(&commit_bytes) {
178 Ok(c) => c,
179 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to parse commit"}))).into_response(),
180 };
181 let mst = Mst::load(
182 Arc::new(tracking_store.clone()),
183 commit.data,
184 None,
185 );
186 let collection_nsid = match input.collection.parse::<Nsid>() {
187 Ok(n) => n,
188 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection"}))).into_response(),
189 };
190 if input.validate.unwrap_or(true) {
191 if let Err(err_response) = validate_record(&input.record, &input.collection) {
192 return err_response;
193 }
194 }
195 let rkey = input.rkey.unwrap_or_else(|| Tid::now(LimitedU32::MIN).to_string());
196 let mut record_bytes = Vec::new();
197 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() {
198 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response();
199 }
200 let record_cid = match tracking_store.put(&record_bytes).await {
201 Ok(c) => c,
202 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to save record block"}))).into_response(),
203 };
204 let key = format!("{}/{}", collection_nsid, rkey);
205 let new_mst = match mst.add(&key, record_cid).await {
206 Ok(m) => m,
207 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to add to MST"}))).into_response(),
208 };
209 let new_mst_root = match new_mst.persist().await {
210 Ok(c) => c,
211 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to persist MST"}))).into_response(),
212 };
213 let op = RecordOp::Create { collection: input.collection.clone(), rkey: rkey.clone(), cid: record_cid };
214 let mut relevant_blocks = std::collections::BTreeMap::new();
215 if let Err(_) = new_mst.blocks_for_path(&key, &mut relevant_blocks).await {
216 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response();
217 }
218 if let Err(_) = mst.blocks_for_path(&key, &mut relevant_blocks).await {
219 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response();
220 }
221 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
222 let mut written_cids = tracking_store.get_all_relevant_cids();
223 for cid in relevant_blocks.keys() {
224 if !written_cids.contains(cid) {
225 written_cids.push(*cid);
226 }
227 }
228 let written_cids_str = written_cids.iter().map(|c| c.to_string()).collect::<Vec<_>>();
229 if let Err(e) = commit_and_log(&state, &did, user_id, Some(current_root_cid), Some(commit.data), new_mst_root, vec![op], &written_cids_str).await {
230 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": e}))).into_response();
231 };
232 (StatusCode::OK, Json(CreateRecordOutput {
233 uri: format!("at://{}/{}/{}", did, input.collection, rkey),
234 cid: record_cid.to_string(),
235 })).into_response()
236}
237#[derive(Deserialize)]
238#[allow(dead_code)]
239pub struct PutRecordInput {
240 pub repo: String,
241 pub collection: String,
242 pub rkey: String,
243 pub validate: Option<bool>,
244 pub record: serde_json::Value,
245 #[serde(rename = "swapCommit")]
246 pub swap_commit: Option<String>,
247 #[serde(rename = "swapRecord")]
248 pub swap_record: Option<String>,
249}
250#[derive(Serialize)]
251#[serde(rename_all = "camelCase")]
252pub struct PutRecordOutput {
253 pub uri: String,
254 pub cid: String,
255}
256pub async fn put_record(
257 State(state): State<AppState>,
258 headers: HeaderMap,
259 Json(input): Json<PutRecordInput>,
260) -> Response {
261 let (did, user_id, current_root_cid) =
262 match prepare_repo_write(&state, &headers, &input.repo).await {
263 Ok(res) => res,
264 Err(err_res) => return err_res,
265 };
266 if let Some(swap_commit) = &input.swap_commit {
267 if Cid::from_str(swap_commit).ok() != Some(current_root_cid) {
268 return (StatusCode::CONFLICT, Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"}))).into_response();
269 }
270 }
271 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
272 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
273 Ok(Some(b)) => b,
274 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Commit block not found"}))).into_response(),
275 };
276 let commit = match Commit::from_cbor(&commit_bytes) {
277 Ok(c) => c,
278 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to parse commit"}))).into_response(),
279 };
280 let mst = Mst::load(
281 Arc::new(tracking_store.clone()),
282 commit.data,
283 None,
284 );
285 let collection_nsid = match input.collection.parse::<Nsid>() {
286 Ok(n) => n,
287 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection"}))).into_response(),
288 };
289 let key = format!("{}/{}", collection_nsid, input.rkey);
290 if input.validate.unwrap_or(true) {
291 if let Err(err_response) = validate_record(&input.record, &input.collection) {
292 return err_response;
293 }
294 }
295 if let Some(swap_record_str) = &input.swap_record {
296 let expected_cid = Cid::from_str(swap_record_str).ok();
297 let actual_cid = mst.get(&key).await.ok().flatten();
298 if expected_cid != actual_cid {
299 return (StatusCode::CONFLICT, Json(json!({"error": "InvalidSwap", "message": "Record has been modified or does not exist"}))).into_response();
300 }
301 }
302 let existing_cid = mst.get(&key).await.ok().flatten();
303 let mut record_bytes = Vec::new();
304 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() {
305 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response();
306 }
307 let record_cid = match tracking_store.put(&record_bytes).await {
308 Ok(c) => c,
309 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to save record block"}))).into_response(),
310 };
311 let new_mst = if existing_cid.is_some() {
312 match mst.update(&key, record_cid).await {
313 Ok(m) => m,
314 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to update MST"}))).into_response(),
315 }
316 } else {
317 match mst.add(&key, record_cid).await {
318 Ok(m) => m,
319 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to add to MST"}))).into_response(),
320 }
321 };
322 let new_mst_root = match new_mst.persist().await {
323 Ok(c) => c,
324 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to persist MST"}))).into_response(),
325 };
326 let op = if existing_cid.is_some() {
327 RecordOp::Update { collection: input.collection.clone(), rkey: input.rkey.clone(), cid: record_cid, prev: existing_cid }
328 } else {
329 RecordOp::Create { collection: input.collection.clone(), rkey: input.rkey.clone(), cid: record_cid }
330 };
331 let mut relevant_blocks = std::collections::BTreeMap::new();
332 if let Err(_) = new_mst.blocks_for_path(&key, &mut relevant_blocks).await {
333 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response();
334 }
335 if let Err(_) = mst.blocks_for_path(&key, &mut relevant_blocks).await {
336 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response();
337 }
338 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
339 let mut written_cids = tracking_store.get_all_relevant_cids();
340 for cid in relevant_blocks.keys() {
341 if !written_cids.contains(cid) {
342 written_cids.push(*cid);
343 }
344 }
345 let written_cids_str = written_cids.iter().map(|c| c.to_string()).collect::<Vec<_>>();
346 if let Err(e) = commit_and_log(&state, &did, user_id, Some(current_root_cid), Some(commit.data), new_mst_root, vec![op], &written_cids_str).await {
347 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": e}))).into_response();
348 };
349 (StatusCode::OK, Json(PutRecordOutput {
350 uri: format!("at://{}/{}/{}", did, input.collection, input.rkey),
351 cid: record_cid.to_string(),
352 })).into_response()
353}