prefect server in zig
1const std = @import("std");
2const Allocator = std.mem.Allocator;
3
4const backend = @import("backend.zig");
5const log = @import("../logging.zig");
6
/// Liveness state of a worker; mirrors Python's `WorkerStatus`.
///
/// The string forms are the upper-case spellings "ONLINE" / "OFFLINE".
pub const Status = enum {
    online,
    offline,

    /// Parses a status string.
    ///
    /// Only the exact, case-sensitive text "ONLINE" maps to `.online`; any
    /// other input (including unknown or empty strings) maps to `.offline`.
    pub fn fromString(s: []const u8) Status {
        return if (std.mem.eql(u8, s, "ONLINE")) .online else .offline;
    }

    /// Returns the upper-case string form of this status.
    pub fn toString(self: Status) []const u8 {
        switch (self) {
            .online => return "ONLINE",
            .offline => return "OFFLINE",
        }
    }
};
24
/// One row of the `worker` table, holding the columns named in `select_cols`.
///
/// Rows produced by this file's query functions own their string fields: each
/// one is a copy made with the allocator handed to the query function. Release
/// them with `deinit` using that same allocator, or let an arena reclaim them.
pub const WorkerRow = struct {
    /// Value of the `id` column.
    id: []const u8,
    /// Value of the `created` column, as text.
    created: []const u8,
    /// Value of the `updated` column, as text.
    updated: []const u8,
    /// Worker name; the upsert treats (`work_pool_id`, `name`) as the conflict key.
    name: []const u8,
    /// Value of the `work_pool_id` column.
    work_pool_id: []const u8,
    /// Text of the last heartbeat time, or null when the column is NULL.
    last_heartbeat_time: ?[]const u8,
    /// Heartbeat interval in seconds, or null when the column is NULL.
    heartbeat_interval_seconds: ?i64,
    /// Parsed value of the `status` column.
    status: Status,

    /// Frees every string field of a row whose strings were allocated with
    /// `alloc`. Do not call this on a row whose fields borrow memory owned
    /// elsewhere. The row must not be used afterwards.
    pub fn deinit(self: WorkerRow, alloc: Allocator) void {
        alloc.free(self.id);
        alloc.free(self.created);
        alloc.free(self.updated);
        alloc.free(self.name);
        alloc.free(self.work_pool_id);
        if (self.last_heartbeat_time) |heartbeat| alloc.free(heartbeat);
    }
};
35
/// Zero-based position of each column in a result row selected with
/// `select_cols`. The indices must stay in the same order as that list.
const Col = struct {
    const id: usize = 0;
    const created: usize = 1;
    const updated: usize = 2;
    const name: usize = 3;
    const work_pool_id: usize = 4;
    const last_heartbeat_time: usize = 5;
    const heartbeat_interval_seconds: usize = 6;
    const status: usize = 7;
};

/// Column list spliced into every `SELECT` in this file. Its order defines the
/// indices in `Col`, so change both together.
const select_cols = "id, created, updated, name, work_pool_id, " ++
    "last_heartbeat_time, heartbeat_interval_seconds, status";
