forked from
rocksky.app/rocksky
A decentralized music tracking and discovery platform built on AT Protocol 🎵
1use std::{env, io::Write, path::Path, sync::Arc};
2
3use anyhow::Error;
4use futures::future::BoxFuture;
5use lofty::{
6 file::TaggedFileExt,
7 picture::{MimeType, Picture},
8 probe::Probe,
9 tag::Accessor,
10};
11use owo_colors::OwoColorize;
12use reqwest::{multipart, Client};
13use sqlx::{Pool, Postgres};
14use symphonia::core::{
15 formats::FormatOptions, io::MediaSourceStream, meta::MetadataOptions, probe::Hint,
16};
17use tempfile::TempDir;
18
19use crate::{
20 client::{GoogleDriveClient, BASE_URL},
21 consts::AUDIO_EXTENSIONS,
22 crypto::decrypt_aes_256_ctr,
23 repo::{
24 google_drive_directory::create_google_drive_directory,
25 google_drive_path::create_google_drive_path,
26 google_drive_token::{find_google_drive_refresh_token, find_google_drive_refresh_tokens},
27 track::get_track_by_hash,
28 },
29 token::generate_token,
30 types::file::{File, FileList},
31};
32
33pub async fn scan_googledrive(pool: Arc<Pool<Postgres>>) -> Result<(), Error> {
34 let refresh_tokens = find_google_drive_refresh_tokens(&pool).await?;
35 for token in refresh_tokens {
36 let refresh_token = decrypt_aes_256_ctr(
37 &token.refresh_token,
38 &hex::decode(env::var("SPOTIFY_ENCRYPTION_KEY")?)?,
39 )?;
40
41 let client = GoogleDriveClient::new(&refresh_token).await?;
42 let filelist = client.get_music_directory().await?;
43 let music_dir = filelist.files.first().unwrap();
44 scan_audio_files(
45 pool.clone(),
46 music_dir.id.clone(),
47 refresh_token.clone(),
48 token.did.clone(),
49 token.xata_id.clone(),
50 None,
51 )
52 .await?;
53 }
54 Ok(())
55}
56
57pub async fn scan_folder(
58 pool: Arc<Pool<Postgres>>,
59 did: &str,
60 folder_id: &str,
61) -> Result<(), Error> {
62 let refresh_token = find_google_drive_refresh_token(&pool, did).await?;
63 if let Some((refresh_token, google_drive_id)) = refresh_token {
64 let refresh_token = decrypt_aes_256_ctr(
65 &refresh_token,
66 &hex::decode(env::var("SPOTIFY_ENCRYPTION_KEY")?)?,
67 )?;
68
69 scan_audio_files(
70 pool.clone(),
71 folder_id.to_string(),
72 refresh_token,
73 did.to_string(),
74 google_drive_id.clone(),
75 None,
76 )
77 .await?;
78 }
79
80 Ok(())
81}
82
83pub fn scan_audio_files(
84 pool: Arc<Pool<Postgres>>,
85 file_id: String,
86 refresh_token: String,
87 did: String,
88 google_drive_id: String,
89 parent_drive_file_id: Option<String>,
90) -> BoxFuture<'static, Result<(), Error>> {
91 Box::pin(async move {
92 let client = GoogleDriveClient::new(&refresh_token).await?;
93 let access_token = client.access_token.clone();
94
95 let client = Client::new();
96 let url = format!("{}/files/{}", BASE_URL, file_id);
97 let res = client
98 .get(&url)
99 .bearer_auth(&access_token)
100 .query(&[("fields", "id, name, mimeType, parents")])
101 .send()
102 .await?;
103
104 let file = res.json::<File>().await?;
105
106 if file.mime_type == "application/vnd.google-apps.folder" {
107 tracing::info!(folder = %file.name.bright_green(), "Scanning folder");
108
109 create_google_drive_directory(
110 &pool,
111 &file,
112 &google_drive_id,
113 parent_drive_file_id.as_deref(),
114 )
115 .await?;
116
117 // TODO: publish folder metadata to nats
118
119 let mut page_token: Option<String> = None;
120 let mut files: Vec<File> = Vec::new();
121
122 loop {
123 let mut req = client
124 .get(&format!("{}/files", BASE_URL))
125 .bearer_auth(&access_token)
126 .query(&[
127 ("q", format!("'{}' in parents", file.id).as_str()),
128 (
129 "fields",
130 "nextPageToken, files(id, name, mimeType, parents)",
131 ),
132 ("orderBy", "name"),
133 ("pageSize", "1000"), // max is 1000
134 ]);
135
136 if let Some(token) = &page_token {
137 req = req.query(&[("pageToken", token)]);
138 }
139
140 let res = req.send().await?;
141 let filelist = res.json::<FileList>().await?;
142
143 files.extend(filelist.files);
144
145 if let Some(token) = filelist.next_page_token {
146 page_token = Some(token);
147 } else {
148 break;
149 }
150 }
151
152 for child_file in files {
153 scan_audio_files(
154 pool.clone(),
155 child_file.id.clone(),
156 refresh_token.clone(),
157 did.clone(),
158 google_drive_id.clone(),
159 Some(file_id.clone()),
160 )
161 .await?;
162 tokio::time::sleep(std::time::Duration::from_secs(3)).await;
163 }
164
165 return Ok(());
166 }
167
168 if !AUDIO_EXTENSIONS
169 .into_iter()
170 .any(|ext| file.name.ends_with(&format!(".{}", ext)))
171 {
172 return Ok(());
173 }
174
175 tracing::info!(file = %file.name.bright_green(), "Downloading file");
176
177 let client = Client::new();
178
179 let url = format!("{}/files/{}", BASE_URL, file_id);
180 let res = client
181 .get(&url)
182 .bearer_auth(&access_token)
183 .query(&[("alt", "media")])
184 .send()
185 .await?;
186
187 let bytes = res.bytes().await?;
188
189 let temp_dir = TempDir::new()?;
190 let tmppath = temp_dir.path().join(&format!("{}", file.name));
191 let mut tmpfile = std::fs::File::create(&tmppath)?;
192 tmpfile.write_all(&bytes)?;
193
194 tracing::info!(path = %tmppath.display(), "Reading file");
195
196 let tagged_file = match Probe::open(&tmppath)?.read() {
197 Ok(tagged_file) => tagged_file,
198 Err(e) => {
199 tracing::warn!(file = %file.name.bright_green(), error = %e, "Failed to open file with lofty");
200 return Ok(());
201 }
202 };
203
204 let primary_tag = tagged_file.primary_tag();
205 let tag = match primary_tag {
206 Some(tag) => tag,
207 None => {
208 tracing::warn!(file = %file.name.bright_green(), "No tag found in file");
209 return Ok(());
210 }
211 };
212
213 let pictures = tag.pictures();
214
215 tracing::info!(title = %tag.get_string(&lofty::tag::ItemKey::TrackTitle).unwrap_or_default(), "Title");
216 tracing::info!(artist = %tag.get_string(&lofty::tag::ItemKey::TrackArtist).unwrap_or_default(), "Artist");
217 tracing::info!(album_artist = %tag.get_string(&lofty::tag::ItemKey::AlbumArtist).unwrap_or_default(), "Album artist");
218 tracing::info!(album = %tag.get_string(&lofty::tag::ItemKey::AlbumTitle).unwrap_or_default(), "Album");
219 tracing::info!(lyrics = %tag.get_string(&lofty::tag::ItemKey::Lyrics).unwrap_or_default(), "Lyrics");
220 tracing::info!(year = %tag.year().unwrap_or_default(), "Year");
221 tracing::info!(track_number = %tag.track().unwrap_or_default(), "Track number");
222 tracing::info!(track_total = %tag.track_total().unwrap_or_default(), "Track total");
223 tracing::info!(release_date = %tag.get_string(&lofty::tag::ItemKey::OriginalReleaseDate).unwrap_or_default(), "Release date");
224 tracing::info!(recording_date = %tag.get_string(&lofty::tag::ItemKey::RecordingDate).unwrap_or_default(), "Recording date");
225 tracing::info!(copyright = %tag.get_string(&lofty::tag::ItemKey::CopyrightMessage).unwrap_or_default(), "Copyright message");
226 tracing::info!(pictures = %pictures.len(), "Pictures found");
227
228 let title = tag
229 .get_string(&lofty::tag::ItemKey::TrackTitle)
230 .unwrap_or_default();
231 let artist = tag
232 .get_string(&lofty::tag::ItemKey::TrackArtist)
233 .unwrap_or_default();
234 let album_artist = tag
235 .get_string(&lofty::tag::ItemKey::AlbumArtist)
236 .unwrap_or_default();
237 let album = tag
238 .get_string(&lofty::tag::ItemKey::AlbumTitle)
239 .unwrap_or_default();
240 let access_token = generate_token(&did)?;
241
242 // check if track exists
243 //
244 // if not, create track
245 // upload album artist
246 //
247 // link path to track
248
249 let hash = sha256::digest(format!("{} - {} - {}", title, artist, album).to_lowercase());
250
251 let track = get_track_by_hash(&pool, &hash).await?;
252 let duration = get_track_duration(&tmppath).await?;
253 let albumart_id = md5::compute(&format!("{} - {}", album_artist, album).to_lowercase());
254 let albumart_id = format!("{:x}", albumart_id);
255
256 match track {
257 Some(track) => {
258 tracing::info!(title = %title.bright_green(), "Track exists");
259 let parent_drive_id = parent_drive_file_id.as_deref();
260 create_google_drive_path(
261 &pool,
262 &file,
263 &track,
264 &google_drive_id,
265 parent_drive_id.unwrap_or(""),
266 )
267 .await?;
268
269 // TODO: publish file metadata to nats
270 }
271 None => {
272 tracing::info!(title = %title.bright_green(), "Creating track");
273
274 let albumart =
275 upload_album_cover(albumart_id.into(), pictures, &access_token).await?;
276
277 let client = Client::new();
278 const URL: &str = "https://api.rocksky.app/tracks";
279 let response = client
280 .post(URL)
281 .header("Authorization", format!("Bearer {}", access_token))
282 .json(&serde_json::json!({
283 "title": tag.get_string(&lofty::tag::ItemKey::TrackTitle),
284 "album": tag.get_string(&lofty::tag::ItemKey::AlbumTitle),
285 "artist": tag.get_string(&lofty::tag::ItemKey::TrackArtist),
286 "albumArtist": match tag.get_string(&lofty::tag::ItemKey::AlbumArtist) {
287 Some(album_artist) => Some(album_artist),
288 None => Some(tag.get_string(&lofty::tag::ItemKey::TrackArtist).unwrap_or_default()),
289 },
290 "duration": duration,
291 "trackNumber": tag.track(),
292 "releaseDate": tag.get_string(&lofty::tag::ItemKey::OriginalReleaseDate).map(|date| match date.contains("-") {
293 true => Some(date),
294 false => None,
295 }),
296 "year": tag.year(),
297 "discNumber": tag.disk().map(|disc| match disc {
298 0 => Some(1),
299 _ => Some(disc),
300 }).unwrap_or(Some(1)),
301 "composer": tag.get_string(&lofty::tag::ItemKey::Composer),
302 "albumArt": match albumart {
303 Some(albumart) => Some(format!("https://cdn.rocksky.app/covers/{}", albumart)),
304 None => None
305 },
306 "lyrics": tag.get_string(&lofty::tag::ItemKey::Lyrics),
307 "copyrightMessage": tag.get_string(&lofty::tag::ItemKey::CopyrightMessage),
308 }))
309 .send()
310 .await?;
311 tracing::info!(status = %response.status(), "Track saved");
312 tokio::time::sleep(std::time::Duration::from_secs(3)).await;
313
314 let track = get_track_by_hash(&pool, &hash).await?;
315 if let Some(track) = track {
316 let parent_drive_id = parent_drive_file_id.as_deref();
317 create_google_drive_path(
318 &pool,
319 &file,
320 &track,
321 &google_drive_id,
322 parent_drive_id.unwrap_or(""),
323 )
324 .await?;
325
326 // TODO: publish file metadata to nats
327
328 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
329
330 return Ok(());
331 }
332
333 tracing::warn!(title = %title.bright_green(), "Failed to create track");
334 }
335 }
336
337 Ok(())
338 })
339}
340
341pub async fn upload_album_cover(
342 name: String,
343 pictures: &[Picture],
344 token: &str,
345) -> Result<Option<String>, Error> {
346 if pictures.is_empty() {
347 return Ok(None);
348 }
349
350 let picture = &pictures[0];
351
352 let buffer = match picture.mime_type() {
353 Some(MimeType::Jpeg) => Some(picture.data().to_vec()),
354 Some(MimeType::Png) => Some(picture.data().to_vec()),
355 Some(MimeType::Gif) => Some(picture.data().to_vec()),
356 Some(MimeType::Bmp) => Some(picture.data().to_vec()),
357 Some(MimeType::Tiff) => Some(picture.data().to_vec()),
358 _ => None,
359 };
360
361 if buffer.is_none() {
362 return Ok(None);
363 }
364
365 let buffer = buffer.unwrap();
366
367 let ext = match picture.mime_type() {
368 Some(MimeType::Jpeg) => "jpg",
369 Some(MimeType::Png) => "png",
370 Some(MimeType::Gif) => "gif",
371 Some(MimeType::Bmp) => "bmp",
372 Some(MimeType::Tiff) => "tiff",
373 _ => {
374 return Ok(None);
375 }
376 };
377
378 let name = format!("{}.{}", name, ext);
379
380 let part = multipart::Part::bytes(buffer).file_name(name.clone());
381 let form = multipart::Form::new().part("file", part);
382 let client = Client::new();
383
384 const URL: &str = "https://uploads.rocksky.app";
385
386 let response = client
387 .post(URL)
388 .header("Authorization", format!("Bearer {}", token))
389 .multipart(form)
390 .send()
391 .await?;
392
393 tracing::info!(status = %response.status(), "Cover uploaded");
394
395 Ok(Some(name))
396}
397
398pub async fn get_track_duration(path: &Path) -> Result<u64, Error> {
399 let duration = 0;
400 let media_source =
401 MediaSourceStream::new(Box::new(std::fs::File::open(path)?), Default::default());
402 let mut hint = Hint::new();
403
404 if let Some(extension) = path.extension() {
405 if let Some(extension) = extension.to_str() {
406 hint.with_extension(extension);
407 }
408 }
409
410 let meta_opts = MetadataOptions::default();
411 let format_opts = FormatOptions::default();
412
413 let probed =
414 match symphonia::default::get_probe().format(&hint, media_source, &format_opts, &meta_opts)
415 {
416 Ok(probed) => probed,
417 Err(e) => {
418 tracing::warn!(path = %path.display(), error = %e, "Failed to probe media");
419 return Ok(duration);
420 }
421 };
422
423 if let Some(track) = probed.format.tracks().first() {
424 if let Some(duration) = track.codec_params.n_frames {
425 if let Some(sample_rate) = track.codec_params.sample_rate {
426 return Ok((duration as f64 / sample_rate as f64) as u64 * 1000);
427 }
428 }
429 }
430 Ok(duration)
431}