prefect server in zig
at main 559 lines 20 kB view raw
const std = @import("std");
const Allocator = std.mem.Allocator;
const Thread = std.Thread;

const dialect_mod = @import("dialect.zig");
pub const Dialect = dialect_mod.Dialect;

const log = @import("../logging.zig");
const logfire = @import("logfire");

// backend libraries
const zqlite = @import("zqlite");
const pg = @import("pg");

/// Unified row interface that abstracts over backend-specific row types.
///
/// All accessors take a zero-based column index into the result row.
pub const Row = union(enum) {
    sqlite: zqlite.Row,
    postgres: PostgresRow,

    const PostgresRow = union(enum) {
        from_result: pg.Row, // from iterating Result
        from_query: pg.QueryRow, // from pool.row()

        /// Read a text column, mapping NULL to "" to match SQLite behavior.
        fn getText(self: PostgresRow, col: usize) []const u8 {
            return self.getTextOrNull(col) orelse "";
        }

        /// Read a nullable text column; NULL is reported as `null`.
        fn getTextOrNull(self: PostgresRow, col: usize) ?[]const u8 {
            return switch (self) {
                .from_result => |r| r.get(?[]const u8, col),
                .from_query => |r| r.get(?[]const u8, col),
            };
        }

        /// Read an INTEGER (INT4) column and widen it to i64.
        ///
        /// The column is always decoded as i32; there is no fallback to i64.
        /// Columns known to be BIGINT (INT8) must be read with `getBigInt`.
        fn getInt(self: PostgresRow, col: usize) i64 {
            return switch (self) {
                .from_result => |r| @as(i64, r.get(i32, col)),
                .from_query => |r| @as(i64, r.get(i32, col)),
            };
        }

        /// Read a column known to be BIGINT (INT8).
        fn getBigInt(self: PostgresRow, col: usize) i64 {
            return switch (self) {
                .from_result => |r| r.get(i64, col),
                .from_query => |r| r.get(i64, col),
            };
        }

        /// Read a column as f64.
        fn getFloat(self: PostgresRow, col: usize) f64 {
            return switch (self) {
                .from_result => |r| r.get(f64, col),
                .from_query => |r| r.get(f64, col),
            };
        }
    };

    /// Read a text column. On postgres, NULL is returned as "".
    pub fn text(self: Row, col: usize) []const u8 {
        return switch (self) {
            .sqlite => |r| r.text(col),
            .postgres => |r| r.getText(col),
        };
    }

    /// Read a text column, returning `null` when the value is absent.
    pub fn textOrNull(self: Row, col: usize) ?[]const u8 {
        return switch (self) {
            .sqlite => |r| {
                // SQLite returns empty string for NULL, so check length.
                // NOTE(review): this also maps a genuinely empty TEXT value to
                // null, whereas the postgres branch returns "" for it. Confirm
                // that callers expect this difference.
                const txt = r.text(col);
                return if (txt.len > 0) txt else null;
            },
            .postgres => |r| r.getTextOrNull(col),
        };
    }

    /// Read an integer column. On postgres the column is decoded as i32
    /// (INTEGER); use `bigint` for BIGINT columns.
    pub fn int(self: Row, col: usize) i64 {
        return switch (self) {
            .sqlite => |r| r.int(col),
            .postgres => |r| r.getInt(col), // reads as i32 for INTEGER columns
        };
    }

    /// Read a 64-bit integer column. On postgres the column is decoded as i64
    /// (BIGINT); on sqlite this is the same read as `int`.
    pub fn bigint(self: Row, col: usize) i64 {
        return switch (self) {
            .sqlite => |r| r.int(col),
            .postgres => |r| r.getBigInt(col), // reads as i64 for BIGINT columns
        };
    }

    /// Read a floating point column.
    pub fn float(self: Row, col: usize) f64 {
        return switch (self) {
            .sqlite => |r| r.float(col),
            .postgres => |r| r.getFloat(col),
        };
    }

    /// Release the row. Rows obtained while iterating `Rows` on postgres are
    /// cleaned up by the owning result, so this is a no-op for them.
    pub fn deinit(self: *Row) void {
        switch (self.*) {
            .sqlite => |*r| r.deinit(),
            .postgres => |*r| switch (r.*) {
                .from_result => {}, // Result.deinit() handles cleanup
                // deinit must not fail for the caller, so report and continue.
                .from_query => |*qr| qr.deinit() catch |err| {
                    log.err("database", "postgres row deinit error: {}", .{err});
                },
            },
        }
    }
};

