prefect server in zig
1const std = @import("std");
2const Allocator = std.mem.Allocator;
3
4const backend = @import("backend.zig");
5const log = @import("../logging.zig");
6
/// Work queue status enum - matches Python's WorkQueueStatus.
///
/// The textual form is the upper-case name ("NOT_READY", "READY", "PAUSED").
pub const Status = enum {
    not_ready,
    ready,
    paused,

    /// Parses the upper-case textual form of a status.
    ///
    /// Only an exact, case-sensitive "READY" or "PAUSED" is recognised;
    /// every other input (including "NOT_READY" and the empty string)
    /// yields `.not_ready`.
    pub fn fromString(s: []const u8) Status {
        // `.not_ready` is the fallback, so only the other tags need checking.
        for ([_]Status{ .ready, .paused }) |candidate| {
            if (std.mem.eql(u8, s, candidate.toString())) return candidate;
        }
        return .not_ready;
    }

    /// Returns the upper-case textual form of the status.
    /// The result is a string literal; nothing is allocated.
    pub fn toString(self: Status) []const u8 {
        switch (self) {
            .paused => return "PAUSED",
            .ready => return "READY",
            .not_ready => return "NOT_READY",
        }
    }
};
27
/// One row of the `work_queue` table.
///
/// Rows built by `rowFromResult` hold string fields that were duplicated
/// with the allocator passed to it, so whoever holds the row owns them.
pub const WorkQueueRow = struct {
    /// Value of the `id` column.
    id: []const u8,
    /// Value of the `created` column, as text.
    created: []const u8,
    /// Value of the `updated` column, as text.
    updated: []const u8,
    /// Queue name; lookups in this file pair it with `work_pool_id`.
    name: []const u8,
    /// Value of the `description` column.
    description: []const u8,
    /// True when the stored `is_paused` integer is non-zero.
    is_paused: bool,
    /// `null` when the `concurrency_limit` column is NULL.
    concurrency_limit: ?i64,
    /// Value of the `priority` column; `listByPool` orders by it ascending.
    priority: i64,
    /// Value of the `work_pool_id` column.
    work_pool_id: []const u8,
    /// `null` when the `last_polled` column is NULL.
    last_polled: ?[]const u8,
    /// Status parsed from the `status` column via `Status.fromString`.
    status: Status,
};
41
/// Zero-based positions of the columns in a result row selected with
/// `select_cols`. The numbering must stay in the same order as that list.
const Col = struct {
    const id: usize = 0;
    const created: usize = 1;
    const updated: usize = 2;
    const name: usize = 3;
    const description: usize = 4;

    const is_paused: usize = 5;
    const concurrency_limit: usize = 6;
    const priority: usize = 7;

    const work_pool_id: usize = 8;
    const last_polled: usize = 9;
    const status: usize = 10;
};
55
/// Column list shared by every SELECT in this file. The order here defines
/// the indices in `Col`; change both together.
const select_cols =
    "id, created, updated, name, description, " ++
    "is_paused, concurrency_limit, priority, " ++
    "work_pool_id, last_polled, status";
