this repo has no description coral.waow.tech
at main 186 lines 6.1 kB view raw
const std = @import("std");
const net = std.net;
const posix = std.posix;
const Thread = std.Thread;

const lattice = @import("lattice.zig");
const http = @import("http.zig");
const ws_server = @import("ws_server.zig");
const entity_graph = @import("entity_graph.zig");
const db = @import("db.zig");
const ner = @import("ner.zig");

const log = std.log.scoped(.main);

const MAX_HTTP_WORKERS = 8;
const SOCKET_TIMEOUT_SECS = 30;
const SAVE_INTERVAL_MS = 30_000; // save every 30 seconds
const SHUTDOWN_POLL_MS = 1_000; // how often the saver re-checks the shutdown flag
const BASELINE_PRUNE_INTERVAL = 20; // prune every 20 saves (~10 minutes)
const ACCEPT_RETRY_DELAY_MS = 100; // pause after a failed accept() before retrying
const DB_PATH = "/data/coral.db";

/// Shutdown flag for graceful termination. Written by `handleSigterm`,
/// polled by `periodicSaver`.
var shutdown_requested = std.atomic.Value(bool).init(false);

/// Process entry point.
///
/// Initializes the lattice world, entity-graph config and the SQLite-backed
/// state, installs the SIGTERM handler, starts the detached background
/// threads (periodic saver, websocket broadcaster, websocket server, NER
/// consumer) and then accepts HTTP connections forever, handing each one to
/// a thread pool of `MAX_HTTP_WORKERS` workers.
///
/// Returns an error only if a thread cannot be spawned, the pool cannot be
/// initialized, or the listening socket cannot be set up. Database and NER
/// init failures are logged and the server keeps running without them.
pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{
        .thread_safe = true,
    }){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    // init world with three lattices (32, 128, 512)
    lattice.init();

    // init entity graph config from env vars
    entity_graph.initConfig();

    // init SQLite database for persistence
    db.init(DB_PATH) catch |err| {
        log.warn("failed to init database: {}, state won't persist", .{err});
    };

    // load persisted graph state from SQLite
    db.loadState(&entity_graph.graph) catch |err| {
        log.warn("failed to load state from db: {}, starting fresh", .{err});
    };

    // setup SIGTERM handler for graceful shutdown
    const sigterm_action = posix.Sigaction{
        .handler = .{ .handler = handleSigterm },
        .mask = posix.sigemptyset(),
        .flags = 0,
    };
    posix.sigaction(posix.SIG.TERM, &sigterm_action, null);

    // spawn periodic saver thread
    const saver_thread = try Thread.spawn(.{}, periodicSaver, .{});
    saver_thread.detach();

    // init websocket client tracking
    ws_server.init(allocator);

    // spawn websocket broadcaster thread
    const bc_thread = try Thread.spawn(.{}, ws_server.broadcaster, .{allocator});
    bc_thread.detach();

    // spawn websocket server thread
    const ws_port = portFromEnv("WS_PORT", 3001);
    const ws_thread = try Thread.spawn(.{}, ws_server.start, .{ allocator, ws_port });
    ws_thread.detach();

    // init spacez NER model and spawn jetstream consumer
    ner.init() catch |err| {
        log.err("failed to init spacez NER model: {}, running without NER", .{err});
    };
    const ner_thread = try Thread.spawn(.{}, ner.start, .{allocator});
    ner_thread.detach();

    // init thread pool for http connections
    var pool: Thread.Pool = undefined;
    try pool.init(.{
        .allocator = allocator,
        .n_jobs = MAX_HTTP_WORKERS,
    });
    defer pool.deinit();

    // start http server
    const port = portFromEnv("PORT", 3000);

    const address = try net.Address.parseIp("::", port);
    var server = try address.listen(.{ .reuse_address = true });
    defer server.deinit();

    log.info("coral listening on http://[::]:{d}", .{port});

    while (true) {
        const conn = server.accept() catch |err| {
            log.err("accept error: {}", .{err});
            // Back off briefly so a persistent failure (e.g. running out of
            // file descriptors) does not become a hot loop of failing accepts.
            Thread.sleep(ACCEPT_RETRY_DELAY_MS * std.time.ns_per_ms);
            continue;
        };

        // A connection without timeouts is still served; only warn.
        setSocketTimeout(conn.stream.handle, SOCKET_TIMEOUT_SECS) catch |err| {
            log.warn("failed to set socket timeout: {}", .{err});
        };

        pool.spawn(http.handleConnection, .{conn}) catch |err| {
            log.err("pool spawn error: {}", .{err});
            // No worker will ever own this connection, so close it here.
            conn.stream.close();
        };
    }
}

