const std = @import("std");
const zap = @import("zap");
const mem = std.mem;
const json = std.json;
const db = @import("../db/sqlite.zig");
const uuid_util = @import("../utilities/uuid.zig");
const time_util = @import("../utilities/time.zig");
const json_util = @import("../utilities/json.zig");

// Sub-handlers for nested work-pool resources.
const queues = @import("work_pool_queues.zig");
const workers = @import("work_pool_workers.zig");
const schedule = @import("work_pool_schedule.zig");

/// Route an incoming request under /work_pools to the matching handler.
/// Dispatch order is significant: the most specific path suffixes
/// ("/filter", "/workers/heartbeat", "/get_scheduled_flow_runs") are
/// matched before the generic queue/worker/pool CRUD routes, and queue
/// creation is matched before pool creation. Unmatched requests get 501.
pub fn handle(r: zap.Request) !void {
    const target = r.path orelse "/";
    const method = r.method orelse "GET";

    // POST .../filter - list work pools, queues, or workers.
    if (mem.eql(u8, method, "POST") and mem.endsWith(u8, target, "/filter")) {
        if (mem.indexOf(u8, target, "/queues/filter") != null) {
            try queues.filter(r, target);
            return;
        }
        if (mem.indexOf(u8, target, "/workers/filter") != null) {
            try workers.filter(r, target);
            return;
        }
        try filterPools(r);
        return;
    }

    // POST /work_pools/{name}/workers/heartbeat
    if (mem.eql(u8, method, "POST") and mem.indexOf(u8, target, "/workers/heartbeat") != null) {
        try workers.heartbeat(r, target);
        return;
    }

    // POST /work_pools/{name}/get_scheduled_flow_runs
    if (mem.eql(u8, method, "POST") and mem.endsWith(u8, target, "/get_scheduled_flow_runs")) {
        try schedule.handle(r, target);
        return;
    }

    // POST /work_pools/{name}/queues/ - create queue. Only fires when
    // nothing (or a lone "/") follows the "/queues/" segment; otherwise
    // fall through so other routes can claim the path.
    if (mem.eql(u8, method, "POST") and mem.indexOf(u8, target, "/queues/") != null) {
        const queues_idx = mem.indexOf(u8, target, "/queues/") orelse {
            json_util.sendStatus(r, "{\"detail\":\"invalid path\"}", .bad_request);
            return;
        };
        const after_queues = target[queues_idx + 8 ..]; // 8 == "/queues/".len
        if (after_queues.len == 0 or mem.eql(u8, after_queues, "/")) {
            try queues.create(r, target);
            return;
        }
    }

    // POST /work_pools/ - create pool (root collection only).
    if (mem.eql(u8, method, "POST")) {
        const is_root = mem.endsWith(u8, target, "/work_pools/") or
            mem.endsWith(u8, target, "/work_pools");
        if (is_root) {
            try createPool(r);
            return;
        }
    }

    // GET /work_pools/{name}/queues/{queue_name}
    if (mem.eql(u8, method, "GET") and mem.indexOf(u8, target, "/queues/") != null) {
        try queues.get(r, target);
        return;
    }

    // PATCH /work_pools/{name}/queues/{queue_name}
    if (mem.eql(u8, method, "PATCH") and mem.indexOf(u8, target, "/queues/") != null) {
        try queues.update(r, target);
        return;
    }

    // DELETE /work_pools/{name}/queues/{queue_name}
    if (mem.eql(u8, method, "DELETE") and mem.indexOf(u8, target, "/queues/") != null) {
        try queues.delete(r, target);
        return;
    }

    // DELETE /work_pools/{name}/workers/{worker_name}
    if (mem.eql(u8, method, "DELETE") and mem.indexOf(u8, target, "/workers/") != null) {
        try workers.delete(r, target);
        return;
    }

    // GET /work_pools/{name}
    if (mem.eql(u8, method, "GET")) {
        const name = extractPoolName(target) orelse {
            json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request);
            return;
        };
        try getPool(r, name);
        return;
    }

    // PATCH /work_pools/{name}
    if (mem.eql(u8, method, "PATCH")) {
        const name = extractPoolName(target) orelse {
            json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request);
            return;
        };
        try updatePool(r, name);
        return;
    }

    // DELETE /work_pools/{name}
    if (mem.eql(u8, method, "DELETE")) {
        const name = extractPoolName(target) orelse {
            json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request);
            return;
        };
        try deletePool(r, name);
        return;
    }

    json_util.sendStatus(r, "{\"detail\":\"not implemented\"}", .not_implemented);
}

// Path extraction helpers (pub for sub-handlers).

