prefect server in zig

implement work pools, work queues, and workers

adds the foundation for worker-based execution:
- work_pool table + full CRUD API
- work_queue table + CRUD (default queue auto-created with pool)
- worker table + heartbeat upsert
- work pool status tracking (NOT_READY → READY on first worker heartbeat)
- reserved pool protection (prefect-* pools)

split API handlers into separate files to stay under line limits:
- src/api/work_pools.zig (pool CRUD + router)
- src/api/work_pool_queues.zig (queue handlers)
- src/api/work_pool_workers.zig (worker handlers)

tested against the Python prefect client - all operations are compatible.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+2077 -17
+52 -14
ROADMAP.md
··· 44 44 - [ ] POST /api/deployments/{id}/create_flow_run 45 45 46 46 ### work pools & workers 47 - - [ ] POST /api/work_pools/ 48 - - [ ] GET /api/work_pools/{name} 49 - - [ ] POST /api/work_pools/filter 50 - - [ ] POST /api/work_pools/{name}/queues/ 51 - - [ ] POST /api/work_pools/{name}/workers/heartbeat 52 - - [ ] POST /api/work_pools/{name}/get_scheduled_flow_runs 47 + - [x] POST /api/work_pools/ 48 + - [x] GET /api/work_pools/{name} 49 + - [x] PATCH /api/work_pools/{name} 50 + - [x] DELETE /api/work_pools/{name} 51 + - [x] POST /api/work_pools/filter 52 + - [x] POST /api/work_pools/{name}/queues/ 53 + - [x] GET /api/work_pools/{name}/queues/{queue_name} 54 + - [x] PATCH /api/work_pools/{name}/queues/{queue_name} 55 + - [x] DELETE /api/work_pools/{name}/queues/{queue_name} 56 + - [x] POST /api/work_pools/{name}/queues/filter 57 + - [x] POST /api/work_pools/{name}/workers/heartbeat 58 + - [x] POST /api/work_pools/{name}/workers/filter 59 + - [x] DELETE /api/work_pools/{name}/workers/{worker_name} 60 + - [ ] POST /api/work_pools/{name}/get_scheduled_flow_runs (stub - needs deployments) 53 61 54 62 ### blocks 55 63 - [x] POST /api/block_types/ ··· 138 146 - [x] task_run table 139 147 - [ ] task_run_state table 140 148 - [ ] deployment table 141 - - [ ] work_pool table 142 - - [ ] work_queue table 143 - - [ ] worker table 149 + - [x] work_pool table 150 + - [x] work_queue table 151 + - [x] worker table 144 152 - [x] block_type table 145 153 - [x] block_document table 146 154 - [x] block_schema table ··· 171 179 172 180 ## notes 173 181 174 - priority order for next work: 175 - 1. deployments (needed for scheduled/triggered runs) 176 - 2. work_pools (needed for worker-based execution) 177 - 3. scheduler service (needed for deployment schedules) 182 + ### implementation order for worker-based execution 183 + 184 + workers are the primitive - they poll for work and execute runs. order matters: 185 + 186 + 1. 
~~**work_pool** - infrastructure configuration container~~ ✓ 187 + - table + CRUD API 188 + - types: process, docker, kubernetes, etc. 189 + 190 + 2. ~~**work_queue** - routes runs to workers~~ ✓ 191 + - table + CRUD API 192 + - each pool has a default queue 193 + - queues have priority 178 194 179 - ### what's working (6.5x faster than python) 195 + 3. ~~**worker heartbeat** - workers register and poll~~ ✓ 196 + - `POST /work_pools/{name}/workers/heartbeat` 197 + - tracks worker health and last seen 198 + 199 + 4. **deployment** - flow + schedule + work pool binding (NEXT) 200 + - table + CRUD API 201 + - links flow_id → work_pool → work_queue 202 + 203 + 5. **deployment_schedule** - cron/interval/rrule schedules 204 + - table, linked to deployment 205 + - schedule type parsing 206 + 207 + 6. **scheduler service** - creates runs from schedules 208 + - background service 209 + - queries deployments needing runs 210 + - creates flow_runs in SCHEDULED state 211 + 212 + 7. **get_scheduled_flow_runs** - workers poll for work 213 + - `POST /work_pools/{name}/get_scheduled_flow_runs` 214 + - returns runs ready to execute 215 + 216 + ### what's working (~5x faster than python) 180 217 - flow/flow_run/task_run lifecycle 181 218 - blocks (types, schemas, documents) 182 219 - variables (full CRUD) 220 + - work pools, work queues, workers (full CRUD + heartbeat) 183 221 - events (ingest via websocket, persist, broadcast with backfill) 184 222 - dual database backends (sqlite/postgres) 185 223 - dual message brokers (memory/redis)
+6 -2
loq.toml
··· 2 2 3 3 [[rules]] 4 4 path = "src/db/backend.zig" 5 - max_lines = 530 5 + max_lines = 535 6 6 7 7 [[rules]] 8 8 path = "scripts/benchmark" ··· 10 10 11 11 [[rules]] 12 12 path = "scripts/test-api-sequence" 13 - max_lines = 580 13 + max_lines = 725 14 14 15 15 [[rules]] 16 16 path = "src/broker/redis.zig" 17 17 max_lines = 1020 18 + 19 + [[rules]] 20 + path = "src/api/work_pools.zig" 21 + max_lines = 550
+150
scripts/test-api-sequence
··· 500 500 return True 501 501 502 502 503 + def test_work_pools(client: CountingClient) -> bool: 504 + """Test work pools API (pools, queues, workers).""" 505 + pool_name = f"test-pool-{uuid.uuid4().hex[:8]}" 506 + 507 + # create work pool 508 + if not QUIET: 509 + console.print("[bold]work_pools[/bold]") 510 + resp = client.post("/work_pools/", json={ 511 + "name": pool_name, 512 + "type": "process", 513 + "description": "test work pool", 514 + }) 515 + if resp.status_code not in (200, 201): 516 + if not QUIET: 517 + console.print(f"[red]FAIL[/red]: create work_pool {resp.status_code} {resp.text}") 518 + return False 519 + pool = resp.json() 520 + if not validate_response(pool, ["id", "name", "type", "status", "default_queue_id"], {"id": str, "name": str}): 521 + return False 522 + if not QUIET: 523 + console.print(f" created: {pool.get('id')}") 524 + 525 + # check default queue was created 526 + if not pool.get("default_queue_id"): 527 + if not QUIET: 528 + console.print("[red]FAIL[/red]: no default_queue_id") 529 + return False 530 + 531 + # get by name 532 + resp = client.get(f"/work_pools/{pool_name}") 533 + if resp.status_code != 200: 534 + if not QUIET: 535 + console.print(f"[red]FAIL[/red]: get work_pool {resp.status_code}") 536 + return False 537 + 538 + # update 539 + resp = client.patch(f"/work_pools/{pool_name}", json={"description": "updated"}) 540 + if resp.status_code != 204: 541 + if not QUIET: 542 + console.print(f"[red]FAIL[/red]: update work_pool {resp.status_code}") 543 + return False 544 + if not QUIET: 545 + console.print(" updated") 546 + 547 + # filter 548 + resp = client.post("/work_pools/filter", json={}) 549 + if resp.status_code != 200: 550 + if not QUIET: 551 + console.print(f"[red]FAIL[/red]: filter work_pools {resp.status_code}") 552 + return False 553 + pools = resp.json() 554 + if not isinstance(pools, list): 555 + return False 556 + 557 + # create queue 558 + if not QUIET: 559 + console.print("[bold]work_queues[/bold]") 560 + 
queue_name = f"test-queue-{uuid.uuid4().hex[:8]}" 561 + resp = client.post(f"/work_pools/{pool_name}/queues/", json={ 562 + "name": queue_name, 563 + "description": "test queue", 564 + "priority": 5, 565 + }) 566 + if resp.status_code not in (200, 201): 567 + if not QUIET: 568 + console.print(f"[red]FAIL[/red]: create work_queue {resp.status_code} {resp.text}") 569 + return False 570 + queue = resp.json() 571 + if not validate_response(queue, ["id", "name", "priority", "work_pool_id"], {"id": str, "priority": int}): 572 + return False 573 + if not QUIET: 574 + console.print(f" created: {queue.get('id')}") 575 + 576 + # get queue 577 + resp = client.get(f"/work_pools/{pool_name}/queues/{queue_name}") 578 + if resp.status_code != 200: 579 + if not QUIET: 580 + console.print(f"[red]FAIL[/red]: get work_queue {resp.status_code}") 581 + return False 582 + 583 + # filter queues 584 + resp = client.post(f"/work_pools/{pool_name}/queues/filter", json={}) 585 + if resp.status_code != 200: 586 + if not QUIET: 587 + console.print(f"[red]FAIL[/red]: filter work_queues {resp.status_code}") 588 + return False 589 + queues = resp.json() 590 + if not isinstance(queues, list) or len(queues) < 2: # default + our queue 591 + if not QUIET: 592 + console.print(f"[red]FAIL[/red]: expected at least 2 queues, got {len(queues) if isinstance(queues, list) else 'not a list'}") 593 + return False 594 + 595 + # worker heartbeat 596 + if not QUIET: 597 + console.print("[bold]workers[/bold]") 598 + resp = client.post(f"/work_pools/{pool_name}/workers/heartbeat", json={ 599 + "name": "test-worker-1", 600 + "heartbeat_interval_seconds": 30, 601 + }) 602 + if resp.status_code != 204: 603 + if not QUIET: 604 + console.print(f"[red]FAIL[/red]: worker heartbeat {resp.status_code} {resp.text}") 605 + return False 606 + if not QUIET: 607 + console.print(" heartbeat sent") 608 + 609 + # check pool status is now READY 610 + resp = client.get(f"/work_pools/{pool_name}") 611 + if resp.status_code != 200: 
612 + return False 613 + pool = resp.json() 614 + if pool.get("status") != "READY": 615 + if not QUIET: 616 + console.print(f"[red]FAIL[/red]: expected pool status READY, got {pool.get('status')}") 617 + return False 618 + if not QUIET: 619 + console.print(" pool status: READY") 620 + 621 + # filter workers 622 + resp = client.post(f"/work_pools/{pool_name}/workers/filter", json={}) 623 + if resp.status_code != 200: 624 + if not QUIET: 625 + console.print(f"[red]FAIL[/red]: filter workers {resp.status_code}") 626 + return False 627 + workers = resp.json() 628 + if not isinstance(workers, list) or len(workers) < 1: 629 + return False 630 + 631 + # delete queue (not default) 632 + resp = client.delete(f"/work_pools/{pool_name}/queues/{queue_name}") 633 + if resp.status_code != 204: 634 + if not QUIET: 635 + console.print(f"[red]FAIL[/red]: delete work_queue {resp.status_code}") 636 + return False 637 + if not QUIET: 638 + console.print(" deleted queue") 639 + 640 + # delete pool 641 + resp = client.delete(f"/work_pools/{pool_name}") 642 + if resp.status_code != 204: 643 + if not QUIET: 644 + console.print(f"[red]FAIL[/red]: delete work_pool {resp.status_code}") 645 + return False 646 + if not QUIET: 647 + console.print(" deleted pool") 648 + 649 + return True 650 + 651 + 503 652 def main(): 504 653 json_output = "--json" in sys.argv 505 654 ··· 517 666 results.append(run_test("logs", test_logs)) 518 667 results.append(run_test("variables", test_variables)) 519 668 results.append(run_test("blocks", test_blocks)) 669 + results.append(run_test("work_pools", test_work_pools)) 520 670 521 671 total_duration = sum(r.duration_ms for r in results) 522 672 total_requests = sum(r.requests for r in results)
+3
src/api/routes.zig
··· 11 11 pub const block_schemas = @import("block_schemas.zig"); 12 12 pub const block_documents = @import("block_documents.zig"); 13 13 pub const variables = @import("variables.zig"); 14 + pub const work_pools = @import("work_pools.zig"); 14 15 15 16 pub fn handle(r: zap.Request) !void { 16 17 const target = r.path orelse "/"; ··· 45 46 try block_documents.handle(r); 46 47 } else if (std.mem.startsWith(u8, target, "/api/variables") or std.mem.startsWith(u8, target, "/variables")) { 47 48 try variables.handle(r); 49 + } else if (std.mem.startsWith(u8, target, "/api/work_pools") or std.mem.startsWith(u8, target, "/work_pools")) { 50 + try work_pools.handle(r); 48 51 } else { 49 52 try sendNotFound(r); 50 53 }
+348
src/api/work_pool_queues.zig
··· 1 + const std = @import("std"); 2 + const zap = @import("zap"); 3 + const mem = std.mem; 4 + const json = std.json; 5 + 6 + const db = @import("../db/sqlite.zig"); 7 + const uuid_util = @import("../utilities/uuid.zig"); 8 + const time_util = @import("../utilities/time.zig"); 9 + const json_util = @import("../utilities/json.zig"); 10 + const pool_helpers = @import("work_pools.zig"); 11 + 12 + pub fn create(r: zap.Request, target: []const u8) !void { 13 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 14 + defer arena.deinit(); 15 + const alloc = arena.allocator(); 16 + 17 + const pool_name = pool_helpers.extractPoolName(target) orelse { 18 + json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request); 19 + return; 20 + }; 21 + 22 + const pool = db.work_pools.getByName(alloc, pool_name) catch null orelse { 23 + json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found); 24 + return; 25 + }; 26 + 27 + const body = r.body orelse { 28 + json_util.sendStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request); 29 + return; 30 + }; 31 + 32 + const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch { 33 + json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request); 34 + return; 35 + }; 36 + 37 + const obj = parsed.value.object; 38 + 39 + const name = switch (obj.get("name") orelse { 40 + json_util.sendStatus(r, "{\"detail\":\"name required\"}", .bad_request); 41 + return; 42 + }) { 43 + .string => |s| s, 44 + else => { 45 + json_util.sendStatus(r, "{\"detail\":\"name must be string\"}", .bad_request); 46 + return; 47 + }, 48 + }; 49 + 50 + if (db.work_queues.getByPoolAndName(alloc, pool.id, name) catch null) |_| { 51 + json_util.sendStatus(r, "{\"detail\":\"Work queue already exists.\"}", .conflict); 52 + return; 53 + } 54 + 55 + const description = pool_helpers.getOptionalString(obj.get("description")) orelse ""; 56 + const is_paused = 
pool_helpers.getOptionalBool(obj.get("is_paused")) orelse false; 57 + const concurrency_limit = pool_helpers.getOptionalInt(obj.get("concurrency_limit")); 58 + const priority = pool_helpers.getOptionalInt(obj.get("priority")) orelse (db.work_queues.nextPriority(pool.id) catch 1); 59 + 60 + var id_buf: [36]u8 = undefined; 61 + const queue_id = uuid_util.generate(&id_buf); 62 + 63 + var ts_buf: [32]u8 = undefined; 64 + const now = time_util.timestamp(&ts_buf); 65 + 66 + const status: db.work_queues.Status = if (is_paused) .paused else .not_ready; 67 + 68 + db.work_queues.insert( 69 + queue_id, 70 + name, 71 + description, 72 + is_paused, 73 + concurrency_limit, 74 + priority, 75 + pool.id, 76 + status, 77 + now, 78 + ) catch { 79 + json_util.sendStatus(r, "{\"detail\":\"insert failed\"}", .internal_server_error); 80 + return; 81 + }; 82 + 83 + const queue = db.work_queues.getByPoolAndName(alloc, pool.id, name) catch null orelse { 84 + json_util.sendStatus(r, "{\"detail\":\"queue not found after insert\"}", .internal_server_error); 85 + return; 86 + }; 87 + 88 + const resp = writeQueue(alloc, queue, pool_name) catch { 89 + json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 90 + return; 91 + }; 92 + json_util.sendStatus(r, resp, .created); 93 + } 94 + 95 + pub fn get(r: zap.Request, target: []const u8) !void { 96 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 97 + defer arena.deinit(); 98 + const alloc = arena.allocator(); 99 + 100 + const pool_name = pool_helpers.extractPoolName(target) orelse { 101 + json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request); 102 + return; 103 + }; 104 + 105 + const queue_name = pool_helpers.extractQueueName(target) orelse { 106 + json_util.sendStatus(r, "{\"detail\":\"queue name required\"}", .bad_request); 107 + return; 108 + }; 109 + 110 + const pool = db.work_pools.getByName(alloc, pool_name) catch null orelse { 111 + json_util.sendStatus(r, 
"{\"detail\":\"Work pool not found.\"}", .not_found); 112 + return; 113 + }; 114 + 115 + if (db.work_queues.getByPoolAndName(alloc, pool.id, queue_name) catch null) |queue| { 116 + const resp = writeQueue(alloc, queue, pool_name) catch { 117 + json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 118 + return; 119 + }; 120 + json_util.send(r, resp); 121 + } else { 122 + json_util.sendStatus(r, "{\"detail\":\"Work queue not found.\"}", .not_found); 123 + } 124 + } 125 + 126 + pub fn update(r: zap.Request, target: []const u8) !void { 127 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 128 + defer arena.deinit(); 129 + const alloc = arena.allocator(); 130 + 131 + const pool_name = pool_helpers.extractPoolName(target) orelse { 132 + json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request); 133 + return; 134 + }; 135 + 136 + const queue_name = pool_helpers.extractQueueName(target) orelse { 137 + json_util.sendStatus(r, "{\"detail\":\"queue name required\"}", .bad_request); 138 + return; 139 + }; 140 + 141 + const pool = db.work_pools.getByName(alloc, pool_name) catch null orelse { 142 + json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found); 143 + return; 144 + }; 145 + 146 + const body = r.body orelse { 147 + json_util.sendStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request); 148 + return; 149 + }; 150 + 151 + const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch { 152 + json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request); 153 + return; 154 + }; 155 + 156 + const obj = parsed.value.object; 157 + 158 + const new_name = pool_helpers.getOptionalString(obj.get("name")); 159 + const description = pool_helpers.getOptionalString(obj.get("description")); 160 + const is_paused = pool_helpers.getOptionalBool(obj.get("is_paused")); 161 + const concurrency_limit = pool_helpers.getOptionalInt(obj.get("concurrency_limit")); 162 + const 
priority = pool_helpers.getOptionalInt(obj.get("priority")); 163 + 164 + var new_status: ?db.work_queues.Status = null; 165 + if (is_paused) |paused| { 166 + new_status = if (paused) .paused else .not_ready; 167 + } 168 + 169 + var ts_buf: [32]u8 = undefined; 170 + const now = time_util.timestamp(&ts_buf); 171 + 172 + const did_update = db.work_queues.updateByPoolAndName( 173 + pool.id, 174 + queue_name, 175 + new_name, 176 + description, 177 + is_paused, 178 + concurrency_limit, 179 + priority, 180 + new_status, 181 + now, 182 + ) catch { 183 + json_util.sendStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error); 184 + return; 185 + }; 186 + 187 + if (!did_update) { 188 + json_util.sendStatus(r, "{\"detail\":\"Work queue not found.\"}", .not_found); 189 + return; 190 + } 191 + 192 + r.setStatus(.no_content); 193 + r.sendBody("") catch {}; 194 + } 195 + 196 + pub fn delete(r: zap.Request, target: []const u8) !void { 197 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 198 + defer arena.deinit(); 199 + const alloc = arena.allocator(); 200 + 201 + const pool_name = pool_helpers.extractPoolName(target) orelse { 202 + json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request); 203 + return; 204 + }; 205 + 206 + const queue_name = pool_helpers.extractQueueName(target) orelse { 207 + json_util.sendStatus(r, "{\"detail\":\"queue name required\"}", .bad_request); 208 + return; 209 + }; 210 + 211 + const pool = db.work_pools.getByName(alloc, pool_name) catch null orelse { 212 + json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found); 213 + return; 214 + }; 215 + 216 + if (pool.default_queue_id) |default_id| { 217 + if (db.work_queues.getByPoolAndName(alloc, pool.id, queue_name) catch null) |queue| { 218 + if (mem.eql(u8, queue.id, default_id)) { 219 + json_util.sendStatus(r, "{\"detail\":\"Cannot delete the default work queue.\"}", .bad_request); 220 + return; 221 + } 222 + } 223 + } 224 + 225 + 
const deleted = db.work_queues.deleteByPoolAndName(pool.id, queue_name) catch { 226 + json_util.sendStatus(r, "{\"detail\":\"delete failed\"}", .internal_server_error); 227 + return; 228 + }; 229 + 230 + if (!deleted) { 231 + json_util.sendStatus(r, "{\"detail\":\"Work queue not found.\"}", .not_found); 232 + return; 233 + } 234 + 235 + r.setStatus(.no_content); 236 + r.sendBody("") catch {}; 237 + } 238 + 239 + pub fn filter(r: zap.Request, target: []const u8) !void { 240 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 241 + defer arena.deinit(); 242 + const alloc = arena.allocator(); 243 + 244 + const pool_name = pool_helpers.extractPoolName(target) orelse { 245 + json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request); 246 + return; 247 + }; 248 + 249 + const pool = db.work_pools.getByName(alloc, pool_name) catch null orelse { 250 + json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found); 251 + return; 252 + }; 253 + 254 + var limit: usize = 10; 255 + var offset: usize = 0; 256 + 257 + if (r.body) |body| { 258 + if (json.parseFromSlice(json.Value, alloc, body, .{})) |parsed| { 259 + const obj = parsed.value.object; 260 + if (obj.get("limit")) |v| { 261 + if (v == .integer) limit = @intCast(v.integer); 262 + } 263 + if (obj.get("offset")) |v| { 264 + if (v == .integer) offset = @intCast(v.integer); 265 + } 266 + } else |_| {} 267 + } 268 + 269 + const queues = db.work_queues.listByPool(alloc, pool.id, limit, offset) catch { 270 + json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error); 271 + return; 272 + }; 273 + 274 + var output: std.io.Writer.Allocating = .init(alloc); 275 + var jw: json.Stringify = .{ .writer = &output.writer }; 276 + 277 + jw.beginArray() catch { 278 + json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 279 + return; 280 + }; 281 + 282 + for (queues) |queue| { 283 + writeQueueObject(&jw, queue, pool_name) catch 
continue; 284 + } 285 + 286 + jw.endArray() catch {}; 287 + 288 + json_util.send(r, output.toOwnedSlice() catch "[]"); 289 + } 290 + 291 + // JSON serialization 292 + 293 + fn writeQueue(alloc: std.mem.Allocator, queue: db.work_queues.WorkQueueRow, pool_name: []const u8) ![]const u8 { 294 + var output: std.io.Writer.Allocating = .init(alloc); 295 + var jw: json.Stringify = .{ .writer = &output.writer }; 296 + try writeQueueObject(&jw, queue, pool_name); 297 + return output.toOwnedSlice(); 298 + } 299 + 300 + fn writeQueueObject(jw: *json.Stringify, queue: db.work_queues.WorkQueueRow, pool_name: []const u8) !void { 301 + try jw.beginObject(); 302 + 303 + try jw.objectField("id"); 304 + try jw.write(queue.id); 305 + 306 + try jw.objectField("created"); 307 + try jw.write(queue.created); 308 + 309 + try jw.objectField("updated"); 310 + try jw.write(queue.updated); 311 + 312 + try jw.objectField("name"); 313 + try jw.write(queue.name); 314 + 315 + try jw.objectField("description"); 316 + try jw.write(queue.description); 317 + 318 + try jw.objectField("is_paused"); 319 + try jw.write(queue.is_paused); 320 + 321 + try jw.objectField("concurrency_limit"); 322 + if (queue.concurrency_limit) |c| { 323 + try jw.write(c); 324 + } else { 325 + try jw.write(null); 326 + } 327 + 328 + try jw.objectField("priority"); 329 + try jw.write(queue.priority); 330 + 331 + try jw.objectField("work_pool_id"); 332 + try jw.write(queue.work_pool_id); 333 + 334 + try jw.objectField("work_pool_name"); 335 + try jw.write(pool_name); 336 + 337 + try jw.objectField("last_polled"); 338 + if (queue.last_polled) |lp| { 339 + try jw.write(lp); 340 + } else { 341 + try jw.write(null); 342 + } 343 + 344 + try jw.objectField("status"); 345 + try jw.write(queue.status.toString()); 346 + 347 + try jw.endObject(); 348 + }
+195
src/api/work_pool_workers.zig
··· 1 + const std = @import("std"); 2 + const zap = @import("zap"); 3 + const json = std.json; 4 + 5 + const db = @import("../db/sqlite.zig"); 6 + const uuid_util = @import("../utilities/uuid.zig"); 7 + const time_util = @import("../utilities/time.zig"); 8 + const json_util = @import("../utilities/json.zig"); 9 + const pool_helpers = @import("work_pools.zig"); 10 + 11 + pub fn heartbeat(r: zap.Request, target: []const u8) !void { 12 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 13 + defer arena.deinit(); 14 + const alloc = arena.allocator(); 15 + 16 + const pool_name = pool_helpers.extractPoolName(target) orelse { 17 + json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request); 18 + return; 19 + }; 20 + 21 + const pool = db.work_pools.getByName(alloc, pool_name) catch null orelse { 22 + json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found); 23 + return; 24 + }; 25 + 26 + const body = r.body orelse { 27 + json_util.sendStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request); 28 + return; 29 + }; 30 + 31 + const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch { 32 + json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request); 33 + return; 34 + }; 35 + 36 + const obj = parsed.value.object; 37 + 38 + const worker_name = switch (obj.get("name") orelse { 39 + json_util.sendStatus(r, "{\"detail\":\"name required\"}", .bad_request); 40 + return; 41 + }) { 42 + .string => |s| s, 43 + else => { 44 + json_util.sendStatus(r, "{\"detail\":\"name must be string\"}", .bad_request); 45 + return; 46 + }, 47 + }; 48 + 49 + const heartbeat_interval = pool_helpers.getOptionalInt(obj.get("heartbeat_interval_seconds")); 50 + 51 + var id_buf: [36]u8 = undefined; 52 + const worker_id = uuid_util.generate(&id_buf); 53 + 54 + var ts_buf: [32]u8 = undefined; 55 + const now = time_util.timestamp(&ts_buf); 56 + 57 + db.workers.upsertHeartbeat(worker_id, worker_name, pool.id, 
heartbeat_interval, now) catch { 58 + json_util.sendStatus(r, "{\"detail\":\"heartbeat failed\"}", .internal_server_error); 59 + return; 60 + }; 61 + 62 + // Update pool status to READY if it was NOT_READY 63 + if (pool.status == .not_ready) { 64 + _ = db.work_pools.updateStatus(pool_name, .ready, now) catch {}; 65 + } 66 + 67 + r.setStatus(.no_content); 68 + r.sendBody("") catch {}; 69 + } 70 + 71 + pub fn filter(r: zap.Request, target: []const u8) !void { 72 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 73 + defer arena.deinit(); 74 + const alloc = arena.allocator(); 75 + 76 + const pool_name = pool_helpers.extractPoolName(target) orelse { 77 + json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request); 78 + return; 79 + }; 80 + 81 + const pool = db.work_pools.getByName(alloc, pool_name) catch null orelse { 82 + json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found); 83 + return; 84 + }; 85 + 86 + var limit: usize = 10; 87 + var offset: usize = 0; 88 + 89 + if (r.body) |body| { 90 + if (json.parseFromSlice(json.Value, alloc, body, .{})) |parsed| { 91 + const obj = parsed.value.object; 92 + if (obj.get("limit")) |v| { 93 + if (v == .integer) limit = @intCast(v.integer); 94 + } 95 + if (obj.get("offset")) |v| { 96 + if (v == .integer) offset = @intCast(v.integer); 97 + } 98 + } else |_| {} 99 + } 100 + 101 + const workers_list = db.workers.listByPool(alloc, pool.id, limit, offset) catch { 102 + json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error); 103 + return; 104 + }; 105 + 106 + var output: std.io.Writer.Allocating = .init(alloc); 107 + var jw: json.Stringify = .{ .writer = &output.writer }; 108 + 109 + jw.beginArray() catch { 110 + json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 111 + return; 112 + }; 113 + 114 + for (workers_list) |worker| { 115 + writeWorkerObject(&jw, worker) catch continue; 116 + } 117 + 118 + jw.endArray() 
catch {}; 119 + 120 + json_util.send(r, output.toOwnedSlice() catch "[]"); 121 + } 122 + 123 + pub fn delete(r: zap.Request, target: []const u8) !void { 124 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 125 + defer arena.deinit(); 126 + const alloc = arena.allocator(); 127 + 128 + const pool_name = pool_helpers.extractPoolName(target) orelse { 129 + json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request); 130 + return; 131 + }; 132 + 133 + const worker_name = pool_helpers.extractWorkerName(target) orelse { 134 + json_util.sendStatus(r, "{\"detail\":\"worker name required\"}", .bad_request); 135 + return; 136 + }; 137 + 138 + const pool = db.work_pools.getByName(alloc, pool_name) catch null orelse { 139 + json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found); 140 + return; 141 + }; 142 + 143 + const deleted = db.workers.deleteByPoolAndName(pool.id, worker_name) catch { 144 + json_util.sendStatus(r, "{\"detail\":\"delete failed\"}", .internal_server_error); 145 + return; 146 + }; 147 + 148 + if (!deleted) { 149 + json_util.sendStatus(r, "{\"detail\":\"Worker not found.\"}", .not_found); 150 + return; 151 + } 152 + 153 + r.setStatus(.no_content); 154 + r.sendBody("") catch {}; 155 + } 156 + 157 + // JSON serialization 158 + 159 + fn writeWorkerObject(jw: *json.Stringify, worker: db.workers.WorkerRow) !void { 160 + try jw.beginObject(); 161 + 162 + try jw.objectField("id"); 163 + try jw.write(worker.id); 164 + 165 + try jw.objectField("created"); 166 + try jw.write(worker.created); 167 + 168 + try jw.objectField("updated"); 169 + try jw.write(worker.updated); 170 + 171 + try jw.objectField("name"); 172 + try jw.write(worker.name); 173 + 174 + try jw.objectField("work_pool_id"); 175 + try jw.write(worker.work_pool_id); 176 + 177 + try jw.objectField("last_heartbeat_time"); 178 + if (worker.last_heartbeat_time) |lh| { 179 + try jw.write(lh); 180 + } else { 181 + try jw.write(null); 182 + } 183 + 184 
+ try jw.objectField("heartbeat_interval_seconds"); 185 + if (worker.heartbeat_interval_seconds) |hi| { 186 + try jw.write(hi); 187 + } else { 188 + try jw.write(null); 189 + } 190 + 191 + try jw.objectField("status"); 192 + try jw.write(worker.status.toString()); 193 + 194 + try jw.endObject(); 195 + }
+545
src/api/work_pools.zig
··· 1 + const std = @import("std"); 2 + const zap = @import("zap"); 3 + const mem = std.mem; 4 + const json = std.json; 5 + 6 + const db = @import("../db/sqlite.zig"); 7 + const uuid_util = @import("../utilities/uuid.zig"); 8 + const time_util = @import("../utilities/time.zig"); 9 + const json_util = @import("../utilities/json.zig"); 10 + 11 + // sub-handlers 12 + const queues = @import("work_pool_queues.zig"); 13 + const workers = @import("work_pool_workers.zig"); 14 + 15 + pub fn handle(r: zap.Request) !void { 16 + const target = r.path orelse "/"; 17 + const method = r.method orelse "GET"; 18 + 19 + // POST /work_pools/filter - list work pools 20 + if (mem.eql(u8, method, "POST") and mem.endsWith(u8, target, "/filter")) { 21 + if (mem.indexOf(u8, target, "/queues/filter") != null) { 22 + try queues.filter(r, target); 23 + return; 24 + } 25 + if (mem.indexOf(u8, target, "/workers/filter") != null) { 26 + try workers.filter(r, target); 27 + return; 28 + } 29 + try filterPools(r); 30 + return; 31 + } 32 + 33 + // POST /work_pools/{name}/workers/heartbeat 34 + if (mem.eql(u8, method, "POST") and mem.indexOf(u8, target, "/workers/heartbeat") != null) { 35 + try workers.heartbeat(r, target); 36 + return; 37 + } 38 + 39 + // POST /work_pools/{name}/get_scheduled_flow_runs 40 + if (mem.eql(u8, method, "POST") and mem.endsWith(u8, target, "/get_scheduled_flow_runs")) { 41 + try getScheduledFlowRuns(r, target); 42 + return; 43 + } 44 + 45 + // POST /work_pools/{name}/queues/ - create queue 46 + if (mem.eql(u8, method, "POST") and mem.indexOf(u8, target, "/queues/") != null) { 47 + const queues_idx = mem.indexOf(u8, target, "/queues/") orelse { 48 + json_util.sendStatus(r, "{\"detail\":\"invalid path\"}", .bad_request); 49 + return; 50 + }; 51 + const after_queues = target[queues_idx + 8 ..]; 52 + if (after_queues.len == 0 or mem.eql(u8, after_queues, "/")) { 53 + try queues.create(r, target); 54 + return; 55 + } 56 + } 57 + 58 + // POST /work_pools/ - create pool 59 + if 
(mem.eql(u8, method, "POST")) { 60 + const is_root = mem.endsWith(u8, target, "/work_pools/") or mem.endsWith(u8, target, "/work_pools"); 61 + if (is_root) { 62 + try createPool(r); 63 + return; 64 + } 65 + } 66 + 67 + // GET /work_pools/{name}/queues/{queue_name} 68 + if (mem.eql(u8, method, "GET") and mem.indexOf(u8, target, "/queues/") != null) { 69 + try queues.get(r, target); 70 + return; 71 + } 72 + 73 + // PATCH /work_pools/{name}/queues/{queue_name} 74 + if (mem.eql(u8, method, "PATCH") and mem.indexOf(u8, target, "/queues/") != null) { 75 + try queues.update(r, target); 76 + return; 77 + } 78 + 79 + // DELETE /work_pools/{name}/queues/{queue_name} 80 + if (mem.eql(u8, method, "DELETE") and mem.indexOf(u8, target, "/queues/") != null) { 81 + try queues.delete(r, target); 82 + return; 83 + } 84 + 85 + // DELETE /work_pools/{name}/workers/{worker_name} 86 + if (mem.eql(u8, method, "DELETE") and mem.indexOf(u8, target, "/workers/") != null) { 87 + try workers.delete(r, target); 88 + return; 89 + } 90 + 91 + // GET /work_pools/{name} 92 + if (mem.eql(u8, method, "GET")) { 93 + const name = extractPoolName(target) orelse { 94 + json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request); 95 + return; 96 + }; 97 + try getPool(r, name); 98 + return; 99 + } 100 + 101 + // PATCH /work_pools/{name} 102 + if (mem.eql(u8, method, "PATCH")) { 103 + const name = extractPoolName(target) orelse { 104 + json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request); 105 + return; 106 + }; 107 + try updatePool(r, name); 108 + return; 109 + } 110 + 111 + // DELETE /work_pools/{name} 112 + if (mem.eql(u8, method, "DELETE")) { 113 + const name = extractPoolName(target) orelse { 114 + json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request); 115 + return; 116 + }; 117 + try deletePool(r, name); 118 + return; 119 + } 120 + 121 + json_util.sendStatus(r, "{\"detail\":\"not implemented\"}", .not_implemented); 122 + } 
123 + 124 + // Path extraction helpers (pub for sub-handlers) 125 + 126 + pub fn extractPoolName(target: []const u8) ?[]const u8 { 127 + const prefix = if (mem.startsWith(u8, target, "/api/work_pools/")) 128 + "/api/work_pools/" 129 + else if (mem.startsWith(u8, target, "/work_pools/")) 130 + "/work_pools/" 131 + else 132 + return null; 133 + 134 + if (target.len <= prefix.len) return null; 135 + 136 + const after = target[prefix.len..]; 137 + const end = mem.indexOf(u8, after, "/") orelse after.len; 138 + if (end == 0) return null; 139 + 140 + return after[0..end]; 141 + } 142 + 143 + pub fn extractQueueName(target: []const u8) ?[]const u8 { 144 + const idx = mem.indexOf(u8, target, "/queues/") orelse return null; 145 + const start = idx + 8; 146 + if (start >= target.len) return null; 147 + 148 + const after = target[start..]; 149 + const end = mem.indexOf(u8, after, "/") orelse after.len; 150 + if (end == 0) return null; 151 + 152 + return after[0..end]; 153 + } 154 + 155 + pub fn extractWorkerName(target: []const u8) ?[]const u8 { 156 + const idx = mem.indexOf(u8, target, "/workers/") orelse return null; 157 + const start = idx + 9; 158 + if (start >= target.len) return null; 159 + 160 + const after = target[start..]; 161 + if (mem.startsWith(u8, after, "heartbeat") or mem.startsWith(u8, after, "filter")) return null; 162 + 163 + const end = mem.indexOf(u8, after, "/") orelse after.len; 164 + if (end == 0) return null; 165 + 166 + return after[0..end]; 167 + } 168 + 169 + fn isReservedPool(name: []const u8) bool { 170 + if (name.len < 7) return false; 171 + var lower: [7]u8 = undefined; 172 + for (name[0..7], 0..) 
|c, i| { 173 + lower[i] = std.ascii.toLower(c); 174 + } 175 + return mem.eql(u8, &lower, "prefect"); 176 + } 177 + 178 + // JSON helpers (pub for sub-handlers) 179 + 180 + pub fn getOptionalString(val: ?json.Value) ?[]const u8 { 181 + if (val) |v| { 182 + return switch (v) { 183 + .string => |s| s, 184 + else => null, 185 + }; 186 + } 187 + return null; 188 + } 189 + 190 + pub fn getOptionalBool(val: ?json.Value) ?bool { 191 + if (val) |v| { 192 + return switch (v) { 193 + .bool => |b| b, 194 + else => null, 195 + }; 196 + } 197 + return null; 198 + } 199 + 200 + pub fn getOptionalInt(val: ?json.Value) ?i64 { 201 + if (val) |v| { 202 + return switch (v) { 203 + .integer => |i| i, 204 + else => null, 205 + }; 206 + } 207 + return null; 208 + } 209 + 210 + fn stringifyField(alloc: std.mem.Allocator, val: ?json.Value, default: []const u8) []const u8 { 211 + if (val) |v| { 212 + return std.fmt.allocPrint(alloc, "{f}", .{json.fmt(v, .{})}) catch default; 213 + } 214 + return default; 215 + } 216 + 217 + fn stringifyFieldOptional(alloc: std.mem.Allocator, val: ?json.Value) ?[]const u8 { 218 + if (val) |v| { 219 + return std.fmt.allocPrint(alloc, "{f}", .{json.fmt(v, .{})}) catch null; 220 + } 221 + return null; 222 + } 223 + 224 + // Pool CRUD handlers 225 + 226 + fn createPool(r: zap.Request) !void { 227 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 228 + defer arena.deinit(); 229 + const alloc = arena.allocator(); 230 + 231 + const body = r.body orelse { 232 + json_util.sendStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request); 233 + return; 234 + }; 235 + 236 + const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch { 237 + json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request); 238 + return; 239 + }; 240 + 241 + const obj = parsed.value.object; 242 + 243 + const name = switch (obj.get("name") orelse { 244 + json_util.sendStatus(r, "{\"detail\":\"name required\"}", .bad_request); 245 + return; 246 + }) { 
247 + .string => |s| s, 248 + else => { 249 + json_util.sendStatus(r, "{\"detail\":\"name must be string\"}", .bad_request); 250 + return; 251 + }, 252 + }; 253 + 254 + if (isReservedPool(name)) { 255 + json_util.sendStatus(r, "{\"detail\":\"Work pools starting with 'Prefect' are reserved.\"}", .forbidden); 256 + return; 257 + } 258 + 259 + if (db.work_pools.getByName(alloc, name) catch null) |_| { 260 + const err_msg = std.fmt.allocPrint(alloc, "{{\"detail\":\"Work pool '{s}' already exists.\"}}", .{name}) catch { 261 + json_util.sendStatus(r, "{\"detail\":\"Work pool already exists\"}", .conflict); 262 + return; 263 + }; 264 + json_util.sendStatus(r, err_msg, .conflict); 265 + return; 266 + } 267 + 268 + const description = getOptionalString(obj.get("description")); 269 + const pool_type = getOptionalString(obj.get("type")) orelse "process"; 270 + const base_job_template = stringifyField(alloc, obj.get("base_job_template"), "{}"); 271 + const is_paused = getOptionalBool(obj.get("is_paused")) orelse false; 272 + const concurrency_limit = getOptionalInt(obj.get("concurrency_limit")); 273 + 274 + var pool_id_buf: [36]u8 = undefined; 275 + const pool_id = uuid_util.generate(&pool_id_buf); 276 + 277 + var queue_id_buf: [36]u8 = undefined; 278 + const queue_id = uuid_util.generate(&queue_id_buf); 279 + 280 + var ts_buf: [32]u8 = undefined; 281 + const now = time_util.timestamp(&ts_buf); 282 + 283 + const status: db.work_pools.Status = if (is_paused) .paused else .not_ready; 284 + 285 + db.work_pools.insert( 286 + pool_id, 287 + name, 288 + description, 289 + pool_type, 290 + base_job_template, 291 + is_paused, 292 + concurrency_limit, 293 + queue_id, 294 + status, 295 + now, 296 + ) catch { 297 + json_util.sendStatus(r, "{\"detail\":\"insert failed\"}", .internal_server_error); 298 + return; 299 + }; 300 + 301 + db.work_queues.insert(queue_id, "default", "", false, null, 1, pool_id, .not_ready, now) catch { 302 + json_util.sendStatus(r, "{\"detail\":\"failed to create 
default queue\"}", .internal_server_error); 303 + return; 304 + }; 305 + 306 + const pool = db.work_pools.getByName(alloc, name) catch null orelse { 307 + json_util.sendStatus(r, "{\"detail\":\"pool not found after insert\"}", .internal_server_error); 308 + return; 309 + }; 310 + 311 + const resp = writePool(alloc, pool) catch { 312 + json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 313 + return; 314 + }; 315 + json_util.sendStatus(r, resp, .created); 316 + } 317 + 318 + fn getPool(r: zap.Request, name: []const u8) !void { 319 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 320 + defer arena.deinit(); 321 + const alloc = arena.allocator(); 322 + 323 + if (db.work_pools.getByName(alloc, name) catch null) |pool| { 324 + const resp = writePool(alloc, pool) catch { 325 + json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 326 + return; 327 + }; 328 + json_util.send(r, resp); 329 + } else { 330 + json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found); 331 + } 332 + } 333 + 334 + fn updatePool(r: zap.Request, name: []const u8) !void { 335 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 336 + defer arena.deinit(); 337 + const alloc = arena.allocator(); 338 + 339 + const body = r.body orelse { 340 + json_util.sendStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request); 341 + return; 342 + }; 343 + 344 + const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch { 345 + json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request); 346 + return; 347 + }; 348 + 349 + const obj = parsed.value.object; 350 + 351 + const existing = db.work_pools.getByName(alloc, name) catch null orelse { 352 + json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found); 353 + return; 354 + }; 355 + 356 + if (isReservedPool(name)) { 357 + const has_other = obj.get("description") != null or obj.get("base_job_template") != 
null; 358 + if (has_other) { 359 + json_util.sendStatus(r, "{\"detail\":\"Cannot modify reserved work pool.\"}", .forbidden); 360 + return; 361 + } 362 + } 363 + 364 + const description = getOptionalString(obj.get("description")); 365 + const base_job_template = stringifyFieldOptional(alloc, obj.get("base_job_template")); 366 + const is_paused = getOptionalBool(obj.get("is_paused")); 367 + const concurrency_limit = getOptionalInt(obj.get("concurrency_limit")); 368 + 369 + var new_status: ?db.work_pools.Status = null; 370 + if (is_paused) |paused| { 371 + if (paused) { 372 + new_status = .paused; 373 + } else if (existing.status == .paused) { 374 + const has_workers = db.work_pools.hasOnlineWorkers(existing.id) catch false; 375 + new_status = if (has_workers) .ready else .not_ready; 376 + } 377 + } 378 + 379 + var ts_buf: [32]u8 = undefined; 380 + const now = time_util.timestamp(&ts_buf); 381 + 382 + const did_update = db.work_pools.updateByName( 383 + name, 384 + description, 385 + base_job_template, 386 + is_paused, 387 + concurrency_limit, 388 + new_status, 389 + now, 390 + ) catch { 391 + json_util.sendStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error); 392 + return; 393 + }; 394 + 395 + if (!did_update) { 396 + json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found); 397 + return; 398 + } 399 + 400 + r.setStatus(.no_content); 401 + r.sendBody("") catch {}; 402 + } 403 + 404 + fn deletePool(r: zap.Request, name: []const u8) !void { 405 + if (isReservedPool(name)) { 406 + json_util.sendStatus(r, "{\"detail\":\"Cannot delete reserved work pool.\"}", .forbidden); 407 + return; 408 + } 409 + 410 + const deleted = db.work_pools.deleteByName(name) catch { 411 + json_util.sendStatus(r, "{\"detail\":\"delete failed\"}", .internal_server_error); 412 + return; 413 + }; 414 + 415 + if (!deleted) { 416 + json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found); 417 + return; 418 + } 419 + 420 + 
r.setStatus(.no_content); 421 + r.sendBody("") catch {}; 422 + } 423 + 424 + fn filterPools(r: zap.Request) !void { 425 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 426 + defer arena.deinit(); 427 + const alloc = arena.allocator(); 428 + 429 + var limit: usize = 10; 430 + var offset: usize = 0; 431 + 432 + if (r.body) |body| { 433 + if (json.parseFromSlice(json.Value, alloc, body, .{})) |parsed| { 434 + const obj = parsed.value.object; 435 + if (obj.get("limit")) |v| { 436 + if (v == .integer) limit = @intCast(v.integer); 437 + } 438 + if (obj.get("offset")) |v| { 439 + if (v == .integer) offset = @intCast(v.integer); 440 + } 441 + } else |_| {} 442 + } 443 + 444 + const pools = db.work_pools.list(alloc, limit, offset) catch { 445 + json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error); 446 + return; 447 + }; 448 + 449 + var output: std.io.Writer.Allocating = .init(alloc); 450 + var jw: json.Stringify = .{ .writer = &output.writer }; 451 + 452 + jw.beginArray() catch { 453 + json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 454 + return; 455 + }; 456 + 457 + for (pools) |pool| { 458 + writePoolObject(&jw, pool) catch continue; 459 + } 460 + 461 + jw.endArray() catch {}; 462 + 463 + json_util.send(r, output.toOwnedSlice() catch "[]"); 464 + } 465 + 466 + fn getScheduledFlowRuns(r: zap.Request, target: []const u8) !void { 467 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 468 + defer arena.deinit(); 469 + const alloc = arena.allocator(); 470 + 471 + const pool_name = extractPoolName(target) orelse { 472 + json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request); 473 + return; 474 + }; 475 + 476 + _ = db.work_pools.getByName(alloc, pool_name) catch null orelse { 477 + json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found); 478 + return; 479 + }; 480 + 481 + // TODO: Implement when deployments are added 482 + 
json_util.send(r, "[]"); 483 + } 484 + 485 + // JSON serialization 486 + 487 + fn writePool(alloc: std.mem.Allocator, pool: db.work_pools.WorkPoolRow) ![]const u8 { 488 + var output: std.io.Writer.Allocating = .init(alloc); 489 + var jw: json.Stringify = .{ .writer = &output.writer }; 490 + try writePoolObject(&jw, pool); 491 + return output.toOwnedSlice(); 492 + } 493 + 494 + fn writePoolObject(jw: *json.Stringify, pool: db.work_pools.WorkPoolRow) !void { 495 + try jw.beginObject(); 496 + 497 + try jw.objectField("id"); 498 + try jw.write(pool.id); 499 + 500 + try jw.objectField("created"); 501 + try jw.write(pool.created); 502 + 503 + try jw.objectField("updated"); 504 + try jw.write(pool.updated); 505 + 506 + try jw.objectField("name"); 507 + try jw.write(pool.name); 508 + 509 + try jw.objectField("description"); 510 + if (pool.description) |d| { 511 + try jw.write(d); 512 + } else { 513 + try jw.write(null); 514 + } 515 + 516 + try jw.objectField("type"); 517 + try jw.write(pool.type); 518 + 519 + try jw.objectField("base_job_template"); 520 + try jw.beginWriteRaw(); 521 + try jw.writer.writeAll(pool.base_job_template); 522 + jw.endWriteRaw(); 523 + 524 + try jw.objectField("is_paused"); 525 + try jw.write(pool.is_paused); 526 + 527 + try jw.objectField("concurrency_limit"); 528 + if (pool.concurrency_limit) |c| { 529 + try jw.write(c); 530 + } else { 531 + try jw.write(null); 532 + } 533 + 534 + try jw.objectField("default_queue_id"); 535 + if (pool.default_queue_id) |q| { 536 + try jw.write(q); 537 + } else { 538 + try jw.write(null); 539 + } 540 + 541 + try jw.objectField("status"); 542 + try jw.write(pool.status.toString()); 543 + 544 + try jw.endObject(); 545 + }
+5 -1
src/db/backend.zig
··· 69 69 70 70 pub fn textOrNull(self: Row, col: usize) ?[]const u8 { 71 71 return switch (self) { 72 - .sqlite => |r| r.textOrNull(col), 72 + .sqlite => |r| { 73 + // SQLite returns empty string for NULL, so check length 74 + const txt = r.text(col); 75 + return if (txt.len > 0) txt else null; 76 + }, 73 77 .postgres => |r| r.getTextOrNull(col), 74 78 }; 75 79 }
+57
src/db/schema/postgres.zig
··· 156 156 \\) 157 157 , .{}); 158 158 159 + // work_pool table 160 + // NOTE: Using INTEGER for is_paused to match SQLite 161 + try backend.db.exec( 162 + \\CREATE TABLE IF NOT EXISTS work_pool ( 163 + \\ id TEXT PRIMARY KEY, 164 + \\ created TEXT NOT NULL, 165 + \\ updated TEXT NOT NULL, 166 + \\ name TEXT NOT NULL UNIQUE, 167 + \\ description TEXT, 168 + \\ type TEXT NOT NULL DEFAULT 'process', 169 + \\ base_job_template JSONB DEFAULT '{}', 170 + \\ is_paused INTEGER DEFAULT 0, 171 + \\ concurrency_limit BIGINT, 172 + \\ default_queue_id TEXT, 173 + \\ status TEXT DEFAULT 'NOT_READY' 174 + \\) 175 + , .{}); 176 + 177 + // work_queue table 178 + // NOTE: Using INTEGER for is_paused to match SQLite 179 + try backend.db.exec( 180 + \\CREATE TABLE IF NOT EXISTS work_queue ( 181 + \\ id TEXT PRIMARY KEY, 182 + \\ created TEXT NOT NULL, 183 + \\ updated TEXT NOT NULL, 184 + \\ name TEXT NOT NULL, 185 + \\ description TEXT DEFAULT '', 186 + \\ is_paused INTEGER DEFAULT 0, 187 + \\ concurrency_limit BIGINT, 188 + \\ priority BIGINT DEFAULT 1, 189 + \\ work_pool_id TEXT NOT NULL REFERENCES work_pool(id) ON DELETE CASCADE, 190 + \\ last_polled TEXT, 191 + \\ status TEXT DEFAULT 'NOT_READY', 192 + \\ UNIQUE(work_pool_id, name) 193 + \\) 194 + , .{}); 195 + 196 + // worker table 197 + try backend.db.exec( 198 + \\CREATE TABLE IF NOT EXISTS worker ( 199 + \\ id TEXT PRIMARY KEY, 200 + \\ created TEXT NOT NULL, 201 + \\ updated TEXT NOT NULL, 202 + \\ name TEXT NOT NULL, 203 + \\ work_pool_id TEXT NOT NULL REFERENCES work_pool(id) ON DELETE CASCADE, 204 + \\ last_heartbeat_time TEXT, 205 + \\ heartbeat_interval_seconds BIGINT, 206 + \\ status TEXT DEFAULT 'OFFLINE', 207 + \\ UNIQUE(work_pool_id, name) 208 + \\) 209 + , .{}); 210 + 159 211 // indexes 160 212 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_flow_run__state_type ON flow_run(state_type)", .{}); 161 213 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_flow_run__flow_id ON flow_run(flow_id)", .{}); ··· 172 224 
try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_block_document__block_schema_id ON block_document(block_schema_id)", .{}); 173 225 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_block_document__name ON block_document(name)", .{}); 174 226 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_variable__name ON variable(name)", .{}); 227 + try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_work_pool__name ON work_pool(name)", .{}); 228 + try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_work_pool__type ON work_pool(type)", .{}); 229 + try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_work_queue__work_pool_id ON work_queue(work_pool_id)", .{}); 230 + try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_work_queue__priority ON work_queue(priority)", .{}); 231 + try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_worker__work_pool_id ON worker(work_pool_id)", .{}); 175 232 176 233 log.info("database", "postgres schema initialized", .{}); 177 234 }
+55
src/db/schema/sqlite.zig
··· 151 151 \\) 152 152 , .{}); 153 153 154 + // work_pool table 155 + try backend.db.exec( 156 + \\CREATE TABLE IF NOT EXISTS work_pool ( 157 + \\ id TEXT PRIMARY KEY, 158 + \\ created TEXT NOT NULL, 159 + \\ updated TEXT NOT NULL, 160 + \\ name TEXT NOT NULL UNIQUE, 161 + \\ description TEXT, 162 + \\ type TEXT NOT NULL DEFAULT 'process', 163 + \\ base_job_template TEXT DEFAULT '{}', 164 + \\ is_paused INTEGER DEFAULT 0, 165 + \\ concurrency_limit INTEGER, 166 + \\ default_queue_id TEXT, 167 + \\ status TEXT DEFAULT 'NOT_READY' 168 + \\) 169 + , .{}); 170 + 171 + // work_queue table 172 + try backend.db.exec( 173 + \\CREATE TABLE IF NOT EXISTS work_queue ( 174 + \\ id TEXT PRIMARY KEY, 175 + \\ created TEXT NOT NULL, 176 + \\ updated TEXT NOT NULL, 177 + \\ name TEXT NOT NULL, 178 + \\ description TEXT DEFAULT '', 179 + \\ is_paused INTEGER DEFAULT 0, 180 + \\ concurrency_limit INTEGER, 181 + \\ priority INTEGER DEFAULT 1, 182 + \\ work_pool_id TEXT NOT NULL REFERENCES work_pool(id) ON DELETE CASCADE, 183 + \\ last_polled TEXT, 184 + \\ status TEXT DEFAULT 'NOT_READY', 185 + \\ UNIQUE(work_pool_id, name) 186 + \\) 187 + , .{}); 188 + 189 + // worker table 190 + try backend.db.exec( 191 + \\CREATE TABLE IF NOT EXISTS worker ( 192 + \\ id TEXT PRIMARY KEY, 193 + \\ created TEXT NOT NULL, 194 + \\ updated TEXT NOT NULL, 195 + \\ name TEXT NOT NULL, 196 + \\ work_pool_id TEXT NOT NULL REFERENCES work_pool(id) ON DELETE CASCADE, 197 + \\ last_heartbeat_time TEXT, 198 + \\ heartbeat_interval_seconds INTEGER, 199 + \\ status TEXT DEFAULT 'OFFLINE', 200 + \\ UNIQUE(work_pool_id, name) 201 + \\) 202 + , .{}); 203 + 154 204 // indexes 155 205 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_flow_run__state_type ON flow_run(state_type)", .{}); 156 206 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_flow_run__flow_id ON flow_run(flow_id)", .{}); ··· 167 217 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_block_document__block_schema_id ON 
block_document(block_schema_id)", .{}); 168 218 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_block_document__name ON block_document(name)", .{}); 169 219 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_variable__name ON variable(name)", .{}); 220 + try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_work_pool__name ON work_pool(name)", .{}); 221 + try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_work_pool__type ON work_pool(type)", .{}); 222 + try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_work_queue__work_pool_id ON work_queue(work_pool_id)", .{}); 223 + try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_work_queue__priority ON work_queue(priority)", .{}); 224 + try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_worker__work_pool_id ON worker(work_pool_id)", .{}); 170 225 171 226 log.info("database", "sqlite schema initialized", .{}); 172 227 }
+3
src/db/sqlite.zig
··· 15 15 pub const block_schemas = @import("block_schemas.zig"); 16 16 pub const block_documents = @import("block_documents.zig"); 17 17 pub const variables = @import("variables.zig"); 18 + pub const work_pools = @import("work_pools.zig"); 19 + pub const work_queues = @import("work_queues.zig"); 20 + pub const workers = @import("workers.zig"); 18 21 19 22 // re-export types for compatibility 20 23 pub const FlowRow = flows.FlowRow;
+237
src/db/work_pools.zig
const std = @import("std");
const Allocator = std.mem.Allocator;

