prefect server in zig
1const std = @import("std");
2const Allocator = std.mem.Allocator;
3
4const backend = @import("backend.zig");
5const log = @import("../logging.zig");
6
/// One record of the `log` table, in the shape produced by the query helpers
/// in this file.
///
/// Rows built by `rowFromResult` hold allocator-duplicated copies of every
/// string field, so the owner of the row is responsible for freeing them.
pub const LogRow = struct {
    /// Value of the `id` column.
    id: []const u8,
    /// Value of the `created` column, as text.
    created: []const u8,
    /// Value of the `updated` column, as text.
    updated: []const u8,
    /// Value of the `name` column.
    name: []const u8,
    /// Value of the `level` column.
    level: i64,
    /// Value of the `message` column.
    message: []const u8,
    /// Value of the `timestamp` column, as text.
    timestamp: []const u8,
    /// Associated flow run, or null when the column text is empty.
    flow_run_id: ?[]const u8,
    /// Associated task run, or null when the column text is empty.
    task_run_id: ?[]const u8,
};
18
/// Zero-based positions of the result columns, in the same order as the
/// column list in `select_cols`. Keep the two in sync when adding a column.
const Col = struct {
    const id: usize = 0;
    const created: usize = 1;
    const updated: usize = 2;
    const name: usize = 3;
    const level: usize = 4;
    const message: usize = 5;
    const timestamp: usize = 6;
    const flow_run_id: usize = 7;
    const task_run_id: usize = 8;
};
30
/// Column list shared by every SELECT in this file. The order must match the
/// indices declared in `Col`.
const select_cols =
    "id, created, updated, name, level, message, " ++
    "timestamp, flow_run_id, task_run_id";
