prefect server in zig
at main 165 lines 5.5 kB view raw
const std = @import("std");
const Allocator = std.mem.Allocator;

const backend = @import("backend.zig");
const log = @import("../logging.zig");

/// One row of the `task_run` table, holding the columns selected by the
/// queries in this file.
///
/// Rows returned by `get`, `getByKey`, and `list` own their string fields:
/// each one is a copy made with the allocator passed to that call.
pub const TaskRunRow = struct {
    id: []const u8,
    created: []const u8,
    updated: []const u8,
    flow_run_id: []const u8,
    name: []const u8,
    task_key: []const u8,
    dynamic_key: []const u8,
    state_type: []const u8,
    state_name: []const u8,
    state_timestamp: []const u8,
    tags: []const u8,
    run_count: i64,

    /// Frees every string field of a row produced by this module.
    /// `alloc` must be the allocator that was passed to the call that
    /// returned the row.
    pub fn deinit(self: TaskRunRow, alloc: Allocator) void {
        alloc.free(self.id);
        alloc.free(self.created);
        alloc.free(self.updated);
        alloc.free(self.flow_run_id);
        alloc.free(self.name);
        alloc.free(self.task_key);
        alloc.free(self.dynamic_key);
        alloc.free(self.state_type);
        alloc.free(self.state_name);
        alloc.free(self.state_timestamp);
        alloc.free(self.tags);
    }
};

/// Inserts a new `task_run` row with the given identity and initial state.
///
/// `flow_run_id` is optional and is bound to the statement as given.
/// Any database error is logged and then returned to the caller.
pub fn insert(
    id: []const u8,
    flow_run_id: ?[]const u8,
    name: []const u8,
    task_key: []const u8,
    dynamic_key: []const u8,
    state_type: []const u8,
    state_name: []const u8,
    timestamp: []const u8,
) !void {
    backend.db.exec(
        \\INSERT INTO task_run (id, flow_run_id, name, task_key, dynamic_key, state_type, state_name, state_timestamp)
        \\VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    , .{
        id, flow_run_id, name, task_key, dynamic_key, state_type, state_name, timestamp,
    }) catch |err| {
        log.err("database", "insert task_run error: {}", .{err});
        return err;
    };
}

/// Looks up a task run by primary key.
///
/// Returns `null` when no row matches. A failed query is logged and also
/// reported as `null`, so callers cannot distinguish it from "not found".
/// Returns `error.OutOfMemory` if copying the row fails.
/// The caller owns the returned row (see `TaskRunRow.deinit`).
pub fn get(alloc: Allocator, id: []const u8) !?TaskRunRow {
    var rows = backend.db.query(
        \\SELECT id, created, updated, flow_run_id, name, task_key, dynamic_key,
        \\       state_type, state_name, state_timestamp, tags, run_count
        \\FROM task_run WHERE id = ?
    , .{id}) catch |err| {
        // Keep the historical "null on query failure" contract, but leave a
        // trace instead of failing silently.
        log.err("database", "get task_run error: {}", .{err});
        return null;
    };
    defer rows.deinit();

    if (rows.next()) |row| {
        return try rowToTaskRun(alloc, row);
    }
    return null;
}

/// Looks up a task run by its (flow_run_id, task_key, dynamic_key) triple.
///
/// A null `flow_run_id` matches rows whose `flow_run_id` column IS NULL;
/// this needs a separate statement because `= ?` never matches NULL in SQL.
///
/// Returns `null` when no row matches. A failed query is logged and also
/// reported as `null`. Returns `error.OutOfMemory` if copying the row fails.
/// The caller owns the returned row (see `TaskRunRow.deinit`).
pub fn getByKey(
    alloc: Allocator,
    flow_run_id: ?[]const u8,
    task_key: []const u8,
    dynamic_key: []const u8,
) !?TaskRunRow {
    if (flow_run_id) |frid| {
        var rows = backend.db.query(
            \\SELECT id, created, updated, flow_run_id, name, task_key, dynamic_key,
            \\       state_type, state_name, state_timestamp, tags, run_count
            \\FROM task_run WHERE flow_run_id = ? AND task_key = ? AND dynamic_key = ?
        , .{ frid, task_key, dynamic_key }) catch |err| {
            log.err("database", "get task_run by key error: {}", .{err});
            return null;
        };
        defer rows.deinit();

        if (rows.next()) |row| {
            return try rowToTaskRun(alloc, row);
        }
    } else {
        var rows = backend.db.query(
            \\SELECT id, created, updated, flow_run_id, name, task_key, dynamic_key,
            \\       state_type, state_name, state_timestamp, tags, run_count
            \\FROM task_run WHERE flow_run_id IS NULL AND task_key = ? AND dynamic_key = ?
        , .{ task_key, dynamic_key }) catch |err| {
            log.err("database", "get task_run by key error: {}", .{err});
            return null;
        };
        defer rows.deinit();

        if (rows.next()) |row| {
            return try rowToTaskRun(alloc, row);
        }
    }
    return null;
}