const backend = @import("backend.zig");
const log = @import("../logging.zig");

/// Work pool status enum - matches Python's WorkPoolStatus
pub const Status = enum {
    not_ready,
    ready,
    paused,

    /// Unrecognized strings fall back to .not_ready (the column default).
    pub fn fromString(s: []const u8) Status {
        if (std.mem.eql(u8, s, "READY")) return .ready;
        if (std.mem.eql(u8, s, "PAUSED")) return .paused;
        return .not_ready;
    }

    pub fn toString(self: Status) []const u8 {
        return switch (self) {
            .not_ready => "NOT_READY",
            .ready => "READY",
            .paused => "PAUSED",
        };
    }
};

/// One row of the work_pool table. All slices are owned by the allocator
/// passed to the function that produced the row (typically a request arena).
pub const WorkPoolRow = struct {
    id: []const u8,
    created: []const u8,
    updated: []const u8,
    name: []const u8,
    description: ?[]const u8,
    type: []const u8,
    base_job_template: []const u8, // serialized JSON
    is_paused: bool,
    concurrency_limit: ?i64,
    default_queue_id: ?[]const u8,
    status: Status,
};

// Column ordinals — must stay in sync with `select_cols` below.
const Col = struct {
    const id: usize = 0;
    const created: usize = 1;
    const updated: usize = 2;
    const name: usize = 3;
    const description: usize = 4;
    const type_: usize = 5;
    const base_job_template: usize = 6;
    const is_paused: usize = 7;
    const concurrency_limit: usize = 8;
    const default_queue_id: usize = 9;
    const status: usize = 10;
};

