this repo has no description
1use super::validation::validate_record;
2use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log, extract_blob_cids};
3use crate::delegation::{self, DelegationActionType};
4use crate::repo::tracking::TrackingBlockStore;
5use crate::state::AppState;
6use axum::{
7 Json,
8 extract::State,
9 http::{HeaderMap, StatusCode},
10 response::{IntoResponse, Response},
11};
12use cid::Cid;
13use jacquard::types::{
14 integer::LimitedU32,
15 string::{Nsid, Tid},
16};
17use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
18use serde::{Deserialize, Serialize};
19use serde_json::json;
20use sqlx::{PgPool, Row};
21use std::str::FromStr;
22use std::sync::Arc;
23use tracing::error;
24use uuid::Uuid;
25
26pub async fn has_verified_comms_channel(db: &PgPool, did: &str) -> Result<bool, sqlx::Error> {
27 let row = sqlx::query(
28 r#"
29 SELECT
30 email_verified,
31 discord_verified,
32 telegram_verified,
33 signal_verified
34 FROM users
35 WHERE did = $1
36 "#,
37 )
38 .bind(did)
39 .fetch_optional(db)
40 .await?;
41 match row {
42 Some(r) => {
43 let email_verified: bool = r.get("email_verified");
44 let discord_verified: bool = r.get("discord_verified");
45 let telegram_verified: bool = r.get("telegram_verified");
46 let signal_verified: bool = r.get("signal_verified");
47 Ok(email_verified || discord_verified || telegram_verified || signal_verified)
48 }
49 None => Ok(false),
50 }
51}
52
53pub struct RepoWriteAuth {
54 pub did: String,
55 pub user_id: Uuid,
56 pub current_root_cid: Cid,
57 pub is_oauth: bool,
58 pub scope: Option<String>,
59 pub controller_did: Option<String>,
60}
61
62pub async fn prepare_repo_write(
63 state: &AppState,
64 headers: &HeaderMap,
65 repo_did: &str,
66 http_method: &str,
67 http_uri: &str,
68) -> Result<RepoWriteAuth, Response> {
69 let extracted = crate::auth::extract_auth_token_from_header(
70 headers.get("Authorization").and_then(|h| h.to_str().ok()),
71 )
72 .ok_or_else(|| {
73 (
74 StatusCode::UNAUTHORIZED,
75 Json(json!({"error": "AuthenticationRequired"})),
76 )
77 .into_response()
78 })?;
79 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
80 let auth_user = crate::auth::validate_token_with_dpop(
81 &state.db,
82 &extracted.token,
83 extracted.is_dpop,
84 dpop_proof,
85 http_method,
86 http_uri,
87 false,
88 )
89 .await
90 .map_err(|e| {
91 (
92 StatusCode::UNAUTHORIZED,
93 Json(json!({"error": e.to_string()})),
94 )
95 .into_response()
96 })?;
97 if repo_did != auth_user.did {
98 return Err((
99 StatusCode::FORBIDDEN,
100 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})),
101 )
102 .into_response());
103 }
104 let is_verified = has_verified_comms_channel(&state.db, &auth_user.did)
105 .await
106 .unwrap_or(false);
107 let is_delegated = crate::delegation::is_delegated_account(&state.db, &auth_user.did)
108 .await
109 .unwrap_or(false);
110 if !is_verified && !is_delegated {
111 return Err((
112 StatusCode::FORBIDDEN,
113 Json(json!({
114 "error": "AccountNotVerified",
115 "message": "You must verify at least one notification channel (email, Discord, Telegram, or Signal) before creating records"
116 })),
117 )
118 .into_response());
119 }
120 let user_id = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", auth_user.did)
121 .fetch_optional(&state.db)
122 .await
123 .map_err(|e| {
124 error!("DB error fetching user: {}", e);
125 (
126 StatusCode::INTERNAL_SERVER_ERROR,
127 Json(json!({"error": "InternalError"})),
128 )
129 .into_response()
130 })?
131 .ok_or_else(|| {
132 (
133 StatusCode::INTERNAL_SERVER_ERROR,
134 Json(json!({"error": "InternalError", "message": "User not found"})),
135 )
136 .into_response()
137 })?;
138 let root_cid_str: String = sqlx::query_scalar!(
139 "SELECT repo_root_cid FROM repos WHERE user_id = $1",
140 user_id
141 )
142 .fetch_optional(&state.db)
143 .await
144 .map_err(|e| {
145 error!("DB error fetching repo root: {}", e);
146 (
147 StatusCode::INTERNAL_SERVER_ERROR,
148 Json(json!({"error": "InternalError"})),
149 )
150 .into_response()
151 })?
152 .ok_or_else(|| {
153 (
154 StatusCode::INTERNAL_SERVER_ERROR,
155 Json(json!({"error": "InternalError", "message": "Repo root not found"})),
156 )
157 .into_response()
158 })?;
159 let current_root_cid = Cid::from_str(&root_cid_str).map_err(|_| {
160 (
161 StatusCode::INTERNAL_SERVER_ERROR,
162 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})),
163 )
164 .into_response()
165 })?;
166 Ok(RepoWriteAuth {
167 did: auth_user.did,
168 user_id,
169 current_root_cid,
170 is_oauth: auth_user.is_oauth,
171 scope: auth_user.scope,
172 controller_did: auth_user.controller_did,
173 })
174}
175#[derive(Deserialize)]
176#[allow(dead_code)]
177pub struct CreateRecordInput {
178 pub repo: String,
179 pub collection: String,
180 pub rkey: Option<String>,
181 pub validate: Option<bool>,
182 pub record: serde_json::Value,
183 #[serde(rename = "swapCommit")]
184 pub swap_commit: Option<String>,
185}
186#[derive(Serialize)]
187#[serde(rename_all = "camelCase")]
188pub struct CreateRecordOutput {
189 pub uri: String,
190 pub cid: String,
191}
192pub async fn create_record(
193 State(state): State<AppState>,
194 headers: HeaderMap,
195 axum::extract::OriginalUri(uri): axum::extract::OriginalUri,
196 Json(input): Json<CreateRecordInput>,
197) -> Response {
198 let auth =
199 match prepare_repo_write(&state, &headers, &input.repo, "POST", &uri.to_string()).await {
200 Ok(res) => res,
201 Err(err_res) => return err_res,
202 };
203
204 if let Err(e) = crate::auth::scope_check::check_repo_scope(
205 auth.is_oauth,
206 auth.scope.as_deref(),
207 crate::oauth::RepoAction::Create,
208 &input.collection,
209 ) {
210 return e;
211 }
212
213 let did = auth.did;
214 let user_id = auth.user_id;
215 let current_root_cid = auth.current_root_cid;
216 let controller_did = auth.controller_did;
217
218 if let Some(swap_commit) = &input.swap_commit
219 && Cid::from_str(swap_commit).ok() != Some(current_root_cid)
220 {
221 return (
222 StatusCode::CONFLICT,
223 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})),
224 )
225 .into_response();
226 }
227 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
228 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
229 Ok(Some(b)) => b,
230 _ => {
231 return (
232 StatusCode::INTERNAL_SERVER_ERROR,
233 Json(json!({"error": "InternalError", "message": "Commit block not found"})),
234 )
235 .into_response();
236 }
237 };
238 let commit = match Commit::from_cbor(&commit_bytes) {
239 Ok(c) => c,
240 _ => {
241 return (
242 StatusCode::INTERNAL_SERVER_ERROR,
243 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})),
244 )
245 .into_response();
246 }
247 };
248 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
249 let collection_nsid = match input.collection.parse::<Nsid>() {
250 Ok(n) => n,
251 Err(_) => {
252 return (
253 StatusCode::BAD_REQUEST,
254 Json(json!({"error": "InvalidCollection"})),
255 )
256 .into_response();
257 }
258 };
259 if input.validate.unwrap_or(true)
260 && let Err(err_response) = validate_record(&input.record, &input.collection)
261 {
262 return *err_response;
263 }
264 let rkey = input
265 .rkey
266 .unwrap_or_else(|| Tid::now(LimitedU32::MIN).to_string());
267 let mut record_bytes = Vec::new();
268 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() {
269 return (
270 StatusCode::BAD_REQUEST,
271 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})),
272 )
273 .into_response();
274 }
275 let record_cid = match tracking_store.put(&record_bytes).await {
276 Ok(c) => c,
277 _ => {
278 return (
279 StatusCode::INTERNAL_SERVER_ERROR,
280 Json(json!({"error": "InternalError", "message": "Failed to save record block"})),
281 )
282 .into_response();
283 }
284 };
285 let key = format!("{}/{}", collection_nsid, rkey);
286 let new_mst = match mst.add(&key, record_cid).await {
287 Ok(m) => m,
288 _ => {
289 return (
290 StatusCode::INTERNAL_SERVER_ERROR,
291 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})),
292 )
293 .into_response();
294 }
295 };
296 let new_mst_root = match new_mst.persist().await {
297 Ok(c) => c,
298 _ => {
299 return (
300 StatusCode::INTERNAL_SERVER_ERROR,
301 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})),
302 )
303 .into_response();
304 }
305 };
306 let op = RecordOp::Create {
307 collection: input.collection.clone(),
308 rkey: rkey.clone(),
309 cid: record_cid,
310 };
311 let mut relevant_blocks = std::collections::BTreeMap::new();
312 if new_mst
313 .blocks_for_path(&key, &mut relevant_blocks)
314 .await
315 .is_err()
316 {
317 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response();
318 }
319 if mst
320 .blocks_for_path(&key, &mut relevant_blocks)
321 .await
322 .is_err()
323 {
324 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response();
325 }
326 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
327 let mut written_cids = tracking_store.get_all_relevant_cids();
328 for cid in relevant_blocks.keys() {
329 if !written_cids.contains(cid) {
330 written_cids.push(*cid);
331 }
332 }
333 let written_cids_str = written_cids
334 .iter()
335 .map(|c| c.to_string())
336 .collect::<Vec<_>>();
337 let blob_cids = extract_blob_cids(&input.record);
338 if let Err(e) = commit_and_log(
339 &state,
340 CommitParams {
341 did: &did,
342 user_id,
343 current_root_cid: Some(current_root_cid),
344 prev_data_cid: Some(commit.data),
345 new_mst_root,
346 ops: vec![op],
347 blocks_cids: &written_cids_str,
348 blobs: &blob_cids,
349 },
350 )
351 .await
352 {
353 return (
354 StatusCode::INTERNAL_SERVER_ERROR,
355 Json(json!({"error": "InternalError", "message": e})),
356 )
357 .into_response();
358 };
359
360 if let Some(ref controller) = controller_did {
361 let _ = delegation::log_delegation_action(
362 &state.db,
363 &did,
364 controller,
365 Some(controller),
366 DelegationActionType::RepoWrite,
367 Some(json!({
368 "action": "create",
369 "collection": input.collection,
370 "rkey": rkey
371 })),
372 None,
373 None,
374 )
375 .await;
376 }
377
378 (
379 StatusCode::OK,
380 Json(CreateRecordOutput {
381 uri: format!("at://{}/{}/{}", did, input.collection, rkey),
382 cid: record_cid.to_string(),
383 }),
384 )
385 .into_response()
386}
387#[derive(Deserialize)]
388#[allow(dead_code)]
389pub struct PutRecordInput {
390 pub repo: String,
391 pub collection: String,
392 pub rkey: String,
393 pub validate: Option<bool>,
394 pub record: serde_json::Value,
395 #[serde(rename = "swapCommit")]
396 pub swap_commit: Option<String>,
397 #[serde(rename = "swapRecord")]
398 pub swap_record: Option<String>,
399}
400#[derive(Serialize)]
401#[serde(rename_all = "camelCase")]
402pub struct PutRecordOutput {
403 pub uri: String,
404 pub cid: String,
405}
406pub async fn put_record(
407 State(state): State<AppState>,
408 headers: HeaderMap,
409 axum::extract::OriginalUri(uri): axum::extract::OriginalUri,
410 Json(input): Json<PutRecordInput>,
411) -> Response {
412 let auth =
413 match prepare_repo_write(&state, &headers, &input.repo, "POST", &uri.to_string()).await {
414 Ok(res) => res,
415 Err(err_res) => return err_res,
416 };
417
418 if let Err(e) = crate::auth::scope_check::check_repo_scope(
419 auth.is_oauth,
420 auth.scope.as_deref(),
421 crate::oauth::RepoAction::Create,
422 &input.collection,
423 ) {
424 return e;
425 }
426 if let Err(e) = crate::auth::scope_check::check_repo_scope(
427 auth.is_oauth,
428 auth.scope.as_deref(),
429 crate::oauth::RepoAction::Update,
430 &input.collection,
431 ) {
432 return e;
433 }
434
435 let did = auth.did;
436 let user_id = auth.user_id;
437 let current_root_cid = auth.current_root_cid;
438 let controller_did = auth.controller_did;
439
440 if let Some(swap_commit) = &input.swap_commit
441 && Cid::from_str(swap_commit).ok() != Some(current_root_cid)
442 {
443 return (
444 StatusCode::CONFLICT,
445 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})),
446 )
447 .into_response();
448 }
449 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
450 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
451 Ok(Some(b)) => b,
452 _ => {
453 return (
454 StatusCode::INTERNAL_SERVER_ERROR,
455 Json(json!({"error": "InternalError", "message": "Commit block not found"})),
456 )
457 .into_response();
458 }
459 };
460 let commit = match Commit::from_cbor(&commit_bytes) {
461 Ok(c) => c,
462 _ => {
463 return (
464 StatusCode::INTERNAL_SERVER_ERROR,
465 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})),
466 )
467 .into_response();
468 }
469 };
470 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
471 let collection_nsid = match input.collection.parse::<Nsid>() {
472 Ok(n) => n,
473 Err(_) => {
474 return (
475 StatusCode::BAD_REQUEST,
476 Json(json!({"error": "InvalidCollection"})),
477 )
478 .into_response();
479 }
480 };
481 let key = format!("{}/{}", collection_nsid, input.rkey);
482 if input.validate.unwrap_or(true)
483 && let Err(err_response) = validate_record(&input.record, &input.collection)
484 {
485 return *err_response;
486 }
487 if let Some(swap_record_str) = &input.swap_record {
488 let expected_cid = Cid::from_str(swap_record_str).ok();
489 let actual_cid = mst.get(&key).await.ok().flatten();
490 if expected_cid != actual_cid {
491 return (StatusCode::CONFLICT, Json(json!({"error": "InvalidSwap", "message": "Record has been modified or does not exist"}))).into_response();
492 }
493 }
494 let existing_cid = mst.get(&key).await.ok().flatten();
495 let mut record_bytes = Vec::new();
496 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() {
497 return (
498 StatusCode::BAD_REQUEST,
499 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})),
500 )
501 .into_response();
502 }
503 let record_cid = match tracking_store.put(&record_bytes).await {
504 Ok(c) => c,
505 _ => {
506 return (
507 StatusCode::INTERNAL_SERVER_ERROR,
508 Json(json!({"error": "InternalError", "message": "Failed to save record block"})),
509 )
510 .into_response();
511 }
512 };
513 let new_mst = if existing_cid.is_some() {
514 match mst.update(&key, record_cid).await {
515 Ok(m) => m,
516 Err(_) => {
517 return (
518 StatusCode::INTERNAL_SERVER_ERROR,
519 Json(json!({"error": "InternalError", "message": "Failed to update MST"})),
520 )
521 .into_response();
522 }
523 }
524 } else {
525 match mst.add(&key, record_cid).await {
526 Ok(m) => m,
527 Err(_) => {
528 return (
529 StatusCode::INTERNAL_SERVER_ERROR,
530 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})),
531 )
532 .into_response();
533 }
534 }
535 };
536 let new_mst_root = match new_mst.persist().await {
537 Ok(c) => c,
538 Err(_) => {
539 return (
540 StatusCode::INTERNAL_SERVER_ERROR,
541 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})),
542 )
543 .into_response();
544 }
545 };
546 let op = if existing_cid.is_some() {
547 RecordOp::Update {
548 collection: input.collection.clone(),
549 rkey: input.rkey.clone(),
550 cid: record_cid,
551 prev: existing_cid,
552 }
553 } else {
554 RecordOp::Create {
555 collection: input.collection.clone(),
556 rkey: input.rkey.clone(),
557 cid: record_cid,
558 }
559 };
560 let mut relevant_blocks = std::collections::BTreeMap::new();
561 if new_mst
562 .blocks_for_path(&key, &mut relevant_blocks)
563 .await
564 .is_err()
565 {
566 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response();
567 }
568 if mst
569 .blocks_for_path(&key, &mut relevant_blocks)
570 .await
571 .is_err()
572 {
573 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response();
574 }
575 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
576 let mut written_cids = tracking_store.get_all_relevant_cids();
577 for cid in relevant_blocks.keys() {
578 if !written_cids.contains(cid) {
579 written_cids.push(*cid);
580 }
581 }
582 let written_cids_str = written_cids
583 .iter()
584 .map(|c| c.to_string())
585 .collect::<Vec<_>>();
586 let is_update = existing_cid.is_some();
587 let blob_cids = extract_blob_cids(&input.record);
588 if let Err(e) = commit_and_log(
589 &state,
590 CommitParams {
591 did: &did,
592 user_id,
593 current_root_cid: Some(current_root_cid),
594 prev_data_cid: Some(commit.data),
595 new_mst_root,
596 ops: vec![op],
597 blocks_cids: &written_cids_str,
598 blobs: &blob_cids,
599 },
600 )
601 .await
602 {
603 return (
604 StatusCode::INTERNAL_SERVER_ERROR,
605 Json(json!({"error": "InternalError", "message": e})),
606 )
607 .into_response();
608 };
609
610 if let Some(ref controller) = controller_did {
611 let _ = delegation::log_delegation_action(
612 &state.db,
613 &did,
614 controller,
615 Some(controller),
616 DelegationActionType::RepoWrite,
617 Some(json!({
618 "action": if is_update { "update" } else { "create" },
619 "collection": input.collection,
620 "rkey": input.rkey
621 })),
622 None,
623 None,
624 )
625 .await;
626 }
627
628 (
629 StatusCode::OK,
630 Json(PutRecordOutput {
631 uri: format!("at://{}/{}/{}", did, input.collection, input.rkey),
632 cid: record_cid.to_string(),
633 }),
634 )
635 .into_response()
636}