prefect server in zig
at main 280 lines 9.7 kB view raw
1// REST API handlers for /events endpoints 2// separate from events.zig which handles websocket connections 3 4const std = @import("std"); 5const mem = std.mem; 6const zap = @import("zap"); 7const log = @import("../logging.zig"); 8const db_events = @import("../db/events.zig"); 9const json = @import("../utilities/json.zig"); 10const encoding = @import("../utilities/encoding.zig"); 11 12const PAGE_SIZE: usize = 50; 13 14// POST /events/filter - query events with filters 15// GET /events/filter/next?page-token=... - get next page 16// GET /events/count - count total events 17pub fn handle(r: zap.Request) !void { 18 const target = r.path orelse "/"; 19 const method = r.method orelse "GET"; 20 21 // GET /events/filter/next?page-token=... 22 if (mem.eql(u8, method, "GET") and mem.indexOf(u8, target, "/filter/next") != null) { 23 try filterNext(r); 24 return; 25 } 26 27 // POST /events/filter 28 if (mem.eql(u8, method, "POST") and mem.endsWith(u8, target, "/filter")) { 29 try filter(r); 30 return; 31 } 32 33 // GET /events/count 34 if (mem.eql(u8, method, "GET") and mem.endsWith(u8, target, "/count")) { 35 try count(r); 36 return; 37 } 38 39 json.sendStatus(r, "{\"detail\":\"not found\"}", .not_found); 40} 41 42fn filter(r: zap.Request) !void { 43 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 44 defer arena.deinit(); 45 const alloc = arena.allocator(); 46 47 // parse request body 48 const body = r.body orelse "{}"; 49 const parsed = std.json.parseFromSlice(std.json.Value, alloc, body, .{}) catch { 50 json.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request); 51 return; 52 }; 53 54 // parse limit (default PAGE_SIZE, max PAGE_SIZE for pagination) 55 const limit: usize = blk: { 56 if (json.getInt(parsed.value, "limit")) |l| { 57 if (l > 0 and l <= PAGE_SIZE) break :blk @intCast(l); 58 } 59 break :blk PAGE_SIZE; 60 }; 61 62 // parse filter object 63 const filter_opts = parseFilterOptions(alloc, parsed.value); 64 65 // get total count first 66 const total = db_events.countWithFilter(alloc, filter_opts) catch 0; 67 68 // query first page (offset 0) 69 const events = db_events.queryWithFilter(alloc, filter_opts, limit, 0) catch |err| { 70 log.err("events-api", "query error: {}", .{err}); 71 json.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error); 72 return; 73 }; 74 75 // generate next_page URL if there are more results 76 const next_page = generateNextPageUrl(alloc, r, filter_opts, total, limit, 0); 77 78 // send response 79 sendEventPage(r, alloc, events, total, next_page); 80} 81 82fn filterNext(r: zap.Request) !void { 83 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 84 defer arena.deinit(); 85 const alloc = arena.allocator(); 86 87 // extract page-token from query string 88 const query = r.query orelse ""; 89 const token_start = mem.indexOf(u8, query, "page-token=") orelse { 90 json.sendStatus(r, "{\"detail\":\"missing page token\"}", .forbidden); 91 return; 92 }; 93 94 const token_value_start = token_start + "page-token=".len; 95 var token_end = query.len; 96 if (mem.indexOf(u8, query[token_value_start..], "&")) |amp| { 97 token_end = token_value_start + amp; 98 } 99 const url_encoded_token = query[token_value_start..token_end]; 100 101 // URL decode (handle %2B -> +, %2F -> /, %3D -> =) 102 const decoded_token = encoding.urlDecode(alloc, url_encoded_token) catch { 103 json.sendStatus(r, "{\"detail\":\"invalid page token\"}", .forbidden); 104 return; 105 }; 106 107 // first base64 decode (URL layer) 108 const inner_token = encoding.base64Decode(alloc, decoded_token) catch { 109 json.sendStatus(r, "{\"detail\":\"invalid page token\"}", .forbidden); 110 return; 111 }; 112 113 // second base64 decode (token layer) 114 const token_json = encoding.base64Decode(alloc, inner_token) catch { 115 json.sendStatus(r, "{\"detail\":\"invalid page token\"}", .forbidden); 116 return; 117 }; 118 119 // parse token JSON 120 const token_parsed = std.json.parseFromSlice(std.json.Value, alloc, token_json, .{}) catch { 121 json.sendStatus(r, "{\"detail\":\"invalid page token\"}", .forbidden); 122 return; 123 }; 124 125 if (token_parsed.value != .object) { 126 json.sendStatus(r, "{\"detail\":\"invalid page token\"}", .forbidden); 127 return; 128 } 129 130 // extract token fields using path helpers 131 const total: usize = if (json.getInt(token_parsed.value, "count")) |c| 132 if (c >= 0) @intCast(c) else 0 133 else 134 0; 135 136 const page_size: usize = if (json.getInt(token_parsed.value, "page_size")) |ps| 137 if (ps > 0) @intCast(ps) else PAGE_SIZE 138 else 139 PAGE_SIZE; 140 141 const offset: usize = if (json.getInt(token_parsed.value, "offset")) |o| 142 if (o >= 0) @intCast(o) else 0 143 else 144 0; 145 146 // parse filter from token 147 var filter_opts = db_events.FilterOptions{}; 148 filter_opts.occurred_since = json.getString(token_parsed.value, "filter.occurred_since"); 149 filter_opts.occurred_until = json.getString(token_parsed.value, "filter.occurred_until"); 150 filter_opts.order_asc = json.getBool(token_parsed.value, "filter.order_asc") orelse false; 151 152 if (json.getPath(token_parsed.value, "filter.event_prefixes")) |v| { 153 filter_opts.event_prefixes = db_events.parseJsonStringArray(alloc, v); 154 } 155 if (json.getPath(token_parsed.value, "filter.event_names")) |v| { 156 filter_opts.event_names = db_events.parseJsonStringArray(alloc, v); 157 } 158 if (json.getPath(token_parsed.value, "filter.resource_ids")) |v| { 159 filter_opts.resource_ids = db_events.parseJsonStringArray(alloc, v); 160 } 161 if (json.getPath(token_parsed.value, "filter.resource_id_prefixes")) |v| { 162 filter_opts.resource_id_prefixes = db_events.parseJsonStringArray(alloc, v); 163 } 164 165 // query this page 166 const events = db_events.queryWithFilter(alloc, filter_opts, page_size, offset) catch |err| { 167 log.err("events-api", "query next page error: {}", .{err}); 168 json.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error); 169 return; 170 }; 171 172 // generate next_page URL if there are more results 173 const next_page = generateNextPageUrl(alloc, r, filter_opts, total, page_size, offset); 174 175 // send response 176 sendEventPage(r, alloc, events, total, next_page); 177} 178 179fn parseFilterOptions(alloc: std.mem.Allocator, value: std.json.Value) db_events.FilterOptions { 180 var opts = db_events.FilterOptions{}; 181 182 // occurred filter 183 opts.occurred_since = json.getString(value, "filter.occurred.since"); 184 opts.occurred_until = json.getString(value, "filter.occurred.until"); 185 186 // event filter 187 if (json.getPath(value, "filter.event.prefix")) |v| { 188 opts.event_prefixes = db_events.parseJsonStringArray(alloc, v); 189 } 190 if (json.getPath(value, "filter.event.name")) |v| { 191 opts.event_names = db_events.parseJsonStringArray(alloc, v); 192 } 193 194 // resource filter 195 if (json.getPath(value, "filter.resource.id")) |v| { 196 opts.resource_ids = db_events.parseJsonStringArray(alloc, v); 197 } 198 if (json.getPath(value, "filter.resource.id_prefix")) |v| { 199 opts.resource_id_prefixes = db_events.parseJsonStringArray(alloc, v); 200 } 201 202 // order 203 if (json.getString(value, "filter.order")) |order| { 204 opts.order_asc = mem.eql(u8, order, "ASC"); 205 } 206 207 return opts; 208} 209 210fn generateNextPageUrl( 211 alloc: std.mem.Allocator, 212 r: zap.Request, 213 filter_opts: db_events.FilterOptions, 214 total: usize, 215 page_size: usize, 216 current_offset: usize, 217) ?[]const u8 { 218 const next_offset = current_offset + page_size; 219 if (next_offset >= total) return null; 220 221 // build token JSON: {"filter": {...}, "count": N, "page_size": N, "offset": N} 222 const filter_json = filter_opts.toJson(alloc) catch return null; 223 224 const token_json = std.fmt.allocPrint(alloc, "{{\"filter\":{s},\"count\":{d},\"page_size\":{d},\"offset\":{d}}}", .{ 225 filter_json, 226 total, 227 page_size, 228 next_offset, 229 }) catch return null; 230 231 // base64 encode the token 232 const inner_token = encoding.base64Encode(alloc, token_json) catch return null; 233 234 // base64 encode again for URL 235 const url_token = encoding.base64Encode(alloc, inner_token) catch return null; 236 237 // build URL 238 const host = r.getHeader("host") orelse "localhost:4200"; 239 return std.fmt.allocPrint(alloc, "http://{s}/api/events/filter/next?page-token={s}", .{ host, url_token }) catch null; 240} 241 242fn sendEventPage(r: zap.Request, alloc: std.mem.Allocator, events: []const []const u8, total: usize, next_page: ?[]const u8) void { 243 var output: std.Io.Writer.Allocating = .init(alloc); 244 var jw: std.json.Stringify = .{ .writer = &output.writer }; 245 246 jw.beginObject() catch {}; 247 248 // "events": [...] 249 jw.objectField("events") catch {}; 250 jw.beginArray() catch {}; 251 for (events) |event_json| { 252 jw.print("{s}", .{event_json}) catch {}; 253 } 254 jw.endArray() catch {}; 255 256 // "total": N 257 jw.objectField("total") catch {}; 258 jw.write(total) catch {}; 259 260 // "next_page": "..." or null 261 jw.objectField("next_page") catch {}; 262 if (next_page) |url| { 263 jw.write(url) catch {}; 264 } else { 265 jw.write(null) catch {}; 266 } 267 268 jw.endObject() catch {}; 269 270 json.send(r, output.toOwnedSlice() catch "{\"events\":[],\"total\":0,\"next_page\":null}"); 271} 272 273fn count(r: zap.Request) !void { 274 const total = db_events.count(); 275 276 var buf: [64]u8 = undefined; 277 const resp = std.fmt.bufPrint(&buf, "{{\"count\":{d}}}", .{total}) catch "{\"count\":0}"; 278 279 json.send(r, resp); 280}