prefect server in zig

query patterns#

dialect differences#

JSON handling#

operation         PostgreSQL                        SQLite
build object      jsonb_build_object()              json_object()
aggregate array   jsonb_agg()                       json_group_array()
returns strings   no (uses_json_strings = False)    yes (uses_json_strings = True)
type casting      direct JSON operations            requires sa.func.json()
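The string-vs-native difference is easy to see with Python's stdlib sqlite3 module. This is a sketch (the real code path goes through SQLAlchemy, and the table/column names here are made up); it assumes a SQLite build with the JSON1 functions, which Python has bundled for years:

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")

# SQLite's json_object() returns a TEXT value, not a native object --
# the reason this dialect sets uses_json_strings = True.
row = conn.execute("SELECT json_object('state', 'COMPLETED')").fetchone()
print(type(row[0]))        # str, so callers must json.loads() it
print(json.loads(row[0]))  # {'state': 'COMPLETED'}

# json_group_array() likewise aggregates into a JSON string.
conn.execute("CREATE TABLE flow_run (name TEXT)")
conn.executemany("INSERT INTO flow_run VALUES (?)", [("a",), ("b",)])
agg = conn.execute("SELECT json_group_array(name) FROM flow_run").fetchone()[0]
print(sorted(json.loads(agg)))
```

PostgreSQL's jsonb functions, by contrast, come back from the driver already decoded, so the server code needs a dialect flag to know whether a decode step is required.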

time series generation#

PostgreSQL - native function:

SELECT generate_series(start_time, end_time, interval '1 hour')

SQLite - recursive CTE (no native support):

WITH RECURSIVE timestamps AS (
    SELECT start_time AS ts
    UNION ALL
    SELECT datetime(ts, '+1 hour') FROM timestamps WHERE ts < end_time
)
SELECT ts FROM timestamps
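The recursive CTE above runs as-is on stdlib sqlite3; a quick sketch with concrete placeholder timestamps (both endpoints end up inclusive, matching generate_series):

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Recursive CTE standing in for generate_series(): one row per hour.
rows = conn.execute(
    """
    WITH RECURSIVE timestamps AS (
        SELECT datetime('2024-01-01 00:00:00') AS ts
        UNION ALL
        SELECT datetime(ts, '+1 hour') FROM timestamps
        WHERE ts < datetime('2024-01-01 03:00:00')
    )
    SELECT ts FROM timestamps
    """
).fetchall()
print(rows)   # four rows: 00:00 through 03:00 inclusive
```

Note the termination condition compares the *current* row before stepping, so the series stops exactly at end_time; ISO-8601 datetime strings compare correctly as text.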

UPDATE with subquery#

PostgreSQL - UPDATE ... FROM syntax:

UPDATE flow_run
SET state_id = flow_run_state.id
FROM flow_run_state
WHERE flow_run_state.flow_run_id = flow_run.id
  AND flow_run_state.timestamp = (SELECT MAX(timestamp) ...)

SQLite - correlated subquery (UPDATE ... FROM only arrived in SQLite 3.33, so this is the portable form):

UPDATE flow_run
SET state_id = (
    SELECT id FROM flow_run_state
    WHERE flow_run_id = flow_run.id
    ORDER BY timestamp DESC LIMIT 1
)
WHERE ...
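A runnable sketch of the correlated-subquery form, with simplified schemas and made-up data (the real tables carry many more columns):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE flow_run (id INTEGER PRIMARY KEY, state_id INTEGER);
    CREATE TABLE flow_run_state (
        id INTEGER PRIMARY KEY, flow_run_id INTEGER, timestamp TEXT
    );
    INSERT INTO flow_run (id, state_id) VALUES (1, NULL);
    INSERT INTO flow_run_state VALUES
        (10, 1, '2024-01-01 00:00:00'),
        (11, 1, '2024-01-01 01:00:00');
    """
)

# Point each flow run at its most recent state: the subquery re-runs
# per outer row, correlated on flow_run.id.
conn.execute(
    """
    UPDATE flow_run
    SET state_id = (
        SELECT id FROM flow_run_state
        WHERE flow_run_id = flow_run.id
        ORDER BY timestamp DESC LIMIT 1
    )
    """
)
latest = conn.execute("SELECT state_id FROM flow_run").fetchone()[0]
print(latest)   # 11, the newer state's id
```

ORDER BY ... LIMIT 1 plays the role of the MAX(timestamp) filter in the Postgres version; a row with no states would get state_id = NULL, so a WHERE guard may be wanted in practice.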

worker queue scheduling query#

complex queries live as Jinja2 templates in the sql/ directory.

PostgreSQL (postgres/get-runs-from-worker-queues.sql.jinja):

  • CROSS JOIN LATERAL for efficient nested limit
  • FOR UPDATE SKIP LOCKED for pessimistic row locking
  • LEAST() function for min comparison
  • JSONB operators (->>)

SQLite (sqlite/get-runs-from-worker-queues.sql.jinja):

  • nested CTE with ROW_NUMBER() window functions (no LATERAL)
  • json_extract() for JSON access
  • MAX(0, ...) instead of GREATEST()
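The ROW_NUMBER() replacement for LATERAL can be sketched on stdlib sqlite3 (window functions need SQLite >= 3.25; the run/queue/scheduled names here are invented for illustration, not the real schema):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE run (queue TEXT, scheduled TEXT);
    INSERT INTO run VALUES
        ('q1', '09:00'), ('q1', '09:05'), ('q1', '09:10'),
        ('q2', '09:01'), ('q2', '09:02');
    """
)

# ROW_NUMBER() partitioned per queue gives a per-queue rank; filtering
# rn <= 2 means "next 2 runs per queue" -- the job LATERAL does in Postgres.
rows = conn.execute(
    """
    WITH ranked AS (
        SELECT queue, scheduled,
               ROW_NUMBER() OVER (
                   PARTITION BY queue ORDER BY scheduled
               ) AS rn
        FROM run
    )
    SELECT queue, scheduled FROM ranked
    WHERE rn <= 2
    ORDER BY queue, scheduled
    """
).fetchall()
print(rows)   # two earliest runs from each queue
```

The trade-off versus LATERAL is that the window scan ranks every row before filtering, whereas a lateral subquery can stop at the limit per queue; for worker-queue sizes this is usually acceptable.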

transaction handling#

PostgreSQL#

simple context manager:

async with session.begin():
    # operations

with_for_update=True is ignored (Postgres handles row locking differently).

SQLite#

uses context variable for transaction mode:

# IMMEDIATE mode when with_for_update=True (acquires write lock immediately)
# DEFERRED mode otherwise (delays locking until needed)
token = SQLITE_BEGIN_MODE.set("IMMEDIATE" if with_for_update else "DEFERRED")
try:
    async with session.begin():
        yield
finally:
    SQLITE_BEGIN_MODE.reset(token)

this works around SQLite's implicit transaction handling: the driver defers issuing BEGIN, so without IMMEDIATE mode no write lock is taken until the first write statement actually executes.
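The underlying behavior can be demonstrated with two stdlib sqlite3 connections on a throwaway temp file (a sketch of the raw SQL, not of the SQLITE_BEGIN_MODE machinery itself; isolation_level=None gives manual control over BEGIN):

```python
import os
import sqlite3
import tempfile

# File-backed db so two connections can contend for the write lock.
path = os.path.join(tempfile.mkdtemp(), "demo.db")
writer = sqlite3.connect(path, isolation_level=None)            # manual BEGIN
rival = sqlite3.connect(path, isolation_level=None, timeout=0)  # fail fast
writer.execute("CREATE TABLE t (x)")

# BEGIN IMMEDIATE takes the write lock at BEGIN time, so a second writer
# is refused up front instead of failing later, mid-transaction.
writer.execute("BEGIN IMMEDIATE")
try:
    rival.execute("BEGIN IMMEDIATE")
    contended = False
except sqlite3.OperationalError:   # "database is locked"
    contended = True
writer.execute("COMMIT")
print(contended)   # True: the lock was held eagerly
```

With plain DEFERRED transactions both BEGINs would succeed and the conflict would only surface at the first conflicting write, which is much harder to retry cleanly.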

connection pooling#

PostgreSQL settings#

pool_pre_ping=True     # verify connections on checkout
pool_use_lifo=True     # reuse most-recent connections; idle ones can age out after spikes
pool_size=N            # from settings
max_overflow=N         # from settings

connect args:

  • command_timeout - statement execution timeout
  • timeout - connection timeout
  • statement_cache_size - prepared statement cache
  • server_settings - application_name, search_path
  • ssl - optional TLS configuration
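A hedged sketch of how these settings could be wired together with SQLAlchemy's asyncpg dialect; the URL and numeric values are placeholders, not Prefect's actual defaults, and the connect_args keys map straight onto asyncpg.connect() parameters:

```python
# Sketch only -- values are illustrative placeholders.
from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/prefect",
    pool_pre_ping=True,      # verify connections on checkout
    pool_use_lifo=True,      # prefer hot connections
    pool_size=5,
    max_overflow=10,
    connect_args={
        "command_timeout": 60,        # per-statement timeout (seconds)
        "timeout": 10,                # connection-establishment timeout
        "statement_cache_size": 100,  # prepared-statement cache
        "server_settings": {"application_name": "my-server"},
    },
)
```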

SQLite settings#

for :memory: databases:

poolclass=AsyncAdaptedQueuePool
pool_size=1            # single connection required for in-memory
max_overflow=0         # no overflow
pool_recycle=-1        # don't recycle (would lose data)
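The single-connection requirement follows from how plain ":memory:" works: every new connection opens its own private database, as a quick stdlib sqlite3 check shows.

```python
import sqlite3

# Each plain ":memory:" connection gets its OWN database -- a second
# pooled connection would see an empty schema, hence pool_size=1.
a = sqlite3.connect(":memory:")
b = sqlite3.connect(":memory:")
a.execute("CREATE TABLE t (x)")
tables_b = b.execute(
    "SELECT name FROM sqlite_master WHERE type = 'table'"
).fetchall()
print(tables_b)   # empty: b cannot see a's table
```

(SQLite does offer a shared in-memory database via the "file::memory:?cache=shared" URI form, but pinning everything to one pooled connection is the simpler guarantee.)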

PRAGMA configuration (on connect):

PRAGMA journal_mode = WAL;           -- concurrent readers during writes
PRAGMA foreign_keys = ON;            -- enable FK constraints
PRAGMA synchronous = NORMAL;         -- balance safety/performance
PRAGMA cache_size = 20000;           -- aggressive caching
PRAGMA busy_timeout = 60000;         -- 60s lock wait (5s in tests)
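These PRAGMAs can be applied on each new connection, roughly what an engine "connect" event hook would do; a sketch against a throwaway temp-file database (WAL mode needs a file-backed db, and PRAGMA settings are per-connection):

```python
import os
import sqlite3
import tempfile

path = os.path.join(tempfile.mkdtemp(), "demo.db")
conn = sqlite3.connect(path)

# Run the whole PRAGMA batch once per connection.
conn.executescript(
    """
    PRAGMA journal_mode = WAL;
    PRAGMA foreign_keys = ON;
    PRAGMA synchronous = NORMAL;
    PRAGMA cache_size = 20000;
    PRAGMA busy_timeout = 60000;
    """
)

# PRAGMAs can be read back to confirm they took effect.
mode = conn.execute("PRAGMA journal_mode").fetchone()[0]
fk = conn.execute("PRAGMA foreign_keys").fetchone()[0]
print(mode, fk)   # wal 1
```

journal_mode = WAL persists in the database file, but foreign_keys and busy_timeout are per-connection state, which is why the batch must run on every connect, not just once at startup.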

engine disposal#

scheduled on event loop shutdown via add_event_loop_shutdown_callback() to prevent connection leaks.

session management#

async with db.session_context(begin_transaction=True) as session:
    # operations run in transaction

options:

  • begin_transaction=True - wrap in transaction
  • with_for_update=True - use IMMEDIATE mode (SQLite) or row locks (Postgres)

upsert patterns#

defined per model in BaseORMConfiguration:

deployment_unique_upsert_columns = [Deployment.flow_id, Deployment.name]
task_run_unique_upsert_columns = [TaskRun.flow_run_id, TaskRun.task_key, TaskRun.dynamic_key]
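In raw SQL these column lists become the conflict target of an INSERT ... ON CONFLICT DO UPDATE (SQLite >= 3.24; SQLAlchemy exposes the same thing as on_conflict_do_update). A sketch with a simplified, made-up deployment schema:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE deployment (
        flow_id INTEGER, name TEXT, tags TEXT,
        UNIQUE (flow_id, name)   -- the upsert columns
    )
    """
)

# Conflict on (flow_id, name) updates the existing row; 'excluded'
# refers to the values the failed INSERT tried to write.
stmt = """
    INSERT INTO deployment (flow_id, name, tags) VALUES (?, ?, ?)
    ON CONFLICT (flow_id, name) DO UPDATE SET tags = excluded.tags
"""
conn.execute(stmt, (1, "etl", "old"))
conn.execute(stmt, (1, "etl", "new"))
rows = conn.execute("SELECT flow_id, name, tags FROM deployment").fetchall()
print(rows)   # a single row, with the updated tags
```

PostgreSQL's ON CONFLICT syntax is close enough that one statement shape can serve both dialects, which is presumably why the upsert columns are configuration rather than per-dialect SQL.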

key takeaways for our implementation#

  1. JSON strings: SQLite returns JSON as strings, PostgreSQL returns native objects
  2. no generate_series: SQLite needs recursive CTEs for time ranges
  3. no LATERAL joins: SQLite needs window functions/CTEs instead
  4. UPDATE syntax: UPDATE ... FROM needs SQLite >= 3.33; correlated subqueries are the portable fallback
  5. transaction modes: SQLite needs explicit IMMEDIATE for write locks
  6. batch alterations: SQLite can't ALTER columns in place, needs table recreation
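Takeaway 6 is the rename-recreate-copy-drop dance that Alembic's batch_alter_table automates; a minimal sketch of doing it by hand (tightening a column to NOT NULL on an invented table):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE flow_run (id INTEGER PRIMARY KEY, name TEXT);
    INSERT INTO flow_run VALUES (1, 'demo');

    -- SQLite cannot ALTER a column's type or constraints in place:
    -- rename the old table, recreate with the new definition,
    -- copy the rows across, then drop the old table.
    ALTER TABLE flow_run RENAME TO flow_run_old;
    CREATE TABLE flow_run (id INTEGER PRIMARY KEY, name TEXT NOT NULL);
    INSERT INTO flow_run SELECT id, name FROM flow_run_old;
    DROP TABLE flow_run_old;
    """
)
rows = conn.execute("SELECT id, name FROM flow_run").fetchall()
print(rows)   # data survives the recreation
```

In a real migration the recreate step must also carry over indexes, triggers, and foreign keys referencing the table, which is exactly the bookkeeping batch_alter_table exists to handle.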