const std = @import("std"); const zap = @import("zap"); const mem = std.mem; const json = std.json; const db = @import("../db/sqlite.zig"); const routing = @import("routing.zig"); const uuid_util = @import("../utilities/uuid.zig"); const time_util = @import("../utilities/time.zig"); const json_util = @import("../utilities/json.zig"); // POST /task_runs/ - create task run // POST /task_runs/filter - list task runs // GET /task_runs/{id} - read task run // POST /task_runs/{id}/set_state - set state pub fn handle(r: zap.Request) !void { const target = r.path orelse "/"; const method = r.method orelse "GET"; // POST /task_runs/filter - list if (mem.eql(u8, method, "POST") and mem.endsWith(u8, target, "/filter")) { try filter(r); return; } // POST /task_runs/ - create if (mem.eql(u8, method, "POST") and (mem.eql(u8, target, "/task_runs/") or mem.eql(u8, target, "/api/task_runs/"))) { try create(r); return; } // check for /{id}/set_state if (mem.eql(u8, method, "POST") and mem.endsWith(u8, target, "/set_state")) { const id = routing.extractId(target, "/task_runs/", "/set_state") orelse routing.extractId(target, "/api/task_runs/", "/set_state"); if (id) |task_run_id| { try setState(r, task_run_id); return; } } // GET /task_runs/{id} - read single if (mem.eql(u8, method, "GET")) { const id = routing.extractIdAfter(target, "/task_runs/") orelse routing.extractIdAfter(target, "/api/task_runs/"); if (id) |task_run_id| { try read(r, task_run_id); return; } } json_util.sendStatus(r, "{\"detail\":\"not found\"}", .not_found); } fn create(r: zap.Request) !void { var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); defer arena.deinit(); const alloc = arena.allocator(); const body = r.body orelse { json_util.sendStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request); return; }; const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch { json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request); return; }; const obj = parsed.value.object; // required fields const task_key = if (obj.get("task_key")) |v| switch (v) { .string => |s| s, else => { json_util.sendStatus(r, "{\"detail\":\"task_key must be string\"}", .bad_request); return; }, } else { json_util.sendStatus(r, "{\"detail\":\"task_key required\"}", .bad_request); return; }; const dynamic_key = if (obj.get("dynamic_key")) |v| switch (v) { .string => |s| s, else => { json_util.sendStatus(r, "{\"detail\":\"dynamic_key must be string\"}", .bad_request); return; }, } else { json_util.sendStatus(r, "{\"detail\":\"dynamic_key required\"}", .bad_request); return; }; // optional fields const flow_run_id: ?[]const u8 = if (obj.get("flow_run_id")) |v| switch (v) { .string => |s| s, .null => null, else => null, } else null; const name = if (obj.get("name")) |v| switch (v) { .string => |s| s, .null => routing.generateRunName(alloc), else => routing.generateRunName(alloc), } else routing.generateRunName(alloc); const state = obj.get("state"); // check for existing task run (idempotency) if (db.getTaskRunByKey(alloc, flow_run_id, task_key, dynamic_key) catch null) |existing| { const resp = writeTaskRun(alloc, existing, null) catch { json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); return; }; json_util.send(r, resp); return; } // extract state info var state_type: []const u8 = "PENDING"; var state_name: []const u8 = "Pending"; if (state) |s| { if (s == .object) { if (s.object.get("type")) |t| { if (t == .string) state_type = t.string; } if (s.object.get("name")) |n| { if (n == .string) state_name = n.string; } } } var new_id_buf: [36]u8 = undefined; const new_id = uuid_util.generate(&new_id_buf); var ts_buf: [32]u8 = undefined; const now = time_util.timestamp(&ts_buf); db.insertTaskRun(new_id, flow_run_id, name, task_key, dynamic_key, state_type, state_name, now) catch { json_util.sendStatus(r, "{\"detail\":\"insert failed\"}", .internal_server_error); return; }; var state_id_buf: [36]u8 = undefined; const state_id = uuid_util.generate(&state_id_buf); const run = db.TaskRunRow{ .id = new_id, .created = now, .updated = now, .name = name, .flow_run_id = flow_run_id orelse "", .task_key = task_key, .dynamic_key = dynamic_key, .state_type = state_type, .state_name = state_name, .state_timestamp = now, .tags = "[]", .run_count = 0, }; const resp = writeTaskRun(alloc, run, state_id) catch { json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); return; }; json_util.sendStatus(r, resp, .created); } fn read(r: zap.Request, id: []const u8) !void { var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); defer arena.deinit(); const alloc = arena.allocator(); const run = db.getTaskRun(alloc, id) catch null orelse { json_util.sendStatus(r, "{\"detail\":\"task run not found\"}", .not_found); return; }; const resp = writeTaskRun(alloc, run, null) catch { json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); return; }; json_util.send(r, resp); } fn setState(r: zap.Request, id: []const u8) !void { var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); defer arena.deinit(); const alloc = arena.allocator(); const body = r.body orelse { json_util.sendStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request); return; }; const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch { json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request); return; }; const state = parsed.value.object.get("state") orelse { json_util.sendStatus(r, "{\"detail\":\"state required\"}", .bad_request); return; }; const state_type = if (state.object.get("type")) |v| v.string else "PENDING"; const state_name = if (state.object.get("name")) |v| v.string else "Pending"; var ts_buf: [32]u8 = undefined; const now = time_util.timestamp(&ts_buf); var state_id_buf: [36]u8 = undefined; const state_id = uuid_util.generate(&state_id_buf); db.setTaskRunState(id, state_id, state_type, state_name, now) catch { json_util.sendStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error); return; }; const resp = writeStateResponse(alloc, state_type, state_name, now, state_id) catch { json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); return; }; json_util.send(r, resp); } fn filter(r: zap.Request) !void { var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); defer arena.deinit(); const alloc = arena.allocator(); const runs = db.listTaskRuns(alloc, 50) catch { json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error); return; }; var output: std.Io.Writer.Allocating = .init(alloc); var jw: json.Stringify = .{ .writer = &output.writer }; jw.beginArray() catch { json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); return; }; for (runs) |run| { writeTaskRunObject(&jw, run, null) catch continue; } jw.endArray() catch {}; json_util.send(r, output.toOwnedSlice() catch "[]"); } fn writeTaskRun(alloc: std.mem.Allocator, run: db.TaskRunRow, state_id: ?[]const u8) ![]const u8 { var output: std.Io.Writer.Allocating = .init(alloc); var jw: json.Stringify = .{ .writer = &output.writer }; try writeTaskRunObject(&jw, run, state_id); return output.toOwnedSlice(); } fn writeTaskRunObject(jw: *json.Stringify, run: db.TaskRunRow, state_id: ?[]const u8) !void { try jw.beginObject(); try jw.objectField("id"); try jw.write(run.id); try jw.objectField("created"); try jw.write(run.created); try jw.objectField("updated"); try jw.write(run.updated); try jw.objectField("name"); try jw.write(run.name); try jw.objectField("flow_run_id"); if (run.flow_run_id.len > 0) { try jw.write(run.flow_run_id); } else { try jw.write(null); } try jw.objectField("task_key"); try jw.write(run.task_key); try jw.objectField("dynamic_key"); try jw.write(run.dynamic_key); try jw.objectField("state_type"); try jw.write(run.state_type); try jw.objectField("state_name"); try jw.write(run.state_name); try jw.objectField("state"); try jw.beginObject(); try jw.objectField("type"); try jw.write(run.state_type); try jw.objectField("name"); try jw.write(run.state_name); try jw.objectField("timestamp"); try jw.write(run.state_timestamp); if (state_id) |sid| { try jw.objectField("id"); try jw.write(sid); } try jw.endObject(); try jw.objectField("tags"); try jw.beginWriteRaw(); try jw.writer.writeAll(run.tags); jw.endWriteRaw(); try jw.objectField("run_count"); try jw.write(run.run_count); try jw.objectField("expected_start_time"); try jw.write(null); try jw.objectField("start_time"); try jw.write(null); try jw.objectField("end_time"); try jw.write(null); try jw.objectField("total_run_time"); try jw.write(@as(i32, 0)); try jw.endObject(); } fn writeStateResponse(alloc: std.mem.Allocator, state_type: []const u8, state_name: []const u8, timestamp: []const u8, state_id: []const u8) ![]const u8 { var output: std.Io.Writer.Allocating = .init(alloc); var jw: json.Stringify = .{ .writer = &output.writer }; try jw.beginObject(); try jw.objectField("status"); try jw.write("ACCEPT"); try jw.objectField("details"); try jw.beginObject(); try jw.endObject(); try jw.objectField("state"); try jw.beginObject(); try jw.objectField("type"); try jw.write(state_type); try jw.objectField("name"); try jw.write(state_name); try jw.objectField("timestamp"); try jw.write(timestamp); try jw.objectField("id"); try jw.write(state_id); try jw.endObject(); try jw.endObject(); return output.toOwnedSlice(); }