Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Event Types and Handling Process

Event Types

There will be eight types of events in the controller, which are defined as below:

Code Block
languagescala
object ControllerEventType extends Enumeration {
  type ControllerEventType = Value
  val AddPartition, TopicChange, DeleteTopic, BrokerChange, PreferredReplicaElection, PartitionReassigned,
      ReassignedPartitionIsrChange, ControlledShutdown = Value
}

KafkaControllerEvent

A generic controller event class will be defined:

Code Block
languagescala
abstract class KafkaControllerEvent(eventType: ControllerEventType) {
  // A set that tracks the responses from brokers
  val unAckedNode = new mutable.HashSet[Int]
  val eventDone = new CountDownLatch(1)
  
  def makeStatesChange(currentState: PartitionStateMachine): Map[Int, ClientRequest]

  def controllerRequestCallback(response: ClientResponse) {
    handleBrokerResponse(response)
    unAckedNode.remove(response.request().request().destination())
    if (unAckedNode.isEmpty)
      eventDone.countDown()
  }

  def handleBrokerResponse(response: ClientResponse)
}

AddPartitionEvent

Code Block
languagescala
class AddPartitionEvent extends KafkaControllerEvent(ControllerEventType.AddPartition) {
	override def makeStatesChange(): Map[Int, ClientRequest] = {
		// make state change and generate requests to each broker
	}
  	
	override def handleBrokerResponse(response: ClientResponse) {
		// If necessary, do something when response is received
  	}
}

TopicChangeEvent

Code Block
languagescala
class TopicChangeEvent extends KafkaControllerEvent(ControllerEventType.TopicChange) {
	override def makeStatesChange(): Map[Int, ClientRequest] = {
		// make state change and generate requests to each broker
	}
  	
	override def handleBrokerResponse(response: ClientResponse) {
		// If necessary, do something when response is received
  	}
}

DeleteTopicEvent

Code Block
languagescala
class DeleteTopicEvent extends KafkaControllerEvent(ControllerEventType.DeleteTopic) {
	override def makeStatesChange(): Map[Int, ClientRequest] = {
		// make state change and generate requests to each broker
	}
  	
	override def handleBrokerResponse(response: ClientResponse) {
		// If necessary, do something when response is received
  	}
}

BrokerChangeEvent

Code Block
languagescala
class BrokerChangeEvent extends KafkaControllerEvent(ControllerEventType.BrokerChange) {
	override def makeStatesChange(): Map[Int, ClientRequest] = {
		// make state change and generate requests to each broker
	}
  	
	override def handleBrokerResponse(response: ClientResponse) {
		// If necessary, do something when response is received
  	}
}

PreferredReplicaElectionEvent

Code Block
languagescala
class PreferredReplicaElectionEvent extends KafkaControllerEvent(ControllerEventType.PreferredReplicaElection) {
	override def makeStatesChange(): Map[Int, ClientRequest] = {
		// make state change and generate requests to each broker
	}
  	
	override def handleBrokerResponse(response: ClientResponse) {
		// If necessary, do something when response is received
  	}
}

PartitionReassignedEvent

Code Block
languagescala
class PartitionReassignedEvent extends KafkaControllerEvent(ControllerEventType.PartitionReassigned) {
	override def makeStatesChange(): Map[Int, ClientRequest] = {
		// make state change and generate requests to each broker
	}
  	
	override def handleBrokerResponse(response: ClientResponse) {
		// If necessary, do something when response is received
  	}
}

ReassignedPartitionIsrChangeEvent

Code Block
languagescala
class ReassignedPartitionIsrChangeEvent extends KafkaControllerEvent(ControllerEventType.ReassignedPartitionIsrChange) {
	override def makeStatesChange(): Map[Int, ClientRequest] = {
		// make state change and generate requests to each broker
	}
  	
	override def handleBrokerResponse(response: ClientResponse) {
		// If necessary, do something when response is received
  	}
}

ControlledShutdown

Code Block
languagescala
class ControlledShutdownEvent extends KafkaControllerEvent(ControllerEventType.ControlledShutdown) {
	override def makeStatesChange(): Map[Int, ClientRequest] = {
		// make state change and generate requests to each broker
	}
  	
	override def handleBrokerResponse(response: ClientResponse) {
		// If necessary, do something when response is received
  	}
}

Event Handling Process

The general event handling process would be something like this:

Code Block
languagescala
while(!shutdown){
	Event event = eventQueue.pollFirst()
	// Make state change
	event.eventType match {
		case EventType1 =>
			//Make state change and send requests
		case EventType2try {
		val brokerRequests = event.makeStateChange(partitionStateMachine)
   		brokerRequests.map { case (broker, request) =>
    		networkClient.send(request)
        	event.unAckedNode.add(broker)
    	
		while (!event.unAckedNode.isEmpty) {
			try {
				networkClient.poll(timeout)
			} catch {
				case KafkaApiException =>
					// Make state change and send requests
	}
	while (!allCallbacksReceivedFromReachableBrokers) {
Do something
				case Exception =>
					// Error handling
			}
			checkNodeLivenessAndIgnoreDeadNode()
		}
	} catch {
		case StateChangeException =>
			// Timedhandle waitillegal for event callbacks.state change
	}
}

 

Discussion Required

  1. As stated in KAFKA-2029, current state change propagation has an issue that in an unbalanced cluster. State change on a heavily loaded broker will be much slower than a lightly loaded broker. This is because of the following two reasons:
    • Controller traffic is not prioritized on broker side. So controller messages needs to wait until some clients requests are handled which takes much longer on a heavily loaded broker.
    • Heavily loaded broker needs to take state change for more partitions while same amount state changes are distributed among several brokers as followers.
    Batching state change into a single request and prioritize the controller traffic on broker will solve the problem. But I wonder is there any specific reason we did not batch the state change in current controller?
  2. Ideally, we should try to let the brokers in consistent state if possible. That indicates that we should put synchronization barriers between two events. Which means we do not execute the next event until:
    1. Callback has been fired on each broker, or
    2. A broker is down and we will just skip waiting for its callback and move on.
    Does it make sense to do so? Implementation wise, it means that if a broker is down, we will stop sending message to all brokers until we know it's down. (And this leads to question 3)
  3. We actually have two ways to determine if a broker is alive or not: from Zookeeper or from NetworkClient. Currently we take zookeeper as source of truth, but when we send messages, NetworkClient connectivity is the one actually matters. Because Zookeeper timeout could take a while, so what should we do if Zookeeper says a broker is alive but NetworkClient shows it is disconnected. Should we block the current event processing? If there is a long GC, is it possible that NetworkClient says broker is connected but Zookeeper says it's dead? Back to question 2, it is about when we consider "a broker is down".

...