supercell: an AT Protocol (Bluesky) feed generator that consumes Jetstream events and serves configurable feed skeletons.

feature: match posts by at-uri via a per-feed JSONPath (`aturi`) in the feed config; make consumed collections configurable (COLLECTIONS env var) and drop `cid` from feed storage and pagination cursors

Signed-off-by: Nick Gerakines <12125+ngerakines@users.noreply.github.com>

+99 -40
+1
dev-server.sh
··· 6 6 export JETSTREAM_HOSTNAME=jetstream1.us-east.bsky.network 7 7 export CONSUMER_TASK_ENABLE=true 8 8 export FEEDS=$(pwd)/config.yml 9 + export COLLECTIONS=app.bsky.feed.like,app.bsky.feed.post 9 10 # export COMPRESSION=true 10 11 # export ZSTD_DICTIONARY=$(pwd)/jetstream_zstd_dictionary 11 12
+1
migrations/20241107144649_remove_cid.down.sql
-- Down migration for 20241107144649_remove_cid: restore the `cid` column
-- and rebuild the feed index in its pre-migration shape, so the up
-- migration is reversible instead of a one-way schema change.
--
-- NOTE(review): the column type/default are assumed from prior usage
-- (cid stored as a non-null text string by the consumer, and the old
-- queries ordered by `indexed_at DESC, cid DESC`) — confirm against the
-- migration that originally created `feed_content` before relying on
-- this rollback in production.

-- Re-add the content-id column dropped by the up migration.
ALTER TABLE feed_content ADD COLUMN cid TEXT NOT NULL DEFAULT '';

-- Rebuild the feed index with `cid` as the pagination tie-breaker,
-- matching the ordering the pre-migration queries relied on.
DROP INDEX feed_content_idx_feed;
CREATE INDEX feed_content_idx_feed ON feed_content(feed_id, indexed_at DESC, cid DESC);
+5
migrations/20241107144649_remove_cid.up.sql
-- Up migration 20241107144649_remove_cid: feed cursors/pagination now
-- key on (feed_id, indexed_at) alone, so drop the per-row `cid` column
-- and rebuild the feed index without it.

