服务器之家:专注于VPS、云服务器配置技术及软件下载分享
分类导航

Mysql|Sql Server|Oracle|Redis|MongoDB|PostgreSQL|Sqlite|DB2|mariadb|Access|数据库技术|

服务器之家 - 数据库 - Redis - 详解Redis Stream做消息队列

详解Redis Stream做消息队列

2022-11-23 14:58在下uptown Redis

这篇文章主要介绍了详解Redis Stream做消息队列,文章围绕主题展开详细的内容介绍,具有一定的参考价值,需要的小伙伴可以参考一下

List

众所周知redis数据结构中的list的lpush与rpop可以用于常规消息队列,从集合的最左端写入,最右端弹出消费。并且支持多个生产者与多个消费者并发拿数据,数据只能由一个消费者拿到。

但这个方案并不能保证消费者消费消息后是否成功处理的问题(服务挂掉或处理异常等),机制属于点对点模式不能做广播模式(发布/订阅模式)

Pub/sub

于是redis提供了相应的发布订阅功能,为了解除点对点的强绑定模式引入了Channel管道

当生产者向管道中发布消息,订阅了该管道的消费者能够同时接收到该消息,而且为了简化订阅多个管道需要显式关注多个名称提供了pattern能力。

详解Redis Stream做消息队列

通过名称匹配如果接收消息的频道wmyskxz.chat,consumer3也会收到消息。

但这个方案也有很大的诟病就是不会持久化,如果服务挂掉重启数据就全丢弃了,也没有提供ack机制,不保证数据可靠性,不管有没有消费成功发后既忘。

Stream

stream的话结构很像kafka的设计思想,提供了consumer group和offset机制,结构上感觉跟kafka的topic差不多,只是没有对应partation副本机制,而是一个追加消息的链表结构。客户端调用XADD时候自动创建stream。每个消息都会持久化并存在唯一的id标识

详解Redis Stream做消息队列

Consumer Group

消费者组的概念跟kafka的消费者概念如出一辙,消费者既可以用XREAD命令进行独立消费,也可以多个消费者同时加入一个消费者组。一条消息只能由一个消费者组中的一个消费者消费。这样可以在分布式系统中保证消息的唯一性。

其实这个特性我后来仔细琢磨了一下当时自认为无懈可击的流式图表为了保证分布式系统消息唯一做了redis分布式锁。有点鸡肋,明明消费者组已经保证了数据的唯一性。只能说加锁可以压缩资源成本

last_delivered_id

用于标识消费者组消费在stream上消费位置的游标,每个消费者组都有一个stream内唯一的名称,消费者组不会自动创建,需要用XGROUP CREATE显式创建。

pending_ids

每个消费者内部都有一个状态变量。用来表示已经被客户端消费但没有ack的消费。目的是为了保证客户端至少消费了消息一次(atleastonce)。如果消费者收到了消息处理完了但是没有回复ack,就会导致列表不断增长,如果有很多消费组的话,那么这个列表占用的内存就会放大

curd

  • xadd 追加消息
  • xdel 删除消息,这里的删除仅仅是设置了标志位,不影响消息总长度
  • xrange 获取消息列表,会自动过滤已经删除的消息
  • xlen 消息长度
  • del 删除Stream

pending_ids如何避免消息丢失

在客户端消费者读取Stream消息时,Redis服务器将消息回复给客户端的过程中,客户端突然断开了连接,消息就丢失了。

但是pending_ids里已经保存了发出去的消息ID。待客户端重新连上之后,可以再次收到pending_ids中的消息ID列表。

不过此时xreadgroup的起始消息必须是任意有效的消息ID,一般将参数设为0-0,表示读取所有的pending_ids消息以及自last_delivered_id之后的新消息。

嵌入SpringBoot

redis stream虽然还是有一些弊端,但是相比较而言用kafka之类的消息组件太重,redis用作消息队列已经很合适了。

这里简单提一下思路,本质上是提供一个管理消息的一个小功能,定义一个注解用于创建stream管道

详解Redis Stream做消息队列

创建一个注解类,标注该注解的类必须继承StreamListener<String, ObjectRecord<String, Object>>类且重写onMessage方法。方法上也加这个注解

创建一个config类实现BeanPostProcessor接口,重写bean声明周期postProcessAfterInitializationpostProcessBeforeInitialization方法。该方法会在spring启动流程里的refresh方法加载bean的声明周期中扫描到所有加了注解的bean。

