this repo has no description
1use super::validation::validate_record;
2use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log};
3use crate::repo::tracking::TrackingBlockStore;
4use crate::state::AppState;
5use axum::{
6 Json,
7 extract::State,
8 http::{HeaderMap, StatusCode},
9 response::{IntoResponse, Response},
10};
11use cid::Cid;
12use jacquard::types::{
13 integer::LimitedU32,
14 string::{Nsid, Tid},
15};
16use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
17use serde::{Deserialize, Serialize};
18use serde_json::json;
19use sqlx::{PgPool, Row};
20use std::str::FromStr;
21use std::sync::Arc;
22use tracing::error;
23use uuid::Uuid;
24
25pub async fn has_verified_comms_channel(db: &PgPool, did: &str) -> Result<bool, sqlx::Error> {
26 let row = sqlx::query(
27 r#"
28 SELECT
29 email_verified,
30 discord_verified,
31 telegram_verified,
32 signal_verified
33 FROM users
34 WHERE did = $1
35 "#,
36 )
37 .bind(did)
38 .fetch_optional(db)
39 .await?;
40 match row {
41 Some(r) => {
42 let email_verified: bool = r.get("email_verified");
43 let discord_verified: bool = r.get("discord_verified");
44 let telegram_verified: bool = r.get("telegram_verified");
45 let signal_verified: bool = r.get("signal_verified");
46 Ok(email_verified || discord_verified || telegram_verified || signal_verified)
47 }
48 None => Ok(false),
49 }
50}
51
52pub struct RepoWriteAuth {
53 pub did: String,
54 pub user_id: Uuid,
55 pub current_root_cid: Cid,
56 pub is_oauth: bool,
57 pub scope: Option<String>,
58}
59
60pub async fn prepare_repo_write(
61 state: &AppState,
62 headers: &HeaderMap,
63 repo_did: &str,
64 http_method: &str,
65 http_uri: &str,
66) -> Result<RepoWriteAuth, Response> {
67 let extracted = crate::auth::extract_auth_token_from_header(
68 headers.get("Authorization").and_then(|h| h.to_str().ok()),
69 )
70 .ok_or_else(|| {
71 (
72 StatusCode::UNAUTHORIZED,
73 Json(json!({"error": "AuthenticationRequired"})),
74 )
75 .into_response()
76 })?;
77 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
78 let auth_user = crate::auth::validate_token_with_dpop(
79 &state.db,
80 &extracted.token,
81 extracted.is_dpop,
82 dpop_proof,
83 http_method,
84 http_uri,
85 false,
86 )
87 .await
88 .map_err(|e| {
89 (
90 StatusCode::UNAUTHORIZED,
91 Json(json!({"error": e.to_string()})),
92 )
93 .into_response()
94 })?;
95 if repo_did != auth_user.did {
96 return Err((
97 StatusCode::FORBIDDEN,
98 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})),
99 )
100 .into_response());
101 }
102 match has_verified_comms_channel(&state.db, &auth_user.did).await {
103 Ok(true) => {}
104 Ok(false) => {
105 return Err((
106 StatusCode::FORBIDDEN,
107 Json(json!({
108 "error": "AccountNotVerified",
109 "message": "You must verify at least one notification channel (email, Discord, Telegram, or Signal) before creating records"
110 })),
111 )
112 .into_response());
113 }
114 Err(e) => {
115 error!("DB error checking notification channels: {}", e);
116 return Err((
117 StatusCode::INTERNAL_SERVER_ERROR,
118 Json(json!({"error": "InternalError"})),
119 )
120 .into_response());
121 }
122 }
123 let user_id = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", auth_user.did)
124 .fetch_optional(&state.db)
125 .await
126 .map_err(|e| {
127 error!("DB error fetching user: {}", e);
128 (
129 StatusCode::INTERNAL_SERVER_ERROR,
130 Json(json!({"error": "InternalError"})),
131 )
132 .into_response()
133 })?
134 .ok_or_else(|| {
135 (
136 StatusCode::INTERNAL_SERVER_ERROR,
137 Json(json!({"error": "InternalError", "message": "User not found"})),
138 )
139 .into_response()
140 })?;
141 let root_cid_str: String = sqlx::query_scalar!(
142 "SELECT repo_root_cid FROM repos WHERE user_id = $1",
143 user_id
144 )
145 .fetch_optional(&state.db)
146 .await
147 .map_err(|e| {
148 error!("DB error fetching repo root: {}", e);
149 (
150 StatusCode::INTERNAL_SERVER_ERROR,
151 Json(json!({"error": "InternalError"})),
152 )
153 .into_response()
154 })?
155 .ok_or_else(|| {
156 (
157 StatusCode::INTERNAL_SERVER_ERROR,
158 Json(json!({"error": "InternalError", "message": "Repo root not found"})),
159 )
160 .into_response()
161 })?;
162 let current_root_cid = Cid::from_str(&root_cid_str).map_err(|_| {
163 (
164 StatusCode::INTERNAL_SERVER_ERROR,
165 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})),
166 )
167 .into_response()
168 })?;
169 Ok(RepoWriteAuth {
170 did: auth_user.did,
171 user_id,
172 current_root_cid,
173 is_oauth: auth_user.is_oauth,
174 scope: auth_user.scope,
175 })
176}
177#[derive(Deserialize)]
178#[allow(dead_code)]
179pub struct CreateRecordInput {
180 pub repo: String,
181 pub collection: String,
182 pub rkey: Option<String>,
183 pub validate: Option<bool>,
184 pub record: serde_json::Value,
185 #[serde(rename = "swapCommit")]
186 pub swap_commit: Option<String>,
187}
188#[derive(Serialize)]
189#[serde(rename_all = "camelCase")]
190pub struct CreateRecordOutput {
191 pub uri: String,
192 pub cid: String,
193}
194pub async fn create_record(
195 State(state): State<AppState>,
196 headers: HeaderMap,
197 axum::extract::OriginalUri(uri): axum::extract::OriginalUri,
198 Json(input): Json<CreateRecordInput>,
199) -> Response {
200 let auth =
201 match prepare_repo_write(&state, &headers, &input.repo, "POST", &uri.to_string()).await {
202 Ok(res) => res,
203 Err(err_res) => return err_res,
204 };
205
206 if let Err(e) = crate::auth::scope_check::check_repo_scope(
207 auth.is_oauth,
208 auth.scope.as_deref(),
209 crate::oauth::RepoAction::Create,
210 &input.collection,
211 ) {
212 return e;
213 }
214
215 let did = auth.did;
216 let user_id = auth.user_id;
217 let current_root_cid = auth.current_root_cid;
218
219 if let Some(swap_commit) = &input.swap_commit
220 && Cid::from_str(swap_commit).ok() != Some(current_root_cid)
221 {
222 return (
223 StatusCode::CONFLICT,
224 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})),
225 )
226 .into_response();
227 }
228 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
229 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
230 Ok(Some(b)) => b,
231 _ => {
232 return (
233 StatusCode::INTERNAL_SERVER_ERROR,
234 Json(json!({"error": "InternalError", "message": "Commit block not found"})),
235 )
236 .into_response();
237 }
238 };
239 let commit = match Commit::from_cbor(&commit_bytes) {
240 Ok(c) => c,
241 _ => {
242 return (
243 StatusCode::INTERNAL_SERVER_ERROR,
244 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})),
245 )
246 .into_response();
247 }
248 };
249 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
250 let collection_nsid = match input.collection.parse::<Nsid>() {
251 Ok(n) => n,
252 Err(_) => {
253 return (
254 StatusCode::BAD_REQUEST,
255 Json(json!({"error": "InvalidCollection"})),
256 )
257 .into_response();
258 }
259 };
260 if input.validate.unwrap_or(true)
261 && let Err(err_response) = validate_record(&input.record, &input.collection)
262 {
263 return *err_response;
264 }
265 let rkey = input
266 .rkey
267 .unwrap_or_else(|| Tid::now(LimitedU32::MIN).to_string());
268 let mut record_bytes = Vec::new();
269 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() {
270 return (
271 StatusCode::BAD_REQUEST,
272 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})),
273 )
274 .into_response();
275 }
276 let record_cid = match tracking_store.put(&record_bytes).await {
277 Ok(c) => c,
278 _ => {
279 return (
280 StatusCode::INTERNAL_SERVER_ERROR,
281 Json(json!({"error": "InternalError", "message": "Failed to save record block"})),
282 )
283 .into_response();
284 }
285 };
286 let key = format!("{}/{}", collection_nsid, rkey);
287 let new_mst = match mst.add(&key, record_cid).await {
288 Ok(m) => m,
289 _ => {
290 return (
291 StatusCode::INTERNAL_SERVER_ERROR,
292 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})),
293 )
294 .into_response();
295 }
296 };
297 let new_mst_root = match new_mst.persist().await {
298 Ok(c) => c,
299 _ => {
300 return (
301 StatusCode::INTERNAL_SERVER_ERROR,
302 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})),
303 )
304 .into_response();
305 }
306 };
307 let op = RecordOp::Create {
308 collection: input.collection.clone(),
309 rkey: rkey.clone(),
310 cid: record_cid,
311 };
312 let mut relevant_blocks = std::collections::BTreeMap::new();
313 if new_mst
314 .blocks_for_path(&key, &mut relevant_blocks)
315 .await
316 .is_err()
317 {
318 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response();
319 }
320 if mst
321 .blocks_for_path(&key, &mut relevant_blocks)
322 .await
323 .is_err()
324 {
325 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response();
326 }
327 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
328 let mut written_cids = tracking_store.get_all_relevant_cids();
329 for cid in relevant_blocks.keys() {
330 if !written_cids.contains(cid) {
331 written_cids.push(*cid);
332 }
333 }
334 let written_cids_str = written_cids
335 .iter()
336 .map(|c| c.to_string())
337 .collect::<Vec<_>>();
338 if let Err(e) = commit_and_log(
339 &state,
340 CommitParams {
341 did: &did,
342 user_id,
343 current_root_cid: Some(current_root_cid),
344 prev_data_cid: Some(commit.data),
345 new_mst_root,
346 ops: vec![op],
347 blocks_cids: &written_cids_str,
348 },
349 )
350 .await
351 {
352 return (
353 StatusCode::INTERNAL_SERVER_ERROR,
354 Json(json!({"error": "InternalError", "message": e})),
355 )
356 .into_response();
357 };
358 (
359 StatusCode::OK,
360 Json(CreateRecordOutput {
361 uri: format!("at://{}/{}/{}", did, input.collection, rkey),
362 cid: record_cid.to_string(),
363 }),
364 )
365 .into_response()
366}
367#[derive(Deserialize)]
368#[allow(dead_code)]
369pub struct PutRecordInput {
370 pub repo: String,
371 pub collection: String,
372 pub rkey: String,
373 pub validate: Option<bool>,
374 pub record: serde_json::Value,
375 #[serde(rename = "swapCommit")]
376 pub swap_commit: Option<String>,
377 #[serde(rename = "swapRecord")]
378 pub swap_record: Option<String>,
379}
380#[derive(Serialize)]
381#[serde(rename_all = "camelCase")]
382pub struct PutRecordOutput {
383 pub uri: String,
384 pub cid: String,
385}
386pub async fn put_record(
387 State(state): State<AppState>,
388 headers: HeaderMap,
389 axum::extract::OriginalUri(uri): axum::extract::OriginalUri,
390 Json(input): Json<PutRecordInput>,
391) -> Response {
392 let auth =
393 match prepare_repo_write(&state, &headers, &input.repo, "POST", &uri.to_string()).await {
394 Ok(res) => res,
395 Err(err_res) => return err_res,
396 };
397
398 if let Err(e) = crate::auth::scope_check::check_repo_scope(
399 auth.is_oauth,
400 auth.scope.as_deref(),
401 crate::oauth::RepoAction::Create,
402 &input.collection,
403 ) {
404 return e;
405 }
406 if let Err(e) = crate::auth::scope_check::check_repo_scope(
407 auth.is_oauth,
408 auth.scope.as_deref(),
409 crate::oauth::RepoAction::Update,
410 &input.collection,
411 ) {
412 return e;
413 }
414
415 let did = auth.did;
416 let user_id = auth.user_id;
417 let current_root_cid = auth.current_root_cid;
418
419 if let Some(swap_commit) = &input.swap_commit
420 && Cid::from_str(swap_commit).ok() != Some(current_root_cid)
421 {
422 return (
423 StatusCode::CONFLICT,
424 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})),
425 )
426 .into_response();
427 }
428 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
429 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
430 Ok(Some(b)) => b,
431 _ => {
432 return (
433 StatusCode::INTERNAL_SERVER_ERROR,
434 Json(json!({"error": "InternalError", "message": "Commit block not found"})),
435 )
436 .into_response();
437 }
438 };
439 let commit = match Commit::from_cbor(&commit_bytes) {
440 Ok(c) => c,
441 _ => {
442 return (
443 StatusCode::INTERNAL_SERVER_ERROR,
444 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})),
445 )
446 .into_response();
447 }
448 };
449 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
450 let collection_nsid = match input.collection.parse::<Nsid>() {
451 Ok(n) => n,
452 Err(_) => {
453 return (
454 StatusCode::BAD_REQUEST,
455 Json(json!({"error": "InvalidCollection"})),
456 )
457 .into_response();
458 }
459 };
460 let key = format!("{}/{}", collection_nsid, input.rkey);
461 if input.validate.unwrap_or(true)
462 && let Err(err_response) = validate_record(&input.record, &input.collection)
463 {
464 return *err_response;
465 }
466 if let Some(swap_record_str) = &input.swap_record {
467 let expected_cid = Cid::from_str(swap_record_str).ok();
468 let actual_cid = mst.get(&key).await.ok().flatten();
469 if expected_cid != actual_cid {
470 return (StatusCode::CONFLICT, Json(json!({"error": "InvalidSwap", "message": "Record has been modified or does not exist"}))).into_response();
471 }
472 }
473 let existing_cid = mst.get(&key).await.ok().flatten();
474 let mut record_bytes = Vec::new();
475 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() {
476 return (
477 StatusCode::BAD_REQUEST,
478 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})),
479 )
480 .into_response();
481 }
482 let record_cid = match tracking_store.put(&record_bytes).await {
483 Ok(c) => c,
484 _ => {
485 return (
486 StatusCode::INTERNAL_SERVER_ERROR,
487 Json(json!({"error": "InternalError", "message": "Failed to save record block"})),
488 )
489 .into_response();
490 }
491 };
492 let new_mst = if existing_cid.is_some() {
493 match mst.update(&key, record_cid).await {
494 Ok(m) => m,
495 Err(_) => {
496 return (
497 StatusCode::INTERNAL_SERVER_ERROR,
498 Json(json!({"error": "InternalError", "message": "Failed to update MST"})),
499 )
500 .into_response();
501 }
502 }
503 } else {
504 match mst.add(&key, record_cid).await {
505 Ok(m) => m,
506 Err(_) => {
507 return (
508 StatusCode::INTERNAL_SERVER_ERROR,
509 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})),
510 )
511 .into_response();
512 }
513 }
514 };
515 let new_mst_root = match new_mst.persist().await {
516 Ok(c) => c,
517 Err(_) => {
518 return (
519 StatusCode::INTERNAL_SERVER_ERROR,
520 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})),
521 )
522 .into_response();
523 }
524 };
525 let op = if existing_cid.is_some() {
526 RecordOp::Update {
527 collection: input.collection.clone(),
528 rkey: input.rkey.clone(),
529 cid: record_cid,
530 prev: existing_cid,
531 }
532 } else {
533 RecordOp::Create {
534 collection: input.collection.clone(),
535 rkey: input.rkey.clone(),
536 cid: record_cid,
537 }
538 };
539 let mut relevant_blocks = std::collections::BTreeMap::new();
540 if new_mst
541 .blocks_for_path(&key, &mut relevant_blocks)
542 .await
543 .is_err()
544 {
545 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response();
546 }
547 if mst
548 .blocks_for_path(&key, &mut relevant_blocks)
549 .await
550 .is_err()
551 {
552 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response();
553 }
554 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
555 let mut written_cids = tracking_store.get_all_relevant_cids();
556 for cid in relevant_blocks.keys() {
557 if !written_cids.contains(cid) {
558 written_cids.push(*cid);
559 }
560 }
561 let written_cids_str = written_cids
562 .iter()
563 .map(|c| c.to_string())
564 .collect::<Vec<_>>();
565 if let Err(e) = commit_and_log(
566 &state,
567 CommitParams {
568 did: &did,
569 user_id,
570 current_root_cid: Some(current_root_cid),
571 prev_data_cid: Some(commit.data),
572 new_mst_root,
573 ops: vec![op],
574 blocks_cids: &written_cids_str,
575 },
576 )
577 .await
578 {
579 return (
580 StatusCode::INTERNAL_SERVER_ERROR,
581 Json(json!({"error": "InternalError", "message": e})),
582 )
583 .into_response();
584 };
585 (
586 StatusCode::OK,
587 Json(PutRecordOutput {
588 uri: format!("at://{}/{}/{}", did, input.collection, input.rkey),
589 cid: record_cid.to_string(),
590 }),
591 )
592 .into_response()
593}