A decentralized music tracking and discovery platform built on AT Protocol 🎵 rocksky.app
spotify atproto lastfm musicbrainz scrobbling listenbrainz

initialize jetstream

+966 -13
+79 -11
Cargo.lock
··· 353 353 354 354 [[package]] 355 355 name = "anyhow" 356 - version = "1.0.96" 356 + version = "1.0.98" 357 357 source = "registry+https://github.com/rust-lang/crates.io-index" 358 - checksum = "6b964d184e89d9b6b67dd2715bc8e74cf3107fb2b529990c90cf517326150bf4" 358 + checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487" 359 359 360 360 [[package]] 361 361 name = "arc-swap" ··· 857 857 858 858 [[package]] 859 859 name = "chrono" 860 - version = "0.4.39" 860 + version = "0.4.40" 861 861 source = "registry+https://github.com/rust-lang/crates.io-index" 862 - checksum = "7e36cc9d416881d2e24f9a963be5fb1cd90966419ac844274161d10488b3e825" 862 + checksum = "1a7964611d71df112cb1730f2ee67324fcf4d0fc6606acbbe9bfe06df124637c" 863 863 dependencies = [ 864 864 "android-tzdata", 865 865 "iana-time-zone", ··· 867 867 "num-traits", 868 868 "serde", 869 869 "wasm-bindgen", 870 - "windows-targets 0.52.6", 870 + "windows-link", 871 871 ] 872 872 873 873 [[package]] ··· 2135 2135 checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674" 2136 2136 2137 2137 [[package]] 2138 + name = "jetstream" 2139 + version = "0.1.0" 2140 + dependencies = [ 2141 + "anyhow", 2142 + "async-nats", 2143 + "chrono", 2144 + "dotenv", 2145 + "futures-util", 2146 + "owo-colors", 2147 + "reqwest", 2148 + "serde", 2149 + "serde_json", 2150 + "sqlx", 2151 + "tokio", 2152 + "tokio-stream", 2153 + "tokio-tungstenite", 2154 + "tungstenite", 2155 + "url", 2156 + ] 2157 + 2158 + [[package]] 2138 2159 name = "jobserver" 2139 2160 version = "0.1.32" 2140 2161 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3970 3991 3971 3992 [[package]] 3972 3993 name = "serde" 3973 - version = "1.0.217" 3994 + version = "1.0.219" 3974 3995 source = "registry+https://github.com/rust-lang/crates.io-index" 3975 - checksum = "02fc4265df13d6fa1d00ecff087228cc0a2b5f3c0e87e258d8b94a156e984c70" 3996 + checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6" 3976 3997 dependencies = [ 3977 3998 "serde_derive", 3978 3999 ] 3979 4000 3980 4001 [[package]] 3981 4002 name = "serde_derive" 3982 - version = "1.0.217" 4003 + version = "1.0.219" 3983 4004 source = "registry+https://github.com/rust-lang/crates.io-index" 3984 - checksum = "5a9bf7cf98d04a2b28aead066b7496853d4779c9cc183c440dbac457641e19a0" 4005 + checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" 3985 4006 dependencies = [ 3986 4007 "proc-macro2", 3987 4008 "quote", ··· 3990 4011 3991 4012 [[package]] 3992 4013 name = "serde_json" 3993 - version = "1.0.139" 4014 + version = "1.0.140" 3994 4015 source = "registry+https://github.com/rust-lang/crates.io-index" 3995 - checksum = "44f86c3acccc9c65b153fe1b85a3be07fe5515274ec9f0653b4a0875731c72a6" 4016 + checksum = "20068b6e96dc6c9bd23e01df8827e6c7e1f2fddd43c21810382803c136b99373" 3996 4017 dependencies = [ 3997 4018 "itoa", 3998 4019 "memchr", ··· 4956 4977 ] 4957 4978 4958 4979 [[package]] 4980 + name = "tokio-tungstenite" 4981 + version = "0.26.2" 4982 + source = "registry+https://github.com/rust-lang/crates.io-index" 4983 + checksum = "7a9daff607c6d2bf6c16fd681ccb7eecc83e4e2cdc1ca067ffaadfca5de7f084" 4984 + dependencies = [ 4985 + "futures-util", 4986 + "log", 4987 + "rustls", 4988 + "rustls-pki-types", 4989 + "tokio", 4990 + "tokio-rustls", 4991 + "tungstenite", 4992 + "webpki-roots", 4993 + ] 4994 + 4995 + [[package]] 4959 4996 name = "tokio-util" 4960 4997 version = "0.7.13" 4961 4998 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 5083 5120 ] 5084 5121 5085 5122 [[package]] 5123 + name = "tungstenite" 5124 + version = "0.26.2" 5125 + source = "registry+https://github.com/rust-lang/crates.io-index" 5126 + checksum = "4793cb5e56680ecbb1d843515b23b6de9a75eb04b66643e256a396d43be33c13" 5127 + dependencies = [ 5128 + "bytes", 5129 + "data-encoding", 5130 + "http 1.2.0", 5131 + "httparse", 5132 + "log", 5133 + "rand 0.9.0", 5134 + "rustls", 5135 + "rustls-pki-types", 5136 + "sha1", 5137 + "thiserror 2.0.11", 5138 + "utf-8", 5139 + ] 5140 + 5141 + [[package]] 5086 5142 name = "typenum" 5087 5143 version = "1.18.0" 5088 5144 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 5158 5214 "idna", 5159 5215 "percent-encoding", 5160 5216 ] 5217 + 5218 + [[package]] 5219 + name = "utf-8" 5220 + version = "0.7.6" 5221 + source = "registry+https://github.com/rust-lang/crates.io-index" 5222 + checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" 5161 5223 5162 5224 [[package]] 5163 5225 name = "utf16_iter" ··· 5425 5487 "quote", 5426 5488 "syn 2.0.98", 5427 5489 ] 5490 + 5491 + [[package]] 5492 + name = "windows-link" 5493 + version = "0.1.1" 5494 + source = "registry+https://github.com/rust-lang/crates.io-index" 5495 + checksum = "76840935b766e1b0a05c0066835fb9ec80071d4c09a16f6bd5f7e655e3c14c38" 5428 5496 5429 5497 [[package]] 5430 5498 name = "windows-registry"
+1
crates/jetstream/.gitignore
··· 1 + .env
+37
crates/jetstream/Cargo.toml
··· 1 + [package] 2 + name = "jetstream" 3 + version = "0.1.0" 4 + authors.workspace = true 5 + edition.workspace = true 6 + license.workspace = true 7 + repository.workspace = true 8 + 9 + [dependencies] 10 + anyhow = "1.0.98" 11 + chrono = { version = "0.4.40", features = ["serde"] } 12 + serde = { version = "1.0.219", features = ["derive"] } 13 + serde_json = "1.0.140" 14 + tokio = { version = "1.43.0", features = ["full"] } 15 + tungstenite = { version = "0.26.2", features = ["rustls"] } 16 + tokio-tungstenite = { version = "0.26.2", features = [ 17 + "tokio-rustls", 18 + "rustls-tls-webpki-roots", 19 + ] } 20 + url = "2.5.4" 21 + owo-colors = "4.1.0" 22 + dotenv = "0.15.0" 23 + tokio-stream = { version = "0.1.17", features = ["full"] } 24 + sqlx = { version = "0.8.3", features = [ 25 + "runtime-tokio", 26 + "tls-rustls", 27 + "postgres", 28 + "chrono", 29 + "derive", 30 + "macros", 31 + ] } 32 + async-nats = "0.39.0" 33 + futures-util = "0.3.31" 34 + reqwest = { version = "0.12.12", features = [ 35 + "rustls-tls", 36 + "json", 37 + ], default-features = false }
+201
crates/jetstream/LICENSE
··· 1 + Apache License 2 + Version 2.0, January 2004 3 + http://www.apache.org/licenses/ 4 + 5 + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION 6 + 7 + 1. Definitions. 8 + 9 + "License" shall mean the terms and conditions for use, reproduction, 10 + and distribution as defined by Sections 1 through 9 of this document. 11 + 12 + "Licensor" shall mean the copyright owner or entity authorized by 13 + the copyright owner that is granting the License. 14 + 15 + "Legal Entity" shall mean the union of the acting entity and all 16 + other entities that control, are controlled by, or are under common 17 + control with that entity. For the purposes of this definition, 18 + "control" means (i) the power, direct or indirect, to cause the 19 + direction or management of such entity, whether by contract or 20 + otherwise, or (ii) ownership of fifty percent (50%) or more of the 21 + outstanding shares, or (iii) beneficial ownership of such entity. 22 + 23 + "You" (or "Your") shall mean an individual or Legal Entity 24 + exercising permissions granted by this License. 25 + 26 + "Source" form shall mean the preferred form for making modifications, 27 + including but not limited to software source code, documentation 28 + source, and configuration files. 29 + 30 + "Object" form shall mean any form resulting from mechanical 31 + transformation or translation of a Source form, including but 32 + not limited to compiled object code, generated documentation, 33 + and conversions to other media types. 34 + 35 + "Work" shall mean the work of authorship, whether in Source or 36 + Object form, made available under the License, as indicated by a 37 + copyright notice that is included in or attached to the work 38 + (an example is provided in the Appendix below). 39 + 40 + "Derivative Works" shall mean any work, whether in Source or Object 41 + form, that is based on (or derived from) the Work and for which the 42 + editorial revisions, annotations, elaborations, or other modifications 43 + represent, as a whole, an original work of authorship. For the purposes 44 + of this License, Derivative Works shall not include works that remain 45 + separable from, or merely link (or bind by name) to the interfaces of, 46 + the Work and Derivative Works thereof. 47 + 48 + "Contribution" shall mean any work of authorship, including 49 + the original version of the Work and any modifications or additions 50 + to that Work or Derivative Works thereof, that is intentionally 51 + submitted to Licensor for inclusion in the Work by the copyright owner 52 + or by an individual or Legal Entity authorized to submit on behalf of 53 + the copyright owner. For the purposes of this definition, "submitted" 54 + means any form of electronic, verbal, or written communication sent 55 + to the Licensor or its representatives, including but not limited to 56 + communication on electronic mailing lists, source code control systems, 57 + and issue tracking systems that are managed by, or on behalf of, the 58 + Licensor for the purpose of discussing and improving the Work, but 59 + excluding communication that is conspicuously marked or otherwise 60 + designated in writing by the copyright owner as "Not a Contribution." 61 + 62 + "Contributor" shall mean Licensor and any individual or Legal Entity 63 + on behalf of whom a Contribution has been received by Licensor and 64 + subsequently incorporated within the Work. 65 + 66 + 2. Grant of Copyright License. Subject to the terms and conditions of 67 + this License, each Contributor hereby grants to You a perpetual, 68 + worldwide, non-exclusive, no-charge, royalty-free, irrevocable 69 + copyright license to reproduce, prepare Derivative Works of, 70 + publicly display, publicly perform, sublicense, and distribute the 71 + Work and such Derivative Works in Source or Object form. 72 + 73 + 3. Grant of Patent License. Subject to the terms and conditions of 74 + this License, each Contributor hereby grants to You a perpetual, 75 + worldwide, non-exclusive, no-charge, royalty-free, irrevocable 76 + (except as stated in this section) patent license to make, have made, 77 + use, offer to sell, sell, import, and otherwise transfer the Work, 78 + where such license applies only to those patent claims licensable 79 + by such Contributor that are necessarily infringed by their 80 + Contribution(s) alone or by combination of their Contribution(s) 81 + with the Work to which such Contribution(s) was submitted. If You 82 + institute patent litigation against any entity (including a 83 + cross-claim or counterclaim in a lawsuit) alleging that the Work 84 + or a Contribution incorporated within the Work constitutes direct 85 + or contributory patent infringement, then any patent licenses 86 + granted to You under this License for that Work shall terminate 87 + as of the date such litigation is filed. 88 + 89 + 4. Redistribution. You may reproduce and distribute copies of the 90 + Work or Derivative Works thereof in any medium, with or without 91 + modifications, and in Source or Object form, provided that You 92 + meet the following conditions: 93 + 94 + (a) You must give any other recipients of the Work or 95 + Derivative Works a copy of this License; and 96 + 97 + (b) You must cause any modified files to carry prominent notices 98 + stating that You changed the files; and 99 + 100 + (c) You must retain, in the Source form of any Derivative Works 101 + that You distribute, all copyright, patent, trademark, and 102 + attribution notices from the Source form of the Work, 103 + excluding those notices that do not pertain to any part of 104 + the Derivative Works; and 105 + 106 + (d) If the Work includes a "NOTICE" text file as part of its 107 + distribution, then any Derivative Works that You distribute must 108 + include a readable copy of the attribution notices contained 109 + within such NOTICE file, excluding those notices that do not 110 + pertain to any part of the Derivative Works, in at least one 111 + of the following places: within a NOTICE text file distributed 112 + as part of the Derivative Works; within the Source form or 113 + documentation, if provided along with the Derivative Works; or, 114 + within a display generated by the Derivative Works, if and 115 + wherever such third-party notices normally appear. The contents 116 + of the NOTICE file are for informational purposes only and 117 + do not modify the License. You may add Your own attribution 118 + notices within Derivative Works that You distribute, alongside 119 + or as an addendum to the NOTICE text from the Work, provided 120 + that such additional attribution notices cannot be construed 121 + as modifying the License. 122 + 123 + You may add Your own copyright statement to Your modifications and 124 + may provide additional or different license terms and conditions 125 + for use, reproduction, or distribution of Your modifications, or 126 + for any such Derivative Works as a whole, provided Your use, 127 + reproduction, and distribution of the Work otherwise complies with 128 + the conditions stated in this License. 129 + 130 + 5. Submission of Contributions. Unless You explicitly state otherwise, 131 + any Contribution intentionally submitted for inclusion in the Work 132 + by You to the Licensor shall be under the terms and conditions of 133 + this License, without any additional terms or conditions. 134 + Notwithstanding the above, nothing herein shall supersede or modify 135 + the terms of any separate license agreement you may have executed 136 + with Licensor regarding such Contributions. 137 + 138 + 6. Trademarks. This License does not grant permission to use the trade 139 + names, trademarks, service marks, or product names of the Licensor, 140 + except as required for reasonable and customary use in describing the 141 + origin of the Work and reproducing the content of the NOTICE file. 142 + 143 + 7. Disclaimer of Warranty. Unless required by applicable law or 144 + agreed to in writing, Licensor provides the Work (and each 145 + Contributor provides its Contributions) on an "AS IS" BASIS, 146 + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 147 + implied, including, without limitation, any warranties or conditions 148 + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A 149 + PARTICULAR PURPOSE. You are solely responsible for determining the 150 + appropriateness of using or redistributing the Work and assume any 151 + risks associated with Your exercise of permissions under this License. 152 + 153 + 8. Limitation of Liability. In no event and under no legal theory, 154 + whether in tort (including negligence), contract, or otherwise, 155 + unless required by applicable law (such as deliberate and grossly 156 + negligent acts) or agreed to in writing, shall any Contributor be 157 + liable to You for damages, including any direct, indirect, special, 158 + incidental, or consequential damages of any character arising as a 159 + result of this License or out of the use or inability to use the 160 + Work (including but not limited to damages for loss of goodwill, 161 + work stoppage, computer failure or malfunction, or any and all 162 + other commercial damages or losses), even if such Contributor 163 + has been advised of the possibility of such damages. 164 + 165 + 9. Accepting Warranty or Additional Liability. While redistributing 166 + the Work or Derivative Works thereof, You may choose to offer, 167 + and charge a fee for, acceptance of support, warranty, indemnity, 168 + or other liability obligations and/or rights consistent with this 169 + License. However, in accepting such obligations, You may act only 170 + on Your own behalf and on Your sole responsibility, not on behalf 171 + of any other Contributor, and only if You agree to indemnify, 172 + defend, and hold each Contributor harmless for any liability 173 + incurred by, or claims asserted against, such Contributor by reason 174 + of your accepting any such warranty or additional liability. 175 + 176 + END OF TERMS AND CONDITIONS 177 + 178 + APPENDIX: How to apply the Apache License to your work. 179 + 180 + To apply the Apache License to your work, attach the following 181 + boilerplate notice, with the fields enclosed by brackets "[]" 182 + replaced with your own identifying information. (Don't include 183 + the brackets!) The text should be enclosed in the appropriate 184 + comment syntax for the file format. We also recommend that a 185 + file or class name and description of purpose be included on the 186 + same "printed page" as the copyright notice for easier 187 + identification within third-party archives. 188 + 189 + Copyright 2025 Tsiry Sandratraina 190 + 191 + Licensed under the Apache License, Version 2.0 (the "License"); 192 + you may not use this file except in compliance with the License. 193 + You may obtain a copy of the License at 194 + 195 + http://www.apache.org/licenses/LICENSE-2.0 196 + 197 + Unless required by applicable law or agreed to in writing, software 198 + distributed under the License is distributed on an "AS IS" BASIS, 199 + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 200 + See the License for the specific language governing permissions and 201 + limitations under the License.
+20
crates/jetstream/src/main.rs
··· 1 + use std::env; 2 + 3 + use subscriber::{ScrobbleSubscriber, SCROBBLE_NSID}; 4 + use dotenv::dotenv; 5 + 6 + pub mod subscriber; 7 + pub mod types; 8 + pub mod xata; 9 + pub mod repo; 10 + pub mod profile; 11 + 12 + #[tokio::main] 13 + async fn main() -> Result<(), anyhow::Error> { 14 + dotenv()?; 15 + let jetstream_server = env::var("JETSTREAM_SERVER").unwrap_or_else(|_| "wss://jetstream2.us-east.bsky.network".to_string()); 16 + let url = format!("{}/subscribe?wantedCollections={}", jetstream_server, SCROBBLE_NSID); 17 + let subscriber = ScrobbleSubscriber::new(&url); 18 + subscriber.run().await?; 19 + Ok(()) 20 + }
+54
crates/jetstream/src/profile.rs
··· 1 + use anyhow::Error; 2 + use tokio::io::split; 3 + 4 + use crate::types::{Profile, ProfileResponse}; 5 + 6 + pub async fn did_to_profile(did: &str) -> Result<Profile, Error> { 7 + let client = reqwest::Client::new(); 8 + let response = client.get(format!("https://plc.directory/{}", did)) 9 + .header("Accept", "application/json") 10 + .send() 11 + .await? 12 + .json::<serde_json::Value>() 13 + .await?; 14 + 15 + let handle = response["alsoKnownAs"][0].as_str() 16 + .unwrap_or("") 17 + .split("at://") 18 + .last() 19 + .unwrap_or(""); 20 + 21 + let service_endpoint = response["service"][0]["serviceEndpoint"].as_str().unwrap_or(""); 22 + 23 + if service_endpoint.is_empty() { 24 + return Err(Error::msg("Invalid did")); 25 + } 26 + 27 + let client = reqwest::Client::new(); 28 + let mut response = client.get(format!("{}/xrpc/com.atproto.repo.getRecord?repo={}&collection=app.bsky.actor.profile&rkey=self", service_endpoint, did)) 29 + .header("Accept", "application/json") 30 + .send() 31 + .await? 32 + .json::<ProfileResponse>() 33 + .await?; 34 + 35 + response.value.handle = Some(handle.to_string()); 36 + Ok(response.value) 37 + } 38 + 39 + #[cfg(test)] 40 + mod tests { 41 + use super::*; 42 + use anyhow::Result; 43 + 44 + #[tokio::test] 45 + async fn test_did_to_profile() -> Result<()> { 46 + let did = "did:plc:7vdlgi2bflelz7mmuxoqjfcr"; 47 + let profile = did_to_profile(did).await?; 48 + 49 + assert_eq!(profile.r#type, "app.bsky.actor.profile"); 50 + assert!(profile.display_name.map(|s| s.starts_with("Tsiry Sandratraina")).unwrap_or(false)); 51 + assert!(profile.handle.map(|s| s == "tsiry-sandratraina.com").unwrap_or(false)); 52 + Ok(()) 53 + } 54 + }
+146
crates/jetstream/src/repo.rs
··· 1 + use anyhow::Error; 2 + use sqlx::{Pool, Postgres}; 3 + 4 + use crate::{profile::did_to_profile, subscriber::{ALBUM_NSID, ARTIST_NSID, LIKE_NSID, PLAYLIST_NSID, SCROBBLE_NSID, SHOUT_NSID, SONG_NSID}, types::{Commit, ScrobbleRecord}, xata::user::User}; 5 + 6 + pub async fn save_scrobble(pool: &Pool<Postgres>, did: &str, commit: Commit) -> Result<(), Error> { 7 + // skip unknown collection 8 + if !vec![ 9 + SCROBBLE_NSID, 10 + ARTIST_NSID, 11 + ALBUM_NSID, 12 + SONG_NSID, 13 + PLAYLIST_NSID, 14 + LIKE_NSID, 15 + SHOUT_NSID, 16 + ].contains(&commit.collection.as_str()) { 17 + return Ok(()); 18 + } 19 + 20 + match commit.operation.as_str() { 21 + "create" => { 22 + if commit.collection == SCROBBLE_NSID { 23 + let mut tx = pool.begin().await?; 24 + let scrobble_record: ScrobbleRecord = serde_json::from_value(commit.record)?; 25 + 26 + let user_id = save_user(&mut tx, did).await?; 27 + 28 + let album_id = save_album(&mut tx, &user_id, scrobble_record.clone()).await?; 29 + let artist_id = save_artist(&mut tx, &user_id, scrobble_record.clone()).await?; 30 + let track_id = save_track(&mut tx, &user_id, scrobble_record.clone()).await?; 31 + let uri = format!("at://{}/app.rocksky.scrobble/{}", did, commit.rkey); 32 + 33 + sqlx::query(r#" 34 + INSERT INTO scrobbles ( 35 + album_id, 36 + artist_id, 37 + track_id, 38 + uri, 39 + user_id, 40 + ) VALUES ($1, $2, $3, $4, $5) 41 + "#) 42 + .bind(album_id) 43 + .bind(artist_id) 44 + .bind(track_id) 45 + .bind(uri) 46 + .bind(user_id) 47 + .execute(&mut *tx).await?; 48 + 49 + tx.commit().await?; 50 + 51 + /* 52 + sqlx::query(r#" 53 + INSERT INTO tracks ( 54 + track_number, 55 + disc_number, 56 + title, 57 + artist, 58 + album_artist, 59 + album, 60 + duration, 61 + release_date, 62 + year, 63 + genre, 64 + tags, 65 + composer, 66 + lyrics, 67 + copyright_message, 68 + wiki, 69 + album_art, 70 + youtube_link, 71 + spotify_link, 72 + tidal_link, 73 + apple_music_link, 74 + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20) 75 + "#) 76 + .bind(scrobble_record.track_number) 77 + .bind(scrobble_record.disc_number) 78 + .bind(scrobble_record.title) 79 + .bind(scrobble_record.artist) 80 + .bind(scrobble_record.album_artist) 81 + .bind(scrobble_record.album) 82 + .bind(scrobble_record.duration) 83 + .bind(scrobble_record.release_date) 84 + .bind(scrobble_record.year) 85 + .bind(scrobble_record.genre) 86 + .bind(scrobble_record.tags) 87 + .bind(scrobble_record.composer) 88 + .bind(scrobble_record.lyrics) 89 + .bind(scrobble_record.copyright_message) 90 + .bind(scrobble_record.wiki) 91 + .bind(scrobble_record.album_art.map(|x| format!("https://cdn.bsky.app/img/feed_thumbnail/plain/{}/{}@{}", did, x.r#ref.link, x.mime_type.split('/').last().unwrap_or("jpeg")))) 92 + .bind(scrobble_record.youtube_link) 93 + .bind(scrobble_record.spotify_link) 94 + .bind(scrobble_record.tidal_link) 95 + .bind(scrobble_record.apple_music_link) 96 + .execute(pool).await?; 97 + */ 98 + } 99 + }, 100 + _ => { 101 + println!("Unsupported operation: {}", commit.operation); 102 + } 103 + } 104 + Ok(()) 105 + } 106 + 107 + 108 + pub async fn save_user(tx: &mut sqlx::Transaction<'_, Postgres>, did: &str) -> Result<String, Error> { 109 + let profile = did_to_profile(did).await?; 110 + 111 + // Check if the user exists in the database 112 + let mut users: Vec<User> = sqlx::query_as("SELECT id FROM users WHERE did = $1") 113 + .bind(did) 114 + .fetch_all(&mut **tx) 115 + .await?; 116 + 117 + // If the user does not exist, create a new user 118 + if users.is_empty() { 119 + let avatar = profile.avatar.map(|blob| format!("https://cdn.bsky.app/img/avatar/plain/{}/{}@{}", did, blob.r#ref.link, blob.mime_type.split('/').last().unwrap_or("jpeg"))); 120 + sqlx::query("INSERT INTO users (display_name, did, handle, avatar) VALUES ($1, $2, $3, $4)") 121 + .bind(profile.display_name) 122 + .bind(did) 123 + .bind(profile.handle) 124 + .bind(avatar) 125 + .execute(&mut **tx).await?; 126 + 127 + users = sqlx::query_as("SELECT id FROM users WHERE did = $1") 128 + .bind(did) 129 + .fetch_all(&mut **tx) 130 + .await?; 131 + } 132 + 133 + Ok(users[0].xata_id.clone()) 134 + } 135 + 136 + pub async fn save_track(tx: &mut sqlx::Transaction<'_, Postgres>, user_id: &str, scrobble_record: ScrobbleRecord) -> Result<String, Error> { 137 + todo!() 138 + } 139 + 140 + pub async fn save_album(tx: &mut sqlx::Transaction<'_, Postgres>, user_id: &str, scrobble_record: ScrobbleRecord) -> Result<String, Error> { 141 + todo!() 142 + } 143 + 144 + pub async fn save_artist(tx: &mut sqlx::Transaction<'_, Postgres>, user_id: &str, scrobble_record: ScrobbleRecord) -> Result<String, Error> { 145 + todo!() 146 + }
+110
crates/jetstream/src/subscriber.rs
··· 1 + use std::env; 2 + 3 + use anyhow::{Error, Context}; 4 + use futures_util::StreamExt; 5 + use owo_colors::OwoColorize; 6 + use sqlx::postgres::PgPoolOptions; 7 + use tokio_tungstenite::{connect_async, tungstenite::Message}; 8 + use tokio::sync::mpsc; 9 + 10 + use crate::{repo::save_scrobble, types::{Commit, Root}}; 11 + 12 + pub const SCROBBLE_NSID: &str = "app.rocksky.scrobble"; 13 + pub const ARTIST_NSID: &str = "app.rocksky.artist"; 14 + pub const ALBUM_NSID: &str = "app.rocksky.album"; 15 + pub const SONG_NSID: &str = "app.rocksky.song"; 16 + pub const PLAYLIST_NSID: &str = "app.rocksky.playlist"; 17 + pub const LIKE_NSID: &str = "app.rocksky.like"; 18 + pub const SHOUT_NSID: &str = "app.rocksky.shout"; 19 + 20 + 21 + pub struct ScrobbleSubscriber { 22 + pub service_url: String, 23 + } 24 + 25 + impl ScrobbleSubscriber { 26 + pub fn new(service: &str) -> Self { 27 + Self { 28 + service_url: service.to_string(), 29 + } 30 + } 31 + 32 + pub async fn run(&self) -> Result<(), Error> { 33 + // Get the connection string outside of the task 34 + let db_url = env::var("XATA_POSTGRES_URL") 35 + .context("Failed to get XATA_POSTGRES_URL environment variable")?; 36 + 37 + let (tx, rx) = mpsc::channel::<(String, Commit)>(100); 38 + 39 + let tx_clone = tx.clone(); 40 + 41 + // Start the processor task 42 + let processor = tokio::spawn(async move { 43 + let pool = PgPoolOptions::new().max_connections(5) 44 + .connect(&db_url).await?; 45 + 46 + process_scrobble_events(rx, &pool).await 47 + }); 48 + 49 + let (mut ws_stream, _) = connect_async(&self.service_url).await?; 50 + println!("Connected to jetstream at {}", self.service_url.bright_green()); 51 + 52 + while let Some(msg) = ws_stream.next().await { 53 + match msg { 54 + Ok(msg) => { 55 + if let Err(e) = self.handle_message(msg, &tx_clone).await { 56 + eprintln!("Error handling message: {}", e); 57 + } 58 + } 59 + Err(e) => { 60 + eprintln!("WebSocket error: {}", e); 61 + break; 62 + } 63 + } 64 + } 65 + 66 + drop(tx); 67 + 68 + // Wait for the processor task to complete 69 + match processor.await { 70 + Ok(result) => { 71 + if let Err(e) = result { 72 + eprintln!("Processor task had an error: {}", e); 73 + } 74 + } 75 + Err(e) => { 76 + eprintln!("Processor task panicked: {}", e); 77 + } 78 + } 79 + 80 + Ok(()) 81 + } 82 + 83 + async fn handle_message( 84 + &self, 85 + msg: Message, 86 + tx: &mpsc::Sender<(String, Commit)>, 87 + ) -> Result<(), Error> { 88 + if let Message::Text(text) = msg { 89 + let message: Root = serde_json::from_str(&text)?; 90 + println!("Received message: {:#?}", message); 91 + if let Some(commit) = message.commit { 92 + tx.send((message.did, commit)).await.map_err(|e| { 93 + Error::msg(format!("Failed to send message to channel: {}", e)) 94 + })?; 95 + } 96 + } 97 + 98 + Ok(()) 99 + } 100 + } 101 + 102 + async fn process_scrobble_events( 103 + mut rx: mpsc::Receiver<(String, Commit)>, 104 + pool: &sqlx::Pool<sqlx::Postgres>, 105 + ) -> Result<(), Error> { 106 + while let Some((did, record)) = rx.recv().await { 107 + save_scrobble(pool, &did, record).await?; 108 + } 109 + Ok(()) 110 + }
+124
crates/jetstream/src/types.rs
··· 1 + use serde::Deserialize; 2 + use serde_json::Value; 3 + 4 + #[derive(Debug, Deserialize)] 5 + pub struct Root { 6 + pub did: String, 7 + pub time_us: i64, 8 + pub kind: String, 9 + pub commit: Option<Commit>, 10 + } 11 + 12 + #[derive(Debug, Deserialize)] 13 + pub struct Commit { 14 + pub rev: String, 15 + pub operation: String, 16 + pub collection: String, 17 + pub rkey: String, 18 + pub record: Value, 19 + pub cid: String, 20 + } 21 + 22 + #[derive(Debug, Deserialize, Clone)] 23 + #[serde(rename_all = "camelCase")] 24 + pub struct AlbumArt { 25 + #[serde(rename = "$type")] 26 + pub r#type: String, 27 + pub r#ref: Ref, 28 + pub mime_type: String, 29 + pub size: i32, 30 + } 31 + 32 + #[derive(Debug, Deserialize, Clone)] 33 + pub struct Ref { 34 + #[serde(rename = "$link")] 35 + pub link: String, 36 + } 37 + 38 + #[derive(Debug, Deserialize, Clone)] 39 + #[serde(rename_all = "camelCase")] 40 + pub struct ScrobbleRecord { 41 + #[serde(skip_serializing_if = "Option::is_none")] 42 + pub track_number: Option<i32>, 43 + #[serde(skip_serializing_if = "Option::is_none")] 44 + pub disc_number: Option<i32>, 45 + pub title: String, 46 + pub artist: String, 47 + pub album_artist: String, 48 + pub album: String, 49 + pub duration: i32, 50 + #[serde(skip_serializing_if = "Option::is_none")] 51 + pub release_date: Option<String>, 52 + #[serde(skip_serializing_if = "Option::is_none")] 53 + pub year: Option<i32>, 54 + #[serde(skip_serializing_if = "Option::is_none")] 55 + pub genre: Option<String>, 56 + #[serde(skip_serializing_if = "Option::is_none")] 57 + pub tags: Option<Vec<String>>, 58 + #[serde(skip_serializing_if = "Option::is_none")] 59 + pub composer: Option<String>, 60 + #[serde(skip_serializing_if = "Option::is_none")] 61 + pub lyrics: Option<String>, 62 + #[serde(skip_serializing_if = "Option::is_none")] 63 + pub copyright_message: Option<String>, 64 + #[serde(skip_serializing_if = "Option::is_none")] 65 + pub wiki: Option<String>, 66 + #[serde(skip_serializing_if = "Option::is_none")] 67 + pub album_art: Option<AlbumArt>, 68 + #[serde(skip_serializing_if = "Option::is_none")] 69 + pub youtube_link: Option<String>, 70 + #[serde(skip_serializing_if = "Option::is_none")] 71 + pub spotify_link: Option<String>, 72 + #[serde(skip_serializing_if = "Option::is_none")] 73 + pub tidal_link: Option<String>, 74 + #[serde(skip_serializing_if = "Option::is_none")] 75 + pub apple_music_link: Option<String>, 76 + pub created_at: String, 77 + } 78 + 79 + #[derive(Debug, Deserialize)] 80 + pub struct ProfileResponse { 81 + pub uri: String, 82 + pub cid: String, 83 + pub value: Profile, 84 + } 85 + 86 + #[derive(Debug, Deserialize)] 87 + #[serde(rename_all = "camelCase")] 88 + pub struct Profile { 89 + #[serde(rename = "$type")] 90 + pub r#type: String, 91 + pub avatar: Option<Blob>, 92 + pub banner: Option<Blob>, 93 + pub created_at: String, 94 + #[serde(rename = "pinnedPost")] 95 + pub pinned_post: Option<PinnedPost>, 96 + pub description: Option<String>, 97 + #[serde(rename = "displayName")] 98 + pub display_name: Option<String>, 99 + pub handle: Option<String>, 100 + } 101 + 102 + #[derive(Debug, Deserialize)] 103 + #[serde(rename_all = "camelCase")] 104 + pub struct Blob { 105 + #[serde(rename = "$type")] 106 + pub r#type: String, 107 + #[serde(rename = "ref")] 108 + pub r#ref: BlobRef, 109 + #[serde(rename = "mimeType")] 110 + pub mime_type: String, 111 + pub size: u64, 112 + } 113 + 114 + #[derive(Debug, Deserialize)] 115 + pub struct BlobRef { 116 + #[serde(rename = "$link")] 117 + pub link: String, 118 + } 119 + 120 + #[derive(Debug, Deserialize)] 121 + pub struct PinnedPost { 122 + pub cid: String, 123 + pub uri: String, 124 + }
+21
crates/jetstream/src/xata/album.rs
··· 1 + use chrono::{DateTime, Utc}; 2 + use serde::Deserialize; 3 + 4 + #[derive(Debug, sqlx::FromRow, Deserialize, Clone)] 5 + pub struct Album { 6 + pub xata_id: String, 7 + pub title: String, 8 + pub artist: String, 9 + pub release_date: Option<String>, 10 + pub album_art: Option<String>, 11 + pub year: Option<i32>, 12 + pub spotify_link: Option<String>, 13 + pub tidal_link: Option<String>, 14 + pub youtube_link: Option<String>, 15 + pub apple_music_link: Option<String>, 16 + pub sha256: String, 17 + pub uri: Option<String>, 18 + pub artist_uri: Option<String>, 19 + #[serde(with = "chrono::serde::ts_seconds")] 20 + pub xata_createdat: DateTime<Utc>, 21 + }
+8
crates/jetstream/src/xata/album_track.rs
··· 1 + use serde::Deserialize; 2 + 3 + #[derive(Debug, sqlx::FromRow, Deserialize, Clone)] 4 + pub struct AlbumTrack { 5 + pub xata_id: String, 6 + pub album_id: String, 7 + pub track_id: String, 8 + }
+23
crates/jetstream/src/xata/artist.rs
··· 1 + use chrono::{DateTime, Utc}; 2 + use serde::Deserialize; 3 + 4 + #[derive(Debug, sqlx::FromRow, Deserialize, Clone)] 5 + pub struct Artist { 6 + pub xata_id: String, 7 + pub name: String, 8 + pub biography: Option<String>, 9 + #[serde(with = "chrono::serde::ts_seconds_option")] 10 + pub born: Option<DateTime<Utc>>, 11 + pub born_in: Option<String>, 12 + #[serde(with = "chrono::serde::ts_seconds_option")] 13 + pub died: Option<DateTime<Utc>>, 14 + pub picture: Option<String>, 15 + pub sha256: String, 16 + pub spotify_link: Option<String>, 17 + pub tidal_link: Option<String>, 18 + pub youtube_link: Option<String>, 19 + pub apple_music_link: Option<String>, 20 + pub uri: Option<String>, 21 + #[serde(with = "chrono::serde::ts_seconds")] 22 + pub xata_createdat: DateTime<Utc>, 23 + }
+10
crates/jetstream/src/xata/artist_album.rs
··· 1 + use serde::Deserialize; 2 + 3 + #[derive(Debug, sqlx::FromRow, Deserialize, Clone)] 4 + pub struct ArtistAlbum { 5 + pub xata_id: String, 6 + pub artist_id: String, 7 + pub album_id: String, 8 + #[serde(with = "chrono::serde::ts_seconds")] 9 + pub xata_createdat: chrono::DateTime<chrono::Utc>, 10 + }
+10
crates/jetstream/src/xata/artist_track.rs
··· 1 + use serde::Deserialize; 2 + 3 + #[derive(Debug, sqlx::FromRow, Deserialize, Clone)] 4 + pub struct ArtistTrack { 5 + pub xata_id: String, 6 + pub artist_id: String, 7 + pub track_id: String, 8 + #[serde(with = "chrono::serde::ts_seconds")] 9 + pub xata_createdat: chrono::DateTime<chrono::Utc>, 10 + }
+10
crates/jetstream/src/xata/loved_track.rs
··· 1 + use serde::Deserialize; 2 + 3 + #[derive(Debug, sqlx::FromRow, Deserialize, Clone)] 4 + pub struct LovedTrack { 5 + pub xata_id: String, 6 + pub user_id: String, 7 + pub track_id: String, 8 + #[serde(with = "chrono::serde::ts_seconds")] 9 + pub xata_createdat: chrono::DateTime<chrono::Utc>, 10 + }
+12
crates/jetstream/src/xata/mod.rs
··· 1 + pub mod album; 2 + pub mod album_track; 3 + pub mod artist; 4 + pub mod artist_album; 5 + pub mod artist_track; 6 + pub mod loved_track; 7 + pub mod scrobble; 8 + pub mod track; 9 + pub mod user; 10 + pub mod user_album; 11 + pub mod user_artist; 12 + pub mod user_track;
+16
crates/jetstream/src/xata/scrobble.rs
··· 1 + use chrono::{DateTime, Utc}; 2 + use serde::Deserialize; 3 + 4 + #[derive(Debug, sqlx::FromRow, Deserialize, Clone)] 5 + pub struct Scrobble { 6 + pub xata_id: String, 7 + pub user_id: String, 8 + pub track_id: String, 9 + pub album_id: Option<String>, 10 + pub artist_id: Option<String>, 11 + pub uri: Option<String>, 12 + #[serde(with = "chrono::serde::ts_seconds")] 13 + pub xata_createdat: DateTime<Utc>, 14 + #[serde(with = "chrono::serde::ts_seconds")] 15 + pub timestamp: DateTime<Utc>, 16 + }
+31
crates/jetstream/src/xata/track.rs
··· 1 + use chrono::{DateTime, Utc}; 2 + use serde::{Deserialize, Serialize}; 3 + 4 + #[derive(Debug, sqlx::FromRow, Serialize, Deserialize, Clone)] 5 + pub struct Track { 6 + pub xata_id: String, 7 + pub title: String, 8 + pub artist: String, 9 + pub album_artist: String, 10 + pub album_art: Option<String>, 11 + pub album: String, 12 + pub track_number: i32, 13 + pub duration: i32, 14 + pub mb_id: Option<String>, 15 + pub youtube_link: Option<String>, 16 + pub spotify_link: Option<String>, 17 + pub tidal_link: Option<String>, 18 + pub apple_music_link: Option<String>, 19 + pub sha256: String, 20 + pub lyrics: Option<String>, 21 + pub composer: Option<String>, 22 + pub genre: Option<String>, 23 + pub disc_number: i32, 24 + pub copyright_message: Option<String>, 25 + pub label: Option<String>, 26 + pub uri: Option<String>, 27 + pub artist_uri: Option<String>, 28 + pub album_uri: Option<String>, 29 + #[serde(with = "chrono::serde::ts_seconds")] 30 + pub xata_createdat: DateTime<Utc>, 31 + }
+13
crates/jetstream/src/xata/user.rs
··· 1 + use chrono::{DateTime, Utc}; 2 + use serde::Deserialize; 3 + 4 + #[derive(Debug, sqlx::FromRow, Deserialize, Clone)] 5 + pub struct User { 6 + pub xata_id: String, 7 + pub display_name: String, 8 + pub did: String, 9 + pub handle: String, 10 + pub avatar: String, 11 + #[serde(with = "chrono::serde::ts_seconds")] 12 + pub xata_createdat: DateTime<Utc>, 13 + }
+10
crates/jetstream/src/xata/user_album.rs
··· 1 + use serde::Deserialize; 2 + 3 + #[derive(Debug, sqlx::FromRow, Deserialize, Clone)] 4 + pub struct UserAlbum { 5 + pub xata_id: String, 6 + pub user_id: String, 7 + pub album_id: String, 8 + #[serde(with = "chrono::serde::ts_seconds")] 9 + pub xata_createdat: chrono::DateTime<chrono::Utc>, 10 + }
+10
crates/jetstream/src/xata/user_artist.rs
··· 1 + use serde::Deserialize; 2 + 3 + #[derive(Debug, sqlx::FromRow, Deserialize, Clone)] 4 + pub struct UserArtist { 5 + pub xata_id: String, 6 + pub user_id: String, 7 + pub artist_id: String, 8 + #[serde(with = "chrono::serde::ts_seconds")] 9 + pub xata_createdat: chrono::DateTime<chrono::Utc>, 10 + }
+10
crates/jetstream/src/xata/user_track.rs
··· 1 + use serde::Deserialize; 2 + 3 + #[derive(Debug, sqlx::FromRow, Deserialize, Clone)] 4 + pub struct UserPlaylist { 5 + pub xata_id: String, 6 + pub user_id: String, 7 + pub playlist_id: String, 8 + #[serde(with = "chrono::serde::ts_seconds")] 9 + pub xata_createdat: chrono::DateTime<chrono::Utc>, 10 + }
+8
rockskyapi/rocksky-auth/src/bsky/app.ts
··· 135 135 136 136 const { did } = jwt.verify(bearer, env.JWT_SECRET); 137 137 138 + await fetch(`http://localhost:8000/refresh/${did}`, { 139 + method: "POST", 140 + headers: { 141 + "Content-Type": "application/json", 142 + }, 143 + body: JSON.stringify({}), 144 + }); 145 + 138 146 const agent = await createAgent(ctx.oauthClient, did); 139 147 140 148 if (!agent) {
+2 -2
rockskyapi/rocksky-auth/src/spotify/app.ts
··· 12 12 const app = new Hono(); 13 13 14 14 app.use( 15 - "/*", 15 + "/currently-playing", 16 16 rateLimiter({ 17 - limit: 35, // max Spotify API calls 17 + limit: 20, // max Spotify API calls 18 18 window: 30, // per 30 seconds 19 19 keyPrefix: "spotify-ratelimit", 20 20 })