XACK key group id [id ...]

从 5.0.0 开始可用。

时间复杂度:处理的每个消息 ID O(1)。

XACK命令从流消费者组的待处理条目列表(PEL)中删除一条或多条消息 。消息处于待处理状态,因此存储在 PEL 中,当它被传递给某个消费者时,通常是调用XREADGROUP的副作用,或者当消费者获得调用XCLAIM的消息的所有权时。待处理的消息已传递给某个消费者,但服务器还不确定它是否至少被处理过一次。所以对XREADGROUP的新调用获取消费者的消息历史记录(例如使用 ID 0),将返回此类消息。同样,待定消息将由检查 PEL的XPENDING命令列出。

一旦消费者成功处理了一条消息,它应该调用XACK 以便该消息不会再次被处理,并且作为副作用,关于该消息的 PEL 条目也被清除,从 Redis 服务器释放内存。

返回值

整数回复,具体来说:

该命令返回成功确认的消息数。某些消息 ID 可能不再是 PEL 的一部分(例如因为它们已经被确认),并且 XACK 不会将它们视为成功确认。

例子

redis> XACK mystream mygroup 1526569495631-0
(integer) 1

XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|id field value [field value ...]

从 5.0.0 开始可用。

时间复杂度:添加新条目时为 O(1),修剪时为 O(N),其中 N 是被驱逐的条目数。

将指定的流条目附加到指定键处的流。如果键不存在,则作为运行此命令的副作用,会使用流值创建键。可以使用该NOMKSTREAM选项禁用流密钥的创建。

条目由一组字段值对组成,它基本上是一个小字典。字段值对按照用户给定的顺序存储,读取流的命令(例如 XRANGE或XREAD)保证返回字段和值的顺序与XADD添加的顺序完全相同。

XADD是唯一可以将数据添加到流中的 Redis 命令,但还有其他命令,例如XDEL和XTRIM,可以从流中删除数据。

指定流 ID 作为参数

流条目 ID 标识流中的给定条目。

如果指定的 ID 参数是字符(星号 ASCII 字符),XADD命令将为您自动生成唯一 ID 。*然而,虽然仅在极少数情况下有用,但可以指定格式良好的 ID,以便将使用指定的 ID 精确添加新条目。

ID 由两个由一个-字符分隔的数字指定:

1526919030474-55

这两个数量都是 64 位数字。自动生成 ID 时,第一部分是 Redis 实例生成 ID 的 Unix 时间(以毫秒为单位)。第二部分只是一个序列号,用于区分在同一毫秒内生成的 ID。

您还可以指定一个不完整的 ID,它只包含毫秒部分,对于序列部分被解释为零值。要仅自动生成序列部分,请指定毫秒部分,后跟-分隔符和*字符:

> XADD mystream 1526919030474-55 message "Hello,"
"1526919030474-55"
> XADD mystream 1526919030474-* message " World!"
"1526919030474-56"

D 保证始终是递增的:如果您比较刚刚插入的条目的 ID,它将大于任何其他过去的 ID,因此条目在流中完全排序。为了保证这个属性,如果流中当前的top ID的时间大于实例当前的本地时间,则使用top entry时间代替,并且ID的序列部分递增。例如,当本地时钟向后跳时,或者在故障转移后新的主节点具有不同的绝对时间时,可能会发生这种情况。

当用户为XADD指定显式 ID 时,最小有效 ID 为 0-1,并且用户必须指定一个大于流中当前任何其他 ID 的 ID,否则该命令将失败并返回错误。通常,仅当您有另一个系统生成唯一 ID(例如 SQL 表)并且您确实希望 Redis 流 ID 与另一个系统匹配时,才使用特定 ID。

封顶流

XADD包含与XTRIM命令相同的语义- 请参阅其文档页面以获取更多信息。这允许添加新条目并通过对XADD的单个调用来检查流的大小,从而有效地用任意阈值限制流。尽管精确修剪是可能的并且是默认设置,但由于流的内部表示,使用几乎精确修剪(参数)使用XADD添加条目和修剪流会更有效。~

例如,以下列形式调用XADD :

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

将添加一个新条目,但也将逐出旧条目,以便流将仅包含 1000 个条目,或者最多包含几十个。

有关流的附加信息

有关 Redis 流的更多信息,请查看我们 对 Redis Streams 文档的介绍。

返回值

批量字符串回复,具体来说:

该命令返回添加条目的 ID。*如果作为 ID 参数传递,则ID 是自动生成的,否则该命令仅返回用户在插入期间指定的相同 ID。

当与选项一起使用并且密钥不存在时,该命令返回Null 回复。NOMKSTREAM

例子

redis> XADD mystream * name Sara surname OConnor
"1647317596356-0"
redis> XADD mystream * field1 value1 field2 value2 field3 value3
"1647317596358-0"
redis> XLEN mystream
(integer) 2
redis> XRANGE mystream - +
1) 1) "1647317596356-0"
   2) 1) "name"
      2) "Sara"
      3) "surname"
      4) "OConnor"
2) 1) "1647317596358-0"
   2) 1) "field1"
      2) "value1"
      3) "field2"
      4) "value2"
      5) "field3"
      6) "value3"
redis> 

历史

  • Redis 版本 >= 6.2.0:添加了 NOMKSTREAM 选项、MINID 修剪策略和 LIMIT 选项。
  • Redis 版本 >= 7.0.0:添加了对 的支持-* 显式 ID 形式。

XAUTOCLAIM key group consumer min-idle-time start [COUNT count] [JUSTID]

从 6.2.0 开始可用。

时间复杂度:如果 COUNT 很小,则为 O(1)。

此命令转移与指定条件匹配的挂起流条目的所有权。从概念上讲,XAUTOCLAIM 等价于调用XPENDING然后XCLAIM ,但提供了一种更直接的方法来通过类似SCAN的语义来处理消息传递失败。

与XCLAIM一样,该命令在提供的. 它将所有权转移到等待超过毫秒且 ID 等于或大于的消息的所有权

可选参数,默认为 100,是命令尝试声明的条目数的上限。在内部,该命令开始扫描消费者组的待处理条目列表 (PEL),并过滤掉空闲时间小于或等于 的条目。该命令扫描的最大挂起条目数是' 值乘以 10(硬编码)的乘积。因此,声明的条目数可能会小于指定值。

可选JUSTID参数将回复更改为仅返回成功声明的消息 ID 数组,而不返回实际消息。使用此选项意味着重试计数器不会增加。

该命令将声明的条目作为数组返回。它还返回一个流 ID,用于类似游标的用途,作为其后续调用的参数。当没有剩余的 PEL 条目时,该命令返回特殊0-0ID 以表示完成。但是,请注意,即使在使用as ID完成扫描后,您也可能希望继续调用XAUTOCLAIM,因为已经过了足够的时间,因此较旧的待处理条目现在可能有资格进行声明。0-0

请注意,只有空闲时间长于声明的消息,声明消息会重置其空闲时间。这确保了只有单个消费者可以在特定时刻成功声明给定的待处理消息,并大大降低了多次处理同一消息的概率。

