prefect server in zig
1const std = @import("std");
2const Allocator = std.mem.Allocator;
3const Thread = std.Thread;
4
5const dialect_mod = @import("dialect.zig");
6pub const Dialect = dialect_mod.Dialect;
7
8const log = @import("../logging.zig");
9const logfire = @import("logfire");
10
11// backend libraries
12const zqlite = @import("zqlite");
13const pg = @import("pg");
14
15/// Unified row interface that abstracts over backend-specific row types
16pub const Row = union(enum) {
17 sqlite: zqlite.Row,
18 postgres: PostgresRow,
19
20 const PostgresRow = union(enum) {
21 from_result: pg.Row, // from iterating Result
22 from_query: pg.QueryRow, // from pool.row()
23
24 fn getText(self: PostgresRow, col: usize) []const u8 {
25 // Return empty string for NULL to match SQLite behavior
26 return switch (self) {
27 .from_result => |r| r.get(?[]const u8, col) orelse "",
28 .from_query => |r| r.get(?[]const u8, col) orelse "",
29 };
30 }
31
32 fn getTextOrNull(self: PostgresRow, col: usize) ?[]const u8 {
33 return switch (self) {
34 .from_result => |r| r.get(?[]const u8, col),
35 .from_query => |r| r.get(?[]const u8, col),
36 };
37 }
38
39 fn getInt(self: PostgresRow, col: usize) i64 {
40 // PostgreSQL INTEGER (INT4) needs i32, BIGINT (INT8) needs i64
41 // Try i32 first (most common), then i64
42 return switch (self) {
43 .from_result => |r| @as(i64, r.get(i32, col)),
44 .from_query => |r| @as(i64, r.get(i32, col)),
45 };
46 }
47
48 fn getBigInt(self: PostgresRow, col: usize) i64 {
49 // For columns known to be BIGINT (INT8)
50 return switch (self) {
51 .from_result => |r| r.get(i64, col),
52 .from_query => |r| r.get(i64, col),
53 };
54 }
55
56 fn getFloat(self: PostgresRow, col: usize) f64 {
57 return switch (self) {
58 .from_result => |r| r.get(f64, col),
59 .from_query => |r| r.get(f64, col),
60 };
61 }
62 };
63
64 pub fn text(self: Row, col: usize) []const u8 {
65 return switch (self) {
66 .sqlite => |r| r.text(col),
67 .postgres => |r| r.getText(col),
68 };
69 }
70
71 pub fn textOrNull(self: Row, col: usize) ?[]const u8 {
72 return switch (self) {
73 .sqlite => |r| {
74 // SQLite returns empty string for NULL, so check length
75 const txt = r.text(col);
76 return if (txt.len > 0) txt else null;
77 },
78 .postgres => |r| r.getTextOrNull(col),
79 };
80 }
81
82 pub fn int(self: Row, col: usize) i64 {
83 return switch (self) {
84 .sqlite => |r| r.int(col),
85 .postgres => |r| r.getInt(col), // reads as i32 for INTEGER columns
86 };
87 }
88
89 pub fn bigint(self: Row, col: usize) i64 {
90 return switch (self) {
91 .sqlite => |r| r.int(col),
92 .postgres => |r| r.getBigInt(col), // reads as i64 for BIGINT columns
93 };
94 }
95
96 pub fn float(self: Row, col: usize) f64 {
97 return switch (self) {
98 .sqlite => |r| r.float(col),
99 .postgres => |r| r.getFloat(col),
100 };
101 }
102
103 pub fn deinit(self: *Row) void {
104 switch (self.*) {
105 .sqlite => |*r| r.deinit(),
106 .postgres => |*r| switch (r.*) {
107 .from_result => {}, // Result.deinit() handles cleanup
108 .from_query => |*qr| qr.deinit() catch {},
109 },
110 }
111 }
112};
113
114/// Unified rows iterator
115pub const Rows = struct {
116 backend: RowsBackend,
117
118 const RowsBackend = union(enum) {
119 sqlite: zqlite.Rows,
120 postgres: *pg.Result,
121 };
122
123 pub fn next(self: *Rows) ?Row {
124 return switch (self.backend) {
125 .sqlite => |*r| {
126 if (r.next()) |sqlite_row| {
127 return Row{ .sqlite = sqlite_row };
128 }
129 return null;
130 },
131 .postgres => |r| {
132 if (r.next() catch null) |pg_row| {
133 return Row{ .postgres = .{ .from_result = pg_row } };
134 }
135 return null;
136 },
137 };
138 }
139
140 pub fn deinit(self: *Rows) void {
141 switch (self.backend) {
142 .sqlite => |*r| r.deinit(),
143 .postgres => |r| r.deinit(),
144 }
145 }
146};
147
148/// Transaction handle for atomic multi-statement operations
149pub const Transaction = struct {
150 backend: *Backend,
151 pg_conn: ?*pg.Conn, // only set for postgres
152
153 /// Execute a statement within this transaction (no auto-commit)
154 pub fn exec(self: *Transaction, sql: []const u8, args: anytype) !void {
155 switch (self.backend.impl) {
156 .sqlite => |*s| {
157 s.conn.exec(sql, args) catch |err| {
158 log.err("database", "transaction exec error: {}", .{err});
159 return err;
160 };
161 },
162 .postgres => {
163 const conn = self.pg_conn orelse return error.NoConnection;
164 const rewritten = try self.backend.dialect.rewritePlaceholders(self.backend.allocator, sql);
165 defer if (rewritten.ptr != sql.ptr) self.backend.allocator.free(rewritten);
166
167 _ = conn.exec(rewritten, args) catch |err| {
168 // Log detailed PostgreSQL error if available
169 if (conn.err) |pg_err| {
170 log.err("database", "postgres error [{s}]: {s}", .{ pg_err.code, pg_err.message });
171 if (pg_err.detail) |detail| {
172 log.err("database", " detail: {s}", .{detail});
173 }
174 if (pg_err.hint) |hint| {
175 log.err("database", " hint: {s}", .{hint});
176 }
177 }
178 log.err("database", "postgres transaction exec error: {}", .{err});
179 return err;
180 };
181 },
182 }
183 }
184
185 /// Commit the transaction
186 pub fn commit(self: *Transaction) !void {
187 switch (self.backend.impl) {
188 .sqlite => |*s| {
189 s.conn.commit() catch |err| {
190 log.err("database", "commit error: {}", .{err});
191 return err;
192 };
193 },
194 .postgres => |*p| {
195 const conn = self.pg_conn orelse return error.NoConnection;
196 conn.commit() catch |err| {
197 if (conn.err) |pg_err| {
198 log.err("database", "postgres commit error [{s}]: {s}", .{ pg_err.code, pg_err.message });
199 }
200 log.err("database", "postgres commit error: {}", .{err});
201 return err;
202 };
203 // Release connection back to pool
204 p.pool.release(conn);
205 self.pg_conn = null;
206 },
207 }
208 }
209
210 /// Rollback the transaction
211 pub fn rollback(self: *Transaction) void {
212 switch (self.backend.impl) {
213 .sqlite => |*s| {
214 s.conn.rollback();
215 },
216 .postgres => |*p| {
217 if (self.pg_conn) |conn| {
218 conn.rollback() catch {};
219 p.pool.release(conn);
220 self.pg_conn = null;
221 }
222 },
223 }
224 }
225};
226
227/// Main database backend abstraction
228pub const Backend = struct {
229 impl: BackendImpl,
230 dialect: Dialect,
231 mutex: Thread.Mutex = .{},
232 allocator: Allocator,
233
234 const BackendImpl = union(enum) {
235 sqlite: SqliteBackend,
236 postgres: PostgresBackend,
237 };
238
239 fn dbSystem(self: *Backend) []const u8 {
240 return switch (self.impl) {
241 .sqlite => "sqlite",
242 .postgres => "postgresql",
243 };
244 }
245
246 const SqliteBackend = struct {
247 conn: zqlite.Conn,
248 };
249
250 const PostgresBackend = struct {
251 pool: *pg.Pool,
252 };
253
254 /// Initialize SQLite backend
255 pub fn initSqlite(allocator: Allocator, path: [*:0]const u8) !Backend {
256 const flags = zqlite.OpenFlags.Create | zqlite.OpenFlags.ReadWrite;
257 const conn = zqlite.open(path, flags) catch |err| {
258 log.err("database", "failed to open sqlite: {}", .{err});
259 return err;
260 };
261
262 // SQLite-specific pragmas
263 _ = conn.exec("PRAGMA journal_mode=WAL", .{}) catch {};
264 _ = conn.exec("PRAGMA busy_timeout=5000", .{}) catch {};
265 _ = conn.exec("PRAGMA foreign_keys=ON", .{}) catch {};
266
267 return Backend{
268 .impl = .{ .sqlite = .{ .conn = conn } },
269 .dialect = .sqlite,
270 .allocator = allocator,
271 };
272 }
273
274 /// Initialize PostgreSQL backend
275 pub fn initPostgres(allocator: Allocator, connection_string: []const u8) !Backend {
276 // Parse connection string and use pg.Pool.initUri
277 const uri = std.Uri.parse(connection_string) catch {
278 log.err("database", "invalid postgres connection string", .{});
279 return error.InvalidConnectionString;
280 };
281
282 const pool = pg.Pool.initUri(allocator, uri, .{
283 .size = 5,
284 }) catch |err| {
285 log.err("database", "failed to init postgres pool: {}", .{err});
286 return err;
287 };
288
289 return Backend{
290 .impl = .{ .postgres = .{ .pool = pool } },
291 .dialect = .postgres,
292 .allocator = allocator,
293 };
294 }
295
296 pub fn deinit(self: *Backend) void {
297 switch (self.impl) {
298 .sqlite => |*s| s.conn.close(),
299 .postgres => |*p| p.pool.deinit(),
300 }
301 }
302
303 /// Execute a statement that doesn't return rows (caller must hold mutex for transactions)
304 pub fn execUnsafe(self: *Backend, sql: []const u8, args: anytype) !void {
305 switch (self.impl) {
306 .sqlite => |*s| {
307 s.conn.exec(sql, args) catch |err| {
308 log.err("database", "exec error: {}", .{err});
309 return err;
310 };
311 },
312 .postgres => |*p| {
313 // Rewrite placeholders for postgres
314 const rewritten = try self.dialect.rewritePlaceholders(self.allocator, sql);
315 defer if (rewritten.ptr != sql.ptr) self.allocator.free(rewritten);
316
317 _ = p.pool.exec(rewritten, args) catch |err| {
318 log.err("database", "postgres exec error: {}", .{err});
319 return err;
320 };
321 },
322 }
323 }
324
325 /// Execute a statement that doesn't return rows (thread-safe)
326 pub fn exec(self: *Backend, sql: []const u8, args: anytype) !void {
327 const span = logfire.sqlSpan(sql, self.dbSystem());
328 defer span.end();
329
330 switch (self.impl) {
331 .sqlite => {
332 self.mutex.lock();
333 defer self.mutex.unlock();
334 self.execUnsafe(sql, args) catch |err| {
335 span.recordError(err);
336 return err;
337 };
338 },
339 .postgres => {
340 // Postgres pool handles concurrency - no mutex needed
341 self.execUnsafe(sql, args) catch |err| {
342 span.recordError(err);
343 return err;
344 };
345 },
346 }
347 }
348
349 /// Execute a statement and return the number of affected rows (thread-safe)
350 pub fn execWithRowCount(self: *Backend, sql: []const u8, args: anytype) !i64 {
351 const span = logfire.sqlSpan(sql, self.dbSystem());
352 defer span.end();
353
354 switch (self.impl) {
355 .sqlite => |*s| {
356 self.mutex.lock();
357 defer self.mutex.unlock();
358 s.conn.exec(sql, args) catch |err| {
359 span.recordError(err);
360 log.err("database", "exec error: {}", .{err});
361 return err;
362 };
363 const count = @as(i64, @intCast(s.conn.changes()));
364 span.setAttribute("db.response.rows_affected", count);
365 return count;
366 },
367 .postgres => |*p| {
368 // Postgres pool handles concurrency - no mutex needed
369 const rewritten = try self.dialect.rewritePlaceholders(self.allocator, sql);
370 defer if (rewritten.ptr != sql.ptr) self.allocator.free(rewritten);
371
372 const affected = p.pool.exec(rewritten, args) catch |err| {
373 span.recordError(err);
374 log.err("database", "postgres exec error: {}", .{err});
375 return err;
376 };
377 const count = affected orelse 0;
378 span.setAttribute("db.response.rows_affected", count);
379 return count;
380 },
381 }
382 }
383
384 /// Query for a single row
385 pub fn row(self: *Backend, sql: []const u8, args: anytype) !?Row {
386 const span = logfire.sqlSpan(sql, self.dbSystem());
387 defer span.end();
388
389 switch (self.impl) {
390 .sqlite => |*s| {
391 self.mutex.lock();
392 defer self.mutex.unlock();
393 const r = s.conn.row(sql, args) catch return null;
394 if (r) |sqlite_row| {
395 return Row{ .sqlite = sqlite_row };
396 }
397 return null;
398 },
399 .postgres => |*p| {
400 // Postgres pool handles concurrency - no mutex needed
401 const rewritten = try self.dialect.rewritePlaceholders(self.allocator, sql);
402 defer if (rewritten.ptr != sql.ptr) self.allocator.free(rewritten);
403
404 const r = p.pool.row(rewritten, args) catch return null;
405 if (r) |pg_row| {
406 return Row{ .postgres = .{ .from_query = pg_row } };
407 }
408 return null;
409 },
410 }
411 }
412
413 /// Query for multiple rows
414 pub fn query(self: *Backend, sql: []const u8, args: anytype) !Rows {
415 const span = logfire.sqlSpan(sql, self.dbSystem());
416 defer span.end();
417
418 switch (self.impl) {
419 .sqlite => |*s| {
420 self.mutex.lock();
421 defer self.mutex.unlock();
422 const rows = s.conn.rows(sql, args) catch |err| {
423 span.recordError(err);
424 log.err("database", "query error: {}", .{err});
425 return err;
426 };
427 return Rows{ .backend = .{ .sqlite = rows } };
428 },
429 .postgres => |*p| {
430 // Postgres pool handles concurrency - no mutex needed
431 const rewritten = try self.dialect.rewritePlaceholders(self.allocator, sql);
432 defer if (rewritten.ptr != sql.ptr) self.allocator.free(rewritten);
433
434 const result = p.pool.query(rewritten, args) catch |err| {
435 span.recordError(err);
436 log.err("database", "postgres query error: {}", .{err});
437 return err;
438 };
439 return Rows{ .backend = .{ .postgres = result } };
440 },
441 }
442 }
443
444 /// Begin a transaction and return a Transaction handle
445 /// IMPORTANT: Caller must hold mutex for SQLite. For Postgres, acquires a dedicated connection.
446 pub fn beginTransaction(self: *Backend) !Transaction {
447 switch (self.impl) {
448 .sqlite => |*s| {
449 _ = s.conn.transaction() catch |err| {
450 log.err("database", "begin transaction error: {}", .{err});
451 return err;
452 };
453 return Transaction{
454 .backend = self,
455 .pg_conn = null,
456 };
457 },
458 .postgres => |*p| {
459 // Acquire a dedicated connection from the pool
460 const conn = p.pool.acquire() catch |err| {
461 log.err("database", "postgres acquire connection error: {}", .{err});
462 return err;
463 };
464 errdefer p.pool.release(conn);
465
466 // Begin transaction on this specific connection using pg.zig's method
467 conn.begin() catch |err| {
468 if (conn.err) |pg_err| {
469 log.err("database", "postgres begin error [{s}]: {s}", .{ pg_err.code, pg_err.message });
470 }
471 log.err("database", "postgres begin error: {}", .{err});
472 return err;
473 };
474
475 return Transaction{
476 .backend = self,
477 .pg_conn = conn,
478 };
479 },
480 }
481 }
482};
483
484// Global backend instance
485pub var db: Backend = undefined;
486var initialized: bool = false;
487
488/// Initialize the database with environment-based configuration
489pub fn init() !void {
490 if (initialized) return;
491
492 const allocator = std.heap.page_allocator;
493 const backend_env = std.posix.getenv("PREFECT_DATABASE_BACKEND") orelse "sqlite";
494
495 if (std.mem.eql(u8, backend_env, "postgres") or std.mem.eql(u8, backend_env, "postgresql")) {
496 const url = std.posix.getenv("PREFECT_DATABASE_URL") orelse "postgresql://localhost:5432/prefect";
497 log.info("database", "initializing postgres: {s}", .{url});
498 db = try Backend.initPostgres(allocator, url);
499 } else {
500 // SQLite (default)
501 const path_env = std.posix.getenv("PREFECT_DATABASE_PATH") orelse "prefect.db";
502 var path_buf: [256]u8 = undefined;
503 @memcpy(path_buf[0..path_env.len], path_env);
504 path_buf[path_env.len] = 0;
505 const path: [*:0]const u8 = path_buf[0..path_env.len :0];
506
507 log.info("database", "initializing sqlite: {s}", .{path_env});
508 db = try Backend.initSqlite(allocator, path);
509 }
510
511 initialized = true;
512}
513
514/// Close the database
515pub fn close() void {
516 if (initialized) {
517 db.deinit();
518 initialized = false;
519 }
520}
521
522/// Get the database backend (returns null if not initialized)
523pub fn getBackend() ?*Backend {
524 if (initialized) return &db;
525 return null;
526}
527
528/// Get the current dialect
529pub fn getDialect() Dialect {
530 return db.dialect;
531}
532
533test "sqlite backend basic operations" {
534 var b = try Backend.initSqlite(std.testing.allocator, ":memory:");
535 defer b.deinit();
536 try b.exec("CREATE TABLE test (id TEXT PRIMARY KEY, name TEXT)", .{});
537 try b.exec("INSERT INTO test (id, name) VALUES (?, ?)", .{ "1", "alice" });
538 var r = try b.row("SELECT id, name FROM test WHERE id = ?", .{"1"});
539 if (r) |*row| {
540 defer row.deinit();
541 try std.testing.expectEqualStrings("1", row.text(0));
542 try std.testing.expectEqualStrings("alice", row.text(1));
543 } else return error.RowNotFound;
544 try b.exec("INSERT INTO test (id, name) VALUES (?, ?)", .{ "2", "bob" });
545 var rows = try b.query("SELECT id, name FROM test ORDER BY id", .{});
546 defer rows.deinit();
547 var count: usize = 0;
548 while (rows.next()) |_| count += 1;
549 try std.testing.expectEqual(@as(usize, 2), count);
550}
551
552test "dialect placeholder rewriting" {
553 const sql = "SELECT * FROM t WHERE a = ? AND b = ?";
554 const sqlite_result = try Dialect.sqlite.rewritePlaceholders(std.testing.allocator, sql);
555 try std.testing.expectEqualStrings(sql, sqlite_result);
556 const pg_result = try Dialect.postgres.rewritePlaceholders(std.testing.allocator, sql);
557 defer std.testing.allocator.free(pg_result);
558 try std.testing.expectEqualStrings("SELECT * FROM t WHERE a = $1 AND b = $2", pg_result);
559}