forked from
smokesignal.events/smokesignal
i18n+filtering fork - fluent-templates v2
1use std::borrow::Cow;
2use std::collections::HashMap;
3
4use anyhow::Result;
5use chrono::Utc;
6use serde_json::json;
7use sqlx::{Postgres, QueryBuilder};
8
9use crate::atproto::lexicon::community::lexicon::calendar::event::Event as EventLexicon;
10use crate::atproto::lexicon::community::lexicon::calendar::rsvp::{
11 Rsvp as RsvpLexicon, RsvpStatus as RsvpStatusLexicon,
12};
13
14use super::errors::StorageError;
15use super::StoragePool;
16use model::{Event, EventWithRole, Rsvp};
17
18pub mod model {
19 use chrono::{DateTime, Utc};
20 use serde::{Deserialize, Serialize};
21 use sqlx::FromRow;
22
23 #[derive(Clone, FromRow, Deserialize, Serialize, Debug)]
24 pub struct Event {
25 pub aturi: String,
26 pub cid: String,
27
28 pub did: String,
29 pub lexicon: String,
30
31 pub record: sqlx::types::Json<serde_json::Value>,
32
33 pub name: String,
34
35 pub updated_at: Option<DateTime<Utc>>,
36 }
37
38 #[derive(Clone, FromRow, Debug, Serialize)]
39 pub struct EventWithRole {
40 #[sqlx(flatten)]
41 pub event: Event,
42
43 pub role: String,
44 // pub event_handle: String,
45 }
46
47 #[derive(Clone, FromRow, Deserialize, Serialize, Debug)]
48 pub struct Rsvp {
49 pub aturi: String,
50 pub cid: String,
51
52 pub did: String,
53 pub lexicon: String,
54
55 pub record: sqlx::types::Json<serde_json::Value>,
56
57 pub event_aturi: String,
58 pub event_cid: String,
59 pub status: String,
60
61 pub updated_at: Option<DateTime<Utc>>,
62 }
63}
64
65pub async fn event_insert(
66 pool: &StoragePool,
67 aturi: &str,
68 cid: &str,
69 did: &str,
70 lexicon: &str,
71 record: &EventLexicon,
72) -> Result<(), StorageError> {
73 // Extract name from the record
74 let name = match record {
75 EventLexicon::Current { name, .. } => name,
76 };
77
78 // Call the new function with extracted values
79 event_insert_with_metadata(pool, aturi, cid, did, lexicon, record, name).await
80}
81
82pub async fn event_insert_with_metadata<T: serde::Serialize>(
83 pool: &StoragePool,
84 aturi: &str,
85 cid: &str,
86 did: &str,
87 lexicon: &str,
88 record: &T,
89 name: &str,
90) -> Result<(), StorageError> {
91 let mut tx = pool
92 .begin()
93 .await
94 .map_err(StorageError::CannotBeginDatabaseTransaction)?;
95
96 let now = Utc::now();
97
98 sqlx::query("INSERT INTO events (aturi, cid, did, lexicon, record, name, updated_at) VALUES ($1, $2, $3, $4, $5, $6, $7)")
99 .bind(aturi)
100 .bind(cid)
101 .bind(did)
102 .bind(lexicon)
103 .bind(json!(record))
104 .bind(name)
105 .bind(now)
106 .execute(tx.as_mut())
107 .await
108 .map_err(StorageError::UnableToExecuteQuery)?;
109
110 tx.commit()
111 .await
112 .map_err(StorageError::CannotCommitDatabaseTransaction)
113}
114
115pub struct RsvpInsertParams<'a, T: serde::Serialize> {
116 pub aturi: &'a str,
117 pub cid: &'a str,
118 pub did: &'a str,
119 pub lexicon: &'a str,
120 pub record: &'a T,
121 pub event_aturi: &'a str,
122 pub event_cid: &'a str,
123 pub status: &'a str,
124}
125
126pub async fn rsvp_insert_with_metadata<T: serde::Serialize>(
127 pool: &StoragePool,
128 params: RsvpInsertParams<'_, T>,
129) -> Result<(), StorageError> {
130 let mut tx = pool
131 .begin()
132 .await
133 .map_err(StorageError::CannotBeginDatabaseTransaction)?;
134
135 let now = Utc::now();
136
137 sqlx::query("INSERT INTO rsvps (aturi, cid, did, lexicon, record, event_aturi, event_cid, status, updated_at) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (aturi) DO UPDATE SET record = $5, cid = $2, status = $8, updated_at = $9")
138 .bind(params.aturi)
139 .bind(params.cid)
140 .bind(params.did)
141 .bind(params.lexicon)
142 .bind(json!(params.record))
143 .bind(params.event_aturi)
144 .bind(params.event_cid)
145 .bind(params.status)
146 .bind(now)
147 .execute(tx.as_mut())
148 .await
149 .map_err(StorageError::UnableToExecuteQuery)?;
150
151 tx.commit()
152 .await
153 .map_err(StorageError::CannotCommitDatabaseTransaction)
154}
155
156pub async fn rsvp_insert(
157 pool: &StoragePool,
158 aturi: &str,
159 cid: &str,
160 did: &str,
161 lexicon: &str,
162 record: &RsvpLexicon,
163) -> Result<(), StorageError> {
164 // Extract the metadata from the record
165 let (event_aturi, event_cid, status) = match record {
166 RsvpLexicon::Current {
167 subject, status, ..
168 } => {
169 let event_aturi = subject.uri.clone();
170 let event_cid = subject.cid.clone();
171 let status = match status {
172 RsvpStatusLexicon::Going => "going",
173 RsvpStatusLexicon::Interested => "interested",
174 RsvpStatusLexicon::NotGoing => "notgoing",
175 };
176 (event_aturi, event_cid, status)
177 }
178 };
179
180 // Call the generic function with extracted values
181 rsvp_insert_with_metadata(
182 pool,
183 RsvpInsertParams {
184 aturi,
185 cid,
186 did,
187 lexicon,
188 record,
189 event_aturi: &event_aturi,
190 event_cid: &event_cid,
191 status,
192 },
193 )
194 .await
195}
196
197// Helper function to extract event information based on lexicon type
198// Helper function to format address information into a readable string
199pub fn format_address(
200 address: &crate::atproto::lexicon::community::lexicon::location::Address,
201) -> String {
202 match address {
203 crate::atproto::lexicon::community::lexicon::location::Address::Current {
204 country,
205 postal_code,
206 region,
207 locality,
208 street,
209 name,
210 } => {
211 let mut parts = Vec::new();
212
213 // Add parts in specified order, omitting empty values
214 if let Some(name_val) = name {
215 if !name_val.trim().is_empty() {
216 parts.push(name_val.clone());
217 }
218 }
219
220 if let Some(street_val) = street {
221 if !street_val.trim().is_empty() {
222 parts.push(street_val.clone());
223 }
224 }
225
226 if let Some(locality_val) = locality {
227 if !locality_val.trim().is_empty() {
228 parts.push(locality_val.clone());
229 }
230 }
231
232 if let Some(region_val) = region {
233 if !region_val.trim().is_empty() {
234 parts.push(region_val.clone());
235 }
236 }
237
238 if let Some(postal_val) = postal_code {
239 if !postal_val.trim().is_empty() {
240 parts.push(postal_val.clone());
241 }
242 }
243
244 // Country is required so no need to check if it's empty
245 parts.push(country.clone());
246
247 // Join parts with commas
248 parts.join(", ")
249 }
250 }
251}
252
253pub fn extract_event_details(event: &Event) -> EventDetails {
254 use crate::atproto::lexicon::{
255 community::lexicon::calendar::event::{Event as CommunityEvent, Mode, Status},
256 events::smokesignal::calendar::event::Event as SmokeSignalEvent,
257 };
258
259 // Try to parse the record based on the lexicon
260 match event.lexicon.as_str() {
261 "community.lexicon.calendar.event" => {
262 if let Ok(community_event) =
263 serde_json::from_value::<CommunityEvent>(event.record.0.clone())
264 {
265 match community_event {
266 CommunityEvent::Current {
267 name,
268 description,
269 created_at,
270 starts_at,
271 ends_at,
272 mode,
273 status,
274 locations,
275 uris,
276 ..
277 } => EventDetails {
278 name: Cow::Owned(name.clone()),
279 description: Cow::Owned(description.clone()),
280 created_at: Some(created_at),
281 starts_at,
282 ends_at,
283 mode: mode.map(|m| match m {
284 Mode::InPerson => {
285 Cow::Borrowed("community.lexicon.calendar.event#inperson")
286 }
287 Mode::Virtual => {
288 Cow::Borrowed("community.lexicon.calendar.event#virtual")
289 }
290 Mode::Hybrid => {
291 Cow::Borrowed("community.lexicon.calendar.event#hybrid")
292 }
293 }),
294 status: status.map(|s| match s {
295 Status::Scheduled => {
296 Cow::Borrowed("community.lexicon.calendar.event#scheduled")
297 }
298 Status::Rescheduled => {
299 Cow::Borrowed("community.lexicon.calendar.event#rescheduled")
300 }
301 Status::Cancelled => {
302 Cow::Borrowed("community.lexicon.calendar.event#cancelled")
303 }
304 Status::Postponed => {
305 Cow::Borrowed("community.lexicon.calendar.event#postponed")
306 }
307 Status::Planned => {
308 Cow::Borrowed("community.lexicon.calendar.event#planned")
309 }
310 }),
311 locations,
312 uris,
313 },
314 }
315 } else {
316 // Fallback to the event's direct name if parsing fails
317 EventDetails {
318 name: Cow::Owned(event.name.clone()),
319 description: Cow::Borrowed(""),
320 created_at: None,
321 starts_at: None,
322 ends_at: None,
323 mode: None,
324 status: None,
325 locations: vec![],
326 uris: vec![],
327 }
328 }
329 }
330 "events.smokesignal.calendar.event" => {
331 if let Ok(ss_event) = serde_json::from_value::<SmokeSignalEvent>(event.record.0.clone())
332 {
333 match ss_event {
334 SmokeSignalEvent::Current {
335 name,
336 text,
337 created_at,
338 starts_at,
339 extra,
340 ..
341 } => {
342 // Extract additional fields from extra map
343 let ends_at = extra
344 .get("endsAt")
345 .and_then(|v| v.as_str())
346 .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
347 .map(|dt| dt.with_timezone(&chrono::Utc));
348
349 let mode = extra
350 .get("mode")
351 .and_then(|v| v.as_str().map(ToString::to_string));
352 let status = extra
353 .get("status")
354 .and_then(|v| v.as_str().map(ToString::to_string));
355
356 // Convert locations to the same format used by community.lexicon.calendar.event
357 // Process locations from extra data if available
358 let locations = Vec::new();
359
360 // Extract links from location data
361 let mut uris = Vec::new();
362
363 // Check for virtual locations in the location array
364 if let Some(location_value) = extra.get("location") {
365 if let Some(location_array) = location_value.as_array() {
366 for loc in location_array {
367 if let Some(loc_type) = loc.get("$type") {
368 if let Some(loc_type_str) = loc_type.as_str() {
369 // Handle virtual locations as links
370 if loc_type_str
371 == "events.smokesignal.calendar.location#virtual"
372 {
373 if let (Some(url), Some(name)) = (
374 loc.get("url").and_then(|u| u.as_str()),
375 loc.get("name").and_then(|n| n.as_str()),
376 ) {
377 uris.push(crate::atproto::lexicon::community::lexicon::calendar::event::EventLink::Current {
378 uri: url.to_string(),
379 name: Some(name.to_string()),
380 });
381 }
382 }
383 }
384 }
385 }
386 }
387 }
388
389 // Also check for any additional URIs in the extra map
390 if let Some(links_value) = extra.get("links") {
391 if let Some(links_array) = links_value.as_array() {
392 for link in links_array {
393 if let (Some(uri), Some(name)) = (
394 link.get("uri").and_then(|u| u.as_str()),
395 link.get("name").and_then(|n| n.as_str()),
396 ) {
397 uris.push(crate::atproto::lexicon::community::lexicon::calendar::event::EventLink::Current {
398 uri: uri.to_string(),
399 name: Some(name.to_string()),
400 });
401 } else if let Some(uri) =
402 link.get("uri").and_then(|u| u.as_str())
403 {
404 uris.push(crate::atproto::lexicon::community::lexicon::calendar::event::EventLink::Current {
405 uri: uri.to_string(),
406 name: None,
407 });
408 }
409 }
410 }
411 }
412
413 EventDetails {
414 name: Cow::Owned(name.clone()),
415 description: Cow::Owned(text.clone().unwrap_or_default()),
416 created_at,
417 starts_at,
418 ends_at: ends_at.map(Some).unwrap_or(None),
419 mode: mode.map(Cow::Owned),
420 status: status.map(Cow::Owned),
421 locations,
422 uris,
423 }
424 }
425 }
426 } else {
427 // Fallback to the event's direct name if parsing fails
428 EventDetails {
429 name: Cow::Owned(event.name.clone()),
430 description: Cow::Borrowed(""),
431 created_at: None,
432 starts_at: None,
433 ends_at: None,
434 mode: None,
435 status: None,
436 locations: vec![],
437 uris: vec![],
438 }
439 }
440 }
441 _ => {
442 // Unknown event type - use the stored name
443 EventDetails {
444 name: Cow::Owned(event.name.clone()),
445 description: Cow::Borrowed(""),
446 created_at: None,
447 starts_at: None,
448 ends_at: None,
449 mode: None,
450 status: None,
451 locations: vec![],
452 uris: vec![],
453 }
454 }
455 }
456}
457
458// Structure to hold extracted event details regardless of source format
459#[derive(Debug, Clone)]
460pub struct EventDetails {
461 pub name: Cow<'static, str>,
462 pub description: Cow<'static, str>,
463 pub created_at: Option<chrono::DateTime<chrono::Utc>>,
464 pub starts_at: Option<chrono::DateTime<chrono::Utc>>,
465 pub ends_at: Option<chrono::DateTime<chrono::Utc>>,
466 pub mode: Option<Cow<'static, str>>,
467 pub status: Option<Cow<'static, str>>,
468 pub locations: Vec<crate::atproto::lexicon::community::lexicon::calendar::event::EventLocation>,
469 pub uris: Vec<crate::atproto::lexicon::community::lexicon::calendar::event::EventLink>,
470}
471
472pub async fn event_get(pool: &StoragePool, aturi: &str) -> Result<Event, StorageError> {
473 // Validate aturi is not empty
474 if aturi.trim().is_empty() {
475 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol(
476 "Event URI cannot be empty".into(),
477 )));
478 }
479
480 let mut tx = pool
481 .begin()
482 .await
483 .map_err(StorageError::CannotBeginDatabaseTransaction)?;
484
485 let record = sqlx::query_as::<_, Event>("SELECT * FROM events WHERE aturi = $1")
486 .bind(aturi)
487 .fetch_one(tx.as_mut())
488 .await
489 .map_err(|err| match err {
490 sqlx::Error::RowNotFound => StorageError::RowNotFound("event".to_string(), err),
491 other => StorageError::UnableToExecuteQuery(other),
492 })?;
493
494 tx.commit()
495 .await
496 .map_err(StorageError::CannotCommitDatabaseTransaction)?;
497
498 Ok(record)
499}
500
501pub async fn event_exists(pool: &StoragePool, aturi: &str) -> Result<bool, StorageError> {
502 // Validate aturi is not empty
503 if aturi.trim().is_empty() {
504 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol(
505 "Event URI cannot be empty".into(),
506 )));
507 }
508
509 let mut tx = pool
510 .begin()
511 .await
512 .map_err(StorageError::CannotBeginDatabaseTransaction)?;
513
514 let total_count = sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM events WHERE aturi = $1")
515 .bind(aturi)
516 .fetch_one(tx.as_mut())
517 .await
518 .map_err(StorageError::UnableToExecuteQuery)?;
519
520 tx.commit()
521 .await
522 .map_err(StorageError::CannotCommitDatabaseTransaction)?;
523
524 Ok(total_count > 0)
525}
526
527pub async fn event_get_cid(
528 pool: &StoragePool,
529 aturi: &str,
530) -> Result<Option<String>, StorageError> {
531 // Validate aturi is not empty
532 if aturi.trim().is_empty() {
533 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol(
534 "Event URI cannot be empty".into(),
535 )));
536 }
537
538 let mut tx = pool
539 .begin()
540 .await
541 .map_err(StorageError::CannotBeginDatabaseTransaction)?;
542
543 let record = sqlx::query_scalar::<_, String>("SELECT cid FROM events WHERE aturi = $1")
544 .bind(aturi)
545 .fetch_optional(tx.as_mut())
546 .await
547 .map_err(StorageError::UnableToExecuteQuery)?;
548
549 tx.commit()
550 .await
551 .map_err(StorageError::CannotCommitDatabaseTransaction)?;
552
553 Ok(record)
554}
555
556pub async fn event_list_did_recently_updated(
557 pool: &StoragePool,
558 did: &str,
559 page: i64,
560 page_size: i64,
561) -> Result<Vec<EventWithRole>, StorageError> {
562 // Validate did is not empty
563 if did.trim().is_empty() {
564 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol(
565 "DID cannot be empty".into(),
566 )));
567 }
568
569 // Validate page and page_size are positive
570 if page < 1 || page_size < 1 {
571 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol(
572 "Page and page size must be positive".into(),
573 )));
574 }
575
576 let mut tx = pool
577 .begin()
578 .await
579 .map_err(StorageError::CannotBeginDatabaseTransaction)?;
580
581 let offset = (page - 1) * page_size;
582
583 let events_query = r"SELECT
584 events.*,
585 'organizer' as role
586FROM
587 events
588WHERE
589 events.did = $1
590ORDER BY
591 events.updated_at DESC,
592 events.aturi ASC
593LIMIT
594$2
595OFFSET
596$3
597";
598
599 let event_roles = sqlx::query_as::<_, EventWithRole>(events_query)
600 .bind(did)
601 .bind(page_size + 1)
602 .bind(offset)
603 .fetch_all(tx.as_mut())
604 .await
605 .map_err(StorageError::UnableToExecuteQuery)?;
606
607 tx.commit()
608 .await
609 .map_err(StorageError::CannotCommitDatabaseTransaction)?;
610
611 Ok(event_roles)
612}
613
614pub async fn event_list_recently_updated(
615 pool: &StoragePool,
616 page: i64,
617 page_size: i64,
618) -> Result<Vec<EventWithRole>, StorageError> {
619 // Validate page and page_size are positive
620 if page < 1 || page_size < 1 {
621 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol(
622 "Page and page size must be positive".into(),
623 )));
624 }
625
626 let mut tx = pool
627 .begin()
628 .await
629 .map_err(StorageError::CannotBeginDatabaseTransaction)?;
630
631 let offset = (page - 1) * page_size;
632
633 let events_query = r"SELECT
634 events.*,
635 'organizer' as role
636 FROM
637 events
638 ORDER BY
639 events.updated_at DESC,
640 events.aturi ASC
641 LIMIT $1
642 OFFSET $2";
643
644 let event_roles = sqlx::query_as::<_, EventWithRole>(events_query)
645 .bind(page_size + 1)
646 .bind(offset)
647 .fetch_all(tx.as_mut())
648 .await
649 .map_err(StorageError::UnableToExecuteQuery)?;
650
651 tx.commit()
652 .await
653 .map_err(StorageError::CannotCommitDatabaseTransaction)?;
654
655 Ok(event_roles)
656}
657
658pub async fn get_event_rsvps(
659 pool: &StoragePool,
660 event_aturi: &str,
661 status: Option<&str>,
662) -> Result<Vec<(String, String)>, StorageError> {
663 // Validate event_aturi is not empty
664 if event_aturi.trim().is_empty() {
665 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol(
666 "Event URI cannot be empty".into(),
667 )));
668 }
669
670 // If status is provided, validate it's not empty
671 if let Some(status_val) = status {
672 if status_val.trim().is_empty() {
673 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol(
674 "Status cannot be empty".into(),
675 )));
676 }
677 }
678
679 let mut tx = pool
680 .begin()
681 .await
682 .map_err(StorageError::CannotBeginDatabaseTransaction)?;
683
684 let query = if status.is_some() {
685 "SELECT did, status FROM rsvps WHERE event_aturi = $1 AND status = $2"
686 } else {
687 "SELECT did, status FROM rsvps WHERE event_aturi = $1"
688 };
689
690 let rsvps = if let Some(status_value) = status {
691 sqlx::query_as::<_, (String, String)>(query)
692 .bind(event_aturi)
693 .bind(status_value)
694 .fetch_all(tx.as_mut())
695 .await
696 } else {
697 sqlx::query_as::<_, (String, String)>(query)
698 .bind(event_aturi)
699 .fetch_all(tx.as_mut())
700 .await
701 }
702 .map_err(StorageError::UnableToExecuteQuery)?;
703
704 tx.commit()
705 .await
706 .map_err(StorageError::CannotCommitDatabaseTransaction)?;
707
708 Ok(rsvps)
709}
710
711pub async fn get_user_rsvp(
712 pool: &StoragePool,
713 event_aturi: &str,
714 did: &str,
715) -> Result<Option<String>, StorageError> {
716 // Validate event_aturi is not empty
717 if event_aturi.trim().is_empty() {
718 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol(
719 "Event URI cannot be empty".into(),
720 )));
721 }
722
723 // Validate did is not empty
724 if did.trim().is_empty() {
725 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol(
726 "DID cannot be empty".into(),
727 )));
728 }
729
730 let mut tx = pool
731 .begin()
732 .await
733 .map_err(StorageError::CannotBeginDatabaseTransaction)?;
734
735 let status = sqlx::query_scalar::<_, String>(
736 "SELECT status FROM rsvps WHERE event_aturi = $1 AND did = $2",
737 )
738 .bind(event_aturi)
739 .bind(did)
740 .fetch_optional(tx.as_mut())
741 .await
742 .map_err(StorageError::UnableToExecuteQuery)?;
743
744 tx.commit()
745 .await
746 .map_err(StorageError::CannotCommitDatabaseTransaction)?;
747
748 Ok(status)
749}
750
751pub async fn rsvp_get(pool: &StoragePool, aturi: &str) -> Result<Option<Rsvp>, StorageError> {
752 // Validate aturi is not empty
753 if aturi.trim().is_empty() {
754 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol(
755 "RSVP URI cannot be empty".into(),
756 )));
757 }
758
759 let mut tx = pool
760 .begin()
761 .await
762 .map_err(StorageError::CannotBeginDatabaseTransaction)?;
763
764 let rsvp = sqlx::query_as::<_, Rsvp>("SELECT * FROM rsvps WHERE aturi = $1")
765 .bind(aturi)
766 .fetch_optional(tx.as_mut())
767 .await
768 .map_err(|err| match err {
769 sqlx::Error::RowNotFound => StorageError::RSVPNotFound,
770 other => StorageError::UnableToExecuteQuery(other),
771 })?;
772
773 tx.commit()
774 .await
775 .map_err(StorageError::CannotCommitDatabaseTransaction)?;
776
777 Ok(rsvp)
778}
779
780pub async fn rsvp_list(
781 pool: &StoragePool,
782 page: i64,
783 page_size: i64,
784) -> Result<(i64, Vec<Rsvp>), StorageError> {
785 // Validate page and page_size are positive
786 if page < 1 || page_size < 1 {
787 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol(
788 "Page and page size must be positive".into(),
789 )));
790 }
791
792 let mut tx = pool
793 .begin()
794 .await
795 .map_err(StorageError::CannotBeginDatabaseTransaction)?;
796
797 let total_count = sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM rsvps")
798 .fetch_one(tx.as_mut())
799 .await
800 .map_err(StorageError::UnableToExecuteQuery)?;
801
802 let offset = (page - 1) * page_size;
803
804 let rsvps = sqlx::query_as::<_, Rsvp>(
805 r"SELECT * FROM rsvps ORDER BY rsvps.updated_at DESC LIMIT $1 OFFSET $2",
806 )
807 .bind(page_size + 1) // Fetch one more to know if there are more entries
808 .bind(offset)
809 .fetch_all(tx.as_mut())
810 .await
811 .map_err(StorageError::UnableToExecuteQuery)?;
812
813 tx.commit()
814 .await
815 .map_err(StorageError::CannotCommitDatabaseTransaction)?;
816
817 Ok((total_count, rsvps))
818}
819
820pub async fn event_update_with_metadata<T: serde::Serialize>(
821 pool: &StoragePool,
822 aturi: &str,
823 cid: &str,
824 record: &T,
825 name: &str,
826) -> Result<(), StorageError> {
827 // Validate inputs
828 if aturi.trim().is_empty() {
829 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol(
830 "Event URI cannot be empty".into(),
831 )));
832 }
833
834 if cid.trim().is_empty() {
835 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol(
836 "CID cannot be empty".into(),
837 )));
838 }
839
840 if name.trim().is_empty() {
841 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol(
842 "Name cannot be empty".into(),
843 )));
844 }
845
846 let mut tx = pool
847 .begin()
848 .await
849 .map_err(StorageError::CannotBeginDatabaseTransaction)?;
850
851 let now = Utc::now();
852
853 sqlx::query(
854 "UPDATE events SET cid = $1, record = $2, name = $3, updated_at = $4 WHERE aturi = $5",
855 )
856 .bind(cid)
857 .bind(json!(record))
858 .bind(name)
859 .bind(now)
860 .bind(aturi)
861 .execute(tx.as_mut())
862 .await
863 .map_err(StorageError::UnableToExecuteQuery)?;
864
865 tx.commit()
866 .await
867 .map_err(StorageError::CannotCommitDatabaseTransaction)
868}
869
870pub async fn count_event_rsvps(
871 pool: &StoragePool,
872 event_aturi: &str,
873 status: &str,
874) -> Result<u32, StorageError> {
875 // Validate inputs
876 if event_aturi.trim().is_empty() {
877 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol(
878 "Event URI cannot be empty".into(),
879 )));
880 }
881
882 if status.trim().is_empty() {
883 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol(
884 "Status cannot be empty".into(),
885 )));
886 }
887
888 let mut tx = pool
889 .begin()
890 .await
891 .map_err(StorageError::CannotBeginDatabaseTransaction)?;
892
893 let count = sqlx::query_scalar::<_, i64>(
894 "SELECT COUNT(*) FROM rsvps WHERE event_aturi = $1 AND status = $2",
895 )
896 .bind(event_aturi)
897 .bind(status)
898 .fetch_one(tx.as_mut())
899 .await
900 .map_err(StorageError::UnableToExecuteQuery)?;
901
902 tx.commit()
903 .await
904 .map_err(StorageError::CannotCommitDatabaseTransaction)?;
905
906 Ok(count as u32)
907}
908
909pub async fn get_event_rsvp_counts(
910 pool: &StoragePool,
911 aturis: Vec<String>,
912) -> Result<HashMap<(std::string::String, std::string::String), i64>, StorageError> {
913 // Handle empty list case
914 if aturis.is_empty() {
915 return Ok(HashMap::new());
916 }
917
918 // Validate all aturis are non-empty
919 for aturi in &aturis {
920 if aturi.trim().is_empty() {
921 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol(
922 "Event URI cannot be empty".into(),
923 )));
924 }
925 }
926
927 let mut tx = pool
928 .begin()
929 .await
930 .map_err(StorageError::CannotBeginDatabaseTransaction)?;
931
932 let mut query_builder: QueryBuilder<Postgres> = QueryBuilder::new(
933 "SELECT event_aturi, status, COUNT(*) as count FROM rsvps WHERE event_aturi IN (",
934 );
935 let mut separated = query_builder.separated(", ");
936 for aturi in &aturis {
937 separated.push_bind(aturi);
938 }
939 separated.push_unseparated(") GROUP BY event_aturi, status");
940
941 // Use build_query_as to correctly include the bindings
942 let query = query_builder.build_query_as::<(String, String, i64)>();
943 let values = query
944 .fetch_all(tx.as_mut())
945 .await
946 .map_err(StorageError::UnableToExecuteQuery)?;
947
948 tx.commit()
949 .await
950 .map_err(StorageError::CannotCommitDatabaseTransaction)?;
951
952 Ok(HashMap::from_iter(values.iter().map(
953 |(aturi, status, count)| ((aturi.clone(), status.clone()), *count),
954 )))
955}
956
957pub async fn event_list(
958 pool: &StoragePool,
959 page: i64,
960 page_size: i64,
961) -> Result<(i64, Vec<Event>), StorageError> {
962 // Validate page and page_size are positive
963 if page < 1 || page_size < 1 {
964 return Err(StorageError::UnableToExecuteQuery(sqlx::Error::Protocol(
965 "Page and page size must be positive".into(),
966 )));
967 }
968
969 let mut tx = pool
970 .begin()
971 .await
972 .map_err(StorageError::CannotBeginDatabaseTransaction)?;
973
974 let total_count = sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM events")
975 .fetch_one(tx.as_mut())
976 .await
977 .map_err(StorageError::UnableToExecuteQuery)?;
978
979 let offset = (page - 1) * page_size;
980
981 let events = sqlx::query_as::<_, Event>(
982 "SELECT * FROM events ORDER BY updated_at DESC LIMIT $1 OFFSET $2",
983 )
984 .bind(page_size + 1) // Fetch one more to know if there are more entries
985 .bind(offset)
986 .fetch_all(tx.as_mut())
987 .await
988 .map_err(StorageError::UnableToExecuteQuery)?;
989
990 tx.commit()
991 .await
992 .map_err(StorageError::CannotCommitDatabaseTransaction)?;
993
994 Ok((total_count, events))
995}