prefect server in zig
1const std = @import("std");
2const zap = @import("zap");
3const json = std.json;
4
5const db = @import("../db/sqlite.zig");
6const time_util = @import("../utilities/time.zig");
7const json_util = @import("../utilities/json.zig");
8
/// Handle POST /api/work_pools/{name}/get_scheduled_flow_runs
///
/// Looks up the work pool named in `target`, reads optional filters from the
/// JSON request body, fetches the matching scheduled flow runs and replies
/// with a JSON array of `{work_pool_id, work_queue_id, flow_run}` objects.
///
/// Recognised body fields (all optional; a missing, malformed or non-object
/// body is treated as "no filters"):
///   - `limit`: integer, maximum number of runs to return (default 200).
///     Negative or out-of-range values are ignored and the default is kept.
///   - `scheduled_before`: string; defaults to the current timestamp.
///   - `work_queue_names`: array of strings; when empty or absent, every
///     queue in the pool (up to 1000) is polled. Non-string entries are skipped.
///
/// Side effects: updates `last_polled`/status for each polled queue, flips a
/// `not_ready` pool to `ready`, and marks deployments for the polled queues
/// ready. These updates are best-effort; failures are logged, not returned.
///
/// Responses: 400 when no pool name can be extracted, 404 when the pool
/// lookup yields nothing (including lookup errors), 500 on database or
/// serialization failure. Returns an error only on allocation failure.
pub fn handle(r: zap.Request, target: []const u8) !void {
    // Per-request arena: everything allocated below is released on return.
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const pool_name = extractPoolName(target) orelse {
        json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request);
        return;
    };

    const pool = db.work_pools.getByName(alloc, pool_name) catch null orelse {
        json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found);
        return;
    };

    // Parse request body
    var limit: usize = 200;
    var scheduled_before: ?[]const u8 = null;
    var work_queue_names: std.ArrayListUnmanaged([]const u8) = .empty;

    if (r.body) |body| {
        if (json.parseFromSlice(json.Value, alloc, body, .{})) |parsed| {
            // Only a JSON object can carry filters. Checking the tag first
            // avoids an illegal union access on bodies such as `[]` or `1`.
            switch (parsed.value) {
                .object => |obj| {
                    if (obj.get("limit")) |v| {
                        // A checked cast keeps the default for negative or
                        // oversized values instead of tripping @intCast.
                        if (v == .integer) limit = std.math.cast(usize, v.integer) orelse limit;
                    }

                    if (obj.get("scheduled_before")) |v| {
                        if (v == .string) scheduled_before = v.string;
                    }

                    if (obj.get("work_queue_names")) |v| {
                        if (v == .array) {
                            for (v.array.items) |item| {
                                if (item == .string) {
                                    try work_queue_names.append(alloc, item.string);
                                }
                            }
                        }
                    }
                },
                else => {},
            }
        } else |err| switch (err) {
            // Allocation failure is not a client error; let it propagate.
            error.OutOfMemory => return err,
            // A malformed body is deliberately treated as an empty one.
            else => {},
        }
    }

    // Default scheduled_before to now if not provided
    var ts_buf: [32]u8 = undefined;
    const now = time_util.timestamp(&ts_buf);
    if (scheduled_before == null) {
        scheduled_before = now;
    }

    // Get work queues
    var queue_ids: std.ArrayListUnmanaged([]const u8) = .empty;
    var work_queues: std.ArrayListUnmanaged(db.work_queues.WorkQueueRow) = .empty;

    if (work_queue_names.items.len > 0) {
        // Filter by specific queue names; names that do not resolve (or whose
        // lookup fails) are skipped.
        for (work_queue_names.items) |qname| {
            if (db.work_queues.getByPoolAndName(alloc, pool.id, qname) catch null) |queue| {
                try queue_ids.append(alloc, queue.id);
                try work_queues.append(alloc, queue);
            }
        }
    } else {
        // Get all queues in pool
        const all_queues = db.work_queues.listByPool(alloc, pool.id, 1000, 0) catch {
            json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error);
            return;
        };
        for (all_queues) |queue| {
            try queue_ids.append(alloc, queue.id);
            try work_queues.append(alloc, queue);
        }
    }

    // Get scheduled flow runs
    const flow_runs = db.flow_runs.getScheduledByWorkQueues(
        alloc,
        queue_ids.items,
        scheduled_before,
        limit,
    ) catch {
        json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error);
        return;
    };

    // Update last_polled for work queues and mark as ready if needed.
    // Best-effort: a failed bookkeeping write must not fail the poll.
    for (work_queues.items) |queue| {
        const new_status: db.work_queues.Status = if (queue.is_paused) .paused else .ready;
        _ = db.work_queues.updateLastPolled(pool.id, queue.name, now, new_status) catch |err|
            std.log.warn("get_scheduled_flow_runs: updating last_polled failed: {s}", .{@errorName(err)});
    }

    // Mark pool as ready if it was not_ready
    if (pool.status == .not_ready) {
        _ = db.work_pools.updateStatus(pool_name, .ready, now) catch |err|
            std.log.warn("get_scheduled_flow_runs: updating pool status failed: {s}", .{@errorName(err)});
    }

    // Mark deployments as ready (deployments for the queues we polled)
    db.deployments.markReadyByWorkQueues(queue_ids.items, now) catch |err|
        std.log.warn("get_scheduled_flow_runs: marking deployments ready failed: {s}", .{@errorName(err)});

    // Build response as WorkerFlowRunResponse[]
    var output: std.io.Writer.Allocating = .init(alloc);
    var jw: json.Stringify = .{ .writer = &output.writer };

    // Any write failure leaves the stream mid-value, so the whole response is
    // abandoned rather than skipping an element and sending corrupt JSON.
    const serialized = blk: {
        jw.beginArray() catch break :blk false;
        for (flow_runs) |run| {
            writeWorkerFlowRunResponse(&jw, pool.id, run) catch break :blk false;
        }
        jw.endArray() catch break :blk false;
        break :blk true;
    };
    if (!serialized) {
        json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    }

    const payload = output.toOwnedSlice() catch {
        json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    };
    json_util.send(r, payload);
}
127
/// Return the `{name}` path segment of `/api/work_pools/{name}/...`.
///
/// Yields null when `target` does not start with `/api/work_pools/`, when no
/// `/` follows the name, or when the name segment is empty. The result is a
/// sub-slice of `target`; nothing is copied or decoded.
fn extractPoolName(target: []const u8) ?[]const u8 {
    const route_prefix = "/api/work_pools/";
    if (!std.mem.startsWith(u8, target, route_prefix)) return null;

    const tail = target[route_prefix.len..];
    // The name runs up to the next path separator, which must be present.
    const slash_at = std.mem.indexOfScalar(u8, tail, '/') orelse return null;

    return if (slash_at == 0) null else tail[0..slash_at];
}
138
/// Emit one response element: an object holding the pool id, the run's work
/// queue id (JSON null when the run has none) and the run itself under
/// "flow_run". Any error from the underlying JSON writer is returned as-is.
fn writeWorkerFlowRunResponse(jw: *json.Stringify, work_pool_id: []const u8, run: db.flow_runs.FlowRunRow) !void {
    try jw.beginObject();
    {
        try jw.objectField("work_pool_id");
        try jw.write(work_pool_id);

        try jw.objectField("work_queue_id");
        if (run.work_queue_id) |queue_id| {
            try jw.write(queue_id);
        } else {
            try jw.write(null);
        }

        // The nested object is produced by the dedicated flow-run writer.
        try jw.objectField("flow_run");
        try writeFlowRunObject(jw, run);
    }
    try jw.endObject();
}
153
/// Write `key` followed by `value` serialized through `jw.write`.
/// Optional values come out as their payload, or JSON null when unset.
fn writeField(jw: *json.Stringify, key: []const u8, value: anytype) !void {
    try jw.objectField(key);
    try jw.write(value);
}