在迭代 PEL 时,如果XAUTOCLAIM 偶然发现流中不再存在的消息(由XDEL修剪或删除),它不会声明它,并将其从找到它的 PEL 中删除。此功能是在 Redis 7.0 中引入的。这些消息 ID 作为XAUTOCLAIM回复的一部分返回给调用者。

最后,使用XAUTOCLAIM声明消息也会增加该消息的尝试传递计数,除非JUSTID已指定选项(它只传递消息 ID,而不传递消息本身)。由于某种原因无法处理的消息——例如,因为消费者在处理它们时系统性地崩溃——将表现出很高的尝试传递计数,可以通过监控检测到。

返回值

数组回复,具体来说:

包含三个元素的数组:

  1. 用作下次调用XAUTOCLAIM的参数的流 ID
  2. 一个数组,包含所有成功声明的消息,格式与XRANGE相同。
  3. 一个数组,包含流中不再存在的流 ID,并且从找到它们的 PEL 中删除

例子

> XAUTOCLAIM mystream mygroup Alice 3600000 0-0 COUNT 25
1) "0-0"
2) 1) 1) "1609338752495-0"
      2) 1) "field"
         2) "value"
3) (empty array)

在上面的示例中,我们尝试声明最多 25 个待处理且空闲(未确认或声明)至少一个小时的条目,从流的开头开始。“mygroup”组中的消费者“Alice”获得这些消息的所有权。请注意,示例中返回的流 ID 为0-0,表示扫描了整个流。我们还可以看到XAUTOCLAIM没有偶然发现任何已删除的消息(第三个回复元素是一个空数组)。

历史

  • Redis 版本 >= 7.0.0:在回复数组中添加了一个元素,其中包含命令从 PEL 中清除的已删除条目

XCLAIM key group consumer min-idle-time id [id ...] [IDLE ms] [TIME unix-time-milliseconds] [RETRYCOUNT count] [FORCE] [JUSTID]

从 5.0.0 开始可用。

时间复杂度: O(log N),其中 N 是消费者组的 PEL 中的消息数。

在流消费者组的上下文中,此命令更改待处理消息的所有权,以便新所有者是指定为命令参数的消费者。通常会发生这种情况:

  1. 有一个带有关联消费者组的流。
  2. 某些消费者 A 在该消费者组的上下文中通过XREADGROUP从流中读取消息。
  3. 作为副作用,在消费者组的待处理条目列表 (PEL) 中创建了待处理的消息条目:这意味着消息已传递给给定的消费者,但尚未通过XACK确认。
  4. 然后突然之间,消费者永远失败了。
  5. 其他消费者可能会使用XPENDING命令检查已过时的待处理消息列表。为了继续处理此类消息,他们使用XCLAIM获取消息的所有权并继续。消费者还可以使用XAUTOCLAIM命令自动扫描和声明过时的未决消息。

Stream 介绍文档中清楚地解释了这种动态。
请注意,仅当消息的空闲时间大于我们在调用XCLAIM时指定的最小空闲时间时,才会声明该消息。因为作为副作用,XCLAIM也会重置空闲时间(因为这是处理消息的新尝试),试图同时声明消息的两个消费者永远不会都成功:只有一个消费者会成功声明消息。这避免了我们以简单的方式多次处理给定消息(但在一般情况下多次处理是可能的并且不可避免)。

此外,作为副作用,XCLAIM将增加尝试传递消息的计数,除非JUSTID已指定选项(它只传递消息 ID,而不传递消息本身)。通过这种方式,由于某种原因无法处理的消息,例如因为消费者试图处理它们而崩溃,将开始有一个更大的计数器,并且可以在系统内部被检测到。

XCLAIM不会在以下情况下声明消息:

  1. 该消息在组 PEL 中不存在(即任何消费者从未阅读过该消息)
  2. 消息存在于组 PEL 中但不存在于流本身中(即消息已被读取但从未确认,然后通过修剪或通过XDEL从流中删除)

在这两种情况下,回复都不会包含该消息的相应条目(即回复数组的长度可能小于提供给XCLAIM的 ID 数量)。在后一种情况下,该消息也会从它所在的 PEL 中删除。此功能是在 Redis 7.0 中引入的。

命令选项

该命令有多个选项,但大多数主要用于内部使用,以便将XCLAIM或其他命令的效果传输到 AOF 文件并将相同的效果传播到副本,并且不太可能对普通用户有用:

  1. IDLE :设置消息的空闲时间(上次发送时间)。如果未指定 IDLE,则假定 IDLE 为 0,即重置时间计数,因为该消息现在有一个新的所有者尝试处理它。
  2. TIME :这与 IDLE 相同,但不是相对的毫秒数,而是将空闲时间设置为特定的 Unix 时间(以毫秒为单位)。这对于重写生成XCLAIM命令的 AOF 文件很有用。
  3. RETRYCOUNT :将重试计数器设置为指定值。每次再次传递消息时,此计数器都会增加。通常XCLAIM不会改变这个计数器,它只是在调用 XPENDING 命令时提供给客户端:这样客户端可以检测异常,例如在大量传递尝试后由于某种原因从未处理过的消息。
  4. FORCE:即使某些指定的 ID 尚未在分配给不同客户端的 PEL 中,也会在 PEL 中创建待处理的消息条目。但是消息必须存在于流中,否则不存在的消息的 ID 将被忽略。
  5. JUSTID:仅返回成功声明的消息 ID 数组,不返回实际消息。使用此选项意味着重试计数器不会增加。

返回值

数组回复,具体来说:

该命令返回所有成功声明的消息,格式与XRANGE相同。但是,如果JUSTID指定了该选项,则仅报告消息 ID,而不包括实际消息。

例子

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

在上面的示例中,我们使用 ID 声明消息1526569498055-0,仅当消息空闲至少一小时而原始消费者或其他消费者没有取得进展(确认或声明),并将所有权分配给消费者Alice。

XDEL key id [id ...]

从 5.0.0 开始可用。

时间复杂度:无论流大小如何,流中要删除的每个单个项目都是 O(1)。

从流中删除指定的条目,并返回已删除的条目数。在某些指定的 ID 在流中不存在的情况下,此数字可能小于传递给命令的 ID 的数量。

通常你可能会认为 Redis 流是一种仅追加的数据结构,但是 Redis 流是在内存中表示的,因此我们也可以删除条目。这可能很有用,例如,为了遵守某些隐私政策。

了解条目删除的低级细节

Redis 流的表示方式使其内存高效:使用基数树来索引线性打包数十个流条目的宏节点。通常,当您从流中删除一个条目时,该条目并没有真正被驱逐,它只是被标记为已删除。

最终,如果宏节点中的所有条目都被标记为已删除,则整个节点被销毁并回收内存。这意味着,如果您从流中删除大量条目,例如超过 50% 的附加到流的条目,则每个条目的内存使用量可能会增加,因为流将变得碎片化。但是流性能将保持不变。

