服务器之家:专注于VPS、云服务器配置技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - Java教程 - Kafka源码系列教程之删除topic

Kafka源码系列教程之删除topic

2021-05-26 13:32浪尖 Java教程

这篇文章主要给大家介绍了关于Kafka源码系列教程之删除topic的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

前言

apache kafka发源于linkedin,于2011年成为apache的孵化项目,随后于2012年成为apache的主要项目之一。kafka使用scala和java进行编写。apache kafka是一个快速、可扩展的、高吞吐、可容错的分布式发布订阅消息系统。kafka具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。

本文依然是以kafka0.8.2.2为例讲解

一,如何删除一个topic

删除一个topic有两个关键点:

1,配置删除参数

delete.topic.enable这个broker参数配置为true。

2,执行

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name

假如不配置删除参数为true的话,topic其实并没有被清除,只是被标记为删除。此时,估计一般人的做法是删除topic在zookeeper的信息和日志,其实这个操作并不会清除kafkabroker内存的topic数据。所以,此时最佳的策略是配置删除参数为true然后,重启kafka。

二,重要的类介绍

1,partitionstatemachine

该类代表分区的状态机。决定者分区的当前状态,和状态转移。四种状态

  • nonexistentpartition
  • newpartition
  • onlinepartition
  • offlinepartition

2,replicamanager

负责管理当前机器的所有副本,处理读写、删除等具体动作。

读写:写获取partition对象,再获取replica对象,再获取log对象,采用其管理的segment对象将数据写入、读出。

3,replicastatemachine

副本的状态机。决定者副本的当前状态和状态之间的转移。一个副本总共可以处于一下几种状态的一种
newreplica:crontroller在分区重分配的时候可以创建一个新的副本。只能接受变为follower的请求。前状态可以是nonexistentreplica

onlinereplica:新启动的分区,能接受变为leader或者follower请求。前状态可以是newreplica, onlinereplica or offlinereplica

offlinereplica:死亡的副本处于这种状态。前状态可以是newreplica, onlinereplica

replicadeletionstarted:分本删除开始的时候处于这种状态,前状态是offlinereplica

replicadeletionsuccessful:副本删除成功。前状态是replicadeletionstarted

replicadeletionineligible:删除失败的时候处于这种状态。前状态是replicadeletionstarted

nonexistentreplica:副本成功删除之后处于这种状态,前状态是replicadeletionsuccessful

4,topicdeletionmanager

该类管理着topic删除的状态机

1),topiccommand通过创建/admin/delete_topics/<topic>,来发布topic删除命令。

2),controller监听/admin/delete_topic子节点变动,开始分别删除topic

3),controller有个后台线程负责删除topic

三,源码彻底解析topic的删除过程

此处会分四个部分:

a),客户端执行删除命令作用

b),不配置delete.topic.enable整个流水的源码

c),配置了delete.topic.enable整个流水的源码

d),手动删除zk上topic信息和磁盘数据

1,客户端执行删除命令

?
1
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name

进入kafka-topics.sh我们会看到

?
1
exec $(dirname $0)/kafka-run-class.sh kafka.admin.topiccommand $@

进入topiccommand里面,main方法里面

?
1
2
else if(opts.options.has(opts.deleteopt))
 deletetopic(zkclient, opts)

实际内容是

?
1
2
3
4
5
6
7
val topics = gettopics(zkclient, opts)
if (topics.length == 0) {
 println("topic %s does not exist".format(opts.options.valueof(opts.topicopt)))
}
topics.foreach { topic =>
 try {
 zkutils.createpersistentpath(zkclient, zkutils.getdeletetopicpath(topic))

在"/admin/delete_topics"目录下创建了一个topicname的节点。

2,假如不配置delete.topic.enable整个流水是

总共有两处listener会响应:

a),topicchangelistener

b),deletetopicslistener

使用topic的删除命令删除一个topic的话,指挥触发deletetopiclistener。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
var topicstobedeleted = {
 import javaconversions._
 (children: buffer[string]).toset
}
val nonexistenttopics = topicstobedeleted.filter(t => !controllercontext.alltopics.contains(t))
topicstobedeleted --= nonexistenttopics
if(topicstobedeleted.size > 0) {
 info("starting topic deletion for topics " + topicstobedeleted.mkstring(","))
 // mark topic ineligible for deletion if other state changes are in progress
 topicstobedeleted.foreach { topic =>
 val preferredreplicaelectioninprogress =
  controllercontext.partitionsundergoingpreferredreplicaelection.map(_.topic).contains(topic)
 val partitionreassignmentinprogress =
  controllercontext.partitionsbeingreassigned.keyset.map(_.topic).contains(topic)
 if(preferredreplicaelectioninprogress || partitionreassignmentinprogress)
  controller.deletetopicmanager.marktopicineligiblefordeletion(set(topic))
 }
 // add topic to deletion list
 controller.deletetopicmanager.enqueuetopicsfordeletion(topicstobedeleted)
}