48
/// Builds a `WorkerRow` from a result row whose columns follow `select_cols`.
///
/// Every string column is copied with `alloc`, so the returned row does not
/// reference the result row's memory; the caller owns those copies.
///
/// `r` must provide `text(idx)`, `textOrNull(idx)` and `bigint(idx)`.
///
/// Returns an allocation error if any copy fails. Copies made before the
/// failing one are released, so nothing leaks on the error path.
fn rowFromResult(alloc: Allocator, r: anytype) !WorkerRow {
    const last_heartbeat = r.textOrNull(Col.last_heartbeat_time);
    // The interval is read as an integer, but NULL is detected through the
    // text accessor because that is the nullable read used here.
    const interval = r.textOrNull(Col.heartbeat_interval_seconds);

    const id = try alloc.dupe(u8, r.text(Col.id));
    errdefer alloc.free(id);

    const created = try alloc.dupe(u8, r.text(Col.created));
    errdefer alloc.free(created);

    const updated = try alloc.dupe(u8, r.text(Col.updated));
    errdefer alloc.free(updated);

    const name = try alloc.dupe(u8, r.text(Col.name));
    errdefer alloc.free(name);

    const work_pool_id = try alloc.dupe(u8, r.text(Col.work_pool_id));
    errdefer alloc.free(work_pool_id);

    // Last fallible step: nothing after it can fail, so it needs no errdefer.
    const last_heartbeat_time: ?[]const u8 =
        if (last_heartbeat) |lh| try alloc.dupe(u8, lh) else null;

    return WorkerRow{
        .id = id,
        .created = created,
        .updated = updated,
        .name = name,
        .work_pool_id = work_pool_id,
        .last_heartbeat_time = last_heartbeat_time,
        .heartbeat_interval_seconds = if (interval != null) r.bigint(Col.heartbeat_interval_seconds) else null,
        .status = Status.fromString(r.text(Col.status)),
    };
}
64
/// Fetches the worker whose `id` column equals `id`.
///
/// Returns null when no row matches. A failed query is logged and also
/// reported as null, so callers see the same result as before; the log entry
/// is what distinguishes "database error" from "not found".
///
/// Returns an allocation error if copying the row's strings with `alloc`
/// fails. The caller owns the strings in the returned row.
pub fn getById(alloc: Allocator, id: []const u8) !?WorkerRow {
    var r = backend.db.row(
        "SELECT " ++ select_cols ++ " FROM worker WHERE id = ?",
        .{id},
    ) catch |err| {
        log.err("database", "get worker by id error: {}", .{err});
        return null;
    };

    if (r) |*row| {
        defer row.deinit();
        return try rowFromResult(alloc, row);
    }
    return null;
}
77
/// Fetches the worker with the given `name` inside work pool `pool_id`.
///
/// Returns null when no row matches. A failed query is logged and also
/// reported as null, so callers see the same result as before; the log entry
/// is what distinguishes "database error" from "not found".
///
/// Returns an allocation error if copying the row's strings with `alloc`
/// fails. The caller owns the strings in the returned row.
pub fn getByPoolAndName(alloc: Allocator, pool_id: []const u8, name: []const u8) !?WorkerRow {
    var r = backend.db.row(
        "SELECT " ++ select_cols ++ " FROM worker WHERE work_pool_id = ? AND name = ?",
        .{ pool_id, name },
    ) catch |err| {
        log.err("database", "get worker by pool and name error: {}", .{err});
        return null;
    };

    if (r) |*row| {
        defer row.deinit();
        return try rowFromResult(alloc, row);
    }
    return null;
}
90
/// Records a worker heartbeat; this is how workers get registered.
///
/// Inserts a new `worker` row with status 'ONLINE'. If a row with the same
/// (`work_pool_id`, `name`) already exists, that row is updated instead:
/// `updated` and `last_heartbeat_time` become `now`, status becomes 'ONLINE',
/// and `heartbeat_interval_seconds` is replaced only when a non-null value is
/// supplied. The existing row's `id` and `created` are left untouched.
///
/// `now` is used for `created`, `updated` and `last_heartbeat_time`.
/// Database errors are logged and then returned to the caller.
pub fn upsertHeartbeat(
    id: []const u8,
    name: []const u8,
    work_pool_id: []const u8,
    heartbeat_interval_seconds: ?i64,
    now: []const u8,
) !void {
    // Same statement for both backends; only the placeholder syntax differs.
    const statement = switch (backend.db.dialect) {
        .postgres => postgres_upsert,
        .sqlite => sqlite_upsert,
    };

    // Bind order follows the ten placeholders of the upsert statement.
    const params = .{
        // INSERT ... VALUES
        id,
        now, // created
        now, // updated
        name,
        work_pool_id,
        now, // last_heartbeat_time
        heartbeat_interval_seconds,
        // ON CONFLICT ... DO UPDATE SET
        now, // updated
        now, // last_heartbeat_time
        heartbeat_interval_seconds,
    };

    backend.db.exec(statement, params) catch |err| {
        log.err("database", "upsert worker heartbeat error: {}", .{err});
        return err;
    };
}
123
/// SQLite form of the worker upsert, with ten positional `?` placeholders:
/// seven for the inserted values, then three for the conflict update.
/// A NULL interval on update keeps the stored interval (via COALESCE).
const sqlite_upsert =
    \\INSERT INTO worker (id, created, updated, name, work_pool_id, last_heartbeat_time, heartbeat_interval_seconds, status)
    \\VALUES (?, ?, ?, ?, ?, ?, ?, 'ONLINE')
    \\ON CONFLICT(work_pool_id, name) DO UPDATE SET
    \\ updated = ?,
    \\ last_heartbeat_time = ?,
    \\ heartbeat_interval_seconds = COALESCE(?, heartbeat_interval_seconds),
    \\ status = 'ONLINE'
