A decentralized music tracking and discovery platform built on AT Protocol 🎵

feat: implement feed generator functionality with new record type and save logic

+82 -4
+69 -4
crates/jetstream/src/repo.rs
··· 8 8 9 9 use crate::{ 10 10 profile::did_to_profile, 11 - subscriber::{ALBUM_NSID, ARTIST_NSID, SCROBBLE_NSID, SONG_NSID}, 12 - types::{AlbumRecord, ArtistRecord, Commit, ScrobbleRecord, SongRecord}, 11 + subscriber::{ALBUM_NSID, ARTIST_NSID, FEED_GENERATOR_NSID, SCROBBLE_NSID, SONG_NSID}, 12 + types::{AlbumRecord, ArtistRecord, Commit, FeedGeneratorRecord, ScrobbleRecord, SongRecord}, 13 13 webhook::discord::{ 14 14 self, 15 15 model::{ScrobbleData, WebhookEnvelope}, ··· 29 29 commit: Commit, 30 30 ) -> Result<(), Error> { 31 31 // skip unknown collection 32 - if !vec![SCROBBLE_NSID, ARTIST_NSID, ALBUM_NSID, SONG_NSID] 33 - .contains(&commit.collection.as_str()) 32 + if !vec![ 33 + SCROBBLE_NSID, 34 + ARTIST_NSID, 35 + ALBUM_NSID, 36 + SONG_NSID, 37 + FEED_GENERATOR_NSID, 38 + ] 39 + .contains(&commit.collection.as_str()) 34 40 { 35 41 return Ok(()); 36 42 } ··· 169 175 let song_record: SongRecord = serde_json::from_value(commit.record.clone())?; 170 176 save_user_track(&mut tx, &user_id, song_record.clone(), &uri).await?; 171 177 update_track_uri(&mut tx, &user_id, song_record, &uri).await?; 178 + 179 + tx.commit().await?; 180 + } 181 + 182 + if commit.collection == FEED_GENERATOR_NSID { 183 + let mut tx = pool.begin().await?; 184 + 185 + let user_id = save_user(&mut tx, did).await?; 186 + let uri = format!("at://{}/app.rocksky.feed.generator/{}", did, commit.rkey); 187 + 188 + let feed_generator_record: FeedGeneratorRecord = 189 + serde_json::from_value(commit.record)?; 190 + save_feed_generator(&mut tx, &user_id, feed_generator_record, &uri).await?; 172 191 173 192 tx.commit().await?; 174 193 } ··· 1011 1030 1012 1031 Ok(()) 1013 1032 } 1033 + 1034 + pub async fn save_feed_generator( 1035 + tx: &mut sqlx::Transaction<'_, Postgres>, 1036 + user_id: &str, 1037 + record: FeedGeneratorRecord, 1038 + uri: &str, 1039 + ) -> Result<(), Error> { 1040 + let did = uri 1041 + .split('/') 1042 + .nth(2) 1043 + .ok_or_else(|| anyhow::anyhow!("Invalid URI: {}", uri))?; 1044 + let avatar = record.avatar.map(|blob| { 1045 + format!( 1046 + "https://cdn.bsky.app/img/avatar/plain/{}/{}@{}", 1047 + did, 1048 + blob.r#ref.link, 1049 + blob.mime_type.split('/').last().unwrap_or("jpeg") 1050 + ) 1051 + }); 1052 + 1053 + tracing::info!(user_id = %user_id, display_name = %record.display_name, uri = %uri, "Saving feed generator"); 1054 + 1055 + sqlx::query( 1056 + r#" 1057 + INSERT INTO feeds ( 1058 + user_id, 1059 + uri, 1060 + display_name, 1061 + description, 1062 + did, 1063 + avatar 1064 + ) VALUES ( 1065 + $1, $2, $3, $4, $5, $6 1066 + ) 1067 + "#, 1068 + ) 1069 + .bind(user_id) 1070 + .bind(uri) 1071 + .bind(record.display_name) 1072 + .bind(record.description) 1073 + .bind(record.did) 1074 + .bind(avatar) 1075 + .execute(&mut **tx) 1076 + .await?; 1077 + Ok(()) 1078 + }
+1
crates/jetstream/src/subscriber.rs
··· 16 16 pub const PLAYLIST_NSID: &str = "app.rocksky.playlist"; 17 17 pub const LIKE_NSID: &str = "app.rocksky.like"; 18 18 pub const SHOUT_NSID: &str = "app.rocksky.shout"; 19 + pub const FEED_GENERATOR_NSID: &str = "app.rocksky.feed.generator"; 19 20 20 21 pub struct ScrobbleSubscriber { 21 22 pub service_url: String,
+12
crates/jetstream/src/types.rs
··· 232 232 #[serde(skip_serializing_if = "Option::is_none")] 233 233 pub mbid: Option<String>, 234 234 } 235 + 236 + #[derive(Debug, Deserialize, Clone)] 237 + #[serde(rename_all = "camelCase")] 238 + pub struct FeedGeneratorRecord { 239 + pub display_name: String, 240 + #[serde(skip_serializing_if = "Option::is_none")] 241 + pub description: Option<String>, 242 + #[serde(skip_serializing_if = "Option::is_none")] 243 + pub avatar: Option<ImageBlob>, 244 + pub did: String, 245 + pub created_at: String, 246 + }