prefect server in zig
1# database
2
3dual backend support: sqlite via [zqlite](https://github.com/zigzap/zqlite) and postgresql via [pg.zig](https://github.com/karlseguin/pg.zig).
4
5## backend selection
6
7environment variables control backend:
8
9```bash
10# sqlite (default)
11PREFECT_DATABASE_BACKEND=sqlite
12PREFECT_DATABASE_PATH=prefect.db
13
14# postgresql
15PREFECT_DATABASE_BACKEND=postgres
16PREFECT_DATABASE_URL=postgresql://user:pass@localhost:5432/prefect
17```
18
19## architecture
20
21```
22src/db/
23├── backend.zig # unified Backend, Row, Rows types
24├── dialect.zig # SQL dialect helpers
25├── schema/
26│ ├── sqlite.zig # SQLite DDL
27│ └── postgres.zig # PostgreSQL DDL
28├── sqlite.zig # schema init + migration
29├── flows.zig # flow entity
30├── flow_runs.zig # flow run entity
31├── task_runs.zig # task run entity
32├── events.zig # event entity
33├── block_types.zig
34├── block_schemas.zig
35└── block_documents.zig
36```
37
38## connection models
39
40### sqlite
41
42single global connection with mutex for thread safety. pragmas:
43- `journal_mode=WAL`
44- `busy_timeout=5000`
45- `foreign_keys=ON`
46
47### postgresql
48
49connection pool (size=10) via pg.zig. parses standard connection URI format.
50
51## tables
52
53- `flow` - flow definitions
54- `flow_run` - flow run instances
55- `flow_run_state` - state history (FK cascade delete)
56- `task_run` - task run instances
57- `events` - persisted events
58- `block_type` - block type definitions
59- `block_schema` - block schemas
60- `block_document` - block document instances
61
62## query pattern
63
64entity modules use the unified backend API:
65
66```zig
67const backend = @import("backend.zig");
68
69pub fn getFlowById(alloc: Allocator, id: []const u8) !?FlowRow {
70 var r = backend.db.row(
71 "SELECT id, created, updated, name, tags FROM flow WHERE id = ?",
72 .{id},
73 ) catch return null;
74
75 if (r) |*row| {
76 defer row.deinit();
77 return FlowRow{
78 .id = try alloc.dupe(u8, row.text(0)),
79 .created = try alloc.dupe(u8, row.text(1)),
80 // ...
81 };
82 }
83 return null;
84}
85```
86
87placeholder rewriting is automatic: `?` becomes `$1, $2` for postgres.
88
89## transactions
90
91use `Transaction` type for atomic multi-statement operations. mutex locking is only needed for sqlite (postgres pool handles concurrency):
92
93```zig
94// lock mutex only for sqlite
95if (backend.db.dialect == .sqlite) {
96 backend.db.mutex.lock();
97}
98defer if (backend.db.dialect == .sqlite) {
99 backend.db.mutex.unlock();
100};
101
102// begin transaction - for postgres this acquires a dedicated connection
103var txn = backend.db.beginTransaction() catch |err| {
104 log.err("database", "begin transaction error: {}", .{err});
105 return err;
106};
107errdefer txn.rollback();
108
109// execute within transaction (uses same connection for postgres)
110txn.exec("UPDATE foo SET bar = ? WHERE id = ?", .{ bar, id }) catch |err| {
111 return err;
112};
113txn.exec("INSERT INTO baz (foo_id, val) VALUES (?, ?)", .{ id, val }) catch |err| {
114 return err;
115};
116
117txn.commit() catch |err| {
118 log.err("database", "commit error: {}", .{err});
119 return err;
120};
121```
122
123the `Transaction` type ensures all statements execute on the same connection, which is critical for postgres where the pool might otherwise give different connections for each query.
124
125## migrations
126
127schema changes are managed via embedded migrations. on startup, the server:
1281. creates `_migrations` table if needed
1292. applies any pending migrations in order
1303. records applied migrations
131
132see [migrations.md](./migrations.md) for details on adding new migrations.
133
134see [python-reference/](./python-reference/) for how prefect python handles migrations with alembic.
135
136## testing
137
138```bash
139# test sqlite backend
140./scripts/test-db-backends sqlite
141
142# test postgresql (requires docker)
143./scripts/test-db-backends postgres
144
145# test both
146./scripts/test-db-backends all
147```