prefect server in zig
at main 192 lines 5.9 kB view raw
const std = @import("std");
const Allocator = std.mem.Allocator;

const backend = @import("backend.zig");
const log = @import("../logging.zig");

/// One row of the `log` table, with every string field owned by the
/// allocator that was passed to the function that produced the row.
pub const LogRow = struct {
    id: []const u8,
    created: []const u8,
    updated: []const u8,
    name: []const u8,
    level: i64,
    message: []const u8,
    timestamp: []const u8,
    /// Null when the stored column text is empty.
    flow_run_id: ?[]const u8,
    /// Null when the stored column text is empty.
    task_run_id: ?[]const u8,
};

/// Zero-based result-column indexes; must stay in the same order as `select_cols`.
const Col = struct {
    const id: usize = 0;
    const created: usize = 1;
    const updated: usize = 2;
    const name: usize = 3;
    const level: usize = 4;
    const message: usize = 5;
    const timestamp: usize = 6;
    const flow_run_id: usize = 7;
    const task_run_id: usize = 8;
};

/// Column list shared by every SELECT in this file; order matches `Col`.
const select_cols = "id, created, updated, name, level, message, timestamp, flow_run_id, task_run_id";

/// Release every string owned by `row`. `alloc` must be the allocator the row
/// was built with.
fn freeRow(alloc: Allocator, row: LogRow) void {
    alloc.free(row.id);
    alloc.free(row.created);
    alloc.free(row.updated);
    alloc.free(row.name);
    alloc.free(row.message);
    alloc.free(row.timestamp);
    if (row.flow_run_id) |s| alloc.free(s);
    if (row.task_run_id) |s| alloc.free(s);
}

/// Copy the current result row `r` into a `LogRow` whose strings are owned by
/// `alloc`.
///
/// `r` is anything exposing `text(col)` and `bigint(col)`. An empty
/// `flow_run_id` / `task_run_id` text is mapped to null.
///
/// On allocation failure every string duplicated so far is freed before the
/// error is returned, so nothing leaks on the error path.
fn rowFromResult(alloc: Allocator, r: anytype) !LogRow {
    const flow_run_id_text = r.text(Col.flow_run_id);
    const task_run_id_text = r.text(Col.task_run_id);

    const id = try alloc.dupe(u8, r.text(Col.id));
    errdefer alloc.free(id);
    const created = try alloc.dupe(u8, r.text(Col.created));
    errdefer alloc.free(created);
    const updated = try alloc.dupe(u8, r.text(Col.updated));
    errdefer alloc.free(updated);
    const name = try alloc.dupe(u8, r.text(Col.name));
    errdefer alloc.free(name);
    const message = try alloc.dupe(u8, r.text(Col.message));
    errdefer alloc.free(message);
    const timestamp = try alloc.dupe(u8, r.text(Col.timestamp));
    errdefer alloc.free(timestamp);

    const flow_run_id: ?[]const u8 = if (flow_run_id_text.len > 0) try alloc.dupe(u8, flow_run_id_text) else null;
    errdefer if (flow_run_id) |s| alloc.free(s);
    const task_run_id: ?[]const u8 = if (task_run_id_text.len > 0) try alloc.dupe(u8, task_run_id_text) else null;

    return LogRow{
        .id = id,
        .created = created,
        .updated = updated,
        .name = name,
        .level = r.bigint(Col.level),
        .message = message,
        .timestamp = timestamp,
        .flow_run_id = flow_run_id,
        .task_run_id = task_run_id,
    };
}

/// Insert one log record using bound parameters.
///
/// `created` and `updated` are not part of the statement.
/// NOTE(review): presumably filled by column defaults — confirm against the schema.
/// Database errors are logged and then returned to the caller.
pub fn insert(id: []const u8, name: []const u8, level: i64, message: []const u8, timestamp: []const u8, flow_run_id: ?[]const u8, task_run_id: ?[]const u8) !void {
    backend.db.exec(
        "INSERT INTO log (id, name, level, message, timestamp, flow_run_id, task_run_id) VALUES (?, ?, ?, ?, ?, ?, ?)",
        .{ id, name, level, message, timestamp, flow_run_id, task_run_id },
    ) catch |err| {
        log.err("database", "insert log error: {}", .{err});
        return err;
    };
}

/// Look up a single log record by `id`.
///
/// Returns null when no row matches. A database error is also reported as
/// null (callers rely on this), but it is logged first so that a failing
/// query is not indistinguishable from a missing row in the logs.
/// Allocation failures while copying the row are returned as errors.
/// The caller owns the strings of the returned row.
pub fn getById(alloc: Allocator, id: []const u8) !?LogRow {
    var r = backend.db.row(
        "SELECT " ++ select_cols ++ " FROM log WHERE id = ?",
        .{id},
    ) catch |err| {
        log.err("database", "get log by id error: {}", .{err});
        return null;
    };

    if (r) |*row| {
        defer row.deinit();
        return try rowFromResult(alloc, row);
    }
    return null;
}

/// Optional predicates and paging for `filter`. A null predicate is not
/// applied.
pub const FilterOptions = struct {
    flow_run_id: ?[]const u8 = null,
    task_run_id: ?[]const u8 = null,
    /// Inclusive lower bound on `level`.
    level_ge: ?i64 = null,
    /// Inclusive upper bound on `level`.
    level_le: ?i64 = null,
    /// Inclusive lower bound, compared against the stored `timestamp` value.
    timestamp_after: ?[]const u8 = null,
    /// Inclusive upper bound, compared against the stored `timestamp` value.
    timestamp_before: ?[]const u8 = null,
    limit: usize = 200,
    offset: usize = 0,
    /// Order by `timestamp` ascending when true, descending otherwise.
    sort_asc: bool = true,
};

