const std = @import("std");
const Allocator = std.mem.Allocator;
const backend = @import("backend.zig");
const log = @import("../logging.zig");
const time_util = @import("../utilities/time.zig");

/// One materialized row of the `flow_run` table.
/// Every string field is duplicated into the allocator passed to the query
/// function that produced the row; optional fields are null when the column
/// was SQL NULL (surfaced by the backend layer as an empty string).
pub const FlowRunRow = struct {
    id: []const u8,
    created: []const u8,
    updated: []const u8,
    flow_id: []const u8,
    name: []const u8,
    state_type: []const u8,
    state_name: []const u8,
    state_timestamp: []const u8,
    parameters: []const u8,
    tags: []const u8,
    run_count: i64,
    expected_start_time: ?[]const u8,
    next_scheduled_start_time: ?[]const u8,
    start_time: ?[]const u8,
    end_time: ?[]const u8,
    total_run_time: f64,
    // deployment fields
    deployment_id: ?[]const u8,
    deployment_version: ?[]const u8,
    work_queue_name: ?[]const u8,
    work_queue_id: ?[]const u8,
    auto_scheduled: bool,
    idempotency_key: ?[]const u8,
    // retry policy (empirical_policy JSON)
    empirical_policy: []const u8,
    // idempotency: transition_id from current state
    state_transition_id: ?[]const u8,
};

/// Optional columns accepted by `insert` / `insertOrIgnore`.
/// `parameters` and `empirical_policy` default to "{}" when null.
pub const InsertParams = struct {
    deployment_id: ?[]const u8 = null,
    deployment_version: ?[]const u8 = null,
    work_queue_name: ?[]const u8 = null,
    work_queue_id: ?[]const u8 = null,
    auto_scheduled: bool = false,
    expected_start_time: ?[]const u8 = null,
    next_scheduled_start_time: ?[]const u8 = null,
    idempotency_key: ?[]const u8 = null,
    parameters: ?[]const u8 = null,
    empirical_policy: ?[]const u8 = null, // JSON: {"retries": N, "retry_delay": N}
};

/// Column list + placeholders shared by `insert` and `insertOrIgnore`
/// (previously copy-pasted three times). 16 columns -> 16 placeholders.
const insert_tail =
    \\ (id, flow_id, name, state_type, state_name, state_timestamp,
    \\ deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled,
    \\ expected_start_time, next_scheduled_start_time, idempotency_key, parameters, empirical_policy)
    \\VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
;

const sql_insert = "INSERT INTO flow_run" ++ insert_tail;
const sql_insert_or_ignore_sqlite = "INSERT OR IGNORE INTO flow_run" ++ insert_tail;
const sql_insert_or_ignore_pg = sql_insert ++
    "\nON CONFLICT (flow_id, idempotency_key) WHERE idempotency_key IS NOT NULL DO NOTHING";

/// SELECT prefix shared by every query that materializes a FlowRunRow
/// (previously copy-pasted eight times). The column order MUST stay in
/// lock-step with the column indices used in `rowToFlowRun`.
const select_prefix =
    \\SELECT id, created, updated, flow_id, name, state_type, state_name, state_timestamp,
    \\       parameters, tags, run_count, expected_start_time, next_scheduled_start_time, start_time, end_time, total_run_time,
    \\       deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled, idempotency_key, empirical_policy, state_transition_id
    \\FROM flow_run
;

/// Insert a new flow run. Logs and propagates any database error
/// (including a duplicate primary key).
pub fn insert(
    id: []const u8,
    flow_id: []const u8,
    name: []const u8,
    state_type: []const u8,
    state_name: []const u8,
    timestamp: []const u8,
    params: InsertParams,
) !void {
    backend.db.exec(sql_insert, .{
        id,
        flow_id,
        name,
        state_type,
        state_name,
        timestamp,
        params.deployment_id,
        params.deployment_version,
        params.work_queue_name,
        params.work_queue_id,
        // bool stored as 0/1 integer
        @as(i64, if (params.auto_scheduled) 1 else 0),
        params.expected_start_time,
        params.next_scheduled_start_time,
        params.idempotency_key,
        params.parameters orelse "{}",
        params.empirical_policy orelse "{}",
    }) catch |err| {
        log.err("database", "insert flow_run error: {}", .{err});
        return err;
    };
}

