forked from
rocksky.app/rocksky
A decentralized music tracking and discovery platform built on AT Protocol 馃幍
1use anyhow::Error;
2use rand::seq::SliceRandom;
3use redis::AsyncCommands;
4
5pub async fn add_track(
6 client: &redis::Client,
7 did: &str,
8 track_id: &str,
9) -> Result<Vec<String>, Error> {
10 let mut conn = client.get_multiplexed_async_connection().await?;
11
12 conn.rpush::<_, _, i32>(format!("user:{}:queue", did), track_id)
13 .await?;
14
15 let queue: Vec<String> = conn.lrange(format!("user:{}:queue", did), 0, -1).await?;
16
17 Ok(queue)
18}
19
20pub async fn add_tracks(
21 client: &redis::Client,
22 did: &str,
23 track_ids: Vec<String>,
24) -> Result<Vec<String>, Error> {
25 let mut conn = client.get_multiplexed_async_connection().await?;
26
27 conn.rpush::<_, _, i32>(format!("user:{}:queue", did), track_ids)
28 .await?;
29
30 let queue: Vec<String> = conn.lrange(format!("user:{}:queue", did), 0, -1).await?;
31
32 Ok(queue)
33}
34
35pub async fn insert_track_at(
36 client: &redis::Client,
37 did: &str,
38 position: usize,
39 track_id: &str,
40) -> Result<Vec<String>, Error> {
41 let mut conn = client.get_multiplexed_async_connection().await?;
42
43 let queue: Vec<String> = conn.lrange(format!("user:{}:queue", did), 0, -1).await?;
44
45 let mut new_queue = queue.clone();
46 if position >= new_queue.len() {
47 new_queue.push(track_id.to_string());
48 } else {
49 new_queue.insert(position, track_id.to_string());
50 }
51
52 let mut pipeline = redis::pipe();
53 pipeline
54 .atomic()
55 .del(format!("user:{}:queue", did))
56 .rpush(format!("user:{}:queue", did), new_queue.clone())
57 .query_async::<()>(&mut conn)
58 .await?;
59
60 Ok(new_queue)
61}
62
63pub async fn remove_track_at(
64 client: &redis::Client,
65 did: &str,
66 position: usize,
67) -> Result<Vec<String>, Error> {
68 let mut conn = client.get_multiplexed_async_connection().await?;
69
70 let queue: Vec<String> = conn.lrange(format!("user:{}:queue", did), 0, -1).await?;
71 if position < queue.len() {
72 let _: i32 = conn
73 .lrem::<_, _, i32>(format!("user:{}:queue", did), 1, &queue[position])
74 .await?;
75 }
76
77 let new_queue: Vec<String> = conn.lrange(format!("user:{}:queue", did), 0, -1).await?;
78 Ok(new_queue)
79}
80
81pub async fn shuffle_queue(client: &redis::Client, did: &str) -> Result<Vec<String>, Error> {
82 let mut conn = client.get_multiplexed_async_connection().await?;
83
84 let mut queue: Vec<String> = conn.lrange(format!("user:{}:queue", did), 0, -1).await?;
85 let old_queue = queue.clone();
86
87 loop {
88 let mut rng = rand::rng();
89 queue.shuffle(&mut rng);
90 if queue != old_queue {
91 break;
92 }
93 }
94
95 redis::pipe()
96 .atomic()
97 .del(format!("user:{}:queue", did))
98 .rpush(format!("user:{}:queue", did), queue.clone())
99 .query_async::<()>(&mut conn)
100 .await?;
101
102 Ok(queue)
103}
104
105pub async fn get_queue(client: &redis::Client, did: &str) -> Result<Vec<String>, Error> {
106 let mut conn = client.get_multiplexed_async_connection().await?;
107
108 let queue: Vec<String> = conn.lrange(format!("user:{}:queue", did), 0, -1).await?;
109 Ok(queue)
110}
111
112pub async fn clear_queue(client: &redis::Client, did: &str) -> Result<(), Error> {
113 let mut conn = client.get_multiplexed_async_connection().await?;
114
115 redis::pipe()
116 .atomic()
117 .del(format!("user:{}:queue", did))
118 .query_async::<()>(&mut conn)
119 .await?;
120
121 Ok(())
122}
123
124pub async fn get_queue_length(client: &redis::Client, did: &str) -> Result<usize, Error> {
125 let mut conn = client.get_multiplexed_async_connection().await?;
126
127 let length: usize = conn.llen(format!("user:{}:queue", did)).await?;
128 Ok(length)
129}
130
131pub async fn is_queue_empty(client: &redis::Client, did: &str) -> Result<bool, Error> {
132 let length = get_queue_length(client, did).await?;
133 Ok(length == 0)
134}
135
136pub async fn set_current_track(
137 client: &redis::Client,
138 did: &str,
139 position: usize,
140) -> Result<(), Error> {
141 let mut conn = client.get_multiplexed_async_connection().await?;
142
143 conn.set::<_, _, ()>(format!("user:{}:current_track", did), position)
144 .await?;
145
146 Ok(())
147}
148
149pub async fn get_current_track(client: &redis::Client, did: &str) -> Result<Option<usize>, Error> {
150 let mut conn = client.get_multiplexed_async_connection().await?;
151
152 let position: Option<usize> = conn
153 .get::<_, Option<usize>>(format!("user:{}:current_track", did))
154 .await?;
155
156 Ok(position)
157}
158
159pub async fn clear_current_track(client: &redis::Client, did: &str) -> Result<(), Error> {
160 let mut conn = client.get_multiplexed_async_connection().await?;
161
162 conn.del::<_, ()>(format!("user:{}:current_track", did))
163 .await?;
164
165 Ok(())
166}
167
168pub async fn move_track(
169 client: &redis::Client,
170 did: &str,
171 from: usize,
172 to: usize,
173) -> Result<Vec<String>, Error> {
174 let mut conn = client.get_multiplexed_async_connection().await?;
175
176 let queue: Vec<String> = conn.lrange(format!("user:{}:queue", did), 0, -1).await?;
177 if from >= queue.len() || to >= queue.len() {
178 return Ok(queue);
179 }
180
181 let mut new_queue = queue.clone();
182 let track = new_queue.remove(from);
183 new_queue.insert(to, track);
184
185 redis::pipe()
186 .atomic()
187 .del(format!("user:{}:queue", did))
188 .rpush(format!("user:{}:queue", did), new_queue.clone())
189 .query_async::<()>(&mut conn)
190 .await?;
191
192 Ok(new_queue)
193}
194
195pub async fn replace_queue(
196 client: &redis::Client,
197 did: &str,
198 new_queue: Vec<String>,
199) -> Result<Vec<String>, Error> {
200 let mut conn = client.get_multiplexed_async_connection().await?;
201
202 redis::pipe()
203 .atomic()
204 .del(format!("user:{}:queue", did))
205 .rpush(format!("user:{}:queue", did), new_queue.clone())
206 .query_async::<()>(&mut conn)
207 .await?;
208
209 Ok(new_queue)
210}
211
212pub async fn get_track_at(
213 client: &redis::Client,
214 did: &str,
215 position: usize,
216) -> Result<Option<String>, Error> {
217 let mut conn = client.get_multiplexed_async_connection().await?;
218
219 let track: Option<String> = conn
220 .lindex::<_, Option<String>>(format!("user:{}:queue", did), position as isize)
221 .await?;
222
223 Ok(track)
224}
225
226pub async fn insert_tracks_at(
227 client: &redis::Client,
228 did: &str,
229 position: usize,
230 track_ids: Vec<String>,
231) -> Result<Vec<String>, Error> {
232 let mut conn = client.get_multiplexed_async_connection().await?;
233
234 let queue: Vec<String> = conn.lrange(format!("user:{}:queue", did), 0, -1).await?;
235
236 let mut new_queue = queue.clone();
237 if position >= new_queue.len() {
238 new_queue.extend(track_ids);
239 } else {
240 for (i, track_id) in track_ids.into_iter().enumerate() {
241 new_queue.insert(position + i, track_id);
242 }
243 }
244
245 let mut pipeline = redis::pipe();
246 pipeline
247 .atomic()
248 .del(format!("user:{}:queue", did))
249 .rpush(format!("user:{}:queue", did), new_queue.clone())
250 .query_async::<()>(&mut conn)
251 .await?;
252
253 Ok(new_queue)
254}
255
256#[cfg(test)]
257mod tests {
258 use std::env;
259
260 use super::*;
261 use anyhow::Error;
262 use redis::AsyncCommands;
263 use uuid::Uuid;
264
265 async fn setup_redis() -> redis::Client {
266 redis::Client::open(env::var("REDIS_URL").unwrap_or("redis://127.0.0.1".into()))
267 .expect("Failed to create Redis client")
268 }
269
270 async fn cleanup(client: &redis::Client, did: &str) -> Result<(), Error> {
271 let mut conn = client.get_multiplexed_async_connection().await?;
272 conn.del::<_, ()>(format!("user:{}:queue", did)).await?;
273 Ok(())
274 }
275
276 #[tokio::test]
277 async fn test_add_track() -> Result<(), Error> {
278 let client = setup_redis().await;
279 let did = Uuid::new_v4().to_string();
280 let track_id = "track:67890";
281
282 // Add a track
283 add_track(&client, &did, track_id).await?;
284 let queue = get_queue(&client, &did).await?;
285 assert_eq!(queue, vec![track_id]);
286
287 // Add another track
288 let track_id2 = "track:67891";
289 add_track(&client, &did, track_id2).await?;
290 let queue = get_queue(&client, &did).await?;
291 assert_eq!(queue, vec![track_id, track_id2]);
292
293 // Cleanup
294 cleanup(&client, &did).await?;
295 Ok(())
296 }
297
298 #[tokio::test]
299 async fn test_add_tracks() -> Result<(), Error> {
300 let client = setup_redis().await;
301 let did = Uuid::new_v4().to_string();
302
303 let track_ids = vec!["track:67890", "track:67891", "track:67892"];
304 add_tracks(
305 &client,
306 &did,
307 track_ids.iter().map(|s| s.to_string()).collect(),
308 )
309 .await?;
310
311 let queue = get_queue(&client, &did).await?;
312 assert_eq!(queue, track_ids);
313 cleanup(&client, &did).await?;
314 Ok(())
315 }
316
317 #[tokio::test]
318 async fn test_insert_track_at() -> Result<(), Error> {
319 let client = setup_redis().await;
320 let did = Uuid::new_v4().to_string();
321 let track_ids = vec!["track:67890", "track:67891", "track:67892"];
322
323 for &track_id in &track_ids {
324 add_track(&client, &did, track_id).await?;
325 }
326
327 let new_track = "track:67893";
328 insert_track_at(&client, &did, 1, new_track).await?;
329 let queue: Vec<String> = get_queue(&client, &did).await?;
330 assert_eq!(
331 queue,
332 vec!["track:67890", "track:67893", "track:67891", "track:67892"]
333 );
334
335 let end_track = "track:67894";
336 insert_track_at(&client, &did, 10, end_track).await?;
337 let queue = get_queue(&client, &did).await?;
338 assert_eq!(
339 queue,
340 vec![
341 "track:67890",
342 "track:67893",
343 "track:67891",
344 "track:67892",
345 "track:67894"
346 ]
347 );
348
349 let new_did = Uuid::new_v4().to_string();
350 insert_track_at(&client, &new_did, 0, "track:67895").await?;
351 let queue = get_queue(&client, &new_did).await?;
352 assert_eq!(queue, vec!["track:67895"]);
353
354 cleanup(&client, &did).await?;
355 cleanup(&client, &new_did).await?;
356 Ok(())
357 }
358
359 #[tokio::test]
360 async fn test_remove_track_at() -> Result<(), Error> {
361 let client = setup_redis().await;
362 let did = Uuid::new_v4().to_string();
363 let track_ids = vec!["track:67890", "track:67891", "track:67892"];
364
365 for &track_id in &track_ids {
366 add_track(&client, &did, track_id).await?;
367 }
368
369 remove_track_at(&client, &did, 1).await?;
370 let queue = get_queue(&client, &did).await?;
371 assert_eq!(queue, vec!["track:67890", "track:67892"]);
372
373 remove_track_at(&client, &did, 0).await?;
374 let queue = get_queue(&client, &did).await?;
375 assert_eq!(queue, vec!["track:67892"]);
376
377 remove_track_at(&client, &did, 5).await?;
378 let queue = get_queue(&client, &did).await?;
379 assert_eq!(queue, vec!["track:67892"]);
380
381 let new_did = Uuid::new_v4().to_string();
382 remove_track_at(&client, &new_did, 0).await?;
383 let queue = get_queue(&client, &new_did).await?;
384 assert_eq!(queue, Vec::<String>::new());
385
386 cleanup(&client, &did).await?;
387 cleanup(&client, &new_did).await?;
388 Ok(())
389 }
390
391 #[tokio::test]
392 async fn test_shuffle_queue() -> Result<(), Error> {
393 let client = setup_redis().await;
394 let did = Uuid::new_v4().to_string();
395 let track_ids = vec!["track:67890", "track:67891", "track:67892"];
396
397 for &track_id in &track_ids {
398 add_track(&client, &did, track_id).await?;
399 }
400
401 shuffle_queue(&client, &did).await?;
402 let queue = get_queue(&client, &did).await?;
403 assert_eq!(queue.len(), track_ids.len());
404 assert!(track_ids.iter().all(|id| queue.contains(&id.to_string())));
405
406 cleanup(&client, &did).await?;
407 Ok(())
408 }
409
410 #[tokio::test]
411 async fn test_get_queue() -> Result<(), Error> {
412 let client = setup_redis().await;
413 let did = Uuid::new_v4().to_string();
414 let track_ids = vec!["track:67890", "track:67891"];
415
416 let queue = get_queue(&client, &did).await?;
417 assert_eq!(queue, Vec::<String>::new());
418
419 for &track_id in &track_ids {
420 add_track(&client, &did, track_id).await?;
421 }
422
423 let queue = get_queue(&client, &did).await?;
424 assert_eq!(queue, track_ids);
425
426 cleanup(&client, &did).await?;
427 Ok(())
428 }
429
430 #[tokio::test]
431 async fn test_clear_queue() -> Result<(), Error> {
432 let client = setup_redis().await;
433 let did = Uuid::new_v4().to_string();
434 let track_ids = vec!["track:67890", "track:67891"];
435
436 for &track_id in &track_ids {
437 add_track(&client, &did, track_id).await?;
438 }
439
440 clear_queue(&client, &did).await?;
441 let queue = get_queue(&client, &did).await?;
442 assert_eq!(queue, Vec::<String>::new());
443
444 clear_queue(&client, &did).await?;
445 let queue = get_queue(&client, &did).await?;
446 assert_eq!(queue, Vec::<String>::new());
447
448 cleanup(&client, &did).await?;
449 Ok(())
450 }
451
452 #[tokio::test]
453 async fn test_queue_length_and_empty() -> Result<(), Error> {
454 let client = setup_redis().await;
455 let did = Uuid::new_v4().to_string();
456 let track_ids = vec!["track:67890", "track:67891"];
457
458 let length = get_queue_length(&client, &did).await?;
459 assert_eq!(length, 0);
460 let is_empty = is_queue_empty(&client, &did).await?;
461 assert!(is_empty);
462
463 for &track_id in &track_ids {
464 add_track(&client, &did, track_id).await?;
465 }
466
467 let length = get_queue_length(&client, &did).await?;
468 assert_eq!(length, track_ids.len());
469 let is_empty = is_queue_empty(&client, &did).await?;
470 assert!(!is_empty);
471
472 cleanup(&client, &did).await?;
473 Ok(())
474 }
475
476 #[tokio::test]
477 async fn test_current_track() -> Result<(), Error> {
478 let client = setup_redis().await;
479 let did = Uuid::new_v4().to_string();
480 let track_ids = vec!["track:67890", "track:67891"];
481 for &track_id in &track_ids {
482 add_track(&client, &did, track_id).await?;
483 }
484 let current = get_current_track(&client, &did).await?;
485 assert_eq!(current, None);
486 set_current_track(&client, &did, 1).await?;
487 let current = get_current_track(&client, &did).await?;
488 assert_eq!(current, Some(1));
489 clear_current_track(&client, &did).await?;
490 let current = get_current_track(&client, &did).await?;
491 assert_eq!(current, None);
492 cleanup(&client, &did).await?;
493 Ok(())
494 }
495
496 #[tokio::test]
497 async fn test_move_track() -> Result<(), Error> {
498 let client = setup_redis().await;
499 let did = Uuid::new_v4().to_string();
500 let track_ids = vec!["track:67890", "track:67891", "track:67892"];
501
502 for &track_id in &track_ids {
503 add_track(&client, &did, track_id).await?;
504 }
505
506 move_track(&client, &did, 0, 2).await?;
507 let queue = get_queue(&client, &did).await?;
508 assert_eq!(queue, vec!["track:67891", "track:67892", "track:67890"]);
509 move_track(&client, &did, 2, 0).await?;
510
511 let queue = get_queue(&client, &did).await?;
512 assert_eq!(queue, vec!["track:67890", "track:67891", "track:67892"]);
513 move_track(&client, &did, 1, 1).await?;
514
515 let queue = get_queue(&client, &did).await?;
516 assert_eq!(queue, vec!["track:67890", "track:67891", "track:67892"]);
517 move_track(&client, &did, 5, 0).await?;
518
519 let queue = get_queue(&client, &did).await?;
520 assert_eq!(queue, vec!["track:67890", "track:67891", "track:67892"]);
521
522 let new_did = Uuid::new_v4().to_string();
523 move_track(&client, &new_did, 0, 1).await?;
524
525 let queue = get_queue(&client, &new_did).await?;
526 assert_eq!(queue, Vec::<String>::new());
527
528 cleanup(&client, &did).await?;
529 cleanup(&client, &new_did).await?;
530
531 Ok(())
532 }
533
534 #[tokio::test]
535 async fn test_replace_queue() -> Result<(), Error> {
536 let client = setup_redis().await;
537 let did = Uuid::new_v4().to_string();
538 let initial_tracks = vec!["track:67890", "track:67891"];
539
540 for &track_id in &initial_tracks {
541 add_track(&client, &did, track_id).await?;
542 }
543
544 let new_queue = vec![
545 "track:67892".to_string(),
546 "track:67893".to_string(),
547 "track:67894".to_string(),
548 ];
549
550 replace_queue(&client, &did, new_queue.clone()).await?;
551 let queue = get_queue(&client, &did).await?;
552
553 assert_eq!(queue, new_queue);
554 cleanup(&client, &did).await?;
555 Ok(())
556 }
557
558 #[tokio::test]
559 async fn test_get_track_at() -> Result<(), Error> {
560 let client = setup_redis().await;
561 let did = Uuid::new_v4().to_string();
562 let track_ids = vec!["track:67890", "track:67891", "track:67892"];
563
564 for &track_id in &track_ids {
565 add_track(&client, &did, track_id).await?;
566 }
567
568 let track = get_track_at(&client, &did, 1).await?;
569 assert_eq!(track, Some("track:67891".to_string()));
570
571 let track = get_track_at(&client, &did, 5).await?;
572 assert_eq!(track, None);
573
574 let new_did = Uuid::new_v4().to_string();
575 let track = get_track_at(&client, &new_did, 0).await?;
576 assert_eq!(track, None);
577
578 cleanup(&client, &did).await?;
579 cleanup(&client, &new_did).await?;
580
581 Ok(())
582 }
583
584 #[tokio::test]
585 async fn test_insert_tracks_at() -> Result<(), Error> {
586 let client = setup_redis().await;
587 let did = Uuid::new_v4().to_string();
588 let initial_tracks = vec!["track:67890", "track:67891"];
589
590 for &track_id in &initial_tracks {
591 add_track(&client, &did, track_id).await?;
592 }
593
594 let new_tracks = vec!["track:67892".to_string(), "track:67893".to_string()];
595 insert_tracks_at(&client, &did, 1, new_tracks.clone()).await?;
596 let queue = get_queue(&client, &did).await?;
597
598 assert_eq!(
599 queue,
600 vec!["track:67890", "track:67892", "track:67893", "track:67891"]
601 );
602
603 let end_tracks = vec!["track:67894".to_string()];
604 insert_tracks_at(&client, &did, 10, end_tracks.clone()).await?;
605 let queue = get_queue(&client, &did).await?;
606 assert_eq!(
607 queue,
608 vec![
609 "track:67890",
610 "track:67892",
611 "track:67893",
612 "track:67891",
613 "track:67894"
614 ]
615 );
616 let new_did = Uuid::new_v4().to_string();
617 let new_tracks = vec!["track:67895".to_string(), "track:67896".to_string()];
618 insert_tracks_at(&client, &new_did, 0, new_tracks.clone()).await?;
619
620 let queue = get_queue(&client, &new_did).await?;
621 assert_eq!(queue, new_tracks);
622
623 cleanup(&client, &did).await?;
624 cleanup(&client, &new_did).await?;
625 Ok(())
626 }
627
628 #[tokio::test]
629 async fn test_concurrent_operations() -> Result<(), Error> {
630 let client = setup_redis().await;
631 let did = Uuid::new_v4().to_string();
632 let track_ids = vec!["track:67890", "track:67891", "track:67892"];
633
634 let add_task = add_track(&client, &did, track_ids[0]);
635 let insert_task = insert_track_at(&client, &did, 0, track_ids[1]);
636 let remove_task = remove_track_at(&client, &did, 0);
637 tokio::try_join!(add_task, insert_task, remove_task)?;
638
639 let queue = get_queue(&client, &did).await?;
640 assert!(queue.len() <= 2);
641 assert!(track_ids.iter().any(|id| queue.contains(&id.to_string())));
642
643 cleanup(&client, &did).await?;
644 Ok(())
645 }
646}