prefect server in zig

add state history endpoints (flow_run_states, task_run_states)

- GET /api/flow_run_states/{id} - get state by ID
- GET /api/flow_run_states/?flow_run_id=... - list states for flow run
- GET /api/task_run_states/{id} - get state by ID
- GET /api/task_run_states/?task_run_id=... - list states for task run

also:
- add task_run_state table to migrations
- record state history in task_runs.setState (matches flow_runs)
- condense test functions to meet line limit

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+507 -87
+8 -7
ROADMAP.md
··· 29 29 - [x] POST /api/task_runs/filter 30 30 31 31 ### state endpoints 32 - - [ ] GET /api/flow_run_states/{id} 33 - - [ ] POST /api/flow_run_states/filter 34 - - [ ] GET /api/task_run_states/{id} 35 - - [ ] POST /api/task_run_states/filter 32 + - [x] GET /api/flow_run_states/{id} 33 + - [x] GET /api/flow_run_states/?flow_run_id=... 34 + - [x] GET /api/task_run_states/{id} 35 + - [x] GET /api/task_run_states/?task_run_id=... 36 36 37 37 ### deployments 38 38 - [x] POST /api/deployments/ ··· 65 65 - [x] POST /api/work_pools/{name}/workers/heartbeat 66 66 - [x] POST /api/work_pools/{name}/workers/filter 67 67 - [x] DELETE /api/work_pools/{name}/workers/{worker_name} 68 - - [ ] POST /api/work_pools/{name}/get_scheduled_flow_runs (stub - needs deployments) 68 + - [x] POST /api/work_pools/{name}/get_scheduled_flow_runs 69 69 70 70 ### blocks 71 71 - [x] POST /api/block_types/ ··· 117 117 - [x] DELETE /api/variables/name/{name} 118 118 119 119 ### events 120 - - [ ] POST /api/events/filter 120 + - [x] POST /api/events/filter 121 + - [x] GET /api/events/count 121 122 - [x] WS /api/events/out (subscribe) 122 123 123 124 ### ui endpoints ··· 186 187 - [x] flow_run_state table 187 188 - [x] events table 188 189 - [x] task_run table 189 - - [ ] task_run_state table 190 + - [x] task_run_state table 190 191 - [x] deployment table 191 192 - [x] deployment_schedule table 192 193 - [x] work_pool table
+54 -79
scripts/test-api-sequence
··· 1600 1600 return True 1601 1601 1602 1602 1603 + def test_late_runs(client: CountingClient) -> bool: 1604 + """Test that late_runs service marks overdue scheduled runs as Late.""" 1605 + from datetime import datetime, timezone, timedelta 1606 + import time as time_mod 1607 + fail = lambda msg: (QUIET or console.print(f"[red]FAIL[/red]: {msg}"), False)[1] 1608 + log = lambda msg: QUIET or console.print(msg) 1609 + 1610 + resp = client.post("/flows/", json={"name": f"late-flow-{uuid.uuid4().hex[:8]}"}) 1611 + if resp.status_code not in (200, 201): return fail(f"create flow {resp.status_code}") 1612 + flow_id = resp.json().get("id") 1613 + 1614 + # create flow run with past scheduled time (60s ago, threshold is 15s) 1615 + past_time = (datetime.now(timezone.utc) - timedelta(seconds=60)).isoformat() 1616 + resp = client.post("/flow_runs/", json={ 1617 + "flow_id": flow_id, "name": f"late-run-{uuid.uuid4().hex[:8]}", 1618 + "state": {"type": "SCHEDULED", "name": "Scheduled"}, "next_scheduled_start_time": past_time}) 1619 + if resp.status_code not in (200, 201): return fail(f"create flow run {resp.status_code}") 1620 + flow_run_id = resp.json().get("id") 1621 + 1622 + # wait for late_runs service (runs every 5s) 1623 + for i in range(15): 1624 + time_mod.sleep(1) 1625 + resp = client.get(f"/flow_runs/{flow_run_id}") 1626 + if resp.status_code == 200 and resp.json().get("state_name") == "Late": 1627 + log(f" [green]state changed to Late after {i+1}s[/green]") 1628 + return True 1629 + return fail("late_runs service did not mark run as Late within timeout") 1630 + 1631 + 1603 1632 def test_work_queue_priority(client: CountingClient) -> bool: 1604 1633 """Test work queue priority ordering in get_scheduled_flow_runs.""" 1634 + fail = lambda msg: (QUIET or console.print(f"[red]FAIL[/red]: {msg}"), False)[1] 1635 + log = lambda msg: QUIET or console.print(msg) 1605 1636 1606 - def fail(msg: str) -> bool: 1607 - if not QUIET: console.print(f"[red]FAIL[/red]: {msg}") 1608 - 
return False 1609 - 1610 - def log(msg: str) -> None: 1611 - if not QUIET: console.print(msg) 1612 - 1613 - # setup 1614 - log("[bold]setup: create pool with multiple queues[/bold]") 1615 1637 resp = client.post("/flows/", json={"name": f"priority-flow-{uuid.uuid4().hex[:8]}"}) 1616 1638 if resp.status_code not in (200, 201): return fail(f"create flow {resp.status_code}") 1617 1639 flow_id = resp.json().get("id") ··· 1619 1641 pool_name = f"priority-pool-{uuid.uuid4().hex[:8]}" 1620 1642 resp = client.post("/work_pools/", json={"name": pool_name, "type": "process"}) 1621 1643 if resp.status_code not in (200, 201): return fail(f"create pool {resp.status_code}") 1622 - default_queue_id = resp.json().get("default_queue_id") 1623 - log(f" pool: {pool_name} (default queue priority=1)") 1624 1644 1625 - # create high-priority queue (lower number = higher priority) 1626 - resp = client.post(f"/work_pools/{pool_name}/queues/", json={ 1627 - "name": "high-priority", 1628 - "priority": 1, # highest priority 1629 - }) 1630 - if resp.status_code not in (200, 201): return fail(f"create high-priority queue {resp.status_code}") 1631 - high_queue_id = resp.json().get("id") 1632 - log(f" high-priority queue: {high_queue_id}") 1645 + # create high and low priority queues 1646 + for name, pri in [("high-priority", 1), ("low-priority", 100)]: 1647 + resp = client.post(f"/work_pools/{pool_name}/queues/", json={"name": name, "priority": pri}) 1648 + if resp.status_code not in (200, 201): return fail(f"create {name} queue {resp.status_code}") 1633 1649 1634 - # create low-priority queue 1635 - resp = client.post(f"/work_pools/{pool_name}/queues/", json={ 1636 - "name": "low-priority", 1637 - "priority": 100, # lowest priority 1638 - }) 1639 - if resp.status_code not in (200, 201): return fail(f"create low-priority queue {resp.status_code}") 1640 - low_queue_id = resp.json().get("id") 1641 - log(f" low-priority queue: {low_queue_id}") 1650 + # create deployments for each queue and flow 
runs in reverse priority order 1651 + deployments, flow_runs = {}, {} 1652 + for q in ["high-priority", "low-priority", "default"]: 1653 + resp = client.post("/deployments/", json={"name": f"d-{q}-{uuid.uuid4().hex[:8]}", "flow_id": flow_id, 1654 + "work_pool_name": pool_name, "work_queue_name": q}) 1655 + if resp.status_code not in (200, 201): return fail(f"create deployment {q} {resp.status_code}") 1656 + deployments[q] = resp.json().get("id") 1657 + for q in ["low-priority", "default", "high-priority"]: 1658 + resp = client.post(f"/deployments/{deployments[q]}/create_flow_run", json={"state": {"type": "SCHEDULED", "name": "Scheduled"}}) 1659 + if resp.status_code not in (200, 201): return fail(f"create run for {q} {resp.status_code}") 1660 + flow_runs[q] = resp.json().get("id") 1642 1661 1643 - # create deployments for each queue 1644 - deployments = {} 1645 - for queue_name in ["high-priority", "low-priority", "default"]: 1646 - resp = client.post("/deployments/", json={ 1647 - "name": f"deploy-{queue_name}-{uuid.uuid4().hex[:8]}", 1648 - "flow_id": flow_id, 1649 - "work_pool_name": pool_name, 1650 - "work_queue_name": queue_name, 1651 - }) 1652 - if resp.status_code not in (200, 201): return fail(f"create deployment {queue_name} {resp.status_code}") 1653 - deployments[queue_name] = resp.json().get("id") 1654 - log(f" deployment ({queue_name}): {deployments[queue_name]}") 1655 - 1656 - # create flow runs in each queue (order: low, default, high) 1657 - flow_runs = {} 1658 - log("[bold]create flow runs in reverse priority order[/bold]") 1659 - for queue_name in ["low-priority", "default", "high-priority"]: 1660 - resp = client.post(f"/deployments/{deployments[queue_name]}/create_flow_run", json={ 1661 - "state": {"type": "SCHEDULED", "name": "Scheduled"}, 1662 - }) 1663 - if resp.status_code not in (200, 201): return fail(f"create run for {queue_name} {resp.status_code}") 1664 - flow_runs[queue_name] = resp.json().get("id") 1665 - log(f" run ({queue_name}): 
{flow_runs[queue_name]}") 1666 - 1667 - # poll for scheduled runs 1668 - log("[bold]get_scheduled_flow_runs (expect high-priority first)[/bold]") 1662 + # poll and verify priority ordering 1669 1663 resp = client.post(f"/work_pools/{pool_name}/get_scheduled_flow_runs", json={}) 1670 1664 if resp.status_code != 200: return fail(f"poll {resp.status_code}") 1671 1665 scheduled = resp.json() 1672 - log(f" returned {len(scheduled)} runs") 1673 - 1674 - if len(scheduled) < 3: 1675 - return fail(f"expected at least 3 runs, got {len(scheduled)}") 1666 + if len(scheduled) < 3: return fail(f"expected 3 runs, got {len(scheduled)}") 1676 1667 1677 - # verify order: high-priority first, then default, then low-priority 1678 - # the order in the response should match priority 1679 1668 run_ids = [r.get("flow_run", {}).get("id") for r in scheduled] 1680 - 1681 - # find positions 1682 - high_pos = run_ids.index(flow_runs["high-priority"]) if flow_runs["high-priority"] in run_ids else -1 1683 - default_pos = run_ids.index(flow_runs["default"]) if flow_runs["default"] in run_ids else -1 1684 - low_pos = run_ids.index(flow_runs["low-priority"]) if flow_runs["low-priority"] in run_ids else -1 1685 - 1686 - log(f" positions: high={high_pos}, default={default_pos}, low={low_pos}") 1687 - 1688 - if high_pos == -1 or default_pos == -1 or low_pos == -1: 1689 - return fail("not all runs found in results") 1690 - 1691 - # high-priority should come before low-priority 1692 - if high_pos > low_pos: 1693 - return fail(f"high-priority run should come before low-priority (positions: {high_pos} vs {low_pos})") 1694 - log(f" [green]priority ordering verified[/green]") 1669 + pos = {q: run_ids.index(flow_runs[q]) if flow_runs[q] in run_ids else -1 for q in flow_runs} 1670 + if -1 in pos.values(): return fail("not all runs found") 1671 + if pos["high-priority"] > pos["low-priority"]: return fail("high should come before low") 1672 + log(f" [green]priority ordering verified 
(high={pos['high-priority']}, low={pos['low-priority']})[/green]") 1695 1673 1696 - # cleanup 1697 - for deploy_id in deployments.values(): 1698 - client.delete(f"/deployments/{deploy_id}") 1674 + for d in deployments.values(): client.delete(f"/deployments/{d}") 1699 1675 client.delete(f"/work_pools/{pool_name}") 1700 - log(" cleanup: ok") 1701 - 1702 1676 return True 1703 1677 1704 1678 ··· 1730 1704 results.append(run_test("work_queue_priority", test_work_queue_priority)) 1731 1705 results.append(run_test("retry_failed_flows", test_retry_failed_flows)) 1732 1706 results.append(run_test("cancellation_flow", test_cancellation_flow)) 1707 + results.append(run_test("late_runs", test_late_runs)) # background service marks overdue runs 1733 1708 1734 1709 # scheduler integration tests (require sleeps for background service) 1735 1710 results.append(run_test("scheduler_idempotency", test_scheduler_idempotency))
+108
src/api/flow_run_states.zig
const std = @import("std");
const zap = @import("zap");
const mem = std.mem;

