this repo has no description
1use super::validation::validate_record_with_status;
2use crate::api::error::ApiError;
3use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log, extract_blob_cids};
4use crate::delegation::{self, DelegationActionType};
5use crate::repo::tracking::TrackingBlockStore;
6use crate::state::AppState;
7use crate::types::{AtIdentifier, AtUri, Did, Nsid, Rkey};
8use axum::{
9 Json,
10 extract::State,
11 http::{HeaderMap, StatusCode},
12 response::{IntoResponse, Response},
13};
14use cid::Cid;
15use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
16use serde::{Deserialize, Serialize};
17use serde_json::json;
18use sqlx::{PgPool, Row};
19use std::str::FromStr;
20use std::sync::Arc;
21use tracing::error;
22use uuid::Uuid;
23
24pub async fn has_verified_comms_channel(db: &PgPool, did: &str) -> Result<bool, sqlx::Error> {
25 let row = sqlx::query(
26 r#"
27 SELECT
28 email_verified,
29 discord_verified,
30 telegram_verified,
31 signal_verified
32 FROM users
33 WHERE did = $1
34 "#,
35 )
36 .bind(did)
37 .fetch_optional(db)
38 .await?;
39 match row {
40 Some(r) => {
41 let email_verified: bool = r.get("email_verified");
42 let discord_verified: bool = r.get("discord_verified");
43 let telegram_verified: bool = r.get("telegram_verified");
44 let signal_verified: bool = r.get("signal_verified");
45 Ok(email_verified || discord_verified || telegram_verified || signal_verified)
46 }
47 None => Ok(false),
48 }
49}
50
51pub struct RepoWriteAuth {
52 pub did: Did,
53 pub user_id: Uuid,
54 pub current_root_cid: Cid,
55 pub is_oauth: bool,
56 pub scope: Option<String>,
57 pub controller_did: Option<Did>,
58}
59
60pub async fn prepare_repo_write(
61 state: &AppState,
62 headers: &HeaderMap,
63 repo_did: &str,
64 http_method: &str,
65 http_uri: &str,
66) -> Result<RepoWriteAuth, Response> {
67 let extracted = crate::auth::extract_auth_token_from_header(
68 headers.get("Authorization").and_then(|h| h.to_str().ok()),
69 )
70 .ok_or_else(|| ApiError::AuthenticationRequired.into_response())?;
71 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
72 let auth_user = crate::auth::validate_token_with_dpop(
73 &state.db,
74 &extracted.token,
75 extracted.is_dpop,
76 dpop_proof,
77 http_method,
78 http_uri,
79 false,
80 )
81 .await
82 .map_err(|e| {
83 tracing::warn!(error = ?e, is_dpop = extracted.is_dpop, "Token validation failed in prepare_repo_write");
84 let mut response = ApiError::from(e).into_response();
85 if matches!(e, crate::auth::TokenValidationError::TokenExpired) {
86 let scheme = if extracted.is_dpop { "DPoP" } else { "Bearer" };
87 let www_auth = format!(
88 "{} error=\"invalid_token\", error_description=\"Token has expired\"",
89 scheme
90 );
91 response.headers_mut().insert(
92 "WWW-Authenticate",
93 www_auth.parse().unwrap(),
94 );
95 if extracted.is_dpop {
96 let nonce = crate::oauth::verify::generate_dpop_nonce();
97 response.headers_mut().insert("DPoP-Nonce", nonce.parse().unwrap());
98 }
99 }
100 response
101 })?;
102 if repo_did != auth_user.did {
103 return Err(
104 ApiError::InvalidRepo("Repo does not match authenticated user".into()).into_response(),
105 );
106 }
107 if crate::util::is_account_migrated(&state.db, &auth_user.did)
108 .await
109 .unwrap_or(false)
110 {
111 return Err(ApiError::AccountMigrated.into_response());
112 }
113 let is_verified = has_verified_comms_channel(&state.db, &auth_user.did)
114 .await
115 .unwrap_or(false);
116 let is_delegated = crate::delegation::is_delegated_account(&state.db, &auth_user.did)
117 .await
118 .unwrap_or(false);
119 if !is_verified && !is_delegated {
120 return Err(ApiError::AccountNotVerified.into_response());
121 }
122 let user_id = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", &auth_user.did)
123 .fetch_optional(&state.db)
124 .await
125 .map_err(|e| {
126 error!("DB error fetching user: {}", e);
127 ApiError::InternalError(None).into_response()
128 })?
129 .ok_or_else(|| ApiError::InternalError(Some("User not found".into())).into_response())?;
130 let root_cid_str: String = sqlx::query_scalar!(
131 "SELECT repo_root_cid FROM repos WHERE user_id = $1",
132 user_id
133 )
134 .fetch_optional(&state.db)
135 .await
136 .map_err(|e| {
137 error!("DB error fetching repo root: {}", e);
138 ApiError::InternalError(None).into_response()
139 })?
140 .ok_or_else(|| ApiError::InternalError(Some("Repo root not found".into())).into_response())?;
141 let current_root_cid = Cid::from_str(&root_cid_str).map_err(|_| {
142 ApiError::InternalError(Some("Invalid repo root CID".into())).into_response()
143 })?;
144 Ok(RepoWriteAuth {
145 did: auth_user.did.clone(),
146 user_id,
147 current_root_cid,
148 is_oauth: auth_user.is_oauth,
149 scope: auth_user.scope,
150 controller_did: auth_user.controller_did.clone(),
151 })
152}
153#[derive(Deserialize)]
154#[allow(dead_code)]
155pub struct CreateRecordInput {
156 pub repo: AtIdentifier,
157 pub collection: Nsid,
158 pub rkey: Option<Rkey>,
159 pub validate: Option<bool>,
160 pub record: serde_json::Value,
161 #[serde(rename = "swapCommit")]
162 pub swap_commit: Option<String>,
163}
164#[derive(Serialize)]
165#[serde(rename_all = "camelCase")]
166pub struct CommitInfo {
167 pub cid: String,
168 pub rev: String,
169}
170
171#[derive(Serialize)]
172#[serde(rename_all = "camelCase")]
173pub struct CreateRecordOutput {
174 pub uri: AtUri,
175 pub cid: String,
176 pub commit: CommitInfo,
177 #[serde(skip_serializing_if = "Option::is_none")]
178 pub validation_status: Option<String>,
179}
180pub async fn create_record(
181 State(state): State<AppState>,
182 headers: HeaderMap,
183 axum::extract::OriginalUri(uri): axum::extract::OriginalUri,
184 Json(input): Json<CreateRecordInput>,
185) -> Response {
186 let auth = match prepare_repo_write(
187 &state,
188 &headers,
189 &input.repo,
190 "POST",
191 &crate::util::build_full_url(&uri.to_string()),
192 )
193 .await
194 {
195 Ok(res) => res,
196 Err(err_res) => return err_res,
197 };
198
199 if let Err(e) = crate::auth::scope_check::check_repo_scope(
200 auth.is_oauth,
201 auth.scope.as_deref(),
202 crate::oauth::RepoAction::Create,
203 &input.collection,
204 ) {
205 return e;
206 }
207
208 let did = auth.did;
209 let user_id = auth.user_id;
210 let current_root_cid = auth.current_root_cid;
211 let controller_did = auth.controller_did;
212
213 if let Some(swap_commit) = &input.swap_commit
214 && Cid::from_str(swap_commit).ok() != Some(current_root_cid)
215 {
216 return ApiError::InvalidSwap(Some("Repo has been modified".into())).into_response();
217 }
218 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
219 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
220 Ok(Some(b)) => b,
221 _ => return ApiError::InternalError(Some("Commit block not found".into())).into_response(),
222 };
223 let commit = match Commit::from_cbor(&commit_bytes) {
224 Ok(c) => c,
225 _ => return ApiError::InternalError(Some("Failed to parse commit".into())).into_response(),
226 };
227 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
228 let validation_status = if input.validate == Some(false) {
229 None
230 } else {
231 let require_lexicon = input.validate == Some(true);
232 match validate_record_with_status(
233 &input.record,
234 &input.collection,
235 input.rkey.as_ref().map(|r| r.as_str()),
236 require_lexicon,
237 ) {
238 Ok(status) => Some(status),
239 Err(err_response) => return *err_response,
240 }
241 };
242 let rkey = input.rkey.unwrap_or_else(Rkey::generate);
243 let record_ipld = crate::util::json_to_ipld(&input.record);
244 let mut record_bytes = Vec::new();
245 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld).is_err() {
246 return ApiError::InvalidRecord("Failed to serialize record".into()).into_response();
247 }
248 let record_cid = match tracking_store.put(&record_bytes).await {
249 Ok(c) => c,
250 _ => {
251 return ApiError::InternalError(Some("Failed to save record block".into()))
252 .into_response();
253 }
254 };
255 let key = format!("{}/{}", input.collection, rkey);
256 let new_mst = match mst.add(&key, record_cid).await {
257 Ok(m) => m,
258 _ => return ApiError::InternalError(Some("Failed to add to MST".into())).into_response(),
259 };
260 let new_mst_root = match new_mst.persist().await {
261 Ok(c) => c,
262 _ => return ApiError::InternalError(Some("Failed to persist MST".into())).into_response(),
263 };
264 let op = RecordOp::Create {
265 collection: input.collection.to_string(),
266 rkey: rkey.to_string(),
267 cid: record_cid,
268 };
269 let mut new_mst_blocks = std::collections::BTreeMap::new();
270 let mut old_mst_blocks = std::collections::BTreeMap::new();
271 if new_mst
272 .blocks_for_path(&key, &mut new_mst_blocks)
273 .await
274 .is_err()
275 {
276 return ApiError::InternalError(Some("Failed to get new MST blocks for path".into()))
277 .into_response();
278 }
279 if mst
280 .blocks_for_path(&key, &mut old_mst_blocks)
281 .await
282 .is_err()
283 {
284 return ApiError::InternalError(Some("Failed to get old MST blocks for path".into()))
285 .into_response();
286 }
287 let mut relevant_blocks = new_mst_blocks.clone();
288 relevant_blocks.extend(old_mst_blocks.iter().map(|(k, v)| (*k, v.clone())));
289 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
290 let written_cids: Vec<Cid> = tracking_store
291 .get_all_relevant_cids()
292 .into_iter()
293 .chain(relevant_blocks.keys().copied())
294 .collect::<std::collections::HashSet<_>>()
295 .into_iter()
296 .collect();
297 let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect();
298 let blob_cids = extract_blob_cids(&input.record);
299 let obsolete_cids: Vec<Cid> = std::iter::once(current_root_cid)
300 .chain(
301 old_mst_blocks
302 .keys()
303 .filter(|cid| !new_mst_blocks.contains_key(*cid))
304 .copied(),
305 )
306 .collect();
307 let commit_result = match commit_and_log(
308 &state,
309 CommitParams {
310 did: &did,
311 user_id,
312 current_root_cid: Some(current_root_cid),
313 prev_data_cid: Some(commit.data),
314 new_mst_root,
315 ops: vec![op],
316 blocks_cids: &written_cids_str,
317 blobs: &blob_cids,
318 obsolete_cids,
319 },
320 )
321 .await
322 {
323 Ok(res) => res,
324 Err(e) => return ApiError::InternalError(Some(e)).into_response(),
325 };
326
327 if let Some(ref controller) = controller_did {
328 let _ = delegation::log_delegation_action(
329 &state.db,
330 &did,
331 controller,
332 Some(controller),
333 DelegationActionType::RepoWrite,
334 Some(json!({
335 "action": "create",
336 "collection": input.collection,
337 "rkey": rkey
338 })),
339 None,
340 None,
341 )
342 .await;
343 }
344
345 (
346 StatusCode::OK,
347 Json(CreateRecordOutput {
348 uri: AtUri::from_parts(&did, &input.collection, &rkey),
349 cid: record_cid.to_string(),
350 commit: CommitInfo {
351 cid: commit_result.commit_cid.to_string(),
352 rev: commit_result.rev,
353 },
354 validation_status: validation_status.map(|s| s.to_string()),
355 }),
356 )
357 .into_response()
358}
359#[derive(Deserialize)]
360#[allow(dead_code)]
361pub struct PutRecordInput {
362 pub repo: AtIdentifier,
363 pub collection: Nsid,
364 pub rkey: Rkey,
365 pub validate: Option<bool>,
366 pub record: serde_json::Value,
367 #[serde(rename = "swapCommit")]
368 pub swap_commit: Option<String>,
369 #[serde(rename = "swapRecord")]
370 pub swap_record: Option<String>,
371}
372#[derive(Serialize)]
373#[serde(rename_all = "camelCase")]
374pub struct PutRecordOutput {
375 pub uri: AtUri,
376 pub cid: String,
377 #[serde(skip_serializing_if = "Option::is_none")]
378 pub commit: Option<CommitInfo>,
379 #[serde(skip_serializing_if = "Option::is_none")]
380 pub validation_status: Option<String>,
381}
382pub async fn put_record(
383 State(state): State<AppState>,
384 headers: HeaderMap,
385 axum::extract::OriginalUri(uri): axum::extract::OriginalUri,
386 Json(input): Json<PutRecordInput>,
387) -> Response {
388 let auth = match prepare_repo_write(
389 &state,
390 &headers,
391 &input.repo,
392 "POST",
393 &crate::util::build_full_url(&uri.to_string()),
394 )
395 .await
396 {
397 Ok(res) => res,
398 Err(err_res) => return err_res,
399 };
400
401 if let Err(e) = crate::auth::scope_check::check_repo_scope(
402 auth.is_oauth,
403 auth.scope.as_deref(),
404 crate::oauth::RepoAction::Create,
405 &input.collection,
406 ) {
407 return e;
408 }
409 if let Err(e) = crate::auth::scope_check::check_repo_scope(
410 auth.is_oauth,
411 auth.scope.as_deref(),
412 crate::oauth::RepoAction::Update,
413 &input.collection,
414 ) {
415 return e;
416 }
417
418 let did = auth.did;
419 let user_id = auth.user_id;
420 let current_root_cid = auth.current_root_cid;
421 let controller_did = auth.controller_did;
422
423 if let Some(swap_commit) = &input.swap_commit
424 && Cid::from_str(swap_commit).ok() != Some(current_root_cid)
425 {
426 return ApiError::InvalidSwap(Some("Repo has been modified".into())).into_response();
427 }
428 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
429 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
430 Ok(Some(b)) => b,
431 _ => return ApiError::InternalError(Some("Commit block not found".into())).into_response(),
432 };
433 let commit = match Commit::from_cbor(&commit_bytes) {
434 Ok(c) => c,
435 _ => return ApiError::InternalError(Some("Failed to parse commit".into())).into_response(),
436 };
437 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
438 let key = format!("{}/{}", input.collection, input.rkey);
439 let validation_status = if input.validate == Some(false) {
440 None
441 } else {
442 let require_lexicon = input.validate == Some(true);
443 match validate_record_with_status(
444 &input.record,
445 &input.collection,
446 Some(input.rkey.as_str()),
447 require_lexicon,
448 ) {
449 Ok(status) => Some(status),
450 Err(err_response) => return *err_response,
451 }
452 };
453 if let Some(swap_record_str) = &input.swap_record {
454 let expected_cid = Cid::from_str(swap_record_str).ok();
455 let actual_cid = mst.get(&key).await.ok().flatten();
456 if expected_cid != actual_cid {
457 return ApiError::InvalidSwap(Some(
458 "Record has been modified or does not exist".into(),
459 ))
460 .into_response();
461 }
462 }
463 let existing_cid = mst.get(&key).await.ok().flatten();
464 let record_ipld = crate::util::json_to_ipld(&input.record);
465 let mut record_bytes = Vec::new();
466 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld).is_err() {
467 return ApiError::InvalidRecord("Failed to serialize record".into()).into_response();
468 }
469 let record_cid = match tracking_store.put(&record_bytes).await {
470 Ok(c) => c,
471 _ => {
472 return ApiError::InternalError(Some("Failed to save record block".into()))
473 .into_response();
474 }
475 };
476 if existing_cid == Some(record_cid) {
477 return (
478 StatusCode::OK,
479 Json(PutRecordOutput {
480 uri: AtUri::from_parts(&did, &input.collection, &input.rkey),
481 cid: record_cid.to_string(),
482 commit: None,
483 validation_status: validation_status.map(|s| s.to_string()),
484 }),
485 )
486 .into_response();
487 }
488 let new_mst = if existing_cid.is_some() {
489 match mst.update(&key, record_cid).await {
490 Ok(m) => m,
491 Err(_) => {
492 return ApiError::InternalError(Some("Failed to update MST".into()))
493 .into_response();
494 }
495 }
496 } else {
497 match mst.add(&key, record_cid).await {
498 Ok(m) => m,
499 Err(_) => {
500 return ApiError::InternalError(Some("Failed to add to MST".into()))
501 .into_response();
502 }
503 }
504 };
505 let new_mst_root = match new_mst.persist().await {
506 Ok(c) => c,
507 Err(_) => {
508 return ApiError::InternalError(Some("Failed to persist MST".into())).into_response();
509 }
510 };
511 let op = if existing_cid.is_some() {
512 RecordOp::Update {
513 collection: input.collection.to_string(),
514 rkey: input.rkey.to_string(),
515 cid: record_cid,
516 prev: existing_cid,
517 }
518 } else {
519 RecordOp::Create {
520 collection: input.collection.to_string(),
521 rkey: input.rkey.to_string(),
522 cid: record_cid,
523 }
524 };
525 let mut new_mst_blocks = std::collections::BTreeMap::new();
526 let mut old_mst_blocks = std::collections::BTreeMap::new();
527 if new_mst
528 .blocks_for_path(&key, &mut new_mst_blocks)
529 .await
530 .is_err()
531 {
532 return ApiError::InternalError(Some("Failed to get new MST blocks for path".into()))
533 .into_response();
534 }
535 if mst
536 .blocks_for_path(&key, &mut old_mst_blocks)
537 .await
538 .is_err()
539 {
540 return ApiError::InternalError(Some("Failed to get old MST blocks for path".into()))
541 .into_response();
542 }
543 let mut relevant_blocks = new_mst_blocks.clone();
544 relevant_blocks.extend(old_mst_blocks.iter().map(|(k, v)| (*k, v.clone())));
545 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
546 let written_cids: Vec<Cid> = tracking_store
547 .get_all_relevant_cids()
548 .into_iter()
549 .chain(relevant_blocks.keys().copied())
550 .collect::<std::collections::HashSet<_>>()
551 .into_iter()
552 .collect();
553 let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect();
554 let is_update = existing_cid.is_some();
555 let blob_cids = extract_blob_cids(&input.record);
556 let obsolete_cids: Vec<Cid> = std::iter::once(current_root_cid)
557 .chain(
558 old_mst_blocks
559 .keys()
560 .filter(|cid| !new_mst_blocks.contains_key(*cid))
561 .copied(),
562 )
563 .chain(existing_cid)
564 .collect();
565 let commit_result = match commit_and_log(
566 &state,
567 CommitParams {
568 did: &did,
569 user_id,
570 current_root_cid: Some(current_root_cid),
571 prev_data_cid: Some(commit.data),
572 new_mst_root,
573 ops: vec![op],
574 blocks_cids: &written_cids_str,
575 blobs: &blob_cids,
576 obsolete_cids,
577 },
578 )
579 .await
580 {
581 Ok(res) => res,
582 Err(e) => return ApiError::InternalError(Some(e)).into_response(),
583 };
584
585 if let Some(ref controller) = controller_did {
586 let _ = delegation::log_delegation_action(
587 &state.db,
588 &did,
589 controller,
590 Some(controller),
591 DelegationActionType::RepoWrite,
592 Some(json!({
593 "action": if is_update { "update" } else { "create" },
594 "collection": input.collection,
595 "rkey": input.rkey
596 })),
597 None,
598 None,
599 )
600 .await;
601 }
602
603 (
604 StatusCode::OK,
605 Json(PutRecordOutput {
606 uri: AtUri::from_parts(&did, &input.collection, &input.rkey),
607 cid: record_cid.to_string(),
608 commit: Some(CommitInfo {
609 cid: commit_result.commit_cid.to_string(),
610 rev: commit_result.rev,
611 }),
612 validation_status: validation_status.map(|s| s.to_string()),
613 }),
614 )
615 .into_response()
616}