//! Queries and state updates for the `task_run` and `task_run_state` tables.

const std = @import("std");
const Allocator = std.mem.Allocator;

const backend = @import("backend.zig");
const log = @import("../logging.zig");

/// One `task_run` row, as selected by the queries in this file.
///
/// Rows returned by `get`, `getByKey`, and `list` hold their own copy of every
/// string field, made with the allocator passed to those functions. Release
/// them with `deinit` (or drop the arena they were allocated from).
pub const TaskRunRow = struct {
    id: []const u8,
    created: []const u8,
    updated: []const u8,
    flow_run_id: []const u8,
    name: []const u8,
    task_key: []const u8,
    dynamic_key: []const u8,
    state_type: []const u8,
    state_name: []const u8,
    state_timestamp: []const u8,
    tags: []const u8,
    run_count: i64,

    /// Frees every string field.
    /// `alloc` must be the allocator the row was created with.
    pub fn deinit(self: TaskRunRow, alloc: Allocator) void {
        alloc.free(self.id);
        alloc.free(self.created);
        alloc.free(self.updated);
        alloc.free(self.flow_run_id);
        alloc.free(self.name);
        alloc.free(self.task_key);
        alloc.free(self.dynamic_key);
        alloc.free(self.state_type);
        alloc.free(self.state_name);
        alloc.free(self.state_timestamp);
        alloc.free(self.tags);
    }
};

/// Inserts a new `task_run` row.
///
/// `flow_run_id` is optional and is forwarded unchanged as a bound parameter.
/// Any backend error is logged and returned.
pub fn insert(
    id: []const u8,
    flow_run_id: ?[]const u8,
    name: []const u8,
    task_key: []const u8,
    dynamic_key: []const u8,
    state_type: []const u8,
    state_name: []const u8,
    timestamp: []const u8,
) !void {
    backend.db.exec(
        \\INSERT INTO task_run (id, flow_run_id, name, task_key, dynamic_key, state_type, state_name, state_timestamp)
        \\VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    , .{
        id,
        flow_run_id,
        name,
        task_key,
        dynamic_key,
        state_type,
        state_name,
        timestamp,
    }) catch |err| {
        log.err("database", "insert task_run error: {}", .{err});
        return err;
    };
}

/// Looks up a task run by primary key.
///
/// Returns `null` when no row matches. A failed query is logged and returned
/// as an error so callers can tell "not found" apart from "database broken".
/// The caller owns the returned row (see `TaskRunRow.deinit`).
pub fn get(alloc: Allocator, id: []const u8) !?TaskRunRow {
    var rows = backend.db.query(
        \\SELECT id, created, updated, flow_run_id, name, task_key, dynamic_key,
        \\ state_type, state_name, state_timestamp, tags, run_count
        \\FROM task_run WHERE id = ?
    , .{id}) catch |err| {
        log.err("database", "get task_run error: {}", .{err});
        return err;
    };
    defer rows.deinit();

    if (rows.next()) |row| {
        return try rowToTaskRun(alloc, row);
    }
    return null;
}

/// Looks up a task run by `(flow_run_id, task_key, dynamic_key)`.
///
/// A null `flow_run_id` matches rows whose `flow_run_id IS NULL`; SQL `= NULL`
/// never matches, hence the separate statement.
///
/// Returns `null` when no row matches. A failed query is logged and returned
/// as an error. The caller owns the returned row (see `TaskRunRow.deinit`).
pub fn getByKey(
    alloc: Allocator,
    flow_run_id: ?[]const u8,
    task_key: []const u8,
    dynamic_key: []const u8,
) !?TaskRunRow {
    if (flow_run_id) |frid| {
        var rows = backend.db.query(
            \\SELECT id, created, updated, flow_run_id, name, task_key, dynamic_key,
            \\ state_type, state_name, state_timestamp, tags, run_count
            \\FROM task_run WHERE flow_run_id = ? AND task_key = ? AND dynamic_key = ?
        , .{ frid, task_key, dynamic_key }) catch |err| {
            log.err("database", "get task_run by key error: {}", .{err});
            return err;
        };
        defer rows.deinit();

        if (rows.next()) |row| {
            return try rowToTaskRun(alloc, row);
        }
    } else {
        var rows = backend.db.query(
            \\SELECT id, created, updated, flow_run_id, name, task_key, dynamic_key,
            \\ state_type, state_name, state_timestamp, tags, run_count
            \\FROM task_run WHERE flow_run_id IS NULL AND task_key = ? AND dynamic_key = ?
        , .{ task_key, dynamic_key }) catch |err| {
            log.err("database", "get task_run by key error: {}", .{err});
            return err;
        };
        defer rows.deinit();

        if (rows.next()) |row| {
            return try rowToTaskRun(alloc, row);
        }
    }
    return null;
}

