const std = @import("std"); const net = std.net; const posix = std.posix; const Thread = std.Thread; const lattice = @import("lattice.zig"); const http = @import("http.zig"); const ws_server = @import("ws_server.zig"); const entity_graph = @import("entity_graph.zig"); const db = @import("db.zig"); const ner = @import("ner.zig"); const log = std.log.scoped(.main); const MAX_HTTP_WORKERS = 8; const SOCKET_TIMEOUT_SECS = 30; const SAVE_INTERVAL_MS = 30_000; // save every 30 seconds const DB_PATH = "/data/coral.db"; // shutdown flag for graceful termination var shutdown_requested = std.atomic.Value(bool).init(false); pub fn main() !void { var gpa = std.heap.GeneralPurposeAllocator(.{ .thread_safe = true, }){}; defer _ = gpa.deinit(); const allocator = gpa.allocator(); // init world with three lattices (32, 128, 512) lattice.init(); // init entity graph config from env vars entity_graph.initConfig(); // init SQLite database for persistence db.init(DB_PATH) catch |err| { log.warn("failed to init database: {}, state won't persist", .{err}); }; // load persisted graph state from SQLite db.loadState(&entity_graph.graph) catch |err| { log.warn("failed to load state from db: {}, starting fresh", .{err}); }; // setup SIGTERM handler for graceful shutdown const sigterm_action = posix.Sigaction{ .handler = .{ .handler = handleSigterm }, .mask = posix.sigemptyset(), .flags = 0, }; posix.sigaction(posix.SIG.TERM, &sigterm_action, null); // spawn periodic saver thread const saver_thread = try Thread.spawn(.{}, periodicSaver, .{}); saver_thread.detach(); // init websocket client tracking ws_server.init(allocator); // spawn websocket broadcaster thread const bc_thread = try Thread.spawn(.{}, ws_server.broadcaster, .{allocator}); bc_thread.detach(); // spawn websocket server thread const ws_port: u16 = blk: { const port_str = posix.getenv("WS_PORT") orelse "3001"; break :blk std.fmt.parseInt(u16, port_str, 10) catch 3001; }; const ws_thread = try Thread.spawn(.{}, ws_server.start, .{ allocator, ws_port }); ws_thread.detach(); // init spacez NER model and spawn jetstream consumer ner.init() catch |err| { log.err("failed to init spacez NER model: {}, running without NER", .{err}); }; const ner_thread = try Thread.spawn(.{}, ner.start, .{allocator}); ner_thread.detach(); // init thread pool for http connections var pool: Thread.Pool = undefined; try pool.init(.{ .allocator = allocator, .n_jobs = MAX_HTTP_WORKERS, }); defer pool.deinit(); // start http server const port: u16 = blk: { const port_str = posix.getenv("PORT") orelse "3000"; break :blk std.fmt.parseInt(u16, port_str, 10) catch 3000; }; const address = try net.Address.parseIp("::", port); var server = try address.listen(.{ .reuse_address = true }); defer server.deinit(); log.info("coral listening on http://[::]:{d}", .{port}); while (true) { const conn = server.accept() catch |err| { log.err("accept error: {}", .{err}); continue; }; setSocketTimeout(conn.stream.handle, SOCKET_TIMEOUT_SECS) catch |err| { log.warn("failed to set socket timeout: {}", .{err}); }; pool.spawn(http.handleConnection, .{conn}) catch |err| { log.err("pool spawn error: {}", .{err}); conn.stream.close(); }; } } fn setSocketTimeout(fd: posix.fd_t, secs: u32) !void { const timeout = std.mem.toBytes(posix.timeval{ .sec = @intCast(secs), .usec = 0, }); try posix.setsockopt(fd, posix.SOL.SOCKET, posix.SO.RCVTIMEO, &timeout); try posix.setsockopt(fd, posix.SOL.SOCKET, posix.SO.SNDTIMEO, &timeout); } fn handleSigterm(_: c_int) callconv(.c) void { log.info("received SIGTERM, initiating graceful shutdown", .{}); shutdown_requested.store(true, .release); } fn periodicSaver() void { var baseline_prune_counter: u32 = 0; const BASELINE_PRUNE_INTERVAL = 20; // every 20 saves (~10 minutes) while (!shutdown_requested.load(.acquire)) { // check shutdown every second, save every SAVE_INTERVAL_MS var elapsed_ms: u64 = 0; while (elapsed_ms < SAVE_INTERVAL_MS and !shutdown_requested.load(.acquire)) { Thread.sleep(1000 * std.time.ns_per_ms); // 1 second elapsed_ms += 1000; } // save state to SQLite entity_graph.graph.mutex.lock(); db.saveState(&entity_graph.graph) catch |err| { log.err("failed to save state: {}", .{err}); }; entity_graph.graph.mutex.unlock(); // prune stale baselines every ~10 minutes (not every save cycle) baseline_prune_counter += 1; if (baseline_prune_counter >= BASELINE_PRUNE_INTERVAL) { baseline_prune_counter = 0; const remaining = db.pruneStaleBaselines() catch |err| blk: { log.warn("failed to prune stale baselines: {}", .{err}); break :blk 0; }; if (remaining > 0) { log.info("pruned stale baselines, {d} remaining", .{remaining}); } } if (shutdown_requested.load(.acquire)) { log.info("graceful shutdown: state saved, exiting", .{}); posix.exit(0); } } // final save on shutdown log.info("shutdown requested, saving final state", .{}); entity_graph.graph.mutex.lock(); db.saveState(&entity_graph.graph) catch |err| { log.err("failed to save final state: {}", .{err}); }; entity_graph.graph.mutex.unlock(); log.info("graceful shutdown complete", .{}); posix.exit(0); } test { _ = @import("lattice.zig"); _ = @import("http.zig"); _ = @import("ws_server.zig"); _ = @import("entities.zig"); _ = @import("entity_graph.zig"); }