const std = @import("std");
const zap = @import("zap");
const mem = std.mem;
const json = std.json;
const Allocator = std.mem.Allocator;
const db = @import("../db/sqlite.zig");
const routing = @import("routing.zig");
const uuid_util = @import("../utilities/uuid.zig");
const time_util = @import("../utilities/time.zig");
const json_util = @import("../utilities/json.zig");
const orchestration = @import("../orchestration.zig");

/// Build updated empirical_policy JSON based on rule context flags.
/// Preserves existing fields from `current_policy` while applying the
/// mutations requested by the orchestration rules (retry_type, pause_keys,
/// resuming). Returns null when no update is needed or when serialization
/// fails; the returned slice is allocated from `alloc` (callers pass an
/// arena, so no explicit free is required).
fn buildUpdatedPolicy(
    alloc: Allocator,
    current_policy: []const u8,
    rule_ctx: *const orchestration.RuleContext,
) ?[]const u8 {
    // Fast path: no mutation flag set means "policy unchanged" — callers
    // rely on the null return to pick the cheaper db update.
    if (!rule_ctx.set_retry_type_in_process and
        !rule_ctx.clear_retry_type and
        !rule_ctx.clear_pause_keys and
        !rule_ctx.set_resuming_false)
    {
        return null;
    }

    // Parse current policy; if it is unparseable or not a JSON object,
    // rebuild a minimal policy from the rule context instead.
    const parsed = json.parseFromSlice(json.Value, alloc, current_policy, .{}) catch {
        return buildFreshPolicy(alloc, rule_ctx);
    };
    const current = parsed.value;
    if (current != .object) {
        return buildFreshPolicy(alloc, rule_ctx);
    }

    // Build new policy with updates applied on top of the existing fields.
    var out: std.Io.Writer.Allocating = .init(alloc);
    var jw: json.Stringify = .{ .writer = &out.writer };
    jw.beginObject() catch return null;

    // Copy existing fields, skipping the keys we are about to rewrite so
    // they are not emitted twice.
    var it = current.object.iterator();
    while (it.next()) |entry| {
        const key = entry.key_ptr.*;
        if (mem.eql(u8, key, "retry_type") or
            mem.eql(u8, key, "pause_keys") or
            mem.eql(u8, key, "resuming"))
        {
            continue;
        }
        jw.objectField(key) catch return null;
        jw.write(entry.value_ptr.*) catch return null;
    }

    // Apply updates. set_retry_type_in_process wins over clear_retry_type.
    if (rule_ctx.set_retry_type_in_process) {
        jw.objectField("retry_type") catch return null;
        jw.write("in_process") catch return null;
    } else if (rule_ctx.clear_retry_type) {
        jw.objectField("retry_type") catch return null;
        jw.write(null) catch return null;
    }
    if (rule_ctx.clear_pause_keys) {
        // Cleared pause_keys is represented as an empty array, not null.
        jw.objectField("pause_keys") catch return null;
        jw.beginArray() catch return null;
        jw.endArray() catch return null;
    }
    if (rule_ctx.set_resuming_false) {
        jw.objectField("resuming") catch return null;
        jw.write(false) catch return null;
    }
    jw.endObject() catch return null;
    return out.toOwnedSlice() catch null;
}

/// Build a fresh policy when the current one is invalid/empty.
/// Carries over retries/retry_delay from the rule context (when known)
/// and applies the same mutation flags as buildUpdatedPolicy.
/// Returns null only on serialization failure.
fn buildFreshPolicy(alloc: Allocator, rule_ctx: *const orchestration.RuleContext) ?[]const u8 {
    var out: std.Io.Writer.Allocating = .init(alloc);
    var jw: json.Stringify = .{ .writer = &out.writer };
    jw.beginObject() catch return null;
    if (rule_ctx.retries) |r| {
        jw.objectField("retries") catch return null;
        jw.write(r) catch return null;
    }
    if (rule_ctx.retry_delay) |d| {
        jw.objectField("retry_delay") catch return null;
        jw.write(d) catch return null;
    }
    if (rule_ctx.set_retry_type_in_process) {
        jw.objectField("retry_type") catch return null;
        jw.write("in_process") catch return null;
    } else if (rule_ctx.clear_retry_type) {
        jw.objectField("retry_type") catch return null;
        jw.write(null) catch return null;
    }
    if (rule_ctx.clear_pause_keys) {
        jw.objectField("pause_keys") catch return null;
        jw.beginArray() catch return null;
        jw.endArray() catch return null;
    }
    if (rule_ctx.set_resuming_false) {
        jw.objectField("resuming") catch return null;
        jw.write(false) catch return null;
    }
    jw.endObject() catch return null;
    return out.toOwnedSlice() catch null;
}

