prefect server in zig
at 7569870f69992c6426b54b9ca0721e1162577f52 453 lines 15 kB view raw
// HTTP handlers for the `/block_documents` API routes.
//
// Every handler builds its response inside a per-request arena, so individual
// allocations are never freed one by one; the arena is dropped when the
// handler returns.

const std = @import("std");
const zap = @import("zap");
const mem = std.mem;
const json = std.json;

const db = @import("../db/sqlite.zig");
const uuid_util = @import("../utilities/uuid.zig");
const time_util = @import("../utilities/time.zig");

/// Maximum number of rows requested from the database by `filter`.
const filter_limit = 200;

/// Sends `body` as an `application/json` response with permissive CORS headers.
/// Header and body write failures are deliberately ignored: once the
/// connection is unusable there is nobody left to report the error to.
fn sendJson(r: zap.Request, body: []const u8) void {
    r.setHeader("content-type", "application/json") catch {};
    r.setHeader("access-control-allow-origin", "*") catch {};
    r.setHeader("access-control-allow-methods", "GET, POST, PATCH, DELETE, OPTIONS") catch {};
    r.setHeader("access-control-allow-headers", "content-type, x-prefect-api-version") catch {};
    r.sendBody(body) catch {};
}

/// Sets the HTTP status and then sends `body` as JSON (see `sendJson`).
fn sendJsonStatus(r: zap.Request, body: []const u8, status: zap.http.StatusCode) void {
    r.setStatus(status);
    sendJson(r, body);
}

/// Sends an empty `204 No Content` response.
fn sendNoContent(r: zap.Request) void {
    r.setStatus(.no_content);
    r.sendBody("") catch {};
}

/// Returns the string stored under `key`, or null when the key is absent or
/// holds a non-string JSON value.
fn getString(obj: json.ObjectMap, key: []const u8) ?[]const u8 {
    const value = obj.get(key) orelse return null;
    return switch (value) {
        .string => |s| s,
        else => null,
    };
}

/// Serializes `value` to JSON text allocated from `alloc`.
fn stringifyValue(alloc: mem.Allocator, value: json.Value) ![]const u8 {
    var out: std.Io.Writer.Allocating = .init(alloc);
    var jw: json.Stringify = .{ .writer = &out.writer };
    try jw.write(value);
    return try out.toOwnedSlice();
}

/// Serializes `value`; on failure sends a 500 response and returns null so the
/// caller can simply `orelse return`.
fn stringifyOrRespond(r: zap.Request, alloc: mem.Allocator, value: json.Value) ?[]const u8 {
    return stringifyValue(alloc, value) catch {
        sendJsonStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return null;
    };
}

// Routes:
//   POST   /block_documents/        - create
//   GET    /block_documents/{id}    - read by id
//   PATCH  /block_documents/{id}    - update
//   DELETE /block_documents/{id}    - delete
//   POST   /block_documents/filter  - list

/// Dispatches a request to the matching block-document handler, or answers
/// 404 when no route matches. A leading `/api` prefix is accepted and ignored.
pub fn handle(r: zap.Request) !void {
    const target = r.path orelse "/";
    const method = r.method orelse "GET";

    // strip /api prefix if present
    const path = if (mem.startsWith(u8, target, "/api/block_documents"))
        target["/api".len..]
    else
        target;

    // POST /block_documents/filter
    if (mem.eql(u8, method, "POST") and mem.endsWith(u8, path, "/filter")) {
        try filter(r);
        return;
    }

    // POST /block_documents/ - create
    if (mem.eql(u8, method, "POST") and
        (mem.eql(u8, path, "/block_documents/") or mem.eql(u8, path, "/block_documents")))
    {
        try create(r);
        return;
    }

    // GET / PATCH / DELETE /block_documents/{id}
    const prefix = "/block_documents/";
    if (mem.startsWith(u8, path, prefix)) {
        const id = path[prefix.len..];
        // anything shorter than 32 characters is not treated as an id
        if (id.len >= 32) {
            if (mem.eql(u8, method, "GET")) {
                try getById(r, id);
                return;
            } else if (mem.eql(u8, method, "PATCH")) {
                try update(r, id);
                return;
            } else if (mem.eql(u8, method, "DELETE")) {
                try delete(r, id);
                return;
            }
        }
    }

    sendJsonStatus(r, "{\"detail\":\"not found\"}", .not_found);
}

