Parakeet is a Rust-based Bluesky AppServer aiming to implement most of the functionality required to support the Bluesky client
appview atproto bluesky rust appserver

feat: backfill spidering

mia.omg.lol 8f8bf0f0 924b6f6a

verified
+89 -7
+12 -7
crates/consumer/src/backfill/mod.rs
··· 25 mod repo; 26 mod utils; 27 28 const DL_DUP_KEY: &str = "bf_completed"; 29 // There's a 4MiB limit on parakeet-index, so break delta batches up if there's loads. 30 // this should be plenty low enough to not trigger the size limit. (59k did slightly) ··· 80 break; 81 } 82 83 - let did: String = match self.redis.lpop("backfill_queue", None).await { 84 Ok(Some(did)) => did, 85 Ok(None) => { 86 tokio::time::sleep(Duration::from_millis(250)).await; ··· 96 97 let inner = self.inner.clone(); 98 let mut conn = self.pool.get().await?; 99 - let rc = self.redis.clone(); 100 101 tracker.spawn(async move { 102 let _p = p; 103 tracing::trace!("backfilling {did}"); 104 105 - if let Err(e) = do_actor_backfill(&mut conn, rc, inner, &did).await { 106 tracing::error!(did, "backfill failed: {e}"); 107 counter!("backfill_failure").increment(1); 108 } else { ··· 111 db::backfill_job_write(&mut conn, &did, "successful") 112 .await 113 .unwrap(); 114 } 115 }); 116 } ··· 123 124 async fn do_actor_backfill( 125 conn: &mut Object, 126 - mut rc: MultiplexedConnection, 127 mut inner: BackfillManagerInner, 128 did: &str, 129 ) -> eyre::Result<()> { ··· 191 } 192 } 193 194 - utils::enforce_ratelimit(&mut rc, &pds).await?; 195 196 - match backfill_repo(conn, &mut rc, &mut inner, &pds, did).await { 197 Ok(Some((rem, reset))) => { 198 let _ = rc.zadd(utils::BF_REM_KEY, &pds, rem).await; 199 let _ = rc.zadd(utils::BF_RESET_KEY, &pds, reset).await; ··· 203 pds, 204 "got response with no ratelimit headers, using defaults." 205 ); 206 - utils::handle_default_ratelimit(&mut rc, &pds).await?; 207 } 208 Err(e) => { 209 tracing::error!(did, "backfill failed: {e}");
··· 25 mod repo; 26 mod utils; 27 28 + const BF_QUEUE: &str = "backfill_queue"; 29 const DL_DUP_KEY: &str = "bf_completed"; 30 // There's a 4MiB limit on parakeet-index, so break delta batches up if there's loads. 31 // this should be plenty low enough to not trigger the size limit. (59k did slightly) ··· 81 break; 82 } 83 84 + let did: String = match self.redis.lpop(BF_QUEUE, None).await { 85 Ok(Some(did)) => did, 86 Ok(None) => { 87 tokio::time::sleep(Duration::from_millis(250)).await; ··· 97 98 let inner = self.inner.clone(); 99 let mut conn = self.pool.get().await?; 100 + let mut rc = self.redis.clone(); 101 102 tracker.spawn(async move { 103 let _p = p; 104 tracing::trace!("backfilling {did}"); 105 106 + if let Err(e) = do_actor_backfill(&mut conn, &mut rc, inner, &did).await { 107 tracing::error!(did, "backfill failed: {e}"); 108 counter!("backfill_failure").increment(1); 109 } else { ··· 112 db::backfill_job_write(&mut conn, &did, "successful") 113 .await 114 .unwrap(); 115 + 116 + if let Err(e) = utils::handle_spider(&mut conn, &mut rc, &did).await { 117 + tracing::error!("failed to trigger spider for {did}: {e}"); 118 + } 119 } 120 }); 121 } ··· 128 129 async fn do_actor_backfill( 130 conn: &mut Object, 131 + rc: &mut MultiplexedConnection, 132 mut inner: BackfillManagerInner, 133 did: &str, 134 ) -> eyre::Result<()> { ··· 196 } 197 } 198 199 + utils::enforce_ratelimit(rc, &pds).await?; 200 201 + match backfill_repo(conn, rc, &mut inner, &pds, did).await { 202 Ok(Some((rem, reset))) => { 203 let _ = rc.zadd(utils::BF_REM_KEY, &pds, rem).await; 204 let _ = rc.zadd(utils::BF_RESET_KEY, &pds, reset).await; ··· 208 pds, 209 "got response with no ratelimit headers, using defaults." 210 ); 211 + utils::handle_default_ratelimit(rc, &pds).await?; 212 } 213 Err(e) => { 214 tracing::error!(did, "backfill failed: {e}");
+41
crates/consumer/src/backfill/utils.rs
··· 9 use std::time::Duration; 10 use tracing::instrument; 11 12 pub const BF_RESET_KEY: &str = "bf_ratelimit_reset"; 13 pub const BF_REM_KEY: &str = "bf_ratelimit_rem"; 14 const BF_REM_DEFAULT: i32 = 1000; 15 16 #[derive(Debug, Deserialize)] 17 pub struct GetRepoStatusRes {
··· 9 use std::time::Duration; 10 use tracing::instrument; 11 12 + const SPIDER_KEY: &str = "bf_spider"; 13 pub const BF_RESET_KEY: &str = "bf_ratelimit_reset"; 14 pub const BF_REM_KEY: &str = "bf_ratelimit_rem"; 15 const BF_REM_DEFAULT: i32 = 1000; 16 + 17 + pub async fn handle_spider( 18 + conn: &mut Object, 19 + rc: &mut MultiplexedConnection, 20 + did: &str, 21 + ) -> eyre::Result<()> { 22 + let Some(spider_count) = 23 + redis::AsyncCommands::hget::<_, _, Option<i32>>(rc, SPIDER_KEY, did).await? 24 + else { 25 + return Ok(()); 26 + }; 27 + rc.hdel(SPIDER_KEY, did).await?; 28 + 29 + let new_count = spider_count - 1; 30 + 31 + let follows = conn 32 + .query("SELECT subject FROM follows WHERE did=$1", &[&did]) 33 + .await?; 34 + if follows.is_empty() { 35 + return Ok(()); 36 + } 37 + 38 + let follows = follows.iter().map(|v| v.get::<_, String>(0)); 39 + 40 + let items = follows 41 + .clone() 42 + .map(|follow| (follow, new_count)) 43 + .collect::<Vec<_>>(); 44 + 45 + if new_count > 0 { 46 + // write all the new accounts 47 + rc.hset_multiple(SPIDER_KEY, &items).await.unwrap(); 48 + } 49 + 50 + // and then to backfill 51 + let follows = follows.collect::<Vec<_>>(); 52 + rc.rpush(super::BF_QUEUE, &follows).await.unwrap(); 53 + 54 + Ok(()) 55 + } 56 57 #[derive(Debug, Deserialize)] 58 pub struct GetRepoStatusRes {
+35
crates/parakeet/src/xrpc/at_parakeet/admin.rs
··· 53 54 Ok(()) 55 }
··· 53 54 Ok(()) 55 } 56 + 57 + #[derive(Debug, Deserialize)] 58 + pub struct RequestSpiderReq { 59 + pub depth: i32, 60 + pub dids: Vec<String>, 61 + } 62 + 63 + pub async fn request_spider( 64 + State(mut state): State<GlobalState>, 65 + auth: AtpAuth, 66 + Json(form): Json<RequestSpiderReq>, 67 + ) -> XrpcResult<()> { 68 + if !check_admin_did(&state, &auth.0) { 69 + return Err(Error::new(StatusCode::FORBIDDEN, "Forbidden", None)); 70 + } 71 + 72 + let items = form 73 + .dids 74 + .iter() 75 + .clone() 76 + .map(|did| (did, form.depth)) 77 + .collect::<Vec<_>>(); 78 + 79 + if let Err(e) = state.redis_mp.hset_multiple("bf_spider", &items).await { 80 + tracing::error!("failed to push to spider store: {e}"); 81 + return Err(Error::server_error(None)); 82 + } 83 + 84 + if let Err(e) = state.redis_mp.rpush(BACKFILL_QUEUE, form.dids).await { 85 + tracing::error!("failed to push to backfill queue: {e}"); 86 + return Err(Error::server_error(None)); 87 + } 88 + 89 + Ok(()) 90 + }
+1
crates/parakeet/src/xrpc/at_parakeet/mod.rs
··· 8 Router::new() 9 .route("/at.parakeet.admin.backfillQueueSize", get(admin::backfill_queue_size)) 10 .route("/at.parakeet.admin.requestBackfill", post(admin::request_backfill)) 11 } 12 13 pub fn check_admin_did(state: &crate::GlobalState, did: &String) -> bool {
··· 8 Router::new() 9 .route("/at.parakeet.admin.backfillQueueSize", get(admin::backfill_queue_size)) 10 .route("/at.parakeet.admin.requestBackfill", post(admin::request_backfill)) 11 + .route("/at.parakeet.admin.requestSpider", post(admin::request_spider)) 12 } 13 14 pub fn check_admin_did(state: &crate::GlobalState, did: &String) -> bool {