在未来的 Redis 版本中,如果给定的宏节点达到给定数量的已删除条目,我们可能会触发节点垃圾回收。目前,随着我们对该数据结构的预期使用,增加这种复杂性并不是一个好主意。

返回值

整数回复:实际删除的条目数。

例子

> XADD mystream * a 1
1538561698944-0
> XADD mystream * b 2
1538561700640-0
> XADD mystream * c 3
1538561701744-0
> XDEL mystream 1538561700640-0
(integer) 1
127.0.0.1:6379> XRANGE mystream - +
1) 1) 1538561698944-0
   2) 1) "a"
      2) "1"
2) 1) 1538561701744-0
   2) 1) "c"
      2) "3"

XGROUP CREATE key groupname id|$ [MKSTREAM] [ENTRIESREAD entries_read]

从 5.0.0 开始可用。

时间复杂度: O(1)

此命令为存储在的流创建一个由 唯一标识的新使用者组

每个组在给定的流中都有一个唯一的名称。当同名的消费者组已经存在时,该命令返回-BUSYGROUP错误。

该命令的参数从新组的角度指定流中最后一个传递的条目。特殊 ID$表示流中最后一个条目的 ID,但您可以提供任何有效 ID。例如,如果您希望组的消费者从头开始获取整个流,请使用零作为消费者组的起始 ID:

XGROUP CREATE mystream mygroup 0

默认情况下,XGROUP CREATE命令坚持目标流存在并在不存在时返回错误。但是,如果流不存在,您可以使用可选MKSTREAM子命令作为 之后的最后一个参数来自动创建流(长度为 0):

XGROUP CREATE mystream mygroup $ MKSTREAM

可以指定可选的entries_read命名参数以启用任意 ID 的消费者组滞后跟踪。任意 ID 是不是流的第一个条目的 ID、它的最后一个条目或零(“0-0”)ID 的任何 ID。这有助于您准确了解任意 ID(不包括它)和流的最后一个条目之间有多少条目。在这种情况下,entries_read可以将流设置为entries_added减去条目数。

返回值

简单的字符串回复:OK成功。

历史

  • Redis 版本 >= 7.0.0:添加了 entries_read 命名参数。

XGROUP CREATECONSUMER key groupname consumername

从 6.2.0 开始可用。

时间复杂度: O(1)

创建一个在存储在的流的消费者组中命名的消费者。

每当操作(例如XREADGROUP)引用不存在的使用者时,也会自动创建使用者。

返回值

  • 整数回复:创建的消费者数量(0或1)

XGROUP DELCONSUMER key groupname consumername

从 5.0.0 开始可用。

时间复杂度: O(1)

XGROUP DELCONSUMER命令从消费者组中删除消费者。

有时删除旧消费者可能很有用,因为它们不再使用。

但是请注意,消费者拥有的任何待处理消息在被删除后都将变得不可声明。因此,强烈建议在从组中删除使用者之前声明或确认任何待处理的消息。

返回值

  • 整数回复:消费者在被删除之前拥有的待处理消息的数量

XGROUP DESTROY key groupname

从 5.0.0 开始可用。

时间复杂度: O(N),其中 N 是组的待处理条目列表 (PEL) 中的条目数。

XGROUP DESTROY命令完全销毁消费者组。

即使有活动的消费者和待处理的消息,消费者组也会被销毁,因此请确保仅在真正需要时调用此命令。

返回值

整数回复:被销毁的消费者组数(0或1)

XGROUP SETID key groupname id|$ [ENTRIESREAD entries_read]

从 5.0.0 开始可用。

时间复杂度: O(1)

为消费者组设置最后交付的 ID。

通常,使用XGROUP CREATE创建组时,会设置消费者组的最后交付 ID 。XGROUP SETID命令允许修改组的最后交付 ID,而无需删除和重新创建组。例如,如果您希望消费者组中的消费者重新处理流中的所有消息,您可能希望将其下一个 ID 设置为 0:

XGROUP SETID mystream mygroup 0

可以指定可选entries_read参数以启用任意 ID 的消费者组滞后跟踪。任意 ID 是不是流的第一个条目的 ID、它的最后一个条目或零(“0-0”)ID 的任何 ID。这有助于您准确了解任意 ID(不包括它)和流的最后一个条目之间有多少条目。在这种情况下,entries_read可以将流设置为entries_added减去条目数。

返回值

简单的字符串回复:OK成功。

历史

  • Redis 版本 >= 7.0.0:添加了可选的 entries_read 参数。

XINFO CONSUMERS key groupname

从 5.0.0 开始可用。

时间复杂度: O(1)

此命令返回属于存储在的流的使用者组的使用者列表

为组中的每个消费者提供以下信息:

  • name : 消费者的名字
  • pending : 客户端的未决消息数,即已传递但尚未确认的消息
  • idle : 自消费者上次与服务器交互以来经过的毫秒数

数组回复:消费者列表。

例子

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

XINFO GROUPS key

从 5.0.0 开始可用。

时间复杂度: O(1)

此命令返回存储在的流的所有消费者组的列表

默认情况下,仅为每个组提供以下信息:

  • name:消费者组的名称
  • 消费者:组中消费者的数量
  • pending:组的未决条目列表 (PEL) 的长度,即已传递但尚未确认的消息
  • last-delivered-id : 最后一个条目的 ID 交付组的消费者
  • entry_read:传递给组消费者的最后一个条目的逻辑“读取计数器”
  • lag:流中仍在等待交付给组的消费者的条目数,如果无法确定该数字,则为 NULL。

消费群体滞后

给定消费者组的滞后是组entries_read和流之间范围内的条目数entries_added。换句话说,它是尚未交付给组消费者的条目数。

该指标的值和趋势有助于做出有关消费者群体的扩展决策。您可以通过向组中添加更多消费者来解决高滞后值,而低值可能表明您可以从组中删除消费者以缩小其规模。

Redis 通过保留两个计数器来报告消费者组的滞后:添加到流中的所有条目的数量和消费者组进行的逻辑读取的数量。滞后是这两者之间的差异。

流的计数器(XINFO STREAM命令的entries_added字段)在每次XADD时加一,并计算在其生命周期内添加到流中的所有条目。

消费者组的计数器entries_read是该组已读取的条目的逻辑计数器。重要的是要注意这个计数器只是一个启发式的而不是一个准确的计数器,因此使用术语“逻辑”。计数器试图反映组应该读取的条目数以达到其当前last-delivered-id 。entries_read计数器仅在完美世界中是准确的,其中消费者组从流的第一个条目开始并处理其所有条目(即,没有在处理之前删除的条目)。

有两种特殊情况,此机制无法报告滞后:

  1. 使用任意最后交付的 ID(分别为XGROUP CREATE和XGROUP SETID命令)创建或设置消费者组。任意 ID 是不是流的第一个条目的 ID、它的最后一个条目或零(“0-0”)ID 的任何 ID。
  2. last-delivered-id组和流之间的一个或多个条目last-generated-id被删除(使用XDEL或修剪操作)。

在这两种情况下,组的读取计数器都被认为是无效的,并且返回的值设置为 NULL 以表示滞后当前不可用。

