prefect server in zig
at main 109 lines 4.2 kB view raw
// transforms.zig - global bookkeeping transforms
//
// universal transforms applied on every state transition:
// - SetStartTime: stamp start_time on the first transition into RUNNING
// - SetEndTime: stamp end_time on entry to a terminal state
// - IncrementRunTime: add the time spent in RUNNING to total_run_time on exit
// - IncrementRunCount: bump run_count on each entry into RUNNING

const std = @import("std");
const log = @import("../logging.zig");
const time_util = @import("../utilities/time.zig");
const types = @import("types.zig");

const StateType = types.StateType;

/// Inputs and outputs of the bookkeeping pass for one state transition.
///
/// The caller fills in the current run values and the proposed state, calls
/// `applyBookkeeping`, and then reads the `new_*` fields back.
pub const TransitionContext = struct {
    // --- current run state (from db) ---

    /// Type of the state the run currently holds, if any.
    current_state_type: ?StateType,
    /// Timestamp of the current state, if any.
    current_state_timestamp: ?[]const u8,
    /// Stored start time of the run, if one has been recorded.
    start_time: ?[]const u8,
    /// Stored end time of the run, if one has been recorded.
    end_time: ?[]const u8,
    /// Stored run count.
    run_count: i64,
    /// Stored accumulated run time, in seconds.
    total_run_time: f64,

    // --- proposed new state ---

    /// Type of the state being transitioned into.
    proposed_state_type: StateType,
    /// Timestamp of the state being transitioned into.
    proposed_state_timestamp: []const u8,

    // --- output: updated values to write to db ---

    new_start_time: ?[]const u8 = null,
    new_end_time: ?[]const u8 = null,
    new_run_count: i64 = 0,
    new_total_run_time: f64 = 0.0,
};

/// Run every bookkeeping transform against `ctx`.
///
/// The `new_*` output fields are first seeded from the stored values and are
/// then overridden by whichever transforms apply to this transition. Only the
/// `new_*` fields of `ctx` are written.
pub fn applyBookkeeping(ctx: *TransitionContext) void {
    const proposed_ts = ctx.proposed_state_timestamp;
    const entering_running = ctx.proposed_state_type.isRunning();
    const entering_final = ctx.proposed_state_type.isFinal();

    // Baseline: outputs mirror the stored values until a transform says otherwise.
    ctx.new_start_time = ctx.start_time;
    ctx.new_end_time = ctx.end_time;
    ctx.new_run_count = ctx.run_count;
    ctx.new_total_run_time = ctx.total_run_time;

    // SetStartTime: only the first entry into RUNNING stamps start_time.
    if (entering_running and ctx.start_time == null) {
        ctx.new_start_time = proposed_ts;
        log.debug("orchestration", "setting start_time to {s}", .{proposed_ts});
    }

    // SetEndTime: a run that has started but has no end_time gets one on
    // entry to a terminal state.
    if (entering_final and ctx.start_time != null and ctx.end_time == null) {
        ctx.new_end_time = proposed_ts;
        log.debug("orchestration", "setting end_time to {s}", .{proposed_ts});
    }

    // The remaining transforms depend on the state being left.
    if (ctx.current_state_type) |leaving| {
        // Leaving a terminal state for a non-terminal one clears end_time.
        if (leaving.isFinal() and !entering_final) {
            ctx.new_end_time = null;
            log.debug("orchestration", "clearing end_time (exiting terminal state)", .{});
        }

        // IncrementRunTime: leaving RUNNING adds the time spent in it.
        if (leaving.isRunning()) {
            if (ctx.current_state_timestamp) |entered_at| {
                const elapsed = computeDuration(entered_at, proposed_ts);
                // new_total_run_time still equals the stored total here.
                ctx.new_total_run_time += elapsed;
                log.debug("orchestration", "adding {d:.3}s to total_run_time (now {d:.3}s)", .{ elapsed, ctx.new_total_run_time });
            }
        }
    }

    // IncrementRunCount: every entry into RUNNING counts as one more run.
    if (entering_running) {
        // new_run_count still equals the stored count here.
        ctx.new_run_count += 1;
        log.debug("orchestration", "incrementing run_count to {d}", .{ctx.new_run_count});
    }
}

/// Seconds elapsed from `start` to `end`, both given as ISO8601 timestamps.
///
/// Returns 0.0 when either timestamp cannot be parsed or when `end` is
/// earlier than `start`.
fn computeDuration(start: []const u8, end: []const u8) f64 {
    const from_us = time_util.parse(start) orelse return 0.0;
    const to_us = time_util.parse(end) orelse return 0.0;

    // A negative interval contributes nothing.
    if (to_us < from_us) return 0.0;

    const elapsed_us: f64 = @floatFromInt(to_us - from_us);
    return elapsed_us / 1_000_000.0;
}

// ============================================================================
// Tests
// ============================================================================

test "computeDuration" {
    const seconds = computeDuration("2024-01-19T16:30:00Z", "2024-01-19T16:30:05Z");
    try std.testing.expectApproxEqAbs(@as(f64, 5.0), seconds, 0.001);
}

test "computeDuration with fractional seconds" {
    const seconds = computeDuration("2024-01-19T16:30:00.000000Z", "2024-01-19T16:30:01.500000Z");
    try std.testing.expectApproxEqAbs(@as(f64, 1.5), seconds, 0.001);
}