prefect server in zig
# query patterns

## dialect differences

### JSON handling

| operation | PostgreSQL | SQLite |
|-----------|------------|--------|
| build object | `jsonb_build_object()` | `json_object()` |
| aggregate array | `jsonb_agg()` | `json_group_array()` |
| returns strings | no (`uses_json_strings = False`) | yes (`uses_json_strings = True`) |
| type casting | direct JSON operations | requires `sa.func.json()` |
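
the `uses_json_strings` difference is easy to observe with the stdlib driver; a minimal sketch (plain `sqlite3`, not SQLAlchemy):

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
row = conn.execute(
    "SELECT json_object('name', 'etl', 'retries', 3)"
).fetchone()

# SQLite hands the JSON back as a plain string...
assert isinstance(row[0], str)

# ...so callers must decode it themselves (PostgreSQL jsonb arrives
# as native objects through its drivers)
obj = json.loads(row[0])
assert obj == {"name": "etl", "retries": 3}
```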

### time series generation

**PostgreSQL** - native function:
```sql
SELECT generate_series(start_time, end_time, interval '1 hour')
```

**SQLite** - recursive CTE (no native support):
```sql
WITH RECURSIVE timestamps AS (
    SELECT start_time AS ts
    UNION ALL
    SELECT datetime(ts, '+1 hour') FROM timestamps WHERE ts < end_time
)
SELECT ts FROM timestamps
```
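
run against `sqlite3` directly, the recursive CTE behaves like `generate_series` with inclusive endpoints (the concrete timestamps here are illustrative):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
rows = conn.execute(
    """
    WITH RECURSIVE timestamps AS (
        SELECT datetime('2024-01-01 00:00:00') AS ts
        UNION ALL
        SELECT datetime(ts, '+1 hour') FROM timestamps
        WHERE ts < datetime('2024-01-01 03:00:00')
    )
    SELECT ts FROM timestamps
    """
).fetchall()

# four hourly steps, both endpoints included
assert [r[0] for r in rows] == [
    "2024-01-01 00:00:00",
    "2024-01-01 01:00:00",
    "2024-01-01 02:00:00",
    "2024-01-01 03:00:00",
]
```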

### UPDATE with subquery

**PostgreSQL** - `UPDATE ... FROM` syntax:
```sql
UPDATE flow_run
SET state_id = flow_run_state.id
FROM flow_run_state
WHERE flow_run_state.flow_run_id = flow_run.id
    AND flow_run_state.timestamp = (SELECT MAX(timestamp) ...)
```

**SQLite** - correlated subquery:
```sql
UPDATE flow_run
SET state_id = (
    SELECT id FROM flow_run_state
    WHERE flow_run_id = flow_run.id
    ORDER BY timestamp DESC LIMIT 1
)
WHERE ...
```
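
a self-contained check of the correlated-subquery form (toy schema, not the real Prefect tables):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE flow_run (id INTEGER PRIMARY KEY, state_id INTEGER);
    CREATE TABLE flow_run_state (
        id INTEGER PRIMARY KEY,
        flow_run_id INTEGER,
        timestamp TEXT
    );
    INSERT INTO flow_run VALUES (1, NULL);
    INSERT INTO flow_run_state VALUES
        (10, 1, '2024-01-01'),
        (11, 1, '2024-01-02');
""")

# point each flow run at its most recent state
conn.execute("""
    UPDATE flow_run
    SET state_id = (
        SELECT id FROM flow_run_state
        WHERE flow_run_id = flow_run.id
        ORDER BY timestamp DESC LIMIT 1
    )
""")
assert conn.execute("SELECT state_id FROM flow_run").fetchone()[0] == 11
```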

### worker queue scheduling query

complex queries live as Jinja2 templates in the `sql/` directory.

**PostgreSQL** (`postgres/get-runs-from-worker-queues.sql.jinja`):
- `CROSS JOIN LATERAL` for an efficient nested limit
- `FOR UPDATE SKIP LOCKED` for pessimistic row locking
- `LEAST()` function for min comparison
- `JSONB` operators (`->>`)

**SQLite** (`sqlite/get-runs-from-worker-queues.sql.jinja`):
- nested CTEs with `ROW_NUMBER()` window functions (no `LATERAL` support)
- `json_extract()` for JSON access
- `MAX(0, ...)` instead of `GREATEST()` (SQLite's multi-argument `MAX()` is its scalar greatest)
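
the window-function substitute for a `LATERAL` per-queue limit can be sketched on a toy table:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE run (queue TEXT, scheduled TEXT);
    INSERT INTO run VALUES
        ('a', '2024-01-01'), ('a', '2024-01-02'), ('a', '2024-01-03'),
        ('b', '2024-01-01'), ('b', '2024-01-02');
""")

# take at most 2 runs per queue, earliest first -- the job a
# CROSS JOIN LATERAL ... LIMIT 2 would do in PostgreSQL
rows = conn.execute("""
    WITH ranked AS (
        SELECT queue, scheduled,
               ROW_NUMBER() OVER (
                   PARTITION BY queue ORDER BY scheduled
               ) AS rn
        FROM run
    )
    SELECT queue, scheduled FROM ranked WHERE rn <= 2
""").fetchall()
assert len(rows) == 4
assert ('a', '2024-01-03') not in rows  # third run in queue 'a' is cut
```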

## transaction handling

### PostgreSQL

simple context manager:
```python
async with session.begin():
    # operations
```

`with_for_update=True` is ignored (Postgres handles row locking differently).

### SQLite

uses a context variable to select the transaction mode:
```python
# IMMEDIATE mode when with_for_update=True (acquires the write lock up front)
# DEFERRED mode otherwise (delays locking until the first write)
token = SQLITE_BEGIN_MODE.set("IMMEDIATE" if with_for_update else "DEFERRED")
try:
    async with session.begin():
        yield
finally:
    SQLITE_BEGIN_MODE.reset(token)
```

a workaround for SQLite's implicit transaction handling.
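
a minimal sketch of the mechanism (`begin_statement` is an illustrative name; the real code wires the context variable through a SQLAlchemy `begin` event listener):

```python
import contextvars

# context variable carrying the desired BEGIN mode for the current task
SQLITE_BEGIN_MODE: contextvars.ContextVar[str] = contextvars.ContextVar(
    "SQLITE_BEGIN_MODE", default="DEFERRED"
)

def begin_statement() -> str:
    # a "begin" event listener would emit this instead of the
    # driver's implicit BEGIN
    return f"BEGIN {SQLITE_BEGIN_MODE.get()}"

token = SQLITE_BEGIN_MODE.set("IMMEDIATE")
try:
    assert begin_statement() == "BEGIN IMMEDIATE"
finally:
    SQLITE_BEGIN_MODE.reset(token)

# outside the write path, the default mode applies again
assert begin_statement() == "BEGIN DEFERRED"
```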

## connection pooling

### PostgreSQL settings

```python
pool_pre_ping=True   # verify connections on checkout
pool_use_lifo=True   # better availability after traffic spikes
pool_size=N          # from settings
max_overflow=N       # from settings
```

connect args:
- `command_timeout` - statement execution timeout
- `timeout` - connection timeout
- `statement_cache_size` - prepared statement cache
- `server_settings` - `application_name`, `search_path`
- `ssl` - optional TLS configuration
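
assembled into a SQLAlchemy engine call, the settings might look like this (a sketch: the DSN and literal values are illustrative, and the real code reads them from settings):

```python
from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine(
    "postgresql+asyncpg://user:pass@db/prefect",  # illustrative DSN
    pool_pre_ping=True,   # verify connections on checkout
    pool_use_lifo=True,   # reuse hot connections after spikes
    pool_size=5,          # from settings in the real code
    max_overflow=10,      # from settings in the real code
    connect_args={
        "command_timeout": 30,        # statement execution timeout (s)
        "timeout": 10,                # connection timeout (s)
        "statement_cache_size": 100,  # prepared statement cache
        "server_settings": {"application_name": "prefect"},
    },
)
```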

### SQLite settings

for `:memory:` databases:
```python
poolclass=AsyncAdaptedQueuePool
pool_size=1      # single connection required for in-memory
max_overflow=0   # no overflow
pool_recycle=-1  # don't recycle (would lose data)
```

PRAGMA configuration (on connect):
```sql
PRAGMA journal_mode = WAL;   -- concurrent readers during writes
PRAGMA foreign_keys = ON;    -- enable FK constraints
PRAGMA synchronous = NORMAL; -- balance safety/performance
PRAGMA cache_size = 20000;   -- aggressive caching
PRAGMA busy_timeout = 60000; -- 60s lock wait (5s in tests)
```
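
the PRAGMAs can be verified with the stdlib driver (`journal_mode = WAL` has no effect for `:memory:` databases, so it is omitted here):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# apply on every new connection, e.g. from a connect hook
for pragma in (
    "PRAGMA foreign_keys = ON",
    "PRAGMA synchronous = NORMAL",
    "PRAGMA cache_size = 20000",
    "PRAGMA busy_timeout = 60000",
):
    conn.execute(pragma)

# each PRAGMA reads back its current value
assert conn.execute("PRAGMA foreign_keys").fetchone()[0] == 1
assert conn.execute("PRAGMA busy_timeout").fetchone()[0] == 60000
```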

### engine disposal

disposal is scheduled on event loop shutdown via `add_event_loop_shutdown_callback()` to prevent connection leaks.

## session management

```python
async with db.session_context(begin_transaction=True) as session:
    # operations run in a transaction
```

options:
- `begin_transaction=True` - wrap operations in a transaction
- `with_for_update=True` - use IMMEDIATE mode (SQLite) or row locks (Postgres)
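
the shape of `session_context` can be sketched without SQLAlchemy (`FakeSession` is a stand-in for an `AsyncSession`):

```python
import asyncio
import contextlib

class FakeSession:
    """stand-in for an AsyncSession; only tracks transaction state"""

    def __init__(self) -> None:
        self.in_transaction = False

    @contextlib.asynccontextmanager
    async def begin(self):
        self.in_transaction = True
        try:
            yield
        finally:
            self.in_transaction = False

@contextlib.asynccontextmanager
async def session_context(begin_transaction: bool = False):
    session = FakeSession()
    if begin_transaction:
        async with session.begin():
            yield session
    else:
        yield session

async def main() -> bool:
    async with session_context(begin_transaction=True) as s:
        return s.in_transaction

assert asyncio.run(main()) is True
```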

## upsert patterns

defined per model in `BaseORMConfiguration`:
```python
deployment_unique_upsert_columns = [Deployment.flow_id, Deployment.name]
task_run_unique_upsert_columns = [TaskRun.flow_run_id, TaskRun.task_key, TaskRun.dynamic_key]
```
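
those unique columns feed an `INSERT ... ON CONFLICT DO UPDATE`; the same statement shape works in both dialects (toy table below):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE deployment (
        flow_id INTEGER,
        name TEXT,
        version INTEGER,
        UNIQUE (flow_id, name)
    )
""")
conn.execute("INSERT INTO deployment VALUES (1, 'etl', 1)")

# the second insert hits the unique constraint and updates instead
conn.execute("""
    INSERT INTO deployment VALUES (1, 'etl', 2)
    ON CONFLICT (flow_id, name) DO UPDATE SET version = excluded.version
""")
assert conn.execute("SELECT version FROM deployment").fetchone()[0] == 2
assert conn.execute("SELECT COUNT(*) FROM deployment").fetchone()[0] == 1
```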

## key takeaways for our implementation

1. **JSON strings**: SQLite returns JSON as strings; PostgreSQL returns native objects
2. **no `generate_series`**: SQLite needs recursive CTEs for time ranges
3. **no LATERAL joins**: SQLite needs window functions/CTEs instead
4. **UPDATE syntax**: SQLite lacked `UPDATE ... FROM` until 3.33.0, so correlated subqueries are the portable form
5. **transaction modes**: SQLite needs explicit `BEGIN IMMEDIATE` for write locks
6. **batch alterations**: SQLite can't ALTER columns in place; migrations need table recreation