服务器之家:专注于VPS、云服务器配置技术及软件下载分享
分类导航

Mysql|Sql Server|Oracle|Redis|MongoDB|PostgreSQL|Sqlite|DB2|mariadb|Access|数据库技术|

服务器之家 - 数据库 - Redis - Redis+Hbase+RocketMQ 实际使用问题案例讲解

Redis+Hbase+RocketMQ 实际使用问题案例讲解

2023-01-28 14:23王德发! Redis

这篇文章主要介绍了Redis+Hbase+RocketMQ 实际使用问题案例分享,本文结合示例代码给大家讲解的非常详细,需要的朋友可以参考下

需求

  • 将Hbase数据,解析后推送到RocketMQ。
  • redis使用list数据类型,存储了需要推送的数据的RowKey及表名。

简单画个流程图就是:

Redis+Hbase+RocketMQ 实际使用问题案例讲解

分析及确定方案

Redis

  • 明确list中元素结构{"rowkey":rowkey,"table":table}解析出rowkey;
  • 一次取多个元素加快效率;取了之后放入重试队列,并删除原来的元素;
  • 处理数据永远是重试队列里的,成功之后删除,失败就加上重试次数并重新放回;
  • 明确从list中取值所使用的redis命令;
  • 范围获取LRANGE
  • 范围删除(留下指定范围的数据)LTRIM
  • 判断list长度LLEN
  • 加入listRPUSH;删除LREM等等;
  • 从Hbase获取数据失败和发送到mq失败都令重试次数加一;
  • 每次碰到重试次数不为0的数据都休眠1s;
  • 设置最大重试次数,达到限制后丢弃;
  • 考虑客户redis部署方式,单机、主从、集群、哨兵等;
  • 选择合适的客户端,Jedis、Redisson、Lettuce等;
  • 编写不同的操作代码,也可以利用配置文件、环境变量、工厂模式等适配各种部署模式;

Hbase

  • 基本理论知识学习(原来没接触过),rowkey是没条数据的主键,限定符是字段名,列族是多个限定名的集合等;
  • 当时看这个觉得不错,因为是不停读取数据、链接、Table不用close,可以缓存起来,没必要每次都创建;
  • 确定批量获取数据方式为批量Get,没用scan
  • 了解解析方式,一些网上的解析试了之后会乱码,这边用的是它自带的CellUtil.clone相关方法;
  • 考虑所有都没数据时休眠10s;

RocketMQ

  • 有现成的发送代码,公司封装好的;
  • 调整发送的速度、太快了服务端会吃不消(获取Hbase数据速度太快了,最开始没限制一会儿就入了百万数据),设置超时时间(默认3s);
  • 调整服务端的内存、线程数等参数;

实现

配置

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#server configuration
server.port=8896
#log config
logging.file.path=./logs
#redis-standalone
redis.standalone.host=
redis.standalone.port=6379
redis.standalone.password=
redis.standalone.enable=true
#redis-cluster
redis.cluster.nodes=
redis.cluster.password=
redis.cluster.timeout=30000
redis.cluster.enable=false
# Zookeeper 集群地址,逗号分隔
hbase.zookeeper.quorum=
# Zookeeper 端口
hbase.zookeeper.property.clientPort=2181
# 消息目的rocketmq地址
rocketmq.server.host=
# 发送消息间隔时间,防止发送过快mq受不了
rocketmq.send.interval.millisec=10
# 每次从redis读取数据量限制。
data.access.redisDataSize=100
# 失败数据重试次数,超过的直接丢弃
data.access.retryNum=10
# 需要接入的表,需要发送到rocketmq的topic和在redis中的key的映射。xxx.xxx.xxx[topic]=redisKey
data.access.topicKeyMap[weibo_hbase]=data:sync:notice:suanzi:weibo:back
data.access.topicKeyMap[wechat_hbase]=data:sync:notice:suanzi:wechat:back

部分代码

获取配置,其余的直接@Value("${}")

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Setter
@Getter
@Configuration
@ConfigurationProperties(prefix = "data.access")
public class AccessRedisMqConfig {
 
