this repo has no description

feature: compression configuration option

Signed-off-by: Nick Gerakines <12125+ngerakines@users.noreply.github.com>

+106 -31
+2 -1
README.md
··· 12 12 * `EXTERNAL_BASE` - The hostname of the feed generator. 13 13 * `DATABASE_URL` - The URL of the database to use. 14 14 * `JETSTREAM_HOSTNAME` - The hostname of the JetStream server to consume events from. 15 - * `ZSTD_DICTIONARY` - The path to the ZSTD dictionary to use. 15 + * `COMPRESSION` - Use zstd compression. Default `false`. 16 + * `ZSTD_DICTIONARY` - The path to the ZSTD dictionary to use. Required when compression is enabled. 16 17 * `CONSUMER_TASK_ENABLE` - Whether or not to enable the consumer tasks. Default `true`. 17 18 * `VMC_TASK_ENABLE` - Whether or not to enable the VMC (verification method cache) tasks. Default `true`. 18 19 * `PLC_HOSTNAME` - The hostname of the PLC server to use for VMC tasks. Default `plc.directory`.
+2 -1
dev-server.sh
··· 4 4 export EXTERNAL_BASE=feeds.smokesignal.events 5 5 export DATABASE_URL=sqlite://development.db 6 6 export JETSTREAM_HOSTNAME=jetstream1.us-east.bsky.network 7 - export ZSTD_DICTIONARY=$(pwd)/jetstream_zstd_dictionary 8 7 export CONSUMER_TASK_ENABLE=true 9 8 export FEEDS=$(pwd)/config.yml 9 + # export COMPRESSION=true 10 + # export ZSTD_DICTIONARY=$(pwd)/jetstream_zstd_dictionary 10 11 11 12 touch development.db 12 13 sqlx migrate run --database-url sqlite://development.db
+34
docs/playbook-enable-compression.md
··· 1 + # Playbook: Enable compression 2 + 3 + Jetstream supports optional zstd compression for streamed events. This feature can reduce bandwidth usage by up to 50% with minimal performance impact, but at the cost of a more complicated deployment and additional maintenance steps. 4 + 5 + [zstd](https://github.com/facebook/zstd) is a dictionary-based compression algorithm that is optimized for real-time compression and decompression. 6 + 7 + ## Configuration 8 + 9 + Compression is enabled by setting the `COMPRESSION` environment variable to `true`. When enabled, the `ZSTD_DICTIONARY` environment variable must be set to the path of the ZSTD dictionary to use. 10 + 11 + The dictionary file can be downloaded from [github.com/bluesky-social/jetstream/blob/main/pkg/models/zstd_dictionary](https://github.com/bluesky-social/jetstream/blob/main/pkg/models/zstd_dictionary). 12 + 13 + ## FAQ 14 + 15 + ### Why is compression disabled by default? 16 + 17 + The benefits of compression are not guaranteed and depend on the data being compressed. For most supercell deployments, the impact on CPU and memory is minimal, and the benefits of reduced bandwidth is significant. However, this feature is maturing and may not be suitable for all deployments. 18 + 19 + ### Why is a custom dictionary required? 20 + 21 + Zstd uses a dictionary to improve compression performance. There is no default dictionary, so a custom dictionary must be provided. 22 + 23 + The dictionary is occasionally rebuilt to improve compression performance. The dictionary is built from a sample of the data that is being compressed, so the dictionary is specific to the data that is being compressed. 24 + 25 + ### Dictionary mismatch 26 + 27 + This error occurs when compression is enabled and the dictionary is invalid or does not match the dictionary that was used to compress the data. 28 + 29 + To resolve, ensure the `ZSTD_DICTIONARY` environment variable is set to the correct path and that the file at that path is the same as the one from the jetstream repository. 30 + 31 + ### Destination buffer is too small 32 + 33 + This error occurs when the buffer used to decompress the data is too small to contain the decompressed event. 34 +
+1
src/bin/supercell.rs
··· 102 102 if task_enable { 103 103 let consumer_task_config = ConsumerTaskConfig { 104 104 user_agent: inner_config.user_agent.clone(), 105 + compression: *inner_config.compression.as_ref(), 105 106 zstd_dictionary_location: inner_config.zstd_dictionary.clone(), 106 107 jetstream_hostname: inner_config.jetstream_hostname.clone(), 107 108 feeds: inner_config.feeds.clone(),
+29 -1
src/config.rs
··· 46 46 pub struct TaskEnable(bool); 47 47 48 48 #[derive(Clone)] 49 + pub struct Compression(bool); 50 + 51 + #[derive(Clone)] 49 52 pub struct Config { 50 53 pub version: String, 51 54 pub http_port: HttpPort, ··· 59 62 pub zstd_dictionary: String, 60 63 pub jetstream_hostname: String, 61 64 pub feeds: Feeds, 65 + pub compression: Compression, 62 66 } 63 67 64 68 impl Config { ··· 72 76 optional_env("CERTIFICATE_BUNDLES").try_into()?; 73 77 74 78 let jetstream_hostname = require_env("JETSTREAM_HOSTNAME")?; 75 - let zstd_dictionary = require_env("ZSTD_DICTIONARY")?; 79 + 80 + let compression: Compression = default_env("COMPRESSION", "false").try_into()?; 81 + 82 + let zstd_dictionary = if compression.0 { 83 + require_env("ZSTD_DICTIONARY")? 84 + } else { 85 + "".to_string() 86 + }; 76 87 77 88 let consumer_task_enable: TaskEnable = 78 89 default_env("CONSUMER_TASK_ENABLE", "true").try_into()?; ··· 103 114 jetstream_hostname, 104 115 zstd_dictionary, 105 116 feeds, 117 + compression, 106 118 }) 107 119 } 108 120 } ··· 181 193 fn try_from(value: String) -> Result<Self, Self::Error> { 182 194 let value = value.parse::<bool>().map_err(|err| { 183 195 anyhow::Error::new(err).context(anyhow!("parsing task enable into bool failed")) 196 + })?; 197 + Ok(Self(value)) 198 + } 199 + } 200 + 201 + impl AsRef<bool> for Compression { 202 + fn as_ref(&self) -> &bool { 203 + &self.0 204 + } 205 + } 206 + 207 + impl TryFrom<String> for Compression { 208 + type Error = anyhow::Error; 209 + fn try_from(value: String) -> Result<Self, Self::Error> { 210 + let value = value.parse::<bool>().map_err(|err| { 211 + anyhow::Error::new(err).context(anyhow!("parsing compression into bool failed")) 184 212 })?; 185 213 Ok(Self(value)) 186 214 }
+38 -28
src/consumer.rs
··· 1 1 use std::str::FromStr; 2 2 3 - use anyhow::{Context, Result}; 3 + use anyhow::{anyhow, Context, Result}; 4 4 use futures_util::SinkExt; 5 5 use futures_util::StreamExt; 6 6 use http::HeaderValue; ··· 22 22 #[derive(Clone)] 23 23 pub struct ConsumerTaskConfig { 24 24 pub user_agent: String, 25 + pub compression: bool, 25 26 pub zstd_dictionary_location: String, 26 27 pub jetstream_hostname: String, 27 28 pub feeds: config::Feeds, ··· 56 57 let last_time_us = 57 58 consumer_control_get(&self.pool, &self.config.jetstream_hostname).await?; 58 59 59 - // mkdir -p data/ && curl -o data/zstd_dictionary https://github.com/bluesky-social/jetstream/raw/refs/heads/main/pkg/models/zstd_dictionary 60 - let data: Vec<u8> = std::fs::read(self.config.zstd_dictionary_location.clone()) 61 - .context("unable to load zstd dictionary")?; 62 - 63 60 let uri = Uri::from_str(&format!( 64 - "wss://{}/subscribe?compress=true&requireHello=true", 65 - self.config.jetstream_hostname 61 + "wss://{}/subscribe?compress={}&requireHello=true", 62 + self.config.jetstream_hostname, self.config.compression 66 63 )) 67 64 .context("invalid jetstream URL")?; 65 + 66 + tracing::debug!(uri = ?uri, "connecting to jetstream"); 68 67 69 68 let (mut client, _) = ClientBuilder::from_uri(uri) 70 69 .add_header( ··· 89 88 .await 90 89 .map_err(|err| anyhow::Error::msg(err).context("cannot send update"))?; 91 90 92 - let mut decompressor = zstd::bulk::Decompressor::with_dictionary(&data) 93 - .map_err(|err| anyhow::Error::msg(err).context("cannot create decompressor"))?; 91 + let mut decompressor = if self.config.compression { 92 + // mkdir -p data/ && curl -o data/zstd_dictionary https://github.com/bluesky-social/jetstream/raw/refs/heads/main/pkg/models/zstd_dictionary 93 + let data: Vec<u8> = std::fs::read(self.config.zstd_dictionary_location.clone()) 94 + .context("unable to load zstd dictionary")?; 95 + zstd::bulk::Decompressor::with_dictionary(&data) 96 + .map_err(|err| anyhow::Error::msg(err).context("cannot create decompressor"))? 97 + } else { 98 + zstd::bulk::Decompressor::new() 99 + .map_err(|err| anyhow::Error::msg(err).context("cannot create decompressor"))? 100 + }; 94 101 95 102 let interval = std::time::Duration::from_secs(120); 96 103 let sleeper = sleep(interval); ··· 120 127 } 121 128 let item = item.unwrap(); 122 129 123 - if !item.is_binary() { 124 - tracing::warn!("message from jetstream is not binary"); 125 - continue; 126 - } 127 - let payload = item.into_payload(); 128 - 129 - let decoded = decompressor.decompress(&payload, MAX_MESSAGE_SIZE * 3); 130 - if let Err(err) = decoded { 131 - let length = payload.len(); 132 - tracing::error!(error = ?err, length = ?length, "error processing jetstream message"); 133 - continue; 134 - } 135 - let decoded = decoded.unwrap(); 130 + let event = if self.config.compression { 131 + if !item.is_binary() { 132 + tracing::debug!("compression enabled but message from jetstream is not binary"); 133 + continue; 134 + } 135 + let payload = item.into_payload(); 136 136 137 - let event = serde_json::from_slice::<model::Event>(&decoded); 137 + let decoded = decompressor.decompress(&payload, MAX_MESSAGE_SIZE * 3); 138 + if let Err(err) = decoded { 139 + tracing::debug!(err = ?err, "cannot decompress message"); 140 + continue; 141 + } 142 + let decoded = decoded.unwrap(); 143 + serde_json::from_slice::<model::Event>(&decoded) 144 + } else { 145 + if !item.is_text() { 146 + tracing::debug!("compression enabled but message from jetstream is not binary"); 147 + continue; 148 + } 149 + serde_json::from_str(item.as_text().ok_or(anyhow!("cannot convert message to text"))?) 150 + }; 138 151 if let Err(err) = event { 139 152 tracing::error!(error = ?err, "error processing jetstream message"); 140 - 141 - #[cfg(debug_assertions)] 142 - { 143 - println!("{:?}", std::str::from_utf8(&decoded)); 144 - } 145 153 146 154 continue; 147 155 } ··· 159 167 continue; 160 168 } 161 169 let event_value = event_value.unwrap(); 170 + 171 + tracing::trace!(event = ?event, "received event"); 162 172 163 173 for feed_matcher in self.feed_matchers.0.iter() { 164 174 if feed_matcher.matches(&event_value) {