/// Insert a flow run idempotently - silently ignores duplicates based on (flow_id, idempotency_key).
/// Used by the scheduler to safely create runs without duplicates.
/// NOTE: currently always returns true on a successful statement; whether a
/// row was actually inserted (sqlite changes() / postgres affected rows) is
/// not yet checked.
pub fn insertOrIgnore(
    id: []const u8,
    flow_id: []const u8,
    name: []const u8,
    state_type: []const u8,
    state_name: []const u8,
    timestamp: []const u8,
    params: InsertParams,
) !bool {
    // Use INSERT OR IGNORE for SQLite, INSERT ... ON CONFLICT DO NOTHING for PostgreSQL
    const sql: []const u8 = if (backend.db.dialect == .sqlite)
        sql_insert_or_ignore_sqlite
    else
        sql_insert_or_ignore_pg;
    backend.db.exec(sql, .{
        id,
        flow_id,
        name,
        state_type,
        state_name,
        timestamp,
        params.deployment_id,
        params.deployment_version,
        params.work_queue_name,
        params.work_queue_id,
        @as(i64, if (params.auto_scheduled) 1 else 0),
        params.expected_start_time,
        params.next_scheduled_start_time,
        params.idempotency_key,
        params.parameters orelse "{}",
        params.empirical_policy orelse "{}",
    }) catch |err| {
        log.err("database", "insertOrIgnore flow_run error: {}", .{err});
        return err;
    };
    // For SQLite, check if insert happened via changes()
    // For PostgreSQL, we'd need to check affected rows - for now assume success
    return true;
}

/// Fetch a single flow run by id, or null if it does not exist.
/// Caller owns all strings in the returned row (duplicated into `alloc`).
pub fn get(alloc: Allocator, id: []const u8) !?FlowRunRow {
    // NOTE(review): a query error is swallowed here and reported as
    // "not found" (null). Callers appear to rely on null rather than an
    // error — confirm before propagating.
    var rows = backend.db.query(select_prefix ++ " WHERE id = ?", .{id}) catch return null;
    defer rows.deinit();
    if (rows.next()) |row| {
        return try rowToFlowRun(alloc, row);
    }
    return null;
}

