prefect server in zig

ORM models#

base class#

all models inherit from Base (DeclarativeBase):

  • auto table names: CamelCase → snake_case (e.g., FlowRun → flow_run)
  • universal columns:
    • id: UUID primary key with server-side generation (GenerateUUID())
    • created: timestamp with UTC timezone and server default
    • updated: indexed timestamp with onupdate=sa.func.now()
  • eager defaults: eager_defaults=True fetches server-generated defaults via the INSERT's RETURNING clause, so no follow-up SELECT is needed
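
The table-name rule above can be sketched with a small regex (a hedged approximation; the real helper may treat acronyms differently):

```python
import re

def camel_to_snake(name: str) -> str:
    """Approximate the CamelCase -> snake_case table-name rule."""
    # insert "_" before any uppercase letter that follows a lowercase
    # letter or digit, then lowercase the whole string
    return re.sub(r"(?<=[a-z0-9])([A-Z])", r"_\1", name).lower()

print(camel_to_snake("FlowRun"))        # flow_run
print(camel_to_snake("TaskRunState"))   # task_run_state
```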

custom type decorators#

UUID#

  • PostgreSQL: native postgresql.UUID()
  • SQLite: CHAR(36) string representation

Timestamp#

  • PostgreSQL: TIMESTAMP(timezone=True)
  • SQLite: DATETIME() naive, stored as UTC
  • always returns UTC timezone on read

JSON#

  • PostgreSQL: postgresql.JSONB(none_as_null=True) - binary, indexed, queryable
  • SQLite: sqlite.JSON(none_as_null=True)
  • sanitizes special floats (NaN, Infinity) unsupported by PostgreSQL
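
The float sanitization can be sketched like this (one plausible implementation; the actual code may substitute different sentinels for non-finite values):

```python
import math

def sanitize_nonfinite(obj):
    """Recursively replace NaN/Infinity, which PostgreSQL's JSON
    parser rejects, with None before serialization."""
    if isinstance(obj, float) and not math.isfinite(obj):
        return None
    if isinstance(obj, dict):
        return {k: sanitize_nonfinite(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [sanitize_nonfinite(v) for v in obj]
    return obj

sanitize_nonfinite({"x": float("nan"), "ys": [1.5, float("inf")]})
# → {"x": None, "ys": [1.5, None]}
```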

Pydantic#

  • wrapper over JSON that auto-serializes/deserializes Pydantic models
  • used for: StateDetails, FlowRunPolicy, CreatedBy, etc.

GenerateUUID (server default)#

  • PostgreSQL: GEN_RANDOM_UUID()
  • SQLite: complex hex(randomblob()) construction mimicking UUID v4
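
The SQLite construction can be exercised with the stdlib driver; this is a sketch of the expression shape, not necessarily the server's exact SQL:

```python
import sqlite3
import uuid

# build a v4-shaped UUID from random blobs: force the version nibble
# to '4' and the variant nibble to one of 8/9/a/b
SQLITE_UUID_V4 = (
    "lower(hex(randomblob(4)) || '-' || hex(randomblob(2)) || '-' "
    "|| '4' || substr(hex(randomblob(2)), 2) || '-' "
    "|| substr('89ab', abs(random()) % 4 + 1, 1) "
    "|| substr(hex(randomblob(2)), 2) || '-' || hex(randomblob(6)))"
)

conn = sqlite3.connect(":memory:")
value = conn.execute(f"SELECT {SQLITE_UUID_V4}").fetchone()[0]
parsed = uuid.UUID(value)
print(parsed.version)   # 4
```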

key models#

core execution#

| model | key fields | notes |
| --- | --- | --- |
| Flow | name, tags, labels | unique name constraint |
| FlowRun | flow_id, state_type, state_name, parameters, empirical_policy | inherits from Run mixin |
| TaskRun | flow_run_id, task_key, dynamic_key, cache_key | inherits from Run mixin |
| FlowRunState | flow_run_id, type, name, timestamp, state_details | one-to-many with FlowRun |
| TaskRunState | task_run_id, type, name, timestamp, state_details | one-to-many with TaskRun |

Run mixin (shared by FlowRun and TaskRun)#

state_type: StateType enum
state_name: string
state_timestamp: datetime
start_time: datetime (nullable)
end_time: datetime (nullable)
expected_start_time: datetime (nullable)
total_run_time: float (seconds)
run_count: int

deployments#

| model | key fields | notes |
| --- | --- | --- |
| Deployment | flow_id, name, status, concurrency_limit | unique (flow_id, name) |
| DeploymentSchedule | deployment_id, schedule, active | cron/interval schedules |

work management#

| model | key fields | notes |
| --- | --- | --- |
| WorkPool | name, type, status | pool infrastructure config |
| WorkQueue | work_pool_id, name, priority, concurrency_limit | routes runs to workers |
| Worker | work_pool_id, name, last_heartbeat_time | heartbeat tracking |

blocks#

| model | key fields | notes |
| --- | --- | --- |
| BlockType | name, slug | schema definitions |
| BlockSchema | block_type_id, checksum, capabilities | versioned schemas |
| BlockDocument | block_type_id, block_schema_id, name, data | encrypted credential storage |

artifacts#

| model | key fields | notes |
| --- | --- | --- |
| Artifact | key, type, data, flow_run_id, task_run_id | flexible result storage |
| ArtifactCollection | key, latest_id | tracks latest artifact per key |

relationship patterns#

one-to-many (standard)#

```python
# Flow → FlowRuns
flow_runs: Mapped[list["FlowRun"]] = relationship(
    back_populates="flow", lazy="raise"
)
```

lazy loading strategies#

  • lazy="raise" - prevents implicit queries (catches N+1 problems)
  • lazy="selectin" - eager loads in separate SELECT with IN clause

foreign keys#

```python
# cascade delete
flow_id = mapped_column(sa.ForeignKey("flow.id", ondelete="cascade"))

# nullable with soft delete
parent_task_run_id = mapped_column(
    sa.ForeignKey("task_run.id", ondelete="SET NULL", use_alter=True)
)
```

use_alter=True emits the FK constraint as a separate ALTER TABLE after both tables are created, which resolves circular dependencies between tables

state tracking pattern#

models maintain BOTH:

  1. state history - via one-to-many relationship to the State table
  2. current state pointer - via optional FK with use_alter=True

unique constraint on (run_id, timestamp DESC) ensures one state per instant.
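
The dual bookkeeping reduces to "append to history, then move the pointer" (an in-memory sketch, not the ORM code):

```python
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class State:
    type: str
    timestamp: float   # simplified; the real column is a Timestamp

@dataclass
class FlowRun:
    states: list[State] = field(default_factory=list)  # full history
    state: Optional[State] = None                      # current-state pointer

    def set_state(self, new_state: State) -> None:
        self.states.append(new_state)  # history row (INSERT into state table)
        self.state = new_state         # pointer update (FK on the run row)

run = FlowRun()
run.set_state(State("PENDING", 1.0))
run.set_state(State("RUNNING", 2.0))
# run.state.type == "RUNNING"; len(run.states) == 2
```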

JSON field patterns#

```python
# plain JSON with defaults
tags: Mapped[list[str]] = mapped_column(JSON, server_default="[]", default=list)
parameters: Mapped[dict] = mapped_column(JSON, server_default="{}", default=dict)

# Pydantic-backed JSON
empirical_policy: Mapped[FlowRunPolicy] = mapped_column(
    Pydantic(FlowRunPolicy),
    server_default="{}",
    default=FlowRunPolicy,
)
```

FlowRunPolicy schema#

the empirical_policy column stores retry/pause behavior as JSON:

| field | type | default | description |
| --- | --- | --- | --- |
| max_retries | int | 0 | deprecated - use retries |
| retry_delay_seconds | float | 0 | deprecated - use retry_delay |
| retries | int? | null | number of retries allowed |
| retry_delay | int? | null | delay between retries (seconds) |
| pause_keys | set[str]? | [] | tracks pauses observed |
| resuming | bool? | false | indicates resuming from pause |
| retry_type | "in_process" \| "reschedule"? | null | retry execution mode |

this is the key schema for implementing the RetryFailedFlows orchestration rule.
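
A sketch of how a RetryFailedFlows-style rule could consult this policy (hypothetical helper; the real rule lives in the orchestration layer and also handles retry_delay and retry_type):

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class FlowRunPolicy:
    retries: Optional[int] = None
    retry_delay: Optional[int] = None   # seconds

def should_retry(policy: FlowRunPolicy, run_count: int) -> bool:
    """A failed run is rescheduled while its attempt count has not
    exhausted policy.retries (None means no retries)."""
    retries = policy.retries or 0
    return run_count <= retries

should_retry(FlowRunPolicy(retries=2), run_count=1)   # True: retry
should_retry(FlowRunPolicy(retries=2), run_count=3)   # False: give up
```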

PostgreSQL-specific indexes#

```python
# GIN index for JSON operations
sa.Index("ix_events__related_resource_ids_gin", "related_resource_ids",
         postgresql_using="gin").ddl_if(dialect="postgresql")

# partial index (both dialects)
sa.Index("ix_flow_run__state_type_scheduled",
    cls.deployment_id, cls.auto_scheduled, cls.next_scheduled_start_time,
    postgresql_where=cls.state_type == StateType.SCHEDULED,
    sqlite_where=cls.state_type == StateType.SCHEDULED,
)
```

default value pattern#

both server and Python defaults for safety:

```python
created: Mapped[DateTime] = mapped_column(
    server_default=sa.func.now(),  # database-side
    default=lambda: now("UTC")      # ORM-side
)
```