const std = @import("std");
const zap = @import("zap");
const db = @import("../db/sqlite.zig");
const json_util = @import("../utilities/json.zig");
const uuid_util = @import("../utilities/uuid.zig");
const logging = @import("../logging.zig");

/// Send `body` as a JSON response with a permissive CORS header.
/// Failures are deliberately ignored: once the response is being
/// written there is no useful recovery path.
fn sendJson(r: zap.Request, body: []const u8) void {
    r.setHeader("content-type", "application/json") catch {};
    r.setHeader("access-control-allow-origin", "*") catch {};
    r.sendBody(body) catch {};
}

/// Return the string value of `key` in `obj`, or null when the key is
/// absent or its value is not a JSON string.
fn getString(obj: std.json.ObjectMap, key: []const u8) ?[]const u8 {
    const v = obj.get(key) orelse return null;
    return if (v == .string) v.string else null;
}

/// Prefect-style id filters look like `{"any_": ["id", ...]}`.
/// Return the first string entry of the `any_` list, or null.
/// NOTE(review): only the first id is honored — additional entries in
/// `any_` are silently ignored; confirm that is acceptable for callers.
fn firstAnyString(filter: std.json.Value) ?[]const u8 {
    if (filter != .object) return null;
    const any_arr = filter.object.get("any_") orelse return null;
    if (any_arr != .array or any_arr.array.items.len == 0) return null;
    const first = any_arr.array.items[0];
    return if (first == .string) first.string else null;
}

/// Route requests under the logs endpoint:
///   POST .../filter -> filterLogs (query with filters/pagination)
///   POST ...        -> createLogs (batch insert)
///   anything else   -> empty JSON array, 200.
pub fn handle(r: zap.Request) !void {
    const target = r.path orelse "/";
    const method = r.method orelse "GET";

    if (std.mem.eql(u8, method, "POST")) {
        if (std.mem.endsWith(u8, target, "/filter")) {
            try filterLogs(r);
        } else {
            try createLogs(r);
        }
    } else {
        sendJson(r, "[]");
    }
}

/// POST /logs/ — insert a batch of log records.
/// Body: JSON array of log objects. Entries that are not objects, or
/// that lack the required `name`/`message`/`timestamp` strings, are
/// skipped; individual insert failures are logged and skipped too, so
/// one bad record never fails the whole batch. Responds 201 with "[]".
fn createLogs(r: zap.Request) !void {
    const body = r.body orelse {
        json_util.sendStatus(r, "{\"detail\":\"missing body\"}", .bad_request);
        return;
    };

    // Arena scoped to this request: everything parsed/allocated below is
    // released in one shot on return.
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const parsed = std.json.parseFromSlice(std.json.Value, alloc, body, .{}) catch {
        json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
        return;
    };
    if (parsed.value != .array) {
        json_util.sendStatus(r, "{\"detail\":\"expected array of logs\"}", .bad_request);
        return;
    }

    for (parsed.value.array.items) |log_val| {
        if (log_val != .object) continue;
        const log_obj = log_val.object;

        // Required fields — skip the record if any is missing or non-string.
        const name = getString(log_obj, "name");
        const message = getString(log_obj, "message");
        const timestamp = getString(log_obj, "timestamp");
        if (name == null or message == null or timestamp == null) continue;

        // Level defaults to 20 (INFO); tolerate JSON numbers that arrive
        // as number strings, falling back to the default on parse failure.
        const level: i64 = if (log_obj.get("level")) |v| switch (v) {
            .integer => |n| n,
            .number_string => |s| std.fmt.parseInt(i64, s, 10) catch 20,
            else => 20,
        } else 20;

        // Optional run associations.
        const flow_run_id = getString(log_obj, "flow_run_id");
        const task_run_id = getString(log_obj, "task_run_id");

        // Server-generated id; buffer lives only for the insert call.
        var id_buf: [36]u8 = undefined;
        const id = uuid_util.generate(&id_buf);

        db.logs.insert(id, name.?, level, message.?, timestamp.?, flow_run_id, task_run_id) catch |err| {
            logging.err("logs", "insert error: {}", .{err});
            continue;
        };
    }

    r.setStatus(.created);
    sendJson(r, "[]");
}