通过线程池挨个创建stream的group组与stream的consumer监听连接,config类记得继承DisposableBean类在destroy方法里把连接关掉免得oom。

注册redis stream api提供的consumer容器

这里一定注意pollTimeout参数,看名字就知道默认拉取数据时间间隔,这个参数如果写的值很小或者写0,你就看你cpu高不高就完了。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
@Bean("listenerContainer")
@DependsOn(value = "redisConnectionFactory")
public StreamMessageListenerContainer<String, ObjectRecord<String, Object>> init() {
   StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, Object>>
   options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
         .batchSize(10)
         .serializer(new StringRedisSerializer())
         .executor(new ForkJoinPool())
         .pollTimeout(Duration.ofSeconds(3))
         .targetType(Object.class)
         .build();
   return StreamMessageListenerContainer.create(redisConnectionFactory, options);
}

创建消费者

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private Subscription createSubscription(RedisConnectionFactory factory, StreamListener streamListener, String streamKey, String group, String consumerName) {
   StreamOperations<String, String, Object> streamOperations = this.stringRedisTemplate.opsForStream();
 
   if (stringRedisTemplate.hasKey(streamKey)) {
      StreamInfo.XInfoGroups groups = streamOperations.groups(streamKey);
 
      AtomicReference<Boolean> groupHasKey = new AtomicReference<>(false);
 
      groups.forEach(groupInfo -> {
         if (Objects.equals(group, groupInfo.getRaw().get("name"))) {
            groupHasKey.set(true);
         }
      });
 
      if (groups.isEmpty() || !groupHasKey.get()) {
         creatGroup(streamKey, group);
      } else {
         groups.stream().forEach(g -> {
            log.info("XInfoGroups:{}", g);
            StreamInfo.XInfoConsumers consumers = streamOperations.consumers(streamKey, g.groupName());
            log.info("XInfoConsumers:{}", consumers);
         });
      }
   } else {
      creatGroup(streamKey, group);
   }
   StreamOffset<String> streamOffset = StreamOffset.create(streamKey, ReadOffset.lastConsumed());
   Consumer consumer = Consumer.from(group, consumerName);
 
   Subscription subscription = listenerContainer.receive(consumer, streamOffset, streamListener);
   listenerContainer.start();
   this.containerList.add(listenerContainer);
   return subscription;
}

到此这篇关于详解Redis Stream做消息队列的文章就介绍到这了,更多相关Redis Stream内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!

原文链接:https://juejin.cn/post/7146141052582232071

延伸 · 阅读

精彩推荐
  • Redis详解Redis 分布式锁遇到的序列化问题

    详解Redis 分布式锁遇到的序列化问题

    这篇文章主要介绍了Redis 分布式锁遇到的序列化问题,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下...

    WriteOnRead9232021-07-29
  • RedisRedis2.8配置文件中文详解

    Redis2.8配置文件中文详解

    这篇文章主要介绍了Redis2.8配置文件中文详解,本文提供的是是Redis2.8.9的配置文件各项的中文解释,需要的朋友可以参考下 ...

    Redis教程网2992019-10-24
  • Rediswindows下创建redis出现问题小结

    windows下创建redis出现问题小结

    这篇文章主要介绍了window下创建redis出现问题总结,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下...

    郑子胜4122020-12-20
  • Redis在Centos 8.0中安装Redis服务器的教程详解

    在Centos 8.0中安装Redis服务器的教程详解

    由于考虑到linux服务器的性能,所以经常需要把一些中间件安装在linux服务上,今天通过本文给大家介绍下在Centos 8.0中安装Redis服务器的详细过程,感兴趣的...

    极客研究者8692022-10-11
  • RedisRedis实现主从复制方式(Master&Slave)

    Redis实现主从复制方式(Master&Slave)

    这篇文章主要介绍了Redis实现主从复制方式(Master&Slave),具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教...

    遛狗大师6322022-10-24
  • Redisredis-shake同步redis数据的实现方法

    redis-shake同步redis数据的实现方法

    本文主要介绍了redis-shake同步redis数据的实现方法,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...

    小码农叔叔5462022-10-14
  • Redis使用redis分布式锁解决并发线程资源共享问题

    使用redis分布式锁解决并发线程资源共享问题

    这篇文章主要介绍了使用redis分布式锁解决并发线程资源共享问题,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要...

    保军Baojun3412019-11-26
  • RedisRedis之sql缓存的具体使用

    Redis之sql缓存的具体使用

    本文主要介绍了Redis之sql缓存的具体使用,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...

    yzm43999992022-01-24