// prefect server in zig
// at 7569870f69992c6426b54b9ca0721e1162577f52 (527 lines, 16 kB)
// HTTP handlers for the work pool API: request routing plus work pool CRUD.
// Queue, worker and scheduling endpoints are delegated to the sub-handler
// modules imported below.
const std = @import("std");
const zap = @import("zap");
const mem = std.mem;
const json = std.json;

const db = @import("../db/sqlite.zig");
const uuid_util = @import("../utilities/uuid.zig");
const time_util = @import("../utilities/time.zig");
const json_util = @import("../utilities/json.zig");

// sub-handlers
const queues = @import("work_pool_queues.zig");
const workers = @import("work_pool_workers.zig");
const schedule = @import("work_pool_schedule.zig");

const queues_marker = "/queues/";
const workers_marker = "/workers/";

/// Default page size used by `filterPools` when the request gives no usable `limit`.
const default_filter_limit: usize = 200;

/// Route a work pool request to the matching handler, based on HTTP method and path.
///
/// Order matters: sub-resource routes (filter, heartbeat, scheduled runs, queues,
/// workers) are tested before the generic pool routes. A missing path is treated
/// as "/" and a missing method as "GET". Requests that match no route receive a
/// 501 response with `{"detail":"not implemented"}`.
pub fn handle(r: zap.Request) !void {
    const target = r.path orelse "/";
    const method = r.method orelse "GET";

    if (mem.eql(u8, method, "POST")) {
        // POST .../filter - list pools, queues or workers
        if (mem.endsWith(u8, target, "/filter")) {
            if (mem.indexOf(u8, target, "/queues/filter") != null) {
                try queues.filter(r, target);
            } else if (mem.indexOf(u8, target, "/workers/filter") != null) {
                try workers.filter(r, target);
            } else {
                try filterPools(r);
            }
            return;
        }

        // POST /work_pools/{name}/workers/heartbeat
        if (mem.indexOf(u8, target, "/workers/heartbeat") != null) {
            try workers.heartbeat(r, target);
            return;
        }

        // POST /work_pools/{name}/get_scheduled_flow_runs
        if (mem.endsWith(u8, target, "/get_scheduled_flow_runs")) {
            try schedule.handle(r, target);
            return;
        }

        // POST /work_pools/{name}/queues/ - create queue.
        // Only matches when nothing (or a lone "/") follows the marker.
        if (mem.indexOf(u8, target, queues_marker)) |queues_idx| {
            const after_queues = target[queues_idx + queues_marker.len ..];
            if (after_queues.len == 0 or mem.eql(u8, after_queues, "/")) {
                try queues.create(r, target);
                return;
            }
        }

        // POST /work_pools/ - create pool
        if (mem.endsWith(u8, target, "/work_pools/") or mem.endsWith(u8, target, "/work_pools")) {
            try createPool(r);
            return;
        }
        // Any other POST falls through to the 501 response below.
    } else if (mem.eql(u8, method, "GET")) {
        // GET /work_pools/{name}/queues/{queue_name}
        if (mem.indexOf(u8, target, queues_marker) != null) {
            try queues.get(r, target);
            return;
        }

        // GET /work_pools/{name}
        const name = extractPoolName(target) orelse return sendPoolNameRequired(r);
        try getPool(r, name);
        return;
    } else if (mem.eql(u8, method, "PATCH")) {
        // PATCH /work_pools/{name}/queues/{queue_name}
        if (mem.indexOf(u8, target, queues_marker) != null) {
            try queues.update(r, target);
            return;
        }

        // PATCH /work_pools/{name}
        const name = extractPoolName(target) orelse return sendPoolNameRequired(r);
        try updatePool(r, name);
        return;
    } else if (mem.eql(u8, method, "DELETE")) {
        // DELETE /work_pools/{name}/queues/{queue_name}
        if (mem.indexOf(u8, target, queues_marker) != null) {
            try queues.delete(r, target);
            return;
        }

        // DELETE /work_pools/{name}/workers/{worker_name}
        if (mem.indexOf(u8, target, workers_marker) != null) {
            try workers.delete(r, target);
            return;
        }

        // DELETE /work_pools/{name}
        const name = extractPoolName(target) orelse return sendPoolNameRequired(r);
        try deletePool(r, name);
        return;
    }

    json_util.sendStatus(r, "{\"detail\":\"not implemented\"}", .not_implemented);
}