const select_cols = "id, created, updated, name, description, type, base_job_template, is_paused, concurrency_limit, default_queue_id, status";

/// Copies one result row into a WorkPoolRow; `alloc` owns all slices.
fn rowFromResult(alloc: Allocator, r: anytype) !WorkPoolRow {
    const desc = r.textOrNull(Col.description);
    const queue_id = r.textOrNull(Col.default_queue_id);
    // NULL detection for the INTEGER concurrency column goes through
    // textOrNull: NULL reads as null, any number reads non-empty.
    // NOTE(review): assumes the backend reports NULL consistently for
    // non-TEXT columns — confirm for both sqlite and postgres.
    const concurrency = r.textOrNull(Col.concurrency_limit);

    return WorkPoolRow{
        .id = try alloc.dupe(u8, r.text(Col.id)),
        .created = try alloc.dupe(u8, r.text(Col.created)),
        .updated = try alloc.dupe(u8, r.text(Col.updated)),
        .name = try alloc.dupe(u8, r.text(Col.name)),
        .description = if (desc) |d| try alloc.dupe(u8, d) else null,
        .type = try alloc.dupe(u8, r.text(Col.type_)),
        .base_job_template = try alloc.dupe(u8, r.text(Col.base_job_template)),
        .is_paused = r.int(Col.is_paused) != 0,
        .concurrency_limit = if (concurrency != null) r.bigint(Col.concurrency_limit) else null,
        .default_queue_id = if (queue_id) |q| try alloc.dupe(u8, q) else null,
        .status = Status.fromString(r.text(Col.status)),
    };
}

