const std = @import("std");
const Allocator = std.mem.Allocator;

const backend = @import("backend.zig");
const log = @import("../logging.zig");

/// Work queue status enum - matches Python's WorkQueueStatus
pub const Status = enum {
    not_ready,
    ready,
    paused,

    /// Parse the textual status. Only the exact strings "READY" and
    /// "PAUSED" are recognised; every other input maps to `.not_ready`.
    pub fn fromString(s: []const u8) Status {
        if (std.mem.eql(u8, s, "READY")) return .ready;
        if (std.mem.eql(u8, s, "PAUSED")) return .paused;
        return .not_ready;
    }

    /// Return the upper-case textual form bound into the `status` column.
    pub fn toString(self: Status) []const u8 {
        return switch (self) {
            .not_ready => "NOT_READY",
            .ready => "READY",
            .paused => "PAUSED",
        };
    }
};

/// One row of the `work_queue` table.
///
/// Rows returned by `getById`, `getByPoolAndName` and `listByPool` hold
/// string fields duplicated with the allocator passed to those functions;
/// release them with `deinit` using that same allocator.
pub const WorkQueueRow = struct {
    id: []const u8,
    created: []const u8,
    updated: []const u8,
    name: []const u8,
    description: []const u8,
    is_paused: bool,
    concurrency_limit: ?i64,
    priority: i64,
    work_pool_id: []const u8,
    last_polled: ?[]const u8,
    status: Status,

    /// Free every string owned by this row. Only call this on rows whose
    /// strings were allocated with `alloc` (i.e. rows produced by this
    /// module's query functions), never on rows built from literals.
    pub fn deinit(self: WorkQueueRow, alloc: Allocator) void {
        alloc.free(self.id);
        alloc.free(self.created);
        alloc.free(self.updated);
        alloc.free(self.name);
        alloc.free(self.description);
        alloc.free(self.work_pool_id);
        if (self.last_polled) |lp| alloc.free(lp);
    }
};

/// Column indices; must stay in the same order as `select_cols`.
const Col = struct {
    const id: usize = 0;
    const created: usize = 1;
    const updated: usize = 2;
    const name: usize = 3;
    const description: usize = 4;
    const is_paused: usize = 5;
    const concurrency_limit: usize = 6;
    const priority: usize = 7;
    const work_pool_id: usize = 8;
    const last_polled: usize = 9;
    const status: usize = 10;
};

const select_cols = "id, created, updated, name, description, is_paused, concurrency_limit, priority, work_pool_id, last_polled, status";

/// Build a `WorkQueueRow` from a result row, duplicating every string into
/// `alloc` so the returned value does not reference the result row's memory.
///
/// `r` must provide `text`, `textOrNull`, `int` and `bigint` accessors taking
/// a column index. On allocation failure every string duplicated so far is
/// freed before the error is returned.
fn rowFromResult(alloc: Allocator, r: anytype) !WorkQueueRow {
    const last_polled_text = r.textOrNull(Col.last_polled);
    // A NULL concurrency_limit is detected through the text accessor; the
    // integer accessor is only consulted when a value is present.
    const concurrency_text = r.textOrNull(Col.concurrency_limit);

    const id = try alloc.dupe(u8, r.text(Col.id));
    errdefer alloc.free(id);
    const created = try alloc.dupe(u8, r.text(Col.created));
    errdefer alloc.free(created);
    const updated = try alloc.dupe(u8, r.text(Col.updated));
    errdefer alloc.free(updated);
    const name = try alloc.dupe(u8, r.text(Col.name));
    errdefer alloc.free(name);
    const description = try alloc.dupe(u8, r.text(Col.description));
    errdefer alloc.free(description);
    const work_pool_id = try alloc.dupe(u8, r.text(Col.work_pool_id));
    errdefer alloc.free(work_pool_id);

    // Last fallible step: nothing after it can fail, so it needs no errdefer.
    const last_polled: ?[]const u8 = if (last_polled_text) |lp|
        try alloc.dupe(u8, lp)
    else
        null;

    return WorkQueueRow{
        .id = id,
        .created = created,
        .updated = updated,
        .name = name,
        .description = description,
        .is_paused = r.int(Col.is_paused) != 0,
        .concurrency_limit = if (concurrency_text != null) r.bigint(Col.concurrency_limit) else null,
        .priority = r.bigint(Col.priority),
        .work_pool_id = work_pool_id,
        .last_polled = last_polled,
        .status = Status.fromString(r.text(Col.status)),
    };
}