DROP INDEX feed_content_idx_feed;
ALTER TABLE feed_content DROP COLUMN cid;
CREATE INDEX feed_content_idx_feed ON feed_content(feed_id, indexed_at DESC);
+1
src/bin/supercell.rs
··· 106 106 zstd_dictionary_location: inner_config.zstd_dictionary.clone(), 107 107 jetstream_hostname: inner_config.jetstream_hostname.clone(), 108 108 feeds: inner_config.feeds.clone(), 109 + collections: inner_config.collections.as_ref().clone(), 109 110 }; 110 111 let task = ConsumerTask::new(pool.clone(), consumer_task_config, token.clone())?; 111 112 let inner_token = token.clone();
+35
src/config.rs
··· 15 15 pub description: String, 16 16 17 17 #[serde(default)] 18 + pub aturi: Option<String>, 19 + 20 + #[serde(default)] 18 21 pub allow: HashSet<String>, 19 22 20 23 #[serde(default)] ··· 49 52 pub struct Compression(bool); 50 53 51 54 #[derive(Clone)] 55 + pub struct Collections(Vec<String>); 56 + 57 + #[derive(Clone)] 52 58 pub struct Config { 53 59 pub version: String, 54 60 pub http_port: HttpPort, ··· 63 69 pub jetstream_hostname: String, 64 70 pub feeds: Feeds, 65 71 pub compression: Compression, 72 + pub collections: Collections, 66 73 } 67 74 68 75 impl Config { ··· 101 108 102 109 let feeds: Feeds = require_env("FEEDS")?.try_into()?; 103 110 111 + let collections: Collections = 112 + default_env("COLLECTIONS", "app.bsky.feed.post").try_into()?; 113 + 104 114 Ok(Self { 105 115 version: version()?, 106 116 http_port, ··· 115 125 zstd_dictionary, 116 126 feeds, 117 127 compression, 128 + collections, 118 129 }) 119 130 } 120 131 } ··· 226 237 }) 227 238 } 228 239 } 240 + 241 + impl TryFrom<String> for Collections { 242 + type Error = anyhow::Error; 243 + fn try_from(value: String) -> Result<Self, Self::Error> { 244 + Ok(Self( 245 + value 246 + .split(',') 247 + .filter_map(|s| { 248 + if s.is_empty() { 249 + None 250 + } else { 251 + Some(s.to_string()) 252 + } 253 + }) 254 + .collect::<Vec<String>>(), 255 + )) 256 + } 257 + } 258 + 259 + impl AsRef<Vec<String>> for Collections { 260 + fn as_ref(&self) -> &Vec<String> { 261 + &self.0 262 + } 263 + }
+36 -17
src/consumer.rs
··· 26 26 pub zstd_dictionary_location: String, 27 27 pub jetstream_hostname: String, 28 28 pub feeds: config::Feeds, 29 + pub collections: Vec<String>, 29 30 } 30 31 31 32 pub struct ConsumerTask { ··· 75 76 .map_err(|err| anyhow::Error::new(err).context("cannot connect to jetstream"))?; 76 77 77 78 let update = model::SubscriberSourcedMessage::Update { 78 - wanted_collections: vec!["app.bsky.feed.post".to_string()], 79 + wanted_collections: self.config.collections.clone(), 79 80 wanted_dids: vec![], 80 81 max_message_size_bytes: MAX_MESSAGE_SIZE as u64, 81 82 cursor: last_time_us, ··· 168 169 } 169 170 let event_value = event_value.unwrap(); 170 171 171 - tracing::trace!(event = ?event, "received event"); 172 - 173 172 for feed_matcher in self.feed_matchers.0.iter() { 174 173 if feed_matcher.matches(&event_value) { 175 174 tracing::debug!(feed_id = ?feed_matcher.feed, "matched event"); 176 - if let Some((uri, cid)) = model::to_post_strong_ref(&event) { 175 + if let Some(uri) = model::to_post_strong_ref(feed_matcher.aturi.as_ref(), &event, &event_value) { 177 176 let feed_content = storage::model::FeedContent{ 178 177 feed_id: feed_matcher.feed.clone(), 179 178 uri, 180 179 indexed_at: event.clone().time_us, 181 - cid, 182 180 }; 183 181 feed_content_insert(&self.pool, &feed_content).await?; 184 182 } ··· 199 197 use std::collections::HashMap; 200 198 201 199 use serde::{Deserialize, Serialize}; 200 + use serde_json_path::JsonPath; 202 201 203 202 #[derive(Debug, Clone, Serialize, Deserialize)] 204 203 #[serde(tag = "type", content = "payload")] ··· 240 239 pub(crate) enum Record { 241 240 #[serde(rename = "app.bsky.feed.post")] 242 241 Post { 243 - text: String, 244 - 245 - facets: Option<Vec<Facet>>, 246 - 247 - reply: Option<Reply>, 248 - 242 + #[serde(flatten)] 243 + extra: HashMap<String, serde_json::Value>, 244 + }, 245 + #[serde(rename = "app.bsky.feed.like")] 246 + Like { 249 247 #[serde(flatten)] 250 248 extra: HashMap<String, serde_json::Value>, 251 249 
}, ··· 292 290 pub(crate) commit: Option<CommitOp>, 293 291 } 294 292 295 - pub(crate) fn to_post_strong_ref(event: &Event) -> Option<(String, String)> { 293 + pub(crate) fn to_post_strong_ref( 294 + aturi: Option<&JsonPath>, 295 + event: &Event, 296 + event_value: &serde_json::Value, 297 + ) -> Option<String> { 296 298 if let Some(CommitOp::Create { 297 - collection, 298 - rkey, 299 - cid, 300 - .. 299 + collection, rkey, .. 301 300 }) = &event.commit 302 301 { 302 + if let Some(aturi_path) = aturi { 303 + let nodes = aturi_path.query(event_value).all(); 304 + let string_nodes = nodes 305 + .iter() 306 + .filter_map(|value| { 307 + if let serde_json::Value::String(actual) = value { 308 + Some(actual.to_lowercase().clone()) 309 + } else { 310 + None 311 + } 312 + }) 313 + .collect::<Vec<String>>(); 314 + 315 + for value in string_nodes { 316 + if value.starts_with("at://") { 317 + return Some(value); 318 + } 319 + } 320 + } 321 + 303 322 let uri = format!("at://{}/{}/{}", event.did, collection, rkey); 304 - return Some((uri, cid.clone())); 323 + return Some(uri); 305 324 } 306 325 None 307 326 }
+3 -12
src/http/handle_get_feed_skeleton.rs
··· 116 116 let cursor = feed_items 117 117 .iter() 118 118 .last() 119 - .map(|last_feed_item| format!("{},{}", last_feed_item.indexed_at, last_feed_item.cid)); 119 + .map(|last_feed_item| last_feed_item.indexed_at.to_string()); 120 120 121 121 let feed_item_views = feed_items 122 122 .iter() ··· 209 209 Ok(claims.iss) 210 210 } 211 211 212 - fn parse_cursor(value: Option<String>) -> Option<(i64, String)> { 212 + fn parse_cursor(value: Option<String>) -> Option<i64> { 213 213 let value = value.as_ref()?; 214 214 215 215 let parts = value.split(",").collect::<Vec<&str>>(); 216 - if parts.len() != 2 { 217 - return None; 218 - } 219 216 220 - let time_us = parts[0].parse::<i64>(); 221 - if time_us.is_err() { 222 - return None; 223 - } 224 - let time_us = time_us.unwrap(); 225 - 226 - Some((time_us, parts[1].to_string())) 217 + parts.first().and_then(|value| value.parse::<i64>().ok()) 227 218 }
+11 -1
src/matcher.rs
··· 9 9 10 10 pub struct FeedMatcher { 11 11 pub(crate) feed: String, 12 + pub(crate) aturi: Option<serde_json_path::JsonPath>, 12 13 matchers: Vec<Box<dyn Matcher>>, 13 14 } 14 15 ··· 20 21 21 22 for config_feed in config_feeds.feeds.iter() { 22 23 let feed = config_feed.uri.clone(); 24 + 25 + let aturi = config_feed 26 + .aturi 27 + .as_ref() 28 + .and_then(|value| JsonPath::parse(value).ok()); 23 29 24 30 let mut matchers = vec![]; 25 31 ··· 39 45 } 40 46 } 41 47 42 - feed_matchers.push(FeedMatcher { feed, matchers }); 48 + feed_matchers.push(FeedMatcher { 49 + feed, 50 + aturi, 51 + matchers, 52 + }); 43 53 } 44 54 45 55 Ok(Self(feed_matchers))
+6 -10
src/storage.rs
··· 15 15 pub feed_id: String, 16 16 pub uri: String, 17 17 pub indexed_at: i64, 18 - pub cid: String, 19 18 } 20 19 } 21 20 ··· 26 25 let mut tx = pool.begin().await.context("failed to begin transaction")?; 27 26 28 27 let now = Utc::now(); 29 - sqlx::query("INSERT OR REPLACE INTO feed_content (feed_id, uri, indexed_at, cid, updated_at) VALUES (?, ?, ?, ?, ?)") 28 + sqlx::query("INSERT OR REPLACE INTO feed_content (feed_id, uri, indexed_at, updated_at) VALUES (?, ?, ?, ?)") 30 29 .bind(&feed_content.feed_id) 31 30 .bind(&feed_content.uri) 32 31 .bind(feed_content.indexed_at) 33 - .bind(&feed_content.cid) 34 32 .bind(now) 35 33 .execute(tx.as_mut()) 36 34 .await.context("failed to insert feed content record")?; ··· 42 40 pool: &StoragePool, 43 41 feed_uri: &str, 44 42 limit: Option<u16>, 45 - cursor: Option<(i64, String)>, 43 + cursor: Option<i64>, 46 44 ) -> Result<Vec<FeedContent>> { 47 45 let mut tx = pool.begin().await.context("failed to begin transaction")?; 48 46 49 47 let limit = limit.unwrap_or(20).clamp(1, 100); 50 48 51 - let results = if let Some((indexed_at, cid)) = cursor { 52 - let query = "SELECT * FROM feed_content WHERE feed_id = ? AND (indexed_at, cid) < (?, ?) ORDER BY indexed_at DESC, cid DESC LIMIT ?"; 49 + let results = if let Some(indexed_at) = cursor { 50 + let query = "SELECT * FROM feed_content WHERE feed_id = ? AND indexed_at < ? ORDER BY indexed_at DESC LIMIT ?"; 53 51 54 52 sqlx::query_as::<_, FeedContent>(query) 55 53 .bind(feed_uri) 56 54 .bind(indexed_at) 57 - .bind(cid) 58 55 .bind(limit) 59 56 .fetch_all(tx.as_mut()) 60 57 .await? 61 58 } else { 62 - let query = "SELECT * FROM feed_content WHERE feed_id = ? ORDER BY indexed_at DESC, cid DESC LIMIT ?"; 59 + let query = "SELECT * FROM feed_content WHERE feed_id = ? 
ORDER BY indexed_at DESC LIMIT ?"; 63 60 64 61 sqlx::query_as::<_, FeedContent>(query) 65 62 .bind(feed_uri) ··· 155 152 pub async fn feed_content_truncate(pool: &StoragePool, feed_id: &str) -> Result<()> { 156 153 let mut tx = pool.begin().await.context("failed to begin transaction")?; 157 154 158 - let result = sqlx::query_scalar::<_, DateTime<Utc>>("SELECT updated_at FROM feed_content WHERE feed_id = ? ORDER BY indexed_at DESC, indexed_at_more DESC LIMIT 1 OFFSET 501") 155 + let result = sqlx::query_scalar::<_, DateTime<Utc>>("SELECT updated_at FROM feed_content WHERE feed_id = ? ORDER BY indexed_at DESC LIMIT 1 OFFSET 501") 159 156 .bind(feed_id) 160 157 .fetch_optional(tx.as_mut()) 161 158 .await.context("failed select feed content mark record")?; ··· 183 180 uri: "at://did:plc:qadlgs4xioohnhi2jg54mqds/app.bsky.feed.post/3la3bqjg4hx2n" 184 181 .to_string(), 185 182 indexed_at: 1730673934229172_i64, 186 - cid: "bafyreih74qdc6zskq7yarqi3xm634vnubf4g3ac5ieegbvakprxpjnsj74".to_string(), 187 183 }; 188 184 super::feed_content_insert(&pool, &record) 189 185 .await