prefect server in zig
1const std = @import("std");
2const Allocator = std.mem.Allocator;
3
4const backend = @import("backend.zig");
5const log = @import("../logging.zig");
6
/// One row of the `task_run` table, in the column order used by the SELECT
/// statements in this file.
///
/// Rows are built by `rowToTaskRun`, which copies every text column with the
/// allocator handed to the query function before the result set is
/// deinitialized.
pub const TaskRunRow = struct {
    // --- identity and bookkeeping ---
    id: []const u8,
    created: []const u8,
    updated: []const u8,

    // --- ownership and naming ---
    /// The underlying column can be NULL: `insert` accepts an optional flow
    /// run id and `getByKey` matches `IS NULL`.
    /// NOTE(review): how a NULL is surfaced here depends on `row.text`, which
    /// is not visible in this file — confirm before relying on "".
    flow_run_id: []const u8,
    name: []const u8,
    task_key: []const u8,
    dynamic_key: []const u8,

    // --- current state (written by `insert` and `setState`) ---
    state_type: []const u8,
    state_name: []const u8,
    state_timestamp: []const u8,

    // --- misc ---
    /// NOTE(review): presumably JSON array text — `rowToTaskRun` falls back
    /// to the literal "[]" for this field; confirm against the schema.
    tags: []const u8,
    run_count: i64,
};
21
/// Inserts a new row into `task_run`.
///
/// The eight arguments are bound, in order, to the columns `id`,
/// `flow_run_id`, `name`, `task_key`, `dynamic_key`, `state_type`,
/// `state_name` and `state_timestamp`. `flow_run_id` is optional and is
/// handed to the backend unchanged.
///
/// If the statement fails, the error is logged through `log.err` and then
/// returned to the caller as-is.
pub fn insert(
    id: []const u8,
    flow_run_id: ?[]const u8,
    name: []const u8,
    task_key: []const u8,
    dynamic_key: []const u8,
    state_type: []const u8,
    state_name: []const u8,
    timestamp: []const u8,
) !void {
    const statement =
        \\INSERT INTO task_run (id, flow_run_id, name, task_key, dynamic_key, state_type, state_name, state_timestamp)
        \\VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    ;

    // Placeholder order must match the column list above.
    backend.db.exec(statement, .{
        id,
        flow_run_id,
        name,
        task_key,
        dynamic_key,
        state_type,
        state_name,
        timestamp,
    }) catch |err| {
        log.err("database", "insert task_run error: {}", .{err});
        return err;
    };
}
42
/// Looks up a single task run by its `id`.
///
/// Returns `null` only when the query succeeds and yields no row. A failing
/// query is logged through `log.err` and the error is returned, so callers
/// can tell "not found" apart from "database unavailable" (the same policy
/// `insert`, `setState` and `list` follow).
///
/// The text fields of the returned row are copied with `alloc` by
/// `rowToTaskRun` before the result set is released.
pub fn get(alloc: Allocator, id: []const u8) !?TaskRunRow {
    var rows = backend.db.query(
        \\SELECT id, created, updated, flow_run_id, name, task_key, dynamic_key,
        \\ state_type, state_name, state_timestamp, tags, run_count
        \\FROM task_run WHERE id = ?
    , .{id}) catch |err| {
        log.err("database", "get task_run error: {}", .{err});
        return err;
    };
    defer rows.deinit();

    // The return expression is evaluated before the deferred deinit runs,
    // so the row is copied while the result set is still alive.
    const row = rows.next() orelse return null;
    return rowToTaskRun(alloc, row);
}
56
/// Looks up a task run by its `(flow_run_id, task_key, dynamic_key)` triple.
///
/// A null `flow_run_id` selects rows whose `flow_run_id` column IS NULL; a
/// separate statement is needed for that because SQL `= ?` never matches
/// NULL.
///
/// Returns `null` only when the query succeeds and yields no row. A failing
/// query is logged through `log.err` and the error is returned instead of
/// being reported as "not found" — otherwise a caller could mistake a
/// database failure for a missing task run.
///
/// The text fields of the returned row are copied with `alloc` by
/// `rowToTaskRun` before the result set is released.
pub fn getByKey(
    alloc: Allocator,
    flow_run_id: ?[]const u8,
    task_key: []const u8,
    dynamic_key: []const u8,
) !?TaskRunRow {
    if (flow_run_id) |frid| {
        var keyed_rows = backend.db.query(
            \\SELECT id, created, updated, flow_run_id, name, task_key, dynamic_key,
            \\ state_type, state_name, state_timestamp, tags, run_count
            \\FROM task_run WHERE flow_run_id = ? AND task_key = ? AND dynamic_key = ?
        , .{ frid, task_key, dynamic_key }) catch |err| {
            log.err("database", "get task_run by key error: {}", .{err});
            return err;
        };
        defer keyed_rows.deinit();

        const row = keyed_rows.next() orelse return null;
        return rowToTaskRun(alloc, row);
    } else {
        var orphan_rows = backend.db.query(
            \\SELECT id, created, updated, flow_run_id, name, task_key, dynamic_key,
            \\ state_type, state_name, state_timestamp, tags, run_count
            \\FROM task_run WHERE flow_run_id IS NULL AND task_key = ? AND dynamic_key = ?
        , .{ task_key, dynamic_key }) catch |err| {
            log.err("database", "get task_run by key error: {}", .{err});
            return err;
        };
        defer orphan_rows.deinit();

        const row = orphan_rows.next() orelse return null;
        return rowToTaskRun(alloc, row);
    }
}
88
/// Moves a task run to a new state and records that state in the history
/// table, both inside one transaction.
///
/// Steps, in order:
///   1. UPDATE `task_run` with the new state columns; `updated` is set to the
///      same `timestamp` as `state_timestamp`.
///   2. INSERT a matching row into `task_run_state`, keyed by `state_id`.
///   3. COMMIT.
///
/// Every failure is logged through `log.err` and returned. Once the
/// transaction has been opened, any error return — including a failed
/// commit — also runs `txn.rollback()` via `errdefer`.
pub fn setState(
    run_id: []const u8,
    state_id: []const u8,
    state_type: []const u8,
    state_name: []const u8,
    timestamp: []const u8,
) !void {
    // The mutex is taken only for SQLite; per the original note, the
    // postgres pool handles concurrency on its own.
    const guard_with_mutex = backend.db.dialect == .sqlite;
    if (guard_with_mutex) backend.db.mutex.lock();
    defer if (guard_with_mutex) backend.db.mutex.unlock();

    const update_run_sql =
        \\UPDATE task_run SET state_id = ?, state_type = ?, state_name = ?, state_timestamp = ?, updated = ?
        \\WHERE id = ?
    ;
    const insert_history_sql =
        \\INSERT INTO task_run_state (id, task_run_id, type, name, timestamp)
        \\VALUES (?, ?, ?, ?, ?)
    ;

    var txn = backend.db.beginTransaction() catch |err| {
        log.err("database", "begin transaction error: {}", .{err});
        return err;
    };
    errdefer txn.rollback();

    // `timestamp` is bound twice: once for state_timestamp, once for updated.
    txn.exec(update_run_sql, .{
        state_id,
        state_type,
        state_name,
        timestamp,
        timestamp,
        run_id,
    }) catch |err| {
        log.err("database", "update task_run error: {}", .{err});
        return err;
    };

    // State history record for the transition just applied.
    txn.exec(insert_history_sql, .{
        state_id,
        run_id,
        state_type,
        state_name,
        timestamp,
    }) catch |err| {
        log.err("database", "insert task_run_state error: {}", .{err});
        return err;
    };

    txn.commit() catch |err| {
        log.err("database", "commit error: {}", .{err});
        return err;
    };
}
128
/// Returns up to `limit` task runs, newest first (`ORDER BY created DESC`).
///
/// The caller owns the returned slice (allocated with `alloc`) and the text
/// fields of every row in it, which `rowToTaskRun` copies with the same
/// allocator.
///
/// Errors: a failing query is logged through `log.err` and returned; an
/// allocation failure while growing the result list is returned as well.
/// NOTE: on that error path only the list's backing storage is released —
/// the strings already copied into earlier rows are not freed here, so an
/// arena-style allocator is the safest choice for `alloc`.
pub fn list(alloc: Allocator, limit: usize) ![]TaskRunRow {
    // `usize` can exceed the range of `i64`; an unchecked @intCast would be
    // illegal behavior for such a value, so saturate at the largest
    // representable LIMIT instead.
    const sql_limit: i64 = std.math.cast(i64, limit) orelse std.math.maxInt(i64);

    var results = std.ArrayListUnmanaged(TaskRunRow){};
    errdefer results.deinit(alloc);

    var rows = backend.db.query(
        \\SELECT id, created, updated, flow_run_id, name, task_key, dynamic_key,
        \\ state_type, state_name, state_timestamp, tags, run_count
        \\FROM task_run ORDER BY created DESC LIMIT ?
    , .{sql_limit}) catch |err| {
        log.err("database", "list task_runs error: {}", .{err});
        return err;
    };
    defer rows.deinit();

    while (rows.next()) |row| {
        try results.append(alloc, rowToTaskRun(alloc, row));
    }

    return results.toOwnedSlice(alloc);
}
149
/// Converts one result row into a `TaskRunRow`.
///
/// Columns are read positionally and must follow the SELECT list used
/// throughout this file: indices 0-10 are text columns, index 11 is the
/// integer `run_count`. Each text column is copied with `alloc`.
///
/// This function cannot fail: if a copy cannot be allocated, the field is
/// set to a static literal instead ("[]" for `tags`, "" for everything
/// else). Such fallback fields are not owned by `alloc`.
fn rowToTaskRun(alloc: Allocator, row: anytype) TaskRunRow {
    // Copy `text` with `a`, or hand back `fallback` when allocation fails.
    const copyOr = struct {
        fn call(a: Allocator, text: []const u8, fallback: []const u8) []const u8 {
            return a.dupe(u8, text) catch fallback;
        }
    }.call;

    const empty = "";

    return .{
        .id = copyOr(alloc, row.text(0), empty),
        .created = copyOr(alloc, row.text(1), empty),
        .updated = copyOr(alloc, row.text(2), empty),
        .flow_run_id = copyOr(alloc, row.text(3), empty),
        .name = copyOr(alloc, row.text(4), empty),
        .task_key = copyOr(alloc, row.text(5), empty),
        .dynamic_key = copyOr(alloc, row.text(6), empty),
        .state_type = copyOr(alloc, row.text(7), empty),
        .state_name = copyOr(alloc, row.text(8), empty),
        .state_timestamp = copyOr(alloc, row.text(9), empty),
        .tags = copyOr(alloc, row.text(10), "[]"),
        .run_count = row.bigint(11),
    };
}
165}