prefect server in zig

refactor: use std.json.Stringify pattern for flows/flow_runs/task_runs

replaces manual bufPrint JSON templating with streaming Stringify API:
- extract common sendJson helpers to utilities/json.zig
- add writeFlowObject, writeFlowRunObject, writeTaskRunObject helpers
- handle nullable fields and nested objects properly
- use beginWriteRaw/endWriteRaw for pre-formatted JSON from db

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+387 -278
+150 -105
src/api/flow_runs.zig
··· 7 7 const routing = @import("routing.zig"); 8 8 const uuid_util = @import("../utilities/uuid.zig"); 9 9 const time_util = @import("../utilities/time.zig"); 10 + const json_util = @import("../utilities/json.zig"); 10 11 const orchestration = @import("../orchestration/orchestration.zig"); 11 - 12 - fn sendJson(r: zap.Request, body: []const u8) void { 13 - r.setHeader("content-type", "application/json") catch {}; 14 - r.setHeader("access-control-allow-origin", "*") catch {}; 15 - r.setHeader("access-control-allow-methods", "GET, POST, PATCH, DELETE, OPTIONS") catch {}; 16 - r.setHeader("access-control-allow-headers", "content-type, x-prefect-api-version") catch {}; 17 - r.sendBody(body) catch {}; 18 - } 19 - 20 - fn sendJsonStatus(r: zap.Request, body: []const u8, status: zap.http.StatusCode) void { 21 - r.setStatus(status); 22 - sendJson(r, body); 23 - } 24 12 25 13 // POST /flow_runs/ - create flow run 26 14 // GET /flow_runs/{id} - read flow run ··· 62 50 } 63 51 } 64 52 65 - sendJsonStatus(r, "{\"detail\":\"not found\"}", .not_found); 53 + json_util.sendStatus(r, "{\"detail\":\"not found\"}", .not_found); 66 54 } 67 55 68 56 fn create(r: zap.Request) !void { ··· 71 59 const alloc = arena.allocator(); 72 60 73 61 const body = r.body orelse { 74 - sendJsonStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request); 62 + json_util.sendStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request); 75 63 return; 76 64 }; 77 65 78 66 const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch { 79 - sendJsonStatus(r, "{\"detail\":\"invalid json\"}", .bad_request); 67 + json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request); 80 68 return; 81 69 }; 82 70 ··· 84 72 const raw_flow_id = if (obj.get("flow_id")) |v| switch (v) { 85 73 .string => |s| s, 86 74 else => { 87 - sendJsonStatus(r, "{\"detail\":\"flow_id must be string\"}", .bad_request); 75 + json_util.sendStatus(r, "{\"detail\":\"flow_id must be string\"}", .bad_request); 88 76 return; 89 77 }, 90 78 } else { 91 - sendJsonStatus(r, "{\"detail\":\"flow_id required\"}", .bad_request); 79 + json_util.sendStatus(r, "{\"detail\":\"flow_id required\"}", .bad_request); 92 80 return; 93 81 }; 94 82 ··· 115 103 const now = time_util.timestamp(&ts_buf); 116 104 117 105 db.insertFlowRun(new_id, flow_id, name, state_type, state_name, now) catch { 118 - sendJsonStatus(r, "{\"detail\":\"insert failed\"}", .internal_server_error); 106 + json_util.sendStatus(r, "{\"detail\":\"insert failed\"}", .internal_server_error); 119 107 return; 120 108 }; 121 109 122 - // use stack buffer for response 123 110 var state_id_buf: [36]u8 = undefined; 124 111 const state_id = uuid_util.generate(&state_id_buf); 125 - var resp_buf: [2048]u8 = undefined; 126 - const response = std.fmt.bufPrint(&resp_buf, 127 - \\{{"id":"{s}","created":"{s}","updated":"{s}","name":"{s}","flow_id":"{s}","state_type":"{s}","state_name":"{s}","state":{{"type":"{s}","name":"{s}","timestamp":"{s}","id":"{s}"}},"parameters":{{}},"tags":[],"run_count":0,"expected_start_time":null,"start_time":null,"end_time":null,"total_run_time":0.0}} 128 - , .{ new_id, now, now, name, flow_id, state_type, state_name, state_type, state_name, now, state_id }) catch { 129 - sendJsonStatus(r, "{\"detail\":\"format error\"}", .internal_server_error); 130 - return; 112 + 113 + const run = db.FlowRunRow{ 114 + .id = new_id, 115 + .created = now, 116 + .updated = now, 117 + .name = name, 118 + .flow_id = flow_id, 119 + .state_type = state_type, 120 + .state_name = state_name, 121 + .state_timestamp = now, 122 + .parameters = "{}", 123 + .tags = "[]", 124 + .run_count = 0, 125 + .expected_start_time = null, 126 + .start_time = null, 127 + .end_time = null, 128 + .total_run_time = 0.0, 131 129 }; 132 130 133 - sendJsonStatus(r, response, .created); 131 + const resp = writeFlowRun(alloc, run, state_id) catch { 132 + json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 133 + return; 134 + }; 135 + json_util.sendStatus(r, resp, .created); 134 136 } 135 137 136 138 fn read(r: zap.Request, id: []const u8) !void { ··· 139 141 const alloc = arena.allocator(); 140 142 141 143 const run = db.getFlowRun(alloc, id) catch null orelse { 142 - sendJsonStatus(r, "{\"detail\":\"flow run not found\"}", .not_found); 144 + json_util.sendStatus(r, "{\"detail\":\"flow run not found\"}", .not_found); 143 145 return; 144 146 }; 145 147 146 - var resp_buf: [2048]u8 = undefined; 147 - var fbs = std.io.fixedBufferStream(&resp_buf); 148 - const writer = fbs.writer(); 149 - 150 - std.fmt.format(writer, 151 - \\{{"id":"{s}","created":"{s}","updated":"{s}","name":"{s}","flow_id":"{s}","state_type":"{s}","state_name":"{s}","state":{{"type":"{s}","name":"{s}","timestamp":"{s}"}},"parameters":{s},"tags":{s},"run_count":{d},"expected_start_time": 152 - , .{ run.id, run.created, run.updated, run.name, run.flow_id, run.state_type, run.state_name, run.state_type, run.state_name, run.state_timestamp, run.parameters, run.tags, run.run_count }) catch { 153 - sendJsonStatus(r, "{\"detail\":\"format error\"}", .internal_server_error); 148 + const resp = writeFlowRun(alloc, run, null) catch { 149 + json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 154 150 return; 155 151 }; 156 - 157 - // write optional time fields 158 - if (run.expected_start_time) |t| { 159 - std.fmt.format(writer, "\"{s}\"", .{t}) catch {}; 160 - } else { 161 - writer.writeAll("null") catch {}; 162 - } 163 - writer.writeAll(",\"start_time\":") catch {}; 164 - if (run.start_time) |t| { 165 - std.fmt.format(writer, "\"{s}\"", .{t}) catch {}; 166 - } else { 167 - writer.writeAll("null") catch {}; 168 - } 169 - writer.writeAll(",\"end_time\":") catch {}; 170 - if (run.end_time) |t| { 171 - std.fmt.format(writer, "\"{s}\"", .{t}) catch {}; 172 - } else { 173 - writer.writeAll("null") catch {}; 174 - } 175 - std.fmt.format(writer, ",\"total_run_time\":{d:.6}}}", .{run.total_run_time}) catch {}; 176 - 177 - sendJson(r, fbs.getWritten()); 152 + json_util.send(r, resp); 178 153 } 179 154 180 155 fn setState(r: zap.Request, id: []const u8) !void { ··· 183 158 const alloc = arena.allocator(); 184 159 185 160 const body = r.body orelse { 186 - sendJsonStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request); 161 + json_util.sendStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request); 187 162 return; 188 163 }; 189 164 190 165 const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch { 191 - sendJsonStatus(r, "{\"detail\":\"invalid json\"}", .bad_request); 166 + json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request); 192 167 return; 193 168 }; 194 169 195 170 const state = parsed.value.object.get("state") orelse { 196 - sendJsonStatus(r, "{\"detail\":\"state required\"}", .bad_request); 171 + json_util.sendStatus(r, "{\"detail\":\"state required\"}", .bad_request); 197 172 return; 198 173 }; 199 174 ··· 228 203 229 204 // atomic state transition with orchestration data 230 205 db.setFlowRunState(id, state_id, state_type, state_name, now, ctx.new_start_time, ctx.new_end_time, ctx.new_run_count, ctx.new_total_run_time) catch { 231 - sendJsonStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error); 206 + json_util.sendStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error); 232 207 return; 233 208 }; 234 209 235 - var resp_buf: [512]u8 = undefined; 236 - const response = std.fmt.bufPrint(&resp_buf, 237 - \\{{"status":"ACCEPT","details":{{}},"state":{{"type":"{s}","name":"{s}","timestamp":"{s}","id":"{s}"}}}} 238 - , .{ state_type, state_name, now, state_id }) catch { 239 - sendJsonStatus(r, "{\"detail\":\"format error\"}", .internal_server_error); 210 + const resp = writeStateResponse(alloc, state_type, state_name, now, state_id) catch { 211 + json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 240 212 return; 241 213 }; 242 - 243 - sendJson(r, response); 214 + json_util.send(r, resp); 244 215 } 245 216 246 217 fn filter(r: zap.Request) !void { ··· 249 220 const alloc = arena.allocator(); 250 221 251 222 const runs = db.listFlowRuns(alloc, 50) catch { 252 - sendJsonStatus(r, "{\"detail\":\"database error\"}", .internal_server_error); 223 + json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error); 253 224 return; 254 225 }; 255 226 256 - // use fixed buffer for filter response (increased for time fields) 257 - var resp_buf: [65536]u8 = undefined; 258 - var fbs = std.io.fixedBufferStream(&resp_buf); 259 - const writer = fbs.writer(); 227 + var output: std.Io.Writer.Allocating = .init(alloc); 228 + var jw: json.Stringify = .{ .writer = &output.writer }; 260 229 261 - writer.writeAll("[") catch { 262 - sendJsonStatus(r, "{\"detail\":\"format error\"}", .internal_server_error); 230 + jw.beginArray() catch { 231 + json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 263 232 return; 264 233 }; 265 234 266 - for (runs, 0..) |run, i| { 267 - if (i > 0) writer.writeAll(",") catch continue; 235 + for (runs) |run| { 236 + writeFlowRunObject(&jw, run, null) catch continue; 237 + } 238 + 239 + jw.endArray() catch {}; 240 + 241 + json_util.send(r, output.toOwnedSlice() catch "[]"); 242 + } 268 243 269 - std.fmt.format(writer, 270 - \\{{"id":"{s}","created":"{s}","updated":"{s}","name":"{s}","flow_id":"{s}","state_type":"{s}","state_name":"{s}","state":{{"type":"{s}","name":"{s}","timestamp":"{s}"}},"parameters":{s},"tags":{s},"run_count":{d},"expected_start_time": 271 - , .{ run.id, run.created, run.updated, run.name, run.flow_id, run.state_type, run.state_name, run.state_type, run.state_name, run.state_timestamp, run.parameters, run.tags, run.run_count }) catch continue; 244 + fn writeFlowRun(alloc: std.mem.Allocator, run: db.FlowRunRow, state_id: ?[]const u8) ![]const u8 { 245 + var output: std.Io.Writer.Allocating = .init(alloc); 246 + var jw: json.Stringify = .{ .writer = &output.writer }; 247 + try writeFlowRunObject(&jw, run, state_id); 248 + return output.toOwnedSlice(); 249 + } 272 250 273 - // write optional time fields 274 - if (run.expected_start_time) |t| { 275 - std.fmt.format(writer, "\"{s}\"", .{t}) catch {}; 276 - } else { 277 - writer.writeAll("null") catch {}; 278 - } 279 - writer.writeAll(",\"start_time\":") catch {}; 280 - if (run.start_time) |t| { 281 - std.fmt.format(writer, "\"{s}\"", .{t}) catch {}; 282 - } else { 283 - writer.writeAll("null") catch {}; 284 - } 285 - writer.writeAll(",\"end_time\":") catch {}; 286 - if (run.end_time) |t| { 287 - std.fmt.format(writer, "\"{s}\"", .{t}) catch {}; 288 - } else { 289 - writer.writeAll("null") catch {}; 290 - } 291 - std.fmt.format(writer, ",\"total_run_time\":{d:.6}}}", .{run.total_run_time}) catch {}; 251 + fn writeFlowRunObject(jw: *json.Stringify, run: db.FlowRunRow, state_id: ?[]const u8) !void { 252 + try jw.beginObject(); 253 + 254 + try jw.objectField("id"); 255 + try jw.write(run.id); 256 + 257 + try jw.objectField("created"); 258 + try jw.write(run.created); 259 + 260 + try jw.objectField("updated"); 261 + try jw.write(run.updated); 262 + 263 + try jw.objectField("name"); 264 + try jw.write(run.name); 265 + 266 + try jw.objectField("flow_id"); 267 + try jw.write(run.flow_id); 268 + 269 + try jw.objectField("state_type"); 270 + try jw.write(run.state_type); 271 + 272 + try jw.objectField("state_name"); 273 + try jw.write(run.state_name); 274 + 275 + try jw.objectField("state"); 276 + try jw.beginObject(); 277 + try jw.objectField("type"); 278 + try jw.write(run.state_type); 279 + try jw.objectField("name"); 280 + try jw.write(run.state_name); 281 + try jw.objectField("timestamp"); 282 + try jw.write(run.state_timestamp); 283 + if (state_id) |sid| { 284 + try jw.objectField("id"); 285 + try jw.write(sid); 292 286 } 287 + try jw.endObject(); 288 + 289 + try jw.objectField("parameters"); 290 + try jw.beginWriteRaw(); 291 + try jw.writer.writeAll(run.parameters); 292 + jw.endWriteRaw(); 293 + 294 + try jw.objectField("tags"); 295 + try jw.beginWriteRaw(); 296 + try jw.writer.writeAll(run.tags); 297 + jw.endWriteRaw(); 298 + 299 + try jw.objectField("run_count"); 300 + try jw.write(run.run_count); 293 301 294 - writer.writeAll("]") catch { 295 - sendJsonStatus(r, "{\"detail\":\"format error\"}", .internal_server_error); 296 - return; 297 - }; 302 + try jw.objectField("expected_start_time"); 303 + try jw.write(run.expected_start_time); 304 + 305 + try jw.objectField("start_time"); 306 + try jw.write(run.start_time); 307 + 308 + try jw.objectField("end_time"); 309 + try jw.write(run.end_time); 310 + 311 + try jw.objectField("total_run_time"); 312 + try jw.write(run.total_run_time); 313 + 314 + try jw.endObject(); 315 + } 316 + 317 + fn writeStateResponse(alloc: std.mem.Allocator, state_type: []const u8, state_name: []const u8, timestamp: []const u8, state_id: []const u8) ![]const u8 { 318 + var output: std.Io.Writer.Allocating = .init(alloc); 319 + var jw: json.Stringify = .{ .writer = &output.writer }; 320 + 321 + try jw.beginObject(); 322 + 323 + try jw.objectField("status"); 324 + try jw.write("ACCEPT"); 325 + 326 + try jw.objectField("details"); 327 + try jw.beginObject(); 328 + try jw.endObject(); 329 + 330 + try jw.objectField("state"); 331 + try jw.beginObject(); 332 + try jw.objectField("type"); 333 + try jw.write(state_type); 334 + try jw.objectField("name"); 335 + try jw.write(state_name); 336 + try jw.objectField("timestamp"); 337 + try jw.write(timestamp); 338 + try jw.objectField("id"); 339 + try jw.write(state_id); 340 + try jw.endObject(); 341 + 342 + try jw.endObject(); 298 343 299 - sendJson(r, fbs.getWritten()); 344 + return output.toOwnedSlice(); 300 345 }
+64 -55
src/api/flows.zig
··· 6 6 const db = @import("../db/sqlite.zig"); 7 7 const uuid_util = @import("../utilities/uuid.zig"); 8 8 const time_util = @import("../utilities/time.zig"); 9 - 10 - fn sendJson(r: zap.Request, body: []const u8) void { 11 - r.setHeader("content-type", "application/json") catch {}; 12 - r.setHeader("access-control-allow-origin", "*") catch {}; 13 - r.setHeader("access-control-allow-methods", "GET, POST, PATCH, DELETE, OPTIONS") catch {}; 14 - r.setHeader("access-control-allow-headers", "content-type, x-prefect-api-version") catch {}; 15 - r.sendBody(body) catch {}; 16 - } 17 - 18 - fn sendJsonStatus(r: zap.Request, body: []const u8, status: zap.http.StatusCode) void { 19 - r.setStatus(status); 20 - sendJson(r, body); 21 - } 9 + const json_util = @import("../utilities/json.zig"); 22 10 23 11 // POST /flows/ - create or get flow by name 24 12 // POST /flows/filter - list flows ··· 42 30 const flow_id = target[prefix.len..]; 43 31 try getFlow(r, flow_id); 44 32 } else { 45 - sendJsonStatus(r, "{\"detail\":\"flow id required\"}", .bad_request); 33 + json_util.sendStatus(r, "{\"detail\":\"flow id required\"}", .bad_request); 46 34 } 47 35 } else { 48 - sendJsonStatus(r, "{\"detail\":\"not implemented\"}", .not_implemented); 36 + json_util.sendStatus(r, "{\"detail\":\"not implemented\"}", .not_implemented); 49 37 } 50 38 } 51 39 ··· 54 42 defer arena.deinit(); 55 43 const alloc = arena.allocator(); 56 44 57 - // read request body 58 45 const body = r.body orelse { 59 - sendJsonStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request); 46 + json_util.sendStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request); 60 47 return; 61 48 }; 62 49 63 - // parse json to get name 64 50 const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch { 65 - sendJsonStatus(r, "{\"detail\":\"invalid json\"}", .bad_request); 51 + json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request); 66 52 return; 67 53 }; 68 54 69 55 const name = parsed.value.object.get("name") orelse { 70 - sendJsonStatus(r, "{\"detail\":\"name required\"}", .bad_request); 56 + json_util.sendStatus(r, "{\"detail\":\"name required\"}", .bad_request); 71 57 return; 72 58 }; 73 59 const name_str = name.string; 74 60 75 61 // try to get existing flow first 76 62 if (db.getFlowByName(alloc, name_str) catch null) |flow| { 77 - var resp_buf: [512]u8 = undefined; 78 - const response = std.fmt.bufPrint(&resp_buf, 79 - \\{{"id":"{s}","created":"{s}","updated":"{s}","name":"{s}","tags":{s}}} 80 - , .{ flow.id, flow.created, flow.updated, flow.name, flow.tags }) catch { 81 - sendJsonStatus(r, "{\"detail\":\"format error\"}", .internal_server_error); 63 + const resp = writeFlow(alloc, flow) catch { 64 + json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 82 65 return; 83 66 }; 84 - sendJson(r, response); 67 + json_util.send(r, resp); 85 68 return; 86 69 } 87 70 ··· 90 73 const new_id = uuid_util.generate(&new_id_buf); 91 74 92 75 db.insertFlow(new_id, name_str) catch { 93 - sendJsonStatus(r, "{\"detail\":\"insert failed\"}", .internal_server_error); 76 + json_util.sendStatus(r, "{\"detail\":\"insert failed\"}", .internal_server_error); 94 77 return; 95 78 }; 96 79 97 80 var ts_buf: [32]u8 = undefined; 98 81 const now = time_util.timestamp(&ts_buf); 99 82 100 - var resp_buf: [512]u8 = undefined; 101 - const response = std.fmt.bufPrint(&resp_buf, 102 - \\{{"id":"{s}","created":"{s}","updated":"{s}","name":"{s}","tags":[]}} 103 - , .{ new_id, now, now, name_str }) catch { 104 - sendJsonStatus(r, "{\"detail\":\"format error\"}", .internal_server_error); 105 - return; 83 + const flow = db.FlowRow{ 84 + .id = new_id, 85 + .created = now, 86 + .updated = now, 87 + .name = name_str, 88 + .tags = "[]", 106 89 }; 107 90 108 - sendJsonStatus(r, response, .created); 91 + const resp = writeFlow(alloc, flow) catch { 92 + json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 93 + return; 94 + }; 95 + json_util.sendStatus(r, resp, .created); 109 96 } 110 97 111 98 fn getFlow(r: zap.Request, flow_id: []const u8) !void { ··· 114 101 const alloc = arena.allocator(); 115 102 116 103 if (db.getFlowById(alloc, flow_id) catch null) |flow| { 117 - var resp_buf: [512]u8 = undefined; 118 - const response = std.fmt.bufPrint(&resp_buf, 119 - \\{{"id":"{s}","created":"{s}","updated":"{s}","name":"{s}","tags":{s}}} 120 - , .{ flow.id, flow.created, flow.updated, flow.name, flow.tags }) catch { 121 - sendJsonStatus(r, "{\"detail\":\"format error\"}", .internal_server_error); 104 + const resp = writeFlow(alloc, flow) catch { 105 + json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 122 106 return; 123 107 }; 124 - sendJson(r, response); 108 + json_util.send(r, resp); 125 109 } else { 126 - sendJsonStatus(r, "{\"detail\":\"flow not found\"}", .not_found); 110 + json_util.sendStatus(r, "{\"detail\":\"flow not found\"}", .not_found); 127 111 } 128 112 } 129 113 ··· 133 117 const alloc = arena.allocator(); 134 118 135 119 const flows = db.listFlows(alloc, 50) catch { 136 - sendJsonStatus(r, "{\"detail\":\"database error\"}", .internal_server_error); 120 + json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error); 137 121 return; 138 122 }; 139 123 140 - var resp_buf: [32768]u8 = undefined; 141 - var fbs = std.io.fixedBufferStream(&resp_buf); 142 - const writer = fbs.writer(); 124 + var output: std.Io.Writer.Allocating = .init(alloc); 125 + var jw: json.Stringify = .{ .writer = &output.writer }; 143 126 144 - writer.writeAll("[") catch { 145 - sendJsonStatus(r, "{\"detail\":\"format error\"}", .internal_server_error); 127 + jw.beginArray() catch { 128 + json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 146 129 return; 147 130 }; 148 131 149 - for (flows, 0..) |flow, i| { 150 - if (i > 0) writer.writeAll(",") catch continue; 151 - std.fmt.format(writer, "{{\"id\":\"{s}\",\"created\":\"{s}\",\"updated\":\"{s}\",\"name\":\"{s}\",\"tags\":{s}}}", .{ flow.id, flow.created, flow.updated, flow.name, flow.tags }) catch continue; 132 + for (flows) |flow| { 133 + writeFlowObject(&jw, flow) catch continue; 152 134 } 153 135 154 - writer.writeAll("]") catch { 155 - sendJsonStatus(r, "{\"detail\":\"format error\"}", .internal_server_error); 156 - return; 157 - }; 136 + jw.endArray() catch {}; 137 + 138 + json_util.send(r, output.toOwnedSlice() catch "[]"); 139 + } 140 + 141 + fn writeFlow(alloc: std.mem.Allocator, flow: db.FlowRow) ![]const u8 { 142 + var output: std.Io.Writer.Allocating = .init(alloc); 143 + var jw: json.Stringify = .{ .writer = &output.writer }; 144 + try writeFlowObject(&jw, flow); 145 + return output.toOwnedSlice(); 146 + } 147 + 148 + fn writeFlowObject(jw: *json.Stringify, flow: db.FlowRow) !void { 149 + try jw.beginObject(); 150 + 151 + try jw.objectField("id"); 152 + try jw.write(flow.id); 153 + 154 + try jw.objectField("created"); 155 + try jw.write(flow.created); 156 + 157 + try jw.objectField("updated"); 158 + try jw.write(flow.updated); 158 159 159 - sendJson(r, fbs.getWritten()); 160 + try jw.objectField("name"); 161 + try jw.write(flow.name); 162 + 163 + try jw.objectField("tags"); 164 + try jw.beginWriteRaw(); 165 + try jw.writer.writeAll(flow.tags); 166 + jw.endWriteRaw(); 167 + 168 + try jw.endObject(); 160 169 }
+156 -118
src/api/task_runs.zig
··· 7 7 const routing = @import("routing.zig"); 8 8 const uuid_util = @import("../utilities/uuid.zig"); 9 9 const time_util = @import("../utilities/time.zig"); 10 - 11 - fn sendJson(r: zap.Request, body: []const u8) void { 12 - r.setHeader("content-type", "application/json") catch {}; 13 - r.setHeader("access-control-allow-origin", "*") catch {}; 14 - r.setHeader("access-control-allow-methods", "GET, POST, PATCH, DELETE, OPTIONS") catch {}; 15 - r.setHeader("access-control-allow-headers", "content-type, x-prefect-api-version") catch {}; 16 - r.sendBody(body) catch {}; 17 - } 18 - 19 - fn sendJsonStatus(r: zap.Request, body: []const u8, status: zap.http.StatusCode) void { 20 - r.setStatus(status); 21 - sendJson(r, body); 22 - } 10 + const json_util = @import("../utilities/json.zig"); 23 11 24 12 // POST /task_runs/ - create task run 25 13 // POST /task_runs/filter - list task runs ··· 61 49 } 62 50 } 63 51 64 - sendJsonStatus(r, "{\"detail\":\"not found\"}", .not_found); 52 + json_util.sendStatus(r, "{\"detail\":\"not found\"}", .not_found); 65 53 } 66 54 67 55 fn create(r: zap.Request) !void { ··· 70 58 const alloc = arena.allocator(); 71 59 72 60 const body = r.body orelse { 73 - sendJsonStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request); 61 + json_util.sendStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request); 74 62 return; 75 63 }; 76 64 77 65 const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch { 78 - sendJsonStatus(r, "{\"detail\":\"invalid json\"}", .bad_request); 66 + json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request); 79 67 return; 80 68 }; 81 69 ··· 85 73 const task_key = if (obj.get("task_key")) |v| switch (v) { 86 74 .string => |s| s, 87 75 else => { 88 - sendJsonStatus(r, "{\"detail\":\"task_key must be string\"}", .bad_request); 76 + json_util.sendStatus(r, "{\"detail\":\"task_key must be string\"}", .bad_request); 89 77 return; 90 78 }, 91 79 } else { 92 - sendJsonStatus(r, "{\"detail\":\"task_key required\"}", .bad_request); 80 + json_util.sendStatus(r, "{\"detail\":\"task_key required\"}", .bad_request); 93 81 return; 94 82 }; 95 83 96 84 const dynamic_key = if (obj.get("dynamic_key")) |v| switch (v) { 97 85 .string => |s| s, 98 86 else => { 99 - sendJsonStatus(r, "{\"detail\":\"dynamic_key must be string\"}", .bad_request); 87 + json_util.sendStatus(r, "{\"detail\":\"dynamic_key must be string\"}", .bad_request); 100 88 return; 101 89 }, 102 90 } else { 103 - sendJsonStatus(r, "{\"detail\":\"dynamic_key required\"}", .bad_request); 91 + json_util.sendStatus(r, "{\"detail\":\"dynamic_key required\"}", .bad_request); 104 92 return; 105 93 }; 106 94 ··· 121 109 122 110 // check for existing task run (idempotency) 123 111 if (db.getTaskRunByKey(alloc, flow_run_id, task_key, dynamic_key) catch null) |existing| { 124 - var resp_buf: [2048]u8 = undefined; 125 - const response = formatTaskRunResponse(&resp_buf, existing) catch { 126 - sendJsonStatus(r, "{\"detail\":\"format error\"}", .internal_server_error); 112 + const resp = writeTaskRun(alloc, existing, null) catch { 113 + json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 127 114 return; 128 115 }; 129 - sendJson(r, response); 116 + json_util.send(r, resp); 130 117 return; 131 118 } 132 119 ··· 150 137 const now = time_util.timestamp(&ts_buf); 151 138 152 139 db.insertTaskRun(new_id, flow_run_id, name, task_key, dynamic_key, state_type, state_name, now) catch { 153 - sendJsonStatus(r, "{\"detail\":\"insert failed\"}", .internal_server_error); 140 + json_util.sendStatus(r, "{\"detail\":\"insert failed\"}", .internal_server_error); 154 141 return; 155 142 }; 156 143 157 144 var state_id_buf: [36]u8 = undefined; 158 145 const state_id = uuid_util.generate(&state_id_buf); 159 146 160 - var resp_buf: [2048]u8 = undefined; 161 - const response = std.fmt.bufPrint(&resp_buf, 162 - \\{{"id":"{s}","created":"{s}","updated":"{s}","name":"{s}","flow_run_id":{s},"task_key":"{s}","dynamic_key":"{s}","state_type":"{s}","state_name":"{s}","state":{{"type":"{s}","name":"{s}","timestamp":"{s}","id":"{s}"}},"tags":[],"run_count":0,"expected_start_time":null,"start_time":null,"end_time":null,"total_run_time":0}} 163 - , .{ 164 - new_id, 165 - now, 166 - now, 167 - name, 168 - if (flow_run_id) |frid| std.fmt.allocPrint(alloc, "\"{s}\"", .{frid}) catch "null" else "null", 169 - task_key, 170 - dynamic_key, 171 - state_type, 172 - state_name, 173 - state_type, 174 - state_name, 175 - now, 176 - state_id, 177 - }) catch { 178 - sendJsonStatus(r, "{\"detail\":\"format error\"}", .internal_server_error); 147 + const run = db.TaskRunRow{ 148 + .id = new_id, 149 + .created = now, 150 + .updated = now, 151 + .name = name, 152 + .flow_run_id = flow_run_id orelse "", 153 + .task_key = task_key, 154 + .dynamic_key = dynamic_key, 155 + .state_type = state_type, 156 + .state_name = state_name, 157 + .state_timestamp = now, 158 + .tags = "[]", 159 + .run_count = 0, 160 + }; 161 + 162 + const resp = writeTaskRun(alloc, run, state_id) catch { 163 + json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 179 164 return; 180 165 }; 181 - 182 - sendJsonStatus(r, response, .created); 166 + json_util.sendStatus(r, resp, .created); 183 167 } 184 168 185 169 fn read(r: zap.Request, id: []const u8) !void { ··· 188 172 const alloc = arena.allocator(); 189 173 190 174 const run = db.getTaskRun(alloc, id) catch null orelse { 191 - sendJsonStatus(r, "{\"detail\":\"task run not found\"}", .not_found); 175 + json_util.sendStatus(r, "{\"detail\":\"task run not found\"}", .not_found); 192 176 return; 193 177 }; 194 178 195 - var resp_buf: [2048]u8 = undefined; 196 - const response = formatTaskRunResponse(&resp_buf, run) catch { 197 - sendJsonStatus(r, "{\"detail\":\"format error\"}", .internal_server_error); 179 + const resp = writeTaskRun(alloc, run, null) catch { 180 + json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 198 181 return; 199 182 }; 200 - 201 - sendJson(r, response); 183 + json_util.send(r, resp); 202 184 } 203 185 204 186 fn setState(r: zap.Request, id: []const u8) !void { ··· 207 189 const alloc = arena.allocator(); 208 190 209 191 const body = r.body orelse { 210 - sendJsonStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request); 192 + json_util.sendStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request); 211 193 return; 212 194 }; 213 195 214 196 const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch { 215 - sendJsonStatus(r, "{\"detail\":\"invalid json\"}", .bad_request); 197 + json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request); 216 198 return; 217 199 }; 218 200 219 201 const state = parsed.value.object.get("state") orelse { 220 - sendJsonStatus(r, "{\"detail\":\"state required\"}", .bad_request); 202 + json_util.sendStatus(r, "{\"detail\":\"state required\"}", .bad_request); 221 203 return; 222 204 }; 223 205 ··· 229 211 const state_id = uuid_util.generate(&state_id_buf); 230 212 231 213 db.setTaskRunState(id, state_id, state_type, state_name, now) catch { 232 - sendJsonStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error); 214 + json_util.sendStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error); 233 215 return; 234 216 }; 235 217 236 - var resp_buf: [512]u8 = undefined; 237 - const response = std.fmt.bufPrint(&resp_buf, 238 - \\{{"status":"ACCEPT","details":{{}},"state":{{"type":"{s}","name":"{s}","timestamp":"{s}","id":"{s}"}}}} 239 - , .{ state_type, state_name, now, state_id }) catch { 240 - sendJsonStatus(r, "{\"detail\":\"format error\"}", .internal_server_error); 218 + const resp = writeStateResponse(alloc, state_type, state_name, now, state_id) catch { 219 + json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 241 220 return; 242 221 }; 243 - 244 - sendJson(r, response); 245 - } 246 - 247 - fn formatTaskRunResponse(buf: *[2048]u8, run: db.TaskRunRow) ![]const u8 { 248 - const flow_run_id_json = if (run.flow_run_id.len > 0) 249 - run.flow_run_id 250 - else 251 - "null"; 252 - 253 - // format flow_run_id as quoted string or null 254 - var frid_buf: [64]u8 = undefined; 255 - const frid_str = if (run.flow_run_id.len > 0) 256 - std.fmt.bufPrint(&frid_buf, "\"{s}\"", .{flow_run_id_json}) catch "null" 257 - else 258 - "null"; 259 - 260 - return std.fmt.bufPrint(buf, 261 - \\{{"id":"{s}","created":"{s}","updated":"{s}","name":"{s}","flow_run_id":{s},"task_key":"{s}","dynamic_key":"{s}","state_type":"{s}","state_name":"{s}","state":{{"type":"{s}","name":"{s}","timestamp":"{s}"}},"tags":{s},"run_count":{d},"expected_start_time":null,"start_time":null,"end_time":null,"total_run_time":0}} 262 - , .{ 263 - run.id, 264 - run.created, 265 - run.updated, 266 - run.name, 267 - frid_str, 268 - run.task_key, 269 - run.dynamic_key, 270 - run.state_type, 271 - run.state_name, 272 - run.state_type, 273 - run.state_name, 274 - run.state_timestamp, 275 - run.tags, 276 - run.run_count, 277 - }); 222 + json_util.send(r, resp); 278 223 } 279 224 280 225 fn filter(r: zap.Request) !void { ··· 283 228 const alloc = arena.allocator(); 284 229 285 230 const runs = db.listTaskRuns(alloc, 50) catch { 286 - sendJsonStatus(r, "{\"detail\":\"database error\"}", .internal_server_error); 231 + json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error); 287 232 return; 288 233 }; 289 234 290 - var resp_buf: [65536]u8 = undefined; 291 - var fbs = std.io.fixedBufferStream(&resp_buf); 292 - const writer = fbs.writer(); 235 + var output: std.Io.Writer.Allocating = .init(alloc); 236 + var jw: json.Stringify = .{ .writer = &output.writer }; 293 237 294 - writer.writeAll("[") catch { 295 - sendJsonStatus(r, "{\"detail\":\"format error\"}", .internal_server_error); 238 + jw.beginArray() catch { 239 + json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 296 240 return; 297 241 }; 298 242 299 - for (runs, 0..) |run, i| { 300 - if (i > 0) writer.writeAll(",") catch continue; 243 + for (runs) |run| { 244 + writeTaskRunObject(&jw, run, null) catch continue; 245 + } 246 + 247 + jw.endArray() catch {}; 248 + 249 + json_util.send(r, output.toOwnedSlice() catch "[]"); 250 + } 251 + 252 + fn writeTaskRun(alloc: std.mem.Allocator, run: db.TaskRunRow, state_id: ?[]const u8) ![]const u8 { 253 + var output: std.Io.Writer.Allocating = .init(alloc); 254 + var jw: json.Stringify = .{ .writer = &output.writer }; 255 + try writeTaskRunObject(&jw, run, state_id); 256 + return output.toOwnedSlice(); 257 + } 258 + 259 + fn writeTaskRunObject(jw: *json.Stringify, run: db.TaskRunRow, state_id: ?[]const u8) !void { 260 + try jw.beginObject(); 261 + 262 + try jw.objectField("id"); 263 + try jw.write(run.id); 264 + 265 + try jw.objectField("created"); 266 + try jw.write(run.created); 267 + 268 + try jw.objectField("updated"); 269 + try jw.write(run.updated); 301 270 302 - // format flow_run_id as quoted string or null 303 - var frid_buf: [64]u8 = undefined; 304 - const frid_str = if (run.flow_run_id.len > 0) 305 - std.fmt.bufPrint(&frid_buf, "\"{s}\"", .{run.flow_run_id}) catch "null" 306 - else 307 - "null"; 271 + try jw.objectField("name"); 272 + try jw.write(run.name); 308 273 309 - std.fmt.format(writer, 310 - \\{{"id":"{s}","created":"{s}","updated":"{s}","name":"{s}","flow_run_id":{s},"task_key":"{s}","dynamic_key":"{s}","state_type":"{s}","state_name":"{s}","state":{{"type":"{s}","name":"{s}","timestamp":"{s}"}},"tags":{s},"run_count":{d}}} 311 - , .{ run.id, run.created, run.updated, run.name, frid_str, run.task_key, run.dynamic_key, run.state_type, run.state_name, run.state_type, run.state_name, run.state_timestamp, run.tags, run.run_count }) catch continue; 274 + try jw.objectField("flow_run_id"); 275 + if (run.flow_run_id.len > 0) { 276 + try jw.write(run.flow_run_id); 277 + } else { 278 + try jw.write(null); 279 + } 280 + 281 + try jw.objectField("task_key"); 282 + try jw.write(run.task_key); 283 + 284 + try jw.objectField("dynamic_key"); 285 + try jw.write(run.dynamic_key); 286 + 287 + try jw.objectField("state_type"); 288 + try jw.write(run.state_type); 289 + 290 + try jw.objectField("state_name"); 291 + try jw.write(run.state_name); 292 + 293 + try jw.objectField("state"); 294 + try jw.beginObject(); 295 + try jw.objectField("type"); 296 + try jw.write(run.state_type); 297 + try jw.objectField("name"); 298 + try jw.write(run.state_name); 299 + try jw.objectField("timestamp"); 300 + try jw.write(run.state_timestamp); 301 + if (state_id) |sid| { 302 + try jw.objectField("id"); 303 + try jw.write(sid); 312 304 } 305 + try jw.endObject(); 313 306 314 - writer.writeAll("]") catch { 315 - sendJsonStatus(r, "{\"detail\":\"format error\"}", .internal_server_error); 316 - return; 317 - }; 307 + try jw.objectField("tags"); 308 + try jw.beginWriteRaw(); 309 + try jw.writer.writeAll(run.tags); 310 + jw.endWriteRaw(); 318 311 319 - sendJson(r, fbs.getWritten()); 312 + try jw.objectField("run_count"); 313 + try jw.write(run.run_count); 314 + 315 + try jw.objectField("expected_start_time"); 316 + try jw.write(null); 317 + 318 + try jw.objectField("start_time"); 319 + try jw.write(null); 320 + 321 + try jw.objectField("end_time"); 322 + try jw.write(null); 323 + 324 + try jw.objectField("total_run_time"); 325 + try jw.write(@as(i32, 0)); 326 + 327 + try jw.endObject(); 328 + } 329 + 330 + fn writeStateResponse(alloc: std.mem.Allocator, state_type: []const u8, state_name: []const u8, timestamp: []const u8, state_id: []const u8) ![]const u8 { 331 + var output: std.Io.Writer.Allocating = .init(alloc); 332 + var jw: json.Stringify = .{ .writer = &output.writer }; 333 + 334 + try jw.beginObject(); 335 + 336 + try jw.objectField("status"); 337 + try jw.write("ACCEPT"); 338 + 339 + try jw.objectField("details"); 340 + try jw.beginObject(); 341 + try jw.endObject(); 342 + 343 + try jw.objectField("state"); 344 + try jw.beginObject(); 345 + try jw.objectField("type"); 346 + try jw.write(state_type); 347 + try jw.objectField("name"); 348 + try jw.write(state_name); 349 + try jw.objectField("timestamp"); 350 + try jw.write(timestamp); 351 + try jw.objectField("id"); 352 + try jw.write(state_id); 353 + try jw.endObject(); 354 + 355 + try jw.endObject(); 356 + 357 + return output.toOwnedSlice(); 320 358 }
+17
src/utilities/json.zig
··· 1 + const std = @import("std"); 2 + const zap = @import("zap"); 3 + 4 + /// send json response with cors headers 5 + pub fn send(r: zap.Request, body: []const u8) void { 6 + r.setHeader("content-type", "application/json") catch {}; 7 + r.setHeader("access-control-allow-origin", "*") catch {}; 8 + r.setHeader("access-control-allow-methods", "GET, POST, PATCH, DELETE, OPTIONS") catch {}; 9 + r.setHeader("access-control-allow-headers", "content-type, x-prefect-api-version") catch {}; 10 + r.sendBody(body) catch {}; 11 + } 12 + 13 + /// send json response with status code 14 + pub fn sendStatus(r: zap.Request, body: []const u8, status: zap.http.StatusCode) void { 15 + r.setStatus(status); 16 + send(r, body); 17 + }