/// HTTP entry point for the flow_runs resource. Routes:
///   POST   /flow_runs/                 - create flow run
///   POST   /flow_runs/filter           - list flow runs
///   POST   /flow_runs/{id}/set_state   - orchestrated state transition
///   GET    /flow_runs/{id}             - read single flow run
///   PATCH  /flow_runs/{id}             - partial update
///   DELETE /flow_runs/{id}             - delete flow run
/// Each route is also accepted with an /api prefix. Unmatched paths get 404.
pub fn handle(r: zap.Request) !void {
    const target = r.path orelse "/";
    const method = r.method orelse "GET";

    // POST /flow_runs/ - create
    if (mem.eql(u8, method, "POST") and
        (mem.eql(u8, target, "/flow_runs/") or mem.eql(u8, target, "/api/flow_runs/")))
    {
        try create(r);
        return;
    }

    // POST /flow_runs/filter - list
    // NOTE(review): this matches ANY POST path ending in "/filter"; assumes an
    // upstream router only dispatches flow_runs paths here — confirm.
    if (mem.eql(u8, method, "POST") and mem.endsWith(u8, target, "/filter")) {
        try filter(r);
        return;
    }

    // POST /flow_runs/{id}/set_state - orchestrated state transition
    if (mem.eql(u8, method, "POST") and mem.endsWith(u8, target, "/set_state")) {
        const id = routing.extractId(target, "/flow_runs/", "/set_state") orelse
            routing.extractId(target, "/api/flow_runs/", "/set_state");
        if (id) |flow_run_id| {
            try setState(r, flow_run_id);
            return;
        }
    }

    // GET /flow_runs/{id} - read single
    if (mem.eql(u8, method, "GET")) {
        const id = routing.extractIdAfter(target, "/flow_runs/") orelse
            routing.extractIdAfter(target, "/api/flow_runs/");
        if (id) |flow_run_id| {
            try read(r, flow_run_id);
            return;
        }
    }

    // PATCH /flow_runs/{id} - update
    if (mem.eql(u8, method, "PATCH")) {
        const id = routing.extractIdAfter(target, "/flow_runs/") orelse
            routing.extractIdAfter(target, "/api/flow_runs/");
        if (id) |flow_run_id| {
            try patch(r, flow_run_id);
            return;
        }
    }

    // DELETE /flow_runs/{id} - delete
    if (mem.eql(u8, method, "DELETE")) {
        const id = routing.extractIdAfter(target, "/flow_runs/") orelse
            routing.extractIdAfter(target, "/api/flow_runs/");
        if (id) |flow_run_id| {
            try delete(r, flow_run_id);
            return;
        }
    }

    json_util.sendStatus(r, "{\"detail\":\"not found\"}", .not_found);
}

