Two teams try to fill in any horizontal, vertical, or diagonal line on a bingo board by playing maps on osu!
osu.bingo
osu
1use futures_lite::StreamExt;
2use lapin::{
3 Channel,
4 message::Delivery,
5 options::{BasicAckOptions, BasicConsumeOptions, QueueDeclareOptions},
6 types::FieldTable,
7};
8
9/// A helper struct to declare and initialize queues and consumers
10pub struct QueueConsumerInitializer<'a> {
11 channel: &'a Channel,
12 queue_declare_options: QueueDeclareOptions,
13 basic_consume_options: BasicConsumeOptions,
14}
15
16impl<'a> QueueConsumerInitializer<'a> {
17 pub fn new(
18 channel: &'a Channel,
19 queue_declare_options: QueueDeclareOptions,
20 basic_consume_options: BasicConsumeOptions,
21 ) -> Self {
22 QueueConsumerInitializer {
23 channel,
24 queue_declare_options,
25 basic_consume_options,
26 }
27 }
28
29 pub async fn declare<F, Fut>(&self, queue_name: String, cb: F) -> Result<(), lapin::Error>
30 where
31 F: Fn(Delivery) -> Fut + Send + Sync + 'static,
32 Fut: Future<Output = ()> + Send + 'static,
33 {
34 let queue = match self
35 .channel
36 .queue_declare(
37 queue_name.clone().into(),
38 self.queue_declare_options.clone(),
39 FieldTable::default(),
40 )
41 .await
42 {
43 Ok(x) => x,
44 Err(err) => {
45 log::error!("Failed to declare queue {queue_name}: {err}");
46 return Err(err);
47 }
48 };
49
50 let mut consumer = match self
51 .channel
52 .basic_consume(
53 queue.name().clone(),
54 "".into(),
55 self.basic_consume_options.clone(),
56 FieldTable::default(),
57 )
58 .await
59 {
60 Ok(x) => x,
61 Err(err) => {
62 log::error!("Failed to declare consumer: {err}");
63 return Err(err);
64 }
65 };
66
67 log::info!("Initializing consumer for queue {}", queue.name());
68
69 // Spawn task to continuously consume messages
70 tokio::task::spawn(async move {
71 while let Some(delivery) = consumer.next().await {
72 let delivery = match delivery {
73 Ok(x) => x,
74 Err(err) => {
75 log::warn!("Recieved delivery had an error: {err}");
76 continue;
77 }
78 };
79
80 // Ack Delivery
81 match delivery.ack(BasicAckOptions::default()).await {
82 Ok(_) => (),
83 Err(err) => {
84 log::warn!("Failed to send ack: {err}");
85 }
86 }
87
88 // Run provided callback on data
89 cb(delivery).await;
90 }
91 });
92
93 Ok(())
94 }
95}