WHCSRL 技术网

Redis数据类型之stream类型

介绍

主要用于消息队列(MQ,Message Queue),Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。

Stream 类型结构

在这里插入图片描述
一个消费组中每个消费者不会消费同一条信息。

常用命令

xadd
描述:在某个stream中追加消息

xadd key ID field value [filed value ...]

需要提供key,消息ID方案,消息内容,其中消息内容为key-value型数据。
ID,最常使用*,表示由Redis生成消息ID,这也是强烈建议的方案。
  • 1
  • 2
  • 3
  • 4

在这里插入图片描述
①是自己定义的key ;②是定义的ID,* 代表由redis自动生成,格式为:时间戳-index。index一般情况下为0,但当并发量大,同一时间新增数据量大则+1。

xlen
描述:返回结果为stream数据类型的长度

xlen key
  • 1

在这里插入图片描述
xrange
描述:获取消息列表,会自动过滤已经删除的消息

xrange key start end [COUNT count]
  • 1

在这里插入图片描述
-表示最小值, +表示最大值
在这里插入图片描述
xread
我们可以在不定义消费组的情况下进行Stream消息的独立消费,当Stream没有新消息时,甚至可以阻塞等待。Redis设计了一个单独的消费指令xread,可以将Stream当成普通的消息队列(list)来使用。使用xread时,我们可以完全忽略消费组(Consumer Group)的存在,就好比Stream就是一个普通的列表(list)。
在这里插入图片描述
xgroup create
Stream通过xgroup create指令创建消费组(Consumer Group),需要传递起始消息ID参数用来初始化last_delivered_id变量。

xgroup [CREATE key groipname id-or-$]
  • 1

在这里插入图片描述
xinfo
描述:获取Stream信息

xinfo [CONSUMERS key groupname] [GROUPS key]
  • 1

在这里插入图片描述
xreadgroup group
Stream提供了xreadgroup指令可以进行消费组的组内消费,需要提供消费组名称、消费者名称和起始消息ID。它同xread一样,也可以阻塞等待新消息。读到新消息后,对应的消息ID就会进入消费者的PEL(正在处理的消息)结构里,客户端处理完毕后使用xack指令通知服务器,本条消息已经处理完毕,该消息ID就会从PEL中移除。

 consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ..] ID [ID ...]
  • 1

开始消费:
在这里插入图片描述
>号表示从当前消费组的last_delivered_id后面开始读
每当消费者读取一条消息,last_delivered_id变量就会前进

对比消费前后的消费组信息上图nginx消费者消费了4条
在这里插入图片描述
xrange key - + 获取消息列表
在这里插入图片描述
xpending
为了解决组内消息读取但处理期间消费者崩溃带来的消息丢失问题,STREAM 设计了 Pending 列表,用于记录读取但并未处理完毕的消息。命令XPENDIING 用来获消费组或消费内消费者的未处理完毕的消息。

xpending key groupname [start end count] [consumer]
  • 1

在这里插入图片描述
在这里插入图片描述
上面的结果我们可以看到,我们之前读取的消息,都被记录在Pending列表中,说明全部读到的消息都没有处理,仅仅是读取了。那如何表示消费者处理完毕了消息呢?
使用命令 XACK 完成告知消息处理完成,演示如下:
xack

xack key group ID [ID ....]
  • 1

在这里插入图片描述
处理完毕后再次查看。待处理信息变成了2。
在这里插入图片描述

推荐阅读