/// Send the 400 response used when a pool route carries no pool name.
fn sendPoolNameRequired(r: zap.Request) void {
    json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request);
}

/// Send the 500 response used when building a JSON response body fails.
fn sendSerializeError(r: zap.Request) void {
    json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
}

/// Send an empty 204 response. A failure to send is logged rather than dropped.
fn sendNoContent(r: zap.Request) void {
    r.setStatus(.no_content);
    r.sendBody("") catch |err| {
        std.log.warn("work_pools: failed to send 204 response: {s}", .{@errorName(err)});
    };
}

// Path extraction helpers (pub for sub-handlers)

/// Return the leading path segment of `rest` (everything before the first '/'),
/// or null when that segment is empty. The result borrows from `rest`.
fn firstSegment(rest: []const u8) ?[]const u8 {
    const end = mem.indexOfScalar(u8, rest, '/') orelse rest.len;
    if (end == 0) return null;
    return rest[0..end];
}

/// Extract `{name}` from `/api/work_pools/{name}...` or `/work_pools/{name}...`.
/// Returns null when `target` has neither prefix or the name segment is empty.
/// The result is a slice of `target`.
pub fn extractPoolName(target: []const u8) ?[]const u8 {
    const prefix = if (mem.startsWith(u8, target, "/api/work_pools/"))
        "/api/work_pools/"
    else if (mem.startsWith(u8, target, "/work_pools/"))
        "/work_pools/"
    else
        return null;

    return firstSegment(target[prefix.len..]);
}

/// Extract `{queue_name}` from a path containing `/queues/{queue_name}`.
/// Returns null when the marker is absent or nothing follows it.
/// The result is a slice of `target`.
pub fn extractQueueName(target: []const u8) ?[]const u8 {
    const idx = mem.indexOf(u8, target, queues_marker) orelse return null;
    return firstSegment(target[idx + queues_marker.len ..]);
}

/// Extract `{worker_name}` from a path containing `/workers/{worker_name}`.
/// Returns null when the marker is absent, nothing follows it, or the segment is
/// one of the reserved route words `heartbeat` / `filter`.
/// The result is a slice of `target`.
pub fn extractWorkerName(target: []const u8) ?[]const u8 {
    const idx = mem.indexOf(u8, target, workers_marker) orelse return null;
    const segment = firstSegment(target[idx + workers_marker.len ..]) orelse return null;

    // Compare the whole segment, not just its prefix: a worker called
    // "filter-worker" or "heartbeat2" is a legitimate name, only the exact
    // route words are reserved.
    if (mem.eql(u8, segment, "heartbeat") or mem.eql(u8, segment, "filter")) return null;

    return segment;
}

/// True when `name` starts with "prefect", compared ASCII case-insensitively.
fn isReservedPool(name: []const u8) bool {
    return std.ascii.startsWithIgnoreCase(name, "prefect");
}

// JSON helpers (pub for sub-handlers)

/// Return the string payload of `val`, or null when it is absent or not a JSON string.
pub fn getOptionalString(val: ?json.Value) ?[]const u8 {
    const v = val orelse return null;
    return switch (v) {
        .string => |s| s,
        else => null,
    };
}

/// Return the boolean payload of `val`, or null when it is absent or not a JSON bool.
pub fn getOptionalBool(val: ?json.Value) ?bool {
    const v = val orelse return null;
    return switch (v) {
        .bool => |b| b,
        else => null,
    };
}

/// Return the integer payload of `val`, or null when it is absent or not a JSON integer.
pub fn getOptionalInt(val: ?json.Value) ?i64 {
    const v = val orelse return null;
    return switch (v) {
        .integer => |i| i,
        else => null,
    };
}

