const std = @import("std"); const zap = @import("zap"); const posix = std.posix; const db = @import("db/sqlite.zig"); const backend = @import("db/backend.zig"); const broker = @import("broker.zig"); const lease_storage = @import("leases/storage.zig"); const routes = @import("api/routes.zig"); const events = @import("api/events.zig"); const log = @import("logging.zig"); const services = @import("services.zig"); const logfire = @import("logfire"); // Graceful shutdown state var shutdown_requested: bool = false; // Logfire instance for telemetry var lf: ?*logfire.Logfire = null; fn signalHandler(sig: c_int) callconv(.c) void { _ = sig; if (!shutdown_requested) { shutdown_requested = true; // zap.stop() tells facil.io to stop accepting connections and shut down zap.stop(); } } fn setupSignalHandlers() void { const action = posix.Sigaction{ .handler = .{ .handler = signalHandler }, .mask = posix.sigemptyset(), .flags = 0, }; posix.sigaction(posix.SIG.INT, &action, null); posix.sigaction(posix.SIG.TERM, &action, null); } fn onRequest(r: zap.Request) !void { const method = r.method orelse "?"; const path = r.path orelse "/"; const span = logfire.httpSpan(method, path, .{}); defer span.end(); routes.handle(r) catch |err| { span.recordError(err); log.err("server", "{s} {s} - error: {}", .{ method, path, err }); r.setStatus(.internal_server_error); r.sendBody("{\"detail\":\"internal error\"}") catch {}; return; }; log.debug("server", "{s} {s}", .{ method, path }); } const Command = enum { server, services_only, help }; const Args = struct { command: Command = .server, no_services: bool = false, }; fn parseArgs() Args { var args = Args{}; var iter = std.process.args(); _ = iter.next(); // skip program name while (iter.next()) |arg| { if (std.mem.eql(u8, arg, "services")) { args.command = .services_only; } else if (std.mem.eql(u8, arg, "--no-services")) { args.no_services = true; } else if (std.mem.eql(u8, arg, "--help") or std.mem.eql(u8, arg, "-h")) { args.command = .help; } } return args; } fn printHelp() void { const help = \\prefect-server - zig implementation of prefect server \\ \\usage: \\ prefect-server [options] run API server (default: with background services) \\ prefect-server services run background services only (no API server) \\ \\options: \\ --no-services run API server without background services \\ --help, -h show this help message \\ \\environment variables: \\ PREFECT_SERVER_PORT port to listen on (default: 4200) \\ PREFECT_SERVER_API_HOST host to bind to (default: 127.0.0.1) \\ PREFECT_DATABASE_BACKEND sqlite or postgres (default: sqlite) \\ PREFECT_DATABASE_PATH sqlite database path (default: prefect.db) \\ PREFECT_DATABASE_URL postgres connection URL \\ PREFECT_BROKER_BACKEND memory or redis (default: memory) \\ PREFECT_REDIS_MESSAGING_HOST redis host (default: localhost) \\ PREFECT_REDIS_MESSAGING_PORT redis port (default: 6379) \\ PREFECT_SERVER_LOGGING_LEVEL DEBUG, INFO, WARNING, ERROR (default: INFO) \\ \\horizontal scaling: \\ prefect-server --no-services # run N API server instances \\ prefect-server services # run 1 background services instance \\ ; const stdout = std.fs.File.stdout(); stdout.writeAll(help) catch {}; } pub fn main() void { log.init(); // initialize logfire telemetry lf = logfire.configure(.{ .service_name = "prefect-server", .service_version = "0.0.1", }) catch |err| blk: { log.warn("telemetry", "logfire init failed: {} - continuing without tracing", .{err}); break :blk null; }; defer if (lf) |l| { l.flush() catch {}; l.shutdown(); }; const args = parseArgs(); switch (args.command) { .help => { printHelp(); return; }, .services_only => { runServicesOnly() catch |err| { if (shutdown_requested) return; // clean shutdown log.err("server", "fatal: {}", .{err}); std.process.exit(1); }; }, .server => { runServer(args.no_services) catch |err| { if (shutdown_requested) return; // clean shutdown log.err("server", "fatal: {}", .{err}); std.process.exit(1); }; }, } } fn runServicesOnly() !void { log.info("services", "starting background services only...", .{}); setupSignalHandlers(); try initInfra(); defer { log.info("shutdown", "cleaning up infrastructure...", .{}); deinitInfra(); log.info("shutdown", "complete", .{}); } try services.startAll(); defer { log.info("shutdown", "stopping services...", .{}); services.stopAll(); } log.info("services", "all services running - send SIGTERM or SIGINT to stop", .{}); // block until shutdown signal while (!shutdown_requested) { posix.nanosleep(1, 0); } log.info("shutdown", "signal received, initiating graceful shutdown...", .{}); } fn runServer(no_services: bool) !void { const port: u16 = blk: { const port_str = posix.getenv("PREFECT_SERVER_PORT") orelse "4200"; break :blk std.fmt.parseInt(u16, port_str, 10) catch 4200; }; const host = posix.getenv("PREFECT_SERVER_API_HOST") orelse "127.0.0.1"; setupSignalHandlers(); try initInfra(); defer { log.info("shutdown", "cleaning up infrastructure...", .{}); deinitInfra(); log.info("shutdown", "complete", .{}); } if (!no_services) { try services.startAll(); } else { log.info("server", "running in API-only mode (--no-services)", .{}); } defer if (!no_services) { log.info("shutdown", "stopping services...", .{}); services.stopAll(); }; var listener = zap.HttpListener.init(.{ .port = port, .interface = host.ptr, .on_request = onRequest, .on_upgrade = events.onUpgrade, .log = true, .max_clients = 1000, .max_body_size = 16 * 1024 * 1024, }); try listener.listen(); log.info("server", "listening on http://{s}:{d} - send SIGTERM or SIGINT to stop", .{ host, port }); // zap.start blocks until zap.stop() is called (via signal handler) zap.start(.{ .threads = 4, .workers = 1, }); log.info("shutdown", "signal received, initiating graceful shutdown...", .{}); } fn initInfra() !void { log.info("database", "initializing...", .{}); try db.init(); log.info("database", "ready", .{}); const broker_dialect: broker.Dialect = blk: { const broker_str = posix.getenv("PREFECT_BROKER_BACKEND") orelse "memory"; if (std.mem.eql(u8, broker_str, "redis")) { break :blk .redis; } break :blk .memory; }; broker.initBroker(std.heap.page_allocator, broker_dialect) catch |err| { log.err("broker", "failed to initialize: {}", .{err}); return err; }; log.info("broker", "ready ({s})", .{@tagName(broker_dialect)}); // Initialize lease storage (matches broker backend: redis for HA, database otherwise) lease_storage.init() catch |err| { log.err("leases", "failed to initialize lease storage: {}", .{err}); return err; }; } fn deinitInfra() void { broker.deinitBroker(); db.close(); } test { _ = @import("db/backend.zig"); _ = @import("db/dialect.zig"); _ = @import("utilities/hashing.zig"); _ = @import("utilities/time.zig"); _ = @import("orchestration/types.zig"); _ = @import("orchestration/flow_rules.zig"); _ = @import("orchestration/transforms.zig"); }