// prefect server in zig
// Source listing captured at main (279 lines, 11 kB in the original view).
const std = @import("std");
const Allocator = std.mem.Allocator;

const backend = @import("backend.zig");
const log = @import("../logging.zig");

/// One row of the `concurrency_limit` table.
///
/// Rows returned by `getById`, `getByName` and `list` hold copies of the
/// string columns (`id`, `created`, `updated`, `name`) made with the
/// allocator passed to those functions; the caller owns that memory.
pub const ConcurrencyLimitRow = struct {
    id: []const u8,
    created: []const u8,
    updated: []const u8,
    name: []const u8,
    limit: i64,
    active: bool,
    active_slots: i64,
    denied_slots: i64,
    slot_decay_per_second: f64,
    avg_slot_occupancy_seconds: f64,
};

/// Column indices of a result row; must stay in the same order as `select_cols`.
const Col = struct {
    const id: usize = 0;
    const created: usize = 1;
    const updated: usize = 2;
    const name: usize = 3;
    const limit: usize = 4;
    const active: usize = 5;
    const active_slots: usize = 6;
    const denied_slots: usize = 7;
    const slot_decay_per_second: usize = 8;
    const avg_slot_occupancy_seconds: usize = 9;
};

// Note: "limit" is a reserved keyword, must be quoted
const select_cols = "id, created, updated, name, \"limit\", active, active_slots, denied_slots, slot_decay_per_second, avg_slot_occupancy_seconds";

/// Release the four string fields of a row built by `rowFromResult`.
fn freeRow(alloc: Allocator, row: ConcurrencyLimitRow) void {
    alloc.free(row.id);
    alloc.free(row.created);
    alloc.free(row.updated);
    alloc.free(row.name);
}

/// Build a `ConcurrencyLimitRow` from a result row selected with `select_cols`.
///
/// The string columns are duplicated with `alloc`. If any duplication fails,
/// the strings duplicated so far are freed before the error is returned.
fn rowFromResult(alloc: Allocator, r: anytype) !ConcurrencyLimitRow {
    const id = try alloc.dupe(u8, r.text(Col.id));
    errdefer alloc.free(id);
    const created = try alloc.dupe(u8, r.text(Col.created));
    errdefer alloc.free(created);
    const updated = try alloc.dupe(u8, r.text(Col.updated));
    errdefer alloc.free(updated);
    // Last fallible step: nothing after it can fail, so no errdefer is needed.
    const name = try alloc.dupe(u8, r.text(Col.name));

    return ConcurrencyLimitRow{
        .id = id,
        .created = created,
        .updated = updated,
        .name = name,
        .limit = r.bigint(Col.limit),
        // `active` is stored as an integer; any non-zero value means true.
        .active = r.bigint(Col.active) != 0,
        .active_slots = r.bigint(Col.active_slots),
        .denied_slots = r.bigint(Col.denied_slots),
        .slot_decay_per_second = r.float(Col.slot_decay_per_second),
        .avg_slot_occupancy_seconds = r.float(Col.avg_slot_occupancy_seconds),
    };
}

/// Fetch the concurrency limit with the given `id`.
///
/// Returns `null` when no row matches. A query failure is logged and
/// returned as an error rather than being reported as "not found".
/// The caller owns the string fields of the returned row.
pub fn getById(alloc: Allocator, id: []const u8) !?ConcurrencyLimitRow {
    var r = backend.db.row(
        "SELECT " ++ select_cols ++ " FROM concurrency_limit WHERE id = ?",
        .{id},
    ) catch |err| {
        log.err("database", "get concurrency_limit by id error: {}", .{err});
        return err;
    };

    if (r) |*row| {
        defer row.deinit();
        return try rowFromResult(alloc, row);
    }
    return null;
}

