prefect server in zig
1const std = @import("std");
2const Allocator = std.mem.Allocator;
3
4const backend = @import("backend.zig");
5const log = @import("../logging.zig");
6
/// One row of the `deployment_schedule` table, in the column order used by
/// this file's SELECT statements.
///
/// Rows built by `rowFromResult` hold their own copies of every string field,
/// made with the allocator handed to the query function. Call `deinit` with
/// that same allocator to release them (unnecessary when an arena is used).
pub const DeploymentScheduleRow = struct {
    /// Identifier the row is looked up, updated and deleted by.
    id: []const u8,
    /// Creation marker stored as text; list queries order by it.
    created: []const u8,
    /// Last-update marker stored as text; equals `created` right after insert.
    updated: []const u8,
    /// Identifier of the deployment this schedule belongs to.
    deployment_id: []const u8,
    /// Schedule definition exactly as stored (presumably serialized JSON — confirm with callers).
    schedule: []const u8,
    /// Stored as an integer column; any non-zero value reads back as `true`.
    active: bool,
    /// `null` when the column is NULL.
    max_scheduled_runs: ?i64,
    /// Parameters text exactly as stored; inserts default it to "{}".
    parameters: []const u8,
    /// `null` when the column is NULL.
    slug: ?[]const u8,

    /// Frees every string owned by this row.
    ///
    /// Only valid for rows whose strings were allocated with `alloc`
    /// (e.g. rows returned by `getById`, `listByDeployment`, `listActive`).
    /// The row must not be used afterwards.
    pub fn deinit(self: DeploymentScheduleRow, alloc: Allocator) void {
        alloc.free(self.id);
        alloc.free(self.created);
        alloc.free(self.updated);
        alloc.free(self.deployment_id);
        alloc.free(self.schedule);
        alloc.free(self.parameters);
        if (self.slug) |s| alloc.free(s);
    }
};
18
/// Zero-based positions of the columns listed in `select_cols`.
/// Must stay in the same order as that list; `slug` is the last column.
const Col = struct {
    const id: usize = 0;
    const created: usize = 1;
    const updated: usize = 2;
    const deployment_id: usize = 3;
    const schedule: usize = 4;
    const active: usize = 5;
    const max_scheduled_runs: usize = 6;
    const parameters: usize = 7;
    const slug: usize = 8;
};

/// Column list spliced into every SELECT in this file, so that `Col` indices
/// are valid for all of them.
const select_cols = "id, created, updated, deployment_id, schedule, active, max_scheduled_runs, parameters, slug";

comptime {
    // Cheap drift guard: N columns are separated by N-1 commas, and the last
    // index (`Col.slug`) is also N-1. This catches a column added to one side
    // only; it does not verify the order.
    var separators: usize = 0;
    for (select_cols) |ch| {
        if (ch == ',') separators += 1;
    }
    if (separators != Col.slug) {
        @compileError("select_cols and Col disagree on the number of columns");
    }
}
32
/// Builds an owned `DeploymentScheduleRow` from one result row.
///
/// `r` is any value (or pointer to one) exposing `text`, `textOrNull`, `int`
/// and `bigint` accessors that take a column index from `Col`. Every string
/// is copied with `alloc`, so the returned row holds no slices borrowed
/// from `r`; the caller owns the copies.
///
/// Returns an allocation error if any copy fails. In that case the copies
/// already made are freed before returning, so nothing leaks.
fn rowFromResult(alloc: Allocator, r: anytype) !DeploymentScheduleRow {
    // Read the non-allocating columns first so nothing needs unwinding for them.
    const active = r.int(Col.active) != 0;
    // `bigint` is only consulted when the column is not NULL.
    const max_scheduled_runs: ?i64 = if (r.textOrNull(Col.max_scheduled_runs) != null)
        r.bigint(Col.max_scheduled_runs)
    else
        null;

    const id = try alloc.dupe(u8, r.text(Col.id));
    errdefer alloc.free(id);
    const created = try alloc.dupe(u8, r.text(Col.created));
    errdefer alloc.free(created);
    const updated = try alloc.dupe(u8, r.text(Col.updated));
    errdefer alloc.free(updated);
    const deployment_id = try alloc.dupe(u8, r.text(Col.deployment_id));
    errdefer alloc.free(deployment_id);
    const schedule = try alloc.dupe(u8, r.text(Col.schedule));
    errdefer alloc.free(schedule);
    const parameters = try alloc.dupe(u8, r.text(Col.parameters));
    errdefer alloc.free(parameters);
    // Last fallible step: nothing after it can fail, so it needs no errdefer.
    const slug: ?[]const u8 = if (r.textOrNull(Col.slug)) |s| try alloc.dupe(u8, s) else null;

    return DeploymentScheduleRow{
        .id = id,
        .created = created,
        .updated = updated,
        .deployment_id = deployment_id,
        .schedule = schedule,
        .active = active,
        .max_scheduled_runs = max_scheduled_runs,
        .parameters = parameters,
        .slug = slug,
    };
}
47
/// Fetches the schedule with the given `id`.
///
/// Returns `null` when no row matches. A failing database lookup is also
/// reported as `null` (the pre-existing contract), but it is now logged
/// instead of being dropped silently. Allocation failures while copying the
/// row are returned as errors. The caller owns the strings in the returned
/// row, which are allocated with `alloc`.
pub fn getById(alloc: Allocator, id: []const u8) !?DeploymentScheduleRow {
    var found = backend.db.row(
        "SELECT " ++ select_cols ++ " FROM deployment_schedule WHERE id = ?",
        .{id},
    ) catch |err| {
        log.err("database", "get deployment_schedule error: {}", .{err});
        return null;
    };

    if (found) |*row| {
        // The copy is finished before this runs, so the result never points into `row`.
        defer row.deinit();
        return try rowFromResult(alloc, row);
    }
    return null;
}
60
/// Frees the strings owned by each row in `rows`. Used only to unwind a
/// partially built `listByDeployment` result on the error path.
fn releaseDeploymentRows(alloc: Allocator, rows: []const DeploymentScheduleRow) void {
    for (rows) |row| {
        alloc.free(row.id);
        alloc.free(row.created);
        alloc.free(row.updated);
        alloc.free(row.deployment_id);
        alloc.free(row.schedule);
        alloc.free(row.parameters);
        if (row.slug) |s| alloc.free(s);
    }
}

