prefect server in zig
at main 237 lines 7.4 kB view raw
const std = @import("std");
const Allocator = std.mem.Allocator;

const backend = @import("backend.zig");
const log = @import("../logging.zig");

/// Work pool status enum - matches Python's WorkPoolStatus
pub const Status = enum {
    not_ready,
    ready,
    paused,

    /// Parse the textual status. Only the exact strings "READY" and "PAUSED"
    /// are recognized; every other input maps to `.not_ready`.
    pub fn fromString(s: []const u8) Status {
        if (std.mem.eql(u8, s, "READY")) return .ready;
        if (std.mem.eql(u8, s, "PAUSED")) return .paused;
        return .not_ready;
    }

    /// Textual form of the status, as bound to the `status` column by the
    /// insert/update functions in this file.
    pub fn toString(self: Status) []const u8 {
        return switch (self) {
            .not_ready => "NOT_READY",
            .ready => "READY",
            .paused => "PAUSED",
        };
    }
};

/// One row of the `work_pool` table.
/// Rows produced by this file (`getById`, `getByName`, `list`) hold string
/// fields that were duplicated with the allocator passed by the caller, so
/// the caller owns that memory.
pub const WorkPoolRow = struct {
    id: []const u8,
    created: []const u8,
    updated: []const u8,
    name: []const u8,
    description: ?[]const u8,
    type: []const u8,
    base_job_template: []const u8,
    is_paused: bool,
    concurrency_limit: ?i64,
    default_queue_id: ?[]const u8,
    status: Status,
};

/// Column indices of a result row; must stay in the same order as `select_cols`.
const Col = struct {
    const id: usize = 0;
    const created: usize = 1;
    const updated: usize = 2;
    const name: usize = 3;
    const description: usize = 4;
    const type_: usize = 5;
    const base_job_template: usize = 6;
    const is_paused: usize = 7;
    const concurrency_limit: usize = 8;
    const default_queue_id: usize = 9;
    const status: usize = 10;
};

/// Column list shared by every SELECT in this file; order matches `Col`.
const select_cols = "id, created, updated, name, description, type, base_job_template, is_paused, concurrency_limit, default_queue_id, status";

/// Release every string that `rowFromResult` allocated for `row`.
/// `alloc` must be the allocator the row was built with.
fn freeRow(alloc: Allocator, row: WorkPoolRow) void {
    alloc.free(row.id);
    alloc.free(row.created);
    alloc.free(row.updated);
    alloc.free(row.name);
    if (row.description) |d| alloc.free(d);
    alloc.free(row.type);
    alloc.free(row.base_job_template);
    if (row.default_queue_id) |q| alloc.free(q);
}

/// Build a `WorkPoolRow` from a result row whose columns follow `select_cols`.
/// Every string column is copied with `alloc`; the caller owns the copies.
/// If any copy fails, the copies made so far are freed before the error is
/// returned, so nothing leaks on the error path.
fn rowFromResult(alloc: Allocator, r: anytype) !WorkPoolRow {
    const desc = r.textOrNull(Col.description);
    const queue_id = r.textOrNull(Col.default_queue_id);
    // The text accessor is used only as a NULL probe for the integer column.
    const concurrency_limit: ?i64 = if (r.textOrNull(Col.concurrency_limit) != null)
        r.bigint(Col.concurrency_limit)
    else
        null;

    const id = try alloc.dupe(u8, r.text(Col.id));
    errdefer alloc.free(id);
    const created = try alloc.dupe(u8, r.text(Col.created));
    errdefer alloc.free(created);
    const updated = try alloc.dupe(u8, r.text(Col.updated));
    errdefer alloc.free(updated);
    const name = try alloc.dupe(u8, r.text(Col.name));
    errdefer alloc.free(name);
    const description: ?[]const u8 = if (desc) |d| try alloc.dupe(u8, d) else null;
    errdefer if (description) |d| alloc.free(d);
    const pool_type = try alloc.dupe(u8, r.text(Col.type_));
    errdefer alloc.free(pool_type);
    const base_job_template = try alloc.dupe(u8, r.text(Col.base_job_template));
    errdefer alloc.free(base_job_template);
    // Last allocation: nothing below can fail, so it needs no errdefer.
    const default_queue_id: ?[]const u8 = if (queue_id) |q| try alloc.dupe(u8, q) else null;

    return WorkPoolRow{
        .id = id,
        .created = created,
        .updated = updated,
        .name = name,
        .description = description,
        .type = pool_type,
        .base_job_template = base_job_template,
        .is_paused = r.int(Col.is_paused) != 0,
        .concurrency_limit = concurrency_limit,
        .default_queue_id = default_queue_id,
        .status = Status.fromString(r.text(Col.status)),
    };
}

