prefect server in zig
at main 194 lines 6.1 kB view raw
const std = @import("std");
const Allocator = std.mem.Allocator;

const backend = @import("backend.zig");
const log = @import("../logging.zig");

/// Worker status enum - matches Python's WorkerStatus
pub const Status = enum {
    online,
    offline,

    /// Parse the stored status text. Only the exact string "ONLINE" maps to
    /// `.online`; every other input (including unknown values) is `.offline`.
    pub fn fromString(s: []const u8) Status {
        if (std.mem.eql(u8, s, "ONLINE")) return .online;
        return .offline;
    }

    /// Return the upper-case text form used in the `status` column.
    pub fn toString(self: Status) []const u8 {
        return switch (self) {
            .online => "ONLINE",
            .offline => "OFFLINE",
        };
    }
};

/// One row of the `worker` table.
///
/// Rows produced by this module own their string fields: each one is a copy
/// made with the allocator passed to the producing function. Release them
/// with `deinit` using that same allocator.
pub const WorkerRow = struct {
    id: []const u8,
    created: []const u8,
    updated: []const u8,
    name: []const u8,
    work_pool_id: []const u8,
    last_heartbeat_time: ?[]const u8,
    heartbeat_interval_seconds: ?i64,
    status: Status,

    /// Free every string owned by this row. `alloc` must be the allocator
    /// that was used to build the row.
    pub fn deinit(self: WorkerRow, alloc: Allocator) void {
        alloc.free(self.id);
        alloc.free(self.created);
        alloc.free(self.updated);
        alloc.free(self.name);
        alloc.free(self.work_pool_id);
        if (self.last_heartbeat_time) |lh| alloc.free(lh);
    }
};

/// Column positions; must stay in the same order as `select_cols`.
const Col = struct {
    const id: usize = 0;
    const created: usize = 1;
    const updated: usize = 2;
    const name: usize = 3;
    const work_pool_id: usize = 4;
    const last_heartbeat_time: usize = 5;
    const heartbeat_interval_seconds: usize = 6;
    const status: usize = 7;
};

const select_cols = "id, created, updated, name, work_pool_id, last_heartbeat_time, heartbeat_interval_seconds, status";

/// Build an owned `WorkerRow` from a result row.
///
/// `r` is any row-like value exposing `text(idx)`, `textOrNull(idx)` and
/// `bigint(idx)`. All strings are duplicated with `alloc`, so the returned
/// row stays valid after `r` is released. If any duplication fails, the
/// strings already copied are freed before the error is returned.
fn rowFromResult(alloc: Allocator, r: anytype) !WorkerRow {
    const last_heartbeat = r.textOrNull(Col.last_heartbeat_time);
    // Only used as a NULL probe; the value itself is read with `bigint`.
    const interval = r.textOrNull(Col.heartbeat_interval_seconds);

    const id = try alloc.dupe(u8, r.text(Col.id));
    errdefer alloc.free(id);
    const created = try alloc.dupe(u8, r.text(Col.created));
    errdefer alloc.free(created);
    const updated = try alloc.dupe(u8, r.text(Col.updated));
    errdefer alloc.free(updated);
    const name = try alloc.dupe(u8, r.text(Col.name));
    errdefer alloc.free(name);
    const work_pool_id = try alloc.dupe(u8, r.text(Col.work_pool_id));
    errdefer alloc.free(work_pool_id);
    // Last fallible step: nothing after this line can return an error.
    const last_heartbeat_time: ?[]const u8 =
        if (last_heartbeat) |lh| try alloc.dupe(u8, lh) else null;

    return WorkerRow{
        .id = id,
        .created = created,
        .updated = updated,
        .name = name,
        .work_pool_id = work_pool_id,
        .last_heartbeat_time = last_heartbeat_time,
        .heartbeat_interval_seconds = if (interval != null) r.bigint(Col.heartbeat_interval_seconds) else null,
        .status = Status.fromString(r.text(Col.status)),
    };
}

/// Look up a worker by primary key.
///
/// Returns `null` when no row matches. A database error is logged and also
/// reported as `null` (best-effort lookup); only allocation failures while
/// copying the row are returned as errors. Caller owns the returned row.
pub fn getById(alloc: Allocator, id: []const u8) !?WorkerRow {
    var r = backend.db.row(
        "SELECT " ++ select_cols ++ " FROM worker WHERE id = ?",
        .{id},
    ) catch |err| {
        log.err("database", "get worker by id error: {}", .{err});
        return null;
    };

    if (r) |*row| {
        defer row.deinit();
        return try rowFromResult(alloc, row);
    }
    return null;
}

/// Look up a worker by its `(work_pool_id, name)` pair.
///
/// Returns `null` when no row matches. A database error is logged and also
/// reported as `null` (best-effort lookup); only allocation failures while
/// copying the row are returned as errors. Caller owns the returned row.
pub fn getByPoolAndName(alloc: Allocator, pool_id: []const u8, name: []const u8) !?WorkerRow {
    var r = backend.db.row(
        "SELECT " ++ select_cols ++ " FROM worker WHERE work_pool_id = ? AND name = ?",
        .{ pool_id, name },
    ) catch |err| {
        log.err("database", "get worker by pool and name error: {}", .{err});
        return null;
    };

    if (r) |*row| {
        defer row.deinit();
        return try rowFromResult(alloc, row);
    }
    return null;
}