但是,延迟只是暂时不可用。当消费者继续处理消息时,它会在常规操作期间自动恢复。一旦消费者组将流中的最后一条消息传递给其成员,它将设置正确的逻辑读取计数器,并且可以恢复跟踪其滞后。

数组回复:消费者组列表。

例子

> XINFO GROUPS mystream
1)  1) "name"
    2) "mygroup"
    3) "consumers"
    4) (integer) 2
    5) "pending"
    6) (integer) 2
    7) "last-delivered-id"
    8) "1638126030001-0"
    9) "entries_read"
   10) (integer) 2
   11) "lag"
   12) (integer) 0
2)  1) "name"
    2) "some-other-group"
    3) "consumers"
    4) (integer) 1
    5) "pending"
    6) (integer) 0
    7) "last-delivered-id"
    8) "1638126028070-0"
    9) "entries_read"
   10) (integer) 1
   11) "lag"
   12) (integer) 1

历史

  • Redis 版本 >= 7.0.0:添加了 entries-readlag 字段

XINFO STREAM key [FULL [COUNT count]]

从 5.0.0 开始可用。

时间复杂度: O(1)

此命令返回有关存储在的流的信息

此命令提供的信息详细信息是:

  • length:流中的条目数(请参阅XLEN)
  • radix-tree-keys:底层基数数据结构中的键数
  • radix-tree-nodes:底层基数数据结构中的节点数
  • groups:为流定义的消费者组的数量
  • last-generated-id:最近添加到流中的条目的 ID
  • max-deleted-entry-id:从流中删除的最大条目 ID
  • entries-added:在其生命周期内添加到流中的所有条目的计数
  • first-entry:流中第一个条目的 ID 和字段值元组
  • last-entry:流中最后一个条目的 ID 和字段值元组

可选FULL修饰符提供更详细的回复。提供时,FULL回复包括一个条目数组,该数组由升序的流条目(ID 和字段值元组)组成。此外,groups也是一个数组,对于每个消费者组,它都由信孚集团和信孚消费者报告的信息组成。

该COUNT选项可用于限制返回的流和 PEL 条目的数量(返回第一个条目)。默认COUNT值为 10,a COUNTof 0 表示将返回所有条目(如果流有很多条目,则执行时间可能会很长)。

返回值

数组回复:信息位列表

例子

默认回复:

> XINFO STREAM mystream
 1) "length"
 2) (integer) 2
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "last-generated-id"
 8) "1638125141232-0"
 9) "max-deleted-entry-id"
10) "0-0"
11) "entries-added"
12) (integer) 2
13) "groups"
14) (integer) 1
15) "first-entry"
16) 1) "1638125133432-0"
    2) 1) "message"
       2) "apple"
17) "last-entry"
18) 1) "1638125141232-0"
    2) 1) "message"
       2) "banana"

完整回复:

> XADD mystream * foo bar
"1638125133432-0"
> XADD mystream * foo bar2
"1638125141232-0"
> XGROUP CREATE mystream mygroup 0-0
OK
> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) "1638125133432-0"
         2) 1) "foo"
            2) "bar"
> XINFO STREAM mystream FULL
 1) "length"
 2) (integer) 2
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "last-generated-id"
 8) "1638125141232-0"
 9) "max-deleted-entry-id"
10) "0-0"
11) "entries-added"
12) (integer) 2
13) "entries"
14) 1) 1) "1638125133432-0"
       2) 1) "foo"
          2) "bar"
    2) 1) "1638125141232-0"
       2) 1) "foo"
          2) "bar2"
15) "groups"
16) 1)  1) "name"
        2) "mygroup"
        3) "last-delivered-id"
        4) "1638125133432-0"
        5) "entries-read"
        6) (integer) 1
        7) "lag"
        8) (integer) 1
        9) "pel-count"
       10) (integer) 1
       11) "pending"
       12) 1) 1) "1638125133432-0"
              2) "Alice"
              3) (integer) 1638125153423
              4) (integer) 1
       13) "consumers"
       14) 1) 1) "name"
              2) "Alice"
              3) "seen-time"
              4) (integer) 1638125153423
              5) "pel-count"
              6) (integer) 1
              7) "pending"
              8) 1) 1) "1638125133432-0"
                    2) (integer) 1638125153423
                    3) (integer) 1
>

历史

  • Redis 版本 >= 7.0.0:添加了 max-deleted-entry-identries-addedrecorded-first-entry-identries-readlag 字段

XLEN key

从 5.0.0 开始可用。

时间复杂度: O(1)

返回流中的条目数。如果指定的键不存在,则命令返回零,就好像流为空一样。但是请注意,与其他 Redis 类型不同,零长度流是可能的,因此您应该调用TYPE或EXISTS以检查键是否存在。

一旦内部没有条目(例如在XDEL调用之后),流就不会自动删除,因为流可能有与之关联的消费者组。

返回值

整数回复: 处的流的条目数key。

例子

redis> XADD mystream * item 1
"1647336770490-0"
redis> XADD mystream * item 2
"1647336770490-1"
redis> XADD mystream * item 3
"1647336770490-2"
redis> XLEN mystream
(integer) 3
redis> 

XPENDING key group [[IDLE min-idle-time] start end count [consumer]]

从 5.0.0 开始可用。

时间复杂度: O(N),N 是返回的元素数量,因此每次调用要求少量固定数量的条目是 O(1)。O(M),其中 M 是与 IDLE 过滤器一起使用时扫描的条目总数。当命令只返回摘要并且消费者列表很小时,它会在 O(1) 时间内运行;否则,需要额外的 O(N) 时间来迭代每个消费者。

通过消费者组从流中获取数据,而不确认此类数据,具有创建待处理条目的效果。这在XREADGROUP命令中得到了很好的解释,在我们对 Redis Streams 的介绍中甚至更好 。XACK命令将立即从待处理条目列表 (PEL)中删除待处理条目,因为一旦成功处理了消息,消费者组就不再需要跟踪它并记住消息的当前所有者。

XPENDING命令是检查待处理消息列表的接口,因此是一个非常重要的命令,用于观察和了解流消费者组发生的情况:哪些客户端处于活动状态,哪些消息待处理,或查看是否有空闲消息。此外,该命令与XCLAIM一起 用于实现对长时间失败的消费者的恢复,从而导致某些消息未被处理:不同的消费者可以认领该消息并继续。这在流介绍和 XCLAIM中有更好的解释命令页面,此处不再赘述。

XPENDING汇总表

当仅使用键名和消费者组名称调用XPENDING时,它只输出有关给定消费者组中未决消息的摘要。在下面的示例中,我们创建了一个消费者组,并通过使用 XREADGROUP从组中读取来立即创建一条待处理消息。

> XGROUP CREATE mystream group55 0-0
OK

> XREADGROUP GROUP group55 consumer-123 COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526984818136-0
         2) 1) "duration"
            2) "1532"
            3) "event-id"
            4) "5"
            5) "user-id"
            6) "7782813"