/// Reads a TCP port from environment variable `name`.
///
/// Returns `default` when the variable is unset. When it is set but is not
/// a valid base-10 `u16`, logs a warning and also returns `default`.
fn portFromEnv(name: []const u8, default: u16) u16 {
    const raw = posix.getenv(name) orelse return default;
    return std.fmt.parseInt(u16, raw, 10) catch |err| {
        log.warn("invalid {s}={s} ({}), using default {d}", .{ name, raw, err, default });
        return default;
    };
}

/// Applies the same receive and send timeout of `secs` seconds to socket `fd`.
/// Fails with the error of the first `setsockopt` call that is rejected.
fn setSocketTimeout(fd: posix.fd_t, secs: u32) !void {
    const timeout = std.mem.toBytes(posix.timeval{
        .sec = @intCast(secs),
        .usec = 0,
    });
    try posix.setsockopt(fd, posix.SOL.SOCKET, posix.SO.RCVTIMEO, &timeout);
    try posix.setsockopt(fd, posix.SOL.SOCKET, posix.SO.SNDTIMEO, &timeout);
}

/// SIGTERM handler: only raises the shutdown flag.
///
/// Deliberately does no logging here. Formatted logging may take locks and
/// perform buffered I/O, which is not async-signal-safe; the signal could
/// interrupt a thread that is in the middle of logging. `periodicSaver`
/// reports the shutdown once it observes the flag.
fn handleSigterm(_: c_int) callconv(.c) void {
    shutdown_requested.store(true, .release);
}

/// Saves the entity graph to the database while holding the graph mutex.
/// The mutex is released on both the success and the error path.
fn saveGraphState() !void {
    entity_graph.graph.mutex.lock();
    defer entity_graph.graph.mutex.unlock();
    try db.saveState(&entity_graph.graph);
}

/// Sleeps until `SAVE_INTERVAL_MS` has elapsed or shutdown is requested,
/// whichever comes first. The flag is re-checked every `SHUTDOWN_POLL_MS`.
fn waitForSaveInterval() void {
    var elapsed_ms: u64 = 0;
    while (elapsed_ms < SAVE_INTERVAL_MS and !shutdown_requested.load(.acquire)) {
        Thread.sleep(SHUTDOWN_POLL_MS * std.time.ns_per_ms);
        elapsed_ms += SHUTDOWN_POLL_MS;
    }
}

/// Body of the background saver thread.
///
/// Saves the graph every `SAVE_INTERVAL_MS` and prunes stale baselines every
/// `BASELINE_PRUNE_INTERVAL` saves. Once the shutdown flag is observed it
/// performs one final save, reports whether that save succeeded, and exits
/// the process with status 0. This function never returns.
fn periodicSaver() void {
    var saves_since_prune: u32 = 0;

    while (true) {
        waitForSaveInterval();
        if (shutdown_requested.load(.acquire)) break;

        saveGraphState() catch |err| {
            log.err("failed to save state: {}", .{err});
        };

        // prune stale baselines every ~10 minutes (not every save cycle)
        saves_since_prune += 1;
        if (saves_since_prune >= BASELINE_PRUNE_INTERVAL) {
            saves_since_prune = 0;
            const remaining = db.pruneStaleBaselines() catch |err| blk: {
                log.warn("failed to prune stale baselines: {}", .{err});
                break :blk 0;
            };
            if (remaining > 0) {
                log.info("pruned stale baselines, {d} remaining", .{remaining});
            }
        }
    }

    // Logged here rather than in the signal handler (see handleSigterm).
    log.info("received SIGTERM, initiating graceful shutdown", .{});
    if (saveGraphState()) |_| {
        log.info("graceful shutdown: state saved, exiting", .{});
    } else |err| {
        log.err("failed to save final state: {}", .{err});
    }
    posix.exit(0);
}

// Pull in the tests declared in the imported modules.
test {
    _ = @import("lattice.zig");
    _ = @import("http.zig");
    _ = @import("ws_server.zig");
    _ = @import("entities.zig");
    _ = @import("entity_graph.zig");
}