prefect server in zig
at f511403a2b901063559cd17995b45527418e76c6 195 lines 6.0 kB view raw
const std = @import("std");
const zap = @import("zap");
const json = std.json;

const db = @import("../db/sqlite.zig");
const uuid_util = @import("../utilities/uuid.zig");
const time_util = @import("../utilities/time.zig");
const json_util = @import("../utilities/json.zig");
const pool_helpers = @import("work_pools.zig");

/// Records a worker heartbeat for the work pool named in `target`.
///
/// The request body must be a JSON object with a string `name` field and an
/// optional `heartbeat_interval_seconds` field.
///
/// Responses:
/// - 400 when the pool name, the body, or the `name` field is missing or malformed
/// - 404 when the pool lookup yields nothing (lookup errors are folded into this case)
/// - 500 when the heartbeat upsert fails
/// - 204 with an empty body on success
pub fn heartbeat(r: zap.Request, target: []const u8) !void {
    // Per-request arena: everything allocated below is released on return.
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const pool_name = pool_helpers.extractPoolName(target) orelse {
        json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request);
        return;
    };

    // A lookup error is treated the same as "no such pool".
    const pool = db.work_pools.getByName(alloc, pool_name) catch null orelse {
        json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found);
        return;
    };

    const body = r.body orelse {
        json_util.sendStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request);
        return;
    };

    const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch {
        json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
        return;
    };

    // The body is client-controlled: check the union tag before reading
    // `.object`, since valid JSON may also be an array, string, number, ...
    const obj = switch (parsed.value) {
        .object => |o| o,
        else => {
            json_util.sendStatus(r, "{\"detail\":\"json object required\"}", .bad_request);
            return;
        },
    };

    const name_value = obj.get("name") orelse {
        json_util.sendStatus(r, "{\"detail\":\"name required\"}", .bad_request);
        return;
    };
    const worker_name = switch (name_value) {
        .string => |s| s,
        else => {
            json_util.sendStatus(r, "{\"detail\":\"name must be string\"}", .bad_request);
            return;
        },
    };

    const heartbeat_interval = pool_helpers.getOptionalInt(obj.get("heartbeat_interval_seconds"));

    // A fresh id is generated on every heartbeat.
    // NOTE(review): presumably only used when the upsert inserts a new row —
    // confirm against db.workers.upsertHeartbeat.
    var id_buf: [36]u8 = undefined;
    const worker_id = uuid_util.generate(&id_buf);

    var ts_buf: [32]u8 = undefined;
    const now = time_util.timestamp(&ts_buf);

    db.workers.upsertHeartbeat(worker_id, worker_name, pool.id, heartbeat_interval, now) catch {
        json_util.sendStatus(r, "{\"detail\":\"heartbeat failed\"}", .internal_server_error);
        return;
    };

    // Update pool status to READY if it was NOT_READY. A failure here does not
    // fail the heartbeat, but it is logged rather than silently dropped.
    if (pool.status == .not_ready) {
        _ = db.work_pools.updateStatus(pool_name, .ready, now) catch |err| {
            std.log.warn("failed to mark work pool '{s}' ready: {s}", .{ pool_name, @errorName(err) });
        };
    }

    r.setStatus(.no_content);
    // An error while sending the empty body is ignored.
    r.sendBody("") catch {};
}