/// POST /flow_runs/ - create a flow run from an untrusted JSON body.
/// Requires a string "flow_id"; "name", "state", "next_scheduled_start_time"
/// and "empirical_policy" are optional. Responds 201 with the created run.
fn create(r: zap.Request) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const body = r.body orelse {
        json_util.sendStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request);
        return;
    };
    const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch {
        json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
        return;
    };
    // BUGFIX: valid JSON that is not an object (e.g. "[]" or "42") used to
    // hit an unchecked `.object` access and crash the worker; reject it.
    if (parsed.value != .object) {
        json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
        return;
    }
    const obj = parsed.value.object;

    const raw_flow_id = if (obj.get("flow_id")) |v| switch (v) {
        .string => |s| s,
        else => {
            json_util.sendStatus(r, "{\"detail\":\"flow_id must be string\"}", .bad_request);
            return;
        },
    } else {
        json_util.sendStatus(r, "{\"detail\":\"flow_id required\"}", .bad_request);
        return;
    };
    const flow_id = raw_flow_id;

    // Missing, null, or non-string name => generate one.
    const name = if (obj.get("name")) |v| switch (v) {
        .string => |s| s,
        else => routing.generateRunName(alloc),
    } else routing.generateRunName(alloc);

    // Extract state info, defaulting to PENDING/Pending.
    // BUGFIX: non-object "state" or non-string "type"/"name" used to crash
    // on unchecked tag accesses; now they fall back to the defaults.
    const state = obj.get("state");
    var state_type: []const u8 = "PENDING";
    var state_name: []const u8 = "Pending";
    if (state) |s| {
        if (s == .object) {
            if (s.object.get("type")) |t| {
                if (t == .string) state_type = t.string;
            }
            if (s.object.get("name")) |n| {
                if (n == .string) state_name = n.string;
            }
        }
    }

    // Optional scheduling field.
    const next_scheduled_start_time: ?[]const u8 = if (obj.get("next_scheduled_start_time")) |v| switch (v) {
        .string => |s| s,
        else => null,
    } else null;

    // Optional empirical_policy (retry settings): re-serialize the JSON
    // value so it can be stored as a raw string.
    const empirical_policy: ?[]const u8 = if (obj.get("empirical_policy")) |v| blk: {
        var out: std.Io.Writer.Allocating = .init(alloc);
        var jw: json.Stringify = .{ .writer = &out.writer };
        jw.write(v) catch break :blk null;
        break :blk out.toOwnedSlice() catch null;
    } else null;

    var new_id_buf: [36]u8 = undefined;
    const new_id = uuid_util.generate(&new_id_buf);
    var ts_buf: [32]u8 = undefined;
    const now = time_util.timestamp(&ts_buf);

    // NOTE(review): request "parameters"/"tags" are not persisted — the db
    // insert API does not accept them; the response hard-codes {} / [].
    db.insertFlowRun(new_id, flow_id, name, state_type, state_name, now, .{
        .next_scheduled_start_time = next_scheduled_start_time,
        .empirical_policy = empirical_policy,
    }) catch {
        json_util.sendStatus(r, "{\"detail\":\"insert failed\"}", .internal_server_error);
        return;
    };

    var state_id_buf: [36]u8 = undefined;
    const state_id = uuid_util.generate(&state_id_buf);

    // Echo the created run without re-reading from the database.
    const run = db.FlowRunRow{
        .id = new_id,
        .created = now,
        .updated = now,
        .name = name,
        .flow_id = flow_id,
        .state_type = state_type,
        .state_name = state_name,
        .state_timestamp = now,
        .parameters = "{}",
        .tags = "[]",
        .run_count = 0,
        .expected_start_time = null,
        .next_scheduled_start_time = next_scheduled_start_time,
        .start_time = null,
        .end_time = null,
        .total_run_time = 0.0,
        .deployment_id = null,
        .deployment_version = null,
        .work_queue_name = null,
        .work_queue_id = null,
        .auto_scheduled = false,
        .idempotency_key = null,
        .empirical_policy = empirical_policy orelse "{}",
        .state_transition_id = null,
    };
    const resp = writeFlowRun(alloc, run, state_id) catch {
        json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    };
    json_util.sendStatus(r, resp, .created);
}

/// GET /flow_runs/{id} - read a single flow run; 404 when absent.
fn read(r: zap.Request, id: []const u8) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const run = db.getFlowRun(alloc, id) catch null orelse {
        json_util.sendStatus(r, "{\"detail\":\"flow run not found\"}", .not_found);
        return;
    };
    const resp = writeFlowRun(alloc, run, null) catch {
        json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    };
    json_util.send(r, resp);
}

