this repo has no description
1use super::validation::validate_record_with_status;
2use crate::api::error::ApiError;
3use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log, extract_blob_cids};
4use crate::delegation::{self, DelegationActionType};
5use crate::repo::tracking::TrackingBlockStore;
6use crate::state::AppState;
7use crate::types::{AtIdentifier, AtUri, Did, Nsid, Rkey};
8use axum::{
9 Json,
10 extract::State,
11 http::{HeaderMap, StatusCode},
12 response::{IntoResponse, Response},
13};
14use cid::Cid;
15use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
16use serde::{Deserialize, Serialize};
17use serde_json::json;
18use sqlx::{PgPool, Row};
19use std::str::FromStr;
20use std::sync::Arc;
21use tracing::error;
22use uuid::Uuid;
23
24pub async fn has_verified_comms_channel(db: &PgPool, did: &str) -> Result<bool, sqlx::Error> {
25 let row = sqlx::query(
26 r#"
27 SELECT
28 email_verified,
29 discord_verified,
30 telegram_verified,
31 signal_verified
32 FROM users
33 WHERE did = $1
34 "#,
35 )
36 .bind(did)
37 .fetch_optional(db)
38 .await?;
39 match row {
40 Some(r) => {
41 let email_verified: bool = r.get("email_verified");
42 let discord_verified: bool = r.get("discord_verified");
43 let telegram_verified: bool = r.get("telegram_verified");
44 let signal_verified: bool = r.get("signal_verified");
45 Ok(email_verified || discord_verified || telegram_verified || signal_verified)
46 }
47 None => Ok(false),
48 }
49}
50
51pub struct RepoWriteAuth {
52 pub did: Did,
53 pub user_id: Uuid,
54 pub current_root_cid: Cid,
55 pub is_oauth: bool,
56 pub scope: Option<String>,
57 pub controller_did: Option<Did>,
58}
59
60pub async fn prepare_repo_write(
61 state: &AppState,
62 headers: &HeaderMap,
63 repo_did: &str,
64 http_method: &str,
65 http_uri: &str,
66) -> Result<RepoWriteAuth, Response> {
67 let extracted = crate::auth::extract_auth_token_from_header(
68 headers.get("Authorization").and_then(|h| h.to_str().ok()),
69 )
70 .ok_or_else(|| ApiError::AuthenticationRequired.into_response())?;
71 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
72 let auth_user = crate::auth::validate_token_with_dpop(
73 &state.db,
74 &extracted.token,
75 extracted.is_dpop,
76 dpop_proof,
77 http_method,
78 http_uri,
79 false,
80 false,
81 )
82 .await
83 .map_err(|e| {
84 tracing::warn!(error = ?e, is_dpop = extracted.is_dpop, "Token validation failed in prepare_repo_write");
85 let mut response = ApiError::from(e).into_response();
86 if matches!(e, crate::auth::TokenValidationError::TokenExpired) {
87 let scheme = if extracted.is_dpop { "DPoP" } else { "Bearer" };
88 let www_auth = format!(
89 "{} error=\"invalid_token\", error_description=\"Token has expired\"",
90 scheme
91 );
92 response.headers_mut().insert(
93 "WWW-Authenticate",
94 www_auth.parse().unwrap(),
95 );
96 if extracted.is_dpop {
97 let nonce = crate::oauth::verify::generate_dpop_nonce();
98 response.headers_mut().insert("DPoP-Nonce", nonce.parse().unwrap());
99 }
100 }
101 response
102 })?;
103 if repo_did != auth_user.did {
104 return Err(
105 ApiError::InvalidRepo("Repo does not match authenticated user".into()).into_response(),
106 );
107 }
108 if crate::util::is_account_migrated(&state.db, &auth_user.did)
109 .await
110 .unwrap_or(false)
111 {
112 return Err(ApiError::AccountMigrated.into_response());
113 }
114 let is_verified = has_verified_comms_channel(&state.db, &auth_user.did)
115 .await
116 .unwrap_or(false);
117 let is_delegated = crate::delegation::is_delegated_account(&state.db, &auth_user.did)
118 .await
119 .unwrap_or(false);
120 if !is_verified && !is_delegated {
121 return Err(ApiError::AccountNotVerified.into_response());
122 }
123 let user_id = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", &auth_user.did)
124 .fetch_optional(&state.db)
125 .await
126 .map_err(|e| {
127 error!("DB error fetching user: {}", e);
128 ApiError::InternalError(None).into_response()
129 })?
130 .ok_or_else(|| ApiError::InternalError(Some("User not found".into())).into_response())?;
131 let root_cid_str: String = sqlx::query_scalar!(
132 "SELECT repo_root_cid FROM repos WHERE user_id = $1",
133 user_id
134 )
135 .fetch_optional(&state.db)
136 .await
137 .map_err(|e| {
138 error!("DB error fetching repo root: {}", e);
139 ApiError::InternalError(None).into_response()
140 })?
141 .ok_or_else(|| ApiError::InternalError(Some("Repo root not found".into())).into_response())?;
142 let current_root_cid = Cid::from_str(&root_cid_str).map_err(|_| {
143 ApiError::InternalError(Some("Invalid repo root CID".into())).into_response()
144 })?;
145 Ok(RepoWriteAuth {
146 did: auth_user.did.clone(),
147 user_id,
148 current_root_cid,
149 is_oauth: auth_user.is_oauth,
150 scope: auth_user.scope,
151 controller_did: auth_user.controller_did.clone(),
152 })
153}
154#[derive(Deserialize)]
155#[allow(dead_code)]
156pub struct CreateRecordInput {
157 pub repo: AtIdentifier,
158 pub collection: Nsid,
159 pub rkey: Option<Rkey>,
160 pub validate: Option<bool>,
161 pub record: serde_json::Value,
162 #[serde(rename = "swapCommit")]
163 pub swap_commit: Option<String>,
164}
165#[derive(Serialize)]
166#[serde(rename_all = "camelCase")]
167pub struct CommitInfo {
168 pub cid: String,
169 pub rev: String,
170}
171
172#[derive(Serialize)]
173#[serde(rename_all = "camelCase")]
174pub struct CreateRecordOutput {
175 pub uri: AtUri,
176 pub cid: String,
177 pub commit: CommitInfo,
178 #[serde(skip_serializing_if = "Option::is_none")]
179 pub validation_status: Option<String>,
180}
181pub async fn create_record(
182 State(state): State<AppState>,
183 headers: HeaderMap,
184 axum::extract::OriginalUri(uri): axum::extract::OriginalUri,
185 Json(input): Json<CreateRecordInput>,
186) -> Response {
187 let auth = match prepare_repo_write(
188 &state,
189 &headers,
190 &input.repo,
191 "POST",
192 &crate::util::build_full_url(&uri.to_string()),
193 )
194 .await
195 {
196 Ok(res) => res,
197 Err(err_res) => return err_res,
198 };
199
200 if let Err(e) = crate::auth::scope_check::check_repo_scope(
201 auth.is_oauth,
202 auth.scope.as_deref(),
203 crate::oauth::RepoAction::Create,
204 &input.collection,
205 ) {
206 return e;
207 }
208
209 let did = auth.did;
210 let user_id = auth.user_id;
211 let current_root_cid = auth.current_root_cid;
212 let controller_did = auth.controller_did;
213
214 if let Some(swap_commit) = &input.swap_commit
215 && Cid::from_str(swap_commit).ok() != Some(current_root_cid)
216 {
217 return ApiError::InvalidSwap(Some("Repo has been modified".into())).into_response();
218 }
219 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
220 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
221 Ok(Some(b)) => b,
222 _ => return ApiError::InternalError(Some("Commit block not found".into())).into_response(),
223 };
224 let commit = match Commit::from_cbor(&commit_bytes) {
225 Ok(c) => c,
226 _ => return ApiError::InternalError(Some("Failed to parse commit".into())).into_response(),
227 };
228 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
229 let validation_status = if input.validate == Some(false) {
230 None
231 } else {
232 let require_lexicon = input.validate == Some(true);
233 match validate_record_with_status(
234 &input.record,
235 &input.collection,
236 input.rkey.as_ref().map(|r| r.as_str()),
237 require_lexicon,
238 ) {
239 Ok(status) => Some(status),
240 Err(err_response) => return *err_response,
241 }
242 };
243 let rkey = input.rkey.unwrap_or_else(Rkey::generate);
244 let record_ipld = crate::util::json_to_ipld(&input.record);
245 let mut record_bytes = Vec::new();
246 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld).is_err() {
247 return ApiError::InvalidRecord("Failed to serialize record".into()).into_response();
248 }
249 let record_cid = match tracking_store.put(&record_bytes).await {
250 Ok(c) => c,
251 _ => {
252 return ApiError::InternalError(Some("Failed to save record block".into()))
253 .into_response();
254 }
255 };
256 let key = format!("{}/{}", input.collection, rkey);
257 let new_mst = match mst.add(&key, record_cid).await {
258 Ok(m) => m,
259 _ => return ApiError::InternalError(Some("Failed to add to MST".into())).into_response(),
260 };
261 let new_mst_root = match new_mst.persist().await {
262 Ok(c) => c,
263 _ => return ApiError::InternalError(Some("Failed to persist MST".into())).into_response(),
264 };
265 let op = RecordOp::Create {
266 collection: input.collection.to_string(),
267 rkey: rkey.to_string(),
268 cid: record_cid,
269 };
270 let mut new_mst_blocks = std::collections::BTreeMap::new();
271 let mut old_mst_blocks = std::collections::BTreeMap::new();
272 if new_mst
273 .blocks_for_path(&key, &mut new_mst_blocks)
274 .await
275 .is_err()
276 {
277 return ApiError::InternalError(Some("Failed to get new MST blocks for path".into()))
278 .into_response();
279 }
280 if mst
281 .blocks_for_path(&key, &mut old_mst_blocks)
282 .await
283 .is_err()
284 {
285 return ApiError::InternalError(Some("Failed to get old MST blocks for path".into()))
286 .into_response();
287 }
288 let mut relevant_blocks = new_mst_blocks.clone();
289 relevant_blocks.extend(old_mst_blocks.iter().map(|(k, v)| (*k, v.clone())));
290 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
291 let written_cids: Vec<Cid> = tracking_store
292 .get_all_relevant_cids()
293 .into_iter()
294 .chain(relevant_blocks.keys().copied())
295 .collect::<std::collections::HashSet<_>>()
296 .into_iter()
297 .collect();
298 let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect();
299 let blob_cids = extract_blob_cids(&input.record);
300 let obsolete_cids: Vec<Cid> = std::iter::once(current_root_cid)
301 .chain(
302 old_mst_blocks
303 .keys()
304 .filter(|cid| !new_mst_blocks.contains_key(*cid))
305 .copied(),
306 )
307 .collect();
308 let commit_result = match commit_and_log(
309 &state,
310 CommitParams {
311 did: &did,
312 user_id,
313 current_root_cid: Some(current_root_cid),
314 prev_data_cid: Some(commit.data),
315 new_mst_root,
316 ops: vec![op],
317 blocks_cids: &written_cids_str,
318 blobs: &blob_cids,
319 obsolete_cids,
320 },
321 )
322 .await
323 {
324 Ok(res) => res,
325 Err(e) => return ApiError::InternalError(Some(e)).into_response(),
326 };
327
328 if let Some(ref controller) = controller_did {
329 let _ = delegation::log_delegation_action(
330 &state.db,
331 &did,
332 controller,
333 Some(controller),
334 DelegationActionType::RepoWrite,
335 Some(json!({
336 "action": "create",
337 "collection": input.collection,
338 "rkey": rkey
339 })),
340 None,
341 None,
342 )
343 .await;
344 }
345
346 (
347 StatusCode::OK,
348 Json(CreateRecordOutput {
349 uri: AtUri::from_parts(&did, &input.collection, &rkey),
350 cid: record_cid.to_string(),
351 commit: CommitInfo {
352 cid: commit_result.commit_cid.to_string(),
353 rev: commit_result.rev,
354 },
355 validation_status: validation_status.map(|s| s.to_string()),
356 }),
357 )
358 .into_response()
359}
360#[derive(Deserialize)]
361#[allow(dead_code)]
362pub struct PutRecordInput {
363 pub repo: AtIdentifier,
364 pub collection: Nsid,
365 pub rkey: Rkey,
366 pub validate: Option<bool>,
367 pub record: serde_json::Value,
368 #[serde(rename = "swapCommit")]
369 pub swap_commit: Option<String>,
370 #[serde(rename = "swapRecord")]
371 pub swap_record: Option<String>,
372}
373#[derive(Serialize)]
374#[serde(rename_all = "camelCase")]
375pub struct PutRecordOutput {
376 pub uri: AtUri,
377 pub cid: String,
378 #[serde(skip_serializing_if = "Option::is_none")]
379 pub commit: Option<CommitInfo>,
380 #[serde(skip_serializing_if = "Option::is_none")]
381 pub validation_status: Option<String>,
382}
383pub async fn put_record(
384 State(state): State<AppState>,
385 headers: HeaderMap,
386 axum::extract::OriginalUri(uri): axum::extract::OriginalUri,
387 Json(input): Json<PutRecordInput>,
388) -> Response {
389 let auth = match prepare_repo_write(
390 &state,
391 &headers,
392 &input.repo,
393 "POST",
394 &crate::util::build_full_url(&uri.to_string()),
395 )
396 .await
397 {
398 Ok(res) => res,
399 Err(err_res) => return err_res,
400 };
401
402 if let Err(e) = crate::auth::scope_check::check_repo_scope(
403 auth.is_oauth,
404 auth.scope.as_deref(),
405 crate::oauth::RepoAction::Create,
406 &input.collection,
407 ) {
408 return e;
409 }
410 if let Err(e) = crate::auth::scope_check::check_repo_scope(
411 auth.is_oauth,
412 auth.scope.as_deref(),
413 crate::oauth::RepoAction::Update,
414 &input.collection,
415 ) {
416 return e;
417 }
418
419 let did = auth.did;
420 let user_id = auth.user_id;
421 let current_root_cid = auth.current_root_cid;
422 let controller_did = auth.controller_did;
423
424 if let Some(swap_commit) = &input.swap_commit
425 && Cid::from_str(swap_commit).ok() != Some(current_root_cid)
426 {
427 return ApiError::InvalidSwap(Some("Repo has been modified".into())).into_response();
428 }
429 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
430 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
431 Ok(Some(b)) => b,
432 _ => return ApiError::InternalError(Some("Commit block not found".into())).into_response(),
433 };
434 let commit = match Commit::from_cbor(&commit_bytes) {
435 Ok(c) => c,
436 _ => return ApiError::InternalError(Some("Failed to parse commit".into())).into_response(),
437 };
438 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
439 let key = format!("{}/{}", input.collection, input.rkey);
440 let validation_status = if input.validate == Some(false) {
441 None
442 } else {
443 let require_lexicon = input.validate == Some(true);
444 match validate_record_with_status(
445 &input.record,
446 &input.collection,
447 Some(input.rkey.as_str()),
448 require_lexicon,
449 ) {
450 Ok(status) => Some(status),
451 Err(err_response) => return *err_response,
452 }
453 };
454 if let Some(swap_record_str) = &input.swap_record {
455 let expected_cid = Cid::from_str(swap_record_str).ok();
456 let actual_cid = mst.get(&key).await.ok().flatten();
457 if expected_cid != actual_cid {
458 return ApiError::InvalidSwap(Some(
459 "Record has been modified or does not exist".into(),
460 ))
461 .into_response();
462 }
463 }
464 let existing_cid = mst.get(&key).await.ok().flatten();
465 let record_ipld = crate::util::json_to_ipld(&input.record);
466 let mut record_bytes = Vec::new();
467 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld).is_err() {
468 return ApiError::InvalidRecord("Failed to serialize record".into()).into_response();
469 }
470 let record_cid = match tracking_store.put(&record_bytes).await {
471 Ok(c) => c,
472 _ => {
473 return ApiError::InternalError(Some("Failed to save record block".into()))
474 .into_response();
475 }
476 };
477 if existing_cid == Some(record_cid) {
478 return (
479 StatusCode::OK,
480 Json(PutRecordOutput {
481 uri: AtUri::from_parts(&did, &input.collection, &input.rkey),
482 cid: record_cid.to_string(),
483 commit: None,
484 validation_status: validation_status.map(|s| s.to_string()),
485 }),
486 )
487 .into_response();
488 }
489 let new_mst = if existing_cid.is_some() {
490 match mst.update(&key, record_cid).await {
491 Ok(m) => m,
492 Err(_) => {
493 return ApiError::InternalError(Some("Failed to update MST".into()))
494 .into_response();
495 }
496 }
497 } else {
498 match mst.add(&key, record_cid).await {
499 Ok(m) => m,
500 Err(_) => {
501 return ApiError::InternalError(Some("Failed to add to MST".into()))
502 .into_response();
503 }
504 }
505 };
506 let new_mst_root = match new_mst.persist().await {
507 Ok(c) => c,
508 Err(_) => {
509 return ApiError::InternalError(Some("Failed to persist MST".into())).into_response();
510 }
511 };
512 let op = if existing_cid.is_some() {
513 RecordOp::Update {
514 collection: input.collection.to_string(),
515 rkey: input.rkey.to_string(),
516 cid: record_cid,
517 prev: existing_cid,
518 }
519 } else {
520 RecordOp::Create {
521 collection: input.collection.to_string(),
522 rkey: input.rkey.to_string(),
523 cid: record_cid,
524 }
525 };
526 let mut new_mst_blocks = std::collections::BTreeMap::new();
527 let mut old_mst_blocks = std::collections::BTreeMap::new();
528 if new_mst
529 .blocks_for_path(&key, &mut new_mst_blocks)
530 .await
531 .is_err()
532 {
533 return ApiError::InternalError(Some("Failed to get new MST blocks for path".into()))
534 .into_response();
535 }
536 if mst
537 .blocks_for_path(&key, &mut old_mst_blocks)
538 .await
539 .is_err()
540 {
541 return ApiError::InternalError(Some("Failed to get old MST blocks for path".into()))
542 .into_response();
543 }
544 let mut relevant_blocks = new_mst_blocks.clone();
545 relevant_blocks.extend(old_mst_blocks.iter().map(|(k, v)| (*k, v.clone())));
546 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
547 let written_cids: Vec<Cid> = tracking_store
548 .get_all_relevant_cids()
549 .into_iter()
550 .chain(relevant_blocks.keys().copied())
551 .collect::<std::collections::HashSet<_>>()
552 .into_iter()
553 .collect();
554 let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect();
555 let is_update = existing_cid.is_some();
556 let blob_cids = extract_blob_cids(&input.record);
557 let obsolete_cids: Vec<Cid> = std::iter::once(current_root_cid)
558 .chain(
559 old_mst_blocks
560 .keys()
561 .filter(|cid| !new_mst_blocks.contains_key(*cid))
562 .copied(),
563 )
564 .chain(existing_cid)
565 .collect();
566 let commit_result = match commit_and_log(
567 &state,
568 CommitParams {
569 did: &did,
570 user_id,
571 current_root_cid: Some(current_root_cid),
572 prev_data_cid: Some(commit.data),
573 new_mst_root,
574 ops: vec![op],
575 blocks_cids: &written_cids_str,
576 blobs: &blob_cids,
577 obsolete_cids,
578 },
579 )
580 .await
581 {
582 Ok(res) => res,
583 Err(e) => return ApiError::InternalError(Some(e)).into_response(),
584 };
585
586 if let Some(ref controller) = controller_did {
587 let _ = delegation::log_delegation_action(
588 &state.db,
589 &did,
590 controller,
591 Some(controller),
592 DelegationActionType::RepoWrite,
593 Some(json!({
594 "action": if is_update { "update" } else { "create" },
595 "collection": input.collection,
596 "rkey": input.rkey
597 })),
598 None,
599 None,
600 )
601 .await;
602 }
603
604 (
605 StatusCode::OK,
606 Json(PutRecordOutput {
607 uri: AtUri::from_parts(&did, &input.collection, &input.rkey),
608 cid: record_cid.to_string(),
609 commit: Some(CommitInfo {
610 cid: commit_result.commit_cid.to_string(),
611 rev: commit_result.rev,
612 }),
613 validation_status: validation_status.map(|s| s.to_string()),
614 }),
615 )
616 .into_response()
617}