this repo has no description
coral.waow.tech
1const std = @import("std");
2const net = std.net;
3const posix = std.posix;
4const Thread = std.Thread;
5
6const lattice = @import("lattice.zig");
7const http = @import("http.zig");
8const ws_server = @import("ws_server.zig");
9const entity_graph = @import("entity_graph.zig");
10const db = @import("db.zig");
11const ner = @import("ner.zig");
12
13const log = std.log.scoped(.main);
14
/// Passed as `n_jobs` to the thread pool that serves HTTP connections.
const MAX_HTTP_WORKERS = 8;
/// Receive and send timeout, in seconds, set on every accepted HTTP socket.
const SOCKET_TIMEOUT_SECS = 30;
/// Time between periodic saves of the entity graph, in milliseconds.
const SAVE_INTERVAL_MS = 30_000; // save every 30 seconds
/// Database path handed to `db.init` for persistence.
const DB_PATH = "/data/coral.db";

/// Shutdown flag for graceful termination: set by the SIGTERM handler and
/// polled by the periodic saver thread.
var shutdown_requested = std.atomic.Value(bool).init(false);
22
/// Process entry point.
///
/// Initializes the lattices, entity-graph config, SQLite persistence and the
/// NER model, spawns the detached background threads (periodic saver,
/// websocket broadcaster, websocket server, NER consumer), then accepts HTTP
/// connections forever and hands each one to a worker pool.
///
/// Database and NER init failures are logged and tolerated; thread-spawn,
/// pool-init, address-parse and listen failures are returned to the caller.
pub fn main() !void {
    // Pause after a failed accept() before retrying, in milliseconds.
    const ACCEPT_RETRY_DELAY_MS = 50;

    var gpa = std.heap.GeneralPurposeAllocator(.{
        .thread_safe = true,
    }){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    // init world with three lattices (32, 128, 512)
    lattice.init();

    // init entity graph config from env vars
    entity_graph.initConfig();

    // init SQLite database for persistence
    db.init(DB_PATH) catch |err| {
        log.warn("failed to init database: {}, state won't persist", .{err});
    };

    // load persisted graph state from SQLite
    db.loadState(&entity_graph.graph) catch |err| {
        log.warn("failed to load state from db: {}, starting fresh", .{err});
    };

    // setup SIGTERM handler for graceful shutdown
    const sigterm_action = posix.Sigaction{
        .handler = .{ .handler = handleSigterm },
        .mask = posix.sigemptyset(),
        .flags = 0,
    };
    posix.sigaction(posix.SIG.TERM, &sigterm_action, null);

    // spawn periodic saver thread
    const saver_thread = try Thread.spawn(.{}, periodicSaver, .{});
    saver_thread.detach();

    // init websocket client tracking
    ws_server.init(allocator);

    // spawn websocket broadcaster thread
    const bc_thread = try Thread.spawn(.{}, ws_server.broadcaster, .{allocator});
    bc_thread.detach();

    // spawn websocket server thread
    const ws_port = portFromEnv("WS_PORT", 3001);
    const ws_thread = try Thread.spawn(.{}, ws_server.start, .{ allocator, ws_port });
    ws_thread.detach();

    // init spacez NER model and spawn jetstream consumer
    // NOTE(review): the consumer thread is spawned even when ner.init fails;
    // confirm ner.start copes with an uninitialized model.
    ner.init() catch |err| {
        log.err("failed to init spacez NER model: {}, running without NER", .{err});
    };
    const ner_thread = try Thread.spawn(.{}, ner.start, .{allocator});
    ner_thread.detach();

    // init thread pool for http connections
    var pool: Thread.Pool = undefined;
    try pool.init(.{
        .allocator = allocator,
        .n_jobs = MAX_HTTP_WORKERS,
    });
    defer pool.deinit();

    // start http server
    const port = portFromEnv("PORT", 3000);

    const address = try net.Address.parseIp("::", port);
    var server = try address.listen(.{ .reuse_address = true });
    defer server.deinit();

    log.info("coral listening on http://[::]:{d}", .{port});

    while (true) {
        const conn = server.accept() catch |err| {
            log.err("accept error: {}", .{err});
            // A persistent failure (e.g. file-descriptor exhaustion) makes
            // accept() fail again immediately; pause briefly so this loop
            // does not spin the CPU and flood the log.
            Thread.sleep(ACCEPT_RETRY_DELAY_MS * std.time.ns_per_ms);
            continue;
        };

        // a connection without timeouts is still served; only warn
        setSocketTimeout(conn.stream.handle, SOCKET_TIMEOUT_SECS) catch |err| {
            log.warn("failed to set socket timeout: {}", .{err});
        };

        pool.spawn(http.handleConnection, .{conn}) catch |err| {
            log.err("pool spawn error: {}", .{err});
            // nobody will handle this connection, so close it here
            conn.stream.close();
        };
    }
}

/// Reads a TCP port number from the environment variable `name`.
///
/// Returns `default_port` when the variable is unset. When it is set but is
/// not a valid base-10 `u16`, a warning is logged and `default_port` is
/// returned, so a misconfigured port does not go unnoticed.
fn portFromEnv(name: []const u8, default_port: u16) u16 {
    const raw = posix.getenv(name) orelse return default_port;
    return std.fmt.parseInt(u16, raw, 10) catch |err| {
        log.warn("invalid {s} value '{s}': {}, using default {d}", .{ name, raw, err, default_port });
        return default_port;
    };
}
116
/// Applies a `secs`-second timeout to both receiving and sending on socket `fd`.
///
/// The receive timeout is set first; if that fails the error is returned and
/// the send timeout is left untouched.
fn setSocketTimeout(fd: posix.fd_t, secs: u32) !void {
    const tv: posix.timeval = .{ .sec = @intCast(secs), .usec = 0 };
    // setsockopt takes the option value as raw bytes
    const raw = std.mem.asBytes(&tv);

    for ([_]u32{ posix.SO.RCVTIMEO, posix.SO.SNDTIMEO }) |option| {
        try posix.setsockopt(fd, posix.SOL.SOCKET, option, raw);
    }
}
125
/// SIGTERM handler: records that shutdown was requested and does nothing else.
///
/// This runs in signal context, so it must stay async-signal-safe. It performs
/// a single atomic store and deliberately does not log: the logging path may
/// take locks or touch buffers that the interrupted thread is already using.
/// `periodicSaver` polls the flag and logs once it acts on the request.
fn handleSigterm(_: c_int) callconv(.c) void {
    shutdown_requested.store(true, .release);
}
130
/// Background loop that persists the entity graph until shutdown.
///
/// Each cycle waits up to SAVE_INTERVAL_MS (waking every second to check the
/// shutdown flag), saves the graph, and on every BASELINE_PRUNE_INTERVAL-th
/// save prunes stale baselines. Once shutdown has been requested the process
/// exits with status 0 after the save, so this function never returns.
fn periodicSaver() void {
    const BASELINE_PRUNE_INTERVAL = 20; // every 20 saves (~10 minutes)
    var saves_since_prune: u32 = 0;

    while (!shutdown_requested.load(.acquire)) {
        // wait out the save interval in one-second slices so that a shutdown
        // request is noticed without waiting for the full interval
        var waited_ms: u64 = 0;
        while (waited_ms < SAVE_INTERVAL_MS and !shutdown_requested.load(.acquire)) : (waited_ms += 1000) {
            Thread.sleep(std.time.ns_per_s);
        }

        // save state to SQLite
        saveGraphLocked("failed to save state: {}");

        // pruning runs on every BASELINE_PRUNE_INTERVAL-th save, not every cycle
        saves_since_prune += 1;
        if (saves_since_prune >= BASELINE_PRUNE_INTERVAL) {
            saves_since_prune = 0;
            if (db.pruneStaleBaselines()) |remaining| {
                if (remaining > 0) {
                    log.info("pruned stale baselines, {d} remaining", .{remaining});
                }
            } else |err| {
                log.warn("failed to prune stale baselines: {}", .{err});
            }
        }

        if (shutdown_requested.load(.acquire)) {
            log.info("graceful shutdown: state saved, exiting", .{});
            posix.exit(0);
        }
    }

    // the flag was observed by the loop condition rather than the in-loop
    // check, so the state has not been saved since the request: save once more
    log.info("shutdown requested, saving final state", .{});
    saveGraphLocked("failed to save final state: {}");
    log.info("graceful shutdown complete", .{});
    posix.exit(0);
}

/// Saves the entity graph to SQLite while holding the graph mutex.
/// A save failure is logged with `failure_fmt` (which receives the error as
/// its single argument) and otherwise ignored.
fn saveGraphLocked(comptime failure_fmt: []const u8) void {
    const graph = &entity_graph.graph;
    graph.mutex.lock();
    defer graph.mutex.unlock();
    db.saveState(graph) catch |err| log.err(failure_fmt, .{err});
}
179
// Root test block: importing each module here references it from this file's
// test build so that its own test declarations are compiled along with it.
// NOTE(review): db.zig and ner.zig are not listed, while entities.zig (not
// imported at the top of this file) is — confirm this is intentional.
test {
    _ = @import("lattice.zig");
    _ = @import("http.zig");
    _ = @import("ws_server.zig");
    _ = @import("entities.zig");
    _ = @import("entity_graph.zig");
}