prefect server in zig
1//! Unified lease storage interface
2//!
3//! Backend selection:
4//! - Memory: default (single-instance deployments)
5//! - Redis: when PREFECT_BROKER_BACKEND=redis (HA deployments)
6//!
7//! This matches Python Prefect's pattern where leases are NEVER stored in the database.
8//! Leases are ephemeral by default (memory) or use Redis for HA.
9
10const std = @import("std");
11const Allocator = std.mem.Allocator;
12
13const log = @import("../logging.zig");
14const redis_storage = @import("redis.zig");
15const memory_storage = @import("memory.zig");
16
17pub const Lease = struct {
18 id: []const u8,
19 resource_ids: []const []const u8,
20 slots: i64,
21 expiration_us: i64,
22 created_at_us: i64,
23 holder_type: ?[]const u8,
24 holder_id: ?[]const u8,
25};
26
27pub const LeaseStorage = union(enum) {
28 redis: *redis_storage.RedisLeaseStorage,
29 memory: *memory_storage.MemoryLeaseStorage,
30
31 pub fn createLease(
32 self: LeaseStorage,
33 alloc: Allocator,
34 resource_ids: []const []const u8,
35 ttl_seconds: f64,
36 slots: i64,
37 holder_type: ?[]const u8,
38 holder_id: ?[]const u8,
39 ) !Lease {
40 _ = alloc;
41 switch (self) {
42 .redis => |r| {
43 const lease = try r.createLease(resource_ids, ttl_seconds, slots, holder_type, holder_id);
44 return Lease{
45 .id = lease.id,
46 .resource_ids = lease.resource_ids,
47 .slots = lease.slots,
48 .expiration_us = lease.expiration_us,
49 .created_at_us = lease.created_at_us,
50 .holder_type = lease.holder_type,
51 .holder_id = lease.holder_id,
52 };
53 },
54 .memory => |m| {
55 const lease = try m.createLease(resource_ids, ttl_seconds, slots, holder_type, holder_id);
56 return Lease{
57 .id = lease.id,
58 .resource_ids = lease.resource_ids,
59 .slots = lease.slots,
60 .expiration_us = lease.expiration_us,
61 .created_at_us = lease.created_at_us,
62 .holder_type = lease.holder_type,
63 .holder_id = lease.holder_id,
64 };
65 },
66 }
67 }
68
69 pub fn getById(self: LeaseStorage, alloc: Allocator, lease_id: []const u8) !?Lease {
70 _ = alloc;
71 switch (self) {
72 .redis => |r| {
73 if (try r.getById(lease_id)) |lease| {
74 return Lease{
75 .id = lease.id,
76 .resource_ids = lease.resource_ids,
77 .slots = lease.slots,
78 .expiration_us = lease.expiration_us,
79 .created_at_us = lease.created_at_us,
80 .holder_type = lease.holder_type,
81 .holder_id = lease.holder_id,
82 };
83 }
84 return null;
85 },
86 .memory => |m| {
87 if (m.getById(lease_id)) |lease| {
88 return Lease{
89 .id = lease.id,
90 .resource_ids = lease.resource_ids,
91 .slots = lease.slots,
92 .expiration_us = lease.expiration_us,
93 .created_at_us = lease.created_at_us,
94 .holder_type = lease.holder_type,
95 .holder_id = lease.holder_id,
96 };
97 }
98 return null;
99 },
100 }
101 }
102
103 pub fn renewLease(self: LeaseStorage, lease_id: []const u8, ttl_seconds: f64) !bool {
104 switch (self) {
105 .redis => |r| return try r.renewLease(lease_id, ttl_seconds),
106 .memory => |m| return try m.renewLease(lease_id, ttl_seconds),
107 }
108 }
109
110 pub fn revokeLease(self: LeaseStorage, lease_id: []const u8) !bool {
111 switch (self) {
112 .redis => |r| return try r.revokeLease(lease_id),
113 .memory => |m| return try m.revokeLease(lease_id),
114 }
115 }
116
117 pub fn getExpiredLeaseIds(self: LeaseStorage, alloc: Allocator, limit: usize) ![][]const u8 {
118 switch (self) {
119 .redis => |r| return try r.getExpiredLeaseIds(limit),
120 .memory => |m| return try m.getExpiredLeaseIds(alloc, limit),
121 }
122 }
123
124 pub fn getActiveLeases(self: LeaseStorage, alloc: Allocator, limit: usize, offset: usize) ![]Lease {
125 switch (self) {
126 .redis => |r| {
127 const redis_leases = try r.getActiveLeases(limit, offset);
128 var results = try alloc.alloc(Lease, redis_leases.len);
129 for (redis_leases, 0..) |lease, i| {
130 results[i] = Lease{
131 .id = lease.id,
132 .resource_ids = lease.resource_ids,
133 .slots = lease.slots,
134 .expiration_us = lease.expiration_us,
135 .created_at_us = lease.created_at_us,
136 .holder_type = lease.holder_type,
137 .holder_id = lease.holder_id,
138 };
139 }
140 return results;
141 },
142 .memory => |m| {
143 const mem_leases = try m.getActiveLeases(alloc, limit, offset);
144 var results = try alloc.alloc(Lease, mem_leases.len);
145 for (mem_leases, 0..) |lease, i| {
146 results[i] = Lease{
147 .id = lease.id,
148 .resource_ids = lease.resource_ids,
149 .slots = lease.slots,
150 .expiration_us = lease.expiration_us,
151 .created_at_us = lease.created_at_us,
152 .holder_type = lease.holder_type,
153 .holder_id = lease.holder_id,
154 };
155 }
156 return results;
157 },
158 }
159 }
160
161 pub fn deleteExpiredLease(self: LeaseStorage, alloc: Allocator, lease_id: []const u8) !?Lease {
162 switch (self) {
163 .redis => |r| {
164 if (try r.deleteExpiredLease(lease_id)) |lease| {
165 return Lease{
166 .id = lease.id,
167 .resource_ids = lease.resource_ids,
168 .slots = lease.slots,
169 .expiration_us = lease.expiration_us,
170 .created_at_us = lease.created_at_us,
171 .holder_type = lease.holder_type,
172 .holder_id = lease.holder_id,
173 };
174 }
175 return null;
176 },
177 .memory => |m| {
178 if (try m.deleteExpiredLease(alloc, lease_id)) |lease| {
179 return Lease{
180 .id = lease.id,
181 .resource_ids = lease.resource_ids,
182 .slots = lease.slots,
183 .expiration_us = lease.expiration_us,
184 .created_at_us = lease.created_at_us,
185 .holder_type = lease.holder_type,
186 .holder_id = lease.holder_id,
187 };
188 }
189 return null;
190 },
191 }
192 }
193
194 pub fn calculateOccupancy(lease: Lease) f64 {
195 return memory_storage.MemoryLeaseStorage.calculateOccupancy(.{
196 .id = lease.id,
197 .resource_ids = &.{},
198 .slots = lease.slots,
199 .expiration_us = lease.expiration_us,
200 .created_at_us = lease.created_at_us,
201 .holder_type = lease.holder_type,
202 .holder_id = lease.holder_id,
203 });
204 }
205};
206
207// Global lease storage instance
208var storage: ?LeaseStorage = null;
209var redis_instance: ?redis_storage.RedisLeaseStorage = null;
210var memory_instance: ?memory_storage.MemoryLeaseStorage = null;
211
212/// Initialize lease storage based on environment
213pub fn init() !void {
214 if (storage != null) return;
215
216 const alloc = std.heap.page_allocator;
217 const broker_backend = std.posix.getenv("PREFECT_BROKER_BACKEND") orelse "memory";
218
219 if (std.mem.eql(u8, broker_backend, "redis")) {
220 // Use Redis for lease storage (HA mode)
221 const host = std.posix.getenv("PREFECT_REDIS_MESSAGING_HOST") orelse "localhost";
222 const port_str = std.posix.getenv("PREFECT_REDIS_MESSAGING_PORT") orelse "6379";
223 const port = std.fmt.parseInt(u16, port_str, 10) catch 6379;
224 const password = std.posix.getenv("PREFECT_REDIS_MESSAGING_PASSWORD") orelse "";
225 const db_str = std.posix.getenv("PREFECT_REDIS_MESSAGING_DB") orelse "0";
226 const db_num = std.fmt.parseInt(u8, db_str, 10) catch 0;
227
228 redis_instance = redis_storage.RedisLeaseStorage.init(alloc, host, port, password, db_num);
229 storage = .{ .redis = &redis_instance.? };
230 log.info("leases", "initialized redis lease storage ({s}:{d})", .{ host, port });
231 } else {
232 // Use in-memory storage (default - matches Python)
233 memory_instance = memory_storage.MemoryLeaseStorage.init(alloc);
234 storage = .{ .memory = &memory_instance.? };
235 log.info("leases", "initialized memory lease storage", .{});
236 }
237}
238
239/// Get the global lease storage instance
240pub fn get() LeaseStorage {
241 if (storage == null) {
242 // Auto-initialize if not already done
243 init() catch {
244 // Fall back to memory if Redis fails
245 memory_instance = memory_storage.MemoryLeaseStorage.init(std.heap.page_allocator);
246 storage = .{ .memory = &memory_instance.? };
247 };
248 }
249 return storage.?;
250}