/// Atomically update a flow run's current state and append a row to
/// flow_run_state, inside a single transaction.
/// `expected_start_time` is only written when provided (CopyScheduledTime rule).
pub fn setState(
    run_id: []const u8,
    state_id: []const u8,
    state_type: []const u8,
    state_name: []const u8,
    timestamp: []const u8,
    start_time: ?[]const u8,
    end_time: ?[]const u8,
    run_count: i64,
    total_run_time: f64,
    expected_start_time: ?[]const u8,
    state_transition_id: ?[]const u8,
) !void {
    // Lock mutex only for SQLite (postgres pool handles concurrency)
    if (backend.db.dialect == .sqlite) {
        backend.db.mutex.lock();
    }
    defer if (backend.db.dialect == .sqlite) {
        backend.db.mutex.unlock();
    };
    // Begin transaction - for postgres this acquires a dedicated connection
    var txn = backend.db.beginTransaction() catch |err| {
        log.err("database", "begin transaction error: {}", .{err});
        return err;
    };
    errdefer txn.rollback();
    // Execute within transaction (uses same connection for postgres)
    // Only update expected_start_time if provided (from CopyScheduledTime rule)
    if (expected_start_time) |est| {
        txn.exec(
            \\UPDATE flow_run SET state_id = ?, state_type = ?, state_name = ?, state_timestamp = ?, updated = ?,
            \\    start_time = ?, end_time = ?, run_count = ?, total_run_time = ?, expected_start_time = ?, state_transition_id = ?
            \\WHERE id = ?
        , .{
            state_id,
            state_type,
            state_name,
            timestamp,
            timestamp,
            start_time,
            end_time,
            run_count,
            total_run_time,
            est,
            state_transition_id,
            run_id,
        }) catch |err| {
            log.err("database", "update flow_run error: {}", .{err});
            return err;
        };
    } else {
        txn.exec(
            \\UPDATE flow_run SET state_id = ?, state_type = ?, state_name = ?, state_timestamp = ?, updated = ?,
            \\    start_time = ?, end_time = ?, run_count = ?, total_run_time = ?, state_transition_id = ?
            \\WHERE id = ?
        , .{
            state_id,
            state_type,
            state_name,
            timestamp,
            timestamp,
            start_time,
            end_time,
            run_count,
            total_run_time,
            state_transition_id,
            run_id,
        }) catch |err| {
            log.err("database", "update flow_run error: {}", .{err});
            return err;
        };
    }
    // History row for the new state.
    txn.exec(
        \\INSERT INTO flow_run_state (id, flow_run_id, type, name, timestamp)
        \\VALUES (?, ?, ?, ?, ?)
    , .{ state_id, run_id, state_type, state_name, timestamp }) catch |err| {
        log.err("database", "insert flow_run_state error: {}", .{err});
        return err;
    };
    txn.commit() catch |err| {
        log.err("database", "commit error: {}", .{err});
        return err;
    };
}

/// Set flow run state with next_scheduled_start_time (for AwaitingRetry state)
/// optionally updates empirical_policy if provided.
/// Like `setState`, runs the flow_run update and the flow_run_state insert
/// in one transaction. `expected_start_time` falls back to the existing
/// column value via COALESCE when null.
pub fn setStateWithSchedule(
    run_id: []const u8,
    state_id: []const u8,
    state_type: []const u8,
    state_name: []const u8,
    timestamp: []const u8,
    start_time: ?[]const u8,
    end_time: ?[]const u8,
    run_count: i64,
    total_run_time: f64,
    expected_start_time: ?[]const u8,
    next_scheduled_start_time: ?[]const u8,
    empirical_policy: ?[]const u8,
    state_transition_id: ?[]const u8,
) !void {
    if (backend.db.dialect == .sqlite) {
        backend.db.mutex.lock();
    }
    defer if (backend.db.dialect == .sqlite) {
        backend.db.mutex.unlock();
    };
    var txn = backend.db.beginTransaction() catch |err| {
        log.err("database", "begin transaction error: {}", .{err});
        return err;
    };
    errdefer txn.rollback();
    if (empirical_policy) |policy| {
        txn.exec(
            \\UPDATE flow_run SET state_id = ?, state_type = ?, state_name = ?, state_timestamp = ?, updated = ?,
            \\    start_time = ?, end_time = ?, run_count = ?, total_run_time = ?,
            \\    expected_start_time = COALESCE(?, expected_start_time),
            \\    next_scheduled_start_time = ?,
            \\    empirical_policy = ?,
            \\    state_transition_id = ?
            \\WHERE id = ?
        , .{
            state_id,
            state_type,
            state_name,
            timestamp,
            timestamp,
            start_time,
            end_time,
            run_count,
            total_run_time,
            expected_start_time,
            next_scheduled_start_time,
            policy,
            state_transition_id,
            run_id,
        }) catch |err| {
            log.err("database", "update flow_run error: {}", .{err});
            return err;
        };
    } else {
        txn.exec(
            \\UPDATE flow_run SET state_id = ?, state_type = ?, state_name = ?, state_timestamp = ?, updated = ?,
            \\    start_time = ?, end_time = ?, run_count = ?, total_run_time = ?,
            \\    expected_start_time = COALESCE(?, expected_start_time),
            \\    next_scheduled_start_time = ?,
            \\    state_transition_id = ?
            \\WHERE id = ?
        , .{
            state_id,
            state_type,
            state_name,
            timestamp,
            timestamp,
            start_time,
            end_time,
            run_count,
            total_run_time,
            expected_start_time,
            next_scheduled_start_time,
            state_transition_id,
            run_id,
        }) catch |err| {
            log.err("database", "update flow_run error: {}", .{err});
            return err;
        };
    }
    txn.exec(
        \\INSERT INTO flow_run_state (id, flow_run_id, type, name, timestamp)
        \\VALUES (?, ?, ?, ?, ?)
    , .{ state_id, run_id, state_type, state_name, timestamp }) catch |err| {
        log.err("database", "insert flow_run_state error: {}", .{err});
        return err;
    };
    txn.commit() catch |err| {
        log.err("database", "commit error: {}", .{err});
        return err;
    };
}

