A decentralized music tracking and discovery platform built on AT Protocol 🎵

fix: refactor submit_listens to use thread for asynchronous processing and improve error handling

+62 -61
+62 -61
crates/scrobbler/src/listenbrainz/core/submit.rs
··· 3 3 use owo_colors::OwoColorize; 4 4 use serde_json::json; 5 5 use std::sync::Arc; 6 + use std::thread; 6 7 7 8 use crate::auth::decode_token; 8 9 use crate::musicbrainz::client::MusicbrainzClient; ··· 32 33 }))); 33 34 } 34 35 35 - const RETRIES: usize = 15; 36 - for attempt in 1..=RETRIES { 37 - match scrobble_listenbrainz(pool, cache, mb_client, &payload, token).await { 38 - Ok(_) => { 39 - return Ok(HttpResponse::Ok().json(json!({ 40 - "status": "ok", 41 - "payload": { 42 - "submitted_listens": 1, 43 - "ignored_listens": 0 44 - }, 45 - }))); 46 - } 47 - Err(e) => { 48 - let artist = payload.payload[0].track_metadata.artist_name.clone(); 49 - let track = payload.payload[0].track_metadata.track_name.clone(); 36 + let pool = Arc::clone(pool); 37 + let cache = cache.clone(); 38 + let mb_client = Arc::clone(mb_client); 39 + let payload = payload.clone(); 40 + let token = token.to_string(); 41 + thread::spawn(move || { 42 + tokio::runtime::Builder::new_current_thread() 43 + .enable_all() 44 + .build() 45 + .unwrap() 46 + .block_on(async move { 47 + const RETRIES: usize = 15; 48 + for attempt in 1..=RETRIES { 49 + match scrobble_listenbrainz(&pool, &cache, &mb_client, &payload, &token).await { 50 + Ok(_) => { 51 + tracing::info!("Successfully submitted listens"); 52 + } 53 + Err(e) => { 54 + let artist = payload.payload[0].track_metadata.artist_name.clone(); 55 + let track = payload.payload[0].track_metadata.track_name.clone(); 50 56 51 - let did = match decode_token(token) { 52 - Ok(claims) => claims.did, 53 - Err(e) => { 54 - let user = repo::user::get_user_by_apikey(pool, token) 55 - .await? 56 - .map(|user| user.did); 57 - if let Some(did) = user { 58 - did 59 - } else { 60 - return Err(Error::msg(format!( 61 - "Failed to decode token: {} {}", 62 - e, token 63 - ))); 64 - } 65 - } 66 - }; 57 + let did = match decode_token(&token) { 58 + Ok(claims) => claims.did, 59 + Err(e) => { 60 + let user = repo::user::get_user_by_apikey(&pool, &token) 61 + .await? 62 + .map(|user| user.did); 63 + if let Some(did) = user { 64 + did 65 + } else { 66 + return Err(Error::msg(format!( 67 + "Failed to decode token: {} {}", 68 + e, token 69 + ))); 70 + } 71 + } 72 + }; 73 + 74 + cache.del(&format!("listenbrainz:cache:{}:{}:{}", artist, track, did))?; 75 + 76 + tracing::error!(error = %e, attempt = attempt, "Retryable error submitting listens for {} - {} (attempt {}/{})", artist, track, attempt, RETRIES); 67 77 68 - cache.del(&format!("listenbrainz:cache:{}:{}:{}", artist, track, did))?; 78 + if attempt == RETRIES { 79 + tracing::error!( 80 + "Max retries reached, giving up on submitting listens for {} - {}", 81 + artist, 82 + track 83 + ); 69 84 70 - tracing::error!(error = %e, attempt = attempt, "Retryable error submitting listens for {} - {} (attempt {}/{})", artist, track, attempt, RETRIES); 85 + break; 86 + } 71 87 72 - if attempt == RETRIES { 73 - return Ok(HttpResponse::BadRequest().json(serde_json::json!({ 74 - "error": 4, 75 - "message": format!("Failed to parse listens after {} attempts: {}", RETRIES, e) 76 - }))); 88 + tokio::time::sleep(std::time::Duration::from_secs(2)).await; 89 + } 90 + } 77 91 } 78 92 79 - tokio::time::sleep(std::time::Duration::from_secs(2)).await; 80 - } 81 - } 82 - } 93 + Ok::<(), Error>(()) 94 + }) 95 + }); 83 96 84 - unreachable!(); 85 - 86 - /* match scrobble_listenbrainz(pool, cache, payload, token).await { 87 - Ok(_) => Ok(HttpResponse::Ok().json(json!({ 88 - "status": "ok", 89 - "payload": { 90 - "submitted_listens": 1, 91 - "ignored_listens": 0 92 - }, 93 - }))), 94 - Err(e) => { 95 - println!("Error submitting listens: {}", e); 96 - Ok(HttpResponse::BadRequest().json(serde_json::json!({ 97 - "error": 4, 98 - "message": format!("Failed to parse listens: {}", e) 99 - }))) 100 - } 101 - } 102 - */ 97 + return Ok(HttpResponse::Ok().json(json!({ 98 + "status": "ok", 99 + "payload": { 100 + "submitted_listens": 0, 101 + "ignored_listens": 1 102 + }, 103 + }))); 103 104 }