prefect server in zig
1// REST API handlers for /events endpoints
2// separate from events.zig which handles websocket connections
3
4const std = @import("std");
5const mem = std.mem;
6const zap = @import("zap");
7const log = @import("../logging.zig");
8const db_events = @import("../db/events.zig");
9const json = @import("../utilities/json.zig");
10const encoding = @import("../utilities/encoding.zig");
11
12const PAGE_SIZE: usize = 50;
13
14// POST /events/filter - query events with filters
15// GET /events/filter/next?page-token=... - get next page
16// GET /events/count - count total events
/// Dispatch an `/events` REST request to its handler based on method and path.
/// Routes are tested in a fixed order; the first match wins. When nothing
/// matches, a `{"detail":"not found"}` body is sent with status `.not_found`.
pub fn handle(r: zap.Request) !void {
    const path = r.path orelse "/";
    const verb = r.method orelse "GET";

    const is_get = mem.eql(u8, verb, "GET");
    const is_post = mem.eql(u8, verb, "POST");

    // pagination continuation: GET .../filter/next?page-token=...
    if (is_get and mem.indexOf(u8, path, "/filter/next") != null) {
        return filterNext(r);
    }

    // first page of a filtered query: POST .../filter
    if (is_post and mem.endsWith(u8, path, "/filter")) {
        return filter(r);
    }

    // total event count: GET .../count
    if (is_get and mem.endsWith(u8, path, "/count")) {
        return count(r);
    }

    json.sendStatus(r, "{\"detail\":\"not found\"}", .not_found);
}
41
/// Handle `POST /events/filter`: run a filtered event query and send page one.
///
/// The JSON body may contain a `limit` (used only when it lies in
/// 1..=PAGE_SIZE, otherwise PAGE_SIZE applies) and a `filter` object that is
/// translated by `parseFilterOptions`. Sends `.bad_request` when the body is
/// not valid JSON and `.internal_server_error` when the page query fails.
fn filter(r: zap.Request) !void {
    // per-request arena: everything allocated below is released on return
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    // parse request body (a missing body is treated as an empty object)
    const body = r.body orelse "{}";
    const parsed = std.json.parseFromSlice(std.json.Value, alloc, body, .{}) catch {
        json.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
        return;
    };

    // parse limit (default PAGE_SIZE, max PAGE_SIZE for pagination)
    const limit: usize = blk: {
        if (json.getInt(parsed.value, "limit")) |l| {
            if (l > 0 and l <= PAGE_SIZE) break :blk @intCast(l);
        }
        break :blk PAGE_SIZE;
    };

    // parse filter object
    const filter_opts = parseFilterOptions(alloc, parsed.value);

    // get total count first; a failed count is not fatal, but it must not
    // vanish silently: log it and fall back to 0, which also means no
    // next_page link is generated for this response
    const total = db_events.countWithFilter(alloc, filter_opts) catch |err| fallback: {
        log.err("events-api", "count error: {}", .{err});
        break :fallback 0;
    };

    // query first page (offset 0)
    const events = db_events.queryWithFilter(alloc, filter_opts, limit, 0) catch |err| {
        log.err("events-api", "query error: {}", .{err});
        json.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error);
        return;
    };

    // generate next_page URL if there are more results
    const next_page = generateNextPageUrl(alloc, r, filter_opts, total, limit, 0);

    // send response
    sendEventPage(r, alloc, events, total, next_page);
}
81
/// Handle `GET /events/filter/next?page-token=...`: serve a follow-up page.
///
/// The page token is URL-decoded, base64-decoded twice, and parsed as a JSON
/// object carrying `filter`, `count`, `page_size` and `offset`. A missing or
/// undecodable token is answered with `.forbidden`; a failing page query is
/// answered with `.internal_server_error`.
///
/// The token arrives from the client, so its fields are treated as untrusted:
/// negative numbers fall back to defaults and `page_size` is limited to
/// PAGE_SIZE, the same ceiling `filter` applies to `limit`.
fn filterNext(r: zap.Request) !void {
    // per-request arena: everything allocated below is released on return
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const token_key = "page-token=";
    const invalid_token = "{\"detail\":\"invalid page token\"}";

    // tolerate a leading '?' in case the query is handed over with its separator
    const raw_query = r.query orelse "";
    const query = if (raw_query.len > 0 and raw_query[0] == '?') raw_query[1..] else raw_query;

    // extract page-token from the query string; match whole parameters so a
    // key that merely ends in "page-token=" (e.g. "my-page-token=") is ignored
    const url_encoded_token = find: {
        var params = mem.tokenizeScalar(u8, query, '&');
        while (params.next()) |param| {
            if (mem.startsWith(u8, param, token_key)) break :find param[token_key.len..];
        }
        json.sendStatus(r, "{\"detail\":\"missing page token\"}", .forbidden);
        return;
    };

    // URL decode (handle %2B -> +, %2F -> /, %3D -> =)
    const decoded_token = encoding.urlDecode(alloc, url_encoded_token) catch {
        json.sendStatus(r, invalid_token, .forbidden);
        return;
    };

    // first base64 decode (URL layer)
    const inner_token = encoding.base64Decode(alloc, decoded_token) catch {
        json.sendStatus(r, invalid_token, .forbidden);
        return;
    };

    // second base64 decode (token layer)
    const token_json = encoding.base64Decode(alloc, inner_token) catch {
        json.sendStatus(r, invalid_token, .forbidden);
        return;
    };

    // parse token JSON
    const token_parsed = std.json.parseFromSlice(std.json.Value, alloc, token_json, .{}) catch {
        json.sendStatus(r, invalid_token, .forbidden);
        return;
    };

    if (token_parsed.value != .object) {
        json.sendStatus(r, invalid_token, .forbidden);
        return;
    }

    // extract token fields using path helpers
    const total: usize = if (json.getInt(token_parsed.value, "count")) |c|
        if (c >= 0) @intCast(c) else 0
    else
        0;

    // never let a crafted token request more than one page worth of rows
    const page_size: usize = if (json.getInt(token_parsed.value, "page_size")) |ps|
        if (ps > 0 and ps <= PAGE_SIZE) @intCast(ps) else PAGE_SIZE
    else
        PAGE_SIZE;

    const offset: usize = if (json.getInt(token_parsed.value, "offset")) |o|
        if (o >= 0) @intCast(o) else 0
    else
        0;

    // parse filter from token
    var filter_opts = db_events.FilterOptions{};
    filter_opts.occurred_since = json.getString(token_parsed.value, "filter.occurred_since");
    filter_opts.occurred_until = json.getString(token_parsed.value, "filter.occurred_until");
    filter_opts.order_asc = json.getBool(token_parsed.value, "filter.order_asc") orelse false;

    if (json.getPath(token_parsed.value, "filter.event_prefixes")) |v| {
        filter_opts.event_prefixes = db_events.parseJsonStringArray(alloc, v);
    }
    if (json.getPath(token_parsed.value, "filter.event_names")) |v| {
        filter_opts.event_names = db_events.parseJsonStringArray(alloc, v);
    }
    if (json.getPath(token_parsed.value, "filter.resource_ids")) |v| {
        filter_opts.resource_ids = db_events.parseJsonStringArray(alloc, v);
    }
    if (json.getPath(token_parsed.value, "filter.resource_id_prefixes")) |v| {
        filter_opts.resource_id_prefixes = db_events.parseJsonStringArray(alloc, v);
    }

    // query this page
    const events = db_events.queryWithFilter(alloc, filter_opts, page_size, offset) catch |err| {
        log.err("events-api", "query next page error: {}", .{err});
        json.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error);
        return;
    };

    // generate next_page URL if there are more results
    const next_page = generateNextPageUrl(alloc, r, filter_opts, total, page_size, offset);

    // send response
    sendEventPage(r, alloc, events, total, next_page);
}
178
/// Translate the `filter` object of a request body into `FilterOptions`.
/// Keys that are absent leave the corresponding option at its default value.
/// String arrays are produced by `db_events.parseJsonStringArray` using `alloc`.
fn parseFilterOptions(alloc: std.mem.Allocator, value: std.json.Value) db_events.FilterOptions {
    var result: db_events.FilterOptions = .{};

    // time window
    result.occurred_since = json.getString(value, "filter.occurred.since");
    result.occurred_until = json.getString(value, "filter.occurred.until");

    // event name matching
    if (json.getPath(value, "filter.event.prefix")) |node|
        result.event_prefixes = db_events.parseJsonStringArray(alloc, node);
    if (json.getPath(value, "filter.event.name")) |node|
        result.event_names = db_events.parseJsonStringArray(alloc, node);

    // resource matching
    if (json.getPath(value, "filter.resource.id")) |node|
        result.resource_ids = db_events.parseJsonStringArray(alloc, node);
    if (json.getPath(value, "filter.resource.id_prefix")) |node|
        result.resource_id_prefixes = db_events.parseJsonStringArray(alloc, node);

    // sort direction: ascending only for the exact string "ASC"
    if (json.getString(value, "filter.order")) |direction|
        result.order_asc = mem.eql(u8, direction, "ASC");

    return result;
}
209
/// Build the absolute URL of the page that follows `current_offset`.
///
/// Returns null when the next offset reaches or passes `total`, and also when
/// any serialization or allocation step fails. The token embedded in the URL
/// is the JSON object `{"filter":…,"count":…,"page_size":…,"offset":…}`
/// passed through `encoding.base64Encode` twice. The returned slice is
/// allocated from `alloc`.
fn generateNextPageUrl(
    alloc: std.mem.Allocator,
    r: zap.Request,
    filter_opts: db_events.FilterOptions,
    total: usize,
    page_size: usize,
    current_offset: usize,
) ?[]const u8 {
    const token_fmt = "{{\"filter\":{s},\"count\":{d},\"page_size\":{d},\"offset\":{d}}}";
    const url_fmt = "http://{s}/api/events/filter/next?page-token={s}";

    const upcoming_offset = current_offset + page_size;
    if (total <= upcoming_offset) return null; // nothing left to page through

    // token payload: the filter plus the paging position of the next page
    const filter_blob = filter_opts.toJson(alloc) catch return null;
    const payload = std.fmt.allocPrint(
        alloc,
        token_fmt,
        .{ filter_blob, total, page_size, upcoming_offset },
    ) catch return null;

    // two rounds of base64: token layer first, then the URL layer
    const encoded_once = encoding.base64Encode(alloc, payload) catch return null;
    const encoded_twice = encoding.base64Encode(alloc, encoded_once) catch return null;

    // address the link at whatever host the client used, with a local default
    const authority = r.getHeader("host") orelse "localhost:4200";
    return std.fmt.allocPrint(alloc, url_fmt, .{ authority, encoded_twice }) catch null;
}
241
/// Send one page of events as `{"events":[…],"total":N,"next_page":…}`.
///
/// `events` holds already-serialized JSON documents that are embedded
/// verbatim. If the body cannot be built for any reason, the failure is
/// logged and an empty page (`events: [], total: 0, next_page: null`) is sent
/// instead, so a partially written document is never put on the wire.
fn sendEventPage(r: zap.Request, alloc: std.mem.Allocator, events: []const []const u8, total: usize, next_page: ?[]const u8) void {
    const body = renderEventPage(alloc, events, total, next_page) catch |err| {
        log.err("events-api", "failed to serialize event page: {}", .{err});
        json.send(r, "{\"events\":[],\"total\":0,\"next_page\":null}");
        return;
    };
    json.send(r, body);
}

