prefect server in zig

add automations background service

- reactive trigger evaluation via broker subscription
- proactive trigger evaluation via periodic loop
- bucket-based counting for threshold triggers
- action execution: run-deployment, pause/resume-deployment,
cancel-flow-run, pause/resume-automation
- in-memory automation cache with refresh
- add deployments.updatePaused() helper

split into modules to stay under 500-line limit:
- automations.zig: main service loop, cache management
- automations/triggers.zig: trigger parsing and evaluation
- automations/actions.zig: action execution

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+618
+11
src/db/deployments.zig
··· 297 297 return affected > 0; 298 298 } 299 299 300 + pub fn updatePaused(id: []const u8, paused: bool, updated: []const u8) !bool { 301 + const affected = backend.db.execWithRowCount( 302 + "UPDATE deployment SET paused = ?, updated = ? WHERE id = ?", 303 + .{ @as(i64, if (paused) 1 else 0), updated, id }, 304 + ) catch |err| { 305 + log.err("database", "update deployment paused error: {}", .{err}); 306 + return err; 307 + }; 308 + return affected > 0; 309 + } 310 + 300 311 pub fn deleteById(id: []const u8) !bool { 301 312 const affected = backend.db.execWithRowCount( 302 313 "DELETE FROM deployment WHERE id = ?",
+2
src/services.zig
··· 6 6 pub const scheduler = @import("services/scheduler.zig"); 7 7 pub const lease_cleanup = @import("services/lease_cleanup.zig"); 8 8 pub const late_runs = @import("services/late_runs.zig"); 9 + pub const automations = @import("services/automations.zig"); 9 10 10 11 pub const Service = struct { 11 12 name: []const u8, ··· 19 20 .{ .name = "scheduler", .start = scheduler.start, .stop = scheduler.stop }, 20 21 .{ .name = "lease_cleanup", .start = lease_cleanup.start, .stop = lease_cleanup.stop }, 21 22 .{ .name = "late_runs", .start = late_runs.start, .stop = late_runs.stop }, 23 + .{ .name = "automations", .start = automations.start, .stop = automations.stop }, 22 24 }; 23 25 24 26 pub fn startAll() !void {
+189
src/services/automations.zig
··· 1 + const std = @import("std"); 2 + const Thread = std.Thread; 3 + const json = std.json; 4 + const log = @import("../logging.zig"); 5 + const broker = @import("../broker.zig"); 6 + const db = @import("../db/sqlite.zig"); 7 + const time_util = @import("../utilities/time.zig"); 8 + const events_api = @import("../api/events.zig"); 9 + 10 + const triggers = @import("automations/triggers.zig"); 11 + 12 + // Configuration 13 + const PROACTIVE_INTERVAL_MS: u64 = 10_000; 14 + const CONSUMER_GROUP = "automations"; 15 + 16 + // Service state 17 + var consumer_handle: ?broker.ConsumerHandle = null; 18 + var proactive_thread: ?Thread = null; 19 + var running: bool = false; 20 + var mutex: Thread.Mutex = .{}; 21 + 22 + // Automation cache 23 + var automations_mutex: Thread.Mutex = .{}; 24 + var automations_cache: ?[]triggers.CachedAutomation = null; 25 + var last_cache_refresh: i64 = 0; 26 + const CACHE_REFRESH_INTERVAL_MS: i64 = 30_000; 27 + 28 + pub fn start() !void { 29 + mutex.lock(); 30 + defer mutex.unlock(); 31 + 32 + if (running) return; 33 + running = true; 34 + 35 + log.info("automations", "starting service", .{}); 36 + 37 + refreshCache() catch |err| { 38 + log.err("automations", "failed to load initial cache: {}", .{err}); 39 + }; 40 + 41 + if (broker.getBroker()) |b| { 42 + consumer_handle = try b.subscribe(events_api.EVENTS_TOPIC, CONSUMER_GROUP, handleEvent); 43 + log.info("automations", "subscribed to events topic", .{}); 44 + } else { 45 + log.warn("automations", "broker not available, reactive triggers disabled", .{}); 46 + } 47 + 48 + proactive_thread = try Thread.spawn(.{}, proactiveLoop, .{}); 49 + } 50 + 51 + pub fn stop() void { 52 + mutex.lock(); 53 + const was_running = running; 54 + running = false; 55 + mutex.unlock(); 56 + 57 + if (!was_running) return; 58 + 59 + log.info("automations", "stopping service", .{}); 60 + 61 + if (consumer_handle) |handle| { 62 + if (broker.getBroker()) |b| { 63 + b.unsubscribe(handle); 64 + } 65 + consumer_handle = null; 66 + } 67 + 68 + if (proactive_thread) |t| { 69 + t.join(); 70 + proactive_thread = null; 71 + } 72 + 73 + automations_mutex.lock(); 74 + defer automations_mutex.unlock(); 75 + automations_cache = null; 76 + 77 + log.info("automations", "stopped", .{}); 78 + } 79 + 80 + /// Handle incoming event for reactive trigger evaluation 81 + fn handleEvent(msg: *const broker.Message) anyerror!void { 82 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 83 + defer arena.deinit(); 84 + const alloc = arena.allocator(); 85 + 86 + const parsed = json.parseFromSlice(json.Value, alloc, msg.data, .{}) catch { 87 + log.err("automations", "failed to parse event json", .{}); 88 + return error.ParseError; 89 + }; 90 + 91 + if (parsed.value != .object) return error.InvalidEvent; 92 + const event_obj = parsed.value.object; 93 + 94 + const event_name = if (event_obj.get("event")) |v| switch (v) { 95 + .string => |s| s, 96 + else => return error.MissingEventName, 97 + } else return error.MissingEventName; 98 + 99 + const event_id = if (event_obj.get("id")) |v| switch (v) { 100 + .string => |s| s, 101 + else => "unknown", 102 + } else "unknown"; 103 + 104 + log.debug("automations", "evaluating event {s}: {s}", .{ event_id, event_name }); 105 + 106 + maybeRefreshCache(); 107 + 108 + automations_mutex.lock(); 109 + const automations = automations_cache; 110 + automations_mutex.unlock(); 111 + 112 + if (automations) |autos| { 113 + for (autos) |automation| { 114 + if (!automation.enabled) continue; 115 + if (automation.trigger.posture != .reactive) continue; 116 + 117 + if (triggers.eventMatchesTrigger(event_name, automation.trigger)) { 118 + triggers.evaluateReactiveTrigger(alloc, automation, event_obj) catch |err| { 119 + log.err("automations", "error evaluating trigger for {s}: {}", .{ automation.name, err }); 120 + }; 121 + } 122 + } 123 + } 124 + } 125 + 126 + /// Proactive evaluation loop 127 + fn proactiveLoop() void { 128 + while (true) { 129 + mutex.lock(); 130 + const should_run = running; 131 + mutex.unlock(); 132 + 133 + if (!should_run) break; 134 + 135 + maybeRefreshCache(); 136 + 137 + automations_mutex.lock(); 138 + const cache = automations_cache; 139 + automations_mutex.unlock(); 140 + 141 + triggers.evaluateProactiveTriggers(cache) catch |err| { 142 + log.err("automations", "proactive evaluation error: {}", .{err}); 143 + }; 144 + 145 + Thread.sleep(PROACTIVE_INTERVAL_MS * std.time.ns_per_ms); 146 + } 147 + } 148 + 149 + /// Maybe refresh the automation cache 150 + fn maybeRefreshCache() void { 151 + const now = std.time.milliTimestamp(); 152 + if (now - last_cache_refresh < CACHE_REFRESH_INTERVAL_MS) return; 153 + refreshCache() catch |err| { 154 + log.err("automations", "failed to refresh cache: {}", .{err}); 155 + }; 156 + } 157 + 158 + /// Refresh automation cache from database 159 + fn refreshCache() !void { 160 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 161 + errdefer arena.deinit(); 162 + const alloc = arena.allocator(); 163 + 164 + const rows = try db.automations.listEnabled(alloc); 165 + 166 + var cached: std.ArrayList(triggers.CachedAutomation) = .empty; 167 + 168 + for (rows) |row| { 169 + const parsed_trigger = triggers.parseTrigger(alloc, row.trigger) orelse continue; 170 + 171 + try cached.append(alloc, triggers.CachedAutomation{ 172 + .id = try alloc.dupe(u8, row.id), 173 + .name = try alloc.dupe(u8, row.name), 174 + .enabled = row.enabled, 175 + .trigger = parsed_trigger, 176 + .actions = try alloc.dupe(u8, row.actions), 177 + .actions_on_trigger = try alloc.dupe(u8, row.actions_on_trigger), 178 + .actions_on_resolve = try alloc.dupe(u8, row.actions_on_resolve), 179 + }); 180 + } 181 + 182 + automations_mutex.lock(); 183 + defer automations_mutex.unlock(); 184 + 185 + automations_cache = try cached.toOwnedSlice(alloc); 186 + last_cache_refresh = std.time.milliTimestamp(); 187 + 188 + log.debug("automations", "refreshed cache: {} automations loaded", .{automations_cache.?.len}); 189 + }
+187
src/services/automations/actions.zig
··· 1 + const std = @import("std"); 2 + const json = std.json; 3 + const log = @import("../../logging.zig"); 4 + const db = @import("../../db/sqlite.zig"); 5 + const time_util = @import("../../utilities/time.zig"); 6 + const uuid_util = @import("../../utilities/uuid.zig"); 7 + 8 + /// Execute automation actions 9 + pub fn executeActions( 10 + alloc: std.mem.Allocator, 11 + automation_name: []const u8, 12 + actions_json: []const u8, 13 + event_obj: json.ObjectMap, 14 + ) void { 15 + _ = event_obj; 16 + 17 + const actions_parsed = json.parseFromSlice(json.Value, alloc, actions_json, .{}) catch { 18 + log.err("automations", "failed to parse actions for {s}", .{automation_name}); 19 + return; 20 + }; 21 + 22 + if (actions_parsed.value != .array) return; 23 + 24 + for (actions_parsed.value.array.items) |action_val| { 25 + if (action_val != .object) continue; 26 + const action = action_val.object; 27 + 28 + const action_type = if (action.get("type")) |v| switch (v) { 29 + .string => |s| s, 30 + else => continue, 31 + } else continue; 32 + 33 + log.info("automations", "executing action type '{s}' for automation '{s}'", .{ action_type, automation_name }); 34 + 35 + executeAction(alloc, action_type, action) catch |err| { 36 + log.err("automations", "action '{s}' failed: {}", .{ action_type, err }); 37 + }; 38 + } 39 + } 40 + 41 + /// Execute a single action 42 + fn executeAction( 43 + alloc: std.mem.Allocator, 44 + action_type: []const u8, 45 + action: json.ObjectMap, 46 + ) !void { 47 + if (std.mem.eql(u8, action_type, "do-nothing")) { 48 + return; 49 + } 50 + 51 + if (std.mem.eql(u8, action_type, "run-deployment")) { 52 + const deployment_id = if (action.get("deployment_id")) |v| switch (v) { 53 + .string => |s| s, 54 + else => null, 55 + } else null; 56 + 57 + if (deployment_id) |dep_id| { 58 + try runDeployment(alloc, dep_id); 59 + } 60 + return; 61 + } 62 + 63 + if (std.mem.eql(u8, action_type, "pause-deployment")) { 64 + const deployment_id = if (action.get("deployment_id")) |v| switch (v) { 65 + .string => |s| s, 66 + else => null, 67 + } else null; 68 + 69 + if (deployment_id) |dep_id| { 70 + try setDeploymentPaused(dep_id, true); 71 + } 72 + return; 73 + } 74 + 75 + if (std.mem.eql(u8, action_type, "resume-deployment")) { 76 + const deployment_id = if (action.get("deployment_id")) |v| switch (v) { 77 + .string => |s| s, 78 + else => null, 79 + } else null; 80 + 81 + if (deployment_id) |dep_id| { 82 + try setDeploymentPaused(dep_id, false); 83 + } 84 + return; 85 + } 86 + 87 + if (std.mem.eql(u8, action_type, "cancel-flow-run")) { 88 + const flow_run_id = if (action.get("flow_run_id")) |v| switch (v) { 89 + .string => |s| s, 90 + else => null, 91 + } else null; 92 + 93 + if (flow_run_id) |run_id| { 94 + try cancelFlowRun(run_id); 95 + } 96 + return; 97 + } 98 + 99 + if (std.mem.eql(u8, action_type, "pause-automation")) { 100 + const automation_id = if (action.get("automation_id")) |v| switch (v) { 101 + .string => |s| s, 102 + else => null, 103 + } else null; 104 + 105 + if (automation_id) |auto_id| { 106 + var ts_buf: [32]u8 = undefined; 107 + const now = time_util.timestamp(&ts_buf); 108 + _ = db.automations.updateEnabled(auto_id, false, now) catch false; 109 + } 110 + return; 111 + } 112 + 113 + if (std.mem.eql(u8, action_type, "resume-automation")) { 114 + const automation_id = if (action.get("automation_id")) |v| switch (v) { 115 + .string => |s| s, 116 + else => null, 117 + } else null; 118 + 119 + if (automation_id) |auto_id| { 120 + var ts_buf: [32]u8 = undefined; 121 + const now = time_util.timestamp(&ts_buf); 122 + _ = db.automations.updateEnabled(auto_id, true, now) catch false; 123 + } 124 + return; 125 + } 126 + 127 + log.warn("automations", "unsupported action type: {s}", .{action_type}); 128 + } 129 + 130 + /// Run a deployment by creating a new flow run 131 + fn runDeployment(alloc: std.mem.Allocator, deployment_id: []const u8) !void { 132 + const deployment = db.deployments.getById(alloc, deployment_id) catch null orelse { 133 + log.warn("automations", "deployment not found: {s}", .{deployment_id}); 134 + return; 135 + }; 136 + 137 + var id_buf: [36]u8 = undefined; 138 + const run_id = uuid_util.generate(&id_buf); 139 + 140 + var name_buf: [64]u8 = undefined; 141 + const run_name = std.fmt.bufPrint(&name_buf, "automation-{s}", .{run_id[0..8]}) catch "automation-run"; 142 + 143 + var ts_buf: [32]u8 = undefined; 144 + const now = time_util.timestamp(&ts_buf); 145 + 146 + db.flow_runs.insert(run_id, deployment.flow_id, run_name, "PENDING", "Pending", now, .{ 147 + .deployment_id = deployment_id, 148 + .deployment_version = deployment.version, 149 + .work_queue_name = deployment.work_queue_name, 150 + .work_queue_id = deployment.work_queue_id, 151 + .parameters = deployment.parameters, 152 + }) catch |err| { 153 + log.err("automations", "failed to create flow run: {}", .{err}); 154 + return err; 155 + }; 156 + 157 + log.info("automations", "created flow run {s} for deployment {s}", .{ run_id, deployment_id }); 158 + } 159 + 160 + /// Set deployment paused state 161 + fn setDeploymentPaused(deployment_id: []const u8, paused: bool) !void { 162 + var ts_buf: [32]u8 = undefined; 163 + const now = time_util.timestamp(&ts_buf); 164 + 165 + _ = db.deployments.updatePaused(deployment_id, paused, now) catch |err| { 166 + log.err("automations", "failed to update deployment paused state: {}", .{err}); 167 + return err; 168 + }; 169 + 170 + log.info("automations", "set deployment {s} paused={}", .{ deployment_id, paused }); 171 + } 172 + 173 + /// Cancel a flow run 174 + fn cancelFlowRun(flow_run_id: []const u8) !void { 175 + var ts_buf: [32]u8 = undefined; 176 + const now = time_util.timestamp(&ts_buf); 177 + 178 + var state_id_buf: [36]u8 = undefined; 179 + const state_id = uuid_util.generate(&state_id_buf); 180 + 181 + db.flow_runs.setState(flow_run_id, state_id, "CANCELLED", "Cancelled", now, null, now, 0, 0.0, null, null) catch |err| { 182 + log.err("automations", "failed to cancel flow run: {}", .{err}); 183 + return err; 184 + }; 185 + 186 + log.info("automations", "cancelled flow run {s}", .{flow_run_id}); 187 + }
+229
src/services/automations/triggers.zig
··· 1 + const std = @import("std"); 2 + const json = std.json; 3 + const log = @import("../../logging.zig"); 4 + const db = @import("../../db/sqlite.zig"); 5 + const time_util = @import("../../utilities/time.zig"); 6 + const uuid_util = @import("../../utilities/uuid.zig"); 7 + const actions = @import("actions.zig"); 8 + 9 + pub const TriggerType = enum { event, compound, sequence }; 10 + pub const Posture = enum { reactive, proactive }; 11 + 12 + pub const ParsedTrigger = struct { 13 + trigger_type: TriggerType, 14 + posture: Posture, 15 + threshold: i64, 16 + within_seconds: i64, 17 + match_patterns: []const u8, 18 + for_each: []const u8, 19 + }; 20 + 21 + pub const CachedAutomation = struct { 22 + id: []const u8, 23 + name: []const u8, 24 + enabled: bool, 25 + trigger: ParsedTrigger, 26 + actions: []const u8, 27 + actions_on_trigger: []const u8, 28 + actions_on_resolve: []const u8, 29 + }; 30 + 31 + /// Check if event name matches trigger patterns 32 + pub fn eventMatchesTrigger(event_name: []const u8, trigger: ParsedTrigger) bool { 33 + if (trigger.match_patterns.len == 0) return true; 34 + if (std.mem.eql(u8, trigger.match_patterns, "*")) return true; 35 + 36 + // Wildcard prefix matching 37 + if (trigger.match_patterns.len > 1 and trigger.match_patterns[trigger.match_patterns.len - 1] == '*') { 38 + const prefix = trigger.match_patterns[0 .. trigger.match_patterns.len - 1]; 39 + return std.mem.startsWith(u8, event_name, prefix); 40 + } 41 + 42 + return std.mem.eql(u8, event_name, trigger.match_patterns); 43 + } 44 + 45 + /// Evaluate a reactive trigger for an event 46 + pub fn evaluateReactiveTrigger( 47 + alloc: std.mem.Allocator, 48 + automation: CachedAutomation, 49 + event_obj: json.ObjectMap, 50 + ) !void { 51 + const trigger = automation.trigger; 52 + 53 + var key_buf: [256]u8 = undefined; 54 + const bucketing_key = generateBucketingKey(&key_buf, trigger.for_each, event_obj); 55 + 56 + var bucket = try db.automations.getBucket(alloc, automation.id, automation.id, bucketing_key); 57 + 58 + var ts_buf: [32]u8 = undefined; 59 + const now = time_util.timestamp(&ts_buf); 60 + const now_micros = time_util.nowMicros(); 61 + 62 + if (bucket == null) { 63 + var bucket_id_buf: [36]u8 = undefined; 64 + const bucket_id = uuid_util.generate(&bucket_id_buf); 65 + 66 + const window_micros = trigger.within_seconds * 1_000_000; 67 + const end_micros = now_micros + window_micros; 68 + 69 + var end_buf: [32]u8 = undefined; 70 + const end_time = time_util.formatMicros(&end_buf, end_micros); 71 + 72 + try db.automations.insertBucket(bucket_id, automation.id, automation.id, bucketing_key, now, end_time); 73 + bucket = try db.automations.getBucket(alloc, automation.id, automation.id, bucketing_key); 74 + } 75 + 76 + if (bucket) |b| { 77 + const bucket_end = time_util.parse(b.end_time) orelse now_micros; 78 + if (bucket_end < now_micros) { 79 + const window_micros = trigger.within_seconds * 1_000_000; 80 + const new_end_micros = now_micros + window_micros; 81 + 82 + var new_end_buf: [32]u8 = undefined; 83 + const new_end = time_util.formatMicros(&new_end_buf, new_end_micros); 84 + 85 + try db.automations.resetBucket(automation.id, automation.id, bucketing_key, now, new_end); 86 + } 87 + 88 + const event_json = std.fmt.allocPrint(alloc, "{f}", .{json.fmt(json.Value{ .object = event_obj }, .{})}) catch "{}"; 89 + 90 + const new_count = try db.automations.incrementBucket( 91 + automation.id, 92 + automation.id, 93 + bucketing_key, 94 + event_json, 95 + now, 96 + ); 97 + 98 + log.debug("automations", "{s}: bucket count = {d}, threshold = {d}", .{ automation.name, new_count, trigger.threshold }); 99 + 100 + if (new_count >= trigger.threshold) { 101 + if (b.triggered_at == null) { 102 + log.info("automations", "FIRING automation '{s}' (count={d} >= threshold={d})", .{ automation.name, new_count, trigger.threshold }); 103 + try db.automations.markBucketTriggered(automation.id, automation.id, bucketing_key, now); 104 + actions.executeActions(alloc, automation.name, automation.actions, event_obj); 105 + } 106 + } 107 + } 108 + } 109 + 110 + /// Evaluate proactive triggers (fire when event count < threshold after window expires) 111 + pub fn evaluateProactiveTriggers(automations_cache: ?[]CachedAutomation) !void { 112 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 113 + defer arena.deinit(); 114 + const alloc = arena.allocator(); 115 + 116 + const automations = automations_cache orelse return; 117 + 118 + var ts_buf: [32]u8 = undefined; 119 + const now = time_util.timestamp(&ts_buf); 120 + const now_micros = time_util.nowMicros(); 121 + 122 + for (automations) |automation| { 123 + if (!automation.enabled) continue; 124 + if (automation.trigger.posture != .proactive) continue; 125 + 126 + const bucketing_key = ""; 127 + const bucket = db.automations.getBucket(alloc, automation.id, automation.id, bucketing_key) catch null; 128 + 129 + if (bucket) |b| { 130 + const bucket_end = time_util.parse(b.end_time) orelse now_micros; 131 + 132 + if (bucket_end <= now_micros and b.triggered_at == null) { 133 + if (b.count < automation.trigger.threshold) { 134 + log.info("automations", "FIRING proactive automation '{s}' (count={d} < threshold={d}, bucket expired)", .{ automation.name, b.count, automation.trigger.threshold }); 135 + db.automations.markBucketTriggered(automation.id, automation.id, bucketing_key, now) catch {}; 136 + actions.executeActions(alloc, automation.name, automation.actions, json.ObjectMap.init(alloc)); 137 + } 138 + 139 + const window_micros = automation.trigger.within_seconds * 1_000_000; 140 + const new_end_micros = now_micros + window_micros; 141 + var new_end_buf: [32]u8 = undefined; 142 + const new_end = time_util.formatMicros(&new_end_buf, new_end_micros); 143 + db.automations.resetBucket(automation.id, automation.id, bucketing_key, now, new_end) catch {}; 144 + } 145 + } else { 146 + var bucket_id_buf: [36]u8 = undefined; 147 + const bucket_id = uuid_util.generate(&bucket_id_buf); 148 + 149 + const window_micros = automation.trigger.within_seconds * 1_000_000; 150 + const end_micros = now_micros + window_micros; 151 + var end_buf: [32]u8 = undefined; 152 + const end_time = time_util.formatMicros(&end_buf, end_micros); 153 + 154 + db.automations.insertBucket(bucket_id, automation.id, automation.id, bucketing_key, now, end_time) catch {}; 155 + } 156 + } 157 + } 158 + 159 + /// Parse trigger JSON into ParsedTrigger 160 + pub fn parseTrigger(alloc: std.mem.Allocator, trigger_json: []const u8) ?ParsedTrigger { 161 + const trigger_parsed = json.parseFromSlice(json.Value, alloc, trigger_json, .{}) catch return null; 162 + if (trigger_parsed.value != .object) return null; 163 + const trigger_obj = trigger_parsed.value.object; 164 + 165 + const trigger_type_str = if (trigger_obj.get("type")) |v| switch (v) { 166 + .string => |s| s, 167 + else => "event", 168 + } else "event"; 169 + 170 + const trigger_type: TriggerType = if (std.mem.eql(u8, trigger_type_str, "compound")) 171 + .compound 172 + else if (std.mem.eql(u8, trigger_type_str, "sequence")) 173 + .sequence 174 + else 175 + .event; 176 + 177 + const posture_str = if (trigger_obj.get("posture")) |v| switch (v) { 178 + .string => |s| s, 179 + else => "Reactive", 180 + } else "Reactive"; 181 + 182 + const posture: Posture = if (std.mem.eql(u8, posture_str, "Proactive") or std.mem.eql(u8, posture_str, "proactive")) 183 + .proactive 184 + else 185 + .reactive; 186 + 187 + const threshold: i64 = if (trigger_obj.get("threshold")) |v| switch (v) { 188 + .integer => |i| i, 189 + else => 1, 190 + } else 1; 191 + 192 + const within: i64 = if (trigger_obj.get("within")) |v| switch (v) { 193 + .integer => |i| i, 194 + .float => |f| @intFromFloat(f), 195 + else => 60, 196 + } else 60; 197 + 198 + const match_patterns = if (trigger_obj.get("expect")) |v| switch (v) { 199 + .string => |s| s, 200 + .array => blk: { 201 + if (v.array.items.len > 0) { 202 + if (v.array.items[0] == .string) { 203 + break :blk v.array.items[0].string; 204 + } 205 + } 206 + break :blk "*"; 207 + }, 208 + else => "*", 209 + } else if (trigger_obj.get("match")) |v| switch (v) { 210 + .string => |s| s, 211 + else => "*", 212 + } else "*"; 213 + 214 + return ParsedTrigger{ 215 + .trigger_type = trigger_type, 216 + .posture = posture, 217 + .threshold = threshold, 218 + .within_seconds = within, 219 + .match_patterns = alloc.dupe(u8, match_patterns) catch return null, 220 + .for_each = "[]", 221 + }; 222 + } 223 + 224 + /// Generate bucketing key from for_each labels 225 + fn generateBucketingKey(buf: *[256]u8, for_each: []const u8, event_obj: json.ObjectMap) []const u8 { 226 + _ = for_each; 227 + _ = event_obj; 228 + return std.fmt.bufPrint(buf, "", .{}) catch ""; 229 + }