/// Fetches a pool by id; null when absent. Database failures are logged and
/// propagated instead of being silently reported as "not found".
pub fn getById(alloc: Allocator, id: []const u8) !?WorkPoolRow {
    var r = backend.db.row(
        "SELECT " ++ select_cols ++ " FROM work_pool WHERE id = ?",
        .{id},
    ) catch |err| {
        log.err("database", "get work_pool by id error: {}", .{err});
        return err;
    };

    if (r) |*row| {
        defer row.deinit();
        return try rowFromResult(alloc, row);
    }
    return null;
}

/// Fetches a pool by unique name; null when absent. Database failures are
/// logged and propagated instead of being conflated with "not found".
pub fn getByName(alloc: Allocator, name: []const u8) !?WorkPoolRow {
    var r = backend.db.row(
        "SELECT " ++ select_cols ++ " FROM work_pool WHERE name = ?",
        .{name},
    ) catch |err| {
        log.err("database", "get work_pool by name error: {}", .{err});
        return err;
    };

    if (r) |*row| {
        defer row.deinit();
        return try rowFromResult(alloc, row);
    }
    return null;
}

/// Inserts a new pool; created and updated are both set to `created`.
pub fn insert(
    id: []const u8,
    name: []const u8,
    description: ?[]const u8,
    pool_type: []const u8,
    base_job_template: []const u8,
    is_paused: bool,
    concurrency_limit: ?i64,
    default_queue_id: ?[]const u8,
    status: Status,
    created: []const u8,
) !void {
    backend.db.exec(
        "INSERT INTO work_pool (id, created, updated, name, description, type, base_job_template, is_paused, concurrency_limit, default_queue_id, status) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
        .{
            id,
            created,
            created,
            name,
            description,
            pool_type,
            base_job_template,
            @as(i32, if (is_paused) 1 else 0),
            concurrency_limit,
            default_queue_id,
            status.toString(),
        },
    ) catch |err| {
        log.err("database", "insert work_pool error: {}", .{err});
        return err;
    };
}

