prefect server in zig
1const std = @import("std");
2const zap = @import("zap");
3const mem = std.mem;
4const json = std.json;
5
6const db = @import("../db/sqlite.zig");
7const uuid_util = @import("../utilities/uuid.zig");
8const time_util = @import("../utilities/time.zig");
9
10const ascii = std.ascii;
11
/// Report whether `haystack` begins with `prefix`, treating ASCII letters
/// as equal regardless of case. An empty `prefix` matches every haystack.
fn startsWithIgnoreCase(haystack: []const u8, prefix: []const u8) bool {
    if (prefix.len > haystack.len) return false;
    // Compare only the leading window that is exactly as long as the prefix.
    return ascii.eqlIgnoreCase(haystack[0..prefix.len], prefix);
}
20
// JSON output types

/// Shape of a block type as it is serialized into API responses.
/// Field order here is the key order produced by the JSON writer.
const BlockTypeJson = struct {
    // Required on every block type.
    id: []const u8,
    created: []const u8,
    updated: []const u8,
    name: []const u8,
    slug: []const u8,

    // Optional presentation metadata; serialized as `null` when absent.
    logo_url: ?[]const u8 = null,
    documentation_url: ?[]const u8 = null,
    description: ?[]const u8 = null,
    code_example: ?[]const u8 = null,

    is_protected: bool = false,

    /// Build the response shape from a database row. Values are taken
    /// from the row as-is; nothing is duplicated or reformatted here.
    fn fromRow(row: db.block_types.BlockTypeRow) BlockTypeJson {
        const out: BlockTypeJson = .{
            .id = row.id,
            .created = row.created,
            .updated = row.updated,
            .name = row.name,
            .slug = row.slug,
            .logo_url = row.logo_url,
            .documentation_url = row.documentation_url,
            .description = row.description,
            .code_example = row.code_example,
            .is_protected = row.is_protected,
        };
        return out;
    }
};
49
/// Send `body` on `r` with a JSON content type and the permissive CORS
/// headers used by the handlers in this file.
///
/// Delivery stays best-effort: no failure is returned to the caller.
/// Each failed step is logged instead of being discarded silently, so a
/// broken response leaves a trace.
fn sendJson(r: zap.Request, body: []const u8) void {
    const headers = [_][2][]const u8{
        .{ "content-type", "application/json" },
        .{ "access-control-allow-origin", "*" },
        .{ "access-control-allow-methods", "GET, POST, PATCH, DELETE, OPTIONS" },
        .{ "access-control-allow-headers", "content-type, x-prefect-api-version" },
    };

    for (headers) |header| {
        r.setHeader(header[0], header[1]) catch |err| {
            // Keep going: a missing header should not stop the body from being sent.
            std.log.warn("block_types: failed to set header '{s}': {s}", .{ header[0], @errorName(err) });
        };
    }

    r.sendBody(body) catch |err| {
        std.log.err("block_types: failed to send response body: {s}", .{@errorName(err)});
    };
}
57
/// Respond with `body` as JSON under the given HTTP `status`.
/// The status is recorded on the request first; `sendJson` then adds the
/// headers and writes the body.
fn sendJsonStatus(r: zap.Request, body: []const u8, status: zap.http.StatusCode) void {
    r.setStatus(status);
    return sendJson(r, body);
}
62
/// Dispatch a block-type request to the matching handler.
///
/// Routes (a leading `/api` is stripped when it fronts `/block_types`):
///   POST  /block_types/                                        - create
///   POST  /block_types/filter                                  - list
///   GET   /block_types/slug/{slug}                             - read by slug
///   PATCH /block_types/{id}                                    - update
///   GET   /block_types/slug/{slug}/block_documents             - list documents for type
///   GET   /block_types/slug/{slug}/block_documents/name/{name} - get document by name
///
/// A request that matches none of these receives a 404 JSON body.
pub fn handle(r: zap.Request) !void {
    const api_prefix = "/api";
    const slug_prefix = "/block_types/slug/";
    const id_prefix = "/block_types/";

    const raw_path = r.path orelse "/";
    const verb = r.method orelse "GET";

    // strip /api prefix if present
    const path = if (mem.startsWith(u8, raw_path, api_prefix ++ "/block_types"))
        raw_path[api_prefix.len..]
    else
        raw_path;

    if (mem.eql(u8, verb, "POST")) {
        // The /filter suffix is tested before the exact create paths.
        if (mem.endsWith(u8, path, "/filter")) return filter(r);
        if (mem.eql(u8, path, "/block_types/") or mem.eql(u8, path, "/block_types")) return create(r);
    } else if (mem.eql(u8, verb, "GET")) {
        // Order matters: document-by-name, then document list, then the
        // plain slug lookup, which would otherwise match the other two.
        if (mem.indexOf(u8, path, "/block_documents/name/") != null) return getDocumentByName(r, path);
        if (mem.endsWith(u8, path, "/block_documents")) return listDocumentsForType(r, path);
        if (mem.startsWith(u8, path, slug_prefix)) return getBySlug(r, path[slug_prefix.len..]);
    } else if (mem.eql(u8, verb, "PATCH")) {
        if (mem.startsWith(u8, path, id_prefix)) {
            const id = path[id_prefix.len..];
            // Ids shorter than 32 characters are not routed and end up as 404.
            if (id.len >= 32) return update(r, id);
        }
    }

    sendJsonStatus(r, "{\"detail\":\"not found\"}", .not_found);
}
122
/// Return the string stored under `key` in `obj`, or null when the key is
/// absent or holds any non-string JSON value.
fn jsonStringField(obj: json.ObjectMap, key: []const u8) ?[]const u8 {
    const value = obj.get(key) orelse return null;
    return switch (value) {
        .string => |s| s,
        else => null,
    };
}