我们希望消费者组的待处理条目列表group55现在有一条消息:名为的消费者consumer-123在未确认其处理的情况下获取了消息。简单的XPENDING 表格将为我们提供以下信息:

> XPENDING mystream group55
1) (integer) 1
2) 1526984818136-0
3) 1526984818136-0
4) 1) 1) "consumer-123"
      2) "1"

在这种形式中,该命令输出该消费者组的待处理消息总数,即1,后跟待处理消息中最小和最大的ID,然后列出该消费者组中至少有一条待处理消息的每个消费者,以及它拥有的待处理消息的数量。

XPENDING 的扩展形式

摘要提供了一个很好的概述,但有时我们对细节感兴趣。为了查看具有更多相关信息的所有待处理消息,我们还需要传递一系列 ID,以类似的方式使用 XRANGE和非可选计数参数来限制每次调用返回的消息数量:

> XPENDING mystream group55
1) (integer) 1
2) 1526984818136-0
3) 1526984818136-0
4) 1) 1) "consumer-123"
      2) "1"

在扩展表单中,我们不再看到摘要信息,而是在待处理条目列表中显示每条消息的详细信息。对于每条消息,返回四个属性:

  1. 消息的 ID。
  2. 获取消息但仍需确认的消费者的名称。我们称它为消息的当前所有者。
  3. 自上次将此消息传递给此使用者以来经过的毫秒数。
  4. 此消息的传递次数。

传递计数器,即数组中的第四个元素,当其他消费者使用XCLAIM声明消息时,或者当消息通过XREADGROUP再次传递时,在访问消费者组中消费者的历史记录时递增(参见XREADGROUP页面了解更多信息)。

可以将附加参数传递给命令,以查看具有特定所有者的消息:

> XPENDING mystream group55 - + 10 consumer-123

但在上述情况下,输出将是相同的,因为我们只有单个消费者的待处理消息。但是要记住的重要一点是,即使有来自许多消费者的许多待处理消息,这个由特定消费者过滤的操作也不是低效的:我们有一个全局和每个消费者的待处理条目列表数据结构,所以我们可以非常有效地只显示单个消费者的待处理消息。

空闲时间过滤器

也可以通过空闲时间过滤待处理的流条目,以毫秒为单位(对于XCLAIM一段时间未处理的条目很有用):

> XPENDING mystream group55 IDLE 9000 - + 10
> XPENDING mystream group55 IDLE 9000 - + 10 consumer-123

第一种情况将返回整个组中空闲超过 9 秒的前 10 个(或更少)PEL 条目,而在第二种情况下仅返回 consumer-123.

独占范围和迭代 PEL

XPENDING命令允许遍历待处理的条目,就像 XRANGE和XREVRANGE允许流的条目一样。您可以通过在最后读取的挂起条目的 ID 前面加上(表示开放(独占)范围的字符来做到这一点,并将其证明给随后的命令调用。

返回值

数组回复,具体来说:

该命令根据调用方式以不同格式返回数据,如本页前面所述。但是,回复始终是一组项目。

历史

  • Redis 版本 >= 6.2.0:添加了 IDLE 选项和独占范围间隔。

XRANGE key start end [COUNT count]

从 5.0.0 开始可用。

时间复杂度: O(N),其中 N 是返回的元素数。如果 N 是常数(例如,总是用 COUNT 请求前 10 个元素),你可以认为它是 O(1)。

该命令返回匹配给定 ID 范围的流条目。范围由最小和最大 ID 指定。返回具有两个指定的 ID 或恰好是指定的两个 ID 之一(闭区间)的所有条目。

XRANGE命令有许多应用:

  • 返回特定时间范围内的项目。这是可能的,因为流 ID与时间有关。
  • 增量迭代流,每次迭代只返回几个项目。然而,它在语义上比SCAN系列函数更健壮。
  • 从流中获取单个条目,提供要获取两次的条目 ID:作为查询间隔的开始和结束。

该命令还有一个以相反顺序返回项目的互惠命令,称为XREVRANGE,其他方面是相同的。

-和+特殊身份证

-和特殊 ID 分别表示流中可能的+最小 ID 和可能的最大 ID,因此以下命令将仅返回流中的每个条目:

> XRANGE somestream - +
1) 1) 1526985054069-0
   2) 1) "duration"
      2) "72"
      3) "event-id"
      4) "9"
      5) "user-id"
      6) "839248"
2) 1) 1526985069902-0
   2) 1) "duration"
      2) "415"
      3) "event-id"
      4) "2"
      5) "user-id"
      6) "772213"
... other entries here ...

ID 实际上与-指定完全相同0-0,而 +等效于18446744073709551615-18446744073709551615,但是它们更易于键入。

ID 不完整

流 ID 由两部分组成,一个 Unix 毫秒时间戳和一个在同一毫秒内插入的条目的序列号。可以使用XRANGE仅指定 ID 的第一部分,即毫秒时间,如下例所示:

> XRANGE somestream 1526985054069 1526985055069

在这种情况下,XRANGE将自动完成开始间隔-0 和结束间隔-18446744073709551615,以返回在给定毫秒和另一个指定毫秒结束之间生成的所有条目。这也意味着重复相同的毫秒两次,我们会在该毫秒内获得所有条目,因为序列号范围将从零到最大值。

以这种方式使用XRANGE作为范围查询命令来获取指定时间内的条目。这对于访问流中过去事件的历史非常方便。

独家系列

默认情况下,范围很接近(包括),这意味着回复可以包含 ID 与查询的开始和结束间隔匹配的条目。可以通过在 ID 前加上字符 来指定一个开区间(不包括)(。这对于迭代流很有用,如下所述。

返回最大条目数

使用COUNT选项可以减少报告的条目数。这是一个非常重要的功能,即使它看起来很微不足道,因为它允许例如对操作进行建模,例如给我大于或等于以下内容的条目:

> XRANGE somestream 1526985054069-0 + COUNT 1
1) 1) 1526985054069-0
   2) 1) "duration"
      2) "72"
      3) "event-id"
      4) "9"
      5) "user-id"
      6) "839248"

在上述情况下,条目1526985054069-0存在,否则服务器会向我们发送下一个条目。UsingCOUNT也是使用XRANGE作为迭代器的基础。

迭代流

为了迭代流,我们可以进行如下操作。假设我们每次迭代需要两个元素。我们开始获取前两个元素,这很简单:

> XRANGE writers - + COUNT 2
1) 1) 1526985676425-0
   2) 1) "name"
      2) "Virginia"
      3) "surname"
      4) "Woolf"
2) 1) 1526985685298-0
   2) 1) "name"
      2) "Jane"
      3) "surname"
      4) "Austen"

然后,不再从 开始迭代,而是使用上一次XRANGE调用返回的最后一个-条目的条目 ID 作为范围的开始作为排他区间。

最后一个条目的 ID 是1526985685298-0,所以我们只需在它前面加上一个 '(',然后继续我们的迭代:

> XRANGE writers (1526985685298-0 + COUNT 2
1) 1) 1526985691746-0
   2) 1) "name"
      2) "Toni"
      3) "surname"
      4) "Morrison"
