prefect server in zig

add dual sqlite/postgres database backend abstraction

- add backend.zig with unified Backend, Row, Rows types
- add dialect.zig for SQL dialect helpers (placeholder rewriting)
- add pg.zig dependency for PostgreSQL support
- migrate all entity modules to use backend abstraction
- add schema/ directory with dialect-specific DDL
- add test-db-backends script for backend testing
- add hashing.zig for canonical JSON checksum generation

the backend abstraction supports:
- automatic placeholder rewriting (? → $1, $2 for postgres)
- unified row interface across both backends
- connection pooling for postgres, single conn + mutex for sqlite
- environment-based backend selection (PREFECT_DATABASE_BACKEND)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+1723 -317
+48 -3
CLAUDE.md
··· 19 just test # runs scripts/test-flow against localhost:4200 20 ``` 21 22 ## benchmarking 23 24 ``` ··· 32 33 - `PREFECT_SERVER_LOGGING_LEVEL` / `PREFECT_LOGGING_LEVEL`: DEBUG, INFO, WARNING, ERROR, CRITICAL 34 - `PREFECT_SERVER_PORT`: default 4200 35 - - `PREFECT_DATABASE_PATH`: default prefect.db 36 37 ## implemented endpoints 38 ··· 65 66 ## database 67 68 - sqlite via zqlite. tables: flow, flow_run, flow_run_state, events, block_type, block_schema, block_document 69 70 ## architecture 71 72 ``` 73 src/ 74 ├── api/ # http endpoints + websocket handlers 75 - ├── db/ # sqlite database layer 76 ├── orchestration/ # state transition bookkeeping 77 ├── services/ # background workers 78 ├── utilities/ # general infrastructure
··· 19 just test # runs scripts/test-flow against localhost:4200 20 ``` 21 22 + ## testing 23 + 24 + **NEVER run the server and test scripts in the same bash command.** The server is a long-running process. Use the benchmark scripts which handle server lifecycle, or run server and tests separately: 25 + 26 + ```bash 27 + # WRONG - will hang forever 28 + ./zig-out/bin/prefect-server & ./scripts/test-api-sequence 29 + 30 + # RIGHT - use benchmark script (handles server lifecycle) 31 + ./scripts/benchmark --server zig 32 + 33 + # RIGHT - or run separately in different terminals 34 + # terminal 1: ./zig-out/bin/prefect-server 35 + # terminal 2: ./scripts/test-api-sequence 36 + ``` 37 + 38 ## benchmarking 39 40 ``` ··· 48 49 - `PREFECT_SERVER_LOGGING_LEVEL` / `PREFECT_LOGGING_LEVEL`: DEBUG, INFO, WARNING, ERROR, CRITICAL 50 - `PREFECT_SERVER_PORT`: default 4200 51 + - `PREFECT_DATABASE_BACKEND`: `sqlite` (default) or `postgres` 52 + - `PREFECT_DATABASE_PATH`: SQLite database path (default: prefect.db) 53 + - `PREFECT_DATABASE_URL`: PostgreSQL connection string (e.g., `postgresql://user:pass@localhost:5432/prefect`) 54 55 ## implemented endpoints 56 ··· 83 84 ## database 85 86 + supports sqlite (zqlite) and postgresql (pg.zig). tables: flow, flow_run, flow_run_state, events, block_type, block_schema, block_document 87 + 88 + ### backend abstraction (WIP) 89 + 90 + ``` 91 + src/db/ 92 + ├── backend.zig # unified Backend, Row, Rows types 93 + ├── dialect.zig # SQL dialect helpers (placeholders, datetime) 94 + ├── schema/ 95 + │ ├── sqlite.zig # SQLite DDL 96 + │ └── postgres.zig # PostgreSQL DDL 97 + ├── sqlite.zig # current implementation (uses zqlite directly) 98 + └── [entity].zig # flows, flow_runs, etc. 99 + ``` 100 + 101 + the backend abstraction provides: 102 + - `Backend` union type supporting both sqlite and postgres 103 + - unified `Row` interface for accessing column values 104 + - automatic placeholder rewriting (`?` → `$1, $2` for postgres) 105 + - dialect-specific DDL in schema/ directory 106 + 107 + ### testing backends 108 + 109 + ```bash 110 + ./scripts/test-db-backends sqlite # test SQLite 111 + ./scripts/test-db-backends postgres # test PostgreSQL (requires running postgres) 112 + ./scripts/test-db-backends all # test both 113 + ``` 114 115 ## architecture 116 117 ``` 118 src/ 119 ├── api/ # http endpoints + websocket handlers 120 + ├── db/ # database layer (sqlite + postgres abstraction) 121 ├── orchestration/ # state transition bookkeeping 122 ├── services/ # background workers 123 ├── utilities/ # general infrastructure
+7
build.zig
··· 20 .openssl = false, 21 }); 22 23 const exe = b.addExecutable(.{ 24 .name = "prefect-server", 25 .root_module = b.createModule(.{ ··· 30 .{ .name = "uuid", .module = uuid_dep.module("uuid") }, 31 .{ .name = "zqlite", .module = zqlite.module("zqlite") }, 32 .{ .name = "zap", .module = zap.module("zap") }, 33 }, 34 }), 35 }); ··· 58 .{ .name = "uuid", .module = uuid_dep.module("uuid") }, 59 .{ .name = "zqlite", .module = zqlite.module("zqlite") }, 60 .{ .name = "zap", .module = zap.module("zap") }, 61 }, 62 }), 63 });
··· 20 .openssl = false, 21 }); 22 23 + const pg = b.dependency("pg", .{ 24 + .target = target, 25 + .optimize = optimize, 26 + }); 27 + 28 const exe = b.addExecutable(.{ 29 .name = "prefect-server", 30 .root_module = b.createModule(.{ ··· 35 .{ .name = "uuid", .module = uuid_dep.module("uuid") }, 36 .{ .name = "zqlite", .module = zqlite.module("zqlite") }, 37 .{ .name = "zap", .module = zap.module("zap") }, 38 + .{ .name = "pg", .module = pg.module("pg") }, 39 }, 40 }), 41 }); ··· 64 .{ .name = "uuid", .module = uuid_dep.module("uuid") }, 65 .{ .name = "zqlite", .module = zqlite.module("zqlite") }, 66 .{ .name = "zap", .module = zap.module("zap") }, 67 + .{ .name = "pg", .module = pg.module("pg") }, 68 }, 69 }), 70 });
+4
build.zig.zon
··· 16 .url = "git+https://github.com/zigzap/zap?ref=v0.11.0#66c5dc42c781bbb8a9100afda3c7e69ee96eddf3", 17 .hash = "zap-0.10.6-GoeB8xCEJABLgoiZjWZMMT5TsoZ5OO2EZe6j24RTUYEH", 18 }, 19 }, 20 .paths = .{ 21 "build.zig",
··· 16 .url = "git+https://github.com/zigzap/zap?ref=v0.11.0#66c5dc42c781bbb8a9100afda3c7e69ee96eddf3", 17 .hash = "zap-0.10.6-GoeB8xCEJABLgoiZjWZMMT5TsoZ5OO2EZe6j24RTUYEH", 18 }, 19 + .pg = .{ 20 + .url = "git+https://github.com/karlseguin/pg.zig?ref=master#f8d4892387fbad2abdf775783e101e50a7114335", 21 + .hash = "pg-0.0.0-Wp_7gag6BgD_QAZrPhNNEGpnUZR_LEkKT40Ura3p-4yX", 22 + }, 23 }, 24 .paths = .{ 25 "build.zig",
+102
docs/blocks-implementation.md
···
··· 1 + # blocks implementation plan 2 + 3 + ## api call sequence 4 + 5 + When user calls `block.save("name")`: 6 + 1. `GET /block_types/slug/{slug}` → 404 if not found 7 + 2. `POST /block_types/` → create type (or `PATCH /block_types/{id}` if exists) 8 + 3. `GET /block_schemas/checksum/{checksum}` → 404 if not found 9 + 4. `POST /block_schemas/` → create schema 10 + 5. `POST /block_documents/` → create document 11 + - if 409 conflict (name exists): `GET` + `PATCH` to update 12 + 13 + When user calls `Block.load("name")`: 14 + 1. `GET /block_types/slug/{slug}/block_documents/name/{name}` → return document with nested schema/type 15 + 16 + ## database tables 17 + 18 + ```sql 19 + block_type ( 20 + id TEXT PRIMARY KEY, 21 + created, updated, 22 + name TEXT NOT NULL, 23 + slug TEXT NOT NULL UNIQUE, 24 + logo_url, documentation_url, description, code_example TEXT, 25 + is_protected INTEGER DEFAULT 0 26 + ) 27 + 28 + block_schema ( 29 + id TEXT PRIMARY KEY, 30 + created, updated, 31 + checksum TEXT NOT NULL, 32 + fields TEXT DEFAULT '{}', -- JSON schema 33 + capabilities TEXT DEFAULT '[]', -- JSON array 34 + version TEXT DEFAULT '1', 35 + block_type_id TEXT FK, 36 + UNIQUE(checksum, version) 37 + ) 38 + 39 + block_document ( 40 + id TEXT PRIMARY KEY, 41 + created, updated, 42 + name TEXT, 43 + data TEXT DEFAULT '{}', -- JSON (encrypted in python, plain for us) 44 + is_anonymous INTEGER DEFAULT 0, 45 + block_type_id TEXT FK, 46 + block_type_name TEXT, -- denormalized 47 + block_schema_id TEXT FK, 48 + UNIQUE(block_type_id, name) 49 + ) 50 + ``` 51 + 52 + ## implementation phases 53 + 54 + ### phase 1: save() support (minimum viable) 55 + - [x] add tables to schema 56 + - [ ] `db/block_types.zig` - insert, getBySlug, update 57 + - [ ] `db/block_schemas.zig` - insert, getByChecksum 58 + - [ ] `db/block_documents.zig` - insert, getById, update 59 + - [ ] `api/block_types.zig`: 60 + - [ ] `GET /block_types/slug/{slug}` 61 + - [ ] `POST /block_types/` 62 + - [ ] `PATCH /block_types/{id}` 63 + - [ ] `api/block_schemas.zig`: 64 + - [ ] `GET /block_schemas/checksum/{checksum}` 65 + - [ ] `POST /block_schemas/` 66 + - [ ] `api/block_documents.zig`: 67 + - [ ] `POST /block_documents/` 68 + - [ ] `PATCH /block_documents/{id}` 69 + - [ ] `GET /block_documents/{id}` 70 + 71 + ### phase 2: load() support 72 + - [ ] `GET /block_types/slug/{slug}/block_documents/name/{name}` 73 + 74 + ### phase 3: filter endpoints 75 + - [ ] `POST /block_types/filter` 76 + - [ ] `POST /block_schemas/filter` 77 + - [ ] `POST /block_documents/filter` 78 + 79 + ### phase 4: nested blocks (if needed) 80 + - [ ] block_schema_reference table 81 + - [ ] block_document_reference table 82 + - [ ] recursive document hydration 83 + 84 + ## test script 85 + 86 + ```python 87 + from prefect.blocks.system import Secret 88 + 89 + # save 90 + secret = Secret(value="my-secret-value") 91 + secret.save("test-secret") 92 + 93 + # load 94 + loaded = Secret.load("test-secret") 95 + print(loaded.get()) 96 + ``` 97 + 98 + ## response formats 99 + 100 + See prefect source for exact JSON shapes: 101 + - `src/prefect/client/schemas/responses.py` 102 + - `src/prefect/server/schemas/core.py`
+221
scripts/test-db-backends
···
··· 1 + #!/usr/bin/env bash 2 + set -euo pipefail 3 + 4 + # Test harness for database backends 5 + # Usage: 6 + # ./scripts/test-db-backends # test SQLite only (default) 7 + # ./scripts/test-db-backends sqlite # test SQLite 8 + # ./scripts/test-db-backends postgres # test PostgreSQL 9 + # ./scripts/test-db-backends all # test both 10 + 11 + SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" 12 + PROJECT_DIR="$(dirname "$SCRIPT_DIR")" 13 + 14 + RED='\033[0;31m' 15 + GREEN='\033[0;32m' 16 + YELLOW='\033[1;33m' 17 + NC='\033[0m' 18 + 19 + log_info() { echo -e "${GREEN}[INFO]${NC} $*"; } 20 + log_warn() { echo -e "${YELLOW}[WARN]${NC} $*"; } 21 + log_error() { echo -e "${RED}[ERROR]${NC} $*"; } 22 + 23 + BACKEND="${1:-sqlite}" 24 + TEST_DB_PATH="/tmp/prefect-test-$$.db" 25 + POSTGRES_URL="${PREFECT_TEST_POSTGRES_URL:-postgresql://localhost:5432/prefect_test}" 26 + 27 + cleanup() { 28 + rm -f "$TEST_DB_PATH" 2>/dev/null || true 29 + } 30 + trap cleanup EXIT 31 + 32 + test_sqlite() { 33 + log_info "Testing SQLite backend..." 34 + 35 + export PREFECT_DATABASE_BACKEND=sqlite 36 + export PREFECT_DATABASE_PATH="$TEST_DB_PATH" 37 + 38 + # Remove any existing test database 39 + rm -f "$TEST_DB_PATH" 40 + 41 + # Build and run tests 42 + cd "$PROJECT_DIR" 43 + 44 + log_info "Building..." 45 + zig build 2>&1 || { log_error "Build failed"; return 1; } 46 + 47 + log_info "Running backend unit tests..." 48 + zig build test 2>&1 || { log_error "Unit tests failed"; return 1; } 49 + 50 + log_info "Starting server for integration tests..." 51 + ./zig-out/bin/prefect-server & 52 + SERVER_PID=$! 53 + sleep 2 54 + 55 + # Basic health check 56 + log_info "Health check..." 57 + if curl -s http://localhost:4200/api/health | grep -q "ok"; then 58 + log_info "Health check passed" 59 + else 60 + log_error "Health check failed" 61 + kill $SERVER_PID 2>/dev/null || true 62 + return 1 63 + fi 64 + 65 + # Test flow creation 66 + log_info "Testing flow creation..." 67 + FLOW_RESPONSE=$(curl -s -X POST http://localhost:4200/api/flows/ \ 68 + -H "Content-Type: application/json" \ 69 + -d '{"name": "test-flow-sqlite"}') 70 + 71 + if echo "$FLOW_RESPONSE" | grep -q '"id"'; then 72 + log_info "Flow creation passed" 73 + FLOW_ID=$(echo "$FLOW_RESPONSE" | grep -o '"id":"[^"]*"' | head -1 | cut -d'"' -f4) 74 + else 75 + log_error "Flow creation failed: $FLOW_RESPONSE" 76 + kill $SERVER_PID 2>/dev/null || true 77 + return 1 78 + fi 79 + 80 + # Test flow retrieval 81 + log_info "Testing flow retrieval..." 82 + FLOW_GET=$(curl -s "http://localhost:4200/api/flows/$FLOW_ID") 83 + if echo "$FLOW_GET" | grep -q "test-flow-sqlite"; then 84 + log_info "Flow retrieval passed" 85 + else 86 + log_error "Flow retrieval failed: $FLOW_GET" 87 + kill $SERVER_PID 2>/dev/null || true 88 + return 1 89 + fi 90 + 91 + # Test flow run creation 92 + log_info "Testing flow run creation..." 93 + FLOW_RUN_RESPONSE=$(curl -s -X POST http://localhost:4200/api/flow_runs/ \ 94 + -H "Content-Type: application/json" \ 95 + -d "{\"flow_id\": \"$FLOW_ID\", \"name\": \"test-run-1\"}") 96 + 97 + if echo "$FLOW_RUN_RESPONSE" | grep -q '"id"'; then 98 + log_info "Flow run creation passed" 99 + FLOW_RUN_ID=$(echo "$FLOW_RUN_RESPONSE" | grep -o '"id":"[^"]*"' | head -1 | cut -d'"' -f4) 100 + else 101 + log_error "Flow run creation failed: $FLOW_RUN_RESPONSE" 102 + kill $SERVER_PID 2>/dev/null || true 103 + return 1 104 + fi 105 + 106 + # Test state transition 107 + log_info "Testing state transition..." 108 + STATE_RESPONSE=$(curl -s -X POST "http://localhost:4200/api/flow_runs/$FLOW_RUN_ID/set_state" \ 109 + -H "Content-Type: application/json" \ 110 + -d '{"state": {"type": "RUNNING", "name": "Running"}}') 111 + 112 + if echo "$STATE_RESPONSE" | grep -q "RUNNING"; then 113 + log_info "State transition passed" 114 + else 115 + log_error "State transition failed: $STATE_RESPONSE" 116 + kill $SERVER_PID 2>/dev/null || true 117 + return 1 118 + fi 119 + 120 + # Test block type creation 121 + log_info "Testing block type creation..." 122 + BLOCK_TYPE_RESPONSE=$(curl -s -X POST http://localhost:4200/api/block_types/ \ 123 + -H "Content-Type: application/json" \ 124 + -d '{"name": "TestBlock", "slug": "test-block"}') 125 + 126 + if echo "$BLOCK_TYPE_RESPONSE" | grep -q '"id"'; then 127 + log_info "Block type creation passed" 128 + else 129 + log_error "Block type creation failed: $BLOCK_TYPE_RESPONSE" 130 + kill $SERVER_PID 2>/dev/null || true 131 + return 1 132 + fi 133 + 134 + # Cleanup 135 + kill $SERVER_PID 2>/dev/null || true 136 + wait $SERVER_PID 2>/dev/null || true 137 + 138 + log_info "SQLite backend tests PASSED" 139 + return 0 140 + } 141 + 142 + test_postgres() { 143 + log_info "Testing PostgreSQL backend..." 144 + 145 + # Check if postgres is available 146 + if ! command -v psql &> /dev/null; then 147 + log_warn "psql not found, skipping PostgreSQL tests" 148 + return 0 149 + fi 150 + 151 + # Check if we can connect 152 + if ! psql "$POSTGRES_URL" -c "SELECT 1" &> /dev/null; then 153 + log_warn "Cannot connect to PostgreSQL at $POSTGRES_URL, skipping tests" 154 + log_warn "Set PREFECT_TEST_POSTGRES_URL to a valid PostgreSQL connection string" 155 + return 0 156 + fi 157 + 158 + export PREFECT_DATABASE_BACKEND=postgres 159 + export PREFECT_DATABASE_URL="$POSTGRES_URL" 160 + 161 + # Clean up test database 162 + log_info "Cleaning up test database..." 163 + psql "$POSTGRES_URL" -c "DROP TABLE IF EXISTS block_document, block_schema, block_type, events, task_run, flow_run_state, flow_run, flow CASCADE" 2>/dev/null || true 164 + 165 + cd "$PROJECT_DIR" 166 + 167 + log_info "Building..." 168 + zig build 2>&1 || { log_error "Build failed"; return 1; } 169 + 170 + log_info "Starting server..." 171 + ./zig-out/bin/prefect-server & 172 + SERVER_PID=$! 173 + sleep 2 174 + 175 + # Check if server started (it should fail gracefully if postgres not implemented) 176 + if ! kill -0 $SERVER_PID 2>/dev/null; then 177 + log_warn "Server exited - PostgreSQL backend may not be implemented yet" 178 + return 0 179 + fi 180 + 181 + # Run same tests as SQLite 182 + log_info "Health check..." 183 + if curl -s http://localhost:4200/api/health | grep -q "ok"; then 184 + log_info "Health check passed" 185 + else 186 + log_error "Health check failed" 187 + kill $SERVER_PID 2>/dev/null || true 188 + return 1 189 + fi 190 + 191 + # ... (same tests as SQLite) 192 + 193 + kill $SERVER_PID 2>/dev/null || true 194 + wait $SERVER_PID 2>/dev/null || true 195 + 196 + log_info "PostgreSQL backend tests PASSED" 197 + return 0 198 + } 199 + 200 + main() { 201 + case "$BACKEND" in 202 + sqlite) 203 + test_sqlite 204 + ;; 205 + postgres|postgresql) 206 + test_postgres 207 + ;; 208 + all) 209 + log_info "Running all backend tests..." 210 + test_sqlite 211 + test_postgres 212 + ;; 213 + *) 214 + log_error "Unknown backend: $BACKEND" 215 + echo "Usage: $0 [sqlite|postgres|all]" 216 + exit 1 217 + ;; 218 + esac 219 + } 220 + 221 + main
+67 -8
src/api/block_documents.zig
··· 176 177 const obj = parsed.value.object; 178 179 - // serialize data 180 const data = blk: { 181 - if (obj.get("data")) |v| { 182 - var out: std.Io.Writer.Allocating = .init(alloc); 183 - var jw: json.Stringify = .{ .writer = &out.writer }; 184 - jw.write(v) catch break :blk "{}"; 185 - break :blk out.toOwnedSlice() catch "{}"; 186 - } else break :blk "{}"; 187 }; 188 189 const block_schema_id = if (obj.get("block_schema_id")) |v| if (v == .string) v.string else null else null; 190 191 - db.block_documents.update(id, data, block_schema_id) catch { 192 sendJsonStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error); 193 return; 194 };
··· 176 177 const obj = parsed.value.object; 178 179 + // merge_existing_data defaults to true (matching python behavior) 180 + const merge_existing_data = if (obj.get("merge_existing_data")) |v| 181 + (v == .bool and v.bool) 182 + else 183 + true; 184 + 185 + // get the new data from request (if provided) 186 + const new_data_value = obj.get("data"); 187 + 188 + // determine final data to store 189 const data = blk: { 190 + if (new_data_value) |new_val| { 191 + if (merge_existing_data) { 192 + // fetch current document to merge with 193 + const current = db.block_documents.getById(alloc, id) catch null orelse { 194 + sendJsonStatus(r, "{\"detail\":\"Block document not found\"}", .not_found); 195 + return; 196 + }; 197 + 198 + // parse current data 199 + const current_parsed = json.parseFromSlice(json.Value, alloc, current.data, .{}) catch { 200 + // if current data isn't valid json, just use new data 201 + var out: std.Io.Writer.Allocating = .init(alloc); 202 + var jw: json.Stringify = .{ .writer = &out.writer }; 203 + jw.write(new_val) catch break :blk "{}"; 204 + break :blk out.toOwnedSlice() catch "{}"; 205 + }; 206 + defer current_parsed.deinit(); 207 + 208 + // merge: overlay new data onto current data 209 + // modify the current object in place with new values 210 + if (current_parsed.value == .object and new_val == .object) { 211 + var current_obj = current_parsed.value.object; 212 + var it = new_val.object.iterator(); 213 + while (it.next()) |entry| { 214 + current_obj.put(entry.key_ptr.*, entry.value_ptr.*) catch continue; 215 + } 216 + // serialize merged result 217 + var out: std.Io.Writer.Allocating = .init(alloc); 218 + var jw: json.Stringify = .{ .writer = &out.writer }; 219 + jw.write(json.Value{ .object = current_obj }) catch break :blk "{}"; 220 + break :blk out.toOwnedSlice() catch "{}"; 221 + } 222 + 223 + // fallback: just use new data if types don't match 224 + var out: std.Io.Writer.Allocating = .init(alloc); 225 + var jw: json.Stringify = .{ .writer = &out.writer }; 226 + jw.write(new_val) catch break :blk "{}"; 227 + break :blk out.toOwnedSlice() catch "{}"; 228 + } else { 229 + // no merge - just use new data directly 230 + var out: std.Io.Writer.Allocating = .init(alloc); 231 + var jw: json.Stringify = .{ .writer = &out.writer }; 232 + jw.write(new_val) catch break :blk "{}"; 233 + break :blk out.toOwnedSlice() catch "{}"; 234 + } 235 + } else { 236 + // no data provided - don't update data field 237 + break :blk null; 238 + } 239 }; 240 241 const block_schema_id = if (obj.get("block_schema_id")) |v| if (v == .string) v.string else null else null; 242 243 + // only update if we have something to update 244 + if (data == null and block_schema_id == null) { 245 + r.setStatus(.no_content); 246 + r.sendBody("") catch {}; 247 + return; 248 + } 249 + 250 + db.block_documents.update(id, data orelse "{}", block_schema_id) catch { 251 sendJsonStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error); 252 return; 253 };
+6 -11
src/api/block_schemas.zig
··· 6 const db = @import("../db/sqlite.zig"); 7 const uuid_util = @import("../utilities/uuid.zig"); 8 const time_util = @import("../utilities/time.zig"); 9 - const crypto = std.crypto.hash.sha2; 10 11 fn sendJson(r: zap.Request, body: []const u8) void { 12 r.setHeader("content-type", "application/json") catch {}; ··· 110 } else break :blk "[]"; 111 }; 112 113 - // compute checksum from fields (sha256) 114 - var checksum_buf: [71]u8 = undefined; // "sha256:" (7) + 64 hex chars 115 - const checksum = blk: { 116 - var hasher = crypto.Sha256.init(.{}); 117 - hasher.update(fields); 118 - const hash = hasher.finalResult(); 119 - const hex = std.fmt.bytesToHex(hash, .lower); 120 - @memcpy(checksum_buf[0..7], "sha256:"); 121 - @memcpy(checksum_buf[7..71], &hex); 122 - break :blk checksum_buf[0..71]; 123 }; 124 125 // check if schema already exists (idempotent)
··· 6 const db = @import("../db/sqlite.zig"); 7 const uuid_util = @import("../utilities/uuid.zig"); 8 const time_util = @import("../utilities/time.zig"); 9 + const hashing = @import("../utilities/hashing.zig"); 10 11 fn sendJson(r: zap.Request, body: []const u8) void { 12 r.setHeader("content-type", "application/json") catch {}; ··· 110 } else break :blk "[]"; 111 }; 112 113 + // compute checksum from fields (sha256, sorted keys to match python prefect) 114 + // python uses hash_objects(fields) which serializes as [[fields], {}] with sort_keys=True 115 + const checksum = hashing.hashJson(alloc, fields) catch { 116 + sendJsonStatus(r, "{\"detail\":\"checksum computation failed\"}", .internal_server_error); 117 + return; 118 }; 119 120 // check if schema already exists (idempotent)
+14 -3
src/api/block_types.zig
··· 7 const uuid_util = @import("../utilities/uuid.zig"); 8 const time_util = @import("../utilities/time.zig"); 9 10 // JSON output types 11 const BlockTypeJson = struct { 12 id: []const u8, ··· 136 return; 137 }; 138 139 - // forbid Prefect- prefix 140 - if (mem.startsWith(u8, name, "Prefect")) { 141 - sendJsonStatus(r, "{\"detail\":\"block type names starting with 'Prefect' are reserved\"}", .forbidden); 142 return; 143 } 144
··· 7 const uuid_util = @import("../utilities/uuid.zig"); 8 const time_util = @import("../utilities/time.zig"); 9 10 + const ascii = std.ascii; 11 + 12 + /// check if string starts with prefix, ignoring case 13 + fn startsWithIgnoreCase(haystack: []const u8, prefix: []const u8) bool { 14 + if (haystack.len < prefix.len) return false; 15 + for (haystack[0..prefix.len], prefix) |h, p| { 16 + if (ascii.toLower(h) != ascii.toLower(p)) return false; 17 + } 18 + return true; 19 + } 20 + 21 // JSON output types 22 const BlockTypeJson = struct { 23 id: []const u8, ··· 147 return; 148 }; 149 150 + // forbid Prefect- prefix (case-insensitive, matching python behavior) 151 + if (startsWithIgnoreCase(name, "prefect")) { 152 + sendJsonStatus(r, "{\"detail\":\"Block type names beginning with 'Prefect' are reserved.\"}", .forbidden); 153 return; 154 } 155
+498
src/db/backend.zig
···
··· 1 + const std = @import("std"); 2 + const Allocator = std.mem.Allocator; 3 + const Thread = std.Thread; 4 + 5 + const dialect_mod = @import("dialect.zig"); 6 + pub const Dialect = dialect_mod.Dialect; 7 + 8 + const log = @import("../logging.zig"); 9 + 10 + // backend libraries 11 + const zqlite = @import("zqlite"); 12 + const pg = @import("pg"); 13 + 14 + /// Unified row interface that abstracts over backend-specific row types 15 + pub const Row = union(enum) { 16 + sqlite: zqlite.Row, 17 + postgres: PostgresRow, 18 + 19 + const PostgresRow = union(enum) { 20 + from_result: pg.Row, // from iterating Result 21 + from_query: pg.QueryRow, // from pool.row() 22 + 23 + fn getText(self: PostgresRow, col: usize) []const u8 { 24 + return switch (self) { 25 + .from_result => |r| r.get([]const u8, col), 26 + .from_query => |r| r.get([]const u8, col), 27 + }; 28 + } 29 + 30 + fn getTextOrNull(self: PostgresRow, col: usize) ?[]const u8 { 31 + return switch (self) { 32 + .from_result => |r| r.get(?[]const u8, col), 33 + .from_query => |r| r.get(?[]const u8, col), 34 + }; 35 + } 36 + 37 + fn getInt(self: PostgresRow, col: usize) i64 { 38 + return switch (self) { 39 + .from_result => |r| r.get(i64, col), 40 + .from_query => |r| r.get(i64, col), 41 + }; 42 + } 43 + 44 + fn getFloat(self: PostgresRow, col: usize) f64 { 45 + return switch (self) { 46 + .from_result => |r| r.get(f64, col), 47 + .from_query => |r| r.get(f64, col), 48 + }; 49 + } 50 + }; 51 + 52 + pub fn text(self: Row, col: usize) []const u8 { 53 + return switch (self) { 54 + .sqlite => |r| r.text(col), 55 + .postgres => |r| r.getText(col), 56 + }; 57 + } 58 + 59 + pub fn textOrNull(self: Row, col: usize) ?[]const u8 { 60 + return switch (self) { 61 + .sqlite => |r| r.textOrNull(col), 62 + .postgres => |r| r.getTextOrNull(col), 63 + }; 64 + } 65 + 66 + pub fn int(self: Row, col: usize) i64 { 67 + return switch (self) { 68 + .sqlite => |r| r.int(col), 69 + .postgres => |r| r.getInt(col), 70 + }; 71 + } 72 + 73 + pub fn float(self: Row, col: usize) f64 { 74 + return switch (self) { 75 + .sqlite => |r| r.float(col), 76 + .postgres => |r| r.getFloat(col), 77 + }; 78 + } 79 + 80 + pub fn deinit(self: *Row) void { 81 + switch (self.*) { 82 + .sqlite => |*r| r.deinit(), 83 + .postgres => |*r| switch (r.*) { 84 + .from_result => {}, // Result.deinit() handles cleanup 85 + .from_query => |*qr| qr.deinit() catch {}, 86 + }, 87 + } 88 + } 89 + }; 90 + 91 + /// Unified rows iterator 92 + pub const Rows = struct { 93 + backend: RowsBackend, 94 + 95 + const RowsBackend = union(enum) { 96 + sqlite: zqlite.Rows, 97 + postgres: *pg.Result, 98 + }; 99 + 100 + pub fn next(self: *Rows) ?Row { 101 + return switch (self.backend) { 102 + .sqlite => |*r| { 103 + if (r.next()) |sqlite_row| { 104 + return Row{ .sqlite = sqlite_row }; 105 + } 106 + return null; 107 + }, 108 + .postgres => |r| { 109 + if (r.next() catch null) |pg_row| { 110 + return Row{ .postgres = .{ .from_result = pg_row } }; 111 + } 112 + return null; 113 + }, 114 + }; 115 + } 116 + 117 + pub fn deinit(self: *Rows) void { 118 + switch (self.backend) { 119 + .sqlite => |*r| r.deinit(), 120 + .postgres => |r| r.deinit(), 121 + } 122 + } 123 + }; 124 + 125 + /// Main database backend abstraction 126 + pub const Backend = struct { 127 + impl: BackendImpl, 128 + dialect: Dialect, 129 + mutex: Thread.Mutex = .{}, 130 + allocator: Allocator, 131 + 132 + const BackendImpl = union(enum) { 133 + sqlite: SqliteBackend, 134 + postgres: PostgresBackend, 135 + }; 136 + 137 + const SqliteBackend = struct { 138 + conn: zqlite.Conn, 139 + }; 140 + 141 + const PostgresBackend = struct { 142 + pool: *pg.Pool, 143 + }; 144 + 145 + /// Initialize SQLite backend 146 + pub fn initSqlite(allocator: Allocator, path: [*:0]const u8) !Backend { 147 + const flags = zqlite.OpenFlags.Create | zqlite.OpenFlags.ReadWrite; 148 + const conn = zqlite.open(path, flags) catch |err| { 149 + log.err("database", "failed to open sqlite: {}", .{err}); 150 + return err; 151 + }; 152 + 153 + // SQLite-specific pragmas 154 + _ = conn.exec("PRAGMA journal_mode=WAL", .{}) catch {}; 155 + _ = conn.exec("PRAGMA busy_timeout=5000", .{}) catch {}; 156 + _ = conn.exec("PRAGMA foreign_keys=ON", .{}) catch {}; 157 + 158 + return Backend{ 159 + .impl = .{ .sqlite = .{ .conn = conn } }, 160 + .dialect = .sqlite, 161 + .allocator = allocator, 162 + }; 163 + } 164 + 165 + /// Initialize PostgreSQL backend 166 + pub fn initPostgres(allocator: Allocator, connection_string: []const u8) !Backend { 167 + // Parse connection string to extract components 168 + // Format: postgresql://user:pass@host:port/database 169 + const uri = std.Uri.parse(connection_string) catch { 170 + log.err("database", "invalid postgres connection string", .{}); 171 + return error.InvalidConnectionString; 172 + }; 173 + 174 + const pool = pg.Pool.init(allocator, .{ 175 + .size = 10, 176 + .connect = .{ 177 + .host = if (uri.host) |h| blk: { 178 + // convert to sentinel-terminated string 179 + const host_slice = switch (h) { 180 + .raw => |r| r, 181 + .percent_encoded => |p| p, 182 + }; 183 + var buf: [256]u8 = undefined; 184 + @memcpy(buf[0..host_slice.len], host_slice); 185 + buf[host_slice.len] = 0; 186 + break :blk buf[0..host_slice.len :0]; 187 + } else "127.0.0.1", 188 + .port = uri.port orelse 5432, 189 + }, 190 + .auth = .{ 191 + .username = if (uri.user) |u| blk: { 192 + const user_slice = switch (u) { 193 + .raw => |r| r, 194 + .percent_encoded => |p| p, 195 + }; 196 + var buf: [64]u8 = undefined; 197 + @memcpy(buf[0..user_slice.len], user_slice); 198 + buf[user_slice.len] = 0; 199 + break :blk buf[0..user_slice.len :0]; 200 + } else "postgres", 201 + .database = blk: { 202 + // path is like "/dbname", skip the leading / 203 + const path_slice = switch (uri.path) { 204 + .raw => |r| r, 205 + .percent_encoded => |p| p, 206 + }; 207 + if (path_slice.len > 1) { 208 + const db_name = path_slice[1..]; 209 + var buf: [64]u8 = undefined; 210 + @memcpy(buf[0..db_name.len], db_name); 211 + buf[db_name.len] = 0; 212 + break :blk buf[0..db_name.len :0]; 213 + } 214 + break :blk "prefect"; 215 + }, 216 + .password = if (uri.password) |p| blk: { 217 + const pass_slice = switch (p) { 218 + .raw => |r| r, 219 + .percent_encoded => |pe| pe, 220 + }; 221 + var buf: [128]u8 = undefined; 222 + @memcpy(buf[0..pass_slice.len], pass_slice); 223 + buf[pass_slice.len] = 0; 224 + break :blk buf[0..pass_slice.len :0]; 225 + } else null, 226 + }, 227 + }) catch |err| { 228 + log.err("database", "failed to init postgres pool: {}", .{err}); 229 + return err; 230 + }; 231 + 232 + return Backend{ 233 + .impl = .{ .postgres = .{ .pool = pool } }, 234 + .dialect = .postgres, 235 + .allocator = allocator, 236 + }; 237 + } 238 + 239 + pub fn deinit(self: *Backend) void { 240 + switch (self.impl) { 241 + .sqlite => |*s| s.conn.close(), 242 + .postgres => |*p| p.pool.deinit(), 243 + } 244 + } 245 + 246 + /// Execute a statement that doesn't return rows (caller must hold mutex for transactions) 247 + pub fn execUnsafe(self: *Backend, sql: []const u8, args: anytype) !void { 248 + switch (self.impl) { 249 + .sqlite => |*s| { 250 + s.conn.exec(sql, args) catch |err| { 251 + log.err("database", "exec error: {}", .{err}); 252 + return err; 253 + }; 254 + }, 255 + .postgres => |*p| { 256 + // Rewrite placeholders for postgres 257 + const rewritten = try self.dialect.rewritePlaceholders(self.allocator, sql); 258 + defer if (rewritten.ptr != sql.ptr) self.allocator.free(rewritten); 259 + 260 + _ = p.pool.exec(rewritten, args) catch |err| { 261 + log.err("database", "postgres exec error: {}", .{err}); 262 + return err; 263 + }; 264 + }, 265 + } 266 + } 267 + 268 + /// Execute a statement that doesn't return rows (thread-safe) 269 + pub fn exec(self: *Backend, sql: []const u8, args: anytype) !void { 270 + self.mutex.lock(); 271 + defer self.mutex.unlock(); 272 + return self.execUnsafe(sql, args); 273 + } 274 + 275 + /// Execute a statement without arguments 276 + pub fn execNoArgs(self: *Backend, sql: []const u8) !void { 277 + self.mutex.lock(); 278 + defer self.mutex.unlock(); 279 + 280 + switch (self.impl) { 281 + .sqlite => |*s| { 282 + s.conn.execNoArgs(sql) catch |err| { 283 + log.err("database", "execNoArgs error: {}", .{err}); 284 + return err; 285 + }; 286 + }, 287 + .postgres => |*p| { 288 + _ = p.pool.exec(sql, .{}) catch |err| { 289 + log.err("database", "postgres exec error: {}", .{err}); 290 + return err; 291 + }; 292 + }, 293 + } 294 + } 295 + 296 + /// Query for a single row 297 + pub fn row(self: *Backend, sql: []const u8, args: anytype) !?Row { 298 + self.mutex.lock(); 299 + defer self.mutex.unlock(); 300 + 301 + switch (self.impl) { 302 + .sqlite => |*s| { 303 + const r = s.conn.row(sql, args) catch return null; 304 + if (r) |sqlite_row| { 305 + return Row{ .sqlite = sqlite_row }; 306 + } 307 + return null; 308 + }, 309 + .postgres => |*p| { 310 + const rewritten = try self.dialect.rewritePlaceholders(self.allocator, sql); 311 + defer if (rewritten.ptr != sql.ptr) self.allocator.free(rewritten); 312 + 313 + const r = p.pool.row(rewritten, args) catch return null; 314 + if (r) |pg_row| { 315 + return Row{ .postgres = .{ .from_query = pg_row } }; 316 + } 317 + return null; 318 + }, 319 + } 320 + } 321 + 322 + /// Query for multiple rows 323 + pub fn query(self: *Backend, sql: []const u8, args: anytype) !Rows { 324 + self.mutex.lock(); 325 + defer self.mutex.unlock(); 326 + 327 + switch (self.impl) { 328 + .sqlite => |*s| { 329 + const rows = s.conn.rows(sql, args) catch |err| { 330 + log.err("database", "query error: {}", .{err}); 331 + return err; 332 + }; 333 + return Rows{ .backend = .{ .sqlite = rows } }; 334 + }, 335 + .postgres => |*p| { 336 + const rewritten = try self.dialect.rewritePlaceholders(self.allocator, sql); 337 + defer if (rewritten.ptr != sql.ptr) self.allocator.free(rewritten); 338 + 339 + const result = p.pool.query(rewritten, args) catch |err| { 340 + log.err("database", "postgres query error: {}", .{err}); 341 + return err; 342 + }; 343 + return Rows{ .backend = .{ .postgres = result } }; 344 + }, 345 + } 346 + } 347 + 348 + /// Begin a transaction 349 + pub fn begin(self: *Backend) !void { 350 + switch (self.impl) { 351 + .sqlite => |*s| { 352 + _ = s.conn.transaction() catch |err| { 353 + log.err("database", "begin transaction error: {}", .{err}); 354 + return err; 355 + }; 356 + }, 357 + .postgres => |*p| { 358 + _ = p.pool.exec("BEGIN", .{}) catch |err| { 359 + log.err("database", "postgres begin error: {}", .{err}); 360 + return err; 361 + }; 362 + }, 363 + } 364 + } 365 + 366 + /// Commit a transaction 367 + pub fn commit(self: *Backend) !void { 368 + switch (self.impl) { 369 + .sqlite => |*s| { 370 + s.conn.commit() catch |err| { 371 + log.err("database", "commit error: {}", .{err}); 372 + return err; 373 + }; 374 + }, 375 + .postgres => |*p| { 376 + _ = p.pool.exec("COMMIT", .{}) catch |err| { 377 + log.err("database", "postgres commit error: {}", .{err}); 378 + return err; 379 + }; 380 + }, 381 + } 382 + } 383 + 384 + /// Rollback a transaction 385 + pub fn rollback(self: *Backend) void { 386 + switch (self.impl) { 387 + .sqlite => |*s| { 388 + s.conn.rollback(); 389 + }, 390 + .postgres => |*p| { 391 + _ = p.pool.exec("ROLLBACK", .{}) catch {}; 392 + }, 393 + } 394 + } 395 + 396 + /// Get affected row count from last operation 397 + pub fn changes(self: *Backend) i64 { 398 + switch (self.impl) { 399 + .sqlite => |*s| return @intCast(s.conn.changes()), 400 + .postgres => return 0, // pg.zig returns this from exec directly 401 + } 402 + } 403 + }; 404 + 405 + // Global backend instance 406 + pub var db: Backend = undefined; 407 + var initialized: bool = false; 408 + 409 + /// Initialize the database with environment-based configuration 410 + pub fn init() !void { 411 + if (initialized) return; 412 + 413 + const allocator = std.heap.page_allocator; 414 + const backend_env = std.posix.getenv("PREFECT_DATABASE_BACKEND") orelse "sqlite"; 415 + 416 + if (std.mem.eql(u8, backend_env, "postgres") or std.mem.eql(u8, backend_env, "postgresql")) { 417 + const url = std.posix.getenv("PREFECT_DATABASE_URL") orelse "postgresql://localhost:5432/prefect"; 418 + log.info("database", "initializing postgres: {s}", .{url}); 419 + db = try Backend.initPostgres(allocator, url); 420 + } else { 421 + // SQLite (default) 422 + const path_env = std.posix.getenv("PREFECT_DATABASE_PATH") orelse "prefect.db"; 423 + var path_buf: [256]u8 = undefined; 424 + @memcpy(path_buf[0..path_env.len], path_env); 425 + path_buf[path_env.len] = 0; 426 + const path: [*:0]const u8 = path_buf[0..path_env.len :0]; 427 + 428 + log.info("database", "initializing sqlite: {s}", .{path_env}); 429 + db = try Backend.initSqlite(allocator, path); 430 + } 431 + 432 + initialized = true; 433 + } 434 + 435 + /// Close the database 436 + pub fn close() void { 437 + if (initialized) { 438 + db.deinit(); 439 + initialized = false; 440 + } 441 + } 442 + 443 + /// Get the current dialect 444 + pub fn getDialect() Dialect { 445 + return db.dialect; 446 + } 447 + 448 + test "sqlite backend basic operations" { 449 + const allocator = std.testing.allocator; 450 + 451 + // init sqlite with in-memory db 452 + var backend = try Backend.initSqlite(allocator, ":memory:"); 453 + defer backend.deinit(); 454 + 455 + // create a test table (use exec with empty args since execNoArgs needs null-terminated) 456 + try backend.exec("CREATE TABLE test (id TEXT PRIMARY KEY, name TEXT)", .{}); 457 + 458 + // insert a row 459 + try backend.exec("INSERT INTO test (id, name) VALUES (?, ?)", .{ "1", "alice" }); 460 + 461 + // query single row 462 + var r = try backend.row("SELECT id, name FROM test WHERE id = ?", .{"1"}); 463 + if (r) |*row| { 464 + defer row.deinit(); 465 + try std.testing.expectEqualStrings("1", row.text(0)); 466 + try std.testing.expectEqualStrings("alice", row.text(1)); 467 + } else { 468 + return error.RowNotFound; 469 + } 470 + 471 + // insert more rows 472 + try backend.exec("INSERT INTO test (id, name) VALUES (?, ?)", .{ "2", "bob" }); 473 + try backend.exec("INSERT INTO test (id, name) VALUES (?, ?)", .{ "3", "charlie" }); 474 + 475 + // query multiple rows 476 + var rows = try backend.query("SELECT id, name FROM test ORDER BY id", .{}); 477 + defer rows.deinit(); 478 + 479 + var count: usize = 0; 480 + while (rows.next()) |_| { 481 + count += 1; 482 + } 483 + try std.testing.expectEqual(@as(usize, 3), count); 484 + } 485 + 486 + test "dialect placeholder rewriting" { 487 + const allocator = std.testing.allocator; 488 + 489 + // sqlite - no change 490 + const sqlite_sql = "SELECT * FROM t WHERE a = ? AND b = ?"; 491 + const sqlite_result = try Dialect.sqlite.rewritePlaceholders(allocator, sqlite_sql); 492 + try std.testing.expectEqualStrings(sqlite_sql, sqlite_result); 493 + 494 + // postgres - rewrite to $1, $2 495 + const pg_result = try Dialect.postgres.rewritePlaceholders(allocator, sqlite_sql); 496 + defer allocator.free(pg_result); 497 + try std.testing.expectEqualStrings("SELECT * FROM t WHERE a = $1 AND b = $2", pg_result); 498 + }
+33 -53
src/db/block_documents.zig
··· 1 const std = @import("std"); 2 const Allocator = std.mem.Allocator; 3 4 - const sqlite = @import("sqlite.zig"); 5 const log = @import("../logging.zig"); 6 7 pub const BlockDocumentRow = struct { ··· 25 block_type_name: ?[]const u8, 26 block_schema_id: []const u8, 27 ) !void { 28 - sqlite.mutex.lock(); 29 - defer sqlite.mutex.unlock(); 30 - 31 - sqlite.conn.exec( 32 \\INSERT INTO block_document (id, name, data, is_anonymous, block_type_id, block_type_name, block_schema_id) 33 \\VALUES (?, ?, ?, ?, ?, ?, ?) 34 , .{ ··· 46 } 47 48 pub fn getById(alloc: Allocator, id: []const u8) !?BlockDocumentRow { 49 - sqlite.mutex.lock(); 50 - defer sqlite.mutex.unlock(); 51 - 52 - const row = sqlite.conn.row( 53 \\SELECT id, created, updated, name, data, is_anonymous, 54 \\ block_type_id, block_type_name, block_schema_id 55 \\FROM block_document WHERE id = ? 56 , .{id}) catch return null; 57 58 - if (row) |r| { 59 - defer r.deinit(); 60 return BlockDocumentRow{ 61 - .id = try alloc.dupe(u8, r.text(0)), 62 - .created = try alloc.dupe(u8, r.text(1)), 63 - .updated = try alloc.dupe(u8, r.text(2)), 64 - .name = if (r.text(3).len > 0) try alloc.dupe(u8, r.text(3)) else null, 65 - .data = try alloc.dupe(u8, r.text(4)), 66 - .is_anonymous = r.int(5) != 0, 67 - .block_type_id = try alloc.dupe(u8, r.text(6)), 68 - .block_type_name = if (r.text(7).len > 0) try alloc.dupe(u8, r.text(7)) else null, 69 - .block_schema_id = try alloc.dupe(u8, r.text(8)), 70 }; 71 } 72 return null; ··· 77 block_type_slug: []const u8, 78 name: []const u8, 79 ) !?BlockDocumentRow { 80 - sqlite.mutex.lock(); 81 - defer sqlite.mutex.unlock(); 82 - 83 - const row = sqlite.conn.row( 84 \\SELECT bd.id, bd.created, bd.updated, bd.name, bd.data, bd.is_anonymous, 85 \\ bd.block_type_id, bd.block_type_name, bd.block_schema_id 86 \\FROM block_document bd ··· 88 \\WHERE bt.slug = ? AND bd.name = ? 89 , .{ block_type_slug, name }) catch return null; 90 91 - if (row) |r| { 92 - defer r.deinit(); 93 return BlockDocumentRow{ 94 - .id = try alloc.dupe(u8, r.text(0)), 95 - .created = try alloc.dupe(u8, r.text(1)), 96 - .updated = try alloc.dupe(u8, r.text(2)), 97 - .name = if (r.text(3).len > 0) try alloc.dupe(u8, r.text(3)) else null, 98 - .data = try alloc.dupe(u8, r.text(4)), 99 - .is_anonymous = r.int(5) != 0, 100 - .block_type_id = try alloc.dupe(u8, r.text(6)), 101 - .block_type_name = if (r.text(7).len > 0) try alloc.dupe(u8, r.text(7)) else null, 102 - .block_schema_id = try alloc.dupe(u8, r.text(8)), 103 }; 104 } 105 return null; ··· 110 data: []const u8, 111 block_schema_id: ?[]const u8, 112 ) !void { 113 - sqlite.mutex.lock(); 114 - defer sqlite.mutex.unlock(); 115 - 116 if (block_schema_id) |schema_id| { 117 - sqlite.conn.exec( 118 \\UPDATE block_document SET data = ?, block_schema_id = ?, updated = datetime('now') 119 \\WHERE id = ? 120 , .{ data, schema_id, id }) catch |err| { ··· 122 return err; 123 }; 124 } else { 125 - sqlite.conn.exec( 126 \\UPDATE block_document SET data = ?, updated = datetime('now') 127 \\WHERE id = ? 128 , .{ data, id }) catch |err| { ··· 133 } 134 135 pub fn list(alloc: Allocator, limit: usize) ![]BlockDocumentRow { 136 - sqlite.mutex.lock(); 137 - defer sqlite.mutex.unlock(); 138 - 139 var results = std.ArrayListUnmanaged(BlockDocumentRow){}; 140 errdefer results.deinit(alloc); 141 142 - var rows = sqlite.conn.rows( 143 \\SELECT id, created, updated, name, data, is_anonymous, 144 \\ block_type_id, block_type_name, block_schema_id 145 \\FROM block_document ORDER BY created DESC LIMIT ? ··· 167 } 168 169 pub fn delete(id: []const u8) !bool { 170 - sqlite.mutex.lock(); 171 - defer sqlite.mutex.unlock(); 172 - 173 - sqlite.conn.exec( 174 \\DELETE FROM block_document WHERE id = ? 175 , .{id}) catch |err| { 176 log.err("database", "delete block_document error: {}", .{err}); 177 return err; 178 }; 179 180 - return sqlite.conn.changes() > 0; 181 } 182 183 pub fn listByTypeSlug(alloc: Allocator, block_type_slug: []const u8, limit: usize) ![]BlockDocumentRow { 184 - sqlite.mutex.lock(); 185 - defer sqlite.mutex.unlock(); 186 - 187 var results = std.ArrayListUnmanaged(BlockDocumentRow){}; 188 errdefer results.deinit(alloc); 189 190 - var rows = sqlite.conn.rows( 191 \\SELECT bd.id, bd.created, bd.updated, bd.name, bd.data, bd.is_anonymous, 192 \\ bd.block_type_id, bd.block_type_name, bd.block_schema_id 193 \\FROM block_document bd
··· 1 const std = @import("std"); 2 const Allocator = std.mem.Allocator; 3 4 + const backend = @import("backend.zig"); 5 const log = @import("../logging.zig"); 6 7 pub const BlockDocumentRow = struct { ··· 25 block_type_name: ?[]const u8, 26 block_schema_id: []const u8, 27 ) !void { 28 + backend.db.exec( 29 \\INSERT INTO block_document (id, name, data, is_anonymous, block_type_id, block_type_name, block_schema_id) 30 \\VALUES (?, ?, ?, ?, ?, ?, ?) 31 , .{ ··· 43 } 44 45 pub fn getById(alloc: Allocator, id: []const u8) !?BlockDocumentRow { 46 + var r = backend.db.row( 47 \\SELECT id, created, updated, name, data, is_anonymous, 48 \\ block_type_id, block_type_name, block_schema_id 49 \\FROM block_document WHERE id = ? 50 , .{id}) catch return null; 51 52 + if (r) |*row| { 53 + defer row.deinit(); 54 return BlockDocumentRow{ 55 + .id = try alloc.dupe(u8, row.text(0)), 56 + .created = try alloc.dupe(u8, row.text(1)), 57 + .updated = try alloc.dupe(u8, row.text(2)), 58 + .name = if (row.text(3).len > 0) try alloc.dupe(u8, row.text(3)) else null, 59 + .data = try alloc.dupe(u8, row.text(4)), 60 + .is_anonymous = row.int(5) != 0, 61 + .block_type_id = try alloc.dupe(u8, row.text(6)), 62 + .block_type_name = if (row.text(7).len > 0) try alloc.dupe(u8, row.text(7)) else null, 63 + .block_schema_id = try alloc.dupe(u8, row.text(8)), 64 }; 65 } 66 return null; ··· 71 block_type_slug: []const u8, 72 name: []const u8, 73 ) !?BlockDocumentRow { 74 + var r = backend.db.row( 75 \\SELECT bd.id, bd.created, bd.updated, bd.name, bd.data, bd.is_anonymous, 76 \\ bd.block_type_id, bd.block_type_name, bd.block_schema_id 77 \\FROM block_document bd ··· 79 \\WHERE bt.slug = ? AND bd.name = ? 80 , .{ block_type_slug, name }) catch return null; 81 82 + if (r) |*row| { 83 + defer row.deinit(); 84 return BlockDocumentRow{ 85 + .id = try alloc.dupe(u8, row.text(0)), 86 + .created = try alloc.dupe(u8, row.text(1)), 87 + .updated = try alloc.dupe(u8, row.text(2)), 88 + .name = if (row.text(3).len > 0) try alloc.dupe(u8, row.text(3)) else null, 89 + .data = try alloc.dupe(u8, row.text(4)), 90 + .is_anonymous = row.int(5) != 0, 91 + .block_type_id = try alloc.dupe(u8, row.text(6)), 92 + .block_type_name = if (row.text(7).len > 0) try alloc.dupe(u8, row.text(7)) else null, 93 + .block_schema_id = try alloc.dupe(u8, row.text(8)), 94 }; 95 } 96 return null; ··· 101 data: []const u8, 102 block_schema_id: ?[]const u8, 103 ) !void { 104 + // TODO: Use dialect helper for datetime('now') vs NOW() 105 if (block_schema_id) |schema_id| { 106 + backend.db.exec( 107 \\UPDATE block_document SET data = ?, block_schema_id = ?, updated = datetime('now') 108 \\WHERE id = ? 109 , .{ data, schema_id, id }) catch |err| { ··· 111 return err; 112 }; 113 } else { 114 + backend.db.exec( 115 \\UPDATE block_document SET data = ?, updated = datetime('now') 116 \\WHERE id = ? 117 , .{ data, id }) catch |err| { ··· 122 } 123 124 pub fn list(alloc: Allocator, limit: usize) ![]BlockDocumentRow { 125 var results = std.ArrayListUnmanaged(BlockDocumentRow){}; 126 errdefer results.deinit(alloc); 127 128 + var rows = backend.db.query( 129 \\SELECT id, created, updated, name, data, is_anonymous, 130 \\ block_type_id, block_type_name, block_schema_id 131 \\FROM block_document ORDER BY created DESC LIMIT ? ··· 153 } 154 155 pub fn delete(id: []const u8) !bool { 156 + backend.db.exec( 157 \\DELETE FROM block_document WHERE id = ? 158 , .{id}) catch |err| { 159 log.err("database", "delete block_document error: {}", .{err}); 160 return err; 161 }; 162 163 + return backend.db.changes() > 0; 164 } 165 166 pub fn listByTypeSlug(alloc: Allocator, block_type_slug: []const u8, limit: usize) ![]BlockDocumentRow { 167 var results = std.ArrayListUnmanaged(BlockDocumentRow){}; 168 errdefer results.deinit(alloc); 169 170 + var rows = backend.db.query( 171 \\SELECT bd.id, bd.created, bd.updated, bd.name, bd.data, bd.is_anonymous, 172 \\ bd.block_type_id, bd.block_type_name, bd.block_schema_id 173 \\FROM block_document bd
+27 -39
src/db/block_schemas.zig
··· 1 const std = @import("std"); 2 const Allocator = std.mem.Allocator; 3 4 - const sqlite = @import("sqlite.zig"); 5 const log = @import("../logging.zig"); 6 7 pub const BlockSchemaRow = struct { ··· 23 version: []const u8, 24 block_type_id: []const u8, 25 ) !void { 26 - sqlite.mutex.lock(); 27 - defer sqlite.mutex.unlock(); 28 - 29 - sqlite.conn.exec( 30 \\INSERT INTO block_schema (id, checksum, fields, capabilities, version, block_type_id) 31 \\VALUES (?, ?, ?, ?, ?, ?) 32 , .{ id, checksum, fields, capabilities, version, block_type_id }) catch |err| { ··· 40 checksum: []const u8, 41 version: ?[]const u8, 42 ) !?BlockSchemaRow { 43 - sqlite.mutex.lock(); 44 - defer sqlite.mutex.unlock(); 45 - 46 - const row = if (version) |v| 47 - sqlite.conn.row( 48 \\SELECT id, created, updated, checksum, fields, capabilities, version, block_type_id 49 \\FROM block_schema WHERE checksum = ? AND version = ? 50 , .{ checksum, v }) catch return null 51 else 52 - sqlite.conn.row( 53 \\SELECT id, created, updated, checksum, fields, capabilities, version, block_type_id 54 \\FROM block_schema WHERE checksum = ? 55 , .{checksum}) catch return null; 56 57 - if (row) |r| { 58 - defer r.deinit(); 59 return BlockSchemaRow{ 60 - .id = try alloc.dupe(u8, r.text(0)), 61 - .created = try alloc.dupe(u8, r.text(1)), 62 - .updated = try alloc.dupe(u8, r.text(2)), 63 - .checksum = try alloc.dupe(u8, r.text(3)), 64 - .fields = try alloc.dupe(u8, r.text(4)), 65 - .capabilities = try alloc.dupe(u8, r.text(5)), 66 - .version = try alloc.dupe(u8, r.text(6)), 67 - .block_type_id = try alloc.dupe(u8, r.text(7)), 68 }; 69 } 70 return null; 71 } 72 73 pub fn getById(alloc: Allocator, id: []const u8) !?BlockSchemaRow { 74 - sqlite.mutex.lock(); 75 - defer sqlite.mutex.unlock(); 76 - 77 - const row = sqlite.conn.row( 78 \\SELECT id, created, updated, checksum, fields, capabilities, version, block_type_id 79 \\FROM block_schema WHERE id = ? 80 , .{id}) catch return null; 81 82 - if (row) |r| { 83 - defer r.deinit(); 84 return BlockSchemaRow{ 85 - .id = try alloc.dupe(u8, r.text(0)), 86 - .created = try alloc.dupe(u8, r.text(1)), 87 - .updated = try alloc.dupe(u8, r.text(2)), 88 - .checksum = try alloc.dupe(u8, r.text(3)), 89 - .fields = try alloc.dupe(u8, r.text(4)), 90 - .capabilities = try alloc.dupe(u8, r.text(5)), 91 - .version = try alloc.dupe(u8, r.text(6)), 92 - .block_type_id = try alloc.dupe(u8, r.text(7)), 93 }; 94 } 95 return null; 96 } 97 98 pub fn list(alloc: Allocator, limit: usize) ![]BlockSchemaRow { 99 - sqlite.mutex.lock(); 100 - defer sqlite.mutex.unlock(); 101 - 102 var results = std.ArrayListUnmanaged(BlockSchemaRow){}; 103 errdefer results.deinit(alloc); 104 105 - var rows = sqlite.conn.rows( 106 \\SELECT id, created, updated, checksum, fields, capabilities, version, block_type_id 107 \\FROM block_schema ORDER BY created DESC LIMIT ? 108 , .{@as(i64, @intCast(limit))}) catch |err| {
··· 1 const std = @import("std"); 2 const Allocator = std.mem.Allocator; 3 4 + const backend = @import("backend.zig"); 5 const log = @import("../logging.zig"); 6 7 pub const BlockSchemaRow = struct { ··· 23 version: []const u8, 24 block_type_id: []const u8, 25 ) !void { 26 + backend.db.exec( 27 \\INSERT INTO block_schema (id, checksum, fields, capabilities, version, block_type_id) 28 \\VALUES (?, ?, ?, ?, ?, ?) 29 , .{ id, checksum, fields, capabilities, version, block_type_id }) catch |err| { ··· 37 checksum: []const u8, 38 version: ?[]const u8, 39 ) !?BlockSchemaRow { 40 + var r = if (version) |v| 41 + backend.db.row( 42 \\SELECT id, created, updated, checksum, fields, capabilities, version, block_type_id 43 \\FROM block_schema WHERE checksum = ? AND version = ? 44 , .{ checksum, v }) catch return null 45 else 46 + backend.db.row( 47 \\SELECT id, created, updated, checksum, fields, capabilities, version, block_type_id 48 \\FROM block_schema WHERE checksum = ? 49 , .{checksum}) catch return null; 50 51 + if (r) |*row| { 52 + defer row.deinit(); 53 return BlockSchemaRow{ 54 + .id = try alloc.dupe(u8, row.text(0)), 55 + .created = try alloc.dupe(u8, row.text(1)), 56 + .updated = try alloc.dupe(u8, row.text(2)), 57 + .checksum = try alloc.dupe(u8, row.text(3)), 58 + .fields = try alloc.dupe(u8, row.text(4)), 59 + .capabilities = try alloc.dupe(u8, row.text(5)), 60 + .version = try alloc.dupe(u8, row.text(6)), 61 + .block_type_id = try alloc.dupe(u8, row.text(7)), 62 }; 63 } 64 return null; 65 } 66 67 pub fn getById(alloc: Allocator, id: []const u8) !?BlockSchemaRow { 68 + var r = backend.db.row( 69 \\SELECT id, created, updated, checksum, fields, capabilities, version, block_type_id 70 \\FROM block_schema WHERE id = ? 71 , .{id}) catch return null; 72 73 + if (r) |*row| { 74 + defer row.deinit(); 75 return BlockSchemaRow{ 76 + .id = try alloc.dupe(u8, row.text(0)), 77 + .created = try alloc.dupe(u8, row.text(1)), 78 + .updated = try alloc.dupe(u8, row.text(2)), 79 + .checksum = try alloc.dupe(u8, row.text(3)), 80 + .fields = try alloc.dupe(u8, row.text(4)), 81 + .capabilities = try alloc.dupe(u8, row.text(5)), 82 + .version = try alloc.dupe(u8, row.text(6)), 83 + .block_type_id = try alloc.dupe(u8, row.text(7)), 84 }; 85 } 86 return null; 87 } 88 89 pub fn list(alloc: Allocator, limit: usize) ![]BlockSchemaRow { 90 var results = std.ArrayListUnmanaged(BlockSchemaRow){}; 91 errdefer results.deinit(alloc); 92 93 + var rows = backend.db.query( 94 \\SELECT id, created, updated, checksum, fields, capabilities, version, block_type_id 95 \\FROM block_schema ORDER BY created DESC LIMIT ? 96 , .{@as(i64, @intCast(limit))}) catch |err| {
+31 -45
src/db/block_types.zig
··· 1 const std = @import("std"); 2 const Allocator = std.mem.Allocator; 3 4 - const sqlite = @import("sqlite.zig"); 5 const log = @import("../logging.zig"); 6 7 pub const BlockTypeRow = struct { ··· 27 code_example: ?[]const u8, 28 is_protected: bool, 29 ) !void { 30 - sqlite.mutex.lock(); 31 - defer sqlite.mutex.unlock(); 32 - 33 - sqlite.conn.exec( 34 \\INSERT INTO block_type (id, name, slug, logo_url, documentation_url, description, code_example, is_protected) 35 \\VALUES (?, ?, ?, ?, ?, ?, ?, ?) 36 , .{ ··· 49 } 50 51 pub fn getBySlug(alloc: Allocator, slug: []const u8) !?BlockTypeRow { 52 - sqlite.mutex.lock(); 53 - defer sqlite.mutex.unlock(); 54 - 55 - const row = sqlite.conn.row( 56 \\SELECT id, created, updated, name, slug, logo_url, documentation_url, 57 \\ description, code_example, is_protected 58 \\FROM block_type WHERE slug = ? 59 , .{slug}) catch return null; 60 61 - if (row) |r| { 62 - defer r.deinit(); 63 return BlockTypeRow{ 64 - .id = try alloc.dupe(u8, r.text(0)), 65 - .created = try alloc.dupe(u8, r.text(1)), 66 - .updated = try alloc.dupe(u8, r.text(2)), 67 - .name = try alloc.dupe(u8, r.text(3)), 68 - .slug = try alloc.dupe(u8, r.text(4)), 69 - .logo_url = if (r.text(5).len > 0) try alloc.dupe(u8, r.text(5)) else null, 70 - .documentation_url = if (r.text(6).len > 0) try alloc.dupe(u8, r.text(6)) else null, 71 - .description = if (r.text(7).len > 0) try alloc.dupe(u8, r.text(7)) else null, 72 - .code_example = if (r.text(8).len > 0) try alloc.dupe(u8, r.text(8)) else null, 73 - .is_protected = r.int(9) != 0, 74 }; 75 } 76 return null; 77 } 78 79 pub fn getById(alloc: Allocator, id: []const u8) !?BlockTypeRow { 80 - sqlite.mutex.lock(); 81 - defer sqlite.mutex.unlock(); 82 - 83 - const row = sqlite.conn.row( 84 \\SELECT id, created, updated, name, slug, logo_url, documentation_url, 85 \\ description, code_example, is_protected 86 \\FROM block_type WHERE id = ? 87 , .{id}) catch return null; 88 89 - if (row) |r| { 90 - defer r.deinit(); 91 return BlockTypeRow{ 92 - .id = try alloc.dupe(u8, r.text(0)), 93 - .created = try alloc.dupe(u8, r.text(1)), 94 - .updated = try alloc.dupe(u8, r.text(2)), 95 - .name = try alloc.dupe(u8, r.text(3)), 96 - .slug = try alloc.dupe(u8, r.text(4)), 97 - .logo_url = if (r.text(5).len > 0) try alloc.dupe(u8, r.text(5)) else null, 98 - .documentation_url = if (r.text(6).len > 0) try alloc.dupe(u8, r.text(6)) else null, 99 - .description = if (r.text(7).len > 0) try alloc.dupe(u8, r.text(7)) else null, 100 - .code_example = if (r.text(8).len > 0) try alloc.dupe(u8, r.text(8)) else null, 101 - .is_protected = r.int(9) != 0, 102 }; 103 } 104 return null; ··· 111 description: ?[]const u8, 112 code_example: ?[]const u8, 113 ) !void { 114 - sqlite.mutex.lock(); 115 - defer sqlite.mutex.unlock(); 116 - 117 - sqlite.conn.exec( 118 \\UPDATE block_type SET 119 \\ logo_url = ?, documentation_url = ?, description = ?, code_example = ?, 120 \\ updated = datetime('now') ··· 126 } 127 128 pub fn list(alloc: Allocator, limit: usize) ![]BlockTypeRow { 129 - sqlite.mutex.lock(); 130 - defer sqlite.mutex.unlock(); 131 - 132 var results = std.ArrayListUnmanaged(BlockTypeRow){}; 133 errdefer results.deinit(alloc); 134 135 - var rows = sqlite.conn.rows( 136 \\SELECT id, created, updated, name, slug, logo_url, documentation_url, 137 \\ description, code_example, is_protected 138 \\FROM block_type ORDER BY created DESC LIMIT ?
··· 1 const std = @import("std"); 2 const Allocator = std.mem.Allocator; 3 4 + const backend = @import("backend.zig"); 5 const log = @import("../logging.zig"); 6 7 pub const BlockTypeRow = struct { ··· 27 code_example: ?[]const u8, 28 is_protected: bool, 29 ) !void { 30 + backend.db.exec( 31 \\INSERT INTO block_type (id, name, slug, logo_url, documentation_url, description, code_example, is_protected) 32 \\VALUES (?, ?, ?, ?, ?, ?, ?, ?) 33 , .{ ··· 46 } 47 48 pub fn getBySlug(alloc: Allocator, slug: []const u8) !?BlockTypeRow { 49 + var r = backend.db.row( 50 \\SELECT id, created, updated, name, slug, logo_url, documentation_url, 51 \\ description, code_example, is_protected 52 \\FROM block_type WHERE slug = ? 53 , .{slug}) catch return null; 54 55 + if (r) |*row| { 56 + defer row.deinit(); 57 return BlockTypeRow{ 58 + .id = try alloc.dupe(u8, row.text(0)), 59 + .created = try alloc.dupe(u8, row.text(1)), 60 + .updated = try alloc.dupe(u8, row.text(2)), 61 + .name = try alloc.dupe(u8, row.text(3)), 62 + .slug = try alloc.dupe(u8, row.text(4)), 63 + .logo_url = if (row.text(5).len > 0) try alloc.dupe(u8, row.text(5)) else null, 64 + .documentation_url = if (row.text(6).len > 0) try alloc.dupe(u8, row.text(6)) else null, 65 + .description = if (row.text(7).len > 0) try alloc.dupe(u8, row.text(7)) else null, 66 + .code_example = if (row.text(8).len > 0) try alloc.dupe(u8, row.text(8)) else null, 67 + .is_protected = row.int(9) != 0, 68 }; 69 } 70 return null; 71 } 72 73 pub fn getById(alloc: Allocator, id: []const u8) !?BlockTypeRow { 74 + var r = backend.db.row( 75 \\SELECT id, created, updated, name, slug, logo_url, documentation_url, 76 \\ description, code_example, is_protected 77 \\FROM block_type WHERE id = ? 78 , .{id}) catch return null; 79 80 + if (r) |*row| { 81 + defer row.deinit(); 82 return BlockTypeRow{ 83 + .id = try alloc.dupe(u8, row.text(0)), 84 + .created = try alloc.dupe(u8, row.text(1)), 85 + .updated = try alloc.dupe(u8, row.text(2)), 86 + .name = try alloc.dupe(u8, row.text(3)), 87 + .slug = try alloc.dupe(u8, row.text(4)), 88 + .logo_url = if (row.text(5).len > 0) try alloc.dupe(u8, row.text(5)) else null, 89 + .documentation_url = if (row.text(6).len > 0) try alloc.dupe(u8, row.text(6)) else null, 90 + .description = if (row.text(7).len > 0) try alloc.dupe(u8, row.text(7)) else null, 91 + .code_example = if (row.text(8).len > 0) try alloc.dupe(u8, row.text(8)) else null, 92 + .is_protected = row.int(9) != 0, 93 }; 94 } 95 return null; ··· 102 description: ?[]const u8, 103 code_example: ?[]const u8, 104 ) !void { 105 + // TODO: Use dialect helper for datetime('now') vs NOW() 106 + backend.db.exec( 107 \\UPDATE block_type SET 108 \\ logo_url = ?, documentation_url = ?, description = ?, code_example = ?, 109 \\ updated = datetime('now') ··· 115 } 116 117 pub fn list(alloc: Allocator, limit: usize) ![]BlockTypeRow { 118 var results = std.ArrayListUnmanaged(BlockTypeRow){}; 119 errdefer results.deinit(alloc); 120 121 + var rows = backend.db.query( 122 \\SELECT id, created, updated, name, slug, logo_url, documentation_url, 123 \\ description, code_example, is_protected 124 \\FROM block_type ORDER BY created DESC LIMIT ?
+101
src/db/dialect.zig
···
··· 1 + const std = @import("std"); 2 + const mem = std.mem; 3 + const Allocator = mem.Allocator; 4 + 5 + pub const Dialect = enum { 6 + sqlite, 7 + postgres, 8 + 9 + /// Returns the function/expression for current timestamp 10 + pub fn now(self: Dialect) []const u8 { 11 + return switch (self) { 12 + .sqlite => "datetime('now')", 13 + .postgres => "NOW()", 14 + }; 15 + } 16 + 17 + /// Returns the placeholder format for parameter N (1-indexed) 18 + pub fn placeholder(self: Dialect, buf: *[8]u8, n: usize) []const u8 { 19 + return switch (self) { 20 + .sqlite => "?", 21 + .postgres => std.fmt.bufPrint(buf, "${d}", .{n}) catch "?", 22 + }; 23 + } 24 + 25 + /// Rewrites SQL with ? placeholders to dialect-specific format 26 + /// For SQLite: returns input unchanged 27 + /// For PostgreSQL: rewrites ? to $1, $2, etc. 28 + pub fn rewritePlaceholders(self: Dialect, alloc: Allocator, sql: []const u8) ![]const u8 { 29 + if (self == .sqlite) { 30 + return sql; // no rewrite needed 31 + } 32 + 33 + // count placeholders 34 + var count: usize = 0; 35 + for (sql) |c| { 36 + if (c == '?') count += 1; 37 + } 38 + 39 + if (count == 0) { 40 + return sql; 41 + } 42 + 43 + // allocate output (each ? becomes $N, max 4 chars for reasonable N) 44 + var output = try alloc.alloc(u8, sql.len + count * 4); 45 + var out_idx: usize = 0; 46 + var param_num: usize = 1; 47 + 48 + for (sql) |c| { 49 + if (c == '?') { 50 + var buf: [8]u8 = undefined; 51 + const ph = std.fmt.bufPrint(&buf, "${d}", .{param_num}) catch "$?"; 52 + @memcpy(output[out_idx..][0..ph.len], ph); 53 + out_idx += ph.len; 54 + param_num += 1; 55 + } else { 56 + output[out_idx] = c; 57 + out_idx += 1; 58 + } 59 + } 60 + 61 + // Shrink to exact size so free() works correctly 62 + if (alloc.resize(output, out_idx)) { 63 + return output[0..out_idx]; 64 + } 65 + // If resize not supported, realloc 66 + const exact = alloc.realloc(output, out_idx) catch return output[0..out_idx]; 67 + return exact; 68 + } 69 + 70 + /// Returns the INSERT conflict ignore syntax 71 + /// SQLite: INSERT OR IGNORE INTO ... 72 + /// PostgreSQL: INSERT INTO ... ON CONFLICT DO NOTHING 73 + pub fn insertOrIgnorePrefix(self: Dialect) []const u8 { 74 + return switch (self) { 75 + .sqlite => "INSERT OR IGNORE INTO", 76 + .postgres => "INSERT INTO", 77 + }; 78 + } 79 + 80 + pub fn insertOrIgnoreSuffix(self: Dialect) []const u8 { 81 + return switch (self) { 82 + .sqlite => "", 83 + .postgres => " ON CONFLICT DO NOTHING", 84 + }; 85 + } 86 + }; 87 + 88 + test "rewritePlaceholders - sqlite unchanged" { 89 + const alloc = std.testing.allocator; 90 + const sql = "SELECT * FROM foo WHERE a = ? AND b = ?"; 91 + const result = try Dialect.sqlite.rewritePlaceholders(alloc, sql); 92 + try std.testing.expectEqualStrings(sql, result); 93 + } 94 + 95 + test "rewritePlaceholders - postgres" { 96 + const alloc = std.testing.allocator; 97 + const sql = "SELECT * FROM foo WHERE a = ? AND b = ?"; 98 + const result = try Dialect.postgres.rewritePlaceholders(alloc, sql); 99 + defer alloc.free(result); 100 + try std.testing.expectEqualStrings("SELECT * FROM foo WHERE a = $1 AND b = $2", result); 101 + }
+7 -18
src/db/events.zig
··· 1 - const sqlite = @import("sqlite.zig"); 2 const log = @import("../logging.zig"); 3 4 pub fn insertDeduped( ··· 14 recorded: []const u8, 15 follows: ?[]const u8, 16 ) !void { 17 - sqlite.mutex.lock(); 18 - defer sqlite.mutex.unlock(); 19 - 20 - sqlite.conn.exec( 21 \\INSERT OR IGNORE INTO events (id, occurred, event, resource_id, resource, 22 \\ related_resource_ids, related, payload, received, recorded, follows) 23 \\VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ··· 40 } 41 42 pub fn trim(cutoff: []const u8) !usize { 43 - sqlite.mutex.lock(); 44 - defer sqlite.mutex.unlock(); 45 - 46 - sqlite.conn.exec("DELETE FROM events WHERE occurred < ?", .{cutoff}) catch |err| { 47 log.err("database", "trim events error: {}", .{err}); 48 return err; 49 }; 50 51 - if (sqlite.conn.row("SELECT changes()", .{}) catch null) |row| { 52 - defer row.deinit(); 53 - return @intCast(row.int(0)); 54 - } 55 - return 0; 56 } 57 58 pub fn count() usize { 59 - sqlite.mutex.lock(); 60 - defer sqlite.mutex.unlock(); 61 - 62 - if (sqlite.conn.row("SELECT COUNT(*) FROM events", .{}) catch null) |row| { 63 defer row.deinit(); 64 return @intCast(row.int(0)); 65 }
··· 1 + const backend = @import("backend.zig"); 2 const log = @import("../logging.zig"); 3 4 pub fn insertDeduped( ··· 14 recorded: []const u8, 15 follows: ?[]const u8, 16 ) !void { 17 + // TODO: Use dialect helper for INSERT OR IGNORE vs ON CONFLICT DO NOTHING 18 + backend.db.exec( 19 \\INSERT OR IGNORE INTO events (id, occurred, event, resource_id, resource, 20 \\ related_resource_ids, related, payload, received, recorded, follows) 21 \\VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ··· 38 } 39 40 pub fn trim(cutoff: []const u8) !usize { 41 + backend.db.exec("DELETE FROM events WHERE occurred < ?", .{cutoff}) catch |err| { 42 log.err("database", "trim events error: {}", .{err}); 43 return err; 44 }; 45 46 + return @intCast(backend.db.changes()); 47 } 48 49 pub fn count() usize { 50 + var r = backend.db.row("SELECT COUNT(*) FROM events", .{}) catch return 0; 51 + if (r) |*row| { 52 defer row.deinit(); 53 return @intCast(row.int(0)); 54 }
+30 -37
src/db/flow_runs.zig
··· 1 const std = @import("std"); 2 const Allocator = std.mem.Allocator; 3 4 - const sqlite = @import("sqlite.zig"); 5 const log = @import("../logging.zig"); 6 7 pub const FlowRunRow = struct { ··· 30 state_name: []const u8, 31 timestamp: []const u8, 32 ) !void { 33 - sqlite.mutex.lock(); 34 - defer sqlite.mutex.unlock(); 35 - 36 - sqlite.conn.exec( 37 \\INSERT INTO flow_run (id, flow_id, name, state_type, state_name, state_timestamp) 38 \\VALUES (?, ?, ?, ?, ?, ?) 39 , .{ id, flow_id, name, state_type, state_name, timestamp }) catch |err| { ··· 43 } 44 45 pub fn get(alloc: Allocator, id: []const u8) !?FlowRunRow { 46 - sqlite.mutex.lock(); 47 - defer sqlite.mutex.unlock(); 48 - 49 - const row = sqlite.conn.row( 50 \\SELECT id, created, updated, flow_id, name, state_type, state_name, state_timestamp, 51 \\ parameters, tags, run_count, expected_start_time, start_time, end_time, total_run_time 52 \\FROM flow_run WHERE id = ? 53 , .{id}) catch return null; 54 55 - if (row) |r| { 56 - defer r.deinit(); 57 return FlowRunRow{ 58 - .id = try alloc.dupe(u8, r.text(0)), 59 - .created = try alloc.dupe(u8, r.text(1)), 60 - .updated = try alloc.dupe(u8, r.text(2)), 61 - .flow_id = try alloc.dupe(u8, r.text(3)), 62 - .name = try alloc.dupe(u8, r.text(4)), 63 - .state_type = try alloc.dupe(u8, r.text(5)), 64 - .state_name = try alloc.dupe(u8, r.text(6)), 65 - .state_timestamp = try alloc.dupe(u8, r.text(7)), 66 - .parameters = try alloc.dupe(u8, r.text(8)), 67 - .tags = try alloc.dupe(u8, r.text(9)), 68 - .run_count = r.int(10), 69 - .expected_start_time = if (r.text(11).len > 0) try alloc.dupe(u8, r.text(11)) else null, 70 - .start_time = if (r.text(12).len > 0) try alloc.dupe(u8, r.text(12)) else null, 71 - .end_time = if (r.text(13).len > 0) try alloc.dupe(u8, r.text(13)) else null, 72 - .total_run_time = r.float(14), 73 }; 74 } 75 return null; ··· 86 run_count: i64, 87 total_run_time: f64, 88 ) !void { 89 - sqlite.mutex.lock(); 90 - defer sqlite.mutex.unlock(); 91 92 - sqlite.conn.transaction() catch |err| { 93 log.err("database", "begin transaction error: {}", .{err}); 94 return err; 95 }; 96 - errdefer sqlite.conn.rollback(); 97 98 - sqlite.conn.exec( 99 \\UPDATE flow_run SET state_id = ?, state_type = ?, state_name = ?, state_timestamp = ?, updated = ?, 100 \\ start_time = ?, end_time = ?, run_count = ?, total_run_time = ? 101 \\WHERE id = ? ··· 107 return err; 108 }; 109 110 - sqlite.conn.exec( 111 \\INSERT INTO flow_run_state (id, flow_run_id, type, name, timestamp) 112 \\VALUES (?, ?, ?, ?, ?) 113 , .{ state_id, run_id, state_type, state_name, timestamp }) catch |err| { ··· 115 return err; 116 }; 117 118 - sqlite.conn.commit() catch |err| { 119 log.err("database", "commit error: {}", .{err}); 120 return err; 121 }; 122 } 123 124 pub fn list(alloc: Allocator, limit: usize) ![]FlowRunRow { 125 - sqlite.mutex.lock(); 126 - defer sqlite.mutex.unlock(); 127 - 128 var results = std.ArrayListUnmanaged(FlowRunRow){}; 129 errdefer results.deinit(alloc); 130 131 - var rows = sqlite.conn.rows( 132 \\SELECT id, created, updated, flow_id, name, state_type, state_name, state_timestamp, 133 \\ parameters, tags, run_count, expected_start_time, start_time, end_time, total_run_time 134 \\FROM flow_run ORDER BY created DESC LIMIT ?
··· 1 const std = @import("std"); 2 const Allocator = std.mem.Allocator; 3 4 + const backend = @import("backend.zig"); 5 const log = @import("../logging.zig"); 6 7 pub const FlowRunRow = struct { ··· 30 state_name: []const u8, 31 timestamp: []const u8, 32 ) !void { 33 + backend.db.exec( 34 \\INSERT INTO flow_run (id, flow_id, name, state_type, state_name, state_timestamp) 35 \\VALUES (?, ?, ?, ?, ?, ?) 36 , .{ id, flow_id, name, state_type, state_name, timestamp }) catch |err| { ··· 40 } 41 42 pub fn get(alloc: Allocator, id: []const u8) !?FlowRunRow { 43 + var r = backend.db.row( 44 \\SELECT id, created, updated, flow_id, name, state_type, state_name, state_timestamp, 45 \\ parameters, tags, run_count, expected_start_time, start_time, end_time, total_run_time 46 \\FROM flow_run WHERE id = ? 47 , .{id}) catch return null; 48 49 + if (r) |*row| { 50 + defer row.deinit(); 51 return FlowRunRow{ 52 + .id = try alloc.dupe(u8, row.text(0)), 53 + .created = try alloc.dupe(u8, row.text(1)), 54 + .updated = try alloc.dupe(u8, row.text(2)), 55 + .flow_id = try alloc.dupe(u8, row.text(3)), 56 + .name = try alloc.dupe(u8, row.text(4)), 57 + .state_type = try alloc.dupe(u8, row.text(5)), 58 + .state_name = try alloc.dupe(u8, row.text(6)), 59 + .state_timestamp = try alloc.dupe(u8, row.text(7)), 60 + .parameters = try alloc.dupe(u8, row.text(8)), 61 + .tags = try alloc.dupe(u8, row.text(9)), 62 + .run_count = row.int(10), 63 + .expected_start_time = if (row.text(11).len > 0) try alloc.dupe(u8, row.text(11)) else null, 64 + .start_time = if (row.text(12).len > 0) try alloc.dupe(u8, row.text(12)) else null, 65 + .end_time = if (row.text(13).len > 0) try alloc.dupe(u8, row.text(13)) else null, 66 + .total_run_time = row.float(14), 67 }; 68 } 69 return null; ··· 80 run_count: i64, 81 total_run_time: f64, 82 ) !void { 83 + // Lock mutex for entire transaction 84 + backend.db.mutex.lock(); 85 + defer backend.db.mutex.unlock(); 86 87 + backend.db.begin() catch |err| { 88 log.err("database", "begin transaction error: {}", .{err}); 89 return err; 90 }; 91 + errdefer backend.db.rollback(); 92 93 + // Use execUnsafe since we already hold the mutex 94 + backend.db.execUnsafe( 95 \\UPDATE flow_run SET state_id = ?, state_type = ?, state_name = ?, state_timestamp = ?, updated = ?, 96 \\ start_time = ?, end_time = ?, run_count = ?, total_run_time = ? 97 \\WHERE id = ? ··· 103 return err; 104 }; 105 106 + backend.db.execUnsafe( 107 \\INSERT INTO flow_run_state (id, flow_run_id, type, name, timestamp) 108 \\VALUES (?, ?, ?, ?, ?) 109 , .{ state_id, run_id, state_type, state_name, timestamp }) catch |err| { ··· 111 return err; 112 }; 113 114 + backend.db.commit() catch |err| { 115 log.err("database", "commit error: {}", .{err}); 116 return err; 117 }; 118 } 119 120 pub fn list(alloc: Allocator, limit: usize) ![]FlowRunRow { 121 var results = std.ArrayListUnmanaged(FlowRunRow){}; 122 errdefer results.deinit(alloc); 123 124 + var rows = backend.db.query( 125 \\SELECT id, created, updated, flow_id, name, state_type, state_name, state_timestamp, 126 \\ parameters, tags, run_count, expected_start_time, start_time, end_time, total_run_time 127 \\FROM flow_run ORDER BY created DESC LIMIT ?
+19 -31
src/db/flows.zig
··· 1 const std = @import("std"); 2 const Allocator = std.mem.Allocator; 3 4 - const sqlite = @import("sqlite.zig"); 5 const log = @import("../logging.zig"); 6 7 pub const FlowRow = struct { ··· 13 }; 14 15 pub fn getByName(alloc: Allocator, name: []const u8) !?FlowRow { 16 - sqlite.mutex.lock(); 17 - defer sqlite.mutex.unlock(); 18 - 19 - const row = sqlite.conn.row( 20 "SELECT id, created, updated, name, tags FROM flow WHERE name = ?", 21 .{name}, 22 ) catch return null; 23 24 - if (row) |r| { 25 - defer r.deinit(); 26 return FlowRow{ 27 - .id = try alloc.dupe(u8, r.text(0)), 28 - .created = try alloc.dupe(u8, r.text(1)), 29 - .updated = try alloc.dupe(u8, r.text(2)), 30 - .name = try alloc.dupe(u8, r.text(3)), 31 - .tags = try alloc.dupe(u8, r.text(4)), 32 }; 33 } 34 return null; 35 } 36 37 pub fn getById(alloc: Allocator, id: []const u8) !?FlowRow { 38 - sqlite.mutex.lock(); 39 - defer sqlite.mutex.unlock(); 40 - 41 - const row = sqlite.conn.row( 42 "SELECT id, created, updated, name, tags FROM flow WHERE id = ?", 43 .{id}, 44 ) catch return null; 45 46 - if (row) |r| { 47 - defer r.deinit(); 48 return FlowRow{ 49 - .id = try alloc.dupe(u8, r.text(0)), 50 - .created = try alloc.dupe(u8, r.text(1)), 51 - .updated = try alloc.dupe(u8, r.text(2)), 52 - .name = try alloc.dupe(u8, r.text(3)), 53 - .tags = try alloc.dupe(u8, r.text(4)), 54 }; 55 } 56 return null; 57 } 58 59 pub fn insert(id: []const u8, name: []const u8) !void { 60 - sqlite.mutex.lock(); 61 - defer sqlite.mutex.unlock(); 62 - 63 - sqlite.conn.exec( 64 "INSERT INTO flow (id, name) VALUES (?, ?)", 65 .{ id, name }, 66 ) catch |err| { ··· 70 } 71 72 pub fn list(alloc: Allocator, limit: usize) ![]FlowRow { 73 - sqlite.mutex.lock(); 74 - defer sqlite.mutex.unlock(); 75 - 76 var results = std.ArrayListUnmanaged(FlowRow){}; 77 errdefer results.deinit(alloc); 78 79 - var rows = sqlite.conn.rows( 80 "SELECT id, created, updated, name, tags FROM flow ORDER BY created DESC LIMIT ?", 81 .{@as(i64, @intCast(limit))}, 82 ) catch |err| {
··· 1 const std = @import("std"); 2 const Allocator = std.mem.Allocator; 3 4 + const backend = @import("backend.zig"); 5 const log = @import("../logging.zig"); 6 7 pub const FlowRow = struct { ··· 13 }; 14 15 pub fn getByName(alloc: Allocator, name: []const u8) !?FlowRow { 16 + var r = backend.db.row( 17 "SELECT id, created, updated, name, tags FROM flow WHERE name = ?", 18 .{name}, 19 ) catch return null; 20 21 + if (r) |*row| { 22 + defer row.deinit(); 23 return FlowRow{ 24 + .id = try alloc.dupe(u8, row.text(0)), 25 + .created = try alloc.dupe(u8, row.text(1)), 26 + .updated = try alloc.dupe(u8, row.text(2)), 27 + .name = try alloc.dupe(u8, row.text(3)), 28 + .tags = try alloc.dupe(u8, row.text(4)), 29 }; 30 } 31 return null; 32 } 33 34 pub fn getById(alloc: Allocator, id: []const u8) !?FlowRow { 35 + var r = backend.db.row( 36 "SELECT id, created, updated, name, tags FROM flow WHERE id = ?", 37 .{id}, 38 ) catch return null; 39 40 + if (r) |*row| { 41 + defer row.deinit(); 42 return FlowRow{ 43 + .id = try alloc.dupe(u8, row.text(0)), 44 + .created = try alloc.dupe(u8, row.text(1)), 45 + .updated = try alloc.dupe(u8, row.text(2)), 46 + .name = try alloc.dupe(u8, row.text(3)), 47 + .tags = try alloc.dupe(u8, row.text(4)), 48 }; 49 } 50 return null; 51 } 52 53 pub fn insert(id: []const u8, name: []const u8) !void { 54 + backend.db.exec( 55 "INSERT INTO flow (id, name) VALUES (?, ?)", 56 .{ id, name }, 57 ) catch |err| { ··· 61 } 62 63 pub fn list(alloc: Allocator, limit: usize) ![]FlowRow { 64 var results = std.ArrayListUnmanaged(FlowRow){}; 65 errdefer results.deinit(alloc); 66 67 + var rows = backend.db.query( 68 "SELECT id, created, updated, name, tags FROM flow ORDER BY created DESC LIMIT ?", 69 .{@as(i64, @intCast(limit))}, 70 ) catch |err| {
+159
src/db/schema/postgres.zig
···
··· 1 + const std = @import("std"); 2 + const backend = @import("../backend.zig"); 3 + const log = @import("../../logging.zig"); 4 + 5 + /// Initialize PostgreSQL schema 6 + pub fn init() !void { 7 + // flow table 8 + try backend.db.execNoArgs( 9 + \\CREATE TABLE IF NOT EXISTS flow ( 10 + \\ id TEXT PRIMARY KEY, 11 + \\ created TIMESTAMP WITH TIME ZONE DEFAULT NOW(), 12 + \\ updated TIMESTAMP WITH TIME ZONE DEFAULT NOW(), 13 + \\ name TEXT NOT NULL UNIQUE, 14 + \\ tags JSONB DEFAULT '[]' 15 + \\) 16 + ); 17 + 18 + // flow_run table 19 + try backend.db.execNoArgs( 20 + \\CREATE TABLE IF NOT EXISTS flow_run ( 21 + \\ id TEXT PRIMARY KEY, 22 + \\ created TIMESTAMP WITH TIME ZONE DEFAULT NOW(), 23 + \\ updated TIMESTAMP WITH TIME ZONE DEFAULT NOW(), 24 + \\ flow_id TEXT REFERENCES flow(id), 25 + \\ name TEXT NOT NULL, 26 + \\ parameters JSONB DEFAULT '{}', 27 + \\ tags JSONB DEFAULT '[]', 28 + \\ state_id TEXT, 29 + \\ state_type TEXT, 30 + \\ state_name TEXT, 31 + \\ state_timestamp TIMESTAMP WITH TIME ZONE, 32 + \\ run_count INTEGER DEFAULT 0, 33 + \\ expected_start_time TIMESTAMP WITH TIME ZONE, 34 + \\ start_time TIMESTAMP WITH TIME ZONE, 35 + \\ end_time TIMESTAMP WITH TIME ZONE, 36 + \\ total_run_time DOUBLE PRECISION DEFAULT 0.0 37 + \\) 38 + ); 39 + 40 + // flow_run_state table 41 + try backend.db.execNoArgs( 42 + \\CREATE TABLE IF NOT EXISTS flow_run_state ( 43 + \\ id TEXT PRIMARY KEY, 44 + \\ created TIMESTAMP WITH TIME ZONE DEFAULT NOW(), 45 + \\ flow_run_id TEXT REFERENCES flow_run(id) ON DELETE CASCADE, 46 + \\ type TEXT NOT NULL, 47 + \\ name TEXT NOT NULL, 48 + \\ timestamp TIMESTAMP WITH TIME ZONE DEFAULT NOW() 49 + \\) 50 + ); 51 + 52 + // task_run table 53 + try backend.db.execNoArgs( 54 + \\CREATE TABLE IF NOT EXISTS task_run ( 55 + \\ id TEXT PRIMARY KEY, 56 + \\ created TIMESTAMP WITH TIME ZONE DEFAULT NOW(), 57 + \\ updated TIMESTAMP WITH TIME ZONE DEFAULT NOW(), 58 + \\ flow_run_id TEXT REFERENCES flow_run(id), 59 + \\ name TEXT NOT NULL, 60 + \\ task_key TEXT NOT NULL, 61 + \\ dynamic_key TEXT NOT NULL, 62 + \\ cache_key TEXT, 63 + \\ tags JSONB DEFAULT '[]', 64 + \\ state_id TEXT, 65 + \\ state_type TEXT, 66 + \\ state_name TEXT, 67 + \\ state_timestamp TIMESTAMP WITH TIME ZONE, 68 + \\ run_count INTEGER DEFAULT 0, 69 + \\ expected_start_time TIMESTAMP WITH TIME ZONE, 70 + \\ start_time TIMESTAMP WITH TIME ZONE, 71 + \\ end_time TIMESTAMP WITH TIME ZONE, 72 + \\ total_run_time DOUBLE PRECISION DEFAULT 0.0 73 + \\) 74 + ); 75 + 76 + // events table 77 + try backend.db.execNoArgs( 78 + \\CREATE TABLE IF NOT EXISTS events ( 79 + \\ id TEXT PRIMARY KEY, 80 + \\ created TIMESTAMP WITH TIME ZONE DEFAULT NOW(), 81 + \\ updated TIMESTAMP WITH TIME ZONE DEFAULT NOW(), 82 + \\ occurred TIMESTAMP WITH TIME ZONE NOT NULL, 83 + \\ event TEXT NOT NULL, 84 + \\ resource_id TEXT NOT NULL, 85 + \\ resource JSONB NOT NULL DEFAULT '{}', 86 + \\ related_resource_ids JSONB DEFAULT '[]', 87 + \\ related JSONB DEFAULT '[]', 88 + \\ payload JSONB DEFAULT '{}', 89 + \\ received TIMESTAMP WITH TIME ZONE NOT NULL, 90 + \\ recorded TIMESTAMP WITH TIME ZONE NOT NULL, 91 + \\ follows TEXT 92 + \\) 93 + ); 94 + 95 + // block_type table 96 + try backend.db.execNoArgs( 97 + \\CREATE TABLE IF NOT EXISTS block_type ( 98 + \\ id TEXT PRIMARY KEY, 99 + \\ created TIMESTAMP WITH TIME ZONE DEFAULT NOW(), 100 + \\ updated TIMESTAMP WITH TIME ZONE DEFAULT NOW(), 101 + \\ name TEXT NOT NULL, 102 + \\ slug TEXT NOT NULL UNIQUE, 103 + \\ logo_url TEXT, 104 + \\ documentation_url TEXT, 105 + \\ description TEXT, 106 + \\ code_example TEXT, 107 + \\ is_protected BOOLEAN DEFAULT FALSE 108 + \\) 109 + ); 110 + 111 + // block_schema table 112 + try backend.db.execNoArgs( 113 + \\CREATE TABLE IF NOT EXISTS block_schema ( 114 + \\ id TEXT PRIMARY KEY, 115 + \\ created TIMESTAMP WITH TIME ZONE DEFAULT NOW(), 116 + \\ updated TIMESTAMP WITH TIME ZONE DEFAULT NOW(), 117 + \\ checksum TEXT NOT NULL, 118 + \\ fields JSONB NOT NULL DEFAULT '{}', 119 + \\ capabilities JSONB NOT NULL DEFAULT '[]', 120 + \\ version TEXT NOT NULL DEFAULT '1', 121 + \\ block_type_id TEXT NOT NULL REFERENCES block_type(id) ON DELETE CASCADE, 122 + \\ UNIQUE(checksum, version) 123 + \\) 124 + ); 125 + 126 + // block_document table 127 + try backend.db.execNoArgs( 128 + \\CREATE TABLE IF NOT EXISTS block_document ( 129 + \\ id TEXT PRIMARY KEY, 130 + \\ created TIMESTAMP WITH TIME ZONE DEFAULT NOW(), 131 + \\ updated TIMESTAMP WITH TIME ZONE DEFAULT NOW(), 132 + \\ name TEXT, 133 + \\ data JSONB NOT NULL DEFAULT '{}', 134 + \\ is_anonymous BOOLEAN DEFAULT FALSE, 135 + \\ block_type_id TEXT NOT NULL REFERENCES block_type(id), 136 + \\ block_type_name TEXT, 137 + \\ block_schema_id TEXT NOT NULL REFERENCES block_schema(id), 138 + \\ UNIQUE(block_type_id, name) 139 + \\) 140 + ); 141 + 142 + // indexes (PostgreSQL syntax) 143 + try backend.db.execNoArgs("CREATE INDEX IF NOT EXISTS ix_flow_run__state_type ON flow_run(state_type)"); 144 + try backend.db.execNoArgs("CREATE INDEX IF NOT EXISTS ix_flow_run__flow_id ON flow_run(flow_id)"); 145 + try backend.db.execNoArgs("CREATE INDEX IF NOT EXISTS ix_flow_run_state__flow_run_id ON flow_run_state(flow_run_id)"); 146 + try backend.db.execNoArgs("CREATE INDEX IF NOT EXISTS ix_task_run__flow_run_id ON task_run(flow_run_id)"); 147 + try backend.db.execNoArgs("CREATE INDEX IF NOT EXISTS ix_task_run__task_key_dynamic_key ON task_run(task_key, dynamic_key)"); 148 + try backend.db.execNoArgs("CREATE INDEX IF NOT EXISTS ix_events__occurred ON events(occurred)"); 149 + try backend.db.execNoArgs("CREATE INDEX IF NOT EXISTS ix_events__event__id ON events(event, id)"); 150 + try backend.db.execNoArgs("CREATE INDEX IF NOT EXISTS ix_events__event_resource_id_occurred ON events(event, resource_id, occurred)"); 151 + try backend.db.execNoArgs("CREATE INDEX IF NOT EXISTS ix_events__occurred_id ON events(occurred, id)"); 152 + try backend.db.execNoArgs("CREATE INDEX IF NOT EXISTS ix_block_schema__block_type_id ON block_schema(block_type_id)"); 153 + try backend.db.execNoArgs("CREATE INDEX IF NOT EXISTS ix_block_schema__checksum ON block_schema(checksum)"); 154 + try backend.db.execNoArgs("CREATE INDEX IF NOT EXISTS ix_block_document__block_type_id ON block_document(block_type_id)"); 155 + try backend.db.execNoArgs("CREATE INDEX IF NOT EXISTS ix_block_document__block_schema_id ON block_document(block_schema_id)"); 156 + try backend.db.execNoArgs("CREATE INDEX IF NOT EXISTS ix_block_document__name ON block_document(name)"); 157 + 158 + log.info("database", "postgres schema initialized", .{}); 159 + }
+159
src/db/schema/sqlite.zig
···
··· 1 + const std = @import("std"); 2 + const backend = @import("../backend.zig"); 3 + const log = @import("../../logging.zig"); 4 + 5 + /// Initialize SQLite schema 6 + pub fn init() !void { 7 + // flow table 8 + try backend.db.execNoArgs( 9 + \\CREATE TABLE IF NOT EXISTS flow ( 10 + \\ id TEXT PRIMARY KEY, 11 + \\ created TEXT DEFAULT (datetime('now')), 12 + \\ updated TEXT DEFAULT (datetime('now')), 13 + \\ name TEXT NOT NULL UNIQUE, 14 + \\ tags TEXT DEFAULT '[]' 15 + \\) 16 + ); 17 + 18 + // flow_run table 19 + try backend.db.execNoArgs( 20 + \\CREATE TABLE IF NOT EXISTS flow_run ( 21 + \\ id TEXT PRIMARY KEY, 22 + \\ created TEXT DEFAULT (datetime('now')), 23 + \\ updated TEXT DEFAULT (datetime('now')), 24 + \\ flow_id TEXT REFERENCES flow(id), 25 + \\ name TEXT NOT NULL, 26 + \\ parameters TEXT DEFAULT '{}', 27 + \\ tags TEXT DEFAULT '[]', 28 + \\ state_id TEXT, 29 + \\ state_type TEXT, 30 + \\ state_name TEXT, 31 + \\ state_timestamp TEXT, 32 + \\ run_count INTEGER DEFAULT 0, 33 + \\ expected_start_time TEXT, 34 + \\ start_time TEXT, 35 + \\ end_time TEXT, 36 + \\ total_run_time REAL DEFAULT 0.0 37 + \\) 38 + ); 39 + 40 + // flow_run_state table 41 + try backend.db.execNoArgs( 42 + \\CREATE TABLE IF NOT EXISTS flow_run_state ( 43 + \\ id TEXT PRIMARY KEY, 44 + \\ created TEXT DEFAULT (datetime('now')), 45 + \\ flow_run_id TEXT REFERENCES flow_run(id) ON DELETE CASCADE, 46 + \\ type TEXT NOT NULL, 47 + \\ name TEXT NOT NULL, 48 + \\ timestamp TEXT DEFAULT (datetime('now')) 49 + \\) 50 + ); 51 + 52 + // task_run table 53 + try backend.db.execNoArgs( 54 + \\CREATE TABLE IF NOT EXISTS task_run ( 55 + \\ id TEXT PRIMARY KEY, 56 + \\ created TEXT DEFAULT (datetime('now')), 57 + \\ updated TEXT DEFAULT (datetime('now')), 58 + \\ flow_run_id TEXT REFERENCES flow_run(id), 59 + \\ name TEXT NOT NULL, 60 + \\ task_key TEXT NOT NULL, 61 + \\ dynamic_key TEXT NOT NULL, 62 + \\ cache_key TEXT, 63 + \\ tags TEXT DEFAULT '[]', 64 + \\ state_id TEXT, 65 + \\ state_type TEXT, 66 + \\ state_name TEXT, 67 + \\ state_timestamp TEXT, 68 + \\ run_count INTEGER DEFAULT 0, 69 + \\ expected_start_time TEXT, 70 + \\ start_time TEXT, 71 + \\ end_time TEXT, 72 + \\ total_run_time REAL DEFAULT 0.0 73 + \\) 74 + ); 75 + 76 + // events table 77 + try backend.db.execNoArgs( 78 + \\CREATE TABLE IF NOT EXISTS events ( 79 + \\ id TEXT PRIMARY KEY, 80 + \\ created TEXT DEFAULT (datetime('now')), 81 + \\ updated TEXT DEFAULT (datetime('now')), 82 + \\ occurred TEXT NOT NULL, 83 + \\ event TEXT NOT NULL, 84 + \\ resource_id TEXT NOT NULL, 85 + \\ resource TEXT NOT NULL DEFAULT '{}', 86 + \\ related_resource_ids TEXT DEFAULT '[]', 87 + \\ related TEXT DEFAULT '[]', 88 + \\ payload TEXT DEFAULT '{}', 89 + \\ received TEXT NOT NULL, 90 + \\ recorded TEXT NOT NULL, 91 + \\ follows TEXT 92 + \\) 93 + ); 94 + 95 + // block_type table 96 + try backend.db.execNoArgs( 97 + \\CREATE TABLE IF NOT EXISTS block_type ( 98 + \\ id TEXT PRIMARY KEY, 99 + \\ created TEXT DEFAULT (datetime('now')), 100 + \\ updated TEXT DEFAULT (datetime('now')), 101 + \\ name TEXT NOT NULL, 102 + \\ slug TEXT NOT NULL UNIQUE, 103 + \\ logo_url TEXT, 104 + \\ documentation_url TEXT, 105 + \\ description TEXT, 106 + \\ code_example TEXT, 107 + \\ is_protected INTEGER DEFAULT 0 108 + \\) 109 + ); 110 + 111 + // block_schema table 112 + try backend.db.execNoArgs( 113 + \\CREATE TABLE IF NOT EXISTS block_schema ( 114 + \\ id TEXT PRIMARY KEY, 115 + \\ created TEXT DEFAULT (datetime('now')), 116 + \\ updated TEXT DEFAULT (datetime('now')), 117 + \\ checksum TEXT NOT NULL, 118 + \\ fields TEXT NOT NULL DEFAULT '{}', 119 + \\ capabilities TEXT NOT NULL DEFAULT '[]', 120 + \\ version TEXT NOT NULL DEFAULT '1', 121 + \\ block_type_id TEXT NOT NULL REFERENCES block_type(id) ON DELETE CASCADE, 122 + \\ UNIQUE(checksum, version) 123 + \\) 124 + ); 125 + 126 + // block_document table 127 + try backend.db.execNoArgs( 128 + \\CREATE TABLE IF NOT EXISTS block_document ( 129 + \\ id TEXT PRIMARY KEY, 130 + \\ created TEXT DEFAULT (datetime('now')), 131 + \\ updated TEXT DEFAULT (datetime('now')), 132 + \\ name TEXT, 133 + \\ data TEXT NOT NULL DEFAULT '{}', 134 + \\ is_anonymous INTEGER DEFAULT 0, 135 + \\ block_type_id TEXT NOT NULL REFERENCES block_type(id), 136 + \\ block_type_name TEXT, 137 + \\ block_schema_id TEXT NOT NULL REFERENCES block_schema(id), 138 + \\ UNIQUE(block_type_id, name) 139 + \\) 140 + ); 141 + 142 + // indexes 143 + try backend.db.execNoArgs("CREATE INDEX IF NOT EXISTS ix_flow_run__state_type ON flow_run(state_type)"); 144 + try backend.db.execNoArgs("CREATE INDEX IF NOT EXISTS ix_flow_run__flow_id ON flow_run(flow_id)"); 145 + try backend.db.execNoArgs("CREATE INDEX IF NOT EXISTS ix_flow_run_state__flow_run_id ON flow_run_state(flow_run_id)"); 146 + try backend.db.execNoArgs("CREATE INDEX IF NOT EXISTS ix_task_run__flow_run_id ON task_run(flow_run_id)"); 147 + try backend.db.execNoArgs("CREATE INDEX IF NOT EXISTS ix_task_run__task_key_dynamic_key ON task_run(task_key, dynamic_key)"); 148 + try backend.db.execNoArgs("CREATE INDEX IF NOT EXISTS ix_events__occurred ON events(occurred)"); 149 + try backend.db.execNoArgs("CREATE INDEX IF NOT EXISTS ix_events__event__id ON events(event, id)"); 150 + try backend.db.execNoArgs("CREATE INDEX IF NOT EXISTS ix_events__event_resource_id_occurred ON events(event, resource_id, occurred)"); 151 + try backend.db.execNoArgs("CREATE INDEX IF NOT EXISTS ix_events__occurred_id ON events(occurred, id)"); 152 + try backend.db.execNoArgs("CREATE INDEX IF NOT EXISTS ix_block_schema__block_type_id ON block_schema(block_type_id)"); 153 + try backend.db.execNoArgs("CREATE INDEX IF NOT EXISTS ix_block_schema__checksum ON block_schema(checksum)"); 154 + try backend.db.execNoArgs("CREATE INDEX IF NOT EXISTS ix_block_document__block_type_id ON block_document(block_type_id)"); 155 + try backend.db.execNoArgs("CREATE INDEX IF NOT EXISTS ix_block_document__block_schema_id ON block_document(block_schema_id)"); 156 + try backend.db.execNoArgs("CREATE INDEX IF NOT EXISTS ix_block_document__name ON block_document(name)"); 157 + 158 + log.info("database", "sqlite schema initialized", .{}); 159 + }
+15 -18
src/db/sqlite.zig
··· 2 const zqlite = @import("zqlite"); 3 const Thread = std.Thread; 4 const log = @import("../logging.zig"); 5 6 // sub-modules 7 pub const flows = @import("flows.zig"); ··· 38 pub const trimEvents = events.trim; 39 pub const countEvents = events.count; 40 41 - // shared state 42 pub var conn: zqlite.Conn = undefined; 43 pub var mutex: Thread.Mutex = .{}; 44 45 var path_buf: [256]u8 = undefined; 46 47 pub fn init() !void { 48 - const path_env = std.posix.getenv("PREFECT_DATABASE_PATH") orelse "prefect.db"; 49 - 50 - @memcpy(path_buf[0..path_env.len], path_env); 51 - path_buf[path_env.len] = 0; 52 - const path: [*:0]const u8 = path_buf[0..path_env.len :0]; 53 - 54 - log.debug("database", "opening {s}", .{path_env}); 55 - 56 - const flags = zqlite.OpenFlags.Create | zqlite.OpenFlags.ReadWrite; 57 - conn = zqlite.open(path, flags) catch |err| { 58 - log.err("database", "failed to open: {}", .{err}); 59 - return err; 60 - }; 61 62 - _ = conn.exec("PRAGMA journal_mode=WAL", .{}) catch {}; 63 - _ = conn.exec("PRAGMA busy_timeout=5000", .{}) catch {}; 64 - _ = conn.exec("PRAGMA foreign_keys=ON", .{}) catch {}; 65 66 try initSchema(); 67 } 68 69 pub fn close() void { 70 - conn.close(); 71 } 72 73 fn initSchema() !void {
··· 2 const zqlite = @import("zqlite"); 3 const Thread = std.Thread; 4 const log = @import("../logging.zig"); 5 + const backend = @import("backend.zig"); 6 7 // sub-modules 8 pub const flows = @import("flows.zig"); ··· 39 pub const trimEvents = events.trim; 40 pub const countEvents = events.count; 41 42 + // legacy shared state (deprecated - use backend.db instead) 43 pub var conn: zqlite.Conn = undefined; 44 pub var mutex: Thread.Mutex = .{}; 45 46 var path_buf: [256]u8 = undefined; 47 48 pub fn init() !void { 49 + // Initialize the unified backend (SQLite or PostgreSQL based on env vars) 50 + try backend.init(); 51 52 + // For backward compatibility with entity modules not yet migrated, 53 + // also store the raw SQLite connection if using SQLite backend 54 + switch (backend.db.impl) { 55 + .sqlite => |s| { 56 + conn = s.conn; 57 + }, 58 + .postgres => { 59 + // postgres doesn't have a single conn - leave conn undefined 60 + }, 61 + } 62 63 try initSchema(); 64 } 65 66 pub fn close() void { 67 + backend.close(); 68 } 69 70 fn initSchema() !void {
+36 -51
src/db/task_runs.zig
··· 1 const std = @import("std"); 2 const Allocator = std.mem.Allocator; 3 4 - const sqlite = @import("sqlite.zig"); 5 const log = @import("../logging.zig"); 6 7 pub const TaskRunRow = struct { ··· 29 state_name: []const u8, 30 timestamp: []const u8, 31 ) !void { 32 - sqlite.mutex.lock(); 33 - defer sqlite.mutex.unlock(); 34 - 35 - sqlite.conn.exec( 36 \\INSERT INTO task_run (id, flow_run_id, name, task_key, dynamic_key, state_type, state_name, state_timestamp) 37 \\VALUES (?, ?, ?, ?, ?, ?, ?, ?) 38 , .{ ··· 44 } 45 46 pub fn get(alloc: Allocator, id: []const u8) !?TaskRunRow { 47 - sqlite.mutex.lock(); 48 - defer sqlite.mutex.unlock(); 49 - 50 - const row = sqlite.conn.row( 51 \\SELECT id, created, updated, flow_run_id, name, task_key, dynamic_key, 52 \\ state_type, state_name, state_timestamp, tags, run_count 53 \\FROM task_run WHERE id = ? 54 , .{id}) catch return null; 55 56 - if (row) |r| { 57 - defer r.deinit(); 58 return TaskRunRow{ 59 - .id = try alloc.dupe(u8, r.text(0)), 60 - .created = try alloc.dupe(u8, r.text(1)), 61 - .updated = try alloc.dupe(u8, r.text(2)), 62 - .flow_run_id = try alloc.dupe(u8, r.text(3)), 63 - .name = try alloc.dupe(u8, r.text(4)), 64 - .task_key = try alloc.dupe(u8, r.text(5)), 65 - .dynamic_key = try alloc.dupe(u8, r.text(6)), 66 - .state_type = try alloc.dupe(u8, r.text(7)), 67 - .state_name = try alloc.dupe(u8, r.text(8)), 68 - .state_timestamp = try alloc.dupe(u8, r.text(9)), 69 - .tags = try alloc.dupe(u8, r.text(10)), 70 - .run_count = r.int(11), 71 }; 72 } 73 return null; ··· 79 task_key: []const u8, 80 dynamic_key: []const u8, 81 ) !?TaskRunRow { 82 - sqlite.mutex.lock(); 83 - defer sqlite.mutex.unlock(); 84 - 85 - const row = if (flow_run_id) |frid| 86 - sqlite.conn.row( 87 \\SELECT id, created, updated, flow_run_id, name, task_key, dynamic_key, 88 \\ state_type, state_name, state_timestamp, tags, run_count 89 \\FROM task_run WHERE flow_run_id = ? AND task_key = ? AND dynamic_key = ? 90 , .{ frid, task_key, dynamic_key }) catch return null 91 else 92 - sqlite.conn.row( 93 \\SELECT id, created, updated, flow_run_id, name, task_key, dynamic_key, 94 \\ state_type, state_name, state_timestamp, tags, run_count 95 \\FROM task_run WHERE flow_run_id IS NULL AND task_key = ? AND dynamic_key = ? 96 , .{ task_key, dynamic_key }) catch return null; 97 98 - if (row) |r| { 99 - defer r.deinit(); 100 return TaskRunRow{ 101 - .id = try alloc.dupe(u8, r.text(0)), 102 - .created = try alloc.dupe(u8, r.text(1)), 103 - .updated = try alloc.dupe(u8, r.text(2)), 104 - .flow_run_id = try alloc.dupe(u8, r.text(3)), 105 - .name = try alloc.dupe(u8, r.text(4)), 106 - .task_key = try alloc.dupe(u8, r.text(5)), 107 - .dynamic_key = try alloc.dupe(u8, r.text(6)), 108 - .state_type = try alloc.dupe(u8, r.text(7)), 109 - .state_name = try alloc.dupe(u8, r.text(8)), 110 - .state_timestamp = try alloc.dupe(u8, r.text(9)), 111 - .tags = try alloc.dupe(u8, r.text(10)), 112 - .run_count = r.int(11), 113 }; 114 } 115 return null; ··· 122 state_name: []const u8, 123 timestamp: []const u8, 124 ) !void { 125 - sqlite.mutex.lock(); 126 - defer sqlite.mutex.unlock(); 127 - 128 - sqlite.conn.exec( 129 \\UPDATE task_run SET state_id = ?, state_type = ?, state_name = ?, state_timestamp = ?, updated = ? 130 \\WHERE id = ? 131 , .{ state_id, state_type, state_name, timestamp, timestamp, run_id }) catch |err| { ··· 135 } 136 137 pub fn list(alloc: Allocator, limit: usize) ![]TaskRunRow { 138 - sqlite.mutex.lock(); 139 - defer sqlite.mutex.unlock(); 140 - 141 var results = std.ArrayListUnmanaged(TaskRunRow){}; 142 errdefer results.deinit(alloc); 143 144 - var rows = sqlite.conn.rows( 145 \\SELECT id, created, updated, flow_run_id, name, task_key, dynamic_key, 146 \\ state_type, state_name, state_timestamp, tags, run_count 147 \\FROM task_run ORDER BY created DESC LIMIT ?
··· 1 const std = @import("std"); 2 const Allocator = std.mem.Allocator; 3 4 + const backend = @import("backend.zig"); 5 const log = @import("../logging.zig"); 6 7 pub const TaskRunRow = struct { ··· 29 state_name: []const u8, 30 timestamp: []const u8, 31 ) !void { 32 + backend.db.exec( 33 \\INSERT INTO task_run (id, flow_run_id, name, task_key, dynamic_key, state_type, state_name, state_timestamp) 34 \\VALUES (?, ?, ?, ?, ?, ?, ?, ?) 35 , .{ ··· 41 } 42 43 pub fn get(alloc: Allocator, id: []const u8) !?TaskRunRow { 44 + var r = backend.db.row( 45 \\SELECT id, created, updated, flow_run_id, name, task_key, dynamic_key, 46 \\ state_type, state_name, state_timestamp, tags, run_count 47 \\FROM task_run WHERE id = ? 48 , .{id}) catch return null; 49 50 + if (r) |*row| { 51 + defer row.deinit(); 52 return TaskRunRow{ 53 + .id = try alloc.dupe(u8, row.text(0)), 54 + .created = try alloc.dupe(u8, row.text(1)), 55 + .updated = try alloc.dupe(u8, row.text(2)), 56 + .flow_run_id = try alloc.dupe(u8, row.text(3)), 57 + .name = try alloc.dupe(u8, row.text(4)), 58 + .task_key = try alloc.dupe(u8, row.text(5)), 59 + .dynamic_key = try alloc.dupe(u8, row.text(6)), 60 + .state_type = try alloc.dupe(u8, row.text(7)), 61 + .state_name = try alloc.dupe(u8, row.text(8)), 62 + .state_timestamp = try alloc.dupe(u8, row.text(9)), 63 + .tags = try alloc.dupe(u8, row.text(10)), 64 + .run_count = row.int(11), 65 }; 66 } 67 return null; ··· 73 task_key: []const u8, 74 dynamic_key: []const u8, 75 ) !?TaskRunRow { 76 + var r = if (flow_run_id) |frid| 77 + backend.db.row( 78 \\SELECT id, created, updated, flow_run_id, name, task_key, dynamic_key, 79 \\ state_type, state_name, state_timestamp, tags, run_count 80 \\FROM task_run WHERE flow_run_id = ? AND task_key = ? AND dynamic_key = ? 81 , .{ frid, task_key, dynamic_key }) catch return null 82 else 83 + backend.db.row( 84 \\SELECT id, created, updated, flow_run_id, name, task_key, dynamic_key, 85 \\ state_type, state_name, state_timestamp, tags, run_count 86 \\FROM task_run WHERE flow_run_id IS NULL AND task_key = ? AND dynamic_key = ? 87 , .{ task_key, dynamic_key }) catch return null; 88 89 + if (r) |*row| { 90 + defer row.deinit(); 91 return TaskRunRow{ 92 + .id = try alloc.dupe(u8, row.text(0)), 93 + .created = try alloc.dupe(u8, row.text(1)), 94 + .updated = try alloc.dupe(u8, row.text(2)), 95 + .flow_run_id = try alloc.dupe(u8, row.text(3)), 96 + .name = try alloc.dupe(u8, row.text(4)), 97 + .task_key = try alloc.dupe(u8, row.text(5)), 98 + .dynamic_key = try alloc.dupe(u8, row.text(6)), 99 + .state_type = try alloc.dupe(u8, row.text(7)), 100 + .state_name = try alloc.dupe(u8, row.text(8)), 101 + .state_timestamp = try alloc.dupe(u8, row.text(9)), 102 + .tags = try alloc.dupe(u8, row.text(10)), 103 + .run_count = row.int(11), 104 }; 105 } 106 return null; ··· 113 state_name: []const u8, 114 timestamp: []const u8, 115 ) !void { 116 + backend.db.exec( 117 \\UPDATE task_run SET state_id = ?, state_type = ?, state_name = ?, state_timestamp = ?, updated = ? 118 \\WHERE id = ? 119 , .{ state_id, state_type, state_name, timestamp, timestamp, run_id }) catch |err| { ··· 123 } 124 125 pub fn list(alloc: Allocator, limit: usize) ![]TaskRunRow { 126 var results = std.ArrayListUnmanaged(TaskRunRow){}; 127 errdefer results.deinit(alloc); 128 129 + var rows = backend.db.query( 130 \\SELECT id, created, updated, flow_run_id, name, task_key, dynamic_key, 131 \\ state_type, state_name, state_timestamp, tags, run_count 132 \\FROM task_run ORDER BY created DESC LIMIT ?
+8
src/main.zig
··· 3 const posix = std.posix; 4 5 const db = @import("db/sqlite.zig"); 6 const routes = @import("api/routes.zig"); 7 const events = @import("api/events.zig"); 8 const log = @import("logging.zig"); ··· 55 .workers = 1, 56 }); 57 }
··· 3 const posix = std.posix; 4 5 const db = @import("db/sqlite.zig"); 6 + const backend = @import("db/backend.zig"); 7 const routes = @import("api/routes.zig"); 8 const events = @import("api/events.zig"); 9 const log = @import("logging.zig"); ··· 56 .workers = 1, 57 }); 58 } 59 + 60 + test { 61 + // include tests from submodules 62 + _ = @import("db/backend.zig"); 63 + _ = @import("db/dialect.zig"); 64 + _ = @import("utilities/hashing.zig"); 65 + }
+131
src/utilities/hashing.zig
···
··· 1 + const std = @import("std"); 2 + const json = std.json; 3 + const mem = std.mem; 4 + const crypto = std.crypto.hash.sha2; 5 + 6 + /// hash a json value deterministically by sorting object keys. 7 + /// mirrors python prefect's hash_objects() behavior. 8 + /// 9 + /// python wraps args in ((args,), {}) before serializing, so the final 10 + /// format is: [[json_with_sorted_keys], {}] 11 + pub fn hashJson(alloc: std.mem.Allocator, input: []const u8) ![]const u8 { 12 + // parse the input 13 + const parsed = try json.parseFromSlice(json.Value, alloc, input, .{}); 14 + defer parsed.deinit(); 15 + 16 + // serialize with sorted keys, wrapped in python tuple format 17 + var output: std.Io.Writer.Allocating = .init(alloc); 18 + try output.writer.writeAll("[["); 19 + try writeSortedJson(&output.writer, parsed.value); 20 + try output.writer.writeAll("], {}]"); 21 + const to_hash = try output.toOwnedSlice(); 22 + defer alloc.free(to_hash); 23 + 24 + // sha256 hash 25 + var hasher = crypto.Sha256.init(.{}); 26 + hasher.update(to_hash); 27 + const hash = hasher.finalResult(); 28 + 29 + // format as "sha256:" + hex 30 + var result = try alloc.alloc(u8, 71); 31 + @memcpy(result[0..7], "sha256:"); 32 + const hex = std.fmt.bytesToHex(hash, .lower); 33 + @memcpy(result[7..71], &hex); 34 + return result; 35 + } 36 + 37 + /// write json value with object keys sorted alphabetically 38 + fn writeSortedJson(writer: *std.Io.Writer, value: json.Value) !void { 39 + switch (value) { 40 + .object => |obj| { 41 + // collect and sort keys 42 + var keys = try std.heap.page_allocator.alloc([]const u8, obj.count()); 43 + defer std.heap.page_allocator.free(keys); 44 + 45 + var i: usize = 0; 46 + var it = obj.iterator(); 47 + while (it.next()) |entry| { 48 + keys[i] = entry.key_ptr.*; 49 + i += 1; 50 + } 51 + 52 + mem.sort([]const u8, keys, {}, struct { 53 + fn lessThan(_: void, a: []const u8, b: []const u8) bool { 54 + return mem.order(u8, a, b) == .lt; 55 + } 56 + }.lessThan); 57 + 58 + try writer.writeByte('{'); 59 + for (keys, 0..) |key, idx| { 60 + if (idx > 0) try writer.writeAll(", "); 61 + try writeString(writer, key); 62 + try writer.writeAll(": "); // python json.dumps adds space after colon 63 + try writeSortedJson(writer, obj.get(key).?); 64 + } 65 + try writer.writeByte('}'); 66 + }, 67 + .array => |arr| { 68 + try writer.writeByte('['); 69 + for (arr.items, 0..) |item, idx| { 70 + if (idx > 0) try writer.writeAll(", "); 71 + try writeSortedJson(writer, item); 72 + } 73 + try writer.writeByte(']'); 74 + }, 75 + .string => |s| try writeString(writer, s), 76 + .number_string => |s| try writer.writeAll(s), 77 + .integer => |n| { 78 + var buf: [21]u8 = undefined; 79 + const slice = std.fmt.bufPrint(&buf, "{d}", .{n}) catch return error.OutOfMemory; 80 + try writer.writeAll(slice); 81 + }, 82 + .float => |f| { 83 + var buf: [32]u8 = undefined; 84 + const slice = std.fmt.bufPrint(&buf, "{d}", .{f}) catch return error.OutOfMemory; 85 + try writer.writeAll(slice); 86 + }, 87 + .bool => |b| try writer.writeAll(if (b) "true" else "false"), 88 + .null => try writer.writeAll("null"), 89 + } 90 + } 91 + 92 + /// write json-escaped string 93 + fn writeString(writer: *std.Io.Writer, s: []const u8) !void { 94 + try writer.writeByte('"'); 95 + for (s) |c| { 96 + switch (c) { 97 + '"' => try writer.writeAll("\\\""), 98 + '\\' => try writer.writeAll("\\\\"), 99 + '\n' => try writer.writeAll("\\n"), 100 + '\r' => try writer.writeAll("\\r"), 101 + '\t' => try writer.writeAll("\\t"), 102 + 0x00...0x08, 0x0b, 0x0c, 0x0e...0x1f => { 103 + var buf: [6]u8 = undefined; 104 + const slice = std.fmt.bufPrint(&buf, "\\u{x:0>4}", .{c}) catch unreachable; 105 + try writer.writeAll(slice); 106 + }, 107 + else => try writer.writeByte(c), 108 + } 109 + } 110 + try writer.writeByte('"'); 111 + } 112 + 113 + test "hashJson sorts keys" { 114 + const alloc = std.testing.allocator; 115 + // two inputs with same content but different key order should produce same hash 116 + const hash1 = try hashJson(alloc, "{\"z\":1,\"a\":2}"); 117 + defer alloc.free(hash1); 118 + const hash2 = try hashJson(alloc, "{\"a\":2,\"z\":1}"); 119 + defer alloc.free(hash2); 120 + try std.testing.expectEqualStrings(hash1, hash2); 121 + } 122 + 123 + test "hashJson matches python prefect" { 124 + const alloc = std.testing.allocator; 125 + // verified against: python -c 'from prefect.utilities.hashing import hash_objects; import hashlib; print(f"sha256:{hash_objects({"properties": {"value": {"type": "string"}}}, hash_algo=hashlib.sha256)}")' 126 + const input = "{\"properties\":{\"value\":{\"type\":\"string\"}}}"; 127 + const expected = "sha256:f41edbaa8236808b60085ab23e34f3a9c0a596c57ea66e303b9ed544d6b7e59b"; 128 + const result = try hashJson(alloc, input); 129 + defer alloc.free(result); 130 + try std.testing.expectEqualStrings(expected, result); 131 + }