/// Look up a work queue by its id.
///
/// Returns `null` when no row matches. A database error is logged and also
/// reported as `null`; only failures while copying the row (allocation) are
/// returned as errors. Caller owns the returned row (see `WorkQueueRow.deinit`).
pub fn getById(alloc: Allocator, id: []const u8) !?WorkQueueRow {
    var r = backend.db.row(
        "SELECT " ++ select_cols ++ " FROM work_queue WHERE id = ?",
        .{id},
    ) catch |err| {
        log.err("database", "get work_queue by id error: {}", .{err});
        return null;
    };

    if (r) |*row| {
        defer row.deinit();
        return try rowFromResult(alloc, row);
    }
    return null;
}

/// Look up a work queue by its pool id and name.
///
/// Returns `null` when no row matches. A database error is logged and also
/// reported as `null`; only failures while copying the row (allocation) are
/// returned as errors. Caller owns the returned row (see `WorkQueueRow.deinit`).
pub fn getByPoolAndName(alloc: Allocator, pool_id: []const u8, name: []const u8) !?WorkQueueRow {
    var r = backend.db.row(
        "SELECT " ++ select_cols ++ " FROM work_queue WHERE work_pool_id = ? AND name = ?",
        .{ pool_id, name },
    ) catch |err| {
        log.err("database", "get work_queue by pool and name error: {}", .{err});
        return null;
    };

    if (r) |*row| {
        defer row.deinit();
        return try rowFromResult(alloc, row);
    }
    return null;
}

/// Insert a new work queue row. `created` is written to both the `created`
/// and `updated` columns; `last_polled` is not set by this statement.
/// Database errors are logged and returned.
pub fn insert(
    id: []const u8,
    name: []const u8,
    description: []const u8,
    is_paused: bool,
    concurrency_limit: ?i64,
    priority: i64,
    work_pool_id: []const u8,
    status: Status,
    created: []const u8,
) !void {
    backend.db.exec(
        "INSERT INTO work_queue (id, created, updated, name, description, is_paused, concurrency_limit, priority, work_pool_id, status) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
        .{
            id,
            created,
            created,
            name,
            description,
            @as(i32, if (is_paused) 1 else 0),
            concurrency_limit,
            priority,
            work_pool_id,
            status.toString(),
        },
    ) catch |err| {
        log.err("database", "insert work_queue error: {}", .{err});
        return err;
    };
}

/// Partially update the queue identified by (`pool_id`, `name`).
///
/// Each optional argument that is `null` leaves its column unchanged (the
/// statement wraps every value in COALESCE). Because of that, passing `null`
/// for `concurrency_limit` cannot clear an existing limit. `updated` is
/// always written. Returns `true` when at least one row was affected.
/// Database errors are logged and returned.
pub fn updateByPoolAndName(
    pool_id: []const u8,
    name: []const u8,
    new_name: ?[]const u8,
    description: ?[]const u8,
    is_paused: ?bool,
    concurrency_limit: ?i64,
    priority: ?i64,
    status: ?Status,
    updated: []const u8,
) !bool {
    const affected = backend.db.execWithRowCount(
        "UPDATE work_queue SET name = COALESCE(?, name), description = COALESCE(?, description), is_paused = COALESCE(?, is_paused), concurrency_limit = COALESCE(?, concurrency_limit), priority = COALESCE(?, priority), status = COALESCE(?, status), updated = ? WHERE work_pool_id = ? AND name = ?",
        .{
            new_name,
            description,
            if (is_paused) |p| @as(?i32, if (p) 1 else 0) else null,
            concurrency_limit,
            priority,
            if (status) |s| s.toString() else null,
            updated,
            pool_id,
            name,
        },
    ) catch |err| {
        log.err("database", "update work_queue error: {}", .{err});
        return err;
    };
    return affected > 0;
}

