this repo has no description
1use axum::{ 2 extract::State, 3 Json, 4 response::{IntoResponse, Response}, 5 http::StatusCode, 6}; 7use serde::{Deserialize, Serialize}; 8use serde_json::json; 9use crate::state::AppState; 10use chrono::Utc; 11use sqlx::Row; 12use cid::Cid; 13use std::str::FromStr; 14use jacquard_repo::{mst::Mst, commit::Commit, storage::BlockStore}; 15use jacquard::types::{string::{Nsid, Tid}, did::Did, integer::LimitedU32}; 16use tracing::error; 17use std::sync::Arc; 18 19#[derive(Deserialize)] 20#[allow(dead_code)] 21pub struct CreateRecordInput { 22 pub repo: String, 23 pub collection: String, 24 pub rkey: Option<String>, 25 pub validate: Option<bool>, 26 pub record: serde_json::Value, 27 #[serde(rename = "swapCommit")] 28 pub swap_commit: Option<String>, 29} 30 31#[derive(Serialize)] 32#[serde(rename_all = "camelCase")] 33pub struct CreateRecordOutput { 34 pub uri: String, 35 pub cid: String, 36} 37 38pub async fn create_record( 39 State(state): State<AppState>, 40 headers: axum::http::HeaderMap, 41 Json(input): Json<CreateRecordInput>, 42) -> Response { 43 let auth_header = headers.get("Authorization"); 44 if auth_header.is_none() { 45 return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationRequired"}))).into_response(); 46 } 47 let token = auth_header.unwrap().to_str().unwrap_or("").replace("Bearer ", ""); 48 49 if let Err(_) = crate::auth::verify_token(&token) { 50 return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationFailed", "message": "Invalid token"}))).into_response(); 51 } 52 53 let session = sqlx::query("SELECT did FROM sessions WHERE access_jwt = $1") 54 .bind(&token) 55 .fetch_optional(&state.db) 56 .await 57 .unwrap_or(None); 58 59 let did = match session { 60 Some(row) => row.get::<String, _>("did"), 61 None => return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationFailed"}))).into_response(), 62 }; 63 64 if input.repo != did { 65 return (StatusCode::FORBIDDEN, Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"}))).into_response(); 66 } 67 68 let user_query = sqlx::query("SELECT id FROM users WHERE did = $1") 69 .bind(&did) 70 .fetch_optional(&state.db) 71 .await; 72 73 let user_id: uuid::Uuid = match user_query { 74 Ok(Some(row)) => row.get("id"), 75 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "User not found"}))).into_response(), 76 }; 77 78 let repo_root_query = sqlx::query("SELECT repo_root_cid FROM repos WHERE user_id = $1") 79 .bind(user_id) 80 .fetch_optional(&state.db) 81 .await; 82 83 let current_root_cid = match repo_root_query { 84 Ok(Some(row)) => { 85 let cid_str: String = row.get("repo_root_cid"); 86 Cid::from_str(&cid_str).ok() 87 }, 88 _ => None, 89 }; 90 91 if current_root_cid.is_none() { 92 error!("Repo root not found for user {}", did); 93 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Repo root not found"}))).into_response(); 94 } 95 let current_root_cid = current_root_cid.unwrap(); 96 97 let commit_bytes = match state.block_store.get(&current_root_cid).await { 98 Ok(Some(b)) => b, 99 Ok(None) => { 100 error!("Commit block not found: {}", current_root_cid); 101 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response(); 102 }, 103 Err(e) => { 104 error!("Failed to load commit block: {:?}", e); 105 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response(); 106 } 107 }; 108 109 let commit = match Commit::from_cbor(&commit_bytes) { 110 Ok(c) => c, 111 Err(e) => { 112 error!("Failed to parse commit: {:?}", e); 113 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response(); 114 } 115 }; 116 117 let mst_root = commit.data; 118 let store = Arc::new(state.block_store.clone()); 119 let mst = Mst::load(store.clone(), mst_root, None); 120 121 let collection_nsid = match input.collection.parse::<Nsid>() { 122 Ok(n) => n, 123 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection"}))).into_response(), 124 }; 125 126 let rkey = input.rkey.unwrap_or_else(|| { 127 Utc::now().format("%Y%m%d%H%M%S%f").to_string() 128 }); 129 130 let mut record_bytes = Vec::new(); 131 if let Err(e) = serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record) { 132 error!("Error serializing record: {:?}", e); 133 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response(); 134 } 135 136 let record_cid = match state.block_store.put(&record_bytes).await { 137 Ok(c) => c, 138 Err(e) => { 139 error!("Failed to save record block: {:?}", e); 140 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response(); 141 } 142 }; 143 144 let key = format!("{}/{}", collection_nsid, rkey); 145 if let Err(e) = mst.update(&key, record_cid).await { 146 error!("Failed to update MST: {:?}", e); 147 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response(); 148 } 149 150 let new_mst_root = match mst.root().await { 151 Ok(c) => c, 152 Err(e) => { 153 error!("Failed to get new MST root: {:?}", e); 154 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response(); 155 } 156 }; 157 158 let did_obj = match Did::new(&did) { 159 Ok(d) => d, 160 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Invalid DID"}))).into_response(), 161 }; 162 163 let rev = Tid::now(LimitedU32::MIN); 164 165 let new_commit = Commit::new_unsigned( 166 did_obj, 167 new_mst_root, 168 rev, 169 Some(current_root_cid) 170 ); 171 172 let new_commit_bytes = match new_commit.to_cbor() { 173 Ok(b) => b, 174 Err(e) => { 175 error!("Failed to serialize new commit: {:?}", e); 176 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response(); 177 } 178 }; 179 180 let new_root_cid = match state.block_store.put(&new_commit_bytes).await { 181 Ok(c) => c, 182 Err(e) => { 183 error!("Failed to save new commit: {:?}", e); 184 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response(); 185 } 186 }; 187 188 let update_repo = sqlx::query("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2") 189 .bind(new_root_cid.to_string()) 190 .bind(user_id) 191 .execute(&state.db) 192 .await; 193 194 if let Err(e) = update_repo { 195 error!("Failed to update repo root in DB: {:?}", e); 196 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response(); 197 } 198 199 let record_insert = sqlx::query( 200 "INSERT INTO records (repo_id, collection, rkey, record_cid) VALUES ($1, $2, $3, $4) 201 ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4, created_at = NOW()" 202 ) 203 .bind(user_id) 204 .bind(&input.collection) 205 .bind(&rkey) 206 .bind(record_cid.to_string()) 207 .execute(&state.db) 208 .await; 209 210 if let Err(e) = record_insert { 211 error!("Error inserting record index: {:?}", e); 212 } 213 214 let output = CreateRecordOutput { 215 uri: format!("at://{}/{}/{}", input.repo, input.collection, rkey), 216 cid: record_cid.to_string(), 217 }; 218 (StatusCode::OK, Json(output)).into_response() 219}