/// Return the pool name segment from "/work_pools/{name}..." or
/// "/api/work_pools/{name}...", or null when no non-empty name follows
/// the prefix. The returned slice aliases `target`.
pub fn extractPoolName(target: []const u8) ?[]const u8 {
    const prefix = if (mem.startsWith(u8, target, "/api/work_pools/"))
        "/api/work_pools/"
    else if (mem.startsWith(u8, target, "/work_pools/"))
        "/work_pools/"
    else
        return null;
    if (target.len <= prefix.len) return null;
    const after = target[prefix.len..];
    const end = mem.indexOf(u8, after, "/") orelse after.len;
    if (end == 0) return null;
    return after[0..end];
}

/// Return the queue name segment following "/queues/", or null when the
/// segment is absent or empty. The returned slice aliases `target`.
pub fn extractQueueName(target: []const u8) ?[]const u8 {
    const idx = mem.indexOf(u8, target, "/queues/") orelse return null;
    const start = idx + 8; // 8 == "/queues/".len
    if (start >= target.len) return null;
    const after = target[start..];
    const end = mem.indexOf(u8, after, "/") orelse after.len;
    if (end == 0) return null;
    return after[0..end];
}

/// Return the worker name segment following "/workers/", or null when the
/// segment is absent, empty, or one of the reserved action segments
/// ("heartbeat", "filter"). The returned slice aliases `target`.
pub fn extractWorkerName(target: []const u8) ?[]const u8 {
    const idx = mem.indexOf(u8, target, "/workers/") orelse return null;
    const start = idx + 9; // 9 == "/workers/".len
    if (start >= target.len) return null;
    const after = target[start..];
    if (mem.startsWith(u8, after, "heartbeat") or mem.startsWith(u8, after, "filter")) return null;
    const end = mem.indexOf(u8, after, "/") orelse after.len;
    if (end == 0) return null;
    return after[0..end];
}

/// True when `name` starts with "prefect" (case-insensitive). Such pool
/// names are reserved and may not be created, modified, or deleted.
fn isReservedPool(name: []const u8) bool {
    if (name.len < 7) return false;
    var lower: [7]u8 = undefined;
    for (name[0..7], 0..) |c, i| {
        lower[i] = std.ascii.toLower(c);
    }
    return mem.eql(u8, &lower, "prefect");
}

// JSON helpers (pub for sub-handlers).

/// Unwrap an optional JSON value to a string, or null if absent / not a string.
pub fn getOptionalString(val: ?json.Value) ?[]const u8 {
    if (val) |v| {
        return switch (v) {
            .string => |s| s,
            else => null,
        };
    }
    return null;
}

/// Unwrap an optional JSON value to a bool, or null if absent / not a bool.
pub fn getOptionalBool(val: ?json.Value) ?bool {
    if (val) |v| {
        return switch (v) {
            .bool => |b| b,
            else => null,
        };
    }
    return null;
}

/// Unwrap an optional JSON value to an i64, or null if absent / not an integer.
pub fn getOptionalInt(val: ?json.Value) ?i64 {
    if (val) |v| {
        return switch (v) {
            .integer => |i| i,
            else => null,
        };
    }
    return null;
}

/// Re-serialize a JSON value to text, falling back to `default` when the
/// value is absent or serialization fails. Result is allocated in `alloc`
/// (arena-owned by the caller).
fn stringifyField(alloc: std.mem.Allocator, val: ?json.Value, default: []const u8) []const u8 {
    if (val) |v| {
        return std.fmt.allocPrint(alloc, "{f}", .{json.fmt(v, .{})}) catch default;
    }
    return default;
}

/// Like `stringifyField` but yields null instead of a default.
fn stringifyFieldOptional(alloc: std.mem.Allocator, val: ?json.Value) ?[]const u8 {
    if (val) |v| {
        return std.fmt.allocPrint(alloc, "{f}", .{json.fmt(v, .{})}) catch null;
    }
    return null;
}

// Pool CRUD handlers.

