prefect server in zig
at main 158 lines 6.2 kB view raw
// rules.zig - orchestration rule abstraction
//
// Declares the context handed to rules, the rule descriptor itself, and the
// loop that runs a policy (an ordered list of rules) against one proposed
// state transition.

const std = @import("std");
const log = @import("../logging.zig");
const types = @import("types.zig");

pub const StateType = types.StateType;
pub const StateTypeSet = types.StateTypeSet;
pub const ResponseStatus = types.ResponseStatus;
pub const ResponseDetails = types.ResponseDetails;
pub const OrchestrationResult = types.OrchestrationResult;

/// Mutable context shared by every rule evaluated for a single transition.
///
/// Rules read the transition/run inputs and record their verdict and any
/// write-back values on this struct.
pub const RuleContext = struct {
    // --- transition being evaluated ---
    initial_state: ?StateType,
    proposed_state: StateType,
    initial_state_timestamp: ?[]const u8,
    proposed_state_timestamp: []const u8,

    // --- scheduling inputs (for CopyScheduledTime, WaitForScheduledTime) ---
    // next_scheduled_start_time from SCHEDULED state
    initial_scheduled_time: ?[]const u8 = null,
    // scheduled_time from proposed state (if any)
    proposed_scheduled_time: ?[]const u8 = null,

    // --- idempotency inputs (for PreventDuplicateTransitions) ---
    // transition_id from current state
    initial_transition_id: ?[]const u8 = null,
    // transition_id from proposed state
    proposed_transition_id: ?[]const u8 = null,

    // --- run metadata (for rules that need it) ---
    run_id: []const u8,
    flow_id: ?[]const u8 = null,
    deployment_id: ?[]const u8 = null,

    // --- retry policy (from empirical_policy) ---
    run_count: i64 = 0,
    // max retries allowed
    retries: ?i64 = null,
    // delay in seconds between retries
    retry_delay: ?i64 = null,

    // --- verdict, mutated by rules through the helpers below ---
    result: OrchestrationResult = .{},

    // --- output: values to write back to db (set by rules) ---
    new_expected_start_time: ?[]const u8 = null,

    // --- retry output: the replacement state proposed when a retry is scheduled ---
    retry_state_type: ?StateType = null,
    retry_state_name: ?[]const u8 = null,
    retry_scheduled_time: ?[]const u8 = null,

    // internal buffer for retry_scheduled_time (to avoid use-after-free from
    // stack buffers).
    // NOTE(review): nothing in this file writes to it; presumably a rule
    // formats the timestamp here before calling scheduleRetry - confirm.
    _retry_time_buf: [32]u8 = undefined,

    // --- empirical_policy mutation flags (set by RetryFailedFlows) ---
    // when set, the API handler should update the policy JSON accordingly
    // set retry_type = "in_process"
    set_retry_type_in_process: bool = false,
    // set retry_type = null (when retries exhausted)
    clear_retry_type: bool = false,
    // set pause_keys = []
    clear_pause_keys: bool = false,
    // set resuming = false
    set_resuming_false: bool = false,

    /// Mark the transition as rejected and record `reason`.
    /// `reason` is stored as-is (the slice is not copied).
    pub fn reject(self: *RuleContext, reason: []const u8) void {
        const outcome = &self.result;
        outcome.details.reason = reason;
        outcome.status = .REJECT;
        log.debug("orchestration", "rule rejected transition: {s}", .{reason});
    }

    /// Mark the transition as delayed: records `reason` and the
    /// `retry_after` hint (logged as seconds) for the client to retry.
    pub fn wait(self: *RuleContext, reason: []const u8, retry_after: f64) void {
        const outcome = &self.result;
        outcome.details.retry_after = retry_after;
        outcome.details.reason = reason;
        outcome.status = .WAIT;
        log.debug("orchestration", "rule delayed transition: {s} (retry after {d}s)", .{ reason, retry_after });
    }

    /// Mark the transition as aborted and record `reason`.
    pub fn abort(self: *RuleContext, reason: []const u8) void {
        const outcome = &self.result;
        outcome.details.reason = reason;
        outcome.status = .ABORT;
        log.debug("orchestration", "rule aborted transition: {s}", .{reason});
    }

    /// Reject the transition and propose a SCHEDULED "AwaitingRetry" state
    /// instead (used by RetryFailedFlows).
    ///
    /// This function itself assigns `retry_scheduled_time` from
    /// `scheduled_time`. The slice is stored, not copied, so its backing
    /// memory must remain valid for as long as the context is read.
    /// It also raises the policy mutation flags `set_retry_type_in_process`,
    /// `set_resuming_false` and `clear_pause_keys`.
    pub fn scheduleRetry(self: *RuleContext, reason: []const u8, scheduled_time: []const u8) void {
        // verdict
        const outcome = &self.result;
        outcome.details.reason = reason;
        outcome.status = .REJECT;

        // replacement state proposed in place of the rejected one
        self.retry_scheduled_time = scheduled_time;
        self.retry_state_name = "AwaitingRetry";
        self.retry_state_type = .SCHEDULED;

        // policy mutation flags for retry
        self.clear_pause_keys = true;
        self.set_resuming_false = true;
        self.set_retry_type_in_process = true;

        log.debug("orchestration", "rule scheduled retry: {s} at {s}", .{ reason, scheduled_time });
    }

    /// Signal that retries are exhausted and retry_type should be cleared.
    pub fn clearRetryType(self: *RuleContext) void {
        self.clear_retry_type = true;
    }

    /// True while the recorded status is still ACCEPT, i.e. no rule has
    /// rejected, delayed or aborted the transition.
    pub fn isAccepted(self: *const RuleContext) bool {
        return switch (self.result.status) {
            .ACCEPT => true,
            else => false,
        };
    }
};

/// An orchestration rule that can modify or reject state transitions.
pub const OrchestrationRule = struct {
    /// rule name for logging/debugging
    name: []const u8,
    /// initial states this rule applies to
    from_states: StateTypeSet,
    /// proposed states this rule applies to
    to_states: StateTypeSet,
    /// the rule implementation - called before state is committed
    before_transition: *const fn (*RuleContext) void,

    /// Report whether this rule covers the `initial` -> `proposed` transition.
    ///
    /// A null `initial` is matched via `from_states.containsNull()`; otherwise
    /// both the initial and the proposed state must be members of their
    /// respective sets.
    pub fn appliesTo(self: OrchestrationRule, initial: ?StateType, proposed: StateType) bool {
        const origin_ok = blk: {
            const current = initial orelse break :blk self.from_states.containsNull();
            break :blk self.from_states.contains(current);
        };
        if (!origin_ok) return false;

        return self.to_states.contains(proposed);
    }
};

/// Run every applicable rule of `policy`, in order, against `ctx`.
///
/// Evaluation stops right after the first rule that leaves the context in a
/// non-accepted status; later rules are not invoked.
pub fn applyPolicy(
    policy: []const OrchestrationRule,
    ctx: *RuleContext,
) void {
    for (policy) |rule| {
        if (rule.appliesTo(ctx.initial_state, ctx.proposed_state)) {
            log.debug("orchestration", "applying rule: {s}", .{rule.name});

            rule.before_transition(ctx);

            // still accepted: move on to the next rule
            if (ctx.isAccepted()) continue;

            // rejected/waited/aborted: stop processing
            log.debug("orchestration", "rule {s} stopped transition with status {s}", .{
                rule.name,
                ctx.result.status.toString(),
            });
            return;
        }
    }
}