prefect server in zig

add log persistence (POST /api/logs/, POST /api/logs/filter)

- create log table with migration 004_log_table
- POST /api/logs/ - batch insert logs (name, level, message, timestamp, flow_run_id, task_run_id)
- POST /api/logs/filter - filter by flow_run_id, task_run_id, level (ge/le), timestamp (after/before)
- supports both sqlite and postgres backends
- logs survive server restart (previously in-memory only)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+484 -6
+7 -2
ROADMAP.md
··· 20 20 - [x] PATCH /api/flow_runs/{id} 21 21 - [x] DELETE /api/flow_runs/{id} 22 22 - [x] POST /api/logs/ 23 + - [x] POST /api/logs/filter 23 24 - [x] WS /api/events/in 24 25 25 26 ### task runs ··· 83 84 - [x] GET /api/block_documents/{id} 84 85 - [x] PATCH /api/block_documents/{id} 85 86 - [x] DELETE /api/block_documents/{id} 86 - - [ ] POST /api/block_capabilities/ 87 + - [ ] GET /api/block_capabilities/ (optional - discovery only, not used by client) 87 88 88 89 ### concurrency (v2 only - skip v1 API) 89 90 - [x] POST /api/v2/concurrency_limits/ ··· 200 201 - [ ] artifact table 201 202 - [ ] automation table 202 203 - [x] variable table 203 - - [ ] log table (currently in-memory only) 204 + - [x] log table 204 205 205 206 ### database backends 206 207 - [x] sqlite (zqlite) ··· 261 262 - `POST /work_pools/{name}/get_scheduled_flow_runs` 262 263 - returns runs ready to execute 263 264 - updates pool/deployment status to READY 265 + 266 + ### short-term plan 267 + 1. ~~**log persistence**~~ ✓ - logs now persisted to database 268 + 2. **automations** - event-driven triggers, core feature for production use 264 269 265 270 ### what's working (~5x faster than python) 266 271 - flow/flow_run/task_run lifecycle
+239 -4
src/api/logs.zig
··· 1 1 const std = @import("std"); 2 2 const zap = @import("zap"); 3 + const db = @import("../db/sqlite.zig"); 4 + const json_util = @import("../utilities/json.zig"); 5 + const uuid_util = @import("../utilities/uuid.zig"); 6 + const logging = @import("../logging.zig"); 3 7 4 8 fn sendJson(r: zap.Request, body: []const u8) void { 5 9 r.setHeader("content-type", "application/json") catch {}; ··· 7 11 r.sendBody(body) catch {}; 8 12 } 9 13 14 + /// Write JSON-escaped string (without quotes) 15 + fn writeJsonString(writer: anytype, s: []const u8) !void { 16 + for (s) |c| { 17 + switch (c) { 18 + '"' => try writer.writeAll("\\\""), 19 + '\\' => try writer.writeAll("\\\\"), 20 + '\n' => try writer.writeAll("\\n"), 21 + '\r' => try writer.writeAll("\\r"), 22 + '\t' => try writer.writeAll("\\t"), 23 + 0x00...0x08, 0x0b, 0x0c, 0x0e...0x1f => { 24 + var buf: [6]u8 = undefined; 25 + const slice = std.fmt.bufPrint(&buf, "\\u{x:0>4}", .{c}) catch unreachable; 26 + try writer.writeAll(slice); 27 + }, 28 + else => try writer.writeByte(c), 29 + } 30 + } 31 + } 32 + 10 33 pub fn handle(r: zap.Request) !void { 34 + const target = r.path orelse "/"; 11 35 const method = r.method orelse "GET"; 12 36 13 - // accept logs but don't store them yet (TODO: implement log storage) 37 + // POST /logs/ - create logs (batch) 14 38 if (std.mem.eql(u8, method, "POST")) { 15 - // body is automatically handled by zap 16 - r.setStatus(.created); 17 - sendJson(r, "[]"); 39 + if (std.mem.endsWith(u8, target, "/filter")) { 40 + try filterLogs(r); 41 + } else { 42 + try createLogs(r); 43 + } 18 44 } else { 19 45 sendJson(r, "[]"); 20 46 } 21 47 } 48 + 49 + fn createLogs(r: zap.Request) !void { 50 + const body = r.body orelse { 51 + json_util.sendStatus(r, "{\"detail\":\"missing body\"}", .bad_request); 52 + return; 53 + }; 54 + 55 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 56 + defer arena.deinit(); 57 + const alloc = arena.allocator(); 58 + 59 + // parse JSON array of logs 60 + const parsed = std.json.parseFromSlice(std.json.Value, alloc, body, .{}) catch { 61 + json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request); 62 + return; 63 + }; 64 + 65 + if (parsed.value != .array) { 66 + json_util.sendStatus(r, "{\"detail\":\"expected array of logs\"}", .bad_request); 67 + return; 68 + } 69 + 70 + var inserted: usize = 0; 71 + for (parsed.value.array.items) |log_val| { 72 + if (log_val != .object) continue; 73 + const log_obj = log_val.object; 74 + 75 + // extract required fields 76 + const name = if (log_obj.get("name")) |v| (if (v == .string) v.string else null) else null; 77 + const message = if (log_obj.get("message")) |v| (if (v == .string) v.string else null) else null; 78 + const timestamp = if (log_obj.get("timestamp")) |v| (if (v == .string) v.string else null) else null; 79 + 80 + if (name == null or message == null or timestamp == null) continue; 81 + 82 + // extract level (default to 20 = INFO) 83 + const level: i64 = if (log_obj.get("level")) |v| switch (v) { 84 + .integer => v.integer, 85 + .number_string => std.fmt.parseInt(i64, v.number_string, 10) catch 20, 86 + else => 20, 87 + } else 20; 88 + 89 + // extract optional flow_run_id and task_run_id 90 + const flow_run_id = if (log_obj.get("flow_run_id")) |v| (if (v == .string) v.string else null) else null; 91 + const task_run_id = if (log_obj.get("task_run_id")) |v| (if (v == .string) v.string else null) else null; 92 + 93 + // generate id 94 + var id_buf: [36]u8 = undefined; 95 + const id = uuid_util.generate(&id_buf); 96 + 97 + db.logs.insert(id, name.?, level, message.?, timestamp.?, flow_run_id, task_run_id) catch |err| { 98 + logging.err("logs", "insert error: {}", .{err}); 99 + continue; 100 + }; 101 + inserted += 1; 102 + } 103 + 104 + r.setStatus(.created); 105 + sendJson(r, "[]"); 106 + } 107 + 108 + fn filterLogs(r: zap.Request) !void { 109 + const body = r.body orelse { 110 + json_util.sendStatus(r, "{\"detail\":\"missing body\"}", .bad_request); 111 + return; 112 + }; 113 + 114 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 115 + defer arena.deinit(); 116 + const alloc = arena.allocator(); 117 + 118 + const parsed = std.json.parseFromSlice(std.json.Value, alloc, body, .{}) catch { 119 + json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request); 120 + return; 121 + }; 122 + 123 + if (parsed.value != .object) { 124 + json_util.sendStatus(r, "{\"detail\":\"expected object\"}", .bad_request); 125 + return; 126 + } 127 + 128 + const obj = parsed.value.object; 129 + 130 + // extract pagination 131 + const limit: usize = if (obj.get("limit")) |v| switch (v) { 132 + .integer => @intCast(@max(1, @min(1000, v.integer))), 133 + else => 200, 134 + } else 200; 135 + 136 + const offset: usize = if (obj.get("offset")) |v| switch (v) { 137 + .integer => @intCast(@max(0, v.integer)), 138 + else => 0, 139 + } else 0; 140 + 141 + // extract sort (default TIMESTAMP_ASC) 142 + const sort_asc = if (obj.get("sort")) |v| blk: { 143 + if (v == .string) { 144 + break :blk std.mem.eql(u8, v.string, "TIMESTAMP_ASC"); 145 + } 146 + break :blk true; 147 + } else true; 148 + 149 + // extract filter options from "logs" object 150 + var filter_opts = db.logs.FilterOptions{ 151 + .limit = limit, 152 + .offset = offset, 153 + .sort_asc = sort_asc, 154 + }; 155 + 156 + if (obj.get("logs")) |logs_obj| { 157 + if (logs_obj == .object) { 158 + const lf = logs_obj.object; 159 + 160 + // flow_run_id filter 161 + if (lf.get("flow_run_id")) |frf| { 162 + if (frf == .object) { 163 + if (frf.object.get("any_")) |any_arr| { 164 + if (any_arr == .array and any_arr.array.items.len > 0) { 165 + if (any_arr.array.items[0] == .string) { 166 + filter_opts.flow_run_id = any_arr.array.items[0].string; 167 + } 168 + } 169 + } 170 + } 171 + } 172 + 173 + // task_run_id filter 174 + if (lf.get("task_run_id")) |trf| { 175 + if (trf == .object) { 176 + if (trf.object.get("any_")) |any_arr| { 177 + if (any_arr == .array and any_arr.array.items.len > 0) { 178 + if (any_arr.array.items[0] == .string) { 179 + filter_opts.task_run_id = any_arr.array.items[0].string; 180 + } 181 + } 182 + } 183 + } 184 + } 185 + 186 + // level filter 187 + if (lf.get("level")) |level_obj| { 188 + if (level_obj == .object) { 189 + if (level_obj.object.get("ge_")) |ge| { 190 + if (ge == .integer) filter_opts.level_ge = ge.integer; 191 + } 192 + if (level_obj.object.get("le_")) |le| { 193 + if (le == .integer) filter_opts.level_le = le.integer; 194 + } 195 + } 196 + } 197 + 198 + // timestamp filter 199 + if (lf.get("timestamp")) |ts_obj| { 200 + if (ts_obj == .object) { 201 + if (ts_obj.object.get("after_")) |after| { 202 + if (after == .string) filter_opts.timestamp_after = after.string; 203 + } 204 + if (ts_obj.object.get("before_")) |before| { 205 + if (before == .string) filter_opts.timestamp_before = before.string; 206 + } 207 + } 208 + } 209 + } 210 + } 211 + 212 + // query logs 213 + const rows = db.logs.filter(alloc, filter_opts) catch |err| { 214 + logging.err("logs", "filter error: {}", .{err}); 215 + json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error); 216 + return; 217 + }; 218 + 219 + // build JSON response 220 + var json_buf = std.ArrayListUnmanaged(u8){}; 221 + const writer = json_buf.writer(alloc); 222 + 223 + try writer.writeByte('['); 224 + for (rows, 0..) |log_row, i| { 225 + if (i > 0) try writer.writeByte(','); 226 + try writer.print( 227 + \\{{"id":"{s}","created":"{s}","updated":"{s}","name":"{s}","level":{d},"message":" 228 + , .{ log_row.id, log_row.created, log_row.updated, log_row.name, log_row.level }); 229 + 230 + // escape message for JSON 231 + try writeJsonString(writer, log_row.message); 232 + 233 + try writer.print( 234 + \\","timestamp":"{s}" 235 + , .{log_row.timestamp}); 236 + 237 + // flow_run_id 238 + if (log_row.flow_run_id) |fid| { 239 + try writer.print(",\"flow_run_id\":\"{s}\"", .{fid}); 240 + } else { 241 + try writer.writeAll(",\"flow_run_id\":null"); 242 + } 243 + 244 + // task_run_id 245 + if (log_row.task_run_id) |tid| { 246 + try writer.print(",\"task_run_id\":\"{s}\"", .{tid}); 247 + } else { 248 + try writer.writeAll(",\"task_run_id\":null"); 249 + } 250 + 251 + try writer.writeByte('}'); 252 + } 253 + try writer.writeByte(']'); 254 + 255 + sendJson(r, json_buf.items); 256 + }
+192
src/db/logs.zig
··· 1 + const std = @import("std"); 2 + const Allocator = std.mem.Allocator; 3 + 4 + const backend = @import("backend.zig"); 5 + const log = @import("../logging.zig"); 6 + 7 + pub const LogRow = struct { 8 + id: []const u8, 9 + created: []const u8, 10 + updated: []const u8, 11 + name: []const u8, 12 + level: i64, 13 + message: []const u8, 14 + timestamp: []const u8, 15 + flow_run_id: ?[]const u8, 16 + task_run_id: ?[]const u8, 17 + }; 18 + 19 + const Col = struct { 20 + const id: usize = 0; 21 + const created: usize = 1; 22 + const updated: usize = 2; 23 + const name: usize = 3; 24 + const level: usize = 4; 25 + const message: usize = 5; 26 + const timestamp: usize = 6; 27 + const flow_run_id: usize = 7; 28 + const task_run_id: usize = 8; 29 + }; 30 + 31 + const select_cols = "id, created, updated, name, level, message, timestamp, flow_run_id, task_run_id"; 32 + 33 + fn rowFromResult(alloc: Allocator, r: anytype) !LogRow { 34 + const flow_run_id_text = r.text(Col.flow_run_id); 35 + const task_run_id_text = r.text(Col.task_run_id); 36 + 37 + return LogRow{ 38 + .id = try alloc.dupe(u8, r.text(Col.id)), 39 + .created = try alloc.dupe(u8, r.text(Col.created)), 40 + .updated = try alloc.dupe(u8, r.text(Col.updated)), 41 + .name = try alloc.dupe(u8, r.text(Col.name)), 42 + .level = r.bigint(Col.level), 43 + .message = try alloc.dupe(u8, r.text(Col.message)), 44 + .timestamp = try alloc.dupe(u8, r.text(Col.timestamp)), 45 + .flow_run_id = if (flow_run_id_text.len > 0) try alloc.dupe(u8, flow_run_id_text) else null, 46 + .task_run_id = if (task_run_id_text.len > 0) try alloc.dupe(u8, task_run_id_text) else null, 47 + }; 48 + } 49 + 50 + pub fn insert(id: []const u8, name: []const u8, level: i64, message: []const u8, timestamp: []const u8, flow_run_id: ?[]const u8, task_run_id: ?[]const u8) !void { 51 + backend.db.exec( 52 + "INSERT INTO log (id, name, level, message, timestamp, flow_run_id, task_run_id) VALUES (?, ?, ?, ?, ?, ?, ?)", 53 + .{ id, name, level, message, timestamp, flow_run_id, task_run_id }, 54 + ) catch |err| { 55 + log.err("database", "insert log error: {}", .{err}); 56 + return err; 57 + }; 58 + } 59 + 60 + pub fn getById(alloc: Allocator, id: []const u8) !?LogRow { 61 + var r = backend.db.row( 62 + "SELECT " ++ select_cols ++ " FROM log WHERE id = ?", 63 + .{id}, 64 + ) catch return null; 65 + 66 + if (r) |*row| { 67 + defer row.deinit(); 68 + return try rowFromResult(alloc, row); 69 + } 70 + return null; 71 + } 72 + 73 + pub const FilterOptions = struct { 74 + flow_run_id: ?[]const u8 = null, 75 + task_run_id: ?[]const u8 = null, 76 + level_ge: ?i64 = null, 77 + level_le: ?i64 = null, 78 + timestamp_after: ?[]const u8 = null, 79 + timestamp_before: ?[]const u8 = null, 80 + limit: usize = 200, 81 + offset: usize = 0, 82 + sort_asc: bool = true, 83 + }; 84 + 85 + /// Escape a string for SQL (double single quotes) 86 + fn escapeString(alloc: Allocator, s: []const u8) ![]const u8 { 87 + var result = std.ArrayListUnmanaged(u8){}; 88 + for (s) |c| { 89 + if (c == '\'') { 90 + try result.appendSlice(alloc, "''"); 91 + } else { 92 + try result.append(alloc, c); 93 + } 94 + } 95 + return result.toOwnedSlice(alloc); 96 + } 97 + 98 + pub fn filter(alloc: Allocator, opts: FilterOptions) ![]LogRow { 99 + var results = std.ArrayListUnmanaged(LogRow){}; 100 + errdefer results.deinit(alloc); 101 + 102 + // build dynamic query with embedded values 103 + var query_buf = std.ArrayListUnmanaged(u8){}; 104 + defer query_buf.deinit(alloc); 105 + const writer = query_buf.writer(alloc); 106 + 107 + try writer.writeAll("SELECT " ++ select_cols ++ " FROM log WHERE 1=1"); 108 + 109 + if (opts.flow_run_id) |fid| { 110 + const escaped = try escapeString(alloc, fid); 111 + defer alloc.free(escaped); 112 + try writer.print(" AND flow_run_id = '{s}'", .{escaped}); 113 + } 114 + 115 + if (opts.task_run_id) |tid| { 116 + const escaped = try escapeString(alloc, tid); 117 + defer alloc.free(escaped); 118 + try writer.print(" AND task_run_id = '{s}'", .{escaped}); 119 + } 120 + 121 + if (opts.level_ge) |lvl| { 122 + try writer.print(" AND level >= {d}", .{lvl}); 123 + } 124 + 125 + if (opts.level_le) |lvl| { 126 + try writer.print(" AND level <= {d}", .{lvl}); 127 + } 128 + 129 + if (opts.timestamp_after) |ts| { 130 + const escaped = try escapeString(alloc, ts); 131 + defer alloc.free(escaped); 132 + try writer.print(" AND timestamp >= '{s}'", .{escaped}); 133 + } 134 + 135 + if (opts.timestamp_before) |ts| { 136 + const escaped = try escapeString(alloc, ts); 137 + defer alloc.free(escaped); 138 + try writer.print(" AND timestamp <= '{s}'", .{escaped}); 139 + } 140 + 141 + if (opts.sort_asc) { 142 + try writer.writeAll(" ORDER BY timestamp ASC"); 143 + } else { 144 + try writer.writeAll(" ORDER BY timestamp DESC"); 145 + } 146 + 147 + try writer.print(" LIMIT {d} OFFSET {d}", .{ opts.limit, opts.offset }); 148 + 149 + const query = query_buf.items; 150 + 151 + var rows = backend.db.query(query, .{}) catch |err| { 152 + log.err("database", "filter logs error: {}", .{err}); 153 + return err; 154 + }; 155 + defer rows.deinit(); 156 + 157 + while (rows.next()) |r| { 158 + try results.append(alloc, try rowFromResult(alloc, &r)); 159 + } 160 + 161 + return results.toOwnedSlice(alloc); 162 + } 163 + 164 + pub fn listByFlowRunId(alloc: Allocator, flow_run_id: []const u8, limit: usize, offset: usize) ![]LogRow { 165 + return filter(alloc, .{ 166 + .flow_run_id = flow_run_id, 167 + .limit = limit, 168 + .offset = offset, 169 + }); 170 + } 171 + 172 + pub fn deleteByFlowRunId(flow_run_id: []const u8) !usize { 173 + const affected = backend.db.execWithRowCount( 174 + "DELETE FROM log WHERE flow_run_id = ?", 175 + .{flow_run_id}, 176 + ) catch |err| { 177 + log.err("database", "delete logs by flow_run_id error: {}", .{err}); 178 + return err; 179 + }; 180 + return affected; 181 + } 182 + 183 + pub fn deleteByTaskRunId(task_run_id: []const u8) !usize { 184 + const affected = backend.db.execWithRowCount( 185 + "DELETE FROM log WHERE task_run_id = ?", 186 + .{task_run_id}, 187 + ) catch |err| { 188 + log.err("database", "delete logs by task_run_id error: {}", .{err}); 189 + return err; 190 + }; 191 + return affected; 192 + }
+20
src/db/migrations/004_log_table/postgres.sql
··· 1 + -- 004_log_table: add log table for persistent log storage 2 + -- previously logs were in-memory only 3 + 4 + CREATE TABLE IF NOT EXISTS log ( 5 + id TEXT PRIMARY KEY, 6 + created TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'), 7 + updated TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'), 8 + name TEXT NOT NULL, 9 + level INTEGER NOT NULL, 10 + message TEXT NOT NULL, 11 + timestamp TEXT NOT NULL, 12 + flow_run_id TEXT REFERENCES flow_run(id) ON DELETE CASCADE, 13 + task_run_id TEXT REFERENCES task_run(id) ON DELETE CASCADE 14 + ); 15 + 16 + CREATE INDEX IF NOT EXISTS ix_log__flow_run_id ON log(flow_run_id); 17 + CREATE INDEX IF NOT EXISTS ix_log__task_run_id ON log(task_run_id); 18 + CREATE INDEX IF NOT EXISTS ix_log__level ON log(level); 19 + CREATE INDEX IF NOT EXISTS ix_log__timestamp ON log(timestamp); 20 + CREATE INDEX IF NOT EXISTS ix_log__flow_run_id_timestamp ON log(flow_run_id, timestamp);
+20
src/db/migrations/004_log_table/sqlite.sql
··· 1 + -- 004_log_table: add log table for persistent log storage 2 + -- previously logs were in-memory only 3 + 4 + CREATE TABLE IF NOT EXISTS log ( 5 + id TEXT PRIMARY KEY, 6 + created TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), 7 + updated TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), 8 + name TEXT NOT NULL, 9 + level INTEGER NOT NULL, 10 + message TEXT NOT NULL, 11 + timestamp TEXT NOT NULL, 12 + flow_run_id TEXT REFERENCES flow_run(id) ON DELETE CASCADE, 13 + task_run_id TEXT REFERENCES task_run(id) ON DELETE CASCADE 14 + ); 15 + 16 + CREATE INDEX IF NOT EXISTS ix_log__flow_run_id ON log(flow_run_id); 17 + CREATE INDEX IF NOT EXISTS ix_log__task_run_id ON log(task_run_id); 18 + CREATE INDEX IF NOT EXISTS ix_log__level ON log(level); 19 + CREATE INDEX IF NOT EXISTS ix_log__timestamp ON log(timestamp); 20 + CREATE INDEX IF NOT EXISTS ix_log__flow_run_id_timestamp ON log(flow_run_id, timestamp);
+5
src/db/migrations_data.zig
··· 27 27 .sqlite_sql = @embedFile("migrations/003_concurrency_limits/sqlite.sql"), 28 28 .postgres_sql = @embedFile("migrations/003_concurrency_limits/postgres.sql"), 29 29 }, 30 + .{ 31 + .id = "004_log_table", 32 + .sqlite_sql = @embedFile("migrations/004_log_table/sqlite.sql"), 33 + .postgres_sql = @embedFile("migrations/004_log_table/postgres.sql"), 34 + }, 30 35 };
+1
src/db/sqlite.zig
··· 22 22 pub const concurrency_limits = @import("concurrency_limits.zig"); 23 23 pub const flow_run_states = @import("flow_run_states.zig"); 24 24 pub const task_run_states = @import("task_run_states.zig"); 25 + pub const logs = @import("logs.zig"); 25 26 26 27 // re-export types for compatibility 27 28 pub const FlowRow = flows.FlowRow;