A decentralized music tracking and discovery platform built on AT Protocol 🎵 rocksky.app
spotify atproto lastfm musicbrainz scrobbling listenbrainz

Handle scrobble deletions across services

Add delete handling for scrobbles: in jetstream handle "delete"
commits for SCROBBLE_NSID by deleting the DB record and publishing a
"rocksky.delete.scrobble" NATS event; add a delete_scrobble helper.
In analytics add on_delete_scrobble to subscribe to that subject and
remove scrobbles from the local DB. Remove an unused NotNull import.

+64 -1
+1 -1
apps/api/src/xrpc/app/rocksky/actor/getProfile.ts
··· 3 3 import type { OutputSchema } from "@atproto/api/dist/client/types/com/atproto/repo/getRecord"; 4 4 import type { HandlerAuth } from "@atproto/xrpc-server"; 5 5 import type { Context } from "context"; 6 - import { eq, NotNull } from "drizzle-orm"; 6 + import { eq } from "drizzle-orm"; 7 7 import { Effect, pipe } from "effect"; 8 8 import type { Server } from "lexicon"; 9 9 import type { ProfileViewDetailed } from "lexicon/types/app/rocksky/actor/defs";
+38
crates/analytics/src/subscriber/mod.rs
··· 24 24 on_like(nc.clone(), conn.clone()); 25 25 on_unlike(nc.clone(), conn.clone()); 26 26 on_new_user(nc.clone(), conn.clone()); 27 + on_delete_scrobble(nc, conn.clone()); 27 28 28 29 Ok(()) 29 30 } ··· 190 191 tracing::error!("Error parsing payload: {}", e); 191 192 tracing::debug!("{}", data); 192 193 } 194 + } 195 + } 196 + 197 + Ok::<(), Error>(()) 198 + })?; 199 + 200 + Ok::<(), Error>(()) 201 + }); 202 + } 203 + 204 + pub fn on_delete_scrobble(nc: Arc<Mutex<Client>>, conn: Arc<Mutex<Connection>>) { 205 + thread::spawn(move || { 206 + let rt = tokio::runtime::Runtime::new().unwrap(); 207 + let conn = conn.clone(); 208 + let nc = nc.clone(); 209 + rt.block_on(async { 210 + let nc = nc.lock().unwrap(); 211 + let mut sub = nc.subscribe("rocksky.delete.scrobble".to_string()).await?; 212 + drop(nc); 213 + 214 + while let Some(msg) = sub.next().await { 215 + let uri = String::from_utf8(msg.payload.to_vec()).unwrap(); 216 + match delete_scrobble(conn.clone(), &uri).await { 217 + Ok(_) => tracing::info!(uri = %uri.cyan(), "Scrobble deleted successfully"), 218 + Err(e) => tracing::error!("Error deleting scrobble: {}", e), 193 219 } 194 220 } 195 221 ··· 776 802 tracing::error!("[users] error: {}", e); 777 803 return Err(e.into()); 778 804 } 805 + } 806 + } 807 + Ok(()) 808 + } 809 + 810 + pub async fn delete_scrobble(conn: Arc<Mutex<Connection>>, uri: &str) -> Result<(), Error> { 811 + let conn = conn.lock().unwrap(); 812 + match conn.execute("DELETE FROM scrobbles WHERE uri = ?", params![uri]) { 813 + Ok(_) => (), 814 + Err(e) => { 815 + tracing::error!("[scrobbles] error: {}", e); 816 + return Err(e.into()); 779 817 } 780 818 } 781 819 Ok(())
+25
crates/jetstream/src/repo.rs
··· 233 233 publish_user(&nc, &pool, &subject_user_id).await?; 234 234 } 235 235 } 236 + "delete" => { 237 + if commit.collection == SCROBBLE_NSID { 238 + let uri = format!("at://{}/app.rocksky.scrobble/{}", did, commit.rkey); 239 + match delete_scrobble(&pool, &uri).await { 240 + Ok(_) => { 241 + nc.publish("rocksky.delete.scrobble", uri.into()).await?; 242 + nc.flush().await?; 243 + tracing::info!(operation = %commit.operation, collection = %commit.collection, "Scrobble deleted"); 244 + } 245 + Err(e) => { 246 + tracing::error!(error = %e, operation = %commit.operation, collection = %commit.collection, "Failed to delete scrobble"); 247 + } 248 + } 249 + } else { 250 + tracing::warn!(operation = %commit.operation, collection = %commit.collection, "Delete operation not implemented for this collection"); 251 + } 252 + } 236 253 _ => { 237 254 tracing::warn!(operation = %commit.operation, "Unsupported operation"); 238 255 } ··· 1180 1197 .await?; 1181 1198 Ok(()) 1182 1199 } 1200 + 1201 + pub async fn delete_scrobble(pool: &Pool<Postgres>, uri: &str) -> Result<(), Error> { 1202 + sqlx::query("DELETE FROM scrobbles WHERE uri = $1") 1203 + .bind(uri) 1204 + .execute(pool) 1205 + .await?; 1206 + Ok(()) 1207 + }