this repo has no description

Move from Arc<Mutex<VecDeque<T>>> to a tokio broadcast channel for the channel system

This creates an SPMC (single-producer, multi-consumer) architecture, so we can create com.atproto.sync.subscribeRepos, implement Jetstream, etc.

vielle.dev 9610deac 98ec0e0f

verified
+29 -26
+15 -5
src/ingest/handler.rs
··· 1 - use std::{collections::VecDeque, sync::Arc}; 1 + use std::sync::Arc; 2 2 3 3 use futures_util::future; 4 4 use ipld_core::ipld::Ipld; ··· 9 9 use jacquard_repo::{BlockStore, MemoryBlockStore}; 10 10 use sqlx::{Pool, Postgres, query}; 11 11 use thiserror::Error; 12 - use tokio::{sync::Mutex, task::JoinHandle}; 12 + use tokio::{sync::broadcast, task::JoinHandle}; 13 13 14 14 use crate::{backfill::backfill, utils::ipld_json::ipld_to_json_value}; 15 15 ··· 157 157 } 158 158 159 159 pub fn ingest( 160 - queue: Arc<Mutex<VecDeque<SubscribeReposMessage<'static>>>>, 160 + mut reciever: broadcast::Receiver<SubscribeReposMessage<'static>>, 161 161 conn: Arc<Pool<Postgres>>, 162 162 ) -> JoinHandle<()> { 163 163 tokio::spawn(async move { 164 164 loop { 165 - let Some(next) = queue.lock().await.pop_front() else { 166 - continue; 165 + let next = match reciever.recv().await { 166 + Ok(val) => val, 167 + Err(err) => match err { 168 + broadcast::error::RecvError::Closed => { 169 + eprintln!("Ingestion failed. Quitting"); 170 + break; 171 + } 172 + broadcast::error::RecvError::Lagged(skipped) => { 173 + eprintln!("Warning: lagging behind. Skipping {skipped} messages"); 174 + continue; 175 + } 176 + }, 167 177 }; 168 178 169 179 match next {
+8 -18
src/ingest/queue.rs
··· 1 1 use std::thread; 2 2 use std::time::Duration; 3 - use std::{collections::VecDeque, sync::Arc}; 4 3 5 4 use futures_util::stream::StreamExt; 6 5 use jacquard::api::com_atproto::sync::subscribe_repos::{SubscribeRepos, SubscribeReposMessage}; 7 6 use jacquard::url::Url; 8 7 use jacquard::{common::xrpc::TungsteniteSubscriptionClient, xrpc::SubscriptionClient}; 9 - use tokio::{ 10 - sync::Mutex, 11 - task::{self, JoinHandle}, 12 - }; 8 + use tokio::sync::broadcast; 9 + use tokio::task::{self, JoinHandle}; 13 10 14 11 use crate::config; 15 12 16 - pub async fn queue() -> ( 17 - Arc<Mutex<VecDeque<SubscribeReposMessage<'static>>>>, 18 - JoinHandle<()>, 19 - ) { 20 - let queue = Arc::new(Mutex::new(VecDeque::new())); 13 + pub async fn queue<'a>(tx: broadcast::Sender<SubscribeReposMessage<'static>>) -> JoinHandle<()> { 14 + // let queue = Arc::new(Mutex::new(VecDeque::new())); 21 15 22 16 // USER_SUBSCRIBE_URL is formatted as a domain 23 17 let uri = Url::parse(&format!("wss://{}/", config::USER_SUBSCRIBE_URL)) ··· 29 23 .expect("Could not subscribe to new events") 30 24 .into_stream(); 31 25 32 - let queue_clone = queue.clone(); 33 - let handle = task::spawn(async move { 34 - let queue = queue_clone; 35 - 26 + // let queue_clone = queue.clone(); 27 + task::spawn(async move { 36 28 loop { 37 29 if let Some(msg) = messages.next().await { 38 30 let msg = match msg { ··· 126 118 SubscribeReposMessage::Unknown(data) => SubscribeReposMessage::Unknown(data), 127 119 }; 128 120 129 - queue.lock().await.push_back(ev); 121 + let _ = tx.send(ev); 130 122 } 131 123 } 132 - }); 133 - 134 - (queue, handle) 124 + }) 135 125 }
+6 -3
src/main.rs
··· 1 + use tokio::sync::broadcast; 2 + 1 3 use crate::backfill::backfill; 2 4 3 5 mod backfill; ··· 35 37 let conn = db::conn().await; 36 38 println!("Database connected and initialized"); 37 39 38 - let (queue, queue_handle) = ingest::queue().await; 40 + let (tx, ingest_reciever) = broadcast::channel(16); 41 + let broadcast_handle = ingest::queue(tx).await; 39 42 40 43 println!("Starting backfill"); 41 44 let timer = std::time::Instant::now(); ··· 55 58 }); 56 59 57 60 println!("Handling new events. Ctrl+C to quit."); 58 - let ingest_handle = ingest::ingest(queue, conn.clone()); 61 + let ingest_handle = ingest::ingest(ingest_reciever, conn.clone()); 59 62 60 63 let _ = rx.await; 61 - queue_handle.abort(); 64 + broadcast_handle.abort(); 62 65 ingest_handle.abort(); 63 66 64 67 Ok(())