# prefect server in zig

## prefect-server roadmap

comparison with the python prefect server. checkmarks (✓) indicate implemented features.
### api endpoints

#### core (required for basic flow execution)
- GET /api/health
- GET /api/csrf-token
- GET /api/admin/version
- POST /api/flows/
- GET /api/flows/{id}
- POST /api/flows/filter
- PATCH /api/flows/{id}
- DELETE /api/flows/{id}
- POST /api/flow_runs/
- GET /api/flow_runs/{id}
- POST /api/flow_runs/{id}/set_state
- POST /api/flow_runs/filter
- PATCH /api/flow_runs/{id}
- DELETE /api/flow_runs/{id}
- POST /api/logs/
- POST /api/logs/filter
- WS /api/events/in
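Taken together, the core endpoints above are enough to drive one flow run end to end. A hypothetical sketch of the client call sequence — the paths come from the list, but the body fields and the `<id>` placeholders are assumptions modeled on the python prefect REST API:

```python
# Sketch: the minimal (method, path, body) sequence a client issues for one
# flow run. Body field names are assumptions; only the paths are from the
# roadmap above. "<id>" stands in for ids returned by earlier responses.

def flow_run_lifecycle(flow_name: str) -> list[tuple[str, str, dict]]:
    """Return the ordered calls needed to create and complete one run."""
    return [
        ("POST", "/api/flows/", {"name": flow_name}),
        ("POST", "/api/flow_runs/", {"flow_id": "<id>"}),
        ("POST", "/api/flow_runs/<id>/set_state", {"state": {"type": "PENDING"}}),
        ("POST", "/api/flow_runs/<id>/set_state", {"state": {"type": "RUNNING"}}),
        ("POST", "/api/flow_runs/<id>/set_state", {"state": {"type": "COMPLETED"}}),
    ]

calls = flow_run_lifecycle("etl")
```

Everything else in the roadmap (deployments, workers, scheduling) layers on top of this sequence.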
#### task runs
- POST /api/task_runs/
- GET /api/task_runs/{id}
- POST /api/task_runs/{id}/set_state
- POST /api/task_runs/filter
#### state endpoints
- GET /api/flow_run_states/{id}
- GET /api/flow_run_states/?flow_run_id=...
- GET /api/task_run_states/{id}
- GET /api/task_run_states/?task_run_id=...
#### deployments
- POST /api/deployments/
- GET /api/deployments/{id}
- GET /api/deployments/name/{flow_name}/{deployment_name}
- POST /api/deployments/filter
- POST /api/deployments/count
- PATCH /api/deployments/{id}
- DELETE /api/deployments/{id}
- POST /api/deployments/{id}/create_flow_run
- POST /api/deployments/{id}/pause_deployment
- POST /api/deployments/{id}/resume_deployment
- GET /api/deployments/{id}/schedules
- POST /api/deployments/{id}/schedules
- PATCH /api/deployments/{id}/schedules/{schedule_id}
- DELETE /api/deployments/{id}/schedules/{schedule_id}
- POST /api/deployments/get_scheduled_flow_runs
#### work pools & workers
- POST /api/work_pools/
- GET /api/work_pools/{name}
- PATCH /api/work_pools/{name}
- DELETE /api/work_pools/{name}
- POST /api/work_pools/filter
- POST /api/work_pools/{name}/queues/
- GET /api/work_pools/{name}/queues/{queue_name}
- PATCH /api/work_pools/{name}/queues/{queue_name}
- DELETE /api/work_pools/{name}/queues/{queue_name}
- POST /api/work_pools/{name}/queues/filter
- POST /api/work_pools/{name}/workers/heartbeat
- POST /api/work_pools/{name}/workers/filter
- DELETE /api/work_pools/{name}/workers/{worker_name}
- POST /api/work_pools/{name}/get_scheduled_flow_runs
#### blocks
- POST /api/block_types/
- POST /api/block_types/filter
- GET /api/block_types/slug/{slug}
- PATCH /api/block_types/{id}
- GET /api/block_types/slug/{slug}/block_documents
- GET /api/block_types/slug/{slug}/block_documents/name/{name}
- POST /api/block_schemas/
- POST /api/block_schemas/filter
- GET /api/block_schemas/{id}
- GET /api/block_schemas/checksum/{checksum}
- POST /api/block_documents/
- POST /api/block_documents/filter
- GET /api/block_documents/{id}
- PATCH /api/block_documents/{id}
- DELETE /api/block_documents/{id}
- GET /api/block_capabilities/ (optional - discovery only, not used by client)
#### concurrency (v2 only - skip v1 API)
- POST /api/v2/concurrency_limits/
- POST /api/v2/concurrency_limits/filter
- GET /api/v2/concurrency_limits/{id}
- DELETE /api/v2/concurrency_limits/{id}
- POST /api/v2/concurrency_limits/increment (acquire slots, returns lease_id)
- POST /api/v2/concurrency_limits/decrement (release slots by limit names)
- lease storage (memory default, redis for HA - matches Python)
- lease_cleanup background service (expires stale leases)
- note: in python prefect, v1 tag-based concurrency was rebuilt on top of the v2 implementation. we only implement the v2 API to avoid the complexity of supporting both.
#### artifacts
- POST /api/artifacts/
- POST /api/artifacts/filter
#### automations
- POST /api/automations/
- POST /api/automations/filter
#### variables
- POST /api/variables/
- POST /api/variables/filter
- POST /api/variables/count
- GET /api/variables/{id}
- GET /api/variables/name/{name}
- PATCH /api/variables/{id}
- PATCH /api/variables/name/{name}
- DELETE /api/variables/{id}
- DELETE /api/variables/name/{name}
#### events
- POST /api/events/filter
- GET /api/events/count
- WS /api/events/out (subscribe)
#### ui endpoints
- POST /api/ui/flows/
- POST /api/ui/flow_runs/
- POST /api/ui/task_runs/
- GET /api/ui/schemas/
### orchestration

#### global transforms (bookkeeping)
- SetStartTime - set start_time when first entering RUNNING
- SetEndTime - set end_time when entering terminal state
- IncrementRunTime - accumulate total_run_time when exiting RUNNING
- IncrementRunCount - increment run_count when entering RUNNING
#### orchestration rules framework
- ResponseStatus enum (ACCEPT, REJECT, WAIT, ABORT)
- OrchestrationRule abstraction (from_states, to_states, before_transition)
- Policy composition (CoreFlowPolicy as ordered list of rules)
- applyPolicy function to run rules in order
#### flow run rules (CoreFlowPolicy)
- PreventPendingTransitions - reject PENDING/RUNNING/CANCELLING/CANCELLED → PENDING
- CopyScheduledTime - copy scheduled_time when SCHEDULED → PENDING
- WaitForScheduledTime - delay transition if scheduled_time in future
- RetryFailedFlows - schedule retry when RUNNING → FAILED if retries available
- PreventDuplicateTransitions - idempotency via transition_id
- HandleFlowTerminalStateTransitions - prevent leaving completed with persisted data
- HandlePausingFlows - govern RUNNING → PAUSED
- HandleResumingPausedFlows - govern PAUSED → RUNNING
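RetryFailedFlows is the most involved of these rules: it rejects the proposed FAILED state and substitutes a scheduled retry. A sketch, with field names (`retries`, `run_count`, `retry_delay_seconds`) assumed from python prefect:

```python
# Sketch of RetryFailedFlows: on RUNNING -> FAILED with retries remaining,
# reject and substitute an AwaitingRetry (SCHEDULED) state instead of
# letting the run fail. Field names are assumptions.

def retry_failed_flows(ctx: dict) -> dict:
    run = ctx["run"]
    if (ctx["from"] == "RUNNING" and ctx["to"] == "FAILED"
            and run["run_count"] <= run["retries"]):
        return {
            "status": "REJECT",
            # the proposed FAILED state is replaced by a future retry
            "substitute": {"type": "SCHEDULED", "name": "AwaitingRetry",
                           "delay_seconds": run["retry_delay_seconds"]},
        }
    return {"status": "ACCEPT"}

ctx = {"from": "RUNNING", "to": "FAILED",
       "run": {"run_count": 1, "retries": 2, "retry_delay_seconds": 10}}
result = retry_failed_flows(ctx)
```

Note that the rejection substitutes a state rather than erroring: the client sees a SCHEDULED response and simply waits, which is what makes server-side retries transparent.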
#### task run rules (CoreTaskPolicy)
- CacheRetrieval - check for cached results before execution
- CacheInsertion - write completed results to cache
- PreventRunningTasksFromStoppedFlows - tasks can't run if flow isn't running
- RetryFailedTasks - govern retry transitions
#### other
- concurrency limits (v2 only)
### background services
- event_persister (batched event writes, deduplication, retention trimming)
- event_broadcaster (websocket fan-out to /api/events/out subscribers)
- scheduler (create flow runs from deployment schedules - interval + cron)
- lease_cleanup (expire stale concurrency leases)
- late_runs (mark runs as late)
- foreman (infrastructure management)
- cancellation_cleanup (clean up cancelled runs)
- pause_expirations (expire paused runs)
- task_run_recorder (record task run events)
- telemetry
### message broker
- memory broker (growable queue, in-process)
- redis broker (redis streams: XADD/XREADGROUP/XACK)
- broker abstraction layer (broker/core.zig)
- fan-out to consumer groups
### database
- flow table
- flow_run table
- flow_run_state table
- events table
- task_run table
- task_run_state table
- deployment table
- deployment_schedule table
- work_pool table
- work_queue table
- worker table
- block_type table
- block_document table
- block_schema table
- concurrency_limit table
- artifact table
- automation table
- variable table
- log table
#### database backends
- sqlite (zqlite)
- postgres (pg.zig)
- backend abstraction layer (db/backend.zig)
### infrastructure
- http server (zap/facil.io)
- websocket support
- sqlite database (zqlite)
- postgres database (pg.zig)
- structured logging
- service lifecycle management
- postgres connection pooling (pg.zig built-in)
- sqlite connection pooling
- migrations
- graceful shutdown
- configuration file support
### notes

#### implementation order for worker-based execution
workers are the primitive - they poll for work and execute runs. order matters:
1. work_pool - infrastructure configuration container ✓
   - table + CRUD API
   - types: process, docker, kubernetes, etc.
2. work_queue - routes runs to workers ✓
   - table + CRUD API
   - each pool has a default queue
   - queues have priority
3. worker heartbeat - workers register and poll ✓
   - POST /work_pools/{name}/workers/heartbeat
   - tracks worker health and last seen
4. deployment - flow + schedule + work pool binding ✓
   - table + CRUD API
   - links flow_id → work_pool → work_queue
   - .serve() support via POST /deployments/get_scheduled_flow_runs
5. deployment_schedule - cron/interval/rrule schedules ✓
   - table, linked to deployment
   - CRUD API for managing schedules
6. scheduler service - creates runs from schedules ✓
   - background service
   - queries deployments needing runs
   - creates flow_runs in SCHEDULED state
   - supports interval + cron schedules
   - parameter merging (schedule overrides deployment)
   - idempotent via idempotency_key
7. get_scheduled_flow_runs - workers poll for work ✓
   - POST /work_pools/{name}/get_scheduled_flow_runs
   - returns runs ready to execute
   - updates pool/deployment status to READY
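The scheduler's idempotency can be sketched as a deterministic key per deployment/slot, so re-running the scheduler loop never double-creates runs. The key format below is an assumption (python prefect uses a similar scheme); a unique column on the flow_run table plays the role of the dict here:

```python
from datetime import datetime, timezone

# Sketch: a deterministic idempotency key makes scheduled-run creation a
# no-op on repeat. The key format is an assumption for illustration.

def idempotency_key(deployment_id: str, scheduled_time: datetime) -> str:
    return f"scheduled {deployment_id} {scheduled_time.isoformat()}"

created: dict[str, dict] = {}  # stands in for a UNIQUE-constrained column

def schedule_run(deployment_id: str, scheduled_time: datetime,
                 parameters: dict) -> bool:
    """Create a SCHEDULED flow run; return False if it already exists."""
    key = idempotency_key(deployment_id, scheduled_time)
    if key in created:
        return False  # scheduler re-ran: insert is a no-op
    created[key] = {"state": "SCHEDULED", "scheduled_time": scheduled_time,
                    "parameters": parameters}
    return True

t = datetime(2025, 1, 1, tzinfo=timezone.utc)
first = schedule_run("dep-1", t, {"x": 1})   # created
second = schedule_run("dep-1", t, {"x": 1})  # duplicate pass, skipped
```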
#### short-term plan

- log persistence ✓ - logs now persisted to database
- automations - event-driven triggers, core feature for production use
#### what's working (~5x faster than python)
- flow/flow_run/task_run lifecycle
- blocks (types, schemas, documents)
- variables (full CRUD)
- work pools, work queues, workers (full CRUD + heartbeat)
- deployments + schedules (full CRUD, .serve() support)
- scheduler service (interval + cron, idempotent, parameter merging)
- late_runs service (marks overdue scheduled runs as Late)
- get_scheduled_flow_runs (worker polling)
- events (ingest via websocket, persist, broadcast with filtered backfill)
- concurrency limits v2 (CRUD, lease-based slots, memory/redis storage)
- dual database backends (sqlite/postgres)
- dual message brokers (memory/redis)