this repo has no description

Initial firehose connections

+4
.env.example
··· 13 13 PDS_HOSTNAME=localhost:3000 14 14 PLC_URL=plc.directory 15 15 16 + # A comma-separated list of WebSocket URLs for firehose relays to push updates to. 17 + # e.g., RELAYS=wss://relay.bsky.social,wss://another-relay.com 18 + RELAYS= 19 + 16 20 # Notification Service Configuration 17 21 # At least one notification channel should be configured for user notifications to work. 18 22 # Email notifications (via sendmail/msmtp)
+51
Cargo.lock
··· 593 593 checksum = "5b098575ebe77cb6d14fc7f32749631a6e44edbef6b796f89b020e99ba20d425" 594 594 dependencies = [ 595 595 "axum-core", 596 + "axum-macros", 597 + "base64 0.22.1", 596 598 "bytes", 597 599 "form_urlencoded", 598 600 "futures-util", ··· 611 613 "serde_json", 612 614 "serde_path_to_error", 613 615 "serde_urlencoded", 616 + "sha1", 614 617 "sync_wrapper", 615 618 "tokio", 619 + "tokio-tungstenite", 616 620 "tower", 617 621 "tower-layer", 618 622 "tower-service", ··· 636 640 "tower-layer", 637 641 "tower-service", 638 642 "tracing", 643 + ] 644 + 645 + [[package]] 646 + name = "axum-macros" 647 + version = "0.5.0" 648 + source = "registry+https://github.com/rust-lang/crates.io-index" 649 + checksum = "604fde5e028fea851ce1d8570bbdc034bec850d157f7569d10f347d06808c05c" 650 + dependencies = [ 651 + "proc-macro2", 652 + "quote", 653 + "syn 2.0.111", 639 654 ] 640 655 641 656 [[package]] ··· 862 877 "cid", 863 878 "ctor", 864 879 "dotenvy", 880 + "futures", 881 + "iroh-car", 865 882 "jacquard", 866 883 "jacquard-axum", 867 884 "jacquard-repo", ··· 872 889 "rand 0.8.5", 873 890 "reqwest", 874 891 "serde", 892 + "serde_bytes", 875 893 "serde_ipld_dagcbor", 876 894 "serde_json", 877 895 "sha2", ··· 880 898 "testcontainers-modules", 881 899 "thiserror 2.0.17", 882 900 "tokio", 901 + "tokio-tungstenite", 883 902 "tracing", 884 903 "tracing-subscriber", 885 904 "uuid", ··· 5559 5578 ] 5560 5579 5561 5580 [[package]] 5581 + name = "tokio-tungstenite" 5582 + version = "0.28.0" 5583 + source = "registry+https://github.com/rust-lang/crates.io-index" 5584 + checksum = "d25a406cddcc431a75d3d9afc6a7c0f7428d4891dd973e4d54c56b46127bf857" 5585 + dependencies = [ 5586 + "futures-util", 5587 + "log", 5588 + "native-tls", 5589 + "tokio", 5590 + "tokio-native-tls", 5591 + "tungstenite", 5592 + ] 5593 + 5594 + [[package]] 5562 5595 name = "tokio-util" 5563 5596 version = "0.7.17" 5564 5597 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 5746 5779 version = "0.2.5" 5747 5780 source = "registry+https://github.com/rust-lang/crates.io-index" 5748 5781 checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" 5782 + 5783 + [[package]] 5784 + name = "tungstenite" 5785 + version = "0.28.0" 5786 + source = "registry+https://github.com/rust-lang/crates.io-index" 5787 + checksum = "8628dcc84e5a09eb3d8423d6cb682965dea9133204e8fb3efee74c2a0c259442" 5788 + dependencies = [ 5789 + "bytes", 5790 + "data-encoding", 5791 + "http 1.4.0", 5792 + "httparse", 5793 + "log", 5794 + "native-tls", 5795 + "rand 0.9.2", 5796 + "sha1", 5797 + "thiserror 2.0.17", 5798 + "utf-8", 5799 + ] 5749 5800 5750 5801 [[package]] 5751 5802 name = "typenum"
+5 -1
Cargo.toml
··· 8 8 async-trait = "0.1.89" 9 9 aws-config = "1.8.11" 10 10 aws-sdk-s3 = "1.116.0" 11 - axum = "0.8.7" 11 + axum = { version = "0.8.7", features = ["ws", "macros"] } 12 12 base64 = "0.22.1" 13 13 bcrypt = "0.17.1" 14 14 bytes = "1.11.0" 15 15 chrono = { version = "0.4.42", features = ["serde"] } 16 16 cid = "0.11.1" 17 17 dotenvy = "0.15.7" 18 + futures = "0.3.30" 18 19 jacquard = { version = "0.9.3", default-features = false, features = ["api", "api_bluesky", "api_full", "derive", "dns"] } 19 20 jacquard-axum = "0.9.2" 20 21 jacquard-repo = "0.9.2" ··· 25 26 rand = "0.8.5" 26 27 reqwest = { version = "0.12.24", features = ["json"] } 27 28 serde = { version = "1.0.228", features = ["derive"] } 29 + serde_bytes = "0.11.14" 28 30 serde_ipld_dagcbor = "0.6.4" 29 31 serde_json = "1.0.145" 30 32 sha2 = "0.10.9" ··· 33 35 tokio = { version = "1.48.0", features = ["macros", "rt-multi-thread", "time", "signal", "process"] } 34 36 tracing = "0.1.43" 35 37 tracing-subscriber = "0.3.22" 38 + tokio-tungstenite = { version = "0.28.0", features = ["native-tls"] } 36 39 uuid = { version = "1.19.0", features = ["v4", "fast-rng"] } 37 40 38 41 [dev-dependencies] 39 42 ctor = "0.6.3" 43 + iroh-car = "0.5.1" 40 44 testcontainers = "0.26.0" 41 45 testcontainers-modules = { version = "0.14.0", features = ["postgres"] } 42 46 wiremock = "0.6.5"
+23 -24
TODO.md
··· 43 43 - [x] Implement `com.atproto.server.confirmEmail`. 44 44 45 45 ## Repository Operations (`com.atproto.repo`) 46 - - [ ] Record CRUD 46 + - [x] Record CRUD 47 47 - [x] Implement `com.atproto.repo.createRecord`. 48 - - [ ] Validate schema against Lexicon (just structure, not complex logic). 49 48 - [x] Generate `rkey` (TID) if not provided. 50 49 - [x] Handle MST (Merkle Search Tree) insertion. 51 - - [ ] **Trigger Firehose Event**. 50 + - [x] **Trigger Firehose Event**. 52 51 - [x] Implement `com.atproto.repo.putRecord`. 53 52 - [x] Implement `com.atproto.repo.getRecord`. 54 53 - [x] Implement `com.atproto.repo.deleteRecord`. ··· 57 56 - [x] Implement `com.atproto.repo.applyWrites` (Batch writes). 58 57 - [ ] Implement `com.atproto.repo.importRepo` (Migration). 59 58 - [x] Implement `com.atproto.repo.listMissingBlobs`. 60 - - [ ] Blob Management 59 + - [x] Blob Management 61 60 - [x] Implement `com.atproto.repo.uploadBlob`. 62 61 - [x] Store blob (S3). 63 62 - [x] return `blob` ref (CID + MimeType). 64 63 65 64 ## Sync & Federation (`com.atproto.sync`) 66 - - [ ] The Firehose (WebSocket) 67 - - [ ] Implement `com.atproto.sync.subscribeRepos`. 68 - - [ ] Broadcast real-time commit events. 69 - - [ ] Handle cursor replay (backfill). 70 - - [ ] Bulk Export 65 + - [x] The Firehose (WebSocket) 66 + - [x] Implement `com.atproto.sync.subscribeRepos`. 67 + - [x] Broadcast real-time commit events. 68 + - [x] Handle cursor replay (backfill). 69 + - [x] Bulk Export 71 70 - [x] Implement `com.atproto.sync.getRepo` (Return full CAR file of repo). 72 71 - [x] Implement `com.atproto.sync.getBlocks` (Return specific blocks via CIDs). 73 72 - [x] Implement `com.atproto.sync.getLatestCommit`. ··· 75 74 - [x] Implement `com.atproto.sync.getRepoStatus`. 76 75 - [x] Implement `com.atproto.sync.listRepos`. 77 76 - [x] Implement `com.atproto.sync.notifyOfUpdate`. 78 - - [ ] Blob Sync 77 + - [x] Blob Sync 79 78 - [x] Implement `com.atproto.sync.getBlob`. 80 79 - [x] Implement `com.atproto.sync.listBlobs`. 81 80 - [x] Crawler Interaction ··· 110 109 - [ ] Handle this generically. 111 110 112 111 ## Infrastructure & Core Components 113 - - [ ] Sequencer (Event Log) 114 - - [ ] Implement a `Sequencer` (backed by `repo_seq` table? Like in ref impl). 115 - - [ ] Implement event formatting (`commit`, `handle`, `identity`, `account`). 116 - - [ ] Implement database polling / event emission mechanism. 117 - - [ ] Implement cursor-based event replay (`requestSeqRange`). 118 - - [ ] Repo Storage & Consistency (in postgres) 119 - - [ ] Implement `RepoStorage` for postgres (replaces per-user SQLite). 120 - - [ ] Read/Write IPLD blocks to `blocks` table (global deduplication). 121 - - [ ] Manage Repo Root in `repos` table. 122 - - [ ] Implement Atomic Repo Transactions. 123 - - [ ] Ensure `blocks` write, `repo_root` update, `records` index update, and `sequencer` event are committed in a single transaction. 112 + - [x] Sequencer (Event Log) 113 + - [x] Implement a `Sequencer` (backed by `repo_seq` table). 114 + - [x] Implement event formatting (`commit`, `handle`, `identity`, `account`). 115 + - [x] Implement database polling / event emission mechanism. 116 + - [x] Implement cursor-based event replay (`requestSeqRange`). 117 + - [x] Repo Storage & Consistency (in postgres) 118 + - [x] Implement `RepoStorage` for postgres (replaces per-user SQLite). 119 + - [x] Read/Write IPLD blocks to `blocks` table (global deduplication). 120 + - [x] Manage Repo Root in `repos` table. 121 + - [x] Implement Atomic Repo Transactions. 122 + - [x] Ensure `blocks` write, `repo_root` update, `records` index update, and `sequencer` event are committed in a single transaction. 124 123 - [ ] Implement concurrency control (row-level locking on `repos` table) to prevent concurrent writes to the same repo. 125 124 - [ ] DID Cache 126 125 - [ ] Implement caching layer for DID resolution (Redis or in-memory). ··· 138 137 - [x] Helper functions for common notification types (welcome, password reset, email verification, etc.) 139 138 - [ ] Image Processing 140 139 - [ ] Implement image resize/formatting pipeline (for blob uploads). 141 - - [ ] IPLD & MST 142 - - [ ] Implement Merkle Search Tree logic for repo signing. 143 - - [ ] Implement CAR (Content Addressable Archive) encoding/decoding. 140 + - [x] IPLD & MST 141 + - [x] Implement Merkle Search Tree logic for repo signing. 142 + - [x] Implement CAR (Content Addressable Archive) encoding/decoding. 144 143 - [ ] Validation 145 144 - [ ] DID PLC Operations (Sign rotation keys). 146 145 - [ ] Fix any remaining TODOs in the code, everywhere, full stop.
+13
migrations/202512211402_repo_sequencer.sql
··· 1 + CREATE TABLE repo_seq ( 2 + seq BIGSERIAL PRIMARY KEY, 3 + did TEXT NOT NULL, 4 + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), 5 + event_type TEXT NOT NULL, 6 + commit_cid TEXT, 7 + prev_cid TEXT, 8 + ops JSONB, 9 + blobs TEXT[] 10 + ); 11 + 12 + CREATE INDEX idx_repo_seq_seq ON repo_seq(seq); 13 + CREATE INDEX idx_repo_seq_did ON repo_seq(did);
+2
migrations/202512211403_add_blocks_cids_to_repo_seq.sql
··· 1 + ALTER TABLE repo_seq ADD COLUMN blocks_cids TEXT[]; 2 +
+86 -262
src/api/repo/record/batch.rs
··· 1 + use crate::api::repo::record::utils::{commit_and_log, RecordOp}; 2 + use crate::repo::tracking::TrackingBlockStore; 1 3 use crate::state::AppState; 2 4 use axum::{ 3 - Json, 4 5 extract::State, 5 6 http::StatusCode, 6 7 response::{IntoResponse, Response}, 8 + Json, 7 9 }; 8 10 use chrono::Utc; 9 11 use cid::Cid; 10 - use jacquard::types::{ 11 - did::Did, 12 - integer::LimitedU32, 13 - string::{Nsid, Tid}, 14 - }; 12 + use jacquard::types::string::Nsid; 15 13 use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 16 14 use serde::{Deserialize, Serialize}; 17 15 use serde_json::json; ··· 98 96 .unwrap_or(None); 99 97 100 98 let (did, key_bytes) = match session { 101 - Some(row) => ( 102 - row.did, 103 - row.key_bytes, 104 - ), 99 + Some(row) => (row.did, row.key_bytes), 105 100 None => { 106 101 return ( 107 102 StatusCode::UNAUTHORIZED, ··· 143 138 .into_response(); 144 139 } 145 140 146 - let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 141 + let user_id: uuid::Uuid = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did) 147 142 .fetch_optional(&state.db) 148 - .await; 149 - 150 - let user_id: uuid::Uuid = match user_query { 151 - Ok(Some(row)) => row.id, 143 + .await 144 + { 145 + Ok(Some(id)) => id, 152 146 _ => { 153 147 return ( 154 148 StatusCode::INTERNAL_SERVER_ERROR, ··· 158 152 } 159 153 }; 160 154 161 - let repo_root_query = sqlx::query!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id) 162 - .fetch_optional(&state.db) 163 - .await; 155 + let root_cid_str: String = 156 + match sqlx::query_scalar!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id) 157 + .fetch_optional(&state.db) 158 + .await 159 + { 160 + Ok(Some(cid_str)) => cid_str, 161 + _ => { 162 + return ( 163 + StatusCode::INTERNAL_SERVER_ERROR, 164 + Json(json!({"error": "InternalError", "message": "Repo root not found"})), 165 + ) 166 + .into_response(); 167 + } 168 + }; 164 169 165 - let current_root_cid = match repo_root_query { 166 - Ok(Some(row)) => { 167 - let cid_str: String = row.repo_root_cid; 168 - match Cid::from_str(&cid_str) { 169 - Ok(c) => c, 170 - Err(_) => { 171 - return ( 172 - StatusCode::INTERNAL_SERVER_ERROR, 173 - Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})), 174 - ) 175 - .into_response(); 176 - } 177 - } 178 - } 179 - _ => { 170 + let current_root_cid = match Cid::from_str(&root_cid_str) { 171 + Ok(c) => c, 172 + Err(_) => { 180 173 return ( 181 174 StatusCode::INTERNAL_SERVER_ERROR, 182 - Json(json!({"error": "InternalError", "message": "Repo root not found"})), 175 + Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})), 183 176 ) 184 177 .into_response(); 185 178 } 186 179 }; 187 180 188 181 if let Some(swap_commit) = &input.swap_commit { 189 - let swap_cid = match Cid::from_str(swap_commit) { 190 - Ok(c) => c, 191 - Err(_) => { 192 - return ( 193 - StatusCode::BAD_REQUEST, 194 - Json(json!({"error": "InvalidSwap", "message": "Invalid swapCommit CID"})), 195 - ) 196 - .into_response(); 197 - } 198 - }; 199 - if swap_cid != current_root_cid { 182 + if Cid::from_str(swap_commit).ok() != Some(current_root_cid) { 200 183 return ( 201 184 StatusCode::CONFLICT, 202 185 Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})), ··· 205 188 } 206 189 } 207 190 208 - let commit_bytes = match state.block_store.get(&current_root_cid).await { 191 + let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 192 + 193 + let commit_bytes = match tracking_store.get(&current_root_cid).await { 209 194 Ok(Some(b)) => b, 210 - Ok(None) => { 195 + _ => { 211 196 return ( 212 197 StatusCode::INTERNAL_SERVER_ERROR, 213 198 Json(json!({"error": "InternalError", "message": "Commit block not found"})), 214 199 ) 215 - .into_response(); 216 - } 217 - Err(e) => { 218 - error!("Failed to load commit block: {:?}", e); 219 - return ( 220 - StatusCode::INTERNAL_SERVER_ERROR, 221 - Json(json!({"error": "InternalError"})), 222 - ) 223 - .into_response(); 200 + .into_response() 224 201 } 225 202 }; 226 203 227 204 let commit = match Commit::from_cbor(&commit_bytes) { 228 205 Ok(c) => c, 229 - Err(e) => { 230 - error!("Failed to parse commit: {:?}", e); 206 + _ => { 231 207 return ( 232 208 StatusCode::INTERNAL_SERVER_ERROR, 233 - Json(json!({"error": "InternalError"})), 209 + Json(json!({"error": "InternalError", "message": "Failed to parse commit"})), 234 210 ) 235 - .into_response(); 211 + .into_response() 236 212 } 237 213 }; 238 214 239 - let mst_root = commit.data; 240 - let store = Arc::new(state.block_store.clone()); 241 - let mut mst = Mst::load(store.clone(), mst_root, None); 215 + let mut mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 242 216 243 217 let mut results: Vec<WriteResult> = Vec::new(); 244 - let mut record_ops: Vec<(String, String, Option<String>)> = Vec::new(); 218 + let mut ops: Vec<RecordOp> = Vec::new(); 245 219 246 220 for write in &input.writes { 247 221 match write { ··· 250 224 rkey, 251 225 value, 252 226 } => { 253 - let collection_nsid = match collection.parse::<Nsid>() { 254 - Ok(n) => n, 255 - Err(_) => { 256 - return ( 257 - StatusCode::BAD_REQUEST, 258 - Json(json!({"error": "InvalidCollection"})), 259 - ) 260 - .into_response(); 261 - } 262 - }; 263 - 264 227 let rkey = rkey 265 228 .clone() 266 229 .unwrap_or_else(|| Utc::now().format("%Y%m%d%H%M%S%f").to_string()); 267 - 268 230 let mut record_bytes = Vec::new(); 269 - if let Err(e) = serde_ipld_dagcbor::to_writer(&mut record_bytes, value) { 270 - error!("Error serializing record: {:?}", e); 271 - return ( 272 - StatusCode::BAD_REQUEST, 273 - Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})), 274 - ) 275 - .into_response(); 276 - } 231 + serde_ipld_dagcbor::to_writer(&mut record_bytes, value).unwrap(); 232 + let record_cid = tracking_store.put(&record_bytes).await.unwrap(); 277 233 278 - let record_cid = match state.block_store.put(&record_bytes).await { 279 - Ok(c) => c, 280 - Err(e) => { 281 - error!("Failed to save record block: {:?}", e); 282 - return ( 283 - StatusCode::INTERNAL_SERVER_ERROR, 284 - Json(json!({"error": "InternalError"})), 285 - ) 286 - .into_response(); 287 - } 288 - }; 289 - 290 - let key = format!("{}/{}", collection_nsid, rkey); 291 - mst = match mst.add(&key, record_cid).await { 292 - Ok(m) => m, 293 - Err(e) => { 294 - error!("Failed to add to MST: {:?}", e); 295 - return ( 296 - StatusCode::INTERNAL_SERVER_ERROR, 297 - Json(json!({"error": "InternalError"})), 298 - ) 299 - .into_response(); 300 - } 301 - }; 234 + let key = format!("{}/{}", collection.parse::<Nsid>().unwrap(), rkey); 235 + mst = mst.add(&key, record_cid).await.unwrap(); 302 236 303 237 let uri = format!("at://{}/{}/{}", did, collection, rkey); 304 238 results.push(WriteResult::CreateResult { 305 - uri: uri.clone(), 239 + uri, 306 240 cid: record_cid.to_string(), 307 241 }); 308 - record_ops.push((collection.clone(), rkey, Some(record_cid.to_string()))); 242 + ops.push(RecordOp::Create { 243 + collection: collection.clone(), 244 + rkey, 245 + cid: record_cid, 246 + }); 309 247 } 310 248 WriteOp::Update { 311 249 collection, 312 250 rkey, 313 251 value, 314 252 } => { 315 - let collection_nsid = match collection.parse::<Nsid>() { 316 - Ok(n) => n, 317 - Err(_) => { 318 - return ( 319 - StatusCode::BAD_REQUEST, 320 - Json(json!({"error": "InvalidCollection"})), 321 - ) 322 - .into_response(); 323 - } 324 - }; 325 - 326 253 let mut record_bytes = Vec::new(); 327 - if let Err(e) = serde_ipld_dagcbor::to_writer(&mut record_bytes, value) { 328 - error!("Error serializing record: {:?}", e); 329 - return ( 330 - StatusCode::BAD_REQUEST, 331 - Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})), 332 - ) 333 - .into_response(); 334 - } 335 - 336 - let record_cid = match state.block_store.put(&record_bytes).await { 337 - Ok(c) => c, 338 - Err(e) => { 339 - error!("Failed to save record block: {:?}", e); 340 - return ( 341 - StatusCode::INTERNAL_SERVER_ERROR, 342 - Json(json!({"error": "InternalError"})), 343 - ) 344 - .into_response(); 345 - } 346 - }; 254 + serde_ipld_dagcbor::to_writer(&mut record_bytes, value).unwrap(); 255 + let record_cid = tracking_store.put(&record_bytes).await.unwrap(); 347 256 348 - let key = format!("{}/{}", collection_nsid, rkey); 349 - mst = match mst.update(&key, record_cid).await { 350 - Ok(m) => m, 351 - Err(e) => { 352 - error!("Failed to update MST: {:?}", e); 353 - return ( 354 - StatusCode::INTERNAL_SERVER_ERROR, 355 - Json(json!({"error": "InternalError"})), 356 - ) 357 - .into_response(); 358 - } 359 - }; 257 + let key = format!("{}/{}", collection.parse::<Nsid>().unwrap(), rkey); 258 + mst = mst.update(&key, record_cid).await.unwrap(); 360 259 361 260 let uri = format!("at://{}/{}/{}", did, collection, rkey); 362 261 results.push(WriteResult::UpdateResult { 363 - uri: uri.clone(), 262 + uri, 364 263 cid: record_cid.to_string(), 365 264 }); 366 - record_ops.push((collection.clone(), rkey.clone(), Some(record_cid.to_string()))); 265 + ops.push(RecordOp::Update { 266 + collection: collection.clone(), 267 + rkey: rkey.clone(), 268 + cid: record_cid, 269 + }); 367 270 } 368 271 WriteOp::Delete { collection, rkey } => { 369 - let collection_nsid = match collection.parse::<Nsid>() { 370 - Ok(n) => n, 371 - Err(_) => { 372 - return ( 373 - StatusCode::BAD_REQUEST, 374 - Json(json!({"error": "InvalidCollection"})), 375 - ) 376 - .into_response(); 377 - } 378 - }; 379 - 380 - let key = format!("{}/{}", collection_nsid, rkey); 381 - mst = match mst.delete(&key).await { 382 - Ok(m) => m, 383 - Err(e) => { 384 - error!("Failed to delete from MST: {:?}", e); 385 - return ( 386 - StatusCode::INTERNAL_SERVER_ERROR, 387 - Json(json!({"error": "InternalError"})), 388 - ) 389 - .into_response(); 390 - } 391 - }; 272 + let key = format!("{}/{}", collection.parse::<Nsid>().unwrap(), rkey); 273 + mst = mst.delete(&key).await.unwrap(); 392 274 393 275 results.push(WriteResult::DeleteResult {}); 394 - record_ops.push((collection.clone(), rkey.clone(), None)); 276 + ops.push(RecordOp::Delete { 277 + collection: collection.clone(), 278 + rkey: rkey.clone(), 279 + }); 395 280 } 396 281 } 397 282 } 398 283 399 - let new_mst_root = match mst.persist().await { 400 - Ok(c) => c, 401 - Err(e) => { 402 - error!("Failed to persist MST: {:?}", e); 403 - return ( 404 - StatusCode::INTERNAL_SERVER_ERROR, 405 - Json(json!({"error": "InternalError"})), 406 - ) 407 - .into_response(); 408 - } 409 - }; 410 - 411 - let did_obj = match Did::new(&did) { 412 - Ok(d) => d, 413 - Err(_) => { 414 - return ( 415 - StatusCode::INTERNAL_SERVER_ERROR, 416 - Json(json!({"error": "InternalError", "message": "Invalid DID"})), 417 - ) 418 - .into_response(); 419 - } 420 - }; 421 - 422 - let rev = Tid::now(LimitedU32::MIN); 423 - let new_commit = Commit::new_unsigned(did_obj, new_mst_root, rev.clone(), Some(current_root_cid)); 424 - 425 - let new_commit_bytes = match new_commit.to_cbor() { 426 - Ok(b) => b, 427 - Err(e) => { 428 - error!("Failed to serialize new commit: {:?}", e); 429 - return ( 430 - StatusCode::INTERNAL_SERVER_ERROR, 431 - Json(json!({"error": "InternalError"})), 432 - ) 433 - .into_response(); 434 - } 435 - }; 284 + let new_mst_root = mst.persist().await.unwrap(); 285 + let written_cids = tracking_store.get_written_cids(); 286 + let written_cids_str = written_cids 287 + .iter() 288 + .map(|c| c.to_string()) 289 + .collect::<Vec<_>>(); 436 290 437 - let new_root_cid = match state.block_store.put(&new_commit_bytes).await { 438 - Ok(c) => c, 291 + let commit_res = match commit_and_log( 292 + &state, 293 + &did, 294 + user_id, 295 + Some(current_root_cid), 296 + new_mst_root, 297 + ops, 298 + &written_cids_str, 299 + ) 300 + .await 301 + { 302 + Ok(res) => res, 439 303 Err(e) => { 440 - error!("Failed to save new commit: {:?}", e); 304 + error!("Commit failed: {}", e); 441 305 return ( 442 306 StatusCode::INTERNAL_SERVER_ERROR, 443 - Json(json!({"error": "InternalError"})), 307 + Json(json!({"error": "InternalError", "message": "Failed to commit changes"})), 444 308 ) 445 309 .into_response(); 446 310 } 447 311 }; 448 312 449 - let update_repo = sqlx::query!("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2", new_root_cid.to_string(), user_id) 450 - .execute(&state.db) 451 - .await; 452 - 453 - if let Err(e) = update_repo { 454 - error!("Failed to update repo root in DB: {:?}", e); 455 - return ( 456 - StatusCode::INTERNAL_SERVER_ERROR, 457 - Json(json!({"error": "InternalError"})), 458 - ) 459 - .into_response(); 460 - } 461 - 462 - for (collection, rkey, record_cid) in record_ops { 463 - match record_cid { 464 - Some(cid) => { 465 - let _ = sqlx::query!( 466 - "INSERT INTO records (repo_id, collection, rkey, record_cid) VALUES ($1, $2, $3, $4) 467 - ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4, created_at = NOW()", 468 - user_id, 469 - collection, 470 - rkey, 471 - cid 472 - ) 473 - .execute(&state.db) 474 - .await; 475 - } 476 - None => { 477 - let _ = sqlx::query!( 478 - "DELETE FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3", 479 - user_id, 480 - collection, 481 - rkey 482 - ) 483 - .execute(&state.db) 484 - .await; 485 - } 486 - } 487 - } 488 - 489 313 ( 490 314 StatusCode::OK, 491 315 Json(ApplyWritesOutput { 492 316 commit: CommitInfo { 493 - cid: new_root_cid.to_string(), 494 - rev: rev.to_string(), 317 + cid: commit_res.commit_cid.to_string(), 318 + rev: commit_res.rev, 495 319 }, 496 320 results, 497 321 }),
+43 -164
src/api/repo/record/delete.rs
··· 1 + use crate::api::repo::record::utils::{commit_and_log, RecordOp}; 2 + use crate::api::repo::record::write::prepare_repo_write; 3 + use crate::repo::tracking::TrackingBlockStore; 1 4 use crate::state::AppState; 2 5 use axum::{ 3 - Json, 4 6 extract::State, 5 - http::StatusCode, 7 + http::{HeaderMap, StatusCode}, 6 8 response::{IntoResponse, Response}, 9 + Json, 7 10 }; 8 11 use cid::Cid; 9 - use jacquard::types::{ 10 - did::Did, 11 - integer::LimitedU32, 12 - string::{Nsid, Tid}, 13 - }; 12 + use jacquard::types::string::Nsid; 14 13 use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 15 14 use serde::Deserialize; 16 15 use serde_json::json; ··· 31 30 32 31 pub async fn delete_record( 33 32 State(state): State<AppState>, 34 - headers: axum::http::HeaderMap, 33 + headers: HeaderMap, 35 34 Json(input): Json<DeleteRecordInput>, 36 35 ) -> Response { 37 - let auth_header = headers.get("Authorization"); 38 - if auth_header.is_none() { 39 - return ( 40 - StatusCode::UNAUTHORIZED, 41 - Json(json!({"error": "AuthenticationRequired"})), 42 - ) 43 - .into_response(); 44 - } 45 - let token = auth_header 46 - .unwrap() 47 - .to_str() 48 - .unwrap_or("") 49 - .replace("Bearer ", ""); 50 - 51 - let session = sqlx::query!( 52 - "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1", 53 - token 54 - ) 55 - .fetch_optional(&state.db) 56 - .await 57 - .unwrap_or(None); 36 + let (did, user_id, current_root_cid) = 37 + match prepare_repo_write(&state, &headers, &input.repo).await { 38 + Ok(res) => res, 39 + Err(err_res) => return err_res, 40 + }; 58 41 59 - let (did, key_bytes) = match session { 60 - Some(row) => ( 61 - row.did, 62 - row.key_bytes, 63 - ), 64 - None => { 42 + if let Some(swap_commit) = &input.swap_commit { 43 + if Cid::from_str(swap_commit).ok() != Some(current_root_cid) { 65 44 return ( 66 - StatusCode::UNAUTHORIZED, 67 - Json(json!({"error": "AuthenticationFailed"})), 45 + StatusCode::CONFLICT, 46 + Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})), 68 47 ) 69 48 .into_response(); 70 49 } 71 - }; 72 - 73 - if let Err(_) = crate::auth::verify_token(&token, &key_bytes) { 74 - return ( 75 - StatusCode::UNAUTHORIZED, 76 - Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})), 77 - ) 78 - .into_response(); 79 - } 80 - 81 - if input.repo != did { 82 - return (StatusCode::FORBIDDEN, Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"}))).into_response(); 83 50 } 84 51 85 - let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 86 - .fetch_optional(&state.db) 87 - .await; 88 - 89 - let user_id: uuid::Uuid = match user_query { 90 - Ok(Some(row)) => row.id, 91 - _ => { 92 - return ( 93 - StatusCode::INTERNAL_SERVER_ERROR, 94 - Json(json!({"error": "InternalError", "message": "User not found"})), 95 - ) 96 - .into_response(); 97 - } 98 - }; 99 - 100 - let repo_root_query = sqlx::query!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id) 101 - .fetch_optional(&state.db) 102 - .await; 103 - 104 - let current_root_cid = match repo_root_query { 105 - Ok(Some(row)) => { 106 - let cid_str: String = row.repo_root_cid; 107 - Cid::from_str(&cid_str).ok() 108 - } 109 - _ => None, 110 - }; 52 + let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 111 53 112 - if current_root_cid.is_none() { 113 - return ( 114 - StatusCode::INTERNAL_SERVER_ERROR, 115 - Json(json!({"error": "InternalError", "message": "Repo root not found"})), 116 - ) 117 - .into_response(); 118 - } 119 - let current_root_cid = current_root_cid.unwrap(); 120 - 121 - let commit_bytes = match state.block_store.get(&current_root_cid).await { 54 + let commit_bytes = match tracking_store.get(&current_root_cid).await { 122 55 Ok(Some(b)) => b, 123 - Ok(None) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Commit block not found"}))).into_response(), 124 - Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": format!("Failed to load commit block: {:?}", e)}))).into_response(), 56 + _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Commit block not found"}))).into_response(), 125 57 }; 126 - 127 58 let commit = match Commit::from_cbor(&commit_bytes) { 128 59 Ok(c) => c, 129 - Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": format!("Failed to parse commit: {:?}", e)}))).into_response(), 60 + _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to parse commit"}))).into_response(), 130 61 }; 131 62 132 - let mst_root = commit.data; 133 - let store = Arc::new(state.block_store.clone()); 134 - let mst = Mst::load(store.clone(), mst_root, None); 135 - 63 + let mst = Mst::load( 64 + Arc::new(tracking_store.clone()), 65 + commit.data, 66 + None, 67 + ); 136 68 let collection_nsid = match input.collection.parse::<Nsid>() { 137 69 Ok(n) => n, 138 - Err(_) => { 139 - return ( 140 - StatusCode::BAD_REQUEST, 141 - Json(json!({"error": "InvalidCollection"})), 142 - ) 143 - .into_response(); 144 - } 70 + Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection"}))).into_response(), 145 71 }; 146 - 147 72 let key = format!("{}/{}", collection_nsid, input.rkey); 148 73 149 - // TODO: Check swapRecord if provided? Skipping for brevity/robustness 74 + if let Some(swap_record_str) = &input.swap_record { 75 + let expected_cid = Cid::from_str(swap_record_str).ok(); 76 + let actual_cid = mst.get(&key).await.ok().flatten(); 77 + if expected_cid != actual_cid { 78 + return (StatusCode::CONFLICT, Json(json!({"error": "InvalidSwap", "message": "Record has been modified or does not exist"}))).into_response(); 79 + } 80 + } 81 + 82 + if mst.get(&key).await.ok().flatten().is_none() { 83 + return (StatusCode::OK, Json(json!({}))).into_response(); 84 + } 150 85 151 86 let new_mst = match mst.delete(&key).await { 152 87 Ok(m) => m, ··· 160 95 Ok(c) => c, 161 96 Err(e) => { 162 97 error!("Failed to persist MST: {:?}", e); 163 - return ( 164 - StatusCode::INTERNAL_SERVER_ERROR, 165 - Json(json!({"error": "InternalError", "message": "Failed to persist MST"})), 166 - ) 167 - .into_response(); 98 + return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to persist MST"}))).into_response(); 168 99 } 169 100 }; 170 101 171 - let did_obj = match Did::new(&did) { 172 - Ok(d) => d, 173 - Err(_) => { 174 - return ( 175 - StatusCode::INTERNAL_SERVER_ERROR, 176 - Json(json!({"error": "InternalError", "message": "Invalid DID"})), 177 - ) 178 - .into_response(); 179 - } 180 - }; 102 + let op = RecordOp::Delete { collection: input.collection, rkey: input.rkey }; 103 + let written_cids = tracking_store.get_written_cids(); 104 + let written_cids_str = written_cids.iter().map(|c| c.to_string()).collect::<Vec<_>>(); 181 105 182 - let rev = Tid::now(LimitedU32::MIN); 183 - 184 - let new_commit = Commit::new_unsigned(did_obj, new_mst_root, rev, Some(current_root_cid)); 185 - 186 - let new_commit_bytes = 187 - match new_commit.to_cbor() { 188 - Ok(b) => b, 189 - Err(_e) => return ( 190 - StatusCode::INTERNAL_SERVER_ERROR, 191 - Json( 192 - json!({"error": "InternalError", "message": "Failed to serialize new commit"}), 193 - ), 194 - ) 195 - .into_response(), 196 - }; 197 - 198 - let new_root_cid = match state.block_store.put(&new_commit_bytes).await { 199 - Ok(c) => c, 200 - Err(_e) => { 201 - return ( 202 - StatusCode::INTERNAL_SERVER_ERROR, 203 - Json(json!({"error": "InternalError", "message": "Failed to save new commit"})), 204 - ) 205 - .into_response(); 206 - } 106 + if let Err(e) = commit_and_log(&state, &did, user_id, Some(current_root_cid), new_mst_root, vec![op], &written_cids_str).await { 107 + return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": e}))).into_response(); 207 108 }; 208 - 209 - let update_repo = sqlx::query!("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2", new_root_cid.to_string(), user_id) 210 - .execute(&state.db) 211 - .await; 212 - 213 - if let Err(e) = update_repo { 214 - error!("Failed to update repo root in DB: {:?}", e); 215 - return ( 216 - StatusCode::INTERNAL_SERVER_ERROR, 217 - Json(json!({"error": "InternalError", "message": "Failed to update repo root in DB"})), 218 - ) 219 - .into_response(); 220 - } 221 - 222 - let record_delete = 223 - sqlx::query!("DELETE FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3", user_id, input.collection, input.rkey) 224 - .execute(&state.db) 225 - .await; 226 - 227 - if let Err(e) = record_delete { 228 - error!("Error deleting record index: {:?}", e); 229 - } 230 109 231 110 (StatusCode::OK, Json(json!({}))).into_response() 232 111 }
+3 -1
src/api/repo/record/mod.rs
··· 1 1 pub mod batch; 2 2 pub mod delete; 3 3 pub mod read; 4 + pub mod utils; 4 5 pub mod write; 5 6 6 7 pub use batch::apply_writes; 7 8 pub use delete::{DeleteRecordInput, delete_record}; 8 9 pub use read::{GetRecordInput, ListRecordsInput, ListRecordsOutput, get_record, list_records}; 10 + pub use utils::*; 9 11 pub use write::{ 10 12 CreateRecordInput, CreateRecordOutput, PutRecordInput, PutRecordOutput, create_record, 11 - put_record, 13 + put_record, prepare_repo_write, 12 14 };
+125
src/api/repo/record/utils.rs
··· 1 + use crate::state::AppState; 2 + use cid::Cid; 3 + use jacquard::types::{did::Did, integer::LimitedU32, string::Tid}; 4 + use jacquard_repo::commit::Commit; 5 + use jacquard_repo::storage::BlockStore; 6 + use serde_json::json; 7 + use uuid::Uuid; 8 + 9 + pub enum RecordOp { 10 + Create { collection: String, rkey: String, cid: Cid }, 11 + Update { collection: String, rkey: String, cid: Cid }, 12 + Delete { collection: String, rkey: String }, 13 + } 14 + 15 + pub struct CommitResult { 16 + pub commit_cid: Cid, 17 + pub rev: String, 18 + } 19 + 20 + pub async fn commit_and_log( 21 + state: &AppState, 22 + did: &str, 23 + user_id: Uuid, 24 + current_root_cid: Option<Cid>, 25 + new_mst_root: Cid, 26 + ops: Vec<RecordOp>, 27 + blocks_cids: &Vec<String>, 28 + ) -> Result<CommitResult, String> { 29 + let did_obj = Did::new(did).map_err(|e| format!("Invalid DID: {}", e))?; 30 + let rev = Tid::now(LimitedU32::MIN); 31 + 32 + let new_commit = Commit::new_unsigned(did_obj, new_mst_root, rev.clone(), current_root_cid); 33 + 34 + let new_commit_bytes = new_commit.to_cbor().map_err(|e| format!("Failed to serialize commit: {:?}", e))?; 35 + 36 + let new_root_cid = state.block_store.put(&new_commit_bytes).await 37 + .map_err(|e| format!("Failed to save commit block: {:?}", e))?; 38 + 39 + sqlx::query!("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2", new_root_cid.to_string(), user_id) 40 + .execute(&state.db) 41 + .await 42 + .map_err(|e| format!("DB Error (repos): {}", e))?; 43 + 44 + for op in &ops { 45 + match op { 46 + RecordOp::Create { collection, rkey, cid } | RecordOp::Update { collection, rkey, cid } => { 47 + sqlx::query!( 48 + "INSERT INTO records (repo_id, collection, rkey, record_cid) VALUES ($1, $2, $3, $4) 49 + ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4, created_at = NOW()", 50 + user_id, 51 + collection, 52 + rkey, 53 + cid.to_string() 54 + ) 55 + .execute(&state.db) 56 + .await 57 + .map_err(|e| format!("DB Error (records): {}", e))?; 58 + } 59 + RecordOp::Delete { collection, rkey } => { 60 + sqlx::query!( 61 + "DELETE FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3", 62 + user_id, 63 + collection, 64 + rkey 65 + ) 66 + .execute(&state.db) 67 + .await 68 + .map_err(|e| format!("DB Error (records): {}", e))?; 69 + } 70 + } 71 + } 72 + 73 + let ops_json = ops.iter().map(|op| { 74 + match op { 75 + RecordOp::Create { collection, rkey, cid } => json!({ 76 + "action": "create", 77 + "path": format!("{}/{}", collection, rkey), 78 + "cid": cid.to_string() 79 + }), 80 + RecordOp::Update { collection, rkey, cid } => json!({ 81 + "action": "update", 82 + "path": format!("{}/{}", collection, rkey), 83 + "cid": cid.to_string() 84 + }), 85 + RecordOp::Delete { collection, rkey } => json!({ 86 + "action": "delete", 87 + "path": format!("{}/{}", collection, rkey), 88 + "cid": null 89 + }), 90 + } 91 + }).collect::<Vec<_>>(); 92 + 93 + let event_type = "commit"; 94 + let prev_cid_str = current_root_cid.map(|c| c.to_string()); 95 + 96 + let seq_row = sqlx::query!( 97 + r#" 98 + INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids) 99 + VALUES ($1, $2, $3, $4, $5, $6, $7) 100 + RETURNING seq 101 + "#, 102 + did, 103 + event_type, 104 + new_root_cid.to_string(), 105 + prev_cid_str, 106 + json!(ops_json), 107 + &[] as &[String], 108 + blocks_cids, 109 + ) 110 + .fetch_one(&state.db) 111 + .await 112 + .map_err(|e| format!("DB Error (repo_seq): {}", e))?; 113 + 114 + sqlx::query( 115 + &format!("NOTIFY repo_updates, '{}'", seq_row.seq) 116 + ) 117 + .execute(&state.db) 118 + .await 119 + .map_err(|e| format!("DB Error (notify): {}", e))?; 120 + 121 + Ok(CommitResult { 122 + commit_cid: new_root_cid, 123 + rev: rev.to_string(), 124 + }) 125 + }
+183 -516
src/api/repo/record/write.rs
··· 1 + use crate::api::repo::record::utils::{commit_and_log, RecordOp}; 2 + use crate::repo::tracking::TrackingBlockStore; 1 3 use crate::state::AppState; 2 4 use axum::{ 3 - Json, 4 5 extract::State, 5 - http::StatusCode, 6 + http::{HeaderMap, StatusCode}, 6 7 response::{IntoResponse, Response}, 8 + Json, 7 9 }; 8 10 use chrono::Utc; 9 11 use cid::Cid; 10 - use jacquard::types::{ 11 - did::Did, 12 - integer::LimitedU32, 13 - string::{Nsid, Tid}, 14 - }; 12 + use jacquard::types::string::Nsid; 15 13 use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 16 14 use serde::{Deserialize, Serialize}; 17 15 use serde_json::json; 18 16 use std::str::FromStr; 19 17 use std::sync::Arc; 20 18 use tracing::error; 19 + use uuid::Uuid; 20 + 21 + pub async fn prepare_repo_write( 22 + state: &AppState, 23 + headers: &HeaderMap, 24 + repo_did: &str, 25 + ) -> Result<(String, Uuid, Cid), Response> { 26 + let auth_header = headers.get("Authorization").ok_or_else(|| { 27 + ( 28 + StatusCode::UNAUTHORIZED, 29 + Json(json!({"error": "AuthenticationRequired"})), 30 + ) 31 + .into_response() 32 + })?; 33 + let token = auth_header 34 + .to_str() 35 + .unwrap_or("") 36 + .replace("Bearer ", ""); 37 + 38 + let session = sqlx::query!( 39 + "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1", 40 + token 41 + ) 42 + .fetch_optional(&state.db) 43 + .await 44 + .map_err(|e| { 45 + error!("DB error fetching session: {}", e); 46 + (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response() 47 + })? 48 + .ok_or_else(|| { 49 + ( 50 + StatusCode::UNAUTHORIZED, 51 + Json(json!({"error": "AuthenticationFailed"})), 52 + ) 53 + .into_response() 54 + })?; 55 + 56 + crate::auth::verify_token(&token, &session.key_bytes).map_err(|_| { 57 + ( 58 + StatusCode::UNAUTHORIZED, 59 + Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})), 60 + ) 61 + .into_response() 62 + })?; 63 + 64 + if repo_did != session.did { 65 + return Err(( 66 + StatusCode::FORBIDDEN, 67 + Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"})), 68 + ) 69 + .into_response()); 70 + } 71 + 72 + let user_id = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", session.did) 73 + .fetch_optional(&state.db) 74 + .await 75 + .map_err(|e| { 76 + error!("DB error fetching user: {}", e); 77 + (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response() 78 + })? 79 + .ok_or_else(|| { 80 + ( 81 + StatusCode::INTERNAL_SERVER_ERROR, 82 + Json(json!({"error": "InternalError", "message": "User not found"})), 83 + ) 84 + .into_response() 85 + })?; 86 + 87 + let root_cid_str: String = 88 + sqlx::query_scalar!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id) 89 + .fetch_optional(&state.db) 90 + .await 91 + .map_err(|e| { 92 + error!("DB error fetching repo root: {}", e); 93 + (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response() 94 + })? 95 + .ok_or_else(|| { 96 + ( 97 + StatusCode::INTERNAL_SERVER_ERROR, 98 + Json(json!({"error": "InternalError", "message": "Repo root not found"})), 99 + ) 100 + .into_response() 101 + })?; 102 + 103 + let current_root_cid = Cid::from_str(&root_cid_str).map_err(|_| { 104 + ( 105 + StatusCode::INTERNAL_SERVER_ERROR, 106 + Json(json!({"error": "InternalError", "message": "Invalid repo root CID"})), 107 + ) 108 + .into_response() 109 + })?; 110 + 111 + Ok((session.did, user_id, current_root_cid)) 112 + } 21 113 22 114 #[derive(Deserialize)] 23 115 #[allow(dead_code)] ··· 40 132 41 133 pub async fn create_record( 42 134 State(state): State<AppState>, 43 - headers: axum::http::HeaderMap, 135 + headers: HeaderMap, 44 136 Json(input): Json<CreateRecordInput>, 45 137 ) -> Response { 46 - let auth_header = headers.get("Authorization"); 47 - if auth_header.is_none() { 48 - return ( 49 - StatusCode::UNAUTHORIZED, 50 - Json(json!({"error": "AuthenticationRequired"})), 51 - ) 52 - .into_response(); 53 - } 54 - let token = auth_header 55 - .unwrap() 56 - .to_str() 57 - .unwrap_or("") 58 - .replace("Bearer ", ""); 138 + let (did, user_id, current_root_cid) = 139 + match prepare_repo_write(&state, &headers, &input.repo).await { 140 + Ok(res) => res, 141 + Err(err_res) => return err_res, 142 + }; 59 143 60 - let session = sqlx::query!( 61 - "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1", 62 - token 63 - ) 64 - .fetch_optional(&state.db) 65 - .await 66 - .unwrap_or(None); 67 - 68 - let (did, key_bytes) = match session { 69 - Some(row) => ( 70 - row.did, 71 - row.key_bytes, 72 - ), 73 - None => { 144 + if let Some(swap_commit) = &input.swap_commit { 145 + if Cid::from_str(swap_commit).ok() != Some(current_root_cid) { 74 146 return ( 75 - StatusCode::UNAUTHORIZED, 76 - Json(json!({"error": "AuthenticationFailed"})), 147 + StatusCode::CONFLICT, 148 + Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"})), 77 149 ) 78 150 .into_response(); 79 151 } 80 - }; 81 - 82 - if let Err(_) = crate::auth::verify_token(&token, &key_bytes) { 83 - return ( 84 - StatusCode::UNAUTHORIZED, 85 - Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})), 86 - ) 87 - .into_response(); 88 152 } 89 153 90 - if input.repo != did { 91 - return (StatusCode::FORBIDDEN, Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"}))).into_response(); 92 - } 93 - 94 - let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 95 - .fetch_optional(&state.db) 96 - .await; 97 - 98 - let user_id: uuid::Uuid = match user_query { 99 - Ok(Some(row)) => row.id, 100 - _ => { 101 - return ( 102 - StatusCode::INTERNAL_SERVER_ERROR, 103 - Json(json!({"error": "InternalError", "message": "User not found"})), 104 - ) 105 - .into_response(); 106 - } 107 - }; 108 - 109 - let repo_root_query = sqlx::query!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id) 110 - .fetch_optional(&state.db) 111 - .await; 112 - 113 - let current_root_cid = match repo_root_query { 114 - Ok(Some(row)) => { 115 - let cid_str: String = row.repo_root_cid; 116 - Cid::from_str(&cid_str).ok() 117 - } 118 - _ => None, 119 - }; 120 - 121 - if current_root_cid.is_none() { 122 - error!("Repo root not found for user {}", did); 123 - return ( 124 - StatusCode::INTERNAL_SERVER_ERROR, 125 - Json(json!({"error": "InternalError", "message": "Repo root not found"})), 126 - ) 127 - .into_response(); 128 - } 129 - let current_root_cid = current_root_cid.unwrap(); 154 + let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 130 155 131 - let commit_bytes = match state.block_store.get(&current_root_cid).await { 156 + let commit_bytes = match tracking_store.get(&current_root_cid).await { 132 157 Ok(Some(b)) => b, 133 - Ok(None) => { 134 - error!("Commit block not found: {}", current_root_cid); 135 - return ( 136 - StatusCode::INTERNAL_SERVER_ERROR, 137 - Json(json!({"error": "InternalError"})), 138 - ) 139 - .into_response(); 140 - } 141 - Err(e) => { 142 - error!("Failed to load commit block: {:?}", e); 143 - return ( 144 - StatusCode::INTERNAL_SERVER_ERROR, 145 - Json(json!({"error": "InternalError"})), 146 - ) 147 - .into_response(); 148 - } 158 + _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Commit block not found"}))).into_response(), 149 159 }; 150 - 151 160 let commit = match Commit::from_cbor(&commit_bytes) { 152 161 Ok(c) => c, 153 - Err(e) => { 154 - error!("Failed to parse commit: {:?}", e); 155 - return ( 156 - StatusCode::INTERNAL_SERVER_ERROR, 157 - Json(json!({"error": "InternalError"})), 158 - ) 159 - .into_response(); 160 - } 162 + _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to parse commit"}))).into_response(), 161 163 }; 162 164 163 - let mst_root = commit.data; 164 - let store = Arc::new(state.block_store.clone()); 165 - let mst = Mst::load(store.clone(), mst_root, None); 165 + let mst = Mst::load( 166 + Arc::new(tracking_store.clone()), 167 + commit.data, 168 + None, 169 + ); 166 170 167 171 let collection_nsid = match input.collection.parse::<Nsid>() { 168 172 Ok(n) => n, 169 - Err(_) => { 170 - return ( 171 - StatusCode::BAD_REQUEST, 172 - Json(json!({"error": "InvalidCollection"})), 173 - ) 174 - .into_response(); 175 - } 173 + Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection"}))).into_response(), 176 174 }; 177 175 178 - let rkey = input 179 - .rkey 180 - .unwrap_or_else(|| Utc::now().format("%Y%m%d%H%M%S%f").to_string()); 181 - 182 176 if input.validate.unwrap_or(true) { 183 177 if input.collection == "app.bsky.feed.post" { 184 178 if input.record.get("text").is_none() || input.record.get("createdAt").is_none() { ··· 191 185 } 192 186 } 193 187 188 + let rkey = input.rkey.unwrap_or_else(|| Utc::now().format("%Y%m%d%H%M%S%f").to_string()); 189 + 194 190 let mut record_bytes = Vec::new(); 195 - if let Err(e) = serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record) { 196 - error!("Error serializing record: {:?}", e); 197 - return ( 198 - StatusCode::BAD_REQUEST, 199 - Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})), 200 - ) 201 - .into_response(); 191 + if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() { 192 + return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response(); 202 193 } 203 - 204 - let record_cid = match state.block_store.put(&record_bytes).await { 194 + let record_cid = match tracking_store.put(&record_bytes).await { 205 195 Ok(c) => c, 206 - Err(e) => { 207 - error!("Failed to save record block: {:?}", e); 208 - return ( 209 - StatusCode::INTERNAL_SERVER_ERROR, 210 - Json(json!({"error": "InternalError"})), 211 - ) 212 - .into_response(); 213 - } 196 + _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to save record block"}))).into_response(), 214 197 }; 215 198 216 199 let key = format!("{}/{}", collection_nsid, rkey); 217 200 let new_mst = match mst.add(&key, record_cid).await { 218 201 Ok(m) => m, 219 - Err(e) => { 220 - error!("Failed to add to MST: {:?}", e); 221 - return ( 222 - StatusCode::INTERNAL_SERVER_ERROR, 223 - Json(json!({"error": "InternalError"})), 224 - ) 225 - .into_response(); 226 - } 202 + _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to add to MST"}))).into_response(), 227 203 }; 228 - 229 204 let new_mst_root = match new_mst.persist().await { 230 205 Ok(c) => c, 231 - Err(e) => { 232 - error!("Failed to persist MST: {:?}", e); 233 - return ( 234 - StatusCode::INTERNAL_SERVER_ERROR, 235 - Json(json!({"error": "InternalError"})), 236 - ) 237 - .into_response(); 238 - } 239 - }; 240 - 241 - let did_obj = match Did::new(&did) { 242 - Ok(d) => d, 243 - Err(_) => { 244 - return ( 245 - StatusCode::INTERNAL_SERVER_ERROR, 246 - Json(json!({"error": "InternalError", "message": "Invalid DID"})), 247 - ) 248 - .into_response(); 249 - } 206 + _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to persist MST"}))).into_response(), 250 207 }; 251 208 252 - let rev = Tid::now(LimitedU32::MIN); 209 + let op = RecordOp::Create { collection: input.collection.clone(), rkey: rkey.clone(), cid: record_cid }; 210 + let written_cids = tracking_store.get_written_cids(); 211 + let written_cids_str = written_cids.iter().map(|c| c.to_string()).collect::<Vec<_>>(); 253 212 254 - let new_commit = Commit::new_unsigned(did_obj, new_mst_root, rev, Some(current_root_cid)); 255 - 256 - let new_commit_bytes = match new_commit.to_cbor() { 257 - Ok(b) => b, 258 - Err(e) => { 259 - error!("Failed to serialize new commit: {:?}", e); 260 - return ( 261 - StatusCode::INTERNAL_SERVER_ERROR, 262 - Json(json!({"error": "InternalError"})), 263 - ) 264 - .into_response(); 265 - } 213 + if let Err(e) = commit_and_log(&state, &did, user_id, Some(current_root_cid), new_mst_root, vec![op], &written_cids_str).await { 214 + return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": e}))).into_response(); 266 215 }; 267 216 268 - let new_root_cid = match state.block_store.put(&new_commit_bytes).await { 269 - Ok(c) => c, 270 - Err(e) => { 271 - error!("Failed to save new commit: {:?}", e); 272 - return ( 273 - StatusCode::INTERNAL_SERVER_ERROR, 274 - Json(json!({"error": "InternalError"})), 275 - ) 276 - .into_response(); 277 - } 278 - }; 279 - 280 - let update_repo = sqlx::query!("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2", new_root_cid.to_string(), user_id) 281 - .execute(&state.db) 282 - .await; 283 - 284 - if let Err(e) = update_repo { 285 - error!("Failed to update repo root in DB: {:?}", e); 286 - return ( 287 - StatusCode::INTERNAL_SERVER_ERROR, 288 - Json(json!({"error": "InternalError"})), 289 - ) 290 - .into_response(); 291 - } 292 - 293 - let record_insert = sqlx::query!( 294 - "INSERT INTO records (repo_id, collection, rkey, record_cid) VALUES ($1, $2, $3, $4) 295 - ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4, created_at = NOW()", 296 - user_id, 297 - input.collection, 298 - rkey, 299 - record_cid.to_string() 300 - ) 301 - .execute(&state.db) 302 - .await; 303 - 304 - if let Err(e) = record_insert { 305 - error!("Error inserting record index: {:?}", e); 306 - return ( 307 - StatusCode::INTERNAL_SERVER_ERROR, 308 - Json(json!({"error": "InternalError", "message": "Failed to index record"})), 309 - ) 310 - .into_response(); 311 - } 312 - 313 - let output = CreateRecordOutput { 314 - uri: format!("at://{}/{}/{}", input.repo, input.collection, rkey), 217 + (StatusCode::OK, Json(CreateRecordOutput { 218 + uri: format!("at://{}/{}/{}", did, input.collection, rkey), 315 219 cid: record_cid.to_string(), 316 - }; 317 - (StatusCode::OK, Json(output)).into_response() 220 + })).into_response() 318 221 } 319 222 320 223 #[derive(Deserialize)] ··· 340 243 341 244 pub async fn put_record( 342 245 State(state): State<AppState>, 343 - headers: axum::http::HeaderMap, 246 + headers: HeaderMap, 344 247 Json(input): Json<PutRecordInput>, 345 248 ) -> Response { 346 - let auth_header = headers.get("Authorization"); 347 - if auth_header.is_none() { 348 - return ( 349 - StatusCode::UNAUTHORIZED, 350 - Json(json!({"error": "AuthenticationRequired"})), 351 - ) 352 - .into_response(); 353 - } 354 - let token = auth_header 355 - .unwrap() 356 - .to_str() 357 - .unwrap_or("") 358 - .replace("Bearer ", ""); 359 - 360 - let session = sqlx::query!( 361 - "SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1", 362 - token 363 - ) 364 - .fetch_optional(&state.db) 365 - .await 366 - .unwrap_or(None); 249 + let (did, user_id, current_root_cid) = 250 + match prepare_repo_write(&state, &headers, &input.repo).await { 251 + Ok(res) => res, 252 + Err(err_res) => return err_res, 253 + }; 367 254 368 - let (did, key_bytes) = match session { 369 - Some(row) => ( 370 - row.did, 371 - row.key_bytes, 372 - ), 373 - None => { 374 - return ( 375 - StatusCode::UNAUTHORIZED, 376 - Json(json!({"error": "AuthenticationFailed"})), 377 - ) 378 - .into_response(); 255 + if let Some(swap_commit) = &input.swap_commit { 256 + if Cid::from_str(swap_commit).ok() != Some(current_root_cid) { 257 + return (StatusCode::CONFLICT, Json(json!({"error": "InvalidSwap", "message": "Repo has been modified"}))).into_response(); 379 258 } 380 - }; 381 - 382 - if let Err(_) = crate::auth::verify_token(&token, &key_bytes) { 383 - return ( 384 - StatusCode::UNAUTHORIZED, 385 - Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})), 386 - ) 387 - .into_response(); 388 259 } 389 260 390 - if input.repo != did { 391 - return (StatusCode::FORBIDDEN, Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"}))).into_response(); 392 - } 393 - 394 - let user_query = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 395 - .fetch_optional(&state.db) 396 - .await; 397 - 398 - let user_id: uuid::Uuid = match user_query { 399 - Ok(Some(row)) => row.id, 400 - _ => { 401 - return ( 402 - StatusCode::INTERNAL_SERVER_ERROR, 403 - Json(json!({"error": "InternalError", "message": "User not found"})), 404 - ) 405 - .into_response(); 406 - } 407 - }; 261 + let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 408 262 409 - let repo_root_query = sqlx::query!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id) 410 - .fetch_optional(&state.db) 411 - .await; 412 - 413 - let current_root_cid = match repo_root_query { 414 - Ok(Some(row)) => { 415 - let cid_str: String = row.repo_root_cid; 416 - Cid::from_str(&cid_str).ok() 417 - } 418 - _ => None, 419 - }; 420 - 421 - if current_root_cid.is_none() { 422 - error!("Repo root not found for user {}", did); 423 - return ( 424 - StatusCode::INTERNAL_SERVER_ERROR, 425 - Json(json!({"error": "InternalError", "message": "Repo root not found"})), 426 - ) 427 - .into_response(); 428 - } 429 - let current_root_cid = current_root_cid.unwrap(); 430 - 431 - let commit_bytes = match state.block_store.get(&current_root_cid).await { 263 + let commit_bytes = match tracking_store.get(&current_root_cid).await { 432 264 Ok(Some(b)) => b, 433 - Ok(None) => { 434 - error!("Commit block not found: {}", current_root_cid); 435 - return ( 436 - StatusCode::INTERNAL_SERVER_ERROR, 437 - Json(json!({"error": "InternalError", "message": "Commit block not found"})), 438 - ) 439 - .into_response(); 440 - } 441 - Err(e) => { 442 - error!("Failed to load commit block: {:?}", e); 443 - return ( 444 - StatusCode::INTERNAL_SERVER_ERROR, 445 - Json(json!({"error": "InternalError", "message": "Failed to load commit block"})), 446 - ) 447 - .into_response(); 448 - } 265 + _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Commit block not found"}))).into_response(), 449 266 }; 450 - 451 267 let commit = match Commit::from_cbor(&commit_bytes) { 452 268 Ok(c) => c, 453 - Err(e) => { 454 - error!("Failed to parse commit: {:?}", e); 455 - return ( 456 - StatusCode::INTERNAL_SERVER_ERROR, 457 - Json(json!({"error": "InternalError", "message": "Failed to parse commit"})), 458 - ) 459 - .into_response(); 460 - } 269 + _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to parse commit"}))).into_response(), 461 270 }; 462 271 463 - let mst_root = commit.data; 464 - let store = Arc::new(state.block_store.clone()); 465 - let mst = Mst::load(store.clone(), mst_root, None); 466 - 272 + let mst = Mst::load( 273 + Arc::new(tracking_store.clone()), 274 + commit.data, 275 + None, 276 + ); 467 277 let collection_nsid = match input.collection.parse::<Nsid>() { 468 278 Ok(n) => n, 469 - Err(_) => { 470 - return ( 471 - StatusCode::BAD_REQUEST, 472 - Json(json!({"error": "InvalidCollection"})), 473 - ) 474 - .into_response(); 475 - } 279 + Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection"}))).into_response(), 476 280 }; 477 - 478 - let rkey = input.rkey.clone(); 281 + let key = format!("{}/{}", collection_nsid, input.rkey); 479 282 480 283 if input.validate.unwrap_or(true) { 481 284 if input.collection == "app.bsky.feed.post" { ··· 489 292 } 490 293 } 491 294 492 - let mut record_bytes = Vec::new(); 493 - if let Err(e) = serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record) { 494 - error!("Error serializing record: {:?}", e); 495 - return ( 496 - StatusCode::BAD_REQUEST, 497 - Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"})), 498 - ) 499 - .into_response(); 500 - } 501 - 502 - let record_cid = match state.block_store.put(&record_bytes).await { 503 - Ok(c) => c, 504 - Err(e) => { 505 - error!("Failed to save record block: {:?}", e); 506 - return ( 507 - StatusCode::INTERNAL_SERVER_ERROR, 508 - Json(json!({"error": "InternalError", "message": "Failed to save record block"})), 509 - ) 510 - .into_response(); 511 - } 512 - }; 513 - 514 - let key = format!("{}/{}", collection_nsid, rkey); 515 - 516 - let existing = match mst.get(&key).await { 517 - Ok(v) => v, 518 - Err(e) => { 519 - error!("Failed to check MST key: {:?}", e); 520 - return ( 521 - StatusCode::INTERNAL_SERVER_ERROR, 522 - Json( 523 - json!({"error": "InternalError", "message": "Failed to check existing record"}), 524 - ), 525 - ) 526 - .into_response(); 527 - } 528 - }; 529 - 530 295 if let Some(swap_record_str) = &input.swap_record { 531 - let swap_record_cid = match Cid::from_str(swap_record_str) { 532 - Ok(c) => c, 533 - Err(_) => { 534 - return ( 535 - StatusCode::BAD_REQUEST, 536 - Json( 537 - json!({"error": "InvalidSwapRecord", "message": "Invalid swapRecord CID"}), 538 - ), 539 - ) 540 - .into_response(); 541 - } 542 - }; 543 - match &existing { 544 - Some(current_cid) if *current_cid != swap_record_cid => { 545 - return ( 546 - StatusCode::CONFLICT, 547 - Json(json!({"error": "InvalidSwap", "message": "Record has been modified"})), 548 - ) 549 - .into_response(); 550 - } 551 - None => { 552 - return ( 553 - StatusCode::CONFLICT, 554 - Json(json!({"error": "InvalidSwap", "message": "Record does not exist"})), 555 - ) 556 - .into_response(); 557 - } 558 - _ => {} 296 + let expected_cid = Cid::from_str(swap_record_str).ok(); 297 + let actual_cid = mst.get(&key).await.ok().flatten(); 298 + if expected_cid != actual_cid { 299 + return (StatusCode::CONFLICT, Json(json!({"error": "InvalidSwap", "message": "Record has been modified or does not exist"}))).into_response(); 559 300 } 560 301 } 561 302 562 - let new_mst = if existing.is_some() { 563 - match mst.update(&key, record_cid).await { 564 - Ok(m) => m, 565 - Err(e) => { 566 - error!("Failed to update MST: {:?}", e); 567 - return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": format!("Failed to update MST: {:?}", e)}))).into_response(); 568 - } 569 - } 570 - } else { 571 - match mst.add(&key, record_cid).await { 572 - Ok(m) => m, 573 - Err(e) => { 574 - error!("Failed to add to MST: {:?}", e); 575 - return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": format!("Failed to add to MST: {:?}", e)}))).into_response(); 576 - } 577 - } 578 - }; 303 + let existing_cid = mst.get(&key).await.ok().flatten(); 579 304 580 - let new_mst_root = match new_mst.persist().await { 305 + let mut record_bytes = Vec::new(); 306 + if serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record).is_err() { 307 + return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response(); 308 + } 309 + let record_cid = match tracking_store.put(&record_bytes).await { 581 310 Ok(c) => c, 582 - Err(e) => { 583 - error!("Failed to persist MST: {:?}", e); 584 - return ( 585 - StatusCode::INTERNAL_SERVER_ERROR, 586 - Json(json!({"error": "InternalError", "message": "Failed to persist MST"})), 587 - ) 588 - .into_response(); 589 - } 590 - }; 591 - 592 - let did_obj = match Did::new(&did) { 593 - Ok(d) => d, 594 - Err(_) => { 595 - return ( 596 - StatusCode::INTERNAL_SERVER_ERROR, 597 - Json(json!({"error": "InternalError", "message": "Invalid DID"})), 598 - ) 599 - .into_response(); 600 - } 311 + _ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to save record block"}))).into_response(), 601 312 }; 602 313 603 - let rev = Tid::now(LimitedU32::MIN); 604 - 605 - let new_commit = Commit::new_unsigned(did_obj, new_mst_root, rev, Some(current_root_cid)); 606 - 607 - let new_commit_bytes = match new_commit.to_cbor() { 608 - Ok(b) => b, 609 - Err(e) => { 610 - error!("Failed to serialize new commit: {:?}", e); 611 - return ( 612 - StatusCode::INTERNAL_SERVER_ERROR, 613 - Json( 614 - json!({"error": "InternalError", "message": "Failed to serialize new commit"}), 615 - ), 616 - ) 617 - .into_response(); 618 - } 314 + let new_mst = if existing_cid.is_some() { 315 + mst.update(&key, record_cid).await.unwrap() 316 + } else { 317 + mst.add(&key, record_cid).await.unwrap() 619 318 }; 319 + let new_mst_root = new_mst.persist().await.unwrap(); 620 320 621 - let new_root_cid = match state.block_store.put(&new_commit_bytes).await { 622 - Ok(c) => c, 623 - Err(e) => { 624 - error!("Failed to save new commit: {:?}", e); 625 - return ( 626 - StatusCode::INTERNAL_SERVER_ERROR, 627 - Json(json!({"error": "InternalError", "message": "Failed to save new commit"})), 628 - ) 629 - .into_response(); 630 - } 321 + let op = if existing_cid.is_some() { 322 + RecordOp::Update { collection: input.collection.clone(), rkey: input.rkey.clone(), cid: record_cid } 323 + } else { 324 + RecordOp::Create { collection: input.collection.clone(), rkey: input.rkey.clone(), cid: record_cid } 631 325 }; 632 326 633 - let update_repo = sqlx::query!("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2", new_root_cid.to_string(), user_id) 634 - .execute(&state.db) 635 - .await; 327 + let written_cids = tracking_store.get_written_cids(); 328 + let written_cids_str = written_cids.iter().map(|c| c.to_string()).collect::<Vec<_>>(); 636 329 637 - if let Err(e) = update_repo { 638 - error!("Failed to update repo root in DB: {:?}", e); 639 - return ( 640 - StatusCode::INTERNAL_SERVER_ERROR, 641 - Json(json!({"error": "InternalError", "message": "Failed to update repo root in DB"})), 642 - ) 643 - .into_response(); 644 - } 330 + if let Err(e) = commit_and_log(&state, &did, user_id, Some(current_root_cid), new_mst_root, vec![op], &written_cids_str).await { 331 + return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": e}))).into_response(); 332 + }; 645 333 646 - let record_insert = sqlx::query!( 647 - "INSERT INTO records (repo_id, collection, rkey, record_cid) VALUES ($1, $2, $3, $4) 648 - ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4, created_at = NOW()", 649 - user_id, 650 - input.collection, 651 - rkey, 652 - record_cid.to_string() 653 - ) 654 - .execute(&state.db) 655 - .await; 656 - 657 - if let Err(e) = record_insert { 658 - error!("Error inserting record index: {:?}", e); 659 - return ( 660 - StatusCode::INTERNAL_SERVER_ERROR, 661 - Json(json!({"error": "InternalError", "message": "Failed to index record"})), 662 - ) 663 - .into_response(); 664 - } 665 - 666 - let output = PutRecordOutput { 667 - uri: format!("at://{}/{}/{}", input.repo, input.collection, rkey), 334 + (StatusCode::OK, Json(PutRecordOutput { 335 + uri: format!("at://{}/{}/{}", did, input.collection, input.rkey), 668 336 cid: record_cid.to_string(), 669 - }; 670 - (StatusCode::OK, Json(output)).into_response() 337 + })).into_response() 671 338 }
+4
src/lib.rs
··· 132 132 get(sync::get_record), 133 133 ) 134 134 .route( 135 + "/xrpc/com.atproto.sync.subscribeRepos", 136 + get(sync::subscribe_repos), 137 + ) 138 + .route( 135 139 "/xrpc/com.atproto.moderation.createReport", 136 140 post(api::moderation::create_report), 137 141 )
+9
src/main.rs
··· 24 24 25 25 let state = AppState::new(pool.clone()).await; 26 26 27 + bspds::sync::listener::start_sequencer_listener(state.clone()).await; 28 + let relays = std::env::var("RELAYS") 29 + .unwrap_or_default() 30 + .split(',') 31 + .filter(|s| !s.is_empty()) 32 + .map(|s| s.to_string()) 33 + .collect(); 34 + bspds::sync::relay_client::start_relay_clients(state.clone(), relays, None).await; 35 + 27 36 let (shutdown_tx, shutdown_rx) = watch::channel(false); 28 37 29 38 let mut notification_service = NotificationService::new(pool);
+2
src/repo/mod.rs
··· 7 7 use sha2::{Digest, Sha256}; 8 8 use sqlx::PgPool; 9 9 10 + pub mod tracking; 11 + 10 12 #[derive(Clone)] 11 13 pub struct PostgresBlockStore { 12 14 pool: PgPool,
+62
src/repo/tracking.rs
··· 1 + use crate::repo::PostgresBlockStore; 2 + use bytes::Bytes; 3 + use cid::Cid; 4 + use jacquard_repo::error::RepoError; 5 + use jacquard_repo::repo::CommitData; 6 + use jacquard_repo::storage::BlockStore; 7 + use std::sync::{Arc, Mutex}; 8 + 9 + #[derive(Clone)] 10 + pub struct TrackingBlockStore { 11 + inner: PostgresBlockStore, 12 + written_cids: Arc<Mutex<Vec<Cid>>>, 13 + } 14 + 15 + impl TrackingBlockStore { 16 + pub fn new(store: PostgresBlockStore) -> Self { 17 + Self { 18 + inner: store, 19 + written_cids: Arc::new(Mutex::new(Vec::new())), 20 + } 21 + } 22 + 23 + pub fn get_written_cids(&self) -> Vec<Cid> { 24 + self.written_cids.lock().unwrap().clone() 25 + } 26 + } 27 + 28 + impl BlockStore for TrackingBlockStore { 29 + async fn get(&self, cid: &Cid) -> Result<Option<Bytes>, RepoError> { 30 + self.inner.get(cid).await 31 + } 32 + 33 + async fn put(&self, data: &[u8]) -> Result<Cid, RepoError> { 34 + let cid = self.inner.put(data).await?; 35 + self.written_cids.lock().unwrap().push(cid.clone()); 36 + Ok(cid) 37 + } 38 + 39 + async fn has(&self, cid: &Cid) -> Result<bool, RepoError> { 40 + self.inner.has(cid).await 41 + } 42 + 43 + async fn put_many( 44 + &self, 45 + blocks: impl IntoIterator<Item = (Cid, Bytes)> + Send, 46 + ) -> Result<(), RepoError> { 47 + let blocks: Vec<_> = blocks.into_iter().collect(); 48 + let cids: Vec<Cid> = blocks.iter().map(|(cid, _)| cid.clone()).collect(); 49 + self.inner.put_many(blocks).await?; 50 + self.written_cids.lock().unwrap().extend(cids); 51 + Ok(()) 52 + } 53 + 54 + async fn get_many(&self, cids: &[Cid]) -> Result<Vec<Option<Bytes>>, RepoError> { 55 + self.inner.get_many(cids).await 56 + } 57 + 58 + async fn apply_commit(&self, commit: CommitData) -> Result<(), RepoError> { 59 + self.put_many(commit.blocks).await?; 60 + Ok(()) 61 + } 62 + }
+5
src/state.rs
··· 1 1 use crate::repo::PostgresBlockStore; 2 2 use crate::storage::{BlobStorage, S3BlobStorage}; 3 + use crate::sync::firehose::SequencedEvent; 3 4 use sqlx::PgPool; 4 5 use std::sync::Arc; 6 + use tokio::sync::broadcast; 5 7 6 8 #[derive(Clone)] 7 9 pub struct AppState { 8 10 pub db: PgPool, 9 11 pub block_store: PostgresBlockStore, 10 12 pub blob_store: Arc<dyn BlobStorage>, 13 + pub firehose_tx: broadcast::Sender<SequencedEvent>, 11 14 } 12 15 13 16 impl AppState { 14 17 pub async fn new(db: PgPool) -> Self { 15 18 let block_store = PostgresBlockStore::new(db.clone()); 16 19 let blob_store = S3BlobStorage::new().await; 20 + let (firehose_tx, _) = broadcast::channel(1000); 17 21 Self { 18 22 db, 19 23 block_store, 20 24 blob_store: Arc::new(blob_store), 25 + firehose_tx, 21 26 } 22 27 } 23 28 }
+16
src/sync/firehose.rs
··· 1 + use serde::{Deserialize, Serialize}; 2 + use serde_json::Value; 3 + use chrono::{DateTime, Utc}; 4 + 5 + #[derive(Debug, Clone, Serialize, Deserialize)] 6 + pub struct SequencedEvent { 7 + pub seq: i64, 8 + pub did: String, 9 + pub created_at: DateTime<Utc>, 10 + pub event_type: String, 11 + pub commit_cid: Option<String>, 12 + pub prev_cid: Option<String>, 13 + pub ops: Option<Value>, 14 + pub blobs: Option<Vec<String>>, 15 + pub blocks_cids: Option<Vec<String>>, 16 + }
+59
src/sync/frame.rs
··· 1 + use serde::{Deserialize, Serialize}; 2 + use crate::sync::firehose::SequencedEvent; 3 + 4 + #[derive(Debug, Serialize, Deserialize)] 5 + pub struct Frame { 6 + #[serde(rename = "op")] 7 + pub op: i64, 8 + #[serde(rename = "d")] 9 + pub data: FrameData, 10 + } 11 + 12 + #[derive(Debug, Serialize, Deserialize)] 13 + #[serde(untagged)] 14 + pub enum FrameData { 15 + Commit(Box<CommitFrame>), 16 + } 17 + 18 + #[derive(Debug, Serialize, Deserialize)] 19 + pub struct CommitFrame { 20 + pub seq: i64, 21 + pub rebase: bool, 22 + #[serde(rename = "tooBig")] 23 + pub too_big: bool, 24 + pub repo: String, 25 + pub commit: String, 26 + pub prev: Option<String>, 27 + #[serde(with = "serde_bytes")] 28 + pub blocks: Vec<u8>, 29 + pub ops: Vec<RepoOp>, 30 + pub blobs: Vec<String>, 31 + pub time: String, 32 + } 33 + 34 + #[derive(Debug, Serialize, Deserialize)] 35 + pub struct RepoOp { 36 + pub action: String, 37 + pub path: String, 38 + pub cid: Option<String>, 39 + } 40 + 41 + impl From<SequencedEvent> for CommitFrame { 42 + fn from(event: SequencedEvent) -> Self { 43 + let ops = serde_json::from_value::<Vec<RepoOp>>(event.ops.unwrap_or_default()) 44 + .unwrap_or_else(|_| vec![]); 45 + 46 + CommitFrame { 47 + seq: event.seq, 48 + rebase: false, 49 + too_big: false, 50 + repo: event.did, 51 + commit: event.commit_cid.unwrap_or_default(), 52 + prev: event.prev_cid, 53 + blocks: Vec::new(), 54 + ops, 55 + blobs: event.blobs.unwrap_or_default(), 56 + time: event.created_at.to_rfc3339(), 57 + } 58 + } 59 + }
+53
src/sync/listener.rs
··· 1 + use crate::state::AppState; 2 + use crate::sync::firehose::SequencedEvent; 3 + use sqlx::postgres::PgListener; 4 + use tracing::{error, info, warn}; 5 + 6 + pub async fn start_sequencer_listener(state: AppState) { 7 + tokio::spawn(async move { 8 + info!("Starting sequencer listener background task"); 9 + loop { 10 + if let Err(e) = listen_loop(state.clone()).await { 11 + error!("Sequencer listener failed: {}. Restarting in 5s...", e); 12 + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; 13 + } 14 + } 15 + }); 16 + } 17 + 18 + async fn listen_loop(state: AppState) -> anyhow::Result<()> { 19 + let mut listener = PgListener::connect_with(&state.db).await?; 20 + listener.listen("repo_updates").await?; 21 + info!("Connected to Postgres and listening for 'repo_updates'"); 22 + 23 + loop { 24 + let notification = listener.recv().await?; 25 + let payload = notification.payload(); 26 + 27 + let seq_id: i64 = match payload.parse() { 28 + Ok(id) => id, 29 + Err(e) => { 30 + warn!("Received invalid payload in repo_updates: '{}'. Error: {}", payload, e); 31 + continue; 32 + } 33 + }; 34 + 35 + let event = sqlx::query_as!( 36 + SequencedEvent, 37 + r#" 38 + SELECT seq, did, created_at, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids 39 + FROM repo_seq 40 + WHERE seq = $1 41 + "#, 42 + seq_id 43 + ) 44 + .fetch_optional(&state.db) 45 + .await?; 46 + 47 + if let Some(event) = event { 48 + let _ = state.firehose_tx.send(event); 49 + } else { 50 + warn!("Received notification for seq {} but could not find row in repo_seq", seq_id); 51 + } 52 + } 53 + }
+8 -1
src/sync/mod.rs
··· 2 2 pub mod car; 3 3 pub mod commit; 4 4 pub mod crawl; 5 + pub mod firehose; 6 + pub mod frame; 7 + pub mod listener; 8 + pub mod relay_client; 5 9 pub mod repo; 10 + pub mod subscribe_repos; 11 + pub mod util; 6 12 7 13 pub use blob::{get_blob, list_blobs}; 8 14 pub use commit::{get_latest_commit, get_repo_status, list_repos}; 9 15 pub use crawl::{notify_of_update, request_crawl}; 10 - pub use repo::{get_blocks, get_record, get_repo}; 16 + pub use repo::{get_blocks, get_repo, get_record}; 17 + pub use subscribe_repos::subscribe_repos;
+84
src/sync/relay_client.rs
··· 1 + use crate::state::AppState; 2 + use crate::sync::util::format_event_for_sending; 3 + use futures::{sink::SinkExt, stream::StreamExt}; 4 + use std::time::Duration; 5 + use tokio::sync::mpsc; 6 + use tokio_tungstenite::{connect_async, tungstenite::Message}; 7 + use tracing::{error, info, warn}; 8 + 9 + async fn run_relay_client(state: AppState, url: String, ready_tx: Option<mpsc::Sender<()>>) { 10 + info!("Starting firehose client for relay: {}", url); 11 + loop { 12 + match connect_async(&url).await { 13 + Ok((mut ws_stream, _)) => { 14 + info!("Connected to firehose relay: {}", url); 15 + if let Some(tx) = ready_tx.as_ref() { 16 + tx.send(()).await.ok(); 17 + } 18 + 19 + let mut rx = state.firehose_tx.subscribe(); 20 + 21 + loop { 22 + tokio::select! { 23 + Ok(event) = rx.recv() => { 24 + match format_event_for_sending(&state, event).await { 25 + Ok(bytes) => { 26 + if let Err(e) = ws_stream.send(Message::Binary(bytes.into())).await { 27 + warn!("Failed to send event to {}: {}. Disconnecting.", url, e); 28 + break; 29 + } 30 + } 31 + Err(e) => { 32 + error!("Failed to format event for relay {}: {}", url, e); 33 + } 34 + } 35 + } 36 + Some(msg) = ws_stream.next() => { 37 + if let Ok(Message::Close(_)) = msg { 38 + warn!("Relay {} closed connection.", url); 39 + break; 40 + } 41 + } 42 + else => break, 43 + } 44 + } 45 + } 46 + Err(e) => { 47 + error!("Failed to connect to firehose relay {}: {}", url, e); 48 + } 49 + } 50 + warn!( 51 + "Disconnected from {}. Reconnecting in 5 seconds...", 52 + url 53 + ); 54 + tokio::time::sleep(Duration::from_secs(5)).await; 55 + } 56 + } 57 + 58 + pub async fn start_relay_clients( 59 + state: AppState, 60 + relays: Vec<String>, 61 + mut ready_rx: Option<mpsc::Receiver<()>>, 62 + ) { 63 + if relays.is_empty() { 64 + return; 65 + } 66 + 67 + let (ready_tx, mut internal_ready_rx) = mpsc::channel(1); 68 + 69 + for url in relays { 70 + let ready_tx = if ready_rx.is_some() { 71 + Some(ready_tx.clone()) 72 + } else { 73 + None 74 + }; 75 + tokio::spawn(run_relay_client(state.clone(), url, ready_tx)); 76 + } 77 + 78 + if let Some(mut rx) = ready_rx.take() { 79 + tokio::spawn(async move { 80 + internal_ready_rx.recv().await; 81 + rx.close(); 82 + }); 83 + } 84 + }
+172 -444
src/sync/repo.rs
··· 1 1 use crate::state::AppState; 2 - use crate::sync::car::{encode_car_header, ld_write}; 2 + use crate::sync::car::encode_car_header; 3 3 use axum::{ 4 - Json, 5 - body::Body, 6 4 extract::{Query, State}, 7 5 http::StatusCode, 8 - http::header, 9 6 response::{IntoResponse, Response}, 7 + Json, 10 8 }; 11 - use bytes::Bytes; 12 9 use cid::Cid; 13 - use jacquard_repo::{commit::Commit, storage::BlockStore}; 10 + use jacquard_repo::storage::BlockStore; 14 11 use serde::Deserialize; 15 12 use serde_json::json; 16 - use std::collections::HashSet; 13 + use std::io::Write; 14 + use std::str::FromStr; 17 15 use tracing::error; 18 16 19 17 #[derive(Deserialize)] 20 - pub struct GetBlocksParams { 18 + pub struct GetBlocksQuery { 21 19 pub did: String, 22 20 pub cids: String, 23 21 } 24 22 25 23 pub async fn get_blocks( 26 24 State(state): State<AppState>, 27 - Query(params): Query<GetBlocksParams>, 25 + Query(query): Query<GetBlocksQuery>, 28 26 ) -> Response { 29 - let did = params.did.trim(); 27 + let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", query.did) 28 + .fetch_optional(&state.db) 29 + .await 30 + .unwrap_or(None); 30 31 31 - if did.is_empty() { 32 - return ( 33 - StatusCode::BAD_REQUEST, 34 - Json(json!({"error": "InvalidRequest", "message": "did is required"})), 35 - ) 36 - .into_response(); 32 + if user_exists.is_none() { 33 + return (StatusCode::NOT_FOUND, "Repo not found").into_response(); 37 34 } 38 35 39 - let cid_strings: Vec<&str> = params.cids.split(',').map(|s| s.trim()).filter(|s| !s.is_empty()).collect(); 40 - 41 - if cid_strings.is_empty() { 42 - return ( 43 - StatusCode::BAD_REQUEST, 44 - Json(json!({"error": "InvalidRequest", "message": "cids is required"})), 45 - ) 46 - .into_response(); 36 + let cids_str: Vec<&str> = query.cids.split(',').collect(); 37 + let mut cids = Vec::new(); 38 + for s in cids_str { 39 + match Cid::from_str(s) { 40 + Ok(cid) => cids.push(cid), 41 + Err(_) => return (StatusCode::BAD_REQUEST, "Invalid CID").into_response(), 42 + } 47 43 } 48 44 49 - let repo_result = sqlx::query!( 50 - r#" 51 - SELECT r.repo_root_cid 52 - FROM repos r 53 - JOIN users u ON r.user_id = u.id 54 - WHERE u.did = $1 55 - "#, 56 - did 57 - ) 58 - .fetch_optional(&state.db) 59 - .await; 60 - 61 - let repo_root_cid_str = match repo_result { 62 - Ok(Some(row)) => row.repo_root_cid, 63 - Ok(None) => { 64 - return ( 65 - StatusCode::NOT_FOUND, 66 - Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), 67 - ) 68 - .into_response(); 69 - } 45 + let blocks_res = state.block_store.get_many(&cids).await; 46 + let blocks = match blocks_res { 47 + Ok(blocks) => blocks, 70 48 Err(e) => { 71 - error!("DB error in get_blocks: {:?}", e); 72 - return ( 73 - StatusCode::INTERNAL_SERVER_ERROR, 74 - Json(json!({"error": "InternalError"})), 75 - ) 76 - .into_response(); 49 + error!("Failed to get blocks: {}", e); 50 + return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to get blocks").into_response(); 77 51 } 78 52 }; 79 53 80 - let root_cid = match repo_root_cid_str.parse::<Cid>() { 81 - Ok(c) => c, 82 - Err(e) => { 83 - error!("Failed to parse root CID: {:?}", e); 84 - return ( 85 - StatusCode::INTERNAL_SERVER_ERROR, 86 - Json(json!({"error": "InternalError"})), 87 - ) 88 - .into_response(); 89 - } 90 - }; 54 + let root_cid = cids.first().cloned().unwrap_or_default(); 91 55 92 - let mut requested_cids: Vec<Cid> = Vec::new(); 93 - for cid_str in &cid_strings { 94 - match cid_str.parse::<Cid>() { 95 - Ok(c) => requested_cids.push(c), 96 - Err(e) => { 97 - error!("Failed to parse CID '{}': {:?}", cid_str, e); 98 - return ( 99 - StatusCode::BAD_REQUEST, 100 - Json(json!({"error": "InvalidRequest", "message": format!("Invalid CID: {}", cid_str)})), 101 - ) 102 - .into_response(); 103 - } 104 - } 56 + if cids.is_empty() { 57 + return (StatusCode::BAD_REQUEST, "No CIDs provided").into_response(); 105 58 } 106 59 107 - let mut buf = Vec::new(); 108 - let car_header = encode_car_header(&root_cid); 109 - if let Err(e) = ld_write(&mut buf, &car_header) { 110 - error!("Failed to write CAR header: {:?}", e); 111 - return ( 112 - StatusCode::INTERNAL_SERVER_ERROR, 113 - Json(json!({"error": "InternalError"})), 114 - ) 115 - .into_response(); 116 - } 60 + let header = encode_car_header(&root_cid); 117 61 118 - for cid in &requested_cids { 119 - let cid_bytes = cid.to_bytes(); 120 - let block_result = sqlx::query!( 121 - "SELECT data FROM blocks WHERE cid = $1", 122 - &cid_bytes 123 - ) 124 - .fetch_optional(&state.db) 125 - .await; 62 + let mut car_bytes = header; 126 63 127 - match block_result { 128 - Ok(Some(row)) => { 129 - let mut block_data = Vec::new(); 130 - block_data.extend_from_slice(&cid_bytes); 131 - block_data.extend_from_slice(&row.data); 132 - if let Err(e) = ld_write(&mut buf, &block_data) { 133 - error!("Failed to write block: {:?}", e); 134 - return ( 135 - StatusCode::INTERNAL_SERVER_ERROR, 136 - Json(json!({"error": "InternalError"})), 137 - ) 138 - .into_response(); 139 - } 140 - } 141 - Ok(None) => { 142 - return ( 143 - StatusCode::NOT_FOUND, 144 - Json(json!({"error": "BlockNotFound", "message": format!("Block not found: {}", cid)})), 145 - ) 146 - .into_response(); 147 - } 148 - Err(e) => { 149 - error!("DB error fetching block: {:?}", e); 150 - return ( 151 - StatusCode::INTERNAL_SERVER_ERROR, 152 - Json(json!({"error": "InternalError"})), 153 - ) 154 - .into_response(); 155 - } 64 + for (i, block_opt) in blocks.into_iter().enumerate() { 65 + if let Some(block) = block_opt { 66 + let cid = cids[i]; 67 + let cid_bytes = cid.to_bytes(); 68 + let total_len = cid_bytes.len() + block.len(); 69 + 70 + let mut writer = Vec::new(); 71 + crate::sync::car::write_varint(&mut writer, total_len as u64).unwrap(); 72 + writer.write_all(&cid_bytes).unwrap(); 73 + writer.write_all(&block).unwrap(); 74 + 75 + car_bytes.extend_from_slice(&writer); 156 76 } 157 77 } 158 78 159 - Response::builder() 160 - .status(StatusCode::OK) 161 - .header(header::CONTENT_TYPE, "application/vnd.ipld.car") 162 - .body(Body::from(buf)) 163 - .unwrap() 79 + ( 80 + StatusCode::OK, 81 + [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 82 + car_bytes, 83 + ) 84 + .into_response() 164 85 } 165 86 166 87 #[derive(Deserialize)] 167 - pub struct GetRepoParams { 88 + pub struct GetRepoQuery { 168 89 pub did: String, 169 90 pub since: Option<String>, 170 91 } 171 92 172 93 pub async fn get_repo( 173 94 State(state): State<AppState>, 174 - Query(params): Query<GetRepoParams>, 95 + Query(query): Query<GetRepoQuery>, 175 96 ) -> Response { 176 - let did = params.did.trim(); 97 + let repo_row = sqlx::query!( 98 + r#" 99 + SELECT r.repo_root_cid 100 + FROM repos r 101 + JOIN users u ON u.id = r.user_id 102 + WHERE u.did = $1 103 + "#, 104 + query.did 105 + ) 106 + .fetch_optional(&state.db) 107 + .await 108 + .unwrap_or(None); 177 109 178 - if did.is_empty() { 179 - return ( 180 - StatusCode::BAD_REQUEST, 181 - Json(json!({"error": "InvalidRequest", "message": "did is required"})), 182 - ) 183 - .into_response(); 184 - } 185 - 186 - let user_result = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 187 - .fetch_optional(&state.db) 188 - .await; 189 - 190 - let user_id = match user_result { 191 - Ok(Some(row)) => row.id, 192 - Ok(None) => { 193 - return ( 194 - StatusCode::NOT_FOUND, 195 - Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), 196 - ) 197 - .into_response(); 198 - } 199 - Err(e) => { 200 - error!("DB error in get_repo: {:?}", e); 201 - return ( 202 - StatusCode::INTERNAL_SERVER_ERROR, 203 - Json(json!({"error": "InternalError"})), 204 - ) 205 - .into_response(); 206 - } 207 - }; 208 - 209 - let repo_result = sqlx::query!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id) 210 - .fetch_optional(&state.db) 211 - .await; 110 + let head_str = match repo_row { 111 + Some(r) => r.repo_root_cid, 112 + None => { 113 + let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", query.did) 114 + .fetch_optional(&state.db) 115 + .await 116 + .unwrap_or(None); 212 117 213 - let repo_root_cid_str = match repo_result { 214 - Ok(Some(row)) => row.repo_root_cid, 215 - Ok(None) => { 216 - return ( 217 - StatusCode::NOT_FOUND, 218 - Json(json!({"error": "RepoNotFound", "message": "Repository not initialized"})), 219 - ) 220 - .into_response(); 221 - } 222 - Err(e) => { 223 - error!("DB error in get_repo: {:?}", e); 224 - return ( 225 - StatusCode::INTERNAL_SERVER_ERROR, 226 - Json(json!({"error": "InternalError"})), 227 - ) 228 - .into_response(); 118 + if user_exists.is_none() { 119 + return ( 120 + StatusCode::NOT_FOUND, 121 + Json(json!({"error": "RepoNotFound", "message": "Repo not found"})), 122 + ) 123 + .into_response(); 124 + } else { 125 + return ( 126 + StatusCode::NOT_FOUND, 127 + Json(json!({"error": "RepoNotFound", "message": "Repo not initialized"})), 128 + ) 129 + .into_response(); 130 + } 229 131 } 230 132 }; 231 133 232 - let root_cid = match repo_root_cid_str.parse::<Cid>() { 134 + let head_cid = match Cid::from_str(&head_str) { 233 135 Ok(c) => c, 234 - Err(e) => { 235 - error!("Failed to parse root CID: {:?}", e); 136 + Err(_) => { 236 137 return ( 237 138 StatusCode::INTERNAL_SERVER_ERROR, 238 - Json(json!({"error": "InternalError"})), 139 + Json(json!({"error": "InternalError", "message": "Invalid head CID"})), 239 140 ) 240 141 .into_response(); 241 142 } 242 143 }; 243 144 244 - let commit_bytes = match state.block_store.get(&root_cid).await { 245 - Ok(Some(b)) => b, 246 - Ok(None) => { 247 - error!("Commit block not found: {}", root_cid); 248 - return ( 249 - StatusCode::INTERNAL_SERVER_ERROR, 250 - Json(json!({"error": "InternalError"})), 251 - ) 252 - .into_response(); 253 - } 254 - Err(e) => { 255 - error!("Failed to load commit block: {:?}", e); 256 - return ( 257 - StatusCode::INTERNAL_SERVER_ERROR, 258 - Json(json!({"error": "InternalError"})), 259 - ) 260 - .into_response(); 261 - } 262 - }; 145 + let mut car_bytes = encode_car_header(&head_cid); 263 146 264 - let commit = match Commit::from_cbor(&commit_bytes) { 265 - Ok(c) => c, 266 - Err(e) => { 267 - error!("Failed to parse commit: {:?}", e); 268 - return ( 269 - StatusCode::INTERNAL_SERVER_ERROR, 270 - Json(json!({"error": "InternalError"})), 271 - ) 272 - .into_response(); 273 - } 274 - }; 147 + let mut stack = vec![head_cid]; 148 + let mut visited = std::collections::HashSet::new(); 149 + let mut limit = 20000; 275 150 276 - let mut collected_blocks: Vec<(Cid, Bytes)> = Vec::new(); 277 - let mut visited: HashSet<Vec<u8>> = HashSet::new(); 151 + while let Some(cid) = stack.pop() { 152 + if visited.contains(&cid) { 153 + continue; 154 + } 155 + visited.insert(cid); 156 + if limit == 0 { break; } 157 + limit -= 1; 278 158 279 - collected_blocks.push((root_cid, commit_bytes.clone())); 280 - visited.insert(root_cid.to_bytes()); 159 + if let Ok(Some(block)) = state.block_store.get(&cid).await { 160 + let cid_bytes = cid.to_bytes(); 161 + let total_len = cid_bytes.len() + block.len(); 162 + let mut writer = Vec::new(); 163 + crate::sync::car::write_varint(&mut writer, total_len as u64).unwrap(); 164 + writer.write_all(&cid_bytes).unwrap(); 165 + writer.write_all(&block).unwrap(); 166 + car_bytes.extend_from_slice(&writer); 281 167 282 - let mst_root_cid = commit.data; 283 - if !visited.contains(&mst_root_cid.to_bytes()) { 284 - visited.insert(mst_root_cid.to_bytes()); 285 - if let Ok(Some(data)) = state.block_store.get(&mst_root_cid).await { 286 - collected_blocks.push((mst_root_cid, data)); 168 + if let Ok(value) = serde_ipld_dagcbor::from_slice::<serde_json::Value>(&block) { 169 + extract_links_json(&value, &mut stack); 170 + } 287 171 } 288 172 } 289 173 290 - let records = sqlx::query!("SELECT record_cid FROM records WHERE repo_id = $1", user_id) 291 - .fetch_all(&state.db) 292 - .await 293 - .unwrap_or_default(); 174 + ( 175 + StatusCode::OK, 176 + [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 177 + car_bytes, 178 + ) 179 + .into_response() 180 + } 294 181 295 - for record in records { 296 - if let Ok(cid) = record.record_cid.parse::<Cid>() { 297 - if !visited.contains(&cid.to_bytes()) { 298 - visited.insert(cid.to_bytes()); 299 - if let Ok(Some(data)) = state.block_store.get(&cid).await { 300 - collected_blocks.push((cid, data)); 182 + fn extract_links_json(value: &serde_json::Value, stack: &mut Vec<Cid>) { 183 + match value { 184 + serde_json::Value::Object(map) => { 185 + if let Some(serde_json::Value::String(s)) = map.get("/") { 186 + if let Ok(cid) = Cid::from_str(s) { 187 + stack.push(cid); 188 + } 189 + } else if let Some(serde_json::Value::String(s)) = map.get("$link") { 190 + if let Ok(cid) = Cid::from_str(s) { 191 + stack.push(cid); 192 + } 193 + } else { 194 + for v in map.values() { 195 + extract_links_json(v, stack); 301 196 } 302 197 } 303 198 } 304 - } 305 - 306 - let mut buf = Vec::new(); 307 - let car_header = encode_car_header(&root_cid); 308 - if let Err(e) = ld_write(&mut buf, &car_header) { 309 - error!("Failed to write CAR header: {:?}", e); 310 - return ( 311 - StatusCode::INTERNAL_SERVER_ERROR, 312 - Json(json!({"error": "InternalError"})), 313 - ) 314 - .into_response(); 315 - } 316 - 317 - for (cid, data) in &collected_blocks { 318 - let mut block_data = Vec::new(); 319 - block_data.extend_from_slice(&cid.to_bytes()); 320 - block_data.extend_from_slice(data); 321 - if let Err(e) = ld_write(&mut buf, &block_data) { 322 - error!("Failed to write block: {:?}", e); 323 - return ( 324 - StatusCode::INTERNAL_SERVER_ERROR, 325 - Json(json!({"error": "InternalError"})), 326 - ) 327 - .into_response(); 199 + serde_json::Value::Array(arr) => { 200 + for v in arr { 201 + extract_links_json(v, stack); 202 + } 328 203 } 204 + _ => {} 329 205 } 330 - 331 - Response::builder() 332 - .status(StatusCode::OK) 333 - .header(header::CONTENT_TYPE, "application/vnd.ipld.car") 334 - .body(Body::from(buf)) 335 - .unwrap() 336 206 } 337 207 338 208 #[derive(Deserialize)] 339 - pub struct GetRecordParams { 209 + pub struct GetRecordQuery { 340 210 pub did: String, 341 211 pub collection: String, 342 212 pub rkey: String, ··· 344 214 345 215 pub async fn get_record( 346 216 State(state): State<AppState>, 347 - Query(params): Query<GetRecordParams>, 217 + Query(query): Query<GetRecordQuery>, 348 218 ) -> Response { 349 - let did = params.did.trim(); 350 - let collection = params.collection.trim(); 351 - let rkey = params.rkey.trim(); 352 - 353 - if did.is_empty() { 354 - return ( 355 - StatusCode::BAD_REQUEST, 356 - Json(json!({"error": "InvalidRequest", "message": "did is required"})), 357 - ) 358 - .into_response(); 359 - } 360 - 361 - if collection.is_empty() { 362 - return ( 363 - StatusCode::BAD_REQUEST, 364 - Json(json!({"error": "InvalidRequest", "message": "collection is required"})), 365 - ) 366 - .into_response(); 367 - } 368 - 369 - if rkey.is_empty() { 370 - return ( 371 - StatusCode::BAD_REQUEST, 372 - Json(json!({"error": "InvalidRequest", "message": "rkey is required"})), 373 - ) 374 - .into_response(); 375 - } 376 - 377 - let user_result = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 219 + let user = sqlx::query!("SELECT id FROM users WHERE did = $1", query.did) 378 220 .fetch_optional(&state.db) 379 - .await; 221 + .await 222 + .unwrap_or(None); 380 223 381 - let user_id = match user_result { 382 - Ok(Some(row)) => row.id, 383 - Ok(None) => { 384 - return ( 224 + let user_id = match user { 225 + Some(u) => u.id, 226 + None => { 227 + return ( 385 228 StatusCode::NOT_FOUND, 386 - Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), 387 - ) 388 - .into_response(); 389 - } 390 - Err(e) => { 391 - error!("DB error in sync get_record: {:?}", e); 392 - return ( 393 - StatusCode::INTERNAL_SERVER_ERROR, 394 - Json(json!({"error": "InternalError"})), 229 + Json(json!({"error": "RepoNotFound", "message": "Repo not found"})), 395 230 ) 396 231 .into_response(); 397 232 } 398 233 }; 399 234 400 - let record_result = sqlx::query!( 235 + let record = sqlx::query!( 401 236 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3", 402 237 user_id, 403 - collection, 404 - rkey 238 + query.collection, 239 + query.rkey 405 240 ) 406 241 .fetch_optional(&state.db) 407 - .await; 242 + .await 243 + .unwrap_or(None); 408 244 409 - let record_cid_str = match record_result { 410 - Ok(Some(row)) => row.record_cid, 411 - Ok(None) => { 412 - return ( 245 + let record_cid_str = match record { 246 + Some(r) => r.record_cid, 247 + None => { 248 + return ( 413 249 StatusCode::NOT_FOUND, 414 250 Json(json!({"error": "RecordNotFound", "message": "Record not found"})), 415 251 ) 416 252 .into_response(); 417 253 } 418 - Err(e) => { 419 - error!("DB error in sync get_record: {:?}", e); 420 - return ( 421 - StatusCode::INTERNAL_SERVER_ERROR, 422 - Json(json!({"error": "InternalError"})), 423 - ) 424 - .into_response(); 425 - } 426 254 }; 427 255 428 - let record_cid = match record_cid_str.parse::<Cid>() { 256 + let cid = match Cid::from_str(&record_cid_str) { 429 257 Ok(c) => c, 430 - Err(e) => { 431 - error!("Failed to parse record CID: {:?}", e); 432 - return ( 433 - StatusCode::INTERNAL_SERVER_ERROR, 434 - Json(json!({"error": "InternalError"})), 435 - ) 436 - .into_response(); 437 - } 258 + Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, "Invalid CID").into_response(), 438 259 }; 439 260 440 - let repo_result = sqlx::query!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id) 441 - .fetch_optional(&state.db) 442 - .await; 443 - 444 - let repo_root_cid_str = match repo_result { 445 - Ok(Some(row)) => row.repo_root_cid, 446 - Ok(None) => { 447 - return ( 448 - StatusCode::NOT_FOUND, 449 - Json(json!({"error": "RepoNotFound", "message": "Repository not initialized"})), 450 - ) 451 - .into_response(); 452 - } 453 - Err(e) => { 454 - error!("DB error in sync get_record: {:?}", e); 455 - return ( 456 - StatusCode::INTERNAL_SERVER_ERROR, 457 - Json(json!({"error": "InternalError"})), 458 - ) 459 - .into_response(); 460 - } 461 - }; 462 - 463 - let root_cid = match repo_root_cid_str.parse::<Cid>() { 464 - Ok(c) => c, 465 - Err(e) => { 466 - error!("Failed to parse root CID: {:?}", e); 467 - return ( 468 - StatusCode::INTERNAL_SERVER_ERROR, 469 - Json(json!({"error": "InternalError"})), 470 - ) 471 - .into_response(); 472 - } 473 - }; 474 - 475 - let mut collected_blocks: Vec<(Cid, Bytes)> = Vec::new(); 476 - 477 - let commit_bytes = match state.block_store.get(&root_cid).await { 261 + let block_res = state.block_store.get(&cid).await; 262 + let block = match block_res { 478 263 Ok(Some(b)) => b, 479 - Ok(None) => { 480 - error!("Commit block not found: {}", root_cid); 481 - return ( 482 - StatusCode::INTERNAL_SERVER_ERROR, 483 - Json(json!({"error": "InternalError"})), 484 - ) 485 - .into_response(); 486 - } 487 - Err(e) => { 488 - error!("Failed to load commit block: {:?}", e); 489 - return ( 490 - StatusCode::INTERNAL_SERVER_ERROR, 491 - Json(json!({"error": "InternalError"})), 492 - ) 493 - .into_response(); 494 - } 264 + _ => return (StatusCode::NOT_FOUND, "Block not found").into_response(), 495 265 }; 496 266 497 - collected_blocks.push((root_cid, commit_bytes.clone())); 267 + let header = encode_car_header(&cid); 268 + let mut car_bytes = header; 498 269 499 - let commit = match Commit::from_cbor(&commit_bytes) { 500 - Ok(c) => c, 501 - Err(e) => { 502 - error!("Failed to parse commit: {:?}", e); 503 - return ( 504 - StatusCode::INTERNAL_SERVER_ERROR, 505 - Json(json!({"error": "InternalError"})), 506 - ) 507 - .into_response(); 508 - } 509 - }; 270 + let cid_bytes = cid.to_bytes(); 271 + let total_len = cid_bytes.len() + block.len(); 272 + let mut writer = Vec::new(); 273 + crate::sync::car::write_varint(&mut writer, total_len as u64).unwrap(); 274 + writer.write_all(&cid_bytes).unwrap(); 275 + writer.write_all(&block).unwrap(); 276 + car_bytes.extend_from_slice(&writer); 510 277 511 - let mst_root_cid = commit.data; 512 - if let Ok(Some(data)) = state.block_store.get(&mst_root_cid).await { 513 - collected_blocks.push((mst_root_cid, data)); 514 - } 515 - 516 - if let Ok(Some(data)) = state.block_store.get(&record_cid).await { 517 - collected_blocks.push((record_cid, data)); 518 - } else { 519 - return ( 520 - StatusCode::NOT_FOUND, 521 - Json(json!({"error": "RecordNotFound", "message": "Record block not found"})), 522 - ) 523 - .into_response(); 524 - } 525 - 526 - let mut buf = Vec::new(); 527 - let car_header = encode_car_header(&root_cid); 528 - if let Err(e) = ld_write(&mut buf, &car_header) { 529 - error!("Failed to write CAR header: {:?}", e); 530 - return ( 531 - StatusCode::INTERNAL_SERVER_ERROR, 532 - Json(json!({"error": "InternalError"})), 533 - ) 534 - .into_response(); 535 - } 536 - 537 - for (cid, data) in &collected_blocks { 538 - let mut block_data = Vec::new(); 539 - block_data.extend_from_slice(&cid.to_bytes()); 540 - block_data.extend_from_slice(data); 541 - if let Err(e) = ld_write(&mut buf, &block_data) { 542 - error!("Failed to write block: {:?}", e); 543 - return ( 544 - StatusCode::INTERNAL_SERVER_ERROR, 545 - Json(json!({"error": "InternalError"})), 546 - ) 547 - .into_response(); 548 - } 549 - } 550 - 551 - Response::builder() 552 - .status(StatusCode::OK) 553 - .header(header::CONTENT_TYPE, "application/vnd.ipld.car") 554 - .body(Body::from(buf)) 555 - .unwrap() 278 + ( 279 + StatusCode::OK, 280 + [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 281 + car_bytes, 282 + ) 283 + .into_response() 556 284 }
+91
src/sync/subscribe_repos.rs
··· 1 + use crate::state::AppState; 2 + use crate::sync::firehose::SequencedEvent; 3 + use crate::sync::util::format_event_for_sending; 4 + use axum::{ 5 + extract::{ws::Message, ws::WebSocket, ws::WebSocketUpgrade, Query, State}, 6 + response::Response, 7 + }; 8 + use futures::{sink::SinkExt, stream::StreamExt}; 9 + use serde::Deserialize; 10 + use tracing::{error, info, warn}; 11 + 12 + #[derive(Deserialize)] 13 + pub struct SubscribeReposParams { 14 + pub cursor: Option<i64>, 15 + } 16 + 17 + #[axum::debug_handler] 18 + pub async fn subscribe_repos( 19 + ws: WebSocketUpgrade, 20 + State(state): State<AppState>, 21 + Query(params): Query<SubscribeReposParams>, 22 + ) -> Response { 23 + ws.on_upgrade(move |socket| handle_socket(socket, state, params)) 24 + } 25 + 26 + async fn send_event( 27 + socket: &mut WebSocket, 28 + state: &AppState, 29 + event: SequencedEvent, 30 + ) -> Result<(), anyhow::Error> { 31 + let bytes = format_event_for_sending(state, event).await?; 32 + socket.send(Message::Binary(bytes.into())).await?; 33 + Ok(()) 34 + } 35 + 36 + async fn handle_socket(mut socket: WebSocket, state: AppState, params: SubscribeReposParams) { 37 + info!(cursor = ?params.cursor, "New firehose subscriber"); 38 + 39 + if let Some(cursor) = params.cursor { 40 + let events = sqlx::query_as!( 41 + SequencedEvent, 42 + r#" 43 + SELECT seq, did, created_at, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids 44 + FROM repo_seq 45 + WHERE seq > $1 46 + ORDER BY seq ASC 47 + "#, 48 + cursor 49 + ) 50 + .fetch_all(&state.db) 51 + .await; 52 + 53 + match events { 54 + Ok(events) => { 55 + for event in events { 56 + if let Err(e) = send_event(&mut socket, &state, event).await { 57 + warn!("Failed to send backfill event: {}", e); 58 + return; 59 + } 60 + } 61 + } 62 + Err(e) => { 63 + error!("Failed to fetch backfill events: {}", e); 64 + socket.close().await.ok(); 65 + return; 66 + } 67 + } 68 + } 69 + 70 + let mut rx = state.firehose_tx.subscribe(); 71 + 72 + loop { 73 + tokio::select! { 74 + Ok(event) = rx.recv() => { 75 + if let Err(e) = send_event(&mut socket, &state, event).await { 76 + warn!("Failed to send event: {}", e); 77 + break; 78 + } 79 + } 80 + Some(Ok(msg)) = socket.next() => { 81 + if let Message::Close(_) = msg { 82 + info!("Client closed connection"); 83 + break; 84 + } 85 + } 86 + else => { 87 + break; 88 + } 89 + } 90 + } 91 + }
+52
src/sync/util.rs
··· 1 + use crate::state::AppState; 2 + use crate::sync::firehose::SequencedEvent; 3 + use crate::sync::frame::{CommitFrame, Frame, FrameData}; 4 + use cid::Cid; 5 + use jacquard_repo::car::write_car; 6 + use jacquard_repo::storage::BlockStore; 7 + use std::fs; 8 + use std::str::FromStr; 9 + use tokio::fs::File; 10 + use tokio::io::AsyncReadExt; 11 + use uuid::Uuid; 12 + 13 + pub async fn format_event_for_sending( 14 + state: &AppState, 15 + event: SequencedEvent, 16 + ) -> Result<Vec<u8>, anyhow::Error> { 17 + let block_cids_str = event.blocks_cids.clone().unwrap_or_default(); 18 + let mut frame: CommitFrame = event.into(); 19 + 20 + let mut car_bytes = Vec::new(); 21 + if !block_cids_str.is_empty() { 22 + let temp_path = format!("/tmp/{}.car", Uuid::new_v4()); 23 + let mut blocks = std::collections::BTreeMap::new(); 24 + 25 + for cid_str in block_cids_str { 26 + let cid = Cid::from_str(&cid_str)?; 27 + let data = state 28 + .block_store 29 + .get(&cid) 30 + .await? 31 + .ok_or_else(|| anyhow::anyhow!("Block not found: {}", cid))?; 32 + blocks.insert(cid, data); 33 + } 34 + 35 + let root = Cid::from_str(&frame.commit)?; 36 + write_car(&temp_path, vec![root], blocks).await?; 37 + 38 + let mut file = File::open(&temp_path).await?; 39 + file.read_to_end(&mut car_bytes).await?; 40 + fs::remove_file(&temp_path)?; 41 + } 42 + frame.blocks = car_bytes; 43 + 44 + let frame = Frame { 45 + op: 1, 46 + data: FrameData::Commit(Box::new(frame)), 47 + }; 48 + 49 + let mut bytes = Vec::new(); 50 + serde_ipld_dagcbor::to_writer(&mut bytes, &frame)?; 51 + Ok(bytes) 52 + }
+11 -1
tests/common/mod.rs
··· 19 19 use wiremock::{Mock, MockServer, ResponseTemplate}; 20 20 21 21 static SERVER_URL: OnceLock<String> = OnceLock::new(); 22 + static APP_PORT: OnceLock<u16> = OnceLock::new(); 22 23 static DB_CONTAINER: OnceLock<ContainerAsync<Postgres>> = OnceLock::new(); 23 24 static S3_CONTAINER: OnceLock<ContainerAsync<GenericImage>> = OnceLock::new(); 24 25 static MOCK_APPVIEW: OnceLock<MockServer> = OnceLock::new(); ··· 51 52 #[allow(dead_code)] 52 53 pub fn client() -> Client { 53 54 Client::new() 55 + } 56 + 57 + #[allow(dead_code)] 58 + pub fn app_port() -> u16 { 59 + *APP_PORT.get().expect("APP_PORT not initialized") 54 60 } 55 61 56 62 pub async fn base_url() -> &'static str { ··· 153 159 .await 154 160 .expect("Failed to start Postgres"); 155 161 let connection_string = format!( 156 - "postgres://postgres:postgres@127.0.0.1:{}/postgres", 162 + "postgres://postgres:postgres@127.0.0.1:{}", 157 163 container 158 164 .get_host_port_ipv4(5432) 159 165 .await ··· 186 192 187 193 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); 188 194 let addr = listener.local_addr().unwrap(); 195 + APP_PORT.set(addr.port()).ok(); 189 196 190 197 unsafe { 191 198 std::env::set_var("PDS_HOSTNAME", addr.to_string()); 192 199 } 193 200 194 201 let state = AppState::new(pool).await; 202 + 203 + bspds::sync::listener::start_sequencer_listener(state.clone()).await; 204 + 195 205 let app = bspds::app(state); 196 206 197 207 tokio::spawn(async move {
+81
tests/firehose.rs
··· 1 + mod common; 2 + use common::*; 3 + 4 + use bspds::sync::frame::{Frame, FrameData}; 5 + use cid::Cid; 6 + use futures::{stream::StreamExt, SinkExt}; 7 + use iroh_car::CarReader; 8 + use reqwest::StatusCode; 9 + use serde_json::{json, Value}; 10 + use std::io::Cursor; 11 + use std::str::FromStr; 12 + use tokio_tungstenite::{connect_async, tungstenite}; 13 + 14 + #[tokio::test] 15 + async fn test_firehose_subscription() { 16 + let client = client(); 17 + let (token, did) = create_account_and_login(&client).await; 18 + 19 + let url = format!( 20 + "ws://127.0.0.1:{}/xrpc/com.atproto.sync.subscribeRepos", 21 + app_port() 22 + ); 23 + let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect"); 24 + 25 + let post_text = "Hello from the firehose test!"; 26 + let post_payload = json!({ 27 + "repo": did, 28 + "collection": "app.bsky.feed.post", 29 + "record": { 30 + "$type": "app.bsky.feed.post", 31 + "text": post_text, 32 + "createdAt": chrono::Utc::now().to_rfc3339(), 33 + } 34 + }); 35 + let res = client 36 + .post(format!( 37 + "{}/xrpc/com.atproto.repo.createRecord", 38 + base_url().await 39 + )) 40 + .bearer_auth(token) 41 + .json(&post_payload) 42 + .send() 43 + .await 44 + .expect("Failed to create post"); 45 + assert_eq!(res.status(), StatusCode::OK); 46 + 47 + let msg = ws_stream.next().await.unwrap().unwrap(); 48 + 49 + let frame: Frame = match msg { 50 + tungstenite::Message::Binary(bin) => { 51 + serde_ipld_dagcbor::from_slice(&bin).expect("Failed to deserialize frame") 52 + } 53 + _ => panic!("Expected binary message"), 54 + }; 55 + 56 + let FrameData::Commit(commit) = frame.data; 57 + assert_eq!(commit.repo, did); 58 + assert_eq!(commit.ops.len(), 1); 59 + assert!(!commit.blocks.is_empty()); 60 + 61 + let op = &commit.ops[0]; 62 + let record_cid = Cid::from_str(&op.cid.clone().unwrap()).unwrap(); 63 + 64 + let mut car_reader = CarReader::new(Cursor::new(&commit.blocks)).await.unwrap(); 65 + let mut record_block: Option<Vec<u8>> = None; 66 + while let Ok(Some((cid, block))) = car_reader.next_block().await { 67 + if cid == record_cid { 68 + record_block = Some(block); 69 + break; 70 + } 71 + } 72 + let record_block = record_block.expect("Record block not found in CAR"); 73 + 74 + let record: Value = serde_ipld_dagcbor::from_slice(&record_block).unwrap(); 75 + assert_eq!(record["text"], post_text); 76 + 77 + ws_stream 78 + .send(tungstenite::Message::Close(None)) 79 + .await 80 + .ok(); 81 + }
+68
tests/relay_client.rs
··· 1 + mod common; 2 + use common::*; 3 + 4 + use axum::{extract::ws::Message, routing::get, Router}; 5 + use bspds::{ 6 + state::AppState, 7 + sync::{firehose::SequencedEvent, relay_client::start_relay_clients}, 8 + }; 9 + use chrono::Utc; 10 + use tokio::net::TcpListener; 11 + use tokio::sync::mpsc; 12 + 13 + async fn mock_relay_server( 14 + listener: TcpListener, 15 + event_tx: mpsc::Sender<Vec<u8>>, 16 + ready_tx: mpsc::Sender<()>, 17 + ) { 18 + let handler = |ws: axum::extract::ws::WebSocketUpgrade| async { 19 + ws.on_upgrade(move |mut socket| async move { 20 + ready_tx.send(()).await.unwrap(); 21 + if let Some(Ok(Message::Binary(bytes))) = socket.recv().await { 22 + event_tx.send(bytes.to_vec()).await.unwrap(); 23 + } 24 + }) 25 + }; 26 + let app = Router::new().route("/", get(handler)); 27 + 28 + axum::serve(listener, app.into_make_service()) 29 + .await 30 + .unwrap(); 31 + } 32 + 33 + #[tokio::test] 34 + async fn test_outbound_relay_client() { 35 + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); 36 + let addr = listener.local_addr().unwrap(); 37 + let (event_tx, mut event_rx) = mpsc::channel(1); 38 + let (ready_tx, ready_rx) = mpsc::channel(1); 39 + tokio::spawn(mock_relay_server(listener, event_tx, ready_tx)); 40 + let relay_url = format!("ws://{}", addr); 41 + 42 + let db_url = get_db_connection_string().await; 43 + let pool = sqlx::postgres::PgPoolOptions::new() 44 + .connect(&db_url) 45 + .await 46 + .unwrap(); 47 + let state = AppState::new(pool).await; 48 + 49 + start_relay_clients(state.clone(), vec![relay_url], Some(ready_rx)).await; 50 + 51 + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; 52 + 53 + let dummy_event = SequencedEvent { 54 + seq: 1, 55 + did: "did:plc:test".to_string(), 56 + created_at: Utc::now(), 57 + event_type: "commit".to_string(), 58 + commit_cid: None, 59 + prev_cid: None, 60 + ops: None, 61 + blobs: None, 62 + blocks_cids: None, 63 + }; 64 + state.firehose_tx.send(dummy_event).unwrap(); 65 + 66 + let received_bytes = event_rx.recv().await.expect("Did not receive event"); 67 + assert!(!received_bytes.is_empty()); 68 + }