/// PATCH /flow_runs/{id} - partial update; currently only
/// infrastructure_pid is supported. Returns the updated run.
fn patch(r: zap.Request, id: []const u8) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    // Verify run exists before accepting the patch.
    const run = db.getFlowRun(alloc, id) catch null orelse {
        json_util.sendStatus(r, "{\"detail\":\"flow run not found\"}", .not_found);
        return;
    };
    const body = r.body orelse {
        json_util.sendStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request);
        return;
    };
    const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch {
        json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
        return;
    };
    // BUGFIX: guard against valid-but-non-object JSON bodies (crash on
    // unchecked `.object` access before).
    if (parsed.value != .object) {
        json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
        return;
    }
    const obj = parsed.value.object;

    // infrastructure_pid may arrive as a string or an integer; normalize
    // integers to their decimal string form.
    const infrastructure_pid = if (obj.get("infrastructure_pid")) |v| switch (v) {
        .string => |s| s,
        .integer => |i| std.fmt.allocPrint(alloc, "{d}", .{i}) catch null,
        else => null,
    } else null;

    db.flow_runs.patch(id, infrastructure_pid) catch {
        json_util.sendStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error);
        return;
    };

    // Return the updated run; fall back to the pre-patch row if the
    // re-read fails (best effort).
    const updated_run = db.getFlowRun(alloc, id) catch null orelse run;
    const resp = writeFlowRun(alloc, updated_run, null) catch {
        json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    };
    json_util.send(r, resp);
}

/// DELETE /flow_runs/{id} - 204 on success, 404 when the row is absent
/// (or the delete fails, which is folded into "not found").
fn delete(r: zap.Request, id: []const u8) !void {
    const deleted = db.flow_runs.delete(id) catch false;
    if (!deleted) {
        json_util.sendStatus(r, "{\"detail\":\"flow run not found\"}", .not_found);
        return;
    }
    json_util.sendStatus(r, "", .no_content);
}

