prefect server in zig

refactor db layer to use zqlite with transactions

- replace raw sqlite3 C bindings with zqlite library
- add mutex-protected global connection for thread safety
- wrap state transitions in atomic transactions
- move SQL out of route handlers into db module
- add concise readme

+318 -330
+26
README.md
··· 1 + # prefect-zig 2 + 3 + minimal prefect server in zig - single binary, sqlite storage. 4 + 5 + ## build & run 6 + 7 + ```bash 8 + zig build && ./zig-out/bin/prefect-zig 9 + ``` 10 + 11 + server runs on `localhost:4200`. 12 + 13 + ## endpoints 14 + 15 + - `POST /flows/` - create/get flow by name 16 + - `POST /flow_runs/` - create flow run 17 + - `GET /flow_runs/{id}` - read flow run 18 + - `POST /flow_runs/{id}/set_state` - transition state 19 + - `POST /flow_runs/filter` - list flow runs 20 + - `GET /health` - health check 21 + 22 + ## test 23 + 24 + ```bash 25 + ./scripts/test-flow-sequence 26 + ```
+10 -4
build.zig
··· 4 4 const target = b.standardTargetOptions(.{}); 5 5 const optimize = b.standardOptimizeOption(.{}); 6 6 7 - // get uuid dependency 8 7 const uuid_dep = b.dependency("uuid", .{ 8 + .target = target, 9 + .optimize = optimize, 10 + }); 11 + 12 + const zqlite = b.dependency("zqlite", .{ 9 13 .target = target, 10 14 .optimize = optimize, 11 15 }); ··· 18 22 .optimize = optimize, 19 23 .imports = &.{ 20 24 .{ .name = "uuid", .module = uuid_dep.module("uuid") }, 25 + .{ .name = "zqlite", .module = zqlite.module("zqlite") }, 21 26 }, 22 27 }), 23 28 }); 24 29 25 - // link sqlite3 for database 26 - exe.linkSystemLibrary("sqlite3"); 27 30 exe.linkLibC(); 28 31 29 32 b.installArtifact(exe); ··· 44 47 .root_source_file = b.path("src/main.zig"), 45 48 .target = target, 46 49 .optimize = optimize, 50 + .imports = &.{ 51 + .{ .name = "uuid", .module = uuid_dep.module("uuid") }, 52 + .{ .name = "zqlite", .module = zqlite.module("zqlite") }, 53 + }, 47 54 }), 48 55 }); 49 - unit_tests.linkSystemLibrary("sqlite3"); 50 56 unit_tests.linkLibC(); 51 57 52 58 const run_unit_tests = b.addRunArtifact(unit_tests);
+4
build.zig.zon
··· 8 8 .url = "git+https://codeberg.org/r4gus/uuid-zig.git#97220fa411b69513df603f935fe2f8a294f1ca64", 9 9 .hash = "uuid-0.4.0-oOieIVh6AAA32OyOsZmY3yAkszkVtDyvIWaCufEfACsc", 10 10 }, 11 + .zqlite = .{ 12 + .url = "https://github.com/karlseguin/zqlite.zig/archive/refs/heads/master.tar.gz", 13 + .hash = "zqlite-0.0.0-RWLaY_y_mADh2LdbDrG_2HT2dBAcsAR8Jig_7-dOJd0B", 14 + }, 11 15 }, 12 16 .paths = .{ 13 17 "build.zig",
+22 -115
src/api/flow_runs.zig
··· 11 11 // GET /flow_runs/{id} - read flow run 12 12 // POST /flow_runs/{id}/set_state - set state 13 13 // POST /flow_runs/filter - list flow runs 14 - pub fn handle(request: *http.Server.Request, database: *db.Database, target: []const u8) !void { 14 + pub fn handle(request: *http.Server.Request, target: []const u8) !void { 15 15 const method = request.head.method; 16 16 17 17 // POST /flow_runs/ - create 18 18 if (method == .POST and (mem.eql(u8, target, "/flow_runs/") or mem.eql(u8, target, "/api/flow_runs/"))) { 19 - try create(request, database); 19 + try create(request); 20 20 return; 21 21 } 22 22 23 23 // POST /flow_runs/filter - list 24 24 if (method == .POST and (mem.endsWith(u8, target, "/filter"))) { 25 - try filter(request, database); 25 + try filter(request); 26 26 return; 27 27 } 28 28 ··· 31 31 const id = common.extractId(target, "/flow_runs/", "/set_state") orelse 32 32 common.extractId(target, "/api/flow_runs/", "/set_state"); 33 33 if (id) |flow_run_id| { 34 - try setState(request, database, flow_run_id); 34 + try setState(request, flow_run_id); 35 35 return; 36 36 } 37 37 } ··· 41 41 const id = common.extractIdSimple(target, "/flow_runs/") orelse 42 42 common.extractIdSimple(target, "/api/flow_runs/"); 43 43 if (id) |flow_run_id| { 44 - try read(request, database, flow_run_id); 44 + try read(request, flow_run_id); 45 45 return; 46 46 } 47 47 } ··· 49 49 try http_server.sendJsonStatus(request, "{\"detail\":\"not found\"}", .not_found); 50 50 } 51 51 52 - fn create(request: *http.Server.Request, database: *db.Database) !void { 52 + fn create(request: *http.Server.Request) !void { 53 53 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 54 54 defer arena.deinit(); 55 55 const alloc = arena.allocator(); ··· 101 101 var ts_buf: [32]u8 = undefined; 102 102 const now = common.getTimestamp(&ts_buf); 103 103 104 - var stmt = database.prepare( 105 - \\INSERT INTO flow_run (id, flow_id, name, state_type, state_name, state_timestamp) 106 - \\VALUES (?1, ?2, ?3, ?4, ?5, ?6) 107 - ) catch { 108 - try http_server.sendJsonStatus(request, "{\"detail\":\"database error\"}", .internal_server_error); 109 - return; 110 - }; 111 - defer stmt.deinit(); 112 - 113 - stmt.bindText(1, new_id) catch {}; 114 - stmt.bindText(2, flow_id) catch {}; 115 - stmt.bindText(3, name) catch {}; 116 - stmt.bindText(4, state_type) catch {}; 117 - stmt.bindText(5, state_name) catch {}; 118 - stmt.bindText(6, now) catch {}; 119 - 120 - _ = stmt.step() catch { 104 + db.insertFlowRun(new_id, flow_id, name, state_type, state_name, now) catch { 121 105 try http_server.sendJsonStatus(request, "{\"detail\":\"insert failed\"}", .internal_server_error); 122 106 return; 123 107 }; 124 108 125 - // return the created flow run (minimal response for now) 109 + // return the created flow run 126 110 const response = std.fmt.allocPrint(alloc, 127 111 \\{{"id":"{s}","created":"{s}","updated":"{s}","name":"{s}","flow_id":"{s}","state_type":"{s}","state_name":"{s}","state":{{"type":"{s}","name":"{s}","timestamp":"{s}","id":"{s}"}},"parameters":{{}},"tags":[],"run_count":0}} 128 112 , .{ new_id, now, now, name, flow_id, state_type, state_name, state_type, state_name, now, common.generateUuid(alloc) }) catch { ··· 133 117 try http_server.sendJsonStatus(request, response, .created); 134 118 } 135 119 136 - fn read(request: *http.Server.Request, database: *db.Database, raw_id: []const u8) !void { 120 + fn read(request: *http.Server.Request, raw_id: []const u8) !void { 137 121 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 138 122 defer arena.deinit(); 139 123 const alloc = arena.allocator(); 140 124 141 125 const id = common.normalizeUuid(alloc, raw_id); 142 126 143 - var stmt = database.prepare( 144 - \\SELECT id, created, updated, flow_id, name, state_type, state_name, state_timestamp, parameters, tags 145 - \\FROM flow_run WHERE id = ?1 146 - ) catch { 147 - try http_server.sendJsonStatus(request, "{\"detail\":\"database error\"}", .internal_server_error); 148 - return; 149 - }; 150 - defer stmt.deinit(); 151 - 152 - stmt.bindText(1, id) catch {}; 153 - 154 - if (!(stmt.step() catch false)) { 127 + const run = db.getFlowRun(id) orelse { 155 128 try http_server.sendJsonStatus(request, "{\"detail\":\"flow run not found\"}", .not_found); 156 129 return; 157 - } 158 - 159 - const run_id = stmt.columnText(0) orelse ""; 160 - const created = stmt.columnText(1) orelse ""; 161 - const updated = stmt.columnText(2) orelse ""; 162 - const flow_id = stmt.columnText(3) orelse ""; 163 - const run_name = stmt.columnText(4) orelse ""; 164 - const state_type = stmt.columnText(5) orelse "PENDING"; 165 - const state_name = stmt.columnText(6) orelse "Pending"; 166 - const state_timestamp = stmt.columnText(7) orelse created; 167 - const parameters = stmt.columnText(8) orelse "{}"; 168 - const tags = stmt.columnText(9) orelse "[]"; 130 + }; 169 131 170 132 const response = std.fmt.allocPrint(alloc, 171 133 \\{{"id":"{s}","created":"{s}","updated":"{s}","name":"{s}","flow_id":"{s}","state_type":"{s}","state_name":"{s}","state":{{"type":"{s}","name":"{s}","timestamp":"{s}"}},"parameters":{s},"tags":{s},"run_count":0}} 172 - , .{ run_id, created, updated, run_name, flow_id, state_type, state_name, state_type, state_name, state_timestamp, parameters, tags }) catch { 134 + , .{ run.id, run.created, run.updated, run.name, run.flow_id, run.state_type, run.state_name, run.state_type, run.state_name, run.state_timestamp, run.parameters, run.tags }) catch { 173 135 try http_server.sendJsonStatus(request, "{\"detail\":\"format error\"}", .internal_server_error); 174 136 return; 175 137 }; ··· 177 139 try http_server.sendJson(request, response); 178 140 } 179 141 180 - fn setState(request: *http.Server.Request, database: *db.Database, raw_id: []const u8) !void { 142 + fn setState(request: *http.Server.Request, raw_id: []const u8) !void { 181 143 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 182 144 defer arena.deinit(); 183 145 const alloc = arena.allocator(); ··· 206 168 const state_name = if (state.object.get("name")) |v| v.string else "Pending"; 207 169 var ts_buf: [32]u8 = undefined; 208 170 const now = common.getTimestamp(&ts_buf); 171 + const state_id = common.generateUuid(alloc); 209 172 210 - // update the flow run state 211 - var stmt = database.prepare( 212 - \\UPDATE flow_run SET state_type = ?1, state_name = ?2, state_timestamp = ?3, updated = ?3 213 - \\WHERE id = ?4 214 - ) catch { 215 - try http_server.sendJsonStatus(request, "{\"detail\":\"database error\"}", .internal_server_error); 216 - return; 217 - }; 218 - defer stmt.deinit(); 219 - 220 - stmt.bindText(1, state_type) catch {}; 221 - stmt.bindText(2, state_name) catch {}; 222 - stmt.bindText(3, now) catch {}; 223 - stmt.bindText(4, id) catch {}; 224 - 225 - _ = stmt.step() catch { 173 + // atomic state transition (update + history insert in transaction) 174 + db.setFlowRunState(id, state_id, state_type, state_name, now) catch { 226 175 try http_server.sendJsonStatus(request, "{\"detail\":\"update failed\"}", .internal_server_error); 227 176 return; 228 177 }; 229 178 230 - // insert state history (non-fatal if it fails) 231 - var history_stmt = database.prepare( 232 - \\INSERT INTO flow_run_state (id, flow_run_id, type, name, timestamp) 233 - \\VALUES (?1, ?2, ?3, ?4, ?5) 234 - ) catch { 235 - // non-fatal, skip state history 236 - const state_id = common.generateUuid(alloc); 237 - const response = std.fmt.allocPrint(alloc, 238 - \\{{"status":"ACCEPT","details":{{}},"state":{{"type":"{s}","name":"{s}","timestamp":"{s}","id":"{s}"}}}} 239 - , .{ state_type, state_name, now, state_id }) catch { 240 - try http_server.sendJsonStatus(request, "{\"detail\":\"format error\"}", .internal_server_error); 241 - return; 242 - }; 243 - try http_server.sendJson(request, response); 244 - return; 245 - }; 246 - defer history_stmt.deinit(); 247 - 248 - history_stmt.bindText(1, common.generateUuid(alloc)) catch {}; 249 - history_stmt.bindText(2, id) catch {}; 250 - history_stmt.bindText(3, state_type) catch {}; 251 - history_stmt.bindText(4, state_name) catch {}; 252 - history_stmt.bindText(5, now) catch {}; 253 - _ = history_stmt.step() catch {}; // non-fatal 254 - 255 179 // return orchestration result 256 - const state_id = common.generateUuid(alloc); 257 180 const response = std.fmt.allocPrint(alloc, 258 181 \\{{"status":"ACCEPT","details":{{}},"state":{{"type":"{s}","name":"{s}","timestamp":"{s}","id":"{s}"}}}} 259 182 , .{ state_type, state_name, now, state_id }) catch { ··· 264 187 try http_server.sendJson(request, response); 265 188 } 266 189 267 - fn filter(request: *http.Server.Request, database: *db.Database) !void { 190 + fn filter(request: *http.Server.Request) !void { 268 191 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 269 192 defer arena.deinit(); 270 193 const alloc = arena.allocator(); 271 194 272 - // for now, just return all flow runs (TODO: implement filters) 273 - var stmt = database.prepare( 274 - \\SELECT id, created, updated, flow_id, name, state_type, state_name, state_timestamp 275 - \\FROM flow_run ORDER BY created DESC LIMIT 50 276 - ) catch { 195 + const runs = db.listFlowRuns(alloc, 50) catch { 277 196 try http_server.sendJsonStatus(request, "{\"detail\":\"database error\"}", .internal_server_error); 278 197 return; 279 198 }; 280 - defer stmt.deinit(); 281 199 282 200 var results: std.ArrayList(u8) = .{}; 283 201 defer results.deinit(alloc); 284 202 285 203 try results.appendSlice(alloc, "["); 286 204 287 - var first = true; 288 - while (stmt.step() catch false) { 289 - if (!first) try results.appendSlice(alloc, ","); 290 - first = false; 291 - 292 - const run_id = stmt.columnText(0) orelse ""; 293 - const created = stmt.columnText(1) orelse ""; 294 - const updated = stmt.columnText(2) orelse ""; 295 - const flow_id = stmt.columnText(3) orelse ""; 296 - const run_name = stmt.columnText(4) orelse ""; 297 - const state_type = stmt.columnText(5) orelse "PENDING"; 298 - const state_name = stmt.columnText(6) orelse "Pending"; 299 - const state_timestamp = stmt.columnText(7) orelse created; 205 + for (runs, 0..) |run, i| { 206 + if (i > 0) try results.appendSlice(alloc, ","); 300 207 301 208 const item = std.fmt.allocPrint(alloc, 302 - \\{{"id":"{s}","created":"{s}","updated":"{s}","name":"{s}","flow_id":"{s}","state_type":"{s}","state_name":"{s}","state":{{"type":"{s}","name":"{s}","timestamp":"{s}"}},"parameters":{{}},"tags":[],"run_count":0}} 303 - , .{ run_id, created, updated, run_name, flow_id, state_type, state_name, state_type, state_name, state_timestamp }) catch continue; 209 + \\{{"id":"{s}","created":"{s}","updated":"{s}","name":"{s}","flow_id":"{s}","state_type":"{s}","state_name":"{s}","state":{{"type":"{s}","name":"{s}","timestamp":"{s}"}},"parameters":{s},"tags":{s},"run_count":0}} 210 + , .{ run.id, run.created, run.updated, run.name, run.flow_id, run.state_type, run.state_name, run.state_type, run.state_name, run.state_timestamp, run.parameters, run.tags }) catch continue; 304 211 305 212 try results.appendSlice(alloc, item); 306 213 }
+7 -32
src/api/flows.zig
··· 8 8 const common = @import("common.zig"); 9 9 10 10 // POST /flows/ - create or get flow by name 11 - // GET /flows/{id} - read flow by id 12 - pub fn handle(request: *http.Server.Request, database: *db.Database, target: []const u8) !void { 11 + pub fn handle(request: *http.Server.Request, target: []const u8) !void { 13 12 if (request.head.method == .POST and (mem.eql(u8, target, "/flows/") or mem.eql(u8, target, "/api/flows/"))) { 14 - try createFlow(request, database); 13 + try createFlow(request); 15 14 } else { 16 15 try http_server.sendJsonStatus(request, "{\"detail\":\"not implemented\"}", .not_implemented); 17 16 } 18 17 } 19 18 20 - fn createFlow(request: *http.Server.Request, database: *db.Database) !void { 19 + fn createFlow(request: *http.Server.Request) !void { 21 20 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 22 21 defer arena.deinit(); 23 22 const alloc = arena.allocator(); 24 23 25 24 // read request body 26 25 var body_buf: [8192]u8 = undefined; 27 - request.head.expect = null; // we don't send 100-continue 26 + request.head.expect = null; 28 27 const body_reader = request.readerExpectNone(&body_buf); 29 28 const body = body_reader.allocRemaining(alloc, .unlimited) catch { 30 29 try http_server.sendJsonStatus(request, "{\"detail\":\"failed to read body\"}", .bad_request); ··· 44 43 const name_str = name.string; 45 44 46 45 // try to get existing flow first 47 - var select_stmt = database.prepare("SELECT id, created, updated, name, tags FROM flow WHERE name = ?1") catch { 48 - try http_server.sendJsonStatus(request, "{\"detail\":\"database error\"}", .internal_server_error); 49 - return; 50 - }; 51 - defer select_stmt.deinit(); 52 - 53 - select_stmt.bindText(1, name_str) catch {}; 54 - 55 - if (select_stmt.step() catch false) { 56 - // flow exists, return it 57 - const id = select_stmt.columnText(0) orelse ""; 58 - const created = select_stmt.columnText(1) orelse ""; 59 - const updated = select_stmt.columnText(2) orelse ""; 60 - const flow_name = select_stmt.columnText(3) orelse ""; 61 - const tags = select_stmt.columnText(4) orelse "[]"; 62 - 46 + if (db.getFlowByName(name_str)) |flow| { 63 47 const response = std.fmt.allocPrint(alloc, 64 48 \\{{"id":"{s}","created":"{s}","updated":"{s}","name":"{s}","tags":{s}}} 65 - , .{ id, created, updated, flow_name, tags }) catch { 49 + , .{ flow.id, flow.created, flow.updated, flow.name, flow.tags }) catch { 66 50 try http_server.sendJsonStatus(request, "{\"detail\":\"format error\"}", .internal_server_error); 67 51 return; 68 52 }; 69 - 70 53 try http_server.sendJson(request, response); 71 54 return; 72 55 } ··· 74 57 // create new flow 75 58 const new_id = common.generateUuid(alloc); 76 59 77 - var insert_stmt = database.prepare("INSERT INTO flow (id, name) VALUES (?1, ?2)") catch { 78 - try http_server.sendJsonStatus(request, "{\"detail\":\"database error\"}", .internal_server_error); 79 - return; 80 - }; 81 - defer insert_stmt.deinit(); 82 - 83 - insert_stmt.bindText(1, new_id) catch {}; 84 - insert_stmt.bindText(2, name_str) catch {}; 85 - _ = insert_stmt.step() catch { 60 + db.insertFlow(new_id, name_str) catch { 86 61 try http_server.sendJsonStatus(request, "{\"detail\":\"insert failed\"}", .internal_server_error); 87 62 return; 88 63 };
-2
src/api/routes.zig
··· 1 1 const std = @import("std"); 2 2 const http = std.http; 3 3 4 - const db = @import("../db/sqlite.zig"); 5 - 6 4 // resource modules 7 5 pub const admin = @import("admin.zig"); 8 6 pub const flows = @import("flows.zig");
+240 -167
src/db/sqlite.zig
··· 1 1 const std = @import("std"); 2 - const c = @cImport({ 3 - @cInclude("sqlite3.h"); 4 - }); 2 + const zqlite = @import("zqlite"); 3 + const Thread = std.Thread; 5 4 6 - const Allocator = std.mem.Allocator; 5 + pub var conn: zqlite.Conn = undefined; 6 + pub var mutex: Thread.Mutex = .{}; 7 7 8 - // Use SQLITE_STATIC (null) - we ensure the data lives long enough 9 - // SQLITE_TRANSIENT (-1) would have sqlite copy the data but causes alignment issues 8 + var path_buf: [256]u8 = undefined; 10 9 11 - pub const Database = struct { 12 - conn: ?*c.sqlite3, 13 - allocator: Allocator, 10 + pub fn init() !void { 11 + const path_env = std.posix.getenv("PREFECT_DATABASE_PATH") orelse "prefect.db"; 14 12 15 - pub fn init(allocator: Allocator) !Database { 16 - const db_path = std.posix.getenv("PREFECT_DATABASE_PATH") orelse "prefect.db"; 13 + @memcpy(path_buf[0..path_env.len], path_env); 14 + path_buf[path_env.len] = 0; 15 + const path: [*:0]const u8 = path_buf[0..path_env.len :0]; 17 16 18 - var conn: ?*c.sqlite3 = null; 19 - const rc = c.sqlite3_open(db_path.ptr, &conn); 20 - if (rc != c.SQLITE_OK) { 21 - std.debug.print("sqlite open error: {s}\n", .{c.sqlite3_errmsg(conn)}); 22 - return error.DatabaseOpenFailed; 23 - } 17 + std.debug.print("opening database: {s}\n", .{path_env}); 24 18 25 - var db = Database{ .conn = conn, .allocator = allocator }; 26 - try db.configurePragmas(); 27 - try db.initSchema(); 19 + const flags = zqlite.OpenFlags.Create | zqlite.OpenFlags.ReadWrite; 20 + conn = zqlite.open(path, flags) catch |err| { 21 + std.debug.print("failed to open database: {}\n", .{err}); 22 + return err; 23 + }; 28 24 29 - std.debug.print("database initialized: {s}\n", .{db_path}); 30 - return db; 31 - } 25 + // configure pragmas 26 + _ = conn.exec("PRAGMA journal_mode=WAL", .{}) catch {}; 27 + _ = conn.exec("PRAGMA busy_timeout=5000", .{}) catch {}; 28 + _ = conn.exec("PRAGMA foreign_keys=ON", .{}) catch {}; 32 29 33 - fn configurePragmas(self: *Database) !void { 34 - // enable WAL mode for better concurrent read/write performance 35 - var err_msg: [*c]u8 = null; 36 - var rc = c.sqlite3_exec(self.conn, "PRAGMA journal_mode=WAL;", null, null, &err_msg); 37 - if (rc != c.SQLITE_OK) { 38 - std.debug.print("WAL mode error: {s}\n", .{err_msg}); 39 - c.sqlite3_free(err_msg); 40 - } 30 + try initSchema(); 41 31 42 - // set busy timeout to 5 seconds 43 - rc = c.sqlite3_exec(self.conn, "PRAGMA busy_timeout=5000;", null, null, &err_msg); 44 - if (rc != c.SQLITE_OK) { 45 - std.debug.print("busy_timeout error: {s}\n", .{err_msg}); 46 - c.sqlite3_free(err_msg); 47 - } 32 + std.debug.print("database initialized\n", .{}); 33 + } 34 + 35 + pub fn close() void { 36 + conn.close(); 37 + } 38 + 39 + fn initSchema() !void { 40 + mutex.lock(); 41 + defer mutex.unlock(); 42 + 43 + conn.execNoArgs( 44 + \\CREATE TABLE IF NOT EXISTS flow ( 45 + \\ id TEXT PRIMARY KEY, 46 + \\ created TEXT DEFAULT (datetime('now')), 47 + \\ updated TEXT DEFAULT (datetime('now')), 48 + \\ name TEXT NOT NULL UNIQUE, 49 + \\ tags TEXT DEFAULT '[]' 50 + \\) 51 + ) catch |err| { 52 + std.debug.print("failed to create flow table: {}\n", .{err}); 53 + return err; 54 + }; 48 55 49 - // enable foreign keys 50 - rc = c.sqlite3_exec(self.conn, "PRAGMA foreign_keys=ON;", null, null, &err_msg); 51 - if (rc != c.SQLITE_OK) { 52 - std.debug.print("foreign_keys error: {s}\n", .{err_msg}); 53 - c.sqlite3_free(err_msg); 54 - } 56 + conn.execNoArgs( 57 + \\CREATE TABLE IF NOT EXISTS flow_run ( 58 + \\ id TEXT PRIMARY KEY, 59 + \\ created TEXT DEFAULT (datetime('now')), 60 + \\ updated TEXT DEFAULT (datetime('now')), 61 + \\ flow_id TEXT REFERENCES flow(id), 62 + \\ name TEXT NOT NULL, 63 + \\ parameters TEXT DEFAULT '{}', 64 + \\ tags TEXT DEFAULT '[]', 65 + \\ state_id TEXT, 66 + \\ state_type TEXT, 67 + \\ state_name TEXT, 68 + \\ state_timestamp TEXT, 69 + \\ run_count INTEGER DEFAULT 0 70 + \\) 71 + ) catch |err| { 72 + std.debug.print("failed to create flow_run table: {}\n", .{err}); 73 + return err; 74 + }; 75 + 76 + conn.execNoArgs( 77 + \\CREATE TABLE IF NOT EXISTS flow_run_state ( 78 + \\ id TEXT PRIMARY KEY, 79 + \\ created TEXT DEFAULT (datetime('now')), 80 + \\ flow_run_id TEXT REFERENCES flow_run(id) ON DELETE CASCADE, 81 + \\ type TEXT NOT NULL, 82 + \\ name TEXT NOT NULL, 83 + \\ timestamp TEXT DEFAULT (datetime('now')) 84 + \\) 85 + ) catch |err| { 86 + std.debug.print("failed to create flow_run_state table: {}\n", .{err}); 87 + return err; 88 + }; 89 + 90 + // indexes 91 + conn.execNoArgs("CREATE INDEX IF NOT EXISTS ix_flow_run__state_type ON flow_run(state_type)") catch {}; 92 + conn.execNoArgs("CREATE INDEX IF NOT EXISTS ix_flow_run__flow_id ON flow_run(flow_id)") catch {}; 93 + conn.execNoArgs("CREATE INDEX IF NOT EXISTS ix_flow_run_state__flow_run_id ON flow_run_state(flow_run_id)") catch {}; 94 + } 95 + 96 + // --- flow operations --- 97 + 98 + pub fn getFlowByName(name: []const u8) ?FlowRow { 99 + mutex.lock(); 100 + defer mutex.unlock(); 101 + 102 + if (conn.row("SELECT id, created, updated, name, tags FROM flow WHERE name = ?", .{name}) catch null) |row| { 103 + defer row.deinit(); 104 + return FlowRow{ 105 + .id = row.text(0), 106 + .created = row.text(1), 107 + .updated = row.text(2), 108 + .name = row.text(3), 109 + .tags = row.text(4), 110 + }; 55 111 } 112 + return null; 113 + } 56 114 57 - pub fn deinit(self: *Database) void { 58 - if (self.conn) |conn| { 59 - _ = c.sqlite3_close(conn); 60 - } 115 + pub fn insertFlow(id: []const u8, name: []const u8) !void { 116 + mutex.lock(); 117 + defer mutex.unlock(); 118 + 119 + conn.exec("INSERT INTO flow (id, name) VALUES (?, ?)", .{ id, name }) catch |err| { 120 + std.debug.print("insert flow error: {}\n", .{err}); 121 + return err; 122 + }; 123 + } 124 + 125 + // --- flow run operations --- 126 + 127 + pub fn insertFlowRun( 128 + id: []const u8, 129 + flow_id: []const u8, 130 + name: []const u8, 131 + state_type: []const u8, 132 + state_name: []const u8, 133 + timestamp: []const u8, 134 + ) !void { 135 + mutex.lock(); 136 + defer mutex.unlock(); 137 + 138 + conn.exec( 139 + \\INSERT INTO flow_run (id, flow_id, name, state_type, state_name, state_timestamp) 140 + \\VALUES (?, ?, ?, ?, ?, ?) 141 + , .{ id, flow_id, name, state_type, state_name, timestamp }) catch |err| { 142 + std.debug.print("insert flow_run error: {}\n", .{err}); 143 + return err; 144 + }; 145 + } 146 + 147 + pub fn getFlowRun(id: []const u8) ?FlowRunRow { 148 + mutex.lock(); 149 + defer mutex.unlock(); 150 + 151 + if (conn.row( 152 + \\SELECT id, created, updated, flow_id, name, state_type, state_name, state_timestamp, parameters, tags 153 + \\FROM flow_run WHERE id = ? 154 + , .{id}) catch null) |row| { 155 + defer row.deinit(); 156 + return FlowRunRow{ 157 + .id = row.text(0), 158 + .created = row.text(1), 159 + .updated = row.text(2), 160 + .flow_id = row.text(3), 161 + .name = row.text(4), 162 + .state_type = row.text(5), 163 + .state_name = row.text(6), 164 + .state_timestamp = row.text(7), 165 + .parameters = row.text(8), 166 + .tags = row.text(9), 167 + }; 61 168 } 169 + return null; 170 + } 62 171 63 - fn initSchema(self: *Database) !void { 64 - const schema = 65 - \\-- flows table 66 - \\CREATE TABLE IF NOT EXISTS flow ( 67 - \\ id TEXT PRIMARY KEY DEFAULT (lower(hex(randomblob(16)))), 68 - \\ created TEXT DEFAULT (datetime('now')), 69 - \\ updated TEXT DEFAULT (datetime('now')), 70 - \\ name TEXT NOT NULL UNIQUE, 71 - \\ tags TEXT DEFAULT '[]' 72 - \\); 73 - \\ 74 - \\-- flow_run table 75 - \\CREATE TABLE IF NOT EXISTS flow_run ( 76 - \\ id TEXT PRIMARY KEY DEFAULT (lower(hex(randomblob(16)))), 77 - \\ created TEXT DEFAULT (datetime('now')), 78 - \\ updated TEXT DEFAULT (datetime('now')), 79 - \\ flow_id TEXT REFERENCES flow(id), 80 - \\ name TEXT NOT NULL, 81 - \\ flow_version TEXT, 82 - \\ parameters TEXT DEFAULT '{}', 83 - \\ context TEXT DEFAULT '{}', 84 - \\ tags TEXT DEFAULT '[]', 85 - \\ state_type TEXT, 86 - \\ state_name TEXT, 87 - \\ state_timestamp TEXT, 88 - \\ state_message TEXT, 89 - \\ state_details TEXT DEFAULT '{}', 90 - \\ run_count INTEGER DEFAULT 0, 91 - \\ expected_start_time TEXT, 92 - \\ start_time TEXT, 93 - \\ end_time TEXT, 94 - \\ total_run_time REAL DEFAULT 0, 95 - \\ idempotency_key TEXT, 96 - \\ empirical_policy TEXT DEFAULT '{}', 97 - \\ UNIQUE(flow_id, idempotency_key) 98 - \\); 99 - \\ 100 - \\-- flow_run_state table (history) 101 - \\CREATE TABLE IF NOT EXISTS flow_run_state ( 102 - \\ id TEXT PRIMARY KEY DEFAULT (lower(hex(randomblob(16)))), 103 - \\ created TEXT DEFAULT (datetime('now')), 104 - \\ flow_run_id TEXT REFERENCES flow_run(id) ON DELETE CASCADE, 105 - \\ type TEXT NOT NULL, 106 - \\ name TEXT NOT NULL, 107 - \\ timestamp TEXT DEFAULT (datetime('now')), 108 - \\ message TEXT, 109 - \\ state_details TEXT DEFAULT '{}' 110 - \\); 111 - \\ 112 - \\-- indexes 113 - \\CREATE INDEX IF NOT EXISTS ix_flow_run__state_type ON flow_run(state_type); 114 - \\CREATE INDEX IF NOT EXISTS ix_flow_run__flow_id ON flow_run(flow_id); 115 - \\CREATE INDEX IF NOT EXISTS ix_flow_run_state__flow_run_id ON flow_run_state(flow_run_id); 116 - ; 172 + /// atomically update flow run state and insert history record 173 + pub fn setFlowRunState( 174 + run_id: []const u8, 175 + state_id: []const u8, 176 + state_type: []const u8, 177 + state_name: []const u8, 178 + timestamp: []const u8, 179 + ) !void { 180 + mutex.lock(); 181 + defer mutex.unlock(); 117 182 118 - var err_msg: [*c]u8 = null; 119 - const rc = c.sqlite3_exec(self.conn, schema.ptr, null, null, &err_msg); 120 - if (rc != c.SQLITE_OK) { 121 - std.debug.print("schema init error: {s}\n", .{err_msg}); 122 - c.sqlite3_free(err_msg); 123 - return error.SchemaInitFailed; 124 - } 125 - } 183 + // wrap in transaction for atomicity 184 + conn.transaction() catch |err| { 185 + std.debug.print("begin transaction error: {}\n", .{err}); 186 + return err; 187 + }; 188 + errdefer conn.rollback(); 126 189 127 - pub fn exec(self: *Database, sql: [*:0]const u8) !void { 128 - var err_msg: [*c]u8 = null; 129 - const rc = c.sqlite3_exec(self.conn, sql, null, null, &err_msg); 130 - if (rc != c.SQLITE_OK) { 131 - std.debug.print("exec error: {s}\n", .{err_msg}); 132 - c.sqlite3_free(err_msg); 133 - return error.ExecFailed; 134 - } 135 - } 190 + // update current state 191 + conn.exec( 192 + \\UPDATE flow_run SET state_id = ?, state_type = ?, state_name = ?, state_timestamp = ?, updated = ? 193 + \\WHERE id = ? 194 + , .{ state_id, state_type, state_name, timestamp, timestamp, run_id }) catch |err| { 195 + std.debug.print("update flow_run error: {}\n", .{err}); 196 + return err; 197 + }; 136 198 137 - pub fn prepare(self: *Database, sql: [*:0]const u8) !Statement { 138 - var stmt: ?*c.sqlite3_stmt = null; 139 - const rc = c.sqlite3_prepare_v2(self.conn, sql, -1, &stmt, null); 140 - if (rc != c.SQLITE_OK) { 141 - std.debug.print("prepare error: {s}\n", .{c.sqlite3_errmsg(self.conn)}); 142 - return error.PrepareFailed; 143 - } 144 - return Statement{ .stmt = stmt, .db = self }; 145 - } 146 - }; 199 + // insert state history 200 + conn.exec( 201 + \\INSERT INTO flow_run_state (id, flow_run_id, type, name, timestamp) 202 + \\VALUES (?, ?, ?, ?, ?) 203 + , .{ state_id, run_id, state_type, state_name, timestamp }) catch |err| { 204 + std.debug.print("insert flow_run_state error: {}\n", .{err}); 205 + return err; 206 + }; 147 207 148 - pub const Statement = struct { 149 - stmt: ?*c.sqlite3_stmt, 150 - db: *Database, 208 + conn.commit() catch |err| { 209 + std.debug.print("commit error: {}\n", .{err}); 210 + return err; 211 + }; 212 + } 151 213 152 - pub fn deinit(self: *Statement) void { 153 - if (self.stmt) |stmt| { 154 - _ = c.sqlite3_finalize(stmt); 155 - } 156 - } 214 + pub fn listFlowRuns(allocator: std.mem.Allocator, limit: usize) ![]FlowRunRow { 215 + mutex.lock(); 216 + defer mutex.unlock(); 157 217 158 - pub fn bindText(self: *Statement, idx: c_int, text: []const u8) !void { 159 - // Using null (SQLITE_STATIC) - caller ensures data lives until step() completes 160 - const rc = c.sqlite3_bind_text(self.stmt, idx, text.ptr, @intCast(text.len), null); 161 - if (rc != c.SQLITE_OK) return error.BindFailed; 162 - } 218 + var results: std.ArrayList(FlowRunRow) = .{}; 219 + errdefer results.deinit(allocator); 163 220 164 - pub fn bindNull(self: *Statement, idx: c_int) !void { 165 - const rc = c.sqlite3_bind_null(self.stmt, idx); 166 - if (rc != c.SQLITE_OK) return error.BindFailed; 167 - } 221 + var rows = conn.rows( 222 + \\SELECT id, created, updated, flow_id, name, state_type, state_name, state_timestamp, parameters, tags 223 + \\FROM flow_run ORDER BY created DESC LIMIT ? 224 + , .{@as(i64, @intCast(limit))}) catch |err| { 225 + std.debug.print("list flow_runs error: {}\n", .{err}); 226 + return err; 227 + }; 228 + defer rows.deinit(); 168 229 169 - pub fn step(self: *Statement) !bool { 170 - const rc = c.sqlite3_step(self.stmt); 171 - if (rc == c.SQLITE_ROW) return true; 172 - if (rc == c.SQLITE_DONE) return false; 173 - std.debug.print("step error: {s}\n", .{c.sqlite3_errmsg(self.db.conn)}); 174 - return error.StepFailed; 230 + while (rows.next()) |row| { 231 + try results.append(allocator, .{ 232 + .id = row.text(0), 233 + .created = row.text(1), 234 + .updated = row.text(2), 235 + .flow_id = row.text(3), 236 + .name = row.text(4), 237 + .state_type = row.text(5), 238 + .state_name = row.text(6), 239 + .state_timestamp = row.text(7), 240 + .parameters = row.text(8), 241 + .tags = row.text(9), 242 + }); 175 243 } 176 244 177 - pub fn reset(self: *Statement) !void { 178 - const rc = c.sqlite3_reset(self.stmt); 179 - if (rc != c.SQLITE_OK) return error.ResetFailed; 180 - } 245 + return results.toOwnedSlice(allocator); 246 + } 181 247 182 - pub fn columnText(self: *Statement, idx: c_int) ?[]const u8 { 183 - const ptr = c.sqlite3_column_text(self.stmt, idx); 184 - if (ptr == null) return null; 185 - const len = c.sqlite3_column_bytes(self.stmt, idx); 186 - return ptr[0..@intCast(len)]; 187 - } 248 + // --- row types --- 188 249 189 - pub fn columnInt(self: *Statement, idx: c_int) i32 { 190 - return c.sqlite3_column_int(self.stmt, idx); 191 - } 250 + pub const FlowRow = struct { 251 + id: []const u8, 252 + created: []const u8, 253 + updated: []const u8, 254 + name: []const u8, 255 + tags: []const u8, 192 256 }; 193 257 194 - pub fn init(allocator: Allocator) !Database { 195 - return Database.init(allocator); 196 - } 258 + pub const FlowRunRow = struct { 259 + id: []const u8, 260 + created: []const u8, 261 + updated: []const u8, 262 + flow_id: []const u8, 263 + name: []const u8, 264 + state_type: []const u8, 265 + state_name: []const u8, 266 + state_timestamp: []const u8, 267 + parameters: []const u8, 268 + tags: []const u8, 269 + };
+4 -4
src/main.zig
··· 15 15 defer _ = gpa.deinit(); 16 16 const allocator = gpa.allocator(); 17 17 18 - // init database 19 - var database = try db.init(allocator); 20 - defer database.deinit(); 18 + // init database (uses global state with mutex protection) 19 + try db.init(); 20 + defer db.close(); 21 21 22 22 // init thread pool for http connections 23 23 var pool: Thread.Pool = undefined; ··· 49 49 std.debug.print("failed to set socket timeout: {}\n", .{err}); 50 50 }; 51 51 52 - pool.spawn(server.handleConnection, .{ conn, &database }) catch |err| { 52 + pool.spawn(server.handleConnection, .{conn}) catch |err| { 53 53 std.debug.print("pool spawn error: {}\n", .{err}); 54 54 conn.stream.close(); 55 55 };
+5 -6
src/server/http.zig
··· 3 3 const http = std.http; 4 4 const mem = std.mem; 5 5 6 - const db = @import("../db/sqlite.zig"); 7 6 const api = @import("../api/routes.zig"); 8 7 9 8 const HTTP_BUF_SIZE = 8192; 10 9 11 - pub fn handleConnection(conn: net.Server.Connection, database: *db.Database) void { 10 + pub fn handleConnection(conn: net.Server.Connection) void { 12 11 defer conn.stream.close(); 13 12 14 13 var read_buffer: [HTTP_BUF_SIZE]u8 = undefined; ··· 26 25 } 27 26 return; 28 27 }; 29 - handleRequest(&request, database) catch |err| { 28 + handleRequest(&request) catch |err| { 30 29 std.debug.print("request error: {}\n", .{err}); 31 30 return; 32 31 }; ··· 34 33 } 35 34 } 36 35 37 - fn handleRequest(request: *http.Server.Request, database: *db.Database) !void { 36 + fn handleRequest(request: *http.Server.Request) !void { 38 37 const target = request.head.target; 39 38 40 39 std.debug.print("{s} {s}\n", .{ @tagName(request.head.method), target }); ··· 55 54 } else if (mem.startsWith(u8, target, "/api/logs") or mem.startsWith(u8, target, "/logs")) { 56 55 try api.logs.logs(request); 57 56 } else if (mem.startsWith(u8, target, "/api/flows") or mem.startsWith(u8, target, "/flows")) { 58 - try api.flows.handle(request, database, target); 57 + try api.flows.handle(request, target); 59 58 } else if (mem.startsWith(u8, target, "/api/flow_runs") or mem.startsWith(u8, target, "/flow_runs")) { 60 - try api.flow_runs.handle(request, database, target); 59 + try api.flow_runs.handle(request, target); 61 60 } else { 62 61 try sendNotFound(request); 63 62 }