/// List flow runs, newest first, up to `limit`. Caller owns the slice
/// and all strings inside each row.
pub fn list(alloc: Allocator, limit: usize) ![]FlowRunRow {
    var results = std.ArrayListUnmanaged(FlowRunRow){};
    errdefer results.deinit(alloc);
    var rows = backend.db.query(
        select_prefix ++ " ORDER BY created DESC LIMIT ?",
        .{@as(i64, @intCast(limit))},
    ) catch |err| {
        log.err("database", "list flow_runs error: {}", .{err});
        return err;
    };
    defer rows.deinit();
    while (rows.next()) |r| {
        try results.append(alloc, try rowToFlowRun(alloc, r));
    }
    return results.toOwnedSlice(alloc);
}

/// Get scheduled flow runs for a specific deployment, ordered by
/// expected_start_time.
pub fn getScheduledByDeployment(alloc: Allocator, deployment_id: []const u8, limit: usize) ![]FlowRunRow {
    var results = std.ArrayListUnmanaged(FlowRunRow){};
    errdefer results.deinit(alloc);
    var rows = backend.db.query(
        select_prefix ++
            " WHERE deployment_id = ? AND state_type = 'SCHEDULED'" ++
            " ORDER BY expected_start_time ASC LIMIT ?",
        .{ deployment_id, @as(i64, @intCast(limit)) },
    ) catch |err| {
        log.err("database", "get scheduled flow_runs error: {}", .{err});
        return err;
    };
    defer rows.deinit();
    while (rows.next()) |r| {
        try results.append(alloc, try rowToFlowRun(alloc, r));
    }
    return results.toOwnedSlice(alloc);
}

/// Get scheduled flow runs for multiple deployments, merged, sorted by
/// scheduled time, capped at `limit` total.
pub fn getScheduledByDeployments(
    alloc: Allocator,
    deployment_ids: []const []const u8,
    scheduled_before: ?[]const u8,
    limit: usize,
) ![]FlowRunRow {
    var results = std.ArrayListUnmanaged(FlowRunRow){};
    errdefer results.deinit(alloc);
    // Query each deployment and combine results up to the limit.
    for (deployment_ids) |dep_id| {
        const dep_runs = try getScheduledByDeploymentBefore(alloc, dep_id, scheduled_before, limit);
        // The sub-call returns an owned slice; free the slice memory once the
        // row structs have been copied out (strings are shared, not re-duped).
        // Previously this slice was leaked on every iteration.
        defer alloc.free(dep_runs);
        var kept: usize = 0;
        for (dep_runs) |run| {
            if (results.items.len >= limit) break;
            try results.append(alloc, run);
            kept += 1;
        }
        // Rows fetched beyond the limit were previously leaked; free them.
        for (dep_runs[kept..]) |run| freeRow(alloc, run);
        if (results.items.len >= limit) break;
    }
    // Sort by next_scheduled_start_time using proper timestamp parsing.
    const items = results.items;
    std.mem.sort(FlowRunRow, items, {}, scheduledTimeLessThan);
    // Defensive trim (the append loop already caps at `limit`).
    if (items.len > limit) {
        for (items[limit..]) |run| freeRow(alloc, run);
        results.shrinkRetainingCapacity(limit);
    }
    return results.toOwnedSlice(alloc);
}

