Your music, beautifully tracked. All yours. (coming soon)
teal.fm
teal-fm
atproto
1import type { Database } from "@teal/db/connect";
2import { db } from "@teal/db/connect";
3import { status, play } from "@teal/db/schema";
4import { CommitCreateEvent, Jetstream } from "@skyware/jetstream";
5
6import {
7 Record as XyzStatusphereStatus,
8 isRecord as isStatusphereStatus,
9} from "@teal/lexicons/src/types/xyz/statusphere/status";
10
11import {
12 Record as FmTealAlphaPlay,
13 isRecord as isTealAlphaPlay,
14} from "@teal/lexicons/src/types/fm/teal/alpha/feed/play";
15
16class Handler {
17 private static instance: Handler;
18 private constructor() {}
19 public static getInstance(): Handler {
20 if (!Handler.instance) {
21 Handler.instance = new Handler();
22 }
23 return Handler.instance;
24 }
25
26 handle(msg_type: string, record: CommitCreateEvent<string & {}>) {
27 // Handle message logic here
28 const msg = record.commit.record;
29 console.log("Handling" + msg_type + "message:", msg);
30 if (isStatusphereStatus(msg) && msg.$type === "xyz.statusphere.status") {
31 if (record.commit.operation === "create") {
32 // serialize message as xyz.statusphere.status
33 db.insert(status)
34 .values({
35 createdAt: new Date().getTime().toString(),
36 indexedAt: new Date(record.time_us).getTime().toString(),
37 status: msg.status,
38 // the AT path
39 uri: record.commit.rkey,
40 authorDid: record.did,
41 })
42 .execute();
43 } else {
44 // TODO: sentry
45 console.log(
46 "unsupported operation for xyz.statusphere.status",
47 record.commit.operation,
48 );
49 }
50 } else if (isTealAlphaPlay(msg) && msg.$type === "fm.teal.alpha.play") {
51 if (record.commit.operation === "create") {
52 // serialize message as fm.teal.alpha.play
53 console.log(record.did);
54 db.insert(play)
55 .values({
56 createdAt: new Date().getTime().toString(),
57 indexedAt: new Date(record.time_us).getTime().toString(),
58 // the AT path
59 rkey: record.commit.rkey,
60 authorDid: record.did,
61 artistNames: msg.artistNames,
62 trackName: msg.trackName,
63 artistMbIds: msg.artistMbIds || [],
64 trackMbId: msg.trackMbId || "",
65 duration: msg.duration || null,
66 isrc: msg.isrc || null,
67 musicServiceBaseDomain: msg.musicServiceBaseDomain || "local",
68 originUrl: msg.originUrl || null,
69 playedTime: msg.playedTime ? msg.playedTime.toString() : undefined,
70 recordingMbId: msg.recordingMbId || null,
71 releaseMbId: msg.releaseMbId || null,
72 releaseName: msg.releaseName || null,
73 submissionClientAgent:
74 msg.submissionClientAgent || "manual/unknown",
75 })
76 .execute();
77 } else {
78 // TODO: sentry
79 console.log(
80 "unsupported operation for fm.teal.alpha.play",
81 record.commit.operation,
82 );
83 }
84 } else {
85 console.log("Unknown message type:", msg_type);
86 console.log("Message:", record);
87 }
88 }
89}
90
91class Streamer {
92 private static instance: Streamer;
93 private jetstream: Jetstream;
94 private handler: Handler;
95
96 private wantedCollections: string[];
97
98 private constructor(wantedCollections: string[]) {
99 this.handler = Handler.getInstance();
100 console.log("Creating new jetstream with collections", wantedCollections);
101 this.jetstream = new Jetstream({
102 wantedCollections,
103 });
104 this.wantedCollections = wantedCollections;
105 }
106
107 public static getInstance(wantedCollections?: string[]): Streamer {
108 if (!Streamer.instance && wantedCollections) {
109 Streamer.instance = new Streamer(wantedCollections);
110 } else if (!Streamer.instance) {
111 throw Error(
112 "Wanted collections are required if instance does not exist!",
113 );
114 }
115 return Streamer.instance;
116 }
117
118 async setOnCreates() {
119 for (const collection of this.wantedCollections) {
120 await this.setOnCreate(collection);
121 }
122 }
123
124 async setOnCreate(collection: string) {
125 try {
126 this.jetstream.onCreate(collection, (event) => {
127 console.log("Received message:", event.commit.record);
128 this.handleCreate(collection, event);
129 });
130 } catch (error) {
131 console.error("Error setting onCreate:", error);
132 }
133 console.log("Started onCreate stream for", collection);
134 }
135
136 async handleCreate(
137 collection: string,
138 event: CommitCreateEvent<string & {}>,
139 ) {
140 this.handler.handle(collection, event);
141 }
142
143 // Add method to start the streamer
144 async start() {
145 try {
146 await this.setOnCreates();
147 this.jetstream.start();
148 console.log("Streamer started successfully");
149 } catch (error) {
150 console.error("Error starting streamer:", error);
151 }
152 }
153}
154
155// Main function to run the application
156async function main() {
157 try {
158 const streamer = Streamer.getInstance([
159 "xyz.statusphere.status",
160 "fm.teal.alpha.play",
161 ]);
162 await streamer.start();
163
164 // Keep the process running
165 process.on("SIGINT", () => {
166 console.log("Received SIGINT. Graceful shutdown...");
167 process.exit(0);
168 });
169
170 process.on("SIGTERM", () => {
171 console.log("Received SIGTERM. Graceful shutdown...");
172 process.exit(0);
173 });
174
175 // Prevent the Node.js process from exiting
176 setInterval(() => {
177 // This empty interval keeps the process running
178 }, 1000);
179
180 console.log("Application is running. Press Ctrl+C to exit.");
181 } catch (error) {
182 console.error("Error in main:", error);
183 process.exit(1);
184 }
185}
186
187// Run the application
188main().catch((error) => {
189 console.error("Unhandled error:", error);
190 process.exit(1);
191});