/// Partial update: null parameters leave the column unchanged (COALESCE).
/// NOTE: as a consequence, callers cannot clear description or
/// concurrency_limit back to NULL through this function.
/// Returns false when no pool with that name exists.
pub fn updateByName(
    name: []const u8,
    description: ?[]const u8,
    base_job_template: ?[]const u8,
    is_paused: ?bool,
    concurrency_limit: ?i64,
    status: ?Status,
    updated: []const u8,
) !bool {
    const affected = backend.db.execWithRowCount(
        "UPDATE work_pool SET description = COALESCE(?, description), base_job_template = COALESCE(?, base_job_template), is_paused = COALESCE(?, is_paused), concurrency_limit = COALESCE(?, concurrency_limit), status = COALESCE(?, status), updated = ? WHERE name = ?",
        .{
            description,
            base_job_template,
            if (is_paused) |p| @as(?i32, if (p) 1 else 0) else null,
            concurrency_limit,
            if (status) |s| s.toString() else null,
            updated,
            name,
        },
    ) catch |err| {
        log.err("database", "update work_pool error: {}", .{err});
        return err;
    };
    return affected > 0;
}

/// Points a pool at its default queue. Returns false when the pool is absent.
pub fn updateDefaultQueueId(name: []const u8, default_queue_id: []const u8, updated: []const u8) !bool {
    const affected = backend.db.execWithRowCount(
        "UPDATE work_pool SET default_queue_id = ?, updated = ? WHERE name = ?",
        .{ default_queue_id, updated, name },
    ) catch |err| {
        log.err("database", "update work_pool default_queue_id error: {}", .{err});
        return err;
    };
    return affected > 0;
}