/// Get scheduled flow runs for a deployment with optional time filter.
/// Filters on next_scheduled_start_time to match Python Prefect behavior.
fn getScheduledByDeploymentBefore(
    alloc: Allocator,
    deployment_id: []const u8,
    scheduled_before: ?[]const u8,
    limit: usize,
) ![]FlowRunRow {
    var results = std.ArrayListUnmanaged(FlowRunRow){};
    errdefer results.deinit(alloc);
    const base = select_prefix ++ " WHERE deployment_id = ? AND state_type = 'SCHEDULED'";
    if (scheduled_before) |before| {
        var rows = backend.db.query(
            base ++
                " AND (next_scheduled_start_time IS NULL OR next_scheduled_start_time <= ?)" ++
                " ORDER BY next_scheduled_start_time ASC LIMIT ?",
            .{ deployment_id, before, @as(i64, @intCast(limit)) },
        ) catch |err| {
            log.err("database", "get scheduled flow_runs error: {}", .{err});
            return err;
        };
        defer rows.deinit();
        while (rows.next()) |r| {
            try results.append(alloc, try rowToFlowRun(alloc, r));
        }
    } else {
        var rows = backend.db.query(
            base ++ " ORDER BY next_scheduled_start_time ASC LIMIT ?",
            .{ deployment_id, @as(i64, @intCast(limit)) },
        ) catch |err| {
            log.err("database", "get scheduled flow_runs error: {}", .{err});
            return err;
        };
        defer rows.deinit();
        while (rows.next()) |r| {
            try results.append(alloc, try rowToFlowRun(alloc, r));
        }
    }
    return results.toOwnedSlice(alloc);
}

/// Get scheduled flow runs for a work queue with optional time filter.
/// Filters on next_scheduled_start_time to match Python Prefect behavior.
pub fn getScheduledByWorkQueue(
    alloc: Allocator,
    work_queue_id: []const u8,
    scheduled_before: ?[]const u8,
    limit: usize,
) ![]FlowRunRow {
    var results = std.ArrayListUnmanaged(FlowRunRow){};
    errdefer results.deinit(alloc);
    const base = select_prefix ++ " WHERE work_queue_id = ? AND state_type = 'SCHEDULED'";
    if (scheduled_before) |before| {
        var rows = backend.db.query(
            base ++
                " AND (next_scheduled_start_time IS NULL OR next_scheduled_start_time <= ?)" ++
                " ORDER BY next_scheduled_start_time ASC LIMIT ?",
            .{ work_queue_id, before, @as(i64, @intCast(limit)) },
        ) catch |err| {
            log.err("database", "get scheduled flow_runs by queue error: {}", .{err});
            return err;
        };
        defer rows.deinit();
        while (rows.next()) |r| {
            try results.append(alloc, try rowToFlowRun(alloc, r));
        }
    } else {
        var rows = backend.db.query(
            base ++ " ORDER BY next_scheduled_start_time ASC LIMIT ?",
            .{ work_queue_id, @as(i64, @intCast(limit)) },
        ) catch |err| {
            log.err("database", "get scheduled flow_runs by queue error: {}", .{err});
            return err;
        };
        defer rows.deinit();
        while (rows.next()) |r| {
            try results.append(alloc, try rowToFlowRun(alloc, r));
        }
    }
    return results.toOwnedSlice(alloc);
}

