// prefect server in zig
1const std = @import("std");
2const zap = @import("zap");
3const mem = std.mem;
4const json = std.json;
5
6const db = @import("../db/sqlite.zig");
7const uuid_util = @import("../utilities/uuid.zig");
8const time_util = @import("../utilities/time.zig");
9const json_util = @import("../utilities/json.zig");
10
11const schedules = @import("deployment_schedules.zig");
12
/// Routes a request under `/deployments` to the matching handler.
///
/// Matching uses the HTTP method plus a path suffix/substring test, in this
/// precedence: POST action routes, `GET .../name/...`, any path containing
/// `/schedules`, the POST collection root, then the per-id GET/PATCH/DELETE
/// routes. Anything left over is answered with 501 "not implemented".
/// A missing path is treated as "/" and a missing method as "GET".
pub fn handle(r: zap.Request) !void {
    const target = r.path orelse "/";
    const method = r.method orelse "GET";

    const is_post = mem.eql(u8, method, "POST");
    const is_get = mem.eql(u8, method, "GET");

    if (is_post) {
        // POST /deployments/filter
        if (mem.endsWith(u8, target, "/filter")) return filter(r);
        // POST /deployments/count
        if (mem.endsWith(u8, target, "/count")) return count(r);
        // POST /deployments/get_scheduled_flow_runs
        if (mem.endsWith(u8, target, "/get_scheduled_flow_runs")) return getScheduledFlowRuns(r);
        // POST /deployments/{id}/create_flow_run
        if (mem.endsWith(u8, target, "/create_flow_run")) return createFlowRun(r, target);
        // POST /deployments/{id}/pause_deployment
        if (mem.endsWith(u8, target, "/pause_deployment")) return pause(r, target);
        // POST /deployments/{id}/resume_deployment
        if (mem.endsWith(u8, target, "/resume_deployment")) return resume_(r, target);
    } else if (is_get and mem.indexOf(u8, target, "/name/") != null) {
        // GET /deployments/name/{flow_name}/{deployment_name}
        return getByName(r, target);
    }

    // Schedule endpoints (any method) are delegated to the schedules module.
    if (mem.indexOf(u8, target, "/schedules") != null) {
        return schedules.handle(r, target);
    }

    // POST /deployments/ - create deployment. A POST to any other path falls
    // through to the 501 at the bottom.
    if (is_post) {
        const at_root = mem.endsWith(u8, target, "/deployments/") or
            mem.endsWith(u8, target, "/deployments");
        if (at_root) return create(r);
    }

    // Per-id routes: GET / PATCH / DELETE /deployments/{id}
    const Verb = enum { get, patch, delete };
    const verb: Verb = if (is_get)
        .get
    else if (mem.eql(u8, method, "PATCH"))
        .patch
    else if (mem.eql(u8, method, "DELETE"))
        .delete
    else {
        json_util.sendStatus(r, "{\"detail\":\"not implemented\"}", .not_implemented);
        return;
    };

    const id = extractDeploymentId(target) orelse {
        json_util.sendStatus(r, "{\"detail\":\"deployment id required\"}", .bad_request);
        return;
    };

    switch (verb) {
        .get => try read(r, id),
        .patch => try update(r, id),
        .delete => try delete(r, id),
    }
}
106
// Path helper
108
/// Returns the path segment that immediately follows the deployments prefix
/// (`/api/deployments/` or `/deployments/`).
///
/// Returns null when `target` starts with neither prefix or when that segment
/// is empty. The result is a sub-slice of `target`; nothing is allocated.
pub fn extractDeploymentId(target: []const u8) ?[]const u8 {
    const prefixes = [_][]const u8{ "/api/deployments/", "/deployments/" };

    for (prefixes) |prefix| {
        if (!mem.startsWith(u8, target, prefix)) continue;

        // Everything up to the next '/' (or the end of the path) is the id.
        var segments = mem.splitScalar(u8, target[prefix.len..], '/');
        const first = segments.first();
        return if (first.len == 0) null else first;
    }

    return null;
}
125
// CRUD handlers
127
/// POST /deployments/ — creates a deployment, or updates the existing one that
/// has the same flow id and name (upsert).
///
/// The body must be a JSON object with string `name` and `flow_id` fields;
/// otherwise a 400 is sent. A `flow_id` that cannot be loaded yields 404.
/// When a `schedules` key is present the deployment's schedules are replaced.
/// On the update path the refreshed deployment is sent with `json_util.send`;
/// on the insert path it is sent with status `.created`.
///
/// All request-scoped allocations live in an arena released on return.
fn create(r: zap.Request) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const body = r.body orelse {
        json_util.sendStatus(r, "{\"detail\":\"body required\"}", .bad_request);
        return;
    };

    const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch {
        json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
        return;
    };

    // Valid JSON that is not an object (array, string, number, ...) would make
    // the `.object` access below an illegal union access, so reject it here.
    if (parsed.value != .object) {
        json_util.sendStatus(r, "{\"detail\":\"body must be a JSON object\"}", .bad_request);
        return;
    }
    const obj = parsed.value.object;

    const name = getString(obj, "name") orelse {
        json_util.sendStatus(r, "{\"detail\":\"name required\"}", .bad_request);
        return;
    };

    const flow_id = getString(obj, "flow_id") orelse {
        json_util.sendStatus(r, "{\"detail\":\"flow_id required\"}", .bad_request);
        return;
    };

    // Verify flow exists (a lookup error is reported the same way as a miss).
    _ = db.flows.getById(alloc, flow_id) catch null orelse {
        json_util.sendStatus(r, "{\"detail\":\"Flow not found\"}", .not_found);
        return;
    };

    var ts_buf: [32]u8 = undefined;
    const now = time_util.timestamp(&ts_buf);

    // Check for existing deployment (upsert)
    if (db.deployments.getByFlowAndName(alloc, flow_id, name) catch null) |existing| {
        _ = db.deployments.updateById(existing.id, now, buildUpdateParams(obj)) catch {
            json_util.sendStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error);
            return;
        };
        if (obj.get("schedules")) |sched_val| {
            try schedules.replaceSchedules(alloc, existing.id, sched_val, now);
        }
        // Re-read so the response reflects the stored row.
        const deployment = db.deployments.getById(alloc, existing.id) catch null orelse {
            json_util.sendStatus(r, "{\"detail\":\"not found after update\"}", .internal_server_error);
            return;
        };
        const resp = writeDeployment(alloc, deployment) catch {
            json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
            return;
        };
        json_util.send(r, resp);
        return;
    }

    // Create new
    var id_buf: [36]u8 = undefined;
    const new_id = uuid_util.generate(&id_buf);

    // Resolve work_queue_id from work_pool_name if provided
    var insert_params = buildInsertParams(obj);
    if (insert_params.work_pool_name) |pool_name| {
        if (db.work_pools.getByName(alloc, pool_name) catch null) |pool| {
            if (insert_params.work_queue_name) |queue_name| {
                // Look up specific queue by name
                if (db.work_queues.getByPoolAndName(alloc, pool.id, queue_name) catch null) |queue| {
                    insert_params.work_queue_id = queue.id;
                }
            } else if (pool.default_queue_id) |default_id| {
                // Use pool's default queue
                insert_params.work_queue_id = default_id;
                insert_params.work_queue_name = "default";
            }
        }
    }

    db.deployments.insert(new_id, name, flow_id, now, insert_params) catch {
        json_util.sendStatus(r, "{\"detail\":\"insert failed\"}", .internal_server_error);
        return;
    };

    if (obj.get("schedules")) |sched_val| {
        try schedules.replaceSchedules(alloc, new_id, sched_val, now);
    }

    const deployment = db.deployments.getById(alloc, new_id) catch null orelse {
        json_util.sendStatus(r, "{\"detail\":\"not found after insert\"}", .internal_server_error);
        return;
    };
    const resp = writeDeployment(alloc, deployment) catch {
        json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    };
    json_util.sendStatus(r, resp, .created);
}
225
/// GET /deployments/{id} — sends the deployment as JSON.
///
/// Sends 404 when the row cannot be loaded (a lookup error is treated like a
/// miss) and 500 when serialization fails.
fn read(r: zap.Request, id: []const u8) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const maybe_row = db.deployments.getById(alloc, id) catch null;
    if (maybe_row) |row| {
        if (writeDeployment(alloc, row)) |payload| {
            json_util.send(r, payload);
        } else |_| {
            json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        }
    } else {
        json_util.sendStatus(r, "{\"detail\":\"Deployment not found\"}", .not_found);
    }
}
242
/// GET /deployments/name/{flow_name}/{deployment_name} — looks a deployment up
/// by its flow name and its own name and sends it as JSON.
///
/// Everything after the first '/' that follows the flow name is taken as the
/// deployment name. Sends 400 for a malformed path and 404 when either the
/// flow or the deployment cannot be loaded.
fn getByName(r: zap.Request, target: []const u8) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const marker = "/name/";
    const marker_idx = mem.indexOf(u8, target, marker) orelse {
        json_util.sendStatus(r, "{\"detail\":\"invalid path\"}", .bad_request);
        return;
    };

    // "{flow_name}/{deployment_name}"
    const names = target[marker_idx + marker.len ..];
    const slash = mem.indexOfScalar(u8, names, '/') orelse {
        json_util.sendStatus(r, "{\"detail\":\"deployment name required\"}", .bad_request);
        return;
    };

    const flow_name = names[0..slash];
    const deployment_name = names[slash + 1 ..];
    if (deployment_name.len == 0) {
        json_util.sendStatus(r, "{\"detail\":\"deployment name required\"}", .bad_request);
        return;
    }

    const flow = db.flows.getByName(alloc, flow_name) catch null orelse {
        json_util.sendStatus(r, "{\"detail\":\"Flow not found\"}", .not_found);
        return;
    };

    const row = db.deployments.getByFlowAndName(alloc, flow.id, deployment_name) catch null orelse {
        json_util.sendStatus(r, "{\"detail\":\"Deployment not found\"}", .not_found);
        return;
    };

    if (writeDeployment(alloc, row)) |payload| {
        json_util.send(r, payload);
    } else |_| {
        json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
    }
}
280
/// PATCH /deployments/{id} — applies the fields present in the JSON body to
/// the deployment and answers 204 with an empty body.
///
/// Sends 400 when the body is missing, is not valid JSON, or is not a JSON
/// object; 500 when the update call fails; 404 when no row was updated.
fn update(r: zap.Request, id: []const u8) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const body = r.body orelse {
        json_util.sendStatus(r, "{\"detail\":\"body required\"}", .bad_request);
        return;
    };

    const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch {
        json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
        return;
    };

    // A non-object JSON value (e.g. `[]` or `"x"`) would make the `.object`
    // access below an illegal union access, so reject it explicitly.
    if (parsed.value != .object) {
        json_util.sendStatus(r, "{\"detail\":\"body must be a JSON object\"}", .bad_request);
        return;
    }

    var ts_buf: [32]u8 = undefined;
    const now = time_util.timestamp(&ts_buf);

    const updated = db.deployments.updateById(id, now, buildUpdateParams(parsed.value.object)) catch {
        json_util.sendStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error);
        return;
    };

    if (!updated) {
        json_util.sendStatus(r, "{\"detail\":\"Deployment not found\"}", .not_found);
        return;
    }

    r.setStatus(.no_content);
    // Nothing useful can be done if sending the empty body fails.
    r.sendBody("") catch {};
}
312
/// DELETE /deployments/{id} — removes the deployment.
///
/// Answers 204 with an empty body when a row was deleted, 404 when none was,
/// and 500 when the delete call itself fails.
fn delete(r: zap.Request, id: []const u8) !void {
    const removed = db.deployments.deleteById(id) catch {
        json_util.sendStatus(r, "{\"detail\":\"delete failed\"}", .internal_server_error);
        return;
    };

    if (removed) {
        r.setStatus(.no_content);
        r.sendBody("") catch {};
    } else {
        json_util.sendStatus(r, "{\"detail\":\"Deployment not found\"}", .not_found);
    }
}
327
/// POST /deployments/filter — sends a JSON array of deployments.
///
/// Optional integer `limit` (default 200) and `offset` (default 0) are read
/// from a JSON object body. A missing, unparsable or non-object body, and
/// values that are not non-negative integers, leave the defaults in place.
/// A deployment that fails to serialize is skipped.
fn filter(r: zap.Request) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    var limit: usize = 200;
    var offset: usize = 0;

    if (r.body) |body| {
        if (json.parseFromSlice(json.Value, alloc, body, .{})) |parsed| {
            // Only an object carries paging fields; anything else is ignored
            // just like an unparsable body.
            if (parsed.value == .object) {
                const obj = parsed.value.object;
                // std.math.cast yields null for values that do not fit in
                // usize (e.g. negatives), where @intCast would be illegal.
                if (obj.get("limit")) |v| {
                    if (v == .integer) limit = std.math.cast(usize, v.integer) orelse limit;
                }
                if (obj.get("offset")) |v| {
                    if (v == .integer) offset = std.math.cast(usize, v.integer) orelse offset;
                }
            }
        } else |_| {}
    }

    const deployments_list = db.deployments.list(alloc, limit, offset) catch {
        json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error);
        return;
    };

    var output: std.io.Writer.Allocating = .init(alloc);
    var jw: json.Stringify = .{ .writer = &output.writer };

    const serialized = blk: {
        jw.beginArray() catch break :blk false;
        for (deployments_list) |d| {
            // Serialize each element into its own buffer first, so a failure
            // part-way through an element skips it cleanly instead of leaving
            // a half-written object in the response.
            const item = writeDeployment(alloc, d) catch continue;
            jw.beginWriteRaw() catch break :blk false;
            jw.writer.writeAll(item) catch break :blk false;
            jw.endWriteRaw();
        }
        jw.endArray() catch break :blk false;
        break :blk true;
    };

    if (!serialized) {
        json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    }

    json_util.send(r, output.toOwnedSlice() catch "[]");
}
369
/// POST /deployments/count — sends the number of deployments as a bare
/// decimal number.
///
/// A failure of the count query is reported as a 500 "database error" (the
/// same way `filter` reports a failed list query) rather than as a count of 0,
/// which callers could not tell apart from an empty table.
fn count(r: zap.Request) !void {
    const total = db.deployments.count() catch {
        json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error);
        return;
    };

    var buf: [32]u8 = undefined;
    const resp = std.fmt.bufPrint(&buf, "{d}", .{total}) catch "0";
    json_util.send(r, resp);
}
376
// Action handlers
378
/// POST /deployments/{id}/create_flow_run — inserts a flow run for the
/// deployment and sends it back with status `.created`.
///
/// The run starts in state type "SCHEDULED" / name "Scheduled" unless the
/// JSON object body carries a `state` object with string `type` and/or `name`.
/// A missing, unparsable or non-object body leaves the defaults in place.
/// The run name is the deployment name (at most 20 bytes) plus the first
/// 8 characters of the new run id.
fn createFlowRun(r: zap.Request, target: []const u8) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const id = extractDeploymentId(target) orelse {
        json_util.sendStatus(r, "{\"detail\":\"deployment id required\"}", .bad_request);
        return;
    };

    const deployment = db.deployments.getById(alloc, id) catch null orelse {
        json_util.sendStatus(r, "{\"detail\":\"Deployment not found\"}", .not_found);
        return;
    };

    var state_type: []const u8 = "SCHEDULED";
    var state_name: []const u8 = "Scheduled";

    if (r.body) |body| {
        if (json.parseFromSlice(json.Value, alloc, body, .{})) |parsed| {
            // Guard the union access: a non-object body (e.g. `[]`) is
            // ignored like an unparsable one.
            if (parsed.value == .object) {
                if (parsed.value.object.get("state")) |s| {
                    if (s == .object) {
                        if (s.object.get("type")) |t| {
                            if (t == .string) state_type = t.string;
                        }
                        if (s.object.get("name")) |n| {
                            if (n == .string) state_name = n.string;
                        }
                    }
                }
            }
        } else |_| {}
    }

    var id_buf: [36]u8 = undefined;
    const run_id = uuid_util.generate(&id_buf);

    var ts_buf: [32]u8 = undefined;
    const now = time_util.timestamp(&ts_buf);

    // Cut the deployment name to at most 20 bytes, backing up while the cut
    // would land on a UTF-8 continuation byte (10xxxxxx) so a multi-byte
    // character is never split. ASCII names are unaffected.
    var prefix_len: usize = @min(deployment.name.len, 20);
    while (prefix_len > 0 and prefix_len < deployment.name.len and
        (deployment.name[prefix_len] & 0xC0) == 0x80) : (prefix_len -= 1)
    {}

    var name_buf: [64]u8 = undefined;
    const run_name = std.fmt.bufPrint(&name_buf, "{s}-{s}", .{
        deployment.name[0..prefix_len],
        run_id[0..8],
    }) catch "run";

    db.flow_runs.insert(run_id, deployment.flow_id, run_name, state_type, state_name, now, .{
        .deployment_id = deployment.id,
        .deployment_version = deployment.version,
        .work_queue_name = deployment.work_queue_name,
        .work_queue_id = deployment.work_queue_id,
    }) catch {
        json_util.sendStatus(r, "{\"detail\":\"insert failed\"}", .internal_server_error);
        return;
    };

    // Re-read so the response reflects the stored row.
    const run = db.flow_runs.get(alloc, run_id) catch null orelse {
        json_util.sendStatus(r, "{\"detail\":\"not found after insert\"}", .internal_server_error);
        return;
    };

    // The state id in the response is freshly generated here.
    var state_id_buf: [36]u8 = undefined;
    const state_id = uuid_util.generate(&state_id_buf);

    const resp = writeFlowRunResponse(alloc, run, state_id) catch {
        json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    };
    json_util.sendStatus(r, resp, .created);
}
449
/// POST /deployments/{id}/pause_deployment — sets `paused = true`.
///
/// Answers 204 with an empty body on success, 400 when the path has no id,
/// 404 when no row was updated and 500 when the update call fails.
fn pause(r: zap.Request, target: []const u8) !void {
    const deployment_id = extractDeploymentId(target) orelse {
        json_util.sendStatus(r, "{\"detail\":\"deployment id required\"}", .bad_request);
        return;
    };

    var stamp: [32]u8 = undefined;
    const changed = db.deployments.updateById(
        deployment_id,
        time_util.timestamp(&stamp),
        .{ .paused = true },
    ) catch {
        json_util.sendStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error);
        return;
    };

    if (changed) {
        r.setStatus(.no_content);
        r.sendBody("") catch {};
    } else {
        json_util.sendStatus(r, "{\"detail\":\"Deployment not found\"}", .not_found);
    }
}
472
/// POST /deployments/{id}/resume_deployment — sets `paused = false`.
///
/// Answers 204 with an empty body on success, 400 when the path has no id,
/// 404 when no row was updated and 500 when the update call fails.
fn resume_(r: zap.Request, target: []const u8) !void {
    const deployment_id = extractDeploymentId(target) orelse {
        json_util.sendStatus(r, "{\"detail\":\"deployment id required\"}", .bad_request);
        return;
    };

    var stamp: [32]u8 = undefined;
    const changed = db.deployments.updateById(
        deployment_id,
        time_util.timestamp(&stamp),
        .{ .paused = false },
    ) catch {
        json_util.sendStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error);
        return;
    };

    if (changed) {
        r.setStatus(.no_content);
        r.sendBody("") catch {};
    } else {
        json_util.sendStatus(r, "{\"detail\":\"Deployment not found\"}", .not_found);
    }
}
495
/// POST /deployments/get_scheduled_flow_runs — sends a JSON array of scheduled
/// flow runs for the deployments listed in the body.
///
/// Body fields: `deployment_ids` (array; non-string entries are ignored),
/// optional string `scheduled_before` (normalized with `normalizeTimestamp`)
/// and optional integer `limit` (default 100; values that are not
/// non-negative integers keep the default).
///
/// Sends `[]` when there is no body, no `deployment_ids` array, or no string
/// ids in it; 400 when the body is not valid JSON or not a JSON object.
/// A run that fails to serialize is skipped.
fn getScheduledFlowRuns(r: zap.Request) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const body = r.body orelse {
        json_util.send(r, "[]");
        return;
    };

    const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch {
        json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
        return;
    };

    // Guard the union access below against non-object JSON such as `[]`.
    if (parsed.value != .object) {
        json_util.sendStatus(r, "{\"detail\":\"body must be a JSON object\"}", .bad_request);
        return;
    }
    const obj = parsed.value.object;

    const ids_val = obj.get("deployment_ids") orelse {
        json_util.send(r, "[]");
        return;
    };

    if (ids_val != .array) {
        json_util.send(r, "[]");
        return;
    }

    var deployment_ids: std.ArrayListUnmanaged([]const u8) = .empty;
    for (ids_val.array.items) |item| {
        if (item == .string) {
            try deployment_ids.append(alloc, item.string);
        }
    }

    if (deployment_ids.items.len == 0) {
        json_util.send(r, "[]");
        return;
    }

    var scheduled_before: ?[]const u8 = null;
    if (obj.get("scheduled_before")) |v| {
        if (v == .string) scheduled_before = normalizeTimestamp(alloc, v.string);
    }

    var limit: usize = 100;
    if (obj.get("limit")) |v| {
        // std.math.cast yields null for values that do not fit in usize
        // (e.g. negatives), where @intCast would be illegal.
        if (v == .integer) limit = std.math.cast(usize, v.integer) orelse limit;
    }

    const runs = db.flow_runs.getScheduledByDeployments(alloc, deployment_ids.items, scheduled_before, limit) catch {
        json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error);
        return;
    };

    var output: std.io.Writer.Allocating = .init(alloc);
    var jw: json.Stringify = .{ .writer = &output.writer };

    const serialized = blk: {
        jw.beginArray() catch break :blk false;
        for (runs) |run| {
            // Each run gets a freshly generated state id in the response.
            var state_id_buf: [36]u8 = undefined;
            const state_id = uuid_util.generate(&state_id_buf);

            // Serialize into a per-run buffer first, so a failure part-way
            // through a run skips it cleanly instead of leaving a
            // half-written object in the response array.
            var item_out: std.io.Writer.Allocating = .init(alloc);
            var item_jw: json.Stringify = .{ .writer = &item_out.writer };
            writeFlowRunObject(&item_jw, run, state_id) catch continue;
            const item = item_out.toOwnedSlice() catch continue;

            jw.beginWriteRaw() catch break :blk false;
            jw.writer.writeAll(item) catch break :blk false;
            jw.endWriteRaw();
        }
        jw.endArray() catch break :blk false;
        break :blk true;
    };

    if (!serialized) {
        json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    }

    json_util.send(r, output.toOwnedSlice() catch "[]");
}
568
// Timestamp normalization - convert various timestamp formats to ISO8601
// Client may send "2026-01-22 16:40:23.915842+00:00" but db stores "2026-01-22T16:40:23.915842Z"