/// POST /logs/filter — query logs with optional filters, sort, and
/// pagination, returning a JSON array of log objects.
/// Recognized body fields: `limit` (clamped to 1..1000, default 200),
/// `offset` (>= 0, default 0), `sort` ("TIMESTAMP_ASC" => ascending,
/// any other string => descending, default ascending), and a `logs`
/// object holding Prefect-style sub-filters on flow_run_id,
/// task_run_id, level (ge_/le_) and timestamp (after_/before_).
fn filterLogs(r: zap.Request) !void {
    const body = r.body orelse {
        json_util.sendStatus(r, "{\"detail\":\"missing body\"}", .bad_request);
        return;
    };

    // Request-scoped arena: parsed JSON, DB rows and the serialized
    // response all share it and are freed together on return.
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const parsed = std.json.parseFromSlice(std.json.Value, alloc, body, .{}) catch {
        json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
        return;
    };
    if (parsed.value != .object) {
        json_util.sendStatus(r, "{\"detail\":\"expected object\"}", .bad_request);
        return;
    }
    const obj = parsed.value.object;

    // Pagination: clamp limit into [1, 1000] and offset to >= 0 so the
    // @intCast below can never fail on a negative or oversized value.
    const limit: usize = if (obj.get("limit")) |v| switch (v) {
        .integer => |n| @intCast(@max(1, @min(1000, n))),
        else => 200,
    } else 200;
    const offset: usize = if (obj.get("offset")) |v| switch (v) {
        .integer => |n| @intCast(@max(0, n)),
        else => 0,
    } else 0;

    // Sort order: ascending unless an explicit non-"TIMESTAMP_ASC"
    // string is given (non-string values also mean ascending).
    const sort_asc = if (obj.get("sort")) |v|
        (if (v == .string) std.mem.eql(u8, v.string, "TIMESTAMP_ASC") else true)
    else
        true;

    var filter_opts = db.logs.FilterOptions{
        .limit = limit,
        .offset = offset,
        .sort_asc = sort_asc,
    };

    // Optional sub-filters under "logs"; malformed shapes are ignored
    // rather than rejected, matching the permissive style above.
    if (obj.get("logs")) |logs_obj| {
        if (logs_obj == .object) {
            const lf = logs_obj.object;

            if (lf.get("flow_run_id")) |frf| {
                if (firstAnyString(frf)) |id| filter_opts.flow_run_id = id;
            }
            if (lf.get("task_run_id")) |trf| {
                if (firstAnyString(trf)) |id| filter_opts.task_run_id = id;
            }
            if (lf.get("level")) |level_obj| {
                if (level_obj == .object) {
                    if (level_obj.object.get("ge_")) |ge| {
                        if (ge == .integer) filter_opts.level_ge = ge.integer;
                    }
                    if (level_obj.object.get("le_")) |le| {
                        if (le == .integer) filter_opts.level_le = le.integer;
                    }
                }
            }
            if (lf.get("timestamp")) |ts_obj| {
                if (ts_obj == .object) {
                    if (ts_obj.object.get("after_")) |after| {
                        if (after == .string) filter_opts.timestamp_after = after.string;
                    }
                    if (ts_obj.object.get("before_")) |before| {
                        if (before == .string) filter_opts.timestamp_before = before.string;
                    }
                }
            }
        }
    }

    const rows = db.logs.filter(alloc, filter_opts) catch |err| {
        logging.err("logs", "filter error: {}", .{err});
        json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error);
        return;
    };

    // Serialize rows into an arena-backed buffer via std.json.Stringify.
    var output: std.io.Writer.Allocating = .init(alloc);
    errdefer output.deinit();
    var jw: std.json.Stringify = .{ .writer = &output.writer };

    try jw.beginArray();
    for (rows) |log_row| {
        try jw.beginObject();
        try jw.objectField("id");
        try jw.write(log_row.id);
        try jw.objectField("created");
        try jw.write(log_row.created);
        try jw.objectField("updated");
        try jw.write(log_row.updated);
        try jw.objectField("name");
        try jw.write(log_row.name);
        try jw.objectField("level");
        try jw.write(log_row.level);
        try jw.objectField("message");
        try jw.write(log_row.message);
        try jw.objectField("timestamp");
        try jw.write(log_row.timestamp);
        try jw.objectField("flow_run_id");
        try jw.write(log_row.flow_run_id);
        try jw.objectField("task_run_id");
        try jw.write(log_row.task_run_id);
        try jw.endObject();
    }
    try jw.endArray();

    sendJson(r, try output.toOwnedSlice());
}