/// Shared implementation of `getById` / `getByName`: fetch the single row
/// where `column` equals `value`.
/// A database error is logged and reported as `null` (same result as
/// "not found"); allocation errors from copying the row are returned.
fn getWhere(comptime column: []const u8, alloc: Allocator, value: []const u8) !?WorkPoolRow {
    var r = backend.db.row(
        "SELECT " ++ select_cols ++ " FROM work_pool WHERE " ++ column ++ " = ?",
        .{value},
    ) catch |err| {
        log.err("database", "get work_pool by " ++ column ++ " error: {}", .{err});
        return null;
    };

    if (r) |*row| {
        defer row.deinit();
        return try rowFromResult(alloc, row);
    }
    return null;
}

/// Look up a work pool by id. Returns `null` when no row matches or when the
/// query fails (the failure is logged). Caller owns the returned strings.
pub fn getById(alloc: Allocator, id: []const u8) !?WorkPoolRow {
    return getWhere("id", alloc, id);
}

/// Look up a work pool by name. Returns `null` when no row matches or when
/// the query fails (the failure is logged). Caller owns the returned strings.
pub fn getByName(alloc: Allocator, name: []const u8) !?WorkPoolRow {
    return getWhere("name", alloc, name);
}

/// Insert a new work pool row. `created` is written to both the `created`
/// and `updated` columns. Database errors are logged and returned.
pub fn insert(
    id: []const u8,
    name: []const u8,
    description: ?[]const u8,
    pool_type: []const u8,
    base_job_template: []const u8,
    is_paused: bool,
    concurrency_limit: ?i64,
    default_queue_id: ?[]const u8,
    status: Status,
    created: []const u8,
) !void {
    backend.db.exec(
        "INSERT INTO work_pool (id, created, updated, name, description, type, base_job_template, is_paused, concurrency_limit, default_queue_id, status) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
        .{
            id,
            created,
            created,
            name,
            description,
            pool_type,
            base_job_template,
            @as(i32, if (is_paused) 1 else 0),
            concurrency_limit,
            default_queue_id,
            status.toString(),
        },
    ) catch |err| {
        log.err("database", "insert work_pool error: {}", .{err});
        return err;
    };
}

/// Partially update the work pool called `name`. A `null` argument leaves the
/// corresponding column unchanged (via COALESCE), which also means a column
/// cannot be reset to NULL through this function. `updated` is always written.
/// Returns true when at least one row was affected.
pub fn updateByName(
    name: []const u8,
    description: ?[]const u8,
    base_job_template: ?[]const u8,
    is_paused: ?bool,
    concurrency_limit: ?i64,
    status: ?Status,
    updated: []const u8,
) !bool {
    const affected = backend.db.execWithRowCount(
        "UPDATE work_pool SET description = COALESCE(?, description), base_job_template = COALESCE(?, base_job_template), is_paused = COALESCE(?, is_paused), concurrency_limit = COALESCE(?, concurrency_limit), status = COALESCE(?, status), updated = ? WHERE name = ?",
        .{
            description,
            base_job_template,
            if (is_paused) |p| @as(?i32, if (p) 1 else 0) else null,
            concurrency_limit,
            if (status) |s| s.toString() else null,
            updated,
            name,
        },
    ) catch |err| {
        log.err("database", "update work_pool error: {}", .{err});
        return err;
    };
    return affected > 0;
}

