Parakeet is a Rust-based Bluesky AppServer aiming to implement most of the functionality required to support the Bluesky client
appview atproto bluesky rust appserver

feat: app.bsky.actor.status

+269 -27
+33
consumer/src/db/record.rs
··· 586 586 .await 587 587 } 588 588 589 + pub async fn status_upsert<C: GenericClient>( 590 + conn: &mut C, 591 + repo: &str, 592 + rec: AppBskyActorStatus, 593 + ) -> PgExecResult { 594 + let record = serde_json::to_value(&rec).unwrap(); 595 + let thumb = rec.embed.as_ref().and_then(|v| v.external.thumb.clone()); 596 + let thumb_mime = thumb.as_ref().map(|v| v.mime_type.clone()); 597 + let thumb_cid = thumb.as_ref().map(|v| v.r#ref.to_string()); 598 + 599 + conn.execute( 600 + include_str!("sql/status_upsert.sql"), 601 + &[ 602 + &repo, 603 + &rec.status.to_string(), 604 + &rec.duration_minutes, 605 + &record, 606 + &rec.embed.as_ref().map(|v| v.external.uri.clone()), 607 + &rec.embed.as_ref().map(|v| v.external.title.clone()), 608 + &rec.embed.as_ref().map(|v| v.external.description.clone()), 609 + &thumb_mime, 610 + &thumb_cid, 611 + &rec.created_at, 612 + ], 613 + ) 614 + .await 615 + } 616 + 617 + pub async fn status_delete<C: GenericClient>(conn: &mut C, did: &str) -> PgExecResult { 618 + conn.execute("DELETE FROM statuses WHERE did=$1", &[&did]) 619 + .await 620 + } 621 + 589 622 pub async fn threadgate_upsert<C: GenericClient>( 590 623 conn: &mut C, 591 624 at_uri: &str,
+12
consumer/src/db/sql/status_upsert.sql
··· 1 + INSERT INTO statuses (did, status, duration, record, embed_uri, embed_title, embed_description, thumb_mime_type, 2 + thumb_cid, created_at) 3 + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) 4 + ON CONFLICT (did) DO UPDATE SET status=EXCLUDED.status, 5 + duration=EXCLUDED.duration, 6 + record=EXCLUDED.record, 7 + embed_uri=EXCLUDED.embed_uri, 8 + embed_title=EXCLUDED.embed_title, 9 + embed_description=EXCLUDED.embed_description, 10 + thumb_mime_type=EXCLUDED.thumb_mime_type, 11 + thumb_cid=EXCLUDED.thumb_cid, 12 + indexed_at=NOW()
+6
consumer/src/indexer/mod.rs
··· 518 518 } 519 519 } 520 520 } 521 + RecordTypes::AppBskyActorStatus(record) => { 522 + if rkey == "self" { 523 + db::status_upsert(conn, repo, record).await?; 524 + } 525 + } 521 526 RecordTypes::AppBskyFeedGenerator(record) => { 522 527 let labels = record.labels.clone(); 523 528 let count = db::feedgen_upsert(conn, at_uri, repo, cid, record).await?; ··· 691 696 ) -> Result<(), tokio_postgres::Error> { 692 697 match collection { 693 698 CollectionType::BskyProfile => db::profile_delete(conn, repo).await?, 699 + CollectionType::BskyStatus => db::status_delete(conn, repo).await?, 694 700 CollectionType::BskyBlock => db::block_delete(conn, at_uri).await?, 695 701 CollectionType::BskyFeedGen => { 696 702 let count = db::feedgen_delete(conn, at_uri).await?;
+13 -1
consumer/src/indexer/records.rs
··· 1 1 use crate::utils; 2 2 use chrono::{DateTime, Utc}; 3 3 use ipld_core::cid::Cid; 4 - use lexica::app_bsky::actor::ChatAllowIncoming; 4 + use lexica::app_bsky::actor::{ChatAllowIncoming, Status}; 5 5 use lexica::app_bsky::embed::AspectRatio; 6 6 use lexica::app_bsky::labeler::LabelerPolicy; 7 7 use lexica::app_bsky::richtext::FacetMain; ··· 41 41 pub joined_via_starter_pack: Option<StrongRef>, 42 42 pub pinned_post: Option<StrongRef>, 43 43 pub created_at: Option<DateTime<Utc>>, 44 + } 45 + 46 + #[derive(Debug, Deserialize, Serialize)] 47 + #[serde(rename_all = "camelCase")] 48 + pub struct AppBskyActorStatus { 49 + pub status: Status, 50 + pub duration_minutes: Option<i32>, 51 + pub embed: Option<AppBskyEmbedExternal>, 52 + 53 + pub created_at: DateTime<Utc>, 44 54 } 45 55 46 56 #[derive(Clone, Debug, Deserialize, Serialize)] ··· 151 161 } 152 162 153 163 #[derive(Clone, Debug, Deserialize, Serialize)] 164 + #[serde(tag = "$type")] 165 + #[serde(rename = "app.bsky.embed.external")] 154 166 pub struct AppBskyEmbedExternal { 155 167 pub external: EmbedExternal, 156 168 }
+5
consumer/src/indexer/types.rs
··· 7 7 pub enum RecordTypes { 8 8 #[serde(rename = "app.bsky.actor.profile")] 9 9 AppBskyActorProfile(records::AppBskyActorProfile), 10 + #[serde(rename = "app.bsky.actor.status")] 11 + AppBskyActorStatus(records::AppBskyActorStatus), 10 12 #[serde(rename = "app.bsky.feed.generator")] 11 13 AppBskyFeedGenerator(records::AppBskyFeedGenerator), 12 14 #[serde(rename = "app.bsky.feed.like")] ··· 42 44 #[derive(Debug, PartialOrd, PartialEq, Deserialize, Serialize)] 43 45 pub enum CollectionType { 44 46 BskyProfile, 47 + BskyStatus, 45 48 BskyFeedGen, 46 49 BskyFeedLike, 47 50 BskyFeedPost, ··· 64 67 pub(crate) fn from_str(input: &str) -> CollectionType { 65 68 match input { 66 69 "app.bsky.actor.profile" => CollectionType::BskyProfile, 70 + "app.bsky.actor.status" => CollectionType::BskyStatus, 67 71 "app.bsky.feed.generator" => CollectionType::BskyFeedGen, 68 72 "app.bsky.feed.like" => CollectionType::BskyFeedLike, 69 73 "app.bsky.feed.post" => CollectionType::BskyFeedPost, ··· 86 90 pub fn can_update(&self) -> bool { 87 91 match self { 88 92 CollectionType::BskyProfile => true, 93 + CollectionType::BskyStatus => true, 89 94 CollectionType::BskyFeedGen => true, 90 95 CollectionType::BskyFeedLike => false, 91 96 CollectionType::BskyFeedPost => false,
+53
lexica/src/app_bsky/actor.rs
··· 1 + use crate::app_bsky::embed::External; 1 2 use crate::com_atproto::label::Label; 2 3 use chrono::prelude::*; 3 4 use serde::{Deserialize, Serialize}; ··· 47 48 "all" => Ok(ChatAllowIncoming::All), 48 49 "none" => Ok(ChatAllowIncoming::None), 49 50 "following" => Ok(ChatAllowIncoming::Following), 51 + x => Err(format!("Unrecognized variant {}", x)), 52 + } 53 + } 54 + } 55 + 56 + #[derive(Copy, Clone, Debug, Deserialize, Serialize)] 57 + pub enum Status { 58 + /// Advertises an account as currently offering live content. 59 + #[serde(rename = "app.bsky.actor.status#live")] 60 + Live, 61 + } 62 + 63 + impl Display for Status { 64 + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 65 + match self { 66 + Status::Live => write!(f, "app.bsky.actor.status#live"), 67 + } 68 + } 69 + } 70 + 71 + impl FromStr for Status { 72 + type Err = String; 73 + 74 + fn from_str(s: &str) -> Result<Self, Self::Err> { 75 + match s { 76 + "app.bsky.actor.status#live" => Ok(Status::Live), 50 77 x => Err(format!("Unrecognized variant {}", x)), 51 78 } 52 79 } ··· 70 97 pub labels: Vec<Label>, 71 98 #[serde(skip_serializing_if = "Option::is_none")] 72 99 pub verification: Option<VerificationState>, 100 + #[serde(skip_serializing_if = "Option::is_none")] 101 + pub status: Option<StatusView>, 73 102 74 103 pub created_at: DateTime<Utc>, 75 104 } ··· 94 123 pub labels: Vec<Label>, 95 124 #[serde(skip_serializing_if = "Option::is_none")] 96 125 pub verification: Option<VerificationState>, 126 + #[serde(skip_serializing_if = "Option::is_none")] 127 + pub status: Option<StatusView>, 97 128 98 129 pub created_at: DateTime<Utc>, 99 130 pub indexed_at: NaiveDateTime, ··· 127 158 // pub pinned_post: Option<()>, 128 159 #[serde(skip_serializing_if = "Option::is_none")] 129 160 pub verification: Option<VerificationState>, 161 + #[serde(skip_serializing_if = "Option::is_none")] 162 + pub status: Option<StatusView>, 130 163 131 164 pub created_at: DateTime<Utc>, 132 165 pub indexed_at: NaiveDateTime, ··· 164 197 Invalid, 165 198 None, 166 199 } 200 + 201 + #[derive(Clone, Debug, Serialize)] 202 + #[serde(rename_all = "camelCase")] 203 + pub struct StatusView { 204 + pub status: Status, 205 + pub record: serde_json::Value, 206 + #[serde(skip_serializing_if = "Option::is_none")] 207 + pub embed: Option<StatusViewEmbed>, 208 + #[serde(skip_serializing_if = "Option::is_none")] 209 + pub expires_at: Option<DateTime<Utc>>, 210 + #[serde(skip_serializing_if = "Option::is_none")] 211 + pub is_active: Option<bool>, 212 + } 213 + 214 + #[derive(Clone, Debug, Serialize)] 215 + #[serde(tag = "$type")] 216 + #[serde(rename = "app.bsky.embed.external#view")] 217 + pub struct StatusViewEmbed { 218 + pub external: External 219 + }
+1
migrations/2025-06-11-192947_statuses/down.sql
··· 1 + drop table statuses;
+16
migrations/2025-06-11-192947_statuses/up.sql
··· 1 + create table statuses 2 + ( 3 + did text primary key references actors (did), 4 + status text not null, 5 + duration int, 6 + record jsonb not null, 7 + 8 + embed_uri text, 9 + embed_title text, 10 + embed_description text, 11 + thumb_mime_type text, 12 + thumb_cid text, 13 + 14 + created_at timestamptz not null default now(), 15 + indexed_at timestamp not null default now() 16 + );
+22
parakeet-db/src/models.rs
··· 718 718 #[diesel(treat_none_as_default_value = true)] 719 719 pub indexed_at: Option<NaiveDateTime>, 720 720 } 721 + 722 + #[derive(Clone, Debug, Queryable, Selectable, Identifiable)] 723 + #[diesel(table_name = crate::schema::statuses)] 724 + #[diesel(primary_key(did))] 725 + #[diesel(check_for_backend(diesel::pg::Pg))] 726 + pub struct Status { 727 + pub did: String, 728 + pub status: String, 729 + pub duration: Option<i32>, 730 + 731 + pub record: serde_json::Value, 732 + 733 + pub embed_uri: Option<String>, 734 + pub embed_title: Option<String>, 735 + pub embed_description: Option<String>, 736 + pub thumb_mime_type: Option<String>, 737 + pub thumb_cid: Option<String>, 738 + 739 + pub created_at: NaiveDateTime, 740 + pub indexed_at: NaiveDateTime, 741 + } 742 +
+18
parakeet-db/src/schema.rs
··· 303 303 } 304 304 305 305 diesel::table! { 306 + statuses (did) { 307 + did -> Text, 308 + status -> Text, 309 + duration -> Nullable<Int4>, 310 + record -> Jsonb, 311 + embed_uri -> Nullable<Text>, 312 + embed_title -> Nullable<Text>, 313 + embed_description -> Nullable<Text>, 314 + thumb_mime_type -> Nullable<Text>, 315 + thumb_cid -> Nullable<Text>, 316 + created_at -> Timestamptz, 317 + indexed_at -> Timestamp, 318 + } 319 + } 320 + 321 + diesel::table! { 306 322 threadgates (at_uri) { 307 323 at_uri -> Text, 308 324 cid -> Text, ··· 349 365 diesel::joinable!(profiles -> actors (did)); 350 366 diesel::joinable!(reposts -> actors (did)); 351 367 diesel::joinable!(starterpacks -> actors (owner)); 368 + diesel::joinable!(statuses -> actors (did)); 352 369 diesel::joinable!(threadgates -> posts (post_uri)); 353 370 diesel::joinable!(verification -> actors (verifier)); 354 371 ··· 378 395 records, 379 396 reposts, 380 397 starterpacks, 398 + statuses, 381 399 threadgates, 382 400 verification, 383 401 );
+84 -24
parakeet/src/hydration/profile.rs
··· 1 1 use crate::hydration::map_labels; 2 + use chrono::prelude::*; 2 3 use lexica::app_bsky::actor::*; 4 + use lexica::app_bsky::embed::External; 3 5 use parakeet_db::models; 4 6 use parakeet_index::ProfileStats; 5 7 use std::collections::HashMap; 8 + use std::str::FromStr; 6 9 use std::sync::OnceLock; 10 + use chrono::TimeDelta; 7 11 8 12 pub static TRUSTED_VERIFIERS: OnceLock<Vec<String>> = OnceLock::new(); 9 13 ··· 109 113 } 110 114 } 111 115 116 + fn build_status(status: models::Status) -> Option<StatusView> { 117 + let s = Status::from_str(&status.status).ok()?; 118 + let embed = status 119 + .embed_uri 120 + .zip(status.embed_title) 121 + .zip(status.embed_description) 122 + .map(|((uri, title), description)| StatusViewEmbed { 123 + external: External { 124 + uri, 125 + title, 126 + description, 127 + thumb: status 128 + .thumb_cid 129 + .map(|v| format!("https://localhost/embed/{v}")), 130 + }, 131 + }); 132 + 133 + let expires_at = status 134 + .duration 135 + .map(|v| TimeDelta::seconds(v as i64)) 136 + .map(|v| (status.created_at + v).and_utc()); 137 + let is_active = expires_at.map(|v| Utc::now() < v); 138 + 139 + Some(StatusView { 140 + status: s, 141 + record: status.record, 142 + embed, 143 + expires_at, 144 + is_active, 145 + }) 146 + } 147 + 112 148 fn build_basic( 113 149 handle: Option<String>, 114 150 profile: models::Profile, ··· 117 153 labels: Vec<models::Label>, 118 154 verifications: Option<Vec<models::VerificationEntry>>, 119 155 stats: Option<ProfileStats>, 156 + status: Option<models::Status>, 120 157 ) -> ProfileViewBasic { 121 158 let associated = build_associated(chat_decl, is_labeler, stats); 122 159 let verification = build_verification(&profile, &handle, verifications); 160 + let status = status.and_then(build_status); 123 161 124 162 ProfileViewBasic { 125 163 did: profile.did, ··· 131 169 associated, 132 170 labels: map_labels(labels), 133 171 verification, 172 + status, 134 173 created_at: profile.created_at.and_utc(), 135 174 } 136 175 } ··· 143 182 labels: Vec<models::Label>, 144 183 verifications: Option<Vec<models::VerificationEntry>>, 145 184 stats: Option<ProfileStats>, 185 + status: Option<models::Status>, 146 186 ) -> ProfileView { 147 187 let associated = build_associated(chat_decl, is_labeler, stats); 148 188 let verification = build_verification(&profile, &handle, verifications); 189 + let status = status.and_then(build_status); 149 190 150 191 ProfileView { 151 192 did: profile.did, ··· 158 199 associated, 159 200 labels: map_labels(labels), 160 201 verification, 202 + status, 161 203 created_at: profile.created_at.and_utc(), 162 204 indexed_at: profile.indexed_at, 163 205 } ··· 171 213 labels: Vec<models::Label>, 172 214 verifications: Option<Vec<models::VerificationEntry>>, 173 215 stats: Option<ProfileStats>, 216 + status: Option<models::Status>, 174 217 ) -> ProfileViewDetailed { 175 218 let associated = build_associated(chat_decl, is_labeler, stats); 176 219 let verification = build_verification(&profile, &handle, verifications); 220 + let status = status.and_then(build_status); 177 221 178 222 ProfileViewDetailed { 179 223 did: profile.did, ··· 191 235 associated, 192 236 labels: map_labels(labels), 193 237 verification, 238 + status, 194 239 created_at: profile.created_at.and_utc(), 195 240 indexed_at: profile.indexed_at, 196 241 } ··· 200 245 pub async fn hydrate_profile_basic(&self, did: String) -> Option<ProfileViewBasic> { 201 246 let labels = self.get_profile_label(&did).await; 202 247 let verif = self.loaders.verification.load(did.clone()).await; 203 - let (handle, profile, chat_decl, labeler, stats) = self.loaders.profile.load(did).await?; 248 + let (handle, profile, chat_decl, labeler, stats, status) = 249 + self.loaders.profile.load(did).await?; 204 250 205 251 Some(build_basic( 206 - handle, profile, chat_decl, labeler, labels, verif, stats, 252 + handle, profile, chat_decl, labeler, labels, verif, stats, status, 207 253 )) 208 254 } 209 255 ··· 217 263 218 264 profiles 219 265 .into_iter() 220 - .map(|(k, (handle, profile, chat_decl, labeler, stats))| { 221 - let labels = labels.get(&k).cloned().unwrap_or_default(); 222 - let verif = verif.get(&k).cloned(); 266 + .map( 267 + |(k, (handle, profile, chat_decl, labeler, stats, status))| { 268 + let labels = labels.get(&k).cloned().unwrap_or_default(); 269 + let verif = verif.get(&k).cloned(); 223 270 224 - let v = build_basic(handle, profile, chat_decl, labeler, labels, verif, stats); 225 - (k, v) 226 - }) 271 + let v = build_basic( 272 + handle, profile, chat_decl, labeler, labels, verif, stats, status, 273 + ); 274 + (k, v) 275 + }, 276 + ) 227 277 .collect() 228 278 } 229 279 ··· 231 281 let labels = self.get_profile_label(&did).await; 232 282 233 283 let verif = self.loaders.verification.load(did.clone()).await; 234 - let (handle, profile, chat_decl, labeler, stats) = self.loaders.profile.load(did).await?; 284 + let (handle, profile, chat_decl, labeler, stats, status) = 285 + self.loaders.profile.load(did).await?; 235 286 236 287 Some(build_profile( 237 - handle, profile, chat_decl, labeler, labels, verif, stats, 288 + handle, profile, chat_decl, labeler, labels, verif, stats, status, 238 289 )) 239 290 } 240 291 ··· 245 296 246 297 profiles 247 298 .into_iter() 248 - .map(|(k, (handle, profile, chat_decl, labeler, stats))| { 249 - let labels = labels.get(&k).cloned().unwrap_or_default(); 250 - let verif = verif.get(&k).cloned(); 299 + .map( 300 + |(k, (handle, profile, chat_decl, labeler, stats, status))| { 301 + let labels = labels.get(&k).cloned().unwrap_or_default(); 302 + let verif = verif.get(&k).cloned(); 251 303 252 - let v = build_profile(handle, profile, chat_decl, labeler, labels, verif, stats); 253 - (k, v) 254 - }) 304 + let v = build_profile( 305 + handle, profile, chat_decl, labeler, labels, verif, stats, status, 306 + ); 307 + (k, v) 308 + }, 309 + ) 255 310 .collect() 256 311 } 257 312 ··· 259 314 let labels = self.get_profile_label(&did).await; 260 315 261 316 let verif = self.loaders.verification.load(did.clone()).await; 262 - let (handle, profile, chat_decl, labeler, stats) = self.loaders.profile.load(did).await?; 317 + let (handle, profile, chat_decl, labeler, stats, status) = 318 + self.loaders.profile.load(did).await?; 263 319 264 320 Some(build_detailed( 265 - handle, profile, chat_decl, labeler, labels, verif, stats, 321 + handle, profile, chat_decl, labeler, labels, verif, stats, status, 266 322 )) 267 323 } 268 324 ··· 276 332 277 333 profiles 278 334 .into_iter() 279 - .map(|(k, (handle, profile, chat_decl, labeler, stats))| { 280 - let labels = labels.get(&k).cloned().unwrap_or_default(); 281 - let verif = verif.get(&k).cloned(); 335 + .map( 336 + |(k, (handle, profile, chat_decl, labeler, stats, status))| { 337 + let labels = labels.get(&k).cloned().unwrap_or_default(); 338 + let verif = verif.get(&k).cloned(); 282 339 283 - let v = build_detailed(handle, profile, chat_decl, labeler, labels, verif, stats); 284 - (k, v) 285 - }) 340 + let v = build_detailed( 341 + handle, profile, chat_decl, labeler, labels, verif, stats, status, 342 + ); 343 + (k, v) 344 + }, 345 + ) 286 346 .collect() 287 347 } 288 348 }
+6 -2
parakeet/src/loaders.rs
··· 73 73 Option<ChatAllowIncoming>, 74 74 bool, 75 75 Option<parakeet_index::ProfileStats>, 76 + Option<models::Status>, 76 77 ); 77 78 impl BatchFn<String, ProfileLoaderRet> for ProfileLoader { 78 79 async fn load(&mut self, keys: &[String]) -> HashMap<String, ProfileLoaderRet> { ··· 84 85 schema::chat_decls::table.on(schema::chat_decls::did.eq(schema::actors::did)), 85 86 ) 86 87 .left_join(schema::labelers::table.on(schema::labelers::did.eq(schema::actors::did))) 88 + .left_join(schema::statuses::table.on(schema::statuses::did.eq(schema::actors::did))) 87 89 .select(( 88 90 schema::actors::did, 89 91 schema::actors::handle, 90 92 models::Profile::as_select(), 91 93 schema::chat_decls::allow_incoming.nullable(), 92 94 schema::labelers::cid.nullable(), 95 + Option::<models::Status>::as_select(), 93 96 )) 94 97 .filter( 95 98 schema::actors::did ··· 102 105 models::Profile, 103 106 Option<String>, 104 107 Option<String>, 108 + Option<models::Status>, 105 109 )>(&mut conn) 106 110 .await; 107 111 ··· 118 122 119 123 match res { 120 124 Ok(res) => HashMap::from_iter(res.into_iter().map( 121 - |(did, handle, profile, chat_decl, labeler_cid)| { 125 + |(did, handle, profile, chat_decl, labeler_cid, status)| { 122 126 let chat_decl = chat_decl.and_then(|v| ChatAllowIncoming::from_str(&v).ok()); 123 127 let is_labeler = labeler_cid.is_some(); 124 128 let maybe_stats = stats.remove(&did); 125 129 126 - let val = (handle, profile, chat_decl, is_labeler, maybe_stats); 130 + let val = (handle, profile, chat_decl, is_labeler, maybe_stats, status); 127 131 128 132 (did, val) 129 133 },