prefect server in zig
1const std = @import("std");
2const zap = @import("zap");
3const mem = std.mem;
4const json = std.json;
5
6const db = @import("../db/sqlite.zig");
7const uuid_util = @import("../utilities/uuid.zig");
8const time_util = @import("../utilities/time.zig");
9
/// Sends `body` as a JSON response with permissive CORS headers.
///
/// Header and body write failures are ignored (best effort); the function
/// never reports an error to the caller.
fn sendJson(r: zap.Request, body: []const u8) void {
    // Name/value pairs applied to every JSON response, in this order.
    const response_headers = [_][2][]const u8{
        .{ "content-type", "application/json" },
        .{ "access-control-allow-origin", "*" },
        .{ "access-control-allow-methods", "GET, POST, PATCH, DELETE, OPTIONS" },
        .{ "access-control-allow-headers", "content-type, x-prefect-api-version" },
    };

    for (response_headers) |pair| {
        r.setHeader(pair[0], pair[1]) catch {};
    }

    r.sendBody(body) catch {};
}
17
/// Sets the HTTP status on `r`, then sends `body` as JSON via `sendJson`.
fn sendJsonStatus(r: zap.Request, body: []const u8, status: zap.http.StatusCode) void {
    // The status is applied first, before `sendJson` writes headers and body.
    r.setStatus(status);
    return sendJson(r, body);
}
22
// Routes:
//   POST   /block_documents/        - create
//   GET    /block_documents/{id}    - read by id
//   PATCH  /block_documents/{id}    - update
//   DELETE /block_documents/{id}    - delete
//   POST   /block_documents/filter  - list

