服务器之家:专注于VPS、云服务器配置技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - Android - 使用Kotlin+RocketMQ实现延时消息的示例代码

使用Kotlin+RocketMQ实现延时消息的示例代码

2022-10-24 13:57Tony沈哲 Android

这篇文章主要介绍了使用Kotlin+RocketMQ实现延时消息的示例代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

一. 延时消息

延时消息是指消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。

使用延时消息的典型场景,例如:

  • 在电商系统中,用户下完订单30分钟内没支付,则订单可能会被取消。
  • 在电商系统中,用户七天内没有评价商品,则默认好评。

这些场景对应的解决方案,包括:

  • 轮询遍历数据库记录
  • JDK 的 DelayQueue
  • ScheduledExecutorService
  • 基于 Quartz 的定时任务
  • 基于 Redis 的 zset 实现延时队列。

除此之外,还可以使用消息队列来实现延时消息,例如 RocketMQ。

二. RocketMQ

RocketMQ 是一个分布式消息和流数据平台,具有低延迟、高性能、高可靠性、万亿级容量和灵活的可扩展性。RocketMQ 是2012年阿里巴巴开源的第三代分布式消息中间件。

使用Kotlin+RocketMQ实现延时消息的示例代码

三. RocketMQ 实现延时消息

3.1 业务背景

我们的系统完成某项操作之后,会推送事件消息到业务方的接口。当我们调用业务方的通知接口返回值为成功时,表示本次推送消息成功;当返回值为失败时,则会多次推送消息,直到返回成功为止(保证至少成功一次)。
当我们推送失败后,虽然会进行多次推送消息,但并不是立即进行。会有一定的延迟,并按照一定的规则进行推送消息。
例如:1小时后尝试推送、3小时后尝试推送、1天后尝试推送、3天后尝试推送等等。因此,考虑使用延时消息实现该功能。

3.2 生产者(Producer)

生产者负责产生消息,生产者向消息服务器发送由业务应用程序系统生成的消息。

首先,定义一个支持延时发送的 AbstractProducer。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
abstract class AbstractProducer :ProducerBean() {
  var producerId: String? = null
  var topic: String? = null
  var tag: String?=null
  var timeoutMillis: Int? = null
  var delaySendTimeMills: Long? = null
 
  val log = LogFactory.getLog(this.javaClass)
 
  open fun sendMessage(messageBody: Any, tag: String) {
    val msgBody = JSON.toJSONString(messageBody)
    val message = Message(topic, tag, msgBody.toByteArray())
 
    if (delaySendTimeMills != null) {
      val startDeliverTime = System.currentTimeMillis() + delaySendTimeMills!!
      message.startDeliverTime = startDeliverTime
      log.info( "send delay message producer startDeliverTime:${startDeliverTime}currentTime :${System.currentTimeMillis()}")
    }
    val logMessageId = buildLogMessageId(message)
    try {
      val sendResult = send(message)
      log.info(logMessageId + "producer messageId: " + sendResult.getMessageId() + "\n" + "messageBody: " + msgBody)
    } catch (e: Exception) {
      log.error(logMessageId + "messageBody: " + msgBody + "\n" + " error: " + e.message, e)
    }
 
  }
 
  fun buildLogMessageId(message: Message): String {
    return "topic: " + message.topic + "\n" +
        "producer: " + producerId + "\n" +
        "tag: " + message.tag + "\n" +
        "key: " + message.key + "\n"
  }
}

根据业务需要,增加一个支持重试机制的 Producer

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Component
@ConfigurationProperties("mqs.ons.producers.xxx-producer")
@Configuration
@Data
class CleanReportPushEventProducer :AbstractProducer() {
 
  lateinit var delaySecondList:List<Long>
 
  fun sendMessage(messageBody: CleanReportPushEventMessage){
    //重试超过次数之后不再发事件
    if (delaySecondList!=null) {
 
      if(messageBody.times>=delaySecondList.size){
        return
      }
      val msgBody = JSON.toJSONString(messageBody)
      val message = Message(topic, tag, msgBody.toByteArray())
      val delayTimeMills = delaySecondList[messageBody.times]*1000L
      message.startDeliverTime = System.currentTimeMillis() + delayTimeMills
      log.info( "messageBody: " + msgBody+ "startDeliverTime: "+message.startDeliverTime )
      val logMessageId = buildLogMessageId(message)
      try {
        val sendResult = send(message)
        log.info(logMessageId + "producer messageId: " + sendResult.getMessageId() + "\n" + "messageBody: " + msgBody)
      } catch (e: Exception) {
        log.error(logMessageId + "messageBody: " + msgBody + "\n" + " error: " + e.message, e)
      }
    }
  }
}

