Your music, beautifully tracked. All yours. (coming soon) teal.fm
teal-fm atproto
at sponsor-button 191 lines 5.8 kB view raw
1import type { Database } from "@teal/db/connect"; 2import { db } from "@teal/db/connect"; 3import { status, play } from "@teal/db/schema"; 4import { CommitCreateEvent, Jetstream } from "@skyware/jetstream"; 5 6import { 7 Record as XyzStatusphereStatus, 8 isRecord as isStatusphereStatus, 9} from "@teal/lexicons/src/types/xyz/statusphere/status"; 10 11import { 12 Record as FmTealAlphaPlay, 13 isRecord as isTealAlphaPlay, 14} from "@teal/lexicons/src/types/fm/teal/alpha/feed/play"; 15 16class Handler { 17 private static instance: Handler; 18 private constructor() {} 19 public static getInstance(): Handler { 20 if (!Handler.instance) { 21 Handler.instance = new Handler(); 22 } 23 return Handler.instance; 24 } 25 26 handle(msg_type: string, record: CommitCreateEvent<string & {}>) { 27 // Handle message logic here 28 const msg = record.commit.record; 29 console.log("Handling" + msg_type + "message:", msg); 30 if (isStatusphereStatus(msg) && msg.$type === "xyz.statusphere.status") { 31 if (record.commit.operation === "create") { 32 // serialize message as xyz.statusphere.status 33 db.insert(status) 34 .values({ 35 createdAt: new Date().getTime().toString(), 36 indexedAt: new Date(record.time_us).getTime().toString(), 37 status: msg.status, 38 // the AT path 39 uri: record.commit.rkey, 40 authorDid: record.did, 41 }) 42 .execute(); 43 } else { 44 // TODO: sentry 45 console.log( 46 "unsupported operation for xyz.statusphere.status", 47 record.commit.operation, 48 ); 49 } 50 } else if (isTealAlphaPlay(msg) && msg.$type === "fm.teal.alpha.play") { 51 if (record.commit.operation === "create") { 52 // serialize message as fm.teal.alpha.play 53 console.log(record.did); 54 db.insert(play) 55 .values({ 56 createdAt: new Date().getTime().toString(), 57 indexedAt: new Date(record.time_us).getTime().toString(), 58 // the AT path 59 rkey: record.commit.rkey, 60 authorDid: record.did, 61 artistNames: msg.artistNames, 62 trackName: msg.trackName, 63 artistMbIds: msg.artistMbIds || [], 64 trackMbId: msg.trackMbId || "", 65 duration: msg.duration || null, 66 isrc: msg.isrc || null, 67 musicServiceBaseDomain: msg.musicServiceBaseDomain || "local", 68 originUrl: msg.originUrl || null, 69 playedTime: msg.playedTime ? msg.playedTime.toString() : undefined, 70 recordingMbId: msg.recordingMbId || null, 71 releaseMbId: msg.releaseMbId || null, 72 releaseName: msg.releaseName || null, 73 submissionClientAgent: 74 msg.submissionClientAgent || "manual/unknown", 75 }) 76 .execute(); 77 } else { 78 // TODO: sentry 79 console.log( 80 "unsupported operation for fm.teal.alpha.play", 81 record.commit.operation, 82 ); 83 } 84 } else { 85 console.log("Unknown message type:", msg_type); 86 console.log("Message:", record); 87 } 88 } 89} 90 91class Streamer { 92 private static instance: Streamer; 93 private jetstream: Jetstream; 94 private handler: Handler; 95 96 private wantedCollections: string[]; 97 98 private constructor(wantedCollections: string[]) { 99 this.handler = Handler.getInstance(); 100 console.log("Creating new jetstream with collections", wantedCollections); 101 this.jetstream = new Jetstream({ 102 wantedCollections, 103 }); 104 this.wantedCollections = wantedCollections; 105 } 106 107 public static getInstance(wantedCollections?: string[]): Streamer { 108 if (!Streamer.instance && wantedCollections) { 109 Streamer.instance = new Streamer(wantedCollections); 110 } else if (!Streamer.instance) { 111 throw Error( 112 "Wanted collections are required if instance does not exist!", 113 ); 114 } 115 return Streamer.instance; 116 } 117 118 async setOnCreates() { 119 for (const collection of this.wantedCollections) { 120 await this.setOnCreate(collection); 121 } 122 } 123 124 async setOnCreate(collection: string) { 125 try { 126 this.jetstream.onCreate(collection, (event) => { 127 console.log("Received message:", event.commit.record); 128 this.handleCreate(collection, event); 129 }); 130 } catch (error) { 131 console.error("Error setting onCreate:", error); 132 } 133 console.log("Started onCreate stream for", collection); 134 } 135 136 async handleCreate( 137 collection: string, 138 event: CommitCreateEvent<string & {}>, 139 ) { 140 this.handler.handle(collection, event); 141 } 142 143 // Add method to start the streamer 144 async start() { 145 try { 146 await this.setOnCreates(); 147 this.jetstream.start(); 148 console.log("Streamer started successfully"); 149 } catch (error) { 150 console.error("Error starting streamer:", error); 151 } 152 } 153} 154 155// Main function to run the application 156async function main() { 157 try { 158 const streamer = Streamer.getInstance([ 159 "xyz.statusphere.status", 160 "fm.teal.alpha.play", 161 ]); 162 await streamer.start(); 163 164 // Keep the process running 165 process.on("SIGINT", () => { 166 console.log("Received SIGINT. Graceful shutdown..."); 167 process.exit(0); 168 }); 169 170 process.on("SIGTERM", () => { 171 console.log("Received SIGTERM. Graceful shutdown..."); 172 process.exit(0); 173 }); 174 175 // Prevent the Node.js process from exiting 176 setInterval(() => { 177 // This empty interval keeps the process running 178 }, 1000); 179 180 console.log("Application is running. Press Ctrl+C to exit."); 181 } catch (error) { 182 console.error("Error in main:", error); 183 process.exit(1); 184 } 185} 186 187// Run the application 188main().catch((error) => { 189 console.error("Unhandled error:", error); 190 process.exit(1); 191});