57
/// Builds an owned `WorkQueueRow` from a result row selected with `select_cols`.
///
/// `r` must provide `text`, `textOrNull`, `int` and `bigint` accessors taking
/// a column index. Every string field is duplicated with `alloc`, so the
/// returned row does not borrow from `r`; the caller owns those slices.
///
/// Returns the allocator's error if any duplication fails. In that case the
/// strings duplicated so far are freed again, so nothing leaks.
fn rowFromResult(alloc: Allocator, r: anytype) !WorkQueueRow {
    // Each duplicate gets its own errdefer so a later allocation failure
    // releases everything allocated before it.
    const id = try alloc.dupe(u8, r.text(Col.id));
    errdefer alloc.free(id);

    const created = try alloc.dupe(u8, r.text(Col.created));
    errdefer alloc.free(created);

    const updated = try alloc.dupe(u8, r.text(Col.updated));
    errdefer alloc.free(updated);

    const name = try alloc.dupe(u8, r.text(Col.name));
    errdefer alloc.free(name);

    const description = try alloc.dupe(u8, r.text(Col.description));
    errdefer alloc.free(description);

    const work_pool_id = try alloc.dupe(u8, r.text(Col.work_pool_id));
    errdefer alloc.free(work_pool_id);

    // Last fallible step: nothing after it can fail, so no errdefer is needed.
    const last_polled: ?[]const u8 = if (r.textOrNull(Col.last_polled)) |lp|
        try alloc.dupe(u8, lp)
    else
        null;

    // The nullable text accessor is used only to detect SQL NULL before
    // reading the column as an integer.
    const concurrency_limit: ?i64 = if (r.textOrNull(Col.concurrency_limit) != null)
        r.bigint(Col.concurrency_limit)
    else
        null;

    return WorkQueueRow{
        .id = id,
        .created = created,
        .updated = updated,
        .name = name,
        .description = description,
        .is_paused = r.int(Col.is_paused) != 0,
        .concurrency_limit = concurrency_limit,
        .priority = r.bigint(Col.priority),
        .work_pool_id = work_pool_id,
        .last_polled = last_polled,
        .status = Status.fromString(r.text(Col.status)),
    };
}
76
/// Fetches the work queue whose `id` column equals `id`.
///
/// Returns `null` when no row matches. A failing query is also reported as
/// `null` (unchanged behaviour), but the error is now logged instead of
/// being dropped silently. Allocation failures while copying the row are
/// returned as errors.
///
/// String fields of the returned row are duplicated with `alloc`; the
/// caller owns them.
pub fn getById(alloc: Allocator, id: []const u8) !?WorkQueueRow {
    var r = backend.db.row(
        "SELECT " ++ select_cols ++ " FROM work_queue WHERE id = ?",
        .{id},
    ) catch |err| {
        log.err("database", "get work_queue by id error: {}", .{err});
        return null;
    };

    if (r) |*row| {
        defer row.deinit();
        return try rowFromResult(alloc, row);
    }
    return null;
}
89
/// Fetches the work queue named `name` inside the pool `pool_id`.
///
/// Returns `null` when no row matches. A failing query is also reported as
/// `null` (unchanged behaviour), but the error is now logged instead of
/// being dropped silently. Allocation failures while copying the row are
/// returned as errors.
///
/// String fields of the returned row are duplicated with `alloc`; the
/// caller owns them.
pub fn getByPoolAndName(alloc: Allocator, pool_id: []const u8, name: []const u8) !?WorkQueueRow {
    var r = backend.db.row(
        "SELECT " ++ select_cols ++ " FROM work_queue WHERE work_pool_id = ? AND name = ?",
        .{ pool_id, name },
    ) catch |err| {
        log.err("database", "get work_queue by pool and name error: {}", .{err});
        return null;
    };

    if (r) |*row| {
        defer row.deinit();
        return try rowFromResult(alloc, row);
    }
    return null;
}
102
/// Inserts a new row into `work_queue`.
///
/// `created` is written to both the `created` and the `updated` column.
/// `last_polled` is not part of the statement. `is_paused` is stored as the
/// integer 1 or 0, and `status` as its upper-case text form.
///
/// A database error is logged and then returned to the caller.
pub fn insert(
    id: []const u8,
    name: []const u8,
    description: []const u8,
    is_paused: bool,
    concurrency_limit: ?i64,
    priority: i64,
    work_pool_id: []const u8,
    status: Status,
    created: []const u8,
) !void {
    const sql = "INSERT INTO work_queue (id, created, updated, name, description, is_paused, concurrency_limit, priority, work_pool_id, status) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";

    // The boolean is bound as a 32-bit integer flag.
    const paused_flag: i32 = @intFromBool(is_paused);
    const status_text = status.toString();

    backend.db.exec(sql, .{
        id,
        created, // created
        created, // updated starts out equal to created
        name,
        description,
        paused_flag,
        concurrency_limit,
        priority,
        work_pool_id,
        status_text,
    }) catch |err| {
        log.err("database", "insert work_queue error: {}", .{err});
        return err;
    };
}
133
/// Partially updates the work queue identified by `pool_id` and `name`.
///
/// Every optional argument that is `null` leaves its column untouched,
/// because each assignment is wrapped in `COALESCE(?, column)`. As a
/// consequence a column cannot be reset to NULL through this function
/// (for example, `concurrency_limit` cannot be cleared). `updated` is
/// always written.
///
/// Returns `true` when at least one row was affected. A database error is
/// logged and then returned.
pub fn updateByPoolAndName(
    pool_id: []const u8,
    name: []const u8,
    new_name: ?[]const u8,
    description: ?[]const u8,
    is_paused: ?bool,
    concurrency_limit: ?i64,
    priority: ?i64,
    status: ?Status,
    updated: []const u8,
) !bool {
    const sql = "UPDATE work_queue SET name = COALESCE(?, name), description = COALESCE(?, description), is_paused = COALESCE(?, is_paused), concurrency_limit = COALESCE(?, concurrency_limit), priority = COALESCE(?, priority), status = COALESCE(?, status), updated = ? WHERE work_pool_id = ? AND name = ?";

    // Optional bool -> optional integer flag (1 / 0), null stays null.
    const paused_flag: ?i32 = if (is_paused) |p| @intFromBool(p) else null;
    // Optional status -> optional upper-case text.
    const status_text: ?[]const u8 = if (status) |s| s.toString() else null;

    const affected = backend.db.execWithRowCount(sql, .{
        new_name,
        description,
        paused_flag,
        concurrency_limit,
        priority,
        status_text,
        updated,
        pool_id,
        name,
    }) catch |err| {
        log.err("database", "update work_queue error: {}", .{err});
        return err;
    };

    return affected > 0;
}
164
/// Records a poll for the work queue identified by `pool_id` and `name`.
///
/// Writes `last_polled` to both the `last_polled` and the `updated` column
/// and sets `status` to the given value.
///
/// Returns `true` when at least one row was affected. A database error is
/// logged and then returned.
pub fn updateLastPolled(pool_id: []const u8, name: []const u8, last_polled: []const u8, status: Status) !bool {
    const sql = "UPDATE work_queue SET last_polled = ?, status = ?, updated = ? WHERE work_pool_id = ? AND name = ?";
    const status_text = status.toString();

    const affected = backend.db.execWithRowCount(sql, .{
        last_polled, // last_polled
        status_text,
        last_polled, // updated takes the same timestamp
        pool_id,
        name,
    }) catch |err| {
        log.err("database", "update work_queue last_polled error: {}", .{err});
        return err;
    };

    return affected > 0;
}
175
/// Deletes the work queue identified by `pool_id` and `name`.
///
/// Returns `true` when at least one row was removed, `false` when nothing
/// matched. A database error is logged and then returned.
pub fn deleteByPoolAndName(pool_id: []const u8, name: []const u8) !bool {
    const sql = "DELETE FROM work_queue WHERE work_pool_id = ? AND name = ?";

    const removed = backend.db.execWithRowCount(sql, .{ pool_id, name }) catch |err| {
        log.err("database", "delete work_queue error: {}", .{err});
        return err;
    };

    return removed > 0;
}
186
/// Frees every string of `row` that `rowFromResult` duplicated with `alloc`.
fn freeListedRow(alloc: Allocator, row: WorkQueueRow) void {
    alloc.free(row.id);
    alloc.free(row.created);
    alloc.free(row.updated);
    alloc.free(row.name);
    alloc.free(row.description);
    alloc.free(row.work_pool_id);
    if (row.last_polled) |lp| alloc.free(lp);
}

