Prefect server in Zig

add automations API, fix JSON serialization with std.json.Stringify

- add 005_automations migration (automation, automation_bucket tables)
- add db/automations.zig with CRUD and bucket operations
- add api/automations.zig endpoints (create, get, list, delete, patch)
- wire automations route in routes.zig
- fix api/logs.zig JSON building to use std.json.Stringify
- fix api/automations.zig to use std.json.Stringify with beginWriteRaw
for raw JSON passthrough of pre-serialized fields

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+796 -53
+369
src/api/automations.zig
···
const std = @import("std");
const zap = @import("zap");
const db = @import("../db/sqlite.zig");
const json_util = @import("../utilities/json.zig");
const uuid_util = @import("../utilities/uuid.zig");
const time_util = @import("../utilities/time.zig");
const logging = @import("../logging.zig");

/// Serialize a std.json.Value to an owned string using std.json.Stringify.
/// Caller owns the returned slice (allocated from `alloc`; callers here use
/// a per-request arena, so no explicit free is needed).
fn serializeJsonValue(alloc: std.mem.Allocator, value: std.json.Value) ![]const u8 {
    var output: std.io.Writer.Allocating = .init(alloc);
    errdefer output.deinit();
    var jw: std.json.Stringify = .{ .writer = &output.writer };

    try jw.write(value);

    return output.toOwnedSlice();
}

/// Route an /automations request to the matching handler.
///
/// Supported endpoints:
///   POST   /automations/          - create
///   POST   /automations/filter    - list/filter
///   POST   /automations/count     - count
///   GET    /automations/{id}      - fetch one
///   DELETE /automations/{id}      - delete
///   PATCH  /automations/{id}      - partial update (enabled only)
pub fn handle(r: zap.Request) !void {
    const target = r.path orelse "/";
    const method = r.method orelse "GET";

    // POST /automations/filter - list/filter automations
    if (std.mem.endsWith(u8, target, "/filter") and std.mem.eql(u8, method, "POST")) {
        try filterAutomations(r);
        return;
    }

    // POST /automations/count - count automations
    if (std.mem.endsWith(u8, target, "/count") and std.mem.eql(u8, method, "POST")) {
        try countAutomations(r);
        return;
    }

    // Extract ID from path for single-resource operations
    const id = extractId(target);

    if (id) |automation_id| {
        // GET /automations/{id} - get single automation
        if (std.mem.eql(u8, method, "GET")) {
            try getAutomation(r, automation_id);
            return;
        }
        // DELETE /automations/{id} - delete automation
        if (std.mem.eql(u8, method, "DELETE")) {
            try deleteAutomation(r, automation_id);
            return;
        }
        // PATCH /automations/{id} - partial update (enabled only)
        if (std.mem.eql(u8, method, "PATCH")) {
            try patchAutomation(r, automation_id);
            return;
        }
    }

    // POST /automations/ - create automation
    if (std.mem.eql(u8, method, "POST")) {
        try createAutomation(r);
        return;
    }

    json_util.sendStatus(r, "{\"detail\":\"not found\"}", .not_found);
}

/// Extract the resource id from /api/automations/{id} or /automations/{id}.
/// Returns null for the collection root and for the "filter"/"count"
/// sub-paths (with or without a trailing slash).
fn extractId(target: []const u8) ?[]const u8 {
    // Handle /api/automations/{id} or /automations/{id}
    const prefix1 = "/api/automations/";
    const prefix2 = "/automations/";

    var rest: []const u8 = undefined;
    if (std.mem.startsWith(u8, target, prefix1)) {
        rest = target[prefix1.len..];
    } else if (std.mem.startsWith(u8, target, prefix2)) {
        rest = target[prefix2.len..];
    } else {
        return null;
    }

    // Strip a trailing slash FIRST so "filter/" and "count/" are recognized
    // as sub-paths below rather than being treated as automation ids.
    if (rest.len > 0 and rest[rest.len - 1] == '/') {
        rest = rest[0 .. rest.len - 1];
    }

    if (rest.len == 0) return null;
    if (std.mem.eql(u8, rest, "filter") or std.mem.eql(u8, rest, "count")) return null;

    return rest;
}