/// Dispatches a block-document request to the matching handler.
///
/// A leading `/api` is dropped when the path starts with
/// `/api/block_documents`. A missing path is treated as `/` and a missing
/// method as `GET`. Anything that matches no route gets a 404 JSON body.
pub fn handle(r: zap.Request) !void {
    const api_prefix = "/api";
    const collection = "/block_documents";
    const item_prefix = collection ++ "/";

    const raw_path = r.path orelse "/";
    const verb = r.method orelse "GET";

    // Strip the /api prefix if present.
    const path = if (mem.startsWith(u8, raw_path, api_prefix ++ collection))
        raw_path[api_prefix.len..]
    else
        raw_path;

    if (mem.eql(u8, verb, "POST")) {
        // POST .../filter - list
        if (mem.endsWith(u8, path, "/filter")) return filter(r);

        // POST /block_documents/ (with or without the trailing slash) - create
        if (mem.eql(u8, path, item_prefix) or mem.eql(u8, path, collection)) return create(r);
    }

    // GET / PATCH / DELETE on /block_documents/{id}
    if (mem.startsWith(u8, path, item_prefix)) {
        const id = path[item_prefix.len..];
        // Only segments of at least 32 bytes are treated as ids.
        if (id.len >= 32) {
            if (mem.eql(u8, verb, "GET")) return getById(r, id);
            if (mem.eql(u8, verb, "PATCH")) return update(r, id);
            if (mem.eql(u8, verb, "DELETE")) return delete(r, id);
        }
    }

    sendJsonStatus(r, "{\"detail\":\"not found\"}", .not_found);
}
70
/// Handles `POST /block_documents/`: validates the JSON body, inserts a new
/// block document and responds with the created document (201).
///
/// Responses:
///   - 400 when the body is missing, is not valid JSON, is not a JSON object,
///     or lacks a string `block_type_id` / `block_schema_id`.
///   - 409 when the insert reports `error.Constraint`.
///   - 500 when the data cannot be serialized or the insert fails otherwise.
///
/// All per-request allocations live in an arena released on return.
fn create(r: zap.Request) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const body = r.body orelse {
        sendJsonStatus(r, "{\"detail\":\"request body required\"}", .bad_request);
        return;
    };

    const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch {
        sendJsonStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
        return;
    };

    // Reading `.object` from a value with a different active tag is illegal
    // behavior, so a body such as `[]` or `"text"` must be rejected first.
    const obj = switch (parsed.value) {
        .object => |o| o,
        else => {
            sendJsonStatus(r, "{\"detail\":\"request body must be a json object\"}", .bad_request);
            return;
        },
    };

    // Both ids must be present AND be JSON strings; anything else is a 400.
    const block_type_id: []const u8 = blk: {
        if (obj.get("block_type_id")) |v| {
            if (v == .string) break :blk v.string;
        }
        sendJsonStatus(r, "{\"detail\":\"block_type_id required\"}", .bad_request);
        return;
    };

    const block_schema_id: []const u8 = blk: {
        if (obj.get("block_schema_id")) |v| {
            if (v == .string) break :blk v.string;
        }
        sendJsonStatus(r, "{\"detail\":\"block_schema_id required\"}", .bad_request);
        return;
    };

    const name = if (obj.get("name")) |v| if (v == .string) v.string else null else null;
    // An explicit boolean wins; otherwise a document without a name is anonymous.
    const is_anonymous = if (obj.get("is_anonymous")) |v| v == .bool and v.bool else (name == null);

    // Serialize the `data` member back to JSON text; default to an empty object
    // when it is absent. A serialization failure is reported instead of
    // silently storing `{}` in place of the caller's data.
    const data: []const u8 = blk: {
        const value = obj.get("data") orelse break :blk "{}";
        var out: std.Io.Writer.Allocating = .init(alloc);
        var jw: json.Stringify = .{ .writer = &out.writer };
        jw.write(value) catch {
            sendJsonStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
            return;
        };
        break :blk out.toOwnedSlice() catch {
            sendJsonStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
            return;
        };
    };

    // Look up the block type to obtain its name; a failed lookup leaves it null.
    const bt = db.block_types.getById(alloc, block_type_id) catch null;
    const block_type_name = if (bt) |t| t.name else null;

    var id_buf: [36]u8 = undefined;
    const id = uuid_util.generate(&id_buf);

    db.block_documents.insert(id, name, data, is_anonymous, block_type_id, block_type_name, block_schema_id) catch |err| {
        if (err == error.Constraint) {
            sendJsonStatus(r, "{\"detail\":\"Block already exists\"}", .conflict);
            return;
        }
        sendJsonStatus(r, "{\"detail\":\"insert failed\"}", .internal_server_error);
        return;
    };

    // The response row is assembled locally (not re-read from the database);
    // `created` and `updated` both use a timestamp taken after the insert.
    var ts_buf: [32]u8 = undefined;
    const now = time_util.timestamp(&ts_buf);

    const doc = db.block_documents.BlockDocumentRow{
        .id = id,
        .created = now,
        .updated = now,
        .name = name,
        .data = data,
        .is_anonymous = is_anonymous,
        .block_type_id = block_type_id,
        .block_type_name = block_type_name,
        .block_schema_id = block_schema_id,
    };

    const bs = db.block_schemas.getById(alloc, block_schema_id) catch null;
    try sendBlockDocumentResponse(r, alloc, doc, bt, bs, .created);
}
145
/// Handles `GET /block_documents/{id}`: responds with the document plus its
/// block type and block schema (each `null` when its lookup yields nothing
/// or fails), or 404 when the document lookup yields nothing or fails.
fn getById(r: zap.Request, id: []const u8) !void {
    var request_arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer request_arena.deinit();
    const alloc = request_arena.allocator();

    // A lookup error is folded into "not found".
    const maybe_doc = db.block_documents.getById(alloc, id) catch null;
    const doc = maybe_doc orelse {
        sendJsonStatus(r, "{\"detail\":\"block document not found\"}", .not_found);
        return;
    };

    const block_type = db.block_types.getById(alloc, doc.block_type_id) catch null;
    const block_schema = db.block_schemas.getById(alloc, doc.block_schema_id) catch null;

    try sendBlockDocumentResponse(r, alloc, doc, block_type, block_schema, .ok);
}
161
/// Handles `PATCH /block_documents/{id}`.
///
/// Recognised body members:
///   - `data`: new data for the document.
///   - `merge_existing_data`: when absent, defaults to true (matching python
///     behavior); a non-boolean value counts as false. When true, the
///     top-level keys of `data` are overlaid onto the stored data.
///   - `block_schema_id`: new schema id (ignored unless it is a string).
///
/// Responses:
///   - 204 on success, or when there is nothing to update.
///   - 400 when the body is missing, is not valid JSON or is not an object.
///   - 404 when the stored document is needed (for merging, or to keep its
///     data while only the schema id changes) and cannot be loaded.
///   - 500 when serialization or the database update fails.
fn update(r: zap.Request, id: []const u8) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const body = r.body orelse {
        sendJsonStatus(r, "{\"detail\":\"request body required\"}", .bad_request);
        return;
    };

    const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch {
        sendJsonStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
        return;
    };

    // Reading `.object` from a value with a different active tag is illegal
    // behavior, so non-object bodies are rejected up front.
    const obj = switch (parsed.value) {
        .object => |o| o,
        else => {
            sendJsonStatus(r, "{\"detail\":\"request body must be a json object\"}", .bad_request);
            return;
        },
    };

    // merge_existing_data defaults to true (matching python behavior)
    const merge_existing_data = if (obj.get("merge_existing_data")) |v|
        (v == .bool and v.bool)
    else
        true;

    const new_data_value = obj.get("data");
    const block_schema_id = if (obj.get("block_schema_id")) |v| if (v == .string) v.string else null else null;

    // Only update if we have something to update.
    if (new_data_value == null and block_schema_id == null) {
        r.setStatus(.no_content);
        r.sendBody("") catch {};
        return;
    }

    // Determine the final data to store.
    const data: []const u8 = blk: {
        const new_val = new_data_value orelse {
            // Only the schema id changes. The update call always writes a data
            // value, so pass the stored data through unchanged rather than
            // replacing it with an empty object.
            const current = db.block_documents.getById(alloc, id) catch null orelse {
                sendJsonStatus(r, "{\"detail\":\"Block document not found\"}", .not_found);
                return;
            };
            break :blk current.data;
        };

        if (!merge_existing_data) {
            // No merge - use the new data directly.
            break :blk stringifyJsonValue(alloc, new_val) catch {
                sendJsonStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
                return;
            };
        }

        // Fetch the current document to merge with.
        const current = db.block_documents.getById(alloc, id) catch null orelse {
            sendJsonStatus(r, "{\"detail\":\"Block document not found\"}", .not_found);
            return;
        };
        break :blk mergeBlockData(alloc, current.data, new_val) catch {
            sendJsonStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
            return;
        };
    };

    db.block_documents.update(id, data, block_schema_id) catch {
        sendJsonStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error);
        return;
    };

    r.setStatus(.no_content);
    r.sendBody("") catch {};
}

