Parakeet is a Rust-based Bluesky AppServer aiming to implement most of the functionality required to support the Bluesky client
appview atproto bluesky rust appserver

feat: bookmarks

+395
+32
consumer/src/db/record.rs
··· 4 4 use chrono::prelude::*; 5 5 use deadpool_postgres::GenericClient; 6 6 use ipld_core::cid::Cid; 7 + use lexica::community_lexicon::bookmarks::Bookmark; 7 8 8 9 pub async fn record_upsert<C: GenericClient>( 9 10 conn: &mut C, ··· 20 21 pub async fn record_delete<C: GenericClient>(conn: &mut C, at_uri: &str) -> PgExecResult { 21 22 conn.execute("DELETE FROM records WHERE at_uri=$1", &[&at_uri]) 22 23 .await 24 + } 25 + 26 + pub async fn bookmark_upsert<C: GenericClient>( 27 + conn: &mut C, 28 + rkey: &str, 29 + repo: &str, 30 + rec: Bookmark, 31 + ) -> PgExecResult { 32 + // strip "at://" then break into parts by '/' 33 + let rec_type = match rec.subject.strip_prefix("at://") { 34 + Some(at_uri) => at_uri.split('/').collect::<Vec<_>>()[1], 35 + None => "$uri", 36 + }; 37 + 38 + conn.execute( 39 + include_str!("sql/bookmarks_upsert.sql"), 40 + &[&repo, &rkey, &rec.subject, &rec_type, &rec.tags, &rec.created_at], 41 + ) 42 + .await 43 + } 44 + 45 + pub async fn bookmark_delete<C: GenericClient>( 46 + conn: &mut C, 47 + rkey: &str, 48 + repo: &str, 49 + ) -> PgExecResult { 50 + conn.execute( 51 + "DELETE FROM bookmarks WHERE rkey=$1 AND did=$2", 52 + &[&rkey, &repo], 53 + ) 54 + .await 23 55 } 24 56 25 57 pub async fn block_insert<C: GenericClient>(
+5
consumer/src/db/sql/bookmarks_upsert.sql
··· 1 + INSERT INTO bookmarks (did, rkey, subject, subject_type, tags, created_at) 2 + VALUES ($1, $2, $3, $4, $5, $6) 3 + ON CONFLICT (did, rkey) DO UPDATE SET subject=EXCLUDED.subject, 4 + subject_type=EXCLUDED.subject_type, 5 + tags=EXCLUDED.tags
+6
consumer/src/indexer/mod.rs
··· 723 723 redis::AsyncTypedCommands::del(rc, format!("profile#{repo}")).await?; 724 724 } 725 725 } 726 + RecordTypes::CommunityLexiconBookmark(record) => { 727 + db::bookmark_upsert(conn, rkey, repo, record).await?; 728 + } 726 729 } 727 730 728 731 db::record_upsert(conn, at_uri, repo, cid).await?; ··· 832 835 CollectionType::ChatActorDecl => { 833 836 redis::AsyncTypedCommands::del(rc, format!("profile#{repo}")).await?; 834 837 db::chat_decl_delete(conn, repo).await? 838 + } 839 + CollectionType::CommunityLexiconBookmark => { 840 + db::bookmark_delete(conn, rkey, repo).await? 835 841 } 836 842 _ => unreachable!(), 837 843 };
+5
consumer/src/indexer/types.rs
··· 41 41 AppBskyNotificationDeclaration(records::AppBskyNotificationDeclaration), 42 42 #[serde(rename = "chat.bsky.actor.declaration")] 43 43 ChatBskyActorDeclaration(records::ChatBskyActorDeclaration), 44 + #[serde(rename = "community.lexicon.bookmarks.bookmark")] 45 + CommunityLexiconBookmark(lexica::community_lexicon::bookmarks::Bookmark) 44 46 } 45 47 46 48 #[derive(Debug, PartialOrd, PartialEq, Deserialize, Serialize)] ··· 63 65 BskyLabelerService, 64 66 BskyNotificationDeclaration, 65 67 ChatActorDecl, 68 + CommunityLexiconBookmark, 66 69 Unsupported, 67 70 } 68 71 ··· 87 90 "app.bsky.labeler.service" => CollectionType::BskyLabelerService, 88 91 "app.bsky.notification.declaration" => CollectionType::BskyNotificationDeclaration, 89 92 "chat.bsky.actor.declaration" => CollectionType::ChatActorDecl, 93 + "community.lexicon.bookmarks.bookmark" => CollectionType::CommunityLexiconBookmark, 90 94 _ => CollectionType::Unsupported, 91 95 } 92 96 } ··· 111 115 CollectionType::BskyVerification => false, 112 116 CollectionType::BskyLabelerService => true, 113 117 CollectionType::BskyNotificationDeclaration => true, 118 + CollectionType::CommunityLexiconBookmark => true, 114 119 CollectionType::Unsupported => false, 115 120 } 116 121 }
+32
lexica/src/app_bsky/bookmark.rs
··· 1 + use crate::app_bsky::feed::{BlockedAuthor, PostView}; 2 + use crate::StrongRef; 3 + use chrono::prelude::*; 4 + use serde::Serialize; 5 + 6 + #[derive(Clone, Debug, Serialize)] 7 + #[serde(rename_all = "camelCase")] 8 + pub struct BookmarkView { 9 + pub subject: StrongRef, 10 + pub item: BookmarkViewItem, 11 + pub created_at: DateTime<Utc>, 12 + } 13 + 14 + #[derive(Clone, Debug, Serialize)] 15 + #[serde(tag = "$type")] 16 + // This is technically the same as ReplyRefPost atm, but just in case... 17 + pub enum BookmarkViewItem { 18 + #[serde(rename = "app.bsky.feed.defs#postView")] 19 + Post(PostView), 20 + #[serde(rename = "app.bsky.feed.defs#notFoundPost")] 21 + NotFound { 22 + uri: String, 23 + #[serde(rename = "notFound")] 24 + not_found: bool, 25 + }, 26 + #[serde(rename = "app.bsky.feed.defs#blockedPost")] 27 + Blocked { 28 + uri: String, 29 + blocked: bool, 30 + author: BlockedAuthor, 31 + }, 32 + }
+1
lexica/src/app_bsky/mod.rs
··· 1 1 use serde::Serialize; 2 2 3 3 pub mod actor; 4 + pub mod bookmark; 4 5 pub mod embed; 5 6 pub mod feed; 6 7 pub mod graph;
+14
lexica/src/community_lexicon/bookmarks.rs
··· 1 + use chrono::prelude::*; 2 + use serde::{Deserialize, Serialize}; 3 + 4 + #[derive(Clone, Debug, Deserialize, Serialize)] 5 + #[serde(tag = "$type")] 6 + #[serde(rename = "community.lexicon.bookmarks.bookmark")] 7 + #[serde(rename_all = "camelCase")] 8 + pub struct Bookmark { 9 + pub subject: String, 10 + #[serde(default)] 11 + #[serde(skip_serializing_if = "Vec::is_empty")] 12 + pub tags: Vec<String>, 13 + pub created_at: DateTime<Utc>, 14 + }
+1
lexica/src/community_lexicon/mod.rs
··· 1 + pub mod bookmarks;
+8
lexica/src/lib.rs
··· 5 5 6 6 pub mod app_bsky; 7 7 pub mod com_atproto; 8 + pub mod community_lexicon; 8 9 mod utils; 9 10 10 11 #[derive(Clone, Debug, Serialize)] ··· 21 22 )] 22 23 pub cid: Cid, 23 24 pub uri: String, 25 + } 26 + 27 + impl StrongRef { 28 + pub fn new_from_str(uri: String, cid: &str) -> Result<Self, cid::Error> { 29 + let cid = cid.parse()?; 30 + Ok(StrongRef { uri, cid }) 31 + } 24 32 } 25 33 26 34 #[derive(Clone, Debug, Deserialize, Serialize)]
+1
migrations/2025-09-02-190833_bookmarks/down.sql
··· 1 + drop table bookmarks;
+19
migrations/2025-09-02-190833_bookmarks/up.sql
··· 1 + create table bookmarks 2 + ( 3 + did text not null references actors (did), 4 + rkey text, 5 + subject text not null, 6 + subject_cid text, 7 + subject_type text not null, 8 + tags text[] not null default ARRAY []::text[], 9 + 10 + created_at timestamptz not null default now(), 11 + 12 + primary key (did, subject) 13 + ); 14 + 15 + create index bookmarks_rkey_index on bookmarks (rkey); 16 + create index bookmarks_subject_index on bookmarks (subject); 17 + create index bookmarks_subject_type_index on bookmarks (subject_type); 18 + create index bookmarks_tags_index on bookmarks using gin (tags); 19 + create unique index bookmarks_rkey_ui on bookmarks (did, rkey);
+26
parakeet-db/src/models.rs
··· 383 383 pub did: &'a str, 384 384 pub list_uri: &'a str, 385 385 } 386 + 387 + #[derive(Clone, Debug, Serialize, Deserialize, Queryable, Selectable, Identifiable)] 388 + #[diesel(table_name = crate::schema::bookmarks)] 389 + #[diesel(primary_key(did, subject, subject_cid))] 390 + #[diesel(check_for_backend(diesel::pg::Pg))] 391 + pub struct Bookmark { 392 + pub did: String, 393 + pub rkey: Option<String>, 394 + pub subject: String, 395 + pub subject_cid: Option<String>, 396 + pub subject_type: String, 397 + pub tags: Vec<Option<String>>, 398 + pub created_at: DateTime<Utc>, 399 + } 400 + 401 + #[derive(Debug, Insertable, AsChangeset)] 402 + #[diesel(table_name = crate::schema::bookmarks)] 403 + #[diesel(check_for_backend(diesel::pg::Pg))] 404 + pub struct NewBookmark<'a> { 405 + pub did: &'a str, 406 + pub rkey: Option<String>, 407 + pub subject: &'a str, 408 + pub subject_cid: Option<String>, 409 + pub subject_type: &'a str, 410 + pub tags: Vec<String>, 411 + }
+14
parakeet-db/src/schema.rs
··· 43 43 } 44 44 45 45 diesel::table! { 46 + bookmarks (did, subject) { 47 + did -> Text, 48 + rkey -> Nullable<Text>, 49 + subject -> Text, 50 + subject_cid -> Nullable<Text>, 51 + subject_type -> Text, 52 + tags -> Array<Nullable<Text>>, 53 + created_at -> Timestamptz, 54 + } 55 + } 56 + 57 + diesel::table! { 46 58 chat_decls (did) { 47 59 did -> Text, 48 60 allow_incoming -> Text, ··· 375 387 376 388 diesel::joinable!(backfill -> actors (repo)); 377 389 diesel::joinable!(blocks -> actors (did)); 390 + diesel::joinable!(bookmarks -> actors (did)); 378 391 diesel::joinable!(chat_decls -> actors (did)); 379 392 diesel::joinable!(feedgens -> actors (owner)); 380 393 diesel::joinable!(follows -> actors (did)); ··· 405 418 backfill, 406 419 backfill_jobs, 407 420 blocks, 421 + bookmarks, 408 422 chat_decls, 409 423 feedgens, 410 424 follows,
+146
parakeet/src/xrpc/app_bsky/bookmark.rs
··· 1 + use crate::hydration::StatefulHydrator; 2 + use crate::xrpc::error::XrpcResult; 3 + use crate::xrpc::extract::{AtpAcceptLabelers, AtpAuth}; 4 + use crate::xrpc::{datetime_cursor, CursorQuery}; 5 + use crate::GlobalState; 6 + use axum::extract::{Query, State}; 7 + use axum::Json; 8 + use diesel::prelude::*; 9 + use diesel_async::RunQueryDsl; 10 + use lexica::app_bsky::bookmark::{BookmarkView, BookmarkViewItem}; 11 + use parakeet_db::{models, schema}; 12 + use serde::{Deserialize, Serialize}; 13 + use lexica::StrongRef; 14 + 15 + const BSKY_ALLOWED_TYPES: &[&str] = &["app.bsky.feed.post"]; 16 + 17 + #[derive(Debug, Deserialize)] 18 + pub struct CreateBookmarkReq { 19 + pub uri: String, 20 + pub cid: String, 21 + } 22 + 23 + pub async fn create_bookmark( 24 + State(state): State<GlobalState>, 25 + auth: AtpAuth, 26 + Json(form): Json<CreateBookmarkReq>, 27 + ) -> XrpcResult<()> { 28 + let mut conn = state.pool.get().await?; 29 + 30 + // strip "at://" then break into parts by '/' 31 + let parts = form.uri[5..].split('/').collect::<Vec<_>>(); 32 + 33 + let data = models::NewBookmark { 34 + did: &auth.0, 35 + rkey: None, 36 + subject: &form.uri, 37 + subject_cid: Some(form.cid), 38 + subject_type: &parts[1], 39 + tags: vec![], 40 + }; 41 + 42 + diesel::insert_into(schema::bookmarks::table) 43 + .values(&data) 44 + .on_conflict_do_nothing() 45 + .execute(&mut conn) 46 + .await?; 47 + 48 + Ok(()) 49 + } 50 + 51 + #[derive(Debug, Deserialize)] 52 + pub struct DeleteBookmarkReq { 53 + pub uri: String, 54 + } 55 + 56 + pub async fn delete_bookmark( 57 + State(state): State<GlobalState>, 58 + auth: AtpAuth, 59 + Json(form): Json<DeleteBookmarkReq>, 60 + ) -> XrpcResult<()> { 61 + let mut conn = state.pool.get().await?; 62 + 63 + diesel::delete(schema::bookmarks::table) 64 + .filter( 65 + schema::bookmarks::did 66 + .eq(&auth.0) 67 + .and(schema::bookmarks::subject.eq(&form.uri)), 68 + ) 69 + .execute(&mut conn) 70 + .await?; 71 + 72 + Ok(()) 73 + } 74 + 75 + #[derive(Debug, Serialize)] 76 + pub struct GetBookmarksRes { 77 + #[serde(skip_serializing_if = "Option::is_none")] 78 + cursor: Option<String>, 79 + bookmarks: Vec<BookmarkView>, 80 + } 81 + 82 + pub async fn get_bookmarks( 83 + State(state): State<GlobalState>, 84 + AtpAcceptLabelers(labelers): AtpAcceptLabelers, 85 + auth: AtpAuth, 86 + Query(query): Query<CursorQuery>, 87 + ) -> XrpcResult<Json<GetBookmarksRes>> { 88 + let mut conn = state.pool.get().await?; 89 + let did = auth.0.clone(); 90 + let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, Some(auth)); 91 + 92 + let limit = query.limit.unwrap_or(50).clamp(1, 100); 93 + 94 + let mut bookmarks_query = schema::bookmarks::table 95 + .select(models::Bookmark::as_select()) 96 + .filter(schema::bookmarks::did.eq(&did)) 97 + .filter(schema::bookmarks::subject_type.eq_any(BSKY_ALLOWED_TYPES)) 98 + .into_boxed(); 99 + 100 + if let Some(cursor) = datetime_cursor(query.cursor.as_ref()) { 101 + bookmarks_query = bookmarks_query.filter(schema::bookmarks::created_at.lt(cursor)); 102 + } 103 + 104 + let results = bookmarks_query 105 + .order(schema::bookmarks::created_at.desc()) 106 + .limit(limit as i64) 107 + .load(&mut conn) 108 + .await?; 109 + 110 + let cursor = results 111 + .last() 112 + .map(|bm| bm.created_at.timestamp_millis().to_string()); 113 + 114 + let uris = results.iter().map(|bm| bm.subject.clone()).collect(); 115 + 116 + let mut posts = hyd.hydrate_posts(uris).await; 117 + 118 + let bookmarks = results 119 + .into_iter() 120 + .filter_map(|bookmark| { 121 + let maybe_item = posts.remove(&bookmark.subject); 122 + let maybe_cid = maybe_item.as_ref().map(|v| v.cid.clone()); 123 + 124 + // ensure that either the cid is set in the bookmark record *or* in the post record 125 + // otherwise just ditch. we should have one. 126 + let cid = bookmark.subject_cid.or(maybe_cid)?; 127 + 128 + let item = maybe_item.map(BookmarkViewItem::Post).unwrap_or( 129 + BookmarkViewItem::NotFound { 130 + uri: bookmark.subject.clone(), 131 + not_found: true, 132 + }, 133 + ); 134 + 135 + let subject = StrongRef::new_from_str(bookmark.subject, &cid).ok()?; 136 + 137 + Some(BookmarkView { 138 + subject, 139 + item, 140 + created_at: bookmark.created_at, 141 + }) 142 + }) 143 + .collect(); 144 + 145 + Ok(Json(GetBookmarksRes { cursor, bookmarks })) 146 + }
+4
parakeet/src/xrpc/app_bsky/mod.rs
··· 2 2 use axum::Router; 3 3 4 4 mod actor; 5 + mod bookmark; 5 6 mod feed; 6 7 mod graph; 7 8 mod labeler; ··· 14 15 // TODO: app.bsky.actor.getSuggestions (recs) 15 16 // TODO: app.bsky.actor.searchActor (search) 16 17 // TODO: app.bsky.actor.searchActorTypeahead (search) 18 + .route("/app.bsky.bookmark.createBookmark", post(bookmark::create_bookmark)) 19 + .route("/app.bsky.bookmark.deleteBookmark", post(bookmark::delete_bookmark)) 20 + .route("/app.bsky.bookmark.getBookmarks", get(bookmark::get_bookmarks)) 17 21 .route("/app.bsky.feed.getActorFeeds", get(feed::feedgen::get_actor_feeds)) 18 22 .route("/app.bsky.feed.getActorLikes", get(feed::likes::get_actor_likes)) 19 23 .route("/app.bsky.feed.getAuthorFeed", get(feed::posts::get_author_feed))
+69
parakeet/src/xrpc/community_lexicon/bookmarks.rs
··· 1 + use crate::xrpc::datetime_cursor; 2 + use crate::xrpc::error::XrpcResult; 3 + use crate::xrpc::extract::AtpAuth; 4 + use crate::GlobalState; 5 + use axum::extract::{Query, State}; 6 + use axum::Json; 7 + use diesel::prelude::*; 8 + use diesel_async::RunQueryDsl; 9 + use lexica::community_lexicon::bookmarks::Bookmark; 10 + use parakeet_db::{models, schema}; 11 + use serde::{Deserialize, Serialize}; 12 + 13 + #[derive(Debug, Deserialize)] 14 + pub struct BookmarkCursorQuery { 15 + pub tags: Option<Vec<String>>, 16 + pub limit: Option<u8>, 17 + pub cursor: Option<String>, 18 + } 19 + 20 + #[derive(Debug, Serialize)] 21 + pub struct GetActorBookmarksRes { 22 + #[serde(skip_serializing_if = "Option::is_none")] 23 + cursor: Option<String>, 24 + bookmarks: Vec<Bookmark>, 25 + } 26 + 27 + pub async fn get_actor_bookmarks( 28 + State(state): State<GlobalState>, 29 + auth: AtpAuth, 30 + Query(query): Query<BookmarkCursorQuery>, 31 + ) -> XrpcResult<Json<GetActorBookmarksRes>> { 32 + let mut conn = state.pool.get().await?; 33 + 34 + let limit = query.limit.unwrap_or(50).clamp(1, 100); 35 + 36 + let mut bookmarks_query = schema::bookmarks::table 37 + .select(models::Bookmark::as_select()) 38 + .filter(schema::bookmarks::did.eq(&auth.0)) 39 + .into_boxed(); 40 + 41 + if let Some(cursor) = datetime_cursor(query.cursor.as_ref()) { 42 + bookmarks_query = bookmarks_query.filter(schema::bookmarks::created_at.lt(cursor)); 43 + } 44 + 45 + if let Some(tags) = query.tags { 46 + bookmarks_query = bookmarks_query.filter(schema::bookmarks::tags.contains(tags)); 47 + } 48 + 49 + let results = bookmarks_query 50 + .order(schema::bookmarks::created_at.desc()) 51 + .limit(limit as i64) 52 + .load(&mut conn) 53 + .await?; 54 + 55 + let cursor = results 56 + .last() 57 + .map(|bm| bm.created_at.timestamp_millis().to_string()); 58 + 59 + let bookmarks = results 60 + .into_iter() 61 + .map(|bookmark| Bookmark { 62 + subject: bookmark.subject, 63 + tags: bookmark.tags.into_iter().flatten().collect(), 64 + created_at: bookmark.created_at, 65 + }) 66 + .collect(); 67 + 68 + Ok(Json(GetActorBookmarksRes { cursor, bookmarks })) 69 + }
+10
parakeet/src/xrpc/community_lexicon/mod.rs
··· 1 + use axum::routing::get; 2 + use axum::Router; 3 + 4 + pub mod bookmarks; 5 + 6 + #[rustfmt::skip] 7 + pub fn routes() -> Router<crate::GlobalState> { 8 + Router::new() 9 + .route("/community.lexicon.bookmarks.getActorBookmarks", get(bookmarks::get_actor_bookmarks)) 10 + }
+2
parakeet/src/xrpc/mod.rs
··· 8 8 mod app_bsky; 9 9 pub mod cdn; 10 10 mod com_atproto; 11 + mod community_lexicon; 11 12 mod error; 12 13 pub mod extract; 13 14 pub mod jwt; ··· 16 17 Router::new() 17 18 .merge(app_bsky::routes()) 18 19 .merge(com_atproto::routes()) 20 + .merge(community_lexicon::routes()) 19 21 } 20 22 21 23 fn datetime_cursor(cursor: Option<&String>) -> Option<chrono::DateTime<chrono::Utc>> {