/// POST /block_documents/ - inserts a new block document and answers 201 with
/// the created document. Answers 400 for a missing or malformed body, 409 when
/// the insert hits a constraint, and 500 for other failures.
fn create(r: zap.Request) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const body = r.body orelse {
        sendJsonStatus(r, "{\"detail\":\"request body required\"}", .bad_request);
        return;
    };

    const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch {
        sendJsonStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
        return;
    };

    // Accessing `.object` on any other variant is illegal behaviour, so reject
    // bodies such as `[]` or `42` up front.
    if (parsed.value != .object) {
        sendJsonStatus(r, "{\"detail\":\"request body must be a json object\"}", .bad_request);
        return;
    }
    const obj = parsed.value.object;

    const block_type_id = getString(obj, "block_type_id") orelse {
        sendJsonStatus(r, "{\"detail\":\"block_type_id required\"}", .bad_request);
        return;
    };

    const block_schema_id = getString(obj, "block_schema_id") orelse {
        sendJsonStatus(r, "{\"detail\":\"block_schema_id required\"}", .bad_request);
        return;
    };

    const name = getString(obj, "name");
    // When is_anonymous is not given, a document without a name is anonymous.
    const is_anonymous = if (obj.get("is_anonymous")) |v| v == .bool and v.bool else (name == null);

    // serialize data; an absent "data" field is stored as an empty object
    const data: []const u8 = blk: {
        const value = obj.get("data") orelse break :blk "{}";
        break :blk stringifyOrRespond(r, alloc, value) orelse return;
    };

    // get block_type_name from block_type (lookup failures are treated as "no block type")
    const bt = db.block_types.getById(alloc, block_type_id) catch null;
    const block_type_name = if (bt) |t| t.name else null;

    var id_buf: [36]u8 = undefined;
    const id = uuid_util.generate(&id_buf);

    db.block_documents.insert(id, name, data, is_anonymous, block_type_id, block_type_name, block_schema_id) catch |err| {
        if (err == error.Constraint) {
            sendJsonStatus(r, "{\"detail\":\"Block already exists\"}", .conflict);
            return;
        }
        sendJsonStatus(r, "{\"detail\":\"insert failed\"}", .internal_server_error);
        return;
    };

    var ts_buf: [32]u8 = undefined;
    const now = time_util.timestamp(&ts_buf);

    const doc = db.block_documents.BlockDocumentRow{
        .id = id,
        .created = now,
        .updated = now,
        .name = name,
        .data = data,
        .is_anonymous = is_anonymous,
        .block_type_id = block_type_id,
        .block_type_name = block_type_name,
        .block_schema_id = block_schema_id,
    };

    const bs = db.block_schemas.getById(alloc, block_schema_id) catch null;
    try sendBlockDocumentResponse(r, alloc, doc, bt, bs, .created);
}

/// GET /block_documents/{id} - answers 200 with the document, or 404 when the
/// lookup fails or finds nothing.
fn getById(r: zap.Request, id: []const u8) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const doc = db.block_documents.getById(alloc, id) catch null orelse {
        sendJsonStatus(r, "{\"detail\":\"block document not found\"}", .not_found);
        return;
    };

    const bt = db.block_types.getById(alloc, doc.block_type_id) catch null;
    const bs = db.block_schemas.getById(alloc, doc.block_schema_id) catch null;

    try sendBlockDocumentResponse(r, alloc, doc, bt, bs, .ok);
}