/// POST /work_pools/ - create a work pool plus its "default" queue.
/// Responds 201 with the created pool, 400 on bad input, 403 for reserved
/// names, 409 on duplicates, 500 on database errors.
fn createPool(r: zap.Request) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const body = r.body orelse {
        json_util.sendStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request);
        return;
    };
    const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch {
        json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
        return;
    };
    // Fix: a valid-JSON body whose root is not an object (e.g. "[]")
    // previously hit a safety panic on `.object`; reject it with 400 instead.
    if (parsed.value != .object) {
        json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
        return;
    }
    const obj = parsed.value.object;

    const name_val = obj.get("name") orelse {
        json_util.sendStatus(r, "{\"detail\":\"name required\"}", .bad_request);
        return;
    };
    const name = switch (name_val) {
        .string => |s| s,
        else => {
            json_util.sendStatus(r, "{\"detail\":\"name must be string\"}", .bad_request);
            return;
        },
    };

    if (isReservedPool(name)) {
        json_util.sendStatus(r, "{\"detail\":\"Work pools starting with 'Prefect' are reserved.\"}", .forbidden);
        return;
    }

    // Reject duplicates up front. NOTE(review): check-then-insert is not
    // atomic; a concurrent create could still race past this check.
    if (db.work_pools.getByName(alloc, name) catch null) |_| {
        const err_msg = std.fmt.allocPrint(alloc, "{{\"detail\":\"Work pool '{s}' already exists.\"}}", .{name}) catch {
            json_util.sendStatus(r, "{\"detail\":\"Work pool already exists\"}", .conflict);
            return;
        };
        json_util.sendStatus(r, err_msg, .conflict);
        return;
    }

    const description = getOptionalString(obj.get("description"));
    const pool_type = getOptionalString(obj.get("type")) orelse "process";
    const base_job_template = stringifyField(alloc, obj.get("base_job_template"), "{}");
    const is_paused = getOptionalBool(obj.get("is_paused")) orelse false;
    const concurrency_limit = getOptionalInt(obj.get("concurrency_limit"));

    var pool_id_buf: [36]u8 = undefined;
    const pool_id = uuid_util.generate(&pool_id_buf);
    var queue_id_buf: [36]u8 = undefined;
    const queue_id = uuid_util.generate(&queue_id_buf);
    var ts_buf: [32]u8 = undefined;
    const now = time_util.timestamp(&ts_buf);

    // A paused pool starts paused; otherwise not_ready until a worker appears.
    const status: db.work_pools.Status = if (is_paused) .paused else .not_ready;

    db.work_pools.insert(
        pool_id,
        name,
        description,
        pool_type,
        base_job_template,
        is_paused,
        concurrency_limit,
        queue_id,
        status,
        now,
    ) catch {
        json_util.sendStatus(r, "{\"detail\":\"insert failed\"}", .internal_server_error);
        return;
    };

    // NOTE(review): if this insert fails the pool row above is left without
    // its default queue (no rollback) — consider wrapping in a transaction.
    db.work_queues.insert(queue_id, "default", "", false, null, 1, pool_id, .not_ready, now) catch {
        json_util.sendStatus(r, "{\"detail\":\"failed to create default queue\"}", .internal_server_error);
        return;
    };

    // Re-read so the response carries database-populated fields.
    const pool = db.work_pools.getByName(alloc, name) catch null orelse {
        json_util.sendStatus(r, "{\"detail\":\"pool not found after insert\"}", .internal_server_error);
        return;
    };
    const resp = writePool(alloc, pool) catch {
        json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    };
    json_util.sendStatus(r, resp, .created);
}

/// GET /work_pools/{name} - fetch a single pool; 404 when absent.
fn getPool(r: zap.Request, name: []const u8) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    if (db.work_pools.getByName(alloc, name) catch null) |pool| {
        const resp = writePool(alloc, pool) catch {
            json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
            return;
        };
        json_util.send(r, resp);
    } else {
        json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found);
    }
}

/// PATCH /work_pools/{name} - partial update. Pausing/unpausing also
/// recomputes the pool status; reserved pools only accept pause toggles.
/// Responds 204 on success.
fn updatePool(r: zap.Request, name: []const u8) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const body = r.body orelse {
        json_util.sendStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request);
        return;
    };
    const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch {
        json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
        return;
    };
    // Fix: guard against a non-object JSON root (previously panicked).
    if (parsed.value != .object) {
        json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
        return;
    }
    const obj = parsed.value.object;

    const existing = db.work_pools.getByName(alloc, name) catch null orelse {
        json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found);
        return;
    };

    // Reserved pools: allow pause state changes, reject content edits.
    if (isReservedPool(name)) {
        const has_other = obj.get("description") != null or obj.get("base_job_template") != null;
        if (has_other) {
            json_util.sendStatus(r, "{\"detail\":\"Cannot modify reserved work pool.\"}", .forbidden);
            return;
        }
    }

    const description = getOptionalString(obj.get("description"));
    const base_job_template = stringifyFieldOptional(alloc, obj.get("base_job_template"));
    const is_paused = getOptionalBool(obj.get("is_paused"));
    const concurrency_limit = getOptionalInt(obj.get("concurrency_limit"));

    // Derive the new status only when the pause flag is present:
    // pausing always yields .paused; unpausing a paused pool yields
    // .ready/.not_ready depending on whether online workers exist.
    var new_status: ?db.work_pools.Status = null;
    if (is_paused) |paused| {
        if (paused) {
            new_status = .paused;
        } else if (existing.status == .paused) {
            const has_workers = db.work_pools.hasOnlineWorkers(existing.id) catch false;
            new_status = if (has_workers) .ready else .not_ready;
        }
    }

    var ts_buf: [32]u8 = undefined;
    const now = time_util.timestamp(&ts_buf);

    const did_update = db.work_pools.updateByName(
        name,
        description,
        base_job_template,
        is_paused,
        concurrency_limit,
        new_status,
        now,
    ) catch {
        json_util.sendStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error);
        return;
    };
    if (!did_update) {
        json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found);
        return;
    }
    r.setStatus(.no_content);
    r.sendBody("") catch {};
}

