prefect server in zig

add PATCH/DELETE for flows and flow_runs

- flows: PATCH /flows/{id} (update tags), DELETE /flows/{id}
- flow_runs: DELETE /flow_runs/{id} (cascade deletes states)
- test-matrix: ensure port released between tests
- update ROADMAP.md

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+179 -16
+4 -4
ROADMAP.md
··· 11 11 - [x] POST /api/flows/ 12 12 - [x] GET /api/flows/{id} 13 13 - [x] POST /api/flows/filter 14 - - [ ] PATCH /api/flows/{id} 15 - - [ ] DELETE /api/flows/{id} 14 + - [x] PATCH /api/flows/{id} 15 + - [x] DELETE /api/flows/{id} 16 16 - [x] POST /api/flow_runs/ 17 17 - [x] GET /api/flow_runs/{id} 18 18 - [x] POST /api/flow_runs/{id}/set_state 19 19 - [x] POST /api/flow_runs/filter 20 - - [ ] PATCH /api/flow_runs/{id} 21 - - [ ] DELETE /api/flow_runs/{id} 20 + - [x] PATCH /api/flow_runs/{id} 21 + - [x] DELETE /api/flow_runs/{id} 22 22 - [x] POST /api/logs/ 23 23 - [x] WS /api/events/in 24 24
+1 -1
build.zig.zon
··· 30 30 }, 31 31 .logfire = .{ 32 32 .url = "https://tangled.sh/zzstoatzz.io/logfire-zig/archive/main", 33 - .hash = "logfire_zig-0.1.0-x2yDLnOdAQAvTiA7t9y2PFV-GK6n4pCKPalmannHH2Up", 33 + .hash = "logfire_zig-0.1.0-x2yDLgdwAABPUnnE_Smk0Rjrf93qyC5vcJXzvieuNFWI", 34 34 }, 35 35 }, 36 36 .paths = .{
+2 -2
loq.toml
··· 26 26 27 27 [[rules]] 28 28 path = "src/api/flow_runs.zig" 29 - max_lines = 750 29 + max_lines = 800 30 30 31 31 [[rules]] 32 32 path = "src/db/flow_runs.zig" 33 - max_lines = 600 33 + max_lines = 650
+8
scripts/test-matrix
··· 195 195 wait "$SERVER_PID" 2>/dev/null || true 196 196 SERVER_PID="" 197 197 198 + # ensure port is released before next test 199 + local port_wait=0 200 + while [[ $port_wait -lt 20 ]] && lsof -ti:$TEST_PORT >/dev/null 2>&1; do 201 + lsof -ti:$TEST_PORT 2>/dev/null | xargs -r kill 2>/dev/null || true 202 + sleep 0.25 203 + ((port_wait++)) 204 + done 205 + 198 206 rm -f "$db_path" 199 207 return $result 200 208 }
+20
src/api/flow_runs.zig
··· 174 174 } 175 175 } 176 176 177 + // DELETE /flow_runs/{id} - delete 178 + if (mem.eql(u8, method, "DELETE")) { 179 + const id = routing.extractIdAfter(target, "/flow_runs/") orelse 180 + routing.extractIdAfter(target, "/api/flow_runs/"); 181 + if (id) |flow_run_id| { 182 + try delete(r, flow_run_id); 183 + return; 184 + } 185 + } 186 + 177 187 json_util.sendStatus(r, "{\"detail\":\"not found\"}", .not_found); 178 188 } 179 189 ··· 346 356 return; 347 357 }; 348 358 json_util.send(r, resp); 359 + } 360 + 361 + fn delete(r: zap.Request, id: []const u8) !void { 362 + const deleted = db.flow_runs.delete(id) catch false; 363 + if (!deleted) { 364 + json_util.sendStatus(r, "{\"detail\":\"flow run not found\"}", .not_found); 365 + return; 366 + } 367 + 368 + json_util.sendStatus(r, "", .no_content); 349 369 } 350 370 351 371 fn setState(r: zap.Request, id: []const u8) !void {
+61 -9
src/api/flows.zig
··· 11 11 // POST /flows/ - create or get flow by name 12 12 // POST /flows/filter - list flows 13 13 // GET /flows/{id} - get flow by id 14 + // PATCH /flows/{id} - update flow 15 + // DELETE /flows/{id} - delete flow 14 16 pub fn handle(r: zap.Request) !void { 15 17 const target = r.path orelse "/"; 16 18 const method = r.method orelse "GET"; ··· 21 23 return; 22 24 } 23 25 26 + // extract id from path for GET/PATCH/DELETE 27 + const prefix = if (mem.startsWith(u8, target, "/api/flows/")) "/api/flows/" else "/flows/"; 28 + const has_id = target.len > prefix.len; 29 + const flow_id = if (has_id) target[prefix.len..] else ""; 30 + 24 31 if (mem.eql(u8, method, "POST") and (mem.eql(u8, target, "/flows/") or mem.eql(u8, target, "/api/flows/"))) { 25 32 try createFlow(r); 26 - } else if (mem.eql(u8, method, "GET")) { 27 - // extract id from path: /api/flows/{id} or /flows/{id} 28 - const prefix = if (mem.startsWith(u8, target, "/api/flows/")) "/api/flows/" else "/flows/"; 29 - if (target.len > prefix.len) { 30 - const flow_id = target[prefix.len..]; 31 - try getFlow(r, flow_id); 32 - } else { 33 - json_util.sendStatus(r, "{\"detail\":\"flow id required\"}", .bad_request); 34 - } 33 + } else if (mem.eql(u8, method, "GET") and has_id) { 34 + try getFlow(r, flow_id); 35 + } else if (mem.eql(u8, method, "PATCH") and has_id) { 36 + try patchFlow(r, flow_id); 37 + } else if (mem.eql(u8, method, "DELETE") and has_id) { 38 + try deleteFlow(r, flow_id); 39 + } else if (mem.eql(u8, method, "GET") or mem.eql(u8, method, "PATCH") or mem.eql(u8, method, "DELETE")) { 40 + json_util.sendStatus(r, "{\"detail\":\"flow id required\"}", .bad_request); 35 41 } else { 36 42 json_util.sendStatus(r, "{\"detail\":\"not implemented\"}", .not_implemented); 37 43 } ··· 109 115 } else { 110 116 json_util.sendStatus(r, "{\"detail\":\"flow not found\"}", .not_found); 111 117 } 118 + } 119 + 120 + fn patchFlow(r: zap.Request, flow_id: []const u8) !void { 121 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 122 + defer arena.deinit(); 123 + const alloc = arena.allocator(); 124 + 125 + const body = r.body orelse { 126 + json_util.sendStatus(r, "{\"detail\":\"failed to read body\"}", .bad_request); 127 + return; 128 + }; 129 + 130 + const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch { 131 + json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request); 132 + return; 133 + }; 134 + 135 + // extract tags if provided 136 + var tags_json: ?[]const u8 = null; 137 + if (parsed.value.object.get("tags")) |tags_val| { 138 + var output: std.Io.Writer.Allocating = .init(alloc); 139 + var jw: json.Stringify = .{ .writer = &output.writer }; 140 + jw.write(tags_val) catch { 141 + json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 142 + return; 143 + }; 144 + tags_json = output.toOwnedSlice() catch null; 145 + } 146 + 147 + const updated = db.updateFlow(flow_id, tags_json) catch false; 148 + if (!updated) { 149 + json_util.sendStatus(r, "{\"detail\":\"flow not found\"}", .not_found); 150 + return; 151 + } 152 + 153 + json_util.sendStatus(r, "", .no_content); 154 + } 155 + 156 + fn deleteFlow(r: zap.Request, flow_id: []const u8) !void { 157 + const deleted = db.deleteFlow(flow_id) catch false; 158 + if (!deleted) { 159 + json_util.sendStatus(r, "{\"detail\":\"flow not found\"}", .not_found); 160 + return; 161 + } 162 + 163 + json_util.sendStatus(r, "", .no_content); 112 164 } 113 165 114 166 fn filter(r: zap.Request) !void {
+33
src/db/flow_runs.zig
··· 589 589 _ = infrastructure_pid; 590 590 // TODO: add infrastructure_pid column and update it 591 591 } 592 + 593 + pub fn delete(id: []const u8) !bool { 594 + // check if flow run exists 595 + var rows = backend.db.query( 596 + "SELECT id FROM flow_run WHERE id = ?", 597 + .{id}, 598 + ) catch return false; 599 + defer rows.deinit(); 600 + 601 + if (rows.next() == null) { 602 + return false; 603 + } 604 + 605 + // delete associated state records first 606 + backend.db.exec( 607 + "DELETE FROM flow_run_state WHERE flow_run_id = ?", 608 + .{id}, 609 + ) catch |err| { 610 + log.err("database", "delete flow_run_state error: {}", .{err}); 611 + return false; 612 + }; 613 + 614 + // delete the flow run 615 + backend.db.exec( 616 + "DELETE FROM flow_run WHERE id = ?", 617 + .{id}, 618 + ) catch |err| { 619 + log.err("database", "delete flow_run error: {}", .{err}); 620 + return false; 621 + }; 622 + 623 + return true; 624 + }
+48
src/db/flows.zig
··· 68 68 return results.toOwnedSlice(alloc); 69 69 } 70 70 71 + pub fn update(id: []const u8, tags: ?[]const u8) !bool { 72 + // check if flow exists 73 + var rows = backend.db.query( 74 + "SELECT id FROM flow WHERE id = ?", 75 + .{id}, 76 + ) catch return false; 77 + defer rows.deinit(); 78 + 79 + if (rows.next() == null) { 80 + return false; 81 + } 82 + 83 + if (tags) |t| { 84 + backend.db.exec( 85 + "UPDATE flow SET tags = ?, updated = CURRENT_TIMESTAMP WHERE id = ?", 86 + .{ t, id }, 87 + ) catch |err| { 88 + log.err("database", "update flow error: {}", .{err}); 89 + return false; 90 + }; 91 + } 92 + 93 + return true; 94 + } 95 + 96 + pub fn delete(id: []const u8) !bool { 97 + // check if flow exists 98 + var rows = backend.db.query( 99 + "SELECT id FROM flow WHERE id = ?", 100 + .{id}, 101 + ) catch return false; 102 + defer rows.deinit(); 103 + 104 + if (rows.next() == null) { 105 + return false; 106 + } 107 + 108 + backend.db.exec( 109 + "DELETE FROM flow WHERE id = ?", 110 + .{id}, 111 + ) catch |err| { 112 + log.err("database", "delete flow error: {}", .{err}); 113 + return false; 114 + }; 115 + 116 + return true; 117 + } 118 + 71 119 fn rowToFlow(alloc: Allocator, row: anytype) FlowRow { 72 120 return .{ 73 121 .id = alloc.dupe(u8, row.text(0)) catch "",
+2
src/db/sqlite.zig
··· 30 30 pub const getFlowById = flows.getById; 31 31 pub const insertFlow = flows.insert; 32 32 pub const listFlows = flows.list; 33 + pub const updateFlow = flows.update; 34 + pub const deleteFlow = flows.delete; 33 35 34 36 pub const insertFlowRun = flow_runs.insert; 35 37 pub const getFlowRun = flow_runs.get;