/// Set `default_queue_id` (and `updated`) on the work pool called `name`.
/// Returns true when at least one row was affected.
pub fn updateDefaultQueueId(name: []const u8, default_queue_id: []const u8, updated: []const u8) !bool {
    const affected = backend.db.execWithRowCount(
        "UPDATE work_pool SET default_queue_id = ?, updated = ? WHERE name = ?",
        .{ default_queue_id, updated, name },
    ) catch |err| {
        log.err("database", "update work_pool default_queue_id error: {}", .{err});
        return err;
    };
    return affected > 0;
}

/// Set `status` (and `updated`) on the work pool called `name`.
/// Returns true when at least one row was affected.
pub fn updateStatus(name: []const u8, status: Status, updated: []const u8) !bool {
    const affected = backend.db.execWithRowCount(
        "UPDATE work_pool SET status = ?, updated = ? WHERE name = ?",
        .{ status.toString(), updated, name },
    ) catch |err| {
        log.err("database", "update work_pool status error: {}", .{err});
        return err;
    };
    return affected > 0;
}

/// Delete the work pool called `name`.
/// Returns true when at least one row was affected.
pub fn deleteByName(name: []const u8) !bool {
    const affected = backend.db.execWithRowCount(
        "DELETE FROM work_pool WHERE name = ?",
        .{name},
    ) catch |err| {
        log.err("database", "delete work_pool error: {}", .{err});
        return err;
    };
    return affected > 0;
}

/// Return up to `limit` work pools ordered by name, skipping the first
/// `offset`. Caller owns the returned slice and the strings in every row.
/// On error, rows collected so far are freed before the error is returned.
pub fn list(alloc: Allocator, limit: usize, offset: usize) ![]WorkPoolRow {
    var results = std.ArrayListUnmanaged(WorkPoolRow){};
    errdefer {
        // Free the strings owned by each collected row, then the array itself.
        for (results.items) |item| freeRow(alloc, item);
        results.deinit(alloc);
    }

    var rows = backend.db.query(
        "SELECT " ++ select_cols ++ " FROM work_pool ORDER BY name ASC LIMIT ? OFFSET ?",
        .{ @as(i64, @intCast(limit)), @as(i64, @intCast(offset)) },
    ) catch |err| {
        log.err("database", "list work_pools error: {}", .{err});
        return err;
    };
    defer rows.deinit();

    while (rows.next()) |r| {
        const item = try rowFromResult(alloc, &r);
        // Until the append succeeds the list does not own `item`.
        errdefer freeRow(alloc, item);
        try results.append(alloc, item);
    }

    return results.toOwnedSlice(alloc);
}

/// Number of rows in `work_pool`. A failed query is logged and reported as 0.
pub fn count() !usize {
    var r = backend.db.row("SELECT COUNT(*) FROM work_pool", .{}) catch |err| {
        log.err("database", "count work_pools error: {}", .{err});
        return 0;
    };
    if (r) |*row| {
        defer row.deinit();
        return @intCast(row.bigint(0));
    }
    return 0;
}

/// Check if any online workers exist for a pool.
/// A failed query is logged and reported as `false`.
pub fn hasOnlineWorkers(pool_id: []const u8) !bool {
    var r = backend.db.row(
        "SELECT COUNT(*) FROM worker WHERE work_pool_id = ? AND status = 'ONLINE'",
        .{pool_id},
    ) catch |err| {
        log.err("database", "count online workers error: {}", .{err});
        return false;
    };
    if (r) |*row| {
        defer row.deinit();
        return row.bigint(0) > 0;
    }
    return false;
}