/// PATCH /block_documents/{id} - updates `data` and/or `block_schema_id`.
///
/// With `merge_existing_data` (default true) the top-level keys of the new
/// `data` object are overlaid onto the stored object; otherwise the new data
/// replaces the stored data. When only `block_schema_id` is supplied the
/// stored data is written back unchanged. Answers 204 on success, 404 when the
/// stored document is needed but cannot be found, 400 for a bad body and 500
/// for serialization or database failures.
fn update(r: zap.Request, id: []const u8) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const body = r.body orelse {
        sendJsonStatus(r, "{\"detail\":\"request body required\"}", .bad_request);
        return;
    };

    const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch {
        sendJsonStatus(r, "{\"detail\":\"invalid json\"}", .bad_request);
        return;
    };

    // Accessing `.object` on any other variant is illegal behaviour.
    if (parsed.value != .object) {
        sendJsonStatus(r, "{\"detail\":\"request body must be a json object\"}", .bad_request);
        return;
    }
    const obj = parsed.value.object;

    // merge_existing_data defaults to true (matching python behavior)
    const merge_existing_data = if (obj.get("merge_existing_data")) |v|
        (v == .bool and v.bool)
    else
        true;

    // get the new data from request (if provided)
    const new_data_value = obj.get("data");
    const block_schema_id = getString(obj, "block_schema_id");

    // only update if we have something to update
    if (new_data_value == null and block_schema_id == null) {
        sendNoContent(r);
        return;
    }

    // determine final data to store
    const data: []const u8 = blk: {
        const new_val = new_data_value orelse {
            // Only block_schema_id changes: write the stored data back as-is
            // instead of clobbering it with an empty object.
            const current = db.block_documents.getById(alloc, id) catch null orelse {
                sendJsonStatus(r, "{\"detail\":\"Block document not found\"}", .not_found);
                return;
            };
            break :blk current.data;
        };

        // no merge - just use new data directly
        if (!merge_existing_data) {
            break :blk stringifyOrRespond(r, alloc, new_val) orelse return;
        }

        // fetch current document to merge with
        const current = db.block_documents.getById(alloc, id) catch null orelse {
            sendJsonStatus(r, "{\"detail\":\"Block document not found\"}", .not_found);
            return;
        };

        // if current data isn't valid json, just use new data
        // (the parse result lives in the request arena, so it is not freed separately)
        const current_parsed = json.parseFromSlice(json.Value, alloc, current.data, .{}) catch {
            break :blk stringifyOrRespond(r, alloc, new_val) orelse return;
        };

        // merge: overlay new top-level keys onto the current object
        if (current_parsed.value == .object and new_val == .object) {
            var merged = current_parsed.value.object;
            var it = new_val.object.iterator();
            while (it.next()) |entry| {
                // A dropped key would silently lose caller data, so fail the request instead.
                merged.put(entry.key_ptr.*, entry.value_ptr.*) catch {
                    sendJsonStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error);
                    return;
                };
            }
            break :blk stringifyOrRespond(r, alloc, json.Value{ .object = merged }) orelse return;
        }

        // fallback: just use new data if either side is not an object
        break :blk stringifyOrRespond(r, alloc, new_val) orelse return;
    };

    db.block_documents.update(id, data, block_schema_id) catch {
        sendJsonStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error);
        return;
    };

    sendNoContent(r);
}

/// DELETE /block_documents/{id} - answers 204 when a row was deleted, 404 when
/// the database reports nothing was deleted, and 500 on a database error.
fn delete(r: zap.Request, id: []const u8) !void {
    const deleted = db.block_documents.delete(id) catch {
        sendJsonStatus(r, "{\"detail\":\"delete failed\"}", .internal_server_error);
        return;
    };

    if (!deleted) {
        sendJsonStatus(r, "{\"detail\":\"Block document not found\"}", .not_found);
        return;
    }

    sendNoContent(r);
}

/// POST /block_documents/filter - answers with a JSON array of up to
/// `filter_limit` block documents. Any serialization failure yields a 500
/// rather than a truncated or malformed array.
fn filter(r: zap.Request) !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const docs = db.block_documents.list(alloc, filter_limit) catch {
        sendJsonStatus(r, "{\"detail\":\"database error\"}", .internal_server_error);
        return;
    };

    var output: std.Io.Writer.Allocating = .init(alloc);
    var jw: json.Stringify = .{ .writer = &output.writer };

    jw.beginArray() catch {
        sendJsonStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    };
    for (docs) |doc| {
        // Skipping a half-written element would corrupt the array, so abort.
        writeBlockDocument(&jw, doc) catch {
            sendJsonStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
            return;
        };
    }
    jw.endArray() catch {
        sendJsonStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    };

    const payload = output.toOwnedSlice() catch {
        sendJsonStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    };
    sendJson(r, payload);
}

/// Writes `name` followed by `raw` emitted verbatim (no escaping), for columns
/// that already hold JSON text.
fn writeRawField(jw: *json.Stringify, name: []const u8, raw: []const u8) !void {
    try jw.objectField(name);
    try jw.beginWriteRaw();
    try jw.writer.writeAll(raw);
    jw.endWriteRaw();
}