/// Fetch the concurrency limit with the given `name`.
///
/// Returns `null` when no row matches. A query failure is logged and
/// returned as an error rather than being reported as "not found".
/// The caller owns the string fields of the returned row.
pub fn getByName(alloc: Allocator, name: []const u8) !?ConcurrencyLimitRow {
    var r = backend.db.row(
        "SELECT " ++ select_cols ++ " FROM concurrency_limit WHERE name = ?",
        .{name},
    ) catch |err| {
        log.err("database", "get concurrency_limit by name error: {}", .{err});
        return err;
    };

    if (r) |*row| {
        defer row.deinit();
        return try rowFromResult(alloc, row);
    }
    return null;
}

/// Insert a new concurrency limit row.
///
/// `created` is written to both the `created` and `updated` columns.
/// `active` is stored as the integer 1 or 0. Failures are logged and returned.
pub fn insert(
    id: []const u8,
    name: []const u8,
    limit: i64,
    active: bool,
    active_slots: i64,
    denied_slots: i64,
    slot_decay_per_second: f64,
    avg_slot_occupancy_seconds: f64,
    created: []const u8,
) !void {
    backend.db.exec(
        "INSERT INTO concurrency_limit (id, name, \"limit\", active, active_slots, denied_slots, slot_decay_per_second, avg_slot_occupancy_seconds, created, updated) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
        .{ id, name, limit, @as(i64, if (active) 1 else 0), active_slots, denied_slots, slot_decay_per_second, avg_slot_occupancy_seconds, created, created },
    ) catch |err| {
        log.err("database", "insert concurrency_limit error: {}", .{err});
        return err;
    };
}

/// Partially update the row identified by `id`.
///
/// Every optional argument that is `null` leaves its column unchanged
/// (the statement uses `COALESCE(?, column)`); `updated` is always written.
/// Returns true when at least one row was affected.
pub fn updateById(
    id: []const u8,
    name: ?[]const u8,
    limit: ?i64,
    active: ?bool,
    active_slots: ?i64,
    denied_slots: ?i64,
    slot_decay_per_second: ?f64,
    avg_slot_occupancy_seconds: ?f64,
    updated: []const u8,
) !bool {
    // Booleans are stored as integers; keep null as null so COALESCE skips it.
    const active_int: ?i64 = if (active) |a| @as(i64, if (a) 1 else 0) else null;
    const affected = backend.db.execWithRowCount(
        "UPDATE concurrency_limit SET name = COALESCE(?, name), \"limit\" = COALESCE(?, \"limit\"), active = COALESCE(?, active), active_slots = COALESCE(?, active_slots), denied_slots = COALESCE(?, denied_slots), slot_decay_per_second = COALESCE(?, slot_decay_per_second), avg_slot_occupancy_seconds = COALESCE(?, avg_slot_occupancy_seconds), updated = ? WHERE id = ?",
        .{ name, limit, active_int, active_slots, denied_slots, slot_decay_per_second, avg_slot_occupancy_seconds, updated, id },
    ) catch |err| {
        log.err("database", "update concurrency_limit error: {}", .{err});
        return err;
    };
    return affected > 0;
}

/// Partially update the row identified by `name`.
///
/// `new_name`, when non-null, replaces the name. Every optional argument
/// that is `null` leaves its column unchanged; `updated` is always written.
/// Returns true when at least one row was affected.
pub fn updateByName(
    name: []const u8,
    new_name: ?[]const u8,
    limit: ?i64,
    active: ?bool,
    active_slots: ?i64,
    denied_slots: ?i64,
    slot_decay_per_second: ?f64,
    avg_slot_occupancy_seconds: ?f64,
    updated: []const u8,
) !bool {
    // Booleans are stored as integers; keep null as null so COALESCE skips it.
    const active_int: ?i64 = if (active) |a| @as(i64, if (a) 1 else 0) else null;
    const affected = backend.db.execWithRowCount(
        "UPDATE concurrency_limit SET name = COALESCE(?, name), \"limit\" = COALESCE(?, \"limit\"), active = COALESCE(?, active), active_slots = COALESCE(?, active_slots), denied_slots = COALESCE(?, denied_slots), slot_decay_per_second = COALESCE(?, slot_decay_per_second), avg_slot_occupancy_seconds = COALESCE(?, avg_slot_occupancy_seconds), updated = ? WHERE name = ?",
        .{ new_name, limit, active_int, active_slots, denied_slots, slot_decay_per_second, avg_slot_occupancy_seconds, updated, name },
    ) catch |err| {
        log.err("database", "update concurrency_limit error: {}", .{err});
        return err;
    };
    return affected > 0;
}

