prefect server in zig
at f511403a2b901063559cd17995b45527418e76c6 857 lines 29 kB view raw
1const std = @import("std"); 2const zap = @import("zap"); 3const mem = std.mem; 4const json = std.json; 5 6const db = @import("../db/sqlite.zig"); 7const uuid_util = @import("../utilities/uuid.zig"); 8const time_util = @import("../utilities/time.zig"); 9const json_util = @import("../utilities/json.zig"); 10 11const schedules = @import("deployment_schedules.zig"); 12 13pub fn handle(r: zap.Request) !void { 14 const target = r.path orelse "/"; 15 const method = r.method orelse "GET"; 16 17 // POST /deployments/filter 18 if (mem.eql(u8, method, "POST") and mem.endsWith(u8, target, "/filter")) { 19 try filter(r); 20 return; 21 } 22 23 // POST /deployments/count 24 if (mem.eql(u8, method, "POST") and mem.endsWith(u8, target, "/count")) { 25 try count(r); 26 return; 27 } 28 29 // POST /deployments/get_scheduled_flow_runs 30 if (mem.eql(u8, method, "POST") and mem.endsWith(u8, target, "/get_scheduled_flow_runs")) { 31 try getScheduledFlowRuns(r); 32 return; 33 } 34 35 // GET /deployments/name/{flow_name}/{deployment_name} 36 if (mem.eql(u8, method, "GET") and mem.indexOf(u8, target, "/name/") != null) { 37 try getByName(r, target); 38 return; 39 } 40 41 // POST /deployments/{id}/create_flow_run 42 if (mem.eql(u8, method, "POST") and mem.endsWith(u8, target, "/create_flow_run")) { 43 try createFlowRun(r, target); 44 return; 45 } 46 47 // POST /deployments/{id}/pause_deployment 48 if (mem.eql(u8, method, "POST") and mem.endsWith(u8, target, "/pause_deployment")) { 49 try pause(r, target); 50 return; 51 } 52 53 // POST /deployments/{id}/resume_deployment 54 if (mem.eql(u8, method, "POST") and mem.endsWith(u8, target, "/resume_deployment")) { 55 try resume_(r, target); 56 return; 57 } 58 59 // Schedule endpoints 60 if (mem.indexOf(u8, target, "/schedules") != null) { 61 try schedules.handle(r, target); 62 return; 63 } 64 65 // POST /deployments/ - create deployment 66 if (mem.eql(u8, method, "POST")) { 67 const is_root = mem.endsWith(u8, target, "/deployments/") or mem.endsWith(u8, target, "/deployments"); 68 if (is_root) { 69 try create(r); 70 return; 71 } 72 } 73 74 // GET /deployments/{id} 75 if (mem.eql(u8, method, "GET")) { 76 const id = extractDeploymentId(target) orelse { 77 json_util.sendStatus(r, "{\"detail\":\"deployment id required\"}", .bad_request); 78 return; 79 }; 80 try read(r, id); 81 return; 82 } 83 84 // PATCH /deployments/{id} 85 if (mem.eql(u8, method, "PATCH")) { 86 const id = extractDeploymentId(target) orelse { 87 json_util.sendStatus(r, "{\"detail\":\"deployment id required\"}", .bad_request); 88 return; 89 }; 90 try update(r, id); 91 return; 92 } 93 94 // DELETE /deployments/{id} 95 if (mem.eql(u8, method, "DELETE")) { 96 const id = extractDeploymentId(target) orelse { 97 json_util.sendStatus(r, "{\"detail\":\"deployment id required\"}", .bad_request); 98 return; 99 }; 100 try delete(r, id); 101 return; 102 } 103 104 json_util.sendStatus(r, "{\"detail\":\"not implemented\"}", .not_implemented); 105} 106 107// Path helper 108 109pub fn extractDeploymentId(target: []const u8) ?[]const u8 { 110 const prefix = if (mem.startsWith(u8, target, "/api/deployments/")) 111 "/api/deployments/" 112 else if (mem.startsWith(u8, target, "/deployments/")) 113 "/deployments/" 114 else 115 return null; 116 117 if (target.len <= prefix.len) return null; 118 119 const after = target[prefix.len..]; 120 const end = mem.indexOf(u8, after, "/") orelse after.len; 121 if (end == 0) return null; 122 123 return after[0..end]; 124} 125 126// CRUD handlers 127 128fn create(r: zap.Request) !void { 129 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 130 defer arena.deinit(); 131 const alloc = arena.allocator(); 132 133 const body = r.body orelse { 134 json_util.sendStatus(r, "{\"detail\":\"body required\"}", .bad_request); 135 return; 136 }; 137 138 const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch { 139 json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request); 140 return; 141 }; 142 143 const obj = parsed.value.object; 144 145 const name = getString(obj, "name") orelse { 146 json_util.sendStatus(r, "{\"detail\":\"name required\"}", .bad_request); 147 return; 148 }; 149 150 const flow_id = getString(obj, "flow_id") orelse { 151 json_util.sendStatus(r, "{\"detail\":\"flow_id required\"}", .bad_request); 152 return; 153 }; 154 155 // Verify flow exists 156 _ = db.flows.getById(alloc, flow_id) catch null orelse { 157 json_util.sendStatus(r, "{\"detail\":\"Flow not found\"}", .not_found); 158 return; 159 }; 160 161 var ts_buf: [32]u8 = undefined; 162 const now = time_util.timestamp(&ts_buf); 163 164 // Check for existing deployment (upsert) 165 if (db.deployments.getByFlowAndName(alloc, flow_id, name) catch null) |existing| { 166 _ = db.deployments.updateById(existing.id, now, buildUpdateParams(obj)) catch { 167 json_util.sendStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error); 168 return; 169 }; 170 if (obj.get("schedules")) |sched_val| { 171 try schedules.replaceSchedules(alloc, existing.id, sched_val, now); 172 } 173 const deployment = db.deployments.getById(alloc, existing.id) catch null orelse { 174 json_util.sendStatus(r, "{\"detail\":\"not found after update\"}", .internal_server_error); 175 return; 176 }; 177 const resp = writeDeployment(alloc, deployment) catch { 178 json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 179 return; 180 }; 181 json_util.send(r, resp); 182 return; 183 } 184 185 // Create new 186 var id_buf: [36]u8 = undefined; 187 const new_id = uuid_util.generate(&id_buf); 188 189 // Resolve work_queue_id from work_pool_name if provided 190 var insert_params = buildInsertParams(obj); 191 if (insert_params.work_pool_name) |pool_name| { 192 if (db.work_pools.getByName(alloc, pool_name) catch null) |pool| { 193 if (insert_params.work_queue_name) |queue_name| { 194 // Look up specific queue by name 195 if (db.work_queues.getByPoolAndName(alloc, pool.id, queue_name) catch null) |queue| { 196 insert_params.work_queue_id = queue.id; 197 } 198 } else if (pool.default_queue_id) |default_id| { 199 // Use pool's default queue 200 insert_params.work_queue_id = default_id; 201 insert_params.work_queue_name = "default"; 202 } 203 } 204 } 205 206 db.deployments.insert(new_id, name, flow_id, now, insert_params) catch { 207 json_util.sendStatus(r, "{\"detail\":\"insert failed\"}", .internal_server_error); 208 return; 209 }; 210 211 if (obj.get("schedules")) |sched_val| { 212 try schedules.replaceSchedules(alloc, new_id, sched_val, now); 213 } 214 215 const deployment = db.deployments.getById(alloc, new_id) catch null orelse { 216 json_util.sendStatus(r, "{\"detail\":\"not found after insert\"}", .internal_server_error); 217 return; 218 }; 219 const resp = writeDeployment(alloc, deployment) catch { 220 json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 221 return; 222 }; 223 json_util.sendStatus(r, resp, .created); 224} 225 226fn read(r: zap.Request, id: []const u8) !void { 227 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 228 defer arena.deinit(); 229 const alloc = arena.allocator(); 230 231 const deployment = db.deployments.getById(alloc, id) catch null orelse { 232 json_util.sendStatus(r, "{\"detail\":\"Deployment not found\"}", .not_found); 233 return; 234 }; 235 236 const resp = writeDeployment(alloc, deployment) catch { 237 json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 238 return; 239 }; 240 json_util.send(r, resp); 241} 242 243fn getByName(r: zap.Request, target: []const u8) !void { 244 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 245 defer arena.deinit(); 246 const alloc = arena.allocator(); 247 248 const name_idx = mem.indexOf(u8, target, "/name/") orelse { 249 json_util.sendStatus(r, "{\"detail\":\"invalid path\"}", .bad_request); 250 return; 251 }; 252 const after_name = target[name_idx + 6 ..]; 253 const sep_idx = mem.indexOf(u8, after_name, "/") orelse { 254 json_util.sendStatus(r, "{\"detail\":\"deployment name required\"}", .bad_request); 255 return; 256 }; 257 const flow_name = after_name[0..sep_idx]; 258 const deployment_name = after_name[sep_idx + 1 ..]; 259 if (deployment_name.len == 0) { 260 json_util.sendStatus(r, "{\"detail\":\"deployment name required\"}", .bad_request); 261 return; 262 } 263 264 const flow = db.flows.getByName(alloc, flow_name) catch null orelse { 265 json_util.sendStatus(r, "{\"detail\":\"Flow not found\"}", .not_found); 266 return; 267 }; 268 269 const deployment = db.deployments.getByFlowAndName(alloc, flow.id, deployment_name) catch null orelse { 270 json_util.sendStatus(r, "{\"detail\":\"Deployment not found\"}", .not_found); 271 return; 272 }; 273 274 const resp = writeDeployment(alloc, deployment) catch { 275 json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 276 return; 277 }; 278 json_util.send(r, resp); 279} 280 281fn update(r: zap.Request, id: []const u8) !void { 282 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 283 defer arena.deinit(); 284 const alloc = arena.allocator(); 285 286 const body = r.body orelse { 287 json_util.sendStatus(r, "{\"detail\":\"body required\"}", .bad_request); 288 return; 289 }; 290 291 const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch { 292 json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request); 293 return; 294 }; 295 296 var ts_buf: [32]u8 = undefined; 297 const now = time_util.timestamp(&ts_buf); 298 299 const updated = db.deployments.updateById(id, now, buildUpdateParams(parsed.value.object)) catch { 300 json_util.sendStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error); 301 return; 302 }; 303 304 if (!updated) { 305 json_util.sendStatus(r, "{\"detail\":\"Deployment not found\"}", .not_found); 306 return; 307 } 308 309 r.setStatus(.no_content); 310 r.sendBody("") catch {}; 311} 312 313fn delete(r: zap.Request, id: []const u8) !void { 314 const deleted = db.deployments.deleteById(id) catch { 315 json_util.sendStatus(r, "{\"detail\":\"delete failed\"}", .internal_server_error); 316 return; 317 }; 318 319 if (!deleted) { 320 json_util.sendStatus(r, "{\"detail\":\"Deployment not found\"}", .not_found); 321 return; 322 } 323 324 r.setStatus(.no_content); 325 r.sendBody("") catch {}; 326} 327 328fn filter(r: zap.Request) !void { 329 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 330 defer arena.deinit(); 331 const alloc = arena.allocator(); 332 333 var limit: usize = 200; 334 var offset: usize = 0; 335 336 if (r.body) |body| { 337 if (json.parseFromSlice(json.Value, alloc, body, .{})) |parsed| { 338 const obj = parsed.value.object; 339 if (obj.get("limit")) |v| { 340 if (v == .integer) limit = @intCast(v.integer); 341 } 342 if (obj.get("offset")) |v| { 343 if (v == .integer) offset = @intCast(v.integer); 344 } 345 } else |_| {} 346 } 347 348 const deployments_list = db.deployments.list(alloc, limit, offset) catch { 349 json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error); 350 return; 351 }; 352 353 var output: std.io.Writer.Allocating = .init(alloc); 354 var jw: json.Stringify = .{ .writer = &output.writer }; 355 356 jw.beginArray() catch { 357 json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 358 return; 359 }; 360 361 for (deployments_list) |d| { 362 writeDeploymentObject(&jw, d, alloc) catch continue; 363 } 364 365 jw.endArray() catch {}; 366 367 json_util.send(r, output.toOwnedSlice() catch "[]"); 368} 369 370fn count(r: zap.Request) !void { 371 const c = db.deployments.count() catch 0; 372 var buf: [32]u8 = undefined; 373 const resp = std.fmt.bufPrint(&buf, "{d}", .{c}) catch "0"; 374 json_util.send(r, resp); 375} 376 377// Action handlers 378 379fn createFlowRun(r: zap.Request, target: []const u8) !void { 380 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 381 defer arena.deinit(); 382 const alloc = arena.allocator(); 383 384 const id = extractDeploymentId(target) orelse { 385 json_util.sendStatus(r, "{\"detail\":\"deployment id required\"}", .bad_request); 386 return; 387 }; 388 389 const deployment = db.deployments.getById(alloc, id) catch null orelse { 390 json_util.sendStatus(r, "{\"detail\":\"Deployment not found\"}", .not_found); 391 return; 392 }; 393 394 var state_type: []const u8 = "SCHEDULED"; 395 var state_name: []const u8 = "Scheduled"; 396 397 if (r.body) |body| { 398 if (json.parseFromSlice(json.Value, alloc, body, .{})) |parsed| { 399 const obj = parsed.value.object; 400 if (obj.get("state")) |s| { 401 if (s == .object) { 402 if (s.object.get("type")) |t| { 403 if (t == .string) state_type = t.string; 404 } 405 if (s.object.get("name")) |n| { 406 if (n == .string) state_name = n.string; 407 } 408 } 409 } 410 } else |_| {} 411 } 412 413 var id_buf: [36]u8 = undefined; 414 const run_id = uuid_util.generate(&id_buf); 415 416 var ts_buf: [32]u8 = undefined; 417 const now = time_util.timestamp(&ts_buf); 418 419 var name_buf: [64]u8 = undefined; 420 const run_name = std.fmt.bufPrint(&name_buf, "{s}-{s}", .{ 421 deployment.name[0..@min(deployment.name.len, 20)], 422 run_id[0..8], 423 }) catch "run"; 424 425 db.flow_runs.insert(run_id, deployment.flow_id, run_name, state_type, state_name, now, .{ 426 .deployment_id = deployment.id, 427 .deployment_version = deployment.version, 428 .work_queue_name = deployment.work_queue_name, 429 .work_queue_id = deployment.work_queue_id, 430 }) catch { 431 json_util.sendStatus(r, "{\"detail\":\"insert failed\"}", .internal_server_error); 432 return; 433 }; 434 435 const run = db.flow_runs.get(alloc, run_id) catch null orelse { 436 json_util.sendStatus(r, "{\"detail\":\"not found after insert\"}", .internal_server_error); 437 return; 438 }; 439 440 var state_id_buf: [36]u8 = undefined; 441 const state_id = uuid_util.generate(&state_id_buf); 442 443 const resp = writeFlowRunResponse(alloc, run, state_id) catch { 444 json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 445 return; 446 }; 447 json_util.sendStatus(r, resp, .created); 448} 449 450fn pause(r: zap.Request, target: []const u8) !void { 451 const id = extractDeploymentId(target) orelse { 452 json_util.sendStatus(r, "{\"detail\":\"deployment id required\"}", .bad_request); 453 return; 454 }; 455 456 var ts_buf: [32]u8 = undefined; 457 const now = time_util.timestamp(&ts_buf); 458 459 const updated = db.deployments.updateById(id, now, .{ .paused = true }) catch { 460 json_util.sendStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error); 461 return; 462 }; 463 464 if (!updated) { 465 json_util.sendStatus(r, "{\"detail\":\"Deployment not found\"}", .not_found); 466 return; 467 } 468 469 r.setStatus(.no_content); 470 r.sendBody("") catch {}; 471} 472 473fn resume_(r: zap.Request, target: []const u8) !void { 474 const id = extractDeploymentId(target) orelse { 475 json_util.sendStatus(r, "{\"detail\":\"deployment id required\"}", .bad_request); 476 return; 477 }; 478 479 var ts_buf: [32]u8 = undefined; 480 const now = time_util.timestamp(&ts_buf); 481 482 const updated = db.deployments.updateById(id, now, .{ .paused = false }) catch { 483 json_util.sendStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error); 484 return; 485 }; 486 487 if (!updated) { 488 json_util.sendStatus(r, "{\"detail\":\"Deployment not found\"}", .not_found); 489 return; 490 } 491 492 r.setStatus(.no_content); 493 r.sendBody("") catch {}; 494} 495 496fn getScheduledFlowRuns(r: zap.Request) !void { 497 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 498 defer arena.deinit(); 499 const alloc = arena.allocator(); 500 501 const body = r.body orelse { 502 json_util.send(r, "[]"); 503 return; 504 }; 505 506 const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch { 507 json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request); 508 return; 509 }; 510 511 const obj = parsed.value.object; 512 513 const ids_val = obj.get("deployment_ids") orelse { 514 json_util.send(r, "[]"); 515 return; 516 }; 517 518 if (ids_val != .array) { 519 json_util.send(r, "[]"); 520 return; 521 } 522 523 var deployment_ids = std.ArrayListUnmanaged([]const u8){}; 524 for (ids_val.array.items) |item| { 525 if (item == .string) { 526 try deployment_ids.append(alloc, item.string); 527 } 528 } 529 530 if (deployment_ids.items.len == 0) { 531 json_util.send(r, "[]"); 532 return; 533 } 534 535 var scheduled_before: ?[]const u8 = null; 536 if (obj.get("scheduled_before")) |v| { 537 if (v == .string) scheduled_before = normalizeTimestamp(alloc, v.string); 538 } 539 540 var limit: usize = 100; 541 if (obj.get("limit")) |v| { 542 if (v == .integer) limit = @intCast(v.integer); 543 } 544 545 const runs = db.flow_runs.getScheduledByDeployments(alloc, deployment_ids.items, scheduled_before, limit) catch { 546 json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error); 547 return; 548 }; 549 550 var output: std.io.Writer.Allocating = .init(alloc); 551 var jw: json.Stringify = .{ .writer = &output.writer }; 552 553 jw.beginArray() catch { 554 json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 555 return; 556 }; 557 558 for (runs) |run| { 559 var state_id_buf: [36]u8 = undefined; 560 const state_id = uuid_util.generate(&state_id_buf); 561 writeFlowRunObject(&jw, run, state_id) catch continue; 562 } 563 564 jw.endArray() catch {}; 565 566 json_util.send(r, output.toOwnedSlice() catch "[]"); 567} 568 569// Timestamp normalization - convert various timestamp formats to ISO8601 570// Client may send "2026-01-22 16:40:23.915842+00:00" but db stores "2026-01-22T16:40:23.915842Z" 571fn normalizeTimestamp(alloc: std.mem.Allocator, raw: []const u8) ?[]const u8 { 572 // find space between date and time 573 const space_idx = mem.indexOf(u8, raw, " ") orelse return raw; 574 575 var normalized = alloc.alloc(u8, raw.len) catch return raw; 576 @memcpy(normalized, raw); 577 normalized[space_idx] = 'T'; 578 579 // convert +00:00 to Z 580 if (mem.endsWith(u8, normalized, "+00:00")) { 581 normalized[normalized.len - 6] = 'Z'; 582 return normalized[0 .. normalized.len - 5]; 583 } 584 return normalized; 585} 586 587// JSON helpers 588 589fn getString(obj: json.ObjectMap, key: []const u8) ?[]const u8 { 590 const v = obj.get(key) orelse return null; 591 return if (v == .string) v.string else null; 592} 593 594fn getBool(obj: json.ObjectMap, key: []const u8) ?bool { 595 const v = obj.get(key) orelse return null; 596 return if (v == .bool) v.bool else null; 597} 598 599fn getInt(obj: json.ObjectMap, key: []const u8) ?i64 { 600 const v = obj.get(key) orelse return null; 601 return if (v == .integer) v.integer else null; 602} 603 604fn getJsonString(alloc: std.mem.Allocator, obj: json.ObjectMap, key: []const u8) ?[]const u8 { 605 const v = obj.get(key) orelse return null; 606 if (v == .null) return null; 607 return std.fmt.allocPrint(alloc, "{f}", .{json.fmt(v, .{})}) catch null; 608} 609 610fn buildInsertParams(obj: json.ObjectMap) db.deployments.InsertParams { 611 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 612 const alloc = arena.allocator(); 613 614 return .{ 615 .version = getString(obj, "version"), 616 .description = getString(obj, "description"), 617 .paused = getBool(obj, "paused") orelse false, 618 .parameters = getJsonString(alloc, obj, "parameters") orelse "{}", 619 .parameter_openapi_schema = getJsonString(alloc, obj, "parameter_openapi_schema"), 620 .enforce_parameter_schema = getBool(obj, "enforce_parameter_schema") orelse true, 621 .tags = getJsonString(alloc, obj, "tags") orelse "[]", 622 .labels = getJsonString(alloc, obj, "labels") orelse "{}", 623 .path = getString(obj, "path"), 624 .entrypoint = getString(obj, "entrypoint"), 625 .job_variables = getJsonString(alloc, obj, "job_variables") orelse "{}", 626 .pull_steps = getJsonString(alloc, obj, "pull_steps"), 627 .work_pool_name = getString(obj, "work_pool_name"), 628 .work_queue_name = getString(obj, "work_queue_name"), 629 .concurrency_limit = getInt(obj, "concurrency_limit"), 630 }; 631} 632 633fn buildUpdateParams(obj: json.ObjectMap) db.deployments.UpdateParams { 634 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 635 const alloc = arena.allocator(); 636 637 return .{ 638 .version = getString(obj, "version"), 639 .description = getString(obj, "description"), 640 .paused = getBool(obj, "paused"), 641 .parameters = getJsonString(alloc, obj, "parameters"), 642 .parameter_openapi_schema = getJsonString(alloc, obj, "parameter_openapi_schema"), 643 .enforce_parameter_schema = getBool(obj, "enforce_parameter_schema"), 644 .tags = getJsonString(alloc, obj, "tags"), 645 .labels = getJsonString(alloc, obj, "labels"), 646 .path = getString(obj, "path"), 647 .entrypoint = getString(obj, "entrypoint"), 648 .job_variables = getJsonString(alloc, obj, "job_variables"), 649 .pull_steps = getJsonString(alloc, obj, "pull_steps"), 650 .work_pool_name = getString(obj, "work_pool_name"), 651 .work_queue_name = getString(obj, "work_queue_name"), 652 .concurrency_limit = getInt(obj, "concurrency_limit"), 653 }; 654} 655 656// Response serializers 657 658fn writeDeployment(alloc: std.mem.Allocator, d: db.deployments.DeploymentRow) ![]const u8 { 659 var output: std.io.Writer.Allocating = .init(alloc); 660 var jw: json.Stringify = .{ .writer = &output.writer }; 661 try writeDeploymentObject(&jw, d, alloc); 662 return output.toOwnedSlice(); 663} 664 665fn writeDeploymentObject(jw: *json.Stringify, d: db.deployments.DeploymentRow, alloc: std.mem.Allocator) !void { 666 try jw.beginObject(); 667 668 try jw.objectField("id"); 669 try jw.write(d.id); 670 try jw.objectField("created"); 671 try jw.write(d.created); 672 try jw.objectField("updated"); 673 try jw.write(d.updated); 674 try jw.objectField("name"); 675 try jw.write(d.name); 676 try jw.objectField("flow_id"); 677 try jw.write(d.flow_id); 678 try jw.objectField("version"); 679 try jw.write(d.version); 680 try jw.objectField("description"); 681 try jw.write(d.description); 682 try jw.objectField("paused"); 683 try jw.write(d.paused); 684 try jw.objectField("status"); 685 try jw.write(d.status.toString()); 686 try jw.objectField("last_polled"); 687 try jw.write(d.last_polled); 688 try jw.objectField("parameters"); 689 try jw.beginWriteRaw(); 690 try jw.writer.writeAll(d.parameters); 691 jw.endWriteRaw(); 692 try jw.objectField("parameter_openapi_schema"); 693 if (d.parameter_openapi_schema) |s| { 694 try jw.beginWriteRaw(); 695 try jw.writer.writeAll(s); 696 jw.endWriteRaw(); 697 } else { 698 try jw.write(null); 699 } 700 try jw.objectField("enforce_parameter_schema"); 701 try jw.write(d.enforce_parameter_schema); 702 try jw.objectField("tags"); 703 try jw.beginWriteRaw(); 704 try jw.writer.writeAll(d.tags); 705 jw.endWriteRaw(); 706 try jw.objectField("labels"); 707 try jw.beginWriteRaw(); 708 try jw.writer.writeAll(d.labels); 709 jw.endWriteRaw(); 710 try jw.objectField("path"); 711 try jw.write(d.path); 712 try jw.objectField("entrypoint"); 713 try jw.write(d.entrypoint); 714 try jw.objectField("job_variables"); 715 try jw.beginWriteRaw(); 716 try jw.writer.writeAll(d.job_variables); 717 jw.endWriteRaw(); 718 try jw.objectField("pull_steps"); 719 if (d.pull_steps) |ps| { 720 try jw.beginWriteRaw(); 721 try jw.writer.writeAll(ps); 722 jw.endWriteRaw(); 723 } else { 724 try jw.write(null); 725 } 726 try jw.objectField("work_pool_name"); 727 try jw.write(d.work_pool_name); 728 try jw.objectField("work_queue_name"); 729 try jw.write(d.work_queue_name); 730 try jw.objectField("work_queue_id"); 731 try jw.write(d.work_queue_id); 732 try jw.objectField("storage_document_id"); 733 try jw.write(d.storage_document_id); 734 try jw.objectField("infrastructure_document_id"); 735 try jw.write(d.infrastructure_document_id); 736 try jw.objectField("concurrency_limit"); 737 try jw.write(d.concurrency_limit); 738 739 try jw.objectField("schedules"); 740 const sched_list = db.deployment_schedules.listByDeployment(alloc, d.id) catch &[_]db.deployment_schedules.DeploymentScheduleRow{}; 741 try jw.beginArray(); 742 for (sched_list) |s| { 743 try schedules.writeScheduleObject(jw, s); 744 } 745 try jw.endArray(); 746 747 try jw.endObject(); 748} 749 750fn writeFlowRunObject(jw: *json.Stringify, run: db.flow_runs.FlowRunRow, state_id: []const u8) !void { 751 try jw.beginObject(); 752 try jw.objectField("id"); 753 try jw.write(run.id); 754 try jw.objectField("created"); 755 try jw.write(run.created); 756 try jw.objectField("updated"); 757 try jw.write(run.updated); 758 try jw.objectField("name"); 759 try jw.write(run.name); 760 try jw.objectField("flow_id"); 761 try jw.write(run.flow_id); 762 try jw.objectField("deployment_id"); 763 try jw.write(run.deployment_id); 764 try jw.objectField("deployment_version"); 765 try jw.write(run.deployment_version); 766 try jw.objectField("work_queue_name"); 767 try jw.write(run.work_queue_name); 768 try jw.objectField("work_queue_id"); 769 try jw.write(run.work_queue_id); 770 try jw.objectField("state_type"); 771 try jw.write(run.state_type); 772 try jw.objectField("state_name"); 773 try jw.write(run.state_name); 774 try jw.objectField("expected_start_time"); 775 try jw.write(run.expected_start_time); 776 try jw.objectField("next_scheduled_start_time"); 777 try jw.write(run.next_scheduled_start_time); 778 try jw.objectField("start_time"); 779 try jw.write(run.start_time); 780 try jw.objectField("end_time"); 781 try jw.write(run.end_time); 782 try jw.objectField("state"); 783 try jw.beginObject(); 784 try jw.objectField("type"); 785 try jw.write(run.state_type); 786 try jw.objectField("name"); 787 try jw.write(run.state_name); 788 try jw.objectField("timestamp"); 789 try jw.write(run.state_timestamp); 790 try jw.objectField("id"); 791 try jw.write(state_id); 792 try jw.endObject(); 793 try jw.objectField("parameters"); 794 try jw.beginWriteRaw(); 795 try jw.writer.writeAll(run.parameters); 796 jw.endWriteRaw(); 797 try jw.objectField("tags"); 798 try jw.beginWriteRaw(); 799 try jw.writer.writeAll(run.tags); 800 jw.endWriteRaw(); 801 try jw.objectField("auto_scheduled"); 802 try jw.write(run.auto_scheduled); 803 try jw.endObject(); 804} 805 806fn writeFlowRunResponse(alloc: std.mem.Allocator, run: db.flow_runs.FlowRunRow, state_id: []const u8) ![]const u8 { 807 var output: std.io.Writer.Allocating = .init(alloc); 808 var jw: json.Stringify = .{ .writer = &output.writer }; 809 810 try jw.beginObject(); 811 try jw.objectField("id"); 812 try jw.write(run.id); 813 try jw.objectField("created"); 814 try jw.write(run.created); 815 try jw.objectField("updated"); 816 try jw.write(run.updated); 817 try jw.objectField("name"); 818 try jw.write(run.name); 819 try jw.objectField("flow_id"); 820 try jw.write(run.flow_id); 821 try jw.objectField("deployment_id"); 822 try jw.write(run.deployment_id); 823 try jw.objectField("deployment_version"); 824 try jw.write(run.deployment_version); 825 try jw.objectField("work_queue_name"); 826 try jw.write(run.work_queue_name); 827 try jw.objectField("work_queue_id"); 828 try jw.write(run.work_queue_id); 829 try jw.objectField("state_type"); 830 try jw.write(run.state_type); 831 try jw.objectField("state_name"); 832 try jw.write(run.state_name); 833 try jw.objectField("state"); 834 try jw.beginObject(); 835 try jw.objectField("type"); 836 try jw.write(run.state_type); 837 try jw.objectField("name"); 838 try jw.write(run.state_name); 839 try jw.objectField("timestamp"); 840 try jw.write(run.state_timestamp); 841 try jw.objectField("id"); 842 try jw.write(state_id); 843 try jw.endObject(); 844 try jw.objectField("parameters"); 845 try jw.beginWriteRaw(); 846 try jw.writer.writeAll(run.parameters); 847 jw.endWriteRaw(); 848 try jw.objectField("tags"); 849 try jw.beginWriteRaw(); 850 try jw.writer.writeAll(run.tags); 851 jw.endWriteRaw(); 852 try jw.objectField("auto_scheduled"); 853 try jw.write(run.auto_scheduled); 854 try jw.endObject(); 855 856 return output.toOwnedSlice(); 857}