/// Escape a string for use inside a single-quoted SQL literal by doubling
/// every single quote. Caller owns the returned slice.
fn escapeString(alloc: Allocator, s: []const u8) ![]const u8 {
    var result = std.ArrayListUnmanaged(u8){};
    // Without this the partially built buffer leaks when an append fails.
    errdefer result.deinit(alloc);

    for (s) |c| {
        if (c == '\'') {
            try result.appendSlice(alloc, "''");
        } else {
            try result.append(alloc, c);
        }
    }
    return result.toOwnedSlice(alloc);
}

/// Append one text predicate to the query being built. `clause_fmt` must
/// contain exactly one `{s}` placeholder inside single quotes; `value` is
/// escaped with `escapeString` before it is substituted.
fn appendTextClause(alloc: Allocator, writer: anytype, comptime clause_fmt: []const u8, value: []const u8) !void {
    const escaped = try escapeString(alloc, value);
    defer alloc.free(escaped);
    try writer.print(clause_fmt, .{escaped});
}

/// Return the log records matching `opts`, ordered by `timestamp` and paged
/// with `opts.limit` / `opts.offset`.
///
/// The caller owns the returned slice and the strings of every row in it.
/// Database errors are logged and returned. On any error, rows collected so
/// far are freed before returning.
///
/// NOTE(review): values are embedded into the SQL text after quote-doubling
/// rather than bound as parameters, because the set of predicates is only
/// known at runtime. Prefer bound parameters if the backend offers a way to
/// bind a runtime-sized argument list.
pub fn filter(alloc: Allocator, opts: FilterOptions) ![]LogRow {
    var results = std.ArrayListUnmanaged(LogRow){};
    errdefer {
        // Rows own their strings, so freeing only the list would leak them.
        for (results.items) |row| freeRow(alloc, row);
        results.deinit(alloc);
    }

    // build dynamic query with embedded values
    var query_buf = std.ArrayListUnmanaged(u8){};
    defer query_buf.deinit(alloc);
    const writer = query_buf.writer(alloc);

    try writer.writeAll("SELECT " ++ select_cols ++ " FROM log WHERE 1=1");

    if (opts.flow_run_id) |fid| {
        try appendTextClause(alloc, writer, " AND flow_run_id = '{s}'", fid);
    }

    if (opts.task_run_id) |tid| {
        try appendTextClause(alloc, writer, " AND task_run_id = '{s}'", tid);
    }

    if (opts.level_ge) |lvl| {
        try writer.print(" AND level >= {d}", .{lvl});
    }

    if (opts.level_le) |lvl| {
        try writer.print(" AND level <= {d}", .{lvl});
    }

    if (opts.timestamp_after) |ts| {
        try appendTextClause(alloc, writer, " AND timestamp >= '{s}'", ts);
    }

    if (opts.timestamp_before) |ts| {
        try appendTextClause(alloc, writer, " AND timestamp <= '{s}'", ts);
    }

    if (opts.sort_asc) {
        try writer.writeAll(" ORDER BY timestamp ASC");
    } else {
        try writer.writeAll(" ORDER BY timestamp DESC");
    }

    try writer.print(" LIMIT {d} OFFSET {d}", .{ opts.limit, opts.offset });

    const query = query_buf.items;

    var rows = backend.db.query(query, .{}) catch |err| {
        log.err("database", "filter logs error: {}", .{err});
        return err;
    };
    defer rows.deinit();

    while (rows.next()) |r| {
        const row = try rowFromResult(alloc, &r);
        // Until the append succeeds the row is not reachable from `results`.
        errdefer freeRow(alloc, row);
        try results.append(alloc, row);
    }

    return results.toOwnedSlice(alloc);
}

/// Convenience wrapper around `filter` that selects by `flow_run_id` only,
/// with all other options left at their defaults (ascending by timestamp).
pub fn listByFlowRunId(alloc: Allocator, flow_run_id: []const u8, limit: usize, offset: usize) ![]LogRow {
    return filter(alloc, .{
        .flow_run_id = flow_run_id,
        .limit = limit,
        .offset = offset,
    });
}

/// Delete every log record with the given `flow_run_id` and return the row
/// count reported by the backend. Database errors are logged and returned.
pub fn deleteByFlowRunId(flow_run_id: []const u8) !usize {
    const affected = backend.db.execWithRowCount(
        "DELETE FROM log WHERE flow_run_id = ?",
        .{flow_run_id},
    ) catch |err| {
        log.err("database", "delete logs by flow_run_id error: {}", .{err});
        return err;
    };
    return affected;
}

/// Delete every log record with the given `task_run_id` and return the row
/// count reported by the backend. Database errors are logged and returned.
pub fn deleteByTaskRunId(task_run_id: []const u8) !usize {
    const affected = backend.db.execWithRowCount(
        "DELETE FROM log WHERE task_run_id = ?",
        .{task_run_id},
    ) catch |err| {
        log.err("database", "delete logs by task_run_id error: {}", .{err});
        return err;
    };
    return affected;
}