prefect server in zig
ORM models#
base class#
all models inherit from Base (DeclarativeBase):
- auto table names: CamelCase → snake_case (e.g., FlowRun → flow_run)
- universal columns:
  - id: UUID primary key with server-side generation (GenerateUUID())
  - created: timestamp with UTC timezone and server default
  - updated: indexed timestamp with onupdate=sa.func.now()
- eager defaults: eager_defaults=True fetches server defaults after INSERT without extra queries
custom type decorators#
UUID#
- PostgreSQL: native postgresql.UUID()
- SQLite: CHAR(36) string representation
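a hedged sketch of this two-dialect type as a SQLAlchemy TypeDecorator, with a SQLite round trip to show the string fallback:

```python
# cross-dialect UUID type sketch, assuming SQLAlchemy 2.0
import uuid

import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
from sqlalchemy.types import CHAR, TypeDecorator


class UUID(TypeDecorator):
    """Native UUID on PostgreSQL, canonical 36-char string on SQLite."""

    impl = CHAR(36)
    cache_ok = True

    def load_dialect_impl(self, dialect):
        if dialect.name == "postgresql":
            return dialect.type_descriptor(postgresql.UUID())
        return dialect.type_descriptor(CHAR(36))

    def process_bind_param(self, value, dialect):
        if value is None:
            return None
        # normalize strings and UUID objects to the canonical dashed form
        return str(uuid.UUID(str(value)))

    def process_result_value(self, value, dialect):
        return None if value is None else uuid.UUID(str(value))


# round-trip through in-memory SQLite
engine = sa.create_engine("sqlite://")
metadata = sa.MetaData()
table = sa.Table("obj", metadata, sa.Column("id", UUID(), primary_key=True))
metadata.create_all(engine)

original = uuid.uuid4()
with engine.begin() as conn:
    conn.execute(table.insert().values(id=original))
    round_tripped = conn.execute(sa.select(table.c.id)).scalar_one()
```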
Timestamp#
- PostgreSQL: TIMESTAMP(timezone=True)
- SQLite: DATETIME(), naive, stored as UTC
- always returns UTC timezone on read
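the same TypeDecorator pattern works for timestamps — a sketch that normalizes to naive UTC on SQLite writes and restores the UTC timezone on reads:

```python
# cross-dialect timezone-aware timestamp sketch, assuming SQLAlchemy 2.0
import datetime

import sqlalchemy as sa
from sqlalchemy.types import TypeDecorator


class Timestamp(TypeDecorator):
    """TIMESTAMP(timezone=True) on PostgreSQL; naive UTC DATETIME on SQLite."""

    impl = sa.TIMESTAMP(timezone=True)
    cache_ok = True

    def load_dialect_impl(self, dialect):
        if dialect.name == "sqlite":
            return dialect.type_descriptor(sa.DATETIME())
        return dialect.type_descriptor(sa.TIMESTAMP(timezone=True))

    def process_bind_param(self, value, dialect):
        if value is None:
            return None
        if value.tzinfo is None:
            raise ValueError("timestamps must be timezone-aware")
        if dialect.name == "sqlite":
            # SQLite stores naive datetimes; normalize to UTC first
            return value.astimezone(datetime.timezone.utc).replace(tzinfo=None)
        return value

    def process_result_value(self, value, dialect):
        if value is None:
            return None
        if value.tzinfo is None:
            return value.replace(tzinfo=datetime.timezone.utc)
        return value.astimezone(datetime.timezone.utc)


# round-trip through in-memory SQLite
engine = sa.create_engine("sqlite://")
metadata = sa.MetaData()
events = sa.Table("event", metadata, sa.Column("ts", Timestamp()))
metadata.create_all(engine)

aware = datetime.datetime(2024, 1, 2, 3, 4, 5, tzinfo=datetime.timezone.utc)
with engine.begin() as conn:
    conn.execute(events.insert().values(ts=aware))
    loaded = conn.execute(sa.select(events.c.ts)).scalar_one()
```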
JSON#
- PostgreSQL: postgresql.JSONB(none_as_null=True) - binary, indexed, queryable
- SQLite: sqlite.JSON(none_as_null=True)
- sanitizes special floats (NaN, Infinity) unsupported by PostgreSQL
Pydantic#
- wrapper over JSON that auto-serializes/deserializes Pydantic models
- used for: StateDetails, FlowRunPolicy, CreatedBy, etc.
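a sketch of such a wrapper, assuming SQLAlchemy 2.0 and Pydantic v2; the `StateDetails` stand-in model below is illustrative, not the real schema:

```python
# Pydantic-aware JSON column type sketch: validates on read, serializes on write
from typing import Optional

import pydantic
import sqlalchemy as sa
from sqlalchemy.types import JSON, TypeDecorator


class Pydantic(TypeDecorator):
    """Stores a Pydantic model as a JSON column."""

    impl = JSON
    cache_ok = True

    def __init__(self, model: type[pydantic.BaseModel]):
        super().__init__()
        self._model = model

    def process_bind_param(self, value, dialect):
        if value is None:
            return None
        if not isinstance(value, self._model):
            value = self._model.model_validate(value)
        # mode="json" renders UUIDs/datetimes as JSON-safe strings
        return value.model_dump(mode="json")

    def process_result_value(self, value, dialect):
        return None if value is None else self._model.model_validate(value)


class StateDetails(pydantic.BaseModel):  # stand-in for the real schema
    flow_run_id: Optional[str] = None
    cache_key: Optional[str] = None


# round-trip through in-memory SQLite
engine = sa.create_engine("sqlite://")
metadata = sa.MetaData()
runs = sa.Table("run", metadata, sa.Column("details", Pydantic(StateDetails)))
metadata.create_all(engine)

with engine.begin() as conn:
    conn.execute(runs.insert().values(details=StateDetails(cache_key="abc")))
    loaded = conn.execute(sa.select(runs.c.details)).scalar_one()
```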
GenerateUUID (server default)#
- PostgreSQL: GEN_RANDOM_UUID()
- SQLite: complex hex(randomblob()) construction mimicking UUID v4
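a sketch of how such a dialect-aware server default can be built with SQLAlchemy's compiler extension; the exact SQLite expression below is my approximation of the v4 construction, not a verbatim copy:

```python
# dialect-aware UUID server default via the compiler extension (SQLAlchemy 2.0)
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql.base import PGDialect
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.functions import FunctionElement


class GenerateUUID(FunctionElement):
    """Server-side UUID default that compiles per dialect."""

    name = "uuid_default"
    inherit_cache = True


@compiles(GenerateUUID, "postgresql")
def _pg_uuid(element, compiler, **kwargs):
    return "GEN_RANDOM_UUID()"


@compiles(GenerateUUID, "sqlite")
def _sqlite_uuid(element, compiler, **kwargs):
    # assemble a UUIDv4-shaped hex string from random bytes
    return (
        "(lower(hex(randomblob(4))) || '-' || lower(hex(randomblob(2)))"
        " || '-4' || substr(lower(hex(randomblob(2))), 2)"
        " || '-' || substr('89ab', abs(random()) % 4 + 1, 1)"
        " || substr(lower(hex(randomblob(2))), 2)"
        " || '-' || lower(hex(randomblob(6))))"
    )


# PostgreSQL compiles to the native function...
pg_sql = str(GenerateUUID().compile(dialect=PGDialect()))

# ...while SQLite actually executes the randomblob construction
engine = sa.create_engine("sqlite://")
with engine.connect() as conn:
    generated = conn.execute(sa.select(GenerateUUID())).scalar_one()
```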
key models#
core execution#
| model | key fields | notes |
|---|---|---|
| Flow | name, tags, labels | unique name constraint |
| FlowRun | flow_id, state_type, state_name, parameters, empirical_policy | inherits from Run mixin |
| TaskRun | flow_run_id, task_key, dynamic_key, cache_key | inherits from Run mixin |
| FlowRunState | flow_run_id, type, name, timestamp, state_details | one-to-many with FlowRun |
| TaskRunState | task_run_id, type, name, timestamp, state_details | one-to-many with TaskRun |
Run mixin (shared by FlowRun and TaskRun)#
state_type: StateType enum
state_name: string
state_timestamp: datetime
start_time: datetime (nullable)
end_time: datetime (nullable)
expected_start_time: datetime (nullable)
total_run_time: float (seconds)
run_count: int
deployments#
| model | key fields | notes |
|---|---|---|
| Deployment | flow_id, name, status, concurrency_limit | unique (flow_id, name) |
| DeploymentSchedule | deployment_id, schedule, active | cron/interval schedules |
work management#
| model | key fields | notes |
|---|---|---|
| WorkPool | name, type, status | pool infrastructure config |
| WorkQueue | work_pool_id, name, priority, concurrency_limit | routes runs to workers |
| Worker | work_pool_id, name, last_heartbeat_time | heartbeat tracking |
blocks#
| model | key fields | notes |
|---|---|---|
| BlockType | name, slug | schema definitions |
| BlockSchema | block_type_id, checksum, capabilities | versioned schemas |
| BlockDocument | block_type_id, block_schema_id, name, data | encrypted credential storage |
artifacts#
| model | key fields | notes |
|---|---|---|
| Artifact | key, type, data, flow_run_id, task_run_id | flexible result storage |
| ArtifactCollection | key, latest_id | tracks latest artifact per key |
relationship patterns#
one-to-many (standard)#
# Flow → FlowRuns
flow_runs: Mapped[list["FlowRun"]] = relationship(
back_populates="flow", lazy="raise"
)
lazy loading strategies#
- lazy="raise" - prevents implicit queries (catches N+1 problems)
- lazy="selectin" - eager loads in separate SELECT with IN clause
foreign keys#
# cascade delete
flow_id = mapped_column(sa.ForeignKey("flow.id", ondelete="cascade"))
# nullable with soft delete
parent_task_run_id = mapped_column(
sa.ForeignKey("task_run.id", ondelete="SET NULL", use_alter=True)
)
use_alter=True defers FK constraint creation (handles circular dependencies)
state tracking pattern#
models maintain BOTH:
- state history - via one-to-many relationship to the State table
- current state pointer - via optional FK with use_alter=True
unique constraint on (run_id, timestamp DESC) ensures one state per instant.
JSON field patterns#
# plain JSON with defaults
tags: Mapped[list[str]] = mapped_column(JSON, server_default="[]", default=list)
parameters: Mapped[dict] = mapped_column(JSON, server_default="{}", default=dict)
# Pydantic-backed JSON
empirical_policy: Mapped[FlowRunPolicy] = mapped_column(
Pydantic(FlowRunPolicy),
server_default="{}",
default=FlowRunPolicy,
)
FlowRunPolicy schema#
the empirical_policy column stores retry/pause behavior as JSON:
| field | type | default | description |
|---|---|---|---|
| max_retries | int | 0 | deprecated - use retries |
| retry_delay_seconds | float | 0 | deprecated - use retry_delay |
| retries | int? | null | number of retries allowed |
| retry_delay | int? | null | delay between retries (seconds) |
| pause_keys | set[str]? | [] | tracks pauses observed |
| resuming | bool? | false | indicates resuming from pause |
| retry_type | "in_process" \| "reschedule" (nullable) | null | retry execution mode |
this is the key schema for implementing RetryFailedFlows orchestration rule.
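the table above can be sketched as a Pydantic v2 model (field names and defaults from the table; the `Literal` encoding of retry_type is an assumption):

```python
# FlowRunPolicy sketch, assuming Pydantic v2
from typing import Literal, Optional

import pydantic


class FlowRunPolicy(pydantic.BaseModel):
    """Retry/pause behavior stored in FlowRun.empirical_policy."""

    max_retries: int = 0  # deprecated: use retries
    retry_delay_seconds: float = 0  # deprecated: use retry_delay
    retries: Optional[int] = None
    retry_delay: Optional[int] = None  # seconds
    pause_keys: Optional[set[str]] = pydantic.Field(default_factory=set)
    resuming: Optional[bool] = False
    retry_type: Optional[Literal["in_process", "reschedule"]] = None
```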
PostgreSQL-specific indexes#
# GIN index for JSON operations
sa.Index("ix_events__related_resource_ids_gin", "related_resource_ids",
postgresql_using="gin").ddl_if(dialect="postgresql")
# partial index (both dialects)
sa.Index("ix_flow_run__state_type_scheduled",
cls.deployment_id, cls.auto_scheduled, cls.next_scheduled_start_time,
postgresql_where=cls.state_type == StateType.SCHEDULED,
sqlite_where=cls.state_type == StateType.SCHEDULED,
)
default value pattern#
both server and Python defaults for safety:
created: Mapped[DateTime] = mapped_column(
server_default=sa.func.now(), # database-side
default=lambda: now("UTC") # ORM-side
)