Parakeet is a Rust-based Bluesky AppServer aiming to implement most of the functionality required to support the Bluesky client
appview atproto bluesky rust appserver

pull some duplication out

+54 -61
+3 -26
parakeet/src/xrpc/app_bsky/actor.rs
··· 1 1 use crate::xrpc::error::{Error, XrpcResult}; 2 + use crate::xrpc::{get_actor_did, get_actor_dids}; 2 3 use crate::{hydration, GlobalState}; 3 4 use axum::extract::{Query, State}; 4 5 use axum::Json; ··· 15 16 State(state): State<GlobalState>, 16 17 Query(query): Query<ActorQuery>, 17 18 ) -> XrpcResult<Json<ProfileViewDetailed>> { 18 - // todo: extract this into a function 19 - let did = if query.actor.starts_with("did:") { 20 - query.actor 21 - } else { 22 - state 23 - .dataloaders 24 - .handle 25 - .load(query.actor) 26 - .await 27 - .ok_or(Error::not_found())? 28 - }; 19 + let did = get_actor_did(&state.dataloaders, query.actor).await?; 29 20 30 21 let maybe_profile = hydration::profile::hydrate_profile_detailed(&state.dataloaders, did).await; 31 22 ··· 49 40 State(state): State<GlobalState>, 50 41 ExtraQuery(query): ExtraQuery<ActorsQuery>, 51 42 ) -> XrpcResult<Json<GetProfilesRes>> { 52 - // todo: this should probably be its own function too. 53 - let mut dids = vec![]; 54 - let mut handles = vec![]; 55 - for actor in query.actors { 56 - if actor.starts_with("did:") { 57 - dids.push(actor); 58 - } else { 59 - handles.push(actor); 60 - } 61 - } 62 - 63 - if !handles.is_empty() { 64 - let mapping = state.dataloaders.handle.load_many(handles).await; 65 - dids.extend(mapping.into_values()); 66 - } 43 + let dids = get_actor_dids(&state.dataloaders, query.actors).await; 67 44 68 45 let profiles = hydration::profile::hydrate_profiles_detailed(&state.dataloaders, dids) 69 46 .await
+6 -35
parakeet/src/xrpc/app_bsky/graph/relations.rs
··· 1 - use crate::hydration::profile::hydrate_profiles; 2 1 use crate::xrpc::error::{Error, XrpcResult}; 2 + use crate::xrpc::{datetime_cursor, get_actor_did, ActorWithCursorQuery}; 3 3 use crate::{hydration, GlobalState}; 4 4 use axum::extract::{Query, State}; 5 5 use axum::Json; 6 - use dataloader::cached::Loader; 7 6 use diesel::prelude::*; 8 7 use diesel_async::RunQueryDsl; 9 8 use lexica::app_bsky::actor::ProfileView; 10 9 use parakeet_db::schema; 11 - use serde::{Deserialize, Serialize}; 12 - use std::str::FromStr; 13 - 14 - fn datetime_cursor(cursor: Option<&String>) -> Option<chrono::DateTime<chrono::Utc>> { 15 - cursor 16 - .and_then(|v| i64::from_str(v).ok()) 17 - .and_then(chrono::DateTime::from_timestamp_millis) 18 - } 19 - 20 - #[derive(Debug, Deserialize)] 21 - pub struct ActorWithCursorQuery { 22 - pub actor: String, 23 - pub limit: Option<u8>, 24 - pub cursor: Option<String>, 25 - } 10 + use serde::Serialize; 26 11 27 12 #[derive(Debug, Serialize)] 28 13 pub struct AppBskyGraphGetFollowersRes { ··· 38 23 ) -> XrpcResult<Json<AppBskyGraphGetFollowersRes>> { 39 24 let mut conn = state.pool.get().await?; 40 25 41 - // todo: extract this into a function 42 - let subj_did = if query.actor.starts_with("did:") { 43 - query.actor 44 - } else { 45 - Loader::load(&state.dataloaders.handle, query.actor) 46 - .await 47 - .ok_or(Error::not_found())? 48 - }; 26 + let subj_did = get_actor_did(&state.dataloaders, query.actor).await?; 49 27 50 28 let Some(subject) = hydration::profile::hydrate_profile(&state.dataloaders, subj_did).await 51 29 else { ··· 75 53 76 54 let dids = results.iter().map(|(_, did)| did.clone()).collect(); 77 55 78 - let mut profiles = hydrate_profiles(&state.dataloaders, dids).await; 56 + let mut profiles = hydration::profile::hydrate_profiles(&state.dataloaders, dids).await; 79 57 80 58 let followers = results 81 59 .into_iter() ··· 103 81 ) -> XrpcResult<Json<AppBskyGraphGetFollowsRes>> { 104 82 let mut conn = state.pool.get().await?; 105 83 106 - // todo: extract this into a function 107 - let subj_did = if query.actor.starts_with("did:") { 108 - query.actor 109 - } else { 110 - Loader::load(&state.dataloaders.handle, query.actor) 111 - .await 112 - .ok_or(Error::not_found())? 113 - }; 84 + let subj_did = get_actor_did(&state.dataloaders, query.actor).await?; 114 85 115 86 let Some(subject) = hydration::profile::hydrate_profile(&state.dataloaders, subj_did).await 116 87 else { ··· 140 111 141 112 let dids = results.iter().map(|(_, did)| did.clone()).collect(); 142 113 143 - let mut profiles = hydrate_profiles(&state.dataloaders, dids).await; 114 + let mut profiles = hydration::profile::hydrate_profiles(&state.dataloaders, dids).await; 144 115 145 116 let follows = results 146 117 .into_iter()
+45
parakeet/src/xrpc/mod.rs
··· 1 + use crate::loaders::Dataloaders; 1 2 use axum::Router; 3 + use serde::Deserialize; 4 + use std::str::FromStr; 2 5 3 6 mod app_bsky; 4 7 mod error; ··· 6 9 pub fn xrpc_routes() -> Router<crate::GlobalState> { 7 10 Router::new().merge(app_bsky::routes()) 8 11 } 12 + 13 + fn datetime_cursor(cursor: Option<&String>) -> Option<chrono::DateTime<chrono::Utc>> { 14 + cursor 15 + .and_then(|v| i64::from_str(v).ok()) 16 + .and_then(chrono::DateTime::from_timestamp_millis) 17 + } 18 + 19 + // the docs specify that most places which want an actor can take either a DID or handle 20 + // this converts everything to DIDs 21 + async fn get_actor_did(loaders: &Dataloaders, actor: String) -> error::XrpcResult<String> { 22 + if actor.starts_with("did:") { 23 + Ok(actor) 24 + } else { 25 + loaders.handle.load(actor).await.ok_or(error::Error::not_found()) 26 + } 27 + } 28 + 29 + async fn get_actor_dids(loaders: &Dataloaders, actors: Vec<String>) -> Vec<String> { 30 + let mut dids = vec![]; 31 + let mut handles = vec![]; 32 + for actor in actors { 33 + if actor.starts_with("did:") { 34 + dids.push(actor); 35 + } else { 36 + handles.push(actor); 37 + } 38 + } 39 + 40 + if !handles.is_empty() { 41 + let mapping = loaders.handle.load_many(handles).await; 42 + dids.extend(mapping.into_values()); 43 + } 44 + 45 + dids 46 + } 47 + 48 + #[derive(Debug, Deserialize)] 49 + pub struct ActorWithCursorQuery { 50 + pub actor: String, 51 + pub limit: Option<u8>, 52 + pub cursor: Option<String>, 53 + }