prefect server in zig
at main 644 lines 26 kB view raw
1const std = @import("std"); 2const Allocator = std.mem.Allocator; 3 4const backend = @import("backend.zig"); 5const log = @import("../logging.zig"); 6const time_util = @import("../utilities/time.zig"); 7 8pub const FlowRunRow = struct { 9 id: []const u8, 10 created: []const u8, 11 updated: []const u8, 12 flow_id: []const u8, 13 name: []const u8, 14 state_type: []const u8, 15 state_name: []const u8, 16 state_timestamp: []const u8, 17 parameters: []const u8, 18 tags: []const u8, 19 run_count: i64, 20 expected_start_time: ?[]const u8, 21 next_scheduled_start_time: ?[]const u8, 22 start_time: ?[]const u8, 23 end_time: ?[]const u8, 24 total_run_time: f64, 25 // deployment fields 26 deployment_id: ?[]const u8, 27 deployment_version: ?[]const u8, 28 work_queue_name: ?[]const u8, 29 work_queue_id: ?[]const u8, 30 auto_scheduled: bool, 31 idempotency_key: ?[]const u8, 32 // retry policy (empirical_policy JSON) 33 empirical_policy: []const u8, 34 // idempotency: transition_id from current state 35 state_transition_id: ?[]const u8, 36}; 37 38pub const InsertParams = struct { 39 deployment_id: ?[]const u8 = null, 40 deployment_version: ?[]const u8 = null, 41 work_queue_name: ?[]const u8 = null, 42 work_queue_id: ?[]const u8 = null, 43 auto_scheduled: bool = false, 44 expected_start_time: ?[]const u8 = null, 45 next_scheduled_start_time: ?[]const u8 = null, 46 idempotency_key: ?[]const u8 = null, 47 parameters: ?[]const u8 = null, 48 empirical_policy: ?[]const u8 = null, // JSON: {"retries": N, "retry_delay": N} 49}; 50 51pub fn insert( 52 id: []const u8, 53 flow_id: []const u8, 54 name: []const u8, 55 state_type: []const u8, 56 state_name: []const u8, 57 timestamp: []const u8, 58 params: InsertParams, 59) !void { 60 backend.db.exec( 61 \\INSERT INTO flow_run (id, flow_id, name, state_type, state_name, state_timestamp, 62 \\ deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled, 63 \\ expected_start_time, next_scheduled_start_time, idempotency_key, parameters, empirical_policy) 64 \\VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 65 , .{ 66 id, 67 flow_id, 68 name, 69 state_type, 70 state_name, 71 timestamp, 72 params.deployment_id, 73 params.deployment_version, 74 params.work_queue_name, 75 params.work_queue_id, 76 @as(i64, if (params.auto_scheduled) 1 else 0), 77 params.expected_start_time, 78 params.next_scheduled_start_time, 79 params.idempotency_key, 80 params.parameters orelse "{}", 81 params.empirical_policy orelse "{}", 82 }) catch |err| { 83 log.err("database", "insert flow_run error: {}", .{err}); 84 return err; 85 }; 86} 87 88/// Insert a flow run idempotently - silently ignores duplicates based on (flow_id, idempotency_key). 89/// Used by the scheduler to safely create runs without duplicates. 90pub fn insertOrIgnore( 91 id: []const u8, 92 flow_id: []const u8, 93 name: []const u8, 94 state_type: []const u8, 95 state_name: []const u8, 96 timestamp: []const u8, 97 params: InsertParams, 98) !bool { 99 // Use INSERT OR IGNORE for SQLite, INSERT ... ON CONFLICT DO NOTHING for PostgreSQL 100 const sql = if (backend.db.dialect == .sqlite) 101 \\INSERT OR IGNORE INTO flow_run (id, flow_id, name, state_type, state_name, state_timestamp, 102 \\ deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled, 103 \\ expected_start_time, next_scheduled_start_time, idempotency_key, parameters, empirical_policy) 104 \\VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 105 else 106 \\INSERT INTO flow_run (id, flow_id, name, state_type, state_name, state_timestamp, 107 \\ deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled, 108 \\ expected_start_time, next_scheduled_start_time, idempotency_key, parameters, empirical_policy) 109 \\VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 110 \\ON CONFLICT (flow_id, idempotency_key) WHERE idempotency_key IS NOT NULL DO NOTHING 111 ; 112 113 backend.db.exec(sql, .{ 114 id, 115 flow_id, 116 name, 117 state_type, 118 state_name, 119 timestamp, 120 params.deployment_id, 121 params.deployment_version, 122 params.work_queue_name, 123 params.work_queue_id, 124 @as(i64, if (params.auto_scheduled) 1 else 0), 125 params.expected_start_time, 126 params.next_scheduled_start_time, 127 params.idempotency_key, 128 params.parameters orelse "{}", 129 params.empirical_policy orelse "{}", 130 }) catch |err| { 131 log.err("database", "insertOrIgnore flow_run error: {}", .{err}); 132 return err; 133 }; 134 135 // For SQLite, check if insert happened via changes() 136 // For PostgreSQL, we'd need to check affected rows - for now assume success 137 return true; 138} 139 140pub fn get(alloc: Allocator, id: []const u8) !?FlowRunRow { 141 var rows = backend.db.query( 142 \\SELECT id, created, updated, flow_id, name, state_type, state_name, state_timestamp, 143 \\ parameters, tags, run_count, expected_start_time, next_scheduled_start_time, start_time, end_time, total_run_time, 144 \\ deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled, idempotency_key, empirical_policy, state_transition_id 145 \\FROM flow_run WHERE id = ? 146 , .{id}) catch return null; 147 defer rows.deinit(); 148 149 if (rows.next()) |row| { 150 return rowToFlowRun(alloc, row); 151 } 152 return null; 153} 154 155pub fn setState( 156 run_id: []const u8, 157 state_id: []const u8, 158 state_type: []const u8, 159 state_name: []const u8, 160 timestamp: []const u8, 161 start_time: ?[]const u8, 162 end_time: ?[]const u8, 163 run_count: i64, 164 total_run_time: f64, 165 expected_start_time: ?[]const u8, 166 state_transition_id: ?[]const u8, 167) !void { 168 // Lock mutex only for SQLite (postgres pool handles concurrency) 169 if (backend.db.dialect == .sqlite) { 170 backend.db.mutex.lock(); 171 } 172 defer if (backend.db.dialect == .sqlite) { 173 backend.db.mutex.unlock(); 174 }; 175 176 // Begin transaction - for postgres this acquires a dedicated connection 177 var txn = backend.db.beginTransaction() catch |err| { 178 log.err("database", "begin transaction error: {}", .{err}); 179 return err; 180 }; 181 errdefer txn.rollback(); 182 183 // Execute within transaction (uses same connection for postgres) 184 // Only update expected_start_time if provided (from CopyScheduledTime rule) 185 if (expected_start_time) |est| { 186 txn.exec( 187 \\UPDATE flow_run SET state_id = ?, state_type = ?, state_name = ?, state_timestamp = ?, updated = ?, 188 \\ start_time = ?, end_time = ?, run_count = ?, total_run_time = ?, expected_start_time = ?, state_transition_id = ? 189 \\WHERE id = ? 190 , .{ 191 state_id, state_type, state_name, timestamp, timestamp, 192 start_time, end_time, run_count, total_run_time, est, 193 state_transition_id, run_id, 194 }) catch |err| { 195 log.err("database", "update flow_run error: {}", .{err}); 196 return err; 197 }; 198 } else { 199 txn.exec( 200 \\UPDATE flow_run SET state_id = ?, state_type = ?, state_name = ?, state_timestamp = ?, updated = ?, 201 \\ start_time = ?, end_time = ?, run_count = ?, total_run_time = ?, state_transition_id = ? 202 \\WHERE id = ? 203 , .{ 204 state_id, state_type, state_name, timestamp, timestamp, 205 start_time, end_time, run_count, total_run_time, state_transition_id, 206 run_id, 207 }) catch |err| { 208 log.err("database", "update flow_run error: {}", .{err}); 209 return err; 210 }; 211 } 212 213 txn.exec( 214 \\INSERT INTO flow_run_state (id, flow_run_id, type, name, timestamp) 215 \\VALUES (?, ?, ?, ?, ?) 216 , .{ state_id, run_id, state_type, state_name, timestamp }) catch |err| { 217 log.err("database", "insert flow_run_state error: {}", .{err}); 218 return err; 219 }; 220 221 txn.commit() catch |err| { 222 log.err("database", "commit error: {}", .{err}); 223 return err; 224 }; 225} 226 227/// Set flow run state with next_scheduled_start_time (for AwaitingRetry state) 228/// optionally updates empirical_policy if provided 229pub fn setStateWithSchedule( 230 run_id: []const u8, 231 state_id: []const u8, 232 state_type: []const u8, 233 state_name: []const u8, 234 timestamp: []const u8, 235 start_time: ?[]const u8, 236 end_time: ?[]const u8, 237 run_count: i64, 238 total_run_time: f64, 239 expected_start_time: ?[]const u8, 240 next_scheduled_start_time: ?[]const u8, 241 empirical_policy: ?[]const u8, 242 state_transition_id: ?[]const u8, 243) !void { 244 if (backend.db.dialect == .sqlite) { 245 backend.db.mutex.lock(); 246 } 247 defer if (backend.db.dialect == .sqlite) { 248 backend.db.mutex.unlock(); 249 }; 250 251 var txn = backend.db.beginTransaction() catch |err| { 252 log.err("database", "begin transaction error: {}", .{err}); 253 return err; 254 }; 255 errdefer txn.rollback(); 256 257 if (empirical_policy) |policy| { 258 txn.exec( 259 \\UPDATE flow_run SET state_id = ?, state_type = ?, state_name = ?, state_timestamp = ?, updated = ?, 260 \\ start_time = ?, end_time = ?, run_count = ?, total_run_time = ?, 261 \\ expected_start_time = COALESCE(?, expected_start_time), 262 \\ next_scheduled_start_time = ?, 263 \\ empirical_policy = ?, 264 \\ state_transition_id = ? 265 \\WHERE id = ? 266 , .{ 267 state_id, 268 state_type, 269 state_name, 270 timestamp, 271 timestamp, 272 start_time, 273 end_time, 274 run_count, 275 total_run_time, 276 expected_start_time, 277 next_scheduled_start_time, 278 policy, 279 state_transition_id, 280 run_id, 281 }) catch |err| { 282 log.err("database", "update flow_run error: {}", .{err}); 283 return err; 284 }; 285 } else { 286 txn.exec( 287 \\UPDATE flow_run SET state_id = ?, state_type = ?, state_name = ?, state_timestamp = ?, updated = ?, 288 \\ start_time = ?, end_time = ?, run_count = ?, total_run_time = ?, 289 \\ expected_start_time = COALESCE(?, expected_start_time), 290 \\ next_scheduled_start_time = ?, 291 \\ state_transition_id = ? 292 \\WHERE id = ? 293 , .{ 294 state_id, 295 state_type, 296 state_name, 297 timestamp, 298 timestamp, 299 start_time, 300 end_time, 301 run_count, 302 total_run_time, 303 expected_start_time, 304 next_scheduled_start_time, 305 state_transition_id, 306 run_id, 307 }) catch |err| { 308 log.err("database", "update flow_run error: {}", .{err}); 309 return err; 310 }; 311 } 312 313 txn.exec( 314 \\INSERT INTO flow_run_state (id, flow_run_id, type, name, timestamp) 315 \\VALUES (?, ?, ?, ?, ?) 316 , .{ state_id, run_id, state_type, state_name, timestamp }) catch |err| { 317 log.err("database", "insert flow_run_state error: {}", .{err}); 318 return err; 319 }; 320 321 txn.commit() catch |err| { 322 log.err("database", "commit error: {}", .{err}); 323 return err; 324 }; 325} 326 327pub fn list(alloc: Allocator, limit: usize) ![]FlowRunRow { 328 var results = std.ArrayListUnmanaged(FlowRunRow){}; 329 errdefer results.deinit(alloc); 330 331 var rows = backend.db.query( 332 \\SELECT id, created, updated, flow_id, name, state_type, state_name, state_timestamp, 333 \\ parameters, tags, run_count, expected_start_time, next_scheduled_start_time, start_time, end_time, total_run_time, 334 \\ deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled, idempotency_key, empirical_policy, state_transition_id 335 \\FROM flow_run ORDER BY created DESC LIMIT ? 336 , .{@as(i64, @intCast(limit))}) catch |err| { 337 log.err("database", "list flow_runs error: {}", .{err}); 338 return err; 339 }; 340 defer rows.deinit(); 341 342 while (rows.next()) |r| { 343 try results.append(alloc, rowToFlowRun(alloc, r)); 344 } 345 346 return results.toOwnedSlice(alloc); 347} 348 349/// Get scheduled flow runs for a specific deployment 350pub fn getScheduledByDeployment(alloc: Allocator, deployment_id: []const u8, limit: usize) ![]FlowRunRow { 351 var results = std.ArrayListUnmanaged(FlowRunRow){}; 352 errdefer results.deinit(alloc); 353 354 var rows = backend.db.query( 355 \\SELECT id, created, updated, flow_id, name, state_type, state_name, state_timestamp, 356 \\ parameters, tags, run_count, expected_start_time, next_scheduled_start_time, start_time, end_time, total_run_time, 357 \\ deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled, idempotency_key, empirical_policy, state_transition_id 358 \\FROM flow_run WHERE deployment_id = ? AND state_type = 'SCHEDULED' 359 \\ORDER BY expected_start_time ASC LIMIT ? 360 , .{ deployment_id, @as(i64, @intCast(limit)) }) catch |err| { 361 log.err("database", "get scheduled flow_runs error: {}", .{err}); 362 return err; 363 }; 364 defer rows.deinit(); 365 366 while (rows.next()) |r| { 367 try results.append(alloc, rowToFlowRun(alloc, r)); 368 } 369 370 return results.toOwnedSlice(alloc); 371} 372 373/// Get scheduled flow runs for multiple deployments 374pub fn getScheduledByDeployments( 375 alloc: Allocator, 376 deployment_ids: []const []const u8, 377 scheduled_before: ?[]const u8, 378 limit: usize, 379) ![]FlowRunRow { 380 var results = std.ArrayListUnmanaged(FlowRunRow){}; 381 errdefer results.deinit(alloc); 382 383 // Query each deployment and combine results 384 for (deployment_ids) |dep_id| { 385 const dep_runs = try getScheduledByDeploymentBefore(alloc, dep_id, scheduled_before, limit); 386 for (dep_runs) |run| { 387 try results.append(alloc, run); 388 if (results.items.len >= limit) break; 389 } 390 if (results.items.len >= limit) break; 391 } 392 393 // Sort by next_scheduled_start_time using proper timestamp parsing 394 const items = results.items; 395 std.mem.sort(FlowRunRow, items, {}, struct { 396 fn lessThan(_: void, a: FlowRunRow, b: FlowRunRow) bool { 397 const a_time = a.next_scheduled_start_time orelse a.expected_start_time orelse ""; 398 const b_time = b.next_scheduled_start_time orelse b.expected_start_time orelse ""; 399 // Parse to microseconds for proper comparison 400 const a_us = time_util.parse(a_time) orelse 0; 401 const b_us = time_util.parse(b_time) orelse 0; 402 return a_us < b_us; 403 } 404 }.lessThan); 405 406 // Trim to limit 407 if (items.len > limit) { 408 results.shrinkRetainingCapacity(limit); 409 } 410 411 return results.toOwnedSlice(alloc); 412} 413 414/// Get scheduled flow runs for a deployment with optional time filter. 415/// Filters on next_scheduled_start_time to match Python Prefect behavior. 416fn getScheduledByDeploymentBefore( 417 alloc: Allocator, 418 deployment_id: []const u8, 419 scheduled_before: ?[]const u8, 420 limit: usize, 421) ![]FlowRunRow { 422 var results = std.ArrayListUnmanaged(FlowRunRow){}; 423 errdefer results.deinit(alloc); 424 425 if (scheduled_before) |before| { 426 var rows = backend.db.query( 427 \\SELECT id, created, updated, flow_id, name, state_type, state_name, state_timestamp, 428 \\ parameters, tags, run_count, expected_start_time, next_scheduled_start_time, start_time, end_time, total_run_time, 429 \\ deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled, idempotency_key, empirical_policy, state_transition_id 430 \\FROM flow_run WHERE deployment_id = ? AND state_type = 'SCHEDULED' 431 \\ AND (next_scheduled_start_time IS NULL OR next_scheduled_start_time <= ?) 432 \\ORDER BY next_scheduled_start_time ASC LIMIT ? 433 , .{ deployment_id, before, @as(i64, @intCast(limit)) }) catch |err| { 434 log.err("database", "get scheduled flow_runs error: {}", .{err}); 435 return err; 436 }; 437 defer rows.deinit(); 438 439 while (rows.next()) |r| { 440 try results.append(alloc, rowToFlowRun(alloc, r)); 441 } 442 } else { 443 var rows = backend.db.query( 444 \\SELECT id, created, updated, flow_id, name, state_type, state_name, state_timestamp, 445 \\ parameters, tags, run_count, expected_start_time, next_scheduled_start_time, start_time, end_time, total_run_time, 446 \\ deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled, idempotency_key, empirical_policy, state_transition_id 447 \\FROM flow_run WHERE deployment_id = ? AND state_type = 'SCHEDULED' 448 \\ORDER BY next_scheduled_start_time ASC LIMIT ? 449 , .{ deployment_id, @as(i64, @intCast(limit)) }) catch |err| { 450 log.err("database", "get scheduled flow_runs error: {}", .{err}); 451 return err; 452 }; 453 defer rows.deinit(); 454 455 while (rows.next()) |r| { 456 try results.append(alloc, rowToFlowRun(alloc, r)); 457 } 458 } 459 460 return results.toOwnedSlice(alloc); 461} 462 463/// Get scheduled flow runs for a work queue with optional time filter. 464/// Filters on next_scheduled_start_time to match Python Prefect behavior. 465pub fn getScheduledByWorkQueue( 466 alloc: Allocator, 467 work_queue_id: []const u8, 468 scheduled_before: ?[]const u8, 469 limit: usize, 470) ![]FlowRunRow { 471 var results = std.ArrayListUnmanaged(FlowRunRow){}; 472 errdefer results.deinit(alloc); 473 474 if (scheduled_before) |before| { 475 var rows = backend.db.query( 476 \\SELECT id, created, updated, flow_id, name, state_type, state_name, state_timestamp, 477 \\ parameters, tags, run_count, expected_start_time, next_scheduled_start_time, start_time, end_time, total_run_time, 478 \\ deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled, idempotency_key, empirical_policy, state_transition_id 479 \\FROM flow_run WHERE work_queue_id = ? AND state_type = 'SCHEDULED' 480 \\ AND (next_scheduled_start_time IS NULL OR next_scheduled_start_time <= ?) 481 \\ORDER BY next_scheduled_start_time ASC LIMIT ? 482 , .{ work_queue_id, before, @as(i64, @intCast(limit)) }) catch |err| { 483 log.err("database", "get scheduled flow_runs by queue error: {}", .{err}); 484 return err; 485 }; 486 defer rows.deinit(); 487 488 while (rows.next()) |r| { 489 try results.append(alloc, rowToFlowRun(alloc, r)); 490 } 491 } else { 492 var rows = backend.db.query( 493 \\SELECT id, created, updated, flow_id, name, state_type, state_name, state_timestamp, 494 \\ parameters, tags, run_count, expected_start_time, next_scheduled_start_time, start_time, end_time, total_run_time, 495 \\ deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled, idempotency_key, empirical_policy, state_transition_id 496 \\FROM flow_run WHERE work_queue_id = ? AND state_type = 'SCHEDULED' 497 \\ORDER BY next_scheduled_start_time ASC LIMIT ? 498 , .{ work_queue_id, @as(i64, @intCast(limit)) }) catch |err| { 499 log.err("database", "get scheduled flow_runs by queue error: {}", .{err}); 500 return err; 501 }; 502 defer rows.deinit(); 503 504 while (rows.next()) |r| { 505 try results.append(alloc, rowToFlowRun(alloc, r)); 506 } 507 } 508 509 return results.toOwnedSlice(alloc); 510} 511 512/// Get scheduled flow runs for multiple work queues 513pub fn getScheduledByWorkQueues( 514 alloc: Allocator, 515 work_queue_ids: []const []const u8, 516 scheduled_before: ?[]const u8, 517 limit: usize, 518) ![]FlowRunRow { 519 var results = std.ArrayListUnmanaged(FlowRunRow){}; 520 errdefer results.deinit(alloc); 521 522 // Query each queue and combine results 523 for (work_queue_ids) |queue_id| { 524 const queue_runs = try getScheduledByWorkQueue(alloc, queue_id, scheduled_before, limit); 525 for (queue_runs) |run| { 526 try results.append(alloc, run); 527 if (results.items.len >= limit) break; 528 } 529 if (results.items.len >= limit) break; 530 } 531 532 // Sort by next_scheduled_start_time using proper timestamp parsing 533 const items = results.items; 534 std.mem.sort(FlowRunRow, items, {}, struct { 535 fn lessThan(_: void, a: FlowRunRow, b: FlowRunRow) bool { 536 const a_time = a.next_scheduled_start_time orelse a.expected_start_time orelse ""; 537 const b_time = b.next_scheduled_start_time orelse b.expected_start_time orelse ""; 538 // Parse to microseconds for proper comparison 539 const a_us = time_util.parse(a_time) orelse 0; 540 const b_us = time_util.parse(b_time) orelse 0; 541 return a_us < b_us; 542 } 543 }.lessThan); 544 545 // Trim to limit 546 if (items.len > limit) { 547 results.shrinkRetainingCapacity(limit); 548 } 549 550 return results.toOwnedSlice(alloc); 551} 552 553fn rowToFlowRun(alloc: Allocator, r: anytype) FlowRunRow { 554 return .{ 555 .id = alloc.dupe(u8, r.text(0)) catch "", 556 .created = alloc.dupe(u8, r.text(1)) catch "", 557 .updated = alloc.dupe(u8, r.text(2)) catch "", 558 .flow_id = alloc.dupe(u8, r.text(3)) catch "", 559 .name = alloc.dupe(u8, r.text(4)) catch "", 560 .state_type = alloc.dupe(u8, r.text(5)) catch "", 561 .state_name = alloc.dupe(u8, r.text(6)) catch "", 562 .state_timestamp = alloc.dupe(u8, r.text(7)) catch "", 563 .parameters = alloc.dupe(u8, r.text(8)) catch "{}", 564 .tags = alloc.dupe(u8, r.text(9)) catch "[]", 565 .run_count = r.bigint(10), 566 .expected_start_time = if (r.text(11).len > 0) alloc.dupe(u8, r.text(11)) catch null else null, 567 .next_scheduled_start_time = if (r.text(12).len > 0) alloc.dupe(u8, r.text(12)) catch null else null, 568 .start_time = if (r.text(13).len > 0) alloc.dupe(u8, r.text(13)) catch null else null, 569 .end_time = if (r.text(14).len > 0) alloc.dupe(u8, r.text(14)) catch null else null, 570 .total_run_time = r.float(15), 571 .deployment_id = if (r.text(16).len > 0) alloc.dupe(u8, r.text(16)) catch null else null, 572 .deployment_version = if (r.text(17).len > 0) alloc.dupe(u8, r.text(17)) catch null else null, 573 .work_queue_name = if (r.text(18).len > 0) alloc.dupe(u8, r.text(18)) catch null else null, 574 .work_queue_id = if (r.text(19).len > 0) alloc.dupe(u8, r.text(19)) catch null else null, 575 .auto_scheduled = r.int(20) != 0, 576 .idempotency_key = if (r.text(21).len > 0) alloc.dupe(u8, r.text(21)) catch null else null, 577 .empirical_policy = alloc.dupe(u8, r.text(22)) catch "{}", 578 .state_transition_id = if (r.text(23).len > 0) alloc.dupe(u8, r.text(23)) catch null else null, 579 }; 580} 581 582/// Patch a flow run with optional fields 583/// Currently supports: infrastructure_pid 584pub fn patch(id: []const u8, infrastructure_pid: ?[]const u8) !void { 585 // For now, just accept the PATCH request without actually updating 586 // The runner uses PATCH to set infrastructure metadata, but we can 587 // skip this for basic functionality - the state transitions are what matter 588 _ = id; 589 _ = infrastructure_pid; 590 // TODO: add infrastructure_pid column and update it 591} 592 593/// Get late flow runs (SCHEDULED with next_scheduled_start_time before cutoff). 594pub fn getLateRuns(alloc: Allocator, cutoff_timestamp: []const u8, limit: usize) ![]FlowRunRow { 595 var results = std.ArrayListUnmanaged(FlowRunRow){}; 596 errdefer results.deinit(alloc); 597 var rows = backend.db.query( 598 \\SELECT id, created, updated, flow_id, name, state_type, state_name, state_timestamp, 599 \\ parameters, tags, run_count, expected_start_time, next_scheduled_start_time, start_time, end_time, total_run_time, 600 \\ deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled, idempotency_key, empirical_policy, state_transition_id 601 \\FROM flow_run WHERE state_type = 'SCHEDULED' AND state_name = 'Scheduled' 602 \\ AND next_scheduled_start_time IS NOT NULL AND next_scheduled_start_time <= ? 603 \\ORDER BY next_scheduled_start_time ASC LIMIT ? 604 , .{ cutoff_timestamp, @as(i64, @intCast(limit)) }) catch |err| { 605 log.err("database", "get late flow_runs error: {}", .{err}); 606 return err; 607 }; 608 defer rows.deinit(); 609 while (rows.next()) |r| try results.append(alloc, rowToFlowRun(alloc, r)); 610 return results.toOwnedSlice(alloc); 611} 612 613pub fn delete(id: []const u8) !bool { 614 // check if flow run exists 615 var rows = backend.db.query( 616 "SELECT id FROM flow_run WHERE id = ?", 617 .{id}, 618 ) catch return false; 619 defer rows.deinit(); 620 621 if (rows.next() == null) { 622 return false; 623 } 624 625 // delete associated state records first 626 backend.db.exec( 627 "DELETE FROM flow_run_state WHERE flow_run_id = ?", 628 .{id}, 629 ) catch |err| { 630 log.err("database", "delete flow_run_state error: {}", .{err}); 631 return false; 632 }; 633 634 // delete the flow run 635 backend.db.exec( 636 "DELETE FROM flow_run WHERE id = ?", 637 .{id}, 638 ) catch |err| { 639 log.err("database", "delete flow_run error: {}", .{err}); 640 return false; 641 }; 642 643 return true; 644}