/// POST /flow_runs/{id}/set_state - orchestrated state transition.
/// Runs the core flow policy against the proposed state; on ACCEPT commits
/// the transition (with bookkeeping), on REJECT with a retry it commits an
/// AwaitingRetry state instead, otherwise returns the orchestration result
/// without committing.
fn setState(r: zap.Request, id: []const u8) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const body = r.body orelse {
        json_util.sendStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request);
        return;
    };
    const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch {
        json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
        return;
    };
    // BUGFIX: non-object JSON bodies used to crash on unchecked `.object`.
    if (parsed.value != .object) {
        json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
        return;
    }
    const state = parsed.value.object.get("state") orelse {
        json_util.sendStatus(r, "{\"detail\":\"state required\"}", .bad_request);
        return;
    };
    // BUGFIX: "state" that is not an object used to crash below.
    if (state != .object) {
        json_util.sendStatus(r, "{\"detail\":\"state required\"}", .bad_request);
        return;
    }
    // BUGFIX: non-string type/name now fall back to defaults instead of
    // crashing on unchecked `.string` accesses.
    const state_type = if (state.object.get("type")) |v| switch (v) {
        .string => |s| s,
        else => "PENDING",
    } else "PENDING";
    const state_name = if (state.object.get("name")) |v| switch (v) {
        .string => |s| s,
        else => "Pending",
    } else "Pending";

    var ts_buf: [32]u8 = undefined;
    const now = time_util.timestamp(&ts_buf);
    var state_id_buf: [36]u8 = undefined;
    const state_id = uuid_util.generate(&state_id_buf);

    // state_details.transition_id is used for idempotency
    // (PreventDuplicateTransitions rule).
    const proposed_transition_id: ?[]const u8 = blk: {
        const state_details = state.object.get("state_details") orelse break :blk null;
        if (state_details != .object) break :blk null;
        const tid = state_details.object.get("transition_id") orelse break :blk null;
        if (tid != .string) break :blk null;
        break :blk tid.string;
    };

    // Load current run state for orchestration; a missing run yields null
    // initial state (transition from "no state").
    const current_run = db.getFlowRun(alloc, id) catch null;
    const initial_state_type: ?orchestration.StateType = if (current_run) |run|
        if (run.state_type.len > 0) orchestration.StateType.fromString(run.state_type) else null
    else
        null;
    const proposed_state_type = orchestration.StateType.fromString(state_type);

    // Parse empirical_policy for retry settings.
    var retries: ?i64 = null;
    var retry_delay: ?i64 = null;
    if (current_run) |run| {
        if (json.parseFromSlice(json.Value, alloc, run.empirical_policy, .{})) |policy_parsed| {
            // BUGFIX: a stored policy that parses to a non-object (e.g.
            // "null") used to crash on unchecked `.object`; skip it instead.
            if (policy_parsed.value == .object) {
                if (policy_parsed.value.object.get("retries")) |v| {
                    if (v == .integer) retries = v.integer;
                }
                if (policy_parsed.value.object.get("retry_delay")) |v| {
                    if (v == .integer) retry_delay = v.integer;
                }
            }
        } else |_| {}
    }

    // Apply orchestration rules (policy).
    var rule_ctx = orchestration.RuleContext{
        .initial_state = initial_state_type,
        .proposed_state = proposed_state_type,
        .initial_state_timestamp = if (current_run) |run|
            if (run.state_timestamp.len > 0) run.state_timestamp else null
        else
            null,
        .proposed_state_timestamp = now,
        // for CopyScheduledTime: pass scheduled_time from SCHEDULED state
        .initial_scheduled_time = if (current_run) |run| run.next_scheduled_start_time else null,
        // for PreventDuplicateTransitions: pass transition_ids
        .initial_transition_id = if (current_run) |run| run.state_transition_id else null,
        .proposed_transition_id = proposed_transition_id,
        .run_id = id,
        .flow_id = if (current_run) |run| run.flow_id else null,
        .deployment_id = if (current_run) |run| run.deployment_id else null,
        // for RetryFailedFlows: pass retry settings
        .run_count = if (current_run) |run| run.run_count else 0,
        .retries = retries,
        .retry_delay = retry_delay,
    };
    orchestration.applyPolicy(&orchestration.CoreFlowPolicy, &rule_ctx);

    // Rules rejected/waited/aborted: handle without committing the
    // proposed state.
    if (!rule_ctx.isAccepted()) {
        // Special case: RetryFailedFlows rejection — commit an
        // AwaitingRetry state instead of the proposed one.
        if (rule_ctx.result.status == .REJECT and rule_ctx.retry_state_type != null) {
            const retry_state_type = @tagName(rule_ctx.retry_state_type.?);
            const retry_state_name = rule_ctx.retry_state_name orelse "AwaitingRetry";
            const retry_scheduled = rule_ctx.retry_scheduled_time;

            // Apply bookkeeping for the retry state.
            var bookkeeping_ctx = orchestration.TransitionContext{
                .current_state_type = initial_state_type,
                .current_state_timestamp = if (current_run) |run|
                    if (run.state_timestamp.len > 0) run.state_timestamp else null
                else
                    null,
                .start_time = if (current_run) |run| run.start_time else null,
                .end_time = if (current_run) |run| run.end_time else null,
                .run_count = if (current_run) |run| run.run_count else 0,
                .total_run_time = if (current_run) |run| run.total_run_time else 0.0,
                .proposed_state_type = rule_ctx.retry_state_type.?,
                .proposed_state_timestamp = now,
            };
            orchestration.applyBookkeeping(&bookkeeping_ctx);

            // Build updated empirical_policy if rules set mutation flags.
            const current_policy = if (current_run) |run| run.empirical_policy else "{}";
            const updated_policy = buildUpdatedPolicy(alloc, current_policy, &rule_ctx);

            // Commit the retry state with next_scheduled_start_time and
            // updated policy in one atomic db call.
            db.setFlowRunStateWithSchedule(
                id,
                state_id,
                retry_state_type,
                retry_state_name,
                now,
                bookkeeping_ctx.new_start_time,
                bookkeeping_ctx.new_end_time,
                bookkeeping_ctx.new_run_count,
                bookkeeping_ctx.new_total_run_time,
                null, // expected_start_time
                retry_scheduled, // next_scheduled_start_time
                updated_policy, // empirical_policy updates
                proposed_transition_id,
            ) catch {
                json_util.sendStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error);
                return;
            };

            // Return REJECT status with the retry state (matching Python's
            // behavior).
            const resp = writeStateResponse(alloc, .REJECT, rule_ctx.result.details, retry_state_type, retry_state_name, now, state_id) catch {
                json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
                return;
            };
            json_util.send(r, resp);
            return;
        }

        // Normal rejection/wait/abort — report without committing.
        // For REJECT, return the current state; for WAIT/ABORT, the proposed.
        const resp = writeStateResponse(
            alloc,
            rule_ctx.result.status,
            rule_ctx.result.details,
            if (rule_ctx.result.status == .REJECT and current_run != null) current_run.?.state_type else state_type,
            if (rule_ctx.result.status == .REJECT and current_run != null) current_run.?.state_name else state_name,
            if (rule_ctx.result.status == .REJECT and current_run != null) current_run.?.state_timestamp else now,
            state_id,
        ) catch {
            json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
            return;
        };
        json_util.send(r, resp);
        return;
    }

    // Accepted: apply orchestration bookkeeping transforms.
    var bookkeeping_ctx = orchestration.TransitionContext{
        .current_state_type = initial_state_type,
        .current_state_timestamp = if (current_run) |run|
            if (run.state_timestamp.len > 0) run.state_timestamp else null
        else
            null,
        .start_time = if (current_run) |run| run.start_time else null,
        .end_time = if (current_run) |run| run.end_time else null,
        .run_count = if (current_run) |run| run.run_count else 0,
        .total_run_time = if (current_run) |run| run.total_run_time else 0.0,
        .proposed_state_type = proposed_state_type,
        .proposed_state_timestamp = now,
    };
    orchestration.applyBookkeeping(&bookkeeping_ctx);

    // Check if policy needs updating (e.g. clear_retry_type when retries
    // are exhausted).
    const current_policy = if (current_run) |run| run.empirical_policy else "{}";
    const updated_policy = buildUpdatedPolicy(alloc, current_policy, &rule_ctx);

    // Atomic state transition with orchestration data; the richer db call
    // is only needed when the policy actually changed.
    if (updated_policy != null) {
        db.setFlowRunStateWithSchedule(
            id,
            state_id,
            state_type,
            state_name,
            now,
            bookkeeping_ctx.new_start_time,
            bookkeeping_ctx.new_end_time,
            bookkeeping_ctx.new_run_count,
            bookkeeping_ctx.new_total_run_time,
            rule_ctx.new_expected_start_time,
            null, // next_scheduled_start_time
            updated_policy,
            proposed_transition_id,
        ) catch {
            json_util.sendStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error);
            return;
        };
    } else {
        db.setFlowRunState(id, state_id, state_type, state_name, now, bookkeeping_ctx.new_start_time, bookkeeping_ctx.new_end_time, bookkeeping_ctx.new_run_count, bookkeeping_ctx.new_total_run_time, rule_ctx.new_expected_start_time, proposed_transition_id) catch {
            json_util.sendStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error);
            return;
        };
    }

    const resp = writeStateResponse(alloc, .ACCEPT, .{}, state_type, state_name, now, state_id) catch {
        json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    };
    json_util.send(r, resp);
}