;
133
/// PostgreSQL form of the worker upsert, with numbered placeholders `$1`..`$10`:
/// `$1`-`$7` for the inserted values, `$8`-`$10` for the conflict update.
/// A NULL interval on update keeps the stored interval (via COALESCE); the
/// existing value is referenced through the table name `worker`.
const postgres_upsert =
    \\INSERT INTO worker (id, created, updated, name, work_pool_id, last_heartbeat_time, heartbeat_interval_seconds, status)
    \\VALUES ($1, $2, $3, $4, $5, $6, $7, 'ONLINE')
    \\ON CONFLICT(work_pool_id, name) DO UPDATE SET
    \\ updated = $8,
    \\ last_heartbeat_time = $9,
    \\ heartbeat_interval_seconds = COALESCE($10, worker.heartbeat_interval_seconds),
    \\ status = 'ONLINE'
;
143
/// Deletes the worker called `name` from work pool `pool_id`.
///
/// Returns true when at least one row was removed and false when nothing
/// matched. Database errors are logged and then returned to the caller.
pub fn deleteByPoolAndName(pool_id: []const u8, name: []const u8) !bool {
    const sql = "DELETE FROM worker WHERE work_pool_id = ? AND name = ?";

    if (backend.db.execWithRowCount(sql, .{ pool_id, name })) |removed| {
        return removed > 0;
    } else |err| {
        log.err("database", "delete worker error: {}", .{err});
        return err;
    }
}
154
/// Lists the workers of work pool `pool_id`, most recent heartbeat first
/// (rows without a heartbeat come last), returning at most `limit` rows after
/// skipping `offset`.
///
/// The caller owns the returned slice and the strings inside every row; all of
/// them are allocated with `alloc`.
///
/// Query errors are logged and returned. On any error, rows that were already
/// copied are released before returning, so nothing allocated here leaks.
pub fn listByPool(alloc: Allocator, pool_id: []const u8, limit: usize, offset: usize) ![]WorkerRow {
    var results = std.ArrayListUnmanaged(WorkerRow){};
    errdefer {
        // Collected rows own copies of their strings; free those before the
        // list buffer itself.
        for (results.items) |row| {
            alloc.free(row.id);
            alloc.free(row.created);
            alloc.free(row.updated);
            alloc.free(row.name);
            alloc.free(row.work_pool_id);
            if (row.last_heartbeat_time) |lh| alloc.free(lh);
        }
        results.deinit(alloc);
    }

    var rows = backend.db.query(
        "SELECT " ++ select_cols ++ " FROM worker WHERE work_pool_id = ? ORDER BY last_heartbeat_time DESC NULLS LAST LIMIT ? OFFSET ?",
        .{ pool_id, @as(i64, @intCast(limit)), @as(i64, @intCast(offset)) },
    ) catch |err| {
        log.err("database", "list workers error: {}", .{err});
        return err;
    };
    defer rows.deinit();

    while (rows.next()) |r| {
        // Reserve the slot before building the row, so a failed list growth
        // can never strand a freshly copied row outside the list.
        try results.ensureUnusedCapacity(alloc, 1);
        results.appendAssumeCapacity(try rowFromResult(alloc, &r));
    }

    return results.toOwnedSlice(alloc);
}
174
/// Counts the workers registered in work pool `pool_id`.
///
/// Returns 0 when the query yields no row. A failed query is logged and also
/// reported as 0, so callers see the same result as before; the log entry is
/// what distinguishes "database error" from "no workers".
pub fn countByPool(pool_id: []const u8) !usize {
    var r = backend.db.row("SELECT COUNT(*) FROM worker WHERE work_pool_id = ?", .{pool_id}) catch |err| {
        log.err("database", "count workers error: {}", .{err});
        return 0;
    };
    if (r) |*row| {
        defer row.deinit();
        // COUNT(*) is never negative, so the cast to usize cannot go out of range.
        return @intCast(row.bigint(0));
    }
    return 0;
}
183
/// Counts the workers in work pool `pool_id` whose status is 'ONLINE'.
///
/// Returns 0 when the query yields no row. A failed query is logged and also
/// reported as 0, so callers see the same result as before; the log entry is
/// what distinguishes "database error" from "no online workers".
pub fn countOnlineByPool(pool_id: []const u8) !usize {
    var r = backend.db.row(
        "SELECT COUNT(*) FROM worker WHERE work_pool_id = ? AND status = 'ONLINE'",
        .{pool_id},
    ) catch |err| {
        log.err("database", "count online workers error: {}", .{err});
        return 0;
    };
    if (r) |*row| {
        defer row.deinit();
        // COUNT(*) is never negative, so the cast to usize cannot go out of range.
        return @intCast(row.bigint(0));
    }
    return 0;
}