/// Get scheduled flow runs for multiple work queues, merged, sorted by
/// scheduled time, capped at `limit` total.
pub fn getScheduledByWorkQueues(
    alloc: Allocator,
    work_queue_ids: []const []const u8,
    scheduled_before: ?[]const u8,
    limit: usize,
) ![]FlowRunRow {
    var results = std.ArrayListUnmanaged(FlowRunRow){};
    errdefer results.deinit(alloc);
    // Query each queue and combine results up to the limit.
    for (work_queue_ids) |queue_id| {
        const queue_runs = try getScheduledByWorkQueue(alloc, queue_id, scheduled_before, limit);
        // Free the intermediate owned slice (previously leaked).
        defer alloc.free(queue_runs);
        var kept: usize = 0;
        for (queue_runs) |run| {
            if (results.items.len >= limit) break;
            try results.append(alloc, run);
            kept += 1;
        }
        // Rows fetched beyond the limit were previously leaked; free them.
        for (queue_runs[kept..]) |run| freeRow(alloc, run);
        if (results.items.len >= limit) break;
    }
    // Sort by next_scheduled_start_time using proper timestamp parsing.
    const items = results.items;
    std.mem.sort(FlowRunRow, items, {}, scheduledTimeLessThan);
    // Defensive trim (the append loop already caps at `limit`).
    if (items.len > limit) {
        for (items[limit..]) |run| freeRow(alloc, run);
        results.shrinkRetainingCapacity(limit);
    }
    return results.toOwnedSlice(alloc);
}

/// Sort comparator shared by the multi-deployment / multi-queue fetchers:
/// orders by next_scheduled_start_time (falling back to expected_start_time)
/// parsed to microseconds; missing or unparseable timestamps sort first.
fn scheduledTimeLessThan(_: void, a: FlowRunRow, b: FlowRunRow) bool {
    const a_time = a.next_scheduled_start_time orelse a.expected_start_time orelse "";
    const b_time = b.next_scheduled_start_time orelse b.expected_start_time orelse "";
    const a_us = time_util.parse(a_time) orelse 0;
    const b_us = time_util.parse(b_time) orelse 0;
    return a_us < b_us;
}

/// Duplicate a nullable text column: the backend surfaces SQL NULL as "",
/// which maps to null here.
fn dupeOptional(alloc: Allocator, s: []const u8) !?[]const u8 {
    if (s.len == 0) return null;
    return try alloc.dupe(u8, s);
}

/// Free every allocator-owned string of a row produced by `rowToFlowRun`.
fn freeRow(alloc: Allocator, row: FlowRunRow) void {
    alloc.free(row.id);
    alloc.free(row.created);
    alloc.free(row.updated);
    alloc.free(row.flow_id);
    alloc.free(row.name);
    alloc.free(row.state_type);
    alloc.free(row.state_name);
    alloc.free(row.state_timestamp);
    alloc.free(row.parameters);
    alloc.free(row.tags);
    if (row.expected_start_time) |s| alloc.free(s);
    if (row.next_scheduled_start_time) |s| alloc.free(s);
    if (row.start_time) |s| alloc.free(s);
    if (row.end_time) |s| alloc.free(s);
    if (row.deployment_id) |s| alloc.free(s);
    if (row.deployment_version) |s| alloc.free(s);
    if (row.work_queue_name) |s| alloc.free(s);
    if (row.work_queue_id) |s| alloc.free(s);
    if (row.idempotency_key) |s| alloc.free(s);
    alloc.free(row.empirical_policy);
    if (row.state_transition_id) |s| alloc.free(s);
}

