Comment: electPreferredLeaders

...

The AdminClient will gain a new method:

  • electPreferredLeaders(Collection<TopicPartition> partitions)

A new network protocol will be added:

  • ElectPreferredLeadersRequest and ElectPreferredLeadersResponse

Proposed Changes


...

  1. Perform the election by calling AdminClient.electPreferredLeaders() on an AdminClient instance bootstrapped via the given --bootstrap-server.

...

The --help output for the tool will be updated to note that the command is not necessary if the broker is configured with auto.leader.rebalance.enable=true.

AdminClient:

...

electPreferredLeaders()

The following methods will be added to AdminClient:

Code Block
/**
 * Elect the preferred replica of the given {@code partitions} as leader, or
 * elect the preferred replica for all partitions as leader if the argument to {@code partitions} is null.
 *
 * This operation is supported by brokers with version 1.0 or higher.
 */
ElectPreferredLeadersResult electPreferredLeaders(Collection<TopicPartition> partitions, ElectPreferredLeadersOptions options)
ElectPreferredLeadersResult electPreferredLeaders(Collection<TopicPartition> partitions)

Where

Code Block
class ElectPreferredLeadersOptions {
    public ElectPreferredLeadersOptions() { ... }
    /**
     * The request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
     * AdminClient should be used.
     */
    public Integer timeoutMs() { ... }
    /**
     * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
     * AdminClient should be used.
     */
    public ElectPreferredLeadersOptions timeoutMs(Integer timeoutMs) { ... }
}
class ElectPreferredLeadersResult {
    // package access constructor
    Map<TopicPartition, KafkaFuture<Void>> values() { ... }
    KafkaFuture<Void> all() { ... }
}

A call to electPreferredLeaders() will send an ElectPreferredLeadersRequest to the controller broker.
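The relationship between values() and all() on the result class can be sketched as follows. This is only an illustration under stated assumptions, not the Kafka implementation: ElectionResultSketch is a hypothetical stand-in, java.util.concurrent.CompletableFuture stands in for KafkaFuture, and a (topic, partition) Map.Entry stands in for TopicPartition. The topic name and the failure message are made up.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the proposed result class; not the Kafka implementation.
class ElectionResultSketch {
    // Key is (topic, partition); CompletableFuture stands in for KafkaFuture.
    private final Map<Map.Entry<String, Integer>, CompletableFuture<Void>> futures;

    ElectionResultSketch(Map<Map.Entry<String, Integer>, CompletableFuture<Void>> futures) {
        this.futures = futures;
    }

    // One future per requested partition.
    Map<Map.Entry<String, Integer>, CompletableFuture<Void>> values() {
        return futures;
    }

    // Completes normally only if every per-partition future does;
    // completes exceptionally if any of them fails.
    CompletableFuture<Void> all() {
        return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0]));
    }

    // Builds a result with one successful and one failed partition (illustrative data).
    static ElectionResultSketch demo() {
        Map<Map.Entry<String, Integer>, CompletableFuture<Void>> m = new LinkedHashMap<>();
        m.put(Map.entry("orders", 0), CompletableFuture.completedFuture(null));
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("preferred replica not available"));
        m.put(Map.entry("orders", 1), failed);
        return new ElectionResultSketch(m);
    }

    public static void main(String[] args) {
        ElectionResultSketch result = demo();
        result.values().forEach((tp, f) -> System.out.println(
                tp.getKey() + "-" + tp.getValue() + " failed=" + f.isCompletedExceptionally()));
        System.out.println("all failed=" + result.all().isCompletedExceptionally());
    }
}
```

A caller that only needs an overall outcome would wait on all(), while a caller that wants to report per-partition failures would iterate values().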

NetworkProtocol:

...

ElectPreferredLeadersRequest and

...

ElectPreferredLeadersResponse

No Format
ElectPreferredLeadersRequest => [topic_partitions]
  topic_partitions => topic [partition_id]
    topic => STRING
    partition_id => INT32

...

Note: It is not an error if there is a duplicate (topic, partition)-pair in the request.
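As an illustration of the request shape, the sketch below groups a flat collection of (topic, partition) pairs into the topic => [partition_id] structure. It is a sketch under assumptions: RequestGroupingSketch and group() are hypothetical names, a Map.Entry stands in for TopicPartition, and collapsing duplicates on the client side is a choice made here for illustration (the proposal only says duplicates are not an error).

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical helper; not part of the proposed API.
class RequestGroupingSketch {
    // Groups (topic, partition) pairs into topic => [partition_id],
    // the shape of the topic_partitions array in the request.
    static Map<String, List<Integer>> group(Collection<Map.Entry<String, Integer>> partitions) {
        Map<String, Set<Integer>> byTopic = new TreeMap<>();
        for (Map.Entry<String, Integer> tp : partitions) {
            // A Set silently absorbs a duplicate (topic, partition) pair.
            byTopic.computeIfAbsent(tp.getKey(), t -> new TreeSet<>()).add(tp.getValue());
        }
        Map<String, List<Integer>> grouped = new TreeMap<>();
        byTopic.forEach((topic, ids) -> grouped.put(topic, new ArrayList<>(ids)));
        return grouped;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> input = List.of(
                Map.entry("a", 1), Map.entry("b", 0), Map.entry("a", 0), Map.entry("a", 1));
        System.out.println(group(input)); // prints {a=[0, 1], b=[0]}
    }
}
```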

Note that an ElectPreferredLeadersRequest must be sent to the controller of the cluster.

No Format
ElectPreferredLeadersResponse => throttle_time_ms [replica_election_result]
  throttle_time_ms => INT32
  replica_election_result => topic [partition_result]
    topic => STRING
    partition_result => partition_id error_code error_message
      partition_id => INT32
      error_code => INT16
      error_message => NULLABLE_STRING
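A client consuming this response needs to flatten the nested topic/partition structure and handle the nullable error_message. The following is a minimal sketch of that step, assuming error_code 0 means success (the usual Kafka convention); ResponseSketch, PartitionResult and failures() are hypothetical names, and the sample error code and message are arbitrary.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of replica_election_result; not the Kafka classes.
class ResponseSketch {
    static final class PartitionResult {
        final int partitionId;
        final short errorCode;
        final String errorMessage; // NULLABLE_STRING in the schema

        PartitionResult(int partitionId, short errorCode, String errorMessage) {
            this.partitionId = partitionId;
            this.errorCode = errorCode;
            this.errorMessage = errorMessage;
        }
    }

    // Flattens topic => [partition_result] into "topic-partition" => description,
    // keeping only the partitions whose error_code is non-zero.
    static Map<String, String> failures(Map<String, List<PartitionResult>> results) {
        Map<String, String> failed = new TreeMap<>();
        results.forEach((topic, partitionResults) -> {
            for (PartitionResult pr : partitionResults) {
                if (pr.errorCode != 0) {
                    String description = pr.errorMessage != null
                            ? pr.errorMessage
                            : "error code " + pr.errorCode;
                    failed.put(topic + "-" + pr.partitionId, description);
                }
            }
        });
        return failed;
    }

    // Sample response body: one success, one failure with a message, one without.
    static Map<String, List<PartitionResult>> sample() {
        return Map.of("orders", List.of(
                new PartitionResult(0, (short) 0, null),
                new PartitionResult(1, (short) 1, "preferred replica not available"),
                new PartitionResult(2, (short) 1, null)));
    }

    public static void main(String[] args) {
        System.out.println(failures(sample()));
    }
}
```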

...

Broker-side election algorithm

The broker-side handling of ElectPreferredLeadersRequest will differ somewhat from the current handling:

  1. On receipt of an ElectPreferredLeadersRequest, the controller will atomically check-and-set a flag (to prevent concurrent elections), then enqueue a PreferredReplicaLeaderElection with the ControllerManager.
  2. The controller will then await completion of the PreferredReplicaLeaderElection, with a timeout.
  3. When processing the PreferredReplicaLeaderElection, the controller will clear the flag.
  4. Successful or timed-out completion of the PreferredReplicaLeaderElection will result in an ElectPreferredLeadersResponse being returned to the client.

(The flag will also be checked-and-set when handling a change of the /admin/preferred_replica_election znode, via the existing --zookeeper-supporting code)
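The check-and-set, enqueue, and await-with-timeout steps above can be sketched roughly as follows. This is not the controller code: ElectionGateSketch and handleRequest() are hypothetical names, a single-threaded executor stands in for the controller's event queue, and two behaviours are assumptions made for the sketch because the text does not specify them: a request arriving while the flag is set is rejected, and the flag is cleared once the queued election has finished processing.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the broker-side flow; not the Kafka controller.
class ElectionGateSketch {
    private final AtomicBoolean electionInProgress = new AtomicBoolean(false);
    // Stand-in for the controller's single-threaded event queue.
    private final ExecutorService controllerQueue = Executors.newSingleThreadExecutor();

    String handleRequest(Runnable election, long timeoutMs) {
        // Step 1: atomic check-and-set, so at most one election is enqueued at a time.
        if (!electionInProgress.compareAndSet(false, true)) {
            return "election already in progress"; // assumed behaviour
        }
        Future<?> done = controllerQueue.submit(() -> {
            try {
                election.run();
            } finally {
                // Step 3: the flag is cleared as part of processing the election.
                electionInProgress.set(false);
            }
        });
        try {
            // Step 2: await completion, bounded by the timeout.
            done.get(timeoutMs, TimeUnit.MILLISECONDS);
            return "ok"; // Step 4: respond on successful completion
        } catch (TimeoutException e) {
            return "timed out"; // Step 4: respond on timed-out completion
        } catch (ExecutionException e) {
            return "failed: " + e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    void shutdown() {
        controllerQueue.shutdownNow();
    }

    public static void main(String[] args) {
        ElectionGateSketch gate = new ElectionGateSketch();
        System.out.println(gate.handleRequest(() -> { }, 1000));
        gate.shutdown();
    }
}
```

Using compareAndSet keeps the check and the set in one atomic step, which is what prevents two requests from both observing a clear flag.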

This change means that the ElectPreferredLeadersResponse is sent when the election is actually complete, rather than when the /admin/preferred_replica_election znode has merely been updated. Thus, if the election fails, the ElectPreferredLeadersResponse's error_code will provide a reason.

...