32
/// Copy one result row into an owned `LogRow`.
///
/// `r` is any row-like value exposing `text(index)` and `bigint(index)`;
/// columns are read at the positions declared in `Col`. Every string is
/// duplicated with `alloc`, so the returned row does not borrow from `r`.
/// An empty `flow_run_id` / `task_run_id` text is mapped to null.
///
/// On allocation failure every string duplicated so far is released before
/// the error is returned, so a failed call leaves nothing behind.
/// Caller owns all string fields of the returned row.
fn rowFromResult(alloc: Allocator, r: anytype) !LogRow {
    const id = try alloc.dupe(u8, r.text(Col.id));
    errdefer alloc.free(id);

    const created = try alloc.dupe(u8, r.text(Col.created));
    errdefer alloc.free(created);

    const updated = try alloc.dupe(u8, r.text(Col.updated));
    errdefer alloc.free(updated);

    const name = try alloc.dupe(u8, r.text(Col.name));
    errdefer alloc.free(name);

    const message = try alloc.dupe(u8, r.text(Col.message));
    errdefer alloc.free(message);

    const timestamp = try alloc.dupe(u8, r.text(Col.timestamp));
    errdefer alloc.free(timestamp);

    // Empty text is treated as "no value" for the optional run ids.
    const flow_run_id_text = r.text(Col.flow_run_id);
    const flow_run_id: ?[]const u8 = if (flow_run_id_text.len > 0)
        try alloc.dupe(u8, flow_run_id_text)
    else
        null;
    errdefer if (flow_run_id) |owned| alloc.free(owned);

    const task_run_id_text = r.text(Col.task_run_id);
    const task_run_id: ?[]const u8 = if (task_run_id_text.len > 0)
        try alloc.dupe(u8, task_run_id_text)
    else
        null;

    return LogRow{
        .id = id,
        .created = created,
        .updated = updated,
        .name = name,
        .level = r.bigint(Col.level),
        .message = message,
        .timestamp = timestamp,
        .flow_run_id = flow_run_id,
        .task_run_id = task_run_id,
    };
}
49
/// Insert a single log record into the `log` table.
///
/// Only the seven columns listed in the statement are supplied; `created`
/// and `updated` are not written here (presumably filled in by the schema --
/// confirm against the table definition). A failure is logged under the
/// "database" scope and then returned to the caller.
pub fn insert(id: []const u8, name: []const u8, level: i64, message: []const u8, timestamp: []const u8, flow_run_id: ?[]const u8, task_run_id: ?[]const u8) !void {
    const sql =
        "INSERT INTO log (id, name, level, message, timestamp, flow_run_id, task_run_id) VALUES (?, ?, ?, ?, ?, ?, ?)";
    const params = .{ id, name, level, message, timestamp, flow_run_id, task_run_id };

    backend.db.exec(sql, params) catch |err| {
        log.err("database", "insert log error: {}", .{err});
        return err;
    };
}
59
/// Look up a single log record by its `id`.
///
/// Returns null when no row matches. A database error is also reported as
/// null, which preserves the existing caller contract, but it is now logged
/// under the "database" scope instead of being dropped silently. Allocation
/// failure while copying the row is returned as an error.
/// Caller owns the string fields of the returned row.
pub fn getById(alloc: Allocator, id: []const u8) !?LogRow {
    var maybe_row = backend.db.row(
        "SELECT " ++ select_cols ++ " FROM log WHERE id = ?",
        .{id},
    ) catch |err| {
        log.err("database", "get log by id error: {}", .{err});
        return null;
    };

    if (maybe_row) |*row| {
        // The copy made by rowFromResult is independent of the row handle.
        defer row.deinit();
        return try rowFromResult(alloc, row);
    }
    return null;
}
72
/// Criteria accepted by `filter`. Every optional field left as null adds no
/// condition to the query; the set conditions are combined with AND.
pub const FilterOptions = struct {
    /// Match rows whose `flow_run_id` equals this value.
    flow_run_id: ?[]const u8 = null,
    /// Match rows whose `task_run_id` equals this value.
    task_run_id: ?[]const u8 = null,
    /// Inclusive lower bound on `level` (`level >= value`).
    level_ge: ?i64 = null,
    /// Inclusive upper bound on `level` (`level <= value`).
    level_le: ?i64 = null,
    /// Inclusive lower bound on `timestamp` (`timestamp >= value`).
    timestamp_after: ?[]const u8 = null,
    /// Inclusive upper bound on `timestamp` (`timestamp <= value`).
    timestamp_before: ?[]const u8 = null,
    /// Maximum number of rows returned (SQL LIMIT).
    limit: usize = 200,
    /// Number of rows skipped before the first returned row (SQL OFFSET).
    offset: usize = 0,
    /// Order by `timestamp` ascending when true, descending when false.
    sort_asc: bool = true,
};
84
/// Escape `s` for use inside a single-quoted SQL string literal by doubling
/// every single quote (`'` becomes `''`). No other character is altered.
///
/// The output is produced with one exact-size allocation, so a failed call
/// leaves nothing allocated. Caller owns the returned slice and must free it
/// with `alloc`.
fn escapeString(alloc: Allocator, s: []const u8) ![]const u8 {
    // Each quote grows the output by exactly one byte.
    const quote_count = std.mem.count(u8, s, "'");
    const out = try alloc.alloc(u8, s.len + quote_count);

    var pos: usize = 0;
    for (s) |c| {
        if (c == '\'') {
            // Emit the extra quote first; the original follows below.
            out[pos] = '\'';
            pos += 1;
        }
        out[pos] = c;
        pos += 1;
    }
    return out;
}
97
/// Release every string owned by `row`. Intended for rows built by
/// `rowFromResult`, which duplicates each string field with the allocator.
fn freeRowFields(alloc: Allocator, row: LogRow) void {
    alloc.free(row.id);
    alloc.free(row.created);
    alloc.free(row.updated);
    alloc.free(row.name);
    alloc.free(row.message);
    alloc.free(row.timestamp);
    if (row.flow_run_id) |owned| alloc.free(owned);
    if (row.task_run_id) |owned| alloc.free(owned);
}

