prefect server in zig
at main 250 lines 7.2 kB view raw
1const std = @import("std"); 2const zap = @import("zap"); 3const json = std.json; 4 5const db = @import("../db/sqlite.zig"); 6const time_util = @import("../utilities/time.zig"); 7const json_util = @import("../utilities/json.zig"); 8 9/// Handle POST /work_pools/{name}/get_scheduled_flow_runs 10pub fn handle(r: zap.Request, target: []const u8) !void { 11 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 12 defer arena.deinit(); 13 const alloc = arena.allocator(); 14 15 const pool_name = extractPoolName(target) orelse { 16 json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request); 17 return; 18 }; 19 20 const pool = db.work_pools.getByName(alloc, pool_name) catch null orelse { 21 json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found); 22 return; 23 }; 24 25 // Parse request body 26 var limit: usize = 200; 27 var scheduled_before: ?[]const u8 = null; 28 var work_queue_names = std.ArrayListUnmanaged([]const u8){}; 29 30 if (r.body) |body| { 31 if (json.parseFromSlice(json.Value, alloc, body, .{})) |parsed| { 32 const obj = parsed.value.object; 33 34 if (obj.get("limit")) |v| { 35 if (v == .integer) limit = @intCast(v.integer); 36 } 37 38 if (obj.get("scheduled_before")) |v| { 39 if (v == .string) scheduled_before = v.string; 40 } 41 42 if (obj.get("work_queue_names")) |v| { 43 if (v == .array) { 44 for (v.array.items) |item| { 45 if (item == .string) { 46 try work_queue_names.append(alloc, item.string); 47 } 48 } 49 } 50 } 51 } else |_| {} 52 } 53 54 // Default scheduled_before to now if not provided 55 var ts_buf: [32]u8 = undefined; 56 const now = time_util.timestamp(&ts_buf); 57 if (scheduled_before == null) { 58 scheduled_before = now; 59 } 60 61 // Get work queues 62 var queue_ids = std.ArrayListUnmanaged([]const u8){}; 63 var work_queues = std.ArrayListUnmanaged(db.work_queues.WorkQueueRow){}; 64 65 if (work_queue_names.items.len > 0) { 66 // Filter by specific queue names 67 for (work_queue_names.items) |qname| { 68 if (db.work_queues.getByPoolAndName(alloc, pool.id, qname) catch null) |queue| { 69 try queue_ids.append(alloc, queue.id); 70 try work_queues.append(alloc, queue); 71 } 72 } 73 } else { 74 // Get all queues in pool 75 const all_queues = db.work_queues.listByPool(alloc, pool.id, 1000, 0) catch { 76 json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error); 77 return; 78 }; 79 for (all_queues) |queue| { 80 try queue_ids.append(alloc, queue.id); 81 try work_queues.append(alloc, queue); 82 } 83 } 84 85 // Get scheduled flow runs 86 const flow_runs = db.flow_runs.getScheduledByWorkQueues( 87 alloc, 88 queue_ids.items, 89 scheduled_before, 90 limit, 91 ) catch { 92 json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error); 93 return; 94 }; 95 96 // Update last_polled for work queues and mark as ready if needed 97 for (work_queues.items) |queue| { 98 const new_status: db.work_queues.Status = if (queue.is_paused) .paused else .ready; 99 _ = db.work_queues.updateLastPolled(pool.id, queue.name, now, new_status) catch {}; 100 } 101 102 // Mark pool as ready if it was not_ready 103 if (pool.status == .not_ready) { 104 _ = db.work_pools.updateStatus(pool_name, .ready, now) catch {}; 105 } 106 107 // Mark deployments as ready (deployments for the queues we polled) 108 db.deployments.markReadyByWorkQueues(queue_ids.items, now) catch {}; 109 110 // Build response as WorkerFlowRunResponse[] 111 var output: std.io.Writer.Allocating = .init(alloc); 112 var jw: json.Stringify = .{ .writer = &output.writer }; 113 114 jw.beginArray() catch { 115 json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 116 return; 117 }; 118 119 for (flow_runs) |run| { 120 writeWorkerFlowRunResponse(&jw, pool.id, run) catch continue; 121 } 122 123 jw.endArray() catch {}; 124 125 json_util.send(r, output.toOwnedSlice() catch "[]"); 126} 127 128fn extractPoolName(target: []const u8) ?[]const u8 { 129 const prefix = "/api/work_pools/"; 130 if (!std.mem.startsWith(u8, target, prefix)) return null; 131 132 const rest = target[prefix.len..]; 133 const end = std.mem.indexOf(u8, rest, "/") orelse return null; 134 if (end == 0) return null; 135 136 return rest[0..end]; 137} 138 139fn writeWorkerFlowRunResponse(jw: *json.Stringify, work_pool_id: []const u8, run: db.flow_runs.FlowRunRow) !void { 140 try jw.beginObject(); 141 142 try jw.objectField("work_pool_id"); 143 try jw.write(work_pool_id); 144 145 try jw.objectField("work_queue_id"); 146 try jw.write(run.work_queue_id); 147 148 try jw.objectField("flow_run"); 149 try writeFlowRunObject(jw, run); 150 151 try jw.endObject(); 152} 153 154fn writeFlowRunObject(jw: *json.Stringify, run: db.flow_runs.FlowRunRow) !void { 155 try jw.beginObject(); 156 157 try jw.objectField("id"); 158 try jw.write(run.id); 159 160 try jw.objectField("created"); 161 try jw.write(run.created); 162 163 try jw.objectField("updated"); 164 try jw.write(run.updated); 165 166 try jw.objectField("name"); 167 try jw.write(run.name); 168 169 try jw.objectField("flow_id"); 170 try jw.write(run.flow_id); 171 172 try jw.objectField("state_type"); 173 try jw.write(run.state_type); 174 175 try jw.objectField("state_name"); 176 try jw.write(run.state_name); 177 178 try jw.objectField("state_timestamp"); 179 try jw.write(run.state_timestamp); 180 181 try jw.objectField("parameters"); 182 try jw.beginWriteRaw(); 183 try jw.writer.writeAll(run.parameters); 184 jw.endWriteRaw(); 185 186 try jw.objectField("tags"); 187 try jw.beginWriteRaw(); 188 try jw.writer.writeAll(run.tags); 189 jw.endWriteRaw(); 190 191 try jw.objectField("run_count"); 192 try jw.write(run.run_count); 193 194 try jw.objectField("expected_start_time"); 195 if (run.expected_start_time) |t| { 196 try jw.write(t); 197 } else { 198 try jw.write(null); 199 } 200 201 try jw.objectField("start_time"); 202 if (run.start_time) |t| { 203 try jw.write(t); 204 } else { 205 try jw.write(null); 206 } 207 208 try jw.objectField("end_time"); 209 if (run.end_time) |t| { 210 try jw.write(t); 211 } else { 212 try jw.write(null); 213 } 214 215 try jw.objectField("total_run_time"); 216 try jw.write(run.total_run_time); 217 218 try jw.objectField("deployment_id"); 219 if (run.deployment_id) |d| { 220 try jw.write(d); 221 } else { 222 try jw.write(null); 223 } 224 225 try jw.objectField("deployment_version"); 226 if (run.deployment_version) |v| { 227 try jw.write(v); 228 } else { 229 try jw.write(null); 230 } 231 232 try jw.objectField("work_queue_name"); 233 if (run.work_queue_name) |n| { 234 try jw.write(n); 235 } else { 236 try jw.write(null); 237 } 238 239 try jw.objectField("work_queue_id"); 240 if (run.work_queue_id) |q| { 241 try jw.write(q); 242 } else { 243 try jw.write(null); 244 } 245 246 try jw.objectField("auto_scheduled"); 247 try jw.write(run.auto_scheduled); 248 249 try jw.endObject(); 250}