this repo has no description
1use axum::{ 2 extract::State, 3 Json, 4 response::{IntoResponse, Response}, 5 http::StatusCode, 6}; 7use serde::{Deserialize, Serialize}; 8use serde_json::json; 9use crate::state::AppState; 10use chrono::Utc; 11use sqlx::Row; 12use cid::Cid; 13use std::str::FromStr; 14use jacquard_repo::{mst::Mst, commit::Commit, storage::BlockStore}; 15use jacquard::types::{string::{Nsid, Tid}, did::Did, integer::LimitedU32}; 16use tracing::error; 17use std::sync::Arc; 18 19#[derive(Deserialize)] 20#[allow(dead_code)] 21pub struct CreateRecordInput { 22 pub repo: String, 23 pub collection: String, 24 pub rkey: Option<String>, 25 pub validate: Option<bool>, 26 pub record: serde_json::Value, 27 #[serde(rename = "swapCommit")] 28 pub swap_commit: Option<String>, 29} 30 31#[derive(Serialize)] 32#[serde(rename_all = "camelCase")] 33pub struct CreateRecordOutput { 34 pub uri: String, 35 pub cid: String, 36} 37 38pub async fn create_record( 39 State(state): State<AppState>, 40 headers: axum::http::HeaderMap, 41 Json(input): Json<CreateRecordInput>, 42) -> Response { 43 let auth_header = headers.get("Authorization"); 44 if auth_header.is_none() { 45 return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationRequired"}))).into_response(); 46 } 47 let token = auth_header.unwrap().to_str().unwrap_or("").replace("Bearer ", ""); 48 49 let session = sqlx::query( 50 "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1" 51 ) 52 .bind(&token) 53 .fetch_optional(&state.db) 54 .await 55 .unwrap_or(None); 56 57 let (did, key_bytes) = match session { 58 Some(row) => (row.get::<String, _>("did"), row.get::<Vec<u8>, _>("key_bytes")), 59 None => return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationFailed"}))).into_response(), 60 }; 61 62 if let Err(_) = crate::auth::verify_token(&token, &key_bytes) { 63 return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"}))).into_response(); 64 } 65 66 if input.repo != did { 67 return (StatusCode::FORBIDDEN, Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"}))).into_response(); 68 } 69 70 let user_query = sqlx::query("SELECT id FROM users WHERE did = $1") 71 .bind(&did) 72 .fetch_optional(&state.db) 73 .await; 74 75 let user_id: uuid::Uuid = match user_query { 76 Ok(Some(row)) => row.get("id"), 77 _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "User not found"}))).into_response(), 78 }; 79 80 let repo_root_query = sqlx::query("SELECT repo_root_cid FROM repos WHERE user_id = $1") 81 .bind(user_id) 82 .fetch_optional(&state.db) 83 .await; 84 85 let current_root_cid = match repo_root_query { 86 Ok(Some(row)) => { 87 let cid_str: String = row.get("repo_root_cid"); 88 Cid::from_str(&cid_str).ok() 89 }, 90 _ => None, 91 }; 92 93 if current_root_cid.is_none() { 94 error!("Repo root not found for user {}", did); 95 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Repo root not found"}))).into_response(); 96 } 97 let current_root_cid = current_root_cid.unwrap(); 98 99 let commit_bytes = match state.block_store.get(&current_root_cid).await { 100 Ok(Some(b)) => b, 101 Ok(None) => { 102 error!("Commit block not found: {}", current_root_cid); 103 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response(); 104 }, 105 Err(e) => { 106 error!("Failed to load commit block: {:?}", e); 107 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response(); 108 } 109 }; 110 111 let commit = match Commit::from_cbor(&commit_bytes) { 112 Ok(c) => c, 113 Err(e) => { 114 error!("Failed to parse commit: {:?}", e); 115 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response(); 116 } 117 }; 118 119 let mst_root = commit.data; 120 let store = Arc::new(state.block_store.clone()); 121 let mst = Mst::load(store.clone(), mst_root, None); 122 123 let collection_nsid = match input.collection.parse::<Nsid>() { 124 Ok(n) => n, 125 Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection"}))).into_response(), 126 }; 127 128 let rkey = input.rkey.unwrap_or_else(|| { 129 Utc::now().format("%Y%m%d%H%M%S%f").to_string() 130 }); 131 132 let mut record_bytes = Vec::new(); 133 if let Err(e) = serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record) { 134 error!("Error serializing record: {:?}", e); 135 return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response(); 136 } 137 138 let record_cid = match state.block_store.put(&record_bytes).await { 139 Ok(c) => c, 140 Err(e) => { 141 error!("Failed to save record block: {:?}", e); 142 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response(); 143 } 144 }; 145 146 let key = format!("{}/{}", collection_nsid, rkey); 147 if let Err(e) = mst.update(&key, record_cid).await { 148 error!("Failed to update MST: {:?}", e); 149 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response(); 150 } 151 152 let new_mst_root = match mst.root().await { 153 Ok(c) => c, 154 Err(e) => { 155 error!("Failed to get new MST root: {:?}", e); 156 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response(); 157 } 158 }; 159 160 let did_obj = match Did::new(&did) { 161 Ok(d) => d, 162 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Invalid DID"}))).into_response(), 163 }; 164 165 let rev = Tid::now(LimitedU32::MIN); 166 167 let new_commit = Commit::new_unsigned( 168 did_obj, 169 new_mst_root, 170 rev, 171 Some(current_root_cid) 172 ); 173 174 let new_commit_bytes = match new_commit.to_cbor() { 175 Ok(b) => b, 176 Err(e) => { 177 error!("Failed to serialize new commit: {:?}", e); 178 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response(); 179 } 180 }; 181 182 let new_root_cid = match state.block_store.put(&new_commit_bytes).await { 183 Ok(c) => c, 184 Err(e) => { 185 error!("Failed to save new commit: {:?}", e); 186 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response(); 187 } 188 }; 189 190 let update_repo = sqlx::query("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2") 191 .bind(new_root_cid.to_string()) 192 .bind(user_id) 193 .execute(&state.db) 194 .await; 195 196 if let Err(e) = update_repo { 197 error!("Failed to update repo root in DB: {:?}", e); 198 return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response(); 199 } 200 201 let record_insert = sqlx::query( 202 "INSERT INTO records (repo_id, collection, rkey, record_cid) VALUES ($1, $2, $3, $4) 203 ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4, created_at = NOW()" 204 ) 205 .bind(user_id) 206 .bind(&input.collection) 207 .bind(&rkey) 208 .bind(record_cid.to_string()) 209 .execute(&state.db) 210 .await; 211 212 if let Err(e) = record_insert { 213 error!("Error inserting record index: {:?}", e); 214 } 215 216 let output = CreateRecordOutput { 217 uri: format!("at://{}/{}/{}", input.repo, input.collection, rkey), 218 cid: record_cid.to_string(), 219 }; 220 (StatusCode::OK, Json(output)).into_response() 221}