Parakeet is a Rust-based Bluesky AppServer aiming to implement most of the functionality required to support the Bluesky client
appview atproto bluesky rust appserver

feat: feed generators

+412 -1
+42
consumer/src/indexer/db.rs
··· 257 257 .execute(conn) 258 258 .await 259 259 } 260 + 261 + pub async fn upsert_feedgen( 262 + conn: &mut AsyncPgConnection, 263 + repo: &str, 264 + cid: Cid, 265 + at_uri: &str, 266 + rec: records::AppBskyFeedGenerator, 267 + ) -> QueryResult<usize> { 268 + let description_facets = rec 269 + .description_facets 270 + .and_then(|v| serde_json::to_value(v).ok()); 271 + 272 + let data = models::UpsertFeedGen { 273 + at_uri, 274 + cid: &cid.to_string(), 275 + owner: repo, 276 + service_did: &rec.did, 277 + content_mode: rec.content_mode, 278 + name: &rec.display_name, 279 + description: rec.description, 280 + description_facets, 281 + avatar_cid: blob_ref(rec.avatar), 282 + accepts_interactions: rec.accepts_interactions, 283 + created_at: rec.created_at.naive_utc(), 284 + indexed_at: Utc::now().naive_utc(), 285 + }; 286 + 287 + diesel::insert_into(schema::feedgens::table) 288 + .values(&data) 289 + .on_conflict(schema::feedgens::at_uri) 290 + .do_update() 291 + .set(&data) 292 + .execute(conn) 293 + .await 294 + } 295 + 296 + pub async fn delete_feedgen(conn: &mut AsyncPgConnection, at_uri: &str) -> QueryResult<usize> { 297 + diesel::delete(schema::feedgens::table) 298 + .filter(schema::feedgens::at_uri.eq(at_uri)) 299 + .execute(conn) 300 + .await 301 + }
+4
consumer/src/indexer/mod.rs
··· 221 221 222 222 db::upsert_profile(conn, repo, cid, record).await?; 223 223 } 224 + RecordTypes::AppBskyFeedGenerator(record) => { 225 + db::upsert_feedgen(conn, repo, cid, &full_path, record).await?; 226 + } 224 227 RecordTypes::AppBskyGraphBlock(record) => { 225 228 db::insert_block(conn, repo, &full_path, record).await?; 226 229 } ··· 252 255 } else if op.action == "delete" { 253 256 match collection { 254 257 CollectionType::BskyBlock => db::delete_block(conn, &full_path).await?, 258 + CollectionType::BskyFeedGen => db::delete_feedgen(conn, &full_path).await?, 255 259 CollectionType::BskyFollow => { 256 260 if let Some(subject) = db::delete_follow(conn, &full_path).await? { 257 261 db::update_follow_stats(conn, &[repo, &subject]).await?
+14
consumer/src/indexer/records.rs
··· 39 39 40 40 #[derive(Debug, Deserialize, Serialize)] 41 41 #[serde(rename_all = "camelCase")] 42 + pub struct AppBskyFeedGenerator { 43 + pub did: String, 44 + pub display_name: String, 45 + pub description: Option<String>, 46 + pub description_facets: Option<Vec<FacetMain>>, 47 + pub avatar: Option<Blob>, 48 + pub accepts_interactions: Option<bool>, 49 + // pub labels: Option<Vec<()>>, 50 + pub content_mode: Option<String>, 51 + pub created_at: DateTime<Utc>, 52 + } 53 + 54 + #[derive(Debug, Deserialize, Serialize)] 55 + #[serde(rename_all = "camelCase")] 42 56 pub struct AppBskyGraphBlock { 43 57 pub subject: String, 44 58 pub created_at: DateTime<Utc>,
+5
consumer/src/indexer/types.rs
··· 6 6 pub enum RecordTypes { 7 7 #[serde(rename = "app.bsky.actor.profile")] 8 8 AppBskyActorProfile(records::AppBskyActorProfile), 9 + #[serde(rename = "app.bsky.feed.generator")] 10 + AppBskyFeedGenerator(records::AppBskyFeedGenerator), 9 11 #[serde(rename = "app.bsky.graph.block")] 10 12 AppBskyGraphBlock(records::AppBskyGraphBlock), 11 13 #[serde(rename = "app.bsky.graph.follow")] ··· 21 23 #[derive(Debug, PartialOrd, PartialEq)] 22 24 pub enum CollectionType { 23 25 BskyProfile, 26 + BskyFeedGen, 24 27 BskyBlock, 25 28 BskyFollow, 26 29 BskyList, ··· 33 36 pub(crate) fn from_str(input: &str) -> CollectionType { 34 37 match input { 35 38 "app.bsky.actor.profile" => CollectionType::BskyProfile, 39 + "app.bsky.feed.generator" => CollectionType::BskyFeedGen, 36 40 "app.bsky.graph.block" => CollectionType::BskyBlock, 37 41 "app.bsky.graph.follow" => CollectionType::BskyFollow, 38 42 "app.bsky.graph.list" => CollectionType::BskyList, ··· 45 49 pub fn can_update(&self) -> bool { 46 50 match self { 47 51 CollectionType::BskyProfile => true, 52 + CollectionType::BskyFeedGen => true, 48 53 CollectionType::BskyBlock => false, 49 54 CollectionType::BskyFollow => false, 50 55 CollectionType::BskyList => true,
+53
lexica/src/app_bsky/feed.rs
··· 1 + use crate::app_bsky::actor::ProfileView; 2 + use crate::app_bsky::richtext::FacetMain; 3 + use chrono::prelude::*; 4 + use serde::Serialize; 5 + use std::str::FromStr; 6 + 7 + #[derive(Debug, Serialize)] 8 + #[serde(rename_all = "camelCase")] 9 + pub struct GeneratorView { 10 + pub uri: String, 11 + pub cid: String, 12 + pub did: String, 13 + pub creator: ProfileView, 14 + pub display_name: String, 15 + 16 + #[serde(skip_serializing_if = "Option::is_none")] 17 + pub description: Option<String>, 18 + #[serde(skip_serializing_if = "Option::is_none")] 19 + pub description_facets: Option<Vec<FacetMain>>, 20 + 21 + #[serde(skip_serializing_if = "Option::is_none")] 22 + pub avatar: Option<String>, 23 + pub like_count: i64, 24 + 25 + pub accepts_interactions: bool, 26 + // pub labels: Vec<()>, 27 + // #[serde(skip_serializing_if = "Option::is_none")] 28 + // pub viewer: Option<()>, 29 + #[serde(skip_serializing_if = "Option::is_none")] 30 + pub content_mode: Option<GeneratorContentMode>, 31 + 32 + pub indexed_at: NaiveDateTime, 33 + } 34 + 35 + #[derive(Debug, Serialize)] 36 + pub enum GeneratorContentMode { 37 + #[serde(rename = "app.bsky.feed.defs#contentModeUnspecified")] 38 + Unspecified, 39 + #[serde(rename = "app.bsky.feed.defs#contentModeVideo")] 40 + Video, 41 + } 42 + 43 + impl FromStr for GeneratorContentMode { 44 + type Err = (); 45 + 46 + fn from_str(s: &str) -> Result<Self, Self::Err> { 47 + match s { 48 + "app.bsky.feed.defs#contentModeUnspecified" => Ok(Self::Unspecified), 49 + "app.bsky.feed.defs#contentModeVideo" => Ok(Self::Video), 50 + _ => Err(()), 51 + } 52 + } 53 + }
+1
lexica/src/app_bsky/mod.rs
··· 1 1 pub mod actor; 2 + pub mod feed; 2 3 pub mod graph; 3 4 pub mod richtext;
+1
migrations/2025-02-08-162441_feedgen/down.sql
··· 1 + drop table feedgens;
+20
migrations/2025-02-08-162441_feedgen/up.sql
··· 1 + create table feedgens 2 + ( 3 + at_uri text primary key, 4 + cid text not null, 5 + owner text not null references actors (did), 6 + 7 + service_did text not null, 8 + content_mode text, 9 + name text not null, 10 + 11 + description text, 12 + description_facets jsonb, 13 + avatar_cid text, 14 + accepts_interactions bool not null default false, 15 + 16 + created_at timestamptz not null default now(), 17 + indexed_at timestamp not null default now() 18 + ); 19 + 20 + create index feedgens_owner_index on feedgens using hash (owner);
+46 -1
parakeet-db/src/models.rs
··· 193 193 194 194 pub created_at: NaiveDateTime, 195 195 pub indexed_at: NaiveDateTime, 196 - } 196 + } 197 + 198 + #[derive(Clone, Debug, Queryable, Selectable, Identifiable)] 199 + #[diesel(table_name = crate::schema::feedgens)] 200 + #[diesel(primary_key(at_uri))] 201 + #[diesel(check_for_backend(diesel::pg::Pg))] 202 + pub struct FeedGen { 203 + pub at_uri: String, 204 + pub cid: String, 205 + pub owner: String, 206 + 207 + pub service_did: String, 208 + pub content_mode: Option<String>, 209 + pub name: String, 210 + 211 + pub description: Option<String>, 212 + pub description_facets: Option<serde_json::Value>, 213 + pub avatar_cid: Option<String>, 214 + pub accepts_interactions: bool, 215 + 216 + pub created_at: NaiveDateTime, 217 + pub indexed_at: NaiveDateTime, 218 + } 219 + 220 + #[derive(Insertable, AsChangeset)] 221 + #[diesel(table_name = crate::schema::feedgens)] 222 + #[diesel(check_for_backend(diesel::pg::Pg))] 223 + #[diesel(treat_none_as_null = true)] 224 + pub struct UpsertFeedGen<'a> { 225 + pub at_uri: &'a str, 226 + pub cid: &'a str, 227 + pub owner: &'a str, 228 + 229 + pub service_did: &'a str, 230 + pub content_mode: Option<String>, 231 + pub name: &'a str, 232 + 233 + pub description: Option<String>, 234 + pub description_facets: Option<serde_json::Value>, 235 + pub avatar_cid: Option<String>, 236 + #[diesel(treat_none_as_null = false)] 237 + pub accepts_interactions: Option<bool>, 238 + 239 + pub created_at: NaiveDateTime, 240 + pub indexed_at: NaiveDateTime, 241 + }
+19
parakeet-db/src/schema.rs
··· 22 22 } 23 23 24 24 diesel::table! { 25 + feedgens (at_uri) { 26 + at_uri -> Text, 27 + cid -> Text, 28 + owner -> Text, 29 + service_did -> Text, 30 + content_mode -> Nullable<Text>, 31 + name -> Text, 32 + description -> Nullable<Text>, 33 + description_facets -> Nullable<Jsonb>, 34 + avatar_cid -> Nullable<Text>, 35 + accepts_interactions -> Bool, 36 + created_at -> Timestamptz, 37 + indexed_at -> Timestamp, 38 + } 39 + } 40 + 41 + diesel::table! { 25 42 follow_stats (did) { 26 43 did -> Text, 27 44 followers -> Int4, ··· 91 108 } 92 109 93 110 diesel::joinable!(blocks -> actors (did)); 111 + diesel::joinable!(feedgens -> actors (owner)); 94 112 diesel::joinable!(follows -> actors (did)); 95 113 diesel::joinable!(list_blocks -> actors (did)); 96 114 diesel::joinable!(list_blocks -> lists (list_uri)); ··· 101 119 diesel::allow_tables_to_appear_in_same_query!( 102 120 actors, 103 121 blocks, 122 + feedgens, 104 123 follow_stats, 105 124 follows, 106 125 list_blocks,
+63
parakeet/src/hydration/feedgen.rs
··· 1 + use crate::hydration::profile::{hydrate_profile, hydrate_profiles}; 2 + use crate::loaders::Dataloaders; 3 + use lexica::app_bsky::actor::ProfileView; 4 + use lexica::app_bsky::feed::{GeneratorContentMode, GeneratorView}; 5 + use parakeet_db::models; 6 + use std::collections::HashMap; 7 + use std::str::FromStr; 8 + 9 + fn build_feedgen(feedgen: models::FeedGen, creator: ProfileView) -> GeneratorView { 10 + let content_mode = feedgen 11 + .content_mode 12 + .and_then(|cm| GeneratorContentMode::from_str(&cm).ok()); 13 + 14 + let description_facets = feedgen 15 + .description_facets 16 + .and_then(|v| serde_json::from_value(v).ok()); 17 + 18 + GeneratorView { 19 + uri: feedgen.at_uri, 20 + cid: feedgen.cid, 21 + did: feedgen.service_did, 22 + creator, 23 + display_name: feedgen.name, 24 + description: feedgen.description, 25 + description_facets, 26 + avatar: feedgen 27 + .avatar_cid 28 + .map(|v| format!("https://localhost/feedgen/{v}")), 29 + like_count: 0, 30 + accepts_interactions: feedgen.accepts_interactions, 31 + content_mode, 32 + indexed_at: feedgen.indexed_at, 33 + } 34 + } 35 + 36 + pub async fn hydrate_feedgen(loaders: &Dataloaders, feedgen: String) -> Option<GeneratorView> { 37 + let feedgen = loaders.feedgen.load(feedgen).await?; 38 + let profile = hydrate_profile(loaders, feedgen.owner.clone()).await?; 39 + 40 + Some(build_feedgen(feedgen, profile)) 41 + } 42 + 43 + pub async fn hydrate_feedgens( 44 + loaders: &Dataloaders, 45 + feedgens: Vec<String>, 46 + ) -> HashMap<String, GeneratorView> { 47 + let feedgens = loaders.feedgen.load_many(feedgens).await; 48 + 49 + let creators = feedgens 50 + .values() 51 + .map(|feedgen| feedgen.owner.clone()) 52 + .collect(); 53 + let creators = hydrate_profiles(loaders, creators).await; 54 + 55 + feedgens 56 + .into_iter() 57 + .filter_map(|(uri, feedgen)| { 58 + let creator = creators.get(&feedgen.owner)?; 59 + 60 + Some((uri, build_feedgen(feedgen, creator.to_owned()))) 61 + }) 62 + .collect() 63 + }
+1
parakeet/src/hydration/mod.rs
··· 1 1 #![allow(unused)] 2 2 3 + pub mod feedgen; 3 4 pub mod list; 4 5 pub mod profile;
+27
parakeet/src/loaders.rs
··· 7 7 use std::collections::HashMap; 8 8 9 9 pub struct Dataloaders { 10 + pub feedgen: Loader<String, FeedGenLoaderRet, FeedGenLoader>, 10 11 pub handle: Loader<String, String, HandleLoader>, 11 12 pub list: Loader<String, ListLoaderRet, ListLoader>, 12 13 pub profile: Loader<String, ProfileLoaderRet, ProfileLoader>, ··· 17 18 // we should build a redis/valkey backend at some point in the future. 18 19 pub fn new(pool: Pool<AsyncPgConnection>) -> Dataloaders { 19 20 Dataloaders { 21 + feedgen: Loader::new(FeedGenLoader(pool.clone())), 20 22 handle: Loader::new(HandleLoader(pool.clone())), 21 23 list: Loader::new(ListLoader(pool.clone())), 22 24 profile: Loader::new(ProfileLoader(pool.clone())), ··· 119 121 } 120 122 } 121 123 } 124 + 125 + pub struct FeedGenLoader(Pool<AsyncPgConnection>); 126 + type FeedGenLoaderRet = models::FeedGen; //todo: when we have likes, we'll need the count here 127 + impl BatchFn<String, FeedGenLoaderRet> for FeedGenLoader { 128 + async fn load(&mut self, keys: &[String]) -> HashMap<String, FeedGenLoaderRet> { 129 + let mut conn = self.0.get().await.unwrap(); 130 + 131 + let res = schema::feedgens::table 132 + .select(models::FeedGen::as_select()) 133 + .filter(schema::feedgens::at_uri.eq_any(keys)) 134 + .load(&mut conn) 135 + .await; 136 + 137 + match res { 138 + Ok(res) => HashMap::from_iter( 139 + res.into_iter() 140 + .map(|feedgen| (feedgen.at_uri.clone(), feedgen)), 141 + ), 142 + Err(e) => { 143 + tracing::error!("feedgen load failed: {e}"); 144 + HashMap::new() 145 + } 146 + } 147 + } 148 + }
+111
parakeet/src/xrpc/app_bsky/feed/feedgen.rs
··· 1 + use crate::xrpc::error::{Error, XrpcResult}; 2 + use crate::xrpc::{datetime_cursor, get_actor_did, ActorWithCursorQuery}; 3 + use crate::{hydration, GlobalState}; 4 + use axum::extract::{Query, State}; 5 + use axum::Json; 6 + use axum_extra::extract::Query as ExtraQuery; 7 + use diesel::prelude::*; 8 + use diesel_async::RunQueryDsl; 9 + use lexica::app_bsky::feed::GeneratorView; 10 + use parakeet_db::schema; 11 + use serde::{Deserialize, Serialize}; 12 + 13 + #[derive(Debug, Serialize)] 14 + pub struct GetActorFeedRes { 15 + #[serde(skip_serializing_if = "Option::is_none")] 16 + cursor: Option<String>, 17 + feeds: Vec<GeneratorView>, 18 + } 19 + 20 + pub async fn get_actor_feeds( 21 + State(state): State<GlobalState>, 22 + Query(query): Query<ActorWithCursorQuery>, 23 + ) -> XrpcResult<Json<GetActorFeedRes>> { 24 + let mut conn = state.pool.get().await?; 25 + 26 + let did = get_actor_did(&state.dataloaders, query.actor).await?; 27 + 28 + let limit = query.limit.unwrap_or(50).clamp(1, 100); 29 + 30 + let mut feeds_query = schema::feedgens::table 31 + .select((schema::feedgens::created_at, schema::feedgens::at_uri)) 32 + .filter(schema::feedgens::owner.eq(did)) 33 + .into_boxed(); 34 + 35 + if let Some(cursor) = datetime_cursor(query.cursor.as_ref()) { 36 + feeds_query = feeds_query.filter(schema::feedgens::created_at.lt(cursor)); 37 + } 38 + 39 + let results = feeds_query 40 + .order(schema::feedgens::created_at.desc()) 41 + .limit(limit as i64) 42 + .load::<(chrono::DateTime<chrono::Utc>, String)>(&mut conn) 43 + .await?; 44 + 45 + let cursor = results 46 + .last() 47 + .map(|(last, _)| last.timestamp_millis().to_string()); 48 + 49 + let at_uris = results.iter().map(|(_, uri)| uri.clone()).collect(); 50 + 51 + let mut feeds = hydration::feedgen::hydrate_feedgens(&state.dataloaders, at_uris).await; 52 + 53 + let feeds = results 54 + .into_iter() 55 + .filter_map(|(_, uri)| feeds.remove(&uri)) 56 + .collect(); 57 + 58 + Ok(Json(GetActorFeedRes { cursor, feeds })) 59 + } 60 + 61 + #[derive(Debug, Deserialize)] 62 + pub struct GetFeedGeneratorQuery { 63 + pub feed: String, 64 + } 65 + 66 + #[derive(Debug, Serialize)] 67 + #[serde(rename_all = "camelCase")] 68 + pub struct GetFeedGeneratorRes { 69 + pub view: GeneratorView, 70 + pub is_online: bool, 71 + pub is_valid: bool, 72 + } 73 + 74 + pub async fn get_feed_generator( 75 + State(state): State<GlobalState>, 76 + Query(query): Query<GetFeedGeneratorQuery>, 77 + ) -> XrpcResult<Json<GetFeedGeneratorRes>> { 78 + let Some(view) = hydration::feedgen::hydrate_feedgen(&state.dataloaders, query.feed).await 79 + else { 80 + return Err(Error::not_found()); 81 + }; 82 + 83 + // todo: make the two flags work 84 + Ok(Json(GetFeedGeneratorRes { 85 + view, 86 + is_online: true, 87 + is_valid: true, 88 + })) 89 + } 90 + 91 + #[derive(Debug, Deserialize)] 92 + pub struct FeedsQuery { 93 + pub feeds: Vec<String>, 94 + } 95 + 96 + #[derive(Debug, Serialize)] 97 + pub struct GetFeedGeneratorsRes { 98 + pub feeds: Vec<GeneratorView>, 99 + } 100 + 101 + pub async fn get_feed_generators( 102 + State(state): State<GlobalState>, 103 + ExtraQuery(query): ExtraQuery<FeedsQuery>, 104 + ) -> XrpcResult<Json<GetFeedGeneratorsRes>> { 105 + let feeds = hydration::feedgen::hydrate_feedgens(&state.dataloaders, query.feeds) 106 + .await 107 + .into_values() 108 + .collect(); 109 + 110 + Ok(Json(GetFeedGeneratorsRes { feeds })) 111 + }
+1
parakeet/src/xrpc/app_bsky/feed/mod.rs
··· 1 + pub mod feedgen;
+4
parakeet/src/xrpc/app_bsky/mod.rs
··· 2 2 use axum::Router; 3 3 4 4 mod actor; 5 + mod feed; 5 6 mod graph; 6 7 7 8 pub fn routes() -> Router<crate::GlobalState> { 8 9 Router::new() 9 10 .route("/app.bsky.actor.getProfile", get(actor::get_profile)) 10 11 .route("/app.bsky.actor.getProfiles", get(actor::get_profiles)) 12 + .route("/app.bsky.feed.getActorFeeds", get(feed::feedgen::get_actor_feeds)) 13 + .route("/app.bsky.feed.getFeedGenerator", get(feed::feedgen::get_feed_generator)) 14 + .route("/app.bsky.feed.getFeedGenerators", get(feed::feedgen::get_feed_generators)) 11 15 .route("/app.bsky.graph.getFollowers", get(graph::relations::get_followers)) 12 16 .route("/app.bsky.graph.getFollows", get(graph::relations::get_follows)) 13 17 .route("/app.bsky.graph.getList", get(graph::lists::get_list))