...
The AdminClient will gain a new method:
electPreferredLeaders(Collection<TopicPartition> partitions)
A new request/response pair will be added to the network protocol:
ElectPreferredLeadersRequest and ElectPreferredLeadersResponse
Proposed Changes
...
- Perform the election by calling AdminClient.electPreferredLeaders() on an AdminClient instance bootstrapped via the given --bootstrap-server.
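As a minimal sketch of the bootstrap step, the tool might turn its `--bootstrap-server` argument into AdminClient configuration as below. It assumes simple `--flag value` parsing and sets only the standard `bootstrap.servers` client setting; the class and method names (`ToolArgs`, `adminConfig`) are hypothetical, and a real implementation would reuse the tool's existing option parser and keep the `--zookeeper` path available.

```java
import java.util.Properties;

// Hypothetical helper: builds AdminClient properties from the tool's command-line arguments.
final class ToolArgs {
    private ToolArgs() {}

    /**
     * Returns the properties used to bootstrap the AdminClient, or throws if
     * --bootstrap-server was not supplied with a value.
     */
    static Properties adminConfig(String[] args) {
        for (int i = 0; i < args.length; i++) {
            if ("--bootstrap-server".equals(args[i])) {
                if (i + 1 >= args.length) {
                    throw new IllegalArgumentException("--bootstrap-server requires a value");
                }
                Properties props = new Properties();
                // "bootstrap.servers" is the standard client setting for the initial broker list.
                props.setProperty("bootstrap.servers", args[i + 1]);
                return props;
            }
        }
        throw new IllegalArgumentException("--bootstrap-server is required");
    }
}
```

The resulting `Properties` could then be passed to `AdminClient.create(...)`, after which the tool would call `electPreferredLeaders()` on that instance.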
...
The --help output for the tool will be updated to note that the command is not necessary if the broker is configured with auto.leader.rebalance.enable=true.
AdminClient:
...
electPreferredLeaders()
The following methods will be added to AdminClient:
```java
/**
 * Elect the preferred replica of the given {@code partitions} as leader, or
 * elect the preferred replica for all partitions as leader if the argument to {@code partitions} is null.
 *
 * This operation is supported by brokers with version 1.0 or higher.
 */
ElectPreferredLeadersResult electPreferredLeaders(Collection<TopicPartition> partitions,
                                                  ElectPreferredLeadersOptions options);

ElectPreferredLeadersResult electPreferredLeaders(Collection<TopicPartition> partitions);
```
Where
```java
class ElectPreferredLeadersOptions {
    public ElectPreferredLeadersOptions() { ... }

    /**
     * The request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
     * AdminClient should be used.
     */
    public Integer timeoutMs() { ... }

    /**
     * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
     * AdminClient should be used.
     */
    public ElectPreferredLeadersOptions timeoutMs(Integer timeoutMs) { ... }
}

class ElectPreferredLeadersResult {
    // package access constructor

    Map<TopicPartition, KafkaFuture<Void>> values() { ... }

    KafkaFuture<Void> all() { ... }
}
```
A call to electPreferredLeaders() will send an ElectPreferredLeadersRequest to the controller broker.
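To illustrate the intended relationship between `values()` and `all()` on `ElectPreferredLeadersResult`, here is a minimal, self-contained sketch. It uses `java.util.concurrent.CompletableFuture` as a stand-in for `KafkaFuture` and a plain `"topic-partition"` string key instead of `TopicPartition`, so it compiles without the Kafka client library; the class name `ResultSketch` is hypothetical. It assumes that `all()` succeeds only when every per-partition future succeeds and fails if any partition fails.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for ElectPreferredLeadersResult, using CompletableFuture instead of
// KafkaFuture and "topic-partition" strings instead of TopicPartition.
final class ResultSketch {
    private final Map<String, CompletableFuture<Void>> futures;

    ResultSketch(Map<String, CompletableFuture<Void>> futures) {
        this.futures = Collections.unmodifiableMap(new HashMap<>(futures));
    }

    /** One future per requested partition, completed when that partition's election outcome is known. */
    Map<String, CompletableFuture<Void>> values() {
        return futures;
    }

    /** Completes normally only if every per-partition future completes normally. */
    CompletableFuture<Void> all() {
        return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture<?>[0]));
    }
}
```

In the real client, `KafkaFuture.allOf` would presumably play the role that `CompletableFuture.allOf` plays here.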
NetworkProtocol:
...
ElectPreferredLeadersRequest and
...
ElectPreferredLeadersResponse
```
ElectPreferredLeadersRequest => [topic_partitions]
  topic_partitions => topic [partition_id]
    topic => STRING
    partition_id => INT32
```
...
Note: It is not an error if there is a duplicate (topic, partition)-pair in the request.
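The request schema can be illustrated by a small encoder that groups the requested partitions by topic and writes them with the classic (non-flexible) Kafka protocol primitives: an array is an INT32 element count followed by its elements, and a STRING is an INT16 length followed by UTF-8 bytes. This is a sketch only: it covers the request body (not the request header), does not handle the null "all partitions" case (whose encoding is not specified here), and the names `TpRef`, `RequestBodySketch`, and `encode` are hypothetical. Duplicate (topic, partition) pairs are written as given, since they are not an error.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical minimal (topic, partition) pair, standing in for TopicPartition.
final class TpRef {
    final String topic;
    final int partition;

    TpRef(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

final class RequestBodySketch {
    private RequestBodySketch() {}

    /** Encodes [topic_partitions] where topic_partitions => topic [partition_id]. */
    static ByteBuffer encode(List<TpRef> partitions) {
        // Group partition ids by topic, keeping first-seen topic order; duplicates are kept.
        Map<String, List<Integer>> byTopic = new LinkedHashMap<>();
        for (TpRef tp : partitions) {
            byTopic.computeIfAbsent(tp.topic, t -> new ArrayList<>()).add(tp.partition);
        }

        // Exact size: INT32 array count, then per topic a STRING, an INT32 count and INT32 ids.
        int size = 4;
        for (Map.Entry<String, List<Integer>> e : byTopic.entrySet()) {
            size += 2 + e.getKey().getBytes(StandardCharsets.UTF_8).length;
            size += 4 + 4 * e.getValue().size();
        }

        ByteBuffer buf = ByteBuffer.allocate(size); // big-endian by default
        buf.putInt(byTopic.size());
        for (Map.Entry<String, List<Integer>> e : byTopic.entrySet()) {
            byte[] topic = e.getKey().getBytes(StandardCharsets.UTF_8);
            buf.putShort((short) topic.length);
            buf.put(topic);
            buf.putInt(e.getValue().size());
            for (int id : e.getValue()) {
                buf.putInt(id);
            }
        }
        buf.flip();
        return buf;
    }
}
```

Grouping by topic mirrors the schema's nesting, so each topic name is written once no matter how many of its partitions are requested.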
Note that an ElectPreferredLeadersRequest must be sent to the controller of the cluster.
```
ElectPreferredLeadersResponse => throttle_time_ms [replica_election_result]
  throttle_time_ms => INT32
  replica_election_result => topic [partition_result]
    topic => STRING
    partition_result => partition_id error_code error_message
      partition_id => INT32
      error_code => INT16
      error_message => NULLABLE_STRING
```
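A minimal sketch of how a client might turn decoded `partition_result` entries into per-partition outcomes: an `error_code` of 0 (Kafka's usual "no error" code) is treated as success, and any other code fails that partition's future with the accompanying `error_message`. Decoding from bytes is omitted; the `PartitionResult` class, the `ResponseSketch.toFutures` method, and the use of `CompletableFuture` in place of `KafkaFuture` are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical decoded form of one partition_result entry.
final class PartitionResult {
    final String topic;
    final int partitionId;
    final short errorCode;
    final String errorMessage; // NULLABLE_STRING, so it may be null

    PartitionResult(String topic, int partitionId, short errorCode, String errorMessage) {
        this.topic = topic;
        this.partitionId = partitionId;
        this.errorCode = errorCode;
        this.errorMessage = errorMessage;
    }
}

final class ResponseSketch {
    private ResponseSketch() {}

    /** Maps "topic-partition" keys to futures that succeed or fail according to error_code. */
    static Map<String, CompletableFuture<Void>> toFutures(List<PartitionResult> results) {
        Map<String, CompletableFuture<Void>> futures = new HashMap<>();
        for (PartitionResult r : results) {
            CompletableFuture<Void> f = new CompletableFuture<>();
            if (r.errorCode == 0) {
                f.complete(null); // no error: the election succeeded for this partition
            } else {
                // Prefer the server-supplied message; fall back to the numeric code.
                String reason = r.errorMessage != null ? r.errorMessage : "error code " + r.errorCode;
                f.completeExceptionally(new RuntimeException(reason));
            }
            futures.put(r.topic + "-" + r.partitionId, f);
        }
        return futures;
    }
}
```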
...
Broker-side election algorithm
The broker-side handling of ElectPreferredLeadersRequest will differ somewhat from the current handling of preferred replica elections:
- On receipt of an ElectPreferredLeadersRequest, the controller will atomically check-and-set a flag (to prevent concurrent elections), then enqueue a PreferredReplicaLeaderElection with the ControllerManager.
- The controller will then await completion of the PreferredReplicaLeaderElection, with a timeout.
- When processing the PreferredReplicaLeaderElection, the controller will clear the flag.
- Successful or timed-out completion of the PreferredReplicaLeaderElection will result in an ElectPreferredLeadersResponse being returned to the client.
(The flag will also be checked-and-set when handling a change to the /admin/preferred_replica_election znode made via the existing --zookeeper-supporting code.)
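The controller-side steps above can be sketched with an `AtomicBoolean` for the check-and-set flag and a single-threaded executor standing in for the controller's event queue. Everything here is an assumption for illustration: `ElectionSketch` and `handleRequest` are hypothetical names, the returned strings stand in for an `ElectPreferredLeadersResponse`, and rejecting a request that arrives while another election holds the flag is one possible reading of "to prevent concurrent elections".

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of the controller-side handling of an election request.
final class ElectionSketch {
    // Set while an election is queued or running, to prevent concurrent elections.
    private final AtomicBoolean electionInProgress = new AtomicBoolean(false);
    // A single thread stands in for the controller's event queue.
    private final ExecutorService controllerQueue = Executors.newSingleThreadExecutor();

    /**
     * Returns a hypothetical status standing in for the response sent to the client:
     * "ELECTION_IN_PROGRESS", "COMPLETED", "TIMED_OUT", "FAILED: ..." or "INTERRUPTED".
     */
    String handleRequest(Runnable runElection, long timeoutMs) {
        // Atomically check-and-set the flag; rejecting a concurrent request is an assumption.
        if (!electionInProgress.compareAndSet(false, true)) {
            return "ELECTION_IN_PROGRESS";
        }
        // Enqueue the election; the flag is cleared while processing it (here, once it has run).
        Future<?> done = controllerQueue.submit(() -> {
            try {
                runElection.run();
            } finally {
                electionInProgress.set(false);
            }
        });
        // Await completion with a timeout; both success and timeout produce a reply to the client.
        try {
            done.get(timeoutMs, TimeUnit.MILLISECONDS);
            return "COMPLETED";
        } catch (TimeoutException e) {
            return "TIMED_OUT";
        } catch (ExecutionException e) {
            return "FAILED: " + e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "INTERRUPTED";
        }
    }

    void shutdown() {
        controllerQueue.shutdownNow();
    }
}
```

`compareAndSet` gives the atomic check-and-set without a lock, so the znode-triggered path could guard itself with the same call on the same flag.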
This change means that the ElectPreferredLeadersResponse is sent when the election is actually complete, rather than when the /admin/preferred_replica_election znode has merely been updated. Thus, if the election fails, the per-partition error_code (and error_message) in the ElectPreferredLeadersResponse will indicate the reason.
...