/// Lists the work queues of pool `pool_id`, ordered by ascending priority.
///
/// `limit` and `offset` are bound to the statement's LIMIT and OFFSET.
/// The returned slice and every string inside its rows are allocated with
/// `alloc`; the caller owns them.
///
/// A query error is logged and returned. If building the result fails part
/// way through (for example on out-of-memory), all rows collected so far
/// are released, strings included, before the error is returned.
pub fn listByPool(alloc: Allocator, pool_id: []const u8, limit: usize, offset: usize) ![]WorkQueueRow {
    var results = std.ArrayListUnmanaged(WorkQueueRow){};
    errdefer {
        // Freeing the list buffer alone would leak the duplicated strings
        // of the rows already stored in it.
        for (results.items) |row| freeListedRow(alloc, row);
        results.deinit(alloc);
    }

    var rows = backend.db.query(
        "SELECT " ++ select_cols ++ " FROM work_queue WHERE work_pool_id = ? ORDER BY priority ASC LIMIT ? OFFSET ?",
        .{ pool_id, @as(i64, @intCast(limit)), @as(i64, @intCast(offset)) },
    ) catch |err| {
        log.err("database", "list work_queues error: {}", .{err});
        return err;
    };
    defer rows.deinit();

    while (rows.next()) |r| {
        const row = try rowFromResult(alloc, &r);
        // The row is not in `results` yet, so it must be released here
        // if the append itself fails.
        errdefer freeListedRow(alloc, row);
        try results.append(alloc, row);
    }

    return results.toOwnedSlice(alloc);
}
206
/// Counts the work queues that belong to pool `pool_id`.
///
/// Returns 0 when the query yields no row. A failing query is also reported
/// as 0 (unchanged behaviour), but the error is now logged instead of being
/// dropped silently.
pub fn countByPool(pool_id: []const u8) !usize {
    var r = backend.db.row(
        "SELECT COUNT(*) FROM work_queue WHERE work_pool_id = ?",
        .{pool_id},
    ) catch |err| {
        log.err("database", "count work_queues error: {}", .{err});
        return 0;
    };

    if (r) |*row| {
        defer row.deinit();
        // COUNT(*) is never negative, so narrowing to usize is safe.
        return @intCast(row.bigint(0));
    }
    return 0;
}
215
/// Get next available priority for a pool (max + 1, or 1 if no queues).
///
/// A failing query still falls back to 1 (unchanged behaviour), but the
/// error is now logged instead of being dropped silently, because that
/// fallback may collide with an existing queue's priority.
pub fn nextPriority(pool_id: []const u8) !i64 {
    var r = backend.db.row(
        "SELECT COALESCE(MAX(priority), 0) + 1 FROM work_queue WHERE work_pool_id = ?",
        .{pool_id},
    ) catch |err| {
        log.err("database", "next work_queue priority error: {}", .{err});
        return 1;
    };

    if (r) |*row| {
        defer row.deinit();
        return row.bigint(0);
    }
    return 1;
}