this repo has no description
1//! Redis client connection and command execution
2//!
3//! This module provides the core `Client` type which manages:
4//! - TCP connection to Redis
5//! - Buffer management for reads/writes
6//! - RESP protocol serialization/deserialization
7//! - Authentication and database selection
8//!
9//! ## Memory Model
10//!
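//! The client owns a read buffer. Slices returned from commands reference
//! this buffer and are valid until the next command. This avoids allocation
//! for most use cases. The exception is array replies: their element list is
//! heap-allocated and must be released with `Client.freeValue`.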
14//!
15//! For data that must outlive the next command, copy it:
16//! ```zig
17//! const ephemeral = try client.get("key");
18//! const owned = try allocator.dupe(u8, ephemeral orelse return);
19//! defer allocator.free(owned);
20//! ```
21//!
22//! ## Thread Safety
23//!
24//! A single `Client` instance is NOT thread-safe. For concurrent access:
25//! - Create one client per thread
26//! - Use external synchronization (mutex)
27//! - Use a connection pool (not provided here)
28
29const std = @import("std");
30const net = std.net;
31const Allocator = std.mem.Allocator;
32
33const resp = @import("resp.zig");
34pub const Value = resp.Value;
35pub const ConnectionError = resp.ConnectionError;
36pub const ProtocolError = resp.ProtocolError;
37pub const CommandError = resp.CommandError;
38pub const ClientError = resp.ClientError;
39
/// Redis client with connection and buffer management.
///
/// The client is designed for simplicity and efficiency:
/// - Single TCP connection (no pipelining yet)
/// - Owned read buffer (grows as needed, never shrinks)
/// - Inline 16KB command buffer for small commands, heap buffer for large ones
///
/// Byte slices inside a returned `Value` point into the read buffer and are
/// only valid until the next command. Array values additionally own a heap
/// allocation that must be released with `freeValue`.
///
/// The struct embeds the 16KB command buffer, so copying a `Client` by value
/// copies that buffer too; keep it in one place and pass pointers.
pub const Client = struct {
    const Self = @This();

    stream: net.Stream,
    allocator: Allocator,
    /// Receive buffer; `read_buf[0..buf_len]` holds the bytes of the current reply.
    read_buf: []u8,
    /// Number of valid bytes in `read_buf`.
    buf_len: usize = 0,
    /// Parse cursor into `read_buf` (always `<= buf_len` between parser steps).
    buf_pos: usize = 0,

    /// Inline buffer for small commands (16KB covers most cases)
    cmd_buf: [16384]u8 = undefined,

    /// Heap buffer for large commands (allocated on demand, reused afterwards)
    heap_cmd_buf: ?[]u8 = null,

    const DEFAULT_READ_BUF_SIZE = 65536;

    // ========================================================================
    // Configuration
    // ========================================================================

    /// Connection configuration
    pub const Config = struct {
        host: []const u8 = "localhost",
        port: u16 = 6379,
        /// User name sent with `AUTH`. Set to "" to send the password-only
        /// form (`AUTH <password>`), which is what servers without ACL
        /// support expect.
        username: []const u8 = "default",
        /// Password; no `AUTH` is sent when this is empty.
        password: []const u8 = "",
        /// Database index; `SELECT` is only sent when non-zero.
        db: u8 = 0,
        /// Read buffer size (default 64KB)
        read_buffer_size: usize = DEFAULT_READ_BUF_SIZE,
    };

    // ========================================================================
    // Connection
    // ========================================================================

    /// Connect to Redis with minimal configuration.
    /// Use `connectWithConfig` for full control.
    pub fn connect(allocator: Allocator, host: []const u8, port: u16) ClientError!Self {
        return connectWithConfig(allocator, .{ .host = host, .port = port });
    }

    /// Connect to Redis with full configuration.
    ///
    /// Opens the TCP connection, allocates the read buffer, then optionally
    /// authenticates and selects a database. On any failure the socket and
    /// every buffer allocated so far are released before the error returns.
    ///
    /// Errors:
    /// - `AddressResolutionFailed` / `ConnectionRefused` from connecting
    /// - `AuthenticationFailed` when the server rejects `AUTH`
    /// - `InvalidDatabase` when the server rejects `SELECT`
    /// - anything `sendCommand` or the allocator can return
    pub fn connectWithConfig(allocator: Allocator, config: Config) ClientError!Self {
        const tcp_stream = try openStream(allocator, config.host, config.port);
        errdefer tcp_stream.close();

        const read_buf = try allocator.alloc(u8, config.read_buffer_size);

        var self = Self{
            .stream = tcp_stream,
            .allocator = allocator,
            .read_buf = read_buf,
        };
        // Clean up through the struct's fields rather than the local
        // `read_buf`: the handshake below may grow (and move) the read buffer
        // or allocate the heap command buffer.
        errdefer {
            if (self.heap_cmd_buf) |buf| allocator.free(buf);
            allocator.free(self.read_buf);
        }

        // Authenticate if password provided
        if (config.password.len > 0) {
            const reply = if (config.username.len > 0)
                try self.sendCommand(&.{ "AUTH", config.username, config.password })
            else
                try self.sendCommand(&.{ "AUTH", config.password });
            if (reply.isError()) return ConnectionError.AuthenticationFailed;
        }

        // Select database if non-default
        if (config.db != 0) {
            // A u8 prints as at most three digits, so this cannot overflow.
            var db_buf: [4]u8 = undefined;
            const db_str = std.fmt.bufPrint(&db_buf, "{d}", .{config.db}) catch unreachable;
            const reply = try self.sendCommand(&.{ "SELECT", db_str });
            if (reply.isError()) return ConnectionError.InvalidDatabase;
        }

        return self;
    }

    /// Open a TCP stream to `host:port`.
    ///
    /// `host` is first tried as a numeric IP literal. Otherwise it is
    /// resolved by name and every returned address is tried in order, so a
    /// name that resolves to several addresses (e.g. both IPv6 and IPv4
    /// loopback) still connects when only one of them is listening.
    fn openStream(allocator: Allocator, host: []const u8, port: u16) ClientError!net.Stream {
        const literal = net.Address.parseIp(host, port) catch {
            const list = net.getAddressList(allocator, host, port) catch {
                return ConnectionError.AddressResolutionFailed;
            };
            defer list.deinit();
            if (list.addrs.len == 0) return ConnectionError.AddressResolutionFailed;

            for (list.addrs) |candidate| {
                return net.tcpConnectToAddress(candidate) catch continue;
            }
            return ConnectionError.ConnectionRefused;
        };

        return net.tcpConnectToAddress(literal) catch {
            return ConnectionError.ConnectionRefused;
        };
    }

    /// Close the connection and free resources.
    /// The client must not be used afterwards.
    pub fn close(self: *Self) void {
        if (self.heap_cmd_buf) |buf| {
            self.allocator.free(buf);
        }
        self.allocator.free(self.read_buf);
        self.stream.close();
        // Poison the struct so accidental use after close is caught in safe builds.
        self.* = undefined;
    }

    // ========================================================================
    // Command Execution (Core)
    // ========================================================================

    /// Send a command and receive the response.
    /// This is the foundation for all other commands.
    ///
    /// `args` should hold the command name followed by its arguments.
    /// Arguments must be string slices. For integer arguments,
    /// format them to a buffer first:
    /// ```zig
    /// var buf: [16]u8 = undefined;
    /// const n_str = std.fmt.bufPrint(&buf, "{d}", .{n}) catch unreachable;
    /// _ = try client.sendCommand(&.{"EXPIRE", key, n_str});
    /// ```
    ///
    /// The returned value borrows from the client's read buffer and is
    /// invalidated by the next command. Array values must be passed to
    /// `freeValue` when no longer needed.
    pub fn sendCommand(self: *Self, args: []const []const u8) ClientError!Value {
        // Upper bound on the encoded size: 16 bytes of framing for the array
        // header and for each argument, plus the argument bytes themselves.
        var total_size: usize = 16;
        for (args) |arg| {
            total_size += 16 + arg.len; // $<len>\r\n<data>\r\n
        }

        // Get command buffer (inline or heap)
        const cmd_buf = try self.getCommandBuffer(total_size);

        // Build RESP array
        var pos: usize = 0;

        // Array header: *<count>\r\n
        pos += (std.fmt.bufPrint(cmd_buf[pos..], "*{d}\r\n", .{args.len}) catch {
            return ProtocolError.BufferOverflow;
        }).len;

        // Each argument as bulk string: $<len>\r\n<data>\r\n
        for (args) |arg| {
            pos += (std.fmt.bufPrint(cmd_buf[pos..], "${d}\r\n", .{arg.len}) catch {
                return ProtocolError.BufferOverflow;
            }).len;

            @memcpy(cmd_buf[pos..][0..arg.len], arg);
            pos += arg.len;
            cmd_buf[pos] = '\r';
            cmd_buf[pos + 1] = '\n';
            pos += 2;
        }

        // A single write() may accept only part of the buffer; writeAll
        // loops until the whole command is on the wire.
        try self.stream.writeAll(cmd_buf[0..pos]);

        // Reset buffer state for new response
        self.buf_len = 0;
        self.buf_pos = 0;

        // Read and parse response
        return self.readResponse();
    }

    /// Send a command made of a comptime-known `prefix` (typically the
    /// command name and fixed arguments) followed by a runtime `suffix`.
    /// Useful when the argument count is dynamic.
    ///
    /// Allocates a temporary argument list with the client's allocator and
    /// frees it before returning.
    pub fn sendCommandDynamic(self: *Self, comptime prefix: []const []const u8, suffix: []const []const u8) ClientError!Value {
        const args = try self.allocator.alloc([]const u8, prefix.len + suffix.len);
        defer self.allocator.free(args);

        @memcpy(args[0..prefix.len], prefix);
        @memcpy(args[prefix.len..], suffix);

        return self.sendCommand(args);
    }

    // ========================================================================
    // Buffer Management
    // ========================================================================

    /// Return a buffer of at least `required_size` bytes for encoding a command.
    ///
    /// Small requests use the inline `cmd_buf`; larger ones use
    /// `heap_cmd_buf`, which is reallocated when too small and kept for reuse.
    fn getCommandBuffer(self: *Self, required_size: usize) ClientError![]u8 {
        if (required_size <= self.cmd_buf.len) {
            return &self.cmd_buf;
        }

        // Need heap allocation for large command
        if (self.heap_cmd_buf) |buf| {
            if (buf.len >= required_size) return buf;
            self.allocator.free(buf);
            // Clear the field before allocating again: if the allocation
            // below fails, `close` must not see (and free) the stale slice.
            self.heap_cmd_buf = null;
        }

        const fresh = try self.allocator.alloc(u8, required_size);
        self.heap_cmd_buf = fresh;
        return fresh;
    }

    /// Read and parse one complete reply starting at the current cursor.
    ///
    /// Growing the read buffer while parsing can move it, which would leave
    /// slices captured earlier in the same reply (e.g. earlier array
    /// elements) pointing at freed memory. When that happens the reply is
    /// parsed a second time: by then it is fully buffered, so the second
    /// pass performs no reads and every slice refers to the final buffer.
    fn readResponse(self: *Self) ClientError!Value {
        const start = self.buf_pos;
        const base = self.read_buf.ptr;

        const value = try self.parseValue();
        if (self.read_buf.ptr == base) return value;

        // freeValue only releases array allocations; it never dereferences
        // the (possibly dangling) byte slices.
        self.freeValue(value);
        self.buf_pos = start;
        return self.parseValue();
    }

    /// Parse one RESP value at the cursor, dispatching on its type byte.
    fn parseValue(self: *Self) ClientError!Value {
        // Ensure we have data
        while (self.buf_pos >= self.buf_len) {
            try self.readMore();
        }

        const type_char = self.read_buf[self.buf_pos];
        self.buf_pos += 1;

        return switch (type_char) {
            '+' => .{ .string = try self.readLine() },
            '-' => .{ .err = try self.readLine() },
            ':' => .{ .integer = try self.readInt() },
            '$' => try self.readBulk(),
            '*' => try self.readArray(),
            else => ProtocolError.InvalidResponse,
        };
    }

    /// Return the bytes from the cursor up to (not including) the next
    /// CRLF and advance the cursor past it, reading more data as needed.
    fn readLine(self: *Self) ClientError![]const u8 {
        // Track the start as an index, not a pointer: readMore may move the buffer.
        const start = self.buf_pos;
        while (true) {
            while (self.buf_pos + 1 < self.buf_len) {
                if (self.read_buf[self.buf_pos] == '\r' and self.read_buf[self.buf_pos + 1] == '\n') {
                    const line = self.read_buf[start..self.buf_pos];
                    self.buf_pos += 2;
                    return line;
                }
                self.buf_pos += 1;
            }
            try self.readMore();
        }
    }

    /// Read a CRLF-terminated line and parse it as a base-10 `i64`.
    /// Returns `InvalidResponse` when the line is not a valid integer.
    fn readInt(self: *Self) ClientError!i64 {
        const line = try self.readLine();
        return std.fmt.parseInt(i64, line, 10) catch return ProtocolError.InvalidResponse;
    }

    /// Parse the remainder of a bulk string (`$<len>\r\n<data>\r\n`).
    ///
    /// A negative length yields `.nil`. Returns `InvalidResponse` when the
    /// length does not fit in `usize` or the payload is not followed by CRLF.
    fn readBulk(self: *Self) ClientError!Value {
        const len = try self.readInt();
        if (len < 0) return .nil;

        const ulen = std.math.cast(usize, len) orelse return ProtocolError.InvalidResponse;
        // Payload plus its trailing CRLF.
        const framed_len = std.math.add(usize, ulen, 2) catch return ProtocolError.InvalidResponse;

        // Ensure we have all the data
        while (self.buf_len - self.buf_pos < framed_len) {
            try self.readMore();
        }

        const frame = self.read_buf[self.buf_pos..][0..framed_len];
        if (frame[ulen] != '\r' or frame[ulen + 1] != '\n') return ProtocolError.InvalidResponse;

        self.buf_pos += framed_len;
        return .{ .bulk = frame[0..ulen] };
    }

    /// Parse the remainder of an array (`*<count>\r\n` followed by `count` values).
    ///
    /// A negative count yields `.nil`; a zero count yields an empty array
    /// without allocating. Otherwise the element slice is allocated with the
    /// client's allocator and must be released with `freeValue`. On error,
    /// every element parsed so far (including nested arrays) is freed.
    fn readArray(self: *Self) ClientError!Value {
        const len = try self.readInt();
        if (len < 0) return .nil;
        if (len == 0) return .{ .array = &.{} };

        const count = std.math.cast(usize, len) orelse return ProtocolError.InvalidResponse;
        const values = try self.allocator.alloc(Value, count);

        var parsed: usize = 0;
        errdefer {
            // Nested arrays own their own allocations; release them too.
            for (values[0..parsed]) |v| self.freeValue(v);
            self.allocator.free(values);
        }

        while (parsed < count) : (parsed += 1) {
            values[parsed] = try self.parseValue();
        }

        return .{ .array = values };
    }

    /// Append more bytes from the socket to `read_buf`, growing it when full.
    ///
    /// Growing may move the buffer, invalidating existing slices into it.
    /// Returns `ConnectionClosed` when the peer has closed the connection.
    fn readMore(self: *Self) ClientError!void {
        // Grow buffer if needed
        if (self.buf_len >= self.read_buf.len) {
            // Doubling a zero-length buffer would stay at zero, so fall back
            // to the default size; saturate instead of overflowing.
            const grown_len: usize = if (self.read_buf.len == 0)
                DEFAULT_READ_BUF_SIZE
            else
                self.read_buf.len *| 2;
            self.read_buf = try self.allocator.realloc(self.read_buf, grown_len);
        }

        const n = try self.stream.read(self.read_buf[self.buf_len..]);
        if (n == 0) return ProtocolError.ConnectionClosed;
        self.buf_len += n;
    }

    // ========================================================================
    // Connection Commands
    // ========================================================================

    /// Send PING, returns true if server responds with PONG.
    /// Any other reply (including an error reply) yields false.
    pub fn ping(self: *Self) ClientError!bool {
        const result = try self.sendCommand(&.{"PING"});
        return switch (result) {
            .string => |s| std.mem.eql(u8, s, "PONG"),
            else => false,
        };
    }

    /// Send ECHO, returns the message back.
    /// The result is whatever `Value.asString` extracts from the reply and
    /// borrows from the read buffer (valid until the next command).
    pub fn echo(self: *Self, message: []const u8) ClientError!?[]const u8 {
        const result = try self.sendCommand(&.{ "ECHO", message });
        return result.asString();
    }

    /// Get server info, optionally restricted to one `section`.
    /// The result borrows from the read buffer (valid until the next command).
    pub fn info(self: *Self, section: ?[]const u8) ClientError!?[]const u8 {
        const result = if (section) |s|
            try self.sendCommand(&.{ "INFO", s })
        else
            try self.sendCommand(&.{"INFO"});
        return result.asString();
    }

    /// Get number of keys in current database.
    /// Returns 0 when `Value.asInt` yields null for the reply.
    pub fn dbsize(self: *Self) ClientError!i64 {
        const result = try self.sendCommand(&.{"DBSIZE"});
        return result.asInt() orelse 0;
    }

    /// Delete all keys in current database (use with caution!).
    /// Returns `RedisError` when the server answers with an error reply.
    pub fn flushDb(self: *Self) ClientError!void {
        const result = try self.sendCommand(&.{"FLUSHDB"});
        if (result.isError()) return CommandError.RedisError;
    }

    // ========================================================================
    // Command Module Accessors
    // ========================================================================

    const strings_mod = @import("commands/strings.zig");
    const keys_mod = @import("commands/keys.zig");
    const hashes_mod = @import("commands/hashes.zig");
    const lists_mod = @import("commands/lists.zig");
    const sets_mod = @import("commands/sets.zig");
    const sorted_sets_mod = @import("commands/sorted_sets.zig");
    const streams_mod = @import("commands/streams.zig");

    /// Get string commands interface
    pub fn strings(self: *Self) strings_mod.StringCommands {
        return strings_mod.StringCommands.init(self);
    }

    /// Get key commands interface
    pub fn keys(self: *Self) keys_mod.KeyCommands {
        return keys_mod.KeyCommands.init(self);
    }

    /// Get hash commands interface
    pub fn hashes(self: *Self) hashes_mod.HashCommands {
        return hashes_mod.HashCommands.init(self);
    }

    /// Get list commands interface
    pub fn lists(self: *Self) lists_mod.ListCommands {
        return lists_mod.ListCommands.init(self);
    }

    /// Get set commands interface
    pub fn sets(self: *Self) sets_mod.SetCommands {
        return sets_mod.SetCommands.init(self);
    }

    /// Get sorted set commands interface
    pub fn sortedSets(self: *Self) sorted_sets_mod.SortedSetCommands {
        return sorted_sets_mod.SortedSetCommands.init(self);
    }

    /// Get stream commands interface
    pub fn streams(self: *Self) streams_mod.StreamCommands {
        return streams_mod.StreamCommands.init(self);
    }

    // ========================================================================
    // Value Management
    // ========================================================================

    /// Free a Value that contains allocated memory (arrays).
    /// Call this when done with array values to prevent leaks.
    ///
    /// Nested arrays are freed recursively. Non-array values own no
    /// allocation, so passing them is a harmless no-op.
    pub fn freeValue(self: *Self, value: Value) void {
        switch (value) {
            .array => |arr| {
                for (arr) |v| {
                    self.freeValue(v);
                }
                self.allocator.free(arr);
            },
            else => {},
        }
    }
};
429
430// ============================================================================
431// Tests
432// ============================================================================
433
test "Client struct size is reasonable" {
    // Guard against accidental growth: the inline 16KB command buffer
    // dominates the struct, so anything near 20000 bytes means a large
    // field was added.
    const size_limit: usize = 20000;
    const actual_size: usize = @sizeOf(Client);
    try std.testing.expect(actual_size < size_limit);
}