prefect server in zig

implement deployments and deployment schedules

adds full deployment support including:
- deployment table with CRUD API (create, read, update, delete, filter, count)
- deployment_schedule table with CRUD API
- get_scheduled_flow_runs endpoint for runner polling
- create_flow_run from deployment
- pause/resume deployment endpoints
- .serve() now works end-to-end

also fixes event backfill to apply subscriber filters

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+2259 -22
+24 -13
ROADMAP.md
··· 35 35 - [ ] POST /api/task_run_states/filter 36 36 37 37 ### deployments 38 - - [ ] POST /api/deployments/ 39 - - [ ] GET /api/deployments/{id} 40 - - [ ] POST /api/deployments/filter 41 - - [ ] PATCH /api/deployments/{id} 42 - - [ ] DELETE /api/deployments/{id} 43 - - [ ] POST /api/deployments/{id}/schedule 44 - - [ ] POST /api/deployments/{id}/create_flow_run 38 + - [x] POST /api/deployments/ 39 + - [x] GET /api/deployments/{id} 40 + - [x] GET /api/deployments/name/{flow_name}/{deployment_name} 41 + - [x] POST /api/deployments/filter 42 + - [x] POST /api/deployments/count 43 + - [x] PATCH /api/deployments/{id} 44 + - [x] DELETE /api/deployments/{id} 45 + - [x] POST /api/deployments/{id}/create_flow_run 46 + - [x] POST /api/deployments/{id}/pause_deployment 47 + - [x] POST /api/deployments/{id}/resume_deployment 48 + - [x] GET /api/deployments/{id}/schedules 49 + - [x] POST /api/deployments/{id}/schedules 50 + - [x] PATCH /api/deployments/{id}/schedules/{schedule_id} 51 + - [x] DELETE /api/deployments/{id}/schedules/{schedule_id} 52 + - [x] POST /api/deployments/get_scheduled_flow_runs 45 53 46 54 ### work pools & workers 47 55 - [x] POST /api/work_pools/ ··· 145 153 - [x] events table 146 154 - [x] task_run table 147 155 - [ ] task_run_state table 148 - - [ ] deployment table 156 + - [x] deployment table 157 + - [x] deployment_schedule table 149 158 - [x] work_pool table 150 159 - [x] work_queue table 151 160 - [x] worker table ··· 196 205 - `POST /work_pools/{name}/workers/heartbeat` 197 206 - tracks worker health and last seen 198 207 199 - 4. **deployment** - flow + schedule + work pool binding (NEXT) 208 + 4. ~~**deployment** - flow + schedule + work pool binding~~ ✓ 200 209 - table + CRUD API 201 210 - links flow_id → work_pool → work_queue 211 + - `.serve()` support via `POST /deployments/get_scheduled_flow_runs` 202 212 203 - 5. **deployment_schedule** - cron/interval/rrule schedules 213 + 5. 
~~**deployment_schedule** - cron/interval/rrule schedules~~ ✓ 204 214 - table, linked to deployment 205 - - schedule type parsing 215 + - CRUD API for managing schedules 206 216 207 - 6. **scheduler service** - creates runs from schedules 217 + 6. **scheduler service** - creates runs from schedules (NEXT) 208 218 - background service 209 219 - queries deployments needing runs 210 220 - creates flow_runs in SCHEDULED state ··· 218 228 - blocks (types, schemas, documents) 219 229 - variables (full CRUD) 220 230 - work pools, work queues, workers (full CRUD + heartbeat) 221 - - events (ingest via websocket, persist, broadcast with backfill) 231 + - deployments + schedules (full CRUD, `.serve()` support) 232 + - events (ingest via websocket, persist, broadcast with filtered backfill) 222 233 - dual database backends (sqlite/postgres) 223 234 - dual message brokers (memory/redis)
+5 -1
loq.toml
··· 10 10 11 11 [[rules]] 12 12 path = "scripts/test-api-sequence" 13 - max_lines = 725 13 + max_lines = 1000 14 14 15 15 [[rules]] 16 16 path = "src/broker/redis.zig" ··· 19 19 [[rules]] 20 20 path = "src/api/work_pools.zig" 21 21 max_lines = 550 22 + 23 + [[rules]] 24 + path = "src/api/deployments.zig" 25 + max_lines = 900
+189
scripts/test-api-sequence
··· 649 649 return True 650 650 651 651 652 + def test_deployments(client: CountingClient) -> bool: 653 + """Test deployments API (deployments, schedules, create_flow_run).""" 654 + # create a flow first 655 + if not QUIET: 656 + console.print("[bold]setup: create flow[/bold]") 657 + resp = client.post("/flows/", json={"name": f"deploy-flow-{uuid.uuid4().hex[:8]}"}) 658 + if resp.status_code not in (200, 201): 659 + if not QUIET: 660 + console.print(f"[red]FAIL[/red]: create flow {resp.status_code}") 661 + return False 662 + flow = resp.json() 663 + flow_id = flow.get("id") 664 + flow_name = flow.get("name") 665 + 666 + # create deployment 667 + deployment_name = f"test-deployment-{uuid.uuid4().hex[:8]}" 668 + if not QUIET: 669 + console.print("[bold]deployments[/bold]") 670 + resp = client.post("/deployments/", json={ 671 + "name": deployment_name, 672 + "flow_id": flow_id, 673 + "description": "test deployment", 674 + "tags": ["test", "benchmark"], 675 + "parameters": {"key": "value"}, 676 + "schedules": [ 677 + {"schedule": {"interval": 3600}, "active": True}, 678 + ], 679 + }) 680 + if resp.status_code not in (200, 201): 681 + if not QUIET: 682 + console.print(f"[red]FAIL[/red]: create deployment {resp.status_code} {resp.text}") 683 + return False 684 + deployment = resp.json() 685 + if not validate_response(deployment, ["id", "name", "flow_id", "status", "schedules"], {"id": str, "name": str}): 686 + return False 687 + deployment_id = deployment.get("id") 688 + if not QUIET: 689 + console.print(f" created: {deployment_id}") 690 + 691 + # verify schedules were created 692 + schedules = deployment.get("schedules", []) 693 + if not isinstance(schedules, list) or len(schedules) != 1: 694 + if not QUIET: 695 + console.print(f"[red]FAIL[/red]: expected 1 schedule, got {len(schedules) if isinstance(schedules, list) else 'not a list'}") 696 + return False 697 + if not QUIET: 698 + console.print(f" schedules: {len(schedules)}") 699 + 700 + # get by id 701 + resp = 
client.get(f"/deployments/{deployment_id}") 702 + if resp.status_code != 200: 703 + if not QUIET: 704 + console.print(f"[red]FAIL[/red]: get deployment {resp.status_code}") 705 + return False 706 + 707 + # get by name 708 + resp = client.get(f"/deployments/name/{flow_name}/{deployment_name}") 709 + if resp.status_code != 200: 710 + if not QUIET: 711 + console.print(f"[red]FAIL[/red]: get deployment by name {resp.status_code}") 712 + return False 713 + if not QUIET: 714 + console.print(" get by name: ok") 715 + 716 + # update 717 + resp = client.patch(f"/deployments/{deployment_id}", json={"description": "updated"}) 718 + if resp.status_code != 204: 719 + if not QUIET: 720 + console.print(f"[red]FAIL[/red]: update deployment {resp.status_code}") 721 + return False 722 + if not QUIET: 723 + console.print(" updated") 724 + 725 + # filter 726 + resp = client.post("/deployments/filter", json={"limit": 10}) 727 + if resp.status_code != 200: 728 + if not QUIET: 729 + console.print(f"[red]FAIL[/red]: filter deployments {resp.status_code}") 730 + return False 731 + if not QUIET: 732 + console.print(f" filter: {len(resp.json())} items") 733 + 734 + # count 735 + resp = client.post("/deployments/count", json={}) 736 + if resp.status_code != 200: 737 + if not QUIET: 738 + console.print(f"[red]FAIL[/red]: count deployments {resp.status_code}") 739 + return False 740 + if not QUIET: 741 + console.print(f" count: {resp.text}") 742 + 743 + # pause 744 + resp = client.post(f"/deployments/{deployment_id}/pause_deployment", json={}) 745 + if resp.status_code != 204: 746 + if not QUIET: 747 + console.print(f"[red]FAIL[/red]: pause deployment {resp.status_code}") 748 + return False 749 + if not QUIET: 750 + console.print(" paused") 751 + 752 + # resume 753 + resp = client.post(f"/deployments/{deployment_id}/resume_deployment", json={}) 754 + if resp.status_code != 204: 755 + if not QUIET: 756 + console.print(f"[red]FAIL[/red]: resume deployment {resp.status_code}") 757 + return False 
758 + if not QUIET: 759 + console.print(" resumed") 760 + 761 + # create flow run from deployment 762 + if not QUIET: 763 + console.print("[bold]create_flow_run[/bold]") 764 + resp = client.post(f"/deployments/{deployment_id}/create_flow_run", json={}) 765 + if resp.status_code not in (200, 201): 766 + if not QUIET: 767 + console.print(f"[red]FAIL[/red]: create flow run {resp.status_code} {resp.text}") 768 + return False 769 + flow_run = resp.json() 770 + if not validate_response(flow_run, ["id", "flow_id", "deployment_id"], {"id": str}): 771 + return False 772 + if flow_run.get("deployment_id") != deployment_id: 773 + if not QUIET: 774 + console.print(f"[red]FAIL[/red]: deployment_id mismatch") 775 + return False 776 + if not QUIET: 777 + console.print(f" created flow run: {flow_run.get('id')}") 778 + 779 + # schedules - list 780 + if not QUIET: 781 + console.print("[bold]deployment_schedules[/bold]") 782 + resp = client.get(f"/deployments/{deployment_id}/schedules") 783 + if resp.status_code != 200: 784 + if not QUIET: 785 + console.print(f"[red]FAIL[/red]: list schedules {resp.status_code}") 786 + return False 787 + schedules = resp.json() 788 + schedule_id = schedules[0].get("id") if schedules else None 789 + if not QUIET: 790 + console.print(f" list: {len(schedules)} schedules") 791 + 792 + # schedules - create 793 + resp = client.post(f"/deployments/{deployment_id}/schedules", json={ 794 + "schedule": {"cron": "0 0 * * *"}, 795 + "active": False, 796 + }) 797 + if resp.status_code not in (200, 201): 798 + if not QUIET: 799 + console.print(f"[red]FAIL[/red]: create schedule {resp.status_code}") 800 + return False 801 + created_schedules = resp.json() 802 + if not isinstance(created_schedules, list) or len(created_schedules) != 1: 803 + if not QUIET: 804 + console.print(f"[red]FAIL[/red]: expected 1 created schedule") 805 + return False 806 + new_schedule_id = created_schedules[0].get("id") 807 + if not QUIET: 808 + console.print(f" created schedule: 
{new_schedule_id}") 809 + 810 + # schedules - update 811 + resp = client.patch(f"/deployments/{deployment_id}/schedules/{new_schedule_id}", json={"active": True}) 812 + if resp.status_code != 204: 813 + if not QUIET: 814 + console.print(f"[red]FAIL[/red]: update schedule {resp.status_code}") 815 + return False 816 + if not QUIET: 817 + console.print(" updated schedule") 818 + 819 + # schedules - delete 820 + resp = client.delete(f"/deployments/{deployment_id}/schedules/{new_schedule_id}") 821 + if resp.status_code != 204: 822 + if not QUIET: 823 + console.print(f"[red]FAIL[/red]: delete schedule {resp.status_code}") 824 + return False 825 + if not QUIET: 826 + console.print(" deleted schedule") 827 + 828 + # delete deployment 829 + resp = client.delete(f"/deployments/{deployment_id}") 830 + if resp.status_code != 204: 831 + if not QUIET: 832 + console.print(f"[red]FAIL[/red]: delete deployment {resp.status_code}") 833 + return False 834 + if not QUIET: 835 + console.print(" deleted deployment") 836 + 837 + return True 838 + 839 + 652 840 def main(): 653 841 json_output = "--json" in sys.argv 654 842 ··· 667 855 results.append(run_test("variables", test_variables)) 668 856 results.append(run_test("blocks", test_blocks)) 669 857 results.append(run_test("work_pools", test_work_pools)) 858 + results.append(run_test("deployments", test_deployments)) 670 859 671 860 total_duration = sum(r.duration_ms for r in results) 672 861 total_requests = sum(r.requests for r in results)
+39
scripts/test-serve
··· 1 + #!/usr/bin/env python3 2 + """Test deployment .serve() functionality""" 3 + 4 + import sys 5 + import threading 6 + import time 7 + from prefect import flow 8 + from prefect.client.orchestration import get_client 9 + 10 + @flow(log_prints=True) 11 + def test_flow(name: str = "world"): 12 + print(f"Hello, {name}!") 13 + return f"greeted {name}" 14 + 15 + 16 + def main(): 17 + print("Testing .serve() functionality...") 18 + 19 + # Start serve in background thread 20 + serve_thread = threading.Thread( 21 + target=lambda: test_flow.serve(name="test-serve"), 22 + daemon=True 23 + ) 24 + serve_thread.start() 25 + 26 + # Give it time to create deployment and start polling 27 + time.sleep(3) 28 + 29 + # Verify deployment was created 30 + with get_client(sync_client=True) as client: 31 + deployment = client.read_deployment_by_name("test-flow/test-serve") 32 + print(f"✓ Deployment created: {deployment.id}") 33 + print(f" name: {deployment.name}") 34 + print(f" flow_id: {deployment.flow_id}") 35 + print("\n✓ .serve() test passed") 36 + 37 + 38 + if __name__ == "__main__": 39 + main()
+304
src/api/deployment_schedules.zig
··· 1 + const std = @import("std"); 2 + const zap = @import("zap"); 3 + const mem = std.mem; 4 + const json = std.json; 5 + 6 + const db = @import("../db/sqlite.zig"); 7 + const uuid_util = @import("../utilities/uuid.zig"); 8 + const time_util = @import("../utilities/time.zig"); 9 + const json_util = @import("../utilities/json.zig"); 10 + const deployments = @import("deployments.zig"); 11 + 12 + pub fn handle(r: zap.Request, target: []const u8) !void { 13 + const method = r.method orelse "GET"; 14 + 15 + const deployment_id = deployments.extractDeploymentId(target) orelse { 16 + json_util.sendStatus(r, "{\"detail\":\"deployment id required\"}", .bad_request); 17 + return; 18 + }; 19 + 20 + // Extract schedule_id if present 21 + const schedule_id = extractScheduleId(target); 22 + 23 + // GET /deployments/{id}/schedules 24 + if (mem.eql(u8, method, "GET") and schedule_id == null) { 25 + try listSchedules(r, deployment_id); 26 + return; 27 + } 28 + 29 + // POST /deployments/{id}/schedules 30 + if (mem.eql(u8, method, "POST") and schedule_id == null) { 31 + try createSchedules(r, deployment_id); 32 + return; 33 + } 34 + 35 + // PATCH /deployments/{id}/schedules/{schedule_id} 36 + if (mem.eql(u8, method, "PATCH") and schedule_id != null) { 37 + try updateSchedule(r, schedule_id.?); 38 + return; 39 + } 40 + 41 + // DELETE /deployments/{id}/schedules/{schedule_id} 42 + if (mem.eql(u8, method, "DELETE") and schedule_id != null) { 43 + try deleteSchedule(r, schedule_id.?); 44 + return; 45 + } 46 + 47 + json_util.sendStatus(r, "{\"detail\":\"not implemented\"}", .not_implemented); 48 + } 49 + 50 + fn extractScheduleId(target: []const u8) ?[]const u8 { 51 + const idx = mem.indexOf(u8, target, "/schedules/") orelse return null; 52 + const start = idx + 11; 53 + if (start >= target.len) return null; 54 + 55 + const after = target[start..]; 56 + const end = mem.indexOf(u8, after, "/") orelse after.len; 57 + if (end == 0) return null; 58 + 59 + return after[0..end]; 60 + } 61 + 
62 + fn listSchedules(r: zap.Request, deployment_id: []const u8) !void { 63 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 64 + defer arena.deinit(); 65 + const alloc = arena.allocator(); 66 + 67 + // Verify deployment exists 68 + _ = db.deployments.getById(alloc, deployment_id) catch null orelse { 69 + json_util.sendStatus(r, "{\"detail\":\"Deployment not found\"}", .not_found); 70 + return; 71 + }; 72 + 73 + const schedules_list = db.deployment_schedules.listByDeployment(alloc, deployment_id) catch { 74 + json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error); 75 + return; 76 + }; 77 + 78 + var output: std.io.Writer.Allocating = .init(alloc); 79 + var jw: json.Stringify = .{ .writer = &output.writer }; 80 + 81 + jw.beginArray() catch { 82 + json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 83 + return; 84 + }; 85 + 86 + for (schedules_list) |s| { 87 + writeScheduleObject(&jw, s) catch continue; 88 + } 89 + 90 + jw.endArray() catch {}; 91 + 92 + json_util.send(r, output.toOwnedSlice() catch "[]"); 93 + } 94 + 95 + fn createSchedules(r: zap.Request, deployment_id: []const u8) !void { 96 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 97 + defer arena.deinit(); 98 + const alloc = arena.allocator(); 99 + 100 + // Verify deployment exists 101 + _ = db.deployments.getById(alloc, deployment_id) catch null orelse { 102 + json_util.sendStatus(r, "{\"detail\":\"Deployment not found\"}", .not_found); 103 + return; 104 + }; 105 + 106 + const body = r.body orelse { 107 + json_util.sendStatus(r, "{\"detail\":\"body required\"}", .bad_request); 108 + return; 109 + }; 110 + 111 + const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch { 112 + json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request); 113 + return; 114 + }; 115 + 116 + var ts_buf: [32]u8 = undefined; 117 + const now = time_util.timestamp(&ts_buf); 118 + 119 + // Body can be a 
single schedule or array of schedules 120 + const schedules_arr = if (parsed.value == .array) 121 + parsed.value.array.items 122 + else if (parsed.value == .object) 123 + &[_]json.Value{parsed.value} 124 + else { 125 + json_util.sendStatus(r, "{\"detail\":\"expected object or array\"}", .bad_request); 126 + return; 127 + }; 128 + 129 + var created_ids = std.ArrayListUnmanaged([]const u8){}; 130 + 131 + for (schedules_arr) |sched_val| { 132 + if (sched_val != .object) continue; 133 + const obj = sched_val.object; 134 + 135 + const schedule_json = getJsonString(alloc, obj, "schedule") orelse { 136 + continue; // Skip invalid schedules 137 + }; 138 + 139 + var id_buf: [36]u8 = undefined; 140 + const new_id = uuid_util.generate(&id_buf); 141 + const id_copy = try alloc.dupe(u8, new_id); 142 + 143 + db.deployment_schedules.insert(id_copy, deployment_id, schedule_json, now, .{ 144 + .active = getBool(obj, "active") orelse true, 145 + .max_scheduled_runs = getInt(obj, "max_scheduled_runs"), 146 + .parameters = getJsonString(alloc, obj, "parameters") orelse "{}", 147 + .slug = getString(obj, "slug"), 148 + }) catch continue; 149 + 150 + try created_ids.append(alloc, id_copy); 151 + } 152 + 153 + // Return created schedules 154 + var output: std.io.Writer.Allocating = .init(alloc); 155 + var jw: json.Stringify = .{ .writer = &output.writer }; 156 + 157 + jw.beginArray() catch { 158 + json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 159 + return; 160 + }; 161 + 162 + for (created_ids.items) |id| { 163 + if (db.deployment_schedules.getById(alloc, id) catch null) |s| { 164 + writeScheduleObject(&jw, s) catch continue; 165 + } 166 + } 167 + 168 + jw.endArray() catch {}; 169 + 170 + json_util.sendStatus(r, output.toOwnedSlice() catch "[]", .created); 171 + } 172 + 173 + fn updateSchedule(r: zap.Request, schedule_id: []const u8) !void { 174 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 175 + defer arena.deinit(); 176 + 
const alloc = arena.allocator(); 177 + 178 + const body = r.body orelse { 179 + json_util.sendStatus(r, "{\"detail\":\"body required\"}", .bad_request); 180 + return; 181 + }; 182 + 183 + const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch { 184 + json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request); 185 + return; 186 + }; 187 + 188 + var ts_buf: [32]u8 = undefined; 189 + const now = time_util.timestamp(&ts_buf); 190 + 191 + const obj = parsed.value.object; 192 + 193 + const updated = db.deployment_schedules.updateById(schedule_id, now, .{ 194 + .schedule = getJsonString(alloc, obj, "schedule"), 195 + .active = getBool(obj, "active"), 196 + .max_scheduled_runs = getInt(obj, "max_scheduled_runs"), 197 + .parameters = getJsonString(alloc, obj, "parameters"), 198 + .slug = getString(obj, "slug"), 199 + }) catch { 200 + json_util.sendStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error); 201 + return; 202 + }; 203 + 204 + if (!updated) { 205 + json_util.sendStatus(r, "{\"detail\":\"Schedule not found\"}", .not_found); 206 + return; 207 + } 208 + 209 + r.setStatus(.no_content); 210 + r.sendBody("") catch {}; 211 + } 212 + 213 + fn deleteSchedule(r: zap.Request, schedule_id: []const u8) !void { 214 + const deleted = db.deployment_schedules.deleteById(schedule_id) catch { 215 + json_util.sendStatus(r, "{\"detail\":\"delete failed\"}", .internal_server_error); 216 + return; 217 + }; 218 + 219 + if (!deleted) { 220 + json_util.sendStatus(r, "{\"detail\":\"Schedule not found\"}", .not_found); 221 + return; 222 + } 223 + 224 + r.setStatus(.no_content); 225 + r.sendBody("") catch {}; 226 + } 227 + 228 + // Called from deployments.zig to replace all schedules during create/update 229 + pub fn replaceSchedules(alloc: std.mem.Allocator, deployment_id: []const u8, sched_val: json.Value, now: []const u8) !void { 230 + // Delete existing schedules 231 + _ = db.deployment_schedules.deleteByDeployment(deployment_id) catch {}; 232 + 
233 + // Insert new schedules 234 + const schedules_arr = if (sched_val == .array) sched_val.array.items else return; 235 + 236 + for (schedules_arr) |s| { 237 + if (s != .object) continue; 238 + const obj = s.object; 239 + 240 + const schedule_json = getJsonString(alloc, obj, "schedule") orelse continue; 241 + 242 + var id_buf: [36]u8 = undefined; 243 + const new_id = uuid_util.generate(&id_buf); 244 + 245 + db.deployment_schedules.insert(new_id, deployment_id, schedule_json, now, .{ 246 + .active = getBool(obj, "active") orelse true, 247 + .max_scheduled_runs = getInt(obj, "max_scheduled_runs"), 248 + .parameters = getJsonString(alloc, obj, "parameters") orelse "{}", 249 + .slug = getString(obj, "slug"), 250 + }) catch continue; 251 + } 252 + } 253 + 254 + // JSON helpers 255 + 256 + fn getString(obj: json.ObjectMap, key: []const u8) ?[]const u8 { 257 + const v = obj.get(key) orelse return null; 258 + return if (v == .string) v.string else null; 259 + } 260 + 261 + fn getBool(obj: json.ObjectMap, key: []const u8) ?bool { 262 + const v = obj.get(key) orelse return null; 263 + return if (v == .bool) v.bool else null; 264 + } 265 + 266 + fn getInt(obj: json.ObjectMap, key: []const u8) ?i64 { 267 + const v = obj.get(key) orelse return null; 268 + return if (v == .integer) v.integer else null; 269 + } 270 + 271 + fn getJsonString(alloc: std.mem.Allocator, obj: json.ObjectMap, key: []const u8) ?[]const u8 { 272 + const v = obj.get(key) orelse return null; 273 + if (v == .null) return null; 274 + return std.fmt.allocPrint(alloc, "{f}", .{json.fmt(v, .{})}) catch null; 275 + } 276 + 277 + pub fn writeScheduleObject(jw: *json.Stringify, s: db.deployment_schedules.DeploymentScheduleRow) !void { 278 + try jw.beginObject(); 279 + 280 + try jw.objectField("id"); 281 + try jw.write(s.id); 282 + try jw.objectField("created"); 283 + try jw.write(s.created); 284 + try jw.objectField("updated"); 285 + try jw.write(s.updated); 286 + try jw.objectField("deployment_id"); 287 + try 
jw.write(s.deployment_id); 288 + try jw.objectField("schedule"); 289 + try jw.beginWriteRaw(); 290 + try jw.writer.writeAll(s.schedule); 291 + jw.endWriteRaw(); 292 + try jw.objectField("active"); 293 + try jw.write(s.active); 294 + try jw.objectField("max_scheduled_runs"); 295 + try jw.write(s.max_scheduled_runs); 296 + try jw.objectField("parameters"); 297 + try jw.beginWriteRaw(); 298 + try jw.writer.writeAll(s.parameters); 299 + jw.endWriteRaw(); 300 + try jw.objectField("slug"); 301 + try jw.write(s.slug); 302 + 303 + try jw.endObject(); 304 + }
+820
src/api/deployments.zig
··· 1 + const std = @import("std"); 2 + const zap = @import("zap"); 3 + const mem = std.mem; 4 + const json = std.json; 5 + 6 + const db = @import("../db/sqlite.zig"); 7 + const uuid_util = @import("../utilities/uuid.zig"); 8 + const time_util = @import("../utilities/time.zig"); 9 + const json_util = @import("../utilities/json.zig"); 10 + 11 + const schedules = @import("deployment_schedules.zig"); 12 + 13 + pub fn handle(r: zap.Request) !void { 14 + const target = r.path orelse "/"; 15 + const method = r.method orelse "GET"; 16 + 17 + // POST /deployments/filter 18 + if (mem.eql(u8, method, "POST") and mem.endsWith(u8, target, "/filter")) { 19 + try filter(r); 20 + return; 21 + } 22 + 23 + // POST /deployments/count 24 + if (mem.eql(u8, method, "POST") and mem.endsWith(u8, target, "/count")) { 25 + try count(r); 26 + return; 27 + } 28 + 29 + // POST /deployments/get_scheduled_flow_runs 30 + if (mem.eql(u8, method, "POST") and mem.endsWith(u8, target, "/get_scheduled_flow_runs")) { 31 + try getScheduledFlowRuns(r); 32 + return; 33 + } 34 + 35 + // GET /deployments/name/{flow_name}/{deployment_name} 36 + if (mem.eql(u8, method, "GET") and mem.indexOf(u8, target, "/name/") != null) { 37 + try getByName(r, target); 38 + return; 39 + } 40 + 41 + // POST /deployments/{id}/create_flow_run 42 + if (mem.eql(u8, method, "POST") and mem.endsWith(u8, target, "/create_flow_run")) { 43 + try createFlowRun(r, target); 44 + return; 45 + } 46 + 47 + // POST /deployments/{id}/pause_deployment 48 + if (mem.eql(u8, method, "POST") and mem.endsWith(u8, target, "/pause_deployment")) { 49 + try pause(r, target); 50 + return; 51 + } 52 + 53 + // POST /deployments/{id}/resume_deployment 54 + if (mem.eql(u8, method, "POST") and mem.endsWith(u8, target, "/resume_deployment")) { 55 + try resume_(r, target); 56 + return; 57 + } 58 + 59 + // Schedule endpoints 60 + if (mem.indexOf(u8, target, "/schedules") != null) { 61 + try schedules.handle(r, target); 62 + return; 63 + } 64 + 65 + // POST 
/deployments/ - create deployment 66 + if (mem.eql(u8, method, "POST")) { 67 + const is_root = mem.endsWith(u8, target, "/deployments/") or mem.endsWith(u8, target, "/deployments"); 68 + if (is_root) { 69 + try create(r); 70 + return; 71 + } 72 + } 73 + 74 + // GET /deployments/{id} 75 + if (mem.eql(u8, method, "GET")) { 76 + const id = extractDeploymentId(target) orelse { 77 + json_util.sendStatus(r, "{\"detail\":\"deployment id required\"}", .bad_request); 78 + return; 79 + }; 80 + try read(r, id); 81 + return; 82 + } 83 + 84 + // PATCH /deployments/{id} 85 + if (mem.eql(u8, method, "PATCH")) { 86 + const id = extractDeploymentId(target) orelse { 87 + json_util.sendStatus(r, "{\"detail\":\"deployment id required\"}", .bad_request); 88 + return; 89 + }; 90 + try update(r, id); 91 + return; 92 + } 93 + 94 + // DELETE /deployments/{id} 95 + if (mem.eql(u8, method, "DELETE")) { 96 + const id = extractDeploymentId(target) orelse { 97 + json_util.sendStatus(r, "{\"detail\":\"deployment id required\"}", .bad_request); 98 + return; 99 + }; 100 + try delete(r, id); 101 + return; 102 + } 103 + 104 + json_util.sendStatus(r, "{\"detail\":\"not implemented\"}", .not_implemented); 105 + } 106 + 107 + // Path helper 108 + 109 + pub fn extractDeploymentId(target: []const u8) ?[]const u8 { 110 + const prefix = if (mem.startsWith(u8, target, "/api/deployments/")) 111 + "/api/deployments/" 112 + else if (mem.startsWith(u8, target, "/deployments/")) 113 + "/deployments/" 114 + else 115 + return null; 116 + 117 + if (target.len <= prefix.len) return null; 118 + 119 + const after = target[prefix.len..]; 120 + const end = mem.indexOf(u8, after, "/") orelse after.len; 121 + if (end == 0) return null; 122 + 123 + return after[0..end]; 124 + } 125 + 126 + // CRUD handlers 127 + 128 + fn create(r: zap.Request) !void { 129 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 130 + defer arena.deinit(); 131 + const alloc = arena.allocator(); 132 + 133 + const body = r.body 
orelse { 134 + json_util.sendStatus(r, "{\"detail\":\"body required\"}", .bad_request); 135 + return; 136 + }; 137 + 138 + const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch { 139 + json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request); 140 + return; 141 + }; 142 + 143 + const obj = parsed.value.object; 144 + 145 + const name = getString(obj, "name") orelse { 146 + json_util.sendStatus(r, "{\"detail\":\"name required\"}", .bad_request); 147 + return; 148 + }; 149 + 150 + const flow_id = getString(obj, "flow_id") orelse { 151 + json_util.sendStatus(r, "{\"detail\":\"flow_id required\"}", .bad_request); 152 + return; 153 + }; 154 + 155 + // Verify flow exists 156 + _ = db.flows.getById(alloc, flow_id) catch null orelse { 157 + json_util.sendStatus(r, "{\"detail\":\"Flow not found\"}", .not_found); 158 + return; 159 + }; 160 + 161 + var ts_buf: [32]u8 = undefined; 162 + const now = time_util.timestamp(&ts_buf); 163 + 164 + // Check for existing deployment (upsert) 165 + if (db.deployments.getByFlowAndName(alloc, flow_id, name) catch null) |existing| { 166 + _ = db.deployments.updateById(existing.id, now, buildUpdateParams(obj)) catch { 167 + json_util.sendStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error); 168 + return; 169 + }; 170 + if (obj.get("schedules")) |sched_val| { 171 + try schedules.replaceSchedules(alloc, existing.id, sched_val, now); 172 + } 173 + const deployment = db.deployments.getById(alloc, existing.id) catch null orelse { 174 + json_util.sendStatus(r, "{\"detail\":\"not found after update\"}", .internal_server_error); 175 + return; 176 + }; 177 + const resp = writeDeployment(alloc, deployment) catch { 178 + json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 179 + return; 180 + }; 181 + json_util.send(r, resp); 182 + return; 183 + } 184 + 185 + // Create new 186 + var id_buf: [36]u8 = undefined; 187 + const new_id = uuid_util.generate(&id_buf); 188 + 189 + 
db.deployments.insert(new_id, name, flow_id, now, buildInsertParams(obj)) catch { 190 + json_util.sendStatus(r, "{\"detail\":\"insert failed\"}", .internal_server_error); 191 + return; 192 + }; 193 + 194 + if (obj.get("schedules")) |sched_val| { 195 + try schedules.replaceSchedules(alloc, new_id, sched_val, now); 196 + } 197 + 198 + const deployment = db.deployments.getById(alloc, new_id) catch null orelse { 199 + json_util.sendStatus(r, "{\"detail\":\"not found after insert\"}", .internal_server_error); 200 + return; 201 + }; 202 + const resp = writeDeployment(alloc, deployment) catch { 203 + json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 204 + return; 205 + }; 206 + json_util.sendStatus(r, resp, .created); 207 + } 208 + 209 + fn read(r: zap.Request, id: []const u8) !void { 210 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 211 + defer arena.deinit(); 212 + const alloc = arena.allocator(); 213 + 214 + const deployment = db.deployments.getById(alloc, id) catch null orelse { 215 + json_util.sendStatus(r, "{\"detail\":\"Deployment not found\"}", .not_found); 216 + return; 217 + }; 218 + 219 + const resp = writeDeployment(alloc, deployment) catch { 220 + json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 221 + return; 222 + }; 223 + json_util.send(r, resp); 224 + } 225 + 226 + fn getByName(r: zap.Request, target: []const u8) !void { 227 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 228 + defer arena.deinit(); 229 + const alloc = arena.allocator(); 230 + 231 + const name_idx = mem.indexOf(u8, target, "/name/") orelse { 232 + json_util.sendStatus(r, "{\"detail\":\"invalid path\"}", .bad_request); 233 + return; 234 + }; 235 + const after_name = target[name_idx + 6 ..]; 236 + const sep_idx = mem.indexOf(u8, after_name, "/") orelse { 237 + json_util.sendStatus(r, "{\"detail\":\"deployment name required\"}", .bad_request); 238 + return; 239 + }; 240 + const 
flow_name = after_name[0..sep_idx]; 241 + const deployment_name = after_name[sep_idx + 1 ..]; 242 + if (deployment_name.len == 0) { 243 + json_util.sendStatus(r, "{\"detail\":\"deployment name required\"}", .bad_request); 244 + return; 245 + } 246 + 247 + const flow = db.flows.getByName(alloc, flow_name) catch null orelse { 248 + json_util.sendStatus(r, "{\"detail\":\"Flow not found\"}", .not_found); 249 + return; 250 + }; 251 + 252 + const deployment = db.deployments.getByFlowAndName(alloc, flow.id, deployment_name) catch null orelse { 253 + json_util.sendStatus(r, "{\"detail\":\"Deployment not found\"}", .not_found); 254 + return; 255 + }; 256 + 257 + const resp = writeDeployment(alloc, deployment) catch { 258 + json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 259 + return; 260 + }; 261 + json_util.send(r, resp); 262 + } 263 + 264 + fn update(r: zap.Request, id: []const u8) !void { 265 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 266 + defer arena.deinit(); 267 + const alloc = arena.allocator(); 268 + 269 + const body = r.body orelse { 270 + json_util.sendStatus(r, "{\"detail\":\"body required\"}", .bad_request); 271 + return; 272 + }; 273 + 274 + const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch { 275 + json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request); 276 + return; 277 + }; 278 + 279 + var ts_buf: [32]u8 = undefined; 280 + const now = time_util.timestamp(&ts_buf); 281 + 282 + const updated = db.deployments.updateById(id, now, buildUpdateParams(parsed.value.object)) catch { 283 + json_util.sendStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error); 284 + return; 285 + }; 286 + 287 + if (!updated) { 288 + json_util.sendStatus(r, "{\"detail\":\"Deployment not found\"}", .not_found); 289 + return; 290 + } 291 + 292 + r.setStatus(.no_content); 293 + r.sendBody("") catch {}; 294 + } 295 + 296 + fn delete(r: zap.Request, id: []const u8) !void { 297 + 
const deleted = db.deployments.deleteById(id) catch { 298 + json_util.sendStatus(r, "{\"detail\":\"delete failed\"}", .internal_server_error); 299 + return; 300 + }; 301 + 302 + if (!deleted) { 303 + json_util.sendStatus(r, "{\"detail\":\"Deployment not found\"}", .not_found); 304 + return; 305 + } 306 + 307 + r.setStatus(.no_content); 308 + r.sendBody("") catch {}; 309 + } 310 + 311 + fn filter(r: zap.Request) !void { 312 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 313 + defer arena.deinit(); 314 + const alloc = arena.allocator(); 315 + 316 + var limit: usize = 200; 317 + var offset: usize = 0; 318 + 319 + if (r.body) |body| { 320 + if (json.parseFromSlice(json.Value, alloc, body, .{})) |parsed| { 321 + const obj = parsed.value.object; 322 + if (obj.get("limit")) |v| { 323 + if (v == .integer) limit = @intCast(v.integer); 324 + } 325 + if (obj.get("offset")) |v| { 326 + if (v == .integer) offset = @intCast(v.integer); 327 + } 328 + } else |_| {} 329 + } 330 + 331 + const deployments_list = db.deployments.list(alloc, limit, offset) catch { 332 + json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error); 333 + return; 334 + }; 335 + 336 + var output: std.io.Writer.Allocating = .init(alloc); 337 + var jw: json.Stringify = .{ .writer = &output.writer }; 338 + 339 + jw.beginArray() catch { 340 + json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 341 + return; 342 + }; 343 + 344 + for (deployments_list) |d| { 345 + writeDeploymentObject(&jw, d, alloc) catch continue; 346 + } 347 + 348 + jw.endArray() catch {}; 349 + 350 + json_util.send(r, output.toOwnedSlice() catch "[]"); 351 + } 352 + 353 + fn count(r: zap.Request) !void { 354 + const c = db.deployments.count() catch 0; 355 + var buf: [32]u8 = undefined; 356 + const resp = std.fmt.bufPrint(&buf, "{d}", .{c}) catch "0"; 357 + json_util.send(r, resp); 358 + } 359 + 360 + // Action handlers 361 + 362 + fn createFlowRun(r: zap.Request, 
target: []const u8) !void { 363 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 364 + defer arena.deinit(); 365 + const alloc = arena.allocator(); 366 + 367 + const id = extractDeploymentId(target) orelse { 368 + json_util.sendStatus(r, "{\"detail\":\"deployment id required\"}", .bad_request); 369 + return; 370 + }; 371 + 372 + const deployment = db.deployments.getById(alloc, id) catch null orelse { 373 + json_util.sendStatus(r, "{\"detail\":\"Deployment not found\"}", .not_found); 374 + return; 375 + }; 376 + 377 + var state_type: []const u8 = "SCHEDULED"; 378 + var state_name: []const u8 = "Scheduled"; 379 + 380 + if (r.body) |body| { 381 + if (json.parseFromSlice(json.Value, alloc, body, .{})) |parsed| { 382 + const obj = parsed.value.object; 383 + if (obj.get("state")) |s| { 384 + if (s == .object) { 385 + if (s.object.get("type")) |t| { 386 + if (t == .string) state_type = t.string; 387 + } 388 + if (s.object.get("name")) |n| { 389 + if (n == .string) state_name = n.string; 390 + } 391 + } 392 + } 393 + } else |_| {} 394 + } 395 + 396 + var id_buf: [36]u8 = undefined; 397 + const run_id = uuid_util.generate(&id_buf); 398 + 399 + var ts_buf: [32]u8 = undefined; 400 + const now = time_util.timestamp(&ts_buf); 401 + 402 + var name_buf: [64]u8 = undefined; 403 + const run_name = std.fmt.bufPrint(&name_buf, "{s}-{s}", .{ 404 + deployment.name[0..@min(deployment.name.len, 20)], 405 + run_id[0..8], 406 + }) catch "run"; 407 + 408 + db.flow_runs.insert(run_id, deployment.flow_id, run_name, state_type, state_name, now, .{ 409 + .deployment_id = deployment.id, 410 + .deployment_version = deployment.version, 411 + .work_queue_name = deployment.work_queue_name, 412 + .work_queue_id = deployment.work_queue_id, 413 + }) catch { 414 + json_util.sendStatus(r, "{\"detail\":\"insert failed\"}", .internal_server_error); 415 + return; 416 + }; 417 + 418 + const run = db.flow_runs.get(alloc, run_id) catch null orelse { 419 + json_util.sendStatus(r, 
"{\"detail\":\"not found after insert\"}", .internal_server_error); 420 + return; 421 + }; 422 + 423 + var state_id_buf: [36]u8 = undefined; 424 + const state_id = uuid_util.generate(&state_id_buf); 425 + 426 + const resp = writeFlowRunResponse(alloc, run, state_id) catch { 427 + json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 428 + return; 429 + }; 430 + json_util.sendStatus(r, resp, .created); 431 + } 432 + 433 + fn pause(r: zap.Request, target: []const u8) !void { 434 + const id = extractDeploymentId(target) orelse { 435 + json_util.sendStatus(r, "{\"detail\":\"deployment id required\"}", .bad_request); 436 + return; 437 + }; 438 + 439 + var ts_buf: [32]u8 = undefined; 440 + const now = time_util.timestamp(&ts_buf); 441 + 442 + const updated = db.deployments.updateById(id, now, .{ .paused = true }) catch { 443 + json_util.sendStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error); 444 + return; 445 + }; 446 + 447 + if (!updated) { 448 + json_util.sendStatus(r, "{\"detail\":\"Deployment not found\"}", .not_found); 449 + return; 450 + } 451 + 452 + r.setStatus(.no_content); 453 + r.sendBody("") catch {}; 454 + } 455 + 456 + fn resume_(r: zap.Request, target: []const u8) !void { 457 + const id = extractDeploymentId(target) orelse { 458 + json_util.sendStatus(r, "{\"detail\":\"deployment id required\"}", .bad_request); 459 + return; 460 + }; 461 + 462 + var ts_buf: [32]u8 = undefined; 463 + const now = time_util.timestamp(&ts_buf); 464 + 465 + const updated = db.deployments.updateById(id, now, .{ .paused = false }) catch { 466 + json_util.sendStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error); 467 + return; 468 + }; 469 + 470 + if (!updated) { 471 + json_util.sendStatus(r, "{\"detail\":\"Deployment not found\"}", .not_found); 472 + return; 473 + } 474 + 475 + r.setStatus(.no_content); 476 + r.sendBody("") catch {}; 477 + } 478 + 479 + fn getScheduledFlowRuns(r: zap.Request) !void { 480 + var arena = 
std.heap.ArenaAllocator.init(std.heap.page_allocator); 481 + defer arena.deinit(); 482 + const alloc = arena.allocator(); 483 + 484 + const body = r.body orelse { 485 + json_util.send(r, "[]"); 486 + return; 487 + }; 488 + 489 + const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch { 490 + json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request); 491 + return; 492 + }; 493 + 494 + const obj = parsed.value.object; 495 + 496 + const ids_val = obj.get("deployment_ids") orelse { 497 + json_util.send(r, "[]"); 498 + return; 499 + }; 500 + 501 + if (ids_val != .array) { 502 + json_util.send(r, "[]"); 503 + return; 504 + } 505 + 506 + var deployment_ids = std.ArrayListUnmanaged([]const u8){}; 507 + for (ids_val.array.items) |item| { 508 + if (item == .string) { 509 + try deployment_ids.append(alloc, item.string); 510 + } 511 + } 512 + 513 + if (deployment_ids.items.len == 0) { 514 + json_util.send(r, "[]"); 515 + return; 516 + } 517 + 518 + var scheduled_before: ?[]const u8 = null; 519 + if (obj.get("scheduled_before")) |v| { 520 + if (v == .string) scheduled_before = v.string; 521 + } 522 + 523 + var limit: usize = 100; 524 + if (obj.get("limit")) |v| { 525 + if (v == .integer) limit = @intCast(v.integer); 526 + } 527 + 528 + const runs = db.flow_runs.getScheduledByDeployments(alloc, deployment_ids.items, scheduled_before, limit) catch { 529 + json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error); 530 + return; 531 + }; 532 + 533 + var output: std.io.Writer.Allocating = .init(alloc); 534 + var jw: json.Stringify = .{ .writer = &output.writer }; 535 + 536 + jw.beginArray() catch { 537 + json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 538 + return; 539 + }; 540 + 541 + for (runs) |run| { 542 + var state_id_buf: [36]u8 = undefined; 543 + const state_id = uuid_util.generate(&state_id_buf); 544 + writeFlowRunObject(&jw, run, state_id) catch continue; 545 + } 546 + 547 + 
jw.endArray() catch {}; 548 + 549 + json_util.send(r, output.toOwnedSlice() catch "[]"); 550 + } 551 + 552 + // JSON helpers 553 + 554 + fn getString(obj: json.ObjectMap, key: []const u8) ?[]const u8 { 555 + const v = obj.get(key) orelse return null; 556 + return if (v == .string) v.string else null; 557 + } 558 + 559 + fn getBool(obj: json.ObjectMap, key: []const u8) ?bool { 560 + const v = obj.get(key) orelse return null; 561 + return if (v == .bool) v.bool else null; 562 + } 563 + 564 + fn getInt(obj: json.ObjectMap, key: []const u8) ?i64 { 565 + const v = obj.get(key) orelse return null; 566 + return if (v == .integer) v.integer else null; 567 + } 568 + 569 + fn getJsonString(alloc: std.mem.Allocator, obj: json.ObjectMap, key: []const u8) ?[]const u8 { 570 + const v = obj.get(key) orelse return null; 571 + if (v == .null) return null; 572 + return std.fmt.allocPrint(alloc, "{f}", .{json.fmt(v, .{})}) catch null; 573 + } 574 + 575 + fn buildInsertParams(obj: json.ObjectMap) db.deployments.InsertParams { 576 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 577 + const alloc = arena.allocator(); 578 + 579 + return .{ 580 + .version = getString(obj, "version"), 581 + .description = getString(obj, "description"), 582 + .paused = getBool(obj, "paused") orelse false, 583 + .parameters = getJsonString(alloc, obj, "parameters") orelse "{}", 584 + .parameter_openapi_schema = getJsonString(alloc, obj, "parameter_openapi_schema"), 585 + .enforce_parameter_schema = getBool(obj, "enforce_parameter_schema") orelse true, 586 + .tags = getJsonString(alloc, obj, "tags") orelse "[]", 587 + .labels = getJsonString(alloc, obj, "labels") orelse "{}", 588 + .path = getString(obj, "path"), 589 + .entrypoint = getString(obj, "entrypoint"), 590 + .job_variables = getJsonString(alloc, obj, "job_variables") orelse "{}", 591 + .pull_steps = getJsonString(alloc, obj, "pull_steps"), 592 + .work_pool_name = getString(obj, "work_pool_name"), 593 + .work_queue_name = 
getString(obj, "work_queue_name"), 594 + .concurrency_limit = getInt(obj, "concurrency_limit"), 595 + }; 596 + } 597 + 598 + fn buildUpdateParams(obj: json.ObjectMap) db.deployments.UpdateParams { 599 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 600 + const alloc = arena.allocator(); 601 + 602 + return .{ 603 + .version = getString(obj, "version"), 604 + .description = getString(obj, "description"), 605 + .paused = getBool(obj, "paused"), 606 + .parameters = getJsonString(alloc, obj, "parameters"), 607 + .parameter_openapi_schema = getJsonString(alloc, obj, "parameter_openapi_schema"), 608 + .enforce_parameter_schema = getBool(obj, "enforce_parameter_schema"), 609 + .tags = getJsonString(alloc, obj, "tags"), 610 + .labels = getJsonString(alloc, obj, "labels"), 611 + .path = getString(obj, "path"), 612 + .entrypoint = getString(obj, "entrypoint"), 613 + .job_variables = getJsonString(alloc, obj, "job_variables"), 614 + .pull_steps = getJsonString(alloc, obj, "pull_steps"), 615 + .work_pool_name = getString(obj, "work_pool_name"), 616 + .work_queue_name = getString(obj, "work_queue_name"), 617 + .concurrency_limit = getInt(obj, "concurrency_limit"), 618 + }; 619 + } 620 + 621 + // Response serializers 622 + 623 + fn writeDeployment(alloc: std.mem.Allocator, d: db.deployments.DeploymentRow) ![]const u8 { 624 + var output: std.io.Writer.Allocating = .init(alloc); 625 + var jw: json.Stringify = .{ .writer = &output.writer }; 626 + try writeDeploymentObject(&jw, d, alloc); 627 + return output.toOwnedSlice(); 628 + } 629 + 630 + fn writeDeploymentObject(jw: *json.Stringify, d: db.deployments.DeploymentRow, alloc: std.mem.Allocator) !void { 631 + try jw.beginObject(); 632 + 633 + try jw.objectField("id"); 634 + try jw.write(d.id); 635 + try jw.objectField("created"); 636 + try jw.write(d.created); 637 + try jw.objectField("updated"); 638 + try jw.write(d.updated); 639 + try jw.objectField("name"); 640 + try jw.write(d.name); 641 + try 
jw.objectField("flow_id"); 642 + try jw.write(d.flow_id); 643 + try jw.objectField("version"); 644 + try jw.write(d.version); 645 + try jw.objectField("description"); 646 + try jw.write(d.description); 647 + try jw.objectField("paused"); 648 + try jw.write(d.paused); 649 + try jw.objectField("status"); 650 + try jw.write(d.status.toString()); 651 + try jw.objectField("last_polled"); 652 + try jw.write(d.last_polled); 653 + try jw.objectField("parameters"); 654 + try jw.beginWriteRaw(); 655 + try jw.writer.writeAll(d.parameters); 656 + jw.endWriteRaw(); 657 + try jw.objectField("parameter_openapi_schema"); 658 + if (d.parameter_openapi_schema) |s| { 659 + try jw.beginWriteRaw(); 660 + try jw.writer.writeAll(s); 661 + jw.endWriteRaw(); 662 + } else { 663 + try jw.write(null); 664 + } 665 + try jw.objectField("enforce_parameter_schema"); 666 + try jw.write(d.enforce_parameter_schema); 667 + try jw.objectField("tags"); 668 + try jw.beginWriteRaw(); 669 + try jw.writer.writeAll(d.tags); 670 + jw.endWriteRaw(); 671 + try jw.objectField("labels"); 672 + try jw.beginWriteRaw(); 673 + try jw.writer.writeAll(d.labels); 674 + jw.endWriteRaw(); 675 + try jw.objectField("path"); 676 + try jw.write(d.path); 677 + try jw.objectField("entrypoint"); 678 + try jw.write(d.entrypoint); 679 + try jw.objectField("job_variables"); 680 + try jw.beginWriteRaw(); 681 + try jw.writer.writeAll(d.job_variables); 682 + jw.endWriteRaw(); 683 + try jw.objectField("pull_steps"); 684 + if (d.pull_steps) |ps| { 685 + try jw.beginWriteRaw(); 686 + try jw.writer.writeAll(ps); 687 + jw.endWriteRaw(); 688 + } else { 689 + try jw.write(null); 690 + } 691 + try jw.objectField("work_pool_name"); 692 + try jw.write(d.work_pool_name); 693 + try jw.objectField("work_queue_name"); 694 + try jw.write(d.work_queue_name); 695 + try jw.objectField("work_queue_id"); 696 + try jw.write(d.work_queue_id); 697 + try jw.objectField("storage_document_id"); 698 + try jw.write(d.storage_document_id); 699 + try 
jw.objectField("infrastructure_document_id"); 700 + try jw.write(d.infrastructure_document_id); 701 + try jw.objectField("concurrency_limit"); 702 + try jw.write(d.concurrency_limit); 703 + 704 + try jw.objectField("schedules"); 705 + const sched_list = db.deployment_schedules.listByDeployment(alloc, d.id) catch &[_]db.deployment_schedules.DeploymentScheduleRow{}; 706 + try jw.beginArray(); 707 + for (sched_list) |s| { 708 + try schedules.writeScheduleObject(jw, s); 709 + } 710 + try jw.endArray(); 711 + 712 + try jw.endObject(); 713 + } 714 + 715 + fn writeFlowRunObject(jw: *json.Stringify, run: db.flow_runs.FlowRunRow, state_id: []const u8) !void { 716 + try jw.beginObject(); 717 + try jw.objectField("id"); 718 + try jw.write(run.id); 719 + try jw.objectField("created"); 720 + try jw.write(run.created); 721 + try jw.objectField("updated"); 722 + try jw.write(run.updated); 723 + try jw.objectField("name"); 724 + try jw.write(run.name); 725 + try jw.objectField("flow_id"); 726 + try jw.write(run.flow_id); 727 + try jw.objectField("deployment_id"); 728 + try jw.write(run.deployment_id); 729 + try jw.objectField("deployment_version"); 730 + try jw.write(run.deployment_version); 731 + try jw.objectField("work_queue_name"); 732 + try jw.write(run.work_queue_name); 733 + try jw.objectField("work_queue_id"); 734 + try jw.write(run.work_queue_id); 735 + try jw.objectField("state_type"); 736 + try jw.write(run.state_type); 737 + try jw.objectField("state_name"); 738 + try jw.write(run.state_name); 739 + try jw.objectField("expected_start_time"); 740 + try jw.write(run.expected_start_time); 741 + try jw.objectField("start_time"); 742 + try jw.write(run.start_time); 743 + try jw.objectField("end_time"); 744 + try jw.write(run.end_time); 745 + try jw.objectField("state"); 746 + try jw.beginObject(); 747 + try jw.objectField("type"); 748 + try jw.write(run.state_type); 749 + try jw.objectField("name"); 750 + try jw.write(run.state_name); 751 + try jw.objectField("timestamp"); 
752 + try jw.write(run.state_timestamp); 753 + try jw.objectField("id"); 754 + try jw.write(state_id); 755 + try jw.endObject(); 756 + try jw.objectField("parameters"); 757 + try jw.beginWriteRaw(); 758 + try jw.writer.writeAll(run.parameters); 759 + jw.endWriteRaw(); 760 + try jw.objectField("tags"); 761 + try jw.beginWriteRaw(); 762 + try jw.writer.writeAll(run.tags); 763 + jw.endWriteRaw(); 764 + try jw.objectField("auto_scheduled"); 765 + try jw.write(run.auto_scheduled); 766 + try jw.endObject(); 767 + } 768 + 769 + fn writeFlowRunResponse(alloc: std.mem.Allocator, run: db.flow_runs.FlowRunRow, state_id: []const u8) ![]const u8 { 770 + var output: std.io.Writer.Allocating = .init(alloc); 771 + var jw: json.Stringify = .{ .writer = &output.writer }; 772 + 773 + try jw.beginObject(); 774 + try jw.objectField("id"); 775 + try jw.write(run.id); 776 + try jw.objectField("created"); 777 + try jw.write(run.created); 778 + try jw.objectField("updated"); 779 + try jw.write(run.updated); 780 + try jw.objectField("name"); 781 + try jw.write(run.name); 782 + try jw.objectField("flow_id"); 783 + try jw.write(run.flow_id); 784 + try jw.objectField("deployment_id"); 785 + try jw.write(run.deployment_id); 786 + try jw.objectField("deployment_version"); 787 + try jw.write(run.deployment_version); 788 + try jw.objectField("work_queue_name"); 789 + try jw.write(run.work_queue_name); 790 + try jw.objectField("work_queue_id"); 791 + try jw.write(run.work_queue_id); 792 + try jw.objectField("state_type"); 793 + try jw.write(run.state_type); 794 + try jw.objectField("state_name"); 795 + try jw.write(run.state_name); 796 + try jw.objectField("state"); 797 + try jw.beginObject(); 798 + try jw.objectField("type"); 799 + try jw.write(run.state_type); 800 + try jw.objectField("name"); 801 + try jw.write(run.state_name); 802 + try jw.objectField("timestamp"); 803 + try jw.write(run.state_timestamp); 804 + try jw.objectField("id"); 805 + try jw.write(state_id); 806 + try jw.endObject(); 
807 + try jw.objectField("parameters"); 808 + try jw.beginWriteRaw(); 809 + try jw.writer.writeAll(run.parameters); 810 + jw.endWriteRaw(); 811 + try jw.objectField("tags"); 812 + try jw.beginWriteRaw(); 813 + try jw.writer.writeAll(run.tags); 814 + jw.endWriteRaw(); 815 + try jw.objectField("auto_scheduled"); 816 + try jw.write(run.auto_scheduled); 817 + try jw.endObject(); 818 + 819 + return output.toOwnedSlice(); 820 + }
+2
src/api/events.zig
··· 240 240 if (db.queryRecentEvents(alloc, 100)) |events| { 241 241 var backfill_count: usize = 0; 242 242 for (events) |event_json| { 243 + // Apply filter to backfill events 244 + if (!filter.matchesJson(event_json)) continue; 243 245 const wrapped = std.fmt.allocPrint(alloc, "{{\"type\":\"event\",\"event\":{s}}}", .{event_json}) catch continue; 244 246 SubscriberHandler.write(handle, wrapped, true) catch continue; 245 247 backfill_count += 1;
+21 -1
src/api/flow_runs.zig
··· 102 102 var ts_buf: [32]u8 = undefined; 103 103 const now = time_util.timestamp(&ts_buf); 104 104 105 - db.insertFlowRun(new_id, flow_id, name, state_type, state_name, now) catch { 105 + db.insertFlowRun(new_id, flow_id, name, state_type, state_name, now, .{}) catch { 106 106 json_util.sendStatus(r, "{\"detail\":\"insert failed\"}", .internal_server_error); 107 107 return; 108 108 }; ··· 126 126 .start_time = null, 127 127 .end_time = null, 128 128 .total_run_time = 0.0, 129 + .deployment_id = null, 130 + .deployment_version = null, 131 + .work_queue_name = null, 132 + .work_queue_id = null, 133 + .auto_scheduled = false, 129 134 }; 130 135 131 136 const resp = writeFlowRun(alloc, run, state_id) catch { ··· 310 315 311 316 try jw.objectField("total_run_time"); 312 317 try jw.write(run.total_run_time); 318 + 319 + try jw.objectField("deployment_id"); 320 + try jw.write(run.deployment_id); 321 + 322 + try jw.objectField("deployment_version"); 323 + try jw.write(run.deployment_version); 324 + 325 + try jw.objectField("work_queue_name"); 326 + try jw.write(run.work_queue_name); 327 + 328 + try jw.objectField("work_queue_id"); 329 + try jw.write(run.work_queue_id); 330 + 331 + try jw.objectField("auto_scheduled"); 332 + try jw.write(run.auto_scheduled); 313 333 314 334 try jw.endObject(); 315 335 }
+3
src/api/routes.zig
··· 12 12 pub const block_documents = @import("block_documents.zig"); 13 13 pub const variables = @import("variables.zig"); 14 14 pub const work_pools = @import("work_pools.zig"); 15 + pub const deployments = @import("deployments.zig"); 15 16 16 17 pub fn handle(r: zap.Request) !void { 17 18 const target = r.path orelse "/"; ··· 48 49 try variables.handle(r); 49 50 } else if (std.mem.startsWith(u8, target, "/api/work_pools") or std.mem.startsWith(u8, target, "/work_pools")) { 50 51 try work_pools.handle(r); 52 + } else if (std.mem.startsWith(u8, target, "/api/deployments") or std.mem.startsWith(u8, target, "/deployments")) { 53 + try deployments.handle(r); 51 54 } else { 52 55 try sendNotFound(r); 53 56 }
+167
src/db/deployment_schedules.zig
const std = @import("std");
const Allocator = std.mem.Allocator;

const backend = @import("backend.zig");
const log = @import("../logging.zig");

/// One row of the deployment_schedule table. `schedule` and `parameters`
/// are stored as raw JSON text and emitted verbatim by the API layer.
pub const DeploymentScheduleRow = struct {
    id: []const u8,
    created: []const u8,
    updated: []const u8,
    deployment_id: []const u8,
    schedule: []const u8,
    active: bool,
    max_scheduled_runs: ?i64,
    parameters: []const u8,
    slug: ?[]const u8,
};

// Result-set column indexes. Must stay in lockstep with `select_cols`.
const Col = struct {
    const id: usize = 0;
    const created: usize = 1;
    const updated: usize = 2;
    const deployment_id: usize = 3;
    const schedule: usize = 4;
    const active: usize = 5;
    const max_scheduled_runs: usize = 6;
    const parameters: usize = 7;
    const slug: usize = 8;
};

const select_cols = "id, created, updated, deployment_id, schedule, active, max_scheduled_runs, parameters, slug";

/// Copy the cursor's current row into an owned DeploymentScheduleRow.
fn rowFromResult(alloc: Allocator, res: anytype) !DeploymentScheduleRow {
    return .{
        .id = try alloc.dupe(u8, res.text(Col.id)),
        .created = try alloc.dupe(u8, res.text(Col.created)),
        .updated = try alloc.dupe(u8, res.text(Col.updated)),
        .deployment_id = try alloc.dupe(u8, res.text(Col.deployment_id)),
        .schedule = try alloc.dupe(u8, res.text(Col.schedule)),
        .active = res.int(Col.active) != 0,
        // Probe with textOrNull first so SQL NULL maps to Zig null.
        .max_scheduled_runs = if (res.textOrNull(Col.max_scheduled_runs) == null) null else res.bigint(Col.max_scheduled_runs),
        .parameters = try alloc.dupe(u8, res.text(Col.parameters)),
        .slug = if (res.textOrNull(Col.slug)) |s| try alloc.dupe(u8, s) else null,
    };
}

/// Fetch a single schedule by id; null when missing or on query error.
pub fn getById(alloc: Allocator, id: []const u8) !?DeploymentScheduleRow {
    var maybe_row = backend.db.row(
        "SELECT " ++ select_cols ++ " FROM deployment_schedule WHERE id = ?",
        .{id},
    ) catch return null;

    if (maybe_row) |*found| {
        defer found.deinit();
        return try rowFromResult(alloc, found);
    }
    return null;
}

/// All schedules belonging to one deployment, oldest first.
pub fn listByDeployment(alloc: Allocator, deployment_id: []const u8) ![]DeploymentScheduleRow {
    var out = std.ArrayListUnmanaged(DeploymentScheduleRow){};
    errdefer out.deinit(alloc);

    var cursor = backend.db.query(
        "SELECT " ++ select_cols ++ " FROM deployment_schedule WHERE deployment_id = ? ORDER BY created ASC",
        .{deployment_id},
    ) catch |err| {
        log.err("database", "list deployment_schedules error: {}", .{err});
        return err;
    };
    defer cursor.deinit();

    while (cursor.next()) |row| {
        try out.append(alloc, try rowFromResult(alloc, &row));
    }

    return out.toOwnedSlice(alloc);
}

/// Optional columns for a new schedule row.
pub const InsertParams = struct {
    active: bool = true,
    max_scheduled_runs: ?i64 = null,
    parameters: []const u8 = "{}",
    slug: ?[]const u8 = null,
};

/// Insert a new schedule row; `created` doubles as the initial `updated`.
pub fn insert(
    id: []const u8,
    deployment_id: []const u8,
    schedule: []const u8,
    created: []const u8,
    params: InsertParams,
) !void {
    backend.db.exec(
        \\INSERT INTO deployment_schedule (id, created, updated, deployment_id, schedule, active, max_scheduled_runs, parameters, slug)
        \\VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    , .{
        id,
        created,
        created,
        deployment_id,
        schedule,
        @as(i32, if (params.active) 1 else 0),
        params.max_scheduled_runs,
        params.parameters,
        params.slug,
    }) catch |err| {
        log.err("database", "insert deployment_schedule error: {}", .{err});
        return err;
    };
}

/// Partial-update columns; null fields keep the stored value (COALESCE).
pub const UpdateParams = struct {
    schedule: ?[]const u8 = null,
    active: ?bool = null,
    max_scheduled_runs: ?i64 = null,
    parameters: ?[]const u8 = null,
    slug: ?[]const u8 = null,
};

/// Patch one schedule row. Returns true when a row was actually updated.
pub fn updateById(id: []const u8, updated: []const u8, params: UpdateParams) !bool {
    const n = backend.db.execWithRowCount(
        \\UPDATE deployment_schedule SET
        \\  schedule = COALESCE(?, schedule),
        \\  active = COALESCE(?, active),
        \\  max_scheduled_runs = COALESCE(?, max_scheduled_runs),
        \\  parameters = COALESCE(?, parameters),
        \\  slug = COALESCE(?, slug),
        \\  updated = ?
        \\WHERE id = ?
    , .{
        params.schedule,
        if (params.active) |a| @as(?i32, if (a) 1 else 0) else null,
        params.max_scheduled_runs,
        params.parameters,
        params.slug,
        updated,
        id,
    }) catch |err| {
        log.err("database", "update deployment_schedule error: {}", .{err});
        return err;
    };
    return n > 0;
}

/// Delete one schedule row; true when something was deleted.
pub fn deleteById(id: []const u8) !bool {
    const n = backend.db.execWithRowCount(
        "DELETE FROM deployment_schedule WHERE id = ?",
        .{id},
    ) catch |err| {
        log.err("database", "delete deployment_schedule error: {}", .{err});
        return err;
    };
    return n > 0;
}

/// Remove every schedule attached to a deployment (cascade helper).
/// Returns the number of rows removed.
pub fn deleteByDeployment(deployment_id: []const u8) !usize {
    const n = backend.db.execWithRowCount(
        "DELETE FROM deployment_schedule WHERE deployment_id = ?",
        .{deployment_id},
    ) catch |err| {
        log.err("database", "delete deployment_schedules error: {}", .{err});
        return err;
    };
    return @intCast(n);
}
+358
src/db/deployments.zig
const std = @import("std");
const Allocator = std.mem.Allocator;

const backend = @import("backend.zig");
const log = @import("../logging.zig");

/// Deployment status enum
pub const Status = enum {
    not_ready,
    ready,

    /// Parse the DB/API string form; anything but "READY" maps to not_ready.
    pub fn fromString(s: []const u8) Status {
        if (std.mem.eql(u8, s, "READY")) return .ready;
        return .not_ready;
    }

    /// API/DB string form of the status.
    pub fn toString(self: Status) []const u8 {
        return switch (self) {
            .not_ready => "NOT_READY",
            .ready => "READY",
        };
    }
};

/// One row of the deployment table. JSON-typed columns (parameters, tags,
/// labels, job_variables, ...) are held as raw JSON text.
pub const DeploymentRow = struct {
    id: []const u8,
    created: []const u8,
    updated: []const u8,
    name: []const u8,
    flow_id: []const u8,
    version: ?[]const u8,
    description: ?[]const u8,
    paused: bool,
    status: Status,
    last_polled: ?[]const u8,
    parameters: []const u8,
    parameter_openapi_schema: ?[]const u8,
    enforce_parameter_schema: bool,
    tags: []const u8,
    labels: []const u8,
    path: ?[]const u8,
    entrypoint: ?[]const u8,
    job_variables: []const u8,
    pull_steps: ?[]const u8,
    work_pool_name: ?[]const u8,
    work_queue_name: ?[]const u8,
    work_queue_id: ?[]const u8,
    storage_document_id: ?[]const u8,
    infrastructure_document_id: ?[]const u8,
    concurrency_limit: ?i64,
};

// Result-set column indexes. Must stay in lockstep with `select_cols`.
const Col = struct {
    const id: usize = 0;
    const created: usize = 1;
    const updated: usize = 2;
    const name: usize = 3;
    const flow_id: usize = 4;
    const version: usize = 5;
    const description: usize = 6;
    const paused: usize = 7;
    const status: usize = 8;
    const last_polled: usize = 9;
    const parameters: usize = 10;
    const parameter_openapi_schema: usize = 11;
    const enforce_parameter_schema: usize = 12;
    const tags: usize = 13;
    const labels: usize = 14;
    const path: usize = 15;
    const entrypoint: usize = 16;
    const job_variables: usize = 17;
    const pull_steps: usize = 18;
    const work_pool_name: usize = 19;
    const work_queue_name: usize = 20;
    const work_queue_id: usize = 21;
    const storage_document_id: usize = 22;
    const infrastructure_document_id: usize = 23;
    const concurrency_limit: usize = 24;
};

const select_cols =
    \\id, created, updated, name, flow_id, version, description, paused, status, last_polled,
    \\parameters, parameter_openapi_schema, enforce_parameter_schema, tags, labels,
    \\path, entrypoint, job_variables, pull_steps, work_pool_name, work_queue_name,
    \\work_queue_id, storage_document_id, infrastructure_document_id, concurrency_limit
;

/// Copy the cursor's current row into an owned DeploymentRow.
fn rowFromResult(alloc: Allocator, res: anytype) !DeploymentRow {
    return .{
        .id = try alloc.dupe(u8, res.text(Col.id)),
        .created = try alloc.dupe(u8, res.text(Col.created)),
        .updated = try alloc.dupe(u8, res.text(Col.updated)),
        .name = try alloc.dupe(u8, res.text(Col.name)),
        .flow_id = try alloc.dupe(u8, res.text(Col.flow_id)),
        .version = if (res.textOrNull(Col.version)) |v| try alloc.dupe(u8, v) else null,
        .description = if (res.textOrNull(Col.description)) |d| try alloc.dupe(u8, d) else null,
        .paused = res.int(Col.paused) != 0,
        .status = Status.fromString(res.text(Col.status)),
        .last_polled = if (res.textOrNull(Col.last_polled)) |lp| try alloc.dupe(u8, lp) else null,
        .parameters = try alloc.dupe(u8, res.text(Col.parameters)),
        .parameter_openapi_schema = if (res.textOrNull(Col.parameter_openapi_schema)) |s| try alloc.dupe(u8, s) else null,
        .enforce_parameter_schema = res.int(Col.enforce_parameter_schema) != 0,
        .tags = try alloc.dupe(u8, res.text(Col.tags)),
        .labels = try alloc.dupe(u8, res.text(Col.labels)),
        .path = if (res.textOrNull(Col.path)) |p| try alloc.dupe(u8, p) else null,
        .entrypoint = if (res.textOrNull(Col.entrypoint)) |e| try alloc.dupe(u8, e) else null,
        .job_variables = try alloc.dupe(u8, res.text(Col.job_variables)),
        .pull_steps = if (res.textOrNull(Col.pull_steps)) |ps| try alloc.dupe(u8, ps) else null,
        .work_pool_name = if (res.textOrNull(Col.work_pool_name)) |wpn| try alloc.dupe(u8, wpn) else null,
        .work_queue_name = if (res.textOrNull(Col.work_queue_name)) |wqn| try alloc.dupe(u8, wqn) else null,
        .work_queue_id = if (res.textOrNull(Col.work_queue_id)) |wqi| try alloc.dupe(u8, wqi) else null,
        .storage_document_id = if (res.textOrNull(Col.storage_document_id)) |sd| try alloc.dupe(u8, sd) else null,
        .infrastructure_document_id = if (res.textOrNull(Col.infrastructure_document_id)) |idoc| try alloc.dupe(u8, idoc) else null,
        // Probe with textOrNull first so SQL NULL maps to Zig null.
        .concurrency_limit = if (res.textOrNull(Col.concurrency_limit) == null) null else res.bigint(Col.concurrency_limit),
    };
}

/// Fetch one deployment by id; null when missing or on query error.
pub fn getById(alloc: Allocator, id: []const u8) !?DeploymentRow {
    var maybe_row = backend.db.row(
        "SELECT " ++ select_cols ++ " FROM deployment WHERE id = ?",
        .{id},
    ) catch return null;

    if (maybe_row) |*found| {
        defer found.deinit();
        return try rowFromResult(alloc, found);
    }
    return null;
}

/// Fetch the deployment with a given name under a given flow.
pub fn getByFlowAndName(alloc: Allocator, flow_id: []const u8, name: []const u8) !?DeploymentRow {
    var maybe_row = backend.db.row(
        "SELECT " ++ select_cols ++ " FROM deployment WHERE flow_id = ? AND name = ?",
        .{ flow_id, name },
    ) catch return null;

    if (maybe_row) |*found| {
        defer found.deinit();
        return try rowFromResult(alloc, found);
    }
    return null;
}

/// Optional columns for a new deployment row.
pub const InsertParams = struct {
    version: ?[]const u8 = null,
    description: ?[]const u8 = null,
    paused: bool = false,
    status: Status = .not_ready,
    parameters: []const u8 = "{}",
    parameter_openapi_schema: ?[]const u8 = null,
    enforce_parameter_schema: bool = true,
    tags: []const u8 = "[]",
    labels: []const u8 = "{}",
    path: ?[]const u8 = null,
    entrypoint: ?[]const u8 = null,
    job_variables: []const u8 = "{}",
    pull_steps: ?[]const u8 = null,
    work_pool_name: ?[]const u8 = null,
    work_queue_name: ?[]const u8 = null,
    work_queue_id: ?[]const u8 = null,
    storage_document_id: ?[]const u8 = null,
    infrastructure_document_id: ?[]const u8 = null,
    concurrency_limit: ?i64 = null,
};

/// Insert a new deployment row; `created` doubles as the initial `updated`.
pub fn insert(
    id: []const u8,
    name: []const u8,
    flow_id: []const u8,
    created: []const u8,
    params: InsertParams,
) !void {
    backend.db.exec(
        \\INSERT INTO deployment (id, created, updated, name, flow_id, version, description,
        \\  paused, status, parameters, parameter_openapi_schema, enforce_parameter_schema,
        \\  tags, labels, path, entrypoint, job_variables, pull_steps,
        \\  work_pool_name, work_queue_name, work_queue_id,
        \\  storage_document_id, infrastructure_document_id, concurrency_limit)
        \\VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    , .{
        id,
        created,
        created,
        name,
        flow_id,
        params.version,
        params.description,
        @as(i32, if (params.paused) 1 else 0),
        params.status.toString(),
        params.parameters,
        params.parameter_openapi_schema,
        @as(i32, if (params.enforce_parameter_schema) 1 else 0),
        params.tags,
        params.labels,
        params.path,
        params.entrypoint,
        params.job_variables,
        params.pull_steps,
        params.work_pool_name,
        params.work_queue_name,
        params.work_queue_id,
        params.storage_document_id,
        params.infrastructure_document_id,
        params.concurrency_limit,
    }) catch |err| {
        log.err("database", "insert deployment error: {}", .{err});
        return err;
    };
}

/// Partial-update columns; null fields keep the stored value (COALESCE).
pub const UpdateParams = struct {
    version: ?[]const u8 = null,
    description: ?[]const u8 = null,
    paused: ?bool = null,
    parameters: ?[]const u8 = null,
    parameter_openapi_schema: ?[]const u8 = null,
    enforce_parameter_schema: ?bool = null,
    tags: ?[]const u8 = null,
    labels: ?[]const u8 = null,
    path: ?[]const u8 = null,
    entrypoint: ?[]const u8 = null,
    job_variables: ?[]const u8 = null,
    pull_steps: ?[]const u8 = null,
    work_pool_name: ?[]const u8 = null,
    work_queue_name: ?[]const u8 = null,
    work_queue_id: ?[]const u8 = null,
    concurrency_limit: ?i64 = null,
};

/// Patch one deployment row. Returns true when a row was actually updated.
pub fn updateById(id: []const u8, updated: []const u8, params: UpdateParams) !bool {
    const n = backend.db.execWithRowCount(
        \\UPDATE deployment SET
        \\  version = COALESCE(?, version),
        \\  description = COALESCE(?, description),
        \\  paused = COALESCE(?, paused),
        \\  parameters = COALESCE(?, parameters),
        \\  parameter_openapi_schema = COALESCE(?, parameter_openapi_schema),
        \\  enforce_parameter_schema = COALESCE(?, enforce_parameter_schema),
        \\  tags = COALESCE(?, tags),
        \\  labels = COALESCE(?, labels),
        \\  path = COALESCE(?, path),
        \\  entrypoint = COALESCE(?, entrypoint),
        \\  job_variables = COALESCE(?, job_variables),
        \\  pull_steps = COALESCE(?, pull_steps),
        \\  work_pool_name = COALESCE(?, work_pool_name),
        \\  work_queue_name = COALESCE(?, work_queue_name),
        \\  work_queue_id = COALESCE(?, work_queue_id),
        \\  concurrency_limit = COALESCE(?, concurrency_limit),
        \\  updated = ?
        \\WHERE id = ?
    , .{
        params.version,
        params.description,
        if (params.paused) |p| @as(?i32, if (p) 1 else 0) else null,
        params.parameters,
        params.parameter_openapi_schema,
        if (params.enforce_parameter_schema) |e| @as(?i32, if (e) 1 else 0) else null,
        params.tags,
        params.labels,
        params.path,
        params.entrypoint,
        params.job_variables,
        params.pull_steps,
        params.work_pool_name,
        params.work_queue_name,
        params.work_queue_id,
        params.concurrency_limit,
        updated,
        id,
    }) catch |err| {
        log.err("database", "update deployment error: {}", .{err});
        return err;
    };
    return n > 0;
}

/// Set the READY/NOT_READY status; true when a row was updated.
pub fn updateStatus(id: []const u8, status: Status, updated: []const u8) !bool {
    const n = backend.db.execWithRowCount(
        "UPDATE deployment SET status = ?, updated = ? WHERE id = ?",
        .{ status.toString(), updated, id },
    ) catch |err| {
        log.err("database", "update deployment status error: {}", .{err});
        return err;
    };
    return n > 0;
}

/// Record the most recent runner poll time for this deployment.
pub fn updateLastPolled(id: []const u8, last_polled: []const u8) !bool {
    const affected = backend.db.execWithRowCount(
        "UPDATE deployment SET last_polled = ?, updated = ? 
WHERE id = ?", 292 + .{ last_polled, last_polled, id }, 293 + ) catch |err| { 294 + log.err("database", "update deployment last_polled error: {}", .{err}); 295 + return err; 296 + }; 297 + return affected > 0; 298 + } 299 + 300 + pub fn deleteById(id: []const u8) !bool { 301 + const affected = backend.db.execWithRowCount( 302 + "DELETE FROM deployment WHERE id = ?", 303 + .{id}, 304 + ) catch |err| { 305 + log.err("database", "delete deployment error: {}", .{err}); 306 + return err; 307 + }; 308 + return affected > 0; 309 + } 310 + 311 + pub fn list(alloc: Allocator, limit: usize, offset: usize) ![]DeploymentRow { 312 + var results = std.ArrayListUnmanaged(DeploymentRow){}; 313 + errdefer results.deinit(alloc); 314 + 315 + var rows = backend.db.query( 316 + "SELECT " ++ select_cols ++ " FROM deployment ORDER BY created DESC LIMIT ? OFFSET ?", 317 + .{ @as(i64, @intCast(limit)), @as(i64, @intCast(offset)) }, 318 + ) catch |err| { 319 + log.err("database", "list deployments error: {}", .{err}); 320 + return err; 321 + }; 322 + defer rows.deinit(); 323 + 324 + while (rows.next()) |r| { 325 + try results.append(alloc, try rowFromResult(alloc, &r)); 326 + } 327 + 328 + return results.toOwnedSlice(alloc); 329 + } 330 + 331 + pub fn listByFlowId(alloc: Allocator, flow_id: []const u8, limit: usize, offset: usize) ![]DeploymentRow { 332 + var results = std.ArrayListUnmanaged(DeploymentRow){}; 333 + errdefer results.deinit(alloc); 334 + 335 + var rows = backend.db.query( 336 + "SELECT " ++ select_cols ++ " FROM deployment WHERE flow_id = ? ORDER BY created DESC LIMIT ? 
OFFSET ?", 337 + .{ flow_id, @as(i64, @intCast(limit)), @as(i64, @intCast(offset)) }, 338 + ) catch |err| { 339 + log.err("database", "list deployments by flow error: {}", .{err}); 340 + return err; 341 + }; 342 + defer rows.deinit(); 343 + 344 + while (rows.next()) |r| { 345 + try results.append(alloc, try rowFromResult(alloc, &r)); 346 + } 347 + 348 + return results.toOwnedSlice(alloc); 349 + } 350 + 351 + pub fn count() !usize { 352 + var r = backend.db.row("SELECT COUNT(*) FROM deployment", .{}) catch return 0; 353 + if (r) |*row| { 354 + defer row.deinit(); 355 + return @intCast(row.int(0)); 356 + } 357 + return 0; 358 + }
+201 -5
src/db/flow_runs.zig
··· 20 20 start_time: ?[]const u8, 21 21 end_time: ?[]const u8, 22 22 total_run_time: f64, 23 + // deployment fields 24 + deployment_id: ?[]const u8, 25 + deployment_version: ?[]const u8, 26 + work_queue_name: ?[]const u8, 27 + work_queue_id: ?[]const u8, 28 + auto_scheduled: bool, 29 + }; 30 + 31 + pub const InsertParams = struct { 32 + deployment_id: ?[]const u8 = null, 33 + deployment_version: ?[]const u8 = null, 34 + work_queue_name: ?[]const u8 = null, 35 + work_queue_id: ?[]const u8 = null, 36 + auto_scheduled: bool = false, 23 37 }; 24 38 25 39 pub fn insert( ··· 29 43 state_type: []const u8, 30 44 state_name: []const u8, 31 45 timestamp: []const u8, 46 + params: InsertParams, 32 47 ) !void { 33 48 backend.db.exec( 34 - \\INSERT INTO flow_run (id, flow_id, name, state_type, state_name, state_timestamp) 35 - \\VALUES (?, ?, ?, ?, ?, ?) 36 - , .{ id, flow_id, name, state_type, state_name, timestamp }) catch |err| { 49 + \\INSERT INTO flow_run (id, flow_id, name, state_type, state_name, state_timestamp, 50 + \\ deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled) 51 + \\VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
52 + , .{ 53 + id, 54 + flow_id, 55 + name, 56 + state_type, 57 + state_name, 58 + timestamp, 59 + params.deployment_id, 60 + params.deployment_version, 61 + params.work_queue_name, 62 + params.work_queue_id, 63 + @as(i64, if (params.auto_scheduled) 1 else 0), 64 + }) catch |err| { 37 65 log.err("database", "insert flow_run error: {}", .{err}); 38 66 return err; 39 67 }; ··· 42 70 pub fn get(alloc: Allocator, id: []const u8) !?FlowRunRow { 43 71 var r = backend.db.row( 44 72 \\SELECT id, created, updated, flow_id, name, state_type, state_name, state_timestamp, 45 - \\ parameters, tags, run_count, expected_start_time, start_time, end_time, total_run_time 73 + \\ parameters, tags, run_count, expected_start_time, start_time, end_time, total_run_time, 74 + \\ deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled 46 75 \\FROM flow_run WHERE id = ? 47 76 , .{id}) catch return null; 48 77 ··· 64 93 .start_time = if (row.text(12).len > 0) try alloc.dupe(u8, row.text(12)) else null, 65 94 .end_time = if (row.text(13).len > 0) try alloc.dupe(u8, row.text(13)) else null, 66 95 .total_run_time = row.float(14), 96 + .deployment_id = if (row.text(15).len > 0) try alloc.dupe(u8, row.text(15)) else null, 97 + .deployment_version = if (row.text(16).len > 0) try alloc.dupe(u8, row.text(16)) else null, 98 + .work_queue_name = if (row.text(17).len > 0) try alloc.dupe(u8, row.text(17)) else null, 99 + .work_queue_id = if (row.text(18).len > 0) try alloc.dupe(u8, row.text(18)) else null, 100 + .auto_scheduled = row.bigint(19) != 0, 67 101 }; 68 102 } 69 103 return null; ··· 128 162 129 163 var rows = backend.db.query( 130 164 \\SELECT id, created, updated, flow_id, name, state_type, state_name, state_timestamp, 131 - \\ parameters, tags, run_count, expected_start_time, start_time, end_time, total_run_time 165 + \\ parameters, tags, run_count, expected_start_time, start_time, end_time, total_run_time, 166 + \\ deployment_id, deployment_version, 
work_queue_name, work_queue_id, auto_scheduled 132 167 \\FROM flow_run ORDER BY created DESC LIMIT ? 133 168 , .{@as(i64, @intCast(limit))}) catch |err| { 134 169 log.err("database", "list flow_runs error: {}", .{err}); ··· 153 188 .start_time = if (r.text(12).len > 0) try alloc.dupe(u8, r.text(12)) else null, 154 189 .end_time = if (r.text(13).len > 0) try alloc.dupe(u8, r.text(13)) else null, 155 190 .total_run_time = r.float(14), 191 + .deployment_id = if (r.text(15).len > 0) try alloc.dupe(u8, r.text(15)) else null, 192 + .deployment_version = if (r.text(16).len > 0) try alloc.dupe(u8, r.text(16)) else null, 193 + .work_queue_name = if (r.text(17).len > 0) try alloc.dupe(u8, r.text(17)) else null, 194 + .work_queue_id = if (r.text(18).len > 0) try alloc.dupe(u8, r.text(18)) else null, 195 + .auto_scheduled = r.bigint(19) != 0, 156 196 }); 157 197 } 158 198 159 199 return results.toOwnedSlice(alloc); 160 200 } 201 + 202 + /// Get scheduled flow runs for a specific deployment 203 + pub fn getScheduledByDeployment(alloc: Allocator, deployment_id: []const u8, limit: usize) ![]FlowRunRow { 204 + var results = std.ArrayListUnmanaged(FlowRunRow){}; 205 + errdefer results.deinit(alloc); 206 + 207 + var rows = backend.db.query( 208 + \\SELECT id, created, updated, flow_id, name, state_type, state_name, state_timestamp, 209 + \\ parameters, tags, run_count, expected_start_time, start_time, end_time, total_run_time, 210 + \\ deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled 211 + \\FROM flow_run WHERE deployment_id = ? AND state_type = 'SCHEDULED' 212 + \\ORDER BY expected_start_time ASC LIMIT ? 
213 + , .{ deployment_id, @as(i64, @intCast(limit)) }) catch |err| { 214 + log.err("database", "get scheduled flow_runs error: {}", .{err}); 215 + return err; 216 + }; 217 + defer rows.deinit(); 218 + 219 + while (rows.next()) |r| { 220 + try results.append(alloc, .{ 221 + .id = try alloc.dupe(u8, r.text(0)), 222 + .created = try alloc.dupe(u8, r.text(1)), 223 + .updated = try alloc.dupe(u8, r.text(2)), 224 + .flow_id = try alloc.dupe(u8, r.text(3)), 225 + .name = try alloc.dupe(u8, r.text(4)), 226 + .state_type = try alloc.dupe(u8, r.text(5)), 227 + .state_name = try alloc.dupe(u8, r.text(6)), 228 + .state_timestamp = try alloc.dupe(u8, r.text(7)), 229 + .parameters = try alloc.dupe(u8, r.text(8)), 230 + .tags = try alloc.dupe(u8, r.text(9)), 231 + .run_count = r.bigint(10), 232 + .expected_start_time = if (r.text(11).len > 0) try alloc.dupe(u8, r.text(11)) else null, 233 + .start_time = if (r.text(12).len > 0) try alloc.dupe(u8, r.text(12)) else null, 234 + .end_time = if (r.text(13).len > 0) try alloc.dupe(u8, r.text(13)) else null, 235 + .total_run_time = r.float(14), 236 + .deployment_id = if (r.text(15).len > 0) try alloc.dupe(u8, r.text(15)) else null, 237 + .deployment_version = if (r.text(16).len > 0) try alloc.dupe(u8, r.text(16)) else null, 238 + .work_queue_name = if (r.text(17).len > 0) try alloc.dupe(u8, r.text(17)) else null, 239 + .work_queue_id = if (r.text(18).len > 0) try alloc.dupe(u8, r.text(18)) else null, 240 + .auto_scheduled = r.bigint(19) != 0, 241 + }); 242 + } 243 + 244 + return results.toOwnedSlice(alloc); 245 + } 246 + 247 + /// Get scheduled flow runs for multiple deployments 248 + pub fn getScheduledByDeployments( 249 + alloc: Allocator, 250 + deployment_ids: []const []const u8, 251 + scheduled_before: ?[]const u8, 252 + limit: usize, 253 + ) ![]FlowRunRow { 254 + var results = std.ArrayListUnmanaged(FlowRunRow){}; 255 + errdefer results.deinit(alloc); 256 + 257 + // Query each deployment and combine results 258 + for 
(deployment_ids) |dep_id| {
259 +         // Only fetch what is still needed; every returned row is then kept,
260 +         // so freeing the slice below cannot orphan any row contents. The old
261 +         // code fetched up to `limit` per deployment and silently dropped the
262 +         // surplus rows without freeing their duped strings (a leak), and it
263 +         // never freed the returned slice itself.
264 +         const remaining = limit - results.items.len;
265 +         const dep_runs = try getScheduledByDeploymentBefore(alloc, dep_id, scheduled_before, remaining);
266 +         defer alloc.free(dep_runs); // row fields were moved into `results`
267 +         try results.appendSlice(alloc, dep_runs);
268 +         if (results.items.len >= limit) break;
269 +     }
270 + 
271 +     // Order the combined rows by expected_start_time. ISO-8601 timestamps
272 +     // compare correctly as bytes; runs without a start time sort first ("").
273 +     std.mem.sort(FlowRunRow, results.items, {}, struct {
274 +         fn lessThan(_: void, a: FlowRunRow, b: FlowRunRow) bool {
275 +             const a_time = a.expected_start_time orelse "";
276 +             const b_time = b.expected_start_time orelse "";
277 +             return std.mem.lessThan(u8, a_time, b_time);
278 +         }
279 +     }.lessThan);
280 + 
281 +     // No trim step: the loop never appends past `limit`. (The previous
282 +     // `shrinkRetainingCapacity` branch was unreachable — len could equal but
283 +     // never exceed limit — and would have leaked the dropped rows anyway.)
284 +     return results.toOwnedSlice(alloc);
285 + }
286 + 
287 + /// Get scheduled flow runs for a deployment with optional time filter
288 + fn getScheduledByDeploymentBefore(
289 +     alloc: Allocator,
290 +     deployment_id: []const u8,
291 +     scheduled_before: ?[]const u8,
292 +     limit: usize,
293 + ) ![]FlowRunRow {
294 +     var results = std.ArrayListUnmanaged(FlowRunRow){};
295 +     errdefer results.deinit(alloc);
296 + 
297 +     if (scheduled_before) |before| {
298 +         var rows = backend.db.query(
299 +             \\SELECT id, created, updated, flow_id, name, state_type, state_name, state_timestamp,
300 +             \\ parameters, tags, run_count, expected_start_time, start_time, end_time, total_run_time,
301 +             \\ deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled
302 +             \\FROM flow_run WHERE deployment_id = ? AND state_type = 'SCHEDULED'
303 +             \\ AND (expected_start_time IS NULL OR expected_start_time <= ?)
304 +             \\ORDER BY expected_start_time ASC LIMIT ?
303 + , .{ deployment_id, before, @as(i64, @intCast(limit)) }) catch |err| {
304 +             log.err("database", "get scheduled flow_runs error: {}", .{err});
305 +             return err;
306 +         };
307 +         defer rows.deinit();
308 + 
309 +         while (rows.next()) |r| {
310 +             // Propagate hydration failures instead of masking them.
311 +             try results.append(alloc, try rowToFlowRun(alloc, r));
312 +         }
313 +     } else {
314 +         var rows = backend.db.query(
315 +             \\SELECT id, created, updated, flow_id, name, state_type, state_name, state_timestamp,
316 +             \\ parameters, tags, run_count, expected_start_time, start_time, end_time, total_run_time,
317 +             \\ deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled
318 +             \\FROM flow_run WHERE deployment_id = ? AND state_type = 'SCHEDULED'
319 +             \\ORDER BY expected_start_time ASC LIMIT ?
320 +         , .{ deployment_id, @as(i64, @intCast(limit)) }) catch |err| {
321 +             log.err("database", "get scheduled flow_runs error: {}", .{err});
322 +             return err;
323 +         };
324 +         defer rows.deinit();
325 + 
326 +         while (rows.next()) |r| {
327 +             try results.append(alloc, try rowToFlowRun(alloc, r));
328 +         }
329 +     }
330 + 
331 +     return results.toOwnedSlice(alloc);
332 + }
333 + 
334 + /// Hydrate a FlowRunRow from a result cursor positioned on a row.
335 + /// Fails with the allocator error on OOM — the previous `catch ""` /
336 + /// `catch "{}"` / `catch null` fallbacks silently turned allocation
337 + /// failure into corrupt (empty-field) rows, unlike every other
338 + /// hydration path in this file, which uses `try alloc.dupe`.
339 + fn rowToFlowRun(alloc: Allocator, r: anytype) !FlowRunRow {
340 +     return .{
341 +         .id = try alloc.dupe(u8, r.text(0)),
342 +         .created = try alloc.dupe(u8, r.text(1)),
343 +         .updated = try alloc.dupe(u8, r.text(2)),
344 +         .flow_id = try alloc.dupe(u8, r.text(3)),
345 +         .name = try alloc.dupe(u8, r.text(4)),
346 +         .state_type = try alloc.dupe(u8, r.text(5)),
347 +         .state_name = try alloc.dupe(u8, r.text(6)),
348 +         .state_timestamp = try alloc.dupe(u8, r.text(7)),
349 +         .parameters = try alloc.dupe(u8, r.text(8)),
350 +         .tags = try alloc.dupe(u8, r.text(9)),
351 +         .run_count = r.bigint(10),
352 +         // Empty TEXT columns are stored as "" by this backend; map them to null.
353 +         .expected_start_time = if (r.text(11).len > 0) try alloc.dupe(u8, r.text(11)) else null,
354 +         .start_time = if (r.text(12).len > 0) try alloc.dupe(u8, r.text(12)) else null,
355 +         .end_time = if (r.text(13).len > 0) try alloc.dupe(u8, r.text(13)) else null,
356 +         .total_run_time = r.float(14),
357 +         .deployment_id = if (r.text(15).len > 0) try alloc.dupe(u8, r.text(15)) else null,
358 +         .deployment_version = if (r.text(16).len > 0) try alloc.dupe(u8, r.text(16)) else null,
359 +         .work_queue_name = if (r.text(17).len > 0) try alloc.dupe(u8, r.text(17)) else null,
360 +         .work_queue_id = if (r.text(18).len > 0) try alloc.dupe(u8, r.text(18)) else null,
361 +         .auto_scheduled = r.bigint(19) != 0,
362 +     };
363 + }
+62 -1
src/db/schema/postgres.zig
··· 36 36 \\ expected_start_time TEXT, 37 37 \\ start_time TEXT, 38 38 \\ end_time TEXT, 39 - \\ total_run_time DOUBLE PRECISION DEFAULT 0.0 39 + \\ total_run_time DOUBLE PRECISION DEFAULT 0.0, 40 + \\ deployment_id TEXT, 41 + \\ deployment_version TEXT, 42 + \\ work_queue_name TEXT, 43 + \\ work_queue_id TEXT, 44 + \\ auto_scheduled INTEGER DEFAULT 0 40 45 \\) 41 46 , .{}); 42 47 ··· 208 213 \\) 209 214 , .{}); 210 215 216 + // deployment table 217 + try backend.db.exec( 218 + \\CREATE TABLE IF NOT EXISTS deployment ( 219 + \\ id TEXT PRIMARY KEY, 220 + \\ created TEXT NOT NULL, 221 + \\ updated TEXT NOT NULL, 222 + \\ name TEXT NOT NULL, 223 + \\ flow_id TEXT NOT NULL REFERENCES flow(id) ON DELETE CASCADE, 224 + \\ version TEXT, 225 + \\ description TEXT, 226 + \\ paused INTEGER DEFAULT 0, 227 + \\ status TEXT DEFAULT 'NOT_READY', 228 + \\ last_polled TEXT, 229 + \\ parameters JSONB DEFAULT '{}', 230 + \\ parameter_openapi_schema TEXT, 231 + \\ enforce_parameter_schema INTEGER DEFAULT 1, 232 + \\ tags JSONB DEFAULT '[]', 233 + \\ labels JSONB DEFAULT '{}', 234 + \\ path TEXT, 235 + \\ entrypoint TEXT, 236 + \\ job_variables JSONB DEFAULT '{}', 237 + \\ pull_steps TEXT, 238 + \\ work_pool_name TEXT, 239 + \\ work_queue_name TEXT, 240 + \\ work_queue_id TEXT REFERENCES work_queue(id) ON DELETE SET NULL, 241 + \\ storage_document_id TEXT, 242 + \\ infrastructure_document_id TEXT, 243 + \\ concurrency_limit BIGINT, 244 + \\ UNIQUE(flow_id, name) 245 + \\) 246 + , .{}); 247 + 248 + // deployment_schedule table 249 + try backend.db.exec( 250 + \\CREATE TABLE IF NOT EXISTS deployment_schedule ( 251 + \\ id TEXT PRIMARY KEY, 252 + \\ created TEXT NOT NULL, 253 + \\ updated TEXT NOT NULL, 254 + \\ deployment_id TEXT NOT NULL REFERENCES deployment(id) ON DELETE CASCADE, 255 + \\ schedule TEXT NOT NULL, 256 + \\ active INTEGER DEFAULT 1, 257 + \\ max_scheduled_runs BIGINT, 258 + \\ parameters JSONB DEFAULT '{}', 259 + \\ slug TEXT, 260 + \\ UNIQUE(deployment_id, slug) 261 + 
\\) 262 + , .{}); 263 + 211 264 // indexes 212 265 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_flow_run__state_type ON flow_run(state_type)", .{}); 213 266 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_flow_run__flow_id ON flow_run(flow_id)", .{}); ··· 229 282 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_work_queue__work_pool_id ON work_queue(work_pool_id)", .{}); 230 283 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_work_queue__priority ON work_queue(priority)", .{}); 231 284 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_worker__work_pool_id ON worker(work_pool_id)", .{}); 285 + try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_flow_run__deployment_id ON flow_run(deployment_id)", .{}); 286 + try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_deployment__flow_id ON deployment(flow_id)", .{}); 287 + try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_deployment__work_queue_id ON deployment(work_queue_id)", .{}); 288 + try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_deployment__work_pool_name ON deployment(work_pool_name)", .{}); 289 + try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_deployment__created ON deployment(created)", .{}); 290 + try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_deployment__updated ON deployment(updated)", .{}); 291 + try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_deployment_schedule__deployment_id ON deployment_schedule(deployment_id)", .{}); 292 + try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_deployment_schedule__active ON deployment_schedule(active)", .{}); 232 293 233 294 log.info("database", "postgres schema initialized", .{}); 234 295 }
+62 -1
src/db/schema/sqlite.zig
··· 33 33 \\ expected_start_time TEXT, 34 34 \\ start_time TEXT, 35 35 \\ end_time TEXT, 36 - \\ total_run_time REAL DEFAULT 0.0 36 + \\ total_run_time REAL DEFAULT 0.0, 37 + \\ deployment_id TEXT, 38 + \\ deployment_version TEXT, 39 + \\ work_queue_name TEXT, 40 + \\ work_queue_id TEXT, 41 + \\ auto_scheduled INTEGER DEFAULT 0 37 42 \\) 38 43 , .{}); 39 44 ··· 201 206 \\) 202 207 , .{}); 203 208 209 + // deployment table 210 + try backend.db.exec( 211 + \\CREATE TABLE IF NOT EXISTS deployment ( 212 + \\ id TEXT PRIMARY KEY, 213 + \\ created TEXT NOT NULL, 214 + \\ updated TEXT NOT NULL, 215 + \\ name TEXT NOT NULL, 216 + \\ flow_id TEXT NOT NULL REFERENCES flow(id) ON DELETE CASCADE, 217 + \\ version TEXT, 218 + \\ description TEXT, 219 + \\ paused INTEGER DEFAULT 0, 220 + \\ status TEXT DEFAULT 'NOT_READY', 221 + \\ last_polled TEXT, 222 + \\ parameters TEXT DEFAULT '{}', 223 + \\ parameter_openapi_schema TEXT, 224 + \\ enforce_parameter_schema INTEGER DEFAULT 1, 225 + \\ tags TEXT DEFAULT '[]', 226 + \\ labels TEXT DEFAULT '{}', 227 + \\ path TEXT, 228 + \\ entrypoint TEXT, 229 + \\ job_variables TEXT DEFAULT '{}', 230 + \\ pull_steps TEXT, 231 + \\ work_pool_name TEXT, 232 + \\ work_queue_name TEXT, 233 + \\ work_queue_id TEXT REFERENCES work_queue(id) ON DELETE SET NULL, 234 + \\ storage_document_id TEXT, 235 + \\ infrastructure_document_id TEXT, 236 + \\ concurrency_limit INTEGER, 237 + \\ UNIQUE(flow_id, name) 238 + \\) 239 + , .{}); 240 + 241 + // deployment_schedule table 242 + try backend.db.exec( 243 + \\CREATE TABLE IF NOT EXISTS deployment_schedule ( 244 + \\ id TEXT PRIMARY KEY, 245 + \\ created TEXT NOT NULL, 246 + \\ updated TEXT NOT NULL, 247 + \\ deployment_id TEXT NOT NULL REFERENCES deployment(id) ON DELETE CASCADE, 248 + \\ schedule TEXT NOT NULL, 249 + \\ active INTEGER DEFAULT 1, 250 + \\ max_scheduled_runs INTEGER, 251 + \\ parameters TEXT DEFAULT '{}', 252 + \\ slug TEXT, 253 + \\ UNIQUE(deployment_id, slug) 254 + \\) 255 + , .{}); 256 + 
204 257 // indexes 205 258 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_flow_run__state_type ON flow_run(state_type)", .{}); 206 259 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_flow_run__flow_id ON flow_run(flow_id)", .{}); ··· 222 275 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_work_queue__work_pool_id ON work_queue(work_pool_id)", .{}); 223 276 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_work_queue__priority ON work_queue(priority)", .{}); 224 277 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_worker__work_pool_id ON worker(work_pool_id)", .{}); 278 + try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_flow_run__deployment_id ON flow_run(deployment_id)", .{}); 279 + try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_deployment__flow_id ON deployment(flow_id)", .{}); 280 + try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_deployment__work_queue_id ON deployment(work_queue_id)", .{}); 281 + try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_deployment__work_pool_name ON deployment(work_pool_name)", .{}); 282 + try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_deployment__created ON deployment(created)", .{}); 283 + try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_deployment__updated ON deployment(updated)", .{}); 284 + try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_deployment_schedule__deployment_id ON deployment_schedule(deployment_id)", .{}); 285 + try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_deployment_schedule__active ON deployment_schedule(active)", .{}); 225 286 226 287 log.info("database", "sqlite schema initialized", .{}); 227 288 }
+2
src/db/sqlite.zig
··· 18 18 pub const work_pools = @import("work_pools.zig"); 19 19 pub const work_queues = @import("work_queues.zig"); 20 20 pub const workers = @import("workers.zig"); 21 + pub const deployments = @import("deployments.zig"); 22 + pub const deployment_schedules = @import("deployment_schedules.zig"); 21 23 22 24 // re-export types for compatibility 23 25 pub const FlowRow = flows.FlowRow;