A decentralized music tracking and discovery platform built on AT Protocol 🎵

[spotify] restart thread on error

+406 -23
+54 -4
crates/spotify/src/main.rs
··· 58 let email = user.0.clone(); 59 let token = user.1.clone(); 60 let did = user.2.clone(); 61 let stop_flag = Arc::new(AtomicBool::new(false)); 62 let cache = cache.clone(); 63 let thread_map = Arc::clone(&thread_map); 64 65 thread_map ··· 82 email.bright_green(), 83 e.to_string().bright_red() 84 ); 85 } 86 } 87 }); ··· 161 let did = user.2.clone(); 162 let stop_flag = Arc::new(AtomicBool::new(false)); 163 let cache = cache.clone(); 164 165 thread_map.insert(email.clone(), Arc::clone(&stop_flag)); 166 ··· 185 email.bright_green(), 186 e.to_string().bright_red() 187 ); 188 } 189 } 190 }); ··· 562 return Ok(None); 563 } 564 565 - cache.setex(album_id, &data, 20)?; 566 567 Ok(Some(serde_json::from_str(&data)?)) 568 } ··· 615 } 616 617 let all_tracks_json = serde_json::to_string(&all_tracks)?; 618 - cache.setex(&format!("{}:tracks", album_id), &all_tracks_json, 20)?; 619 620 Ok(AlbumTracks { 621 items: all_tracks, ··· 627 pool: &Pool<Postgres>, 628 offset: usize, 629 limit: usize, 630 - ) -> Result<Vec<(String, String, String)>, Error> { 631 let results: Vec<SpotifyTokenWithEmail> = sqlx::query_as( 632 r#" 633 SELECT * FROM spotify_tokens ··· 648 &result.refresh_token, 649 &hex::decode(env::var("SPOTIFY_ENCRYPTION_KEY")?)?, 650 )?; 651 - user_tokens.push((result.email.clone(), token, result.did.clone())); 652 } 653 654 Ok(user_tokens)
··· 58 let email = user.0.clone(); 59 let token = user.1.clone(); 60 let did = user.2.clone(); 61 + let user_id = user.3.clone(); 62 let stop_flag = Arc::new(AtomicBool::new(false)); 63 let cache = cache.clone(); 64 + let nc = nc.clone(); 65 let thread_map = Arc::clone(&thread_map); 66 67 thread_map ··· 84 email.bright_green(), 85 e.to_string().bright_red() 86 ); 87 + 88 + // If there's an error, publish a message to restart the thread 89 + match rt.block_on(nc.publish("rocksky.spotify.user", user_id.into())) { 90 + Ok(_) => { 91 + println!( 92 + "{} Published message to restart thread for user: {}", 93 + format!("[{}]", email).bright_green(), 94 + email.bright_green() 95 + ); 96 + } 97 + Err(e) => { 98 + println!( 99 + "{} Error publishing message to restart thread: {}", 100 + format!("[{}]", email).bright_green(), 101 + e.to_string().bright_red() 102 + ); 103 + } 104 + } 105 } 106 } 107 }); ··· 181 let did = user.2.clone(); 182 let stop_flag = Arc::new(AtomicBool::new(false)); 183 let cache = cache.clone(); 184 + let nc = nc.clone(); 185 186 thread_map.insert(email.clone(), Arc::clone(&stop_flag)); 187 ··· 206 email.bright_green(), 207 e.to_string().bright_red() 208 ); 209 + match rt.block_on(nc.publish("rocksky.spotify.user", user_id.into())) { 210 + Ok(_) => {}, 211 + Err(e) => { 212 + println!( 213 + "{} Error publishing message to restart thread: {}", 214 + format!("[{}]", email).bright_green(), 215 + e.to_string().bright_red() 216 + ); 217 + } 218 + } 219 } 220 } 221 }); ··· 593 return Ok(None); 594 } 595 596 + match cache.setex(album_id, &data, 20) { 597 + Ok(_) => {} 598 + Err(e) => { 599 + println!( 600 + "{} redis error: {}", 601 + format!("[{}]", album_id).bright_green(), 602 + e.to_string().bright_red() 603 + ); 604 + return Ok(None); 605 + } 606 + } 607 608 Ok(Some(serde_json::from_str(&data)?)) 609 } ··· 656 } 657 658 let all_tracks_json = serde_json::to_string(&all_tracks)?; 659 + match cache.setex(&format!("{}:tracks", album_id), &all_tracks_json, 20) { 660 + Ok(_) => {} 661 + Err(e) => { 662 + println!( 663 + "{} redis error: {}", 664 + format!("[{}]", album_id).bright_green(), 665 + e.to_string().bright_red() 666 + ); 667 + } 668 + } 669 670 Ok(AlbumTracks { 671 items: all_tracks, ··· 677 pool: &Pool<Postgres>, 678 offset: usize, 679 limit: usize, 680 + ) -> Result<Vec<(String, String, String, String)>, Error> { 681 let results: Vec<SpotifyTokenWithEmail> = sqlx::query_as( 682 r#" 683 SELECT * FROM spotify_tokens ··· 698 &result.refresh_token, 699 &hex::decode(env::var("SPOTIFY_ENCRYPTION_KEY")?)?, 700 )?; 701 + user_tokens.push((result.email.clone(), token, result.did.clone(), result.user_id.clone())); 702 } 703 704 Ok(user_tokens)
+82 -3
rockskyapi/rocksky-auth/src/xrpc/app/rocksky/spotify/pause.ts
··· 1 import { HandlerAuth } from "@atproto/xrpc-server"; 2 import { Context } from "context"; 3 import { Effect, pipe } from "effect"; 4 import { Server } from "lexicon"; 5 6 export default function (server: Server, ctx: Context) { 7 const pause = (params, auth: HandlerAuth) => 8 pipe( 9 { params, ctx, did: auth.credentials?.did }, 10 - withSpotifyToken, 11 Effect.flatMap(handlePause), 12 Effect.flatMap(presentation), 13 Effect.retry({ times: 3 }), ··· 25 }); 26 } 27 28 - const withSpotifyToken = () => { 29 return Effect.tryPromise({ 30 - try: async () => {}, 31 catch: (error) => new Error(`Failed to retrieve Spotify token: ${error}`), 32 }); 33 };
··· 1 import { HandlerAuth } from "@atproto/xrpc-server"; 2 import { Context } from "context"; 3 + import { eq } from "drizzle-orm"; 4 import { Effect, pipe } from "effect"; 5 import { Server } from "lexicon"; 6 + import { QueryParams } from "lexicon/types/app/rocksky/spotify/pause"; 7 + import { decrypt } from "lib/crypto"; 8 + import { env } from "lib/env"; 9 + import tables from "schema"; 10 + import { SelectUser } from "schema/users"; 11 12 export default function (server: Server, ctx: Context) { 13 const pause = (params, auth: HandlerAuth) => 14 pipe( 15 { params, ctx, did: auth.credentials?.did }, 16 + withUser, 17 + Effect.flatMap(withSpotifyRefreshToken), 18 + Effect.flatMap(withSpotifyToken), 19 Effect.flatMap(handlePause), 20 Effect.flatMap(presentation), 21 Effect.retry({ times: 3 }), ··· 33 }); 34 } 35 36 + const withUser = ({ 37 + did, 38 + ctx, 39 + }: { 40 + params: QueryParams; 41 + did: string; 42 + ctx: Context; 43 + }) => { 44 + return Effect.tryPromise({ 45 + try: () => 46 + ctx.db 47 + .select() 48 + .from(tables.users) 49 + .where(eq(tables.users.did, did)) 50 + .execute() 51 + .then(([user]) => ({ 52 + user, 53 + ctx, 54 + did, 55 + })), 56 + catch: (error) => new Error(`Failed to retrieve User: ${error}`), 57 + }); 58 + }; 59 + 60 + const withSpotifyRefreshToken = ({ 61 + user, 62 + ctx, 63 + }: { 64 + user: SelectUser; 65 + ctx: Context; 66 + }) => { 67 + return Effect.tryPromise({ 68 + try: () => 69 + ctx.db 70 + .select() 71 + .from(tables.spotifyTokens) 72 + .where(eq(tables.spotifyTokens.userId, user.id)) 73 + .execute() 74 + .then(([spotifyToken]) => 75 + decrypt(spotifyToken.refreshToken, env.SPOTIFY_ENCRYPTION_KEY) 76 + ) 77 + .then((refreshToken) => ({ 78 + user, 79 + ctx, 80 + refreshToken, 81 + })), 82 + catch: (error) => 83 + new Error(`Failed to retrieve Spotify Refresh token: ${error}`), 84 + }); 85 + }; 86 + 87 + const withSpotifyToken = ({ 88 + refreshToken, 89 + ctx, 90 + }: { 91 + refreshToken: string; 92 + ctx: Context; 93 + }) => { 94 return Effect.tryPromise({ 95 + try: () => 96 + fetch("https://accounts.spotify.com/api/token", { 97 + method: "POST", 98 + headers: { 99 + "Content-Type": "application/x-www-form-urlencoded", 100 + }, 101 + body: new URLSearchParams({ 102 + grant_type: "refresh_token", 103 + refresh_token: refreshToken, 104 + client_id: env.SPOTIFY_CLIENT_ID, 105 + client_secret: env.SPOTIFY_CLIENT_SECRET, 106 + }), 107 + }) 108 + .then((res) => res.json()) 109 + .then((data) => data.access_token), 110 catch: (error) => new Error(`Failed to retrieve Spotify token: ${error}`), 111 }); 112 };
+82 -3
rockskyapi/rocksky-auth/src/xrpc/app/rocksky/spotify/play.ts
··· 1 import { HandlerAuth } from "@atproto/xrpc-server"; 2 import { Context } from "context"; 3 import { Effect, pipe } from "effect"; 4 import { Server } from "lexicon"; 5 6 export default function (server: Server, ctx: Context) { 7 const play = (params, auth: HandlerAuth) => 8 pipe( 9 { params, ctx, did: auth.credentials?.did }, 10 - withSpotifyToken, 11 Effect.flatMap(handlePlay), 12 Effect.flatMap(presentation), 13 Effect.retry({ times: 3 }), ··· 25 }); 26 } 27 28 - const withSpotifyToken = () => { 29 return Effect.tryPromise({ 30 - try: async () => {}, 31 catch: (error) => new Error(`Failed to retrieve Spotify token: ${error}`), 32 }); 33 };
··· 1 import { HandlerAuth } from "@atproto/xrpc-server"; 2 import { Context } from "context"; 3 + import { eq } from "drizzle-orm"; 4 import { Effect, pipe } from "effect"; 5 import { Server } from "lexicon"; 6 + import { QueryParams } from "lexicon/types/app/rocksky/spotify/play"; 7 + import { decrypt } from "lib/crypto"; 8 + import { env } from "lib/env"; 9 + import tables from "schema"; 10 + import { SelectUser } from "schema/users"; 11 12 export default function (server: Server, ctx: Context) { 13 const play = (params, auth: HandlerAuth) => 14 pipe( 15 { params, ctx, did: auth.credentials?.did }, 16 + withUser, 17 + Effect.flatMap(withSpotifyRefreshToken), 18 + Effect.flatMap(withSpotifyToken), 19 Effect.flatMap(handlePlay), 20 Effect.flatMap(presentation), 21 Effect.retry({ times: 3 }), ··· 33 }); 34 } 35 36 + const withUser = ({ 37 + did, 38 + ctx, 39 + }: { 40 + params: QueryParams; 41 + did: string; 42 + ctx: Context; 43 + }) => { 44 + return Effect.tryPromise({ 45 + try: () => 46 + ctx.db 47 + .select() 48 + .from(tables.users) 49 + .where(eq(tables.users.did, did)) 50 + .execute() 51 + .then(([user]) => ({ 52 + user, 53 + ctx, 54 + did, 55 + })), 56 + catch: (error) => new Error(`Failed to retrieve User: ${error}`), 57 + }); 58 + }; 59 + 60 + const withSpotifyRefreshToken = ({ 61 + user, 62 + ctx, 63 + }: { 64 + user: SelectUser; 65 + ctx: Context; 66 + }) => { 67 + return Effect.tryPromise({ 68 + try: () => 69 + ctx.db 70 + .select() 71 + .from(tables.spotifyTokens) 72 + .where(eq(tables.spotifyTokens.userId, user.id)) 73 + .execute() 74 + .then(([spotifyToken]) => 75 + decrypt(spotifyToken.refreshToken, env.SPOTIFY_ENCRYPTION_KEY) 76 + ) 77 + .then((refreshToken) => ({ 78 + user, 79 + ctx, 80 + refreshToken, 81 + })), 82 + catch: (error) => 83 + new Error(`Failed to retrieve Spotify Refresh token: ${error}`), 84 + }); 85 + }; 86 + 87 + const withSpotifyToken = ({ 88 + refreshToken, 89 + ctx, 90 + }: { 91 + refreshToken: string; 92 + ctx: Context; 93 + }) => { 94 return Effect.tryPromise({ 95 + try: () => 96 + fetch("https://accounts.spotify.com/api/token", { 97 + method: "POST", 98 + headers: { 99 + "Content-Type": "application/x-www-form-urlencoded", 100 + }, 101 + body: new URLSearchParams({ 102 + grant_type: "refresh_token", 103 + refresh_token: refreshToken, 104 + client_id: env.SPOTIFY_CLIENT_ID, 105 + client_secret: env.SPOTIFY_CLIENT_SECRET, 106 + }), 107 + }) 108 + .then((res) => res.json()) 109 + .then((data) => data.access_token), 110 catch: (error) => new Error(`Failed to retrieve Spotify token: ${error}`), 111 }); 112 };
+82 -6
rockskyapi/rocksky-auth/src/xrpc/app/rocksky/spotify/previous.ts
··· 1 import { HandlerAuth } from "@atproto/xrpc-server"; 2 import { Context } from "context"; 3 import { Effect, pipe } from "effect"; 4 import { Server } from "lexicon"; 5 6 export default function (server: Server, ctx: Context) { 7 const previous = (params, auth: HandlerAuth) => 8 pipe( 9 { params, ctx, did: auth.credentials?.did }, 10 - withSpotifyToken, 11 Effect.flatMap(handlePrevious), 12 Effect.flatMap(presentation), 13 Effect.retry({ times: 3 }), ··· 25 }); 26 } 27 28 - const withSpotifyToken = () => { 29 return Effect.tryPromise({ 30 - try: async () => {}, 31 catch: (error) => new Error(`Failed to retrieve Spotify token: ${error}`), 32 }); 33 }; 34 35 - const handlePrevious = (params) => { 36 - // Logic to handle the previous action in Spotify 37 return Effect.tryPromise({ 38 - try: async () => ({}), 39 catch: (error) => new Error(`Failed to handle previous action: ${error}`), 40 }); 41 };
··· 1 import { HandlerAuth } from "@atproto/xrpc-server"; 2 import { Context } from "context"; 3 + import { eq } from "drizzle-orm"; 4 import { Effect, pipe } from "effect"; 5 import { Server } from "lexicon"; 6 + import { QueryParams } from "lexicon/types/app/rocksky/spotify/previous"; 7 + import { decrypt } from "lib/crypto"; 8 + import { env } from "lib/env"; 9 + import tables from "schema"; 10 + import { SelectUser } from "schema/users"; 11 12 export default function (server: Server, ctx: Context) { 13 const previous = (params, auth: HandlerAuth) => 14 pipe( 15 { params, ctx, did: auth.credentials?.did }, 16 + withUser, 17 + Effect.flatMap(withSpotifyRefreshToken), 18 + Effect.flatMap(withSpotifyToken), 19 Effect.flatMap(handlePrevious), 20 Effect.flatMap(presentation), 21 Effect.retry({ times: 3 }), ··· 33 }); 34 } 35 36 + const withUser = ({ 37 + did, 38 + ctx, 39 + }: { 40 + params: QueryParams; 41 + did: string; 42 + ctx: Context; 43 + }) => { 44 return Effect.tryPromise({ 45 + try: () => 46 + ctx.db 47 + .select() 48 + .from(tables.users) 49 + .where(eq(tables.users.did, did)) 50 + .execute() 51 + .then(([user]) => ({ 52 + user, 53 + ctx, 54 + did, 55 + })), 56 + catch: (error) => new Error(`Failed to retrieve User: ${error}`), 57 + }); 58 + }; 59 + 60 + const withSpotifyRefreshToken = ({ 61 + user, 62 + ctx, 63 + }: { 64 + user: SelectUser; 65 + ctx: Context; 66 + }) => { 67 + return Effect.tryPromise({ 68 + try: () => 69 + ctx.db 70 + .select() 71 + .from(tables.spotifyTokens) 72 + .where(eq(tables.spotifyTokens.userId, user.id)) 73 + .execute() 74 + .then(([spotifyToken]) => 75 + decrypt(spotifyToken.refreshToken, env.SPOTIFY_ENCRYPTION_KEY) 76 + ) 77 + .then((refreshToken) => ({ 78 + refreshToken, 79 + })), 80 + catch: (error) => 81 + new Error(`Failed to retrieve Spotify Refresh token: ${error}`), 82 + }); 83 + }; 84 + 85 + const withSpotifyToken = ({ refreshToken }: { refreshToken: string }) => { 86 + return Effect.tryPromise({ 87 + try: () => 88 + fetch("https://accounts.spotify.com/api/token", { 89 + method: "POST", 90 + headers: { 91 + "Content-Type": "application/x-www-form-urlencoded", 92 + }, 93 + body: new URLSearchParams({ 94 + grant_type: "refresh_token", 95 + refresh_token: refreshToken, 96 + client_id: env.SPOTIFY_CLIENT_ID, 97 + client_secret: env.SPOTIFY_CLIENT_SECRET, 98 + }), 99 + }) 100 + .then((res) => res.json()) 101 + .then((data) => data.access_token), 102 catch: (error) => new Error(`Failed to retrieve Spotify token: ${error}`), 103 }); 104 }; 105 106 + const handlePrevious = (accessToken: string) => { 107 return Effect.tryPromise({ 108 + try: () => 109 + fetch("https://api.spotify.com/v1/me/player/previous", { 110 + method: "POST", 111 + headers: { 112 + Authorization: `Bearer ${accessToken}`, 113 + }, 114 + }).then((res) => res.status), 115 catch: (error) => new Error(`Failed to handle previous action: ${error}`), 116 }); 117 };
+106 -7
rockskyapi/rocksky-auth/src/xrpc/app/rocksky/spotify/seek.ts
··· 1 import { HandlerAuth } from "@atproto/xrpc-server"; 2 import { Context } from "context"; 3 import { Effect, pipe } from "effect"; 4 import { Server } from "lexicon"; 5 6 export default function (server: Server, ctx: Context) { 7 const seek = (params, auth: HandlerAuth) => 8 pipe( 9 { params, ctx, did: auth.credentials?.did }, 10 - withSpotifyToken, 11 Effect.flatMap(handleSeek), 12 Effect.flatMap(presentation), 13 Effect.retry({ times: 3 }), ··· 25 }); 26 } 27 28 - const withSpotifyToken = () => { 29 return Effect.tryPromise({ 30 - try: async () => {}, 31 catch: (error) => new Error(`Failed to retrieve Spotify token: ${error}`), 32 }); 33 }; 34 35 - const handleSeek = (params) => { 36 - // Logic to handle the seek action in Spotify 37 return Effect.tryPromise({ 38 - try: async () => {}, 39 - catch: (error) => new Error(`Failed to handle seek action: ${error}`), 40 }); 41 }; 42 43 const presentation = (result) => { 44 // Logic to format the result for presentation 45 return Effect.sync(() => ({})); 46 };
··· 1 import { HandlerAuth } from "@atproto/xrpc-server"; 2 import { Context } from "context"; 3 + import { eq } from "drizzle-orm"; 4 import { Effect, pipe } from "effect"; 5 import { Server } from "lexicon"; 6 + import { QueryParams } from "lexicon/types/app/rocksky/spotify/seek"; 7 + import { decrypt } from "lib/crypto"; 8 + import { env } from "lib/env"; 9 + import tables from "schema"; 10 + import { SelectUser } from "schema/users"; 11 12 export default function (server: Server, ctx: Context) { 13 const seek = (params, auth: HandlerAuth) => 14 pipe( 15 { params, ctx, did: auth.credentials?.did }, 16 + withUser, 17 + Effect.flatMap(withSpotifyRefreshToken), 18 + Effect.flatMap(withSpotifyToken), 19 Effect.flatMap(handleSeek), 20 Effect.flatMap(presentation), 21 Effect.retry({ times: 3 }), ··· 33 }); 34 } 35 36 + const withUser = ({ 37 + did, 38 + ctx, 39 + params, 40 + }: { 41 + params: QueryParams; 42 + did: string; 43 + ctx: Context; 44 + }) => { 45 + return Effect.tryPromise({ 46 + try: () => 47 + ctx.db 48 + .select() 49 + .from(tables.users) 50 + .where(eq(tables.users.did, did)) 51 + .execute() 52 + .then(([user]) => ({ 53 + user, 54 + ctx, 55 + params, 56 + })), 57 + catch: (error) => new Error(`Failed to retrieve User: ${error}`), 58 + }); 59 + }; 60 + 61 + const withSpotifyRefreshToken = ({ 62 + user, 63 + ctx, 64 + params, 65 + }: { 66 + user: SelectUser; 67 + ctx: Context; 68 + params: QueryParams; 69 + }) => { 70 + return Effect.tryPromise({ 71 + try: () => 72 + ctx.db 73 + .select() 74 + .from(tables.spotifyTokens) 75 + .where(eq(tables.spotifyTokens.userId, user.id)) 76 + .execute() 77 + .then(([spotifyToken]) => 78 + decrypt(spotifyToken.refreshToken, env.SPOTIFY_ENCRYPTION_KEY) 79 + ) 80 + .then((refreshToken) => ({ 81 + refreshToken, 82 + params, 83 + })), 84 + catch: (error) => 85 + new Error(`Failed to retrieve Spotify Refresh token: ${error}`), 86 + }); 87 + }; 88 + 89 + const withSpotifyToken = ({ 90 + refreshToken, 91 + params, 92 + }: { 93 + refreshToken: string; 94 + params: QueryParams; 95 + }) => { 96 return Effect.tryPromise({ 97 + try: () => 98 + fetch("https://accounts.spotify.com/api/token", { 99 + method: "POST", 100 + headers: { 101 + "Content-Type": "application/x-www-form-urlencoded", 102 + }, 103 + body: new URLSearchParams({ 104 + grant_type: "refresh_token", 105 + refresh_token: refreshToken, 106 + client_id: env.SPOTIFY_CLIENT_ID, 107 + client_secret: env.SPOTIFY_CLIENT_SECRET, 108 + }), 109 + }) 110 + .then((res) => res.json()) 111 + .then((data) => ({ 112 + accessToken: data.access_token, 113 + position: params.position, 114 + })), 115 catch: (error) => new Error(`Failed to retrieve Spotify token: ${error}`), 116 }); 117 }; 118 119 + const handleSeek = ({ 120 + accessToken, 121 + position, 122 + }: { 123 + accessToken: string; 124 + position: number; 125 + }) => { 126 return Effect.tryPromise({ 127 + try: () => 128 + fetch( 129 + `https://api.spotify.com/v1/me/player/seek?position_ms=${position}`, 130 + { 131 + method: "PUT", 132 + headers: { 133 + Authorization: `Bearer ${accessToken}`, 134 + }, 135 + } 136 + ).then((res) => res.status), 137 + catch: (error) => new Error(`Failed to handle next action: ${error}`), 138 }); 139 }; 140 141 const presentation = (result) => { 142 // Logic to format the result for presentation 143 + console.log("Seek action result:", result); 144 return Effect.sync(() => ({})); 145 };