this repo has no description
at main 652 lines 24 kB view raw
//! Redis stream commands
//!
//! Streams are append-only log data structures, ideal for:
//! - Event sourcing
//! - Message queues with consumer groups
//! - Activity feeds
//! - Time series data
//!
//! ## Key Concepts
//!
//! - **Entry ID**: Unique identifier like "1234567890123-0" (timestamp-sequence)
//! - **Consumer Group**: Named group of consumers that share message processing
//! - **Consumer**: Individual worker within a group
//! - **Pending Entry List (PEL)**: Messages delivered but not acknowledged
//!
//! ## Fan-Out Pattern
//!
//! Multiple consumer groups on the same stream each receive ALL messages:
//! ```
//! Stream: events
//!   └── Consumer Group: "event-persister" (gets all messages)
//!   └── Consumer Group: "ws-broadcaster"  (also gets all messages)
//!   └── Consumer Group: "analytics"       (also gets all messages)
//! ```
//!
//! ## Examples
//!
//! ```zig
//! // Publish event
//! const id = try client.streams().xadd("events", .auto, &.{
//!     .{"type", "user.login"},
//!     .{"user_id", "1234"},
//! });
//!
//! // Create consumer group
//! try client.streams().xgroupCreate("events", "workers", "0");
//!
//! // Read as consumer (blocks until message or timeout)
//! if (try client.streams().xreadgroup("workers", "worker-1", "events", 1000, 1, ">")) |entry| {
//!     // Process entry
//!     try client.streams().xack("events", "workers", &.{entry.id});
//! }
//!
//! // Claim abandoned messages
//! const claimed = try client.streams().xautoclaim("events", "workers", "worker-1", 30000, "0-0", 10);
//! ```

const std = @import("std");
const Client = @import("../client.zig").Client;
const Value = @import("../resp.zig").Value;
const CommandError = @import("../resp.zig").CommandError;
const ClientError = @import("../resp.zig").ClientError;

/// A field-value pair in a stream entry
pub const StreamField = struct {
    /// Field name
    field: []const u8,
    /// Value stored under `field`
    value: []const u8,
};

/// A single entry from a Redis stream
pub const StreamEntry = struct {
    /// Stream entry ID (e.g., "1234567890123-0")
    id: []const u8,
    /// Field-value pairs in this entry
    fields: []const StreamField,

    /// Get a field value by name.
    /// Performs a linear scan and returns the value of the first field whose
    /// name is byte-equal to `field`, or null when no field matches.
    pub fn get(self: StreamEntry, field: []const u8) ?[]const u8 {
        for (self.fields) |f| {
            if (std.mem.eql(u8, f.field, field)) return f.value;
        }
        return null;
    }
};

/// Result from XAUTOCLAIM command
pub const AutoclaimResult = struct {
    /// Next stream ID to use for subsequent XAUTOCLAIM calls
    next_id: []const u8,
    /// Claimed entries
    entries: []const StreamEntry,
    /// IDs that no longer exist in the stream (Redis 7+)
    deleted_ids: []const []const u8,
};

/// Result from XPENDING summary
pub const PendingSummary = struct {
    /// Pending-entry count for one consumer of the group.
    /// Named so that code building a `PendingSummary` can allocate a slice of
    /// exactly this type; an anonymous struct type cannot be referred to
    /// from anywhere else.
    pub const Consumer = struct {
        /// Consumer name
        name: []const u8,
        /// Number of pending entries attributed to this consumer
        count: i64,
    };

    /// Total number of pending entries reported for the group
    count: i64,
    /// Smallest pending entry ID, or null when the reply carried none
    min_id: ?[]const u8,
    /// Largest pending entry ID, or null when the reply carried none
    max_id: ?[]const u8,
    /// Per-consumer pending counts
    consumers: []const Consumer,
};

/// Stream ID for XADD
pub const StreamId = union(enum) {
    /// Let Redis auto-generate: "*"
    auto,
    /// Specific ID
    explicit: []const u8,
    /// Sends ">".
    /// NOTE(review): ">" is documented by Redis as a special ID for
    /// XREADGROUP ("entries never delivered to this group"), not for XADD -
    /// confirm a server accepts it before passing this variant to `xadd`.
    new_entries,

    /// Render this ID as the literal argument sent on the wire.
    fn toArg(self: StreamId) []const u8 {
        return switch (self) {
            .auto => "*",
            .explicit => |id| id,
            .new_entries => ">",
        };
    }
};