/// Record a poll on the queue identified by (`pool_id`, `name`): sets
/// `last_polled` and `status`, and writes the same `last_polled` value into
/// `updated`. Returns `true` when at least one row was affected.
/// Database errors are logged and returned.
pub fn updateLastPolled(pool_id: []const u8, name: []const u8, last_polled: []const u8, status: Status) !bool {
    const affected = backend.db.execWithRowCount(
        "UPDATE work_queue SET last_polled = ?, status = ?, updated = ? WHERE work_pool_id = ? AND name = ?",
        .{ last_polled, status.toString(), last_polled, pool_id, name },
    ) catch |err| {
        log.err("database", "update work_queue last_polled error: {}", .{err});
        return err;
    };
    return affected > 0;
}

/// Delete the queue identified by (`pool_id`, `name`). Returns `true` when
/// at least one row was removed. Database errors are logged and returned.
pub fn deleteByPoolAndName(pool_id: []const u8, name: []const u8) !bool {
    const affected = backend.db.execWithRowCount(
        "DELETE FROM work_queue WHERE work_pool_id = ? AND name = ?",
        .{ pool_id, name },
    ) catch |err| {
        log.err("database", "delete work_queue error: {}", .{err});
        return err;
    };
    return affected > 0;
}

/// List the queues of a pool ordered by ascending priority, applying
/// `limit` / `offset` in SQL.
///
/// Caller owns the returned slice and every row in it: call
/// `WorkQueueRow.deinit` on each element, then free the slice with `alloc`.
/// On any error, all rows copied so far are released before returning.
pub fn listByPool(alloc: Allocator, pool_id: []const u8, limit: usize, offset: usize) ![]WorkQueueRow {
    var results = std.ArrayListUnmanaged(WorkQueueRow){};
    errdefer {
        // Release the strings owned by already-collected rows, not just the
        // list's backing buffer.
        for (results.items) |row| row.deinit(alloc);
        results.deinit(alloc);
    }

    var rows = backend.db.query(
        "SELECT " ++ select_cols ++ " FROM work_queue WHERE work_pool_id = ? ORDER BY priority ASC LIMIT ? OFFSET ?",
        .{ pool_id, @as(i64, @intCast(limit)), @as(i64, @intCast(offset)) },
    ) catch |err| {
        log.err("database", "list work_queues error: {}", .{err});
        return err;
    };
    defer rows.deinit();

    while (rows.next()) |r| {
        const row = try rowFromResult(alloc, &r);
        // If the append fails the row is not yet in `results`, so it must be
        // released here rather than by the outer errdefer.
        errdefer row.deinit(alloc);
        try results.append(alloc, row);
    }

    return results.toOwnedSlice(alloc);
}

/// Count the queues belonging to a pool. A database error is logged and
/// reported as 0, as is an absent result row.
pub fn countByPool(pool_id: []const u8) !usize {
    var r = backend.db.row(
        "SELECT COUNT(*) FROM work_queue WHERE work_pool_id = ?",
        .{pool_id},
    ) catch |err| {
        log.err("database", "count work_queues error: {}", .{err});
        return 0;
    };

    if (r) |*row| {
        defer row.deinit();
        return @intCast(row.bigint(0));
    }
    return 0;
}

/// Get next available priority for a pool (max + 1, or 1 if no queues).
/// A database error is logged and reported as 1.
pub fn nextPriority(pool_id: []const u8) !i64 {
    var r = backend.db.row(
        "SELECT COALESCE(MAX(priority), 0) + 1 FROM work_queue WHERE work_pool_id = ?",
        .{pool_id},
    ) catch |err| {
        log.err("database", "next work_queue priority error: {}", .{err});
        return 1;
    };

    if (r) |*row| {
        defer row.deinit();
        return row.bigint(0);
    }
    return 1;
}