prefect server in zig
# database

dual backend support: sqlite via [zqlite](https://github.com/zigzap/zqlite) and postgresql via [pg.zig](https://github.com/karlseguin/pg.zig).

## backend selection

environment variables control backend:

```bash
# sqlite (default)
PREFECT_DATABASE_BACKEND=sqlite
PREFECT_DATABASE_PATH=prefect.db

# postgresql
PREFECT_DATABASE_BACKEND=postgres
PREFECT_DATABASE_URL=postgresql://user:pass@localhost:5432/prefect
```

## architecture

```
src/db/
├── backend.zig       # unified Backend, Row, Rows types
├── dialect.zig       # SQL dialect helpers
├── schema/
│   ├── sqlite.zig    # SQLite DDL
│   └── postgres.zig  # PostgreSQL DDL
├── sqlite.zig        # schema init + migration
├── flows.zig         # flow entity
├── flow_runs.zig     # flow run entity
├── task_runs.zig     # task run entity
├── events.zig        # event entity
├── block_types.zig
├── block_schemas.zig
└── block_documents.zig
```

## connection models

### sqlite

single global connection with mutex for thread safety. pragmas:
- `journal_mode=WAL`
- `busy_timeout=5000`
- `foreign_keys=ON`

### postgresql

connection pool (size=10) via pg.zig. parses standard connection URI format.
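
a minimal sketch of how the `PREFECT_DATABASE_BACKEND` value could be mapped to a backend before either connection model is set up. the `Dialect` enum and `parseDialect` helper are hypothetical names for illustration and are not taken from the codebase. rejecting unknown values with an error is also an assumption. an unset variable falls back to sqlite, matching the documented default:

```zig
const std = @import("std");

// Hypothetical enum for illustration; the real type in src/db/ may differ.
const Dialect = enum { sqlite, postgres };

/// Maps the raw value of PREFECT_DATABASE_BACKEND to a dialect.
/// `null` means the variable is unset, which selects the sqlite default.
/// Returning an error for unknown values is an assumption of this sketch.
fn parseDialect(value: ?[]const u8) error{UnknownBackend}!Dialect {
    const v = value orelse return Dialect.sqlite;
    return std.meta.stringToEnum(Dialect, v) orelse return error.UnknownBackend;
}
```

the function takes the value as an optional slice instead of reading the environment itself, which keeps it testable without touching process state.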

## tables

- `flow` - flow definitions
- `flow_run` - flow run instances
- `flow_run_state` - state history (FK cascade delete)
- `task_run` - task run instances
- `events` - persisted events
- `block_type` - block type definitions
- `block_schema` - block schemas
- `block_document` - block document instances

## query pattern

entity modules use the unified backend API:

```zig
const std = @import("std");
const Allocator = std.mem.Allocator;
const backend = @import("backend.zig");

pub fn getFlowById(alloc: Allocator, id: []const u8) !?FlowRow {
    // query errors are treated the same as "not found"
    var r = backend.db.row(
        "SELECT id, created, updated, name, tags FROM flow WHERE id = ?",
        .{id},
    ) catch return null;

    if (r) |*row| {
        defer row.deinit();
        // copy column values so they outlive the row
        return FlowRow{
            .id = try alloc.dupe(u8, row.text(0)),
            .created = try alloc.dupe(u8, row.text(1)),
            // ...
        };
    }
    return null;
}
```

placeholder rewriting is automatic: each `?` becomes `$1`, `$2`, ... for postgres.

## transactions

use the `Transaction` type for atomic multi-statement operations. mutex locking is only needed for sqlite (the postgres pool handles concurrency):

```zig
// lock mutex only for sqlite
if (backend.db.dialect == .sqlite) {
    backend.db.mutex.lock();
}
defer if (backend.db.dialect == .sqlite) {
    backend.db.mutex.unlock();
};

// begin transaction - for postgres this acquires a dedicated connection
var txn = backend.db.beginTransaction() catch |err| {
    log.err("database", "begin transaction error: {}", .{err});
    return err;
};
errdefer txn.rollback();

// execute within transaction (uses same connection for postgres)
try txn.exec("UPDATE foo SET bar = ? WHERE id = ?", .{ bar, id });
try txn.exec("INSERT INTO baz (foo_id, val) VALUES (?, ?)", .{ id, val });

txn.commit() catch |err| {
    log.err("database", "commit error: {}", .{err});
    return err;
};
```

the `Transaction` type ensures all statements execute on the same connection. this is critical for postgres, where the pool might otherwise hand out a different connection for each query.

## migrations

schema changes are managed via embedded migrations. on startup, the server:
1. creates `_migrations` table if needed
2. applies any pending migrations in order
3. records applied migrations

see [migrations.md](./migrations.md) for details on adding new migrations.

see [python-reference/](./python-reference/) for how prefect python handles migrations with alembic.

## testing

```bash
# test sqlite backend
./scripts/test-db-backends sqlite

# test postgresql (requires docker)
./scripts/test-db-backends postgres

# test both
./scripts/test-db-backends all
```
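
for reference, the `?` to `$n` placeholder rewriting mentioned under query pattern can be sketched as below. this is not the project's implementation: `rewritePlaceholders` is a hypothetical name, the caller-provided buffer is a design choice of this sketch, and it assumes no `?` appears inside string literals or comments in the SQL text.

```zig
const std = @import("std");

/// Copies `sql` into `buf`, replacing the n-th `?` with `$n` (1-based).
/// Hypothetical helper for illustration only.
/// Simplifying assumption: every `?` in `sql` is a placeholder.
/// Returns the written prefix of `buf`, or error.NoSpaceLeft if `buf` is too small.
fn rewritePlaceholders(buf: []u8, sql: []const u8) error{NoSpaceLeft}![]const u8 {
    var len: usize = 0; // bytes written so far
    var n: usize = 0; // placeholders seen so far
    for (sql) |c| {
        if (c == '?') {
            n += 1;
            // format "$<n>" directly into the remaining space
            const written = try std.fmt.bufPrint(buf[len..], "${d}", .{n});
            len += written.len;
        } else {
            if (len >= buf.len) return error.NoSpaceLeft;
            buf[len] = c;
            len += 1;
        }
    }
    return buf[0..len];
}
```

with a large enough buffer, `UPDATE foo SET bar = ? WHERE id = ?` is rewritten to `UPDATE foo SET bar = $1 WHERE id = $2`. a real implementation would also need to skip `?` characters inside quoted strings.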