this repo has no description
at main 437 lines 15 kB view raw
//! Redis client connection and command execution
//!
//! This module provides the core `Client` type which manages:
//! - TCP connection to Redis
//! - Buffer management for reads/writes
//! - RESP protocol serialization/deserialization
//! - Authentication and database selection
//!
//! ## Memory Model
//!
//! The client owns a read buffer. Slices returned from commands reference
//! this buffer and are valid until the next command. This avoids allocation
//! for most use cases.
//!
//! Array replies additionally own a heap-allocated element slice; release it
//! with `Client.freeValue` when done.
//!
//! For data that must outlive the next command, copy it:
//! ```zig
//! const ephemeral = try client.get("key");
//! const owned = try allocator.dupe(u8, ephemeral orelse return);
//! defer allocator.free(owned);
//! ```
//!
//! ## Thread Safety
//!
//! A single `Client` instance is NOT thread-safe. For concurrent access:
//! - Create one client per thread
//! - Use external synchronization (mutex)
//! - Use a connection pool (not provided here)

const std = @import("std");
const net = std.net;
const Allocator = std.mem.Allocator;

const resp = @import("resp.zig");
pub const Value = resp.Value;
pub const ConnectionError = resp.ConnectionError;
pub const ProtocolError = resp.ProtocolError;
pub const CommandError = resp.CommandError;
pub const ClientError = resp.ClientError;