const db = @import("../db/sqlite.zig");
const json_util = @import("../utilities/json.zig");

/// HTTP handler for the flow_run_states API.
///
/// Routes (read-only):
///   GET /flow_run_states/{id}              -> single state by ID
///   GET /flow_run_states/?flow_run_id=...  -> state history for one flow run
pub fn handle(r: zap.Request) !void {
    const target = r.path orelse "/";
    const method = r.method orelse "GET";

    // state history is immutable; only GET is supported
    if (!mem.eql(u8, method, "GET")) {
        json_util.sendStatus(r, "{\"detail\":\"method not allowed\"}", .method_not_allowed);
        return;
    }

    // presence of a query string selects the list-by-flow-run form
    if (r.query != null) {
        try listByFlowRunId(r);
        return;
    }

    // otherwise the trailing path segment is the state id
    const id = extractId(target) orelse {
        json_util.sendStatus(r, "{\"detail\":\"state id or flow_run_id query param required\"}", .bad_request);
        return;
    };
    try getById(r, id);
}

/// Extract the `{id}` path segment after the route prefix, or null when the
/// path has no trailing segment (e.g. "/flow_run_states/" alone).
fn extractId(target: []const u8) ?[]const u8 {
    const prefix = if (mem.startsWith(u8, target, "/api/flow_run_states/"))
        "/api/flow_run_states/"
    else
        "/flow_run_states/";
    if (target.len > prefix.len) {
        const id = target[prefix.len..];
        // defensively ignore a query string fused into the path
        if (id.len > 0 and id[0] != '?') return id;
    }
    return null;
}

