this repo has no description
1use super::validation::validate_record;
2use crate::api::repo::record::utils::{commit_and_log, RecordOp};
3use crate::repo::tracking::TrackingBlockStore;
4use crate::state::AppState;
5use axum::{
6 extract::State,
7 http::{HeaderMap, StatusCode},
8 response::{IntoResponse, Response},
9 Json,
10};
11use chrono::Utc;
12use cid::Cid;
13use jacquard::types::string::Nsid;
14use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
15use serde::{Deserialize, Serialize};
16use serde_json::json;
17use sqlx::{PgPool, Row};
18use std::str::FromStr;
19use std::sync::Arc;
20use tracing::error;
21use uuid::Uuid;
22
23pub async fn has_verified_notification_channel(db: &PgPool, did: &str) -> Result<bool, sqlx::Error> {
24 let row = sqlx::query(
25 r#"
26 SELECT
27 email_confirmed,
28 discord_verified,
29 telegram_verified,
30 signal_verified
31 FROM users
32 WHERE did = $1
33 "#
34 )
35 .bind(did)
36 .fetch_optional(db)
37 .await?;
38
39 match row {
40 Some(r) => {
41 let email_confirmed: bool = r.get("email_confirmed");
42 let discord_verified: bool = r.get("discord_verified");
43 let telegram_verified: bool = r.get("telegram_verified");
44 let signal_verified: bool = r.get("signal_verified");
45 Ok(email_confirmed || discord_verified || telegram_verified || signal_verified)
46 }
47 None => Ok(false),
48 }
49}
50
51pub async fn prepare_repo_write(
52 state: &AppState,
53 headers: &HeaderMap,
54 repo_did: &str,
55) -> Result<(String, Uuid, Cid), Response> {
56 let token = crate::auth::extract_bearer_token_from_header(
57 headers.get("Authorization").and_then(|h| h.to_str().ok())
58 ).ok_or_else(|| {
59 (
60 StatusCode::UNAUTHORIZED,
61 Json(json!({"error": "AuthenticationRequired"})),
62 )
63 .into_response()
64 })?;
65
66 let auth_user = crate::auth::validate_bearer_token(&state.db, &token)
67 .await
68 .map_err(|_| {
69 (
70 StatusCode::UNAUTHORIZED,
71 Json(json!({"error": "AuthenticationFailed"})),
72 )
73 .into_response()
74 })?;
75
76 if repo_did != auth_user.did {
77 return Err((
78 StatusCode::FORBIDDEN,
79 Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})),
80 )
81 .into_response());
82 }
83
84 match has_verified_notification_channel(&state.db, &auth_user.did).await {
85 Ok(true) => {}
86 Ok(false) => {
87 return Err((
88 StatusCode::FORBIDDEN,
89 Json(json!({
90 "error": "AccountNotVerified",
91 "message": "You must verify at least one notification channel (email, Discord, Telegram, or Signal) before creating records"
92 })),
93 )
94 .into_response());
95 }
96 Err(e) => {
97 error!("DB error checking notification channels: {}", e);
98 return Err((
99 StatusCode::INTERNAL_SERVER_ERROR,
100 Json(json!({"error": "InternalError"})),
101 )
102 .into_response());
103 }
104 }
105
106 let user_id = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", auth_user.did)
107 .fetch_optional(&state.db)
108 .await
109 .map_err(|e| {
110 error!("DB error fetching user: {}", e);
111 (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response()
112 })?
113 .ok_or_else(|| {
114 (
115 StatusCode::INTERNAL_SERVER_ERROR,
116 Json(json!({"error": "InternalError", "message": "User not found"})),
117 )
118 .into_response()
119 })?;
120
121 let root_cid_str: String =
122 sqlx::query_scalar!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id)
123 .fetch_optional(&state.db)
124 .await
125 .map_err(|e| {
126 error!("DB error fetching repo root: {}", e);
127 (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response()
128 })?
129 .ok_or_else(|| {
130 (
131 StatusCode::INTERNAL_SERVER_ERROR,
132 Json(json!({"error": "InternalError", "message": "Repo root not found"})),
133 )
134 .into_response()
135 })?;
136
137 let current_root_cid = Cid::from_str(&root_cid_str).map_err(|_| {
138 (
139 StatusCode::INTERNAL_SERVER_ERROR,
140 Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})),
141 )
142 .into_response()
143 })?;
144
145 Ok((auth_user.did, user_id, current_root_cid))
146}
147
148#[derive(Deserialize)]
149#[allow(dead_code)]
150pub struct CreateRecordInput {
151 pub repo: String,
152 pub collection: String,
153 pub rkey: Option<String>,
154 pub validate: Option<bool>,
155 pub record: serde_json::Value,
156 #[serde(rename = "swapCommit")]
157 pub swap_commit: Option<String>,
158}
159
160#[derive(Serialize)]
161#[serde(rename_all = "camelCase")]
162pub struct CreateRecordOutput {
163 pub uri: String,
164 pub cid: String,
165}
166
167pub async fn create_record(
168 State(state): State<AppState>,
169 headers: HeaderMap,
170 Json(input): Json<CreateRecordInput>,
171) -> Response {
172 let (did, user_id, current_root_cid) =
173 match prepare_repo_write(&state, &headers, &input.repo).await {
174 Ok(res) => res,
175 Err(err_res) => return err_res,
176 };
177
178 if let Some(swap_commit) = &input.swap_commit {
179 if Cid::from_str(swap_commit).ok() != Some(current_root_cid) {
180 return (
181 StatusCode::CONFLICT,
182 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})),
183 )
184 .into_response();
185 }
186 }
187
188 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
189
190 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
191 Ok(Some(b)) => b,
192 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Commit block not found"}))).into_response(),
193 };
194 let commit = match Commit::from_cbor(&commit_bytes) {
195 Ok(c) => c,
196 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to parse commit"}))).into_response(),
197 };
198
199 let mst = Mst::load(
200 Arc::new(tracking_store.clone()),
201 commit.data,
202 None,
203 );
204
205 let collection_nsid = match input.collection.parse::<Nsid>() {
206 Ok(n) => n,
207 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection"}))).into_response(),
208 };
209
210 if input.validate.unwrap_or(true) {
211 if let Err(err_response) = validate_record(&input.record, &input.collection) {
212 return err_response;
213 }
214 }
215
216 let rkey = input.rkey.unwrap_or_else(|| Utc::now().format("%Y%m%d%H%M%S%f").to_string());
217
218 let mut record_bytes = Vec::new();
219 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() {
220 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response();
221 }
222 let record_cid = match tracking_store.put(&record_bytes).await {
223 Ok(c) => c,
224 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to save record block"}))).into_response(),
225 };
226
227 let key = format!("{}/{}", collection_nsid, rkey);
228 let new_mst = match mst.add(&key, record_cid).await {
229 Ok(m) => m,
230 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to add to MST"}))).into_response(),
231 };
232 let new_mst_root = match new_mst.persist().await {
233 Ok(c) => c,
234 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to persist MST"}))).into_response(),
235 };
236
237 let op = RecordOp::Create { collection: input.collection.clone(), rkey: rkey.clone(), cid: record_cid };
238 let written_cids = tracking_store.get_written_cids();
239 let written_cids_str = written_cids.iter().map(|c| c.to_string()).collect::<Vec<_>>();
240
241 if let Err(e) = commit_and_log(&state, &did, user_id, Some(current_root_cid), new_mst_root, vec![op], &written_cids_str).await {
242 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": e}))).into_response();
243 };
244
245 (StatusCode::OK, Json(CreateRecordOutput {
246 uri: format!("at://{}/{}/{}", did, input.collection, rkey),
247 cid: record_cid.to_string(),
248 })).into_response()
249}
250
251#[derive(Deserialize)]
252#[allow(dead_code)]
253pub struct PutRecordInput {
254 pub repo: String,
255 pub collection: String,
256 pub rkey: String,
257 pub validate: Option<bool>,
258 pub record: serde_json::Value,
259 #[serde(rename = "swapCommit")]
260 pub swap_commit: Option<String>,
261 #[serde(rename = "swapRecord")]
262 pub swap_record: Option<String>,
263}
264
265#[derive(Serialize)]
266#[serde(rename_all = "camelCase")]
267pub struct PutRecordOutput {
268 pub uri: String,
269 pub cid: String,
270}
271
272pub async fn put_record(
273 State(state): State<AppState>,
274 headers: HeaderMap,
275 Json(input): Json<PutRecordInput>,
276) -> Response {
277 let (did, user_id, current_root_cid) =
278 match prepare_repo_write(&state, &headers, &input.repo).await {
279 Ok(res) => res,
280 Err(err_res) => return err_res,
281 };
282
283 if let Some(swap_commit) = &input.swap_commit {
284 if Cid::from_str(swap_commit).ok() != Some(current_root_cid) {
285 return (StatusCode::CONFLICT, Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"}))).into_response();
286 }
287 }
288
289 let tracking_store = TrackingBlockStore::new(state.block_store.clone());
290
291 let commit_bytes = match tracking_store.get(¤t_root_cid).await {
292 Ok(Some(b)) => b,
293 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Commit block not found"}))).into_response(),
294 };
295 let commit = match Commit::from_cbor(&commit_bytes) {
296 Ok(c) => c,
297 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to parse commit"}))).into_response(),
298 };
299
300 let mst = Mst::load(
301 Arc::new(tracking_store.clone()),
302 commit.data,
303 None,
304 );
305 let collection_nsid = match input.collection.parse::<Nsid>() {
306 Ok(n) => n,
307 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection"}))).into_response(),
308 };
309 let key = format!("{}/{}", collection_nsid, input.rkey);
310
311 if input.validate.unwrap_or(true) {
312 if let Err(err_response) = validate_record(&input.record, &input.collection) {
313 return err_response;
314 }
315 }
316
317 if let Some(swap_record_str) = &input.swap_record {
318 let expected_cid = Cid::from_str(swap_record_str).ok();
319 let actual_cid = mst.get(&key).await.ok().flatten();
320 if expected_cid != actual_cid {
321 return (StatusCode::CONFLICT, Json(json!({"error": "InvalidSwap", "message": "Record has been modified or does not exist"}))).into_response();
322 }
323 }
324
325 let existing_cid = mst.get(&key).await.ok().flatten();
326
327 let mut record_bytes = Vec::new();
328 if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() {
329 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response();
330 }
331 let record_cid = match tracking_store.put(&record_bytes).await {
332 Ok(c) => c,
333 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to save record block"}))).into_response(),
334 };
335
336 let new_mst = if existing_cid.is_some() {
337 match mst.update(&key, record_cid).await {
338 Ok(m) => m,
339 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to update MST"}))).into_response(),
340 }
341 } else {
342 match mst.add(&key, record_cid).await {
343 Ok(m) => m,
344 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to add to MST"}))).into_response(),
345 }
346 };
347 let new_mst_root = match new_mst.persist().await {
348 Ok(c) => c,
349 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to persist MST"}))).into_response(),
350 };
351
352 let op = if existing_cid.is_some() {
353 RecordOp::Update { collection: input.collection.clone(), rkey: input.rkey.clone(), cid: record_cid }
354 } else {
355 RecordOp::Create { collection: input.collection.clone(), rkey: input.rkey.clone(), cid: record_cid }
356 };
357
358 let written_cids = tracking_store.get_written_cids();
359 let written_cids_str = written_cids.iter().map(|c| c.to_string()).collect::<Vec<_>>();
360
361 if let Err(e) = commit_and_log(&state, &did, user_id, Some(current_root_cid), new_mst_root, vec![op], &written_cids_str).await {
362 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": e}))).into_response();
363 };
364
365 (StatusCode::OK, Json(PutRecordOutput {
366 uri: format!("at://{}/{}/{}", did, input.collection, input.rkey),
367 cid: record_cid.to_string(),
368 })).into_response()
369}