//! Queries against the `worker` table: lookup, heartbeat upsert, delete,
//! listing and counting by work pool.

const std = @import("std");
const Allocator = std.mem.Allocator;
const backend = @import("backend.zig");
const log = @import("../logging.zig");

/// Worker status enum - matches Python's WorkerStatus
pub const Status = enum {
    online,
    offline,

    /// Parses the stored status text. Only the exact string "ONLINE" maps to
    /// `.online`; every other value (including unknown ones) is `.offline`.
    pub fn fromString(s: []const u8) Status {
        if (std.mem.eql(u8, s, "ONLINE")) return .online;
        return .offline;
    }

    /// Returns the text form written to / compared against the `status` column.
    pub fn toString(self: Status) []const u8 {
        return switch (self) {
            .online => "ONLINE",
            .offline => "OFFLINE",
        };
    }
};

/// One row of the `worker` table.
///
/// Rows produced by this module hold copies of every string field, made with
/// the allocator handed to the producing function; release them with `deinit`
/// (or by tearing down an arena).
pub const WorkerRow = struct {
    id: []const u8,
    created: []const u8,
    updated: []const u8,
    name: []const u8,
    work_pool_id: []const u8,
    last_heartbeat_time: ?[]const u8,
    heartbeat_interval_seconds: ?i64,
    status: Status,

    /// Frees every string field. `alloc` must be the allocator that was used
    /// to build this row.
    pub fn deinit(self: WorkerRow, alloc: Allocator) void {
        alloc.free(self.id);
        alloc.free(self.created);
        alloc.free(self.updated);
        alloc.free(self.name);
        alloc.free(self.work_pool_id);
        if (self.last_heartbeat_time) |t| alloc.free(t);
    }
};

/// Column indexes; must stay in the same order as `select_cols`.
const Col = struct {
    const id: usize = 0;
    const created: usize = 1;
    const updated: usize = 2;
    const name: usize = 3;
    const work_pool_id: usize = 4;
    const last_heartbeat_time: usize = 5;
    const heartbeat_interval_seconds: usize = 6;
    const status: usize = 7;
};

const select_cols = "id, created, updated, name, work_pool_id, last_heartbeat_time, heartbeat_interval_seconds, status";

/// Builds a `WorkerRow` from a result row whose columns follow `select_cols`.
///
/// Every string is duplicated with `alloc`, so the returned row does not
/// reference memory owned by `r`. If any duplication fails, the copies made so
/// far are freed before the error is returned.
fn rowFromResult(alloc: Allocator, r: anytype) !WorkerRow {
    const last_heartbeat = r.textOrNull(Col.last_heartbeat_time);
    const interval = r.textOrNull(Col.heartbeat_interval_seconds);

    const id = try alloc.dupe(u8, r.text(Col.id));
    errdefer alloc.free(id);
    const created = try alloc.dupe(u8, r.text(Col.created));
    errdefer alloc.free(created);
    const updated = try alloc.dupe(u8, r.text(Col.updated));
    errdefer alloc.free(updated);
    const name = try alloc.dupe(u8, r.text(Col.name));
    errdefer alloc.free(name);
    const work_pool_id = try alloc.dupe(u8, r.text(Col.work_pool_id));
    errdefer alloc.free(work_pool_id);

    // Last fallible step, so nothing after it needs an errdefer.
    const last_heartbeat_time: ?[]const u8 = if (last_heartbeat) |lh|
        try alloc.dupe(u8, lh)
    else
        null;

    return WorkerRow{
        .id = id,
        .created = created,
        .updated = updated,
        .name = name,
        .work_pool_id = work_pool_id,
        .last_heartbeat_time = last_heartbeat_time,
        // The text probe is only used to detect NULL before reading the integer.
        .heartbeat_interval_seconds = if (interval != null) r.bigint(Col.heartbeat_interval_seconds) else null,
        .status = Status.fromString(r.text(Col.status)),
    };
}

/// Looks up a worker by primary key.
///
/// Returns `null` when no row matches. A failing query is logged and also
/// reported as `null`; only allocation failures surface as errors.
/// The returned row's strings are allocated with `alloc`.
pub fn getById(alloc: Allocator, id: []const u8) !?WorkerRow {
    var r = backend.db.row(
        "SELECT " ++ select_cols ++ " FROM worker WHERE id = ?",
        .{id},
    ) catch |err| {
        log.err("database", "get worker by id error: {}", .{err});
        return null;
    };

    if (r) |*row| {
        defer row.deinit();
        return try rowFromResult(alloc, row);
    }
    return null;
}

/// Looks up a worker by its work pool and name.
///
/// Returns `null` when no row matches. A failing query is logged and also
/// reported as `null`; only allocation failures surface as errors.
/// The returned row's strings are allocated with `alloc`.
pub fn getByPoolAndName(alloc: Allocator, pool_id: []const u8, name: []const u8) !?WorkerRow {
    var r = backend.db.row(
        "SELECT " ++ select_cols ++ " FROM worker WHERE work_pool_id = ? AND name = ?",
        .{ pool_id, name },
    ) catch |err| {
        log.err("database", "get worker by pool and name error: {}", .{err});
        return null;
    };

    if (r) |*row| {
        defer row.deinit();
        return try rowFromResult(alloc, row);
    }
    return null;
}