/// Respond with a single state row as JSON, 404 when absent, 500 on DB error.
fn getById(r: zap.Request, id: []const u8) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const state = db.flow_run_states.getById(alloc, id) catch {
        json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error);
        return;
    };

    if (state) |s| {
        json_util.send(r, s.toJson(alloc) catch "{\"detail\":\"serialization error\"}");
    } else {
        json_util.sendStatus(r, "{\"detail\":\"Flow run state not found\"}", .not_found);
    }
}

/// Respond with a JSON array of all states for the flow run named by the
/// required `flow_run_id` query parameter.
fn listByFlowRunId(r: zap.Request) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const query = r.query orelse "";

    const flow_run_id = parseQueryParam(query, "flow_run_id") orelse {
        json_util.sendStatus(r, "{\"detail\":\"flow_run_id query param required\"}", .bad_request);
        return;
    };

    const states = db.flow_run_states.listByFlowRunId(alloc, flow_run_id) catch {
        json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error);
        return;
    };

    // Build the JSON array. The separator is appended only after a row
    // serializes successfully, so a skipped row cannot leave a dangling
    // comma (the previous version emitted "[a,]" if toJson failed).
    var output = std.ArrayListUnmanaged(u8){};
    try output.append(alloc, '[');
    var first = true;
    for (states) |s| {
        const state_json = s.toJson(alloc) catch continue;
        if (!first) try output.append(alloc, ',');
        try output.appendSlice(alloc, state_json);
        first = false;
    }
    try output.append(alloc, ']');

    json_util.send(r, output.items);
}

