prefect server in zig

add get_scheduled_flow_runs endpoint with datetime standardization

worker polling endpoint:
- POST /work_pools/{name}/get_scheduled_flow_runs
- returns WorkerFlowRunResponse[] (work_pool_id, work_queue_id, flow_run)
- updates queue/pool/deployment status to READY on poll
- supports work_queue_names filter and scheduled_before time filter

datetime standardization:
- unified ISO 8601 format: 2025-01-21T12:34:56.123456Z (app-generated timestamps carry 6-digit microseconds; note the SQLite schema defaults below use strftime('%f'), which yields only 3-digit milliseconds)
- updated sqlite schema defaults: strftime('%Y-%m-%dT%H:%M:%fZ', 'now')
- updated postgres schema defaults: TO_CHAR(NOW() AT TIME ZONE 'UTC', ...)
- added time utilities: parse(), lessOrEqual(), isPast(), formatMicros()

deployment work_queue_id resolution:
- resolve work_queue_id from work_pool_name on deployment create
- uses pool's default queue if work_queue_name not specified
- enables flow runs created from deployments to be found by worker polling

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+650 -58
+21
docs/scratch/configuration-audit.md
··· 60 60 |---------|-----------|----------------|----------------|-------| 61 61 | schedule_max_runs | n/a | 50 | `PREFECT_SERVER_DEPLOYMENT_SCHEDULE_MAX_SCHEDULED_RUNS` | scheduler not implemented yet | 62 62 63 + ## datetime handling 64 + 65 + | aspect | zig implementation | python | notes | 66 + |--------|-------------------|--------|-------| 67 + | format | ISO 8601 (`2025-01-21T12:34:56.123456Z`) | ISO 8601 | ✅ matches | 68 + | timezone | UTC (Z suffix) | UTC-aware | ✅ matches | 69 + | precision | microseconds | microseconds | ✅ matches | 70 + | storage | TEXT column | TIMESTAMP column | sqlite/postgres both use TEXT for consistency | 71 + | parsing | supports multiple formats | pydantic datetime | handles SQLite/postgres/ISO formats | 72 + 73 + ### utilities (`src/utilities/time.zig`) 74 + - `timestamp()` - generate current time as ISO 8601 string 75 + - `nowMicros()` - get current time as microseconds since epoch 76 + - `parse()` - parse ISO 8601 strings (handles `T`/space separator, optional micros, timezone) 77 + - `lessOrEqual()` - compare two timestamp strings 78 + - `isPast()` - check if timestamp is in the past 79 + 80 + ### schema defaults 81 + - SQLite: `strftime('%Y-%m-%dT%H:%M:%fZ', 'now')` - ISO 8601 with 3-digit millis 82 + - PostgreSQL: `TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"')` - ISO 8601 with 6-digit micros 83 + 63 84 ## remaining work 64 85 65 86 ### medium priority (production safety)
+85
scripts/test-api-sequence
··· 837 837 return True 838 838 839 839 840 + def test_get_scheduled_flow_runs(client: CountingClient) -> bool: 841 + """Test get_scheduled_flow_runs endpoint (worker polling).""" 842 + from datetime import datetime, timezone 843 + 844 + def fail(msg: str) -> bool: 845 + if not QUIET: console.print(f"[red]FAIL[/red]: {msg}") 846 + return False 847 + 848 + def log(msg: str) -> None: 849 + if not QUIET: console.print(msg) 850 + 851 + # setup 852 + log("[bold]setup: create flow[/bold]") 853 + resp = client.post("/flows/", json={"name": f"schedule-flow-{uuid.uuid4().hex[:8]}"}) 854 + if resp.status_code not in (200, 201): return fail(f"create flow {resp.status_code}") 855 + flow_id = resp.json().get("id") 856 + 857 + pool_name = f"schedule-pool-{uuid.uuid4().hex[:8]}" 858 + resp = client.post("/work_pools/", json={"name": pool_name, "type": "process"}) 859 + if resp.status_code not in (200, 201): return fail(f"create work_pool {resp.status_code}") 860 + pool = resp.json() 861 + pool_id, default_queue_id = pool.get("id"), pool.get("default_queue_id") 862 + log(f" pool: {pool_id}") 863 + if pool.get("status") != "NOT_READY": return fail(f"expected pool NOT_READY, got {pool.get('status')}") 864 + 865 + resp = client.post("/deployments/", json={"name": f"schedule-deploy-{uuid.uuid4().hex[:8]}", "flow_id": flow_id, "work_pool_name": pool_name}) 866 + if resp.status_code not in (200, 201): return fail(f"create deployment {resp.status_code}") 867 + deployment = resp.json() 868 + deployment_id = deployment.get("id") 869 + log(f" deployment: {deployment_id}") 870 + if deployment.get("status") != "NOT_READY": return fail(f"expected deployment NOT_READY, got {deployment.get('status')}") 871 + 872 + # create scheduled flow run 873 + log("[bold]create scheduled flow run[/bold]") 874 + resp = client.post(f"/deployments/{deployment_id}/create_flow_run", json={"state": {"type": "SCHEDULED", "name": "Scheduled"}}) 875 + if resp.status_code not in (200, 201): return fail(f"create flow 
run {resp.status_code}") 876 + flow_run = resp.json() 877 + flow_run_id = flow_run.get("id") 878 + log(f" flow_run: {flow_run_id}") 879 + log(f" state: {flow_run.get('state_type')}") 880 + if flow_run.get("state_type") != "SCHEDULED": return fail(f"expected SCHEDULED, got {flow_run.get('state_type')}") 881 + 882 + # test polling 883 + log("[bold]get_scheduled_flow_runs[/bold]") 884 + resp = client.post(f"/work_pools/{pool_name}/get_scheduled_flow_runs", json={}) 885 + if resp.status_code != 200: return fail(f"get_scheduled_flow_runs {resp.status_code}") 886 + scheduled_runs = resp.json() 887 + if not isinstance(scheduled_runs, list): return fail(f"expected list, got {type(scheduled_runs)}") 888 + log(f" returned {len(scheduled_runs)} runs") 889 + 890 + # verify our run is in results 891 + found = any(item.get("flow_run", {}).get("id") == flow_run_id and 892 + item.get("work_pool_id") == pool_id and item.get("work_queue_id") == default_queue_id 893 + for item in scheduled_runs) 894 + if not found: return fail("scheduled flow run not found in results") 895 + log(" flow run found in results") 896 + 897 + # verify status changes 898 + resp = client.get(f"/work_pools/{pool_name}") 899 + if resp.status_code != 200 or resp.json().get("status") != "READY": 900 + return fail(f"expected pool READY after polling") 901 + log(" pool status: READY") 902 + 903 + resp = client.get(f"/deployments/{deployment_id}") 904 + if resp.status_code != 200 or resp.json().get("status") != "READY": 905 + return fail(f"expected deployment READY after polling") 906 + log(" deployment status: READY") 907 + 908 + # test filters 909 + resp = client.post(f"/work_pools/{pool_name}/get_scheduled_flow_runs", json={"work_queue_names": ["default"]}) 910 + if resp.status_code != 200: return fail(f"filter test {resp.status_code}") 911 + log(" filtered by queue: ok") 912 + 913 + resp = client.post(f"/work_pools/{pool_name}/get_scheduled_flow_runs", json={"scheduled_before": 
datetime.now(timezone.utc).isoformat()}) 914 + if resp.status_code != 200: return fail(f"scheduled_before test {resp.status_code}") 915 + log(f" scheduled_before filter: {len(resp.json())} runs") 916 + 917 + # cleanup 918 + client.delete(f"/deployments/{deployment_id}") 919 + client.delete(f"/work_pools/{pool_name}") 920 + log(" cleanup: ok") 921 + return True 922 + 923 + 840 924 def main(): 841 925 json_output = "--json" in sys.argv 842 926 ··· 856 940 results.append(run_test("blocks", test_blocks)) 857 941 results.append(run_test("work_pools", test_work_pools)) 858 942 results.append(run_test("deployments", test_deployments)) 943 + results.append(run_test("get_scheduled_flow_runs", test_get_scheduled_flow_runs)) 859 944 860 945 total_duration = sum(r.duration_ms for r in results) 861 946 total_requests = sum(r.requests for r in results)
+18 -1
src/api/deployments.zig
··· 186 186 var id_buf: [36]u8 = undefined; 187 187 const new_id = uuid_util.generate(&id_buf); 188 188 189 - db.deployments.insert(new_id, name, flow_id, now, buildInsertParams(obj)) catch { 189 + // Resolve work_queue_id from work_pool_name if provided 190 + var insert_params = buildInsertParams(obj); 191 + if (insert_params.work_pool_name) |pool_name| { 192 + if (db.work_pools.getByName(alloc, pool_name) catch null) |pool| { 193 + if (insert_params.work_queue_name) |queue_name| { 194 + // Look up specific queue by name 195 + if (db.work_queues.getByPoolAndName(alloc, pool.id, queue_name) catch null) |queue| { 196 + insert_params.work_queue_id = queue.id; 197 + } 198 + } else if (pool.default_queue_id) |default_id| { 199 + // Use pool's default queue 200 + insert_params.work_queue_id = default_id; 201 + insert_params.work_queue_name = "default"; 202 + } 203 + } 204 + } 205 + 206 + db.deployments.insert(new_id, name, flow_id, now, insert_params) catch { 190 207 json_util.sendStatus(r, "{\"detail\":\"insert failed\"}", .internal_server_error); 191 208 return; 192 209 };
+250
src/api/work_pool_schedule.zig
··· 1 + const std = @import("std"); 2 + const zap = @import("zap"); 3 + const json = std.json; 4 + 5 + const db = @import("../db/sqlite.zig"); 6 + const time_util = @import("../utilities/time.zig"); 7 + const json_util = @import("../utilities/json.zig"); 8 + 9 + /// Handle POST /work_pools/{name}/get_scheduled_flow_runs 10 + pub fn handle(r: zap.Request, target: []const u8) !void { 11 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 12 + defer arena.deinit(); 13 + const alloc = arena.allocator(); 14 + 15 + const pool_name = extractPoolName(target) orelse { 16 + json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request); 17 + return; 18 + }; 19 + 20 + const pool = db.work_pools.getByName(alloc, pool_name) catch null orelse { 21 + json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found); 22 + return; 23 + }; 24 + 25 + // Parse request body 26 + var limit: usize = 200; 27 + var scheduled_before: ?[]const u8 = null; 28 + var work_queue_names = std.ArrayListUnmanaged([]const u8){}; 29 + 30 + if (r.body) |body| { 31 + if (json.parseFromSlice(json.Value, alloc, body, .{})) |parsed| { 32 + const obj = parsed.value.object; 33 + 34 + if (obj.get("limit")) |v| { 35 + if (v == .integer) limit = @intCast(v.integer); 36 + } 37 + 38 + if (obj.get("scheduled_before")) |v| { 39 + if (v == .string) scheduled_before = v.string; 40 + } 41 + 42 + if (obj.get("work_queue_names")) |v| { 43 + if (v == .array) { 44 + for (v.array.items) |item| { 45 + if (item == .string) { 46 + try work_queue_names.append(alloc, item.string); 47 + } 48 + } 49 + } 50 + } 51 + } else |_| {} 52 + } 53 + 54 + // Default scheduled_before to now if not provided 55 + var ts_buf: [32]u8 = undefined; 56 + const now = time_util.timestamp(&ts_buf); 57 + if (scheduled_before == null) { 58 + scheduled_before = now; 59 + } 60 + 61 + // Get work queues 62 + var queue_ids = std.ArrayListUnmanaged([]const u8){}; 63 + var work_queues = 
std.ArrayListUnmanaged(db.work_queues.WorkQueueRow){}; 64 + 65 + if (work_queue_names.items.len > 0) { 66 + // Filter by specific queue names 67 + for (work_queue_names.items) |qname| { 68 + if (db.work_queues.getByPoolAndName(alloc, pool.id, qname) catch null) |queue| { 69 + try queue_ids.append(alloc, queue.id); 70 + try work_queues.append(alloc, queue); 71 + } 72 + } 73 + } else { 74 + // Get all queues in pool 75 + const all_queues = db.work_queues.listByPool(alloc, pool.id, 1000, 0) catch { 76 + json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error); 77 + return; 78 + }; 79 + for (all_queues) |queue| { 80 + try queue_ids.append(alloc, queue.id); 81 + try work_queues.append(alloc, queue); 82 + } 83 + } 84 + 85 + // Get scheduled flow runs 86 + const flow_runs = db.flow_runs.getScheduledByWorkQueues( 87 + alloc, 88 + queue_ids.items, 89 + scheduled_before, 90 + limit, 91 + ) catch { 92 + json_util.sendStatus(r, "{\"detail\":\"database error\"}", .internal_server_error); 93 + return; 94 + }; 95 + 96 + // Update last_polled for work queues and mark as ready if needed 97 + for (work_queues.items) |queue| { 98 + const new_status: db.work_queues.Status = if (queue.is_paused) .paused else .ready; 99 + _ = db.work_queues.updateLastPolled(pool.id, queue.name, now, new_status) catch {}; 100 + } 101 + 102 + // Mark pool as ready if it was not_ready 103 + if (pool.status == .not_ready) { 104 + _ = db.work_pools.updateStatus(pool_name, .ready, now) catch {}; 105 + } 106 + 107 + // Mark deployments as ready (deployments for the queues we polled) 108 + db.deployments.markReadyByWorkQueues(queue_ids.items, now) catch {}; 109 + 110 + // Build response as WorkerFlowRunResponse[] 111 + var output: std.io.Writer.Allocating = .init(alloc); 112 + var jw: json.Stringify = .{ .writer = &output.writer }; 113 + 114 + jw.beginArray() catch { 115 + json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 116 + return; 117 + }; 118 + 
119 + for (flow_runs) |run| { 120 + writeWorkerFlowRunResponse(&jw, pool.id, run) catch continue; 121 + } 122 + 123 + jw.endArray() catch {}; 124 + 125 + json_util.send(r, output.toOwnedSlice() catch "[]"); 126 + } 127 + 128 + fn extractPoolName(target: []const u8) ?[]const u8 { 129 + const prefix = "/api/work_pools/"; 130 + if (!std.mem.startsWith(u8, target, prefix)) return null; 131 + 132 + const rest = target[prefix.len..]; 133 + const end = std.mem.indexOf(u8, rest, "/") orelse return null; 134 + if (end == 0) return null; 135 + 136 + return rest[0..end]; 137 + } 138 + 139 + fn writeWorkerFlowRunResponse(jw: *json.Stringify, work_pool_id: []const u8, run: db.flow_runs.FlowRunRow) !void { 140 + try jw.beginObject(); 141 + 142 + try jw.objectField("work_pool_id"); 143 + try jw.write(work_pool_id); 144 + 145 + try jw.objectField("work_queue_id"); 146 + try jw.write(run.work_queue_id); 147 + 148 + try jw.objectField("flow_run"); 149 + try writeFlowRunObject(jw, run); 150 + 151 + try jw.endObject(); 152 + } 153 + 154 + fn writeFlowRunObject(jw: *json.Stringify, run: db.flow_runs.FlowRunRow) !void { 155 + try jw.beginObject(); 156 + 157 + try jw.objectField("id"); 158 + try jw.write(run.id); 159 + 160 + try jw.objectField("created"); 161 + try jw.write(run.created); 162 + 163 + try jw.objectField("updated"); 164 + try jw.write(run.updated); 165 + 166 + try jw.objectField("name"); 167 + try jw.write(run.name); 168 + 169 + try jw.objectField("flow_id"); 170 + try jw.write(run.flow_id); 171 + 172 + try jw.objectField("state_type"); 173 + try jw.write(run.state_type); 174 + 175 + try jw.objectField("state_name"); 176 + try jw.write(run.state_name); 177 + 178 + try jw.objectField("state_timestamp"); 179 + try jw.write(run.state_timestamp); 180 + 181 + try jw.objectField("parameters"); 182 + try jw.beginWriteRaw(); 183 + try jw.writer.writeAll(run.parameters); 184 + jw.endWriteRaw(); 185 + 186 + try jw.objectField("tags"); 187 + try jw.beginWriteRaw(); 188 + try 
jw.writer.writeAll(run.tags); 189 + jw.endWriteRaw(); 190 + 191 + try jw.objectField("run_count"); 192 + try jw.write(run.run_count); 193 + 194 + try jw.objectField("expected_start_time"); 195 + if (run.expected_start_time) |t| { 196 + try jw.write(t); 197 + } else { 198 + try jw.write(null); 199 + } 200 + 201 + try jw.objectField("start_time"); 202 + if (run.start_time) |t| { 203 + try jw.write(t); 204 + } else { 205 + try jw.write(null); 206 + } 207 + 208 + try jw.objectField("end_time"); 209 + if (run.end_time) |t| { 210 + try jw.write(t); 211 + } else { 212 + try jw.write(null); 213 + } 214 + 215 + try jw.objectField("total_run_time"); 216 + try jw.write(run.total_run_time); 217 + 218 + try jw.objectField("deployment_id"); 219 + if (run.deployment_id) |d| { 220 + try jw.write(d); 221 + } else { 222 + try jw.write(null); 223 + } 224 + 225 + try jw.objectField("deployment_version"); 226 + if (run.deployment_version) |v| { 227 + try jw.write(v); 228 + } else { 229 + try jw.write(null); 230 + } 231 + 232 + try jw.objectField("work_queue_name"); 233 + if (run.work_queue_name) |n| { 234 + try jw.write(n); 235 + } else { 236 + try jw.write(null); 237 + } 238 + 239 + try jw.objectField("work_queue_id"); 240 + if (run.work_queue_id) |q| { 241 + try jw.write(q); 242 + } else { 243 + try jw.write(null); 244 + } 245 + 246 + try jw.objectField("auto_scheduled"); 247 + try jw.write(run.auto_scheduled); 248 + 249 + try jw.endObject(); 250 + }
+2 -20
src/api/work_pools.zig
··· 11 11 // sub-handlers 12 12 const queues = @import("work_pool_queues.zig"); 13 13 const workers = @import("work_pool_workers.zig"); 14 + const schedule = @import("work_pool_schedule.zig"); 14 15 15 16 pub fn handle(r: zap.Request) !void { 16 17 const target = r.path orelse "/"; ··· 38 39 39 40 // POST /work_pools/{name}/get_scheduled_flow_runs 40 41 if (mem.eql(u8, method, "POST") and mem.endsWith(u8, target, "/get_scheduled_flow_runs")) { 41 - try getScheduledFlowRuns(r, target); 42 + try schedule.handle(r, target); 42 43 return; 43 44 } 44 45 ··· 461 462 jw.endArray() catch {}; 462 463 463 464 json_util.send(r, output.toOwnedSlice() catch "[]"); 464 - } 465 - 466 - fn getScheduledFlowRuns(r: zap.Request, target: []const u8) !void { 467 - var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 468 - defer arena.deinit(); 469 - const alloc = arena.allocator(); 470 - 471 - const pool_name = extractPoolName(target) orelse { 472 - json_util.sendStatus(r, "{\"detail\":\"work pool name required\"}", .bad_request); 473 - return; 474 - }; 475 - 476 - _ = db.work_pools.getByName(alloc, pool_name) catch null orelse { 477 - json_util.sendStatus(r, "{\"detail\":\"Work pool not found.\"}", .not_found); 478 - return; 479 - }; 480 - 481 - // TODO: Implement when deployments are added 482 - json_util.send(r, "[]"); 483 465 } 484 466 485 467 // JSON serialization
+14
src/db/deployments.zig
··· 356 356 } 357 357 return 0; 358 358 } 359 + 360 + /// Mark deployments as READY for the given work queue IDs 361 + /// Called when workers poll for scheduled runs 362 + pub fn markReadyByWorkQueues(work_queue_ids: []const []const u8, updated: []const u8) !void { 363 + for (work_queue_ids) |queue_id| { 364 + backend.db.exec( 365 + "UPDATE deployment SET status = 'READY', last_polled = ?, updated = ? WHERE work_queue_id = ? AND status = 'NOT_READY'", 366 + .{ updated, updated, queue_id }, 367 + ) catch |err| { 368 + log.err("database", "mark deployments ready error: {}", .{err}); 369 + continue; 370 + }; 371 + } 372 + }
+86
src/db/flow_runs.zig
··· 330 330 return results.toOwnedSlice(alloc); 331 331 } 332 332 333 + /// Get scheduled flow runs for a work queue with optional time filter 334 + pub fn getScheduledByWorkQueue( 335 + alloc: Allocator, 336 + work_queue_id: []const u8, 337 + scheduled_before: ?[]const u8, 338 + limit: usize, 339 + ) ![]FlowRunRow { 340 + var results = std.ArrayListUnmanaged(FlowRunRow){}; 341 + errdefer results.deinit(alloc); 342 + 343 + if (scheduled_before) |before| { 344 + var rows = backend.db.query( 345 + \\SELECT id, created, updated, flow_id, name, state_type, state_name, state_timestamp, 346 + \\ parameters, tags, run_count, expected_start_time, start_time, end_time, total_run_time, 347 + \\ deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled 348 + \\FROM flow_run WHERE work_queue_id = ? AND state_type = 'SCHEDULED' 349 + \\ AND (expected_start_time IS NULL OR expected_start_time <= ?) 350 + \\ORDER BY expected_start_time ASC LIMIT ? 351 + , .{ work_queue_id, before, @as(i64, @intCast(limit)) }) catch |err| { 352 + log.err("database", "get scheduled flow_runs by queue error: {}", .{err}); 353 + return err; 354 + }; 355 + defer rows.deinit(); 356 + 357 + while (rows.next()) |r| { 358 + try results.append(alloc, rowToFlowRun(alloc, r)); 359 + } 360 + } else { 361 + var rows = backend.db.query( 362 + \\SELECT id, created, updated, flow_id, name, state_type, state_name, state_timestamp, 363 + \\ parameters, tags, run_count, expected_start_time, start_time, end_time, total_run_time, 364 + \\ deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled 365 + \\FROM flow_run WHERE work_queue_id = ? AND state_type = 'SCHEDULED' 366 + \\ORDER BY expected_start_time ASC LIMIT ? 
367 + , .{ work_queue_id, @as(i64, @intCast(limit)) }) catch |err| { 368 + log.err("database", "get scheduled flow_runs by queue error: {}", .{err}); 369 + return err; 370 + }; 371 + defer rows.deinit(); 372 + 373 + while (rows.next()) |r| { 374 + try results.append(alloc, rowToFlowRun(alloc, r)); 375 + } 376 + } 377 + 378 + return results.toOwnedSlice(alloc); 379 + } 380 + 381 + /// Get scheduled flow runs for multiple work queues 382 + pub fn getScheduledByWorkQueues( 383 + alloc: Allocator, 384 + work_queue_ids: []const []const u8, 385 + scheduled_before: ?[]const u8, 386 + limit: usize, 387 + ) ![]FlowRunRow { 388 + var results = std.ArrayListUnmanaged(FlowRunRow){}; 389 + errdefer results.deinit(alloc); 390 + 391 + // Query each queue and combine results 392 + for (work_queue_ids) |queue_id| { 393 + const queue_runs = try getScheduledByWorkQueue(alloc, queue_id, scheduled_before, limit); 394 + for (queue_runs) |run| { 395 + try results.append(alloc, run); 396 + if (results.items.len >= limit) break; 397 + } 398 + if (results.items.len >= limit) break; 399 + } 400 + 401 + // Sort by expected_start_time 402 + const items = results.items; 403 + std.mem.sort(FlowRunRow, items, {}, struct { 404 + fn lessThan(_: void, a: FlowRunRow, b: FlowRunRow) bool { 405 + const a_time = a.expected_start_time orelse ""; 406 + const b_time = b.expected_start_time orelse ""; 407 + return std.mem.lessThan(u8, a_time, b_time); 408 + } 409 + }.lessThan); 410 + 411 + // Trim to limit 412 + if (items.len > limit) { 413 + results.shrinkRetainingCapacity(limit); 414 + } 415 + 416 + return results.toOwnedSlice(alloc); 417 + } 418 + 333 419 fn rowToFlowRun(alloc: Allocator, r: anytype) FlowRunRow { 334 420 return .{ 335 421 .id = alloc.dupe(u8, r.text(0)) catch "",
+18 -18
src/db/schema/postgres.zig
··· 9 9 try backend.db.exec( 10 10 \\CREATE TABLE IF NOT EXISTS flow ( 11 11 \\ id TEXT PRIMARY KEY, 12 - \\ created TEXT DEFAULT NOW()::TEXT, 13 - \\ updated TEXT DEFAULT NOW()::TEXT, 12 + \\ created TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'), 13 + \\ updated TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'), 14 14 \\ name TEXT NOT NULL UNIQUE, 15 15 \\ tags JSONB DEFAULT '[]' 16 16 \\) ··· 22 22 try backend.db.exec( 23 23 \\CREATE TABLE IF NOT EXISTS flow_run ( 24 24 \\ id TEXT PRIMARY KEY, 25 - \\ created TEXT DEFAULT NOW()::TEXT, 26 - \\ updated TEXT DEFAULT NOW()::TEXT, 25 + \\ created TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'), 26 + \\ updated TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'), 27 27 \\ flow_id TEXT REFERENCES flow(id), 28 28 \\ name TEXT NOT NULL, 29 29 \\ parameters JSONB DEFAULT '{}', ··· 49 49 try backend.db.exec( 50 50 \\CREATE TABLE IF NOT EXISTS flow_run_state ( 51 51 \\ id TEXT PRIMARY KEY, 52 - \\ created TEXT DEFAULT NOW()::TEXT, 52 + \\ created TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'), 53 53 \\ flow_run_id TEXT REFERENCES flow_run(id) ON DELETE CASCADE, 54 54 \\ type TEXT NOT NULL, 55 55 \\ name TEXT NOT NULL, 56 - \\ timestamp TEXT DEFAULT NOW()::TEXT 56 + \\ timestamp TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"') 57 57 \\) 58 58 , .{}); 59 59 ··· 61 61 try backend.db.exec( 62 62 \\CREATE TABLE IF NOT EXISTS task_run ( 63 63 \\ id TEXT PRIMARY KEY, 64 - \\ created TEXT DEFAULT NOW()::TEXT, 65 - \\ updated TEXT DEFAULT NOW()::TEXT, 64 + \\ created TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'), 65 + \\ updated TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'), 66 66 \\ flow_run_id TEXT REFERENCES flow_run(id), 67 67 \\ name TEXT NOT NULL, 68 68 \\ task_key TEXT NOT 
NULL, ··· 85 85 try backend.db.exec( 86 86 \\CREATE TABLE IF NOT EXISTS events ( 87 87 \\ id TEXT PRIMARY KEY, 88 - \\ created TEXT DEFAULT NOW()::TEXT, 89 - \\ updated TEXT DEFAULT NOW()::TEXT, 88 + \\ created TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'), 89 + \\ updated TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'), 90 90 \\ occurred TEXT NOT NULL, 91 91 \\ event TEXT NOT NULL, 92 92 \\ resource_id TEXT NOT NULL, ··· 105 105 try backend.db.exec( 106 106 \\CREATE TABLE IF NOT EXISTS block_type ( 107 107 \\ id TEXT PRIMARY KEY, 108 - \\ created TEXT DEFAULT NOW()::TEXT, 109 - \\ updated TEXT DEFAULT NOW()::TEXT, 108 + \\ created TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'), 109 + \\ updated TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'), 110 110 \\ name TEXT NOT NULL, 111 111 \\ slug TEXT NOT NULL UNIQUE, 112 112 \\ logo_url TEXT, ··· 121 121 try backend.db.exec( 122 122 \\CREATE TABLE IF NOT EXISTS block_schema ( 123 123 \\ id TEXT PRIMARY KEY, 124 - \\ created TEXT DEFAULT NOW()::TEXT, 125 - \\ updated TEXT DEFAULT NOW()::TEXT, 124 + \\ created TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'), 125 + \\ updated TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'), 126 126 \\ checksum TEXT NOT NULL, 127 127 \\ fields JSONB NOT NULL DEFAULT '{}', 128 128 \\ capabilities JSONB NOT NULL DEFAULT '[]', ··· 137 137 try backend.db.exec( 138 138 \\CREATE TABLE IF NOT EXISTS block_document ( 139 139 \\ id TEXT PRIMARY KEY, 140 - \\ created TEXT DEFAULT NOW()::TEXT, 141 - \\ updated TEXT DEFAULT NOW()::TEXT, 140 + \\ created TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'), 141 + \\ updated TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'), 142 142 \\ name TEXT, 143 143 \\ data JSONB NOT NULL DEFAULT '{}', 144 144 \\ 
is_anonymous INTEGER DEFAULT 0, ··· 153 153 try backend.db.exec( 154 154 \\CREATE TABLE IF NOT EXISTS variable ( 155 155 \\ id TEXT PRIMARY KEY, 156 - \\ created TEXT DEFAULT NOW()::TEXT, 157 - \\ updated TEXT DEFAULT NOW()::TEXT, 156 + \\ created TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'), 157 + \\ updated TEXT DEFAULT TO_CHAR(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'), 158 158 \\ name TEXT NOT NULL UNIQUE, 159 159 \\ value JSONB DEFAULT 'null', 160 160 \\ tags JSONB DEFAULT '[]'
+18 -18
src/db/schema/sqlite.zig
··· 8 8 try backend.db.exec( 9 9 \\CREATE TABLE IF NOT EXISTS flow ( 10 10 \\ id TEXT PRIMARY KEY, 11 - \\ created TEXT DEFAULT (datetime('now')), 12 - \\ updated TEXT DEFAULT (datetime('now')), 11 + \\ created TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), 12 + \\ updated TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), 13 13 \\ name TEXT NOT NULL UNIQUE, 14 14 \\ tags TEXT DEFAULT '[]' 15 15 \\) ··· 19 19 try backend.db.exec( 20 20 \\CREATE TABLE IF NOT EXISTS flow_run ( 21 21 \\ id TEXT PRIMARY KEY, 22 - \\ created TEXT DEFAULT (datetime('now')), 23 - \\ updated TEXT DEFAULT (datetime('now')), 22 + \\ created TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), 23 + \\ updated TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), 24 24 \\ flow_id TEXT REFERENCES flow(id), 25 25 \\ name TEXT NOT NULL, 26 26 \\ parameters TEXT DEFAULT '{}', ··· 46 46 try backend.db.exec( 47 47 \\CREATE TABLE IF NOT EXISTS flow_run_state ( 48 48 \\ id TEXT PRIMARY KEY, 49 - \\ created TEXT DEFAULT (datetime('now')), 49 + \\ created TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), 50 50 \\ flow_run_id TEXT REFERENCES flow_run(id) ON DELETE CASCADE, 51 51 \\ type TEXT NOT NULL, 52 52 \\ name TEXT NOT NULL, 53 - \\ timestamp TEXT DEFAULT (datetime('now')) 53 + \\ timestamp TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')) 54 54 \\) 55 55 , .{}); 56 56 ··· 58 58 try backend.db.exec( 59 59 \\CREATE TABLE IF NOT EXISTS task_run ( 60 60 \\ id TEXT PRIMARY KEY, 61 - \\ created TEXT DEFAULT (datetime('now')), 62 - \\ updated TEXT DEFAULT (datetime('now')), 61 + \\ created TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), 62 + \\ updated TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), 63 63 \\ flow_run_id TEXT REFERENCES flow_run(id), 64 64 \\ name TEXT NOT NULL, 65 65 \\ task_key TEXT NOT NULL, ··· 82 82 try backend.db.exec( 83 83 \\CREATE TABLE IF NOT EXISTS events ( 84 84 \\ id TEXT PRIMARY KEY, 85 - \\ created TEXT DEFAULT (datetime('now')), 86 - \\ updated 
TEXT DEFAULT (datetime('now')), 85 + \\ created TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), 86 + \\ updated TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), 87 87 \\ occurred TEXT NOT NULL, 88 88 \\ event TEXT NOT NULL, 89 89 \\ resource_id TEXT NOT NULL, ··· 101 101 try backend.db.exec( 102 102 \\CREATE TABLE IF NOT EXISTS block_type ( 103 103 \\ id TEXT PRIMARY KEY, 104 - \\ created TEXT DEFAULT (datetime('now')), 105 - \\ updated TEXT DEFAULT (datetime('now')), 104 + \\ created TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), 105 + \\ updated TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), 106 106 \\ name TEXT NOT NULL, 107 107 \\ slug TEXT NOT NULL UNIQUE, 108 108 \\ logo_url TEXT, ··· 117 117 try backend.db.exec( 118 118 \\CREATE TABLE IF NOT EXISTS block_schema ( 119 119 \\ id TEXT PRIMARY KEY, 120 - \\ created TEXT DEFAULT (datetime('now')), 121 - \\ updated TEXT DEFAULT (datetime('now')), 120 + \\ created TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), 121 + \\ updated TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), 122 122 \\ checksum TEXT NOT NULL, 123 123 \\ fields TEXT NOT NULL DEFAULT '{}', 124 124 \\ capabilities TEXT NOT NULL DEFAULT '[]', ··· 132 132 try backend.db.exec( 133 133 \\CREATE TABLE IF NOT EXISTS block_document ( 134 134 \\ id TEXT PRIMARY KEY, 135 - \\ created TEXT DEFAULT (datetime('now')), 136 - \\ updated TEXT DEFAULT (datetime('now')), 135 + \\ created TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), 136 + \\ updated TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), 137 137 \\ name TEXT, 138 138 \\ data TEXT NOT NULL DEFAULT '{}', 139 139 \\ is_anonymous INTEGER DEFAULT 0, ··· 148 148 try backend.db.exec( 149 149 \\CREATE TABLE IF NOT EXISTS variable ( 150 150 \\ id TEXT PRIMARY KEY, 151 - \\ created TEXT DEFAULT (datetime('now')), 152 - \\ updated TEXT DEFAULT (datetime('now')), 151 + \\ created TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), 152 + \\ updated TEXT DEFAULT 
(strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), 153 153 \\ name TEXT NOT NULL UNIQUE, 154 154 \\ value TEXT DEFAULT 'null', 155 155 \\ tags TEXT DEFAULT '[]'
+138 -1
src/utilities/time.zig
··· 1 1 const std = @import("std"); 2 2 3 + /// Microseconds since Unix epoch 4 + pub const Timestamp = i64; 5 + 3 6 /// Generate ISO 8601 timestamp with microseconds (e.g., 2025-01-17T12:34:56.123456Z) 4 7 pub fn timestamp(buf: *[32]u8) []const u8 { 5 - const ts_us = std.time.microTimestamp(); 8 + return formatMicros(buf, std.time.microTimestamp()); 9 + } 10 + 11 + /// Format microseconds since epoch as ISO 8601 string 12 + pub fn formatMicros(buf: *[32]u8, ts_us: i64) []const u8 { 6 13 const epoch_us: u64 = @intCast(ts_us); 7 14 const epoch_secs = epoch_us / 1_000_000; 8 15 const micros = epoch_us % 1_000_000; ··· 42 49 }) catch "2025-01-17T00:00:00.000000Z"; 43 50 } 44 51 52 + /// Get current time as microseconds since epoch 53 + pub fn nowMicros() Timestamp { 54 + return std.time.microTimestamp(); 55 + } 56 + 57 + /// Parse ISO 8601 timestamp to microseconds since epoch. 58 + /// Handles multiple formats: 59 + /// - 2025-01-17T12:34:56.123456Z (full ISO 8601) 60 + /// - 2025-01-17T12:34:56Z (no microseconds) 61 + /// - 2025-01-17 12:34:56.123456 (space separator, no Z) 62 + /// - 2025-01-17 12:34:56 (SQLite datetime format) 63 + /// - 2025-01-17T12:34:56.123456+00:00 (with timezone offset) 64 + pub fn parse(s: []const u8) ?Timestamp { 65 + if (s.len < 19) return null; 66 + 67 + // Parse date: YYYY-MM-DD 68 + const year = std.fmt.parseInt(u64, s[0..4], 10) catch return null; 69 + if (s[4] != '-') return null; 70 + const month = std.fmt.parseInt(u64, s[5..7], 10) catch return null; 71 + if (s[7] != '-') return null; 72 + const day = std.fmt.parseInt(u64, s[8..10], 10) catch return null; 73 + 74 + // Accept either 'T' or space as separator 75 + if (s[10] != 'T' and s[10] != ' ') return null; 76 + 77 + // Parse time: HH:MM:SS 78 + const hours = std.fmt.parseInt(u64, s[11..13], 10) catch return null; 79 + if (s[13] != ':') return null; 80 + const mins = std.fmt.parseInt(u64, s[14..16], 10) catch return null; 81 + if (s[16] != ':') return null; 82 + const secs = 
std.fmt.parseInt(u64, s[17..19], 10) catch return null; 83 + 84 + // Parse optional microseconds 85 + var micros: u64 = 0; 86 + var pos: usize = 19; 87 + if (pos < s.len and s[pos] == '.') { 88 + pos += 1; 89 + var frac_digits: usize = 0; 90 + var frac_value: u64 = 0; 91 + while (pos < s.len and s[pos] >= '0' and s[pos] <= '9') { 92 + frac_value = frac_value * 10 + (s[pos] - '0'); 93 + frac_digits += 1; 94 + pos += 1; 95 + } 96 + // Normalize to 6 digits (microseconds) 97 + while (frac_digits < 6) : (frac_digits += 1) { 98 + frac_value *= 10; 99 + } 100 + while (frac_digits > 6) : (frac_digits -= 1) { 101 + frac_value /= 10; 102 + } 103 + micros = frac_value; 104 + } 105 + 106 + // Ignore timezone suffix (Z, +00:00, etc.) - we treat all as UTC 107 + 108 + // Convert to microseconds since epoch 109 + const days_before_year = daysSinceEpoch(year, 1, 1); 110 + const days_before_month = daysInMonthsBefore(year, month); 111 + const total_days = days_before_year + days_before_month + (day - 1); 112 + const total_secs = total_days * 86400 + hours * 3600 + mins * 60 + secs; 113 + 114 + return @as(Timestamp, @intCast(total_secs * 1_000_000 + micros)); 115 + } 116 + 117 + /// Compare two timestamp strings. Returns true if a <= b. 118 + /// Handles mixed formats by parsing to microseconds. 119 + pub fn lessOrEqual(a: []const u8, b: []const u8) bool { 120 + const a_us = parse(a) orelse return true; // treat parse failure as "earliest" 121 + const b_us = parse(b) orelse return false; 122 + return a_us <= b_us; 123 + } 124 + 125 + /// Compare timestamp string with current time. Returns true if ts <= now. 
126 + pub fn isPast(ts: []const u8) bool { 127 + const ts_us = parse(ts) orelse return true; 128 + return ts_us <= nowMicros(); 129 + } 130 + 45 131 fn isLeapYear(year: u64) bool { 46 132 return (year % 4 == 0 and year % 100 != 0) or (year % 400 == 0); 47 133 } 134 + 135 + fn daysSinceEpoch(year: u64, month: u64, day: u64) u64 { 136 + _ = month; 137 + _ = day; 138 + var days: u64 = 0; 139 + var y: u64 = 1970; 140 + while (y < year) : (y += 1) { 141 + days += if (isLeapYear(y)) 366 else 365; 142 + } 143 + return days; 144 + } 145 + 146 + fn daysInMonthsBefore(year: u64, month: u64) u64 { 147 + const month_days = if (isLeapYear(year)) 148 + [_]u64{ 31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31 } 149 + else 150 + [_]u64{ 31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31 }; 151 + 152 + var days: u64 = 0; 153 + var m: usize = 0; 154 + while (m + 1 < month) : (m += 1) { 155 + days += month_days[m]; 156 + } 157 + return days; 158 + } 159 + 160 + // Tests 161 + test "parse ISO 8601 full format" { 162 + const ts = parse("2025-01-17T12:34:56.123456Z"); 163 + try std.testing.expect(ts != null); 164 + } 165 + 166 + test "parse SQLite datetime format" { 167 + const ts = parse("2025-01-17 12:34:56"); 168 + try std.testing.expect(ts != null); 169 + } 170 + 171 + test "parse with timezone offset" { 172 + const ts = parse("2025-01-17T12:34:56.123456+00:00"); 173 + try std.testing.expect(ts != null); 174 + } 175 + 176 + test "timestamp roundtrip" { 177 + var buf: [32]u8 = undefined; 178 + const now_us = nowMicros(); 179 + const formatted = formatMicros(&buf, now_us); 180 + const parsed = parse(formatted); 181 + try std.testing.expect(parsed != null); 182 + // Allow 1 microsecond difference for rounding 183 + try std.testing.expect(@abs(parsed.? - now_us) <= 1); 184 + }