/// Redis client with connection and buffer management.
///
/// The client is designed for simplicity and efficiency:
/// - Single TCP connection (no pipelining yet)
/// - Owned read buffer (grows as needed)
/// - Inline command buffer for small commands, heap buffer for large ones
pub const Client = struct {
    const Self = @This();

    stream: net.Stream,
    allocator: Allocator,
    /// Receive buffer; `read_buf[0..buf_len]` holds the bytes of the current reply.
    read_buf: []u8,
    /// Number of valid bytes in `read_buf`.
    buf_len: usize = 0,
    /// Parse cursor into `read_buf`.
    buf_pos: usize = 0,

    /// Inline buffer for small commands (16KB covers most cases)
    cmd_buf: [16384]u8 = undefined,

    /// Heap buffer for large commands (allocated on demand)
    heap_cmd_buf: ?[]u8 = null,

    const DEFAULT_READ_BUF_SIZE = 65536;

    /// Smallest size the read buffer grows to. Doubling a zero-length buffer
    /// would otherwise stay at zero and make every read look like EOF.
    const MIN_READ_BUF_GROWTH = 64;

    // ========================================================================
    // Configuration
    // ========================================================================

    /// Connection configuration
    pub const Config = struct {
        host: []const u8 = "localhost",
        port: u16 = 6379,
        username: []const u8 = "default",
        password: []const u8 = "",
        db: u8 = 0,
        /// Read buffer size (default 64KB)
        read_buffer_size: usize = DEFAULT_READ_BUF_SIZE,
    };

    // ========================================================================
    // Connection
    // ========================================================================

    /// Connect to Redis with minimal configuration.
    /// Use `connectWithConfig` for full control.
    pub fn connect(allocator: Allocator, host: []const u8, port: u16) ClientError!Self {
        return connectWithConfig(allocator, .{ .host = host, .port = port });
    }

    /// Connect to Redis with full configuration.
    ///
    /// Sends AUTH when `config.password` is non-empty and SELECT when
    /// `config.db` is non-zero. On any failure the socket and every buffer
    /// acquired so far are released before the error is returned.
    pub fn connectWithConfig(allocator: Allocator, config: Config) ClientError!Self {
        // Resolve address: try numeric IP first, then DNS
        const address = net.Address.parseIp(config.host, config.port) catch blk: {
            const list = net.getAddressList(allocator, config.host, config.port) catch {
                return ConnectionError.AddressResolutionFailed;
            };
            defer list.deinit();
            if (list.addrs.len == 0) return ConnectionError.AddressResolutionFailed;
            break :blk list.addrs[0];
        };

        const tcp_stream = net.tcpConnectToAddress(address) catch {
            return ConnectionError.ConnectionRefused;
        };

        const read_buf = allocator.alloc(u8, config.read_buffer_size) catch |err| {
            tcp_stream.close();
            return err;
        };

        var self = Self{
            .stream = tcp_stream,
            .allocator = allocator,
            .read_buf = read_buf,
        };
        // From here on `self` owns the socket and the buffers. The handshake
        // below may reallocate `read_buf` or allocate `heap_cmd_buf`, so
        // cleanup has to go through the struct, not through the locals above.
        errdefer self.close();

        // Authenticate if password provided
        if (config.password.len > 0) {
            const result = try self.sendCommand(&.{ "AUTH", config.username, config.password });
            if (result.isError()) return ConnectionError.AuthenticationFailed;
        }

        // Select database if non-default
        if (config.db != 0) {
            var db_buf: [4]u8 = undefined; // a u8 prints as at most 3 digits
            const db_str = std.fmt.bufPrint(&db_buf, "{d}", .{config.db}) catch unreachable;
            const result = try self.sendCommand(&.{ "SELECT", db_str });
            if (result.isError()) return ConnectionError.InvalidDatabase;
        }

        return self;
    }

    /// Close the connection and free resources.
    /// The client must not be used after this call.
    pub fn close(self: *Self) void {
        if (self.heap_cmd_buf) |buf| {
            self.allocator.free(buf);
        }
        self.allocator.free(self.read_buf);
        self.stream.close();
        self.* = undefined;
    }

    // ========================================================================
    // Command Execution (Core)
    // ========================================================================

    /// Send a command and receive the response.
    /// This is the foundation for all other commands.
    ///
    /// Arguments must be string slices. For integer arguments,
    /// format them to a buffer first:
    /// ```zig
    /// var buf: [16]u8 = undefined;
    /// const n_str = std.fmt.bufPrint(&buf, "{d}", .{n}) catch unreachable;
    /// try client.sendCommand(&.{"EXPIRE", key, n_str});
    /// ```
    ///
    /// Slices inside the returned `Value` point into the client's read buffer
    /// and are invalidated by the next command. Array values must be released
    /// with `freeValue`.
    pub fn sendCommand(self: *Self, args: []const []const u8) ClientError!Value {
        // Calculate required buffer size
        var total_size: usize = 16; // header overhead
        for (args) |arg| {
            total_size += 16 + arg.len; // $<len>\r\n<data>\r\n
        }

        // Get command buffer (inline or heap)
        const cmd_buf = try self.getCommandBuffer(total_size);

        // Build RESP array
        var pos: usize = 0;

        // Array header: *<count>\r\n
        pos += (std.fmt.bufPrint(cmd_buf[pos..], "*{d}\r\n", .{args.len}) catch {
            return ProtocolError.BufferOverflow;
        }).len;

        // Each argument as bulk string: $<len>\r\n<data>\r\n
        for (args) |arg| {
            pos += (std.fmt.bufPrint(cmd_buf[pos..], "${d}\r\n", .{arg.len}) catch {
                return ProtocolError.BufferOverflow;
            }).len;

            @memcpy(cmd_buf[pos..][0..arg.len], arg);
            pos += arg.len;
            cmd_buf[pos] = '\r';
            cmd_buf[pos + 1] = '\n';
            pos += 2;
        }

        // Send the whole command. A plain `write` may accept only part of the
        // buffer, which would leave a truncated command on the wire.
        try self.stream.writeAll(cmd_buf[0..pos]);

        // Reset buffer state for new response
        self.buf_len = 0;
        self.buf_pos = 0;

        // Read and parse response
        return self.readResponse();
    }

    /// Send a command whose arguments are a comptime-known `prefix` followed
    /// by a runtime `suffix`. Useful when the argument count is dynamic.
    pub fn sendCommandDynamic(self: *Self, comptime prefix: []const []const u8, suffix: []const []const u8) ClientError!Value {
        const args = try self.allocator.alloc([]const u8, prefix.len + suffix.len);
        defer self.allocator.free(args);

        @memcpy(args[0..prefix.len], prefix);
        @memcpy(args[prefix.len..], suffix);

        return self.sendCommand(args);
    }

    // ========================================================================
    // Buffer Management
    // ========================================================================

    /// Return a buffer of at least `required_size` bytes for serializing a
    /// command: the inline buffer when it is large enough, otherwise a heap
    /// buffer that is kept for reuse by later commands.
    fn getCommandBuffer(self: *Self, required_size: usize) ClientError![]u8 {
        if (required_size <= self.cmd_buf.len) {
            return &self.cmd_buf;
        }

        // Need heap allocation for large command
        if (self.heap_cmd_buf) |buf| {
            if (buf.len >= required_size) return buf;
            self.allocator.free(buf);
            // Clear the field before allocating again so a failed allocation
            // does not leave a dangling pointer for `close` to free twice.
            self.heap_cmd_buf = null;
        }

        const fresh = try self.allocator.alloc(u8, required_size);
        self.heap_cmd_buf = fresh;
        return fresh;
    }

    /// Read one complete reply from the socket and parse it.
    ///
    /// The whole reply is buffered before parsing starts. Parsed values hold
    /// slices into `read_buf`, so the buffer must not be reallocated while
    /// values are being built.
    fn readResponse(self: *Self) ClientError!Value {
        // The scan restarts from `buf_pos` after every read; only the bytes
        // of the current reply are examined.
        while ((try self.scanValue(self.buf_pos)) == null) {
            try self.readMore();
        }
        return self.parseValue();
    }

    /// Locate the end of the RESP value that begins at `start` without
    /// building it. Returns the index one past the value, or null when the
    /// buffered bytes end before the value is complete.
    fn scanValue(self: *const Self, start: usize) ClientError!?usize {
        const buffered = self.read_buf[0..self.buf_len];
        if (start >= buffered.len) return null;

        const type_char = buffered[start];
        switch (type_char) {
            '+', '-', ':', '$', '*' => {},
            else => return ProtocolError.InvalidResponse,
        }

        const header_end = std.mem.indexOfPos(u8, buffered, start + 1, "\r\n") orelse return null;
        const header = buffered[start + 1 .. header_end];
        const body_start = header_end + 2;

        switch (type_char) {
            '$' => {
                const len = std.fmt.parseInt(i64, header, 10) catch return ProtocolError.InvalidResponse;
                if (len < 0) return body_start; // nil bulk has no body
                const ulen = std.math.cast(usize, len) orelse return ProtocolError.InvalidResponse;
                // Need `ulen` payload bytes plus the trailing CRLF.
                const available = buffered.len - body_start;
                if (ulen > available or available - ulen < 2) return null;
                return body_start + ulen + 2;
            },
            '*' => {
                const len = std.fmt.parseInt(i64, header, 10) catch return ProtocolError.InvalidResponse;
                var pos = body_start;
                var remaining = len; // negative (nil) and zero both mean no elements
                while (remaining > 0) : (remaining -= 1) {
                    pos = (try self.scanValue(pos)) orelse return null;
                }
                return pos;
            },
            // '+', '-' and ':' consist of the header line only.
            else => return body_start,
        }
    }

    /// Parse the value at `buf_pos`. Performs no I/O: `readResponse` has
    /// already buffered the complete reply.
    fn parseValue(self: *Self) ClientError!Value {
        if (self.buf_pos >= self.buf_len) return ProtocolError.InvalidResponse;

        const type_char = self.read_buf[self.buf_pos];
        self.buf_pos += 1;

        return switch (type_char) {
            '+' => .{ .string = try self.readLine() },
            '-' => .{ .err = try self.readLine() },
            ':' => .{ .integer = try self.readInt() },
            '$' => try self.readBulk(),
            '*' => try self.readArray(),
            else => ProtocolError.InvalidResponse,
        };
    }

    /// Return the bytes from `buf_pos` up to the next CRLF and advance the
    /// cursor past the CRLF.
    fn readLine(self: *Self) ClientError![]const u8 {
        const buffered = self.read_buf[0..self.buf_len];
        const end = std.mem.indexOfPos(u8, buffered, self.buf_pos, "\r\n") orelse
            return ProtocolError.InvalidResponse;
        const line = buffered[self.buf_pos..end];
        self.buf_pos = end + 2;
        return line;
    }

    /// Read a CRLF-terminated line and parse it as a base-10 `i64`.
    fn readInt(self: *Self) ClientError!i64 {
        const line = try self.readLine();
        return std.fmt.parseInt(i64, line, 10) catch ProtocolError.InvalidResponse;
    }

    /// Parse a bulk string body (the `$` has been consumed).
    /// A negative length yields `.nil`.
    fn readBulk(self: *Self) ClientError!Value {
        const len = try self.readInt();
        if (len < 0) return .nil;

        const ulen = std.math.cast(usize, len) orelse return ProtocolError.InvalidResponse;

        // Payload plus trailing CRLF must already be buffered.
        const available = self.buf_len - self.buf_pos;
        if (ulen > available or available - ulen < 2) return ProtocolError.InvalidResponse;

        const data = self.read_buf[self.buf_pos..][0..ulen];
        self.buf_pos += ulen + 2;
        return .{ .bulk = data };
    }

    /// Parse an array body (the `*` has been consumed).
    /// A negative length yields `.nil`. The element slice is allocated with
    /// the client's allocator; release it with `freeValue`.
    fn readArray(self: *Self) ClientError!Value {
        const len = try self.readInt();
        if (len < 0) return .nil;
        if (len == 0) return .{ .array = &.{} };

        const count = std.math.cast(usize, len) orelse return ProtocolError.InvalidResponse;
        const values = try self.allocator.alloc(Value, count);
        var parsed: usize = 0;
        errdefer {
            // Elements parsed so far may themselves own nested arrays.
            for (values[0..parsed]) |v| self.freeValue(v);
            self.allocator.free(values);
        }

        while (parsed < count) : (parsed += 1) {
            values[parsed] = try self.parseValue();
        }

        return .{ .array = values };
    }

    /// Append more bytes from the socket to `read_buf`, growing it when full.
    /// Only called while no parsed value references the buffer.
    fn readMore(self: *Self) ClientError!void {
        // Grow buffer if needed
        if (self.buf_len >= self.read_buf.len) {
            const grown_len = @max(self.read_buf.len * 2, MIN_READ_BUF_GROWTH);
            self.read_buf = try self.allocator.realloc(self.read_buf, grown_len);
        }

        const n = try self.stream.read(self.read_buf[self.buf_len..]);
        if (n == 0) return ProtocolError.ConnectionClosed;
        self.buf_len += n;
    }

    // ========================================================================
    // Connection Commands
    // ========================================================================

    /// Send PING, returns true if server responds with PONG.
    pub fn ping(self: *Self) ClientError!bool {
        const result = try self.sendCommand(&.{"PING"});
        return switch (result) {
            .string => |s| std.mem.eql(u8, s, "PONG"),
            else => false,
        };
    }

    /// Send ECHO, returns the message back.
    /// The returned slice is only valid until the next command.
    pub fn echo(self: *Self, message: []const u8) ClientError!?[]const u8 {
        const result = try self.sendCommand(&.{ "ECHO", message });
        return result.asString();
    }

    /// Get server info, optionally restricted to `section`.
    /// The returned slice is only valid until the next command.
    pub fn info(self: *Self, section: ?[]const u8) ClientError!?[]const u8 {
        const result = if (section) |s|
            try self.sendCommand(&.{ "INFO", s })
        else
            try self.sendCommand(&.{"INFO"});
        return result.asString();
    }

    /// Get number of keys in current database.
    /// Falls back to 0 when the reply's `asInt()` yields null.
    pub fn dbsize(self: *Self) ClientError!i64 {
        const result = try self.sendCommand(&.{"DBSIZE"});
        return result.asInt() orelse 0;
    }

    /// Delete all keys in current database (use with caution!).
    pub fn flushDb(self: *Self) ClientError!void {
        const result = try self.sendCommand(&.{"FLUSHDB"});
        if (result.isError()) return CommandError.RedisError;
    }

    // ========================================================================
    // Command Module Accessors
    // ========================================================================

    const strings_mod = @import("commands/strings.zig");
    const keys_mod = @import("commands/keys.zig");
    const hashes_mod = @import("commands/hashes.zig");
    const lists_mod = @import("commands/lists.zig");
    const sets_mod = @import("commands/sets.zig");
    const sorted_sets_mod = @import("commands/sorted_sets.zig");
    const streams_mod = @import("commands/streams.zig");

    /// Get string commands interface
    pub fn strings(self: *Self) strings_mod.StringCommands {
        return strings_mod.StringCommands.init(self);
    }

    /// Get key commands interface
    pub fn keys(self: *Self) keys_mod.KeyCommands {
        return keys_mod.KeyCommands.init(self);
    }

    /// Get hash commands interface
    pub fn hashes(self: *Self) hashes_mod.HashCommands {
        return hashes_mod.HashCommands.init(self);
    }

    /// Get list commands interface
    pub fn lists(self: *Self) lists_mod.ListCommands {
        return lists_mod.ListCommands.init(self);
    }

    /// Get set commands interface
    pub fn sets(self: *Self) sets_mod.SetCommands {
        return sets_mod.SetCommands.init(self);
    }

    /// Get sorted set commands interface
    pub fn sortedSets(self: *Self) sorted_sets_mod.SortedSetCommands {
        return sorted_sets_mod.SortedSetCommands.init(self);
    }

    /// Get stream commands interface
    pub fn streams(self: *Self) streams_mod.StreamCommands {
        return streams_mod.StreamCommands.init(self);
    }

    // ========================================================================
    // Value Management
    // ========================================================================

    /// Free a Value that contains allocated memory (arrays).
    /// Call this when done with array values to prevent leaks.
    /// Nested arrays are freed recursively; other variants own nothing.
    pub fn freeValue(self: *Self, value: Value) void {
        switch (value) {
            .array => |arr| {
                for (arr) |v| {
                    self.freeValue(v);
                }
                self.allocator.free(arr);
            },
            else => {},
        }
    }
};

// ============================================================================
// Tests
// ============================================================================

test "Client struct size is reasonable" {
    // Ensure the client doesn't grow unexpectedly
    try std.testing.expect(@sizeOf(Client) < 20000);
}