THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
3) Add an optional --timeout argument to kafka-reassign-partitions.sh. This is needed because kafka-reassign-partitions.sh may have to re-send
ChangeReplicaDirRequest to a broker if the replica has not yet been created there. The timeout defaults to 10 seconds.
AdminClient
Code Block | ||
---|---|---|
| ||
public interface AdminClient extends AutoCloseable {
/**
* Change the log directory for the specified replicas.
*
* Updates are not transactional so they may succeed for some resources while fail for others. The log directory for
* a particular replica is updated atomically.
*
* This operation is supported by brokers with version 0.11.1.0 or higher.
*
* @param replicaAssignment The replicas with their log directory absolute path
* @param options The options to use when changing replica dir
* @return The ChangeReplicaDirResult
*/
public abstract ChangeReplicaDirResult changeReplicaDir(Map<TopicPartitionReplica, String> replicaAssignment, ChangeReplicaDirOptions options);
/**
* Query the log directory information for the specified log directories.
* All log directories on the broker are queries of the collection is empty.
*
* This operation is supported by brokers with version 0.11.1.0 or higher.
*
* @param logDirsByBroker A list of log dirs per broker
* @param options The options to use when querying log dir info
* @return The DescribeDirsResult
*/
public abstract DescribeDirsResult describeDirs(Map<Integer, Collection<String>> logDirsByBroker, DescribeDirsOptions options);
/**
* Query the replica dir information for the specified replicas.
*
* This operation is supported by brokers with version 0.11.1.0 or higher.
*
* @param replicas The replicas to query
* @param options The options to use when querying replica dir info
* @return The DescribeReplicaDirResult
*/
public abstract DescribeReplicaDirResult describeReplicaDir(Collection<TopicPartitionReplica> replicas, DescribeReplicaDirOptions options);
}
/**
* Options for the changeReplicaDir call.
*
* Carries a single optional setting: the timeout, in milliseconds.
*/
class ChangeReplicaDirOptions {
// Timeout in milliseconds; null until set.
// NOTE(review): presumably null means "use the client's default timeout" - confirm.
private Integer timeoutMs = null;
// Sets the timeout in milliseconds. Returns a ChangeReplicaDirOptions (presumably this instance, for chaining - confirm).
public ChangeReplicaDirOptions timeoutMs(Integer timeoutMs);
// Returns the configured timeout in milliseconds (may be null if never set).
public Integer timeoutMs();
}
/**
* The result of the changeReplicaDir call.
*/
class ChangeReplicaDirResult {
/**
* Return a map from replica to future, which can be used to check the status of each individual replica movement.
*/
public Map<TopicPartitionReplica, KafkaFuture<Void>> values();
/**
* Return a future which succeeds only if all the replica movements have succeeded.
*/
public KafkaFuture<Void> all();
}
/**
* Options for the describeDirs call.
*
* Carries a single optional setting: the timeout, in milliseconds.
*/
class DescribeDirsOptions {
// Timeout in milliseconds; null until set.
// NOTE(review): presumably null means "use the client's default timeout" - confirm.
private Integer timeoutMs = null;
// Sets the timeout in milliseconds. Returns a DescribeDirsOptions (presumably this instance, for chaining - confirm).
public DescribeDirsOptions timeoutMs(Integer timeoutMs);
// Returns the configured timeout in milliseconds (may be null if never set).
public Integer timeoutMs();
}
/**
* The result of the describeDirs call.
*/
class DescribeDirsResult {
/**
* Return a map from brokerId to a future which can be used to check the LogDirInfo of each log directory
* (keyed by log directory path) on that individual broker.
*/
public Map<Integer, KafkaFuture<Map<String, LogDirInfo>>> values();
/**
* Return a future which succeeds only if all the brokers have responded without error.
* On success it yields, per brokerId, the map from log directory path to LogDirInfo.
*/
public KafkaFuture<Map<Integer, Map<String, LogDirInfo>>> all();
}
/**
* Description of a log directory: its error state and the replicas it holds.
*/
class LogDirInfo {
// Error reported for this log directory.
// NOTE(review): presumably a "no error" value when the directory is healthy - confirm.
public final Errors error;
// Per-partition description of the replicas in this log directory.
public final Map<TopicPartition, ReplicaInfo> replicaInfos;
}
/**
* Description of a replica
*/
public class ReplicaInfo {
// Size of the replica. NOTE(review): presumably in bytes - confirm.
public final long size;
// Log end offset of the replica.
public final long logEndOffset;
// Whether this replica is temporary.
// NOTE(review): presumably true for the copy created while a replica is being moved between log directories - confirm.
public final boolean isTemporary;
}
/**
* Options for the describeReplicaDir call.
*
* Carries a single optional setting: the timeout, in milliseconds.
*/
class DescribeReplicaDirOptions {
// Timeout in milliseconds; null until set.
// NOTE(review): presumably null means "use the client's default timeout" - confirm.
private Integer timeoutMs = null;
// Sets the timeout in milliseconds. Returns a DescribeReplicaDirOptions (presumably this instance, for chaining - confirm).
public DescribeReplicaDirOptions timeoutMs(Integer timeoutMs);
// Returns the configured timeout in milliseconds (may be null if never set).
public Integer timeoutMs();
}
/**
* The result of the describeReplicaDir call.
*/
class DescribeReplicaDirResult {
/**
* Return a map from replica to a future which can be used to check the ReplicaDirInfo of each individual replica.
*/
public Map<TopicPartitionReplica, KafkaFuture<ReplicaDirInfo>> values();
/**
* Return a future which succeeds only if the ReplicaDirInfo of every replica is available.
*/
public KafkaFuture<Map<TopicPartitionReplica, ReplicaDirInfo>> all();
}
/**
* Log directory of a given replica and its intra-broker movement progress
*/
class ReplicaDirInfo {
// Log directory that currently holds the replica.
public String currentReplicaDir;
// Log directory of the temporary replica.
// NOTE(review): presumably the destination of an in-progress intra-broker move, and unset when no move is in progress - confirm.
public String temporaryReplicaDir;
// Offset lag of the temporary replica.
// NOTE(review): presumably measured relative to the current replica, as the movement-progress indicator - confirm.
public long temporaryReplicaOffsetLag;
}
|
Changes in Operational Procedures
...