const std = @import("std");
const zap = @import("zap");
const json = std.json;
const db = @import("../db/sqlite.zig");
const time_util = @import("../utilities/time.zig");
const json_util = @import("../utilities/json.zig");

/// Handle POST /work_pools/{name}/get_scheduled_flow_runs
///
/// Worker polling endpoint. Reads an optional JSON body with `limit`,
/// `scheduled_before`, and `work_queue_names`, fetches the matching scheduled
/// flow runs, and responds with a JSON array of WorkerFlowRunResponse objects.
/// As a side effect it records the poll: queues get `last_polled` updated and
/// their status set, the pool is promoted from `not_ready` to `ready`, and
/// deployments on the polled queues are marked ready.
pub fn handle(r: zap.Request, target: []const u8) !void {
    // Per-request arena: every allocation below is released in one shot.
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const pool_name = extractPoolName(target) orelse {
        json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request);
        return;
    };

    // NOTE(review): `catch null` folds database errors into "not found";
    // confirm a 404 (rather than a 500) is the intended behavior here.
    const pool = db.work_pools.getByName(alloc, pool_name) catch null orelse {
        json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found);
        return;
    };

    // Parse the optional request body; a missing or malformed body falls back
    // to the defaults below (best-effort, matching the original behavior).
    var limit: usize = 200;
    var scheduled_before: ?[]const u8 = null;
    var work_queue_names = std.ArrayListUnmanaged([]const u8){};
    if (r.body) |body| {
        if (json.parseFromSlice(json.Value, alloc, body, .{})) |parsed| {
            const obj = parsed.value.object;
            if (obj.get("limit")) |v| {
                // Reject negative values: @intCast from a negative integer to
                // usize trips a safety check (panic in safe builds, UB in
                // ReleaseFast). Non-integer or negative input keeps the default.
                if (v == .integer and v.integer >= 0) limit = @intCast(v.integer);
            }
            if (obj.get("scheduled_before")) |v| {
                // Slice points into parsed JSON; valid for the arena's lifetime.
                if (v == .string) scheduled_before = v.string;
            }
            if (obj.get("work_queue_names")) |v| {
                if (v == .array) {
                    for (v.array.items) |item| {
                        if (item == .string) {
                            try work_queue_names.append(alloc, item.string);
                        }
                    }
                }
            }
        } else |_| {} // malformed body: proceed with defaults
    }

    // Default scheduled_before to "now" if the client did not provide one.
    // `now` also timestamps the poll bookkeeping further down.
    var ts_buf: [32]u8 = undefined;
    const now = time_util.timestamp(&ts_buf);
    if (scheduled_before == null) {
        scheduled_before = now;
    }

    // Resolve the set of work queues to poll.
    var queue_ids = std.ArrayListUnmanaged([]const u8){};
    var work_queues = std.ArrayListUnmanaged(db.work_queues.WorkQueueRow){};
    if (work_queue_names.items.len > 0) {
        // Filter by the specific queue names requested; unknown names (or DB
        // errors on a single lookup) are silently skipped.
        for (work_queue_names.items) |qname| {
            if (db.work_queues.getByPoolAndName(alloc, pool.id, qname) catch null) |queue| {
                try queue_ids.append(alloc, queue.id);
                try work_queues.append(alloc, queue);
            }
        }
    } else {
        // No explicit filter: poll every queue in the pool (capped at 1000).
        const all_queues = db.work_queues.listByPool(alloc, pool.id, 1000, 0) catch {
            json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error);
            return;
        };
        for (all_queues) |queue| {
            try queue_ids.append(alloc, queue.id);
            try work_queues.append(alloc, queue);
        }
    }

    // Fetch scheduled flow runs across the selected queues.
    const flow_runs = db.flow_runs.getScheduledByWorkQueues(
        alloc,
        queue_ids.items,
        scheduled_before,
        limit,
    ) catch {
        json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error);
        return;
    };

    // Poll bookkeeping is best-effort: a failed status update must not fail
    // the worker's poll, so errors are deliberately ignored below.
    for (work_queues.items) |queue| {
        const new_status: db.work_queues.Status = if (queue.is_paused) .paused else .ready;
        _ = db.work_queues.updateLastPolled(pool.id, queue.name, now, new_status) catch {};
    }

    // Promote the pool to ready on its first successful poll.
    if (pool.status == .not_ready) {
        _ = db.work_pools.updateStatus(pool_name, .ready, now) catch {};
    }

    // Mark deployments attached to the polled queues as ready (best-effort).
    db.deployments.markReadyByWorkQueues(queue_ids.items, now) catch {};

    // Serialize the response as a JSON array of WorkerFlowRunResponse.
    // NOTE(review): a mid-object failure inside the loop `continue`s, which
    // could leave a truncated object in the stream — confirm acceptable.
    var output: std.io.Writer.Allocating = .init(alloc);
    var jw: json.Stringify = .{ .writer = &output.writer };
    jw.beginArray() catch {
        json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    };
    for (flow_runs) |run| {
        writeWorkerFlunRunResponseChecked: {
            writeWorkerFlowRunResponse(&jw, pool.id, run) catch break :writeWorkerFlunRunResponseChecked;
        }
    }
    jw.endArray() catch {};
    json_util.send(r, output.toOwnedSlice() catch "[]");
}

