A decentralized music tracking and discovery platform built on AT Protocol 🎵
at feat/pgpull 130 lines 4.3 kB view raw
1use std::{env, sync::Arc}; 2 3use anyhow::Error; 4use futures_util::{SinkExt, StreamExt}; 5use owo_colors::OwoColorize; 6use serde_json::{Value, json}; 7use tokio::sync::Mutex; 8use tokio_tungstenite::connect_async; 9 10use crate::players::{Player, get_current_player}; 11 12pub async fn connect_to_rocksky_websocket(token: String) -> Result<(), Error> { 13 let rocksky_ws = 14 env::var("ROCKSKY_WS").unwrap_or_else(|_| "wss://api.rocksky.app/ws".to_string()); 15 let (ws_stream, _) = connect_async(&rocksky_ws).await?; 16 println!("Connected to {}", rocksky_ws); 17 18 let (mut write, mut read) = ws_stream.split(); 19 let device_id = Arc::new(Mutex::new(String::new())); 20 21 write 22 .send( 23 json!({ 24 "type": "register", 25 "clientName": "Rockbox", 26 "token": token 27 }) 28 .to_string() 29 .into(), 30 ) 31 .await?; 32 33 let (tx, mut rx) = tokio::sync::mpsc::channel::<String>(32); 34 let tx_clone = tx.clone(); 35 36 tokio::spawn(async move { 37 let player: Box<dyn Player + Send + Sync> = get_current_player().map_err(|err| { 38 println!("Error getting current player: {}", err); 39 err 40 })?; 41 player 42 .broadcast_now_playing(tx_clone) 43 .await 44 .unwrap_or_else(|err| eprintln!("Error broadcasting now playing: {}", err)); 45 Ok::<(), Error>(()) 46 }); 47 48 tokio::spawn(async move { 49 let player: Box<dyn Player + Send + Sync> = get_current_player().map_err(|err| { 50 println!("Error getting current player: {}", err); 51 err 52 })?; 53 player 54 .broadcast_status(tx) 55 .await 56 .unwrap_or_else(|err| eprintln!("Error broadcasting status: {}", err)); 57 Ok::<(), Error>(()) 58 }); 59 60 { 61 let device_id = Arc::clone(&device_id); 62 let token = token.clone(); 63 tokio::spawn(async move { 64 while let Some(msg) = rx.recv().await { 65 println!("Sending message: {}", msg); 66 let id = device_id.lock().await.clone(); 67 if let Err(err) = write 68 .send( 69 json!({ 70 "type": "message", 71 "data": serde_json::from_str::<Value>(&msg).unwrap(), 72 "device_id": id, 73 
"token": token 74 }) 75 .to_string() 76 .into(), 77 ) 78 .await 79 { 80 eprintln!("Send error: {}", err); 81 break; 82 } 83 } 84 }); 85 } 86 87 while let Some(msg) = read.next().await { 88 let msg = match msg { 89 Ok(m) => m.to_string(), 90 Err(e) => { 91 eprintln!("Read error: {}", e); 92 break; 93 } 94 }; 95 96 let msg: Value = serde_json::from_str(&msg)?; 97 if let Some(id) = msg["deviceId"].as_str() { 98 println!("Device ID: {}", id); 99 *device_id.lock().await = id.to_string(); 100 } 101 102 if let Some("command") = msg["type"].as_str() { 103 if let Some(cmd) = msg["action"].as_str() { 104 println!("Received command: {}", cmd); 105 106 let player: Box<dyn Player> = get_current_player()?; 107 108 if let Some("command") = msg["type"].as_str() { 109 if let Some(cmd) = msg["action"].as_str() { 110 match cmd { 111 "play" => player.play().await?, 112 "pause" => player.pause().await?, 113 "next" => player.next().await?, 114 "previous" => player.previous().await?, 115 "seek" => player.seek(msg["position"].as_u64().unwrap_or(0)).await?, 116 _ => { 117 eprintln!("Unknown command: {}", cmd.magenta()); 118 continue; 119 } 120 } 121 } else { 122 println!("No action specified in command message, ignoring."); 123 } 124 } 125 } 126 } 127 } 128 129 Ok(()) 130}