/// Stream command implementations.
pub const StreamCommands = struct {
    client: *Client,

    /// Bind the stream command set to `client`. Only the pointer is stored.
    pub fn init(client: *Client) StreamCommands {
        return .{ .client = client };
    }

    // ========================================================================
    // Publishing
    // ========================================================================

    /// XADD stream ID field value [field value ...] - append entry to stream
    /// Returns the entry ID assigned by Redis.
    ///
    /// `fields` is a list of `{ name, value }` pairs. Fails with
    /// `CommandError.RedisError` when the reply is anything other than a
    /// non-null bulk string. The temporary argument vector is allocated from
    /// `client.allocator` and released before returning.
    pub fn xadd(self: *StreamCommands, key: []const u8, id: StreamId, fields: []const [2][]const u8) ClientError![]const u8 {
        const arg_count = 3 + fields.len * 2;
        const args = try self.client.allocator.alloc([]const u8, arg_count);
        defer self.client.allocator.free(args);

        args[0] = "XADD";
        args[1] = key;
        args[2] = id.toArg();

        for (fields, 0..) |field, i| {
            args[3 + i * 2] = field[0];
            args[3 + i * 2 + 1] = field[1];
        }

        return expectBulk(try self.client.sendCommand(args));
    }

    /// XADD with MAXLEN option for capped streams
    ///
    /// Sends `XADD key MAXLEN [~] maxlen * field value ...`; the `~` is
    /// included only when `approximate` is true and the entry ID is always
    /// auto-generated ("*"). Error behavior matches `xadd`.
    pub fn xaddCapped(self: *StreamCommands, key: []const u8, maxlen: usize, approximate: bool, fields: []const [2][]const u8) ClientError![]const u8 {
        const arg_count = 5 + fields.len * 2 + @as(usize, if (approximate) 1 else 0);
        const args = try self.client.allocator.alloc([]const u8, arg_count);
        defer self.client.allocator.free(args);

        // 24 bytes hold any 64-bit decimal (at most 20 digits), so formatting cannot fail.
        var maxlen_buf: [24]u8 = undefined;

        args[0] = "XADD";
        args[1] = key;
        args[2] = "MAXLEN";

        var idx: usize = 3;
        if (approximate) {
            args[idx] = "~";
            idx += 1;
        }
        args[idx] = std.fmt.bufPrint(&maxlen_buf, "{d}", .{maxlen}) catch unreachable;
        idx += 1;
        args[idx] = "*";
        idx += 1;

        for (fields) |field| {
            args[idx] = field[0];
            args[idx + 1] = field[1];
            idx += 2;
        }

        // idx now equals arg_count; slice explicitly so only filled slots are ever sent.
        return expectBulk(try self.client.sendCommand(args[0..idx]));
    }

    // ========================================================================
    // Consumer Groups
    // ========================================================================

    /// XGROUP CREATE stream group id [MKSTREAM] - create consumer group
    /// start_id: "0" to read from beginning, "$" for new messages only
    ///
    /// MKSTREAM is always sent. An error reply whose text contains
    /// "BUSYGROUP" (group already exists) is treated as success; any other
    /// error reply yields `CommandError.RedisError`.
    pub fn xgroupCreate(self: *StreamCommands, key: []const u8, group: []const u8, start_id: []const u8) ClientError!void {
        const result = try self.client.sendCommand(&.{ "XGROUP", "CREATE", key, group, start_id, "MKSTREAM" });
        if (result.isError()) {
            // BUSYGROUP means group already exists - that's fine
            if (result.asString()) |msg| {
                if (std.mem.indexOf(u8, msg, "BUSYGROUP") != null) return;
            }
            return CommandError.RedisError;
        }
    }

    /// XGROUP DESTROY stream group - delete consumer group
    ///
    /// Returns true when the server replies with integer 1. An error reply
    /// containing "NOGROUP" returns false; other error replies yield
    /// `CommandError.RedisError`. Any other reply shape returns false.
    pub fn xgroupDestroy(self: *StreamCommands, key: []const u8, group: []const u8) ClientError!bool {
        const result = try self.client.sendCommand(&.{ "XGROUP", "DESTROY", key, group });
        return switch (result) {
            .integer => |i| i == 1,
            .err => {
                // NOGROUP means doesn't exist - fine for cleanup
                if (result.asString()) |msg| {
                    if (std.mem.indexOf(u8, msg, "NOGROUP") != null) return false;
                }
                return CommandError.RedisError;
            },
            else => false,
        };
    }

    /// XGROUP CREATECONSUMER stream group consumer - create consumer in group
    ///
    /// Returns true only for an integer reply of 1; every other reply
    /// (including an error reply) is reported as false.
    pub fn xgroupCreateConsumer(self: *StreamCommands, key: []const u8, group: []const u8, consumer: []const u8) ClientError!bool {
        const result = try self.client.sendCommand(&.{ "XGROUP", "CREATECONSUMER", key, group, consumer });
        return switch (result) {
            .integer => |i| i == 1,
            else => false,
        };
    }

    /// XGROUP DELCONSUMER stream group consumer - delete consumer from group
    ///
    /// Returns the integer reply, or 0 for any non-integer reply
    /// (including an error reply).
    pub fn xgroupDelConsumer(self: *StreamCommands, key: []const u8, group: []const u8, consumer: []const u8) ClientError!i64 {
        return integerOrZero(try self.client.sendCommand(&.{ "XGROUP", "DELCONSUMER", key, group, consumer }));
    }

    /// XGROUP SETID stream group id - set group's last-delivered-id
    ///
    /// Yields `CommandError.RedisError` on an error reply.
    pub fn xgroupSetId(self: *StreamCommands, key: []const u8, group: []const u8, entry_id: []const u8) ClientError!void {
        const result = try self.client.sendCommand(&.{ "XGROUP", "SETID", key, group, entry_id });
        if (result.isError()) return CommandError.RedisError;
    }

    // ========================================================================
    // Reading
    // ========================================================================

    /// XREAD [BLOCK ms] [COUNT count] STREAMS key [key ...] id [id ...]
    /// Non-consumer-group read. Use "$" for new messages only.
    ///
    /// BLOCK and COUNT are sent only when the matching optional is non-null.
    /// Only the first entry of the first stream in the reply is returned,
    /// even when `count` asks for more. Returns null on a null reply or a
    /// reply that does not have the expected shape.
    pub fn xread(self: *StreamCommands, key: []const u8, block_ms: ?u32, count: ?u32, start_id: []const u8) ClientError!?StreamEntry {
        // At most 8 slots are used: XREAD BLOCK ms COUNT n STREAMS key id.
        var args_buf: [12][]const u8 = undefined;
        var arg_count: usize = 1;
        args_buf[0] = "XREAD";

        // A u32 prints as at most 10 digits, so 16-byte buffers cannot overflow.
        var block_buf: [16]u8 = undefined;
        if (block_ms) |ms| {
            args_buf[arg_count] = "BLOCK";
            args_buf[arg_count + 1] = std.fmt.bufPrint(&block_buf, "{d}", .{ms}) catch unreachable;
            arg_count += 2;
        }

        var count_buf: [16]u8 = undefined;
        if (count) |c| {
            args_buf[arg_count] = "COUNT";
            args_buf[arg_count + 1] = std.fmt.bufPrint(&count_buf, "{d}", .{c}) catch unreachable;
            arg_count += 2;
        }

        args_buf[arg_count] = "STREAMS";
        args_buf[arg_count + 1] = key;
        args_buf[arg_count + 2] = start_id;
        arg_count += 3;

        const result = try self.client.sendCommand(args_buf[0..arg_count]);
        if (result.isNull()) return null;

        return self.parseStreamReadResult(result);
    }

    /// XREADGROUP GROUP group consumer [BLOCK ms] [COUNT count] STREAMS key id
    /// Use ">" for new messages, "0" to replay pending
    ///
    /// BLOCK and COUNT are always sent. Only the first entry of the first
    /// stream in the reply is returned. Returns null on a null reply or a
    /// reply that does not have the expected shape.
    pub fn xreadgroup(
        self: *StreamCommands,
        group: []const u8,
        consumer: []const u8,
        key: []const u8,
        block_ms: u32,
        count: u32,
        start_id: []const u8,
    ) ClientError!?StreamEntry {
        // A u32 prints as at most 10 digits, so 16-byte buffers cannot overflow.
        var block_buf: [16]u8 = undefined;
        var count_buf: [16]u8 = undefined;
        const block_str = std.fmt.bufPrint(&block_buf, "{d}", .{block_ms}) catch unreachable;
        const count_str = std.fmt.bufPrint(&count_buf, "{d}", .{count}) catch unreachable;

        const result = try self.client.sendCommand(&.{
            "XREADGROUP", "GROUP",   group,   consumer,
            "BLOCK",      block_str, "COUNT", count_str,
            "STREAMS",    key,       start_id,
        });

        if (result.isNull()) return null;
        return self.parseStreamReadResult(result);
    }

    /// XRANGE stream start end [COUNT count] - get entries in ID range
    ///
    /// Returns an empty slice when the reply is not an array.
    pub fn xrange(self: *StreamCommands, key: []const u8, start: []const u8, end: []const u8, count: ?u32) ClientError![]const StreamEntry {
        return self.rangeQuery("XRANGE", key, start, end, count);
    }

    /// XREVRANGE stream end start [COUNT count] - get entries in reverse order
    ///
    /// Returns an empty slice when the reply is not an array.
    pub fn xrevrange(self: *StreamCommands, key: []const u8, end: []const u8, start: []const u8, count: ?u32) ClientError![]const StreamEntry {
        return self.rangeQuery("XREVRANGE", key, end, start, count);
    }

    /// Shared body of `xrange` / `xrevrange`: sends
    /// `command key first second [COUNT count]` and parses the entry array.
    fn rangeQuery(
        self: *StreamCommands,
        command: []const u8,
        key: []const u8,
        first: []const u8,
        second: []const u8,
        count: ?u32,
    ) ClientError![]const StreamEntry {
        // A u32 prints as at most 10 digits, so a 16-byte buffer cannot overflow.
        var count_buf: [16]u8 = undefined;
        var args_buf: [6][]const u8 = undefined;
        args_buf[0] = command;
        args_buf[1] = key;
        args_buf[2] = first;
        args_buf[3] = second;

        var arg_count: usize = 4;
        if (count) |c| {
            args_buf[4] = "COUNT";
            args_buf[5] = std.fmt.bufPrint(&count_buf, "{d}", .{c}) catch unreachable;
            arg_count = 6;
        }

        const result = try self.client.sendCommand(args_buf[0..arg_count]);
        return self.parseEntryArray(result);
    }

    // ========================================================================
    // Acknowledgment
    // ========================================================================

    /// XACK stream group id [id ...] - acknowledge messages
    /// Returns number of messages acknowledged.
    ///
    /// Any non-integer reply (including an error reply) is reported as 0.
    pub fn xack(self: *StreamCommands, key: []const u8, group: []const u8, ids: []const []const u8) ClientError!i64 {
        const args = try self.client.allocator.alloc([]const u8, ids.len + 3);
        defer self.client.allocator.free(args);
        args[0] = "XACK";
        args[1] = key;
        args[2] = group;
        @memcpy(args[3..], ids);

        return integerOrZero(try self.client.sendCommand(args));
    }

    // ========================================================================
    // Claiming / Recovery
    // ========================================================================

    /// XAUTOCLAIM stream group consumer min-idle start [COUNT count]
    /// Automatically claim pending messages that have been idle too long.
    ///
    /// Yields `CommandError.RedisError` on an error reply. Returns null when
    /// the reply is not an array of at least two elements.
    pub fn xautoclaim(
        self: *StreamCommands,
        key: []const u8,
        group: []const u8,
        consumer: []const u8,
        min_idle_ms: u32,
        start_id: []const u8,
        count: u32,
    ) ClientError!?AutoclaimResult {
        // A u32 prints as at most 10 digits, so 16-byte buffers cannot overflow.
        var idle_buf: [16]u8 = undefined;
        var count_buf: [16]u8 = undefined;
        const idle_str = std.fmt.bufPrint(&idle_buf, "{d}", .{min_idle_ms}) catch unreachable;
        const count_str = std.fmt.bufPrint(&count_buf, "{d}", .{count}) catch unreachable;

        const result = try self.client.sendCommand(&.{
            "XAUTOCLAIM", key,      group,   consumer,
            idle_str,     start_id, "COUNT", count_str,
        });

        if (result.isError()) return CommandError.RedisError;
        return self.parseAutoclaimResult(result);
    }

    /// XCLAIM stream group consumer min-idle-time id [id ...] - claim specific messages
    ///
    /// Returns the claimed entries, or an empty slice when the reply is not
    /// an array (an error reply is not distinguished from "nothing claimed").
    pub fn xclaim(
        self: *StreamCommands,
        key: []const u8,
        group: []const u8,
        consumer: []const u8,
        min_idle_ms: u32,
        ids: []const []const u8,
    ) ClientError![]const StreamEntry {
        const args = try self.client.allocator.alloc([]const u8, ids.len + 5);
        defer self.client.allocator.free(args);

        // A u32 prints as at most 10 digits, so a 16-byte buffer cannot overflow.
        var idle_buf: [16]u8 = undefined;
        args[0] = "XCLAIM";
        args[1] = key;
        args[2] = group;
        args[3] = consumer;
        args[4] = std.fmt.bufPrint(&idle_buf, "{d}", .{min_idle_ms}) catch unreachable;
        @memcpy(args[5..], ids);

        const result = try self.client.sendCommand(args);
        return self.parseEntryArray(result);
    }

    // ========================================================================
    // Pending
    // ========================================================================

    /// XPENDING stream group - get pending summary
    ///
    /// Returns an all-empty summary when the reply is not an array of at
    /// least four elements. Consumer rows that are not arrays of at least two
    /// elements are dropped, so every element of the returned `consumers`
    /// slice is initialized. The `consumers` slice is allocated from
    /// `client.allocator` and is not freed here.
    pub fn xpendingSummary(self: *StreamCommands, key: []const u8, group: []const u8) ClientError!PendingSummary {
        const result = try self.client.sendCommand(&.{ "XPENDING", key, group });

        const arr = switch (result) {
            .array => |a| a,
            else => return PendingSummary{ .count = 0, .min_id = null, .max_id = null, .consumers = &.{} },
        };

        if (arr.len < 4) return PendingSummary{ .count = 0, .min_id = null, .max_id = null, .consumers = &.{} };

        const count = arr[0].asInt() orelse 0;
        const min_id = arr[1].asString();
        const max_id = arr[2].asString();

        // Allocate with the field's own element type. Spelling the struct out
        // again would declare a second, distinct type that does not coerce to
        // `PendingSummary.consumers`.
        const ConsumerInfo = std.meta.Elem(@TypeOf(@as(PendingSummary, undefined).consumers));

        // Parse consumer list
        const consumer_arr = arr[3].asArray() orelse &.{};

        // First pass: count well-formed rows so the allocation is exact.
        var valid: usize = 0;
        for (consumer_arr) |c| {
            const row = c.asArray() orelse continue;
            if (row.len >= 2) valid += 1;
        }

        const consumers = try self.client.allocator.alloc(ConsumerInfo, valid);

        // Second pass: fill densely, skipping the same rows as above.
        var out: usize = 0;
        for (consumer_arr) |c| {
            const row = c.asArray() orelse continue;
            if (row.len < 2) continue;
            consumers[out] = .{
                .name = row[0].asString() orelse "",
                .count = row[1].asInt() orelse 0,
            };
            out += 1;
        }

        return PendingSummary{
            .count = count,
            .min_id = min_id,
            .max_id = max_id,
            .consumers = consumers,
        };
    }

    // ========================================================================
    // Info
    // ========================================================================

    /// XLEN stream - get stream length
    ///
    /// Any non-integer reply (including an error reply) is reported as 0.
    pub fn xlen(self: *StreamCommands, key: []const u8) ClientError!i64 {
        return integerOrZero(try self.client.sendCommand(&.{ "XLEN", key }));
    }

    /// XINFO STREAM stream - get stream info
    ///
    /// Returns the raw reply array, or an empty slice for any other reply.
    pub fn xinfoStream(self: *StreamCommands, key: []const u8) ClientError![]const Value {
        const result = try self.client.sendCommand(&.{ "XINFO", "STREAM", key });
        return switch (result) {
            .array => |a| a,
            else => &.{},
        };
    }

    /// XINFO GROUPS stream - get consumer groups info
    ///
    /// Returns the raw reply array, or an empty slice for any other reply.
    pub fn xinfoGroups(self: *StreamCommands, key: []const u8) ClientError![]const Value {
        const result = try self.client.sendCommand(&.{ "XINFO", "GROUPS", key });
        return switch (result) {
            .array => |a| a,
            else => &.{},
        };
    }

    // ========================================================================
    // Trimming
    // ========================================================================

    /// XTRIM stream MAXLEN [~] count - trim stream to max length
    ///
    /// Returns the integer reply, or 0 for any non-integer reply
    /// (including an error reply).
    pub fn xtrim(self: *StreamCommands, key: []const u8, maxlen: usize, approximate: bool) ClientError!i64 {
        // 24 bytes hold any 64-bit decimal (at most 20 digits), so formatting cannot fail.
        var buf: [24]u8 = undefined;
        const len_str = std.fmt.bufPrint(&buf, "{d}", .{maxlen}) catch unreachable;

        const result = if (approximate)
            try self.client.sendCommand(&.{ "XTRIM", key, "MAXLEN", "~", len_str })
        else
            try self.client.sendCommand(&.{ "XTRIM", key, "MAXLEN", len_str });

        return integerOrZero(result);
    }

    /// XDEL stream id [id ...] - delete entries
    ///
    /// Returns the integer reply, or 0 for any non-integer reply
    /// (including an error reply).
    pub fn xdel(self: *StreamCommands, key: []const u8, ids: []const []const u8) ClientError!i64 {
        const args = try self.client.allocator.alloc([]const u8, ids.len + 2);
        defer self.client.allocator.free(args);
        args[0] = "XDEL";
        args[1] = key;
        @memcpy(args[2..], ids);

        return integerOrZero(try self.client.sendCommand(args));
    }

    // ========================================================================
    // Parsing Helpers
    // ========================================================================

    /// Unwrap a reply that must be a non-null bulk string; every other reply
    /// (null bulk, error, any other type) becomes `CommandError.RedisError`.
    fn expectBulk(reply: Value) ClientError![]const u8 {
        return switch (reply) {
            .bulk => |b| b orelse return CommandError.RedisError,
            else => return CommandError.RedisError,
        };
    }

    /// Integer payload of `reply`, or 0 when the reply is not an integer.
    fn integerOrZero(reply: Value) i64 {
        return switch (reply) {
            .integer => |i| i,
            else => 0,
        };
    }

    /// Bulk-string payload of `v`; null for a null bulk or any other reply type.
    fn bulkText(v: Value) ?[]const u8 {
        return switch (v) {
            .bulk => |b| b,
            else => null,
        };
    }

    /// Extract the first entry of the first stream from an XREAD/XREADGROUP
    /// reply. Returns null whenever the reply does not have that shape.
    fn parseStreamReadResult(self: *StreamCommands, result: Value) ClientError!?StreamEntry {
        // Format: [[stream_name, [[entry_id, [field, value, ...]], ...]]]
        const outer = switch (result) {
            .array => |a| a,
            else => return null,
        };
        if (outer.len == 0) return null;

        // First stream: [name, entries]
        const stream_data = switch (outer[0]) {
            .array => |a| a,
            else => return null,
        };
        if (stream_data.len < 2) return null;

        // Entries array
        const entries = switch (stream_data[1]) {
            .array => |a| a,
            else => return null,
        };
        if (entries.len == 0) return null;

        return self.parseEntry(entries[0]);
    }

    /// Parse one `[entry_id, [field, value, ...]]` element.
    ///
    /// Returns null when the element does not have that shape. Pairs whose
    /// field or value is not a non-null bulk string are dropped, and a
    /// trailing unpaired element is ignored. The returned `fields` slice is
    /// allocated from `client.allocator` with exactly the number of kept
    /// pairs, so it holds no uninitialized slots.
    fn parseEntry(self: *StreamCommands, entry_val: Value) ClientError!?StreamEntry {
        const entry_arr = switch (entry_val) {
            .array => |a| a,
            else => return null,
        };
        if (entry_arr.len < 2) return null;

        // Entry ID
        const entry_id = switch (entry_arr[0]) {
            .bulk => |b| b orelse return null,
            else => return null,
        };

        // Fields array
        const fields_arr = switch (entry_arr[1]) {
            .array => |a| a,
            else => return null,
        };

        // Integer division drops a trailing unpaired element and, unlike
        // `len - 1`, cannot underflow when the field list is empty.
        const pair_count = fields_arr.len / 2;

        // First pass: count well-formed pairs so the allocation is exact.
        var valid: usize = 0;
        for (0..pair_count) |p| {
            if (bulkText(fields_arr[p * 2]) != null and bulkText(fields_arr[p * 2 + 1]) != null) {
                valid += 1;
            }
        }

        const fields = try self.client.allocator.alloc(StreamField, valid);

        // Second pass: fill densely, skipping the same pairs as above.
        var out: usize = 0;
        for (0..pair_count) |p| {
            const field = bulkText(fields_arr[p * 2]) orelse continue;
            const value = bulkText(fields_arr[p * 2 + 1]) orelse continue;
            fields[out] = .{ .field = field, .value = value };
            out += 1;
        }

        return StreamEntry{
            .id = entry_id,
            .fields = fields,
        };
    }

    /// Parse a flat array of entries (XRANGE / XREVRANGE / XCLAIM reply).
    ///
    /// Returns an empty slice when `result` is not an array. Elements that do
    /// not parse as entries are skipped. The returned slice is a prefix of a
    /// buffer sized for every element of the reply.
    fn parseEntryArray(self: *StreamCommands, result: Value) ClientError![]const StreamEntry {
        const arr = switch (result) {
            .array => |a| a,
            else => return &.{},
        };

        const entries = try self.client.allocator.alloc(StreamEntry, arr.len);
        var count: usize = 0;
        // If a later allocation fails, release everything built so far.
        errdefer {
            for (entries[0..count]) |parsed| self.client.allocator.free(parsed.fields);
            self.client.allocator.free(entries);
        }

        for (arr) |entry_val| {
            if (try self.parseEntry(entry_val)) |entry| {
                entries[count] = entry;
                count += 1;
            }
        }

        return entries[0..count];
    }

    /// Parse an XAUTOCLAIM reply.
    ///
    /// Returns null when `result` is not an array of at least two elements.
    /// A missing or non-bulk next ID falls back to "0-0". When the second
    /// element is not an array, the result has no entries and no deleted IDs.
    fn parseAutoclaimResult(self: *StreamCommands, result: Value) ClientError!?AutoclaimResult {
        // Format: [next_id, [[id, [field, value]], ...], [deleted_ids]]
        const outer = switch (result) {
            .array => |a| a,
            else => return null,
        };
        if (outer.len < 2) return null;

        // Next ID
        const next_id = switch (outer[0]) {
            .bulk => |b| b orelse "0-0",
            else => "0-0",
        };

        // Entries array
        const entries_arr = switch (outer[1]) {
            .array => |a| a,
            else => return AutoclaimResult{ .next_id = next_id, .entries = &.{}, .deleted_ids = &.{} },
        };

        const entries = try self.client.allocator.alloc(StreamEntry, entries_arr.len);
        var count: usize = 0;
        // If a later allocation fails (another entry or the deleted-ID list),
        // release everything built so far.
        errdefer {
            for (entries[0..count]) |parsed| self.client.allocator.free(parsed.fields);
            self.client.allocator.free(entries);
        }

        for (entries_arr) |entry_val| {
            if (try self.parseEntry(entry_val)) |entry| {
                entries[count] = entry;
                count += 1;
            }
        }

        // Deleted IDs (Redis 7+)
        var deleted_ids: []const []const u8 = &.{};
        if (outer.len >= 3) {
            if (outer[2].asArray()) |del_arr| {
                const ids = try self.client.allocator.alloc([]const u8, del_arr.len);
                var del_count: usize = 0;
                for (del_arr) |v| {
                    if (v.asString()) |s| {
                        ids[del_count] = s;
                        del_count += 1;
                    }
                }
                deleted_ids = ids[0..del_count];
            }
        }

        return AutoclaimResult{
            .next_id = next_id,
            .entries = entries[0..count],
            .deleted_ids = deleted_ids,
        };
    }
};

/// Extend Client with stream commands.
/// Returns a `StreamCommands` bound to `client`.
pub fn streams(client: *Client) StreamCommands {
    return StreamCommands.init(client);
}