在 CleanReportPushEventProducer 中,超过了重试的次数就不会再发送消息了。

每一次延时消息的时间也会不同,因此需要根据重试的次数来获取这个delayTimeMills 。

通过 System.currentTimeMillis() + delayTimeMills 可以设置 message 的 startDeliverTime。然后调用 send(message) 即可发送延时消息。

我们使用商用版的 RocketMQ,因此支持精度为秒级别的延迟消息。在开源版本中,RocketMQ 只支持18个特定级别的延迟消息。:(

3.3 消费者(Consumer)

消费者负责消费消息,消费者从消息服务器拉取信息并将其输入用户应用程序。

定义 Push 类型的 AbstractConsumer:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@Data
abstract class AbstractConsumer ():MessageListener{
 
  var consumerId: String? = null
 
  lateinit var subscribeOptions: List<SubscribeOptions>
 
  var threadNums: Int? = null
 
  val log = LogFactory.getLog(this.javaClass)
 
  override fun consume(message: Message, context: ConsumeContext): Action {
    val logMessageId = buildLogMessageId(message)
    val body = String(message.body)
    try {
      log.info(logMessageId + " body: " + body)
      val result = consumeInternal(message, context, JSON.parseObject(body, getMessageBodyType(message.tag)))
      log.info(logMessageId + " result: " + result.name)
      return result
    } catch (e: Exception) {
      if (message.reconsumeTimes >= 3) {
        log.error(logMessageId + " error: " + e.message, e)
      }
      return Action.ReconsumeLater
    }
 
  }
 
  abstract fun getMessageBodyType(tag: String): Type?
 
  abstract fun consumeInternal(message: Message, context: ConsumeContext, obj: Any): Action
 
  protected fun buildLogMessageId(message: Message): String {
    return "topic: " + message.topic + "\n" +
        "consumer: " + consumerId + "\n" +
        "tag: " + message.tag + "\n" +
        "key: " + message.key + "\n" +
        "MsgId:" + message.msgID + "\n" +
        "BornTimestamp" + message.bornTimestamp + "\n" +
        "StartDeliverTime:" + message.startDeliverTime + "\n" +
        "ReconsumeTimes:" + message.reconsumeTimes + "\n"
  }
}

再定义具体的消费者,并且在消费失败之后能够再发送一次消息。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Configuration
@ConfigurationProperties("mqs.ons.consumers.clean-report-push-event-consumer")
@Data
class CleanReportPushEventConsumer(val cleanReportService: CleanReportService,val eventProducer:CleanReportPushEventProducer):AbstractConsumer() {
 
  val logger: Logger = LoggerFactory.getLogger(this.javaClass)
 
  override fun consumeInternal(message: Message, context: ConsumeContext, obj: Any): Action {
    if(obj is CleanReportPushEventMessage){
      //清除事件
      logger.info("consumer clean-report event report_id:${obj.id} ")
 
      //消费失败之后再发送一次消息
      if(!cleanReportService.sendCleanReportEvent(obj.id)){
        val times = obj.times+1
        eventProducer.sendMessage(CleanReportPushEventMessage(obj.id,times))
      }
    }
    return Action.CommitMessage
  }
 
  override fun getMessageBodyType(tag: String): Type? {
    return CleanReportPushEventMessage::class.java
  }
}

其中,cleanReportService 的 sendCleanReportEvent() 会通过 http 的方式调用业务方提供的接口,进行事件消息的推送。如果推送失败了,则会进行下一次的推送。(这里使用了 eventProducer 的 sendMessage() 方法再次投递消息,是因为要根据调用的http接口返回的内容来判断消息是否发送成功。)

最后,定义 ConsumerFactory

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Component
class ConsumerFactory(val consumers: List<AbstractConsumer>,val aliyunOnsOptions: AliyunOnsOptions) {
 
  val logger: Logger = LoggerFactory.getLogger(this.javaClass)
 
 
  @PostConstruct
  fun start() {
    CompletableFuture.runAsync{
      consumers.stream().forEach {
        val properties = buildProperties(it.consumerId!!, it.threadNums)
        val consumer = ONSFactory.createConsumer(properties)
        if (it.subscribeOptions != null && !it.subscribeOptions!!.isEmpty()) {
          for (options in it.subscribeOptions!!) {
            consumer.subscribe(options.topic, options.tag, it)
          }
          consumer.start()
          val message = "\n".plus(
              it.subscribeOptions!!.stream().map{ a -> String.format("topic: %s, tag: %s has been started", a.topic, a.tag)}
                  .collect(Collectors.toList<Any>()))
          logger.info(String.format("consumer: %s\n", message))
        }
      }
    }
  }
 
  private fun buildProperties(consumerId: String,threadNums: Int?): Properties {
    val properties = Properties()
    properties.put(PropertyKeyConst.ConsumerId, consumerId)
    properties.put(PropertyKeyConst.AccessKey, aliyunOnsOptions.accessKey)
    properties.put(PropertyKeyConst.SecretKey, aliyunOnsOptions.secretKey)
    if (StringUtils.isNotEmpty(aliyunOnsOptions.onsAddr)) {
      properties.put(PropertyKeyConst.ONSAddr, aliyunOnsOptions.onsAddr)
    } else {
      // 测试环境接入RocketMQ
      properties.put(PropertyKeyConst.NAMESRV_ADDR, aliyunOnsOptions.nameServerAddress)
    }
    properties.put(PropertyKeyConst.ConsumeThreadNums, threadNums!!)
    return properties
  }
}

四. 总结

正如本文开头曾介绍过,可以使用多种方式来实现延时消息。然而,我们的系统本身就大量使用了 RocketMQ,借助成熟的 RocketMQ 实现延时消息不失为一种可靠而又方便的方式。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。

原文链接:https://juejin.im/post/5d1d50c9518825373225253e

延伸 · 阅读

精彩推荐
  • AndroidAndroid自定义ViewPager实例

    Android自定义ViewPager实例

    这篇文章主要介绍了Android自定义ViewPager的方法,结合完整实例形式分析了Android基于ViewGroup类自定义ViewPager的具体实现技巧,需要的朋友可以参考下...

    xurong6072021-06-03
  • AndroidAndroid PopupWindow使用实例

    Android PopupWindow使用实例

    这篇文章主要介绍了Android PopupWindow使用实例,本文直接给出代码实例,需要的朋友可以参考下...

    安卓开发网10482021-03-25
  • Androidandroid通过蓝牙接收文件打开时无法自动选择合适的应用程序

    android通过蓝牙接收文件打开时无法自动选择合适的应用程序

    android 通过蓝牙接收文件,从历史传输记录打开,无法自动选择合适的应用程序,比如video player打开.3gp、.mp4文件等等...

    Android开发网6582021-01-30
  • AndroidAndroid 使用Intent传递数据的实现思路与代码

    Android 使用Intent传递数据的实现思路与代码

    Intent是Android中一个非常重要的概念,跟这个词的本意(意图,目的)一样,这个类在Android中的作用就是要调用某个组建去做某一件事,接下来详细介绍,感...

    Android开发网7532020-12-31
  • AndroidAndroid实现带附件的邮件发送功能

    Android实现带附件的邮件发送功能

    这篇文章主要介绍了Android实现带附件的邮件发送功能的相关资料,android发送邮件有两种方式,本文重点介绍基于JMail实现邮件发送功能,感兴趣的小伙伴们...

    Android开发网10312021-05-10
  • AndroidAndroid轻松画出触摸轨迹

    Android轻松画出触摸轨迹

    这篇文章主要为大家详细介绍了Android轻松画出触摸轨迹的实现方法,为大家分享了一个触摸轨迹类,感兴趣的小伙伴们可以参考一下...

    antkingwei7272021-06-22
  • AndroidAndroid UI效果之绘图篇(四)

    Android UI效果之绘图篇(四)

    这篇文章主要介绍了Android UI效果之绘图篇,针对Android开发中的UI效果设计模块中Shader进行讲解,感兴趣的小伙伴们可以参考一下...

    _Hi_xiaoyu4542021-06-17
  • Android基于GridView和ActivityGroup实现的TAB分页(附源码)

    基于GridView和ActivityGroup实现的TAB分页(附源码)

    今天为大家介绍下使用GridView和ActivityGroup实现的分页,这里需要将Activity转换成Window,然后再换成成View添加到容器中,具体实现代码如下,感兴趣的朋友可...

    Android开发网11092021-01-25