
Status

Current state: Under Discussion

Discussion thread: here 

JIRA: KAFKA-15045

...

Back in KIP-441, we introduced a new task assignor that prioritized stateful task availability over stickiness, while keeping the old assignment logic, which prioritized stickiness, around as the StickyTaskAssignor. As a safeguard, we added a backdoor internal config for the task assignor, the idea being that we could recommend it to users in case of critical bugs in the new assignor. However, over time it has become clear that there are valid use cases for preferring the old StickyTaskAssignor over the new HighAvailabilityTaskAssignor, primarily relating to the extreme task shuffling the HAAssignor can induce in less stable environments. As the overall assignment logic becomes more and more sophisticated, now is a good time to finally move the internal task assignor config to a full-fledged public API.

There are also reasonable scenarios where users may want to plug in their own task assignor. For example, a user may want to configure their own static assignment to work around an issue in the available assignors. Or they may want to implement their own assignment logic that considers metrics they've collected, or task migration cost related to their specific processors.

Finally, there are good reasons for a user to want to extend or modify the behaviour of the Kafka Streams partition assignor beyond just changing the task assignment. For example, a user may want to implement their own initialization logic that initializes resources (much the same way the Streams Partition Assignor initializes internal topics).

With these motivations in mind, we are proposing to add a new group of configurable interfaces for plugging custom behaviour into the Streams Partition Assignor. This configuration will supplant the existing internal task assignor config. In this KIP, we will limit the scope of these configs to supplying a custom task assignor. However, this gives us a pattern based on which to, in future KIPs, add further configs which a user can set to plug in custom behavior.

Public Interfaces

We will introduce a new config that supplies an instance of TaskAssignor (discussed below). In the future, additional plugins can use the same partition.assignor prefix:

Code Block
languagejava
titleStreamsConfig
/** {@code partition.assignor.task.assignor.class} */
public static final String PARTITION_ASSIGNOR_TASK_ASSIGNOR_CLASS_CONFIG = "partition.assignor.task.assignor.class";
private static final String PARTITION_ASSIGNOR_TASK_ASSIGNOR_CLASS_DOC = "A task assignor class or class name implementing the {@link TaskAssignor}"
        + " interface. Defaults to the {@link HighAvailabilityTaskAssignor} class.";
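
For example, a user could opt into a custom assignor through this config. The following is a purely illustrative sketch; MyCustomTaskAssignor and buildTopology are hypothetical user-provided pieces, with the former implementing the TaskAssignor interface described below.

Code Block
languagejava
titleExample usage (illustrative)
import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

// Hypothetical sketch: MyCustomTaskAssignor is a user-provided implementation of the
// proposed TaskAssignor interface; the topology is assumed to be built elsewhere.
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.PARTITION_ASSIGNOR_TASK_ASSIGNOR_CLASS_CONFIG, MyCustomTaskAssignor.class.getName());

final Topology topology = buildTopology(); // hypothetical helper that builds the user's topology
final KafkaStreams streams = new KafkaStreams(topology, props);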

We will also remove the old internal config (which we can do without deprecation as this was an internal config and thus by definition not part of the public API, also discussed in more detail in "Rejected Alternatives")

Code Block
languagejava
titleStreamsConfig
public static class InternalConfig {
        // This will be removed
        public static final String INTERNAL_TASK_ASSIGNOR_CLASS = "internal.task.assignor.class";
}

Note that the thread-level assignment will remain an un-configurable internal implementation detail of the partition assignor (see "Rejected Alternatives" for further thoughts and reasoning).

To enable users to actually plug something in via this config, we will need to move the TaskAssignor interface from o.a.k.streams.processor.internals.assignment to a public package, along with some of the supporting classes such as the assignment configs container class and ClientState, which both appear in the TaskAssignor#assign method (although those will be heavily refactored, as discussed below). All these new public APIs will be placed in a new non-internal package that mirrors their old internal package, specifically org.apache.kafka.streams.processor.assignment.

...

  1. To provide a clean separation of input/output by splitting the ClientState into an input-only NodeState metadata class and an output-only NodeAssignment return value class
  2. To decouple the followup rebalance request from the probing rebalance feature and give the assignor more direct control over the followup rebalance schedule, by allowing it to indicate which node(s) should trigger a rejoin and when to request the subsequent rebalance

...

Code Block
languagejava
titleNodeAssignment
package org.apache.kafka.streams.processor.assignment;

/**
 * A simple wrapper around UUID that abstracts a Process ID
 */
public class ProcessID {
    private final UUID id;

    public ProcessID(final UUID id) {
        this.id = id;
    }

    public UUID id() {
        return id;
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }

    @Override
    public boolean equals(final Object other) {
        if (other == null || getClass() != other.getClass()) {
            return false;
        }
        return id.equals(((ProcessID) other).id);
    }
}

/**
 * A simple interface for the assignor to return the desired placement of active and standby tasks on Streams client nodes
 */
public interface NodeAssignment {
  ProcessID processId();

  Set<TaskId> activeAssignment();

  Set<TaskId> activeStatefulAssignment();

  Set<TaskId> activeStatelessAssignment();

  Set<TaskId> standbyAssignment();

  /**
   * @return the actual deadline in objective time, using ms since the epoch, after which the
   * followup rebalance will be attempted. Equivalent to {@code 'now + followupRebalanceDelay'}
   */
  long followupRebalanceDeadline();
 }

and

Code Block
languagejava
titleNodeState
package org.apache.kafka.streams.processor.assignment;

/**
 * A read-only metadata class representing the current state of each Streams client node with at least one StreamThread participating in this rebalance
 */
public interface NodeState {
  /**
   * @return the processId of the application instance running on this node
   */
  ProcessID processId();

  /**
   * Returns the number of StreamThreads on this node, which is equal to the number of main consumers
   * and represents its overall capacity.
   * <p>
   * NOTE: this is actually the "minimum capacity" of a node, or the minimum number of assigned
   * active tasks below which the node will have been over-provisioned and unable to give every
   * available StreamThread an active task assignment
   *
   * @return the number of consumers on this node
   */
  int numStreamThreads();

  /**
   * @return the set of consumer client ids for all StreamThreads on the given node
   */
  SortedSet<String> consumers();

  /**
   * @return the set of all active tasks owned by consumers on this node since the previous rebalance
   */
  SortedSet<TaskId> previousActiveTasks();

  /**
   * @return the set of all standby tasks owned by consumers on this node since the previous rebalance
   */
  SortedSet<TaskId> previousStandbyTasks();

  /**
   * Returns the total lag across all logged stores in the task. Equal to the end offset sum if this client
   * did not have any state for this task on disk.
   *
   * @return end offset sum - offset sum
   *         Task.LATEST_OFFSET if this was previously an active running task on this client
   */
  long lagFor(final TaskId task);

  /**
   * @return the previous tasks assigned to this consumer ordered by lag, filtered for any tasks that don't exist in this assignment
   */
  SortedSet<TaskId> prevTasksByLag(final String consumer);

  /**
   * Returns a collection containing all (and only) stateful tasks in the topology by {@link TaskId},
   * mapped to its "offset lag sum". This is computed as the difference between the changelog end offset
   * and the current offset, summed across all logged state stores in the task.
   *
   * @return a map from all stateful tasks to their lag sum
   */
  Map<TaskId, Long> statefulTasksToLagSums();

  /**
   * The {@link HostInfo} of this node, if set via the
   * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_SERVER_CONFIG application.server} config
   *
   * @return the host info for this node if configured, else {@code Optional.empty()}
   */
  Optional<HostInfo> hostInfo();

  /**
   * The client tags for this client node, if any have been set via configs using the
   * {@link org.apache.kafka.streams.StreamsConfig#clientTagPrefix}
   * <p>
   * Can be used however you want, or passed in to enable the rack-aware standby task assignor.
   *
   * @return all the client tags found in this node's {@link org.apache.kafka.streams.StreamsConfig}
   */
  Map<String, String> clientTags();
 }

The NodeState will be wrapped up along with the other inputs to the assignor (such as the configuration and set of tasks to be assigned, as well as various utilities that may be useful) in the final new interface, the ApplicationMetadata. The methods on the ApplicationMetadata are basically just the current inputs to the #assign method:

Code Block
languagejava
titleApplicationMetadata
package org.apache.kafka.streams.processor.assignment;

/**
 * A read-only metadata class representing the current state of each Streams client node with at least one StreamThread participating in this rebalance
 */
public interface ApplicationMetadata {
    /**
     * @return a map from the {@code processId} to {@link NodeState} for all Streams client nodes in this app
     */
    Map<ProcessID, NodeState> nodeStates();

    /**
     * Makes a remote call to fetch changelog topic end offsets and, if successful, uses the results to compute
     * task lags for each {@link NodeState}.
     *
     * @return whether the end offset fetch and lag computation was successful
     */
    boolean computeTaskLags();

    /**
     * @return a simple container class with the Streams configs relevant to assignment
     */
    AssignmentConfigs assignmentConfigs();

    /**
     * @return the set of all tasks in this topology which must be assigned to a node
     */
    Set<TaskId> allTasks();

    /**
     * @return the set of stateful and changelogged tasks in this topology
     */
    Set<TaskId> statefulTasks();
}
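
To illustrate how these read-only interfaces might be used together, here is a minimal, purely illustrative sketch of a helper a custom assignor could write to find the most caught-up node for a given stateful task. The helper name and the surrounding assignor are hypothetical; only the ApplicationMetadata and NodeState methods shown above are proposed by this KIP.

Code Block
languagejava
titleExample: consulting NodeState lags (illustrative)
import java.util.Map;

// Hypothetical helper inside a custom TaskAssignor implementation: scan the per-node
// metadata and return the ProcessID of the node with the smallest lag for the given
// stateful task (ignoring the Task.LATEST_OFFSET sentinel for brevity).
static ProcessID mostCaughtUpNode(final ApplicationMetadata metadata, final TaskId task) {
    ProcessID best = null;
    long bestLag = Long.MAX_VALUE;
    for (final Map.Entry<ProcessID, NodeState> entry : metadata.nodeStates().entrySet()) {
        final long lag = entry.getValue().lagFor(task);
        if (lag < bestLag) {
            bestLag = lag;
            best = entry.getKey();
        }
    }
    return best;
}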

We'll also move some of the existing assignment functionality into a utils class that can be called by implementors of TaskAssignor:

Code Block
languagejava
titleTaskAssignmentUtils
package org.apache.kafka.streams.processor.assignment;

/**
 * A set of utilities to help implement task assignment
 */
public final class TaskAssignmentUtils {
    /**
     * Assign standby tasks to nodes according to the default logic.
     * <p>
     * If rack-aware client tags are configured, the rack-aware standby task assignor will be used
     *
     * @param nodeAssignments the current assignment of tasks to nodes
     */
    public static void defaultStandbyTaskAssignment(final ApplicationMetadata applicationMetadata, final Map<ProcessID, NodeAssignment> nodeAssignments) {...}

    /**
     * Optimize the active task assignment for rack-awareness
     *
     * @param nodeAssignments the current assignment of tasks to nodes
     * @param tasks the set of tasks to reassign if possible. Must already be assigned to a node
     */
    public static void optimizeRackAwareActiveTasks(final ApplicationMetadata applicationMetadata, final Map<ProcessID, NodeAssignment> nodeAssignments, final SortedSet<TaskId> tasks) {...}

    /**
     * Optimize the standby task assignment for rack-awareness
     *
     * @param nodeAssignments the current assignment of tasks to nodes
     */
    public static void optimizeRackAwareStandbyTasks(final ApplicationMetadata applicationMetadata, final Map<ProcessID, NodeAssignment> nodeAssignments) {...}
}

TaskAssignmentUtils provides new APIs but pre-existing functionality, essentially presenting a clean way for users to take advantage of the current optimizations and algorithms that are utilized by the built-in assignors, so that users don't have to re-implement complex features such as rack-awareness. The #defaultStandbyTaskAssignment API will just delegate to the appropriate standby task assignor (either the basic default or the client-tag-based rack-aware standby assignor, depending on the existence of client tags in the configuration). Similarly, the #optimizeRackAware{Active/Standby}Tasks APIs will just delegate to the new RackAwareTaskAssignor that is being added in KIP-925.
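
As a purely illustrative sketch, a custom assignor might compute its own active task placement and then hand the rest off to these utilities. The finishAssignment helper below is hypothetical; only the TaskAssignmentUtils methods shown above are proposed by this KIP.

Code Block
languagejava
titleExample: delegating to TaskAssignmentUtils (illustrative)
import java.util.Map;
import java.util.SortedSet;

// Hypothetical helper called from a custom TaskAssignor after it has placed the active tasks.
static void finishAssignment(final ApplicationMetadata metadata,
                             final Map<ProcessID, NodeAssignment> nodeAssignments,
                             final SortedSet<TaskId> statefulActiveTasks) {
    // Delegate standby placement to the built-in logic (rack-aware if client tags are configured)
    TaskAssignmentUtils.defaultStandbyTaskAssignment(metadata, nodeAssignments);
    // Then run the KIP-925 rack-aware optimizations over the active and standby placements
    TaskAssignmentUtils.optimizeRackAwareActiveTasks(metadata, nodeAssignments, statefulActiveTasks);
    TaskAssignmentUtils.optimizeRackAwareStandbyTasks(metadata, nodeAssignments);
}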

Last, we have the AssignmentConfigs, which are (and would remain) just a basic container class, although we will migrate from public fields to standard getters for each of the configs passed into the assignor. Going forward, when a KIP is proposed to introduce a new config intended for the assignor, it should include the appropriate getter(s) in this class as part of the accepted proposal.

Code Block
languagejava
titleAssignmentConfigs
package org.apache.kafka.streams.processor.assignment;

public class AssignmentConfigs {
    public long acceptableRecoveryLag();
    public int maxWarmupReplicas();
    public int numStandbyReplicas();
    public long probingRebalanceIntervalMs();
    public List<String> rackAwareAssignmentTags();
    public int trafficCost();
    public int nonOverlapCost();
}
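
For illustration, an assignor would read these values through the new getters rather than the old public fields. The snippet below is a hypothetical fragment from inside a custom assignor, where metadata is the ApplicationMetadata handed to it.

Code Block
languagejava
titleExample: reading assignment configs (illustrative)
// Only the getters listed above are proposed by this KIP; 'metadata' is assumed to be
// the ApplicationMetadata passed to the assignor.
final AssignmentConfigs configs = metadata.assignmentConfigs();
final int numStandbyReplicas = configs.numStandbyReplicas();
final long acceptableRecoveryLag = configs.acceptableRecoveryLag();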


Finally, as part of this change, we're moving some of the behavior that can fail into the task assignor. In particular, we're moving the bits that compute lags for stateful tasks into the implementation of ApplicationMetadata.computeTaskLags. This means we need some way to communicate to the streams partition assignor that it should retain the same assignment and schedule a follow-up rebalance. To do this, we will add the exception type StreamsAssignorRetryableException. If the TaskAssignor throws this exception, the StreamsPartitionAssignor catches it, performs a fallback assignment, and schedules a follow-up rebalance.
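
A minimal illustrative sketch of how an assignor might use this, assuming the (elided) assign entry point and a message constructor for the exception (the exact constructor is not specified here):

Code Block
languagejava
titleExample: requesting a retry (illustrative)
// Hypothetical fragment from inside a custom TaskAssignor's assignment logic.
if (!applicationMetadata.computeTaskLags()) {
    // Signal the StreamsPartitionAssignor to fall back to a safe assignment and schedule a
    // follow-up rebalance. The message constructor is assumed for illustration.
    throw new StreamsAssignorRetryableException("Failed to fetch changelog end offsets; will retry on the next rebalance");
}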

Proposed Changes

There are no actual changes to functionality: we are mainly moving an internal config to the public part of StreamsConfig and promoting a few currently-internal classes into the public API as new interfaces in a new public assignment package. Code-wise, the largest change is breaking up the ClientState into the new NodeState and NodeAssignment interfaces, but this will be handled transparently to the user for all existing built-in assignors, which will continue to work the same as before.

...

  1. One obvious question here is whether we want to still deprecate the old internal config anyway, out of compassion for any who may already be using it despite it not being considered public. Personally I think this would be reasonable but don't feel strongly one way or another.
  2. Another possibility that was considered and ultimately decided against was to encompass the thread-level assignment in this KIP, bringing it into the public API and making it pluggable as well. We determined that this did not seem necessary as part of the initial KIP, especially considering the large scope we have already reached. However, it's worth noting that a followup KIP building on the new public API(s) introduced here would become much more feasible should someone wish to customize the thread-level logic in the future. If/when that question is brought up, we'll have to address a few other concerns that contributed to our decision to exclude this for now, such as validating the thread assignment for correctness according to the cooperative rebalancing protocol, niche optimizations like transient standbys to avoid losing in-memory state, and some other subtle logic that currently resides in the last leg of the StreamsPartitionAssignor's algorithm, which handles the distribution of node tasks to threads.

...