this repo has no description

feature: cleanup task as part of data retention policy

Signed-off-by: Nick Gerakines <12125+ngerakines@users.noreply.github.com>

+115 -96
+5
README.md
··· 16 16 * `ZSTD_DICTIONARY` - The path to the ZSTD dictionary to use. Required when compression is enabled. 17 17 * `CONSUMER_TASK_ENABLE` - Whether or not to enable the consumer tasks. Default `true`. 18 18 * `VMC_TASK_ENABLE` - Whether or not to enable the VMC (verification method cache) tasks. Default `true`. 19 + * `CACHE_TASK_ENABLE` - Whether or not to enable the cache tasks. Default `true`. 20 + * `CACHE_TASK_INTERVAL` - The interval to run the cache tasks. Default `3m`. 21 + * `CLEANUP_TASK_ENABLE` - Whether or not to enable the cleanup tasks. Default `true`. 22 + * `CLEANUP_TASK_INTERVAL` - The interval to run the cleanup tasks. Default `1h`. 23 + * `CLEANUP_TASK_MAX_AGE` - The maximum age of a post before it is considered stale and deleted from storage. Default `48h`. 19 24 * `PLC_HOSTNAME` - The hostname of the PLC server to use for VMC tasks. Default `plc.directory`. 20 25 * `FEEDS` - The path to the feeds configuration file. 21 26 * `COLLECTIONS` - The collections to consume. Default `app.bsky.feed.post`.
+2 -3
src/bin/dropsonde.rs
··· 1 1 use anyhow::{anyhow, Context, Result}; 2 - use supercell::matcher::RhaiMatcher; 3 2 use supercell::matcher::Matcher; 3 + use supercell::matcher::RhaiMatcher; 4 4 5 5 fn main() -> Result<()> { 6 6 let mut rhai_input_path: Option<String> = None; ··· 29 29 let value: serde_json::Value = 30 30 serde_json::from_slice(&json_content).context("parsing input_json failed")?; 31 31 32 - let matcher = RhaiMatcher::new(&rhai_input_path) 33 - .context("could not construct matcher")?; 32 + let matcher = RhaiMatcher::new(&rhai_input_path).context("could not construct matcher")?; 34 33 let result = matcher.matches(&value)?; 35 34 36 35 let result = result.ok_or(anyhow!("no matches found"))?;
+21 -1
src/bin/supercell.rs
··· 5 5 use std::env; 6 6 use supercell::cache::Cache; 7 7 use supercell::cache::CacheTask; 8 + use supercell::cleanup::CleanTask; 8 9 use supercell::vmc::VerificationMethodCacheTask; 9 10 use tokio::net::TcpListener; 10 11 use tokio::signal; ··· 149 150 }); 150 151 } 151 152 } 153 + 152 154 { 153 155 let inner_config = config.clone(); 154 156 let task_enable = *inner_config.cache_task_enable.as_ref(); ··· 164 166 let interval = *inner_config.cache_task_interval.as_ref(); 165 167 tracker.spawn(async move { 166 168 if let Err(err) = task.run_background(interval).await { 167 - tracing::warn!(error = ?err, "consumer task error"); 169 + tracing::warn!(error = ?err, "cache task error"); 170 + } 171 + inner_token.cancel(); 172 + }); 173 + } 174 + } 175 + 176 + { 177 + let inner_config = config.clone(); 178 + let task_enable = *inner_config.cleanup_task_enable.as_ref(); 179 + let max_age = *inner_config.cleanup_task_max_age.as_ref(); 180 + if task_enable { 181 + let task = CleanTask::new(pool.clone(), max_age, token.clone()); 182 + task.main().await?; 183 + let inner_token = token.clone(); 184 + let interval = *inner_config.cleanup_task_interval.as_ref(); 185 + tracker.spawn(async move { 186 + if let Err(err) = task.run_background(interval).await { 187 + tracing::warn!(error = ?err, "cleanup task error"); 168 188 } 169 189 inner_token.cancel(); 170 190 });
+8 -10
src/cache.rs
··· 6 6 7 7 use crate::storage::{feed_content_cached, StoragePool}; 8 8 9 - pub(crate) struct InnerCache { 10 - pub(crate) page_size: u8, 11 - pub(crate) cached_feeds: HashMap<String, Vec<Vec<String>>>, 9 + pub struct InnerCache { 10 + page_size: u8, 11 + cached_feeds: HashMap<String, Vec<Vec<String>>>, 12 12 } 13 13 14 14 #[derive(Clone)] 15 15 pub struct Cache { 16 - pub(crate) inner_cache: Arc<RwLock<InnerCache>>, 16 + pub inner_cache: Arc<RwLock<InnerCache>>, 17 17 } 18 18 19 19 impl Default for InnerCache { ··· 75 75 } 76 76 77 77 pub struct CacheTask { 78 - pub(crate) pool: StoragePool, 79 - pub(crate) cache: Cache, 80 - pub(crate) config: crate::config::Config, 78 + pub pool: StoragePool, 79 + pub cache: Cache, 80 + pub config: crate::config::Config, 81 81 82 - pub(crate) cancellation_token: CancellationToken, 82 + pub cancellation_token: CancellationToken, 83 83 } 84 84 85 85 impl CacheTask { ··· 169 169 .collect::<Vec<(f64, String, i64)>>(); 170 170 171 171 scored_posts.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap()); 172 - 173 - println!("{:?}", scored_posts); 174 172 175 173 let sorted_posts = scored_posts.iter().map(|post| post.1.clone()).collect(); 176 174
+56
src/cleanup.rs
··· 1 + use anyhow::Result; 2 + use chrono::Utc; 3 + use tokio_util::sync::CancellationToken; 4 + 5 + use crate::storage::{feed_content_truncate_oldest, StoragePool}; 6 + 7 + pub struct CleanTask { 8 + pool: StoragePool, 9 + max_age: chrono::Duration, 10 + cancellation_token: CancellationToken, 11 + } 12 + 13 + impl CleanTask { 14 + pub fn new( 15 + pool: StoragePool, 16 + max_age: chrono::Duration, 17 + cancellation_token: CancellationToken, 18 + ) -> Self { 19 + Self { 20 + pool, 21 + max_age, 22 + cancellation_token, 23 + } 24 + } 25 + 26 + pub async fn run_background(&self, interval: chrono::Duration) -> Result<()> { 27 + let interval = interval.to_std()?; 28 + 29 + let sleeper = tokio::time::sleep(interval); 30 + tokio::pin!(sleeper); 31 + 32 + loop { 33 + tokio::select! { 34 + () = self.cancellation_token.cancelled() => { 35 + break; 36 + }, 37 + () = &mut sleeper => { 38 + 39 + if let Err(err) = self.main().await { 40 + tracing::error!("CleanTask task failed: {}", err); 41 + } 42 + 43 + 44 + sleeper.as_mut().reset(tokio::time::Instant::now() + interval); 45 + } 46 + } 47 + } 48 + Ok(()) 49 + } 50 + 51 + pub async fn main(&self) -> Result<()> { 52 + let now = Utc::now(); 53 + let max_age = now - self.max_age; 54 + feed_content_truncate_oldest(&self.pool, max_age).await 55 + } 56 + }
+15
src/config.rs
··· 118 118 pub consumer_task_enable: TaskEnable, 119 119 pub cache_task_enable: TaskEnable, 120 120 pub cache_task_interval: TaskInterval, 121 + pub cleanup_task_enable: TaskEnable, 122 + pub cleanup_task_interval: TaskInterval, 123 + pub cleanup_task_max_age: TaskInterval, 121 124 pub vmc_task_enable: TaskEnable, 122 125 pub plc_hostname: String, 123 126 pub user_agent: String, ··· 156 159 let cache_task_interval: TaskInterval = 157 160 default_env("CACHE_TASK_INTERVAL", "3m").try_into()?; 158 161 162 + let cleanup_task_enable: TaskEnable = 163 + default_env("CLEANUP_TASK_ENABLE", "true").try_into()?; 164 + 165 + let cleanup_task_interval: TaskInterval = 166 + default_env("CLEANUP_TASK_INTERVAL", "1h").try_into()?; 167 + 168 + let cleanup_task_max_age: TaskInterval = 169 + default_env("CLEANUP_TASK_MAX_AGE", "48h").try_into()?; 170 + 159 171 let vmc_task_enable: TaskEnable = default_env("VMC_TASK_ENABLE", "true").try_into()?; 160 172 161 173 let plc_hostname = default_env("PLC_HOSTNAME", "plc.directory"); ··· 181 193 consumer_task_enable, 182 194 cache_task_enable, 183 195 cache_task_interval, 196 + cleanup_task_enable, 197 + cleanup_task_interval, 198 + cleanup_task_max_age, 184 199 vmc_task_enable, 185 200 plc_hostname, 186 201 user_agent,
+1
src/lib.rs
··· 1 1 pub mod cache; 2 + pub mod cleanup; 2 3 pub mod config; 3 4 pub mod consumer; 4 5 pub mod crypto;
+7 -82
src/storage.rs
··· 78 78 tx.commit().await.context("failed to commit transaction") 79 79 } 80 80 81 - pub async fn feed_content_paginate( 82 - pool: &StoragePool, 83 - feed_uri: &str, 84 - limit: Option<u16>, 85 - cursor: Option<i64>, 86 - ) -> Result<Vec<FeedContent>> { 87 - let mut tx = pool.begin().await.context("failed to begin transaction")?; 88 - 89 - let limit = limit.unwrap_or(20).clamp(1, 100); 90 - 91 - let results = if let Some(indexed_at) = cursor { 92 - let query = "SELECT * FROM feed_content WHERE feed_id = ? AND indexed_at < ? ORDER BY indexed_at DESC LIMIT ?"; 93 - 94 - sqlx::query_as::<_, FeedContent>(query) 95 - .bind(feed_uri) 96 - .bind(indexed_at) 97 - .bind(limit) 98 - .fetch_all(tx.as_mut()) 99 - .await? 100 - } else { 101 - let query = "SELECT * FROM feed_content WHERE feed_id = ? ORDER BY indexed_at DESC LIMIT ?"; 102 - 103 - sqlx::query_as::<_, FeedContent>(query) 104 - .bind(feed_uri) 105 - .bind(limit) 106 - .fetch_all(tx.as_mut()) 107 - .await? 108 - }; 109 - 110 - tx.commit().await.context("failed to commit transaction")?; 111 - 112 - Ok(results) 113 - } 114 - 115 - pub async fn feed_content_paginate_popular( 116 - pool: &StoragePool, 117 - feed_uri: &str, 118 - limit: Option<u16>, 119 - cursor: Option<i64>, 120 - ) -> Result<Vec<FeedContent>> { 121 - let mut tx = pool.begin().await.context("failed to begin transaction")?; 122 - 123 - let limit = limit.unwrap_or(20).clamp(1, 100); 124 - 125 - let results = if let Some(indexed_at) = cursor { 126 - let query = "SELECT * FROM feed_content WHERE feed_id = ? AND indexed_at < ? ORDER BY indexed_at DESC LIMIT ?"; 127 - 128 - sqlx::query_as::<_, FeedContent>(query) 129 - .bind(feed_uri) 130 - .bind(indexed_at) 131 - .bind(limit) 132 - .fetch_all(tx.as_mut()) 133 - .await? 134 - } else { 135 - let query = "SELECT * FROM feed_content WHERE feed_id = ? ORDER BY indexed_at DESC LIMIT ?"; 136 - 137 - sqlx::query_as::<_, FeedContent>(query) 138 - .bind(feed_uri) 139 - .bind(limit) 140 - .fetch_all(tx.as_mut()) 141 - .await? 142 - }; 143 - 144 - tx.commit().await.context("failed to commit transaction")?; 145 - 146 - Ok(results) 147 - } 148 - 149 81 pub async fn feed_content_cached( 150 82 pool: &StoragePool, 151 83 feed_uri: &str, ··· 245 177 Ok(result) 246 178 } 247 179 248 - pub async fn feed_content_truncate(pool: &StoragePool, feed_id: &str) -> Result<()> { 180 + pub async fn feed_content_truncate_oldest(pool: &StoragePool, age: DateTime<Utc>) -> Result<()> { 249 181 let mut tx = pool.begin().await.context("failed to begin transaction")?; 250 182 251 - let result = sqlx::query_scalar::<_, DateTime<Utc>>("SELECT updated_at FROM feed_content WHERE feed_id = ? ORDER BY indexed_at DESC LIMIT 1 OFFSET 501") 252 - .bind(feed_id) 253 - .fetch_optional(tx.as_mut()) 254 - .await.context("failed select feed content mark record")?; 255 - 256 - if let Some(updated_at) = result { 257 - sqlx::query("DELETE FROM feed_content WHERE feed_id = ? AND updated_at < ?") 258 - .bind(feed_id) 259 - .bind(updated_at) 260 - .execute(tx.as_mut()) 261 - .await 262 - .context("failed to delete feed content beyond mark")?; 263 - } 183 + // TODO: This might need an index. 184 + sqlx::query("DELETE FROM feed_content WHERE updated_at < ?") 185 + .bind(age) 186 + .execute(tx.as_mut()) 187 + .await 188 + .context("failed to delete feed content beyond mark")?; 264 189 265 190 tx.commit().await.context("failed to commit transaction") 266 191 }