/// Lists every schedule belonging to `deployment_id`, ordered by `created`
/// ascending.
///
/// The caller owns the returned slice and the strings inside each row, all
/// allocated with `alloc`. Query errors are logged and returned. On any
/// error, rows collected so far are released before returning.
pub fn listByDeployment(alloc: Allocator, deployment_id: []const u8) ![]DeploymentScheduleRow {
    var results = std.ArrayListUnmanaged(DeploymentScheduleRow){};
    errdefer {
        // Rows own their strings, so freeing the list buffer alone would leak them.
        releaseDeploymentRows(alloc, results.items);
        results.deinit(alloc);
    }

    var rows = backend.db.query(
        "SELECT " ++ select_cols ++ " FROM deployment_schedule WHERE deployment_id = ? ORDER BY created ASC",
        .{deployment_id},
    ) catch |err| {
        log.err("database", "list deployment_schedules error: {}", .{err});
        return err;
    };
    defer rows.deinit();

    while (rows.next()) |r| {
        // Reserve the slot first: a freshly built row can then never be
        // stranded by a failing append.
        try results.ensureUnusedCapacity(alloc, 1);
        results.appendAssumeCapacity(try rowFromResult(alloc, &r));
    }

    return results.toOwnedSlice(alloc);
}
80
/// Optional column values for `insert`; any field left out takes the
/// default shown here.
pub const InsertParams = struct {
    /// New schedules are active unless stated otherwise.
    active: bool = true,
    /// `null` is bound as SQL NULL.
    max_scheduled_runs: ?i64 = null,
    /// Parameters text written to the `parameters` column.
    parameters: []const u8 = "{}",
    /// `null` is bound as SQL NULL.
    slug: ?[]const u8 = null,
};
87
/// Inserts a new `deployment_schedule` row.
///
/// `created` is written to both the `created` and `updated` columns. The
/// remaining columns come from `params`. A failing statement is logged and
/// its error returned to the caller.
pub fn insert(
    id: []const u8,
    deployment_id: []const u8,
    schedule: []const u8,
    created: []const u8,
    params: InsertParams,
) !void {
    const sql =
        \\INSERT INTO deployment_schedule (id, created, updated, deployment_id, schedule, active, max_scheduled_runs, parameters, slug)
        \\VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    ;
    // The `active` column holds an integer flag rather than a boolean.
    const active_flag: i32 = if (params.active) 1 else 0;

    backend.db.exec(sql, .{
        id,
        created,
        created, // `updated` starts out equal to `created`
        deployment_id,
        schedule,
        active_flag,
        params.max_scheduled_runs,
        params.parameters,
        params.slug,
    }) catch |err| {
        log.err("database", "insert deployment_schedule error: {}", .{err});
        return err;
    };
}
113
/// Partial update for `updateById`.
///
/// A `null` field leaves the stored column unchanged, because `updateById`
/// wraps each value in `COALESCE(?, column)`. As a consequence a column
/// cannot be reset to NULL through this struct.
pub const UpdateParams = struct {
    /// Replacement schedule definition, or `null` to keep the current one.
    schedule: ?[]const u8 = null,
    /// New active state, or `null` to keep the current one.
    active: ?bool = null,
    /// New run cap, or `null` to keep the current one.
    max_scheduled_runs: ?i64 = null,
    /// Replacement parameters text, or `null` to keep the current one.
    parameters: ?[]const u8 = null,
    /// Replacement slug, or `null` to keep the current one.
    slug: ?[]const u8 = null,
};
121
/// Applies `params` to the schedule identified by `id` and stamps the row
/// with `updated`.
///
/// Each optional field is bound through `COALESCE(?, column)`, so `null`
/// fields keep their stored value. Returns `true` when at least one row was
/// affected. A failing statement is logged and its error returned.
pub fn updateById(id: []const u8, updated: []const u8, params: UpdateParams) !bool {
    const sql =
        \\UPDATE deployment_schedule SET
        \\ schedule = COALESCE(?, schedule),
        \\ active = COALESCE(?, active),
        \\ max_scheduled_runs = COALESCE(?, max_scheduled_runs),
        \\ parameters = COALESCE(?, parameters),
        \\ slug = COALESCE(?, slug),
        \\ updated = ?
        \\WHERE id = ?
    ;
    // Map the optional bool onto the integer flag the column stores,
    // keeping `null` as "leave unchanged".
    const active_flag: ?i32 = if (params.active) |on| @as(i32, if (on) 1 else 0) else null;

    const changed = backend.db.execWithRowCount(sql, .{
        params.schedule,
        active_flag,
        params.max_scheduled_runs,
        params.parameters,
        params.slug,
        updated,
        id,
    }) catch |err| {
        log.err("database", "update deployment_schedule error: {}", .{err});
        return err;
    };

    return changed > 0;
}
146
/// Deletes the schedule identified by `id`.
///
/// Returns `true` when a row was removed and `false` when nothing matched.
/// A failing statement is logged and its error returned.
pub fn deleteById(id: []const u8) !bool {
    const sql = "DELETE FROM deployment_schedule WHERE id = ?";

    const removed = backend.db.execWithRowCount(sql, .{id}) catch |err| {
        log.err("database", "delete deployment_schedule error: {}", .{err});
        return err;
    };

    return removed > 0;
}
157
/// Deletes every schedule attached to `deployment_id`.
///
/// Returns the number of rows removed. A failing statement is logged and
/// its error returned.
pub fn deleteByDeployment(deployment_id: []const u8) !usize {
    const sql = "DELETE FROM deployment_schedule WHERE deployment_id = ?";

    const removed = backend.db.execWithRowCount(sql, .{deployment_id}) catch |err| {
        log.err("database", "delete deployment_schedules error: {}", .{err});
        return err;
    };

    // Convert the backend's row count to the `usize` callers expect.
    const removed_count: usize = @intCast(removed);
    return removed_count;
}
168
/// Frees the strings owned by each row in `rows`. Used only to unwind a
/// partially built `listActive` result on the error path.
fn releaseActiveRows(alloc: Allocator, rows: []const DeploymentScheduleRow) void {
    for (rows) |row| {
        alloc.free(row.id);
        alloc.free(row.created);
        alloc.free(row.updated);
        alloc.free(row.deployment_id);
        alloc.free(row.schedule);
        alloc.free(row.parameters);
        if (row.slug) |s| alloc.free(s);
    }
}

