prefect server in zig
database architecture#
directory structure#
database/
├── __init__.py # public exports (PrefectDBInterface, etc.)
├── configurations.py # connection/engine classes
├── interface.py # main PrefectDBInterface singleton
├── dependencies.py # dependency injection decorators
├── orm_models.py # SQLAlchemy ORM models (~1600 lines)
├── query_components.py # dialect-specific query builders (~1000 lines)
├── alembic.ini # migration config
├── alembic_commands.py # migration runners
├── _migrations/
│ ├── env.py # alembic environment
│ ├── versions/
│ │ ├── postgresql/ # 113 PostgreSQL migrations
│ │ └── sqlite/ # 109 SQLite migrations
│ └── MIGRATION-NOTES.md # migration history tracking
└── sql/ # dialect-specific SQL templates
├── postgres/
│ └── get-runs-from-worker-queues.sql.jinja
└── sqlite/
└── get-runs-from-worker-queues.sql.jinja
three-tier abstraction#
tier 1: database configuration#
abstract base BaseDatabaseConfiguration with implementations:
AsyncPostgresConfiguration- PostgreSQL async driverAioSqliteConfiguration- SQLite async driver (aiosqlite)
key methods:
engine()- returns async SQLAlchemy engine (cached per event loop)session()- creates AsyncSessionbegin_transaction()- database-specific transaction handlingcreate_db()/drop_db()- schema lifecycle
engine caching key: (event_loop, connection_url, echo, timeout)
tier 2: query components#
abstract base BaseQueryComponents with implementations:
AsyncPostgresQueryComponentsAioSqliteQueryComponents
key methods:
insert()- dialect-specific INSERT statementuses_json_stringsproperty - whether JSON returns as stringscast_to_json()/build_json_object()- JSON handlingmake_timestamp_intervals()- time range generationset_state_id_on_inserted_flow_runs_statement()- UPDATE syntax differences
tier 3: ORM configuration#
abstract base BaseORMConfiguration with implementations:
AsyncPostgresORMConfiguration- points to_migrations/versions/postgresql/AioSqliteORMConfiguration- points to_migrations/versions/sqlite/
defines upsert columns for each entity type.
main interface: PrefectDBInterface#
singleton per unique configuration combination. aggregates all three components:
- database operations (create/drop, migrations)
- session management
- ORM model access (properties for all 39+ models)
- connectivity checks
backend selection#
entry point: provide_database_interface() in dependencies.py
detection logic:
- reads
PREFECT_API_DATABASE_CONNECTION_URLsetting - parses dialect from connection URL via
get_dialect() - instantiates appropriate classes based on dialect name
if dialect.name == "postgresql":
database_config = AsyncPostgresConfiguration(...)
query_components = AsyncPostgresQueryComponents()
orm = AsyncPostgresORMConfiguration()
elif dialect.name == "sqlite":
database_config = AioSqliteConfiguration(...)
query_components = AioSqliteQueryComponents()
orm = AioSqliteORMConfiguration()
configuration sources#
environment variables:
PREFECT_API_DATABASE_CONNECTION_URL- connection string (required)PREFECT_API_DATABASE_ECHO- log SQL statementsPREFECT_API_DATABASE_TIMEOUT- statement timeoutPREFECT_API_DATABASE_CONNECTION_TIMEOUT- connection timeoutPREFECT_API_DATABASE_SQLALCHEMY_POOL_SIZE- pool sizePREFECT_API_DATABASE_SQLALCHEMY_MAX_OVERFLOW- overflow connections
dependency injection#
two decorators:
@inject_db- provides db as kwarg@db_injector- provides db as first positional arg (better type hints)
manual: provide_database_interface()