prefect server in zig
at main 358 lines 11 kB view raw
1const std = @import("std"); 2const zap = @import("zap"); 3const mem = std.mem; 4const json = std.json; 5 6const db = @import("../db/sqlite.zig"); 7const routing = @import("routing.zig"); 8const uuid_util = @import("../utilities/uuid.zig"); 9const time_util = @import("../utilities/time.zig"); 10const json_util = @import("../utilities/json.zig"); 11 12// POST /task_runs/ - create task run 13// POST /task_runs/filter - list task runs 14// GET /task_runs/{id} - read task run 15// POST /task_runs/{id}/set_state - set state 16pub fn handle(r: zap.Request) !void { 17 const target = r.path orelse "/"; 18 const method = r.method orelse "GET"; 19 20 // POST /task_runs/filter - list 21 if (mem.eql(u8, method, "POST") and mem.endsWith(u8, target, "/filter")) { 22 try filter(r); 23 return; 24 } 25 26 // POST /task_runs/ - create 27 if (mem.eql(u8, method, "POST") and (mem.eql(u8, target, "/task_runs/") or mem.eql(u8, target, "/api/task_runs/"))) { 28 try create(r); 29 return; 30 } 31 32 // check for /{id}/set_state 33 if (mem.eql(u8, method, "POST") and mem.endsWith(u8, target, "/set_state")) { 34 const id = routing.extractId(target, "/task_runs/", "/set_state") orelse 35 routing.extractId(target, "/api/task_runs/", "/set_state"); 36 if (id) |task_run_id| { 37 try setState(r, task_run_id); 38 return; 39 } 40 } 41 42 // GET /task_runs/{id} - read single 43 if (mem.eql(u8, method, "GET")) { 44 const id = routing.extractIdAfter(target, "/task_runs/") orelse 45 routing.extractIdAfter(target, "/api/task_runs/"); 46 if (id) |task_run_id| { 47 try read(r, task_run_id); 48 return; 49 } 50 } 51 52 json_util.sendStatus(r, "{\"detail\":\"not found\"}", .not_found); 53} 54 55fn create(r: zap.Request) !void { 56 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 57 defer arena.deinit(); 58 const alloc = arena.allocator(); 59 60 const body = r.body orelse { 61 json_util.sendStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request); 62 return; 63 }; 64 65 const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch { 66 json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request); 67 return; 68 }; 69 70 const obj = parsed.value.object; 71 72 // required fields 73 const task_key = if (obj.get("task_key")) |v| switch (v) { 74 .string => |s| s, 75 else => { 76 json_util.sendStatus(r, "{\"detail\":\"task_key must be string\"}", .bad_request); 77 return; 78 }, 79 } else { 80 json_util.sendStatus(r, "{\"detail\":\"task_key required\"}", .bad_request); 81 return; 82 }; 83 84 const dynamic_key = if (obj.get("dynamic_key")) |v| switch (v) { 85 .string => |s| s, 86 else => { 87 json_util.sendStatus(r, "{\"detail\":\"dynamic_key must be string\"}", .bad_request); 88 return; 89 }, 90 } else { 91 json_util.sendStatus(r, "{\"detail\":\"dynamic_key required\"}", .bad_request); 92 return; 93 }; 94 95 // optional fields 96 const flow_run_id: ?[]const u8 = if (obj.get("flow_run_id")) |v| switch (v) { 97 .string => |s| s, 98 .null => null, 99 else => null, 100 } else null; 101 102 const name = if (obj.get("name")) |v| switch (v) { 103 .string => |s| s, 104 .null => routing.generateRunName(alloc), 105 else => routing.generateRunName(alloc), 106 } else routing.generateRunName(alloc); 107 108 const state = obj.get("state"); 109 110 // check for existing task run (idempotency) 111 if (db.getTaskRunByKey(alloc, flow_run_id, task_key, dynamic_key) catch null) |existing| { 112 const resp = writeTaskRun(alloc, existing, null) catch { 113 json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 114 return; 115 }; 116 json_util.send(r, resp); 117 return; 118 } 119 120 // extract state info 121 var state_type: []const u8 = "PENDING"; 122 var state_name: []const u8 = "Pending"; 123 if (state) |s| { 124 if (s == .object) { 125 if (s.object.get("type")) |t| { 126 if (t == .string) state_type = t.string; 127 } 128 if (s.object.get("name")) |n| { 129 if (n == .string) state_name = n.string; 130 } 131 } 132 } 133 134 var new_id_buf: [36]u8 = undefined; 135 const new_id = uuid_util.generate(&new_id_buf); 136 var ts_buf: [32]u8 = undefined; 137 const now = time_util.timestamp(&ts_buf); 138 139 db.insertTaskRun(new_id, flow_run_id, name, task_key, dynamic_key, state_type, state_name, now) catch { 140 json_util.sendStatus(r, "{\"detail\":\"insert failed\"}", .internal_server_error); 141 return; 142 }; 143 144 var state_id_buf: [36]u8 = undefined; 145 const state_id = uuid_util.generate(&state_id_buf); 146 147 const run = db.TaskRunRow{ 148 .id = new_id, 149 .created = now, 150 .updated = now, 151 .name = name, 152 .flow_run_id = flow_run_id orelse "", 153 .task_key = task_key, 154 .dynamic_key = dynamic_key, 155 .state_type = state_type, 156 .state_name = state_name, 157 .state_timestamp = now, 158 .tags = "[]", 159 .run_count = 0, 160 }; 161 162 const resp = writeTaskRun(alloc, run, state_id) catch { 163 json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 164 return; 165 }; 166 json_util.sendStatus(r, resp, .created); 167} 168 169fn read(r: zap.Request, id: []const u8) !void { 170 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 171 defer arena.deinit(); 172 const alloc = arena.allocator(); 173 174 const run = db.getTaskRun(alloc, id) catch null orelse { 175 json_util.sendStatus(r, "{\"detail\":\"task run not found\"}", .not_found); 176 return; 177 }; 178 179 const resp = writeTaskRun(alloc, run, null) catch { 180 json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 181 return; 182 }; 183 json_util.send(r, resp); 184} 185 186fn setState(r: zap.Request, id: []const u8) !void { 187 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 188 defer arena.deinit(); 189 const alloc = arena.allocator(); 190 191 const body = r.body orelse { 192 json_util.sendStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request); 193 return; 194 }; 195 196 const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch { 197 json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request); 198 return; 199 }; 200 201 const state = parsed.value.object.get("state") orelse { 202 json_util.sendStatus(r, "{\"detail\":\"state required\"}", .bad_request); 203 return; 204 }; 205 206 const state_type = if (state.object.get("type")) |v| v.string else "PENDING"; 207 const state_name = if (state.object.get("name")) |v| v.string else "Pending"; 208 var ts_buf: [32]u8 = undefined; 209 const now = time_util.timestamp(&ts_buf); 210 var state_id_buf: [36]u8 = undefined; 211 const state_id = uuid_util.generate(&state_id_buf); 212 213 db.setTaskRunState(id, state_id, state_type, state_name, now) catch { 214 json_util.sendStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error); 215 return; 216 }; 217 218 const resp = writeStateResponse(alloc, state_type, state_name, now, state_id) catch { 219 json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 220 return; 221 }; 222 json_util.send(r, resp); 223} 224 225fn filter(r: zap.Request) !void { 226 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 227 defer arena.deinit(); 228 const alloc = arena.allocator(); 229 230 const runs = db.listTaskRuns(alloc, 50) catch { 231 json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error); 232 return; 233 }; 234 235 var output: std.Io.Writer.Allocating = .init(alloc); 236 var jw: json.Stringify = .{ .writer = &output.writer }; 237 238 jw.beginArray() catch { 239 json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 240 return; 241 }; 242 243 for (runs) |run| { 244 writeTaskRunObject(&jw, run, null) catch continue; 245 } 246 247 jw.endArray() catch {}; 248 249 json_util.send(r, output.toOwnedSlice() catch "[]"); 250} 251 252fn writeTaskRun(alloc: std.mem.Allocator, run: db.TaskRunRow, state_id: ?[]const u8) ![]const u8 { 253 var output: std.Io.Writer.Allocating = .init(alloc); 254 var jw: json.Stringify = .{ .writer = &output.writer }; 255 try writeTaskRunObject(&jw, run, state_id); 256 return output.toOwnedSlice(); 257} 258 259fn writeTaskRunObject(jw: *json.Stringify, run: db.TaskRunRow, state_id: ?[]const u8) !void { 260 try jw.beginObject(); 261 262 try jw.objectField("id"); 263 try jw.write(run.id); 264 265 try jw.objectField("created"); 266 try jw.write(run.created); 267 268 try jw.objectField("updated"); 269 try jw.write(run.updated); 270 271 try jw.objectField("name"); 272 try jw.write(run.name); 273 274 try jw.objectField("flow_run_id"); 275 if (run.flow_run_id.len > 0) { 276 try jw.write(run.flow_run_id); 277 } else { 278 try jw.write(null); 279 } 280 281 try jw.objectField("task_key"); 282 try jw.write(run.task_key); 283 284 try jw.objectField("dynamic_key"); 285 try jw.write(run.dynamic_key); 286 287 try jw.objectField("state_type"); 288 try jw.write(run.state_type); 289 290 try jw.objectField("state_name"); 291 try jw.write(run.state_name); 292 293 try jw.objectField("state"); 294 try jw.beginObject(); 295 try jw.objectField("type"); 296 try jw.write(run.state_type); 297 try jw.objectField("name"); 298 try jw.write(run.state_name); 299 try jw.objectField("timestamp"); 300 try jw.write(run.state_timestamp); 301 if (state_id) |sid| { 302 try jw.objectField("id"); 303 try jw.write(sid); 304 } 305 try jw.endObject(); 306 307 try jw.objectField("tags"); 308 try jw.beginWriteRaw(); 309 try jw.writer.writeAll(run.tags); 310 jw.endWriteRaw(); 311 312 try jw.objectField("run_count"); 313 try jw.write(run.run_count); 314 315 try jw.objectField("expected_start_time"); 316 try jw.write(null); 317 318 try jw.objectField("start_time"); 319 try jw.write(null); 320 321 try jw.objectField("end_time"); 322 try jw.write(null); 323 324 try jw.objectField("total_run_time"); 325 try jw.write(@as(i32, 0)); 326 327 try jw.endObject(); 328} 329 330fn writeStateResponse(alloc: std.mem.Allocator, state_type: []const u8, state_name: []const u8, timestamp: []const u8, state_id: []const u8) ![]const u8 { 331 var output: std.Io.Writer.Allocating = .init(alloc); 332 var jw: json.Stringify = .{ .writer = &output.writer }; 333 334 try jw.beginObject(); 335 336 try jw.objectField("status"); 337 try jw.write("ACCEPT"); 338 339 try jw.objectField("details"); 340 try jw.beginObject(); 341 try jw.endObject(); 342 343 try jw.objectField("state"); 344 try jw.beginObject(); 345 try jw.objectField("type"); 346 try jw.write(state_type); 347 try jw.objectField("name"); 348 try jw.write(state_name); 349 try jw.objectField("timestamp"); 350 try jw.write(timestamp); 351 try jw.objectField("id"); 352 try jw.write(state_id); 353 try jw.endObject(); 354 355 try jw.endObject(); 356 357 return output.toOwnedSlice(); 358}