prefect server in zig
1const std = @import("std");
2const zap = @import("zap");
3const mem = std.mem;
4const json = std.json;
5
6const db = @import("../db/sqlite.zig");
7const routing = @import("routing.zig");
8const uuid_util = @import("../utilities/uuid.zig");
9const time_util = @import("../utilities/time.zig");
10const json_util = @import("../utilities/json.zig");
11
/// Entry point for task-run requests; picks a handler from method and path.
///
/// Routes, checked in this order:
///   POST  <any path ending in "/filter">                 -> filter
///   POST  /task_runs/  or  /api/task_runs/               -> create
///   POST  /task_runs/{id}/set_state  (also under /api)   -> setState
///   GET   /task_runs/{id}            (also under /api)   -> read
/// Everything else is answered with a `not_found` status and a JSON detail body.
///
/// A missing path is treated as "/" and a missing method as "GET".
pub fn handle(r: zap.Request) !void {
    const path = r.path orelse "/";
    const verb = r.method orelse "GET";

    const is_post = mem.eql(u8, verb, "POST");
    const is_get = mem.eql(u8, verb, "GET");

    if (is_post) {
        // List: matched purely on the "/filter" suffix.
        if (mem.endsWith(u8, path, "/filter")) return filter(r);

        // Create: exact collection path, with or without the /api prefix.
        const is_collection = mem.eql(u8, path, "/task_runs/") or
            mem.eql(u8, path, "/api/task_runs/");
        if (is_collection) return create(r);

        // State transition: the id sits between the prefix and "/set_state".
        if (mem.endsWith(u8, path, "/set_state")) {
            const maybe_id = routing.extractId(path, "/task_runs/", "/set_state") orelse
                routing.extractId(path, "/api/task_runs/", "/set_state");
            if (maybe_id) |task_run_id| return setState(r, task_run_id);
        }
    } else if (is_get) {
        // Read a single run by id.
        const maybe_id = routing.extractIdAfter(path, "/task_runs/") orelse
            routing.extractIdAfter(path, "/api/task_runs/");
        if (maybe_id) |task_run_id| return read(r, task_run_id);
    }

    // No route matched (or no id could be extracted from the path).
    json_util.sendStatus(r, "{\"detail\":\"not found\"}", .not_found);
}
54
/// Handles task-run creation.
///
/// Expects a JSON object body with:
///   - `task_key`    (string, required)
///   - `dynamic_key` (string, required)
///   - `flow_run_id` (string, optional; any non-string value is treated as absent)
///   - `name`        (string, optional; otherwise `routing.generateRunName` is used)
///   - `state`       (object, optional; its string `type` / `name` override the
///                    "PENDING" / "Pending" defaults)
///
/// If `db.getTaskRunByKey` already finds a run for
/// (flow_run_id, task_key, dynamic_key), that run is sent back via
/// `json_util.send` and nothing is inserted. Otherwise a new row is inserted
/// and echoed back with the `created` status.
///
/// Validation problems are answered with `bad_request`; insert or
/// serialization failures with `internal_server_error`. All scratch memory
/// lives in a per-request arena released on return.
fn create(r: zap.Request) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const body = r.body orelse {
        json_util.sendStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request);
        return;
    };

    const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch {
        json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
        return;
    };

    // Valid JSON is not necessarily an object (`[]`, `"x"`, `1`, ...). Reading
    // `.object` on any other tag is a safety panic / undefined behaviour, so
    // reject those bodies explicitly.
    const obj = switch (parsed.value) {
        .object => |o| o,
        else => {
            json_util.sendStatus(r, "{\"detail\":\"body must be a json object\"}", .bad_request);
            return;
        },
    };

    // required fields
    const task_key = if (obj.get("task_key")) |v| switch (v) {
        .string => |s| s,
        else => {
            json_util.sendStatus(r, "{\"detail\":\"task_key must be string\"}", .bad_request);
            return;
        },
    } else {
        json_util.sendStatus(r, "{\"detail\":\"task_key required\"}", .bad_request);
        return;
    };

    const dynamic_key = if (obj.get("dynamic_key")) |v| switch (v) {
        .string => |s| s,
        else => {
            json_util.sendStatus(r, "{\"detail\":\"dynamic_key must be string\"}", .bad_request);
            return;
        },
    } else {
        json_util.sendStatus(r, "{\"detail\":\"dynamic_key required\"}", .bad_request);
        return;
    };

    // optional fields: anything that is not a string counts as "not provided"
    const flow_run_id: ?[]const u8 = if (obj.get("flow_run_id")) |v| switch (v) {
        .string => |s| s,
        else => null,
    } else null;

    const name = if (obj.get("name")) |v| switch (v) {
        .string => |s| s,
        else => routing.generateRunName(alloc),
    } else routing.generateRunName(alloc);

    // check for existing task run (idempotency); a lookup error is treated the
    // same as "no existing run" and falls through to the insert below
    if (db.getTaskRunByKey(alloc, flow_run_id, task_key, dynamic_key) catch null) |existing| {
        const resp = writeTaskRun(alloc, existing, null) catch {
            json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
            return;
        };
        json_util.send(r, resp);
        return;
    }

    // extract state info, keeping the defaults for anything missing or mistyped
    var state_type: []const u8 = "PENDING";
    var state_name: []const u8 = "Pending";
    if (obj.get("state")) |s| {
        if (s == .object) {
            if (s.object.get("type")) |t| {
                if (t == .string) state_type = t.string;
            }
            if (s.object.get("name")) |n| {
                if (n == .string) state_name = n.string;
            }
        }
    }

    var new_id_buf: [36]u8 = undefined;
    const new_id = uuid_util.generate(&new_id_buf);
    var ts_buf: [32]u8 = undefined;
    const now = time_util.timestamp(&ts_buf);

    db.insertTaskRun(new_id, flow_run_id, name, task_key, dynamic_key, state_type, state_name, now) catch {
        json_util.sendStatus(r, "{\"detail\":\"insert failed\"}", .internal_server_error);
        return;
    };

    // The state id is generated only for the response body; it is not passed
    // to `db.insertTaskRun`.
    var state_id_buf: [36]u8 = undefined;
    const state_id = uuid_util.generate(&state_id_buf);

    const run = db.TaskRunRow{
        .id = new_id,
        .created = now,
        .updated = now,
        .name = name,
        // The row uses "" for "no flow run"; the serializer maps it to null.
        .flow_run_id = flow_run_id orelse "",
        .task_key = task_key,
        .dynamic_key = dynamic_key,
        .state_type = state_type,
        .state_name = state_name,
        .state_timestamp = now,
        .tags = "[]",
        .run_count = 0,
    };

    const resp = writeTaskRun(alloc, run, state_id) catch {
        json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    };
    json_util.sendStatus(r, resp, .created);
}
168
/// Handles reading a single task run by `id`.
///
/// Sends the serialized run on success. When `db.getTaskRun` yields nothing
/// (or fails -- errors are folded into the same path) a `not_found` status is
/// sent; a serialization failure produces `internal_server_error`.
fn read(r: zap.Request, id: []const u8) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    // Lookup errors are deliberately collapsed into "no row".
    const found = db.getTaskRun(alloc, id) catch null;

    if (found) |run| {
        if (writeTaskRun(alloc, run, null)) |payload| {
            json_util.send(r, payload);
        } else |_| {
            json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        }
    } else {
        json_util.sendStatus(r, "{\"detail\":\"task run not found\"}", .not_found);
    }
}
185
/// Handles a state transition for the task run identified by `id`.
///
/// Expects a JSON object body of the form
/// `{"state": {"type": <string>, "name": <string>}}`. `state` is required and
/// must be an object; `type` and `name` are optional and default to
/// "PENDING" / "Pending", but must be strings when present.
///
/// On success the new state is stored via `db.setTaskRunState` and an
/// "ACCEPT" response describing it is sent. Malformed input is answered with
/// `bad_request`; update or serialization failures with
/// `internal_server_error`.
fn setState(r: zap.Request, id: []const u8) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const body = r.body orelse {
        json_util.sendStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request);
        return;
    };

    const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch {
        json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
        return;
    };

    // Every `json.Value` payload access below is guarded by a tag check:
    // the body is untrusted, and reading the wrong union field is a safety
    // panic (or undefined behaviour in unchecked builds).
    const root = switch (parsed.value) {
        .object => |o| o,
        else => {
            json_util.sendStatus(r, "{\"detail\":\"body must be a json object\"}", .bad_request);
            return;
        },
    };

    const state_value = root.get("state") orelse {
        json_util.sendStatus(r, "{\"detail\":\"state required\"}", .bad_request);
        return;
    };

    const state = switch (state_value) {
        .object => |o| o,
        else => {
            json_util.sendStatus(r, "{\"detail\":\"state must be object\"}", .bad_request);
            return;
        },
    };

    const state_type: []const u8 = if (state.get("type")) |v| switch (v) {
        .string => |s| s,
        else => {
            json_util.sendStatus(r, "{\"detail\":\"state type must be string\"}", .bad_request);
            return;
        },
    } else "PENDING";

    const state_name: []const u8 = if (state.get("name")) |v| switch (v) {
        .string => |s| s,
        else => {
            json_util.sendStatus(r, "{\"detail\":\"state name must be string\"}", .bad_request);
            return;
        },
    } else "Pending";

    var ts_buf: [32]u8 = undefined;
    const now = time_util.timestamp(&ts_buf);
    var state_id_buf: [36]u8 = undefined;
    const state_id = uuid_util.generate(&state_id_buf);

    db.setTaskRunState(id, state_id, state_type, state_name, now) catch {
        json_util.sendStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error);
        return;
    };

    const resp = writeStateResponse(alloc, state_type, state_name, now, state_id) catch {
        json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    };
    json_util.send(r, resp);
}
224
/// Handles the task-run listing endpoint.
///
/// Fetches up to `max_runs` rows through `db.listTaskRuns` and sends them as
/// a JSON array. The request body is not inspected.
///
/// A database failure is answered with `internal_server_error`
/// ("database error"). Any serialization failure is answered with
/// `internal_server_error` ("serialize error") instead of sending a partial
/// or silently emptied array.
fn filter(r: zap.Request) !void {
    // Upper bound on the number of rows returned by a single call.
    const max_runs = 50;

    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const runs = db.listTaskRuns(alloc, max_runs) catch {
        json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error);
        return;
    };

    var output: std.Io.Writer.Allocating = .init(alloc);
    var jw: json.Stringify = .{ .writer = &output.writer };

    // Build the whole array or nothing: once a write has failed the stream
    // may contain a half-written object, so the output cannot be salvaged by
    // skipping ahead to the next element.
    const serialized: ?[]const u8 = blk: {
        jw.beginArray() catch break :blk null;
        for (runs) |run| {
            writeTaskRunObject(&jw, run, null) catch break :blk null;
        }
        jw.endArray() catch break :blk null;
        break :blk output.toOwnedSlice() catch null;
    };

    const resp = serialized orelse {
        json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    };
    json_util.send(r, resp);
}
251
/// Serializes one task run to a standalone JSON document.
///
/// `state_id`, when non-null, is emitted as the `id` of the nested `state`
/// object. The returned slice is allocated with `alloc` and owned by the
/// caller. On error the intermediate buffer is released, so nothing leaks
/// even when `alloc` is not an arena.
fn writeTaskRun(alloc: std.mem.Allocator, run: db.TaskRunRow, state_id: ?[]const u8) ![]const u8 {
    var output: std.Io.Writer.Allocating = .init(alloc);
    // Runs only on the error path; on success ownership moves to the caller.
    errdefer output.deinit();

    var jw: json.Stringify = .{ .writer = &output.writer };
    try writeTaskRunObject(&jw, run, state_id);
    return output.toOwnedSlice();
}
258
/// Writes `run` as a single JSON object to `jw`.
///
/// Field order: id, created, updated, name, flow_run_id, task_key,
/// dynamic_key, state_type, state_name, state{type,name,timestamp[,id]},
/// tags, run_count, expected_start_time, start_time, end_time,
/// total_run_time.
///
/// - An empty `run.flow_run_id` is emitted as `null`.
/// - `state.id` is emitted only when `state_id` is non-null.
/// - `run.tags` is written verbatim as raw JSON (it is not validated here);
///   an empty `tags` string is emitted as `[]` so the output stays valid.
/// - The timing fields are always `null`, and `total_run_time` is always 0.
///
/// Any writer error is propagated; the stream may then hold a partial object.
fn writeTaskRunObject(jw: *json.Stringify, run: db.TaskRunRow, state_id: ?[]const u8) !void {
    try jw.beginObject();

    try jw.objectField("id");
    try jw.write(run.id);

    try jw.objectField("created");
    try jw.write(run.created);

    try jw.objectField("updated");
    try jw.write(run.updated);

    try jw.objectField("name");
    try jw.write(run.name);

    try jw.objectField("flow_run_id");
    if (run.flow_run_id.len > 0) {
        try jw.write(run.flow_run_id);
    } else {
        try jw.write(null);
    }

    try jw.objectField("task_key");
    try jw.write(run.task_key);

    try jw.objectField("dynamic_key");
    try jw.write(run.dynamic_key);

    // State is exposed both flattened and as a nested object.
    try jw.objectField("state_type");
    try jw.write(run.state_type);

    try jw.objectField("state_name");
    try jw.write(run.state_name);

    try jw.objectField("state");
    try jw.beginObject();
    try jw.objectField("type");
    try jw.write(run.state_type);
    try jw.objectField("name");
    try jw.write(run.state_name);
    try jw.objectField("timestamp");
    try jw.write(run.state_timestamp);
    if (state_id) |sid| {
        try jw.objectField("id");
        try jw.write(sid);
    }
    try jw.endObject();

    try jw.objectField("tags");
    if (run.tags.len > 0) {
        // Already-encoded JSON text: pass it through untouched.
        try jw.beginWriteRaw();
        try jw.writer.writeAll(run.tags);
        jw.endWriteRaw();
    } else {
        // Writing nothing after the field name would yield `"tags":,`.
        try jw.beginArray();
        try jw.endArray();
    }

    try jw.objectField("run_count");
    try jw.write(run.run_count);

    try jw.objectField("expected_start_time");
    try jw.write(null);

    try jw.objectField("start_time");
    try jw.write(null);

    try jw.objectField("end_time");
    try jw.write(null);

    try jw.objectField("total_run_time");
    try jw.write(@as(i32, 0));

    try jw.endObject();
}
329
/// Builds the JSON body returned after a state transition:
///
///   {"status":"ACCEPT","details":{},
///    "state":{"type":...,"name":...,"timestamp":...,"id":...}}
///
/// `status` is always "ACCEPT" and `details` is always an empty object. The
/// returned slice is allocated with `alloc` and owned by the caller. On error
/// the intermediate buffer is released, so nothing leaks even when `alloc` is
/// not an arena.
fn writeStateResponse(alloc: std.mem.Allocator, state_type: []const u8, state_name: []const u8, timestamp: []const u8, state_id: []const u8) ![]const u8 {
    var output: std.Io.Writer.Allocating = .init(alloc);
    // Runs only on the error path; on success ownership moves to the caller.
    errdefer output.deinit();

    var jw: json.Stringify = .{ .writer = &output.writer };

    try jw.beginObject();

    try jw.objectField("status");
    try jw.write("ACCEPT");

    try jw.objectField("details");
    try jw.beginObject();
    try jw.endObject();

    try jw.objectField("state");
    try jw.beginObject();
    try jw.objectField("type");
    try jw.write(state_type);
    try jw.objectField("name");
    try jw.write(state_name);
    try jw.objectField("timestamp");
    try jw.write(timestamp);
    try jw.objectField("id");
    try jw.write(state_id);
    try jw.endObject();

    try jw.endObject();

    return output.toOwnedSlice();
}
358}