/// Like `getOptionalInt`, but additionally yields null for values that do not
/// fit in `usize` (e.g. negative numbers) instead of trapping on the cast.
fn getOptionalUsize(val: ?json.Value) ?usize {
    const i = getOptionalInt(val) orelse return null;
    return std.math.cast(usize, i);
}

/// Return the object map of `value`, or null when the JSON root is not an object.
/// Accessing `.object` directly on a non-object value is illegal behavior, so
/// every handler that parses a request body goes through this check.
fn asObject(value: json.Value) ?json.ObjectMap {
    return switch (value) {
        .object => |o| o,
        else => null,
    };
}

/// Serialize `val` to JSON text allocated from `alloc`.
/// Returns `default` when `val` is absent or the allocation fails.
fn stringifyField(alloc: std.mem.Allocator, val: ?json.Value, default: []const u8) []const u8 {
    const v = val orelse return default;
    return std.fmt.allocPrint(alloc, "{f}", .{json.fmt(v, .{})}) catch default;
}

/// Serialize `val` to JSON text allocated from `alloc`.
/// Returns null when `val` is absent or the allocation fails.
fn stringifyFieldOptional(alloc: std.mem.Allocator, val: ?json.Value) ?[]const u8 {
    const v = val orelse return null;
    return std.fmt.allocPrint(alloc, "{f}", .{json.fmt(v, .{})}) catch null;
}

/// Build the 409 response body for a duplicate pool name. The message goes
/// through the JSON serializer so that quotes or backslashes in `name` cannot
/// produce an invalid response body.
fn conflictBody(alloc: std.mem.Allocator, name: []const u8) ![]const u8 {
    const detail = try std.fmt.allocPrint(alloc, "Work pool '{s}' already exists.", .{name});
    return std.fmt.allocPrint(alloc, "{{\"detail\":{f}}}", .{json.fmt(detail, .{})});
}

// Pool CRUD handlers

/// POST /work_pools/ - create a work pool together with its "default" queue.
///
/// Responses: 400 for a missing/invalid body or name, 403 for a reserved name,
/// 409 when a pool with that name already exists, 500 when a database insert or
/// serialization fails, otherwise 201 with the stored pool as JSON.
fn createPool(r: zap.Request) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const body = r.body orelse {
        json_util.sendStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request);
        return;
    };

    const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch {
        json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
        return;
    };

    // Valid JSON is not necessarily an object ("[]", "1", ...).
    const obj = asObject(parsed.value) orelse {
        json_util.sendStatus(r, "{\"detail\":\"request body must be a JSON object\"}", .bad_request);
        return;
    };

    const name_value = obj.get("name") orelse {
        json_util.sendStatus(r, "{\"detail\":\"name required\"}", .bad_request);
        return;
    };
    const name = switch (name_value) {
        .string => |s| s,
        else => {
            json_util.sendStatus(r, "{\"detail\":\"name must be string\"}", .bad_request);
            return;
        },
    };

    if (isReservedPool(name)) {
        json_util.sendStatus(r, "{\"detail\":\"Work pools starting with 'Prefect' are reserved.\"}", .forbidden);
        return;
    }

    if (db.work_pools.getByName(alloc, name) catch null) |_| {
        const err_msg = conflictBody(alloc, name) catch {
            json_util.sendStatus(r, "{\"detail\":\"Work pool already exists\"}", .conflict);
            return;
        };
        json_util.sendStatus(r, err_msg, .conflict);
        return;
    }

    const description = getOptionalString(obj.get("description"));
    const pool_type = getOptionalString(obj.get("type")) orelse "process";
    const base_job_template = stringifyField(alloc, obj.get("base_job_template"), "{}");
    const is_paused = getOptionalBool(obj.get("is_paused")) orelse false;
    const concurrency_limit = getOptionalInt(obj.get("concurrency_limit"));

    var pool_id_buf: [36]u8 = undefined;
    const pool_id = uuid_util.generate(&pool_id_buf);

    var queue_id_buf: [36]u8 = undefined;
    const queue_id = uuid_util.generate(&queue_id_buf);

    var ts_buf: [32]u8 = undefined;
    const now = time_util.timestamp(&ts_buf);

    const status: db.work_pools.Status = if (is_paused) .paused else .not_ready;

    db.work_pools.insert(
        pool_id,
        name,
        description,
        pool_type,
        base_job_template,
        is_paused,
        concurrency_limit,
        queue_id,
        status,
        now,
    ) catch {
        json_util.sendStatus(r, "{\"detail\":\"insert failed\"}", .internal_server_error);
        return;
    };

    // Every pool gets a "default" queue whose id was stored on the pool above.
    db.work_queues.insert(queue_id, "default", "", false, null, 1, pool_id, .not_ready, now) catch {
        json_util.sendStatus(r, "{\"detail\":\"failed to create default queue\"}", .internal_server_error);
        return;
    };

    // Read the row back so the response reflects what was actually stored.
    const pool = db.work_pools.getByName(alloc, name) catch null orelse {
        json_util.sendStatus(r, "{\"detail\":\"pool not found after insert\"}", .internal_server_error);
        return;
    };

    const resp = writePool(alloc, pool) catch return sendSerializeError(r);
    json_util.sendStatus(r, resp, .created);
}