/// POST /automations/ - validate the body, persist the automation, and
/// return the freshly-created row as JSON with status 201.
fn createAutomation(r: zap.Request) !void {
    const body = r.body orelse {
        json_util.sendStatus(r, "{\"detail\":\"missing body\"}", .bad_request);
        return;
    };

    // Per-request arena: everything allocated below is freed in one shot.
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const parsed = std.json.parseFromSlice(std.json.Value, alloc, body, .{}) catch {
        json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
        return;
    };

    if (parsed.value != .object) {
        json_util.sendStatus(r, "{\"detail\":\"expected object\"}", .bad_request);
        return;
    }

    const obj = parsed.value.object;

    // Required fields
    const name = if (obj.get("name")) |v| (if (v == .string) v.string else null) else null;
    const trigger = obj.get("trigger");
    const actions = obj.get("actions");

    if (name == null or trigger == null or actions == null) {
        json_util.sendStatus(r, "{\"detail\":\"name, trigger, and actions are required\"}", .bad_request);
        return;
    }

    // Optional fields
    const description = if (obj.get("description")) |v| (if (v == .string) v.string else "") else "";
    const enabled = if (obj.get("enabled")) |v| (if (v == .bool) v.bool else true) else true;

    // Serialize JSON fields; optional arrays fall back to "[]" on failure,
    // required fields reject the request instead.
    const tags_json = if (obj.get("tags")) |v|
        serializeJsonValue(alloc, v) catch "[]"
    else
        "[]";
    const trigger_json = serializeJsonValue(alloc, trigger.?) catch {
        json_util.sendStatus(r, "{\"detail\":\"invalid trigger\"}", .bad_request);
        return;
    };
    const actions_json = serializeJsonValue(alloc, actions.?) catch {
        json_util.sendStatus(r, "{\"detail\":\"invalid actions\"}", .bad_request);
        return;
    };
    const actions_on_trigger_json = if (obj.get("actions_on_trigger")) |v|
        serializeJsonValue(alloc, v) catch "[]"
    else
        "[]";
    const actions_on_resolve_json = if (obj.get("actions_on_resolve")) |v|
        serializeJsonValue(alloc, v) catch "[]"
    else
        "[]";

    // Generate ID and timestamp
    var id_buf: [36]u8 = undefined;
    const id = uuid_util.generate(&id_buf);
    var ts_buf: [32]u8 = undefined;
    const created = time_util.timestamp(&ts_buf);

    db.automations.insert(
        id,
        name.?,
        description,
        enabled,
        tags_json,
        trigger_json,
        actions_json,
        actions_on_trigger_json,
        actions_on_resolve_json,
        created,
    ) catch {
        json_util.sendStatus(r, "{\"detail\":\"insert failed\"}", .internal_server_error);
        return;
    };

    // Fetch and return created automation
    const row = db.automations.getById(alloc, id) catch {
        json_util.sendStatus(r, "{\"detail\":\"fetch failed\"}", .internal_server_error);
        return;
    };

    if (row) |automation| {
        const resp = buildAutomationJson(alloc, automation) catch {
            json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
            return;
        };
        json_util.sendStatus(r, resp, .created);
    } else {
        json_util.sendStatus(r, "{\"detail\":\"automation not found after insert\"}", .internal_server_error);
    }
}

/// GET /automations/{id} - return one automation as JSON, or 404.
fn getAutomation(r: zap.Request, id: []const u8) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const row = db.automations.getById(alloc, id) catch {
        json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error);
        return;
    };

    if (row) |automation| {
        const resp = buildAutomationJson(alloc, automation) catch {
            json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
            return;
        };
        json_util.sendStatus(r, resp, .ok);
    } else {
        json_util.sendStatus(r, "{\"detail\":\"automation not found\"}", .not_found);
    }
}

/// DELETE /automations/{id} - 204 on success, 404 when the id is unknown.
fn deleteAutomation(r: zap.Request, id: []const u8) !void {
    const deleted = db.automations.deleteById(id) catch {
        json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error);
        return;
    };

    if (deleted) {
        json_util.sendStatus(r, "", .no_content);
    } else {
        json_util.sendStatus(r, "{\"detail\":\"automation not found\"}", .not_found);
    }
}

/// PATCH /automations/{id} - partial update. Only the `enabled` boolean is
/// supported; any other body shape is rejected with 400.
fn patchAutomation(r: zap.Request, id: []const u8) !void {
    const body = r.body orelse {
        json_util.sendStatus(r, "{\"detail\":\"missing body\"}", .bad_request);
        return;
    };

    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const parsed = std.json.parseFromSlice(std.json.Value, alloc, body, .{}) catch {
        json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
        return;
    };

    if (parsed.value != .object) {
        json_util.sendStatus(r, "{\"detail\":\"expected object\"}", .bad_request);
        return;
    }

    const obj = parsed.value.object;

    // Only enabled field is supported for PATCH
    const enabled = if (obj.get("enabled")) |v| (if (v == .bool) v.bool else null) else null;

    if (enabled == null) {
        json_util.sendStatus(r, "{\"detail\":\"enabled field required\"}", .bad_request);
        return;
    }

    var ts_buf: [32]u8 = undefined;
    const updated = time_util.timestamp(&ts_buf);

    const success = db.automations.updateEnabled(id, enabled.?, updated) catch {
        json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error);
        return;
    };

    if (success) {
        json_util.sendStatus(r, "", .no_content);
    } else {
        json_util.sendStatus(r, "{\"detail\":\"automation not found\"}", .not_found);
    }
}

