Parakeet is a Rust-based Bluesky AppServer aiming to implement most of the functionality required to support the Bluesky client.
appview atproto bluesky rust appserver

fix(consumer): use a mutex when checking status to stop a race

mia.omg.lol 885cb966 637cb01d

verified
+23 -1
+23 -1
crates/consumer/src/backfill/mod.rs
··· 13 use redis::aio::MultiplexedConnection; 14 use redis::AsyncTypedCommands; 15 use reqwest::Client; 16 use std::path::PathBuf; 17 use std::str::FromStr; 18 use std::sync::Arc; 19 use std::time::Duration; 20 use tokio::sync::watch::Receiver as WatchReceiver; 21 - use tokio::sync::Semaphore; 22 use tokio_util::task::TaskTracker; 23 use tracing::instrument; 24 ··· 32 33 #[derive(Clone)] 34 pub struct BackfillManagerInner { 35 index_client: Option<parakeet_index::Client>, 36 tmp_dir: PathBuf, 37 resolver: JacquardResolver, ··· 57 58 let client = Client::builder().brotli(true).build()?; 59 60 Ok(BackfillManager { 61 pool, 62 redis, 63 semaphore, 64 inner: BackfillManagerInner { 65 index_client, 66 tmp_dir: PathBuf::from_str(&opts.download_tmp_dir)?, 67 resolver, ··· 131 mut inner: BackfillManagerInner, 132 did: &str, 133 ) -> eyre::Result<()> { 134 // has the repo already been downloaded? 135 match db::actor_get_statuses(conn, did).await { 136 Ok(Some((_, state))) => { 137 if state == ActorSyncState::Synced || state == ActorSyncState::Processing { ··· 153 &[&did], 154 ) 155 .await?; 156 157 let jd = Did::raw(did); 158 let (pds, handle) = match utils::resolve_service(&inner.resolver, &jd).await { ··· 208 db::actor_set_sync_status(conn, did, ActorSyncState::Dirty, Utc::now()).await?; 209 db::backfill_job_write(conn, did, "failed.write").await?; 210 } 211 } 212 213 Ok(())
··· 13 use redis::aio::MultiplexedConnection; 14 use redis::AsyncTypedCommands; 15 use reqwest::Client; 16 + use std::collections::HashSet; 17 use std::path::PathBuf; 18 use std::str::FromStr; 19 use std::sync::Arc; 20 use std::time::Duration; 21 use tokio::sync::watch::Receiver as WatchReceiver; 22 + use tokio::sync::{Mutex, Semaphore}; 23 use tokio_util::task::TaskTracker; 24 use tracing::instrument; 25 ··· 33 34 #[derive(Clone)] 35 pub struct BackfillManagerInner { 36 + // we don't need to store anything, just ensure only one thread in the status check at a time 37 + status_lookup_lock: Arc<Mutex<HashSet<String>>>, 38 index_client: Option<parakeet_index::Client>, 39 tmp_dir: PathBuf, 40 resolver: JacquardResolver, ··· 60 61 let client = Client::builder().brotli(true).build()?; 62 63 + let current_tasks = Arc::new(Mutex::new(HashSet::new())); 64 + 65 Ok(BackfillManager { 66 pool, 67 redis, 68 semaphore, 69 inner: BackfillManagerInner { 70 + status_lookup_lock: current_tasks, 71 index_client, 72 tmp_dir: PathBuf::from_str(&opts.download_tmp_dir)?, 73 resolver, ··· 137 mut inner: BackfillManagerInner, 138 did: &str, 139 ) -> eyre::Result<()> { 140 + let mut l = inner.status_lookup_lock.lock().await; 141 + 142 // has the repo already been downloaded? 
143 + if l.contains(did) { 144 + tracing::info!("skipping duplicate repo {did}"); 145 + return Ok(()); 146 + } else { 147 + l.insert(did.to_string()); 148 + } 149 + 150 match db::actor_get_statuses(conn, did).await { 151 Ok(Some((_, state))) => { 152 if state == ActorSyncState::Synced || state == ActorSyncState::Processing { ··· 168 &[&did], 169 ) 170 .await?; 171 + 172 + drop(l); 173 174 let jd = Did::raw(did); 175 let (pds, handle) = match utils::resolve_service(&inner.resolver, &jd).await { ··· 225 db::actor_set_sync_status(conn, did, ActorSyncState::Dirty, Utc::now()).await?; 226 db::backfill_job_write(conn, did, "failed.write").await?; 227 } 228 + } 229 + 230 + { 231 + let mut l = inner.status_lookup_lock.lock().await; 232 + l.remove(did); 233 } 234 235 Ok(())