prefect server in zig
at main 422 lines 16 kB view raw
//! Redis-backed lease storage for HA deployments
//!
//! Stores leases in Redis for shared access across multiple server instances:
//! - Lease data in `prefect:concurrency:lease:{id}` (JSON)
//! - Expiration times in sorted set `prefect:concurrency:expirations`
//!   (member = lease id, score = expiration as fractional Unix seconds)
//!
//! Each operation opens its own connection and issues plain Redis commands;
//! multi-command operations are NOT wrapped in a transaction.

const std = @import("std");
const Allocator = std.mem.Allocator;
const json = std.json;

const redis = @import("redis");
const log = @import("../logging.zig");
const uuid_util = @import("../utilities/uuid.zig");
const time_util = @import("../utilities/time.zig");

pub const RedisLeaseStorage = struct {
    const Self = @This();

    alloc: Allocator,
    host: []const u8,
    port: u16,
    password: []const u8,
    db: u8,

    const base_prefix = "prefect:concurrency:";
    const lease_prefix = base_prefix ++ "lease:";
    const expirations_key = base_prefix ++ "expirations";

    /// A concurrency lease.
    ///
    /// Leases returned by this storage own every slice they reference
    /// (allocated with the storage's allocator); release them with `freeLease`.
    pub const Lease = struct {
        id: []const u8,
        resource_ids: []const []const u8,
        slots: i64,
        /// Expiration instant, microseconds.
        expiration_us: i64,
        /// Creation instant, microseconds.
        created_at_us: i64,
        holder_type: ?[]const u8,
        holder_id: ?[]const u8,
    };

    /// Build a storage handle. Nothing is copied and no connection is opened:
    /// `host` and `password` must outlive the returned value.
    pub fn init(alloc: Allocator, host: []const u8, port: u16, password: []const u8, db: u8) Self {
        return .{
            .alloc = alloc,
            .host = host,
            .port = port,
            .password = password,
            .db = db,
        };
    }

    /// Open a fresh Redis connection using the stored configuration.
    fn connect(self: *Self) !redis.Client {
        return redis.Client.connectWithConfig(self.alloc, .{
            .host = self.host,
            .port = self.port,
            .password = self.password,
            .db = self.db,
        });
    }

    /// Format the Redis key for `lease_id` into `buf`.
    ///
    /// Returns null when the id does not fit. Falling back to the bare prefix
    /// (as an earlier revision did) would make callers read, overwrite or
    /// delete an unrelated key.
    fn leaseKey(buf: *[128]u8, lease_id: []const u8) ?[]const u8 {
        return std.fmt.bufPrint(buf, "{s}{s}", .{ lease_prefix, lease_id }) catch return null;
    }

    /// Convert a microsecond timestamp to the fractional-seconds score used in
    /// the expirations sorted set.
    fn microsToSeconds(us: i64) f64 {
        return @as(f64, @floatFromInt(us)) / 1_000_000.0;
    }

    /// Compute `now_us + ttl_seconds` in microseconds.
    ///
    /// Returns `error.InvalidTtl` for NaN/infinite TTLs and for values whose
    /// microsecond form (or the resulting sum) does not fit in an i64;
    /// `@intFromFloat` on such inputs is illegal behavior.
    fn expirationAfter(now_us: i64, ttl_seconds: f64) error{InvalidTtl}!i64 {
        const ttl_us_f = ttl_seconds * 1_000_000.0;
        // Written as a positive range test so NaN (all comparisons false) is rejected too.
        if (!(ttl_us_f > -9.0e18 and ttl_us_f < 9.0e18)) return error.InvalidTtl;
        const ttl_us: i64 = @intFromFloat(ttl_us_f);
        return std.math.add(i64, now_us, ttl_us) catch return error.InvalidTtl;
    }

    /// Append `text` to `out` as a quoted JSON string, escaping quotes,
    /// backslashes and control characters.
    fn appendJsonString(alloc: Allocator, out: *std.ArrayListUnmanaged(u8), text: []const u8) !void {
        const hex = "0123456789abcdef";
        try out.append(alloc, '"');
        for (text) |byte| {
            switch (byte) {
                '"' => try out.appendSlice(alloc, "\\\""),
                '\\' => try out.appendSlice(alloc, "\\\\"),
                '\n' => try out.appendSlice(alloc, "\\n"),
                '\r' => try out.appendSlice(alloc, "\\r"),
                '\t' => try out.appendSlice(alloc, "\\t"),
                0x00...0x08, 0x0b, 0x0c, 0x0e...0x1f => {
                    const escaped = [_]u8{ '\\', 'u', '0', '0', hex[byte >> 4], hex[byte & 0x0f] };
                    try out.appendSlice(alloc, &escaped);
                },
                else => try out.append(alloc, byte),
            }
        }
        try out.append(alloc, '"');
    }

    /// Append the decimal form of `value` to `out`.
    fn appendInt(alloc: Allocator, out: *std.ArrayListUnmanaged(u8), value: i64) !void {
        // An i64 needs at most 20 characters including the sign.
        var digits: [24]u8 = undefined;
        const text = try std.fmt.bufPrint(&digits, "{d}", .{value});
        try out.appendSlice(alloc, text);
    }

    /// Append the stored JSON representation of `lease` to `out`.
    ///
    /// Field order: id, resource_ids, slots, expiration_us, created_at_us,
    /// then holder_type / holder_id only when non-null. All strings are
    /// JSON-escaped. `lease` is only read; nothing is retained.
    pub fn serializeLease(alloc: Allocator, out: *std.ArrayListUnmanaged(u8), lease: Lease) !void {
        try out.appendSlice(alloc, "{\"id\":");
        try appendJsonString(alloc, out, lease.id);

        try out.appendSlice(alloc, ",\"resource_ids\":[");
        for (lease.resource_ids, 0..) |rid, i| {
            if (i > 0) try out.append(alloc, ',');
            try appendJsonString(alloc, out, rid);
        }

        try out.appendSlice(alloc, "],\"slots\":");
        try appendInt(alloc, out, lease.slots);
        try out.appendSlice(alloc, ",\"expiration_us\":");
        try appendInt(alloc, out, lease.expiration_us);
        try out.appendSlice(alloc, ",\"created_at_us\":");
        try appendInt(alloc, out, lease.created_at_us);

        if (lease.holder_type) |ht| {
            try out.appendSlice(alloc, ",\"holder_type\":");
            try appendJsonString(alloc, out, ht);
        }
        if (lease.holder_id) |hi| {
            try out.appendSlice(alloc, ",\"holder_id\":");
            try appendJsonString(alloc, out, hi);
        }
        try out.append(alloc, '}');
    }

    /// Duplicate an optional string; null stays null.
    fn dupeOptional(alloc: Allocator, text: ?[]const u8) Allocator.Error!?[]const u8 {
        const present = text orelse return null;
        return try alloc.dupe(u8, present);
    }

    /// Deep-copy `src` so the result owns all of its slices.
    /// On failure every partial allocation is released.
    fn dupeLease(alloc: Allocator, src: Lease) Allocator.Error!Lease {
        const id = try alloc.dupe(u8, src.id);
        errdefer alloc.free(id);

        const rids = try alloc.alloc([]const u8, src.resource_ids.len);
        var copied: usize = 0;
        errdefer {
            for (rids[0..copied]) |rid| alloc.free(rid);
            alloc.free(rids);
        }
        for (src.resource_ids) |rid| {
            rids[copied] = try alloc.dupe(u8, rid);
            copied += 1;
        }

        const holder_type = try dupeOptional(alloc, src.holder_type);
        errdefer if (holder_type) |ht| alloc.free(ht);
        const holder_id = try dupeOptional(alloc, src.holder_id);

        return .{
            .id = id,
            .resource_ids = rids,
            .slots = src.slots,
            .expiration_us = src.expiration_us,
            .created_at_us = src.created_at_us,
            .holder_type = holder_type,
            .holder_id = holder_id,
        };
    }

    /// Create a new lease and store it in Redis.
    ///
    /// Writes the lease JSON with SET, then indexes the lease id in the
    /// expirations sorted set with ZADD. If the ZADD fails, the lease key is
    /// deleted again (best effort) so no unindexed lease is left behind.
    ///
    /// Returns a lease that owns copies of all strings; release it with
    /// `freeLease`. Fails with `error.InvalidTtl` when `ttl_seconds` cannot be
    /// represented, or with any connection/command/allocation error.
    pub fn createLease(
        self: *Self,
        resource_ids: []const []const u8,
        ttl_seconds: f64,
        slots: i64,
        holder_type: ?[]const u8,
        holder_id: ?[]const u8,
    ) !Lease {
        var id_buf: [36]u8 = undefined;
        const lease_id = uuid_util.generate(&id_buf);

        const now_us = time_util.nowMicros();
        const expiration_us = try expirationAfter(now_us, ttl_seconds);

        // View over the caller's data; only used for serialization and copying.
        const borrowed: Lease = .{
            .id = lease_id,
            .resource_ids = resource_ids,
            .slots = slots,
            .expiration_us = expiration_us,
            .created_at_us = now_us,
            .holder_type = holder_type,
            .holder_id = holder_id,
        };

        var payload = std.ArrayListUnmanaged(u8){};
        defer payload.deinit(self.alloc);
        try serializeLease(self.alloc, &payload, borrowed);

        // Build the return value before touching Redis so an allocation
        // failure cannot leave a stored lease that the caller never received.
        const owned = try dupeLease(self.alloc, borrowed);
        errdefer self.freeLease(owned);

        var key_buf: [128]u8 = undefined;
        const key = leaseKey(&key_buf, lease_id) orelse return error.InvalidLeaseId;

        var client = try self.connect();
        defer client.close();

        // SET lease data
        var strings = client.strings();
        try strings.set(key, payload.items);
        errdefer {
            // Best-effort rollback; the original error is what gets reported.
            var rollback = client.keys();
            _ = rollback.del(&.{key}) catch 0;
        }

        // ZADD to expirations sorted set
        var zsets = client.sortedSets();
        _ = try zsets.zadd(expirations_key, microsToSeconds(expiration_us), lease_id);

        return owned;
    }

    /// Read a lease by ID.
    ///
    /// Returns null when the key does not exist, when the id is too long to
    /// form a key, or when the GET itself fails. Returns
    /// `error.InvalidLeaseData` when the stored document is malformed.
    /// The returned lease must be released with `freeLease`.
    pub fn getById(self: *Self, lease_id: []const u8) !?Lease {
        var key_buf: [128]u8 = undefined;
        const key = leaseKey(&key_buf, lease_id) orelse return null;

        var client = try self.connect();
        defer client.close();

        var strings = client.strings();
        const data = (strings.get(key) catch return null) orelse return null;

        return try self.parseLease(data);
    }

    /// Fetch a required string field from a parsed lease object.
    fn stringField(obj: json.ObjectMap, name: []const u8) error{InvalidLeaseData}![]const u8 {
        const value = obj.get(name) orelse return error.InvalidLeaseData;
        switch (value) {
            .string => |s| return s,
            else => return error.InvalidLeaseData,
        }
    }

    /// Fetch a required integer field from a parsed lease object.
    fn integerField(obj: json.ObjectMap, name: []const u8) error{InvalidLeaseData}!i64 {
        const value = obj.get(name) orelse return error.InvalidLeaseData;
        switch (value) {
            .integer => |n| return n,
            else => return error.InvalidLeaseData,
        }
    }

    /// Fetch an optional string field; absent or non-string values yield null.
    fn optionalStringField(obj: json.ObjectMap, name: []const u8) ?[]const u8 {
        const value = obj.get(name) orelse return null;
        return switch (value) {
            .string => |s| s,
            else => null,
        };
    }

    /// Decode a stored lease document.
    ///
    /// The result owns copies of all strings (release with `freeLease`).
    /// Returns `error.InvalidLeaseData` when the document is not an object or
    /// a required field is missing / of the wrong type, instead of panicking.
    /// JSON syntax errors from `std.json` are propagated unchanged.
    pub fn parseLease(self: *Self, data: []const u8) !Lease {
        const parsed = try json.parseFromSlice(json.Value, self.alloc, data, .{});
        defer parsed.deinit();

        const obj = switch (parsed.value) {
            .object => |o| o,
            else => return error.InvalidLeaseData,
        };

        // Validate everything that needs no allocation first.
        const id_text = try stringField(obj, "id");
        const slots = try integerField(obj, "slots");
        const expiration_us = try integerField(obj, "expiration_us");
        const created_at_us = try integerField(obj, "created_at_us");
        const rid_values = switch (obj.get("resource_ids") orelse return error.InvalidLeaseData) {
            .array => |a| a.items,
            else => return error.InvalidLeaseData,
        };

        // Copy out of the parse arena, which `parsed.deinit()` releases.
        const id = try self.alloc.dupe(u8, id_text);
        errdefer self.alloc.free(id);

        const rids = try self.alloc.alloc([]const u8, rid_values.len);
        var copied: usize = 0;
        errdefer {
            for (rids[0..copied]) |rid| self.alloc.free(rid);
            self.alloc.free(rids);
        }
        for (rid_values) |item| {
            const text = switch (item) {
                .string => |s| s,
                else => return error.InvalidLeaseData,
            };
            rids[copied] = try self.alloc.dupe(u8, text);
            copied += 1;
        }

        const holder_type = try dupeOptional(self.alloc, optionalStringField(obj, "holder_type"));
        errdefer if (holder_type) |ht| self.alloc.free(ht);
        const holder_id = try dupeOptional(self.alloc, optionalStringField(obj, "holder_id"));

        return Lease{
            .id = id,
            .resource_ids = rids,
            .slots = slots,
            .expiration_us = expiration_us,
            .created_at_us = created_at_us,
            .holder_type = holder_type,
            .holder_id = holder_id,
        };
    }

    /// Renew a lease by moving its expiration to now + `ttl_seconds`.
    ///
    /// Rewrites the stored JSON and updates the score in the expirations set.
    /// Returns false when the lease does not exist (or the GET fails), true
    /// once both writes succeeded. The read-modify-write is not transactional.
    pub fn renewLease(self: *Self, lease_id: []const u8, ttl_seconds: f64) !bool {
        var key_buf: [128]u8 = undefined;
        const key = leaseKey(&key_buf, lease_id) orelse return false;

        var client = try self.connect();
        defer client.close();

        // Get existing lease
        var strings = client.strings();
        const data = (strings.get(key) catch return false) orelse return false;

        var lease = try self.parseLease(data);
        defer self.freeLease(lease);

        lease.expiration_us = try expirationAfter(time_util.nowMicros(), ttl_seconds);

        var payload = std.ArrayListUnmanaged(u8){};
        defer payload.deinit(self.alloc);
        try serializeLease(self.alloc, &payload, lease);

        // Update in Redis
        try strings.set(key, payload.items);
        var zsets = client.sortedSets();
        _ = try zsets.zadd(expirations_key, microsToSeconds(lease.expiration_us), lease_id);

        return true;
    }

    /// Revoke (delete) a lease.
    ///
    /// Deletes the lease key and removes the id from the expirations set.
    /// Returns true when the lease key existed.
    pub fn revokeLease(self: *Self, lease_id: []const u8) !bool {
        var key_buf: [128]u8 = undefined;
        const key = leaseKey(&key_buf, lease_id) orelse return false;

        var client = try self.connect();
        defer client.close();

        var keys_cmd = client.keys();
        const deleted = try keys_cmd.del(&.{key});
        var zsets = client.sortedSets();
        _ = try zsets.zrem(expirations_key, &.{lease_id});

        return deleted > 0;
    }

    /// Get up to `limit` expired lease IDs (ZRANGEBYSCORE -inf .. now).
    ///
    /// Returns an empty slice when the command fails or nothing is expired.
    /// Otherwise the caller owns the slice and every id in it: free each id,
    /// then the slice, with the storage's allocator.
    pub fn getExpiredLeaseIds(self: *Self, limit: usize) ![][]const u8 {
        var client = try self.connect();
        defer client.close();

        const now_ts = microsToSeconds(time_util.nowMicros());

        var ts_buf: [32]u8 = undefined;
        const ts_str = std.fmt.bufPrint(&ts_buf, "{d:.6}", .{now_ts}) catch "0";

        var limit_buf: [20]u8 = undefined;
        const limit_str = std.fmt.bufPrint(&limit_buf, "{d}", .{limit}) catch "100";

        // Use sendCommand for -inf syntax
        const result = client.sendCommand(&.{
            "ZRANGEBYSCORE",
            expirations_key,
            "-inf",
            ts_str,
            "LIMIT",
            "0",
            limit_str,
        }) catch return &[_][]const u8{};

        const arr = result.asArray() orelse return &[_][]const u8{};
        if (arr.len == 0) return &[_][]const u8{};

        // Collect through a list so the returned slice is exactly as long as
        // its allocation even when some reply items are not strings.
        var ids = std.ArrayListUnmanaged([]const u8){};
        errdefer {
            for (ids.items) |id| self.alloc.free(id);
            ids.deinit(self.alloc);
        }
        try ids.ensureTotalCapacityPrecise(self.alloc, arr.len);
        for (arr) |item| {
            const id = item.asString() orelse continue;
            const copy = try self.alloc.dupe(u8, id);
            ids.appendAssumeCapacity(copy);
        }

        return ids.toOwnedSlice(self.alloc);
    }

    /// Get active (non-expired) leases with pagination
    /// (ZRANGEBYSCORE now .. +inf LIMIT offset limit).
    ///
    /// Ids whose lease data can no longer be read are skipped. Returns an
    /// empty slice when the command fails or nothing matches. The caller owns
    /// the slice and must `freeLease` each element before freeing the slice.
    pub fn getActiveLeases(self: *Self, limit: usize, offset: usize) ![]Lease {
        var client = try self.connect();
        defer client.close();

        const now_ts = microsToSeconds(time_util.nowMicros());

        var ts_buf: [32]u8 = undefined;
        const ts_str = std.fmt.bufPrint(&ts_buf, "{d:.6}", .{now_ts}) catch "0";

        var offset_buf: [20]u8 = undefined;
        const offset_str = std.fmt.bufPrint(&offset_buf, "{d}", .{offset}) catch "0";

        var limit_buf: [20]u8 = undefined;
        const limit_str = std.fmt.bufPrint(&limit_buf, "{d}", .{limit}) catch "100";

        const result = client.sendCommand(&.{
            "ZRANGEBYSCORE",
            expirations_key,
            ts_str,
            "+inf",
            "LIMIT",
            offset_str,
            limit_str,
        }) catch return &[_]Lease{};

        const arr = result.asArray() orelse return &[_]Lease{};
        if (arr.len == 0) return &[_]Lease{};

        var results = std.ArrayListUnmanaged(Lease){};
        errdefer {
            for (results.items) |lease| self.freeLease(lease);
            results.deinit(self.alloc);
        }

        // Each lease is fetched through getById (its own connection), so the
        // reply array above is never disturbed by further commands on `client`.
        for (arr) |item| {
            const lease_id = item.asString() orelse continue;
            const lease = (try self.getById(lease_id)) orelse continue;
            results.append(self.alloc, lease) catch |err| {
                self.freeLease(lease);
                return err;
            };
        }

        return results.toOwnedSlice(self.alloc);
    }

    /// Delete a lease if it is still expired and return it.
    ///
    /// Returns null when the lease is missing or its stored expiration is not
    /// in the past (e.g. it was renewed). The check and the delete are
    /// separate Redis round-trips, so this is NOT atomic: a renewal landing
    /// between them is lost. The returned lease must be released with
    /// `freeLease`.
    pub fn deleteExpiredLease(self: *Self, lease_id: []const u8) !?Lease {
        const now_us = time_util.nowMicros();

        const lease = (try self.getById(lease_id)) orelse return null;
        if (lease.expiration_us >= now_us) {
            // Lease was renewed, free and return null
            self.freeLease(lease);
            return null;
        }

        // Don't leak the fetched lease if the revoke fails.
        errdefer self.freeLease(lease);
        _ = try self.revokeLease(lease_id);
        return lease;
    }

    /// Release every allocation owned by a lease returned from this storage.
    pub fn freeLease(self: *Self, lease: Lease) void {
        self.alloc.free(lease.id);
        for (lease.resource_ids) |rid| {
            self.alloc.free(rid);
        }
        self.alloc.free(lease.resource_ids);
        if (lease.holder_type) |ht| self.alloc.free(ht);
        if (lease.holder_id) |hi| self.alloc.free(hi);
    }

    /// Calculate occupancy seconds for a lease: the time from creation until
    /// now or the expiration, whichever is earlier, never negative.
    pub fn calculateOccupancy(lease: Lease) f64 {
        const now_us = time_util.nowMicros();
        const end_us = @min(now_us, lease.expiration_us);
        // Saturating subtraction: extreme stored timestamps must not trap.
        const duration_us: f64 = @floatFromInt(@max(0, end_us -| lease.created_at_us));
        return duration_us / 1_000_000.0;
    }
};