/// Writes the scalar fields shared by the list and detail representations of a
/// block document. The caller is responsible for the enclosing object.
fn writeBlockDocumentFields(jw: *json.Stringify, doc: db.block_documents.BlockDocumentRow) !void {
    try jw.objectField("id");
    try jw.write(doc.id);

    try jw.objectField("created");
    try jw.write(doc.created);

    try jw.objectField("updated");
    try jw.write(doc.updated);

    try jw.objectField("name");
    try jw.write(doc.name);

    try writeRawField(jw, "data", doc.data);

    try jw.objectField("is_anonymous");
    try jw.write(doc.is_anonymous);

    try jw.objectField("block_type_id");
    try jw.write(doc.block_type_id);

    try jw.objectField("block_schema_id");
    try jw.write(doc.block_schema_id);
}

/// Writes one block document as a complete JSON object (list representation).
fn writeBlockDocument(jw: *json.Stringify, doc: db.block_documents.BlockDocumentRow) !void {
    try jw.beginObject();
    try writeBlockDocumentFields(jw, doc);
    try jw.endObject();
}

/// Writes a block type as a JSON object.
fn writeBlockType(jw: *json.Stringify, t: db.block_types.BlockTypeRow) !void {
    try jw.beginObject();

    try jw.objectField("id");
    try jw.write(t.id);

    try jw.objectField("created");
    try jw.write(t.created);

    try jw.objectField("updated");
    try jw.write(t.updated);

    try jw.objectField("name");
    try jw.write(t.name);

    try jw.objectField("slug");
    try jw.write(t.slug);

    try jw.objectField("logo_url");
    try jw.write(t.logo_url);

    try jw.objectField("documentation_url");
    try jw.write(t.documentation_url);

    try jw.objectField("description");
    try jw.write(t.description);

    try jw.objectField("code_example");
    try jw.write(t.code_example);

    try jw.objectField("is_protected");
    try jw.write(t.is_protected);

    try jw.endObject();
}

/// Writes a block schema as a JSON object; `fields` and `capabilities` are
/// emitted verbatim as stored JSON text.
fn writeBlockSchema(jw: *json.Stringify, s: db.block_schemas.BlockSchemaRow) !void {
    try jw.beginObject();

    try jw.objectField("id");
    try jw.write(s.id);

    try jw.objectField("created");
    try jw.write(s.created);

    try jw.objectField("updated");
    try jw.write(s.updated);

    try jw.objectField("checksum");
    try jw.write(s.checksum);

    try writeRawField(jw, "fields", s.fields);
    try writeRawField(jw, "capabilities", s.capabilities);

    try jw.objectField("version");
    try jw.write(s.version);

    try jw.objectField("block_type_id");
    try jw.write(s.block_type_id);

    try jw.endObject();
}

/// Writes the detail representation: the document fields plus the nested
/// `block_type`, `block_schema` (each `null` when absent) and an empty
/// `block_document_references` object.
fn writeBlockDocumentDetail(
    jw: *json.Stringify,
    doc: db.block_documents.BlockDocumentRow,
    bt: ?db.block_types.BlockTypeRow,
    bs: ?db.block_schemas.BlockSchemaRow,
) !void {
    try jw.beginObject();

    try writeBlockDocumentFields(jw, doc);

    try jw.objectField("block_type");
    if (bt) |t| {
        try writeBlockType(jw, t);
    } else {
        try jw.write(null);
    }

    try jw.objectField("block_schema");
    if (bs) |s| {
        try writeBlockSchema(jw, s);
    } else {
        try jw.write(null);
    }

    try jw.objectField("block_document_references");
    try jw.beginObject();
    try jw.endObject();

    try jw.endObject();
}

/// Serializes `doc` with its optional block type and schema and sends it with
/// `status`. If serialization fails a 500 is sent instead, so the client always
/// receives a response.
fn sendBlockDocumentResponse(
    r: zap.Request,
    alloc: std.mem.Allocator,
    doc: db.block_documents.BlockDocumentRow,
    bt: ?db.block_types.BlockTypeRow,
    bs: ?db.block_schemas.BlockSchemaRow,
    status: zap.http.StatusCode,
) !void {
    var output: std.Io.Writer.Allocating = .init(alloc);
    var jw: json.Stringify = .{ .writer = &output.writer };

    writeBlockDocumentDetail(&jw, doc, bt, bs) catch {
        sendJsonStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    };

    const payload = output.toOwnedSlice() catch {
        sendJsonStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error);
        return;
    };

    sendJsonStatus(r, payload, status);
}