/// POST /block_types/ — create a block type from the JSON request body.
///
/// Responses:
///   201 with the created block type on success
///   400 when the body is missing, is not valid JSON, is not a JSON
///       object, or lacks a string `name` / `slug`
///   403 when `name` starts with "prefect" (case-insensitive)
///   409 when the insert reports a constraint violation
///   500 on any other insert or serialization failure
///
/// The request body is untrusted, so every JSON value is tag-checked
/// before its payload is read; reading `.object` or `.string` from a value
/// of another kind would be an illegal union access.
fn create(r: zap.Request) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const body = r.body orelse {
        sendJsonStatus(r, "{\"detail\":\"request body required\"}", .bad_request);
        return;
    };

    const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch {
        sendJsonStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
        return;
    };

    // Valid JSON is not necessarily an object (`[]`, `"x"`, `1`, ...).
    const obj = switch (parsed.value) {
        .object => |map| map,
        else => {
            sendJsonStatus(r, "{\"detail\":\"request body must be a json object\"}", .bad_request);
            return;
        },
    };

    // A non-string `name` / `slug` is treated the same as a missing one.
    const name = jsonStringField(obj, "name") orelse {
        sendJsonStatus(r, "{\"detail\":\"name required\"}", .bad_request);
        return;
    };

    const slug = jsonStringField(obj, "slug") orelse {
        sendJsonStatus(r, "{\"detail\":\"slug required\"}", .bad_request);
        return;
    };

    // forbid Prefect- prefix (case-insensitive, matching python behavior)
    if (startsWithIgnoreCase(name, "prefect")) {
        sendJsonStatus(r, "{\"detail\":\"Block type names beginning with 'Prefect' are reserved.\"}", .forbidden);
        return;
    }

    const logo_url = jsonStringField(obj, "logo_url");
    const documentation_url = jsonStringField(obj, "documentation_url");
    const description = jsonStringField(obj, "description");
    const code_example = jsonStringField(obj, "code_example");
    const is_protected = if (obj.get("is_protected")) |v| (v == .bool and v.bool) else false;

    var id_buf: [36]u8 = undefined;
    const id = uuid_util.generate(&id_buf);

    db.block_types.insert(id, name, slug, logo_url, documentation_url, description, code_example, is_protected) catch |err| {
        if (err == error.SQLiteConstraint) {
            sendJsonStatus(r, "{\"detail\":\"block type with this slug already exists\"}", .conflict);
            return;
        }
        std.log.err("block_types: insert failed: {s}", .{@errorName(err)});
        sendJsonStatus(r, "{\"detail\":\"insert failed\"}", .internal_server_error);
        return;
    };

    // NOTE(review): this timestamp is taken after the insert, so it may
    // differ slightly from whatever the database stored — confirm.
    var ts_buf: [32]u8 = undefined;
    const now = time_util.timestamp(&ts_buf);

    const bt = BlockTypeJson{
        .id = id,
        .created = now,
        .updated = now,
        .name = name,
        .slug = slug,
        .logo_url = logo_url,
        .documentation_url = documentation_url,
        .description = description,
        .code_example = code_example,
        .is_protected = is_protected,
    };

    var output: std.Io.Writer.Allocating = .init(alloc);
    var jw: json.Stringify = .{ .writer = &output.writer };
    jw.write(bt) catch {
        sendJsonStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    };

    // An empty body under a JSON content type is not valid JSON, so a
    // failure here is reported instead of sending "" with a 201.
    const payload = output.toOwnedSlice() catch {
        sendJsonStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    };

    sendJsonStatus(r, payload, .created);
}
199
/// GET /block_types/slug/{slug} — return the block type with this slug.
///
/// Responses:
///   200 with the block type
///   404 when the lookup yields no row
///   500 when the lookup itself fails or the row cannot be serialized
///
/// A failed query is reported as a server error instead of being folded
/// into "not found", so clients can tell an outage from a missing record.
fn getBySlug(r: zap.Request, slug: []const u8) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const maybe_row = db.block_types.getBySlug(alloc, slug) catch |err| {
        std.log.err("block_types: lookup by slug failed: {s}", .{@errorName(err)});
        sendJsonStatus(r, "{\"detail\":\"database error\"}", .internal_server_error);
        return;
    };

    const row = maybe_row orelse {
        sendJsonStatus(r, "{\"detail\":\"block type not found\"}", .not_found);
        return;
    };

    var output: std.Io.Writer.Allocating = .init(alloc);
    var jw: json.Stringify = .{ .writer = &output.writer };
    jw.write(BlockTypeJson.fromRow(row)) catch {
        sendJsonStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    };

    // Never answer 200 with an empty (invalid JSON) body.
    const payload = output.toOwnedSlice() catch {
        sendJsonStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    };

    sendJson(r, payload);
}
219
/// PATCH /block_types/{id} — update the optional metadata of a block type.
///
/// Reads `logo_url`, `documentation_url`, `description` and `code_example`
/// from the JSON body. A field that is absent or not a string is passed to
/// the database layer as null.
///
/// Responses:
///   204 with an empty body on success
///   400 when the body is missing, is not valid JSON, or is not a JSON object
///   500 when the database update fails
fn update(r: zap.Request, id: []const u8) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const body = r.body orelse {
        sendJsonStatus(r, "{\"detail\":\"request body required\"}", .bad_request);
        return;
    };

    const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch {
        sendJsonStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
        return;
    };

    // The body is untrusted: check the tag before reading `.object`, since
    // valid JSON may just as well be an array, string or number.
    const obj = switch (parsed.value) {
        .object => |map| map,
        else => {
            sendJsonStatus(r, "{\"detail\":\"request body must be a json object\"}", .bad_request);
            return;
        },
    };

    const logo_url = if (obj.get("logo_url")) |v| (if (v == .string) v.string else null) else null;
    const documentation_url = if (obj.get("documentation_url")) |v| (if (v == .string) v.string else null) else null;
    const description = if (obj.get("description")) |v| (if (v == .string) v.string else null) else null;
    const code_example = if (obj.get("code_example")) |v| (if (v == .string) v.string else null) else null;

    db.block_types.update(id, logo_url, documentation_url, description, code_example) catch |err| {
        std.log.err("block_types: update failed: {s}", .{@errorName(err)});
        sendJsonStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error);
        return;
    };

    r.setStatus(.no_content);
    r.sendBody("") catch |err| {
        std.log.err("block_types: failed to send empty response: {s}", .{@errorName(err)});
    };
}
250
/// POST /block_types/filter — list block types as a JSON array.
///
/// The request body is not consulted; at most 200 rows are requested
/// from the database layer.
///
/// Responses:
///   200 with the array
///   500 when the query fails or the result cannot be serialized
///
/// Serialization is all-or-nothing: a failure part-way through would
/// leave truncated or malformed JSON in the buffer, so the request is
/// answered with an error instead of skipping elements.
fn filter(r: zap.Request) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const types = db.block_types.list(alloc, 200) catch {
        sendJsonStatus(r, "{\"detail\":\"database error\"}", .internal_server_error);
        return;
    };

    var output: std.Io.Writer.Allocating = .init(alloc);
    var jw: json.Stringify = .{ .writer = &output.writer };

    const serialized = blk: {
        jw.beginArray() catch break :blk false;
        for (types) |bt| {
            jw.write(BlockTypeJson.fromRow(bt)) catch break :blk false;
        }
        jw.endArray() catch break :blk false;
        break :blk true;
    };
    if (!serialized) {
        sendJsonStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    }

    const payload = output.toOwnedSlice() catch {
        sendJsonStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    };

    sendJson(r, payload);
}
275
/// GET /block_types/slug/{slug}/block_documents/name/{name} — return one
/// block document, identified by its block type slug and its name.
///
/// `path` is the request path with any `/api` prefix already removed.
///
/// Responses:
///   200 with the document, including its block type and schema when
///       those lookups succeed (they are `null` in the body otherwise)
///   400 when `path` does not have the expected shape
///   404 when no such document exists
///   500 when the document query fails
fn getDocumentByName(r: zap.Request, path: []const u8) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    // parse /block_types/slug/{slug}/block_documents/name/{name}
    const prefix = "/block_types/slug/";
    const marker = "/block_documents/name/";

    // The caller only guarantees that the marker occurs somewhere in the
    // path. Slicing from a fixed offset without checking the prefix would
    // go out of bounds for e.g. "/block_types/block_documents/name/x".
    if (!mem.startsWith(u8, path, prefix)) {
        sendJsonStatus(r, "{\"detail\":\"invalid path\"}", .bad_request);
        return;
    }
    const rest = path[prefix.len..];

    // Searching only after the prefix keeps the slug range well-formed.
    const marker_idx = mem.indexOf(u8, rest, marker) orelse {
        sendJsonStatus(r, "{\"detail\":\"invalid path\"}", .bad_request);
        return;
    };
    const slug = rest[0..marker_idx];
    const name = rest[marker_idx + marker.len ..];

    const maybe_doc = db.block_documents.getByTypeSlugAndName(alloc, slug, name) catch |err| {
        std.log.err("block_types: document lookup failed: {s}", .{@errorName(err)});
        sendJsonStatus(r, "{\"detail\":\"database error\"}", .internal_server_error);
        return;
    };
    const doc = maybe_doc orelse {
        sendJsonStatus(r, "{\"detail\":\"block document not found\"}", .not_found);
        return;
    };

    // get associated block type and schema for full response; these are
    // best-effort and a failed lookup is rendered as null.
    const bt = db.block_types.getById(alloc, doc.block_type_id) catch null;
    const bs = db.block_schemas.getById(alloc, doc.block_schema_id) catch null;

    try sendBlockDocumentResponse(r, alloc, doc, bt, bs, .ok);
}
301
/// Write one block document as a JSON object on `jw`.
///
/// `doc` is left generic because the element type returned by
/// `db.block_documents.listByTypeSlug` is not visible in this file; it
/// must expose the fields read below. `doc.data` is written verbatim —
/// presumably it already holds JSON text; verify against the db layer.
fn writeDocumentSummary(jw: *json.Stringify, doc: anytype) !void {
    try jw.beginObject();

    try jw.objectField("id");
    try jw.write(doc.id);

    try jw.objectField("created");
    try jw.write(doc.created);

    try jw.objectField("updated");
    try jw.write(doc.updated);

    try jw.objectField("name");
    try jw.write(doc.name);

    try jw.objectField("data");
    try jw.beginWriteRaw();
    try jw.writer.writeAll(doc.data);
    jw.endWriteRaw();

    try jw.objectField("is_anonymous");
    try jw.write(doc.is_anonymous);

    try jw.objectField("block_type_id");
    try jw.write(doc.block_type_id);

    try jw.objectField("block_schema_id");
    try jw.write(doc.block_schema_id);

    try jw.endObject();
}