/// Unified rows iterator
pub const Rows = struct {
    backend: RowsBackend,

    const RowsBackend = union(enum) {
        sqlite: zqlite.Rows,
        postgres: *pg.Result,
    };

    /// Advance to the next row, or return `null` when there are no more rows.
    ///
    /// A postgres iteration error is logged and then reported as end of
    /// iteration (`null`), since this function has no error channel.
    pub fn next(self: *Rows) ?Row {
        switch (self.backend) {
            .sqlite => |*r| {
                const sqlite_row = r.next() orelse return null;
                return Row{ .sqlite = sqlite_row };
            },
            .postgres => |result| {
                const maybe_row = result.next() catch |err| {
                    log.err("database", "postgres row iteration error: {}", .{err});
                    return null;
                };
                const pg_row = maybe_row orelse return null;
                return Row{ .postgres = .{ .from_result = pg_row } };
            },
        }
    }

    /// Release the underlying backend iterator/result.
    pub fn deinit(self: *Rows) void {
        switch (self.backend) {
            .sqlite => |*r| r.deinit(),
            .postgres => |r| r.deinit(),
        }
    }
};

/// Transaction handle for atomic multi-statement operations
pub const Transaction = struct {
    backend: *Backend,
    pg_conn: ?*pg.Conn, // only set for postgres

    /// Execute a statement within this transaction (no auto-commit).
    ///
    /// On postgres, returns `error.NoConnection` if the transaction's
    /// connection has already been released by `commit` or `rollback`.
    pub fn exec(self: *Transaction, sql: []const u8, args: anytype) !void {
        switch (self.backend.impl) {
            .sqlite => |*s| {
                s.conn.exec(sql, args) catch |err| {
                    log.err("database", "transaction exec error: {}", .{err});
                    return err;
                };
            },
            .postgres => {
                const conn = self.pg_conn orelse return error.NoConnection;
                const rewritten = try self.backend.dialect.rewritePlaceholders(self.backend.allocator, sql);
                // Only free when the dialect handed back a new buffer.
                defer if (rewritten.ptr != sql.ptr) self.backend.allocator.free(rewritten);

                _ = conn.exec(rewritten, args) catch |err| {
                    // Log detailed PostgreSQL error if available
                    if (conn.err) |pg_err| {
                        log.err("database", "postgres error [{s}]: {s}", .{ pg_err.code, pg_err.message });
                        if (pg_err.detail) |detail| {
                            log.err("database", "  detail: {s}", .{detail});
                        }
                        if (pg_err.hint) |hint| {
                            log.err("database", "  hint: {s}", .{hint});
                        }
                    }
                    log.err("database", "postgres transaction exec error: {}", .{err});
                    return err;
                };
            },
        }
    }

    /// Commit the transaction.
    ///
    /// On postgres, a successful commit releases the connection back to the
    /// pool and clears `pg_conn`. If the commit fails the connection is NOT
    /// released here; calling `rollback` afterwards releases it.
    pub fn commit(self: *Transaction) !void {
        switch (self.backend.impl) {
            .sqlite => |*s| {
                s.conn.commit() catch |err| {
                    log.err("database", "commit error: {}", .{err});
                    return err;
                };
            },
            .postgres => |*p| {
                const conn = self.pg_conn orelse return error.NoConnection;
                conn.commit() catch |err| {
                    if (conn.err) |pg_err| {
                        log.err("database", "postgres commit error [{s}]: {s}", .{ pg_err.code, pg_err.message });
                    }
                    log.err("database", "postgres commit error: {}", .{err});
                    return err;
                };
                // Release connection back to pool
                p.pool.release(conn);
                self.pg_conn = null;
            },
        }
    }

    /// Rollback the transaction.
    ///
    /// On postgres this is a no-op when the connection was already released
    /// (`pg_conn` is null); otherwise the connection is released to the pool
    /// whether or not the rollback itself succeeded.
    pub fn rollback(self: *Transaction) void {
        switch (self.backend.impl) {
            .sqlite => |*s| {
                s.conn.rollback();
            },
            .postgres => |*p| {
                const conn = self.pg_conn orelse return;
                conn.rollback() catch |err| {
                    log.err("database", "postgres rollback error: {}", .{err});
                };
                p.pool.release(conn);
                self.pg_conn = null;
            },
        }
    }
};

