this repo has no description
1use super::validation::validate_record_with_status;
2use crate::api::error::ApiError;
3use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log, extract_blob_cids};
4use crate::delegation::{self, DelegationActionType};
5use crate::repo::tracking::TrackingBlockStore;
6use crate::state::AppState;
7use crate::types::{AtIdentifier, AtUri, Did, Nsid, Rkey};
8use axum::{
9 Json,
10 extract::State,
11 http::{HeaderMap, StatusCode},
12 response::{IntoResponse, Response},
13};
14use cid::Cid;
15use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
16use serde::{Deserialize, Serialize};
17use serde_json::json;
18use sqlx::{PgPool, Row};
19use std::str::FromStr;
20use std::sync::Arc;
21use tracing::error;
22use uuid::Uuid;
23
24pub async fn has_verified_comms_channel(db: &PgPool, did: &str) -> Result<bool, sqlx::Error> {
25 let row = sqlx::query(
26 r#"
27 SELECT
28 email_verified,
29 discord_verified,
30 telegram_verified,
31 signal_verified
32 FROM users
33 WHERE did = $1
34 "#,
35 )
36 .bind(did)
37 .fetch_optional(db)
38 .await?;
39 match row {
40 Some(r) => {
41 let email_verified: bool = r.get("email_verified");
42 let discord_verified: bool = r.get("discord_verified");
43 let telegram_verified: bool = r.get("telegram_verified");
44 let signal_verified: bool = r.get("signal_verified");
45 Ok(email_verified || discord_verified || telegram_verified || signal_verified)
46 }
47 None => Ok(false),
48 }
49}
50
51pub struct RepoWriteAuth {
52 pub did: Did,
53 pub user_id: Uuid,
54 pub current_root_cid: Cid,
55 pub is_oauth: bool,
56 pub scope: Option<String>,
57 pub controller_did: Option<Did>,
58}
59
60pub async fn prepare_repo_write(
61 state: &AppState,
62 headers: &HeaderMap,
63 repo_did: &str,
64 http_method: &str,
65 http_uri: &str,
66) -> Result<RepoWriteAuth, Response> {
67 let extracted = crate::auth::extract_auth_token_from_header(
68 headers.get("Authorization").and_then(|h| h.to_str().ok()),
69 )
70 .ok_or_else(|| ApiError::AuthenticationRequired.into_response())?;
71 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
72 let auth_user = crate::auth::validate_token_with_dpop(
73 &state.db,
74 &extracted.token,
75 extracted.is_dpop,
76 dpop_proof,
77 http_method,
78 http_uri,
79 false,
80 )
81 .await
82 .map_err(|e| {
83 tracing::warn!(error = ?e, is_dpop = extracted.is_dpop, "Token validation failed in prepare_repo_write");
84 let mut response = ApiError::from(e).into_response();
85 if matches!(e, crate::auth::TokenValidationError::TokenExpired) {
86 let scheme = if extracted.is_dpop { "DPoP" } else { "Bearer" };
87 let www_auth = format!(
88 "{} error=\"invalid_token\", error_description=\"Token has expired\"",
89 scheme
90 );
91 response.headers_mut().insert(
92 "WWW-Authenticate",
93 www_auth.parse().unwrap(),
94 );
95 if extracted.is_dpop {
96 let nonce = crate::oauth::verify::generate_dpop_nonce();
97 response.headers_mut().insert("DPoP-Nonce", nonce.parse().unwrap());
98 }
99 }
100 response
101 })?;
102 if repo_did != auth_user.did {
103 return Err(
104 ApiError::InvalidRepo("Repo does not match authenticated user".into()).into_response(),
105 );
106 }
107 if crate::util::is_account_migrated(&state.db, &auth_user.did)
108 .await
109 .unwrap_or(false)
110 {
111 return Err(ApiError::AccountMigrated.into_response());
112 }
113 let is_verified = has_verified_comms_channel(&state.db, &auth_user.did)
114 .await
115 .unwrap_or(false);
116 let is_delegated = crate::delegation::is_delegated_account(&state.db, &auth_user.did)
117 .await
118 .unwrap_or(false);
119 if !is_verified && !is_delegated {
120 return Err(ApiError::AccountNotVerified.into_response());
121 }
122 let user_id = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", &auth_user.did)
123 .fetch_optional(&state.db)
124 .await
125 .map_err(|e| {
126 error!("DB error fetching user: {}", e);
127 ApiError::InternalError(None).into_response()
128 })?
129 .ok_or_else(|| ApiError::InternalError(Some("User not found".into())).into_response())?;
130 let root_cid_str: String = sqlx::query_scalar!(
131 "SELECT repo_root_cid FROM repos WHERE user_id = $1",
132 user_id
133 )
134 .fetch_optional(&state.db)
135 .await
136 .map_err(|e| {
137 error!("DB error fetching repo root: {}", e);
138 ApiError::InternalError(None).into_response()
139 })?
140 .ok_or_else(|| ApiError::InternalError(Some("Repo root not found".into())).into_response())?;
141 let current_root_cid = Cid::from_str(&root_cid_str)
142 .map_err(|_| ApiError::InternalError(Some("Invalid repo root CID".into())).into_response())?;
143 Ok(RepoWriteAuth {
144 did: auth_user.did.clone(),
145 user_id,
146 current_root_cid,
147 is_oauth: auth_user.is_oauth,
148 scope: auth_user.scope,
149 controller_did: auth_user.controller_did.clone(),
150 })
151}
152#[derive(Deserialize)]
153#[allow(dead_code)]
154pub struct CreateRecordInput {
155 pub repo: AtIdentifier,
156 pub collection: Nsid,
157 pub rkey: Option<Rkey>,
158 pub validate: Option<bool>,
159 pub record: serde_json::Value,
160 #[serde(rename = "swapCommit")]
161 pub swap_commit: Option<String>,
162}
163#[derive(Serialize)]
164#[serde(rename_all = "camelCase")]
165pub struct CommitInfo {
166 pub cid: String,
167 pub rev: String,
168}
169
170#[derive(Serialize)]
171#[serde(rename_all = "camelCase")]
172pub struct CreateRecordOutput {
173 pub uri: AtUri,
174 pub cid: String,
175 pub commit: CommitInfo,
176 #[serde(skip_serializing_if = "Option::is_none")]
177 pub validation_status: Option<String>,
178}
179pub async fn create_record(
180 State(state): State<AppState>,
181 headers: HeaderMap,
182 axum::extract::OriginalUri(uri): axum::extract::OriginalUri,
183 Json(input): Json<CreateRecordInput>,
184) -> Response {
185 let auth = match prepare_repo_write(
186 &state,
187 &headers,
188 &input.repo,
189 "POST",
190 &crate::util::build_full_url(&uri.to_string()),
191 )
192 .await
193 {
194 Ok(res) => res,
195 Err(err_res) => return err_res,
196 };
197
198 if let Err(e) = crate::auth::scope_check::check_repo_scope(
199 auth.is_oauth,
200 auth.scope.as_deref(),
201 crate::oauth::RepoAction::Create,
202 &input.collection,
203 ) {
204 return e;
205 }
206
207 let did = auth.did;
208 let user_id = auth.user_id;
209 let current_root_cid = auth.current_root_cid;
210 let controller_did = auth.controller_did;
211
212 if let Some(swap_commit) = &input.swap_commit
213 && Cid::from_str(swap_commit).ok() != Some(current_root_cid)
214 {
215 return ApiError::InvalidSwap(Some("Repo has been modified".into())).into_response();
216 }
217 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
218 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
219 Ok(Some(b)) => b,
220 _ => return ApiError::InternalError(Some("Commit block not found".into())).into_response(),
221 };
222 let commit = match Commit::from_cbor(&commit_bytes) {
223 Ok(c) => c,
224 _ => return ApiError::InternalError(Some("Failed to parse commit".into())).into_response(),
225 };
226 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
227 let validation_status = if input.validate == Some(false) {
228 None
229 } else {
230 let require_lexicon = input.validate == Some(true);
231 match validate_record_with_status(
232 &input.record,
233 &input.collection,
234 input.rkey.as_ref().map(|r| r.as_str()),
235 require_lexicon,
236 ) {
237 Ok(status) => Some(status),
238 Err(err_response) => return *err_response,
239 }
240 };
241 let rkey = input.rkey.unwrap_or_else(Rkey::generate);
242 let record_ipld = crate::util::json_to_ipld(&input.record);
243 let mut record_bytes = Vec::new();
244 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld).is_err() {
245 return ApiError::InvalidRecord("Failed to serialize record".into()).into_response();
246 }
247 let record_cid = match tracking_store.put(&record_bytes).await {
248 Ok(c) => c,
249 _ => {
250 return ApiError::InternalError(Some("Failed to save record block".into())).into_response()
251 }
252 };
253 let key = format!("{}/{}", input.collection, rkey);
254 let new_mst = match mst.add(&key, record_cid).await {
255 Ok(m) => m,
256 _ => return ApiError::InternalError(Some("Failed to add to MST".into())).into_response(),
257 };
258 let new_mst_root = match new_mst.persist().await {
259 Ok(c) => c,
260 _ => return ApiError::InternalError(Some("Failed to persist MST".into())).into_response(),
261 };
262 let op = RecordOp::Create {
263 collection: input.collection.to_string(),
264 rkey: rkey.to_string(),
265 cid: record_cid,
266 };
267 let mut relevant_blocks = std::collections::BTreeMap::new();
268 if new_mst
269 .blocks_for_path(&key, &mut relevant_blocks)
270 .await
271 .is_err()
272 {
273 return ApiError::InternalError(Some("Failed to get new MST blocks for path".into()))
274 .into_response();
275 }
276 if mst
277 .blocks_for_path(&key, &mut relevant_blocks)
278 .await
279 .is_err()
280 {
281 return ApiError::InternalError(Some("Failed to get old MST blocks for path".into()))
282 .into_response();
283 }
284 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
285 let mut written_cids = tracking_store.get_all_relevant_cids();
286 for cid in relevant_blocks.keys() {
287 if !written_cids.contains(cid) {
288 written_cids.push(*cid);
289 }
290 }
291 let written_cids_str = written_cids
292 .iter()
293 .map(|c| c.to_string())
294 .collect::<Vec<_>>();
295 let blob_cids = extract_blob_cids(&input.record);
296 let commit_result = match commit_and_log(
297 &state,
298 CommitParams {
299 did: &did,
300 user_id,
301 current_root_cid: Some(current_root_cid),
302 prev_data_cid: Some(commit.data),
303 new_mst_root,
304 ops: vec![op],
305 blocks_cids: &written_cids_str,
306 blobs: &blob_cids,
307 },
308 )
309 .await
310 {
311 Ok(res) => res,
312 Err(e) => return ApiError::InternalError(Some(e)).into_response(),
313 };
314
315 if let Some(ref controller) = controller_did {
316 let _ = delegation::log_delegation_action(
317 &state.db,
318 &did,
319 controller,
320 Some(controller),
321 DelegationActionType::RepoWrite,
322 Some(json!({
323 "action": "create",
324 "collection": input.collection,
325 "rkey": rkey
326 })),
327 None,
328 None,
329 )
330 .await;
331 }
332
333 (
334 StatusCode::OK,
335 Json(CreateRecordOutput {
336 uri: AtUri::from_parts(&did, &input.collection, &rkey),
337 cid: record_cid.to_string(),
338 commit: CommitInfo {
339 cid: commit_result.commit_cid.to_string(),
340 rev: commit_result.rev,
341 },
342 validation_status: validation_status.map(|s| s.to_string()),
343 }),
344 )
345 .into_response()
346}
347#[derive(Deserialize)]
348#[allow(dead_code)]
349pub struct PutRecordInput {
350 pub repo: AtIdentifier,
351 pub collection: Nsid,
352 pub rkey: Rkey,
353 pub validate: Option<bool>,
354 pub record: serde_json::Value,
355 #[serde(rename = "swapCommit")]
356 pub swap_commit: Option<String>,
357 #[serde(rename = "swapRecord")]
358 pub swap_record: Option<String>,
359}
360#[derive(Serialize)]
361#[serde(rename_all = "camelCase")]
362pub struct PutRecordOutput {
363 pub uri: AtUri,
364 pub cid: String,
365 #[serde(skip_serializing_if = "Option::is_none")]
366 pub commit: Option<CommitInfo>,
367 #[serde(skip_serializing_if = "Option::is_none")]
368 pub validation_status: Option<String>,
369}
370pub async fn put_record(
371 State(state): State<AppState>,
372 headers: HeaderMap,
373 axum::extract::OriginalUri(uri): axum::extract::OriginalUri,
374 Json(input): Json<PutRecordInput>,
375) -> Response {
376 let auth = match prepare_repo_write(
377 &state,
378 &headers,
379 &input.repo,
380 "POST",
381 &crate::util::build_full_url(&uri.to_string()),
382 )
383 .await
384 {
385 Ok(res) => res,
386 Err(err_res) => return err_res,
387 };
388
389 if let Err(e) = crate::auth::scope_check::check_repo_scope(
390 auth.is_oauth,
391 auth.scope.as_deref(),
392 crate::oauth::RepoAction::Create,
393 &input.collection,
394 ) {
395 return e;
396 }
397 if let Err(e) = crate::auth::scope_check::check_repo_scope(
398 auth.is_oauth,
399 auth.scope.as_deref(),
400 crate::oauth::RepoAction::Update,
401 &input.collection,
402 ) {
403 return e;
404 }
405
406 let did = auth.did;
407 let user_id = auth.user_id;
408 let current_root_cid = auth.current_root_cid;
409 let controller_did = auth.controller_did;
410
411 if let Some(swap_commit) = &input.swap_commit
412 && Cid::from_str(swap_commit).ok() != Some(current_root_cid)
413 {
414 return ApiError::InvalidSwap(Some("Repo has been modified".into())).into_response();
415 }
416 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
417 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
418 Ok(Some(b)) => b,
419 _ => return ApiError::InternalError(Some("Commit block not found".into())).into_response(),
420 };
421 let commit = match Commit::from_cbor(&commit_bytes) {
422 Ok(c) => c,
423 _ => return ApiError::InternalError(Some("Failed to parse commit".into())).into_response(),
424 };
425 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
426 let key = format!("{}/{}", input.collection, input.rkey);
427 let validation_status = if input.validate == Some(false) {
428 None
429 } else {
430 let require_lexicon = input.validate == Some(true);
431 match validate_record_with_status(
432 &input.record,
433 &input.collection,
434 Some(input.rkey.as_str()),
435 require_lexicon,
436 ) {
437 Ok(status) => Some(status),
438 Err(err_response) => return *err_response,
439 }
440 };
441 if let Some(swap_record_str) = &input.swap_record {
442 let expected_cid = Cid::from_str(swap_record_str).ok();
443 let actual_cid = mst.get(&key).await.ok().flatten();
444 if expected_cid != actual_cid {
445 return ApiError::InvalidSwap(Some("Record has been modified or does not exist".into()))
446 .into_response();
447 }
448 }
449 let existing_cid = mst.get(&key).await.ok().flatten();
450 let record_ipld = crate::util::json_to_ipld(&input.record);
451 let mut record_bytes = Vec::new();
452 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld).is_err() {
453 return ApiError::InvalidRecord("Failed to serialize record".into()).into_response();
454 }
455 let record_cid = match tracking_store.put(&record_bytes).await {
456 Ok(c) => c,
457 _ => {
458 return ApiError::InternalError(Some("Failed to save record block".into())).into_response()
459 }
460 };
461 if existing_cid == Some(record_cid) {
462 return (
463 StatusCode::OK,
464 Json(PutRecordOutput {
465 uri: AtUri::from_parts(&did, &input.collection, &input.rkey),
466 cid: record_cid.to_string(),
467 commit: None,
468 validation_status: validation_status.map(|s| s.to_string()),
469 }),
470 )
471 .into_response();
472 }
473 let new_mst = if existing_cid.is_some() {
474 match mst.update(&key, record_cid).await {
475 Ok(m) => m,
476 Err(_) => {
477 return ApiError::InternalError(Some("Failed to update MST".into())).into_response()
478 }
479 }
480 } else {
481 match mst.add(&key, record_cid).await {
482 Ok(m) => m,
483 Err(_) => {
484 return ApiError::InternalError(Some("Failed to add to MST".into())).into_response()
485 }
486 }
487 };
488 let new_mst_root = match new_mst.persist().await {
489 Ok(c) => c,
490 Err(_) => {
491 return ApiError::InternalError(Some("Failed to persist MST".into())).into_response()
492 }
493 };
494 let op = if existing_cid.is_some() {
495 RecordOp::Update {
496 collection: input.collection.to_string(),
497 rkey: input.rkey.to_string(),
498 cid: record_cid,
499 prev: existing_cid,
500 }
501 } else {
502 RecordOp::Create {
503 collection: input.collection.to_string(),
504 rkey: input.rkey.to_string(),
505 cid: record_cid,
506 }
507 };
508 let mut relevant_blocks = std::collections::BTreeMap::new();
509 if new_mst
510 .blocks_for_path(&key, &mut relevant_blocks)
511 .await
512 .is_err()
513 {
514 return ApiError::InternalError(Some("Failed to get new MST blocks for path".into()))
515 .into_response();
516 }
517 if mst
518 .blocks_for_path(&key, &mut relevant_blocks)
519 .await
520 .is_err()
521 {
522 return ApiError::InternalError(Some("Failed to get old MST blocks for path".into()))
523 .into_response();
524 }
525 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
526 let mut written_cids = tracking_store.get_all_relevant_cids();
527 for cid in relevant_blocks.keys() {
528 if !written_cids.contains(cid) {
529 written_cids.push(*cid);
530 }
531 }
532 let written_cids_str = written_cids
533 .iter()
534 .map(|c| c.to_string())
535 .collect::<Vec<_>>();
536 let is_update = existing_cid.is_some();
537 let blob_cids = extract_blob_cids(&input.record);
538 let commit_result = match commit_and_log(
539 &state,
540 CommitParams {
541 did: &did,
542 user_id,
543 current_root_cid: Some(current_root_cid),
544 prev_data_cid: Some(commit.data),
545 new_mst_root,
546 ops: vec![op],
547 blocks_cids: &written_cids_str,
548 blobs: &blob_cids,
549 },
550 )
551 .await
552 {
553 Ok(res) => res,
554 Err(e) => return ApiError::InternalError(Some(e)).into_response(),
555 };
556
557 if let Some(ref controller) = controller_did {
558 let _ = delegation::log_delegation_action(
559 &state.db,
560 &did,
561 controller,
562 Some(controller),
563 DelegationActionType::RepoWrite,
564 Some(json!({
565 "action": if is_update { "update" } else { "create" },
566 "collection": input.collection,
567 "rkey": input.rkey
568 })),
569 None,
570 None,
571 )
572 .await;
573 }
574
575 (
576 StatusCode::OK,
577 Json(PutRecordOutput {
578 uri: AtUri::from_parts(&did, &input.collection, &input.rkey),
579 cid: record_cid.to_string(),
580 commit: Some(CommitInfo {
581 cid: commit_result.commit_cid.to_string(),
582 rev: commit_result.rev,
583 }),
584 validation_status: validation_status.map(|s| s.to_string()),
585 }),
586 )
587 .into_response()
588}