/// GET /block_types/slug/{slug}/block_documents — list the block documents
/// that belong to the block type with the given slug.
///
/// `path` is the request path with any `/api` prefix already removed.
/// At most 200 documents are requested from the database layer.
///
/// Responses:
///   200 with a JSON array of documents
///   400 when `path` does not have the expected shape
///   500 when the query fails or the result cannot be serialized
fn listDocumentsForType(r: zap.Request, path: []const u8) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    // parse /block_types/slug/{slug}/block_documents
    const prefix = "/block_types/slug/";
    const marker = "/block_documents";

    // The caller only guarantees the path ends with the marker. Without
    // the prefix check, "/block_types/x/block_documents" would slice with
    // start > end.
    if (!mem.startsWith(u8, path, prefix)) {
        sendJsonStatus(r, "{\"detail\":\"invalid path\"}", .bad_request);
        return;
    }
    const rest = path[prefix.len..];

    const marker_idx = mem.indexOf(u8, rest, marker) orelse {
        sendJsonStatus(r, "{\"detail\":\"invalid path\"}", .bad_request);
        return;
    };
    const slug = rest[0..marker_idx];

    const docs = db.block_documents.listByTypeSlug(alloc, slug, 200) catch {
        sendJsonStatus(r, "{\"detail\":\"database error\"}", .internal_server_error);
        return;
    };

    var output: std.Io.Writer.Allocating = .init(alloc);
    var jw: json.Stringify = .{ .writer = &output.writer };

    // All-or-nothing: abandoning an object half-way would leave the
    // buffer holding malformed JSON, so any failure aborts the response.
    const serialized = blk: {
        jw.beginArray() catch break :blk false;
        for (docs) |doc| {
            writeDocumentSummary(&jw, doc) catch break :blk false;
        }
        jw.endArray() catch break :blk false;
        break :blk true;
    };
    if (!serialized) {
        sendJsonStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    }

    const payload = output.toOwnedSlice() catch {
        sendJsonStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    };

    sendJson(r, payload);
}
362
/// Emit `"name": <raw>` on `jw`, where `raw` is written verbatim.
/// NOTE(review): assumes `raw` already holds well-formed JSON text — it is
/// neither validated nor escaped here; confirm against the db layer.
fn writeRawJsonField(jw: *json.Stringify, name: []const u8, raw: []const u8) !void {
    try jw.objectField(name);
    try jw.beginWriteRaw();
    try jw.writer.writeAll(raw);
    jw.endWriteRaw();
}

