prefect server in zig

add event persistence, websocket support, service architecture

- websocket endpoint /api/events/in for receiving client events
- event persistence with batching, deduplication, retention trimming
- messaging layer with bounded channel (50k capacity) for backpressure
- service registry pattern for background workers
- uuid format fix: store/return standard 36-char dashed format
- structured logging with level control via env vars
- GET /api/flows/{id} endpoint
- justfile for dev commands
- test scripts for python client
- CLAUDE.md and ROADMAP.md for project documentation
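The bounded-channel backpressure described above can be sketched in Python for illustration; `BoundedPublisher` and its `capacity` argument are hypothetical stand-ins for `messaging.publishEvent()` and the 50k channel:

```python
import queue

class BoundedPublisher:
    """Mirrors messaging.publishEvent(): queue events, drop (and count) when full."""

    def __init__(self, capacity: int) -> None:
        self.channel: queue.Queue = queue.Queue(maxsize=capacity)
        self.dropped = 0

    def publish(self, event) -> bool:
        try:
            self.channel.put_nowait(event)  # non-blocking enqueue
            return True
        except queue.Full:
            self.dropped += 1  # drop + log when full (backpressure)
            return False
```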

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+1507 -203
+1
.gitignore
```diff
 *.db-wal
 *.db-shm
 .loq_cache
+uv.lock
```
+66
CLAUDE.md
````markdown
# prefect-zig

zig 0.15.2 implementation of prefect server using zap (facil.io).

## build

```
zig build
```

## binary

`./zig-out/bin/prefect-server`

## dev

```
just dev   # builds, clears db, runs server with DEBUG logging
just test  # runs test_flow.py against localhost:4200
```

## env vars

- `PREFECT_SERVER_LOGGING_LEVEL` / `PREFECT_LOGGING_LEVEL`: DEBUG, INFO, WARNING, ERROR, CRITICAL
- `PREFECT_SERVER_PORT`: default 4200
- `PREFECT_DATABASE_PATH`: default prefect.db

## implemented endpoints

- GET /api/health
- GET /api/csrf-token
- GET /api/admin/version
- POST /api/flows/
- GET /api/flows/{id}
- POST /api/flow_runs/
- GET /api/flow_runs/{id}
- POST /api/flow_runs/{id}/set_state
- POST /api/logs/
- WS /api/events/in

## database

sqlite via zqlite. tables: flow, flow_run, flow_run_state, events

## architecture

### messaging (src/messaging.zig)
- bounded channel (50k capacity) for backpressure
- publishEvent() queues events, drops + logs when full

### services (src/services/)
- event_persister: background worker that batches events and flushes to db
  - batch size: 100, flush interval: 1s
  - deduplication via INSERT OR IGNORE
  - retention trimming: 7 days, checked hourly
- services/mod.zig: lifecycle manager (startAll/stopAll)

### event flow
1. WebSocket receives event on /api/events/in
2. onEventsMessage parses JSON, extracts id/occurred/event/resource_id
3. messaging.publishEvent() queues to bounded channel
4. event_persister worker drains batches and writes to db

## roadmap

see ROADMAP.md for implementation status vs python prefect server
````
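A client-side sketch of the event payload sent over /api/events/in. The field names (`id`, `occurred`, `event`) come from the extraction step in onEventsMessage; the nested `resource` object with a `prefect.resource.id` key follows Prefect's event convention and is an assumption here, as is the `make_event` helper name:

```python
import json
import uuid
from datetime import datetime, timezone

def make_event(event_name: str, resource_id: str) -> str:
    # builds the JSON the websocket handler parses; it extracts
    # id, occurred, event, and the resource id for persistence
    payload = {
        "id": str(uuid.uuid4()),
        "occurred": datetime.now(timezone.utc).isoformat(),
        "event": event_name,
        "resource": {"prefect.resource.id": resource_id},
    }
    return json.dumps(payload)
```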
+139
ROADMAP.md
```markdown
# prefect-zig roadmap

comparison with prefect server (python). checkmarks indicate implemented.

## api endpoints

### core (required for basic flow execution)
- [x] GET /api/health
- [x] GET /api/csrf-token
- [x] GET /api/admin/version
- [x] POST /api/flows/
- [x] GET /api/flows/{id}
- [ ] POST /api/flows/filter
- [ ] PATCH /api/flows/{id}
- [ ] DELETE /api/flows/{id}
- [x] POST /api/flow_runs/
- [x] GET /api/flow_runs/{id}
- [x] POST /api/flow_runs/{id}/set_state
- [ ] POST /api/flow_runs/filter
- [ ] PATCH /api/flow_runs/{id}
- [ ] DELETE /api/flow_runs/{id}
- [x] POST /api/logs/
- [x] WS /api/events/in

### task runs
- [ ] POST /api/task_runs/
- [ ] GET /api/task_runs/{id}
- [ ] POST /api/task_runs/{id}/set_state
- [ ] POST /api/task_runs/filter

### state endpoints
- [ ] GET /api/flow_run_states/{id}
- [ ] POST /api/flow_run_states/filter
- [ ] GET /api/task_run_states/{id}
- [ ] POST /api/task_run_states/filter

### deployments
- [ ] POST /api/deployments/
- [ ] GET /api/deployments/{id}
- [ ] POST /api/deployments/filter
- [ ] PATCH /api/deployments/{id}
- [ ] DELETE /api/deployments/{id}
- [ ] POST /api/deployments/{id}/schedule
- [ ] POST /api/deployments/{id}/create_flow_run

### work pools & workers
- [ ] POST /api/work_pools/
- [ ] GET /api/work_pools/{name}
- [ ] POST /api/work_pools/filter
- [ ] POST /api/work_pools/{name}/queues/
- [ ] POST /api/work_pools/{name}/workers/heartbeat
- [ ] POST /api/work_pools/{name}/get_scheduled_flow_runs

### blocks
- [ ] POST /api/block_types/
- [ ] POST /api/block_types/filter
- [ ] POST /api/block_documents/
- [ ] POST /api/block_documents/filter
- [ ] POST /api/block_schemas/
- [ ] POST /api/block_capabilities/

### concurrency
- [ ] POST /api/concurrency_limits/
- [ ] POST /api/concurrency_limits/filter
- [ ] POST /api/v2/concurrency_limits/

### artifacts
- [ ] POST /api/artifacts/
- [ ] POST /api/artifacts/filter

### automations
- [ ] POST /api/automations/
- [ ] POST /api/automations/filter

### variables
- [ ] POST /api/variables/
- [ ] POST /api/variables/filter

### events
- [ ] POST /api/events/filter
- [ ] WS /api/events/out (subscribe)

### ui endpoints
- [ ] POST /api/ui/flows/
- [ ] POST /api/ui/flow_runs/
- [ ] POST /api/ui/task_runs/
- [ ] GET /api/ui/schemas/

## background services

- [x] event_persister (batched event writes, deduplication, retention trimming)
- [ ] scheduler (create flow runs from deployment schedules)
- [ ] late_runs (mark runs as late)
- [ ] foreman (infrastructure management)
- [ ] cancellation_cleanup (clean up cancelled runs)
- [ ] pause_expirations (expire paused runs)
- [ ] task_run_recorder (record task run events)
- [ ] telemetry

## database

- [x] flow table
- [x] flow_run table
- [x] flow_run_state table
- [x] events table
- [ ] task_run table
- [ ] task_run_state table
- [ ] deployment table
- [ ] work_pool table
- [ ] work_queue table
- [ ] worker table
- [ ] block_type table
- [ ] block_document table
- [ ] block_schema table
- [ ] concurrency_limit table
- [ ] artifact table
- [ ] automation table
- [ ] variable table
- [ ] log table (currently in-memory only)

## infrastructure

- [x] http server (zap/facil.io)
- [x] websocket support
- [x] sqlite database (zqlite)
- [x] structured logging
- [x] service lifecycle management
- [ ] migrations
- [ ] connection pooling
- [ ] graceful shutdown
- [ ] configuration file support

## notes

priority order for next work:
1. task_runs endpoints (needed for full task tracking)
2. deployments (needed for scheduled/triggered runs)
3. work_pools (needed for worker-based execution)
4. scheduler service (needed for deployment schedules)
```
+8
build.zig
```diff
         .optimize = optimize,
     });
 
+    const zap = b.dependency("zap", .{
+        .target = target,
+        .optimize = optimize,
+        .openssl = false,
+    });
+
     const exe = b.addExecutable(.{
         .name = "prefect-server",
         .root_module = b.createModule(.{
···
             .imports = &.{
                 .{ .name = "uuid", .module = uuid_dep.module("uuid") },
                 .{ .name = "zqlite", .module = zqlite.module("zqlite") },
+                .{ .name = "zap", .module = zap.module("zap") },
             },
         }),
     });
···
             .imports = &.{
                 .{ .name = "uuid", .module = uuid_dep.module("uuid") },
                 .{ .name = "zqlite", .module = zqlite.module("zqlite") },
+                .{ .name = "zap", .module = zap.module("zap") },
             },
         }),
     });
```
+4
build.zig.zon
```diff
         .url = "https://github.com/karlseguin/zqlite.zig/archive/refs/heads/master.tar.gz",
         .hash = "zqlite-0.0.0-RWLaY_y_mADh2LdbDrG_2HT2dBAcsAR8Jig_7-dOJd0B",
     },
+    .zap = .{
+        .url = "git+https://github.com/zigzap/zap?ref=v0.11.0#66c5dc42c781bbb8a9100afda3c7e69ee96eddf3",
+        .hash = "zap-0.10.6-GoeB8xCEJABLgoiZjWZMMT5TsoZ5OO2EZe6j24RTUYEH",
+    },
 },
 .paths = .{
     "build.zig",
```
+12
justfile
```make
default:
    @just --list

build:
    zig build

dev: build
    rm -f prefect.db
    PREFECT_SERVER_LOGGING_LEVEL=DEBUG ./zig-out/bin/prefect-server

test:
    PREFECT_API_URL=http://localhost:4200/api uv run python test_flow.py
```
+38
scripts/test-with-client
```python
#!/usr/bin/env -S uv run --script --quiet
# /// script
# requires-python = ">=3.12"
# dependencies = ["prefect>=3"]
# ///
"""
Test prefect-zig using the actual Prefect client.
"""

import os

os.environ.setdefault("PREFECT_API_URL", "http://localhost:4200/api")

from prefect import flow, task


@task
def add(a: int, b: int) -> int:
    print(f"adding {a} + {b}")
    return a + b


@task
def multiply(a: int, b: int) -> int:
    print(f"multiplying {a} * {b}")
    return a * b


@flow(log_prints=True)
def math_flow(x: int, y: int) -> int:
    sum_result = add(x, y)
    product = multiply(sum_result, 2)
    return product


if __name__ == "__main__":
    result = math_flow(3, 4)
    print(f"result: {result}")
```
+16 -10
src/api/admin.zig
```diff
 const std = @import("std");
-const http = std.http;
+const zap = @import("zap");
 const mem = std.mem;
 
-const http_server = @import("../server/http.zig");
+fn sendJson(r: zap.Request, body: []const u8) void {
+    r.setHeader("content-type", "application/json") catch {};
+    r.setHeader("access-control-allow-origin", "*") catch {};
+    r.setHeader("access-control-allow-methods", "GET, POST, PATCH, DELETE, OPTIONS") catch {};
+    r.setHeader("access-control-allow-headers", "content-type, x-prefect-api-version") catch {};
+    r.sendBody(body) catch {};
+}
 
-pub fn health(request: *http.Server.Request) !void {
-    try http_server.sendJson(request, "{\"status\":\"healthy\"}");
+pub fn health(r: zap.Request) !void {
+    sendJson(r, "{\"status\":\"healthy\"}");
 }
 
-pub fn csrfToken(request: *http.Server.Request) !void {
+pub fn csrfToken(r: zap.Request) !void {
     // prefect client expects a csrf token with client id and expiration
     // extract client from query string: /api/csrf-token?client=...
-    const target = request.head.target;
+    const target = r.path orelse "/";
     var client_id: []const u8 = "unknown";
 
     if (mem.indexOf(u8, target, "client=")) |start| {
···
     const response = std.fmt.bufPrint(&buf,
         \\{{"token":"zig-csrf-token","client":"{s}","expiration":"2099-01-01T00:00:00Z"}}
     , .{client_id}) catch {
-        try http_server.sendJson(request, "{\"token\":\"zig-csrf-token\",\"client\":\"unknown\",\"expiration\":\"2099-01-01T00:00:00Z\"}");
+        sendJson(r, "{\"token\":\"zig-csrf-token\",\"client\":\"unknown\",\"expiration\":\"2099-01-01T00:00:00Z\"}");
         return;
     };
-    try http_server.sendJson(request, response);
+    sendJson(r, response);
 }
 
-pub fn version(request: *http.Server.Request) !void {
+pub fn version(r: zap.Request) !void {
     // return version as JSON string (FastAPI behavior)
     // must return 3.x to match Python client major version
-    try http_server.sendJson(request, "\"3.0.0\"");
+    sendJson(r, "\"3.0.0\"");
 }
```
+33
src/api/common.zig
```diff
     return alloc.dupe(u8, &hex_buf) catch "00000000000000000000000000000000";
 }
 
+// buffer-based version - writes to provided buffer, returns slice
+// returns standard 36-char dashed UUID format (xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx)
+pub fn generateUuidBuf(buf: *[36]u8) []const u8 {
+    const id = uuid.v4.new();
+    const urn = uuid.urn.serialize(id);
+    @memcpy(buf, &urn);
+    return buf[0..36];
+}
+
+// buffer-based normalize - writes to provided buffer, returns slice
+pub fn normalizeUuidBuf(input_uuid: []const u8, buf: *[32]u8) []const u8 {
+    if (input_uuid.len == 32) {
+        @memcpy(buf, input_uuid[0..32]);
+        return buf[0..32];
+    }
+    if (input_uuid.len != 36) {
+        // invalid, return as-is (caller must handle)
+        const len = @min(input_uuid.len, 32);
+        @memcpy(buf[0..len], input_uuid[0..len]);
+        return buf[0..len];
+    }
+
+    var j: usize = 0;
+    for (input_uuid) |c| {
+        if (c != '-') {
+            if (j >= 32) break;
+            buf[j] = c;
+            j += 1;
+        }
+    }
+    return buf[0..j];
+}
+
 pub fn generateRunName(alloc: std.mem.Allocator) []const u8 {
     const adjectives = [_][]const u8{ "happy", "quick", "brave", "calm", "eager" };
     const nouns = [_][]const u8{ "panda", "tiger", "eagle", "dolphin", "falcon" };
```
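The behavior of `normalizeUuidBuf` above, mirrored in Python as a reference for its three cases (32-char passthrough, 36-char dash stripping, invalid-length truncation); `normalize_uuid` is an illustrative name, not part of the repo:

```python
def normalize_uuid(u: str) -> str:
    # case 1: already a 32-char hex string, pass through
    if len(u) == 32:
        return u
    # case 2: not a 36-char dashed UUID either; truncate to at most
    # 32 chars and hand it back (caller must handle invalid input)
    if len(u) != 36:
        return u[:32]
    # case 3: standard dashed UUID; strip the dashes
    return u.replace("-", "")
```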
+81 -70
src/api/flow_runs.zig
```diff
 const std = @import("std");
-const http = std.http;
+const zap = @import("zap");
 const mem = std.mem;
 const json = std.json;
 
 const db = @import("../db/sqlite.zig");
-const http_server = @import("../server/http.zig");
 const common = @import("common.zig");
 
+fn sendJson(r: zap.Request, body: []const u8) void {
+    r.setHeader("content-type", "application/json") catch {};
+    r.setHeader("access-control-allow-origin", "*") catch {};
+    r.setHeader("access-control-allow-methods", "GET, POST, PATCH, DELETE, OPTIONS") catch {};
+    r.setHeader("access-control-allow-headers", "content-type, x-prefect-api-version") catch {};
+    r.sendBody(body) catch {};
+}
+
+fn sendJsonStatus(r: zap.Request, body: []const u8, status: zap.http.StatusCode) void {
+    r.setStatus(status);
+    sendJson(r, body);
+}
+
 // POST /flow_runs/ - create flow run
 // GET /flow_runs/{id} - read flow run
 // POST /flow_runs/{id}/set_state - set state
 // POST /flow_runs/filter - list flow runs
-pub fn handle(request: *http.Server.Request, target: []const u8) !void {
-    const method = request.head.method;
+pub fn handle(r: zap.Request) !void {
+    const target = r.path orelse "/";
+    const method = r.method orelse "GET";
 
     // POST /flow_runs/ - create
-    if (method == .POST and (mem.eql(u8, target, "/flow_runs/") or mem.eql(u8, target, "/api/flow_runs/"))) {
-        try create(request);
+    if (mem.eql(u8, method, "POST") and (mem.eql(u8, target, "/flow_runs/") or mem.eql(u8, target, "/api/flow_runs/"))) {
+        try create(r);
         return;
     }
 
     // POST /flow_runs/filter - list
-    if (method == .POST and (mem.endsWith(u8, target, "/filter"))) {
-        try filter(request);
+    if (mem.eql(u8, method, "POST") and (mem.endsWith(u8, target, "/filter"))) {
+        try filter(r);
         return;
     }
 
     // check for /{id}/set_state
-    if (method == .POST and mem.endsWith(u8, target, "/set_state")) {
+    if (mem.eql(u8, method, "POST") and mem.endsWith(u8, target, "/set_state")) {
         const id = common.extractId(target, "/flow_runs/", "/set_state") orelse
             common.extractId(target, "/api/flow_runs/", "/set_state");
         if (id) |flow_run_id| {
-            try setState(request, flow_run_id);
+            try setState(r, flow_run_id);
             return;
         }
     }
 
     // GET /flow_runs/{id} - read single
-    if (method == .GET) {
+    if (mem.eql(u8, method, "GET")) {
         const id = common.extractIdSimple(target, "/flow_runs/") orelse
             common.extractIdSimple(target, "/api/flow_runs/");
         if (id) |flow_run_id| {
-            try read(request, flow_run_id);
+            try read(r, flow_run_id);
             return;
         }
     }
 
-    try http_server.sendJsonStatus(request, "{\"detail\":\"not found\"}", .not_found);
+    sendJsonStatus(r, "{\"detail\":\"not found\"}", .not_found);
 }
 
-fn create(request: *http.Server.Request) !void {
+fn create(r: zap.Request) !void {
     var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
     defer arena.deinit();
     const alloc = arena.allocator();
 
-    var body_buf: [16384]u8 = undefined;
-    request.head.expect = null;
-    const body_reader = request.readerExpectNone(&body_buf);
-    const body = body_reader.allocRemaining(alloc, .unlimited) catch {
-        try http_server.sendJsonStatus(request, "{\"detail\":\"failed to read body\"}", .bad_request);
+    const body = r.body orelse {
+        sendJsonStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request);
         return;
     };
 
     const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch {
-        try http_server.sendJsonStatus(request, "{\"detail\":\"invalid json\"}", .bad_request);
+        sendJsonStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
         return;
     };
···
     const raw_flow_id = if (obj.get("flow_id")) |v| switch (v) {
         .string => |s| s,
         else => {
-            try http_server.sendJsonStatus(request, "{\"detail\":\"flow_id must be string\"}", .bad_request);
+            sendJsonStatus(r, "{\"detail\":\"flow_id must be string\"}", .bad_request);
             return;
         },
     } else {
-        try http_server.sendJsonStatus(request, "{\"detail\":\"flow_id required\"}", .bad_request);
+        sendJsonStatus(r, "{\"detail\":\"flow_id required\"}", .bad_request);
         return;
     };
 
-    // normalize UUID by removing hyphens (client may send hyphenated format)
-    const flow_id = common.normalizeUuid(alloc, raw_flow_id);
+    const flow_id = raw_flow_id;
 
     const name = if (obj.get("name")) |v| switch (v) {
         .string => |s| s,
···
         if (s.object.get("name")) |n| state_name = n.string;
     }
 
-    const new_id = common.generateUuid(alloc);
+    var new_id_buf: [36]u8 = undefined;
+    const new_id = common.generateUuidBuf(&new_id_buf);
     var ts_buf: [32]u8 = undefined;
     const now = common.getTimestamp(&ts_buf);
 
     db.insertFlowRun(new_id, flow_id, name, state_type, state_name, now) catch {
-        try http_server.sendJsonStatus(request, "{\"detail\":\"insert failed\"}", .internal_server_error);
+        sendJsonStatus(r, "{\"detail\":\"insert failed\"}", .internal_server_error);
         return;
     };
 
-    // return the created flow run
-    const response = std.fmt.allocPrint(alloc,
+    // use stack buffer for response
+    var state_id_buf: [36]u8 = undefined;
+    const state_id = common.generateUuidBuf(&state_id_buf);
+    var resp_buf: [1024]u8 = undefined;
+    const response = std.fmt.bufPrint(&resp_buf,
         \\{{"id":"{s}","created":"{s}","updated":"{s}","name":"{s}","flow_id":"{s}","state_type":"{s}","state_name":"{s}","state":{{"type":"{s}","name":"{s}","timestamp":"{s}","id":"{s}"}},"parameters":{{}},"tags":[],"run_count":0}}
-    , .{ new_id, now, now, name, flow_id, state_type, state_name, state_type, state_name, now, common.generateUuid(alloc) }) catch {
-        try http_server.sendJsonStatus(request, "{\"detail\":\"format error\"}", .internal_server_error);
+    , .{ new_id, now, now, name, flow_id, state_type, state_name, state_type, state_name, now, state_id }) catch {
+        sendJsonStatus(r, "{\"detail\":\"format error\"}", .internal_server_error);
         return;
     };
 
-    try http_server.sendJsonStatus(request, response, .created);
+    sendJsonStatus(r, response, .created);
 }
 
-fn read(request: *http.Server.Request, raw_id: []const u8) !void {
-    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
-    defer arena.deinit();
-    const alloc = arena.allocator();
-
-    const id = common.normalizeUuid(alloc, raw_id);
-
+fn read(r: zap.Request, id: []const u8) !void {
     const run = db.getFlowRun(id) orelse {
-        try http_server.sendJsonStatus(request, "{\"detail\":\"flow run not found\"}", .not_found);
+        sendJsonStatus(r, "{\"detail\":\"flow run not found\"}", .not_found);
         return;
     };
 
-    const response = std.fmt.allocPrint(alloc,
+    var resp_buf: [1024]u8 = undefined;
+    const response = std.fmt.bufPrint(&resp_buf,
         \\{{"id":"{s}","created":"{s}","updated":"{s}","name":"{s}","flow_id":"{s}","state_type":"{s}","state_name":"{s}","state":{{"type":"{s}","name":"{s}","timestamp":"{s}"}},"parameters":{s},"tags":{s},"run_count":0}}
     , .{ run.id, run.created, run.updated, run.name, run.flow_id, run.state_type, run.state_name, run.state_type, run.state_name, run.state_timestamp, run.parameters, run.tags }) catch {
-        try http_server.sendJsonStatus(request, "{\"detail\":\"format error\"}", .internal_server_error);
+        sendJsonStatus(r, "{\"detail\":\"format error\"}", .internal_server_error);
         return;
     };
 
-    try http_server.sendJson(request, response);
+    sendJson(r, response);
 }
 
-fn setState(request: *http.Server.Request, raw_id: []const u8) !void {
+fn setState(r: zap.Request, id: []const u8) !void {
     var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
     defer arena.deinit();
     const alloc = arena.allocator();
 
-    const id = common.normalizeUuid(alloc, raw_id);
-
-    var body_buf: [8192]u8 = undefined;
-    request.head.expect = null;
-    const body_reader = request.readerExpectNone(&body_buf);
-    const body = body_reader.allocRemaining(alloc, .unlimited) catch {
-        try http_server.sendJsonStatus(request, "{\"detail\":\"failed to read body\"}", .bad_request);
+    const body = r.body orelse {
+        sendJsonStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request);
         return;
     };
 
     const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch {
-        try http_server.sendJsonStatus(request, "{\"detail\":\"invalid json\"}", .bad_request);
+        sendJsonStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
         return;
     };
 
     const state = parsed.value.object.get("state") orelse {
-        try http_server.sendJsonStatus(request, "{\"detail\":\"state required\"}", .bad_request);
+        sendJsonStatus(r, "{\"detail\":\"state required\"}", .bad_request);
         return;
     };
···
     const state_name = if (state.object.get("name")) |v| v.string else "Pending";
     var ts_buf: [32]u8 = undefined;
     const now = common.getTimestamp(&ts_buf);
-    const state_id = common.generateUuid(alloc);
+    var state_id_buf: [36]u8 = undefined;
+    const state_id = common.generateUuidBuf(&state_id_buf);
 
-    // atomic state transition (update + history insert in transaction)
+    // atomic state transition
     db.setFlowRunState(id, state_id, state_type, state_name, now) catch {
-        try http_server.sendJsonStatus(request, "{\"detail\":\"update failed\"}", .internal_server_error);
+        sendJsonStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error);
         return;
     };
 
-    // return orchestration result
-    const response = std.fmt.allocPrint(alloc,
+    var resp_buf: [512]u8 = undefined;
+    const response = std.fmt.bufPrint(&resp_buf,
         \\{{"status":"ACCEPT","details":{{}},"state":{{"type":"{s}","name":"{s}","timestamp":"{s}","id":"{s}"}}}}
     , .{ state_type, state_name, now, state_id }) catch {
-        try http_server.sendJsonStatus(request, "{\"detail\":\"format error\"}", .internal_server_error);
+        sendJsonStatus(r, "{\"detail\":\"format error\"}", .internal_server_error);
         return;
     };
 
-    try http_server.sendJson(request, response);
+    sendJson(r, response);
 }
 
-fn filter(request: *http.Server.Request) !void {
+fn filter(r: zap.Request) !void {
     var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
     defer arena.deinit();
     const alloc = arena.allocator();
 
     const runs = db.listFlowRuns(alloc, 50) catch {
-        try http_server.sendJsonStatus(request, "{\"detail\":\"database error\"}", .internal_server_error);
+        sendJsonStatus(r, "{\"detail\":\"database error\"}", .internal_server_error);
         return;
     };
 
-    var results: std.ArrayList(u8) = .{};
-    defer results.deinit(alloc);
+    // use fixed buffer for filter response
+    var resp_buf: [32768]u8 = undefined;
+    var fbs = std.io.fixedBufferStream(&resp_buf);
+    const writer = fbs.writer();
 
-    try results.appendSlice(alloc, "[");
+    writer.writeAll("[") catch {
+        sendJsonStatus(r, "{\"detail\":\"format error\"}", .internal_server_error);
+        return;
+    };
 
     for (runs, 0..) |run, i| {
-        if (i > 0) try results.appendSlice(alloc, ",");
+        if (i > 0) writer.writeAll(",") catch continue;
 
-        const item = std.fmt.allocPrint(alloc,
+        std.fmt.format(writer,
             \\{{"id":"{s}","created":"{s}","updated":"{s}","name":"{s}","flow_id":"{s}","state_type":"{s}","state_name":"{s}","state":{{"type":"{s}","name":"{s}","timestamp":"{s}"}},"parameters":{s},"tags":{s},"run_count":0}}
         , .{ run.id, run.created, run.updated, run.name, run.flow_id, run.state_type, run.state_name, run.state_type, run.state_name, run.state_timestamp, run.parameters, run.tags }) catch continue;
-
-        try results.appendSlice(alloc, item);
     }
 
-    try results.appendSlice(alloc, "]");
-    try http_server.sendJson(request, results.items);
+    writer.writeAll("]") catch {
+        sendJsonStatus(r, "{\"detail\":\"format error\"}", .internal_server_error);
+        return;
+    };
+
+    sendJson(r, fbs.getWritten());
 }
```
+63 -23
src/api/flows.zig
```diff
 const std = @import("std");
-const http = std.http;
+const zap = @import("zap");
 const mem = std.mem;
 const json = std.json;
 
 const db = @import("../db/sqlite.zig");
-const http_server = @import("../server/http.zig");
 const common = @import("common.zig");
 
+fn sendJson(r: zap.Request, body: []const u8) void {
+    r.setHeader("content-type", "application/json") catch {};
+    r.setHeader("access-control-allow-origin", "*") catch {};
+    r.setHeader("access-control-allow-methods", "GET, POST, PATCH, DELETE, OPTIONS") catch {};
+    r.setHeader("access-control-allow-headers", "content-type, x-prefect-api-version") catch {};
+    r.sendBody(body) catch {};
+}
+
+fn sendJsonStatus(r: zap.Request, body: []const u8, status: zap.http.StatusCode) void {
+    r.setStatus(status);
+    sendJson(r, body);
+}
+
 // POST /flows/ - create or get flow by name
-pub fn handle(request: *http.Server.Request, target: []const u8) !void {
-    if (request.head.method == .POST and (mem.eql(u8, target, "/flows/") or mem.eql(u8, target, "/api/flows/"))) {
-        try createFlow(request);
+// GET /flows/{id} - get flow by id
+pub fn handle(r: zap.Request) !void {
+    const target = r.path orelse "/";
+    const method = r.method orelse "GET";
+
+    if (mem.eql(u8, method, "POST") and (mem.eql(u8, target, "/flows/") or mem.eql(u8, target, "/api/flows/"))) {
+        try createFlow(r);
+    } else if (mem.eql(u8, method, "GET")) {
+        // extract id from path: /api/flows/{id} or /flows/{id}
+        const prefix = if (mem.startsWith(u8, target, "/api/flows/")) "/api/flows/" else "/flows/";
+        if (target.len > prefix.len) {
+            const flow_id = target[prefix.len..];
+            try getFlow(r, flow_id);
+        } else {
+            sendJsonStatus(r, "{\"detail\":\"flow id required\"}", .bad_request);
+        }
     } else {
-        try http_server.sendJsonStatus(request, "{\"detail\":\"not implemented\"}", .not_implemented);
+        sendJsonStatus(r, "{\"detail\":\"not implemented\"}", .not_implemented);
     }
 }
 
-fn createFlow(request: *http.Server.Request) !void {
+fn createFlow(r: zap.Request) !void {
     var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
     defer arena.deinit();
     const alloc = arena.allocator();
 
     // read request body
-    var body_buf: [8192]u8 = undefined;
-    request.head.expect = null;
-    const body_reader = request.readerExpectNone(&body_buf);
-    const body = body_reader.allocRemaining(alloc, .unlimited) catch {
-        try http_server.sendJsonStatus(request, "{\"detail\":\"failed to read body\"}", .bad_request);
+    const body = r.body orelse {
+        sendJsonStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request);
         return;
     };
 
     // parse json to get name
     const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch {
-        try http_server.sendJsonStatus(request, "{\"detail\":\"invalid json\"}", .bad_request);
+        sendJsonStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
         return;
     };
 
     const name = parsed.value.object.get("name") orelse {
-        try http_server.sendJsonStatus(request, "{\"detail\":\"name required\"}", .bad_request);
+        sendJsonStatus(r, "{\"detail\":\"name required\"}", .bad_request);
         return;
     };
     const name_str = name.string;
 
     // try to get existing flow first
     if (db.getFlowByName(name_str)) |flow| {
-        const response = std.fmt.allocPrint(alloc,
+        var resp_buf: [512]u8 = undefined;
+        const response = std.fmt.bufPrint(&resp_buf,
             \\{{"id":"{s}","created":"{s}","updated":"{s}","name":"{s}","tags":{s}}}
         , .{ flow.id, flow.created, flow.updated, flow.name, flow.tags }) catch {
-            try http_server.sendJsonStatus(request, "{\"detail\":\"format error\"}", .internal_server_error);
+            sendJsonStatus(r, "{\"detail\":\"format error\"}", .internal_server_error);
             return;
         };
-        try http_server.sendJson(request, response);
+        sendJson(r, response);
         return;
     }
 
     // create new flow
-    const new_id = common.generateUuid(alloc);
+    var new_id_buf: [36]u8 = undefined;
+    const new_id = common.generateUuidBuf(&new_id_buf);
 
     db.insertFlow(new_id, name_str) catch {
-        try http_server.sendJsonStatus(request, "{\"detail\":\"insert failed\"}", .internal_server_error);
+        sendJsonStatus(r, "{\"detail\":\"insert failed\"}", .internal_server_error);
         return;
     };
 
-    // return created flow
     var ts_buf: [32]u8 = undefined;
     const now = common.getTimestamp(&ts_buf);
-    const response = std.fmt.allocPrint(alloc,
+
+    var resp_buf: [512]u8 = undefined;
+    const response = std.fmt.bufPrint(&resp_buf,
         \\{{"id":"{s}","created":"{s}","updated":"{s}","name":"{s}","tags":[]}}
     , .{ new_id, now, now, name_str }) catch {
-        try http_server.sendJsonStatus(request, "{\"detail\":\"format error\"}", .internal_server_error);
+        sendJsonStatus(r, "{\"detail\":\"format error\"}", .internal_server_error);
         return;
     };
 
-    try http_server.sendJsonStatus(request, response, .created);
+    sendJsonStatus(r, response, .created);
+}
+
+fn getFlow(r: zap.Request, flow_id: []const u8) !void {
+    if (db.getFlowById(flow_id)) |flow| {
+        var resp_buf: [512]u8 = undefined;
+        const response = std.fmt.bufPrint(&resp_buf,
+            \\{{"id":"{s}","created":"{s}","updated":"{s}","name":"{s}","tags":{s}}}
+        , .{ flow.id, flow.created, flow.updated, flow.name, flow.tags }) catch {
+            sendJsonStatus(r, "{\"detail\":\"format error\"}", .internal_server_error);
+            return;
+        };
+        sendJson(r, response);
+    } else {
+        sendJsonStatus(r, "{\"detail\":\"flow not found\"}", .not_found);
+    }
 }
```
+14 -11
src/api/logs.zig
```diff
 const std = @import("std");
-const http = std.http;
+const zap = @import("zap");
+
+fn sendJson(r: zap.Request, body: []const u8) void {
+    r.setHeader("content-type", "application/json") catch {};
+    r.setHeader("access-control-allow-origin", "*") catch {};
+    r.sendBody(body) catch {};
+}
 
-const http_server = @import("../server/http.zig");
+pub fn handle(r: zap.Request) !void {
+    const method = r.method orelse "GET";
 
-pub fn logs(request: *http.Server.Request) !void {
     // accept logs but don't store them yet (TODO: implement log storage)
-    if (request.head.method == .POST) {
-        // consume the body
-        var body_buf: [65536]u8 = undefined;
-        request.head.expect = null;
-        const body_reader = request.readerExpectNone(&body_buf);
-        _ = body_reader.allocRemaining(std.heap.page_allocator, .unlimited) catch {};
-        try http_server.sendJsonStatus(request, "[]", .created);
+    if (std.mem.eql(u8, method, "POST")) {
+        // body is automatically handled by zap
+        r.setStatus(.created);
+        sendJson(r, "[]");
     } else {
-        try http_server.sendJson(request, "[]");
+        sendJson(r, "[]");
     }
 }
```
+43 -5
src/api/routes.zig
··· 1 1 const std = @import("std"); 2 - const http = std.http; 2 + const zap = @import("zap"); 3 3 4 4 // resource modules 5 5 pub const admin = @import("admin.zig"); ··· 8 8 pub const logs = @import("logs.zig"); 9 9 pub const common = @import("common.zig"); 10 10 11 - // re-export admin handlers for convenience 12 - pub const health = admin.health; 13 - pub const csrfToken = admin.csrfToken; 14 - pub const version = admin.version; 11 + pub fn handle(r: zap.Request) !void { 12 + const target = r.path orelse "/"; 13 + const method = r.method orelse "GET"; 14 + 15 + // cors preflight 16 + if (std.mem.eql(u8, method, "OPTIONS")) { 17 + try sendCors(r); 18 + return; 19 + } 20 + 21 + // route dispatch 22 + if (std.mem.eql(u8, target, "/api/health") or std.mem.eql(u8, target, "/health")) { 23 + try admin.health(r); 24 + } else if (std.mem.startsWith(u8, target, "/api/csrf-token") or std.mem.startsWith(u8, target, "/csrf-token")) { 25 + try admin.csrfToken(r); 26 + } else if (std.mem.eql(u8, target, "/api/admin/version") or std.mem.eql(u8, target, "/admin/version")) { 27 + try admin.version(r); 28 + } else if (std.mem.startsWith(u8, target, "/api/logs") or std.mem.startsWith(u8, target, "/logs")) { 29 + try logs.handle(r); 30 + } else if (std.mem.startsWith(u8, target, "/api/flows") or std.mem.startsWith(u8, target, "/flows")) { 31 + try flows.handle(r); 32 + } else if (std.mem.startsWith(u8, target, "/api/flow_runs") or std.mem.startsWith(u8, target, "/flow_runs")) { 33 + try flow_runs.handle(r); 34 + } else { 35 + try sendNotFound(r); 36 + } 37 + } 38 + 39 + fn sendCors(r: zap.Request) !void { 40 + r.setStatus(.no_content); 41 + r.setHeader("access-control-allow-origin", "*") catch {}; 42 + r.setHeader("access-control-allow-methods", "GET, POST, PATCH, DELETE, OPTIONS") catch {}; 43 + r.setHeader("access-control-allow-headers", "content-type, x-prefect-api-version") catch {}; 44 + try r.sendBody(""); 45 + } 46 + 47 + fn sendNotFound(r: zap.Request) !void { 48 + 
r.setStatus(.not_found); 49 + r.setHeader("content-type", "application/json") catch {}; 50 + r.setHeader("access-control-allow-origin", "*") catch {}; 51 + try r.sendBody("{\"detail\":\"not found\"}"); 52 + }
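The dispatch in `routes.handle` is a first-match walk over exact and prefix patterns. A small Python sketch (handler names are placeholders) shows the matching order, including why `/api/flow_runs` is not swallowed by the `/api/flows` prefix:

```python
# sketch of the prefix dispatch in routes.handle; handler names are illustrative
ROUTES = [
    ("/api/health", "health", "exact"),
    ("/api/csrf-token", "csrf", "prefix"),
    ("/api/admin/version", "version", "exact"),
    ("/api/logs", "logs", "prefix"),
    ("/api/flows", "flows", "prefix"),
    ("/api/flow_runs", "flow_runs", "prefix"),
]

def dispatch(path):
    for pattern, handler, kind in ROUTES:
        if kind == "exact" and path == pattern:
            return handler
        if kind == "prefix" and path.startswith(pattern):
            return handler
    return "not_found"

assert dispatch("/api/health") == "health"
assert dispatch("/api/flows/abc") == "flows"
# "/api/flow_runs/..." does not start with "/api/flows" ('_' vs 's'),
# so the flows prefix never shadows flow_runs
assert dispatch("/api/flow_runs/abc") == "flow_runs"
assert dispatch("/nope") == "not_found"
```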
+231 -39
src/db/sqlite.zig
··· 1 1 const std = @import("std"); 2 2 const zqlite = @import("zqlite"); 3 3 const Thread = std.Thread; 4 + const log = @import("../logging.zig"); 4 5 5 6 pub var conn: zqlite.Conn = undefined; 6 7 pub var mutex: Thread.Mutex = .{}; ··· 14 15 path_buf[path_env.len] = 0; 15 16 const path: [*:0]const u8 = path_buf[0..path_env.len :0]; 16 17 17 - std.debug.print("opening database: {s}\n", .{path_env}); 18 + log.debug("database", "opening {s}", .{path_env}); 18 19 19 20 const flags = zqlite.OpenFlags.Create | zqlite.OpenFlags.ReadWrite; 20 21 conn = zqlite.open(path, flags) catch |err| { 21 - std.debug.print("failed to open database: {}\n", .{err}); 22 + log.err("database", "failed to open: {}", .{err}); 22 23 return err; 23 24 }; 24 25 ··· 28 29 _ = conn.exec("PRAGMA foreign_keys=ON", .{}) catch {}; 29 30 30 31 try initSchema(); 31 - 32 - std.debug.print("database initialized\n", .{}); 33 32 } 34 33 35 34 pub fn close() void { ··· 49 48 \\ tags TEXT DEFAULT '[]' 50 49 \\) 51 50 ) catch |err| { 52 - std.debug.print("failed to create flow table: {}\n", .{err}); 51 + log.err("database", "failed to create flow table: {}", .{err}); 53 52 return err; 54 53 }; 55 54 ··· 69 68 \\ run_count INTEGER DEFAULT 0 70 69 \\) 71 70 ) catch |err| { 72 - std.debug.print("failed to create flow_run table: {}\n", .{err}); 71 + log.err("database", "failed to create flow_run table: {}", .{err}); 73 72 return err; 74 73 }; 75 74 ··· 83 82 \\ timestamp TEXT DEFAULT (datetime('now')) 84 83 \\) 85 84 ) catch |err| { 86 - std.debug.print("failed to create flow_run_state table: {}\n", .{err}); 85 + log.err("database", "failed to create flow_run_state table: {}", .{err}); 86 + return err; 87 + }; 88 + 89 + conn.execNoArgs( 90 + \\CREATE TABLE IF NOT EXISTS events ( 91 + \\ id TEXT PRIMARY KEY, 92 + \\ created TEXT DEFAULT (datetime('now')), 93 + \\ updated TEXT DEFAULT (datetime('now')), 94 + \\ occurred TEXT NOT NULL, 95 + \\ event TEXT NOT NULL, 96 + \\ resource_id TEXT NOT NULL, 97 + \\ 
resource TEXT NOT NULL DEFAULT '{}', 98 + \\ related_resource_ids TEXT DEFAULT '[]', 99 + \\ related TEXT DEFAULT '[]', 100 + \\ payload TEXT DEFAULT '{}', 101 + \\ received TEXT NOT NULL, 102 + \\ recorded TEXT NOT NULL, 103 + \\ follows TEXT 104 + \\) 105 + ) catch |err| { 106 + log.err("database", "failed to create events table: {}", .{err}); 87 107 return err; 88 108 }; 89 109 ··· 91 111 conn.execNoArgs("CREATE INDEX IF NOT EXISTS ix_flow_run__state_type ON flow_run(state_type)") catch {}; 92 112 conn.execNoArgs("CREATE INDEX IF NOT EXISTS ix_flow_run__flow_id ON flow_run(flow_id)") catch {}; 93 113 conn.execNoArgs("CREATE INDEX IF NOT EXISTS ix_flow_run_state__flow_run_id ON flow_run_state(flow_run_id)") catch {}; 114 + conn.execNoArgs("CREATE INDEX IF NOT EXISTS ix_events__occurred ON events(occurred)") catch {}; 115 + conn.execNoArgs("CREATE INDEX IF NOT EXISTS ix_events__event__id ON events(event, id)") catch {}; 116 + conn.execNoArgs("CREATE INDEX IF NOT EXISTS ix_events__event_resource_id_occurred ON events(event, resource_id, occurred)") catch {}; 117 + conn.execNoArgs("CREATE INDEX IF NOT EXISTS ix_events__occurred_id ON events(occurred, id)") catch {}; 94 118 } 95 119 96 120 // --- flow operations --- 97 121 122 + // static buffers for single-row returns (thread-safety via mutex) 123 + var flow_row_buf: FlowRowBuf = undefined; 124 + var flow_run_row_buf: FlowRunRowBuf = undefined; 125 + 126 + const FlowRowBuf = struct { 127 + id: [64]u8 = undefined, 128 + created: [64]u8 = undefined, 129 + updated: [64]u8 = undefined, 130 + name: [256]u8 = undefined, 131 + tags: [1024]u8 = undefined, 132 + id_len: usize = 0, 133 + created_len: usize = 0, 134 + updated_len: usize = 0, 135 + name_len: usize = 0, 136 + tags_len: usize = 0, 137 + }; 138 + 139 + const FlowRunRowBuf = struct { 140 + id: [64]u8 = undefined, 141 + created: [64]u8 = undefined, 142 + updated: [64]u8 = undefined, 143 + flow_id: [64]u8 = undefined, 144 + name: [256]u8 = undefined, 145 + 
state_type: [32]u8 = undefined, 146 + state_name: [64]u8 = undefined, 147 + state_timestamp: [64]u8 = undefined, 148 + parameters: [4096]u8 = undefined, 149 + tags: [1024]u8 = undefined, 150 + id_len: usize = 0, 151 + created_len: usize = 0, 152 + updated_len: usize = 0, 153 + flow_id_len: usize = 0, 154 + name_len: usize = 0, 155 + state_type_len: usize = 0, 156 + state_name_len: usize = 0, 157 + state_timestamp_len: usize = 0, 158 + parameters_len: usize = 0, 159 + tags_len: usize = 0, 160 + }; 161 + 162 + fn copyToBuf(dest: []u8, src: []const u8) usize { 163 + const len = @min(src.len, dest.len); 164 + @memcpy(dest[0..len], src[0..len]); 165 + return len; 166 + } 167 + 98 168 pub fn getFlowByName(name: []const u8) ?FlowRow { 99 169 mutex.lock(); 100 170 defer mutex.unlock(); 101 171 102 172 if (conn.row("SELECT id, created, updated, name, tags FROM flow WHERE name = ?", .{name}) catch null) |row| { 103 173 defer row.deinit(); 174 + // copy to static buffer while row is valid 175 + flow_row_buf.id_len = copyToBuf(&flow_row_buf.id, row.text(0)); 176 + flow_row_buf.created_len = copyToBuf(&flow_row_buf.created, row.text(1)); 177 + flow_row_buf.updated_len = copyToBuf(&flow_row_buf.updated, row.text(2)); 178 + flow_row_buf.name_len = copyToBuf(&flow_row_buf.name, row.text(3)); 179 + flow_row_buf.tags_len = copyToBuf(&flow_row_buf.tags, row.text(4)); 104 180 return FlowRow{ 105 - .id = row.text(0), 106 - .created = row.text(1), 107 - .updated = row.text(2), 108 - .name = row.text(3), 109 - .tags = row.text(4), 181 + .id = flow_row_buf.id[0..flow_row_buf.id_len], 182 + .created = flow_row_buf.created[0..flow_row_buf.created_len], 183 + .updated = flow_row_buf.updated[0..flow_row_buf.updated_len], 184 + .name = flow_row_buf.name[0..flow_row_buf.name_len], 185 + .tags = flow_row_buf.tags[0..flow_row_buf.tags_len], 186 + }; 187 + } 188 + return null; 189 + } 190 + 191 + pub fn getFlowById(id: []const u8) ?FlowRow { 192 + mutex.lock(); 193 + defer mutex.unlock(); 194 + 
195 + if (conn.row("SELECT id, created, updated, name, tags FROM flow WHERE id = ?", .{id}) catch null) |row| { 196 + defer row.deinit(); 197 + flow_row_buf.id_len = copyToBuf(&flow_row_buf.id, row.text(0)); 198 + flow_row_buf.created_len = copyToBuf(&flow_row_buf.created, row.text(1)); 199 + flow_row_buf.updated_len = copyToBuf(&flow_row_buf.updated, row.text(2)); 200 + flow_row_buf.name_len = copyToBuf(&flow_row_buf.name, row.text(3)); 201 + flow_row_buf.tags_len = copyToBuf(&flow_row_buf.tags, row.text(4)); 202 + return FlowRow{ 203 + .id = flow_row_buf.id[0..flow_row_buf.id_len], 204 + .created = flow_row_buf.created[0..flow_row_buf.created_len], 205 + .updated = flow_row_buf.updated[0..flow_row_buf.updated_len], 206 + .name = flow_row_buf.name[0..flow_row_buf.name_len], 207 + .tags = flow_row_buf.tags[0..flow_row_buf.tags_len], 110 208 }; 111 209 } 112 210 return null; ··· 117 215 defer mutex.unlock(); 118 216 119 217 conn.exec("INSERT INTO flow (id, name) VALUES (?, ?)", .{ id, name }) catch |err| { 120 - std.debug.print("insert flow error: {}\n", .{err}); 218 + log.err("database", "insert flow error: {}", .{err}); 121 219 return err; 122 220 }; 123 221 } ··· 139 237 \\INSERT INTO flow_run (id, flow_id, name, state_type, state_name, state_timestamp) 140 238 \\VALUES (?, ?, ?, ?, ?, ?) 141 239 , .{ id, flow_id, name, state_type, state_name, timestamp }) catch |err| { 142 - std.debug.print("insert flow_run error: {}\n", .{err}); 240 + log.err("database", "insert flow_run error: {}", .{err}); 143 241 return err; 144 242 }; 145 243 } ··· 153 251 \\FROM flow_run WHERE id = ? 
154 252 , .{id}) catch null) |row| { 155 253 defer row.deinit(); 254 + // copy to static buffer while row is valid 255 + flow_run_row_buf.id_len = copyToBuf(&flow_run_row_buf.id, row.text(0)); 256 + flow_run_row_buf.created_len = copyToBuf(&flow_run_row_buf.created, row.text(1)); 257 + flow_run_row_buf.updated_len = copyToBuf(&flow_run_row_buf.updated, row.text(2)); 258 + flow_run_row_buf.flow_id_len = copyToBuf(&flow_run_row_buf.flow_id, row.text(3)); 259 + flow_run_row_buf.name_len = copyToBuf(&flow_run_row_buf.name, row.text(4)); 260 + flow_run_row_buf.state_type_len = copyToBuf(&flow_run_row_buf.state_type, row.text(5)); 261 + flow_run_row_buf.state_name_len = copyToBuf(&flow_run_row_buf.state_name, row.text(6)); 262 + flow_run_row_buf.state_timestamp_len = copyToBuf(&flow_run_row_buf.state_timestamp, row.text(7)); 263 + flow_run_row_buf.parameters_len = copyToBuf(&flow_run_row_buf.parameters, row.text(8)); 264 + flow_run_row_buf.tags_len = copyToBuf(&flow_run_row_buf.tags, row.text(9)); 156 265 return FlowRunRow{ 157 - .id = row.text(0), 158 - .created = row.text(1), 159 - .updated = row.text(2), 160 - .flow_id = row.text(3), 161 - .name = row.text(4), 162 - .state_type = row.text(5), 163 - .state_name = row.text(6), 164 - .state_timestamp = row.text(7), 165 - .parameters = row.text(8), 166 - .tags = row.text(9), 266 + .id = flow_run_row_buf.id[0..flow_run_row_buf.id_len], 267 + .created = flow_run_row_buf.created[0..flow_run_row_buf.created_len], 268 + .updated = flow_run_row_buf.updated[0..flow_run_row_buf.updated_len], 269 + .flow_id = flow_run_row_buf.flow_id[0..flow_run_row_buf.flow_id_len], 270 + .name = flow_run_row_buf.name[0..flow_run_row_buf.name_len], 271 + .state_type = flow_run_row_buf.state_type[0..flow_run_row_buf.state_type_len], 272 + .state_name = flow_run_row_buf.state_name[0..flow_run_row_buf.state_name_len], 273 + .state_timestamp = flow_run_row_buf.state_timestamp[0..flow_run_row_buf.state_timestamp_len], 274 + .parameters = 
flow_run_row_buf.parameters[0..flow_run_row_buf.parameters_len], 275 + .tags = flow_run_row_buf.tags[0..flow_run_row_buf.tags_len], 167 276 }; 168 277 } 169 278 return null; ··· 182 291 183 292 // wrap in transaction for atomicity 184 293 conn.transaction() catch |err| { 185 - std.debug.print("begin transaction error: {}\n", .{err}); 294 + log.err("database", "begin transaction error: {}", .{err}); 186 295 return err; 187 296 }; 188 297 errdefer conn.rollback(); ··· 192 301 \\UPDATE flow_run SET state_id = ?, state_type = ?, state_name = ?, state_timestamp = ?, updated = ? 193 302 \\WHERE id = ? 194 303 , .{ state_id, state_type, state_name, timestamp, timestamp, run_id }) catch |err| { 195 - std.debug.print("update flow_run error: {}\n", .{err}); 304 + log.err("database", "update flow_run error: {}", .{err}); 196 305 return err; 197 306 }; 198 307 ··· 201 310 \\INSERT INTO flow_run_state (id, flow_run_id, type, name, timestamp) 202 311 \\VALUES (?, ?, ?, ?, ?) 203 312 , .{ state_id, run_id, state_type, state_name, timestamp }) catch |err| { 204 - std.debug.print("insert flow_run_state error: {}\n", .{err}); 313 + log.err("database", "insert flow_run_state error: {}", .{err}); 205 314 return err; 206 315 }; 207 316 208 317 conn.commit() catch |err| { 209 - std.debug.print("commit error: {}\n", .{err}); 318 + log.err("database", "commit error: {}", .{err}); 210 319 return err; 211 320 }; 212 321 } ··· 222 331 \\SELECT id, created, updated, flow_id, name, state_type, state_name, state_timestamp, parameters, tags 223 332 \\FROM flow_run ORDER BY created DESC LIMIT ? 
224 333 , .{@as(i64, @intCast(limit))}) catch |err| { 225 - std.debug.print("list flow_runs error: {}\n", .{err}); 334 + log.err("database", "list flow_runs error: {}", .{err}); 226 335 return err; 227 336 }; 228 337 defer rows.deinit(); 229 338 230 339 while (rows.next()) |row| { 340 + // dupe strings into allocator since row memory is reused 231 341 try results.append(allocator, .{ 232 - .id = row.text(0), 233 - .created = row.text(1), 234 - .updated = row.text(2), 235 - .flow_id = row.text(3), 236 - .name = row.text(4), 237 - .state_type = row.text(5), 238 - .state_name = row.text(6), 239 - .state_timestamp = row.text(7), 240 - .parameters = row.text(8), 241 - .tags = row.text(9), 342 + .id = try allocator.dupe(u8, row.text(0)), 343 + .created = try allocator.dupe(u8, row.text(1)), 344 + .updated = try allocator.dupe(u8, row.text(2)), 345 + .flow_id = try allocator.dupe(u8, row.text(3)), 346 + .name = try allocator.dupe(u8, row.text(4)), 347 + .state_type = try allocator.dupe(u8, row.text(5)), 348 + .state_name = try allocator.dupe(u8, row.text(6)), 349 + .state_timestamp = try allocator.dupe(u8, row.text(7)), 350 + .parameters = try allocator.dupe(u8, row.text(8)), 351 + .tags = try allocator.dupe(u8, row.text(9)), 242 352 }); 243 353 } 244 354 ··· 267 377 parameters: []const u8, 268 378 tags: []const u8, 269 379 }; 380 + 381 + // --- event operations --- 382 + 383 + pub fn insertEvent( 384 + id: []const u8, 385 + occurred: []const u8, 386 + event_type: []const u8, 387 + resource_id: []const u8, 388 + resource: []const u8, 389 + related_resource_ids: []const u8, 390 + related: []const u8, 391 + payload: []const u8, 392 + received: []const u8, 393 + recorded: []const u8, 394 + follows: ?[]const u8, 395 + ) !void { 396 + mutex.lock(); 397 + defer mutex.unlock(); 398 + 399 + conn.exec( 400 + \\INSERT INTO events (id, occurred, event, resource_id, resource, related_resource_ids, related, payload, received, recorded, follows) 401 + \\VALUES (?, ?, ?, ?, ?, ?, ?, ?, 
?, ?, ?) 402 + , .{ id, occurred, event_type, resource_id, resource, related_resource_ids, related, payload, received, recorded, follows orelse "" }) catch |err| { 403 + log.err("database", "insert event error: {}", .{err}); 404 + return err; 405 + }; 406 + } 407 + 408 + /// Insert event with deduplication (INSERT OR IGNORE) 409 + pub fn insertEventDeduped( 410 + id: []const u8, 411 + occurred: []const u8, 412 + event_type: []const u8, 413 + resource_id: []const u8, 414 + resource: []const u8, 415 + related_resource_ids: []const u8, 416 + related: []const u8, 417 + payload: []const u8, 418 + received: []const u8, 419 + recorded: []const u8, 420 + follows: ?[]const u8, 421 + ) !void { 422 + mutex.lock(); 423 + defer mutex.unlock(); 424 + 425 + conn.exec( 426 + \\INSERT OR IGNORE INTO events (id, occurred, event, resource_id, resource, related_resource_ids, related, payload, received, recorded, follows) 427 + \\VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 428 + , .{ id, occurred, event_type, resource_id, resource, related_resource_ids, related, payload, received, recorded, follows orelse "" }) catch |err| { 429 + log.err("database", "insert event deduped error: {}", .{err}); 430 + return err; 431 + }; 432 + } 433 + 434 + /// Delete events older than cutoff timestamp. Returns count deleted. 
435 + pub fn trimEvents(cutoff: []const u8) !usize { 436 + mutex.lock(); 437 + defer mutex.unlock(); 438 + 439 + conn.exec("DELETE FROM events WHERE occurred < ?", .{cutoff}) catch |err| { 440 + log.err("database", "trim events error: {}", .{err}); 441 + return err; 442 + }; 443 + 444 + // zqlite's exec doesn't surface sqlite3_changes(), so query it explicitly 445 + if (conn.row("SELECT changes()", .{}) catch null) |row| { 446 + defer row.deinit(); 447 + return @intCast(row.int(0)); 448 + } 449 + return 0; 450 + } 451 + 452 + pub fn countEvents() usize { 453 + mutex.lock(); 454 + defer mutex.unlock(); 455 + 456 + if (conn.row("SELECT COUNT(*) FROM events", .{}) catch null) |row| { 457 + defer row.deinit(); 458 + return @intCast(row.int(0)); 459 + } 460 + return 0; 461 + }
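The dedup and retention behavior added here (`INSERT OR IGNORE` on the event id, `DELETE ... WHERE occurred < cutoff`) can be demonstrated with Python's `sqlite3` on a schema reduced to the columns those two operations touch:

```python
import sqlite3

# behavior sketch of insertEventDeduped / trimEvents; schema trimmed to
# the id (dedup key) and occurred (retention key) columns
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id TEXT PRIMARY KEY, occurred TEXT NOT NULL)")

def insert_deduped(event_id, occurred):
    conn.execute("INSERT OR IGNORE INTO events VALUES (?, ?)", (event_id, occurred))

insert_deduped("e1", "2024-01-01T00:00:00Z")
insert_deduped("e1", "2024-01-01T00:00:00Z")   # duplicate id: silently ignored
insert_deduped("e2", "2024-06-01T00:00:00Z")

cur = conn.execute("DELETE FROM events WHERE occurred < ?", ("2024-03-01T00:00:00Z",))
assert cur.rowcount == 1                        # Python exposes changes() as rowcount
assert conn.execute("SELECT COUNT(*) FROM events").fetchone()[0] == 1
```

The string comparison in the retention cutoff works because ISO-8601 timestamps sort lexicographically, the same property the Zig `trimEvents` relies on.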
+102
src/logging.zig
··· 1 + const std = @import("std"); 2 + const posix = std.posix; 3 + 4 + pub const Level = enum(u8) { 5 + debug = 10, 6 + info = 20, 7 + warning = 30, 8 + err = 40, 9 + critical = 50, 10 + 11 + pub fn fromString(s: []const u8) Level { 12 + if (std.ascii.eqlIgnoreCase(s, "DEBUG")) return .debug; 13 + if (std.ascii.eqlIgnoreCase(s, "INFO")) return .info; 14 + if (std.ascii.eqlIgnoreCase(s, "WARNING") or std.ascii.eqlIgnoreCase(s, "WARN")) return .warning; 15 + if (std.ascii.eqlIgnoreCase(s, "ERROR")) return .err; 16 + if (std.ascii.eqlIgnoreCase(s, "CRITICAL")) return .critical; 17 + return .info; 18 + } 19 + 20 + pub fn name(self: Level) []const u8 { 21 + return switch (self) { 22 + .debug => "DEBUG", 23 + .info => "INFO", 24 + .warning => "WARNING", 25 + .err => "ERROR", 26 + .critical => "CRITICAL", 27 + }; 28 + } 29 + }; 30 + 31 + var server_level: Level = .info; 32 + 33 + pub fn init() void { 34 + if (posix.getenv("PREFECT_SERVER_LOGGING_LEVEL")) |level_str| { 35 + server_level = Level.fromString(level_str); 36 + } else if (posix.getenv("PREFECT_LOGGING_LEVEL")) |level_str| { 37 + server_level = Level.fromString(level_str); 38 + } 39 + } 40 + 41 + pub fn setLevel(level: Level) void { 42 + server_level = level; 43 + } 44 + 45 + pub fn getLevel() Level { 46 + return server_level; 47 + } 48 + 49 + fn getTimestamp(buf: *[12]u8) []const u8 { 50 + const ms_total = std.time.milliTimestamp(); 51 + const ms: u64 = @intCast(@mod(ms_total, 1000)); 52 + const epoch_secs: u64 = @intCast(@divFloor(ms_total, 1000)); 53 + const secs_today = epoch_secs % 86400; 54 + const hours = secs_today / 3600; 55 + const mins = (secs_today % 3600) / 60; 56 + const secs = secs_today % 60; 57 + 58 + return std.fmt.bufPrint(buf, "{d:0>2}:{d:0>2}:{d:0>2}.{d:0>3}", .{ 59 + hours, mins, secs, ms, 60 + }) catch "00:00:00.000"; 61 + } 62 + 63 + fn writeStderr(data: []const u8) void { 64 + const stderr = std.fs.File.stderr(); 65 + stderr.writeAll(data) catch {}; 66 + } 67 + 68 + pub fn log(level: 
Level, component: []const u8, comptime fmt: []const u8, args: anytype) void { 69 + if (@intFromEnum(level) < @intFromEnum(server_level)) return; 70 + 71 + var ts_buf: [12]u8 = undefined; 72 + const ts = getTimestamp(&ts_buf); 73 + 74 + var buf: [4096]u8 = undefined; 75 + const len = (std.fmt.bufPrint(&buf, "{s} | {s: <8} | {s} - " ++ fmt ++ "\n", .{ 76 + ts, 77 + level.name(), 78 + component, 79 + } ++ args) catch return).len; 80 + 81 + writeStderr(buf[0..len]); 82 + } 83 + 84 + pub fn debug(component: []const u8, comptime fmt: []const u8, args: anytype) void { 85 + log(.debug, component, fmt, args); 86 + } 87 + 88 + pub fn info(component: []const u8, comptime fmt: []const u8, args: anytype) void { 89 + log(.info, component, fmt, args); 90 + } 91 + 92 + pub fn warn(component: []const u8, comptime fmt: []const u8, args: anytype) void { 93 + log(.warning, component, fmt, args); 94 + } 95 + 96 + pub fn err(component: []const u8, comptime fmt: []const u8, args: anytype) void { 97 + log(.err, component, fmt, args); 98 + } 99 + 100 + pub fn critical(component: []const u8, comptime fmt: []const u8, args: anytype) void { 101 + log(.critical, component, fmt, args); 102 + }
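The level filter in `src/logging.zig` is a numeric threshold keyed off the env vars: a message passes when its level is at or above the server level, and unknown strings fall back to INFO. A compact Python sketch of the same mapping:

```python
# sketch of the env-driven level filter; mirrors Level.fromString and the
# threshold check in log() (message logged when level >= server level)
LEVELS = {"DEBUG": 10, "INFO": 20, "WARNING": 30, "WARN": 30,
          "ERROR": 40, "CRITICAL": 50}

def level_from_string(s):
    return LEVELS.get(s.upper(), 20)    # unrecognized -> INFO, as in fromString

def should_log(msg_level, server_level):
    return LEVELS[msg_level] >= LEVELS[server_level]

assert level_from_string("warn") == 30
assert level_from_string("garbage") == 20
assert should_log("ERROR", "INFO")
assert not should_log("DEBUG", "INFO")
```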
+134 -45
src/main.zig
··· 1 1 const std = @import("std"); 2 - const net = std.net; 2 + const zap = @import("zap"); 3 3 const posix = std.posix; 4 - const http = std.http; 5 - const Thread = std.Thread; 6 4 7 - const server = @import("server/http.zig"); 8 5 const db = @import("db/sqlite.zig"); 6 + const routes = @import("api/routes.zig"); 7 + const log = @import("logging.zig"); 8 + const messaging = @import("messaging.zig"); 9 + const services = @import("services/mod.zig"); 10 + 11 + // websocket handler for events 12 + const WebSockets = zap.WebSockets; 9 13 10 - const MAX_HTTP_WORKERS = 16; 11 - const SOCKET_TIMEOUT_SECS = 30; 14 + const EventsContext = struct { 15 + // placeholder - could store connection metadata 16 + }; 17 + 18 + const EventsHandler = WebSockets.Handler(EventsContext); 19 + 20 + fn onEventsOpen(_: ?*EventsContext, _: WebSockets.WsHandle) !void { 21 + log.debug("websocket", "events connection opened", .{}); 22 + } 23 + 24 + fn onEventsClose(_: ?*EventsContext, _: isize) !void { 25 + log.debug("websocket", "events connection closed", .{}); 26 + } 27 + 28 + fn onEventsMessage(_: ?*EventsContext, _: WebSockets.WsHandle, message: []const u8, is_text: bool) !void { 29 + log.debug("events", "received message: {d} bytes, text={}", .{ message.len, is_text }); 30 + 31 + // parse event json to extract required fields 32 + const parsed = std.json.parseFromSlice(std.json.Value, std.heap.page_allocator, message, .{}) catch { 33 + log.err("events", "failed to parse event json", .{}); 34 + return; 35 + }; 36 + defer parsed.deinit(); 37 + 38 + const obj = parsed.value.object; 39 + 40 + // helper to get string from json value 41 + const getString = struct { 42 + fn f(val: ?std.json.Value) ?[]const u8 { 43 + if (val) |v| { 44 + return switch (v) { 45 + .string => |s| s, 46 + else => null, 47 + }; 48 + } 49 + return null; 50 + } 51 + }.f; 52 + 53 + // extract required fields 54 + const id = getString(obj.get("id")); 55 + const occurred = getString(obj.get("occurred")); 56 + const 
event_name = getString(obj.get("event")); 57 + 58 + if (id == null or occurred == null or event_name == null) { 59 + log.err("events", "event missing required fields", .{}); 60 + return; 61 + } 62 + 63 + // extract resource_id from resource object 64 + var resource_id: []const u8 = ""; 65 + if (obj.get("resource")) |res| { 66 + if (res == .object) { 67 + resource_id = getString(res.object.get("prefect.resource.id")) orelse ""; 68 + } 69 + } 70 + 71 + // publish to messaging broker (async persistence) 72 + if (messaging.publishEvent(id.?, occurred.?, event_name.?, resource_id, message)) { 73 + log.debug("events", "queued: {s}", .{event_name.?}); 74 + } else { 75 + log.warn("events", "dropped (backpressure): {s}", .{event_name.?}); 76 + } 77 + } 78 + 79 + var events_context: EventsContext = .{}; 80 + var events_settings: EventsHandler.WebSocketSettings = .{ 81 + .on_open = onEventsOpen, 82 + .on_close = onEventsClose, 83 + .on_message = onEventsMessage, 84 + .context = &events_context, 85 + }; 86 + 87 + fn onRequest(r: zap.Request) !void { 88 + routes.handle(r) catch |err| { 89 + log.err("server", "request error: {}", .{err}); 90 + r.setStatus(.internal_server_error); 91 + r.sendBody("{\"detail\":\"internal error\"}") catch {}; 92 + }; 93 + } 12 94 13 - pub fn main() !void { 14 - var gpa = std.heap.GeneralPurposeAllocator(.{}){}; 15 - defer _ = gpa.deinit(); 16 - const allocator = gpa.allocator(); 95 + fn onUpgrade(r: zap.Request, target_protocol: []const u8) !void { 96 + const target = r.path orelse "/"; 97 + 98 + if (!std.mem.eql(u8, target_protocol, "websocket")) { 99 + r.setStatus(.bad_request); 100 + r.sendBody("{\"detail\":\"bad protocol\"}") catch {}; 101 + return; 102 + } 17 103 18 - // init database (uses global state with mutex protection) 19 - try db.init(); 20 - defer db.close(); 104 + // only upgrade /events/in and /api/events/in 105 + if (std.mem.eql(u8, target, "/events/in") or std.mem.eql(u8, target, "/api/events/in")) { 106 + log.debug("websocket", 
"upgrading {s}", .{target}); 107 + EventsHandler.upgrade(r.h, &events_settings) catch |err| { 108 + log.err("websocket", "upgrade failed: {}", .{err}); 109 + r.setStatus(.internal_server_error); 110 + r.sendBody("{\"detail\":\"upgrade failed\"}") catch {}; 111 + }; 112 + } else { 113 + r.setStatus(.not_found); 114 + r.sendBody("{\"detail\":\"not found\"}") catch {}; 115 + } 116 + } 21 117 22 - // init thread pool for http connections 23 - var pool: Thread.Pool = undefined; 24 - try pool.init(.{ 25 - .allocator = allocator, 26 - .n_jobs = MAX_HTTP_WORKERS, 27 - }); 28 - defer pool.deinit(); 118 + pub fn main() !void { 119 + // init logging first 120 + log.init(); 29 121 30 - // start http server 31 122 const port: u16 = blk: { 32 123 const port_str = posix.getenv("PREFECT_SERVER_PORT") orelse "4200"; 33 124 break :blk std.fmt.parseInt(u16, port_str, 10) catch 4200; 34 125 }; 35 126 36 - const address = try net.Address.parseIp("0.0.0.0", port); 37 - var listener = try address.listen(.{ .reuse_address = true }); 38 - defer listener.deinit(); 127 + // init database 128 + log.info("database", "initializing...", .{}); 129 + try db.init(); 130 + defer db.close(); 131 + log.info("database", "ready", .{}); 39 132 40 - std.debug.print("prefect-zig listening on http://0.0.0.0:{d} (max {} workers)\n", .{ port, MAX_HTTP_WORKERS }); 133 + // start background services 134 + try services.startAll(); 135 + defer services.stopAll(); 41 136 42 - while (true) { 43 - const conn = listener.accept() catch |err| { 44 - std.debug.print("accept error: {}\n", .{err}); 45 - continue; 46 - }; 137 + var listener = zap.HttpListener.init(.{ 138 + .port = port, 139 + .on_request = onRequest, 140 + .on_upgrade = onUpgrade, 141 + .log = true, // facil.io request logging 142 + .max_clients = 1000, 143 + .max_body_size = 16 * 1024 * 1024, // 16MB 144 + }); 47 145 48 - setSocketTimeout(conn.stream.handle, SOCKET_TIMEOUT_SECS) catch |err| { 49 - std.debug.print("failed to set socket timeout: {}\n", 
.{err}); 50 - }; 146 + try listener.listen(); 51 147 52 - pool.spawn(server.handleConnection, .{conn}) catch |err| { 53 - std.debug.print("pool spawn error: {}\n", .{err}); 54 - conn.stream.close(); 55 - }; 56 - } 57 - } 148 + log.info("server", "listening on http://0.0.0.0:{d}", .{port}); 58 149 59 - fn setSocketTimeout(fd: posix.fd_t, secs: u32) !void { 60 - const timeout = std.mem.toBytes(posix.timeval{ 61 - .sec = @intCast(secs), 62 - .usec = 0, 150 + // start event loop 151 + zap.start(.{ 152 + .threads = 4, 153 + .workers = 1, 63 154 }); 64 - try posix.setsockopt(fd, posix.SOL.SOCKET, posix.SO.RCVTIMEO, &timeout); 65 - try posix.setsockopt(fd, posix.SOL.SOCKET, posix.SO.SNDTIMEO, &timeout); 66 155 }
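The extraction in `onEventsMessage` (pull `id`, `occurred`, `event` as required strings, then `prefect.resource.id` out of the nested resource object, dropping the frame if anything required is missing) can be sketched in Python; the sample payload follows Prefect's event shape, and `extract` is an illustrative stand-in:

```python
import json

# sketch of onEventsMessage's field extraction from a client event frame
frame = json.dumps({
    "id": "0f9b2a4e-1c3d-4e5f-8a6b-7c8d9e0f1a2b",
    "occurred": "2024-01-01T00:00:00Z",
    "event": "prefect.flow-run.Completed",
    "resource": {"prefect.resource.id": "prefect.flow-run.abc"},
})

def extract(message):
    obj = json.loads(message)
    required = [obj.get(k) for k in ("id", "occurred", "event")]
    if any(not isinstance(v, str) for v in required):
        return None                     # missing required fields: drop the event
    resource_id = (obj.get("resource") or {}).get("prefect.resource.id", "")
    return (*required, resource_id)

fields = extract(frame)
assert fields is not None and fields[2] == "prefect.flow-run.Completed"
assert extract(json.dumps({"id": "x"})) is None
```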
+207
src/messaging.zig
··· 1 + const std = @import("std"); 2 + const Thread = std.Thread; 3 + const Mutex = Thread.Mutex; 4 + const log = @import("logging.zig"); 5 + 6 + /// Message wrapper for broker 7 + pub const Message = struct { 8 + id: []const u8, 9 + topic: []const u8, 10 + payload: []const u8, 11 + timestamp: i64, 12 + }; 13 + 14 + /// Bounded channel for message passing 15 + pub fn BoundedChannel(comptime T: type, comptime capacity: usize) type { 16 + return struct { 17 + const Self = @This(); 18 + 19 + buffer: [capacity]T = undefined, 20 + head: usize = 0, 21 + tail: usize = 0, 22 + count: usize = 0, 23 + mutex: Mutex = .{}, 24 + not_empty: Thread.Condition = .{}, 25 + not_full: Thread.Condition = .{}, 26 + closed: bool = false, 27 + 28 + pub fn init() Self { 29 + return .{}; 30 + } 31 + 32 + /// Try to send without blocking. Returns false if full or closed. 33 + pub fn trySend(self: *Self, item: T) bool { 34 + self.mutex.lock(); 35 + defer self.mutex.unlock(); 36 + 37 + if (self.closed or self.count >= capacity) { 38 + return false; 39 + } 40 + 41 + self.buffer[self.tail] = item; 42 + self.tail = (self.tail + 1) % capacity; 43 + self.count += 1; 44 + self.not_empty.signal(); 45 + return true; 46 + } 47 + 48 + /// Receive with timeout. Returns null if closed or timeout. 49 + pub fn receiveTimeout(self: *Self, timeout_ns: u64) ?T { 50 + self.mutex.lock(); 51 + defer self.mutex.unlock(); 52 + 53 + while (self.count == 0 and !self.closed) { 54 + self.not_empty.timedWait(&self.mutex, timeout_ns) catch { 55 + return null; 56 + }; 57 + } 58 + 59 + if (self.count == 0) return null; 60 + 61 + const item = self.buffer[self.head]; 62 + self.head = (self.head + 1) % capacity; 63 + self.count -= 1; 64 + self.not_full.signal(); 65 + return item; 66 + } 67 + 68 + /// Drain up to max items into provided slice. Returns count drained. 
69 +         pub fn drain(self: *Self, out: []T, max: usize) usize {
70 +             self.mutex.lock();
71 +             defer self.mutex.unlock();
72 +
73 +             const to_drain = @min(self.count, @min(max, out.len));
74 +             for (0..to_drain) |i| {
75 +                 out[i] = self.buffer[self.head];
76 +                 self.head = (self.head + 1) % capacity;
77 +             }
78 +             self.count -= to_drain;
79 +             if (to_drain > 0) self.not_full.broadcast();
80 +             return to_drain;
81 +         }
82 +
83 +         pub fn len(self: *Self) usize {
84 +             self.mutex.lock();
85 +             defer self.mutex.unlock();
86 +             return self.count;
87 +         }
88 +
89 +         pub fn close(self: *Self) void {
90 +             self.mutex.lock();
91 +             defer self.mutex.unlock();
92 +             self.closed = true;
93 +             self.not_empty.broadcast();
94 +             self.not_full.broadcast();
95 +         }
96 +
97 +         pub fn isClosed(self: *Self) bool {
98 +             self.mutex.lock();
99 +             defer self.mutex.unlock();
100 +             return self.closed;
101 +         }
102 +     };
103 + }
104 +
105 + /// Stored event data (owns memory)
106 + pub const StoredEvent = struct {
107 +     id: [64]u8,
108 +     id_len: usize,
109 +     occurred: [32]u8,
110 +     occurred_len: usize,
111 +     event_name: [128]u8,
112 +     event_name_len: usize,
113 +     resource_id: [256]u8,
114 +     resource_id_len: usize,
115 +     payload: [8192]u8,
116 +     payload_len: usize,
117 +
118 +     pub fn idSlice(self: *const StoredEvent) []const u8 {
119 +         return self.id[0..self.id_len];
120 +     }
121 +
122 +     pub fn occurredSlice(self: *const StoredEvent) []const u8 {
123 +         return self.occurred[0..self.occurred_len];
124 +     }
125 +
126 +     pub fn eventNameSlice(self: *const StoredEvent) []const u8 {
127 +         return self.event_name[0..self.event_name_len];
128 +     }
129 +
130 +     pub fn resourceIdSlice(self: *const StoredEvent) []const u8 {
131 +         return self.resource_id[0..self.resource_id_len];
132 +     }
133 +
134 +     pub fn payloadSlice(self: *const StoredEvent) []const u8 {
135 +         return self.payload[0..self.payload_len];
136 +     }
137 + };
138 +
139 + /// Event channel with 50k capacity (matches Prefect's backpressure limit)
140 + pub const EventChannel = BoundedChannel(StoredEvent, 50000);
141 +
142 + /// Global event channel
143 + var event_channel: EventChannel = EventChannel.init();
144 + var dropped_count: usize = 0;
145 + var dropped_mutex: Mutex = .{};
146 +
147 + /// Publish an event to the events topic. Returns false if dropped due to backpressure.
148 + pub fn publishEvent(
149 +     id: []const u8,
150 +     occurred: []const u8,
151 +     event_name: []const u8,
152 +     resource_id: []const u8,
153 +     payload: []const u8,
154 + ) bool {
155 +     var stored: StoredEvent = undefined;
156 +
157 +     // copy into fixed buffers
158 +     const id_len = @min(id.len, stored.id.len);
159 +     @memcpy(stored.id[0..id_len], id[0..id_len]);
160 +     stored.id_len = id_len;
161 +
162 +     const occ_len = @min(occurred.len, stored.occurred.len);
163 +     @memcpy(stored.occurred[0..occ_len], occurred[0..occ_len]);
164 +     stored.occurred_len = occ_len;
165 +
166 +     const name_len = @min(event_name.len, stored.event_name.len);
167 +     @memcpy(stored.event_name[0..name_len], event_name[0..name_len]);
168 +     stored.event_name_len = name_len;
169 +
170 +     const res_len = @min(resource_id.len, stored.resource_id.len);
171 +     @memcpy(stored.resource_id[0..res_len], resource_id[0..res_len]);
172 +     stored.resource_id_len = res_len;
173 +
174 +     const pay_len = @min(payload.len, stored.payload.len);
175 +     @memcpy(stored.payload[0..pay_len], payload[0..pay_len]);
176 +     stored.payload_len = pay_len;
177 +
178 +     if (!event_channel.trySend(stored)) {
179 +         dropped_mutex.lock();
180 +         dropped_count += 1;
181 +         const count = dropped_count;
182 +         dropped_mutex.unlock();
183 +
184 +         if (count % 100 == 1) {
185 +             log.warn("events", "backpressure: dropped {d} events", .{count});
186 +         }
187 +         return false;
188 +     }
189 +     return true;
190 + }
191 +
192 + /// Get the event channel for consumers
193 + pub fn getEventChannel() *EventChannel {
194 +     return &event_channel;
195 + }
196 +
197 + /// Get dropped event count
198 + pub fn getDroppedCount() usize {
199 +     dropped_mutex.lock();
200 +     defer dropped_mutex.unlock();
201 +     return dropped_count;
202 + }
203 +
204 + /// Close the event channel (for shutdown)
205 + pub fn close() void {
206 +     event_channel.close();
207 + }
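The drop-and-log policy in `publishEvent` (accept until the channel is full, then drop and log every 100th drop) can be sketched in Python; `BoundedChannel` here is a hypothetical stand-in for the Zig type, not its API:

```python
from collections import deque

class BoundedChannel:
    """Sketch of the trySend/backpressure policy from src/messaging.zig."""

    def __init__(self, capacity: int):
        self.capacity = capacity
        self.buffer = deque()
        self.dropped = 0

    def try_send(self, item) -> bool:
        if len(self.buffer) >= self.capacity:
            self.dropped += 1
            # matches `count % 100 == 1`: warn on the 1st, 101st, ... drop
            if self.dropped % 100 == 1:
                print(f"backpressure: dropped {self.dropped} events")
            return False
        self.buffer.append(item)
        return True

ch = BoundedChannel(capacity=2)
results = [ch.try_send(i) for i in range(5)]
# first two sends accepted, remaining three dropped
```

The periodic warning keeps the log readable under sustained overload while still recording the running drop count.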
+219
src/services/event_persister.zig
··· 1 + const std = @import("std");
2 + const Thread = std.Thread;
3 + const log = @import("../logging.zig");
4 + const messaging = @import("../messaging.zig");
5 + const db = @import("../db/sqlite.zig");
6 + const common = @import("../api/common.zig");
7 +
8 + const BATCH_SIZE: usize = 100;
9 + const FLUSH_INTERVAL_MS: u64 = 1000; // 1 second
10 + const TRIM_INTERVAL_MS: u64 = 60 * 60 * 1000; // 1 hour
11 + const RETENTION_DAYS: i64 = 7;
12 +
13 + var worker_thread: ?Thread = null;
14 + var trimmer_thread: ?Thread = null;
15 + var running: bool = false;
16 + var mutex: Thread.Mutex = .{};
17 +
18 + pub fn start() !void {
19 +     mutex.lock();
20 +     defer mutex.unlock();
21 +
22 +     if (running) return;
23 +     running = true;
24 +
25 +     log.info("event_persister", "starting", .{});
26 +
27 +     worker_thread = try Thread.spawn(.{}, workerLoop, .{});
28 +     trimmer_thread = try Thread.spawn(.{}, trimmerLoop, .{});
29 + }
30 +
31 + pub fn stop() void {
32 +     mutex.lock();
33 +     const was_running = running;
34 +     running = false;
35 +     mutex.unlock();
36 +
37 +     if (!was_running) return;
38 +
39 +     log.info("event_persister", "stopping", .{});
40 +
41 +     // close the channel to unblock the worker
42 +     messaging.close();
43 +
44 +     if (worker_thread) |t| {
45 +         t.join();
46 +         worker_thread = null;
47 +     }
48 +     if (trimmer_thread) |t| {
49 +         t.join();
50 +         trimmer_thread = null;
51 +     }
52 +
53 +     log.info("event_persister", "stopped", .{});
54 + }
55 +
56 + pub fn isRunning() bool {
57 +     mutex.lock();
58 +     defer mutex.unlock();
59 +     return running;
60 + }
61 +
62 + fn workerLoop() void {
63 +     var batch: [BATCH_SIZE]messaging.StoredEvent = undefined;
64 +     var batch_count: usize = 0;
65 +     var last_flush = std.time.milliTimestamp();
66 +
67 +     const channel = messaging.getEventChannel();
68 +
69 +     while (true) {
70 +         // check if we should stop
71 +         mutex.lock();
72 +         const should_run = running;
73 +         mutex.unlock();
74 +
75 +         if (!should_run and channel.len() == 0) break;
76 +
77 +         // try to receive with timeout
78 +         const timeout_ns: u64 = 100 * std.time.ns_per_ms; // 100ms
79 +         if (channel.receiveTimeout(timeout_ns)) |event| {
80 +             batch[batch_count] = event;
81 +             batch_count += 1;
82 +
83 +             // flush if batch is full
84 +             if (batch_count >= BATCH_SIZE) {
85 +                 flushBatch(batch[0..batch_count]);
86 +                 batch_count = 0;
87 +                 last_flush = std.time.milliTimestamp();
88 +             }
89 +         }
90 +
91 +         // flush on interval
92 +         const now = std.time.milliTimestamp();
93 +         if (batch_count > 0 and (now - last_flush) >= FLUSH_INTERVAL_MS) {
94 +             flushBatch(batch[0..batch_count]);
95 +             batch_count = 0;
96 +             last_flush = now;
97 +         }
98 +     }
99 +
100 +     // final flush on shutdown
101 +     if (batch_count > 0) {
102 +         flushBatch(batch[0..batch_count]);
103 +     }
104 + }
105 +
106 + fn flushBatch(batch: []const messaging.StoredEvent) void {
107 +     if (batch.len == 0) return;
108 +
109 +     log.debug("event_persister", "flushing {d} events", .{batch.len});
110 +
111 +     var success_count: usize = 0;
112 +     var ts_buf: [32]u8 = undefined;
113 +     const now_ts = common.getTimestamp(&ts_buf);
114 +
115 +     for (batch) |event| {
116 +         // use INSERT OR IGNORE for deduplication
117 +         db.insertEventDeduped(
118 +             event.idSlice(),
119 +             event.occurredSlice(),
120 +             event.eventNameSlice(),
121 +             event.resourceIdSlice(),
122 +             event.payloadSlice(),
123 +             "[]", // related_resource_ids
124 +             "[]", // related
125 +             "{}", // payload (separate from raw)
126 +             now_ts,
127 +             now_ts,
128 +             null, // follows
129 +         ) catch |err| {
130 +             log.err("event_persister", "insert failed: {}", .{err});
131 +             continue;
132 +         };
133 +         success_count += 1;
134 +     }
135 +
136 +     if (success_count > 0) {
137 +         log.debug("event_persister", "persisted {d}/{d} events", .{ success_count, batch.len });
138 +     }
139 + }
140 +
141 + fn trimmerLoop() void {
142 +     while (true) {
143 +         // check if we should stop
144 +         mutex.lock();
145 +         const should_run = running;
146 +         mutex.unlock();
147 +
148 +         if (!should_run) break;
149 +
150 +         // sleep for trim interval (check running flag periodically)
151 +         var elapsed: u64 = 0;
152 +         while (elapsed < TRIM_INTERVAL_MS) {
153 +             Thread.sleep(1000 * std.time.ns_per_ms); // 1 second
154 +             elapsed += 1000;
155 +
156 +             mutex.lock();
157 +             const still_running = running;
158 +             mutex.unlock();
159 +             if (!still_running) return;
160 +         }
161 +
162 +         // trim old events
163 +         trimOldEvents();
164 +     }
165 + }
166 +
167 + fn trimOldEvents() void {
168 +     const cutoff_ms = std.time.milliTimestamp() - (RETENTION_DAYS * 24 * 60 * 60 * 1000);
169 +     const cutoff_secs: u64 = @intCast(@divFloor(cutoff_ms, 1000));
170 +
171 +     // format as ISO timestamp using simple math (same approach as logging.zig)
172 +     const secs_per_day: u64 = 86400;
173 +     const secs_per_hour: u64 = 3600;
174 +     const secs_per_min: u64 = 60;
175 +
176 +     // days since epoch
177 +     const days = cutoff_secs / secs_per_day;
178 +     const secs_today = cutoff_secs % secs_per_day;
179 +     const hours = secs_today / secs_per_hour;
180 +     const mins = (secs_today % secs_per_hour) / secs_per_min;
181 +     const secs = secs_today % secs_per_min;
182 +
183 +     // walk year/month/day forward from the epoch (leap-year aware)
184 +     // start from 1970-01-01
185 +     var year: u32 = 1970;
186 +     var remaining_days = days;
187 +
188 +     while (true) {
189 +         const days_in_year: u64 = if (year % 4 == 0 and (year % 100 != 0 or year % 400 == 0)) 366 else 365;
190 +         if (remaining_days < days_in_year) break;
191 +         remaining_days -= days_in_year;
192 +         year += 1;
193 +     }
194 +
195 +     const is_leap = year % 4 == 0 and (year % 100 != 0 or year % 400 == 0);
196 +     const days_in_months = [_]u8{ 31, if (is_leap) 29 else 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31 };
197 +
198 +     var month: u8 = 1;
199 +     for (days_in_months) |dim| {
200 +         if (remaining_days < dim) break;
201 +         remaining_days -= dim;
202 +         month += 1;
203 +     }
204 +     const day: u8 = @intCast(remaining_days + 1);
205 +
206 +     var buf: [32]u8 = undefined;
207 +     const cutoff_str = std.fmt.bufPrint(&buf, "{d:0>4}-{d:0>2}-{d:0>2}T{d:0>2}:{d:0>2}:{d:0>2}Z", .{
208 +         year, month, day, hours, mins, secs,
209 +     }) catch return;
210 +
211 +     const deleted = db.trimEvents(cutoff_str) catch |err| {
212 +         log.err("event_persister", "trim failed: {}", .{err});
213 +         return;
214 +     };
215 +
216 +     if (deleted > 0) {
217 +         log.info("event_persister", "trimmed {d} events older than {s}", .{ deleted, cutoff_str });
218 +     }
219 + }
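The epoch-to-calendar walk in `trimOldEvents` is easy to cross-check. A Python sketch of the same year-then-month subtraction (`days_to_ymd` is a hypothetical helper, not part of the codebase) agrees with the standard library:

```python
from datetime import datetime, timezone

def days_to_ymd(days: int):
    """Same walk as trimOldEvents: strip whole years, then whole months."""
    year, remaining = 1970, days
    while True:
        leap = year % 4 == 0 and (year % 100 != 0 or year % 400 == 0)
        days_in_year = 366 if leap else 365
        if remaining < days_in_year:
            break
        remaining -= days_in_year
        year += 1
    leap = year % 4 == 0 and (year % 100 != 0 or year % 400 == 0)
    months = [31, 29 if leap else 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
    month = 1
    for dim in months:
        if remaining < dim:
            break
        remaining -= dim
        month += 1
    return year, month, remaining + 1  # day-of-month is 1-based

# cross-check against datetime on a leap day
d = datetime(2024, 2, 29, tzinfo=timezone.utc)
epoch_days = int(d.timestamp()) // 86400
assert days_to_ymd(epoch_days) == (2024, 2, 29)
```

Since Unix time ignores leap seconds, this integer walk yields exact UTC calendar dates, which is more than enough precision for a 7-day retention cutoff.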
+31
src/services/mod.zig
··· 1 + const std = @import("std");
2 + const log = @import("../logging.zig");
3 +
4 + pub const event_persister = @import("event_persister.zig");
5 +
6 + pub const Service = struct {
7 +     name: []const u8,
8 +     start: *const fn () anyerror!void,
9 +     stop: *const fn () void,
10 + };
11 +
12 + const all = [_]Service{
13 +     .{ .name = "event_persister", .start = event_persister.start, .stop = event_persister.stop },
14 + };
15 +
16 + pub fn startAll() !void {
17 +     for (all) |svc| {
18 +         log.info("services", "starting {s}", .{svc.name});
19 +         try svc.start();
20 +     }
21 + }
22 +
23 + pub fn stopAll() void {
24 +     // stop in reverse order
25 +     var i = all.len;
26 +     while (i > 0) {
27 +         i -= 1;
28 +         log.info("services", "stopping {s}", .{all[i].name});
29 +         all[i].stop();
30 +     }
31 + }
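The registry's lifecycle contract (start in declaration order, stop in reverse, so later services can depend on earlier ones) can be sketched in Python; the "db" service below is hypothetical, added only to make the ordering visible:

```python
started, stopped = [], []

# (name, start, stop) tuples, mirroring the Service struct in services/mod.zig
services = [
    ("db", lambda: started.append("db"), lambda: stopped.append("db")),
    ("event_persister",
     lambda: started.append("event_persister"),
     lambda: stopped.append("event_persister")),
]

def start_all():
    for _name, start, _stop in services:
        start()

def stop_all():
    # reverse of start order: dependents shut down before their dependencies
    for _name, _start, stop in reversed(services):
        stop()

start_all()
stop_all()
```

With only one registered service the ordering is trivial today, but the pattern keeps shutdown correct as more workers are added.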
+19
test_flow.py
··· 1 + from prefect import flow, task
2 +
3 + @task
4 + def add_task(a: int, b: int) -> int:
5 +     return a + b
6 +
7 + @task
8 + def multiply_task(a: int, b: int) -> int:
9 +     return a * b
10 +
11 + @flow
12 + def math_flow(a: int, b: int) -> int:
13 +     sum_result = add_task(a, b)
14 +     product = multiply_task(sum_result, 2)
15 +     return product
16 +
17 + if __name__ == "__main__":
18 +     result = math_flow(3, 4)
19 +     print(f"math_flow(3, 4) = {result}")
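For `test_flow.py` to exercise this server rather than a default Prefect API, the client must be pointed at it via `PREFECT_API_URL`; assuming the default port 4200 from CLAUDE.md, an invocation might look like:

```shell
# hypothetical manual invocation; `just test` wraps the same thing
export PREFECT_API_URL="http://localhost:4200/api"
python test_flow.py
```

This is the standard Prefect client setting, so no code changes are needed in the flow itself.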
+46
test_websocket.py
··· 1 + """Direct WebSocket test to verify ping/pong and message handling"""
2 + import asyncio
3 + import json
4 + import uuid
5 + from datetime import datetime, timezone
6 +
7 + async def test_websocket():
8 +     import websockets
9 +
10 +     uri = "ws://localhost:4200/api/events/in"
11 +     print(f"Connecting to {uri}...")
12 +
13 +     try:
14 +         async with websockets.connect(uri) as ws:
15 +             print("Connected!")
16 +
17 +             # Test ping/pong (protocol level)
18 +             print("Sending ping...")
19 +             pong = await ws.ping()
20 +             await pong
21 +             print("Pong received!")
22 +
23 +             # Send a test event
24 +             event = {
25 +                 "id": str(uuid.uuid4()),
26 +                 "occurred": datetime.now(timezone.utc).isoformat(),
27 +                 "event": "prefect.test.event",
28 +                 "resource": {
29 +                     "prefect.resource.id": "test-resource-123"
30 +                 },
31 +                 "payload": {"test": True}
32 +             }
33 +
34 +             print(f"Sending event: {event['event']}")
35 +             await ws.send(json.dumps(event))
36 +             print("Event sent!")
37 +
38 +             # Keep connection open briefly
39 +             await asyncio.sleep(0.5)
40 +             print("Done!")
41 +
42 +     except Exception as e:
43 +         print(f"Error: {type(e).__name__}: {e}")
44 +
45 + if __name__ == "__main__":
46 +     asyncio.run(test_websocket())