prefect server in zig

database#

dual backend support: sqlite via zqlite and postgresql via pg.zig.

backend selection#

environment variables control backend:

# sqlite (default)
PREFECT_DATABASE_BACKEND=sqlite
PREFECT_DATABASE_PATH=prefect.db

# postgresql
PREFECT_DATABASE_BACKEND=postgres
PREFECT_DATABASE_URL=postgresql://user:pass@localhost:5432/prefect

architecture#

src/db/
├── backend.zig    # unified Backend, Row, Rows types
├── dialect.zig    # SQL dialect helpers
├── schema/
│   ├── sqlite.zig   # SQLite DDL
│   └── postgres.zig # PostgreSQL DDL
├── sqlite.zig     # schema init + migration
├── flows.zig      # flow entity
├── flow_runs.zig  # flow run entity
├── task_runs.zig  # task run entity
├── events.zig     # event entity
├── block_types.zig
├── block_schemas.zig
└── block_documents.zig

connection models#

sqlite#

single global connection with mutex for thread safety. pragmas:

  • journal_mode=WAL
  • busy_timeout=5000
  • foreign_keys=ON

postgresql#

connection pool (size=10) via pg.zig. parses standard connection URI format.

tables#

  • flow - flow definitions
  • flow_run - flow run instances
  • flow_run_state - state history (FK cascade delete)
  • task_run - task run instances
  • events - persisted events
  • block_type - block type definitions
  • block_schema - block schemas
  • block_document - block document instances

query pattern#

entity modules use the unified backend API:

const backend = @import("backend.zig");

pub fn getFlowById(alloc: Allocator, id: []const u8) !?FlowRow {
    var r = backend.db.row(
        "SELECT id, created, updated, name, tags FROM flow WHERE id = ?",
        .{id},
    ) catch return null;

    if (r) |*row| {
        defer row.deinit();
        return FlowRow{
            .id = try alloc.dupe(u8, row.text(0)),
            .created = try alloc.dupe(u8, row.text(1)),
            // ...
        };
    }
    return null;
}

placeholder rewriting is automatic: ? becomes $1, $2 for postgres.

transactions#

use Transaction type for atomic multi-statement operations. mutex locking is only needed for sqlite (postgres pool handles concurrency):

// lock mutex only for sqlite
if (backend.db.dialect == .sqlite) {
    backend.db.mutex.lock();
}
defer if (backend.db.dialect == .sqlite) {
    backend.db.mutex.unlock();
};

// begin transaction - for postgres this acquires a dedicated connection
var txn = backend.db.beginTransaction() catch |err| {
    log.err("database", "begin transaction error: {}", .{err});
    return err;
};
errdefer txn.rollback();

// execute within transaction (uses same connection for postgres)
txn.exec("UPDATE foo SET bar = ? WHERE id = ?", .{ bar, id }) catch |err| {
    return err;
};
txn.exec("INSERT INTO baz (foo_id, val) VALUES (?, ?)", .{ id, val }) catch |err| {
    return err;
};

txn.commit() catch |err| {
    log.err("database", "commit error: {}", .{err});
    return err;
};

the Transaction type ensures all statements execute on the same connection, which is critical for postgres where the pool might otherwise give different connections for each query.

migrations#

schema changes are managed via embedded migrations. on startup, the server:

  1. creates _migrations table if needed
  2. applies any pending migrations in order
  3. records applied migrations

see migrations.md for details on adding new migrations.

see python-reference/ for how prefect python handles migrations with alembic.

testing#

# test sqlite backend
./scripts/test-db-backends sqlite

# test postgresql (requires docker)
./scripts/test-db-backends postgres

# test both
./scripts/test-db-backends all