this repo has no description
1use super::validation::validate_record;
2use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log};
3use crate::repo::tracking::TrackingBlockStore;
4use crate::state::AppState;
5use axum::{
6 Json,
7 extract::State,
8 http::{HeaderMap, StatusCode},
9 response::{IntoResponse, Response},
10};
11use cid::Cid;
12use jacquard::types::{
13 integer::LimitedU32,
14 string::{Nsid, Tid},
15};
16use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
17use serde::{Deserialize, Serialize};
18use serde_json::json;
19use sqlx::{PgPool, Row};
20use std::str::FromStr;
21use std::sync::Arc;
22use tracing::error;
23use uuid::Uuid;
24
25pub async fn has_verified_notification_channel(
26 db: &PgPool,
27 did: &str,
28) -> Result<bool, sqlx::Error> {
29 let row = sqlx::query(
30 r#"
31 SELECT
32 email_confirmed,
33 discord_verified,
34 telegram_verified,
35 signal_verified
36 FROM users
37 WHERE did = $1
38 "#,
39 )
40 .bind(did)
41 .fetch_optional(db)
42 .await?;
43 match row {
44 Some(r) => {
45 let email_confirmed: bool = r.get("email_confirmed");
46 let discord_verified: bool = r.get("discord_verified");
47 let telegram_verified: bool = r.get("telegram_verified");
48 let signal_verified: bool = r.get("signal_verified");
49 Ok(email_confirmed || discord_verified || telegram_verified || signal_verified)
50 }
51 None => Ok(false),
52 }
53}
54
55pub async fn prepare_repo_write(
56 state: &AppState,
57 headers: &HeaderMap,
58 repo_did: &str,
59 http_method: &str,
60 http_uri: &str,
61) -> Result<(String, Uuid, Cid), Response> {
62 let extracted = crate::auth::extract_auth_token_from_header(
63 headers.get("Authorization").and_then(|h| h.to_str().ok()),
64 )
65 .ok_or_else(|| {
66 (
67 StatusCode::UNAUTHORIZED,
68 Json(json!({"error": "AuthenticationRequired"})),
69 )
70 .into_response()
71 })?;
72 let dpop_proof = headers
73 .get("DPoP")
74 .and_then(|h| h.to_str().ok());
75 let auth_user = crate::auth::validate_token_with_dpop(
76 &state.db,
77 &extracted.token,
78 extracted.is_dpop,
79 dpop_proof,
80 http_method,
81 http_uri,
82 false,
83 )
84 .await
85 .map_err(|e| {
86 (
87 StatusCode::UNAUTHORIZED,
88 Json(json!({"error": e.to_string()})),
89 )
90 .into_response()
91 })?;
92 if repo_did != auth_user.did {
93 return Err((
94 StatusCode::FORBIDDEN,
95 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})),
96 )
97 .into_response());
98 }
99 match has_verified_notification_channel(&state.db, &auth_user.did).await {
100 Ok(true) => {}
101 Ok(false) => {
102 return Err((
103 StatusCode::FORBIDDEN,
104 Json(json!({
105 "error": "AccountNotVerified",
106 "message": "You must verify at least one notification channel (email, Discord, Telegram, or Signal) before creating records"
107 })),
108 )
109 .into_response());
110 }
111 Err(e) => {
112 error!("DB error checking notification channels: {}", e);
113 return Err((
114 StatusCode::INTERNAL_SERVER_ERROR,
115 Json(json!({"error": "InternalError"})),
116 )
117 .into_response());
118 }
119 }
120 let user_id = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", auth_user.did)
121 .fetch_optional(&state.db)
122 .await
123 .map_err(|e| {
124 error!("DB error fetching user: {}", e);
125 (
126 StatusCode::INTERNAL_SERVER_ERROR,
127 Json(json!({"error": "InternalError"})),
128 )
129 .into_response()
130 })?
131 .ok_or_else(|| {
132 (
133 StatusCode::INTERNAL_SERVER_ERROR,
134 Json(json!({"error": "InternalError", "message": "User not found"})),
135 )
136 .into_response()
137 })?;
138 let root_cid_str: String = sqlx::query_scalar!(
139 "SELECT repo_root_cid FROM repos WHERE user_id = $1",
140 user_id
141 )
142 .fetch_optional(&state.db)
143 .await
144 .map_err(|e| {
145 error!("DB error fetching repo root: {}", e);
146 (
147 StatusCode::INTERNAL_SERVER_ERROR,
148 Json(json!({"error": "InternalError"})),
149 )
150 .into_response()
151 })?
152 .ok_or_else(|| {
153 (
154 StatusCode::INTERNAL_SERVER_ERROR,
155 Json(json!({"error": "InternalError", "message": "Repo root not found"})),
156 )
157 .into_response()
158 })?;
159 let current_root_cid = Cid::from_str(&root_cid_str).map_err(|_| {
160 (
161 StatusCode::INTERNAL_SERVER_ERROR,
162 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})),
163 )
164 .into_response()
165 })?;
166 Ok((auth_user.did, user_id, current_root_cid))
167}
168#[derive(Deserialize)]
169#[allow(dead_code)]
170pub struct CreateRecordInput {
171 pub repo: String,
172 pub collection: String,
173 pub rkey: Option<String>,
174 pub validate: Option<bool>,
175 pub record: serde_json::Value,
176 #[serde(rename = "swapCommit")]
177 pub swap_commit: Option<String>,
178}
179#[derive(Serialize)]
180#[serde(rename_all = "camelCase")]
181pub struct CreateRecordOutput {
182 pub uri: String,
183 pub cid: String,
184}
185pub async fn create_record(
186 State(state): State<AppState>,
187 headers: HeaderMap,
188 axum::extract::OriginalUri(uri): axum::extract::OriginalUri,
189 Json(input): Json<CreateRecordInput>,
190) -> Response {
191 let (did, user_id, current_root_cid) =
192 match prepare_repo_write(&state, &headers, &input.repo, "POST", &uri.to_string()).await {
193 Ok(res) => res,
194 Err(err_res) => return err_res,
195 };
196 if let Some(swap_commit) = &input.swap_commit
197 && Cid::from_str(swap_commit).ok() != Some(current_root_cid) {
198 return (
199 StatusCode::CONFLICT,
200 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})),
201 )
202 .into_response();
203 }
204 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
205 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
206 Ok(Some(b)) => b,
207 _ => {
208 return (
209 StatusCode::INTERNAL_SERVER_ERROR,
210 Json(json!({"error": "InternalError", "message": "Commit block not found"})),
211 )
212 .into_response();
213 }
214 };
215 let commit = match Commit::from_cbor(&commit_bytes) {
216 Ok(c) => c,
217 _ => {
218 return (
219 StatusCode::INTERNAL_SERVER_ERROR,
220 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})),
221 )
222 .into_response();
223 }
224 };
225 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
226 let collection_nsid = match input.collection.parse::<Nsid>() {
227 Ok(n) => n,
228 Err(_) => {
229 return (
230 StatusCode::BAD_REQUEST,
231 Json(json!({"error": "InvalidCollection"})),
232 )
233 .into_response();
234 }
235 };
236 if input.validate.unwrap_or(true)
237 && let Err(err_response) = validate_record(&input.record, &input.collection) {
238 return *err_response;
239 }
240 let rkey = input
241 .rkey
242 .unwrap_or_else(|| Tid::now(LimitedU32::MIN).to_string());
243 let mut record_bytes = Vec::new();
244 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() {
245 return (
246 StatusCode::BAD_REQUEST,
247 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})),
248 )
249 .into_response();
250 }
251 let record_cid = match tracking_store.put(&record_bytes).await {
252 Ok(c) => c,
253 _ => {
254 return (
255 StatusCode::INTERNAL_SERVER_ERROR,
256 Json(json!({"error": "InternalError", "message": "Failed to save record block"})),
257 )
258 .into_response();
259 }
260 };
261 let key = format!("{}/{}", collection_nsid, rkey);
262 let new_mst = match mst.add(&key, record_cid).await {
263 Ok(m) => m,
264 _ => {
265 return (
266 StatusCode::INTERNAL_SERVER_ERROR,
267 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})),
268 )
269 .into_response();
270 }
271 };
272 let new_mst_root = match new_mst.persist().await {
273 Ok(c) => c,
274 _ => {
275 return (
276 StatusCode::INTERNAL_SERVER_ERROR,
277 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})),
278 )
279 .into_response();
280 }
281 };
282 let op = RecordOp::Create {
283 collection: input.collection.clone(),
284 rkey: rkey.clone(),
285 cid: record_cid,
286 };
287 let mut relevant_blocks = std::collections::BTreeMap::new();
288 if new_mst.blocks_for_path(&key, &mut relevant_blocks).await.is_err() {
289 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response();
290 }
291 if mst.blocks_for_path(&key, &mut relevant_blocks).await.is_err() {
292 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response();
293 }
294 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
295 let mut written_cids = tracking_store.get_all_relevant_cids();
296 for cid in relevant_blocks.keys() {
297 if !written_cids.contains(cid) {
298 written_cids.push(*cid);
299 }
300 }
301 let written_cids_str = written_cids
302 .iter()
303 .map(|c| c.to_string())
304 .collect::<Vec<_>>();
305 if let Err(e) = commit_and_log(
306 &state,
307 CommitParams {
308 did: &did,
309 user_id,
310 current_root_cid: Some(current_root_cid),
311 prev_data_cid: Some(commit.data),
312 new_mst_root,
313 ops: vec![op],
314 blocks_cids: &written_cids_str,
315 },
316 )
317 .await
318 {
319 return (
320 StatusCode::INTERNAL_SERVER_ERROR,
321 Json(json!({"error": "InternalError", "message": e})),
322 )
323 .into_response();
324 };
325 (
326 StatusCode::OK,
327 Json(CreateRecordOutput {
328 uri: format!("at://{}/{}/{}", did, input.collection, rkey),
329 cid: record_cid.to_string(),
330 }),
331 )
332 .into_response()
333}
334#[derive(Deserialize)]
335#[allow(dead_code)]
336pub struct PutRecordInput {
337 pub repo: String,
338 pub collection: String,
339 pub rkey: String,
340 pub validate: Option<bool>,
341 pub record: serde_json::Value,
342 #[serde(rename = "swapCommit")]
343 pub swap_commit: Option<String>,
344 #[serde(rename = "swapRecord")]
345 pub swap_record: Option<String>,
346}
347#[derive(Serialize)]
348#[serde(rename_all = "camelCase")]
349pub struct PutRecordOutput {
350 pub uri: String,
351 pub cid: String,
352}
353pub async fn put_record(
354 State(state): State<AppState>,
355 headers: HeaderMap,
356 axum::extract::OriginalUri(uri): axum::extract::OriginalUri,
357 Json(input): Json<PutRecordInput>,
358) -> Response {
359 let (did, user_id, current_root_cid) =
360 match prepare_repo_write(&state, &headers, &input.repo, "POST", &uri.to_string()).await {
361 Ok(res) => res,
362 Err(err_res) => return err_res,
363 };
364 if let Some(swap_commit) = &input.swap_commit
365 && Cid::from_str(swap_commit).ok() != Some(current_root_cid) {
366 return (
367 StatusCode::CONFLICT,
368 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})),
369 )
370 .into_response();
371 }
372 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
373 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
374 Ok(Some(b)) => b,
375 _ => {
376 return (
377 StatusCode::INTERNAL_SERVER_ERROR,
378 Json(json!({"error": "InternalError", "message": "Commit block not found"})),
379 )
380 .into_response();
381 }
382 };
383 let commit = match Commit::from_cbor(&commit_bytes) {
384 Ok(c) => c,
385 _ => {
386 return (
387 StatusCode::INTERNAL_SERVER_ERROR,
388 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})),
389 )
390 .into_response();
391 }
392 };
393 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
394 let collection_nsid = match input.collection.parse::<Nsid>() {
395 Ok(n) => n,
396 Err(_) => {
397 return (
398 StatusCode::BAD_REQUEST,
399 Json(json!({"error": "InvalidCollection"})),
400 )
401 .into_response();
402 }
403 };
404 let key = format!("{}/{}", collection_nsid, input.rkey);
405 if input.validate.unwrap_or(true)
406 && let Err(err_response) = validate_record(&input.record, &input.collection) {
407 return *err_response;
408 }
409 if let Some(swap_record_str) = &input.swap_record {
410 let expected_cid = Cid::from_str(swap_record_str).ok();
411 let actual_cid = mst.get(&key).await.ok().flatten();
412 if expected_cid != actual_cid {
413 return (StatusCode::CONFLICT, Json(json!({"error": "InvalidSwap", "message": "Record has been modified or does not exist"}))).into_response();
414 }
415 }
416 let existing_cid = mst.get(&key).await.ok().flatten();
417 let mut record_bytes = Vec::new();
418 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() {
419 return (
420 StatusCode::BAD_REQUEST,
421 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})),
422 )
423 .into_response();
424 }
425 let record_cid = match tracking_store.put(&record_bytes).await {
426 Ok(c) => c,
427 _ => {
428 return (
429 StatusCode::INTERNAL_SERVER_ERROR,
430 Json(json!({"error": "InternalError", "message": "Failed to save record block"})),
431 )
432 .into_response();
433 }
434 };
435 let new_mst = if existing_cid.is_some() {
436 match mst.update(&key, record_cid).await {
437 Ok(m) => m,
438 Err(_) => {
439 return (
440 StatusCode::INTERNAL_SERVER_ERROR,
441 Json(json!({"error": "InternalError", "message": "Failed to update MST"})),
442 )
443 .into_response();
444 }
445 }
446 } else {
447 match mst.add(&key, record_cid).await {
448 Ok(m) => m,
449 Err(_) => {
450 return (
451 StatusCode::INTERNAL_SERVER_ERROR,
452 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})),
453 )
454 .into_response();
455 }
456 }
457 };
458 let new_mst_root = match new_mst.persist().await {
459 Ok(c) => c,
460 Err(_) => {
461 return (
462 StatusCode::INTERNAL_SERVER_ERROR,
463 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})),
464 )
465 .into_response();
466 }
467 };
468 let op = if existing_cid.is_some() {
469 RecordOp::Update {
470 collection: input.collection.clone(),
471 rkey: input.rkey.clone(),
472 cid: record_cid,
473 prev: existing_cid,
474 }
475 } else {
476 RecordOp::Create {
477 collection: input.collection.clone(),
478 rkey: input.rkey.clone(),
479 cid: record_cid,
480 }
481 };
482 let mut relevant_blocks = std::collections::BTreeMap::new();
483 if new_mst.blocks_for_path(&key, &mut relevant_blocks).await.is_err() {
484 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response();
485 }
486 if mst.blocks_for_path(&key, &mut relevant_blocks).await.is_err() {
487 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response();
488 }
489 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
490 let mut written_cids = tracking_store.get_all_relevant_cids();
491 for cid in relevant_blocks.keys() {
492 if !written_cids.contains(cid) {
493 written_cids.push(*cid);
494 }
495 }
496 let written_cids_str = written_cids
497 .iter()
498 .map(|c| c.to_string())
499 .collect::<Vec<_>>();
500 if let Err(e) = commit_and_log(
501 &state,
502 CommitParams {
503 did: &did,
504 user_id,
505 current_root_cid: Some(current_root_cid),
506 prev_data_cid: Some(commit.data),
507 new_mst_root,
508 ops: vec![op],
509 blocks_cids: &written_cids_str,
510 },
511 )
512 .await
513 {
514 return (
515 StatusCode::INTERNAL_SERVER_ERROR,
516 Json(json!({"error": "InternalError", "message": e})),
517 )
518 .into_response();
519 };
520 (
521 StatusCode::OK,
522 Json(PutRecordOutput {
523 uri: format!("at://{}/{}/{}", did, input.collection, input.rkey),
524 cid: record_cid.to_string(),
525 }),
526 )
527 .into_response()
528}