/// List all active schedules (for scheduler service).
///
/// Returns at most `limit` rows with `active = 1`, ordered by `created`
/// ascending. The caller owns the returned slice and the strings inside each
/// row, all allocated with `alloc`. Query errors are logged and returned. On
/// any error, rows collected so far are released before returning.
pub fn listActive(alloc: Allocator, limit: usize) ![]DeploymentScheduleRow {
    var results = std.ArrayListUnmanaged(DeploymentScheduleRow){};
    errdefer {
        // Rows own their strings, so freeing the list buffer alone would leak them.
        releaseActiveRows(alloc, results.items);
        results.deinit(alloc);
    }

    // The statement binds a signed 64-bit value; saturate rather than hit
    // illegal behavior on a `usize` that does not fit.
    const row_limit: i64 = std.math.cast(i64, limit) orelse std.math.maxInt(i64);

    var rows = backend.db.query(
        "SELECT " ++ select_cols ++ " FROM deployment_schedule WHERE active = 1 ORDER BY created ASC LIMIT ?",
        .{row_limit},
    ) catch |err| {
        log.err("database", "list active schedules error: {}", .{err});
        return err;
    };
    defer rows.deinit();

    while (rows.next()) |r| {
        // Reserve the slot first: a freshly built row can then never be
        // stranded by a failing append.
        try results.ensureUnusedCapacity(alloc, 1);
        results.appendAssumeCapacity(try rowFromResult(alloc, &r));
    }

    return results.toOwnedSlice(alloc);
}