this repo has no description
1use super::validation::validate_record;
2use crate::api::repo::record::utils::{commit_and_log, RecordOp};
3use crate::repo::tracking::TrackingBlockStore;
4use crate::state::AppState;
5use axum::{
6 extract::State,
7 http::{HeaderMap, StatusCode},
8 response::{IntoResponse, Response},
9 Json,
10};
11use cid::Cid;
12use jacquard::types::{integer::LimitedU32, string::{Nsid, Tid}};
13use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
14use serde::{Deserialize, Serialize};
15use serde_json::json;
16use sqlx::{PgPool, Row};
17use std::str::FromStr;
18use std::sync::Arc;
19use tracing::error;
20use uuid::Uuid;
21
22pub async fn has_verified_notification_channel(db: &PgPool, did: &str) -> Result<bool, sqlx::Error> {
23 let row = sqlx::query(
24 r#"
25 SELECT
26 email_confirmed,
27 discord_verified,
28 telegram_verified,
29 signal_verified
30 FROM users
31 WHERE did = $1
32 "#
33 )
34 .bind(did)
35 .fetch_optional(db)
36 .await?;
37 match row {
38 Some(r) => {
39 let email_confirmed: bool = r.get("email_confirmed");
40 let discord_verified: bool = r.get("discord_verified");
41 let telegram_verified: bool = r.get("telegram_verified");
42 let signal_verified: bool = r.get("signal_verified");
43 Ok(email_confirmed || discord_verified || telegram_verified || signal_verified)
44 }
45 None => Ok(false),
46 }
47}
48
49pub async fn prepare_repo_write(
50 state: &AppState,
51 headers: &HeaderMap,
52 repo_did: &str,
53) -> Result<(String, Uuid, Cid), Response> {
54 let token = crate::auth::extract_bearer_token_from_header(
55 headers.get("Authorization").and_then(|h| h.to_str().ok())
56 ).ok_or_else(|| {
57 (
58 StatusCode::UNAUTHORIZED,
59 Json(json!({"error": "AuthenticationRequired"})),
60 )
61 .into_response()
62 })?;
63 let auth_user = crate::auth::validate_bearer_token(&state.db, &token)
64 .await
65 .map_err(|_| {
66 (
67 StatusCode::UNAUTHORIZED,
68 Json(json!({"error": "AuthenticationFailed"})),
69 )
70 .into_response()
71 })?;
72 if repo_did != auth_user.did {
73 return Err((
74 StatusCode::FORBIDDEN,
75 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})),
76 )
77 .into_response());
78 }
79 match has_verified_notification_channel(&state.db, &auth_user.did).await {
80 Ok(true) => {}
81 Ok(false) => {
82 return Err((
83 StatusCode::FORBIDDEN,
84 Json(json!({
85 "error": "AccountNotVerified",
86 "message": "You must verify at least one notification channel (email, Discord, Telegram, or Signal) before creating records"
87 })),
88 )
89 .into_response());
90 }
91 Err(e) => {
92 error!("DB error checking notification channels: {}", e);
93 return Err((
94 StatusCode::INTERNAL_SERVER_ERROR,
95 Json(json!({"error": "InternalError"})),
96 )
97 .into_response());
98 }
99 }
100 let user_id = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", auth_user.did)
101 .fetch_optional(&state.db)
102 .await
103 .map_err(|e| {
104 error!("DB error fetching user: {}", e);
105 (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response()
106 })?
107 .ok_or_else(|| {
108 (
109 StatusCode::INTERNAL_SERVER_ERROR,
110 Json(json!({"error": "InternalError", "message": "User not found"})),
111 )
112 .into_response()
113 })?;
114 let root_cid_str: String =
115 sqlx::query_scalar!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id)
116 .fetch_optional(&state.db)
117 .await
118 .map_err(|e| {
119 error!("DB error fetching repo root: {}", e);
120 (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response()
121 })?
122 .ok_or_else(|| {
123 (
124 StatusCode::INTERNAL_SERVER_ERROR,
125 Json(json!({"error": "InternalError", "message": "Repo root not found"})),
126 )
127 .into_response()
128 })?;
129 let current_root_cid = Cid::from_str(&root_cid_str).map_err(|_| {
130 (
131 StatusCode::INTERNAL_SERVER_ERROR,
132 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})),
133 )
134 .into_response()
135 })?;
136 Ok((auth_user.did, user_id, current_root_cid))
137}
138#[derive(Deserialize)]
139#[allow(dead_code)]
140pub struct CreateRecordInput {
141 pub repo: String,
142 pub collection: String,
143 pub rkey: Option<String>,
144 pub validate: Option<bool>,
145 pub record: serde_json::Value,
146 #[serde(rename = "swapCommit")]
147 pub swap_commit: Option<String>,
148}
149#[derive(Serialize)]
150#[serde(rename_all = "camelCase")]
151pub struct CreateRecordOutput {
152 pub uri: String,
153 pub cid: String,
154}
155pub async fn create_record(
156 State(state): State<AppState>,
157 headers: HeaderMap,
158 Json(input): Json<CreateRecordInput>,
159) -> Response {
160 let (did, user_id, current_root_cid) =
161 match prepare_repo_write(&state, &headers, &input.repo).await {
162 Ok(res) => res,
163 Err(err_res) => return err_res,
164 };
165 if let Some(swap_commit) = &input.swap_commit {
166 if Cid::from_str(swap_commit).ok() != Some(current_root_cid) {
167 return (
168 StatusCode::CONFLICT,
169 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})),
170 )
171 .into_response();
172 }
173 }
174 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
175 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
176 Ok(Some(b)) => b,
177 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Commit block not found"}))).into_response(),
178 };
179 let commit = match Commit::from_cbor(&commit_bytes) {
180 Ok(c) => c,
181 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to parse commit"}))).into_response(),
182 };
183 let mst = Mst::load(
184 Arc::new(tracking_store.clone()),
185 commit.data,
186 None,
187 );
188 let collection_nsid = match input.collection.parse::<Nsid>() {
189 Ok(n) => n,
190 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection"}))).into_response(),
191 };
192 if input.validate.unwrap_or(true) {
193 if let Err(err_response) = validate_record(&input.record, &input.collection) {
194 return err_response;
195 }
196 }
197 let rkey = input.rkey.unwrap_or_else(|| Tid::now(LimitedU32::MIN).to_string());
198 let mut record_bytes = Vec::new();
199 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() {
200 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response();
201 }
202 let record_cid = match tracking_store.put(&record_bytes).await {
203 Ok(c) => c,
204 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to save record block"}))).into_response(),
205 };
206 let key = format!("{}/{}", collection_nsid, rkey);
207 let new_mst = match mst.add(&key, record_cid).await {
208 Ok(m) => m,
209 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to add to MST"}))).into_response(),
210 };
211 let new_mst_root = match new_mst.persist().await {
212 Ok(c) => c,
213 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to persist MST"}))).into_response(),
214 };
215 let op = RecordOp::Create { collection: input.collection.clone(), rkey: rkey.clone(), cid: record_cid };
216 let mut relevant_blocks = std::collections::BTreeMap::new();
217 if let Err(_) = new_mst.blocks_for_path(&key, &mut relevant_blocks).await {
218 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response();
219 }
220 if let Err(_) = mst.blocks_for_path(&key, &mut relevant_blocks).await {
221 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response();
222 }
223 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
224 let mut written_cids = tracking_store.get_all_relevant_cids();
225 for cid in relevant_blocks.keys() {
226 if !written_cids.contains(cid) {
227 written_cids.push(*cid);
228 }
229 }
230 let written_cids_str = written_cids.iter().map(|c| c.to_string()).collect::<Vec<_>>();
231 if let Err(e) = commit_and_log(&state, &did, user_id, Some(current_root_cid), Some(commit.data), new_mst_root, vec![op], &written_cids_str).await {
232 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": e}))).into_response();
233 };
234 (StatusCode::OK, Json(CreateRecordOutput {
235 uri: format!("at://{}/{}/{}", did, input.collection, rkey),
236 cid: record_cid.to_string(),
237 })).into_response()
238}
239#[derive(Deserialize)]
240#[allow(dead_code)]
241pub struct PutRecordInput {
242 pub repo: String,
243 pub collection: String,
244 pub rkey: String,
245 pub validate: Option<bool>,
246 pub record: serde_json::Value,
247 #[serde(rename = "swapCommit")]
248 pub swap_commit: Option<String>,
249 #[serde(rename = "swapRecord")]
250 pub swap_record: Option<String>,
251}
252#[derive(Serialize)]
253#[serde(rename_all = "camelCase")]
254pub struct PutRecordOutput {
255 pub uri: String,
256 pub cid: String,
257}
258pub async fn put_record(
259 State(state): State<AppState>,
260 headers: HeaderMap,
261 Json(input): Json<PutRecordInput>,
262) -> Response {
263 let (did, user_id, current_root_cid) =
264 match prepare_repo_write(&state, &headers, &input.repo).await {
265 Ok(res) => res,
266 Err(err_res) => return err_res,
267 };
268 if let Some(swap_commit) = &input.swap_commit {
269 if Cid::from_str(swap_commit).ok() != Some(current_root_cid) {
270 return (StatusCode::CONFLICT, Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"}))).into_response();
271 }
272 }
273 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
274 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
275 Ok(Some(b)) => b,
276 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Commit block not found"}))).into_response(),
277 };
278 let commit = match Commit::from_cbor(&commit_bytes) {
279 Ok(c) => c,
280 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to parse commit"}))).into_response(),
281 };
282 let mst = Mst::load(
283 Arc::new(tracking_store.clone()),
284 commit.data,
285 None,
286 );
287 let collection_nsid = match input.collection.parse::<Nsid>() {
288 Ok(n) => n,
289 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection"}))).into_response(),
290 };
291 let key = format!("{}/{}", collection_nsid, input.rkey);
292 if input.validate.unwrap_or(true) {
293 if let Err(err_response) = validate_record(&input.record, &input.collection) {
294 return err_response;
295 }
296 }
297 if let Some(swap_record_str) = &input.swap_record {
298 let expected_cid = Cid::from_str(swap_record_str).ok();
299 let actual_cid = mst.get(&key).await.ok().flatten();
300 if expected_cid != actual_cid {
301 return (StatusCode::CONFLICT, Json(json!({"error": "InvalidSwap", "message": "Record has been modified or does not exist"}))).into_response();
302 }
303 }
304 let existing_cid = mst.get(&key).await.ok().flatten();
305 let mut record_bytes = Vec::new();
306 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() {
307 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response();
308 }
309 let record_cid = match tracking_store.put(&record_bytes).await {
310 Ok(c) => c,
311 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to save record block"}))).into_response(),
312 };
313 let new_mst = if existing_cid.is_some() {
314 match mst.update(&key, record_cid).await {
315 Ok(m) => m,
316 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to update MST"}))).into_response(),
317 }
318 } else {
319 match mst.add(&key, record_cid).await {
320 Ok(m) => m,
321 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to add to MST"}))).into_response(),
322 }
323 };
324 let new_mst_root = match new_mst.persist().await {
325 Ok(c) => c,
326 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to persist MST"}))).into_response(),
327 };
328 let op = if existing_cid.is_some() {
329 RecordOp::Update { collection: input.collection.clone(), rkey: input.rkey.clone(), cid: record_cid, prev: existing_cid }
330 } else {
331 RecordOp::Create { collection: input.collection.clone(), rkey: input.rkey.clone(), cid: record_cid }
332 };
333 let mut relevant_blocks = std::collections::BTreeMap::new();
334 if let Err(_) = new_mst.blocks_for_path(&key, &mut relevant_blocks).await {
335 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response();
336 }
337 if let Err(_) = mst.blocks_for_path(&key, &mut relevant_blocks).await {
338 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response();
339 }
340 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
341 let mut written_cids = tracking_store.get_all_relevant_cids();
342 for cid in relevant_blocks.keys() {
343 if !written_cids.contains(cid) {
344 written_cids.push(*cid);
345 }
346 }
347 let written_cids_str = written_cids.iter().map(|c| c.to_string()).collect::<Vec<_>>();
348 if let Err(e) = commit_and_log(&state, &did, user_id, Some(current_root_cid), Some(commit.data), new_mst_root, vec![op], &written_cids_str).await {
349 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": e}))).into_response();
350 };
351 (StatusCode::OK, Json(PutRecordOutput {
352 uri: format!("at://{}/{}/{}", did, input.collection, input.rkey),
353 cid: record_cid.to_string(),
354 })).into_response()
355}