// prefect server in zig
// at main 231 lines 8.0 kB view raw
const std = @import("std");
const zap = @import("zap");
const db = @import("../db/sqlite.zig");
const json_util = @import("../utilities/json.zig");
const uuid_util = @import("../utilities/uuid.zig");
const logging = @import("../logging.zig");

/// Level assigned to a log entry whose `level` field is missing or unusable (20 = INFO).
const default_level: i64 = 20;
/// Page size used by the filter endpoint when the request gives no integer `limit`.
const default_limit: usize = 200;
/// Upper bound applied to a client-supplied `limit`.
const max_limit = 1000;

/// Sends `body` as a JSON response with a permissive CORS header.
/// Sending is best-effort: a failure is logged and otherwise ignored, since
/// there is nothing further a handler can do for that request.
fn sendJson(r: zap.Request, body: []const u8) void {
    r.setHeader("content-type", "application/json") catch |err| {
        logging.err("logs", "set header error: {}", .{err});
    };
    r.setHeader("access-control-allow-origin", "*") catch |err| {
        logging.err("logs", "set header error: {}", .{err});
    };
    r.sendBody(body) catch |err| {
        logging.err("logs", "send body error: {}", .{err});
    };
}

/// Entry point for the logs routes.
///
/// - `POST` to a path ending in `/filter` runs a filtered query.
/// - any other `POST` creates logs from a JSON array (batch).
/// - every other method is answered with an empty JSON array.
///
/// A missing path is treated as "/" and a missing method as "GET".
pub fn handle(r: zap.Request) !void {
    const target = r.path orelse "/";
    const method = r.method orelse "GET";

    if (!std.mem.eql(u8, method, "POST")) {
        sendJson(r, "[]");
        return;
    }

    if (std.mem.endsWith(u8, target, "/filter")) {
        try filterLogs(r);
    } else {
        try createLogs(r);
    }
}

/// Returns the string stored under `key`, or null when the key is absent or
/// holds a non-string value. The returned slice borrows from the JSON tree.
fn getString(obj: std.json.ObjectMap, key: []const u8) ?[]const u8 {
    const value = obj.get(key) orelse return null;
    return switch (value) {
        .string => |s| s,
        else => null,
    };
}

/// Returns the integer stored under `key`, or null when the key is absent or
/// holds a non-integer value.
fn getInteger(obj: std.json.ObjectMap, key: []const u8) ?i64 {
    const value = obj.get(key) orelse return null;
    return switch (value) {
        .integer => |n| n,
        else => null,
    };
}

/// Returns the nested object stored under `key`, or null when the key is
/// absent or holds a non-object value.
fn getObject(obj: std.json.ObjectMap, key: []const u8) ?std.json.ObjectMap {
    const value = obj.get(key) orelse return null;
    return switch (value) {
        .object => |o| o,
        else => null,
    };
}

/// Reads a `{ "<key>": { "any_": [...] } }` clause and returns the first
/// element of `any_` if it is a string. Only the first element is honoured;
/// any further entries in `any_` are ignored.
fn firstOfAny(filters: std.json.ObjectMap, key: []const u8) ?[]const u8 {
    const clause = getObject(filters, key) orelse return null;
    const any_value = clause.get("any_") orelse return null;
    if (any_value != .array) return null;

    const items = any_value.array.items;
    if (items.len == 0) return null;
    return switch (items[0]) {
        .string => |s| s,
        else => null,
    };
}

/// Extracts the `level` of a log entry, falling back to `default_level` when
/// the field is absent, is not an integer, or is a number string that does
/// not parse as a base-10 i64.
fn parseLevel(log_obj: std.json.ObjectMap) i64 {
    const value = log_obj.get("level") orelse return default_level;
    return switch (value) {
        .integer => |n| n,
        .number_string => |s| std.fmt.parseInt(i64, s, 10) catch default_level,
        else => default_level,
    };
}

/// Converts a client-supplied integer to `usize`: negative values become 0 and
/// values too large for `usize` (possible on narrow targets) saturate at the
/// maximum instead of hitting an unchecked `@intCast`.
fn toNonNegativeUsize(n: i64) usize {
    if (n <= 0) return 0;
    return std.math.cast(usize, n) orelse std.math.maxInt(usize);
}

