prefect server in zig

refactor: move websocket handlers to api/websocket.zig

main.zig now just composes the server entry point (57 lines).
websocket handlers moved to src/api/websocket.zig where they belong.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+264 -317
+260
src/api/websocket.zig
··· 1 + // websocket handlers for /events/in and /events/out 2 + 3 + const std = @import("std"); 4 + const zap = @import("zap"); 5 + const log = @import("../logging.zig"); 6 + const messaging = @import("../messaging.zig"); 7 + 8 + const WebSockets = zap.WebSockets; 9 + 10 + // ============================================================================ 11 + // Events In - event ingestion endpoint 12 + // ============================================================================ 13 + 14 + const EventsContext = struct {}; 15 + const EventsHandler = WebSockets.Handler(EventsContext); 16 + 17 + var events_context: EventsContext = .{}; 18 + var events_settings: EventsHandler.WebSocketSettings = .{ 19 + .on_open = onEventsOpen, 20 + .on_close = onEventsClose, 21 + .on_message = onEventsMessage, 22 + .context = &events_context, 23 + }; 24 + 25 + fn onEventsOpen(_: ?*EventsContext, _: WebSockets.WsHandle) !void { 26 + log.debug("websocket", "events connection opened", .{}); 27 + } 28 + 29 + fn onEventsClose(_: ?*EventsContext, _: isize) !void { 30 + log.debug("websocket", "events connection closed", .{}); 31 + } 32 + 33 + fn onEventsMessage(_: ?*EventsContext, _: WebSockets.WsHandle, message: []const u8, is_text: bool) !void { 34 + log.debug("events", "received message: {d} bytes, text={}", .{ message.len, is_text }); 35 + 36 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 37 + defer arena.deinit(); 38 + const alloc = arena.allocator(); 39 + 40 + const parsed = std.json.parseFromSlice(std.json.Value, alloc, message, .{}) catch { 41 + log.err("events", "failed to parse event json", .{}); 42 + return; 43 + }; 44 + 45 + const obj = parsed.value.object; 46 + 47 + const getString = struct { 48 + fn f(val: ?std.json.Value) ?[]const u8 { 49 + if (val) |v| { 50 + return switch (v) { 51 + .string => |s| s, 52 + else => null, 53 + }; 54 + } 55 + return null; 56 + } 57 + }.f; 58 + 59 + const stringifyJson = struct { 60 + fn f(a: std.mem.Allocator, val: ?std.json.Value, default: []const u8) []const u8 { 61 + if (val) |v| { 62 + const formatted = std.fmt.allocPrint(a, "{f}", .{std.json.fmt(v, .{})}) catch return default; 63 + return formatted; 64 + } 65 + return default; 66 + } 67 + }.f; 68 + 69 + const id = getString(obj.get("id")); 70 + const occurred = getString(obj.get("occurred")); 71 + const event_name = getString(obj.get("event")); 72 + 73 + if (id == null or occurred == null or event_name == null) { 74 + log.err("events", "event missing required fields", .{}); 75 + return; 76 + } 77 + 78 + var resource_id: []const u8 = ""; 79 + if (obj.get("resource")) |res| { 80 + if (res == .object) { 81 + resource_id = getString(res.object.get("prefect.resource.id")) orelse ""; 82 + } 83 + } 84 + 85 + const resource_json = stringifyJson(alloc, obj.get("resource"), "{}"); 86 + const payload_json = stringifyJson(alloc, obj.get("payload"), "{}"); 87 + const related_json = stringifyJson(alloc, obj.get("related"), "[]"); 88 + const follows = getString(obj.get("follows")); 89 + 90 + if (messaging.publishEvent(id.?, occurred.?, event_name.?, resource_id, resource_json, payload_json, related_json, follows)) { 91 + log.debug("events", "queued: {s}", .{event_name.?}); 92 + } else { 93 + log.warn("events", "dropped (backpressure): {s}", .{event_name.?}); 94 + } 95 + } 96 + 97 + // ============================================================================ 98 + // Events Out - subscription endpoint 99 + // ============================================================================ 100 + 101 + const SubscriberState = enum { waiting_auth, waiting_filter, subscribed }; 102 + 103 + const SubscriberContext = struct { 104 + state: SubscriberState = .waiting_auth, 105 + handle: WebSockets.WsHandle = null, 106 + }; 107 + 108 + const SubscriberHandler = WebSockets.Handler(SubscriberContext); 109 + 110 + const MAX_SUBSCRIBERS = 256; 111 + var subscriber_contexts: [MAX_SUBSCRIBERS]SubscriberContext = undefined; 112 + var subscriber_settings: [MAX_SUBSCRIBERS]SubscriberHandler.WebSocketSettings = undefined; 113 + var initialized: bool = false; 114 + 115 + fn initIfNeeded() void { 116 + if (initialized) return; 117 + for (0..MAX_SUBSCRIBERS) |i| { 118 + subscriber_contexts[i] = .{ .state = .waiting_auth, .handle = null }; 119 + subscriber_settings[i] = .{ 120 + .on_open = onSubscriberOpen, 121 + .on_close = onSubscriberClose, 122 + .on_message = onSubscriberMessage, 123 + .context = &subscriber_contexts[i], 124 + }; 125 + } 126 + initialized = true; 127 + } 128 + 129 + fn onSubscriberOpen(ctx: ?*SubscriberContext, handle: WebSockets.WsHandle) !void { 130 + log.debug("events-out", "subscriber connected", .{}); 131 + if (ctx) |c| { 132 + c.handle = handle; 133 + c.state = .waiting_auth; 134 + } 135 + } 136 + 137 + fn onSubscriberClose(ctx: ?*SubscriberContext, _: isize) !void { 138 + log.debug("events-out", "subscriber disconnected", .{}); 139 + if (ctx) |c| { 140 + if (c.handle) |h| messaging.removeSubscriber(h); 141 + c.handle = null; 142 + c.state = .waiting_auth; 143 + } 144 + } 145 + 146 + fn onSubscriberMessage(ctx: ?*SubscriberContext, handle: WebSockets.WsHandle, message: []const u8, _: bool) !void { 147 + const c = ctx orelse return; 148 + 149 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 150 + defer arena.deinit(); 151 + const alloc = arena.allocator(); 152 + 153 + const parsed = std.json.parseFromSlice(std.json.Value, alloc, message, .{}) catch { 154 + log.err("events-out", "invalid json from subscriber", .{}); 155 + SubscriberHandler.close(handle); 156 + return; 157 + }; 158 + 159 + const obj = parsed.value.object; 160 + const msg_type = if (obj.get("type")) |v| switch (v) { 161 + .string => |s| s, 162 + else => null, 163 + } else null; 164 + 165 + if (msg_type == null) { 166 + log.err("events-out", "message missing type field", .{}); 167 + SubscriberHandler.close(handle); 168 + return; 169 + } 170 + 171 + switch (c.state) { 172 + .waiting_auth => { 173 + if (!std.mem.eql(u8, msg_type.?, "auth")) { 174 + log.err("events-out", "expected auth message, got {s}", .{msg_type.?}); 175 + SubscriberHandler.close(handle); 176 + return; 177 + } 178 + SubscriberHandler.write(handle, "{\"type\":\"auth_success\"}", true) catch { 179 + log.err("events-out", "failed to send auth_success", .{}); 180 + return; 181 + }; 182 + c.state = .waiting_filter; 183 + log.debug("events-out", "auth successful, waiting for filter", .{}); 184 + }, 185 + .waiting_filter => { 186 + if (!std.mem.eql(u8, msg_type.?, "filter")) { 187 + log.err("events-out", "expected filter message, got {s}", .{msg_type.?}); 188 + SubscriberHandler.close(handle); 189 + return; 190 + } 191 + const filter = messaging.EventFilter.match_all; 192 + if (!messaging.addSubscriber(handle, filter)) { 193 + log.err("events-out", "failed to add subscriber (at capacity)", .{}); 194 + SubscriberHandler.close(handle); 195 + return; 196 + } 197 + c.state = .subscribed; 198 + log.info("events-out", "subscriber registered, total: {d}", .{messaging.getSubscriberCount()}); 199 + }, 200 + .subscribed => { 201 + log.debug("events-out", "ignoring message from subscribed client", .{}); 202 + }, 203 + } 204 + } 205 + 206 + // ============================================================================ 207 + // Public interface 208 + // ============================================================================ 209 + 210 + pub fn onUpgrade(r: zap.Request, target_protocol: []const u8) !void { 211 + const target = r.path orelse "/"; 212 + 213 + if (!std.mem.eql(u8, target_protocol, "websocket")) { 214 + r.setStatus(.bad_request); 215 + r.sendBody("{\"detail\":\"bad protocol\"}") catch {}; 216 + return; 217 + } 218 + 219 + // /events/in - event ingestion 220 + if (std.mem.eql(u8, target, "/events/in") or std.mem.eql(u8, target, "/api/events/in")) { 221 + log.debug("websocket", "upgrading {s} (events-in)", .{target}); 222 + EventsHandler.upgrade(r.h, &events_settings) catch |err| { 223 + log.err("websocket", "upgrade failed: {}", .{err}); 224 + r.setStatus(.internal_server_error); 225 + r.sendBody("{\"detail\":\"upgrade failed\"}") catch {}; 226 + }; 227 + return; 228 + } 229 + 230 + // /events/out - event subscription 231 + if (std.mem.eql(u8, target, "/events/out") or std.mem.eql(u8, target, "/api/events/out")) { 232 + log.debug("websocket", "upgrading {s} (events-out)", .{target}); 233 + initIfNeeded(); 234 + 235 + var slot: ?usize = null; 236 + for (0..MAX_SUBSCRIBERS) |i| { 237 + if (subscriber_contexts[i].handle == null) { 238 + slot = i; 239 + break; 240 + } 241 + } 242 + 243 + if (slot) |i| { 244 + subscriber_contexts[i].state = .waiting_auth; 245 + SubscriberHandler.upgrade(r.h, &subscriber_settings[i]) catch |err| { 246 + log.err("websocket", "events-out upgrade failed: {}", .{err}); 247 + r.setStatus(.internal_server_error); 248 + r.sendBody("{\"detail\":\"upgrade failed\"}") catch {}; 249 + }; 250 + } else { 251 + log.warn("websocket", "events-out at capacity", .{}); 252 + r.setStatus(.service_unavailable); 253 + r.sendBody("{\"detail\":\"too many subscribers\"}") catch {}; 254 + } 255 + return; 256 + } 257 + 258 + r.setStatus(.not_found); 259 + r.sendBody("{\"detail\":\"not found\"}") catch {}; 260 + }
+4 -317
src/main.zig
··· 4 4 5 5 const db = @import("db/sqlite.zig"); 6 6 const routes = @import("api/routes.zig"); 7 + const websocket = @import("api/websocket.zig"); 7 8 const log = @import("logging.zig"); 8 - const messaging = @import("messaging.zig"); 9 9 const services = @import("services/mod.zig"); 10 10 11 - // websocket handler for events 12 - const WebSockets = zap.WebSockets; 13 - 14 - const EventsContext = struct { 15 - // placeholder - could store connection metadata 16 - }; 17 - 18 - const EventsHandler = WebSockets.Handler(EventsContext); 19 - 20 - fn onEventsOpen(_: ?*EventsContext, _: WebSockets.WsHandle) !void { 21 - log.debug("websocket", "events connection opened", .{}); 22 - } 23 - 24 - fn onEventsClose(_: ?*EventsContext, _: isize) !void { 25 - log.debug("websocket", "events connection closed", .{}); 26 - } 27 - 28 - fn onEventsMessage(_: ?*EventsContext, _: WebSockets.WsHandle, message: []const u8, is_text: bool) !void { 29 - log.debug("events", "received message: {d} bytes, text={}", .{ message.len, is_text }); 30 - 31 - // use arena for temporary allocations during JSON stringification 32 - var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 33 - defer arena.deinit(); 34 - const alloc = arena.allocator(); 35 - 36 - // parse event json to extract required fields 37 - const parsed = std.json.parseFromSlice(std.json.Value, alloc, message, .{}) catch { 38 - log.err("events", "failed to parse event json", .{}); 39 - return; 40 - }; 41 - 42 - const obj = parsed.value.object; 43 - 44 - // helper to get string from json value 45 - const getString = struct { 46 - fn f(val: ?std.json.Value) ?[]const u8 { 47 - if (val) |v| { 48 - return switch (v) { 49 - .string => |s| s, 50 - else => null, 51 - }; 52 - } 53 - return null; 54 - } 55 - }.f; 56 - 57 - // helper to stringify json value using fmt 58 - const stringifyJson = struct { 59 - fn f(a: std.mem.Allocator, val: ?std.json.Value, default: []const u8) []const u8 { 60 - if (val) |v| { 61 - // use json.fmt which returns a formatter, {f} calls the format method 62 - const formatted = std.fmt.allocPrint(a, "{f}", .{std.json.fmt(v, .{})}) catch return default; 63 - return formatted; 64 - } 65 - return default; 66 - } 67 - }.f; 68 - 69 - // extract required fields 70 - const id = getString(obj.get("id")); 71 - const occurred = getString(obj.get("occurred")); 72 - const event_name = getString(obj.get("event")); 73 - 74 - if (id == null or occurred == null or event_name == null) { 75 - log.err("events", "event missing required fields", .{}); 76 - return; 77 - } 78 - 79 - // extract resource_id from resource object 80 - var resource_id: []const u8 = ""; 81 - if (obj.get("resource")) |res| { 82 - if (res == .object) { 83 - resource_id = getString(res.object.get("prefect.resource.id")) orelse ""; 84 - } 85 - } 86 - 87 - // stringify resource, payload, related objects for storage 88 - const resource_json = stringifyJson(alloc, obj.get("resource"), "{}"); 89 - const payload_json = stringifyJson(alloc, obj.get("payload"), "{}"); 90 - const related_json = stringifyJson(alloc, obj.get("related"), "[]"); 91 - const follows = getString(obj.get("follows")); 92 - 93 - // publish to messaging broker (async persistence) 94 - if (messaging.publishEvent(id.?, occurred.?, event_name.?, resource_id, resource_json, payload_json, related_json, follows)) { 95 - log.debug("events", "queued: {s}", .{event_name.?}); 96 - } else { 97 - log.warn("events", "dropped (backpressure): {s}", .{event_name.?}); 98 - } 99 - } 100 - 101 - var events_context: EventsContext = .{}; 102 - var events_settings: EventsHandler.WebSocketSettings = .{ 103 - .on_open = onEventsOpen, 104 - .on_close = onEventsClose, 105 - .on_message = onEventsMessage, 106 - .context = &events_context, 107 - }; 108 - 109 - // ============================================================================ 110 - // Events Out WebSocket Handler (subscription endpoint) 111 - // ============================================================================ 112 - 113 - const SubscriberState = enum { 114 - waiting_auth, 115 - waiting_filter, 116 - subscribed, 117 - }; 118 - 119 - const SubscriberContext = struct { 120 - state: SubscriberState = .waiting_auth, 121 - handle: WebSockets.WsHandle = null, 122 - }; 123 - 124 - const SubscriberHandler = WebSockets.Handler(SubscriberContext); 125 - 126 - // pool of subscriber contexts (reused for connections) 127 - const MAX_SUBSCRIBER_CONTEXTS = 256; 128 - var subscriber_contexts: [MAX_SUBSCRIBER_CONTEXTS]SubscriberContext = undefined; 129 - var contexts_initialized: bool = false; 130 - 131 - fn getAvailableContext() ?*SubscriberContext { 132 - if (!contexts_initialized) { 133 - for (&subscriber_contexts) |*ctx| { 134 - ctx.state = .waiting_auth; 135 - ctx.handle = null; 136 - } 137 - contexts_initialized = true; 138 - } 139 - 140 - for (&subscriber_contexts) |*ctx| { 141 - if (ctx.handle == null) { 142 - ctx.state = .waiting_auth; 143 - return ctx; 144 - } 145 - } 146 - return null; 147 - } 148 - 149 - fn onSubscriberOpen(ctx: ?*SubscriberContext, handle: WebSockets.WsHandle) !void { 150 - log.debug("events-out", "subscriber connected", .{}); 151 - if (ctx) |c| { 152 - c.handle = handle; 153 - c.state = .waiting_auth; 154 - } 155 - } 156 - 157 - fn onSubscriberClose(ctx: ?*SubscriberContext, _: isize) !void { 158 - log.debug("events-out", "subscriber disconnected", .{}); 159 - if (ctx) |c| { 160 - // remove from messaging subscribers 161 - if (c.handle) |h| { 162 - messaging.removeSubscriber(h); 163 - } 164 - c.handle = null; 165 - c.state = .waiting_auth; 166 - } 167 - } 168 - 169 - fn onSubscriberMessage(ctx: ?*SubscriberContext, handle: WebSockets.WsHandle, message: []const u8, _: bool) !void { 170 - const c = ctx orelse return; 171 - 172 - var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 173 - defer arena.deinit(); 174 - const alloc = arena.allocator(); 175 - 176 - const parsed = std.json.parseFromSlice(std.json.Value, alloc, message, .{}) catch { 177 - log.err("events-out", "invalid json from subscriber", .{}); 178 - SubscriberHandler.close(handle); 179 - return; 180 - }; 181 - 182 - const obj = parsed.value.object; 183 - const msg_type = if (obj.get("type")) |v| switch (v) { 184 - .string => |s| s, 185 - else => null, 186 - } else null; 187 - 188 - if (msg_type == null) { 189 - log.err("events-out", "message missing type field", .{}); 190 - SubscriberHandler.close(handle); 191 - return; 192 - } 193 - 194 - switch (c.state) { 195 - .waiting_auth => { 196 - if (!std.mem.eql(u8, msg_type.?, "auth")) { 197 - log.err("events-out", "expected auth message, got {s}", .{msg_type.?}); 198 - SubscriberHandler.close(handle); 199 - return; 200 - } 201 - // auth success (we don't actually validate tokens for now) 202 - SubscriberHandler.write(handle, "{\"type\":\"auth_success\"}", true) catch { 203 - log.err("events-out", "failed to send auth_success", .{}); 204 - return; 205 - }; 206 - c.state = .waiting_filter; 207 - log.debug("events-out", "auth successful, waiting for filter", .{}); 208 - }, 209 - .waiting_filter => { 210 - if (!std.mem.eql(u8, msg_type.?, "filter")) { 211 - log.err("events-out", "expected filter message, got {s}", .{msg_type.?}); 212 - SubscriberHandler.close(handle); 213 - return; 214 - } 215 - 216 - // parse filter (for now, just accept any filter and match all events) 217 - // TODO: parse filter.event.prefix etc. for actual filtering 218 - const filter = messaging.EventFilter.match_all; 219 - 220 - if (!messaging.addSubscriber(handle, filter)) { 221 - log.err("events-out", "failed to add subscriber (at capacity)", .{}); 222 - SubscriberHandler.close(handle); 223 - return; 224 - } 225 - 226 - c.state = .subscribed; 227 - log.info("events-out", "subscriber registered, total: {d}", .{messaging.getSubscriberCount()}); 228 - }, 229 - .subscribed => { 230 - // already subscribed, ignore further messages (or could handle filter updates) 231 - log.debug("events-out", "ignoring message from subscribed client", .{}); 232 - }, 233 - } 234 - } 235 - 236 - // we need individual settings per connection, so we create them dynamically 237 - fn createSubscriberSettings(ctx: *SubscriberContext) SubscriberHandler.WebSocketSettings { 238 - return .{ 239 - .on_open = onSubscriberOpen, 240 - .on_close = onSubscriberClose, 241 - .on_message = onSubscriberMessage, 242 - .context = ctx, 243 - }; 244 - } 245 - 246 11 fn onRequest(r: zap.Request) !void { 247 12 const method = r.method orelse "?"; 248 13 const path = r.path orelse "/"; ··· 257 22 log.debug("server", "{s} {s}", .{ method, path }); 258 23 } 259 24 260 - // static storage for subscriber settings (one per connection slot) 261 - var subscriber_settings: [MAX_SUBSCRIBER_CONTEXTS]SubscriberHandler.WebSocketSettings = undefined; 262 - var settings_initialized: bool = false; 263 - 264 - fn initSubscriberSettings() void { 265 - if (!settings_initialized) { 266 - for (0..MAX_SUBSCRIBER_CONTEXTS) |i| { 267 - subscriber_settings[i] = .{ 268 - .on_open = onSubscriberOpen, 269 - .on_close = onSubscriberClose, 270 - .on_message = onSubscriberMessage, 271 - .context = &subscriber_contexts[i], 272 - }; 273 - } 274 - settings_initialized = true; 275 - } 276 - } 277 - 278 - fn onUpgrade(r: zap.Request, target_protocol: []const u8) !void { 279 - const target = r.path orelse "/"; 280 - 281 - if (!std.mem.eql(u8, target_protocol, "websocket")) { 282 - r.setStatus(.bad_request); 283 - r.sendBody("{\"detail\":\"bad protocol\"}") catch {}; 284 - return; 285 - } 286 - 287 - // /events/in - event ingestion 288 - if (std.mem.eql(u8, target, "/events/in") or std.mem.eql(u8, target, "/api/events/in")) { 289 - log.debug("websocket", "upgrading {s} (events-in)", .{target}); 290 - EventsHandler.upgrade(r.h, &events_settings) catch |err| { 291 - log.err("websocket", "upgrade failed: {}", .{err}); 292 - r.setStatus(.internal_server_error); 293 - r.sendBody("{\"detail\":\"upgrade failed\"}") catch {}; 294 - }; 295 - return; 296 - } 297 - 298 - // /events/out - event subscription 299 - if (std.mem.eql(u8, target, "/events/out") or std.mem.eql(u8, target, "/api/events/out")) { 300 - log.debug("websocket", "upgrading {s} (events-out)", .{target}); 301 - 302 - // initialize settings if needed 303 - initSubscriberSettings(); 304 - 305 - // find available context slot 306 - var slot: ?usize = null; 307 - for (0..MAX_SUBSCRIBER_CONTEXTS) |i| { 308 - if (subscriber_contexts[i].handle == null) { 309 - slot = i; 310 - break; 311 - } 312 - } 313 - 314 - if (slot) |i| { 315 - subscriber_contexts[i].state = .waiting_auth; 316 - SubscriberHandler.upgrade(r.h, &subscriber_settings[i]) catch |err| { 317 - log.err("websocket", "events-out upgrade failed: {}", .{err}); 318 - r.setStatus(.internal_server_error); 319 - r.sendBody("{\"detail\":\"upgrade failed\"}") catch {}; 320 - }; 321 - } else { 322 - log.warn("websocket", "events-out at capacity", .{}); 323 - r.setStatus(.service_unavailable); 324 - r.sendBody("{\"detail\":\"too many subscribers\"}") catch {}; 325 - } 326 - return; 327 - } 328 - 329 - r.setStatus(.not_found); 330 - r.sendBody("{\"detail\":\"not found\"}") catch {}; 331 - } 332 - 333 25 pub fn main() !void { 334 - // init logging first 335 26 log.init(); 336 27 337 28 const port: u16 = blk: { ··· 339 30 break :blk std.fmt.parseInt(u16, port_str, 10) catch 4200; 340 31 }; 341 32 342 - // init database 343 33 log.info("database", "initializing...", .{}); 344 34 try db.init(); 345 35 defer db.close(); 346 36 log.info("database", "ready", .{}); 347 37 348 - // start background services 349 38 try services.startAll(); 350 39 defer services.stopAll(); 351 40 352 41 var listener = zap.HttpListener.init(.{ 353 42 .port = port, 354 43 .on_request = onRequest, 355 - .on_upgrade = onUpgrade, 356 - .log = true, // facil.io request logging 44 + .on_upgrade = websocket.onUpgrade, 45 + .log = true, 357 46 .max_clients = 1000, 358 - .max_body_size = 16 * 1024 * 1024, // 16MB 47 + .max_body_size = 16 * 1024 * 1024, 359 48 }); 360 49 361 50 try listener.listen(); 362 - 363 51 log.info("server", "listening on http://0.0.0.0:{d}", .{port}); 364 52 365 - // start event loop 366 53 zap.start(.{ 367 54 .threads = 4, 368 55 .workers = 1,