prefect server in zig

add python prefect server database reference docs

documents how the Python prefect server implements its database layer:
- architecture: three-tier abstraction (config, queries, ORM)
- orm-models: SQLAlchemy models, custom types, relationships
- migrations: alembic setup, dialect-specific patterns
- query-patterns: JSON handling, time series, transactions
- zig-compat-notes: comparison with our implementation

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+643
+4
docs/README.md
- [scratch/](./scratch/) - patterns and ideas being developed

## reference

- [python-reference/](./python-reference/) - how the Python prefect server implements its database layer (SQLAlchemy, migrations, dialect differences)

## see also

- [roadmap](../ROADMAP.md) - implementation status vs python prefect
+27
docs/python-reference/README.md
# python prefect server database reference

reference documentation for understanding how the Python prefect server implements its database layer. useful for ensuring our Zig implementation stores data compatibly.

## contents

- [architecture.md](./architecture.md) - overall structure, abstraction layers, backend selection
- [orm-models.md](./orm-models.md) - SQLAlchemy models, relationships, custom types
- [migrations.md](./migrations.md) - alembic setup, dialect-specific migrations
- [query-patterns.md](./query-patterns.md) - dialect differences, transaction handling, connection pooling
- [zig-compat-notes.md](./zig-compat-notes.md) - comparison with our zig implementation, gaps to address

## key insight

the Python implementation uses a three-tier abstraction:

1. **DatabaseConfiguration** - connection/engine management (AsyncPostgresConfiguration, AioSqliteConfiguration)
2. **QueryComponents** - dialect-specific SQL generation (json handling, time series, updates)
3. **ORMConfiguration** - migration paths, upsert columns

this allows the same codebase to work with both SQLite and PostgreSQL by swapping implementations at runtime based on the connection URL.

## source location

```
~/github.com/prefecthq/prefect/src/prefect/server/database/
```
+109
docs/python-reference/architecture.md
# database architecture

## directory structure

```
database/
├── __init__.py          # public exports (PrefectDBInterface, etc.)
├── configurations.py    # connection/engine classes
├── interface.py         # main PrefectDBInterface singleton
├── dependencies.py      # dependency injection decorators
├── orm_models.py        # SQLAlchemy ORM models (~1600 lines)
├── query_components.py  # dialect-specific query builders (~1000 lines)
├── alembic.ini          # migration config
├── alembic_commands.py  # migration runners
├── _migrations/
│   ├── env.py           # alembic environment
│   ├── versions/
│   │   ├── postgresql/  # 113 PostgreSQL migrations
│   │   └── sqlite/      # 109 SQLite migrations
│   └── MIGRATION-NOTES.md  # migration history tracking
└── sql/                 # dialect-specific SQL templates
    ├── postgres/
    │   └── get-runs-from-worker-queues.sql.jinja
    └── sqlite/
        └── get-runs-from-worker-queues.sql.jinja
```

## three-tier abstraction

### tier 1: database configuration

abstract base `BaseDatabaseConfiguration` with implementations:

- `AsyncPostgresConfiguration` - PostgreSQL async driver
- `AioSqliteConfiguration` - SQLite async driver (aiosqlite)

key methods:

- `engine()` - returns async SQLAlchemy engine (cached per event loop)
- `session()` - creates AsyncSession
- `begin_transaction()` - database-specific transaction handling
- `create_db()` / `drop_db()` - schema lifecycle

engine caching key: `(event_loop, connection_url, echo, timeout)`

### tier 2: query components

abstract base `BaseQueryComponents` with implementations:

- `AsyncPostgresQueryComponents`
- `AioSqliteQueryComponents`

key methods:

- `insert()` - dialect-specific INSERT statement
- `uses_json_strings` property - whether JSON returns as strings
- `cast_to_json()` / `build_json_object()` - JSON handling
- `make_timestamp_intervals()` - time range generation
- `set_state_id_on_inserted_flow_runs_statement()` - UPDATE syntax differences

### tier 3: ORM configuration

abstract base `BaseORMConfiguration` with implementations:

- `AsyncPostgresORMConfiguration` - points to `_migrations/versions/postgresql/`
- `AioSqliteORMConfiguration` - points to `_migrations/versions/sqlite/`

defines upsert columns for each entity type.

### main interface: PrefectDBInterface

singleton per unique configuration combination. aggregates all three components:

- database operations (create/drop, migrations)
- session management
- ORM model access (properties for all 39+ models)
- connectivity checks

## backend selection

entry point: `provide_database_interface()` in `dependencies.py`

detection logic:

1. reads the `PREFECT_API_DATABASE_CONNECTION_URL` setting
2. parses the dialect from the connection URL via `get_dialect()`
3. instantiates the appropriate classes based on the dialect name

```python
if dialect.name == "postgresql":
    database_config = AsyncPostgresConfiguration(...)
    query_components = AsyncPostgresQueryComponents()
    orm = AsyncPostgresORMConfiguration()
elif dialect.name == "sqlite":
    database_config = AioSqliteConfiguration(...)
    query_components = AioSqliteQueryComponents()
    orm = AioSqliteORMConfiguration()
```

## configuration sources

environment variables:

- `PREFECT_API_DATABASE_CONNECTION_URL` - connection string (required)
- `PREFECT_API_DATABASE_ECHO` - log SQL statements
- `PREFECT_API_DATABASE_TIMEOUT` - statement timeout
- `PREFECT_API_DATABASE_CONNECTION_TIMEOUT` - connection timeout
- `PREFECT_API_DATABASE_SQLALCHEMY_POOL_SIZE` - pool size
- `PREFECT_API_DATABASE_SQLALCHEMY_MAX_OVERFLOW` - overflow connections

## dependency injection

two decorators:

- `@inject_db` - provides db as kwarg
- `@db_injector` - provides db as first positional arg (better type hints)

manual: `provide_database_interface()`
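for intuition, the URL-based dialect detection can be sketched with the standard library. `detect_dialect` is an illustrative stand-in, not the real `get_dialect()`:

```python
from urllib.parse import urlparse

def detect_dialect(connection_url: str) -> str:
    # illustrative stand-in for get_dialect(): the scheme names the dialect,
    # with the async driver after a "+" (e.g. sqlite+aiosqlite)
    return urlparse(connection_url).scheme.split("+")[0]

print(detect_dialect("postgresql+asyncpg://localhost/prefect"))  # postgresql
print(detect_dialect("sqlite+aiosqlite:///prefect.db"))          # sqlite
```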
+94
docs/python-reference/migrations.md
# migrations

## alembic configuration

- **location**: `_migrations/`
- **config file**: `alembic.ini`
- **filename format**: `YYYY_MM_DD_HHMMSS_REV_SLUG`
- **template**: `script.py.mako` supports dialect-specific code via a `${dialect}` variable

## separate migration chains

key design decision: **separate migration files per database dialect**

```
_migrations/versions/
├── postgresql/  # 113 PostgreSQL migrations
└── sqlite/      # 109 SQLite migrations
```

benefits:

- different revision IDs but the same semantic versioning timestamp
- prevents cross-dialect conflicts
- enables dialect-specific optimizations

## migration notes tracking

`MIGRATION-NOTES.md` documents schema changes with parallel revision IDs:

```
# Add `deployment_version` table
SQLite: `bbca16f6f218`
Postgres: `06b7c293bc09`

# Add `labels` column to Flow, FlowRun, TaskRun, and Deployment
SQLite: `5952a5498b51`
Postgres: `68a44144428d`
```

this file creates natural merge conflicts when migrations diverge, keeping developers synchronized.
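the filename format can be sketched as follows; `migration_filename` is an illustrative helper (alembic fills these values in via `script.py.mako`, it does not expose this function):

```python
from datetime import datetime, timezone

def migration_filename(revision: str, slug: str) -> str:
    # illustrative helper mirroring the YYYY_MM_DD_HHMMSS_REV_SLUG pattern
    stamp = datetime.now(timezone.utc).strftime("%Y_%m_%d_%H%M%S")
    return f"{stamp}_{revision}_{slug}.py"

print(migration_filename("bbca16f6f218", "add_deployment_version_table"))
```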
## dialect-specific patterns

### PostgreSQL migrations

- uses native `postgresql.ENUM()` types
- direct DDL operations (no batch mode needed)
- supports `postgresql_using="gin"` for full-text search indexes
- supports `postgresql_include` and `postgresql_where` clauses

example:

```python
from sqlalchemy.dialects import postgresql

deployment_status = postgresql.ENUM("READY", "NOT_READY", name="deployment_status")
deployment_status.create(op.get_bind())
op.add_column("deployment", sa.Column("status", deployment_status, ...))
```

### SQLite migrations

- **batch mode required**: `render_as_batch=True` (SQLite requires table recreation for ALTER)
- **PRAGMA foreign_keys management**: disabled during migrations
- **transaction mode**: uses the `SQLITE_BEGIN_MODE` context variable set to "IMMEDIATE"
- **enum handling**: uses `sa.Enum()`, which SQLite converts to VARCHAR

example:

```python
with op.batch_alter_table("deployment", schema=None) as batch_op:
    batch_op.add_column(
        sa.Column("status", sa.Enum("READY", "NOT_READY", name="deployment_status"), ...)
    )
```

## auto-generation logic (env.py)

the `include_object()` function filters out:

- dialect-specific indexes that don't apply
- trigram/GIN indexes on SQLite
- functional indexes (asc/desc variants)
- enum column mismatches between reflection and the ORM

## transaction handling

- `transaction_per_migration=True` - each migration runs in its own transaction
- SQLite disables foreign key constraints during migration
- context variables manage the SQLite transaction mode

## thread safety

an `ALEMBIC_LOCK` prevents concurrent migration execution.

## async support

uses `run_async_from_worker_thread()` to run async migrations.
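what batch mode does under the hood can be reproduced with the stdlib `sqlite3` module; the table and column names here are illustrative, not from the real migrations:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE deployment (id INTEGER PRIMARY KEY, name TEXT)")
conn.execute("INSERT INTO deployment (name) VALUES ('demo')")

# SQLite cannot ALTER a column in place, so batch mode recreates the table:
# new table -> copy rows -> drop old -> rename, with FK checks off meanwhile
conn.execute("PRAGMA foreign_keys = OFF")
conn.execute("""CREATE TABLE _new_deployment (
    id INTEGER PRIMARY KEY,
    name TEXT,
    status TEXT DEFAULT 'NOT_READY')""")
conn.execute("INSERT INTO _new_deployment (id, name) SELECT id, name FROM deployment")
conn.execute("DROP TABLE deployment")
conn.execute("ALTER TABLE _new_deployment RENAME TO deployment")
conn.execute("PRAGMA foreign_keys = ON")

print(conn.execute("SELECT name, status FROM deployment").fetchone())  # ('demo', 'NOT_READY')
```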
+182
docs/python-reference/orm-models.md
# ORM models

## base class

all models inherit from `Base` (DeclarativeBase):

- **auto table names**: CamelCase → snake_case (e.g., `FlowRun` → `flow_run`)
- **universal columns**:
  - `id`: UUID primary key with server-side generation (`GenerateUUID()`)
  - `created`: timestamp with UTC timezone and server default
  - `updated`: indexed timestamp with `onupdate=sa.func.now()`
- **eager defaults**: `eager_defaults=True` fetches server defaults after INSERT without extra queries

## custom type decorators

### UUID

- **PostgreSQL**: native `postgresql.UUID()`
- **SQLite**: `CHAR(36)` string representation

### Timestamp

- **PostgreSQL**: `TIMESTAMP(timezone=True)`
- **SQLite**: `DATETIME()` naive, stored as UTC
- always returns UTC timezone on read

### JSON

- **PostgreSQL**: `postgresql.JSONB(none_as_null=True)` - binary, indexed, queryable
- **SQLite**: `sqlite.JSON(none_as_null=True)`
- sanitizes special floats (NaN, Infinity) unsupported by PostgreSQL

### Pydantic

- wrapper over JSON that auto-serializes/deserializes Pydantic models
- used for: `StateDetails`, `FlowRunPolicy`, `CreatedBy`, etc.

### GenerateUUID (server default)

- **PostgreSQL**: `GEN_RANDOM_UUID()`
- **SQLite**: complex `hex(randomblob())` construction mimicking UUID v4

## key models

### core execution

| model | key fields | notes |
|-------|------------|-------|
| Flow | name, tags, labels | unique name constraint |
| FlowRun | flow_id, state_type, state_name, parameters, empirical_policy | inherits from `Run` mixin |
| TaskRun | flow_run_id, task_key, dynamic_key, cache_key | inherits from `Run` mixin |
| FlowRunState | flow_run_id, type, name, timestamp, state_details | one-to-many with FlowRun |
| TaskRunState | task_run_id, type, name, timestamp, state_details | one-to-many with TaskRun |

### Run mixin (shared by FlowRun and TaskRun)

```
state_type: StateType enum
state_name: string
state_timestamp: datetime
start_time: datetime (nullable)
end_time: datetime (nullable)
expected_start_time: datetime (nullable)
total_run_time: float (seconds)
run_count: int
```

### deployments

| model | key fields | notes |
|-------|------------|-------|
| Deployment | flow_id, name, status, concurrency_limit | unique (flow_id, name) |
| DeploymentSchedule | deployment_id, schedule, active | cron/interval schedules |

### work management

| model | key fields | notes |
|-------|------------|-------|
| WorkPool | name, type, status | pool infrastructure config |
| WorkQueue | work_pool_id, name, priority, concurrency_limit | routes runs to workers |
| Worker | work_pool_id, name, last_heartbeat_time | heartbeat tracking |

### blocks

| model | key fields | notes |
|-------|------------|-------|
| BlockType | name, slug | schema definitions |
| BlockSchema | block_type_id, checksum, capabilities | versioned schemas |
| BlockDocument | block_type_id, block_schema_id, name, data | encrypted credential storage |

### artifacts

| model | key fields | notes |
|-------|------------|-------|
| Artifact | key, type, data, flow_run_id, task_run_id | flexible result storage |
| ArtifactCollection | key, latest_id | tracks latest artifact per key |

## relationship patterns

### one-to-many (standard)

```python
# Flow → FlowRuns
flow_runs: Mapped[list["FlowRun"]] = relationship(
    back_populates="flow", lazy="raise"
)
```

### lazy loading strategies

- `lazy="raise"` - prevents implicit queries (catches N+1 problems)
- `lazy="selectin"` - eager loads in a separate SELECT with an IN clause

### foreign keys

```python
# cascade delete
flow_id = mapped_column(sa.ForeignKey("flow.id", ondelete="cascade"))

# nullable with soft delete
parent_task_run_id = mapped_column(
    sa.ForeignKey("task_run.id", ondelete="SET NULL", use_alter=True)
)
```

`use_alter=True` defers FK constraint creation (handles circular dependencies)

## state tracking pattern

models maintain BOTH:

1. **state history** - a one-to-many relationship to the state table
2. **current state pointer** - an optional FK with `use_alter=True`

a unique constraint on `(run_id, timestamp DESC)` ensures one state per instant.
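the SQLite-side `GenerateUUID()` default described earlier can be approximated with the stdlib `sqlite3` module. this is a sketch in the same `hex(randomblob())` style, not the exact expression Prefect uses:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# build a UUID-v4-shaped string from random bytes, entirely in SQL
uuid_sql = """
SELECT lower(
    hex(randomblob(4)) || '-' || hex(randomblob(2)) || '-4' ||
    substr(hex(randomblob(2)), 2) || '-' ||
    substr('89ab', 1 + (abs(random()) % 4), 1) ||
    substr(hex(randomblob(2)), 2) || '-' || hex(randomblob(6)))
"""
value = conn.execute(uuid_sql).fetchone()[0]
print(value)  # e.g. a 36-char lowercase UUID v4 string
```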
## JSON field patterns

```python
# plain JSON with defaults
tags: Mapped[list[str]] = mapped_column(JSON, server_default="[]", default=list)
parameters: Mapped[dict] = mapped_column(JSON, server_default="{}", default=dict)

# Pydantic-backed JSON
empirical_policy: Mapped[FlowRunPolicy] = mapped_column(
    Pydantic(FlowRunPolicy),
    server_default="{}",
    default=FlowRunPolicy,
)
```

### FlowRunPolicy schema

the `empirical_policy` column stores retry/pause behavior as JSON:

| field | type | default | description |
|-------|------|---------|-------------|
| max_retries | int | 0 | **deprecated** - use `retries` |
| retry_delay_seconds | float | 0 | **deprecated** - use `retry_delay` |
| retries | int? | null | number of retries allowed |
| retry_delay | int? | null | delay between retries (seconds) |
| pause_keys | set[str]? | [] | tracks pauses observed |
| resuming | bool? | false | indicates resuming from a pause |
| retry_type | "in_process" \| "reschedule"? | null | retry execution mode |

this is the key schema for implementing the RetryFailedFlows orchestration rule.
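reading the policy with the deprecated fallbacks might look like this; `effective_retries` is a hypothetical helper for illustration, not Prefect code:

```python
import json

def effective_retries(policy_json: str) -> tuple[int, float]:
    # hypothetical helper: prefer the new fields, fall back to the
    # deprecated max_retries / retry_delay_seconds when they are absent
    policy = json.loads(policy_json)
    retries = policy.get("retries")
    if retries is None:
        retries = policy.get("max_retries", 0)
    delay = policy.get("retry_delay")
    if delay is None:
        delay = policy.get("retry_delay_seconds", 0)
    return retries, delay

print(effective_retries('{"retries": 3, "retry_delay": 10}'))             # (3, 10)
print(effective_retries('{"max_retries": 2, "retry_delay_seconds": 5}'))  # (2, 5)
```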
## PostgreSQL-specific indexes

```python
# GIN index for JSON operations
sa.Index("ix_events__related_resource_ids_gin", "related_resource_ids",
         postgresql_using="gin").ddl_if(dialect="postgresql")

# partial index (both dialects)
sa.Index("ix_flow_run__state_type_scheduled",
         cls.deployment_id, cls.auto_scheduled, cls.next_scheduled_start_time,
         postgresql_where=cls.state_type == StateType.SCHEDULED,
         sqlite_where=cls.state_type == StateType.SCHEDULED,
)
```

## default value pattern

both server-side and Python defaults, for safety:

```python
created: Mapped[DateTime] = mapped_column(
    server_default=sa.func.now(),  # database-side
    default=lambda: now("UTC"),    # ORM-side
)
```
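partial indexes work the same way in plain SQLite; a minimal check with the stdlib `sqlite3` module (schema simplified to the indexed columns):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE flow_run (
    id INTEGER PRIMARY KEY,
    state_type TEXT,
    next_scheduled_start_time TEXT)""")

# partial index: only SCHEDULED rows are indexed (the sqlite_where clause)
conn.execute("""CREATE INDEX ix_flow_run__state_type_scheduled
    ON flow_run (next_scheduled_start_time)
    WHERE state_type = 'SCHEDULED'""")

names = [r[0] for r in conn.execute(
    "SELECT name FROM sqlite_master WHERE type = 'index'")]
print(names)  # ['ix_flow_run__state_type_scheduled']
```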
+163
docs/python-reference/query-patterns.md
# query patterns

## dialect differences

### JSON handling

| operation | PostgreSQL | SQLite |
|-----------|------------|--------|
| build object | `jsonb_build_object()` | `json_object()` |
| aggregate array | `jsonb_agg()` | `json_group_array()` |
| returns strings | no (`uses_json_strings = False`) | yes (`uses_json_strings = True`) |
| type casting | direct JSON operations | requires `sa.func.json()` |

### time series generation

**PostgreSQL** - native function:

```sql
SELECT generate_series(start_time, end_time, interval '1 hour')
```

**SQLite** - recursive CTE (no native support):

```sql
WITH RECURSIVE timestamps AS (
    SELECT start_time AS ts
    UNION ALL
    SELECT datetime(ts, '+1 hour') FROM timestamps WHERE ts < end_time
)
SELECT ts FROM timestamps
```

### UPDATE with subquery

**PostgreSQL** - `UPDATE ... FROM` syntax:

```sql
UPDATE flow_run
SET state_id = flow_run_state.id
FROM flow_run_state
WHERE flow_run_state.flow_run_id = flow_run.id
  AND flow_run_state.timestamp = (SELECT MAX(timestamp) ...)
```

**SQLite** - correlated subquery:

```sql
UPDATE flow_run
SET state_id = (
    SELECT id FROM flow_run_state
    WHERE flow_run_id = flow_run.id
    ORDER BY timestamp DESC LIMIT 1
)
WHERE ...
```

### worker queue scheduling query

complex queries use Jinja2 templates in the `sql/` directory.
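the SQLite correlated-subquery UPDATE above can be exercised with the stdlib `sqlite3` module; the schema is simplified to the columns involved:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE flow_run (id INTEGER PRIMARY KEY, state_id INTEGER);
    CREATE TABLE flow_run_state (
        id INTEGER PRIMARY KEY,
        flow_run_id INTEGER,
        timestamp TEXT);
    INSERT INTO flow_run (id) VALUES (1);
    INSERT INTO flow_run_state VALUES (10, 1, '2024-01-01'), (11, 1, '2024-01-02');
""")

# correlated subquery picks the newest state per run (SQLite variant)
conn.execute("""
    UPDATE flow_run
    SET state_id = (
        SELECT id FROM flow_run_state
        WHERE flow_run_id = flow_run.id
        ORDER BY timestamp DESC LIMIT 1
    )
""")
print(conn.execute("SELECT state_id FROM flow_run").fetchone())  # (11,)
```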
**PostgreSQL** (`postgres/get-runs-from-worker-queues.sql.jinja`):

- `CROSS JOIN LATERAL` for an efficient nested limit
- `FOR UPDATE SKIP LOCKED` for pessimistic row locking
- `LEAST()` function for min comparison
- `JSONB` operators (`->>`)

**SQLite** (`sqlite/get-runs-from-worker-queues.sql.jinja`):

- nested CTEs with `ROW_NUMBER()` window functions (no LATERAL)
- `json_extract()` for JSON access
- `MAX(0, ...)` instead of `GREATEST()`

## transaction handling

### PostgreSQL

a simple context manager:

```python
async with session.begin():
    ...  # operations
```

`with_for_update=True` is ignored (Postgres handles row locking differently).

### SQLite

uses a context variable for the transaction mode:

```python
# IMMEDIATE mode when with_for_update=True (acquires the write lock immediately),
# DEFERRED mode otherwise (delays locking until needed)
token = SQLITE_BEGIN_MODE.set("IMMEDIATE" if with_for_update else "DEFERRED")
try:
    async with session.begin():
        yield
finally:
    SQLITE_BEGIN_MODE.reset(token)
```

a workaround for SQLite's implicit transaction handling.
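the effect of the IMMEDIATE mode can be seen with the stdlib `sqlite3` module, which lets us issue the BEGIN statement ourselves:

```python
import sqlite3

# isolation_level=None disables the driver's implicit transactions, so we
# can choose the BEGIN mode ourselves, mirroring what SQLITE_BEGIN_MODE does
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE t (x INTEGER)")

conn.execute("BEGIN IMMEDIATE")  # write lock acquired up front
conn.execute("INSERT INTO t VALUES (1)")
conn.execute("COMMIT")

print(conn.execute("SELECT x FROM t").fetchone())  # (1,)
```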
## connection pooling

### PostgreSQL settings

```python
pool_pre_ping=True   # verify connections on checkout
pool_use_lifo=True   # better availability after traffic spikes
pool_size=N          # from settings
max_overflow=N       # from settings
```

connect args:

- `command_timeout` - statement execution timeout
- `timeout` - connection timeout
- `statement_cache_size` - prepared statement cache
- `server_settings` - application_name, search_path
- `ssl` - optional TLS configuration

### SQLite settings

for `:memory:` databases:

```python
poolclass=AsyncAdaptedQueuePool
pool_size=1       # single connection required for in-memory
max_overflow=0    # no overflow
pool_recycle=-1   # don't recycle (would lose data)
```

PRAGMA configuration (on connect):

```sql
PRAGMA journal_mode = WAL;    -- concurrent readers during writes
PRAGMA foreign_keys = ON;     -- enable FK constraints
PRAGMA synchronous = NORMAL;  -- balance safety/performance
PRAGMA cache_size = 20000;    -- aggressive caching
PRAGMA busy_timeout = 60000;  -- 60s lock wait (5s in tests)
```

### engine disposal

scheduled on event loop shutdown via `add_event_loop_shutdown_callback()` to prevent connection leaks.
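the same PRAGMA sequence can be applied and verified with the stdlib `sqlite3` module:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
for pragma in ("journal_mode = WAL", "foreign_keys = ON",
               "synchronous = NORMAL", "cache_size = 20000",
               "busy_timeout = 60000"):
    conn.execute(f"PRAGMA {pragma}")

# pragmas are queryable; note an in-memory db reports journal_mode 'memory'
print(conn.execute("PRAGMA foreign_keys").fetchone())  # (1,)
print(conn.execute("PRAGMA busy_timeout").fetchone())  # (60000,)
```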
## session management

```python
async with db.session_context(begin_transaction=True) as session:
    ...  # operations run in a transaction
```

options:

- `begin_transaction=True` - wrap in a transaction
- `with_for_update=True` - use IMMEDIATE mode (SQLite) or row locks (Postgres)

## upsert patterns

defined per model in `BaseORMConfiguration`:

```python
deployment_unique_upsert_columns = [Deployment.flow_id, Deployment.name]
task_run_unique_upsert_columns = [TaskRun.flow_run_id, TaskRun.task_key, TaskRun.dynamic_key]
```

## key takeaways for our implementation

1. **JSON strings**: SQLite returns JSON as strings; PostgreSQL returns native objects
2. **no generate_series**: SQLite needs recursive CTEs for time ranges
3. **no LATERAL joins**: SQLite needs window functions/CTEs instead
4. **UPDATE syntax**: SQLite can't use `UPDATE ... FROM`, needs subqueries
5. **transaction modes**: SQLite needs explicit IMMEDIATE for write locks
6. **batch alterations**: SQLite can't ALTER columns in place, needs table recreation
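the upsert columns map onto ON CONFLICT targets; a minimal SQLite version with a simplified deployment schema:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE deployment (
    flow_id TEXT,
    name TEXT,
    description TEXT,
    UNIQUE (flow_id, name))""")

# the unique upsert columns become the ON CONFLICT target
upsert = """
    INSERT INTO deployment (flow_id, name, description) VALUES (?, ?, ?)
    ON CONFLICT (flow_id, name) DO UPDATE SET description = excluded.description
"""
conn.execute(upsert, ("f1", "prod", "first"))
conn.execute(upsert, ("f1", "prod", "second"))

print(conn.execute("SELECT COUNT(*) FROM deployment").fetchone())     # (1,)
print(conn.execute("SELECT description FROM deployment").fetchone())  # ('second',)
```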
+64
docs/python-reference/zig-compat-notes.md
# zig compatibility notes

comparison of the zig implementation with the python prefect server.

## RetryFailedFlows

### matching behavior ✓

| aspect | python | zig |
|--------|--------|-----|
| from_states | {RUNNING} | StateTypeSet.init(&.{.RUNNING}) |
| to_states | {FAILED} | StateTypeSet.init(&.{.FAILED}) |
| retry check | `retries is None or run_count > retries` | `ctx.retries orelse return; if (ctx.run_count > max_retries) return;` |
| scheduled time | `now + timedelta(seconds=retry_delay or 0)` | `now_us + delay_seconds * 1_000_000` |
| retry state | `AwaitingRetry` (SCHEDULED) | `SCHEDULED` with name "AwaitingRetry" |
| response | `reject_transition(state, reason="Retrying")` | `REJECT` status |

### gaps to address later

1. **empirical_policy updates during retry**
   - python sets `retry_type = "in_process"`, `resuming = False`, `pause_keys = set()` on retry
   - python clears `retry_type = None` when retries are exhausted
   - zig: not yet implemented (policy stored but not mutated)

2. **old client compatibility** (API < 0.8.3)
   - python resets failed task runs to the AwaitingRetry state
   - zig: not needed for new clients

## schema compatibility

### empirical_policy JSON

python schema:

```json
{
  "retries": int | null,
  "retry_delay": int | null,
  "pause_keys": [],
  "resuming": false,
  "retry_type": null | "in_process" | "reschedule",
  "max_retries": 0,          // deprecated
  "retry_delay_seconds": 0   // deprecated
}
```

zig reads: `retries`, `retry_delay` from this JSON
zig stores: as TEXT (sqlite) or JSONB (postgres) - compatible

### storage types

| column | postgres python | postgres zig | sqlite python | sqlite zig |
|--------|-----------------|--------------|---------------|------------|
| id | UUID | UUID | CHAR(36) | CHAR(36) |
| timestamps | TIMESTAMP(tz) | TIMESTAMP(tz) | DATETIME | DATETIME |
| empirical_policy | JSONB | JSONB | JSON | TEXT |

## migration path

our zig server stores data compatibly with the python prefect server:

- UUIDs in the standard text format
- timestamps as ISO 8601 UTC
- JSON/JSONB for policy fields

data created by the zig server can be read by the python server (and vice versa).
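a quick round-trip check of the shared text formats, stdlib only and independent of either server:

```python
import uuid
from datetime import datetime, timezone

# UUIDs: both servers store the canonical 36-char text form in SQLite
u = uuid.uuid4()
assert len(str(u)) == 36 and uuid.UUID(str(u)) == u

# timestamps: ISO 8601 UTC round-trips losslessly through text
now = datetime.now(timezone.utc).replace(microsecond=0)
stored = now.isoformat()
assert datetime.fromisoformat(stored) == now

print("round-trip ok")
```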