// prefect server in zig
// Snapshot of branch `main` (250 lines, 9.7 kB in the raw view).
//! Unified lease storage interface
//!
//! Backend selection:
//! - Memory: default (single-instance deployments)
//! - Redis: when PREFECT_BROKER_BACKEND=redis (HA deployments)
//!
//! This matches Python Prefect's pattern where leases are NEVER stored in the database.
//! Leases are ephemeral by default (memory) or use Redis for HA.

const std = @import("std");
const Allocator = std.mem.Allocator;

const log = @import("../logging.zig");
const redis_storage = @import("redis.zig");
const memory_storage = @import("memory.zig");

/// Backend-independent view of a lease.
///
/// Every value of this type produced in this file is a field-by-field copy of
/// a backend lease record; slices are copied by reference, not duplicated.
pub const Lease = struct {
    id: []const u8,
    resource_ids: []const []const u8,
    slots: i64,
    // NOTE(review): the `_us` suffix suggests microsecond timestamps — confirm
    // against the backend implementations.
    expiration_us: i64,
    created_at_us: i64,
    holder_type: ?[]const u8,
    holder_id: ?[]const u8,
};

/// Build a unified `Lease` from a backend-specific lease record.
///
/// `backend_lease` may be any value (or pointer to a value) exposing the seven
/// fields read below. Slices are shared with the source record, so the result
/// is only valid for as long as the backend keeps that memory alive.
fn fromBackend(backend_lease: anytype) Lease {
    return .{
        .id = backend_lease.id,
        .resource_ids = backend_lease.resource_ids,
        .slots = backend_lease.slots,
        .expiration_us = backend_lease.expiration_us,
        .created_at_us = backend_lease.created_at_us,
        .holder_type = backend_lease.holder_type,
        .holder_id = backend_lease.holder_id,
    };
}

/// Tagged union that forwards every lease operation to the active backend and
/// converts backend lease records into the unified `Lease` type.
pub const LeaseStorage = union(enum) {
    redis: *redis_storage.RedisLeaseStorage,
    memory: *memory_storage.MemoryLeaseStorage,

    /// Create a lease in the active backend and return it as a `Lease`.
    ///
    /// `alloc` is not used by either backend call; it is kept so the
    /// signature stays stable for callers.
    pub fn createLease(
        self: LeaseStorage,
        alloc: Allocator,
        resource_ids: []const []const u8,
        ttl_seconds: f64,
        slots: i64,
        holder_type: ?[]const u8,
        holder_id: ?[]const u8,
    ) !Lease {
        _ = alloc;
        return switch (self) {
            .redis => |r| fromBackend(try r.createLease(resource_ids, ttl_seconds, slots, holder_type, holder_id)),
            .memory => |m| fromBackend(try m.createLease(resource_ids, ttl_seconds, slots, holder_type, holder_id)),
        };
    }

    /// Look up a lease by id; returns `null` when the backend has no such lease.
    ///
    /// `alloc` is unused and kept only for interface stability.
    pub fn getById(self: LeaseStorage, alloc: Allocator, lease_id: []const u8) !?Lease {
        _ = alloc;
        switch (self) {
            .redis => |r| {
                // The Redis lookup is fallible, so its error is propagated.
                return if (try r.getById(lease_id)) |found| fromBackend(found) else null;
            },
            .memory => |m| {
                // The in-memory lookup returns a plain optional (no error union).
                return if (m.getById(lease_id)) |found| fromBackend(found) else null;
            },
        }
    }

    /// Forward a renewal request to the active backend and return its result.
    pub fn renewLease(self: LeaseStorage, lease_id: []const u8, ttl_seconds: f64) !bool {
        switch (self) {
            .redis => |r| return try r.renewLease(lease_id, ttl_seconds),
            .memory => |m| return try m.renewLease(lease_id, ttl_seconds),
        }
    }

    /// Forward a revocation request to the active backend and return its result.
    pub fn revokeLease(self: LeaseStorage, lease_id: []const u8) !bool {
        switch (self) {
            .redis => |r| return try r.revokeLease(lease_id),
            .memory => |m| return try m.revokeLease(lease_id),
        }
    }

    /// Ask the active backend for the ids of expired leases, passing `limit`
    /// through unchanged.
    ///
    /// Only the memory backend receives `alloc`; the Redis backend is called
    /// without an allocator.
    pub fn getExpiredLeaseIds(self: LeaseStorage, alloc: Allocator, limit: usize) ![][]const u8 {
        switch (self) {
            .redis => |r| return try r.getExpiredLeaseIds(limit),
            .memory => |m| return try m.getExpiredLeaseIds(alloc, limit),
        }
    }

    /// Fetch active leases from the backend and return them as a slice of
    /// `Lease` allocated with `alloc` (caller owns the returned slice).
    ///
    /// NOTE(review): the slice handed back by the backend is not freed here.
    /// Confirm that callers pass an arena allocator or that the backend keeps
    /// ownership of that slice.
    pub fn getActiveLeases(self: LeaseStorage, alloc: Allocator, limit: usize, offset: usize) ![]Lease {
        switch (self) {
            .redis => |r| {
                const redis_leases = try r.getActiveLeases(limit, offset);
                const results = try alloc.alloc(Lease, redis_leases.len);
                for (redis_leases, results) |source, *dest| dest.* = fromBackend(source);
                return results;
            },
            .memory => |m| {
                const mem_leases = try m.getActiveLeases(alloc, limit, offset);
                const results = try alloc.alloc(Lease, mem_leases.len);
                for (mem_leases, results) |source, *dest| dest.* = fromBackend(source);
                return results;
            },
        }
    }

    /// Ask the backend to delete an expired lease; returns the lease the
    /// backend hands back, or `null` when it returns none.
    ///
    /// Only the memory backend receives `alloc`.
    pub fn deleteExpiredLease(self: LeaseStorage, alloc: Allocator, lease_id: []const u8) !?Lease {
        switch (self) {
            .redis => |r| {
                return if (try r.deleteExpiredLease(lease_id)) |removed| fromBackend(removed) else null;
            },
            .memory => |m| {
                return if (try m.deleteExpiredLease(alloc, lease_id)) |removed| fromBackend(removed) else null;
            },
        }
    }

    /// Compute a lease's occupancy by delegating to the memory backend's
    /// `calculateOccupancy`.
    ///
    /// The lease is passed on with an empty `resource_ids` list; every other
    /// field is forwarded unchanged.
    pub fn calculateOccupancy(lease: Lease) f64 {
        return memory_storage.MemoryLeaseStorage.calculateOccupancy(.{
            .id = lease.id,
            .resource_ids = &.{},
            .slots = lease.slots,
            .expiration_us = lease.expiration_us,
            .created_at_us = lease.created_at_us,
            .holder_type = lease.holder_type,
            .holder_id = lease.holder_id,
        });
    }
};