/// Handles batch creation of logs.
///
/// The request body must be a JSON array. Each element that is an object with
/// string `name`, `message` and `timestamp` fields is inserted; elements that
/// are not objects or lack one of those fields are skipped. A failed insert is
/// logged and does not abort the batch. Responds 400 for a missing body,
/// invalid JSON or a non-array payload; otherwise 201 with body `[]`.
fn createLogs(r: zap.Request) !void {
    const body = r.body orelse {
        json_util.sendStatus(r, "{\"detail\":\"missing body\"}", .bad_request);
        return;
    };

    // Per-request arena: the parsed JSON tree is released in one go on return.
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const parsed = std.json.parseFromSlice(std.json.Value, alloc, body, .{}) catch {
        json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
        return;
    };

    if (parsed.value != .array) {
        json_util.sendStatus(r, "{\"detail\":\"expected array of logs\"}", .bad_request);
        return;
    }

    for (parsed.value.array.items) |log_val| {
        if (log_val != .object) continue;
        const log_obj = log_val.object;

        // Required fields: skip the entry when any is missing or not a string.
        const name = getString(log_obj, "name") orelse continue;
        const message = getString(log_obj, "message") orelse continue;
        const timestamp = getString(log_obj, "timestamp") orelse continue;

        const level = parseLevel(log_obj);

        // Optional associations.
        const flow_run_id = getString(log_obj, "flow_run_id");
        const task_run_id = getString(log_obj, "task_run_id");

        var id_buf: [36]u8 = undefined;
        const id = uuid_util.generate(&id_buf);

        db.logs.insert(id, name, level, message, timestamp, flow_run_id, task_run_id) catch |err| {
            logging.err("logs", "insert error: {}", .{err});
        };
    }

    r.setStatus(.created);
    sendJson(r, "[]");
}

/// Handles the filtered log query.
///
/// The request body must be a JSON object. Recognised keys:
/// - `limit`  (integer, clamped to 1..1000; default 200)
/// - `offset` (integer, negative values treated as 0; default 0)
/// - `sort`   (string; ascending only when it equals "TIMESTAMP_ASC",
///             ascending too when absent or not a string)
/// - `logs`   (object) with optional `flow_run_id.any_`, `task_run_id.any_`,
///             `level.ge_` / `level.le_`, `timestamp.after_` / `timestamp.before_`
///
/// Responds 400 for a missing body, invalid JSON or a non-object payload, and
/// 500 when the database query fails. Errors raised while serializing the
/// result are returned to the caller without a response being sent here.
fn filterLogs(r: zap.Request) !void {
    const body = r.body orelse {
        json_util.sendStatus(r, "{\"detail\":\"missing body\"}", .bad_request);
        return;
    };

    // Per-request arena: owns the parsed JSON, the result rows passed to it,
    // and the response buffer.
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const parsed = std.json.parseFromSlice(std.json.Value, alloc, body, .{}) catch {
        json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
        return;
    };

    if (parsed.value != .object) {
        json_util.sendStatus(r, "{\"detail\":\"expected object\"}", .bad_request);
        return;
    }

    const obj = parsed.value.object;

    // Pagination.
    const limit: usize = if (getInteger(obj, "limit")) |n|
        @intCast(std.math.clamp(n, 1, max_limit))
    else
        default_limit;

    const offset: usize = if (getInteger(obj, "offset")) |n|
        toNonNegativeUsize(n)
    else
        0;

    // Sort direction (default ascending).
    const sort_asc = if (getString(obj, "sort")) |s|
        std.mem.eql(u8, s, "TIMESTAMP_ASC")
    else
        true;

    var filter_opts = db.logs.FilterOptions{
        .limit = limit,
        .offset = offset,
        .sort_asc = sort_asc,
    };

    // Optional filter clauses live under the "logs" object; each one is only
    // applied when present with the expected JSON type.
    if (getObject(obj, "logs")) |log_filters| {
        if (firstOfAny(log_filters, "flow_run_id")) |id| filter_opts.flow_run_id = id;
        if (firstOfAny(log_filters, "task_run_id")) |id| filter_opts.task_run_id = id;

        if (getObject(log_filters, "level")) |level_filter| {
            if (getInteger(level_filter, "ge_")) |n| filter_opts.level_ge = n;
            if (getInteger(level_filter, "le_")) |n| filter_opts.level_le = n;
        }

        if (getObject(log_filters, "timestamp")) |ts_filter| {
            if (getString(ts_filter, "after_")) |s| filter_opts.timestamp_after = s;
            if (getString(ts_filter, "before_")) |s| filter_opts.timestamp_before = s;
        }
    }

    const rows = db.logs.filter(alloc, filter_opts) catch |err| {
        logging.err("logs", "filter error: {}", .{err});
        json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error);
        return;
    };

    // Serialize the rows as a JSON array of objects.
    var output: std.io.Writer.Allocating = .init(alloc);
    errdefer output.deinit();
    var jw: std.json.Stringify = .{ .writer = &output.writer };

    try jw.beginArray();
    for (rows) |log_row| {
        try jw.beginObject();
        try jw.objectField("id");
        try jw.write(log_row.id);
        try jw.objectField("created");
        try jw.write(log_row.created);
        try jw.objectField("updated");
        try jw.write(log_row.updated);
        try jw.objectField("name");
        try jw.write(log_row.name);
        try jw.objectField("level");
        try jw.write(log_row.level);
        try jw.objectField("message");
        try jw.write(log_row.message);
        try jw.objectField("timestamp");
        try jw.write(log_row.timestamp);
        try jw.objectField("flow_run_id");
        try jw.write(log_row.flow_run_id);
        try jw.objectField("task_run_id");
        try jw.write(log_row.task_run_id);
        try jw.endObject();
    }
    try jw.endArray();

    sendJson(r, try output.toOwnedSlice());
}