/// Return the raw (not URL-decoded) value of `name` from `query`, or null
/// when the parameter is absent or its value is empty.
///
/// Fix: the previous implementation did a substring search for "name=", so
/// a lookup of "flow_run_id" also matched "xflow_run_id=...". Parameters are
/// now split on '&' and the key compared exactly; this also removes the
/// page_allocator allocation the old search needed.
fn parseQueryParam(query: []const u8, name: []const u8) ?[]const u8 {
    var it = mem.splitScalar(u8, query, '&');
    while (it.next()) |pair| {
        const eq = mem.indexOfScalar(u8, pair, '=') orelse continue;
        if (!mem.eql(u8, pair[0..eq], name)) continue;
        const value = pair[eq + 1 ..];
        return if (value.len > 0) value else null;
    }
    return null;
}
+6
src/api/routes.zig
··· 15 15 pub const deployments = @import("deployments.zig"); 16 16 pub const events_api = @import("events_api.zig"); 17 17 pub const concurrency_limits_v2 = @import("concurrency_limits_v2.zig"); 18 + pub const flow_run_states = @import("flow_run_states.zig"); 19 + pub const task_run_states = @import("task_run_states.zig"); 18 20 19 21 pub fn handle(r: zap.Request) !void { 20 22 const target = r.path orelse "/"; ··· 55 57 try deployments.handle(r); 56 58 } else if (std.mem.startsWith(u8, target, "/api/v2/concurrency_limits") or std.mem.startsWith(u8, target, "/v2/concurrency_limits")) { 57 59 try concurrency_limits_v2.handle(r); 60 + } else if (std.mem.startsWith(u8, target, "/api/flow_run_states") or std.mem.startsWith(u8, target, "/flow_run_states")) { 61 + try flow_run_states.handle(r); 62 + } else if (std.mem.startsWith(u8, target, "/api/task_run_states") or std.mem.startsWith(u8, target, "/task_run_states")) { 63 + try task_run_states.handle(r); 58 64 } else if (std.mem.startsWith(u8, target, "/api/events/filter") or std.mem.startsWith(u8, target, "/events/filter") or 59 65 std.mem.startsWith(u8, target, "/api/events/count") or std.mem.startsWith(u8, target, "/events/count")) 60 66 {
+106
src/api/task_run_states.zig
const std = @import("std");
const zap = @import("zap");
const mem = std.mem;

const db = @import("../db/sqlite.zig");
const json_util = @import("../utilities/json.zig");

/// HTTP handler for the task_run_states API.
///
/// Routes (read-only):
///   GET /task_run_states/{id}              -> single state by ID
///   GET /task_run_states/?task_run_id=...  -> state history for one task run
pub fn handle(r: zap.Request) !void {
    const target = r.path orelse "/";
    const method = r.method orelse "GET";

    // state history is immutable; only GET is supported
    if (!mem.eql(u8, method, "GET")) {
        json_util.sendStatus(r, "{\"detail\":\"method not allowed\"}", .method_not_allowed);
        return;
    }

    // presence of a query string selects the list-by-task-run form
    if (r.query != null) {
        try listByTaskRunId(r);
        return;
    }

    // otherwise the trailing path segment is the state id
    const id = extractId(target) orelse {
        json_util.sendStatus(r, "{\"detail\":\"state id or task_run_id query param required\"}", .bad_request);
        return;
    };
    try getById(r, id);
}

/// Extract the `{id}` path segment after the route prefix, or null when the
/// path has no trailing segment (e.g. "/task_run_states/" alone).
fn extractId(target: []const u8) ?[]const u8 {
    const prefix = if (mem.startsWith(u8, target, "/api/task_run_states/"))
        "/api/task_run_states/"
    else
        "/task_run_states/";
    if (target.len > prefix.len) {
        const id = target[prefix.len..];
        // defensively ignore a query string fused into the path
        if (id.len > 0 and id[0] != '?') return id;
    }
    return null;
}

/// Respond with a single state row as JSON, 404 when absent, 500 on DB error.
fn getById(r: zap.Request, id: []const u8) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const state = db.task_run_states.getById(alloc, id) catch {
        json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error);
        return;
    };

    if (state) |s| {
        json_util.send(r, s.toJson(alloc) catch "{\"detail\":\"serialization error\"}");
    } else {
        json_util.sendStatus(r, "{\"detail\":\"Task run state not found\"}", .not_found);
    }
}

