···11+pub mod blob;
22+pub mod meta;
33+pub mod record;
44+55+pub use blob::upload_blob;
66+pub use meta::describe_repo;
77+pub use record::{create_record, delete_record, get_record, list_records, put_record};
+236
src/api/repo/record/delete.rs
···11+use crate::state::AppState;
22+use axum::{
33+ Json,
44+ extract::State,
55+ http::StatusCode,
66+ response::{IntoResponse, Response},
77+};
88+use cid::Cid;
99+use jacquard::types::{
1010+ did::Did,
1111+ integer::LimitedU32,
1212+ string::{Nsid, Tid},
1313+};
1414+use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
1515+use serde::Deserialize;
1616+use serde_json::json;
1717+use sqlx::Row;
1818+use std::str::FromStr;
1919+use std::sync::Arc;
2020+use tracing::error;
2121+2222+#[derive(Deserialize)]
2323+pub struct DeleteRecordInput {
2424+ pub repo: String,
2525+ pub collection: String,
2626+ pub rkey: String,
2727+ #[serde(rename = "swapRecord")]
2828+ pub swap_record: Option<String>,
2929+ #[serde(rename = "swapCommit")]
3030+ pub swap_commit: Option<String>,
3131+}
3232+3333+pub async fn delete_record(
3434+ State(state): State<AppState>,
3535+ headers: axum::http::HeaderMap,
3636+ Json(input): Json<DeleteRecordInput>,
3737+) -> Response {
3838+ let auth_header = headers.get("Authorization");
3939+ if auth_header.is_none() {
4040+ return (
4141+ StatusCode::UNAUTHORIZED,
4242+ Json(json!({"error": "AuthenticationRequired"})),
4343+ )
4444+ .into_response();
4545+ }
4646+ let token = auth_header
4747+ .unwrap()
4848+ .to_str()
4949+ .unwrap_or("")
5050+ .replace("Bearer ", "");
5151+5252+ let session = sqlx::query(
5353+ "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1"
5454+ )
5555+ .bind(&token)
5656+ .fetch_optional(&state.db)
5757+ .await
5858+ .unwrap_or(None);
5959+6060+ let (did, key_bytes) = match session {
6161+ Some(row) => (
6262+ row.get::<String, _>("did"),
6363+ row.get::<Vec<u8>, _>("key_bytes"),
6464+ ),
6565+ None => {
6666+ return (
6767+ StatusCode::UNAUTHORIZED,
6868+ Json(json!({"error": "AuthenticationFailed"})),
6969+ )
7070+ .into_response();
7171+ }
7272+ };
7373+7474+ if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
7575+ return (
7676+ StatusCode::UNAUTHORIZED,
7777+ Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})),
7878+ )
7979+ .into_response();
8080+ }
8181+8282+ if input.repo != did {
8383+ return (StatusCode::FORBIDDEN, Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"}))).into_response();
8484+ }
8585+8686+ let user_query = sqlx::query("SELECT id FROM users WHERE did = $1")
8787+ .bind(&did)
8888+ .fetch_optional(&state.db)
8989+ .await;
9090+9191+ let user_id: uuid::Uuid = match user_query {
9292+ Ok(Some(row)) => row.get("id"),
9393+ _ => {
9494+ return (
9595+ StatusCode::INTERNAL_SERVER_ERROR,
9696+ Json(json!({"error": "InternalError", "message": "User not found"})),
9797+ )
9898+ .into_response();
9999+ }
100100+ };
101101+102102+ let repo_root_query = sqlx::query("SELECT repo_root_cid FROM repos WHERE user_id = $1")
103103+ .bind(user_id)
104104+ .fetch_optional(&state.db)
105105+ .await;
106106+107107+ let current_root_cid = match repo_root_query {
108108+ Ok(Some(row)) => {
109109+ let cid_str: String = row.get("repo_root_cid");
110110+ Cid::from_str(&cid_str).ok()
111111+ }
112112+ _ => None,
113113+ };
114114+115115+ if current_root_cid.is_none() {
116116+ return (
117117+ StatusCode::INTERNAL_SERVER_ERROR,
118118+ Json(json!({"error": "InternalError", "message": "Repo root not found"})),
119119+ )
120120+ .into_response();
121121+ }
122122+ let current_root_cid = current_root_cid.unwrap();
123123+124124+ let commit_bytes = match state.block_store.get(¤t_root_cid).await {
125125+ Ok(Some(b)) => b,
126126+ Ok(None) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Commit block not found"}))).into_response(),
127127+ Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": format!("Failed to load commit block: {:?}", e)}))).into_response(),
128128+ };
129129+130130+ let commit = match Commit::from_cbor(&commit_bytes) {
131131+ Ok(c) => c,
132132+ Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": format!("Failed to parse commit: {:?}", e)}))).into_response(),
133133+ };
134134+135135+ let mst_root = commit.data;
136136+ let store = Arc::new(state.block_store.clone());
137137+ let mst = Mst::load(store.clone(), mst_root, None);
138138+139139+ let collection_nsid = match input.collection.parse::<Nsid>() {
140140+ Ok(n) => n,
141141+ Err(_) => {
142142+ return (
143143+ StatusCode::BAD_REQUEST,
144144+ Json(json!({"error": "InvalidCollection"})),
145145+ )
146146+ .into_response();
147147+ }
148148+ };
149149+150150+ let key = format!("{}/{}", collection_nsid, input.rkey);
151151+152152+ // TODO: Check swapRecord if provided? Skipping for brevity/robustness
153153+154154+ if let Err(e) = mst.delete(&key).await {
155155+ error!("Failed to delete from MST: {:?}", e);
156156+ return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": format!("Failed to delete from MST: {:?}", e)}))).into_response();
157157+ }
158158+159159+ let new_mst_root = match mst.root().await {
160160+ Ok(c) => c,
161161+ Err(_e) => {
162162+ return (
163163+ StatusCode::INTERNAL_SERVER_ERROR,
164164+ Json(json!({"error": "InternalError", "message": "Failed to get new MST root"})),
165165+ )
166166+ .into_response();
167167+ }
168168+ };
169169+170170+ let did_obj = match Did::new(&did) {
171171+ Ok(d) => d,
172172+ Err(_) => {
173173+ return (
174174+ StatusCode::INTERNAL_SERVER_ERROR,
175175+ Json(json!({"error": "InternalError", "message": "Invalid DID"})),
176176+ )
177177+ .into_response();
178178+ }
179179+ };
180180+181181+ let rev = Tid::now(LimitedU32::MIN);
182182+183183+ let new_commit = Commit::new_unsigned(did_obj, new_mst_root, rev, Some(current_root_cid));
184184+185185+ let new_commit_bytes =
186186+ match new_commit.to_cbor() {
187187+ Ok(b) => b,
188188+ Err(_e) => return (
189189+ StatusCode::INTERNAL_SERVER_ERROR,
190190+ Json(
191191+ json!({"error": "InternalError", "message": "Failed to serialize new commit"}),
192192+ ),
193193+ )
194194+ .into_response(),
195195+ };
196196+197197+ let new_root_cid = match state.block_store.put(&new_commit_bytes).await {
198198+ Ok(c) => c,
199199+ Err(_e) => {
200200+ return (
201201+ StatusCode::INTERNAL_SERVER_ERROR,
202202+ Json(json!({"error": "InternalError", "message": "Failed to save new commit"})),
203203+ )
204204+ .into_response();
205205+ }
206206+ };
207207+208208+ let update_repo = sqlx::query("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2")
209209+ .bind(new_root_cid.to_string())
210210+ .bind(user_id)
211211+ .execute(&state.db)
212212+ .await;
213213+214214+ if let Err(e) = update_repo {
215215+ error!("Failed to update repo root in DB: {:?}", e);
216216+ return (
217217+ StatusCode::INTERNAL_SERVER_ERROR,
218218+ Json(json!({"error": "InternalError", "message": "Failed to update repo root in DB"})),
219219+ )
220220+ .into_response();
221221+ }
222222+223223+ let record_delete =
224224+ sqlx::query("DELETE FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3")
225225+ .bind(user_id)
226226+ .bind(&input.collection)
227227+ .bind(&input.rkey)
228228+ .execute(&state.db)
229229+ .await;
230230+231231+ if let Err(e) = record_delete {
232232+ error!("Error deleting record index: {:?}", e);
233233+ }
234234+235235+ (StatusCode::OK, Json(json!({}))).into_response()
236236+}
+10
src/api/repo/record/mod.rs
···11+pub mod delete;
22+pub mod read;
33+pub mod write;
44+55+pub use delete::{DeleteRecordInput, delete_record};
66+pub use read::{GetRecordInput, ListRecordsInput, ListRecordsOutput, get_record, list_records};
77+pub use write::{
88+ CreateRecordInput, CreateRecordOutput, PutRecordInput, PutRecordOutput, create_record,
99+ put_record,
1010+};
+236
src/api/repo/record/read.rs
···11+use crate::state::AppState;
22+use axum::{
33+ Json,
44+ extract::{Query, State},
55+ http::StatusCode,
66+ response::{IntoResponse, Response},
77+};
88+use cid::Cid;
99+use jacquard_repo::storage::BlockStore;
1010+use serde::{Deserialize, Serialize};
1111+use serde_json::json;
1212+use sqlx::Row;
1313+use std::str::FromStr;
1414+use tracing::error;
1515+1616+#[derive(Deserialize)]
1717+pub struct GetRecordInput {
1818+ pub repo: String,
1919+ pub collection: String,
2020+ pub rkey: String,
2121+ pub cid: Option<String>,
2222+}
2323+2424+pub async fn get_record(
2525+ State(state): State<AppState>,
2626+ Query(input): Query<GetRecordInput>,
2727+) -> Response {
2828+ let user_row = if input.repo.starts_with("did:") {
2929+ sqlx::query("SELECT id FROM users WHERE did = $1")
3030+ .bind(&input.repo)
3131+ .fetch_optional(&state.db)
3232+ .await
3333+ } else {
3434+ sqlx::query("SELECT id FROM users WHERE handle = $1")
3535+ .bind(&input.repo)
3636+ .fetch_optional(&state.db)
3737+ .await
3838+ };
3939+4040+ let user_id: uuid::Uuid = match user_row {
4141+ Ok(Some(row)) => row.get("id"),
4242+ _ => {
4343+ return (
4444+ StatusCode::NOT_FOUND,
4545+ Json(json!({"error": "NotFound", "message": "Repo not found"})),
4646+ )
4747+ .into_response();
4848+ }
4949+ };
5050+5151+ let record_row = sqlx::query(
5252+ "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3",
5353+ )
5454+ .bind(user_id)
5555+ .bind(&input.collection)
5656+ .bind(&input.rkey)
5757+ .fetch_optional(&state.db)
5858+ .await;
5959+6060+ let record_cid_str: String = match record_row {
6161+ Ok(Some(row)) => row.get("record_cid"),
6262+ _ => {
6363+ return (
6464+ StatusCode::NOT_FOUND,
6565+ Json(json!({"error": "NotFound", "message": "Record not found"})),
6666+ )
6767+ .into_response();
6868+ }
6969+ };
7070+7171+ if let Some(expected_cid) = &input.cid {
7272+ if &record_cid_str != expected_cid {
7373+ return (
7474+ StatusCode::NOT_FOUND,
7575+ Json(json!({"error": "NotFound", "message": "Record CID mismatch"})),
7676+ )
7777+ .into_response();
7878+ }
7979+ }
8080+8181+ let cid = match Cid::from_str(&record_cid_str) {
8282+ Ok(c) => c,
8383+ Err(_) => {
8484+ return (
8585+ StatusCode::INTERNAL_SERVER_ERROR,
8686+ Json(json!({"error": "InternalError", "message": "Invalid CID in DB"})),
8787+ )
8888+ .into_response();
8989+ }
9090+ };
9191+9292+ let block = match state.block_store.get(&cid).await {
9393+ Ok(Some(b)) => b,
9494+ _ => {
9595+ return (
9696+ StatusCode::INTERNAL_SERVER_ERROR,
9797+ Json(json!({"error": "InternalError", "message": "Record block not found"})),
9898+ )
9999+ .into_response();
100100+ }
101101+ };
102102+103103+ let value: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block) {
104104+ Ok(v) => v,
105105+ Err(e) => {
106106+ error!("Failed to deserialize record: {:?}", e);
107107+ return (
108108+ StatusCode::INTERNAL_SERVER_ERROR,
109109+ Json(json!({"error": "InternalError"})),
110110+ )
111111+ .into_response();
112112+ }
113113+ };
114114+115115+ Json(json!({
116116+ "uri": format!("at://{}/{}/{}", input.repo, input.collection, input.rkey),
117117+ "cid": record_cid_str,
118118+ "value": value
119119+ }))
120120+ .into_response()
121121+}
122122+123123+#[derive(Deserialize)]
124124+pub struct ListRecordsInput {
125125+ pub repo: String,
126126+ pub collection: String,
127127+ pub limit: Option<i32>,
128128+ pub cursor: Option<String>,
129129+ #[serde(rename = "rkeyStart")]
130130+ pub rkey_start: Option<String>,
131131+ #[serde(rename = "rkeyEnd")]
132132+ pub rkey_end: Option<String>,
133133+ pub reverse: Option<bool>,
134134+}
135135+136136+#[derive(Serialize)]
137137+pub struct ListRecordsOutput {
138138+ pub cursor: Option<String>,
139139+ pub records: Vec<serde_json::Value>,
140140+}
141141+142142+pub async fn list_records(
143143+ State(state): State<AppState>,
144144+ Query(input): Query<ListRecordsInput>,
145145+) -> Response {
146146+ let user_row = if input.repo.starts_with("did:") {
147147+ sqlx::query("SELECT id FROM users WHERE did = $1")
148148+ .bind(&input.repo)
149149+ .fetch_optional(&state.db)
150150+ .await
151151+ } else {
152152+ sqlx::query("SELECT id FROM users WHERE handle = $1")
153153+ .bind(&input.repo)
154154+ .fetch_optional(&state.db)
155155+ .await
156156+ };
157157+158158+ let user_id: uuid::Uuid = match user_row {
159159+ Ok(Some(row)) => row.get("id"),
160160+ _ => {
161161+ return (
162162+ StatusCode::NOT_FOUND,
163163+ Json(json!({"error": "NotFound", "message": "Repo not found"})),
164164+ )
165165+ .into_response();
166166+ }
167167+ };
168168+169169+ let limit = input.limit.unwrap_or(50).clamp(1, 100);
170170+ let reverse = input.reverse.unwrap_or(false);
171171+172172+ // Simplistic query construction - no sophisticated cursor handling or rkey ranges for now, just basic pagination
173173+ // TODO: Implement rkeyStart/End and correct cursor logic
174174+175175+ let query_str = format!(
176176+ "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 {} ORDER BY rkey {} LIMIT {}",
177177+ if let Some(_c) = &input.cursor {
178178+ if reverse {
179179+ "AND rkey < $3"
180180+ } else {
181181+ "AND rkey > $3"
182182+ }
183183+ } else {
184184+ ""
185185+ },
186186+ if reverse { "DESC" } else { "ASC" },
187187+ limit
188188+ );
189189+190190+ let mut query = sqlx::query(&query_str)
191191+ .bind(user_id)
192192+ .bind(&input.collection);
193193+194194+ if let Some(c) = &input.cursor {
195195+ query = query.bind(c);
196196+ }
197197+198198+ let rows = match query.fetch_all(&state.db).await {
199199+ Ok(r) => r,
200200+ Err(e) => {
201201+ error!("Error listing records: {:?}", e);
202202+ return (
203203+ StatusCode::INTERNAL_SERVER_ERROR,
204204+ Json(json!({"error": "InternalError"})),
205205+ )
206206+ .into_response();
207207+ }
208208+ };
209209+210210+ let mut records = Vec::new();
211211+ let mut last_rkey = None;
212212+213213+ for row in rows {
214214+ let rkey: String = row.get("rkey");
215215+ let cid_str: String = row.get("record_cid");
216216+ last_rkey = Some(rkey.clone());
217217+218218+ if let Ok(cid) = Cid::from_str(&cid_str) {
219219+ if let Ok(Some(block)) = state.block_store.get(&cid).await {
220220+ if let Ok(value) = serde_ipld_dagcbor::from_slice::<serde_json::Value>(&block) {
221221+ records.push(json!({
222222+ "uri": format!("at://{}/{}/{}", input.repo, input.collection, rkey),
223223+ "cid": cid_str,
224224+ "value": value
225225+ }));
226226+ }
227227+ }
228228+ }
229229+ }
230230+231231+ Json(ListRecordsOutput {
232232+ cursor: last_rkey,
233233+ records,
234234+ })
235235+ .into_response()
236236+}
+591
src/api/repo/record/write.rs
···11+use crate::state::AppState;
22+use axum::{
33+ Json,
44+ extract::State,
55+ http::StatusCode,
66+ response::{IntoResponse, Response},
77+};
88+use chrono::Utc;
99+use cid::Cid;
1010+use jacquard::types::{
1111+ did::Did,
1212+ integer::LimitedU32,
1313+ string::{Nsid, Tid},
1414+};
1515+use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
1616+use serde::{Deserialize, Serialize};
1717+use serde_json::json;
1818+use sqlx::Row;
1919+use std::str::FromStr;
2020+use std::sync::Arc;
2121+use tracing::error;
2222+2323+#[derive(Deserialize)]
2424+#[allow(dead_code)]
2525+pub struct CreateRecordInput {
2626+ pub repo: String,
2727+ pub collection: String,
2828+ pub rkey: Option<String>,
2929+ pub validate: Option<bool>,
3030+ pub record: serde_json::Value,
3131+ #[serde(rename = "swapCommit")]
3232+ pub swap_commit: Option<String>,
3333+}
3434+3535+#[derive(Serialize)]
3636+#[serde(rename_all = "camelCase")]
3737+pub struct CreateRecordOutput {
3838+ pub uri: String,
3939+ pub cid: String,
4040+}
4141+4242+pub async fn create_record(
4343+ State(state): State<AppState>,
4444+ headers: axum::http::HeaderMap,
4545+ Json(input): Json<CreateRecordInput>,
4646+) -> Response {
4747+ let auth_header = headers.get("Authorization");
4848+ if auth_header.is_none() {
4949+ return (
5050+ StatusCode::UNAUTHORIZED,
5151+ Json(json!({"error": "AuthenticationRequired"})),
5252+ )
5353+ .into_response();
5454+ }
5555+ let token = auth_header
5656+ .unwrap()
5757+ .to_str()
5858+ .unwrap_or("")
5959+ .replace("Bearer ", "");
6060+6161+ let session = sqlx::query(
6262+ "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1"
6363+ )
6464+ .bind(&token)
6565+ .fetch_optional(&state.db)
6666+ .await
6767+ .unwrap_or(None);
6868+6969+ let (did, key_bytes) = match session {
7070+ Some(row) => (
7171+ row.get::<String, _>("did"),
7272+ row.get::<Vec<u8>, _>("key_bytes"),
7373+ ),
7474+ None => {
7575+ return (
7676+ StatusCode::UNAUTHORIZED,
7777+ Json(json!({"error": "AuthenticationFailed"})),
7878+ )
7979+ .into_response();
8080+ }
8181+ };
8282+8383+ if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
8484+ return (
8585+ StatusCode::UNAUTHORIZED,
8686+ Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})),
8787+ )
8888+ .into_response();
8989+ }
9090+9191+ if input.repo != did {
9292+ return (StatusCode::FORBIDDEN, Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"}))).into_response();
9393+ }
9494+9595+ let user_query = sqlx::query("SELECT id FROM users WHERE did = $1")
9696+ .bind(&did)
9797+ .fetch_optional(&state.db)
9898+ .await;
9999+100100+ let user_id: uuid::Uuid = match user_query {
101101+ Ok(Some(row)) => row.get("id"),
102102+ _ => {
103103+ return (
104104+ StatusCode::INTERNAL_SERVER_ERROR,
105105+ Json(json!({"error": "InternalError", "message": "User not found"})),
106106+ )
107107+ .into_response();
108108+ }
109109+ };
110110+111111+ let repo_root_query = sqlx::query("SELECT repo_root_cid FROM repos WHERE user_id = $1")
112112+ .bind(user_id)
113113+ .fetch_optional(&state.db)
114114+ .await;
115115+116116+ let current_root_cid = match repo_root_query {
117117+ Ok(Some(row)) => {
118118+ let cid_str: String = row.get("repo_root_cid");
119119+ Cid::from_str(&cid_str).ok()
120120+ }
121121+ _ => None,
122122+ };
123123+124124+ if current_root_cid.is_none() {
125125+ error!("Repo root not found for user {}", did);
126126+ return (
127127+ StatusCode::INTERNAL_SERVER_ERROR,
128128+ Json(json!({"error": "InternalError", "message": "Repo root not found"})),
129129+ )
130130+ .into_response();
131131+ }
132132+ let current_root_cid = current_root_cid.unwrap();
133133+134134+ let commit_bytes = match state.block_store.get(¤t_root_cid).await {
135135+ Ok(Some(b)) => b,
136136+ Ok(None) => {
137137+ error!("Commit block not found: {}", current_root_cid);
138138+ return (
139139+ StatusCode::INTERNAL_SERVER_ERROR,
140140+ Json(json!({"error": "InternalError"})),
141141+ )
142142+ .into_response();
143143+ }
144144+ Err(e) => {
145145+ error!("Failed to load commit block: {:?}", e);
146146+ return (
147147+ StatusCode::INTERNAL_SERVER_ERROR,
148148+ Json(json!({"error": "InternalError"})),
149149+ )
150150+ .into_response();
151151+ }
152152+ };
153153+154154+ let commit = match Commit::from_cbor(&commit_bytes) {
155155+ Ok(c) => c,
156156+ Err(e) => {
157157+ error!("Failed to parse commit: {:?}", e);
158158+ return (
159159+ StatusCode::INTERNAL_SERVER_ERROR,
160160+ Json(json!({"error": "InternalError"})),
161161+ )
162162+ .into_response();
163163+ }
164164+ };
165165+166166+ let mst_root = commit.data;
167167+ let store = Arc::new(state.block_store.clone());
168168+ let mst = Mst::load(store.clone(), mst_root, None);
169169+170170+ let collection_nsid = match input.collection.parse::<Nsid>() {
171171+ Ok(n) => n,
172172+ Err(_) => {
173173+ return (
174174+ StatusCode::BAD_REQUEST,
175175+ Json(json!({"error": "InvalidCollection"})),
176176+ )
177177+ .into_response();
178178+ }
179179+ };
180180+181181+ let rkey = input
182182+ .rkey
183183+ .unwrap_or_else(|| Utc::now().format("%Y%m%d%H%M%S%f").to_string());
184184+185185+ let mut record_bytes = Vec::new();
186186+ if let Err(e) = serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record) {
187187+ error!("Error serializing record: {:?}", e);
188188+ return (
189189+ StatusCode::BAD_REQUEST,
190190+ Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})),
191191+ )
192192+ .into_response();
193193+ }
194194+195195+ let record_cid = match state.block_store.put(&record_bytes).await {
196196+ Ok(c) => c,
197197+ Err(e) => {
198198+ error!("Failed to save record block: {:?}", e);
199199+ return (
200200+ StatusCode::INTERNAL_SERVER_ERROR,
201201+ Json(json!({"error": "InternalError"})),
202202+ )
203203+ .into_response();
204204+ }
205205+ };
206206+207207+ let key = format!("{}/{}", collection_nsid, rkey);
208208+ if let Err(e) = mst.update(&key, record_cid).await {
209209+ error!("Failed to update MST: {:?}", e);
210210+ return (
211211+ StatusCode::INTERNAL_SERVER_ERROR,
212212+ Json(json!({"error": "InternalError"})),
213213+ )
214214+ .into_response();
215215+ }
216216+217217+ let new_mst_root = match mst.root().await {
218218+ Ok(c) => c,
219219+ Err(e) => {
220220+ error!("Failed to get new MST root: {:?}", e);
221221+ return (
222222+ StatusCode::INTERNAL_SERVER_ERROR,
223223+ Json(json!({"error": "InternalError"})),
224224+ )
225225+ .into_response();
226226+ }
227227+ };
228228+229229+ let did_obj = match Did::new(&did) {
230230+ Ok(d) => d,
231231+ Err(_) => {
232232+ return (
233233+ StatusCode::INTERNAL_SERVER_ERROR,
234234+ Json(json!({"error": "InternalError", "message": "Invalid DID"})),
235235+ )
236236+ .into_response();
237237+ }
238238+ };
239239+240240+ let rev = Tid::now(LimitedU32::MIN);
241241+242242+ let new_commit = Commit::new_unsigned(did_obj, new_mst_root, rev, Some(current_root_cid));
243243+244244+ let new_commit_bytes = match new_commit.to_cbor() {
245245+ Ok(b) => b,
246246+ Err(e) => {
247247+ error!("Failed to serialize new commit: {:?}", e);
248248+ return (
249249+ StatusCode::INTERNAL_SERVER_ERROR,
250250+ Json(json!({"error": "InternalError"})),
251251+ )
252252+ .into_response();
253253+ }
254254+ };
255255+256256+ let new_root_cid = match state.block_store.put(&new_commit_bytes).await {
257257+ Ok(c) => c,
258258+ Err(e) => {
259259+ error!("Failed to save new commit: {:?}", e);
260260+ return (
261261+ StatusCode::INTERNAL_SERVER_ERROR,
262262+ Json(json!({"error": "InternalError"})),
263263+ )
264264+ .into_response();
265265+ }
266266+ };
267267+268268+ let update_repo = sqlx::query("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2")
269269+ .bind(new_root_cid.to_string())
270270+ .bind(user_id)
271271+ .execute(&state.db)
272272+ .await;
273273+274274+ if let Err(e) = update_repo {
275275+ error!("Failed to update repo root in DB: {:?}", e);
276276+ return (
277277+ StatusCode::INTERNAL_SERVER_ERROR,
278278+ Json(json!({"error": "InternalError"})),
279279+ )
280280+ .into_response();
281281+ }
282282+283283+ let record_insert = sqlx::query(
284284+ "INSERT INTO records (repo_id, collection, rkey, record_cid) VALUES ($1, $2, $3, $4)
285285+ ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4, created_at = NOW()",
286286+ )
287287+ .bind(user_id)
288288+ .bind(&input.collection)
289289+ .bind(&rkey)
290290+ .bind(record_cid.to_string())
291291+ .execute(&state.db)
292292+ .await;
293293+294294+ if let Err(e) = record_insert {
295295+ error!("Error inserting record index: {:?}", e);
296296+ return (
297297+ StatusCode::INTERNAL_SERVER_ERROR,
298298+ Json(json!({"error": "InternalError", "message": "Failed to index record"})),
299299+ )
300300+ .into_response();
301301+ }
302302+303303+ let output = CreateRecordOutput {
304304+ uri: format!("at://{}/{}/{}", input.repo, input.collection, rkey),
305305+ cid: record_cid.to_string(),
306306+ };
307307+ (StatusCode::OK, Json(output)).into_response()
308308+}
309309+310310+#[derive(Deserialize)]
311311+#[allow(dead_code)]
312312+pub struct PutRecordInput {
313313+ pub repo: String,
314314+ pub collection: String,
315315+ pub rkey: String,
316316+ pub validate: Option<bool>,
317317+ pub record: serde_json::Value,
318318+ #[serde(rename = "swapCommit")]
319319+ pub swap_commit: Option<String>,
320320+}
321321+322322+#[derive(Serialize)]
323323+#[serde(rename_all = "camelCase")]
324324+pub struct PutRecordOutput {
325325+ pub uri: String,
326326+ pub cid: String,
327327+}
328328+329329+pub async fn put_record(
330330+ State(state): State<AppState>,
331331+ headers: axum::http::HeaderMap,
332332+ Json(input): Json<PutRecordInput>,
333333+) -> Response {
334334+ let auth_header = headers.get("Authorization");
335335+ if auth_header.is_none() {
336336+ return (
337337+ StatusCode::UNAUTHORIZED,
338338+ Json(json!({"error": "AuthenticationRequired"})),
339339+ )
340340+ .into_response();
341341+ }
342342+ let token = auth_header
343343+ .unwrap()
344344+ .to_str()
345345+ .unwrap_or("")
346346+ .replace("Bearer ", "");
347347+348348+ let session = sqlx::query(
349349+ "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1"
350350+ )
351351+ .bind(&token)
352352+ .fetch_optional(&state.db)
353353+ .await
354354+ .unwrap_or(None);
355355+356356+ let (did, key_bytes) = match session {
357357+ Some(row) => (
358358+ row.get::<String, _>("did"),
359359+ row.get::<Vec<u8>, _>("key_bytes"),
360360+ ),
361361+ None => {
362362+ return (
363363+ StatusCode::UNAUTHORIZED,
364364+ Json(json!({"error": "AuthenticationFailed"})),
365365+ )
366366+ .into_response();
367367+ }
368368+ };
369369+370370+ if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
371371+ return (
372372+ StatusCode::UNAUTHORIZED,
373373+ Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})),
374374+ )
375375+ .into_response();
376376+ }
377377+378378+ if input.repo != did {
379379+ return (StatusCode::FORBIDDEN, Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"}))).into_response();
380380+ }
381381+382382+ let user_query = sqlx::query("SELECT id FROM users WHERE did = $1")
383383+ .bind(&did)
384384+ .fetch_optional(&state.db)
385385+ .await;
386386+387387+ let user_id: uuid::Uuid = match user_query {
388388+ Ok(Some(row)) => row.get("id"),
389389+ _ => {
390390+ return (
391391+ StatusCode::INTERNAL_SERVER_ERROR,
392392+ Json(json!({"error": "InternalError", "message": "User not found"})),
393393+ )
394394+ .into_response();
395395+ }
396396+ };
397397+398398+ let repo_root_query = sqlx::query("SELECT repo_root_cid FROM repos WHERE user_id = $1")
399399+ .bind(user_id)
400400+ .fetch_optional(&state.db)
401401+ .await;
402402+403403+ let current_root_cid = match repo_root_query {
404404+ Ok(Some(row)) => {
405405+ let cid_str: String = row.get("repo_root_cid");
406406+ Cid::from_str(&cid_str).ok()
407407+ }
408408+ _ => None,
409409+ };
410410+411411+ if current_root_cid.is_none() {
412412+ error!("Repo root not found for user {}", did);
413413+ return (
414414+ StatusCode::INTERNAL_SERVER_ERROR,
415415+ Json(json!({"error": "InternalError", "message": "Repo root not found"})),
416416+ )
417417+ .into_response();
418418+ }
419419+ let current_root_cid = current_root_cid.unwrap();
420420+421421+ let commit_bytes = match state.block_store.get(¤t_root_cid).await {
422422+ Ok(Some(b)) => b,
423423+ Ok(None) => {
424424+ error!("Commit block not found: {}", current_root_cid);
425425+ return (
426426+ StatusCode::INTERNAL_SERVER_ERROR,
427427+ Json(json!({"error": "InternalError", "message": "Commit block not found"})),
428428+ )
429429+ .into_response();
430430+ }
431431+ Err(e) => {
432432+ error!("Failed to load commit block: {:?}", e);
433433+ return (
434434+ StatusCode::INTERNAL_SERVER_ERROR,
435435+ Json(json!({"error": "InternalError", "message": "Failed to load commit block"})),
436436+ )
437437+ .into_response();
438438+ }
439439+ };
440440+441441+ let commit = match Commit::from_cbor(&commit_bytes) {
442442+ Ok(c) => c,
443443+ Err(e) => {
444444+ error!("Failed to parse commit: {:?}", e);
445445+ return (
446446+ StatusCode::INTERNAL_SERVER_ERROR,
447447+ Json(json!({"error": "InternalError", "message": "Failed to parse commit"})),
448448+ )
449449+ .into_response();
450450+ }
451451+ };
452452+453453+ let mst_root = commit.data;
454454+ let store = Arc::new(state.block_store.clone());
455455+ let mst = Mst::load(store.clone(), mst_root, None);
456456+457457+ let collection_nsid = match input.collection.parse::<Nsid>() {
458458+ Ok(n) => n,
459459+ Err(_) => {
460460+ return (
461461+ StatusCode::BAD_REQUEST,
462462+ Json(json!({"error": "InvalidCollection"})),
463463+ )
464464+ .into_response();
465465+ }
466466+ };
467467+468468+ let rkey = input.rkey.clone();
469469+470470+ let mut record_bytes = Vec::new();
471471+ if let Err(e) = serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record) {
472472+ error!("Error serializing record: {:?}", e);
473473+ return (
474474+ StatusCode::BAD_REQUEST,
475475+ Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})),
476476+ )
477477+ .into_response();
478478+ }
479479+480480+ let record_cid = match state.block_store.put(&record_bytes).await {
481481+ Ok(c) => c,
482482+ Err(e) => {
483483+ error!("Failed to save record block: {:?}", e);
484484+ return (
485485+ StatusCode::INTERNAL_SERVER_ERROR,
486486+ Json(json!({"error": "InternalError", "message": "Failed to save record block"})),
487487+ )
488488+ .into_response();
489489+ }
490490+ };
491491+492492+ let key = format!("{}/{}", collection_nsid, rkey);
493493+ if let Err(e) = mst.update(&key, record_cid).await {
494494+ error!("Failed to update MST: {:?}", e);
495495+ return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": format!("Failed to update MST: {:?}", e)}))).into_response();
496496+ }
497497+498498+ let new_mst_root = match mst.root().await {
499499+ Ok(c) => c,
500500+ Err(e) => {
501501+ error!("Failed to get new MST root: {:?}", e);
502502+ return (
503503+ StatusCode::INTERNAL_SERVER_ERROR,
504504+ Json(json!({"error": "InternalError", "message": "Failed to get new MST root"})),
505505+ )
506506+ .into_response();
507507+ }
508508+ };
509509+510510+ let did_obj = match Did::new(&did) {
511511+ Ok(d) => d,
512512+ Err(_) => {
513513+ return (
514514+ StatusCode::INTERNAL_SERVER_ERROR,
515515+ Json(json!({"error": "InternalError", "message": "Invalid DID"})),
516516+ )
517517+ .into_response();
518518+ }
519519+ };
520520+521521+ let rev = Tid::now(LimitedU32::MIN);
522522+523523+ let new_commit = Commit::new_unsigned(did_obj, new_mst_root, rev, Some(current_root_cid));
524524+525525+ let new_commit_bytes = match new_commit.to_cbor() {
526526+ Ok(b) => b,
527527+ Err(e) => {
528528+ error!("Failed to serialize new commit: {:?}", e);
529529+ return (
530530+ StatusCode::INTERNAL_SERVER_ERROR,
531531+ Json(
532532+ json!({"error": "InternalError", "message": "Failed to serialize new commit"}),
533533+ ),
534534+ )
535535+ .into_response();
536536+ }
537537+ };
538538+539539+ let new_root_cid = match state.block_store.put(&new_commit_bytes).await {
540540+ Ok(c) => c,
541541+ Err(e) => {
542542+ error!("Failed to save new commit: {:?}", e);
543543+ return (
544544+ StatusCode::INTERNAL_SERVER_ERROR,
545545+ Json(json!({"error": "InternalError", "message": "Failed to save new commit"})),
546546+ )
547547+ .into_response();
548548+ }
549549+ };
550550+551551+ let update_repo = sqlx::query("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2")
552552+ .bind(new_root_cid.to_string())
553553+ .bind(user_id)
554554+ .execute(&state.db)
555555+ .await;
556556+557557+ if let Err(e) = update_repo {
558558+ error!("Failed to update repo root in DB: {:?}", e);
559559+ return (
560560+ StatusCode::INTERNAL_SERVER_ERROR,
561561+ Json(json!({"error": "InternalError", "message": "Failed to update repo root in DB"})),
562562+ )
563563+ .into_response();
564564+ }
565565+566566+ let record_insert = sqlx::query(
567567+ "INSERT INTO records (repo_id, collection, rkey, record_cid) VALUES ($1, $2, $3, $4)
568568+ ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4, created_at = NOW()",
569569+ )
570570+ .bind(user_id)
571571+ .bind(&input.collection)
572572+ .bind(&rkey)
573573+ .bind(record_cid.to_string())
574574+ .execute(&state.db)
575575+ .await;
576576+577577+ if let Err(e) = record_insert {
578578+ error!("Error inserting record index: {:?}", e);
579579+ return (
580580+ StatusCode::INTERNAL_SERVER_ERROR,
581581+ Json(json!({"error": "InternalError", "message": "Failed to index record"})),
582582+ )
583583+ .into_response();
584584+ }
585585+586586+ let output = PutRecordOutput {
587587+ uri: format!("at://{}/{}/{}", input.repo, input.collection, rkey),
588588+ cid: record_cid.to_string(),
589589+ };
590590+ (StatusCode::OK, Json(output)).into_response()
591591+}
···11+pub mod meta;
22+pub mod session;
33+44+pub use meta::{describe_server, health};
55+pub use session::{create_session, delete_session, get_session, refresh_session};