/// Overlays the top-level keys of `new_val` onto the JSON object stored in
/// `current_json` and returns the merged JSON text.
///
/// When either side is not a JSON object, or `current_json` is not valid
/// JSON, the result is simply `new_val` serialized. Memory comes from `alloc`
/// and is never freed here; callers are expected to pass an arena.
fn mergeBlockData(alloc: std.mem.Allocator, current_json: []const u8, new_val: json.Value) ![]const u8 {
    if (new_val == .object) {
        if (json.parseFromSlice(json.Value, alloc, current_json, .{})) |current_parsed| {
            if (current_parsed.value == .object) {
                var merged = current_parsed.value.object;
                var it = new_val.object.iterator();
                while (it.next()) |entry| {
                    // A failed insert would silently lose a key, so propagate it.
                    try merged.put(entry.key_ptr.*, entry.value_ptr.*);
                }
                return stringifyJsonValue(alloc, json.Value{ .object = merged });
            }
        } else |err| switch (err) {
            error.OutOfMemory => return err,
            // Stored data is not valid JSON: nothing to merge into.
            else => {},
        }
    }

    // Fallback: just use the new data when the shapes do not allow a merge.
    return stringifyJsonValue(alloc, new_val);
}

/// Serializes `value` to JSON text allocated from `alloc`.
fn stringifyJsonValue(alloc: std.mem.Allocator, value: json.Value) ![]const u8 {
    var out: std.Io.Writer.Allocating = .init(alloc);
    var jw: json.Stringify = .{ .writer = &out.writer };
    try jw.write(value);
    return out.toOwnedSlice();
}
258
/// Handles `DELETE /block_documents/{id}`.
///
/// Responds 204 when the database layer reports a deletion, 404 when it
/// reports that nothing was deleted, and 500 when the delete call fails.
fn delete(r: zap.Request, id: []const u8) !void {
    const was_removed = db.block_documents.delete(id) catch {
        sendJsonStatus(r, "{\"detail\":\"delete failed\"}", .internal_server_error);
        return;
    };

    if (was_removed) {
        r.setStatus(.no_content);
        r.sendBody("") catch {};
    } else {
        sendJsonStatus(r, "{\"detail\":\"Block document not found\"}", .not_found);
    }
}
273
/// Handles `POST /block_documents/filter`: responds with a JSON array of
/// block documents, requesting at most 200 rows from the database layer.
///
/// The request body is not inspected. Responds 500 when the database query
/// fails or when the result cannot be serialized; a partially written array
/// is never sent.
fn filter(r: zap.Request) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const docs = db.block_documents.list(alloc, 200) catch {
        sendJsonStatus(r, "{\"detail\":\"database error\"}", .internal_server_error);
        return;
    };

    var output: std.Io.Writer.Allocating = .init(alloc);
    var jw: json.Stringify = .{ .writer = &output.writer };

    // Any failure while writing leaves the buffer holding an incomplete JSON
    // document, so abandon the whole response instead of skipping an element.
    const serialized: bool = blk: {
        jw.beginArray() catch break :blk false;
        for (docs) |doc| {
            writeBlockDocument(&jw, doc) catch break :blk false;
        }
        jw.endArray() catch break :blk false;
        break :blk true;
    };

    if (!serialized) {
        sendJsonStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    }

    const payload = output.toOwnedSlice() catch {
        sendJsonStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    };

    sendJson(r, payload);
}
298
/// Writes `doc` to `jw` as one JSON object with the members, in order:
/// `id`, `created`, `updated`, `name`, `data`, `is_anonymous`,
/// `block_type_id`, `block_schema_id`.
///
/// `doc.data` is emitted verbatim as raw bytes (not re-encoded as a string).
/// Any writer error is returned to the caller.
fn writeBlockDocument(jw: *json.Stringify, doc: db.block_documents.BlockDocumentRow) !void {
    // Members written through the regular serializer, before and after `data`.
    const leading_fields = [_][]const u8{ "id", "created", "updated", "name" };
    const trailing_fields = [_][]const u8{ "is_anonymous", "block_type_id", "block_schema_id" };

    try jw.beginObject();

    inline for (leading_fields) |key| {
        try jw.objectField(key);
        try jw.write(@field(doc, key));
    }

    // Raw write: the stored text is spliced into the output as-is.
    try jw.objectField("data");
    try jw.beginWriteRaw();
    try jw.writer.writeAll(doc.data);
    jw.endWriteRaw();

    inline for (trailing_fields) |key| {
        try jw.objectField(key);
        try jw.write(@field(doc, key));
    }

    try jw.endObject();
}
330
/// Serializes a full block-document response and sends it with `status`.
///
/// The payload holds the document's own members, the nested `block_type` and
/// `block_schema` objects (JSON `null` when `bt` / `bs` is null) and an empty
/// `block_document_references` object.
///
/// If serialization fails, a 500 JSON error is sent. The request is never
/// left without a response, and an empty object is never sent under the
/// success status. The buffer is allocated from `alloc` and is not freed here.
fn sendBlockDocumentResponse(
    r: zap.Request,
    alloc: std.mem.Allocator,
    doc: db.block_documents.BlockDocumentRow,
    bt: ?db.block_types.BlockTypeRow,
    bs: ?db.block_schemas.BlockSchemaRow,
    status: zap.http.StatusCode,
) !void {
    var output: std.Io.Writer.Allocating = .init(alloc);
    var jw: json.Stringify = .{ .writer = &output.writer };

    writeBlockDocumentDetail(&jw, doc, bt, bs) catch {
        sendJsonStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    };

    const payload = output.toOwnedSlice() catch {
        sendJsonStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    };

    sendJsonStatus(r, payload, status);
}

