Parakeet is a Rust-based Bluesky AppServer aiming to implement most of the functionality required to support the Bluesky client
appview atproto bluesky rust appserver

blocks and follows

+288 -37
+63
consumer/src/indexer/db.rs
··· 1 + use super::records; 1 2 use chrono::prelude::*; 2 3 use diesel::prelude::*; 4 + use diesel::sql_types::{Array, Text}; 3 5 use diesel_async::{AsyncPgConnection, RunQueryDsl}; 4 6 use ipld_core::cid::Cid; 5 7 use parakeet_db::{models, schema, types}; ··· 60 62 .execute(conn) 61 63 .await 62 64 } 65 + 66 + pub async fn insert_block( 67 + conn: &mut AsyncPgConnection, 68 + repo: &str, 69 + at_uri: &str, 70 + rec: records::AppBskyGraphBlock, 71 + ) -> QueryResult<usize> { 72 + diesel::insert_into(schema::blocks::table) 73 + .values(&models::NewBlock { 74 + at_uri, 75 + did: repo, 76 + subject: &rec.subject, 77 + created_at: rec.created_at.naive_utc(), 78 + }) 79 + .on_conflict_do_nothing() 80 + .execute(conn) 81 + .await 82 + } 83 + 84 + pub async fn delete_block(conn: &mut AsyncPgConnection, at_uri: &str) -> QueryResult<usize> { 85 + diesel::delete(schema::blocks::table) 86 + .filter(schema::blocks::at_uri.eq(at_uri)) 87 + .execute(conn) 88 + .await 89 + } 90 + 91 + pub async fn insert_follow( 92 + conn: &mut AsyncPgConnection, 93 + repo: &str, 94 + at_uri: &str, 95 + rec: records::AppBskyGraphFollow, 96 + ) -> QueryResult<usize> { 97 + diesel::insert_into(schema::follows::table) 98 + .values(&models::NewFollow { 99 + at_uri, 100 + did: repo, 101 + subject: &rec.subject, 102 + created_at: rec.created_at.naive_utc(), 103 + }) 104 + .on_conflict_do_nothing() 105 + .execute(conn) 106 + .await 107 + } 108 + 109 + pub async fn delete_follow(conn: &mut AsyncPgConnection, at_uri: &str) -> QueryResult<String> { 110 + diesel::delete(schema::follows::table) 111 + .filter(schema::follows::at_uri.eq(at_uri)) 112 + .returning(schema::follows::subject) 113 + .get_result(conn) 114 + .await 115 + } 116 + 117 + pub async fn update_follow_stats( 118 + conn: &mut AsyncPgConnection, 119 + subjects: &[&str], 120 + ) -> QueryResult<usize> { 121 + diesel::sql_query(include_str!("../sql/follow_stats_upsert.sql")) 122 + .bind::<Array<Text>, _>(subjects) 123 + .execute(conn) 124 + 
.await 125 + }
+77 -36
consumer/src/indexer/mod.rs
··· 1 - use crate::firehose::{AtpAccountEvent, AtpCommitEvent, AtpIdentityEvent, FirehoseEvent}; 1 + use crate::firehose::{AtpAccountEvent, AtpCommitEvent, AtpIdentityEvent, CommitOp, FirehoseEvent}; 2 2 use crate::indexer::types::{CollectionType, RecordTypes}; 3 3 use diesel_async::pooled_connection::deadpool::Pool; 4 4 use diesel_async::{AsyncConnection, AsyncPgConnection}; ··· 10 10 use tracing::{instrument, Instrument}; 11 11 12 12 mod db; 13 + mod records; 13 14 mod types; 14 15 15 16 pub async fn relay_indexer( ··· 103 104 } 104 105 }; 105 106 106 - conn.transaction::<_, diesel::result::Error, _>(|t| Box::pin(async move { 107 - // ensure new repo rev > current rev, but only when the repo is synced 108 - // (if repo is processing, they'll end up on the queue anyway) 109 - if is_active && db::update_repo_state(t, &commit.repo, &commit.rev, commit.commit).await? == 0 { 110 - tracing::warn!("Got a repo update older than the current rev. not processing ops."); 111 - return Ok(false); 107 + conn.transaction::<_, diesel::result::Error, _>(|t| { 108 + Box::pin(async move { 109 + // ensure new repo rev > current rev, but only when the repo is synced 110 + // (if repo is processing, they'll end up on the queue anyway) 111 + if is_active 112 + && db::update_repo_state(t, &commit.repo, &commit.rev, commit.commit).await? == 0 113 + { 114 + tracing::warn!("Got a repo update older than the current rev. 
not processing ops."); 115 + return Ok(false); 116 + } 117 + 118 + for op in &commit.ops { 119 + process_op(t, &commit.repo, op, &blocks).await?; 120 + } 121 + 122 + Ok(true) 123 + }) 124 + }) 125 + .await?; 126 + 127 + Ok(()) 128 + } 129 + 130 + async fn process_op( 131 + conn: &mut AsyncPgConnection, 132 + repo: &str, 133 + op: &CommitOp, 134 + blocks: &HashMap<Cid, Vec<u8>>, 135 + ) -> diesel::QueryResult<()> { 136 + let Some((collection_raw, _)) = op.path.split_once("/") else { 137 + tracing::warn!("op contained invalid path {}", op.path); 138 + return Ok(()); 139 + }; 140 + 141 + let collection = CollectionType::from_str(collection_raw); 142 + if collection == CollectionType::Unsupported { 143 + tracing::debug!("{} {collection_raw} is unsupported", op.action); 144 + return Ok(()); 145 + } 146 + 147 + let full_path = format!("at://{repo}/{}", &op.path); 148 + 149 + if op.action == "create" || op.action == "update" { 150 + if op.action == "update" && !collection.can_update() { 151 + return Ok(()); 112 152 } 113 153 114 - for op in &commit.ops { 115 - let Some((collection_raw, rkey)) = op.path.split_once("/") else { 116 - tracing::warn!("op contained invalid path {}", op.path); 117 - continue; 118 - }; 154 + let Some(block) = op.cid.and_then(|cid| blocks.get(&cid)) else { 155 + tracing::error!("Missing Cid or the block was not found"); 156 + return Ok(()); 157 + }; 119 158 120 - let collection = CollectionType::from_str(collection_raw); 121 - if collection == CollectionType::Unsupported { 122 - tracing::debug!("{} {collection_raw} is unsupported", op.action); 123 - continue; 159 + let decoded: RecordTypes = match serde_ipld_dagcbor::from_slice(block) { 160 + Ok(decoded) => decoded, 161 + Err(err) => { 162 + tracing::error!("Failed to decode record: {err}"); 163 + return Ok(()); 124 164 } 165 + }; 125 166 126 - if op.action == "create" || op.action == "update" { 127 - if let Some(block) = op.cid.and_then(|cid| blocks.get(&cid)) { 128 - let decoded: RecordTypes = 
match serde_ipld_dagcbor::from_slice(block) { 129 - Ok(decoded) => decoded, 130 - Err(err) => { 131 - tracing::error!("Failed to decode record: {err}"); 132 - continue; 133 - } 134 - }; 135 - } else { 136 - tracing::error!("Missing Cid or the block was not found"); 137 - continue; 138 - } 139 - } else if op.action == "delete" { 140 - // 141 - } else { 142 - tracing::warn!("op contained invalid action {}", op.action); 167 + match decoded { 168 + RecordTypes::AppBskyGraphBlock(record) => { 169 + db::insert_block(conn, repo, &full_path, record).await?; 170 + } 171 + RecordTypes::AppBskyGraphFollow(record) => { 172 + let subj = record.subject.clone(); 173 + db::insert_follow(conn, repo, &full_path, record).await?; 174 + db::update_follow_stats(conn, &[repo, &subj]).await?; 143 175 } 144 176 } 145 - 146 - Ok(true) 147 - })).await?; 177 + } else if op.action == "delete" { 178 + match collection { 179 + CollectionType::BskyBlock => db::delete_block(conn, &full_path).await?, 180 + CollectionType::BskyFollow => { 181 + let subject = db::delete_follow(conn, &full_path).await?; 182 + db::update_follow_stats(conn, &[repo, &subject]).await? 183 + }, 184 + _ => unreachable!(), 185 + }; 186 + } else { 187 + tracing::warn!("op contained invalid action {}", op.action); 188 + } 148 189 149 190 Ok(()) 150 191 }
+16
consumer/src/indexer/records.rs
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};

/// Body of an `app.bsky.graph.block` record.
///
/// Field names are (de)serialized in camelCase, so `created_at` is read from
/// and written as `createdAt`.
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct AppBskyGraphBlock {
    /// Target of the block. Presumably the blocked account's DID — TODO confirm
    /// against the lexicon.
    pub subject: String,
    /// Creation timestamp carried in the record.
    pub created_at: DateTime<Utc>,
}

/// Body of an `app.bsky.graph.follow` record.
///
/// Field names are (de)serialized in camelCase, so `created_at` is read from
/// and written as `createdAt`.
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct AppBskyGraphFollow {
    /// Target of the follow. Presumably the followed account's DID — TODO confirm
    /// against the lexicon.
    pub subject: String,
    /// Creation timestamp carried in the record.
    pub created_at: DateTime<Utc>,
}
+13 -1
consumer/src/indexer/types.rs
··· 1 + use super::records; 1 2 use serde::{Deserialize, Serialize}; 2 3 3 4 #[derive(Debug, Deserialize, Serialize)] 4 5 #[serde(tag = "$type")] 5 - pub enum RecordTypes {} 6 + pub enum RecordTypes { 7 + #[serde(rename = "app.bsky.graph.block")] 8 + AppBskyGraphBlock(records::AppBskyGraphBlock), 9 + #[serde(rename = "app.bsky.graph.follow")] 10 + AppBskyGraphFollow(records::AppBskyGraphFollow), 11 + } 6 12 7 13 #[derive(Debug, PartialOrd, PartialEq)] 8 14 pub enum CollectionType { 15 + BskyBlock, 16 + BskyFollow, 9 17 Unsupported, 10 18 } 11 19 12 20 impl CollectionType { 13 21 pub(crate) fn from_str(input: &str) -> CollectionType { 14 22 match input { 23 + "app.bsky.graph.block" => CollectionType::BskyBlock, 24 + "app.bsky.graph.follow" => CollectionType::BskyFollow, 15 25 _ => CollectionType::Unsupported, 16 26 } 17 27 } 18 28 19 29 pub fn can_update(&self) -> bool { 20 30 match self { 31 + CollectionType::BskyBlock => false, 32 + CollectionType::BskyFollow => false, 21 33 CollectionType::Unsupported => false, 22 34 } 23 35 }
+21
consumer/src/sql/follow_stats_upsert.sql
-- Recomputes the follower / following counts for every actor whose DID is in
-- $1 (a text[] parameter) and upserts the result into follow_stats.
--
-- The counts are correlated subqueries keyed on the actor's DID, so only the
-- follows rows belonging to the requested actors are read (equality lookups on
-- follows.did / follows.subject). Aggregating the whole follows table with
-- GROUP BY and filtering afterwards would rescan every follow on each call.
-- count(*) over zero rows is 0, so no coalesce is required.
with new_info as (
    select a.did,
           (select count(*) from follows f where f.did = a.did)     as following,
           (select count(*) from follows f where f.subject = a.did) as followers
    from actors a
    where a.did = any($1)
)
merge into follow_stats fs
using new_info ni
on ni.did = fs.did
when matched then
    update
    set followers = ni.followers,
        following = ni.following
when not matched then
    insert (did, followers, following)
    values (ni.did, ni.followers, ni.following);
+3
migrations/2025-01-29-213341_follows_and_blocks/down.sql
-- Reverts the follows_and_blocks migration by dropping the three tables it
-- creates. Indexes belonging to a table are removed together with the table,
-- so they need no separate statements.
drop table blocks;
drop table follow_stats;
drop table follows;
+29
migrations/2025-01-29-213341_follows_and_blocks/up.sql
-- Block records, one row per record, keyed by the record's at:// URI.
-- did is the repo that owns the record and must exist in actors;
-- subject carries no foreign key.
create table blocks
(
    at_uri     text primary key,
    did        text        not null references actors (did),
    subject    text        not null,
    created_at timestamptz not null
);

-- Hash indexes: serve equality lookups on did / subject only.
create index blocks_did_index on blocks using hash (did);
create index blocks_subject_index on blocks using hash (subject);

-- Follow records, same layout as blocks.
create table follows
(
    at_uri     text primary key,
    did        text        not null references actors (did),
    subject    text        not null,
    created_at timestamptz not null
);

-- NOTE(review): these index names use a singular "follow_" prefix while the
-- blocks indexes use the table name ("blocks_") — consider aligning.
create index follow_did_index on follows using hash (did);
create index follow_subject_index on follows using hash (subject);

-- Per-actor follower / following counters (no foreign key to actors).
create table follow_stats
(
    did       text primary key,

    followers integer not null default 0,
    following integer not null default 0
);
+30
parakeet-db/src/models.rs
··· 25 25 pub status: Option<ActorStatus>, 26 26 pub last_indexed: Option<NaiveDateTime>, 27 27 } 28 + 29 + #[derive(Insertable)] 30 + #[diesel(table_name = crate::schema::blocks)] 31 + #[diesel(check_for_backend(diesel::pg::Pg))] 32 + pub struct NewBlock<'a> { 33 + pub at_uri: &'a str, 34 + pub did: &'a str, 35 + pub subject: &'a str, 36 + pub created_at: NaiveDateTime, 37 + } 38 + 39 + #[derive(Insertable)] 40 + #[diesel(table_name = crate::schema::follows)] 41 + #[diesel(check_for_backend(diesel::pg::Pg))] 42 + pub struct NewFollow<'a> { 43 + pub at_uri: &'a str, 44 + pub did: &'a str, 45 + pub subject: &'a str, 46 + pub created_at: NaiveDateTime, 47 + } 48 + 49 + #[derive(Debug, Queryable, Selectable, Identifiable)] 50 + #[diesel(table_name = crate::schema::follow_stats)] 51 + #[diesel(primary_key(did))] 52 + #[diesel(check_for_backend(diesel::pg::Pg))] 53 + pub struct FollowStats { 54 + pub did: String, 55 + pub followers: i32, 56 + pub following: i32, 57 + }
+36
parakeet-db/src/schema.rs
··· 11 11 last_indexed -> Nullable<Timestamp>, 12 12 } 13 13 } 14 + 15 + diesel::table! { 16 + blocks (at_uri) { 17 + at_uri -> Text, 18 + did -> Text, 19 + subject -> Text, 20 + created_at -> Timestamptz, 21 + } 22 + } 23 + 24 + diesel::table! { 25 + follow_stats (did) { 26 + did -> Text, 27 + followers -> Int4, 28 + following -> Int4, 29 + } 30 + } 31 + 32 + diesel::table! { 33 + follows (at_uri) { 34 + at_uri -> Text, 35 + did -> Text, 36 + subject -> Text, 37 + created_at -> Timestamptz, 38 + } 39 + } 40 + 41 + diesel::joinable!(blocks -> actors (did)); 42 + diesel::joinable!(follows -> actors (did)); 43 + 44 + diesel::allow_tables_to_appear_in_same_query!( 45 + actors, 46 + blocks, 47 + follow_stats, 48 + follows, 49 + );