/// Increment active_slots counter atomically
/// (single UPDATE statement, no limit check). Returns true when a row matched `name`.
pub fn incrementActiveSlots(name: []const u8, slots: i64, updated: []const u8) !bool {
    const affected = backend.db.execWithRowCount(
        "UPDATE concurrency_limit SET active_slots = active_slots + ?, updated = ? WHERE name = ?",
        .{ slots, updated, name },
    ) catch |err| {
        log.err("database", "increment active_slots error: {}", .{err});
        return err;
    };
    return affected > 0;
}

/// Decrement active_slots counter atomically (won't go below 0)
/// Returns true when a row matched `name`.
pub fn decrementActiveSlots(name: []const u8, slots: i64, updated: []const u8) !bool {
    const affected = backend.db.execWithRowCount(
        "UPDATE concurrency_limit SET active_slots = MAX(0, active_slots - ?), updated = ? WHERE name = ?",
        .{ slots, updated, name },
    ) catch |err| {
        log.err("database", "decrement active_slots error: {}", .{err});
        return err;
    };
    return affected > 0;
}

/// Increment denied_slots counter atomically
/// Returns true when a row matched `name`.
pub fn incrementDeniedSlots(name: []const u8, slots: i64, updated: []const u8) !bool {
    const affected = backend.db.execWithRowCount(
        "UPDATE concurrency_limit SET denied_slots = denied_slots + ?, updated = ? WHERE name = ?",
        .{ slots, updated, name },
    ) catch |err| {
        log.err("database", "increment denied_slots error: {}", .{err});
        return err;
    };
    return affected > 0;
}

/// Delete the row with the given `id`. Returns true when a row was deleted.
pub fn deleteById(id: []const u8) !bool {
    const affected = backend.db.execWithRowCount(
        "DELETE FROM concurrency_limit WHERE id = ?",
        .{id},
    ) catch |err| {
        log.err("database", "delete concurrency_limit error: {}", .{err});
        return err;
    };
    return affected > 0;
}

/// Delete the row with the given `name`. Returns true when a row was deleted.
pub fn deleteByName(name: []const u8) !bool {
    const affected = backend.db.execWithRowCount(
        "DELETE FROM concurrency_limit WHERE name = ?",
        .{name},
    ) catch |err| {
        log.err("database", "delete concurrency_limit error: {}", .{err});
        return err;
    };
    return affected > 0;
}

/// Return one page of concurrency limits ordered by name.
///
/// The caller owns the returned slice and the string fields of every row in
/// it. On error, everything allocated by this call is freed before returning.
pub fn list(alloc: Allocator, limit: usize, offset: usize) ![]ConcurrencyLimitRow {
    var results = std.ArrayListUnmanaged(ConcurrencyLimitRow){};
    errdefer {
        // Rows already appended own duplicated strings; free them too.
        for (results.items) |row| freeRow(alloc, row);
        results.deinit(alloc);
    }

    // The query binds signed 64-bit integers. Saturate instead of using
    // @intCast, which is illegal behavior for values above maxInt(i64).
    const limit_i64: i64 = std.math.cast(i64, limit) orelse std.math.maxInt(i64);
    const offset_i64: i64 = std.math.cast(i64, offset) orelse std.math.maxInt(i64);

    var rows = backend.db.query(
        "SELECT " ++ select_cols ++ " FROM concurrency_limit ORDER BY name ASC LIMIT ? OFFSET ?",
        .{ limit_i64, offset_i64 },
    ) catch |err| {
        log.err("database", "list concurrency_limits error: {}", .{err});
        return err;
    };
    defer rows.deinit();

    while (rows.next()) |r| {
        const row = try rowFromResult(alloc, &r);
        // Until the append succeeds the list does not own this row.
        errdefer freeRow(alloc, row);
        try results.append(alloc, row);
    }

    return try results.toOwnedSlice(alloc);
}

