//! Redis stream commands //! //! Streams are append-only log data structures, ideal for: //! - Event sourcing //! - Message queues with consumer groups //! - Activity feeds //! - Time series data //! //! ## Key Concepts //! //! - **Entry ID**: Unique identifier like "1234567890123-0" (timestamp-sequence) //! - **Consumer Group**: Named group of consumers that share message processing //! - **Consumer**: Individual worker within a group //! - **Pending Entry List (PEL)**: Messages delivered but not acknowledged //! //! ## Fan-Out Pattern //! //! Multiple consumer groups on the same stream each receive ALL messages: //! ``` //! Stream: events //! └── Consumer Group: "event-persister" (gets all messages) //! └── Consumer Group: "ws-broadcaster" (also gets all messages) //! └── Consumer Group: "analytics" (also gets all messages) //! ``` //! //! ## Examples //! //! ```zig //! // Publish event //! const id = try client.streams().xadd("events", .auto, &.{ //! .{"type", "user.login"}, //! .{"user_id", "1234"}, //! }); //! //! // Create consumer group //! try client.streams().xgroupCreate("events", "workers", "0"); //! //! // Read as consumer (blocks until message or timeout) //! if (try client.streams().xreadgroup("workers", "worker-1", "events", 1000, 1, ">")) |entry| { //! // Process entry //! try client.streams().xack("events", "workers", &.{entry.id}); //! } //! //! // Claim abandoned messages //! const claimed = try client.streams().xautoclaim("events", "workers", "worker-1", 30000, "0-0", 10); //! ``` const std = @import("std"); const Client = @import("../client.zig").Client; const Value = @import("../resp.zig").Value; const CommandError = @import("../resp.zig").CommandError; const ClientError = @import("../resp.zig").ClientError; /// A field-value pair in a stream entry pub const StreamField = struct { field: []const u8, value: []const u8, }; /// A single entry from a Redis stream pub const StreamEntry = struct { /// Stream entry ID (e.g., "1234567890123-0") id: []const u8, /// Field-value pairs in this entry fields: []const StreamField, /// Get a field value by name pub fn get(self: StreamEntry, field: []const u8) ?[]const u8 { for (self.fields) |f| { if (std.mem.eql(u8, f.field, field)) return f.value; } return null; } }; /// Result from XAUTOCLAIM command pub const AutoclaimResult = struct { /// Next stream ID to use for subsequent XAUTOCLAIM calls next_id: []const u8, /// Claimed entries entries: []const StreamEntry, /// IDs that no longer exist in the stream (Redis 7+) deleted_ids: []const []const u8, }; /// Result from XPENDING summary pub const PendingSummary = struct { count: i64, min_id: ?[]const u8, max_id: ?[]const u8, consumers: []const struct { name: []const u8, count: i64 }, }; /// Stream ID for XADD pub const StreamId = union(enum) { /// Let Redis auto-generate: "*" auto, /// Specific ID explicit: []const u8, /// Minimum ID greater than all existing: ">" (for new entries only) new_entries, fn toArg(self: StreamId) []const u8 { return switch (self) { .auto => "*", .explicit => |id| id, .new_entries => ">", }; } }; /// Stream command implementations. pub const StreamCommands = struct { client: *Client, pub fn init(client: *Client) StreamCommands { return .{ .client = client }; } // ======================================================================== // Publishing // ======================================================================== /// XADD stream ID field value [field value ...] - append entry to stream /// Returns the entry ID assigned by Redis. pub fn xadd(self: *StreamCommands, key: []const u8, id: StreamId, fields: []const [2][]const u8) ClientError![]const u8 { const arg_count = 3 + fields.len * 2; var args = try self.client.allocator.alloc([]const u8, arg_count); defer self.client.allocator.free(args); args[0] = "XADD"; args[1] = key; args[2] = id.toArg(); for (fields, 0..) |field, i| { args[3 + i * 2] = field[0]; args[3 + i * 2 + 1] = field[1]; } const result = try self.client.sendCommand(args); return switch (result) { .bulk => |b| b orelse return CommandError.RedisError, .err => return CommandError.RedisError, else => return CommandError.RedisError, }; } /// XADD with MAXLEN option for capped streams pub fn xaddCapped(self: *StreamCommands, key: []const u8, maxlen: usize, approximate: bool, fields: []const [2][]const u8) ClientError![]const u8 { const arg_count = 5 + fields.len * 2 + @as(usize, if (approximate) 1 else 0); var args = try self.client.allocator.alloc([]const u8, arg_count); defer self.client.allocator.free(args); var maxlen_buf: [24]u8 = undefined; args[0] = "XADD"; args[1] = key; args[2] = "MAXLEN"; var idx: usize = 3; if (approximate) { args[idx] = "~"; idx += 1; } args[idx] = std.fmt.bufPrint(&maxlen_buf, "{d}", .{maxlen}) catch unreachable; idx += 1; args[idx] = "*"; idx += 1; for (fields, 0..) |field, i| { args[idx + i * 2] = field[0]; args[idx + i * 2 + 1] = field[1]; } const result = try self.client.sendCommand(args[0 .. idx + fields.len * 2]); return switch (result) { .bulk => |b| b orelse return CommandError.RedisError, .err => return CommandError.RedisError, else => return CommandError.RedisError, }; } // ======================================================================== // Consumer Groups // ======================================================================== /// XGROUP CREATE stream group id [MKSTREAM] - create consumer group /// start_id: "0" to read from beginning, "$" for new messages only pub fn xgroupCreate(self: *StreamCommands, key: []const u8, group: []const u8, start_id: []const u8) ClientError!void { const result = try self.client.sendCommand(&.{ "XGROUP", "CREATE", key, group, start_id, "MKSTREAM" }); if (result.isError()) { // BUSYGROUP means group already exists - that's fine if (result.asString()) |msg| { if (std.mem.indexOf(u8, msg, "BUSYGROUP") != null) return; } return CommandError.RedisError; } } /// XGROUP DESTROY stream group - delete consumer group pub fn xgroupDestroy(self: *StreamCommands, key: []const u8, group: []const u8) ClientError!bool { const result = try self.client.sendCommand(&.{ "XGROUP", "DESTROY", key, group }); return switch (result) { .integer => |i| i == 1, .err => { // NOGROUP means doesn't exist - fine for cleanup if (result.asString()) |msg| { if (std.mem.indexOf(u8, msg, "NOGROUP") != null) return false; } return CommandError.RedisError; }, else => false, }; } /// XGROUP CREATECONSUMER stream group consumer - create consumer in group pub fn xgroupCreateConsumer(self: *StreamCommands, key: []const u8, group: []const u8, consumer: []const u8) ClientError!bool { const result = try self.client.sendCommand(&.{ "XGROUP", "CREATECONSUMER", key, group, consumer }); return switch (result) { .integer => |i| i == 1, else => false, }; } /// XGROUP DELCONSUMER stream group consumer - delete consumer from group pub fn xgroupDelConsumer(self: *StreamCommands, key: []const u8, group: []const u8, consumer: []const u8) ClientError!i64 { const result = try self.client.sendCommand(&.{ "XGROUP", "DELCONSUMER", key, group, consumer }); return switch (result) { .integer => |i| i, else => 0, }; } /// XGROUP SETID stream group id - set group's last-delivered-id pub fn xgroupSetId(self: *StreamCommands, key: []const u8, group: []const u8, entry_id: []const u8) ClientError!void { const result = try self.client.sendCommand(&.{ "XGROUP", "SETID", key, group, entry_id }); if (result.isError()) return CommandError.RedisError; } // ======================================================================== // Reading // ======================================================================== /// XREAD [BLOCK ms] [COUNT count] STREAMS key [key ...] id [id ...] /// Non-consumer-group read. Use "$" for new messages only. pub fn xread(self: *StreamCommands, key: []const u8, block_ms: ?u32, count: ?u32, start_id: []const u8) ClientError!?StreamEntry { var args_buf: [12][]const u8 = undefined; var arg_count: usize = 1; args_buf[0] = "XREAD"; var block_buf: [16]u8 = undefined; if (block_ms) |ms| { args_buf[arg_count] = "BLOCK"; args_buf[arg_count + 1] = std.fmt.bufPrint(&block_buf, "{d}", .{ms}) catch unreachable; arg_count += 2; } var count_buf: [16]u8 = undefined; if (count) |c| { args_buf[arg_count] = "COUNT"; args_buf[arg_count + 1] = std.fmt.bufPrint(&count_buf, "{d}", .{c}) catch unreachable; arg_count += 2; } args_buf[arg_count] = "STREAMS"; args_buf[arg_count + 1] = key; args_buf[arg_count + 2] = start_id; arg_count += 3; const result = try self.client.sendCommand(args_buf[0..arg_count]); if (result.isNull()) return null; return self.parseStreamReadResult(result); } /// XREADGROUP GROUP group consumer [BLOCK ms] [COUNT count] STREAMS key id /// Use ">" for new messages, "0" to replay pending pub fn xreadgroup( self: *StreamCommands, group: []const u8, consumer: []const u8, key: []const u8, block_ms: u32, count: u32, start_id: []const u8, ) ClientError!?StreamEntry { var block_buf: [16]u8 = undefined; var count_buf: [16]u8 = undefined; const block_str = std.fmt.bufPrint(&block_buf, "{d}", .{block_ms}) catch unreachable; const count_str = std.fmt.bufPrint(&count_buf, "{d}", .{count}) catch unreachable; const result = try self.client.sendCommand(&.{ "XREADGROUP", "GROUP", group, consumer, "BLOCK", block_str, "COUNT", count_str, "STREAMS", key, start_id, }); if (result.isNull()) return null; return self.parseStreamReadResult(result); } /// XRANGE stream start end [COUNT count] - get entries in ID range pub fn xrange(self: *StreamCommands, key: []const u8, start: []const u8, end: []const u8, count: ?u32) ClientError![]const StreamEntry { var count_buf: [16]u8 = undefined; const result = if (count) |c| blk: { const count_str = std.fmt.bufPrint(&count_buf, "{d}", .{c}) catch unreachable; break :blk try self.client.sendCommand(&.{ "XRANGE", key, start, end, "COUNT", count_str }); } else try self.client.sendCommand(&.{ "XRANGE", key, start, end }); return self.parseEntryArray(result); } /// XREVRANGE stream end start [COUNT count] - get entries in reverse order pub fn xrevrange(self: *StreamCommands, key: []const u8, end: []const u8, start: []const u8, count: ?u32) ClientError![]const StreamEntry { var count_buf: [16]u8 = undefined; const result = if (count) |c| blk: { const count_str = std.fmt.bufPrint(&count_buf, "{d}", .{c}) catch unreachable; break :blk try self.client.sendCommand(&.{ "XREVRANGE", key, end, start, "COUNT", count_str }); } else try self.client.sendCommand(&.{ "XREVRANGE", key, end, start }); return self.parseEntryArray(result); } // ======================================================================== // Acknowledgment // ======================================================================== /// XACK stream group id [id ...] - acknowledge messages /// Returns number of messages acknowledged. pub fn xack(self: *StreamCommands, key: []const u8, group: []const u8, ids: []const []const u8) ClientError!i64 { var args = try self.client.allocator.alloc([]const u8, ids.len + 3); defer self.client.allocator.free(args); args[0] = "XACK"; args[1] = key; args[2] = group; @memcpy(args[3..], ids); const result = try self.client.sendCommand(args); return switch (result) { .integer => |i| i, else => 0, }; } // ======================================================================== // Claiming / Recovery // ======================================================================== /// XAUTOCLAIM stream group consumer min-idle start [COUNT count] /// Automatically claim pending messages that have been idle too long. pub fn xautoclaim( self: *StreamCommands, key: []const u8, group: []const u8, consumer: []const u8, min_idle_ms: u32, start_id: []const u8, count: u32, ) ClientError!?AutoclaimResult { var idle_buf: [16]u8 = undefined; var count_buf: [16]u8 = undefined; const idle_str = std.fmt.bufPrint(&idle_buf, "{d}", .{min_idle_ms}) catch unreachable; const count_str = std.fmt.bufPrint(&count_buf, "{d}", .{count}) catch unreachable; const result = try self.client.sendCommand(&.{ "XAUTOCLAIM", key, group, consumer, idle_str, start_id, "COUNT", count_str, }); if (result.isError()) return CommandError.RedisError; return self.parseAutoclaimResult(result); } /// XCLAIM stream group consumer min-idle-time id [id ...] - claim specific messages pub fn xclaim( self: *StreamCommands, key: []const u8, group: []const u8, consumer: []const u8, min_idle_ms: u32, ids: []const []const u8, ) ClientError![]const StreamEntry { var args = try self.client.allocator.alloc([]const u8, ids.len + 5); defer self.client.allocator.free(args); var idle_buf: [16]u8 = undefined; args[0] = "XCLAIM"; args[1] = key; args[2] = group; args[3] = consumer; args[4] = std.fmt.bufPrint(&idle_buf, "{d}", .{min_idle_ms}) catch unreachable; @memcpy(args[5..], ids); const result = try self.client.sendCommand(args); return self.parseEntryArray(result); } // ======================================================================== // Pending // ======================================================================== /// XPENDING stream group - get pending summary pub fn xpendingSummary(self: *StreamCommands, key: []const u8, group: []const u8) ClientError!PendingSummary { const result = try self.client.sendCommand(&.{ "XPENDING", key, group }); const arr = switch (result) { .array => |a| a, else => return PendingSummary{ .count = 0, .min_id = null, .max_id = null, .consumers = &.{} }, }; if (arr.len < 4) return PendingSummary{ .count = 0, .min_id = null, .max_id = null, .consumers = &.{} }; const count = arr[0].asInt() orelse 0; const min_id = arr[1].asString(); const max_id = arr[2].asString(); // Parse consumer list const consumer_arr = arr[3].asArray() orelse &.{}; var consumers = try self.client.allocator.alloc(struct { name: []const u8, count: i64 }, consumer_arr.len); for (consumer_arr, 0..) |c, i| { const ca = c.asArray() orelse continue; if (ca.len >= 2) { consumers[i] = .{ .name = ca[0].asString() orelse "", .count = ca[1].asInt() orelse 0, }; } } return PendingSummary{ .count = count, .min_id = min_id, .max_id = max_id, .consumers = consumers, }; } // ======================================================================== // Info // ======================================================================== /// XLEN stream - get stream length pub fn xlen(self: *StreamCommands, key: []const u8) ClientError!i64 { const result = try self.client.sendCommand(&.{ "XLEN", key }); return switch (result) { .integer => |i| i, else => 0, }; } /// XINFO STREAM stream - get stream info pub fn xinfoStream(self: *StreamCommands, key: []const u8) ClientError![]const Value { const result = try self.client.sendCommand(&.{ "XINFO", "STREAM", key }); return switch (result) { .array => |a| a, else => &.{}, }; } /// XINFO GROUPS stream - get consumer groups info pub fn xinfoGroups(self: *StreamCommands, key: []const u8) ClientError![]const Value { const result = try self.client.sendCommand(&.{ "XINFO", "GROUPS", key }); return switch (result) { .array => |a| a, else => &.{}, }; } // ======================================================================== // Trimming // ======================================================================== /// XTRIM stream MAXLEN [~] count - trim stream to max length pub fn xtrim(self: *StreamCommands, key: []const u8, maxlen: usize, approximate: bool) ClientError!i64 { var buf: [24]u8 = undefined; const len_str = std.fmt.bufPrint(&buf, "{d}", .{maxlen}) catch unreachable; const result = if (approximate) try self.client.sendCommand(&.{ "XTRIM", key, "MAXLEN", "~", len_str }) else try self.client.sendCommand(&.{ "XTRIM", key, "MAXLEN", len_str }); return switch (result) { .integer => |i| i, else => 0, }; } /// XDEL stream id [id ...] - delete entries pub fn xdel(self: *StreamCommands, key: []const u8, ids: []const []const u8) ClientError!i64 { var args = try self.client.allocator.alloc([]const u8, ids.len + 2); defer self.client.allocator.free(args); args[0] = "XDEL"; args[1] = key; @memcpy(args[2..], ids); const result = try self.client.sendCommand(args); return switch (result) { .integer => |i| i, else => 0, }; } // ======================================================================== // Parsing Helpers // ======================================================================== fn parseStreamReadResult(self: *StreamCommands, result: Value) ClientError!?StreamEntry { // Format: [[stream_name, [[entry_id, [field, value, ...]], ...]]] const outer = switch (result) { .array => |a| a, else => return null, }; if (outer.len == 0) return null; // First stream: [name, entries] const stream_data = switch (outer[0]) { .array => |a| a, else => return null, }; if (stream_data.len < 2) return null; // Entries array const entries = switch (stream_data[1]) { .array => |a| a, else => return null, }; if (entries.len == 0) return null; return self.parseEntry(entries[0]); } fn parseEntry(self: *StreamCommands, entry_val: Value) ClientError!?StreamEntry { const entry_arr = switch (entry_val) { .array => |a| a, else => return null, }; if (entry_arr.len < 2) return null; // Entry ID const entry_id = switch (entry_arr[0]) { .bulk => |b| b orelse return null, else => return null, }; // Fields array const fields_arr = switch (entry_arr[1]) { .array => |a| a, else => return null, }; // Parse field-value pairs const field_count = fields_arr.len / 2; var fields = try self.client.allocator.alloc(StreamField, field_count); var i: usize = 0; while (i < fields_arr.len - 1) : (i += 2) { const field = switch (fields_arr[i]) { .bulk => |b| b orelse continue, else => continue, }; const value = switch (fields_arr[i + 1]) { .bulk => |b| b orelse continue, else => continue, }; fields[i / 2] = .{ .field = field, .value = value }; } return StreamEntry{ .id = entry_id, .fields = fields[0..field_count], }; } fn parseEntryArray(self: *StreamCommands, result: Value) ClientError![]const StreamEntry { const arr = switch (result) { .array => |a| a, else => return &.{}, }; var entries = try self.client.allocator.alloc(StreamEntry, arr.len); var count: usize = 0; for (arr) |entry_val| { if (try self.parseEntry(entry_val)) |entry| { entries[count] = entry; count += 1; } } return entries[0..count]; } fn parseAutoclaimResult(self: *StreamCommands, result: Value) ClientError!?AutoclaimResult { // Format: [next_id, [[id, [field, value]], ...], [deleted_ids]] const outer = switch (result) { .array => |a| a, else => return null, }; if (outer.len < 2) return null; // Next ID const next_id = switch (outer[0]) { .bulk => |b| b orelse "0-0", else => "0-0", }; // Entries array const entries_arr = switch (outer[1]) { .array => |a| a, else => return AutoclaimResult{ .next_id = next_id, .entries = &.{}, .deleted_ids = &.{} }, }; var entries = try self.client.allocator.alloc(StreamEntry, entries_arr.len); var count: usize = 0; for (entries_arr) |entry_val| { if (try self.parseEntry(entry_val)) |entry| { entries[count] = entry; count += 1; } } // Deleted IDs (Redis 7+) var deleted_ids: []const []const u8 = &.{}; if (outer.len >= 3) { if (outer[2].asArray()) |del_arr| { var ids = try self.client.allocator.alloc([]const u8, del_arr.len); var del_count: usize = 0; for (del_arr) |v| { if (v.asString()) |s| { ids[del_count] = s; del_count += 1; } } deleted_ids = ids[0..del_count]; } } return AutoclaimResult{ .next_id = next_id, .entries = entries[0..count], .deleted_ids = deleted_ids, }; } }; /// Extend Client with stream commands. pub fn streams(client: *Client) StreamCommands { return StreamCommands.init(client); }