/// Main database backend abstraction
pub const Backend = struct {
    impl: BackendImpl,
    dialect: Dialect,
    mutex: Thread.Mutex = .{},
    allocator: Allocator,

    const BackendImpl = union(enum) {
        sqlite: SqliteBackend,
        postgres: PostgresBackend,
    };

    /// Name of the database system, passed to `logfire.sqlSpan`.
    fn dbSystem(self: *Backend) []const u8 {
        return switch (self.impl) {
            .sqlite => "sqlite",
            .postgres => "postgresql",
        };
    }

    const SqliteBackend = struct {
        conn: zqlite.Conn,
    };

    const PostgresBackend = struct {
        pool: *pg.Pool,
    };

    /// Initialize SQLite backend.
    ///
    /// Opens (creating if needed) the database at `path` and applies the
    /// SQLite-specific pragmas. A pragma failure is logged but does not fail
    /// initialization.
    pub fn initSqlite(allocator: Allocator, path: [*:0]const u8) !Backend {
        const flags = zqlite.OpenFlags.Create | zqlite.OpenFlags.ReadWrite;
        const conn = zqlite.open(path, flags) catch |err| {
            log.err("database", "failed to open sqlite: {}", .{err});
            return err;
        };

        // SQLite-specific pragmas (best effort)
        const pragmas = [_][]const u8{
            "PRAGMA journal_mode=WAL",
            "PRAGMA busy_timeout=5000",
            "PRAGMA foreign_keys=ON",
        };
        for (pragmas) |pragma| {
            conn.exec(pragma, .{}) catch |err| {
                log.err("database", "sqlite pragma failed ({s}): {}", .{ pragma, err });
            };
        }

        return Backend{
            .impl = .{ .sqlite = .{ .conn = conn } },
            .dialect = .sqlite,
            .allocator = allocator,
        };
    }

    /// Initialize PostgreSQL backend.
    ///
    /// `connection_string` is parsed as a URI and used to create a pool of
    /// 5 connections. Returns `error.InvalidConnectionString` if it does not
    /// parse as a URI.
    pub fn initPostgres(allocator: Allocator, connection_string: []const u8) !Backend {
        // Parse connection string and use pg.Pool.initUri
        const uri = std.Uri.parse(connection_string) catch {
            log.err("database", "invalid postgres connection string", .{});
            return error.InvalidConnectionString;
        };

        const pool = pg.Pool.initUri(allocator, uri, .{
            .size = 5,
        }) catch |err| {
            log.err("database", "failed to init postgres pool: {}", .{err});
            return err;
        };

        return Backend{
            .impl = .{ .postgres = .{ .pool = pool } },
            .dialect = .postgres,
            .allocator = allocator,
        };
    }

    /// Close the SQLite connection or tear down the postgres pool.
    pub fn deinit(self: *Backend) void {
        switch (self.impl) {
            .sqlite => |*s| s.conn.close(),
            .postgres => |*p| p.pool.deinit(),
        }
    }

    /// Execute a statement that doesn't return rows (caller must hold mutex for transactions)
    pub fn execUnsafe(self: *Backend, sql: []const u8, args: anytype) !void {
        switch (self.impl) {
            .sqlite => |*s| {
                s.conn.exec(sql, args) catch |err| {
                    log.err("database", "exec error: {}", .{err});
                    return err;
                };
            },
            .postgres => |*p| {
                // Rewrite placeholders for postgres
                const rewritten = try self.dialect.rewritePlaceholders(self.allocator, sql);
                // Only free when the dialect handed back a new buffer.
                defer if (rewritten.ptr != sql.ptr) self.allocator.free(rewritten);

                _ = p.pool.exec(rewritten, args) catch |err| {
                    log.err("database", "postgres exec error: {}", .{err});
                    return err;
                };
            },
        }
    }

    /// Execute a statement that doesn't return rows (thread-safe).
    ///
    /// SQLite calls are serialized through `mutex`; postgres calls go straight
    /// to the pool. Failures are recorded on the SQL span and returned.
    pub fn exec(self: *Backend, sql: []const u8, args: anytype) !void {
        const span = logfire.sqlSpan(sql, self.dbSystem());
        defer span.end();

        switch (self.impl) {
            .sqlite => {
                self.mutex.lock();
                defer self.mutex.unlock();
                self.execUnsafe(sql, args) catch |err| {
                    span.recordError(err);
                    return err;
                };
            },
            .postgres => {
                // Postgres pool handles concurrency - no mutex needed
                self.execUnsafe(sql, args) catch |err| {
                    span.recordError(err);
                    return err;
                };
            },
        }
    }

    /// Execute a statement and return the number of affected rows (thread-safe).
    ///
    /// On postgres, a missing row count from the driver is reported as 0.
    pub fn execWithRowCount(self: *Backend, sql: []const u8, args: anytype) !i64 {
        const span = logfire.sqlSpan(sql, self.dbSystem());
        defer span.end();

        switch (self.impl) {
            .sqlite => |*s| {
                self.mutex.lock();
                defer self.mutex.unlock();
                s.conn.exec(sql, args) catch |err| {
                    span.recordError(err);
                    log.err("database", "exec error: {}", .{err});
                    return err;
                };
                const count = @as(i64, @intCast(s.conn.changes()));
                span.setAttribute("db.response.rows_affected", count);
                return count;
            },
            .postgres => |*p| {
                // Postgres pool handles concurrency - no mutex needed
                const rewritten = try self.dialect.rewritePlaceholders(self.allocator, sql);
                defer if (rewritten.ptr != sql.ptr) self.allocator.free(rewritten);

                const affected = p.pool.exec(rewritten, args) catch |err| {
                    span.recordError(err);
                    log.err("database", "postgres exec error: {}", .{err});
                    return err;
                };
                const count = affected orelse 0;
                span.setAttribute("db.response.rows_affected", count);
                return count;
            },
        }
    }

    /// Query for a single row.
    ///
    /// Returns `null` both when no row matches and when the query itself
    /// fails; a failure is recorded on the span and logged so the two cases
    /// can be told apart in telemetry. The only error returned is a failure
    /// to rewrite placeholders (postgres). The caller must `deinit` a returned
    /// row.
    ///
    /// NOTE(review): for SQLite the mutex is released when this function
    /// returns, i.e. before the caller reads from or deinits the row.
    pub fn row(self: *Backend, sql: []const u8, args: anytype) !?Row {
        const span = logfire.sqlSpan(sql, self.dbSystem());
        defer span.end();

        switch (self.impl) {
            .sqlite => |*s| {
                self.mutex.lock();
                defer self.mutex.unlock();
                const maybe_row = s.conn.row(sql, args) catch |err| {
                    span.recordError(err);
                    log.err("database", "row error: {}", .{err});
                    return null;
                };
                const sqlite_row = maybe_row orelse return null;
                return Row{ .sqlite = sqlite_row };
            },
            .postgres => |*p| {
                // Postgres pool handles concurrency - no mutex needed
                const rewritten = try self.dialect.rewritePlaceholders(self.allocator, sql);
                defer if (rewritten.ptr != sql.ptr) self.allocator.free(rewritten);

                const maybe_row = p.pool.row(rewritten, args) catch |err| {
                    span.recordError(err);
                    log.err("database", "postgres row error: {}", .{err});
                    return null;
                };
                const pg_row = maybe_row orelse return null;
                return Row{ .postgres = .{ .from_query = pg_row } };
            },
        }
    }

    /// Query for multiple rows. The caller must `deinit` the returned `Rows`.
    ///
    /// NOTE(review): for SQLite the mutex is released when this function
    /// returns, i.e. before the caller iterates the returned `Rows`.
    pub fn query(self: *Backend, sql: []const u8, args: anytype) !Rows {
        const span = logfire.sqlSpan(sql, self.dbSystem());
        defer span.end();

        switch (self.impl) {
            .sqlite => |*s| {
                self.mutex.lock();
                defer self.mutex.unlock();
                const rows = s.conn.rows(sql, args) catch |err| {
                    span.recordError(err);
                    log.err("database", "query error: {}", .{err});
                    return err;
                };
                return Rows{ .backend = .{ .sqlite = rows } };
            },
            .postgres => |*p| {
                // Postgres pool handles concurrency - no mutex needed
                const rewritten = try self.dialect.rewritePlaceholders(self.allocator, sql);
                defer if (rewritten.ptr != sql.ptr) self.allocator.free(rewritten);

                const result = p.pool.query(rewritten, args) catch |err| {
                    span.recordError(err);
                    log.err("database", "postgres query error: {}", .{err});
                    return err;
                };
                return Rows{ .backend = .{ .postgres = result } };
            },
        }
    }

    /// Begin a transaction and return a Transaction handle
    /// IMPORTANT: Caller must hold mutex for SQLite. For Postgres, acquires a dedicated connection.
    pub fn beginTransaction(self: *Backend) !Transaction {
        switch (self.impl) {
            .sqlite => |*s| {
                _ = s.conn.transaction() catch |err| {
                    log.err("database", "begin transaction error: {}", .{err});
                    return err;
                };
                return Transaction{
                    .backend = self,
                    .pg_conn = null,
                };
            },
            .postgres => |*p| {
                // Acquire a dedicated connection from the pool
                const conn = p.pool.acquire() catch |err| {
                    log.err("database", "postgres acquire connection error: {}", .{err});
                    return err;
                };
                // Give the connection back if BEGIN fails below.
                errdefer p.pool.release(conn);

                // Begin transaction on this specific connection using pg.zig's method
                conn.begin() catch |err| {
                    if (conn.err) |pg_err| {
                        log.err("database", "postgres begin error [{s}]: {s}", .{ pg_err.code, pg_err.message });
                    }
                    log.err("database", "postgres begin error: {}", .{err});
                    return err;
                };

                return Transaction{
                    .backend = self,
                    .pg_conn = conn,
                };
            },
        }
    }
};

