//! Redis-backed lease storage for HA deployments
//!
//! Stores leases in Redis for shared access across multiple server instances:
//! - Lease data in `prefect:concurrency:lease:{id}` (JSON)
//! - Expiration times in sorted set `prefect:concurrency:expirations`
//! - Each Redis command is atomic on its own; multi-step operations here
//!   (create, renew, delete-expired) are NOT wrapped in a transaction.

const std = @import("std");
const Allocator = std.mem.Allocator;
const json = std.json;
const redis = @import("redis");

const log = @import("../logging.zig");
const uuid_util = @import("../utilities/uuid.zig");
const time_util = @import("../utilities/time.zig");

pub const RedisLeaseStorage = struct {
    const Self = @This();

    alloc: Allocator,
    host: []const u8,
    port: u16,
    password: []const u8,
    db: u8,

    const base_prefix = "prefect:concurrency:";
    const lease_prefix = base_prefix ++ "lease:";
    const expirations_key = base_prefix ++ "expirations";

    /// A concurrency lease. Every `Lease` returned by this type owns all of
    /// its slices (allocated with `Self.alloc`); release it with `freeLease`.
    pub const Lease = struct {
        id: []const u8,
        resource_ids: []const []const u8,
        slots: i64,
        expiration_us: i64,
        created_at_us: i64,
        holder_type: ?[]const u8,
        holder_id: ?[]const u8,
    };

    /// Store the connection settings. No connection is opened here; each
    /// operation connects on demand. `host` and `password` are borrowed and
    /// must outlive the returned value.
    pub fn init(alloc: Allocator, host: []const u8, port: u16, password: []const u8, db: u8) Self {
        return .{
            .alloc = alloc,
            .host = host,
            .port = port,
            .password = password,
            .db = db,
        };
    }

    /// Open a new Redis connection using the stored settings.
    fn connect(self: *Self) !redis.Client {
        return redis.Client.connectWithConfig(self.alloc, .{
            .host = self.host,
            .port = self.port,
            .password = self.password,
            .db = self.db,
        });
    }

    /// Format the Redis key for `lease_id` into `buf`.
    /// Returns null when the key does not fit, instead of silently falling
    /// back to a different key.
    fn leaseKey(buf: *[128]u8, lease_id: []const u8) ?[]const u8 {
        return std.fmt.bufPrint(buf, "{s}{s}", .{ lease_prefix, lease_id }) catch null;
    }

    /// Convert a microsecond timestamp to the fractional-seconds score used
    /// in the expirations sorted set.
    fn scoreFromMicros(us: i64) f64 {
        return @as(f64, @floatFromInt(us)) / 1_000_000.0;
    }

    /// Format the current time as a sorted-set score string.
    /// Falls back to "0" if the value does not fit in `buf`.
    fn formatNowScore(buf: *[32]u8) []const u8 {
        const now_ts = scoreFromMicros(time_util.nowMicros());
        return std.fmt.bufPrint(buf, "{d:.6}", .{now_ts}) catch "0";
    }

    /// Compute `now_us + ttl_seconds` in microseconds.
    /// Returns `error.InvalidTtl` for NaN, infinite, or out-of-range TTLs,
    /// because `@intFromFloat` on such values is illegal behavior.
    fn expirationFromTtl(now_us: i64, ttl_seconds: f64) !i64 {
        const ttl_us_f = ttl_seconds * 1_000_000.0;
        const bound: f64 = 9.0e18; // safely inside the i64 range (~9.22e18)
        if (!std.math.isFinite(ttl_us_f) or ttl_us_f >= bound or ttl_us_f <= -bound) {
            return error.InvalidTtl;
        }
        const ttl_us: i64 = @intFromFloat(ttl_us_f);
        return now_us +| ttl_us; // saturate rather than overflow
    }

    /// Append `value` to `out` as a quoted JSON string, escaping quotes,
    /// backslashes and control characters. Other bytes are copied verbatim.
    fn appendJsonString(alloc: Allocator, out: *std.ArrayListUnmanaged(u8), value: []const u8) !void {
        const hex = "0123456789abcdef";
        try out.append(alloc, '"');
        for (value) |c| {
            switch (c) {
                '"' => try out.appendSlice(alloc, "\\\""),
                '\\' => try out.appendSlice(alloc, "\\\\"),
                '\n' => try out.appendSlice(alloc, "\\n"),
                '\r' => try out.appendSlice(alloc, "\\r"),
                '\t' => try out.appendSlice(alloc, "\\t"),
                0x00...0x08, 0x0B, 0x0C, 0x0E...0x1F => {
                    const esc = [_]u8{ '\\', 'u', '0', '0', hex[c >> 4], hex[c & 0x0F] };
                    try out.appendSlice(alloc, &esc);
                },
                else => try out.append(alloc, c),
            }
        }
        try out.append(alloc, '"');
    }

    /// Append `value` to `out` in decimal.
    fn appendInt(alloc: Allocator, out: *std.ArrayListUnmanaged(u8), value: i64) !void {
        var buf: [24]u8 = undefined;
        // An i64 needs at most 20 characters, so 24 bytes always suffice.
        const text = std.fmt.bufPrint(&buf, "{d}", .{value}) catch unreachable;
        try out.appendSlice(alloc, text);
    }

    /// Append the JSON document for `lease` to `out`. The holder fields are
    /// omitted when null.
    fn serializeLease(alloc: Allocator, out: *std.ArrayListUnmanaged(u8), lease: Lease) !void {
        try out.appendSlice(alloc, "{\"id\":");
        try appendJsonString(alloc, out, lease.id);

        try out.appendSlice(alloc, ",\"resource_ids\":[");
        for (lease.resource_ids, 0..) |rid, i| {
            if (i > 0) try out.append(alloc, ',');
            try appendJsonString(alloc, out, rid);
        }

        try out.appendSlice(alloc, "],\"slots\":");
        try appendInt(alloc, out, lease.slots);
        try out.appendSlice(alloc, ",\"expiration_us\":");
        try appendInt(alloc, out, lease.expiration_us);
        try out.appendSlice(alloc, ",\"created_at_us\":");
        try appendInt(alloc, out, lease.created_at_us);

        if (lease.holder_type) |ht| {
            try out.appendSlice(alloc, ",\"holder_type\":");
            try appendJsonString(alloc, out, ht);
        }
        if (lease.holder_id) |hi| {
            try out.appendSlice(alloc, ",\"holder_id\":");
            try appendJsonString(alloc, out, hi);
        }
        try out.append(alloc, '}');
    }

    /// Build a `Lease` whose slices are all fresh copies owned by the caller.
    /// Nothing is leaked if an allocation fails part-way through.
    fn buildLease(
        self: *Self,
        id: []const u8,
        resource_ids: []const []const u8,
        slots: i64,
        expiration_us: i64,
        created_at_us: i64,
        holder_type: ?[]const u8,
        holder_id: ?[]const u8,
    ) !Lease {
        const id_copy = try self.alloc.dupe(u8, id);
        errdefer self.alloc.free(id_copy);

        const rids = try self.alloc.alloc([]const u8, resource_ids.len);
        var filled: usize = 0;
        errdefer {
            for (rids[0..filled]) |rid| self.alloc.free(rid);
            self.alloc.free(rids);
        }
        for (resource_ids) |rid| {
            rids[filled] = try self.alloc.dupe(u8, rid);
            filled += 1;
        }

        const holder_type_copy: ?[]const u8 = if (holder_type) |ht| try self.alloc.dupe(u8, ht) else null;
        errdefer if (holder_type_copy) |ht| self.alloc.free(ht);

        const holder_id_copy: ?[]const u8 = if (holder_id) |hi| try self.alloc.dupe(u8, hi) else null;

        return Lease{
            .id = id_copy,
            .resource_ids = rids,
            .slots = slots,
            .expiration_us = expiration_us,
            .created_at_us = created_at_us,
            .holder_type = holder_type_copy,
            .holder_id = holder_id_copy,
        };
    }

    /// Create a new lease and store it in Redis.
    ///
    /// Returns a lease owned by the caller; release it with `freeLease`.
    /// Errors: `error.InvalidTtl` for a non-finite or out-of-range TTL,
    /// allocation failures, and any error from the Redis client.
    pub fn createLease(
        self: *Self,
        resource_ids: []const []const u8,
        ttl_seconds: f64,
        slots: i64,
        holder_type: ?[]const u8,
        holder_id: ?[]const u8,
    ) !Lease {
        const now_us = time_util.nowMicros();
        const expiration_us = try expirationFromTtl(now_us, ttl_seconds);

        var id_buf: [36]u8 = undefined;
        const lease_id = uuid_util.generate(&id_buf);

        // Make the caller-owned copy first so no allocation can fail after
        // Redis has already been modified.
        const lease = try self.buildLease(
            lease_id,
            resource_ids,
            slots,
            expiration_us,
            now_us,
            holder_type,
            holder_id,
        );
        errdefer self.freeLease(lease);

        var json_buf = std.ArrayListUnmanaged(u8){};
        defer json_buf.deinit(self.alloc);
        try serializeLease(self.alloc, &json_buf, lease);

        var key_buf: [128]u8 = undefined;
        const key = leaseKey(&key_buf, lease_id) orelse return error.LeaseIdTooLong;

        var client = try self.connect();
        defer client.close();

        // SET lease data
        var strings = client.strings();
        try strings.set(key, json_buf.items);
        // If indexing below fails, best-effort remove the data key so an
        // unindexed lease is not left behind.
        errdefer {
            var keys_cmd = client.keys();
            _ = keys_cmd.del(&.{key}) catch 0;
        }

        // ZADD to expirations sorted set
        var zsets = client.sortedSets();
        _ = try zsets.zadd(expirations_key, scoreFromMicros(expiration_us), lease_id);

        return lease;
    }

    /// Read a lease by ID.
    ///
    /// Returns null when the lease does not exist, when `lease_id` is too
    /// long to form a key, or when the Redis read fails (best-effort).
    /// Returns `error.InvalidLeaseData` if the stored JSON is malformed.
    /// The caller owns the returned lease; release it with `freeLease`.
    pub fn getById(self: *Self, lease_id: []const u8) !?Lease {
        var key_buf: [128]u8 = undefined;
        const key = leaseKey(&key_buf, lease_id) orelse return null;

        var client = try self.connect();
        defer client.close();

        var strings = client.strings();
        // NOTE(review): ownership of the buffer returned by `get` is defined
        // by the redis client and is not visible here — confirm whether it
        // needs to be freed.
        const data = (strings.get(key) catch return null) orelse return null;
        return try self.parseLease(data);
    }

    /// Look up a required string field of a stored lease object.
    fn requireString(obj: json.ObjectMap, key: []const u8) ![]const u8 {
        const value = obj.get(key) orelse return error.InvalidLeaseData;
        return switch (value) {
            .string => |s| s,
            else => error.InvalidLeaseData,
        };
    }

    /// Look up a required integer field of a stored lease object.
    fn requireInteger(obj: json.ObjectMap, key: []const u8) !i64 {
        const value = obj.get(key) orelse return error.InvalidLeaseData;
        return switch (value) {
            .integer => |n| n,
            else => error.InvalidLeaseData,
        };
    }

    /// Look up an optional string field; a missing or non-string value is null.
    fn optionalString(obj: json.ObjectMap, key: []const u8) ?[]const u8 {
        const value = obj.get(key) orelse return null;
        return switch (value) {
            .string => |s| s,
            else => null,
        };
    }

    /// Decode a stored lease JSON document into a caller-owned `Lease`.
    /// Returns `error.InvalidLeaseData` when a required field is missing or
    /// has the wrong type, rather than panicking on bad data.
    fn parseLease(self: *Self, data: []const u8) !Lease {
        const parsed = try json.parseFromSlice(json.Value, self.alloc, data, .{});
        defer parsed.deinit();

        const obj = switch (parsed.value) {
            .object => |o| o,
            else => return error.InvalidLeaseData,
        };

        const rid_values = switch (obj.get("resource_ids") orelse return error.InvalidLeaseData) {
            .array => |a| a.items,
            else => return error.InvalidLeaseData,
        };

        // Temporary array of views into the parsed document; `buildLease`
        // makes the copies that outlive `parsed`.
        const rid_views = try self.alloc.alloc([]const u8, rid_values.len);
        defer self.alloc.free(rid_views);
        for (rid_values, 0..) |item, i| {
            rid_views[i] = switch (item) {
                .string => |s| s,
                else => return error.InvalidLeaseData,
            };
        }

        return self.buildLease(
            try requireString(obj, "id"),
            rid_views,
            try requireInteger(obj, "slots"),
            try requireInteger(obj, "expiration_us"),
            try requireInteger(obj, "created_at_us"),
            optionalString(obj, "holder_type"),
            optionalString(obj, "holder_id"),
        );
    }

    /// Renew a lease by setting its expiration to now + `ttl_seconds`.
    ///
    /// Returns false when the lease does not exist (or the read fails),
    /// true once both the lease data and the expiration index are updated.
    /// Errors: `error.InvalidTtl`, `error.InvalidLeaseData`, allocation
    /// failures, and Redis write errors.
    /// The read-modify-write is not transactional: a concurrent revoke
    /// between the read and the write can be undone by this call.
    pub fn renewLease(self: *Self, lease_id: []const u8, ttl_seconds: f64) !bool {
        const now_us = time_util.nowMicros();
        const new_expiration_us = try expirationFromTtl(now_us, ttl_seconds);

        var key_buf: [128]u8 = undefined;
        const key = leaseKey(&key_buf, lease_id) orelse return false;

        var client = try self.connect();
        defer client.close();

        // Get existing lease
        var strings = client.strings();
        const data = (strings.get(key) catch return false) orelse return false;

        // Decode into owned memory before issuing further commands.
        var lease = try self.parseLease(data);
        defer self.freeLease(lease);
        lease.expiration_us = new_expiration_us;

        var json_buf = std.ArrayListUnmanaged(u8){};
        defer json_buf.deinit(self.alloc);
        try serializeLease(self.alloc, &json_buf, lease);

        // Update in Redis
        try strings.set(key, json_buf.items);

        var zsets = client.sortedSets();
        _ = try zsets.zadd(expirations_key, scoreFromMicros(new_expiration_us), lease_id);

        return true;
    }

    /// Revoke (delete) a lease.
    ///
    /// Deletes the lease data key and removes the ID from the expirations
    /// set. Returns true only if the data key existed and was deleted by
    /// this call.
    pub fn revokeLease(self: *Self, lease_id: []const u8) !bool {
        var key_buf: [128]u8 = undefined;
        // An ID too long to form a key cannot have been stored by this type.
        const key = leaseKey(&key_buf, lease_id) orelse return false;

        var client = try self.connect();
        defer client.close();

        // Delete lease data and remove from expirations
        var keys_cmd = client.keys();
        const deleted = try keys_cmd.del(&.{key});

        var zsets = client.sortedSets();
        _ = try zsets.zrem(expirations_key, &.{lease_id});

        return deleted > 0;
    }

    /// Get up to `limit` expired lease IDs using ZRANGEBYSCORE from -inf to
    /// the current time.
    ///
    /// Returns an empty slice when the command fails or yields nothing.
    /// The caller owns the returned slice and every ID in it; the slice is
    /// exactly sized, so it can be passed to the allocator's `free`.
    pub fn getExpiredLeaseIds(self: *Self, limit: usize) ![][]const u8 {
        var client = try self.connect();
        defer client.close();

        // ZRANGEBYSCORE expirations -inf now LIMIT 0 limit
        var ts_buf: [32]u8 = undefined;
        const ts_str = formatNowScore(&ts_buf);

        var limit_buf: [20]u8 = undefined;
        const limit_str = std.fmt.bufPrint(&limit_buf, "{d}", .{limit}) catch "100";

        // Use sendCommand for -inf syntax
        const result = client.sendCommand(&.{
            "ZRANGEBYSCORE",
            expirations_key,
            "-inf",
            ts_str,
            "LIMIT",
            "0",
            limit_str,
        }) catch return &[_][]const u8{};

        const arr = result.asArray() orelse return &[_][]const u8{};
        if (arr.len == 0) return &[_][]const u8{};

        // Copy results; non-string entries are skipped.
        var ids = std.ArrayListUnmanaged([]const u8){};
        errdefer {
            for (ids.items) |id| self.alloc.free(id);
            ids.deinit(self.alloc);
        }
        try ids.ensureTotalCapacity(self.alloc, arr.len);
        for (arr) |item| {
            const id = item.asString() orelse continue;
            const copy = try self.alloc.dupe(u8, id);
            ids.appendAssumeCapacity(copy);
        }

        return try ids.toOwnedSlice(self.alloc);
    }

    /// Get active (non-expired) leases with pagination.
    ///
    /// `offset` and `limit` apply to the expirations index; IDs whose lease
    /// data can no longer be read are skipped, so fewer than `limit` leases
    /// may be returned. Returns an empty slice when the index query fails.
    /// The caller owns the returned slice and must `freeLease` each element.
    pub fn getActiveLeases(self: *Self, limit: usize, offset: usize) ![]Lease {
        var client = try self.connect();
        defer client.close();

        var ts_buf: [32]u8 = undefined;
        const ts_str = formatNowScore(&ts_buf);

        var offset_buf: [20]u8 = undefined;
        const offset_str = std.fmt.bufPrint(&offset_buf, "{d}", .{offset}) catch "0";

        var limit_buf: [20]u8 = undefined;
        const limit_str = std.fmt.bufPrint(&limit_buf, "{d}", .{limit}) catch "100";

        // ZRANGEBYSCORE expirations now +inf LIMIT offset limit
        const result = client.sendCommand(&.{
            "ZRANGEBYSCORE",
            expirations_key,
            ts_str,
            "+inf",
            "LIMIT",
            offset_str,
            limit_str,
        }) catch return &[_]Lease{};

        const arr = result.asArray() orelse return &[_]Lease{};
        if (arr.len == 0) return &[_]Lease{};

        var results = std.ArrayListUnmanaged(Lease){};
        errdefer {
            for (results.items) |lease| self.freeLease(lease);
            results.deinit(self.alloc);
        }

        // Each lease is fetched through `getById`, which uses its own
        // connection, so the reply held in `arr` is not disturbed.
        for (arr) |item| {
            const lease_id = item.asString() orelse continue;
            const lease = (try self.getById(lease_id)) orelse continue;
            results.append(self.alloc, lease) catch |err| {
                self.freeLease(lease);
                return err;
            };
        }

        return try results.toOwnedSlice(self.alloc);
    }

    /// Delete a lease if it is still expired, and return it.
    ///
    /// Returns null when the lease is missing, has been renewed, or was
    /// already deleted by someone else (so only the caller whose DEL removed
    /// the key gets the lease back). The check and the delete are separate
    /// Redis round-trips, not a transaction: a renewal landing between them
    /// can still be lost.
    /// The caller owns the returned lease; release it with `freeLease`.
    pub fn deleteExpiredLease(self: *Self, lease_id: []const u8) !?Lease {
        const now_us = time_util.nowMicros();

        const lease = (try self.getById(lease_id)) orelse return null;
        errdefer self.freeLease(lease);

        if (lease.expiration_us >= now_us) {
            // Lease was renewed, free and return null
            self.freeLease(lease);
            return null;
        }

        const removed = try self.revokeLease(lease_id);
        if (!removed) {
            // Another instance deleted it first; do not report it twice.
            self.freeLease(lease);
            return null;
        }
        return lease;
    }

    /// Free every allocation owned by `lease`. The lease must have been
    /// returned by this storage (allocated with `self.alloc`).
    pub fn freeLease(self: *Self, lease: Lease) void {
        self.alloc.free(lease.id);
        for (lease.resource_ids) |rid| {
            self.alloc.free(rid);
        }
        self.alloc.free(lease.resource_ids);
        if (lease.holder_type) |ht| self.alloc.free(ht);
        if (lease.holder_id) |hi| self.alloc.free(hi);
    }

    /// Calculate occupancy seconds for a lease: the time from creation until
    /// now or the expiration, whichever is earlier, never negative.
    pub fn calculateOccupancy(lease: Lease) f64 {
        const now_us = time_util.nowMicros();
        const end_us = @min(now_us, lease.expiration_us);
        const elapsed_us = @max(0, end_us -| lease.created_at_us);
        const duration_us: f64 = @floatFromInt(elapsed_us);
        return duration_us / 1_000_000.0;
    }
};