由于都会判断delete.topic.enable是否为true,假如不为true就不会执行,为true就进入执行

?
1
2
controller.deletetopicmanager.marktopicineligiblefordeletion(set(topic))
controller.deletetopicmanager.enqueuetopicsfordeletion(topicstobedeleted)

3,delete.topic.enable配置为true

此处与步骤2的区别,就是那两个处理函数。

?
1
2
controller.deletetopicmanager.marktopicineligiblefordeletion(set(topic))
controller.deletetopicmanager.enqueuetopicsfordeletion(topicstobedeleted)

marktopicineligiblefordeletion函数的处理为

?
1
2
3
4
5
6
if(isdeletetopicenabled) {
 val newtopicstohaltdeletion = topicstobedeleted & topics
 topicsineligiblefordeletion ++= newtopicstohaltdeletion
 if(newtopicstohaltdeletion.size > 0)
 info("halted deletion of topics %s".format(newtopicstohaltdeletion.mkstring(",")))
}

主要是停止删除topic,假如存储以下三种情况

* halt delete topic if -
* 1. replicas being down
* 2. partition reassignment in progress for some partitions of the topic
* 3. preferred replica election in progress for some partitions of the topic

enqueuetopicsfordeletion主要作用是更新删除topic的集合,并激活topicdeletethread

?
1
2
3
4
5
6
7
def enqueuetopicsfordeletion(topics: set[string]) {
 if(isdeletetopicenabled) {
 topicstobedeleted ++= topics
 partitionstobedeleted ++= topics.flatmap(controllercontext.partitionsfortopic)
 resumetopicdeletionthread()
 }
}

在删除线程deletetopicsthread的dowork方法中