/// POST /automations/filter - return a JSON array of automations, honoring
/// optional integer `limit` (clamped to 1..10000) and `offset` in the body.
fn filterAutomations(r: zap.Request) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    // Parse body for limit/offset
    var limit: usize = 200;
    var offset: usize = 0;

    if (r.body) |body| {
        const parsed = std.json.parseFromSlice(std.json.Value, alloc, body, .{}) catch null;
        if (parsed) |p| {
            if (p.value == .object) {
                const obj = p.value.object;
                if (obj.get("limit")) |v| {
                    if (v == .integer) limit = @intCast(@max(1, @min(10000, v.integer)));
                }
                if (obj.get("offset")) |v| {
                    if (v == .integer) offset = @intCast(@max(0, v.integer));
                }
            }
        }
    }

    const rows = db.automations.list(alloc, limit, offset) catch {
        json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error);
        return;
    };

    // Build JSON array. Serialize each item BEFORE emitting the separator so
    // a failed (skipped) item cannot leave a dangling comma in the output.
    var json_buf = std.ArrayListUnmanaged(u8){};
    const writer = json_buf.writer(alloc);

    try writer.writeByte('[');
    var first = true;
    for (rows) |automation| {
        const item_json = buildAutomationJson(alloc, automation) catch continue;
        if (!first) try writer.writeByte(',');
        first = false;
        try writer.writeAll(item_json);
    }
    try writer.writeByte(']');

    json_util.sendStatus(r, json_buf.items, .ok);
}

/// POST /automations/count - return the total automation count as a bare
/// JSON number (database errors degrade to 0).
fn countAutomations(r: zap.Request) !void {
    const cnt = db.automations.count() catch 0;

    var buf: [32]u8 = undefined;
    const count_str = std.fmt.bufPrint(&buf, "{d}", .{cnt}) catch "0";

    json_util.sendStatus(r, count_str, .ok);
}

