prefect server in zig
1// rules.zig - orchestration rule abstraction
2//
3// defines the rule interface and policy application logic
4
5const std = @import("std");
6const log = @import("../logging.zig");
7const types = @import("types.zig");
8
9pub const StateType = types.StateType;
10pub const StateTypeSet = types.StateTypeSet;
11pub const ResponseStatus = types.ResponseStatus;
12pub const ResponseDetails = types.ResponseDetails;
13pub const OrchestrationResult = types.OrchestrationResult;
14
15/// context passed to orchestration rules during state transition
16pub const RuleContext = struct {
17 // transition info
18 initial_state: ?StateType,
19 proposed_state: StateType,
20 initial_state_timestamp: ?[]const u8,
21 proposed_state_timestamp: []const u8,
22
23 // scheduling info (for CopyScheduledTime, WaitForScheduledTime)
24 initial_scheduled_time: ?[]const u8 = null, // next_scheduled_start_time from SCHEDULED state
25 proposed_scheduled_time: ?[]const u8 = null, // scheduled_time from proposed state (if any)
26
27 // idempotency (for PreventDuplicateTransitions)
28 initial_transition_id: ?[]const u8 = null, // transition_id from current state
29 proposed_transition_id: ?[]const u8 = null, // transition_id from proposed state
30
31 // run metadata (for rules that need it)
32 run_id: []const u8,
33 flow_id: ?[]const u8 = null,
34 deployment_id: ?[]const u8 = null,
35
36 // retry policy (from empirical_policy)
37 run_count: i64 = 0,
38 retries: ?i64 = null, // max retries allowed
39 retry_delay: ?i64 = null, // delay in seconds between retries
40
41 // orchestration result (modified by rules)
42 result: OrchestrationResult = .{},
43
44 // output: values to write back to db (set by rules)
45 new_expected_start_time: ?[]const u8 = null,
46
47 // retry output: when RetryFailedFlows rejects, it proposes a new state
48 retry_state_type: ?StateType = null,
49 retry_state_name: ?[]const u8 = null,
50 retry_scheduled_time: ?[]const u8 = null,
51
52 // internal buffer for retry_scheduled_time (to avoid use-after-free from stack buffers)
53 _retry_time_buf: [32]u8 = undefined,
54
55 // empirical_policy mutation flags (set by RetryFailedFlows)
56 // when set, API handler should update the policy JSON accordingly
57 set_retry_type_in_process: bool = false, // set retry_type = "in_process"
58 clear_retry_type: bool = false, // set retry_type = null (when retries exhausted)
59 clear_pause_keys: bool = false, // set pause_keys = []
60 set_resuming_false: bool = false, // set resuming = false
61
62 /// reject the transition with a reason
63 pub fn reject(self: *RuleContext, reason: []const u8) void {
64 self.result.status = .REJECT;
65 self.result.details.reason = reason;
66 log.debug("orchestration", "rule rejected transition: {s}", .{reason});
67 }
68
69 /// delay the transition (client should retry)
70 pub fn wait(self: *RuleContext, reason: []const u8, retry_after: f64) void {
71 self.result.status = .WAIT;
72 self.result.details.reason = reason;
73 self.result.details.retry_after = retry_after;
74 log.debug("orchestration", "rule delayed transition: {s} (retry after {d}s)", .{ reason, retry_after });
75 }
76
77 /// abort the transition completely
78 pub fn abort(self: *RuleContext, reason: []const u8) void {
79 self.result.status = .ABORT;
80 self.result.details.reason = reason;
81 log.debug("orchestration", "rule aborted transition: {s}", .{reason});
82 }
83
84 /// reject and schedule a retry (used by RetryFailedFlows)
85 /// The caller should set retry_scheduled_time before calling this
86 pub fn scheduleRetry(self: *RuleContext, reason: []const u8, scheduled_time: []const u8) void {
87 self.result.status = .REJECT;
88 self.result.details.reason = reason;
89 self.retry_state_type = .SCHEDULED;
90 self.retry_state_name = "AwaitingRetry";
91 self.retry_scheduled_time = scheduled_time;
92 // set policy mutation flags for retry
93 self.set_retry_type_in_process = true;
94 self.set_resuming_false = true;
95 self.clear_pause_keys = true;
96 log.debug("orchestration", "rule scheduled retry: {s} at {s}", .{ reason, scheduled_time });
97 }
98
99 /// signal that retries are exhausted and retry_type should be cleared
100 pub fn clearRetryType(self: *RuleContext) void {
101 self.clear_retry_type = true;
102 }
103
104 /// check if transition is still accepted (not yet rejected/waited/aborted)
105 pub fn isAccepted(self: *const RuleContext) bool {
106 return self.result.status == .ACCEPT;
107 }
108};
109
110/// an orchestration rule that can modify or reject state transitions
111pub const OrchestrationRule = struct {
112 /// rule name for logging/debugging
113 name: []const u8,
114 /// which initial states this rule applies to
115 from_states: StateTypeSet,
116 /// which proposed states this rule applies to
117 to_states: StateTypeSet,
118 /// the rule implementation - called before state is committed
119 before_transition: *const fn (*RuleContext) void,
120
121 /// check if this rule applies to the given transition
122 pub fn appliesTo(self: OrchestrationRule, initial: ?StateType, proposed: StateType) bool {
123 const from_matches = if (initial) |s|
124 self.from_states.contains(s)
125 else
126 self.from_states.containsNull();
127
128 const to_matches = self.to_states.contains(proposed);
129 return from_matches and to_matches;
130 }
131};
132
133/// apply all applicable rules from a policy to a transition
134pub fn applyPolicy(
135 policy: []const OrchestrationRule,
136 ctx: *RuleContext,
137) void {
138 for (policy) |rule| {
139 // skip rules that don't apply to this transition
140 if (!rule.appliesTo(ctx.initial_state, ctx.proposed_state)) {
141 continue;
142 }
143
144 log.debug("orchestration", "applying rule: {s}", .{rule.name});
145
146 // apply the rule
147 rule.before_transition(ctx);
148
149 // if rule rejected/waited/aborted, stop processing
150 if (!ctx.isAccepted()) {
151 log.debug("orchestration", "rule {s} stopped transition with status {s}", .{
152 rule.name,
153 ctx.result.status.toString(),
154 });
155 return;
156 }
157 }
158}