/// Respond with a JSON array of all states for the task run named by the
/// required `task_run_id` query parameter.
fn listByTaskRunId(r: zap.Request) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const query = r.query orelse "";

    const task_run_id = parseQueryParam(query, "task_run_id") orelse {
        json_util.sendStatus(r, "{\"detail\":\"task_run_id query param required\"}", .bad_request);
        return;
    };

    const states = db.task_run_states.listByTaskRunId(alloc, task_run_id) catch {
        json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error);
        return;
    };

    // Build the JSON array. The separator is appended only after a row
    // serializes successfully, so a skipped row cannot leave a dangling
    // comma (the previous version emitted "[a,]" if toJson failed).
    var output = std.ArrayListUnmanaged(u8){};
    try output.append(alloc, '[');
    var first = true;
    for (states) |s| {
        const state_json = s.toJson(alloc) catch continue;
        if (!first) try output.append(alloc, ',');
        try output.appendSlice(alloc, state_json);
        first = false;
    }
    try output.append(alloc, ']');

    json_util.send(r, output.items);
}

/// Return the raw (not URL-decoded) value of `name` from `query`, or null
/// when the parameter is absent or its value is empty.
///
/// Fix: the previous implementation did a substring search for "name=", so
/// a lookup of "task_run_id" also matched "xtask_run_id=...". Parameters are
/// now split on '&' and the key compared exactly; this also removes the
/// page_allocator allocation the old search needed.
fn parseQueryParam(query: []const u8, name: []const u8) ?[]const u8 {
    var it = mem.splitScalar(u8, query, '&');
    while (it.next()) |pair| {
        const eq = mem.indexOfScalar(u8, pair, '=') orelse continue;
        if (!mem.eql(u8, pair[0..eq], name)) continue;
        const value = pair[eq + 1 ..];
        return if (value.len > 0) value else null;
    }
    return null;
}
+75
src/db/flow_run_states.zig
const std = @import("std");
const Allocator = std.mem.Allocator;
const backend = @import("backend.zig");
const log = @import("../logging.zig");

// Column positions; must match the SELECT column order used below.
const Col = enum(u8) { id = 0, created = 1, flow_run_id = 2, type_ = 3, name = 4, timestamp = 5 };

/// One row of the flow_run_state history table.
/// All string fields are duplicated into the caller-supplied allocator
/// (callers in the API layer pass an arena, which also covers toJson output
/// — partial dupes are not individually freed on error; TODO confirm all
/// callers use an arena).
pub const FlowRunStateRow = struct {
    id: []const u8,
    created: []const u8,
    flow_run_id: []const u8,
    type: []const u8,
    name: []const u8,
    timestamp: []const u8,

    /// Serialize this row as a JSON object with Prefect-style field names.
    pub fn toJson(self: FlowRunStateRow, alloc: Allocator) ![]const u8 {
        var output: std.Io.Writer.Allocating = .init(alloc);
        var jw: std.json.Stringify = .{ .writer = &output.writer };
        try jw.beginObject();
        try jw.objectField("id");
        try jw.write(self.id);
        try jw.objectField("created");
        try jw.write(self.created);
        try jw.objectField("flow_run_id");
        try jw.write(self.flow_run_id);
        try jw.objectField("type");
        try jw.write(self.type);
        try jw.objectField("name");
        try jw.write(self.name);
        try jw.objectField("timestamp");
        try jw.write(self.timestamp);
        try jw.endObject();
        return output.toOwnedSlice();
    }
};

/// Copy one result row's text columns into an owned FlowRunStateRow.
fn rowFromResult(alloc: Allocator, r: anytype) !FlowRunStateRow {
    return FlowRunStateRow{
        .id = try alloc.dupe(u8, r.text(@intFromEnum(Col.id))),
        .created = try alloc.dupe(u8, r.text(@intFromEnum(Col.created))),
        .flow_run_id = try alloc.dupe(u8, r.text(@intFromEnum(Col.flow_run_id))),
        .type = try alloc.dupe(u8, r.text(@intFromEnum(Col.type_))),
        .name = try alloc.dupe(u8, r.text(@intFromEnum(Col.name))),
        .timestamp = try alloc.dupe(u8, r.text(@intFromEnum(Col.timestamp))),
    };
}

