this repo has no description

refactor: Cleaned up and refactored matchers

Signed-off-by: Nick Gerakines <12125+ngerakines@users.noreply.github.com>

+389 -243
+1 -2
Cargo.toml
··· 9 10 [features] 11 default = [] 12 - rhai = ["dep:rhai"] 13 14 [dependencies] 15 anyhow = "1.0.88" ··· 42 tracing = { version = "0.1.40", features = ["async-await", "log", "valuable"] } 43 zstd = "0.13.2" 44 reqwest = { version = "0.12.9", features = ["json", "zstd", "rustls-tls"] } 45 - rhai = { version = "1.20.0", features = ["serde", "std", "sync"], optional = true}
··· 9 10 [features] 11 default = [] 12 13 [dependencies] 14 anyhow = "1.0.88" ··· 41 tracing = { version = "0.1.40", features = ["async-await", "log", "valuable"] } 42 zstd = "0.13.2" 43 reqwest = { version = "0.12.9", features = ["json", "zstd", "rustls-tls"] } 44 + rhai = { version = "1.20.0", features = ["serde", "std", "sync"]}
+9
docs/playbook-dry-run.md
···
··· 1 + # Playbook: Dry Run 2 + 3 + To test matchers, you can run in a dry-run mode using an in-memory database. 4 + 5 + ```shell 6 + export DATABASE_URL=sqlite://file::memory:?cache=shared 7 + export RUST_LOG=supercell=debug,warning 8 + ``` 9 +
+16
etc/rhai_ngerakines_activity.rhai
···
··· 1 + if event.did != "did:plc:cbkjy5n7bk3ax2wplmtjofq2" { 2 + return false; 3 + } 4 + 5 + let rtype = event?.commit?.record["$type"]; 6 + switch rtype { 7 + "app.bsky.feed.post" => { 8 + return build_aturi(event); 9 + } 10 + "app.bsky.feed.like" => { 11 + return event?.commit?.record?.subject?.uri ?? false; 12 + } 13 + _ => { } 14 + } 15 + 16 + false
+3
migrations/20241111011116_feed_content_score.down.sql
···
··· 1 + -- Add down migration script here 2 + 3 + ALTER TABLE feed_content DROP COLUMN score;
+3
migrations/20241111011116_feed_content_score.up.sql
···
··· 1 + -- Add up migration script here 2 + 3 + ALTER TABLE feed_content ADD COLUMN score INT NOT NULL DEFAULT 0;
+80 -1
src/config.rs
··· 1 use std::collections::HashSet; 2 3 use anyhow::{anyhow, Result}; 4 - use serde::Deserialize; 5 6 #[derive(Clone, Deserialize)] 7 pub struct Feeds { 8 pub feeds: Vec<Feed>, 9 } 10 11 #[derive(Clone, Deserialize)] 12 pub struct Feed { 13 pub uri: String, ··· 22 23 #[serde(default)] 24 pub deny: Option<String>, 25 26 pub matchers: Vec<Matcher>, 27 } ··· 276 &self.0 277 } 278 }
··· 1 use std::collections::HashSet; 2 + use std::fmt; 3 + use std::marker::PhantomData; 4 + use std::str::FromStr; 5 6 use anyhow::{anyhow, Result}; 7 + use serde::de::{self, MapAccess, Visitor}; 8 + use serde::{Deserialize, Deserializer}; 9 10 #[derive(Clone, Deserialize)] 11 pub struct Feeds { 12 pub feeds: Vec<Feed>, 13 } 14 15 + #[derive(Clone, Debug, Deserialize)] 16 + #[serde(tag = "type")] 17 + pub enum FeedQuery { 18 + #[serde(rename = "simple")] 19 + Simple {}, 20 + 21 + #[serde(rename = "popular")] 22 + Popular { 23 + #[serde(default)] 24 + age_floor: i64, 25 + 26 + #[serde(default)] 27 + gravity: f64, 28 + }, 29 + } 30 + 31 #[derive(Clone, Deserialize)] 32 pub struct Feed { 33 pub uri: String, ··· 42 43 #[serde(default)] 44 pub deny: Option<String>, 45 + 46 + #[serde(default, deserialize_with = "string_or_struct")] 47 + pub query: FeedQuery, 48 49 pub matchers: Vec<Matcher>, 50 } ··· 299 &self.0 300 } 301 } 302 + 303 + impl Default for FeedQuery { 304 + fn default() -> Self { 305 + FeedQuery::Simple {} 306 + } 307 + } 308 + 309 + impl FromStr for FeedQuery { 310 + type Err = anyhow::Error; 311 + 312 + fn from_str(value: &str) -> Result<Self, Self::Err> { 313 + match value { 314 + "simple" => Ok(FeedQuery::Simple {}), 315 + "popular" => Ok(FeedQuery::Popular { 316 + age_floor: 0, 317 + gravity: 2.0, 318 + }), 319 + _ => Err(anyhow!("unsupported query")), 320 + } 321 + } 322 + } 323 + 324 + fn string_or_struct<'de, T, D>(deserializer: D) -> Result<T, D::Error> 325 + where 326 + T: Deserialize<'de> + FromStr<Err = anyhow::Error>, 327 + D: Deserializer<'de>, 328 + { 329 + struct StringOrStruct<T>(PhantomData<fn() -> T>); 330 + 331 + impl<'de, T> Visitor<'de> for StringOrStruct<T> 332 + where 333 + T: Deserialize<'de> + FromStr<Err = anyhow::Error>, 334 + { 335 + type Value = T; 336 + 337 + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { 338 + formatter.write_str("string or FeedQuery") 339 + } 340 + 341 + fn visit_str<E>(self, value: &str) -> Result<T, E> 342 + where 343 + E: de::Error, 344 + { 345 + FromStr::from_str(value).map_err(|_| de::Error::custom("cannot deserialize field")) 346 + } 347 + 348 + fn visit_map<M>(self, map: M) -> Result<T, M::Error> 349 + where 350 + M: MapAccess<'de>, 351 + { 352 + Deserialize::deserialize(de::value::MapAccessDeserializer::new(map)) 353 + } 354 + } 355 + 356 + deserializer.deserialize_any(StringOrStruct(PhantomData)) 357 + }
+11 -7
src/consumer.rs
··· 11 12 use crate::config; 13 use crate::matcher::FeedMatchers; 14 use crate::storage; 15 use crate::storage::consumer_control_get; 16 use crate::storage::consumer_control_insert; ··· 176 let event_value = event_value.unwrap(); 177 178 for feed_matcher in self.feed_matchers.0.iter() { 179 - if let Some(match_result) = feed_matcher.matches(&event_value) { 180 tracing::debug!(feed_id = ?feed_matcher.feed, "matched event"); 181 - if match_result.matched { 182 - let feed_content = storage::model::FeedContent{ 183 - feed_id: feed_matcher.feed.clone(), 184 - uri: match_result.aturi, 185 - indexed_at: event.clone().time_us, 186 - }; 187 feed_content_insert(&self.pool, &feed_content).await?; 188 } 189 } 190 } 191 }
··· 11 12 use crate::config; 13 use crate::matcher::FeedMatchers; 14 + use crate::matcher::Match; 15 + use crate::matcher::MatchOperation; 16 use crate::storage; 17 use crate::storage::consumer_control_get; 18 use crate::storage::consumer_control_insert; ··· 178 let event_value = event_value.unwrap(); 179 180 for feed_matcher in self.feed_matchers.0.iter() { 181 + if let Some(Match(op, aturi)) = feed_matcher.matches(&event_value) { 182 tracing::debug!(feed_id = ?feed_matcher.feed, "matched event"); 183 + let feed_content = storage::model::FeedContent{ 184 + feed_id: feed_matcher.feed.clone(), 185 + uri: aturi, 186 + indexed_at: event.clone().time_us, 187 + score: 1, 188 + }; 189 + if op == MatchOperation::Upsert { 190 feed_content_insert(&self.pool, &feed_content).await?; 191 } 192 + 193 } 194 } 195 }
+194 -191
src/matcher.rs
··· 2 3 use serde_json_path::JsonPath; 4 5 use crate::config; 6 7 - #[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)] 8 - pub struct MatcherResult { 9 - pub matched: bool, 10 - pub aturi: String, 11 } 12 13 - impl PartialEq<bool> for MatcherResult { 14 - fn eq(&self, other: &bool) -> bool { 15 - self.matched == *other 16 - } 17 - } 18 19 - impl MatcherResult { 20 - fn get_matched(&mut self) -> bool { 21 - self.matched 22 - } 23 - 24 - fn set_matched(&mut self, value: bool) { 25 - self.matched = value; 26 - } 27 - 28 - fn get_aturi(&mut self) -> String { 29 - self.aturi.clone() 30 - } 31 - 32 - fn set_aturi(&mut self, value: String) { 33 - self.aturi = value; 34 } 35 } 36 37 pub trait Matcher: Sync + Send { 38 - fn matches(&self, value: &serde_json::Value) -> Result<MatcherResult>; 39 } 40 41 pub struct FeedMatcher { ··· 75 as Box<dyn Matcher>); 76 } 77 78 - #[cfg(feature = "rhai")] 79 config::Matcher::Rhai { script } => { 80 - matchers 81 - .push(Box::new(rhai::RhaiMatcher::new(script)?) as Box<dyn Matcher>); 82 - } 83 - 84 - #[cfg(not(feature = "rhai"))] 85 - config::Matcher::Rhai { .. } => { 86 - return Err(anyhow!("rhai not enabled in this build")) 87 } 88 } 89 } ··· 96 } 97 98 impl FeedMatcher { 99 - pub(crate) fn matches(&self, value: &serde_json::Value) -> Option<MatcherResult> { 100 for matcher in self.matchers.iter() { 101 let result = matcher.matches(value); 102 if let Err(err) = result { ··· 104 continue; 105 } 106 let result = result.unwrap(); 107 - if result.matched { 108 - return Some(result); 109 } 110 } 111 None ··· 137 } 138 139 impl Matcher for EqualsMatcher { 140 - fn matches(&self, value: &serde_json::Value) -> Result<MatcherResult> { 141 let nodes = self.path.query(value).all(); 142 143 let string_nodes = nodes ··· 152 .collect::<Vec<String>>(); 153 154 if string_nodes.iter().any(|value| value == &self.expected) { 155 - let aturi = extract_aturi(self.aturi_path.as_ref(), value).ok_or(anyhow!( 156 - "matcher matched but could not create at-uri: {:?}", 157 - value 158 - ))?; 159 - Ok(MatcherResult { 160 - matched: true, 161 - aturi, 162 - }) 163 } else { 164 - Ok(MatcherResult::default()) 165 } 166 } 167 } ··· 191 } 192 193 impl Matcher for PrefixMatcher { 194 - fn matches(&self, value: &serde_json::Value) -> Result<MatcherResult> { 195 let nodes = self.path.query(value).all(); 196 197 let string_nodes = nodes ··· 209 .iter() 210 .any(|value| value.starts_with(&self.prefix)); 211 if found { 212 - let aturi = extract_aturi(self.aturi_path.as_ref(), value).ok_or(anyhow!( 213 - "matcher matched but could not create at-uri: {:?}", 214 - value 215 - ))?; 216 - Ok(MatcherResult { 217 - matched: true, 218 - aturi, 219 - }) 220 } else { 221 - Ok(MatcherResult::default()) 222 } 223 } 224 } ··· 248 } 249 250 impl Matcher for SequenceMatcher { 251 - fn matches(&self, value: &serde_json::Value) -> Result<MatcherResult> { 252 let nodes = self.path.query(value).all(); 253 254 let string_nodes = nodes ··· 282 } 283 284 if last_found != -1 && found_index == self.expected.len() - 1 { 285 - let aturi = extract_aturi(self.aturi_path.as_ref(), value).ok_or(anyhow!( 286 - "matcher matched but could not create at-uri: {:?}", 287 - value 288 - ))?; 289 - return Ok(MatcherResult { 290 - matched: true, 291 - aturi, 292 - }); 293 } 294 } 295 296 - Ok(MatcherResult::default()) 297 } 298 } 299 ··· 346 None 347 } 348 349 - #[cfg(feature = "rhai")] 350 - pub mod rhai { 351 352 - use super::{Matcher, MatcherResult}; 353 - use anyhow::{anyhow, Context, Result}; 354 355 - use rhai::{serde::to_dynamic, Dynamic, Engine, Scope, AST}; 356 - use std::{path::PathBuf, str::FromStr}; 357 358 - pub struct RhaiMatcher { 359 - source: String, 360 - engine: Engine, 361 - ast: AST, 362 } 363 364 - impl RhaiMatcher { 365 - pub(crate) fn new(source: &str) -> Result<Self> { 366 - let mut engine = Engine::new(); 367 - engine 368 - .register_type_with_name::<MatcherResult>("MatcherResult") 369 - .register_get_set( 370 - "matched", 371 - MatcherResult::get_matched, 372 - MatcherResult::set_matched, 373 - ) 374 - .register_get_set("aturi", MatcherResult::get_aturi, MatcherResult::set_aturi) 375 - .register_fn("new_matcher_result", MatcherResult::default) 376 - .register_fn("build_aturi", build_aturi); 377 - let ast = engine 378 - .compile_file(PathBuf::from_str(source)?) 379 - .context("cannot compile script")?; 380 - Ok(Self { 381 - source: source.to_string(), 382 - engine, 383 - ast, 384 - }) 385 - } 386 } 387 388 - impl Matcher for RhaiMatcher { 389 - fn matches(&self, value: &serde_json::Value) -> Result<MatcherResult> { 390 - let mut scope = Scope::new(); 391 - let value_map = to_dynamic(value); 392 - if let Err(err) = value_map { 393 - tracing::error!(source = ?self.source, error = ?err, "error converting value to dynamic"); 394 - return Ok(MatcherResult::default()); 395 - } 396 - let value_map = value_map.unwrap(); 397 - scope.push("event", value_map); 398 399 - self.engine 400 - .eval_ast_with_scope::<MatcherResult>(&mut scope, &self.ast) 401 - .context("error evaluating script") 402 } 403 } 404 405 - fn build_aturi_maybe(event: Dynamic) -> Result<String> { 406 - println!("{event:?}"); 407 - let event = event.as_map_ref().map_err(|err| anyhow!(err))?; 408 409 - let commit = event 410 - .get("commit") 411 - .ok_or(anyhow!("no commit on event"))? 412 - .as_map_ref() 413 - .map_err(|err| anyhow!(err))?; 414 - let record = commit 415 - .get("record") 416 - .ok_or(anyhow!("no record on event commit"))? 417 - .as_map_ref() 418 - .map_err(|err| anyhow!(err))?; 419 420 - let rtype = record 421 - .get("$type") 422 - .ok_or(anyhow!("no $type on event commit record"))? 423 - .as_immutable_string_ref() 424 - .map_err(|err| anyhow!(err))?; 425 426 - match rtype.as_str() { 427 - "app.bsky.feed.post" => { 428 - let did = event 429 - .get("did") 430 - .ok_or(anyhow!("no did on event"))? 431 - .as_immutable_string_ref() 432 - .map_err(|err| anyhow!(err))?; 433 - let collection = commit 434 - .get("collection") 435 - .ok_or(anyhow!("no collection on event"))? 436 - .as_immutable_string_ref() 437 - .map_err(|err| anyhow!(err))?; 438 - let rkey = commit 439 - .get("rkey") 440 - .ok_or(anyhow!("no rkey on event commit"))? 441 - .as_immutable_string_ref() 442 - .map_err(|err| anyhow!(err))?; 443 444 - Ok(format!( 445 - "at://{}/{}/{}", 446 - did.as_str(), 447 - collection.as_str(), 448 - rkey.as_str() 449 - )) 450 - } 451 - _ => Err(anyhow!("no aturi for event")), 452 } 453 } 454 455 - fn build_aturi(event: Dynamic) -> String { 456 - let aturi = build_aturi_maybe(event); 457 - if let Err(err) = aturi { 458 - println!("error {err:?}"); 459 - return "".into(); 460 - } 461 - aturi.unwrap() 462 } 463 } 464 465 #[cfg(test)] 466 mod tests { 467 468 use super::*; 469 470 #[test] 471 - fn equals_matcher() { 472 let raw_json = r#"{ 473 "did": "did:plc:tgudj2fjm77pzkuawquqhsxm", 474 "time_us": 1730491093829414, ··· 505 506 for (path, expected, result) in tests { 507 let matcher = EqualsMatcher::new(expected, path, &None).expect("matcher is valid"); 508 - assert_eq!(matcher.matches(&value).expect("match ok"), result); 509 } 510 } 511 512 #[test] 513 - fn prefix_matcher() { 514 let raw_json = r#"{ 515 "did": "did:plc:tgudj2fjm77pzkuawquqhsxm", 516 "time_us": 1730491093829414, ··· 553 554 for (path, prefix, result) in tests { 555 let matcher = PrefixMatcher::new(prefix, path, &None).expect("matcher is valid"); 556 - assert_eq!(matcher.matches(&value).expect("match ok"), result); 557 } 558 } 559 560 #[test] 561 - fn sequence_matcher() { 562 let raw_json = r#"{ 563 "did": "did:plc:tgudj2fjm77pzkuawquqhsxm", 564 "time_us": 1730491093829414, ··· 620 621 for (path, values, result) in tests { 622 let matcher = SequenceMatcher::new(&values, path, &None).expect("matcher is valid"); 623 - assert_eq!(matcher.matches(&value).expect("match ok"), result); 624 } 625 } 626 627 #[test] 628 - fn sequence_matcher_edge_case_1() { 629 let raw_json = r#"{"text": "Stellwerkstörung. Und Signalstörung. Und der Alternativzug ist auch ausgefallen. Und überhaupt."}"#; 630 let value: serde_json::Value = serde_json::from_str(raw_json).expect("json is valid"); 631 let matcher = SequenceMatcher::new( 632 &vec!["smoke".to_string(), "signal".to_string()], 633 "$.text", 634 &None, 635 - ) 636 - .expect("matcher is valid"); 637 - assert_eq!(matcher.matches(&value).expect("match ok"), false); 638 - } 639 - } 640 - 641 - #[cfg(all(test, feature = "rhai"))] 642 - mod rhaitests { 643 644 - use super::rhai::*; 645 - use super::*; 646 - use anyhow::{anyhow, Result}; 647 - use std::path::PathBuf; 648 649 - #[cfg(feature = "rhai")] 650 #[test] 651 fn rhai_matcher() -> Result<()> { 652 let testdata = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("testdata"); 653 654 - let tests = vec![ 655 ( 656 "post1.json", 657 - [ 658 ( 659 "rhai_match_everything.rhai", 660 true, 661 "at://did:plc:cbkjy5n7bk3ax2wplmtjofq2/app.bsky.feed.post/3laadb7behk25", 662 ), ··· 675 ), 676 ( 677 "post2.json", 678 - [ 679 ( 680 "rhai_match_everything.rhai", 681 true, ··· 714 .context("could not construct matcher")?; 715 let result = matcher.matches(&value)?; 716 assert_eq!( 717 - result.matched, matched, 718 "matched {}: {}", 719 - input_json, matcher_file_name 720 - ); 721 - assert_eq!( 722 - result.aturi, aturi, 723 - "aturi {}: {}", 724 - input_json, matcher_file_name 725 ); 726 } 727 }
··· 2 3 use serde_json_path::JsonPath; 4 5 + use rhai::{ 6 + serde::to_dynamic, 7 + CustomType, Dynamic, Engine, Scope, TypeBuilder, AST, 8 + }; 9 + use std::{collections::HashMap, path::PathBuf, str::FromStr}; 10 + 11 use crate::config; 12 13 + #[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize, PartialEq)] 14 + pub enum MatchOperation { 15 + #[default] 16 + Upsert, 17 + Update, 18 } 19 20 + #[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize, CustomType)] 21 + pub struct Match(pub MatchOperation, pub String); 22 23 + impl Match { 24 + fn upsert(aturi: &str) -> Self { 25 + Self(MatchOperation::Upsert, aturi.to_string()) 26 } 27 } 28 29 pub trait Matcher: Sync + Send { 30 + fn matches(&self, value: &serde_json::Value) -> Result<Option<Match>>; 31 } 32 33 pub struct FeedMatcher { ··· 67 as Box<dyn Matcher>); 68 } 69 70 config::Matcher::Rhai { script } => { 71 + matchers.push(Box::new(RhaiMatcher::new(script)?) as Box<dyn Matcher>); 72 } 73 } 74 } ··· 81 } 82 83 impl FeedMatcher { 84 + pub(crate) fn matches(&self, value: &serde_json::Value) -> Option<Match> { 85 for matcher in self.matchers.iter() { 86 let result = matcher.matches(value); 87 if let Err(err) = result { ··· 89 continue; 90 } 91 let result = result.unwrap(); 92 + if result.is_some() { 93 + return result; 94 } 95 } 96 None ··· 122 } 123 124 impl Matcher for EqualsMatcher { 125 + fn matches(&self, value: &serde_json::Value) -> Result<Option<Match>> { 126 let nodes = self.path.query(value).all(); 127 128 let string_nodes = nodes ··· 137 .collect::<Vec<String>>(); 138 139 if string_nodes.iter().any(|value| value == &self.expected) { 140 + extract_aturi(self.aturi_path.as_ref(), value) 141 + .map(|value| Some(Match::upsert(&value))) 142 + .ok_or(anyhow!( 143 + "matcher matched but could not create at-uri: {:?}", 144 + value 145 + )) 146 } else { 147 + Ok(None) 148 } 149 } 150 } ··· 174 } 175 176 impl Matcher for PrefixMatcher { 177 + fn matches(&self, value: &serde_json::Value) -> Result<Option<Match>> { 178 let nodes = self.path.query(value).all(); 179 180 let string_nodes = nodes ··· 192 .iter() 193 .any(|value| value.starts_with(&self.prefix)); 194 if found { 195 + extract_aturi(self.aturi_path.as_ref(), value) 196 + .map(|value| Some(Match::upsert(&value))) 197 + .ok_or(anyhow!( 198 + "matcher matched but could not create at-uri: {:?}", 199 + value 200 + )) 201 } else { 202 + Ok(None) 203 } 204 } 205 } ··· 229 } 230 231 impl Matcher for SequenceMatcher { 232 + fn matches(&self, value: &serde_json::Value) -> Result<Option<Match>> { 233 let nodes = self.path.query(value).all(); 234 235 let string_nodes = nodes ··· 263 } 264 265 if last_found != -1 && found_index == self.expected.len() - 1 { 266 + return extract_aturi(self.aturi_path.as_ref(), value) 267 + .map(|value| Some(Match::upsert(&value))) 268 + .ok_or(anyhow!( 269 + "matcher matched but could not create at-uri: {:?}", 270 + value 271 + )); 272 } 273 } 274 275 + Ok(None) 276 } 277 } 278 ··· 325 None 326 } 327 328 + pub struct RhaiMatcher { 329 + source: String, 330 + engine: Engine, 331 + ast: AST, 332 + } 333 334 + #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] 335 + #[serde(untagged)] 336 + pub enum MaybeMatch { 337 + Match(Match), 338 339 + #[serde(untagged)] 340 + Other { 341 + #[serde(flatten)] 342 + extra: HashMap<String, serde_json::Value>, 343 + }, 344 + } 345 346 + impl MaybeMatch { 347 + pub fn into_match(self) -> Option<Match> { 348 + match self { 349 + MaybeMatch::Match(m) => Some(m), 350 + _ => None, 351 + } 352 } 353 + } 354 355 + impl RhaiMatcher { 356 + pub(crate) fn new(source: &str) -> Result<Self> { 357 + let mut engine = Engine::new(); 358 + engine 359 + .build_type::<Match>() 360 + .register_fn("build_aturi", build_aturi) 361 + .register_fn("new_match", Match::upsert); 362 + let ast = engine 363 + .compile_file(PathBuf::from_str(source)?) 364 + .context("cannot compile script")?; 365 + Ok(Self { 366 + source: source.to_string(), 367 + engine, 368 + ast, 369 + }) 370 } 371 + } 372 373 + fn dynamic_to_match(value: Dynamic) -> Result<Option<Match>> { 374 + if value.is_bool() || value.is_int() { 375 + return Ok(None); 376 + } 377 + if let Some(aturi) = value.clone().try_cast::<String>() { 378 + return Ok(Some(Match::upsert(&aturi))); 379 + } 380 + if let Some(match_value) = value.try_cast::<Match>() { 381 + return Ok(Some(match_value)); 382 + } 383 + Err(anyhow!("unsupported return value type: must be int, string, or match")) 384 + } 385 386 + impl Matcher for RhaiMatcher { 387 + fn matches(&self, value: &serde_json::Value) -> Result<Option<Match>> { 388 + let mut scope = Scope::new(); 389 + let value_map = to_dynamic(value); 390 + if let Err(err) = value_map { 391 + tracing::error!(source = ?self.source, error = ?err, "error converting value to dynamic"); 392 + return Ok(None); 393 } 394 + let value_map = value_map.unwrap(); 395 + scope.push("event", value_map); 396 + 397 + self.engine 398 + .eval_ast_with_scope::<Dynamic>(&mut scope, &self.ast) 399 + .context("error evaluating script") 400 + .and_then(dynamic_to_match) 401 } 402 + } 403 404 + fn build_aturi_maybe(event: Dynamic) -> Result<String> { 405 + let event = event.as_map_ref().map_err(|err| anyhow!(err))?; 406 407 + let commit = event 408 + .get("commit") 409 + .ok_or(anyhow!("no commit on event"))? 410 + .as_map_ref() 411 + .map_err(|err| anyhow!(err))?; 412 + let record = commit 413 + .get("record") 414 + .ok_or(anyhow!("no record on event commit"))? 415 + .as_map_ref() 416 + .map_err(|err| anyhow!(err))?; 417 418 + let rtype = record 419 + .get("$type") 420 + .ok_or(anyhow!("no $type on event commit record"))? 421 + .as_immutable_string_ref() 422 + .map_err(|err| anyhow!(err))?; 423 424 + match rtype.as_str() { 425 + "app.bsky.feed.post" => { 426 + let did = event 427 + .get("did") 428 + .ok_or(anyhow!("no did on event"))? 429 + .as_immutable_string_ref() 430 + .map_err(|err| anyhow!(err))?; 431 + let collection = commit 432 + .get("collection") 433 + .ok_or(anyhow!("no collection on event"))? 434 + .as_immutable_string_ref() 435 + .map_err(|err| anyhow!(err))?; 436 + let rkey = commit 437 + .get("rkey") 438 + .ok_or(anyhow!("no rkey on event commit"))? 439 + .as_immutable_string_ref() 440 + .map_err(|err| anyhow!(err))?; 441 442 + Ok(format!( 443 + "at://{}/{}/{}", 444 + did.as_str(), 445 + collection.as_str(), 446 + rkey.as_str() 447 + )) 448 } 449 + _ => Err(anyhow!("no aturi for event")), 450 } 451 + } 452 453 + fn build_aturi(event: Dynamic) -> String { 454 + let aturi = build_aturi_maybe(event); 455 + if let Err(err) = aturi { 456 + tracing::warn!(error = ?err, "error creating at-uri"); 457 + return "".into(); 458 } 459 + aturi.unwrap() 460 } 461 462 #[cfg(test)] 463 mod tests { 464 465 use super::*; 466 + use anyhow::{anyhow, Result}; 467 + use std::path::PathBuf; 468 469 #[test] 470 + fn equals_matcher() -> Result<()> { 471 let raw_json = r#"{ 472 "did": "did:plc:tgudj2fjm77pzkuawquqhsxm", 473 "time_us": 1730491093829414, ··· 504 505 for (path, expected, result) in tests { 506 let matcher = EqualsMatcher::new(expected, path, &None).expect("matcher is valid"); 507 + let maybe_match = matcher.matches(&value)?; 508 + assert_eq!(maybe_match.is_some(), result); 509 } 510 + 511 + Ok(()) 512 } 513 514 #[test] 515 + fn prefix_matcher() -> Result<()> { 516 let raw_json = r#"{ 517 "did": "did:plc:tgudj2fjm77pzkuawquqhsxm", 518 "time_us": 1730491093829414, ··· 555 556 for (path, prefix, result) in tests { 557 let matcher = PrefixMatcher::new(prefix, path, &None).expect("matcher is valid"); 558 + let maybe_match = matcher.matches(&value)?; 559 + assert_eq!(maybe_match.is_some(), result); 560 } 561 + 562 + Ok(()) 563 } 564 565 #[test] 566 + fn sequence_matcher() -> Result<()> { 567 let raw_json = r#"{ 568 "did": "did:plc:tgudj2fjm77pzkuawquqhsxm", 569 "time_us": 1730491093829414, ··· 625 626 for (path, values, result) in tests { 627 let matcher = SequenceMatcher::new(&values, path, &None).expect("matcher is valid"); 628 + let maybe_match = matcher.matches(&value)?; 629 + assert_eq!(maybe_match.is_some(), result); 630 } 631 + 632 + Ok(()) 633 } 634 635 #[test] 636 + fn sequence_matcher_edge_case_1() -> Result<()> { 637 let raw_json = r#"{"text": "Stellwerkstörung. Und Signalstörung. Und der Alternativzug ist auch ausgefallen. Und überhaupt."}"#; 638 let value: serde_json::Value = serde_json::from_str(raw_json).expect("json is valid"); 639 let matcher = SequenceMatcher::new( 640 &vec!["smoke".to_string(), "signal".to_string()], 641 "$.text", 642 &None, 643 + )?; 644 + let maybe_match = matcher.matches(&value)?; 645 + assert_eq!(maybe_match.is_some(), false); 646 647 + Ok(()) 648 + } 649 650 #[test] 651 fn rhai_matcher() -> Result<()> { 652 let testdata = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("testdata"); 653 654 + let tests: Vec<(&str, Vec<(&str, bool, &str)>)> = vec![ 655 ( 656 "post1.json", 657 + vec![ 658 + ("rhai_match_nothing.rhai", false, ""), 659 ( 660 "rhai_match_everything.rhai", 661 + true, 662 + "at://did:plc:cbkjy5n7bk3ax2wplmtjofq2/app.bsky.feed.post/3laadb7behk25", 663 + ), 664 + ( 665 + "rhai_match_everything_simple.rhai", 666 true, 667 "at://did:plc:cbkjy5n7bk3ax2wplmtjofq2/app.bsky.feed.post/3laadb7behk25", 668 ), ··· 681 ), 682 ( 683 "post2.json", 684 + vec![ 685 ( 686 "rhai_match_everything.rhai", 687 true, ··· 720 .context("could not construct matcher")?; 721 let result = matcher.matches(&value)?; 722 assert_eq!( 723 + result.is_some_and(|e| e.1 == aturi), 724 + matched, 725 "matched {}: {}", 726 + input_json, 727 + matcher_file_name 728 ); 729 } 730 }
+39 -5
src/storage.rs
··· 15 pub feed_id: String, 16 pub uri: String, 17 pub indexed_at: i64, 18 } 19 } 20 21 - pub async fn feed_content_insert( 22 - pool: &StoragePool, 23 - feed_content: &model::FeedContent, 24 - ) -> Result<()> { 25 let mut tx = pool.begin().await.context("failed to begin transaction")?; 26 27 let now = Utc::now(); 28 - sqlx::query("INSERT OR REPLACE INTO feed_content (feed_id, uri, indexed_at, updated_at) VALUES (?, ?, ?, ?)") 29 .bind(&feed_content.feed_id) 30 .bind(&feed_content.uri) 31 .bind(feed_content.indexed_at) 32 .bind(now) 33 .execute(tx.as_mut()) 34 .await.context("failed to insert feed content record")?; 35 ··· 37 } 38 39 pub async fn feed_content_paginate( 40 pool: &StoragePool, 41 feed_uri: &str, 42 limit: Option<u16>, ··· 180 uri: "at://did:plc:qadlgs4xioohnhi2jg54mqds/app.bsky.feed.post/3la3bqjg4hx2n" 181 .to_string(), 182 indexed_at: 1730673934229172_i64, 183 }; 184 super::feed_content_insert(&pool, &record) 185 .await
··· 15 pub feed_id: String, 16 pub uri: String, 17 pub indexed_at: i64, 18 + pub score: i32, 19 } 20 } 21 22 + pub async fn feed_content_insert(pool: &StoragePool, feed_content: &FeedContent) -> Result<()> { 23 let mut tx = pool.begin().await.context("failed to begin transaction")?; 24 25 let now = Utc::now(); 26 + sqlx::query("INSERT OR REPLACE INTO feed_content (feed_id, uri, indexed_at, updated_at, score) VALUES (?, ?, ?, ?, ?)") 27 .bind(&feed_content.feed_id) 28 .bind(&feed_content.uri) 29 .bind(feed_content.indexed_at) 30 .bind(now) 31 + .bind(feed_content.score) 32 .execute(tx.as_mut()) 33 .await.context("failed to insert feed content record")?; 34 ··· 36 } 37 38 pub async fn feed_content_paginate( 39 + pool: &StoragePool, 40 + feed_uri: &str, 41 + limit: Option<u16>, 42 + cursor: Option<i64>, 43 + ) -> Result<Vec<FeedContent>> { 44 + let mut tx = pool.begin().await.context("failed to begin transaction")?; 45 + 46 + let limit = limit.unwrap_or(20).clamp(1, 100); 47 + 48 + let results = if let Some(indexed_at) = cursor { 49 + let query = "SELECT * FROM feed_content WHERE feed_id = ? AND indexed_at < ? ORDER BY indexed_at DESC LIMIT ?"; 50 + 51 + sqlx::query_as::<_, FeedContent>(query) 52 + .bind(feed_uri) 53 + .bind(indexed_at) 54 + .bind(limit) 55 + .fetch_all(tx.as_mut()) 56 + .await? 57 + } else { 58 + let query = "SELECT * FROM feed_content WHERE feed_id = ? ORDER BY indexed_at DESC LIMIT ?"; 59 + 60 + sqlx::query_as::<_, FeedContent>(query) 61 + .bind(feed_uri) 62 + .bind(limit) 63 + .fetch_all(tx.as_mut()) 64 + .await? 65 + }; 66 + 67 + tx.commit().await.context("failed to commit transaction")?; 68 + 69 + Ok(results) 70 + } 71 + 72 + pub async fn feed_content_paginate_popular( 73 pool: &StoragePool, 74 feed_uri: &str, 75 limit: Option<u16>, ··· 213 uri: "at://did:plc:qadlgs4xioohnhi2jg54mqds/app.bsky.feed.post/3la3bqjg4hx2n" 214 .to_string(), 215 indexed_at: 1730673934229172_i64, 216 + score: 1, 217 }; 218 super::feed_content_insert(&pool, &record) 219 .await
+2 -8
testdata/rhai_match_everything.rhai
··· 1 - let result = new_matcher_result(); 2 - result.matched = true; 3 - 4 - if result.matched { 5 - result.aturi = build_aturi(event); 6 - } 7 - 8 - result
··· 1 + let aturi = build_aturi(event); 2 + return new_match(aturi);
+1
testdata/rhai_match_everything_simple.rhai
···
··· 1 + build_aturi(event)
+19
testdata/rhai_match_liked.rhai
···
··· 1 + 2 + let result = new_matcher_result(); 3 + 4 + let rtype = event?.commit?.record["$type"]; 5 + 6 + switch rtype { 7 + "app.bsky.feed.like" => { 8 + result.matched = true; 9 + } 10 + // noop 11 + _ => { } 12 + } 13 + 14 + if result.matched { 15 + result.aturi = build_aturi(event); 16 + } 17 + 18 + 19 + result
+1
testdata/rhai_match_nothing.rhai
···
··· 1 + false
+4 -11
testdata/rhai_match_poster.rhai
··· 1 - 2 - let result = new_matcher_result(); 3 - 4 let rtype = event?.commit?.record["$type"]; 5 - 6 switch rtype { 7 "app.bsky.feed.post" => { 8 - result.matched = event.did == "did:plc:cbkjy5n7bk3ax2wplmtjofq2"; 9 } 10 _ => { } 11 } 12 - 13 - if result.matched { 14 - result.aturi = build_aturi(event); 15 - } 16 - 17 - result
··· 1 let rtype = event?.commit?.record["$type"]; 2 switch rtype { 3 "app.bsky.feed.post" => { 4 + if event.did == "did:plc:cbkjy5n7bk3ax2wplmtjofq2" { 5 + return build_aturi(event); 6 + } 7 } 8 _ => { } 9 } 10 + false
+4 -8
testdata/rhai_match_reply_root.rhai
··· 1 - let result = new_matcher_result(); 2 - 3 let rtype = event?.commit?.record["$type"]; 4 5 if rtype != "app.bsky.feed.post" { 6 - return result; 7 } 8 9 let root_uri = event?.commit?.record?.reply?.root?.uri; 10 11 - result.matched = `${root_uri}`.starts_with("at://did:plc:cbkjy5n7bk3ax2wplmtjofq2/app.bsky.feed.post/"); 12 - 13 - if result.matched { 14 - result.aturi = build_aturi(event); 15 } 16 17 - result
··· 1 let rtype = event?.commit?.record["$type"]; 2 3 if rtype != "app.bsky.feed.post" { 4 + return false; 5 } 6 7 let root_uri = event?.commit?.record?.reply?.root?.uri; 8 9 + if `${root_uri}`.starts_with("at://did:plc:cbkjy5n7bk3ax2wplmtjofq2/app.bsky.feed.post/") { 10 + return build_aturi(event); 11 } 12 13 + false
+2 -10
testdata/rhai_match_type.rhai
··· 1 - 2 - let result = new_matcher_result(); 3 - 4 let rtype = event?.commit?.record["$type"]; 5 6 switch rtype { 7 "app.bsky.feed.post" => { 8 - result.matched = true; 9 } 10 // noop 11 _ => { } 12 } 13 14 - if result.matched { 15 - result.aturi = build_aturi(event); 16 - } 17 - 18 - 19 - result
··· 1 let rtype = event?.commit?.record["$type"]; 2 3 switch rtype { 4 "app.bsky.feed.post" => { 5 + return build_aturi(event); 6 } 7 // noop 8 _ => { } 9 } 10 11 + false