prefect server in zig
1// transforms.zig - global bookkeeping transforms
2//
3// universal transforms that run on every state transition:
4// - SetStartTime: set start_time when first entering RUNNING
5// - SetEndTime: set end_time when entering terminal state
6// - IncrementRunTime: accumulate total_run_time when exiting RUNNING
7// - IncrementRunCount: increment run_count when entering RUNNING
8
9const std = @import("std");
10const log = @import("../logging.zig");
11const time_util = @import("../utilities/time.zig");
12const types = @import("types.zig");
13
14const StateType = types.StateType;
15
/// inputs and outputs for the bookkeeping applied to one state transition
///
/// the caller fills in the current-run fields and the proposed-state fields,
/// passes the context to `applyBookkeeping`, and reads the `new_*` fields back.
pub const TransitionContext = struct {
    // ---- current run state (from db) ----

    /// type of the state being left; optional
    /// (presumably null when the run has no state yet; confirm with callers)
    current_state_type: ?StateType,
    /// timestamp string of the state being left, used as the start of the
    /// interval when accumulating run time
    current_state_timestamp: ?[]const u8,
    /// start time already recorded for the run, if any
    start_time: ?[]const u8,
    /// end time already recorded for the run, if any
    end_time: ?[]const u8,
    /// run count recorded so far
    run_count: i64,
    /// run time accumulated so far, in seconds
    total_run_time: f64,

    // ---- proposed new state ----

    /// type of the state being entered
    proposed_state_type: StateType,
    /// timestamp string of the state being entered
    proposed_state_timestamp: []const u8,

    // ---- output: updated values to write to db ----
    // `applyBookkeeping` overwrites all four, so the defaults below only
    // matter for a context that has not been processed yet.

    /// start time to persist
    new_start_time: ?[]const u8 = null,
    /// end time to persist (null clears it)
    new_end_time: ?[]const u8 = null,
    /// run count to persist
    new_run_count: i64 = 0,
    /// total run time to persist, in seconds
    new_total_run_time: f64 = 0.0,
};
36
/// run every bookkeeping transform for the transition described by `ctx`
///
/// the `new_*` outputs are first seeded from the current run values and then
/// adjusted by SetStartTime, SetEndTime, IncrementRunTime and
/// IncrementRunCount. only the `new_*` fields of `ctx` are written; every
/// adjustment also emits a debug log line.
pub fn applyBookkeeping(ctx: *TransitionContext) void {
    const stamp = ctx.proposed_state_timestamp;
    const entering_running = ctx.proposed_state_type.isRunning();
    const entering_final = ctx.proposed_state_type.isFinal();

    // baseline: outputs start out equal to what the run already has
    ctx.new_start_time = ctx.start_time;
    ctx.new_end_time = ctx.end_time;
    ctx.new_run_count = ctx.run_count;
    ctx.new_total_run_time = ctx.total_run_time;

    // SetStartTime: only the first entry into RUNNING stamps start_time
    if (entering_running and ctx.start_time == null) {
        ctx.new_start_time = stamp;
        log.debug("orchestration", "setting start_time to {s}", .{stamp});
    }

    // SetEndTime: a run that has started and has no end_time yet gets one
    // when it enters a terminal state
    if (entering_final and ctx.start_time != null and ctx.end_time == null) {
        ctx.new_end_time = stamp;
        log.debug("orchestration", "setting end_time to {s}", .{stamp});
    }

    if (ctx.current_state_type) |previous| {
        // leaving a terminal state for a non-terminal one clears end_time
        if (!entering_final and previous.isFinal()) {
            ctx.new_end_time = null;
            log.debug("orchestration", "clearing end_time (exiting terminal state)", .{});
        }

        // IncrementRunTime: add the time spent in the RUNNING state being left
        if (previous.isRunning()) {
            if (ctx.current_state_timestamp) |entered_at| {
                const elapsed = computeDuration(entered_at, stamp);
                // new_total_run_time still holds the baseline copy here
                ctx.new_total_run_time += elapsed;
                log.debug("orchestration", "adding {d:.3}s to total_run_time (now {d:.3}s)", .{ elapsed, ctx.new_total_run_time });
            }
        }
    }

    // IncrementRunCount: every entry into RUNNING counts as one more run
    if (entering_running) {
        // new_run_count still holds the baseline copy here
        ctx.new_run_count += 1;
        log.debug("orchestration", "incrementing run_count to {d}", .{ctx.new_run_count});
    }
}
83
/// seconds elapsed between two ISO8601 timestamps
///
/// both strings go through `time_util.parse`; the parsed values are treated
/// as microseconds (their difference is divided by 1_000_000).
/// returns 0.0 when either timestamp fails to parse or when `end` is earlier
/// than `start`, so the result is never negative.
fn computeDuration(start: []const u8, end: []const u8) f64 {
    const begin_us = time_util.parse(start) orelse return 0.0;
    const finish_us = time_util.parse(end) orelse return 0.0;

    // an interval that runs backwards counts as no elapsed time
    if (finish_us < begin_us) return 0.0;

    const elapsed_us: f64 = @floatFromInt(finish_us - begin_us);
    return elapsed_us / 1_000_000.0;
}
94
95// ============================================================================
96// Tests
97// ============================================================================
98
test "computeDuration" {
    // two whole-second timestamps five seconds apart
    const expected_seconds: f64 = 5.0;
    const tolerance: f64 = 0.001;

    const elapsed = computeDuration("2024-01-19T16:30:00Z", "2024-01-19T16:30:05Z");

    try std.testing.expectApproxEqAbs(expected_seconds, elapsed, tolerance);
}
104
test "computeDuration with fractional seconds" {
    // sub-second precision must survive the conversion to seconds
    const expected_seconds: f64 = 1.5;
    const tolerance: f64 = 0.001;

    const elapsed = computeDuration("2024-01-19T16:30:00.000000Z", "2024-01-19T16:30:01.500000Z");

    try std.testing.expectApproxEqAbs(expected_seconds, elapsed, tolerance);
}