/// GET /work_pools/{name} - respond with the pool as JSON, or 404 when the
/// lookup yields nothing (a lookup error is treated the same as "not found").
fn getPool(r: zap.Request, name: []const u8) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const pool = db.work_pools.getByName(alloc, name) catch null orelse {
        json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found);
        return;
    };

    const resp = writePool(alloc, pool) catch return sendSerializeError(r);
    json_util.send(r, resp);
}

/// PATCH /work_pools/{name} - update the fields present in the JSON body.
///
/// Responses: 400 for a missing/invalid body, 404 for an unknown pool, 403 when
/// a reserved pool's description or base_job_template would change, 500 when
/// the update fails, otherwise 204.
fn updatePool(r: zap.Request, name: []const u8) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const body = r.body orelse {
        json_util.sendStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request);
        return;
    };

    const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch {
        json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
        return;
    };

    // Valid JSON is not necessarily an object ("[]", "1", ...).
    const obj = asObject(parsed.value) orelse {
        json_util.sendStatus(r, "{\"detail\":\"request body must be a JSON object\"}", .bad_request);
        return;
    };

    const existing = db.work_pools.getByName(alloc, name) catch null orelse {
        json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found);
        return;
    };

    // Reserved pools may still be paused/resumed or have their concurrency
    // limit changed; only description and base_job_template are locked.
    if (isReservedPool(name)) {
        const has_other = obj.get("description") != null or obj.get("base_job_template") != null;
        if (has_other) {
            json_util.sendStatus(r, "{\"detail\":\"Cannot modify reserved work pool.\"}", .forbidden);
            return;
        }
    }

    const description = getOptionalString(obj.get("description"));
    const base_job_template = stringifyFieldOptional(alloc, obj.get("base_job_template"));
    const is_paused = getOptionalBool(obj.get("is_paused"));
    const concurrency_limit = getOptionalInt(obj.get("concurrency_limit"));

    // Status only changes when `is_paused` is supplied: pausing forces
    // `.paused`; un-pausing a paused pool picks ready/not_ready depending on
    // whether it has online workers (a failed lookup counts as "none").
    const new_status: ?db.work_pools.Status = blk: {
        const paused = is_paused orelse break :blk null;
        if (paused) break :blk .paused;
        if (existing.status != .paused) break :blk null;
        const has_workers = db.work_pools.hasOnlineWorkers(existing.id) catch false;
        break :blk if (has_workers) .ready else .not_ready;
    };

    var ts_buf: [32]u8 = undefined;
    const now = time_util.timestamp(&ts_buf);

    const did_update = db.work_pools.updateByName(
        name,
        description,
        base_job_template,
        is_paused,
        concurrency_limit,
        new_status,
        now,
    ) catch {
        json_util.sendStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error);
        return;
    };

    if (!did_update) {
        json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found);
        return;
    }

    sendNoContent(r);
}

