Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

3) Add optional argument --timeout to kafka-reassignment-partitions.sh. This is because kafka-reassignment-partitions.sh may need to re-send ChangeReplicaDirRequest to broker if replica hasn't already been created there. The timeout is set to 10 seconds by default.


AdminClient

The following methods and classes are added.

 

Code Block
languagejava
public interface AdminClient extends AutoCloseable {

    /**
     * Change the log directory for the specified replicas.
     *
     * Updates are not transactional so they may succeed for some resources while fail for others. The log directory for
     * a particular replica is updated atomically.
     *
     * This operation is supported by brokers with version 0.11.1.0 or higher.
     *
     * @param replicaAssignment  The replicas with their log directory absolute path
     * @param options            The options to use when changing replica dir
     * @return                   The ChangeReplicaDirResult
     */
    public abstract ChangeReplicaDirResult changeReplicaDir(Map<TopicPartitionReplica, String> replicaAssignment, ChangeReplicaDirOptions options);
 
    /**
     * Query the log directory information for the specified log directories on the given brokers.
     * All log directories on a broker are queried if an empty collection of log directories is specified for this broker
     *
     * This operation is supported by brokers with version 0.11.1.0 or higher.
     *
     * @param logDirsByBroker     A list of log dirs per broker
     * @param options             The options to use when querying log dir info
     * @return                    The DescribeDirsResult
     */
    public abstract DescribeDirsResult describeDirs(Map<Integer, Collection<String>> logDirsByBroker, DescribeDirsOptions options);
 
    /**
     * Query the replica directory information for the specified replicas.
     *
     * This operation is supported by brokers with version 0.11.1.0 or higher.
     *
     * @param replicas      The replicas to query
     * @param options       The options to use when querying replica dir info
     * @return              The DescribeReplicaDirResult
     */
    public abstract DescribeReplicaDirResult describeReplicaDir(Collection<TopicPartitionReplica> replicas, DescribeReplicaDirOptions options);
}


/**
  * Options for the changeReplicaDir call.
  */
class ChangeReplicaDirOptions {
    private Integer timeoutMs = null;
    public ChangeReplicaDirOptions timeoutMs(Integer timeoutMs);
    public Integer timeoutMs();
}
 
/**
  * The result of the changeReplicaDir call.
  */
class ChangeReplicaDirResult {
    /**
     * Return a map from replica to futures, which can be used to check the status of individual replica movement.
     */
    public Map<TopicPartitionReplica, KafkaFuture<Void>> values();

    /**
     * Return a future which succeeds if all the replica movement have succeeded
     */
    public KafkaFuture<Void> all();
}
 
/**
  * Options for the describeDirs call.
  */
class DescribeDirsOptions {
    private Integer timeoutMs = null;
    public DescribeDirsOptions timeoutMs(Integer timeoutMs);
    public Integer timeoutMs();
}
 
/**
  * The result of the describeDirs call.
  */
class DescribeDirsResult {
    /**
     * Return a map from brokerId to futures which can be used to check the information of partitions on each individual broker
     */
    public Map<Integer, KafkaFuture<Map<String, LogDirInfo>>> values();

    /**
     * Return a future which succeeds only if all the brokers have responded without error
     */
    public KafkaFuture<Map<Integer, Map<String, LogDirInfo>>> all();
}
 
/**
  * Description of a log directory
  */
class LogDirInfo {
    public final Errors error;
    public final Map<TopicPartition, ReplicaInfo> replicaInfos;
}
 
/**
  * Description of a replica
  */
public class ReplicaInfo {
    public final long size;
    public final long logEndOffset;
    public final boolean isTemporary;
}
 
/**
  * Options for the describeReplicaDir call.
  */
class DescribeReplicaDirOptions {
    private Integer timeoutMs = null;
    public DescribeReplicaDirOptions timeoutMs(Integer timeoutMs);
    public Integer timeoutMs();
}
 
/**
  * The result of the describeReplicaDir call.
  */
class DescribeReplicaDirResult {
    /**
     * Return a map from replica to futures which can be used to check the log directory information of individual replicas
     */
    public Map<TopicPartitionReplica, KafkaFuture<ReplicaDirInfo>> values();

    /**
     * Return a future which succeeds if log directory information of all replicas are available
     */
    public KafkaFuture<Map<TopicPartitionReplica, ReplicaDirInfo>> all();
}

/**
  * Log directory information of a given replica and its intra-broker movement progress
  */
class ReplicaDirInfo {
    public String currentReplicaDir;
    public String temporaryReplicaDir;
    public long temporaryReplicaOffsetLag;
}

...