2) 1) 1526985712947-0
   2) 1) "name"
      2) "Agatha"
      3) "surname"
      4) "Christie"

等等。最终,这将允许访问流中的所有条目。显然,我们可以通过提供给定的不完整开始 ID,从任何 ID,甚至从特定时间开始迭代。此外,我们可以通过提供结束 ID 或不完整 ID 而不是+.

命令XREAD也能够迭代流。命令XREVRANGE可以逆向迭代流,从较高的 ID(或时间)到较低的 ID(或时间)。

使用早期版本的 Redis 进行迭代

虽然独占范围间隔仅在 Redis 6.2 中可用,但仍然可以在早期版本中使用类似的流迭代模式。您开始以与上述相同的方式从流中获取第一个条目。

对于后续调用,您需要以编程方式推进返回的最后一个条目的 ID。大多数 Redis 客户端应该抽象出这个细节,但如果需要,实现也可以在应用程序中。在上面的示例中,这意味着将序列1526985685298-0从 0 增加到 1。因此,第二次调用将是:

> XRANGE writers 1526985685298-1 + COUNT 2
1) 1) 1526985691746-0
   2) 1) "name"
      2) "Toni"
...

另外,请注意,一旦最后一个 ID 的序列部分等于 18446744073709551615,您需要增加时间戳并将序列部分重置为 0。例如,增加 ID 1526985685298-18446744073709551615应该会导致1526985685299-0.

对称模式适用于使用XREVRANGE迭代流。唯一的区别是客户端需要为后续调用递减 ID。递减序列部分为0的ID时,时间戳需要减1,序列设置为18446744073709551615。

获取单个项目

如果您查找XGET命令,您会感到失望,因为XRANGE 是从流中获取单个条目的有效方法。您所要做的就是在 XRANGE 的参数中指定两次 ID:

> XRANGE mystream 1526984818136-0 1526984818136-0
1) 1) 1526984818136-0
   2) 1) "duration"
      2) "1532"
      3) "event-id"
      4) "5"
      5) "user-id"
      6) "7782813"

有关流的附加信息

有关 Redis 流的更多信息,请查看我们 对 Redis Streams 文档的介绍。

返回值

数组回复,具体来说:

该命令返回 ID 与指定范围匹配的条目。返回的条目是完整的,这意味着返回了 ID 和它们组成的所有字段。此外,条目的返回顺序与它们的字段和值的顺序与XADD添加它们的顺序完全相同。

例子

redis> XADD writers * name Virginia surname Woolf
"1647337116946-0"
redis> XADD writers * name Jane surname Austen
"1647337116947-0"
redis> XADD writers * name Toni surname Morrison
"1647337116948-0"
redis> XADD writers * name Agatha surname Christie
"1647337116949-0"
redis> XADD writers * name Ngozi surname Adichie
"1647337116949-1"
redis> XLEN writers
(integer) 5
redis> XRANGE writers - + COUNT 2
1) 1) "1647337116946-0"
   2) 1) "name"
      2) "Virginia"
      3) "surname"
      4) "Woolf"
2) 1) "1647337116947-0"
   2) 1) "name"
      2) "Jane"
      3) "surname"
      4) "Austen"
redis> 

历史

  • Redis 版本 >= 6.2.0:添加了独占范围。

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]

从 5.0.0 开始可用。

时间复杂度:对于提到的每个流:O(N),N 是返回的元素数,这意味着具有固定 COUNT 的 XREAD-ing 是 O(1)。请注意,当使用 BLOCK 选项时,XADD 将花费 O(M) 时间来服务在获取新数据的流上阻塞的 M 个客户端。\

从一个或多个流中读取数据,仅返回 ID 大于调用者报告的最后收到的 ID 的条目。如果项目不可用,此命令具有阻止选项,类似于BRPOP或BZPOPMIN等。

请注意,在阅读本页之前,如果您不熟悉流,我们建议您阅读我们的 Redis Streams 介绍。

非阻塞使用

如果不使用BLOCK选项,则该命令是同步的,并且可以被认为与XRANGE有点相关:它将返回流中的一系列项目,但是 即使我们只考虑同步用法,它与XRANGE相比也有两个根本区别:

  • 如果我们想同时从多个键中读取,则可以使用多个流调用此命令。这是XREAD的一个关键特性,因为特别是在使用BLOCK阻塞时,能够通过单个连接到多个键进行监听是一个重要特性。
  • 虽然XRANGE返回一系列 ID 中的项目,但 XREAD更适合使用从第一个条目开始的流,这比我们目前看到的任何其他条目都大。因此,对于每个流,我们传递给XREAD的是我们从该流中接收到的最后一个元素的 ID。

例如,如果我有两个流mystream和writers,并且我想从它们包含的第一个元素开始从两个流中读取数据,我可以像下面的示例一样调用XREAD 。

注意:我们在示例中使用了COUNT选项,因此对于每个流,调用将在每个流中最多返回两个元素。

> XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
1) 1) "mystream"
   2) 1) 1) 1526984818136-0
         2) 1) "duration"
            2) "1532"
            3) "event-id"
            4) "5"
            5) "user-id"
            6) "7782813"
      2) 1) 1526999352406-0
         2) 1) "duration"
            2) "812"
            3) "event-id"
            4) "9"
            5) "user-id"
            6) "388234"
2) 1) "writers"
   2) 1) 1) 1526985676425-0
         2) 1) "name"
            2) "Virginia"
            3) "surname"
            4) "Woolf"
      2) 1) 1526985685298-0
         2) 1) "name"
            2) "Jane"
            3) "surname"
            4) "Austen"

STREAMS选项是强制性的,并且必须是最终选项,因为此类选项以下列格式获取可变长度的参数:

STREAMS key_1 key_2 key_3 ... key_N ID_1 ID_2 ID_3 ... ID_N

因此,我们从一个键列表开始,然后继续使用所有关联的 ID,代表我们为该流收到的最后一个 ID,这样调用将只为我们提供来自同一流的更大 ID。

例如,在上面的示例中,我们为流接收到的最后一个项目mystream具有 ID 1526999352406-0,而对于流writers具有 ID 1526985685298-0。

要继续迭代两个流,我将调用:

> XREAD COUNT 2 STREAMS mystream writers 1526999352406-0 1526985685298-0
1) 1) "mystream"
   2) 1) 1) 1526999626221-0
         2) 1) "duration"
            2) "911"
            3) "event-id"
            4) "7"
            5) "user-id"
            6) "9488232"
2) 1) "writers"
   2) 1) 1) 1526985691746-0
         2) 1) "name"
            2) "Toni"
            3) "surname"
            4) "Morrison"
      2) 1) 1526985712947-0
         2) 1) "name"
            2) "Agatha"
            3) "surname"
            4) "Christie"

等等。最终,调用将不会返回任何项目,而只是一个空数组,然后我们知道没有什么可以从我们的流中获取(我们将不得不重试该操作,因此该命令也支持阻塞模式)。

ID 不完整