/// Serialize a block document, with its optional block type and block
/// schema nested inside, as a single JSON object on `jw`.
fn writeFullBlockDocument(
    jw: *json.Stringify,
    doc: db.block_documents.BlockDocumentRow,
    bt: ?db.block_types.BlockTypeRow,
    bs: ?db.block_schemas.BlockSchemaRow,
) !void {
    try jw.beginObject();

    try jw.objectField("id");
    try jw.write(doc.id);

    try jw.objectField("created");
    try jw.write(doc.created);

    try jw.objectField("updated");
    try jw.write(doc.updated);

    try jw.objectField("name");
    try jw.write(doc.name);

    try writeRawJsonField(jw, "data", doc.data);

    try jw.objectField("is_anonymous");
    try jw.write(doc.is_anonymous);

    try jw.objectField("block_type_id");
    try jw.write(doc.block_type_id);

    try jw.objectField("block_schema_id");
    try jw.write(doc.block_schema_id);

    try jw.objectField("block_type");
    if (bt) |t| {
        try jw.write(BlockTypeJson.fromRow(t));
    } else {
        try jw.write(null);
    }

    try jw.objectField("block_schema");
    if (bs) |s| {
        try jw.beginObject();

        try jw.objectField("id");
        try jw.write(s.id);

        try jw.objectField("created");
        try jw.write(s.created);

        try jw.objectField("updated");
        try jw.write(s.updated);

        try jw.objectField("checksum");
        try jw.write(s.checksum);

        try writeRawJsonField(jw, "fields", s.fields);
        try writeRawJsonField(jw, "capabilities", s.capabilities);

        try jw.objectField("version");
        try jw.write(s.version);

        try jw.objectField("block_type_id");
        try jw.write(s.block_type_id);

        try jw.endObject();
    } else {
        try jw.write(null);
    }

    // References are always emitted as an empty object here.
    try jw.objectField("block_document_references");
    try jw.beginObject();
    try jw.endObject();

    try jw.endObject();
}

/// Send `doc` as a JSON response with HTTP `status`, embedding the block
/// type `bt` and block schema `bs` when present (`null` otherwise).
///
/// `alloc` backs the output buffer; nothing is freed here, so callers are
/// expected to pass an allocator they release themselves (the callers in
/// this file use a per-request arena).
///
/// If serialization fails, a 500 "serialize error" body is sent, so the
/// request always receives a response.
fn sendBlockDocumentResponse(
    r: zap.Request,
    alloc: std.mem.Allocator,
    doc: db.block_documents.BlockDocumentRow,
    bt: ?db.block_types.BlockTypeRow,
    bs: ?db.block_schemas.BlockSchemaRow,
    status: zap.http.StatusCode,
) !void {
    var output: std.Io.Writer.Allocating = .init(alloc);
    var jw: json.Stringify = .{ .writer = &output.writer };

    writeFullBlockDocument(&jw, doc, bt, bs) catch {
        sendJsonStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    };

    const payload = output.toOwnedSlice() catch {
        sendJsonStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    };

    sendJsonStatus(r, payload, status);
}