A decentralized music tracking and discovery platform built on AT Protocol 馃幍
at setup-tracing 431 lines 15 kB view raw
1use std::{env, io::Write, path::Path, sync::Arc}; 2 3use anyhow::Error; 4use futures::future::BoxFuture; 5use lofty::{ 6 file::TaggedFileExt, 7 picture::{MimeType, Picture}, 8 probe::Probe, 9 tag::Accessor, 10}; 11use owo_colors::OwoColorize; 12use reqwest::{multipart, Client}; 13use sqlx::{Pool, Postgres}; 14use symphonia::core::{ 15 formats::FormatOptions, io::MediaSourceStream, meta::MetadataOptions, probe::Hint, 16}; 17use tempfile::TempDir; 18 19use crate::{ 20 client::{GoogleDriveClient, BASE_URL}, 21 consts::AUDIO_EXTENSIONS, 22 crypto::decrypt_aes_256_ctr, 23 repo::{ 24 google_drive_directory::create_google_drive_directory, 25 google_drive_path::create_google_drive_path, 26 google_drive_token::{find_google_drive_refresh_token, find_google_drive_refresh_tokens}, 27 track::get_track_by_hash, 28 }, 29 token::generate_token, 30 types::file::{File, FileList}, 31}; 32 33pub async fn scan_googledrive(pool: Arc<Pool<Postgres>>) -> Result<(), Error> { 34 let refresh_tokens = find_google_drive_refresh_tokens(&pool).await?; 35 for token in refresh_tokens { 36 let refresh_token = decrypt_aes_256_ctr( 37 &token.refresh_token, 38 &hex::decode(env::var("SPOTIFY_ENCRYPTION_KEY")?)?, 39 )?; 40 41 let client = GoogleDriveClient::new(&refresh_token).await?; 42 let filelist = client.get_music_directory().await?; 43 let music_dir = filelist.files.first().unwrap(); 44 scan_audio_files( 45 pool.clone(), 46 music_dir.id.clone(), 47 refresh_token.clone(), 48 token.did.clone(), 49 token.xata_id.clone(), 50 None, 51 ) 52 .await?; 53 } 54 Ok(()) 55} 56 57pub async fn scan_folder( 58 pool: Arc<Pool<Postgres>>, 59 did: &str, 60 folder_id: &str, 61) -> Result<(), Error> { 62 let refresh_token = find_google_drive_refresh_token(&pool, did).await?; 63 if let Some((refresh_token, google_drive_id)) = refresh_token { 64 let refresh_token = decrypt_aes_256_ctr( 65 &refresh_token, 66 &hex::decode(env::var("SPOTIFY_ENCRYPTION_KEY")?)?, 67 )?; 68 69 scan_audio_files( 70 pool.clone(), 71 folder_id.to_string(), 72 refresh_token, 73 did.to_string(), 74 google_drive_id.clone(), 75 None, 76 ) 77 .await?; 78 } 79 80 Ok(()) 81} 82 83pub fn scan_audio_files( 84 pool: Arc<Pool<Postgres>>, 85 file_id: String, 86 refresh_token: String, 87 did: String, 88 google_drive_id: String, 89 parent_drive_file_id: Option<String>, 90) -> BoxFuture<'static, Result<(), Error>> { 91 Box::pin(async move { 92 let client = GoogleDriveClient::new(&refresh_token).await?; 93 let access_token = client.access_token.clone(); 94 95 let client = Client::new(); 96 let url = format!("{}/files/{}", BASE_URL, file_id); 97 let res = client 98 .get(&url) 99 .bearer_auth(&access_token) 100 .query(&[("fields", "id, name, mimeType, parents")]) 101 .send() 102 .await?; 103 104 let file = res.json::<File>().await?; 105 106 if file.mime_type == "application/vnd.google-apps.folder" { 107 tracing::info!(folder = %file.name.bright_green(), "Scanning folder"); 108 109 create_google_drive_directory( 110 &pool, 111 &file, 112 &google_drive_id, 113 parent_drive_file_id.as_deref(), 114 ) 115 .await?; 116 117 // TODO: publish folder metadata to nats 118 119 let mut page_token: Option<String> = None; 120 let mut files: Vec<File> = Vec::new(); 121 122 loop { 123 let mut req = client 124 .get(&format!("{}/files", BASE_URL)) 125 .bearer_auth(&access_token) 126 .query(&[ 127 ("q", format!("'{}' in parents", file.id).as_str()), 128 ( 129 "fields", 130 "nextPageToken, files(id, name, mimeType, parents)", 131 ), 132 ("orderBy", "name"), 133 ("pageSize", "1000"), // max is 1000 134 ]); 135 136 if let Some(token) = &page_token { 137 req = req.query(&[("pageToken", token)]); 138 } 139 140 let res = req.send().await?; 141 let filelist = res.json::<FileList>().await?; 142 143 files.extend(filelist.files); 144 145 if let Some(token) = filelist.next_page_token { 146 page_token = Some(token); 147 } else { 148 break; 149 } 150 } 151 152 for child_file in files { 153 scan_audio_files( 154 pool.clone(), 155 child_file.id.clone(), 156 refresh_token.clone(), 157 did.clone(), 158 google_drive_id.clone(), 159 Some(file_id.clone()), 160 ) 161 .await?; 162 tokio::time::sleep(std::time::Duration::from_secs(3)).await; 163 } 164 165 return Ok(()); 166 } 167 168 if !AUDIO_EXTENSIONS 169 .into_iter() 170 .any(|ext| file.name.ends_with(&format!(".{}", ext))) 171 { 172 return Ok(()); 173 } 174 175 tracing::info!(file = %file.name.bright_green(), "Downloading file"); 176 177 let client = Client::new(); 178 179 let url = format!("{}/files/{}", BASE_URL, file_id); 180 let res = client 181 .get(&url) 182 .bearer_auth(&access_token) 183 .query(&[("alt", "media")]) 184 .send() 185 .await?; 186 187 let bytes = res.bytes().await?; 188 189 let temp_dir = TempDir::new()?; 190 let tmppath = temp_dir.path().join(&format!("{}", file.name)); 191 let mut tmpfile = std::fs::File::create(&tmppath)?; 192 tmpfile.write_all(&bytes)?; 193 194 tracing::info!(path = %tmppath.display(), "Reading file"); 195 196 let tagged_file = match Probe::open(&tmppath)?.read() { 197 Ok(tagged_file) => tagged_file, 198 Err(e) => { 199 tracing::warn!(file = %file.name.bright_green(), error = %e, "Failed to open file with lofty"); 200 return Ok(()); 201 } 202 }; 203 204 let primary_tag = tagged_file.primary_tag(); 205 let tag = match primary_tag { 206 Some(tag) => tag, 207 None => { 208 tracing::warn!(file = %file.name.bright_green(), "No tag found in file"); 209 return Ok(()); 210 } 211 }; 212 213 let pictures = tag.pictures(); 214 215 tracing::info!(title = %tag.get_string(&lofty::tag::ItemKey::TrackTitle).unwrap_or_default(), "Title"); 216 tracing::info!(artist = %tag.get_string(&lofty::tag::ItemKey::TrackArtist).unwrap_or_default(), "Artist"); 217 tracing::info!(album_artist = %tag.get_string(&lofty::tag::ItemKey::AlbumArtist).unwrap_or_default(), "Album artist"); 218 tracing::info!(album = %tag.get_string(&lofty::tag::ItemKey::AlbumTitle).unwrap_or_default(), "Album"); 219 tracing::info!(lyrics = %tag.get_string(&lofty::tag::ItemKey::Lyrics).unwrap_or_default(), "Lyrics"); 220 tracing::info!(year = %tag.year().unwrap_or_default(), "Year"); 221 tracing::info!(track_number = %tag.track().unwrap_or_default(), "Track number"); 222 tracing::info!(track_total = %tag.track_total().unwrap_or_default(), "Track total"); 223 tracing::info!(release_date = %tag.get_string(&lofty::tag::ItemKey::OriginalReleaseDate).unwrap_or_default(), "Release date"); 224 tracing::info!(recording_date = %tag.get_string(&lofty::tag::ItemKey::RecordingDate).unwrap_or_default(), "Recording date"); 225 tracing::info!(copyright = %tag.get_string(&lofty::tag::ItemKey::CopyrightMessage).unwrap_or_default(), "Copyright message"); 226 tracing::info!(pictures = %pictures.len(), "Pictures found"); 227 228 let title = tag 229 .get_string(&lofty::tag::ItemKey::TrackTitle) 230 .unwrap_or_default(); 231 let artist = tag 232 .get_string(&lofty::tag::ItemKey::TrackArtist) 233 .unwrap_or_default(); 234 let album_artist = tag 235 .get_string(&lofty::tag::ItemKey::AlbumArtist) 236 .unwrap_or_default(); 237 let album = tag 238 .get_string(&lofty::tag::ItemKey::AlbumTitle) 239 .unwrap_or_default(); 240 let access_token = generate_token(&did)?; 241 242 // check if track exists 243 // 244 // if not, create track 245 // upload album artist 246 // 247 // link path to track 248 249 let hash = sha256::digest(format!("{} - {} - {}", title, artist, album).to_lowercase()); 250 251 let track = get_track_by_hash(&pool, &hash).await?; 252 let duration = get_track_duration(&tmppath).await?; 253 let albumart_id = md5::compute(&format!("{} - {}", album_artist, album).to_lowercase()); 254 let albumart_id = format!("{:x}", albumart_id); 255 256 match track { 257 Some(track) => { 258 tracing::info!(title = %title.bright_green(), "Track exists"); 259 let parent_drive_id = parent_drive_file_id.as_deref(); 260 create_google_drive_path( 261 &pool, 262 &file, 263 &track, 264 &google_drive_id, 265 parent_drive_id.unwrap_or(""), 266 ) 267 .await?; 268 269 // TODO: publish file metadata to nats 270 } 271 None => { 272 tracing::info!(title = %title.bright_green(), "Creating track"); 273 274 let albumart = 275 upload_album_cover(albumart_id.into(), pictures, &access_token).await?; 276 277 let client = Client::new(); 278 const URL: &str = "https://api.rocksky.app/tracks"; 279 let response = client 280 .post(URL) 281 .header("Authorization", format!("Bearer {}", access_token)) 282 .json(&serde_json::json!({ 283 "title": tag.get_string(&lofty::tag::ItemKey::TrackTitle), 284 "album": tag.get_string(&lofty::tag::ItemKey::AlbumTitle), 285 "artist": tag.get_string(&lofty::tag::ItemKey::TrackArtist), 286 "albumArtist": match tag.get_string(&lofty::tag::ItemKey::AlbumArtist) { 287 Some(album_artist) => Some(album_artist), 288 None => Some(tag.get_string(&lofty::tag::ItemKey::TrackArtist).unwrap_or_default()), 289 }, 290 "duration": duration, 291 "trackNumber": tag.track(), 292 "releaseDate": tag.get_string(&lofty::tag::ItemKey::OriginalReleaseDate).map(|date| match date.contains("-") { 293 true => Some(date), 294 false => None, 295 }), 296 "year": tag.year(), 297 "discNumber": tag.disk().map(|disc| match disc { 298 0 => Some(1), 299 _ => Some(disc), 300 }).unwrap_or(Some(1)), 301 "composer": tag.get_string(&lofty::tag::ItemKey::Composer), 302 "albumArt": match albumart { 303 Some(albumart) => Some(format!("https://cdn.rocksky.app/covers/{}", albumart)), 304 None => None 305 }, 306 "lyrics": tag.get_string(&lofty::tag::ItemKey::Lyrics), 307 "copyrightMessage": tag.get_string(&lofty::tag::ItemKey::CopyrightMessage), 308 })) 309 .send() 310 .await?; 311 tracing::info!(status = %response.status(), "Track saved"); 312 tokio::time::sleep(std::time::Duration::from_secs(3)).await; 313 314 let track = get_track_by_hash(&pool, &hash).await?; 315 if let Some(track) = track { 316 let parent_drive_id = parent_drive_file_id.as_deref(); 317 create_google_drive_path( 318 &pool, 319 &file, 320 &track, 321 &google_drive_id, 322 parent_drive_id.unwrap_or(""), 323 ) 324 .await?; 325 326 // TODO: publish file metadata to nats 327 328 tokio::time::sleep(std::time::Duration::from_secs(1)).await; 329 330 return Ok(()); 331 } 332 333 tracing::warn!(title = %title.bright_green(), "Failed to create track"); 334 } 335 } 336 337 Ok(()) 338 }) 339} 340 341pub async fn upload_album_cover( 342 name: String, 343 pictures: &[Picture], 344 token: &str, 345) -> Result<Option<String>, Error> { 346 if pictures.is_empty() { 347 return Ok(None); 348 } 349 350 let picture = &pictures[0]; 351 352 let buffer = match picture.mime_type() { 353 Some(MimeType::Jpeg) => Some(picture.data().to_vec()), 354 Some(MimeType::Png) => Some(picture.data().to_vec()), 355 Some(MimeType::Gif) => Some(picture.data().to_vec()), 356 Some(MimeType::Bmp) => Some(picture.data().to_vec()), 357 Some(MimeType::Tiff) => Some(picture.data().to_vec()), 358 _ => None, 359 }; 360 361 if buffer.is_none() { 362 return Ok(None); 363 } 364 365 let buffer = buffer.unwrap(); 366 367 let ext = match picture.mime_type() { 368 Some(MimeType::Jpeg) => "jpg", 369 Some(MimeType::Png) => "png", 370 Some(MimeType::Gif) => "gif", 371 Some(MimeType::Bmp) => "bmp", 372 Some(MimeType::Tiff) => "tiff", 373 _ => { 374 return Ok(None); 375 } 376 }; 377 378 let name = format!("{}.{}", name, ext); 379 380 let part = multipart::Part::bytes(buffer).file_name(name.clone()); 381 let form = multipart::Form::new().part("file", part); 382 let client = Client::new(); 383 384 const URL: &str = "https://uploads.rocksky.app"; 385 386 let response = client 387 .post(URL) 388 .header("Authorization", format!("Bearer {}", token)) 389 .multipart(form) 390 .send() 391 .await?; 392 393 tracing::info!(status = %response.status(), "Cover uploaded"); 394 395 Ok(Some(name)) 396} 397 398pub async fn get_track_duration(path: &Path) -> Result<u64, Error> { 399 let duration = 0; 400 let media_source = 401 MediaSourceStream::new(Box::new(std::fs::File::open(path)?), Default::default()); 402 let mut hint = Hint::new(); 403 404 if let Some(extension) = path.extension() { 405 if let Some(extension) = extension.to_str() { 406 hint.with_extension(extension); 407 } 408 } 409 410 let meta_opts = MetadataOptions::default(); 411 let format_opts = FormatOptions::default(); 412 413 let probed = 414 match symphonia::default::get_probe().format(&hint, media_source, &format_opts, &meta_opts) 415 { 416 Ok(probed) => probed, 417 Err(e) => { 418 tracing::warn!(path = %path.display(), error = %e, "Failed to probe media"); 419 return Ok(duration); 420 } 421 }; 422 423 if let Some(track) = probed.format.tracks().first() { 424 if let Some(duration) = track.codec_params.n_frames { 425 if let Some(sample_rate) = track.codec_params.sample_rate { 426 return Ok((duration as f64 / sample_rate as f64) as u64 * 1000); 427 } 428 } 429 } 430 Ok(duration) 431}