this repo has no description
1//! Redis stream commands
2//!
3//! Streams are append-only log data structures, ideal for:
4//! - Event sourcing
5//! - Message queues with consumer groups
6//! - Activity feeds
7//! - Time series data
8//!
9//! ## Key Concepts
10//!
11//! - **Entry ID**: Unique identifier like "1234567890123-0" (timestamp-sequence)
12//! - **Consumer Group**: Named group of consumers that share message processing
13//! - **Consumer**: Individual worker within a group
14//! - **Pending Entry List (PEL)**: Messages delivered but not acknowledged
15//!
16//! ## Fan-Out Pattern
17//!
18//! Multiple consumer groups on the same stream each receive ALL messages:
19//! ```
20//! Stream: events
21//! └── Consumer Group: "event-persister" (gets all messages)
22//! └── Consumer Group: "ws-broadcaster" (also gets all messages)
23//! └── Consumer Group: "analytics" (also gets all messages)
24//! ```
25//!
26//! ## Examples
27//!
28//! ```zig
29//! // Publish event
30//! const id = try client.streams().xadd("events", .auto, &.{
31//! .{"type", "user.login"},
32//! .{"user_id", "1234"},
33//! });
34//!
35//! // Create consumer group
36//! try client.streams().xgroupCreate("events", "workers", "0");
37//!
38//! // Read as consumer (blocks until message or timeout)
39//! if (try client.streams().xreadgroup("workers", "worker-1", "events", 1000, 1, ">")) |entry| {
40//! // Process entry
41//! try client.streams().xack("events", "workers", &.{entry.id});
42//! }
43//!
44//! // Claim abandoned messages
45//! const claimed = try client.streams().xautoclaim("events", "workers", "worker-1", 30000, "0-0", 10);
46//! ```
47
48const std = @import("std");
49const Client = @import("../client.zig").Client;
50const Value = @import("../resp.zig").Value;
51const CommandError = @import("../resp.zig").CommandError;
52const ClientError = @import("../resp.zig").ClientError;
53
/// One `field`/`value` pair carried by a stream entry.
///
/// Both members are plain byte slices; this type stores them as given and
/// does not copy.
pub const StreamField = struct {
    /// Name of the field.
    field: []const u8,
    /// Value stored under `field`.
    value: []const u8,
};
59
/// One entry read back from a Redis stream: its ID plus its field-value pairs.
pub const StreamEntry = struct {
    /// Stream entry ID (e.g., "1234567890123-0").
    id: []const u8,
    /// Field-value pairs of this entry, in the order they were stored here.
    fields: []const StreamField,

    /// Look up the value of the first pair whose name equals `field`.
    ///
    /// The comparison is an exact byte comparison. Returns `null` when no pair
    /// has that name.
    pub fn get(self: StreamEntry, field: []const u8) ?[]const u8 {
        return for (self.fields) |pair| {
            if (std.mem.eql(u8, pair.field, field)) break pair.value;
        } else null;
    }
};
75
/// Parsed reply of an XAUTOCLAIM command.
pub const AutoclaimResult = struct {
    /// Cursor to pass as the start ID of the next XAUTOCLAIM call.
    next_id: []const u8,
    /// Entries that were claimed by this call.
    entries: []const StreamEntry,
    /// IDs reported as no longer present in the stream (Redis 7+); empty when
    /// the reply carries no such list.
    deleted_ids: []const []const u8,
};
85
/// Result from XPENDING summary
pub const PendingSummary = struct {
    /// Pending-entry count attributed to one consumer.
    ///
    /// This is a named type so that code building a `PendingSummary` can
    /// allocate `[]Consumer` directly. Zig struct types are nominal: a second,
    /// structurally identical `struct { name, count }` written elsewhere is a
    /// different type and its slices do not coerce to this field.
    pub const Consumer = struct {
        /// Consumer name.
        name: []const u8,
        /// Number of pending entries reported for this consumer.
        count: i64,
    };

    /// Pending-entry count for the whole group.
    count: i64,
    /// Smallest pending entry ID, or `null` when the reply has none.
    min_id: ?[]const u8,
    /// Largest pending entry ID, or `null` when the reply has none.
    max_id: ?[]const u8,
    /// Per-consumer breakdown of the pending entries.
    consumers: []const Consumer,
};
93
/// Entry ID selector passed to XADD.
pub const StreamId = union(enum) {
    /// Let Redis generate the ID; rendered as "*".
    auto,
    /// Use exactly this ID; rendered as the given bytes.
    explicit: []const u8,
    /// Rendered as ">".
    /// NOTE(review): Redis documents ">" as a special ID for XREADGROUP, not
    /// for XADD — confirm that this variant is meaningful for publishing.
    new_entries,

    /// Render this selector as the command argument sent to Redis.
    fn toArg(self: StreamId) []const u8 {
        switch (self) {
            .explicit => |raw| return raw,
            .auto => return "*",
            .new_entries => return ">",
        }
    }
};
111
/// Stream command implementations.
///
/// A thin wrapper around a `*Client`: every public method assembles the
/// argument list of one Redis stream command, sends it with
/// `client.sendCommand`, and converts the reply into a Zig value. Temporary
/// argument lists and parsed results are allocated with `client.allocator`.
///
/// Error handling:
/// - Commands that return stream data or an entry ID (`xadd`, `xaddCapped`,
///   `xread`, `xreadgroup`, `xrange`, `xrevrange`, `xclaim`, `xautoclaim`,
///   `xpendingSummary`) report an error reply as `CommandError.RedisError`.
/// - The integer-returning and group-management wrappers stay lenient and are
///   documented individually.
pub const StreamCommands = struct {
    client: *Client,

    /// Bind the stream commands to `client`.
    pub fn init(client: *Client) StreamCommands {
        return .{ .client = client };
    }

    // ========================================================================
    // Publishing
    // ========================================================================

    /// XADD stream ID field value [field value ...] - append entry to stream
    ///
    /// `fields` is a list of `{ name, value }` pairs. Returns the entry ID from
    /// the reply. Any reply other than a non-null bulk string yields
    /// `CommandError.RedisError`.
    pub fn xadd(self: *StreamCommands, key: []const u8, id: StreamId, fields: []const [2][]const u8) ClientError![]const u8 {
        // "XADD" key id, followed by two arguments per pair.
        const args = try self.client.allocator.alloc([]const u8, 3 + fields.len * 2);
        defer self.client.allocator.free(args);

        args[0] = "XADD";
        args[1] = key;
        args[2] = id.toArg();

        var next: usize = 3;
        for (fields) |pair| {
            args[next] = pair[0];
            args[next + 1] = pair[1];
            next += 2;
        }

        return bulkOrError(try self.client.sendCommand(args));
    }

    /// XADD with MAXLEN option for capped streams
    ///
    /// Sends `XADD key MAXLEN [~] maxlen * field value ...`; the "~" is included
    /// only when `approximate` is true and the entry ID is always "*".
    /// Returns the entry ID from the reply, or `CommandError.RedisError` for
    /// any reply other than a non-null bulk string.
    pub fn xaddCapped(self: *StreamCommands, key: []const u8, maxlen: usize, approximate: bool, fields: []const [2][]const u8) ClientError![]const u8 {
        // XADD key MAXLEN maxlen "*" is 5 fixed arguments; "~" adds a sixth.
        const fixed_len: usize = if (approximate) 6 else 5;
        const args = try self.client.allocator.alloc([]const u8, fixed_len + fields.len * 2);
        defer self.client.allocator.free(args);

        // Must outlive sendCommand: args points into this buffer.
        var maxlen_buf: [24]u8 = undefined;

        args[0] = "XADD";
        args[1] = key;
        args[2] = "MAXLEN";

        var next: usize = 3;
        if (approximate) {
            args[next] = "~";
            next += 1;
        }
        // 24 bytes hold any usize in decimal, so formatting cannot fail.
        args[next] = std.fmt.bufPrint(&maxlen_buf, "{d}", .{maxlen}) catch unreachable;
        args[next + 1] = "*";
        next += 2;

        for (fields) |pair| {
            args[next] = pair[0];
            args[next + 1] = pair[1];
            next += 2;
        }

        return bulkOrError(try self.client.sendCommand(args));
    }

    // ========================================================================
    // Consumer Groups
    // ========================================================================

    /// XGROUP CREATE stream group id MKSTREAM - create consumer group
    ///
    /// start_id: "0" to read from beginning, "$" for new messages only.
    /// MKSTREAM is always sent. An error reply whose text contains
    /// "BUSYGROUP" (group already exists) is treated as success; any other
    /// error reply yields `CommandError.RedisError`.
    pub fn xgroupCreate(self: *StreamCommands, key: []const u8, group: []const u8, start_id: []const u8) ClientError!void {
        const result = try self.client.sendCommand(&.{ "XGROUP", "CREATE", key, group, start_id, "MKSTREAM" });
        if (!result.isError()) return;

        // BUSYGROUP means group already exists - that's fine
        if (result.asString()) |msg| {
            if (std.mem.indexOf(u8, msg, "BUSYGROUP") != null) return;
        }
        return CommandError.RedisError;
    }

    /// XGROUP DESTROY stream group - delete consumer group
    ///
    /// Returns true when the reply is the integer 1. An error reply whose text
    /// contains "NOGROUP" returns false (nothing to clean up); any other error
    /// reply yields `CommandError.RedisError`. Other reply types return false.
    pub fn xgroupDestroy(self: *StreamCommands, key: []const u8, group: []const u8) ClientError!bool {
        const result = try self.client.sendCommand(&.{ "XGROUP", "DESTROY", key, group });
        return switch (result) {
            .integer => |n| n == 1,
            .err => {
                // NOGROUP means doesn't exist - fine for cleanup
                if (result.asString()) |msg| {
                    if (std.mem.indexOf(u8, msg, "NOGROUP") != null) return false;
                }
                return CommandError.RedisError;
            },
            else => false,
        };
    }

    /// XGROUP CREATECONSUMER stream group consumer - create consumer in group
    ///
    /// Returns true only when the reply is the integer 1; every other reply,
    /// including an error reply, returns false.
    pub fn xgroupCreateConsumer(self: *StreamCommands, key: []const u8, group: []const u8, consumer: []const u8) ClientError!bool {
        const result = try self.client.sendCommand(&.{ "XGROUP", "CREATECONSUMER", key, group, consumer });
        return integerOrZero(result) == 1;
    }

    /// XGROUP DELCONSUMER stream group consumer - delete consumer from group
    ///
    /// Returns the integer reply; any non-integer reply, including an error
    /// reply, returns 0.
    pub fn xgroupDelConsumer(self: *StreamCommands, key: []const u8, group: []const u8, consumer: []const u8) ClientError!i64 {
        const result = try self.client.sendCommand(&.{ "XGROUP", "DELCONSUMER", key, group, consumer });
        return integerOrZero(result);
    }

    /// XGROUP SETID stream group id - set group's last-delivered-id
    ///
    /// Yields `CommandError.RedisError` on an error reply.
    pub fn xgroupSetId(self: *StreamCommands, key: []const u8, group: []const u8, entry_id: []const u8) ClientError!void {
        const result = try self.client.sendCommand(&.{ "XGROUP", "SETID", key, group, entry_id });
        if (result.isError()) return CommandError.RedisError;
    }

    // ========================================================================
    // Reading
    // ========================================================================

    /// XREAD [BLOCK ms] [COUNT count] STREAMS key id
    ///
    /// Non-consumer-group read of a single stream. Use "$" for new messages
    /// only. BLOCK and COUNT are sent only when the corresponding argument is
    /// non-null.
    ///
    /// Returns the FIRST entry of the first stream in the reply, or `null` on a
    /// null or empty reply. Further entries in the same reply are not returned.
    /// An error reply yields `CommandError.RedisError`.
    pub fn xread(self: *StreamCommands, key: []const u8, block_ms: ?u32, count: ?u32, start_id: []const u8) ClientError!?StreamEntry {
        // At most 8 slots are used: XREAD + BLOCK ms + COUNT n + STREAMS key id.
        var args_buf: [12][]const u8 = undefined;
        var block_buf: [16]u8 = undefined;
        var count_buf: [16]u8 = undefined;

        args_buf[0] = "XREAD";
        var used: usize = 1;

        if (block_ms) |ms| {
            args_buf[used] = "BLOCK";
            args_buf[used + 1] = std.fmt.bufPrint(&block_buf, "{d}", .{ms}) catch unreachable;
            used += 2;
        }
        if (count) |c| {
            args_buf[used] = "COUNT";
            args_buf[used + 1] = std.fmt.bufPrint(&count_buf, "{d}", .{c}) catch unreachable;
            used += 2;
        }

        args_buf[used] = "STREAMS";
        args_buf[used + 1] = key;
        args_buf[used + 2] = start_id;
        used += 3;

        const result = try self.client.sendCommand(args_buf[0..used]);
        if (result.isNull()) return null;
        // Without this check an error reply would look like "no entry".
        if (result.isError()) return CommandError.RedisError;

        return self.parseStreamReadResult(result);
    }

    /// XREADGROUP GROUP group consumer BLOCK ms COUNT count STREAMS key id
    ///
    /// Use ">" for new messages, "0" to replay pending. BLOCK and COUNT are
    /// always sent with the given values.
    ///
    /// Returns the FIRST entry of the reply, or `null` on a null or empty
    /// reply. Further entries in the same reply are not returned.
    /// NOTE(review): with `count` > 1 Redis presumably still records the extra
    /// entries as delivered to this consumer — prefer `count = 1` or confirm.
    /// An error reply (for example a missing group) yields
    /// `CommandError.RedisError`.
    pub fn xreadgroup(
        self: *StreamCommands,
        group: []const u8,
        consumer: []const u8,
        key: []const u8,
        block_ms: u32,
        count: u32,
        start_id: []const u8,
    ) ClientError!?StreamEntry {
        var block_buf: [16]u8 = undefined;
        var count_buf: [16]u8 = undefined;
        // A u32 has at most 10 decimal digits, so 16 bytes always suffice.
        const block_str = std.fmt.bufPrint(&block_buf, "{d}", .{block_ms}) catch unreachable;
        const count_str = std.fmt.bufPrint(&count_buf, "{d}", .{count}) catch unreachable;

        const result = try self.client.sendCommand(&.{
            "XREADGROUP", "GROUP",   group,   consumer,
            "BLOCK",      block_str, "COUNT", count_str,
            "STREAMS",    key,       start_id,
        });

        if (result.isNull()) return null;
        // Without this check an error reply would look like "no entry".
        if (result.isError()) return CommandError.RedisError;
        return self.parseStreamReadResult(result);
    }

    /// XRANGE stream start end [COUNT count] - get entries in ID range
    ///
    /// COUNT is sent only when `count` is non-null. An error reply yields
    /// `CommandError.RedisError`; any other non-array reply gives an empty
    /// slice.
    pub fn xrange(self: *StreamCommands, key: []const u8, start: []const u8, end: []const u8, count: ?u32) ClientError![]const StreamEntry {
        return self.rangeQuery("XRANGE", key, start, end, count);
    }

    /// XREVRANGE stream end start [COUNT count] - get entries in reverse order
    ///
    /// Same contract as `xrange`, with the bounds sent as `end` then `start`.
    pub fn xrevrange(self: *StreamCommands, key: []const u8, end: []const u8, start: []const u8, count: ?u32) ClientError![]const StreamEntry {
        return self.rangeQuery("XREVRANGE", key, end, start, count);
    }

    // ========================================================================
    // Acknowledgment
    // ========================================================================

    /// XACK stream group id [id ...] - acknowledge messages
    ///
    /// Returns the integer reply (number of messages acknowledged); any
    /// non-integer reply, including an error reply, returns 0.
    pub fn xack(self: *StreamCommands, key: []const u8, group: []const u8, ids: []const []const u8) ClientError!i64 {
        const args = try self.client.allocator.alloc([]const u8, ids.len + 3);
        defer self.client.allocator.free(args);
        args[0] = "XACK";
        args[1] = key;
        args[2] = group;
        @memcpy(args[3..], ids);

        return integerOrZero(try self.client.sendCommand(args));
    }

    // ========================================================================
    // Claiming / Recovery
    // ========================================================================

    /// XAUTOCLAIM stream group consumer min-idle start COUNT count
    ///
    /// Automatically claim pending messages that have been idle too long.
    /// Returns `null` when the reply is not an array of at least two elements.
    /// An error reply yields `CommandError.RedisError`.
    pub fn xautoclaim(
        self: *StreamCommands,
        key: []const u8,
        group: []const u8,
        consumer: []const u8,
        min_idle_ms: u32,
        start_id: []const u8,
        count: u32,
    ) ClientError!?AutoclaimResult {
        var idle_buf: [16]u8 = undefined;
        var count_buf: [16]u8 = undefined;
        const idle_str = std.fmt.bufPrint(&idle_buf, "{d}", .{min_idle_ms}) catch unreachable;
        const count_str = std.fmt.bufPrint(&count_buf, "{d}", .{count}) catch unreachable;

        const result = try self.client.sendCommand(&.{
            "XAUTOCLAIM", key,      group,   consumer,
            idle_str,     start_id, "COUNT", count_str,
        });

        if (result.isError()) return CommandError.RedisError;
        return self.parseAutoclaimResult(result);
    }

    /// XCLAIM stream group consumer min-idle-time id [id ...] - claim specific messages
    ///
    /// Returns the claimed entries. An error reply yields
    /// `CommandError.RedisError`; any other non-array reply gives an empty
    /// slice.
    pub fn xclaim(
        self: *StreamCommands,
        key: []const u8,
        group: []const u8,
        consumer: []const u8,
        min_idle_ms: u32,
        ids: []const []const u8,
    ) ClientError![]const StreamEntry {
        const args = try self.client.allocator.alloc([]const u8, ids.len + 5);
        defer self.client.allocator.free(args);

        var idle_buf: [16]u8 = undefined;
        args[0] = "XCLAIM";
        args[1] = key;
        args[2] = group;
        args[3] = consumer;
        args[4] = std.fmt.bufPrint(&idle_buf, "{d}", .{min_idle_ms}) catch unreachable;
        @memcpy(args[5..], ids);

        const result = try self.client.sendCommand(args);
        if (result.isError()) return CommandError.RedisError;
        return self.parseEntryArray(result);
    }

    // ========================================================================
    // Pending
    // ========================================================================

    /// XPENDING stream group - get pending summary
    ///
    /// Expects a four-element array reply: count, min ID, max ID, consumer
    /// list. A non-array or shorter reply gives an all-empty summary.
    /// Consumer items that are not arrays of at least two elements are
    /// dropped, so `consumers` holds only fully parsed items. An error reply
    /// yields `CommandError.RedisError`.
    pub fn xpendingSummary(self: *StreamCommands, key: []const u8, group: []const u8) ClientError!PendingSummary {
        const result = try self.client.sendCommand(&.{ "XPENDING", key, group });
        if (result.isError()) return CommandError.RedisError;

        const empty = PendingSummary{ .count = 0, .min_id = null, .max_id = null, .consumers = &.{} };
        const arr = switch (result) {
            .array => |a| a,
            else => return empty,
        };
        if (arr.len < 4) return empty;

        // Element type of `PendingSummary.consumers`, taken from the field
        // itself: a locally re-declared struct would be a distinct type and its
        // slice would not coerce to the field.
        const ConsumerInfo = std.meta.Child(@TypeOf(@as(PendingSummary, undefined).consumers));

        const consumer_arr = arr[3].asArray() orelse &.{};
        const consumers = try self.client.allocator.alloc(ConsumerInfo, consumer_arr.len);

        // Compact as we go so no uninitialized slot ends up in the result.
        var filled: usize = 0;
        for (consumer_arr) |item| {
            const pair = item.asArray() orelse continue;
            if (pair.len < 2) continue;
            consumers[filled] = .{
                .name = pair[0].asString() orelse "",
                .count = pendingCount(pair[1]),
            };
            filled += 1;
        }

        return PendingSummary{
            .count = arr[0].asInt() orelse 0,
            .min_id = arr[1].asString(),
            .max_id = arr[2].asString(),
            .consumers = consumers[0..filled],
        };
    }

    // ========================================================================
    // Info
    // ========================================================================

    /// XLEN stream - get stream length
    ///
    /// Returns the integer reply; any non-integer reply, including an error
    /// reply, returns 0.
    pub fn xlen(self: *StreamCommands, key: []const u8) ClientError!i64 {
        return integerOrZero(try self.client.sendCommand(&.{ "XLEN", key }));
    }

    /// XINFO STREAM stream - get stream info
    ///
    /// Returns the raw array reply unparsed; any non-array reply gives an
    /// empty slice.
    pub fn xinfoStream(self: *StreamCommands, key: []const u8) ClientError![]const Value {
        const result = try self.client.sendCommand(&.{ "XINFO", "STREAM", key });
        return switch (result) {
            .array => |items| items,
            else => &.{},
        };
    }

    /// XINFO GROUPS stream - get consumer groups info
    ///
    /// Returns the raw array reply unparsed; any non-array reply gives an
    /// empty slice.
    pub fn xinfoGroups(self: *StreamCommands, key: []const u8) ClientError![]const Value {
        const result = try self.client.sendCommand(&.{ "XINFO", "GROUPS", key });
        return switch (result) {
            .array => |items| items,
            else => &.{},
        };
    }

    // ========================================================================
    // Trimming
    // ========================================================================

    /// XTRIM stream MAXLEN [~] count - trim stream to max length
    ///
    /// "~" is sent only when `approximate` is true. Returns the integer reply;
    /// any non-integer reply, including an error reply, returns 0.
    pub fn xtrim(self: *StreamCommands, key: []const u8, maxlen: usize, approximate: bool) ClientError!i64 {
        var buf: [24]u8 = undefined;
        // 24 bytes hold any usize in decimal, so formatting cannot fail.
        const len_str = std.fmt.bufPrint(&buf, "{d}", .{maxlen}) catch unreachable;

        const result = if (approximate)
            try self.client.sendCommand(&.{ "XTRIM", key, "MAXLEN", "~", len_str })
        else
            try self.client.sendCommand(&.{ "XTRIM", key, "MAXLEN", len_str });

        return integerOrZero(result);
    }

    /// XDEL stream id [id ...] - delete entries
    ///
    /// Returns the integer reply; any non-integer reply, including an error
    /// reply, returns 0.
    pub fn xdel(self: *StreamCommands, key: []const u8, ids: []const []const u8) ClientError!i64 {
        const args = try self.client.allocator.alloc([]const u8, ids.len + 2);
        defer self.client.allocator.free(args);
        args[0] = "XDEL";
        args[1] = key;
        @memcpy(args[2..], ids);

        return integerOrZero(try self.client.sendCommand(args));
    }

    // ========================================================================
    // Reply Helpers
    // ========================================================================

    /// Return the payload of a non-null bulk reply; anything else is reported
    /// as `CommandError.RedisError`.
    fn bulkOrError(result: Value) ClientError![]const u8 {
        return switch (result) {
            .bulk => |b| b orelse return CommandError.RedisError,
            else => return CommandError.RedisError,
        };
    }

    /// Return the payload of an integer reply, or 0 for any other reply.
    fn integerOrZero(result: Value) i64 {
        return switch (result) {
            .integer => |n| n,
            else => 0,
        };
    }

    /// Return the payload of a bulk value, or `null` for a null bulk or any
    /// other value kind.
    fn bulkString(value: Value) ?[]const u8 {
        return switch (value) {
            .bulk => |b| b,
            else => null,
        };
    }

    /// Read a per-consumer pending count.
    ///
    /// Uses `asInt()` when it yields a value. Otherwise parses the string form,
    /// because Redis documents this count as a string in the XPENDING summary
    /// reply. Returns 0 when neither works.
    /// NOTE(review): if `Value.asInt` already parses bulk strings the fallback
    /// is simply never taken — confirm against resp.zig.
    fn pendingCount(value: Value) i64 {
        if (value.asInt()) |n| return n;
        const text = value.asString() orelse return 0;
        return std.fmt.parseInt(i64, text, 10) catch 0;
    }

    /// Shared implementation of `xrange` / `xrevrange`: sends
    /// `command key first second [COUNT count]` and parses the entry list.
    fn rangeQuery(
        self: *StreamCommands,
        command: []const u8,
        key: []const u8,
        first: []const u8,
        second: []const u8,
        count: ?u32,
    ) ClientError![]const StreamEntry {
        var count_buf: [16]u8 = undefined;
        const result = if (count) |c| blk: {
            const count_str = std.fmt.bufPrint(&count_buf, "{d}", .{c}) catch unreachable;
            break :blk try self.client.sendCommand(&.{ command, key, first, second, "COUNT", count_str });
        } else try self.client.sendCommand(&.{ command, key, first, second });

        if (result.isError()) return CommandError.RedisError;
        return self.parseEntryArray(result);
    }

    // ========================================================================
    // Parsing Helpers
    // ========================================================================

    /// Extract the first entry of the first stream from an XREAD/XREADGROUP
    /// reply. Returns `null` whenever the reply does not have that shape.
    fn parseStreamReadResult(self: *StreamCommands, result: Value) ClientError!?StreamEntry {
        // Format: [[stream_name, [[entry_id, [field, value, ...]], ...]]]
        const outer = switch (result) {
            .array => |a| a,
            else => return null,
        };
        if (outer.len == 0) return null;

        // First stream: [name, entries]
        const stream_data = switch (outer[0]) {
            .array => |a| a,
            else => return null,
        };
        if (stream_data.len < 2) return null;

        // Entries array
        const entries = switch (stream_data[1]) {
            .array => |a| a,
            else => return null,
        };
        if (entries.len == 0) return null;

        return self.parseEntry(entries[0]);
    }

    /// Parse one `[entry_id, [field, value, ...]]` value.
    ///
    /// Returns `null` when the value does not have that shape. Pairs whose
    /// name or value is not a non-null bulk string are dropped, and a trailing
    /// unpaired element is ignored. The returned `fields` slice is allocated
    /// with `client.allocator` and sized to the pairs actually kept.
    fn parseEntry(self: *StreamCommands, entry_val: Value) ClientError!?StreamEntry {
        const entry_arr = switch (entry_val) {
            .array => |a| a,
            else => return null,
        };
        if (entry_arr.len < 2) return null;

        const entry_id = bulkString(entry_arr[0]) orelse return null;

        const fields_arr = switch (entry_arr[1]) {
            .array => |a| a,
            else => return null,
        };

        // Pass 1: count well-formed pairs so the allocation is exact.
        // `i + 1 < len` (not `i < len - 1`) stays correct for an empty array,
        // where `len - 1` would underflow.
        var kept: usize = 0;
        var i: usize = 0;
        while (i + 1 < fields_arr.len) : (i += 2) {
            if (bulkString(fields_arr[i]) == null) continue;
            if (bulkString(fields_arr[i + 1]) == null) continue;
            kept += 1;
        }

        const fields = try self.client.allocator.alloc(StreamField, kept);

        // Pass 2: fill densely, so every slot of `fields` is initialized.
        var out: usize = 0;
        i = 0;
        while (i + 1 < fields_arr.len) : (i += 2) {
            const name = bulkString(fields_arr[i]) orelse continue;
            const value = bulkString(fields_arr[i + 1]) orelse continue;
            fields[out] = .{ .field = name, .value = value };
            out += 1;
        }

        return StreamEntry{
            .id = entry_id,
            .fields = fields,
        };
    }

    /// Parse an array reply of entries; a non-array reply gives an empty slice.
    fn parseEntryArray(self: *StreamCommands, result: Value) ClientError![]const StreamEntry {
        const arr = switch (result) {
            .array => |a| a,
            else => return &.{},
        };
        return self.collectEntries(arr);
    }

    /// Parse every element of `raw` with `parseEntry`, keeping those that
    /// parse.
    ///
    /// The result may be shorter than the allocation made for it when some
    /// elements are skipped. On failure, everything allocated here (the entry
    /// slice and the field slices of entries parsed so far) is freed before
    /// the error propagates.
    fn collectEntries(self: *StreamCommands, raw: []const Value) ClientError![]const StreamEntry {
        const allocator = self.client.allocator;
        const entries = try allocator.alloc(StreamEntry, raw.len);
        var count: usize = 0;
        errdefer {
            for (entries[0..count]) |parsed| allocator.free(parsed.fields);
            allocator.free(entries);
        }

        for (raw) |entry_val| {
            if (try self.parseEntry(entry_val)) |parsed| {
                entries[count] = parsed;
                count += 1;
            }
        }

        return entries[0..count];
    }

    /// Parse an XAUTOCLAIM reply.
    ///
    /// Returns `null` when the reply is not an array of at least two
    /// elements. A missing or non-bulk next ID becomes "0-0". When the second
    /// element is not an array the result has no entries and no deleted IDs.
    fn parseAutoclaimResult(self: *StreamCommands, result: Value) ClientError!?AutoclaimResult {
        // Format: [next_id, [[id, [field, value]], ...], [deleted_ids]]
        const outer = switch (result) {
            .array => |a| a,
            else => return null,
        };
        if (outer.len < 2) return null;

        const next_id = bulkString(outer[0]) orelse "0-0";

        const entries_arr = switch (outer[1]) {
            .array => |a| a,
            else => return AutoclaimResult{ .next_id = next_id, .entries = &.{}, .deleted_ids = &.{} },
        };

        const allocator = self.client.allocator;

        // Deleted IDs (Redis 7+). Collected before the entries so that the
        // only fallible step after this allocation is `collectEntries`, which
        // cleans up after itself; the errdefer covers this allocation.
        var id_storage: [][]const u8 = &.{};
        errdefer allocator.free(id_storage);
        var deleted_count: usize = 0;
        if (outer.len >= 3) {
            if (outer[2].asArray()) |del_arr| {
                id_storage = try allocator.alloc([]const u8, del_arr.len);
                for (del_arr) |item| {
                    if (item.asString()) |text| {
                        id_storage[deleted_count] = text;
                        deleted_count += 1;
                    }
                }
            }
        }

        const entries = try self.collectEntries(entries_arr);

        return AutoclaimResult{
            .next_id = next_id,
            .entries = entries,
            .deleted_ids = id_storage[0..deleted_count],
        };
    }
};
648
/// Build the stream-command view for `client`.
///
/// Equivalent to `StreamCommands.init(client)`; the returned value keeps the
/// pointer it was given.
pub fn streams(client: *Client) StreamCommands {
    const commands = StreamCommands.init(client);
    return commands;
}