/// Query the `log` table using the criteria in `opts`.
///
/// Conditions for the non-null fields of `opts` are ANDed together; results
/// are ordered by `timestamp` (ascending when `opts.sort_asc`, otherwise
/// descending) and paged with `opts.limit` / `opts.offset`.
///
/// The SQL text is built by embedding values: strings are single-quoted and
/// passed through `escapeString`, integers are formatted directly.
/// NOTE(review): unlike the other statements in this file this does not use
/// `?` placeholders; consider bound parameters if the backend allows a
/// dynamic argument list.
///
/// Query errors are logged under the "database" scope and returned. On any
/// error, rows collected so far (including their strings) are freed.
/// Caller owns the returned slice and the string fields of every row in it.
pub fn filter(alloc: Allocator, opts: FilterOptions) ![]LogRow {
    var results = std.ArrayListUnmanaged(LogRow){};
    errdefer {
        // Rows own their strings; free them before dropping the list.
        for (results.items) |row| freeRowFields(alloc, row);
        results.deinit(alloc);
    }

    // build dynamic query with embedded values
    var query_buf = std.ArrayListUnmanaged(u8){};
    defer query_buf.deinit(alloc);
    const writer = query_buf.writer(alloc);

    // "WHERE 1=1" lets every condition below be appended as " AND ...".
    try writer.writeAll("SELECT " ++ select_cols ++ " FROM log WHERE 1=1");

    if (opts.flow_run_id) |fid| {
        const escaped = try escapeString(alloc, fid);
        defer alloc.free(escaped);
        try writer.print(" AND flow_run_id = '{s}'", .{escaped});
    }

    if (opts.task_run_id) |tid| {
        const escaped = try escapeString(alloc, tid);
        defer alloc.free(escaped);
        try writer.print(" AND task_run_id = '{s}'", .{escaped});
    }

    if (opts.level_ge) |lvl| {
        try writer.print(" AND level >= {d}", .{lvl});
    }

    if (opts.level_le) |lvl| {
        try writer.print(" AND level <= {d}", .{lvl});
    }

    if (opts.timestamp_after) |ts| {
        const escaped = try escapeString(alloc, ts);
        defer alloc.free(escaped);
        try writer.print(" AND timestamp >= '{s}'", .{escaped});
    }

    if (opts.timestamp_before) |ts| {
        const escaped = try escapeString(alloc, ts);
        defer alloc.free(escaped);
        try writer.print(" AND timestamp <= '{s}'", .{escaped});
    }

    if (opts.sort_asc) {
        try writer.writeAll(" ORDER BY timestamp ASC");
    } else {
        try writer.writeAll(" ORDER BY timestamp DESC");
    }

    try writer.print(" LIMIT {d} OFFSET {d}", .{ opts.limit, opts.offset });

    const query = query_buf.items;

    var rows = backend.db.query(query, .{}) catch |err| {
        log.err("database", "filter logs error: {}", .{err});
        return err;
    };
    defer rows.deinit();

    while (rows.next()) |r| {
        const row = try rowFromResult(alloc, &r);
        // Until the append succeeds the row is not in `results`, so it must
        // be released here if the append fails.
        errdefer freeRowFields(alloc, row);
        try results.append(alloc, row);
    }

    return results.toOwnedSlice(alloc);
}
163
/// Fetch one page of log records for a flow run.
///
/// Thin wrapper over `filter` that sets only the flow run id and paging;
/// all other `FilterOptions` fields keep their defaults, so results come
/// back ordered by `timestamp` ascending. Ownership of the result is the
/// same as for `filter`.
pub fn listByFlowRunId(alloc: Allocator, flow_run_id: []const u8, limit: usize, offset: usize) ![]LogRow {
    const opts: FilterOptions = .{
        .flow_run_id = flow_run_id,
        .limit = limit,
        .offset = offset,
    };
    return filter(alloc, opts);
}
171
/// Delete every log record whose `flow_run_id` equals the given value.
///
/// Returns the row count reported by the backend. A failure is logged under
/// the "database" scope and then returned to the caller.
pub fn deleteByFlowRunId(flow_run_id: []const u8) !usize {
    return backend.db.execWithRowCount(
        "DELETE FROM log WHERE flow_run_id = ?",
        .{flow_run_id},
    ) catch |err| {
        log.err("database", "delete logs by flow_run_id error: {}", .{err});
        return err;
    };
}
182
/// Delete every log record whose `task_run_id` equals the given value.
///
/// Returns the row count reported by the backend. A failure is logged under
/// the "database" scope and then returned to the caller.
pub fn deleteByTaskRunId(task_run_id: []const u8) !usize {
    return backend.db.execWithRowCount(
        "DELETE FROM log WHERE task_run_id = ?",
        .{task_run_id},
    ) catch |err| {
        log.err("database", "delete logs by task_run_id error: {}", .{err});
        return err;
    };
}
192}