/// Records a state change for a task run.
///
/// Inside one transaction this updates the state columns (and `updated`) on
/// the `task_run` row and inserts a matching `task_run_state` history row.
/// Any failure is logged and returned; the `errdefer` rolls the transaction
/// back on every error exit.
pub fn setState(
    run_id: []const u8,
    state_id: []const u8,
    state_type: []const u8,
    state_name: []const u8,
    timestamp: []const u8,
) !void {
    // Lock mutex only for SQLite (postgres pool handles concurrency)
    if (backend.db.dialect == .sqlite) backend.db.mutex.lock();
    defer if (backend.db.dialect == .sqlite) backend.db.mutex.unlock();

    var txn = backend.db.beginTransaction() catch |err| {
        log.err("database", "begin transaction error: {}", .{err});
        return err;
    };
    errdefer txn.rollback();

    txn.exec(
        \\UPDATE task_run SET state_id = ?, state_type = ?, state_name = ?, state_timestamp = ?, updated = ?
        \\WHERE id = ?
    , .{ state_id, state_type, state_name, timestamp, timestamp, run_id }) catch |err| {
        log.err("database", "update task_run error: {}", .{err});
        return err;
    };

    // Insert state history record
    txn.exec(
        \\INSERT INTO task_run_state (id, task_run_id, type, name, timestamp)
        \\VALUES (?, ?, ?, ?, ?)
    , .{ state_id, run_id, state_type, state_name, timestamp }) catch |err| {
        log.err("database", "insert task_run_state error: {}", .{err});
        return err;
    };

    txn.commit() catch |err| {
        log.err("database", "commit error: {}", .{err});
        return err;
    };
}

/// Returns up to `limit` task runs, ordered by `created` descending.
///
/// The caller owns the returned slice and every row in it: call
/// `TaskRunRow.deinit` on each row, then free the slice with `alloc`.
/// On error nothing is leaked; rows copied so far are released first.
pub fn list(alloc: Allocator, limit: usize) ![]TaskRunRow {
    var results = std.ArrayListUnmanaged(TaskRunRow){};
    errdefer {
        for (results.items) |item| item.deinit(alloc);
        results.deinit(alloc);
    }

    // The bound parameter is an i64; saturate instead of invoking illegal
    // behaviour when `limit` does not fit.
    const sql_limit: i64 = std.math.cast(i64, limit) orelse std.math.maxInt(i64);

    var rows = backend.db.query(
        \\SELECT id, created, updated, flow_run_id, name, task_key, dynamic_key,
        \\ state_type, state_name, state_timestamp, tags, run_count
        \\FROM task_run ORDER BY created DESC LIMIT ?
    , .{sql_limit}) catch |err| {
        log.err("database", "list task_runs error: {}", .{err});
        return err;
    };
    defer rows.deinit();

    while (rows.next()) |row| {
        const task_run = try rowToTaskRun(alloc, row);
        // Until the append succeeds the list does not own this row yet.
        errdefer task_run.deinit(alloc);
        try results.append(alloc, task_run);
    }

    return results.toOwnedSlice(alloc);
}

/// Copies one result row into an owned `TaskRunRow`.
///
/// Column indices follow the SELECT list shared by the queries in this file.
/// Each text column is duplicated with `alloc`. If any copy fails, the copies
/// already made are freed and `error.OutOfMemory` is returned, so no partially
/// populated row is ever handed out.
fn rowToTaskRun(alloc: Allocator, row: anytype) Allocator.Error!TaskRunRow {
    const id = try alloc.dupe(u8, row.text(0));
    errdefer alloc.free(id);
    const created = try alloc.dupe(u8, row.text(1));
    errdefer alloc.free(created);
    const updated = try alloc.dupe(u8, row.text(2));
    errdefer alloc.free(updated);
    const flow_run_id = try alloc.dupe(u8, row.text(3));
    errdefer alloc.free(flow_run_id);
    const name = try alloc.dupe(u8, row.text(4));
    errdefer alloc.free(name);
    const task_key = try alloc.dupe(u8, row.text(5));
    errdefer alloc.free(task_key);
    const dynamic_key = try alloc.dupe(u8, row.text(6));
    errdefer alloc.free(dynamic_key);
    const state_type = try alloc.dupe(u8, row.text(7));
    errdefer alloc.free(state_type);
    const state_name = try alloc.dupe(u8, row.text(8));
    errdefer alloc.free(state_name);
    const state_timestamp = try alloc.dupe(u8, row.text(9));
    errdefer alloc.free(state_timestamp);
    const tags = try alloc.dupe(u8, row.text(10));
    errdefer alloc.free(tags);

    return .{
        .id = id,
        .created = created,
        .updated = updated,
        .flow_run_id = flow_run_id,
        .name = name,
        .task_key = task_key,
        .dynamic_key = dynamic_key,
        .state_type = state_type,
        .state_name = state_name,
        .state_timestamp = state_timestamp,
        .tags = tags,
        .run_count = row.bigint(11),
    };
}