...

With these motivations in mind, we are proposing to add a new group of configs for plugging custom behavior into the Streams Partition Assignor. This configuration will supplant the existing internal task assignor config. In this KIP, we will limit the scope of these configs to supplying a custom task assignor. However, this gives us a pattern based on which to, in future KIPs, add further configs which a user can set to plug in custom behavior.

Public Interfaces

We will introduce a new config that supplies an instance of TaskAssignor (discussed below):

Code Block
languagejava
titleStreamsConfig
/** {@code task.assignor.class} */
public static final String TASK_ASSIGNOR_CLASS_CONFIG = "task.assignor.class";
private static final String TASK_ASSIGNOR_CLASS_DOC = "A task assignor class or class name implementing the {@link TaskAssignor} interface"
        + ". Defaults to the {@link HighAvailabilityTaskAssignor} class.";

We will also remove the old internal config (which we can do without deprecation, as this was an internal config and thus by definition not part of the public API; this is also discussed in more detail in "Rejected Alternatives").

Code Block
languagejava
titleStreamsConfig
public static class InternalConfig {
        // This will be removed
        public static final String INTERNAL_TASK_ASSIGNOR_CLASS = "internal.task.assignor.class";
}

Note that the thread-level assignment will remain an un-configurable internal implementation detail of the partition assignor (see "Rejected Alternatives" for further thoughts and reasoning).

To enable users to actually plug something in by implementing TaskAssignor, we will need to move the TaskAssignor interface from o.a.k.streams.processor.internals.assignment to a public package, along with some of the supporting classes such as the assignment configs container class and ClientState, which both appear in the TaskAssignor#assign method (although those will be heavily refactored, as discussed below). All these new public APIs will be placed in a new non-internal package that mirrors their old internal package, specifically org.apache.kafka.streams.processor.assignment.


Both the input parameter and return value will be encapsulated in wrapper classes for the sake of forwards compatibility. This will let us avoid the cycle of adding, deprecating, and removing new #assign overloads if/when we want to evolve the assignor in the future, for example to pass in additional metadata or enable the assignor to output new kinds of information or instructions to the StreamsPartitionAssignor. The analogous ConsumerPartitionAssignor works similarly, returning a single GroupAssignment object that wraps the collection of individual consumer assignments for the same reason.
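
For reference, this wrapper pattern can already be seen in the existing consumer API: ConsumerPartitionAssignor#assign returns a single GroupAssignment that wraps all of the per-member Assignments, so the response can evolve by adding fields to the wrapper rather than changing the method signature. The snippet below illustrates that existing Kafka API only (the member id and topic name are made up); it is not itself part of this KIP.

Code Block
languagejava
titleExample: the wrapper pattern in ConsumerPartitionAssignor
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupAssignment;
import org.apache.kafka.common.TopicPartition;

// Each member's partitions are wrapped in an Assignment, and all of the individual
// assignments are wrapped in a single GroupAssignment return value, so evolving the
// response only requires changing the wrapper, not the #assign method signature
final Assignment memberAssignment =
    new Assignment(List.of(new TopicPartition("input-topic", 0)));
final GroupAssignment groupAssignment =
    new GroupAssignment(Map.of("consumer-1", memberAssignment));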

...

  1. To provide a clean separation of input/output by splitting the ClientState into an input-only NodeState metadata class and an output-only NodeAssignment return value class
  2. To decouple the followup rebalance request from the probing rebalance feature and give the assignor more direct control over the followup rebalance schedule, by allowing it to indicate which node(s) should trigger a rejoin and when to request the subsequent rebalance

This gives us the following two new public interfaces:


Code Block
languagejava
titleNodeAssignment
package org.apache.kafka.streams.processor.assignment;

/** A simple wrapper around UUID that abstracts a Process ID */
public class ProcessID {
    private final UUID id;

    public ProcessID(final UUID id) {
        this.id = id;
    }

    public UUID id() {
        return id;
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }

    @Override
    public boolean equals(final Object other) {
        if (other == null || getClass() != other.getClass()) {
            return false;
        }
        return id.equals(((ProcessID) other).id);
    }
}

/**
 * A simple interface for the assignor to return the desired placement of active and standby tasks on Streams client nodes 
 */
public interface NodeAssignment {
  ProcessID processId();

  void assignActive(final TaskId task);

  void assignStandby(final TaskId task);

  void removeActive(final TaskId task);

  void removeStandby(final TaskId task);

  Set<TaskId> activeAssignment();

  Set<TaskId> activeStatefulAssignment();
  
  Set<TaskId> activeStatelessAssignment();

  Set<TaskId> standbyAssignment();

  /**
   * Request a followup rebalance to be triggered by one of the consumers on this node after the
   * given interval has elapsed. The {@link StreamThread} selected for this will be chosen at random.
   * <p>
   * NOTE: A best effort will be made to enforce a rebalance according to the requested schedule,
   * but there is no guarantee that another rebalance will not occur before this time has elapsed.
   * Similarly, there is no guarantee the followup rebalance will occur, and it must be re-requested
   * if, for example, the requesting consumer crashes or drops out of the group. Such an event
   * is, however, guaranteed to trigger a new rebalance itself, at which point the assignor
   * can re-evaluate whether to request an additional rebalance or not.
   *
   * @param followupRebalanceDelay how long this node should wait before initiating a new rebalance
   */
  void requestFollowupRebalance(final Duration followupRebalanceDelay);

  /**
   * @return the actual deadline in objective time, using ms since the epoch, after which the
   * followup rebalance will be attempted. Equivalent to {@code 'now + followupRebalanceDelay'}
   */
  long followupRebalanceDeadline();
 }
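
As a rough sketch of how an assignor might fill in one of these output values: the NodeAssignment for a node is obtained from its NodeState (defined in the next block) and then populated with tasks. The task ids and the ten-minute probe interval below are made up for illustration; this is not prescribed usage.

Code Block
languagejava
titleExample: populating a NodeAssignment
import java.time.Duration;
import org.apache.kafka.streams.processor.TaskId;

// Sketch only: give this node one active and one standby task, then have it
// trigger a followup rebalance later to re-evaluate the placement
static NodeAssignment assignForNode(final NodeState nodeState) {
    final NodeAssignment assignment = nodeState.newAssignmentForNode();
    assignment.assignActive(new TaskId(0, 0));
    assignment.assignStandby(new TaskId(0, 1));
    assignment.requestFollowupRebalance(Duration.ofMinutes(10));
    return assignment;
}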

...

Code Block
languagejava
titleNodeState
package org.apache.kafka.streams.processor.assignment;

/**
 * A read-only metadata class representing the current state of each Streams client node with at least one StreamThread participating in this rebalance
 */
public interface NodeState {
  /**
   * @return a new {@link NodeAssignment} that can be used to assign tasks for this node
   */
  NodeAssignment newAssignmentForNode();

  /**
   * @return the processId of the application instance running on this node
   */
  ProcessID processId();

  /**
   * Returns the number of StreamThreads on this node, which is equal to the number of main consumers
   * and represents its overall capacity.
   * <p>
   * NOTE: this is actually the "minimum capacity" of a node, or the minimum number of assigned
   * active tasks below which the node will have been over-provisioned and unable to give every
   * available StreamThread an active task assignment
   *
   * @return the number of consumers on this node
   */
  int numStreamThreads();

  /**
   * @return the set of consumer client ids for all StreamThreads on the given node
   */
  SortedSet<String> consumers();

  /**
   * @return the set of all active tasks owned by consumers on this node since the previous rebalance
   */
  SortedSet<TaskId> previousActiveTasks();

  /**
   * @return the set of all standby tasks owned by consumers on this node since the previous rebalance
   */
  SortedSet<TaskId> previousStandbyTasks();

  /**
   * Returns the total lag across all logged stores in the task. Equal to the end offset sum if this client
   * did not have any state for this task on disk.
   *
   * @return end offset sum - offset sum
   *          Task.LATEST_OFFSET if this was previously an active running task on this client
   */
  long lagFor(final TaskId task);

  /**
   * @return the previous tasks assigned to this consumer ordered by lag, filtered for any tasks that don't exist in this assignment
   */
  SortedSet<TaskId> prevTasksByLag(final String consumer);

  /**
   * Returns a collection containing all (and only) stateful tasks in the topology by {@link TaskId},
   * mapped to its "offset lag sum". This is computed as the difference between the changelog end offset
   * and the current offset, summed across all logged state stores in the task.
   *
   * @return a map from all stateful tasks to their lag sum
   */
  Map<TaskId, Long> statefulTasksToLagSums();

  /**
   * The {@link HostInfo} of this node, if set via the
   * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_SERVER_CONFIG application.server} config
   *
   * @return the host info for this node if configured, else {@code Optional.empty()}
   */
  Optional<HostInfo> hostInfo();

  /**
   * The client tags for this client node, if any have been set via configs using the
   * {@link org.apache.kafka.streams.StreamsConfig#clientTagPrefix}
   * <p>
   * Can be used however you want, or passed in to enable the rack-aware standby task assignor.
   *
   * @return all the client tags found in this node's {@link org.apache.kafka.streams.StreamsConfig}
   */
  Map<String, String> clientTags();
 }
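
For example, an assignor could combine this metadata to find the most caught-up node for a given stateful task. This is only a sketch of how the proposed methods might be used together, not prescribed usage; the map of node states here is the one exposed on the ApplicationMetadata described below.

Code Block
languagejava
titleExample: choosing a node by task lag
import java.util.Map;
import org.apache.kafka.streams.processor.TaskId;

// Sketch: pick the node with the smallest lag for this task, i.e. the node that
// would need to restore the least state before it can begin processing. Per the
// javadocs above, lagFor returns Task.LATEST_OFFSET for a node that most recently
// ran this task as active
static ProcessID mostCaughtUpNode(final TaskId task, final Map<ProcessID, NodeState> nodeStates) {
    ProcessID mostCaughtUp = null;
    long minLag = Long.MAX_VALUE;
    for (final NodeState node : nodeStates.values()) {
        final long lag = node.lagFor(task);
        if (lag < minLag) {
            minLag = lag;
            mostCaughtUp = node.processId();
        }
    }
    return mostCaughtUp;
}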

The NodeState will be wrapped up along with the other inputs to the assignor (such as the configuration and set of tasks to be assigned, as well as various utilities that may be useful) in the final new interface, the ApplicationMetadata. The methods on the ApplicationMetadata are basically just the current inputs to the #assign method:

Code Block
languagejava
titleApplicationMetadata
package org.apache.kafka.streams.processor.assignment;

/**
 * A read-only metadata class representing the current state of this Streams application
 */
public interface ApplicationMetadata {
    /**
     * @return a map from the {@code processId} to {@link NodeState} for all Streams client nodes in this app
     */
    Map<ProcessID, NodeState> nodeStates();

    /**
     * Makes a remote call to fetch changelog topic end offsets and, if successful, uses the results to compute
     * task lags for each {@link NodeState}.
     *
     * @return whether the end offset fetch and lag computation was successful
     */
    boolean computeTaskLags();

    /**
     * @return a simple container class with the Streams configs relevant to assignment
     */
    AssignmentConfigs assignmentConfigs();

    /**
     * @return the set of all tasks in this topology which must be assigned to a node
     */
    Set<TaskId> allTasks();

    /**
     * @return the set of stateful and changelogged tasks in this topology
     */
    Set<TaskId> statefulTasks(); 
}
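
To sketch how these inputs might flow through a custom assignor, given the ApplicationMetadata passed in to #assign (the control flow below is purely illustrative, and the TaskAssignor interface itself is elided from this excerpt):

Code Block
languagejava
titleExample: consuming the ApplicationMetadata
// Sketch: compute task lags first, then walk the tasks, treating stateful and
// stateless tasks differently when choosing a node
final boolean lagsReady = applicationMetadata.computeTaskLags();
final Map<ProcessID, NodeState> nodeStates = applicationMetadata.nodeStates();
final Set<TaskId> statefulTasks = applicationMetadata.statefulTasks();

for (final TaskId task : applicationMetadata.allTasks()) {
    if (lagsReady && statefulTasks.contains(task)) {
        // stateful: prefer a caught-up node, e.g. via NodeState#lagFor
    } else {
        // stateless, or lag fetch failed: balance purely on task counts
    }
}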

We'll also move some of the existing assignment functionality into a utils class that can be called by implementors of TaskAssignor:

Code Block
languagejava
titleTaskAssignmentUtils
package org.apache.kafka.streams.processor.assignment;

/**
 * A set of utilities to help implement task assignment
 */
public final class TaskAssignmentUtils {
    /**
     * Assign standby tasks to nodes according to the default logic.
     * <p>
     * If rack-aware client tags are configured, the rack-aware standby task assignor will be used
     *
     * @param applicationMetadata the metadata for this Streams application
     * @param nodeAssignments the current assignment of tasks to nodes
     */
    public static void defaultStandbyTaskAssignment(final ApplicationMetadata applicationMetadata, final Map<ProcessID, NodeAssignment> nodeAssignments) {...}

    /**
     * Optimize the active task assignment for rack-awareness
     *
     * @param applicationMetadata the metadata for this Streams application
     * @param nodeAssignments the current assignment of tasks to nodes
     * @param tasks the set of tasks to reassign if possible. Must already be assigned to a node
     */
    public static void optimizeRackAwareActiveTasks(final ApplicationMetadata applicationMetadata, final Map<ProcessID, NodeAssignment> nodeAssignments, final SortedSet<TaskId> tasks) {...}

    /**
     * Optimize the standby task assignment for rack-awareness
     *
     * @param applicationMetadata the metadata for this Streams application
     * @param nodeAssignments the current assignment of tasks to nodes
     */
    public static void optimizeRackAwareStandbyTasks(final ApplicationMetadata applicationMetadata, final Map<ProcessID, NodeAssignment> nodeAssignments) {...}
}

TaskAssignmentUtils provides new APIs but pre-existing functionality, essentially presenting a clean way for users to take advantage of the current optimizations and algorithms that are utilized by the built-in assignors, so that users don't have to re-implement complex features such as rack-awareness. The #defaultStandbyTaskAssignment API will just delegate to the appropriate standby task assignor (either basic default or client-tag-based standby rack awareness, depending on the existence of client tags in the configuration). Similarly, the #optimizeRackAware{Active/Standby}Tasks APIs will just delegate to the new RackAwareTaskAssignor that is being added in KIP-925.
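
For instance, an assignor that only wants to customize active task placement could lay out the actives itself and then lean on these utils for everything else, as sketched below. The assignMyActiveTasks helper is hypothetical user code, and the applicationMetadata and nodeAssignments variables are assumed to come from the surrounding #assign logic:

Code Block
languagejava
titleExample: delegating to TaskAssignmentUtils
// Sketch: custom active placement, then default standby placement, then the
// built-in rack-aware optimization passes backed by KIP-925
assignMyActiveTasks(applicationMetadata, nodeAssignments);  // hypothetical user logic
TaskAssignmentUtils.defaultStandbyTaskAssignment(applicationMetadata, nodeAssignments);
TaskAssignmentUtils.optimizeRackAwareActiveTasks(
    applicationMetadata, nodeAssignments, new TreeSet<>(applicationMetadata.statefulTasks()));
TaskAssignmentUtils.optimizeRackAwareStandbyTasks(applicationMetadata, nodeAssignments);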

...