/// Serialize an AutomationRow to a JSON object string. Scalar fields are
/// written through Stringify (which escapes them); the tags/trigger/actions
/// columns are already JSON text in the database, so they are passed through
/// verbatim via beginWriteRaw/endWriteRaw. Caller owns the returned slice.
fn buildAutomationJson(alloc: std.mem.Allocator, automation: db.automations.AutomationRow) ![]const u8 {
    var output: std.io.Writer.Allocating = .init(alloc);
    errdefer output.deinit();
    var jw: std.json.Stringify = .{ .writer = &output.writer };

    try jw.beginObject();
    try jw.objectField("id");
    try jw.write(automation.id);
    try jw.objectField("created");
    try jw.write(automation.created);
    try jw.objectField("updated");
    try jw.write(automation.updated);
    try jw.objectField("name");
    try jw.write(automation.name);
    try jw.objectField("description");
    try jw.write(automation.description);
    try jw.objectField("enabled");
    try jw.write(automation.enabled);

    // raw JSON passthrough for pre-serialized fields
    try jw.objectField("tags");
    try jw.beginWriteRaw();
    try jw.writer.writeAll(automation.tags);
    jw.endWriteRaw();

    try jw.objectField("trigger");
    try jw.beginWriteRaw();
    try jw.writer.writeAll(automation.trigger);
    jw.endWriteRaw();

    try jw.objectField("actions");
    try jw.beginWriteRaw();
    try jw.writer.writeAll(automation.actions);
    jw.endWriteRaw();

    try jw.objectField("actions_on_trigger");
    try jw.beginWriteRaw();
    try jw.writer.writeAll(automation.actions_on_trigger);
    jw.endWriteRaw();

    try jw.objectField("actions_on_resolve");
    try jw.beginWriteRaw();
    try jw.writer.writeAll(automation.actions_on_resolve);
    jw.endWriteRaw();

    try jw.endObject();

    return output.toOwnedSlice();
}
+28 -53
src/api/logs.zig
··· 11 r.sendBody(body) catch {}; 12 } 13 14 - /// Write JSON-escaped string (without quotes) 15 - fn writeJsonString(writer: anytype, s: []const u8) !void { 16 - for (s) |c| { 17 - switch (c) { 18 - '"' => try writer.writeAll("\\\""), 19 - '\\' => try writer.writeAll("\\\\"), 20 - '\n' => try writer.writeAll("\\n"), 21 - '\r' => try writer.writeAll("\\r"), 22 - '\t' => try writer.writeAll("\\t"), 23 - 0x00...0x08, 0x0b, 0x0c, 0x0e...0x1f => { 24 - var buf: [6]u8 = undefined; 25 - const slice = std.fmt.bufPrint(&buf, "\\u{x:0>4}", .{c}) catch unreachable; 26 - try writer.writeAll(slice); 27 - }, 28 - else => try writer.writeByte(c), 29 - } 30 - } 31 - } 32 - 33 pub fn handle(r: zap.Request) !void { 34 const target = r.path orelse "/"; 35 const method = r.method orelse "GET"; ··· 216 return; 217 }; 218 219 - // build JSON response 220 - var json_buf = std.ArrayListUnmanaged(u8){}; 221 - const writer = json_buf.writer(alloc); 222 - 223 - try writer.writeByte('['); 224 - for (rows, 0..) |log_row, i| { 225 - if (i > 0) try writer.writeByte(','); 226 - try writer.print( 227 - \\{{"id":"{s}","created":"{s}","updated":"{s}","name":"{s}","level":{d},"message":" 228 - , .{ log_row.id, log_row.created, log_row.updated, log_row.name, log_row.level }); 229 - 230 - // escape message for JSON 231 - try writeJsonString(writer, log_row.message); 232 - 233 - try writer.print( 234 - \\","timestamp":"{s}" 235 - , .{log_row.timestamp}); 236 - 237 - // flow_run_id 238 - if (log_row.flow_run_id) |fid| { 239 - try writer.print(",\"flow_run_id\":\"{s}\"", .{fid}); 240 - } else { 241 - try writer.writeAll(",\"flow_run_id\":null"); 242 - } 243 - 244 - // task_run_id 245 - if (log_row.task_run_id) |tid| { 246 - try writer.print(",\"task_run_id\":\"{s}\"", .{tid}); 247 - } else { 248 - try writer.writeAll(",\"task_run_id\":null"); 249 - } 250 251 - try writer.writeByte('}'); 252 } 253 - try writer.writeByte(']'); 254 255 - sendJson(r, json_buf.items); 256 }
··· 11 r.sendBody(body) catch {}; 12 } 13 14 pub fn handle(r: zap.Request) !void { 15 const target = r.path orelse "/"; 16 const method = r.method orelse "GET"; ··· 197 return; 198 }; 199 200 + // build JSON response using std.json.Stringify 201 + var output: std.io.Writer.Allocating = .init(alloc); 202 + errdefer output.deinit(); 203 + var jw: std.json.Stringify = .{ .writer = &output.writer }; 204 205 + try jw.beginArray(); 206 + for (rows) |log_row| { 207 + try jw.beginObject(); 208 + try jw.objectField("id"); 209 + try jw.write(log_row.id); 210 + try jw.objectField("created"); 211 + try jw.write(log_row.created); 212 + try jw.objectField("updated"); 213 + try jw.write(log_row.updated); 214 + try jw.objectField("name"); 215 + try jw.write(log_row.name); 216 + try jw.objectField("level"); 217 + try jw.write(log_row.level); 218 + try jw.objectField("message"); 219 + try jw.write(log_row.message); 220 + try jw.objectField("timestamp"); 221 + try jw.write(log_row.timestamp); 222 + try jw.objectField("flow_run_id"); 223 + try jw.write(log_row.flow_run_id); 224 + try jw.objectField("task_run_id"); 225 + try jw.write(log_row.task_run_id); 226 + try jw.endObject(); 227 } 228 + try jw.endArray(); 229 230 + sendJson(r, try output.toOwnedSlice()); 231 }
+3
src/api/routes.zig
··· 17 pub const concurrency_limits_v2 = @import("concurrency_limits_v2.zig"); 18 pub const flow_run_states = @import("flow_run_states.zig"); 19 pub const task_run_states = @import("task_run_states.zig"); 20 21 pub fn handle(r: zap.Request) !void { 22 const target = r.path orelse "/"; ··· 66 { 67 // HTTP events endpoints (not websocket /events/in or /events/out) 68 try events_api.handle(r); 69 } else { 70 try sendNotFound(r); 71 }
··· 17 pub const concurrency_limits_v2 = @import("concurrency_limits_v2.zig"); 18 pub const flow_run_states = @import("flow_run_states.zig"); 19 pub const task_run_states = @import("task_run_states.zig"); 20 + pub const automations = @import("automations.zig"); 21 22 pub fn handle(r: zap.Request) !void { 23 const target = r.path orelse "/"; ··· 67 { 68 // HTTP events endpoints (not websocket /events/in or /events/out) 69 try events_api.handle(r); 70 + } else if (std.mem.startsWith(u8, target, "/api/automations") or std.mem.startsWith(u8, target, "/automations")) { 71 + try automations.handle(r); 72 } else { 73 try sendNotFound(r); 74 }
+284
src/db/automations.zig
···
const std = @import("std");
const Allocator = std.mem.Allocator;

const backend = @import("backend.zig");
const log = @import("../logging.zig");

/// One row of the `automation` table. All string fields are duped into the
/// caller-provided allocator (an arena in the API layer), so the row outlives
/// the backend result it was read from.
pub const AutomationRow = struct {
    id: []const u8,
    created: []const u8,
    updated: []const u8,
    name: []const u8,
    description: []const u8,
    enabled: bool,
    tags: []const u8, // JSON text, stored verbatim
    trigger: []const u8, // JSON text, stored verbatim
    actions: []const u8, // JSON text, stored verbatim
    actions_on_trigger: []const u8, // JSON text, stored verbatim
    actions_on_resolve: []const u8, // JSON text, stored verbatim
};

// Column ordinals matching `select_cols` below; keep the two in sync.
const Col = struct {
    const id: usize = 0;
    const created: usize = 1;
    const updated: usize = 2;
    const name: usize = 3;
    const description: usize = 4;
    const enabled: usize = 5;
    const tags: usize = 6;
    const trigger: usize = 7;
    const actions: usize = 8;
    const actions_on_trigger: usize = 9;
    const actions_on_resolve: usize = 10;
};

const select_cols = "id, created, updated, name, description, enabled, tags, trigger, actions, actions_on_trigger, actions_on_resolve";

/// Copy a backend result row into an owned AutomationRow.
fn rowFromResult(alloc: Allocator, r: anytype) !AutomationRow {
    return AutomationRow{
        .id = try alloc.dupe(u8, r.text(Col.id)),
        .created = try alloc.dupe(u8, r.text(Col.created)),
        .updated = try alloc.dupe(u8, r.text(Col.updated)),
        .name = try alloc.dupe(u8, r.text(Col.name)),
        .description = try alloc.dupe(u8, r.text(Col.description)),
        .enabled = r.bigint(Col.enabled) != 0,
        .tags = try alloc.dupe(u8, r.text(Col.tags)),
        .trigger = try alloc.dupe(u8, r.text(Col.trigger)),
        .actions = try alloc.dupe(u8, r.text(Col.actions)),
        .actions_on_trigger = try alloc.dupe(u8, r.text(Col.actions_on_trigger)),
        .actions_on_resolve = try alloc.dupe(u8, r.text(Col.actions_on_resolve)),
    };
}

/// Insert a new automation; `created` is used for both created and updated.
/// All JSON-typed parameters must already be serialized JSON text.
pub fn insert(
    id: []const u8,
    name: []const u8,
    description: []const u8,
    enabled: bool,
    tags: []const u8,
    trigger: []const u8,
    actions: []const u8,
    actions_on_trigger: []const u8,
    actions_on_resolve: []const u8,
    created: []const u8,
) !void {
    backend.db.exec(
        "INSERT INTO automation (id, name, description, enabled, tags, trigger, actions, actions_on_trigger, actions_on_resolve, created, updated) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
        .{ id, name, description, @as(i64, if (enabled) 1 else 0), tags, trigger, actions, actions_on_trigger, actions_on_resolve, created, created },
    ) catch |err| {
        log.err("database", "insert automation error: {}", .{err});
        return err;
    };
}

/// Fetch one automation by id. Returns null when missing; backend errors are
/// logged and degrade to null (best-effort read) rather than propagating.
pub fn getById(alloc: Allocator, id: []const u8) !?AutomationRow {
    var r = backend.db.row(
        "SELECT " ++ select_cols ++ " FROM automation WHERE id = ?",
        .{id},
    ) catch |err| {
        // Do not silently swallow backend failures; keep the null fallback
        // for interface compatibility but leave a trace in the log.
        log.err("database", "get automation error: {}", .{err});
        return null;
    };

    if (r) |*row| {
        defer row.deinit();
        return try rowFromResult(alloc, row);
    }
    return null;
}

/// Delete one automation by id; returns true when a row was removed.
pub fn deleteById(id: []const u8) !bool {
    const affected = backend.db.execWithRowCount(
        "DELETE FROM automation WHERE id = ?",
        .{id},
    ) catch |err| {
        log.err("database", "delete automation error: {}", .{err});
        return err;
    };
    return affected > 0;
}

/// Flip the enabled flag and touch `updated`; returns true when a row matched.
pub fn updateEnabled(id: []const u8, enabled: bool, updated: []const u8) !bool {
    const affected = backend.db.execWithRowCount(
        "UPDATE automation SET enabled = ?, updated = ? WHERE id = ?",
        .{ @as(i64, if (enabled) 1 else 0), updated, id },
    ) catch |err| {
        log.err("database", "update automation enabled error: {}", .{err});
        return err;
    };
    return affected > 0;
}

/// List automations ordered by name, with limit/offset paging.
/// Caller owns the returned slice and every string inside it.
pub fn list(alloc: Allocator, limit: usize, offset: usize) ![]AutomationRow {
    var results = std.ArrayListUnmanaged(AutomationRow){};
    errdefer results.deinit(alloc);

    var rows = backend.db.query(
        "SELECT " ++ select_cols ++ " FROM automation ORDER BY name ASC LIMIT ? OFFSET ?",
        .{ @as(i64, @intCast(limit)), @as(i64, @intCast(offset)) },
    ) catch |err| {
        log.err("database", "list automations error: {}", .{err});
        return err;
    };
    defer rows.deinit();

    while (rows.next()) |r| {
        try results.append(alloc, try rowFromResult(alloc, &r));
    }

    return results.toOwnedSlice(alloc);
}

/// List only enabled automations (for the trigger evaluation loop).
/// Caller owns the returned slice and every string inside it.
pub fn listEnabled(alloc: Allocator) ![]AutomationRow {
    var results = std.ArrayListUnmanaged(AutomationRow){};
    errdefer results.deinit(alloc);

    var rows = backend.db.query(
        "SELECT " ++ select_cols ++ " FROM automation WHERE enabled = 1 ORDER BY name ASC",
        .{},
    ) catch |err| {
        log.err("database", "list enabled automations error: {}", .{err});
        return err;
    };
    defer rows.deinit();

    while (rows.next()) |r| {
        try results.append(alloc, try rowFromResult(alloc, &r));
    }

    return results.toOwnedSlice(alloc);
}

/// Count all automations; backend errors are logged and degrade to 0.
pub fn count() !usize {
    var r = backend.db.row("SELECT COUNT(*) FROM automation", .{}) catch |err| {
        log.err("database", "count automations error: {}", .{err});
        return 0;
    };
    if (r) |*row| {
        defer row.deinit();
        return @intCast(row.bigint(0));
    }
    return 0;
}

// Bucket operations for event counting

/// One row of the `automation_bucket` table. `last_event`/`triggered_at`
/// are null when the stored text is empty.
pub const BucketRow = struct {
    id: []const u8,
    automation_id: []const u8,
    trigger_id: []const u8,
    bucketing_key: []const u8,
    start_time: []const u8,
    end_time: []const u8,
    count: i64,
    last_event: ?[]const u8,
    triggered_at: ?[]const u8,
};

// Column ordinals matching `bucket_select_cols`; keep the two in sync.
const BCol = struct {
    const id: usize = 0;
    const automation_id: usize = 1;
    const trigger_id: usize = 2;
    const bucketing_key: usize = 3;
    const start_time: usize = 4;
    const end_time: usize = 5;
    const count: usize = 6;
    const last_event: usize = 7;
    const triggered_at: usize = 8;
};

const bucket_select_cols = "id, automation_id, trigger_id, bucketing_key, start_time, end_time, count, last_event, triggered_at";

/// Copy a backend result row into an owned BucketRow. Empty text for the
/// nullable columns is mapped to null.
fn bucketFromResult(alloc: Allocator, r: anytype) !BucketRow {
    const last_event_text = r.text(BCol.last_event);
    const triggered_at_text = r.text(BCol.triggered_at);

    return BucketRow{
        .id = try alloc.dupe(u8, r.text(BCol.id)),
        .automation_id = try alloc.dupe(u8, r.text(BCol.automation_id)),
        .trigger_id = try alloc.dupe(u8, r.text(BCol.trigger_id)),
        .bucketing_key = try alloc.dupe(u8, r.text(BCol.bucketing_key)),
        .start_time = try alloc.dupe(u8, r.text(BCol.start_time)),
        .end_time = try alloc.dupe(u8, r.text(BCol.end_time)),
        .count = r.bigint(BCol.count),
        .last_event = if (last_event_text.len > 0) try alloc.dupe(u8, last_event_text) else null,
        .triggered_at = if (triggered_at_text.len > 0) try alloc.dupe(u8, triggered_at_text) else null,
    };
}

/// Fetch one bucket by (automation, trigger, bucketing key). Returns null
/// when missing; backend errors are logged and degrade to null.
pub fn getBucket(alloc: Allocator, automation_id: []const u8, trigger_id: []const u8, bucketing_key: []const u8) !?BucketRow {
    var r = backend.db.row(
        "SELECT " ++ bucket_select_cols ++ " FROM automation_bucket WHERE automation_id = ? AND trigger_id = ? AND bucketing_key = ?",
        .{ automation_id, trigger_id, bucketing_key },
    ) catch |err| {
        log.err("database", "get bucket error: {}", .{err});
        return null;
    };

    if (r) |*row| {
        defer row.deinit();
        return try bucketFromResult(alloc, row);
    }
    return null;
}

/// Create a new bucket with count 0 for the given window.
pub fn insertBucket(
    id: []const u8,
    automation_id: []const u8,
    trigger_id: []const u8,
    bucketing_key: []const u8,
    start_time: []const u8,
    end_time: []const u8,
) !void {
    backend.db.exec(
        "INSERT INTO automation_bucket (id, automation_id, trigger_id, bucketing_key, start_time, end_time, count) VALUES (?, ?, ?, ?, ?, ?, 0)",
        .{ id, automation_id, trigger_id, bucketing_key, start_time, end_time },
    ) catch |err| {
        log.err("database", "insert bucket error: {}", .{err});
        return err;
    };
}

/// Increment a bucket's counter, recording the triggering event and the
/// update time, then return the new count. NOTE(review): update + re-select
/// is not atomic; concurrent increments may read a stale count.
pub fn incrementBucket(automation_id: []const u8, trigger_id: []const u8, bucketing_key: []const u8, last_event: []const u8, updated: []const u8) !i64 {
    // increment and return new count
    backend.db.exec(
        "UPDATE automation_bucket SET count = count + 1, last_event = ?, updated = ? WHERE automation_id = ? AND trigger_id = ? AND bucketing_key = ?",
        .{ last_event, updated, automation_id, trigger_id, bucketing_key },
    ) catch |err| {
        log.err("database", "increment bucket error: {}", .{err});
        return err;
    };

    // fetch updated count; a failed read is logged and degrades to 0
    var r = backend.db.row(
        "SELECT count FROM automation_bucket WHERE automation_id = ? AND trigger_id = ? AND bucketing_key = ?",
        .{ automation_id, trigger_id, bucketing_key },
    ) catch |err| {
        log.err("database", "read bucket count error: {}", .{err});
        return 0;
    };

    if (r) |*row| {
        defer row.deinit();
        return row.bigint(0);
    }
    return 0;
}

/// Record the time at which a bucket fired.
pub fn markBucketTriggered(automation_id: []const u8, trigger_id: []const u8, bucketing_key: []const u8, triggered_at: []const u8) !void {
    backend.db.exec(
        "UPDATE automation_bucket SET triggered_at = ? WHERE automation_id = ? AND trigger_id = ? AND bucketing_key = ?",
        .{ triggered_at, automation_id, trigger_id, bucketing_key },
    ) catch |err| {
        log.err("database", "mark bucket triggered error: {}", .{err});
        return err;
    };
}

/// Reset a bucket for a new window: zero the count, clear triggered_at,
/// and set the new start/end times.
pub fn resetBucket(automation_id: []const u8, trigger_id: []const u8, bucketing_key: []const u8, new_start: []const u8, new_end: []const u8) !void {
    backend.db.exec(
        "UPDATE automation_bucket SET count = 0, triggered_at = NULL, start_time = ?, end_time = ? WHERE automation_id = ? AND trigger_id = ? AND bucketing_key = ?",
        .{ new_start, new_end, automation_id, trigger_id, bucketing_key },
    ) catch |err| {
        log.err("database", "reset bucket error: {}", .{err});
        return err;
    };
}

/// Remove all buckets belonging to an automation (e.g. after deletion).
pub fn deleteBucketsByAutomation(automation_id: []const u8) !void {
    backend.db.exec(
        "DELETE FROM automation_bucket WHERE automation_id = ?",
        .{automation_id},
    ) catch |err| {
        log.err("database", "delete buckets error: {}", .{err});
        return err;
    };
}
+53
src/db/migrations/005_automations/postgres.sql
···
-- 005_automations: automation tables for event-driven triggers.
-- Supports EventTrigger (reactive) with RunDeployment and DoNothing actions.

-- Main automation definition.
-- NOTE(review): `trigger` is an SQL keyword; PostgreSQL lists it as
-- non-reserved so this should parse, but confirm against the target
-- PostgreSQL version (quoting the identifier would be the safe fallback).
CREATE TABLE IF NOT EXISTS automation (
    id TEXT PRIMARY KEY,
    created TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'),
    updated TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'),
    name TEXT NOT NULL,
    description TEXT DEFAULT '',
    enabled INTEGER DEFAULT 1,
    tags JSONB DEFAULT '[]',
    trigger JSONB NOT NULL,            -- {type, posture, expect, threshold, within, match, for_each}
    actions JSONB NOT NULL,            -- array of actions
    actions_on_trigger JSONB DEFAULT '[]',  -- array of actions to run on trigger
    actions_on_resolve JSONB DEFAULT '[]'   -- array of actions to run on resolve
);

-- Per-trigger event-counting bucket; one row per (automation, trigger, key).
CREATE TABLE IF NOT EXISTS automation_bucket (
    id TEXT PRIMARY KEY,
    created TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'),
    updated TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'),
    automation_id TEXT NOT NULL REFERENCES automation(id) ON DELETE CASCADE,
    trigger_id TEXT NOT NULL,          -- matches trigger.id in automation.trigger
    bucketing_key JSONB DEFAULT '[]',  -- array for grouping (from for_each labels)
    start_time TEXT NOT NULL,          -- bucket window start
    end_time TEXT NOT NULL,            -- bucket window end
    count INTEGER DEFAULT 0,
    last_event JSONB,                  -- last event that incremented the bucket
    triggered_at TEXT,                 -- when this bucket fired (null if not yet)
    UNIQUE(automation_id, trigger_id, bucketing_key)
);

-- Resources (deployments) an automation is related to.
CREATE TABLE IF NOT EXISTS automation_related_resource (
    id TEXT PRIMARY KEY,
    created TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'),
    updated TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'),
    automation_id TEXT NOT NULL REFERENCES automation(id) ON DELETE CASCADE,
    resource_id TEXT NOT NULL,         -- e.g., "prefect.deployment.{uuid}"
    automation_owned_by_resource INTEGER DEFAULT 0,
    UNIQUE(automation_id, resource_id)
);

-- Lookup indexes.
CREATE INDEX IF NOT EXISTS ix_automation__name ON automation(name);
CREATE INDEX IF NOT EXISTS ix_automation__enabled ON automation(enabled);
CREATE INDEX IF NOT EXISTS ix_automation__updated ON automation(updated);
CREATE INDEX IF NOT EXISTS ix_automation_bucket__automation_id ON automation_bucket(automation_id);
CREATE INDEX IF NOT EXISTS ix_automation_bucket__end_time ON automation_bucket(end_time);
CREATE INDEX IF NOT EXISTS ix_automation_related_resource__automation_id ON automation_related_resource(automation_id);
CREATE INDEX IF NOT EXISTS ix_automation_related_resource__resource_id ON automation_related_resource(resource_id);
+53
src/db/migrations/005_automations/sqlite.sql
···
-- 005_automations: automation tables for event-driven triggers.
-- Supports EventTrigger (reactive) with RunDeployment and DoNothing actions.

-- Main automation definition. JSON columns are stored as TEXT.
CREATE TABLE IF NOT EXISTS automation (
    id TEXT PRIMARY KEY,
    created TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')),
    updated TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')),
    name TEXT NOT NULL,
    description TEXT DEFAULT '',
    enabled INTEGER DEFAULT 1,
    tags TEXT DEFAULT '[]',
    trigger TEXT NOT NULL,            -- JSON: {type, posture, expect, threshold, within, match, for_each}
    actions TEXT NOT NULL,            -- JSON array of actions
    actions_on_trigger TEXT DEFAULT '[]',  -- JSON array of actions to run on trigger
    actions_on_resolve TEXT DEFAULT '[]'   -- JSON array of actions to run on resolve
);

-- Per-trigger event-counting bucket; one row per (automation, trigger, key).
CREATE TABLE IF NOT EXISTS automation_bucket (
    id TEXT PRIMARY KEY,
    created TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')),
    updated TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')),
    automation_id TEXT NOT NULL REFERENCES automation(id) ON DELETE CASCADE,
    trigger_id TEXT NOT NULL,         -- matches trigger.id in automation.trigger
    bucketing_key TEXT DEFAULT '[]',  -- JSON array for grouping (from for_each labels)
    start_time TEXT NOT NULL,         -- bucket window start
    end_time TEXT NOT NULL,           -- bucket window end
    count INTEGER DEFAULT 0,
    last_event TEXT,                  -- JSON: last event that incremented the bucket
    triggered_at TEXT,                -- when this bucket fired (null if not yet)
    UNIQUE(automation_id, trigger_id, bucketing_key)
);

-- Resources (deployments) an automation is related to.
CREATE TABLE IF NOT EXISTS automation_related_resource (
    id TEXT PRIMARY KEY,
    created TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')),
    updated TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')),
    automation_id TEXT NOT NULL REFERENCES automation(id) ON DELETE CASCADE,
    resource_id TEXT NOT NULL,        -- e.g., "prefect.deployment.{uuid}"
    automation_owned_by_resource INTEGER DEFAULT 0,
    UNIQUE(automation_id, resource_id)
);

-- Lookup indexes.
CREATE INDEX IF NOT EXISTS ix_automation__name ON automation(name);
CREATE INDEX IF NOT EXISTS ix_automation__enabled ON automation(enabled);
CREATE INDEX IF NOT EXISTS ix_automation__updated ON automation(updated);
CREATE INDEX IF NOT EXISTS ix_automation_bucket__automation_id ON automation_bucket(automation_id);
CREATE INDEX IF NOT EXISTS ix_automation_bucket__end_time ON automation_bucket(end_time);
CREATE INDEX IF NOT EXISTS ix_automation_related_resource__automation_id ON automation_related_resource(automation_id);
CREATE INDEX IF NOT EXISTS ix_automation_related_resource__resource_id ON automation_related_resource(resource_id);
+5
src/db/migrations_data.zig
··· 32 .sqlite_sql = @embedFile("migrations/004_log_table/sqlite.sql"), 33 .postgres_sql = @embedFile("migrations/004_log_table/postgres.sql"), 34 }, 35 };
··· 32 .sqlite_sql = @embedFile("migrations/004_log_table/sqlite.sql"), 33 .postgres_sql = @embedFile("migrations/004_log_table/postgres.sql"), 34 }, 35 + .{ 36 + .id = "005_automations", 37 + .sqlite_sql = @embedFile("migrations/005_automations/sqlite.sql"), 38 + .postgres_sql = @embedFile("migrations/005_automations/postgres.sql"), 39 + }, 40 };
+1
src/db/sqlite.zig
··· 23 pub const flow_run_states = @import("flow_run_states.zig"); 24 pub const task_run_states = @import("task_run_states.zig"); 25 pub const logs = @import("logs.zig"); 26 27 // re-export types for compatibility 28 pub const FlowRow = flows.FlowRow;
··· 23 pub const flow_run_states = @import("flow_run_states.zig"); 24 pub const task_run_states = @import("task_run_states.zig"); 25 pub const logs = @import("logs.zig"); 26 + pub const automations = @import("automations.zig"); 27 28 // re-export types for compatibility 29 pub const FlowRow = flows.FlowRow;