//! Prefect server in Zig — persistence layer for `flow_run` rows.
1const std = @import("std");
2const Allocator = std.mem.Allocator;
3
4const backend = @import("backend.zig");
5const log = @import("../logging.zig");
6const time_util = @import("../utilities/time.zig");
7
/// One row of the `flow_run` table as read back from the database.
/// All string fields are heap copies owned by the caller (see `rowToFlowRun`);
/// optional fields are null when the column was NULL or empty.
pub const FlowRunRow = struct {
    // Identity and bookkeeping timestamps (stored as text).
    id: []const u8,
    created: []const u8,
    updated: []const u8,
    flow_id: []const u8,
    name: []const u8,
    // Denormalized snapshot of the current state.
    state_type: []const u8,
    state_name: []const u8,
    state_timestamp: []const u8,
    // JSON blobs: `parameters` is an object ("{}" default), `tags` an array ("[]" default).
    parameters: []const u8,
    tags: []const u8,
    run_count: i64,
    // Scheduling timestamps as text; null when unset.
    expected_start_time: ?[]const u8,
    next_scheduled_start_time: ?[]const u8,
    start_time: ?[]const u8,
    end_time: ?[]const u8,
    // Accumulated run time — presumably seconds; TODO confirm unit against writers.
    total_run_time: f64,
    // deployment fields
    deployment_id: ?[]const u8,
    deployment_version: ?[]const u8,
    work_queue_name: ?[]const u8,
    work_queue_id: ?[]const u8,
    auto_scheduled: bool,
    idempotency_key: ?[]const u8,
    // retry policy (empirical_policy JSON)
    empirical_policy: []const u8,
    // idempotency: transition_id from current state
    state_transition_id: ?[]const u8,
};
37
/// Optional columns for `insert` / `insertOrIgnore`; every field defaults to absent.
pub const InsertParams = struct {
    deployment_id: ?[]const u8 = null,
    deployment_version: ?[]const u8 = null,
    work_queue_name: ?[]const u8 = null,
    work_queue_id: ?[]const u8 = null,
    // Persisted as integer 0/1 in the auto_scheduled column.
    auto_scheduled: bool = false,
    expected_start_time: ?[]const u8 = null,
    next_scheduled_start_time: ?[]const u8 = null,
    idempotency_key: ?[]const u8 = null,
    // JSON object; inserted as "{}" when null.
    parameters: ?[]const u8 = null,
    empirical_policy: ?[]const u8 = null, // JSON: {"retries": N, "retry_delay": N}
};
50
/// Insert a brand-new flow_run row together with its initial state snapshot.
/// `timestamp` is stored as state_timestamp; optional metadata comes from
/// `params`. `parameters` and `empirical_policy` fall back to "{}" when null.
/// Errors are logged and then propagated to the caller.
pub fn insert(
    id: []const u8,
    flow_id: []const u8,
    name: []const u8,
    state_type: []const u8,
    state_name: []const u8,
    timestamp: []const u8,
    params: InsertParams,
) !void {
    // Both dialects store the boolean as an integer column; normalize once.
    const auto_scheduled_flag: i64 = if (params.auto_scheduled) 1 else 0;
    const parameters_json = params.parameters orelse "{}";
    const policy_json = params.empirical_policy orelse "{}";

    backend.db.exec(
        \\INSERT INTO flow_run (id, flow_id, name, state_type, state_name, state_timestamp,
        \\  deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled,
        \\  expected_start_time, next_scheduled_start_time, idempotency_key, parameters, empirical_policy)
        \\VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    , .{
        id,
        flow_id,
        name,
        state_type,
        state_name,
        timestamp,
        params.deployment_id,
        params.deployment_version,
        params.work_queue_name,
        params.work_queue_id,
        auto_scheduled_flag,
        params.expected_start_time,
        params.next_scheduled_start_time,
        params.idempotency_key,
        parameters_json,
        policy_json,
    }) catch |err| {
        log.err("database", "insert flow_run error: {}", .{err});
        return err;
    };
}
87
/// Insert a flow run idempotently - silently ignores duplicates based on (flow_id, idempotency_key).
/// Used by the scheduler to safely create runs without duplicates.
/// NOTE(review): currently always returns true, even when the row was skipped
/// as a duplicate — detecting the skip (SQLite changes() / Postgres affected
/// rows) is still a TODO, so callers cannot tell whether a run was created.
pub fn insertOrIgnore(
    id: []const u8,
    flow_id: []const u8,
    name: []const u8,
    state_type: []const u8,
    state_name: []const u8,
    timestamp: []const u8,
    params: InsertParams,
) !bool {
    // Use INSERT OR IGNORE for SQLite, INSERT ... ON CONFLICT DO NOTHING for PostgreSQL
    // NOTE(review): the Postgres ON CONFLICT target requires a matching partial
    // unique index on (flow_id, idempotency_key) WHERE idempotency_key IS NOT
    // NULL — confirm that index exists in the schema/migrations.
    const sql = if (backend.db.dialect == .sqlite)
        \\INSERT OR IGNORE INTO flow_run (id, flow_id, name, state_type, state_name, state_timestamp,
        \\  deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled,
        \\  expected_start_time, next_scheduled_start_time, idempotency_key, parameters, empirical_policy)
        \\VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    else
        \\INSERT INTO flow_run (id, flow_id, name, state_type, state_name, state_timestamp,
        \\  deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled,
        \\  expected_start_time, next_scheduled_start_time, idempotency_key, parameters, empirical_policy)
        \\VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        \\ON CONFLICT (flow_id, idempotency_key) WHERE idempotency_key IS NOT NULL DO NOTHING
    ;

    backend.db.exec(sql, .{
        id,
        flow_id,
        name,
        state_type,
        state_name,
        timestamp,
        params.deployment_id,
        params.deployment_version,
        params.work_queue_name,
        params.work_queue_id,
        // Boolean stored as integer 0/1.
        @as(i64, if (params.auto_scheduled) 1 else 0),
        params.expected_start_time,
        params.next_scheduled_start_time,
        params.idempotency_key,
        params.parameters orelse "{}",
        params.empirical_policy orelse "{}",
    }) catch |err| {
        log.err("database", "insertOrIgnore flow_run error: {}", .{err});
        return err;
    };

    // For SQLite, check if insert happened via changes()
    // For PostgreSQL, we'd need to check affected rows - for now assume success
    return true;
}
139
/// Fetch a single flow run by id.
/// Returns null when no row matches. Database errors are logged and
/// propagated (the signature is already `!?FlowRunRow`), instead of being
/// silently collapsed into "not found" — consistent with `list` and the
/// other query helpers in this file.
/// Caller owns the strings inside the returned row (duplicated via `alloc`).
pub fn get(alloc: Allocator, id: []const u8) !?FlowRunRow {
    var rows = backend.db.query(
        \\SELECT id, created, updated, flow_id, name, state_type, state_name, state_timestamp,
        \\  parameters, tags, run_count, expected_start_time, next_scheduled_start_time, start_time, end_time, total_run_time,
        \\  deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled, idempotency_key, empirical_policy, state_transition_id
        \\FROM flow_run WHERE id = ?
    , .{id}) catch |err| {
        log.err("database", "get flow_run error: {}", .{err});
        return err;
    };
    defer rows.deinit();

    if (rows.next()) |row| {
        return rowToFlowRun(alloc, row);
    }
    return null;
}
154
/// Transition a flow run to a new state and append a row to the
/// flow_run_state history table, both inside a single transaction.
/// `timestamp` is written to both state_timestamp and updated.
/// `expected_start_time` is only written when non-null (it comes from the
/// CopyScheduledTime rule); when null, the stored column is left untouched.
/// Any failure after beginTransaction rolls the whole transition back.
pub fn setState(
    run_id: []const u8,
    state_id: []const u8,
    state_type: []const u8,
    state_name: []const u8,
    timestamp: []const u8,
    start_time: ?[]const u8,
    end_time: ?[]const u8,
    run_count: i64,
    total_run_time: f64,
    expected_start_time: ?[]const u8,
    state_transition_id: ?[]const u8,
) !void {
    // Lock mutex only for SQLite (postgres pool handles concurrency)
    if (backend.db.dialect == .sqlite) {
        backend.db.mutex.lock();
    }
    defer if (backend.db.dialect == .sqlite) {
        backend.db.mutex.unlock();
    };

    // Begin transaction - for postgres this acquires a dedicated connection
    var txn = backend.db.beginTransaction() catch |err| {
        log.err("database", "begin transaction error: {}", .{err});
        return err;
    };
    // Undo everything on any error path below; commit() at the end wins otherwise.
    errdefer txn.rollback();

    // Execute within transaction (uses same connection for postgres)
    // Only update expected_start_time if provided (from CopyScheduledTime rule)
    if (expected_start_time) |est| {
        txn.exec(
            \\UPDATE flow_run SET state_id = ?, state_type = ?, state_name = ?, state_timestamp = ?, updated = ?,
            \\  start_time = ?, end_time = ?, run_count = ?, total_run_time = ?, expected_start_time = ?, state_transition_id = ?
            \\WHERE id = ?
        , .{
            state_id, state_type, state_name, timestamp, timestamp,
            start_time, end_time, run_count, total_run_time, est,
            state_transition_id, run_id,
        }) catch |err| {
            log.err("database", "update flow_run error: {}", .{err});
            return err;
        };
    } else {
        // Same update, minus the expected_start_time column.
        txn.exec(
            \\UPDATE flow_run SET state_id = ?, state_type = ?, state_name = ?, state_timestamp = ?, updated = ?,
            \\  start_time = ?, end_time = ?, run_count = ?, total_run_time = ?, state_transition_id = ?
            \\WHERE id = ?
        , .{
            state_id, state_type, state_name, timestamp, timestamp,
            start_time, end_time, run_count, total_run_time, state_transition_id,
            run_id,
        }) catch |err| {
            log.err("database", "update flow_run error: {}", .{err});
            return err;
        };
    }

    // Append the new state to the history table within the same transaction.
    txn.exec(
        \\INSERT INTO flow_run_state (id, flow_run_id, type, name, timestamp)
        \\VALUES (?, ?, ?, ?, ?)
    , .{ state_id, run_id, state_type, state_name, timestamp }) catch |err| {
        log.err("database", "insert flow_run_state error: {}", .{err});
        return err;
    };

    txn.commit() catch |err| {
        log.err("database", "commit error: {}", .{err});
        return err;
    };
}
226
/// Set flow run state with next_scheduled_start_time (for AwaitingRetry state);
/// optionally updates empirical_policy if provided.
/// Like `setState`, but additionally writes next_scheduled_start_time and,
/// when `empirical_policy` is non-null, the empirical_policy column.
/// expected_start_time only replaces the stored value when non-null (COALESCE
/// keeps the existing column otherwise). The flow_run update and the
/// flow_run_state history insert run in one transaction.
pub fn setStateWithSchedule(
    run_id: []const u8,
    state_id: []const u8,
    state_type: []const u8,
    state_name: []const u8,
    timestamp: []const u8,
    start_time: ?[]const u8,
    end_time: ?[]const u8,
    run_count: i64,
    total_run_time: f64,
    expected_start_time: ?[]const u8,
    next_scheduled_start_time: ?[]const u8,
    empirical_policy: ?[]const u8,
    state_transition_id: ?[]const u8,
) !void {
    // SQLite has no connection pool; serialize writers with a mutex.
    if (backend.db.dialect == .sqlite) {
        backend.db.mutex.lock();
    }
    defer if (backend.db.dialect == .sqlite) {
        backend.db.mutex.unlock();
    };

    var txn = backend.db.beginTransaction() catch |err| {
        log.err("database", "begin transaction error: {}", .{err});
        return err;
    };
    // Roll everything back on any error path below.
    errdefer txn.rollback();

    if (empirical_policy) |policy| {
        // Variant that also rewrites the empirical_policy JSON column.
        txn.exec(
            \\UPDATE flow_run SET state_id = ?, state_type = ?, state_name = ?, state_timestamp = ?, updated = ?,
            \\  start_time = ?, end_time = ?, run_count = ?, total_run_time = ?,
            \\  expected_start_time = COALESCE(?, expected_start_time),
            \\  next_scheduled_start_time = ?,
            \\  empirical_policy = ?,
            \\  state_transition_id = ?
            \\WHERE id = ?
        , .{
            state_id,
            state_type,
            state_name,
            timestamp,
            timestamp,
            start_time,
            end_time,
            run_count,
            total_run_time,
            expected_start_time,
            next_scheduled_start_time,
            policy,
            state_transition_id,
            run_id,
        }) catch |err| {
            log.err("database", "update flow_run error: {}", .{err});
            return err;
        };
    } else {
        // Same update, leaving empirical_policy untouched.
        txn.exec(
            \\UPDATE flow_run SET state_id = ?, state_type = ?, state_name = ?, state_timestamp = ?, updated = ?,
            \\  start_time = ?, end_time = ?, run_count = ?, total_run_time = ?,
            \\  expected_start_time = COALESCE(?, expected_start_time),
            \\  next_scheduled_start_time = ?,
            \\  state_transition_id = ?
            \\WHERE id = ?
        , .{
            state_id,
            state_type,
            state_name,
            timestamp,
            timestamp,
            start_time,
            end_time,
            run_count,
            total_run_time,
            expected_start_time,
            next_scheduled_start_time,
            state_transition_id,
            run_id,
        }) catch |err| {
            log.err("database", "update flow_run error: {}", .{err});
            return err;
        };
    }

    // Append the new state to the history table within the same transaction.
    txn.exec(
        \\INSERT INTO flow_run_state (id, flow_run_id, type, name, timestamp)
        \\VALUES (?, ?, ?, ?, ?)
    , .{ state_id, run_id, state_type, state_name, timestamp }) catch |err| {
        log.err("database", "insert flow_run_state error: {}", .{err});
        return err;
    };

    txn.commit() catch |err| {
        log.err("database", "commit error: {}", .{err});
        return err;
    };
}
326
/// List flow runs, newest first (ORDER BY created DESC), capped at `limit`.
/// Caller owns the returned slice and every string inside each row
/// (all duplicated via `alloc` by `rowToFlowRun`).
pub fn list(alloc: Allocator, limit: usize) ![]FlowRunRow {
    var runs = std.ArrayListUnmanaged(FlowRunRow){};
    errdefer runs.deinit(alloc);

    var cursor = backend.db.query(
        \\SELECT id, created, updated, flow_id, name, state_type, state_name, state_timestamp,
        \\  parameters, tags, run_count, expected_start_time, next_scheduled_start_time, start_time, end_time, total_run_time,
        \\  deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled, idempotency_key, empirical_policy, state_transition_id
        \\FROM flow_run ORDER BY created DESC LIMIT ?
    , .{@as(i64, @intCast(limit))}) catch |err| {
        log.err("database", "list flow_runs error: {}", .{err});
        return err;
    };
    defer cursor.deinit();

    while (cursor.next()) |row| {
        try runs.append(alloc, rowToFlowRun(alloc, row));
    }

    return runs.toOwnedSlice(alloc);
}
348
/// Fetch SCHEDULED flow runs belonging to one deployment, ordered by
/// expected_start_time ascending and capped at `limit`.
/// Caller owns the returned slice and the strings inside each row.
pub fn getScheduledByDeployment(alloc: Allocator, deployment_id: []const u8, limit: usize) ![]FlowRunRow {
    var scheduled = std.ArrayListUnmanaged(FlowRunRow){};
    errdefer scheduled.deinit(alloc);

    var cursor = backend.db.query(
        \\SELECT id, created, updated, flow_id, name, state_type, state_name, state_timestamp,
        \\  parameters, tags, run_count, expected_start_time, next_scheduled_start_time, start_time, end_time, total_run_time,
        \\  deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled, idempotency_key, empirical_policy, state_transition_id
        \\FROM flow_run WHERE deployment_id = ? AND state_type = 'SCHEDULED'
        \\ORDER BY expected_start_time ASC LIMIT ?
    , .{ deployment_id, @as(i64, @intCast(limit)) }) catch |err| {
        log.err("database", "get scheduled flow_runs error: {}", .{err});
        return err;
    };
    defer cursor.deinit();

    while (cursor.next()) |row| {
        try scheduled.append(alloc, rowToFlowRun(alloc, row));
    }

    return scheduled.toOwnedSlice(alloc);
}
372
/// Get scheduled flow runs for multiple deployments.
/// Issues one query per deployment (no SQL IN clause), concatenates the
/// results, sorts the combined list by schedule time, then trims to `limit`.
/// NOTE(review): because collection stops as soon as `limit` rows are
/// gathered, deployments early in `deployment_ids` can crowd out later ones
/// even when the later ones hold earlier-scheduled runs — confirm this
/// matches the intended fairness semantics.
pub fn getScheduledByDeployments(
    alloc: Allocator,
    deployment_ids: []const []const u8,
    scheduled_before: ?[]const u8,
    limit: usize,
) ![]FlowRunRow {
    var results = std.ArrayListUnmanaged(FlowRunRow){};
    errdefer results.deinit(alloc);

    // Query each deployment and combine results
    for (deployment_ids) |dep_id| {
        const dep_runs = try getScheduledByDeploymentBefore(alloc, dep_id, scheduled_before, limit);
        for (dep_runs) |run| {
            try results.append(alloc, run);
            if (results.items.len >= limit) break;
        }
        if (results.items.len >= limit) break;
    }

    // Sort by next_scheduled_start_time using proper timestamp parsing
    const items = results.items;
    std.mem.sort(FlowRunRow, items, {}, struct {
        fn lessThan(_: void, a: FlowRunRow, b: FlowRunRow) bool {
            const a_time = a.next_scheduled_start_time orelse a.expected_start_time orelse "";
            const b_time = b.next_scheduled_start_time orelse b.expected_start_time orelse "";
            // Parse to microseconds for proper comparison
            // NOTE(review): unparseable/missing timestamps fall back to 0 and
            // therefore sort first — verify that is the desired ordering.
            const a_us = time_util.parse(a_time) orelse 0;
            const b_us = time_util.parse(b_time) orelse 0;
            return a_us < b_us;
        }
    }.lessThan);

    // Trim to limit
    // NOTE(review): the rows dropped here keep their duplicated strings
    // allocated — they are only reclaimed if `alloc` is an arena. Confirm the
    // callers' allocator strategy.
    if (items.len > limit) {
        results.shrinkRetainingCapacity(limit);
    }

    return results.toOwnedSlice(alloc);
}
413
/// Get scheduled flow runs for a deployment with optional time filter.
/// Filters on next_scheduled_start_time to match Python Prefect behavior.
/// When `scheduled_before` is set, rows whose next_scheduled_start_time is
/// NULL are still included (the predicate is `IS NULL OR <= ?`).
/// Caller owns the returned slice and the strings inside each row.
fn getScheduledByDeploymentBefore(
    alloc: Allocator,
    deployment_id: []const u8,
    scheduled_before: ?[]const u8,
    limit: usize,
) ![]FlowRunRow {
    var results = std.ArrayListUnmanaged(FlowRunRow){};
    errdefer results.deinit(alloc);

    // Two branches because the bind-parameter tuples have different arity.
    if (scheduled_before) |before| {
        var rows = backend.db.query(
            \\SELECT id, created, updated, flow_id, name, state_type, state_name, state_timestamp,
            \\  parameters, tags, run_count, expected_start_time, next_scheduled_start_time, start_time, end_time, total_run_time,
            \\  deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled, idempotency_key, empirical_policy, state_transition_id
            \\FROM flow_run WHERE deployment_id = ? AND state_type = 'SCHEDULED'
            \\  AND (next_scheduled_start_time IS NULL OR next_scheduled_start_time <= ?)
            \\ORDER BY next_scheduled_start_time ASC LIMIT ?
        , .{ deployment_id, before, @as(i64, @intCast(limit)) }) catch |err| {
            log.err("database", "get scheduled flow_runs error: {}", .{err});
            return err;
        };
        defer rows.deinit();

        while (rows.next()) |r| {
            try results.append(alloc, rowToFlowRun(alloc, r));
        }
    } else {
        // No time filter: return the earliest scheduled runs up to `limit`.
        var rows = backend.db.query(
            \\SELECT id, created, updated, flow_id, name, state_type, state_name, state_timestamp,
            \\  parameters, tags, run_count, expected_start_time, next_scheduled_start_time, start_time, end_time, total_run_time,
            \\  deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled, idempotency_key, empirical_policy, state_transition_id
            \\FROM flow_run WHERE deployment_id = ? AND state_type = 'SCHEDULED'
            \\ORDER BY next_scheduled_start_time ASC LIMIT ?
        , .{ deployment_id, @as(i64, @intCast(limit)) }) catch |err| {
            log.err("database", "get scheduled flow_runs error: {}", .{err});
            return err;
        };
        defer rows.deinit();

        while (rows.next()) |r| {
            try results.append(alloc, rowToFlowRun(alloc, r));
        }
    }

    return results.toOwnedSlice(alloc);
}
462
/// Get scheduled flow runs for a work queue with optional time filter.
/// Filters on next_scheduled_start_time to match Python Prefect behavior.
/// When `scheduled_before` is set, rows whose next_scheduled_start_time is
/// NULL are still included (the predicate is `IS NULL OR <= ?`).
/// Caller owns the returned slice and the strings inside each row.
pub fn getScheduledByWorkQueue(
    alloc: Allocator,
    work_queue_id: []const u8,
    scheduled_before: ?[]const u8,
    limit: usize,
) ![]FlowRunRow {
    var results = std.ArrayListUnmanaged(FlowRunRow){};
    errdefer results.deinit(alloc);

    // Two branches because the bind-parameter tuples have different arity.
    if (scheduled_before) |before| {
        var rows = backend.db.query(
            \\SELECT id, created, updated, flow_id, name, state_type, state_name, state_timestamp,
            \\  parameters, tags, run_count, expected_start_time, next_scheduled_start_time, start_time, end_time, total_run_time,
            \\  deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled, idempotency_key, empirical_policy, state_transition_id
            \\FROM flow_run WHERE work_queue_id = ? AND state_type = 'SCHEDULED'
            \\  AND (next_scheduled_start_time IS NULL OR next_scheduled_start_time <= ?)
            \\ORDER BY next_scheduled_start_time ASC LIMIT ?
        , .{ work_queue_id, before, @as(i64, @intCast(limit)) }) catch |err| {
            log.err("database", "get scheduled flow_runs by queue error: {}", .{err});
            return err;
        };
        defer rows.deinit();

        while (rows.next()) |r| {
            try results.append(alloc, rowToFlowRun(alloc, r));
        }
    } else {
        // No time filter: return the earliest scheduled runs up to `limit`.
        var rows = backend.db.query(
            \\SELECT id, created, updated, flow_id, name, state_type, state_name, state_timestamp,
            \\  parameters, tags, run_count, expected_start_time, next_scheduled_start_time, start_time, end_time, total_run_time,
            \\  deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled, idempotency_key, empirical_policy, state_transition_id
            \\FROM flow_run WHERE work_queue_id = ? AND state_type = 'SCHEDULED'
            \\ORDER BY next_scheduled_start_time ASC LIMIT ?
        , .{ work_queue_id, @as(i64, @intCast(limit)) }) catch |err| {
            log.err("database", "get scheduled flow_runs by queue error: {}", .{err});
            return err;
        };
        defer rows.deinit();

        while (rows.next()) |r| {
            try results.append(alloc, rowToFlowRun(alloc, r));
        }
    }

    return results.toOwnedSlice(alloc);
}
511
/// Get scheduled flow runs for multiple work queues.
/// Issues one query per queue, concatenates the results, sorts the combined
/// list by schedule time, then trims to `limit`.
/// NOTE(review): because collection stops as soon as `limit` rows are
/// gathered, queues early in `work_queue_ids` can crowd out later ones even
/// when the later ones hold earlier-scheduled runs — confirm this matches
/// the intended fairness semantics.
pub fn getScheduledByWorkQueues(
    alloc: Allocator,
    work_queue_ids: []const []const u8,
    scheduled_before: ?[]const u8,
    limit: usize,
) ![]FlowRunRow {
    var results = std.ArrayListUnmanaged(FlowRunRow){};
    errdefer results.deinit(alloc);

    // Query each queue and combine results
    for (work_queue_ids) |queue_id| {
        const queue_runs = try getScheduledByWorkQueue(alloc, queue_id, scheduled_before, limit);
        for (queue_runs) |run| {
            try results.append(alloc, run);
            if (results.items.len >= limit) break;
        }
        if (results.items.len >= limit) break;
    }

    // Sort by next_scheduled_start_time using proper timestamp parsing
    const items = results.items;
    std.mem.sort(FlowRunRow, items, {}, struct {
        fn lessThan(_: void, a: FlowRunRow, b: FlowRunRow) bool {
            const a_time = a.next_scheduled_start_time orelse a.expected_start_time orelse "";
            const b_time = b.next_scheduled_start_time orelse b.expected_start_time orelse "";
            // Parse to microseconds for proper comparison
            // NOTE(review): unparseable/missing timestamps fall back to 0 and
            // therefore sort first — verify that is the desired ordering.
            const a_us = time_util.parse(a_time) orelse 0;
            const b_us = time_util.parse(b_time) orelse 0;
            return a_us < b_us;
        }
    }.lessThan);

    // Trim to limit
    // NOTE(review): the rows dropped here keep their duplicated strings
    // allocated — they are only reclaimed if `alloc` is an arena. Confirm the
    // callers' allocator strategy.
    if (items.len > limit) {
        results.shrinkRetainingCapacity(limit);
    }

    return results.toOwnedSlice(alloc);
}
552
/// Convert a database row (positional columns matching the SELECT lists used
/// throughout this file) into a FlowRunRow whose strings are owned copies
/// allocated from `alloc`. The caller owns all returned memory.
/// NOTE(review): allocation failures are swallowed here — required strings
/// fall back to literals ("" / "{}" / "[]") and optional ones to null, so an
/// OOM degrades data silently instead of propagating an error.
/// NOTE(review): optional columns use `text(i).len > 0` as the NULL test, so
/// a genuinely empty string in the database is indistinguishable from NULL.
fn rowToFlowRun(alloc: Allocator, r: anytype) FlowRunRow {
    return .{
        .id = alloc.dupe(u8, r.text(0)) catch "",
        .created = alloc.dupe(u8, r.text(1)) catch "",
        .updated = alloc.dupe(u8, r.text(2)) catch "",
        .flow_id = alloc.dupe(u8, r.text(3)) catch "",
        .name = alloc.dupe(u8, r.text(4)) catch "",
        .state_type = alloc.dupe(u8, r.text(5)) catch "",
        .state_name = alloc.dupe(u8, r.text(6)) catch "",
        .state_timestamp = alloc.dupe(u8, r.text(7)) catch "",
        // JSON defaults mirror the insert path: object for parameters, array for tags.
        .parameters = alloc.dupe(u8, r.text(8)) catch "{}",
        .tags = alloc.dupe(u8, r.text(9)) catch "[]",
        .run_count = r.bigint(10),
        .expected_start_time = if (r.text(11).len > 0) alloc.dupe(u8, r.text(11)) catch null else null,
        .next_scheduled_start_time = if (r.text(12).len > 0) alloc.dupe(u8, r.text(12)) catch null else null,
        .start_time = if (r.text(13).len > 0) alloc.dupe(u8, r.text(13)) catch null else null,
        .end_time = if (r.text(14).len > 0) alloc.dupe(u8, r.text(14)) catch null else null,
        .total_run_time = r.float(15),
        .deployment_id = if (r.text(16).len > 0) alloc.dupe(u8, r.text(16)) catch null else null,
        .deployment_version = if (r.text(17).len > 0) alloc.dupe(u8, r.text(17)) catch null else null,
        .work_queue_name = if (r.text(18).len > 0) alloc.dupe(u8, r.text(18)) catch null else null,
        .work_queue_id = if (r.text(19).len > 0) alloc.dupe(u8, r.text(19)) catch null else null,
        // Stored as integer 0/1; any nonzero value reads as true.
        .auto_scheduled = r.int(20) != 0,
        .idempotency_key = if (r.text(21).len > 0) alloc.dupe(u8, r.text(21)) catch null else null,
        .empirical_policy = alloc.dupe(u8, r.text(22)) catch "{}",
        .state_transition_id = if (r.text(23).len > 0) alloc.dupe(u8, r.text(23)) catch null else null,
    };
}
581
/// Patch a flow run with optional fields.
/// Currently supports: infrastructure_pid — but as a deliberate no-op: the
/// runner PATCHes infrastructure metadata, yet nothing is persisted here;
/// state transitions carry the data that matters for basic functionality.
pub fn patch(id: []const u8, infrastructure_pid: ?[]const u8) !void {
    // Discard both arguments until an infrastructure_pid column exists.
    _ = .{ id, infrastructure_pid };
    // TODO: add infrastructure_pid column and update it
}
592
/// Get late flow runs: rows still in SCHEDULED/'Scheduled' whose
/// next_scheduled_start_time is set and at or before `cutoff_timestamp`,
/// ordered soonest-first and capped at `limit`.
/// Caller owns the returned slice and the strings inside each row.
pub fn getLateRuns(alloc: Allocator, cutoff_timestamp: []const u8, limit: usize) ![]FlowRunRow {
    var late = std.ArrayListUnmanaged(FlowRunRow){};
    errdefer late.deinit(alloc);

    var cursor = backend.db.query(
        \\SELECT id, created, updated, flow_id, name, state_type, state_name, state_timestamp,
        \\  parameters, tags, run_count, expected_start_time, next_scheduled_start_time, start_time, end_time, total_run_time,
        \\  deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled, idempotency_key, empirical_policy, state_transition_id
        \\FROM flow_run WHERE state_type = 'SCHEDULED' AND state_name = 'Scheduled'
        \\  AND next_scheduled_start_time IS NOT NULL AND next_scheduled_start_time <= ?
        \\ORDER BY next_scheduled_start_time ASC LIMIT ?
    , .{ cutoff_timestamp, @as(i64, @intCast(limit)) }) catch |err| {
        log.err("database", "get late flow_runs error: {}", .{err});
        return err;
    };
    defer cursor.deinit();

    while (cursor.next()) |row| {
        try late.append(alloc, rowToFlowRun(alloc, row));
    }

    return late.toOwnedSlice(alloc);
}
612
/// Delete a flow run and its state history.
/// Returns true when the run existed and both deletes succeeded; false when
/// the run was missing or any database operation failed. This is deliberately
/// best-effort: errors are logged but never propagated (the `!bool` return
/// never actually carries an error today).
/// NOTE(review): the existence check and the deletes are not wrapped in a
/// transaction, so a concurrent writer can interleave — confirm acceptable.
pub fn delete(id: []const u8) !bool {
    // check if flow run exists
    var rows = backend.db.query(
        "SELECT id FROM flow_run WHERE id = ?",
        .{id},
    ) catch |err| {
        // Previously this failure was swallowed with no trace; log it so a
        // broken DB is distinguishable from "run not found" in the logs,
        // matching every other failure path in this file.
        log.err("database", "delete flow_run lookup error: {}", .{err});
        return false;
    };
    defer rows.deinit();

    if (rows.next() == null) {
        return false;
    }

    // delete associated state records first (child rows reference flow_run_id)
    backend.db.exec(
        "DELETE FROM flow_run_state WHERE flow_run_id = ?",
        .{id},
    ) catch |err| {
        log.err("database", "delete flow_run_state error: {}", .{err});
        return false;
    };

    // delete the flow run
    backend.db.exec(
        "DELETE FROM flow_run WHERE id = ?",
        .{id},
    ) catch |err| {
        log.err("database", "delete flow_run error: {}", .{err});
        return false;
    };

    return true;
}