prefect server in zig
# ORM models

## base class

all models inherit from `Base` (a SQLAlchemy `DeclarativeBase` subclass):

- **auto table names**: CamelCase → snake_case (e.g., `FlowRun` → `flow_run`)
- **universal columns**:
  - `id`: UUID primary key with server-side generation (`GenerateUUID()`)
  - `created`: UTC timestamp with a server default
  - `updated`: indexed timestamp with `onupdate=sa.func.now()`
- **eager defaults**: `eager_defaults=True` loads server-generated values (e.g. `id`, `created`) as part of the INSERT/UPDATE, via `RETURNING` where the database supports it, instead of lazily on the next attribute access
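
A minimal sketch of the naming rule, assuming a simple "underscore before every capital that follows a lowercase letter or digit" conversion. `camel_to_snake` is a hypothetical helper, and the real `Base` may treat acronyms differently:

```python
import re


def camel_to_snake(name: str) -> str:
    """Hypothetical helper: CamelCase class name -> snake_case table name."""
    # insert "_" before an uppercase letter that follows a lowercase letter or digit,
    # then lowercase the whole string
    return re.sub(r"(?<=[a-z0-9])([A-Z])", r"_\1", name).lower()


for class_name in ["Flow", "FlowRun", "TaskRunState", "ArtifactCollection"]:
    print(class_name, "->", camel_to_snake(class_name))
```

In a declarative base, a helper like this would typically be called from a `__tablename__` defined with `declared_attr`, so subclasses never spell out their table names.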

## custom type decorators

### UUID
- **PostgreSQL**: native `postgresql.UUID()`
- **SQLite**: `CHAR(36)` string representation
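
On SQLite the decorator has to convert in both directions. The functions below sketch that conversion as plain Python, roughly what a `TypeDecorator`'s `process_bind_param` / `process_result_value` hooks would do for the `CHAR(36)` case; the function names are hypothetical and the PostgreSQL path (native type) is left out:

```python
import uuid
from typing import Optional, Union


def uuid_to_char36(value: Optional[Union[uuid.UUID, str]]) -> Optional[str]:
    """Bind side (hypothetical): normalize to the canonical lowercase 8-4-4-4-12 string."""
    if value is None:
        return None
    # uuid.UUID() validates the input and accepts upper- or lowercase hex
    parsed = value if isinstance(value, uuid.UUID) else uuid.UUID(str(value))
    return str(parsed)


def char36_to_uuid(value: Optional[str]) -> Optional[uuid.UUID]:
    """Result side (hypothetical): turn the stored CHAR(36) back into a uuid.UUID."""
    return None if value is None else uuid.UUID(value)


stored = uuid_to_char36("6F9619FF-8B86-D011-B42D-00C04FC964FF")
print(stored)  # 6f9619ff-8b86-d011-b42d-00c04fc964ff
```

Normalizing to one canonical spelling matters for a text column: equality and index lookups compare strings, so mixed-case variants of the same UUID would otherwise not match.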

### Timestamp
- **PostgreSQL**: `TIMESTAMP(timezone=True)`
- **SQLite**: `DATETIME()`, stored naive with values normalized to UTC
- always returns a timezone-aware UTC datetime on read
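
A sketch of the SQLite conversion in plain Python, assuming naive inputs are rejected rather than guessed at (that policy and both function names are assumptions, not the decorator's confirmed behavior):

```python
from datetime import datetime, timedelta, timezone


def timestamp_to_sqlite(value: datetime) -> datetime:
    """Bind side (hypothetical): aware datetime -> naive UTC datetime for DATETIME()."""
    if value.tzinfo is None:
        # assumption: naive datetimes are an error, since their zone is unknown
        raise ValueError("timestamps must be timezone-aware")
    return value.astimezone(timezone.utc).replace(tzinfo=None)


def timestamp_from_sqlite(value: datetime) -> datetime:
    """Result side (hypothetical): stored values are UTC by convention, so re-attach UTC."""
    return value.replace(tzinfo=timezone.utc)


local = datetime(2024, 1, 1, 12, 0, tzinfo=timezone(timedelta(hours=2)))
stored = timestamp_to_sqlite(local)        # 2024-01-01 10:00:00, naive
restored = timestamp_from_sqlite(stored)   # 2024-01-01 10:00:00+00:00
assert restored == local  # same instant, now expressed in UTC
```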

### JSON
- **PostgreSQL**: `postgresql.JSONB(none_as_null=True)` - binary storage, indexable, queryable
- **SQLite**: `sqlite.JSON(none_as_null=True)`
- sanitizes special float values (`NaN`, `Infinity`, `-Infinity`), which PostgreSQL's JSON types cannot store
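
One way to do the sanitizing with only the standard library, assuming the special floats are replaced by `null` (the replacement value is an assumption; `sanitize_json` is a hypothetical name). It relies on `json.dumps` emitting the non-standard tokens `NaN`, `Infinity` and `-Infinity` by default, and on `json.loads` passing exactly those tokens to `parse_constant`:

```python
import json
from typing import Any


def sanitize_json(value: Any) -> Any:
    """Replace NaN/Infinity/-Infinity anywhere in a JSON-compatible value with None."""
    if not value:
        return value  # None, {}, [] and friends need no work
    # round-trip through JSON text; parse_constant is called only for the three
    # special tokens, so every other value comes back unchanged
    return json.loads(json.dumps(value), parse_constant=lambda _token: None)


print(sanitize_json({"loss": float("nan"), "bounds": [1.5, float("inf")]}))
# {'loss': None, 'bounds': [1.5, None]}
```

A side effect of the round-trip is that tuples become lists and non-string dict keys become strings, which is what a JSON column would store anyway.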

### Pydantic
- wrapper over JSON that auto-serializes/deserializes Pydantic models
- used for: `StateDetails`, `FlowRunPolicy`, `CreatedBy`, etc.

### GenerateUUID (server default)
- **PostgreSQL**: `GEN_RANDOM_UUID()`
- **SQLite**: a `hex(randomblob())` concatenation that mimics the UUID v4 layout
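
A sketch of such an expression, checked with Python's built-in `sqlite3`. The exact SQL is an assumption (a common way to build a v4 layout), not necessarily the expression `GenerateUUID` emits; it fixes the version nibble to `4` and draws the variant nibble from `8`, `9`, `a` or `b`:

```python
import sqlite3
import uuid

# hypothetical expression: 8-4-4-4-12 lowercase hex groups
SQLITE_UUID4_SQL = """
    lower(hex(randomblob(4)))
    || '-' || lower(hex(randomblob(2)))
    || '-4' || substr(lower(hex(randomblob(2))), 2)
    || '-' || substr('89ab', abs(random()) % 4 + 1, 1)
    || substr(lower(hex(randomblob(2))), 2)
    || '-' || lower(hex(randomblob(6)))
"""

conn = sqlite3.connect(":memory:")
# used as a server default: SQLite evaluates it for every INSERT that omits id
conn.execute(
    f"CREATE TABLE flow (id CHAR(36) PRIMARY KEY DEFAULT ({SQLITE_UUID4_SQL}), name TEXT)"
)
conn.execute("INSERT INTO flow (name) VALUES ('etl')")
row_id = conn.execute("SELECT id FROM flow").fetchone()[0]

parsed = uuid.UUID(row_id)
print(row_id, parsed.version)  # the id is random; the version is always 4
```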

## key models

### core execution

| model | key fields | notes |
|-------|------------|-------|
| Flow | name, tags, labels | unique name constraint |
| FlowRun | flow_id, state_type, state_name, parameters, empirical_policy | inherits from `Run` mixin |
| TaskRun | flow_run_id, task_key, dynamic_key, cache_key | inherits from `Run` mixin |
| FlowRunState | flow_run_id, type, name, timestamp, state_details | many states per FlowRun (one-to-many from FlowRun) |
| TaskRunState | task_run_id, type, name, timestamp, state_details | many states per TaskRun (one-to-many from TaskRun) |

### Run mixin (shared by FlowRun and TaskRun)

```
state_type: StateType enum
state_name: string
state_timestamp: datetime
start_time: datetime (nullable)
end_time: datetime (nullable)
expected_start_time: datetime (nullable)
total_run_time: float (seconds)
run_count: int
```

### deployments

| model | key fields | notes |
|-------|------------|-------|
| Deployment | flow_id, name, status, concurrency_limit | unique (flow_id, name) |
| DeploymentSchedule | deployment_id, schedule, active | cron/interval schedules |

### work management

| model | key fields | notes |
|-------|------------|-------|
| WorkPool | name, type, status | pool infrastructure config |
| WorkQueue | work_pool_id, name, priority, concurrency_limit | routes runs to workers |
| Worker | work_pool_id, name, last_heartbeat_time | heartbeat tracking |

### blocks

| model | key fields | notes |
|-------|------------|-------|
| BlockType | name, slug | schema definitions |
| BlockSchema | block_type_id, checksum, capabilities | versioned schemas |
| BlockDocument | block_type_id, block_schema_id, name, data | encrypted credential storage |

### artifacts

| model | key fields | notes |
|-------|------------|-------|
| Artifact | key, type, data, flow_run_id, task_run_id | flexible result storage |
| ArtifactCollection | key, latest_id | tracks latest artifact per key |
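
One way "latest artifact per key" could be maintained is an upsert on every artifact insert. This is a sketch with trimmed-down hypothetical tables using Python's `sqlite3` (`ON CONFLICT ... DO UPDATE` needs SQLite 3.24+), not a description of how the server actually writes the collection:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE artifact (id TEXT PRIMARY KEY, key TEXT, data TEXT);
    CREATE TABLE artifact_collection (key TEXT PRIMARY KEY, latest_id TEXT NOT NULL);
""")


def create_artifact(artifact_id: str, key: str, data: str) -> None:
    """Hypothetical write path: store the artifact, then repoint its key's collection row."""
    conn.execute("INSERT INTO artifact VALUES (?, ?, ?)", (artifact_id, key, data))
    # upsert: the first artifact for a key creates the row, later ones move latest_id
    conn.execute(
        """
        INSERT INTO artifact_collection (key, latest_id) VALUES (?, ?)
        ON CONFLICT(key) DO UPDATE SET latest_id = excluded.latest_id
        """,
        (key, artifact_id),
    )


create_artifact("a1", "report", "v1")
create_artifact("a2", "report", "v2")
create_artifact("a3", "metrics", "m1")
latest = conn.execute(
    "SELECT key, latest_id FROM artifact_collection ORDER BY key"
).fetchall()
print(latest)  # [('metrics', 'a3'), ('report', 'a2')]
```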

## relationship patterns

### one-to-many (standard)
```python
from sqlalchemy.orm import Mapped, relationship

# on Flow: Flow → FlowRuns
flow_runs: Mapped[list["FlowRun"]] = relationship(
    back_populates="flow", lazy="raise"
)
```

### lazy loading strategies
- `lazy="raise"` - raises an error instead of emitting an implicit lazy-load query (surfaces N+1 problems)
- `lazy="selectin"` - eager loads in a separate SELECT with an IN clause
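
### foreign keys
```python
import sqlalchemy as sa
from sqlalchemy.orm import mapped_column

# cascade delete: deleting a flow deletes its flow runs
flow_id = mapped_column(sa.ForeignKey("flow.id", ondelete="cascade"))

# nullable; the reference is set to NULL when the parent task run is deleted
parent_task_run_id = mapped_column(
    sa.ForeignKey("task_run.id", ondelete="SET NULL", use_alter=True)
)
```

`use_alter=True` creates the constraint with a separate `ALTER TABLE` after the tables exist, which lets tables with circular foreign-key dependencies be created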
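
The two `ondelete` behaviors can be checked directly in the database with Python's `sqlite3`. The table shapes are trimmed-down hypothetical versions; this exercises only delete behavior, not the DDL ordering that `use_alter` deals with:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite only enforces FKs when this is on
conn.executescript("""
    CREATE TABLE flow (id TEXT PRIMARY KEY);
    CREATE TABLE flow_run (
        id TEXT PRIMARY KEY,
        flow_id TEXT NOT NULL REFERENCES flow(id) ON DELETE CASCADE
    );
    CREATE TABLE task_run (
        id TEXT PRIMARY KEY,
        parent_task_run_id TEXT REFERENCES task_run(id) ON DELETE SET NULL
    );
    INSERT INTO flow VALUES ('f1');
    INSERT INTO flow_run VALUES ('fr1', 'f1'), ('fr2', 'f1');
    INSERT INTO task_run VALUES ('parent', NULL), ('child', 'parent');
""")

conn.execute("DELETE FROM flow WHERE id = 'f1'")          # cascades to both flow runs
conn.execute("DELETE FROM task_run WHERE id = 'parent'")  # nulls the child's pointer

remaining_flow_runs = conn.execute("SELECT COUNT(*) FROM flow_run").fetchone()[0]
child_parent = conn.execute(
    "SELECT parent_task_run_id FROM task_run WHERE id = 'child'"
).fetchone()[0]
print(remaining_flow_runs, child_parent)  # 0 None
```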
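
## state tracking pattern

run models maintain BOTH:
1. **state history** - a one-to-many relationship from the run to its state table (one row per state transition)
2. **current state pointer** - an optional FK to the latest state row, declared with `use_alter=True` because the state table already references the run (a circular dependency)

a unique index on `(run_id, timestamp DESC)` ensures a run has at most one state per instant; the descending order also matches "newest state first" lookups.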
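
A minimal sketch of the pattern in plain SQL via Python's `sqlite3`: append a history row, move the pointer, and let the unique index reject a second state at the same timestamp. Table, column and index names are hypothetical simplifications:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")
conn.executescript("""
    CREATE TABLE flow_run (
        id TEXT PRIMARY KEY,
        -- current-state pointer: nullable, since the run exists before its first state
        state_id TEXT REFERENCES flow_run_state(id) ON DELETE SET NULL
    );
    CREATE TABLE flow_run_state (
        id TEXT PRIMARY KEY,
        flow_run_id TEXT NOT NULL REFERENCES flow_run(id) ON DELETE CASCADE,
        name TEXT NOT NULL,
        timestamp TEXT NOT NULL
    );
    -- state history: at most one state per run per timestamp
    CREATE UNIQUE INDEX uq_state__run_timestamp
        ON flow_run_state (flow_run_id, timestamp DESC);
""")

conn.execute("INSERT INTO flow_run (id) VALUES ('run-1')")
for state_id, name, ts in [
    ("s1", "Pending", "2024-01-01T00:00:00Z"),
    ("s2", "Running", "2024-01-01T00:00:05Z"),
]:
    # append to the history, then move the current-state pointer
    conn.execute(
        "INSERT INTO flow_run_state VALUES (?, 'run-1', ?, ?)", (state_id, name, ts)
    )
    conn.execute("UPDATE flow_run SET state_id = ? WHERE id = 'run-1'", (state_id,))

current = conn.execute(
    "SELECT s.name FROM flow_run r JOIN flow_run_state s ON s.id = r.state_id"
).fetchone()[0]

try:  # a second state at an existing timestamp violates the unique index
    conn.execute(
        "INSERT INTO flow_run_state VALUES ('s3', 'run-1', 'Failed', '2024-01-01T00:00:05Z')"
    )
    duplicate_rejected = False
except sqlite3.IntegrityError:
    duplicate_rejected = True

print(current, duplicate_rejected)  # Running True
```

Reading the current state is a single join through the pointer, while the full history stays available for auditing.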

## JSON field patterns

```python
# plain JSON with defaults (`JSON` and `Pydantic` are the custom type decorators above);
# default=list / default=dict are callables, so each new row gets a fresh container
tags: Mapped[list[str]] = mapped_column(JSON, server_default="[]", default=list)
parameters: Mapped[dict] = mapped_column(JSON, server_default="{}", default=dict)

# Pydantic-backed JSON
empirical_policy: Mapped[FlowRunPolicy] = mapped_column(
    Pydantic(FlowRunPolicy),
    server_default="{}",
    default=FlowRunPolicy,
)
```

### FlowRunPolicy schema

the `empirical_policy` column stores retry/pause behavior as JSON:

| field | type | default | description |
|-------|------|---------|-------------|
| max_retries | int | 0 | **deprecated** - use `retries` |
| retry_delay_seconds | float | 0 | **deprecated** - use `retry_delay` |
| retries | int? | null | number of retries allowed |
| retry_delay | int? | null | delay between retries (seconds) |
| pause_keys | set[str]? | [] (empty set) | pause keys already observed for the run |
| resuming | bool? | false | indicates the run is resuming from a pause |
| retry_type | "in_process" \| "reschedule"? | null | retry execution mode |

this is the schema the RetryFailedFlows orchestration rule reads when deciding whether, and after what delay, to retry a failed flow run.
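
A stdlib stand-in for the model, useful for checking how stored JSON maps onto the table above. Two things here are assumptions: that the deprecated fields act only as fallbacks when the new ones are unset, and the retry check in the hypothetical `should_retry` (retry while `run_count` has not exceeded `retries`); neither is confirmed by these notes:

```python
import json
from dataclasses import dataclass, field
from typing import Literal, Optional


@dataclass
class FlowRunPolicy:
    """Stand-in for the Pydantic model; names and defaults follow the schema table."""
    max_retries: int = 0              # deprecated, use retries
    retry_delay_seconds: float = 0    # deprecated, use retry_delay
    retries: Optional[int] = None
    retry_delay: Optional[int] = None
    pause_keys: set[str] = field(default_factory=set)
    resuming: Optional[bool] = False
    retry_type: Optional[Literal["in_process", "reschedule"]] = None

    @classmethod
    def from_json(cls, raw: str) -> "FlowRunPolicy":
        data = json.loads(raw or "{}")  # the column's server_default is "{}"
        # assumption: deprecated fields only fill in unset new fields
        if not data.get("retries") and data.get("max_retries"):
            data["retries"] = data["max_retries"]
        if not data.get("retry_delay") and data.get("retry_delay_seconds"):
            data["retry_delay"] = data["retry_delay_seconds"]
        # JSON has no set type, so pause_keys arrives as a list
        data["pause_keys"] = set(data.get("pause_keys") or [])
        return cls(**data)


def should_retry(policy: FlowRunPolicy, run_count: int) -> bool:
    """Hypothetical check: retry while run_count has not exceeded the allowed retries."""
    return run_count <= (policy.retries or 0)


legacy = FlowRunPolicy.from_json('{"max_retries": 2, "retry_delay_seconds": 10}')
print(legacy.retries, legacy.retry_delay)                # 2 10
print(should_retry(legacy, 1), should_retry(legacy, 3))  # True False
```

`cls(**data)` rejects unknown keys with a `TypeError`; a tolerant reader would filter the dict against the dataclass fields first.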

## dialect-specific indexes

```python
# GIN index for JSONB operators (containment, key existence), created on PostgreSQL only
sa.Index(
    "ix_events__related_resource_ids_gin",
    "related_resource_ids",
    postgresql_using="gin",
).ddl_if(dialect="postgresql")

# partial index (declared for both dialects)
sa.Index(
    "ix_flow_run__state_type_scheduled",
    cls.deployment_id,
    cls.auto_scheduled,
    cls.next_scheduled_start_time,
    postgresql_where=cls.state_type == StateType.SCHEDULED,
    sqlite_where=cls.state_type == StateType.SCHEDULED,
)
```
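
The partial index can be tried out in SQLite with the standard library (the GIN index has no SQLite equivalent). The trimmed `flow_run` table and the `'SCHEDULED'` string encoding of the state type are assumptions for illustration:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE flow_run (
        id TEXT PRIMARY KEY,
        deployment_id TEXT,
        auto_scheduled BOOLEAN,
        next_scheduled_start_time TEXT,
        state_type TEXT
    );
    -- only SCHEDULED rows are indexed, so the index stays small
    CREATE INDEX ix_flow_run__state_type_scheduled
        ON flow_run (deployment_id, auto_scheduled, next_scheduled_start_time)
        WHERE state_type = 'SCHEDULED';
""")

plan = conn.execute(
    """
    EXPLAIN QUERY PLAN
    SELECT id FROM flow_run
    WHERE deployment_id = ? AND state_type = 'SCHEDULED'
    """,
    ("dep-1",),
).fetchall()
details = [row[-1] for row in plan]  # last column holds the human-readable step
print(details)
```

SQLite only considers a partial index when the query's WHERE clause contains the index's WHERE term, which is why the query repeats `state_type = 'SCHEDULED'`; dropping that condition falls back to a table scan.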
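
## default value pattern

columns declare both a server-side and an ORM-side default, so a row gets a value whether it is inserted through the ORM or with raw SQL:
```python
created: Mapped[DateTime] = mapped_column(
    server_default=sa.func.now(),  # database-side: used when an INSERT omits the column
    default=lambda: now("UTC"),    # ORM-side: computed in Python at flush time
)
```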
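
The two paths can be sketched with `sqlite3`: a raw INSERT that omits `created` falls back to the column's database default, while the "ORM" path supplies the value itself. The table and the ISO-8601 text encoding are hypothetical, and `datetime.now(timezone.utc)` stands in for `now("UTC")`:

```python
import sqlite3
from datetime import datetime, timezone

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE flow (
        id INTEGER PRIMARY KEY,
        name TEXT NOT NULL,
        -- database-side default: filled in when an INSERT omits the column
        created TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now'))
    )
""")

# raw SQL path: no value supplied, so the server default applies
conn.execute("INSERT INTO flow (name) VALUES ('from-sql')")

# ORM-like path: the application computes the value before the INSERT
conn.execute(
    "INSERT INTO flow (name, created) VALUES (?, ?)",
    ("from-orm", datetime.now(timezone.utc).isoformat()),
)

rows = conn.execute("SELECT name, created FROM flow ORDER BY id").fetchall()
print(rows)  # both rows carry a timestamp, whichever side produced it
```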