prefect server in zig
at main 758 lines 26 kB view raw
1const std = @import("std"); 2const zap = @import("zap"); 3const mem = std.mem; 4const json = std.json; 5const Allocator = std.mem.Allocator; 6 7const db = @import("../db/sqlite.zig"); 8const routing = @import("routing.zig"); 9const uuid_util = @import("../utilities/uuid.zig"); 10const time_util = @import("../utilities/time.zig"); 11const json_util = @import("../utilities/json.zig"); 12const orchestration = @import("../orchestration.zig"); 13 14/// Build updated empirical_policy JSON based on rule context flags 15/// Preserves existing fields from current_policy while applying updates 16fn buildUpdatedPolicy( 17 alloc: Allocator, 18 current_policy: []const u8, 19 rule_ctx: *const orchestration.RuleContext, 20) ?[]const u8 { 21 // check if any policy updates are needed 22 if (!rule_ctx.set_retry_type_in_process and 23 !rule_ctx.clear_retry_type and 24 !rule_ctx.clear_pause_keys and 25 !rule_ctx.set_resuming_false) 26 { 27 return null; 28 } 29 30 // parse current policy 31 const parsed = json.parseFromSlice(json.Value, alloc, current_policy, .{}) catch { 32 // if parse fails, start with empty object 33 return buildFreshPolicy(alloc, rule_ctx); 34 }; 35 const current = parsed.value; 36 37 if (current != .object) { 38 return buildFreshPolicy(alloc, rule_ctx); 39 } 40 41 // build new policy with updates 42 var out: std.Io.Writer.Allocating = .init(alloc); 43 var jw: json.Stringify = .{ .writer = &out.writer }; 44 45 jw.beginObject() catch return null; 46 47 // copy existing fields, applying updates 48 var it = current.object.iterator(); 49 while (it.next()) |entry| { 50 const key = entry.key_ptr.*; 51 52 // skip fields we're updating 53 if (mem.eql(u8, key, "retry_type") or 54 mem.eql(u8, key, "pause_keys") or 55 mem.eql(u8, key, "resuming")) 56 { 57 continue; 58 } 59 60 jw.objectField(key) catch return null; 61 jw.write(entry.value_ptr.*) catch return null; 62 } 63 64 // apply updates 65 if (rule_ctx.set_retry_type_in_process) { 66 jw.objectField("retry_type") catch return null; 67 jw.write("in_process") catch return null; 68 } else if (rule_ctx.clear_retry_type) { 69 jw.objectField("retry_type") catch return null; 70 jw.write(null) catch return null; 71 } 72 73 if (rule_ctx.clear_pause_keys) { 74 jw.objectField("pause_keys") catch return null; 75 jw.beginArray() catch return null; 76 jw.endArray() catch return null; 77 } 78 79 if (rule_ctx.set_resuming_false) { 80 jw.objectField("resuming") catch return null; 81 jw.write(false) catch return null; 82 } 83 84 jw.endObject() catch return null; 85 return out.toOwnedSlice() catch null; 86} 87 88/// Build a fresh policy when current is invalid/empty 89fn buildFreshPolicy(alloc: Allocator, rule_ctx: *const orchestration.RuleContext) ?[]const u8 { 90 var out: std.Io.Writer.Allocating = .init(alloc); 91 var jw: json.Stringify = .{ .writer = &out.writer }; 92 93 jw.beginObject() catch return null; 94 95 if (rule_ctx.retries) |r| { 96 jw.objectField("retries") catch return null; 97 jw.write(r) catch return null; 98 } 99 if (rule_ctx.retry_delay) |d| { 100 jw.objectField("retry_delay") catch return null; 101 jw.write(d) catch return null; 102 } 103 104 if (rule_ctx.set_retry_type_in_process) { 105 jw.objectField("retry_type") catch return null; 106 jw.write("in_process") catch return null; 107 } else if (rule_ctx.clear_retry_type) { 108 jw.objectField("retry_type") catch return null; 109 jw.write(null) catch return null; 110 } 111 112 if (rule_ctx.clear_pause_keys) { 113 jw.objectField("pause_keys") catch return null; 114 jw.beginArray() catch return null; 115 jw.endArray() catch return null; 116 } 117 118 if (rule_ctx.set_resuming_false) { 119 jw.objectField("resuming") catch return null; 120 jw.write(false) catch return null; 121 } 122 123 jw.endObject() catch return null; 124 return out.toOwnedSlice() catch null; 125} 126 127// POST /flow_runs/ - create flow run 128// GET /flow_runs/{id} - read flow run 129// POST /flow_runs/{id}/set_state - set state 130// POST /flow_runs/filter - list flow runs 131pub fn handle(r: zap.Request) !void { 132 const target = r.path orelse "/"; 133 const method = r.method orelse "GET"; 134 135 // POST /flow_runs/ - create 136 if (mem.eql(u8, method, "POST") and (mem.eql(u8, target, "/flow_runs/") or mem.eql(u8, target, "/api/flow_runs/"))) { 137 try create(r); 138 return; 139 } 140 141 // POST /flow_runs/filter - list 142 if (mem.eql(u8, method, "POST") and (mem.endsWith(u8, target, "/filter"))) { 143 try filter(r); 144 return; 145 } 146 147 // check for /{id}/set_state 148 if (mem.eql(u8, method, "POST") and mem.endsWith(u8, target, "/set_state")) { 149 const id = routing.extractId(target, "/flow_runs/", "/set_state") orelse 150 routing.extractId(target, "/api/flow_runs/", "/set_state"); 151 if (id) |flow_run_id| { 152 try setState(r, flow_run_id); 153 return; 154 } 155 } 156 157 // GET /flow_runs/{id} - read single 158 if (mem.eql(u8, method, "GET")) { 159 const id = routing.extractIdAfter(target, "/flow_runs/") orelse 160 routing.extractIdAfter(target, "/api/flow_runs/"); 161 if (id) |flow_run_id| { 162 try read(r, flow_run_id); 163 return; 164 } 165 } 166 167 // PATCH /flow_runs/{id} - update 168 if (mem.eql(u8, method, "PATCH")) { 169 const id = routing.extractIdAfter(target, "/flow_runs/") orelse 170 routing.extractIdAfter(target, "/api/flow_runs/"); 171 if (id) |flow_run_id| { 172 try patch(r, flow_run_id); 173 return; 174 } 175 } 176 177 // DELETE /flow_runs/{id} - delete 178 if (mem.eql(u8, method, "DELETE")) { 179 const id = routing.extractIdAfter(target, "/flow_runs/") orelse 180 routing.extractIdAfter(target, "/api/flow_runs/"); 181 if (id) |flow_run_id| { 182 try delete(r, flow_run_id); 183 return; 184 } 185 } 186 187 json_util.sendStatus(r, "{\"detail\":\"not found\"}", .not_found); 188} 189 190fn create(r: zap.Request) !void { 191 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 192 defer arena.deinit(); 193 const alloc = arena.allocator(); 194 195 const body = r.body orelse { 196 json_util.sendStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request); 197 return; 198 }; 199 200 const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch { 201 json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request); 202 return; 203 }; 204 205 const obj = parsed.value.object; 206 const raw_flow_id = if (obj.get("flow_id")) |v| switch (v) { 207 .string => |s| s, 208 else => { 209 json_util.sendStatus(r, "{\"detail\":\"flow_id must be string\"}", .bad_request); 210 return; 211 }, 212 } else { 213 json_util.sendStatus(r, "{\"detail\":\"flow_id required\"}", .bad_request); 214 return; 215 }; 216 217 const flow_id = raw_flow_id; 218 219 const name = if (obj.get("name")) |v| switch (v) { 220 .string => |s| s, 221 .null => routing.generateRunName(alloc), 222 else => routing.generateRunName(alloc), 223 } else routing.generateRunName(alloc); 224 const state = obj.get("state"); 225 226 // extract state info 227 var state_type: []const u8 = "PENDING"; 228 var state_name: []const u8 = "Pending"; 229 if (state) |s| { 230 if (s.object.get("type")) |t| state_type = t.string; 231 if (s.object.get("name")) |n| state_name = n.string; 232 } 233 234 // extract optional scheduling fields 235 const next_scheduled_start_time: ?[]const u8 = if (obj.get("next_scheduled_start_time")) |v| switch (v) { 236 .string => |s| s, 237 else => null, 238 } else null; 239 240 // extract empirical_policy (retry settings) 241 const empirical_policy: ?[]const u8 = if (obj.get("empirical_policy")) |v| blk: { 242 // stringify the object back to JSON 243 var out: std.Io.Writer.Allocating = .init(alloc); 244 var jw: json.Stringify = .{ .writer = &out.writer }; 245 jw.write(v) catch break :blk null; 246 break :blk out.toOwnedSlice() catch null; 247 } else null; 248 249 var new_id_buf: [36]u8 = undefined; 250 const new_id = uuid_util.generate(&new_id_buf); 251 var ts_buf: [32]u8 = undefined; 252 const now = time_util.timestamp(&ts_buf); 253 254 db.insertFlowRun(new_id, flow_id, name, state_type, state_name, now, .{ 255 .next_scheduled_start_time = next_scheduled_start_time, 256 .empirical_policy = empirical_policy, 257 }) catch { 258 json_util.sendStatus(r, "{\"detail\":\"insert failed\"}", .internal_server_error); 259 return; 260 }; 261 262 var state_id_buf: [36]u8 = undefined; 263 const state_id = uuid_util.generate(&state_id_buf); 264 265 const run = db.FlowRunRow{ 266 .id = new_id, 267 .created = now, 268 .updated = now, 269 .name = name, 270 .flow_id = flow_id, 271 .state_type = state_type, 272 .state_name = state_name, 273 .state_timestamp = now, 274 .parameters = "{}", 275 .tags = "[]", 276 .run_count = 0, 277 .expected_start_time = null, 278 .next_scheduled_start_time = next_scheduled_start_time, 279 .start_time = null, 280 .end_time = null, 281 .total_run_time = 0.0, 282 .deployment_id = null, 283 .deployment_version = null, 284 .work_queue_name = null, 285 .work_queue_id = null, 286 .auto_scheduled = false, 287 .idempotency_key = null, 288 .empirical_policy = empirical_policy orelse "{}", 289 .state_transition_id = null, 290 }; 291 292 const resp = writeFlowRun(alloc, run, state_id) catch { 293 json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 294 return; 295 }; 296 json_util.sendStatus(r, resp, .created); 297} 298 299fn read(r: zap.Request, id: []const u8) !void { 300 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 301 defer arena.deinit(); 302 const alloc = arena.allocator(); 303 304 const run = db.getFlowRun(alloc, id) catch null orelse { 305 json_util.sendStatus(r, "{\"detail\":\"flow run not found\"}", .not_found); 306 return; 307 }; 308 309 const resp = writeFlowRun(alloc, run, null) catch { 310 json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 311 return; 312 }; 313 json_util.send(r, resp); 314} 315 316fn patch(r: zap.Request, id: []const u8) !void { 317 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 318 defer arena.deinit(); 319 const alloc = arena.allocator(); 320 321 // verify run exists 322 const run = db.getFlowRun(alloc, id) catch null orelse { 323 json_util.sendStatus(r, "{\"detail\":\"flow run not found\"}", .not_found); 324 return; 325 }; 326 327 const body = r.body orelse { 328 json_util.sendStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request); 329 return; 330 }; 331 332 const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch { 333 json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request); 334 return; 335 }; 336 337 const obj = parsed.value.object; 338 339 // extract optional fields for update 340 const infrastructure_pid = if (obj.get("infrastructure_pid")) |v| switch (v) { 341 .string => |s| s, 342 .integer => |i| std.fmt.allocPrint(alloc, "{d}", .{i}) catch null, 343 else => null, 344 } else null; 345 346 // update flow run with patched fields 347 db.flow_runs.patch(id, infrastructure_pid) catch { 348 json_util.sendStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error); 349 return; 350 }; 351 352 // return updated run 353 const updated_run = db.getFlowRun(alloc, id) catch null orelse run; 354 const resp = writeFlowRun(alloc, updated_run, null) catch { 355 json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 356 return; 357 }; 358 json_util.send(r, resp); 359} 360 361fn delete(r: zap.Request, id: []const u8) !void { 362 const deleted = db.flow_runs.delete(id) catch false; 363 if (!deleted) { 364 json_util.sendStatus(r, "{\"detail\":\"flow run not found\"}", .not_found); 365 return; 366 } 367 368 json_util.sendStatus(r, "", .no_content); 369} 370 371fn setState(r: zap.Request, id: []const u8) !void { 372 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 373 defer arena.deinit(); 374 const alloc = arena.allocator(); 375 376 const body = r.body orelse { 377 json_util.sendStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request); 378 return; 379 }; 380 381 const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch { 382 json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request); 383 return; 384 }; 385 386 const state = parsed.value.object.get("state") orelse { 387 json_util.sendStatus(r, "{\"detail\":\"state required\"}", .bad_request); 388 return; 389 }; 390 391 const state_type = if (state.object.get("type")) |v| v.string else "PENDING"; 392 const state_name = if (state.object.get("name")) |v| v.string else "Pending"; 393 var ts_buf: [32]u8 = undefined; 394 const now = time_util.timestamp(&ts_buf); 395 var state_id_buf: [36]u8 = undefined; 396 const state_id = uuid_util.generate(&state_id_buf); 397 398 // parse state_details.transition_id for idempotency 399 const proposed_transition_id: ?[]const u8 = blk: { 400 const state_details = state.object.get("state_details") orelse break :blk null; 401 if (state_details != .object) break :blk null; 402 const tid = state_details.object.get("transition_id") orelse break :blk null; 403 if (tid != .string) break :blk null; 404 break :blk tid.string; 405 }; 406 407 // get current run state for orchestration 408 const current_run = db.getFlowRun(alloc, id) catch null; 409 410 const initial_state_type: ?orchestration.StateType = if (current_run) |run| 411 if (run.state_type.len > 0) orchestration.StateType.fromString(run.state_type) else null 412 else 413 null; 414 415 const proposed_state_type = orchestration.StateType.fromString(state_type); 416 417 // parse empirical_policy for retry settings 418 var retries: ?i64 = null; 419 var retry_delay: ?i64 = null; 420 if (current_run) |run| { 421 if (json.parseFromSlice(json.Value, alloc, run.empirical_policy, .{})) |policy_parsed| { 422 if (policy_parsed.value.object.get("retries")) |v| { 423 if (v == .integer) retries = v.integer; 424 } 425 if (policy_parsed.value.object.get("retry_delay")) |v| { 426 if (v == .integer) retry_delay = v.integer; 427 } 428 } else |_| {} 429 } 430 431 // apply orchestration rules (policy) 432 var rule_ctx = orchestration.RuleContext{ 433 .initial_state = initial_state_type, 434 .proposed_state = proposed_state_type, 435 .initial_state_timestamp = if (current_run) |run| 436 if (run.state_timestamp.len > 0) run.state_timestamp else null 437 else 438 null, 439 .proposed_state_timestamp = now, 440 // for CopyScheduledTime: pass scheduled_time from SCHEDULED state 441 .initial_scheduled_time = if (current_run) |run| run.next_scheduled_start_time else null, 442 // for PreventDuplicateTransitions: pass transition_ids 443 .initial_transition_id = if (current_run) |run| run.state_transition_id else null, 444 .proposed_transition_id = proposed_transition_id, 445 .run_id = id, 446 .flow_id = if (current_run) |run| run.flow_id else null, 447 .deployment_id = if (current_run) |run| run.deployment_id else null, 448 // for RetryFailedFlows: pass retry settings 449 .run_count = if (current_run) |run| run.run_count else 0, 450 .retries = retries, 451 .retry_delay = retry_delay, 452 }; 453 orchestration.applyPolicy(&orchestration.CoreFlowPolicy, &rule_ctx); 454 455 // if rules rejected/waited/aborted, handle appropriately 456 if (!rule_ctx.isAccepted()) { 457 // special case: RetryFailedFlows rejection - commit AwaitingRetry state 458 if (rule_ctx.result.status == .REJECT and rule_ctx.retry_state_type != null) { 459 const retry_state_type = @tagName(rule_ctx.retry_state_type.?); 460 const retry_state_name = rule_ctx.retry_state_name orelse "AwaitingRetry"; 461 const retry_scheduled = rule_ctx.retry_scheduled_time; 462 463 // apply bookkeeping for the retry state 464 var bookkeeping_ctx = orchestration.TransitionContext{ 465 .current_state_type = initial_state_type, 466 .current_state_timestamp = if (current_run) |run| 467 if (run.state_timestamp.len > 0) run.state_timestamp else null 468 else 469 null, 470 .start_time = if (current_run) |run| run.start_time else null, 471 .end_time = if (current_run) |run| run.end_time else null, 472 .run_count = if (current_run) |run| run.run_count else 0, 473 .total_run_time = if (current_run) |run| run.total_run_time else 0.0, 474 .proposed_state_type = rule_ctx.retry_state_type.?, 475 .proposed_state_timestamp = now, 476 }; 477 orchestration.applyBookkeeping(&bookkeeping_ctx); 478 479 // build updated empirical_policy if rule set mutation flags 480 const current_policy = if (current_run) |run| run.empirical_policy else "{}"; 481 const updated_policy = buildUpdatedPolicy(alloc, current_policy, &rule_ctx); 482 483 // commit the retry state with next_scheduled_start_time and updated policy 484 db.setFlowRunStateWithSchedule( 485 id, 486 state_id, 487 retry_state_type, 488 retry_state_name, 489 now, 490 bookkeeping_ctx.new_start_time, 491 bookkeeping_ctx.new_end_time, 492 bookkeeping_ctx.new_run_count, 493 bookkeeping_ctx.new_total_run_time, 494 null, // expected_start_time 495 retry_scheduled, // next_scheduled_start_time 496 updated_policy, // empirical_policy updates 497 proposed_transition_id, 498 ) catch { 499 json_util.sendStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error); 500 return; 501 }; 502 503 // return REJECT status with the retry state (matching Python's behavior) 504 const resp = writeStateResponse(alloc, .REJECT, rule_ctx.result.details, retry_state_type, retry_state_name, now, state_id) catch { 505 json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 506 return; 507 }; 508 json_util.send(r, resp); 509 return; 510 } 511 512 // normal rejection/wait/abort - return without committing 513 const resp = writeStateResponse( 514 alloc, 515 rule_ctx.result.status, 516 rule_ctx.result.details, 517 // for REJECT, return current state; for WAIT/ABORT, return proposed 518 if (rule_ctx.result.status == .REJECT and current_run != null) 519 current_run.?.state_type 520 else 521 state_type, 522 if (rule_ctx.result.status == .REJECT and current_run != null) 523 current_run.?.state_name 524 else 525 state_name, 526 if (rule_ctx.result.status == .REJECT and current_run != null) 527 current_run.?.state_timestamp 528 else 529 now, 530 state_id, 531 ) catch { 532 json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 533 return; 534 }; 535 json_util.send(r, resp); 536 return; 537 } 538 539 // apply orchestration bookkeeping transforms 540 var bookkeeping_ctx = orchestration.TransitionContext{ 541 .current_state_type = initial_state_type, 542 .current_state_timestamp = if (current_run) |run| 543 if (run.state_timestamp.len > 0) run.state_timestamp else null 544 else 545 null, 546 .start_time = if (current_run) |run| run.start_time else null, 547 .end_time = if (current_run) |run| run.end_time else null, 548 .run_count = if (current_run) |run| run.run_count else 0, 549 .total_run_time = if (current_run) |run| run.total_run_time else 0.0, 550 .proposed_state_type = proposed_state_type, 551 .proposed_state_timestamp = now, 552 }; 553 orchestration.applyBookkeeping(&bookkeeping_ctx); 554 555 // check if policy needs updating (e.g., clear_retry_type when retries exhausted) 556 const current_policy = if (current_run) |run| run.empirical_policy else "{}"; 557 const updated_policy = buildUpdatedPolicy(alloc, current_policy, &rule_ctx); 558 559 // atomic state transition with orchestration data 560 if (updated_policy != null) { 561 // use setStateWithSchedule to update policy (pass null for next_scheduled_start_time) 562 db.setFlowRunStateWithSchedule( 563 id, 564 state_id, 565 state_type, 566 state_name, 567 now, 568 bookkeeping_ctx.new_start_time, 569 bookkeeping_ctx.new_end_time, 570 bookkeeping_ctx.new_run_count, 571 bookkeeping_ctx.new_total_run_time, 572 rule_ctx.new_expected_start_time, 573 null, // next_scheduled_start_time 574 updated_policy, 575 proposed_transition_id, 576 ) catch { 577 json_util.sendStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error); 578 return; 579 }; 580 } else { 581 db.setFlowRunState(id, state_id, state_type, state_name, now, bookkeeping_ctx.new_start_time, bookkeeping_ctx.new_end_time, bookkeeping_ctx.new_run_count, bookkeeping_ctx.new_total_run_time, rule_ctx.new_expected_start_time, proposed_transition_id) catch { 582 json_util.sendStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error); 583 return; 584 }; 585 } 586 587 const resp = writeStateResponse(alloc, .ACCEPT, .{}, state_type, state_name, now, state_id) catch { 588 json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 589 return; 590 }; 591 json_util.send(r, resp); 592} 593 594fn filter(r: zap.Request) !void { 595 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 596 defer arena.deinit(); 597 const alloc = arena.allocator(); 598 599 const runs = db.listFlowRuns(alloc, 50) catch { 600 json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error); 601 return; 602 }; 603 604 var output: std.Io.Writer.Allocating = .init(alloc); 605 var jw: json.Stringify = .{ .writer = &output.writer }; 606 607 jw.beginArray() catch { 608 json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 609 return; 610 }; 611 612 for (runs) |run| { 613 writeFlowRunObject(&jw, run, null) catch continue; 614 } 615 616 jw.endArray() catch {}; 617 618 json_util.send(r, output.toOwnedSlice() catch "[]"); 619} 620 621fn writeFlowRun(alloc: std.mem.Allocator, run: db.FlowRunRow, state_id: ?[]const u8) ![]const u8 { 622 var output: std.Io.Writer.Allocating = .init(alloc); 623 var jw: json.Stringify = .{ .writer = &output.writer }; 624 try writeFlowRunObject(&jw, run, state_id); 625 return output.toOwnedSlice(); 626} 627 628fn writeFlowRunObject(jw: *json.Stringify, run: db.FlowRunRow, state_id: ?[]const u8) !void { 629 try jw.beginObject(); 630 631 try jw.objectField("id"); 632 try jw.write(run.id); 633 634 try jw.objectField("created"); 635 try jw.write(run.created); 636 637 try jw.objectField("updated"); 638 try jw.write(run.updated); 639 640 try jw.objectField("name"); 641 try jw.write(run.name); 642 643 try jw.objectField("flow_id"); 644 try jw.write(run.flow_id); 645 646 try jw.objectField("state_type"); 647 try jw.write(run.state_type); 648 649 try jw.objectField("state_name"); 650 try jw.write(run.state_name); 651 652 try jw.objectField("state"); 653 try jw.beginObject(); 654 try jw.objectField("type"); 655 try jw.write(run.state_type); 656 try jw.objectField("name"); 657 try jw.write(run.state_name); 658 try jw.objectField("timestamp"); 659 try jw.write(run.state_timestamp); 660 if (state_id) |sid| { 661 try jw.objectField("id"); 662 try jw.write(sid); 663 } 664 try jw.endObject(); 665 666 try jw.objectField("parameters"); 667 try jw.beginWriteRaw(); 668 try jw.writer.writeAll(run.parameters); 669 jw.endWriteRaw(); 670 671 try jw.objectField("tags"); 672 try jw.beginWriteRaw(); 673 try jw.writer.writeAll(run.tags); 674 jw.endWriteRaw(); 675 676 try jw.objectField("run_count"); 677 try jw.write(run.run_count); 678 679 try jw.objectField("expected_start_time"); 680 try jw.write(run.expected_start_time); 681 682 try jw.objectField("start_time"); 683 try jw.write(run.start_time); 684 685 try jw.objectField("end_time"); 686 try jw.write(run.end_time); 687 688 try jw.objectField("total_run_time"); 689 try jw.write(run.total_run_time); 690 691 try jw.objectField("deployment_id"); 692 try jw.write(run.deployment_id); 693 694 try jw.objectField("deployment_version"); 695 try jw.write(run.deployment_version); 696 697 try jw.objectField("work_queue_name"); 698 try jw.write(run.work_queue_name); 699 700 try jw.objectField("work_queue_id"); 701 try jw.write(run.work_queue_id); 702 703 try jw.objectField("auto_scheduled"); 704 try jw.write(run.auto_scheduled); 705 706 try jw.objectField("empirical_policy"); 707 try jw.beginWriteRaw(); 708 try jw.writer.writeAll(run.empirical_policy); 709 jw.endWriteRaw(); 710 711 try jw.endObject(); 712} 713 714fn writeStateResponse( 715 alloc: std.mem.Allocator, 716 status: orchestration.ResponseStatus, 717 details: orchestration.ResponseDetails, 718 state_type: []const u8, 719 state_name: []const u8, 720 timestamp: []const u8, 721 state_id: []const u8, 722) ![]const u8 { 723 var output: std.Io.Writer.Allocating = .init(alloc); 724 var jw: json.Stringify = .{ .writer = &output.writer }; 725 726 try jw.beginObject(); 727 728 try jw.objectField("status"); 729 try jw.write(status.toString()); 730 731 try jw.objectField("details"); 732 try jw.beginObject(); 733 if (details.reason) |reason| { 734 try jw.objectField("reason"); 735 try jw.write(reason); 736 } 737 if (details.retry_after) |retry_after| { 738 try jw.objectField("retry_after_seconds"); 739 try jw.write(retry_after); 740 } 741 try jw.endObject(); 742 743 try jw.objectField("state"); 744 try jw.beginObject(); 745 try jw.objectField("type"); 746 try jw.write(state_type); 747 try jw.objectField("name"); 748 try jw.write(state_name); 749 try jw.objectField("timestamp"); 750 try jw.write(timestamp); 751 try jw.objectField("id"); 752 try jw.write(state_id); 753 try jw.endObject(); 754 755 try jw.endObject(); 756 757 return output.toOwnedSlice(); 758}