A decentralized music tracking and discovery platform built on AT Protocol 🎵
rocksky.app
spotify
atproto
lastfm
musicbrainz
scrobbling
listenbrainz
1use std::{env, fs::File, io::Write, path::Path, sync::Arc};
2
3use anyhow::Error;
4use futures::future::BoxFuture;
5use lofty::{
6 file::TaggedFileExt,
7 picture::{MimeType, Picture},
8 probe::Probe,
9 tag::Accessor,
10};
11use owo_colors::OwoColorize;
12use reqwest::{multipart, Client};
13use serde_json::json;
14use sqlx::{Pool, Postgres};
15use symphonia::core::{
16 formats::FormatOptions, io::MediaSourceStream, meta::MetadataOptions, probe::Hint,
17};
18use tempfile::TempDir;
19
20use crate::{
21 client::{get_access_token, BASE_URL, CONTENT_URL},
22 consts::AUDIO_EXTENSIONS,
23 crypto::decrypt_aes_256_ctr,
24 repo::{
25 dropbox_directory::create_dropbox_directory,
26 dropbox_path::create_dropbox_path,
27 dropbox_token::{find_dropbox_refresh_token, find_dropbox_refresh_tokens},
28 track::get_track_by_hash,
29 },
30 token::generate_token,
31 types::file::{Entry, EntryList},
32};
33
34pub async fn scan_dropbox(pool: Arc<Pool<Postgres>>) -> Result<(), Error> {
35 let refresh_tokens = find_dropbox_refresh_tokens(&pool).await?;
36 for token in refresh_tokens {
37 let refresh_token = decrypt_aes_256_ctr(
38 &token.refresh_token,
39 &hex::decode(env::var("SPOTIFY_ENCRYPTION_KEY")?)?,
40 )?;
41 scan_audio_files(
42 pool.clone(),
43 "/Music".to_string(),
44 refresh_token,
45 token.did,
46 token.xata_id,
47 )
48 .await?;
49 }
50 Ok(())
51}
52
53pub async fn scan_folder(pool: Arc<Pool<Postgres>>, did: &str, path: &str) -> Result<(), Error> {
54 let refresh_tokens = find_dropbox_refresh_token(&pool, did).await?;
55 if let Some((refresh_token, dropbox_id)) = refresh_tokens {
56 let refresh_token = decrypt_aes_256_ctr(
57 &refresh_token,
58 &hex::decode(env::var("SPOTIFY_ENCRYPTION_KEY")?)?,
59 )?;
60
61 scan_audio_files(
62 pool.clone(),
63 path.to_string(),
64 refresh_token,
65 did.to_string(),
66 dropbox_id,
67 )
68 .await?;
69 }
70 Ok(())
71}
72
73pub fn scan_audio_files(
74 pool: Arc<Pool<Postgres>>,
75 path: String,
76 refresh_token: String,
77 did: String,
78 dropbox_id: String,
79) -> BoxFuture<'static, Result<(), Error>> {
80 Box::pin(async move {
81 let res = get_access_token(&refresh_token).await?;
82 let access_token = res.access_token;
83
84 let client = Client::new();
85
86 let res = client
87 .post(&format!("{}/files/get_metadata", BASE_URL))
88 .bearer_auth(&access_token)
89 .json(&json!({ "path": path }))
90 .send()
91 .await?;
92
93 if res.status().as_u16() == 400 || res.status().as_u16() == 409 {
94 tracing::error!(path = %path.bright_red(), "Path not found");
95 return Ok(());
96 }
97
98 let entry = res.json::<Entry>().await?;
99
100 if entry.tag.clone().unwrap().as_str() == "folder" {
101 tracing::info!(path = %path.bright_green(), "Scanning folder");
102
103 let parent_path = Path::new(&path)
104 .parent()
105 .map(|p| p.to_string_lossy().to_string())
106 .unwrap_or_else(|| "".to_string());
107
108 create_dropbox_directory(&pool, &entry, &dropbox_id, &parent_path).await?;
109
110 // TODO: publish folder metadata to nats
111
112 let mut entries: Vec<Entry> = Vec::new();
113
114 let res = client
115 .post(&format!("{}/files/list_folder", BASE_URL))
116 .bearer_auth(&access_token)
117 .json(&json!({ "path": path }))
118 .send()
119 .await?;
120
121 let mut entry_list = res.json::<EntryList>().await?;
122 entries.extend(entry_list.entries);
123
124 // Handle pagination using list_folder/continue
125 while entry_list.has_more {
126 let res = client
127 .post(&format!("{}/files/list_folder/continue", BASE_URL))
128 .bearer_auth(&access_token)
129 .json(&json!({ "cursor": entry_list.cursor }))
130 .send()
131 .await?;
132
133 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
134
135 entry_list = res.json::<EntryList>().await?;
136 entries.extend(entry_list.entries);
137 }
138
139 for entry in entries {
140 scan_audio_files(
141 pool.clone(),
142 entry.path_display,
143 refresh_token.clone(),
144 did.clone(),
145 dropbox_id.clone(),
146 )
147 .await?;
148 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
149 }
150
151 return Ok(());
152 }
153
154 if !AUDIO_EXTENSIONS
155 .into_iter()
156 .any(|ext| path.ends_with(&format!(".{}", ext)))
157 {
158 return Ok(());
159 }
160
161 let client = Client::new();
162
163 tracing::info!(path = %path.bright_green(), "Downloading file");
164
165 let res = client
166 .post(&format!("{}/files/download", CONTENT_URL))
167 .bearer_auth(&access_token)
168 .header("Dropbox-API-Arg", &json!({ "path": path }).to_string())
169 .send()
170 .await?;
171
172 let bytes = res.bytes().await?;
173
174 let temp_dir = TempDir::new()?;
175 let tmppath = temp_dir.path().join(&format!("{}", entry.name));
176 let mut tmpfile = File::create(&tmppath)?;
177 tmpfile.write_all(&bytes)?;
178
179 tracing::info!(path = %tmppath.clone().display().to_string().bright_green(), "Reading file");
180
181 let tagged_file = match Probe::open(&tmppath)?.read() {
182 Ok(tagged_file) => tagged_file,
183 Err(e) => {
184 tracing::error!(path = %tmppath.clone().display().to_string().bright_red(), "Error reading file: {}", e);
185 return Ok(());
186 }
187 };
188
189 let primary_tag = tagged_file.primary_tag();
190 let tag = match primary_tag {
191 Some(tag) => tag,
192 None => {
193 tracing::error!(path = %tmppath.clone().display().to_string().bright_red(), "No tag found in file");
194 return Ok(());
195 }
196 };
197
198 let pictures = tag.pictures();
199
200 tracing::info!(
201 title = %tag
202 .get_string(&lofty::tag::ItemKey::TrackTitle)
203 .unwrap_or_default(),
204 );
205 tracing::info!(
206 artist = %tag
207 .get_string(&lofty::tag::ItemKey::TrackArtist)
208 .unwrap_or_default(),
209 );
210 tracing::info!(
211 album = %tag
212 .get_string(&lofty::tag::ItemKey::AlbumTitle)
213 .unwrap_or_default(),
214 );
215 tracing::info!(
216 album_artist = %tag
217 .get_string(&lofty::tag::ItemKey::AlbumArtist)
218 .unwrap_or_default(),
219 );
220 tracing::info!(
221 lyrics = %tag
222 .get_string(&lofty::tag::ItemKey::Lyrics)
223 .unwrap_or_default(),
224 );
225 tracing::info!(year = %tag.year().unwrap_or_default());
226 tracing::info!(track_number = %tag.track().unwrap_or_default());
227 tracing::info!(track_total = %tag.track_total().unwrap_or_default());
228 tracing::info!(
229 release_date = %tag
230 .get_string(&lofty::tag::ItemKey::OriginalReleaseDate)
231 .unwrap_or_default(),
232 );
233 tracing::info!(
234 recording_date = %tag
235 .get_string(&lofty::tag::ItemKey::RecordingDate)
236 .unwrap_or_default(),
237 );
238 tracing::info!(
239 copyright_message = %tag
240 .get_string(&lofty::tag::ItemKey::CopyrightMessage)
241 .unwrap_or_default(),
242 );
243 tracing::info!(pictures = ?pictures);
244
245 let title = tag
246 .get_string(&lofty::tag::ItemKey::TrackTitle)
247 .unwrap_or_default();
248 let artist = tag
249 .get_string(&lofty::tag::ItemKey::TrackArtist)
250 .unwrap_or_default();
251 let album = tag
252 .get_string(&lofty::tag::ItemKey::AlbumTitle)
253 .unwrap_or_default();
254 let album_artist = tag
255 .get_string(&lofty::tag::ItemKey::AlbumArtist)
256 .unwrap_or_default();
257
258 let access_token = generate_token(&did)?;
259
260 // check if track exists
261 //
262 // if not, create track
263 // upload album art
264 //
265 // link path to track
266
267 let hash = sha256::digest(format!("{} - {} - {}", title, artist, album).to_lowercase());
268
269 let track = get_track_by_hash(&pool, &hash).await?;
270 let duration = get_track_duration(&tmppath).await?;
271 let albumart_id = md5::compute(&format!("{} - {}", album_artist, album).to_lowercase());
272 let albumart_id = format!("{:x}", albumart_id);
273
274 match track {
275 Some(track) => {
276 tracing::info!(title = %title.bright_green(), "Track exists");
277 let parent_path = Path::new(&path)
278 .parent()
279 .map(|p| p.to_string_lossy().to_string());
280 let status =
281 create_dropbox_path(&pool, &entry, &track, &dropbox_id, parent_path).await;
282 tracing::info!(status = ?status);
283
284 // TODO: publish file metadata to nats
285 }
286 None => {
287 tracing::info!(title = %title.bright_green(), "Creating track");
288 let album_art =
289 upload_album_cover(albumart_id.into(), pictures, &access_token).await?;
290 let client = Client::new();
291 const URL: &str = "https://api.rocksky.app/tracks";
292 let response = client
293 .post(URL)
294 .header("Authorization", format!("Bearer {}", access_token))
295 .json(&serde_json::json!({
296 "title": tag.get_string(&lofty::tag::ItemKey::TrackTitle),
297 "album": tag.get_string(&lofty::tag::ItemKey::AlbumTitle),
298 "artist": tag.get_string(&lofty::tag::ItemKey::TrackArtist),
299 "albumArtist": match tag.get_string(&lofty::tag::ItemKey::AlbumArtist) {
300 Some(album_artist) => Some(album_artist),
301 None => Some(tag.get_string(&lofty::tag::ItemKey::TrackArtist).unwrap_or_default()),
302 },
303 "duration": duration,
304 "trackNumber": tag.track(),
305 "releaseDate": tag.get_string(&lofty::tag::ItemKey::OriginalReleaseDate).map(|date| match date.contains("-") {
306 true => Some(date),
307 false => None,
308 }),
309 "year": tag.year(),
310 "discNumber": tag.disk().map(|disc| match disc {
311 0 => Some(1),
312 _ => Some(disc),
313 }).unwrap_or(Some(1)),
314 "composer": tag.get_string(&lofty::tag::ItemKey::Composer),
315 "albumArt": match album_art{
316 Some(album_art) => Some(format!("https://cdn.rocksky.app/covers/{}", album_art)),
317 None => None
318 },
319 "lyrics": tag.get_string(&lofty::tag::ItemKey::Lyrics),
320 "copyrightMessage": tag.get_string(&lofty::tag::ItemKey::CopyrightMessage),
321 }))
322 .send()
323 .await?;
324 tracing::info!(title = title, status = %response.status(), "Track saved");
325 tokio::time::sleep(std::time::Duration::from_secs(3)).await;
326
327 let track = get_track_by_hash(&pool, &hash).await?;
328 if let Some(track) = track {
329 let parent_path = Path::new(&path)
330 .parent()
331 .map(|p| p.to_string_lossy().to_string());
332 create_dropbox_path(&pool, &entry, &track, &dropbox_id, parent_path).await?;
333
334 // TODO: publish file metadata to nats
335
336 return Ok(());
337 }
338
339 tracing::error!(title = %title.bright_red(), "Failed to create track");
340 }
341 }
342
343 Ok(())
344 })
345}
346
347pub async fn upload_album_cover(
348 name: String,
349 pictures: &[Picture],
350 token: &str,
351) -> Result<Option<String>, Error> {
352 if pictures.is_empty() {
353 return Ok(None);
354 }
355
356 let picture = &pictures[0];
357
358 let buffer = match picture.mime_type() {
359 Some(MimeType::Jpeg) => Some(picture.data().to_vec()),
360 Some(MimeType::Png) => Some(picture.data().to_vec()),
361 Some(MimeType::Gif) => Some(picture.data().to_vec()),
362 Some(MimeType::Bmp) => Some(picture.data().to_vec()),
363 Some(MimeType::Tiff) => Some(picture.data().to_vec()),
364 _ => None,
365 };
366
367 if buffer.is_none() {
368 return Ok(None);
369 }
370
371 let buffer = buffer.unwrap();
372
373 let ext = match picture.mime_type() {
374 Some(MimeType::Jpeg) => "jpg",
375 Some(MimeType::Png) => "png",
376 Some(MimeType::Gif) => "gif",
377 Some(MimeType::Bmp) => "bmp",
378 Some(MimeType::Tiff) => "tiff",
379 _ => {
380 return Ok(None);
381 }
382 };
383
384 let name = format!("{}.{}", name, ext);
385
386 let part = multipart::Part::bytes(buffer).file_name(name.clone());
387 let form = multipart::Form::new().part("file", part);
388 let client = Client::new();
389
390 const URL: &str = "https://uploads.rocksky.app";
391
392 let response = client
393 .post(URL)
394 .header("Authorization", format!("Bearer {}", token))
395 .multipart(form)
396 .send()
397 .await?;
398
399 tracing::info!(status = %response.status(), "Cover uploaded");
400
401 Ok(Some(name))
402}
403
404pub async fn get_track_duration(path: &Path) -> Result<u64, Error> {
405 let duration = 0;
406 let media_source =
407 MediaSourceStream::new(Box::new(std::fs::File::open(path)?), Default::default());
408 let mut hint = Hint::new();
409
410 if let Some(extension) = path.extension() {
411 if let Some(extension) = extension.to_str() {
412 hint.with_extension(extension);
413 }
414 }
415
416 let meta_opts = MetadataOptions::default();
417 let format_opts = FormatOptions::default();
418
419 let probed = match symphonia::default::get_probe().format(
420 &hint,
421 media_source,
422 &format_opts,
423 &meta_opts,
424 ) {
425 Ok(probed) => probed,
426 Err(e) => {
427 tracing::error!(path = %path.display().to_string().bright_red(), "Error probing file: {}", e);
428 return Ok(duration);
429 }
430 };
431
432 if let Some(track) = probed.format.tracks().first() {
433 if let Some(duration) = track.codec_params.n_frames {
434 if let Some(sample_rate) = track.codec_params.sample_rate {
435 return Ok((duration as f64 / sample_rate as f64) as u64 * 1000);
436 }
437 }
438 }
439 Ok(duration)
440}