/// Upsert worker heartbeat - inserts if not exists, updates if exists
/// This is the core worker registration mechanism
///
/// `now` is written to `created`, `updated` and `last_heartbeat_time` on
/// insert, and to `updated` and `last_heartbeat_time` on conflict. A null
/// `heartbeat_interval_seconds` leaves an existing row's interval unchanged
/// (see the COALESCE in the SQL). Database errors are logged and returned.
pub fn upsertHeartbeat(
    id: []const u8,
    name: []const u8,
    work_pool_id: []const u8,
    heartbeat_interval_seconds: ?i64,
    now: []const u8,
) !void {
    // Use dialect-specific upsert SQL
    const sql = switch (backend.db.dialect) {
        .sqlite => sqlite_upsert,
        .postgres => postgres_upsert,
    };

    // Ten positional binds: seven for the INSERT, three for the update clause.
    backend.db.exec(sql, .{
        id,
        now, // created
        now, // updated
        name,
        work_pool_id,
        now, // last_heartbeat_time
        heartbeat_interval_seconds,
        // For the ON CONFLICT update clause:
        now, // updated
        now, // last_heartbeat_time
        heartbeat_interval_seconds,
    }) catch |err| {
        log.err("database", "upsert worker heartbeat error: {}", .{err});
        return err;
    };
}

const sqlite_upsert =
    \\INSERT INTO worker (id, created, updated, name, work_pool_id, last_heartbeat_time, heartbeat_interval_seconds, status)
    \\VALUES (?, ?, ?, ?, ?, ?, ?, 'ONLINE')
    \\ON CONFLICT(work_pool_id, name) DO UPDATE SET
    \\ updated = ?,
    \\ last_heartbeat_time = ?,
    \\ heartbeat_interval_seconds = COALESCE(?, heartbeat_interval_seconds),
    \\ status = 'ONLINE'
;

const postgres_upsert =
    \\INSERT INTO worker (id, created, updated, name, work_pool_id, last_heartbeat_time, heartbeat_interval_seconds, status)
    \\VALUES ($1, $2, $3, $4, $5, $6, $7, 'ONLINE')
    \\ON CONFLICT(work_pool_id, name) DO UPDATE SET
    \\ updated = $8,
    \\ last_heartbeat_time = $9,
    \\ heartbeat_interval_seconds = COALESCE($10, worker.heartbeat_interval_seconds),
    \\ status = 'ONLINE'
;

/// Delete the worker identified by `(pool_id, name)`.
///
/// Returns `true` when at least one row was removed. Database errors are
/// logged and returned.
pub fn deleteByPoolAndName(pool_id: []const u8, name: []const u8) !bool {
    const affected = backend.db.execWithRowCount(
        "DELETE FROM worker WHERE work_pool_id = ? AND name = ?",
        .{ pool_id, name },
    ) catch |err| {
        log.err("database", "delete worker error: {}", .{err});
        return err;
    };
    return affected > 0;
}

/// List the workers of a pool, most recent heartbeat first (workers without
/// a heartbeat sort last), paginated by `limit` and `offset`.
///
/// Caller owns the returned slice and every row in it: call `deinit` on each
/// row, then free the slice, all with `alloc`. On error nothing is leaked:
/// rows collected so far are released before the error is returned.
pub fn listByPool(alloc: Allocator, pool_id: []const u8, limit: usize, offset: usize) ![]WorkerRow {
    var results = std.ArrayListUnmanaged(WorkerRow){};
    errdefer {
        for (results.items) |row| row.deinit(alloc);
        results.deinit(alloc);
    }

    // The query binds signed 64-bit integers; saturate instead of performing
    // an unchecked narrowing cast on out-of-range values.
    const sql_limit: i64 = std.math.cast(i64, limit) orelse std.math.maxInt(i64);
    const sql_offset: i64 = std.math.cast(i64, offset) orelse std.math.maxInt(i64);

    var rows = backend.db.query(
        "SELECT " ++ select_cols ++ " FROM worker WHERE work_pool_id = ? ORDER BY last_heartbeat_time DESC NULLS LAST LIMIT ? OFFSET ?",
        .{ pool_id, sql_limit, sql_offset },
    ) catch |err| {
        log.err("database", "list workers error: {}", .{err});
        return err;
    };
    defer rows.deinit();

    while (rows.next()) |r| {
        const row = try rowFromResult(alloc, &r);
        // Not yet in `results`, so the list-level errdefer cannot see it.
        errdefer row.deinit(alloc);
        try results.append(alloc, row);
    }

    return results.toOwnedSlice(alloc);
}

/// Count all workers in a pool.
///
/// Best-effort: a database error is logged and reported as 0, as is an
/// empty result.
pub fn countByPool(pool_id: []const u8) !usize {
    var r = backend.db.row("SELECT COUNT(*) FROM worker WHERE work_pool_id = ?", .{pool_id}) catch |err| {
        log.err("database", "count workers error: {}", .{err});
        return 0;
    };
    if (r) |*row| {
        defer row.deinit();
        return @intCast(row.bigint(0));
    }
    return 0;
}

/// Count the workers in a pool whose status is 'ONLINE'.
///
/// Best-effort: a database error is logged and reported as 0, as is an
/// empty result.
pub fn countOnlineByPool(pool_id: []const u8) !usize {
    var r = backend.db.row(
        "SELECT COUNT(*) FROM worker WHERE work_pool_id = ? AND status = 'ONLINE'",
        .{pool_id},
    ) catch |err| {
        log.err("database", "count online workers error: {}", .{err});
        return 0;
    };
    if (r) |*row| {
        defer row.deinit();
        return @intCast(row.bigint(0));
    }
    return 0;
}