prefect server in zig
1const std = @import("std");
2const Allocator = std.mem.Allocator;
3
4const backend = @import("backend.zig");
5const log = @import("../logging.zig");
6
7/// Work pool status enum - matches Python's WorkPoolStatus
8pub const Status = enum {
9 not_ready,
10 ready,
11 paused,
12
13 pub fn fromString(s: []const u8) Status {
14 if (std.mem.eql(u8, s, "READY")) return .ready;
15 if (std.mem.eql(u8, s, "PAUSED")) return .paused;
16 return .not_ready;
17 }
18
19 pub fn toString(self: Status) []const u8 {
20 return switch (self) {
21 .not_ready => "NOT_READY",
22 .ready => "READY",
23 .paused => "PAUSED",
24 };
25 }
26};
27
28pub const WorkPoolRow = struct {
29 id: []const u8,
30 created: []const u8,
31 updated: []const u8,
32 name: []const u8,
33 description: ?[]const u8,
34 type: []const u8,
35 base_job_template: []const u8,
36 is_paused: bool,
37 concurrency_limit: ?i64,
38 default_queue_id: ?[]const u8,
39 status: Status,
40};
41
42const Col = struct {
43 const id: usize = 0;
44 const created: usize = 1;
45 const updated: usize = 2;
46 const name: usize = 3;
47 const description: usize = 4;
48 const type_: usize = 5;
49 const base_job_template: usize = 6;
50 const is_paused: usize = 7;
51 const concurrency_limit: usize = 8;
52 const default_queue_id: usize = 9;
53 const status: usize = 10;
54};
55
56const select_cols = "id, created, updated, name, description, type, base_job_template, is_paused, concurrency_limit, default_queue_id, status";
57
58fn rowFromResult(alloc: Allocator, r: anytype) !WorkPoolRow {
59 const desc = r.textOrNull(Col.description);
60 const queue_id = r.textOrNull(Col.default_queue_id);
61 const concurrency = r.textOrNull(Col.concurrency_limit);
62
63 return WorkPoolRow{
64 .id = try alloc.dupe(u8, r.text(Col.id)),
65 .created = try alloc.dupe(u8, r.text(Col.created)),
66 .updated = try alloc.dupe(u8, r.text(Col.updated)),
67 .name = try alloc.dupe(u8, r.text(Col.name)),
68 .description = if (desc) |d| try alloc.dupe(u8, d) else null,
69 .type = try alloc.dupe(u8, r.text(Col.type_)),
70 .base_job_template = try alloc.dupe(u8, r.text(Col.base_job_template)),
71 .is_paused = r.int(Col.is_paused) != 0,
72 .concurrency_limit = if (concurrency != null) r.bigint(Col.concurrency_limit) else null,
73 .default_queue_id = if (queue_id) |q| try alloc.dupe(u8, q) else null,
74 .status = Status.fromString(r.text(Col.status)),
75 };
76}
77
78pub fn getById(alloc: Allocator, id: []const u8) !?WorkPoolRow {
79 var r = backend.db.row(
80 "SELECT " ++ select_cols ++ " FROM work_pool WHERE id = ?",
81 .{id},
82 ) catch return null;
83
84 if (r) |*row| {
85 defer row.deinit();
86 return try rowFromResult(alloc, row);
87 }
88 return null;
89}
90
91pub fn getByName(alloc: Allocator, name: []const u8) !?WorkPoolRow {
92 var r = backend.db.row(
93 "SELECT " ++ select_cols ++ " FROM work_pool WHERE name = ?",
94 .{name},
95 ) catch return null;
96
97 if (r) |*row| {
98 defer row.deinit();
99 return try rowFromResult(alloc, row);
100 }
101 return null;
102}
103
104pub fn insert(
105 id: []const u8,
106 name: []const u8,
107 description: ?[]const u8,
108 pool_type: []const u8,
109 base_job_template: []const u8,
110 is_paused: bool,
111 concurrency_limit: ?i64,
112 default_queue_id: ?[]const u8,
113 status: Status,
114 created: []const u8,
115) !void {
116 backend.db.exec(
117 "INSERT INTO work_pool (id, created, updated, name, description, type, base_job_template, is_paused, concurrency_limit, default_queue_id, status) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
118 .{
119 id,
120 created,
121 created,
122 name,
123 description,
124 pool_type,
125 base_job_template,
126 @as(i32, if (is_paused) 1 else 0),
127 concurrency_limit,
128 default_queue_id,
129 status.toString(),
130 },
131 ) catch |err| {
132 log.err("database", "insert work_pool error: {}", .{err});
133 return err;
134 };
135}
136
137pub fn updateByName(
138 name: []const u8,
139 description: ?[]const u8,
140 base_job_template: ?[]const u8,
141 is_paused: ?bool,
142 concurrency_limit: ?i64,
143 status: ?Status,
144 updated: []const u8,
145) !bool {
146 const affected = backend.db.execWithRowCount(
147 "UPDATE work_pool SET description = COALESCE(?, description), base_job_template = COALESCE(?, base_job_template), is_paused = COALESCE(?, is_paused), concurrency_limit = COALESCE(?, concurrency_limit), status = COALESCE(?, status), updated = ? WHERE name = ?",
148 .{
149 description,
150 base_job_template,
151 if (is_paused) |p| @as(?i32, if (p) 1 else 0) else null,
152 concurrency_limit,
153 if (status) |s| s.toString() else null,
154 updated,
155 name,
156 },
157 ) catch |err| {
158 log.err("database", "update work_pool error: {}", .{err});
159 return err;
160 };
161 return affected > 0;
162}
163
164pub fn updateDefaultQueueId(name: []const u8, default_queue_id: []const u8, updated: []const u8) !bool {
165 const affected = backend.db.execWithRowCount(
166 "UPDATE work_pool SET default_queue_id = ?, updated = ? WHERE name = ?",
167 .{ default_queue_id, updated, name },
168 ) catch |err| {
169 log.err("database", "update work_pool default_queue_id error: {}", .{err});
170 return err;
171 };
172 return affected > 0;
173}
174
175pub fn updateStatus(name: []const u8, status: Status, updated: []const u8) !bool {
176 const affected = backend.db.execWithRowCount(
177 "UPDATE work_pool SET status = ?, updated = ? WHERE name = ?",
178 .{ status.toString(), updated, name },
179 ) catch |err| {
180 log.err("database", "update work_pool status error: {}", .{err});
181 return err;
182 };
183 return affected > 0;
184}
185
186pub fn deleteByName(name: []const u8) !bool {
187 const affected = backend.db.execWithRowCount(
188 "DELETE FROM work_pool WHERE name = ?",
189 .{name},
190 ) catch |err| {
191 log.err("database", "delete work_pool error: {}", .{err});
192 return err;
193 };
194 return affected > 0;
195}
196
197pub fn list(alloc: Allocator, limit: usize, offset: usize) ![]WorkPoolRow {
198 var results = std.ArrayListUnmanaged(WorkPoolRow){};
199 errdefer results.deinit(alloc);
200
201 var rows = backend.db.query(
202 "SELECT " ++ select_cols ++ " FROM work_pool ORDER BY name ASC LIMIT ? OFFSET ?",
203 .{ @as(i64, @intCast(limit)), @as(i64, @intCast(offset)) },
204 ) catch |err| {
205 log.err("database", "list work_pools error: {}", .{err});
206 return err;
207 };
208 defer rows.deinit();
209
210 while (rows.next()) |r| {
211 try results.append(alloc, try rowFromResult(alloc, &r));
212 }
213
214 return results.toOwnedSlice(alloc);
215}
216
217pub fn count() !usize {
218 var r = backend.db.row("SELECT COUNT(*) FROM work_pool", .{}) catch return 0;
219 if (r) |*row| {
220 defer row.deinit();
221 return @intCast(row.bigint(0));
222 }
223 return 0;
224}
225
226/// Check if any online workers exist for a pool
227pub fn hasOnlineWorkers(pool_id: []const u8) !bool {
228 var r = backend.db.row(
229 "SELECT COUNT(*) FROM worker WHERE work_pool_id = ? AND status = 'ONLINE'",
230 .{pool_id},
231 ) catch return false;
232 if (r) |*row| {
233 defer row.deinit();
234 return row.bigint(0) > 0;
235 }
236 return false;
237}