prefect server in zig
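//! Redis-backed lease storage for HA deployments.
//!
//! Stores leases in Redis so that multiple server instances can share them:
//! - Lease data in `prefect:concurrency:lease:{id}` (JSON)
//! - Expiration times in the sorted set `prefect:concurrency:expirations`
//!   (member = lease id, score = expiration time in seconds)
//! - Each Redis command is atomic on its own; operations that issue several
//!   commands (for example SET followed by ZADD) are not wrapped in a transaction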
7
8const std = @import("std");
9const Allocator = std.mem.Allocator;
10const json = std.json;
11
12const redis = @import("redis");
13const log = @import("../logging.zig");
14const uuid_util = @import("../utilities/uuid.zig");
15const time_util = @import("../utilities/time.zig");
16
/// Lease store that keeps concurrency leases in Redis.
///
/// Every public operation opens its own connection via `connect` and closes
/// it before returning. Lease payloads are stored as JSON strings under
/// `lease_prefix ++ id`; lease ids are also members of the sorted set
/// `expirations_key`, scored by expiration time in seconds.
///
/// Operations that need more than one Redis command (create, renew, revoke,
/// delete-expired) are NOT transactional: another client may observe or
/// modify the lease between the individual commands.
///
/// Ownership: every `Lease` returned by this type is allocated with
/// `self.alloc` and must be released with `freeLease`.
pub const RedisLeaseStorage = struct {
    const Self = @This();

    /// Allocator used for every lease, id list and scratch buffer.
    alloc: Allocator,
    /// Redis host, borrowed from the caller of `init`.
    host: []const u8,
    /// Redis TCP port.
    port: u16,
    /// Redis password, borrowed from the caller of `init`.
    password: []const u8,
    /// Redis logical database index.
    db: u8,

    const base_prefix = "prefect:concurrency:";
    const lease_prefix = base_prefix ++ "lease:";
    const expirations_key = base_prefix ++ "expirations";

    /// A concurrency lease as stored in Redis.
    pub const Lease = struct {
        /// Lease identifier (the suffix of the Redis key).
        id: []const u8,
        /// Identifiers of the resources this lease covers.
        resource_ids: []const []const u8,
        /// Number of slots held by the lease.
        slots: i64,
        /// Expiration time in microseconds (same clock as `time_util.nowMicros`).
        expiration_us: i64,
        /// Creation time in microseconds (same clock as `time_util.nowMicros`).
        created_at_us: i64,
        /// Optional holder type; omitted from the JSON payload when null.
        holder_type: ?[]const u8,
        /// Optional holder id; omitted from the JSON payload when null.
        holder_id: ?[]const u8,
    };

    /// Build a storage handle. No connection is opened here; `host` and
    /// `password` are stored as-is (not copied), so they must outlive `Self`.
    pub fn init(alloc: Allocator, host: []const u8, port: u16, password: []const u8, db: u8) Self {
        return .{
            .alloc = alloc,
            .host = host,
            .port = port,
            .password = password,
            .db = db,
        };
    }

    /// Open a new Redis connection using the stored configuration.
    /// The caller is responsible for closing the returned client.
    fn connect(self: *Self) !redis.Client {
        return redis.Client.connectWithConfig(self.alloc, .{
            .host = self.host,
            .port = self.port,
            .password = self.password,
            .db = self.db,
        });
    }

    /// Format the Redis key for `lease_id` into `buf`.
    /// Returns null when the key does not fit, so callers never operate on a
    /// truncated or wrong key.
    fn checkedLeaseKey(buf: *[128]u8, lease_id: []const u8) ?[]const u8 {
        return std.fmt.bufPrint(buf, "{s}{s}", .{ lease_prefix, lease_id }) catch null;
    }

    /// Format the Redis key for `lease_id` into `buf`.
    /// Falls back to the bare `lease_prefix` when the key does not fit; the
    /// methods of this type use `checkedLeaseKey` instead so that an oversized
    /// id can never address the wrong key.
    fn leaseKey(buf: *[128]u8, lease_id: []const u8) []const u8 {
        return checkedLeaseKey(buf, lease_id) orelse lease_prefix;
    }

    /// Convert a TTL in seconds to whole microseconds.
    /// Returns `error.InvalidTtl` for NaN, infinite, or out-of-range values,
    /// which would otherwise be illegal input to `@intFromFloat`.
    fn ttlToMicros(ttl_seconds: f64) error{InvalidTtl}!i64 {
        // Well below maxInt(i64) so that adding it to a timestamp cannot overflow.
        const limit_us: f64 = 4.0e18;
        const micros = ttl_seconds * 1_000_000.0;
        // Written as a negated range check so that NaN is rejected as well.
        if (!(micros >= -limit_us and micros <= limit_us)) return error.InvalidTtl;
        const whole: i64 = @intFromFloat(micros);
        return whole;
    }

    /// Convert microseconds to fractional seconds (the sorted-set score unit).
    fn microsToSeconds(micros: i64) f64 {
        return @as(f64, @floatFromInt(micros)) / 1_000_000.0;
    }

    /// Format the current time, in seconds with microsecond precision, as a
    /// ZRANGEBYSCORE bound.
    fn formatNowScore(buf: *[32]u8) ![]const u8 {
        return std.fmt.bufPrint(buf, "{d:.6}", .{microsToSeconds(time_util.nowMicros())});
    }

    /// Append `text` to `out` as a quoted JSON string, escaping quotes,
    /// backslashes and control characters.
    fn appendJsonString(alloc: Allocator, out: *std.ArrayListUnmanaged(u8), text: []const u8) !void {
        const hex_digits = "0123456789abcdef";
        try out.append(alloc, '"');
        for (text) |byte| {
            switch (byte) {
                '"' => try out.appendSlice(alloc, "\\\""),
                '\\' => try out.appendSlice(alloc, "\\\\"),
                '\n' => try out.appendSlice(alloc, "\\n"),
                '\r' => try out.appendSlice(alloc, "\\r"),
                '\t' => try out.appendSlice(alloc, "\\t"),
                // Remaining control characters must be written as \u00XX.
                0x00...0x08, 0x0b, 0x0c, 0x0e...0x1f => {
                    try out.appendSlice(alloc, "\\u00");
                    try out.append(alloc, hex_digits[byte >> 4]);
                    try out.append(alloc, hex_digits[byte & 0x0f]);
                },
                else => try out.append(alloc, byte),
            }
        }
        try out.append(alloc, '"');
    }

    /// Append the decimal representation of `value` to `out`.
    fn appendInt(alloc: Allocator, out: *std.ArrayListUnmanaged(u8), value: i64) !void {
        // 24 bytes is more than enough for any i64, including the sign.
        var digits: [24]u8 = undefined;
        const text = try std.fmt.bufPrint(&digits, "{d}", .{value});
        try out.appendSlice(alloc, text);
    }

    /// Append the JSON payload for `lease` to `out`.
    /// Field names and order match what `parseLease` reads back; null holder
    /// fields are omitted.
    fn appendLeaseJson(alloc: Allocator, out: *std.ArrayListUnmanaged(u8), lease: Lease) !void {
        try out.appendSlice(alloc, "{\"id\":");
        try appendJsonString(alloc, out, lease.id);

        try out.appendSlice(alloc, ",\"resource_ids\":[");
        for (lease.resource_ids, 0..) |rid, i| {
            if (i > 0) try out.append(alloc, ',');
            try appendJsonString(alloc, out, rid);
        }

        try out.appendSlice(alloc, "],\"slots\":");
        try appendInt(alloc, out, lease.slots);
        try out.appendSlice(alloc, ",\"expiration_us\":");
        try appendInt(alloc, out, lease.expiration_us);
        try out.appendSlice(alloc, ",\"created_at_us\":");
        try appendInt(alloc, out, lease.created_at_us);

        if (lease.holder_type) |ht| {
            try out.appendSlice(alloc, ",\"holder_type\":");
            try appendJsonString(alloc, out, ht);
        }
        if (lease.holder_id) |hi| {
            try out.appendSlice(alloc, ",\"holder_id\":");
            try appendJsonString(alloc, out, hi);
        }
        try out.append(alloc, '}');
    }

    /// Deep-copy `src` into memory owned by `self.alloc`.
    /// On failure nothing is leaked; on success release with `freeLease`.
    fn cloneLease(self: *Self, src: Lease) !Lease {
        const id = try self.alloc.dupe(u8, src.id);
        errdefer self.alloc.free(id);

        const rids = try self.alloc.alloc([]const u8, src.resource_ids.len);
        var copied: usize = 0;
        errdefer {
            for (rids[0..copied]) |rid| self.alloc.free(rid);
            self.alloc.free(rids);
        }
        for (src.resource_ids) |rid| {
            rids[copied] = try self.alloc.dupe(u8, rid);
            copied += 1;
        }

        const holder_type: ?[]const u8 = if (src.holder_type) |ht| try self.alloc.dupe(u8, ht) else null;
        errdefer if (holder_type) |ht| self.alloc.free(ht);

        const holder_id: ?[]const u8 = if (src.holder_id) |hi| try self.alloc.dupe(u8, hi) else null;

        return .{
            .id = id,
            .resource_ids = rids,
            .slots = src.slots,
            .expiration_us = src.expiration_us,
            .created_at_us = src.created_at_us,
            .holder_type = holder_type,
            .holder_id = holder_id,
        };
    }

    /// Look up a mandatory string field of a stored lease object.
    fn requiredString(obj: json.ObjectMap, name: []const u8) error{InvalidLeaseData}![]const u8 {
        const value = obj.get(name) orelse return error.InvalidLeaseData;
        return switch (value) {
            .string => |text| text,
            else => error.InvalidLeaseData,
        };
    }

    /// Look up a mandatory integer field of a stored lease object.
    fn requiredInteger(obj: json.ObjectMap, name: []const u8) error{InvalidLeaseData}!i64 {
        const value = obj.get(name) orelse return error.InvalidLeaseData;
        return switch (value) {
            .integer => |number| number,
            else => error.InvalidLeaseData,
        };
    }

    /// Look up an optional string field; a missing or non-string value yields null.
    fn optionalString(obj: json.ObjectMap, name: []const u8) ?[]const u8 {
        const value = obj.get(name) orelse return null;
        return switch (value) {
            .string => |text| text,
            else => null,
        };
    }

    /// Create a new lease and store it in Redis.
    ///
    /// Generates the lease id with `uuid_util.generate`, stamps it with
    /// `time_util.nowMicros()`, writes the JSON payload (SET) and registers
    /// the expiration (ZADD). If ZADD fails, the payload key is deleted again
    /// on a best-effort basis so no lease is left without an expiration entry.
    ///
    /// Returns an owned `Lease` (release with `freeLease`).
    /// Errors: `error.InvalidTtl` for a NaN/infinite/out-of-range TTL,
    /// allocation failures, and connection/command errors from the client.
    pub fn createLease(
        self: *Self,
        resource_ids: []const []const u8,
        ttl_seconds: f64,
        slots: i64,
        holder_type: ?[]const u8,
        holder_id: ?[]const u8,
    ) !Lease {
        const ttl_us = try ttlToMicros(ttl_seconds);

        var id_buf: [36]u8 = undefined;
        const lease_id = uuid_util.generate(&id_buf);

        const now_us = time_util.nowMicros();
        const expiration_us = now_us + ttl_us;

        // Allocate the caller's copy before touching Redis: an allocation
        // failure must not leave a stored lease that nobody was told about.
        const lease = try self.cloneLease(.{
            .id = lease_id,
            .resource_ids = resource_ids,
            .slots = slots,
            .expiration_us = expiration_us,
            .created_at_us = now_us,
            .holder_type = holder_type,
            .holder_id = holder_id,
        });
        errdefer self.freeLease(lease);

        var payload = std.ArrayListUnmanaged(u8){};
        defer payload.deinit(self.alloc);
        try appendLeaseJson(self.alloc, &payload, lease);

        var key_buf: [128]u8 = undefined;
        const key = checkedLeaseKey(&key_buf, lease.id) orelse return error.LeaseIdTooLong;

        var client = try self.connect();
        defer client.close();

        // SET lease data
        var strings = client.strings();
        try strings.set(key, payload.items);

        // ZADD to expirations sorted set; undo the SET if this fails.
        var zsets = client.sortedSets();
        _ = zsets.zadd(expirations_key, microsToSeconds(expiration_us), lease.id) catch |err| {
            var keys_cmd = client.keys();
            // Best-effort rollback; the ZADD error is the one worth reporting.
            _ = keys_cmd.del(&.{key}) catch 0;
            return err;
        };

        return lease;
    }

    /// Read a lease by ID.
    ///
    /// Returns null when the lease does not exist, when the id is too long to
    /// form a key, or when the GET command itself fails (best-effort lookup).
    /// Connection errors and malformed payloads are returned as errors.
    /// The returned lease is owned by the caller (release with `freeLease`).
    pub fn getById(self: *Self, lease_id: []const u8) !?Lease {
        var key_buf: [128]u8 = undefined;
        const key = checkedLeaseKey(&key_buf, lease_id) orelse return null;

        var client = try self.connect();
        defer client.close();

        var strings = client.strings();
        // NOTE(review): a failed GET is reported as "not found"; confirm
        // callers do not need to tell the two apart.
        const reply = strings.get(key) catch return null;
        const data = reply orelse return null;

        return try self.parseLease(data);
    }

    /// Decode a stored JSON payload into an owned `Lease`.
    ///
    /// Returns `error.InvalidLeaseData` when the payload is not an object or
    /// a mandatory field is missing or has the wrong type; JSON syntax errors
    /// from `std.json` are propagated. Non-string holder fields are treated
    /// as absent.
    fn parseLease(self: *Self, data: []const u8) !Lease {
        const parsed = try json.parseFromSlice(json.Value, self.alloc, data, .{});
        defer parsed.deinit();

        const obj = switch (parsed.value) {
            .object => |fields| fields,
            else => return error.InvalidLeaseData,
        };

        const rids_value = obj.get("resource_ids") orelse return error.InvalidLeaseData;
        const rid_values = switch (rids_value) {
            .array => |list| list.items,
            else => return error.InvalidLeaseData,
        };

        // Temporary view over the parsed strings; `cloneLease` makes the
        // copies that outlive `parsed`.
        const borrowed_rids = try self.alloc.alloc([]const u8, rid_values.len);
        defer self.alloc.free(borrowed_rids);
        for (rid_values, borrowed_rids) |value, *slot| {
            slot.* = switch (value) {
                .string => |text| text,
                else => return error.InvalidLeaseData,
            };
        }

        return self.cloneLease(.{
            .id = try requiredString(obj, "id"),
            .resource_ids = borrowed_rids,
            .slots = try requiredInteger(obj, "slots"),
            .expiration_us = try requiredInteger(obj, "expiration_us"),
            .created_at_us = try requiredInteger(obj, "created_at_us"),
            .holder_type = optionalString(obj, "holder_type"),
            .holder_id = optionalString(obj, "holder_id"),
        });
    }

    /// Renew a lease by moving its expiration to now + `ttl_seconds`.
    ///
    /// Reads the stored payload, rewrites it with the new expiration (SET)
    /// and updates the sorted-set score (ZADD). The read and the writes are
    /// separate commands, so a concurrent revoke can be overwritten.
    ///
    /// Returns false when the lease does not exist, the id is too long to
    /// form a key, or the GET command fails; true after a successful update.
    /// Errors: `error.InvalidTtl`, `error.InvalidLeaseData`, allocation
    /// failures and connection/command errors.
    pub fn renewLease(self: *Self, lease_id: []const u8, ttl_seconds: f64) !bool {
        const ttl_us = try ttlToMicros(ttl_seconds);

        var key_buf: [128]u8 = undefined;
        const key = checkedLeaseKey(&key_buf, lease_id) orelse return false;

        var client = try self.connect();
        defer client.close();

        // Get existing lease
        var strings = client.strings();
        const reply = strings.get(key) catch return false;
        const data = reply orelse return false;

        var lease = try self.parseLease(data);
        defer self.freeLease(lease);
        lease.expiration_us = time_util.nowMicros() + ttl_us;

        var payload = std.ArrayListUnmanaged(u8){};
        defer payload.deinit(self.alloc);
        try appendLeaseJson(self.alloc, &payload, lease);

        // Update in Redis
        try strings.set(key, payload.items);
        var zsets = client.sortedSets();
        _ = try zsets.zadd(expirations_key, microsToSeconds(lease.expiration_us), lease_id);

        return true;
    }

    /// Revoke (delete) a lease.
    ///
    /// Deletes the payload key (DEL) and removes the id from the expirations
    /// sorted set (ZREM). Returns true when a payload key was actually
    /// deleted. An id too long to form a key is only removed from the sorted
    /// set and reported as false.
    pub fn revokeLease(self: *Self, lease_id: []const u8) !bool {
        var client = try self.connect();
        defer client.close();

        var key_buf: [128]u8 = undefined;
        var removed = false;
        if (checkedLeaseKey(&key_buf, lease_id)) |key| {
            var keys_cmd = client.keys();
            removed = (try keys_cmd.del(&.{key})) > 0;
        }

        var zsets = client.sortedSets();
        _ = try zsets.zrem(expirations_key, &.{lease_id});

        return removed;
    }

    /// Get up to `limit` ids of expired leases.
    ///
    /// Runs `ZRANGEBYSCORE expirations -inf <now> LIMIT 0 <limit>`, so a
    /// lease whose score equals "now" counts as expired. A failed command or
    /// a non-array reply yields an empty slice.
    ///
    /// The returned slice and every id in it are owned by the caller and
    /// must be freed with `self.alloc`.
    pub fn getExpiredLeaseIds(self: *Self, limit: usize) ![][]const u8 {
        var client = try self.connect();
        defer client.close();

        var ts_buf: [32]u8 = undefined;
        const max_score: []const u8 = try formatNowScore(&ts_buf);

        var limit_buf: [20]u8 = undefined;
        const limit_str: []const u8 = try std.fmt.bufPrint(&limit_buf, "{d}", .{limit});

        // Use sendCommand for -inf syntax
        const result = client.sendCommand(&.{
            "ZRANGEBYSCORE",
            expirations_key,
            "-inf",
            max_score,
            "LIMIT",
            "0",
            limit_str,
        }) catch return &[_][]const u8{};

        const arr = result.asArray() orelse return &[_][]const u8{};

        // Collect into a list so the returned slice is exactly as long as its
        // allocation even when some reply items are not strings.
        var ids = std.ArrayListUnmanaged([]const u8){};
        errdefer {
            for (ids.items) |id| self.alloc.free(id);
            ids.deinit(self.alloc);
        }
        try ids.ensureTotalCapacity(self.alloc, arr.len);
        for (arr) |item| {
            const id = item.asString() orelse continue;
            ids.appendAssumeCapacity(try self.alloc.dupe(u8, id));
        }

        return ids.toOwnedSlice(self.alloc);
    }

    /// Get active (non-expired) leases with pagination.
    ///
    /// Runs `ZRANGEBYSCORE expirations <now> +inf LIMIT <offset> <limit>` and
    /// then loads each lease with `getById` (one connection per lease). Ids
    /// whose payload is missing are skipped, so fewer than `limit` leases may
    /// be returned. A failed command or a non-array reply yields an empty
    /// slice.
    ///
    /// The returned slice is owned by the caller: release each element with
    /// `freeLease` and the slice itself with `self.alloc`.
    pub fn getActiveLeases(self: *Self, limit: usize, offset: usize) ![]Lease {
        var client = try self.connect();
        defer client.close();

        var ts_buf: [32]u8 = undefined;
        const min_score: []const u8 = try formatNowScore(&ts_buf);

        var offset_buf: [20]u8 = undefined;
        const offset_str: []const u8 = try std.fmt.bufPrint(&offset_buf, "{d}", .{offset});

        var limit_buf: [20]u8 = undefined;
        const limit_str: []const u8 = try std.fmt.bufPrint(&limit_buf, "{d}", .{limit});

        const result = client.sendCommand(&.{
            "ZRANGEBYSCORE",
            expirations_key,
            min_score,
            "+inf",
            "LIMIT",
            offset_str,
            limit_str,
        }) catch return &[_]Lease{};

        const arr = result.asArray() orelse return &[_]Lease{};

        var leases = std.ArrayListUnmanaged(Lease){};
        errdefer {
            for (leases.items) |lease| self.freeLease(lease);
            leases.deinit(self.alloc);
        }
        // Reserve up front so appending a fetched lease can never fail and leak it.
        try leases.ensureTotalCapacity(self.alloc, arr.len);
        for (arr) |item| {
            const lease_id = item.asString() orelse continue;
            const lease = (try self.getById(lease_id)) orelse continue;
            leases.appendAssumeCapacity(lease);
        }

        return leases.toOwnedSlice(self.alloc);
    }

    /// Delete a lease if it is expired and return it.
    ///
    /// The lease is read with `getById`, its stored expiration is compared to
    /// the current time, and only then is it revoked. These are separate Redis
    /// round trips, NOT an atomic operation: a renewal that lands between the
    /// read and the delete is lost.
    ///
    /// Returns the deleted lease (owned by the caller, release with
    /// `freeLease`), or null when the lease is missing or not expired.
    pub fn deleteExpiredLease(self: *Self, lease_id: []const u8) !?Lease {
        const now_us = time_util.nowMicros();

        const lease = (try self.getById(lease_id)) orelse return null;
        if (lease.expiration_us >= now_us) {
            // Lease was renewed (or has not expired yet): nothing to delete.
            self.freeLease(lease);
            return null;
        }

        // Do not leak the fetched lease when the revoke fails.
        errdefer self.freeLease(lease);
        _ = try self.revokeLease(lease_id);
        return lease;
    }

    /// Release every allocation owned by a lease returned from this type.
    pub fn freeLease(self: *Self, lease: Lease) void {
        self.alloc.free(lease.id);
        for (lease.resource_ids) |rid| {
            self.alloc.free(rid);
        }
        self.alloc.free(lease.resource_ids);
        if (lease.holder_type) |ht| self.alloc.free(ht);
        if (lease.holder_id) |hi| self.alloc.free(hi);
    }

    /// Calculate occupancy seconds for a lease: the time from creation until
    /// now or the expiration, whichever comes first, never less than zero.
    pub fn calculateOccupancy(lease: Lease) f64 {
        const now_us = time_util.nowMicros();
        const end_us = @min(now_us, lease.expiration_us);
        // Saturating subtraction: corrupt timestamps must not overflow.
        const held_us = @max(0, end_us -| lease.created_at_us);
        return @as(f64, @floatFromInt(held_us)) / 1_000_000.0;
    }
};