tracks lexicons and how many times they appeared on the jetstream

feat(server): implement fetching hits

ptr.pet e020b36f e8dfb4a1

verified
+246 -12
+144
server/Cargo.lock
··· 40 40 ] 41 41 42 42 [[package]] 43 + name = "alloc-no-stdlib" 44 + version = "2.0.4" 45 + source = "registry+https://github.com/rust-lang/crates.io-index" 46 + checksum = "cc7bb162ec39d46ab1ca8c77bf72e890535becd1751bb45f64c597edb4c8c6b3" 47 + 48 + [[package]] 49 + name = "alloc-stdlib" 50 + version = "0.2.2" 51 + source = "registry+https://github.com/rust-lang/crates.io-index" 52 + checksum = "94fb8275041c72129eb51b7d0322c29b8387a0386127718b096429201a5d6ece" 53 + dependencies = [ 54 + "alloc-no-stdlib", 55 + ] 56 + 57 + [[package]] 43 58 name = "anyhow" 44 59 version = "1.0.98" 45 60 source = "registry+https://github.com/rust-lang/crates.io-index" 46 61 checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487" 47 62 48 63 [[package]] 64 + name = "async-compression" 65 + version = "0.4.25" 66 + source = "registry+https://github.com/rust-lang/crates.io-index" 67 + checksum = "40f6024f3f856663b45fd0c9b6f2024034a702f453549449e0d84a305900dad4" 68 + dependencies = [ 69 + "brotli", 70 + "flate2", 71 + "futures-core", 72 + "memchr", 73 + "pin-project-lite", 74 + "tokio", 75 + "zstd", 76 + "zstd-safe", 77 + ] 78 + 79 + [[package]] 49 80 name = "async-trait" 50 81 version = "0.1.88" 51 82 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 76 107 dependencies = [ 77 108 "axum-core", 78 109 "bytes", 110 + "form_urlencoded", 79 111 "futures-util", 80 112 "http", 81 113 "http-body", ··· 92 124 "serde", 93 125 "serde_json", 94 126 "serde_path_to_error", 127 + "serde_urlencoded", 95 128 "sync_wrapper", 96 129 "tokio", 97 130 "tower", ··· 183 216 ] 184 217 185 218 [[package]] 219 + name = "brotli" 220 + version = "8.0.1" 221 + source = "registry+https://github.com/rust-lang/crates.io-index" 222 + checksum = "9991eea70ea4f293524138648e41ee89b0b2b12ddef3b255effa43c8056e0e0d" 223 + dependencies = [ 224 + "alloc-no-stdlib", 225 + "alloc-stdlib", 226 + "brotli-decompressor", 227 + ] 228 + 229 + [[package]] 230 + name = "brotli-decompressor" 231 + version = "5.0.0" 232 + source = "registry+https://github.com/rust-lang/crates.io-index" 233 + checksum = "874bb8112abecc98cbd6d81ea4fa7e94fb9449648c93cc89aa40c81c24d7de03" 234 + dependencies = [ 235 + "alloc-no-stdlib", 236 + "alloc-stdlib", 237 + ] 238 + 239 + [[package]] 186 240 name = "bumpalo" 187 241 version = "3.19.0" 188 242 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 235 289 source = "registry+https://github.com/rust-lang/crates.io-index" 236 290 checksum = "deec109607ca693028562ed836a5f1c4b8bd77755c4e132fc5ce11b0b6211ae7" 237 291 dependencies = [ 292 + "jobserver", 293 + "libc", 238 294 "shlex", 239 295 ] 240 296 ··· 295 351 checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280" 296 352 dependencies = [ 297 353 "libc", 354 + ] 355 + 356 + [[package]] 357 + name = "crc32fast" 358 + version = "1.5.0" 359 + source = "registry+https://github.com/rust-lang/crates.io-index" 360 + checksum = "9481c1c90cbf2ac953f07c8d4a58aa3945c425b7185c9154d67a65e4230da511" 361 + dependencies = [ 362 + "cfg-if", 298 363 ] 299 364 300 365 [[package]] ··· 414 479 ] 415 480 416 481 [[package]] 482 + name = "flate2" 483 + version = "1.1.2" 484 + source = "registry+https://github.com/rust-lang/crates.io-index" 485 + checksum = "4a3d7db9596fecd151c5f638c0ee5d5bd487b6e0ea232e5dc96d5250f6f94b1d" 486 + dependencies = [ 487 + "crc32fast", 488 + "miniz_oxide", 489 + ] 490 + 491 + [[package]] 417 492 name = "fnv" 418 493 version = "1.0.7" 419 494 source = "registry+https://github.com/rust-lang/crates.io-index" 420 495 checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" 496 + 497 + [[package]] 498 + name = "form_urlencoded" 499 + version = "1.2.1" 500 + source = "registry+https://github.com/rust-lang/crates.io-index" 501 + checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456" 502 + dependencies = [ 503 + "percent-encoding", 504 + ] 421 505 422 506 [[package]] 423 507 name = "futures-channel" ··· 689 773 checksum = "8eaf4bc02d17cbdd7ff4c7438cafcdf7fb9a4613313ad11b4f8fefe7d3fa0130" 690 774 691 775 [[package]] 776 + name = "jobserver" 777 + version = "0.1.33" 778 + source = "registry+https://github.com/rust-lang/crates.io-index" 779 + checksum = "38f262f097c174adebe41eb73d66ae9c06b2844fb0da69969647bbddd9b0538a" 780 + dependencies = [ 781 + "getrandom 0.3.3", 782 + "libc", 783 + ] 784 + 785 + [[package]] 692 786 name = "js-sys" 693 787 version = "0.3.77" 694 788 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 944 1038 dependencies = [ 945 1039 "ahash", 946 1040 ] 1041 + 1042 + [[package]] 1043 + name = "pkg-config" 1044 + version = "0.3.32" 1045 + source = "registry+https://github.com/rust-lang/crates.io-index" 1046 + checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" 947 1047 948 1048 [[package]] 949 1049 name = "proc-macro2" ··· 1331 1431 ] 1332 1432 1333 1433 [[package]] 1434 + name = "serde_urlencoded" 1435 + version = "0.7.1" 1436 + source = "registry+https://github.com/rust-lang/crates.io-index" 1437 + checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" 1438 + dependencies = [ 1439 + "form_urlencoded", 1440 + "itoa", 1441 + "ryu", 1442 + "serde", 1443 + ] 1444 + 1445 + [[package]] 1334 1446 name = "server" 1335 1447 version = "0.1.0" 1336 1448 dependencies = [ ··· 1633 1745 source = "registry+https://github.com/rust-lang/crates.io-index" 1634 1746 checksum = "adc82fd73de2a9722ac5da747f12383d2bfdb93591ee6c58486e0097890f05f2" 1635 1747 dependencies = [ 1748 + "async-compression", 1636 1749 "bitflags", 1637 1750 "bytes", 1751 + "futures-core", 1638 1752 "http", 1639 1753 "http-body", 1640 1754 "pin-project-lite", 1755 + "tokio", 1756 + "tokio-util", 1641 1757 "tower-layer", 1642 1758 "tower-service", 1643 1759 "tracing", ··· 2165 2281 version = "1.8.1" 2166 2282 source = "registry+https://github.com/rust-lang/crates.io-index" 2167 2283 checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" 2284 + 2285 + [[package]] 2286 + name = "zstd" 2287 + version = "0.13.3" 2288 + source = "registry+https://github.com/rust-lang/crates.io-index" 2289 + checksum = "e91ee311a569c327171651566e07972200e76fcfe2242a4fa446149a3881c08a" 2290 + dependencies = [ 2291 + "zstd-safe", 2292 + ] 2293 + 2294 + [[package]] 2295 + name = "zstd-safe" 2296 + version = "7.2.4" 2297 + source = "registry+https://github.com/rust-lang/crates.io-index" 2298 + checksum = "8f49c4d5f0abb602a93fb8736af2a4f4dd9512e36f7f570d66e65ff867ed3b9d" 2299 + dependencies = [ 2300 + "zstd-sys", 2301 + ] 2302 + 2303 + [[package]] 2304 + name = "zstd-sys" 2305 + version = "2.0.15+zstd.1.5.7" 2306 + source = "registry+https://github.com/rust-lang/crates.io-index" 2307 + checksum = "eb81183ddd97d0c74cedf1d50d85c8d08c1b8b68ee863bdee9e706eedba1a237" 2308 + dependencies = [ 2309 + "cc", 2310 + "pkg-config", 2311 + ]
+2 -2
server/Cargo.toml
··· 13 13 rustls = { version = "0.23", default-features = false, features = ["log", "ring", "std"] } 14 14 tokio-websockets = { version = "0.12", features = ["client", "rustls-platform-verifier", "getrandom", "ring"] } 15 15 futures-util = "0.3" 16 - axum = { version = "0.8", default-features = false, features = ["http1", "tokio", "tracing", "json"] } 16 + axum = { version = "0.8", default-features = false, features = ["http1", "tokio", "tracing", "json", "query"] } 17 17 axum-tws = { git = "https://github.com/90-008/axum-tws.git", features = ["http2"] } 18 18 pingora-limits = "0.5" 19 - tower-http = {version = "0.6", features = ["request-id", "trace"]} 19 + tower-http = {version = "0.6", features = ["request-id", "trace", "compression-full"]} 20 20 fjall = { version = "2", default-features = false, features = ["miniz", "lz4"] } 21 21 rkyv = {version = "0.8", features = ["unaligned"]} 22 22 smol_str = { version = "0.3", features = ["serde"] }
+61 -3
server/src/api.rs
··· 1 1 use std::{ 2 - collections::HashMap, fmt::Display, net::SocketAddr, ops::Deref, sync::Arc, time::Duration, 2 + collections::HashMap, 3 + fmt::Display, 4 + net::SocketAddr, 5 + ops::Deref, 6 + sync::Arc, 7 + time::{Duration, UNIX_EPOCH}, 3 8 }; 4 9 5 10 use anyhow::anyhow; 6 - use axum::{Json, Router, extract::State, http::Request, response::Response, routing::get}; 11 + use axum::{ 12 + Json, Router, 13 + extract::{Query, State}, 14 + http::Request, 15 + response::Response, 16 + routing::get, 17 + }; 7 18 use axum_tws::{Message, WebSocketUpgrade}; 8 - use serde::Serialize; 19 + use serde::{Deserialize, Serialize}; 9 20 use smol_str::SmolStr; 10 21 use tokio_util::sync::CancellationToken; 11 22 use tower_http::{ 12 23 classify::ServerErrorsFailureClass, 24 + compression::CompressionLayer, 13 25 request_id::{MakeRequestUuid, PropagateRequestIdLayer, SetRequestIdLayer}, 14 26 trace::TraceLayer, 15 27 }; ··· 38 50 let app = Router::new() 39 51 .route("/events", get(events)) 40 52 .route("/stream_events", get(stream_events)) 53 + .route("/hits", get(hits)) 54 + .route_layer(CompressionLayer::new().br(true).deflate(true).gzip(true).zstd(true)) 41 55 .route_layer(PropagateRequestIdLayer::x_request_id()) 42 56 .route_layer( 43 57 TraceLayer::new_for_http() ··· 122 136 events, 123 137 per_second: db.eps(), 124 138 })) 139 + } 140 + 141 + #[derive(Debug, Deserialize)] 142 + struct HitsQuery { 143 + nsid: SmolStr, 144 + from: Option<u64>, 145 + to: Option<u64>, 146 + } 147 + 148 + #[derive(Serialize)] 149 + struct Hit { 150 + timestamp: u64, 151 + deleted: bool, 152 + } 153 + 154 + const MAX_HITS: usize = 100_000; 155 + 156 + async fn hits( 157 + State(db): State<Arc<Db>>, 158 + Query(params): Query<HitsQuery>, 159 + ) -> AppResult<Json<Vec<Hit>>> { 160 + let maybe_hits = db 161 + .get_hits( 162 + &params.nsid, 163 + params.to.unwrap_or(0) 164 + ..params.from.unwrap_or( 165 + std::time::SystemTime::now() 166 + .duration_since(UNIX_EPOCH) 167 + .expect("oops") 168 + .as_micros() as u64, 169 + ), 170 + )? 171 + .take(MAX_HITS); 172 + let mut hits = Vec::with_capacity(maybe_hits.size_hint().0); 173 + 174 + for maybe_hit in maybe_hits { 175 + let (timestamp, hit) = maybe_hit?; 176 + hits.push(Hit { 177 + timestamp, 178 + deleted: hit.deleted, 179 + }); 180 + } 181 + 182 + Ok(Json(hits)) 125 183 } 126 184 127 185 async fn stream_events(db: State<Arc<Db>>, ws: WebSocketUpgrade) -> Response {
+38 -6
server/src/db.rs
··· 1 - use std::{ops::Deref, path::Path, time::Duration}; 1 + use std::{ 2 + ops::{Bound, Deref, RangeBounds}, 3 + path::Path, 4 + time::Duration, 5 + }; 2 6 3 7 use fjall::{Config, Keyspace, Partition, PartitionCreateOptions}; 4 8 use pingora_limits::rate::Rate; ··· 180 184 pub fn get_hits( 181 185 &self, 182 186 nsid: &str, 183 - ) -> AppResult<impl Iterator<Item = AppResult<(u64, NsidHit)>>> { 184 - self.run_in_nsid_tree(nsid, |tree| { 185 - Ok(tree.iter().map(|res| { 187 + range: impl RangeBounds<u64>, 188 + ) -> AppResult<Box<dyn Iterator<Item = AppResult<(u64, NsidHit)>>>> { 189 + let start = range.start_bound().cloned().map(u64::to_be_bytes); 190 + let end = range.end_bound().cloned().map(u64::to_be_bytes); 191 + 192 + let _guard = self.hits.guard(); 193 + let Some(tree) = self.hits.get(nsid, &_guard) else { 194 + return Ok(Box::new(std::iter::empty())); 195 + }; 196 + 197 + Ok(Box::new(tree.range(TimestampRange { start, end }).map( 198 + |res| { 186 199 res.map_err(AppError::from).map(|(key, val)| { 187 200 ( 188 201 u64::from_be_bytes(key.as_ref().try_into().unwrap()), 189 202 unsafe { rkyv::from_bytes_unchecked::<_, Error>(&val).unwrap_unchecked() }, 190 203 ) 191 204 }) 192 - })) 193 - }) 205 + }, 206 + ))) 207 + } 208 + } 209 + 210 + type TimestampRepr = [u8; 8]; 211 + 212 + struct TimestampRange { 213 + start: Bound<TimestampRepr>, 214 + end: Bound<TimestampRepr>, 215 + } 216 + 217 + impl RangeBounds<TimestampRepr> for TimestampRange { 218 + #[inline(always)] 219 + fn start_bound(&self) -> Bound<&TimestampRepr> { 220 + self.start.as_ref() 221 + } 222 + 223 + #[inline(always)] 224 + fn end_bound(&self) -> Bound<&TimestampRepr> { 225 + self.end.as_ref() 194 226 } 195 227 }
+1 -1
server/src/main.rs
··· 126 126 let mut total_count = 0_u64; 127 127 for nsid in from.get_nsids() { 128 128 tracing::info!("migrating {} ...", nsid.deref()); 129 - for hit in from.get_hits(&nsid).expect("cant read hits") { 129 + for hit in from.get_hits(&nsid, ..).expect("cant read hits") { 130 130 let (timestamp, data) = hit.expect("cant read event"); 131 131 to.record_event(EventRecord { 132 132 nsid: nsid.to_smolstr(),