Parakeet is a Rust-based Bluesky AppServer aiming to implement most of the functionality required to support the Bluesky client
appview atproto bluesky rust appserver

fix(parakeet): mute and list related clownery

+39 -39
+25 -30
parakeet/src/xrpc/app_bsky/graph/lists.rs
··· 21 21 } 22 22 23 23 #[derive(Debug, Serialize)] 24 - pub struct AppBskyGraphGetListsRes { 24 + pub struct GetListsRes { 25 25 #[serde(skip_serializing_if = "Option::is_none")] 26 26 cursor: Option<String>, 27 27 lists: Vec<ListView>, ··· 32 32 AtpAcceptLabelers(labelers): AtpAcceptLabelers, 33 33 maybe_auth: Option<AtpAuth>, 34 34 Query(query): Query<ActorWithCursorQuery>, 35 - ) -> XrpcResult<Json<AppBskyGraphGetListsRes>> { 35 + ) -> XrpcResult<Json<GetListsRes>> { 36 36 let mut conn = state.pool.get().await?; 37 37 let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_auth); 38 38 ··· 70 70 .filter_map(|(_, uri)| lists.remove(&uri)) 71 71 .collect(); 72 72 73 - Ok(Json(AppBskyGraphGetListsRes { cursor, lists })) 73 + Ok(Json(GetListsRes { cursor, lists })) 74 74 } 75 75 76 76 #[derive(Debug, Serialize)] ··· 138 138 })) 139 139 } 140 140 141 - #[derive(Debug, Serialize)] 142 - pub struct GetListMutesRes { 143 - #[serde(skip_serializing_if = "Option::is_none")] 144 - cursor: Option<String>, 145 - lists: Vec<ListView>, 146 - } 147 - 148 141 pub async fn get_list_mutes( 149 142 State(state): State<GlobalState>, 150 143 AtpAcceptLabelers(labelers): AtpAcceptLabelers, 151 144 auth: AtpAuth, 152 145 Query(query): Query<CursorQuery>, 153 - ) -> XrpcResult<Json<GetListMutesRes>> { 146 + ) -> XrpcResult<Json<GetListsRes>> { 154 147 let mut conn = state.pool.get().await?; 155 148 let did = auth.0.clone(); 156 149 let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, Some(auth)); ··· 158 151 let limit = query.limit.unwrap_or(50).clamp(1, 100); 159 152 160 153 let mut mutes_query = schema::list_mutes::table 161 - .select(schema::list_mutes::list_uri) 154 + .select((schema::list_mutes::created_at, schema::list_mutes::list_uri)) 162 155 .filter(schema::list_mutes::did.eq(did)) 163 156 .into_boxed(); 164 157 165 - if let Some(cursor) = query.cursor { 166 - mutes_query = mutes_query.filter(schema::list_mutes::list_uri.lt(cursor)); 158 + if let Some(cursor) = datetime_cursor(query.cursor.as_ref()) { 159 + mutes_query = mutes_query.filter(schema::list_mutes::created_at.lt(cursor)); 167 160 } 168 161 169 - let mutes = mutes_query 170 - .order(schema::list_mutes::list_uri.desc()) 162 + let results = mutes_query 163 + .order(schema::list_mutes::created_at.desc()) 171 164 .limit(limit as i64) 172 - .load(&mut conn) 165 + .load::<(chrono::DateTime<chrono::Utc>, String)>(&mut conn) 173 166 .await?; 174 167 175 - let lists = hyd.hydrate_lists(mutes).await; 176 - let mutes = lists.into_values().collect::<Vec<_>>(); 177 - let cursor = mutes.last().map(|v| v.uri.clone()); 168 + let cursor = results 169 + .last() 170 + .map(|(last, _)| last.timestamp_millis().to_string()); 178 171 179 - Ok(Json(GetListMutesRes { 180 - cursor, 181 - lists: mutes, 182 - })) 172 + let uris = results.iter().map(|(_, uri)| uri.clone()).collect(); 173 + 174 + let lists = hyd.hydrate_lists(uris).await; 175 + let lists = lists.into_values().collect::<Vec<_>>(); 176 + 177 + Ok(Json(GetListsRes { cursor, lists })) 183 178 } 184 179 185 180 pub async fn get_list_blocks( ··· 187 182 AtpAcceptLabelers(labelers): AtpAcceptLabelers, 188 183 auth: AtpAuth, 189 184 Query(query): Query<CursorQuery>, 190 - ) -> XrpcResult<Json<GetListMutesRes>> { 185 + ) -> XrpcResult<Json<GetListsRes>> { 191 186 let mut conn = state.pool.get().await?; 192 187 let did = auth.0.clone(); 193 188 let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, Some(auth)); ··· 195 190 let limit = query.limit.unwrap_or(50).clamp(1, 100); 196 191 197 192 let mut blocks_query = schema::list_blocks::table 198 - .select((schema::list_blocks::created_at, schema::list_blocks::list_uri)) 193 + .select(( 194 + schema::list_blocks::created_at, 195 + schema::list_blocks::list_uri, 196 + )) 199 197 .filter(schema::list_blocks::did.eq(did)) 200 198 .into_boxed(); 201 199 ··· 218 216 let lists = hyd.hydrate_lists(uris).await; 219 217 let lists = lists.into_values().collect::<Vec<_>>(); 220 218 221 - Ok(Json(GetListMutesRes { 222 - cursor, 223 - lists, 224 - })) 219 + Ok(Json(GetListsRes { cursor, lists })) 225 220 }
+14 -9
parakeet/src/xrpc/app_bsky/graph/mutes.rs
··· 1 1 use crate::hydration::StatefulHydrator; 2 2 use crate::xrpc::error::XrpcResult; 3 3 use crate::xrpc::extract::{AtpAcceptLabelers, AtpAuth}; 4 - use crate::xrpc::CursorQuery; 4 + use crate::xrpc::{datetime_cursor, CursorQuery}; 5 5 use crate::GlobalState; 6 6 use axum::extract::{Query, State}; 7 7 use axum::Json; ··· 31 31 let limit = query.limit.unwrap_or(50).clamp(1, 100); 32 32 33 33 let mut muted_query = schema::mutes::table 34 - .select(schema::mutes::subject) 34 + .select((schema::mutes::created_at, schema::mutes::subject)) 35 35 .filter(schema::mutes::did.eq(did)) 36 36 .into_boxed(); 37 37 38 - if let Some(cursor) = query.cursor { 39 - muted_query = muted_query.filter(schema::mutes::subject.lt(cursor)); 38 + if let Some(cursor) = datetime_cursor(query.cursor.as_ref()) { 39 + muted_query = muted_query.filter(schema::mutes::created_at.lt(cursor)); 40 40 } 41 41 42 - let muted = muted_query 43 - .order(schema::mutes::subject.desc()) 42 + let results = muted_query 43 + .order(schema::mutes::created_at.desc()) 44 44 .limit(limit as i64) 45 - .load(&mut conn) 45 + .load::<(chrono::DateTime<chrono::Utc>, String)>(&mut conn) 46 46 .await?; 47 47 48 - let profiles = hyd.hydrate_profiles(muted).await; 48 + let cursor = results 49 + .last() 50 + .map(|(last, _)| last.timestamp_millis().to_string()); 51 + 52 + let dids = results.iter().map(|(_, did)| did.clone()).collect(); 53 + 54 + let profiles = hyd.hydrate_profiles(dids).await; 49 55 let mutes = profiles.into_values().collect::<Vec<_>>(); 50 - let cursor = mutes.last().map(|v| v.did.clone()); 51 56 52 57 Ok(Json(GetMutesRes { cursor, mutes })) 53 58 }