prefect server in zig
1const std = @import("std");
2const zap = @import("zap");
3const json = std.json;
4
5const db = @import("../db/sqlite.zig");
6const uuid_util = @import("../utilities/uuid.zig");
7const time_util = @import("../utilities/time.zig");
8const json_util = @import("../utilities/json.zig");
9const pool_helpers = @import("work_pools.zig");
10
11pub fn heartbeat(r: zap.Request, target: []const u8) !void {
12 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
13 defer arena.deinit();
14 const alloc = arena.allocator();
15
16 const pool_name = pool_helpers.extractPoolName(target) orelse {
17 json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request);
18 return;
19 };
20
21 const pool = db.work_pools.getByName(alloc, pool_name) catch null orelse {
22 json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found);
23 return;
24 };
25
26 const body = r.body orelse {
27 json_util.sendStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request);
28 return;
29 };
30
31 const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch {
32 json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
33 return;
34 };
35
36 const obj = parsed.value.object;
37
38 const worker_name = switch (obj.get("name") orelse {
39 json_util.sendStatus(r, "{\"detail\":\"name required\"}", .bad_request);
40 return;
41 }) {
42 .string => |s| s,
43 else => {
44 json_util.sendStatus(r, "{\"detail\":\"name must be string\"}", .bad_request);
45 return;
46 },
47 };
48
49 const heartbeat_interval = pool_helpers.getOptionalInt(obj.get("heartbeat_interval_seconds"));
50
51 var id_buf: [36]u8 = undefined;
52 const worker_id = uuid_util.generate(&id_buf);
53
54 var ts_buf: [32]u8 = undefined;
55 const now = time_util.timestamp(&ts_buf);
56
57 db.workers.upsertHeartbeat(worker_id, worker_name, pool.id, heartbeat_interval, now) catch {
58 json_util.sendStatus(r, "{\"detail\":\"heartbeat failed\"}", .internal_server_error);
59 return;
60 };
61
62 // Update pool status to READY if it was NOT_READY
63 if (pool.status == .not_ready) {
64 _ = db.work_pools.updateStatus(pool_name, .ready, now) catch {};
65 }
66
67 r.setStatus(.no_content);
68 r.sendBody("") catch {};
69}
70
71pub fn filter(r: zap.Request, target: []const u8) !void {
72 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
73 defer arena.deinit();
74 const alloc = arena.allocator();
75
76 const pool_name = pool_helpers.extractPoolName(target) orelse {
77 json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request);
78 return;
79 };
80
81 const pool = db.work_pools.getByName(alloc, pool_name) catch null orelse {
82 json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found);
83 return;
84 };
85
86 var limit: usize = 200;
87 var offset: usize = 0;
88
89 if (r.body) |body| {
90 if (json.parseFromSlice(json.Value, alloc, body, .{})) |parsed| {
91 const obj = parsed.value.object;
92 if (obj.get("limit")) |v| {
93 if (v == .integer) limit = @intCast(v.integer);
94 }
95 if (obj.get("offset")) |v| {
96 if (v == .integer) offset = @intCast(v.integer);
97 }
98 } else |_| {}
99 }
100
101 const workers_list = db.workers.listByPool(alloc, pool.id, limit, offset) catch {
102 json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error);
103 return;
104 };
105
106 var output: std.io.Writer.Allocating = .init(alloc);
107 var jw: json.Stringify = .{ .writer = &output.writer };
108
109 jw.beginArray() catch {
110 json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
111 return;
112 };
113
114 for (workers_list) |worker| {
115 writeWorkerObject(&jw, worker) catch continue;
116 }
117
118 jw.endArray() catch {};
119
120 json_util.send(r, output.toOwnedSlice() catch "[]");
121}
122
123pub fn delete(r: zap.Request, target: []const u8) !void {
124 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
125 defer arena.deinit();
126 const alloc = arena.allocator();
127
128 const pool_name = pool_helpers.extractPoolName(target) orelse {
129 json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request);
130 return;
131 };
132
133 const worker_name = pool_helpers.extractWorkerName(target) orelse {
134 json_util.sendStatus(r, "{\"detail\":\"worker name required\"}", .bad_request);
135 return;
136 };
137
138 const pool = db.work_pools.getByName(alloc, pool_name) catch null orelse {
139 json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found);
140 return;
141 };
142
143 const deleted = db.workers.deleteByPoolAndName(pool.id, worker_name) catch {
144 json_util.sendStatus(r, "{\"detail\":\"delete failed\"}", .internal_server_error);
145 return;
146 };
147
148 if (!deleted) {
149 json_util.sendStatus(r, "{\"detail\":\"Worker not found.\"}", .not_found);
150 return;
151 }
152
153 r.setStatus(.no_content);
154 r.sendBody("") catch {};
155}
156
157// JSON serialization
158
159fn writeWorkerObject(jw: *json.Stringify, worker: db.workers.WorkerRow) !void {
160 try jw.beginObject();
161
162 try jw.objectField("id");
163 try jw.write(worker.id);
164
165 try jw.objectField("created");
166 try jw.write(worker.created);
167
168 try jw.objectField("updated");
169 try jw.write(worker.updated);
170
171 try jw.objectField("name");
172 try jw.write(worker.name);
173
174 try jw.objectField("work_pool_id");
175 try jw.write(worker.work_pool_id);
176
177 try jw.objectField("last_heartbeat_time");
178 if (worker.last_heartbeat_time) |lh| {
179 try jw.write(lh);
180 } else {
181 try jw.write(null);
182 }
183
184 try jw.objectField("heartbeat_interval_seconds");
185 if (worker.heartbeat_interval_seconds) |hi| {
186 try jw.write(hi);
187 } else {
188 try jw.write(null);
189 }
190
191 try jw.objectField("status");
192 try jw.write(worker.status.toString());
193
194 try jw.endObject();
195}