tangled
alpha
login
or
join now
rocksky.app
/
rocksky
96
fork
atom
A decentralized music tracking and discovery platform built on AT Protocol 🎵
rocksky.app
spotify
atproto
lastfm
musicbrainz
scrobbling
listenbrainz
96
fork
atom
overview
issues
7
pulls
pipelines
rocksky: use the new analytics API
tsiry-sandratraina.com
1 year ago
2d300a2b
73aaf312
+1315
-243
26 changed files
expand all
collapse all
unified
split
Cargo.lock
crates
analytics
Cargo.toml
src
cmd
serve.rs
sync.rs
core.rs
handlers
albums.rs
artists.rs
mod.rs
scrobbles.rs
stats.rs
tracks.rs
main.rs
subscriber
mod.rs
types.rs
types
album.rs
artist.rs
stats.rs
xata
artist_album.rs
rockskyapi
rocksky-auth
.env.example
src
context.ts
lib
env.ts
lovedtracks
lovedtracks.service.ts
nowplaying
nowplaying.service.ts
users
app.ts
rockskyweb
src
pages
profile
overview
recenttracks
RecentTracks.tsx
types
scrobble.ts
+2
-1
Cargo.lock
···
277
277
"clap",
278
278
"dotenv",
279
279
"duckdb",
280
280
-
"futures-util",
281
280
"owo-colors",
282
281
"polars",
283
282
"serde",
284
283
"serde_json",
285
284
"sqlx",
286
285
"tokio",
286
286
+
"tokio-stream",
287
287
]
288
288
289
289
[[package]]
···
4519
4519
"futures-core",
4520
4520
"pin-project-lite",
4521
4521
"tokio",
4522
4522
+
"tokio-util",
4522
4523
]
4523
4524
4524
4525
[[package]]
+1
-1
crates/analytics/Cargo.toml
···
27
27
polars = "0.46.0"
28
28
clap = "4.5.31"
29
29
actix-web = "4.9.0"
30
30
-
futures-util = "0.3.31"
30
30
+
tokio-stream = { version = "0.1.17", features = ["full"] }
+11
-4
crates/analytics/src/cmd/serve.rs
···
1
1
use std::env;
2
2
3
3
-
use actix_web::{get, post, web::{self, Data}, App, HttpRequest, HttpServer, Responder};
3
3
+
use actix_web::{get, post, web::{self, Data}, App, HttpRequest, HttpResponse, HttpServer, Responder};
4
4
use duckdb::Connection;
5
5
use anyhow::Error;
6
6
use owo_colors::OwoColorize;
7
7
+
use serde_json::json;
7
8
use std::sync::{Arc, Mutex};
8
9
9
9
-
use crate::handlers::handle;
10
10
+
use crate::{handlers::handle, subscriber::subscribe};
10
11
12
12
+
// return json response
11
13
#[get("/")]
12
12
-
async fn index(_req: HttpRequest) -> String {
13
13
-
"Hello world!".to_owned()
14
14
+
async fn index(_req: HttpRequest) -> HttpResponse {
15
15
+
HttpResponse::Ok().json(json!({
16
16
+
"server": "Rocksky Analytics Server",
17
17
+
"version": "0.1.0",
18
18
+
}))
14
19
}
15
20
16
21
#[post("/{method}")]
···
28
33
29
34
30
35
pub async fn serve(conn: Arc<Mutex<Connection>>) -> Result<(), Error> {
36
36
+
subscribe(conn.clone()).await?;
37
37
+
31
38
let host = env::var("ANALYTICS_HOST").unwrap_or_else(|_| "127.0.0.1".to_string());
32
39
let port = env::var("ANALYTICS_PORT").unwrap_or_else(|_| "7879".to_string());
33
40
let addr = format!("{}:{}", host, port);
+1
crates/analytics/src/cmd/sync.rs
···
14
14
load_album_tracks(conn.clone(), pool).await?;
15
15
load_loved_tracks(conn.clone(), pool).await?;
16
16
load_artist_tracks(conn.clone(), pool).await?;
17
17
+
load_artist_albums(conn.clone(), pool).await?;
17
18
load_user_albums(conn.clone(), pool).await?;
18
19
load_user_artists(conn.clone(), pool).await?;
19
20
load_user_tracks(conn.clone(), pool).await?;
+41
-2
crates/analytics/src/core.rs
···
149
149
FOREIGN KEY (artist_id) REFERENCES artists(id),
150
150
FOREIGN KEY (track_id) REFERENCES tracks(id),
151
151
);
152
152
+
CREATE TABLE IF NOT EXISTS artist_albums (
153
153
+
id VARCHAR PRIMARY KEY,
154
154
+
artist_id VARCHAR,
155
155
+
album_id VARCHAR,
156
156
+
created_at TIMESTAMP,
157
157
+
FOREIGN KEY (artist_id) REFERENCES artists(id),
158
158
+
FOREIGN KEY (album_id) REFERENCES albums(id),
159
159
+
);
152
160
CREATE TABLE IF NOT EXISTS album_tracks (
153
161
id VARCHAR PRIMARY KEY,
154
162
album_id VARCHAR,
···
438
446
artist_id,
439
447
uri,
440
448
created_at
441
441
-
) VALUES (?,
449
449
+
) VALUES (
450
450
+
?,
442
451
?,
443
452
?,
444
453
?,
445
454
?,
446
455
?,
447
447
-
?)",
456
456
+
?
457
457
+
)",
448
458
params![
449
459
scrobble.xata_id,
450
460
scrobble.user_id,
···
558
568
559
569
println!("artist_tracks: {:?}", artist_tracks.len());
560
570
Ok(())
571
571
+
}
572
572
+
573
573
+
574
574
+
pub async fn load_artist_albums(conn: Arc<Mutex<Connection>>, pool: &Pool<Postgres>) -> Result<(), Error> {
575
575
+
let conn = conn.lock().unwrap();
576
576
+
let artist_albums: Vec<xata::artist_album::ArtistAlbum> = sqlx::query_as(r#"
577
577
+
SELECT * FROM artist_albums
578
578
+
"#)
579
579
+
.fetch_all(pool)
580
580
+
.await?;
581
581
+
582
582
+
for (i, artist_album) in artist_albums.clone().into_iter().enumerate() {
583
583
+
println!("artist_albums {} - {} - {}", i, artist_album.artist_id.bright_green(), artist_album.album_id);
584
584
+
match conn.execute(
585
585
+
"INSERT INTO artist_albums (id, artist_id, album_id, created_at) VALUES (?, ?, ?, ?)",
586
586
+
params![
587
587
+
artist_album.xata_id,
588
588
+
artist_album.artist_id,
589
589
+
artist_album.album_id,
590
590
+
artist_album.xata_createdat,
591
591
+
],
592
592
+
) {
593
593
+
Ok(_) => (),
594
594
+
Err(e) => println!("error: {}", e),
595
595
+
}
596
596
+
}
597
597
+
598
598
+
println!("artist_albums: {:?}", artist_albums.len());
599
599
+
Ok(())
561
600
}
562
601
563
602
pub async fn load_user_albums(conn: Arc<Mutex<Connection>>, pool: &Pool<Postgres>) -> Result<(), Error> {
+81
-15
crates/analytics/src/handlers/albums.rs
···
1
1
use std::sync::{Arc, Mutex};
2
2
3
3
use actix_web::{web, HttpRequest, HttpResponse};
4
4
-
use analytics::types::album::{Album, GetAlbumsParams, GetTopAlbumsParams};
4
4
+
use analytics::types::{album::{Album, GetAlbumTracksParams, GetAlbumsParams, GetTopAlbumsParams}, track::Track};
5
5
use duckdb::Connection;
6
6
use anyhow::Error;
7
7
-
use futures_util::StreamExt;
7
7
+
use tokio_stream::StreamExt;
8
8
9
9
use crate::read_payload;
10
10
···
23
23
SELECT a.* FROM user_albums ua
24
24
LEFT JOIN albums a ON ua.album_id = a.id
25
25
LEFT JOIN users u ON ua.user_id = u.id
26
26
-
WHERE u.did = ?
26
26
+
WHERE u.did = ? OR u.handle = ?
27
27
ORDER BY a.title ASC OFFSET ? LIMIT ?;
28
28
"#)?
29
29
},
···
34
34
35
35
match did {
36
36
Some(did) => {
37
37
-
let albums_iter = stmt.query_map([did, limit.to_string(), offset.to_string()], |row| {
37
37
+
let albums_iter = stmt.query_map([&did, &did, &limit.to_string(), &offset.to_string()], |row| {
38
38
Ok(Album {
39
39
id: row.get(0)?,
40
40
title: row.get(1)?,
···
101
101
a.album_art AS album_art,
102
102
a.release_date,
103
103
a.year,
104
104
-
a.uri AS uri,
104
104
+
a.uri,
105
105
+
a.sha256,
105
106
COUNT(*) AS play_count,
106
107
COUNT(DISTINCT s.user_id) AS unique_listeners
107
108
FROM
···
112
113
artists ar ON a.artist_uri = ar.uri
113
114
LEFT JOIN
114
115
users u ON s.user_id = u.id
115
115
-
WHERE s.album_id IS NOT NULL AND u.did = ?
116
116
+
WHERE s.album_id IS NOT NULL AND (u.did = ? OR u.handle = ?)
116
117
GROUP BY
117
117
-
s.album_id, a.title, ar.name, a.release_date, a.year, a.uri, a.album_art
118
118
+
s.album_id, a.title, ar.name, a.release_date, a.year, a.uri, a.album_art, a.sha256
118
119
ORDER BY
119
120
play_count DESC
120
121
OFFSET ?
···
128
129
a.album_art AS album_art,
129
130
a.release_date,
130
131
a.year,
131
131
-
a.uri AS uri,
132
132
+
a.uri,
133
133
+
a.sha256,
132
134
COUNT(*) AS play_count,
133
135
COUNT(DISTINCT s.user_id) AS unique_listeners
134
136
FROM
···
138
140
LEFT JOIN
139
141
artists ar ON a.artist_uri = ar.uri WHERE s.album_id IS NOT NULL
140
142
GROUP BY
141
141
-
s.album_id, a.title, ar.name, a.release_date, a.year, a.uri, a.album_art
143
143
+
s.album_id, a.title, ar.name, a.release_date, a.year, a.uri, a.album_art, a.sha256
142
144
ORDER BY
143
145
play_count DESC
144
146
OFFSET ?
···
148
150
149
151
match did {
150
152
Some(did) => {
151
151
-
let albums = stmt.query_map([did, limit.to_string(), offset.to_string()], |row| {
153
153
+
let albums = stmt.query_map([&did, &did, &limit.to_string(), &offset.to_string()], |row| {
152
154
Ok(Album {
153
155
id: row.get(0)?,
154
156
title: row.get(1)?,
···
157
159
release_date: row.get(4)?,
158
160
year: row.get(5)?,
159
161
uri: row.get(6)?,
160
160
-
play_count: Some(row.get(7)?),
161
161
-
unique_listeners: Some(row.get(8)?),
162
162
+
sha256: row.get(7)?,
163
163
+
play_count: Some(row.get(8)?),
164
164
+
unique_listeners: Some(row.get(9)?),
162
165
..Default::default()
163
166
})
164
167
})?;
···
175
178
release_date: row.get(4)?,
176
179
year: row.get(5)?,
177
180
uri: row.get(6)?,
178
178
-
play_count: Some(row.get(7)?),
179
179
-
unique_listeners: Some(row.get(8)?),
181
181
+
sha256: row.get(7)?,
182
182
+
play_count: Some(row.get(8)?),
183
183
+
unique_listeners: Some(row.get(9)?),
180
184
..Default::default()
181
185
})
182
186
})?;
···
184
188
Ok(HttpResponse::Ok().json(web::Json(albums?)))
185
189
}
186
190
}
187
187
-
}
191
191
+
}
192
192
+
193
193
+
pub async fn get_album_tracks(payload: &mut web::Payload, _req: &HttpRequest, conn: Arc<Mutex<Connection>>) -> Result<HttpResponse, Error> {
194
194
+
let body = read_payload!(payload);
195
195
+
let params = serde_json::from_slice::<GetAlbumTracksParams>(&body)?;
196
196
+
let conn = conn.lock().unwrap();
197
197
+
let mut stmt = conn.prepare(r#"
198
198
+
SELECT
199
199
+
t.id,
200
200
+
t.title,
201
201
+
t.artist,
202
202
+
t.album_artist,
203
203
+
t.album,
204
204
+
t.uri,
205
205
+
t.album_art,
206
206
+
t.duration,
207
207
+
t.disc_number,
208
208
+
t.track_number,
209
209
+
t.artist_uri,
210
210
+
t.album_uri,
211
211
+
t.sha256,
212
212
+
t.copyright_message,
213
213
+
t.label,
214
214
+
t.created_at,
215
215
+
COUNT(*) AS play_count,
216
216
+
COUNT(DISTINCT s.user_id) AS unique_listeners
217
217
+
FROM album_tracks at
218
218
+
LEFT JOIN tracks t ON at.track_id = t.id
219
219
+
LEFT JOIN albums a ON at.album_id = a.id
220
220
+
LEFT JOIN scrobbles s ON s.track_id = t.id
221
221
+
WHERE at.album_id = ? OR a.uri = ?
222
222
+
GROUP BY
223
223
+
t.id, t.title, t.artist, t.album_artist, t.album, t.uri, t.album_art, t.duration, t.disc_number, t.track_number, t.artist_uri, t.album_uri, t.sha256, t.copyright_message, t.label, t.created_at
224
224
+
ORDER BY t.track_number ASC;
225
225
+
"#)?;
226
226
+
227
227
+
let tracks = stmt.query_map([¶ms.album_id, ¶ms.album_id], |row| {
228
228
+
Ok(Track {
229
229
+
id: row.get(0)?,
230
230
+
title: row.get(1)?,
231
231
+
artist: row.get(2)?,
232
232
+
album_artist: row.get(3)?,
233
233
+
album: row.get(4)?,
234
234
+
uri: row.get(5)?,
235
235
+
album_art: row.get(6)?,
236
236
+
duration: row.get(7)?,
237
237
+
disc_number: row.get(8)?,
238
238
+
track_number: row.get(9)?,
239
239
+
artist_uri: row.get(10)?,
240
240
+
album_uri: row.get(11)?,
241
241
+
sha256: row.get(12)?,
242
242
+
copyright_message: row.get(13)?,
243
243
+
label: row.get(14)?,
244
244
+
created_at: row.get(15)?,
245
245
+
play_count: Some(row.get(16)?),
246
246
+
unique_listeners: Some(row.get(17)?),
247
247
+
..Default::default()
248
248
+
})
249
249
+
})?;
250
250
+
251
251
+
let tracks: Result<Vec<_>, _> = tracks.collect();
252
252
+
Ok(HttpResponse::Ok().json(web::Json(tracks?)))
253
253
+
}
+128
-8
crates/analytics/src/handlers/artists.rs
···
1
1
use std::sync::{Arc, Mutex};
2
2
3
3
use actix_web::{web, HttpRequest, HttpResponse};
4
4
-
use analytics::types::artist::{Artist, GetTopArtistsParams};
4
4
+
use analytics::types::{album::Album, artist::{Artist, GetArtistAlbumsParams, GetArtistTracksParams, GetArtistsParams, GetTopArtistsParams}, track::Track};
5
5
use duckdb::Connection;
6
6
use anyhow::Error;
7
7
-
use futures_util::StreamExt;
7
7
+
use tokio_stream::StreamExt;
8
8
9
9
-
use crate::{read_payload, types::artist::GetArtistsParams};
9
9
+
use crate::read_payload;
10
10
11
11
pub async fn get_artists(payload: &mut web::Payload, _req: &HttpRequest, conn: Arc<Mutex<Connection>>) -> Result<HttpResponse, Error> {
12
12
let body = read_payload!(payload);
···
23
23
SELECT a.* FROM user_artists ua
24
24
LEFT JOIN artists a ON ua.artist_id = a.id
25
25
LEFT JOIN users u ON ua.user_id = u.id
26
26
-
WHERE u.did = ?
26
26
+
WHERE u.did = ? OR u.handle = ?
27
27
ORDER BY a.name ASC OFFSET ? LIMIT ?;
28
28
"#)?
29
29
},
···
34
34
35
35
match did {
36
36
Some(did) => {
37
37
-
let artists = stmt.query_map([did, limit.to_string(), offset.to_string()], |row| {
37
37
+
let artists = stmt.query_map([&did, &did, &limit.to_string(), &offset.to_string()], |row| {
38
38
Ok(Artist {
39
39
id: row.get(0)?,
40
40
name: row.get(1)?,
···
111
111
LEFT JOIN
112
112
users u ON s.user_id = u.id
113
113
WHERE
114
114
-
s.artist_id IS NOT NULL AND u.did = ?
114
114
+
s.artist_id IS NOT NULL AND (u.did = ? OR u.handle = ?)
115
115
GROUP BY
116
116
s.artist_id, ar.name, ar.uri, ar.picture, ar.sha256
117
117
ORDER BY
···
148
148
149
149
match did {
150
150
Some(did) => {
151
151
-
let artists = stmt.query_map([did, limit.to_string(), offset.to_string()], |row| {
151
151
+
let artists = stmt.query_map([&did, &did, &limit.to_string(), &offset.to_string()], |row| {
152
152
Ok(Artist {
153
153
id: row.get(0)?,
154
154
name: row.get(1)?,
···
196
196
Ok(HttpResponse::Ok().json(artists?))
197
197
}
198
198
}
199
199
-
}
199
199
+
}
200
200
+
201
201
+
pub async fn get_artist_tracks(payload: &mut web::Payload, _req: &HttpRequest, conn: Arc<Mutex<Connection>>) -> Result<HttpResponse, Error> {
202
202
+
let body = read_payload!(payload);
203
203
+
let params = serde_json::from_slice::<GetArtistTracksParams>(&body)?;
204
204
+
let pagination = params.pagination.unwrap_or_default();
205
205
+
let offset = pagination.skip.unwrap_or(0);
206
206
+
let limit = pagination.take.unwrap_or(20);
207
207
+
let conn = conn.lock().unwrap();
208
208
+
209
209
+
let mut stmt = conn.prepare(r#"
210
210
+
SELECT
211
211
+
t.id,
212
212
+
t.title,
213
213
+
t.artist,
214
214
+
t.album_artist,
215
215
+
t.album,
216
216
+
t.uri,
217
217
+
t.album_art,
218
218
+
t.duration,
219
219
+
t.disc_number,
220
220
+
t.track_number,
221
221
+
t.artist_uri,
222
222
+
t.album_uri,
223
223
+
t.sha256,
224
224
+
t.copyright_message,
225
225
+
t.label,
226
226
+
t.created_at,
227
227
+
COUNT(*) AS play_count,
228
228
+
COUNT(DISTINCT s.user_id) AS unique_listeners
229
229
+
FROM artist_tracks at
230
230
+
LEFT JOIN tracks t ON at.track_id = t.id
231
231
+
LEFT JOIN artists a ON at.artist_id = a.id
232
232
+
LEFT JOIN scrobbles s ON s.track_id = t.id
233
233
+
WHERE at.artist_id = ? OR a.uri = ?
234
234
+
GROUP BY
235
235
+
t.id, t.title, t.artist, t.album_artist, t.album, t.uri, t.album_art, t.duration, t.disc_number, t.track_number, t.artist_uri, t.album_uri, t.sha256, t.copyright_message, t.label, t.created_at
236
236
+
ORDER BY play_count DESC
237
237
+
OFFSET ?
238
238
+
LIMIT ?;
239
239
+
"#)?;
240
240
+
241
241
+
let tracks = stmt.query_map([¶ms.artist_id, ¶ms.artist_id, &limit.to_string(), &offset.to_string()], |row| {
242
242
+
Ok(Track {
243
243
+
id: row.get(0)?,
244
244
+
title: row.get(1)?,
245
245
+
artist: row.get(2)?,
246
246
+
album_artist: row.get(3)?,
247
247
+
album: row.get(4)?,
248
248
+
uri: row.get(5)?,
249
249
+
album_art: row.get(6)?,
250
250
+
duration: row.get(7)?,
251
251
+
disc_number: row.get(8)?,
252
252
+
track_number: row.get(9)?,
253
253
+
artist_uri: row.get(10)?,
254
254
+
album_uri: row.get(11)?,
255
255
+
sha256: row.get(12)?,
256
256
+
copyright_message: row.get(13)?,
257
257
+
label: row.get(14)?,
258
258
+
created_at: row.get(15)?,
259
259
+
play_count: Some(row.get(16)?),
260
260
+
unique_listeners: Some(row.get(17)?),
261
261
+
..Default::default()
262
262
+
})
263
263
+
})?;
264
264
+
265
265
+
let tracks: Result<Vec<_>, _> = tracks.collect();
266
266
+
Ok(HttpResponse::Ok().json(tracks?))
267
267
+
}
268
268
+
269
269
+
pub async fn get_artist_albums(payload: &mut web::Payload, _req: &HttpRequest, conn: Arc<Mutex<Connection>>) -> Result<HttpResponse, Error> {
270
270
+
let body = read_payload!(payload);
271
271
+
let params = serde_json::from_slice::<GetArtistAlbumsParams>(&body)?;
272
272
+
let conn = conn.lock().unwrap();
273
273
+
274
274
+
let mut stmt = conn.prepare(r#"
275
275
+
SELECT
276
276
+
al.id,
277
277
+
al.title,
278
278
+
al.artist,
279
279
+
al.album_art,
280
280
+
al.release_date,
281
281
+
al.year,
282
282
+
al.uri,
283
283
+
al.sha256,
284
284
+
al.artist_uri,
285
285
+
COUNT(*) AS play_count,
286
286
+
COUNT(DISTINCT s.user_id) AS unique_listeners
287
287
+
FROM
288
288
+
artist_albums aa
289
289
+
LEFT JOIN artists ar ON aa.artist_id = ar.id
290
290
+
LEFT JOIN albums al ON aa.album_id = al.id
291
291
+
LEFT JOIN scrobbles s ON aa.album_id = s.album_id
292
292
+
WHERE ar.id = ? OR ar.uri = ?
293
293
+
GROUP BY al.id, al.title, al.artist, al.album_art, al.release_date, al.year, al.uri, al.sha256, al.artist_uri
294
294
+
ORDER BY play_count DESC;
295
295
+
"#)?;
296
296
+
297
297
+
let albums = stmt.query_map([¶ms.artist_id, ¶ms.artist_id], |row| {
298
298
+
Ok(Album {
299
299
+
id: row.get(0)?,
300
300
+
title: row.get(1)?,
301
301
+
artist: row.get(2)?,
302
302
+
release_date: row.get(3)?,
303
303
+
album_art: row.get(4)?,
304
304
+
year: row.get(5)?,
305
305
+
spotify_link: None,
306
306
+
tidal_link: None,
307
307
+
youtube_link: None,
308
308
+
apple_music_link: None,
309
309
+
sha256: row.get(7)?,
310
310
+
uri: row.get(6)?,
311
311
+
artist_uri: row.get(8)?,
312
312
+
play_count: Some(row.get(9)?),
313
313
+
unique_listeners: Some(row.get(10)?),
314
314
+
})
315
315
+
})?;
316
316
+
317
317
+
let albums: Result<Vec<_>, _> = albums.collect();
318
318
+
Ok(HttpResponse::Ok().json(albums?))
319
319
+
}
+9
-4
crates/analytics/src/handlers/mod.rs
···
1
1
use std::sync::{Arc, Mutex};
2
2
3
3
use actix_web::{web, HttpRequest, HttpResponse};
4
4
-
use albums::{get_albums, get_top_albums};
5
5
-
use artists::{get_artists, get_top_artists};
4
4
+
use albums::{get_album_tracks, get_albums, get_top_albums};
5
5
+
use artists::{get_artist_albums, get_artist_tracks, get_artists, get_top_artists};
6
6
use duckdb::Connection;
7
7
use scrobbles::get_scrobbles;
8
8
-
use stats::{get_scrobbles_per_day, get_scrobbles_per_month, get_scrobbles_per_year, get_stats};
8
8
+
use stats::{get_album_scrobbles, get_artist_scrobbles, get_scrobbles_per_day, get_scrobbles_per_month, get_scrobbles_per_year, get_stats, get_track_scrobbles};
9
9
use tracks::{get_loved_tracks, get_top_tracks, get_tracks};
10
10
use anyhow::Error;
11
11
···
29
29
body
30
30
}};
31
31
}
32
32
-
33
32
34
33
pub async fn handle(method: &str, payload: &mut web::Payload, req: &HttpRequest, conn: Arc<Mutex<Connection>>) -> Result<HttpResponse, Error> {
35
34
match method {
···
45
44
"library.getScrobblesPerDay" => get_scrobbles_per_day(payload, req, conn.clone()).await,
46
45
"library.getScrobblesPerMonth" => get_scrobbles_per_month(payload, req, conn.clone()).await,
47
46
"library.getScrobblesPerYear" => get_scrobbles_per_year(payload, req, conn.clone()).await,
47
47
+
"library.getAlbumScrobbles" => get_album_scrobbles(payload, req, conn.clone()).await,
48
48
+
"library.getArtistScrobbles" => get_artist_scrobbles(payload, req, conn.clone()).await,
49
49
+
"library.getTrackScrobbles" => get_track_scrobbles(payload, req, conn.clone()).await,
50
50
+
"library.getAlbumTracks" => get_album_tracks(payload, req, conn.clone()).await,
51
51
+
"library.getArtistAlbums" => get_artist_albums(payload, req, conn.clone()).await,
52
52
+
"library.getArtistTracks" => get_artist_tracks(payload, req, conn.clone()).await,
48
53
_ => return Err(anyhow::anyhow!("Method not found")),
49
54
}
50
55
}
+3
-3
crates/analytics/src/handlers/scrobbles.rs
···
4
4
use analytics::types::scrobble::{GetScrobblesParams, ScrobbleTrack};
5
5
use duckdb::Connection;
6
6
use anyhow::Error;
7
7
-
use futures_util::StreamExt;
7
7
+
use tokio_stream::StreamExt;
8
8
9
9
use crate::read_payload;
10
10
···
38
38
LEFT JOIN albums al ON s.album_id = al.id
39
39
LEFT JOIN tracks t ON s.track_id = t.id
40
40
LEFT JOIN users u ON s.user_id = u.id
41
41
-
WHERE u.did = ?
41
41
+
WHERE u.did = ? OR u.handle = ?
42
42
GROUP BY s.id, s.created_at, t.id, t.title, t.artist, t.album_artist, t.album, t.album_art, s.uri, t.uri, u.handle, a.uri, al.uri, s.created_at
43
43
ORDER BY s.created_at DESC
44
44
OFFSET ?
···
72
72
};
73
73
match did {
74
74
Some(did) => {
75
75
-
let scrobbles = stmt.query_map([did, limit.to_string(), offset.to_string()], |row| {
75
75
+
let scrobbles = stmt.query_map([&did, &did, &limit.to_string(), &offset.to_string()], |row| {
76
76
Ok(ScrobbleTrack {
77
77
id: row.get(0)?,
78
78
track_id: row.get(1)?,
+132
-18
crates/analytics/src/handlers/stats.rs
···
1
1
use std::sync::{Arc, Mutex};
2
2
3
3
use actix_web::{web, HttpRequest, HttpResponse};
4
4
-
use analytics::types::{scrobble::{ScrobblesPerDay, ScrobblesPerMonth, ScrobblesPerYear}, stats::{GetScrobblesPerDayParams, GetScrobblesPerMonthParams, GetScrobblesPerYearParams, GetStatsParams}};
4
4
+
use analytics::types::{scrobble::{ScrobblesPerDay, ScrobblesPerMonth, ScrobblesPerYear}, stats::{GetAlbumScrobblesParams, GetArtistScrobblesParams, GetScrobblesPerDayParams, GetScrobblesPerMonthParams, GetScrobblesPerYearParams, GetStatsParams, GetTrackScrobblesParams}};
5
5
use duckdb::Connection;
6
6
use anyhow::Error;
7
7
use serde_json::json;
8
8
-
use futures_util::StreamExt;
8
8
+
use tokio_stream::StreamExt;
9
9
use crate::read_payload;
10
10
11
11
pub async fn get_stats(payload: &mut web::Payload, _req: &HttpRequest, conn: Arc<Mutex<Connection>>) -> Result<HttpResponse, Error> {
···
13
13
let params = serde_json::from_slice::<GetStatsParams>(&body)?;
14
14
15
15
let conn = conn.lock().unwrap();
16
16
-
let mut stmt = conn.prepare("SELECT COUNT(*) FROM scrobbles s LEFT JOIN users u ON s.user_id = u.id WHERE u.did = ?")?;
17
17
-
let scrobbles: i64 = stmt.query_row([¶ms.user_did], |row| row.get(0))?;
16
16
+
let mut stmt = conn.prepare("SELECT COUNT(*) FROM scrobbles s LEFT JOIN users u ON s.user_id = u.id WHERE u.did = ? OR u.handle = ?")?;
17
17
+
let scrobbles: i64 = stmt.query_row([¶ms.user_did, ¶ms.user_did], |row| row.get(0))?;
18
18
19
19
-
let mut stmt = conn.prepare("SELECT COUNT(*) FROM user_artists LEFT JOIN users u ON user_artists.user_id = u.id WHERE u.did = ?")?;
20
20
-
let artists: i64 = stmt.query_row([¶ms.user_did], |row| row.get(0))?;
19
19
+
let mut stmt = conn.prepare("SELECT COUNT(*) FROM user_artists LEFT JOIN users u ON user_artists.user_id = u.id WHERE u.did = ? OR u.handle = ?")?;
20
20
+
let artists: i64 = stmt.query_row([¶ms.user_did, ¶ms.user_did], |row| row.get(0))?;
21
21
22
22
-
let mut stmt = conn.prepare("SELECT COUNT(*) FROM loved_tracks LEFT JOIN users u ON loved_tracks.user_id = u.id WHERE u.did = ?")?;
23
23
-
let loved_tracks: i64 = stmt.query_row([¶ms.user_did], |row| row.get(0))?;
22
22
+
let mut stmt = conn.prepare("SELECT COUNT(*) FROM loved_tracks LEFT JOIN users u ON loved_tracks.user_id = u.id WHERE u.did = ? OR u.handle = ?")?;
23
23
+
let loved_tracks: i64 = stmt.query_row([¶ms.user_did, ¶ms.user_did], |row| row.get(0))?;
24
24
25
25
-
let mut stmt = conn.prepare("SELECT COUNT(*) FROM user_albums LEFT JOIN users u ON user_albums.user_id = u.id WHERE u.did = ?")?;
26
26
-
let albums: i64 = stmt.query_row([¶ms.user_did], |row| row.get(0))?;
25
25
+
let mut stmt = conn.prepare("SELECT COUNT(*) FROM user_albums LEFT JOIN users u ON user_albums.user_id = u.id WHERE u.did = ? OR u.handle = ?")?;
26
26
+
let albums: i64 = stmt.query_row([¶ms.user_did, ¶ms.user_did], |row| row.get(0))?;
27
27
28
28
-
let mut stmt = conn.prepare("SELECT COUNT(*) FROM user_tracks LEFT JOIN users u ON user_tracks.user_id = u.id WHERE u.did = ?")?;
29
29
-
let tracks: i64 = stmt.query_row([¶ms.user_did], |row| row.get(0))?;
28
28
+
let mut stmt = conn.prepare("SELECT COUNT(*) FROM user_tracks LEFT JOIN users u ON user_tracks.user_id = u.id WHERE u.did = ? OR u.handle = ?")?;
29
29
+
let tracks: i64 = stmt.query_row([¶ms.user_did, ¶ms.user_did], |row| row.get(0))?;
30
30
31
31
Ok(HttpResponse::Ok().json(json!({
32
32
"scrobbles": scrobbles,
···
55
55
scrobbles
56
56
LEFT JOIN users u ON scrobbles.user_id = u.id
57
57
WHERE
58
58
-
u.did = ?
58
58
+
u.did = ? OR u.handle = ?
59
59
AND created_at BETWEEN ? AND ?
60
60
GROUP BY
61
61
date_trunc('day', created_at)
62
62
ORDER BY
63
63
date;
64
64
"#)?;
65
65
-
let scrobbles = stmt.query_map([did, start, end], |row| {
65
65
+
let scrobbles = stmt.query_map([&did, &did, &start, &end], |row| {
66
66
Ok(ScrobblesPerDay {
67
67
date: row.get(0)?,
68
68
count: row.get(1)?,
···
116
116
scrobbles
117
117
LEFT JOIN users u ON scrobbles.user_id = u.id
118
118
WHERE
119
119
-
u.did = ?
119
119
+
u.did = ? OR u.handle = ?
120
120
AND created_at BETWEEN ? AND ?
121
121
GROUP BY
122
122
EXTRACT(YEAR FROM created_at),
···
124
124
ORDER BY
125
125
year_month;
126
126
"#)?;
127
127
-
let scrobbles = stmt.query_map([did, start, end], |row| {
127
127
+
let scrobbles = stmt.query_map([&did, &did, &start, &end], |row| {
128
128
Ok(ScrobblesPerMonth {
129
129
year_month: row.get(0)?,
130
130
count: row.get(1)?,
···
179
179
scrobbles
180
180
LEFT JOIN users u ON scrobbles.user_id = u.id
181
181
WHERE
182
182
-
u.did = ?
182
182
+
u.did = ? OR u.handle = ?
183
183
AND created_at BETWEEN ? AND ?
184
184
GROUP BY
185
185
EXTRACT(YEAR FROM created_at)
186
186
ORDER BY
187
187
year;
188
188
"#)?;
189
189
-
let scrobbles = stmt.query_map([did, start, end], |row| {
189
189
+
let scrobbles = stmt.query_map([&did, &did, &start, &end], |row| {
190
190
Ok(ScrobblesPerYear {
191
191
year: row.get(0)?,
192
192
count: row.get(1)?,
···
220
220
}
221
221
}
222
222
}
223
223
+
224
224
+
pub async fn get_album_scrobbles(payload: &mut web::Payload, _req: &HttpRequest, conn: Arc<Mutex<Connection>>) -> Result<HttpResponse, Error> {
225
225
+
let body = read_payload!(payload);
226
226
+
let params = serde_json::from_slice::<GetAlbumScrobblesParams>(&body)?;
227
227
+
let start = params.start.unwrap_or(GetAlbumScrobblesParams::default().start.unwrap());
228
228
+
let end = params.end.unwrap_or(GetAlbumScrobblesParams::default().end.unwrap());
229
229
+
let conn = conn.lock().unwrap();
230
230
+
let mut stmt = conn.prepare(r#"
231
231
+
SELECT
232
232
+
date_trunc('day', s.created_at) AS date,
233
233
+
COUNT(s.album_id) AS count
234
234
+
FROM
235
235
+
scrobbles s
236
236
+
LEFT JOIN albums a ON s.album_id = a.id
237
237
+
WHERE
238
238
+
a.id = ? OR a.uri = ?
239
239
+
AND s.created_at BETWEEN ? AND ?
240
240
+
GROUP BY
241
241
+
date_trunc('day', s.created_at)
242
242
+
ORDER BY
243
243
+
date;
244
244
+
"#)?;
245
245
+
let scrobbles = stmt.query_map([
246
246
+
¶ms.album_id,
247
247
+
¶ms.album_id,
248
248
+
&start,
249
249
+
&end
250
250
+
], |row| {
251
251
+
Ok(ScrobblesPerDay {
252
252
+
date: row.get(0)?,
253
253
+
count: row.get(1)?,
254
254
+
})
255
255
+
})?;
256
256
+
let scrobbles: Result<Vec<_>, _> = scrobbles.collect();
257
257
+
Ok(HttpResponse::Ok().json(scrobbles?))
258
258
+
}
259
259
+
260
260
+
pub async fn get_artist_scrobbles(payload: &mut web::Payload, _req: &HttpRequest, conn: Arc<Mutex<Connection>>) -> Result<HttpResponse, Error> {
261
261
+
let body = read_payload!(payload);
262
262
+
let params = serde_json::from_slice::<GetArtistScrobblesParams>(&body)?;
263
263
+
let start = params.start.unwrap_or(GetArtistScrobblesParams::default().start.unwrap());
264
264
+
let end = params.end.unwrap_or(GetArtistScrobblesParams::default().end.unwrap());
265
265
+
let conn = conn.lock().unwrap();
266
266
+
267
267
+
let mut stmt = conn.prepare(r#"
268
268
+
SELECT
269
269
+
date_trunc('day', s.created_at) AS date,
270
270
+
COUNT(s.artist_id) AS count
271
271
+
FROM
272
272
+
scrobbles s
273
273
+
LEFT JOIN artists a ON s.artist_id = a.id
274
274
+
WHERE
275
275
+
a.id = ? OR a.uri = ?
276
276
+
AND s.created_at BETWEEN ? AND ?
277
277
+
GROUP BY
278
278
+
date_trunc('day', s.created_at)
279
279
+
ORDER BY
280
280
+
date;
281
281
+
"#)?;
282
282
+
283
283
+
let scrobbles = stmt.query_map([
284
284
+
¶ms.artist_id,
285
285
+
¶ms.artist_id,
286
286
+
&start,
287
287
+
&end
288
288
+
], |row| {
289
289
+
Ok(ScrobblesPerDay {
290
290
+
date: row.get(0)?,
291
291
+
count: row.get(1)?,
292
292
+
})
293
293
+
})?;
294
294
+
295
295
+
let scrobbles: Result<Vec<_>, _> = scrobbles.collect();
296
296
+
Ok(HttpResponse::Ok().json(scrobbles?))
297
297
+
}
298
298
+
299
299
+
pub async fn get_track_scrobbles(payload: &mut web::Payload, _req: &HttpRequest, conn: Arc<Mutex<Connection>>) -> Result<HttpResponse, Error> {
300
300
+
let body = read_payload!(payload);
301
301
+
let params = serde_json::from_slice::<GetTrackScrobblesParams>(&body)?;
302
302
+
let start = params.start.unwrap_or(GetTrackScrobblesParams::default().start.unwrap());
303
303
+
let end = params.end.unwrap_or(GetTrackScrobblesParams::default().end.unwrap());
304
304
+
let conn = conn.lock().unwrap();
305
305
+
306
306
+
let mut stmt = conn.prepare(r#"
307
307
+
SELECT
308
308
+
date_trunc('day', s.created_at) AS date,
309
309
+
COUNT(s.track_id) AS count
310
310
+
FROM
311
311
+
scrobbles s
312
312
+
LEFT JOIN tracks t ON s.track_id = t.id
313
313
+
WHERE
314
314
+
t.id = ? OR t.uri = ?
315
315
+
AND s.created_at BETWEEN ? AND ?
316
316
+
GROUP BY
317
317
+
date_trunc('day', s.created_at)
318
318
+
ORDER BY
319
319
+
date;
320
320
+
"#)?;
321
321
+
322
322
+
let scrobbles = stmt.query_map([
323
323
+
¶ms.track_id,
324
324
+
¶ms.track_id,
325
325
+
&start,
326
326
+
&end
327
327
+
], |row| {
328
328
+
Ok(ScrobblesPerDay {
329
329
+
date: row.get(0)?,
330
330
+
count: row.get(1)?,
331
331
+
})
332
332
+
})?;
333
333
+
334
334
+
let scrobbles: Result<Vec<_>, _> = scrobbles.collect();
335
335
+
Ok(HttpResponse::Ok().json(scrobbles?))
336
336
+
}
+7
-7
crates/analytics/src/handlers/tracks.rs
···
4
4
use analytics::types::track::{GetLovedTracksParams, GetTopTracksParams, GetTracksParams, Track};
5
5
use duckdb::Connection;
6
6
use anyhow::Error;
7
7
-
use futures_util::StreamExt;
7
7
+
use tokio_stream::StreamExt;
8
8
9
9
use crate::read_payload;
10
10
···
48
48
FROM tracks t
49
49
LEFT JOIN user_tracks ut ON t.id = ut.track_id
50
50
LEFT JOIN users u ON ut.user_id = u.id
51
51
-
WHERE u.did = ?
51
51
+
WHERE u.did = ? OR u.handle = ?
52
52
ORDER BY t.title ASC
53
53
OFFSET ?
54
54
LIMIT ?;
55
55
"#)?;
56
56
-
let tracks = stmt.query_map([did, limit.to_string(), offset.to_string()], |row| {
56
56
+
let tracks = stmt.query_map([&did, &did, &limit.to_string(), &offset.to_string()], |row| {
57
57
Ok(Track {
58
58
id: row.get(0)?,
59
59
title: row.get(1)?,
···
186
186
FROM loved_tracks l
187
187
LEFT JOIN users u ON l.user_id = u.id
188
188
LEFT JOIN tracks t ON l.track_id = t.id
189
189
-
WHERE u.did = ?
189
189
+
WHERE u.did = ? OR u.handle = ?
190
190
ORDER BY l.created_at DESC
191
191
OFFSET ?
192
192
LIMIT ?;
193
193
"#)?;
194
194
-
let loved_tracks = stmt.query_map([did, limit.to_string(), offset.to_string()], |row| {
194
194
+
let loved_tracks = stmt.query_map([&did, &did, &limit.to_string(), &offset.to_string()], |row| {
195
195
Ok(Track {
196
196
id: row.get(0)?,
197
197
title: row.get(1)?,
···
257
257
LEFT JOIN artists ar ON s.artist_id = ar.id
258
258
LEFT JOIN albums a ON s.album_id = a.id
259
259
LEFT JOIN users u ON s.user_id = u.id
260
260
-
WHERE u.did = ?
260
260
+
WHERE u.did = ? OR u.handle = ?
261
261
GROUP BY t.id, s.track_id, t.title, ar.name, a.title, t.artist, t.uri, t.album_art, t.duration, t.disc_number, t.track_number, t.artist_uri, t.album_uri, t.created_at, t.sha256, t.album_artist, t.album
262
262
ORDER BY play_count DESC
263
263
OFFSET ?
264
264
LIMIT ?;
265
265
"#)?;
266
266
-
let top_tracks = stmt.query_map([did, limit.to_string(), offset.to_string()], |row| {
266
266
+
let top_tracks = stmt.query_map([&did, &did, &limit.to_string(), &offset.to_string()], |row| {
267
267
Ok(Track {
268
268
id: row.get(0)?,
269
269
title: row.get(1)?,
+1
crates/analytics/src/main.rs
···
12
12
pub mod cmd;
13
13
pub mod core;
14
14
pub mod handlers;
15
15
+
pub mod subscriber;
15
16
16
17
fn cli() -> Command {
17
18
Command::new("analytics")
+481
crates/analytics/src/subscriber/mod.rs
···
1
1
+
use std::{env, sync::{Arc, Mutex}, thread};
2
2
+
use anyhow::Error;
3
3
+
use async_nats::{connect, Client};
4
4
+
use duckdb::{params, Connection};
5
5
+
use owo_colors::OwoColorize;
6
6
+
use tokio_stream::StreamExt;
7
7
+
use types::{LikePayload, ScrobblePayload, UnlikePayload};
8
8
+
9
9
+
pub mod types;
10
10
+
11
11
+
pub async fn subscribe(conn: Arc<Mutex<Connection>>) -> Result<(), Error> {
12
12
+
let addr = env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".to_string());
13
13
+
let conn = conn.clone();
14
14
+
let nc = connect(&addr).await?;
15
15
+
println!("Connected to NATS server at {}", addr.bright_green());
16
16
+
17
17
+
let nc = Arc::new(Mutex::new(nc));
18
18
+
on_scrobble(nc.clone(), conn.clone());
19
19
+
on_like(nc.clone(), conn.clone());
20
20
+
on_unlike(nc.clone(), conn.clone());
21
21
+
22
22
+
Ok(())
23
23
+
}
24
24
+
25
25
+
pub fn on_scrobble(nc: Arc<Mutex<Client>>, conn: Arc<Mutex<Connection>>) {
26
26
+
thread::spawn(move || {
27
27
+
let rt = tokio::runtime::Runtime::new().unwrap();
28
28
+
let conn = conn.clone();
29
29
+
let nc = nc.clone();
30
30
+
rt.block_on(async {
31
31
+
let nc = nc.lock().unwrap();
32
32
+
let mut sub = nc.subscribe("rocksky.scrobble".to_string()).await?;
33
33
+
drop(nc);
34
34
+
35
35
+
while let Some(msg) = sub.next().await {
36
36
+
let data = String::from_utf8(msg.payload.to_vec()).unwrap();
37
37
+
match serde_json::from_str::<ScrobblePayload>(&data) {
38
38
+
Ok(payload) => {
39
39
+
match save_scrobble(conn.clone(), payload.clone()).await {
40
40
+
Ok(_) => println!("Scrobble saved successfully for {}", payload.scrobble.uri.cyan()),
41
41
+
Err(e) => eprintln!("Error saving scrobble: {}", e),
42
42
+
}
43
43
+
},
44
44
+
Err(e) => {
45
45
+
eprintln!("Error parsing payload: {}", e);
46
46
+
}
47
47
+
}
48
48
+
}
49
49
+
50
50
+
Ok::<(), Error>(())
51
51
+
})?;
52
52
+
53
53
+
Ok::<(), Error>(())
54
54
+
});
55
55
+
}
56
56
+
57
57
+
58
58
+
pub fn on_like(nc: Arc<Mutex<Client>>, conn: Arc<Mutex<Connection>>) {
59
59
+
thread::spawn(move || {
60
60
+
let rt = tokio::runtime::Runtime::new().unwrap();
61
61
+
let conn = conn.clone();
62
62
+
let nc = nc.clone();
63
63
+
rt.block_on(async {
64
64
+
let nc = nc.lock().unwrap();
65
65
+
let mut sub = nc.subscribe("rocksky.like".to_string()).await?;
66
66
+
drop(nc);
67
67
+
68
68
+
while let Some(msg) = sub.next().await {
69
69
+
let data = String::from_utf8(msg.payload.to_vec()).unwrap();
70
70
+
match serde_json::from_str::<LikePayload>(&data) {
71
71
+
Ok(payload) => {
72
72
+
match like(conn.clone(), payload.clone()).await {
73
73
+
Ok(_) => println!("Like saved successfully for {}", payload.track_id.xata_id.cyan()),
74
74
+
Err(e) => eprintln!("Error saving like: {}", e),
75
75
+
}
76
76
+
},
77
77
+
Err(e) => {
78
78
+
eprintln!("Error parsing payload: {}", e);
79
79
+
}
80
80
+
}
81
81
+
}
82
82
+
83
83
+
Ok::<(), Error>(())
84
84
+
})?;
85
85
+
86
86
+
Ok::<(), Error>(())
87
87
+
});
88
88
+
}
89
89
+
90
90
+
pub fn on_unlike(nc: Arc<Mutex<Client>>, conn: Arc<Mutex<Connection>>) {
91
91
+
thread::spawn(move || {
92
92
+
let rt = tokio::runtime::Runtime::new().unwrap();
93
93
+
let conn = conn.clone();
94
94
+
let nc = nc.clone();
95
95
+
rt.block_on(async {
96
96
+
let nc = nc.lock().unwrap();
97
97
+
let mut sub = nc.subscribe("rocksky.unlike".to_string()).await?;
98
98
+
drop(nc);
99
99
+
100
100
+
while let Some(msg) = sub.next().await {
101
101
+
let data = String::from_utf8(msg.payload.to_vec()).unwrap();
102
102
+
match serde_json::from_str::<UnlikePayload>(&data) {
103
103
+
Ok(payload) => {
104
104
+
match unlike(conn.clone(), payload.clone()).await {
105
105
+
Ok(_) => println!("Unlike saved successfully for {}", payload.track_id.xata_id.cyan()),
106
106
+
Err(e) => eprintln!("Error saving unlike: {}", e),
107
107
+
}
108
108
+
},
109
109
+
Err(e) => {
110
110
+
eprintln!("Error parsing payload: {}", e);
111
111
+
}
112
112
+
}
113
113
+
}
114
114
+
115
115
+
Ok::<(), Error>(())
116
116
+
})?;
117
117
+
118
118
+
Ok::<(), Error>(())
119
119
+
});
120
120
+
}
121
121
+
122
122
+
pub async fn save_scrobble(conn: Arc<Mutex<Connection>>, payload: ScrobblePayload) -> Result<(), Error> {
123
123
+
let conn = conn.lock().unwrap();
124
124
+
125
125
+
match conn.execute(
126
126
+
"INSERT INTO artists (
127
127
+
id,
128
128
+
name,
129
129
+
biography,
130
130
+
born,
131
131
+
born_in,
132
132
+
died,
133
133
+
picture,
134
134
+
sha256,
135
135
+
spotify_link,
136
136
+
tidal_link,
137
137
+
youtube_link,
138
138
+
apple_music_link,
139
139
+
uri
140
140
+
) VALUES (
141
141
+
?,
142
142
+
?,
143
143
+
?,
144
144
+
?,
145
145
+
?,
146
146
+
?,
147
147
+
?,
148
148
+
?,
149
149
+
?,
150
150
+
?,
151
151
+
?,
152
152
+
?,
153
153
+
?
154
154
+
)",
155
155
+
params![
156
156
+
payload.scrobble.artist_id.xata_id,
157
157
+
payload.scrobble.artist_id.name,
158
158
+
payload.scrobble.artist_id.biography,
159
159
+
payload.scrobble.artist_id.born,
160
160
+
payload.scrobble.artist_id.born_in,
161
161
+
payload.scrobble.artist_id.died,
162
162
+
payload.scrobble.artist_id.picture,
163
163
+
payload.scrobble.artist_id.sha256,
164
164
+
payload.scrobble.artist_id.spotify_link,
165
165
+
payload.scrobble.artist_id.tidal_link,
166
166
+
payload.scrobble.artist_id.youtube_link,
167
167
+
payload.scrobble.artist_id.apple_music_link,
168
168
+
payload.scrobble.artist_id.uri,
169
169
+
],
170
170
+
) {
171
171
+
Ok(_) => (),
172
172
+
Err(e) => {
173
173
+
if !e.to_string().contains("violates primary key constraint") {
174
174
+
println!("[artists] error: {}", e);
175
175
+
return Err(e.into());
176
176
+
}
177
177
+
}
178
178
+
}
179
179
+
180
180
+
match conn.execute(
181
181
+
"INSERT INTO albums (
182
182
+
id,
183
183
+
title,
184
184
+
artist,
185
185
+
release_date,
186
186
+
album_art,
187
187
+
year,
188
188
+
spotify_link,
189
189
+
tidal_link,
190
190
+
youtube_link,
191
191
+
apple_music_link,
192
192
+
sha256,
193
193
+
uri,
194
194
+
artist_uri
195
195
+
) VALUES (
196
196
+
?,
197
197
+
?,
198
198
+
?,
199
199
+
?,
200
200
+
?,
201
201
+
?,
202
202
+
?,
203
203
+
?,
204
204
+
?,
205
205
+
?,
206
206
+
?,
207
207
+
?,
208
208
+
?
209
209
+
)",
210
210
+
params![
211
211
+
payload.scrobble.album_id.xata_id,
212
212
+
payload.scrobble.album_id.title,
213
213
+
payload.scrobble.album_id.artist,
214
214
+
payload.scrobble.album_id.release_date,
215
215
+
payload.scrobble.album_id.album_art,
216
216
+
payload.scrobble.album_id.year,
217
217
+
payload.scrobble.album_id.spotify_link,
218
218
+
payload.scrobble.album_id.tidal_link,
219
219
+
payload.scrobble.album_id.youtube_link,
220
220
+
payload.scrobble.album_id.apple_music_link,
221
221
+
payload.scrobble.album_id.sha256,
222
222
+
payload.scrobble.album_id.uri,
223
223
+
payload.scrobble.album_id.artist_uri,
224
224
+
],
225
225
+
) {
226
226
+
Ok(_) => (),
227
227
+
Err(e) => {
228
228
+
if !e.to_string().contains("violates primary key constraint") {
229
229
+
println!("[albums] error: {}", e);
230
230
+
return Err(e.into());
231
231
+
}
232
232
+
},
233
233
+
}
234
234
+
235
235
+
match conn.execute(
236
236
+
"INSERT INTO tracks (
237
237
+
id,
238
238
+
title,
239
239
+
artist,
240
240
+
album_artist,
241
241
+
album_art,
242
242
+
album,
243
243
+
track_number,
244
244
+
duration,
245
245
+
mb_id,
246
246
+
youtube_link,
247
247
+
spotify_link,
248
248
+
tidal_link,
249
249
+
apple_music_link,
250
250
+
sha256,
251
251
+
lyrics,
252
252
+
composer,
253
253
+
genre,
254
254
+
disc_number,
255
255
+
copyright_message,
256
256
+
label,
257
257
+
uri,
258
258
+
artist_uri,
259
259
+
album_uri,
260
260
+
created_at
261
261
+
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
262
262
+
params![
263
263
+
payload.scrobble.track_id.xata_id,
264
264
+
payload.scrobble.track_id.title,
265
265
+
payload.scrobble.track_id.artist,
266
266
+
payload.scrobble.track_id.album_artist,
267
267
+
payload.scrobble.track_id.album_art,
268
268
+
payload.scrobble.track_id.album,
269
269
+
payload.scrobble.track_id.track_number,
270
270
+
payload.scrobble.track_id.duration,
271
271
+
payload.scrobble.track_id.mb_id,
272
272
+
payload.scrobble.track_id.youtube_link,
273
273
+
payload.scrobble.track_id.spotify_link,
274
274
+
payload.scrobble.track_id.tidal_link,
275
275
+
payload.scrobble.track_id.apple_music_link,
276
276
+
payload.scrobble.track_id.sha256,
277
277
+
payload.scrobble.track_id.lyrics,
278
278
+
payload.scrobble.track_id.composer,
279
279
+
payload.scrobble.track_id.genre,
280
280
+
payload.scrobble.track_id.disc_number,
281
281
+
payload.scrobble.track_id.copyright_message,
282
282
+
payload.scrobble.track_id.label,
283
283
+
payload.scrobble.track_id.uri,
284
284
+
payload.scrobble.track_id.artist_uri,
285
285
+
payload.scrobble.track_id.album_uri,
286
286
+
payload.scrobble.track_id.xata_createdat,
287
287
+
],
288
288
+
) {
289
289
+
Ok(_) => (),
290
290
+
Err(e) => {
291
291
+
if !e.to_string().contains("violates primary key constraint") {
292
292
+
println!("[tracks] error: {}", e);
293
293
+
return Err(e.into());
294
294
+
}
295
295
+
}
296
296
+
}
297
297
+
298
298
+
match conn.execute(
299
299
+
"INSERT INTO album_tracks (
300
300
+
id,
301
301
+
album_id,
302
302
+
track_id
303
303
+
) VALUES (?,
304
304
+
?,
305
305
+
?)",
306
306
+
params![
307
307
+
payload.album_track.xata_id,
308
308
+
payload.album_track.album_id.xata_id,
309
309
+
payload.album_track.track_id.xata_id,
310
310
+
],
311
311
+
) {
312
312
+
Ok(_) => (),
313
313
+
Err(e) => {
314
314
+
if !e.to_string().contains("violates primary key constraint") {
315
315
+
println!("[album_tracks] error: {}", e);
316
316
+
return Err(e.into());
317
317
+
}
318
318
+
}
319
319
+
}
320
320
+
321
321
+
match conn.execute(
322
322
+
"INSERT INTO artist_tracks (id, artist_id, track_id, created_at) VALUES (?, ?, ?, ?)",
323
323
+
params![
324
324
+
payload.artist_track.xata_id,
325
325
+
payload.artist_track.artist_id.xata_id,
326
326
+
payload.artist_track.track_id.xata_id,
327
327
+
payload.artist_track.xata_createdat,
328
328
+
],
329
329
+
) {
330
330
+
Ok(_) => (),
331
331
+
Err(e) => {
332
332
+
if !e.to_string().contains("violates primary key constraint") {
333
333
+
println!("[artist_tracks] error: {}", e);
334
334
+
return Err(e.into());
335
335
+
}
336
336
+
}
337
337
+
}
338
338
+
339
339
+
match conn.execute(
340
340
+
"INSERT INTO user_albums (id, user_id, album_id, created_at) VALUES (?, ?, ?, ?)",
341
341
+
params![
342
342
+
payload.user_album.xata_id,
343
343
+
payload.user_album.user_id.xata_id,
344
344
+
payload.user_album.album_id.xata_id,
345
345
+
payload.user_album.xata_createdat,
346
346
+
],
347
347
+
) {
348
348
+
Ok(_) => (),
349
349
+
Err(e) => {
350
350
+
if !e.to_string().contains("violates primary key constraint") {
351
351
+
println!("[user_albums] error: {}", e);
352
352
+
return Err(e.into());
353
353
+
}
354
354
+
}
355
355
+
}
356
356
+
357
357
+
match conn.execute(
358
358
+
"INSERT INTO user_artists (id, user_id, artist_id, created_at) VALUES (?, ?, ?, ?)",
359
359
+
params![
360
360
+
payload.user_artist.xata_id,
361
361
+
payload.user_artist.user_id.xata_id,
362
362
+
payload.user_artist.artist_id.xata_id,
363
363
+
payload.user_artist.xata_createdat,
364
364
+
],
365
365
+
) {
366
366
+
Ok(_) => (),
367
367
+
Err(e) => {
368
368
+
if !e.to_string().contains("violates primary key constraint") {
369
369
+
println!("[user_artists] error: {}", e);
370
370
+
return Err(e.into());
371
371
+
}
372
372
+
}
373
373
+
}
374
374
+
375
375
+
match conn.execute(
376
376
+
"INSERT INTO user_tracks (id, user_id, track_id, created_at) VALUES (?, ?, ?, ?)",
377
377
+
params![
378
378
+
payload.user_track.xata_id,
379
379
+
payload.user_track.user_id.xata_id,
380
380
+
payload.user_track.track_id.xata_id,
381
381
+
payload.user_track.xata_createdat,
382
382
+
],
383
383
+
) {
384
384
+
Ok(_) => (),
385
385
+
Err(e) => {
386
386
+
if !e.to_string().contains("violates primary key constraint") {
387
387
+
println!("[user_tracks] error: {}", e);
388
388
+
return Err(e.into());
389
389
+
}
390
390
+
}
391
391
+
}
392
392
+
393
393
+
match conn.execute(
394
394
+
"INSERT INTO scrobbles (
395
395
+
id,
396
396
+
user_id,
397
397
+
track_id,
398
398
+
album_id,
399
399
+
artist_id,
400
400
+
uri,
401
401
+
created_at
402
402
+
) VALUES (
403
403
+
?,
404
404
+
?,
405
405
+
?,
406
406
+
?,
407
407
+
?,
408
408
+
?,
409
409
+
?
410
410
+
)",
411
411
+
params![
412
412
+
payload.scrobble.xata_id,
413
413
+
payload.scrobble.user_id.xata_id,
414
414
+
payload.scrobble.track_id.xata_id,
415
415
+
payload.scrobble.album_id.xata_id,
416
416
+
payload.scrobble.artist_id.xata_id,
417
417
+
payload.scrobble.uri,
418
418
+
payload.scrobble.xata_createdat,
419
419
+
],
420
420
+
) {
421
421
+
Ok(_) => (),
422
422
+
Err(e) => {
423
423
+
if !e.to_string().contains("violates primary key constraint") {
424
424
+
println!("[scrobbles] error: {}", e);
425
425
+
return Err(e.into());
426
426
+
}
427
427
+
}
428
428
+
}
429
429
+
430
430
+
Ok(())
431
431
+
}
432
432
+
433
433
+
pub async fn like(conn: Arc<Mutex<Connection>>, payload: LikePayload) -> Result<(), Error> {
434
434
+
let conn = conn.lock().unwrap();
435
435
+
match conn.execute(
436
436
+
"INSERT INTO likes (
437
437
+
id,
438
438
+
user_id,
439
439
+
track_id,
440
440
+
created_at
441
441
+
) VALUES (
442
442
+
?,
443
443
+
?,
444
444
+
?,
445
445
+
?
446
446
+
)",
447
447
+
params![
448
448
+
payload.xata_id,
449
449
+
payload.user_id.xata_id,
450
450
+
payload.track_id.xata_id,
451
451
+
payload.xata_createdat,
452
452
+
],
453
453
+
) {
454
454
+
Ok(_) => (),
455
455
+
Err(e) => {
456
456
+
if !e.to_string().contains("violates primary key constraint") {
457
457
+
println!("[likes] error: {}", e);
458
458
+
return Err(e.into());
459
459
+
}
460
460
+
}
461
461
+
}
462
462
+
Ok(())
463
463
+
}
464
464
+
465
465
+
pub async fn unlike(conn: Arc<Mutex<Connection>>, payload: UnlikePayload) -> Result<(), Error> {
466
466
+
let conn = conn.lock().unwrap();
467
467
+
match conn.execute(
468
468
+
"DELETE FROM likes WHERE user_id = ? AND track_id = ?",
469
469
+
params![
470
470
+
payload.user_id.xata_id,
471
471
+
payload.track_id.xata_id,
472
472
+
],
473
473
+
) {
474
474
+
Ok(_) => (),
475
475
+
Err(e) => {
476
476
+
println!("[unlikes] error: {}", e);
477
477
+
return Err(e.into());
478
478
+
}
479
479
+
}
480
480
+
Ok(())
481
481
+
}
+216
crates/analytics/src/subscriber/types.rs
···
1
1
+
use chrono::{DateTime, Utc};
2
2
+
use serde::{Deserialize, Serialize};
3
3
+
4
4
+
#[derive(Debug, Serialize, Deserialize, Clone)]
5
5
+
pub struct LikePayload {
6
6
+
#[serde(skip_serializing_if = "Option::is_none")]
7
7
+
pub uri: Option<String>,
8
8
+
pub track_id: Ref,
9
9
+
pub user_id: Ref,
10
10
+
pub xata_createdat: DateTime<Utc>,
11
11
+
pub xata_id: String,
12
12
+
pub xata_updatedat: DateTime<Utc>,
13
13
+
pub xata_version: i32,
14
14
+
}
15
15
+
16
16
+
#[derive(Debug, Serialize, Deserialize, Clone)]
17
17
+
pub struct UnlikePayload {
18
18
+
#[serde(skip_serializing_if = "Option::is_none")]
19
19
+
pub uri: Option<String>,
20
20
+
pub track_id: Ref,
21
21
+
pub user_id: Ref,
22
22
+
pub xata_createdat: DateTime<Utc>,
23
23
+
pub xata_id: String,
24
24
+
pub xata_updatedat: DateTime<Utc>,
25
25
+
pub xata_version: i32,
26
26
+
}
27
27
+
28
28
+
#[derive(Debug, Serialize, Deserialize, Clone)]
29
29
+
pub struct ScrobblePayload {
30
30
+
pub scrobble: Scrobble,
31
31
+
pub user_album: UserAlbum,
32
32
+
pub user_artist: UserArtist,
33
33
+
pub user_track: UserTrack,
34
34
+
pub album_track: AlbumTrack,
35
35
+
pub artist_track: ArtistTrack,
36
36
+
}
37
37
+
38
38
+
#[derive(Debug, Serialize, Deserialize, Clone)]
39
39
+
pub struct Scrobble {
40
40
+
pub album_id: AlbumId,
41
41
+
pub artist_id: ArtistId,
42
42
+
pub track_id: TrackId,
43
43
+
pub uri: String,
44
44
+
pub user_id: UserId,
45
45
+
pub xata_createdat: DateTime<Utc>,
46
46
+
pub xata_id: String,
47
47
+
pub xata_updatedat: DateTime<Utc>,
48
48
+
pub xata_version: i32,
49
49
+
}
50
50
+
51
51
+
#[derive(Debug, Serialize, Deserialize, Clone)]
52
52
+
pub struct AlbumId {
53
53
+
#[serde(skip_serializing_if = "Option::is_none")]
54
54
+
pub album_art: Option<String>,
55
55
+
pub artist: String,
56
56
+
pub artist_uri: String,
57
57
+
pub release_date: DateTime<Utc>,
58
58
+
pub sha256: String,
59
59
+
pub title: String,
60
60
+
pub uri: String,
61
61
+
pub xata_createdat: DateTime<Utc>,
62
62
+
pub xata_id: String,
63
63
+
pub xata_updatedat: DateTime<Utc>,
64
64
+
pub xata_version: i32,
65
65
+
pub year: i32,
66
66
+
#[serde(skip_serializing_if = "Option::is_none")]
67
67
+
pub apple_music_link: Option<String>,
68
68
+
#[serde(skip_serializing_if = "Option::is_none")]
69
69
+
pub spotify_link: Option<String>,
70
70
+
#[serde(skip_serializing_if = "Option::is_none")]
71
71
+
pub tidal_link: Option<String>,
72
72
+
#[serde(skip_serializing_if = "Option::is_none")]
73
73
+
pub youtube_link: Option<String>,
74
74
+
}
75
75
+
76
76
+
#[derive(Debug, Serialize, Deserialize, Clone)]
77
77
+
pub struct ArtistId {
78
78
+
pub name: String,
79
79
+
#[serde(skip_serializing_if = "Option::is_none")]
80
80
+
pub picture: Option<String>,
81
81
+
pub sha256: String,
82
82
+
pub uri: String,
83
83
+
pub xata_createdat: DateTime<Utc>,
84
84
+
pub xata_id: String,
85
85
+
pub xata_updatedat: DateTime<Utc>,
86
86
+
pub xata_version: i32,
87
87
+
#[serde(skip_serializing_if = "Option::is_none")]
88
88
+
pub apple_music_link: Option<String>,
89
89
+
#[serde(skip_serializing_if = "Option::is_none")]
90
90
+
pub biography: Option<String>,
91
91
+
#[serde(skip_serializing_if = "Option::is_none")]
92
92
+
pub born: Option<String>,
93
93
+
#[serde(skip_serializing_if = "Option::is_none")]
94
94
+
pub born_in: Option<String>,
95
95
+
#[serde(skip_serializing_if = "Option::is_none")]
96
96
+
pub died: Option<String>,
97
97
+
#[serde(skip_serializing_if = "Option::is_none")]
98
98
+
pub spotify_link: Option<String>,
99
99
+
#[serde(skip_serializing_if = "Option::is_none")]
100
100
+
pub tidal_link: Option<String>,
101
101
+
#[serde(skip_serializing_if = "Option::is_none")]
102
102
+
pub youtube_link: Option<String>,
103
103
+
}
104
104
+
105
105
+
#[derive(Debug, Serialize, Deserialize, Clone)]
106
106
+
pub struct TrackId {
107
107
+
pub album: String,
108
108
+
#[serde(skip_serializing_if = "Option::is_none")]
109
109
+
pub album_art: Option<String>,
110
110
+
pub album_artist: String,
111
111
+
pub album_uri: String,
112
112
+
pub artist: String,
113
113
+
pub artist_uri: String,
114
114
+
pub disc_number: i32,
115
115
+
pub duration: i32,
116
116
+
pub sha256: String,
117
117
+
pub spotify_link: String,
118
118
+
pub title: String,
119
119
+
pub track_number: i32,
120
120
+
pub uri: String,
121
121
+
pub xata_createdat: DateTime<Utc>,
122
122
+
pub xata_id: String,
123
123
+
pub xata_updatedat: DateTime<Utc>,
124
124
+
pub xata_version: i32,
125
125
+
#[serde(skip_serializing_if = "Option::is_none")]
126
126
+
pub apple_music_link: Option<String>,
127
127
+
#[serde(skip_serializing_if = "Option::is_none")]
128
128
+
pub composer: Option<String>,
129
129
+
#[serde(skip_serializing_if = "Option::is_none")]
130
130
+
pub copyright_message: Option<String>,
131
131
+
#[serde(skip_serializing_if = "Option::is_none")]
132
132
+
pub genre: Option<String>,
133
133
+
#[serde(skip_serializing_if = "Option::is_none")]
134
134
+
pub label: Option<String>,
135
135
+
#[serde(skip_serializing_if = "Option::is_none")]
136
136
+
pub lyrics: Option<String>,
137
137
+
#[serde(skip_serializing_if = "Option::is_none")]
138
138
+
pub mb_id: Option<String>,
139
139
+
#[serde(skip_serializing_if = "Option::is_none")]
140
140
+
pub tidal_link: Option<String>,
141
141
+
#[serde(skip_serializing_if = "Option::is_none")]
142
142
+
pub youtube_link: Option<String>,
143
143
+
}
144
144
+
145
145
+
#[derive(Debug, Serialize, Deserialize, Clone)]
146
146
+
pub struct UserId {
147
147
+
pub avatar: String,
148
148
+
pub did: String,
149
149
+
pub display_name: String,
150
150
+
pub handle: String,
151
151
+
pub xata_createdat: DateTime<Utc>,
152
152
+
pub xata_id: String,
153
153
+
pub xata_updatedat: DateTime<Utc>,
154
154
+
pub xata_version: i32,
155
155
+
}
156
156
+
157
157
+
#[derive(Debug, Serialize, Deserialize, Clone)]
158
158
+
pub struct UserAlbum {
159
159
+
pub album_id: Ref,
160
160
+
pub scrobbles: i32,
161
161
+
pub uri: String,
162
162
+
pub user_id: Ref,
163
163
+
pub xata_createdat: DateTime<Utc>,
164
164
+
pub xata_id: String,
165
165
+
pub xata_updatedat: DateTime<Utc>,
166
166
+
pub xata_version: i32,
167
167
+
}
168
168
+
169
169
+
#[derive(Debug, Serialize, Deserialize, Clone)]
170
170
+
pub struct UserArtist {
171
171
+
pub artist_id: Ref,
172
172
+
pub scrobbles: i32,
173
173
+
pub uri: String,
174
174
+
pub user_id: Ref,
175
175
+
pub xata_createdat: DateTime<Utc>,
176
176
+
pub xata_id: String,
177
177
+
pub xata_updatedat: DateTime<Utc>,
178
178
+
pub xata_version: i32,
179
179
+
}
180
180
+
181
181
+
#[derive(Debug, Serialize, Deserialize, Clone)]
182
182
+
pub struct UserTrack {
183
183
+
pub track_id: Ref,
184
184
+
pub scrobbles: i32,
185
185
+
pub uri: String,
186
186
+
pub user_id: Ref,
187
187
+
pub xata_createdat: DateTime<Utc>,
188
188
+
pub xata_id: String,
189
189
+
pub xata_updatedat: DateTime<Utc>,
190
190
+
pub xata_version: i32,
191
191
+
}
192
192
+
193
193
+
#[derive(Debug, Serialize, Deserialize, Clone)]
194
194
+
pub struct AlbumTrack {
195
195
+
pub track_id: Ref,
196
196
+
pub album_id: Ref,
197
197
+
pub xata_createdat: DateTime<Utc>,
198
198
+
pub xata_id: String,
199
199
+
pub xata_updatedat: DateTime<Utc>,
200
200
+
pub xata_version: i32,
201
201
+
}
202
202
+
203
203
+
#[derive(Debug, Serialize, Deserialize, Clone)]
204
204
+
pub struct ArtistTrack {
205
205
+
pub track_id: Ref,
206
206
+
pub artist_id: Ref,
207
207
+
pub xata_createdat: DateTime<Utc>,
208
208
+
pub xata_id: String,
209
209
+
pub xata_updatedat: DateTime<Utc>,
210
210
+
pub xata_version: i32,
211
211
+
}
212
212
+
213
213
+
#[derive(Debug, Serialize, Deserialize, Clone)]
214
214
+
pub struct Ref {
215
215
+
pub xata_id: String,
216
216
+
}
+5
crates/analytics/src/types/album.rs
···
43
43
pub user_did: Option<String>,
44
44
pub pagination: Option<Pagination>,
45
45
}
46
46
+
47
47
+
#[derive(Debug, Serialize, Deserialize)]
48
48
+
pub struct GetAlbumTracksParams {
49
49
+
pub album_id: String,
50
50
+
}
+11
crates/analytics/src/types/artist.rs
···
44
44
pub user_did: Option<String>,
45
45
pub pagination: Option<Pagination>,
46
46
}
47
47
+
48
48
+
#[derive(Debug, Serialize, Deserialize, Default)]
49
49
+
pub struct GetArtistTracksParams {
50
50
+
pub artist_id: String,
51
51
+
pub pagination: Option<Pagination>,
52
52
+
}
53
53
+
54
54
+
#[derive(Debug, Serialize, Deserialize)]
55
55
+
pub struct GetArtistAlbumsParams {
56
56
+
pub artist_id: String,
57
57
+
}
+60
crates/analytics/src/types/stats.rs
···
68
68
}
69
69
}
70
70
}
71
71
+
72
72
+
#[derive(Debug, Serialize, Deserialize)]
73
73
+
pub struct GetAlbumScrobblesParams {
74
74
+
pub album_id: String,
75
75
+
pub start: Option<String>,
76
76
+
pub end: Option<String>,
77
77
+
}
78
78
+
79
79
+
impl Default for GetAlbumScrobblesParams {
80
80
+
fn default() -> Self {
81
81
+
let current_date = Utc::now().naive_utc();
82
82
+
let date_30_days_ago = current_date - Duration::days(30);
83
83
+
84
84
+
GetAlbumScrobblesParams {
85
85
+
album_id: "".to_string(),
86
86
+
start: Some(date_30_days_ago.to_string()),
87
87
+
end: Some(current_date.to_string()),
88
88
+
}
89
89
+
}
90
90
+
}
91
91
+
92
92
+
#[derive(Debug, Serialize, Deserialize)]
93
93
+
pub struct GetArtistScrobblesParams {
94
94
+
pub artist_id: String,
95
95
+
pub start: Option<String>,
96
96
+
pub end: Option<String>,
97
97
+
}
98
98
+
99
99
+
impl Default for GetArtistScrobblesParams {
100
100
+
fn default() -> Self {
101
101
+
let current_date = Utc::now().naive_utc();
102
102
+
let date_30_days_ago = current_date - Duration::days(30);
103
103
+
104
104
+
GetArtistScrobblesParams {
105
105
+
artist_id: "".to_string(),
106
106
+
start: Some(date_30_days_ago.to_string()),
107
107
+
end: Some(current_date.to_string()),
108
108
+
}
109
109
+
}
110
110
+
}
111
111
+
112
112
+
#[derive(Debug, Serialize, Deserialize)]
113
113
+
pub struct GetTrackScrobblesParams {
114
114
+
pub track_id: String,
115
115
+
pub start: Option<String>,
116
116
+
pub end: Option<String>,
117
117
+
}
118
118
+
119
119
+
impl Default for GetTrackScrobblesParams {
120
120
+
fn default() -> Self {
121
121
+
let current_date = Utc::now().naive_utc();
122
122
+
let date_30_days_ago = current_date - Duration::days(30);
123
123
+
124
124
+
GetTrackScrobblesParams {
125
125
+
track_id: "".to_string(),
126
126
+
start: Some(date_30_days_ago.to_string()),
127
127
+
end: Some(current_date.to_string()),
128
128
+
}
129
129
+
}
130
130
+
}
+2
crates/analytics/src/xata/artist_album.rs
···
5
5
pub xata_id: String,
6
6
pub artist_id: String,
7
7
pub album_id: String,
8
8
+
#[serde(with = "chrono::serde::ts_seconds")]
9
9
+
pub xata_createdat: chrono::DateTime<chrono::Utc>,
8
10
}
+3
rockskyapi/rocksky-auth/.env.example
···
20
20
SPOTIFY_ENCRYPTION_KEY="your spotify encryption secret"
21
21
SPOTIFY_ENCRYPRION_IV="your spotify encryption iv"
22
22
ROCKSKY_BETA_TOKEN="your rocksky beta token"
23
23
+
24
24
+
NATS_URL="nats://localhost:4222"
25
25
+
ANALYTICS_URL="http://localhost:7879"
+4
rockskyapi/rocksky-auth/src/context.ts
···
1
1
import { createClient } from "auth/client";
2
2
+
import axios from "axios";
2
3
import { createDb, migrateToLatest } from "db";
3
4
import drizzle from "drizzle";
4
5
import { env } from "lib/env";
5
6
import { createBidirectionalResolver, createIdResolver } from "lib/idResolver";
7
7
+
import { connect } from "nats";
6
8
import sqliteKv from "sqliteKv";
7
9
import { createStorage } from "unstorage";
8
10
import { getXataClient } from "xata";
···
25
27
kv: new Map<string, string>(),
26
28
client,
27
29
db: drizzle.db,
30
30
+
nc: await connect({ servers: env.NATS_URL }),
31
31
+
analytics: axios.create({ baseURL: env.ANALYTICS }),
28
32
};
29
33
30
34
export type Context = typeof ctx;
+2
rockskyapi/rocksky-auth/src/lib/env.ts
···
25
25
SPOTIFY_ENCRYPTION_IV: str(),
26
26
ROCKSKY_BETA_TOKEN: str({}),
27
27
XATA_POSTGRES_URL: str({}),
28
28
+
NATS_URL: str({ devDefault: "nats://localhost:4222" }),
29
29
+
ANALYTICS: str({ devDefault: "http://localhost:7879" }),
28
30
});
+7
rockskyapi/rocksky-auth/src/lovedtracks/lovedtracks.service.ts
···
137
137
track_id,
138
138
}
139
139
);
140
140
+
141
141
+
const message = JSON.stringify(created);
142
142
+
ctx.nc.publish("rocksky.like", Buffer.from(message));
143
143
+
140
144
return created;
141
145
}
142
146
···
157
161
if (!lovedTrack) {
158
162
return;
159
163
}
164
164
+
165
165
+
const message = JSON.stringify(lovedTrack);
166
166
+
ctx.nc.publish("rocksky.unlike", Buffer.from(message));
160
167
161
168
await ctx.client.db.loved_tracks.delete(lovedTrack.xata_id);
162
169
}
+44
rockskyapi/rocksky-auth/src/nowplaying/nowplaying.service.ts
···
317
317
}
318
318
}
319
319
320
320
+
export async function publishScrobble(ctx: Context, id: string) {
321
321
+
const scrobble = await ctx.client.db.scrobbles
322
322
+
.select(["*", "track_id.*", "album_id.*", "artist_id.*", "user_id.*"])
323
323
+
.filter("xata_id", equals(id))
324
324
+
.getFirst();
325
325
+
326
326
+
const [user_album, user_artist, user_track, album_track, artist_track] =
327
327
+
await Promise.all([
328
328
+
ctx.client.db.user_albums
329
329
+
.select(["*"])
330
330
+
.filter("album_id.xata_id", equals(scrobble.album_id.xata_id))
331
331
+
.getFirst(),
332
332
+
ctx.client.db.user_artists
333
333
+
.select(["*"])
334
334
+
.filter("artist_id.xata_id", equals(scrobble.artist_id.xata_id))
335
335
+
.getFirst(),
336
336
+
ctx.client.db.user_tracks
337
337
+
.select(["*"])
338
338
+
.filter("track_id.xata_id", equals(scrobble.track_id.xata_id))
339
339
+
.getFirst(),
340
340
+
ctx.client.db.album_tracks
341
341
+
.select(["*"])
342
342
+
.filter("track_id.xata_id", equals(scrobble.track_id.xata_id))
343
343
+
.getFirst(),
344
344
+
ctx.client.db.artist_tracks
345
345
+
.select(["*"])
346
346
+
.filter("track_id.xata_id", equals(scrobble.track_id.xata_id))
347
347
+
.getFirst(),
348
348
+
]);
349
349
+
350
350
+
const message = JSON.stringify({
351
351
+
scrobble,
352
352
+
user_album,
353
353
+
user_artist,
354
354
+
user_track,
355
355
+
album_track,
356
356
+
artist_track,
357
357
+
});
358
358
+
359
359
+
ctx.nc.publish("rocksky.scrobble", Buffer.from(message));
360
360
+
}
361
361
+
320
362
export async function scrobbleTrack(
321
363
ctx: Context,
322
364
track: Track,
···
490
532
artist_id,
491
533
uri: scrobbleUri,
492
534
});
535
535
+
536
536
+
await publishScrobble(ctx, scrobble.xata_id);
493
537
494
538
return scrobble;
495
539
}
+39
-139
rockskyapi/rocksky-auth/src/users/app.ts
···
51
51
const size = +c.req.query("size") || 10;
52
52
const offset = +c.req.query("offset") || 0;
53
53
54
54
-
const scrobbles = await ctx.client.db.scrobbles
55
55
-
.select(["track_id.*", "uri", "album_id.*", "artist_id.*"])
56
56
-
.filter({
57
57
-
$any: [
58
58
-
{
59
59
-
"user_id.did": handle,
60
60
-
},
61
61
-
{
62
62
-
"user_id.handle": handle,
63
63
-
},
64
64
-
],
65
65
-
})
66
66
-
.sort("xata_createdat", "desc")
67
67
-
.getPaginated({
68
68
-
pagination: {
69
69
-
size,
70
70
-
offset,
71
71
-
},
72
72
-
});
54
54
+
const { data } = await ctx.analytics.post("library.getScrobbles", {
55
55
+
user_did: handle,
56
56
+
pagination: {
57
57
+
skip: offset,
58
58
+
take: size,
59
59
+
},
60
60
+
});
73
61
74
74
-
return c.json(scrobbles.records);
62
62
+
return c.json(data);
75
63
});
76
64
77
65
app.get("/:did/albums", async (c) => {
···
79
67
const size = +c.req.query("size") || 10;
80
68
const offset = +c.req.query("offset") || 0;
81
69
82
82
-
const albums = await ctx.client.db.user_albums
83
83
-
.select(["album_id.*", "scrobbles"])
84
84
-
.filter({
85
85
-
$any: [
86
86
-
{
87
87
-
"user_id.did": did,
88
88
-
},
89
89
-
{
90
90
-
"user_id.handle": did,
91
91
-
},
92
92
-
],
93
93
-
})
94
94
-
.getPaginated({
95
95
-
sort: {
96
96
-
scrobbles: "desc",
97
97
-
},
98
98
-
pagination: {
99
99
-
size,
100
100
-
offset,
101
101
-
},
102
102
-
});
70
70
+
const { data } = await ctx.analytics.post("library.getTopAlbums", {
71
71
+
user_did: did,
72
72
+
pagination: {
73
73
+
skip: offset,
74
74
+
take: size,
75
75
+
},
76
76
+
});
103
77
104
104
-
return c.json(albums.records.map((item) => ({ ...item.album_id, tags: [] })));
78
78
+
return c.json(data.map((item) => ({ ...item, tags: [] })));
105
79
});
106
80
107
81
app.get("/:did/artists", async (c) => {
···
109
83
const size = +c.req.query("size") || 10;
110
84
const offset = +c.req.query("offset") || 0;
111
85
112
112
-
const artists = await ctx.client.db.user_artists
113
113
-
.select(["artist_id.*", "scrobbles"])
114
114
-
.filter({
115
115
-
$any: [
116
116
-
{
117
117
-
"user_id.did": did,
118
118
-
},
119
119
-
{
120
120
-
"user_id.handle": did,
121
121
-
},
122
122
-
],
123
123
-
})
124
124
-
.getPaginated({
125
125
-
sort: {
126
126
-
scrobbles: "desc",
127
127
-
},
128
128
-
pagination: {
129
129
-
size,
130
130
-
offset,
131
131
-
},
132
132
-
});
86
86
+
const { data } = await ctx.analytics.post("library.getTopArtists", {
87
87
+
user_did: did,
88
88
+
pagination: {
89
89
+
skip: offset,
90
90
+
take: size,
91
91
+
},
92
92
+
});
133
93
134
134
-
return c.json(
135
135
-
artists.records.map((item) => ({ ...item.artist_id, tags: [] }))
136
136
-
);
94
94
+
return c.json(data.map((item) => ({ ...item, tags: [] })));
137
95
});
138
96
139
97
app.get("/:did/tracks", async (c) => {
···
141
99
const size = +c.req.query("size") || 10;
142
100
const offset = +c.req.query("offset") || 0;
143
101
144
144
-
const artists = await ctx.client.db.user_tracks
145
145
-
.select(["track_id.*", "scrobbles"])
146
146
-
.filter({
147
147
-
$any: [
148
148
-
{
149
149
-
"user_id.did": did,
150
150
-
},
151
151
-
{
152
152
-
"user_id.handle": did,
153
153
-
},
154
154
-
],
155
155
-
})
156
156
-
.getPaginated({
157
157
-
sort: {
158
158
-
scrobbles: "desc",
159
159
-
},
160
160
-
pagination: {
161
161
-
size,
162
162
-
offset,
163
163
-
},
164
164
-
});
102
102
+
const { data } = await ctx.analytics.post("library.getTopTracks", {
103
103
+
user_did: did,
104
104
+
pagination: {
105
105
+
skip: offset,
106
106
+
take: size,
107
107
+
},
108
108
+
});
165
109
166
110
return c.json(
167
167
-
artists.records.map((item) => ({
168
168
-
...item.track_id,
169
169
-
scrobles: item.scrobbles,
111
111
+
data.map((item) => ({
112
112
+
...item,
170
113
tags: [],
171
114
}))
172
115
);
···
1202
1145
1203
1146
app.get("/:did/stats", async (c) => {
1204
1147
const did = c.req.param("did");
1205
1205
-
const scrobbles = await ctx.client.db.scrobbles
1206
1206
-
.select(["user_id.*"])
1207
1207
-
.filter({
1208
1208
-
$any: [
1209
1209
-
{
1210
1210
-
"user_id.did": did,
1211
1211
-
},
1212
1212
-
{
1213
1213
-
"user_id.handle": did,
1214
1214
-
},
1215
1215
-
],
1216
1216
-
})
1217
1217
-
.summarize({
1218
1218
-
summaries: {
1219
1219
-
total: {
1220
1220
-
count: "*",
1221
1221
-
},
1222
1222
-
},
1223
1223
-
});
1224
1148
1225
1225
-
const artists = await ctx.client.db.user_artists
1226
1226
-
.select(["artist_id.*", "user_id.*"])
1227
1227
-
.filter({
1228
1228
-
$any: [
1229
1229
-
{
1230
1230
-
"user_id.did": did,
1231
1231
-
},
1232
1232
-
{
1233
1233
-
"user_id.handle": did,
1234
1234
-
},
1235
1235
-
],
1236
1236
-
})
1237
1237
-
.getAll();
1238
1238
-
1239
1239
-
const lovedTracks = await ctx.client.db.loved_tracks
1240
1240
-
.select(["track_id.*", "user_id.*"])
1241
1241
-
.filter({
1242
1242
-
$any: [
1243
1243
-
{
1244
1244
-
"user_id.did": did,
1245
1245
-
},
1246
1246
-
{
1247
1247
-
"user_id.handle": did,
1248
1248
-
},
1249
1249
-
],
1250
1250
-
})
1251
1251
-
.getAll();
1149
1149
+
const { data } = await ctx.analytics.post("library.getStats", {
1150
1150
+
user_did: did,
1151
1151
+
});
1252
1152
1253
1153
return c.json({
1254
1254
-
scrobbles: _.get(scrobbles, "summaries.0.total", 0),
1255
1255
-
artists: artists.length,
1256
1256
-
lovedTracks: lovedTracks.length,
1154
1154
+
scrobbles: data.scrobbles,
1155
1155
+
artists: data.artists,
1156
1156
+
lovedTracks: data.loved_tracks,
1257
1157
});
1258
1158
});
1259
1159
+12
-13
rockskyweb/src/pages/profile/overview/recenttracks/RecentTracks.tsx
···
56
56
const getRecentTracks = async () => {
57
57
const data = await getRecentTracksByDid(did, 0, props.size);
58
58
setRecentTracks(
59
59
-
data.map(({ track_id, album_id, artist_id, uri, xata_createdat }) => ({
60
60
-
id: track_id.xata_id,
61
61
-
title: track_id.title,
62
62
-
artist: track_id.artist,
63
63
-
album: track_id.album,
64
64
-
albumArt: track_id.album_art,
65
65
-
albumArtist: track_id.album_artist,
66
66
-
duration: track_id.duration,
67
67
-
uri: track_id.uri,
68
68
-
date: xata_createdat,
69
69
-
scrobbleUri: uri,
70
70
-
albumUri: album_id.uri,
71
71
-
artistUri: artist_id.uri,
59
59
+
data.map((item) => ({
60
60
+
id: item.id,
61
61
+
title: item.title,
62
62
+
artist: item.artist,
63
63
+
album: item.album,
64
64
+
albumArt: item.album_art,
65
65
+
albumArtist: item.album_artist,
66
66
+
uri: item.uri,
67
67
+
date: item.created_at,
68
68
+
scrobbleUri: item.uri,
69
69
+
albumUri: item.album_uri,
70
70
+
artistUri: item.artist_uri,
72
71
}))
73
72
);
74
73
};
+12
-28
rockskyweb/src/types/scrobble.ts
···
1
1
export type Scrobble = {
2
2
-
track_id: {
3
3
-
xata_id: string;
4
4
-
title: string;
5
5
-
artist: string;
6
6
-
album: string;
7
7
-
album_art?: string;
8
8
-
album_artist: string;
9
9
-
uri: string;
10
10
-
duration: number;
11
11
-
};
12
12
-
album_id: {
13
13
-
uri: string;
14
14
-
album_art?: string;
15
15
-
artist: string;
16
16
-
release_date: string;
17
17
-
title: string;
18
18
-
xata_id: string;
19
19
-
year: number;
20
20
-
};
21
21
-
artist_id: {
22
22
-
uri: string;
23
23
-
name: string;
24
24
-
xata_id: string;
25
25
-
bio?: string;
26
26
-
picture?: string;
27
27
-
};
2
2
+
id: string;
3
3
+
track_id: string;
4
4
+
title: string;
5
5
+
artist: string;
6
6
+
album: string;
7
7
+
album_art?: string;
8
8
+
album_artist: string;
9
9
+
handle: string;
10
10
+
track_uri: string;
11
11
+
album_uri: string;
12
12
+
artist_uri: string;
28
13
uri: string;
29
29
-
xata_createdat: string;
30
30
-
xata_id: string;
14
14
+
created_at: string;
31
15
};