prefect server in zig
1const std = @import("std");
2const zap = @import("zap");
3const db = @import("../db/sqlite.zig");
4const json_util = @import("../utilities/json.zig");
5const uuid_util = @import("../utilities/uuid.zig");
6const logging = @import("../logging.zig");
7
/// Write `body` to the client as a JSON response.
///
/// Sets `content-type: application/json` and a wildcard
/// `access-control-allow-origin` header, then sends the body. Every step is
/// best-effort: an error from setting a header or from sending is discarded,
/// so this function never fails from the caller's point of view.
fn sendJson(r: zap.Request, body: []const u8) void {
    const response_headers = [_][2][]const u8{
        .{ "content-type", "application/json" },
        .{ "access-control-allow-origin", "*" },
    };
    for (response_headers) |header| {
        r.setHeader(header[0], header[1]) catch {};
    }
    r.sendBody(body) catch {};
}
13
/// Dispatch a request to the matching logs handler.
///
/// A missing path is treated as "/" and a missing method as "GET".
/// - POST with a path ending in "/filter" is handled by `filterLogs`.
/// - Any other POST is handled by `createLogs` (batch create).
/// - Every other method receives an empty JSON array.
///
/// Errors returned by the selected handler are passed on to the caller.
pub fn handle(r: zap.Request) !void {
    const path = r.path orelse "/";
    const verb = r.method orelse "GET";

    // Anything that is not a POST gets the empty-list answer.
    if (!std.mem.eql(u8, verb, "POST")) {
        sendJson(r, "[]");
        return;
    }

    if (std.mem.endsWith(u8, path, "/filter")) {
        return filterLogs(r);
    }
    return createLogs(r);
}
29
/// Return the string stored under `key` in `obj`, or null when the key is
/// absent or holds a non-string value.
fn logStringField(obj: std.json.ObjectMap, key: []const u8) ?[]const u8 {
    const value = obj.get(key) orelse return null;
    return switch (value) {
        .string => |s| s,
        else => null,
    };
}

/// Handle a batch log-creation POST.
///
/// The request body must be a JSON array. Each element that is an object with
/// string `name`, `message` and `timestamp` fields is inserted through
/// `db.logs.insert` under a freshly generated id. Elements that are not
/// objects, or that lack one of the required fields, are skipped silently;
/// insert failures are logged and skipped.
///
/// Responses:
/// - 400 with a `detail` message when the body is missing, is not valid JSON,
///   or is not an array.
/// - 201 with body `[]` otherwise, even when some or all entries were skipped.
fn createLogs(r: zap.Request) !void {
    // Level used when `level` is absent or unparseable (20 = INFO).
    const default_level: i64 = 20;

    const body = r.body orelse {
        json_util.sendStatus(r, "{\"detail\":\"missing body\"}", .bad_request);
        return;
    };

    // Everything parsed from the request lives in this arena and is released
    // in one go when the handler returns.
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const parsed = std.json.parseFromSlice(std.json.Value, alloc, body, .{}) catch {
        json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
        return;
    };

    if (parsed.value != .array) {
        json_util.sendStatus(r, "{\"detail\":\"expected array of logs\"}", .bad_request);
        return;
    }

    for (parsed.value.array.items) |log_val| {
        const log_obj = switch (log_val) {
            .object => |o| o,
            else => continue,
        };

        // Required fields: an entry missing any of them is skipped.
        const name = logStringField(log_obj, "name") orelse continue;
        const message = logStringField(log_obj, "message") orelse continue;
        const timestamp = logStringField(log_obj, "timestamp") orelse continue;

        // `level` may arrive as an integer or as a number string; anything
        // else (including a parse failure) falls back to the default.
        const level: i64 = if (log_obj.get("level")) |v| switch (v) {
            .integer => |n| n,
            .number_string => |s| std.fmt.parseInt(i64, s, 10) catch default_level,
            else => default_level,
        } else default_level;

        // Optional run associations; null when absent or not a string.
        const flow_run_id = logStringField(log_obj, "flow_run_id");
        const task_run_id = logStringField(log_obj, "task_run_id");

        var id_buf: [36]u8 = undefined;
        const id = uuid_util.generate(&id_buf);

        db.logs.insert(id, name, level, message, timestamp, flow_run_id, task_run_id) catch |err| {
            logging.err("logs", "insert error: {}", .{err});
            continue;
        };
    }

    r.setStatus(.created);
    sendJson(r, "[]");
}
88
/// Return the integer stored under `key`, or null when the key is absent or
/// the value is not a JSON integer.
fn filterIntField(obj: std.json.ObjectMap, key: []const u8) ?i64 {
    const value = obj.get(key) orelse return null;
    return switch (value) {
        .integer => |n| n,
        else => null,
    };
}

/// Return the string stored under `key`, or null when the key is absent or
/// the value is not a JSON string.
fn filterStringField(obj: std.json.ObjectMap, key: []const u8) ?[]const u8 {
    const value = obj.get(key) orelse return null;
    return switch (value) {
        .string => |s| s,
        else => null,
    };
}

