prefect server in zig
1// migrate.zig - database migration runner
2//
3// applies embedded SQL migrations on startup, tracking applied versions
4// in the _migrations table
5
6const std = @import("std");
7const Allocator = std.mem.Allocator;
8const backend = @import("backend.zig");
9const migrations_data = @import("migrations_data.zig");
10const log = @import("../logging.zig");
11
12/// migration tracking table schema (same for sqlite and postgres)
13const migrations_table_sql =
14 \\CREATE TABLE IF NOT EXISTS _migrations (
15 \\ id TEXT PRIMARY KEY,
16 \\ applied_at TEXT NOT NULL
17 \\)
18;
19
20/// apply all pending migrations
21pub fn applyMigrations() !void {
22 const allocator = std.heap.page_allocator;
23
24 // 1. ensure _migrations table exists
25 try ensureMigrationsTable();
26
27 // 2. get list of already-applied migration IDs
28 var applied = std.StringHashMap(void).init(allocator);
29 defer applied.deinit();
30 try getAppliedMigrations(&applied);
31
32 // 3. apply each pending migration in order
33 for (migrations_data.all) |m| {
34 if (!applied.contains(m.id)) {
35 try applyMigration(allocator, m);
36 } else {
37 log.debug("migrations", "skipping already applied: {s}", .{m.id});
38 }
39 }
40}
41
42fn ensureMigrationsTable() !void {
43 backend.db.exec(migrations_table_sql, .{}) catch |err| {
44 log.err("migrations", "failed to create _migrations table: {}", .{err});
45 return err;
46 };
47}
48
49fn getAppliedMigrations(applied: *std.StringHashMap(void)) !void {
50 var rows = backend.db.query("SELECT id FROM _migrations", .{}) catch |err| {
51 // table might not exist yet on very first run, that's ok
52 log.debug("migrations", "could not query _migrations: {}", .{err});
53 return;
54 };
55 defer rows.deinit();
56
57 while (rows.next()) |row| {
58 const id = row.text(0);
59 // copy the string since row data is temporary
60 const id_copy = applied.allocator.dupe(u8, id) catch continue;
61 applied.put(id_copy, {}) catch continue;
62 }
63}
64
65fn applyMigration(allocator: Allocator, m: migrations_data.Migration) !void {
66 const sql = switch (backend.db.dialect) {
67 .sqlite => m.sqlite_sql,
68 .postgres => m.postgres_sql,
69 };
70
71 log.info("migrations", "applying: {s}", .{m.id});
72
73 // split SQL into individual statements and execute each
74 // we can't use transactions for DDL in sqlite (CREATE TABLE auto-commits)
75 // so we execute each statement separately
76 var iter = StatementIterator.init(sql);
77 var stmt_count: usize = 0;
78 while (iter.next()) |stmt| {
79 if (stmt.len == 0) continue;
80 backend.db.exec(stmt, .{}) catch |err| {
81 log.err("migrations", "failed to execute statement {d}: {}", .{ stmt_count, err });
82 log.err("migrations", "statement: {s}", .{stmt[0..@min(stmt.len, 200)]});
83 return err;
84 };
85 stmt_count += 1;
86 }
87
88 // record migration as applied
89 const timestamp = getIsoTimestamp(allocator);
90 defer allocator.free(timestamp);
91
92 backend.db.exec(
93 "INSERT INTO _migrations (id, applied_at) VALUES (?, ?)",
94 .{ m.id, timestamp },
95 ) catch |err| {
96 log.err("migrations", "failed to record migration: {}", .{err});
97 return err;
98 };
99
100 log.info("migrations", "applied {s} ({d} statements)", .{ m.id, stmt_count });
101}
102
103/// iterator that splits SQL into individual statements by semicolons
104/// handles comments and string literals
105const StatementIterator = struct {
106 sql: []const u8,
107 pos: usize,
108
109 pub fn init(sql: []const u8) StatementIterator {
110 return .{ .sql = sql, .pos = 0 };
111 }
112
113 pub fn next(self: *StatementIterator) ?[]const u8 {
114 if (self.pos >= self.sql.len) return null;
115
116 const start = self.pos;
117 var in_string = false;
118 var in_comment = false;
119 var in_line_comment = false;
120
121 while (self.pos < self.sql.len) {
122 const c = self.sql[self.pos];
123
124 // handle line comments (-- ...)
125 if (!in_string and !in_comment and self.pos + 1 < self.sql.len) {
126 if (c == '-' and self.sql[self.pos + 1] == '-') {
127 in_line_comment = true;
128 self.pos += 2;
129 continue;
130 }
131 }
132
133 if (in_line_comment) {
134 if (c == '\n') {
135 in_line_comment = false;
136 }
137 self.pos += 1;
138 continue;
139 }
140
141 // handle block comments (/* ... */)
142 if (!in_string and !in_comment and self.pos + 1 < self.sql.len) {
143 if (c == '/' and self.sql[self.pos + 1] == '*') {
144 in_comment = true;
145 self.pos += 2;
146 continue;
147 }
148 }
149
150 if (in_comment) {
151 if (c == '*' and self.pos + 1 < self.sql.len and self.sql[self.pos + 1] == '/') {
152 in_comment = false;
153 self.pos += 2;
154 continue;
155 }
156 self.pos += 1;
157 continue;
158 }
159
160 // handle string literals
161 if (c == '\'') {
162 if (in_string) {
163 // check for escaped quote ''
164 if (self.pos + 1 < self.sql.len and self.sql[self.pos + 1] == '\'') {
165 self.pos += 2;
166 continue;
167 }
168 in_string = false;
169 } else {
170 in_string = true;
171 }
172 self.pos += 1;
173 continue;
174 }
175
176 // found statement terminator
177 if (c == ';' and !in_string and !in_comment) {
178 const stmt = std.mem.trim(u8, self.sql[start..self.pos], " \t\n\r");
179 self.pos += 1;
180 // skip empty statements
181 if (stmt.len == 0 or isOnlyComments(stmt)) {
182 return self.next();
183 }
184 return stmt;
185 }
186
187 self.pos += 1;
188 }
189
190 // handle trailing statement without semicolon
191 const stmt = std.mem.trim(u8, self.sql[start..self.pos], " \t\n\r");
192 if (stmt.len == 0 or isOnlyComments(stmt)) {
193 return null;
194 }
195 return stmt;
196 }
197
198 fn isOnlyComments(stmt: []const u8) bool {
199 var i: usize = 0;
200 while (i < stmt.len) {
201 const c = stmt[i];
202 if (c == ' ' or c == '\t' or c == '\n' or c == '\r') {
203 i += 1;
204 continue;
205 }
206 if (c == '-' and i + 1 < stmt.len and stmt[i + 1] == '-') {
207 // skip to end of line
208 while (i < stmt.len and stmt[i] != '\n') i += 1;
209 continue;
210 }
211 // found non-whitespace, non-comment
212 return false;
213 }
214 return true;
215 }
216};
217
218fn getIsoTimestamp(allocator: Allocator) []const u8 {
219 const ts = std.time.timestamp();
220 const epoch_seconds = std.time.epoch.EpochSeconds{ .secs = @intCast(ts) };
221 const day_seconds = epoch_seconds.getDaySeconds();
222 const year_day = epoch_seconds.getEpochDay().calculateYearDay();
223 const month_day = year_day.calculateMonthDay();
224
225 return std.fmt.allocPrint(allocator, "{d:0>4}-{d:0>2}-{d:0>2}T{d:0>2}:{d:0>2}:{d:0>2}Z", .{
226 year_day.year,
227 @intFromEnum(month_day.month) + 1,
228 month_day.day_index + 1,
229 day_seconds.getHoursIntoDay(),
230 day_seconds.getMinutesIntoHour(),
231 day_seconds.getSecondsIntoMinute(),
232 }) catch "1970-01-01T00:00:00Z";
233}
234
235// tests
236
237test "statement iterator basic" {
238 const sql = "CREATE TABLE a (id INT); CREATE TABLE b (id INT);";
239 var iter = StatementIterator.init(sql);
240
241 const stmt1 = iter.next().?;
242 try std.testing.expectEqualStrings("CREATE TABLE a (id INT)", stmt1);
243
244 const stmt2 = iter.next().?;
245 try std.testing.expectEqualStrings("CREATE TABLE b (id INT)", stmt2);
246
247 try std.testing.expect(iter.next() == null);
248}
249
250test "statement iterator with comments" {
251 const sql =
252 \\-- this is a comment
253 \\CREATE TABLE a (id INT);
254 \\/* block comment */
255 \\CREATE TABLE b (id INT);
256 ;
257 var iter = StatementIterator.init(sql);
258
259 const stmt1 = iter.next().?;
260 try std.testing.expect(std.mem.indexOf(u8, stmt1, "CREATE TABLE a") != null);
261
262 const stmt2 = iter.next().?;
263 try std.testing.expect(std.mem.indexOf(u8, stmt2, "CREATE TABLE b") != null);
264
265 try std.testing.expect(iter.next() == null);
266}
267
268test "statement iterator with string literals" {
269 const sql = "INSERT INTO t VALUES ('hello; world'); SELECT 1;";
270 var iter = StatementIterator.init(sql);
271
272 const stmt1 = iter.next().?;
273 try std.testing.expect(std.mem.indexOf(u8, stmt1, "'hello; world'") != null);
274
275 const stmt2 = iter.next().?;
276 try std.testing.expectEqualStrings("SELECT 1", stmt2);
277
278 try std.testing.expect(iter.next() == null);
279}