使用不完整的 ID 是有效的,就像它对XRANGE一样有效。然而,这里的 ID 序列部分,如果缺失,总是被解释为零,所以命令:

> XREAD COUNT 2 STREAMS mystream writers 0 0

完全等同于

> XREAD COUNT 2 STREAMS mystream writers 0-0 0-0

数据阻塞

在其同步形式中,只要有更多可用项,该命令就可以获取新数据。但是,在某些时候,我们将不得不等待数据生产者使用XADD将新条目推送到我们正在消费的流中。为了避免以固定或自适应间隔进行轮询,该命令能够根据指定的流和 ID 在无法返回任何数据时阻塞,并在请求的键之一接受数据后自动解除阻塞。

重要的是要了解,此命令会扇出所有等待相同 ID 范围的客户端,因此每个消费者都将获得数据的副本,这与使用阻塞列表弹出操作时发生的情况不同。

为了阻塞,使用了BLOCK选项,以及我们想要在超时之前阻塞的毫秒数。通常 Redis 阻塞命令以秒为单位超时,但是此命令需要毫秒超时,即使通常服务器将具有接近 0.1 秒的超时分辨率。这次在某些用例中可以阻塞更短的时间,如果服务器内部会随着时间的推移而改进,那么超时的解决可能会得到改进。

当传递了BLOCK命令,但至少在传递的流之一中有数据要返回时,该命令将同步执行 ,就像缺少 BLOCK 选项一样。

这是一个阻塞调用的示例,该命令稍后会返回一个空回复,因为超时已经过去而没有新数据到达:

> XREAD BLOCK 1000 STREAMS mystream 1526999626221-0
(nil)

特殊$标识。

当阻塞时,有时我们只想接收从我们阻塞的那一刻开始通过XADD添加到流中的条目。在这种情况下,我们对已添加条目的历史不感兴趣。对于这个用例,我们必须检查流顶部元素 ID,并在XREAD命令行中使用该 ID。这不干净,需要调用其他命令,因此可以使用特殊$ ID 来指示流我们只需要新事物。

了解您应该 仅在第一次调用XREAD时使用 ID非常重要。稍后,ID 应该是流中最后报告的项目之一,否则您可能会错过中间添加的所有条目。$

这是消费者愿意只消费新条目的第一次迭代中典型的XREAD调用的样子:

> XREAD BLOCK 5000 COUNT 100 STREAMS mystream $

一旦我们得到一些回复,下一个电话将是这样的:

> XREAD BLOCK 5000 COUNT 100 STREAMS mystream 1526999644174-3

等等。

如何为单个流上被阻止的多个客户端提供服务

列表或排序集上的阻塞列表操作具有弹出行为。基本上,元素从列表或排序集中删除,以便返回给客户端。在这种情况下,您希望以公平的方式消费项目,具体取决于在给定密钥上被阻止的客户端到达的时刻。通常 Redis 在此用例中使用 FIFO 语义。

但是请注意,对于流,这不是问题:在为客户端提供服务时,不会从流中删除流条目,因此只要XADD命令向流提供数据,就会为每个等待的客户端提供服务。

返回值

数组回复,具体来说:

该命令返回一个结果数组:返回数组的每个元素都是一个由两个元素组成的数组,其中包含键名和为该键报告的条目。报告的条目是完整的流条目,具有 ID 以及所有字段和值的列表。字段和值保证按照XADD添加的顺序报告。

使用BLOCK时,超时时返回空回复。

强烈建议阅读Redis Streams 介绍,以便更多地了解流的整体行为和语义。

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]

从 5.0.0 开始可用。

时间复杂度:对于提到的每个流:O(M),M 是返回的元素数。如果 M 是常数(例如,总是用 COUNT 请求前 10 个元素),你可以认为它是 O(1)。另一方面,当 XREADGROUP 阻塞时,XADD 将支付 O(N) 时间,以便为在获取新数据的流上阻塞的 N 个客户端提供服务。

XREADGROUP命令是 XREAD 命令的特殊版本,支持使用者组。在阅读此页面之前,您可能必须了解 XREAD命令才有意义。

此外,如果您不熟悉流,我们建议您阅读我们 对 Redis Streams 的介绍。确保在介绍中了解消费者组的概念,以便更简单地了解此命令的工作原理。

30秒消费群体

这个命令和 vanilla XREAD的区别在于这个命令支持消费者组。

如果没有消费者组,只需使用XREAD,所有客户端都会收到所有到达流的条目。而不是使用带有XREADGROUP的消费者组,可以创建客户端组,这些客户端组使用到达给定流中的消息的不同部分。例如,如果流获取新条目 A、B 和 C,并且有两个消费者通过消费者组读取,则一个客户端将获得例如消息 A 和 C,另一个客户端将获得消息 B,并且等等。

在消费者组中,给定的消费者(即,只是一个从流中消费消息的客户端)必须使用唯一的消费者名称来标识。这只是一个字符串。

消费者组的保证之一是给定消费者只能看到传递给它的消息的历史记录,因此消息只有一个所有者。但是,有一个称为消息声明的特殊功能,它允许其他消费者声明消息,以防某些消费者出现不可恢复的故障。为了实现这种语义,消费者组需要通过XACK命令明确确认消费者成功处理的消息。这是必需的,因为流将为每个消费者组跟踪谁在处理什么消息。

这是如何理解是否要使用消费者组:

  1. 如果您有一个流和多个客户端,并且您希望所有客户端获取所有消息,则不需要消费者组。
  2. 如果您有一个流和多个客户端,并且您希望流在您的客户端之间进行分区或分片,以便每个客户端将获得到达流中的消息的子集,您需要一个消费者组。

XREAD 和 XREADGROUP 之间的区别

从语法的角度来看,这些命令几乎相同,但是XREADGROUP 需要一个特殊的强制选项:

GROUP <group-name> <consumer-name>

组名只是与流关联的消费者组的名称。该组是使用XGROUP命令创建的。消费者名称是客户端用来在组内标识自己的字符串。消费者在第一次看到时在消费者组内自动创建。不同的客户端应该选择不同的消费者名称。

当您使用XREADGROUP阅读时,服务器会记住给定消息已传递给您:该消息将存储在消费者组中称为待处理条目列表 (PEL) 中,即已传递但未传递的消息 ID 列表还承认。

客户端必须使用XACK确认消息处理 ,以便从 PEL 中删除挂起的条目。可以使用XPENDING命令检查 PEL。

该NOACK子命令可用于避免在不要求可靠性并且可以接受偶尔的消息丢失的情况下将消息添加到 PEL。这相当于在阅读消息时确认消息。

使用XREADGROUP时在STREAMS选项中指定的 ID可以是以下两个之一:

  • 特殊>ID,这意味着消费者只想接收从未传递给任何其他消费者的消息。它只是意味着,给我新的信息。
  • 任何其他 ID,即 0 或任何其他有效 ID 或不完整 ID(仅毫秒时间部分),将具有返回等待发送命令的消费者的条目的效果,其 ID 大于提供的 ID。所以基本上如果 ID 不是>,那么该命令只会让客户端访问其挂起的条目:传递给它但尚未确认的消息。请注意,在这种情况下,两者BLOCK和NOACK都将被忽略。

