prefect server in zig

prefect-server roadmap#

comparison with the python prefect server. checkmarks indicate implemented items.

api endpoints#

core (required for basic flow execution)#

  • GET /api/health
  • GET /api/csrf-token
  • GET /api/admin/version
  • POST /api/flows/
  • GET /api/flows/{id}
  • POST /api/flows/filter
  • PATCH /api/flows/{id}
  • DELETE /api/flows/{id}
  • POST /api/flow_runs/
  • GET /api/flow_runs/{id}
  • POST /api/flow_runs/{id}/set_state
  • POST /api/flow_runs/filter
  • PATCH /api/flow_runs/{id}
  • DELETE /api/flow_runs/{id}
  • POST /api/logs/
  • POST /api/logs/filter
  • WS /api/events/in

task runs#

  • POST /api/task_runs/
  • GET /api/task_runs/{id}
  • POST /api/task_runs/{id}/set_state
  • POST /api/task_runs/filter

state endpoints#

  • GET /api/flow_run_states/{id}
  • GET /api/flow_run_states/?flow_run_id=...
  • GET /api/task_run_states/{id}
  • GET /api/task_run_states/?task_run_id=...

deployments#

  • POST /api/deployments/
  • GET /api/deployments/{id}
  • GET /api/deployments/name/{flow_name}/{deployment_name}
  • POST /api/deployments/filter
  • POST /api/deployments/count
  • PATCH /api/deployments/{id}
  • DELETE /api/deployments/{id}
  • POST /api/deployments/{id}/create_flow_run
  • POST /api/deployments/{id}/pause_deployment
  • POST /api/deployments/{id}/resume_deployment
  • GET /api/deployments/{id}/schedules
  • POST /api/deployments/{id}/schedules
  • PATCH /api/deployments/{id}/schedules/{schedule_id}
  • DELETE /api/deployments/{id}/schedules/{schedule_id}
  • POST /api/deployments/get_scheduled_flow_runs

work pools & workers#

  • POST /api/work_pools/
  • GET /api/work_pools/{name}
  • PATCH /api/work_pools/{name}
  • DELETE /api/work_pools/{name}
  • POST /api/work_pools/filter
  • POST /api/work_pools/{name}/queues/
  • GET /api/work_pools/{name}/queues/{queue_name}
  • PATCH /api/work_pools/{name}/queues/{queue_name}
  • DELETE /api/work_pools/{name}/queues/{queue_name}
  • POST /api/work_pools/{name}/queues/filter
  • POST /api/work_pools/{name}/workers/heartbeat
  • POST /api/work_pools/{name}/workers/filter
  • DELETE /api/work_pools/{name}/workers/{worker_name}
  • POST /api/work_pools/{name}/get_scheduled_flow_runs

blocks#

  • POST /api/block_types/
  • POST /api/block_types/filter
  • GET /api/block_types/slug/{slug}
  • PATCH /api/block_types/{id}
  • GET /api/block_types/slug/{slug}/block_documents
  • GET /api/block_types/slug/{slug}/block_documents/name/{name}
  • POST /api/block_schemas/
  • POST /api/block_schemas/filter
  • GET /api/block_schemas/{id}
  • GET /api/block_schemas/checksum/{checksum}
  • POST /api/block_documents/
  • POST /api/block_documents/filter
  • GET /api/block_documents/{id}
  • PATCH /api/block_documents/{id}
  • DELETE /api/block_documents/{id}
  • GET /api/block_capabilities/ (optional - discovery only, not used by client)

concurrency (v2 only - skip v1 API)#

  • POST /api/v2/concurrency_limits/
  • POST /api/v2/concurrency_limits/filter
  • GET /api/v2/concurrency_limits/{id}
  • DELETE /api/v2/concurrency_limits/{id}
  • POST /api/v2/concurrency_limits/increment (acquire slots, returns lease_id)
  • POST /api/v2/concurrency_limits/decrement (release slots by limit names)
  • lease storage (memory default, redis for HA - matches Python)
  • lease_cleanup background service (expires stale leases)
  • note: in python prefect, v1 tag-based concurrency was reimplemented on top of the v2 system. we should implement only the v2 API to avoid the complexity of supporting both.
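the increment/decrement lease flow above can be sketched as follows. this is a minimal in-memory model, not the actual implementation: the names `ConcurrencyLimiter`, `MemoryLeaseStorage`, and the tuple lease format are illustrative assumptions; the real server would back this with the database and the redis lease store for HA.

```python
import time
import uuid
from dataclasses import dataclass

@dataclass
class ConcurrencyLimit:
    name: str
    limit: int
    active_slots: int = 0

class MemoryLeaseStorage:
    """In-memory lease store: lease_id -> (limit_names, slots, expires_at)."""
    def __init__(self):
        self.leases = {}

    def create(self, limit_names, slots, ttl_seconds):
        lease_id = str(uuid.uuid4())
        self.leases[lease_id] = (limit_names, slots, time.monotonic() + ttl_seconds)
        return lease_id

    def revoke(self, lease_id):
        return self.leases.pop(lease_id, None)

    def expired(self, now=None):
        """Leases past their TTL; a lease_cleanup service would decrement these."""
        now = time.monotonic() if now is None else now
        return [lid for lid, (_, _, exp) in self.leases.items() if exp <= now]

class ConcurrencyLimiter:
    def __init__(self, storage):
        self.limits = {}   # name -> ConcurrencyLimit
        self.storage = storage

    def increment(self, names, slots=1, ttl_seconds=300):
        """Acquire `slots` on every named limit atomically; return a lease_id or None."""
        limits = [self.limits[n] for n in names]
        if any(l.active_slots + slots > l.limit for l in limits):
            return None  # no capacity -> caller should wait and retry
        for l in limits:
            l.active_slots += slots
        return self.storage.create(tuple(names), slots, ttl_seconds)

    def decrement(self, lease_id):
        """Release the slots held by a lease (also used when a lease expires)."""
        lease = self.storage.revoke(lease_id)
        if lease is None:
            return False
        names, slots, _ = lease
        for n in names:
            self.limits[n].active_slots -= slots
        return True
```

the key property is that increment either acquires all named limits or none, and every acquisition is tied to a lease so a crashed client's slots are eventually reclaimed by cleanup.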

artifacts#

  • POST /api/artifacts/
  • POST /api/artifacts/filter

automations#

  • POST /api/automations/
  • POST /api/automations/filter

variables#

  • POST /api/variables/
  • POST /api/variables/filter
  • POST /api/variables/count
  • GET /api/variables/{id}
  • GET /api/variables/name/{name}
  • PATCH /api/variables/{id}
  • PATCH /api/variables/name/{name}
  • DELETE /api/variables/{id}
  • DELETE /api/variables/name/{name}

events#

  • POST /api/events/filter
  • GET /api/events/count
  • WS /api/events/out (subscribe)

ui endpoints#

  • POST /api/ui/flows/
  • POST /api/ui/flow_runs/
  • POST /api/ui/task_runs/
  • GET /api/ui/schemas/

orchestration#

global transforms (bookkeeping)#

  • SetStartTime - set start_time when first entering RUNNING
  • SetEndTime - set end_time when entering terminal state
  • IncrementRunTime - accumulate total_run_time when exiting RUNNING
  • IncrementRunCount - increment run_count when entering RUNNING

orchestration rules framework#

  • ResponseStatus enum (ACCEPT, REJECT, WAIT, ABORT)
  • OrchestrationRule abstraction (from_states, to_states, before_transition)
  • Policy composition (CoreFlowPolicy as ordered list of rules)
  • applyPolicy function to run rules in order
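the framework above reduces to: filter rules by from/to state, run them in order, stop at the first non-ACCEPT. a rough sketch, assuming a dict-shaped transition context (the real context type is richer):

```python
from enum import Enum

class ResponseStatus(Enum):
    ACCEPT = "accept"
    REJECT = "reject"
    WAIT = "wait"
    ABORT = "abort"

class OrchestrationRule:
    from_states = None   # None means "matches any state"
    to_states = None

    def before_transition(self, ctx):
        return ResponseStatus.ACCEPT

def apply_policy(rules, ctx):
    """Run each rule whose from/to filters match; the first non-ACCEPT wins."""
    for rule in rules:
        if rule.from_states is not None and ctx["from"] not in rule.from_states:
            continue
        if rule.to_states is not None and ctx["to"] not in rule.to_states:
            continue
        status = rule.before_transition(ctx)
        if status is not ResponseStatus.ACCEPT:
            return status
    return ResponseStatus.ACCEPT

class PreventPendingTransitions(OrchestrationRule):
    from_states = {"PENDING", "RUNNING", "CANCELLING", "CANCELLED"}
    to_states = {"PENDING"}

    def before_transition(self, ctx):
        return ResponseStatus.REJECT  # never re-enter PENDING from these states
```

CoreFlowPolicy is then just an ordered list of such rules handed to apply_policy, so rule ordering is the policy.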

flow run rules (CoreFlowPolicy)#

  • PreventPendingTransitions - reject PENDING/RUNNING/CANCELLING/CANCELLED → PENDING
  • CopyScheduledTime - copy scheduled_time when SCHEDULED → PENDING
  • WaitForScheduledTime - delay transition if scheduled_time in future
  • RetryFailedFlows - schedule retry when RUNNING → FAILED if retries available
  • PreventDuplicateTransitions - idempotency via transition_id
  • HandleFlowTerminalStateTransitions - block transitions out of terminal states once result data has been persisted
  • HandlePausingFlows - govern RUNNING → PAUSED
  • HandleResumingPausedFlows - govern PAUSED → RUNNING
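two of the rules above carry the interesting decisions: WaitForScheduledTime returns WAIT with a delay, and RetryFailedFlows rewrites a FAILED proposal into a retry. a simplified sketch using plain numbers for timestamps (the function shapes and the "AwaitingRetry" label are illustrative, not the exact implementation):

```python
def wait_for_scheduled_time(run, proposed_state, now):
    """SCHEDULED -> PENDING: if scheduled_time is still in the future, respond WAIT."""
    scheduled = run.get("scheduled_time")
    if proposed_state == "PENDING" and scheduled is not None and scheduled > now:
        return ("WAIT", scheduled - now)  # delay before the client should retry
    return ("ACCEPT", 0)

def retry_failed_flows(run, from_state, proposed_state):
    """RUNNING -> FAILED: if retries remain, reject FAILED and reschedule instead."""
    if from_state == "RUNNING" and proposed_state == "FAILED":
        if run["run_count"] <= run["retries"]:
            return ("REJECT", "AwaitingRetry")  # server substitutes a scheduled retry
    return ("ACCEPT", None)
```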

task run rules (CoreTaskPolicy)#

  • CacheRetrieval - check for cached results before execution
  • CacheInsertion - write completed results to cache
  • PreventRunningTasksFromStoppedFlows - tasks can't run if flow isn't running
  • RetryFailedTasks - govern retry transitions
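the cache pair above is symmetric: retrieval runs before a task enters RUNNING and short-circuits if an unexpired result exists; insertion runs after COMPLETED and writes the result under the cache key. a toy sketch with a module-level dict standing in for the database-backed cache:

```python
cache = {}  # cache_key -> (result, expires_at); None expiry means "never expires"

def cache_retrieval(task_run, proposed_state, now):
    """Before RUNNING: reject the transition if an unexpired cached result exists."""
    key = task_run.get("cache_key")
    if proposed_state == "RUNNING" and key in cache:
        result, expires_at = cache[key]
        if expires_at is None or expires_at > now:
            return ("REJECT", ("CACHED", result))  # skip execution, reuse the result
    return ("ACCEPT", None)

def cache_insertion(task_run, new_state, result):
    """After COMPLETED: persist the result under the task run's cache key."""
    key = task_run.get("cache_key")
    if new_state == "COMPLETED" and key is not None:
        cache[key] = (result, task_run.get("cache_expiration"))
```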

other#

  • concurrency limits (v2 only)

background services#

  • event_persister (batched event writes, deduplication, retention trimming)
  • event_broadcaster (websocket fan-out to /api/events/out subscribers)
  • scheduler (create flow runs from deployment schedules - interval + cron)
  • lease_cleanup (expire stale concurrency leases)
  • late_runs (mark runs as late)
  • foreman (infrastructure management)
  • cancellation_cleanup (clean up cancelled runs)
  • pause_expirations (expire paused runs)
  • task_run_recorder (record task run events)
  • telemetry

message broker#

  • memory broker (growable queue, in-process)
  • redis broker (redis streams: XADD/XREADGROUP/XACK)
  • broker abstraction layer (broker/core.zig)
  • fan-out to consumer groups
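the fan-out semantics the broker abstraction needs are small: every consumer group receives every message, and members of a group share that group's queue. a minimal in-process model (this mirrors the memory broker's behavior; the redis broker gets the same semantics from streams consumer groups):

```python
from collections import deque

class MemoryBroker:
    """Each consumer group gets its own queue, so every group sees every message."""
    def __init__(self):
        self.groups = {}   # group name -> deque of messages

    def subscribe(self, group):
        self.groups.setdefault(group, deque())

    def publish(self, message):
        for queue in self.groups.values():   # fan-out: one copy per group
            queue.append(message)

    def consume(self, group):
        """Pop the next message for this group, or None if its queue is empty."""
        queue = self.groups[group]
        return queue.popleft() if queue else None
```

with this shape, event_persister and task_run_recorder can each subscribe as their own group and process the full event stream independently.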

database#

  • flow table
  • flow_run table
  • flow_run_state table
  • events table
  • task_run table
  • task_run_state table
  • deployment table
  • deployment_schedule table
  • work_pool table
  • work_queue table
  • worker table
  • block_type table
  • block_document table
  • block_schema table
  • concurrency_limit table
  • artifact table
  • automation table
  • variable table
  • log table

database backends#

  • sqlite (zqlite)
  • postgres (pg.zig)
  • backend abstraction layer (db/backend.zig)

infrastructure#

  • http server (zap/facil.io)
  • websocket support
  • sqlite database (zqlite)
  • postgres database (pg.zig)
  • structured logging
  • service lifecycle management
  • postgres connection pooling (pg.zig built-in)
  • sqlite connection pooling
  • migrations
  • graceful shutdown
  • configuration file support

notes#

implementation order for worker-based execution#

workers are the primitive - they poll for work and execute runs. order matters:

  1. work_pool - infrastructure configuration container

    • table + CRUD API
    • types: process, docker, kubernetes, etc.
  2. work_queue - routes runs to workers

    • table + CRUD API
    • each pool has a default queue
    • queues have priority
  3. worker heartbeat - workers register and poll

    • POST /work_pools/{name}/workers/heartbeat
    • tracks worker health and last seen
  4. deployment - flow + schedule + work pool binding

    • table + CRUD API
    • links flow_id → work_pool → work_queue
    • .serve() support via POST /deployments/get_scheduled_flow_runs
  5. deployment_schedule - cron/interval/rrule schedules

    • table, linked to deployment
    • CRUD API for managing schedules
  6. scheduler service - creates runs from schedules

    • background service
    • queries deployments needing runs
    • creates flow_runs in SCHEDULED state
    • supports interval + cron schedules
    • parameter merging (schedule overrides deployment)
    • idempotent via idempotency_key
  7. get_scheduled_flow_runs - workers poll for work

    • POST /work_pools/{name}/get_scheduled_flow_runs
    • returns runs ready to execute
    • updates pool/deployment status to READY
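the scheduler's idempotency (step 6) hinges on deriving a stable key from the deployment and the slot time, so re-running the service never duplicates runs. a sketch with integer timestamps and an in-memory key set standing in for the database's unique constraint (the key format shown is an assumption):

```python
def next_interval_slots(anchor, interval, after, count):
    """The next `count` scheduled times for an interval schedule, strictly after `after`."""
    elapsed = max(0, (after - anchor) // interval + 1)
    return [anchor + interval * (elapsed + i) for i in range(count)]

existing_keys = set()  # keys of already-created scheduled runs (a DB lookup in practice)

def schedule_runs(deployment_id, anchor, interval, now, horizon=3):
    created = []
    for slot in next_interval_slots(anchor, interval, now, horizon):
        key = f"scheduled {deployment_id} {slot}"  # idempotency key
        if key in existing_keys:
            continue  # a run for this slot already exists; the scheduler is re-entrant
        existing_keys.add(key)
        created.append((key, slot))
    return created
```

running the service twice over the same horizon creates runs only once, which is what lets the scheduler restart safely at any point.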

short-term plan#

  1. log persistence ✓ - logs now persisted to database
  2. automations - event-driven triggers, core feature for production use

what's working (~5x faster than python)#

  • flow/flow_run/task_run lifecycle
  • blocks (types, schemas, documents)
  • variables (full CRUD)
  • work pools, work queues, workers (full CRUD + heartbeat)
  • deployments + schedules (full CRUD, .serve() support)
  • scheduler service (interval + cron, idempotent, parameter merging)
  • late_runs service (marks overdue scheduled runs as Late)
  • get_scheduled_flow_runs (worker polling)
  • events (ingest via websocket, persist, broadcast with filtered backfill)
  • concurrency limits v2 (CRUD, lease-based slots, memory/redis storage)
  • dual database backends (sqlite/postgres)
  • dual message brokers (memory/redis)