/// Fetch a single state by primary key. Returns null when no row matches.
///
/// Fix: database errors were previously swallowed with `catch return null`,
/// which made a DB failure indistinguishable from "not found" and left the
/// API layer's 500 branch dead. Errors are now logged and propagated; the
/// `!?FlowRunStateRow` signature is unchanged.
pub fn getById(alloc: Allocator, id: []const u8) !?FlowRunStateRow {
    var r = backend.db.row(
        "SELECT id, created, flow_run_id, type, name, timestamp FROM flow_run_state WHERE id = ?",
        .{id},
    ) catch |err| {
        log.err("database", "get flow_run_state error: {}", .{err});
        return err;
    };
    if (r) |*row| {
        defer row.deinit();
        return try rowFromResult(alloc, row);
    }
    return null;
}

/// List all states for one flow run, oldest first (ordered by timestamp).
pub fn listByFlowRunId(alloc: Allocator, flow_run_id: []const u8) ![]FlowRunStateRow {
    var results = std.ArrayListUnmanaged(FlowRunStateRow){};
    errdefer results.deinit(alloc);
    var rows = backend.db.query(
        "SELECT id, created, flow_run_id, type, name, timestamp FROM flow_run_state WHERE flow_run_id = ? ORDER BY timestamp ASC",
        .{flow_run_id},
    ) catch |err| {
        log.err("database", "list flow_run_states error: {}", .{err});
        return err;
    };
    defer rows.deinit();
    while (rows.next()) |r| {
        try results.append(alloc, try rowFromResult(alloc, &r));
    }
    return results.toOwnedSlice(alloc);
}
+11
src/db/migrations/001_initial/postgres.sql
··· 70 70 total_run_time DOUBLE PRECISION DEFAULT 0.0 71 71 ); 72 72 73 + -- task_run_state table 74 + CREATE TABLE IF NOT EXISTS task_run_state ( 75 + id TEXT PRIMARY KEY, 76 + created TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'), 77 + task_run_id TEXT REFERENCES task_run(id) ON DELETE CASCADE, 78 + type TEXT NOT NULL, 79 + name TEXT NOT NULL, 80 + timestamp TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"') 81 + ); 82 + 73 83 -- events table 74 84 CREATE TABLE IF NOT EXISTS events ( 75 85 id TEXT PRIMARY KEY, ··· 232 242 CREATE INDEX IF NOT EXISTS ix_flow_run__next_scheduled_start_time ON flow_run(next_scheduled_start_time); 233 243 CREATE UNIQUE INDEX IF NOT EXISTS uq_flow_run__flow_id_idempotency_key ON flow_run(flow_id, idempotency_key) WHERE idempotency_key IS NOT NULL; 234 244 CREATE INDEX IF NOT EXISTS ix_flow_run_state__flow_run_id ON flow_run_state(flow_run_id); 245 + CREATE INDEX IF NOT EXISTS ix_task_run_state__task_run_id ON task_run_state(task_run_id); 235 246 CREATE INDEX IF NOT EXISTS ix_task_run__flow_run_id ON task_run(flow_run_id); 236 247 CREATE INDEX IF NOT EXISTS ix_task_run__task_key_dynamic_key ON task_run(task_key, dynamic_key); 237 248 CREATE INDEX IF NOT EXISTS ix_events__occurred ON events(occurred);
+11
src/db/migrations/001_initial/sqlite.sql
··· 70 70 total_run_time REAL DEFAULT 0.0 71 71 ); 72 72 73 + -- task_run_state table 74 + CREATE TABLE IF NOT EXISTS task_run_state ( 75 + id TEXT PRIMARY KEY, 76 + created TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), 77 + task_run_id TEXT REFERENCES task_run(id) ON DELETE CASCADE, 78 + type TEXT NOT NULL, 79 + name TEXT NOT NULL, 80 + timestamp TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')) 81 + ); 82 + 73 83 -- events table 74 84 CREATE TABLE IF NOT EXISTS events ( 75 85 id TEXT PRIMARY KEY, ··· 232 242 CREATE INDEX IF NOT EXISTS ix_flow_run__next_scheduled_start_time ON flow_run(next_scheduled_start_time); 233 243 CREATE UNIQUE INDEX IF NOT EXISTS uq_flow_run__flow_id_idempotency_key ON flow_run(flow_id, idempotency_key) WHERE idempotency_key IS NOT NULL; 234 244 CREATE INDEX IF NOT EXISTS ix_flow_run_state__flow_run_id ON flow_run_state(flow_run_id); 245 + CREATE INDEX IF NOT EXISTS ix_task_run_state__task_run_id ON task_run_state(task_run_id); 235 246 CREATE INDEX IF NOT EXISTS ix_task_run__flow_run_id ON task_run(flow_run_id); 236 247 CREATE INDEX IF NOT EXISTS ix_task_run__task_key_dynamic_key ON task_run(task_key, dynamic_key); 237 248 CREATE INDEX IF NOT EXISTS ix_events__occurred ON events(occurred);
+13
src/db/schema/postgres.zig
··· 85 85 \\) 86 86 , .{}); 87 87 88 + // task_run_state table 89 + try backend.db.exec( 90 + \\CREATE TABLE IF NOT EXISTS task_run_state ( 91 + \\ id TEXT PRIMARY KEY, 92 + \\ created TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'), 93 + \\ task_run_id TEXT REFERENCES task_run(id) ON DELETE CASCADE, 94 + \\ type TEXT NOT NULL, 95 + \\ name TEXT NOT NULL, 96 + \\ timestamp TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"') 97 + \\) 98 + , .{}); 99 + 88 100 // events table 89 101 try backend.db.exec( 90 102 \\CREATE TABLE IF NOT EXISTS events ( ··· 287 299 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_flow_run__next_scheduled_start_time ON flow_run(next_scheduled_start_time)", .{}); 288 300 try backend.db.exec("CREATE UNIQUE INDEX IF NOT EXISTS uq_flow_run__flow_id_idempotency_key ON flow_run(flow_id, idempotency_key) WHERE idempotency_key IS NOT NULL", .{}); 289 301 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_flow_run_state__flow_run_id ON flow_run_state(flow_run_id)", .{}); 302 + try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_task_run_state__task_run_id ON task_run_state(task_run_id)", .{}); 290 303 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_task_run__flow_run_id ON task_run(flow_run_id)", .{}); 291 304 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_task_run__task_key_dynamic_key ON task_run(task_key, dynamic_key)", .{}); 292 305 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_events__occurred ON events(occurred)", .{});
+13
src/db/schema/sqlite.zig
··· 81 81 \\) 82 82 , .{}); 83 83 84 + // task_run_state table 85 + try backend.db.exec( 86 + \\CREATE TABLE IF NOT EXISTS task_run_state ( 87 + \\ id TEXT PRIMARY KEY, 88 + \\ created TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), 89 + \\ task_run_id TEXT REFERENCES task_run(id) ON DELETE CASCADE, 90 + \\ type TEXT NOT NULL, 91 + \\ name TEXT NOT NULL, 92 + \\ timestamp TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')) 93 + \\) 94 + , .{}); 95 + 84 96 // events table 85 97 try backend.db.exec( 86 98 \\CREATE TABLE IF NOT EXISTS events ( ··· 279 291 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_flow_run__next_scheduled_start_time ON flow_run(next_scheduled_start_time)", .{}); 280 292 try backend.db.exec("CREATE UNIQUE INDEX IF NOT EXISTS uq_flow_run__flow_id_idempotency_key ON flow_run(flow_id, idempotency_key) WHERE idempotency_key IS NOT NULL", .{}); 281 293 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_flow_run_state__flow_run_id ON flow_run_state(flow_run_id)", .{}); 294 + try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_task_run_state__task_run_id ON task_run_state(task_run_id)", .{}); 282 295 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_task_run__flow_run_id ON task_run(flow_run_id)", .{}); 283 296 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_task_run__task_key_dynamic_key ON task_run(task_key, dynamic_key)", .{}); 284 297 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_events__occurred ON events(occurred)", .{});
+2
src/db/sqlite.zig
··· 20 20 pub const deployments = @import("deployments.zig"); 21 21 pub const deployment_schedules = @import("deployment_schedules.zig"); 22 22 pub const concurrency_limits = @import("concurrency_limits.zig"); 23 + pub const flow_run_states = @import("flow_run_states.zig"); 24 + pub const task_run_states = @import("task_run_states.zig"); 23 25 24 26 // re-export types for compatibility 25 27 pub const FlowRow = flows.FlowRow;
+75
src/db/task_run_states.zig
const std = @import("std");
const Allocator = std.mem.Allocator;
const backend = @import("backend.zig");
const log = @import("../logging.zig");

