forked from
smokesignal.events/smokesignal
i18n+filtering fork - fluent-templates v2
1// Main filtering service coordination layer
2//
3// Coordinates between query builder, facet calculator, and hydrator
4// to provide a unified interface for event filtering operations.
5
6use sqlx::PgPool;
7use tracing::{instrument, trace, warn};
8use std::collections::hash_map::DefaultHasher;
9use std::hash::{Hash, Hasher};
10use unic_langid::LanguageIdentifier;
11
12use super::{
13 EventFilterCriteria, EventFacets, EventQueryBuilder, FacetCalculator, FilterError,
14 EventHydrator, HydratedEvent, HydrationOptions, EventSortField, SortOrder,
15};
16use crate::storage::event::model::Event;
17use crate::storage::CachePool;
18
19/// Configuration for filtering service
20#[derive(Debug, Clone)]
21pub struct FilterConfig {
22 /// Cache TTL in seconds (default: 5 minutes)
23 pub cache_ttl: u64,
24 /// Enable caching
25 pub enable_cache: bool,
26}
27
28impl Default for FilterConfig {
29 fn default() -> Self {
30 Self {
31 cache_ttl: 300, // 5 minutes
32 enable_cache: true,
33 }
34 }
35}
36
37/// Main filtering service that coordinates all filtering operations
38#[derive(Debug, Clone)]
39pub struct FilteringService {
40 #[allow(dead_code)]
41 pool: PgPool,
42 cache_pool: Option<CachePool>,
43 config: FilterConfig,
44 query_builder: EventQueryBuilder,
45 facet_calculator: FacetCalculator,
46 hydrator: EventHydrator,
47}
48
49/// Result structure containing filtered events and facets
50#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
51pub struct FilterResults {
52 pub events: Vec<Event>,
53 pub hydrated_events: Vec<HydratedEvent>,
54 pub facets: EventFacets,
55 pub total_count: i64,
56 pub page: usize,
57 pub page_size: usize,
58 pub has_more: bool,
59}
60
61/// Options for filtering operations
62#[derive(Debug, Clone, Default)]
63pub struct FilterOptions {
64 /// Whether to calculate facets (can be expensive)
65 pub include_facets: bool,
66
67 /// Whether to hydrate events with additional data
68 pub include_hydration: bool,
69
70 /// Hydration options if hydration is enabled
71 pub hydration_options: HydrationOptions,
72}
73
74impl FilteringService {
75 /// Create a new filtering service
76 pub fn new(pool: PgPool) -> Self {
77 Self::new_with_cache(pool, None, FilterConfig::default())
78 }
79
80 /// Create a new filtering service with cache support
81 pub fn new_with_cache(pool: PgPool, cache_pool: Option<CachePool>, config: FilterConfig) -> Self {
82 let query_builder = EventQueryBuilder::new(pool.clone());
83 let facet_calculator = FacetCalculator::new(pool.clone());
84 let hydrator = EventHydrator::new(pool.clone());
85
86 Self {
87 pool,
88 cache_pool,
89 config,
90 query_builder,
91 facet_calculator,
92 hydrator,
93 }
94 }
95
96 /// Filter events with the given criteria
97 #[instrument(skip(self, criteria, options), fields(
98 search_term = ?criteria.search_term,
99 page = criteria.page,
100 include_facets = options.include_facets,
101 include_hydration = options.include_hydration
102 ))]
103 pub async fn filter_events(
104 &self,
105 criteria: &EventFilterCriteria,
106 options: &FilterOptions,
107 ) -> Result<FilterResults, FilterError> {
108 self.filter_events_with_locale(criteria, options, "en").await
109 }
110
111 /// Filter events with the given criteria and locale for cache key generation
112 #[instrument(skip(self, criteria, options), fields(
113 search_term = ?criteria.search_term,
114 page = criteria.page,
115 include_facets = options.include_facets,
116 include_hydration = options.include_hydration,
117 locale = locale
118 ))]
119 pub async fn filter_events_with_locale(
120 &self,
121 criteria: &EventFilterCriteria,
122 options: &FilterOptions,
123 locale: &str,
124 ) -> Result<FilterResults, FilterError> {
125 // Validate criteria
126 self.validate_criteria(criteria)?;
127
128 // Try cache first if enabled
129 if self.config.enable_cache && self.cache_pool.is_some() {
130 let cache_key = self.generate_cache_key(criteria, options, locale);
131 if let Ok(cached_results) = self.get_from_cache(&cache_key).await {
132 trace!("Cache hit for filter results: {}", cache_key);
133 return Ok(cached_results);
134 }
135 trace!("Cache miss for filter results: {}", cache_key);
136 }
137
138 // Cache miss or caching disabled - perform full filtering operation
139 let results = self.filter_events_uncached_with_locale(criteria, options, locale).await?;
140
141 // Store results in cache if enabled (fire and forget)
142 if self.config.enable_cache && self.cache_pool.is_some() {
143 let cache_key = self.generate_cache_key(criteria, options, locale);
144 let cache_pool = self.cache_pool.clone().unwrap();
145 let results_clone = results.clone();
146 let cache_ttl = self.config.cache_ttl;
147
148 tokio::spawn(async move {
149 if let Err(err) = Self::store_in_cache_static(&cache_pool, &cache_key, &results_clone, cache_ttl).await {
150 warn!(error = ?err, cache_key = %cache_key, "Failed to store results in cache");
151 }
152 });
153 }
154
155 Ok(results)
156 }
157
158 /// Filter events without caching
159 #[allow(dead_code)]
160 async fn filter_events_uncached(
161 &self,
162 criteria: &EventFilterCriteria,
163 options: &FilterOptions,
164 ) -> Result<FilterResults, FilterError> {
165 // Get filtered events
166 let events = self.query_builder.build_and_execute(criteria).await?;
167 trace!("Found {} events matching criteria", events.len());
168
169 // Get total count for pagination
170 let total_count = self.query_builder.count_results(criteria).await?;
171
172 // Calculate facets if requested (uses default en-us locale for backward compatibility)
173 let facets = if options.include_facets {
174 let locale = "en-us".parse::<LanguageIdentifier>().unwrap_or_else(|_| {
175 "en-us".parse().expect("Failed to parse default locale")
176 });
177 self.facet_calculator.calculate_facets_with_locale(criteria, &locale).await?
178 } else {
179 EventFacets::default()
180 };
181
182 // Hydrate events if requested
183 let mut hydrated_events = if options.include_hydration {
184 self.hydrator.hydrate_events(events.clone(), &options.hydration_options).await?
185 } else {
186 Vec::new()
187 };
188
189 // Apply post-filtering sorting for PopularityRsvp if needed
190 if options.include_hydration && criteria.sort_by == EventSortField::PopularityRsvp {
191 self.apply_rsvp_based_sorting(&mut hydrated_events, &criteria.sort_order);
192 }
193
194 // Calculate pagination info
195 let has_more = (criteria.page + 1) * criteria.page_size < total_count as usize;
196
197 Ok(FilterResults {
198 events,
199 hydrated_events,
200 facets,
201 total_count,
202 page: criteria.page,
203 page_size: criteria.page_size,
204 has_more,
205 })
206 }
207
208 /// Filter events without caching with locale-aware facet calculation
209 async fn filter_events_uncached_with_locale(
210 &self,
211 criteria: &EventFilterCriteria,
212 options: &FilterOptions,
213 locale: &str,
214 ) -> Result<FilterResults, FilterError> {
215 // Get filtered events
216 let events = self.query_builder.build_and_execute(criteria).await?;
217 trace!("Found {} events matching criteria", events.len());
218
219 // Get total count for pagination
220 let total_count = self.query_builder.count_results(criteria).await?;
221
222 // Calculate facets if requested with locale support
223 let facets = if options.include_facets {
224 let language_id = locale.parse::<LanguageIdentifier>().unwrap_or_else(|err| {
225 warn!("Failed to parse locale '{}': {}. Using default 'en-us'", locale, err);
226 "en-us".parse().expect("Failed to parse default locale")
227 });
228 self.facet_calculator.calculate_facets_with_locale(criteria, &language_id).await?
229 } else {
230 EventFacets::default()
231 };
232
233 // Hydrate events if requested
234 let mut hydrated_events = if options.include_hydration {
235 self.hydrator.hydrate_events_with_locale(events.clone(), &options.hydration_options, Some(locale)).await?
236 } else {
237 Vec::new()
238 };
239
240 // Apply post-filtering sorting for PopularityRsvp if needed
241 if options.include_hydration && criteria.sort_by == EventSortField::PopularityRsvp {
242 self.apply_rsvp_based_sorting(&mut hydrated_events, &criteria.sort_order);
243 }
244
245 // Calculate pagination info
246 let has_more = (criteria.page + 1) * criteria.page_size < total_count as usize;
247
248 Ok(FilterResults {
249 events,
250 hydrated_events,
251 facets,
252 total_count,
253 page: criteria.page,
254 page_size: criteria.page_size,
255 has_more,
256 })
257 }
258
259 /// Generate cache key for the given criteria, options, and locale
260 fn generate_cache_key(&self, criteria: &EventFilterCriteria, options: &FilterOptions, locale: &str) -> String {
261 let criteria_hash = criteria.cache_hash();
262 let options_hash = self.hash_filter_options(options);
263 format!("events:filter:{}:{}:{}", locale, criteria_hash, options_hash)
264 }
265
266 /// Generate hash for filter options
267 fn hash_filter_options(&self, options: &FilterOptions) -> u64 {
268 let mut hasher = DefaultHasher::new();
269 options.include_facets.hash(&mut hasher);
270 options.include_hydration.hash(&mut hasher);
271 // Note: We could also hash hydration_options if needed for more granular caching
272 hasher.finish()
273 }
274
275 /// Try to get results from cache
276 async fn get_from_cache(&self, cache_key: &str) -> Result<FilterResults, FilterError> {
277 use deadpool_redis::redis::AsyncCommands;
278
279 let cache_pool = self.cache_pool.as_ref().ok_or_else(|| {
280 FilterError::cache_operation_failed("get", "Cache pool not available")
281 })?;
282
283 let mut conn = cache_pool.get().await.map_err(|err| {
284 FilterError::cache_operation_failed("get", &err.to_string())
285 })?;
286
287 let cached_data: Option<String> = AsyncCommands::get(&mut *conn, cache_key).await.map_err(|err| {
288 FilterError::cache_operation_failed("get", &err.to_string())
289 })?;
290
291 match cached_data {
292 Some(data) => {
293 let results: FilterResults = serde_json::from_str(&data).map_err(|err| {
294 FilterError::serialization_error(&err.to_string())
295 })?;
296 Ok(results)
297 }
298 None => Err(FilterError::cache_operation_failed("get", "Cache miss")),
299 }
300 }
301
302 /// Store results in cache (static method for spawned task)
303 async fn store_in_cache_static(
304 cache_pool: &CachePool,
305 cache_key: &str,
306 results: &FilterResults,
307 ttl_seconds: u64,
308 ) -> Result<(), FilterError> {
309 use deadpool_redis::redis::AsyncCommands;
310
311 let mut conn = cache_pool.get().await.map_err(|err| {
312 FilterError::cache_operation_failed("store", &err.to_string())
313 })?;
314
315 let serialized = serde_json::to_string(results).map_err(|err| {
316 FilterError::serialization_error(&err.to_string())
317 })?;
318
319 let _: () = AsyncCommands::set_ex(&mut *conn, cache_key, serialized, ttl_seconds)
320 .await
321 .map_err(|err| FilterError::cache_operation_failed("store", &err.to_string()))?;
322
323 Ok(())
324 }
325
326 /// Filter events with facets and hydration enabled (convenience method)
327 pub async fn filter_events_full(
328 &self,
329 criteria: &EventFilterCriteria,
330 ) -> Result<FilterResults, FilterError> {
331 let options = FilterOptions {
332 include_facets: true,
333 include_hydration: true,
334 hydration_options: HydrationOptions::list_view(),
335 };
336
337 self.filter_events(criteria, &options).await
338 }
339
340 /// Filter events with minimal processing (fast query for autocomplete, etc.)
341 pub async fn filter_events_minimal(
342 &self,
343 criteria: &EventFilterCriteria,
344 ) -> Result<FilterResults, FilterError> {
345 let options = FilterOptions {
346 include_facets: false,
347 include_hydration: false,
348 hydration_options: HydrationOptions::basic(),
349 };
350
351 self.filter_events(criteria, &options).await
352 }
353
354 /// Filter events with minimal processing and locale support (fast query for autocomplete, etc.)
355 pub async fn filter_events_minimal_with_locale(
356 &self,
357 criteria: &EventFilterCriteria,
358 locale: &str,
359 ) -> Result<FilterResults, FilterError> {
360 let options = FilterOptions {
361 include_facets: false,
362 include_hydration: false,
363 hydration_options: HydrationOptions::basic(),
364 };
365
366 self.filter_events_with_locale(criteria, &options, locale).await
367 }
368
369 /// Get facets only (for facet-only requests)
370 pub async fn get_facets(
371 &self,
372 criteria: &EventFilterCriteria,
373 ) -> Result<EventFacets, FilterError> {
374 // Use default locale for backward compatibility
375 let locale = "en-us".parse::<LanguageIdentifier>().unwrap_or_else(|_| {
376 "en-us".parse().expect("Failed to parse default locale")
377 });
378 self.facet_calculator.calculate_facets_with_locale(criteria, &locale).await
379 }
380
381 /// Get facets only with locale support (for facet-only requests)
382 pub async fn get_facets_with_locale(
383 &self,
384 criteria: &EventFilterCriteria,
385 locale: &str,
386 ) -> Result<EventFacets, FilterError> {
387 let language_id = locale.parse::<LanguageIdentifier>().unwrap_or_else(|err| {
388 warn!("Failed to parse locale '{}': {}. Using default 'en-us'", locale, err);
389 "en-us".parse().expect("Failed to parse default locale")
390 });
391 self.facet_calculator.calculate_facets_with_locale(criteria, &language_id).await
392 }
393
394 /// Validate filter criteria
395 fn validate_criteria(&self, criteria: &EventFilterCriteria) -> Result<(), FilterError> {
396 // Validate pagination
397 if criteria.page_size == 0 {
398 return Err(FilterError::invalid_pagination("Page size must be greater than 0"));
399 }
400
401 if criteria.page_size > 100 {
402 return Err(FilterError::invalid_pagination("Page size cannot exceed 100"));
403 }
404
405 // Validate date range
406 if let (Some(start), Some(end)) = (criteria.start_date, criteria.end_date) {
407 if start >= end {
408 return Err(FilterError::invalid_date_range("Start date must be before end date"));
409 }
410 }
411
412 // Validate location
413 if let Some(ref location) = criteria.location {
414 if location.latitude < -90.0 || location.latitude > 90.0 {
415 return Err(FilterError::invalid_location("Latitude must be between -90 and 90"));
416 }
417
418 if location.longitude < -180.0 || location.longitude > 180.0 {
419 return Err(FilterError::invalid_location("Longitude must be between -180 and 180"));
420 }
421
422 if location.radius_km <= 0.0 {
423 return Err(FilterError::invalid_location("Radius must be greater than 0"));
424 }
425
426 if location.radius_km > 1000.0 {
427 return Err(FilterError::invalid_location("Radius cannot exceed 1000 km"));
428 }
429 }
430
431 // Validate search term
432 if let Some(ref term) = criteria.search_term {
433 if term.len() > 200 {
434 return Err(FilterError::invalid_criteria("Search term too long (max 200 characters)"));
435 }
436 }
437
438 // Validate modes
439 if criteria.modes.len() > 10 {
440 return Err(FilterError::invalid_criteria("Too many modes (max 10)"));
441 }
442
443 // Validate statuses
444 if criteria.statuses.len() > 10 {
445 return Err(FilterError::invalid_criteria("Too many statuses (max 10)"));
446 }
447
448 Ok(())
449 }
450
451 /// Apply RSVP-based sorting to hydrated events
452 /// This implements the Redis-based post-sorting strategy
453 fn apply_rsvp_based_sorting(&self, hydrated_events: &mut [HydratedEvent], sort_order: &SortOrder) {
454 hydrated_events.sort_by(|a, b| {
455 // Get total RSVP counts for comparison (going + interested = popularity score)
456 let a_count = a.rsvp_counts.as_ref()
457 .map(|counts| counts.going + counts.interested)
458 .unwrap_or(0);
459 let b_count = b.rsvp_counts.as_ref()
460 .map(|counts| counts.going + counts.interested)
461 .unwrap_or(0);
462
463 match sort_order {
464 SortOrder::Ascending => a_count.cmp(&b_count),
465 SortOrder::Descending => b_count.cmp(&a_count),
466 }
467 });
468 }
469}
470
471impl FilterOptions {
472 /// Create options for a standard list view
473 pub fn list_view() -> Self {
474 Self {
475 include_facets: true,
476 include_hydration: true,
477 hydration_options: HydrationOptions::list_view(),
478 }
479 }
480
481 /// Create options for a detailed view
482 pub fn detail_view() -> Self {
483 Self {
484 include_facets: true,
485 include_hydration: true,
486 hydration_options: HydrationOptions::detail_view(),
487 }
488 }
489
490 /// Create options for minimal processing (fast queries)
491 pub fn minimal() -> Self {
492 Self {
493 include_facets: false,
494 include_hydration: false,
495 hydration_options: HydrationOptions::basic(),
496 }
497 }
498
499 /// Create options for facets only
500 pub fn facets_only() -> Self {
501 Self {
502 include_facets: true,
503 include_hydration: false,
504 hydration_options: HydrationOptions::basic(),
505 }
506 }
507}
508
509impl FilterResults {
510 /// Check if there are any results
511 pub fn is_empty(&self) -> bool {
512 self.events.is_empty()
513 }
514
515 /// Get the range of results being displayed
516 pub fn result_range(&self) -> (usize, usize) {
517 let start = self.page * self.page_size + 1;
518 let end = std::cmp::min(start + self.events.len() - 1, self.total_count as usize);
519 (start, end)
520 }
521
522 /// Calculate total number of pages
523 pub fn total_pages(&self) -> usize {
524 ((self.total_count as f64) / (self.page_size as f64)).ceil() as usize
525 }
526
527 /// Check if this is the first page
528 pub fn is_first_page(&self) -> bool {
529 self.page == 0
530 }
531
532 /// Check if this is the last page
533 pub fn is_last_page(&self) -> bool {
534 !self.has_more
535 }
536}
537
538#[cfg(test)]
539mod tests {
540 use super::*;
541 use chrono::Utc;
542
543 #[tokio::test]
544 async fn test_criteria_validation() {
545 // Create a mock pool for testing (validation doesn't need real DB)
546 let pool = PgPool::connect_lazy("postgresql://test:test@localhost/test").unwrap();
547 let service = FilteringService::new(pool);
548
549 // Valid criteria
550 let valid_criteria = EventFilterCriteria::new();
551 assert!(service.validate_criteria(&valid_criteria).is_ok());
552
553 // Invalid page size
554 let invalid_criteria = EventFilterCriteria {
555 page_size: 0,
556 ..EventFilterCriteria::new()
557 };
558 assert!(service.validate_criteria(&invalid_criteria).is_err());
559
560 // Invalid date range
561 let now = Utc::now();
562 let invalid_dates = EventFilterCriteria {
563 start_date: Some(now),
564 end_date: Some(now - chrono::Duration::hours(1)),
565 ..EventFilterCriteria::new()
566 };
567 assert!(service.validate_criteria(&invalid_dates).is_err());
568 }
569
570 #[test]
571 fn test_filter_options() {
572 let list_options = FilterOptions::list_view();
573 assert!(list_options.include_facets);
574 assert!(list_options.include_hydration);
575
576 let minimal_options = FilterOptions::minimal();
577 assert!(!minimal_options.include_facets);
578 assert!(!minimal_options.include_hydration);
579 }
580}