    /**
     * key:topic; value:redis的key
     */
    private Map<String, String> topicKeyMap = new HashMap<>();
 
    /**
     * 一次从redis中读取数据量限制
     */
    private long redisDataSize = 50;
 
    /**
     * 失败数据重试次数
     */
    private int retryNum = 10;
 
}

开启接入:

?
1
2
3
4
5
6
7
8
9
10
11
12
@Component
public class AdapterRunner implements ApplicationRunner {
 
    @Resource
    private DataAccessService dataAccessService;
 
    @Override
    public void run(ApplicationArguments args) {
        System.out.println("项目已启动,开始接入数据到RocketMQ……");
        dataAccessService.accessData2Mq();
    }
}

其他代码其实也在分析里了。

踩坑

mq发送问题

?
1
2
3
4
5
6
7
8
9
10
11
12
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: invokeAsync call timeout
    at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeAsync(NettyRemotingClient.java:525)
    at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessageAsync(MQClientAPIImpl.java:523)
    at org.apache.rocketmq.client.impl.MQClientAPIImpl.onExceptionImpl(MQClientAPIImpl.java:610)
    at org.apache.rocketmq.client.impl.MQClientAPIImpl.access$100(MQClientAPIImpl.java:167)
    at org.apache.rocketmq.client.impl.MQClientAPIImpl$1.operationComplete(MQClientAPIImpl.java:572)
    at org.apache.rocketmq.remoting.netty.ResponseFuture.executeInvokeCallback(ResponseFuture.java:54)
    at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$2.run(NettyRemotingAbstract.java:319)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Wo

上面分析也说了,注意发送速度,有多少资源就接入多快。还有注意相关三个端口是否开放。

总结

程序很简单,主要涉及方案的是,获取redis的list数据时,是考虑效率,及加入重试策略,保证数据不丢失等。

原文链接:https://www.cnblogs.com/letscrazy/p/case.html

延伸 · 阅读

精彩推荐
  • Redis使用Redis有序集合实现IP归属地查询详解

    使用Redis有序集合实现IP归属地查询详解

    这篇文章主要介绍了使用Redis有序集合实现IP归属地查询,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友...

    yongxinz5602019-11-23
  • Redis从源码解读redis持久化

    从源码解读redis持久化

    redis的持久化也就是数据落地,对于任何一个数据系统都要考虑是不是需要数据落地。在系统崩溃或是机房掉电等的情况下,将有用的数据记录在非易失性...

    爱编程厨师3902019-11-16
  • RedisRedis cluster集群的介绍

    Redis cluster集群的介绍

    今天小编就为大家分享一篇关于Redis cluster集群的介绍,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧...

    qq_431937974382019-11-22
  • Redis如何高效使用Redis作为LRU缓存

    如何高效使用Redis作为LRU缓存

    这篇文章主要介绍了如何高效使用Redis作为LRU缓存,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考...

    程序零世界3702020-09-12
  • RedisRedis:我是如何与客户端进行通信的

    Redis:我是如何与客户端进行通信的

    我是一个Redis服务,最引以为傲的就是我的速度,我的 QPS 能达到10万级别。在我的手下有数不清的小弟,他们会时不时到我这来存放或者取走一些数据,我...

    码农参上11342021-06-22
  • Redisredis实现排行榜的简单方法

    redis实现排行榜的简单方法

    这篇文章主要给大家介绍了关于redis实现排行榜的简单方法,文中通过示例代码介绍的非常详细,对大家学习或者使用redis具有一定的参考学习价值,需要的...

    布尔bl2942019-11-27
  • RedisRedis Sentinel实现高可用配置的详细步骤

    Redis Sentinel实现高可用配置的详细步骤

    这篇文章主要介绍了Redis Sentinel实现高可用配置的详细步骤,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧 ...

    阿平5152019-11-18
  • RedisRedis集群的搭建图文教程

    Redis集群的搭建图文教程

    下面小编就为大家分享一篇Redis集群的搭建图文教程,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧 ...

    cnblogs4312019-11-10