// Column positions; must match the SELECT column order used below.
const Col = enum(u8) { id = 0, created = 1, task_run_id = 2, type_ = 3, name = 4, timestamp = 5 };

/// One row of the task_run_state history table.
/// All string fields are duplicated into the caller-supplied allocator
/// (callers in the API layer pass an arena, which also covers toJson output
/// — partial dupes are not individually freed on error; TODO confirm all
/// callers use an arena).
pub const TaskRunStateRow = struct {
    id: []const u8,
    created: []const u8,
    task_run_id: []const u8,
    type: []const u8,
    name: []const u8,
    timestamp: []const u8,

    /// Serialize this row as a JSON object with Prefect-style field names.
    pub fn toJson(self: TaskRunStateRow, alloc: Allocator) ![]const u8 {
        var output: std.Io.Writer.Allocating = .init(alloc);
        var jw: std.json.Stringify = .{ .writer = &output.writer };
        try jw.beginObject();
        try jw.objectField("id");
        try jw.write(self.id);
        try jw.objectField("created");
        try jw.write(self.created);
        try jw.objectField("task_run_id");
        try jw.write(self.task_run_id);
        try jw.objectField("type");
        try jw.write(self.type);
        try jw.objectField("name");
        try jw.write(self.name);
        try jw.objectField("timestamp");
        try jw.write(self.timestamp);
        try jw.endObject();
        return output.toOwnedSlice();
    }
};

