tangled
alpha
login
or
join now
teal.fm
/
teal
110
fork
atom
Your music, beautifully tracked. All yours. (coming soon)
teal.fm
teal-fm
atproto
110
fork
atom
overview
issues
pulls
pipelines
Add new jetstream package
natalie
1 year ago
69d5dbbf
c34fcb92
+178
5 changed files
expand all
collapse all
unified
split
packages
db
connect.ts
jetstring
package.json
src
index.ts
tsconfig.json
matcher
package.json
+21
packages/db/connect.ts
···
1
1
+
import { drizzle } from "drizzle-orm/libsql";
2
2
+
import * as schema from "@teal/db/schema";
3
3
+
import process from "node:process";
4
4
+
import path from "node:path";
5
5
+
6
6
+
console.log(
7
7
+
"Loading SQLite file at",
8
8
+
path.join(process.cwd(), "../../db.sqlite"),
9
9
+
);
10
10
+
11
11
+
export const db = drizzle({
12
12
+
connection:
13
13
+
// default is in project root / db.sqlite
14
14
+
process.env.DATABASE_URL ??
15
15
+
"file:" + path.join(process.cwd(), "../../db.sqlite"),
16
16
+
// doesn't seem to work?
17
17
+
//casing: "snake_case",
18
18
+
schema: schema,
19
19
+
});
20
20
+
21
21
+
export type Database = typeof db;
+16
packages/jetstring/package.json
···
1
1
+
{
2
2
+
"name": "@teal/jetstring",
3
3
+
"type": "module",
4
4
+
"scripts": {
5
5
+
"dev": "tsx watch src/index.ts | pino-pretty"
6
6
+
},
7
7
+
"dependencies": {
8
8
+
"@skyware/jetstream": "^0.2.0",
9
9
+
"@teal/db": "*"
10
10
+
},
11
11
+
"devDependencies": {
12
12
+
"tsup": "^8.3.5",
13
13
+
"tsx": "^4.19.2",
14
14
+
"typescript": "^5.6.3"
15
15
+
}
16
16
+
}
+129
packages/jetstring/src/index.ts
···
1
1
+
import type { Database } from "@teal/db/connect";
2
2
+
import { db } from "@teal/db/connect";
3
3
+
import { status } from "@teal/db/schema";
4
4
+
import { CommitCreateEvent, Jetstream } from "@skyware/jetstream";
5
5
+
import { server } from "@teal/lexicons/generated/server/types";
6
6
+
import ws from "ws";
7
7
+
8
8
+
class Handler {
9
9
+
private static instance: Handler;
10
10
+
private constructor() {}
11
11
+
public static getInstance(): Handler {
12
12
+
if (!Handler.instance) {
13
13
+
Handler.instance = new Handler();
14
14
+
}
15
15
+
return Handler.instance;
16
16
+
}
17
17
+
18
18
+
handle(msg_type: string, msg: any) {
19
19
+
// Handle message logic here
20
20
+
console.log("Handling" + msg_type + "message:", msg);
21
21
+
if (msg_type === "xyz.statusphere.status") {
22
22
+
// serialize message as xyz.statusphere.status
23
23
+
const st = db.insert(status).values({
24
24
+
status: msg.status,
25
25
+
uri: msg.uri,
26
26
+
authorDid: msg.authorDid,
27
27
+
});
28
28
+
}
29
29
+
}
30
30
+
}
31
31
+
32
32
+
class Streamer {
33
33
+
private static instance: Streamer;
34
34
+
private jetstream: Jetstream;
35
35
+
private handler: Handler;
36
36
+
37
37
+
private wantedCollections: string[];
38
38
+
39
39
+
private constructor(wantedCollections: string[]) {
40
40
+
this.handler = Handler.getInstance();
41
41
+
console.log("Creating new jetstream with collections", wantedCollections);
42
42
+
this.jetstream = new Jetstream({
43
43
+
wantedCollections,
44
44
+
});
45
45
+
this.wantedCollections = wantedCollections;
46
46
+
}
47
47
+
48
48
+
public static getInstance(wantedCollections?: string[]): Streamer {
49
49
+
if (!Streamer.instance && wantedCollections) {
50
50
+
Streamer.instance = new Streamer(wantedCollections);
51
51
+
} else if (!Streamer.instance) {
52
52
+
throw Error(
53
53
+
"Wanted collections are required if instance does not exist!",
54
54
+
);
55
55
+
}
56
56
+
return Streamer.instance;
57
57
+
}
58
58
+
59
59
+
async setOnCreates() {
60
60
+
for (const collection of this.wantedCollections) {
61
61
+
await this.setOnCreate(collection);
62
62
+
}
63
63
+
}
64
64
+
65
65
+
async setOnCreate(collection: string) {
66
66
+
try {
67
67
+
this.jetstream.onCreate(collection, (event) => {
68
68
+
console.log("Received message:", event.commit.record);
69
69
+
this.handleCreate(collection, event);
70
70
+
});
71
71
+
} catch (error) {
72
72
+
console.error("Error setting onCreate:", error);
73
73
+
}
74
74
+
console.log("Started onCreate stream for", collection);
75
75
+
}
76
76
+
77
77
+
async handleCreate(
78
78
+
collection: string,
79
79
+
event: CommitCreateEvent<string & {}>,
80
80
+
) {
81
81
+
this.handler.handle(collection, event);
82
82
+
}
83
83
+
84
84
+
// Add method to start the streamer
85
85
+
async start() {
86
86
+
try {
87
87
+
await this.setOnCreates();
88
88
+
this.jetstream.start();
89
89
+
console.log("Streamer started successfully");
90
90
+
} catch (error) {
91
91
+
console.error("Error starting streamer:", error);
92
92
+
}
93
93
+
}
94
94
+
}
95
95
+
96
96
+
// Main function to run the application
97
97
+
async function main() {
98
98
+
try {
99
99
+
const streamer = Streamer.getInstance(["xyz.statusphere.status"]);
100
100
+
await streamer.start();
101
101
+
102
102
+
// Keep the process running
103
103
+
process.on("SIGINT", () => {
104
104
+
console.log("Received SIGINT. Graceful shutdown...");
105
105
+
process.exit(0);
106
106
+
});
107
107
+
108
108
+
process.on("SIGTERM", () => {
109
109
+
console.log("Received SIGTERM. Graceful shutdown...");
110
110
+
process.exit(0);
111
111
+
});
112
112
+
113
113
+
// Prevent the Node.js process from exiting
114
114
+
setInterval(() => {
115
115
+
// This empty interval keeps the process running
116
116
+
}, 1000);
117
117
+
118
118
+
console.log("Application is running. Press Ctrl+C to exit.");
119
119
+
} catch (error) {
120
120
+
console.error("Error in main:", error);
121
121
+
process.exit(1);
122
122
+
}
123
123
+
}
124
124
+
125
125
+
// Run the application
126
126
+
main().catch((error) => {
127
127
+
console.error("Unhandled error:", error);
128
128
+
process.exit(1);
129
129
+
});
+3
packages/jetstring/tsconfig.json
···
1
1
+
{
2
2
+
"extends": "@teal/tsconfig/base.json"
3
3
+
}
+9
packages/matcher/package.json
···
1
1
+
{
2
2
+
"name": "matcher",
3
3
+
"version": "1.0.0",
4
4
+
"description": "",
5
5
+
"main": "index.ts",
6
6
+
"scripts": {
7
7
+
"test": "echo \"Error: no test specified\" && exit 1"
8
8
+
}
9
9
+
}