prefect server in zig
at f511403a2b901063559cd17995b45527418e76c6 187 lines 5.6 kB view raw
1const std = @import("std"); 2const zap = @import("zap"); 3const posix = std.posix; 4 5const db = @import("db/sqlite.zig"); 6const backend = @import("db/backend.zig"); 7const broker = @import("broker.zig"); 8const routes = @import("api/routes.zig"); 9const events = @import("api/events.zig"); 10const log = @import("logging.zig"); 11const services = @import("services.zig"); 12 13fn onRequest(r: zap.Request) !void { 14 const method = r.method orelse "?"; 15 const path = r.path orelse "/"; 16 17 routes.handle(r) catch |err| { 18 log.err("server", "{s} {s} - error: {}", .{ method, path, err }); 19 r.setStatus(.internal_server_error); 20 r.sendBody("{\"detail\":\"internal error\"}") catch {}; 21 return; 22 }; 23 24 log.debug("server", "{s} {s}", .{ method, path }); 25} 26 27const Command = enum { server, services_only, help }; 28 29const Args = struct { 30 command: Command = .server, 31 no_services: bool = false, 32}; 33 34fn parseArgs() Args { 35 var args = Args{}; 36 var iter = std.process.args(); 37 _ = iter.next(); // skip program name 38 39 while (iter.next()) |arg| { 40 if (std.mem.eql(u8, arg, "services")) { 41 args.command = .services_only; 42 } else if (std.mem.eql(u8, arg, "--no-services")) { 43 args.no_services = true; 44 } else if (std.mem.eql(u8, arg, "--help") or std.mem.eql(u8, arg, "-h")) { 45 args.command = .help; 46 } 47 } 48 return args; 49} 50 51fn printHelp() void { 52 const help = 53 \\prefect-server - zig implementation of prefect server 54 \\ 55 \\usage: 56 \\ prefect-server [options] run API server (default: with background services) 57 \\ prefect-server services run background services only (no API server) 58 \\ 59 \\options: 60 \\ --no-services run API server without background services 61 \\ --help, -h show this help message 62 \\ 63 \\environment variables: 64 \\ PREFECT_SERVER_PORT port to listen on (default: 4200) 65 \\ PREFECT_SERVER_API_HOST host to bind to (default: 127.0.0.1) 66 \\ PREFECT_DATABASE_BACKEND sqlite or postgres (default: sqlite) 67 \\ PREFECT_DATABASE_PATH sqlite database path (default: prefect.db) 68 \\ PREFECT_DATABASE_URL postgres connection URL 69 \\ PREFECT_BROKER_BACKEND memory or redis (default: memory) 70 \\ PREFECT_REDIS_MESSAGING_HOST redis host (default: localhost) 71 \\ PREFECT_REDIS_MESSAGING_PORT redis port (default: 6379) 72 \\ PREFECT_SERVER_LOGGING_LEVEL DEBUG, INFO, WARNING, ERROR (default: INFO) 73 \\ 74 \\horizontal scaling: 75 \\ prefect-server --no-services # run N API server instances 76 \\ prefect-server services # run 1 background services instance 77 \\ 78 ; 79 const stdout = std.fs.File.stdout(); 80 stdout.writeAll(help) catch {}; 81} 82 83pub fn main() !void { 84 log.init(); 85 86 const args = parseArgs(); 87 88 switch (args.command) { 89 .help => { 90 printHelp(); 91 return; 92 }, 93 .services_only => { 94 try runServicesOnly(); 95 }, 96 .server => { 97 try runServer(args.no_services); 98 }, 99 } 100} 101 102fn runServicesOnly() !void { 103 log.info("services", "starting background services only...", .{}); 104 105 try initInfra(); 106 defer deinitInfra(); 107 108 try services.startAll(); 109 defer services.stopAll(); 110 111 log.info("services", "all services running - press Ctrl+C to stop", .{}); 112 113 // block forever (services run in background threads) 114 while (true) { 115 posix.nanosleep(60, 0); 116 } 117} 118 119fn runServer(no_services: bool) !void { 120 const port: u16 = blk: { 121 const port_str = posix.getenv("PREFECT_SERVER_PORT") orelse "4200"; 122 break :blk std.fmt.parseInt(u16, port_str, 10) catch 4200; 123 }; 124 const host = posix.getenv("PREFECT_SERVER_API_HOST") orelse "127.0.0.1"; 125 126 try initInfra(); 127 defer deinitInfra(); 128 129 if (!no_services) { 130 try services.startAll(); 131 } else { 132 log.info("server", "running in API-only mode (--no-services)", .{}); 133 } 134 defer if (!no_services) services.stopAll(); 135 136 var listener = zap.HttpListener.init(.{ 137 .port = port, 138 .interface = host.ptr, 139 .on_request = onRequest, 140 .on_upgrade = events.onUpgrade, 141 .log = true, 142 .max_clients = 1000, 143 .max_body_size = 16 * 1024 * 1024, 144 }); 145 146 try listener.listen(); 147 log.info("server", "listening on http://{s}:{d}", .{ host, port }); 148 149 zap.start(.{ 150 .threads = 4, 151 .workers = 1, 152 }); 153} 154 155fn initInfra() !void { 156 log.info("database", "initializing...", .{}); 157 try db.init(); 158 log.info("database", "ready", .{}); 159 160 const broker_dialect: broker.Dialect = blk: { 161 const broker_str = posix.getenv("PREFECT_BROKER_BACKEND") orelse "memory"; 162 if (std.mem.eql(u8, broker_str, "redis")) { 163 break :blk .redis; 164 } 165 break :blk .memory; 166 }; 167 broker.initBroker(std.heap.page_allocator, broker_dialect) catch |err| { 168 log.err("broker", "failed to initialize: {}", .{err}); 169 return err; 170 }; 171 log.info("broker", "ready ({s})", .{@tagName(broker_dialect)}); 172} 173 174fn deinitInfra() void { 175 broker.deinitBroker(); 176 db.close(); 177} 178 179test { 180 _ = @import("db/backend.zig"); 181 _ = @import("db/dialect.zig"); 182 _ = @import("utilities/hashing.zig"); 183 _ = @import("utilities/time.zig"); 184 _ = @import("orchestration/types.zig"); 185 _ = @import("orchestration/flow_rules.zig"); 186 _ = @import("orchestration/transforms.zig"); 187}