/// Writes the complete response object for one block document to `jw`.
fn writeBlockDocumentDetail(
    jw: *json.Stringify,
    doc: db.block_documents.BlockDocumentRow,
    bt: ?db.block_types.BlockTypeRow,
    bs: ?db.block_schemas.BlockSchemaRow,
) !void {
    try jw.beginObject();

    try jw.objectField("id");
    try jw.write(doc.id);

    try jw.objectField("created");
    try jw.write(doc.created);

    try jw.objectField("updated");
    try jw.write(doc.updated);

    try jw.objectField("name");
    try jw.write(doc.name);

    // `data` is spliced in as raw bytes rather than re-encoded as a string.
    try jw.objectField("data");
    try jw.beginWriteRaw();
    try jw.writer.writeAll(doc.data);
    jw.endWriteRaw();

    try jw.objectField("is_anonymous");
    try jw.write(doc.is_anonymous);

    try jw.objectField("block_type_id");
    try jw.write(doc.block_type_id);

    try jw.objectField("block_schema_id");
    try jw.write(doc.block_schema_id);

    try jw.objectField("block_type");
    try writeBlockTypeValue(jw, bt);

    try jw.objectField("block_schema");
    try writeBlockSchemaValue(jw, bs);

    // References are always emitted as an empty object.
    try jw.objectField("block_document_references");
    try jw.beginObject();
    try jw.endObject();

    try jw.endObject();
}

