A decentralized music tracking and discovery platform built on AT Protocol 馃幍
at main 646 lines 20 kB view raw
1use anyhow::Error; 2use rand::seq::SliceRandom; 3use redis::AsyncCommands; 4 5pub async fn add_track( 6 client: &redis::Client, 7 did: &str, 8 track_id: &str, 9) -> Result<Vec<String>, Error> { 10 let mut conn = client.get_multiplexed_async_connection().await?; 11 12 conn.rpush::<_, _, i32>(format!("user:{}:queue", did), track_id) 13 .await?; 14 15 let queue: Vec<String> = conn.lrange(format!("user:{}:queue", did), 0, -1).await?; 16 17 Ok(queue) 18} 19 20pub async fn add_tracks( 21 client: &redis::Client, 22 did: &str, 23 track_ids: Vec<String>, 24) -> Result<Vec<String>, Error> { 25 let mut conn = client.get_multiplexed_async_connection().await?; 26 27 conn.rpush::<_, _, i32>(format!("user:{}:queue", did), track_ids) 28 .await?; 29 30 let queue: Vec<String> = conn.lrange(format!("user:{}:queue", did), 0, -1).await?; 31 32 Ok(queue) 33} 34 35pub async fn insert_track_at( 36 client: &redis::Client, 37 did: &str, 38 position: usize, 39 track_id: &str, 40) -> Result<Vec<String>, Error> { 41 let mut conn = client.get_multiplexed_async_connection().await?; 42 43 let queue: Vec<String> = conn.lrange(format!("user:{}:queue", did), 0, -1).await?; 44 45 let mut new_queue = queue.clone(); 46 if position >= new_queue.len() { 47 new_queue.push(track_id.to_string()); 48 } else { 49 new_queue.insert(position, track_id.to_string()); 50 } 51 52 let mut pipeline = redis::pipe(); 53 pipeline 54 .atomic() 55 .del(format!("user:{}:queue", did)) 56 .rpush(format!("user:{}:queue", did), new_queue.clone()) 57 .query_async::<()>(&mut conn) 58 .await?; 59 60 Ok(new_queue) 61} 62 63pub async fn remove_track_at( 64 client: &redis::Client, 65 did: &str, 66 position: usize, 67) -> Result<Vec<String>, Error> { 68 let mut conn = client.get_multiplexed_async_connection().await?; 69 70 let queue: Vec<String> = conn.lrange(format!("user:{}:queue", did), 0, -1).await?; 71 if position < queue.len() { 72 let _: i32 = conn 73 .lrem::<_, _, i32>(format!("user:{}:queue", did), 1, &queue[position]) 74 .await?; 75 } 76 77 let new_queue: Vec<String> = conn.lrange(format!("user:{}:queue", did), 0, -1).await?; 78 Ok(new_queue) 79} 80 81pub async fn shuffle_queue(client: &redis::Client, did: &str) -> Result<Vec<String>, Error> { 82 let mut conn = client.get_multiplexed_async_connection().await?; 83 84 let mut queue: Vec<String> = conn.lrange(format!("user:{}:queue", did), 0, -1).await?; 85 let old_queue = queue.clone(); 86 87 loop { 88 let mut rng = rand::rng(); 89 queue.shuffle(&mut rng); 90 if queue != old_queue { 91 break; 92 } 93 } 94 95 redis::pipe() 96 .atomic() 97 .del(format!("user:{}:queue", did)) 98 .rpush(format!("user:{}:queue", did), queue.clone()) 99 .query_async::<()>(&mut conn) 100 .await?; 101 102 Ok(queue) 103} 104 105pub async fn get_queue(client: &redis::Client, did: &str) -> Result<Vec<String>, Error> { 106 let mut conn = client.get_multiplexed_async_connection().await?; 107 108 let queue: Vec<String> = conn.lrange(format!("user:{}:queue", did), 0, -1).await?; 109 Ok(queue) 110} 111 112pub async fn clear_queue(client: &redis::Client, did: &str) -> Result<(), Error> { 113 let mut conn = client.get_multiplexed_async_connection().await?; 114 115 redis::pipe() 116 .atomic() 117 .del(format!("user:{}:queue", did)) 118 .query_async::<()>(&mut conn) 119 .await?; 120 121 Ok(()) 122} 123 124pub async fn get_queue_length(client: &redis::Client, did: &str) -> Result<usize, Error> { 125 let mut conn = client.get_multiplexed_async_connection().await?; 126 127 let length: usize = conn.llen(format!("user:{}:queue", did)).await?; 128 Ok(length) 129} 130 131pub async fn is_queue_empty(client: &redis::Client, did: &str) -> Result<bool, Error> { 132 let length = get_queue_length(client, did).await?; 133 Ok(length == 0) 134} 135 136pub async fn set_current_track( 137 client: &redis::Client, 138 did: &str, 139 position: usize, 140) -> Result<(), Error> { 141 let mut conn = client.get_multiplexed_async_connection().await?; 142 143 conn.set::<_, _, ()>(format!("user:{}:current_track", did), position) 144 .await?; 145 146 Ok(()) 147} 148 149pub async fn get_current_track(client: &redis::Client, did: &str) -> Result<Option<usize>, Error> { 150 let mut conn = client.get_multiplexed_async_connection().await?; 151 152 let position: Option<usize> = conn 153 .get::<_, Option<usize>>(format!("user:{}:current_track", did)) 154 .await?; 155 156 Ok(position) 157} 158 159pub async fn clear_current_track(client: &redis::Client, did: &str) -> Result<(), Error> { 160 let mut conn = client.get_multiplexed_async_connection().await?; 161 162 conn.del::<_, ()>(format!("user:{}:current_track", did)) 163 .await?; 164 165 Ok(()) 166} 167 168pub async fn move_track( 169 client: &redis::Client, 170 did: &str, 171 from: usize, 172 to: usize, 173) -> Result<Vec<String>, Error> { 174 let mut conn = client.get_multiplexed_async_connection().await?; 175 176 let queue: Vec<String> = conn.lrange(format!("user:{}:queue", did), 0, -1).await?; 177 if from >= queue.len() || to >= queue.len() { 178 return Ok(queue); 179 } 180 181 let mut new_queue = queue.clone(); 182 let track = new_queue.remove(from); 183 new_queue.insert(to, track); 184 185 redis::pipe() 186 .atomic() 187 .del(format!("user:{}:queue", did)) 188 .rpush(format!("user:{}:queue", did), new_queue.clone()) 189 .query_async::<()>(&mut conn) 190 .await?; 191 192 Ok(new_queue) 193} 194 195pub async fn replace_queue( 196 client: &redis::Client, 197 did: &str, 198 new_queue: Vec<String>, 199) -> Result<Vec<String>, Error> { 200 let mut conn = client.get_multiplexed_async_connection().await?; 201 202 redis::pipe() 203 .atomic() 204 .del(format!("user:{}:queue", did)) 205 .rpush(format!("user:{}:queue", did), new_queue.clone()) 206 .query_async::<()>(&mut conn) 207 .await?; 208 209 Ok(new_queue) 210} 211 212pub async fn get_track_at( 213 client: &redis::Client, 214 did: &str, 215 position: usize, 216) -> Result<Option<String>, Error> { 217 let mut conn = client.get_multiplexed_async_connection().await?; 218 219 let track: Option<String> = conn 220 .lindex::<_, Option<String>>(format!("user:{}:queue", did), position as isize) 221 .await?; 222 223 Ok(track) 224} 225 226pub async fn insert_tracks_at( 227 client: &redis::Client, 228 did: &str, 229 position: usize, 230 track_ids: Vec<String>, 231) -> Result<Vec<String>, Error> { 232 let mut conn = client.get_multiplexed_async_connection().await?; 233 234 let queue: Vec<String> = conn.lrange(format!("user:{}:queue", did), 0, -1).await?; 235 236 let mut new_queue = queue.clone(); 237 if position >= new_queue.len() { 238 new_queue.extend(track_ids); 239 } else { 240 for (i, track_id) in track_ids.into_iter().enumerate() { 241 new_queue.insert(position + i, track_id); 242 } 243 } 244 245 let mut pipeline = redis::pipe(); 246 pipeline 247 .atomic() 248 .del(format!("user:{}:queue", did)) 249 .rpush(format!("user:{}:queue", did), new_queue.clone()) 250 .query_async::<()>(&mut conn) 251 .await?; 252 253 Ok(new_queue) 254} 255 256#[cfg(test)] 257mod tests { 258 use std::env; 259 260 use super::*; 261 use anyhow::Error; 262 use redis::AsyncCommands; 263 use uuid::Uuid; 264 265 async fn setup_redis() -> redis::Client { 266 redis::Client::open(env::var("REDIS_URL").unwrap_or("redis://127.0.0.1".into())) 267 .expect("Failed to create Redis client") 268 } 269 270 async fn cleanup(client: &redis::Client, did: &str) -> Result<(), Error> { 271 let mut conn = client.get_multiplexed_async_connection().await?; 272 conn.del::<_, ()>(format!("user:{}:queue", did)).await?; 273 Ok(()) 274 } 275 276 #[tokio::test] 277 async fn test_add_track() -> Result<(), Error> { 278 let client = setup_redis().await; 279 let did = Uuid::new_v4().to_string(); 280 let track_id = "track:67890"; 281 282 // Add a track 283 add_track(&client, &did, track_id).await?; 284 let queue = get_queue(&client, &did).await?; 285 assert_eq!(queue, vec![track_id]); 286 287 // Add another track 288 let track_id2 = "track:67891"; 289 add_track(&client, &did, track_id2).await?; 290 let queue = get_queue(&client, &did).await?; 291 assert_eq!(queue, vec![track_id, track_id2]); 292 293 // Cleanup 294 cleanup(&client, &did).await?; 295 Ok(()) 296 } 297 298 #[tokio::test] 299 async fn test_add_tracks() -> Result<(), Error> { 300 let client = setup_redis().await; 301 let did = Uuid::new_v4().to_string(); 302 303 let track_ids = vec!["track:67890", "track:67891", "track:67892"]; 304 add_tracks( 305 &client, 306 &did, 307 track_ids.iter().map(|s| s.to_string()).collect(), 308 ) 309 .await?; 310 311 let queue = get_queue(&client, &did).await?; 312 assert_eq!(queue, track_ids); 313 cleanup(&client, &did).await?; 314 Ok(()) 315 } 316 317 #[tokio::test] 318 async fn test_insert_track_at() -> Result<(), Error> { 319 let client = setup_redis().await; 320 let did = Uuid::new_v4().to_string(); 321 let track_ids = vec!["track:67890", "track:67891", "track:67892"]; 322 323 for &track_id in &track_ids { 324 add_track(&client, &did, track_id).await?; 325 } 326 327 let new_track = "track:67893"; 328 insert_track_at(&client, &did, 1, new_track).await?; 329 let queue: Vec<String> = get_queue(&client, &did).await?; 330 assert_eq!( 331 queue, 332 vec!["track:67890", "track:67893", "track:67891", "track:67892"] 333 ); 334 335 let end_track = "track:67894"; 336 insert_track_at(&client, &did, 10, end_track).await?; 337 let queue = get_queue(&client, &did).await?; 338 assert_eq!( 339 queue, 340 vec![ 341 "track:67890", 342 "track:67893", 343 "track:67891", 344 "track:67892", 345 "track:67894" 346 ] 347 ); 348 349 let new_did = Uuid::new_v4().to_string(); 350 insert_track_at(&client, &new_did, 0, "track:67895").await?; 351 let queue = get_queue(&client, &new_did).await?; 352 assert_eq!(queue, vec!["track:67895"]); 353 354 cleanup(&client, &did).await?; 355 cleanup(&client, &new_did).await?; 356 Ok(()) 357 } 358 359 #[tokio::test] 360 async fn test_remove_track_at() -> Result<(), Error> { 361 let client = setup_redis().await; 362 let did = Uuid::new_v4().to_string(); 363 let track_ids = vec!["track:67890", "track:67891", "track:67892"]; 364 365 for &track_id in &track_ids { 366 add_track(&client, &did, track_id).await?; 367 } 368 369 remove_track_at(&client, &did, 1).await?; 370 let queue = get_queue(&client, &did).await?; 371 assert_eq!(queue, vec!["track:67890", "track:67892"]); 372 373 remove_track_at(&client, &did, 0).await?; 374 let queue = get_queue(&client, &did).await?; 375 assert_eq!(queue, vec!["track:67892"]); 376 377 remove_track_at(&client, &did, 5).await?; 378 let queue = get_queue(&client, &did).await?; 379 assert_eq!(queue, vec!["track:67892"]); 380 381 let new_did = Uuid::new_v4().to_string(); 382 remove_track_at(&client, &new_did, 0).await?; 383 let queue = get_queue(&client, &new_did).await?; 384 assert_eq!(queue, Vec::<String>::new()); 385 386 cleanup(&client, &did).await?; 387 cleanup(&client, &new_did).await?; 388 Ok(()) 389 } 390 391 #[tokio::test] 392 async fn test_shuffle_queue() -> Result<(), Error> { 393 let client = setup_redis().await; 394 let did = Uuid::new_v4().to_string(); 395 let track_ids = vec!["track:67890", "track:67891", "track:67892"]; 396 397 for &track_id in &track_ids { 398 add_track(&client, &did, track_id).await?; 399 } 400 401 shuffle_queue(&client, &did).await?; 402 let queue = get_queue(&client, &did).await?; 403 assert_eq!(queue.len(), track_ids.len()); 404 assert!(track_ids.iter().all(|id| queue.contains(&id.to_string()))); 405 406 cleanup(&client, &did).await?; 407 Ok(()) 408 } 409 410 #[tokio::test] 411 async fn test_get_queue() -> Result<(), Error> { 412 let client = setup_redis().await; 413 let did = Uuid::new_v4().to_string(); 414 let track_ids = vec!["track:67890", "track:67891"]; 415 416 let queue = get_queue(&client, &did).await?; 417 assert_eq!(queue, Vec::<String>::new()); 418 419 for &track_id in &track_ids { 420 add_track(&client, &did, track_id).await?; 421 } 422 423 let queue = get_queue(&client, &did).await?; 424 assert_eq!(queue, track_ids); 425 426 cleanup(&client, &did).await?; 427 Ok(()) 428 } 429 430 #[tokio::test] 431 async fn test_clear_queue() -> Result<(), Error> { 432 let client = setup_redis().await; 433 let did = Uuid::new_v4().to_string(); 434 let track_ids = vec!["track:67890", "track:67891"]; 435 436 for &track_id in &track_ids { 437 add_track(&client, &did, track_id).await?; 438 } 439 440 clear_queue(&client, &did).await?; 441 let queue = get_queue(&client, &did).await?; 442 assert_eq!(queue, Vec::<String>::new()); 443 444 clear_queue(&client, &did).await?; 445 let queue = get_queue(&client, &did).await?; 446 assert_eq!(queue, Vec::<String>::new()); 447 448 cleanup(&client, &did).await?; 449 Ok(()) 450 } 451 452 #[tokio::test] 453 async fn test_queue_length_and_empty() -> Result<(), Error> { 454 let client = setup_redis().await; 455 let did = Uuid::new_v4().to_string(); 456 let track_ids = vec!["track:67890", "track:67891"]; 457 458 let length = get_queue_length(&client, &did).await?; 459 assert_eq!(length, 0); 460 let is_empty = is_queue_empty(&client, &did).await?; 461 assert!(is_empty); 462 463 for &track_id in &track_ids { 464 add_track(&client, &did, track_id).await?; 465 } 466 467 let length = get_queue_length(&client, &did).await?; 468 assert_eq!(length, track_ids.len()); 469 let is_empty = is_queue_empty(&client, &did).await?; 470 assert!(!is_empty); 471 472 cleanup(&client, &did).await?; 473 Ok(()) 474 } 475 476 #[tokio::test] 477 async fn test_current_track() -> Result<(), Error> { 478 let client = setup_redis().await; 479 let did = Uuid::new_v4().to_string(); 480 let track_ids = vec!["track:67890", "track:67891"]; 481 for &track_id in &track_ids { 482 add_track(&client, &did, track_id).await?; 483 } 484 let current = get_current_track(&client, &did).await?; 485 assert_eq!(current, None); 486 set_current_track(&client, &did, 1).await?; 487 let current = get_current_track(&client, &did).await?; 488 assert_eq!(current, Some(1)); 489 clear_current_track(&client, &did).await?; 490 let current = get_current_track(&client, &did).await?; 491 assert_eq!(current, None); 492 cleanup(&client, &did).await?; 493 Ok(()) 494 } 495 496 #[tokio::test] 497 async fn test_move_track() -> Result<(), Error> { 498 let client = setup_redis().await; 499 let did = Uuid::new_v4().to_string(); 500 let track_ids = vec!["track:67890", "track:67891", "track:67892"]; 501 502 for &track_id in &track_ids { 503 add_track(&client, &did, track_id).await?; 504 } 505 506 move_track(&client, &did, 0, 2).await?; 507 let queue = get_queue(&client, &did).await?; 508 assert_eq!(queue, vec!["track:67891", "track:67892", "track:67890"]); 509 move_track(&client, &did, 2, 0).await?; 510 511 let queue = get_queue(&client, &did).await?; 512 assert_eq!(queue, vec!["track:67890", "track:67891", "track:67892"]); 513 move_track(&client, &did, 1, 1).await?; 514 515 let queue = get_queue(&client, &did).await?; 516 assert_eq!(queue, vec!["track:67890", "track:67891", "track:67892"]); 517 move_track(&client, &did, 5, 0).await?; 518 519 let queue = get_queue(&client, &did).await?; 520 assert_eq!(queue, vec!["track:67890", "track:67891", "track:67892"]); 521 522 let new_did = Uuid::new_v4().to_string(); 523 move_track(&client, &new_did, 0, 1).await?; 524 525 let queue = get_queue(&client, &new_did).await?; 526 assert_eq!(queue, Vec::<String>::new()); 527 528 cleanup(&client, &did).await?; 529 cleanup(&client, &new_did).await?; 530 531 Ok(()) 532 } 533 534 #[tokio::test] 535 async fn test_replace_queue() -> Result<(), Error> { 536 let client = setup_redis().await; 537 let did = Uuid::new_v4().to_string(); 538 let initial_tracks = vec!["track:67890", "track:67891"]; 539 540 for &track_id in &initial_tracks { 541 add_track(&client, &did, track_id).await?; 542 } 543 544 let new_queue = vec![ 545 "track:67892".to_string(), 546 "track:67893".to_string(), 547 "track:67894".to_string(), 548 ]; 549 550 replace_queue(&client, &did, new_queue.clone()).await?; 551 let queue = get_queue(&client, &did).await?; 552 553 assert_eq!(queue, new_queue); 554 cleanup(&client, &did).await?; 555 Ok(()) 556 } 557 558 #[tokio::test] 559 async fn test_get_track_at() -> Result<(), Error> { 560 let client = setup_redis().await; 561 let did = Uuid::new_v4().to_string(); 562 let track_ids = vec!["track:67890", "track:67891", "track:67892"]; 563 564 for &track_id in &track_ids { 565 add_track(&client, &did, track_id).await?; 566 } 567 568 let track = get_track_at(&client, &did, 1).await?; 569 assert_eq!(track, Some("track:67891".to_string())); 570 571 let track = get_track_at(&client, &did, 5).await?; 572 assert_eq!(track, None); 573 574 let new_did = Uuid::new_v4().to_string(); 575 let track = get_track_at(&client, &new_did, 0).await?; 576 assert_eq!(track, None); 577 578 cleanup(&client, &did).await?; 579 cleanup(&client, &new_did).await?; 580 581 Ok(()) 582 } 583 584 #[tokio::test] 585 async fn test_insert_tracks_at() -> Result<(), Error> { 586 let client = setup_redis().await; 587 let did = Uuid::new_v4().to_string(); 588 let initial_tracks = vec!["track:67890", "track:67891"]; 589 590 for &track_id in &initial_tracks { 591 add_track(&client, &did, track_id).await?; 592 } 593 594 let new_tracks = vec!["track:67892".to_string(), "track:67893".to_string()]; 595 insert_tracks_at(&client, &did, 1, new_tracks.clone()).await?; 596 let queue = get_queue(&client, &did).await?; 597 598 assert_eq!( 599 queue, 600 vec!["track:67890", "track:67892", "track:67893", "track:67891"] 601 ); 602 603 let end_tracks = vec!["track:67894".to_string()]; 604 insert_tracks_at(&client, &did, 10, end_tracks.clone()).await?; 605 let queue = get_queue(&client, &did).await?; 606 assert_eq!( 607 queue, 608 vec![ 609 "track:67890", 610 "track:67892", 611 "track:67893", 612 "track:67891", 613 "track:67894" 614 ] 615 ); 616 let new_did = Uuid::new_v4().to_string(); 617 let new_tracks = vec!["track:67895".to_string(), "track:67896".to_string()]; 618 insert_tracks_at(&client, &new_did, 0, new_tracks.clone()).await?; 619 620 let queue = get_queue(&client, &new_did).await?; 621 assert_eq!(queue, new_tracks); 622 623 cleanup(&client, &did).await?; 624 cleanup(&client, &new_did).await?; 625 Ok(()) 626 } 627 628 #[tokio::test] 629 async fn test_concurrent_operations() -> Result<(), Error> { 630 let client = setup_redis().await; 631 let did = Uuid::new_v4().to_string(); 632 let track_ids = vec!["track:67890", "track:67891", "track:67892"]; 633 634 let add_task = add_track(&client, &did, track_ids[0]); 635 let insert_task = insert_track_at(&client, &did, 0, track_ids[1]); 636 let remove_task = remove_track_at(&client, &did, 0); 637 tokio::try_join!(add_task, insert_task, remove_task)?; 638 639 let queue = get_queue(&client, &did).await?; 640 assert!(queue.len() <= 2); 641 assert!(track_ids.iter().any(|id| queue.contains(&id.to_string()))); 642 643 cleanup(&client, &did).await?; 644 Ok(()) 645 } 646}