prefect server in zig
1const std = @import("std");
2const Allocator = std.mem.Allocator;
3
4const backend = @import("backend.zig");
5const log = @import("../logging.zig");
6
7pub const ConcurrencyLimitRow = struct {
8 id: []const u8,
9 created: []const u8,
10 updated: []const u8,
11 name: []const u8,
12 limit: i64,
13 active: bool,
14 active_slots: i64,
15 denied_slots: i64,
16 slot_decay_per_second: f64,
17 avg_slot_occupancy_seconds: f64,
18};
19
20const Col = struct {
21 const id: usize = 0;
22 const created: usize = 1;
23 const updated: usize = 2;
24 const name: usize = 3;
25 const limit: usize = 4;
26 const active: usize = 5;
27 const active_slots: usize = 6;
28 const denied_slots: usize = 7;
29 const slot_decay_per_second: usize = 8;
30 const avg_slot_occupancy_seconds: usize = 9;
31};
32
33// Note: "limit" is a reserved keyword, must be quoted
34const select_cols = "id, created, updated, name, \"limit\", active, active_slots, denied_slots, slot_decay_per_second, avg_slot_occupancy_seconds";
35
36fn rowFromResult(alloc: Allocator, r: anytype) !ConcurrencyLimitRow {
37 return ConcurrencyLimitRow{
38 .id = try alloc.dupe(u8, r.text(Col.id)),
39 .created = try alloc.dupe(u8, r.text(Col.created)),
40 .updated = try alloc.dupe(u8, r.text(Col.updated)),
41 .name = try alloc.dupe(u8, r.text(Col.name)),
42 .limit = r.bigint(Col.limit),
43 .active = r.bigint(Col.active) != 0,
44 .active_slots = r.bigint(Col.active_slots),
45 .denied_slots = r.bigint(Col.denied_slots),
46 .slot_decay_per_second = r.float(Col.slot_decay_per_second),
47 .avg_slot_occupancy_seconds = r.float(Col.avg_slot_occupancy_seconds),
48 };
49}
50
51pub fn getById(alloc: Allocator, id: []const u8) !?ConcurrencyLimitRow {
52 var r = backend.db.row(
53 "SELECT " ++ select_cols ++ " FROM concurrency_limit WHERE id = ?",
54 .{id},
55 ) catch return null;
56
57 if (r) |*row| {
58 defer row.deinit();
59 return try rowFromResult(alloc, row);
60 }
61 return null;
62}
63
64pub fn getByName(alloc: Allocator, name: []const u8) !?ConcurrencyLimitRow {
65 var r = backend.db.row(
66 "SELECT " ++ select_cols ++ " FROM concurrency_limit WHERE name = ?",
67 .{name},
68 ) catch return null;
69
70 if (r) |*row| {
71 defer row.deinit();
72 return try rowFromResult(alloc, row);
73 }
74 return null;
75}
76
77pub fn insert(
78 id: []const u8,
79 name: []const u8,
80 limit: i64,
81 active: bool,
82 active_slots: i64,
83 denied_slots: i64,
84 slot_decay_per_second: f64,
85 avg_slot_occupancy_seconds: f64,
86 created: []const u8,
87) !void {
88 backend.db.exec(
89 "INSERT INTO concurrency_limit (id, name, \"limit\", active, active_slots, denied_slots, slot_decay_per_second, avg_slot_occupancy_seconds, created, updated) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
90 .{ id, name, limit, @as(i64, if (active) 1 else 0), active_slots, denied_slots, slot_decay_per_second, avg_slot_occupancy_seconds, created, created },
91 ) catch |err| {
92 log.err("database", "insert concurrency_limit error: {}", .{err});
93 return err;
94 };
95}
96
97pub fn updateById(
98 id: []const u8,
99 name: ?[]const u8,
100 limit: ?i64,
101 active: ?bool,
102 active_slots: ?i64,
103 denied_slots: ?i64,
104 slot_decay_per_second: ?f64,
105 avg_slot_occupancy_seconds: ?f64,
106 updated: []const u8,
107) !bool {
108 const active_int: ?i64 = if (active) |a| @as(i64, if (a) 1 else 0) else null;
109 const affected = backend.db.execWithRowCount(
110 "UPDATE concurrency_limit SET name = COALESCE(?, name), \"limit\" = COALESCE(?, \"limit\"), active = COALESCE(?, active), active_slots = COALESCE(?, active_slots), denied_slots = COALESCE(?, denied_slots), slot_decay_per_second = COALESCE(?, slot_decay_per_second), avg_slot_occupancy_seconds = COALESCE(?, avg_slot_occupancy_seconds), updated = ? WHERE id = ?",
111 .{ name, limit, active_int, active_slots, denied_slots, slot_decay_per_second, avg_slot_occupancy_seconds, updated, id },
112 ) catch |err| {
113 log.err("database", "update concurrency_limit error: {}", .{err});
114 return err;
115 };
116 return affected > 0;
117}
118
119pub fn updateByName(
120 name: []const u8,
121 new_name: ?[]const u8,
122 limit: ?i64,
123 active: ?bool,
124 active_slots: ?i64,
125 denied_slots: ?i64,
126 slot_decay_per_second: ?f64,
127 avg_slot_occupancy_seconds: ?f64,
128 updated: []const u8,
129) !bool {
130 const active_int: ?i64 = if (active) |a| @as(i64, if (a) 1 else 0) else null;
131 const affected = backend.db.execWithRowCount(
132 "UPDATE concurrency_limit SET name = COALESCE(?, name), \"limit\" = COALESCE(?, \"limit\"), active = COALESCE(?, active), active_slots = COALESCE(?, active_slots), denied_slots = COALESCE(?, denied_slots), slot_decay_per_second = COALESCE(?, slot_decay_per_second), avg_slot_occupancy_seconds = COALESCE(?, avg_slot_occupancy_seconds), updated = ? WHERE name = ?",
133 .{ new_name, limit, active_int, active_slots, denied_slots, slot_decay_per_second, avg_slot_occupancy_seconds, updated, name },
134 ) catch |err| {
135 log.err("database", "update concurrency_limit error: {}", .{err});
136 return err;
137 };
138 return affected > 0;
139}
140
141/// Increment active_slots counter atomically
142pub fn incrementActiveSlots(name: []const u8, slots: i64, updated: []const u8) !bool {
143 const affected = backend.db.execWithRowCount(
144 "UPDATE concurrency_limit SET active_slots = active_slots + ?, updated = ? WHERE name = ?",
145 .{ slots, updated, name },
146 ) catch |err| {
147 log.err("database", "increment active_slots error: {}", .{err});
148 return err;
149 };
150 return affected > 0;
151}
152
153/// Decrement active_slots counter atomically (won't go below 0)
154pub fn decrementActiveSlots(name: []const u8, slots: i64, updated: []const u8) !bool {
155 const affected = backend.db.execWithRowCount(
156 "UPDATE concurrency_limit SET active_slots = MAX(0, active_slots - ?), updated = ? WHERE name = ?",
157 .{ slots, updated, name },
158 ) catch |err| {
159 log.err("database", "decrement active_slots error: {}", .{err});
160 return err;
161 };
162 return affected > 0;
163}
164
165/// Increment denied_slots counter atomically
166pub fn incrementDeniedSlots(name: []const u8, slots: i64, updated: []const u8) !bool {
167 const affected = backend.db.execWithRowCount(
168 "UPDATE concurrency_limit SET denied_slots = denied_slots + ?, updated = ? WHERE name = ?",
169 .{ slots, updated, name },
170 ) catch |err| {
171 log.err("database", "increment denied_slots error: {}", .{err});
172 return err;
173 };
174 return affected > 0;
175}
176
177pub fn deleteById(id: []const u8) !bool {
178 const affected = backend.db.execWithRowCount(
179 "DELETE FROM concurrency_limit WHERE id = ?",
180 .{id},
181 ) catch |err| {
182 log.err("database", "delete concurrency_limit error: {}", .{err});
183 return err;
184 };
185 return affected > 0;
186}
187
188pub fn deleteByName(name: []const u8) !bool {
189 const affected = backend.db.execWithRowCount(
190 "DELETE FROM concurrency_limit WHERE name = ?",
191 .{name},
192 ) catch |err| {
193 log.err("database", "delete concurrency_limit error: {}", .{err});
194 return err;
195 };
196 return affected > 0;
197}
198
199pub fn list(alloc: Allocator, limit: usize, offset: usize) ![]ConcurrencyLimitRow {
200 var results = std.ArrayListUnmanaged(ConcurrencyLimitRow){};
201 errdefer results.deinit(alloc);
202
203 var rows = backend.db.query(
204 "SELECT " ++ select_cols ++ " FROM concurrency_limit ORDER BY name ASC LIMIT ? OFFSET ?",
205 .{ @as(i64, @intCast(limit)), @as(i64, @intCast(offset)) },
206 ) catch |err| {
207 log.err("database", "list concurrency_limits error: {}", .{err});
208 return err;
209 };
210 defer rows.deinit();
211
212 while (rows.next()) |r| {
213 try results.append(alloc, try rowFromResult(alloc, &r));
214 }
215
216 return results.toOwnedSlice(alloc);
217}
218
219pub fn count() !usize {
220 var r = backend.db.row("SELECT COUNT(*) FROM concurrency_limit", .{}) catch return 0;
221 if (r) |*row| {
222 defer row.deinit();
223 return @intCast(row.bigint(0));
224 }
225 return 0;
226}
227
228/// Try to atomically increment active_slots if there's room under the limit.
229/// Returns true if slots were acquired, false if limit would be exceeded.
230pub fn tryAcquireSlots(id: []const u8, slots: i64, updated: []const u8) !bool {
231 // Atomically increment only if active_slots + slots <= limit
232 const affected = backend.db.execWithRowCount(
233 "UPDATE concurrency_limit SET active_slots = active_slots + ?, updated = ? WHERE id = ? AND active = 1 AND active_slots + ? <= \"limit\"",
234 .{ slots, updated, id, slots },
235 ) catch |err| {
236 log.err("database", "try acquire slots error: {}", .{err});
237 return err;
238 };
239 return affected > 0;
240}
241
242/// Release slots and optionally update the average occupancy tracking.
243/// occupancy_seconds is the total time the slots were held.
244pub fn releaseSlots(id: []const u8, slots: i64, occupancy_seconds: ?f64, updated: []const u8) !bool {
245 if (occupancy_seconds) |occ| {
246 // Update avg_slot_occupancy_seconds as a weighted average
247 // Formula: new_avg = old_avg + (sample - old_avg) / (limit * 2)
248 const occupancy_per_slot = @max(occ / @as(f64, @floatFromInt(slots)), 0.1);
249 const affected = backend.db.execWithRowCount(
250 "UPDATE concurrency_limit SET active_slots = MAX(0, active_slots - ?), avg_slot_occupancy_seconds = avg_slot_occupancy_seconds + (? - avg_slot_occupancy_seconds) / (\"limit\" * 2), updated = ? WHERE id = ? AND active = 1",
251 .{ slots, occupancy_per_slot, updated, id },
252 ) catch |err| {
253 log.err("database", "release slots error: {}", .{err});
254 return err;
255 };
256 return affected > 0;
257 } else {
258 const affected = backend.db.execWithRowCount(
259 "UPDATE concurrency_limit SET active_slots = MAX(0, active_slots - ?), updated = ? WHERE id = ? AND active = 1",
260 .{ slots, updated, id },
261 ) catch |err| {
262 log.err("database", "release slots error: {}", .{err});
263 return err;
264 };
265 return affected > 0;
266 }
267}
268
269/// Increment denied_slots counter (called when slots are denied)
270pub fn recordDenied(id: []const u8, slots: i64, updated: []const u8) !bool {
271 const affected = backend.db.execWithRowCount(
272 "UPDATE concurrency_limit SET denied_slots = denied_slots + ?, updated = ? WHERE id = ? AND active = 1",
273 .{ slots, updated, id },
274 ) catch |err| {
275 log.err("database", "record denied error: {}", .{err});
276 return err;
277 };
278 return affected > 0;
279}