/// Extract the `{name}` segment from a target of the form
/// `/api/work_pools/{name}/...`. Returns null when the prefix does not match
/// or the name segment is empty / not followed by another path segment.
fn extractPoolName(target: []const u8) ?[]const u8 {
    const prefix = "/api/work_pools/";
    if (!std.mem.startsWith(u8, target, prefix)) return null;
    const rest = target[prefix.len..];
    const end = std.mem.indexOf(u8, rest, "/") orelse return null;
    if (end == 0) return null;
    return rest[0..end];
}

/// Write one WorkerFlowRunResponse object:
/// { work_pool_id, work_queue_id, flow_run: {...} }.
fn writeWorkerFlowRunResponse(jw: *json.Stringify, work_pool_id: []const u8, run: db.flow_runs.FlowRunRow) !void {
    try jw.beginObject();
    try jw.objectField("work_pool_id");
    try jw.write(work_pool_id);
    try jw.objectField("work_queue_id");
    try jw.write(run.work_queue_id);
    try jw.objectField("flow_run");
    try writeFlowRunObject(jw, run);
    try jw.endObject();
}

/// Write the nested flow_run object. `parameters` and `tags` are stored as
/// pre-serialized JSON text in the DB row and are emitted raw; optional
/// fields serialize as JSON null when absent (Stringify.write handles ?T).
fn writeFlowRunObject(jw: *json.Stringify, run: db.flow_runs.FlowRunRow) !void {
    try jw.beginObject();
    try jw.objectField("id");
    try jw.write(run.id);
    try jw.objectField("created");
    try jw.write(run.created);
    try jw.objectField("updated");
    try jw.write(run.updated);
    try jw.objectField("name");
    try jw.write(run.name);
    try jw.objectField("flow_id");
    try jw.write(run.flow_id);
    try jw.objectField("state_type");
    try jw.write(run.state_type);
    try jw.objectField("state_name");
    try jw.write(run.state_name);
    try jw.objectField("state_timestamp");
    try jw.write(run.state_timestamp);
    // parameters/tags are already JSON — splice them in verbatim.
    try jw.objectField("parameters");
    try jw.beginWriteRaw();
    try jw.writer.writeAll(run.parameters);
    jw.endWriteRaw();
    try jw.objectField("tags");
    try jw.beginWriteRaw();
    try jw.writer.writeAll(run.tags);
    jw.endWriteRaw();
    try jw.objectField("run_count");
    try jw.write(run.run_count);
    // Optional timestamps / ids: Stringify.write emits null for absent ?T.
    try jw.objectField("expected_start_time");
    try jw.write(run.expected_start_time);
    try jw.objectField("start_time");
    try jw.write(run.start_time);
    try jw.objectField("end_time");
    try jw.write(run.end_time);
    try jw.objectField("total_run_time");
    try jw.write(run.total_run_time);
    try jw.objectField("deployment_id");
    try jw.write(run.deployment_id);
    try jw.objectField("deployment_version");
    try jw.write(run.deployment_version);
    try jw.objectField("work_queue_name");
    try jw.write(run.work_queue_name);
    try jw.objectField("work_queue_id");
    try jw.write(run.work_queue_id);
    try jw.objectField("auto_scheduled");
    try jw.write(run.auto_scheduled);
    try jw.endObject();
}