A decentralized music tracking and discovery platform built on AT Protocol 🎵

[api] simplify scrobble logic, listen to jetstream

+697 -327
+3 -2
crates/jetstream/src/main.rs
··· 1 1 use std::env; 2 2 3 - use subscriber::{ScrobbleSubscriber, ALBUM_NSID, ARTIST_NSID, SCROBBLE_NSID, SONG_NSID}; 3 + use subscriber::ScrobbleSubscriber; 4 4 use dotenv::dotenv; 5 5 6 6 pub mod subscriber; ··· 13 13 async fn main() -> Result<(), anyhow::Error> { 14 14 dotenv()?; 15 15 let jetstream_server = env::var("JETSTREAM_SERVER").unwrap_or_else(|_| "wss://jetstream2.us-east.bsky.network".to_string()); 16 - let url = format!("{}/subscribe?wantedCollections={},{},{},{}", jetstream_server, SCROBBLE_NSID, ARTIST_NSID, ALBUM_NSID, SONG_NSID); 16 + let url = format!("{}/subscribe?wantedCollections=app.rocksky.*", jetstream_server); 17 17 let subscriber = ScrobbleSubscriber::new(&url); 18 + 18 19 subscriber.run().await?; 19 20 Ok(()) 20 21 }
+16 -1
crates/jetstream/src/profile.rs
··· 1 1 use anyhow::Error; 2 - use tokio::io::split; 3 2 4 3 use crate::types::{Profile, ProfileResponse}; 5 4 ··· 49 48 assert_eq!(profile.r#type, "app.bsky.actor.profile"); 50 49 assert!(profile.display_name.map(|s| s.starts_with("Tsiry Sandratraina")).unwrap_or(false)); 51 50 assert!(profile.handle.map(|s| s == "tsiry-sandratraina.com").unwrap_or(false)); 51 + 52 + let did = "did:plc:fgvx5xqinqoqgpfhito5er3s"; 53 + let profile = did_to_profile(did).await?; 54 + 55 + assert_eq!(profile.r#type, "app.bsky.actor.profile"); 56 + assert!(profile.display_name.map(|s| s.starts_with("Lixtrix")).unwrap_or(false)); 57 + assert!(profile.handle.map(|s| s == "lixtrix.art").unwrap_or(false)); 58 + 59 + let did = "did:plc:d5jvs7uo4z6lw63zzreukgt4"; 60 + let profile = did_to_profile(did).await?; 61 + assert_eq!(profile.r#type, "app.bsky.actor.profile"); 62 + 63 + let did = "did:plc:gwxwdfmun3aqaiu5mx7nnyof"; 64 + let profile = did_to_profile(did).await?; 65 + assert_eq!(profile.r#type, "app.bsky.actor.profile"); 66 + 52 67 Ok(()) 53 68 } 54 69 }
+516 -30
crates/jetstream/src/repo.rs
··· 1 + use std::sync::Arc; 2 + 1 3 use anyhow::Error; 4 + use owo_colors::OwoColorize; 2 5 use sqlx::{Pool, Postgres}; 6 + use tokio::sync::Mutex; 3 7 4 - use crate::{profile::did_to_profile, subscriber::{ALBUM_NSID, ARTIST_NSID, SCROBBLE_NSID, SONG_NSID}, types::{AlbumRecord, ArtistRecord, Commit, ScrobbleRecord, SongRecord}, xata::{album_track::AlbumTrack, artist::Artist, track::{self, Track}, user::User}}; 8 + use crate::{profile::did_to_profile, subscriber::{ALBUM_NSID, ARTIST_NSID, SCROBBLE_NSID, SONG_NSID}, types::{AlbumRecord, ArtistRecord, Commit, ScrobbleRecord, SongRecord}, xata::{album::Album, album_track::AlbumTrack, artist::Artist, artist_album::ArtistAlbum, artist_track::ArtistTrack, track::Track, user::User, user_album::UserAlbum, user_artist::UserArtist, user_track::UserTrack}}; 5 9 6 - pub async fn save_scrobble(pool: &Pool<Postgres>, did: &str, commit: Commit) -> Result<(), Error> { 10 + pub async fn save_scrobble(pool: Arc<Mutex<Pool<Postgres>>>, did: &str, commit: Commit) -> Result<(), Error> { 7 11 // skip unknown collection 8 12 if !vec![ 9 13 SCROBBLE_NSID, 14 + ARTIST_NSID, 15 + ALBUM_NSID, 16 + SONG_NSID, 10 17 ].contains(&commit.collection.as_str()) { 11 18 return Ok(()); 12 19 } 13 20 21 + let pool = pool.lock().await; 22 + 14 23 match commit.operation.as_str() { 15 24 "create" => { 16 25 if commit.collection == SCROBBLE_NSID { 17 26 let mut tx = pool.begin().await?; 18 27 let scrobble_record: ScrobbleRecord = serde_json::from_value(commit.record.clone())?; 19 28 20 - let album_id = save_album(&mut tx, scrobble_record.clone()).await?; 29 + let album_id = save_album(&mut tx, scrobble_record.clone(), did).await?; 21 30 let artist_id = save_artist(&mut tx, scrobble_record.clone()).await?; 22 - let track_id = save_track(&mut tx, scrobble_record.clone()).await?; 31 + let track_id = save_track(&mut tx, scrobble_record.clone(), did).await?; 23 32 24 33 save_album_track(&mut tx, &album_id, &track_id).await?; 25 34 save_artist_track(&mut tx, &artist_id, &track_id).await?; ··· 29 38 30 39 let user_id = save_user(&mut tx, did).await?; 31 40 41 + println!("Saving scrobble: {} ", format!("{} - {} - {}", scrobble_record.title, scrobble_record.artist, scrobble_record.album).magenta()); 42 + 32 43 sqlx::query(r#" 33 44 INSERT INTO scrobbles ( 34 45 album_id, 35 46 artist_id, 36 47 track_id, 37 48 uri, 38 - user_id, 49 + user_id 39 50 ) VALUES ($1, $2, $3, $4, $5) 40 51 "#) 41 52 .bind(album_id) ··· 52 63 let mut tx = pool.begin().await?; 53 64 54 65 let user_id = save_user(&mut tx, did).await?; 66 + let uri = format!("at://{}/app.rocksky.artist/{}", did, commit.rkey); 55 67 56 68 let artist_record: ArtistRecord = serde_json::from_value(commit.record.clone())?; 57 - save_user_artist(&mut tx, &user_id, artist_record).await?; 69 + save_user_artist(&mut tx, &user_id, artist_record.clone(), &uri).await?; 70 + update_artist_uri(&mut tx, &user_id, artist_record, &uri).await?; 58 71 59 72 tx.commit().await?; 60 73 } ··· 62 75 if commit.collection == ALBUM_NSID { 63 76 let mut tx = pool.begin().await?; 64 77 let user_id = save_user(&mut tx, did).await?; 78 + let uri = format!("at://{}/app.rocksky.album/{}", did, commit.rkey); 65 79 66 80 let album_record: AlbumRecord = serde_json::from_value(commit.record.clone())?; 67 - save_user_album(&mut tx, &user_id, album_record).await?; 81 + save_user_album(&mut tx, &user_id, album_record.clone(), &uri).await?; 82 + update_album_uri(&mut tx, &user_id, album_record, &uri).await?; 68 83 69 84 tx.commit().await?; 70 85 } ··· 73 88 let mut tx = pool.begin().await?; 74 89 75 90 let user_id = save_user(&mut tx, did).await?; 91 + let uri = format!("at://{}/app.rocksky.song/{}", did, commit.rkey); 76 92 77 93 let song_record: SongRecord = serde_json::from_value(commit.record.clone())?; 78 - save_user_track(&mut tx, &user_id, song_record).await?; 94 + save_user_track(&mut tx, &user_id, song_record.clone(), &uri).await?; 95 + update_track_uri(&mut tx, &user_id, song_record, &uri).await?; 79 96 80 97 tx.commit().await?; 81 98 } ··· 117 134 Ok(users[0].xata_id.clone()) 118 135 } 119 136 120 - pub async fn save_track(tx: &mut sqlx::Transaction<'_, Postgres>, scrobble_record: ScrobbleRecord) -> Result<String, Error> { 137 + pub async fn save_track(tx: &mut sqlx::Transaction<'_, Postgres>, scrobble_record: ScrobbleRecord, did: &str) -> Result<String, Error> { 121 138 let uri: Option<String> = None; 122 139 let hash = sha256::digest( 123 140 format!( ··· 137 154 return Ok(tracks[0].xata_id.clone()); 138 155 } 139 156 140 - let did = ""; 141 157 sqlx::query(r#" 142 158 INSERT INTO tracks ( 143 159 title, ··· 155 171 copyright_message, 156 172 uri, 157 173 spotify_link, 158 - label, 174 + label 159 175 ) VALUES ( 160 176 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16 161 177 ) ··· 167 183 .bind(scrobble_record.album_artist) 168 184 .bind(scrobble_record.track_number) 169 185 .bind(scrobble_record.duration) 186 + .bind(scrobble_record.mbid) 170 187 .bind(scrobble_record.composer) 171 188 .bind(scrobble_record.lyrics) 172 189 .bind(scrobble_record.disc_number) 173 - .bind(hash) 190 + .bind(&hash) 174 191 .bind(scrobble_record.copyright_message) 175 192 .bind(uri) 176 193 .bind(scrobble_record.spotify_link) 177 194 .bind(scrobble_record.label) 178 195 .execute(&mut **tx).await?; 179 - todo!() 196 + 197 + let tracks: Vec<Track> = sqlx::query_as("SELECT * FROM tracks WHERE sha256 = $1") 198 + .bind(&hash) 199 + .fetch_all(&mut **tx) 200 + .await?; 201 + 202 + Ok(tracks[0].xata_id.clone()) 180 203 } 181 204 182 - pub async fn save_album(tx: &mut sqlx::Transaction<'_, Postgres>, scrobble_record: ScrobbleRecord) -> Result<String, Error> { 205 + pub async fn save_album(tx: &mut sqlx::Transaction<'_, Postgres>, scrobble_record: ScrobbleRecord, did: &str) -> Result<String, Error> { 183 206 let hash = sha256::digest(format!( 184 207 "{} - {}", 185 208 scrobble_record.album, ··· 188 211 .to_lowercase() 189 212 ); 190 213 191 - let albums: Vec<Artist> = sqlx::query_as("SELECT * FROM albums WHERE sha256 = $1") 214 + let albums: Vec<Album> = sqlx::query_as("SELECT * FROM albums WHERE sha256 = $1") 192 215 .bind(&hash) 193 216 .fetch_all(&mut **tx) 194 217 .await?; 195 218 196 219 if !albums.is_empty() { 220 + println!("Album already exists: {}", albums[0].title.magenta()); 197 221 return Ok(albums[0].xata_id.clone()); 198 222 } 199 223 224 + println!("Saving album: {}", scrobble_record.album.magenta()); 225 + 200 226 let uri: Option<String> = None; 201 227 let artist_uri: Option<String> = None; 202 - let did = ""; 203 228 sqlx::query(r#" 204 229 INSERT INTO albums ( 205 230 title, ··· 209 234 release_date, 210 235 sha256, 211 236 uri, 212 - artist_uri, 237 + artist_uri 213 238 ) VALUES ( 214 239 $1, $2, $3, $4, $5, $6, $7, $8 215 240 ) ··· 219 244 .bind(scrobble_record.album_art.map(|x| format!("https://cdn.bsky.app/img/feed_thumbnail/plain/{}/{}@{}", did, x.r#ref.link, x.mime_type.split('/').last().unwrap_or("jpeg")))) 220 245 .bind(scrobble_record.year) 221 246 .bind(scrobble_record.release_date) 222 - .bind(hash) 247 + .bind(&hash) 223 248 .bind(uri) 224 249 .bind(artist_uri) 225 250 .execute(&mut **tx).await?; 226 - todo!() 251 + 252 + let albums: Vec<Album> = sqlx::query_as("SELECT * FROM albums WHERE sha256 = $1") 253 + .bind(&hash) 254 + .fetch_all(&mut **tx) 255 + .await?; 256 + 257 + Ok(albums[0].xata_id.clone()) 227 258 } 228 259 229 260 pub async fn save_artist(tx: &mut sqlx::Transaction<'_, Postgres>, scrobble_record: ScrobbleRecord) -> Result<String, Error> { ··· 234 265 .await?; 235 266 236 267 if !artists.is_empty() { 268 + println!("Artist already exists: {}", artists[0].name.magenta()); 237 269 return Ok(artists[0].xata_id.clone()); 238 270 } 271 + 272 + println!("Saving artist: {}", scrobble_record.album_artist.magenta()); 239 273 240 274 let uri: Option<String> = None; 241 275 let picture = ""; ··· 244 278 name, 245 279 sha256, 246 280 uri, 247 - picture, 281 + picture 248 282 ) VALUES ( 249 283 $1, $2, $3, $4 250 284 ) 251 285 "#) 252 286 .bind(scrobble_record.artist) 253 - .bind(hash) 287 + .bind(&hash) 254 288 .bind(uri) 255 289 .bind(picture) 256 290 .execute(&mut **tx).await?; 257 - todo!() 291 + 292 + let artists: Vec<Artist> = sqlx::query_as("SELECT * FROM artists WHERE sha256 = $1") 293 + .bind(&hash) 294 + .fetch_all(&mut **tx) 295 + .await?; 296 + 297 + Ok(artists[0].xata_id.clone()) 258 298 } 259 299 260 300 pub async fn save_album_track(tx: &mut sqlx::Transaction<'_, Postgres>, album_id: &str, track_id: &str) -> Result<(), Error> { ··· 265 305 .await?; 266 306 267 307 if !album_tracks.is_empty() { 308 + println!("Album track already exists: {}", format!("{} - {}", album_id, track_id).magenta()); 268 309 return Ok(()); 269 310 } 270 311 312 + println!("Saving album track: {}", format!("{} - {}", album_id, track_id).magenta()); 313 + 271 314 sqlx::query(r#" 272 315 INSERT INTO album_tracks ( 273 316 album_id, 274 - track_id, 317 + track_id 275 318 ) VALUES ( 276 319 $1, $2 277 320 ) ··· 283 326 } 284 327 285 328 pub async fn save_artist_track(tx: &mut sqlx::Transaction<'_, Postgres>, artist_id: &str, track_id: &str) -> Result<(), Error> { 286 - let artist_tracks : Vec<AlbumTrack> = sqlx::query_as("SELECT * FROM artist_tracks WHERE artist_id = $1 AND track_id = $2") 329 + let artist_tracks : Vec<ArtistTrack> = sqlx::query_as("SELECT * FROM artist_tracks WHERE artist_id = $1 AND track_id = $2") 287 330 .bind(artist_id) 288 331 .bind(track_id) 289 332 .fetch_all(&mut **tx) 290 333 .await?; 291 334 292 335 if !artist_tracks.is_empty() { 336 + println!("Artist track already exists: {}", format!("{} - {}", artist_id, track_id).magenta()); 293 337 return Ok(()); 294 338 } 295 339 340 + println!("Saving artist track: {}", format!("{} - {}", artist_id, track_id).magenta()); 341 + 296 342 sqlx::query(r#" 297 343 INSERT INTO artist_tracks ( 298 344 artist_id, 299 - track_id, 345 + track_id 300 346 ) VALUES ( 301 347 $1, $2 302 348 ) ··· 308 354 } 309 355 310 356 pub async fn save_artist_album(tx: &mut sqlx::Transaction<'_, Postgres>, artist_id: &str, album_id: &str) -> Result<(), Error> { 311 - let artist_albums : Vec<AlbumTrack> = sqlx::query_as("SELECT * FROM artist_albums WHERE artist_id = $1 AND album_id = $2") 357 + let artist_albums : Vec<ArtistAlbum> = sqlx::query_as("SELECT * FROM artist_albums WHERE artist_id = $1 AND album_id = $2") 312 358 .bind(artist_id) 313 359 .bind(album_id) 314 360 .fetch_all(&mut **tx) 315 361 .await?; 316 362 317 363 if !artist_albums.is_empty() { 364 + println!("Artist album already exists: {}", format!("{} - {}", artist_id, album_id).magenta()); 318 365 return Ok(()); 319 366 } 320 367 368 + println!("Saving artist album: {}", format!("{} - {}", artist_id, album_id).magenta()); 369 + 321 370 sqlx::query(r#" 322 371 INSERT INTO artist_albums ( 323 372 artist_id, 324 - album_id, 373 + album_id 325 374 ) VALUES ( 326 375 $1, $2 327 376 ) ··· 333 382 } 334 383 335 384 336 - pub async fn save_user_artist(tx: &mut sqlx::Transaction<'_, Postgres>, user_id: &str, record: ArtistRecord) -> Result<(), Error> { 385 + pub async fn save_user_artist(tx: &mut sqlx::Transaction<'_, Postgres>, user_id: &str, record: ArtistRecord, uri: &str) -> Result<(), Error> { 386 + let hash = sha256::digest(record.name.to_lowercase()); 387 + 388 + let mut artists: Vec<Artist> = sqlx::query_as("SELECT * FROM artists WHERE sha256 = $1") 389 + .bind(&hash) 390 + .fetch_all(&mut **tx) 391 + .await?; 392 + 393 + let users: Vec<User> = sqlx::query_as("SELECT * FROM users WHERE xata_id = $1") 394 + .bind(user_id) 395 + .fetch_all(&mut **tx) 396 + .await?; 397 + 398 + let artist_id: &str; 399 + 400 + match artists.is_empty() { 401 + true => { 402 + println!("Saving artist: {}", record.name.magenta()); 403 + let did = users[0].did.clone(); 404 + sqlx::query(r#" 405 + INSERT INTO artists ( 406 + name, 407 + sha256, 408 + uri, 409 + picture 410 + ) VALUES ( 411 + $1, $2, $3, $4 412 + ) 413 + "#) 414 + .bind(record.name) 415 + .bind(&hash) 416 + .bind(uri) 417 + .bind(record.picture.map(|x| format!("https://cdn.bsky.app/img/avatar/plain/{}/{}@{}", did, x.r#ref.link, x.mime_type.split('/').last().unwrap_or("jpeg")))) 418 + .execute(&mut **tx).await?; 419 + 420 + artists = sqlx::query_as("SELECT * FROM artists WHERE sha256 = $1") 421 + .bind(&hash) 422 + .fetch_all(&mut **tx) 423 + .await?; 424 + artist_id = &artists[0].xata_id; 425 + }, 426 + false => { 427 + artist_id = &artists[0].xata_id; 428 + } 429 + }; 430 + 431 + let user_artists: Vec<UserArtist> = sqlx::query_as("SELECT * FROM user_artists WHERE user_id = $1 AND artist_id = $2") 432 + .bind(user_id) 433 + .bind(artist_id) 434 + .fetch_all(&mut **tx) 435 + .await?; 436 + 437 + if !user_artists.is_empty() { 438 + println!("User artist already exists: {}", format!("{} - {}", user_id, artist_id).magenta()); 439 + sqlx::query(r#" 440 + UPDATE user_artists 441 + SET scrobbles = scrobbles + 1, 442 + uri = $3 443 + WHERE user_id = $1 AND artist_id = $2 444 + "#) 445 + .bind(user_id) 446 + .bind(artist_id) 447 + .bind(uri) 448 + .execute(&mut **tx).await?; 449 + return Ok(()); 450 + } 451 + 452 + println!("Saving user artist: {}", format!("{} - {}", user_id, artist_id).magenta()); 453 + 454 + sqlx::query(r#" 455 + INSERT INTO user_artists ( 456 + user_id, 457 + artist_id, 458 + uri, 459 + scrobbles 460 + ) VALUES ( 461 + $1, $2, $3, $4 462 + ) 463 + "#) 464 + .bind(user_id) 465 + .bind(artist_id) 466 + .bind(uri) 467 + .bind(1) 468 + .execute(&mut **tx).await?; 337 469 Ok(()) 338 470 } 339 471 340 - pub async fn save_user_album(tx: &mut sqlx::Transaction<'_, Postgres>, user_id: &str, record: AlbumRecord) -> Result<(), Error> { 472 + pub async fn save_user_album(tx: &mut sqlx::Transaction<'_, Postgres>, user_id: &str, record: AlbumRecord, uri: &str) -> Result<(), Error> { 473 + let users: Vec<User> = sqlx::query_as("SELECT * FROM users WHERE xata_id = $1") 474 + .bind(user_id) 475 + .fetch_all(&mut **tx) 476 + .await?; 477 + 478 + let hash = sha256::digest(format!( 479 + "{} - {}", 480 + record.title, 481 + record.artist 482 + ) 483 + .to_lowercase() 484 + ); 485 + let mut albums: Vec<Album> = sqlx::query_as("SELECT * FROM albums WHERE sha256 = $1") 486 + .bind(&hash) 487 + .fetch_all(&mut **tx) 488 + .await?; 489 + 490 + let album_id: &str; 491 + 492 + match albums.is_empty() { 493 + true => { 494 + println!("Saving album: {}", record.title.magenta()); 495 + let did = users[0].did.clone(); 496 + sqlx::query(r#" 497 + INSERT INTO albums ( 498 + title, 499 + artist, 500 + album_art, 501 + year, 502 + release_date, 503 + sha256, 504 + uri 505 + ) VALUES ( 506 + $1, $2, $3, $4, $5, $6, $7 507 + ) 508 + "#) 509 + .bind(record.title) 510 + .bind(record.artist) 511 + .bind(record.album_art.map(|x| format!("https://cdn.bsky.app/img/feed_thumbnail/plain/{}/{}@{}", did, x.r#ref.link, x.mime_type.split('/').last().unwrap_or("jpeg")))) 512 + .bind(record.year) 513 + .bind(record.release_date) 514 + .bind(&hash) 515 + .bind(uri) 516 + .execute(&mut **tx).await?; 517 + 518 + albums = sqlx::query_as("SELECT * FROM albums WHERE sha256 = $1") 519 + .bind(&hash) 520 + .fetch_all(&mut **tx) 521 + .await?; 522 + album_id = &albums[0].xata_id; 523 + }, 524 + false => { 525 + album_id = &albums[0].xata_id; 526 + } 527 + }; 528 + 529 + let user_albums: Vec<UserAlbum> = sqlx::query_as("SELECT * FROM user_albums WHERE user_id = $1 AND album_id = $2") 530 + .bind(user_id) 531 + .bind(album_id) 532 + .fetch_all(&mut **tx) 533 + .await?; 534 + 535 + if !user_albums.is_empty() { 536 + println!("User album already exists: {}", format!("{} - {}", user_id, album_id).magenta()); 537 + sqlx::query(r#" 538 + UPDATE user_albums 539 + SET scrobbles = scrobbles + 1, 540 + uri = $3 541 + WHERE user_id = $1 AND album_id = $2 542 + "#) 543 + .bind(user_id) 544 + .bind(album_id) 545 + .bind(uri) 546 + .execute(&mut **tx).await?; 547 + return Ok(()); 548 + } 549 + 550 + println!("Saving user album: {}", format!("{} - {}", user_id, album_id).magenta()); 551 + 552 + sqlx::query(r#" 553 + INSERT INTO user_albums ( 554 + user_id, 555 + album_id, 556 + uri, 557 + scrobbles 558 + ) VALUES ( 559 + $1, $2, $3, $4 560 + ) 561 + "#) 562 + .bind(user_id) 563 + .bind(album_id) 564 + .bind(uri) 565 + .bind(1) 566 + .execute(&mut **tx).await?; 341 567 Ok(()) 342 568 } 343 569 344 - pub async fn save_user_track(tx: &mut sqlx::Transaction<'_, Postgres>, user_id: &str, record: SongRecord) -> Result<(), Error> { 570 + pub async fn save_user_track(tx: &mut sqlx::Transaction<'_, Postgres>, user_id: &str, record: SongRecord, uri: &str) -> Result<(), Error> { 571 + let hash = sha256::digest(format!( 572 + "{} - {} - {}", 573 + record.title, 574 + record.artist, 575 + record.album 576 + ) 577 + .to_lowercase() 578 + ); 579 + 580 + let mut tracks: Vec<Track> = sqlx::query_as("SELECT * FROM tracks WHERE sha256 = $1") 581 + .bind(&hash) 582 + .fetch_all(&mut **tx) 583 + .await?; 584 + 585 + let users: Vec<User> = sqlx::query_as("SELECT * FROM users WHERE xata_id = $1") 586 + .bind(user_id) 587 + .fetch_all(&mut **tx) 588 + .await?; 589 + 590 + let track_id: &str; 591 + 592 + match tracks.is_empty() { 593 + true => { 594 + println!("Saving track: {}", record.title.magenta()); 595 + let did = users[0].did.clone(); 596 + sqlx::query(r#" 597 + INSERT INTO tracks ( 598 + title, 599 + artist, 600 + album, 601 + album_art, 602 + album_artist, 603 + track_number, 604 + duration, 605 + mb_id, 606 + composer, 607 + lyrics, 608 + disc_number, 609 + sha256, 610 + copyright_message, 611 + uri, 612 + spotify_link, 613 + label 614 + ) VALUES ( 615 + $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16 616 + ) 617 + "#) 618 + .bind(record.title) 619 + .bind(record.artist) 620 + .bind(record.album) 621 + .bind(record.album_art.map(|x| format!("https://cdn.bsky.app/img/feed_thumbnail/plain/{}/{}@{}", did, x.r#ref.link, x.mime_type.split('/').last().unwrap_or("jpeg")))) 622 + .bind(record.album_artist) 623 + .bind(record.track_number) 624 + .bind(record.duration) 625 + .bind(record.mbid) 626 + .bind(record.composer) 627 + .bind(record.lyrics) 628 + .bind(record.disc_number) 629 + .bind(&hash) 630 + .bind(record.copyright_message) 631 + .bind(uri) 632 + .bind(record.spotify_link) 633 + .bind(record.label) 634 + .execute(&mut **tx).await?; 635 + 636 + tracks = sqlx::query_as("SELECT * FROM tracks WHERE sha256 = $1") 637 + .bind(&hash) 638 + .fetch_all(&mut **tx) 639 + .await?; 640 + 641 + track_id = &tracks[0].xata_id; 642 + }, 643 + false => { 644 + track_id = &tracks[0].xata_id; 645 + } 646 + } 647 + 648 + let user_tracks: Vec<UserTrack> = sqlx::query_as("SELECT * FROM user_tracks WHERE user_id = $1 AND track_id = $2") 649 + .bind(user_id) 650 + .bind(track_id) 651 + .fetch_all(&mut **tx) 652 + .await?; 653 + 654 + if !user_tracks.is_empty() { 655 + println!("User track already exists: {}", format!("{} - {}", user_id, track_id).magenta()); 656 + sqlx::query(r#" 657 + UPDATE user_tracks 658 + SET scrobbles = scrobbles + 1, 659 + uri = $3 660 + WHERE user_id = $1 AND track_id = $2 661 + "#) 662 + .bind(user_id) 663 + .bind(track_id) 664 + .bind(uri) 665 + .execute(&mut **tx).await?; 666 + return Ok(()); 667 + } 668 + 669 + println!("Saving user track: {}", format!("{} - {}", user_id, track_id).magenta()); 670 + 671 + sqlx::query(r#" 672 + INSERT INTO user_tracks ( 673 + user_id, 674 + track_id, 675 + uri, 676 + scrobbles 677 + ) VALUES ( 678 + $1, $2, $3, $4 679 + ) 680 + "#) 681 + .bind(user_id) 682 + .bind(track_id) 683 + .bind(uri) 684 + .bind(1) 685 + .execute(&mut **tx).await?; 686 + 687 + Ok(()) 688 + } 689 + 690 + pub async fn update_artist_uri(tx: &mut sqlx::Transaction<'_, Postgres>, user_id: &str, record: ArtistRecord, uri: &str) -> Result<(), Error> { 691 + let hash = sha256::digest(record.name.to_lowercase()); 692 + let artists: Vec<Artist> = sqlx::query_as("SELECT * FROM artists WHERE sha256 = $1") 693 + .bind(&hash) 694 + .fetch_all(&mut **tx) 695 + .await?; 696 + 697 + if artists.is_empty() { 698 + println!("Artist not found: {}", record.name.magenta()); 699 + return Ok(()); 700 + } 701 + 702 + let artist_id = &artists[0].xata_id; 703 + 704 + sqlx::query(r#" 705 + UPDATE user_artists 706 + SET uri = $3 707 + WHERE user_id = $1 AND artist_id = $2 708 + "#) 709 + .bind(user_id) 710 + .bind(artist_id) 711 + .bind(uri) 712 + .execute(&mut **tx).await?; 713 + 714 + sqlx::query(r#" 715 + UPDATE tracks 716 + SET artist_uri = $2 717 + WHERE artist_uri IS NULL AND album_artist = $1 718 + "#) 719 + .bind(&record.name) 720 + .bind(uri) 721 + .execute(&mut **tx).await?; 722 + 723 + sqlx::query(r#" 724 + UPDATE artists 725 + SET uri = $2 726 + WHERE sha256 = $1 727 + "#) 728 + .bind(&hash) 729 + .bind(uri) 730 + .execute(&mut **tx).await?; 731 + 732 + sqlx::query(r#" 733 + UPDATE albums 734 + SET artist_uri = $2 735 + WHERE artist_uri IS NULL AND artist = $1 736 + "#) 737 + .bind(&record.name) 738 + .bind(uri) 739 + .execute(&mut **tx).await?; 345 740 Ok(()) 346 741 } 742 + 743 + pub async fn update_album_uri(tx: &mut sqlx::Transaction<'_, Postgres>, user_id: &str, record: AlbumRecord, uri: &str) -> Result<(), Error> { 744 + let hash = sha256::digest(format!( 745 + "{} - {}", 746 + record.title, 747 + record.artist 748 + ) 749 + .to_lowercase() 750 + ); 751 + let albums: Vec<Album> = sqlx::query_as("SELECT * FROM albums WHERE sha256 = $1") 752 + .bind(&hash) 753 + .fetch_all(&mut **tx) 754 + .await?; 755 + if albums.is_empty() { 756 + println!("Album not found: {}", record.title.magenta()); 757 + return Ok(()); 758 + } 759 + let album_id = &albums[0].xata_id; 760 + sqlx::query(r#" 761 + UPDATE user_albums 762 + SET uri = $3 763 + WHERE user_id = $1 AND album_id = $2 764 + "#) 765 + .bind(user_id) 766 + .bind(album_id) 767 + .bind(uri) 768 + .execute(&mut **tx).await?; 769 + 770 + sqlx::query(r#" 771 + UPDATE tracks 772 + SET album_uri = $2 773 + WHERE album_uri IS NULL AND album = $1 774 + "#) 775 + .bind(record.title) 776 + .bind(uri) 777 + .execute(&mut **tx).await?; 778 + 779 + sqlx::query(r#" 780 + UPDATE albums 781 + SET uri = $2 782 + WHERE sha256 = $1 783 + "#) 784 + .bind(&hash) 785 + .bind(uri) 786 + .execute(&mut **tx).await?; 787 + 788 + Ok(()) 789 + } 790 + 791 + pub async fn update_track_uri(tx: &mut sqlx::Transaction<'_, Postgres>, user_id: &str, record: SongRecord, uri: &str) -> Result<(), Error> { 792 + let hash = sha256::digest(format!( 793 + "{} - {} - {}", 794 + record.title, 795 + record.artist, 796 + record.album 797 + ) 798 + .to_lowercase() 799 + ); 800 + let tracks: Vec<Track> = sqlx::query_as("SELECT * FROM tracks WHERE sha256 = $1") 801 + .bind(&hash) 802 + .fetch_all(&mut **tx) 803 + .await?; 804 + 805 + if tracks.is_empty() { 806 + println!("Track not found: {}", record.title.magenta()); 807 + return Ok(()); 808 + } 809 + 810 + let track_id = &tracks[0].xata_id; 811 + sqlx::query(r#" 812 + UPDATE user_tracks 813 + SET uri = $3 814 + WHERE user_id = $1 AND track_id = $2 815 + "#) 816 + .bind(user_id) 817 + .bind(track_id) 818 + .bind(uri) 819 + .execute(&mut **tx).await?; 820 + 821 + sqlx::query(r#" 822 + UPDATE tracks 823 + SET uri = $2 824 + WHERE sha256 = $1 AND uri IS NULL 825 + "#) 826 + .bind(&hash) 827 + .bind(uri) 828 + .execute(&mut **tx).await?; 829 + 830 + Ok(()) 831 + } 832 +
+29 -47
crates/jetstream/src/subscriber.rs
··· 1 - use std::env; 1 + use std::{env, sync::Arc}; 2 2 3 3 use anyhow::{Error, Context}; 4 4 use futures_util::StreamExt; 5 5 use owo_colors::OwoColorize; 6 6 use sqlx::postgres::PgPoolOptions; 7 + use tokio::sync::Mutex; 7 8 use tokio_tungstenite::{connect_async, tungstenite::Message}; 8 - use tokio::sync::mpsc; 9 + 9 10 10 - use crate::{repo::save_scrobble, types::{Commit, Root}}; 11 + use crate::{repo::save_scrobble, types::Root}; 11 12 12 13 pub const SCROBBLE_NSID: &str = "app.rocksky.scrobble"; 13 14 pub const ARTIST_NSID: &str = "app.rocksky.artist"; ··· 34 35 let db_url = env::var("XATA_POSTGRES_URL") 35 36 .context("Failed to get XATA_POSTGRES_URL environment variable")?; 36 37 37 - let (tx, rx) = mpsc::channel::<(String, Commit)>(100); 38 - 39 - let tx_clone = tx.clone(); 40 - 41 - // Start the processor task 42 - let processor = tokio::spawn(async move { 43 - let pool = PgPoolOptions::new().max_connections(5) 44 - .connect(&db_url).await?; 45 - 46 - process_scrobble_events(rx, &pool).await 47 - }); 38 + let pool = PgPoolOptions::new().max_connections(5) 39 + .connect(&db_url).await?; 40 + let pool = Arc::new(Mutex::new(pool)); 48 41 49 42 let (mut ws_stream, _) = connect_async(&self.service_url).await?; 50 43 println!("Connected to jetstream at {}", self.service_url.bright_green()); ··· 52 45 while let Some(msg) = ws_stream.next().await { 53 46 match msg { 54 47 Ok(msg) => { 55 - if let Err(e) = self.handle_message(msg, &tx_clone).await { 48 + if let Err(e) = handle_message(pool.clone(), msg).await { 56 49 eprintln!("Error handling message: {}", e); 57 50 } 58 51 } ··· 63 56 } 64 57 } 65 58 66 - drop(tx); 67 - 68 - // Wait for the processor task to complete 69 - match processor.await { 70 - Ok(result) => { 71 - if let Err(e) = result { 72 - eprintln!("Processor task had an error: {}", e); 73 - } 74 - } 75 - Err(e) => { 76 - eprintln!("Processor task panicked: {}", e); 77 - } 78 - } 79 59 80 60 Ok(()) 81 61 } 62 + } 82 63 83 - async fn handle_message( 84 - &self, 85 - msg: Message, 86 - tx: &mpsc::Sender<(String, Commit)>, 87 - ) -> Result<(), Error> { 64 + async fn handle_message( 65 + pool: Arc<Mutex<sqlx::PgPool>>, 66 + msg: Message, 67 + ) -> Result<(), Error> { 68 + tokio::spawn(async move { 88 69 if let Message::Text(text) = msg { 89 70 let message: Root = serde_json::from_str(&text)?; 71 + 72 + if message.kind != "commit" { 73 + return Ok::<(), Error>(()); 74 + } 75 + 90 76 println!("Received message: {:#?}", message); 91 77 if let Some(commit) = message.commit { 92 - tx.send((message.did, commit)).await.map_err(|e| { 93 - Error::msg(format!("Failed to send message to channel: {}", e)) 94 - })?; 78 + match save_scrobble(pool, &message.did, commit).await { 79 + Ok(_) => { 80 + println!("Scrobble saved successfully"); 81 + } 82 + Err(e) => { 83 + eprintln!("Error saving scrobble: {}", e); 84 + } 85 + } 95 86 } 96 87 } 97 - 98 88 Ok(()) 99 - } 100 - } 89 + }); 101 90 102 - async fn process_scrobble_events( 103 - mut rx: mpsc::Receiver<(String, Commit)>, 104 - pool: &sqlx::Pool<sqlx::Postgres>, 105 - ) -> Result<(), Error> { 106 - while let Some((did, record)) = rx.recv().await { 107 - save_scrobble(pool, &did, record).await?; 108 - } 109 91 Ok(()) 110 - } 92 + }
+89 -12
crates/jetstream/src/types.rs
··· 21 21 22 22 #[derive(Debug, Deserialize, Clone)] 23 23 #[serde(rename_all = "camelCase")] 24 - pub struct AlbumArt { 24 + pub struct Blob { 25 25 #[serde(rename = "$type")] 26 26 pub r#type: String, 27 27 pub r#ref: Ref, ··· 64 64 #[serde(skip_serializing_if = "Option::is_none")] 65 65 pub wiki: Option<String>, 66 66 #[serde(skip_serializing_if = "Option::is_none")] 67 - pub album_art: Option<AlbumArt>, 67 + pub album_art: Option<ImageBlob>, 68 68 #[serde(skip_serializing_if = "Option::is_none")] 69 69 pub youtube_link: Option<String>, 70 70 #[serde(skip_serializing_if = "Option::is_none")] ··· 94 94 pub r#type: String, 95 95 pub avatar: Option<Blob>, 96 96 pub banner: Option<Blob>, 97 - pub created_at: String, 98 - #[serde(rename = "pinnedPost")] 97 + pub created_at: Option<String>, 99 98 pub pinned_post: Option<PinnedPost>, 100 99 pub description: Option<String>, 101 - #[serde(rename = "displayName")] 102 100 pub display_name: Option<String>, 103 101 pub handle: Option<String>, 104 102 } 105 103 106 - #[derive(Debug, Deserialize)] 104 + #[derive(Debug, Deserialize, Clone)] 107 105 #[serde(rename_all = "camelCase")] 108 - pub struct Blob { 106 + pub struct ImageBlob { 109 107 #[serde(rename = "$type")] 110 108 pub r#type: String, 111 109 #[serde(rename = "ref")] 112 110 pub r#ref: BlobRef, 113 - #[serde(rename = "mimeType")] 114 111 pub mime_type: String, 115 112 pub size: u64, 116 113 } 117 114 118 - #[derive(Debug, Deserialize)] 115 + #[derive(Debug, Deserialize, Clone)] 119 116 pub struct BlobRef { 120 117 #[serde(rename = "$link")] 121 118 pub link: String, ··· 129 126 130 127 #[derive(Debug, Deserialize, Clone)] 131 128 #[serde(rename_all = "camelCase")] 132 - pub struct ArtistRecord {} 129 + pub struct ArtistRecord { 130 + pub name: String, 131 + #[serde(skip_serializing_if = "Option::is_none")] 132 + pub bio: Option<String>, 133 + #[serde(skip_serializing_if = "Option::is_none")] 134 + pub picture: Option<ImageBlob>, 135 + #[serde(skip_serializing_if = "Option::is_none")] 136 + pub tags: Option<Vec<String>>, 137 + #[serde(skip_serializing_if = "Option::is_none")] 138 + pub born: Option<String>, 139 + #[serde(skip_serializing_if = "Option::is_none")] 140 + pub died: Option<String>, 141 + #[serde(skip_serializing_if = "Option::is_none")] 142 + pub born_in: Option<String>, 143 + pub created_at: String, 144 + } 133 145 134 146 #[derive(Debug, Deserialize, Clone)] 135 147 #[serde(rename_all = "camelCase")] 136 - pub struct AlbumRecord {} 148 + pub struct AlbumRecord { 149 + pub title: String, 150 + pub artist: String, 151 + #[serde(skip_serializing_if = "Option::is_none")] 152 + pub duration: Option<i32>, 153 + #[serde(skip_serializing_if = "Option::is_none")] 154 + pub release_date: Option<String>, 155 + #[serde(skip_serializing_if = "Option::is_none")] 156 + pub year: Option<i32>, 157 + #[serde(skip_serializing_if = "Option::is_none")] 158 + pub genre: Option<String>, 159 + #[serde(skip_serializing_if = "Option::is_none")] 160 + pub album_art: Option<ImageBlob>, 161 + #[serde(skip_serializing_if = "Option::is_none")] 162 + pub tags: Option<Vec<String>>, 163 + #[serde(skip_serializing_if = "Option::is_none")] 164 + pub youtube_link: Option<String>, 165 + #[serde(skip_serializing_if = "Option::is_none")] 166 + pub spotify_link: Option<String>, 167 + #[serde(skip_serializing_if = "Option::is_none")] 168 + pub tidal_link: Option<String>, 169 + #[serde(skip_serializing_if = "Option::is_none")] 170 + pub apple_music_link: Option<String>, 171 + pub created_at: String, 172 + } 137 173 138 174 #[derive(Debug, Deserialize, Clone)] 139 175 #[serde(rename_all = "camelCase")] 140 - pub struct SongRecord {} 176 + pub struct SongRecord { 177 + pub title: String, 178 + pub artist: String, 179 + pub album: String, 180 + pub album_artist: String, 181 + pub duration: i32, 182 + pub created_at: String, 183 + #[serde(skip_serializing_if = "Option::is_none")] 184 + pub track_number: Option<i32>, 185 + #[serde(skip_serializing_if = "Option::is_none")] 186 + pub disc_number: Option<i32>, 187 + #[serde(skip_serializing_if = "Option::is_none")] 188 + pub genre: Option<String>, 189 + #[serde(skip_serializing_if = "Option::is_none")] 190 + pub release_date: Option<String>, 191 + #[serde(skip_serializing_if = "Option::is_none")] 192 + pub year: Option<i32>, 193 + #[serde(skip_serializing_if = "Option::is_none")] 194 + pub tags: Option<Vec<String>>, 195 + #[serde(skip_serializing_if = "Option::is_none")] 196 + pub composer: Option<String>, 197 + #[serde(skip_serializing_if = "Option::is_none")] 198 + pub lyrics: Option<String>, 199 + #[serde(skip_serializing_if = "Option::is_none")] 200 + pub copyright_message: Option<String>, 201 + #[serde(skip_serializing_if = "Option::is_none")] 202 + pub wiki: Option<String>, 203 + #[serde(skip_serializing_if = "Option::is_none")] 204 + pub album_art: Option<ImageBlob>, 205 + #[serde(skip_serializing_if = "Option::is_none")] 206 + pub youtube_link: Option<String>, 207 + #[serde(skip_serializing_if = "Option::is_none")] 208 + pub spotify_link: Option<String>, 209 + #[serde(skip_serializing_if = "Option::is_none")] 210 + pub tidal_link: Option<String>, 211 + #[serde(skip_serializing_if = "Option::is_none")] 212 + pub apple_music_link: Option<String>, 213 + #[serde(skip_serializing_if = "Option::is_none")] 214 + pub label: Option<String>, 215 + #[serde(skip_serializing_if = "Option::is_none")] 216 + pub mbid: Option<String>, 217 + }
+1
crates/jetstream/src/xata/mod.rs
··· 9 9 pub mod user; 10 10 pub mod user_album; 11 11 pub mod user_artist; 12 + pub mod user_playlist; 12 13 pub mod user_track;
+1
crates/jetstream/src/xata/user_album.rs
··· 5 5 pub xata_id: String, 6 6 pub user_id: String, 7 7 pub album_id: String, 8 + pub uri: Option<String>, 8 9 #[serde(with = "chrono::serde::ts_seconds")] 9 10 pub xata_createdat: chrono::DateTime<chrono::Utc>, 10 11 }
+1
crates/jetstream/src/xata/user_artist.rs
··· 5 5 pub xata_id: String, 6 6 pub user_id: String, 7 7 pub artist_id: String, 8 + pub uri: Option<String>, 8 9 #[serde(with = "chrono::serde::ts_seconds")] 9 10 pub xata_createdat: chrono::DateTime<chrono::Utc>, 10 11 }
+11
crates/jetstream/src/xata/user_playlist.rs
··· 1 + use serde::Deserialize; 2 + 3 + #[derive(Debug, sqlx::FromRow, Deserialize, Clone)] 4 + pub struct UserPlaylist { 5 + pub xata_id: String, 6 + pub user_id: String, 7 + pub playlist_id: String, 8 + pub uri: Option<String>, 9 + #[serde(with = "chrono::serde::ts_seconds")] 10 + pub xata_createdat: chrono::DateTime<chrono::Utc>, 11 + }
+3 -2
crates/jetstream/src/xata/user_track.rs
··· 1 1 use serde::Deserialize; 2 2 3 3 #[derive(Debug, sqlx::FromRow, Deserialize, Clone)] 4 - pub struct UserPlaylist { 4 + pub struct UserTrack { 5 5 pub xata_id: String, 6 6 pub user_id: String, 7 - pub playlist_id: String, 7 + pub track_id: String, 8 + pub uri: Option<String>, 8 9 #[serde(with = "chrono::serde::ts_seconds")] 9 10 pub xata_createdat: chrono::DateTime<chrono::Utc>, 10 11 }
+1 -1
rockskyapi/rocksky-auth/src/index.ts
··· 112 112 return c.text("Unauthorized"); 113 113 } 114 114 115 - await scrobbleTrack(ctx, track, user, agent); 115 + await scrobbleTrack(ctx, track, agent); 116 116 117 117 return c.json({ status: "ok" }); 118 118 });
+26 -232
rockskyapi/rocksky-auth/src/nowplaying/nowplaying.service.ts
··· 1 1 import { Agent, BlobRef } from "@atproto/api"; 2 2 import { TID } from "@atproto/common"; 3 - import { equals, SelectedPick } from "@xata.io/client"; 3 + import { equals } from "@xata.io/client"; 4 + import chalk from "chalk"; 4 5 import { Context } from "context"; 5 6 import { createHash } from "crypto"; 6 7 import dayjs from "dayjs"; ··· 10 11 import * as Song from "lexicon/types/app/rocksky/song"; 11 12 import downloadImage, { getContentType } from "lib/downloadImage"; 12 13 import { Track } from "types/track"; 13 - import { ScrobblesRecord } from "xata"; 14 14 15 15 export async function putArtistRecord( 16 16 track: Track, ··· 254 254 } 255 255 } 256 256 257 - export async function updateUserLibrary( 258 - ctx: Context, 259 - user, 260 - track: Track, 261 - agent: Agent, 262 - track_id: string, 263 - album_id: string, 264 - artist_id: string, 265 - trackUri: string, 266 - albumUri: string, 267 - artistUri: string 268 - ): Promise<void> { 269 - const existingUserTrack = await ctx.client.db.user_tracks 270 - .filter("user_id", equals(user.xata_id)) 271 - .filter("track_id", equals(track_id)) 272 - .getFirst(); 273 - 274 - if (!trackUri.includes(user.did)) { 275 - trackUri = await putSongRecord(track, agent); 276 - } 277 - 278 - if (!existingUserTrack) { 279 - await ctx.client.db.user_tracks.create({ 280 - user_id: user.xata_id, 281 - track_id, 282 - uri: trackUri, 283 - scrobbles: 1, 284 - }); 285 - } else { 286 - await ctx.client.db.user_tracks.update({ 287 - xata_id: existingUserTrack.xata_id, 288 - uri: trackUri, 289 - scrobbles: existingUserTrack.scrobbles 290 - ? existingUserTrack.scrobbles + 1 291 - : 1, 292 - }); 293 - } 294 - 295 - const existingUserArtist = await ctx.client.db.user_artists 296 - .filter("user_id", equals(user.xata_id)) 297 - .filter({ 298 - $any: [ 299 - { 300 - artist_id, 301 - }, 302 - { 303 - uri: artistUri, 304 - }, 305 - ], 306 - }) 307 - .getFirst(); 308 - 309 - if (!artistUri.includes(user.did)) { 310 - artistUri = await putArtistRecord(track, agent); 311 - } 312 - 313 - if (!existingUserArtist) { 314 - await ctx.client.db.user_artists.create({ 315 - user_id: user.xata_id, 316 - artist_id, 317 - uri: artistUri, 318 - scrobbles: 1, 319 - }); 320 - } else { 321 - await ctx.client.db.user_artists.update({ 322 - xata_id: existingUserArtist.xata_id, 323 - uri: artistUri, 324 - scrobbles: existingUserArtist.scrobbles 325 - ? existingUserArtist.scrobbles + 1 326 - : 1, 327 - }); 328 - } 329 - 330 - const existingUserAlbum = await ctx.client.db.user_albums 331 - .filter("user_id", equals(user.xata_id)) 332 - .filter("album_id", equals(album_id)) 333 - .getFirst(); 334 - 335 - if (!albumUri.includes(user.did)) { 336 - albumUri = await putAlbumRecord(track, agent); 337 - } 338 - 339 - if (!existingUserAlbum) { 340 - await ctx.client.db.user_albums.create({ 341 - user_id: user.xata_id, 342 - album_id, 343 - uri: albumUri, 344 - scrobbles: 1, 345 - }); 346 - } else { 347 - await ctx.client.db.user_albums.update({ 348 - xata_id: existingUserAlbum.xata_id, 349 - uri: albumUri, 350 - scrobbles: existingUserAlbum.scrobbles 351 - ? existingUserAlbum.scrobbles + 1 352 - : 1, 353 - }); 354 - } 355 - } 356 - 357 257 export async function publishScrobble(ctx: Context, id: string) { 358 258 const scrobble = await ctx.client.db.scrobbles 359 259 .select(["*", "track_id.*", "album_id.*", "artist_id.*", "user_id.*"]) ··· 411 311 export async function scrobbleTrack( 412 312 ctx: Context, 413 313 track: Track, 414 - user, 415 314 agent: Agent 416 - ): Promise<Readonly<SelectedPick<ScrobblesRecord, ["*"]>>> { 315 + ): Promise<void> { 417 316 const existingTrack = await ctx.client.db.tracks 418 317 .filter( 419 318 "sha256", ··· 427 326 ) 428 327 .getFirst(); 429 328 430 - let trackUri = existingTrack?.uri; 431 329 if (!existingTrack?.uri) { 432 - trackUri = await putSongRecord(track, agent); 330 + await putSongRecord(track, agent); 433 331 } 434 332 435 - const { xata_id: track_id } = await ctx.client.db.tracks.createOrUpdate( 436 - existingTrack?.xata_id, 437 - { 438 - title: track.title, 439 - artist: track.artist, 440 - album: track.album, 441 - album_art: track.albumArt, 442 - album_artist: track.albumArtist, 443 - track_number: track.trackNumber, 444 - duration: track.duration, 445 - mb_id: track.mbId, 446 - composer: track.composer, 447 - lyrics: track.lyrics, 448 - disc_number: track.discNumber, 449 - // compute sha256 (lowercase(title + artist + album)) 450 - sha256: createHash("sha256") 451 - .update( 452 - `${track.title} - ${track.artist} - ${track.album}`.toLowerCase() 453 - ) 454 - .digest("hex"), 455 - copyright_message: track.copyrightMessage, 456 - uri: trackUri ? trackUri : undefined, 457 - spotify_link: track.spotifyLink ? track.spotifyLink : undefined, 458 - label: track.label ? track.label : undefined, 459 - } 460 - ); 461 - 462 333 const existingArtist = await ctx.client.db.artists 463 334 .filter( 464 335 "sha256", ··· 470 341 ) 471 342 .getFirst(); 472 343 473 - let artistUri = existingArtist?.uri; 474 344 if (!existingArtist?.uri) { 475 - artistUri = await putArtistRecord(track, agent); 345 + await putArtistRecord(track, agent); 476 346 } 477 347 478 - const { xata_id: artist_id, uri: new_artist_uri } = 479 - await ctx.client.db.artists.createOrUpdate(existingArtist?.xata_id, { 480 - name: track.albumArtist, 481 - // compute sha256 (lowercase(name)) 482 - sha256: createHash("sha256") 483 - .update(track.albumArtist.toLowerCase()) 484 - .digest("hex"), 485 - uri: artistUri ? artistUri : undefined, 486 - picture: track.artistPicture ? track.artistPicture : undefined, 487 - }); 488 - 489 348 const existingAlbum = await ctx.client.db.albums 490 349 .filter( 491 350 "sha256", ··· 497 356 ) 498 357 .getFirst(); 499 358 500 - let albumUri = existingAlbum?.uri; 501 359 if (!existingAlbum?.uri) { 502 - albumUri = await putAlbumRecord(track, agent); 360 + await putAlbumRecord(track, agent); 503 361 } 504 362 505 - const { xata_id: album_id, uri: new_album_uri } = 506 - await ctx.client.db.albums.createOrUpdate(existingAlbum?.xata_id, { 507 - title: track.album, 508 - artist: track.albumArtist, 509 - album_art: track.albumArt, 510 - year: track.year, 511 - release_date: track.releaseDate 512 - ? track.releaseDate.toISOString() 513 - : undefined, 514 - // compute sha256 (lowercase(title + artist)) 515 - sha256: createHash("sha256") 516 - .update(`${track.album} - ${track.albumArtist}`.toLowerCase()) 517 - .digest("hex"), 518 - uri: albumUri ? albumUri : undefined, 519 - artist_uri: new_artist_uri, 520 - }); 363 + const scrobbleUri = await putScrobbleRecord(track, agent); 521 364 522 - const existingAlbumTrack = await ctx.client.db.album_tracks 523 - .filter("album_id", equals(album_id)) 524 - .filter("track_id", equals(track_id)) 525 - .getFirst(); 526 - 527 - await ctx.client.db.album_tracks.createOrUpdate(existingAlbumTrack?.xata_id, { 528 - album_id, 529 - track_id, 530 - }); 365 + // loop while scrobble is null, try 5 times, sleep 1 second between tries 366 + let tries = 0, 367 + scrobble = null; 368 + while (!scrobble && tries < 5) { 369 + scrobble = await ctx.client.db.scrobbles 370 + .select(["*", "track_id.*", "album_id.*", "artist_id.*", "user_id.*"]) 371 + .filter("uri", equals(scrobbleUri)) 372 + .getFirst(); 531 373 532 - const existingArtistTrack = await ctx.client.db.artist_tracks 533 - .filter("artist_id", equals(artist_id)) 534 - .filter("track_id", equals(track_id)) 535 - .getFirst(); 536 - 537 - await ctx.client.db.artist_tracks.createOrUpdate( 538 - existingArtistTrack?.xata_id, 539 - { 540 - artist_id, 541 - track_id, 374 + if (scrobble) { 375 + await publishScrobble(ctx, scrobble.xata_id); 376 + console.log("Scrobble published"); 377 + break; 542 378 } 543 - ); 544 - 545 - const existingArtistAlbum = await ctx.client.db.artist_albums 546 - .filter("artist_id", equals(artist_id)) 547 - .filter("album_id", equals(album_id)) 548 - .getFirst(); 549 - 550 - await ctx.client.db.artist_albums.createOrUpdate( 551 - existingArtistAlbum?.xata_id, 552 - { 553 - artist_id, 554 - album_id, 555 - } 556 - ); 557 - 558 - const scrobbleUri = await putScrobbleRecord(track, agent); 559 - 560 - await updateUserLibrary( 561 - ctx, 562 - user, 563 - track, 564 - agent, 565 - track_id, 566 - album_id, 567 - artist_id, 568 - trackUri, 569 - albumUri, 570 - artistUri 571 - ); 572 - 573 - await ctx.client.db.tracks.update({ 574 - xata_id: track_id, 575 - artist_uri: new_artist_uri, 576 - album_uri: new_album_uri, 577 - }); 578 - 579 - const scrobble = await ctx.client.db.scrobbles.create({ 580 - user_id: user.xata_id, 581 - track_id, 582 - album_id, 583 - artist_id, 584 - uri: scrobbleUri, 585 - timestamp: track.timestamp 586 - ? dayjs.unix(track.timestamp).toDate() 587 - : new Date(), 588 - }); 589 - 590 - await publishScrobble(ctx, scrobble.xata_id); 379 + tries += 1; 380 + console.log("Scrobble not found, trying again: ", chalk.magenta(tries)); 381 + await new Promise((resolve) => setTimeout(resolve, 1000)); 382 + } 591 383 592 - return scrobble; 384 + if (tries === 5 && !scrobble) { 385 + console.log(`Scrobble not found after ${chalk.magenta("5 tries")}`); 386 + } 593 387 }