prefect server in zig
1const std = @import("std");
2const zap = @import("zap");
3const posix = std.posix;
4
5const db = @import("db/sqlite.zig");
6const backend = @import("db/backend.zig");
7const broker = @import("broker.zig");
8const routes = @import("api/routes.zig");
9const events = @import("api/events.zig");
10const log = @import("logging.zig");
11const services = @import("services.zig");
12
/// Top-level zap request callback: hands the request to the API router and
/// turns any router error into a JSON 500 response.
///
/// Router errors are logged and answered here rather than propagated, so no
/// path through this function returns an error.
fn onRequest(r: zap.Request) !void {
    // Captured up front so both the error and the debug log lines can use them.
    const method = r.method orelse "?";
    const path = r.path orelse "/";

    routes.handle(r) catch |err| {
        log.err("server", "{s} {s} - error: {}", .{ method, path, err });
        r.setStatus(.internal_server_error);
        // Still best effort (there is nothing more to do for this request),
        // but record the failure instead of discarding it silently.
        r.sendBody("{\"detail\":\"internal error\"}") catch |send_err| {
            log.err("server", "{s} {s} - failed to send error response: {}", .{ method, path, send_err });
        };
        return;
    };

    log.debug("server", "{s} {s}", .{ method, path });
}
26
/// Top-level mode selected on the command line.
const Command = enum {
    /// Run the API server.
    server,
    /// Run background services only, without the API server.
    services_only,
    /// Print the usage text and exit.
    help,
};
28
/// Result of command-line parsing.
const Args = struct {
    /// Mode to run in; `.server` unless an argument selects another one.
    command: Command = Command.server,
    /// True when the API server should run without background services.
    no_services: bool = false,
};
33
/// Parses the process command line into `Args`.
///
/// Recognized arguments are `services`, `--no-services`, and `--help` / `-h`.
/// A help flag wins regardless of what follows it. Unrecognized arguments are
/// reported on stderr and otherwise ignored.
fn parseArgs() Args {
    var args = Args{};
    var iter = std.process.args();
    _ = iter.next(); // skip program name

    while (iter.next()) |arg| {
        if (std.mem.eql(u8, arg, "--help") or std.mem.eql(u8, arg, "-h")) {
            // Return right away so a later `services` cannot override the
            // help request (e.g. `prefect-server --help services`).
            args.command = .help;
            return args;
        } else if (std.mem.eql(u8, arg, "services")) {
            args.command = .services_only;
        } else if (std.mem.eql(u8, arg, "--no-services")) {
            args.no_services = true;
        } else {
            // A typo such as `--no-service` would otherwise silently start
            // the server in its default mode.
            std.debug.print("prefect-server: ignoring unknown argument '{s}' (see --help)\n", .{arg});
        }
    }
    return args;
}
50
/// Writes the usage text to stdout. A failed write is ignored.
fn printHelp() void {
    const usage_text =
        \\prefect-server - zig implementation of prefect server
        \\
        \\usage:
        \\ prefect-server [options] run API server (default: with background services)
        \\ prefect-server services run background services only (no API server)
        \\
        \\options:
        \\ --no-services run API server without background services
        \\ --help, -h show this help message
        \\
        \\environment variables:
        \\ PREFECT_SERVER_PORT port to listen on (default: 4200)
        \\ PREFECT_SERVER_API_HOST host to bind to (default: 127.0.0.1)
        \\ PREFECT_DATABASE_BACKEND sqlite or postgres (default: sqlite)
        \\ PREFECT_DATABASE_PATH sqlite database path (default: prefect.db)
        \\ PREFECT_DATABASE_URL postgres connection URL
        \\ PREFECT_BROKER_BACKEND memory or redis (default: memory)
        \\ PREFECT_REDIS_MESSAGING_HOST redis host (default: localhost)
        \\ PREFECT_REDIS_MESSAGING_PORT redis port (default: 6379)
        \\ PREFECT_SERVER_LOGGING_LEVEL DEBUG, INFO, WARNING, ERROR (default: INFO)
        \\
        \\horizontal scaling:
        \\ prefect-server --no-services # run N API server instances
        \\ prefect-server services # run 1 background services instance
        \\
    ;
    std.fs.File.stdout().writeAll(usage_text) catch {};
}
82
/// Program entry point: initializes logging, parses the command line, and
/// dispatches to the selected mode. Errors from the selected mode propagate
/// to the caller.
pub fn main() !void {
    log.init();

    const parsed = parseArgs();
    switch (parsed.command) {
        .help => printHelp(),
        .services_only => try runServicesOnly(),
        .server => try runServer(parsed.no_services),
    }
}
101
/// Runs the background services with no API server.
///
/// Initializes the shared infrastructure, starts all services, then parks the
/// calling thread indefinitely. Returns only if initialization or service
/// startup fails.
fn runServicesOnly() !void {
    log.info("services", "starting background services only...", .{});

    try initInfra();
    defer deinitInfra();

    try services.startAll();
    defer services.stopAll();

    log.info("services", "all services running - press Ctrl+C to stop", .{});

    // The services run in background threads, so this thread only has to stay
    // alive. The loop has no exit, so the deferred teardown above is not
    // reached on the normal path.
    const nap_seconds = 60;
    while (true) posix.nanosleep(nap_seconds, 0);
}
118
/// Runs the API server, optionally together with the background services.
///
/// `no_services`: when true, only the HTTP API is started (API-only mode).
///
/// The listen port comes from `PREFECT_SERVER_PORT` (default 4200) and the
/// bind host from `PREFECT_SERVER_API_HOST` (default 127.0.0.1). Returns an
/// error if infrastructure setup, service startup, or listening fails.
fn runServer(no_services: bool) !void {
    const default_port: u16 = 4200;
    const port: u16 = blk: {
        const port_str = posix.getenv("PREFECT_SERVER_PORT") orelse break :blk default_port;
        break :blk std.fmt.parseInt(u16, port_str, 10) catch |err| {
            // Keep the fallback, but say so: silently binding a different
            // port than the one configured is hard to diagnose.
            log.err("server", "invalid PREFECT_SERVER_PORT '{s}' ({}), falling back to {d}", .{ port_str, err, default_port });
            break :blk default_port;
        };
    };
    const host = posix.getenv("PREFECT_SERVER_API_HOST") orelse "127.0.0.1";

    try initInfra();
    defer deinitInfra();

    if (!no_services) {
        try services.startAll();
    } else {
        log.info("server", "running in API-only mode (--no-services)", .{});
    }
    // Registered only after a successful start, so a failed `startAll` is not
    // followed by `stopAll`.
    defer if (!no_services) services.stopAll();

    var listener = zap.HttpListener.init(.{
        .port = port,
        // NOTE(review): passes the raw pointer; this relies on `host` being
        // NUL-terminated (getenv result or string literal) — confirm against zap.
        .interface = host.ptr,
        .on_request = onRequest,
        .on_upgrade = events.onUpgrade,
        .log = true,
        .max_clients = 1000,
        .max_body_size = 16 * 1024 * 1024,
    });

    try listener.listen();
    log.info("server", "listening on http://{s}:{d}", .{ host, port });

    // NOTE(review): presumably blocks until the server is stopped — confirm
    // against the zap documentation.
    zap.start(.{
        .threads = 4,
        .workers = 1,
    });
}
154
/// Initializes the shared infrastructure: the database first, then the
/// message broker.
///
/// The broker backend is chosen by `PREFECT_BROKER_BACKEND`: `redis` selects
/// the redis dialect, anything else falls back to `memory`. On error nothing
/// is left initialized, so callers must only pair `deinitInfra` with a
/// successful call.
fn initInfra() !void {
    log.info("database", "initializing...", .{});
    try db.init();
    // Callers register `deinitInfra` only after this function succeeds, so a
    // broker failure below would otherwise leave the database open.
    errdefer db.close();
    log.info("database", "ready", .{});

    const broker_dialect: broker.Dialect = blk: {
        const broker_str = posix.getenv("PREFECT_BROKER_BACKEND") orelse "memory";
        if (std.mem.eql(u8, broker_str, "redis")) break :blk .redis;
        if (!std.mem.eql(u8, broker_str, "memory")) {
            // Keep the existing fallback, but surface misconfiguration.
            log.err("broker", "unknown PREFECT_BROKER_BACKEND '{s}', falling back to memory", .{broker_str});
        }
        break :blk .memory;
    };
    broker.initBroker(std.heap.page_allocator, broker_dialect) catch |err| {
        log.err("broker", "failed to initialize: {}", .{err});
        return err;
    };
    log.info("broker", "ready ({s})", .{@tagName(broker_dialect)});
}
173
/// Tears down the shared infrastructure in the reverse of the order used by
/// `initInfra`: the broker first, then the database.
fn deinitInfra() void {
    // The deferred close runs at scope exit, i.e. after the broker is shut down.
    defer db.close();
    broker.deinitBroker();
}
178
// Reference the modules below so that running the tests of this root file
// also collects the test blocks declared inside them.
test {
    // database layer
    _ = @import("db/backend.zig");
    _ = @import("db/dialect.zig");

    // utilities
    _ = @import("utilities/hashing.zig");
    _ = @import("utilities/time.zig");

    // orchestration
    _ = @import("orchestration/types.zig");
    _ = @import("orchestration/flow_rules.zig");
    _ = @import("orchestration/transforms.zig");
}
187}