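//! Repository record write path: shared write authorization (`prepare_repo_write`)
//! and the `create_record` / `put_record` request handlers.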
1use super::validation::validate_record_with_status;
2use crate::api::error::ApiError;
3use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log, extract_blob_cids};
4use crate::delegation::{self, DelegationActionType};
5use crate::repo::tracking::TrackingBlockStore;
6use crate::state::AppState;
7use crate::types::{AtIdentifier, AtUri, Did, Nsid, Rkey};
8use axum::{
9 Json,
10 extract::State,
11 http::{HeaderMap, StatusCode},
12 response::{IntoResponse, Response},
13};
14use cid::Cid;
15use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
16use serde::{Deserialize, Serialize};
17use serde_json::json;
18use sqlx::{PgPool, Row};
19use std::str::FromStr;
20use std::sync::Arc;
21use tracing::error;
22use uuid::Uuid;
23
24pub async fn has_verified_comms_channel(db: &PgPool, did: &str) -> Result<bool, sqlx::Error> {
25 let row = sqlx::query(
26 r#"
27 SELECT
28 email_verified,
29 discord_verified,
30 telegram_verified,
31 signal_verified
32 FROM users
33 WHERE did = $1
34 "#,
35 )
36 .bind(did)
37 .fetch_optional(db)
38 .await?;
39 match row {
40 Some(r) => {
41 let email_verified: bool = r.get("email_verified");
42 let discord_verified: bool = r.get("discord_verified");
43 let telegram_verified: bool = r.get("telegram_verified");
44 let signal_verified: bool = r.get("signal_verified");
45 Ok(email_verified || discord_verified || telegram_verified || signal_verified)
46 }
47 None => Ok(false),
48 }
49}
50
51pub struct RepoWriteAuth {
52 pub did: Did,
53 pub user_id: Uuid,
54 pub current_root_cid: Cid,
55 pub is_oauth: bool,
56 pub scope: Option<String>,
57 pub controller_did: Option<Did>,
58}
59
60pub async fn prepare_repo_write(
61 state: &AppState,
62 headers: &HeaderMap,
63 repo_did: &str,
64 http_method: &str,
65 http_uri: &str,
66) -> Result<RepoWriteAuth, Response> {
67 let extracted = crate::auth::extract_auth_token_from_header(
68 headers.get("Authorization").and_then(|h| h.to_str().ok()),
69 )
70 .ok_or_else(|| ApiError::AuthenticationRequired.into_response())?;
71 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
72 let auth_user = crate::auth::validate_token_with_dpop(
73 &state.db,
74 &extracted.token,
75 extracted.is_dpop,
76 dpop_proof,
77 http_method,
78 http_uri,
79 false,
80 )
81 .await
82 .map_err(|e| {
83 tracing::warn!(error = ?e, is_dpop = extracted.is_dpop, "Token validation failed in prepare_repo_write");
84 let mut response = ApiError::from(e).into_response();
85 if matches!(e, crate::auth::TokenValidationError::TokenExpired) {
86 let scheme = if extracted.is_dpop { "DPoP" } else { "Bearer" };
87 let www_auth = format!(
88 "{} error=\"invalid_token\", error_description=\"Token has expired\"",
89 scheme
90 );
91 response.headers_mut().insert(
92 "WWW-Authenticate",
93 www_auth.parse().unwrap(),
94 );
95 if extracted.is_dpop {
96 let nonce = crate::oauth::verify::generate_dpop_nonce();
97 response.headers_mut().insert("DPoP-Nonce", nonce.parse().unwrap());
98 }
99 }
100 response
101 })?;
102 if repo_did != auth_user.did {
103 return Err(
104 ApiError::InvalidRepo("Repo does not match authenticated user".into()).into_response(),
105 );
106 }
107 if crate::util::is_account_migrated(&state.db, &auth_user.did)
108 .await
109 .unwrap_or(false)
110 {
111 return Err(ApiError::AccountMigrated.into_response());
112 }
113 let is_verified = has_verified_comms_channel(&state.db, &auth_user.did)
114 .await
115 .unwrap_or(false);
116 let is_delegated = crate::delegation::is_delegated_account(&state.db, &auth_user.did)
117 .await
118 .unwrap_or(false);
119 if !is_verified && !is_delegated {
120 return Err(ApiError::AccountNotVerified.into_response());
121 }
122 let user_id = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", &auth_user.did)
123 .fetch_optional(&state.db)
124 .await
125 .map_err(|e| {
126 error!("DB error fetching user: {}", e);
127 ApiError::InternalError(None).into_response()
128 })?
129 .ok_or_else(|| ApiError::InternalError(Some("User not found".into())).into_response())?;
130 let root_cid_str: String = sqlx::query_scalar!(
131 "SELECT repo_root_cid FROM repos WHERE user_id = $1",
132 user_id
133 )
134 .fetch_optional(&state.db)
135 .await
136 .map_err(|e| {
137 error!("DB error fetching repo root: {}", e);
138 ApiError::InternalError(None).into_response()
139 })?
140 .ok_or_else(|| ApiError::InternalError(Some("Repo root not found".into())).into_response())?;
141 let current_root_cid = Cid::from_str(&root_cid_str).map_err(|_| {
142 ApiError::InternalError(Some("Invalid repo root CID".into())).into_response()
143 })?;
144 Ok(RepoWriteAuth {
145 did: auth_user.did.clone(),
146 user_id,
147 current_root_cid,
148 is_oauth: auth_user.is_oauth,
149 scope: auth_user.scope,
150 controller_did: auth_user.controller_did.clone(),
151 })
152}
153#[derive(Deserialize)]
154#[allow(dead_code)]
155pub struct CreateRecordInput {
156 pub repo: AtIdentifier,
157 pub collection: Nsid,
158 pub rkey: Option<Rkey>,
159 pub validate: Option<bool>,
160 pub record: serde_json::Value,
161 #[serde(rename = "swapCommit")]
162 pub swap_commit: Option<String>,
163}
164#[derive(Serialize)]
165#[serde(rename_all = "camelCase")]
166pub struct CommitInfo {
167 pub cid: String,
168 pub rev: String,
169}
170
171#[derive(Serialize)]
172#[serde(rename_all = "camelCase")]
173pub struct CreateRecordOutput {
174 pub uri: AtUri,
175 pub cid: String,
176 pub commit: CommitInfo,
177 #[serde(skip_serializing_if = "Option::is_none")]
178 pub validation_status: Option<String>,
179}
180pub async fn create_record(
181 State(state): State<AppState>,
182 headers: HeaderMap,
183 axum::extract::OriginalUri(uri): axum::extract::OriginalUri,
184 Json(input): Json<CreateRecordInput>,
185) -> Response {
186 let auth = match prepare_repo_write(
187 &state,
188 &headers,
189 &input.repo,
190 "POST",
191 &crate::util::build_full_url(&uri.to_string()),
192 )
193 .await
194 {
195 Ok(res) => res,
196 Err(err_res) => return err_res,
197 };
198
199 if let Err(e) = crate::auth::scope_check::check_repo_scope(
200 auth.is_oauth,
201 auth.scope.as_deref(),
202 crate::oauth::RepoAction::Create,
203 &input.collection,
204 ) {
205 return e;
206 }
207
208 let did = auth.did;
209 let user_id = auth.user_id;
210 let current_root_cid = auth.current_root_cid;
211 let controller_did = auth.controller_did;
212
213 if let Some(swap_commit) = &input.swap_commit
214 && Cid::from_str(swap_commit).ok() != Some(current_root_cid)
215 {
216 return ApiError::InvalidSwap(Some("Repo has been modified".into())).into_response();
217 }
218 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
219 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
220 Ok(Some(b)) => b,
221 _ => return ApiError::InternalError(Some("Commit block not found".into())).into_response(),
222 };
223 let commit = match Commit::from_cbor(&commit_bytes) {
224 Ok(c) => c,
225 _ => return ApiError::InternalError(Some("Failed to parse commit".into())).into_response(),
226 };
227 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
228 let validation_status = if input.validate == Some(false) {
229 None
230 } else {
231 let require_lexicon = input.validate == Some(true);
232 match validate_record_with_status(
233 &input.record,
234 &input.collection,
235 input.rkey.as_ref().map(|r| r.as_str()),
236 require_lexicon,
237 ) {
238 Ok(status) => Some(status),
239 Err(err_response) => return *err_response,
240 }
241 };
242 let rkey = input.rkey.unwrap_or_else(Rkey::generate);
243 let record_ipld = crate::util::json_to_ipld(&input.record);
244 let mut record_bytes = Vec::new();
245 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld).is_err() {
246 return ApiError::InvalidRecord("Failed to serialize record".into()).into_response();
247 }
248 let record_cid = match tracking_store.put(&record_bytes).await {
249 Ok(c) => c,
250 _ => {
251 return ApiError::InternalError(Some("Failed to save record block".into()))
252 .into_response();
253 }
254 };
255 let key = format!("{}/{}", input.collection, rkey);
256 let new_mst = match mst.add(&key, record_cid).await {
257 Ok(m) => m,
258 _ => return ApiError::InternalError(Some("Failed to add to MST".into())).into_response(),
259 };
260 let new_mst_root = match new_mst.persist().await {
261 Ok(c) => c,
262 _ => return ApiError::InternalError(Some("Failed to persist MST".into())).into_response(),
263 };
264 let op = RecordOp::Create {
265 collection: input.collection.to_string(),
266 rkey: rkey.to_string(),
267 cid: record_cid,
268 };
269 let mut relevant_blocks = std::collections::BTreeMap::new();
270 if new_mst
271 .blocks_for_path(&key, &mut relevant_blocks)
272 .await
273 .is_err()
274 {
275 return ApiError::InternalError(Some("Failed to get new MST blocks for path".into()))
276 .into_response();
277 }
278 if mst
279 .blocks_for_path(&key, &mut relevant_blocks)
280 .await
281 .is_err()
282 {
283 return ApiError::InternalError(Some("Failed to get old MST blocks for path".into()))
284 .into_response();
285 }
286 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
287 let mut written_cids = tracking_store.get_all_relevant_cids();
288 for cid in relevant_blocks.keys() {
289 if !written_cids.contains(cid) {
290 written_cids.push(*cid);
291 }
292 }
293 let written_cids_str = written_cids
294 .iter()
295 .map(|c| c.to_string())
296 .collect::<Vec<_>>();
297 let blob_cids = extract_blob_cids(&input.record);
298 let commit_result = match commit_and_log(
299 &state,
300 CommitParams {
301 did: &did,
302 user_id,
303 current_root_cid: Some(current_root_cid),
304 prev_data_cid: Some(commit.data),
305 new_mst_root,
306 ops: vec![op],
307 blocks_cids: &written_cids_str,
308 blobs: &blob_cids,
309 },
310 )
311 .await
312 {
313 Ok(res) => res,
314 Err(e) => return ApiError::InternalError(Some(e)).into_response(),
315 };
316
317 if let Some(ref controller) = controller_did {
318 let _ = delegation::log_delegation_action(
319 &state.db,
320 &did,
321 controller,
322 Some(controller),
323 DelegationActionType::RepoWrite,
324 Some(json!({
325 "action": "create",
326 "collection": input.collection,
327 "rkey": rkey
328 })),
329 None,
330 None,
331 )
332 .await;
333 }
334
335 (
336 StatusCode::OK,
337 Json(CreateRecordOutput {
338 uri: AtUri::from_parts(&did, &input.collection, &rkey),
339 cid: record_cid.to_string(),
340 commit: CommitInfo {
341 cid: commit_result.commit_cid.to_string(),
342 rev: commit_result.rev,
343 },
344 validation_status: validation_status.map(|s| s.to_string()),
345 }),
346 )
347 .into_response()
348}
349#[derive(Deserialize)]
350#[allow(dead_code)]
351pub struct PutRecordInput {
352 pub repo: AtIdentifier,
353 pub collection: Nsid,
354 pub rkey: Rkey,
355 pub validate: Option<bool>,
356 pub record: serde_json::Value,
357 #[serde(rename = "swapCommit")]
358 pub swap_commit: Option<String>,
359 #[serde(rename = "swapRecord")]
360 pub swap_record: Option<String>,
361}
362#[derive(Serialize)]
363#[serde(rename_all = "camelCase")]
364pub struct PutRecordOutput {
365 pub uri: AtUri,
366 pub cid: String,
367 #[serde(skip_serializing_if = "Option::is_none")]
368 pub commit: Option<CommitInfo>,
369 #[serde(skip_serializing_if = "Option::is_none")]
370 pub validation_status: Option<String>,
371}
372pub async fn put_record(
373 State(state): State<AppState>,
374 headers: HeaderMap,
375 axum::extract::OriginalUri(uri): axum::extract::OriginalUri,
376 Json(input): Json<PutRecordInput>,
377) -> Response {
378 let auth = match prepare_repo_write(
379 &state,
380 &headers,
381 &input.repo,
382 "POST",
383 &crate::util::build_full_url(&uri.to_string()),
384 )
385 .await
386 {
387 Ok(res) => res,
388 Err(err_res) => return err_res,
389 };
390
391 if let Err(e) = crate::auth::scope_check::check_repo_scope(
392 auth.is_oauth,
393 auth.scope.as_deref(),
394 crate::oauth::RepoAction::Create,
395 &input.collection,
396 ) {
397 return e;
398 }
399 if let Err(e) = crate::auth::scope_check::check_repo_scope(
400 auth.is_oauth,
401 auth.scope.as_deref(),
402 crate::oauth::RepoAction::Update,
403 &input.collection,
404 ) {
405 return e;
406 }
407
408 let did = auth.did;
409 let user_id = auth.user_id;
410 let current_root_cid = auth.current_root_cid;
411 let controller_did = auth.controller_did;
412
413 if let Some(swap_commit) = &input.swap_commit
414 && Cid::from_str(swap_commit).ok() != Some(current_root_cid)
415 {
416 return ApiError::InvalidSwap(Some("Repo has been modified".into())).into_response();
417 }
418 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
419 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
420 Ok(Some(b)) => b,
421 _ => return ApiError::InternalError(Some("Commit block not found".into())).into_response(),
422 };
423 let commit = match Commit::from_cbor(&commit_bytes) {
424 Ok(c) => c,
425 _ => return ApiError::InternalError(Some("Failed to parse commit".into())).into_response(),
426 };
427 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
428 let key = format!("{}/{}", input.collection, input.rkey);
429 let validation_status = if input.validate == Some(false) {
430 None
431 } else {
432 let require_lexicon = input.validate == Some(true);
433 match validate_record_with_status(
434 &input.record,
435 &input.collection,
436 Some(input.rkey.as_str()),
437 require_lexicon,
438 ) {
439 Ok(status) => Some(status),
440 Err(err_response) => return *err_response,
441 }
442 };
443 if let Some(swap_record_str) = &input.swap_record {
444 let expected_cid = Cid::from_str(swap_record_str).ok();
445 let actual_cid = mst.get(&key).await.ok().flatten();
446 if expected_cid != actual_cid {
447 return ApiError::InvalidSwap(Some(
448 "Record has been modified or does not exist".into(),
449 ))
450 .into_response();
451 }
452 }
453 let existing_cid = mst.get(&key).await.ok().flatten();
454 let record_ipld = crate::util::json_to_ipld(&input.record);
455 let mut record_bytes = Vec::new();
456 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld).is_err() {
457 return ApiError::InvalidRecord("Failed to serialize record".into()).into_response();
458 }
459 let record_cid = match tracking_store.put(&record_bytes).await {
460 Ok(c) => c,
461 _ => {
462 return ApiError::InternalError(Some("Failed to save record block".into()))
463 .into_response();
464 }
465 };
466 if existing_cid == Some(record_cid) {
467 return (
468 StatusCode::OK,
469 Json(PutRecordOutput {
470 uri: AtUri::from_parts(&did, &input.collection, &input.rkey),
471 cid: record_cid.to_string(),
472 commit: None,
473 validation_status: validation_status.map(|s| s.to_string()),
474 }),
475 )
476 .into_response();
477 }
478 let new_mst = if existing_cid.is_some() {
479 match mst.update(&key, record_cid).await {
480 Ok(m) => m,
481 Err(_) => {
482 return ApiError::InternalError(Some("Failed to update MST".into()))
483 .into_response();
484 }
485 }
486 } else {
487 match mst.add(&key, record_cid).await {
488 Ok(m) => m,
489 Err(_) => {
490 return ApiError::InternalError(Some("Failed to add to MST".into()))
491 .into_response();
492 }
493 }
494 };
495 let new_mst_root = match new_mst.persist().await {
496 Ok(c) => c,
497 Err(_) => {
498 return ApiError::InternalError(Some("Failed to persist MST".into())).into_response();
499 }
500 };
501 let op = if existing_cid.is_some() {
502 RecordOp::Update {
503 collection: input.collection.to_string(),
504 rkey: input.rkey.to_string(),
505 cid: record_cid,
506 prev: existing_cid,
507 }
508 } else {
509 RecordOp::Create {
510 collection: input.collection.to_string(),
511 rkey: input.rkey.to_string(),
512 cid: record_cid,
513 }
514 };
515 let mut relevant_blocks = std::collections::BTreeMap::new();
516 if new_mst
517 .blocks_for_path(&key, &mut relevant_blocks)
518 .await
519 .is_err()
520 {
521 return ApiError::InternalError(Some("Failed to get new MST blocks for path".into()))
522 .into_response();
523 }
524 if mst
525 .blocks_for_path(&key, &mut relevant_blocks)
526 .await
527 .is_err()
528 {
529 return ApiError::InternalError(Some("Failed to get old MST blocks for path".into()))
530 .into_response();
531 }
532 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
533 let mut written_cids = tracking_store.get_all_relevant_cids();
534 for cid in relevant_blocks.keys() {
535 if !written_cids.contains(cid) {
536 written_cids.push(*cid);
537 }
538 }
539 let written_cids_str = written_cids
540 .iter()
541 .map(|c| c.to_string())
542 .collect::<Vec<_>>();
543 let is_update = existing_cid.is_some();
544 let blob_cids = extract_blob_cids(&input.record);
545 let commit_result = match commit_and_log(
546 &state,
547 CommitParams {
548 did: &did,
549 user_id,
550 current_root_cid: Some(current_root_cid),
551 prev_data_cid: Some(commit.data),
552 new_mst_root,
553 ops: vec![op],
554 blocks_cids: &written_cids_str,
555 blobs: &blob_cids,
556 },
557 )
558 .await
559 {
560 Ok(res) => res,
561 Err(e) => return ApiError::InternalError(Some(e)).into_response(),
562 };
563
564 if let Some(ref controller) = controller_did {
565 let _ = delegation::log_delegation_action(
566 &state.db,
567 &did,
568 controller,
569 Some(controller),
570 DelegationActionType::RepoWrite,
571 Some(json!({
572 "action": if is_update { "update" } else { "create" },
573 "collection": input.collection,
574 "rkey": input.rkey
575 })),
576 None,
577 None,
578 )
579 .await;
580 }
581
582 (
583 StatusCode::OK,
584 Json(PutRecordOutput {
585 uri: AtUri::from_parts(&did, &input.collection, &input.rkey),
586 cid: record_cid.to_string(),
587 commit: Some(CommitInfo {
588 cid: commit_result.commit_cid.to_string(),
589 rev: commit_result.rev,
590 }),
591 validation_status: validation_status.map(|s| s.to_string()),
592 }),
593 )
594 .into_response()
595}