/// Materialize a FlowRunRow from a backend result row, duplicating every
/// string into `alloc`. Column indices must match `select_prefix`.
/// Propagates error.OutOfMemory instead of silently substituting empty
/// strings as the previous `catch ""` / `catch null` version did.
/// NOTE: strings duplicated before a failing dupe are not freed on the
/// error path (acceptable: OOM is fatal for the request).
fn rowToFlowRun(alloc: Allocator, r: anytype) !FlowRunRow {
    return .{
        .id = try alloc.dupe(u8, r.text(0)),
        .created = try alloc.dupe(u8, r.text(1)),
        .updated = try alloc.dupe(u8, r.text(2)),
        .flow_id = try alloc.dupe(u8, r.text(3)),
        .name = try alloc.dupe(u8, r.text(4)),
        .state_type = try alloc.dupe(u8, r.text(5)),
        .state_name = try alloc.dupe(u8, r.text(6)),
        .state_timestamp = try alloc.dupe(u8, r.text(7)),
        .parameters = try alloc.dupe(u8, r.text(8)),
        .tags = try alloc.dupe(u8, r.text(9)),
        .run_count = r.bigint(10),
        .expected_start_time = try dupeOptional(alloc, r.text(11)),
        .next_scheduled_start_time = try dupeOptional(alloc, r.text(12)),
        .start_time = try dupeOptional(alloc, r.text(13)),
        .end_time = try dupeOptional(alloc, r.text(14)),
        .total_run_time = r.float(15),
        .deployment_id = try dupeOptional(alloc, r.text(16)),
        .deployment_version = try dupeOptional(alloc, r.text(17)),
        .work_queue_name = try dupeOptional(alloc, r.text(18)),
        .work_queue_id = try dupeOptional(alloc, r.text(19)),
        .auto_scheduled = r.int(20) != 0,
        .idempotency_key = try dupeOptional(alloc, r.text(21)),
        .empirical_policy = try alloc.dupe(u8, r.text(22)),
        .state_transition_id = try dupeOptional(alloc, r.text(23)),
    };
}

/// Patch a flow run with optional fields
/// Currently supports: infrastructure_pid
pub fn patch(id: []const u8, infrastructure_pid: ?[]const u8) !void {
    // For now, just accept the PATCH request without actually updating
    // The runner uses PATCH to set infrastructure metadata, but we can
    // skip this for basic functionality - the state transitions are what matter
    _ = id;
    _ = infrastructure_pid;
    // TODO: add infrastructure_pid column and update it
}

/// Get late flow runs (SCHEDULED with next_scheduled_start_time before cutoff).
pub fn getLateRuns(alloc: Allocator, cutoff_timestamp: []const u8, limit: usize) ![]FlowRunRow {
    var results = std.ArrayListUnmanaged(FlowRunRow){};
    errdefer results.deinit(alloc);
    var rows = backend.db.query(
        select_prefix ++
            " WHERE state_type = 'SCHEDULED' AND state_name = 'Scheduled'" ++
            " AND next_scheduled_start_time IS NOT NULL AND next_scheduled_start_time <= ?" ++
            " ORDER BY next_scheduled_start_time ASC LIMIT ?",
        .{ cutoff_timestamp, @as(i64, @intCast(limit)) },
    ) catch |err| {
        log.err("database", "get late flow_runs error: {}", .{err});
        return err;
    };
    defer rows.deinit();
    while (rows.next()) |r| try results.append(alloc, try rowToFlowRun(alloc, r));
    return results.toOwnedSlice(alloc);
}

/// Delete a flow run and its state history. Returns false when the run
/// does not exist or any database error occurs (errors are logged, not
/// propagated — matching existing caller expectations).
/// NOTE(review): the two DELETEs are not wrapped in a transaction, so a
/// failure between them can leave a flow_run without its state rows —
/// confirm whether this best-effort behavior is intended.
pub fn delete(id: []const u8) !bool {
    // check if flow run exists
    var rows = backend.db.query(
        "SELECT id FROM flow_run WHERE id = ?",
        .{id},
    ) catch return false;
    defer rows.deinit();
    if (rows.next() == null) {
        return false;
    }
    // delete associated state records first
    backend.db.exec(
        "DELETE FROM flow_run_state WHERE flow_run_id = ?",
        .{id},
    ) catch |err| {
        log.err("database", "delete flow_run_state error: {}", .{err});
        return false;
    };
    // delete the flow run
    backend.db.exec(
        "DELETE FROM flow_run WHERE id = ?",
        .{id},
    ) catch |err| {
        log.err("database", "delete flow_run error: {}", .{err});
        return false;
    };
    return true;
}