this repo has no description
1use super::validation::validate_record;
2use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log};
3use crate::repo::tracking::TrackingBlockStore;
4use crate::state::AppState;
5use axum::{
6 Json,
7 extract::State,
8 http::{HeaderMap, StatusCode},
9 response::{IntoResponse, Response},
10};
11use cid::Cid;
12use jacquard::types::{
13 integer::LimitedU32,
14 string::{Nsid, Tid},
15};
16use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
17use serde::{Deserialize, Serialize};
18use serde_json::json;
19use sqlx::{PgPool, Row};
20use std::str::FromStr;
21use std::sync::Arc;
22use tracing::error;
23use uuid::Uuid;
24
25pub async fn has_verified_notification_channel(
26 db: &PgPool,
27 did: &str,
28) -> Result<bool, sqlx::Error> {
29 let row = sqlx::query(
30 r#"
31 SELECT
32 email_confirmed,
33 discord_verified,
34 telegram_verified,
35 signal_verified
36 FROM users
37 WHERE did = $1
38 "#,
39 )
40 .bind(did)
41 .fetch_optional(db)
42 .await?;
43 match row {
44 Some(r) => {
45 let email_confirmed: bool = r.get("email_confirmed");
46 let discord_verified: bool = r.get("discord_verified");
47 let telegram_verified: bool = r.get("telegram_verified");
48 let signal_verified: bool = r.get("signal_verified");
49 Ok(email_confirmed || discord_verified || telegram_verified || signal_verified)
50 }
51 None => Ok(false),
52 }
53}
54
55pub async fn prepare_repo_write(
56 state: &AppState,
57 headers: &HeaderMap,
58 repo_did: &str,
59) -> Result<(String, Uuid, Cid), Response> {
60 let token = crate::auth::extract_bearer_token_from_header(
61 headers.get("Authorization").and_then(|h| h.to_str().ok()),
62 )
63 .ok_or_else(|| {
64 (
65 StatusCode::UNAUTHORIZED,
66 Json(json!({"error": "AuthenticationRequired"})),
67 )
68 .into_response()
69 })?;
70 let auth_user = crate::auth::validate_bearer_token(&state.db, &token)
71 .await
72 .map_err(|_| {
73 (
74 StatusCode::UNAUTHORIZED,
75 Json(json!({"error": "AuthenticationFailed"})),
76 )
77 .into_response()
78 })?;
79 if repo_did != auth_user.did {
80 return Err((
81 StatusCode::FORBIDDEN,
82 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})),
83 )
84 .into_response());
85 }
86 match has_verified_notification_channel(&state.db, &auth_user.did).await {
87 Ok(true) => {}
88 Ok(false) => {
89 return Err((
90 StatusCode::FORBIDDEN,
91 Json(json!({
92 "error": "AccountNotVerified",
93 "message": "You must verify at least one notification channel (email, Discord, Telegram, or Signal) before creating records"
94 })),
95 )
96 .into_response());
97 }
98 Err(e) => {
99 error!("DB error checking notification channels: {}", e);
100 return Err((
101 StatusCode::INTERNAL_SERVER_ERROR,
102 Json(json!({"error": "InternalError"})),
103 )
104 .into_response());
105 }
106 }
107 let user_id = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", auth_user.did)
108 .fetch_optional(&state.db)
109 .await
110 .map_err(|e| {
111 error!("DB error fetching user: {}", e);
112 (
113 StatusCode::INTERNAL_SERVER_ERROR,
114 Json(json!({"error": "InternalError"})),
115 )
116 .into_response()
117 })?
118 .ok_or_else(|| {
119 (
120 StatusCode::INTERNAL_SERVER_ERROR,
121 Json(json!({"error": "InternalError", "message": "User not found"})),
122 )
123 .into_response()
124 })?;
125 let root_cid_str: String = sqlx::query_scalar!(
126 "SELECT repo_root_cid FROM repos WHERE user_id = $1",
127 user_id
128 )
129 .fetch_optional(&state.db)
130 .await
131 .map_err(|e| {
132 error!("DB error fetching repo root: {}", e);
133 (
134 StatusCode::INTERNAL_SERVER_ERROR,
135 Json(json!({"error": "InternalError"})),
136 )
137 .into_response()
138 })?
139 .ok_or_else(|| {
140 (
141 StatusCode::INTERNAL_SERVER_ERROR,
142 Json(json!({"error": "InternalError", "message": "Repo root not found"})),
143 )
144 .into_response()
145 })?;
146 let current_root_cid = Cid::from_str(&root_cid_str).map_err(|_| {
147 (
148 StatusCode::INTERNAL_SERVER_ERROR,
149 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})),
150 )
151 .into_response()
152 })?;
153 Ok((auth_user.did, user_id, current_root_cid))
154}
155#[derive(Deserialize)]
156#[allow(dead_code)]
157pub struct CreateRecordInput {
158 pub repo: String,
159 pub collection: String,
160 pub rkey: Option<String>,
161 pub validate: Option<bool>,
162 pub record: serde_json::Value,
163 #[serde(rename = "swapCommit")]
164 pub swap_commit: Option<String>,
165}
166#[derive(Serialize)]
167#[serde(rename_all = "camelCase")]
168pub struct CreateRecordOutput {
169 pub uri: String,
170 pub cid: String,
171}
172pub async fn create_record(
173 State(state): State<AppState>,
174 headers: HeaderMap,
175 Json(input): Json<CreateRecordInput>,
176) -> Response {
177 let (did, user_id, current_root_cid) =
178 match prepare_repo_write(&state, &headers, &input.repo).await {
179 Ok(res) => res,
180 Err(err_res) => return err_res,
181 };
182 if let Some(swap_commit) = &input.swap_commit
183 && Cid::from_str(swap_commit).ok() != Some(current_root_cid) {
184 return (
185 StatusCode::CONFLICT,
186 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})),
187 )
188 .into_response();
189 }
190 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
191 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
192 Ok(Some(b)) => b,
193 _ => {
194 return (
195 StatusCode::INTERNAL_SERVER_ERROR,
196 Json(json!({"error": "InternalError", "message": "Commit block not found"})),
197 )
198 .into_response();
199 }
200 };
201 let commit = match Commit::from_cbor(&commit_bytes) {
202 Ok(c) => c,
203 _ => {
204 return (
205 StatusCode::INTERNAL_SERVER_ERROR,
206 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})),
207 )
208 .into_response();
209 }
210 };
211 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
212 let collection_nsid = match input.collection.parse::<Nsid>() {
213 Ok(n) => n,
214 Err(_) => {
215 return (
216 StatusCode::BAD_REQUEST,
217 Json(json!({"error": "InvalidCollection"})),
218 )
219 .into_response();
220 }
221 };
222 if input.validate.unwrap_or(true)
223 && let Err(err_response) = validate_record(&input.record, &input.collection) {
224 return *err_response;
225 }
226 let rkey = input
227 .rkey
228 .unwrap_or_else(|| Tid::now(LimitedU32::MIN).to_string());
229 let mut record_bytes = Vec::new();
230 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() {
231 return (
232 StatusCode::BAD_REQUEST,
233 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})),
234 )
235 .into_response();
236 }
237 let record_cid = match tracking_store.put(&record_bytes).await {
238 Ok(c) => c,
239 _ => {
240 return (
241 StatusCode::INTERNAL_SERVER_ERROR,
242 Json(json!({"error": "InternalError", "message": "Failed to save record block"})),
243 )
244 .into_response();
245 }
246 };
247 let key = format!("{}/{}", collection_nsid, rkey);
248 let new_mst = match mst.add(&key, record_cid).await {
249 Ok(m) => m,
250 _ => {
251 return (
252 StatusCode::INTERNAL_SERVER_ERROR,
253 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})),
254 )
255 .into_response();
256 }
257 };
258 let new_mst_root = match new_mst.persist().await {
259 Ok(c) => c,
260 _ => {
261 return (
262 StatusCode::INTERNAL_SERVER_ERROR,
263 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})),
264 )
265 .into_response();
266 }
267 };
268 let op = RecordOp::Create {
269 collection: input.collection.clone(),
270 rkey: rkey.clone(),
271 cid: record_cid,
272 };
273 let mut relevant_blocks = std::collections::BTreeMap::new();
274 if new_mst.blocks_for_path(&key, &mut relevant_blocks).await.is_err() {
275 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response();
276 }
277 if mst.blocks_for_path(&key, &mut relevant_blocks).await.is_err() {
278 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response();
279 }
280 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
281 let mut written_cids = tracking_store.get_all_relevant_cids();
282 for cid in relevant_blocks.keys() {
283 if !written_cids.contains(cid) {
284 written_cids.push(*cid);
285 }
286 }
287 let written_cids_str = written_cids
288 .iter()
289 .map(|c| c.to_string())
290 .collect::<Vec<_>>();
291 if let Err(e) = commit_and_log(
292 &state,
293 CommitParams {
294 did: &did,
295 user_id,
296 current_root_cid: Some(current_root_cid),
297 prev_data_cid: Some(commit.data),
298 new_mst_root,
299 ops: vec![op],
300 blocks_cids: &written_cids_str,
301 },
302 )
303 .await
304 {
305 return (
306 StatusCode::INTERNAL_SERVER_ERROR,
307 Json(json!({"error": "InternalError", "message": e})),
308 )
309 .into_response();
310 };
311 (
312 StatusCode::OK,
313 Json(CreateRecordOutput {
314 uri: format!("at://{}/{}/{}", did, input.collection, rkey),
315 cid: record_cid.to_string(),
316 }),
317 )
318 .into_response()
319}
320#[derive(Deserialize)]
321#[allow(dead_code)]
322pub struct PutRecordInput {
323 pub repo: String,
324 pub collection: String,
325 pub rkey: String,
326 pub validate: Option<bool>,
327 pub record: serde_json::Value,
328 #[serde(rename = "swapCommit")]
329 pub swap_commit: Option<String>,
330 #[serde(rename = "swapRecord")]
331 pub swap_record: Option<String>,
332}
333#[derive(Serialize)]
334#[serde(rename_all = "camelCase")]
335pub struct PutRecordOutput {
336 pub uri: String,
337 pub cid: String,
338}
339pub async fn put_record(
340 State(state): State<AppState>,
341 headers: HeaderMap,
342 Json(input): Json<PutRecordInput>,
343) -> Response {
344 let (did, user_id, current_root_cid) =
345 match prepare_repo_write(&state, &headers, &input.repo).await {
346 Ok(res) => res,
347 Err(err_res) => return err_res,
348 };
349 if let Some(swap_commit) = &input.swap_commit
350 && Cid::from_str(swap_commit).ok() != Some(current_root_cid) {
351 return (
352 StatusCode::CONFLICT,
353 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})),
354 )
355 .into_response();
356 }
357 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
358 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
359 Ok(Some(b)) => b,
360 _ => {
361 return (
362 StatusCode::INTERNAL_SERVER_ERROR,
363 Json(json!({"error": "InternalError", "message": "Commit block not found"})),
364 )
365 .into_response();
366 }
367 };
368 let commit = match Commit::from_cbor(&commit_bytes) {
369 Ok(c) => c,
370 _ => {
371 return (
372 StatusCode::INTERNAL_SERVER_ERROR,
373 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})),
374 )
375 .into_response();
376 }
377 };
378 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
379 let collection_nsid = match input.collection.parse::<Nsid>() {
380 Ok(n) => n,
381 Err(_) => {
382 return (
383 StatusCode::BAD_REQUEST,
384 Json(json!({"error": "InvalidCollection"})),
385 )
386 .into_response();
387 }
388 };
389 let key = format!("{}/{}", collection_nsid, input.rkey);
390 if input.validate.unwrap_or(true)
391 && let Err(err_response) = validate_record(&input.record, &input.collection) {
392 return *err_response;
393 }
394 if let Some(swap_record_str) = &input.swap_record {
395 let expected_cid = Cid::from_str(swap_record_str).ok();
396 let actual_cid = mst.get(&key).await.ok().flatten();
397 if expected_cid != actual_cid {
398 return (StatusCode::CONFLICT, Json(json!({"error": "InvalidSwap", "message": "Record has been modified or does not exist"}))).into_response();
399 }
400 }
401 let existing_cid = mst.get(&key).await.ok().flatten();
402 let mut record_bytes = Vec::new();
403 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() {
404 return (
405 StatusCode::BAD_REQUEST,
406 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})),
407 )
408 .into_response();
409 }
410 let record_cid = match tracking_store.put(&record_bytes).await {
411 Ok(c) => c,
412 _ => {
413 return (
414 StatusCode::INTERNAL_SERVER_ERROR,
415 Json(json!({"error": "InternalError", "message": "Failed to save record block"})),
416 )
417 .into_response();
418 }
419 };
420 let new_mst = if existing_cid.is_some() {
421 match mst.update(&key, record_cid).await {
422 Ok(m) => m,
423 Err(_) => {
424 return (
425 StatusCode::INTERNAL_SERVER_ERROR,
426 Json(json!({"error": "InternalError", "message": "Failed to update MST"})),
427 )
428 .into_response();
429 }
430 }
431 } else {
432 match mst.add(&key, record_cid).await {
433 Ok(m) => m,
434 Err(_) => {
435 return (
436 StatusCode::INTERNAL_SERVER_ERROR,
437 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})),
438 )
439 .into_response();
440 }
441 }
442 };
443 let new_mst_root = match new_mst.persist().await {
444 Ok(c) => c,
445 Err(_) => {
446 return (
447 StatusCode::INTERNAL_SERVER_ERROR,
448 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})),
449 )
450 .into_response();
451 }
452 };
453 let op = if existing_cid.is_some() {
454 RecordOp::Update {
455 collection: input.collection.clone(),
456 rkey: input.rkey.clone(),
457 cid: record_cid,
458 prev: existing_cid,
459 }
460 } else {
461 RecordOp::Create {
462 collection: input.collection.clone(),
463 rkey: input.rkey.clone(),
464 cid: record_cid,
465 }
466 };
467 let mut relevant_blocks = std::collections::BTreeMap::new();
468 if new_mst.blocks_for_path(&key, &mut relevant_blocks).await.is_err() {
469 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response();
470 }
471 if mst.blocks_for_path(&key, &mut relevant_blocks).await.is_err() {
472 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response();
473 }
474 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
475 let mut written_cids = tracking_store.get_all_relevant_cids();
476 for cid in relevant_blocks.keys() {
477 if !written_cids.contains(cid) {
478 written_cids.push(*cid);
479 }
480 }
481 let written_cids_str = written_cids
482 .iter()
483 .map(|c| c.to_string())
484 .collect::<Vec<_>>();
485 if let Err(e) = commit_and_log(
486 &state,
487 CommitParams {
488 did: &did,
489 user_id,
490 current_root_cid: Some(current_root_cid),
491 prev_data_cid: Some(commit.data),
492 new_mst_root,
493 ops: vec![op],
494 blocks_cids: &written_cids_str,
495 },
496 )
497 .await
498 {
499 return (
500 StatusCode::INTERNAL_SERVER_ERROR,
501 Json(json!({"error": "InternalError", "message": e})),
502 )
503 .into_response();
504 };
505 (
506 StatusCode::OK,
507 Json(PutRecordOutput {
508 uri: format!("at://{}/{}/{}", did, input.collection, input.rkey),
509 cid: record_cid.to_string(),
510 }),
511 )
512 .into_response()
513}