/// Copy one result row's text columns into an owned TaskRunStateRow.
fn rowFromResult(alloc: Allocator, r: anytype) !TaskRunStateRow {
    return TaskRunStateRow{
        .id = try alloc.dupe(u8, r.text(@intFromEnum(Col.id))),
        .created = try alloc.dupe(u8, r.text(@intFromEnum(Col.created))),
        .task_run_id = try alloc.dupe(u8, r.text(@intFromEnum(Col.task_run_id))),
        .type = try alloc.dupe(u8, r.text(@intFromEnum(Col.type_))),
        .name = try alloc.dupe(u8, r.text(@intFromEnum(Col.name))),
        .timestamp = try alloc.dupe(u8, r.text(@intFromEnum(Col.timestamp))),
    };
}

/// Fetch a single state by primary key. Returns null when no row matches.
///
/// Fix: database errors were previously swallowed with `catch return null`,
/// which made a DB failure indistinguishable from "not found" and left the
/// API layer's 500 branch dead. Errors are now logged and propagated; the
/// `!?TaskRunStateRow` signature is unchanged.
pub fn getById(alloc: Allocator, id: []const u8) !?TaskRunStateRow {
    var r = backend.db.row(
        "SELECT id, created, task_run_id, type, name, timestamp FROM task_run_state WHERE id = ?",
        .{id},
    ) catch |err| {
        log.err("database", "get task_run_state error: {}", .{err});
        return err;
    };
    if (r) |*row| {
        defer row.deinit();
        return try rowFromResult(alloc, row);
    }
    return null;
}

/// List all states for one task run, oldest first (ordered by timestamp).
pub fn listByTaskRunId(alloc: Allocator, task_run_id: []const u8) ![]TaskRunStateRow {
    var results = std.ArrayListUnmanaged(TaskRunStateRow){};
    errdefer results.deinit(alloc);
    var rows = backend.db.query(
        "SELECT id, created, task_run_id, type, name, timestamp FROM task_run_state WHERE task_run_id = ? ORDER BY timestamp ASC",
        .{task_run_id},
    ) catch |err| {
        log.err("database", "list task_run_states error: {}", .{err});
        return err;
    };
    defer rows.deinit();
    while (rows.next()) |r| {
        try results.append(alloc, try rowFromResult(alloc, &r));
    }
    return results.toOwnedSlice(alloc);
}
+25 -1
src/db/task_runs.zig
··· 93 93 state_name: []const u8, 94 94 timestamp: []const u8, 95 95 ) !void { 96 - backend.db.exec( 96 + // Lock mutex only for SQLite (postgres pool handles concurrency) 97 + if (backend.db.dialect == .sqlite) backend.db.mutex.lock(); 98 + defer if (backend.db.dialect == .sqlite) backend.db.mutex.unlock(); 99 + 100 + var txn = backend.db.beginTransaction() catch |err| { 101 + log.err("database", "begin transaction error: {}", .{err}); 102 + return err; 103 + }; 104 + errdefer txn.rollback(); 105 + 106 + txn.exec( 97 107 \\UPDATE task_run SET state_id = ?, state_type = ?, state_name = ?, state_timestamp = ?, updated = ? 98 108 \\WHERE id = ? 99 109 , .{ state_id, state_type, state_name, timestamp, timestamp, run_id }) catch |err| { 100 110 log.err("database", "update task_run error: {}", .{err}); 111 + return err; 112 + }; 113 + 114 + // Insert state history record 115 + txn.exec( 116 + \\INSERT INTO task_run_state (id, task_run_id, type, name, timestamp) 117 + \\VALUES (?, ?, ?, ?, ?) 118 + , .{ state_id, run_id, state_type, state_name, timestamp }) catch |err| { 119 + log.err("database", "insert task_run_state error: {}", .{err}); 120 + return err; 121 + }; 122 + 123 + txn.commit() catch |err| { 124 + log.err("database", "commit error: {}", .{err}); 101 125 return err; 102 126 }; 103 127 }