//! prefect server in zig
// (repository viewer header: at main, 268 lines, 8.2 kB, view raw)
const std = @import("std");
const zap = @import("zap");
const posix = std.posix;

const db = @import("db/sqlite.zig");
const backend = @import("db/backend.zig");
const broker = @import("broker.zig");
const lease_storage = @import("leases/storage.zig");
const routes = @import("api/routes.zig");
const events = @import("api/events.zig");
const log = @import("logging.zig");
const services = @import("services.zig");
const logfire = @import("logfire");

/// Port used when PREFECT_SERVER_PORT is unset or cannot be parsed as a u16.
const default_port: u16 = 4200;

/// Graceful shutdown state.
/// Written by the signal handler and read by the main thread, so it is atomic
/// rather than a plain `bool`.
var shutdown_requested = std.atomic.Value(bool).init(false);

/// Logfire instance for telemetry; null when logfire failed to initialize.
var lf: ?*logfire.Logfire = null;

/// SIGINT/SIGTERM handler: records the shutdown request and stops zap.
/// Only the first signal calls `zap.stop()`; later signals are no-ops.
fn signalHandler(sig: c_int) callconv(.c) void {
    _ = sig;
    // swap returns the previous value, so exactly one caller sees `false`.
    if (!shutdown_requested.swap(true, .acq_rel)) {
        // zap.stop() tells facil.io to stop accepting connections and shut down
        zap.stop();
    }
}

/// Installs `signalHandler` for SIGINT and SIGTERM.
fn setupSignalHandlers() void {
    const action = posix.Sigaction{
        .handler = .{ .handler = signalHandler },
        .mask = posix.sigemptyset(),
        .flags = 0,
    };
    posix.sigaction(posix.SIG.INT, &action, null);
    posix.sigaction(posix.SIG.TERM, &action, null);
}

/// zap request callback: wraps `routes.handle` in a logfire HTTP span.
/// A handler error is recorded on the span, logged, and answered with a
/// 500 JSON body; it is not propagated to zap.
fn onRequest(r: zap.Request) !void {
    const method = r.method orelse "?";
    const path = r.path orelse "/";

    const span = logfire.httpSpan(method, path, .{});
    defer span.end();

    routes.handle(r) catch |err| {
        span.recordError(err);
        log.err("server", "{s} {s} - error: {}", .{ method, path, err });
        r.setStatus(.internal_server_error);
        r.sendBody("{\"detail\":\"internal error\"}") catch |send_err| {
            // The client may already be gone; nothing more to do than record it.
            log.warn("server", "{s} {s} - failed to send error response: {}", .{ method, path, send_err });
        };
        return;
    };

    log.debug("server", "{s} {s}", .{ method, path });
}

/// What the process was asked to do.
const Command = enum { server, services_only, help };

/// Parsed command-line arguments.
const Args = struct {
    command: Command = .server,
    no_services: bool = false,
};

/// Parses the process arguments (program name skipped).
/// `--help`/`-h` takes precedence over every other argument regardless of
/// position. Unrecognized arguments are logged and otherwise ignored, so
/// `log.init()` must have run before this is called.
fn parseArgs() Args {
    var args = Args{};
    var iter = std.process.args();
    _ = iter.next(); // skip program name

    while (iter.next()) |arg| {
        if (std.mem.eql(u8, arg, "--help") or std.mem.eql(u8, arg, "-h")) {
            // Help wins even if a command appears later on the command line.
            args.command = .help;
            return args;
        } else if (std.mem.eql(u8, arg, "services")) {
            args.command = .services_only;
        } else if (std.mem.eql(u8, arg, "--no-services")) {
            args.no_services = true;
        } else {
            log.warn("server", "ignoring unrecognized argument '{s}' (see --help)", .{arg});
        }
    }
    return args;
}

/// Writes the usage text to stdout.
fn printHelp() void {
    const help =
        \\prefect-server - zig implementation of prefect server
        \\
        \\usage:
        \\ prefect-server [options] run API server (default: with background services)
        \\ prefect-server services run background services only (no API server)
        \\
        \\options:
        \\ --no-services run API server without background services
        \\ --help, -h show this help message
        \\
        \\environment variables:
        \\ PREFECT_SERVER_PORT port to listen on (default: 4200)
        \\ PREFECT_SERVER_API_HOST host to bind to (default: 127.0.0.1)
        \\ PREFECT_DATABASE_BACKEND sqlite or postgres (default: sqlite)
        \\ PREFECT_DATABASE_PATH sqlite database path (default: prefect.db)
        \\ PREFECT_DATABASE_URL postgres connection URL
        \\ PREFECT_BROKER_BACKEND memory or redis (default: memory)
        \\ PREFECT_REDIS_MESSAGING_HOST redis host (default: localhost)
        \\ PREFECT_REDIS_MESSAGING_PORT redis port (default: 6379)
        \\ PREFECT_SERVER_LOGGING_LEVEL DEBUG, INFO, WARNING, ERROR (default: INFO)
        \\
        \\horizontal scaling:
        \\ prefect-server --no-services # run N API server instances
        \\ prefect-server services # run 1 background services instance
        \\
    ;
    const stdout = std.fs.File.stdout();
    // Best effort: if stdout is closed there is nowhere left to report to.
    stdout.writeAll(help) catch {};
}

/// Process entry point.
/// All work happens in `run`, so that its deferred telemetry flush has already
/// executed by the time `std.process.exit` is called (exit does not run defers).
pub fn main() void {
    const exit_code = run();
    if (exit_code != 0) std.process.exit(exit_code);
}