// Global lease storage instance
var storage: ?LeaseStorage = null;
// Backing storage for whichever backend `storage` points at.
var redis_instance: ?redis_storage.RedisLeaseStorage = null;
var memory_instance: ?memory_storage.MemoryLeaseStorage = null;

/// Initialize lease storage based on environment
///
/// Does nothing when storage is already initialized. Reads
/// `PREFECT_BROKER_BACKEND` (default "memory"); the value "redis" selects the
/// Redis backend, configured from the `PREFECT_REDIS_MESSAGING_*` variables.
/// Any other value selects the in-memory backend.
pub fn init() !void {
    if (storage != null) return;

    const alloc = std.heap.page_allocator;
    const broker_backend = std.posix.getenv("PREFECT_BROKER_BACKEND") orelse "memory";

    if (std.mem.eql(u8, broker_backend, "redis")) {
        // Use Redis for lease storage (HA mode)
        const host = std.posix.getenv("PREFECT_REDIS_MESSAGING_HOST") orelse "localhost";
        const port_str = std.posix.getenv("PREFECT_REDIS_MESSAGING_PORT") orelse "6379";
        // An unparsable port silently falls back to 6379.
        const port = std.fmt.parseInt(u16, port_str, 10) catch 6379;
        const password = std.posix.getenv("PREFECT_REDIS_MESSAGING_PASSWORD") orelse "";
        const db_str = std.posix.getenv("PREFECT_REDIS_MESSAGING_DB") orelse "0";
        // An unparsable database index silently falls back to 0.
        const db_num = std.fmt.parseInt(u8, db_str, 10) catch 0;

        redis_instance = redis_storage.RedisLeaseStorage.init(alloc, host, port, password, db_num);
        storage = .{ .redis = &redis_instance.? };
        log.info("leases", "initialized redis lease storage ({s}:{d})", .{ host, port });
    } else {
        // Use in-memory storage (default - matches Python)
        memory_instance = memory_storage.MemoryLeaseStorage.init(alloc);
        storage = .{ .memory = &memory_instance.? };
        log.info("leases", "initialized memory lease storage", .{});
    }
}

/// Get the global lease storage instance
///
/// Calls `init` first when storage has not been set up yet.
pub fn get() LeaseStorage {
    if (storage == null) {
        // Auto-initialize if not already done
        init() catch {
            // Fall back to memory if Redis fails
            // NOTE(review): `init` as written contains no fallible call, so
            // this branch looks purely defensive — confirm before relying on it.
            memory_instance = memory_storage.MemoryLeaseStorage.init(std.heap.page_allocator);
            storage = .{ .memory = &memory_instance.? };
        };
    }
    return storage.?;
}