/// Moves a task run to a new state and records that state in the
/// `task_run_state` history table, both inside one transaction.
///
/// `timestamp` is written to both `state_timestamp` and `updated`.
/// On any error the transaction is rolled back, the error is logged, and it
/// is returned to the caller.
pub fn setState(
    run_id: []const u8,
    state_id: []const u8,
    state_type: []const u8,
    state_name: []const u8,
    timestamp: []const u8,
) !void {
    // Lock mutex only for SQLite (postgres pool handles concurrency)
    if (backend.db.dialect == .sqlite) backend.db.mutex.lock();
    defer if (backend.db.dialect == .sqlite) backend.db.mutex.unlock();

    var txn = backend.db.beginTransaction() catch |err| {
        log.err("database", "begin transaction error: {}", .{err});
        return err;
    };
    // Registered after the unlock `defer`, so on failure the rollback runs
    // before the mutex is released.
    errdefer txn.rollback();

    txn.exec(
        \\UPDATE task_run SET state_id = ?, state_type = ?, state_name = ?, state_timestamp = ?, updated = ?
        \\WHERE id = ?
    , .{ state_id, state_type, state_name, timestamp, timestamp, run_id }) catch |err| {
        log.err("database", "update task_run error: {}", .{err});
        return err;
    };

    // Insert state history record
    txn.exec(
        \\INSERT INTO task_run_state (id, task_run_id, type, name, timestamp)
        \\VALUES (?, ?, ?, ?, ?)
    , .{ state_id, run_id, state_type, state_name, timestamp }) catch |err| {
        log.err("database", "insert task_run_state error: {}", .{err});
        return err;
    };

    txn.commit() catch |err| {
        log.err("database", "commit error: {}", .{err});
        return err;
    };
}

/// Returns up to `limit` task runs, newest first (ordered by `created`).
///
/// The caller owns the returned slice and every row in it: call
/// `TaskRunRow.deinit` on each row, then free the slice with `alloc`.
/// Query errors are logged and returned; `error.OutOfMemory` is returned if
/// copying a row or growing the result list fails, in which case everything
/// allocated so far is released.
pub fn list(alloc: Allocator, limit: usize) ![]TaskRunRow {
    var results = std.ArrayListUnmanaged(TaskRunRow){};
    errdefer {
        // The rows own their strings, so freeing the list buffer alone
        // would leak them.
        for (results.items) |item| item.deinit(alloc);
        results.deinit(alloc);
    }

    // Saturate instead of `@intCast`, which is illegal behavior when the
    // value does not fit in i64.
    const sql_limit: i64 = std.math.cast(i64, limit) orelse std.math.maxInt(i64);

    var rows = backend.db.query(
        \\SELECT id, created, updated, flow_run_id, name, task_key, dynamic_key,
        \\       state_type, state_name, state_timestamp, tags, run_count
        \\FROM task_run ORDER BY created DESC LIMIT ?
    , .{sql_limit}) catch |err| {
        log.err("database", "list task_runs error: {}", .{err});
        return err;
    };
    defer rows.deinit();

    while (rows.next()) |row| {
        const task_run = try rowToTaskRun(alloc, row);
        // Not yet owned by `results`; release it if the append fails.
        errdefer task_run.deinit(alloc);
        try results.append(alloc, task_run);
    }

    return try results.toOwnedSlice(alloc);
}

/// Copies the twelve selected columns of `row` into an owned `TaskRunRow`.
///
/// Column order must match the SELECT lists used in this file. Every text
/// column is duplicated with `alloc`; if any copy fails, the copies already
/// made are freed and `error.OutOfMemory` is returned, so the caller never
/// sees a partially filled row.
fn rowToTaskRun(alloc: Allocator, row: anytype) Allocator.Error!TaskRunRow {
    const id = try alloc.dupe(u8, row.text(0));
    errdefer alloc.free(id);
    const created = try alloc.dupe(u8, row.text(1));
    errdefer alloc.free(created);
    const updated = try alloc.dupe(u8, row.text(2));
    errdefer alloc.free(updated);
    const flow_run_id = try alloc.dupe(u8, row.text(3));
    errdefer alloc.free(flow_run_id);
    const name = try alloc.dupe(u8, row.text(4));
    errdefer alloc.free(name);
    const task_key = try alloc.dupe(u8, row.text(5));
    errdefer alloc.free(task_key);
    const dynamic_key = try alloc.dupe(u8, row.text(6));
    errdefer alloc.free(dynamic_key);
    const state_type = try alloc.dupe(u8, row.text(7));
    errdefer alloc.free(state_type);
    const state_name = try alloc.dupe(u8, row.text(8));
    errdefer alloc.free(state_name);
    const state_timestamp = try alloc.dupe(u8, row.text(9));
    errdefer alloc.free(state_timestamp);
    // Last allocation: nothing after it can fail, so no errdefer is needed.
    const tags = try alloc.dupe(u8, row.text(10));

    return .{
        .id = id,
        .created = created,
        .updated = updated,
        .flow_run_id = flow_run_id,
        .name = name,
        .task_key = task_key,
        .dynamic_key = dynamic_key,
        .state_type = state_type,
        .state_name = state_name,
        .state_timestamp = state_timestamp,
        .tags = tags,
        .run_count = row.bigint(11),
    };
}