forked from
rocksky.app/rocksky
A decentralized music tracking and discovery platform built on AT Protocol 🎵
1use std::{env, sync::Arc};
2
3use anyhow::Error;
4use futures_util::{SinkExt, StreamExt};
5use owo_colors::OwoColorize;
6use serde_json::{Value, json};
7use tokio::sync::Mutex;
8use tokio_tungstenite::connect_async;
9
10use crate::players::{Player, get_current_player};
11
12pub async fn connect_to_rocksky_websocket(token: String) -> Result<(), Error> {
13 let rocksky_ws =
14 env::var("ROCKSKY_WS").unwrap_or_else(|_| "wss://api.rocksky.app/ws".to_string());
15 let (ws_stream, _) = connect_async(&rocksky_ws).await?;
16 println!("Connected to {}", rocksky_ws);
17
18 let (mut write, mut read) = ws_stream.split();
19 let device_id = Arc::new(Mutex::new(String::new()));
20
21 write
22 .send(
23 json!({
24 "type": "register",
25 "clientName": "Rockbox",
26 "token": token
27 })
28 .to_string()
29 .into(),
30 )
31 .await?;
32
33 let (tx, mut rx) = tokio::sync::mpsc::channel::<String>(32);
34 let tx_clone = tx.clone();
35
36 tokio::spawn(async move {
37 let player: Box<dyn Player + Send + Sync> = get_current_player().map_err(|err| {
38 println!("Error getting current player: {}", err);
39 err
40 })?;
41 player
42 .broadcast_now_playing(tx_clone)
43 .await
44 .unwrap_or_else(|err| eprintln!("Error broadcasting now playing: {}", err));
45 Ok::<(), Error>(())
46 });
47
48 tokio::spawn(async move {
49 let player: Box<dyn Player + Send + Sync> = get_current_player().map_err(|err| {
50 println!("Error getting current player: {}", err);
51 err
52 })?;
53 player
54 .broadcast_status(tx)
55 .await
56 .unwrap_or_else(|err| eprintln!("Error broadcasting status: {}", err));
57 Ok::<(), Error>(())
58 });
59
60 {
61 let device_id = Arc::clone(&device_id);
62 let token = token.clone();
63 tokio::spawn(async move {
64 while let Some(msg) = rx.recv().await {
65 println!("Sending message: {}", msg);
66 let id = device_id.lock().await.clone();
67 if let Err(err) = write
68 .send(
69 json!({
70 "type": "message",
71 "data": serde_json::from_str::<Value>(&msg).unwrap(),
72 "device_id": id,
73 "token": token
74 })
75 .to_string()
76 .into(),
77 )
78 .await
79 {
80 eprintln!("Send error: {}", err);
81 break;
82 }
83 }
84 });
85 }
86
87 while let Some(msg) = read.next().await {
88 let msg = match msg {
89 Ok(m) => m.to_string(),
90 Err(e) => {
91 eprintln!("Read error: {}", e);
92 break;
93 }
94 };
95
96 let msg: Value = serde_json::from_str(&msg)?;
97 if let Some(id) = msg["deviceId"].as_str() {
98 println!("Device ID: {}", id);
99 *device_id.lock().await = id.to_string();
100 }
101
102 if let Some("command") = msg["type"].as_str() {
103 if let Some(cmd) = msg["action"].as_str() {
104 println!("Received command: {}", cmd);
105
106 let player: Box<dyn Player> = get_current_player()?;
107
108 if let Some("command") = msg["type"].as_str() {
109 if let Some(cmd) = msg["action"].as_str() {
110 match cmd {
111 "play" => player.play().await?,
112 "pause" => player.pause().await?,
113 "next" => player.next().await?,
114 "previous" => player.previous().await?,
115 "seek" => player.seek(msg["position"].as_u64().unwrap_or(0)).await?,
116 _ => {
117 eprintln!("Unknown command: {}", cmd.magenta());
118 continue;
119 }
120 }
121 } else {
122 println!("No action specified in command message, ignoring.");
123 }
124 }
125 }
126 }
127 }
128
129 Ok(())
130}