/// Serialize an event page into a buffer allocated from `alloc`.
/// Any writer or allocation error is returned to the caller.
fn renderEventPage(alloc: std.mem.Allocator, events: []const []const u8, total: usize, next_page: ?[]const u8) ![]u8 {
    var output: std.Io.Writer.Allocating = .init(alloc);
    errdefer output.deinit();
    var jw: std.json.Stringify = .{ .writer = &output.writer };

    try jw.beginObject();

    // "events": [...]
    try jw.objectField("events");
    try jw.beginArray();
    for (events) |event_json| {
        // each element is pre-rendered JSON, so print it raw instead of as a string
        try jw.print("{s}", .{event_json});
    }
    try jw.endArray();

    // "total": N
    try jw.objectField("total");
    try jw.write(total);

    // "next_page": "..." or null
    try jw.objectField("next_page");
    if (next_page) |url| {
        try jw.write(url);
    } else {
        try jw.write(null);
    }

    try jw.endObject();

    return output.toOwnedSlice();
}
272
/// Handle `GET /events/count`: reply with `{"count":N}` where N comes from
/// `db_events.count()`. The body is formatted into a fixed stack buffer; if
/// formatting fails, `{"count":0}` is sent instead.
fn count(r: zap.Request) !void {
    var scratch: [64]u8 = undefined;
    const payload = std.fmt.bufPrint(
        &scratch,
        "{{\"count\":{d}}}",
        .{db_events.count()},
    ) catch "{\"count\":0}";

    json.send(r, payload);
}
280}