prefect server in zig
1const std = @import("std");
2const zap = @import("zap");
3const mem = std.mem;
4
5const db = @import("../db/sqlite.zig");
6const json_util = @import("../utilities/json.zig");
7
/// Entry point for the task-run-state routes.
///
/// - Any method other than GET is answered with `.method_not_allowed`.
/// - A request that carries a query string is treated as
///   `GET /task_run_states/?task_run_id=...` and handed to `listByTaskRunId`.
/// - Otherwise the path is expected to be `GET /task_run_states/{id}` and is
///   handed to `getById`; a path without an id is answered with `.bad_request`.
///
/// A missing method is treated as "GET" and a missing path as "/".
pub fn handle(r: zap.Request) !void {
    const verb = r.method orelse "GET";
    if (!mem.eql(u8, verb, "GET")) {
        json_util.sendStatus(r, "{\"detail\":\"method not allowed\"}", .method_not_allowed);
        return;
    }

    // The mere presence of a query string selects the listing route.
    if (r.query != null) return listByTaskRunId(r);

    const path = r.path orelse "/";
    if (extractId(path)) |state_id| return getById(r, state_id);

    json_util.sendStatus(r, "{\"detail\":\"state id or task_run_id query param required\"}", .bad_request);
}
31
/// Extract the state id from a request path.
///
/// Recognises `/api/task_run_states/{id}` and `/task_run_states/{id}` and
/// returns everything after the matched prefix. The returned slice borrows
/// from `target`; nothing is allocated.
///
/// Returns null when `target` starts with neither prefix, when nothing
/// follows the prefix, or when the remainder begins with '?' (a query string
/// rather than an id).
fn extractId(target: []const u8) ?[]const u8 {
    // Longer prefix first so "/api/..." is never mistaken for the short form.
    const prefixes = [_][]const u8{ "/api/task_run_states/", "/task_run_states/" };

    for (prefixes) |prefix| {
        // Only slice once the prefix is known to match; slicing an unrelated
        // path at a fixed offset would hand back garbage as an id.
        if (!mem.startsWith(u8, target, prefix)) continue;

        const id = target[prefix.len..];
        if (id.len == 0 or id[0] == '?') return null;
        return id;
    }
    return null;
}
43
/// Look up a single task run state by id and write it to the response.
///
/// Responses:
/// - the state's JSON via `json_util.send` when found and serialized,
/// - `.not_found` when the database returns no state for `id`,
/// - `.internal_server_error` when the lookup or the serialization fails.
///
/// All temporary memory comes from a per-call arena that is released when the
/// function returns.
fn getById(r: zap.Request, id: []const u8) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const maybe_state = db.task_run_states.getById(alloc, id) catch {
        json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error);
        return;
    };

    const state = maybe_state orelse {
        json_util.sendStatus(r, "{\"detail\":\"Task run state not found\"}", .not_found);
        return;
    };

    // Report a serialization failure with an explicit error status, matching
    // the database-error branch above, rather than routing the error body
    // through the normal success path.
    // NOTE(review): assumes `json_util.send` replies with a success status — confirm.
    const body = state.toJson(alloc) catch {
        json_util.sendStatus(r, "{\"detail\":\"serialization error\"}", .internal_server_error);
        return;
    };

    json_util.send(r, body);
}
60
/// List every state recorded for one task run as a JSON array.
///
/// The task run is identified by the `task_run_id` query parameter.
///
/// Responses:
/// - a JSON array of states via `json_util.send`,
/// - `.bad_request` when `task_run_id` is absent from the query string,
/// - `.internal_server_error` when the database lookup fails.
///
/// A state that fails to serialize is logged and left out of the array; the
/// remaining states are still returned. Errors while growing the output
/// buffer are propagated to the caller.
///
/// All temporary memory comes from a per-call arena that is released when the
/// function returns.
fn listByTaskRunId(r: zap.Request) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const query = r.query orelse "";

    // parse task_run_id from query string
    const task_run_id = parseQueryParam(query, "task_run_id") orelse {
        json_util.sendStatus(r, "{\"detail\":\"task_run_id query param required\"}", .bad_request);
        return;
    };

    const states = db.task_run_states.listByTaskRunId(alloc, task_run_id) catch {
        json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error);
        return;
    };

    // build JSON array response
    var output = std.ArrayListUnmanaged(u8){};
    try output.append(alloc, '[');

    // Count elements actually written, not loop iterations: the separator
    // must only precede an element that made it into the output, otherwise a
    // skipped state leaves a dangling comma and the array is invalid JSON.
    var emitted: usize = 0;
    for (states) |s| {
        const state_json = s.toJson(alloc) catch |err| {
            std.log.warn("skipping task run state that failed to serialize: {s}", .{@errorName(err)});
            continue;
        };
        if (emitted > 0) try output.append(alloc, ',');
        try output.appendSlice(alloc, state_json);
        emitted += 1;
    }
    try output.append(alloc, ']');

    json_util.send(r, output.items);
}
91
/// Return the raw value of the first `name=value` pair in `query`.
///
/// `query` is an `&`-separated query string; a single leading '?' is
/// tolerated. The key must equal `name` exactly, so looking up `task_run_id`
/// does not match `other_task_run_id=...` or text inside another pair's value.
/// The value is returned as-is (no percent-decoding) and borrows from `query`;
/// nothing is allocated.
///
/// Returns null when no pair has the key `name`, or when the first such pair
/// has an empty value.
fn parseQueryParam(query: []const u8, name: []const u8) ?[]const u8 {
    const pairs_text = if (query.len > 0 and query[0] == '?') query[1..] else query;

    var pairs = mem.splitScalar(u8, pairs_text, '&');
    while (pairs.next()) |pair| {
        // A pair without '=' carries no value and cannot match.
        const eq = mem.indexOfScalar(u8, pair, '=') orelse continue;
        if (!mem.eql(u8, pair[0..eq], name)) continue;

        const value = pair[eq + 1 ..];
        // Treat `name=` as missing regardless of where it sits in the query.
        return if (value.len == 0) null else value;
    }
    return null;
}