prefect server in zig
at main 227 lines 7.2 kB view raw
1const std = @import("std"); 2const Allocator = std.mem.Allocator; 3 4const backend = @import("backend.zig"); 5const log = @import("../logging.zig"); 6 7/// Work queue status enum - matches Python's WorkQueueStatus 8pub const Status = enum { 9 not_ready, 10 ready, 11 paused, 12 13 pub fn fromString(s: []const u8) Status { 14 if (std.mem.eql(u8, s, "READY")) return .ready; 15 if (std.mem.eql(u8, s, "PAUSED")) return .paused; 16 return .not_ready; 17 } 18 19 pub fn toString(self: Status) []const u8 { 20 return switch (self) { 21 .not_ready => "NOT_READY", 22 .ready => "READY", 23 .paused => "PAUSED", 24 }; 25 } 26}; 27 28pub const WorkQueueRow = struct { 29 id: []const u8, 30 created: []const u8, 31 updated: []const u8, 32 name: []const u8, 33 description: []const u8, 34 is_paused: bool, 35 concurrency_limit: ?i64, 36 priority: i64, 37 work_pool_id: []const u8, 38 last_polled: ?[]const u8, 39 status: Status, 40}; 41 42const Col = struct { 43 const id: usize = 0; 44 const created: usize = 1; 45 const updated: usize = 2; 46 const name: usize = 3; 47 const description: usize = 4; 48 const is_paused: usize = 5; 49 const concurrency_limit: usize = 6; 50 const priority: usize = 7; 51 const work_pool_id: usize = 8; 52 const last_polled: usize = 9; 53 const status: usize = 10; 54}; 55 56const select_cols = "id, created, updated, name, description, is_paused, concurrency_limit, priority, work_pool_id, last_polled, status"; 57 58fn rowFromResult(alloc: Allocator, r: anytype) !WorkQueueRow { 59 const last_polled = r.textOrNull(Col.last_polled); 60 const concurrency = r.textOrNull(Col.concurrency_limit); 61 62 return WorkQueueRow{ 63 .id = try alloc.dupe(u8, r.text(Col.id)), 64 .created = try alloc.dupe(u8, r.text(Col.created)), 65 .updated = try alloc.dupe(u8, r.text(Col.updated)), 66 .name = try alloc.dupe(u8, r.text(Col.name)), 67 .description = try alloc.dupe(u8, r.text(Col.description)), 68 .is_paused = r.int(Col.is_paused) != 0, 69 .concurrency_limit = if (concurrency != null) r.bigint(Col.concurrency_limit) else null, 70 .priority = r.bigint(Col.priority), 71 .work_pool_id = try alloc.dupe(u8, r.text(Col.work_pool_id)), 72 .last_polled = if (last_polled) |lp| try alloc.dupe(u8, lp) else null, 73 .status = Status.fromString(r.text(Col.status)), 74 }; 75} 76 77pub fn getById(alloc: Allocator, id: []const u8) !?WorkQueueRow { 78 var r = backend.db.row( 79 "SELECT " ++ select_cols ++ " FROM work_queue WHERE id = ?", 80 .{id}, 81 ) catch return null; 82 83 if (r) |*row| { 84 defer row.deinit(); 85 return try rowFromResult(alloc, row); 86 } 87 return null; 88} 89 90pub fn getByPoolAndName(alloc: Allocator, pool_id: []const u8, name: []const u8) !?WorkQueueRow { 91 var r = backend.db.row( 92 "SELECT " ++ select_cols ++ " FROM work_queue WHERE work_pool_id = ? AND name = ?", 93 .{ pool_id, name }, 94 ) catch return null; 95 96 if (r) |*row| { 97 defer row.deinit(); 98 return try rowFromResult(alloc, row); 99 } 100 return null; 101} 102 103pub fn insert( 104 id: []const u8, 105 name: []const u8, 106 description: []const u8, 107 is_paused: bool, 108 concurrency_limit: ?i64, 109 priority: i64, 110 work_pool_id: []const u8, 111 status: Status, 112 created: []const u8, 113) !void { 114 backend.db.exec( 115 "INSERT INTO work_queue (id, created, updated, name, description, is_paused, concurrency_limit, priority, work_pool_id, status) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", 116 .{ 117 id, 118 created, 119 created, 120 name, 121 description, 122 @as(i32, if (is_paused) 1 else 0), 123 concurrency_limit, 124 priority, 125 work_pool_id, 126 status.toString(), 127 }, 128 ) catch |err| { 129 log.err("database", "insert work_queue error: {}", .{err}); 130 return err; 131 }; 132} 133 134pub fn updateByPoolAndName( 135 pool_id: []const u8, 136 name: []const u8, 137 new_name: ?[]const u8, 138 description: ?[]const u8, 139 is_paused: ?bool, 140 concurrency_limit: ?i64, 141 priority: ?i64, 142 status: ?Status, 143 updated: []const u8, 144) !bool { 145 const affected = backend.db.execWithRowCount( 146 "UPDATE work_queue SET name = COALESCE(?, name), description = COALESCE(?, description), is_paused = COALESCE(?, is_paused), concurrency_limit = COALESCE(?, concurrency_limit), priority = COALESCE(?, priority), status = COALESCE(?, status), updated = ? WHERE work_pool_id = ? AND name = ?", 147 .{ 148 new_name, 149 description, 150 if (is_paused) |p| @as(?i32, if (p) 1 else 0) else null, 151 concurrency_limit, 152 priority, 153 if (status) |s| s.toString() else null, 154 updated, 155 pool_id, 156 name, 157 }, 158 ) catch |err| { 159 log.err("database", "update work_queue error: {}", .{err}); 160 return err; 161 }; 162 return affected > 0; 163} 164 165pub fn updateLastPolled(pool_id: []const u8, name: []const u8, last_polled: []const u8, status: Status) !bool { 166 const affected = backend.db.execWithRowCount( 167 "UPDATE work_queue SET last_polled = ?, status = ?, updated = ? WHERE work_pool_id = ? AND name = ?", 168 .{ last_polled, status.toString(), last_polled, pool_id, name }, 169 ) catch |err| { 170 log.err("database", "update work_queue last_polled error: {}", .{err}); 171 return err; 172 }; 173 return affected > 0; 174} 175 176pub fn deleteByPoolAndName(pool_id: []const u8, name: []const u8) !bool { 177 const affected = backend.db.execWithRowCount( 178 "DELETE FROM work_queue WHERE work_pool_id = ? AND name = ?", 179 .{ pool_id, name }, 180 ) catch |err| { 181 log.err("database", "delete work_queue error: {}", .{err}); 182 return err; 183 }; 184 return affected > 0; 185} 186 187pub fn listByPool(alloc: Allocator, pool_id: []const u8, limit: usize, offset: usize) ![]WorkQueueRow { 188 var results = std.ArrayListUnmanaged(WorkQueueRow){}; 189 errdefer results.deinit(alloc); 190 191 var rows = backend.db.query( 192 "SELECT " ++ select_cols ++ " FROM work_queue WHERE work_pool_id = ? ORDER BY priority ASC LIMIT ? OFFSET ?", 193 .{ pool_id, @as(i64, @intCast(limit)), @as(i64, @intCast(offset)) }, 194 ) catch |err| { 195 log.err("database", "list work_queues error: {}", .{err}); 196 return err; 197 }; 198 defer rows.deinit(); 199 200 while (rows.next()) |r| { 201 try results.append(alloc, try rowFromResult(alloc, &r)); 202 } 203 204 return results.toOwnedSlice(alloc); 205} 206 207pub fn countByPool(pool_id: []const u8) !usize { 208 var r = backend.db.row("SELECT COUNT(*) FROM work_queue WHERE work_pool_id = ?", .{pool_id}) catch return 0; 209 if (r) |*row| { 210 defer row.deinit(); 211 return @intCast(row.bigint(0)); 212 } 213 return 0; 214} 215 216/// Get next available priority for a pool (max + 1, or 1 if no queues) 217pub fn nextPriority(pool_id: []const u8) !i64 { 218 var r = backend.db.row( 219 "SELECT COALESCE(MAX(priority), 0) + 1 FROM work_queue WHERE work_pool_id = ?", 220 .{pool_id}, 221 ) catch return 1; 222 if (r) |*row| { 223 defer row.deinit(); 224 return row.bigint(0); 225 } 226 return 1; 227}