prefect server in zig
1const std = @import("std");
2const zap = @import("zap");
3const mem = std.mem;
4const json = std.json;
5
6const db = @import("../db/sqlite.zig");
7const uuid_util = @import("../utilities/uuid.zig");
8const time_util = @import("../utilities/time.zig");
9const json_util = @import("../utilities/json.zig");
10
11// sub-handlers
12const queues = @import("work_pool_queues.zig");
13const workers = @import("work_pool_workers.zig");
14const schedule = @import("work_pool_schedule.zig");
15
/// Routes a work-pool request to the matching pool, queue, worker, or
/// scheduling handler, based on the HTTP method and the request path.
///
/// A missing path is treated as "/" and a missing method as "GET". Requests
/// that match no route receive a 501 "not implemented" response.
pub fn handle(r: zap.Request) !void {
    const path = r.path orelse "/";
    const verb = r.method orelse "GET";

    // Position of the first "/queues/" segment, shared by several routes below.
    const queues_at = mem.indexOf(u8, path, "/queues/");

    if (mem.eql(u8, verb, "POST")) {
        // POST .../filter - list queues, workers, or pools
        if (mem.endsWith(u8, path, "/filter")) {
            if (mem.indexOf(u8, path, "/queues/filter") != null) return queues.filter(r, path);
            if (mem.indexOf(u8, path, "/workers/filter") != null) return workers.filter(r, path);
            return filterPools(r);
        }

        // POST /work_pools/{name}/workers/heartbeat
        if (mem.indexOf(u8, path, "/workers/heartbeat") != null) return workers.heartbeat(r, path);

        // POST /work_pools/{name}/get_scheduled_flow_runs
        if (mem.endsWith(u8, path, "/get_scheduled_flow_runs")) return schedule.handle(r, path);

        // POST /work_pools/{name}/queues/ - create queue.
        // Only matches when nothing (or a single "/") follows "/queues/".
        if (queues_at) |idx| {
            const rest = path[idx + "/queues/".len ..];
            if (rest.len == 0 or mem.eql(u8, rest, "/")) return queues.create(r, path);
        }

        // POST /work_pools/ - create pool
        if (mem.endsWith(u8, path, "/work_pools/") or mem.endsWith(u8, path, "/work_pools")) {
            return createPool(r);
        }
    } else if (mem.eql(u8, verb, "GET")) {
        // GET /work_pools/{name}/queues/{queue_name}
        if (queues_at != null) return queues.get(r, path);

        // GET /work_pools/{name}
        const name = extractPoolName(path) orelse {
            json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request);
            return;
        };
        return getPool(r, name);
    } else if (mem.eql(u8, verb, "PATCH")) {
        // PATCH /work_pools/{name}/queues/{queue_name}
        if (queues_at != null) return queues.update(r, path);

        // PATCH /work_pools/{name}
        const name = extractPoolName(path) orelse {
            json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request);
            return;
        };
        return updatePool(r, name);
    } else if (mem.eql(u8, verb, "DELETE")) {
        // DELETE /work_pools/{name}/queues/{queue_name}
        if (queues_at != null) return queues.delete(r, path);

        // DELETE /work_pools/{name}/workers/{worker_name}
        if (mem.indexOf(u8, path, "/workers/") != null) return workers.delete(r, path);

        // DELETE /work_pools/{name}
        const name = extractPoolName(path) orelse {
            json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request);
            return;
        };
        return deletePool(r, name);
    }

    // Unknown method, or a POST that matched none of the routes above.
    json_util.sendStatus(r, "{\"detail\":\"not implemented\"}", .not_implemented);
}
124
125// Path extraction helpers (pub for sub-handlers)
126
/// Returns the path segment that follows a leading `/api/work_pools/` or
/// `/work_pools/` prefix, i.e. the pool name.
///
/// Returns null when `target` starts with neither prefix or when the segment
/// after the prefix is empty. The result is a slice into `target`.
pub fn extractPoolName(target: []const u8) ?[]const u8 {
    const prefixes = [_][]const u8{ "/api/work_pools/", "/work_pools/" };
    const rest = for (prefixes) |prefix| {
        if (mem.startsWith(u8, target, prefix)) break target[prefix.len..];
    } else return null;

    // The name runs up to the next '/' or to the end of the path.
    const name = rest[0 .. mem.indexOfScalar(u8, rest, '/') orelse rest.len];
    return if (name.len == 0) null else name;
}
143
/// Returns the path segment that follows the first `/queues/` in `target`,
/// i.e. the queue name.
///
/// Returns null when `/queues/` is absent or the segment after it is empty.
/// The result is a slice into `target`.
pub fn extractQueueName(target: []const u8) ?[]const u8 {
    const marker = "/queues/";
    const at = mem.indexOf(u8, target, marker) orelse return null;
    const rest = target[at + marker.len ..];

    // The name runs up to the next '/' or to the end of the path.
    const name = rest[0 .. mem.indexOfScalar(u8, rest, '/') orelse rest.len];
    return if (name.len == 0) null else name;
}
155
/// Returns the path segment that follows the first `/workers/` in `target`,
/// i.e. the worker name.
///
/// Returns null when `/workers/` is absent, when the segment after it is
/// empty, or when the segment is exactly `heartbeat` or `filter` (those are
/// action endpoints, not worker names). The result is a slice into `target`.
pub fn extractWorkerName(target: []const u8) ?[]const u8 {
    const marker = "/workers/";
    const at = mem.indexOf(u8, target, marker) orelse return null;
    const rest = target[at + marker.len ..];

    // The name runs up to the next '/' or to the end of the path.
    const name = rest[0 .. mem.indexOfScalar(u8, rest, '/') orelse rest.len];
    if (name.len == 0) return null;

    // Compare the whole segment rather than a prefix, so that workers whose
    // names merely begin with "heartbeat" or "filter" are still addressable.
    if (mem.eql(u8, name, "heartbeat") or mem.eql(u8, name, "filter")) return null;

    return name;
}
169
/// Reports whether `name` begins with "prefect", compared ASCII
/// case-insensitively. Callers in this file reject creating, deleting, and
/// some updates of pools whose names match.
fn isReservedPool(name: []const u8) bool {
    return std.ascii.startsWithIgnoreCase(name, "prefect");
}
178
179// JSON helpers (pub for sub-handlers)
180
/// Returns the string payload of `val`, or null when `val` is null or holds
/// any non-string JSON value.
pub fn getOptionalString(val: ?json.Value) ?[]const u8 {
    const v = val orelse return null;
    return if (v == .string) v.string else null;
}
190
/// Returns the boolean payload of `val`, or null when `val` is null or holds
/// any non-boolean JSON value.
pub fn getOptionalBool(val: ?json.Value) ?bool {
    const v = val orelse return null;
    return if (v == .bool) v.bool else null;
}
200
/// Returns the integer payload of `val`, or null when `val` is null or holds
/// any non-integer JSON value (floats included).
pub fn getOptionalInt(val: ?json.Value) ?i64 {
    const v = val orelse return null;
    return if (v == .integer) v.integer else null;
}
210
/// Serializes `val` to JSON text allocated from `alloc`.
///
/// Returns `default` when `val` is null or when formatting/allocation fails.
fn stringifyField(alloc: std.mem.Allocator, val: ?json.Value, default: []const u8) []const u8 {
    const v = val orelse return default;
    return std.fmt.allocPrint(alloc, "{f}", .{json.fmt(v, .{})}) catch default;
}
217
/// Serializes `val` to JSON text allocated from `alloc`.
///
/// Returns null when `val` is null or when formatting/allocation fails.
fn stringifyFieldOptional(alloc: std.mem.Allocator, val: ?json.Value) ?[]const u8 {
    const v = val orelse return null;
    return std.fmt.allocPrint(alloc, "{f}", .{json.fmt(v, .{})}) catch null;
}
224
225// Pool CRUD handlers
226
/// Handles `POST /work_pools/`: creates a work pool and its "default" queue
/// from the JSON request body, then responds with the stored pool.
///
/// Responses:
/// - 400 when the body is missing, is not valid JSON, is not a JSON object,
///   or lacks a string `name`.
/// - 403 when `name` is reserved (see `isReservedPool`).
/// - 409 when a pool with that name already exists.
/// - 500 when an insert, the read-back, or serialization fails.
/// - 201 with the serialized pool on success.
///
/// All temporary allocations live in a per-request arena.
fn createPool(r: zap.Request) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const body = r.body orelse {
        json_util.sendStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request);
        return;
    };

    const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch {
        json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
        return;
    };

    // The body is client-controlled: valid JSON that is not an object (e.g.
    // `[]` or `"x"`) must be rejected rather than read through the wrong
    // union field.
    const obj = switch (parsed.value) {
        .object => |o| o,
        else => {
            json_util.sendStatus(r, "{\"detail\":\"request body must be a JSON object\"}", .bad_request);
            return;
        },
    };

    const name = switch (obj.get("name") orelse {
        json_util.sendStatus(r, "{\"detail\":\"name required\"}", .bad_request);
        return;
    }) {
        .string => |s| s,
        else => {
            json_util.sendStatus(r, "{\"detail\":\"name must be string\"}", .bad_request);
            return;
        },
    };

    if (isReservedPool(name)) {
        json_util.sendStatus(r, "{\"detail\":\"Work pools starting with 'Prefect' are reserved.\"}", .forbidden);
        return;
    }

    // A lookup error is treated the same as "no such pool".
    if (db.work_pools.getByName(alloc, name) catch null) |_| {
        const fallback: []const u8 = "{\"detail\":\"Work pool already exists\"}";
        // Serialize the detail through the JSON encoder so that quotes or
        // backslashes in the client-supplied name cannot break the response.
        const err_msg: []const u8 = blk: {
            const detail = std.fmt.allocPrint(alloc, "Work pool '{s}' already exists.", .{name}) catch break :blk fallback;
            break :blk std.fmt.allocPrint(alloc, "{{\"detail\":{f}}}", .{json.fmt(detail, .{})}) catch fallback;
        };
        json_util.sendStatus(r, err_msg, .conflict);
        return;
    }

    // Optional fields fall back to defaults when absent or of the wrong type.
    const description = getOptionalString(obj.get("description"));
    const pool_type = getOptionalString(obj.get("type")) orelse "process";
    const base_job_template = stringifyField(alloc, obj.get("base_job_template"), "{}");
    const is_paused = getOptionalBool(obj.get("is_paused")) orelse false;
    const concurrency_limit = getOptionalInt(obj.get("concurrency_limit"));

    var pool_id_buf: [36]u8 = undefined;
    const pool_id = uuid_util.generate(&pool_id_buf);

    var queue_id_buf: [36]u8 = undefined;
    const queue_id = uuid_util.generate(&queue_id_buf);

    var ts_buf: [32]u8 = undefined;
    const now = time_util.timestamp(&ts_buf);

    const status: db.work_pools.Status = if (is_paused) .paused else .not_ready;

    db.work_pools.insert(
        pool_id,
        name,
        description,
        pool_type,
        base_job_template,
        is_paused,
        concurrency_limit,
        queue_id,
        status,
        now,
    ) catch {
        json_util.sendStatus(r, "{\"detail\":\"insert failed\"}", .internal_server_error);
        return;
    };

    // NOTE(review): if this second insert fails, the pool row written above
    // is not rolled back here — confirm whether the db layer handles that.
    db.work_queues.insert(queue_id, "default", "", false, null, 1, pool_id, .not_ready, now) catch {
        json_util.sendStatus(r, "{\"detail\":\"failed to create default queue\"}", .internal_server_error);
        return;
    };

    // Read the row back so the response reflects what was stored.
    const pool = db.work_pools.getByName(alloc, name) catch null orelse {
        json_util.sendStatus(r, "{\"detail\":\"pool not found after insert\"}", .internal_server_error);
        return;
    };

    const resp = writePool(alloc, pool) catch {
        json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    };
    json_util.sendStatus(r, resp, .created);
}
318
/// Handles `GET /work_pools/{name}`: responds with the serialized pool, 404
/// when no pool is found (a lookup error is treated the same way), or 500
/// when serialization fails.
fn getPool(r: zap.Request, name: []const u8) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const pool = (db.work_pools.getByName(alloc, name) catch null) orelse {
        json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found);
        return;
    };

    const resp = writePool(alloc, pool) catch {
        json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    };
    json_util.send(r, resp);
}
334
/// Handles `PATCH /work_pools/{name}`: applies the fields present in the JSON
/// body to the named pool.
///
/// Responses:
/// - 400 when the body is missing, is not valid JSON, or is not a JSON object.
/// - 404 when the pool does not exist (before or during the update).
/// - 403 when the pool is reserved and the body sets `description` or
///   `base_job_template`.
/// - 500 when the update fails.
/// - 204 with an empty body on success.
///
/// Pausing sets the status to `.paused`. Un-pausing a paused pool sets it to
/// `.ready` or `.not_ready` depending on `hasOnlineWorkers`; a failure of
/// that check counts as "no workers".
fn updatePool(r: zap.Request, name: []const u8) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const body = r.body orelse {
        json_util.sendStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request);
        return;
    };

    const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch {
        json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
        return;
    };

    // The body is client-controlled: valid JSON that is not an object (e.g.
    // `[]` or `"x"`) must be rejected rather than read through the wrong
    // union field.
    const obj = switch (parsed.value) {
        .object => |o| o,
        else => {
            json_util.sendStatus(r, "{\"detail\":\"request body must be a JSON object\"}", .bad_request);
            return;
        },
    };

    const existing = db.work_pools.getByName(alloc, name) catch null orelse {
        json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found);
        return;
    };

    // Reserved pools may not have their description or job template changed.
    if (isReservedPool(name)) {
        const has_other = obj.get("description") != null or obj.get("base_job_template") != null;
        if (has_other) {
            json_util.sendStatus(r, "{\"detail\":\"Cannot modify reserved work pool.\"}", .forbidden);
            return;
        }
    }

    const description = getOptionalString(obj.get("description"));
    const base_job_template = stringifyFieldOptional(alloc, obj.get("base_job_template"));
    const is_paused = getOptionalBool(obj.get("is_paused"));
    const concurrency_limit = getOptionalInt(obj.get("concurrency_limit"));

    // Status only changes when `is_paused` is supplied; un-pausing a pool
    // that was not paused leaves the status untouched.
    var new_status: ?db.work_pools.Status = null;
    if (is_paused) |paused| {
        if (paused) {
            new_status = .paused;
        } else if (existing.status == .paused) {
            const has_workers = db.work_pools.hasOnlineWorkers(existing.id) catch false;
            new_status = if (has_workers) .ready else .not_ready;
        }
    }

    var ts_buf: [32]u8 = undefined;
    const now = time_util.timestamp(&ts_buf);

    const did_update = db.work_pools.updateByName(
        name,
        description,
        base_job_template,
        is_paused,
        concurrency_limit,
        new_status,
        now,
    ) catch {
        json_util.sendStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error);
        return;
    };

    if (!did_update) {
        json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found);
        return;
    }

    r.setStatus(.no_content);
    // The update is already committed; a failed send is logged, not propagated.
    r.sendBody("") catch |err| {
        std.log.warn("updatePool: failed to send 204 response: {s}", .{@errorName(err)});
    };
}
404
/// Handles `DELETE /work_pools/{name}`.
///
/// Responses: 403 when the name is reserved, 500 when the delete fails, 404
/// when nothing was deleted, and 204 with an empty body on success.
fn deletePool(r: zap.Request, name: []const u8) !void {
    if (isReservedPool(name)) {
        json_util.sendStatus(r, "{\"detail\":\"Cannot delete reserved work pool.\"}", .forbidden);
        return;
    }

    const deleted = db.work_pools.deleteByName(name) catch {
        json_util.sendStatus(r, "{\"detail\":\"delete failed\"}", .internal_server_error);
        return;
    };

    if (!deleted) {
        json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found);
        return;
    }

    r.setStatus(.no_content);
    // The delete is already committed; a failed send is logged, not propagated.
    r.sendBody("") catch |err| {
        std.log.warn("deletePool: failed to send 204 response: {s}", .{@errorName(err)});
    };
}
424
/// Handles `POST /work_pools/filter`: responds with a JSON array of pools.
///
/// `limit` (default 200) and `offset` (default 0) are read from the JSON
/// body when present. A missing body, invalid JSON, a non-object body, or a
/// value that is not a non-negative integer fitting `usize` leaves the
/// corresponding default in place.
///
/// Responds 500 when the database query or serialization fails.
fn filterPools(r: zap.Request) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    var limit: usize = 200;
    var offset: usize = 0;

    if (r.body) |body| {
        if (json.parseFromSlice(json.Value, alloc, body, .{})) |parsed| {
            switch (parsed.value) {
                .object => |obj| {
                    // Checked casts: a negative or oversized client value must
                    // not reach `@intCast`, which would panic (or be undefined
                    // behavior in ReleaseFast).
                    if (obj.get("limit")) |v| {
                        if (v == .integer) limit = std.math.cast(usize, v.integer) orelse limit;
                    }
                    if (obj.get("offset")) |v| {
                        if (v == .integer) offset = std.math.cast(usize, v.integer) orelse offset;
                    }
                },
                // Valid JSON that is not an object carries no filter options.
                else => {},
            }
        } else |_| {}
    }

    const pools = db.work_pools.list(alloc, limit, offset) catch {
        json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error);
        return;
    };

    var output: std.io.Writer.Allocating = .init(alloc);
    var jw: json.Stringify = .{ .writer = &output.writer };

    // Any write failure aborts the whole response: skipping a half-written
    // object would leave malformed JSON in the buffer.
    const serialized: ?[]const u8 = blk: {
        jw.beginArray() catch break :blk null;
        for (pools) |pool| {
            writePoolObject(&jw, pool) catch break :blk null;
        }
        jw.endArray() catch break :blk null;
        break :blk output.toOwnedSlice() catch null;
    };

    const resp = serialized orelse {
        json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    };
    json_util.send(r, resp);
}
466
467// JSON serialization
468
/// Serializes one pool row to a JSON object string allocated from `alloc`.
/// Returns an error when writing or taking ownership of the buffer fails.
fn writePool(alloc: std.mem.Allocator, pool: db.work_pools.WorkPoolRow) ![]const u8 {
    var buffer: std.io.Writer.Allocating = .init(alloc);
    var stringify: json.Stringify = .{ .writer = &buffer.writer };

    try writePoolObject(&stringify, pool);

    const bytes = try buffer.toOwnedSlice();
    return bytes;
}
475
/// Writes `pool` to `jw` as a single JSON object with the fields `id`,
/// `created`, `updated`, `name`, `description`, `type`, `base_job_template`,
/// `is_paused`, `concurrency_limit`, `default_queue_id`, and `status`, in
/// that order.
///
/// `base_job_template` is emitted verbatim, without re-encoding. Optional
/// fields are written as JSON `null` when unset.
fn writePoolObject(jw: *json.Stringify, pool: db.work_pools.WorkPoolRow) !void {
    try jw.beginObject();

    try jw.objectField("id");
    try jw.write(pool.id);
    try jw.objectField("created");
    try jw.write(pool.created);
    try jw.objectField("updated");
    try jw.write(pool.updated);
    try jw.objectField("name");
    try jw.write(pool.name);

    // `write` emits `null` for an unset optional and the payload otherwise.
    try jw.objectField("description");
    try jw.write(pool.description);

    try jw.objectField("type");
    try jw.write(pool.type);

    // The template is spliced into the stream as raw text.
    try jw.objectField("base_job_template");
    try jw.beginWriteRaw();
    try jw.writer.writeAll(pool.base_job_template);
    jw.endWriteRaw();

    try jw.objectField("is_paused");
    try jw.write(pool.is_paused);

    try jw.objectField("concurrency_limit");
    try jw.write(pool.concurrency_limit);

    try jw.objectField("default_queue_id");
    try jw.write(pool.default_queue_id);

    try jw.objectField("status");
    try jw.write(pool.status.toString());

    try jw.endObject();
}