this repo has no description
1use axum::{
2 extract::State,
3 Json,
4 response::{IntoResponse, Response},
5 http::StatusCode,
6};
7use serde::{Deserialize, Serialize};
8use serde_json::json;
9use crate::state::AppState;
10use chrono::Utc;
11use sqlx::Row;
12use cid::Cid;
13use std::str::FromStr;
14use jacquard_repo::{mst::Mst, commit::Commit, storage::BlockStore};
15use jacquard::types::{string::{Nsid, Tid}, did::Did, integer::LimitedU32};
16use tracing::error;
17use std::sync::Arc;
18
19#[derive(Deserialize)]
20#[allow(dead_code)]
21pub struct CreateRecordInput {
22 pub repo: String,
23 pub collection: String,
24 pub rkey: Option<String>,
25 pub validate: Option<bool>,
26 pub record: serde_json::Value,
27 #[serde(rename = "swapCommit")]
28 pub swap_commit: Option<String>,
29}
30
31#[derive(Serialize)]
32#[serde(rename_all = "camelCase")]
33pub struct CreateRecordOutput {
34 pub uri: String,
35 pub cid: String,
36}
37
38pub async fn create_record(
39 State(state): State<AppState>,
40 headers: axum::http::HeaderMap,
41 Json(input): Json<CreateRecordInput>,
42) -> Response {
43 let auth_header = headers.get("Authorization");
44 if auth_header.is_none() {
45 return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationRequired"}))).into_response();
46 }
47 let token = auth_header.unwrap().to_str().unwrap_or("").replace("Bearer ", "");
48
49 let session = sqlx::query(
50 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1"
51 )
52 .bind(&token)
53 .fetch_optional(&state.db)
54 .await
55 .unwrap_or(None);
56
57 let (did, key_bytes) = match session {
58 Some(row) => (row.get::<String, _>("did"), row.get::<Vec<u8>, _>("key_bytes")),
59 None => return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationFailed"}))).into_response(),
60 };
61
62 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
63 return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"}))).into_response();
64 }
65
66 if input.repo != did {
67 return (StatusCode::FORBIDDEN, Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"}))).into_response();
68 }
69
70 let user_query = sqlx::query("SELECT id FROM users WHERE did = $1")
71 .bind(&did)
72 .fetch_optional(&state.db)
73 .await;
74
75 let user_id: uuid::Uuid = match user_query {
76 Ok(Some(row)) => row.get("id"),
77 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "User not found"}))).into_response(),
78 };
79
80 let repo_root_query = sqlx::query("SELECT repo_root_cid FROM repos WHERE user_id = $1")
81 .bind(user_id)
82 .fetch_optional(&state.db)
83 .await;
84
85 let current_root_cid = match repo_root_query {
86 Ok(Some(row)) => {
87 let cid_str: String = row.get("repo_root_cid");
88 Cid::from_str(&cid_str).ok()
89 },
90 _ => None,
91 };
92
93 if current_root_cid.is_none() {
94 error!("Repo root not found for user {}", did);
95 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Repo root not found"}))).into_response();
96 }
97 let current_root_cid = current_root_cid.unwrap();
98
99 let commit_bytes = match state.block_store.get(¤t_root_cid).await {
100 Ok(Some(b)) => b,
101 Ok(None) => {
102 error!("Commit block not found: {}", current_root_cid);
103 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
104 },
105 Err(e) => {
106 error!("Failed to load commit block: {:?}", e);
107 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
108 }
109 };
110
111 let commit = match Commit::from_cbor(&commit_bytes) {
112 Ok(c) => c,
113 Err(e) => {
114 error!("Failed to parse commit: {:?}", e);
115 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
116 }
117 };
118
119 let mst_root = commit.data;
120 let store = Arc::new(state.block_store.clone());
121 let mst = Mst::load(store.clone(), mst_root, None);
122
123 let collection_nsid = match input.collection.parse::<Nsid>() {
124 Ok(n) => n,
125 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection"}))).into_response(),
126 };
127
128 let rkey = input.rkey.unwrap_or_else(|| {
129 Utc::now().format("%Y%m%d%H%M%S%f").to_string()
130 });
131
132 let mut record_bytes = Vec::new();
133 if let Err(e) = serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record) {
134 error!("Error serializing record: {:?}", e);
135 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response();
136 }
137
138 let record_cid = match state.block_store.put(&record_bytes).await {
139 Ok(c) => c,
140 Err(e) => {
141 error!("Failed to save record block: {:?}", e);
142 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
143 }
144 };
145
146 let key = format!("{}/{}", collection_nsid, rkey);
147 if let Err(e) = mst.update(&key, record_cid).await {
148 error!("Failed to update MST: {:?}", e);
149 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
150 }
151
152 let new_mst_root = match mst.root().await {
153 Ok(c) => c,
154 Err(e) => {
155 error!("Failed to get new MST root: {:?}", e);
156 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
157 }
158 };
159
160 let did_obj = match Did::new(&did) {
161 Ok(d) => d,
162 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Invalid DID"}))).into_response(),
163 };
164
165 let rev = Tid::now(LimitedU32::MIN);
166
167 let new_commit = Commit::new_unsigned(
168 did_obj,
169 new_mst_root,
170 rev,
171 Some(current_root_cid)
172 );
173
174 let new_commit_bytes = match new_commit.to_cbor() {
175 Ok(b) => b,
176 Err(e) => {
177 error!("Failed to serialize new commit: {:?}", e);
178 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
179 }
180 };
181
182 let new_root_cid = match state.block_store.put(&new_commit_bytes).await {
183 Ok(c) => c,
184 Err(e) => {
185 error!("Failed to save new commit: {:?}", e);
186 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
187 }
188 };
189
190 let update_repo = sqlx::query("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2")
191 .bind(new_root_cid.to_string())
192 .bind(user_id)
193 .execute(&state.db)
194 .await;
195
196 if let Err(e) = update_repo {
197 error!("Failed to update repo root in DB: {:?}", e);
198 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
199 }
200
201 let record_insert = sqlx::query(
202 "INSERT INTO records (repo_id, collection, rkey, record_cid) VALUES ($1, $2, $3, $4)
203 ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4, created_at = NOW()"
204 )
205 .bind(user_id)
206 .bind(&input.collection)
207 .bind(&rkey)
208 .bind(record_cid.to_string())
209 .execute(&state.db)
210 .await;
211
212 if let Err(e) = record_insert {
213 error!("Error inserting record index: {:?}", e);
214 }
215
216 let output = CreateRecordOutput {
217 uri: format!("at://{}/{}/{}", input.repo, input.collection, rkey),
218 cid: record_cid.to_string(),
219 };
220 (StatusCode::OK, Json(output)).into_response()
221}