/// Sets just the status column. Returns false when the pool is absent.
pub fn updateStatus(name: []const u8, status: Status, updated: []const u8) !bool {
    const affected = backend.db.execWithRowCount(
        "UPDATE work_pool SET status = ?, updated = ? WHERE name = ?",
        .{ status.toString(), updated, name },
    ) catch |err| {
        log.err("database", "update work_pool status error: {}", .{err});
        return err;
    };
    return affected > 0;
}

/// Deletes a pool by name; child queues/workers cascade at the SQL level.
/// Returns false when no pool with that name exists.
pub fn deleteByName(name: []const u8) !bool {
    const affected = backend.db.execWithRowCount(
        "DELETE FROM work_pool WHERE name = ?",
        .{name},
    ) catch |err| {
        log.err("database", "delete work_pool error: {}", .{err});
        return err;
    };
    return affected > 0;
}

/// Lists pools ordered by name; caller's allocator owns the returned slice
/// and every string inside it.
pub fn list(alloc: Allocator, limit: usize, offset: usize) ![]WorkPoolRow {
    var results = std.ArrayListUnmanaged(WorkPoolRow){};
    errdefer results.deinit(alloc);

    var rows = backend.db.query(
        "SELECT " ++ select_cols ++ " FROM work_pool ORDER BY name ASC LIMIT ? OFFSET ?",
        .{ @as(i64, @intCast(limit)), @as(i64, @intCast(offset)) },
    ) catch |err| {
        log.err("database", "list work_pools error: {}", .{err});
        return err;
    };
    defer rows.deinit();

    while (rows.next()) |r| {
        try results.append(alloc, try rowFromResult(alloc, &r));
    }

    return results.toOwnedSlice(alloc);
}