/// Writes `bt` as a JSON object, or JSON `null` when it is absent.
fn writeBlockTypeValue(jw: *json.Stringify, bt: ?db.block_types.BlockTypeRow) !void {
    const t = bt orelse return jw.write(null);

    try jw.beginObject();

    try jw.objectField("id");
    try jw.write(t.id);

    try jw.objectField("created");
    try jw.write(t.created);

    try jw.objectField("updated");
    try jw.write(t.updated);

    try jw.objectField("name");
    try jw.write(t.name);

    try jw.objectField("slug");
    try jw.write(t.slug);

    try jw.objectField("logo_url");
    try jw.write(t.logo_url);

    try jw.objectField("documentation_url");
    try jw.write(t.documentation_url);

    try jw.objectField("description");
    try jw.write(t.description);

    try jw.objectField("code_example");
    try jw.write(t.code_example);

    try jw.objectField("is_protected");
    try jw.write(t.is_protected);

    try jw.endObject();
}

/// Writes `bs` as a JSON object, or JSON `null` when it is absent.
/// `fields` and `capabilities` are emitted as raw bytes.
fn writeBlockSchemaValue(jw: *json.Stringify, bs: ?db.block_schemas.BlockSchemaRow) !void {
    const s = bs orelse return jw.write(null);

    try jw.beginObject();

    try jw.objectField("id");
    try jw.write(s.id);

    try jw.objectField("created");
    try jw.write(s.created);

    try jw.objectField("updated");
    try jw.write(s.updated);

    try jw.objectField("checksum");
    try jw.write(s.checksum);

    try jw.objectField("fields");
    try jw.beginWriteRaw();
    try jw.writer.writeAll(s.fields);
    jw.endWriteRaw();

    try jw.objectField("capabilities");
    try jw.beginWriteRaw();
    try jw.writer.writeAll(s.capabilities);
    jw.endWriteRaw();

    try jw.objectField("version");
    try jw.write(s.version);

    try jw.objectField("block_type_id");
    try jw.write(s.block_type_id);

    try jw.endObject();
}