prefect server in zig

add events REST API with filtering and pagination

- POST /events/filter with occurred, event, resource filters
- GET /events/filter/next with page tokens for pagination
- GET /events/count for total event count

code quality improvements:
- src/prefect.zig: clean public interface re-exporting main modules
- src/utilities/encoding.zig: base64 and URL encoding helpers
- db/backend.zig: add getBackend() for consistent access pattern
- db/events.zig: consolidate parseJsonStringArray, add column constants
- events_api.zig: use json.send/sendStatus helpers consistently

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+782 -27
+324
src/api/events_api.zig
··· 1 + // REST API handlers for /events endpoints 2 + // separate from events.zig which handles websocket connections 3 + 4 + const std = @import("std"); 5 + const mem = std.mem; 6 + const zap = @import("zap"); 7 + const log = @import("../logging.zig"); 8 + const db_events = @import("../db/events.zig"); 9 + const json = @import("../utilities/json.zig"); 10 + const encoding = @import("../utilities/encoding.zig"); 11 + 12 + const PAGE_SIZE: usize = 50; 13 + 14 + // POST /events/filter - query events with filters 15 + // GET /events/filter/next?page-token=... - get next page 16 + // GET /events/count - count total events 17 + pub fn handle(r: zap.Request) !void { 18 + const target = r.path orelse "/"; 19 + const method = r.method orelse "GET"; 20 + 21 + // GET /events/filter/next?page-token=... 22 + if (mem.eql(u8, method, "GET") and mem.indexOf(u8, target, "/filter/next") != null) { 23 + try filterNext(r); 24 + return; 25 + } 26 + 27 + // POST /events/filter 28 + if (mem.eql(u8, method, "POST") and mem.endsWith(u8, target, "/filter")) { 29 + try filter(r); 30 + return; 31 + } 32 + 33 + // GET /events/count 34 + if (mem.eql(u8, method, "GET") and mem.endsWith(u8, target, "/count")) { 35 + try count(r); 36 + return; 37 + } 38 + 39 + json.sendStatus(r, "{\"detail\":\"not found\"}", .not_found); 40 + } 41 + 42 + fn filter(r: zap.Request) !void { 43 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 44 + defer arena.deinit(); 45 + const alloc = arena.allocator(); 46 + 47 + // parse request body 48 + const body = r.body orelse "{}"; 49 + const parsed = std.json.parseFromSlice(std.json.Value, alloc, body, .{}) catch { 50 + json.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request); 51 + return; 52 + }; 53 + const obj = parsed.value.object; 54 + 55 + // parse limit (default PAGE_SIZE, max PAGE_SIZE for pagination) 56 + var limit: usize = PAGE_SIZE; 57 + if (obj.get("limit")) |v| { 58 + if (v == .integer) { 59 + const l = v.integer; 60 + if (l > 0 and l <= PAGE_SIZE) limit = @intCast(l); 61 + } 62 + } 63 + 64 + // parse filter object 65 + const filter_opts = parseFilterOptions(alloc, obj); 66 + 67 + // get total count first 68 + const total = db_events.countWithFilter(alloc, filter_opts) catch 0; 69 + 70 + // query first page (offset 0) 71 + const events = db_events.queryWithFilter(alloc, filter_opts, limit, 0) catch |err| { 72 + log.err("events-api", "query error: {}", .{err}); 73 + json.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error); 74 + return; 75 + }; 76 + 77 + // generate next_page URL if there are more results 78 + const next_page = generateNextPageUrl(alloc, r, filter_opts, total, limit, 0); 79 + 80 + // send response 81 + sendEventPage(r, alloc, events, total, next_page); 82 + } 83 + 84 + fn filterNext(r: zap.Request) !void { 85 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 86 + defer arena.deinit(); 87 + const alloc = arena.allocator(); 88 + 89 + // extract page-token from query string 90 + const query = r.query orelse ""; 91 + const token_start = mem.indexOf(u8, query, "page-token=") orelse { 92 + json.sendStatus(r, "{\"detail\":\"missing page token\"}", .forbidden); 93 + return; 94 + }; 95 + 96 + const token_value_start = token_start + "page-token=".len; 97 + var token_end = query.len; 98 + if (mem.indexOf(u8, query[token_value_start..], "&")) |amp| { 99 + token_end = token_value_start + amp; 100 + } 101 + const url_encoded_token = query[token_value_start..token_end]; 102 + 103 + // URL decode (handle %2B -> +, %2F -> /, %3D -> =) 104 + const decoded_token = encoding.urlDecode(alloc, url_encoded_token) catch { 105 + json.sendStatus(r, "{\"detail\":\"invalid page token\"}", .forbidden); 106 + return; 107 + }; 108 + 109 + // first base64 decode (URL layer) 110 + const inner_token = encoding.base64Decode(alloc, decoded_token) catch { 111 + json.sendStatus(r, "{\"detail\":\"invalid page token\"}", .forbidden); 112 + return; 113 + }; 114 + 115 + // second base64 decode (token layer) 116 + const token_json = encoding.base64Decode(alloc, inner_token) catch { 117 + json.sendStatus(r, "{\"detail\":\"invalid page token\"}", .forbidden); 118 + return; 119 + }; 120 + 121 + // parse token JSON 122 + const token_parsed = std.json.parseFromSlice(std.json.Value, alloc, token_json, .{}) catch { 123 + json.sendStatus(r, "{\"detail\":\"invalid page token\"}", .forbidden); 124 + return; 125 + }; 126 + 127 + if (token_parsed.value != .object) { 128 + json.sendStatus(r, "{\"detail\":\"invalid page token\"}", .forbidden); 129 + return; 130 + } 131 + 132 + const token_obj = token_parsed.value.object; 133 + 134 + // extract token fields 135 + const total: usize = blk: { 136 + if (token_obj.get("count")) |v| { 137 + if (v == .integer and v.integer >= 0) break :blk @intCast(v.integer); 138 + } 139 + break :blk 0; 140 + }; 141 + 142 + const page_size: usize = blk: { 143 + if (token_obj.get("page_size")) |v| { 144 + if (v == .integer and v.integer > 0) break :blk @intCast(v.integer); 145 + } 146 + break :blk PAGE_SIZE; 147 + }; 148 + 149 + const offset: usize = blk: { 150 + if (token_obj.get("offset")) |v| { 151 + if (v == .integer and v.integer >= 0) break :blk @intCast(v.integer); 152 + } 153 + break :blk 0; 154 + }; 155 + 156 + // parse filter from token - extract fields directly from token_obj.filter 157 + var filter_opts = db_events.FilterOptions{}; 158 + if (token_obj.get("filter")) |filter_val| { 159 + if (filter_val == .object) { 160 + const fobj = filter_val.object; 161 + if (fobj.get("occurred_since")) |v| { 162 + if (v == .string) filter_opts.occurred_since = v.string; 163 + } 164 + if (fobj.get("occurred_until")) |v| { 165 + if (v == .string) filter_opts.occurred_until = v.string; 166 + } 167 + if (fobj.get("event_prefixes")) |v| { 168 + filter_opts.event_prefixes = db_events.parseJsonStringArray(alloc, v); 169 + } 170 + if (fobj.get("event_names")) |v| { 171 + filter_opts.event_names = db_events.parseJsonStringArray(alloc, v); 172 + } 173 + if (fobj.get("resource_ids")) |v| { 174 + filter_opts.resource_ids = db_events.parseJsonStringArray(alloc, v); 175 + } 176 + if (fobj.get("resource_id_prefixes")) |v| { 177 + filter_opts.resource_id_prefixes = db_events.parseJsonStringArray(alloc, v); 178 + } 179 + if (fobj.get("order_asc")) |v| { 180 + if (v == .bool) filter_opts.order_asc = v.bool; 181 + } 182 + } 183 + } 184 + 185 + // query this page 186 + const events = db_events.queryWithFilter(alloc, filter_opts, page_size, offset) catch |err| { 187 + log.err("events-api", "query next page error: {}", .{err}); 188 + json.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error); 189 + return; 190 + }; 191 + 192 + // generate next_page URL if there are more results 193 + const next_page = generateNextPageUrl(alloc, r, filter_opts, total, page_size, offset); 194 + 195 + // send response 196 + sendEventPage(r, alloc, events, total, next_page); 197 + } 198 + 199 + fn parseFilterOptions(alloc: std.mem.Allocator, obj: std.json.ObjectMap) db_events.FilterOptions { 200 + var filter_opts = db_events.FilterOptions{}; 201 + 202 + if (obj.get("filter")) |filter_val| { 203 + if (filter_val == .object) { 204 + const fobj = filter_val.object; 205 + 206 + // occurred filter: { "since": "...", "until": "..." } 207 + if (fobj.get("occurred")) |occ| { 208 + if (occ == .object) { 209 + if (occ.object.get("since")) |v| { 210 + if (v == .string) filter_opts.occurred_since = v.string; 211 + } 212 + if (occ.object.get("until")) |v| { 213 + if (v == .string) filter_opts.occurred_until = v.string; 214 + } 215 + } 216 + } 217 + 218 + // event filter: { "prefix": ["..."], "name": ["..."] } 219 + if (fobj.get("event")) |ev| { 220 + if (ev == .object) { 221 + if (ev.object.get("prefix")) |v| { 222 + filter_opts.event_prefixes = db_events.parseJsonStringArray(alloc, v); 223 + } 224 + if (ev.object.get("name")) |v| { 225 + filter_opts.event_names = db_events.parseJsonStringArray(alloc, v); 226 + } 227 + } 228 + } 229 + 230 + // resource filter: { "id": ["..."], "id_prefix": ["..."] } 231 + if (fobj.get("resource")) |res| { 232 + if (res == .object) { 233 + if (res.object.get("id")) |v| { 234 + filter_opts.resource_ids = db_events.parseJsonStringArray(alloc, v); 235 + } 236 + if (res.object.get("id_prefix")) |v| { 237 + filter_opts.resource_id_prefixes = db_events.parseJsonStringArray(alloc, v); 238 + } 239 + } 240 + } 241 + 242 + // order: "ASC" or "DESC" 243 + if (fobj.get("order")) |v| { 244 + if (v == .string) { 245 + if (mem.eql(u8, v.string, "ASC")) filter_opts.order_asc = true; 246 + } 247 + } 248 + } 249 + } 250 + 251 + return filter_opts; 252 + } 253 + 254 + fn generateNextPageUrl( 255 + alloc: std.mem.Allocator, 256 + r: zap.Request, 257 + filter_opts: db_events.FilterOptions, 258 + total: usize, 259 + page_size: usize, 260 + current_offset: usize, 261 + ) ?[]const u8 { 262 + const next_offset = current_offset + page_size; 263 + if (next_offset >= total) return null; 264 + 265 + // build token JSON: {"filter": {...}, "count": N, "page_size": N, "offset": N} 266 + const filter_json = filter_opts.toJson(alloc) catch return null; 267 + 268 + const token_json = std.fmt.allocPrint(alloc, "{{\"filter\":{s},\"count\":{d},\"page_size\":{d},\"offset\":{d}}}", .{ 269 + filter_json, 270 + total, 271 + page_size, 272 + next_offset, 273 + }) catch return null; 274 + 275 + // base64 encode the token 276 + const inner_token = encoding.base64Encode(alloc, token_json) catch return null; 277 + 278 + // base64 encode again for URL 279 + const url_token = encoding.base64Encode(alloc, inner_token) catch return null; 280 + 281 + // build URL 282 + const host = r.getHeader("host") orelse "localhost:4200"; 283 + return std.fmt.allocPrint(alloc, "http://{s}/api/events/filter/next?page-token={s}", .{ host, url_token }) catch null; 284 + } 285 + 286 + fn sendEventPage(r: zap.Request, alloc: std.mem.Allocator, events: []const []const u8, total: usize, next_page: ?[]const u8) void { 287 + var output: std.Io.Writer.Allocating = .init(alloc); 288 + var jw: std.json.Stringify = .{ .writer = &output.writer }; 289 + 290 + jw.beginObject() catch {}; 291 + 292 + // "events": [...] 293 + jw.objectField("events") catch {}; 294 + jw.beginArray() catch {}; 295 + for (events) |event_json| { 296 + jw.print("{s}", .{event_json}) catch {}; 297 + } 298 + jw.endArray() catch {}; 299 + 300 + // "total": N 301 + jw.objectField("total") catch {}; 302 + jw.write(total) catch {}; 303 + 304 + // "next_page": "..." or null 305 + jw.objectField("next_page") catch {}; 306 + if (next_page) |url| { 307 + jw.write(url) catch {}; 308 + } else { 309 + jw.write(null) catch {}; 310 + } 311 + 312 + jw.endObject() catch {}; 313 + 314 + json.send(r, output.toOwnedSlice() catch "{\"events\":[],\"total\":0,\"next_page\":null}"); 315 + } 316 + 317 + fn count(r: zap.Request) !void { 318 + const total = db_events.count(); 319 + 320 + var buf: [64]u8 = undefined; 321 + const resp = std.fmt.bufPrint(&buf, "{{\"count\":{d}}}", .{total}) catch "{\"count\":0}"; 322 + 323 + json.send(r, resp); 324 + }
+6
src/api/routes.zig
··· 13 13 pub const variables = @import("variables.zig"); 14 14 pub const work_pools = @import("work_pools.zig"); 15 15 pub const deployments = @import("deployments.zig"); 16 + pub const events_api = @import("events_api.zig"); 16 17 17 18 pub fn handle(r: zap.Request) !void { 18 19 const target = r.path orelse "/"; ··· 51 52 try work_pools.handle(r); 52 53 } else if (std.mem.startsWith(u8, target, "/api/deployments") or std.mem.startsWith(u8, target, "/deployments")) { 53 54 try deployments.handle(r); 55 + } else if (std.mem.startsWith(u8, target, "/api/events/filter") or std.mem.startsWith(u8, target, "/events/filter") or 56 + std.mem.startsWith(u8, target, "/api/events/count") or std.mem.startsWith(u8, target, "/events/count")) 57 + { 58 + // HTTP events endpoints (not websocket /events/in or /events/out) 59 + try events_api.handle(r); 54 60 } else { 55 61 try sendNotFound(r); 56 62 }
+11 -16
src/db/backend.zig
··· 489 489 } 490 490 } 491 491 492 + /// Get the database backend (returns null if not initialized) 493 + pub fn getBackend() ?*Backend { 494 + if (initialized) return &db; 495 + return null; 496 + } 497 + 492 498 /// Get the current dialect 493 499 pub fn getDialect() Dialect { 494 500 return db.dialect; 495 501 } 496 502 497 503 test "sqlite backend basic operations" { 498 - const allocator = std.testing.allocator; 499 - 500 - var b = try Backend.initSqlite(allocator, ":memory:"); 504 + var b = try Backend.initSqlite(std.testing.allocator, ":memory:"); 501 505 defer b.deinit(); 502 - 503 506 try b.exec("CREATE TABLE test (id TEXT PRIMARY KEY, name TEXT)", .{}); 504 507 try b.exec("INSERT INTO test (id, name) VALUES (?, ?)", .{ "1", "alice" }); 505 - 506 508 var r = try b.row("SELECT id, name FROM test WHERE id = ?", .{"1"}); 507 509 if (r) |*row| { 508 510 defer row.deinit(); 509 511 try std.testing.expectEqualStrings("1", row.text(0)); 510 512 try std.testing.expectEqualStrings("alice", row.text(1)); 511 - } else { 512 - return error.RowNotFound; 513 - } 514 - 513 + } else return error.RowNotFound; 515 514 try b.exec("INSERT INTO test (id, name) VALUES (?, ?)", .{ "2", "bob" }); 516 515 var rows = try b.query("SELECT id, name FROM test ORDER BY id", .{}); 517 516 defer rows.deinit(); 518 - 519 517 var count: usize = 0; 520 518 while (rows.next()) |_| count += 1; 521 519 try std.testing.expectEqual(@as(usize, 2), count); 522 520 } 523 521 524 522 test "dialect placeholder rewriting" { 525 - const allocator = std.testing.allocator; 526 - 527 523 const sql = "SELECT * FROM t WHERE a = ? AND b = ?"; 528 - const sqlite_result = try Dialect.sqlite.rewritePlaceholders(allocator, sql); 524 + const sqlite_result = try Dialect.sqlite.rewritePlaceholders(std.testing.allocator, sql); 529 525 try std.testing.expectEqualStrings(sql, sqlite_result); 530 - 531 - const pg_result = try Dialect.postgres.rewritePlaceholders(allocator, sql); 532 - defer allocator.free(pg_result); 526 + const pg_result = try Dialect.postgres.rewritePlaceholders(std.testing.allocator, sql); 527 + defer std.testing.allocator.free(pg_result); 533 528 try std.testing.expectEqualStrings("SELECT * FROM t WHERE a = $1 AND b = $2", pg_result); 534 529 }
+348 -11
src/db/events.zig
··· 1 1 const std = @import("std"); 2 2 const backend = @import("backend.zig"); 3 + const dialect = @import("dialect.zig"); 3 4 const log = @import("../logging.zig"); 4 5 6 + /// Filter options for event queries 7 + pub const FilterOptions = struct { 8 + occurred_since: ?[]const u8 = null, 9 + occurred_until: ?[]const u8 = null, 10 + event_prefixes: ?[]const []const u8 = null, 11 + event_names: ?[]const []const u8 = null, 12 + resource_ids: ?[]const []const u8 = null, 13 + resource_id_prefixes: ?[]const []const u8 = null, 14 + order_asc: bool = false, 15 + 16 + /// Serialize filter to JSON for page token 17 + pub fn toJson(self: FilterOptions, alloc: std.mem.Allocator) ![]const u8 { 18 + var output: std.Io.Writer.Allocating = .init(alloc); 19 + var jw: std.json.Stringify = .{ .writer = &output.writer }; 20 + 21 + try jw.beginObject(); 22 + 23 + if (self.occurred_since) |v| { 24 + try jw.objectField("occurred_since"); 25 + try jw.write(v); 26 + } 27 + if (self.occurred_until) |v| { 28 + try jw.objectField("occurred_until"); 29 + try jw.write(v); 30 + } 31 + if (self.event_prefixes) |arr| { 32 + try jw.objectField("event_prefixes"); 33 + try jw.beginArray(); 34 + for (arr) |v| try jw.write(v); 35 + try jw.endArray(); 36 + } 37 + if (self.event_names) |arr| { 38 + try jw.objectField("event_names"); 39 + try jw.beginArray(); 40 + for (arr) |v| try jw.write(v); 41 + try jw.endArray(); 42 + } 43 + if (self.resource_ids) |arr| { 44 + try jw.objectField("resource_ids"); 45 + try jw.beginArray(); 46 + for (arr) |v| try jw.write(v); 47 + try jw.endArray(); 48 + } 49 + if (self.resource_id_prefixes) |arr| { 50 + try jw.objectField("resource_id_prefixes"); 51 + try jw.beginArray(); 52 + for (arr) |v| try jw.write(v); 53 + try jw.endArray(); 54 + } 55 + try jw.objectField("order_asc"); 56 + try jw.write(self.order_asc); 57 + 58 + try jw.endObject(); 59 + return output.toOwnedSlice(); 60 + } 61 + 62 + /// Parse filter from JSON (for page token decoding) 63 + pub fn fromJson(alloc: std.mem.Allocator, json_str: []const u8) !FilterOptions { 64 + const parsed = std.json.parseFromSlice(std.json.Value, alloc, json_str, .{}) catch return FilterOptions{}; 65 + if (parsed.value != .object) return FilterOptions{}; 66 + const obj = parsed.value.object; 67 + 68 + var opts = FilterOptions{}; 69 + 70 + if (obj.get("occurred_since")) |v| { 71 + if (v == .string) opts.occurred_since = v.string; 72 + } 73 + if (obj.get("occurred_until")) |v| { 74 + if (v == .string) opts.occurred_until = v.string; 75 + } 76 + if (obj.get("event_prefixes")) |v| { 77 + opts.event_prefixes = parseStringArray(alloc, v); 78 + } 79 + if (obj.get("event_names")) |v| { 80 + opts.event_names = parseStringArray(alloc, v); 81 + } 82 + if (obj.get("resource_ids")) |v| { 83 + opts.resource_ids = parseStringArray(alloc, v); 84 + } 85 + if (obj.get("resource_id_prefixes")) |v| { 86 + opts.resource_id_prefixes = parseStringArray(alloc, v); 87 + } 88 + if (obj.get("order_asc")) |v| { 89 + if (v == .bool) opts.order_asc = v.bool; 90 + } 91 + 92 + return opts; 93 + } 94 + 95 + fn parseStringArray(alloc: std.mem.Allocator, val: std.json.Value) ?[]const []const u8 { 96 + return parseJsonStringArray(alloc, val); 97 + } 98 + }; 99 + 100 + /// Parse JSON value as string array (handles both array and single string) 101 + pub fn parseJsonStringArray(alloc: std.mem.Allocator, val: std.json.Value) ?[]const []const u8 { 102 + if (val == .array) { 103 + var list = std.ArrayListUnmanaged([]const u8){}; 104 + for (val.array.items) |item| { 105 + if (item == .string) { 106 + list.append(alloc, item.string) catch continue; 107 + } 108 + } 109 + if (list.items.len > 0) return list.toOwnedSlice(alloc) catch null; 110 + } else if (val == .string) { 111 + var arr = alloc.alloc([]const u8, 1) catch return null; 112 + arr[0] = val.string; 113 + return arr; 114 + } 115 + return null; 116 + } 117 + 5 118 /// SQL for insertDeduped - built at comptime for each dialect 6 119 const insert_deduped_sql = struct { 7 120 const sqlite = ··· 69 182 return 0; 70 183 } 71 184 72 - /// Event for backfill queries 73 - pub const Event = struct { 74 - id: []const u8, 75 - occurred: []const u8, 76 - event: []const u8, 77 - resource_id: []const u8, 78 - resource: []const u8, 79 - payload: []const u8, 80 - related: []const u8, 81 - }; 82 - 83 185 /// Query recent events for backfill (returns JSON-serialized events) 84 186 /// Returns up to `limit` events ordered by occurred DESC 85 187 pub fn queryRecent(alloc: std.mem.Allocator, limit: usize) ![][]const u8 { ··· 144 246 145 247 return results.toOwnedSlice(alloc); 146 248 } 249 + 250 + /// Build WHERE clause and bind values for filter options 251 + fn buildWhereClause(alloc: std.mem.Allocator, opts: FilterOptions) !struct { where: []const u8, bindings: []const []const u8 } { 252 + var where_parts = std.ArrayListUnmanaged([]const u8){}; 253 + var bind_values = std.ArrayListUnmanaged([]const u8){}; 254 + 255 + // occurred range 256 + if (opts.occurred_since) |since| { 257 + try where_parts.append(alloc, "occurred >= ?"); 258 + try bind_values.append(alloc, since); 259 + } 260 + if (opts.occurred_until) |until| { 261 + try where_parts.append(alloc, "occurred <= ?"); 262 + try bind_values.append(alloc, until); 263 + } 264 + 265 + // event name exact match (OR) 266 + if (opts.event_names) |names| { 267 + if (names.len > 0) { 268 + var or_parts = std.ArrayListUnmanaged(u8){}; 269 + try or_parts.appendSlice(alloc, "("); 270 + for (names, 0..) |name, i| { 271 + if (i > 0) try or_parts.appendSlice(alloc, " OR "); 272 + try or_parts.appendSlice(alloc, "event = ?"); 273 + try bind_values.append(alloc, name); 274 + } 275 + try or_parts.appendSlice(alloc, ")"); 276 + try where_parts.append(alloc, try or_parts.toOwnedSlice(alloc)); 277 + } 278 + } 279 + 280 + // event prefix match (OR) 281 + if (opts.event_prefixes) |prefixes| { 282 + if (prefixes.len > 0) { 283 + var or_parts = std.ArrayListUnmanaged(u8){}; 284 + try or_parts.appendSlice(alloc, "("); 285 + for (prefixes, 0..) |prefix, i| { 286 + if (i > 0) try or_parts.appendSlice(alloc, " OR "); 287 + try or_parts.appendSlice(alloc, "event LIKE ?"); 288 + const pattern = try std.fmt.allocPrint(alloc, "{s}%", .{prefix}); 289 + try bind_values.append(alloc, pattern); 290 + } 291 + try or_parts.appendSlice(alloc, ")"); 292 + try where_parts.append(alloc, try or_parts.toOwnedSlice(alloc)); 293 + } 294 + } 295 + 296 + // resource_id exact match (OR) 297 + if (opts.resource_ids) |ids| { 298 + if (ids.len > 0) { 299 + var or_parts = std.ArrayListUnmanaged(u8){}; 300 + try or_parts.appendSlice(alloc, "("); 301 + for (ids, 0..) |id, i| { 302 + if (i > 0) try or_parts.appendSlice(alloc, " OR "); 303 + try or_parts.appendSlice(alloc, "resource_id = ?"); 304 + try bind_values.append(alloc, id); 305 + } 306 + try or_parts.appendSlice(alloc, ")"); 307 + try where_parts.append(alloc, try or_parts.toOwnedSlice(alloc)); 308 + } 309 + } 310 + 311 + // resource_id prefix match (OR) 312 + if (opts.resource_id_prefixes) |prefixes| { 313 + if (prefixes.len > 0) { 314 + var or_parts = std.ArrayListUnmanaged(u8){}; 315 + try or_parts.appendSlice(alloc, "("); 316 + for (prefixes, 0..) |prefix, i| { 317 + if (i > 0) try or_parts.appendSlice(alloc, " OR "); 318 + try or_parts.appendSlice(alloc, "resource_id LIKE ?"); 319 + const pattern = try std.fmt.allocPrint(alloc, "{s}%", .{prefix}); 320 + try bind_values.append(alloc, pattern); 321 + } 322 + try or_parts.appendSlice(alloc, ")"); 323 + try where_parts.append(alloc, try or_parts.toOwnedSlice(alloc)); 324 + } 325 + } 326 + 327 + // build WHERE string 328 + var where_sql = std.ArrayListUnmanaged(u8){}; 329 + if (where_parts.items.len > 0) { 330 + try where_sql.appendSlice(alloc, " WHERE "); 331 + for (where_parts.items, 0..) |part, i| { 332 + if (i > 0) try where_sql.appendSlice(alloc, " AND "); 333 + try where_sql.appendSlice(alloc, part); 334 + } 335 + } 336 + 337 + return .{ 338 + .where = try where_sql.toOwnedSlice(alloc), 339 + .bindings = try bind_values.toOwnedSlice(alloc), 340 + }; 341 + } 342 + 343 + /// Count events matching filter criteria 344 + pub fn countWithFilter(alloc: std.mem.Allocator, opts: FilterOptions) !usize { 345 + const clause = try buildWhereClause(alloc, opts); 346 + const sql = try std.fmt.allocPrint(alloc, "SELECT COUNT(*) FROM events{s}", .{clause.where}); 347 + 348 + // interpolate bindings 349 + const final_sql = try interpolateBindings(alloc, sql, clause.bindings); 350 + 351 + var r = backend.db.row(final_sql, .{}) catch |err| { 352 + log.err("database", "count events filter error: {}", .{err}); 353 + return err; 354 + }; 355 + if (r) |*row| { 356 + defer row.deinit(); 357 + return @intCast(row.bigint(0)); 358 + } 359 + return 0; 360 + } 361 + 362 + /// Query events with filter options and pagination 363 + /// Returns JSON-serialized events matching the filter criteria 364 + pub fn queryWithFilter(alloc: std.mem.Allocator, opts: FilterOptions, limit: usize, offset: usize) ![][]const u8 { 365 + var results = std.ArrayListUnmanaged([]const u8){}; 366 + errdefer { 367 + for (results.items) |item| alloc.free(item); 368 + results.deinit(alloc); 369 + } 370 + 371 + const clause = try buildWhereClause(alloc, opts); 372 + 373 + // build SQL 374 + var sql = std.ArrayListUnmanaged(u8){}; 375 + try sql.appendSlice(alloc, "SELECT id, occurred, event, resource_id, resource, payload, related, received FROM events"); 376 + try sql.appendSlice(alloc, clause.where); 377 + 378 + // order 379 + if (opts.order_asc) { 380 + try sql.appendSlice(alloc, " ORDER BY occurred ASC"); 381 + } else { 382 + try sql.appendSlice(alloc, " ORDER BY occurred DESC"); 383 + } 384 + 385 + // limit and offset 386 + const limit_offset = try std.fmt.allocPrint(alloc, " LIMIT {d} OFFSET {d}", .{ limit, offset }); 387 + try sql.appendSlice(alloc, limit_offset); 388 + 389 + const sql_str = try sql.toOwnedSlice(alloc); 390 + log.debug("events", "filter query: {s}", .{sql_str}); 391 + 392 + // execute with bind values 393 + const rows = queryWithBindings(alloc, sql_str, clause.bindings) catch |err| { 394 + log.err("database", "query events filter error: {}", .{err}); 395 + return err; 396 + }; 397 + 398 + for (rows) |row_data| { 399 + try results.append(alloc, row_data); 400 + } 401 + 402 + return results.toOwnedSlice(alloc); 403 + } 404 + 405 + /// Interpolate bind values into SQL (for queries that need dynamic binding count) 406 + fn interpolateBindings(alloc: std.mem.Allocator, sql: []const u8, bindings: []const []const u8) ![]const u8 { 407 + var final_sql = std.ArrayListUnmanaged(u8){}; 408 + var bind_idx: usize = 0; 409 + 410 + for (sql) |c| { 411 + if (c == '?' and bind_idx < bindings.len) { 412 + try final_sql.append(alloc, '\''); 413 + for (bindings[bind_idx]) |vc| { 414 + if (vc == '\'') { 415 + try final_sql.appendSlice(alloc, "''"); 416 + } else { 417 + try final_sql.append(alloc, vc); 418 + } 419 + } 420 + try final_sql.append(alloc, '\''); 421 + bind_idx += 1; 422 + } else { 423 + try final_sql.append(alloc, c); 424 + } 425 + } 426 + 427 + return final_sql.toOwnedSlice(alloc); 428 + } 429 + 430 + /// Execute query with dynamic bind values 431 + fn queryWithBindings(alloc: std.mem.Allocator, sql: []const u8, bindings: []const []const u8) ![][]const u8 { 432 + var results = std.ArrayListUnmanaged([]const u8){}; 433 + 434 + // column indices for: id, occurred, event, resource_id, resource, payload, related, received 435 + const Col = struct { 436 + const id: usize = 0; 437 + const occurred: usize = 1; 438 + const event: usize = 2; 439 + const resource: usize = 4; 440 + const payload: usize = 5; 441 + const related: usize = 6; 442 + const received: usize = 7; 443 + }; 444 + 445 + const final_sql_str = try interpolateBindings(alloc, sql, bindings); 446 + 447 + var rows = backend.db.query(final_sql_str, .{}) catch |err| { 448 + log.err("database", "query with bindings error: {}", .{err}); 449 + return err; 450 + }; 451 + defer rows.deinit(); 452 + 453 + while (rows.next()) |row| { 454 + const EventJson = struct { 455 + id: []const u8, 456 + occurred: []const u8, 457 + event: []const u8, 458 + received: []const u8, 459 + }; 460 + 461 + const ev = EventJson{ 462 + .id = row.text(Col.id), 463 + .occurred = row.text(Col.occurred), 464 + .event = row.text(Col.event), 465 + .received = row.text(Col.received), 466 + }; 467 + 468 + const base_json = std.json.Stringify.valueAlloc(alloc, ev, .{}) catch continue; 469 + defer alloc.free(base_json); 470 + 471 + // splice in pre-serialized JSON fields (resource, payload, related are already JSON in DB) 472 + const event_json = std.fmt.allocPrint(alloc, "{s},\"resource\":{s},\"payload\":{s},\"related\":{s}}}", .{ 473 + base_json[0 .. base_json.len - 1], // remove closing } 474 + row.text(Col.resource), 475 + row.text(Col.payload), 476 + row.text(Col.related), 477 + }) catch continue; 478 + 479 + try results.append(alloc, event_json); 480 + } 481 + 482 + return results.toOwnedSlice(alloc); 483 + }
+40
src/prefect.zig
··· 1 + //! prefect-server - zig implementation of prefect server 2 + //! 3 + //! public interface for embedding or extending the server. 4 + 5 + // database 6 + pub const db = struct { 7 + const backend = @import("db/backend.zig"); 8 + 9 + pub const Backend = backend.Backend; 10 + pub const Row = backend.Row; 11 + pub const Rows = backend.Rows; 12 + pub const Transaction = backend.Transaction; 13 + pub const Dialect = backend.Dialect; 14 + 15 + pub const init = backend.init; 16 + pub const close = backend.close; 17 + pub const getBackend = backend.getBackend; 18 + pub const getDialect = backend.getDialect; 19 + }; 20 + 21 + // message broker 22 + pub const broker = @import("broker.zig"); 23 + 24 + // background services 25 + pub const services = @import("services.zig"); 26 + 27 + // orchestration engine 28 + pub const orchestration = @import("orchestration.zig"); 29 + 30 + // logging 31 + pub const log = @import("logging.zig"); 32 + 33 + // utilities 34 + pub const util = struct { 35 + pub const json = @import("utilities/json.zig"); 36 + pub const encoding = @import("utilities/encoding.zig"); 37 + pub const time = @import("utilities/time.zig"); 38 + pub const uuid = @import("utilities/uuid.zig"); 39 + pub const hashing = @import("utilities/hashing.zig"); 40 + };
+53
src/utilities/encoding.zig
··· 1 + const std = @import("std"); 2 + 3 + /// base64 encode a byte slice 4 + pub fn base64Encode(alloc: std.mem.Allocator, data: []const u8) ![]const u8 { 5 + const encoder = std.base64.standard.Encoder; 6 + const size = encoder.calcSize(data.len); 7 + const buf = try alloc.alloc(u8, size); 8 + return encoder.encode(buf, data); 9 + } 10 + 11 + /// base64 decode a string 12 + pub fn base64Decode(alloc: std.mem.Allocator, encoded: []const u8) ![]const u8 { 13 + const decoder = std.base64.standard.Decoder; 14 + // upper bound: 3 bytes per 4 chars 15 + const max_size = (encoded.len / 4 + 1) * 3; 16 + const buf = try alloc.alloc(u8, max_size); 17 + decoder.decode(buf, encoded) catch return error.InvalidBase64; 18 + // calculate actual decoded length (may be less due to padding) 19 + const actual_len = encoded.len / 4 * 3 - countPadding(encoded); 20 + return buf[0..actual_len]; 21 + } 22 + 23 + fn countPadding(encoded: []const u8) usize { 24 + var padding: usize = 0; 25 + if (encoded.len > 0 and encoded[encoded.len - 1] == '=') padding += 1; 26 + if (encoded.len > 1 and encoded[encoded.len - 2] == '=') padding += 1; 27 + return padding; 28 + } 29 + 30 + /// URL decode a percent-encoded string (handles %XX and + for space) 31 + pub fn urlDecode(alloc: std.mem.Allocator, encoded: []const u8) ![]const u8 { 32 + var result = std.ArrayListUnmanaged(u8){}; 33 + var i: usize = 0; 34 + while (i < encoded.len) { 35 + if (encoded[i] == '%' and i + 2 < encoded.len) { 36 + const hex = encoded[i + 1 .. i + 3]; 37 + const byte = std.fmt.parseInt(u8, hex, 16) catch { 38 + try result.append(alloc, encoded[i]); 39 + i += 1; 40 + continue; 41 + }; 42 + try result.append(alloc, byte); 43 + i += 3; 44 + } else if (encoded[i] == '+') { 45 + try result.append(alloc, ' '); 46 + i += 1; 47 + } else { 48 + try result.append(alloc, encoded[i]); 49 + i += 1; 50 + } 51 + } 52 + return result.toOwnedSlice(alloc); 53 + }