/// Total pool count; best-effort — reports 0 on database errors.
pub fn count() !usize {
    var r = backend.db.row("SELECT COUNT(*) FROM work_pool", .{}) catch return 0;
    if (r) |*row| {
        defer row.deinit();
        return @intCast(row.int(0));
    }
    return 0;
}

/// Check if any online workers exist for a pool.
/// Best-effort — treats database errors as "no online workers".
pub fn hasOnlineWorkers(pool_id: []const u8) !bool {
    var r = backend.db.row(
        "SELECT COUNT(*) FROM worker WHERE work_pool_id = ? AND status = 'ONLINE'",
        .{pool_id},
    ) catch return false;
    if (r) |*row| {
        defer row.deinit();
        return row.int(0) > 0;
    }
    return false;
}
+227
src/db/work_queues.zig
const std = @import("std");
const Allocator = std.mem.Allocator;

const backend = @import("backend.zig");
const log = @import("../logging.zig");

/// Work queue status enum - matches Python's WorkQueueStatus
pub const Status = enum {
    not_ready,
    ready,
    paused,

    /// Unrecognized strings fall back to .not_ready (the column default).
    pub fn fromString(s: []const u8) Status {
        if (std.mem.eql(u8, s, "READY")) return .ready;
        if (std.mem.eql(u8, s, "PAUSED")) return .paused;
        return .not_ready;
    }

    pub fn toString(self: Status) []const u8 {
        return switch (self) {
            .not_ready => "NOT_READY",
            .ready => "READY",
            .paused => "PAUSED",
        };
    }
};

/// One row of the work_queue table. All slices are owned by the allocator
/// passed to the function that produced the row (typically a request arena).
pub const WorkQueueRow = struct {
    id: []const u8,
    created: []const u8,
    updated: []const u8,
    name: []const u8,
    description: []const u8, // non-null: column defaults to ''
    is_paused: bool,
    concurrency_limit: ?i64,
    priority: i64,
    work_pool_id: []const u8,
    last_polled: ?[]const u8,
    status: Status,
};

// Column ordinals — must stay in sync with `select_cols` below.
const Col = struct {
    const id: usize = 0;
    const created: usize = 1;
    const updated: usize = 2;
    const name: usize = 3;
    const description: usize = 4;
    const is_paused: usize = 5;
    const concurrency_limit: usize = 6;
    const priority: usize = 7;
    const work_pool_id: usize = 8;
    const last_polled: usize = 9;
    const status: usize = 10;
};

const select_cols = "id, created, updated, name, description, is_paused, concurrency_limit, priority, work_pool_id, last_polled, status";

/// Copies one result row into a WorkQueueRow; `alloc` owns all slices.
fn rowFromResult(alloc: Allocator, r: anytype) !WorkQueueRow {
    const last_polled = r.textOrNull(Col.last_polled);
    // NULL detection for the INTEGER concurrency column goes through
    // textOrNull (NULL -> null, any number -> non-empty). Same caveat as
    // work_pools.rowFromResult about backend NULL handling.
    const concurrency = r.textOrNull(Col.concurrency_limit);

    return WorkQueueRow{
        .id = try alloc.dupe(u8, r.text(Col.id)),
        .created = try alloc.dupe(u8, r.text(Col.created)),
        .updated = try alloc.dupe(u8, r.text(Col.updated)),
        .name = try alloc.dupe(u8, r.text(Col.name)),
        .description = try alloc.dupe(u8, r.text(Col.description)),
        .is_paused = r.int(Col.is_paused) != 0,
        .concurrency_limit = if (concurrency != null) r.bigint(Col.concurrency_limit) else null,
        .priority = r.bigint(Col.priority),
        .work_pool_id = try alloc.dupe(u8, r.text(Col.work_pool_id)),
        .last_polled = if (last_polled) |lp| try alloc.dupe(u8, lp) else null,
        .status = Status.fromString(r.text(Col.status)),
    };
}

/// Fetches a queue by id; null when absent. Database failures are logged and
/// propagated instead of being silently reported as "not found".
pub fn getById(alloc: Allocator, id: []const u8) !?WorkQueueRow {
    var r = backend.db.row(
        "SELECT " ++ select_cols ++ " FROM work_queue WHERE id = ?",
        .{id},
    ) catch |err| {
        log.err("database", "get work_queue by id error: {}", .{err});
        return err;
    };

    if (r) |*row| {
        defer row.deinit();
        return try rowFromResult(alloc, row);
    }
    return null;
}

/// Fetches a queue by its (pool, name) pair — the natural key enforced by
/// UNIQUE(work_pool_id, name). Database failures are logged and propagated.
pub fn getByPoolAndName(alloc: Allocator, pool_id: []const u8, name: []const u8) !?WorkQueueRow {
    var r = backend.db.row(
        "SELECT " ++ select_cols ++ " FROM work_queue WHERE work_pool_id = ? AND name = ?",
        .{ pool_id, name },
    ) catch |err| {
        log.err("database", "get work_queue by pool/name error: {}", .{err});
        return err;
    };

    if (r) |*row| {
        defer row.deinit();
        return try rowFromResult(alloc, row);
    }
    return null;
}

/// Inserts a new queue; created and updated are both set to `created`.
/// last_polled starts NULL (never polled).
pub fn insert(
    id: []const u8,
    name: []const u8,
    description: []const u8,
    is_paused: bool,
    concurrency_limit: ?i64,
    priority: i64,
    work_pool_id: []const u8,
    status: Status,
    created: []const u8,
) !void {
    backend.db.exec(
        "INSERT INTO work_queue (id, created, updated, name, description, is_paused, concurrency_limit, priority, work_pool_id, status) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
        .{
            id,
            created,
            created,
            name,
            description,
            @as(i32, if (is_paused) 1 else 0),
            concurrency_limit,
            priority,
            work_pool_id,
            status.toString(),
        },
    ) catch |err| {
        log.err("database", "insert work_queue error: {}", .{err});
        return err;
    };
}

/// Partial update keyed by (pool, current name): null parameters leave the
/// column unchanged (COALESCE), so nullable fields cannot be cleared here.
/// `new_name` renames the queue when provided. Returns false on no match.
pub fn updateByPoolAndName(
    pool_id: []const u8,
    name: []const u8,
    new_name: ?[]const u8,
    description: ?[]const u8,
    is_paused: ?bool,
    concurrency_limit: ?i64,
    priority: ?i64,
    status: ?Status,
    updated: []const u8,
) !bool {
    const affected = backend.db.execWithRowCount(
        "UPDATE work_queue SET name = COALESCE(?, name), description = COALESCE(?, description), is_paused = COALESCE(?, is_paused), concurrency_limit = COALESCE(?, concurrency_limit), priority = COALESCE(?, priority), status = COALESCE(?, status), updated = ? WHERE work_pool_id = ? AND name = ?",
        .{
            new_name,
            description,
            if (is_paused) |p| @as(?i32, if (p) 1 else 0) else null,
            concurrency_limit,
            priority,
            if (status) |s| s.toString() else null,
            updated,
            pool_id,
            name,
        },
    ) catch |err| {
        log.err("database", "update work_queue error: {}", .{err});
        return err;
    };
    return affected > 0;
}

/// Records a poll: sets last_polled and status. The poll timestamp doubles
/// as the row's `updated` value. Returns false when the queue is absent.
pub fn updateLastPolled(pool_id: []const u8, name: []const u8, last_polled: []const u8, status: Status) !bool {
    const affected = backend.db.execWithRowCount(
        "UPDATE work_queue SET last_polled = ?, status = ?, updated = ? WHERE work_pool_id = ? AND name = ?",
        .{ last_polled, status.toString(), last_polled, pool_id, name },
    ) catch |err| {
        log.err("database", "update work_queue last_polled error: {}", .{err});
        return err;
    };
    return affected > 0;
}

/// Deletes a queue by (pool, name). Returns false when no such queue exists.
pub fn deleteByPoolAndName(pool_id: []const u8, name: []const u8) !bool {
    const affected = backend.db.execWithRowCount(
        "DELETE FROM work_queue WHERE work_pool_id = ? AND name = ?",
        .{ pool_id, name },
    ) catch |err| {
        log.err("database", "delete work_queue error: {}", .{err});
        return err;
    };
    return affected > 0;
}

