prefect server in zig
1const std = @import("std");
2const zap = @import("zap");
3const mem = std.mem;
4const json = std.json;
5const Allocator = std.mem.Allocator;
6
7const db = @import("../db/sqlite.zig");
8const routing = @import("routing.zig");
9const uuid_util = @import("../utilities/uuid.zig");
10const time_util = @import("../utilities/time.zig");
11const json_util = @import("../utilities/json.zig");
12const orchestration = @import("../orchestration.zig");
13
14/// Build updated empirical_policy JSON based on rule context flags
15/// Preserves existing fields from current_policy while applying updates
16fn buildUpdatedPolicy(
17 alloc: Allocator,
18 current_policy: []const u8,
19 rule_ctx: *const orchestration.RuleContext,
20) ?[]const u8 {
21 // check if any policy updates are needed
22 if (!rule_ctx.set_retry_type_in_process and
23 !rule_ctx.clear_retry_type and
24 !rule_ctx.clear_pause_keys and
25 !rule_ctx.set_resuming_false)
26 {
27 return null;
28 }
29
30 // parse current policy
31 const parsed = json.parseFromSlice(json.Value, alloc, current_policy, .{}) catch {
32 // if parse fails, start with empty object
33 return buildFreshPolicy(alloc, rule_ctx);
34 };
35 const current = parsed.value;
36
37 if (current != .object) {
38 return buildFreshPolicy(alloc, rule_ctx);
39 }
40
41 // build new policy with updates
42 var out: std.Io.Writer.Allocating = .init(alloc);
43 var jw: json.Stringify = .{ .writer = &out.writer };
44
45 jw.beginObject() catch return null;
46
47 // copy existing fields, applying updates
48 var it = current.object.iterator();
49 while (it.next()) |entry| {
50 const key = entry.key_ptr.*;
51
52 // skip fields we're updating
53 if (mem.eql(u8, key, "retry_type") or
54 mem.eql(u8, key, "pause_keys") or
55 mem.eql(u8, key, "resuming"))
56 {
57 continue;
58 }
59
60 jw.objectField(key) catch return null;
61 jw.write(entry.value_ptr.*) catch return null;
62 }
63
64 // apply updates
65 if (rule_ctx.set_retry_type_in_process) {
66 jw.objectField("retry_type") catch return null;
67 jw.write("in_process") catch return null;
68 } else if (rule_ctx.clear_retry_type) {
69 jw.objectField("retry_type") catch return null;
70 jw.write(null) catch return null;
71 }
72
73 if (rule_ctx.clear_pause_keys) {
74 jw.objectField("pause_keys") catch return null;
75 jw.beginArray() catch return null;
76 jw.endArray() catch return null;
77 }
78
79 if (rule_ctx.set_resuming_false) {
80 jw.objectField("resuming") catch return null;
81 jw.write(false) catch return null;
82 }
83
84 jw.endObject() catch return null;
85 return out.toOwnedSlice() catch null;
86}
87
88/// Build a fresh policy when current is invalid/empty
89fn buildFreshPolicy(alloc: Allocator, rule_ctx: *const orchestration.RuleContext) ?[]const u8 {
90 var out: std.Io.Writer.Allocating = .init(alloc);
91 var jw: json.Stringify = .{ .writer = &out.writer };
92
93 jw.beginObject() catch return null;
94
95 if (rule_ctx.retries) |r| {
96 jw.objectField("retries") catch return null;
97 jw.write(r) catch return null;
98 }
99 if (rule_ctx.retry_delay) |d| {
100 jw.objectField("retry_delay") catch return null;
101 jw.write(d) catch return null;
102 }
103
104 if (rule_ctx.set_retry_type_in_process) {
105 jw.objectField("retry_type") catch return null;
106 jw.write("in_process") catch return null;
107 } else if (rule_ctx.clear_retry_type) {
108 jw.objectField("retry_type") catch return null;
109 jw.write(null) catch return null;
110 }
111
112 if (rule_ctx.clear_pause_keys) {
113 jw.objectField("pause_keys") catch return null;
114 jw.beginArray() catch return null;
115 jw.endArray() catch return null;
116 }
117
118 if (rule_ctx.set_resuming_false) {
119 jw.objectField("resuming") catch return null;
120 jw.write(false) catch return null;
121 }
122
123 jw.endObject() catch return null;
124 return out.toOwnedSlice() catch null;
125}
126
127// POST /flow_runs/ - create flow run
128// GET /flow_runs/{id} - read flow run
129// POST /flow_runs/{id}/set_state - set state
130// POST /flow_runs/filter - list flow runs
131pub fn handle(r: zap.Request) !void {
132 const target = r.path orelse "/";
133 const method = r.method orelse "GET";
134
135 // POST /flow_runs/ - create
136 if (mem.eql(u8, method, "POST") and (mem.eql(u8, target, "/flow_runs/") or mem.eql(u8, target, "/api/flow_runs/"))) {
137 try create(r);
138 return;
139 }
140
141 // POST /flow_runs/filter - list
142 if (mem.eql(u8, method, "POST") and (mem.endsWith(u8, target, "/filter"))) {
143 try filter(r);
144 return;
145 }
146
147 // check for /{id}/set_state
148 if (mem.eql(u8, method, "POST") and mem.endsWith(u8, target, "/set_state")) {
149 const id = routing.extractId(target, "/flow_runs/", "/set_state") orelse
150 routing.extractId(target, "/api/flow_runs/", "/set_state");
151 if (id) |flow_run_id| {
152 try setState(r, flow_run_id);
153 return;
154 }
155 }
156
157 // GET /flow_runs/{id} - read single
158 if (mem.eql(u8, method, "GET")) {
159 const id = routing.extractIdAfter(target, "/flow_runs/") orelse
160 routing.extractIdAfter(target, "/api/flow_runs/");
161 if (id) |flow_run_id| {
162 try read(r, flow_run_id);
163 return;
164 }
165 }
166
167 // PATCH /flow_runs/{id} - update
168 if (mem.eql(u8, method, "PATCH")) {
169 const id = routing.extractIdAfter(target, "/flow_runs/") orelse
170 routing.extractIdAfter(target, "/api/flow_runs/");
171 if (id) |flow_run_id| {
172 try patch(r, flow_run_id);
173 return;
174 }
175 }
176
177 // DELETE /flow_runs/{id} - delete
178 if (mem.eql(u8, method, "DELETE")) {
179 const id = routing.extractIdAfter(target, "/flow_runs/") orelse
180 routing.extractIdAfter(target, "/api/flow_runs/");
181 if (id) |flow_run_id| {
182 try delete(r, flow_run_id);
183 return;
184 }
185 }
186
187 json_util.sendStatus(r, "{\"detail\":\"not found\"}", .not_found);
188}
189
190fn create(r: zap.Request) !void {
191 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
192 defer arena.deinit();
193 const alloc = arena.allocator();
194
195 const body = r.body orelse {
196 json_util.sendStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request);
197 return;
198 };
199
200 const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch {
201 json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
202 return;
203 };
204
205 const obj = parsed.value.object;
206 const raw_flow_id = if (obj.get("flow_id")) |v| switch (v) {
207 .string => |s| s,
208 else => {
209 json_util.sendStatus(r, "{\"detail\":\"flow_id must be string\"}", .bad_request);
210 return;
211 },
212 } else {
213 json_util.sendStatus(r, "{\"detail\":\"flow_id required\"}", .bad_request);
214 return;
215 };
216
217 const flow_id = raw_flow_id;
218
219 const name = if (obj.get("name")) |v| switch (v) {
220 .string => |s| s,
221 .null => routing.generateRunName(alloc),
222 else => routing.generateRunName(alloc),
223 } else routing.generateRunName(alloc);
224 const state = obj.get("state");
225
226 // extract state info
227 var state_type: []const u8 = "PENDING";
228 var state_name: []const u8 = "Pending";
229 if (state) |s| {
230 if (s.object.get("type")) |t| state_type = t.string;
231 if (s.object.get("name")) |n| state_name = n.string;
232 }
233
234 // extract optional scheduling fields
235 const next_scheduled_start_time: ?[]const u8 = if (obj.get("next_scheduled_start_time")) |v| switch (v) {
236 .string => |s| s,
237 else => null,
238 } else null;
239
240 // extract empirical_policy (retry settings)
241 const empirical_policy: ?[]const u8 = if (obj.get("empirical_policy")) |v| blk: {
242 // stringify the object back to JSON
243 var out: std.Io.Writer.Allocating = .init(alloc);
244 var jw: json.Stringify = .{ .writer = &out.writer };
245 jw.write(v) catch break :blk null;
246 break :blk out.toOwnedSlice() catch null;
247 } else null;
248
249 var new_id_buf: [36]u8 = undefined;
250 const new_id = uuid_util.generate(&new_id_buf);
251 var ts_buf: [32]u8 = undefined;
252 const now = time_util.timestamp(&ts_buf);
253
254 db.insertFlowRun(new_id, flow_id, name, state_type, state_name, now, .{
255 .next_scheduled_start_time = next_scheduled_start_time,
256 .empirical_policy = empirical_policy,
257 }) catch {
258 json_util.sendStatus(r, "{\"detail\":\"insert failed\"}", .internal_server_error);
259 return;
260 };
261
262 var state_id_buf: [36]u8 = undefined;
263 const state_id = uuid_util.generate(&state_id_buf);
264
265 const run = db.FlowRunRow{
266 .id = new_id,
267 .created = now,
268 .updated = now,
269 .name = name,
270 .flow_id = flow_id,
271 .state_type = state_type,
272 .state_name = state_name,
273 .state_timestamp = now,
274 .parameters = "{}",
275 .tags = "[]",
276 .run_count = 0,
277 .expected_start_time = null,
278 .next_scheduled_start_time = next_scheduled_start_time,
279 .start_time = null,
280 .end_time = null,
281 .total_run_time = 0.0,
282 .deployment_id = null,
283 .deployment_version = null,
284 .work_queue_name = null,
285 .work_queue_id = null,
286 .auto_scheduled = false,
287 .idempotency_key = null,
288 .empirical_policy = empirical_policy orelse "{}",
289 .state_transition_id = null,
290 };
291
292 const resp = writeFlowRun(alloc, run, state_id) catch {
293 json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
294 return;
295 };
296 json_util.sendStatus(r, resp, .created);
297}
298
299fn read(r: zap.Request, id: []const u8) !void {
300 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
301 defer arena.deinit();
302 const alloc = arena.allocator();
303
304 const run = db.getFlowRun(alloc, id) catch null orelse {
305 json_util.sendStatus(r, "{\"detail\":\"flow run not found\"}", .not_found);
306 return;
307 };
308
309 const resp = writeFlowRun(alloc, run, null) catch {
310 json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
311 return;
312 };
313 json_util.send(r, resp);
314}
315
316fn patch(r: zap.Request, id: []const u8) !void {
317 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
318 defer arena.deinit();
319 const alloc = arena.allocator();
320
321 // verify run exists
322 const run = db.getFlowRun(alloc, id) catch null orelse {
323 json_util.sendStatus(r, "{\"detail\":\"flow run not found\"}", .not_found);
324 return;
325 };
326
327 const body = r.body orelse {
328 json_util.sendStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request);
329 return;
330 };
331
332 const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch {
333 json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
334 return;
335 };
336
337 const obj = parsed.value.object;
338
339 // extract optional fields for update
340 const infrastructure_pid = if (obj.get("infrastructure_pid")) |v| switch (v) {
341 .string => |s| s,
342 .integer => |i| std.fmt.allocPrint(alloc, "{d}", .{i}) catch null,
343 else => null,
344 } else null;
345
346 // update flow run with patched fields
347 db.flow_runs.patch(id, infrastructure_pid) catch {
348 json_util.sendStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error);
349 return;
350 };
351
352 // return updated run
353 const updated_run = db.getFlowRun(alloc, id) catch null orelse run;
354 const resp = writeFlowRun(alloc, updated_run, null) catch {
355 json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
356 return;
357 };
358 json_util.send(r, resp);
359}
360
361fn delete(r: zap.Request, id: []const u8) !void {
362 const deleted = db.flow_runs.delete(id) catch false;
363 if (!deleted) {
364 json_util.sendStatus(r, "{\"detail\":\"flow run not found\"}", .not_found);
365 return;
366 }
367
368 json_util.sendStatus(r, "", .no_content);
369}
370
371fn setState(r: zap.Request, id: []const u8) !void {
372 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
373 defer arena.deinit();
374 const alloc = arena.allocator();
375
376 const body = r.body orelse {
377 json_util.sendStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request);
378 return;
379 };
380
381 const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch {
382 json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
383 return;
384 };
385
386 const state = parsed.value.object.get("state") orelse {
387 json_util.sendStatus(r, "{\"detail\":\"state required\"}", .bad_request);
388 return;
389 };
390
391 const state_type = if (state.object.get("type")) |v| v.string else "PENDING";
392 const state_name = if (state.object.get("name")) |v| v.string else "Pending";
393 var ts_buf: [32]u8 = undefined;
394 const now = time_util.timestamp(&ts_buf);
395 var state_id_buf: [36]u8 = undefined;
396 const state_id = uuid_util.generate(&state_id_buf);
397
398 // parse state_details.transition_id for idempotency
399 const proposed_transition_id: ?[]const u8 = blk: {
400 const state_details = state.object.get("state_details") orelse break :blk null;
401 if (state_details != .object) break :blk null;
402 const tid = state_details.object.get("transition_id") orelse break :blk null;
403 if (tid != .string) break :blk null;
404 break :blk tid.string;
405 };
406
407 // get current run state for orchestration
408 const current_run = db.getFlowRun(alloc, id) catch null;
409
410 const initial_state_type: ?orchestration.StateType = if (current_run) |run|
411 if (run.state_type.len > 0) orchestration.StateType.fromString(run.state_type) else null
412 else
413 null;
414
415 const proposed_state_type = orchestration.StateType.fromString(state_type);
416
417 // parse empirical_policy for retry settings
418 var retries: ?i64 = null;
419 var retry_delay: ?i64 = null;
420 if (current_run) |run| {
421 if (json.parseFromSlice(json.Value, alloc, run.empirical_policy, .{})) |policy_parsed| {
422 if (policy_parsed.value.object.get("retries")) |v| {
423 if (v == .integer) retries = v.integer;
424 }
425 if (policy_parsed.value.object.get("retry_delay")) |v| {
426 if (v == .integer) retry_delay = v.integer;
427 }
428 } else |_| {}
429 }
430
431 // apply orchestration rules (policy)
432 var rule_ctx = orchestration.RuleContext{
433 .initial_state = initial_state_type,
434 .proposed_state = proposed_state_type,
435 .initial_state_timestamp = if (current_run) |run|
436 if (run.state_timestamp.len > 0) run.state_timestamp else null
437 else
438 null,
439 .proposed_state_timestamp = now,
440 // for CopyScheduledTime: pass scheduled_time from SCHEDULED state
441 .initial_scheduled_time = if (current_run) |run| run.next_scheduled_start_time else null,
442 // for PreventDuplicateTransitions: pass transition_ids
443 .initial_transition_id = if (current_run) |run| run.state_transition_id else null,
444 .proposed_transition_id = proposed_transition_id,
445 .run_id = id,
446 .flow_id = if (current_run) |run| run.flow_id else null,
447 .deployment_id = if (current_run) |run| run.deployment_id else null,
448 // for RetryFailedFlows: pass retry settings
449 .run_count = if (current_run) |run| run.run_count else 0,
450 .retries = retries,
451 .retry_delay = retry_delay,
452 };
453 orchestration.applyPolicy(&orchestration.CoreFlowPolicy, &rule_ctx);
454
455 // if rules rejected/waited/aborted, handle appropriately
456 if (!rule_ctx.isAccepted()) {
457 // special case: RetryFailedFlows rejection - commit AwaitingRetry state
458 if (rule_ctx.result.status == .REJECT and rule_ctx.retry_state_type != null) {
459 const retry_state_type = @tagName(rule_ctx.retry_state_type.?);
460 const retry_state_name = rule_ctx.retry_state_name orelse "AwaitingRetry";
461 const retry_scheduled = rule_ctx.retry_scheduled_time;
462
463 // apply bookkeeping for the retry state
464 var bookkeeping_ctx = orchestration.TransitionContext{
465 .current_state_type = initial_state_type,
466 .current_state_timestamp = if (current_run) |run|
467 if (run.state_timestamp.len > 0) run.state_timestamp else null
468 else
469 null,
470 .start_time = if (current_run) |run| run.start_time else null,
471 .end_time = if (current_run) |run| run.end_time else null,
472 .run_count = if (current_run) |run| run.run_count else 0,
473 .total_run_time = if (current_run) |run| run.total_run_time else 0.0,
474 .proposed_state_type = rule_ctx.retry_state_type.?,
475 .proposed_state_timestamp = now,
476 };
477 orchestration.applyBookkeeping(&bookkeeping_ctx);
478
479 // build updated empirical_policy if rule set mutation flags
480 const current_policy = if (current_run) |run| run.empirical_policy else "{}";
481 const updated_policy = buildUpdatedPolicy(alloc, current_policy, &rule_ctx);
482
483 // commit the retry state with next_scheduled_start_time and updated policy
484 db.setFlowRunStateWithSchedule(
485 id,
486 state_id,
487 retry_state_type,
488 retry_state_name,
489 now,
490 bookkeeping_ctx.new_start_time,
491 bookkeeping_ctx.new_end_time,
492 bookkeeping_ctx.new_run_count,
493 bookkeeping_ctx.new_total_run_time,
494 null, // expected_start_time
495 retry_scheduled, // next_scheduled_start_time
496 updated_policy, // empirical_policy updates
497 proposed_transition_id,
498 ) catch {
499 json_util.sendStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error);
500 return;
501 };
502
503 // return REJECT status with the retry state (matching Python's behavior)
504 const resp = writeStateResponse(alloc, .REJECT, rule_ctx.result.details, retry_state_type, retry_state_name, now, state_id) catch {
505 json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
506 return;
507 };
508 json_util.send(r, resp);
509 return;
510 }
511
512 // normal rejection/wait/abort - return without committing
513 const resp = writeStateResponse(
514 alloc,
515 rule_ctx.result.status,
516 rule_ctx.result.details,
517 // for REJECT, return current state; for WAIT/ABORT, return proposed
518 if (rule_ctx.result.status == .REJECT and current_run != null)
519 current_run.?.state_type
520 else
521 state_type,
522 if (rule_ctx.result.status == .REJECT and current_run != null)
523 current_run.?.state_name
524 else
525 state_name,
526 if (rule_ctx.result.status == .REJECT and current_run != null)
527 current_run.?.state_timestamp
528 else
529 now,
530 state_id,
531 ) catch {
532 json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
533 return;
534 };
535 json_util.send(r, resp);
536 return;
537 }
538
539 // apply orchestration bookkeeping transforms
540 var bookkeeping_ctx = orchestration.TransitionContext{
541 .current_state_type = initial_state_type,
542 .current_state_timestamp = if (current_run) |run|
543 if (run.state_timestamp.len > 0) run.state_timestamp else null
544 else
545 null,
546 .start_time = if (current_run) |run| run.start_time else null,
547 .end_time = if (current_run) |run| run.end_time else null,
548 .run_count = if (current_run) |run| run.run_count else 0,
549 .total_run_time = if (current_run) |run| run.total_run_time else 0.0,
550 .proposed_state_type = proposed_state_type,
551 .proposed_state_timestamp = now,
552 };
553 orchestration.applyBookkeeping(&bookkeeping_ctx);
554
555 // check if policy needs updating (e.g., clear_retry_type when retries exhausted)
556 const current_policy = if (current_run) |run| run.empirical_policy else "{}";
557 const updated_policy = buildUpdatedPolicy(alloc, current_policy, &rule_ctx);
558
559 // atomic state transition with orchestration data
560 if (updated_policy != null) {
561 // use setStateWithSchedule to update policy (pass null for next_scheduled_start_time)
562 db.setFlowRunStateWithSchedule(
563 id,
564 state_id,
565 state_type,
566 state_name,
567 now,
568 bookkeeping_ctx.new_start_time,
569 bookkeeping_ctx.new_end_time,
570 bookkeeping_ctx.new_run_count,
571 bookkeeping_ctx.new_total_run_time,
572 rule_ctx.new_expected_start_time,
573 null, // next_scheduled_start_time
574 updated_policy,
575 proposed_transition_id,
576 ) catch {
577 json_util.sendStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error);
578 return;
579 };
580 } else {
581 db.setFlowRunState(id, state_id, state_type, state_name, now, bookkeeping_ctx.new_start_time, bookkeeping_ctx.new_end_time, bookkeeping_ctx.new_run_count, bookkeeping_ctx.new_total_run_time, rule_ctx.new_expected_start_time, proposed_transition_id) catch {
582 json_util.sendStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error);
583 return;
584 };
585 }
586
587 const resp = writeStateResponse(alloc, .ACCEPT, .{}, state_type, state_name, now, state_id) catch {
588 json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
589 return;
590 };
591 json_util.send(r, resp);
592}
593
594fn filter(r: zap.Request) !void {
595 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
596 defer arena.deinit();
597 const alloc = arena.allocator();
598
599 const runs = db.listFlowRuns(alloc, 50) catch {
600 json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error);
601 return;
602 };
603
604 var output: std.Io.Writer.Allocating = .init(alloc);
605 var jw: json.Stringify = .{ .writer = &output.writer };
606
607 jw.beginArray() catch {
608 json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
609 return;
610 };
611
612 for (runs) |run| {
613 writeFlowRunObject(&jw, run, null) catch continue;
614 }
615
616 jw.endArray() catch {};
617
618 json_util.send(r, output.toOwnedSlice() catch "[]");
619}
620
621fn writeFlowRun(alloc: std.mem.Allocator, run: db.FlowRunRow, state_id: ?[]const u8) ![]const u8 {
622 var output: std.Io.Writer.Allocating = .init(alloc);
623 var jw: json.Stringify = .{ .writer = &output.writer };
624 try writeFlowRunObject(&jw, run, state_id);
625 return output.toOwnedSlice();
626}
627
628fn writeFlowRunObject(jw: *json.Stringify, run: db.FlowRunRow, state_id: ?[]const u8) !void {
629 try jw.beginObject();
630
631 try jw.objectField("id");
632 try jw.write(run.id);
633
634 try jw.objectField("created");
635 try jw.write(run.created);
636
637 try jw.objectField("updated");
638 try jw.write(run.updated);
639
640 try jw.objectField("name");
641 try jw.write(run.name);
642
643 try jw.objectField("flow_id");
644 try jw.write(run.flow_id);
645
646 try jw.objectField("state_type");
647 try jw.write(run.state_type);
648
649 try jw.objectField("state_name");
650 try jw.write(run.state_name);
651
652 try jw.objectField("state");
653 try jw.beginObject();
654 try jw.objectField("type");
655 try jw.write(run.state_type);
656 try jw.objectField("name");
657 try jw.write(run.state_name);
658 try jw.objectField("timestamp");
659 try jw.write(run.state_timestamp);
660 if (state_id) |sid| {
661 try jw.objectField("id");
662 try jw.write(sid);
663 }
664 try jw.endObject();
665
666 try jw.objectField("parameters");
667 try jw.beginWriteRaw();
668 try jw.writer.writeAll(run.parameters);
669 jw.endWriteRaw();
670
671 try jw.objectField("tags");
672 try jw.beginWriteRaw();
673 try jw.writer.writeAll(run.tags);
674 jw.endWriteRaw();
675
676 try jw.objectField("run_count");
677 try jw.write(run.run_count);
678
679 try jw.objectField("expected_start_time");
680 try jw.write(run.expected_start_time);
681
682 try jw.objectField("start_time");
683 try jw.write(run.start_time);
684
685 try jw.objectField("end_time");
686 try jw.write(run.end_time);
687
688 try jw.objectField("total_run_time");
689 try jw.write(run.total_run_time);
690
691 try jw.objectField("deployment_id");
692 try jw.write(run.deployment_id);
693
694 try jw.objectField("deployment_version");
695 try jw.write(run.deployment_version);
696
697 try jw.objectField("work_queue_name");
698 try jw.write(run.work_queue_name);
699
700 try jw.objectField("work_queue_id");
701 try jw.write(run.work_queue_id);
702
703 try jw.objectField("auto_scheduled");
704 try jw.write(run.auto_scheduled);
705
706 try jw.objectField("empirical_policy");
707 try jw.beginWriteRaw();
708 try jw.writer.writeAll(run.empirical_policy);
709 jw.endWriteRaw();
710
711 try jw.endObject();
712}
713
714fn writeStateResponse(
715 alloc: std.mem.Allocator,
716 status: orchestration.ResponseStatus,
717 details: orchestration.ResponseDetails,
718 state_type: []const u8,
719 state_name: []const u8,
720 timestamp: []const u8,
721 state_id: []const u8,
722) ![]const u8 {
723 var output: std.Io.Writer.Allocating = .init(alloc);
724 var jw: json.Stringify = .{ .writer = &output.writer };
725
726 try jw.beginObject();
727
728 try jw.objectField("status");
729 try jw.write(status.toString());
730
731 try jw.objectField("details");
732 try jw.beginObject();
733 if (details.reason) |reason| {
734 try jw.objectField("reason");
735 try jw.write(reason);
736 }
737 if (details.retry_after) |retry_after| {
738 try jw.objectField("retry_after_seconds");
739 try jw.write(retry_after);
740 }
741 try jw.endObject();
742
743 try jw.objectField("state");
744 try jw.beginObject();
745 try jw.objectField("type");
746 try jw.write(state_type);
747 try jw.objectField("name");
748 try jw.write(state_name);
749 try jw.objectField("timestamp");
750 try jw.write(timestamp);
751 try jw.objectField("id");
752 try jw.write(state_id);
753 try jw.endObject();
754
755 try jw.endObject();
756
757 return output.toOwnedSlice();
758}