prefect server in zig

split api routes into separate resource files

- add uuid-zig dependency for proper uuid v4 generation
- split routes.zig into flows.zig, flow_runs.zig, admin.zig, logs.zig
- extract shared utilities to common.zig (normalizeUuid, getTimestamp, etc)
- add sqlite pragmas: WAL mode, busy_timeout, foreign_keys
- add test-flow-sequence script for integration testing
- add loq.toml config for line count limits

all flow run tests pass (success and failure cases)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+862 -436
+3 -1
.gitignore
··· 1 1 .zig-cache/ 2 2 zig-out/ 3 3 *.db 4 - .loq_cache/ 4 + *.db-wal 5 + *.db-shm 6 + .loq_cache
+9
build.zig
··· 4 4 const target = b.standardTargetOptions(.{}); 5 5 const optimize = b.standardOptimizeOption(.{}); 6 6 7 + // get uuid dependency 8 + const uuid_dep = b.dependency("uuid", .{ 9 + .target = target, 10 + .optimize = optimize, 11 + }); 12 + 7 13 const exe = b.addExecutable(.{ 8 14 .name = "prefect-server", 9 15 .root_module = b.createModule(.{ 10 16 .root_source_file = b.path("src/main.zig"), 11 17 .target = target, 12 18 .optimize = optimize, 19 + .imports = &.{ 20 + .{ .name = "uuid", .module = uuid_dep.module("uuid") }, 21 + }, 13 22 }), 14 23 }); 15 24
+6 -1
build.zig.zon
··· 3 3 .version = "0.0.1", 4 4 .fingerprint = 0x913455b313f533b0, 5 5 .minimum_zig_version = "0.15.0", 6 - .dependencies = .{}, 6 + .dependencies = .{ 7 + .uuid = .{ 8 + .url = "git+https://codeberg.org/r4gus/uuid-zig.git#97220fa411b69513df603f935fe2f8a294f1ca64", 9 + .hash = "uuid-0.4.0-oOieIVh6AAA32OyOsZmY3yAkszkVtDyvIWaCufEfACsc", 10 + }, 11 + }, 7 12 .paths = .{ 8 13 "build.zig", 9 14 "build.zig.zon",
+5
loq.toml
··· 1 + default_max_lines = 500 2 + 3 + [[rules]] 4 + path = "src/api/routes.zig" 5 + max_lines = 682
+199
scripts/test-flow-sequence
··· 1 + #!/usr/bin/env -S uv run --script --quiet 2 + # /// script 3 + # requires-python = ">=3.12" 4 + # dependencies = ["httpx", "rich"] 5 + # /// 6 + """ 7 + Test the exact API call sequence for a flow run against prefect-zig. 8 + 9 + This mimics what the Prefect Python client does, step by step: 10 + 1. POST /flows/ - create/get flow 11 + 2. POST /flow_runs/ - create flow run with PENDING state 12 + 3. GET /flow_runs/{id} - read flow run 13 + 4. POST /flow_runs/{id}/set_state - transition to RUNNING 14 + 5. (execute user code) 15 + 6. POST /flow_runs/{id}/set_state - transition to COMPLETED or FAILED 16 + """ 17 + 18 + import os 19 + import sys 20 + import uuid 21 + from datetime import datetime, timezone 22 + 23 + import httpx 24 + from rich.console import Console 25 + from rich.panel import Panel 26 + 27 + console = Console() 28 + BASE_URL = os.environ.get("PREFECT_API_URL", "http://localhost:4200/api") 29 + 30 + 31 + def iso_now() -> str: 32 + return datetime.now(timezone.utc).isoformat() 33 + 34 + 35 + def test_flow_sequence(flow_name: str = "test-flow", should_fail: bool = False): 36 + """Run through the exact sequence of API calls the Prefect client makes.""" 37 + 38 + console.print(Panel(f"testing flow sequence: {flow_name} (fail={should_fail})", style="blue")) 39 + console.print(f"server: {BASE_URL}\n") 40 + 41 + with httpx.Client(base_url=BASE_URL, timeout=10) as client: 42 + # step 1: create/get flow 43 + console.print("[bold]1. POST /flows/[/bold]") 44 + resp = client.post("/flows/", json={"name": flow_name}) 45 + if resp.status_code not in (200, 201): 46 + console.print(f"[red]FAIL[/red]: {resp.status_code} {resp.text}") 47 + return False 48 + flow = resp.json() 49 + console.print(f" flow_id: {flow.get('id')}") 50 + 51 + # validate flow response has required fields 52 + for field in ["id", "name", "created"]: 53 + if field not in flow: 54 + console.print(f"[red]FAIL[/red]: missing field '{field}' in flow response") 55 + return False 56 + 57 + # step 2: create flow run 58 + console.print("\n[bold]2. POST /flow_runs/[/bold]") 59 + flow_run_create = { 60 + "flow_id": flow["id"], 61 + "name": f"run-{uuid.uuid4().hex[:8]}", 62 + "parameters": {"x": 1, "y": 2}, 63 + "state": { 64 + "type": "PENDING", 65 + "name": "Pending", 66 + "timestamp": iso_now(), 67 + "message": None, 68 + }, 69 + } 70 + resp = client.post("/flow_runs/", json=flow_run_create) 71 + if resp.status_code not in (200, 201): 72 + console.print(f"[red]FAIL[/red]: {resp.status_code} {resp.text}") 73 + return False 74 + flow_run = resp.json() 75 + flow_run_id = flow_run.get("id") 76 + console.print(f" flow_run_id: {flow_run_id}") 77 + console.print(f" state: {flow_run.get('state_type')}") 78 + 79 + # validate flow run response 80 + for field in ["id", "flow_id", "name", "state_type", "state"]: 81 + if field not in flow_run: 82 + console.print(f"[red]FAIL[/red]: missing field '{field}' in flow_run response") 83 + return False 84 + 85 + # step 3: read flow run (refresh before execution) 86 + console.print("\n[bold]3. GET /flow_runs/{id}[/bold]") 87 + resp = client.get(f"/flow_runs/{flow_run_id}") 88 + if resp.status_code != 200: 89 + console.print(f"[red]FAIL[/red]: {resp.status_code} {resp.text}") 90 + return False 91 + flow_run = resp.json() 92 + console.print(f" state: {flow_run.get('state_type')}") 93 + 94 + # step 4: set state to RUNNING 95 + console.print("\n[bold]4. POST /flow_runs/{id}/set_state (RUNNING)[/bold]") 96 + set_state_running = { 97 + "state": { 98 + "type": "RUNNING", 99 + "name": "Running", 100 + "timestamp": iso_now(), 101 + "message": None, 102 + }, 103 + "force": False, 104 + } 105 + resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json=set_state_running) 106 + if resp.status_code != 200: 107 + console.print(f"[red]FAIL[/red]: {resp.status_code} {resp.text}") 108 + return False 109 + result = resp.json() 110 + console.print(f" status: {result.get('status')}") 111 + console.print(f" state.type: {result.get('state', {}).get('type')}") 112 + 113 + # validate orchestration result 114 + if result.get("status") != "ACCEPT": 115 + console.print(f"[yellow]WARN[/yellow]: expected ACCEPT, got {result.get('status')}") 116 + 117 + # step 5: simulate user code execution 118 + console.print("\n[bold]5. (execute user code)[/bold]") 119 + if should_fail: 120 + console.print(" simulating failure...") 121 + error_message = "Flow run encountered an exception: ValueError('test error')" 122 + else: 123 + console.print(" simulating success...") 124 + error_message = None 125 + 126 + # step 6: set final state 127 + if should_fail: 128 + console.print("\n[bold]6. POST /flow_runs/{id}/set_state (FAILED)[/bold]") 129 + final_state = { 130 + "state": { 131 + "type": "FAILED", 132 + "name": "Failed", 133 + "timestamp": iso_now(), 134 + "message": error_message, 135 + }, 136 + "force": False, 137 + } 138 + else: 139 + console.print("\n[bold]6. POST /flow_runs/{id}/set_state (COMPLETED)[/bold]") 140 + final_state = { 141 + "state": { 142 + "type": "COMPLETED", 143 + "name": "Completed", 144 + "timestamp": iso_now(), 145 + "message": None, 146 + }, 147 + "force": False, 148 + } 149 + 150 + resp = client.post(f"/flow_runs/{flow_run_id}/set_state", json=final_state) 151 + if resp.status_code != 200: 152 + console.print(f"[red]FAIL[/red]: {resp.status_code} {resp.text}") 153 + return False 154 + result = resp.json() 155 + console.print(f" status: {result.get('status')}") 156 + console.print(f" state.type: {result.get('state', {}).get('type')}") 157 + 158 + # verify final state 159 + console.print("\n[bold]7. GET /flow_runs/{id} (verify final state)[/bold]") 160 + resp = client.get(f"/flow_runs/{flow_run_id}") 161 + if resp.status_code != 200: 162 + console.print(f"[red]FAIL[/red]: {resp.status_code} {resp.text}") 163 + return False 164 + flow_run = resp.json() 165 + final_type = flow_run.get("state_type") 166 + expected_type = "FAILED" if should_fail else "COMPLETED" 167 + 168 + if final_type == expected_type: 169 + console.print(f" [green]state: {final_type} (correct)[/green]") 170 + else: 171 + console.print(f" [red]state: {final_type} (expected {expected_type})[/red]") 172 + return False 173 + 174 + return True 175 + 176 + 177 + def main(): 178 + console.print("\n[bold cyan]prefect-zig flow sequence test[/bold cyan]\n") 179 + 180 + # test happy path 181 + success = test_flow_sequence("happy-flow", should_fail=False) 182 + console.print() 183 + 184 + # test failure path 185 + failure = test_flow_sequence("sad-flow", should_fail=True) 186 + console.print() 187 + 188 + # summary 189 + console.print("=" * 50) 190 + if success and failure: 191 + console.print("[bold green]all tests passed[/bold green]") 192 + sys.exit(0) 193 + else: 194 + console.print(f"[bold red]tests failed[/bold red] (success={success}, failure={failure})") 195 + sys.exit(1) 196 + 197 + 198 + if __name__ == "__main__": 199 + main()
+40
src/api/admin.zig
··· 1 + const std = @import("std"); 2 + const http = std.http; 3 + const mem = std.mem; 4 + 5 + const http_server = @import("../server/http.zig"); 6 + 7 + pub fn health(request: *http.Server.Request) !void { 8 + try http_server.sendJson(request, "{\"status\":\"healthy\"}"); 9 + } 10 + 11 + pub fn csrfToken(request: *http.Server.Request) !void { 12 + // prefect client expects a csrf token with client id and expiration 13 + // extract client from query string: /api/csrf-token?client=... 14 + const target = request.head.target; 15 + var client_id: []const u8 = "unknown"; 16 + 17 + if (mem.indexOf(u8, target, "client=")) |start| { 18 + const rest = target[start + 7 ..]; 19 + if (mem.indexOf(u8, rest, "&")) |end| { 20 + client_id = rest[0..end]; 21 + } else { 22 + client_id = rest; 23 + } 24 + } 25 + 26 + var buf: [512]u8 = undefined; 27 + const response = std.fmt.bufPrint(&buf, 28 + \\{{"token":"zig-csrf-token","client":"{s}","expiration":"2099-01-01T00:00:00Z"}} 29 + , .{client_id}) catch { 30 + try http_server.sendJson(request, "{\"token\":\"zig-csrf-token\",\"client\":\"unknown\",\"expiration\":\"2099-01-01T00:00:00Z\"}"); 31 + return; 32 + }; 33 + try http_server.sendJson(request, response); 34 + } 35 + 36 + pub fn version(request: *http.Server.Request) !void { 37 + // return version as JSON string (FastAPI behavior) 38 + // must return 3.x to match Python client major version 39 + try http_server.sendJson(request, "\"3.0.0\""); 40 + }
+123
src/api/common.zig
··· 1 + const std = @import("std"); 2 + const mem = std.mem; 3 + const uuid = @import("uuid"); 4 + 5 + pub fn normalizeUuid(alloc: std.mem.Allocator, input_uuid: []const u8) []const u8 { 6 + // remove hyphens from UUID (convert 36-char to 32-char) 7 + // ALWAYS copy because input may point to buffer that gets overwritten 8 + if (input_uuid.len == 32) { 9 + return alloc.dupe(u8, input_uuid) catch input_uuid; 10 + } 11 + if (input_uuid.len != 36) { 12 + return alloc.dupe(u8, input_uuid) catch input_uuid; 13 + } 14 + 15 + var result: [32]u8 = undefined; 16 + var j: usize = 0; 17 + for (input_uuid) |c| { 18 + if (c != '-') { 19 + if (j >= 32) return alloc.dupe(u8, input_uuid) catch input_uuid; 20 + result[j] = c; 21 + j += 1; 22 + } 23 + } 24 + 25 + return alloc.dupe(u8, &result) catch input_uuid; 26 + } 27 + 28 + pub fn extractId(target: []const u8, prefix: []const u8, suffix: []const u8) ?[]const u8 { 29 + if (!mem.startsWith(u8, target, prefix)) return null; 30 + if (!mem.endsWith(u8, target, suffix)) return null; 31 + const start = prefix.len; 32 + const end = target.len - suffix.len; 33 + if (start >= end) return null; 34 + return target[start..end]; 35 + } 36 + 37 + pub fn extractIdSimple(target: []const u8, prefix: []const u8) ?[]const u8 { 38 + if (!mem.startsWith(u8, target, prefix)) return null; 39 + const rest = target[prefix.len..]; 40 + // accept both 32-char hex and 36-char hyphenated UUIDs 41 + // strip query string if present 42 + const id_end = mem.indexOf(u8, rest, "?") orelse rest.len; 43 + if (id_end >= 36) { 44 + return rest[0..36]; 45 + } else if (id_end >= 32) { 46 + return rest[0..32]; 47 + } 48 + return null; 49 + } 50 + 51 + pub fn generateUuid(alloc: std.mem.Allocator) []const u8 { 52 + const id = uuid.v4.new(); 53 + var buf: [36]u8 = undefined; 54 + const urn = uuid.urn.serialize(id); 55 + @memcpy(&buf, &urn); 56 + 57 + var hex_buf: [32]u8 = undefined; 58 + var j: usize = 0; 59 + for (buf) |c| { 60 + if (c != '-') { 61 + hex_buf[j] = c; 62 + j += 1; 63 + } 64 + } 65 + 66 + return alloc.dupe(u8, &hex_buf) catch "00000000000000000000000000000000"; 67 + } 68 + 69 + pub fn generateRunName(alloc: std.mem.Allocator) []const u8 { 70 + const adjectives = [_][]const u8{ "happy", "quick", "brave", "calm", "eager" }; 71 + const nouns = [_][]const u8{ "panda", "tiger", "eagle", "dolphin", "falcon" }; 72 + 73 + var rng_buf: [2]u8 = undefined; 74 + std.crypto.random.bytes(&rng_buf); 75 + 76 + const adj = adjectives[rng_buf[0] % adjectives.len]; 77 + const noun = nouns[rng_buf[1] % nouns.len]; 78 + 79 + return std.fmt.allocPrint(alloc, "{s}-{s}", .{ adj, noun }) catch "unnamed-run"; 80 + } 81 + 82 + pub fn getTimestamp(buf: *[32]u8) []const u8 { 83 + const ts = std.time.timestamp(); 84 + const epoch_secs: u64 = @intCast(ts); 85 + 86 + const secs_per_day: u64 = 86400; 87 + const days_since_epoch = epoch_secs / secs_per_day; 88 + const secs_today = epoch_secs % secs_per_day; 89 + 90 + const hours: u64 = secs_today / 3600; 91 + const mins: u64 = (secs_today % 3600) / 60; 92 + const secs: u64 = secs_today % 60; 93 + 94 + var days = days_since_epoch; 95 + var year: u64 = 1970; 96 + while (true) { 97 + const days_in_year: u64 = if (isLeapYear(year)) 366 else 365; 98 + if (days < days_in_year) break; 99 + days -= days_in_year; 100 + year += 1; 101 + } 102 + 103 + const month_days = if (isLeapYear(year)) 104 + [_]u64{ 31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31 } 105 + else 106 + [_]u64{ 31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31 }; 107 + 108 + var month: u64 = 1; 109 + for (month_days) |md| { 110 + if (days < md) break; 111 + days -= md; 112 + month += 1; 113 + } 114 + const day = days + 1; 115 + 116 + return std.fmt.bufPrint(buf, "{d:0>4}-{d:0>2}-{d:0>2}T{d:0>2}:{d:0>2}:{d:0>2}.000000Z", .{ 117 + year, month, day, hours, mins, secs, 118 + }) catch "2025-01-17T00:00:00.000000Z"; 119 + } 120 + 121 + fn isLeapYear(year: u64) bool { 122 + return (year % 4 == 0 and year % 100 != 0) or (year % 400 == 0); 123 + }
+310
src/api/flow_runs.zig
··· 1 + const std = @import("std"); 2 + const http = std.http; 3 + const mem = std.mem; 4 + const json = std.json; 5 + 6 + const db = @import("../db/sqlite.zig"); 7 + const http_server = @import("../server/http.zig"); 8 + const common = @import("common.zig"); 9 + 10 + // POST /flow_runs/ - create flow run 11 + // GET /flow_runs/{id} - read flow run 12 + // POST /flow_runs/{id}/set_state - set state 13 + // POST /flow_runs/filter - list flow runs 14 + pub fn handle(request: *http.Server.Request, database: *db.Database, target: []const u8) !void { 15 + const method = request.head.method; 16 + 17 + // POST /flow_runs/ - create 18 + if (method == .POST and (mem.eql(u8, target, "/flow_runs/") or mem.eql(u8, target, "/api/flow_runs/"))) { 19 + try create(request, database); 20 + return; 21 + } 22 + 23 + // POST /flow_runs/filter - list 24 + if (method == .POST and (mem.endsWith(u8, target, "/filter"))) { 25 + try filter(request, database); 26 + return; 27 + } 28 + 29 + // check for /{id}/set_state 30 + if (method == .POST and mem.endsWith(u8, target, "/set_state")) { 31 + const id = common.extractId(target, "/flow_runs/", "/set_state") orelse 32 + common.extractId(target, "/api/flow_runs/", "/set_state"); 33 + if (id) |flow_run_id| { 34 + try setState(request, database, flow_run_id); 35 + return; 36 + } 37 + } 38 + 39 + // GET /flow_runs/{id} - read single 40 + if (method == .GET) { 41 + const id = common.extractIdSimple(target, "/flow_runs/") orelse 42 + common.extractIdSimple(target, "/api/flow_runs/"); 43 + if (id) |flow_run_id| { 44 + try read(request, database, flow_run_id); 45 + return; 46 + } 47 + } 48 + 49 + try http_server.sendJsonStatus(request, "{\"detail\":\"not found\"}", .not_found); 50 + } 51 + 52 + fn create(request: *http.Server.Request, database: *db.Database) !void { 53 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 54 + defer arena.deinit(); 55 + const alloc = arena.allocator(); 56 + 57 + var body_buf: [16384]u8 = undefined; 58 + request.head.expect = null; 59 + const body_reader = request.readerExpectNone(&body_buf); 60 + const body = body_reader.allocRemaining(alloc, .unlimited) catch { 61 + try http_server.sendJsonStatus(request, "{\"detail\":\"failed to read body\"}", .bad_request); 62 + return; 63 + }; 64 + 65 + const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch { 66 + try http_server.sendJsonStatus(request, "{\"detail\":\"invalid json\"}", .bad_request); 67 + return; 68 + }; 69 + 70 + const obj = parsed.value.object; 71 + const raw_flow_id = if (obj.get("flow_id")) |v| switch (v) { 72 + .string => |s| s, 73 + else => { 74 + try http_server.sendJsonStatus(request, "{\"detail\":\"flow_id must be string\"}", .bad_request); 75 + return; 76 + }, 77 + } else { 78 + try http_server.sendJsonStatus(request, "{\"detail\":\"flow_id required\"}", .bad_request); 79 + return; 80 + }; 81 + 82 + // normalize UUID by removing hyphens (client may send hyphenated format) 83 + const flow_id = common.normalizeUuid(alloc, raw_flow_id); 84 + 85 + const name = if (obj.get("name")) |v| switch (v) { 86 + .string => |s| s, 87 + .null => common.generateRunName(alloc), 88 + else => common.generateRunName(alloc), 89 + } else common.generateRunName(alloc); 90 + const state = obj.get("state"); 91 + 92 + // extract state info 93 + var state_type: []const u8 = "PENDING"; 94 + var state_name: []const u8 = "Pending"; 95 + if (state) |s| { 96 + if (s.object.get("type")) |t| state_type = t.string; 97 + if (s.object.get("name")) |n| state_name = n.string; 98 + } 99 + 100 + const new_id = common.generateUuid(alloc); 101 + var ts_buf: [32]u8 = undefined; 102 + const now = common.getTimestamp(&ts_buf); 103 + 104 + var stmt = database.prepare( 105 + \\INSERT INTO flow_run (id, flow_id, name, state_type, state_name, state_timestamp) 106 + \\VALUES (?1, ?2, ?3, ?4, ?5, ?6) 107 + ) catch { 108 + try http_server.sendJsonStatus(request, "{\"detail\":\"database error\"}", .internal_server_error); 109 + return; 110 + }; 111 + defer stmt.deinit(); 112 + 113 + stmt.bindText(1, new_id) catch {}; 114 + stmt.bindText(2, flow_id) catch {}; 115 + stmt.bindText(3, name) catch {}; 116 + stmt.bindText(4, state_type) catch {}; 117 + stmt.bindText(5, state_name) catch {}; 118 + stmt.bindText(6, now) catch {}; 119 + 120 + _ = stmt.step() catch { 121 + try http_server.sendJsonStatus(request, "{\"detail\":\"insert failed\"}", .internal_server_error); 122 + return; 123 + }; 124 + 125 + // return the created flow run (minimal response for now) 126 + const response = std.fmt.allocPrint(alloc, 127 + \\{{"id":"{s}","created":"{s}","updated":"{s}","name":"{s}","flow_id":"{s}","state_type":"{s}","state_name":"{s}","state":{{"type":"{s}","name":"{s}","timestamp":"{s}","id":"{s}"}},"parameters":{{}},"tags":[],"run_count":0}} 128 + , .{ new_id, now, now, name, flow_id, state_type, state_name, state_type, state_name, now, common.generateUuid(alloc) }) catch { 129 + try http_server.sendJsonStatus(request, "{\"detail\":\"format error\"}", .internal_server_error); 130 + return; 131 + }; 132 + 133 + try http_server.sendJsonStatus(request, response, .created); 134 + } 135 + 136 + fn read(request: *http.Server.Request, database: *db.Database, raw_id: []const u8) !void { 137 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 138 + defer arena.deinit(); 139 + const alloc = arena.allocator(); 140 + 141 + const id = common.normalizeUuid(alloc, raw_id); 142 + 143 + var stmt = database.prepare( 144 + \\SELECT id, created, updated, flow_id, name, state_type, state_name, state_timestamp, parameters, tags 145 + \\FROM flow_run WHERE id = ?1 146 + ) catch { 147 + try http_server.sendJsonStatus(request, "{\"detail\":\"database error\"}", .internal_server_error); 148 + return; 149 + }; 150 + defer stmt.deinit(); 151 + 152 + stmt.bindText(1, id) catch {}; 153 + 154 + if (!(stmt.step() catch false)) { 155 + try http_server.sendJsonStatus(request, "{\"detail\":\"flow run not found\"}", .not_found); 156 + return; 157 + } 158 + 159 + const run_id = stmt.columnText(0) orelse ""; 160 + const created = stmt.columnText(1) orelse ""; 161 + const updated = stmt.columnText(2) orelse ""; 162 + const flow_id = stmt.columnText(3) orelse ""; 163 + const run_name = stmt.columnText(4) orelse ""; 164 + const state_type = stmt.columnText(5) orelse "PENDING"; 165 + const state_name = stmt.columnText(6) orelse "Pending"; 166 + const state_timestamp = stmt.columnText(7) orelse created; 167 + const parameters = stmt.columnText(8) orelse "{}"; 168 + const tags = stmt.columnText(9) orelse "[]"; 169 + 170 + const response = std.fmt.allocPrint(alloc, 171 + \\{{"id":"{s}","created":"{s}","updated":"{s}","name":"{s}","flow_id":"{s}","state_type":"{s}","state_name":"{s}","state":{{"type":"{s}","name":"{s}","timestamp":"{s}"}},"parameters":{s},"tags":{s},"run_count":0}} 172 + , .{ run_id, created, updated, run_name, flow_id, state_type, state_name, state_type, state_name, state_timestamp, parameters, tags }) catch { 173 + try http_server.sendJsonStatus(request, "{\"detail\":\"format error\"}", .internal_server_error); 174 + return; 175 + }; 176 + 177 + try http_server.sendJson(request, response); 178 + } 179 + 180 + fn setState(request: *http.Server.Request, database: *db.Database, raw_id: []const u8) !void { 181 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 182 + defer arena.deinit(); 183 + const alloc = arena.allocator(); 184 + 185 + const id = common.normalizeUuid(alloc, raw_id); 186 + 187 + var body_buf: [8192]u8 = undefined; 188 + request.head.expect = null; 189 + const body_reader = request.readerExpectNone(&body_buf); 190 + const body = body_reader.allocRemaining(alloc, .unlimited) catch { 191 + try http_server.sendJsonStatus(request, "{\"detail\":\"failed to read body\"}", .bad_request); 192 + return; 193 + }; 194 + 195 + const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch { 196 + try http_server.sendJsonStatus(request, "{\"detail\":\"invalid json\"}", .bad_request); 197 + return; 198 + }; 199 + 200 + const state = parsed.value.object.get("state") orelse { 201 + try http_server.sendJsonStatus(request, "{\"detail\":\"state required\"}", .bad_request); 202 + return; 203 + }; 204 + 205 + const state_type = if (state.object.get("type")) |v| v.string else "PENDING"; 206 + const state_name = if (state.object.get("name")) |v| v.string else "Pending"; 207 + var ts_buf: [32]u8 = undefined; 208 + const now = common.getTimestamp(&ts_buf); 209 + 210 + // update the flow run state 211 + var stmt = database.prepare( 212 + \\UPDATE flow_run SET state_type = ?1, state_name = ?2, state_timestamp = ?3, updated = ?3 213 + \\WHERE id = ?4 214 + ) catch { 215 + try http_server.sendJsonStatus(request, "{\"detail\":\"database error\"}", .internal_server_error); 216 + return; 217 + }; 218 + defer stmt.deinit(); 219 + 220 + stmt.bindText(1, state_type) catch {}; 221 + stmt.bindText(2, state_name) catch {}; 222 + stmt.bindText(3, now) catch {}; 223 + stmt.bindText(4, id) catch {}; 224 + 225 + _ = stmt.step() catch { 226 + try http_server.sendJsonStatus(request, "{\"detail\":\"update failed\"}", .internal_server_error); 227 + return; 228 + }; 229 + 230 + // insert state history (non-fatal if it fails) 231 + var history_stmt = database.prepare( 232 + \\INSERT INTO flow_run_state (id, flow_run_id, type, name, timestamp) 233 + \\VALUES (?1, ?2, ?3, ?4, ?5) 234 + ) catch { 235 + // non-fatal, skip state history 236 + const state_id = common.generateUuid(alloc); 237 + const response = std.fmt.allocPrint(alloc, 238 + \\{{"status":"ACCEPT","details":{{}},"state":{{"type":"{s}","name":"{s}","timestamp":"{s}","id":"{s}"}}}} 239 + , .{ state_type, state_name, now, state_id }) catch { 240 + try http_server.sendJsonStatus(request, "{\"detail\":\"format error\"}", .internal_server_error); 241 + return; 242 + }; 243 + try http_server.sendJson(request, response); 244 + return; 245 + }; 246 + defer history_stmt.deinit(); 247 + 248 + history_stmt.bindText(1, common.generateUuid(alloc)) catch {}; 249 + history_stmt.bindText(2, id) catch {}; 250 + history_stmt.bindText(3, state_type) catch {}; 251 + history_stmt.bindText(4, state_name) catch {}; 252 + history_stmt.bindText(5, now) catch {}; 253 + _ = history_stmt.step() catch {}; // non-fatal 254 + 255 + // return orchestration result 256 + const state_id = common.generateUuid(alloc); 257 + const response = std.fmt.allocPrint(alloc, 258 + \\{{"status":"ACCEPT","details":{{}},"state":{{"type":"{s}","name":"{s}","timestamp":"{s}","id":"{s}"}}}} 259 + , .{ state_type, state_name, now, state_id }) catch { 260 + try http_server.sendJsonStatus(request, "{\"detail\":\"format error\"}", .internal_server_error); 261 + return; 262 + }; 263 + 264 + try http_server.sendJson(request, response); 265 + } 266 + 267 + fn filter(request: *http.Server.Request, database: *db.Database) !void { 268 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 269 + defer arena.deinit(); 270 + const alloc = arena.allocator(); 271 + 272 + // for now, just return all flow runs (TODO: implement filters) 273 + var stmt = database.prepare( 274 + \\SELECT id, created, updated, flow_id, name, state_type, state_name, state_timestamp 275 + \\FROM flow_run ORDER BY created DESC LIMIT 50 276 + ) catch { 277 + try http_server.sendJsonStatus(request, "{\"detail\":\"database error\"}", .internal_server_error); 278 + return; 279 + }; 280 + defer stmt.deinit(); 281 + 282 + var results: std.ArrayList(u8) = .{}; 283 + defer results.deinit(alloc); 284 + 285 + try results.appendSlice(alloc, "["); 286 + 287 + var first = true; 288 + while (stmt.step() catch false) { 289 + if (!first) try results.appendSlice(alloc, ","); 290 + first = false; 291 + 292 + const run_id = stmt.columnText(0) orelse ""; 293 + const created = stmt.columnText(1) orelse ""; 294 + const updated = stmt.columnText(2) orelse ""; 295 + const flow_id = stmt.columnText(3) orelse ""; 296 + const run_name = stmt.columnText(4) orelse ""; 297 + const state_type = stmt.columnText(5) orelse "PENDING"; 298 + const state_name = stmt.columnText(6) orelse "Pending"; 299 + const state_timestamp = stmt.columnText(7) orelse created; 300 + 301 + const item = std.fmt.allocPrint(alloc, 302 + \\{{"id":"{s}","created":"{s}","updated":"{s}","name":"{s}","flow_id":"{s}","state_type":"{s}","state_name":"{s}","state":{{"type":"{s}","name":"{s}","timestamp":"{s}"}},"parameters":{{}},"tags":[],"run_count":0}} 303 + , .{ run_id, created, updated, run_name, flow_id, state_type, state_name, state_type, state_name, state_timestamp }) catch continue; 304 + 305 + try results.appendSlice(alloc, item); 306 + } 307 + 308 + try results.appendSlice(alloc, "]"); 309 + try http_server.sendJson(request, results.items); 310 + }
+101
src/api/flows.zig
··· 1 + const std = @import("std"); 2 + const http = std.http; 3 + const mem = std.mem; 4 + const json = std.json; 5 + 6 + const db = @import("../db/sqlite.zig"); 7 + const http_server = @import("../server/http.zig"); 8 + const common = @import("common.zig"); 9 + 10 + // POST /flows/ - create or get flow by name 11 + // GET /flows/{id} - read flow by id 12 + pub fn handle(request: *http.Server.Request, database: *db.Database, target: []const u8) !void { 13 + if (request.head.method == .POST and (mem.eql(u8, target, "/flows/") or mem.eql(u8, target, "/api/flows/"))) { 14 + try createFlow(request, database); 15 + } else { 16 + try http_server.sendJsonStatus(request, "{\"detail\":\"not implemented\"}", .not_implemented); 17 + } 18 + } 19 + 20 + fn createFlow(request: *http.Server.Request, database: *db.Database) !void { 21 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 22 + defer arena.deinit(); 23 + const alloc = arena.allocator(); 24 + 25 + // read request body 26 + var body_buf: [8192]u8 = undefined; 27 + request.head.expect = null; // we don't send 100-continue 28 + const body_reader = request.readerExpectNone(&body_buf); 29 + const body = body_reader.allocRemaining(alloc, .unlimited) catch { 30 + try http_server.sendJsonStatus(request, "{\"detail\":\"failed to read body\"}", .bad_request); 31 + return; 32 + }; 33 + 34 + // parse json to get name 35 + const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch { 36 + try http_server.sendJsonStatus(request, "{\"detail\":\"invalid json\"}", .bad_request); 37 + return; 38 + }; 39 + 40 + const name = parsed.value.object.get("name") orelse { 41 + try http_server.sendJsonStatus(request, "{\"detail\":\"name required\"}", .bad_request); 42 + return; 43 + }; 44 + const name_str = name.string; 45 + 46 + // try to get existing flow first 47 + var select_stmt = database.prepare("SELECT id, created, updated, name, tags FROM flow WHERE name = ?1") catch { 48 + try http_server.sendJsonStatus(request, "{\"detail\":\"database error\"}", .internal_server_error); 49 + return; 50 + }; 51 + defer select_stmt.deinit(); 52 + 53 + select_stmt.bindText(1, name_str) catch {}; 54 + 55 + if (select_stmt.step() catch false) { 56 + // flow exists, return it 57 + const id = select_stmt.columnText(0) orelse ""; 58 + const created = select_stmt.columnText(1) orelse ""; 59 + const updated = select_stmt.columnText(2) orelse ""; 60 + const flow_name = select_stmt.columnText(3) orelse ""; 61 + const tags = select_stmt.columnText(4) orelse "[]"; 62 + 63 + const response = std.fmt.allocPrint(alloc, 64 + \\{{"id":"{s}","created":"{s}","updated":"{s}","name":"{s}","tags":{s}}} 65 + , .{ id, created, updated, flow_name, tags }) catch { 66 + try http_server.sendJsonStatus(request, "{\"detail\":\"format error\"}", .internal_server_error); 67 + return; 68 + }; 69 + 70 + try http_server.sendJson(request, response); 71 + return; 72 + } 73 + 74 + // create new flow 75 + const new_id = common.generateUuid(alloc); 76 + 77 + var insert_stmt = database.prepare("INSERT INTO flow (id, name) VALUES (?1, ?2)") catch { 78 + try http_server.sendJsonStatus(request, "{\"detail\":\"database error\"}", .internal_server_error); 79 + return; 80 + }; 81 + defer insert_stmt.deinit(); 82 + 83 + insert_stmt.bindText(1, new_id) catch {}; 84 + insert_stmt.bindText(2, name_str) catch {}; 85 + _ = insert_stmt.step() catch { 86 + try http_server.sendJsonStatus(request, "{\"detail\":\"insert failed\"}", .internal_server_error); 87 + return; 88 + }; 89 + 90 + // return created flow 91 + var ts_buf: [32]u8 = undefined; 92 + const now = common.getTimestamp(&ts_buf); 93 + const response = std.fmt.allocPrint(alloc, 94 + \\{{"id":"{s}","created":"{s}","updated":"{s}","name":"{s}","tags":[]}} 95 + , .{ new_id, now, now, name_str }) catch { 96 + try http_server.sendJsonStatus(request, "{\"detail\":\"format error\"}", .internal_server_error); 97 + return; 98 + }; 99 + 100 + try http_server.sendJsonStatus(request, response, .created); 101 + }
+18
src/api/logs.zig
··· 1 + const std = @import("std"); 2 + const http = std.http; 3 + 4 + const http_server = @import("../server/http.zig"); 5 + 6 + pub fn logs(request: *http.Server.Request) !void { 7 + // accept logs but don't store them yet (TODO: implement log storage) 8 + if (request.head.method == .POST) { 9 + // consume the body 10 + var body_buf: [65536]u8 = undefined; 11 + request.head.expect = null; 12 + const body_reader = request.readerExpectNone(&body_buf); 13 + _ = body_reader.allocRemaining(std.heap.page_allocator, .unlimited) catch {}; 14 + try http_server.sendJsonStatus(request, "[]", .created); 15 + } else { 16 + try http_server.sendJson(request, "[]"); 17 + } 18 + }
+10 -421
src/api/routes.zig
··· 1 1 const std = @import("std"); 2 2 const http = std.http; 3 - const mem = std.mem; 4 - const json = std.json; 5 3 6 4 const db = @import("../db/sqlite.zig"); 7 - const http_server = @import("../server/http.zig"); 8 5 9 - // POST /flows/ - create or get flow by name 10 - // GET /flows/{id} - read flow by id 11 - pub fn flows(request: *http.Server.Request, database: *db.Database, target: []const u8) !void { 12 - if (request.head.method == .POST and (mem.eql(u8, target, "/flows/") or mem.eql(u8, target, "/api/flows/"))) { 13 - try createFlow(request, database); 14 - } else { 15 - try http_server.sendJsonStatus(request, "{\"detail\":\"not implemented\"}", .not_implemented); 16 - } 17 - } 18 - 19 - // POST /flow_runs/ - create flow run 20 - // GET /flow_runs/{id} - read flow run 21 - // POST /flow_runs/{id}/set_state - set state 22 - // POST /flow_runs/filter - list flow runs 23 - pub fn flowRuns(request: *http.Server.Request, database: *db.Database, target: []const u8) !void { 24 - const method = request.head.method; 25 - 26 - // POST /flow_runs/ - create 27 - if (method == .POST and (mem.eql(u8, target, "/flow_runs/") or mem.eql(u8, target, "/api/flow_runs/"))) { 28 - try createFlowRun(request, database); 29 - return; 30 - } 31 - 32 - // POST /flow_runs/filter - list 33 - if (method == .POST and (mem.endsWith(u8, target, "/filter"))) { 34 - try filterFlowRuns(request, database); 35 - return; 36 - } 37 - 38 - // check for /{id}/set_state 39 - if (method == .POST and mem.endsWith(u8, target, "/set_state")) { 40 - const id = extractId(target, "/flow_runs/", "/set_state") orelse 41 - extractId(target, "/api/flow_runs/", "/set_state"); 42 - if (id) |flow_run_id| { 43 - try setFlowRunState(request, database, flow_run_id); 44 - return; 45 - } 46 - } 47 - 48 - // GET /flow_runs/{id} - read single 49 - if (method == .GET) { 50 - const id = extractIdSimple(target, "/flow_runs/") orelse 51 - extractIdSimple(target, "/api/flow_runs/"); 52 - if (id) |flow_run_id| { 53 - try readFlowRun(request, database, flow_run_id); 54 - return; 55 - } 56 - } 57 - 58 - try http_server.sendJsonStatus(request, "{\"detail\":\"not found\"}", .not_found); 59 - } 60 - 61 - pub fn health(request: *http.Server.Request) !void { 62 - try http_server.sendJson(request, "{\"status\":\"healthy\"}"); 63 - } 64 - 65 - // --- implementations --- 66 - 67 - fn createFlow(request: *http.Server.Request, database: *db.Database) !void { 68 - var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 69 - defer arena.deinit(); 70 - const alloc = arena.allocator(); 71 - 72 - // read request body 73 - var body_buf: [8192]u8 = undefined; 74 - request.head.expect = null; // we don't send 100-continue 75 - const body_reader = request.readerExpectNone(&body_buf); 76 - const body = body_reader.allocRemaining(alloc, .unlimited) catch { 77 - try http_server.sendJsonStatus(request, "{\"detail\":\"failed to read body\"}", .bad_request); 78 - return; 79 - }; 80 - 81 - // parse json to get name 82 - const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch { 83 - try http_server.sendJsonStatus(request, "{\"detail\":\"invalid json\"}", .bad_request); 84 - return; 85 - }; 86 - 87 - const name = parsed.value.object.get("name") orelse { 88 - try http_server.sendJsonStatus(request, "{\"detail\":\"name required\"}", .bad_request); 89 - return; 90 - }; 91 - const name_str = name.string; 92 - 93 - // try to get existing flow first 94 - var select_stmt = database.prepare("SELECT id, created, updated, name, tags FROM flow WHERE name = ?1") catch { 95 - try http_server.sendJsonStatus(request, "{\"detail\":\"database error\"}", .internal_server_error); 96 - return; 97 - }; 98 - defer select_stmt.deinit(); 99 - 100 - select_stmt.bindText(1, name_str) catch {}; 101 - 102 - if (select_stmt.step() catch false) { 103 - // flow exists, return it 104 - const id = select_stmt.columnText(0) orelse ""; 105 - const created = select_stmt.columnText(1) orelse ""; 106 - const updated = select_stmt.columnText(2) orelse ""; 107 - const flow_name = select_stmt.columnText(3) orelse ""; 108 - const tags = select_stmt.columnText(4) orelse "[]"; 109 - 110 - const response = std.fmt.allocPrint(alloc, 111 - \\{{"id":"{s}","created":"{s}","updated":"{s}","name":"{s}","tags":{s}}} 112 - , .{ id, created, updated, flow_name, tags }) catch { 113 - try http_server.sendJsonStatus(request, "{\"detail\":\"format error\"}", .internal_server_error); 114 - return; 115 - }; 116 - 117 - try http_server.sendJson(request, response); 118 - return; 119 - } 120 - 121 - // create new flow 122 - const new_id = generateUuid(alloc); 123 - 124 - var insert_stmt = database.prepare("INSERT INTO flow (id, name) VALUES (?1, ?2)") catch { 125 - try http_server.sendJsonStatus(request, "{\"detail\":\"database error\"}", .internal_server_error); 126 - return; 127 - }; 128 - defer insert_stmt.deinit(); 129 - 130 - insert_stmt.bindText(1, new_id) catch {}; 131 - insert_stmt.bindText(2, name_str) catch {}; 132 - _ = insert_stmt.step() catch { 133 - try http_server.sendJsonStatus(request, "{\"detail\":\"insert failed\"}", .internal_server_error); 134 - return; 135 - }; 136 - 137 - // return created flow 138 - const now = "2025-01-17T00:00:00Z"; // TODO: proper timestamp 139 - const response = std.fmt.allocPrint(alloc, 140 - \\{{"id":"{s}","created":"{s}","updated":"{s}","name":"{s}","tags":[]}} 141 - , .{ new_id, now, now, name_str }) catch { 142 - try http_server.sendJsonStatus(request, "{\"detail\":\"format error\"}", .internal_server_error); 143 - return; 144 - }; 145 - 146 - try http_server.sendJsonStatus(request, response, .created); 147 - } 148 - 149 - fn createFlowRun(request: *http.Server.Request, database: *db.Database) !void { 150 - var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 151 - defer arena.deinit(); 152 - const alloc = arena.allocator(); 153 - 154 - var body_buf: [16384]u8 = undefined; 155 - request.head.expect = null; 156 - const body_reader = request.readerExpectNone(&body_buf); 157 - const body = body_reader.allocRemaining(alloc, .unlimited) catch { 158 - try http_server.sendJsonStatus(request, "{\"detail\":\"failed to read body\"}", .bad_request); 159 - return; 160 - }; 161 - 162 - const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch { 163 - try http_server.sendJsonStatus(request, "{\"detail\":\"invalid json\"}", .bad_request); 164 - return; 165 - }; 166 - 167 - const obj = parsed.value.object; 168 - const flow_id = if (obj.get("flow_id")) |v| v.string else { 169 - try http_server.sendJsonStatus(request, "{\"detail\":\"flow_id required\"}", .bad_request); 170 - return; 171 - }; 172 - 173 - const name = if (obj.get("name")) |v| v.string else generateRunName(alloc); 174 - const state = obj.get("state"); 175 - 176 - // extract state info 177 - var state_type: []const u8 = "PENDING"; 178 - var state_name: []const u8 = "Pending"; 179 - if (state) |s| { 180 - if (s.object.get("type")) |t| state_type = t.string; 181 - if (s.object.get("name")) |n| state_name = n.string; 182 - } 183 - 184 - const new_id = generateUuid(alloc); 185 - const now = "2025-01-17T00:00:00.000000Z"; 186 - 187 - var stmt = database.prepare( 188 - \\INSERT INTO flow_run (id, flow_id, name, state_type, state_name, state_timestamp) 189 - \\VALUES (?1, ?2, ?3, ?4, ?5, ?6) 190 - ) catch { 191 - try http_server.sendJsonStatus(request, "{\"detail\":\"database error\"}", .internal_server_error); 192 - return; 193 - }; 194 - defer stmt.deinit(); 6 + // resource modules 7 + pub const admin = @import("admin.zig"); 8 + pub const flows = @import("flows.zig"); 9 + pub const flow_runs = @import("flow_runs.zig"); 10 + pub const logs = @import("logs.zig"); 11 + pub const common = @import("common.zig"); 195 12 196 - stmt.bindText(1, new_id) catch {}; 197 - stmt.bindText(2, flow_id) catch {}; 198 - stmt.bindText(3, name) catch {}; 199 - stmt.bindText(4, state_type) catch {}; 200 - stmt.bindText(5, state_name) catch {}; 201 - stmt.bindText(6, now) catch {}; 202 - 203 - _ = stmt.step() catch { 204 - try http_server.sendJsonStatus(request, "{\"detail\":\"insert failed\"}", .internal_server_error); 205 - return; 206 - }; 207 - 208 - // return the created flow run (minimal response for now) 209 - const response = std.fmt.allocPrint(alloc, 210 - \\{{"id":"{s}","created":"{s}","updated":"{s}","name":"{s}","flow_id":"{s}","state_type":"{s}","state_name":"{s}","state":{{"type":"{s}","name":"{s}","timestamp":"{s}","id":"{s}"}},"parameters":{{}},"tags":[],"run_count":0}} 211 - , .{ new_id, now, now, name, flow_id, state_type, state_name, state_type, state_name, now, generateUuid(alloc) }) catch { 212 - try http_server.sendJsonStatus(request, "{\"detail\":\"format error\"}", .internal_server_error); 213 - return; 214 - }; 215 - 216 - try http_server.sendJsonStatus(request, response, .created); 217 - } 218 - 219 - fn readFlowRun(request: *http.Server.Request, database: *db.Database, id: []const u8) !void { 220 - var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 221 - defer arena.deinit(); 222 - const alloc = arena.allocator(); 223 - 224 - var stmt = database.prepare( 225 - \\SELECT id, created, updated, flow_id, name, state_type, state_name, state_timestamp, parameters, tags 226 - \\FROM flow_run WHERE id = ?1 227 - ) catch { 228 - try http_server.sendJsonStatus(request, "{\"detail\":\"database error\"}", .internal_server_error); 229 - return; 230 - }; 231 - defer stmt.deinit(); 232 - 233 - stmt.bindText(1, id) catch {}; 234 - 235 - if (!(stmt.step() catch false)) { 236 - try http_server.sendJsonStatus(request, "{\"detail\":\"flow run not found\"}", .not_found); 237 - return; 238 - } 239 - 240 - const run_id = stmt.columnText(0) orelse ""; 241 - const created = stmt.columnText(1) orelse ""; 242 - const updated = stmt.columnText(2) orelse ""; 243 - const flow_id = stmt.columnText(3) orelse ""; 244 - const name = stmt.columnText(4) orelse ""; 245 - const state_type = stmt.columnText(5) orelse "PENDING"; 246 - const state_name = stmt.columnText(6) orelse "Pending"; 247 - const state_timestamp = stmt.columnText(7) orelse created; 248 - const parameters = stmt.columnText(8) orelse "{}"; 249 - const tags = stmt.columnText(9) orelse "[]"; 250 - 251 - const response = std.fmt.allocPrint(alloc, 252 - \\{{"id":"{s}","created":"{s}","updated":"{s}","name":"{s}","flow_id":"{s}","state_type":"{s}","state_name":"{s}","state":{{"type":"{s}","name":"{s}","timestamp":"{s}"}},"parameters":{s},"tags":{s},"run_count":0}} 253 - , .{ run_id, created, updated, name, flow_id, state_type, state_name, state_type, state_name, state_timestamp, parameters, tags }) catch { 254 - try http_server.sendJsonStatus(request, "{\"detail\":\"format error\"}", .internal_server_error); 255 - return; 256 - }; 257 - 258 - try http_server.sendJson(request, response); 259 - } 260 - 261 - fn setFlowRunState(request: *http.Server.Request, database: *db.Database, id: []const u8) !void { 262 - var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 263 - defer arena.deinit(); 264 - const alloc = arena.allocator(); 265 - 266 - var body_buf: [8192]u8 = undefined; 267 - request.head.expect = null; 268 - const body_reader = request.readerExpectNone(&body_buf); 269 - const body = body_reader.allocRemaining(alloc, .unlimited) catch { 270 - try http_server.sendJsonStatus(request, "{\"detail\":\"failed to read body\"}", .bad_request); 271 - return; 272 - }; 273 - 274 - const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch { 275 - try http_server.sendJsonStatus(request, "{\"detail\":\"invalid json\"}", .bad_request); 276 - return; 277 - }; 278 - 279 - const state = parsed.value.object.get("state") orelse { 280 - try http_server.sendJsonStatus(request, "{\"detail\":\"state required\"}", .bad_request); 281 - return; 282 - }; 283 - 284 - const state_type = if (state.object.get("type")) |v| v.string else "PENDING"; 285 - const state_name = if (state.object.get("name")) |v| v.string else "Pending"; 286 - const now = "2025-01-17T00:00:00.000000Z"; 287 - 288 - // update the flow run state 289 - var stmt = database.prepare( 290 - \\UPDATE flow_run SET state_type = ?1, state_name = ?2, state_timestamp = ?3, updated = ?3 291 - \\WHERE id = ?4 292 - ) catch { 293 - try http_server.sendJsonStatus(request, "{\"detail\":\"database error\"}", .internal_server_error); 294 - return; 295 - }; 296 - defer stmt.deinit(); 297 - 298 - stmt.bindText(1, state_type) catch {}; 299 - stmt.bindText(2, state_name) catch {}; 300 - stmt.bindText(3, now) catch {}; 301 - stmt.bindText(4, id) catch {}; 302 - 303 - _ = stmt.step() catch { 304 - try http_server.sendJsonStatus(request, "{\"detail\":\"update failed\"}", .internal_server_error); 305 - return; 306 - }; 307 - 308 - // insert state history 309 - var history_stmt = database.prepare( 310 - \\INSERT INTO flow_run_state (id, flow_run_id, type, name, timestamp) 311 - \\VALUES (?1, ?2, ?3, ?4, ?5) 312 - ) catch { 313 - // non-fatal, continue 314 - return; 315 - }; 316 - defer history_stmt.deinit(); 317 - 318 - history_stmt.bindText(1, generateUuid(alloc)) catch {}; 319 - history_stmt.bindText(2, id) catch {}; 320 - history_stmt.bindText(3, state_type) catch {}; 321 - history_stmt.bindText(4, state_name) catch {}; 322 - history_stmt.bindText(5, now) catch {}; 323 - _ = history_stmt.step() catch {}; 324 - 325 - // return orchestration result 326 - const state_id = generateUuid(alloc); 327 - const response = std.fmt.allocPrint(alloc, 328 - \\{{"status":"ACCEPT","details":{{}},"state":{{"type":"{s}","name":"{s}","timestamp":"{s}","id":"{s}"}}}} 329 - , .{ state_type, state_name, now, state_id }) catch { 330 - try http_server.sendJsonStatus(request, "{\"detail\":\"format error\"}", .internal_server_error); 331 - return; 332 - }; 333 - 334 - try http_server.sendJson(request, response); 335 - } 336 - 337 - fn filterFlowRuns(request: *http.Server.Request, database: *db.Database) !void { 338 - var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 339 - defer arena.deinit(); 340 - const alloc = arena.allocator(); 341 - 342 - // for now, just return all flow runs (TODO: implement filters) 343 - var stmt = database.prepare( 344 - \\SELECT id, created, updated, flow_id, name, state_type, state_name, state_timestamp 345 - \\FROM flow_run ORDER BY created DESC LIMIT 50 346 - ) catch { 347 - try http_server.sendJsonStatus(request, "{\"detail\":\"database error\"}", .internal_server_error); 348 - return; 349 - }; 350 - defer stmt.deinit(); 351 - 352 - var results: std.ArrayList(u8) = .{}; 353 - defer results.deinit(alloc); 354 - 355 - try results.appendSlice(alloc, "["); 356 - 357 - var first = true; 358 - while (stmt.step() catch false) { 359 - if (!first) try results.appendSlice(alloc, ","); 360 - first = false; 361 - 362 - const run_id = stmt.columnText(0) orelse ""; 363 - const created = stmt.columnText(1) orelse ""; 364 - const updated = stmt.columnText(2) orelse ""; 365 - const flow_id = stmt.columnText(3) orelse ""; 366 - const name = stmt.columnText(4) orelse ""; 367 - const state_type = stmt.columnText(5) orelse "PENDING"; 368 - const state_name = stmt.columnText(6) orelse "Pending"; 369 - const state_timestamp = stmt.columnText(7) orelse created; 370 - 371 - const item = std.fmt.allocPrint(alloc, 372 - \\{{"id":"{s}","created":"{s}","updated":"{s}","name":"{s}","flow_id":"{s}","state_type":"{s}","state_name":"{s}","state":{{"type":"{s}","name":"{s}","timestamp":"{s}"}},"parameters":{{}},"tags":[],"run_count":0}} 373 - , .{ run_id, created, updated, name, flow_id, state_type, state_name, state_type, state_name, state_timestamp }) catch continue; 374 - 375 - try results.appendSlice(alloc, item); 376 - } 377 - 378 - try results.appendSlice(alloc, "]"); 379 - try http_server.sendJson(request, results.items); 380 - } 381 - 382 - // --- helpers --- 383 - 384 - fn extractId(target: []const u8, prefix: []const u8, suffix: []const u8) ?[]const u8 { 385 - if (!mem.startsWith(u8, target, prefix)) return null; 386 - if (!mem.endsWith(u8, target, suffix)) return null; 387 - const start = prefix.len; 388 - const end = target.len - suffix.len; 389 - if (start >= end) return null; 390 - return target[start..end]; 391 - } 392 - 393 - fn extractIdSimple(target: []const u8, prefix: []const u8) ?[]const u8 { 394 - if (!mem.startsWith(u8, target, prefix)) return null; 395 - const rest = target[prefix.len..]; 396 - // check it looks like a uuid (32 hex chars) 397 - if (rest.len >= 32) { 398 - return rest[0..32]; 399 - } 400 - return null; 401 - } 402 - 403 - fn generateUuid(alloc: std.mem.Allocator) []const u8 { 404 - var random_bytes: [16]u8 = undefined; 405 - std.crypto.random.bytes(&random_bytes); 406 - 407 - var hex_buf: [32]u8 = undefined; 408 - _ = std.fmt.bufPrint(&hex_buf, "{x:0>16}{x:0>16}", .{ 409 - std.mem.readInt(u64, random_bytes[0..8], .big), 410 - std.mem.readInt(u64, random_bytes[8..16], .big), 411 - }) catch return "00000000000000000000000000000000"; 412 - 413 - return alloc.dupe(u8, &hex_buf) catch "00000000000000000000000000000000"; 414 - } 415 - 416 - fn generateRunName(alloc: std.mem.Allocator) []const u8 { 417 - const adjectives = [_][]const u8{ "happy", "quick", "brave", "calm", "eager" }; 418 - const nouns = [_][]const u8{ "panda", "tiger", "eagle", "dolphin", "falcon" }; 419 - 420 - var rng_buf: [2]u8 = undefined; 421 - std.crypto.random.bytes(&rng_buf); 422 - 423 - const adj = adjectives[rng_buf[0] % adjectives.len]; 424 - const noun = nouns[rng_buf[1] % nouns.len]; 425 - 426 - return std.fmt.allocPrint(alloc, "{s}-{s}", .{ adj, noun }) catch "unnamed-run"; 427 - } 13 + // re-export admin handlers for convenience 14 + pub const health = admin.health; 15 + pub const csrfToken = admin.csrfToken; 16 + pub const version = admin.version;
+25
src/db/sqlite.zig
··· 23 23 } 24 24 25 25 var db = Database{ .conn = conn, .allocator = allocator }; 26 + try db.configurePragmas(); 26 27 try db.initSchema(); 27 28 28 29 std.debug.print("database initialized: {s}\n", .{db_path}); 29 30 return db; 31 + } 32 + 33 + fn configurePragmas(self: *Database) !void { 34 + // enable WAL mode for better concurrent read/write performance 35 + var err_msg: [*c]u8 = null; 36 + var rc = c.sqlite3_exec(self.conn, "PRAGMA journal_mode=WAL;", null, null, &err_msg); 37 + if (rc != c.SQLITE_OK) { 38 + std.debug.print("WAL mode error: {s}\n", .{err_msg}); 39 + c.sqlite3_free(err_msg); 40 + } 41 + 42 + // set busy timeout to 5 seconds 43 + rc = c.sqlite3_exec(self.conn, "PRAGMA busy_timeout=5000;", null, null, &err_msg); 44 + if (rc != c.SQLITE_OK) { 45 + std.debug.print("busy_timeout error: {s}\n", .{err_msg}); 46 + c.sqlite3_free(err_msg); 47 + } 48 + 49 + // enable foreign keys 50 + rc = c.sqlite3_exec(self.conn, "PRAGMA foreign_keys=ON;", null, null, &err_msg); 51 + if (rc != c.SQLITE_OK) { 52 + std.debug.print("foreign_keys error: {s}\n", .{err_msg}); 53 + c.sqlite3_free(err_msg); 54 + } 30 55 } 31 56 32 57 pub fn deinit(self: *Database) void {
+13 -13
src/server/http.zig
··· 37 37 fn handleRequest(request: *http.Server.Request, database: *db.Database) !void { 38 38 const target = request.head.target; 39 39 40 + std.debug.print("{s} {s}\n", .{ @tagName(request.head.method), target }); 41 + 40 42 // cors preflight 41 43 if (request.head.method == .OPTIONS) { 42 44 try sendCors(request, ""); ··· 44 46 } 45 47 46 48 // route dispatch 47 - if (mem.eql(u8, target, "/api/health")) { 49 + if (mem.eql(u8, target, "/api/health") or mem.eql(u8, target, "/health")) { 48 50 try api.health(request); 49 - } else if (mem.eql(u8, target, "/health")) { 50 - try api.health(request); 51 - } else if (mem.startsWith(u8, target, "/api/flows")) { 52 - try api.flows(request, database, target); 53 - } else if (mem.startsWith(u8, target, "/api/flow_runs")) { 54 - try api.flowRuns(request, database, target); 55 - } 56 - // legacy routes (no /api prefix) - prefect client uses these 57 - else if (mem.startsWith(u8, target, "/flows")) { 58 - try api.flows(request, database, target); 59 - } else if (mem.startsWith(u8, target, "/flow_runs")) { 60 - try api.flowRuns(request, database, target); 51 + } else if (mem.startsWith(u8, target, "/api/csrf-token") or mem.startsWith(u8, target, "/csrf-token")) { 52 + try api.csrfToken(request); 53 + } else if (mem.eql(u8, target, "/api/admin/version") or mem.eql(u8, target, "/admin/version")) { 54 + try api.version(request); 55 + } else if (mem.startsWith(u8, target, "/api/logs") or mem.startsWith(u8, target, "/logs")) { 56 + try api.logs.logs(request); 57 + } else if (mem.startsWith(u8, target, "/api/flows") or mem.startsWith(u8, target, "/flows")) { 58 + try api.flows.handle(request, database, target); 59 + } else if (mem.startsWith(u8, target, "/api/flow_runs") or mem.startsWith(u8, target, "/flow_runs")) { 60 + try api.flow_runs.handle(request, database, target); 61 61 } else { 62 62 try sendNotFound(request); 63 63 }