//! Redis client connection and command execution
//!
//! This module provides the core `Client` type which manages:
//! - TCP connection to Redis
//! - Buffer management for reads/writes
//! - RESP protocol serialization/deserialization
//! - Authentication and database selection
//!
//! ## Memory Model
//!
//! The client owns a read buffer. Slices returned from commands reference
//! this buffer and are valid until the next command. This avoids allocation
//! for most use cases.
//!
//! Array replies additionally allocate their element list with the client's
//! allocator; release them with `Client.freeValue`.
//!
//! For data that must outlive the next command, copy it:
//! ```zig
//! const ephemeral = try client.get("key");
//! const owned = try allocator.dupe(u8, ephemeral orelse return);
//! defer allocator.free(owned);
//! ```
//!
//! ## Thread Safety
//!
//! A single `Client` instance is NOT thread-safe. For concurrent access,
//! use one of:
//! - One client per thread
//! - External synchronization (mutex)
//! - A connection pool (not provided here)

const std = @import("std");
const net = std.net;
const Allocator = std.mem.Allocator;
const resp = @import("resp.zig");

pub const Value = resp.Value;
pub const ConnectionError = resp.ConnectionError;
pub const ProtocolError = resp.ProtocolError;
pub const CommandError = resp.CommandError;
pub const ClientError = resp.ClientError;

