Two teams try and fill in any horizontal, vertical, or diagonal line on a bingo board by playing maps on osu! osu.bingo
osu
at microservice 95 lines 2.8 kB view raw
1use futures_lite::StreamExt; 2use lapin::{ 3 Channel, 4 message::Delivery, 5 options::{BasicAckOptions, BasicConsumeOptions, QueueDeclareOptions}, 6 types::FieldTable, 7}; 8 9/// A helper struct to declare and initialize queues and consumers 10pub struct QueueConsumerInitializer<'a> { 11 channel: &'a Channel, 12 queue_declare_options: QueueDeclareOptions, 13 basic_consume_options: BasicConsumeOptions, 14} 15 16impl<'a> QueueConsumerInitializer<'a> { 17 pub fn new( 18 channel: &'a Channel, 19 queue_declare_options: QueueDeclareOptions, 20 basic_consume_options: BasicConsumeOptions, 21 ) -> Self { 22 QueueConsumerInitializer { 23 channel, 24 queue_declare_options, 25 basic_consume_options, 26 } 27 } 28 29 pub async fn declare<F, Fut>(&self, queue_name: String, cb: F) -> Result<(), lapin::Error> 30 where 31 F: Fn(Delivery) -> Fut + Send + Sync + 'static, 32 Fut: Future<Output = ()> + Send + 'static, 33 { 34 let queue = match self 35 .channel 36 .queue_declare( 37 queue_name.clone().into(), 38 self.queue_declare_options.clone(), 39 FieldTable::default(), 40 ) 41 .await 42 { 43 Ok(x) => x, 44 Err(err) => { 45 log::error!("Failed to declare queue {queue_name}: {err}"); 46 return Err(err); 47 } 48 }; 49 50 let mut consumer = match self 51 .channel 52 .basic_consume( 53 queue.name().clone(), 54 "".into(), 55 self.basic_consume_options.clone(), 56 FieldTable::default(), 57 ) 58 .await 59 { 60 Ok(x) => x, 61 Err(err) => { 62 log::error!("Failed to declare consumer: {err}"); 63 return Err(err); 64 } 65 }; 66 67 log::info!("Initializing consumer for queue {}", queue.name()); 68 69 // Spawn task to continuously consume messages 70 tokio::task::spawn(async move { 71 while let Some(delivery) = consumer.next().await { 72 let delivery = match delivery { 73 Ok(x) => x, 74 Err(err) => { 75 log::warn!("Recieved delivery had an error: {err}"); 76 continue; 77 } 78 }; 79 80 // Ack Delivery 81 match delivery.ack(BasicAckOptions::default()).await { 82 Ok(_) => (), 83 Err(err) => { 84 log::warn!("Failed to send ack: {err}"); 85 } 86 } 87 88 // Run provided callback on data 89 cb(delivery).await; 90 } 91 }); 92 93 Ok(()) 94 } 95}