// Global backend instance
pub var db: Backend = undefined;
var initialized: bool = false;

/// Initialize the database with environment-based configuration.
///
/// Reads `PREFECT_DATABASE_BACKEND` (default "sqlite"). For "postgres" or
/// "postgresql" it connects to `PREFECT_DATABASE_URL`; otherwise it opens the
/// SQLite file named by `PREFECT_DATABASE_PATH` (default "prefect.db").
/// Calling this again after a successful init is a no-op.
///
/// Returns `error.NameTooLong` if the SQLite path does not fit the internal
/// 256-byte buffer (including the terminating NUL).
pub fn init() !void {
    if (initialized) return;

    const allocator = std.heap.page_allocator;
    const backend_env = std.posix.getenv("PREFECT_DATABASE_BACKEND") orelse "sqlite";

    if (std.mem.eql(u8, backend_env, "postgres") or std.mem.eql(u8, backend_env, "postgresql")) {
        const url = std.posix.getenv("PREFECT_DATABASE_URL") orelse "postgresql://localhost:5432/prefect";
        // NOTE(review): this logs the full URL, which may include credentials.
        log.info("database", "initializing postgres: {s}", .{url});
        db = try Backend.initPostgres(allocator, url);
    } else {
        // SQLite (default)
        const path_env = std.posix.getenv("PREFECT_DATABASE_PATH") orelse "prefect.db";
        var path_buf: [256]u8 = undefined;
        // Need room for the path plus the NUL terminator; reject anything
        // longer instead of writing past the end of the buffer.
        if (path_env.len >= path_buf.len) {
            log.err("database", "sqlite path too long: {d} bytes (max {d})", .{ path_env.len, path_buf.len - 1 });
            return error.NameTooLong;
        }
        @memcpy(path_buf[0..path_env.len], path_env);
        path_buf[path_env.len] = 0;
        const path: [*:0]const u8 = path_buf[0..path_env.len :0];

        log.info("database", "initializing sqlite: {s}", .{path_env});
        db = try Backend.initSqlite(allocator, path);
    }

    initialized = true;
}

