commits
- add sqlite to nixpkgs for sanity check queries
- remove postgres test - refuses to run as root in CI container
- postgres must be tested locally: ./scripts/test-db-backends postgres
CI now covers: sqlite + memory, sqlite + redis
full matrix (including postgres) tested locally before push
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
test scripts use grep, tr, kill, sleep, etc.
- CLAUDE.md: add "before pushing new features" section requiring
full backend matrix tests (sqlite/postgres + memory/redis)
- CI: test all backend combinations via Nix packages:
- sqlite + memory broker (via test scripts)
- sqlite + redis broker (native redis-server)
- postgres + memory broker (native postgres via initdb)
- uses postgres and redis from nixpkgs, no Docker needed
- reactive trigger evaluation via broker subscription
- proactive trigger evaluation via periodic loop
- bucket-based counting for threshold triggers
- action execution: run-deployment, pause/resume-deployment,
cancel-flow-run, pause/resume-automation
- in-memory automation cache with refresh
- add deployments.updatePaused() helper
split into modules to stay under 500-line limit:
- automations.zig: main service loop, cache management
- automations/triggers.zig: trigger parsing and evaluation
- automations/actions.zig: action execution
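a minimal Python sketch of the bucket-counting idea (hypothetical shape and field names; the real logic lives in automations/triggers.zig): each event increments a per-trigger bucket, and the trigger fires when the count reaches the threshold inside the bucket's time window.

```python
class ThresholdBucket:
    """Counts events toward a threshold within a rolling window."""

    def __init__(self, threshold: int, within_seconds: float):
        self.threshold = threshold
        self.within = within_seconds
        self.start = None  # window start time, None until first event
        self.count = 0

    def observe(self, t: float) -> bool:
        # start a fresh bucket when empty or when the window has elapsed
        if self.start is None or t - self.start >= self.within:
            self.start = t
            self.count = 0
        self.count += 1
        return self.count >= self.threshold
```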
- add 005_automations migration (automation, automation_bucket tables)
- add db/automations.zig with CRUD and bucket operations
- add api/automations.zig endpoints (create, get, list, delete, patch)
- wire automations route in routes.zig
- fix api/logs.zig JSON building to use std.json.Stringify
- fix api/automations.zig to use std.json.Stringify with beginWriteRaw
for raw JSON passthrough of pre-serialized fields
- create log table with migration 004_log_table
- POST /api/logs/ - batch insert logs (name, level, message, timestamp, flow_run_id, task_run_id)
- POST /api/logs/filter - filter by flow_run_id, task_run_id, level (ge/le), timestamp (after/before)
- supports both sqlite and postgres backends
- logs survive server restart (previously in-memory only)
- GET /api/flow_run_states/{id} - get state by ID
- GET /api/flow_run_states/?flow_run_id=... - list states for flow run
- GET /api/task_run_states/{id} - get state by ID
- GET /api/task_run_states/?task_run_id=... - list states for task run
also:
- add task_run_state table to migrations
- record state history in task_runs.setState (matches flow_runs)
- condense test functions to meet line limit
marks flow runs as Late if they haven't started within
PREFECT_API_SERVICES_LATE_RUNS_AFTER_SECONDS (default 15s)
of their scheduled start time.
- add getLateRuns query to flow_runs.zig
- add late_runs service with configurable threshold and loop interval
- register service in services.zig
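the rule above reduces to a simple predicate; a Python sketch (hypothetical names — the real service queries the database via getLateRuns):

```python
from datetime import datetime, timedelta

# a SCHEDULED run becomes Late once `threshold_seconds` have passed
# since its scheduled start time without the run having started
def is_late(scheduled_start: datetime, now: datetime,
            started: bool, threshold_seconds: float = 15.0) -> bool:
    if started:
        return False
    return now >= scheduled_start + timedelta(seconds=threshold_seconds)
```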
leases are NEVER stored in the database (matches Python behavior):
- memory storage: default for single-instance deployments
- redis storage: when PREFECT_BROKER_BACKEND=redis (HA)
moved from src/concurrency/lease_storage.zig to src/leases/:
- memory.zig: in-memory lease storage
- redis.zig: redis-backed lease storage
- storage.zig: unified interface
- add lease_cleanup service (src/services/lease_cleanup.zig)
- runs every 10 seconds
- processes up to 100 expired leases per cycle
- releases slots and revokes expired leases automatically
- register service in services.zig
completes concurrency limits v2 implementation
- add in-memory lease storage module (src/concurrency/lease_storage.zig)
- thread-safe singleton with mutex protection
- lease CRUD: create, read, renew, revoke
- TTL-based expiration tracking
- extend API with lease endpoints:
- POST /increment now creates leases (mode=concurrency, default)
- POST /decrement supports lease_id for release
- POST /leases/filter - list active leases
- POST /leases/renew - extend lease TTL
- DELETE /leases/{id} - revoke lease and release slots
- add mode parameter: "concurrency" (with leases) vs "rate_limit" (no leases)
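a Python sketch of the TTL-based lease store described above (the shape is an assumption transliterated from src/concurrency/lease_storage.zig — a mutex-protected map of lease id to slots and expiry):

```python
import threading
import time
import uuid

class LeaseStorage:
    """Minimal in-memory lease store: create, renew, revoke, expire."""

    def __init__(self):
        self._lock = threading.Lock()
        # lease_id -> (limit_ids, slots, expires_at)
        self._leases = {}

    def create(self, limit_ids, slots, ttl_seconds, now=None):
        now = time.monotonic() if now is None else now
        lease_id = str(uuid.uuid4())
        with self._lock:
            self._leases[lease_id] = (limit_ids, slots, now + ttl_seconds)
        return lease_id

    def renew(self, lease_id, ttl_seconds, now=None):
        now = time.monotonic() if now is None else now
        with self._lock:
            lease = self._leases.get(lease_id)
            if lease is None:
                return False
            self._leases[lease_id] = (lease[0], lease[1], now + ttl_seconds)
            return True

    def expired(self, now=None):
        # leases past their expiry; a cleanup service would revoke these
        now = time.monotonic() if now is None else now
        with self._lock:
            return [lid for lid, (_, _, exp) in self._leases.items()
                    if exp <= now]

    def revoke(self, lease_id):
        with self._lock:
            return self._leases.pop(lease_id, None) is not None
```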
implements basic CRUD and slot management for global concurrency limits:
- add concurrency_limit table via migration 003
- add src/db/concurrency_limits.zig with CRUD and slot operations
- add src/api/concurrency_limits_v2.zig with endpoints:
- POST /v2/concurrency_limits/ (create)
- GET /v2/concurrency_limits/{id_or_name} (read)
- PATCH /v2/concurrency_limits/{id_or_name} (update)
- DELETE /v2/concurrency_limits/{id_or_name} (delete)
- POST /v2/concurrency_limits/filter (list)
- POST /v2/concurrency_limits/increment (acquire slots)
- POST /v2/concurrency_limits/decrement (release slots)
slot management returns 423 Locked with Retry-After header when
slots cannot be acquired. matches python prefect API.
phase 2 (lease storage) and phase 4 (cleanup service) still pending.
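the slot-acquisition decision can be sketched as follows (the Retry-After computation here is an assumption for illustration, not the actual formula):

```python
# acquire `slots` against a limit, or report 423 Locked with a
# Retry-After hint when the limit is saturated
def try_acquire(active_slots: int, limit: int, slots: int,
                slot_decay_per_second: float = 0.0):
    if active_slots + slots <= limit:
        return {"status": 200, "active_slots": active_slots + slots}
    # if slots decay over time, hint how long until enough free up;
    # otherwise fall back to a fixed retry hint
    if slot_decay_per_second > 0:
        needed = active_slots + slots - limit
        retry_after = needed / slot_decay_per_second
    else:
        retry_after = 1.0
    return {"status": 423, "headers": {"Retry-After": str(retry_after)}}
```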
matches python architecture - database stores limit config/counters,
lease storage (memory/filesystem/redis) handles transient lease state.
removes concurrency_lease table, adds proper lease storage abstraction.
documents v1 vs v2 architecture, migration story, and our v2-only
strategy. includes database schema, api endpoints, slot math, lease
storage design, and phased implementation plan.
root cause: the test was observing a partial scheduler tick. each flow run
insertion commits separately, so the test could see 25 runs mid-tick and
then 50 when the tick completed - appearing as a failure.
fix: wait for the count to stabilize before recording the baseline, ensuring
we measure after the scheduler tick completes rather than during it.
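the stabilization wait looks roughly like this (a sketch, not the test's actual helper): poll the count until two consecutive reads agree, so the baseline is taken between scheduler ticks rather than during one.

```python
import time

def wait_for_stable_count(get_count, interval=0.2, timeout=10.0):
    """Poll until two consecutive reads of get_count() match."""
    deadline = time.monotonic() + timeout
    prev = get_count()
    while time.monotonic() < deadline:
        time.sleep(interval)
        cur = get_count()
        if cur == prev:
            return cur
        prev = cur
    raise TimeoutError("count never stabilized")
```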
- flows: PATCH /flows/{id} (update tags), DELETE /flows/{id}
- flow_runs: DELETE /flow_runs/{id} (cascade deletes states)
- test-matrix: ensure port released between tests
- update ROADMAP.md
- httpSpan() and sqlSpan() now live in logfire-zig where they belong
- removed inline span formatting from backend.zig
- updated to logfire-zig with BatchSpanProcessor (200x faster)
- HTTP layer: wrap onRequest with http.request span
- DB layer: wrap exec/query methods with db.exec/db.query spans
- records errors on failures via span.recordError()
without LOGFIRE_WRITE_TOKEN, spans are no-op.
with token, exports to Logfire via OTLP/protobuf.
also fix empirical_policy type: JSONB -> TEXT for consistency with sqlite
a stack buffer for scheduled_time was escaping its scope via ctx.scheduleRetry().
moved the buffer into the RuleContext struct to guarantee its lifetime.
utilities/json.zig:
- getPath(value, "path.to.field") - navigate by dot-separated path
- getString, getInt, getBool, getFloat, getArray, getObject helpers
- extractAt(T, alloc, value, .{"path"}) - comptime typed extraction
- comprehensive tests
events_api.zig:
- parseFilterOptions: 54 lines → 28 lines using path helpers
- filterNext: simplified token field extraction
- filter: cleaner limit parsing
inspired by zat's internal/json.zig pattern
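getPath translates almost directly to Python dicts; a sketch of the idea (the typed helpers like getString/getInt are omitted here):

```python
# navigate a parsed JSON value by a dot-separated path;
# returns None when any segment is missing or not an object
def get_path(value, path: str):
    for key in path.split("."):
        if not isinstance(value, dict) or key not in value:
            return None
        value = value[key]
    return value
```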
- POST /events/filter with occurred, event, resource filters
- GET /events/filter/next with page tokens for pagination
- GET /events/count for total event count
code quality improvements:
- src/prefect.zig: clean public interface re-exporting main modules
- src/utilities/encoding.zig: base64 and URL encoding helpers
- db/backend.zig: add getBackend() for consistent access pattern
- db/events.zig: consolidate parseJsonStringArray, add column constants
- events_api.zig: use json.send/sendStatus helpers consistently
reset postgres schema before each benchmark run to ensure clean state,
matching the sqlite behavior of deleting the db file.
- use DEVNULL during normal benchmark runs to avoid pipe buffer deadlock
- on startup failure, restart with PIPE to capture error output
- display actual error message instead of generic "failed to start server"
keep valuable reference info, mark implementation status:
- blocks-implementation.md: API sequences and schema (implemented)
- timestamps.md: timing handling, .serve() vs workers (resolved)
- add docs/scratch/signal-handling.md documenting graceful shutdown fix
- delete blocks-implementation.md (blocks fully implemented)
- delete timestamps.md (issues resolved, notes outdated)
- update scratch README with current contents
main() now catches errors from runServer/runServicesOnly and returns
cleanly (exit 0) if shutdown_requested is true. this ensures parity
with Python Prefect server behavior.
- add SIGTERM and SIGINT signal handlers
- zap.stop() triggers clean shutdown when signal received
- services flush pending work (events, etc.) before exit
- add scripts/test-graceful-shutdown to verify behavior
- update roadmap and production-readiness docs
note: SIGTERM exits with code 1 due to a zap/facil.io quirk, but the
shutdown sequence completes correctly (services stop, data flushes)
documents missing features for production use:
- tier 1: concurrency limits, automations, artifacts, event filtering
- tier 2: graceful shutdown, log persistence, auth, ui endpoints
- tier 3: background services, mutation endpoints, task rules
orchestration:
- add PreventDuplicateTransitions rule for idempotent state transitions
- uses transition_id comparison to detect duplicate requests
- add state_transition_id column via migration 002
api tests:
- add serve_pattern test (Runner/.serve() workflow)
- add worker_pattern test (Worker polling work pools)
- add flow_with_task_runs test (ETL-style flow execution)
- add retry_failed_flows and cancellation_flow tests
- optimize scheduler tests: polling instead of fixed sleeps (~36% faster)
- use client-side filtering (server filter doesn't support deployment_id yet)
- .claude/commands/implement-orchestration-rule.md
- .claude/commands/update-database-schema.md
- README.md: add contributing section with playbook links
- src/db/migrate.zig: migration runner with statement parser
- src/db/migrations_data.zig: migration registry using @embedFile
- src/db/migrations/001_initial/: full schema for sqlite and postgres
- replaces old CREATE TABLE IF NOT EXISTS approach
- tracks applied migrations in _migrations table
- tested on sqlite (local) and postgres (k8s)
- docs/migrations.md: comprehensive analysis of migration options
- zmig (sqlite-only zig tool)
- atlas (external go-based tool, multi-dialect)
- minimal DIY approach
- hybrid recommendation (atlas for dev, embedded SQL for runtime)
- phased implementation plan
- references to python prefect alembic patterns
- add policy mutation flags to RuleContext (set_retry_type_in_process,
clear_retry_type, clear_pause_keys, set_resuming_false)
- RetryFailedFlows sets flags on retry: retry_type="in_process",
resuming=false, pause_keys=[]
- RetryFailedFlows clears retry_type when retries exhausted
- API handler builds updated policy JSON from flags
- setStateWithSchedule accepts optional empirical_policy update
- add tests for policy mutation flags
matches python prefect server behavior for retry policy handling.
documents how Python prefect server implements database layer:
- architecture: three-tier abstraction (config, queries, ORM)
- orm-models: SQLAlchemy models, custom types, relationships
- migrations: alembic setup, dialect-specific patterns
- query-patterns: JSON handling, time series, transactions
- zig-compat-notes: comparison with our implementation
when a flow run transitions from RUNNING to FAILED:
- if retries configured and run_count <= retries, reject FAILED
- schedule AwaitingRetry state with retry_delay offset
- return REJECT status with SCHEDULED state (matching python behavior)
changes:
- add empirical_policy column (JSON with retries/retry_delay)
- extend RuleContext with retry fields and scheduleRetry method
- implement rule in flow_rules.zig with 6 unit tests
- wire up API to parse empirical_policy and handle retry response
- add test-retry script and justfile target
- use PREFECT_PROFILE=oss for test harness (uses prefect profiles)
- add loq exceptions for files with inline tests
- reference zig 0.15 notes in CLAUDE.md
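the retry decision above can be sketched as a pure function (hypothetical shape; the real rule lives in flow_rules.zig and goes through RuleContext.scheduleRetry):

```python
# given a RUNNING -> FAILED proposal, either accept the failure or
# reject it and schedule an AwaitingRetry state after retry_delay
def on_failed(run_count: int, retries: int,
              retry_delay_seconds: float, now_ts: float):
    if retries > 0 and run_count <= retries:
        return {
            "status": "REJECT",
            "state": {
                "type": "SCHEDULED",
                "name": "AwaitingRetry",
                "scheduled_time": now_ts + retry_delay_seconds,
            },
        }
    # retries exhausted (or none configured): let FAILED stand
    return {"status": "ACCEPT", "state": {"type": "FAILED"}}
```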
single binary with subcommands:
prefect-server # server + services (default)
prefect-server --no-services # API only (horizontal scaling)
prefect-server services # background services only
kubernetes:
- k8s/ manifests for horizontal scaling deployment
- api-deployment: multiple replicas with --no-services
- services-deployment: single instance with `services` subcommand
- postgres + redis for data layer
- kustomize support
ci:
- tangled.org workflow for tests and build
orchestration:
- implement WaitForScheduledTime rule with unit tests
- refactor orchestration module structure (rules, transforms, types)
- add StateTypeSet bitfield for state matching
testing:
- add test-matrix script for all backend combinations
(sqlite×memory, sqlite×redis, postgres×memory, postgres×redis)
- fix postgres schema: add missing next_scheduled_start_time column
- add next_scheduled_start_time index for postgres
other:
- refactor broker and services module structure
- improve scheduler with cron support
- update benchmarks and test scripts
- rename docker-compose.yml to compose.yml
- add PREFECT_SERVER_API_HOST=0.0.0.0 for container networking
- fix env var names to match code (PREFECT_DATABASE_URL, PREFECT_REDIS_MESSAGING_HOST)
- add docker-full-test recipe for testing postgres + redis stack
evaluates active deployment schedules every 30 seconds, creating
SCHEDULED flow runs based on interval configuration. supports
max_scheduled_runs limits and skips paused deployments.
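one scheduler tick for an interval schedule can be sketched like this (a simplification: real schedules also have anchors with timezones, and cron support came later):

```python
from datetime import datetime, timedelta

# emit the next `max_runs` scheduled start times for an interval
# schedule, starting from the first occurrence at or after `now`
def next_runs(anchor: datetime, interval: timedelta,
              now: datetime, max_runs: int):
    elapsed = (now - anchor) // interval if now > anchor else 0
    t = anchor + elapsed * interval
    while t < now:
        t += interval
    return [t + i * interval for i in range(max_runs)]
```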
worker polling endpoint:
- POST /work_pools/{name}/get_scheduled_flow_runs
- returns WorkerFlowRunResponse[] (work_pool_id, work_queue_id, flow_run)
- updates queue/pool/deployment status to READY on poll
- supports work_queue_names filter and scheduled_before time filter
datetime standardization:
- unified ISO 8601 format: 2025-01-21T12:34:56.123456Z
- updated sqlite schema defaults: strftime('%Y-%m-%dT%H:%M:%fZ', 'now')
- updated postgres schema defaults: TO_CHAR(NOW() AT TIME ZONE 'UTC', ...)
- added time utilities: parse(), lessOrEqual(), isPast(), formatMicros()
deployment work_queue_id resolution:
- resolve work_queue_id from work_pool_name on deployment create
- uses pool's default queue if work_queue_name not specified
- enables flow runs created from deployments to be found by worker polling
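the unified timestamp format is easy to pin down with a Python transliteration of formatMicros() (a sketch; it assumes a timezone-aware input):

```python
from datetime import datetime, timezone

# ISO 8601 with a T separator, six-digit microseconds, literal Z suffix
def format_micros(dt: datetime) -> str:
    return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
```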
- default host: 127.0.0.1 (was 0.0.0.0)
- default log level: WARNING (was INFO)
- default limits: 200 for work_pools/queues/workers (was 10)
- postgres pool_size: 5 (was 10)
- move config audit to docs/scratch/ (working notes, not permanent docs)
dev mode still uses DEBUG logging via justfile.
documents magic values in zig implementation and their python equivalents,
with recommendations for improving parity.
- dockerfile: multi-stage build with zig 0.15.2, copies facil.io shared lib
- docker-compose: server, redis, postgres, test services with healthchecks
- justfile: docker-test, docker-full, bench-compare, test-client commands
- benchmark: fix avg calculation to include results with timing data
- test-serve: add uv script header for prefect dependency
adds full deployment support including:
- deployment table with CRUD API (create, read, update, delete, filter, count)
- deployment_schedule table with CRUD API
- get_scheduled_flow_runs endpoint for runner polling
- create_flow_run from deployment
- pause/resume deployment endpoints
- .serve() now works end-to-end
also fixes event backfill to apply subscriber filters
adds the foundation for worker-based execution:
- work_pool table + full CRUD API
- work_queue table + CRUD (default queue auto-created with pool)
- worker table + heartbeat upsert
- status tracking (NOT_READY → READY on first heartbeat)
- reserved pool protection (prefect-* pools)
split API handlers into separate files to stay under line limits:
- src/api/work_pools.zig (pool CRUD + router)
- src/api/work_pool_queues.zig (queue handlers)
- src/api/work_pool_workers.zig (worker handlers)
tested with Python prefect client - all operations compatible.
- remove endpoint listing (see ROADMAP.md for details)
- add what works section with broad feature overview
- point to justfile for commands
- add configuration table
test harness:
- add variables test with full CRUD coverage
- add response validation helper for field/type checking
- validate flow, flow_run, variable, and block_type responses
api consistency:
- include variable name in conflict error message (like Python)
- fix timestamp format consistency: use ISO 8601 (with T, Z, microseconds)
instead of SQLite's datetime('now') format
- ensures create and update return consistent timestamp format
full CRUD for variables with endpoints:
- POST /api/variables/ (create)
- POST /api/variables/filter (list)
- POST /api/variables/count
- GET /api/variables/{id}
- GET /api/variables/name/{name}
- PATCH /api/variables/{id}
- PATCH /api/variables/name/{name}
- DELETE /api/variables/{id}
- DELETE /api/variables/name/{name}
adds variable table to both sqlite and postgres schemas with:
- id, name (unique), value (json), tags (json array)
- created/updated timestamps
- broker: document heap allocation and ephemeral group cleanup
- db: document events module functions including backfill query
- refactor StoredMessage to heap allocation (no more 8KB truncation)
- refactor ParsedEvent to heap allocation (no more fixed buffer truncation)
- add ephemeral Redis group cleanup on unsubscribe
- implement backfill for /events/out (queries recent events before streaming)
- use std.json.Stringify.valueAlloc for proper JSON escaping
- add Col struct for documenting column indices in queryRecent
- database backends (sqlite + postgres)
- message broker (memory + redis)
- event_broadcaster service
- postgres connection pooling
- updated priority notes
memory broker was pre-allocating 50,000 × 8KB = 400MB for message
storage. replaced with ArrayList-backed Queue that grows on demand,
matching Python's asyncio.Queue() behavior.
memory usage: 432MB → 39MB
- parse client filter in /events/out (was always match_all)
- increase buffer sizes: persister 32KB payload, broadcaster 64KB+heap
- add subscribeEphemeral for broadcaster ($ start_id, no replay)
- join flush timer thread on shutdown
- release mutex before websocket writes (avoids blocking)
- fix event pipeline: /events/in publishes to broker, event_broadcaster
subscribes with ephemeral consumer group for WS fan-out
- fix postgres backend: add bigint() method for BIGINT columns, use
int() for INTEGER (i32) columns to match pg.zig strict typing
- add redis broker backend with XADD/XREADGROUP/XAUTOCLAIM for streams
- add benchmark matrix: --matrix flag tests sqlite/postgres × memory/redis
- fix redis RESP: handle fragmented TCP with readMore() and buffer growth
- fix redis command buffer: dynamic allocation for large events (>16KB)
- rename ws_distributor → event_broadcaster (cleaner abstraction)
- add CLAUDE.md files for broker/, db/, services/ subsystems
- add docker-compose.yml for redis + postgres dev services
benchmark results (3 iterations avg):
| db | broker | time | memory |
|----------|--------|------|---------|
| sqlite | memory | 27ms | 329MB |
| sqlite | redis | 28ms | 39MB |
| postgres | memory | 67ms | 428MB |
| postgres | redis | 79ms | 34MB |
introduces broker module with similar pattern to database backend:
- Broker union type with memory and redis variants
- Message struct with id, topic, data, attributes, timestamp
- Publisher/Consumer interfaces for pub/sub pattern
- BoundedChannel generic for in-process message passing
- MemoryBroker implementation using bounded channels
- RedisBroker stub for future Redis Streams implementation
infrastructure changes:
- main.zig initializes broker on startup (PREFECT_BROKER_BACKEND env var)
- test script: scripts/test-broker-backends
- justfile: `just test-broker [memory|redis|all]`
the broker is initialized but not yet wired into event_persister or WS
subscribers - this sets up the foundation for incremental migration.
- add sqlite to nixpkgs for sanity check queries
- remove postgres test - refuses to run as root in CI container
- postgres must be tested locally: ./scripts/test-db-backends postgres
CI now covers: sqlite + memory, sqlite + redis
full matrix (including postgres) tested locally before push
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- CLAUDE.md: add "before pushing new features" section requiring
full backend matrix tests (sqlite/postgres + memory/redis)
- CI: test all backend combinations via Nix packages:
- sqlite + memory broker (via test scripts)
- sqlite + redis broker (native redis-server)
- postgres + memory broker (native postgres via initdb)
- uses postgres and redis from nixpkgs, no Docker needed
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- reactive trigger evaluation via broker subscription
- proactive trigger evaluation via periodic loop
- bucket-based counting for threshold triggers
- action execution: run-deployment, pause/resume-deployment,
cancel-flow-run, pause/resume-automation
- in-memory automation cache with refresh
- add deployments.updatePaused() helper
split into modules to stay under 500-line limit:
- automations.zig: main service loop, cache management
- automations/triggers.zig: trigger parsing and evaluation
- automations/actions.zig: action execution
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- add 005_automations migration (automation, automation_bucket tables)
- add db/automations.zig with CRUD and bucket operations
- add api/automations.zig endpoints (create, get, list, delete, patch)
- wire automations route in routes.zig
- fix api/logs.zig JSON building to use std.json.Stringify
- fix api/automations.zig to use std.json.Stringify with beginWriteRaw
for raw JSON passthrough of pre-serialized fields
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- create log table with migration 004_log_table
- POST /api/logs/ - batch insert logs (name, level, message, timestamp, flow_run_id, task_run_id)
- POST /api/logs/filter - filter by flow_run_id, task_run_id, level (ge/le), timestamp (after/before)
- supports both sqlite and postgres backends
- logs survive server restart (previously in-memory only)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- GET /api/flow_run_states/{id} - get state by ID
- GET /api/flow_run_states/?flow_run_id=... - list states for flow run
- GET /api/task_run_states/{id} - get state by ID
- GET /api/task_run_states/?task_run_id=... - list states for task run
also:
- add task_run_state table to migrations
- record state history in task_runs.setState (matches flow_runs)
- condense test functions to meet line limit
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
marks flow runs as Late if they haven't started within
PREFECT_API_SERVICES_LATE_RUNS_AFTER_SECONDS (default 15s)
after their scheduled start time.
- add getLateRuns query to flow_runs.zig
- add late_runs service with configurable threshold and loop interval
- register service in services.zig
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
leases are NEVER stored in the database (matches Python behavior):
- memory storage: default for single-instance deployments
- redis storage: when PREFECT_BROKER_BACKEND=redis (HA)
moved from src/concurrency/lease_storage.zig to src/leases/:
- memory.zig: in-memory lease storage
- redis.zig: redis-backed lease storage
- storage.zig: unified interface
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- add lease_cleanup service (src/services/lease_cleanup.zig)
- runs every 10 seconds
- processes up to 100 expired leases per cycle
- releases slots and revokes expired leases automatically
- register service in services.zig
completes concurrency limits v2 implementation
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- add in-memory lease storage module (src/concurrency/lease_storage.zig)
- thread-safe singleton with mutex protection
- lease CRUD: create, read, renew, revoke
- TTL-based expiration tracking
- extend API with lease endpoints:
- POST /increment now creates leases (mode=concurrency, default)
- POST /decrement supports lease_id for release
- POST /leases/filter - list active leases
- POST /leases/renew - extend lease TTL
- DELETE /leases/{id} - revoke lease and release slots
- add mode parameter: "concurrency" (with leases) vs "rate_limit" (no leases)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
implements basic CRUD and slot management for global concurrency limits:
- add concurrency_limit table via migration 003
- add src/db/concurrency_limits.zig with CRUD and slot operations
- add src/api/concurrency_limits_v2.zig with endpoints:
- POST /v2/concurrency_limits/ (create)
- GET /v2/concurrency_limits/{id_or_name} (read)
- PATCH /v2/concurrency_limits/{id_or_name} (update)
- DELETE /v2/concurrency_limits/{id_or_name} (delete)
- POST /v2/concurrency_limits/filter (list)
- POST /v2/concurrency_limits/increment (acquire slots)
- POST /v2/concurrency_limits/decrement (release slots)
slot management returns 423 Locked with a Retry-After header when
slots cannot be acquired, matching the python prefect API.
phase 2 (lease storage) and phase 4 (cleanup service) still pending.
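a client honoring the 423 Locked + Retry-After contract looks roughly like this (Python sketch; `post` stands in for any HTTP call against the increment endpoint):

```python
import time

def acquire_with_retry(post, payload, max_attempts=5):
    """Client-side sketch: retry an increment request, honoring the
    Retry-After header whenever the server answers 423 Locked.
    `post` is any callable returning (status_code, headers)."""
    for _ in range(max_attempts):
        status, headers = post(payload)
        if status != 423:
            return status
        # server tells us how long to back off before retrying
        time.sleep(float(headers.get("Retry-After", 1)))
    raise TimeoutError("slots not acquired after retries")
```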
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
matches python architecture - database stores limit config/counters,
lease storage (memory/filesystem/redis) handles transient lease state.
removes concurrency_lease table, adds proper lease storage abstraction.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
root cause: the test was observing a partial scheduler tick. each flow
run insertion commits separately, so the test could see 25 runs
mid-tick and then 50 once the tick completed, which looked like a failure.
fix: wait for count to stabilize before recording baseline, ensuring
we measure after the scheduler tick completes rather than during it.
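the stabilization wait can be sketched as a small polling helper (Python; parameter names are illustrative):

```python
import time

def wait_for_stable_count(get_count, settle_checks=3, interval=0.2, timeout=10):
    """Poll get_count() until it returns the same value settle_checks
    times in a row, so the baseline is recorded after a scheduler tick
    completes rather than mid-tick."""
    deadline = time.monotonic() + timeout
    last, streak = get_count(), 1
    while time.monotonic() < deadline:
        time.sleep(interval)
        current = get_count()
        if current == last:
            streak += 1
            if streak >= settle_checks:
                return current      # value held steady: tick is done
        else:
            last, streak = current, 1
    raise TimeoutError(f"count never stabilized (last={last})")
```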
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- flows: PATCH /flows/{id} (update tags), DELETE /flows/{id}
- flow_runs: DELETE /flow_runs/{id} (cascade deletes states)
- test-matrix: ensure port released between tests
- update ROADMAP.md
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- HTTP layer: wrap onRequest with http.request span
- DB layer: wrap exec/query methods with db.exec/db.query spans
- records errors on failures via span.recordError()
without LOGFIRE_WRITE_TOKEN, spans are no-op.
with token, exports to Logfire via OTLP/protobuf.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
utilities/json.zig:
- getPath(value, "path.to.field") - navigate by dot-separated path
- getString, getInt, getBool, getFloat, getArray, getObject helpers
- extractAt(T, alloc, value, .{"path"}) - comptime typed extraction
- comprehensive tests
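the dot-path idea translates directly; a Python equivalent of the core helper (illustrative, not the Zig signature):

```python
def get_path(value, path, default=None):
    """Navigate a parsed JSON value by dot-separated path, mirroring
    getPath(value, "path.to.field"): returns default if any segment
    is missing or the current value is not an object."""
    for key in path.split("."):
        if not isinstance(value, dict) or key not in value:
            return default
        value = value[key]
    return value
```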
events_api.zig:
- parseFilterOptions: 54 lines → 28 lines using path helpers
- filterNext: simplified token field extraction
- filter: cleaner limit parsing
inspired by zat's internal/json.zig pattern
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- POST /events/filter with occurred, event, resource filters
- GET /events/filter/next with page tokens for pagination
- GET /events/count for total event count
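a client walks the paginated endpoints by following page tokens; sketched generically (Python; the callables stand in for the filter and filter/next requests, and the field name `next_page_token` is an assumption):

```python
def iter_events(first_page, next_page):
    """Sketch of page-token pagination: fetch the first page, then
    follow next_page_token via the /filter/next endpoint until the
    token is exhausted."""
    page = first_page()
    while True:
        yield from page["events"]
        token = page.get("next_page_token")
        if not token:
            return
        page = next_page(token)
```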
code quality improvements:
- src/prefect.zig: clean public interface re-exporting main modules
- src/utilities/encoding.zig: base64 and URL encoding helpers
- db/backend.zig: add getBackend() for consistent access pattern
- db/events.zig: consolidate parseJsonStringArray, add column constants
- events_api.zig: use json.send/sendStatus helpers consistently
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- use DEVNULL during normal benchmark runs to avoid pipe buffer deadlock
- on startup failure, restart with PIPE to capture error output
- display actual error message instead of generic "failed to start server"
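the DEVNULL/PIPE handoff looks roughly like this (Python sketch; the real harness manages a long-running server via Popen, this uses run() on short commands for brevity):

```python
import subprocess

def run_with_error_capture(cmd):
    """Run with DEVNULL so a full pipe buffer can't deadlock the child;
    only on failure re-run with PIPE to capture the actual error output.
    Returns None on success, else the captured error text."""
    proc = subprocess.run(cmd, stdout=subprocess.DEVNULL,
                          stderr=subprocess.DEVNULL)
    if proc.returncode == 0:
        return None
    retry = subprocess.run(cmd, stdout=subprocess.PIPE,
                           stderr=subprocess.STDOUT, text=True)
    return retry.stdout.strip()
```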
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
keep valuable reference info, mark implementation status:
- blocks-implementation.md: API sequences and schema (implemented)
- timestamps.md: timing handling, .serve() vs workers (resolved)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- add docs/scratch/signal-handling.md documenting graceful shutdown fix
- delete blocks-implementation.md (blocks fully implemented)
- delete timestamps.md (issues resolved, notes outdated)
- update scratch README with current contents
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- add SIGTERM and SIGINT signal handlers
- zap.stop() triggers clean shutdown when signal received
- services flush pending work (events, etc.) before exit
- add scripts/test-graceful-shutdown to verify behavior
- update roadmap and production-readiness docs
note: SIGTERM exits with code 1 due to a zap/facil.io quirk, but the
shutdown sequence completes correctly (services stop, data flushes)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
documents missing features for production use:
- tier 1: concurrency limits, automations, artifacts, event filtering
- tier 2: graceful shutdown, log persistence, auth, ui endpoints
- tier 3: background services, mutation endpoints, task rules
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
orchestration:
- add PreventDuplicateTransitions rule for idempotent state transitions
- uses transition_id comparison to detect duplicate requests
- add state_transition_id column via migration 002
api tests:
- add serve_pattern test (Runner/.serve() workflow)
- add worker_pattern test (Worker polling work pools)
- add flow_with_task_runs test (ETL-style flow execution)
- add retry_failed_flows and cancellation_flow tests
- optimize scheduler tests: polling instead of fixed sleeps (~36% faster)
- use client-side filtering (server filter doesn't support deployment_id yet)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- src/db/migrate.zig: migration runner with statement parser
- src/db/migrations_data.zig: migration registry using @embedFile
- src/db/migrations/001_initial/: full schema for sqlite and postgres
- replaces old CREATE TABLE IF NOT EXISTS approach
- tracks applied migrations in _migrations table
- tested on sqlite (local) and postgres (k8s)
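the runner's core loop can be sketched in a few lines (Python/sqlite3; the migration names and SQL here are illustrative, not the real 001/002 schemas):

```python
import sqlite3

MIGRATIONS = [  # (name, sql) in order, embedded like migrations_data.zig
    ("001_initial", "CREATE TABLE flow (id TEXT PRIMARY KEY, name TEXT);"),
    ("002_example", "ALTER TABLE flow ADD COLUMN tags TEXT;"),
]

def migrate(conn):
    """Minimal runner sketch: apply pending migrations in order and
    record each in a _migrations table so reruns are no-ops."""
    conn.execute("CREATE TABLE IF NOT EXISTS _migrations (name TEXT PRIMARY KEY)")
    applied = {row[0] for row in conn.execute("SELECT name FROM _migrations")}
    for name, sql in MIGRATIONS:
        if name in applied:
            continue
        for stmt in sql.split(";"):   # crude statement parser
            if stmt.strip():
                conn.execute(stmt)
        conn.execute("INSERT INTO _migrations (name) VALUES (?)", (name,))
    conn.commit()
```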
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- docs/migrations.md: comprehensive analysis of migration options
- zmig (sqlite-only zig tool)
- atlas (external go-based tool, multi-dialect)
- minimal DIY approach
- hybrid recommendation (atlas for dev, embedded SQL for runtime)
- phased implementation plan
- references to python prefect alembic patterns
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- add policy mutation flags to RuleContext (set_retry_type_in_process,
clear_retry_type, clear_pause_keys, set_resuming_false)
- RetryFailedFlows sets flags on retry: retry_type="in_process",
resuming=false, pause_keys=[]
- RetryFailedFlows clears retry_type when retries exhausted
- API handler builds updated policy JSON from flags
- setStateWithSchedule accepts optional empirical_policy update
- add tests for policy mutation flags
matches python prefect server behavior for retry policy handling.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
documents how Python prefect server implements database layer:
- architecture: three-tier abstraction (config, queries, ORM)
- orm-models: SQLAlchemy models, custom types, relationships
- migrations: alembic setup, dialect-specific patterns
- query-patterns: JSON handling, time series, transactions
- zig-compat-notes: comparison with our implementation
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
when a flow run transitions from RUNNING to FAILED:
- if retries configured and run_count <= retries, reject FAILED
- schedule AwaitingRetry state with retry_delay offset
- return REJECT status with SCHEDULED state (matching python behavior)
changes:
- add empirical_policy column (JSON with retries/retry_delay)
- extend RuleContext with retry fields and scheduleRetry method
- implement rule in flow_rules.zig with 6 unit tests
- wire up API to parse empirical_policy and handle retry response
- add test-retry script and justfile target
- use PREFECT_PROFILE=oss for test harness (uses prefect profiles)
- add loq exceptions for files with inline tests
- reference zig 0.15 notes in CLAUDE.md
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
single binary with subcommands:
prefect-server # server + services (default)
prefect-server --no-services # API only (horizontal scaling)
prefect-server services # background services only
kubernetes:
- k8s/ manifests for horizontal scaling deployment
- api-deployment: multiple replicas with --no-services
- services-deployment: single instance with `services` subcommand
- postgres + redis for data layer
- kustomize support
ci:
- tangled.org workflow for tests and build
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
orchestration:
- implement WaitForScheduledTime rule with unit tests
- refactor orchestration module structure (rules, transforms, types)
- add StateTypeSet bitfield for state matching
testing:
- add test-matrix script for all backend combinations
(sqlite×memory, sqlite×redis, postgres×memory, postgres×redis)
- fix postgres schema: add missing next_scheduled_start_time column
- add next_scheduled_start_time index for postgres
other:
- refactor broker and services module structure
- improve scheduler with cron support
- update benchmarks and test scripts
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- rename docker-compose.yml to compose.yml
- add PREFECT_SERVER_API_HOST=0.0.0.0 for container networking
- fix env var names to match code (PREFECT_DATABASE_URL, PREFECT_REDIS_MESSAGING_HOST)
- add docker-full-test recipe for testing postgres + redis stack
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
worker polling endpoint:
- POST /work_pools/{name}/get_scheduled_flow_runs
- returns WorkerFlowRunResponse[] (work_pool_id, work_queue_id, flow_run)
- updates queue/pool/deployment status to READY on poll
- supports work_queue_names filter and scheduled_before time filter
datetime standardization:
- unified ISO 8601 format: 2025-01-21T12:34:56.123456Z
- updated sqlite schema defaults: strftime('%Y-%m-%dT%H:%M:%fZ', 'now')
- updated postgres schema defaults: TO_CHAR(NOW() AT TIME ZONE 'UTC', ...)
- added time utilities: parse(), lessOrEqual(), isPast(), formatMicros()
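for reference, the target format is what Python's strftime produces with a `%f` microseconds field (sketch, not the Zig formatMicros implementation):

```python
from datetime import datetime, timezone

def format_micros(dt):
    """Produce the unified ISO 8601 format used by both backends:
    2025-01-21T12:34:56.123456Z (always UTC, always 6 fractional digits)."""
    return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
```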
deployment work_queue_id resolution:
- resolve work_queue_id from work_pool_name on deployment create
- uses pool's default queue if work_queue_name not specified
- enables flow runs created from deployments to be found by worker polling
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- default host: 127.0.0.1 (was 0.0.0.0)
- default log level: WARNING (was INFO)
- default limits: 200 for work_pools/queues/workers (was 10)
- postgres pool_size: 5 (was 10)
- move config audit to docs/scratch/ (working notes, not permanent docs)
dev mode still uses DEBUG logging via justfile.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- dockerfile: multi-stage build with zig 0.15.2, copies facil.io shared lib
- docker-compose: server, redis, postgres, test services with healthchecks
- justfile: docker-test, docker-full, bench-compare, test-client commands
- benchmark: fix avg calculation to include results with timing data
- test-serve: add uv script header for prefect dependency
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
adds full deployment support including:
- deployment table with CRUD API (create, read, update, delete, filter, count)
- deployment_schedule table with CRUD API
- get_scheduled_flow_runs endpoint for runner polling
- create_flow_run from deployment
- pause/resume deployment endpoints
- .serve() now works end-to-end
also fixes event backfill to apply subscriber filters
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
adds the foundation for worker-based execution:
- work_pool table + full CRUD API
- work_queue table + CRUD (default queue auto-created with pool)
- worker table + heartbeat upsert
- status tracking (NOT_READY → READY on first heartbeat)
- reserved pool protection (prefect-* pools)
split API handlers into separate files to stay under line limits:
- src/api/work_pools.zig (pool CRUD + router)
- src/api/work_pool_queues.zig (queue handlers)
- src/api/work_pool_workers.zig (worker handlers)
tested with Python prefect client - all operations compatible.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
test harness:
- add variables test with full CRUD coverage
- add response validation helper for field/type checking
- validate flow, flow_run, variable, and block_type responses
api consistency:
- include variable name in conflict error message (like Python)
- fix timestamp format consistency: use ISO 8601 (with T, Z, microseconds)
instead of SQLite's datetime('now') format
- ensures create and update return consistent timestamp format
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
full CRUD for variables with endpoints:
- POST /api/variables/ (create)
- POST /api/variables/filter (list)
- POST /api/variables/count
- GET /api/variables/{id}
- GET /api/variables/name/{name}
- PATCH /api/variables/{id}
- PATCH /api/variables/name/{name}
- DELETE /api/variables/{id}
- DELETE /api/variables/name/{name}
adds variable table to both sqlite and postgres schemas with:
- id, name (unique), value (json), tags (json array)
- created/updated timestamps
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- refactor StoredMessage to heap allocation (no more 8KB truncation)
- refactor ParsedEvent to heap allocation (no more fixed buffer truncation)
- add ephemeral Redis group cleanup on unsubscribe
- implement backfill for /events/out (queries recent events before streaming)
- use std.json.Stringify.valueAlloc for proper JSON escaping
- add Col struct for documenting column indices in queryRecent
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
memory broker was pre-allocating 50,000 × 8KB = 400MB for message
storage. replaced with ArrayList-backed Queue that grows on demand,
matching Python's asyncio.Queue() behavior.
memory usage: 432MB → 39MB
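the shape of the fix, sketched in Python (deque here plays the role of the ArrayList-backed queue; only bookkeeping is shown, no thread safety):

```python
from collections import deque

class GrowQueue:
    """Grow-on-demand message queue, mirroring asyncio.Queue semantics:
    maxsize bounds how many messages may be queued, but storage grows
    with actual load instead of pre-allocating capacity * slot_size."""

    def __init__(self, maxsize=50_000):
        self._items = deque()   # an empty queue costs almost nothing
        self._maxsize = maxsize

    def put(self, msg):
        if len(self._items) >= self._maxsize:
            raise OverflowError("queue full")
        self._items.append(msg)

    def get(self):
        return self._items.popleft()
```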
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- parse client filter in /events/out (was always match_all)
- increase buffer sizes: persister 32KB payload, broadcaster 64KB+heap
- add subscribeEphemeral for broadcaster ($ start_id, no replay)
- join flush timer thread on shutdown
- release mutex before websocket writes (avoids blocking)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- fix event pipeline: /events/in publishes to broker, event_broadcaster
subscribes with ephemeral consumer group for WS fan-out
- fix postgres backend: add bigint() method for BIGINT columns, use
int() for INTEGER (i32) columns to match pg.zig strict typing
- add redis broker backend with XADD/XREADGROUP/XAUTOCLAIM for streams
- add benchmark matrix: --matrix flag tests sqlite/postgres × memory/redis
- fix redis RESP: handle fragmented TCP with readMore() and buffer growth
- fix redis command buffer: dynamic allocation for large events (>16KB)
- rename ws_distributor → event_broadcaster (cleaner abstraction)
- add CLAUDE.md files for broker/, db/, services/ subsystems
- add docker-compose.yml for redis + postgres dev services
benchmark results (3 iterations avg):
| db | broker | time | memory |
|----------|--------|------|---------|
| sqlite | memory | 27ms | 329MB |
| sqlite | redis | 28ms | 39MB |
| postgres | memory | 67ms | 428MB |
| postgres | redis | 79ms | 34MB |
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
introduces a broker module following the same pattern as the database backend:
- Broker union type with memory and redis variants
- Message struct with id, topic, data, attributes, timestamp
- Publisher/Consumer interfaces for pub/sub pattern
- BoundedChannel generic for in-process message passing
- MemoryBroker implementation using bounded channels
- RedisBroker stub for future Redis Streams implementation
infrastructure changes:
- main.zig initializes broker on startup (PREFECT_BROKER_BACKEND env var)
- test script: scripts/test-broker-backends
- justfile: `just test-broker [memory|redis|all]`
the broker is initialized but not yet wired into event_persister or WS
subscribers - this sets up the foundation for incremental migration.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>