/// Rewrites a timestamp into the `...T...Z` form described above.
///
/// Two independent rewrites are applied: the first space becomes `T`, and a
/// trailing `+00:00` becomes `Z`. The suffix rewrite also applies to inputs
/// that already use `T`, so "2026-01-22T16:40:23+00:00" ends in `Z` as well.
///
/// Returns `raw` itself when nothing needs changing or when the allocation
/// fails; otherwise a modified copy owned by `alloc`. Never returns null.
fn normalizeTimestamp(alloc: std.mem.Allocator, raw: []const u8) ?[]const u8 {
    const utc_suffix = "+00:00";

    // find space between date and time
    const space_idx = mem.indexOfScalar(u8, raw, ' ');
    const has_utc_offset = mem.endsWith(u8, raw, utc_suffix);
    if (space_idx == null and !has_utc_offset) return raw;

    const normalized = alloc.dupe(u8, raw) catch return raw;
    if (space_idx) |i| normalized[i] = 'T';

    // convert +00:00 to Z: overwrite the '+' and drop the remaining "00:00"
    if (has_utc_offset) {
        const z_idx = normalized.len - utc_suffix.len;
        normalized[z_idx] = 'Z';
        return normalized[0 .. z_idx + 1];
    }
    return normalized;
}
586
// JSON helpers
588
/// Returns the string stored under `key`, or null when the key is absent or
/// its value is not a JSON string. The slice is borrowed from `obj`.
fn getString(obj: json.ObjectMap, key: []const u8) ?[]const u8 {
    if (obj.get(key)) |value| {
        switch (value) {
            .string => |text| return text,
            else => return null,
        }
    }
    return null;
}
593
/// Returns the boolean stored under `key`, or null when the key is absent or
/// its value is not a JSON boolean.
fn getBool(obj: json.ObjectMap, key: []const u8) ?bool {
    if (obj.get(key)) |value| {
        switch (value) {
            .bool => |flag| return flag,
            else => return null,
        }
    }
    return null;
}
598
/// Returns the integer stored under `key`, or null when the key is absent or
/// its value is not a JSON integer.
fn getInt(obj: json.ObjectMap, key: []const u8) ?i64 {
    if (obj.get(key)) |value| {
        switch (value) {
            .integer => |number| return number,
            else => return null,
        }
    }
    return null;
}
603
/// Returns the value stored under `key` re-encoded as JSON text.
///
/// Returns null when the key is absent, when its value is JSON `null`, or
/// when the allocation for the encoded text fails. The returned text is
/// allocated from `alloc`.
fn getJsonString(alloc: std.mem.Allocator, obj: json.ObjectMap, key: []const u8) ?[]const u8 {
    const value = obj.get(key) orelse return null;
    switch (value) {
        .null => return null,
        else => {},
    }
    const encoded = std.fmt.allocPrint(alloc, "{f}", .{json.fmt(value, .{})}) catch return null;
    return encoded;
}
609
/// Builds the insert parameters for a new deployment from a request object.
///
/// Plain strings, booleans and integers are read directly from `obj`;
/// structured fields are re-encoded as JSON text. Absent fields fall back to
/// the defaults spelled out below (`paused` false, `enforce_parameter_schema`
/// true, `"{}"` / `"[]"` for the JSON fields that have a default).
///
/// NOTE(review): the arena created here is never deinitialized, so the JSON
/// text allocated for each call is never freed. Releasing it needs the caller
/// to supply the allocator, which means a signature change.
fn buildInsertParams(obj: json.ObjectMap) db.deployments.InsertParams {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    const alloc = arena.allocator();

    // JSON-encoded fields (allocated from the arena above).
    const parameters = getJsonString(alloc, obj, "parameters") orelse "{}";
    const openapi_schema = getJsonString(alloc, obj, "parameter_openapi_schema");
    const tags = getJsonString(alloc, obj, "tags") orelse "[]";
    const labels = getJsonString(alloc, obj, "labels") orelse "{}";
    const job_variables = getJsonString(alloc, obj, "job_variables") orelse "{}";
    const pull_steps = getJsonString(alloc, obj, "pull_steps");

    return .{
        .version = getString(obj, "version"),
        .description = getString(obj, "description"),
        .paused = getBool(obj, "paused") orelse false,
        .parameters = parameters,
        .parameter_openapi_schema = openapi_schema,
        .enforce_parameter_schema = getBool(obj, "enforce_parameter_schema") orelse true,
        .tags = tags,
        .labels = labels,
        .path = getString(obj, "path"),
        .entrypoint = getString(obj, "entrypoint"),
        .job_variables = job_variables,
        .pull_steps = pull_steps,
        .work_pool_name = getString(obj, "work_pool_name"),
        .work_queue_name = getString(obj, "work_queue_name"),
        .concurrency_limit = getInt(obj, "concurrency_limit"),
    };
}
632
/// Builds the update parameters for a deployment from a request object.
///
/// Every field is optional: a field that is absent from `obj` (or has the
/// wrong JSON type) is passed as null. Structured fields are re-encoded as
/// JSON text.
///
/// NOTE(review): the arena created here is never deinitialized, so the JSON
/// text allocated for each call is never freed. Releasing it needs the caller
/// to supply the allocator, which means a signature change.
fn buildUpdateParams(obj: json.ObjectMap) db.deployments.UpdateParams {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    const alloc = arena.allocator();

    // JSON-encoded fields (allocated from the arena above).
    const parameters = getJsonString(alloc, obj, "parameters");
    const openapi_schema = getJsonString(alloc, obj, "parameter_openapi_schema");
    const tags = getJsonString(alloc, obj, "tags");
    const labels = getJsonString(alloc, obj, "labels");
    const job_variables = getJsonString(alloc, obj, "job_variables");
    const pull_steps = getJsonString(alloc, obj, "pull_steps");

    return .{
        .version = getString(obj, "version"),
        .description = getString(obj, "description"),
        .paused = getBool(obj, "paused"),
        .parameters = parameters,
        .parameter_openapi_schema = openapi_schema,
        .enforce_parameter_schema = getBool(obj, "enforce_parameter_schema"),
        .tags = tags,
        .labels = labels,
        .path = getString(obj, "path"),
        .entrypoint = getString(obj, "entrypoint"),
        .job_variables = job_variables,
        .pull_steps = pull_steps,
        .work_pool_name = getString(obj, "work_pool_name"),
        .work_queue_name = getString(obj, "work_queue_name"),
        .concurrency_limit = getInt(obj, "concurrency_limit"),
    };
}
655
// Response serializers
657
/// Serializes one deployment row to JSON text allocated from `alloc`.
/// `alloc` is also handed to `writeDeploymentObject`, which uses it for the
/// schedule lookup. Any serialization error is returned to the caller.
fn writeDeployment(alloc: std.mem.Allocator, d: db.deployments.DeploymentRow) ![]const u8 {
    var buffer: std.io.Writer.Allocating = .init(alloc);
    var stringifier: json.Stringify = .{ .writer = &buffer.writer };

    try writeDeploymentObject(&stringifier, d, alloc);

    const bytes = try buffer.toOwnedSlice();
    return bytes;
}
664
/// Writes one deployment row as a JSON object through `jw`.
///
/// Columns that already hold JSON text (`parameters`, `tags`, `labels`,
/// `job_variables`, and the optional `parameter_openapi_schema` and
/// `pull_steps`) are written raw, without re-encoding. The deployment's
/// schedules are loaded with `alloc` and written as a `schedules` array; if
/// that lookup fails an empty array is written instead.
fn writeDeploymentObject(jw: *json.Stringify, d: db.deployments.DeploymentRow, alloc: std.mem.Allocator) !void {
    // Small local emitters so each field below is a single line.
    const emit = struct {
        /// Writes `"key": <value>` using the stringifier's normal encoding.
        fn field(w: *json.Stringify, key: []const u8, value: anytype) !void {
            try w.objectField(key);
            try w.write(value);
        }

        /// Writes `"key": <text>` where `text` is already JSON.
        fn raw(w: *json.Stringify, key: []const u8, text: []const u8) !void {
            try w.objectField(key);
            try w.beginWriteRaw();
            try w.writer.writeAll(text);
            w.endWriteRaw();
        }

        /// Like `raw`, but writes JSON null when `text` is null.
        fn rawOrNull(w: *json.Stringify, key: []const u8, text: ?[]const u8) !void {
            try w.objectField(key);
            if (text) |present| {
                try w.beginWriteRaw();
                try w.writer.writeAll(present);
                w.endWriteRaw();
            } else {
                try w.write(null);
            }
        }
    };

    try jw.beginObject();

    try emit.field(jw, "id", d.id);
    try emit.field(jw, "created", d.created);
    try emit.field(jw, "updated", d.updated);
    try emit.field(jw, "name", d.name);
    try emit.field(jw, "flow_id", d.flow_id);
    try emit.field(jw, "version", d.version);
    try emit.field(jw, "description", d.description);
    try emit.field(jw, "paused", d.paused);
    try emit.field(jw, "status", d.status.toString());
    try emit.field(jw, "last_polled", d.last_polled);
    try emit.raw(jw, "parameters", d.parameters);
    try emit.rawOrNull(jw, "parameter_openapi_schema", d.parameter_openapi_schema);
    try emit.field(jw, "enforce_parameter_schema", d.enforce_parameter_schema);
    try emit.raw(jw, "tags", d.tags);
    try emit.raw(jw, "labels", d.labels);
    try emit.field(jw, "path", d.path);
    try emit.field(jw, "entrypoint", d.entrypoint);
    try emit.raw(jw, "job_variables", d.job_variables);
    try emit.rawOrNull(jw, "pull_steps", d.pull_steps);
    try emit.field(jw, "work_pool_name", d.work_pool_name);
    try emit.field(jw, "work_queue_name", d.work_queue_name);
    try emit.field(jw, "work_queue_id", d.work_queue_id);
    try emit.field(jw, "storage_document_id", d.storage_document_id);
    try emit.field(jw, "infrastructure_document_id", d.infrastructure_document_id);
    try emit.field(jw, "concurrency_limit", d.concurrency_limit);

    try jw.objectField("schedules");
    // A failed schedule lookup degrades to an empty list rather than failing
    // the whole deployment.
    const sched_rows = db.deployment_schedules.listByDeployment(alloc, d.id) catch &[_]db.deployment_schedules.DeploymentScheduleRow{};
    try jw.beginArray();
    for (sched_rows) |sched| {
        try schedules.writeScheduleObject(jw, sched);
    }
    try jw.endArray();

    try jw.endObject();
}
749
/// Writes one flow run row as a JSON object through `jw`.
///
/// The nested `state` object repeats the run's state type and name, adds the
/// state timestamp, and uses the caller-supplied `state_id` as its id.
/// `parameters` and `tags` already hold JSON text and are written raw.
fn writeFlowRunObject(jw: *json.Stringify, run: db.flow_runs.FlowRunRow, state_id: []const u8) !void {
    // Small local emitters so each field below is a single line.
    const emit = struct {
        /// Writes `"key": <value>` using the stringifier's normal encoding.
        fn field(w: *json.Stringify, key: []const u8, value: anytype) !void {
            try w.objectField(key);
            try w.write(value);
        }

        /// Writes `"key": <text>` where `text` is already JSON.
        fn raw(w: *json.Stringify, key: []const u8, text: []const u8) !void {
            try w.objectField(key);
            try w.beginWriteRaw();
            try w.writer.writeAll(text);
            w.endWriteRaw();
        }
    };

    try jw.beginObject();

    try emit.field(jw, "id", run.id);
    try emit.field(jw, "created", run.created);
    try emit.field(jw, "updated", run.updated);
    try emit.field(jw, "name", run.name);
    try emit.field(jw, "flow_id", run.flow_id);
    try emit.field(jw, "deployment_id", run.deployment_id);
    try emit.field(jw, "deployment_version", run.deployment_version);
    try emit.field(jw, "work_queue_name", run.work_queue_name);
    try emit.field(jw, "work_queue_id", run.work_queue_id);
    try emit.field(jw, "state_type", run.state_type);
    try emit.field(jw, "state_name", run.state_name);
    try emit.field(jw, "expected_start_time", run.expected_start_time);
    try emit.field(jw, "next_scheduled_start_time", run.next_scheduled_start_time);
    try emit.field(jw, "start_time", run.start_time);
    try emit.field(jw, "end_time", run.end_time);

    // Nested state object.
    try jw.objectField("state");
    try jw.beginObject();
    try emit.field(jw, "type", run.state_type);
    try emit.field(jw, "name", run.state_name);
    try emit.field(jw, "timestamp", run.state_timestamp);
    try emit.field(jw, "id", state_id);
    try jw.endObject();

    try emit.raw(jw, "parameters", run.parameters);
    try emit.raw(jw, "tags", run.tags);
    try emit.field(jw, "auto_scheduled", run.auto_scheduled);

    try jw.endObject();
}
805
/// Serializes one flow run row to JSON text allocated from `alloc`.
///
/// Delegates to `writeFlowRunObject` so that a single-run response has the
/// same shape as an element of the scheduled-runs list. Compared with the
/// previous hand-written copy, the output additionally carries
/// `expected_start_time`, `next_scheduled_start_time`, `start_time` and
/// `end_time`; all earlier fields are still present with the same values.
/// `state_id` becomes the id of the nested `state` object.
fn writeFlowRunResponse(alloc: std.mem.Allocator, run: db.flow_runs.FlowRunRow, state_id: []const u8) ![]const u8 {
    var output: std.io.Writer.Allocating = .init(alloc);
    var jw: json.Stringify = .{ .writer = &output.writer };

    try writeFlowRunObject(&jw, run, state_id);

    return output.toOwnedSlice();
}