// prefect server in zig
// at main, 188 lines, 5.9 kB (view raw)
//! Data-access helpers for the `deployment_schedule` table.
//!
//! All statements are executed through the shared `backend.db` handle.
//! Rows handed back to callers own copies of their text columns, made with
//! the allocator the caller supplied; release them with
//! `DeploymentScheduleRow.deinit`.

const std = @import("std");
const Allocator = std.mem.Allocator;

const backend = @import("backend.zig");
const log = @import("../logging.zig");

/// One row of `deployment_schedule`.
///
/// Rows produced by this module (`getById`, `listByDeployment`, `listActive`)
/// own every slice they contain.
pub const DeploymentScheduleRow = struct {
    id: []const u8,
    created: []const u8,
    updated: []const u8,
    deployment_id: []const u8,
    schedule: []const u8,
    active: bool,
    max_scheduled_runs: ?i64,
    parameters: []const u8,
    slug: ?[]const u8,

    /// Free every string owned by this row.
    ///
    /// `alloc` must be the allocator that was passed to the function that
    /// produced the row. Only call this on rows built by this module; a row
    /// assembled by hand from string literals must not be passed here.
    pub fn deinit(self: DeploymentScheduleRow, alloc: Allocator) void {
        alloc.free(self.id);
        alloc.free(self.created);
        alloc.free(self.updated);
        alloc.free(self.deployment_id);
        alloc.free(self.schedule);
        alloc.free(self.parameters);
        if (self.slug) |s| alloc.free(s);
    }
};

/// Column indexes into a result row; must stay in the same order as `select_cols`.
const Col = struct {
    const id: usize = 0;
    const created: usize = 1;
    const updated: usize = 2;
    const deployment_id: usize = 3;
    const schedule: usize = 4;
    const active: usize = 5;
    const max_scheduled_runs: usize = 6;
    const parameters: usize = 7;
    const slug: usize = 8;
};

/// Column list shared by every SELECT in this file (order matches `Col`).
const select_cols = "id, created, updated, deployment_id, schedule, active, max_scheduled_runs, parameters, slug";

/// Copy the current result row `r` into an owned `DeploymentScheduleRow`.
///
/// `r` is any row-like value exposing `text`, `textOrNull`, `int` and
/// `bigint` accessors indexed by column number. Every text column is
/// duplicated with `alloc`. If any duplication fails, the copies made so far
/// are freed before the error is returned, so nothing leaks on the error path.
fn rowFromResult(alloc: Allocator, r: anytype) !DeploymentScheduleRow {
    // Non-allocating columns first: nothing to clean up if these are read early.
    const active = r.int(Col.active) != 0;
    const max_scheduled_runs: ?i64 = if (r.textOrNull(Col.max_scheduled_runs) != null)
        r.bigint(Col.max_scheduled_runs)
    else
        null;

    const id = try alloc.dupe(u8, r.text(Col.id));
    errdefer alloc.free(id);
    const created = try alloc.dupe(u8, r.text(Col.created));
    errdefer alloc.free(created);
    const updated = try alloc.dupe(u8, r.text(Col.updated));
    errdefer alloc.free(updated);
    const deployment_id = try alloc.dupe(u8, r.text(Col.deployment_id));
    errdefer alloc.free(deployment_id);
    const schedule = try alloc.dupe(u8, r.text(Col.schedule));
    errdefer alloc.free(schedule);
    const parameters = try alloc.dupe(u8, r.text(Col.parameters));
    errdefer alloc.free(parameters);
    // Last allocation: nothing fallible follows, so it needs no errdefer.
    const slug: ?[]const u8 = if (r.textOrNull(Col.slug)) |s| try alloc.dupe(u8, s) else null;

    return DeploymentScheduleRow{
        .id = id,
        .created = created,
        .updated = updated,
        .deployment_id = deployment_id,
        .schedule = schedule,
        .active = active,
        .max_scheduled_runs = max_scheduled_runs,
        .parameters = parameters,
        .slug = slug,
    };
}

/// Drain `rows` (a pointer to a query result whose `next()` yields an
/// optional row) into an owned slice of `DeploymentScheduleRow`.
///
/// On failure every row copied so far is released along with the list
/// storage. On success the caller owns the slice and each row in it.
fn collectRows(alloc: Allocator, rows: anytype) ![]DeploymentScheduleRow {
    var results = std.ArrayListUnmanaged(DeploymentScheduleRow){};
    errdefer {
        for (results.items) |row| row.deinit(alloc);
        results.deinit(alloc);
    }

    while (rows.next()) |r| {
        const row = try rowFromResult(alloc, &r);
        // Until the append succeeds the list does not own `row` yet.
        errdefer row.deinit(alloc);
        try results.append(alloc, row);
    }

    return results.toOwnedSlice(alloc);
}

/// Fetch a single schedule by primary key.
///
/// Returns `null` when no row matches. A database error is logged and also
/// reported as `null`; only a failure while copying the row (for example
/// out-of-memory) is returned as an error. The caller owns the returned row
/// (see `DeploymentScheduleRow.deinit`).
pub fn getById(alloc: Allocator, id: []const u8) !?DeploymentScheduleRow {
    var maybe_row = backend.db.row(
        "SELECT " ++ select_cols ++ " FROM deployment_schedule WHERE id = ?",
        .{id},
    ) catch |err| {
        // Keep the existing "null on lookup failure" contract, but leave a
        // trace instead of discarding the error silently.
        log.err("database", "get deployment_schedule error: {}", .{err});
        return null;
    };

    if (maybe_row) |*row| {
        defer row.deinit();
        return try rowFromResult(alloc, row);
    }
    return null;
}

