database#
dual backend support: sqlite via zqlite and postgresql via pg.zig.
backend selection#
environment variables control backend:
# sqlite (default)
PREFECT_DATABASE_BACKEND=sqlite
PREFECT_DATABASE_PATH=prefect.db
# postgresql
PREFECT_DATABASE_BACKEND=postgres
PREFECT_DATABASE_URL=postgresql://user:pass@localhost:5432/prefect
architecture#
src/db/
├── backend.zig # unified Backend, Row, Rows types
├── dialect.zig # SQL dialect helpers
├── schema/
│ ├── sqlite.zig # SQLite DDL
│ └── postgres.zig # PostgreSQL DDL
├── sqlite.zig # schema init + migration
├── flows.zig # flow entity
├── flow_runs.zig # flow run entity
├── task_runs.zig # task run entity
├── events.zig # event entity
├── block_types.zig
├── block_schemas.zig
└── block_documents.zig
connection models#
sqlite#
single global connection with mutex for thread safety. pragmas:
journal_mode=WALbusy_timeout=5000foreign_keys=ON
postgresql#
connection pool (size=10) via pg.zig. parses standard connection URI format.
tables#
flow- flow definitionsflow_run- flow run instancesflow_run_state- state history (FK cascade delete)task_run- task run instancesevents- persisted eventsblock_type- block type definitionsblock_schema- block schemasblock_document- block document instances
query pattern#
entity modules use the unified backend API:
const backend = @import("backend.zig");
pub fn getFlowById(alloc: Allocator, id: []const u8) !?FlowRow {
var r = backend.db.row(
"SELECT id, created, updated, name, tags FROM flow WHERE id = ?",
.{id},
) catch return null;
if (r) |*row| {
defer row.deinit();
return FlowRow{
.id = try alloc.dupe(u8, row.text(0)),
.created = try alloc.dupe(u8, row.text(1)),
// ...
};
}
return null;
}
placeholder rewriting is automatic: ? becomes $1, $2 for postgres.
transactions#
use Transaction type for atomic multi-statement operations. mutex locking is only needed for sqlite (postgres pool handles concurrency):
// lock mutex only for sqlite
if (backend.db.dialect == .sqlite) {
backend.db.mutex.lock();
}
defer if (backend.db.dialect == .sqlite) {
backend.db.mutex.unlock();
};
// begin transaction - for postgres this acquires a dedicated connection
var txn = backend.db.beginTransaction() catch |err| {
log.err("database", "begin transaction error: {}", .{err});
return err;
};
errdefer txn.rollback();
// execute within transaction (uses same connection for postgres)
txn.exec("UPDATE foo SET bar = ? WHERE id = ?", .{ bar, id }) catch |err| {
return err;
};
txn.exec("INSERT INTO baz (foo_id, val) VALUES (?, ?)", .{ id, val }) catch |err| {
return err;
};
txn.commit() catch |err| {
log.err("database", "commit error: {}", .{err});
return err;
};
the Transaction type ensures all statements execute on the same connection, which is critical for postgres where the pool might otherwise give different connections for each query.
migrations#
schema changes are managed via embedded migrations. on startup, the server:
- creates
_migrationstable if needed - applies any pending migrations in order
- records applied migrations
see migrations.md for details on adding new migrations.
see python-reference/ for how prefect python handles migrations with alembic.
testing#
# test sqlite backend
./scripts/test-db-backends sqlite
# test postgresql (requires docker)
./scripts/test-db-backends postgres
# test both
./scripts/test-db-backends all