/// Redis client with connection and buffer management.
///
/// The client is designed for simplicity and efficiency:
/// - Single TCP connection (no pipelining yet)
/// - Owned read buffer (grows as needed)
/// - Stack-allocated command buffer for small commands
pub const Client = struct {
    const Self = @This();

    stream: net.Stream,
    allocator: Allocator,
    /// Backing storage for the response currently being parsed.
    read_buf: []u8,
    /// Number of valid bytes at the start of `read_buf`.
    buf_len: usize = 0,
    /// Parse cursor into `read_buf`; never exceeds `buf_len`.
    buf_pos: usize = 0,
    /// Inline buffer for small commands (16KB covers most cases)
    cmd_buf: [16384]u8 = undefined,
    /// Heap buffer for large commands (allocated on demand)
    heap_cmd_buf: ?[]u8 = null,
    /// Read buffers that were outgrown while parsing the current response.
    /// They stay allocated so slices already handed out for that response
    /// remain valid; they are freed once the next command has been written.
    retired_bufs: [][]u8 = &[_][]u8{},

    const DEFAULT_READ_BUF_SIZE = 65536;
    /// Lower bound used when growing the read buffer, so that a zero-sized
    /// initial buffer can still make progress.
    const MIN_GROWN_READ_BUF_SIZE: usize = 4096;

    // ========================================================================
    // Configuration
    // ========================================================================

    /// Connection configuration
    pub const Config = struct {
        host: []const u8 = "localhost",
        port: u16 = 6379,
        username: []const u8 = "default",
        /// Empty password means no AUTH command is sent.
        password: []const u8 = "",
        /// Database index; 0 means no SELECT command is sent.
        db: u8 = 0,
        /// Read buffer size (default 64KB)
        read_buffer_size: usize = DEFAULT_READ_BUF_SIZE,
    };

    // ========================================================================
    // Connection
    // ========================================================================

    /// Connect to Redis with minimal configuration.
    /// Use `connectWithConfig` for full control.
    pub fn connect(allocator: Allocator, host: []const u8, port: u16) ClientError!Self {
        return connectWithConfig(allocator, .{ .host = host, .port = port });
    }

    /// Connect to Redis with full configuration.
    ///
    /// Resolves `config.host` (numeric IP first, then DNS, using the first
    /// resolved address), opens the TCP connection, allocates the read
    /// buffer, then sends AUTH when a password is set and SELECT when
    /// `config.db` is non-zero.
    ///
    /// On any failure the socket and every buffer allocated so far are
    /// released before the error is returned. On success the caller owns the
    /// client and must call `close`.
    pub fn connectWithConfig(allocator: Allocator, config: Config) ClientError!Self {
        // Resolve address: try numeric IP first, then DNS
        const address = net.Address.parseIp(config.host, config.port) catch blk: {
            const list = net.getAddressList(allocator, config.host, config.port) catch {
                return ConnectionError.AddressResolutionFailed;
            };
            defer list.deinit();
            if (list.addrs.len == 0) return ConnectionError.AddressResolutionFailed;
            break :blk list.addrs[0];
        };

        const tcp_stream = net.tcpConnectToAddress(address) catch {
            return ConnectionError.ConnectionRefused;
        };
        errdefer tcp_stream.close();

        const read_buf = try allocator.alloc(u8, config.read_buffer_size);

        var self = Self{
            .stream = tcp_stream,
            .allocator = allocator,
            .read_buf = read_buf,
        };
        // Release through the client itself: the handshake below may grow the
        // read buffer or allocate a heap command buffer.
        errdefer self.releaseBuffers();

        // Authenticate if password provided
        if (config.password.len > 0) {
            const result = try self.sendCommand(&.{ "AUTH", config.username, config.password });
            if (result.isError()) return ConnectionError.AuthenticationFailed;
        }

        // Select database if non-default
        if (config.db != 0) {
            // A u8 prints as at most three decimal digits, so this cannot overflow.
            var db_buf: [4]u8 = undefined;
            const db_str = std.fmt.bufPrint(&db_buf, "{d}", .{config.db}) catch unreachable;
            const result = try self.sendCommand(&.{ "SELECT", db_str });
            if (result.isError()) return ConnectionError.InvalidDatabase;
        }

        return self;
    }

    /// Close the connection and free resources.
    /// The client must not be used afterwards.
    pub fn close(self: *Self) void {
        self.releaseBuffers();
        self.stream.close();
    }

    // ========================================================================
    // Command Execution (Core)
    // ========================================================================

    /// Send a command and receive the response.
    /// This is the foundation for all other commands.
    ///
    /// Slices inside the returned `Value` point into the client's read
    /// buffer and are invalidated by the next call. Array values must be
    /// released with `freeValue`.
    ///
    /// Arguments must be string slices. For integer arguments,
    /// format them to a buffer first:
    /// ```zig
    /// var buf: [16]u8 = undefined;
    /// const n_str = std.fmt.bufPrint(&buf, "{d}", .{n}) catch unreachable;
    /// _ = try client.sendCommand(&.{ "EXPIRE", key, n_str });
    /// ```
    pub fn sendCommand(self: *Self, args: []const []const u8) ClientError!Value {
        // Calculate required buffer size
        var total_size: usize = 16; // header overhead
        for (args) |arg| {
            total_size += 16 + arg.len; // $<len>\r\n<data>\r\n
        }

        // Get command buffer (inline or heap)
        const cmd_buf = try self.getCommandBuffer(total_size);

        // Build RESP array
        var pos: usize = 0;

        // Array header: *<count>\r\n
        pos += (std.fmt.bufPrint(cmd_buf[pos..], "*{d}\r\n", .{args.len}) catch {
            return ProtocolError.BufferOverflow;
        }).len;

        // Each argument as bulk string: $<len>\r\n<data>\r\n
        for (args) |arg| {
            pos += (std.fmt.bufPrint(cmd_buf[pos..], "${d}\r\n", .{arg.len}) catch {
                return ProtocolError.BufferOverflow;
            }).len;
            @memcpy(cmd_buf[pos..][0..arg.len], arg);
            pos += arg.len;
            cmd_buf[pos] = '\r';
            cmd_buf[pos + 1] = '\n';
            pos += 2;
        }

        // A single write() may accept only part of the buffer; writeAll loops
        // until the whole command has been handed to the socket.
        try self.stream.writeAll(cmd_buf[0..pos]);

        // Only now drop state belonging to the previous response: `args` may
        // legitimately reference slices from it, and they have been copied
        // into the command buffer by this point.
        self.releaseRetiredBuffers();
        self.buf_len = 0;
        self.buf_pos = 0;

        // Read and parse response
        return self.readResponse();
    }

    /// Send a command whose arguments are a comptime-known `prefix` followed
    /// by a runtime `suffix`.
    /// Useful when the argument count is dynamic.
    ///
    /// Allocates a temporary argument list with the client's allocator and
    /// frees it before returning.
    pub fn sendCommandDynamic(self: *Self, comptime prefix: []const []const u8, suffix: []const []const u8) ClientError!Value {
        const total_len = prefix.len + suffix.len;
        const args = try self.allocator.alloc([]const u8, total_len);
        defer self.allocator.free(args);

        inline for (prefix, 0..) |p, i| {
            args[i] = p;
        }
        @memcpy(args[prefix.len..], suffix);

        return self.sendCommand(args);
    }

    // ========================================================================
    // Buffer Management
    // ========================================================================

    /// Return a buffer of at least `required_size` bytes for serializing a
    /// command: the inline `cmd_buf` when it is large enough, otherwise a
    /// heap buffer that is cached on the client and reused by later calls.
    fn getCommandBuffer(self: *Self, required_size: usize) ClientError![]u8 {
        if (required_size <= self.cmd_buf.len) {
            return &self.cmd_buf;
        }

        // Need heap allocation for large command
        if (self.heap_cmd_buf) |buf| {
            if (buf.len >= required_size) return buf;
            self.allocator.free(buf);
            // Clear before allocating so a failed alloc cannot leave a
            // dangling pointer behind for `close` or the next command.
            self.heap_cmd_buf = null;
        }

        const buf = try self.allocator.alloc(u8, required_size);
        self.heap_cmd_buf = buf;
        return buf;
    }

    /// Free every buffer owned by the client (heap command buffer, retired
    /// read buffers, current read buffer). Does not touch the socket.
    fn releaseBuffers(self: *Self) void {
        if (self.heap_cmd_buf) |buf| {
            self.allocator.free(buf);
            self.heap_cmd_buf = null;
        }
        self.releaseRetiredBuffers();
        self.allocator.free(self.read_buf);
    }

    /// Free read buffers that were replaced during the previous response.
    fn releaseRetiredBuffers(self: *Self) void {
        for (self.retired_bufs) |buf| {
            self.allocator.free(buf);
        }
        self.allocator.free(self.retired_bufs);
        self.retired_bufs = &[_][]u8{};
    }

    /// Replace `read_buf` with a larger buffer holding the same bytes.
    ///
    /// The old buffer is retired rather than freed or moved in place:
    /// elements of a partially parsed array already hold slices into it.
    fn growReadBuffer(self: *Self) ClientError!void {
        const new_len: usize = @max(self.read_buf.len * 2, MIN_GROWN_READ_BUF_SIZE);
        const new_buf = try self.allocator.alloc(u8, new_len);
        errdefer self.allocator.free(new_buf);

        const retired = try self.allocator.realloc(self.retired_bufs, self.retired_bufs.len + 1);
        retired[retired.len - 1] = self.read_buf;
        self.retired_bufs = retired;

        @memcpy(new_buf[0..self.buf_len], self.read_buf[0..self.buf_len]);
        self.read_buf = new_buf;
    }

    /// Read one complete RESP value from the connection.
    fn readResponse(self: *Self) ClientError!Value {
        // Ensure we have at least one byte
        if (self.buf_len == 0) {
            try self.readMore();
        }
        return self.parseValue();
    }

    /// Parse the value starting at `buf_pos`, reading more data as needed.
    /// Dispatches on the RESP type byte; any other byte is `InvalidResponse`.
    fn parseValue(self: *Self) ClientError!Value {
        // Ensure we have data
        while (self.buf_pos >= self.buf_len) {
            try self.readMore();
        }

        const type_char = self.read_buf[self.buf_pos];
        self.buf_pos += 1;

        return switch (type_char) {
            '+' => .{ .string = try self.readLine() },
            '-' => .{ .err = try self.readLine() },
            ':' => .{ .integer = try self.readInt() },
            '$' => try self.readBulk(),
            '*' => try self.readArray(),
            else => ProtocolError.InvalidResponse,
        };
    }

    /// Return the bytes from `buf_pos` up to (not including) the next CRLF
    /// and advance past the CRLF.
    fn readLine(self: *Self) ClientError![]const u8 {
        // Keep an index, not a slice: the buffer may be replaced by readMore.
        const start = self.buf_pos;

        while (true) {
            while (self.buf_pos + 1 < self.buf_len) {
                if (self.read_buf[self.buf_pos] == '\r' and self.read_buf[self.buf_pos + 1] == '\n') {
                    const line = self.read_buf[start..self.buf_pos];
                    self.buf_pos += 2;
                    return line;
                }
                self.buf_pos += 1;
            }
            try self.readMore();
        }
    }

    /// Read a CRLF-terminated line and parse it as a base-10 `i64`.
    fn readInt(self: *Self) ClientError!i64 {
        const line = try self.readLine();
        return std.fmt.parseInt(i64, line, 10) catch ProtocolError.InvalidResponse;
    }

    /// Read a bulk string body (`<len>\r\n<data>\r\n`, after the `$`).
    /// A negative length yields `.nil`.
    fn readBulk(self: *Self) ClientError!Value {
        const len = try self.readInt();
        if (len < 0) return .nil;

        const ulen = std.math.cast(usize, len) orelse return ProtocolError.InvalidResponse;
        const frame_len = std.math.add(usize, ulen, 2) catch return ProtocolError.InvalidResponse;

        // Ensure we have all the data plus the trailing CRLF.
        // `buf_pos <= buf_len` always holds, so the subtraction cannot wrap.
        while (self.buf_len - self.buf_pos < frame_len) {
            try self.readMore();
        }

        const data = self.read_buf[self.buf_pos..][0..ulen];
        const terminator = self.read_buf[self.buf_pos + ulen ..][0..2];
        if (terminator[0] != '\r' or terminator[1] != '\n') return ProtocolError.InvalidResponse;

        self.buf_pos += frame_len;
        return .{ .bulk = data };
    }

    /// Read an array body (`<count>\r\n` followed by `count` values, after
    /// the `*`). A negative count yields `.nil`.
    ///
    /// The element list is allocated with the client's allocator; the caller
    /// releases it with `freeValue`.
    fn readArray(self: *Self) ClientError!Value {
        const len = try self.readInt();
        if (len < 0) return .nil;
        if (len == 0) return .{ .array = &.{} };

        const ulen = std.math.cast(usize, len) orelse return ProtocolError.InvalidResponse;
        const values = try self.allocator.alloc(Value, ulen);

        var parsed: usize = 0;
        errdefer {
            // Elements parsed so far may themselves own nested arrays.
            for (values[0..parsed]) |v| {
                self.freeValue(v);
            }
            self.allocator.free(values);
        }

        while (parsed < ulen) : (parsed += 1) {
            values[parsed] = try self.parseValue();
        }

        return .{ .array = values };
    }

    /// Append more bytes from the socket to `read_buf`, growing it first if
    /// it is full. Returns `ConnectionClosed` when the peer has closed.
    fn readMore(self: *Self) ClientError!void {
        // Grow buffer if needed
        if (self.buf_len >= self.read_buf.len) {
            try self.growReadBuffer();
        }

        const n = try self.stream.read(self.read_buf[self.buf_len..]);
        if (n == 0) return ProtocolError.ConnectionClosed;
        self.buf_len += n;
    }

    // ========================================================================
    // Connection Commands
    // ========================================================================

    /// Send PING, returns true if server responds with PONG.
    pub fn ping(self: *Self) ClientError!bool {
        const result = try self.sendCommand(&.{"PING"});
        return switch (result) {
            .string => |s| std.mem.eql(u8, s, "PONG"),
            else => false,
        };
    }

    /// Send ECHO, returns the message back.
    pub fn echo(self: *Self, message: []const u8) ClientError!?[]const u8 {
        const result = try self.sendCommand(&.{ "ECHO", message });
        return result.asString();
    }

    /// Get server info, optionally restricted to one `section`.
    pub fn info(self: *Self, section: ?[]const u8) ClientError!?[]const u8 {
        const result = if (section) |s|
            try self.sendCommand(&.{ "INFO", s })
        else
            try self.sendCommand(&.{"INFO"});
        return result.asString();
    }

    /// Get number of keys in current database.
    /// Returns 0 when the reply is not an integer.
    pub fn dbsize(self: *Self) ClientError!i64 {
        const result = try self.sendCommand(&.{"DBSIZE"});
        return result.asInt() orelse 0;
    }

    /// Delete all keys in current database (use with caution!).
    pub fn flushDb(self: *Self) ClientError!void {
        const result = try self.sendCommand(&.{"FLUSHDB"});
        if (result.isError()) return CommandError.RedisError;
    }

    // ========================================================================
    // Command Module Accessors
    // ========================================================================

    const strings_mod = @import("commands/strings.zig");
    const keys_mod = @import("commands/keys.zig");
    const hashes_mod = @import("commands/hashes.zig");
    const lists_mod = @import("commands/lists.zig");
    const sets_mod = @import("commands/sets.zig");
    const sorted_sets_mod = @import("commands/sorted_sets.zig");
    const streams_mod = @import("commands/streams.zig");

    /// Get string commands interface
    pub fn strings(self: *Self) strings_mod.StringCommands {
        return strings_mod.StringCommands.init(self);
    }

    /// Get key commands interface
    pub fn keys(self: *Self) keys_mod.KeyCommands {
        return keys_mod.KeyCommands.init(self);
    }

    /// Get hash commands interface
    pub fn hashes(self: *Self) hashes_mod.HashCommands {
        return hashes_mod.HashCommands.init(self);
    }

    /// Get list commands interface
    pub fn lists(self: *Self) lists_mod.ListCommands {
        return lists_mod.ListCommands.init(self);
    }

    /// Get set commands interface
    pub fn sets(self: *Self) sets_mod.SetCommands {
        return sets_mod.SetCommands.init(self);
    }

    /// Get sorted set commands interface
    pub fn sortedSets(self: *Self) sorted_sets_mod.SortedSetCommands {
        return sorted_sets_mod.SortedSetCommands.init(self);
    }

    /// Get stream commands interface
    pub fn streams(self: *Self) streams_mod.StreamCommands {
        return streams_mod.StreamCommands.init(self);
    }

    // ========================================================================
    // Value Management
    // ========================================================================

    /// Free a Value that contains allocated memory (arrays).
    /// Call this when done with array values to prevent leaks.
    /// Nested arrays are freed recursively; other variants are a no-op.
    pub fn freeValue(self: *Self, value: Value) void {
        switch (value) {
            .array => |arr| {
                for (arr) |v| {
                    self.freeValue(v);
                }
                self.allocator.free(arr);
            },
            else => {},
        }
    }
};

// ============================================================================
// Tests
// ============================================================================

test "Client struct size is reasonable" {
    // Ensure the client doesn't grow unexpectedly
    try std.testing.expect(@sizeOf(Client) < 20000);
}