prefect server in zig
1# database architecture 2 3## directory structure 4 5``` 6database/ 7├── __init__.py # public exports (PrefectDBInterface, etc.) 8├── configurations.py # connection/engine classes 9├── interface.py # main PrefectDBInterface singleton 10├── dependencies.py # dependency injection decorators 11├── orm_models.py # SQLAlchemy ORM models (~1600 lines) 12├── query_components.py # dialect-specific query builders (~1000 lines) 13├── alembic.ini # migration config 14├── alembic_commands.py # migration runners 15├── _migrations/ 16│ ├── env.py # alembic environment 17│ ├── versions/ 18│ │ ├── postgresql/ # 113 PostgreSQL migrations 19│ │ └── sqlite/ # 109 SQLite migrations 20│ └── MIGRATION-NOTES.md # migration history tracking 21└── sql/ # dialect-specific SQL templates 22 ├── postgres/ 23 │ └── get-runs-from-worker-queues.sql.jinja 24 └── sqlite/ 25 └── get-runs-from-worker-queues.sql.jinja 26``` 27 28## three-tier abstraction 29 30### tier 1: database configuration 31 32abstract base `BaseDatabaseConfiguration` with implementations: 33- `AsyncPostgresConfiguration` - PostgreSQL async driver 34- `AioSqliteConfiguration` - SQLite async driver (aiosqlite) 35 36key methods: 37- `engine()` - returns async SQLAlchemy engine (cached per event loop) 38- `session()` - creates AsyncSession 39- `begin_transaction()` - database-specific transaction handling 40- `create_db()` / `drop_db()` - schema lifecycle 41 42engine caching key: `(event_loop, connection_url, echo, timeout)` 43 44### tier 2: query components 45 46abstract base `BaseQueryComponents` with implementations: 47- `AsyncPostgresQueryComponents` 48- `AioSqliteQueryComponents` 49 50key methods: 51- `insert()` - dialect-specific INSERT statement 52- `uses_json_strings` property - whether JSON returns as strings 53- `cast_to_json()` / `build_json_object()` - JSON handling 54- `make_timestamp_intervals()` - time range generation 55- `set_state_id_on_inserted_flow_runs_statement()` - UPDATE syntax differences 56 57### tier 3: ORM configuration 58 59abstract base `BaseORMConfiguration` with implementations: 60- `AsyncPostgresORMConfiguration` - points to `_migrations/versions/postgresql/` 61- `AioSqliteORMConfiguration` - points to `_migrations/versions/sqlite/` 62 63defines upsert columns for each entity type. 64 65### main interface: PrefectDBInterface 66 67singleton per unique configuration combination. aggregates all three components: 68- database operations (create/drop, migrations) 69- session management 70- ORM model access (properties for all 39+ models) 71- connectivity checks 72 73## backend selection 74 75entry point: `provide_database_interface()` in `dependencies.py` 76 77detection logic: 781. reads `PREFECT_API_DATABASE_CONNECTION_URL` setting 792. parses dialect from connection URL via `get_dialect()` 803. instantiates appropriate classes based on dialect name 81 82```python 83if dialect.name == "postgresql": 84 database_config = AsyncPostgresConfiguration(...) 85 query_components = AsyncPostgresQueryComponents() 86 orm = AsyncPostgresORMConfiguration() 87elif dialect.name == "sqlite": 88 database_config = AioSqliteConfiguration(...) 89 query_components = AioSqliteQueryComponents() 90 orm = AioSqliteORMConfiguration() 91``` 92 93## configuration sources 94 95environment variables: 96- `PREFECT_API_DATABASE_CONNECTION_URL` - connection string (required) 97- `PREFECT_API_DATABASE_ECHO` - log SQL statements 98- `PREFECT_API_DATABASE_TIMEOUT` - statement timeout 99- `PREFECT_API_DATABASE_CONNECTION_TIMEOUT` - connection timeout 100- `PREFECT_API_DATABASE_SQLALCHEMY_POOL_SIZE` - pool size 101- `PREFECT_API_DATABASE_SQLALCHEMY_MAX_OVERFLOW` - overflow connections 102 103## dependency injection 104 105two decorators: 106- `@inject_db` - provides db as kwarg 107- `@db_injector` - provides db as first positional arg (better type hints) 108 109manual: `provide_database_interface()`