/// Close the database. Does nothing if `init` has not succeeded.
pub fn close() void {
    if (!initialized) return;
    db.deinit();
    initialized = false;
}

/// Get the database backend (returns null if not initialized)
pub fn getBackend() ?*Backend {
    return if (initialized) &db else null;
}

/// Get the current dialect.
///
/// NOTE(review): this reads the global `db` without checking `initialized`;
/// callers must have called `init` first.
pub fn getDialect() Dialect {
    return db.dialect;
}

test "sqlite backend basic operations" {
    var b = try Backend.initSqlite(std.testing.allocator, ":memory:");
    defer b.deinit();

    try b.exec("CREATE TABLE test (id TEXT PRIMARY KEY, name TEXT)", .{});
    try b.exec("INSERT INTO test (id, name) VALUES (?, ?)", .{ "1", "alice" });

    var r = try b.row("SELECT id, name FROM test WHERE id = ?", .{"1"});
    if (r) |*row| {
        defer row.deinit();
        try std.testing.expectEqualStrings("1", row.text(0));
        try std.testing.expectEqualStrings("alice", row.text(1));
    } else return error.RowNotFound;

    try b.exec("INSERT INTO test (id, name) VALUES (?, ?)", .{ "2", "bob" });

    var rows = try b.query("SELECT id, name FROM test ORDER BY id", .{});
    defer rows.deinit();

    var count: usize = 0;
    while (rows.next()) |_| count += 1;
    try std.testing.expectEqual(@as(usize, 2), count);
}

test "dialect placeholder rewriting" {
    const sql = "SELECT * FROM t WHERE a = ? AND b = ?";

    // The sqlite result is not freed here; the test expects it to equal the input.
    const sqlite_result = try Dialect.sqlite.rewritePlaceholders(std.testing.allocator, sql);
    try std.testing.expectEqualStrings(sql, sqlite_result);

    const pg_result = try Dialect.postgres.rewritePlaceholders(std.testing.allocator, sql);
    defer std.testing.allocator.free(pg_result);
    try std.testing.expectEqualStrings("SELECT * FROM t WHERE a = $1 AND b = $2", pg_result);
}