Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

3) Add optional argument --timeout to kafka-reassignment-partitions.sh. This is because kafka-reassignment-partitions.sh may need to re-send ChangeReplicaDirRequest to broker if replica hasn't already been created there. The timeout is set to 10 seconds by default.


AdminClient


 

Code Block
languagejava
public interface AdminClient extends AutoCloseable {

    /**
     * Change the log directory for the specified replicas.
     *
     * Updates are not transactional so they may succeed for some resources while fail for others. The log directory for
     * a particular replica is updated atomically.
     *
     * This operation is supported by brokers with version 0.11.1.0 or higher.
     *
     * @param replicaAssignment  The replicas with their log directory absolute path
     * @param options            The options to use when changing replica dir
     * @return                   The ChangeReplicaDirResult
     */
    public abstract ChangeReplicaDirResult changeReplicaDir(Map<TopicPartitionReplica, String> replicaAssignment, ChangeReplicaDirOptions options);
 
    /**
     * Query the log directory information for the specified log directories.
     * All log directories on the broker are queries of the collection is empty.
     *
     * This operation is supported by brokers with version 0.11.1.0 or higher.
     *
     * @param logDirsByBroker     A list of log dirs per broker
     * @param options             The options to use when querying log dir info
     * @return                    The DescribeDirsResult
     */
    public abstract DescribeDirsResult describeDirs(Map<Integer, Collection<String>> logDirsByBroker, DescribeDirsOptions options);
 
    /**
     * Query the replica dir information for the specified replicas.
     *
     * This operation is supported by brokers with version 0.11.1.0 or higher.
     *
     * @param replicas      The replicas to query
     * @param options       The options to use when querying replica dir info
     * @return              The DescribeReplicaDirResult
     */
    public abstract DescribeReplicaDirResult describeReplicaDir(Collection<TopicPartitionReplica> replicas, DescribeReplicaDirOptions options);
}


/**
  * Options for the changeReplicaDir call.
  */
class ChangeReplicaDirOptions {
    private Integer timeoutMs = null;
    public ChangeReplicaDirOptions timeoutMs(Integer timeoutMs);
    public Integer timeoutMs();
}
 
/**
  * The result of the changeReplicaDir call.
  */
class ChangeReplicaDirResult {
    /**
     * Return a map from replica to futures, which can be used to check the status of individual replica movement.
     */
    public Map<TopicPartitionReplica, KafkaFuture<Void>> values();

    /**
     * Return a future which succeeds if all the replica movement have succeeded
     */
    public KafkaFuture<Void> all();
}
 
/**
  * Options for the describeDirs call.
  */
class DescribeDirsOptions {
    private Integer timeoutMs = null;
    public DescribeDirsOptions timeoutMs(Integer timeoutMs);
    public Integer timeoutMs();
}
 
/**
  * The result of the describeDirs call.
  */
class DescribeDirsResult {
    /**
     * Return a map from brokerId to futures which can be used to check the logDirInfo on each individual broker
     */
    public Map<Integer, KafkaFuture<Map<String, LogDirInfo>>> values();

    /**
     * Return a future which succeeds only if all the brokers have responded without error
     */
    public KafkaFuture<Map<Integer, Map<String, LogDirInfo>>> all();
}
 
/**
  * Description of a log directory
  */
class LogDirInfo {
    public final Errors error;
    public final Map<TopicPartition, ReplicaInfo> replicaInfos;
}
 
/**
  * Description of a replica
  */
public class ReplicaInfo {
    public final long size;
    public final long logEndOffset;
    public final boolean isTemporary;
}
 
/**
  * Options for the describeReplicaDir call.
  */
class DescribeReplicaDirOptions {
    private Integer timeoutMs = null;
    public DescribeReplicaDirOptions timeoutMs(Integer timeoutMs);
    public Integer timeoutMs();
}
 
/**
  * The result of the describeReplicaDir call.
  */
class DescribeReplicaDirResult {
    /**
     * Return a map from replica to futures which can be used to check the status of individual replicas
     */
    public Map<TopicPartitionReplica, KafkaFuture<ReplicaDirInfo>> values();

    /**
     * Return a future which succeeds if ReplicaDirInfo of all replicas are available
     */
    public KafkaFuture<Map<TopicPartitionReplica, ReplicaDirInfo>> all();
}

/**
  * Log directory of a given replica and its intra-broker movement progress
  */
class ReplicaDirInfo {
    public String currentReplicaDir;
    public String temporaryReplicaDir;
    public long temporaryReplicaOffsetLag;
}



Changes in Operational Procedures

...