/// POST /flow_runs/filter - list flow runs as a JSON array.
/// NOTE(review): the request body (filter criteria) is ignored; this
/// returns at most 50 rows unconditionally.
fn filter(r: zap.Request) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const runs = db.listFlowRuns(alloc, 50) catch {
        json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error);
        return;
    };

    var output: std.Io.Writer.Allocating = .init(alloc);
    var jw: json.Stringify = .{ .writer = &output.writer };
    jw.beginArray() catch {
        json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    };
    for (runs) |run| {
        // Best effort: skip rows that fail to serialize rather than
        // failing the whole listing.
        writeFlowRunObject(&jw, run, null) catch continue;
    }
    jw.endArray() catch {};
    json_util.send(r, output.toOwnedSlice() catch "[]");
}

/// Serialize one flow run to an owned JSON string (allocated from `alloc`).
/// `state_id` is included in the embedded state object when present.
fn writeFlowRun(alloc: std.mem.Allocator, run: db.FlowRunRow, state_id: ?[]const u8) ![]const u8 {
    var output: std.Io.Writer.Allocating = .init(alloc);
    var jw: json.Stringify = .{ .writer = &output.writer };
    try writeFlowRunObject(&jw, run, state_id);
    return output.toOwnedSlice();
}

/// Stream one flow run object into an in-progress JSON stringifier.
/// parameters/tags/empirical_policy are stored as raw JSON strings and are
/// spliced in verbatim via beginWriteRaw/endWriteRaw.
/// NOTE(review): next_scheduled_start_time and idempotency_key exist on
/// FlowRunRow but are not emitted here — confirm whether clients need them.
fn writeFlowRunObject(jw: *json.Stringify, run: db.FlowRunRow, state_id: ?[]const u8) !void {
    try jw.beginObject();
    try jw.objectField("id");
    try jw.write(run.id);
    try jw.objectField("created");
    try jw.write(run.created);
    try jw.objectField("updated");
    try jw.write(run.updated);
    try jw.objectField("name");
    try jw.write(run.name);
    try jw.objectField("flow_id");
    try jw.write(run.flow_id);
    try jw.objectField("state_type");
    try jw.write(run.state_type);
    try jw.objectField("state_name");
    try jw.write(run.state_name);

    // Embedded current-state object.
    try jw.objectField("state");
    try jw.beginObject();
    try jw.objectField("type");
    try jw.write(run.state_type);
    try jw.objectField("name");
    try jw.write(run.state_name);
    try jw.objectField("timestamp");
    try jw.write(run.state_timestamp);
    if (state_id) |sid| {
        try jw.objectField("id");
        try jw.write(sid);
    }
    try jw.endObject();

    try jw.objectField("parameters");
    try jw.beginWriteRaw();
    try jw.writer.writeAll(run.parameters);
    jw.endWriteRaw();
    try jw.objectField("tags");
    try jw.beginWriteRaw();
    try jw.writer.writeAll(run.tags);
    jw.endWriteRaw();
    try jw.objectField("run_count");
    try jw.write(run.run_count);
    try jw.objectField("expected_start_time");
    try jw.write(run.expected_start_time);
    try jw.objectField("start_time");
    try jw.write(run.start_time);
    try jw.objectField("end_time");
    try jw.write(run.end_time);
    try jw.objectField("total_run_time");
    try jw.write(run.total_run_time);
    try jw.objectField("deployment_id");
    try jw.write(run.deployment_id);
    try jw.objectField("deployment_version");
    try jw.write(run.deployment_version);
    try jw.objectField("work_queue_name");
    try jw.write(run.work_queue_name);
    try jw.objectField("work_queue_id");
    try jw.write(run.work_queue_id);
    try jw.objectField("auto_scheduled");
    try jw.write(run.auto_scheduled);
    try jw.objectField("empirical_policy");
    try jw.beginWriteRaw();
    try jw.writer.writeAll(run.empirical_policy);
    jw.endWriteRaw();
    try jw.endObject();
}

