// Source: i18n+filtering fork - fluent-templates v2, at main.
1use std::borrow::Cow; 2use std::collections::HashMap; 3 4use anyhow::Result; 5use chrono::Utc; 6use serde_json::json; 7use sqlx::{Postgres, QueryBuilder}; 8 9use crate::atproto::lexicon::community::lexicon::calendar::event::Event as EventLexicon; 10use crate::atproto::lexicon::community::lexicon::calendar::rsvp::{ 11 Rsvp as RsvpLexicon, RsvpStatus as RsvpStatusLexicon, 12}; 13 14use super::errors::StorageError; 15use super::StoragePool; 16use model::{Event, EventWithRole, Rsvp}; 17 18pub mod model { 19 use chrono::{DateTime, Utc}; 20 use serde::{Deserialize, Serialize}; 21 use sqlx::FromRow; 22 23 #[derive(Clone, FromRow, Deserialize, Serialize, Debug)] 24 pub struct Event { 25 pub aturi: String, 26 pub cid: String, 27 28 pub did: String, 29 pub lexicon: String, 30 31 pub record: sqlx::types::Json<serde_json::Value>, 32 33 pub name: String, 34 35 pub updated_at: Option<DateTime<Utc>>, 36 } 37 38 #[derive(Clone, FromRow, Debug, Serialize)] 39 pub struct EventWithRole { 40 #[sqlx(flatten)] 41 pub event: Event, 42 43 pub role: String, 44 // pub event_handle: String, 45 } 46 47 #[derive(Clone, FromRow, Deserialize, Serialize, Debug)] 48 pub struct Rsvp { 49 pub aturi: String, 50 pub cid: String, 51 52 pub did: String, 53 pub lexicon: String, 54 55 pub record: sqlx::types::Json<serde_json::Value>, 56 57 pub event_aturi: String, 58 pub event_cid: String, 59 pub status: String, 60 61 pub updated_at: Option<DateTime<Utc>>, 62 } 63} 64 65pub async fn event_insert( 66 pool: &StoragePool, 67 aturi: &str, 68 cid: &str, 69 did: &str, 70 lexicon: &str, 71 record: &EventLexicon, 72) -> Result<(), StorageError> { 73 // Extract name from the record 74 let name = match record { 75 EventLexicon::Current { name, .. 
} => name, 76 }; 77 78 // Call the new function with extracted values 79 event_insert_with_metadata(pool, aturi, cid, did, lexicon, record, name).await 80} 81 82pub async fn event_insert_with_metadata<T: serde::Serialize>( 83 pool: &StoragePool, 84 aturi: &str, 85 cid: &str, 86 did: &str, 87 lexicon: &str, 88 record: &T, 89 name: &str, 90) -> Result<(), StorageError> { 91 let mut tx = pool 92 .begin() 93 .await 94 .map_err(StorageError::CannotBeginDatabaseTransaction)?; 95 96 let now = Utc::now(); 97 98 sqlx::query("INSERT INTO events (aturi, cid, did, lexicon, record, name, updated_at) VALUES ($1, $2, $3, $4, $5, $6, $7)") 99 .bind(aturi) 100 .bind(cid) 101 .bind(did) 102 .bind(lexicon) 103 .bind(json!(record)) 104 .bind(name) 105 .bind(now) 106 .execute(tx.as_mut()) 107 .await 108 .map_err(StorageError::UnableToExecuteQuery)?; 109 110 tx.commit() 111 .await 112 .map_err(StorageError::CannotCommitDatabaseTransaction) 113} 114 115pub struct RsvpInsertParams<'a, T: serde::Serialize> { 116 pub aturi: &'a str, 117 pub cid: &'a str, 118 pub did: &'a str, 119 pub lexicon: &'a str, 120 pub record: &'a T, 121 pub event_aturi: &'a str, 122 pub event_cid: &'a str, 123 pub status: &'a str, 124} 125 126pub async fn rsvp_insert_with_metadata<T: serde::Serialize>( 127 pool: &StoragePool, 128 params: RsvpInsertParams<'_, T>, 129) -> Result<(), StorageError> { 130 let mut tx = pool 131 .begin() 132 .await 133 .map_err(StorageError::CannotBeginDatabaseTransaction)?; 134 135 let now = Utc::now(); 136 137 sqlx::query("INSERT INTO rsvps (aturi, cid, did, lexicon, record, event_aturi, event_cid, status, updated_at) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (aturi) DO UPDATE SET record = $5, cid = $2, status = $8, updated_at = $9") 138 .bind(params.aturi) 139 .bind(params.cid) 140 .bind(params.did) 141 .bind(params.lexicon) 142 .bind(json!(params.record)) 143 .bind(params.event_aturi) 144 .bind(params.event_cid) 145 .bind(params.status) 146 .bind(now) 147 
.execute(tx.as_mut()) 148 .await 149 .map_err(StorageError::UnableToExecuteQuery)?; 150 151 tx.commit() 152 .await 153 .map_err(StorageError::CannotCommitDatabaseTransaction) 154} 155 156pub async fn rsvp_insert( 157 pool: &StoragePool, 158 aturi: &str, 159 cid: &str, 160 did: &str, 161 lexicon: &str, 162 record: &RsvpLexicon, 163) -> Result<(), StorageError> { 164 // Extract the metadata from the record 165 let (event_aturi, event_cid, status) = match record { 166 RsvpLexicon::Current { 167 subject, status, .. 168 } => { 169 let event_aturi = subject.uri.clone(); 170 let event_cid = subject.cid.clone(); 171 let status = match status { 172 RsvpStatusLexicon::Going => "going", 173 RsvpStatusLexicon::Interested => "interested", 174 RsvpStatusLexicon::NotGoing => "notgoing", 175 }; 176 (event_aturi, event_cid, status) 177 } 178 }; 179 180 // Call the generic function with extracted values 181 rsvp_insert_with_metadata( 182 pool, 183 RsvpInsertParams { 184 aturi, 185 cid, 186 did, 187 lexicon, 188 record, 189 event_aturi: &event_aturi, 190 event_cid: &event_cid, 191 status, 192 }, 193 ) 194 .await 195} 196 197// Helper function to extract event information based on lexicon type 198// Helper function to format address information into a readable string 199pub fn format_address( 200 address: &crate::atproto::lexicon::community::lexicon::location::Address, 201) -> String { 202 match address { 203 crate::atproto::lexicon::community::lexicon::location::Address::Current { 204 country, 205 postal_code, 206 region, 207 locality, 208 street, 209 name, 210 } => { 211 let mut parts = Vec::new(); 212 213 // Add parts in specified order, omitting empty values 214 if let Some(name_val) = name { 215 if !name_val.trim().is_empty() { 216 parts.push(name_val.clone()); 217 } 218 } 219 220 if let Some(street_val) = street { 221 if !street_val.trim().is_empty() { 222 parts.push(street_val.clone()); 223 } 224 } 225 226 if let Some(locality_val) = locality { 227 if 
!locality_val.trim().is_empty() { 228 parts.push(locality_val.clone()); 229 } 230 } 231 232 if let Some(region_val) = region { 233 if !region_val.trim().is_empty() { 234 parts.push(region_val.clone()); 235 } 236 } 237 238 if let Some(postal_val) = postal_code { 239 if !postal_val.trim().is_empty() { 240 parts.push(postal_val.clone()); 241 } 242 } 243 244 // Country is required so no need to check if it's empty 245 parts.push(country.clone()); 246 247 // Join parts with commas 248 parts.join(", ") 249 } 250 } 251} 252 253pub fn extract_event_details(event: &Event) -> EventDetails { 254 use crate::atproto::lexicon::{ 255 community::lexicon::calendar::event::{Event as CommunityEvent, Mode, Status}, 256 events::smokesignal::calendar::event::Event as SmokeSignalEvent, 257 }; 258 259 // Try to parse the record based on the lexicon 260 match event.lexicon.as_str() { 261 "community.lexicon.calendar.event" => { 262 if let Ok(community_event) = 263 serde_json::from_value::<CommunityEvent>(event.record.0.clone()) 264 { 265 match community_event { 266 CommunityEvent::Current { 267 name, 268 description, 269 created_at, 270 starts_at, 271 ends_at, 272 mode, 273 status, 274 locations, 275 uris, 276 .. 
277 } => EventDetails { 278 name: Cow::Owned(name.clone()), 279 description: Cow::Owned(description.clone()), 280 created_at: Some(created_at), 281 starts_at, 282 ends_at, 283 mode: mode.map(|m| match m { 284 Mode::InPerson => { 285 Cow::Borrowed("community.lexicon.calendar.event#inperson") 286 } 287 Mode::Virtual => { 288 Cow::Borrowed("community.lexicon.calendar.event#virtual") 289 } 290 Mode::Hybrid => { 291 Cow::Borrowed("community.lexicon.calendar.event#hybrid") 292 } 293 }), 294 status: status.map(|s| match s { 295 Status::Scheduled => { 296 Cow::Borrowed("community.lexicon.calendar.event#scheduled") 297 } 298 Status::Rescheduled => { 299 Cow::Borrowed("community.lexicon.calendar.event#rescheduled") 300 } 301 Status::Cancelled => { 302 Cow::Borrowed("community.lexicon.calendar.event#cancelled") 303 } 304 Status::Postponed => { 305 Cow::Borrowed("community.lexicon.calendar.event#postponed") 306 } 307 Status::Planned => { 308 Cow::Borrowed("community.lexicon.calendar.event#planned") 309 } 310 }), 311 locations, 312 uris, 313 }, 314 } 315 } else { 316 // Fallback to the event's direct name if parsing fails 317 EventDetails { 318 name: Cow::Owned(event.name.clone()), 319 description: Cow::Borrowed(""), 320 created_at: None, 321 starts_at: None, 322 ends_at: None, 323 mode: None, 324 status: None, 325 locations: vec![], 326 uris: vec![], 327 } 328 } 329 } 330 "events.smokesignal.calendar.event" => { 331 if let Ok(ss_event) = serde_json::from_value::<SmokeSignalEvent>(event.record.0.clone()) 332 { 333 match ss_event { 334 SmokeSignalEvent::Current { 335 name, 336 text, 337 created_at, 338 starts_at, 339 extra, 340 .. 
341 } => { 342 // Extract additional fields from extra map 343 let ends_at = extra 344 .get("endsAt") 345 .and_then(|v| v.as_str()) 346 .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok()) 347 .map(|dt| dt.with_timezone(&chrono::Utc)); 348 349 let mode = extra 350 .get("mode") 351 .and_then(|v| v.as_str().map(ToString::to_string)); 352 let status = extra 353 .get("status") 354 .and_then(|v| v.as_str().map(ToString::to_string)); 355 356 // Convert locations to the same format used by community.lexicon.calendar.event 357 // Process locations from extra data if available 358 let locations = Vec::new(); 359 360 // Extract links from location data 361 let mut uris = Vec::new(); 362 363 // Check for virtual locations in the location array 364 if let Some(location_value) = extra.get("location") { 365 if let Some(location_array) = location_value.as_array() { 366 for loc in location_array { 367 if let Some(loc_type) = loc.get("$type") { 368 if let Some(loc_type_str) = loc_type.as_str() { 369 // Handle virtual locations as links 370 if loc_type_str 371 == "events.smokesignal.calendar.location#virtual" 372 { 373 if let (Some(url), Some(name)) = ( 374 loc.get("url").and_then(|u| u.as_str()), 375 loc.get("name").and_then(|n| n.as_str()), 376 ) { 377 uris.push(crate::atproto::lexicon::community::lexicon::calendar::event::EventLink::Current { 378 uri: url.to_string(), 379 name: Some(name.to_string()), 380 }); 381 } 382 } 383 } 384 } 385 } 386 } 387 } 388 389 // Also check for any additional URIs in the extra map 390 if let Some(links_value) = extra.get("links") { 391 if let Some(links_array) = links_value.as_array() { 392 for link in links_array { 393 if let (Some(uri), Some(name)) = ( 394 link.get("uri").and_then(|u| u.as_str()), 395 link.get("name").and_then(|n| n.as_str()), 396 ) { 397 uris.push(crate::atproto::lexicon::community::lexicon::calendar::event::EventLink::Current { 398 uri: uri.to_string(), 399 name: Some(name.to_string()), 400 }); 401 } else if let 
Some(uri) = 402 link.get("uri").and_then(|u| u.as_str()) 403 { 404 uris.push(crate::atproto::lexicon::community::lexicon::calendar::event::EventLink::Current { 405 uri: uri.to_string(), 406 name: None, 407 }); 408 } 409 } 410 } 411 } 412 413 EventDetails { 414 name: Cow::Owned(name.clone()), 415 description: Cow::Owned(text.clone().unwrap_or_default()), 416 created_at, 417 starts_at, 418 ends_at: ends_at.map(Some).unwrap_or(None), 419 mode: mode.map(Cow::Owned), 420 status: status.map(Cow::Owned), 421 locations, 422 uris, 423 } 424 } 425 } 426 } else { 427 // Fallback to the event's direct name if parsing fails 428 EventDetails { 429 name: Cow::Owned(event.name.clone()), 430 description: Cow::Borrowed(""), 431 created_at: None, 432 starts_at: None, 433 ends_at: None, 434 mode: None, 435 status: None, 436 locations: vec![], 437 uris: vec![], 438 } 439 } 440 } 441 _ => { 442 // Unknown event type - use the stored name 443 EventDetails { 444 name: Cow::Owned(event.name.clone()), 445 description: Cow::Borrowed(""), 446 created_at: None, 447 starts_at: None, 448 ends_at: None, 449 mode: None, 450 status: None, 451 locations: vec![], 452 uris: vec![], 453 } 454 } 455 } 456} 457 458// Structure to hold extracted event details regardless of source format 459#[derive(Debug, Clone)] 460pub struct EventDetails { 461 pub name: Cow<'static, str>, 462 pub description: Cow<'static, str>, 463 pub created_at: Option<chrono::DateTime<chrono::Utc>>, 464 pub starts_at: Option<chrono::DateTime<chrono::Utc>>, 465 pub ends_at: Option<chrono::DateTime<chrono::Utc>>, 466 pub mode: Option<Cow<'static, str>>, 467 pub status: Option<Cow<'static, str>>, 468 pub locations: Vec<crate::atproto::lexicon::community::lexicon::calendar::event::EventLocation>, 469 pub uris: Vec<crate::atproto::lexicon::community::lexicon::calendar::event::EventLink>, 470} 471 472pub async fn event_get(pool: &StoragePool, aturi: &str) -> Result<Event, StorageError> { 473 // Validate aturi is not empty 474 if 
aturi.trim().is_empty() { 475 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol( 476 "Event URI cannot be empty".into(), 477 ))); 478 } 479 480 let mut tx = pool 481 .begin() 482 .await 483 .map_err(StorageError::CannotBeginDatabaseTransaction)?; 484 485 let record = sqlx::query_as::<_, Event>("SELECT * FROM events WHERE aturi = $1") 486 .bind(aturi) 487 .fetch_one(tx.as_mut()) 488 .await 489 .map_err(|err| match err { 490 sqlx::Error::RowNotFound => StorageError::RowNotFound("event".to_string(), err), 491 other => StorageError::UnableToExecuteQuery(other), 492 })?; 493 494 tx.commit() 495 .await 496 .map_err(StorageError::CannotCommitDatabaseTransaction)?; 497 498 Ok(record) 499} 500 501pub async fn event_exists(pool: &StoragePool, aturi: &str) -> Result<bool, StorageError> { 502 // Validate aturi is not empty 503 if aturi.trim().is_empty() { 504 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol( 505 "Event URI cannot be empty".into(), 506 ))); 507 } 508 509 let mut tx = pool 510 .begin() 511 .await 512 .map_err(StorageError::CannotBeginDatabaseTransaction)?; 513 514 let total_count = sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM events WHERE aturi = $1") 515 .bind(aturi) 516 .fetch_one(tx.as_mut()) 517 .await 518 .map_err(StorageError::UnableToExecuteQuery)?; 519 520 tx.commit() 521 .await 522 .map_err(StorageError::CannotCommitDatabaseTransaction)?; 523 524 Ok(total_count > 0) 525} 526 527pub async fn event_get_cid( 528 pool: &StoragePool, 529 aturi: &str, 530) -> Result<Option<String>, StorageError> { 531 // Validate aturi is not empty 532 if aturi.trim().is_empty() { 533 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol( 534 "Event URI cannot be empty".into(), 535 ))); 536 } 537 538 let mut tx = pool 539 .begin() 540 .await 541 .map_err(StorageError::CannotBeginDatabaseTransaction)?; 542 543 let record = sqlx::query_scalar::<_, String>("SELECT cid FROM events WHERE aturi = $1") 544 .bind(aturi) 545 
.fetch_optional(tx.as_mut()) 546 .await 547 .map_err(StorageError::UnableToExecuteQuery)?; 548 549 tx.commit() 550 .await 551 .map_err(StorageError::CannotCommitDatabaseTransaction)?; 552 553 Ok(record) 554} 555 556pub async fn event_list_did_recently_updated( 557 pool: &StoragePool, 558 did: &str, 559 page: i64, 560 page_size: i64, 561) -> Result<Vec<EventWithRole>, StorageError> { 562 // Validate did is not empty 563 if did.trim().is_empty() { 564 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol( 565 "DID cannot be empty".into(), 566 ))); 567 } 568 569 // Validate page and page_size are positive 570 if page < 1 || page_size < 1 { 571 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol( 572 "Page and page size must be positive".into(), 573 ))); 574 } 575 576 let mut tx = pool 577 .begin() 578 .await 579 .map_err(StorageError::CannotBeginDatabaseTransaction)?; 580 581 let offset = (page - 1) * page_size; 582 583 let events_query = r"SELECT 584 events.*, 585 'organizer' as role 586FROM 587 events 588WHERE 589 events.did = $1 590ORDER BY 591 events.updated_at DESC, 592 events.aturi ASC 593LIMIT 594$2 595OFFSET 596$3 597"; 598 599 let event_roles = sqlx::query_as::<_, EventWithRole>(events_query) 600 .bind(did) 601 .bind(page_size + 1) 602 .bind(offset) 603 .fetch_all(tx.as_mut()) 604 .await 605 .map_err(StorageError::UnableToExecuteQuery)?; 606 607 tx.commit() 608 .await 609 .map_err(StorageError::CannotCommitDatabaseTransaction)?; 610 611 Ok(event_roles) 612} 613 614pub async fn event_list_recently_updated( 615 pool: &StoragePool, 616 page: i64, 617 page_size: i64, 618) -> Result<Vec<EventWithRole>, StorageError> { 619 // Validate page and page_size are positive 620 if page < 1 || page_size < 1 { 621 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol( 622 "Page and page size must be positive".into(), 623 ))); 624 } 625 626 let mut tx = pool 627 .begin() 628 .await 629 
.map_err(StorageError::CannotBeginDatabaseTransaction)?; 630 631 let offset = (page - 1) * page_size; 632 633 let events_query = r"SELECT 634 events.*, 635 'organizer' as role 636 FROM 637 events 638 ORDER BY 639 events.updated_at DESC, 640 events.aturi ASC 641 LIMIT $1 642 OFFSET $2"; 643 644 let event_roles = sqlx::query_as::<_, EventWithRole>(events_query) 645 .bind(page_size + 1) 646 .bind(offset) 647 .fetch_all(tx.as_mut()) 648 .await 649 .map_err(StorageError::UnableToExecuteQuery)?; 650 651 tx.commit() 652 .await 653 .map_err(StorageError::CannotCommitDatabaseTransaction)?; 654 655 Ok(event_roles) 656} 657 658pub async fn get_event_rsvps( 659 pool: &StoragePool, 660 event_aturi: &str, 661 status: Option<&str>, 662) -> Result<Vec<(String, String)>, StorageError> { 663 // Validate event_aturi is not empty 664 if event_aturi.trim().is_empty() { 665 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol( 666 "Event URI cannot be empty".into(), 667 ))); 668 } 669 670 // If status is provided, validate it's not empty 671 if let Some(status_val) = status { 672 if status_val.trim().is_empty() { 673 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol( 674 "Status cannot be empty".into(), 675 ))); 676 } 677 } 678 679 let mut tx = pool 680 .begin() 681 .await 682 .map_err(StorageError::CannotBeginDatabaseTransaction)?; 683 684 let query = if status.is_some() { 685 "SELECT did, status FROM rsvps WHERE event_aturi = $1 AND status = $2" 686 } else { 687 "SELECT did, status FROM rsvps WHERE event_aturi = $1" 688 }; 689 690 let rsvps = if let Some(status_value) = status { 691 sqlx::query_as::<_, (String, String)>(query) 692 .bind(event_aturi) 693 .bind(status_value) 694 .fetch_all(tx.as_mut()) 695 .await 696 } else { 697 sqlx::query_as::<_, (String, String)>(query) 698 .bind(event_aturi) 699 .fetch_all(tx.as_mut()) 700 .await 701 } 702 .map_err(StorageError::UnableToExecuteQuery)?; 703 704 tx.commit() 705 .await 706 
.map_err(StorageError::CannotCommitDatabaseTransaction)?; 707 708 Ok(rsvps) 709} 710 711pub async fn get_user_rsvp( 712 pool: &StoragePool, 713 event_aturi: &str, 714 did: &str, 715) -> Result<Option<String>, StorageError> { 716 // Validate event_aturi is not empty 717 if event_aturi.trim().is_empty() { 718 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol( 719 "Event URI cannot be empty".into(), 720 ))); 721 } 722 723 // Validate did is not empty 724 if did.trim().is_empty() { 725 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol( 726 "DID cannot be empty".into(), 727 ))); 728 } 729 730 let mut tx = pool 731 .begin() 732 .await 733 .map_err(StorageError::CannotBeginDatabaseTransaction)?; 734 735 let status = sqlx::query_scalar::<_, String>( 736 "SELECT status FROM rsvps WHERE event_aturi = $1 AND did = $2", 737 ) 738 .bind(event_aturi) 739 .bind(did) 740 .fetch_optional(tx.as_mut()) 741 .await 742 .map_err(StorageError::UnableToExecuteQuery)?; 743 744 tx.commit() 745 .await 746 .map_err(StorageError::CannotCommitDatabaseTransaction)?; 747 748 Ok(status) 749} 750 751pub async fn rsvp_get(pool: &StoragePool, aturi: &str) -> Result<Option<Rsvp>, StorageError> { 752 // Validate aturi is not empty 753 if aturi.trim().is_empty() { 754 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol( 755 "RSVP URI cannot be empty".into(), 756 ))); 757 } 758 759 let mut tx = pool 760 .begin() 761 .await 762 .map_err(StorageError::CannotBeginDatabaseTransaction)?; 763 764 let rsvp = sqlx::query_as::<_, Rsvp>("SELECT * FROM rsvps WHERE aturi = $1") 765 .bind(aturi) 766 .fetch_optional(tx.as_mut()) 767 .await 768 .map_err(|err| match err { 769 sqlx::Error::RowNotFound => StorageError::RSVPNotFound, 770 other => StorageError::UnableToExecuteQuery(other), 771 })?; 772 773 tx.commit() 774 .await 775 .map_err(StorageError::CannotCommitDatabaseTransaction)?; 776 777 Ok(rsvp) 778} 779 780pub async fn rsvp_list( 781 pool: &StoragePool, 782 
page: i64, 783 page_size: i64, 784) -> Result<(i64, Vec<Rsvp>), StorageError> { 785 // Validate page and page_size are positive 786 if page < 1 || page_size < 1 { 787 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol( 788 "Page and page size must be positive".into(), 789 ))); 790 } 791 792 let mut tx = pool 793 .begin() 794 .await 795 .map_err(StorageError::CannotBeginDatabaseTransaction)?; 796 797 let total_count = sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM rsvps") 798 .fetch_one(tx.as_mut()) 799 .await 800 .map_err(StorageError::UnableToExecuteQuery)?; 801 802 let offset = (page - 1) * page_size; 803 804 let rsvps = sqlx::query_as::<_, Rsvp>( 805 r"SELECT * FROM rsvps ORDER BY rsvps.updated_at DESC LIMIT $1 OFFSET $2", 806 ) 807 .bind(page_size + 1) // Fetch one more to know if there are more entries 808 .bind(offset) 809 .fetch_all(tx.as_mut()) 810 .await 811 .map_err(StorageError::UnableToExecuteQuery)?; 812 813 tx.commit() 814 .await 815 .map_err(StorageError::CannotCommitDatabaseTransaction)?; 816 817 Ok((total_count, rsvps)) 818} 819 820pub async fn event_update_with_metadata<T: serde::Serialize>( 821 pool: &StoragePool, 822 aturi: &str, 823 cid: &str, 824 record: &T, 825 name: &str, 826) -> Result<(), StorageError> { 827 // Validate inputs 828 if aturi.trim().is_empty() { 829 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol( 830 "Event URI cannot be empty".into(), 831 ))); 832 } 833 834 if cid.trim().is_empty() { 835 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol( 836 "CID cannot be empty".into(), 837 ))); 838 } 839 840 if name.trim().is_empty() { 841 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol( 842 "Name cannot be empty".into(), 843 ))); 844 } 845 846 let mut tx = pool 847 .begin() 848 .await 849 .map_err(StorageError::CannotBeginDatabaseTransaction)?; 850 851 let now = Utc::now(); 852 853 sqlx::query( 854 "UPDATE events SET cid = $1, record = $2, name = $3, 
updated_at = $4 WHERE aturi = $5", 855 ) 856 .bind(cid) 857 .bind(json!(record)) 858 .bind(name) 859 .bind(now) 860 .bind(aturi) 861 .execute(tx.as_mut()) 862 .await 863 .map_err(StorageError::UnableToExecuteQuery)?; 864 865 tx.commit() 866 .await 867 .map_err(StorageError::CannotCommitDatabaseTransaction) 868} 869 870pub async fn count_event_rsvps( 871 pool: &StoragePool, 872 event_aturi: &str, 873 status: &str, 874) -> Result<u32, StorageError> { 875 // Validate inputs 876 if event_aturi.trim().is_empty() { 877 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol( 878 "Event URI cannot be empty".into(), 879 ))); 880 } 881 882 if status.trim().is_empty() { 883 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol( 884 "Status cannot be empty".into(), 885 ))); 886 } 887 888 let mut tx = pool 889 .begin() 890 .await 891 .map_err(StorageError::CannotBeginDatabaseTransaction)?; 892 893 let count = sqlx::query_scalar::<_, i64>( 894 "SELECT COUNT(*) FROM rsvps WHERE event_aturi = $1 AND status = $2", 895 ) 896 .bind(event_aturi) 897 .bind(status) 898 .fetch_one(tx.as_mut()) 899 .await 900 .map_err(StorageError::UnableToExecuteQuery)?; 901 902 tx.commit() 903 .await 904 .map_err(StorageError::CannotCommitDatabaseTransaction)?; 905 906 Ok(count as u32) 907} 908 909pub async fn get_event_rsvp_counts( 910 pool: &StoragePool, 911 aturis: Vec<String>, 912) -> Result<HashMap<(std::string::String, std::string::String), i64>, StorageError> { 913 // Handle empty list case 914 if aturis.is_empty() { 915 return Ok(HashMap::new()); 916 } 917 918 // Validate all aturis are non-empty 919 for aturi in &aturis { 920 if aturi.trim().is_empty() { 921 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol( 922 "Event URI cannot be empty".into(), 923 ))); 924 } 925 } 926 927 let mut tx = pool 928 .begin() 929 .await 930 .map_err(StorageError::CannotBeginDatabaseTransaction)?; 931 932 let mut query_builder: QueryBuilder<Postgres> = QueryBuilder::new( 
933 "SELECT event_aturi, status, COUNT(*) as count FROM rsvps WHERE event_aturi IN (", 934 ); 935 let mut separated = query_builder.separated(", "); 936 for aturi in &aturis { 937 separated.push_bind(aturi); 938 } 939 separated.push_unseparated(") GROUP BY event_aturi, status"); 940 941 // Use build_query_as to correctly include the bindings 942 let query = query_builder.build_query_as::<(String, String, i64)>(); 943 let values = query 944 .fetch_all(tx.as_mut()) 945 .await 946 .map_err(StorageError::UnableToExecuteQuery)?; 947 948 tx.commit() 949 .await 950 .map_err(StorageError::CannotCommitDatabaseTransaction)?; 951 952 Ok(HashMap::from_iter(values.iter().map( 953 |(aturi, status, count)| ((aturi.clone(), status.clone()), *count), 954 ))) 955} 956 957pub async fn event_list( 958 pool: &StoragePool, 959 page: i64, 960 page_size: i64, 961) -> Result<(i64, Vec<Event>), StorageError> { 962 // Validate page and page_size are positive 963 if page < 1 || page_size < 1 { 964 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol( 965 "Page and page size must be positive".into(), 966 ))); 967 } 968 969 let mut tx = pool 970 .begin() 971 .await 972 .map_err(StorageError::CannotBeginDatabaseTransaction)?; 973 974 let total_count = sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM events") 975 .fetch_one(tx.as_mut()) 976 .await 977 .map_err(StorageError::UnableToExecuteQuery)?; 978 979 let offset = (page - 1) * page_size; 980 981 let events = sqlx::query_as::<_, Event>( 982 "SELECT * FROM events ORDER BY updated_at DESC LIMIT $1 OFFSET $2", 983 ) 984 .bind(page_size + 1) // Fetch one more to know if there are more entries 985 .bind(offset) 986 .fetch_all(tx.as_mut()) 987 .await 988 .map_err(StorageError::UnableToExecuteQuery)?; 989 990 tx.commit() 991 .await 992 .map_err(StorageError::CannotCommitDatabaseTransaction)?; 993 994 Ok((total_count, events)) 995}