# query patterns

## dialect differences

### JSON handling

| operation | PostgreSQL | SQLite |
|-----------|------------|--------|
| build object | `jsonb_build_object()` | `json_object()` |
| aggregate array | `jsonb_agg()` | `json_group_array()` |
| returns strings | no (`uses_json_strings = False`) | yes (`uses_json_strings = True`) |
| type casting | direct JSON operations | requires `sa.func.json()` |

### time series generation

**PostgreSQL** - native function:

```sql
SELECT generate_series(start_time, end_time, interval '1 hour')
```

**SQLite** - recursive CTE (no native support):

```sql
WITH RECURSIVE timestamps AS (
    SELECT start_time AS ts
    UNION ALL
    SELECT datetime(ts, '+1 hour') FROM timestamps
    WHERE ts < end_time
)
SELECT ts FROM timestamps
```

### UPDATE with subquery

**PostgreSQL** - `UPDATE ... FROM` syntax:

```sql
UPDATE flow_run SET state_id = flow_run_state.id
FROM flow_run_state
WHERE flow_run_state.flow_run_id = flow_run.id
  AND flow_run_state.timestamp = (SELECT MAX(timestamp) ...)
```

**SQLite** - correlated subquery:

```sql
UPDATE flow_run SET state_id = (
    SELECT id FROM flow_run_state
    WHERE flow_run_id = flow_run.id
    ORDER BY timestamp DESC LIMIT 1
)
WHERE ...
```

### worker queue scheduling query

complex queries use Jinja2 templates in the `sql/` directory.

**PostgreSQL** (`postgres/get-runs-from-worker-queues.sql.jinja`):

- `CROSS JOIN LATERAL` for an efficient nested limit
- `FOR UPDATE SKIP LOCKED` for pessimistic row locking
- `LEAST()` function for min comparison
- `JSONB` operators (`->>`)

**SQLite** (`sqlite/get-runs-from-worker-queues.sql.jinja`):

- nested CTEs with `ROW_NUMBER()` window functions (no LATERAL)
- `json_extract()` for JSON access
- `MAX(0, ...)` instead of `GREATEST()`

## transaction handling

### PostgreSQL

simple context manager:

```python
async with session.begin():
    # operations
```

`with_for_update=True` is ignored (Postgres handles row locking differently).
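two of the SQLite differences above are easy to verify with the stdlib `sqlite3` module, with no SQLAlchemy involved: JSON values come back as plain strings (the reason for `uses_json_strings = True`), and time ranges must be expanded with a recursive CTE. a minimal sketch, assuming the bundled SQLite includes the JSON1 functions (the default in modern builds); table and values are illustrative only:

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")

# 1. json_object() returns its result as TEXT -- the driver hands back a
#    plain Python string, so callers must json.loads() it themselves.
(payload,) = conn.execute(
    "SELECT json_object('name', 'alpha', 'count', 3)"
).fetchone()
assert isinstance(payload, str)
assert json.loads(payload) == {"name": "alpha", "count": 3}

# 2. no generate_series(): a recursive CTE expands the time range instead,
#    stepping one hour at a time until the end bound is reached.
rows = conn.execute(
    """
    WITH RECURSIVE timestamps AS (
        SELECT '2024-01-01 00:00:00' AS ts
        UNION ALL
        SELECT datetime(ts, '+1 hour') FROM timestamps
        WHERE ts < '2024-01-01 03:00:00'
    )
    SELECT ts FROM timestamps
    """
).fetchall()
assert len(rows) == 4  # 00:00 through 03:00, inclusive
conn.close()
```

note that the lexicographic comparison `ts < end` only works because `datetime()` emits fixed-width `YYYY-MM-DD HH:MM:SS` strings.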
### SQLite

uses context variable for transaction mode:

```python
# IMMEDIATE mode when with_for_update=True (acquires write lock immediately)
# DEFERRED mode otherwise (delays locking until needed)
token = SQLITE_BEGIN_MODE.set("IMMEDIATE" if with_for_update else "DEFERRED")
try:
    async with session.begin():
        yield
finally:
    SQLITE_BEGIN_MODE.reset(token)
```

workaround for SQLite's implicit transaction handling.

## connection pooling

### PostgreSQL settings

```python
pool_pre_ping=True   # verify connections on checkout
pool_use_lifo=True   # better availability after traffic spikes
pool_size=N          # from settings
max_overflow=N       # from settings
```

connect args:

- `command_timeout` - statement execution timeout
- `timeout` - connection timeout
- `statement_cache_size` - prepared statement cache
- `server_settings` - application_name, search_path
- `ssl` - optional TLS configuration

### SQLite settings

for `:memory:` databases:

```python
poolclass=AsyncAdaptedQueuePool
pool_size=1      # single connection required for in-memory
max_overflow=0   # no overflow
pool_recycle=-1  # don't recycle (would lose data)
```

PRAGMA configuration (on connect):

```sql
PRAGMA journal_mode = WAL;    -- concurrent readers during writes
PRAGMA foreign_keys = ON;     -- enable FK constraints
PRAGMA synchronous = NORMAL;  -- balance safety/performance
PRAGMA cache_size = 20000;    -- aggressive caching
PRAGMA busy_timeout = 60000;  -- 60s lock wait (5s in tests)
```

### engine disposal

scheduled on event loop shutdown via `add_event_loop_shutdown_callback()` to prevent connection leaks.
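the context-variable pattern from the transaction section above can be sketched with plain `sqlite3`. the `begin()` helper here is a hypothetical stand-in for the engine event hook that reads `SQLITE_BEGIN_MODE` when a transaction starts; the real code wires this into SQLAlchemy rather than calling it directly:

```python
import contextvars
import sqlite3

SQLITE_BEGIN_MODE: contextvars.ContextVar[str] = contextvars.ContextVar(
    "SQLITE_BEGIN_MODE", default="DEFERRED"
)

# isolation_level=None disables sqlite3's implicit transaction management,
# so we control exactly when and how BEGIN is issued.
conn = sqlite3.connect(":memory:", isolation_level=None)

def begin(connection: sqlite3.Connection) -> None:
    # hypothetical stand-in for the engine's "begin" event hook:
    # issue BEGIN with whatever mode the context variable currently holds
    connection.execute(f"BEGIN {SQLITE_BEGIN_MODE.get()}")

# a write path sets IMMEDIATE so the write lock is acquired up front,
# instead of being deferred until the first write statement
token = SQLITE_BEGIN_MODE.set("IMMEDIATE")
try:
    begin(conn)
    conn.execute("CREATE TABLE runs (id INTEGER PRIMARY KEY, state TEXT)")
    conn.execute("INSERT INTO runs (state) VALUES ('SCHEDULED')")
    conn.execute("COMMIT")
finally:
    SQLITE_BEGIN_MODE.reset(token)  # reads outside this block stay DEFERRED

count = conn.execute("SELECT COUNT(*) FROM runs").fetchone()[0]
conn.close()
```

the set/reset-in-finally shape matters: because the variable is task-local, concurrent read-only transactions on other tasks keep DEFERRED mode.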
## session management

```python
async with db.session_context(begin_transaction=True) as session:
    # operations run in transaction
```

options:

- `begin_transaction=True` - wrap in transaction
- `with_for_update=True` - use IMMEDIATE mode (SQLite) or row locks (Postgres)

## upsert patterns

defined per model in `BaseORMConfiguration`:

```python
deployment_unique_upsert_columns = [Deployment.flow_id, Deployment.name]
task_run_unique_upsert_columns = [TaskRun.flow_run_id, TaskRun.task_key, TaskRun.dynamic_key]
```

## key takeaways for our implementation

1. **JSON strings**: SQLite returns JSON as strings, PostgreSQL returns native objects
2. **no generate_series**: SQLite needs recursive CTEs for time ranges
3. **no LATERAL joins**: SQLite needs window functions/CTEs instead
4. **UPDATE syntax**: SQLite can't use `UPDATE ... FROM`, needs subqueries
5. **transaction modes**: SQLite needs explicit IMMEDIATE for write locks
6. **batch alterations**: SQLite can't ALTER columns in place, needs table recreation
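the per-model unique-column lists above feed `INSERT ... ON CONFLICT` statements. a simplified stdlib `sqlite3` sketch of the same pattern, using a reduced `task_run` table (schema and values are illustrative, not the real ORM models):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE task_run (
        flow_run_id TEXT,
        task_key TEXT,
        dynamic_key TEXT,
        state TEXT,
        UNIQUE (flow_run_id, task_key, dynamic_key)
    )
    """
)

# the conflict target mirrors task_run_unique_upsert_columns;
# excluded.* refers to the row that failed to insert
upsert = """
    INSERT INTO task_run (flow_run_id, task_key, dynamic_key, state)
    VALUES (?, ?, ?, ?)
    ON CONFLICT (flow_run_id, task_key, dynamic_key)
    DO UPDATE SET state = excluded.state
"""

# first call inserts; second hits the unique constraint and updates in place
conn.execute(upsert, ("fr-1", "my_task", "0", "PENDING"))
conn.execute(upsert, ("fr-1", "my_task", "0", "RUNNING"))

rows = conn.execute("SELECT state FROM task_run").fetchall()
conn.close()
```

both dialects accept this `ON CONFLICT ... DO UPDATE` shape (SQLite since 3.24, borrowed from PostgreSQL), which is why a single per-model column list can drive the upsert on either backend.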