/// Serialize an orchestration result (status + details + resulting state)
/// to an owned JSON string. Optional detail fields (reason,
/// retry_after_seconds) are emitted only when present.
fn writeStateResponse(
    alloc: std.mem.Allocator,
    status: orchestration.ResponseStatus,
    details: orchestration.ResponseDetails,
    state_type: []const u8,
    state_name: []const u8,
    timestamp: []const u8,
    state_id: []const u8,
) ![]const u8 {
    var output: std.Io.Writer.Allocating = .init(alloc);
    var jw: json.Stringify = .{ .writer = &output.writer };
    try jw.beginObject();
    try jw.objectField("status");
    try jw.write(status.toString());
    try jw.objectField("details");
    try jw.beginObject();
    if (details.reason) |reason| {
        try jw.objectField("reason");
        try jw.write(reason);
    }
    if (details.retry_after) |retry_after| {
        try jw.objectField("retry_after_seconds");
        try jw.write(retry_after);
    }
    try jw.endObject();
    try jw.objectField("state");
    try jw.beginObject();
    try jw.objectField("type");
    try jw.write(state_type);
    try jw.objectField("name");
    try jw.write(state_name);
    try jw.objectField("timestamp");
    try jw.write(timestamp);
    try jw.objectField("id");
    try jw.write(state_id);
    try jw.endObject();
    try jw.endObject();
    return output.toOwnedSlice();
}