this repo has no description
1use super::validation::validate_record_with_status;
2use crate::api::error::ApiError;
3use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log, extract_blob_cids};
4use crate::delegation::{self, DelegationActionType};
5use crate::repo::tracking::TrackingBlockStore;
6use crate::state::AppState;
7use crate::types::{AtIdentifier, AtUri, Did, Nsid, Rkey};
8use axum::{
9 Json,
10 extract::State,
11 http::{HeaderMap, StatusCode},
12 response::{IntoResponse, Response},
13};
14use cid::Cid;
15use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
16use serde::{Deserialize, Serialize};
17use serde_json::json;
18use sqlx::{PgPool, Row};
19use std::str::FromStr;
20use std::sync::Arc;
21use tracing::error;
22use uuid::Uuid;
23
24pub async fn has_verified_comms_channel(db: &PgPool, did: &str) -> Result<bool, sqlx::Error> {
25 let row = sqlx::query(
26 r#"
27 SELECT
28 email_verified,
29 discord_verified,
30 telegram_verified,
31 signal_verified
32 FROM users
33 WHERE did = $1
34 "#,
35 )
36 .bind(did)
37 .fetch_optional(db)
38 .await?;
39 match row {
40 Some(r) => {
41 let email_verified: bool = r.get("email_verified");
42 let discord_verified: bool = r.get("discord_verified");
43 let telegram_verified: bool = r.get("telegram_verified");
44 let signal_verified: bool = r.get("signal_verified");
45 Ok(email_verified || discord_verified || telegram_verified || signal_verified)
46 }
47 None => Ok(false),
48 }
49}
50
/// Result of a successful [`prepare_repo_write`] call: the authenticated
/// identity plus the repository state a write handler needs to proceed.
pub struct RepoWriteAuth {
    /// DID of the authenticated user (already checked to equal the requested repo).
    pub did: Did,
    /// `users.id` of the row whose `did` matches the authenticated user.
    pub user_id: Uuid,
    /// Repository root CID, parsed from `repos.repo_root_cid` for `user_id`.
    pub current_root_cid: Cid,
    /// Copied from the validated token's `is_oauth` flag.
    pub is_oauth: bool,
    /// Scope carried by the validated token, if any; handlers pass it to the
    /// repo scope check.
    pub scope: Option<String>,
    /// Controller DID carried by the validated token, if any; when present the
    /// handlers record a delegation action after the write.
    pub controller_did: Option<Did>,
}
59
60pub async fn prepare_repo_write(
61 state: &AppState,
62 headers: &HeaderMap,
63 repo_did: &str,
64 http_method: &str,
65 http_uri: &str,
66) -> Result<RepoWriteAuth, Response> {
67 let extracted = crate::auth::extract_auth_token_from_header(
68 headers.get("Authorization").and_then(|h| h.to_str().ok()),
69 )
70 .ok_or_else(|| ApiError::AuthenticationRequired.into_response())?;
71 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
72 let auth_user = crate::auth::validate_token_with_dpop(
73 &state.db,
74 &extracted.token,
75 extracted.is_dpop,
76 dpop_proof,
77 http_method,
78 http_uri,
79 false,
80 false,
81 )
82 .await
83 .map_err(|e| {
84 tracing::warn!(error = ?e, is_dpop = extracted.is_dpop, "Token validation failed in prepare_repo_write");
85 let mut response = ApiError::from(e).into_response();
86 if matches!(e, crate::auth::TokenValidationError::TokenExpired) && extracted.is_dpop {
87 *response.status_mut() = axum::http::StatusCode::UNAUTHORIZED;
88 let www_auth =
89 "DPoP error=\"invalid_token\", error_description=\"Token has expired\"";
90 response.headers_mut().insert(
91 "WWW-Authenticate",
92 www_auth.parse().unwrap(),
93 );
94 let nonce = crate::oauth::verify::generate_dpop_nonce();
95 response.headers_mut().insert("DPoP-Nonce", nonce.parse().unwrap());
96 }
97 response
98 })?;
99 if repo_did != auth_user.did {
100 return Err(
101 ApiError::InvalidRepo("Repo does not match authenticated user".into()).into_response(),
102 );
103 }
104 if crate::util::is_account_migrated(&state.db, &auth_user.did)
105 .await
106 .unwrap_or(false)
107 {
108 return Err(ApiError::AccountMigrated.into_response());
109 }
110 let is_verified = has_verified_comms_channel(&state.db, &auth_user.did)
111 .await
112 .unwrap_or(false);
113 let is_delegated = crate::delegation::is_delegated_account(&state.db, &auth_user.did)
114 .await
115 .unwrap_or(false);
116 if !is_verified && !is_delegated {
117 return Err(ApiError::AccountNotVerified.into_response());
118 }
119 let user_id = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", &auth_user.did)
120 .fetch_optional(&state.db)
121 .await
122 .map_err(|e| {
123 error!("DB error fetching user: {}", e);
124 ApiError::InternalError(None).into_response()
125 })?
126 .ok_or_else(|| ApiError::InternalError(Some("User not found".into())).into_response())?;
127 let root_cid_str: String = sqlx::query_scalar!(
128 "SELECT repo_root_cid FROM repos WHERE user_id = $1",
129 user_id
130 )
131 .fetch_optional(&state.db)
132 .await
133 .map_err(|e| {
134 error!("DB error fetching repo root: {}", e);
135 ApiError::InternalError(None).into_response()
136 })?
137 .ok_or_else(|| ApiError::InternalError(Some("Repo root not found".into())).into_response())?;
138 let current_root_cid = Cid::from_str(&root_cid_str).map_err(|_| {
139 ApiError::InternalError(Some("Invalid repo root CID".into())).into_response()
140 })?;
141 Ok(RepoWriteAuth {
142 did: auth_user.did.clone(),
143 user_id,
144 current_root_cid,
145 is_oauth: auth_user.is_oauth,
146 scope: auth_user.scope,
147 controller_did: auth_user.controller_did.clone(),
148 })
149}
150#[derive(Deserialize)]
151#[allow(dead_code)]
152pub struct CreateRecordInput {
153 pub repo: AtIdentifier,
154 pub collection: Nsid,
155 pub rkey: Option<Rkey>,
156 pub validate: Option<bool>,
157 pub record: serde_json::Value,
158 #[serde(rename = "swapCommit")]
159 pub swap_commit: Option<String>,
160}
/// Identifies the commit produced by a write, as returned to the client.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CommitInfo {
    /// String form of the new commit's CID.
    pub cid: String,
    /// Revision reported for the new commit.
    pub rev: String,
}
167
/// JSON response body produced by [`create_record`]; serialized in camelCase.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CreateRecordOutput {
    /// AT-URI built from the repo DID, collection and record key.
    pub uri: AtUri,
    /// String form of the stored record block's CID.
    pub cid: String,
    /// The commit that included the new record.
    pub commit: CommitInfo,
    /// Validation outcome; left out of the JSON when validation was skipped.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub validation_status: Option<String>,
}
177pub async fn create_record(
178 State(state): State<AppState>,
179 headers: HeaderMap,
180 axum::extract::OriginalUri(uri): axum::extract::OriginalUri,
181 Json(input): Json<CreateRecordInput>,
182) -> Response {
183 let auth = match prepare_repo_write(
184 &state,
185 &headers,
186 &input.repo,
187 "POST",
188 &crate::util::build_full_url(&uri.to_string()),
189 )
190 .await
191 {
192 Ok(res) => res,
193 Err(err_res) => return err_res,
194 };
195
196 if let Err(e) = crate::auth::scope_check::check_repo_scope(
197 auth.is_oauth,
198 auth.scope.as_deref(),
199 crate::oauth::RepoAction::Create,
200 &input.collection,
201 ) {
202 return e;
203 }
204
205 let did = auth.did;
206 let user_id = auth.user_id;
207 let current_root_cid = auth.current_root_cid;
208 let controller_did = auth.controller_did;
209
210 if let Some(swap_commit) = &input.swap_commit
211 && Cid::from_str(swap_commit).ok() != Some(current_root_cid)
212 {
213 return ApiError::InvalidSwap(Some("Repo has been modified".into())).into_response();
214 }
215 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
216 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
217 Ok(Some(b)) => b,
218 _ => return ApiError::InternalError(Some("Commit block not found".into())).into_response(),
219 };
220 let commit = match Commit::from_cbor(&commit_bytes) {
221 Ok(c) => c,
222 _ => return ApiError::InternalError(Some("Failed to parse commit".into())).into_response(),
223 };
224 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
225 let validation_status = if input.validate == Some(false) {
226 None
227 } else {
228 let require_lexicon = input.validate == Some(true);
229 match validate_record_with_status(
230 &input.record,
231 &input.collection,
232 input.rkey.as_ref().map(|r| r.as_str()),
233 require_lexicon,
234 ) {
235 Ok(status) => Some(status),
236 Err(err_response) => return *err_response,
237 }
238 };
239 let rkey = input.rkey.unwrap_or_else(Rkey::generate);
240 let record_ipld = crate::util::json_to_ipld(&input.record);
241 let mut record_bytes = Vec::new();
242 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld).is_err() {
243 return ApiError::InvalidRecord("Failed to serialize record".into()).into_response();
244 }
245 let record_cid = match tracking_store.put(&record_bytes).await {
246 Ok(c) => c,
247 _ => {
248 return ApiError::InternalError(Some("Failed to save record block".into()))
249 .into_response();
250 }
251 };
252 let key = format!("{}/{}", input.collection, rkey);
253 let new_mst = match mst.add(&key, record_cid).await {
254 Ok(m) => m,
255 _ => return ApiError::InternalError(Some("Failed to add to MST".into())).into_response(),
256 };
257 let new_mst_root = match new_mst.persist().await {
258 Ok(c) => c,
259 _ => return ApiError::InternalError(Some("Failed to persist MST".into())).into_response(),
260 };
261 let op = RecordOp::Create {
262 collection: input.collection.to_string(),
263 rkey: rkey.to_string(),
264 cid: record_cid,
265 };
266 let mut new_mst_blocks = std::collections::BTreeMap::new();
267 let mut old_mst_blocks = std::collections::BTreeMap::new();
268 if new_mst
269 .blocks_for_path(&key, &mut new_mst_blocks)
270 .await
271 .is_err()
272 {
273 return ApiError::InternalError(Some("Failed to get new MST blocks for path".into()))
274 .into_response();
275 }
276 if mst
277 .blocks_for_path(&key, &mut old_mst_blocks)
278 .await
279 .is_err()
280 {
281 return ApiError::InternalError(Some("Failed to get old MST blocks for path".into()))
282 .into_response();
283 }
284 let mut relevant_blocks = new_mst_blocks.clone();
285 relevant_blocks.extend(old_mst_blocks.iter().map(|(k, v)| (*k, v.clone())));
286 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
287 let written_cids: Vec<Cid> = tracking_store
288 .get_all_relevant_cids()
289 .into_iter()
290 .chain(relevant_blocks.keys().copied())
291 .collect::<std::collections::HashSet<_>>()
292 .into_iter()
293 .collect();
294 let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect();
295 let blob_cids = extract_blob_cids(&input.record);
296 let obsolete_cids: Vec<Cid> = std::iter::once(current_root_cid)
297 .chain(
298 old_mst_blocks
299 .keys()
300 .filter(|cid| !new_mst_blocks.contains_key(*cid))
301 .copied(),
302 )
303 .collect();
304 let commit_result = match commit_and_log(
305 &state,
306 CommitParams {
307 did: &did,
308 user_id,
309 current_root_cid: Some(current_root_cid),
310 prev_data_cid: Some(commit.data),
311 new_mst_root,
312 ops: vec![op],
313 blocks_cids: &written_cids_str,
314 blobs: &blob_cids,
315 obsolete_cids,
316 },
317 )
318 .await
319 {
320 Ok(res) => res,
321 Err(e) if e.contains("ConcurrentModification") => {
322 return ApiError::InvalidSwap(Some("Repo has been modified".into())).into_response();
323 }
324 Err(e) => return ApiError::InternalError(Some(e)).into_response(),
325 };
326
327 if let Some(ref controller) = controller_did {
328 let _ = delegation::log_delegation_action(
329 &state.db,
330 &did,
331 controller,
332 Some(controller),
333 DelegationActionType::RepoWrite,
334 Some(json!({
335 "action": "create",
336 "collection": input.collection,
337 "rkey": rkey
338 })),
339 None,
340 None,
341 )
342 .await;
343 }
344
345 (
346 StatusCode::OK,
347 Json(CreateRecordOutput {
348 uri: AtUri::from_parts(&did, &input.collection, &rkey),
349 cid: record_cid.to_string(),
350 commit: CommitInfo {
351 cid: commit_result.commit_cid.to_string(),
352 rev: commit_result.rev,
353 },
354 validation_status: validation_status.map(|s| s.to_string()),
355 }),
356 )
357 .into_response()
358}
359#[derive(Deserialize)]
360#[allow(dead_code)]
361pub struct PutRecordInput {
362 pub repo: AtIdentifier,
363 pub collection: Nsid,
364 pub rkey: Rkey,
365 pub validate: Option<bool>,
366 pub record: serde_json::Value,
367 #[serde(rename = "swapCommit")]
368 pub swap_commit: Option<String>,
369 #[serde(rename = "swapRecord")]
370 pub swap_record: Option<String>,
371}
/// JSON response body produced by [`put_record`]; serialized in camelCase.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct PutRecordOutput {
    /// AT-URI built from the repo DID, collection and record key.
    pub uri: AtUri,
    /// String form of the stored record block's CID.
    pub cid: String,
    /// The commit that applied the write; `None` (and omitted from the JSON)
    /// when the stored record already had this CID and nothing was committed.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub commit: Option<CommitInfo>,
    /// Validation outcome; left out of the JSON when validation was skipped.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub validation_status: Option<String>,
}
382pub async fn put_record(
383 State(state): State<AppState>,
384 headers: HeaderMap,
385 axum::extract::OriginalUri(uri): axum::extract::OriginalUri,
386 Json(input): Json<PutRecordInput>,
387) -> Response {
388 let auth = match prepare_repo_write(
389 &state,
390 &headers,
391 &input.repo,
392 "POST",
393 &crate::util::build_full_url(&uri.to_string()),
394 )
395 .await
396 {
397 Ok(res) => res,
398 Err(err_res) => return err_res,
399 };
400
401 if let Err(e) = crate::auth::scope_check::check_repo_scope(
402 auth.is_oauth,
403 auth.scope.as_deref(),
404 crate::oauth::RepoAction::Create,
405 &input.collection,
406 ) {
407 return e;
408 }
409 if let Err(e) = crate::auth::scope_check::check_repo_scope(
410 auth.is_oauth,
411 auth.scope.as_deref(),
412 crate::oauth::RepoAction::Update,
413 &input.collection,
414 ) {
415 return e;
416 }
417
418 let did = auth.did;
419 let user_id = auth.user_id;
420 let current_root_cid = auth.current_root_cid;
421 let controller_did = auth.controller_did;
422
423 if let Some(swap_commit) = &input.swap_commit
424 && Cid::from_str(swap_commit).ok() != Some(current_root_cid)
425 {
426 return ApiError::InvalidSwap(Some("Repo has been modified".into())).into_response();
427 }
428 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
429 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
430 Ok(Some(b)) => b,
431 _ => return ApiError::InternalError(Some("Commit block not found".into())).into_response(),
432 };
433 let commit = match Commit::from_cbor(&commit_bytes) {
434 Ok(c) => c,
435 _ => return ApiError::InternalError(Some("Failed to parse commit".into())).into_response(),
436 };
437 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
438 let key = format!("{}/{}", input.collection, input.rkey);
439 let validation_status = if input.validate == Some(false) {
440 None
441 } else {
442 let require_lexicon = input.validate == Some(true);
443 match validate_record_with_status(
444 &input.record,
445 &input.collection,
446 Some(input.rkey.as_str()),
447 require_lexicon,
448 ) {
449 Ok(status) => Some(status),
450 Err(err_response) => return *err_response,
451 }
452 };
453 if let Some(swap_record_str) = &input.swap_record {
454 let expected_cid = Cid::from_str(swap_record_str).ok();
455 let actual_cid = mst.get(&key).await.ok().flatten();
456 if expected_cid != actual_cid {
457 return ApiError::InvalidSwap(Some(
458 "Record has been modified or does not exist".into(),
459 ))
460 .into_response();
461 }
462 }
463 let existing_cid = mst.get(&key).await.ok().flatten();
464 let record_ipld = crate::util::json_to_ipld(&input.record);
465 let mut record_bytes = Vec::new();
466 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld).is_err() {
467 return ApiError::InvalidRecord("Failed to serialize record".into()).into_response();
468 }
469 let record_cid = match tracking_store.put(&record_bytes).await {
470 Ok(c) => c,
471 _ => {
472 return ApiError::InternalError(Some("Failed to save record block".into()))
473 .into_response();
474 }
475 };
476 if existing_cid == Some(record_cid) {
477 return (
478 StatusCode::OK,
479 Json(PutRecordOutput {
480 uri: AtUri::from_parts(&did, &input.collection, &input.rkey),
481 cid: record_cid.to_string(),
482 commit: None,
483 validation_status: validation_status.map(|s| s.to_string()),
484 }),
485 )
486 .into_response();
487 }
488 let new_mst = if existing_cid.is_some() {
489 match mst.update(&key, record_cid).await {
490 Ok(m) => m,
491 Err(_) => {
492 return ApiError::InternalError(Some("Failed to update MST".into()))
493 .into_response();
494 }
495 }
496 } else {
497 match mst.add(&key, record_cid).await {
498 Ok(m) => m,
499 Err(_) => {
500 return ApiError::InternalError(Some("Failed to add to MST".into()))
501 .into_response();
502 }
503 }
504 };
505 let new_mst_root = match new_mst.persist().await {
506 Ok(c) => c,
507 Err(_) => {
508 return ApiError::InternalError(Some("Failed to persist MST".into())).into_response();
509 }
510 };
511 let op = if existing_cid.is_some() {
512 RecordOp::Update {
513 collection: input.collection.to_string(),
514 rkey: input.rkey.to_string(),
515 cid: record_cid,
516 prev: existing_cid,
517 }
518 } else {
519 RecordOp::Create {
520 collection: input.collection.to_string(),
521 rkey: input.rkey.to_string(),
522 cid: record_cid,
523 }
524 };
525 let mut new_mst_blocks = std::collections::BTreeMap::new();
526 let mut old_mst_blocks = std::collections::BTreeMap::new();
527 if new_mst
528 .blocks_for_path(&key, &mut new_mst_blocks)
529 .await
530 .is_err()
531 {
532 return ApiError::InternalError(Some("Failed to get new MST blocks for path".into()))
533 .into_response();
534 }
535 if mst
536 .blocks_for_path(&key, &mut old_mst_blocks)
537 .await
538 .is_err()
539 {
540 return ApiError::InternalError(Some("Failed to get old MST blocks for path".into()))
541 .into_response();
542 }
543 let mut relevant_blocks = new_mst_blocks.clone();
544 relevant_blocks.extend(old_mst_blocks.iter().map(|(k, v)| (*k, v.clone())));
545 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
546 let written_cids: Vec<Cid> = tracking_store
547 .get_all_relevant_cids()
548 .into_iter()
549 .chain(relevant_blocks.keys().copied())
550 .collect::<std::collections::HashSet<_>>()
551 .into_iter()
552 .collect();
553 let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect();
554 let is_update = existing_cid.is_some();
555 let blob_cids = extract_blob_cids(&input.record);
556 let obsolete_cids: Vec<Cid> = std::iter::once(current_root_cid)
557 .chain(
558 old_mst_blocks
559 .keys()
560 .filter(|cid| !new_mst_blocks.contains_key(*cid))
561 .copied(),
562 )
563 .chain(existing_cid)
564 .collect();
565 let commit_result = match commit_and_log(
566 &state,
567 CommitParams {
568 did: &did,
569 user_id,
570 current_root_cid: Some(current_root_cid),
571 prev_data_cid: Some(commit.data),
572 new_mst_root,
573 ops: vec![op],
574 blocks_cids: &written_cids_str,
575 blobs: &blob_cids,
576 obsolete_cids,
577 },
578 )
579 .await
580 {
581 Ok(res) => res,
582 Err(e) if e.contains("ConcurrentModification") => {
583 return ApiError::InvalidSwap(Some("Repo has been modified".into())).into_response();
584 }
585 Err(e) => return ApiError::InternalError(Some(e)).into_response(),
586 };
587
588 if let Some(ref controller) = controller_did {
589 let _ = delegation::log_delegation_action(
590 &state.db,
591 &did,
592 controller,
593 Some(controller),
594 DelegationActionType::RepoWrite,
595 Some(json!({
596 "action": if is_update { "update" } else { "create" },
597 "collection": input.collection,
598 "rkey": input.rkey
599 })),
600 None,
601 None,
602 )
603 .await;
604 }
605
606 (
607 StatusCode::OK,
608 Json(PutRecordOutput {
609 uri: AtUri::from_parts(&did, &input.collection, &input.rkey),
610 cid: record_cid.to_string(),
611 commit: Some(CommitInfo {
612 cid: commit_result.commit_cid.to_string(),
613 rev: commit_result.rev,
614 }),
615 validation_status: validation_status.map(|s| s.to_string()),
616 }),
617 )
618 .into_response()
619}