prefect server in zig
at main 279 lines 9.1 kB view raw
1// migrate.zig - database migration runner 2// 3// applies embedded SQL migrations on startup, tracking applied versions 4// in the _migrations table 5 6const std = @import("std"); 7const Allocator = std.mem.Allocator; 8const backend = @import("backend.zig"); 9const migrations_data = @import("migrations_data.zig"); 10const log = @import("../logging.zig"); 11 12/// migration tracking table schema (same for sqlite and postgres) 13const migrations_table_sql = 14 \\CREATE TABLE IF NOT EXISTS _migrations ( 15 \\ id TEXT PRIMARY KEY, 16 \\ applied_at TEXT NOT NULL 17 \\) 18; 19 20/// apply all pending migrations 21pub fn applyMigrations() !void { 22 const allocator = std.heap.page_allocator; 23 24 // 1. ensure _migrations table exists 25 try ensureMigrationsTable(); 26 27 // 2. get list of already-applied migration IDs 28 var applied = std.StringHashMap(void).init(allocator); 29 defer applied.deinit(); 30 try getAppliedMigrations(&applied); 31 32 // 3. apply each pending migration in order 33 for (migrations_data.all) |m| { 34 if (!applied.contains(m.id)) { 35 try applyMigration(allocator, m); 36 } else { 37 log.debug("migrations", "skipping already applied: {s}", .{m.id}); 38 } 39 } 40} 41 42fn ensureMigrationsTable() !void { 43 backend.db.exec(migrations_table_sql, .{}) catch |err| { 44 log.err("migrations", "failed to create _migrations table: {}", .{err}); 45 return err; 46 }; 47} 48 49fn getAppliedMigrations(applied: *std.StringHashMap(void)) !void { 50 var rows = backend.db.query("SELECT id FROM _migrations", .{}) catch |err| { 51 // table might not exist yet on very first run, that's ok 52 log.debug("migrations", "could not query _migrations: {}", .{err}); 53 return; 54 }; 55 defer rows.deinit(); 56 57 while (rows.next()) |row| { 58 const id = row.text(0); 59 // copy the string since row data is temporary 60 const id_copy = applied.allocator.dupe(u8, id) catch continue; 61 applied.put(id_copy, {}) catch continue; 62 } 63} 64 65fn applyMigration(allocator: Allocator, m: migrations_data.Migration) !void { 66 const sql = switch (backend.db.dialect) { 67 .sqlite => m.sqlite_sql, 68 .postgres => m.postgres_sql, 69 }; 70 71 log.info("migrations", "applying: {s}", .{m.id}); 72 73 // split SQL into individual statements and execute each 74 // we can't use transactions for DDL in sqlite (CREATE TABLE auto-commits) 75 // so we execute each statement separately 76 var iter = StatementIterator.init(sql); 77 var stmt_count: usize = 0; 78 while (iter.next()) |stmt| { 79 if (stmt.len == 0) continue; 80 backend.db.exec(stmt, .{}) catch |err| { 81 log.err("migrations", "failed to execute statement {d}: {}", .{ stmt_count, err }); 82 log.err("migrations", "statement: {s}", .{stmt[0..@min(stmt.len, 200)]}); 83 return err; 84 }; 85 stmt_count += 1; 86 } 87 88 // record migration as applied 89 const timestamp = getIsoTimestamp(allocator); 90 defer allocator.free(timestamp); 91 92 backend.db.exec( 93 "INSERT INTO _migrations (id, applied_at) VALUES (?, ?)", 94 .{ m.id, timestamp }, 95 ) catch |err| { 96 log.err("migrations", "failed to record migration: {}", .{err}); 97 return err; 98 }; 99 100 log.info("migrations", "applied {s} ({d} statements)", .{ m.id, stmt_count }); 101} 102 103/// iterator that splits SQL into individual statements by semicolons 104/// handles comments and string literals 105const StatementIterator = struct { 106 sql: []const u8, 107 pos: usize, 108 109 pub fn init(sql: []const u8) StatementIterator { 110 return .{ .sql = sql, .pos = 0 }; 111 } 112 113 pub fn next(self: *StatementIterator) ?[]const u8 { 114 if (self.pos >= self.sql.len) return null; 115 116 const start = self.pos; 117 var in_string = false; 118 var in_comment = false; 119 var in_line_comment = false; 120 121 while (self.pos < self.sql.len) { 122 const c = self.sql[self.pos]; 123 124 // handle line comments (-- ...) 125 if (!in_string and !in_comment and self.pos + 1 < self.sql.len) { 126 if (c == '-' and self.sql[self.pos + 1] == '-') { 127 in_line_comment = true; 128 self.pos += 2; 129 continue; 130 } 131 } 132 133 if (in_line_comment) { 134 if (c == '\n') { 135 in_line_comment = false; 136 } 137 self.pos += 1; 138 continue; 139 } 140 141 // handle block comments (/* ... */) 142 if (!in_string and !in_comment and self.pos + 1 < self.sql.len) { 143 if (c == '/' and self.sql[self.pos + 1] == '*') { 144 in_comment = true; 145 self.pos += 2; 146 continue; 147 } 148 } 149 150 if (in_comment) { 151 if (c == '*' and self.pos + 1 < self.sql.len and self.sql[self.pos + 1] == '/') { 152 in_comment = false; 153 self.pos += 2; 154 continue; 155 } 156 self.pos += 1; 157 continue; 158 } 159 160 // handle string literals 161 if (c == '\'') { 162 if (in_string) { 163 // check for escaped quote '' 164 if (self.pos + 1 < self.sql.len and self.sql[self.pos + 1] == '\'') { 165 self.pos += 2; 166 continue; 167 } 168 in_string = false; 169 } else { 170 in_string = true; 171 } 172 self.pos += 1; 173 continue; 174 } 175 176 // found statement terminator 177 if (c == ';' and !in_string and !in_comment) { 178 const stmt = std.mem.trim(u8, self.sql[start..self.pos], " \t\n\r"); 179 self.pos += 1; 180 // skip empty statements 181 if (stmt.len == 0 or isOnlyComments(stmt)) { 182 return self.next(); 183 } 184 return stmt; 185 } 186 187 self.pos += 1; 188 } 189 190 // handle trailing statement without semicolon 191 const stmt = std.mem.trim(u8, self.sql[start..self.pos], " \t\n\r"); 192 if (stmt.len == 0 or isOnlyComments(stmt)) { 193 return null; 194 } 195 return stmt; 196 } 197 198 fn isOnlyComments(stmt: []const u8) bool { 199 var i: usize = 0; 200 while (i < stmt.len) { 201 const c = stmt[i]; 202 if (c == ' ' or c == '\t' or c == '\n' or c == '\r') { 203 i += 1; 204 continue; 205 } 206 if (c == '-' and i + 1 < stmt.len and stmt[i + 1] == '-') { 207 // skip to end of line 208 while (i < stmt.len and stmt[i] != '\n') i += 1; 209 continue; 210 } 211 // found non-whitespace, non-comment 212 return false; 213 } 214 return true; 215 } 216}; 217 218fn getIsoTimestamp(allocator: Allocator) []const u8 { 219 const ts = std.time.timestamp(); 220 const epoch_seconds = std.time.epoch.EpochSeconds{ .secs = @intCast(ts) }; 221 const day_seconds = epoch_seconds.getDaySeconds(); 222 const year_day = epoch_seconds.getEpochDay().calculateYearDay(); 223 const month_day = year_day.calculateMonthDay(); 224 225 return std.fmt.allocPrint(allocator, "{d:0>4}-{d:0>2}-{d:0>2}T{d:0>2}:{d:0>2}:{d:0>2}Z", .{ 226 year_day.year, 227 @intFromEnum(month_day.month) + 1, 228 month_day.day_index + 1, 229 day_seconds.getHoursIntoDay(), 230 day_seconds.getMinutesIntoHour(), 231 day_seconds.getSecondsIntoMinute(), 232 }) catch "1970-01-01T00:00:00Z"; 233} 234 235// tests 236 237test "statement iterator basic" { 238 const sql = "CREATE TABLE a (id INT); CREATE TABLE b (id INT);"; 239 var iter = StatementIterator.init(sql); 240 241 const stmt1 = iter.next().?; 242 try std.testing.expectEqualStrings("CREATE TABLE a (id INT)", stmt1); 243 244 const stmt2 = iter.next().?; 245 try std.testing.expectEqualStrings("CREATE TABLE b (id INT)", stmt2); 246 247 try std.testing.expect(iter.next() == null); 248} 249 250test "statement iterator with comments" { 251 const sql = 252 \\-- this is a comment 253 \\CREATE TABLE a (id INT); 254 \\/* block comment */ 255 \\CREATE TABLE b (id INT); 256 ; 257 var iter = StatementIterator.init(sql); 258 259 const stmt1 = iter.next().?; 260 try std.testing.expect(std.mem.indexOf(u8, stmt1, "CREATE TABLE a") != null); 261 262 const stmt2 = iter.next().?; 263 try std.testing.expect(std.mem.indexOf(u8, stmt2, "CREATE TABLE b") != null); 264 265 try std.testing.expect(iter.next() == null); 266} 267 268test "statement iterator with string literals" { 269 const sql = "INSERT INTO t VALUES ('hello; world'); SELECT 1;"; 270 var iter = StatementIterator.init(sql); 271 272 const stmt1 = iter.next().?; 273 try std.testing.expect(std.mem.indexOf(u8, stmt1, "'hello; world'") != null); 274 275 const stmt2 = iter.next().?; 276 try std.testing.expectEqualStrings("SELECT 1", stmt2); 277 278 try std.testing.expect(iter.next() == null); 279}