Heavily customized version of smokesignal - https://whtwnd.com/kayrozen.com/3lpwe4ymowg2t
at main 580 lines 21 kB view raw
1// Main filtering service coordination layer 2// 3// Coordinates between query builder, facet calculator, and hydrator 4// to provide a unified interface for event filtering operations. 5 6use sqlx::PgPool; 7use tracing::{instrument, trace, warn}; 8use std::collections::hash_map::DefaultHasher; 9use std::hash::{Hash, Hasher}; 10use unic_langid::LanguageIdentifier; 11 12use super::{ 13 EventFilterCriteria, EventFacets, EventQueryBuilder, FacetCalculator, FilterError, 14 EventHydrator, HydratedEvent, HydrationOptions, EventSortField, SortOrder, 15}; 16use crate::storage::event::model::Event; 17use crate::storage::CachePool; 18 19/// Configuration for filtering service 20#[derive(Debug, Clone)] 21pub struct FilterConfig { 22 /// Cache TTL in seconds (default: 5 minutes) 23 pub cache_ttl: u64, 24 /// Enable caching 25 pub enable_cache: bool, 26} 27 28impl Default for FilterConfig { 29 fn default() -> Self { 30 Self { 31 cache_ttl: 300, // 5 minutes 32 enable_cache: true, 33 } 34 } 35} 36 37/// Main filtering service that coordinates all filtering operations 38#[derive(Debug, Clone)] 39pub struct FilteringService { 40 #[allow(dead_code)] 41 pool: PgPool, 42 cache_pool: Option<CachePool>, 43 config: FilterConfig, 44 query_builder: EventQueryBuilder, 45 facet_calculator: FacetCalculator, 46 hydrator: EventHydrator, 47} 48 49/// Result structure containing filtered events and facets 50#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] 51pub struct FilterResults { 52 pub events: Vec<Event>, 53 pub hydrated_events: Vec<HydratedEvent>, 54 pub facets: EventFacets, 55 pub total_count: i64, 56 pub page: usize, 57 pub page_size: usize, 58 pub has_more: bool, 59} 60 61/// Options for filtering operations 62#[derive(Debug, Clone, Default)] 63pub struct FilterOptions { 64 /// Whether to calculate facets (can be expensive) 65 pub include_facets: bool, 66 67 /// Whether to hydrate events with additional data 68 pub include_hydration: bool, 69 70 /// Hydration options if hydration is enabled 71 pub hydration_options: HydrationOptions, 72} 73 74impl FilteringService { 75 /// Create a new filtering service 76 pub fn new(pool: PgPool) -> Self { 77 Self::new_with_cache(pool, None, FilterConfig::default()) 78 } 79 80 /// Create a new filtering service with cache support 81 pub fn new_with_cache(pool: PgPool, cache_pool: Option<CachePool>, config: FilterConfig) -> Self { 82 let query_builder = EventQueryBuilder::new(pool.clone()); 83 let facet_calculator = FacetCalculator::new(pool.clone()); 84 let hydrator = EventHydrator::new(pool.clone()); 85 86 Self { 87 pool, 88 cache_pool, 89 config, 90 query_builder, 91 facet_calculator, 92 hydrator, 93 } 94 } 95 96 /// Filter events with the given criteria 97 #[instrument(skip(self, criteria, options), fields( 98 search_term = ?criteria.search_term, 99 page = criteria.page, 100 include_facets = options.include_facets, 101 include_hydration = options.include_hydration 102 ))] 103 pub async fn filter_events( 104 &self, 105 criteria: &EventFilterCriteria, 106 options: &FilterOptions, 107 ) -> Result<FilterResults, FilterError> { 108 self.filter_events_with_locale(criteria, options, "en").await 109 } 110 111 /// Filter events with the given criteria and locale for cache key generation 112 #[instrument(skip(self, criteria, options), fields( 113 search_term = ?criteria.search_term, 114 page = criteria.page, 115 include_facets = options.include_facets, 116 include_hydration = options.include_hydration, 117 locale = locale 118 ))] 119 pub async fn filter_events_with_locale( 120 &self, 121 criteria: &EventFilterCriteria, 122 options: &FilterOptions, 123 locale: &str, 124 ) -> Result<FilterResults, FilterError> { 125 // Validate criteria 126 self.validate_criteria(criteria)?; 127 128 // Try cache first if enabled 129 if self.config.enable_cache && self.cache_pool.is_some() { 130 let cache_key = self.generate_cache_key(criteria, options, locale); 131 if let Ok(cached_results) = self.get_from_cache(&cache_key).await { 132 trace!("Cache hit for filter results: {}", cache_key); 133 return Ok(cached_results); 134 } 135 trace!("Cache miss for filter results: {}", cache_key); 136 } 137 138 // Cache miss or caching disabled - perform full filtering operation 139 let results = self.filter_events_uncached_with_locale(criteria, options, locale).await?; 140 141 // Store results in cache if enabled (fire and forget) 142 if self.config.enable_cache && self.cache_pool.is_some() { 143 let cache_key = self.generate_cache_key(criteria, options, locale); 144 let cache_pool = self.cache_pool.clone().unwrap(); 145 let results_clone = results.clone(); 146 let cache_ttl = self.config.cache_ttl; 147 148 tokio::spawn(async move { 149 if let Err(err) = Self::store_in_cache_static(&cache_pool, &cache_key, &results_clone, cache_ttl).await { 150 warn!(error = ?err, cache_key = %cache_key, "Failed to store results in cache"); 151 } 152 }); 153 } 154 155 Ok(results) 156 } 157 158 /// Filter events without caching 159 #[allow(dead_code)] 160 async fn filter_events_uncached( 161 &self, 162 criteria: &EventFilterCriteria, 163 options: &FilterOptions, 164 ) -> Result<FilterResults, FilterError> { 165 // Get filtered events 166 let events = self.query_builder.build_and_execute(criteria).await?; 167 trace!("Found {} events matching criteria", events.len()); 168 169 // Get total count for pagination 170 let total_count = self.query_builder.count_results(criteria).await?; 171 172 // Calculate facets if requested (uses default en-us locale for backward compatibility) 173 let facets = if options.include_facets { 174 let locale = "en-us".parse::<LanguageIdentifier>().unwrap_or_else(|_| { 175 "en-us".parse().expect("Failed to parse default locale") 176 }); 177 self.facet_calculator.calculate_facets_with_locale(criteria, &locale).await? 178 } else { 179 EventFacets::default() 180 }; 181 182 // Hydrate events if requested 183 let mut hydrated_events = if options.include_hydration { 184 self.hydrator.hydrate_events(events.clone(), &options.hydration_options).await? 185 } else { 186 Vec::new() 187 }; 188 189 // Apply post-filtering sorting for PopularityRsvp if needed 190 if options.include_hydration && criteria.sort_by == EventSortField::PopularityRsvp { 191 self.apply_rsvp_based_sorting(&mut hydrated_events, &criteria.sort_order); 192 } 193 194 // Calculate pagination info 195 let has_more = (criteria.page + 1) * criteria.page_size < total_count as usize; 196 197 Ok(FilterResults { 198 events, 199 hydrated_events, 200 facets, 201 total_count, 202 page: criteria.page, 203 page_size: criteria.page_size, 204 has_more, 205 }) 206 } 207 208 /// Filter events without caching with locale-aware facet calculation 209 async fn filter_events_uncached_with_locale( 210 &self, 211 criteria: &EventFilterCriteria, 212 options: &FilterOptions, 213 locale: &str, 214 ) -> Result<FilterResults, FilterError> { 215 // Get filtered events 216 let events = self.query_builder.build_and_execute(criteria).await?; 217 trace!("Found {} events matching criteria", events.len()); 218 219 // Get total count for pagination 220 let total_count = self.query_builder.count_results(criteria).await?; 221 222 // Calculate facets if requested with locale support 223 let facets = if options.include_facets { 224 let language_id = locale.parse::<LanguageIdentifier>().unwrap_or_else(|err| { 225 warn!("Failed to parse locale '{}': {}. Using default 'en-us'", locale, err); 226 "en-us".parse().expect("Failed to parse default locale") 227 }); 228 self.facet_calculator.calculate_facets_with_locale(criteria, &language_id).await? 229 } else { 230 EventFacets::default() 231 }; 232 233 // Hydrate events if requested 234 let mut hydrated_events = if options.include_hydration { 235 self.hydrator.hydrate_events_with_locale(events.clone(), &options.hydration_options, Some(locale)).await? 236 } else { 237 Vec::new() 238 }; 239 240 // Apply post-filtering sorting for PopularityRsvp if needed 241 if options.include_hydration && criteria.sort_by == EventSortField::PopularityRsvp { 242 self.apply_rsvp_based_sorting(&mut hydrated_events, &criteria.sort_order); 243 } 244 245 // Calculate pagination info 246 let has_more = (criteria.page + 1) * criteria.page_size < total_count as usize; 247 248 Ok(FilterResults { 249 events, 250 hydrated_events, 251 facets, 252 total_count, 253 page: criteria.page, 254 page_size: criteria.page_size, 255 has_more, 256 }) 257 } 258 259 /// Generate cache key for the given criteria, options, and locale 260 fn generate_cache_key(&self, criteria: &EventFilterCriteria, options: &FilterOptions, locale: &str) -> String { 261 let criteria_hash = criteria.cache_hash(); 262 let options_hash = self.hash_filter_options(options); 263 format!("events:filter:{}:{}:{}", locale, criteria_hash, options_hash) 264 } 265 266 /// Generate hash for filter options 267 fn hash_filter_options(&self, options: &FilterOptions) -> u64 { 268 let mut hasher = DefaultHasher::new(); 269 options.include_facets.hash(&mut hasher); 270 options.include_hydration.hash(&mut hasher); 271 // Note: We could also hash hydration_options if needed for more granular caching 272 hasher.finish() 273 } 274 275 /// Try to get results from cache 276 async fn get_from_cache(&self, cache_key: &str) -> Result<FilterResults, FilterError> { 277 use deadpool_redis::redis::AsyncCommands; 278 279 let cache_pool = self.cache_pool.as_ref().ok_or_else(|| { 280 FilterError::cache_operation_failed("get", "Cache pool not available") 281 })?; 282 283 let mut conn = cache_pool.get().await.map_err(|err| { 284 FilterError::cache_operation_failed("get", &err.to_string()) 285 })?; 286 287 let cached_data: Option<String> = AsyncCommands::get(&mut *conn, cache_key).await.map_err(|err| { 288 FilterError::cache_operation_failed("get", &err.to_string()) 289 })?; 290 291 match cached_data { 292 Some(data) => { 293 let results: FilterResults = serde_json::from_str(&data).map_err(|err| { 294 FilterError::serialization_error(&err.to_string()) 295 })?; 296 Ok(results) 297 } 298 None => Err(FilterError::cache_operation_failed("get", "Cache miss")), 299 } 300 } 301 302 /// Store results in cache (static method for spawned task) 303 async fn store_in_cache_static( 304 cache_pool: &CachePool, 305 cache_key: &str, 306 results: &FilterResults, 307 ttl_seconds: u64, 308 ) -> Result<(), FilterError> { 309 use deadpool_redis::redis::AsyncCommands; 310 311 let mut conn = cache_pool.get().await.map_err(|err| { 312 FilterError::cache_operation_failed("store", &err.to_string()) 313 })?; 314 315 let serialized = serde_json::to_string(results).map_err(|err| { 316 FilterError::serialization_error(&err.to_string()) 317 })?; 318 319 let _: () = AsyncCommands::set_ex(&mut *conn, cache_key, serialized, ttl_seconds) 320 .await 321 .map_err(|err| FilterError::cache_operation_failed("store", &err.to_string()))?; 322 323 Ok(()) 324 } 325 326 /// Filter events with facets and hydration enabled (convenience method) 327 pub async fn filter_events_full( 328 &self, 329 criteria: &EventFilterCriteria, 330 ) -> Result<FilterResults, FilterError> { 331 let options = FilterOptions { 332 include_facets: true, 333 include_hydration: true, 334 hydration_options: HydrationOptions::list_view(), 335 }; 336 337 self.filter_events(criteria, &options).await 338 } 339 340 /// Filter events with minimal processing (fast query for autocomplete, etc.) 341 pub async fn filter_events_minimal( 342 &self, 343 criteria: &EventFilterCriteria, 344 ) -> Result<FilterResults, FilterError> { 345 let options = FilterOptions { 346 include_facets: false, 347 include_hydration: false, 348 hydration_options: HydrationOptions::basic(), 349 }; 350 351 self.filter_events(criteria, &options).await 352 } 353 354 /// Filter events with minimal processing and locale support (fast query for autocomplete, etc.) 355 pub async fn filter_events_minimal_with_locale( 356 &self, 357 criteria: &EventFilterCriteria, 358 locale: &str, 359 ) -> Result<FilterResults, FilterError> { 360 let options = FilterOptions { 361 include_facets: false, 362 include_hydration: false, 363 hydration_options: HydrationOptions::basic(), 364 }; 365 366 self.filter_events_with_locale(criteria, &options, locale).await 367 } 368 369 /// Get facets only (for facet-only requests) 370 pub async fn get_facets( 371 &self, 372 criteria: &EventFilterCriteria, 373 ) -> Result<EventFacets, FilterError> { 374 // Use default locale for backward compatibility 375 let locale = "en-us".parse::<LanguageIdentifier>().unwrap_or_else(|_| { 376 "en-us".parse().expect("Failed to parse default locale") 377 }); 378 self.facet_calculator.calculate_facets_with_locale(criteria, &locale).await 379 } 380 381 /// Get facets only with locale support (for facet-only requests) 382 pub async fn get_facets_with_locale( 383 &self, 384 criteria: &EventFilterCriteria, 385 locale: &str, 386 ) -> Result<EventFacets, FilterError> { 387 let language_id = locale.parse::<LanguageIdentifier>().unwrap_or_else(|err| { 388 warn!("Failed to parse locale '{}': {}. Using default 'en-us'", locale, err); 389 "en-us".parse().expect("Failed to parse default locale") 390 }); 391 self.facet_calculator.calculate_facets_with_locale(criteria, &language_id).await 392 } 393 394 /// Validate filter criteria 395 fn validate_criteria(&self, criteria: &EventFilterCriteria) -> Result<(), FilterError> { 396 // Validate pagination 397 if criteria.page_size == 0 { 398 return Err(FilterError::invalid_pagination("Page size must be greater than 0")); 399 } 400 401 if criteria.page_size > 100 { 402 return Err(FilterError::invalid_pagination("Page size cannot exceed 100")); 403 } 404 405 // Validate date range 406 if let (Some(start), Some(end)) = (criteria.start_date, criteria.end_date) { 407 if start >= end { 408 return Err(FilterError::invalid_date_range("Start date must be before end date")); 409 } 410 } 411 412 // Validate location 413 if let Some(ref location) = criteria.location { 414 if location.latitude < -90.0 || location.latitude > 90.0 { 415 return Err(FilterError::invalid_location("Latitude must be between -90 and 90")); 416 } 417 418 if location.longitude < -180.0 || location.longitude > 180.0 { 419 return Err(FilterError::invalid_location("Longitude must be between -180 and 180")); 420 } 421 422 if location.radius_km <= 0.0 { 423 return Err(FilterError::invalid_location("Radius must be greater than 0")); 424 } 425 426 if location.radius_km > 1000.0 { 427 return Err(FilterError::invalid_location("Radius cannot exceed 1000 km")); 428 } 429 } 430 431 // Validate search term 432 if let Some(ref term) = criteria.search_term { 433 if term.len() > 200 { 434 return Err(FilterError::invalid_criteria("Search term too long (max 200 characters)")); 435 } 436 } 437 438 // Validate modes 439 if criteria.modes.len() > 10 { 440 return Err(FilterError::invalid_criteria("Too many modes (max 10)")); 441 } 442 443 // Validate statuses 444 if criteria.statuses.len() > 10 { 445 return Err(FilterError::invalid_criteria("Too many statuses (max 10)")); 446 } 447 448 Ok(()) 449 } 450 451 /// Apply RSVP-based sorting to hydrated events 452 /// This implements the Redis-based post-sorting strategy 453 fn apply_rsvp_based_sorting(&self, hydrated_events: &mut [HydratedEvent], sort_order: &SortOrder) { 454 hydrated_events.sort_by(|a, b| { 455 // Get total RSVP counts for comparison (going + interested = popularity score) 456 let a_count = a.rsvp_counts.as_ref() 457 .map(|counts| counts.going + counts.interested) 458 .unwrap_or(0); 459 let b_count = b.rsvp_counts.as_ref() 460 .map(|counts| counts.going + counts.interested) 461 .unwrap_or(0); 462 463 match sort_order { 464 SortOrder::Ascending => a_count.cmp(&b_count), 465 SortOrder::Descending => b_count.cmp(&a_count), 466 } 467 }); 468 } 469} 470 471impl FilterOptions { 472 /// Create options for a standard list view 473 pub fn list_view() -> Self { 474 Self { 475 include_facets: true, 476 include_hydration: true, 477 hydration_options: HydrationOptions::list_view(), 478 } 479 } 480 481 /// Create options for a detailed view 482 pub fn detail_view() -> Self { 483 Self { 484 include_facets: true, 485 include_hydration: true, 486 hydration_options: HydrationOptions::detail_view(), 487 } 488 } 489 490 /// Create options for minimal processing (fast queries) 491 pub fn minimal() -> Self { 492 Self { 493 include_facets: false, 494 include_hydration: false, 495 hydration_options: HydrationOptions::basic(), 496 } 497 } 498 499 /// Create options for facets only 500 pub fn facets_only() -> Self { 501 Self { 502 include_facets: true, 503 include_hydration: false, 504 hydration_options: HydrationOptions::basic(), 505 } 506 } 507} 508 509impl FilterResults { 510 /// Check if there are any results 511 pub fn is_empty(&self) -> bool { 512 self.events.is_empty() 513 } 514 515 /// Get the range of results being displayed 516 pub fn result_range(&self) -> (usize, usize) { 517 let start = self.page * self.page_size + 1; 518 let end = std::cmp::min(start + self.events.len() - 1, self.total_count as usize); 519 (start, end) 520 } 521 522 /// Calculate total number of pages 523 pub fn total_pages(&self) -> usize { 524 ((self.total_count as f64) / (self.page_size as f64)).ceil() as usize 525 } 526 527 /// Check if this is the first page 528 pub fn is_first_page(&self) -> bool { 529 self.page == 0 530 } 531 532 /// Check if this is the last page 533 pub fn is_last_page(&self) -> bool { 534 !self.has_more 535 } 536} 537 538#[cfg(test)] 539mod tests { 540 use super::*; 541 use chrono::Utc; 542 543 #[tokio::test] 544 async fn test_criteria_validation() { 545 // Create a mock pool for testing (validation doesn't need real DB) 546 let pool = PgPool::connect_lazy("postgresql://test:test@localhost/test").unwrap(); 547 let service = FilteringService::new(pool); 548 549 // Valid criteria 550 let valid_criteria = EventFilterCriteria::new(); 551 assert!(service.validate_criteria(&valid_criteria).is_ok()); 552 553 // Invalid page size 554 let invalid_criteria = EventFilterCriteria { 555 page_size: 0, 556 ..EventFilterCriteria::new() 557 }; 558 assert!(service.validate_criteria(&invalid_criteria).is_err()); 559 560 // Invalid date range 561 let now = Utc::now(); 562 let invalid_dates = EventFilterCriteria { 563 start_date: Some(now), 564 end_date: Some(now - chrono::Duration::hours(1)), 565 ..EventFilterCriteria::new() 566 }; 567 assert!(service.validate_criteria(&invalid_dates).is_err()); 568 } 569 570 #[test] 571 fn test_filter_options() { 572 let list_options = FilterOptions::list_view(); 573 assert!(list_options.include_facets); 574 assert!(list_options.include_hydration); 575 576 let minimal_options = FilterOptions::minimal(); 577 assert!(!minimal_options.include_facets); 578 assert!(!minimal_options.include_hydration); 579 } 580}