this repo has no description
1use super::validation::validate_record;
2use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log};
3use crate::delegation::{self, DelegationActionType};
4use crate::repo::tracking::TrackingBlockStore;
5use crate::state::AppState;
6use axum::{
7 Json,
8 extract::State,
9 http::{HeaderMap, StatusCode},
10 response::{IntoResponse, Response},
11};
12use cid::Cid;
13use jacquard::types::{
14 integer::LimitedU32,
15 string::{Nsid, Tid},
16};
17use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
18use serde::{Deserialize, Serialize};
19use serde_json::json;
20use sqlx::{PgPool, Row};
21use std::str::FromStr;
22use std::sync::Arc;
23use tracing::error;
24use uuid::Uuid;
25
26pub async fn has_verified_comms_channel(db: &PgPool, did: &str) -> Result<bool, sqlx::Error> {
27 let row = sqlx::query(
28 r#"
29 SELECT
30 email_verified,
31 discord_verified,
32 telegram_verified,
33 signal_verified
34 FROM users
35 WHERE did = $1
36 "#,
37 )
38 .bind(did)
39 .fetch_optional(db)
40 .await?;
41 match row {
42 Some(r) => {
43 let email_verified: bool = r.get("email_verified");
44 let discord_verified: bool = r.get("discord_verified");
45 let telegram_verified: bool = r.get("telegram_verified");
46 let signal_verified: bool = r.get("signal_verified");
47 Ok(email_verified || discord_verified || telegram_verified || signal_verified)
48 }
49 None => Ok(false),
50 }
51}
52
/// Authenticated context for a repo write, as produced by `prepare_repo_write`.
pub struct RepoWriteAuth {
    /// DID of the authenticated user; `prepare_repo_write` only succeeds when
    /// it equals the DID of the repo named in the request.
    pub did: String,
    /// `users.id` of the row whose `did` matches the authenticated user.
    pub user_id: Uuid,
    /// Repo root CID parsed from `repos.repo_root_cid` at authorization time.
    pub current_root_cid: Cid,
    /// Copied from the validated token's `is_oauth` flag; the write handlers
    /// forward it to the repo scope check.
    pub is_oauth: bool,
    /// Copied from the validated token's `scope`; the write handlers forward
    /// it to the repo scope check.
    pub scope: Option<String>,
    /// Copied from the validated token's `controller_did`; when present, the
    /// write handlers record a delegation action for it after committing.
    pub controller_did: Option<String>,
}
61
62pub async fn prepare_repo_write(
63 state: &AppState,
64 headers: &HeaderMap,
65 repo_did: &str,
66 http_method: &str,
67 http_uri: &str,
68) -> Result<RepoWriteAuth, Response> {
69 let extracted = crate::auth::extract_auth_token_from_header(
70 headers.get("Authorization").and_then(|h| h.to_str().ok()),
71 )
72 .ok_or_else(|| {
73 (
74 StatusCode::UNAUTHORIZED,
75 Json(json!({"error": "AuthenticationRequired"})),
76 )
77 .into_response()
78 })?;
79 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
80 let auth_user = crate::auth::validate_token_with_dpop(
81 &state.db,
82 &extracted.token,
83 extracted.is_dpop,
84 dpop_proof,
85 http_method,
86 http_uri,
87 false,
88 )
89 .await
90 .map_err(|e| {
91 (
92 StatusCode::UNAUTHORIZED,
93 Json(json!({"error": e.to_string()})),
94 )
95 .into_response()
96 })?;
97 if repo_did != auth_user.did {
98 return Err((
99 StatusCode::FORBIDDEN,
100 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})),
101 )
102 .into_response());
103 }
104 let is_verified = has_verified_comms_channel(&state.db, &auth_user.did)
105 .await
106 .unwrap_or(false);
107 let is_delegated = crate::delegation::is_delegated_account(&state.db, &auth_user.did)
108 .await
109 .unwrap_or(false);
110 if !is_verified && !is_delegated {
111 return Err((
112 StatusCode::FORBIDDEN,
113 Json(json!({
114 "error": "AccountNotVerified",
115 "message": "You must verify at least one notification channel (email, Discord, Telegram, or Signal) before creating records"
116 })),
117 )
118 .into_response());
119 }
120 let user_id = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", auth_user.did)
121 .fetch_optional(&state.db)
122 .await
123 .map_err(|e| {
124 error!("DB error fetching user: {}", e);
125 (
126 StatusCode::INTERNAL_SERVER_ERROR,
127 Json(json!({"error": "InternalError"})),
128 )
129 .into_response()
130 })?
131 .ok_or_else(|| {
132 (
133 StatusCode::INTERNAL_SERVER_ERROR,
134 Json(json!({"error": "InternalError", "message": "User not found"})),
135 )
136 .into_response()
137 })?;
138 let root_cid_str: String = sqlx::query_scalar!(
139 "SELECT repo_root_cid FROM repos WHERE user_id = $1",
140 user_id
141 )
142 .fetch_optional(&state.db)
143 .await
144 .map_err(|e| {
145 error!("DB error fetching repo root: {}", e);
146 (
147 StatusCode::INTERNAL_SERVER_ERROR,
148 Json(json!({"error": "InternalError"})),
149 )
150 .into_response()
151 })?
152 .ok_or_else(|| {
153 (
154 StatusCode::INTERNAL_SERVER_ERROR,
155 Json(json!({"error": "InternalError", "message": "Repo root not found"})),
156 )
157 .into_response()
158 })?;
159 let current_root_cid = Cid::from_str(&root_cid_str).map_err(|_| {
160 (
161 StatusCode::INTERNAL_SERVER_ERROR,
162 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})),
163 )
164 .into_response()
165 })?;
166 Ok(RepoWriteAuth {
167 did: auth_user.did,
168 user_id,
169 current_root_cid,
170 is_oauth: auth_user.is_oauth,
171 scope: auth_user.scope,
172 controller_did: auth_user.controller_did,
173 })
174}
/// JSON request body accepted by `create_record`.
#[derive(Deserialize)]
// NOTE(review): every field appears to be read by `create_record` in this
// file, so this allow may no longer be needed — confirm before removing.
#[allow(dead_code)]
pub struct CreateRecordInput {
    /// DID of the target repo; must equal the authenticated user's DID.
    pub repo: String,
    /// Collection name; the handler parses it as an NSID and rejects the
    /// request with `InvalidCollection` when that fails.
    pub collection: String,
    /// Record key; when absent the handler generates a TID.
    pub rkey: Option<String>,
    /// Whether to run `validate_record`; treated as `true` when absent.
    pub validate: Option<bool>,
    /// Record content; serialized to DAG-CBOR and stored as a block.
    pub record: serde_json::Value,
    /// Optional expected repo root CID; a mismatch yields `InvalidSwap`.
    #[serde(rename = "swapCommit")]
    pub swap_commit: Option<String>,
}
/// JSON response body returned by `create_record` on success.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CreateRecordOutput {
    /// `at://{did}/{collection}/{rkey}` of the created record.
    pub uri: String,
    /// String form of the CID of the stored record block.
    pub cid: String,
}
192pub async fn create_record(
193 State(state): State<AppState>,
194 headers: HeaderMap,
195 axum::extract::OriginalUri(uri): axum::extract::OriginalUri,
196 Json(input): Json<CreateRecordInput>,
197) -> Response {
198 let auth =
199 match prepare_repo_write(&state, &headers, &input.repo, "POST", &uri.to_string()).await {
200 Ok(res) => res,
201 Err(err_res) => return err_res,
202 };
203
204 if let Err(e) = crate::auth::scope_check::check_repo_scope(
205 auth.is_oauth,
206 auth.scope.as_deref(),
207 crate::oauth::RepoAction::Create,
208 &input.collection,
209 ) {
210 return e;
211 }
212
213 let did = auth.did;
214 let user_id = auth.user_id;
215 let current_root_cid = auth.current_root_cid;
216 let controller_did = auth.controller_did;
217
218 if let Some(swap_commit) = &input.swap_commit
219 && Cid::from_str(swap_commit).ok() != Some(current_root_cid)
220 {
221 return (
222 StatusCode::CONFLICT,
223 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})),
224 )
225 .into_response();
226 }
227 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
228 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
229 Ok(Some(b)) => b,
230 _ => {
231 return (
232 StatusCode::INTERNAL_SERVER_ERROR,
233 Json(json!({"error": "InternalError", "message": "Commit block not found"})),
234 )
235 .into_response();
236 }
237 };
238 let commit = match Commit::from_cbor(&commit_bytes) {
239 Ok(c) => c,
240 _ => {
241 return (
242 StatusCode::INTERNAL_SERVER_ERROR,
243 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})),
244 )
245 .into_response();
246 }
247 };
248 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
249 let collection_nsid = match input.collection.parse::<Nsid>() {
250 Ok(n) => n,
251 Err(_) => {
252 return (
253 StatusCode::BAD_REQUEST,
254 Json(json!({"error": "InvalidCollection"})),
255 )
256 .into_response();
257 }
258 };
259 if input.validate.unwrap_or(true)
260 && let Err(err_response) = validate_record(&input.record, &input.collection)
261 {
262 return *err_response;
263 }
264 let rkey = input
265 .rkey
266 .unwrap_or_else(|| Tid::now(LimitedU32::MIN).to_string());
267 let mut record_bytes = Vec::new();
268 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() {
269 return (
270 StatusCode::BAD_REQUEST,
271 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})),
272 )
273 .into_response();
274 }
275 let record_cid = match tracking_store.put(&record_bytes).await {
276 Ok(c) => c,
277 _ => {
278 return (
279 StatusCode::INTERNAL_SERVER_ERROR,
280 Json(json!({"error": "InternalError", "message": "Failed to save record block"})),
281 )
282 .into_response();
283 }
284 };
285 let key = format!("{}/{}", collection_nsid, rkey);
286 let new_mst = match mst.add(&key, record_cid).await {
287 Ok(m) => m,
288 _ => {
289 return (
290 StatusCode::INTERNAL_SERVER_ERROR,
291 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})),
292 )
293 .into_response();
294 }
295 };
296 let new_mst_root = match new_mst.persist().await {
297 Ok(c) => c,
298 _ => {
299 return (
300 StatusCode::INTERNAL_SERVER_ERROR,
301 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})),
302 )
303 .into_response();
304 }
305 };
306 let op = RecordOp::Create {
307 collection: input.collection.clone(),
308 rkey: rkey.clone(),
309 cid: record_cid,
310 };
311 let mut relevant_blocks = std::collections::BTreeMap::new();
312 if new_mst
313 .blocks_for_path(&key, &mut relevant_blocks)
314 .await
315 .is_err()
316 {
317 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response();
318 }
319 if mst
320 .blocks_for_path(&key, &mut relevant_blocks)
321 .await
322 .is_err()
323 {
324 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response();
325 }
326 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
327 let mut written_cids = tracking_store.get_all_relevant_cids();
328 for cid in relevant_blocks.keys() {
329 if !written_cids.contains(cid) {
330 written_cids.push(*cid);
331 }
332 }
333 let written_cids_str = written_cids
334 .iter()
335 .map(|c| c.to_string())
336 .collect::<Vec<_>>();
337 if let Err(e) = commit_and_log(
338 &state,
339 CommitParams {
340 did: &did,
341 user_id,
342 current_root_cid: Some(current_root_cid),
343 prev_data_cid: Some(commit.data),
344 new_mst_root,
345 ops: vec![op],
346 blocks_cids: &written_cids_str,
347 },
348 )
349 .await
350 {
351 return (
352 StatusCode::INTERNAL_SERVER_ERROR,
353 Json(json!({"error": "InternalError", "message": e})),
354 )
355 .into_response();
356 };
357
358 if let Some(ref controller) = controller_did {
359 let _ = delegation::log_delegation_action(
360 &state.db,
361 &did,
362 controller,
363 Some(controller),
364 DelegationActionType::RepoWrite,
365 Some(json!({
366 "action": "create",
367 "collection": input.collection,
368 "rkey": rkey
369 })),
370 None,
371 None,
372 )
373 .await;
374 }
375
376 (
377 StatusCode::OK,
378 Json(CreateRecordOutput {
379 uri: format!("at://{}/{}/{}", did, input.collection, rkey),
380 cid: record_cid.to_string(),
381 }),
382 )
383 .into_response()
384}
/// JSON request body accepted by `put_record`.
#[derive(Deserialize)]
// NOTE(review): every field appears to be read by `put_record` in this file,
// so this allow may no longer be needed — confirm before removing.
#[allow(dead_code)]
pub struct PutRecordInput {
    /// DID of the target repo; must equal the authenticated user's DID.
    pub repo: String,
    /// Collection name; the handler parses it as an NSID and rejects the
    /// request with `InvalidCollection` when that fails.
    pub collection: String,
    /// Record key to create or overwrite.
    pub rkey: String,
    /// Whether to run `validate_record`; treated as `true` when absent.
    pub validate: Option<bool>,
    /// Record content; serialized to DAG-CBOR and stored as a block.
    pub record: serde_json::Value,
    /// Optional expected repo root CID; a mismatch yields `InvalidSwap`.
    #[serde(rename = "swapCommit")]
    pub swap_commit: Option<String>,
    /// Optional expected CID of the record currently stored at the key; a
    /// mismatch yields `InvalidSwap`.
    #[serde(rename = "swapRecord")]
    pub swap_record: Option<String>,
}
/// JSON response body returned by `put_record` on success.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct PutRecordOutput {
    /// `at://{did}/{collection}/{rkey}` of the written record.
    pub uri: String,
    /// String form of the CID of the stored record block.
    pub cid: String,
}
404pub async fn put_record(
405 State(state): State<AppState>,
406 headers: HeaderMap,
407 axum::extract::OriginalUri(uri): axum::extract::OriginalUri,
408 Json(input): Json<PutRecordInput>,
409) -> Response {
410 let auth =
411 match prepare_repo_write(&state, &headers, &input.repo, "POST", &uri.to_string()).await {
412 Ok(res) => res,
413 Err(err_res) => return err_res,
414 };
415
416 if let Err(e) = crate::auth::scope_check::check_repo_scope(
417 auth.is_oauth,
418 auth.scope.as_deref(),
419 crate::oauth::RepoAction::Create,
420 &input.collection,
421 ) {
422 return e;
423 }
424 if let Err(e) = crate::auth::scope_check::check_repo_scope(
425 auth.is_oauth,
426 auth.scope.as_deref(),
427 crate::oauth::RepoAction::Update,
428 &input.collection,
429 ) {
430 return e;
431 }
432
433 let did = auth.did;
434 let user_id = auth.user_id;
435 let current_root_cid = auth.current_root_cid;
436 let controller_did = auth.controller_did;
437
438 if let Some(swap_commit) = &input.swap_commit
439 && Cid::from_str(swap_commit).ok() != Some(current_root_cid)
440 {
441 return (
442 StatusCode::CONFLICT,
443 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})),
444 )
445 .into_response();
446 }
447 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
448 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
449 Ok(Some(b)) => b,
450 _ => {
451 return (
452 StatusCode::INTERNAL_SERVER_ERROR,
453 Json(json!({"error": "InternalError", "message": "Commit block not found"})),
454 )
455 .into_response();
456 }
457 };
458 let commit = match Commit::from_cbor(&commit_bytes) {
459 Ok(c) => c,
460 _ => {
461 return (
462 StatusCode::INTERNAL_SERVER_ERROR,
463 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})),
464 )
465 .into_response();
466 }
467 };
468 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
469 let collection_nsid = match input.collection.parse::<Nsid>() {
470 Ok(n) => n,
471 Err(_) => {
472 return (
473 StatusCode::BAD_REQUEST,
474 Json(json!({"error": "InvalidCollection"})),
475 )
476 .into_response();
477 }
478 };
479 let key = format!("{}/{}", collection_nsid, input.rkey);
480 if input.validate.unwrap_or(true)
481 && let Err(err_response) = validate_record(&input.record, &input.collection)
482 {
483 return *err_response;
484 }
485 if let Some(swap_record_str) = &input.swap_record {
486 let expected_cid = Cid::from_str(swap_record_str).ok();
487 let actual_cid = mst.get(&key).await.ok().flatten();
488 if expected_cid != actual_cid {
489 return (StatusCode::CONFLICT, Json(json!({"error": "InvalidSwap", "message": "Record has been modified or does not exist"}))).into_response();
490 }
491 }
492 let existing_cid = mst.get(&key).await.ok().flatten();
493 let mut record_bytes = Vec::new();
494 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() {
495 return (
496 StatusCode::BAD_REQUEST,
497 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})),
498 )
499 .into_response();
500 }
501 let record_cid = match tracking_store.put(&record_bytes).await {
502 Ok(c) => c,
503 _ => {
504 return (
505 StatusCode::INTERNAL_SERVER_ERROR,
506 Json(json!({"error": "InternalError", "message": "Failed to save record block"})),
507 )
508 .into_response();
509 }
510 };
511 let new_mst = if existing_cid.is_some() {
512 match mst.update(&key, record_cid).await {
513 Ok(m) => m,
514 Err(_) => {
515 return (
516 StatusCode::INTERNAL_SERVER_ERROR,
517 Json(json!({"error": "InternalError", "message": "Failed to update MST"})),
518 )
519 .into_response();
520 }
521 }
522 } else {
523 match mst.add(&key, record_cid).await {
524 Ok(m) => m,
525 Err(_) => {
526 return (
527 StatusCode::INTERNAL_SERVER_ERROR,
528 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})),
529 )
530 .into_response();
531 }
532 }
533 };
534 let new_mst_root = match new_mst.persist().await {
535 Ok(c) => c,
536 Err(_) => {
537 return (
538 StatusCode::INTERNAL_SERVER_ERROR,
539 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})),
540 )
541 .into_response();
542 }
543 };
544 let op = if existing_cid.is_some() {
545 RecordOp::Update {
546 collection: input.collection.clone(),
547 rkey: input.rkey.clone(),
548 cid: record_cid,
549 prev: existing_cid,
550 }
551 } else {
552 RecordOp::Create {
553 collection: input.collection.clone(),
554 rkey: input.rkey.clone(),
555 cid: record_cid,
556 }
557 };
558 let mut relevant_blocks = std::collections::BTreeMap::new();
559 if new_mst
560 .blocks_for_path(&key, &mut relevant_blocks)
561 .await
562 .is_err()
563 {
564 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response();
565 }
566 if mst
567 .blocks_for_path(&key, &mut relevant_blocks)
568 .await
569 .is_err()
570 {
571 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response();
572 }
573 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
574 let mut written_cids = tracking_store.get_all_relevant_cids();
575 for cid in relevant_blocks.keys() {
576 if !written_cids.contains(cid) {
577 written_cids.push(*cid);
578 }
579 }
580 let written_cids_str = written_cids
581 .iter()
582 .map(|c| c.to_string())
583 .collect::<Vec<_>>();
584 let is_update = existing_cid.is_some();
585 if let Err(e) = commit_and_log(
586 &state,
587 CommitParams {
588 did: &did,
589 user_id,
590 current_root_cid: Some(current_root_cid),
591 prev_data_cid: Some(commit.data),
592 new_mst_root,
593 ops: vec![op],
594 blocks_cids: &written_cids_str,
595 },
596 )
597 .await
598 {
599 return (
600 StatusCode::INTERNAL_SERVER_ERROR,
601 Json(json!({"error": "InternalError", "message": e})),
602 )
603 .into_response();
604 };
605
606 if let Some(ref controller) = controller_did {
607 let _ = delegation::log_delegation_action(
608 &state.db,
609 &did,
610 controller,
611 Some(controller),
612 DelegationActionType::RepoWrite,
613 Some(json!({
614 "action": if is_update { "update" } else { "create" },
615 "collection": input.collection,
616 "rkey": input.rkey
617 })),
618 None,
619 None,
620 )
621 .await;
622 }
623
624 (
625 StatusCode::OK,
626 Json(PutRecordOutput {
627 uri: format!("at://{}/{}/{}", did, input.collection, input.rkey),
628 cid: record_cid.to_string(),
629 }),
630 )
631 .into_response()
632}