/// DELETE /work_pools/{name} - remove a pool; 403 for reserved names,
/// 404 when absent, 204 on success.
fn deletePool(r: zap.Request, name: []const u8) !void {
    if (isReservedPool(name)) {
        json_util.sendStatus(r, "{\"detail\":\"Cannot delete reserved work pool.\"}", .forbidden);
        return;
    }
    const deleted = db.work_pools.deleteByName(name) catch {
        json_util.sendStatus(r, "{\"detail\":\"delete failed\"}", .internal_server_error);
        return;
    };
    if (!deleted) {
        json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found);
        return;
    }
    r.setStatus(.no_content);
    r.sendBody("") catch {};
}

/// POST /work_pools/filter - list pools as a JSON array. The body is
/// best-effort: an unparsable or non-object body falls back to the
/// defaults (limit 200, offset 0).
fn filterPools(r: zap.Request) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    var limit: usize = 200;
    var offset: usize = 0;
    if (r.body) |body| {
        if (json.parseFromSlice(json.Value, alloc, body, .{})) |parsed| {
            // Fix: only read pagination fields from an object root, and
            // ignore negative values — @intCast on a negative i64
            // previously tripped a safety panic.
            if (parsed.value == .object) {
                const obj = parsed.value.object;
                if (obj.get("limit")) |v| {
                    if (v == .integer and v.integer >= 0) limit = @intCast(v.integer);
                }
                if (obj.get("offset")) |v| {
                    if (v == .integer and v.integer >= 0) offset = @intCast(v.integer);
                }
            }
        } else |_| {}
    }

    const pools = db.work_pools.list(alloc, limit, offset) catch {
        json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error);
        return;
    };

    var output: std.io.Writer.Allocating = .init(alloc);
    var jw: json.Stringify = .{ .writer = &output.writer };
    jw.beginArray() catch {
        json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    };
    for (pools) |pool| {
        // Best-effort: a pool that fails to serialize is skipped, not fatal.
        writePoolObject(&jw, pool) catch continue;
    }
    jw.endArray() catch {};
    json_util.send(r, output.toOwnedSlice() catch "[]");
}

// JSON serialization.

/// Serialize one pool row to a JSON object string owned by `alloc`.
fn writePool(alloc: std.mem.Allocator, pool: db.work_pools.WorkPoolRow) ![]const u8 {
    var output: std.io.Writer.Allocating = .init(alloc);
    var jw: json.Stringify = .{ .writer = &output.writer };
    try writePoolObject(&jw, pool);
    return output.toOwnedSlice();
}

/// Write one pool row as a JSON object into an open Stringify stream.
/// `base_job_template` is stored as pre-serialized JSON text, so it is
/// emitted raw rather than re-encoded as a string.
fn writePoolObject(jw: *json.Stringify, pool: db.work_pools.WorkPoolRow) !void {
    try jw.beginObject();
    try jw.objectField("id");
    try jw.write(pool.id);
    try jw.objectField("created");
    try jw.write(pool.created);
    try jw.objectField("updated");
    try jw.write(pool.updated);
    try jw.objectField("name");
    try jw.write(pool.name);
    try jw.objectField("description");
    if (pool.description) |d| {
        try jw.write(d);
    } else {
        try jw.write(null);
    }
    try jw.objectField("type");
    try jw.write(pool.type);
    try jw.objectField("base_job_template");
    try jw.beginWriteRaw();
    try jw.writer.writeAll(pool.base_job_template);
    jw.endWriteRaw();
    try jw.objectField("is_paused");
    try jw.write(pool.is_paused);
    try jw.objectField("concurrency_limit");
    if (pool.concurrency_limit) |c| {
        try jw.write(c);
    } else {
        try jw.write(null);
    }
    try jw.objectField("default_queue_id");
    if (pool.default_queue_id) |q| {
        try jw.write(q);
    } else {
        try jw.write(null);
    }
    try jw.objectField("status");
    try jw.write(pool.status.toString());
    try jw.endObject();
}