A decentralized music tracking and discovery platform built on AT Protocol 馃幍 rocksky.app
spotify atproto lastfm musicbrainz scrobbling listenbrainz
at feat/pgpull 440 lines 15 kB view raw
1use std::{env, fs::File, io::Write, path::Path, sync::Arc}; 2 3use anyhow::Error; 4use futures::future::BoxFuture; 5use lofty::{ 6 file::TaggedFileExt, 7 picture::{MimeType, Picture}, 8 probe::Probe, 9 tag::Accessor, 10}; 11use owo_colors::OwoColorize; 12use reqwest::{multipart, Client}; 13use serde_json::json; 14use sqlx::{Pool, Postgres}; 15use symphonia::core::{ 16 formats::FormatOptions, io::MediaSourceStream, meta::MetadataOptions, probe::Hint, 17}; 18use tempfile::TempDir; 19 20use crate::{ 21 client::{get_access_token, BASE_URL, CONTENT_URL}, 22 consts::AUDIO_EXTENSIONS, 23 crypto::decrypt_aes_256_ctr, 24 repo::{ 25 dropbox_directory::create_dropbox_directory, 26 dropbox_path::create_dropbox_path, 27 dropbox_token::{find_dropbox_refresh_token, find_dropbox_refresh_tokens}, 28 track::get_track_by_hash, 29 }, 30 token::generate_token, 31 types::file::{Entry, EntryList}, 32}; 33 34pub async fn scan_dropbox(pool: Arc<Pool<Postgres>>) -> Result<(), Error> { 35 let refresh_tokens = find_dropbox_refresh_tokens(&pool).await?; 36 for token in refresh_tokens { 37 let refresh_token = decrypt_aes_256_ctr( 38 &token.refresh_token, 39 &hex::decode(env::var("SPOTIFY_ENCRYPTION_KEY")?)?, 40 )?; 41 scan_audio_files( 42 pool.clone(), 43 "/Music".to_string(), 44 refresh_token, 45 token.did, 46 token.xata_id, 47 ) 48 .await?; 49 } 50 Ok(()) 51} 52 53pub async fn scan_folder(pool: Arc<Pool<Postgres>>, did: &str, path: &str) -> Result<(), Error> { 54 let refresh_tokens = find_dropbox_refresh_token(&pool, did).await?; 55 if let Some((refresh_token, dropbox_id)) = refresh_tokens { 56 let refresh_token = decrypt_aes_256_ctr( 57 &refresh_token, 58 &hex::decode(env::var("SPOTIFY_ENCRYPTION_KEY")?)?, 59 )?; 60 61 scan_audio_files( 62 pool.clone(), 63 path.to_string(), 64 refresh_token, 65 did.to_string(), 66 dropbox_id, 67 ) 68 .await?; 69 } 70 Ok(()) 71} 72 73pub fn scan_audio_files( 74 pool: Arc<Pool<Postgres>>, 75 path: String, 76 refresh_token: String, 77 did: String, 78 dropbox_id: String, 79) -> BoxFuture<'static, Result<(), Error>> { 80 Box::pin(async move { 81 let res = get_access_token(&refresh_token).await?; 82 let access_token = res.access_token; 83 84 let client = Client::new(); 85 86 let res = client 87 .post(&format!("{}/files/get_metadata", BASE_URL)) 88 .bearer_auth(&access_token) 89 .json(&json!({ "path": path })) 90 .send() 91 .await?; 92 93 if res.status().as_u16() == 400 || res.status().as_u16() == 409 { 94 tracing::error!(path = %path.bright_red(), "Path not found"); 95 return Ok(()); 96 } 97 98 let entry = res.json::<Entry>().await?; 99 100 if entry.tag.clone().unwrap().as_str() == "folder" { 101 tracing::info!(path = %path.bright_green(), "Scanning folder"); 102 103 let parent_path = Path::new(&path) 104 .parent() 105 .map(|p| p.to_string_lossy().to_string()) 106 .unwrap_or_else(|| "".to_string()); 107 108 create_dropbox_directory(&pool, &entry, &dropbox_id, &parent_path).await?; 109 110 // TODO: publish folder metadata to nats 111 112 let mut entries: Vec<Entry> = Vec::new(); 113 114 let res = client 115 .post(&format!("{}/files/list_folder", BASE_URL)) 116 .bearer_auth(&access_token) 117 .json(&json!({ "path": path })) 118 .send() 119 .await?; 120 121 let mut entry_list = res.json::<EntryList>().await?; 122 entries.extend(entry_list.entries); 123 124 // Handle pagination using list_folder/continue 125 while entry_list.has_more { 126 let res = client 127 .post(&format!("{}/files/list_folder/continue", BASE_URL)) 128 .bearer_auth(&access_token) 129 .json(&json!({ "cursor": entry_list.cursor })) 130 .send() 131 .await?; 132 133 tokio::time::sleep(std::time::Duration::from_secs(1)).await; 134 135 entry_list = res.json::<EntryList>().await?; 136 entries.extend(entry_list.entries); 137 } 138 139 for entry in entries { 140 scan_audio_files( 141 pool.clone(), 142 entry.path_display, 143 refresh_token.clone(), 144 did.clone(), 145 dropbox_id.clone(), 146 ) 147 .await?; 148 tokio::time::sleep(std::time::Duration::from_secs(1)).await; 149 } 150 151 return Ok(()); 152 } 153 154 if !AUDIO_EXTENSIONS 155 .into_iter() 156 .any(|ext| path.ends_with(&format!(".{}", ext))) 157 { 158 return Ok(()); 159 } 160 161 let client = Client::new(); 162 163 tracing::info!(path = %path.bright_green(), "Downloading file"); 164 165 let res = client 166 .post(&format!("{}/files/download", CONTENT_URL)) 167 .bearer_auth(&access_token) 168 .header("Dropbox-API-Arg", &json!({ "path": path }).to_string()) 169 .send() 170 .await?; 171 172 let bytes = res.bytes().await?; 173 174 let temp_dir = TempDir::new()?; 175 let tmppath = temp_dir.path().join(&format!("{}", entry.name)); 176 let mut tmpfile = File::create(&tmppath)?; 177 tmpfile.write_all(&bytes)?; 178 179 tracing::info!(path = %tmppath.clone().display().to_string().bright_green(), "Reading file"); 180 181 let tagged_file = match Probe::open(&tmppath)?.read() { 182 Ok(tagged_file) => tagged_file, 183 Err(e) => { 184 tracing::error!(path = %tmppath.clone().display().to_string().bright_red(), "Error reading file: {}", e); 185 return Ok(()); 186 } 187 }; 188 189 let primary_tag = tagged_file.primary_tag(); 190 let tag = match primary_tag { 191 Some(tag) => tag, 192 None => { 193 tracing::error!(path = %tmppath.clone().display().to_string().bright_red(), "No tag found in file"); 194 return Ok(()); 195 } 196 }; 197 198 let pictures = tag.pictures(); 199 200 tracing::info!( 201 title = %tag 202 .get_string(&lofty::tag::ItemKey::TrackTitle) 203 .unwrap_or_default(), 204 ); 205 tracing::info!( 206 artist = %tag 207 .get_string(&lofty::tag::ItemKey::TrackArtist) 208 .unwrap_or_default(), 209 ); 210 tracing::info!( 211 album = %tag 212 .get_string(&lofty::tag::ItemKey::AlbumTitle) 213 .unwrap_or_default(), 214 ); 215 tracing::info!( 216 album_artist = %tag 217 .get_string(&lofty::tag::ItemKey::AlbumArtist) 218 .unwrap_or_default(), 219 ); 220 tracing::info!( 221 lyrics = %tag 222 .get_string(&lofty::tag::ItemKey::Lyrics) 223 .unwrap_or_default(), 224 ); 225 tracing::info!(year = %tag.year().unwrap_or_default()); 226 tracing::info!(track_number = %tag.track().unwrap_or_default()); 227 tracing::info!(track_total = %tag.track_total().unwrap_or_default()); 228 tracing::info!( 229 release_date = %tag 230 .get_string(&lofty::tag::ItemKey::OriginalReleaseDate) 231 .unwrap_or_default(), 232 ); 233 tracing::info!( 234 recording_date = %tag 235 .get_string(&lofty::tag::ItemKey::RecordingDate) 236 .unwrap_or_default(), 237 ); 238 tracing::info!( 239 copyright_message = %tag 240 .get_string(&lofty::tag::ItemKey::CopyrightMessage) 241 .unwrap_or_default(), 242 ); 243 tracing::info!(pictures = ?pictures); 244 245 let title = tag 246 .get_string(&lofty::tag::ItemKey::TrackTitle) 247 .unwrap_or_default(); 248 let artist = tag 249 .get_string(&lofty::tag::ItemKey::TrackArtist) 250 .unwrap_or_default(); 251 let album = tag 252 .get_string(&lofty::tag::ItemKey::AlbumTitle) 253 .unwrap_or_default(); 254 let album_artist = tag 255 .get_string(&lofty::tag::ItemKey::AlbumArtist) 256 .unwrap_or_default(); 257 258 let access_token = generate_token(&did)?; 259 260 // check if track exists 261 // 262 // if not, create track 263 // upload album art 264 // 265 // link path to track 266 267 let hash = sha256::digest(format!("{} - {} - {}", title, artist, album).to_lowercase()); 268 269 let track = get_track_by_hash(&pool, &hash).await?; 270 let duration = get_track_duration(&tmppath).await?; 271 let albumart_id = md5::compute(&format!("{} - {}", album_artist, album).to_lowercase()); 272 let albumart_id = format!("{:x}", albumart_id); 273 274 match track { 275 Some(track) => { 276 tracing::info!(title = %title.bright_green(), "Track exists"); 277 let parent_path = Path::new(&path) 278 .parent() 279 .map(|p| p.to_string_lossy().to_string()); 280 let status = 281 create_dropbox_path(&pool, &entry, &track, &dropbox_id, parent_path).await; 282 tracing::info!(status = ?status); 283 284 // TODO: publish file metadata to nats 285 } 286 None => { 287 tracing::info!(title = %title.bright_green(), "Creating track"); 288 let album_art = 289 upload_album_cover(albumart_id.into(), pictures, &access_token).await?; 290 let client = Client::new(); 291 const URL: &str = "https://api.rocksky.app/tracks"; 292 let response = client 293 .post(URL) 294 .header("Authorization", format!("Bearer {}", access_token)) 295 .json(&serde_json::json!({ 296 "title": tag.get_string(&lofty::tag::ItemKey::TrackTitle), 297 "album": tag.get_string(&lofty::tag::ItemKey::AlbumTitle), 298 "artist": tag.get_string(&lofty::tag::ItemKey::TrackArtist), 299 "albumArtist": match tag.get_string(&lofty::tag::ItemKey::AlbumArtist) { 300 Some(album_artist) => Some(album_artist), 301 None => Some(tag.get_string(&lofty::tag::ItemKey::TrackArtist).unwrap_or_default()), 302 }, 303 "duration": duration, 304 "trackNumber": tag.track(), 305 "releaseDate": tag.get_string(&lofty::tag::ItemKey::OriginalReleaseDate).map(|date| match date.contains("-") { 306 true => Some(date), 307 false => None, 308 }), 309 "year": tag.year(), 310 "discNumber": tag.disk().map(|disc| match disc { 311 0 => Some(1), 312 _ => Some(disc), 313 }).unwrap_or(Some(1)), 314 "composer": tag.get_string(&lofty::tag::ItemKey::Composer), 315 "albumArt": match album_art{ 316 Some(album_art) => Some(format!("https://cdn.rocksky.app/covers/{}", album_art)), 317 None => None 318 }, 319 "lyrics": tag.get_string(&lofty::tag::ItemKey::Lyrics), 320 "copyrightMessage": tag.get_string(&lofty::tag::ItemKey::CopyrightMessage), 321 })) 322 .send() 323 .await?; 324 tracing::info!(title = title, status = %response.status(), "Track saved"); 325 tokio::time::sleep(std::time::Duration::from_secs(3)).await; 326 327 let track = get_track_by_hash(&pool, &hash).await?; 328 if let Some(track) = track { 329 let parent_path = Path::new(&path) 330 .parent() 331 .map(|p| p.to_string_lossy().to_string()); 332 create_dropbox_path(&pool, &entry, &track, &dropbox_id, parent_path).await?; 333 334 // TODO: publish file metadata to nats 335 336 return Ok(()); 337 } 338 339 tracing::error!(title = %title.bright_red(), "Failed to create track"); 340 } 341 } 342 343 Ok(()) 344 }) 345} 346 347pub async fn upload_album_cover( 348 name: String, 349 pictures: &[Picture], 350 token: &str, 351) -> Result<Option<String>, Error> { 352 if pictures.is_empty() { 353 return Ok(None); 354 } 355 356 let picture = &pictures[0]; 357 358 let buffer = match picture.mime_type() { 359 Some(MimeType::Jpeg) => Some(picture.data().to_vec()), 360 Some(MimeType::Png) => Some(picture.data().to_vec()), 361 Some(MimeType::Gif) => Some(picture.data().to_vec()), 362 Some(MimeType::Bmp) => Some(picture.data().to_vec()), 363 Some(MimeType::Tiff) => Some(picture.data().to_vec()), 364 _ => None, 365 }; 366 367 if buffer.is_none() { 368 return Ok(None); 369 } 370 371 let buffer = buffer.unwrap(); 372 373 let ext = match picture.mime_type() { 374 Some(MimeType::Jpeg) => "jpg", 375 Some(MimeType::Png) => "png", 376 Some(MimeType::Gif) => "gif", 377 Some(MimeType::Bmp) => "bmp", 378 Some(MimeType::Tiff) => "tiff", 379 _ => { 380 return Ok(None); 381 } 382 }; 383 384 let name = format!("{}.{}", name, ext); 385 386 let part = multipart::Part::bytes(buffer).file_name(name.clone()); 387 let form = multipart::Form::new().part("file", part); 388 let client = Client::new(); 389 390 const URL: &str = "https://uploads.rocksky.app"; 391 392 let response = client 393 .post(URL) 394 .header("Authorization", format!("Bearer {}", token)) 395 .multipart(form) 396 .send() 397 .await?; 398 399 tracing::info!(status = %response.status(), "Cover uploaded"); 400 401 Ok(Some(name)) 402} 403 404pub async fn get_track_duration(path: &Path) -> Result<u64, Error> { 405 let duration = 0; 406 let media_source = 407 MediaSourceStream::new(Box::new(std::fs::File::open(path)?), Default::default()); 408 let mut hint = Hint::new(); 409 410 if let Some(extension) = path.extension() { 411 if let Some(extension) = extension.to_str() { 412 hint.with_extension(extension); 413 } 414 } 415 416 let meta_opts = MetadataOptions::default(); 417 let format_opts = FormatOptions::default(); 418 419 let probed = match symphonia::default::get_probe().format( 420 &hint, 421 media_source, 422 &format_opts, 423 &meta_opts, 424 ) { 425 Ok(probed) => probed, 426 Err(e) => { 427 tracing::error!(path = %path.display().to_string().bright_red(), "Error probing file: {}", e); 428 return Ok(duration); 429 } 430 }; 431 432 if let Some(track) = probed.format.tracks().first() { 433 if let Some(duration) = track.codec_params.n_frames { 434 if let Some(sample_rate) = track.codec_params.sample_rate { 435 return Ok((duration as f64 / sample_rate as f64) as u64 * 1000); 436 } 437 } 438 } 439 Ok(duration) 440}