this repo has no description
1use super::validation::validate_record_with_rkey;
2use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log, extract_blob_cids};
3use crate::delegation::{self, DelegationActionType};
4use crate::repo::tracking::TrackingBlockStore;
5use crate::state::AppState;
6use axum::{
7 Json,
8 extract::State,
9 http::{HeaderMap, StatusCode},
10 response::{IntoResponse, Response},
11};
12use cid::Cid;
13use jacquard::types::{
14 integer::LimitedU32,
15 string::{Nsid, Tid},
16};
17use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
18use serde::{Deserialize, Serialize};
19use serde_json::json;
20use sqlx::{PgPool, Row};
21use std::str::FromStr;
22use std::sync::Arc;
23use tracing::error;
24use uuid::Uuid;
25
26pub async fn has_verified_comms_channel(db: &PgPool, did: &str) -> Result<bool, sqlx::Error> {
27 let row = sqlx::query(
28 r#"
29 SELECT
30 email_verified,
31 discord_verified,
32 telegram_verified,
33 signal_verified
34 FROM users
35 WHERE did = $1
36 "#,
37 )
38 .bind(did)
39 .fetch_optional(db)
40 .await?;
41 match row {
42 Some(r) => {
43 let email_verified: bool = r.get("email_verified");
44 let discord_verified: bool = r.get("discord_verified");
45 let telegram_verified: bool = r.get("telegram_verified");
46 let signal_verified: bool = r.get("signal_verified");
47 Ok(email_verified || discord_verified || telegram_verified || signal_verified)
48 }
49 None => Ok(false),
50 }
51}
52
53pub struct RepoWriteAuth {
54 pub did: String,
55 pub user_id: Uuid,
56 pub current_root_cid: Cid,
57 pub is_oauth: bool,
58 pub scope: Option<String>,
59 pub controller_did: Option<String>,
60}
61
62pub async fn prepare_repo_write(
63 state: &AppState,
64 headers: &HeaderMap,
65 repo_did: &str,
66 http_method: &str,
67 http_uri: &str,
68) -> Result<RepoWriteAuth, Response> {
69 let extracted = crate::auth::extract_auth_token_from_header(
70 headers.get("Authorization").and_then(|h| h.to_str().ok()),
71 )
72 .ok_or_else(|| {
73 (
74 StatusCode::UNAUTHORIZED,
75 Json(json!({"error": "AuthenticationRequired"})),
76 )
77 .into_response()
78 })?;
79 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
80 let auth_user = crate::auth::validate_token_with_dpop(
81 &state.db,
82 &extracted.token,
83 extracted.is_dpop,
84 dpop_proof,
85 http_method,
86 http_uri,
87 false,
88 )
89 .await
90 .map_err(|e| {
91 (
92 StatusCode::UNAUTHORIZED,
93 Json(json!({"error": e.to_string()})),
94 )
95 .into_response()
96 })?;
97 if repo_did != auth_user.did {
98 return Err((
99 StatusCode::FORBIDDEN,
100 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})),
101 )
102 .into_response());
103 }
104 let is_verified = has_verified_comms_channel(&state.db, &auth_user.did)
105 .await
106 .unwrap_or(false);
107 let is_delegated = crate::delegation::is_delegated_account(&state.db, &auth_user.did)
108 .await
109 .unwrap_or(false);
110 if !is_verified && !is_delegated {
111 return Err((
112 StatusCode::FORBIDDEN,
113 Json(json!({
114 "error": "AccountNotVerified",
115 "message": "You must verify at least one notification channel (email, Discord, Telegram, or Signal) before creating records"
116 })),
117 )
118 .into_response());
119 }
120 let user_id = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", auth_user.did)
121 .fetch_optional(&state.db)
122 .await
123 .map_err(|e| {
124 error!("DB error fetching user: {}", e);
125 (
126 StatusCode::INTERNAL_SERVER_ERROR,
127 Json(json!({"error": "InternalError"})),
128 )
129 .into_response()
130 })?
131 .ok_or_else(|| {
132 (
133 StatusCode::INTERNAL_SERVER_ERROR,
134 Json(json!({"error": "InternalError", "message": "User not found"})),
135 )
136 .into_response()
137 })?;
138 let root_cid_str: String = sqlx::query_scalar!(
139 "SELECT repo_root_cid FROM repos WHERE user_id = $1",
140 user_id
141 )
142 .fetch_optional(&state.db)
143 .await
144 .map_err(|e| {
145 error!("DB error fetching repo root: {}", e);
146 (
147 StatusCode::INTERNAL_SERVER_ERROR,
148 Json(json!({"error": "InternalError"})),
149 )
150 .into_response()
151 })?
152 .ok_or_else(|| {
153 (
154 StatusCode::INTERNAL_SERVER_ERROR,
155 Json(json!({"error": "InternalError", "message": "Repo root not found"})),
156 )
157 .into_response()
158 })?;
159 let current_root_cid = Cid::from_str(&root_cid_str).map_err(|_| {
160 (
161 StatusCode::INTERNAL_SERVER_ERROR,
162 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})),
163 )
164 .into_response()
165 })?;
166 Ok(RepoWriteAuth {
167 did: auth_user.did,
168 user_id,
169 current_root_cid,
170 is_oauth: auth_user.is_oauth,
171 scope: auth_user.scope,
172 controller_did: auth_user.controller_did,
173 })
174}
175#[derive(Deserialize)]
176#[allow(dead_code)]
177pub struct CreateRecordInput {
178 pub repo: String,
179 pub collection: String,
180 pub rkey: Option<String>,
181 pub validate: Option<bool>,
182 pub record: serde_json::Value,
183 #[serde(rename = "swapCommit")]
184 pub swap_commit: Option<String>,
185}
186#[derive(Serialize)]
187#[serde(rename_all = "camelCase")]
188pub struct CreateRecordOutput {
189 pub uri: String,
190 pub cid: String,
191}
192pub async fn create_record(
193 State(state): State<AppState>,
194 headers: HeaderMap,
195 axum::extract::OriginalUri(uri): axum::extract::OriginalUri,
196 Json(input): Json<CreateRecordInput>,
197) -> Response {
198 let auth =
199 match prepare_repo_write(&state, &headers, &input.repo, "POST", &uri.to_string()).await {
200 Ok(res) => res,
201 Err(err_res) => return err_res,
202 };
203
204 if let Err(e) = crate::auth::scope_check::check_repo_scope(
205 auth.is_oauth,
206 auth.scope.as_deref(),
207 crate::oauth::RepoAction::Create,
208 &input.collection,
209 ) {
210 return e;
211 }
212
213 let did = auth.did;
214 let user_id = auth.user_id;
215 let current_root_cid = auth.current_root_cid;
216 let controller_did = auth.controller_did;
217
218 if let Some(swap_commit) = &input.swap_commit
219 && Cid::from_str(swap_commit).ok() != Some(current_root_cid)
220 {
221 return (
222 StatusCode::CONFLICT,
223 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})),
224 )
225 .into_response();
226 }
227 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
228 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
229 Ok(Some(b)) => b,
230 _ => {
231 return (
232 StatusCode::INTERNAL_SERVER_ERROR,
233 Json(json!({"error": "InternalError", "message": "Commit block not found"})),
234 )
235 .into_response();
236 }
237 };
238 let commit = match Commit::from_cbor(&commit_bytes) {
239 Ok(c) => c,
240 _ => {
241 return (
242 StatusCode::INTERNAL_SERVER_ERROR,
243 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})),
244 )
245 .into_response();
246 }
247 };
248 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
249 let collection_nsid = match input.collection.parse::<Nsid>() {
250 Ok(n) => n,
251 Err(_) => {
252 return (
253 StatusCode::BAD_REQUEST,
254 Json(json!({"error": "InvalidCollection"})),
255 )
256 .into_response();
257 }
258 };
259 if input.validate.unwrap_or(true)
260 && let Err(err_response) =
261 validate_record_with_rkey(&input.record, &input.collection, input.rkey.as_deref())
262 {
263 return *err_response;
264 }
265 let rkey = input
266 .rkey
267 .unwrap_or_else(|| Tid::now(LimitedU32::MIN).to_string());
268 let mut record_bytes = Vec::new();
269 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() {
270 return (
271 StatusCode::BAD_REQUEST,
272 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})),
273 )
274 .into_response();
275 }
276 let record_cid = match tracking_store.put(&record_bytes).await {
277 Ok(c) => c,
278 _ => {
279 return (
280 StatusCode::INTERNAL_SERVER_ERROR,
281 Json(json!({"error": "InternalError", "message": "Failed to save record block"})),
282 )
283 .into_response();
284 }
285 };
286 let key = format!("{}/{}", collection_nsid, rkey);
287 let new_mst = match mst.add(&key, record_cid).await {
288 Ok(m) => m,
289 _ => {
290 return (
291 StatusCode::INTERNAL_SERVER_ERROR,
292 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})),
293 )
294 .into_response();
295 }
296 };
297 let new_mst_root = match new_mst.persist().await {
298 Ok(c) => c,
299 _ => {
300 return (
301 StatusCode::INTERNAL_SERVER_ERROR,
302 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})),
303 )
304 .into_response();
305 }
306 };
307 let op = RecordOp::Create {
308 collection: input.collection.clone(),
309 rkey: rkey.clone(),
310 cid: record_cid,
311 };
312 let mut relevant_blocks = std::collections::BTreeMap::new();
313 if new_mst
314 .blocks_for_path(&key, &mut relevant_blocks)
315 .await
316 .is_err()
317 {
318 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response();
319 }
320 if mst
321 .blocks_for_path(&key, &mut relevant_blocks)
322 .await
323 .is_err()
324 {
325 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response();
326 }
327 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
328 let mut written_cids = tracking_store.get_all_relevant_cids();
329 for cid in relevant_blocks.keys() {
330 if !written_cids.contains(cid) {
331 written_cids.push(*cid);
332 }
333 }
334 let written_cids_str = written_cids
335 .iter()
336 .map(|c| c.to_string())
337 .collect::<Vec<_>>();
338 let blob_cids = extract_blob_cids(&input.record);
339 if let Err(e) = commit_and_log(
340 &state,
341 CommitParams {
342 did: &did,
343 user_id,
344 current_root_cid: Some(current_root_cid),
345 prev_data_cid: Some(commit.data),
346 new_mst_root,
347 ops: vec![op],
348 blocks_cids: &written_cids_str,
349 blobs: &blob_cids,
350 },
351 )
352 .await
353 {
354 return (
355 StatusCode::INTERNAL_SERVER_ERROR,
356 Json(json!({"error": "InternalError", "message": e})),
357 )
358 .into_response();
359 };
360
361 if let Some(ref controller) = controller_did {
362 let _ = delegation::log_delegation_action(
363 &state.db,
364 &did,
365 controller,
366 Some(controller),
367 DelegationActionType::RepoWrite,
368 Some(json!({
369 "action": "create",
370 "collection": input.collection,
371 "rkey": rkey
372 })),
373 None,
374 None,
375 )
376 .await;
377 }
378
379 (
380 StatusCode::OK,
381 Json(CreateRecordOutput {
382 uri: format!("at://{}/{}/{}", did, input.collection, rkey),
383 cid: record_cid.to_string(),
384 }),
385 )
386 .into_response()
387}
388#[derive(Deserialize)]
389#[allow(dead_code)]
390pub struct PutRecordInput {
391 pub repo: String,
392 pub collection: String,
393 pub rkey: String,
394 pub validate: Option<bool>,
395 pub record: serde_json::Value,
396 #[serde(rename = "swapCommit")]
397 pub swap_commit: Option<String>,
398 #[serde(rename = "swapRecord")]
399 pub swap_record: Option<String>,
400}
401#[derive(Serialize)]
402#[serde(rename_all = "camelCase")]
403pub struct PutRecordOutput {
404 pub uri: String,
405 pub cid: String,
406}
407pub async fn put_record(
408 State(state): State<AppState>,
409 headers: HeaderMap,
410 axum::extract::OriginalUri(uri): axum::extract::OriginalUri,
411 Json(input): Json<PutRecordInput>,
412) -> Response {
413 let auth =
414 match prepare_repo_write(&state, &headers, &input.repo, "POST", &uri.to_string()).await {
415 Ok(res) => res,
416 Err(err_res) => return err_res,
417 };
418
419 if let Err(e) = crate::auth::scope_check::check_repo_scope(
420 auth.is_oauth,
421 auth.scope.as_deref(),
422 crate::oauth::RepoAction::Create,
423 &input.collection,
424 ) {
425 return e;
426 }
427 if let Err(e) = crate::auth::scope_check::check_repo_scope(
428 auth.is_oauth,
429 auth.scope.as_deref(),
430 crate::oauth::RepoAction::Update,
431 &input.collection,
432 ) {
433 return e;
434 }
435
436 let did = auth.did;
437 let user_id = auth.user_id;
438 let current_root_cid = auth.current_root_cid;
439 let controller_did = auth.controller_did;
440
441 if let Some(swap_commit) = &input.swap_commit
442 && Cid::from_str(swap_commit).ok() != Some(current_root_cid)
443 {
444 return (
445 StatusCode::CONFLICT,
446 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})),
447 )
448 .into_response();
449 }
450 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
451 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
452 Ok(Some(b)) => b,
453 _ => {
454 return (
455 StatusCode::INTERNAL_SERVER_ERROR,
456 Json(json!({"error": "InternalError", "message": "Commit block not found"})),
457 )
458 .into_response();
459 }
460 };
461 let commit = match Commit::from_cbor(&commit_bytes) {
462 Ok(c) => c,
463 _ => {
464 return (
465 StatusCode::INTERNAL_SERVER_ERROR,
466 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})),
467 )
468 .into_response();
469 }
470 };
471 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
472 let collection_nsid = match input.collection.parse::<Nsid>() {
473 Ok(n) => n,
474 Err(_) => {
475 return (
476 StatusCode::BAD_REQUEST,
477 Json(json!({"error": "InvalidCollection"})),
478 )
479 .into_response();
480 }
481 };
482 let key = format!("{}/{}", collection_nsid, input.rkey);
483 if input.validate.unwrap_or(true)
484 && let Err(err_response) =
485 validate_record_with_rkey(&input.record, &input.collection, Some(&input.rkey))
486 {
487 return *err_response;
488 }
489 if let Some(swap_record_str) = &input.swap_record {
490 let expected_cid = Cid::from_str(swap_record_str).ok();
491 let actual_cid = mst.get(&key).await.ok().flatten();
492 if expected_cid != actual_cid {
493 return (StatusCode::CONFLICT, Json(json!({"error": "InvalidSwap", "message": "Record has been modified or does not exist"}))).into_response();
494 }
495 }
496 let existing_cid = mst.get(&key).await.ok().flatten();
497 let mut record_bytes = Vec::new();
498 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() {
499 return (
500 StatusCode::BAD_REQUEST,
501 Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})),
502 )
503 .into_response();
504 }
505 let record_cid = match tracking_store.put(&record_bytes).await {
506 Ok(c) => c,
507 _ => {
508 return (
509 StatusCode::INTERNAL_SERVER_ERROR,
510 Json(json!({"error": "InternalError", "message": "Failed to save record block"})),
511 )
512 .into_response();
513 }
514 };
515 let new_mst = if existing_cid.is_some() {
516 match mst.update(&key, record_cid).await {
517 Ok(m) => m,
518 Err(_) => {
519 return (
520 StatusCode::INTERNAL_SERVER_ERROR,
521 Json(json!({"error": "InternalError", "message": "Failed to update MST"})),
522 )
523 .into_response();
524 }
525 }
526 } else {
527 match mst.add(&key, record_cid).await {
528 Ok(m) => m,
529 Err(_) => {
530 return (
531 StatusCode::INTERNAL_SERVER_ERROR,
532 Json(json!({"error": "InternalError", "message": "Failed to add to MST"})),
533 )
534 .into_response();
535 }
536 }
537 };
538 let new_mst_root = match new_mst.persist().await {
539 Ok(c) => c,
540 Err(_) => {
541 return (
542 StatusCode::INTERNAL_SERVER_ERROR,
543 Json(json!({"error": "InternalError", "message": "Failed to persist MST"})),
544 )
545 .into_response();
546 }
547 };
548 let op = if existing_cid.is_some() {
549 RecordOp::Update {
550 collection: input.collection.clone(),
551 rkey: input.rkey.clone(),
552 cid: record_cid,
553 prev: existing_cid,
554 }
555 } else {
556 RecordOp::Create {
557 collection: input.collection.clone(),
558 rkey: input.rkey.clone(),
559 cid: record_cid,
560 }
561 };
562 let mut relevant_blocks = std::collections::BTreeMap::new();
563 if new_mst
564 .blocks_for_path(&key, &mut relevant_blocks)
565 .await
566 .is_err()
567 {
568 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST blocks for path"}))).into_response();
569 }
570 if mst
571 .blocks_for_path(&key, &mut relevant_blocks)
572 .await
573 .is_err()
574 {
575 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get old MST blocks for path"}))).into_response();
576 }
577 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
578 let mut written_cids = tracking_store.get_all_relevant_cids();
579 for cid in relevant_blocks.keys() {
580 if !written_cids.contains(cid) {
581 written_cids.push(*cid);
582 }
583 }
584 let written_cids_str = written_cids
585 .iter()
586 .map(|c| c.to_string())
587 .collect::<Vec<_>>();
588 let is_update = existing_cid.is_some();
589 let blob_cids = extract_blob_cids(&input.record);
590 if let Err(e) = commit_and_log(
591 &state,
592 CommitParams {
593 did: &did,
594 user_id,
595 current_root_cid: Some(current_root_cid),
596 prev_data_cid: Some(commit.data),
597 new_mst_root,
598 ops: vec![op],
599 blocks_cids: &written_cids_str,
600 blobs: &blob_cids,
601 },
602 )
603 .await
604 {
605 return (
606 StatusCode::INTERNAL_SERVER_ERROR,
607 Json(json!({"error": "InternalError", "message": e})),
608 )
609 .into_response();
610 };
611
612 if let Some(ref controller) = controller_did {
613 let _ = delegation::log_delegation_action(
614 &state.db,
615 &did,
616 controller,
617 Some(controller),
618 DelegationActionType::RepoWrite,
619 Some(json!({
620 "action": if is_update { "update" } else { "create" },
621 "collection": input.collection,
622 "rkey": input.rkey
623 })),
624 None,
625 None,
626 )
627 .await;
628 }
629
630 (
631 StatusCode::OK,
632 Json(PutRecordOutput {
633 uri: format!("at://{}/{}/{}", did, input.collection, input.rkey),
634 cid: record_cid.to_string(),
635 }),
636 )
637 .into_response()
638}