A decentralized music tracking and discovery platform built on AT Protocol 🎵 rocksky.app
spotify atproto lastfm musicbrainz scrobbling listenbrainz

fix: remove r2d2 dependency and refactor database connection handling to use direct duckdb connections

+184 -281
-21
Cargo.lock
··· 4940 checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" 4941 4942 [[package]] 4943 - name = "r2d2" 4944 - version = "0.8.10" 4945 - source = "registry+https://github.com/rust-lang/crates.io-index" 4946 - checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93" 4947 - dependencies = [ 4948 - "log", 4949 - "parking_lot", 4950 - "scheduled-thread-pool", 4951 - ] 4952 - 4953 - [[package]] 4954 name = "radium" 4955 version = "0.7.0" 4956 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 5380 "dotenv", 5381 "duckdb", 5382 "owo-colors", 5383 - "r2d2", 5384 "reqwest", 5385 "serde", 5386 "serde_json", ··· 5954 checksum = "891d81b926048e76efe18581bf793546b4c0eaf8448d72be8de2bbee5fd166e1" 5955 dependencies = [ 5956 "windows-sys 0.61.1", 5957 - ] 5958 - 5959 - [[package]] 5960 - name = "scheduled-thread-pool" 5961 - version = "0.2.7" 5962 - source = "registry+https://github.com/rust-lang/crates.io-index" 5963 - checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19" 5964 - dependencies = [ 5965 - "parking_lot", 5966 ] 5967 5968 [[package]]
··· 4940 checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" 4941 4942 [[package]] 4943 name = "radium" 4944 version = "0.7.0" 4945 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 5369 "dotenv", 5370 "duckdb", 5371 "owo-colors", 5372 "reqwest", 5373 "serde", 5374 "serde_json", ··· 5942 checksum = "891d81b926048e76efe18581bf793546b4c0eaf8448d72be8de2bbee5fd166e1" 5943 dependencies = [ 5944 "windows-sys 0.61.1", 5945 ] 5946 5947 [[package]]
-1
crates/feed/Cargo.toml
··· 41 async-trait = "0.1.89" 42 xid = "1.1.1" 43 sha256 = "1.6.0" 44 - r2d2 = "0.8.10"
··· 41 async-trait = "0.1.89" 42 xid = "1.1.1" 43 sha256 = "1.6.0"
-1
crates/feed/src/lib.rs
··· 16 pub mod feed; 17 pub mod feed_handler; 18 pub mod feeds; 19 - mod r2d2_duckdb; 20 pub mod repo; 21 pub mod subscriber; 22 pub mod sync;
··· 16 pub mod feed; 17 pub mod feed_handler; 18 pub mod feeds; 19 pub mod repo; 20 pub mod subscriber; 21 pub mod sync;
-46
crates/feed/src/r2d2_duckdb.rs
··· 1 - extern crate duckdb; 2 - extern crate r2d2; 3 - 4 - use duckdb::{params, Connection, Error}; 5 - use std::path::{Path, PathBuf}; 6 - 7 - enum ConnectionConfig { 8 - File(PathBuf), 9 - Memory, 10 - } 11 - 12 - /// An `r2d2::ManageConnection` for `ruDuckDB::Connection`s. 13 - pub struct DuckDBConnectionManager(ConnectionConfig); 14 - 15 - impl DuckDBConnectionManager { 16 - /// Creates a new `DuckDBConnectionManager` from file. 17 - /// 18 - pub fn file<P: AsRef<Path>>(path: P) -> Self { 19 - DuckDBConnectionManager(ConnectionConfig::File(path.as_ref().to_path_buf())) 20 - } 21 - 22 - pub fn memory() -> Self { 23 - DuckDBConnectionManager(ConnectionConfig::Memory) 24 - } 25 - } 26 - 27 - impl r2d2::ManageConnection for DuckDBConnectionManager { 28 - type Connection = Connection; 29 - type Error = duckdb::Error; 30 - 31 - fn connect(&self) -> Result<Connection, Error> { 32 - match self.0 { 33 - ConnectionConfig::File(ref path) => Connection::open(path), 34 - ConnectionConfig::Memory => Connection::open_in_memory(), 35 - } 36 - } 37 - 38 - fn is_valid(&self, conn: &mut Connection) -> Result<(), Error> { 39 - let _ = conn.execute("", params![]); 40 - Ok(()) 41 - } 42 - 43 - fn has_broken(&self, _: &mut Connection) -> bool { 44 - false 45 - } 46 - }
···
+3 -6
crates/feed/src/repo/duckdb/album.rs
··· 1 use std::sync::{Arc, Mutex}; 2 3 - use crate::r2d2_duckdb::DuckDBConnectionManager; 4 use crate::types::AlbumRecord; 5 use anyhow::Error; 6 use duckdb::params; 7 8 - pub async fn save_album( 9 - pool: r2d2::Pool<DuckDBConnectionManager>, 10 - mutex: Arc<Mutex<()>>, 11 uri: &str, 12 record: AlbumRecord, 13 ) -> Result<(), Error> { 14 - let _lock = mutex.lock().unwrap(); 15 let uri = uri.to_string(); 16 - let conn = pool.get()?; 17 18 let album_hash = sha256::digest(format!("{} - {}", record.title, record.artist).to_lowercase()); 19
··· 1 use std::sync::{Arc, Mutex}; 2 3 use crate::types::AlbumRecord; 4 use anyhow::Error; 5 use duckdb::params; 6 7 + pub fn save_album( 8 + conn: Arc<Mutex<duckdb::Connection>>, 9 uri: &str, 10 record: AlbumRecord, 11 ) -> Result<(), Error> { 12 let uri = uri.to_string(); 13 + let conn = conn.lock().unwrap(); 14 15 let album_hash = sha256::digest(format!("{} - {}", record.title, record.artist).to_lowercase()); 16
+4 -6
crates/feed/src/repo/duckdb/artist.rs
··· 1 use std::sync::{Arc, Mutex}; 2 3 - use crate::{r2d2_duckdb::DuckDBConnectionManager, types::ArtistRecord}; 4 use anyhow::Error; 5 use duckdb::params; 6 7 - pub async fn save_artist( 8 - pool: r2d2::Pool<DuckDBConnectionManager>, 9 - mutex: Arc<Mutex<()>>, 10 uri: &str, 11 record: ArtistRecord, 12 ) -> Result<(), Error> { 13 - let _lock = mutex.lock().unwrap(); 14 let uri = uri.to_string(); 15 - let conn = pool.get()?; 16 17 let artist_hash = sha256::digest(record.name.to_lowercase()); 18
··· 1 use std::sync::{Arc, Mutex}; 2 3 + use crate::types::ArtistRecord; 4 use anyhow::Error; 5 use duckdb::params; 6 7 + pub fn save_artist( 8 + conn: Arc<Mutex<duckdb::Connection>>, 9 uri: &str, 10 record: ArtistRecord, 11 ) -> Result<(), Error> { 12 + let conn = conn.lock().unwrap(); 13 let uri = uri.to_string(); 14 15 let artist_hash = sha256::digest(record.name.to_lowercase()); 16
+10 -17
crates/feed/src/repo/duckdb/mod.rs
··· 10 }; 11 12 use super::Repo; 13 - use crate::r2d2_duckdb::DuckDBConnectionManager; 14 use anyhow::Error; 15 use async_trait::async_trait; 16 use tokio::sync::mpsc::Sender; ··· 49 50 #[derive(Clone)] 51 pub struct DuckdbRepo { 52 - pool: r2d2::Pool<DuckDBConnectionManager>, 53 save_tx: Sender<SaveMessage>, 54 } 55 ··· 57 pub async fn new() -> Result<Self, Error> { 58 let (save_tx, mut save_rx) = tokio::sync::mpsc::channel::<SaveMessage>(100); 59 60 - let manager = DuckDBConnectionManager::file(DB_PATH); 61 - let pool = r2d2::Pool::builder().max_size(1).build(manager)?; 62 - 63 - let pool_clone = pool.clone(); 64 - 65 tokio::spawn(async move { 66 - let mutex = Arc::new(Mutex::new(())); 67 while let Some(msg) = save_rx.recv().await { 68 let result = match msg { 69 SaveMessage::Album { uri, record } => { 70 - save_album(pool_clone.clone(), mutex.clone(), &uri, record).await 71 } 72 SaveMessage::Artist { uri, record } => { 73 - save_artist(pool_clone.clone(), mutex.clone(), &uri, record).await 74 } 75 SaveMessage::Scrobble { did, uri, record } => { 76 - save_scrobble(pool_clone.clone(), mutex.clone(), &did, &uri, record).await 77 } 78 SaveMessage::Track { uri, record } => { 79 - save_track(pool_clone.clone(), mutex.clone(), &uri, record).await 80 - } 81 - SaveMessage::User { did } => { 82 - save_user(pool_clone.clone(), mutex.clone(), &did).await 83 } 84 }; 85 86 if let Err(e) = result { ··· 89 } 90 }); 91 92 - Ok(Self { pool, save_tx }) 93 } 94 } 95 ··· 192 } 193 194 async fn create_tables(self) -> Result<(), anyhow::Error> { 195 - let conn = self.pool.get()?; 196 conn.execute_batch( 197 "BEGIN; 198 CREATE TABLE IF NOT EXISTS artists (
··· 10 }; 11 12 use super::Repo; 13 use anyhow::Error; 14 use async_trait::async_trait; 15 use tokio::sync::mpsc::Sender; ··· 48 49 #[derive(Clone)] 50 pub struct DuckdbRepo { 51 + conn: Arc<Mutex<duckdb::Connection>>, 52 save_tx: Sender<SaveMessage>, 53 } 54 ··· 56 pub async fn new() -> Result<Self, Error> { 57 let (save_tx, mut save_rx) = tokio::sync::mpsc::channel::<SaveMessage>(100); 58 59 + let conn = Arc::new(Mutex::new(duckdb::Connection::open(DB_PATH)?)); 60 + let conn_clone = conn.clone(); 61 tokio::spawn(async move { 62 while let Some(msg) = save_rx.recv().await { 63 let result = match msg { 64 SaveMessage::Album { uri, record } => { 65 + save_album(conn_clone.clone(), &uri, record) 66 } 67 SaveMessage::Artist { uri, record } => { 68 + save_artist(conn_clone.clone(), &uri, record) 69 } 70 SaveMessage::Scrobble { did, uri, record } => { 71 + save_scrobble(conn_clone.clone(), &did, &uri, record) 72 } 73 SaveMessage::Track { uri, record } => { 74 + save_track(conn_clone.clone(), &uri, record) 75 } 76 + SaveMessage::User { did } => save_user(conn_clone.clone(), &did), 77 }; 78 79 if let Err(e) = result { ··· 82 } 83 }); 84 85 + Ok(Self { conn, save_tx }) 86 } 87 } 88 ··· 185 } 186 187 async fn create_tables(self) -> Result<(), anyhow::Error> { 188 + let conn = self.conn.lock().unwrap(); 189 conn.execute_batch( 190 "BEGIN; 191 CREATE TABLE IF NOT EXISTS artists (
+162 -170
crates/feed/src/repo/duckdb/scrobble.rs
··· 4 use duckdb::{params, OptionalExt}; 5 use std::sync::Mutex; 6 7 - use crate::{did::did_to_profile, r2d2_duckdb::DuckDBConnectionManager, types::ScrobbleRecord}; 8 9 - pub async fn save_scrobble( 10 - pool: r2d2::Pool<DuckDBConnectionManager>, 11 - mutex: Arc<Mutex<()>>, 12 did: &str, 13 uri: &str, 14 record: ScrobbleRecord, ··· 17 let cloned_did = did.clone(); 18 19 let uri = uri.to_string(); 20 21 - let handle = tokio::task::spawn_blocking(move || -> Result<(), Error> { 22 - tracing::info!("Inserting scrobble for user: {}, scrobble: {}", did, uri); 23 - let _lock = mutex.lock().unwrap(); 24 - let conn = pool.get()?; 25 - let mut user = conn.prepare("SELECT id FROM users WHERE did = ?")?; 26 - let user_id: Option<String> = user.query_row(params![did], |row| row.get(0)).optional()?; 27 28 - if user_id.is_none() { 29 - let rt = tokio::runtime::Runtime::new()?; 30 - let profile = rt.block_on(did_to_profile(&did))?; 31 32 - let avatar = profile.avatar.map(|blob| { 33 - format!( 34 - "https://cdn.bsky.app/img/avatar/plain/{}/{}@{}", 35 - did, 36 - blob.r#ref.link, 37 - blob.mime_type.split('/').last().unwrap_or("jpeg") 38 - ) 39 - }); 40 41 - conn.execute( 42 - "INSERT OR IGNORE INTO users ( 43 id, 44 display_name, 45 did, ··· 51 ?, 52 ?, 53 ?)", 54 - params![ 55 - xid::new().to_string(), 56 - profile.display_name.unwrap_or_default(), 57 - did, 58 - profile.handle.unwrap_or_default(), 59 - avatar, 60 - ], 61 - )?; 62 - } 63 64 - let album_hash = 65 - sha256::digest(format!("{} - {}", record.album, record.album_artist).to_lowercase()); 66 67 - match conn.execute( 68 - "INSERT OR IGNORE INTO albums ( 69 id, 70 title, 71 artist, ··· 84 ?, 85 ? 86 )", 87 - params![ 88 - xid::new().to_string(), 89 - record.album, 90 - record.album_artist, 91 - record.release_date, 92 - record.album_art_url, 93 - record.year, 94 - record.album_uri, 95 - album_hash, 96 - ], 97 - ) { 98 - Ok(x) => tracing::info!("Album inserted or already exists {}", x), 99 - Err(e) => tracing::error!(error = %e, "Error inserting album"), 100 - } 101 102 - let artist_hash = sha256::digest(record.album_artist.to_lowercase()); 103 - match conn.execute( 104 - &format!( 105 - "INSERT OR IGNORE INTO artists ( 106 id, 107 name, 108 sha256, ··· 117 ?, 118 [{}] 119 )", 120 - record 121 - .tags 122 - .as_ref() 123 - .map(|tags| tags 124 - .iter() 125 - .map(|tag| tag.replace('\'', "''")) 126 - .map(|tag| format!("'{}'", tag)) 127 - .collect::<Vec<_>>() 128 - .join(", ")) 129 - .unwrap_or_default() 130 - ), 131 - params![ 132 - xid::new().to_string(), 133 - record.album_artist, 134 - artist_hash, 135 - record.artist_picture, 136 - record.artist_uri 137 - ], 138 - ) { 139 - Ok(x) => tracing::info!("Artist inserted or already exists {}", x), 140 - Err(e) => tracing::error!(error = %e, "Error inserting artist"), 141 - } 142 143 - let track_hash = sha256::digest( 144 - format!("{} - {} - {}", record.title, record.artist, record.album).to_lowercase(), 145 - ); 146 - match conn.execute( 147 - "INSERT OR IGNORE INTO tracks ( 148 id, 149 title, 150 artist, ··· 187 ?, 188 ? 189 )", 190 - params![ 191 - xid::new().to_string(), 192 - record.title, 193 - record.artist, 194 - record.album_artist, 195 - record.album_art_url, 196 - record.album, 197 - record.track_number, 198 - record.disc_number, 199 - record.spotify_link, 200 - record.tidal_link, 201 - record.youtube_link, 202 - record.apple_music_link, 203 - record.copyright_message, 204 - record.label, 205 - record.lyrics, 206 - record.composer, 207 - record.duration, 208 - record.mbid, 209 - record.song_uri, 210 - track_hash, 211 - ], 212 - ) { 213 - Ok(x) => tracing::info!("Track inserted or already exists {}", x), 214 - Err(e) => tracing::error!(error = %e, "Error inserting track"), 215 - } 216 217 - match conn.execute( 218 - "INSERT OR IGNORE INTO album_tracks ( 219 id, 220 album_id, 221 track_id ··· 224 (SELECT id FROM albums WHERE sha256 = ?), 225 (SELECT id FROM tracks WHERE sha256 = ?), 226 )", 227 - params![xid::new().to_string(), album_hash, track_hash], 228 - ) { 229 - Ok(x) => tracing::info!("Album-Track relation inserted or already exists {}", x), 230 - Err(e) => tracing::error!(error = %e, "Error inserting album-track relation"), 231 - } 232 233 - match conn.execute( 234 - "INSERT OR IGNORE INTO user_artists ( 235 id, 236 user_id, 237 artist_id ··· 240 (SELECT id FROM users WHERE did = ?), 241 (SELECT id FROM artists WHERE sha256 = ?), 242 )", 243 - params![xid::new().to_string(), cloned_did, artist_hash], 244 - ) { 245 - Ok(x) => tracing::info!("User-Artist relation inserted or already exists {}", x), 246 - Err(e) => tracing::error!(error = %e, "Error inserting user-artist relation"), 247 - } 248 249 - match conn.execute( 250 - "INSERT OR IGNORE INTO user_albums ( 251 id, 252 user_id, 253 album_id ··· 256 (SELECT id FROM users WHERE did = ?), 257 (SELECT id FROM albums WHERE sha256 = ?), 258 )", 259 - params![xid::new().to_string(), cloned_did, album_hash], 260 - ) { 261 - Ok(x) => tracing::info!("User-Album relation inserted or already exists {}", x), 262 - Err(e) => tracing::error!(error = %e, "Error inserting user-album relation"), 263 - } 264 265 - match conn.execute( 266 - "INSERT OR IGNORE INTO user_tracks ( 267 id, 268 user_id, 269 track_id ··· 272 (SELECT id FROM users WHERE did = ?), 273 (SELECT id FROM tracks WHERE sha256 = ?), 274 )", 275 - params![xid::new().to_string(), cloned_did, track_hash], 276 - ) { 277 - Ok(x) => tracing::info!("User-Track relation inserted or already exists {}", x), 278 - Err(e) => tracing::error!(error = %e, "Error inserting user-track relation"), 279 - } 280 281 - match conn.execute( 282 - "INSERT OR IGNORE INTO artist_albums ( 283 id, 284 artist_id, 285 album_id ··· 288 (SELECT id FROM artists WHERE sha256 = ?), 289 (SELECT id FROM albums WHERE sha256 = ?), 290 )", 291 - params![xid::new().to_string(), artist_hash, album_hash], 292 - ) { 293 - Ok(x) => tracing::info!("Artist-Album relation inserted or already exists {}", x), 294 - Err(e) => tracing::error!(error = %e, "Error inserting artist-album relation"), 295 - } 296 297 - match conn.execute( 298 - "INSERT OR IGNORE INTO artist_tracks ( 299 id, 300 artist_id, 301 track_id ··· 304 (SELECT id FROM artists WHERE sha256 = ?), 305 (SELECT id FROM tracks WHERE sha256 = ?), 306 )", 307 - params![xid::new().to_string(), artist_hash, track_hash], 308 - ) { 309 - Ok(x) => tracing::info!("Artist-Track relation inserted or already exists {}", x), 310 - Err(e) => tracing::error!(error = %e, "Error inserting artist-track relation"), 311 - } 312 313 - match conn.execute( 314 - "INSERT OR IGNORE INTO scrobbles ( 315 id, 316 user_id, 317 track_id, ··· 328 ?, 329 ?, 330 )", 331 - params![ 332 - xid::new().to_string(), 333 - cloned_did, 334 - track_hash, 335 - album_hash, 336 - artist_hash, 337 - uri, 338 - record.created_at, 339 - ], 340 - ) { 341 - Ok(x) => tracing::info!("Scrobble inserted {}", x), 342 - Err(e) => tracing::error!(error = %e, "Error inserting scrobble"), 343 - } 344 - 345 - tracing::info!("Scrobble insertion process completed for user: {}", did); 346 347 - Ok::<(), Error>(()) 348 - }); 349 - 350 - handle.await??; 351 352 Ok(()) 353 }
··· 4 use duckdb::{params, OptionalExt}; 5 use std::sync::Mutex; 6 7 + use crate::{did::did_to_profile, types::ScrobbleRecord}; 8 9 + pub fn save_scrobble( 10 + conn: Arc<Mutex<duckdb::Connection>>, 11 did: &str, 12 uri: &str, 13 record: ScrobbleRecord, ··· 16 let cloned_did = did.clone(); 17 18 let uri = uri.to_string(); 19 + let conn = conn.lock().unwrap(); 20 21 + tracing::info!("Inserting scrobble for user: {}, scrobble: {}", did, uri); 22 + let mut user = conn.prepare("SELECT id FROM users WHERE did = ?")?; 23 + let user_id: Option<String> = user.query_row(params![did], |row| row.get(0)).optional()?; 24 25 + if user_id.is_none() { 26 + let rt = tokio::runtime::Runtime::new()?; 27 + let profile = rt.block_on(did_to_profile(&did))?; 28 29 + let avatar = profile.avatar.map(|blob| { 30 + format!( 31 + "https://cdn.bsky.app/img/avatar/plain/{}/{}@{}", 32 + did, 33 + blob.r#ref.link, 34 + blob.mime_type.split('/').last().unwrap_or("jpeg") 35 + ) 36 + }); 37 38 + conn.execute( 39 + "INSERT OR IGNORE INTO users ( 40 id, 41 display_name, 42 did, ··· 48 ?, 49 ?, 50 ?)", 51 + params![ 52 + xid::new().to_string(), 53 + profile.display_name.unwrap_or_default(), 54 + did, 55 + profile.handle.unwrap_or_default(), 56 + avatar, 57 + ], 58 + )?; 59 + } 60 61 + let album_hash = 62 + sha256::digest(format!("{} - {}", record.album, record.album_artist).to_lowercase()); 63 64 + match conn.execute( 65 + "INSERT OR IGNORE INTO albums ( 66 id, 67 title, 68 artist, ··· 81 ?, 82 ? 83 )", 84 + params![ 85 + xid::new().to_string(), 86 + record.album, 87 + record.album_artist, 88 + record.release_date, 89 + record.album_art_url, 90 + record.year, 91 + record.album_uri, 92 + album_hash, 93 + ], 94 + ) { 95 + Ok(x) => tracing::info!("Album inserted or already exists {}", x), 96 + Err(e) => tracing::error!(error = %e, "Error inserting album"), 97 + } 98 99 + let artist_hash = sha256::digest(record.album_artist.to_lowercase()); 100 + match conn.execute( 101 + &format!( 102 + "INSERT OR IGNORE INTO artists ( 103 id, 104 name, 105 sha256, ··· 114 ?, 115 [{}] 116 )", 117 + record 118 + .tags 119 + .as_ref() 120 + .map(|tags| tags 121 + .iter() 122 + .map(|tag| tag.replace('\'', "''")) 123 + .map(|tag| format!("'{}'", tag)) 124 + .collect::<Vec<_>>() 125 + .join(", ")) 126 + .unwrap_or_default() 127 + ), 128 + params![ 129 + xid::new().to_string(), 130 + record.album_artist, 131 + artist_hash, 132 + record.artist_picture, 133 + record.artist_uri 134 + ], 135 + ) { 136 + Ok(x) => tracing::info!("Artist inserted or already exists {}", x), 137 + Err(e) => tracing::error!(error = %e, "Error inserting artist"), 138 + } 139 140 + let track_hash = sha256::digest( 141 + format!("{} - {} - {}", record.title, record.artist, record.album).to_lowercase(), 142 + ); 143 + match conn.execute( 144 + "INSERT OR IGNORE INTO tracks ( 145 id, 146 title, 147 artist, ··· 184 ?, 185 ? 186 )", 187 + params![ 188 + xid::new().to_string(), 189 + record.title, 190 + record.artist, 191 + record.album_artist, 192 + record.album_art_url, 193 + record.album, 194 + record.track_number, 195 + record.disc_number, 196 + record.spotify_link, 197 + record.tidal_link, 198 + record.youtube_link, 199 + record.apple_music_link, 200 + record.copyright_message, 201 + record.label, 202 + record.lyrics, 203 + record.composer, 204 + record.duration, 205 + record.mbid, 206 + record.song_uri, 207 + track_hash, 208 + ], 209 + ) { 210 + Ok(x) => tracing::info!("Track inserted or already exists {}", x), 211 + Err(e) => tracing::error!(error = %e, "Error inserting track"), 212 + } 213 214 + match conn.execute( 215 + "INSERT OR IGNORE INTO album_tracks ( 216 id, 217 album_id, 218 track_id ··· 221 (SELECT id FROM albums WHERE sha256 = ?), 222 (SELECT id FROM tracks WHERE sha256 = ?), 223 )", 224 + params![xid::new().to_string(), album_hash, track_hash], 225 + ) { 226 + Ok(x) => tracing::info!("Album-Track relation inserted or already exists {}", x), 227 + Err(e) => tracing::error!(error = %e, "Error inserting album-track relation"), 228 + } 229 230 + match conn.execute( 231 + "INSERT OR IGNORE INTO user_artists ( 232 id, 233 user_id, 234 artist_id ··· 237 (SELECT id FROM users WHERE did = ?), 238 (SELECT id FROM artists WHERE sha256 = ?), 239 )", 240 + params![xid::new().to_string(), cloned_did, artist_hash], 241 + ) { 242 + Ok(x) => tracing::info!("User-Artist relation inserted or already exists {}", x), 243 + Err(e) => tracing::error!(error = %e, "Error inserting user-artist relation"), 244 + } 245 246 + match conn.execute( 247 + "INSERT OR IGNORE INTO user_albums ( 248 id, 249 user_id, 250 album_id ··· 253 (SELECT id FROM users WHERE did = ?), 254 (SELECT id FROM albums WHERE sha256 = ?), 255 )", 256 + params![xid::new().to_string(), cloned_did, album_hash], 257 + ) { 258 + Ok(x) => tracing::info!("User-Album relation inserted or already exists {}", x), 259 + Err(e) => tracing::error!(error = %e, "Error inserting user-album relation"), 260 + } 261 262 + match conn.execute( 263 + "INSERT OR IGNORE INTO user_tracks ( 264 id, 265 user_id, 266 track_id ··· 269 (SELECT id FROM users WHERE did = ?), 270 (SELECT id FROM tracks WHERE sha256 = ?), 271 )", 272 + params![xid::new().to_string(), cloned_did, track_hash], 273 + ) { 274 + Ok(x) => tracing::info!("User-Track relation inserted or already exists {}", x), 275 + Err(e) => tracing::error!(error = %e, "Error inserting user-track relation"), 276 + } 277 278 + match conn.execute( 279 + "INSERT OR IGNORE INTO artist_albums ( 280 id, 281 artist_id, 282 album_id ··· 285 (SELECT id FROM artists WHERE sha256 = ?), 286 (SELECT id FROM albums WHERE sha256 = ?), 287 )", 288 + params![xid::new().to_string(), artist_hash, album_hash], 289 + ) { 290 + Ok(x) => tracing::info!("Artist-Album relation inserted or already exists {}", x), 291 + Err(e) => tracing::error!(error = %e, "Error inserting artist-album relation"), 292 + } 293 294 + match conn.execute( 295 + "INSERT OR IGNORE INTO artist_tracks ( 296 id, 297 artist_id, 298 track_id ··· 301 (SELECT id FROM artists WHERE sha256 = ?), 302 (SELECT id FROM tracks WHERE sha256 = ?), 303 )", 304 + params![xid::new().to_string(), artist_hash, track_hash], 305 + ) { 306 + Ok(x) => tracing::info!("Artist-Track relation inserted or already exists {}", x), 307 + Err(e) => tracing::error!(error = %e, "Error inserting artist-track relation"), 308 + } 309 310 + match conn.execute( 311 + "INSERT OR IGNORE INTO scrobbles ( 312 id, 313 user_id, 314 track_id, ··· 325 ?, 326 ?, 327 )", 328 + params![ 329 + xid::new().to_string(), 330 + cloned_did, 331 + track_hash, 332 + album_hash, 333 + artist_hash, 334 + uri, 335 + record.created_at, 336 + ], 337 + ) { 338 + Ok(x) => tracing::info!("Scrobble inserted {}", x), 339 + Err(e) => tracing::error!(error = %e, "Error inserting scrobble"), 340 + } 341 342 + tracing::info!("Scrobble insertion process completed for user: {}", did); 343 344 Ok(()) 345 }
+4 -6
crates/feed/src/repo/duckdb/track.rs
··· 1 use std::sync::{Arc, Mutex}; 2 3 - use crate::{r2d2_duckdb::DuckDBConnectionManager, types::SongRecord}; 4 use anyhow::Error; 5 use duckdb::params; 6 7 - pub async fn save_track( 8 - pool: r2d2::Pool<DuckDBConnectionManager>, 9 - mutex: Arc<Mutex<()>>, 10 uri: &str, 11 record: SongRecord, 12 ) -> Result<(), Error> { 13 - let _lock = mutex.lock().unwrap(); 14 let uri = uri.to_string(); 15 - let conn = pool.get()?; 16 let track_hash = sha256::digest( 17 format!("{} - {} - {}", record.title, record.artist, record.album).to_lowercase(), 18 );
··· 1 use std::sync::{Arc, Mutex}; 2 3 + use crate::types::SongRecord; 4 use anyhow::Error; 5 use duckdb::params; 6 7 + pub fn save_track( 8 + conn: Arc<Mutex<duckdb::Connection>>, 9 uri: &str, 10 record: SongRecord, 11 ) -> Result<(), Error> { 12 let uri = uri.to_string(); 13 + let conn = conn.lock().unwrap(); 14 let track_hash = sha256::digest( 15 format!("{} - {} - {}", record.title, record.artist, record.album).to_lowercase(), 16 );
+1 -7
crates/feed/src/repo/duckdb/user.rs
··· 1 use std::sync::{Arc, Mutex}; 2 3 - use crate::r2d2_duckdb::DuckDBConnectionManager; 4 - 5 - pub async fn save_user( 6 - _pool: r2d2::Pool<DuckDBConnectionManager>, 7 - _mutex: Arc<Mutex<()>>, 8 - _did: &str, 9 - ) -> Result<(), anyhow::Error> { 10 todo!() 11 }
··· 1 use std::sync::{Arc, Mutex}; 2 3 + pub fn save_user(_conn: Arc<Mutex<duckdb::Connection>>, _did: &str) -> Result<(), anyhow::Error> { 4 todo!() 5 }