Status

Current state: Accepted

Discussion thread: here

Vote thread: here

JIRA: KAFKA-7235

Pull Request: PR-5821

Relates to:

...

  1. Broker should always process the control requests sent by the active controller.
  2. Controller should always process the ControlledShutdownRequest sent by the broker.
  3. The first LeaderAndIsrRequest received by a broker after it starts up always contains all partitions hosted by the broker.
  4. By the time a broker finishes bouncing, controller should have processed the BrokerChange event.

...

  • If a broker bounces during controlled shutdown, the bounced broker may accidentally process a StopReplicaRequest that the active controller sent to its earlier generation for one of its follower replicas. This leaves that replica offline while the broker's remaining replicas may stay online.
  • If the controller receives an old ControlledShutdownRequest (due to retries) after the broker has been bounced, the controller will proactively send out control requests to the broker, which may leave the broker in a bad state.
  • If the first LeaderAndIsrRequest that a broker processes was sent by the active controller before the broker's startup, the broker will overwrite the high watermark checkpoint file, which may cause incorrect truncation (KAFKA-7235).
  • If a broker bounces very quickly, the controller may start processing the BrokerChange event after the broker has already re-registered itself in ZooKeeper. In this case, the controller will miss the broker restart and will not send any requests to the broker for initialization. The broker will not be able to accept traffic.

...

This KIP will include the broker generation (broker_epoch) in LeaderAndIsrRequest, UpdateMetadataRequest, StopReplicaRequest and ControlledShutdownRequest, and bump up their protocol versions. Since we will evolve the schema of these control requests in this KIP, I would also normalize the schema of these requests by removing the redundant topic strings, which reduces the memory footprint on the controller side and the amount of data we send across the network. Here are the new versions of these requests:

Changes in Control Requests:

LeaderAndIsrRequest V2:

LeaderAndIsr Request => controller_id controller_epoch broker_epoch [topic_states] [live_leaders]
  controller_id => INT32
  controller_epoch => INT32
  broker_epoch => INT64                        <-- NEW
  topic_states => topic [partition_states]     <-- NEW
    topic => STRING
    partition_states => partition controller_epoch leader leader_epoch [isr] zk_version [replicas] is_new
      partition => INT32
      controller_epoch => INT32
      leader => INT32
      leader_epoch => INT32
      isr => INT32
      zk_version => INT32
      replicas => INT32
      is_new => BOOLEAN
  live_leaders => id host port
    id => INT32
    host => STRING
    port => INT32

...

StopReplicaRequest V1:

StopReplica Request => controller_id controller_epoch broker_epoch delete_partitions [topic_partitions]
  controller_id => INT32
  controller_epoch => INT32
  broker_epoch => INT64                     <-- NEW
  delete_partitions => BOOLEAN
  topic_partitions => topic [partition]     <-- NEW
    topic => STRING
    partition => INT32


ControlledShutdownRequest V2:

ControlledShutdown Request => broker_id broker_epoch
  broker_id => INT32
  broker_epoch => INT64                     <-- NEW


Note: Normalizing the schema is a nice-to-have optimization, because the memory footprint of the control requests hinders the controller from scaling up when there are many topics with large partition counts. We already did the same thing in other types of requests (e.g. Produce, Fetch, ...).
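
To illustrate the normalization, here is a minimal Python sketch that groups per-partition states under their topic so that each topic string is carried once. The flat input layout, the sample topic names and the helper `normalize_by_topic` are hypothetical and exist only for illustration; they are not part of the Kafka code base.

```python
from collections import defaultdict

# Hypothetical flat layout: the topic string is repeated for every partition.
flat_partition_states = [
    {"topic": "orders", "partition": 0, "leader": 1, "leader_epoch": 5},
    {"topic": "orders", "partition": 1, "leader": 2, "leader_epoch": 3},
    {"topic": "payments", "partition": 0, "leader": 1, "leader_epoch": 7},
]


def normalize_by_topic(partition_states):
    """Group partition states by topic, mirroring topic_states => topic [partition_states]."""
    grouped = defaultdict(list)
    for state in partition_states:
        # Keep every field except the topic name in the nested entry.
        entry = {key: value for key, value in state.items() if key != "topic"}
        grouped[state["topic"]].append(entry)
    # Each topic string now appears exactly once, followed by its partitions.
    return [{"topic": topic, "partition_states": states} for topic, states in grouped.items()]


topic_states = normalize_by_topic(flat_partition_states)
```

With this layout the per-request cost of topic names grows with the number of topics rather than with the number of partitions.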

New Error Code:

A new error code STALE_BROKER_EPOCH (77) and a new exception type StaleBrokerEpochException will be added. This error is used in the following scenarios:

  • When a broker sees a LeaderAndIsrRequest/UpdateMetadataRequest/StopReplicaRequest with an outdated broker epoch, it will respond with a STALE_BROKER_EPOCH error. The controller will not resend the request.
  • When the controller sees a ControlledShutdownRequest with an outdated broker epoch, it will respond with a STALE_BROKER_EPOCH error. If the broker was quickly restarted, it will not see the error response, since the channel has already been closed during broker shutdown. If the broker was only disconnected from ZooKeeper and reconnected during controlled shutdown, it will retry sending the ControlledShutdownRequest to the controller with the newer broker epoch.

Proposed Changes

Broker Registration in Zookeeper

...

A broker will extract the broker generation (czxid) from each LeaderAndIsrRequest/UpdateMetadataRequest/StopReplicaRequest and will reject any request whose broker generation is smaller than the broker's current generation.
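
The broker-side check can be sketched as follows. This is an illustrative Python sketch, not the broker implementation: the function name `validate_control_request` is hypothetical, while the error code value 77 comes from this KIP and 0 is Kafka's existing "no error" code.

```python
STALE_BROKER_EPOCH = 77  # new error code proposed in this KIP
NONE = 0                 # Kafka's existing "no error" code


def validate_control_request(request_broker_epoch, current_broker_epoch):
    """Return the error code a broker would respond with (hypothetical helper).

    Only requests carrying a smaller generation than the broker's current
    one are rejected; equal or larger generations are accepted.
    """
    if request_broker_epoch < current_broker_epoch:
        return STALE_BROKER_EPOCH
    return NONE
```

For example, a broker whose current generation is 7 would reject a StopReplicaRequest carrying generation 5 and accept one carrying generation 7.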

Controller Rejects ControlledShutdownRequest Sent from Former Generations

During controlled shutdown, the broker will include its current broker generation (czxid) in the ControlledShutdownRequest. Upon receiving a ControlledShutdownRequest, the controller will check the broker generation (czxid) in the request and will reject the request if that generation is smaller than the broker generation cached on the controller side. This guarantees that the controller will not attempt to send out control requests to move partitions and stop replicas in reaction to a clean broker shutdown if the broker has already been restarted.
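
A minimal Python sketch of the controller-side check, under stated assumptions: the class `ControllerSketch`, its `broker_epochs` cache and the method name are hypothetical, and the sketch only covers brokers that are already present in the controller's cache.

```python
STALE_BROKER_EPOCH = 77  # new error code proposed in this KIP
NONE = 0                 # Kafka's existing "no error" code


class ControllerSketch:
    """Hypothetical stand-in for the controller's cached broker generations."""

    def __init__(self, broker_epochs):
        # broker id -> broker generation last seen by the controller
        self.broker_epochs = dict(broker_epochs)

    def handle_controlled_shutdown(self, broker_id, broker_epoch):
        # Assumption: the broker is known to the controller; unknown
        # brokers are outside the scope of this sketch (raises KeyError).
        cached_epoch = self.broker_epochs[broker_id]
        if broker_epoch < cached_epoch:
            # The request came from a former generation of the broker.
            return STALE_BROKER_EPOCH
        # Otherwise the controller would proceed with the shutdown logic.
        return NONE


controller = ControllerSketch({1: 100, 2: 250})
```

Here a retried request from broker 2 carrying generation 200 would be rejected, because the controller has already cached generation 250 for the restarted broker.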

Controller Detects Bounced Broker

In order to avoid missing a broker state change when a fast broker bounce happens, the controller should process a BrokerChange event as follows:

  1. Read all child nodes in /brokers/ids/ to get the current brokers with their broker generations
  2. Detect new brokers, dead brokers and bounced brokers:
    • new brokers: brokers that exist in the current brokers list and do not exist in controller context
    • dead brokers: brokers that exist in controller context but do not exist in the current brokers list
    • bounced brokers: brokers that exist in both the current brokers list and controller context but have a higher generation than the cached value in controller context
  3. Update the broker info (broker id + broker generation) of the live brokers in controller context
  4. Handle broker state change:
    • new brokers: update broker generation in controller context for new brokers, then invoke onBrokerStartUp(...new brokers)
    • dead brokers: invoke onBrokerFailure(...dead brokers)
    • bounced brokers: invoke onBrokerFailure(...bounced brokers) first, then update broker generation in controller context for bounced brokers, finally invoke onBrokerStartUp(...bounced brokers)
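
The steps above can be sketched in Python as follows. This is a simplified illustration rather than the controller code: `classify_brokers` and `process_broker_change` are hypothetical names, the dictionaries stand in for the ZooKeeper read and the controller context, and the callbacks stand in for onBrokerStartUp and onBrokerFailure. Removing dead brokers from the cache before invoking the failure callback is an assumption of this sketch.

```python
def classify_brokers(current, cached):
    """Split brokers into new, dead and bounced.

    current: broker id -> generation, as read from /brokers/ids/
    cached:  broker id -> generation, as held in controller context
    """
    new = sorted(current.keys() - cached.keys())
    dead = sorted(cached.keys() - current.keys())
    bounced = sorted(
        broker for broker in current.keys() & cached.keys()
        if current[broker] > cached[broker]
    )
    return new, dead, bounced


def process_broker_change(current, cached, on_broker_startup, on_broker_failure):
    """Apply the handling order described above, mutating `cached` in place."""
    new, dead, bounced = classify_brokers(current, cached)

    if new:
        # Record the generation first, then initialize the new brokers.
        for broker in new:
            cached[broker] = current[broker]
        on_broker_startup(new)

    if dead:
        # Assumption: dead brokers are dropped from the cache before the callback.
        for broker in dead:
            del cached[broker]
        on_broker_failure(dead)

    if bounced:
        # Treat a bounce as a failure followed by a startup, bumping the
        # cached generation in between.
        on_broker_failure(bounced)
        for broker in bounced:
            cached[broker] = current[broker]
        on_broker_startup(bounced)
```

Because bounced brokers are detected by comparing generations rather than by observing the absence of the znode, a broker that re-registers before the event is processed is still handled as failure plus startup.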


Compatibility, Deprecation, and Migration Plan

...