Parakeet is a Rust-based Bluesky AppServer aiming to implement most of the functionality required to support the Bluesky client
appview atproto bluesky rust appserver

feat(parakeet): getLists and getList

+356 -8
+1
Cargo.lock
··· 1842 "lexica", 1843 "parakeet-db", 1844 "serde", 1845 "tokio", 1846 "tower-http", 1847 "tracing",
··· 1842 "lexica", 1843 "parakeet-db", 1844 "serde", 1845 + "serde_json", 1846 "tokio", 1847 "tower-http", 1848 "tracing",
+4 -4
lexica/src/app_bsky/actor.rs
··· 1 use chrono::prelude::*; 2 use serde::Serialize; 3 4 - #[derive(Default, Debug, Serialize)] 5 #[serde(rename_all = "camelCase")] 6 pub struct ProfileAssociated { 7 pub lists: i64, ··· 12 pub chat: Option<ProfileAssociatedChat>, 13 } 14 15 - #[derive(Debug, Serialize)] 16 #[serde(rename_all = "camelCase")] 17 pub struct ProfileAssociatedChat { 18 pub allow_incoming: ChatAllowIncoming, 19 } 20 21 - #[derive(Debug, Serialize)] 22 #[serde(rename_all = "lowercase")] 23 pub enum ChatAllowIncoming { 24 All, ··· 45 pub created_at: DateTime<Utc>, 46 } 47 48 - #[derive(Debug, Serialize)] 49 #[serde(rename_all = "camelCase")] 50 pub struct ProfileView { 51 pub did: String,
··· 1 use chrono::prelude::*; 2 use serde::Serialize; 3 4 + #[derive(Clone, Default, Debug, Serialize)] 5 #[serde(rename_all = "camelCase")] 6 pub struct ProfileAssociated { 7 pub lists: i64, ··· 12 pub chat: Option<ProfileAssociatedChat>, 13 } 14 15 + #[derive(Clone, Debug, Serialize)] 16 #[serde(rename_all = "camelCase")] 17 pub struct ProfileAssociatedChat { 18 pub allow_incoming: ChatAllowIncoming, 19 } 20 21 + #[derive(Copy, Clone, Debug, Serialize)] 22 #[serde(rename_all = "lowercase")] 23 pub enum ChatAllowIncoming { 24 All, ··· 45 pub created_at: DateTime<Utc>, 46 } 47 48 + #[derive(Clone, Debug, Serialize)] 49 #[serde(rename_all = "camelCase")] 50 pub struct ProfileView { 51 pub did: String,
+81
lexica/src/app_bsky/graph.rs
···
··· 1 + use std::str::FromStr; 2 + use crate::app_bsky::actor::ProfileView; 3 + use crate::app_bsky::richtext::FacetMain; 4 + use chrono::prelude::*; 5 + use serde::{Deserialize, Serialize}; 6 + 7 + #[derive(Debug, Serialize)] 8 + #[serde(rename_all = "camelCase")] 9 + pub struct ListViewBasic { 10 + pub uri: String, 11 + pub cid: String, 12 + pub name: String, 13 + pub purpose: ListPurpose, 14 + 15 + #[serde(skip_serializing_if = "Option::is_none")] 16 + pub avatar: Option<String>, 17 + pub list_item_count: i64, 18 + 19 + // #[serde(skip_serializing_if = "Option::is_none")] 20 + // pub viewer: Option<()>, 21 + // pub labels: Vec<()>, 22 + 23 + pub indexed_at: NaiveDateTime, 24 + } 25 + 26 + #[derive(Debug, Serialize)] 27 + #[serde(rename_all = "camelCase")] 28 + pub struct ListView { 29 + pub uri: String, 30 + pub cid: String, 31 + pub name: String, 32 + pub creator: ProfileView, 33 + pub purpose: ListPurpose, 34 + 35 + #[serde(skip_serializing_if = "Option::is_none")] 36 + pub description: Option<String>, 37 + #[serde(skip_serializing_if = "Option::is_none")] 38 + pub description_facets: Option<Vec<FacetMain>>, 39 + 40 + #[serde(skip_serializing_if = "Option::is_none")] 41 + pub avatar: Option<String>, 42 + pub list_item_count: i64, 43 + 44 + // #[serde(skip_serializing_if = "Option::is_none")] 45 + // pub viewer: Option<()>, 46 + // pub labels: Vec<()>, 47 + 48 + pub indexed_at: NaiveDateTime, 49 + } 50 + 51 + #[derive(Debug, Serialize)] 52 + pub struct ListItemView { 53 + pub uri: String, 54 + pub subject: ProfileView, 55 + } 56 + 57 + #[derive(Debug, Deserialize, Serialize)] 58 + pub enum ListPurpose { 59 + /// A list of actors to apply an aggregate moderation action (mute/block) on. 60 + #[serde(rename = "app.bsky.graph.defs#modlist")] 61 + ModList, 62 + /// A list of actors used for curation purposes such as list feeds or interaction gating. 63 + #[serde(rename = "app.bsky.graph.defs#curatelist")] 64 + CurateList, 65 + /// A list of actors used for only for reference purposes such as within a starter pack. 66 + #[serde(rename = "app.bsky.graph.defs#referencelist")] 67 + ReferenceList, 68 + } 69 + 70 + impl FromStr for ListPurpose { 71 + type Err = (); 72 + 73 + fn from_str(s: &str) -> Result<Self, Self::Err> { 74 + match s { 75 + "app.bsky.graph.defs#modlist" => Ok(Self::ModList), 76 + "app.bsky.graph.defs#curatelist" => Ok(Self::CurateList), 77 + "app.bsky.graph.defs#referencelist" => Ok(Self::ReferenceList), 78 + _ => Err(()), 79 + } 80 + } 81 + }
+1
lexica/src/app_bsky/mod.rs
··· 1 pub mod actor; 2 pub mod richtext;
··· 1 pub mod actor; 2 + pub mod graph; 3 pub mod richtext;
+1
parakeet/Cargo.toml
··· 16 lexica = { path = "../lexica" } 17 parakeet-db = { path = "../parakeet-db" } 18 serde = { version = "1.0.217", features = ["derive"] } 19 tokio = { version = "1.42.0", features = ["full"] } 20 tower-http = { version = "0.6.2", features = ["trace"] } 21 tracing = "0.1.40"
··· 16 lexica = { path = "../lexica" } 17 parakeet-db = { path = "../parakeet-db" } 18 serde = { version = "1.0.217", features = ["derive"] } 19 + serde_json = "1.0.134" 20 tokio = { version = "1.42.0", features = ["full"] } 21 tower-http = { version = "0.6.2", features = ["trace"] } 22 tracing = "0.1.40"
+91
parakeet/src/hydration/list.rs
···
··· 1 + use crate::hydration::profile::{hydrate_profile, hydrate_profiles}; 2 + use crate::loaders::Dataloaders; 3 + use lexica::app_bsky::actor::ProfileView; 4 + use lexica::app_bsky::graph::{ListPurpose, ListView, ListViewBasic}; 5 + use parakeet_db::models; 6 + use std::collections::HashMap; 7 + use std::str::FromStr; 8 + 9 + fn build_basic(list: models::List, list_item_count: i64) -> Option<ListViewBasic> { 10 + let purpose = ListPurpose::from_str(&list.list_type).ok()?; 11 + 12 + Some(ListViewBasic { 13 + uri: list.at_uri, 14 + cid: list.cid, 15 + name: list.name, 16 + purpose, 17 + avatar: list 18 + .avatar_cid 19 + .map(|v| format!("https://localhost/list/{v}")), 20 + list_item_count, 21 + indexed_at: list.indexed_at, 22 + }) 23 + } 24 + 25 + fn build_listview( 26 + list: models::List, 27 + list_item_count: i64, 28 + creator: ProfileView, 29 + ) -> Option<ListView> { 30 + let purpose = ListPurpose::from_str(&list.list_type).ok()?; 31 + 32 + let description_facets = list 33 + .description_facets 34 + .and_then(|v| serde_json::from_value(v).ok()); 35 + 36 + Some(ListView { 37 + uri: list.at_uri, 38 + cid: list.cid, 39 + name: list.name, 40 + creator, 41 + purpose, 42 + description: list.description, 43 + description_facets, 44 + avatar: list 45 + .avatar_cid 46 + .map(|v| format!("https://localhost/list/{v}")), 47 + list_item_count, 48 + indexed_at: list.indexed_at, 49 + }) 50 + } 51 + 52 + pub async fn hydrate_list_basic(loaders: &Dataloaders, list: String) -> Option<ListViewBasic> { 53 + let (list, count) = loaders.list.load(list).await?; 54 + 55 + build_basic(list, count) 56 + } 57 + 58 + pub async fn hydrate_lists_basic( 59 + loaders: &Dataloaders, 60 + lists: Vec<String>, 61 + ) -> HashMap<String, ListViewBasic> { 62 + let lists = loaders.list.load_many(lists).await; 63 + 64 + lists 65 + .into_iter() 66 + .filter_map(|(uri, (list, count))| build_basic(list, count).map(|v| (uri, v))) 67 + .collect() 68 + } 69 + 70 + pub async fn hydrate_list(loaders: &Dataloaders, list: String) -> Option<ListView> { 71 + let (list, count) = loaders.list.load(list).await?; 72 + let profile = hydrate_profile(loaders, list.owner.clone()).await?; 73 + 74 + build_listview(list, count, profile) 75 + } 76 + 77 + pub async fn hydrate_lists(loaders: &Dataloaders, lists: Vec<String>) -> HashMap<String, ListView> { 78 + let lists = loaders.list.load_many(lists).await; 79 + 80 + let creators = lists.values().map(|(list, _)| list.owner.clone()).collect(); 81 + let creators = hydrate_profiles(loaders, creators).await; 82 + 83 + lists 84 + .into_iter() 85 + .filter_map(|(uri, (list, count))| { 86 + let creator = creators.get(&list.owner)?; 87 + 88 + build_listview(list, count, creator.to_owned()).map(|v| (uri, v)) 89 + }) 90 + .collect() 91 + }
+4 -1
parakeet/src/hydration/mod.rs
··· 1 - pub mod profile;
··· 1 + #![allow(unused)] 2 + 3 + pub mod list; 4 + pub mod profile;
+43 -3
parakeet/src/loaders.rs
··· 8 9 pub struct Dataloaders { 10 pub handle: Loader<String, String, HandleLoader>, 11 pub profile: Loader<String, ProfileLoaderRet, ProfileLoader>, 12 } 13 ··· 17 pub fn new(pool: Pool<AsyncPgConnection>) -> Dataloaders { 18 Dataloaders { 19 handle: Loader::new(HandleLoader(pool.clone())), 20 profile: Loader::new(ProfileLoader(pool.clone())), 21 } 22 } ··· 64 Option::<models::FollowStats>::as_select(), 65 )) 66 .filter(schema::actors::did.eq_any(keys)) 67 - .load::<(String, Option<String>, models::Profile, Option<models::FollowStats>)>(&mut conn) 68 .await; 69 70 match res { 71 Ok(res) => HashMap::from_iter( 72 res.into_iter() 73 - .map(|(did, handle, profile, follow_stats)| (did, (handle, profile, follow_stats))), 74 ), 75 Err(e) => { 76 - tracing::error!("profile load failed: {e}"); 77 HashMap::new() 78 } 79 }
··· 8 9 pub struct Dataloaders { 10 pub handle: Loader<String, String, HandleLoader>, 11 + pub list: Loader<String, ListLoaderRet, ListLoader>, 12 pub profile: Loader<String, ProfileLoaderRet, ProfileLoader>, 13 } 14 ··· 18 pub fn new(pool: Pool<AsyncPgConnection>) -> Dataloaders { 19 Dataloaders { 20 handle: Loader::new(HandleLoader(pool.clone())), 21 + list: Loader::new(ListLoader(pool.clone())), 22 profile: Loader::new(ProfileLoader(pool.clone())), 23 } 24 } ··· 66 Option::<models::FollowStats>::as_select(), 67 )) 68 .filter(schema::actors::did.eq_any(keys)) 69 + .load::<( 70 + String, 71 + Option<String>, 72 + models::Profile, 73 + Option<models::FollowStats>, 74 + )>(&mut conn) 75 + .await; 76 + 77 + match res { 78 + Ok(res) => { 79 + HashMap::from_iter(res.into_iter().map(|(did, handle, profile, follow_stats)| { 80 + (did, (handle, profile, follow_stats)) 81 + })) 82 + } 83 + Err(e) => { 84 + tracing::error!("profile load failed: {e}"); 85 + HashMap::new() 86 + } 87 + } 88 + } 89 + } 90 + 91 + pub struct ListLoader(Pool<AsyncPgConnection>); 92 + type ListLoaderRet = (models::List, i64); 93 + impl BatchFn<String, ListLoaderRet> for ListLoader { 94 + async fn load(&mut self, keys: &[String]) -> HashMap<String, ListLoaderRet> { 95 + let mut conn = self.0.get().await.unwrap(); 96 + 97 + let res = schema::lists::table 98 + .left_join( 99 + schema::list_items::table.on(schema::list_items::at_uri.eq(schema::lists::at_uri)), 100 + ) 101 + .group_by(schema::lists::all_columns) 102 + .select(( 103 + models::List::as_select(), 104 + diesel::dsl::count(schema::lists::at_uri.assume_not_null()), 105 + )) 106 + .filter(schema::lists::at_uri.eq_any(keys)) 107 + .load::<(models::List, i64)>(&mut conn) 108 .await; 109 110 match res { 111 Ok(res) => HashMap::from_iter( 112 res.into_iter() 113 + .map(|(list, count)| (list.at_uri.clone(), (list, count))), 114 ), 115 Err(e) => { 116 + tracing::error!("list load failed: {e}"); 117 HashMap::new() 118 } 119 }
+127
parakeet/src/xrpc/app_bsky/graph/lists.rs
···
··· 1 + use crate::xrpc::error::{Error, XrpcResult}; 2 + use crate::xrpc::{datetime_cursor, get_actor_did, ActorWithCursorQuery}; 3 + use crate::{hydration, GlobalState}; 4 + use axum::extract::{Query, State}; 5 + use axum::Json; 6 + use diesel::prelude::*; 7 + use diesel_async::RunQueryDsl; 8 + use lexica::app_bsky::graph::{ListItemView, ListView}; 9 + use parakeet_db::{models, schema}; 10 + use serde::{Deserialize, Serialize}; 11 + 12 + #[derive(Debug, Deserialize)] 13 + pub struct ListWithCursorQuery { 14 + pub list: String, 15 + pub limit: Option<u8>, 16 + pub cursor: Option<String>, 17 + } 18 + 19 + #[derive(Debug, Serialize)] 20 + pub struct AppBskyGraphGetListsRes { 21 + #[serde(skip_serializing_if = "Option::is_none")] 22 + cursor: Option<String>, 23 + lists: Vec<ListView>, 24 + } 25 + 26 + pub async fn get_lists( 27 + State(state): State<GlobalState>, 28 + Query(query): Query<ActorWithCursorQuery>, 29 + ) -> XrpcResult<Json<AppBskyGraphGetListsRes>> { 30 + let mut conn = state.pool.get().await?; 31 + 32 + let did = get_actor_did(&state.dataloaders, query.actor).await?; 33 + 34 + let limit = query.limit.unwrap_or(50).clamp(1, 100); 35 + 36 + let mut items_query = schema::lists::table 37 + .select((schema::lists::created_at, schema::lists::at_uri)) 38 + .filter(schema::lists::owner.eq(did)) 39 + .into_boxed(); 40 + 41 + if let Some(cursor) = datetime_cursor(query.cursor.as_ref()) { 42 + items_query = items_query.filter(schema::lists::created_at.lt(cursor)); 43 + } 44 + 45 + let results = items_query 46 + .order(schema::lists::created_at.desc()) 47 + .limit(limit as i64) 48 + .load::<(chrono::DateTime<chrono::Utc>, String)>(&mut conn) 49 + .await?; 50 + 51 + let cursor = results 52 + .last() 53 + .map(|(last, _)| last.timestamp_millis().to_string()); 54 + 55 + let at_uris = results.iter().map(|(_, uri)| uri.clone()).collect(); 56 + 57 + let mut lists = hydration::list::hydrate_lists(&state.dataloaders, at_uris).await; 58 + 59 + let lists = results 60 + .into_iter() 61 + .filter_map(|(_, uri)| lists.remove(&uri)) 62 + .collect(); 63 + 64 + Ok(Json(AppBskyGraphGetListsRes { cursor, lists })) 65 + } 66 + 67 + #[derive(Debug, Serialize)] 68 + pub struct AppBskyGraphGetListRes { 69 + #[serde(skip_serializing_if = "Option::is_none")] 70 + cursor: Option<String>, 71 + list: ListView, 72 + items: Vec<ListItemView>, 73 + } 74 + 75 + pub async fn get_list( 76 + State(state): State<GlobalState>, 77 + Query(query): Query<ListWithCursorQuery>, 78 + ) -> XrpcResult<Json<AppBskyGraphGetListRes>> { 79 + let mut conn = state.pool.get().await?; 80 + 81 + let Some(list) = hydration::list::hydrate_list(&state.dataloaders, query.list).await else { 82 + return Err(Error::not_found()); 83 + }; 84 + 85 + let limit = query.limit.unwrap_or(50).clamp(1, 100); 86 + 87 + let mut items_query = schema::list_items::table 88 + .select(models::ListItem::as_select()) 89 + .filter(schema::list_items::list_uri.eq(&list.uri)) 90 + .into_boxed(); 91 + 92 + if let Some(cursor) = datetime_cursor(query.cursor.as_ref()) { 93 + items_query = items_query.filter(schema::list_items::created_at.lt(cursor)); 94 + } 95 + 96 + let results = items_query 97 + .order(schema::list_items::created_at.desc()) 98 + .limit(limit as i64) 99 + .load(&mut conn) 100 + .await?; 101 + 102 + let cursor = results 103 + .last() 104 + .map(|last| last.created_at.and_utc().timestamp_millis().to_string()); 105 + 106 + let dids = results.iter().map(|item| item.subject.clone()).collect(); 107 + 108 + let mut profiles = hydration::profile::hydrate_profiles(&state.dataloaders, dids).await; 109 + 110 + let items = results 111 + .into_iter() 112 + .filter_map(|item| { 113 + let subject = profiles.remove(&item.subject)?; 114 + 115 + Some(ListItemView { 116 + uri: item.at_uri, 117 + subject, 118 + }) 119 + }) 120 + .collect(); 121 + 122 + Ok(Json(AppBskyGraphGetListRes { 123 + cursor, 124 + list, 125 + items, 126 + })) 127 + }
+1
parakeet/src/xrpc/app_bsky/graph/mod.rs
··· 1 pub mod relations;
··· 1 + pub mod lists; 2 pub mod relations;
+2
parakeet/src/xrpc/app_bsky/mod.rs
··· 10 .route("/app.bsky.actor.getProfiles", get(actor::get_profiles)) 11 .route("/app.bsky.graph.getFollowers", get(graph::relations::get_followers)) 12 .route("/app.bsky.graph.getFollows", get(graph::relations::get_follows)) 13 }
··· 10 .route("/app.bsky.actor.getProfiles", get(actor::get_profiles)) 11 .route("/app.bsky.graph.getFollowers", get(graph::relations::get_followers)) 12 .route("/app.bsky.graph.getFollows", get(graph::relations::get_follows)) 13 + .route("/app.bsky.graph.getList", get(graph::lists::get_list)) 14 + .route("/app.bsky.graph.getLists", get(graph::lists::get_lists)) 15 }