/// Lists a pool's queues ordered by ascending priority (lower = served
/// first). Caller's allocator owns the returned slice and its strings.
pub fn listByPool(alloc: Allocator, pool_id: []const u8, limit: usize, offset: usize) ![]WorkQueueRow {
    var results = std.ArrayListUnmanaged(WorkQueueRow){};
    errdefer results.deinit(alloc);

    var rows = backend.db.query(
        "SELECT " ++ select_cols ++ " FROM work_queue WHERE work_pool_id = ? ORDER BY priority ASC LIMIT ? OFFSET ?",
        .{ pool_id, @as(i64, @intCast(limit)), @as(i64, @intCast(offset)) },
    ) catch |err| {
        log.err("database", "list work_queues error: {}", .{err});
        return err;
    };
    defer rows.deinit();

    while (rows.next()) |r| {
        try results.append(alloc, try rowFromResult(alloc, &r));
    }

    return results.toOwnedSlice(alloc);
}

/// Queue count for a pool; best-effort — reports 0 on database errors.
pub fn countByPool(pool_id: []const u8) !usize {
    var r = backend.db.row("SELECT COUNT(*) FROM work_queue WHERE work_pool_id = ?", .{pool_id}) catch return 0;
    if (r) |*row| {
        defer row.deinit();
        return @intCast(row.int(0));
    }
    return 0;
}

/// Get next available priority for a pool (max + 1, or 1 if no queues).
/// Best-effort — database errors fall back to 1.
pub fn nextPriority(pool_id: []const u8) !i64 {
    var r = backend.db.row(
        "SELECT COALESCE(MAX(priority), 0) + 1 FROM work_queue WHERE work_pool_id = ?",
        .{pool_id},
    ) catch return 1;
    if (r) |*row| {
        defer row.deinit();
        return row.bigint(0);
    }
    return 1;
}
+194
src/db/workers.zig
const std = @import("std");
const Allocator = std.mem.Allocator;

const backend = @import("backend.zig");
const log = @import("../logging.zig");

/// Worker status enum - matches Python's WorkerStatus
pub const Status = enum {
    online,
    offline,

    /// Parse from the stored/API representation. Anything other than the
    /// exact string "ONLINE" (including unknown values) maps to .offline.
    pub fn fromString(s: []const u8) Status {
        if (std.mem.eql(u8, s, "ONLINE")) return .online;
        return .offline;
    }

    /// Serialize back to the uppercase form stored in the DB / sent to clients.
    pub fn toString(self: Status) []const u8 {
        return switch (self) {
            .online => "ONLINE",
            .offline => "OFFLINE",
        };
    }
};

/// One row of the `worker` table. Every string field is heap-allocated via
/// the allocator passed to the query helpers; caller owns them.
pub const WorkerRow = struct {
    id: []const u8,
    created: []const u8,
    updated: []const u8,
    name: []const u8,
    work_pool_id: []const u8,
    last_heartbeat_time: ?[]const u8,
    heartbeat_interval_seconds: ?i64,
    status: Status,
};

// Column indexes into `select_cols` below — keep the two in sync.
const Col = struct {
    const id: usize = 0;
    const created: usize = 1;
    const updated: usize = 2;
    const name: usize = 3;
    const work_pool_id: usize = 4;
    const last_heartbeat_time: usize = 5;
    const heartbeat_interval_seconds: usize = 6;
    const status: usize = 7;
};

const select_cols = "id, created, updated, name, work_pool_id, last_heartbeat_time, heartbeat_interval_seconds, status";

/// Materialize a WorkerRow from a DB result row, duping every string.
fn rowFromResult(alloc: Allocator, r: anytype) !WorkerRow {
    const last_heartbeat = r.textOrNull(Col.last_heartbeat_time);
    // textOrNull is used purely as a NULL probe on the integer column; the
    // actual value is read with bigint() below (same pattern as work_queues).
    const interval = r.textOrNull(Col.heartbeat_interval_seconds);

    return WorkerRow{
        .id = try alloc.dupe(u8, r.text(Col.id)),
        .created = try alloc.dupe(u8, r.text(Col.created)),
        .updated = try alloc.dupe(u8, r.text(Col.updated)),
        .name = try alloc.dupe(u8, r.text(Col.name)),
        .work_pool_id = try alloc.dupe(u8, r.text(Col.work_pool_id)),
        .last_heartbeat_time = if (last_heartbeat) |lh| try alloc.dupe(u8, lh) else null,
        .heartbeat_interval_seconds = if (interval != null) r.bigint(Col.heartbeat_interval_seconds) else null,
        .status = Status.fromString(r.text(Col.status)),
    };
}

/// Fetch a worker by primary key. Returns null when not found (or when the
/// query fails — the failure is logged). Caller owns the returned strings.
pub fn getById(alloc: Allocator, id: []const u8) !?WorkerRow {
    var r = backend.db.row(
        "SELECT " ++ select_cols ++ " FROM worker WHERE id = ?",
        .{id},
    ) catch |err| {
        // Was a silent `catch return null`; log before falling back to
        // "not found", matching upsertHeartbeat/deleteByPoolAndName.
        log.err("database", "get worker by id error: {}", .{err});
        return null;
    };

    if (r) |*row| {
        defer row.deinit();
        return try rowFromResult(alloc, row);
    }
    return null;
}

/// Fetch a worker by its natural key (pool id, worker name).
/// Returns null when not found; caller owns the returned strings.
pub fn getByPoolAndName(alloc: Allocator, pool_id: []const u8, name: []const u8) !?WorkerRow {
    var r = backend.db.row(
        "SELECT " ++ select_cols ++ " FROM worker WHERE work_pool_id = ? AND name = ?",
        .{ pool_id, name },
    ) catch |err| {
        // Log instead of silently mapping DB errors to "not found".
        log.err("database", "get worker by pool/name error: {}", .{err});
        return null;
    };

    if (r) |*row| {
        defer row.deinit();
        return try rowFromResult(alloc, row);
    }
    return null;
}

/// Upsert worker heartbeat - inserts if not exists, updates if exists
/// This is the core worker registration mechanism
/// On conflict the existing row's id is kept; only updated/last_heartbeat_time,
/// the interval (if provided) and status are refreshed, and status is forced
/// back to ONLINE. `now` supplies every timestamp.
pub fn upsertHeartbeat(
    id: []const u8,
    name: []const u8,
    work_pool_id: []const u8,
    heartbeat_interval_seconds: ?i64,
    now: []const u8,
) !void {
    // Use dialect-specific upsert SQL
    const sql = switch (backend.db.dialect) {
        .sqlite => sqlite_upsert,
        .postgres => postgres_upsert,
    };

    backend.db.exec(sql, .{
        id,
        now,
        now,
        name,
        work_pool_id,
        now,
        heartbeat_interval_seconds,
        // For the ON CONFLICT update clause:
        now, // updated
        now, // last_heartbeat_time
        heartbeat_interval_seconds,
    }) catch |err| {
        log.err("database", "upsert worker heartbeat error: {}", .{err});
        return err;
    };
}

// Relies on a UNIQUE constraint over (work_pool_id, name) on the worker table.
// COALESCE keeps the existing interval when the heartbeat omits one.
const sqlite_upsert =
    \\INSERT INTO worker (id, created, updated, name, work_pool_id, last_heartbeat_time, heartbeat_interval_seconds, status)
    \\VALUES (?, ?, ?, ?, ?, ?, ?, 'ONLINE')
    \\ON CONFLICT(work_pool_id, name) DO UPDATE SET
    \\  updated = ?,
    \\  last_heartbeat_time = ?,
    \\  heartbeat_interval_seconds = COALESCE(?, heartbeat_interval_seconds),
    \\  status = 'ONLINE'
;

// Postgres requires table-qualifying the existing column inside DO UPDATE
// (bare names there are ambiguous), hence `worker.heartbeat_interval_seconds`.
const postgres_upsert =
    \\INSERT INTO worker (id, created, updated, name, work_pool_id, last_heartbeat_time, heartbeat_interval_seconds, status)
    \\VALUES ($1, $2, $3, $4, $5, $6, $7, 'ONLINE')
    \\ON CONFLICT(work_pool_id, name) DO UPDATE SET
    \\  updated = $8,
    \\  last_heartbeat_time = $9,
    \\  heartbeat_interval_seconds = COALESCE($10, worker.heartbeat_interval_seconds),
    \\  status = 'ONLINE'
;

/// Delete a worker by (pool id, worker name). Returns true when a row was removed.
pub fn deleteByPoolAndName(pool_id: []const u8, name: []const u8) !bool {
    const affected = backend.db.execWithRowCount(
        "DELETE FROM worker WHERE work_pool_id = ? AND name = ?",
        .{ pool_id, name },
    ) catch |err| {
        log.err("database", "delete worker error: {}", .{err});
        return err;
    };
    return affected > 0;
}

/// List a pool's workers, most recently heartbeating first (workers that have
/// never heartbeated sort last), paginated. Caller owns the returned slice.
/// NOTE(review): `NULLS LAST` needs SQLite >= 3.30 — confirm the minimum
/// supported SQLite version.
pub fn listByPool(alloc: Allocator, pool_id: []const u8, limit: usize, offset: usize) ![]WorkerRow {
    var results = std.ArrayListUnmanaged(WorkerRow){};
    errdefer results.deinit(alloc);

    var rows = backend.db.query(
        "SELECT " ++ select_cols ++ " FROM worker WHERE work_pool_id = ? ORDER BY last_heartbeat_time DESC NULLS LAST LIMIT ? OFFSET ?",
        .{ pool_id, @as(i64, @intCast(limit)), @as(i64, @intCast(offset)) },
    ) catch |err| {
        log.err("database", "list workers error: {}", .{err});
        return err;
    };
    defer rows.deinit();

    while (rows.next()) |r| {
        try results.append(alloc, try rowFromResult(alloc, &r));
    }

    return results.toOwnedSlice(alloc);
}

/// Count all workers in a pool. Falls back to 0 on query failure (logged).
pub fn countByPool(pool_id: []const u8) !usize {
    var r = backend.db.row("SELECT COUNT(*) FROM worker WHERE work_pool_id = ?", .{pool_id}) catch |err| {
        // Was a silent `catch return 0`; keep the best-effort fallback but
        // make the failure observable.
        log.err("database", "count workers error: {}", .{err});
        return 0;
    };
    if (r) |*row| {
        defer row.deinit();
        return @intCast(row.int(0));
    }
    return 0;
}

/// Count workers currently marked ONLINE in a pool. Falls back to 0 on
/// query failure (logged).
pub fn countOnlineByPool(pool_id: []const u8) !usize {
    var r = backend.db.row(
        "SELECT COUNT(*) FROM worker WHERE work_pool_id = ? AND status = 'ONLINE'",
        .{pool_id},
    ) catch |err| {
        log.err("database", "count online workers error: {}", .{err});
        return 0;
    };
    if (r) |*row| {
        defer row.deinit();
        return @intCast(row.int(0));
    }
    return 0;
}