/// DELETE /work_pools/{name} - delete a pool.
/// Responses: 403 for reserved pools, 500 when the delete fails, 404 when no
/// row was deleted, otherwise 204.
fn deletePool(r: zap.Request, name: []const u8) !void {
    if (isReservedPool(name)) {
        json_util.sendStatus(r, "{\"detail\":\"Cannot delete reserved work pool.\"}", .forbidden);
        return;
    }

    const deleted = db.work_pools.deleteByName(name) catch {
        json_util.sendStatus(r, "{\"detail\":\"delete failed\"}", .internal_server_error);
        return;
    };

    if (!deleted) {
        json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found);
        return;
    }

    sendNoContent(r);
}

/// POST /work_pools/filter - list pools as a JSON array.
///
/// `limit` (default 200) and `offset` (default 0) are read from the JSON body
/// when present. A missing or unparsable body, a non-object root, and
/// non-integer or negative values all leave the defaults in place.
fn filterPools(r: zap.Request) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    var limit: usize = default_filter_limit;
    var offset: usize = 0;

    if (r.body) |body| {
        if (json.parseFromSlice(json.Value, alloc, body, .{})) |parsed| {
            if (asObject(parsed.value)) |obj| {
                limit = getOptionalUsize(obj.get("limit")) orelse limit;
                offset = getOptionalUsize(obj.get("offset")) orelse offset;
            }
        } else |_| {
            // Deliberately lenient: an unparsable body means "no paging options".
        }
    }

    const pools = db.work_pools.list(alloc, limit, offset) catch {
        json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error);
        return;
    };

    var output: std.io.Writer.Allocating = .init(alloc);
    var jw: json.Stringify = .{ .writer = &output.writer };

    // A failure part-way through leaves a truncated document in the buffer, so
    // any serialization error aborts with a 500 instead of sending what we have.
    jw.beginArray() catch return sendSerializeError(r);
    for (pools) |pool| {
        writePoolObject(&jw, pool) catch return sendSerializeError(r);
    }
    jw.endArray() catch return sendSerializeError(r);

    const resp = output.toOwnedSlice() catch return sendSerializeError(r);
    json_util.send(r, resp);
}

// JSON serialization

/// Serialize a single pool row to a JSON object string allocated from `alloc`.
fn writePool(alloc: std.mem.Allocator, pool: db.work_pools.WorkPoolRow) ![]const u8 {
    var output: std.io.Writer.Allocating = .init(alloc);
    var jw: json.Stringify = .{ .writer = &output.writer };
    try writePoolObject(&jw, pool);
    return output.toOwnedSlice();
}

/// Write `pool` as one JSON object to `jw`. Optional fields are emitted as
/// JSON `null` when unset.
fn writePoolObject(jw: *json.Stringify, pool: db.work_pools.WorkPoolRow) !void {
    try jw.beginObject();

    try jw.objectField("id");
    try jw.write(pool.id);

    try jw.objectField("created");
    try jw.write(pool.created);

    try jw.objectField("updated");
    try jw.write(pool.updated);

    try jw.objectField("name");
    try jw.write(pool.name);

    try jw.objectField("description");
    try jw.write(pool.description);

    try jw.objectField("type");
    try jw.write(pool.type);

    // base_job_template is copied into the output verbatim (no quoting or
    // escaping), i.e. it is treated as already-serialized JSON text.
    try jw.objectField("base_job_template");
    try jw.beginWriteRaw();
    try jw.writer.writeAll(pool.base_job_template);
    jw.endWriteRaw();

    try jw.objectField("is_paused");
    try jw.write(pool.is_paused);

    try jw.objectField("concurrency_limit");
    try jw.write(pool.concurrency_limit);

    try jw.objectField("default_queue_id");
    try jw.write(pool.default_queue_id);

    try jw.objectField("status");
    try jw.write(pool.status.toString());

    try jw.endObject();
}