# database architecture

## directory structure

```
database/
├── __init__.py              # public exports (PrefectDBInterface, etc.)
├── configurations.py        # connection/engine classes
├── interface.py             # main PrefectDBInterface singleton
├── dependencies.py          # dependency injection decorators
├── orm_models.py            # SQLAlchemy ORM models (~1600 lines)
├── query_components.py      # dialect-specific query builders (~1000 lines)
├── alembic.ini              # migration config
├── alembic_commands.py      # migration runners
├── _migrations/
│   ├── env.py               # alembic environment
│   ├── versions/
│   │   ├── postgresql/      # 113 PostgreSQL migrations
│   │   └── sqlite/          # 109 SQLite migrations
│   └── MIGRATION-NOTES.md   # migration history tracking
└── sql/                     # dialect-specific SQL templates
    ├── postgres/
    │   └── get-runs-from-worker-queues.sql.jinja
    └── sqlite/
        └── get-runs-from-worker-queues.sql.jinja
```

## three-tier abstraction

### tier 1: database configuration

abstract base `BaseDatabaseConfiguration` with implementations:
- `AsyncPostgresConfiguration` - PostgreSQL async driver
- `AioSqliteConfiguration` - SQLite async driver (aiosqlite)

key methods:
- `engine()` - returns async SQLAlchemy engine (cached per event loop)
- `session()` - creates AsyncSession
- `begin_transaction()` - database-specific transaction handling
- `create_db()` / `drop_db()` - schema lifecycle

engine caching key: `(event_loop, connection_url, echo, timeout)`

### tier 2: query components

abstract base `BaseQueryComponents` with implementations:
- `AsyncPostgresQueryComponents`
- `AioSqliteQueryComponents`

key methods:
- `insert()` - dialect-specific INSERT statement
- `uses_json_strings` property - whether JSON returns as strings
- `cast_to_json()` / `build_json_object()` - JSON handling
- `make_timestamp_intervals()` - time range generation
- `set_state_id_on_inserted_flow_runs_statement()` - UPDATE syntax differences

### tier 3: ORM configuration

abstract base `BaseORMConfiguration` with implementations:
- `AsyncPostgresORMConfiguration` - points to `_migrations/versions/postgresql/`
- `AioSqliteORMConfiguration` - points to `_migrations/versions/sqlite/`

defines upsert columns for each entity type.

### main interface: PrefectDBInterface

singleton per unique configuration combination. aggregates all three components:
- database operations (create/drop, migrations)
- session management
- ORM model access (properties for all 39+ models)
- connectivity checks

## backend selection

entry point: `provide_database_interface()` in `dependencies.py`

detection logic:
1. reads `PREFECT_API_DATABASE_CONNECTION_URL` setting
2. parses dialect from connection URL via `get_dialect()`
3. instantiates appropriate classes based on dialect name

```python
if dialect.name == "postgresql":
    database_config = AsyncPostgresConfiguration(...)
    query_components = AsyncPostgresQueryComponents()
    orm = AsyncPostgresORMConfiguration()
elif dialect.name == "sqlite":
    database_config = AioSqliteConfiguration(...)
    query_components = AioSqliteQueryComponents()
    orm = AioSqliteORMConfiguration()
```

## configuration sources

environment variables:
- `PREFECT_API_DATABASE_CONNECTION_URL` - connection string (required)
- `PREFECT_API_DATABASE_ECHO` - log SQL statements
- `PREFECT_API_DATABASE_TIMEOUT` - statement timeout
- `PREFECT_API_DATABASE_CONNECTION_TIMEOUT` - connection timeout
- `PREFECT_API_DATABASE_SQLALCHEMY_POOL_SIZE` - pool size
- `PREFECT_API_DATABASE_SQLALCHEMY_MAX_OVERFLOW` - overflow connections

## dependency injection

two decorators:
- `@inject_db` - provides db as kwarg
- `@db_injector` - provides db as first positional arg (better type hints)

manual: `provide_database_interface()`
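
the difference between the two injection styles can be sketched as follows. this is a minimal illustration, not the actual code in `dependencies.py`: `FakeDBInterface` is a hypothetical stand-in for `PrefectDBInterface`, the `provide_database_interface()` here is a stub that returns one fixed instance, and the decorator bodies are assumptions about how kwarg vs. positional injection could work. `describe_kwarg` and `describe_positional` are made-up example functions.

```python
import functools
from typing import Any, Callable


class FakeDBInterface:
    """Hypothetical stand-in for PrefectDBInterface, used only in this sketch."""

    def __init__(self, dialect: str) -> None:
        self.dialect = dialect


_INTERFACE = FakeDBInterface("sqlite")


def provide_database_interface() -> FakeDBInterface:
    # Stub for the real entry point: always returns the same instance.
    return _INTERFACE


def inject_db(fn: Callable[..., Any]) -> Callable[..., Any]:
    """Keyword style: supply `db=` only when the caller did not pass one."""

    @functools.wraps(fn)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        if kwargs.get("db") is None:
            kwargs["db"] = provide_database_interface()
        return fn(*args, **kwargs)

    return wrapper


def db_injector(fn: Callable[..., Any]) -> Callable[..., Any]:
    """Positional style: prepend the interface as the first argument."""

    @functools.wraps(fn)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        return fn(provide_database_interface(), *args, **kwargs)

    return wrapper


@inject_db
def describe_kwarg(name: str, *, db: Any = None) -> str:
    # `db` has to be declared optional, so its type is loose here.
    return f"{name} via {db.dialect}"


@db_injector
def describe_positional(db: FakeDBInterface, name: str) -> str:
    # `db` is a required, fully typed first parameter.
    return f"{name} via {db.dialect}"
```

in this sketch, callers of either function omit `db` entirely (`describe_kwarg("flows")`, `describe_positional("tasks")`). the kwarg style still lets a caller override the interface with `db=...`, while the positional style keeps `db` as a required, typed parameter in the signature, which is one plausible reading of the "better type hints" note above.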