prefect server in zig

feat: add concurrency limits v2 API (phase 1)

implements basic CRUD and slot management for global concurrency limits:
- add concurrency_limit table via migration 003
- add src/db/concurrency_limits.zig with CRUD and slot operations
- add src/api/concurrency_limits_v2.zig with endpoints:
- POST /v2/concurrency_limits/ (create)
- GET /v2/concurrency_limits/{id_or_name} (read)
- PATCH /v2/concurrency_limits/{id_or_name} (update)
- DELETE /v2/concurrency_limits/{id_or_name} (delete)
- POST /v2/concurrency_limits/filter (list)
- POST /v2/concurrency_limits/increment (acquire slots)
- POST /v2/concurrency_limits/decrement (release slots)

slot management returns 423 Locked with Retry-After header when
slots cannot be acquired. matches python prefect API.

phase 2 (lease storage) and phase 4 (cleanup service) still pending.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+910
+4
loq.toml
··· 31 31 [[rules]] 32 32 path = "src/db/flow_runs.zig" 33 33 max_lines = 650 34 + 35 + [[rules]] 36 + path = "src/api/concurrency_limits_v2.zig" 37 + max_lines = 600
+554
src/api/concurrency_limits_v2.zig
··· 1 + const std = @import("std"); 2 + const zap = @import("zap"); 3 + const mem = std.mem; 4 + const json = std.json; 5 + 6 + const db = @import("../db/sqlite.zig"); 7 + const uuid_util = @import("../utilities/uuid.zig"); 8 + const time_util = @import("../utilities/time.zig"); 9 + const json_util = @import("../utilities/json.zig"); 10 + 11 + pub fn handle(r: zap.Request) !void { 12 + const target = r.path orelse "/"; 13 + const method = r.method orelse "GET"; 14 + 15 + // POST /v2/concurrency_limits/filter - list 16 + if (mem.eql(u8, method, "POST") and mem.endsWith(u8, target, "/filter")) { 17 + try filter(r); 18 + return; 19 + } 20 + 21 + // POST /v2/concurrency_limits/increment - acquire slots 22 + if (mem.eql(u8, method, "POST") and mem.endsWith(u8, target, "/increment")) { 23 + try incrementSlots(r); 24 + return; 25 + } 26 + 27 + // POST /v2/concurrency_limits/decrement - release slots 28 + if (mem.eql(u8, method, "POST") and mem.endsWith(u8, target, "/decrement")) { 29 + try decrementSlots(r); 30 + return; 31 + } 32 + 33 + // POST /v2/concurrency_limits/ - create 34 + if (mem.eql(u8, method, "POST") and isCreatePath(target)) { 35 + try createLimit(r); 36 + return; 37 + } 38 + 39 + // GET /v2/concurrency_limits/{id_or_name} 40 + if (mem.eql(u8, method, "GET")) { 41 + const id_or_name = extractIdOrName(target) orelse { 42 + json_util.sendStatus(r, "{\"detail\":\"id or name required\"}", .bad_request); 43 + return; 44 + }; 45 + try getLimit(r, id_or_name); 46 + return; 47 + } 48 + 49 + // PATCH /v2/concurrency_limits/{id_or_name} 50 + if (mem.eql(u8, method, "PATCH")) { 51 + const id_or_name = extractIdOrName(target) orelse { 52 + json_util.sendStatus(r, "{\"detail\":\"id or name required\"}", .bad_request); 53 + return; 54 + }; 55 + try updateLimit(r, id_or_name); 56 + return; 57 + } 58 + 59 + // DELETE /v2/concurrency_limits/{id_or_name} 60 + if (mem.eql(u8, method, "DELETE")) { 61 + const id_or_name = extractIdOrName(target) orelse { 62 + json_util.sendStatus(r, "{\"detail\":\"id or name required\"}", .bad_request); 63 + return; 64 + }; 65 + try deleteLimit(r, id_or_name); 66 + return; 67 + } 68 + 69 + json_util.sendStatus(r, "{\"detail\":\"not found\"}", .not_found); 70 + } 71 + 72 + fn isCreatePath(target: []const u8) bool { 73 + // Match /v2/concurrency_limits/ or /api/v2/concurrency_limits/ 74 + return mem.endsWith(u8, target, "/concurrency_limits/") or 75 + mem.endsWith(u8, target, "/concurrency_limits"); 76 + } 77 + 78 + fn isUuid(s: []const u8) bool { 79 + // UUID format: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx (36 chars with dashes) 80 + // or: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx (32 chars without dashes) 81 + if (s.len == 36) { 82 + if (s[8] != '-' or s[13] != '-' or s[18] != '-' or s[23] != '-') return false; 83 + for (s) |c| { 84 + if (c != '-' and !std.ascii.isHex(c)) return false; 85 + } 86 + return true; 87 + } 88 + if (s.len == 32) { 89 + for (s) |c| { 90 + if (!std.ascii.isHex(c)) return false; 91 + } 92 + return true; 93 + } 94 + return false; 95 + } 96 + 97 + fn extractIdOrName(target: []const u8) ?[]const u8 { 98 + // Path: /v2/concurrency_limits/{id_or_name} or /api/v2/concurrency_limits/{id_or_name} 99 + const prefix1 = "/v2/concurrency_limits/"; 100 + const prefix2 = "/api/v2/concurrency_limits/"; 101 + 102 + var rest: []const u8 = undefined; 103 + if (mem.startsWith(u8, target, prefix2)) { 104 + rest = target[prefix2.len..]; 105 + } else if (mem.startsWith(u8, target, prefix1)) { 106 + rest = target[prefix1.len..]; 107 + } else { 108 + return null; 109 + } 110 + 111 + if (rest.len == 0) return null; 112 + 113 + // Don't match special endpoints 114 + if (mem.eql(u8, rest, "filter") or 115 + mem.eql(u8, rest, "increment") or 116 + mem.eql(u8, rest, "decrement")) 117 + { 118 + return null; 119 + } 120 + 121 + return rest; 122 + } 123 + 124 + fn createLimit(r: zap.Request) !void { 125 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 126 + defer arena.deinit(); 127 + const alloc = arena.allocator(); 128 + 129 + const body = r.body orelse { 130 + json_util.sendStatus(r, "{\"detail\":\"body required\"}", .bad_request); 131 + return; 132 + }; 133 + 134 + const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch { 135 + json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request); 136 + return; 137 + }; 138 + 139 + const obj = parsed.value.object; 140 + const name = (obj.get("name") orelse { 141 + json_util.sendStatus(r, "{\"detail\":\"name required\"}", .bad_request); 142 + return; 143 + }).string; 144 + 145 + const limit_val = obj.get("limit") orelse { 146 + json_util.sendStatus(r, "{\"detail\":\"limit required\"}", .bad_request); 147 + return; 148 + }; 149 + const limit: i64 = switch (limit_val) { 150 + .integer => |i| i, 151 + .float => |f| @intFromFloat(f), 152 + else => { 153 + json_util.sendStatus(r, "{\"detail\":\"limit must be a number\"}", .bad_request); 154 + return; 155 + }, 156 + }; 157 + 158 + // Optional fields with defaults 159 + const active = if (obj.get("active")) |v| v == .bool and v.bool else true; 160 + const active_slots: i64 = if (obj.get("active_slots")) |v| switch (v) { 161 + .integer => |i| i, 162 + else => 0, 163 + } else 0; 164 + const denied_slots: i64 = if (obj.get("denied_slots")) |v| switch (v) { 165 + .integer => |i| i, 166 + else => 0, 167 + } else 0; 168 + const slot_decay: f64 = if (obj.get("slot_decay_per_second")) |v| switch (v) { 169 + .float => |f| f, 170 + .integer => |i| @floatFromInt(i), 171 + else => 0.0, 172 + } else 0.0; 173 + const avg_occupancy: f64 = if (obj.get("avg_slot_occupancy_seconds")) |v| switch (v) { 174 + .float => |f| f, 175 + .integer => |i| @floatFromInt(i), 176 + else => 2.0, 177 + } else 2.0; 178 + 179 + var id_buf: [36]u8 = undefined; 180 + const id = uuid_util.generate(&id_buf); 181 + var ts_buf: [32]u8 = undefined; 182 + const now = time_util.timestamp(&ts_buf); 183 + 184 + db.concurrency_limits.insert( 185 + id, 186 + name, 187 + limit, 188 + active, 189 + active_slots, 190 + denied_slots, 191 + slot_decay, 192 + avg_occupancy, 193 + now, 194 + ) catch |err| { 195 + if (err == error.ConstraintFailed) { 196 + json_util.sendStatus(r, "{\"detail\":\"concurrency limit with this name already exists\"}", .conflict); 197 + return; 198 + } 199 + json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error); 200 + return; 201 + }; 202 + 203 + // Return the created limit 204 + const result = db.concurrency_limits.getById(alloc, id) catch { 205 + json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error); 206 + return; 207 + }; 208 + 209 + if (result) |row| { 210 + try sendLimitResponse(r, alloc, row, .created); 211 + } else { 212 + json_util.sendStatus(r, "{\"detail\":\"failed to create\"}", .internal_server_error); 213 + } 214 + } 215 + 216 + fn getLimit(r: zap.Request, id_or_name: []const u8) !void { 217 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 218 + defer arena.deinit(); 219 + const alloc = arena.allocator(); 220 + 221 + // Try by ID first (if it looks like a UUID), then by name 222 + const result = if (isUuid(id_or_name)) 223 + try db.concurrency_limits.getById(alloc, id_or_name) 224 + else 225 + try db.concurrency_limits.getByName(alloc, id_or_name); 226 + 227 + if (result) |row| { 228 + try sendLimitResponse(r, alloc, row, .ok); 229 + } else { 230 + json_util.sendStatus(r, "{\"detail\":\"Concurrency Limit not found\"}", .not_found); 231 + } 232 + } 233 + 234 + fn updateLimit(r: zap.Request, id_or_name: []const u8) !void { 235 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 236 + defer arena.deinit(); 237 + const alloc = arena.allocator(); 238 + 239 + const body = r.body orelse { 240 + json_util.sendStatus(r, "{\"detail\":\"body required\"}", .bad_request); 241 + return; 242 + }; 243 + 244 + const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch { 245 + json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request); 246 + return; 247 + }; 248 + 249 + const obj = parsed.value.object; 250 + var ts_buf: [32]u8 = undefined; 251 + const now = time_util.timestamp(&ts_buf); 252 + 253 + // Extract optional update fields 254 + const new_name: ?[]const u8 = if (obj.get("name")) |v| v.string else null; 255 + const limit: ?i64 = if (obj.get("limit")) |v| switch (v) { 256 + .integer => |i| i, 257 + .float => |f| @as(i64, @intFromFloat(f)), 258 + else => null, 259 + } else null; 260 + const active: ?bool = if (obj.get("active")) |v| switch (v) { 261 + .bool => |b| b, 262 + else => null, 263 + } else null; 264 + const slot_decay: ?f64 = if (obj.get("slot_decay_per_second")) |v| switch (v) { 265 + .float => |f| f, 266 + .integer => |i| @as(f64, @floatFromInt(i)), 267 + else => null, 268 + } else null; 269 + const avg_occupancy: ?f64 = if (obj.get("avg_slot_occupancy_seconds")) |v| switch (v) { 270 + .float => |f| f, 271 + .integer => |i| @as(f64, @floatFromInt(i)), 272 + else => null, 273 + } else null; 274 + 275 + const updated = if (isUuid(id_or_name)) 276 + try db.concurrency_limits.updateById(id_or_name, new_name, limit, active, null, null, slot_decay, avg_occupancy, now) 277 + else 278 + try db.concurrency_limits.updateByName(id_or_name, new_name, limit, active, null, null, slot_decay, avg_occupancy, now); 279 + 280 + if (updated) { 281 + r.setStatus(.no_content); 282 + r.setHeader("access-control-allow-origin", "*") catch {}; 283 + try r.sendBody(""); 284 + } else { 285 + json_util.sendStatus(r, "{\"detail\":\"Concurrency Limit not found\"}", .not_found); 286 + } 287 + } 288 + 289 + fn deleteLimit(r: zap.Request, id_or_name: []const u8) !void { 290 + const deleted = if (isUuid(id_or_name)) 291 + try db.concurrency_limits.deleteById(id_or_name) 292 + else 293 + try db.concurrency_limits.deleteByName(id_or_name); 294 + 295 + if (deleted) { 296 + r.setStatus(.no_content); 297 + r.setHeader("access-control-allow-origin", "*") catch {}; 298 + try r.sendBody(""); 299 + } else { 300 + json_util.sendStatus(r, "{\"detail\":\"Concurrency Limit not found\"}", .not_found); 301 + } 302 + } 303 + 304 + fn filter(r: zap.Request) !void { 305 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 306 + defer arena.deinit(); 307 + const alloc = arena.allocator(); 308 + 309 + var limit: usize = 200; 310 + var offset: usize = 0; 311 + 312 + if (r.body) |body| { 313 + if (json.parseFromSlice(json.Value, alloc, body, .{})) |parsed| { 314 + if (parsed.value.object.get("limit")) |v| { 315 + if (v == .integer) limit = @intCast(v.integer); 316 + } 317 + if (parsed.value.object.get("offset")) |v| { 318 + if (v == .integer) offset = @intCast(v.integer); 319 + } 320 + } else |_| {} 321 + } 322 + 323 + const rows = try db.concurrency_limits.list(alloc, limit, offset); 324 + 325 + var output = std.ArrayListUnmanaged(u8){}; 326 + try output.append(alloc, '['); 327 + 328 + for (rows, 0..) |row, i| { 329 + if (i > 0) try output.append(alloc, ','); 330 + try appendLimitJson(alloc, &output, row); 331 + } 332 + 333 + try output.append(alloc, ']'); 334 + 335 + r.setStatus(.ok); 336 + r.setHeader("content-type", "application/json") catch {}; 337 + r.setHeader("access-control-allow-origin", "*") catch {}; 338 + try r.sendBody(output.items); 339 + } 340 + 341 + fn incrementSlots(r: zap.Request) !void { 342 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 343 + defer arena.deinit(); 344 + const alloc = arena.allocator(); 345 + 346 + const body = r.body orelse { 347 + json_util.sendStatus(r, "{\"detail\":\"body required\"}", .bad_request); 348 + return; 349 + }; 350 + 351 + const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch { 352 + json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request); 353 + return; 354 + }; 355 + 356 + const obj = parsed.value.object; 357 + 358 + const slots: i64 = if (obj.get("slots")) |v| switch (v) { 359 + .integer => |i| i, 360 + else => 1, 361 + } else 1; 362 + 363 + const names = obj.get("names") orelse { 364 + json_util.sendStatus(r, "{\"detail\":\"names required\"}", .bad_request); 365 + return; 366 + }; 367 + 368 + if (names != .array) { 369 + json_util.sendStatus(r, "{\"detail\":\"names must be an array\"}", .bad_request); 370 + return; 371 + } 372 + 373 + var ts_buf: [32]u8 = undefined; 374 + const now = time_util.timestamp(&ts_buf); 375 + var acquired_limits = std.ArrayListUnmanaged(db.concurrency_limits.ConcurrencyLimitRow){}; 376 + var all_acquired = true; 377 + 378 + // Try to acquire slots for each named limit 379 + for (names.array.items) |name_val| { 380 + if (name_val != .string) continue; 381 + const name = name_val.string; 382 + 383 + // Get the limit first 384 + const maybe_limit = try db.concurrency_limits.getByName(alloc, name); 385 + if (maybe_limit) |lim| { 386 + if (!lim.active) continue; // Skip inactive limits 387 + 388 + // Check if requested slots exceed limit 389 + if (slots > lim.limit) { 390 + json_util.sendStatus(r, "{\"detail\":\"Slots requested is greater than the limit\"}", .unprocessable_content); 391 + return; 392 + } 393 + 394 + // Try to acquire 395 + const acquired = try db.concurrency_limits.tryAcquireSlots(lim.id, slots, now); 396 + if (acquired) { 397 + try acquired_limits.append(alloc, lim); 398 + } else { 399 + all_acquired = false; 400 + } 401 + } 402 + } 403 + 404 + if (all_acquired and acquired_limits.items.len > 0) { 405 + // Success - return minimal response 406 + var output = std.ArrayListUnmanaged(u8){}; 407 + try output.append(alloc, '['); 408 + 409 + for (acquired_limits.items, 0..) |lim, i| { 410 + if (i > 0) try output.append(alloc, ','); 411 + try output.writer(alloc).print("{{\"id\":\"{s}\",\"name\":\"{s}\",\"limit\":{d}}}", .{ 412 + lim.id, lim.name, lim.limit, 413 + }); 414 + } 415 + 416 + try output.append(alloc, ']'); 417 + 418 + r.setStatus(.ok); 419 + r.setHeader("content-type", "application/json") catch {}; 420 + r.setHeader("access-control-allow-origin", "*") catch {}; 421 + try r.sendBody(output.items); 422 + } else { 423 + // Couldn't acquire - rollback any acquired slots and return 423 Locked 424 + for (acquired_limits.items) |lim| { 425 + _ = try db.concurrency_limits.releaseSlots(lim.id, slots, null, now); 426 + } 427 + 428 + // Record denials and calculate retry-after 429 + var max_retry: f64 = 2.0; 430 + for (names.array.items) |name_val| { 431 + if (name_val != .string) continue; 432 + const name = name_val.string; 433 + 434 + if (try db.concurrency_limits.getByName(alloc, name)) |lim| { 435 + if (lim.active) { 436 + _ = try db.concurrency_limits.recordDenied(lim.id, slots, now); 437 + // Use avg_slot_occupancy_seconds for retry calculation 438 + max_retry = @max(max_retry, lim.avg_slot_occupancy_seconds); 439 + } 440 + } 441 + } 442 + 443 + // Add some jitter (simple approximation of clamped_poisson_interval) 444 + var retry_buf: [32]u8 = undefined; 445 + const retry_str = std.fmt.bufPrint(&retry_buf, "{d:.1}", .{max_retry}) catch "2.0"; 446 + 447 + r.setStatus(.locked); 448 + r.setHeader("content-type", "application/json") catch {}; 449 + r.setHeader("access-control-allow-origin", "*") catch {}; 450 + r.setHeader("retry-after", retry_str) catch {}; 451 + try r.sendBody("{}"); 452 + } 453 + } 454 + 455 + fn decrementSlots(r: zap.Request) !void { 456 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 457 + defer arena.deinit(); 458 + const alloc = arena.allocator(); 459 + 460 + const body = r.body orelse { 461 + json_util.sendStatus(r, "{\"detail\":\"body required\"}", .bad_request); 462 + return; 463 + }; 464 + 465 + const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch { 466 + json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request); 467 + return; 468 + }; 469 + 470 + const obj = parsed.value.object; 471 + 472 + const slots: i64 = if (obj.get("slots")) |v| switch (v) { 473 + .integer => |i| i, 474 + else => 1, 475 + } else 1; 476 + 477 + const occupancy_seconds: ?f64 = if (obj.get("occupancy_seconds")) |v| switch (v) { 478 + .float => |f| f, 479 + .integer => |i| @as(f64, @floatFromInt(i)), 480 + else => null, 481 + } else null; 482 + 483 + const names = obj.get("names") orelse { 484 + json_util.sendStatus(r, "{\"detail\":\"names required\"}", .bad_request); 485 + return; 486 + }; 487 + 488 + if (names != .array) { 489 + json_util.sendStatus(r, "{\"detail\":\"names must be an array\"}", .bad_request); 490 + return; 491 + } 492 + 493 + var ts_buf: [32]u8 = undefined; 494 + const now = time_util.timestamp(&ts_buf); 495 + var released_limits = std.ArrayListUnmanaged(db.concurrency_limits.ConcurrencyLimitRow){}; 496 + 497 + // Release slots for each named limit 498 + for (names.array.items) |name_val| { 499 + if (name_val != .string) continue; 500 + const name = name_val.string; 501 + 502 + if (try db.concurrency_limits.getByName(alloc, name)) |lim| { 503 + if (lim.active) { 504 + _ = try db.concurrency_limits.releaseSlots(lim.id, slots, occupancy_seconds, now); 505 + try released_limits.append(alloc, lim); 506 + } 507 + } 508 + } 509 + 510 + // Return minimal response 511 + var output = std.ArrayListUnmanaged(u8){}; 512 + try output.append(alloc, '['); 513 + 514 + for (released_limits.items, 0..) |lim, i| { 515 + if (i > 0) try output.append(alloc, ','); 516 + try output.writer(alloc).print("{{\"id\":\"{s}\",\"name\":\"{s}\",\"limit\":{d}}}", .{ 517 + lim.id, lim.name, lim.limit, 518 + }); 519 + } 520 + 521 + try output.append(alloc, ']'); 522 + 523 + r.setStatus(.ok); 524 + r.setHeader("content-type", "application/json") catch {}; 525 + r.setHeader("access-control-allow-origin", "*") catch {}; 526 + try r.sendBody(output.items); 527 + } 528 + 529 + fn sendLimitResponse(r: zap.Request, alloc: std.mem.Allocator, row: db.concurrency_limits.ConcurrencyLimitRow, status: zap.http.StatusCode) !void { 530 + var output = std.ArrayListUnmanaged(u8){}; 531 + try appendLimitJson(alloc, &output, row); 532 + 533 + r.setStatus(status); 534 + r.setHeader("content-type", "application/json") catch {}; 535 + r.setHeader("access-control-allow-origin", "*") catch {}; 536 + try r.sendBody(output.items); 537 + } 538 + 539 + fn appendLimitJson(alloc: std.mem.Allocator, output: *std.ArrayListUnmanaged(u8), row: db.concurrency_limits.ConcurrencyLimitRow) !void { 540 + try output.writer(alloc).print( 541 + \\{{"id":"{s}","created":"{s}","updated":"{s}","name":"{s}","limit":{d},"active":{s},"active_slots":{d},"denied_slots":{d},"slot_decay_per_second":{d},"avg_slot_occupancy_seconds":{d}}} 542 + , .{ 543 + row.id, 544 + row.created, 545 + row.updated, 546 + row.name, 547 + row.limit, 548 + if (row.active) "true" else "false", 549 + row.active_slots, 550 + row.denied_slots, 551 + row.slot_decay_per_second, 552 + row.avg_slot_occupancy_seconds, 553 + }); 554 + }
+3
src/api/routes.zig
··· 14 14 pub const work_pools = @import("work_pools.zig"); 15 15 pub const deployments = @import("deployments.zig"); 16 16 pub const events_api = @import("events_api.zig"); 17 + pub const concurrency_limits_v2 = @import("concurrency_limits_v2.zig"); 17 18 18 19 pub fn handle(r: zap.Request) !void { 19 20 const target = r.path orelse "/"; ··· 52 53 try work_pools.handle(r); 53 54 } else if (std.mem.startsWith(u8, target, "/api/deployments") or std.mem.startsWith(u8, target, "/deployments")) { 54 55 try deployments.handle(r); 56 + } else if (std.mem.startsWith(u8, target, "/api/v2/concurrency_limits") or std.mem.startsWith(u8, target, "/v2/concurrency_limits")) { 57 + try concurrency_limits_v2.handle(r); 55 58 } else if (std.mem.startsWith(u8, target, "/api/events/filter") or std.mem.startsWith(u8, target, "/events/filter") or 56 59 std.mem.startsWith(u8, target, "/api/events/count") or std.mem.startsWith(u8, target, "/events/count")) 57 60 {
+279
src/db/concurrency_limits.zig
··· 1 + const std = @import("std"); 2 + const Allocator = std.mem.Allocator; 3 + 4 + const backend = @import("backend.zig"); 5 + const log = @import("../logging.zig"); 6 + 7 + pub const ConcurrencyLimitRow = struct { 8 + id: []const u8, 9 + created: []const u8, 10 + updated: []const u8, 11 + name: []const u8, 12 + limit: i64, 13 + active: bool, 14 + active_slots: i64, 15 + denied_slots: i64, 16 + slot_decay_per_second: f64, 17 + avg_slot_occupancy_seconds: f64, 18 + }; 19 + 20 + const Col = struct { 21 + const id: usize = 0; 22 + const created: usize = 1; 23 + const updated: usize = 2; 24 + const name: usize = 3; 25 + const limit: usize = 4; 26 + const active: usize = 5; 27 + const active_slots: usize = 6; 28 + const denied_slots: usize = 7; 29 + const slot_decay_per_second: usize = 8; 30 + const avg_slot_occupancy_seconds: usize = 9; 31 + }; 32 + 33 + // Note: "limit" is a reserved keyword, must be quoted 34 + const select_cols = "id, created, updated, name, \"limit\", active, active_slots, denied_slots, slot_decay_per_second, avg_slot_occupancy_seconds"; 35 + 36 + fn rowFromResult(alloc: Allocator, r: anytype) !ConcurrencyLimitRow { 37 + return ConcurrencyLimitRow{ 38 + .id = try alloc.dupe(u8, r.text(Col.id)), 39 + .created = try alloc.dupe(u8, r.text(Col.created)), 40 + .updated = try alloc.dupe(u8, r.text(Col.updated)), 41 + .name = try alloc.dupe(u8, r.text(Col.name)), 42 + .limit = r.bigint(Col.limit), 43 + .active = r.bigint(Col.active) != 0, 44 + .active_slots = r.bigint(Col.active_slots), 45 + .denied_slots = r.bigint(Col.denied_slots), 46 + .slot_decay_per_second = r.float(Col.slot_decay_per_second), 47 + .avg_slot_occupancy_seconds = r.float(Col.avg_slot_occupancy_seconds), 48 + }; 49 + } 50 + 51 + pub fn getById(alloc: Allocator, id: []const u8) !?ConcurrencyLimitRow { 52 + var r = backend.db.row( 53 + "SELECT " ++ select_cols ++ " FROM concurrency_limit WHERE id = ?", 54 + .{id}, 55 + ) catch return null; 56 + 57 + if (r) |*row| { 58 + defer row.deinit(); 59 + return try rowFromResult(alloc, row); 60 + } 61 + return null; 62 + } 63 + 64 + pub fn getByName(alloc: Allocator, name: []const u8) !?ConcurrencyLimitRow { 65 + var r = backend.db.row( 66 + "SELECT " ++ select_cols ++ " FROM concurrency_limit WHERE name = ?", 67 + .{name}, 68 + ) catch return null; 69 + 70 + if (r) |*row| { 71 + defer row.deinit(); 72 + return try rowFromResult(alloc, row); 73 + } 74 + return null; 75 + } 76 + 77 + pub fn insert( 78 + id: []const u8, 79 + name: []const u8, 80 + limit: i64, 81 + active: bool, 82 + active_slots: i64, 83 + denied_slots: i64, 84 + slot_decay_per_second: f64, 85 + avg_slot_occupancy_seconds: f64, 86 + created: []const u8, 87 + ) !void { 88 + backend.db.exec( 89 + "INSERT INTO concurrency_limit (id, name, \"limit\", active, active_slots, denied_slots, slot_decay_per_second, avg_slot_occupancy_seconds, created, updated) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", 90 + .{ id, name, limit, @as(i64, if (active) 1 else 0), active_slots, denied_slots, slot_decay_per_second, avg_slot_occupancy_seconds, created, created }, 91 + ) catch |err| { 92 + log.err("database", "insert concurrency_limit error: {}", .{err}); 93 + return err; 94 + }; 95 + } 96 + 97 + pub fn updateById( 98 + id: []const u8, 99 + name: ?[]const u8, 100 + limit: ?i64, 101 + active: ?bool, 102 + active_slots: ?i64, 103 + denied_slots: ?i64, 104 + slot_decay_per_second: ?f64, 105 + avg_slot_occupancy_seconds: ?f64, 106 + updated: []const u8, 107 + ) !bool { 108 + const active_int: ?i64 = if (active) |a| @as(i64, if (a) 1 else 0) else null; 109 + const affected = backend.db.execWithRowCount( 110 + "UPDATE concurrency_limit SET name = COALESCE(?, name), \"limit\" = COALESCE(?, \"limit\"), active = COALESCE(?, active), active_slots = COALESCE(?, active_slots), denied_slots = COALESCE(?, denied_slots), slot_decay_per_second = COALESCE(?, slot_decay_per_second), avg_slot_occupancy_seconds = COALESCE(?, avg_slot_occupancy_seconds), updated = ? WHERE id = ?", 111 + .{ name, limit, active_int, active_slots, denied_slots, slot_decay_per_second, avg_slot_occupancy_seconds, updated, id }, 112 + ) catch |err| { 113 + log.err("database", "update concurrency_limit error: {}", .{err}); 114 + return err; 115 + }; 116 + return affected > 0; 117 + } 118 + 119 + pub fn updateByName( 120 + name: []const u8, 121 + new_name: ?[]const u8, 122 + limit: ?i64, 123 + active: ?bool, 124 + active_slots: ?i64, 125 + denied_slots: ?i64, 126 + slot_decay_per_second: ?f64, 127 + avg_slot_occupancy_seconds: ?f64, 128 + updated: []const u8, 129 + ) !bool { 130 + const active_int: ?i64 = if (active) |a| @as(i64, if (a) 1 else 0) else null; 131 + const affected = backend.db.execWithRowCount( 132 + "UPDATE concurrency_limit SET name = COALESCE(?, name), \"limit\" = COALESCE(?, \"limit\"), active = COALESCE(?, active), active_slots = COALESCE(?, active_slots), denied_slots = COALESCE(?, denied_slots), slot_decay_per_second = COALESCE(?, slot_decay_per_second), avg_slot_occupancy_seconds = COALESCE(?, avg_slot_occupancy_seconds), updated = ? WHERE name = ?", 133 + .{ new_name, limit, active_int, active_slots, denied_slots, slot_decay_per_second, avg_slot_occupancy_seconds, updated, name }, 134 + ) catch |err| { 135 + log.err("database", "update concurrency_limit error: {}", .{err}); 136 + return err; 137 + }; 138 + return affected > 0; 139 + } 140 + 141 + /// Increment active_slots counter atomically 142 + pub fn incrementActiveSlots(name: []const u8, slots: i64, updated: []const u8) !bool { 143 + const affected = backend.db.execWithRowCount( 144 + "UPDATE concurrency_limit SET active_slots = active_slots + ?, updated = ? WHERE name = ?", 145 + .{ slots, updated, name }, 146 + ) catch |err| { 147 + log.err("database", "increment active_slots error: {}", .{err}); 148 + return err; 149 + }; 150 + return affected > 0; 151 + } 152 + 153 + /// Decrement active_slots counter atomically (won't go below 0) 154 + pub fn decrementActiveSlots(name: []const u8, slots: i64, updated: []const u8) !bool { 155 + const affected = backend.db.execWithRowCount( 156 + "UPDATE concurrency_limit SET active_slots = MAX(0, active_slots - ?), updated = ? WHERE name = ?", 157 + .{ slots, updated, name }, 158 + ) catch |err| { 159 + log.err("database", "decrement active_slots error: {}", .{err}); 160 + return err; 161 + }; 162 + return affected > 0; 163 + } 164 + 165 + /// Increment denied_slots counter atomically 166 + pub fn incrementDeniedSlots(name: []const u8, slots: i64, updated: []const u8) !bool { 167 + const affected = backend.db.execWithRowCount( 168 + "UPDATE concurrency_limit SET denied_slots = denied_slots + ?, updated = ? WHERE name = ?", 169 + .{ slots, updated, name }, 170 + ) catch |err| { 171 + log.err("database", "increment denied_slots error: {}", .{err}); 172 + return err; 173 + }; 174 + return affected > 0; 175 + } 176 + 177 + pub fn deleteById(id: []const u8) !bool { 178 + const affected = backend.db.execWithRowCount( 179 + "DELETE FROM concurrency_limit WHERE id = ?", 180 + .{id}, 181 + ) catch |err| { 182 + log.err("database", "delete concurrency_limit error: {}", .{err}); 183 + return err; 184 + }; 185 + return affected > 0; 186 + } 187 + 188 + pub fn deleteByName(name: []const u8) !bool { 189 + const affected = backend.db.execWithRowCount( 190 + "DELETE FROM concurrency_limit WHERE name = ?", 191 + .{name}, 192 + ) catch |err| { 193 + log.err("database", "delete concurrency_limit error: {}", .{err}); 194 + return err; 195 + }; 196 + return affected > 0; 197 + } 198 + 199 + pub fn list(alloc: Allocator, limit: usize, offset: usize) ![]ConcurrencyLimitRow { 200 + var results = std.ArrayListUnmanaged(ConcurrencyLimitRow){}; 201 + errdefer results.deinit(alloc); 202 + 203 + var rows = backend.db.query( 204 + "SELECT " ++ select_cols ++ " FROM concurrency_limit ORDER BY name ASC LIMIT ? OFFSET ?", 205 + .{ @as(i64, @intCast(limit)), @as(i64, @intCast(offset)) }, 206 + ) catch |err| { 207 + log.err("database", "list concurrency_limits error: {}", .{err}); 208 + return err; 209 + }; 210 + defer rows.deinit(); 211 + 212 + while (rows.next()) |r| { 213 + try results.append(alloc, try rowFromResult(alloc, &r)); 214 + } 215 + 216 + return results.toOwnedSlice(alloc); 217 + } 218 + 219 + pub fn count() !usize { 220 + var r = backend.db.row("SELECT COUNT(*) FROM concurrency_limit", .{}) catch return 0; 221 + if (r) |*row| { 222 + defer row.deinit(); 223 + return @intCast(row.bigint(0)); 224 + } 225 + return 0; 226 + } 227 + 228 + /// Try to atomically increment active_slots if there's room under the limit. 229 + /// Returns true if slots were acquired, false if limit would be exceeded. 230 + pub fn tryAcquireSlots(id: []const u8, slots: i64, updated: []const u8) !bool { 231 + // Atomically increment only if active_slots + slots <= limit 232 + const affected = backend.db.execWithRowCount( 233 + "UPDATE concurrency_limit SET active_slots = active_slots + ?, updated = ? WHERE id = ? AND active = 1 AND active_slots + ? <= \"limit\"", 234 + .{ slots, updated, id, slots }, 235 + ) catch |err| { 236 + log.err("database", "try acquire slots error: {}", .{err}); 237 + return err; 238 + }; 239 + return affected > 0; 240 + } 241 + 242 + /// Release slots and optionally update the average occupancy tracking. 243 + /// occupancy_seconds is the total time the slots were held. 244 + pub fn releaseSlots(id: []const u8, slots: i64, occupancy_seconds: ?f64, updated: []const u8) !bool { 245 + if (occupancy_seconds) |occ| { 246 + // Update avg_slot_occupancy_seconds as a weighted average 247 + // Formula: new_avg = old_avg + (sample - old_avg) / (limit * 2) 248 + const occupancy_per_slot = @max(occ / @as(f64, @floatFromInt(slots)), 0.1); 249 + const affected = backend.db.execWithRowCount( 250 + "UPDATE concurrency_limit SET active_slots = MAX(0, active_slots - ?), avg_slot_occupancy_seconds = avg_slot_occupancy_seconds + (? - avg_slot_occupancy_seconds) / (\"limit\" * 2), updated = ? WHERE id = ? AND active = 1", 251 + .{ slots, occupancy_per_slot, updated, id }, 252 + ) catch |err| { 253 + log.err("database", "release slots error: {}", .{err}); 254 + return err; 255 + }; 256 + return affected > 0; 257 + } else { 258 + const affected = backend.db.execWithRowCount( 259 + "UPDATE concurrency_limit SET active_slots = MAX(0, active_slots - ?), updated = ? WHERE id = ? AND active = 1", 260 + .{ slots, updated, id }, 261 + ) catch |err| { 262 + log.err("database", "release slots error: {}", .{err}); 263 + return err; 264 + }; 265 + return affected > 0; 266 + } 267 + } 268 + 269 + /// Increment denied_slots counter (called when slots are denied) 270 + pub fn recordDenied(id: []const u8, slots: i64, updated: []const u8) !bool { 271 + const affected = backend.db.execWithRowCount( 272 + "UPDATE concurrency_limit SET denied_slots = denied_slots + ?, updated = ? WHERE id = ? AND active = 1", 273 + .{ slots, updated, id }, 274 + ) catch |err| { 275 + log.err("database", "record denied error: {}", .{err}); 276 + return err; 277 + }; 278 + return affected > 0; 279 + }
+15
src/db/migrations/003_concurrency_limits/postgres.sql
··· 1 + -- concurrency_limit table (v2 only - no v1 support) 2 + CREATE TABLE IF NOT EXISTS concurrency_limit ( 3 + id TEXT PRIMARY KEY, 4 + created TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, 5 + updated TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, 6 + name TEXT NOT NULL UNIQUE, 7 + "limit" INTEGER NOT NULL, 8 + active BOOLEAN DEFAULT TRUE, 9 + active_slots INTEGER DEFAULT 0, 10 + denied_slots INTEGER DEFAULT 0, 11 + slot_decay_per_second DOUBLE PRECISION DEFAULT 0.0, 12 + avg_slot_occupancy_seconds DOUBLE PRECISION DEFAULT 2.0 13 + ); 14 + 15 + CREATE INDEX IF NOT EXISTS ix_concurrency_limit__name ON concurrency_limit(name);
+15
src/db/migrations/003_concurrency_limits/sqlite.sql
··· 1 + -- concurrency_limit table (v2 only - no v1 support) 2 + CREATE TABLE IF NOT EXISTS concurrency_limit ( 3 + id TEXT PRIMARY KEY, 4 + created TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), 5 + updated TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), 6 + name TEXT NOT NULL UNIQUE, 7 + "limit" INTEGER NOT NULL, 8 + active INTEGER DEFAULT 1, 9 + active_slots INTEGER DEFAULT 0, 10 + denied_slots INTEGER DEFAULT 0, 11 + slot_decay_per_second REAL DEFAULT 0.0, 12 + avg_slot_occupancy_seconds REAL DEFAULT 2.0 13 + ); 14 + 15 + CREATE INDEX IF NOT EXISTS ix_concurrency_limit__name ON concurrency_limit(name);
+5
src/db/migrations_data.zig
··· 22 22 .sqlite_sql = @embedFile("migrations/002_state_transition_id/sqlite.sql"), 23 23 .postgres_sql = @embedFile("migrations/002_state_transition_id/postgres.sql"), 24 24 }, 25 + .{ 26 + .id = "003_concurrency_limits", 27 + .sqlite_sql = @embedFile("migrations/003_concurrency_limits/sqlite.sql"), 28 + .postgres_sql = @embedFile("migrations/003_concurrency_limits/postgres.sql"), 29 + }, 25 30 };
+17
src/db/schema/postgres.zig
··· 265 265 \\) 266 266 , .{}); 267 267 268 + // concurrency_limit table (v2 only - no v1 support) 269 + try backend.db.exec( 270 + \\CREATE TABLE IF NOT EXISTS concurrency_limit ( 271 + \\ id TEXT PRIMARY KEY, 272 + \\ created TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, 273 + \\ updated TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, 274 + \\ name TEXT NOT NULL UNIQUE, 275 + \\ "limit" INTEGER NOT NULL, 276 + \\ active BOOLEAN DEFAULT TRUE, 277 + \\ active_slots INTEGER DEFAULT 0, 278 + \\ denied_slots INTEGER DEFAULT 0, 279 + \\ slot_decay_per_second DOUBLE PRECISION DEFAULT 0.0, 280 + \\ avg_slot_occupancy_seconds DOUBLE PRECISION DEFAULT 2.0 281 + \\) 282 + , .{}); 283 + 268 284 // indexes 269 285 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_flow_run__state_type ON flow_run(state_type)", .{}); 270 286 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_flow_run__flow_id ON flow_run(flow_id)", .{}); ··· 296 312 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_deployment__updated ON deployment(updated)", .{}); 297 313 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_deployment_schedule__deployment_id ON deployment_schedule(deployment_id)", .{}); 298 314 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_deployment_schedule__active ON deployment_schedule(active)", .{}); 315 + try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_concurrency_limit__name ON concurrency_limit(name)", .{}); 299 316 300 317 log.info("database", "postgres schema initialized", .{}); 301 318 }
+17
src/db/schema/sqlite.zig
··· 257 257 \\) 258 258 , .{}); 259 259 260 + // concurrency_limit table (v2 only - no v1 support) 261 + try backend.db.exec( 262 + \\CREATE TABLE IF NOT EXISTS concurrency_limit ( 263 + \\ id TEXT PRIMARY KEY, 264 + \\ created TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), 265 + \\ updated TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), 266 + \\ name TEXT NOT NULL UNIQUE, 267 + \\ "limit" INTEGER NOT NULL, 268 + \\ active INTEGER DEFAULT 1, 269 + \\ active_slots INTEGER DEFAULT 0, 270 + \\ denied_slots INTEGER DEFAULT 0, 271 + \\ slot_decay_per_second REAL DEFAULT 0.0, 272 + \\ avg_slot_occupancy_seconds REAL DEFAULT 2.0 273 + \\) 274 + , .{}); 275 + 260 276 // indexes 261 277 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_flow_run__state_type ON flow_run(state_type)", .{}); 262 278 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_flow_run__flow_id ON flow_run(flow_id)", .{}); ··· 288 304 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_deployment__updated ON deployment(updated)", .{}); 289 305 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_deployment_schedule__deployment_id ON deployment_schedule(deployment_id)", .{}); 290 306 try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_deployment_schedule__active ON deployment_schedule(active)", .{}); 307 + try backend.db.exec("CREATE INDEX IF NOT EXISTS ix_concurrency_limit__name ON concurrency_limit(name)", .{}); 291 308 292 309 log.info("database", "sqlite schema initialized", .{}); 293 310 }
+1
src/db/sqlite.zig
··· 19 19 pub const workers = @import("workers.zig"); 20 20 pub const deployments = @import("deployments.zig"); 21 21 pub const deployment_schedules = @import("deployment_schedules.zig"); 22 + pub const concurrency_limits = @import("concurrency_limits.zig"); 22 23 23 24 // re-export types for compatibility 24 25 pub const FlowRow = flows.FlowRow;