/// Return the number of rows in `concurrency_limit`.
///
/// A query failure is logged and returned as an error rather than being
/// reported as an empty table. Returns 0 if the query yields no row.
pub fn count() !usize {
    var r = backend.db.row("SELECT COUNT(*) FROM concurrency_limit", .{}) catch |err| {
        log.err("database", "count concurrency_limits error: {}", .{err});
        return err;
    };
    if (r) |*row| {
        defer row.deinit();
        return @intCast(row.bigint(0));
    }
    return 0;
}

/// Try to atomically increment active_slots if there's room under the limit.
/// Returns true if slots were acquired, false if limit would be exceeded.
/// False is also returned when no active row matches `id`, since the
/// statement only touches rows with `active = 1`.
pub fn tryAcquireSlots(id: []const u8, slots: i64, updated: []const u8) !bool {
    // Atomically increment only if active_slots + slots <= limit
    const affected = backend.db.execWithRowCount(
        "UPDATE concurrency_limit SET active_slots = active_slots + ?, updated = ? WHERE id = ? AND active = 1 AND active_slots + ? <= \"limit\"",
        .{ slots, updated, id, slots },
    ) catch |err| {
        log.err("database", "try acquire slots error: {}", .{err});
        return err;
    };
    return affected > 0;
}

/// Release slots and optionally update the average occupancy tracking.
/// occupancy_seconds is the total time the slots were held.
///
/// Only rows with `active = 1` are updated; `active_slots` never drops below 0.
/// Returns true when a row was updated.
pub fn releaseSlots(id: []const u8, slots: i64, occupancy_seconds: ?f64, updated: []const u8) !bool {
    if (occupancy_seconds) |occ| {
        // Update avg_slot_occupancy_seconds as a weighted average
        // Formula: new_avg = old_avg + (sample - old_avg) / (limit * 2)
        //
        // Clamp the divisor to at least 1 so that slots <= 0 cannot produce
        // inf/NaN (or a negative sample) that would be written to the table.
        const divisor: f64 = @floatFromInt(@max(slots, 1));
        const occupancy_per_slot = @max(occ / divisor, 0.1);
        // NULLIF/COALESCE: when "limit" is 0 the division would otherwise be
        // a division by zero; in that case the average is left unchanged.
        const affected = backend.db.execWithRowCount(
            "UPDATE concurrency_limit SET active_slots = MAX(0, active_slots - ?), avg_slot_occupancy_seconds = avg_slot_occupancy_seconds + COALESCE((? - avg_slot_occupancy_seconds) / NULLIF(\"limit\" * 2, 0), 0.0), updated = ? WHERE id = ? AND active = 1",
            .{ slots, occupancy_per_slot, updated, id },
        ) catch |err| {
            log.err("database", "release slots error: {}", .{err});
            return err;
        };
        return affected > 0;
    } else {
        const affected = backend.db.execWithRowCount(
            "UPDATE concurrency_limit SET active_slots = MAX(0, active_slots - ?), updated = ? WHERE id = ? AND active = 1",
            .{ slots, updated, id },
        ) catch |err| {
            log.err("database", "release slots error: {}", .{err});
            return err;
        };
        return affected > 0;
    }
}

/// Increment denied_slots counter (called when slots are denied)
/// Only rows with `active = 1` are updated. Returns true when a row was updated.
pub fn recordDenied(id: []const u8, slots: i64, updated: []const u8) !bool {
    const affected = backend.db.execWithRowCount(
        "UPDATE concurrency_limit SET denied_slots = denied_slots + ?, updated = ? WHERE id = ? AND active = 1",
        .{ slots, updated, id },
    ) catch |err| {
        log.err("database", "record denied error: {}", .{err});
        return err;
    };
    return affected > 0;
}