The KafkaController (once elected leader) manages and maintains topics by registering various listeners on different ZooKeeper paths. The ZooKeeper paths it watches and the corresponding listeners are as follows.
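As an orientation, here is a minimal sketch of that registration, assuming the standard Kafka 0.8.x ZooKeeper layout and the zkclient subscribe API; the helper method and its wiring are illustrative only, the real registration happens inside KafkaController and its state machines:

import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, ZkClient}

// Illustrative sketch: which ZooKeeper path each listener described in this article watches.
def registerControllerListeners(zkClient: ZkClient,
                                brokerChangeListener: IZkChildListener,
                                topicChangeListener: IZkChildListener,
                                deleteTopicsListener: IZkChildListener,
                                preferredReplicaElectionListener: IZkDataListener,
                                partitionsReassignedListener: IZkDataListener): Unit = {
  zkClient.subscribeChildChanges("/brokers/ids", brokerChangeListener)            // brokers joining or leaving
  zkClient.subscribeChildChanges("/brokers/topics", topicChangeListener)          // topic creation
  zkClient.subscribeChildChanges("/admin/delete_topics", deleteTopicsListener)    // topic deletion requests
  zkClient.subscribeDataChanges("/admin/preferred_replica_election", preferredReplicaElectionListener)
  zkClient.subscribeDataChanges("/admin/reassign_partitions", partitionsReassignedListener)
  // In addition, an AddPartitionsListener is registered per topic on /brokers/topics/[topic], and a
  // ReassignedPartitionsIsrChangeListener per reassigned partition on
  // /brokers/topics/[topic]/partitions/[partitionId]/state, as the code below shows.
}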
/**
* This is the zookeeper listener that triggers all the state transitions for a replica
*/
class BrokerChangeListener() extends IZkChildListener with Logging {
this.logIdent = "[BrokerChangeListener on Controller " + controller.config.brokerId + "]: "
def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) {
info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.mkString(",")))
inLock(controllerContext.controllerLock) {
if (hasStarted.get) {
ControllerStats.leaderElectionTimer.time {
try {
val curBrokerIds = currentBrokerList.map(_.toInt).toSet
val newBrokerIds = curBrokerIds -- controllerContext.liveOrShuttingDownBrokerIds
val newBrokerInfo = newBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _))
// filter out the newly added brokers
val newBrokers = newBrokerInfo.filter(_.isDefined).map(_.get)
// work out the dead broker ids
val deadBrokerIds = controllerContext.liveOrShuttingDownBrokerIds -- curBrokerIds
controllerContext.liveBrokers = curBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
info("Newly added brokers: %s, deleted brokers: %s, all live brokers: %s"
.format(newBrokerIds.mkString(","), deadBrokerIds.mkString(","), controllerContext.liveBrokerIds.mkString(",")))
// set up communication channels to the new brokers
newBrokers.foreach(controllerContext.controllerChannelManager.addBroker(_))
// tear down communication channels to the dead brokers
deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker(_))
if(newBrokerIds.size > 0)
// move the replicas on these brokers to the online state and resume any pending topic deletion
controller.onBrokerStartup(newBrokerIds.toSeq)
if(deadBrokerIds.size > 0)
// move the replicas on these brokers to the offline state and mark their deletion as failed
controller.onBrokerFailure(deadBrokerIds.toSeq)
} catch {
case e: Throwable => error("Error while handling broker changes", e)
}
}
}
}
}
}

The TopicChangeListener watches the children of /brokers/topics, keeps the controller's topic cache up to date, and kicks off creation of newly added topics:

class TopicChangeListener extends IZkChildListener with Logging {
this.logIdent = "[TopicChangeListener on Controller " + controller.config.brokerId + "]: "
@throws(classOf[Exception])
def handleChildChange(parentPath : String, children : java.util.List[String]) {
inLock(controllerContext.controllerLock) {
if (hasStarted.get) {
try {
val currentChildren = {
import JavaConversions._
debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(",")))
(children: Buffer[String]).toSet
}
// work out the newly created topics
val newTopics = currentChildren -- controllerContext.allTopics
// work out the deleted topics
val deletedTopics = controllerContext.allTopics -- currentChildren
controllerContext.allTopics = currentChildren
// fetch the replica assignment of the new topics
val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, newTopics.toSeq)
// drop the replica assignments of the deleted topics
controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
!deletedTopics.contains(p._1.topic))
// add the replica assignments of the new topics
controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics,
deletedTopics, addedPartitionReplicaAssignment))
if(newTopics.size > 0) // kick off creation of the new topics
controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet.toSet)
} catch {
case e: Throwable => error("Error while handling new topic", e )
}
}
}
}

The DeleteTopicsListener watches the children of /admin/delete_topics and hands topics marked for deletion over to deleteTopicManager:

class DeleteTopicsListener() extends IZkChildListener with Logging {
this.logIdent = "[DeleteTopicsListener on " + controller.config.brokerId + "]: "
val zkClient = controllerContext.zkClient
/**
* Invoked when a topic is being deleted
* @throws Exception On any error.
*/
@throws(classOf[Exception])
def handleChildChange(parentPath : String, children : java.util.List[String]) {
inLock(controllerContext.controllerLock) {
var topicsToBeDeleted = {
import JavaConversions._
(children: Buffer[String]).toSet
}
debug("Delete topics listener fired for topics %s to be deleted".format(topicsToBeDeleted.mkString(",")))
// find the requested topics that do not actually exist
val nonExistentTopics = topicsToBeDeleted.filter(t => !controllerContext.allTopics.contains(t))
if(nonExistentTopics.size > 0) {
warn("Ignoring request to delete non-existing topics " + nonExistentTopics.mkString(","))
nonExistentTopics.foreach(topic => ZkUtils.deletePathRecursive(zkClient, ZkUtils.getDeleteTopicPath(topic)))
}
// drop the non-existent topics
topicsToBeDeleted --= nonExistentTopics
if(topicsToBeDeleted.size > 0) {
info("Starting topic deletion for topics " + topicsToBeDeleted.mkString(","))
// mark topic ineligible for deletion if other state changes are in progress
topicsToBeDeleted.foreach { topic =>
val preferredReplicaElectionInProgress =
controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic).contains(topic)
val partitionReassignmentInProgress =
controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
// if any partition of the topic is being reassigned or undergoing preferred replica election, mark the topic as ineligible for deletion
if(preferredReplicaElectionInProgress || partitionReassignmentInProgress)
controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
}
// hand the topics over to deleteTopicManager
controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)
}
}
}
@throws(classOf[Exception])
def handleDataDeleted(dataPath: String) {
}
}

The PreferredReplicaElectionListener watches /admin/preferred_replica_election and triggers preferred replica election for the partitions written there:

class PreferredReplicaElectionListener(controller: KafkaController) extends IZkDataListener with Logging {
this.logIdent = "[PreferredReplicaElectionListener on " + controller.config.brokerId + "]: "
val zkClient = controller.controllerContext.zkClient
val controllerContext = controller.controllerContext
@throws(classOf[Exception])
def handleDataChange(dataPath: String, data: Object) {
debug("Preferred replica election listener fired for path %s. Record partitions to undergo preferred replica election %s"
.format(dataPath, data.toString))
inLock(controllerContext.controllerLock) {
val partitionsForPreferredReplicaElection = PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(data.toString)
if(controllerContext.partitionsUndergoingPreferredReplicaElection.size > 0)
info("These partitions are already undergoing preferred replica election: %s"
.format(controllerContext.partitionsUndergoingPreferredReplicaElection.mkString(",")))
// drop partitions that are already undergoing preferred replica election
val partitions = partitionsForPreferredReplicaElection -- controllerContext.partitionsUndergoingPreferredReplicaElection
// find partitions whose topic is queued up for deletion
val partitionsForTopicsToBeDeleted = partitions.filter(p => controller.deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
if(partitionsForTopicsToBeDeleted.size > 0) {
error("Skipping preferred replica election for partitions %s since the respective topics are being deleted"
.format(partitionsForTopicsToBeDeleted))
}
// only the remaining partitions actually need preferred replica election
controller.onPreferredReplicaElection(partitions -- partitionsForTopicsToBeDeleted)
}
}
@throws(classOf[Exception])
def handleDataDeleted(dataPath: String) {
}
}

The PartitionsReassignedListener watches /admin/reassign_partitions and kicks off partition reassignment:

class PartitionsReassignedListener(controller: KafkaController) extends IZkDataListener with Logging {
this.logIdent = "[PartitionsReassignedListener on " + controller.config.brokerId + "]: "
val zkClient = controller.controllerContext.zkClient
val controllerContext = controller.controllerContext
@throws(classOf[Exception])
def handleDataChange(dataPath: String, data: Object) {
debug("Partitions reassigned listener fired for path %s. Record partitions to be reassigned %s"
.format(dataPath, data))
val partitionsReassignmentData = ZkUtils.parsePartitionReassignmentData(data.toString)
// drop partitions that are already being reassigned
val partitionsToBeReassigned = inLock(controllerContext.controllerLock) {
partitionsReassignmentData.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
}
// skip partitions whose topic is queued up for deletion
partitionsToBeReassigned.foreach { partitionToBeReassigned =>
inLock(controllerContext.controllerLock) {
if(controller.deleteTopicManager.isTopicQueuedUpForDeletion(partitionToBeReassigned._1.topic)) {
error("Skipping reassignment of partition %s for topic %s since it is currently being deleted"
.format(partitionToBeReassigned._1, partitionToBeReassigned._1.topic))
controller.removePartitionFromReassignedPartitions(partitionToBeReassigned._1)
} else { // kick off the actual partition reassignment
val context = new ReassignedPartitionsContext(partitionToBeReassigned._2)
controller.initiateReassignReplicasForTopicPartition(partitionToBeReassigned._1, context)
}
}
}
}
@throws(classOf[Exception])
def handleDataDeleted(dataPath: String) {
}
}

Partition reassignment is fairly involved, so it is worth walking through in detail:
def initiateReassignReplicasForTopicPartition(topicAndPartition: TopicAndPartition,
reassignedPartitionContext: ReassignedPartitionsContext) {
val newReplicas = reassignedPartitionContext.newReplicas
val topic = topicAndPartition.topic
val partition = topicAndPartition.partition
// keep only the new replicas that live on brokers which are currently online
val aliveNewReplicas = newReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
try {
val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get(topicAndPartition)
assignedReplicasOpt match {
case Some(assignedReplicas) =>
if(assignedReplicas == newReplicas) { // identical to the current assignment, so nothing needs to be reassigned
throw new KafkaException("Partition %s to be reassigned is already assigned to replicas".format(topicAndPartition) +
" %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))
} else {
if(aliveNewReplicas == newReplicas) { // all of the new replicas are online, so carry out the reassignment
info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(",")))
// register a watch on this partition's ISR path to detect changes; note that the listener is a ReassignedPartitionsIsrChangeListener
watchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext)
// mark this topic and partition as being reassigned
controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext)
// mark the topic as ineligible for deletion so it cannot be deleted mid-reassignment
deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
// actually carry out the reassignment
onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
} else { // some of the new replicas are offline, so the reassignment fails
// some replica in RAR is not alive. Fail partition reassignment
throw new KafkaException("Only %s replicas out of the new set of replicas".format(aliveNewReplicas.mkString(",")) +
" %s for partition %s to be reassigned are alive. ".format(newReplicas.mkString(","), topicAndPartition) +
"Failing partition reassignment")
}
}
// the topic and partition could not be found
case None => throw new KafkaException("Attempt to reassign partition %s that doesn't exist"
.format(topicAndPartition))
}
} catch { // on any exception, remove the partition from the reassignment path
case e: Throwable => error("Error completing reassignment of partition %s".format(topicAndPartition), e)
// remove the partition from the admin path to unblock the admin client
removePartitionFromReassignedPartitions(topicAndPartition)
}
}

The heart of the flow is the logic inside onPartitionReassignment:
/*
 * First, some terminology:
 * RAR = reassigned replicas (the target replica set)
 * OAR = original list of replicas for the partition (the replica set before reassignment)
 * AR  = current assigned replicas
 */
def onPartitionReassignment(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
val reassignedReplicas = reassignedPartitionContext.newReplicas
areReplicasInIsr(topicAndPartition.topic, topicAndPartition.partition, reassignedReplicas) match {
case false => // some new replicas are not yet in the partition's ISR, i.e. they have not caught up with the latest data, so first make them catch up with the partition's leader
info("New replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
"reassigned not yet caught up with the leader")
val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet
val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet
// so first set the partition's replica list to newAndOldReplicas
updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq)
// send a LeaderAndIsrRequest to the brokers hosting these replicas
updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition),
newAndOldReplicas.toSeq)
// move newReplicasNotInOldReplicaList to the NewReplica state. How does the flow continue from here?
// Note that earlier the KafkaController registered a ReassignedPartitionsIsrChangeListener on
// /brokers/topics/[topic]/partitions/[partitionId]/state. Once the new replicas have caught up with this
// partition's leader and the controller updates the ISR, that listener fires; its implementation is shown below.
startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList)
info("Waiting for new replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
"reassigned to catch up with the leader")
case true => // at this point all of the new replicas have caught up
// work out the old replicas
val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet
// move the reassigned replicas to the OnlineReplica state
reassignedReplicas.foreach { replica =>
replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition,
replica)), OnlineReplica)
}
// pick a new leader if required: if the current leader is in the new replica set it stays, otherwise a new one is elected
moveReassignedPartitionLeaderIfRequired(topicAndPartition, reassignedPartitionContext)
// delete the old replicas
stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext, oldReplicas)
// update the replica list of this topic and partition in the KafkaController cache and in zk
updateAssignedReplicasForPartition(topicAndPartition, reassignedReplicas)
// update /admin/reassign_partitions in zk by removing this topic and partition
removePartitionFromReassignedPartitions(topicAndPartition)
info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition))
controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
// send an UpdateMetadataRequest to the brokers
sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicAndPartition))
// resume topic deletion, since the topic's partitions may need to be deleted after the reassignment
deleteTopicManager.resumeDeletionForTopics(Set(topicAndPartition.topic))
}
}

Once the new replicas have caught up with the partition's leader, the partition state at /brokers/topics/[topic]/partitions/[partitionId]/state is updated, which triggers the ReassignedPartitionsIsrChangeListener callback:
class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic: String, partition: Int,
reassignedReplicas: Set[Int])
extends IZkDataListener with Logging {
this.logIdent = "[ReassignedPartitionsIsrChangeListener on controller " + controller.config.brokerId + "]: "
val zkClient = controller.controllerContext.zkClient
val controllerContext = controller.controllerContext
@throws(classOf[Exception])
def handleDataChange(dataPath: String, data: Object) {
inLock(controllerContext.controllerLock) {
debug("Reassigned partitions isr change listener fired for path %s with children %s".format(dataPath, data))
val topicAndPartition = TopicAndPartition(topic, partition)
try {
controllerContext.partitionsBeingReassigned.get(topicAndPartition) match {
case Some(reassignedPartitionContext) =>
val newLeaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition)
newLeaderAndIsrOpt match {
case Some(leaderAndIsr) =>
val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet
if(caughtUpReplicas == reassignedReplicas) { // all of the reassigned replicas are in the ISR, i.e. the new replicas have caught up with the partition's leader
info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
.format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) +
"Resuming partition reassignment")
// re-enter the onPartitionReassignment flow;
// this time areReplicasInIsr(topicAndPartition.topic, topicAndPartition.partition, reassignedReplicas) returns true
controller.onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
}
else {
info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
.format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) +
"Replica(s) %s still need to catch up".format((reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(",")))
}
case None => error("Error handling reassignment of partition %s to replicas %s as it was never created"
.format(topicAndPartition, reassignedReplicas.mkString(",")))
}
case None =>
}
} catch {
case e: Throwable => error("Error while handling partition reassignment", e)
}
}
}
@throws(classOf[Exception])
def handleDataDeleted(dataPath: String) {
}
}

The AddPartitionsListener watches the data of /brokers/topics/[topic] and reacts to partitions being added to an existing topic:

class AddPartitionsListener(topic: String) extends IZkDataListener with Logging {
this.logIdent = "[AddPartitionsListener on " + controller.config.brokerId + "]: "
@throws(classOf[Exception])
def handleDataChange(dataPath : String, data: Object) {
inLock(controllerContext.controllerLock) {
try {
info("Add Partition triggered " + data.toString + " for path " + dataPath)
val partitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
// work out the newly added partitions
val partitionsToBeAdded = partitionReplicaAssignment.filter(p =>
!controllerContext.partitionReplicaAssignment.contains(p._1))
// if the topic is currently being deleted, ignore the new partitions; otherwise start creating them
if(controller.deleteTopicManager.isTopicQueuedUpForDeletion(topic))
error("Skipping adding partitions %s for topic %s since it is currently being deleted"
.format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))
else {
if (partitionsToBeAdded.size > 0) {
info("New partitions to be added %s".format(partitionsToBeAdded))
controller.onNewPartitionCreation(partitionsToBeAdded.keySet.toSet)
}
}
} catch {
case e: Throwable => error("Error while handling add partitions for data path " + dataPath, e )
}
}
}
@throws(classOf[Exception])
def handleDataDeleted(parentPath : String) {
// this is not implemented for partition change
}
}

So what is a rebalance? When partition leaders move away from their original brokers, leadership becomes unevenly distributed across the cluster. A rebalance moves the leaders of the affected partitions back to their original brokers so that the load is balanced again, as in the following example:
Topic And Partition | Leader | ISR
[topic] partition 0 | 1 | 1,2
[topic] partition 1 | 2 | 2,3
[topic] partition 2 | 3 | 3,4
[topic] partition 3 | 4 | 4,1
Each broker hosts exactly one leader. When broker 4 goes offline for a while and then comes back online, the topic and partition state changes as follows:
Topic And Partition | Leader | ISR
[topic] partition 0 | 1 | 1,2
[topic] partition 1 | 2 | 2,3
[topic] partition 2 | 3 | 3,4
[topic] partition 3 | 1 | 4,1
Broker 1 now hosts two leaders: the leaders of both partition 0 and partition 3 sit on broker 1. If broker 2 then goes offline for a while and comes back online, the state changes again:
Topic And Partition | Leader | ISR
[topic] partition 0 | 1 | 1,2
[topic] partition 1 | 3 | 2,3
[topic] partition 2 | 3 | 3,4
[topic] partition 3 | 1 | 4,1
Now all the leaders are concentrated on broker 1 and broker 3 while the other brokers lead nothing, so producers send all their data to brokers 1 and 3 and those two nodes carry a disproportionate load. If auto.leader.rebalance.enable=true is configured, i.e. automatic leader rebalancing is enabled, the partition leaders are migrated back, restoring the cluster as closely as possible to its initial state:
Topic And Partition | Leader | ISR
[topic] partition 0 | 1 | 1,2
[topic] partition 1 | 2 | 2,3
[topic] partition 2 | 3 | 3,4
[topic] partition 3 | 4 | 4,1
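To make the trigger condition concrete, here is a small self-contained sketch (illustrative only, not controller code) that computes the per-broker imbalance ratio for the imbalanced state above, where all leaders sit on brokers 1 and 3; the PartitionState class is a hypothetical stand-in that pairs each partition's original (preferred) leader with its current leader:

// Preferred leader = head of the assigned replica list; current leaders taken from the imbalanced table above.
case class PartitionState(preferredLeader: Int, currentLeader: Int)

val partitions = Seq(
  PartitionState(preferredLeader = 1, currentLeader = 1),  // [topic] partition 0
  PartitionState(preferredLeader = 2, currentLeader = 3),  // [topic] partition 1
  PartitionState(preferredLeader = 3, currentLeader = 3),  // [topic] partition 2
  PartitionState(preferredLeader = 4, currentLeader = 1))  // [topic] partition 3

partitions.groupBy(_.preferredLeader).foreach { case (broker, ps) =>
  val notLedByPreferred = ps.count(_.currentLeader != broker)
  val imbalanceRatio = notLedByPreferred.toDouble / ps.size
  // Assuming the default leader.imbalance.per.broker.percentage of 10 (i.e. 10%), brokers 2 and 4
  // (ratio 1.0) would trigger a preferred replica election, while brokers 1 and 3 (ratio 0.0) would not.
  println("broker %d imbalance ratio: %.2f".format(broker, imbalanceRatio))
}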
This is handled by the periodic task checkAndTriggerPartitionRebalance:
private def checkAndTriggerPartitionRebalance(): Unit = {
if (isActive()) {
trace("checking need to trigger partition rebalance")
// group partitions (excluding topics queued for deletion) by their preferred replica, i.e. the head of the assigned replica list
var preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]] = null
inLock(controllerContext.controllerLock) {
preferredReplicasForTopicsByBrokers =
controllerContext.partitionReplicaAssignment.filterNot(p => deleteTopicManager.isTopicQueuedUpForDeletion(p._1.topic)).groupBy {
case(topicAndPartition, assignedReplicas) => assignedReplicas.head
}
}
debug("preferred replicas by broker " + preferredReplicasForTopicsByBrokers)
// for each broker, check if a preferred replica election needs to be triggered
preferredReplicasForTopicsByBrokers.foreach {
case(leaderBroker, topicAndPartitionsForBroker) => {
var imbalanceRatio: Double = 0
var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = null
inLock(controllerContext.controllerLock) {
// find the partitions whose current leader is not the preferred replica (the head of the assigned replica list)
topicsNotInPreferredReplica =
topicAndPartitionsForBroker.filter {
case(topicPartition, replicas) => {
controllerContext.partitionLeadershipInfo.contains(topicPartition) &&
// leaderAndIsr.leader != leaderBroker: the current leader differs from the first broker in the original assignedReplicas
controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker
}
}
debug("topics not in preferred replica " + topicsNotInPreferredReplica)
val totalTopicPartitionsForBroker = topicAndPartitionsForBroker.size
val totalTopicPartitionsNotLedByBroker = topicsNotInPreferredReplica.size
// compute the imbalance ratio
imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble / totalTopicPartitionsForBroker
trace("leader imbalance ratio for broker %d is %f".format(leaderBroker, imbalanceRatio))
}
// if the imbalance ratio exceeds the configured threshold, trigger a rebalance
if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
topicsNotInPreferredReplica.foreach {
case(topicPartition, replicas) => {
inLock(controllerContext.controllerLock) {
if (controllerContext.liveBrokerIds.contains(leaderBroker) && // the preferred leader broker must be online
controllerContext.partitionsBeingReassigned.size == 0 && // no partition is being reassigned, to avoid adding extra load
controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0 && // no partition is undergoing preferred replica election
!deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) && // the topic is not queued up for deletion
controllerContext.allTopics.contains(topicPartition.topic)) { // the topic still exists
onPreferredReplicaElection(Set(topicPartition), true) // trigger preferred replica election for this topic and partition
}
}
}
}
}
}
}
}
}

Topic deletion itself boils down to starting a DeleteTopicsThread and waiting for the KafkaController to trigger the deletion:
class DeleteTopicsThread() extends ShutdownableThread(name = "delete-topics-thread-" + controller.config.brokerId, isInterruptible = false) {
val zkClient = controllerContext.zkClient
override def doWork() {
awaitTopicDeletionNotification() // wait for the KafkaController to trigger a deletion
if (!isRunning.get)
return
inLock(controllerContext.controllerLock) {
val topicsQueuedForDeletion = Set.empty[String] ++ topicsToBeDeleted
if(!topicsQueuedForDeletion.isEmpty)
info("Handling deletion for topics " + topicsQueuedForDeletion.mkString(","))
topicsQueuedForDeletion.foreach { topic =>
// deletion is asynchronous; once every replica of every partition of the topic has been deleted successfully
if(controller.replicaStateMachine.areAllReplicasForTopicDeleted(topic)) {
// clear all state the KafkaController keeps about this topic
completeDeleteTopic(topic)
info("Deletion of topic %s successfully completed".format(topic))
} else { // deletion of the topic has not finished yet
if(controller.replicaStateMachine.isAtLeastOneReplicaInDeletionStartedState(topic)) {
// ignore since topic deletion is in progress
val replicasInDeletionStartedState = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionStarted)
val replicaIds = replicasInDeletionStartedState.map(_.replica)
val partitions = replicasInDeletionStartedState.map(r => TopicAndPartition(r.topic, r.partition))
info("Deletion for replicas %s for partition %s of topic %s in progress".format(replicaIds.mkString(","),
partitions.mkString(","), topic))
} else {
// replica deletion hit an unexpected error, so mark the topic for a retry
if(controller.replicaStateMachine.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) {
markTopicForDeletionRetry(topic)
}
}
}
// if the topic is eligible for deletion, start deleting it; the key step is sending a StopReplicaRequest to every broker hosting the topic,
// telling each broker to stop fetching and delete its replica
if(isTopicEligibleForDeletion(topic)) {
info("Deletion of topic %s (re)started".format(topic))
// topic deletion will be kicked off
onTopicDeletion(Set(topic))
} else if(isTopicIneligibleForDeletion(topic)) {
info("Not retrying deletion of topic %s at this time since it is marked ineligible for deletion".format(topic))
}
}
}
}
}

The ControllerChannelManager holds the communication channels to each broker:
class ControllerChannelManager (private val controllerContext: ControllerContext, config: KafkaConfig) extends Logging {
private val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
}

Now look at the ControllerBrokerStateInfo class:

case class ControllerBrokerStateInfo(channel: BlockingChannel,
broker: Broker,
messageQueue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)],
requestSendThread: RequestSendThread)

Its messageQueue holds the requests destined for one particular broker, each paired with a callback (cb) function; channel is the connection to that broker; and RequestSendThread is the thread that does the sending. Let's look at RequestSendThread:

class RequestSendThread(val controllerId: Int,
val controllerContext: ControllerContext,
val toBroker: Broker,
val queue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)],
val channel: BlockingChannel)
extends ShutdownableThread("Controller-%d-to-broker-%d-send-thread".format(controllerId, toBroker.id)) {
private val lock = new Object()
private val stateChangeLogger = KafkaController.stateChangeLogger
connectToBroker(toBroker, channel)
override def doWork(): Unit = {
val queueItem = queue.take() // take the next request
val request = queueItem._1
val callback = queueItem._2
var receive: Receive = null
try {
lock synchronized {
var isSendSuccessful = false
while(isRunning.get() && !isSendSuccessful) {
// if a broker goes down for a long time, then at some point the controller's zookeeper listener will trigger a
// removeBroker which will invoke shutdown() on this thread. At that point, we will stop retrying.
try {
channel.send(request) // send the request
receive = channel.receive() // read the response
isSendSuccessful = true
} catch {
case e: Throwable => // if the send was not successful, reconnect to broker and resend the message
warn(("Controller %d epoch %d fails to send request %s to broker %s. " +
"Reconnecting to broker.").format(controllerId, controllerContext.epoch, request.toString, toBroker.toString()), e)
channel.disconnect()
connectToBroker(toBroker, channel)
isSendSuccessful = false
// backoff before retrying the connection and send
Utils.swallow(Thread.sleep(300))
}
}
var response: RequestOrResponse = null
request.requestId.get match { // deserialize the matching response type
case RequestKeys.LeaderAndIsrKey => response = LeaderAndIsrResponse.readFrom(receive.buffer)
case RequestKeys.StopReplicaKey => response = StopReplicaResponse.readFrom(receive.buffer)
case RequestKeys.UpdateMetadataKey => response = UpdateMetadataResponse.readFrom(receive.buffer)
}
stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s"
.format(controllerId, controllerContext.epoch, response.toString, toBroker.toString))
// if a callback was supplied, invoke it
if(callback != null) {
callback(response)
}
}
} catch {
case e: Throwable =>
error("Controller %d fails to send a request to broker %s".format(controllerId, toBroker.toString()), e)
// If there is any socket error (eg, socket timeout), the channel is no longer usable and needs to be recreated.
channel.disconnect()
}
}
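To isolate the pattern, here is a minimal self-contained sketch (illustrative only, not the actual Kafka classes) of the per-broker "blocking queue plus dedicated sender thread" structure that ControllerBrokerStateInfo and RequestSendThread implement together; the BrokerSender class below is a hypothetical stand-in:

import java.util.concurrent.LinkedBlockingQueue

// Requests are enqueued together with an optional callback; a dedicated thread drains the queue,
// performs the synchronous send, and invokes the callback with the response.
class BrokerSender[Req, Resp](send: Req => Resp) {
  private val queue = new LinkedBlockingQueue[(Req, Resp => Unit)]()
  private val sender = new Thread(new Runnable {
    def run(): Unit =
      try {
        while (true) {
          val (request, callback) = queue.take()   // blocks until a request is available
          val response = send(request)             // send-and-receive, as RequestSendThread does over its BlockingChannel
          if (callback != null) callback(response) // invoke the caller's callback, if one was supplied
        }
      } catch {
        case _: InterruptedException => // shutdown() interrupts the blocking take(); exit quietly
      }
  })
  sender.setDaemon(true)
  sender.start()

  def enqueue(request: Req, callback: Resp => Unit = null): Unit = queue.put((request, callback))
  def shutdown(): Unit = sender.interrupt()
}

Because enqueue only puts the request on the queue, the caller never blocks on a broker's socket; fanning out a LeaderAndIsrRequest, StopReplicaRequest or UpdateMetadataRequest to many brokers just means dropping one item into each broker's queue.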
Kafka source code analysis, part 12: KafkaController (part 2)
Original article: http://blog.csdn.net/wl044090432/article/details/51119264