Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
/**
 * Change the reassignments for one or more partitions.
 * Providing an empty Optional (e.g. via {@link Optional#empty()}) will <b>cancel</b> the reassignment for the associated partition.
 *
 * @param reassignments   The reassignments to add, modify, or remove, keyed by partition.
 *                        An empty Optional value cancels the reassignment for that partition.
 * @param options         The options to use.
 * @return                The result.
 */
public AlterPartitionReassignmentsResult alterPartitionReassignments(
         Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments,
         AlterPartitionReassignmentsOptions options);

/**
 * A new partition reassignment, which can be applied via {@link AdminClient#alterPartitionReassignments(Map)}.
 */
public class NewPartitionReassignment {
    // NOTE(review): presumably the IDs of the brokers that should hold the partition's
    // replicas once the reassignment completes -- confirm.
    private final List<Integer> targetBrokers;
    ...
}

/**
 * The per-partition results of a request to alter partition reassignments.
 *
 * NOTE(review): alterPartitionReassignments is declared to return
 * {@code AlterPartitionReassignmentsResult}; this class is spelled
 * {@code AlterPartitionAssignmentsResult} -- confirm which name is intended.
 */
class AlterPartitionAssignmentsResult {
  /**
   * Maps partitions to the results for each partition (success / failure).
   */
  Map<TopicPartition, KafkaFuture<Void>> values();

  /**
   * Throws an exception if any reassignment was rejected.
   */
  Future<Void> all();
}

/**
 * Options for a request to alter partition reassignments.
 */
class AlterPartitionAssignmentsOptions extends AbstractOptions<AlterPartitionAssignmentsOptions> {
  // contains timeoutMs
}

/**
 * List the current partition reassignments for the given partitions.
 *
 * @param options        The options to use.
 * @param partitions     The partitions to show reassignments for. Must be non-null.
 * @return               The result.
 */
ListPartitionReassignmentsResult listPartitionReassignments(
      ListPartitionReassignmentsOptions options,
      Set<TopicPartition> partitions);


// NOTE(review): this overload and the (options, Set<TopicPartition>) overload have the
// same erasure -- (ListPartitionReassignmentsOptions, Set) -- so if both are declared in
// the same class the compiler will reject them as a name clash. One of them needs a
// different name or parameter shape.
/**
 * List all of the current partition reassignments for the given topics.
 *
 * @param options        The options to use.
 * @param topics         The topics to show reassignments for. Must be non-null.
 * @return               The result.
 */
ListPartitionReassignmentsResult listPartitionReassignments(
      ListPartitionReassignmentsOptions options,
      Set<String> topics);


/**
 * List all of the current partition reassignments.
 *
 * @param options        The options to use.
 * @return               The result.
 */
ListPartitionReassignmentsResult listPartitionReassignments(
      ListPartitionReassignmentsOptions options);

/**
 * Options for a request to list partition reassignments.
 */
class ListPartitionReassignmentsOptions extends AbstractOptions<ListPartitionReassignmentsOptions> {
  // contains timeoutMs
}

/**
 * The result of a request to list partition reassignments.
 */
class ListPartitionReassignmentsResult {
  // Future for the listed reassignments, keyed by partition.
  private final KafkaFuture<Map<TopicPartition, PartitionReassignment>> reassignments;
}

/**
 * A partition reassignment, which has been listed via {@link AdminClient#listPartitionReassignments()}.
 * Describes the partition's current replicas together with the replicas being added and removed.
 */
public class PartitionReassignment {
    /**
     * The brokers which this partition currently resides on.
     */
    private final List<Integer> replicas;

    /**
     * The brokers that this partition is being added to as part of a reassignment.
     */
    private final List<Integer> addingReplicas;

    /**
     * The brokers that this partition is being removed from as part of a reassignment.
     */
    private final List<Integer> removingReplicas;
}

...

  • REQUEST_TIMED_OUT: if the request timed out.
  • NOT_CONTROLLER: if the node we sent the request to was not the controller.
  • INVALID_REPLICA_ASSIGNMENT: if the specified replica assignment was not valid -- for example, if it included negative numbers, repeated numbers, or specified a broker ID that the controller was not aware of.
  • UNKNOWN_TOPIC_OR_PARTITION: if a specified topic or partition does not exist.
  • NO_REASSIGNMENT_IN_PROGRESS: if a cancellation was requested for a topic/partition which was not in the middle of a reassignment.
  • CLUSTER_AUTHORIZATION_FAILED: if we didn't have sufficient permission to perform the alteration.

...

  • REQUEST_TIMED_OUT: if the request timed out.
  • NOT_CONTROLLER: if the node we sent the request to is not the controller.
  • UNKNOWN_TOPIC_OR_PARTITION: if a specified topic or partition does not exist.
  • CLUSTER_AUTHORIZATION_FAILED: if we didn't have sufficient permissions.

...