prefect server in zig
query patterns#
dialect differences#
JSON handling#
| operation | PostgreSQL | SQLite |
|---|---|---|
| build object | jsonb_build_object() | json_object() |
| aggregate array | jsonb_agg() | json_group_array() |
| returns strings | no (uses_json_strings = False) | yes (uses_json_strings = True) |
| type casting | direct JSON operations | requires sa.func.json() |
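the "returns strings" row is the one most likely to surprise application code. a minimal sketch with Python's standard-library sqlite3 module, assuming the bundled SQLite ships the JSON functions (the default in recent builds); the `tags` table and the sample values are made up for illustration:

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tags (name TEXT)")  # hypothetical table, illustration only
conn.executemany("INSERT INTO tags (name) VALUES (?)", [("a",), ("b",)])

# SQLite hands JSON back as text, so both results arrive as str
raw_object = conn.execute(
    "SELECT json_object('flow', 'etl', 'retries', 3)"
).fetchone()[0]
raw_array = conn.execute("SELECT json_group_array(name) FROM tags").fetchone()[0]

# the caller has to decode explicitly before using the values
decoded_object = json.loads(raw_object)
decoded_array = json.loads(raw_array)
```

on PostgreSQL the driver would typically hand back already-decoded values for the jsonb equivalents, which is what the uses_json_strings flag distinguishes.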
time series generation#
PostgreSQL - native function:
SELECT generate_series(start_time, end_time, interval '1 hour')
SQLite - recursive CTE (no native support):
WITH RECURSIVE timestamps AS (
SELECT start_time AS ts
UNION ALL
SELECT datetime(ts, '+1 hour') FROM timestamps WHERE ts < end_time
)
SELECT ts FROM timestamps
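a runnable sketch of the recursive CTE using Python's sqlite3 module. the function name and the bound parameter names are assumptions for the example. note that the guard here checks the next timestamp rather than the current one, so the series never steps past end_time when the range is not a whole number of hours:

```python
import sqlite3


def hourly_series(start_time: str, end_time: str) -> list:
    """Return hourly timestamps from start_time up to and including end_time."""
    conn = sqlite3.connect(":memory:")
    try:
        rows = conn.execute(
            """
            WITH RECURSIVE timestamps(ts) AS (
                SELECT datetime(:start_time)
                UNION ALL
                SELECT datetime(ts, '+1 hour') FROM timestamps
                WHERE datetime(ts, '+1 hour') <= datetime(:end_time)
            )
            SELECT ts FROM timestamps
            """,
            {"start_time": start_time, "end_time": end_time},
        ).fetchall()
    finally:
        conn.close()
    return [row[0] for row in rows]


series = hourly_series("2024-01-01 00:00:00", "2024-01-01 03:00:00")
```

unlike generate_series, this sketch always emits the first row, even when start_time is later than end_time.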
UPDATE with subquery#
PostgreSQL - UPDATE ... FROM syntax:
UPDATE flow_run
SET state_id = flow_run_state.id
FROM flow_run_state
WHERE flow_run_state.flow_run_id = flow_run.id
AND flow_run_state.timestamp = (SELECT MAX(timestamp) ...)
SQLite - correlated subquery:
UPDATE flow_run
SET state_id = (
SELECT id FROM flow_run_state
WHERE flow_run_id = flow_run.id
ORDER BY timestamp DESC LIMIT 1
)
WHERE ...
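the correlated-subquery form can be exercised end to end with sqlite3. the schema below is a cut-down assumption with only the columns the query touches, and the outer WHERE clause (elided above) is left out, so every flow run is updated:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE flow_run (id INTEGER PRIMARY KEY, state_id INTEGER);
    CREATE TABLE flow_run_state (
        id INTEGER PRIMARY KEY,
        flow_run_id INTEGER,
        timestamp TEXT
    );
    INSERT INTO flow_run (id) VALUES (1), (2);
    INSERT INTO flow_run_state (id, flow_run_id, timestamp) VALUES
        (10, 1, '2024-01-01 00:00:00'),
        (11, 1, '2024-01-01 01:00:00'),
        (20, 2, '2024-01-01 00:30:00');
    """
)

# point each flow run at its most recent state
conn.execute(
    """
    UPDATE flow_run
    SET state_id = (
        SELECT id FROM flow_run_state
        WHERE flow_run_id = flow_run.id
        ORDER BY timestamp DESC LIMIT 1
    )
    """
)

latest_state = dict(conn.execute("SELECT id, state_id FROM flow_run").fetchall())
```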
worker queue scheduling query#
complex queries use Jinja2 templates in the sql/ directory.
PostgreSQL (postgres/get-runs-from-worker-queues.sql.jinja):
- CROSS JOIN LATERAL for efficient nested limit
- FOR UPDATE SKIP LOCKED for pessimistic row locking
- LEAST() function for min comparison
- JSONB operators (->>)
SQLite (sqlite/get-runs-from-worker-queues.sql.jinja):
- nested CTE with ROW_NUMBER() window functions (no LATERAL)
- json_extract() for JSON access
- MAX(0, ...) instead of GREATEST()
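the ROW_NUMBER() substitute for a LATERAL "top N per queue" join can be sketched as follows. this is not the template's actual query: the `runs` table, its columns, and the function name are hypothetical, and window functions need SQLite 3.25 or newer:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE runs (run_id TEXT, queue TEXT, scheduled_at TEXT);
    INSERT INTO runs VALUES
        ('r1', 'high', '2024-01-01 00:00:00'),
        ('r2', 'high', '2024-01-01 00:05:00'),
        ('r3', 'high', '2024-01-01 00:10:00'),
        ('r4', 'low',  '2024-01-01 00:01:00');
    """
)


def earliest_runs_per_queue(conn, per_queue_limit):
    """Pick the earliest N runs in each queue without a LATERAL join."""
    query = """
        WITH ranked AS (
            SELECT run_id, queue,
                   ROW_NUMBER() OVER (
                       PARTITION BY queue ORDER BY scheduled_at
                   ) AS rn
            FROM runs
        )
        SELECT queue, run_id FROM ranked
        WHERE rn <= ?
        ORDER BY queue, rn
    """
    return conn.execute(query, (per_queue_limit,)).fetchall()


picked = earliest_runs_per_queue(conn, 2)
```

the trade-off is that every row in the partition gets ranked before filtering, whereas LATERAL can stop after N rows per queue.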
transaction handling#
PostgreSQL#
simple context manager:
async with session.begin():
# operations
with_for_update=True is ignored when the transaction is opened: Postgres takes row locks per statement (e.g. FOR UPDATE SKIP LOCKED) rather than through a transaction mode.
SQLite#
uses context variable for transaction mode:
# IMMEDIATE mode when with_for_update=True (acquires write lock immediately)
# DEFERRED mode otherwise (delays locking until needed)
token = SQLITE_BEGIN_MODE.set("IMMEDIATE" if with_for_update else "DEFERRED")
try:
async with session.begin():
yield
finally:
SQLITE_BEGIN_MODE.reset(token)
workaround for SQLite's implicit transaction handling.
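a synchronous, standard-library sketch of the same set/reset pattern. how the real code consumes the context variable is not shown above; here it is simply read when issuing BEGIN on a sqlite3 connection opened with isolation_level=None, which is an assumption made to keep the example self-contained:

```python
import contextvars
import sqlite3
from contextlib import contextmanager

# re-declared locally for the sketch; default of None is an assumption
SQLITE_BEGIN_MODE = contextvars.ContextVar("SQLITE_BEGIN_MODE", default=None)


@contextmanager
def begin_transaction(conn, with_for_update=False):
    token = SQLITE_BEGIN_MODE.set("IMMEDIATE" if with_for_update else "DEFERRED")
    try:
        # IMMEDIATE takes the write lock now; DEFERRED waits for the first write
        conn.execute(f"BEGIN {SQLITE_BEGIN_MODE.get()}")
        try:
            yield conn
        except BaseException:
            conn.execute("ROLLBACK")
            raise
        else:
            conn.execute("COMMIT")
    finally:
        # always restore the previous mode, even if the body raised
        SQLITE_BEGIN_MODE.reset(token)


# isolation_level=None stops the sqlite3 module from issuing its own BEGIN
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE flow_run (id INTEGER PRIMARY KEY)")

with begin_transaction(conn, with_for_update=True):
    mode_inside = SQLITE_BEGIN_MODE.get()
    in_transaction_inside = conn.in_transaction
    conn.execute("INSERT INTO flow_run (id) VALUES (1)")

mode_after = SQLITE_BEGIN_MODE.get()
```

resetting via the token rather than setting a fixed value means nested or concurrent callers each see their own mode restored.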
connection pooling#
PostgreSQL settings#
pool_pre_ping=True # verify connections on checkout
pool_use_lifo=True # better availability after traffic spikes
pool_size=N # from settings
max_overflow=N # from settings
connect args:
- command_timeout - statement execution timeout
- timeout - connection timeout
- statement_cache_size - prepared statement cache
- server_settings - application_name, search_path
- ssl - optional TLS configuration
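one way to keep these settings together is a small helper that assembles the keyword arguments. the helper name, its parameters, and the sample values are hypothetical; only the dictionary keys come from the lists above:

```python
from typing import Any, Optional


def postgres_engine_kwargs(
    pool_size: int,
    max_overflow: int,
    command_timeout: float,
    connect_timeout: float,
    application_name: str,
    search_path: Optional[str] = None,
    statement_cache_size: Optional[int] = None,
    ssl: Any = None,
) -> dict:
    """Collect the pool and connect settings listed above into one dict."""
    server_settings = {"application_name": application_name}
    if search_path is not None:
        server_settings["search_path"] = search_path

    connect_args: dict = {
        "command_timeout": command_timeout,  # statement execution timeout
        "timeout": connect_timeout,  # connection timeout
        "server_settings": server_settings,
    }
    # optional entries are only added when the caller supplies them
    if statement_cache_size is not None:
        connect_args["statement_cache_size"] = statement_cache_size
    if ssl is not None:
        connect_args["ssl"] = ssl

    return {
        "pool_pre_ping": True,  # verify connections on checkout
        "pool_use_lifo": True,  # reuse the most recently returned connection
        "pool_size": pool_size,
        "max_overflow": max_overflow,
        "connect_args": connect_args,
    }


kwargs = postgres_engine_kwargs(
    5, 10, 30.0, 10.0, "example-server", search_path="public"
)
```

the result is meant to be passed as keyword arguments to SQLAlchemy's create_async_engine with the asyncpg driver; that call is left out so the sketch runs without a database.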
SQLite settings#
for :memory: databases:
poolclass=AsyncAdaptedQueuePool
pool_size=1 # single connection required for in-memory
max_overflow=0 # no overflow
pool_recycle=-1 # don't recycle (would lose data)
PRAGMA configuration (on connect):
PRAGMA journal_mode = WAL; -- concurrent readers during writes
PRAGMA foreign_keys = ON; -- enable FK constraints
PRAGMA synchronous = NORMAL; -- balance safety/performance
PRAGMA cache_size = 20000; -- aggressive caching
PRAGMA busy_timeout = 60000; -- 60s lock wait (5s in tests)
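a sketch of applying these PRAGMAs on connect and reading them back, using the standard-library sqlite3 module. a temporary file is used because WAL does not apply to :memory: databases; the helper names are assumptions for the example:

```python
import os
import sqlite3
import tempfile

PRAGMAS = [
    ("journal_mode", "WAL"),
    ("foreign_keys", "ON"),
    ("synchronous", "NORMAL"),
    ("cache_size", "20000"),
    ("busy_timeout", "60000"),
]


def connect_with_pragmas(path):
    """Open a connection and apply the per-connection settings."""
    conn = sqlite3.connect(path)
    for name, value in PRAGMAS:
        conn.execute(f"PRAGMA {name} = {value}")
    return conn


def read_pragmas(conn):
    """Query each PRAGMA so the effective values can be inspected."""
    return {
        name: conn.execute(f"PRAGMA {name}").fetchone()[0] for name, _ in PRAGMAS
    }


with tempfile.TemporaryDirectory() as tmp:
    conn = connect_with_pragmas(os.path.join(tmp, "example.db"))
    applied = read_pragmas(conn)
    conn.close()
```

most of these settings are per connection (journal_mode = WAL is the exception, it persists in the database file), which is why they are applied in a connect hook rather than once at setup.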
engine disposal#
scheduled on event loop shutdown via add_event_loop_shutdown_callback() to prevent connection leaks.
session management#
async with db.session_context(begin_transaction=True) as session:
# operations run in transaction
options:
- begin_transaction=True - wrap in transaction
- with_for_update=True - use IMMEDIATE mode (SQLite) or row locks (Postgres)
upsert patterns#
defined per model in BaseORMConfiguration:
deployment_unique_upsert_columns = [Deployment.flow_id, Deployment.name]
task_run_unique_upsert_columns = [TaskRun.flow_run_id, TaskRun.task_key, TaskRun.dynamic_key]
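the unique column lists are what an upsert would name as its conflict target. the source only shows the column lists, so the statement below is an illustration rather than the actual implementation; the simplified `deployment` table and the helper are hypothetical, and ON CONFLICT needs SQLite 3.24 or newer:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE deployment (
        flow_id TEXT NOT NULL,
        name TEXT NOT NULL,
        version TEXT,
        UNIQUE (flow_id, name)
    )
    """
)


def upsert_deployment(conn, flow_id, name, version):
    """Insert a deployment, or update it when (flow_id, name) already exists."""
    conn.execute(
        """
        INSERT INTO deployment (flow_id, name, version)
        VALUES (?, ?, ?)
        ON CONFLICT (flow_id, name) DO UPDATE SET version = excluded.version
        """,
        (flow_id, name, version),
    )


upsert_deployment(conn, "flow-1", "nightly", "v1")
upsert_deployment(conn, "flow-1", "nightly", "v2")  # same key, so this updates
rows = conn.execute("SELECT flow_id, name, version FROM deployment").fetchall()
```

the same ON CONFLICT ... DO UPDATE form, including the excluded row alias, is accepted by PostgreSQL, so this is one place where the two dialects line up.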
key takeaways for our implementation#
- JSON strings: SQLite returns JSON as strings, PostgreSQL returns native objects
- no generate_series: SQLite needs recursive CTEs for time ranges
- no LATERAL joins: SQLite needs window functions/CTEs instead
- UPDATE syntax: SQLite has no UPDATE ... FROM before version 3.33.0, so use correlated subqueries
- transaction modes: SQLite needs explicit IMMEDIATE for write locks
- batch alterations: SQLite can't ALTER columns in place, needs table recreation
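the last point, table recreation, is roughly the copy-and-rename that migration tools such as Alembic's batch operations automate. a hand-written sketch, assuming a toy `flow_run` table and a made-up requirement to make `name` NOT NULL:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE flow_run (id INTEGER PRIMARY KEY, name TEXT);
    INSERT INTO flow_run (id, name) VALUES (1, 'first');
    """
)


def make_name_not_null(conn):
    """Change a column constraint by rebuilding the table and copying rows."""
    conn.executescript(
        """
        BEGIN;
        CREATE TABLE flow_run_new (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL DEFAULT ''
        );
        INSERT INTO flow_run_new (id, name)
            SELECT id, COALESCE(name, '') FROM flow_run;
        DROP TABLE flow_run;
        ALTER TABLE flow_run_new RENAME TO flow_run;
        COMMIT;
        """
    )


make_name_not_null(conn)

# PRAGMA table_info rows are (cid, name, type, notnull, dflt_value, pk)
not_null_flags = {
    row[1]: row[3] for row in conn.execute("PRAGMA table_info(flow_run)")
}
surviving_rows = conn.execute("SELECT id, name FROM flow_run").fetchall()
```

a real migration would also need to recreate any indexes, triggers, and foreign keys that referenced the old table.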