prefect server in zig
1const std = @import("std");
2const zap = @import("zap");
3const posix = std.posix;
4
5const db = @import("db/sqlite.zig");
6const backend = @import("db/backend.zig");
7const broker = @import("broker.zig");
8const lease_storage = @import("leases/storage.zig");
9const routes = @import("api/routes.zig");
10const events = @import("api/events.zig");
11const log = @import("logging.zig");
12const services = @import("services.zig");
13const logfire = @import("logfire");
14
15// Graceful shutdown state
16var shutdown_requested: bool = false;
17
18// Logfire instance for telemetry
19var lf: ?*logfire.Logfire = null;
20
21fn signalHandler(sig: c_int) callconv(.c) void {
22 _ = sig;
23 if (!shutdown_requested) {
24 shutdown_requested = true;
25 // zap.stop() tells facil.io to stop accepting connections and shut down
26 zap.stop();
27 }
28}
29
30fn setupSignalHandlers() void {
31 const action = posix.Sigaction{
32 .handler = .{ .handler = signalHandler },
33 .mask = posix.sigemptyset(),
34 .flags = 0,
35 };
36 posix.sigaction(posix.SIG.INT, &action, null);
37 posix.sigaction(posix.SIG.TERM, &action, null);
38}
39
40fn onRequest(r: zap.Request) !void {
41 const method = r.method orelse "?";
42 const path = r.path orelse "/";
43
44 const span = logfire.httpSpan(method, path, .{});
45 defer span.end();
46
47 routes.handle(r) catch |err| {
48 span.recordError(err);
49 log.err("server", "{s} {s} - error: {}", .{ method, path, err });
50 r.setStatus(.internal_server_error);
51 r.sendBody("{\"detail\":\"internal error\"}") catch {};
52 return;
53 };
54
55 log.debug("server", "{s} {s}", .{ method, path });
56}
57
58const Command = enum { server, services_only, help };
59
60const Args = struct {
61 command: Command = .server,
62 no_services: bool = false,
63};
64
65fn parseArgs() Args {
66 var args = Args{};
67 var iter = std.process.args();
68 _ = iter.next(); // skip program name
69
70 while (iter.next()) |arg| {
71 if (std.mem.eql(u8, arg, "services")) {
72 args.command = .services_only;
73 } else if (std.mem.eql(u8, arg, "--no-services")) {
74 args.no_services = true;
75 } else if (std.mem.eql(u8, arg, "--help") or std.mem.eql(u8, arg, "-h")) {
76 args.command = .help;
77 }
78 }
79 return args;
80}
81
82fn printHelp() void {
83 const help =
84 \\prefect-server - zig implementation of prefect server
85 \\
86 \\usage:
87 \\ prefect-server [options] run API server (default: with background services)
88 \\ prefect-server services run background services only (no API server)
89 \\
90 \\options:
91 \\ --no-services run API server without background services
92 \\ --help, -h show this help message
93 \\
94 \\environment variables:
95 \\ PREFECT_SERVER_PORT port to listen on (default: 4200)
96 \\ PREFECT_SERVER_API_HOST host to bind to (default: 127.0.0.1)
97 \\ PREFECT_DATABASE_BACKEND sqlite or postgres (default: sqlite)
98 \\ PREFECT_DATABASE_PATH sqlite database path (default: prefect.db)
99 \\ PREFECT_DATABASE_URL postgres connection URL
100 \\ PREFECT_BROKER_BACKEND memory or redis (default: memory)
101 \\ PREFECT_REDIS_MESSAGING_HOST redis host (default: localhost)
102 \\ PREFECT_REDIS_MESSAGING_PORT redis port (default: 6379)
103 \\ PREFECT_SERVER_LOGGING_LEVEL DEBUG, INFO, WARNING, ERROR (default: INFO)
104 \\
105 \\horizontal scaling:
106 \\ prefect-server --no-services # run N API server instances
107 \\ prefect-server services # run 1 background services instance
108 \\
109 ;
110 const stdout = std.fs.File.stdout();
111 stdout.writeAll(help) catch {};
112}
113
114pub fn main() void {
115 log.init();
116
117 // initialize logfire telemetry
118 lf = logfire.configure(.{
119 .service_name = "prefect-server",
120 .service_version = "0.0.1",
121 }) catch |err| blk: {
122 log.warn("telemetry", "logfire init failed: {} - continuing without tracing", .{err});
123 break :blk null;
124 };
125 defer if (lf) |l| {
126 l.flush() catch {};
127 l.shutdown();
128 };
129
130 const args = parseArgs();
131
132 switch (args.command) {
133 .help => {
134 printHelp();
135 return;
136 },
137 .services_only => {
138 runServicesOnly() catch |err| {
139 if (shutdown_requested) return; // clean shutdown
140 log.err("server", "fatal: {}", .{err});
141 std.process.exit(1);
142 };
143 },
144 .server => {
145 runServer(args.no_services) catch |err| {
146 if (shutdown_requested) return; // clean shutdown
147 log.err("server", "fatal: {}", .{err});
148 std.process.exit(1);
149 };
150 },
151 }
152}
153
154fn runServicesOnly() !void {
155 log.info("services", "starting background services only...", .{});
156
157 setupSignalHandlers();
158
159 try initInfra();
160 defer {
161 log.info("shutdown", "cleaning up infrastructure...", .{});
162 deinitInfra();
163 log.info("shutdown", "complete", .{});
164 }
165
166 try services.startAll();
167 defer {
168 log.info("shutdown", "stopping services...", .{});
169 services.stopAll();
170 }
171
172 log.info("services", "all services running - send SIGTERM or SIGINT to stop", .{});
173
174 // block until shutdown signal
175 while (!shutdown_requested) {
176 posix.nanosleep(1, 0);
177 }
178
179 log.info("shutdown", "signal received, initiating graceful shutdown...", .{});
180}
181
182fn runServer(no_services: bool) !void {
183 const port: u16 = blk: {
184 const port_str = posix.getenv("PREFECT_SERVER_PORT") orelse "4200";
185 break :blk std.fmt.parseInt(u16, port_str, 10) catch 4200;
186 };
187 const host = posix.getenv("PREFECT_SERVER_API_HOST") orelse "127.0.0.1";
188
189 setupSignalHandlers();
190
191 try initInfra();
192 defer {
193 log.info("shutdown", "cleaning up infrastructure...", .{});
194 deinitInfra();
195 log.info("shutdown", "complete", .{});
196 }
197
198 if (!no_services) {
199 try services.startAll();
200 } else {
201 log.info("server", "running in API-only mode (--no-services)", .{});
202 }
203 defer if (!no_services) {
204 log.info("shutdown", "stopping services...", .{});
205 services.stopAll();
206 };
207
208 var listener = zap.HttpListener.init(.{
209 .port = port,
210 .interface = host.ptr,
211 .on_request = onRequest,
212 .on_upgrade = events.onUpgrade,
213 .log = true,
214 .max_clients = 1000,
215 .max_body_size = 16 * 1024 * 1024,
216 });
217
218 try listener.listen();
219 log.info("server", "listening on http://{s}:{d} - send SIGTERM or SIGINT to stop", .{ host, port });
220
221 // zap.start blocks until zap.stop() is called (via signal handler)
222 zap.start(.{
223 .threads = 4,
224 .workers = 1,
225 });
226
227 log.info("shutdown", "signal received, initiating graceful shutdown...", .{});
228}
229
230fn initInfra() !void {
231 log.info("database", "initializing...", .{});
232 try db.init();
233 log.info("database", "ready", .{});
234
235 const broker_dialect: broker.Dialect = blk: {
236 const broker_str = posix.getenv("PREFECT_BROKER_BACKEND") orelse "memory";
237 if (std.mem.eql(u8, broker_str, "redis")) {
238 break :blk .redis;
239 }
240 break :blk .memory;
241 };
242 broker.initBroker(std.heap.page_allocator, broker_dialect) catch |err| {
243 log.err("broker", "failed to initialize: {}", .{err});
244 return err;
245 };
246 log.info("broker", "ready ({s})", .{@tagName(broker_dialect)});
247
248 // Initialize lease storage (matches broker backend: redis for HA, database otherwise)
249 lease_storage.init() catch |err| {
250 log.err("leases", "failed to initialize lease storage: {}", .{err});
251 return err;
252 };
253}
254
255fn deinitInfra() void {
256 broker.deinitBroker();
257 db.close();
258}
259
260test {
261 _ = @import("db/backend.zig");
262 _ = @import("db/dialect.zig");
263 _ = @import("utilities/hashing.zig");
264 _ = @import("utilities/time.zig");
265 _ = @import("orchestration/types.zig");
266 _ = @import("orchestration/flow_rules.zig");
267 _ = @import("orchestration/transforms.zig");
268}