A modern Music Player Daemon based on Rockbox open source high quality audio player
libadwaita audio rust zig deno mpris rockbox mpd
at master 413 lines 18 kB view raw
1use anyhow::Error; 2use handlers::{ 3 batch::{handle_command_list_begin, handle_command_list_ok_begin}, 4 browse::{handle_listall, handle_listallinfo, handle_listfiles, handle_lsinfo}, 5 library::{ 6 handle_config, handle_find, handle_find_album, handle_find_artist, handle_find_title, 7 handle_list_album, handle_list_artist, handle_list_title, handle_rescan, handle_search, 8 handle_stats, handle_tagtypes, handle_tagtypes_enable, 9 }, 10 playback::{ 11 handle_currentsong, handle_getvol, handle_next, handle_outputs, handle_pause, handle_play, 12 handle_playid, handle_previous, handle_random, handle_repeat, handle_seek, handle_seekcur, 13 handle_seekid, handle_setvol, handle_single, handle_status, handle_toggle, 14 }, 15 queue::{ 16 handle_add, handle_addid, handle_clear, handle_delete, handle_deleteid, handle_move, 17 handle_playlistinfo, handle_shuffle, 18 }, 19 system::{handle_binarylimit, handle_commands, handle_decoders, handle_idle, handle_noidle}, 20 Subsystem, 21}; 22use kv::{build_tracks_kv, KV}; 23use rockbox_graphql::{ 24 schema::objects::{audio_status::AudioStatus, playlist::Playlist, track::Track}, 25 simplebroker::SimpleBroker, 26}; 27use rockbox_library::{create_connection_pool, entity, repo}; 28use rockbox_rpc::api::rockbox::v1alpha1::{ 29 library_service_client::LibraryServiceClient, playback_service_client::PlaybackServiceClient, 30 playlist_service_client::PlaylistServiceClient, settings_service_client::SettingsServiceClient, 31 sound_service_client::SoundServiceClient, system_service_client::SystemServiceClient, 32 GetCurrentRequest, GetGlobalStatusRequest, PlaylistResumeRequest, 33}; 34use rockbox_sys::types::user_settings::UserSettings; 35use sqlx::{Pool, Sqlite}; 36use std::{env, sync::Arc, thread, time::Duration}; 37use tokio::{ 38 io::{AsyncReadExt, AsyncWriteExt}, 39 net::{TcpListener, TcpStream}, 40 sync::{broadcast, Mutex}, 41}; 42use tokio_stream::StreamExt; 43use tonic::transport::Channel; 44 45pub mod consts; 46pub mod dir; 47pub mod handlers; 48pub mod kv; 49 50#[derive(Clone)] 51pub struct Context { 52 pub library: LibraryServiceClient<Channel>, 53 pub playback: PlaybackServiceClient<Channel>, 54 pub settings: SettingsServiceClient<Channel>, 55 pub sound: SoundServiceClient<Channel>, 56 pub playlist: PlaylistServiceClient<Channel>, 57 pub system: SystemServiceClient<Channel>, 58 pub single: Arc<Mutex<String>>, 59 pub batch: bool, 60 pub event_sender: broadcast::Sender<Subsystem>, 61 pub event_receiver: Arc<Mutex<broadcast::Receiver<Subsystem>>>, 62 pub current_track: Arc<Mutex<Option<Track>>>, 63 pub current_playlist: Arc<Mutex<Option<Playlist>>>, 64 pub playback_status: Arc<Mutex<Option<AudioStatus>>>, 65 pub pool: Pool<Sqlite>, 66 pub kv: Arc<Mutex<KV<entity::track::Track>>>, 67 pub current_settings: Arc<Mutex<UserSettings>>, 68} 69 70pub struct MpdServer {} 71 72impl MpdServer { 73 pub async fn start() -> Result<(), Error> { 74 let port = env::var("ROCKBOX_MPD_PORT").unwrap_or_else(|_| "6600".to_string()); 75 let addr = format!("0.0.0.0:{}", port); 76 let context = setup_context(false, None).await?; 77 78 listen_events(context.clone()); 79 80 thread::sleep(Duration::from_millis(200)); 81 82 restore_playlist(context.clone())?; 83 84 let listener = TcpListener::bind(&addr).await?; 85 86 loop { 87 let (stream, _) = listener.accept().await?; 88 let context = context.clone(); 89 tokio::spawn(async move { 90 match handle_client(context, stream).await { 91 Ok(_) => {} 92 Err(e) => { 93 eprintln!("Error: {}", e); 94 } 95 } 96 }); 97 } 98 } 99} 100 101pub async fn handle_client(mut ctx: Context, stream: TcpStream) -> Result<(), Error> { 102 let mut buf = [0; 4096]; 103 let (reader_stream, writer_stream) = tokio::io::split(stream); 104 let mut reader = tokio::io::BufReader::new(reader_stream); 105 let mut writer = tokio::io::BufWriter::new(writer_stream); 106 107 let (tx, mut rx) = tokio::sync::mpsc::channel::<String>(32); 108 109 tokio::spawn(async move { 110 while let Some(msg) = rx.recv().await { 111 writer.write_all(msg.as_bytes()).await?; 112 writer.flush().await?; 113 } 114 Ok::<(), Error>(()) 115 }); 116 117 tx.send("OK MPD 0.23.15\n".to_string()).await?; 118 119 while let Ok(n) = reader.read(&mut buf).await { 120 if n == 0 { 121 break; 122 } 123 let request = String::from_utf8_lossy(&buf[..n]); 124 let command = parse_command(&request)?; 125 println!("request: {}", request); 126 127 match command.as_str() { 128 "play" => handle_play(&mut ctx, &request, tx.clone()).await?, 129 "pause" => handle_pause(&mut ctx, &request, tx.clone()).await?, 130 "toggle" => handle_toggle(&mut ctx, &request, tx.clone()).await?, 131 "next" => handle_next(&mut ctx, &request, tx.clone()).await?, 132 "previous" => handle_previous(&mut ctx, &request, tx.clone()).await?, 133 "playid" => handle_playid(&mut ctx, &request, tx.clone()).await?, 134 "seek" => handle_seek(&mut ctx, &request, tx.clone()).await?, 135 "seekid" => handle_seekid(&mut ctx, &request, tx.clone()).await?, 136 "seekcur" => handle_seekcur(&mut ctx, &request, tx.clone()).await?, 137 "random" => handle_random(&mut ctx, &request, tx.clone()).await?, 138 "repeat" => handle_repeat(&mut ctx, &request, tx.clone()).await?, 139 "getvol" => handle_getvol(&mut ctx, &request, tx.clone()).await?, 140 "setvol" => handle_setvol(&mut ctx, &request, tx.clone()).await?, 141 "volume" => handle_setvol(&mut ctx, &request, tx.clone()).await?, 142 "single" => handle_single(&mut ctx, &request, tx.clone()).await?, 143 "shuffle" => handle_shuffle(&mut ctx, &request, tx.clone()).await?, 144 "add" => handle_add(&mut ctx, &request, tx.clone()).await?, 145 "addid" => handle_addid(&mut ctx, &request, tx.clone()).await?, 146 "deleteid" => handle_deleteid(&mut ctx, &request, tx.clone()).await?, 147 "playlistinfo" => handle_playlistinfo(&mut ctx, &request, tx.clone()).await?, 148 "delete" => handle_delete(&mut ctx, &request, tx.clone()).await?, 149 "clear" => handle_clear(&mut ctx, &request, tx.clone()).await?, 150 "move" => handle_move(&mut ctx, &request, tx.clone()).await?, 151 "list album" => handle_list_album(&mut ctx, &request, tx.clone()).await?, 152 "list albumartist" => handle_list_artist(&mut ctx, &request, tx.clone()).await?, 153 "list artist" => handle_list_artist(&mut ctx, &request, tx.clone()).await?, 154 "list title" => handle_list_title(&mut ctx, &request, tx.clone()).await?, 155 "update" => handle_rescan(&mut ctx, &request, tx.clone()).await?, 156 "search" => handle_search(&mut ctx, &request, tx.clone()).await?, 157 "rescan" => handle_rescan(&mut ctx, &request, tx.clone()).await?, 158 "status" => handle_status(&mut ctx, &request, tx.clone()).await?, 159 "currentsong" => handle_currentsong(&mut ctx, &request, tx.clone()).await?, 160 "config" => handle_config(&mut ctx, &request, tx.clone()).await?, 161 "tagtypes " => handle_tagtypes(&mut ctx, &request, tx.clone()).await?, 162 "tagtypes clear" => handle_clear(&mut ctx, &request, tx.clone()).await?, 163 "tagtypes enable" => handle_tagtypes_enable(&mut ctx, &request, tx.clone()).await?, 164 "stats" => handle_stats(&mut ctx, &request, tx.clone()).await?, 165 "plchanges" => handle_playlistinfo(&mut ctx, &request, tx.clone()).await?, 166 "outputs" => handle_outputs(&mut ctx, &request, tx.clone()).await?, 167 "idle" => handle_idle(&mut ctx, &request, tx.clone()).await?, 168 "noidle" => handle_noidle(&mut ctx, &request, tx.clone()).await?, 169 "decoders" => handle_decoders(&mut ctx, &request, tx.clone()).await?, 170 "lsinfo" => handle_lsinfo(&mut ctx, &request, tx.clone()).await?, 171 "listall" => handle_listall(&mut ctx, &request, tx.clone()).await?, 172 "listallinfo" => handle_listallinfo(&mut ctx, &request, tx.clone()).await?, 173 "listfiles" => handle_listfiles(&mut ctx, &request, tx.clone()).await?, 174 "find artist" => handle_find_artist(&mut ctx, &request, tx.clone()).await?, 175 "find album" => handle_find_album(&mut ctx, &request, tx.clone()).await?, 176 "find title" => handle_find_title(&mut ctx, &request, tx.clone()).await?, 177 "binarylimit" => handle_binarylimit(&mut ctx, &request, tx.clone()).await?, 178 "commands" => handle_commands(&mut ctx, &request, tx.clone()).await?, 179 "command_list_begin" => { 180 handle_command_list_begin(&mut ctx, &request, tx.clone()).await? 181 } 182 "command_list_ok_begin" => { 183 handle_command_list_ok_begin(&mut ctx, &request, tx.clone()).await? 184 } 185 _ => { 186 if command.starts_with("find ") { 187 handle_find(&mut ctx, &request, tx.clone()).await?; 188 return Ok(()); 189 } 190 println!("Unhandled command: {}", command); 191 println!("Unhandled request: {}", request); 192 tx.send("ACK [5@0] {unhandled} unknown command\n".to_string()) 193 .await?; 194 "ACK [5@0] {unhandled} unknown command\n".to_string() 195 } 196 }; 197 } 198 Ok(()) 199} 200 201fn parse_command(request: &str) -> Result<String, Error> { 202 let command = request.split_whitespace().next().unwrap_or_default(); 203 204 if command == "list" { 205 // should parse the next word, and return "list album" or "list artist" or "list title" 206 let r#type = request.split_whitespace().nth(1).unwrap_or_default(); 207 return Ok(format!("list {}", r#type.to_lowercase())); 208 } 209 210 if command == "tagtypes" { 211 let r#type = request.split_whitespace().nth(1).unwrap_or_default(); 212 return Ok(format!("tagtypes {}", r#type.replace("\"", ""))); 213 } 214 215 if command == "find" { 216 let r#type = request.split_whitespace().nth(1).unwrap_or_default(); 217 return Ok(format!("find {}", r#type.to_lowercase())); 218 } 219 220 Ok(command.to_string()) 221} 222 223pub async fn setup_context(batch: bool, ctx: Option<Context>) -> Result<Context, Error> { 224 let port = env::var("ROCKBOX_PORT").unwrap_or_else(|_| "6061".to_string()); 225 let host = env::var("ROCKBOX_HOST").unwrap_or_else(|_| "localhost".to_string()); 226 let url = format!("tcp://{}:{}", host, port); 227 228 let pool = create_connection_pool().await?; 229 let kv = Arc::new(Mutex::new(build_tracks_kv(pool.clone()).await?)); 230 231 let library = LibraryServiceClient::connect(url.clone()).await?; 232 let playback = PlaybackServiceClient::connect(url.clone()).await?; 233 let settings = SettingsServiceClient::connect(url.clone()).await?; 234 let sound = SoundServiceClient::connect(url.clone()).await?; 235 let playlist = PlaylistServiceClient::connect(url.clone()).await?; 236 let system = SystemServiceClient::connect(url.clone()).await?; 237 238 let (event_sender, event_receiver) = broadcast::channel(16); 239 240 Ok(Context { 241 library, 242 playback, 243 settings, 244 sound, 245 playlist, 246 system, 247 single: Arc::new(Mutex::new("\"0\"".to_string())), 248 batch, 249 event_sender: match ctx { 250 Some(ref ctx) => ctx.clone().event_sender, 251 None => event_sender, 252 }, 253 event_receiver: match ctx { 254 Some(ref ctx) => ctx.clone().event_receiver, 255 None => Arc::new(Mutex::new(event_receiver)), 256 }, 257 current_track: match ctx { 258 Some(ref ctx) => ctx.clone().current_track, 259 None => Arc::new(Mutex::new(None)), 260 }, 261 current_playlist: match ctx { 262 Some(ref ctx) => ctx.clone().current_playlist, 263 None => Arc::new(Mutex::new(None)), 264 }, 265 playback_status: match ctx { 266 Some(ref ctx) => ctx.clone().playback_status, 267 None => Arc::new(Mutex::new(None)), 268 }, 269 pool, 270 kv, 271 current_settings: Arc::new(Mutex::new(rockbox_sys::settings::get_global_settings())), 272 }) 273} 274 275pub fn listen_events(ctx: Context) { 276 let ctx_clone = ctx.clone(); 277 let another_ctx = ctx.clone(); 278 let another_cloned_ctx = ctx.clone(); 279 280 thread::spawn(move || { 281 let rt = tokio::runtime::Runtime::new().unwrap(); 282 loop { 283 let mut current_settings = rt.block_on(another_cloned_ctx.current_settings.lock()); 284 *current_settings = rockbox_sys::settings::get_global_settings(); 285 drop(current_settings); 286 thread::sleep(std::time::Duration::from_millis(800)); 287 } 288 }); 289 290 thread::spawn(move || { 291 let mut subscription = SimpleBroker::<Track>::subscribe(); 292 let rt = tokio::runtime::Runtime::new().unwrap(); 293 294 while let Some(track) = rt.block_on(subscription.next()) { 295 let mut current_track = rt.block_on(ctx.current_track.lock()); 296 *current_track = Some(track); 297 } 298 }); 299 300 thread::spawn(move || { 301 let mut subscription = SimpleBroker::<Playlist>::subscribe(); 302 let rt = tokio::runtime::Runtime::new().unwrap(); 303 304 while let Some(playlist) = rt.block_on(subscription.next()) { 305 let mut current_playlist = rt.block_on(ctx_clone.current_playlist.lock()); 306 307 // verify if current_playlist index is different from playlist index 308 if (current_playlist.is_some() 309 && current_playlist.as_ref().unwrap().index != playlist.index) 310 || current_playlist.is_none() 311 { 312 let ctx = ctx_clone.clone(); 313 match ctx.event_sender.send(Subsystem::Playlist) { 314 Ok(_) => {} 315 Err(e) => { 316 eprintln!("Error: {}", e) 317 } 318 } 319 } 320 321 *current_playlist = Some(playlist); 322 } 323 }); 324 325 thread::spawn(move || { 326 let mut subscription = SimpleBroker::<AudioStatus>::subscribe(); 327 let rt = tokio::runtime::Runtime::new().unwrap(); 328 329 while let Some(status) = rt.block_on(subscription.next()) { 330 let mut playback_status = rt.block_on(another_ctx.playback_status.lock()); 331 // verify if playback_status status is different from status status 332 if playback_status.is_some() 333 && playback_status.as_ref().unwrap().status != status.status 334 { 335 let ctx = another_ctx.clone(); 336 match ctx.event_sender.send(Subsystem::Player) { 337 Ok(_) => {} 338 Err(e) => { 339 eprintln!("Error: {}", e) 340 } 341 } 342 } 343 *playback_status = Some(status); 344 } 345 }); 346} 347 348pub fn restore_playlist(ctx: Context) -> Result<(), Error> { 349 let ctx_clone = ctx.clone(); 350 thread::spawn(move || { 351 let mut ctx = ctx_clone.clone(); 352 let rt = tokio::runtime::Runtime::new().unwrap(); 353 rt.block_on(async { 354 let response = ctx 355 .system 356 .get_global_status(GetGlobalStatusRequest {}) 357 .await?; 358 let response = response.into_inner(); 359 360 let playback_status = ctx.playback_status.lock().await; 361 let mut status = 0; 362 363 if playback_status.is_some() { 364 status = playback_status.as_ref().unwrap().status; 365 } 366 367 if response.resume_index > -1 && status != 1 { 368 ctx.playlist 369 .playlist_resume(PlaylistResumeRequest {}) 370 .await?; 371 let resume_index = response.resume_index; 372 let resume_elapsed = response.resume_elapsed; 373 thread::sleep(std::time::Duration::from_millis(500)); 374 let response = ctx.playlist.get_current(GetCurrentRequest {}).await?; 375 let response = response.into_inner(); 376 let mut current_track = ctx.current_track.lock().await; 377 let path = response.tracks[resume_index as usize].path.clone(); 378 379 let mut track: Track = Track { 380 path: path.clone(), 381 artist: response.tracks[resume_index as usize].artist.clone(), 382 album: response.tracks[resume_index as usize].album.clone(), 383 title: response.tracks[resume_index as usize].title.clone(), 384 album_artist: response.tracks[resume_index as usize].album_artist.clone(), 385 elapsed: resume_elapsed as u64, 386 length: response.tracks[resume_index as usize].length, 387 tracknum: response.tracks[resume_index as usize].tracknum, 388 year: response.tracks[resume_index as usize].year, 389 year_string: response.tracks[resume_index as usize].year_string.clone(), 390 ..Default::default() 391 }; 392 393 *current_track = Some(track.clone()); 394 let hash = format!("{:x}", md5::compute(path.as_bytes())); 395 let pool = rockbox_library::create_connection_pool().await?; 396 397 if let Ok(Some(metadata)) = repo::track::find_by_md5(pool.clone(), &hash).await { 398 track.id = Some(metadata.id); 399 track.album_art = metadata.album_art; 400 track.album_id = Some(metadata.album_id); 401 track.artist_id = Some(metadata.artist_id); 402 SimpleBroker::publish(track); 403 } 404 } 405 406 Ok::<(), Error>(()) 407 })?; 408 409 Ok::<(), Error>(()) 410 }); 411 412 Ok(()) 413}