/// Return the nested object stored under `key`, or null when the key is
/// absent or the value is not a JSON object.
fn filterObjectField(obj: std.json.ObjectMap, key: []const u8) ?std.json.ObjectMap {
    const value = obj.get(key) orelse return null;
    return switch (value) {
        .object => |o| o,
        else => null,
    };
}

/// Return the first element of `obj[key].any_` when it is a string.
/// Only the first element of the `any_` array is consulted; further elements
/// are ignored.
fn filterFirstAnyString(obj: std.json.ObjectMap, key: []const u8) ?[]const u8 {
    const criterion = filterObjectField(obj, key) orelse return null;
    const any_val = criterion.get("any_") orelse return null;
    const candidates = switch (any_val) {
        .array => |a| a.items,
        else => return null,
    };
    if (candidates.len == 0) return null;
    return switch (candidates[0]) {
        .string => |s| s,
        else => null,
    };
}

/// Handle a log-filter POST and answer with a JSON array of matching logs.
///
/// The request body must be a JSON object. Recognised keys:
/// - `limit`: integer, clamped to 1..1000; defaults to 200.
/// - `offset`: integer, negative values become 0; defaults to 0.
/// - `sort`: ascending unless it is a string other than "TIMESTAMP_ASC".
/// - `logs`: object with optional `flow_run_id.any_`, `task_run_id.any_`
///   (first string element only), `level.ge_` / `level.le_` (integers) and
///   `timestamp.after_` / `timestamp.before_` (strings).
/// Keys with an unexpected JSON type are ignored rather than rejected.
///
/// Responses:
/// - 400 with a `detail` message when the body is missing, is not valid JSON,
///   or is not an object.
/// - 500 with a `detail` message when `db.logs.filter` fails (also logged).
/// - otherwise a JSON array with one object per row.
///
/// Errors raised while building the response JSON are returned to the caller.
fn filterLogs(r: zap.Request) !void {
    const body = r.body orelse {
        json_util.sendStatus(r, "{\"detail\":\"missing body\"}", .bad_request);
        return;
    };

    // The parsed request, the query result and the response buffer all live
    // in this arena, which is released when the handler returns.
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const parsed = std.json.parseFromSlice(std.json.Value, alloc, body, .{}) catch {
        json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
        return;
    };

    const obj = switch (parsed.value) {
        .object => |o| o,
        else => {
            json_util.sendStatus(r, "{\"detail\":\"expected object\"}", .bad_request);
            return;
        },
    };

    // Pagination. The clamp guarantees the value fits in usize.
    const limit: usize = if (filterIntField(obj, "limit")) |n|
        @intCast(std.math.clamp(n, 1, 1000))
    else
        200;

    // The offset comes straight from the client, so use a checked cast: on a
    // target where usize cannot hold the value, saturate instead of tripping
    // the @intCast safety check.
    const offset: usize = if (filterIntField(obj, "offset")) |n|
        std.math.cast(usize, @max(0, n)) orelse std.math.maxInt(usize)
    else
        0;

    // Ascending is the default; only a string other than "TIMESTAMP_ASC"
    // switches to descending.
    const sort_asc = if (filterStringField(obj, "sort")) |s|
        std.mem.eql(u8, s, "TIMESTAMP_ASC")
    else
        true;

    var filter_opts = db.logs.FilterOptions{
        .limit = limit,
        .offset = offset,
        .sort_asc = sort_asc,
    };

    // Optional criteria from the "logs" object.
    if (filterObjectField(obj, "logs")) |criteria| {
        if (filterFirstAnyString(criteria, "flow_run_id")) |id| filter_opts.flow_run_id = id;
        if (filterFirstAnyString(criteria, "task_run_id")) |id| filter_opts.task_run_id = id;

        if (filterObjectField(criteria, "level")) |level| {
            if (filterIntField(level, "ge_")) |ge| filter_opts.level_ge = ge;
            if (filterIntField(level, "le_")) |le| filter_opts.level_le = le;
        }

        if (filterObjectField(criteria, "timestamp")) |ts| {
            if (filterStringField(ts, "after_")) |after| filter_opts.timestamp_after = after;
            if (filterStringField(ts, "before_")) |before| filter_opts.timestamp_before = before;
        }
    }

    const rows = db.logs.filter(alloc, filter_opts) catch |err| {
        logging.err("logs", "filter error: {}", .{err});
        json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error);
        return;
    };

    // Serialize the rows. Each name below is both the JSON key and the row
    // field it is read from; the order is the order of keys in the output.
    const response_fields = [_][]const u8{
        "id",
        "created",
        "updated",
        "name",
        "level",
        "message",
        "timestamp",
        "flow_run_id",
        "task_run_id",
    };

    var output: std.io.Writer.Allocating = .init(alloc);
    errdefer output.deinit();
    var jw: std.json.Stringify = .{ .writer = &output.writer };

    try jw.beginArray();
    for (rows) |log_row| {
        try jw.beginObject();
        inline for (response_fields) |field| {
            try jw.objectField(field);
            try jw.write(@field(log_row, field));
        }
        try jw.endObject();
    }
    try jw.endArray();

    sendJson(r, try output.toOwnedSlice());
}