/// Write `key` followed by `raw` copied into the output verbatim (no quoting
/// or escaping). NOTE(review): assumes `raw` already holds JSON text — confirm
/// against how the row is stored.
fn writeRawField(jw: *json.Stringify, key: []const u8, raw: []const u8) !void {
    try jw.objectField(key);
    try jw.beginWriteRaw();
    try jw.writer.writeAll(raw);
    jw.endWriteRaw();
}

/// Serialize `run` as a single JSON object with a fixed field order.
///
/// `parameters` and `tags` are passed through verbatim; every other field is
/// encoded by the JSON writer. Writer errors are returned to the caller.
fn writeFlowRunObject(jw: *json.Stringify, run: db.flow_runs.FlowRunRow) !void {
    try jw.beginObject();

    try writeField(jw, "id", run.id);
    try writeField(jw, "created", run.created);
    try writeField(jw, "updated", run.updated);
    try writeField(jw, "name", run.name);
    try writeField(jw, "flow_id", run.flow_id);

    try writeField(jw, "state_type", run.state_type);
    try writeField(jw, "state_name", run.state_name);
    try writeField(jw, "state_timestamp", run.state_timestamp);

    // Copied through as-is rather than re-encoded.
    try writeRawField(jw, "parameters", run.parameters);
    try writeRawField(jw, "tags", run.tags);

    try writeField(jw, "run_count", run.run_count);

    // The optional fields below serialize as their value or JSON null.
    try writeField(jw, "expected_start_time", run.expected_start_time);
    try writeField(jw, "start_time", run.start_time);
    try writeField(jw, "end_time", run.end_time);

    try writeField(jw, "total_run_time", run.total_run_time);

    try writeField(jw, "deployment_id", run.deployment_id);
    try writeField(jw, "deployment_version", run.deployment_version);
    try writeField(jw, "work_queue_name", run.work_queue_name);
    try writeField(jw, "work_queue_id", run.work_queue_id);

    try writeField(jw, "auto_scheduled", run.auto_scheduled);

    try jw.endObject();
}