与XREAD一样,XREADGROUP命令可以以阻塞方式使用。在这方面没有区别。

当消息传递给消费者时会发生什么?

两件事情:

  • 如果消息从未传递给任何人,也就是说,如果我们谈论的是新消息,则创建 PEL(待处理条目列表)。
  • 相反,如果消息已经传递给这个消费者,并且它只是再次重新获取相同的消息,那么最后一个传递计数器将更新为当前时间,传递的数量增加一。您可以使用XPENDING命令访问这些消息属性。

使用示例

通常你使用这样的命令来获取新消息并处理它们。在伪代码中:

WHILE true
    entries = XREADGROUP GROUP $GroupName $ConsumerName BLOCK 2000 COUNT 10 STREAMS mystream >
    if entries == nil
        puts "Timeout... try again"
        CONTINUE
    end

    FOREACH entries AS stream_entries
        FOREACH stream_entries as message
            process_message(message.id,message.fields)

            # ACK the message as processed
            XACK mystream $GroupName message.id
        END
    END
END

这样,示例消费者代码将只获取新消息,处理它们,并通过XACK确认它们。然而,上面的示例代码并不完整,因为它不处理崩溃后的恢复。如果我们在处理消息的过程中崩溃会发生什么,我们的消息将保留在待处理条目列表中,因此我们可以通过将 XREADGROUP初始 ID 设置为 0 并执行相同的循环来访问我们的历史记录。一旦提供 ID 为 0 的回复是一组空消息,我们知道我们处理并确认了所有待处理的消息:我们可以开始使用>作为 ID,以便获取新消息并重新加入正在处理新事物的消费者。

要查看命令的实际响应方式,请查看XREAD命令页面。

返回值

数组回复,具体来说:

该命令返回一个结果数组:返回数组的每个元素都是一个由两个元素组成的数组,其中包含键名和为该键报告的条目。报告的条目是完整的流条目,具有 ID 以及所有字段和值的列表。字段和值保证按照XADD添加的顺序报告。

使用BLOCK时,超时时返回空回复。

强烈建议阅读Redis Streams 介绍,以便更多地了解流的整体行为和语义。

XREVRANGE key end start [COUNT count]

从 5.0.0 开始可用。

时间复杂度: O(N),N 是返回的元素数。如果 N 是常数(例如,总是用 COUNT 请求前 10 个元素),你可以认为它是 O(1)。

此命令与XRANGE完全一样,但显着不同之处在于以相反的顺序返回条目,并且还以相反的顺序获取开始-结束范围:在XREVRANGE中,您需要声明结束ID,然后是开始ID,以及命令将产生两个 ID 之间(或完全类似)的所有元素,从末端开始。

因此,例如,要获取从较高 ID 到较低 ID 的所有元素,可以使用:

XREVRANGE somestream + -

类似地,只需将最后一个元素添加到流中,发送就足够了:

XREVRANGE somestream + - COUNT 1

返回值

数组回复,具体来说:

该命令返回 ID 与指定范围匹配的条目,从较高 ID 到较低 ID 匹配。返回的条目是完整的,这意味着返回了 ID 和它们组成的所有字段。此外,条目及其字段和值的返回顺序与XADD添加它们的顺序完全相同。

例子

redis> XADD writers * name Virginia surname Woolf
"1647338000264-0"
redis> XADD writers * name Jane surname Austen
"1647338000265-0"
redis> XADD writers * name Toni surname Morrison
"1647338000265-1"
redis> XADD writers * name Agatha surname Christie
"1647338000265-2"
redis> XADD writers * name Ngozi surname Adichie
"1647338000266-0"
redis> XLEN writers
(integer) 5
redis> XREVRANGE writers + - COUNT 1
1) 1) "1647338000266-0"
   2) 1) "name"
      2) "Ngozi"
      3) "surname"
      4) "Adichie"
redis> 

历史

  • Redis 版本 >= 6.2.0:添加了独占范围。

XSETID key last-id [ENTRIESADDED entries_added] [MAXDELETEDID max_deleted_entry_id]

从 5.0.0 开始可用。

时间复杂度: O(1)

XSETID命令是一个内部命令。Redis 主服务器使用它来复制最后交付的流 ID。

历史

  • Redis 版本 >= 7.0.0:添加了 entries_addedmax_deleted_entry_id 参数。

XTRIM key MAXLEN|MINID [=|~] threshold [LIMIT count]

从 5.0.0 开始可用。

时间复杂度: O(N),其中 N 是被驱逐条目的数量。然而,常数时间非常小,因为条目被组织在包含多个条目的宏节点中,这些条目可以通过一次释放来释放。

如果需要, XTRIM通过逐出较旧的条目(具有较低 ID 的条目)来修剪流。

可以使用以下策略之一来修剪流:

  • MAXLEN:只要流的长度超过指定的 ,就会驱逐条目threshold,其中threshold是一个正整数。
  • MINID: 驱逐 ID 低于 的条目threshold,其中threshold是流 ID。

例如,这会将流修剪为最新的 1000 个项目:

XTRIM mystream MAXLEN 1000

而在本例中,所有 ID 低于 649085820-0 的条目都将被逐出:

XTRIM mystream MINID 649085820

默认情况下,或者当提供可选=参数时,该命令执行精确修剪。

根据策略,精确修剪意味着:

  • MAXLEN:修剪后的流的长度将恰好是其原始长度和指定的 之间的最小值threshold。
  • MINID: 流中最旧的 ID 将恰好是其原始最旧 ID 和指定的threshold.

近乎精确的修剪

因为精确修剪可能需要 Redis 服务器的额外工作,所以~可以提供可选参数以提高效率。

例如:

XTRIM mystream MAXLEN ~ 1000

策略和方法之间的~争论意味着用户请求修剪流,使其长度至少为,但可能略长一些。在这种情况下,Redis 会在能够获得性能时提前停止修剪(例如,当无法删除数据结构中的整个宏节点时)。这使得修剪更加高效,并且通常是您想要的,尽管在修剪之后,流可能在.MAXLENthresholdthresholdthreshold

使用 , 时控制命令完成的工作量的另一种方法~是LIMIT子句。使用时,它指定count将被驱逐的最大条目。当LIMIT和count未指定时,默认值 100 * 宏节点中的条目数将隐式用作count. 将值 0 指定为count完全禁用限制机制。

返回值

整数回复:从流中删除的条目数。

例子

redis> XADD mystream * field1 A field2 B field3 C field4 D
"1647338143892-0"
redis> XTRIM mystream MAXLEN 2
(integer) 0
redis> XRANGE mystream - +
1) 1) "1647338143892-0"
   2) 1) "field1"
      2) "A"
      3) "field2"
      4) "B"
      5) "field3"
      6) "C"
      7) "field4"
      8) "D"
redis> 

历史

  • Redis 版本 >= 6.2.0:添加了 MINID 修剪策略和 LIMIT 选项。