A modern Music Player Daemon based on Rockbox open source high quality audio player
libadwaita
audio
rust
zig
deno
mpris
rockbox
mpd
1use anyhow::Error;
2use handlers::{
3 batch::{handle_command_list_begin, handle_command_list_ok_begin},
4 browse::{handle_listall, handle_listallinfo, handle_listfiles, handle_lsinfo},
5 library::{
6 handle_config, handle_find, handle_find_album, handle_find_artist, handle_find_title,
7 handle_list_album, handle_list_artist, handle_list_title, handle_rescan, handle_search,
8 handle_stats, handle_tagtypes, handle_tagtypes_enable,
9 },
10 playback::{
11 handle_currentsong, handle_getvol, handle_next, handle_outputs, handle_pause, handle_play,
12 handle_playid, handle_previous, handle_random, handle_repeat, handle_seek, handle_seekcur,
13 handle_seekid, handle_setvol, handle_single, handle_status, handle_toggle,
14 },
15 queue::{
16 handle_add, handle_addid, handle_clear, handle_delete, handle_deleteid, handle_move,
17 handle_playlistinfo, handle_shuffle,
18 },
19 system::{handle_binarylimit, handle_commands, handle_decoders, handle_idle, handle_noidle},
20 Subsystem,
21};
22use kv::{build_tracks_kv, KV};
23use rockbox_graphql::{
24 schema::objects::{audio_status::AudioStatus, playlist::Playlist, track::Track},
25 simplebroker::SimpleBroker,
26};
27use rockbox_library::{create_connection_pool, entity, repo};
28use rockbox_rpc::api::rockbox::v1alpha1::{
29 library_service_client::LibraryServiceClient, playback_service_client::PlaybackServiceClient,
30 playlist_service_client::PlaylistServiceClient, settings_service_client::SettingsServiceClient,
31 sound_service_client::SoundServiceClient, system_service_client::SystemServiceClient,
32 GetCurrentRequest, GetGlobalStatusRequest, PlaylistResumeRequest,
33};
34use rockbox_sys::types::user_settings::UserSettings;
35use sqlx::{Pool, Sqlite};
36use std::{env, sync::Arc, thread, time::Duration};
37use tokio::{
38 io::{AsyncReadExt, AsyncWriteExt},
39 net::{TcpListener, TcpStream},
40 sync::{broadcast, Mutex},
41};
42use tokio_stream::StreamExt;
43use tonic::transport::Channel;
44
45pub mod consts;
46pub mod dir;
47pub mod handlers;
48pub mod kv;
49
50#[derive(Clone)]
51pub struct Context {
52 pub library: LibraryServiceClient<Channel>,
53 pub playback: PlaybackServiceClient<Channel>,
54 pub settings: SettingsServiceClient<Channel>,
55 pub sound: SoundServiceClient<Channel>,
56 pub playlist: PlaylistServiceClient<Channel>,
57 pub system: SystemServiceClient<Channel>,
58 pub single: Arc<Mutex<String>>,
59 pub batch: bool,
60 pub event_sender: broadcast::Sender<Subsystem>,
61 pub event_receiver: Arc<Mutex<broadcast::Receiver<Subsystem>>>,
62 pub current_track: Arc<Mutex<Option<Track>>>,
63 pub current_playlist: Arc<Mutex<Option<Playlist>>>,
64 pub playback_status: Arc<Mutex<Option<AudioStatus>>>,
65 pub pool: Pool<Sqlite>,
66 pub kv: Arc<Mutex<KV<entity::track::Track>>>,
67 pub current_settings: Arc<Mutex<UserSettings>>,
68}
69
70pub struct MpdServer {}
71
72impl MpdServer {
73 pub async fn start() -> Result<(), Error> {
74 let port = env::var("ROCKBOX_MPD_PORT").unwrap_or_else(|_| "6600".to_string());
75 let addr = format!("0.0.0.0:{}", port);
76 let context = setup_context(false, None).await?;
77
78 listen_events(context.clone());
79
80 thread::sleep(Duration::from_millis(200));
81
82 restore_playlist(context.clone())?;
83
84 let listener = TcpListener::bind(&addr).await?;
85
86 loop {
87 let (stream, _) = listener.accept().await?;
88 let context = context.clone();
89 tokio::spawn(async move {
90 match handle_client(context, stream).await {
91 Ok(_) => {}
92 Err(e) => {
93 eprintln!("Error: {}", e);
94 }
95 }
96 });
97 }
98 }
99}
100
101pub async fn handle_client(mut ctx: Context, stream: TcpStream) -> Result<(), Error> {
102 let mut buf = [0; 4096];
103 let (reader_stream, writer_stream) = tokio::io::split(stream);
104 let mut reader = tokio::io::BufReader::new(reader_stream);
105 let mut writer = tokio::io::BufWriter::new(writer_stream);
106
107 let (tx, mut rx) = tokio::sync::mpsc::channel::<String>(32);
108
109 tokio::spawn(async move {
110 while let Some(msg) = rx.recv().await {
111 writer.write_all(msg.as_bytes()).await?;
112 writer.flush().await?;
113 }
114 Ok::<(), Error>(())
115 });
116
117 tx.send("OK MPD 0.23.15\n".to_string()).await?;
118
119 while let Ok(n) = reader.read(&mut buf).await {
120 if n == 0 {
121 break;
122 }
123 let request = String::from_utf8_lossy(&buf[..n]);
124 let command = parse_command(&request)?;
125 println!("request: {}", request);
126
127 match command.as_str() {
128 "play" => handle_play(&mut ctx, &request, tx.clone()).await?,
129 "pause" => handle_pause(&mut ctx, &request, tx.clone()).await?,
130 "toggle" => handle_toggle(&mut ctx, &request, tx.clone()).await?,
131 "next" => handle_next(&mut ctx, &request, tx.clone()).await?,
132 "previous" => handle_previous(&mut ctx, &request, tx.clone()).await?,
133 "playid" => handle_playid(&mut ctx, &request, tx.clone()).await?,
134 "seek" => handle_seek(&mut ctx, &request, tx.clone()).await?,
135 "seekid" => handle_seekid(&mut ctx, &request, tx.clone()).await?,
136 "seekcur" => handle_seekcur(&mut ctx, &request, tx.clone()).await?,
137 "random" => handle_random(&mut ctx, &request, tx.clone()).await?,
138 "repeat" => handle_repeat(&mut ctx, &request, tx.clone()).await?,
139 "getvol" => handle_getvol(&mut ctx, &request, tx.clone()).await?,
140 "setvol" => handle_setvol(&mut ctx, &request, tx.clone()).await?,
141 "volume" => handle_setvol(&mut ctx, &request, tx.clone()).await?,
142 "single" => handle_single(&mut ctx, &request, tx.clone()).await?,
143 "shuffle" => handle_shuffle(&mut ctx, &request, tx.clone()).await?,
144 "add" => handle_add(&mut ctx, &request, tx.clone()).await?,
145 "addid" => handle_addid(&mut ctx, &request, tx.clone()).await?,
146 "deleteid" => handle_deleteid(&mut ctx, &request, tx.clone()).await?,
147 "playlistinfo" => handle_playlistinfo(&mut ctx, &request, tx.clone()).await?,
148 "delete" => handle_delete(&mut ctx, &request, tx.clone()).await?,
149 "clear" => handle_clear(&mut ctx, &request, tx.clone()).await?,
150 "move" => handle_move(&mut ctx, &request, tx.clone()).await?,
151 "list album" => handle_list_album(&mut ctx, &request, tx.clone()).await?,
152 "list albumartist" => handle_list_artist(&mut ctx, &request, tx.clone()).await?,
153 "list artist" => handle_list_artist(&mut ctx, &request, tx.clone()).await?,
154 "list title" => handle_list_title(&mut ctx, &request, tx.clone()).await?,
155 "update" => handle_rescan(&mut ctx, &request, tx.clone()).await?,
156 "search" => handle_search(&mut ctx, &request, tx.clone()).await?,
157 "rescan" => handle_rescan(&mut ctx, &request, tx.clone()).await?,
158 "status" => handle_status(&mut ctx, &request, tx.clone()).await?,
159 "currentsong" => handle_currentsong(&mut ctx, &request, tx.clone()).await?,
160 "config" => handle_config(&mut ctx, &request, tx.clone()).await?,
161 "tagtypes " => handle_tagtypes(&mut ctx, &request, tx.clone()).await?,
162 "tagtypes clear" => handle_clear(&mut ctx, &request, tx.clone()).await?,
163 "tagtypes enable" => handle_tagtypes_enable(&mut ctx, &request, tx.clone()).await?,
164 "stats" => handle_stats(&mut ctx, &request, tx.clone()).await?,
165 "plchanges" => handle_playlistinfo(&mut ctx, &request, tx.clone()).await?,
166 "outputs" => handle_outputs(&mut ctx, &request, tx.clone()).await?,
167 "idle" => handle_idle(&mut ctx, &request, tx.clone()).await?,
168 "noidle" => handle_noidle(&mut ctx, &request, tx.clone()).await?,
169 "decoders" => handle_decoders(&mut ctx, &request, tx.clone()).await?,
170 "lsinfo" => handle_lsinfo(&mut ctx, &request, tx.clone()).await?,
171 "listall" => handle_listall(&mut ctx, &request, tx.clone()).await?,
172 "listallinfo" => handle_listallinfo(&mut ctx, &request, tx.clone()).await?,
173 "listfiles" => handle_listfiles(&mut ctx, &request, tx.clone()).await?,
174 "find artist" => handle_find_artist(&mut ctx, &request, tx.clone()).await?,
175 "find album" => handle_find_album(&mut ctx, &request, tx.clone()).await?,
176 "find title" => handle_find_title(&mut ctx, &request, tx.clone()).await?,
177 "binarylimit" => handle_binarylimit(&mut ctx, &request, tx.clone()).await?,
178 "commands" => handle_commands(&mut ctx, &request, tx.clone()).await?,
179 "command_list_begin" => {
180 handle_command_list_begin(&mut ctx, &request, tx.clone()).await?
181 }
182 "command_list_ok_begin" => {
183 handle_command_list_ok_begin(&mut ctx, &request, tx.clone()).await?
184 }
185 _ => {
186 if command.starts_with("find ") {
187 handle_find(&mut ctx, &request, tx.clone()).await?;
188 return Ok(());
189 }
190 println!("Unhandled command: {}", command);
191 println!("Unhandled request: {}", request);
192 tx.send("ACK [5@0] {unhandled} unknown command\n".to_string())
193 .await?;
194 "ACK [5@0] {unhandled} unknown command\n".to_string()
195 }
196 };
197 }
198 Ok(())
199}
200
201fn parse_command(request: &str) -> Result<String, Error> {
202 let command = request.split_whitespace().next().unwrap_or_default();
203
204 if command == "list" {
205 // should parse the next word, and return "list album" or "list artist" or "list title"
206 let r#type = request.split_whitespace().nth(1).unwrap_or_default();
207 return Ok(format!("list {}", r#type.to_lowercase()));
208 }
209
210 if command == "tagtypes" {
211 let r#type = request.split_whitespace().nth(1).unwrap_or_default();
212 return Ok(format!("tagtypes {}", r#type.replace("\"", "")));
213 }
214
215 if command == "find" {
216 let r#type = request.split_whitespace().nth(1).unwrap_or_default();
217 return Ok(format!("find {}", r#type.to_lowercase()));
218 }
219
220 Ok(command.to_string())
221}
222
223pub async fn setup_context(batch: bool, ctx: Option<Context>) -> Result<Context, Error> {
224 let port = env::var("ROCKBOX_PORT").unwrap_or_else(|_| "6061".to_string());
225 let host = env::var("ROCKBOX_HOST").unwrap_or_else(|_| "localhost".to_string());
226 let url = format!("tcp://{}:{}", host, port);
227
228 let pool = create_connection_pool().await?;
229 let kv = Arc::new(Mutex::new(build_tracks_kv(pool.clone()).await?));
230
231 let library = LibraryServiceClient::connect(url.clone()).await?;
232 let playback = PlaybackServiceClient::connect(url.clone()).await?;
233 let settings = SettingsServiceClient::connect(url.clone()).await?;
234 let sound = SoundServiceClient::connect(url.clone()).await?;
235 let playlist = PlaylistServiceClient::connect(url.clone()).await?;
236 let system = SystemServiceClient::connect(url.clone()).await?;
237
238 let (event_sender, event_receiver) = broadcast::channel(16);
239
240 Ok(Context {
241 library,
242 playback,
243 settings,
244 sound,
245 playlist,
246 system,
247 single: Arc::new(Mutex::new("\"0\"".to_string())),
248 batch,
249 event_sender: match ctx {
250 Some(ref ctx) => ctx.clone().event_sender,
251 None => event_sender,
252 },
253 event_receiver: match ctx {
254 Some(ref ctx) => ctx.clone().event_receiver,
255 None => Arc::new(Mutex::new(event_receiver)),
256 },
257 current_track: match ctx {
258 Some(ref ctx) => ctx.clone().current_track,
259 None => Arc::new(Mutex::new(None)),
260 },
261 current_playlist: match ctx {
262 Some(ref ctx) => ctx.clone().current_playlist,
263 None => Arc::new(Mutex::new(None)),
264 },
265 playback_status: match ctx {
266 Some(ref ctx) => ctx.clone().playback_status,
267 None => Arc::new(Mutex::new(None)),
268 },
269 pool,
270 kv,
271 current_settings: Arc::new(Mutex::new(rockbox_sys::settings::get_global_settings())),
272 })
273}
274
275pub fn listen_events(ctx: Context) {
276 let ctx_clone = ctx.clone();
277 let another_ctx = ctx.clone();
278 let another_cloned_ctx = ctx.clone();
279
280 thread::spawn(move || {
281 let rt = tokio::runtime::Runtime::new().unwrap();
282 loop {
283 let mut current_settings = rt.block_on(another_cloned_ctx.current_settings.lock());
284 *current_settings = rockbox_sys::settings::get_global_settings();
285 drop(current_settings);
286 thread::sleep(std::time::Duration::from_millis(800));
287 }
288 });
289
290 thread::spawn(move || {
291 let mut subscription = SimpleBroker::<Track>::subscribe();
292 let rt = tokio::runtime::Runtime::new().unwrap();
293
294 while let Some(track) = rt.block_on(subscription.next()) {
295 let mut current_track = rt.block_on(ctx.current_track.lock());
296 *current_track = Some(track);
297 }
298 });
299
300 thread::spawn(move || {
301 let mut subscription = SimpleBroker::<Playlist>::subscribe();
302 let rt = tokio::runtime::Runtime::new().unwrap();
303
304 while let Some(playlist) = rt.block_on(subscription.next()) {
305 let mut current_playlist = rt.block_on(ctx_clone.current_playlist.lock());
306
307 // verify if current_playlist index is different from playlist index
308 if (current_playlist.is_some()
309 && current_playlist.as_ref().unwrap().index != playlist.index)
310 || current_playlist.is_none()
311 {
312 let ctx = ctx_clone.clone();
313 match ctx.event_sender.send(Subsystem::Playlist) {
314 Ok(_) => {}
315 Err(e) => {
316 eprintln!("Error: {}", e)
317 }
318 }
319 }
320
321 *current_playlist = Some(playlist);
322 }
323 });
324
325 thread::spawn(move || {
326 let mut subscription = SimpleBroker::<AudioStatus>::subscribe();
327 let rt = tokio::runtime::Runtime::new().unwrap();
328
329 while let Some(status) = rt.block_on(subscription.next()) {
330 let mut playback_status = rt.block_on(another_ctx.playback_status.lock());
331 // verify if playback_status status is different from status status
332 if playback_status.is_some()
333 && playback_status.as_ref().unwrap().status != status.status
334 {
335 let ctx = another_ctx.clone();
336 match ctx.event_sender.send(Subsystem::Player) {
337 Ok(_) => {}
338 Err(e) => {
339 eprintln!("Error: {}", e)
340 }
341 }
342 }
343 *playback_status = Some(status);
344 }
345 });
346}
347
348pub fn restore_playlist(ctx: Context) -> Result<(), Error> {
349 let ctx_clone = ctx.clone();
350 thread::spawn(move || {
351 let mut ctx = ctx_clone.clone();
352 let rt = tokio::runtime::Runtime::new().unwrap();
353 rt.block_on(async {
354 let response = ctx
355 .system
356 .get_global_status(GetGlobalStatusRequest {})
357 .await?;
358 let response = response.into_inner();
359
360 let playback_status = ctx.playback_status.lock().await;
361 let mut status = 0;
362
363 if playback_status.is_some() {
364 status = playback_status.as_ref().unwrap().status;
365 }
366
367 if response.resume_index > -1 && status != 1 {
368 ctx.playlist
369 .playlist_resume(PlaylistResumeRequest {})
370 .await?;
371 let resume_index = response.resume_index;
372 let resume_elapsed = response.resume_elapsed;
373 thread::sleep(std::time::Duration::from_millis(500));
374 let response = ctx.playlist.get_current(GetCurrentRequest {}).await?;
375 let response = response.into_inner();
376 let mut current_track = ctx.current_track.lock().await;
377 let path = response.tracks[resume_index as usize].path.clone();
378
379 let mut track: Track = Track {
380 path: path.clone(),
381 artist: response.tracks[resume_index as usize].artist.clone(),
382 album: response.tracks[resume_index as usize].album.clone(),
383 title: response.tracks[resume_index as usize].title.clone(),
384 album_artist: response.tracks[resume_index as usize].album_artist.clone(),
385 elapsed: resume_elapsed as u64,
386 length: response.tracks[resume_index as usize].length,
387 tracknum: response.tracks[resume_index as usize].tracknum,
388 year: response.tracks[resume_index as usize].year,
389 year_string: response.tracks[resume_index as usize].year_string.clone(),
390 ..Default::default()
391 };
392
393 *current_track = Some(track.clone());
394 let hash = format!("{:x}", md5::compute(path.as_bytes()));
395 let pool = rockbox_library::create_connection_pool().await?;
396
397 if let Ok(Some(metadata)) = repo::track::find_by_md5(pool.clone(), &hash).await {
398 track.id = Some(metadata.id);
399 track.album_art = metadata.album_art;
400 track.album_id = Some(metadata.album_id);
401 track.artist_id = Some(metadata.artist_id);
402 SimpleBroker::publish(track);
403 }
404 }
405
406 Ok::<(), Error>(())
407 })?;
408
409 Ok::<(), Error>(())
410 });
411
412 Ok(())
413}