this repo has no description
1use super::validation::validate_record_with_status;
2use crate::api::error::ApiError;
3use crate::api::repo::record::utils::{CommitParams, RecordOp, commit_and_log, extract_blob_cids};
4use crate::delegation::{self, DelegationActionType};
5use crate::repo::tracking::TrackingBlockStore;
6use crate::state::AppState;
7use crate::types::{AtIdentifier, AtUri, Did, Nsid, Rkey};
8use axum::{
9 Json,
10 extract::State,
11 http::{HeaderMap, StatusCode},
12 response::{IntoResponse, Response},
13};
14use cid::Cid;
15use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
16use serde::{Deserialize, Serialize};
17use serde_json::json;
18use sqlx::{PgPool, Row};
19use std::str::FromStr;
20use std::sync::Arc;
21use tracing::error;
22use uuid::Uuid;
23
24pub async fn has_verified_comms_channel(db: &PgPool, did: &Did) -> Result<bool, sqlx::Error> {
25 let row = sqlx::query(
26 r#"
27 SELECT
28 email_verified,
29 discord_verified,
30 telegram_verified,
31 signal_verified
32 FROM users
33 WHERE did = $1
34 "#,
35 )
36 .bind(did.as_str())
37 .fetch_optional(db)
38 .await?;
39 match row {
40 Some(r) => {
41 let email_verified: bool = r.get("email_verified");
42 let discord_verified: bool = r.get("discord_verified");
43 let telegram_verified: bool = r.get("telegram_verified");
44 let signal_verified: bool = r.get("signal_verified");
45 Ok(email_verified || discord_verified || telegram_verified || signal_verified)
46 }
47 None => Ok(false),
48 }
49}
50
51pub struct RepoWriteAuth {
52 pub did: Did,
53 pub user_id: Uuid,
54 pub current_root_cid: Cid,
55 pub is_oauth: bool,
56 pub scope: Option<String>,
57 pub controller_did: Option<Did>,
58}
59
60pub async fn prepare_repo_write(
61 state: &AppState,
62 headers: &HeaderMap,
63 repo: &AtIdentifier,
64 http_method: &str,
65 http_uri: &str,
66) -> Result<RepoWriteAuth, Response> {
67 let extracted = crate::auth::extract_auth_token_from_header(
68 headers.get("Authorization").and_then(|h| h.to_str().ok()),
69 )
70 .ok_or_else(|| ApiError::AuthenticationRequired.into_response())?;
71 let dpop_proof = headers.get("DPoP").and_then(|h| h.to_str().ok());
72 let auth_user = crate::auth::validate_token_with_dpop(
73 &state.db,
74 &extracted.token,
75 extracted.is_dpop,
76 dpop_proof,
77 http_method,
78 http_uri,
79 false,
80 false,
81 )
82 .await
83 .map_err(|e| {
84 tracing::warn!(error = ?e, is_dpop = extracted.is_dpop, "Token validation failed in prepare_repo_write");
85 ApiError::from(e).into_response()
86 })?;
87 if repo.as_str() != auth_user.did.as_str() {
88 return Err(
89 ApiError::InvalidRepo("Repo does not match authenticated user".into()).into_response(),
90 );
91 }
92 if crate::util::is_account_migrated(&state.db, &auth_user.did)
93 .await
94 .unwrap_or(false)
95 {
96 return Err(ApiError::AccountMigrated.into_response());
97 }
98 let is_verified = has_verified_comms_channel(&state.db, &auth_user.did)
99 .await
100 .unwrap_or(false);
101 let is_delegated = crate::delegation::is_delegated_account(&state.db, &auth_user.did)
102 .await
103 .unwrap_or(false);
104 if !is_verified && !is_delegated {
105 return Err(ApiError::AccountNotVerified.into_response());
106 }
107 let user_id = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", &auth_user.did)
108 .fetch_optional(&state.db)
109 .await
110 .map_err(|e| {
111 error!("DB error fetching user: {}", e);
112 ApiError::InternalError(None).into_response()
113 })?
114 .ok_or_else(|| ApiError::InternalError(Some("User not found".into())).into_response())?;
115 let root_cid_str: String = sqlx::query_scalar!(
116 "SELECT repo_root_cid FROM repos WHERE user_id = $1",
117 user_id
118 )
119 .fetch_optional(&state.db)
120 .await
121 .map_err(|e| {
122 error!("DB error fetching repo root: {}", e);
123 ApiError::InternalError(None).into_response()
124 })?
125 .ok_or_else(|| ApiError::InternalError(Some("Repo root not found".into())).into_response())?;
126 let current_root_cid = Cid::from_str(&root_cid_str).map_err(|_| {
127 ApiError::InternalError(Some("Invalid repo root CID".into())).into_response()
128 })?;
129 Ok(RepoWriteAuth {
130 did: auth_user.did.clone(),
131 user_id,
132 current_root_cid,
133 is_oauth: auth_user.is_oauth,
134 scope: auth_user.scope,
135 controller_did: auth_user.controller_did.clone(),
136 })
137}
138#[derive(Deserialize)]
139#[allow(dead_code)]
140pub struct CreateRecordInput {
141 pub repo: AtIdentifier,
142 pub collection: Nsid,
143 pub rkey: Option<Rkey>,
144 pub validate: Option<bool>,
145 pub record: serde_json::Value,
146 #[serde(rename = "swapCommit")]
147 pub swap_commit: Option<String>,
148}
149#[derive(Serialize)]
150#[serde(rename_all = "camelCase")]
151pub struct CommitInfo {
152 pub cid: String,
153 pub rev: String,
154}
155
156#[derive(Serialize)]
157#[serde(rename_all = "camelCase")]
158pub struct CreateRecordOutput {
159 pub uri: AtUri,
160 pub cid: String,
161 pub commit: CommitInfo,
162 #[serde(skip_serializing_if = "Option::is_none")]
163 pub validation_status: Option<String>,
164}
165pub async fn create_record(
166 State(state): State<AppState>,
167 headers: HeaderMap,
168 axum::extract::OriginalUri(uri): axum::extract::OriginalUri,
169 Json(input): Json<CreateRecordInput>,
170) -> Response {
171 let auth = match prepare_repo_write(
172 &state,
173 &headers,
174 &input.repo,
175 "POST",
176 &crate::util::build_full_url(&uri.to_string()),
177 )
178 .await
179 {
180 Ok(res) => res,
181 Err(err_res) => return err_res,
182 };
183
184 if let Err(e) = crate::auth::scope_check::check_repo_scope(
185 auth.is_oauth,
186 auth.scope.as_deref(),
187 crate::oauth::RepoAction::Create,
188 &input.collection,
189 ) {
190 return e;
191 }
192
193 let did = auth.did;
194 let user_id = auth.user_id;
195 let current_root_cid = auth.current_root_cid;
196 let controller_did = auth.controller_did;
197
198 if let Some(swap_commit) = &input.swap_commit
199 && Cid::from_str(swap_commit).ok() != Some(current_root_cid)
200 {
201 return ApiError::InvalidSwap(Some("Repo has been modified".into())).into_response();
202 }
203 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
204 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
205 Ok(Some(b)) => b,
206 _ => return ApiError::InternalError(Some("Commit block not found".into())).into_response(),
207 };
208 let commit = match Commit::from_cbor(&commit_bytes) {
209 Ok(c) => c,
210 _ => return ApiError::InternalError(Some("Failed to parse commit".into())).into_response(),
211 };
212 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
213 let validation_status = if input.validate == Some(false) {
214 None
215 } else {
216 let require_lexicon = input.validate == Some(true);
217 match validate_record_with_status(
218 &input.record,
219 &input.collection,
220 input.rkey.as_ref(),
221 require_lexicon,
222 ) {
223 Ok(status) => Some(status),
224 Err(err_response) => return *err_response,
225 }
226 };
227 let rkey = input.rkey.unwrap_or_else(Rkey::generate);
228 let record_ipld = crate::util::json_to_ipld(&input.record);
229 let mut record_bytes = Vec::new();
230 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld).is_err() {
231 return ApiError::InvalidRecord("Failed to serialize record".into()).into_response();
232 }
233 let record_cid = match tracking_store.put(&record_bytes).await {
234 Ok(c) => c,
235 _ => {
236 return ApiError::InternalError(Some("Failed to save record block".into()))
237 .into_response();
238 }
239 };
240 let key = format!("{}/{}", input.collection, rkey);
241 let new_mst = match mst.add(&key, record_cid).await {
242 Ok(m) => m,
243 _ => return ApiError::InternalError(Some("Failed to add to MST".into())).into_response(),
244 };
245 let new_mst_root = match new_mst.persist().await {
246 Ok(c) => c,
247 _ => return ApiError::InternalError(Some("Failed to persist MST".into())).into_response(),
248 };
249 let op = RecordOp::Create {
250 collection: input.collection.clone(),
251 rkey: rkey.clone(),
252 cid: record_cid,
253 };
254 let mut new_mst_blocks = std::collections::BTreeMap::new();
255 let mut old_mst_blocks = std::collections::BTreeMap::new();
256 if new_mst
257 .blocks_for_path(&key, &mut new_mst_blocks)
258 .await
259 .is_err()
260 {
261 return ApiError::InternalError(Some("Failed to get new MST blocks for path".into()))
262 .into_response();
263 }
264 if mst
265 .blocks_for_path(&key, &mut old_mst_blocks)
266 .await
267 .is_err()
268 {
269 return ApiError::InternalError(Some("Failed to get old MST blocks for path".into()))
270 .into_response();
271 }
272 let mut relevant_blocks = new_mst_blocks.clone();
273 relevant_blocks.extend(old_mst_blocks.iter().map(|(k, v)| (*k, v.clone())));
274 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
275 let written_cids: Vec<Cid> = tracking_store
276 .get_all_relevant_cids()
277 .into_iter()
278 .chain(relevant_blocks.keys().copied())
279 .collect::<std::collections::HashSet<_>>()
280 .into_iter()
281 .collect();
282 let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect();
283 let blob_cids = extract_blob_cids(&input.record);
284 let obsolete_cids: Vec<Cid> = std::iter::once(current_root_cid)
285 .chain(
286 old_mst_blocks
287 .keys()
288 .filter(|cid| !new_mst_blocks.contains_key(*cid))
289 .copied(),
290 )
291 .collect();
292 let commit_result = match commit_and_log(
293 &state,
294 CommitParams {
295 did: &did,
296 user_id,
297 current_root_cid: Some(current_root_cid),
298 prev_data_cid: Some(commit.data),
299 new_mst_root,
300 ops: vec![op],
301 blocks_cids: &written_cids_str,
302 blobs: &blob_cids,
303 obsolete_cids,
304 },
305 )
306 .await
307 {
308 Ok(res) => res,
309 Err(e) if e.contains("ConcurrentModification") => {
310 return ApiError::InvalidSwap(Some("Repo has been modified".into())).into_response();
311 }
312 Err(e) => return ApiError::InternalError(Some(e)).into_response(),
313 };
314
315 if let Some(ref controller) = controller_did {
316 let _ = delegation::log_delegation_action(
317 &state.db,
318 &did,
319 controller,
320 Some(controller),
321 DelegationActionType::RepoWrite,
322 Some(json!({
323 "action": "create",
324 "collection": input.collection,
325 "rkey": rkey
326 })),
327 None,
328 None,
329 )
330 .await;
331 }
332
333 (
334 StatusCode::OK,
335 Json(CreateRecordOutput {
336 uri: AtUri::from_parts(&did, &input.collection, &rkey),
337 cid: record_cid.to_string(),
338 commit: CommitInfo {
339 cid: commit_result.commit_cid.to_string(),
340 rev: commit_result.rev,
341 },
342 validation_status: validation_status.map(|s| s.to_string()),
343 }),
344 )
345 .into_response()
346}
347#[derive(Deserialize)]
348#[allow(dead_code)]
349pub struct PutRecordInput {
350 pub repo: AtIdentifier,
351 pub collection: Nsid,
352 pub rkey: Rkey,
353 pub validate: Option<bool>,
354 pub record: serde_json::Value,
355 #[serde(rename = "swapCommit")]
356 pub swap_commit: Option<String>,
357 #[serde(rename = "swapRecord")]
358 pub swap_record: Option<String>,
359}
360#[derive(Serialize)]
361#[serde(rename_all = "camelCase")]
362pub struct PutRecordOutput {
363 pub uri: AtUri,
364 pub cid: String,
365 #[serde(skip_serializing_if = "Option::is_none")]
366 pub commit: Option<CommitInfo>,
367 #[serde(skip_serializing_if = "Option::is_none")]
368 pub validation_status: Option<String>,
369}
370pub async fn put_record(
371 State(state): State<AppState>,
372 headers: HeaderMap,
373 axum::extract::OriginalUri(uri): axum::extract::OriginalUri,
374 Json(input): Json<PutRecordInput>,
375) -> Response {
376 let auth = match prepare_repo_write(
377 &state,
378 &headers,
379 &input.repo,
380 "POST",
381 &crate::util::build_full_url(&uri.to_string()),
382 )
383 .await
384 {
385 Ok(res) => res,
386 Err(err_res) => return err_res,
387 };
388
389 if let Err(e) = crate::auth::scope_check::check_repo_scope(
390 auth.is_oauth,
391 auth.scope.as_deref(),
392 crate::oauth::RepoAction::Create,
393 &input.collection,
394 ) {
395 return e;
396 }
397 if let Err(e) = crate::auth::scope_check::check_repo_scope(
398 auth.is_oauth,
399 auth.scope.as_deref(),
400 crate::oauth::RepoAction::Update,
401 &input.collection,
402 ) {
403 return e;
404 }
405
406 let did = auth.did;
407 let user_id = auth.user_id;
408 let current_root_cid = auth.current_root_cid;
409 let controller_did = auth.controller_did;
410
411 if let Some(swap_commit) = &input.swap_commit
412 && Cid::from_str(swap_commit).ok() != Some(current_root_cid)
413 {
414 return ApiError::InvalidSwap(Some("Repo has been modified".into())).into_response();
415 }
416 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
417 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
418 Ok(Some(b)) => b,
419 _ => return ApiError::InternalError(Some("Commit block not found".into())).into_response(),
420 };
421 let commit = match Commit::from_cbor(&commit_bytes) {
422 Ok(c) => c,
423 _ => return ApiError::InternalError(Some("Failed to parse commit".into())).into_response(),
424 };
425 let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
426 let key = format!("{}/{}", input.collection, input.rkey);
427 let validation_status = if input.validate == Some(false) {
428 None
429 } else {
430 let require_lexicon = input.validate == Some(true);
431 match validate_record_with_status(
432 &input.record,
433 &input.collection,
434 Some(&input.rkey),
435 require_lexicon,
436 ) {
437 Ok(status) => Some(status),
438 Err(err_response) => return *err_response,
439 }
440 };
441 if let Some(swap_record_str) = &input.swap_record {
442 let expected_cid = Cid::from_str(swap_record_str).ok();
443 let actual_cid = mst.get(&key).await.ok().flatten();
444 if expected_cid != actual_cid {
445 return ApiError::InvalidSwap(Some(
446 "Record has been modified or does not exist".into(),
447 ))
448 .into_response();
449 }
450 }
451 let existing_cid = mst.get(&key).await.ok().flatten();
452 let record_ipld = crate::util::json_to_ipld(&input.record);
453 let mut record_bytes = Vec::new();
454 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &record_ipld).is_err() {
455 return ApiError::InvalidRecord("Failed to serialize record".into()).into_response();
456 }
457 let record_cid = match tracking_store.put(&record_bytes).await {
458 Ok(c) => c,
459 _ => {
460 return ApiError::InternalError(Some("Failed to save record block".into()))
461 .into_response();
462 }
463 };
464 if existing_cid == Some(record_cid) {
465 return (
466 StatusCode::OK,
467 Json(PutRecordOutput {
468 uri: AtUri::from_parts(&did, &input.collection, &input.rkey),
469 cid: record_cid.to_string(),
470 commit: None,
471 validation_status: validation_status.map(|s| s.to_string()),
472 }),
473 )
474 .into_response();
475 }
476 let new_mst = if existing_cid.is_some() {
477 match mst.update(&key, record_cid).await {
478 Ok(m) => m,
479 Err(_) => {
480 return ApiError::InternalError(Some("Failed to update MST".into()))
481 .into_response();
482 }
483 }
484 } else {
485 match mst.add(&key, record_cid).await {
486 Ok(m) => m,
487 Err(_) => {
488 return ApiError::InternalError(Some("Failed to add to MST".into()))
489 .into_response();
490 }
491 }
492 };
493 let new_mst_root = match new_mst.persist().await {
494 Ok(c) => c,
495 Err(_) => {
496 return ApiError::InternalError(Some("Failed to persist MST".into())).into_response();
497 }
498 };
499 let op = if existing_cid.is_some() {
500 RecordOp::Update {
501 collection: input.collection.clone(),
502 rkey: input.rkey.clone(),
503 cid: record_cid,
504 prev: existing_cid,
505 }
506 } else {
507 RecordOp::Create {
508 collection: input.collection.clone(),
509 rkey: input.rkey.clone(),
510 cid: record_cid,
511 }
512 };
513 let mut new_mst_blocks = std::collections::BTreeMap::new();
514 let mut old_mst_blocks = std::collections::BTreeMap::new();
515 if new_mst
516 .blocks_for_path(&key, &mut new_mst_blocks)
517 .await
518 .is_err()
519 {
520 return ApiError::InternalError(Some("Failed to get new MST blocks for path".into()))
521 .into_response();
522 }
523 if mst
524 .blocks_for_path(&key, &mut old_mst_blocks)
525 .await
526 .is_err()
527 {
528 return ApiError::InternalError(Some("Failed to get old MST blocks for path".into()))
529 .into_response();
530 }
531 let mut relevant_blocks = new_mst_blocks.clone();
532 relevant_blocks.extend(old_mst_blocks.iter().map(|(k, v)| (*k, v.clone())));
533 relevant_blocks.insert(record_cid, bytes::Bytes::from(record_bytes));
534 let written_cids: Vec<Cid> = tracking_store
535 .get_all_relevant_cids()
536 .into_iter()
537 .chain(relevant_blocks.keys().copied())
538 .collect::<std::collections::HashSet<_>>()
539 .into_iter()
540 .collect();
541 let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect();
542 let is_update = existing_cid.is_some();
543 let blob_cids = extract_blob_cids(&input.record);
544 let obsolete_cids: Vec<Cid> = std::iter::once(current_root_cid)
545 .chain(
546 old_mst_blocks
547 .keys()
548 .filter(|cid| !new_mst_blocks.contains_key(*cid))
549 .copied(),
550 )
551 .chain(existing_cid)
552 .collect();
553 let commit_result = match commit_and_log(
554 &state,
555 CommitParams {
556 did: &did,
557 user_id,
558 current_root_cid: Some(current_root_cid),
559 prev_data_cid: Some(commit.data),
560 new_mst_root,
561 ops: vec![op],
562 blocks_cids: &written_cids_str,
563 blobs: &blob_cids,
564 obsolete_cids,
565 },
566 )
567 .await
568 {
569 Ok(res) => res,
570 Err(e) if e.contains("ConcurrentModification") => {
571 return ApiError::InvalidSwap(Some("Repo has been modified".into())).into_response();
572 }
573 Err(e) => return ApiError::InternalError(Some(e)).into_response(),
574 };
575
576 if let Some(ref controller) = controller_did {
577 let _ = delegation::log_delegation_action(
578 &state.db,
579 &did,
580 controller,
581 Some(controller),
582 DelegationActionType::RepoWrite,
583 Some(json!({
584 "action": if is_update { "update" } else { "create" },
585 "collection": input.collection,
586 "rkey": input.rkey
587 })),
588 None,
589 None,
590 )
591 .await;
592 }
593
594 (
595 StatusCode::OK,
596 Json(PutRecordOutput {
597 uri: AtUri::from_parts(&did, &input.collection, &input.rkey),
598 cid: record_cid.to_string(),
599 commit: Some(CommitInfo {
600 cid: commit_result.commit_cid.to_string(),
601 rev: commit_result.rev,
602 }),
603 validation_status: validation_status.map(|s| s.to_string()),
604 }),
605 )
606 .into_response()
607}