prefect server in zig
1# database architecture
2
3## directory structure
4
5```
6database/
7├── __init__.py # public exports (PrefectDBInterface, etc.)
8├── configurations.py # connection/engine classes
9├── interface.py # main PrefectDBInterface singleton
10├── dependencies.py # dependency injection decorators
11├── orm_models.py # SQLAlchemy ORM models (~1600 lines)
12├── query_components.py # dialect-specific query builders (~1000 lines)
13├── alembic.ini # migration config
14├── alembic_commands.py # migration runners
15├── _migrations/
16│ ├── env.py # alembic environment
17│ ├── versions/
18│ │ ├── postgresql/ # 113 PostgreSQL migrations
19│ │ └── sqlite/ # 109 SQLite migrations
20│ └── MIGRATION-NOTES.md # migration history tracking
21└── sql/ # dialect-specific SQL templates
22 ├── postgres/
23 │ └── get-runs-from-worker-queues.sql.jinja
24 └── sqlite/
25 └── get-runs-from-worker-queues.sql.jinja
26```
27
28## three-tier abstraction
29
30### tier 1: database configuration
31
32abstract base `BaseDatabaseConfiguration` with implementations:
33- `AsyncPostgresConfiguration` - PostgreSQL async driver
34- `AioSqliteConfiguration` - SQLite async driver (aiosqlite)
35
36key methods:
37- `engine()` - returns async SQLAlchemy engine (cached per event loop)
38- `session()` - creates AsyncSession
39- `begin_transaction()` - database-specific transaction handling
40- `create_db()` / `drop_db()` - schema lifecycle
41
42engine caching key: `(event_loop, connection_url, echo, timeout)`
43
44### tier 2: query components
45
46abstract base `BaseQueryComponents` with implementations:
47- `AsyncPostgresQueryComponents`
48- `AioSqliteQueryComponents`
49
50key methods:
51- `insert()` - dialect-specific INSERT statement
52- `uses_json_strings` property - whether JSON returns as strings
53- `cast_to_json()` / `build_json_object()` - JSON handling
54- `make_timestamp_intervals()` - time range generation
55- `set_state_id_on_inserted_flow_runs_statement()` - UPDATE syntax differences
56
57### tier 3: ORM configuration
58
59abstract base `BaseORMConfiguration` with implementations:
60- `AsyncPostgresORMConfiguration` - points to `_migrations/versions/postgresql/`
61- `AioSqliteORMConfiguration` - points to `_migrations/versions/sqlite/`
62
63defines upsert columns for each entity type.
64
65### main interface: PrefectDBInterface
66
67singleton per unique configuration combination. aggregates all three components:
68- database operations (create/drop, migrations)
69- session management
70- ORM model access (properties for all 39+ models)
71- connectivity checks
72
73## backend selection
74
75entry point: `provide_database_interface()` in `dependencies.py`
76
77detection logic:
781. reads `PREFECT_API_DATABASE_CONNECTION_URL` setting
792. parses dialect from connection URL via `get_dialect()`
803. instantiates appropriate classes based on dialect name
81
82```python
83if dialect.name == "postgresql":
84 database_config = AsyncPostgresConfiguration(...)
85 query_components = AsyncPostgresQueryComponents()
86 orm = AsyncPostgresORMConfiguration()
87elif dialect.name == "sqlite":
88 database_config = AioSqliteConfiguration(...)
89 query_components = AioSqliteQueryComponents()
90 orm = AioSqliteORMConfiguration()
91```
92
93## configuration sources
94
95environment variables:
96- `PREFECT_API_DATABASE_CONNECTION_URL` - connection string (required)
97- `PREFECT_API_DATABASE_ECHO` - log SQL statements
98- `PREFECT_API_DATABASE_TIMEOUT` - statement timeout
99- `PREFECT_API_DATABASE_CONNECTION_TIMEOUT` - connection timeout
100- `PREFECT_API_DATABASE_SQLALCHEMY_POOL_SIZE` - pool size
101- `PREFECT_API_DATABASE_SQLALCHEMY_MAX_OVERFLOW` - overflow connections
102
103## dependency injection
104
105two decorators:
106- `@inject_db` - provides db as kwarg
107- `@db_injector` - provides db as first positional arg (better type hints)
108
109manual: `provide_database_interface()`