?
1
2
3
4
5
6
7
topicsqueuedfordeletion.foreach { topic =>
// if all replicas are marked as deleted successfully, then topic deletion is done
 if(controller.replicastatemachine.areallreplicasfortopicdeleted(topic)) {
 // clear up all state for this topic from controller cache and zookeeper
 completedeletetopic(topic)
 info("deletion of topic %s successfully completed".format(topic))
 }

进入completedeletetopic方法中

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// deregister partition change listener on the deleted topic. this is to prevent the partition change listener
// firing before the new topic listener when a deleted topic gets auto created
partitionstatemachine.deregisterpartitionchangelistener(topic)
val replicasfordeletedtopic = controller.replicastatemachine.replicasinstate(topic, replicadeletionsuccessful)
// controller will remove this replica from the state machine as well as its partition assignment cache
replicastatemachine.handlestatechanges(replicasfordeletedtopic, nonexistentreplica)
val partitionsfordeletedtopic = controllercontext.partitionsfortopic(topic)
// move respective partition to offlinepartition and nonexistentpartition state
partitionstatemachine.handlestatechanges(partitionsfordeletedtopic, offlinepartition)
partitionstatemachine.handlestatechanges(partitionsfordeletedtopic, nonexistentpartition)
topicstobedeleted -= topic
partitionstobedeleted.retain(_.topic != topic)
controllercontext.zkclient.deleterecursive(zkutils.gettopicpath(topic))
controllercontext.zkclient.deleterecursive(zkutils.gettopicconfigpath(topic))
controllercontext.zkclient.delete(zkutils.getdeletetopicpath(topic))
controllercontext.removetopic(topic)

主要作用是解除掉监控分区变动的listener,删除zookeeper具体节点信息,删除磁盘数据,更新内存数据结构,比如从副本状态机里面移除分区的具体信息。

其实,最终要的是我们的副本磁盘数据是如何删除的。我们重点介绍这个部分。

首次清除的话,在删除线程deletetopicsthread的dowork方法中

?
1
2
3
4
5
6
7
8
{
 // if you come here, then no replica is in topicdeletionstarted and all replicas are not in
 // topicdeletionsuccessful. that means, that either given topic haven't initiated deletion
 // or there is at least one failed replica (which means topic deletion should be retried).
 if(controller.replicastatemachine.isanyreplicainstate(topic, replicadeletionineligible)) {
 // mark topic for deletion retry
 marktopicfordeletionretry(topic)
 }

进入marktopicfordeletionretry

?
1
2
3
4
val failedreplicas = controller.replicastatemachine.replicasinstate(topic, replicadeletionineligible)
info("retrying delete topic for topic %s since replicas %s were not successfully deleted"
 .format(topic, failedreplicas.mkstring(",")))
controller.replicastatemachine.handlestatechanges(failedreplicas, offlinereplica)

在replicastatemachine的handlestatechanges方法中,调用了handlestatechange,处理offlinereplica

?
1
2
// send stop replica command to the replica so that it stops fetching from the leader
brokerrequestbatch.addstopreplicarequestforbrokers(list(replicaid), topic, partition, deletepartition = false)

接着在handlestatechanges中

?
1
brokerrequestbatch.sendrequeststobrokers(controller.epoch, controllercontext.correlationid.getandincrement)

给副本数据存储节点发送stopreplicakey副本指令,并开始删除数据

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
stopreplicarequestmap foreach { case(broker, replicainfolist) =>
 val stopreplicawithdelete = replicainfolist.filter(p => p.deletepartition == true).map(i => i.replica).toset
 val stopreplicawithoutdelete = replicainfolist.filter(p => p.deletepartition == false).map(i => i.replica).toset
 debug("the stop replica request (delete = true) sent to broker %d is %s"
 .format(broker, stopreplicawithdelete.mkstring(",")))
 debug("the stop replica request (delete = false) sent to broker %d is %s"
 .format(broker, stopreplicawithoutdelete.mkstring(",")))
 replicainfolist.foreach { r =>
 val stopreplicarequest = new stopreplicarequest(r.deletepartition,
  set(topicandpartition(r.replica.topic, r.replica.partition)), controllerid, controllerepoch, correlationid)
 controller.sendrequest(broker, stopreplicarequest, r.callback)
 }
}
stopreplicarequestmap.clear()

broker的kafkaapis的handle方法在接受到指令后

?
1
case requestkeys.stopreplicakey => handlestopreplicarequest(request)
?
1
val (response, error) = replicamanager.stopreplicas(stopreplicarequest)

接着是在stopreplicas方法中

?
1
2
3
4
5
6
7
8
9
10
{
 controllerepoch = stopreplicarequest.controllerepoch
 // first stop fetchers for all partitions, then stop the corresponding replicas
 replicafetchermanager.removefetcherforpartitions(stopreplicarequest.partitions.map(r => topicandpartition(r.topic, r.partition)))
 for(topicandpartition <- stopreplicarequest.partitions){
 val errorcode = stopreplica(topicandpartition.topic, topicandpartition.partition, stopreplicarequest.deletepartitions)
 responsemap.put(topicandpartition, errorcode)
 }
 (responsemap, errormapping.noerror)
}

进一步进入stopreplica方法,正式进入日志删除

?
1
2
3
4
5
6
7
getpartition(topic, partitionid) match {
 case some(partition) =>
 if(deletepartition) {
  val removedpartition = allpartitions.remove((topic, partitionid))
  if (removedpartition != null)
  removedpartition.delete() // this will delete the local log
 }

以上就是kafka的整个日志删除流水。

4,手动删除zk上topic信息和磁盘数据

topicchangelistener会监听处理,但是处理很简单,只是更新了

?
1
2
3
4
5
val deletedtopics = controllercontext.alltopics -- currentchildren
controllercontext.alltopics = currentchildren
 
val addedpartitionreplicaassignment = zkutils.getreplicaassignmentfortopics(zkclient, newtopics.toseq)
controllercontext.partitionreplicaassignment = controllercontext.partitionreplicaassignment.filter(p =>

四,总结

kafka的topic的删除过程,实际上就是基于zookeeper做了一个订阅发布系统。zookeeper的客户端创建一个节点/admin/delete_topics/<topic>,由kafka controller监听到事件之后正式触发topic的删除:解除partition变更监听的listener,清除内存数据结构,删除副本数据,删除topic的相关zookeeper节点。

delete.topic.enable配置该参数为false的情况下执行了topic的删除命令,实际上未做任何动作。我们此时要彻底删除topic建议修改该参数为true,重启kafka,这样topic信息会被彻底删除,已经测试。

一般流行的做法是手动删除zookeeper的topic相关信息及磁盘数据但是这样的话会造成部分内存数据未清除。至于是否会有隐患,未测试。

好了,以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对服务器之家的支持。

原文链接:https://mp.weixin.qq.com/s/69NVkikYNdoyLyz12Z59_g

延伸 · 阅读

精彩推荐