/// Initializes logging and telemetry, dispatches the requested command, and
/// returns the process exit code (0 on success or clean shutdown, 1 on a
/// fatal error).
fn run() u8 {
    log.init();

    // initialize logfire telemetry
    lf = logfire.configure(.{
        .service_name = "prefect-server",
        .service_version = "0.0.1",
    }) catch |err| blk: {
        log.warn("telemetry", "logfire init failed: {} - continuing without tracing", .{err});
        break :blk null;
    };
    defer if (lf) |l| {
        l.flush() catch |err| {
            log.warn("telemetry", "logfire flush failed: {}", .{err});
        };
        l.shutdown();
    };

    const args = parseArgs();

    switch (args.command) {
        .help => printHelp(),
        .services_only => runServicesOnly() catch |err| return fatalExitCode(err),
        .server => runServer(args.no_services) catch |err| return fatalExitCode(err),
    }
    return 0;
}

/// Maps an error from a run mode to an exit code.
/// An error that surfaces after a shutdown signal is treated as a clean
/// shutdown (0); anything else is logged as fatal (1).
fn fatalExitCode(err: anyerror) u8 {
    if (shutdown_requested.load(.acquire)) return 0; // clean shutdown
    log.err("server", "fatal: {}", .{err});
    return 1;
}

/// Runs the background services without an API server, blocking until a
/// shutdown signal arrives. Services are stopped before infrastructure is
/// torn down (defers run in reverse order).
fn runServicesOnly() !void {
    log.info("services", "starting background services only...", .{});

    setupSignalHandlers();

    try initInfra();
    defer {
        log.info("shutdown", "cleaning up infrastructure...", .{});
        deinitInfra();
        log.info("shutdown", "complete", .{});
    }

    try services.startAll();
    defer {
        log.info("shutdown", "stopping services...", .{});
        services.stopAll();
    }

    log.info("services", "all services running - send SIGTERM or SIGINT to stop", .{});

    // block until shutdown signal, polling once per second
    while (!shutdown_requested.load(.acquire)) {
        posix.nanosleep(1, 0);
    }

    log.info("shutdown", "signal received, initiating graceful shutdown...", .{});
}

/// Runs the API server, with background services unless `no_services` is set.
/// Host and port come from PREFECT_SERVER_API_HOST / PREFECT_SERVER_PORT; an
/// unparseable port is logged and replaced by the default instead of being
/// silently ignored. Blocks inside `zap.start` until shutdown.
fn runServer(no_services: bool) !void {
    const port: u16 = blk: {
        const port_str = posix.getenv("PREFECT_SERVER_PORT") orelse break :blk default_port;
        break :blk std.fmt.parseInt(u16, port_str, 10) catch |err| {
            log.warn("server", "invalid PREFECT_SERVER_PORT '{s}' ({}) - using default {d}", .{ port_str, err, default_port });
            break :blk default_port;
        };
    };
    const host = posix.getenv("PREFECT_SERVER_API_HOST") orelse "127.0.0.1";

    setupSignalHandlers();

    try initInfra();
    defer {
        log.info("shutdown", "cleaning up infrastructure...", .{});
        deinitInfra();
        log.info("shutdown", "complete", .{});
    }

    if (!no_services) {
        try services.startAll();
    } else {
        log.info("server", "running in API-only mode (--no-services)", .{});
    }
    defer if (!no_services) {
        log.info("shutdown", "stopping services...", .{});
        services.stopAll();
    };

    var listener = zap.HttpListener.init(.{
        .port = port,
        .interface = host.ptr,
        .on_request = onRequest,
        .on_upgrade = events.onUpgrade,
        .log = true,
        .max_clients = 1000,
        .max_body_size = 16 * 1024 * 1024,
    });

    try listener.listen();
    log.info("server", "listening on http://{s}:{d} - send SIGTERM or SIGINT to stop", .{ host, port });

    // zap.start blocks until zap.stop() is called (via signal handler)
    zap.start(.{
        .threads = 4,
        .workers = 1,
    });

    log.info("shutdown", "signal received, initiating graceful shutdown...", .{});
}

/// Initializes the database, the broker, and lease storage, in that order.
/// If a later step fails, the steps that already succeeded are undone before
/// the error is returned, so a failed init leaves nothing open.
fn initInfra() !void {
    log.info("database", "initializing...", .{});
    try db.init();
    errdefer db.close();
    log.info("database", "ready", .{});

    const broker_dialect: broker.Dialect = blk: {
        const broker_str = posix.getenv("PREFECT_BROKER_BACKEND") orelse break :blk .memory;
        if (std.mem.eql(u8, broker_str, "redis")) break :blk .redis;
        if (!std.mem.eql(u8, broker_str, "memory")) {
            // A typo here would otherwise silently run without redis.
            log.warn("broker", "unknown PREFECT_BROKER_BACKEND '{s}' - falling back to memory", .{broker_str});
        }
        break :blk .memory;
    };
    broker.initBroker(std.heap.page_allocator, broker_dialect) catch |err| {
        log.err("broker", "failed to initialize: {}", .{err});
        return err;
    };
    errdefer broker.deinitBroker();
    log.info("broker", "ready ({s})", .{@tagName(broker_dialect)});

    // Initialize lease storage (matches broker backend: redis for HA, database otherwise)
    lease_storage.init() catch |err| {
        log.err("leases", "failed to initialize lease storage: {}", .{err});
        return err;
    };
}

/// Tears down what `initInfra` set up: broker first, then the database.
fn deinitInfra() void {
    broker.deinitBroker();
    db.close();
}

// Reference the modules whose tests should run as part of this file's tests.
test {
    _ = @import("db/backend.zig");
    _ = @import("db/dialect.zig");
    _ = @import("utilities/hashing.zig");
    _ = @import("utilities/time.zig");
    _ = @import("orchestration/types.zig");
    _ = @import("orchestration/flow_rules.zig");
    _ = @import("orchestration/transforms.zig");
}