Parakeet is a Rust-based Bluesky AppServer aiming to implement most of the functionality required to support the Bluesky client
appview atproto bluesky rust appserver

fix(consumer): tweaks to consumer metrics and make repo update warning debug

+12 -4
+9 -1
consumer/src/backfill/mod.rs
··· 10 use reqwest::{Client, StatusCode}; 11 use std::str::FromStr; 12 use std::sync::Arc; 13 use tracing::{instrument, Instrument}; 14 15 mod db; ··· 68 tracing::trace!("backfilling {did}"); 69 if let Err(e) = backfill_actor(&inner, &did).await { 70 tracing::error!(did, "backfill failed: {e}"); 71 } 72 } 73 tracing::error!("backfill rx error?"); 74 } ··· 164 let mut follow_stats = vec![did.to_string()]; 165 166 for (path, (cid, record)) in records { 167 - let Some((_, rkey)) = path.split_once("/") else { 168 tracing::warn!("record contained invalid path {}", path); 169 return Err(diesel::result::Error::RollbackTransaction); 170 }; 171 172 let full_path = format!("at://{did}/{path}"); 173
··· 10 use reqwest::{Client, StatusCode}; 11 use std::str::FromStr; 12 use std::sync::Arc; 13 + use metrics::counter; 14 use tracing::{instrument, Instrument}; 15 16 mod db; ··· 69 tracing::trace!("backfilling {did}"); 70 if let Err(e) = backfill_actor(&inner, &did).await { 71 tracing::error!(did, "backfill failed: {e}"); 72 + counter!("backfill_failure").increment(1); 73 + } else { 74 + counter!("backfill_success").increment(1); 75 } 76 + 77 + counter!("backfill_pending").absolute(rx.len() as u64); 78 } 79 tracing::error!("backfill rx error?"); 80 } ··· 170 let mut follow_stats = vec![did.to_string()]; 171 172 for (path, (cid, record)) in records { 173 + let Some((collection, rkey)) = path.split_once("/") else { 174 tracing::warn!("record contained invalid path {}", path); 175 return Err(diesel::result::Error::RollbackTransaction); 176 }; 177 + 178 + counter!("backfilled_commits", "collection" => collection.to_string()).increment(1); 179 180 let full_path = format!("at://{did}/{path}"); 181
+3 -3
consumer/src/indexer/mod.rs
··· 6 use diesel_async::{AsyncConnection, AsyncPgConnection}; 7 use futures::StreamExt; 8 use ipld_core::cid::Cid; 9 use parakeet_db::types::{ActorStatus, ActorSyncState}; 10 use std::collections::HashMap; 11 use std::sync::Arc; 12 - use metrics::counter; 13 use tokio::sync::mpsc::{channel, Receiver, Sender}; 14 use tracing::instrument; 15 ··· 208 .await?; 209 210 if !updated { 211 - tracing::warn!( 212 "Got a repo update older than the current rev. not processing ops." 213 ); 214 return Ok(false); ··· 249 return None; 250 }; 251 252 - counter!("backfilled_ops", "collection" => collection_raw.to_string(), "action" => op.action.clone()).increment(1); 253 254 let collection = CollectionType::from_str(collection_raw); 255 if collection == CollectionType::Unsupported {
··· 6 use diesel_async::{AsyncConnection, AsyncPgConnection}; 7 use futures::StreamExt; 8 use ipld_core::cid::Cid; 9 + use metrics::counter; 10 use parakeet_db::types::{ActorStatus, ActorSyncState}; 11 use std::collections::HashMap; 12 use std::sync::Arc; 13 use tokio::sync::mpsc::{channel, Receiver, Sender}; 14 use tracing::instrument; 15 ··· 208 .await?; 209 210 if !updated { 211 + tracing::debug!( 212 "Got a repo update older than the current rev. not processing ops." 213 ); 214 return Ok(false); ··· 249 return None; 250 }; 251 252 + counter!("backfill_pending_ops", "collection" => collection_raw.to_string(), "action" => op.action.clone()).increment(1); 253 254 let collection = CollectionType::from_str(collection_raw); 255 if collection == CollectionType::Unsupported {