/// Lists the workers of the work pool named in `target` as a JSON array.
///
/// An optional JSON object body may carry integer `limit` (default 200) and
/// `offset` (default 0) fields. A missing, unparseable, or non-object body,
/// and values that are not non-negative integers, leave the defaults in place.
///
/// Responses:
/// - 400 when the pool name cannot be extracted from `target`
/// - 404 when the pool lookup yields nothing (lookup errors are folded into this case)
/// - 500 when listing or serializing the workers fails
/// - otherwise the JSON array, sent via `json_util.send`
pub fn filter(r: zap.Request, target: []const u8) !void {
    // Per-request arena: everything allocated below is released on return.
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const pool_name = pool_helpers.extractPoolName(target) orelse {
        json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request);
        return;
    };

    // A lookup error is treated the same as "no such pool".
    const pool = db.work_pools.getByName(alloc, pool_name) catch null orelse {
        json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found);
        return;
    };

    var limit: usize = 200;
    var offset: usize = 0;

    if (r.body) |body| {
        if (json.parseFromSlice(json.Value, alloc, body, .{})) |parsed| {
            switch (parsed.value) {
                .object => |obj| {
                    limit = jsonToUsize(obj.get("limit")) orelse limit;
                    offset = jsonToUsize(obj.get("offset")) orelse offset;
                },
                // Valid JSON that is not an object carries no paging options.
                else => {},
            }
        } else |_| {}
    }

    const workers_list = db.workers.listByPool(alloc, pool.id, limit, offset) catch {
        json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error);
        return;
    };

    const serialize_error = "{\"detail\":\"serialize error\"}";

    var output: std.io.Writer.Allocating = .init(alloc);
    var jw: json.Stringify = .{ .writer = &output.writer };

    jw.beginArray() catch {
        json_util.sendStatus(r, serialize_error, .internal_server_error);
        return;
    };

    for (workers_list) |worker| {
        // A failure part-way through an object leaves the stream in an
        // inconsistent state, so abort instead of skipping to the next row.
        writeWorkerObject(&jw, worker) catch {
            json_util.sendStatus(r, serialize_error, .internal_server_error);
            return;
        };
    }

    jw.endArray() catch {
        json_util.sendStatus(r, serialize_error, .internal_server_error);
        return;
    };

    const payload = output.toOwnedSlice() catch {
        json_util.sendStatus(r, serialize_error, .internal_server_error);
        return;
    };

    json_util.send(r, payload);
}

/// Deletes the worker named in `target` from the work pool named in `target`.
///
/// Responses:
/// - 400 when the pool name or worker name cannot be extracted from `target`
/// - 404 when the pool lookup yields nothing (lookup errors are folded into
///   this case) or when `deleteByPoolAndName` reports that nothing was deleted
/// - 500 when the delete call fails
/// - 204 with an empty body on success
pub fn delete(r: zap.Request, target: []const u8) !void {
    // Per-request arena: everything allocated below is released on return.
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const pool_name = pool_helpers.extractPoolName(target) orelse {
        json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request);
        return;
    };

    const worker_name = pool_helpers.extractWorkerName(target) orelse {
        json_util.sendStatus(r, "{\"detail\":\"worker name required\"}", .bad_request);
        return;
    };

    // A lookup error is treated the same as "no such pool".
    const pool = db.work_pools.getByName(alloc, pool_name) catch null orelse {
        json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found);
        return;
    };

    const deleted = db.workers.deleteByPoolAndName(pool.id, worker_name) catch {
        json_util.sendStatus(r, "{\"detail\":\"delete failed\"}", .internal_server_error);
        return;
    };

    if (!deleted) {
        json_util.sendStatus(r, "{\"detail\":\"Worker not found.\"}", .not_found);
        return;
    }

    r.setStatus(.no_content);
    // An error while sending the empty body is ignored.
    r.sendBody("") catch {};
}

// Request parsing

/// Returns `value` as a `usize` when it is present, is a JSON integer, and
/// fits in `usize` (so negative numbers yield null); otherwise returns null.
fn jsonToUsize(value: ?json.Value) ?usize {
    const v = value orelse return null;
    return switch (v) {
        .integer => |i| std.math.cast(usize, i),
        else => null,
    };
}

// JSON serialization

/// Writes `worker` to `jw` as one JSON object with the fields `id`, `created`,
/// `updated`, `name`, `work_pool_id`, `last_heartbeat_time`,
/// `heartbeat_interval_seconds`, and `status`, in that order.
///
/// Any error from the underlying writer is returned to the caller; the stream
/// may then contain a partially written object.
fn writeWorkerObject(jw: *json.Stringify, worker: db.workers.WorkerRow) !void {
    try jw.beginObject();

    try jw.objectField("id");
    try jw.write(worker.id);

    try jw.objectField("created");
    try jw.write(worker.created);

    try jw.objectField("updated");
    try jw.write(worker.updated);

    try jw.objectField("name");
    try jw.write(worker.name);

    try jw.objectField("work_pool_id");
    try jw.write(worker.work_pool_id);

    // `write` emits `null` for an empty optional and the payload otherwise,
    // so the optional fields need no manual unwrapping.
    try jw.objectField("last_heartbeat_time");
    try jw.write(worker.last_heartbeat_time);

    try jw.objectField("heartbeat_interval_seconds");
    try jw.write(worker.heartbeat_interval_seconds);

    try jw.objectField("status");
    try jw.write(worker.status.toString());

    try jw.endObject();
}