prefect server in zig
# query patterns

## dialect differences

### JSON handling

| operation | PostgreSQL | SQLite |
|-----------|------------|--------|
| build object | `jsonb_build_object()` | `json_object()` |
| aggregate array | `jsonb_agg()` | `json_group_array()` |
| returns strings | no (`uses_json_strings = False`) | yes (`uses_json_strings = True`) |
| type casting | direct JSON operations | requires `sa.func.json()` |

### time series generation

**PostgreSQL** - native function:
```sql
SELECT generate_series(start_time, end_time, interval '1 hour')
```

**SQLite** - recursive CTE (no native support):
```sql
WITH RECURSIVE timestamps AS (
  SELECT start_time AS ts
  UNION ALL
  SELECT datetime(ts, '+1 hour') FROM timestamps WHERE ts < end_time
)
SELECT ts FROM timestamps
```

### UPDATE with subquery

**PostgreSQL** - `UPDATE ... FROM` syntax:
```sql
UPDATE flow_run
SET state_id = flow_run_state.id
FROM flow_run_state
WHERE flow_run_state.flow_run_id = flow_run.id
  AND flow_run_state.timestamp = (SELECT MAX(timestamp) ...)
```

**SQLite** - correlated subquery:
```sql
UPDATE flow_run
SET state_id = (
  SELECT id FROM flow_run_state
  WHERE flow_run_id = flow_run.id
  ORDER BY timestamp DESC LIMIT 1
)
WHERE ...
```

### worker queue scheduling query

complex queries use Jinja2 templates in the `sql/` directory.

**PostgreSQL** (`postgres/get-runs-from-worker-queues.sql.jinja`):
- `CROSS JOIN LATERAL` for efficient nested limit
- `FOR UPDATE SKIP LOCKED` for pessimistic row locking
- `LEAST()` function for min comparison
- `JSONB` operators (`->>`)

**SQLite** (`sqlite/get-runs-from-worker-queues.sql.jinja`):
- nested CTE with `ROW_NUMBER()` window functions (no LATERAL)
- `json_extract()` for JSON access
- `MAX(0, ...)` instead of `GREATEST()`

## transaction handling

### PostgreSQL

simple context manager:
```python
async with session.begin():
    ...  # operations
```

`with_for_update=True` is ignored (Postgres handles row locking differently).

### SQLite

uses a context variable to select the transaction mode:
```python
# IMMEDIATE mode when with_for_update=True (acquires write lock immediately)
# DEFERRED mode otherwise (delays locking until needed)
token = SQLITE_BEGIN_MODE.set("IMMEDIATE" if with_for_update else "DEFERRED")
try:
    async with session.begin():
        yield
finally:
    SQLITE_BEGIN_MODE.reset(token)
```

this is a workaround for SQLite's implicit transaction handling.
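
The effect of the two begin modes can be checked with a minimal synchronous sketch. It uses the standard-library `sqlite3` module instead of the async SQLAlchemy session shown above, and the names `BEGIN_MODE`, `transaction`, `connect`, and `second_writer_blocked` are hypothetical stand-ins, not Prefect's API.

```python
import contextvars
import os
import sqlite3
import tempfile
from contextlib import contextmanager

# Hypothetical stand-in for SQLITE_BEGIN_MODE in the snippet above.
BEGIN_MODE = contextvars.ContextVar("BEGIN_MODE", default="DEFERRED")


@contextmanager
def transaction(conn, with_for_update=False):
    """Open a transaction whose BEGIN mode comes from the context variable."""
    token = BEGIN_MODE.set("IMMEDIATE" if with_for_update else "DEFERRED")
    try:
        conn.execute(f"BEGIN {BEGIN_MODE.get()}")
        try:
            yield conn
        except BaseException:
            conn.execute("ROLLBACK")
            raise
        else:
            conn.execute("COMMIT")
    finally:
        BEGIN_MODE.reset(token)


def connect(path):
    # isolation_level=None turns off the driver's implicit BEGIN, so the
    # explicit BEGIN above is the only one issued. timeout=0 makes lock
    # conflicts fail at once instead of waiting.
    return sqlite3.connect(path, isolation_level=None, timeout=0)


def second_writer_blocked(with_for_update):
    """Return True if a second connection cannot take the write lock."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.db")
        first, second = connect(path), connect(path)
        try:
            first.execute("CREATE TABLE t (x)")  # runs in autocommit mode
            with transaction(first, with_for_update=with_for_update):
                try:
                    second.execute("BEGIN IMMEDIATE")
                    second.execute("ROLLBACK")
                    return False
                except sqlite3.OperationalError:
                    return True
        finally:
            first.close()
            second.close()
```

With `with_for_update=True` the first connection takes the write lock at `BEGIN`, so a competing writer fails straight away. In DEFERRED mode the lock is only taken at the first write, so the second connection can still start its own write transaction.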

## connection pooling

### PostgreSQL settings

```python
pool_pre_ping=True      # verify connections on checkout
pool_use_lifo=True      # better availability after traffic spikes
pool_size=N             # from settings
max_overflow=N          # from settings
```

connect args:
- `command_timeout` - statement execution timeout
- `timeout` - connection timeout
- `statement_cache_size` - prepared statement cache
- `server_settings` - application_name, search_path
- `ssl` - optional TLS configuration

### SQLite settings

for `:memory:` databases:
```python
poolclass=AsyncAdaptedQueuePool
pool_size=1             # single connection required for in-memory
max_overflow=0          # no overflow
pool_recycle=-1         # don't recycle (would lose data)
```

PRAGMA configuration (on connect):
```sql
PRAGMA journal_mode = WAL;      -- concurrent readers during writes
PRAGMA foreign_keys = ON;       -- enable FK constraints
PRAGMA synchronous = NORMAL;    -- balance safety/performance
PRAGMA cache_size = 20000;      -- aggressive caching
PRAGMA busy_timeout = 60000;    -- 60s lock wait (5s in tests)
```

### engine disposal

scheduled on event loop shutdown via `add_event_loop_shutdown_callback()` to prevent connection leaks.

## session management

```python
async with db.session_context(begin_transaction=True) as session:
    ...  # operations run in transaction
```

options:
- `begin_transaction=True` - wrap in transaction
- `with_for_update=True` - use IMMEDIATE mode (SQLite) or row locks (Postgres)

## upsert patterns

defined per model in `BaseORMConfiguration`:
```python
deployment_unique_upsert_columns = [Deployment.flow_id, Deployment.name]
task_run_unique_upsert_columns = [TaskRun.flow_run_id, TaskRun.task_key, TaskRun.dynamic_key]
```

## key takeaways for our implementation

1. **JSON strings**: SQLite returns JSON as strings, PostgreSQL returns native objects
2. **no generate_series**: SQLite needs recursive CTEs for time ranges
3. **no LATERAL joins**: SQLite needs window functions/CTEs instead
4. **UPDATE syntax**: SQLite only supports `UPDATE ... FROM` from version 3.33.0 on; correlated subqueries work on every version
5. **transaction modes**: SQLite needs explicit IMMEDIATE for write locks
6. **batch alterations**: SQLite can't ALTER columns in place, needs table recreation
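
Two of the SQLite workarounds, the recursive CTE for time ranges and the correlated-subquery `UPDATE`, can be tried with a small sketch on an in-memory database using the standard-library `sqlite3` module. The two-table schema is a simplified assumption, not the real one, and the function names are hypothetical. The CTE here checks the bound on the next value, so the series never runs past `end`.

```python
import sqlite3


def hourly_series(conn, start, end):
    """Emulate generate_series(start, end, '1 hour') with a recursive CTE."""
    rows = conn.execute(
        """
        WITH RECURSIVE timestamps(ts) AS (
            SELECT datetime(:start) WHERE datetime(:start) <= datetime(:end)
            UNION ALL
            SELECT datetime(ts, '+1 hour') FROM timestamps
            WHERE datetime(ts, '+1 hour') <= datetime(:end)
        )
        SELECT ts FROM timestamps
        """,
        {"start": start, "end": end},
    ).fetchall()
    return [ts for (ts,) in rows]


def sync_latest_state(conn):
    """Point each flow_run at its most recent flow_run_state row."""
    conn.execute(
        """
        UPDATE flow_run
        SET state_id = (
            SELECT s.id FROM flow_run_state AS s
            WHERE s.flow_run_id = flow_run.id
            ORDER BY s.timestamp DESC LIMIT 1
        )
        """
    )


def demo():
    conn = sqlite3.connect(":memory:")
    # Simplified, assumed schema: only the columns the queries touch.
    conn.executescript(
        """
        CREATE TABLE flow_run (id INTEGER PRIMARY KEY, state_id INTEGER);
        CREATE TABLE flow_run_state (
            id INTEGER PRIMARY KEY, flow_run_id INTEGER, timestamp TEXT
        );
        INSERT INTO flow_run (id) VALUES (1), (2);
        INSERT INTO flow_run_state (id, flow_run_id, timestamp) VALUES
            (10, 1, '2024-01-01 00:00:00'),
            (11, 1, '2024-01-01 01:00:00'),
            (20, 2, '2024-01-01 00:30:00');
        """
    )
    sync_latest_state(conn)
    states = conn.execute(
        "SELECT id, state_id FROM flow_run ORDER BY id"
    ).fetchall()
    series = hourly_series(conn, "2024-01-01 00:00:00", "2024-01-01 02:30:00")
    conn.close()
    return states, series
```

Calling `demo()` returns the `(id, state_id)` pairs after the update and the generated hourly timestamps as ISO-formatted strings, which SQLite compares lexicographically.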