/// Upsert worker heartbeat - inserts if not exists, updates if exists
/// This is the core worker registration mechanism
///
/// On conflict over (work_pool_id, name) the existing row gets `updated` and
/// `last_heartbeat_time` set to `now`, its status set to 'ONLINE', and its
/// heartbeat interval replaced only when `heartbeat_interval_seconds` is
/// non-null. Database errors are logged and returned.
pub fn upsertHeartbeat(
    id: []const u8,
    name: []const u8,
    work_pool_id: []const u8,
    heartbeat_interval_seconds: ?i64,
    now: []const u8,
) !void {
    // Use dialect-specific upsert SQL
    const sql = switch (backend.db.dialect) {
        .sqlite => sqlite_upsert,
        .postgres => postgres_upsert,
    };

    backend.db.exec(sql, .{
        id,
        now, // created
        now, // updated
        name,
        work_pool_id,
        now, // last_heartbeat_time
        heartbeat_interval_seconds,
        // For the ON CONFLICT update clause:
        now, // updated
        now, // last_heartbeat_time
        heartbeat_interval_seconds,
    }) catch |err| {
        log.err("database", "upsert worker heartbeat error: {}", .{err});
        return err;
    };
}

const sqlite_upsert =
    \\INSERT INTO worker (id, created, updated, name, work_pool_id, last_heartbeat_time, heartbeat_interval_seconds, status)
    \\VALUES (?, ?, ?, ?, ?, ?, ?, 'ONLINE')
    \\ON CONFLICT(work_pool_id, name) DO UPDATE SET
    \\    updated = ?,
    \\    last_heartbeat_time = ?,
    \\    heartbeat_interval_seconds = COALESCE(?, heartbeat_interval_seconds),
    \\    status = 'ONLINE'
;

const postgres_upsert =
    \\INSERT INTO worker (id, created, updated, name, work_pool_id, last_heartbeat_time, heartbeat_interval_seconds, status)
    \\VALUES ($1, $2, $3, $4, $5, $6, $7, 'ONLINE')
    \\ON CONFLICT(work_pool_id, name) DO UPDATE SET
    \\    updated = $8,
    \\    last_heartbeat_time = $9,
    \\    heartbeat_interval_seconds = COALESCE($10, worker.heartbeat_interval_seconds),
    \\    status = 'ONLINE'
;

/// Deletes the worker identified by pool and name.
///
/// Returns `true` when at least one row was removed. Database errors are
/// logged and returned.
pub fn deleteByPoolAndName(pool_id: []const u8, name: []const u8) !bool {
    const affected = backend.db.execWithRowCount(
        "DELETE FROM worker WHERE work_pool_id = ? AND name = ?",
        .{ pool_id, name },
    ) catch |err| {
        log.err("database", "delete worker error: {}", .{err});
        return err;
    };
    return affected > 0;
}

/// Lists the workers of a pool, most recent heartbeat first (rows without a
/// heartbeat last), applying `limit` and `offset`.
///
/// The returned slice and every string inside its rows are allocated with
/// `alloc` and owned by the caller. On any error nothing is leaked: rows that
/// were already collected are released before the error is returned.
/// Database errors are logged and returned.
pub fn listByPool(alloc: Allocator, pool_id: []const u8, limit: usize, offset: usize) ![]WorkerRow {
    var results = std.ArrayListUnmanaged(WorkerRow){};
    errdefer {
        for (results.items) |row| row.deinit(alloc);
        results.deinit(alloc);
    }

    // Clamp rather than @intCast: a usize above maxInt(i64) (e.g. a "no limit"
    // sentinel) must not trigger illegal behaviour.
    const sql_limit: i64 = std.math.cast(i64, limit) orelse std.math.maxInt(i64);
    const sql_offset: i64 = std.math.cast(i64, offset) orelse std.math.maxInt(i64);

    var rows = backend.db.query(
        "SELECT " ++ select_cols ++ " FROM worker WHERE work_pool_id = ? ORDER BY last_heartbeat_time DESC NULLS LAST LIMIT ? OFFSET ?",
        .{ pool_id, sql_limit, sql_offset },
    ) catch |err| {
        log.err("database", "list workers error: {}", .{err});
        return err;
    };
    defer rows.deinit();

    while (rows.next()) |r| {
        const worker = try rowFromResult(alloc, &r);
        // Until the append succeeds the list does not own this row yet.
        errdefer worker.deinit(alloc);
        try results.append(alloc, worker);
    }

    return results.toOwnedSlice(alloc);
}

/// Counts all workers in a pool.
///
/// A failing query is logged and reported as 0, as is a query that yields no
/// row.
pub fn countByPool(pool_id: []const u8) !usize {
    var r = backend.db.row(
        "SELECT COUNT(*) FROM worker WHERE work_pool_id = ?",
        .{pool_id},
    ) catch |err| {
        log.err("database", "count workers error: {}", .{err});
        return 0;
    };

    if (r) |*row| {
        defer row.deinit();
        return @intCast(row.bigint(0));
    }
    return 0;
}

/// Counts the workers in a pool whose status is 'ONLINE'.
///
/// A failing query is logged and reported as 0, as is a query that yields no
/// row.
pub fn countOnlineByPool(pool_id: []const u8) !usize {
    var r = backend.db.row(
        "SELECT COUNT(*) FROM worker WHERE work_pool_id = ? AND status = 'ONLINE'",
        .{pool_id},
    ) catch |err| {
        log.err("database", "count online workers error: {}", .{err});
        return 0;
    };

    if (r) |*row| {
        defer row.deinit();
        return @intCast(row.bigint(0));
    }
    return 0;
}