/// List every schedule belonging to `deployment_id`, ordered by `created` ascending.
///
/// Query errors are logged and returned. The caller owns the returned slice
/// and each row in it: call `deinit` on every row, then free the slice with
/// `alloc`.
pub fn listByDeployment(alloc: Allocator, deployment_id: []const u8) ![]DeploymentScheduleRow {
    var rows = backend.db.query(
        "SELECT " ++ select_cols ++ " FROM deployment_schedule WHERE deployment_id = ? ORDER BY created ASC",
        .{deployment_id},
    ) catch |err| {
        log.err("database", "list deployment_schedules error: {}", .{err});
        return err;
    };
    defer rows.deinit();

    return collectRows(alloc, &rows);
}

/// Optional column values for `insert`; unspecified fields use these defaults.
pub const InsertParams = struct {
    active: bool = true,
    max_scheduled_runs: ?i64 = null,
    parameters: []const u8 = "{}",
    slug: ?[]const u8 = null,
};

/// Insert a new schedule row.
///
/// `created` is written to both the `created` and `updated` columns.
/// `active` is stored as the integer 1 or 0. Errors are logged and returned.
pub fn insert(
    id: []const u8,
    deployment_id: []const u8,
    schedule: []const u8,
    created: []const u8,
    params: InsertParams,
) !void {
    backend.db.exec(
        \\INSERT INTO deployment_schedule (id, created, updated, deployment_id, schedule, active, max_scheduled_runs, parameters, slug)
        \\VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    , .{
        id,
        created,
        created, // a new row's `updated` starts equal to `created`
        deployment_id,
        schedule,
        @as(i32, if (params.active) 1 else 0),
        params.max_scheduled_runs,
        params.parameters,
        params.slug,
    }) catch |err| {
        log.err("database", "insert deployment_schedule error: {}", .{err});
        return err;
    };
}

/// Partial-update payload for `updateById`; a `null` field means "leave unchanged".
pub const UpdateParams = struct {
    schedule: ?[]const u8 = null,
    active: ?bool = null,
    max_scheduled_runs: ?i64 = null,
    parameters: ?[]const u8 = null,
    slug: ?[]const u8 = null,
};

/// Update the schedule with the given `id`.
///
/// Each field of `params` that is `null` keeps the column's current value
/// (the statement uses `COALESCE(?, column)`); `updated` is always
/// overwritten. Because `null` means "unchanged", this function cannot reset
/// `max_scheduled_runs` or `slug` back to NULL.
///
/// Returns `true` when at least one row was affected. Errors are logged and
/// returned.
pub fn updateById(id: []const u8, updated: []const u8, params: UpdateParams) !bool {
    const affected = backend.db.execWithRowCount(
        \\UPDATE deployment_schedule SET
        \\ schedule = COALESCE(?, schedule),
        \\ active = COALESCE(?, active),
        \\ max_scheduled_runs = COALESCE(?, max_scheduled_runs),
        \\ parameters = COALESCE(?, parameters),
        \\ slug = COALESCE(?, slug),
        \\ updated = ?
        \\WHERE id = ?
    , .{
        params.schedule,
        // Booleans are bound as 1/0; an absent value is bound as NULL.
        if (params.active) |a| @as(?i32, if (a) 1 else 0) else null,
        params.max_scheduled_runs,
        params.parameters,
        params.slug,
        updated,
        id,
    }) catch |err| {
        log.err("database", "update deployment_schedule error: {}", .{err});
        return err;
    };
    return affected > 0;
}

/// Delete the schedule with the given `id`.
///
/// Returns `true` when at least one row was affected. Errors are logged and
/// returned.
pub fn deleteById(id: []const u8) !bool {
    const affected = backend.db.execWithRowCount(
        "DELETE FROM deployment_schedule WHERE id = ?",
        .{id},
    ) catch |err| {
        log.err("database", "delete deployment_schedule error: {}", .{err});
        return err;
    };
    return affected > 0;
}

/// Delete every schedule belonging to `deployment_id`.
///
/// Returns the affected-row count reported by the backend. Errors are logged
/// and returned.
pub fn deleteByDeployment(deployment_id: []const u8) !usize {
    const affected = backend.db.execWithRowCount(
        "DELETE FROM deployment_schedule WHERE deployment_id = ?",
        .{deployment_id},
    ) catch |err| {
        log.err("database", "delete deployment_schedules error: {}", .{err});
        return err;
    };
    return @intCast(affected);
}

/// List all active schedules (for scheduler service)
///
/// Returns at most `limit` rows with `active = 1`, ordered by `created`
/// ascending. Query errors are logged and returned. The caller owns the
/// returned slice and each row in it: call `deinit` on every row, then free
/// the slice with `alloc`.
pub fn listActive(alloc: Allocator, limit: usize) ![]DeploymentScheduleRow {
    // The limit is bound as i64; saturate rather than trip an unchecked cast
    // for a usize value that does not fit.
    const sql_limit: i64 = std.math.cast(i64, limit) orelse std.math.maxInt(i64);

    var rows = backend.db.query(
        "SELECT " ++ select_cols ++ " FROM deployment_schedule WHERE active = 1 ORDER BY created ASC LIMIT ?",
        .{sql_limit},
    ) catch |err| {
        log.err("database", "list active schedules error: {}", .{err});
        return err;
    };
    defer rows.deinit();

    return collectRows(alloc, &rows);
}