this repo has no description
1use axum::{
2 extract::State,
3 Json,
4 response::{IntoResponse, Response},
5 http::StatusCode,
6};
7use serde::{Deserialize, Serialize};
8use serde_json::json;
9use crate::state::AppState;
10use chrono::Utc;
11use sqlx::Row;
12use cid::Cid;
13use std::str::FromStr;
14use jacquard_repo::{mst::Mst, commit::Commit, storage::BlockStore};
15use jacquard::types::{string::{Nsid, Tid}, did::Did, integer::LimitedU32};
16use tracing::error;
17use std::sync::Arc;
18
19#[derive(Deserialize)]
20#[allow(dead_code)]
21pub struct CreateRecordInput {
22 pub repo: String,
23 pub collection: String,
24 pub rkey: Option<String>,
25 pub validate: Option<bool>,
26 pub record: serde_json::Value,
27 #[serde(rename = "swapCommit")]
28 pub swap_commit: Option<String>,
29}
30
31#[derive(Serialize)]
32#[serde(rename_all = "camelCase")]
33pub struct CreateRecordOutput {
34 pub uri: String,
35 pub cid: String,
36}
37
38pub async fn create_record(
39 State(state): State<AppState>,
40 headers: axum::http::HeaderMap,
41 Json(input): Json<CreateRecordInput>,
42) -> Response {
43 let auth_header = headers.get("Authorization");
44 if auth_header.is_none() {
45 return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationRequired"}))).into_response();
46 }
47 let token = auth_header.unwrap().to_str().unwrap_or("").replace("Bearer ", "");
48
49 if let Err(_) = crate::auth::verify_token(&token) {
50 return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationFailed", "message": "Invalid token"}))).into_response();
51 }
52
53 let session = sqlx::query("SELECT did FROM sessions WHERE access_jwt = $1")
54 .bind(&token)
55 .fetch_optional(&state.db)
56 .await
57 .unwrap_or(None);
58
59 let did = match session {
60 Some(row) => row.get::<String, _>("did"),
61 None => return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationFailed"}))).into_response(),
62 };
63
64 if input.repo != did {
65 return (StatusCode::FORBIDDEN, Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"}))).into_response();
66 }
67
68 let user_query = sqlx::query("SELECT id FROM users WHERE did = $1")
69 .bind(&did)
70 .fetch_optional(&state.db)
71 .await;
72
73 let user_id: uuid::Uuid = match user_query {
74 Ok(Some(row)) => row.get("id"),
75 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "User not found"}))).into_response(),
76 };
77
78 let repo_root_query = sqlx::query("SELECT repo_root_cid FROM repos WHERE user_id = $1")
79 .bind(user_id)
80 .fetch_optional(&state.db)
81 .await;
82
83 let current_root_cid = match repo_root_query {
84 Ok(Some(row)) => {
85 let cid_str: String = row.get("repo_root_cid");
86 Cid::from_str(&cid_str).ok()
87 },
88 _ => None,
89 };
90
91 if current_root_cid.is_none() {
92 error!("Repo root not found for user {}", did);
93 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Repo root not found"}))).into_response();
94 }
95 let current_root_cid = current_root_cid.unwrap();
96
97 let commit_bytes = match state.block_store.get(¤t_root_cid).await {
98 Ok(Some(b)) => b,
99 Ok(None) => {
100 error!("Commit block not found: {}", current_root_cid);
101 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
102 },
103 Err(e) => {
104 error!("Failed to load commit block: {:?}", e);
105 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
106 }
107 };
108
109 let commit = match Commit::from_cbor(&commit_bytes) {
110 Ok(c) => c,
111 Err(e) => {
112 error!("Failed to parse commit: {:?}", e);
113 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
114 }
115 };
116
117 let mst_root = commit.data;
118 let store = Arc::new(state.block_store.clone());
119 let mst = Mst::load(store.clone(), mst_root, None);
120
121 let collection_nsid = match input.collection.parse::<Nsid>() {
122 Ok(n) => n,
123 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection"}))).into_response(),
124 };
125
126 let rkey = input.rkey.unwrap_or_else(|| {
127 Utc::now().format("%Y%m%d%H%M%S%f").to_string()
128 });
129
130 let mut record_bytes = Vec::new();
131 if let Err(e) = serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record) {
132 error!("Error serializing record: {:?}", e);
133 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response();
134 }
135
136 let record_cid = match state.block_store.put(&record_bytes).await {
137 Ok(c) => c,
138 Err(e) => {
139 error!("Failed to save record block: {:?}", e);
140 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
141 }
142 };
143
144 let key = format!("{}/{}", collection_nsid, rkey);
145 if let Err(e) = mst.update(&key, record_cid).await {
146 error!("Failed to update MST: {:?}", e);
147 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
148 }
149
150 let new_mst_root = match mst.root().await {
151 Ok(c) => c,
152 Err(e) => {
153 error!("Failed to get new MST root: {:?}", e);
154 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
155 }
156 };
157
158 let did_obj = match Did::new(&did) {
159 Ok(d) => d,
160 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Invalid DID"}))).into_response(),
161 };
162
163 let rev = Tid::now(LimitedU32::MIN);
164
165 let new_commit = Commit::new_unsigned(
166 did_obj,
167 new_mst_root,
168 rev,
169 Some(current_root_cid)
170 );
171
172 let new_commit_bytes = match new_commit.to_cbor() {
173 Ok(b) => b,
174 Err(e) => {
175 error!("Failed to serialize new commit: {:?}", e);
176 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
177 }
178 };
179
180 let new_root_cid = match state.block_store.put(&new_commit_bytes).await {
181 Ok(c) => c,
182 Err(e) => {
183 error!("Failed to save new commit: {:?}", e);
184 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
185 }
186 };
187
188 let update_repo = sqlx::query("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2")
189 .bind(new_root_cid.to_string())
190 .bind(user_id)
191 .execute(&state.db)
192 .await;
193
194 if let Err(e) = update_repo {
195 error!("Failed to update repo root in DB: {:?}", e);
196 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
197 }
198
199 let record_insert = sqlx::query(
200 "INSERT INTO records (repo_id, collection, rkey, record_cid) VALUES ($1, $2, $3, $4)
201 ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4, created_at = NOW()"
202 )
203 .bind(user_id)
204 .bind(&input.collection)
205 .bind(&rkey)
206 .bind(record_cid.to_string())
207 .execute(&state.db)
208 .await;
209
210 if let Err(e) = record_insert {
211 error!("Error inserting record index: {:?}", e);
212 }
213
214 let output = CreateRecordOutput {
215 uri: format!("at://{}/{}/{}", input.repo, input.collection, rkey),
216 cid: record_cid.to_string(),
217 };
218 (StatusCode::OK, Json(output)).into_response()
219}