prefect server in zig

database architecture#

directory structure#

database/
├── __init__.py                    # public exports (PrefectDBInterface, etc.)
├── configurations.py              # connection/engine classes
├── interface.py                   # main PrefectDBInterface singleton
├── dependencies.py                # dependency injection decorators
├── orm_models.py                  # SQLAlchemy ORM models (~1600 lines)
├── query_components.py            # dialect-specific query builders (~1000 lines)
├── alembic.ini                    # migration config
├── alembic_commands.py            # migration runners
├── _migrations/
│   ├── env.py                     # alembic environment
│   ├── versions/
│   │   ├── postgresql/            # 113 PostgreSQL migrations
│   │   └── sqlite/                # 109 SQLite migrations
│   └── MIGRATION-NOTES.md         # migration history tracking
└── sql/                           # dialect-specific SQL templates
    ├── postgres/
    │   └── get-runs-from-worker-queues.sql.jinja
    └── sqlite/
        └── get-runs-from-worker-queues.sql.jinja

three-tier abstraction#

tier 1: database configuration#

abstract base BaseDatabaseConfiguration with implementations:

  • AsyncPostgresConfiguration - PostgreSQL async driver
  • AioSqliteConfiguration - SQLite async driver (aiosqlite)

key methods:

  • engine() - returns async SQLAlchemy engine (cached per event loop)
  • session() - creates AsyncSession
  • begin_transaction() - database-specific transaction handling
  • create_db() / drop_db() - schema lifecycle

engine caching key: (event_loop, connection_url, echo, timeout)

tier 2: query components#

abstract base BaseQueryComponents with implementations:

  • AsyncPostgresQueryComponents
  • AioSqliteQueryComponents

key methods:

  • insert() - dialect-specific INSERT statement
  • uses_json_strings property - whether the driver returns JSON values as strings (which must then be decoded)
  • cast_to_json() / build_json_object() - JSON handling
  • make_timestamp_intervals() - time range generation
  • set_state_id_on_inserted_flow_runs_statement() - UPDATE syntax differences
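
a rough sketch of how a dialect-specific query component might look. the class names ending in `Sketch` and the `decode_json` helper are hypothetical, the methods return SQL strings rather than SQLAlchemy expressions to keep the example dependency-free, and the `uses_json_strings` values per dialect are an assumption.

```python
import json
from abc import ABC, abstractmethod
from typing import Any


class QueryComponentsSketch(ABC):
    """Hypothetical, string-based stand-in for BaseQueryComponents."""

    @property
    @abstractmethod
    def uses_json_strings(self) -> bool:
        """Whether JSON values come back from the driver as str."""

    @abstractmethod
    def build_json_object(self, *args: str) -> str:
        """Render a SQL expression that builds a JSON object."""


class PostgresSketch(QueryComponentsSketch):
    @property
    def uses_json_strings(self) -> bool:
        return False  # assumption: the driver decodes JSON for us

    def build_json_object(self, *args: str) -> str:
        return f"jsonb_build_object({', '.join(args)})"


class SqliteSketch(QueryComponentsSketch):
    @property
    def uses_json_strings(self) -> bool:
        return True  # assumption: JSON arrives as text

    def build_json_object(self, *args: str) -> str:
        return f"json_object({', '.join(args)})"


def decode_json(components: QueryComponentsSketch, raw: Any) -> Any:
    """Callers consult the flag instead of checking the dialect themselves."""
    if components.uses_json_strings and isinstance(raw, str):
        return json.loads(raw)
    return raw
```

the point of the tier is that calling code asks the component object for the right behavior and never branches on the dialect name itself.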

tier 3: ORM configuration#

abstract base BaseORMConfiguration with implementations:

  • AsyncPostgresORMConfiguration - points to _migrations/versions/postgresql/
  • AioSqliteORMConfiguration - points to _migrations/versions/sqlite/

defines upsert columns for each entity type.

main interface: PrefectDBInterface#

one singleton instance per unique combination of the three tier components (database configuration, query components, ORM configuration). it aggregates them and exposes:

  • database operations (create/drop, migrations)
  • session management
  • ORM model access (properties for all 39+ models)
  • connectivity checks
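
the "singleton per configuration" behavior can be sketched with a metaclass that caches instances by their constructor arguments. this is an illustration under assumptions: `SingletonByConfig` and `DBInterfaceSketch` are hypothetical names, and the components are assumed to be hashable (plain strings in the test below).

```python
from typing import Any, Dict, Hashable, Tuple


class SingletonByConfig(type):
    """Metaclass that returns one instance per unique argument combination."""

    _instances: Dict[Tuple[Hashable, ...], Any] = {}

    def __call__(cls, database_config, query_components, orm):
        key = (cls, database_config, query_components, orm)
        if key not in cls._instances:
            # Only construct a new interface for an unseen combination.
            cls._instances[key] = super().__call__(
                database_config, query_components, orm
            )
        return cls._instances[key]


class DBInterfaceSketch(metaclass=SingletonByConfig):
    """Hypothetical stand-in that aggregates the three tiers."""

    def __init__(self, database_config, query_components, orm):
        self.database_config = database_config
        self.query_components = query_components
        self.orm = orm
```

constructing the interface twice with equal components yields the same object, while any differing component yields a separate instance.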

backend selection#

entry point: provide_database_interface() in dependencies.py

detection logic:

  1. reads PREFECT_API_DATABASE_CONNECTION_URL setting
  2. parses dialect from connection URL via get_dialect()
  3. instantiates appropriate classes based on dialect name

if dialect.name == "postgresql":
    database_config = AsyncPostgresConfiguration(...)
    query_components = AsyncPostgresQueryComponents()
    orm = AsyncPostgresORMConfiguration()
elif dialect.name == "sqlite":
    database_config = AioSqliteConfiguration(...)
    query_components = AioSqliteQueryComponents()
    orm = AioSqliteORMConfiguration()
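
the detection steps can be approximated without any database library. in this sketch `dialect_name` and `select_backend` are hypothetical helpers: the dialect is taken from the URL scheme with plain string handling rather than through get_dialect(), the function returns class names as strings instead of instantiating anything, and raising ValueError for an unknown dialect is an assumption.

```python
from typing import Dict, Tuple

# Class names from the three tiers, keyed by dialect name.
_BACKENDS: Dict[str, Tuple[str, str, str]] = {
    "postgresql": (
        "AsyncPostgresConfiguration",
        "AsyncPostgresQueryComponents",
        "AsyncPostgresORMConfiguration",
    ),
    "sqlite": (
        "AioSqliteConfiguration",
        "AioSqliteQueryComponents",
        "AioSqliteORMConfiguration",
    ),
}


def dialect_name(connection_url: str) -> str:
    """Extract 'postgresql' from 'postgresql+asyncpg://...'."""
    scheme, sep, _rest = connection_url.partition("://")
    if not sep or not scheme:
        raise ValueError(f"not a connection URL: {connection_url!r}")
    # The scheme has the form dialect[+driver]; keep only the dialect.
    return scheme.split("+", 1)[0]


def select_backend(connection_url: str) -> Tuple[str, str, str]:
    """Map a connection URL to the names of the classes to instantiate."""
    name = dialect_name(connection_url)
    try:
        return _BACKENDS[name]
    except KeyError:
        raise ValueError(f"unsupported dialect: {name!r}") from None
```

a table lookup like this keeps the selection logic in one place, so supporting another dialect would mean adding one entry and three classes.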

configuration sources#

environment variables:

  • PREFECT_API_DATABASE_CONNECTION_URL - connection string (required)
  • PREFECT_API_DATABASE_ECHO - log SQL statements
  • PREFECT_API_DATABASE_TIMEOUT - statement timeout
  • PREFECT_API_DATABASE_CONNECTION_TIMEOUT - connection timeout
  • PREFECT_API_DATABASE_SQLALCHEMY_POOL_SIZE - pool size
  • PREFECT_API_DATABASE_SQLALCHEMY_MAX_OVERFLOW - overflow connections
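
a minimal sketch of collecting these variables into one object. `DatabaseSettingsSketch` and `load_database_settings` are hypothetical names, unset optional values are left as None rather than guessing defaults, and the set of strings accepted as "true" is an assumption.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional

_PREFIX = "PREFECT_API_DATABASE_"


@dataclass(frozen=True)
class DatabaseSettingsSketch:
    connection_url: str
    echo: bool
    timeout: Optional[float]
    connection_timeout: Optional[float]
    pool_size: Optional[int]
    max_overflow: Optional[int]


def load_database_settings(
    env: Mapping[str, str] = os.environ,
) -> DatabaseSettingsSketch:
    """Read the database settings from an environment-like mapping."""

    def raw(name: str) -> Optional[str]:
        return env.get(_PREFIX + name)

    def as_float(name: str) -> Optional[float]:
        value = raw(name)
        return float(value) if value is not None else None

    def as_int(name: str) -> Optional[int]:
        value = raw(name)
        return int(value) if value is not None else None

    url = raw("CONNECTION_URL")
    if url is None:
        raise KeyError(_PREFIX + "CONNECTION_URL")  # listed as required

    echo_raw = raw("ECHO") or ""
    return DatabaseSettingsSketch(
        connection_url=url,
        echo=echo_raw.strip().lower() in {"1", "true", "yes", "on"},
        timeout=as_float("TIMEOUT"),
        connection_timeout=as_float("CONNECTION_TIMEOUT"),
        pool_size=as_int("SQLALCHEMY_POOL_SIZE"),
        max_overflow=as_int("SQLALCHEMY_MAX_OVERFLOW"),
    )
```

taking the environment as a parameter makes the loader easy to exercise with a plain dict instead of mutating the real process environment.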

dependency injection#

two decorators:

  • @inject_db - provides db as kwarg
  • @db_injector - provides db as first positional arg (better type hints)

manual access: call provide_database_interface() directly