You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 4 Next »

Status

Current state: Draft

Discussion thread: here 

JIRA: KAFKA-15045

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Back in KIP-441, we introduced a new task assignor which prioritized stateful task availability over stickiness, but kept the old assignment logic which prioritized stickiness called the StickyTaskAssignor. As a safeguard we added a backdoor internal config for the task assignor, the idea being that we could recommend this to users in case of critical bugs in the new assignor. However over time it has become clear that there are valid use cases for wanting the old StickyTaskAssignor over the new HighAvailabilityTaskAssignor, primarily relating to the possibility of extreme task shuffling induced by the HAAssignor in less stable environments. As the overall assignment logic is becoming more and more sophisticated, now is a good time to finally move the internal task assignor config to a full-fledged public API.

Public Interfaces

The original change here will be introducing a new public config for the task assignor class. Note that the thread-level assignment will remain an internal implementation detail (see "Rejected Alternatives" for further thoughts and reasoning)

StreamsConfig
/** {@code task.assignor.class} */
public static final String TASK_ASSIGNOR_CLASS_CONFIG = "task.assignor.class";
private static final String TASK_ASSIGNOR_CLASS_DOC = "A task assignor class or class name implementing the <@link TaskAssignor> interface"
        + " that will be used to assign tasks to client nodes during a rebalance. Note that tasks will be distributed equally among the"
        + "StreamThreads within each client with maximal stickiness ie returning as many tasks as possible to their previous owner on that"
        + "client node. Defaults to the <@link HighAvailabilityTaskAssignor> class.";

We will also remove the old internal config (which we can do without deprecation as this was an internal config and thus by definition not part of the public API, also discussed in more detail in "Rejected Alternatives")

StreamsConfig
public static class InternalConfig {
        // This will be removed
        public static final String INTERNAL_TASK_ASSIGNOR_CLASS = "internal.task.assignor.class";
}

To enable users to actually plug something in via this config, we will need to move the TaskAssignor interface from o.a.k.streams.processor.internals.assignment to a public package, along with some of the supporting classes such as the assignment configs container class and ClientState which both appear in the TaskAssignor#assign method (although those will be heavily refactored, discussed below). All these new public APIs will be placed in a new non-internal package that mirrors their old internal package, specifically org.apache.kafka.streams.processor.assignment.

Both the input parameter and return value will be encapsulated in wrapper classes for the sake of forwards compatibility. This will let us avoid the cycle of adding, deprecating, and removing new #assign overloads if/when we want to evolve the assignor in the future, for example to pass in additional metadata or enable the assignor to output new kinds of information or instructions to the StreamsPartitionAssignor. The analogous ConsumerPartitionAssignor works similarly, returning a single GroupAssignment object that wraps the collection of individual consumer assignments for the same reason.

TaskAssignor
package org.apache.kafka.streams.processor.assignment;

public interface TaskAssignor extends Configurable {

  /**
   * @param applicationMetadata the metadata for this Streams application
   * @return the assignment of active and standby tasks to Streams client nodes
   */
  TaskAssignment assign(final ApplicationMetadata applicationMetadata);

  /**
   * Wrapper class for the final assignment of active and standbys tasks to individual Streams 
   * client nodes
   */
  class TaskAssignment {
    private final Collection<NodeAssignment> nodeAssignments;

	/**
     * @return the assignment of tasks to nodes
     */
    public Collection<NodeAssignment> assignment();

    /**
     * @return the number of Streams client nodes to which tasks were assigned
     */
    public int numNodes();

    /**
     * @return a String representation of the returned assignment, in processId order
     */
    @Override
    public String toString();
  }
}

Another reason for introducing the new TaskAssignment and ApplicationMetadata classes is to clean up the way assignment is performed today, as the current API is really not fit for public consumption. Currently, the TaskAssignor is provided a set of ClientState objects representing each client node. The ClientState is however not just the input to the assignor, but also its output – the assignment of tasks to nodes is performed by mutating the ClientStates passed in. The return value of the #assign method is a simple boolean indicating to the StreamsPartitionAssignor whether it should request a followup probing rebalance, a feature associated only with the HighAvailabilityTaskAssignor.

To solve these problems, we plan to refactor the interface with two goals in mind:

  1. To provide a clean separation of input/output by splitting the ClientState into a read-only NodeState metadata class and a write-only NodeAssignment return value class
  2. To decouple the followup rebalance request from the probing rebalance feature and give the assignor more direct control over the followup rebalance schedule, by allowing it to indicate which node(s) should trigger a rejoin and when to request the subsequent rebalance

This gives us the following two new public interfaces:

NodeAssignment
package org.apache.kafka.streams.processor.assignment;

/**
 * A simple write class for the assignor to fill in the desired placement of active and standby tasks on Streams client nodes 
 */
public interface NodeAssignment {
  UUID processId();

  void assignActive(final TaskId task);

  void assignStandby(final TaskId task);

  void removeActive(final TaskId task);

  void removeStandby(final TaskId task);

  Set<TaskId> activeAssignment();

  Set<TaskId> activeStatefulAssignment();
  
  Set<TaskId> activeStatelessAssignment();

  Set<TaskId> standbyAssignment();

  /**
   * Request a followup rebalance to be triggered by one of the consumers on this node after the
   * given interval has elapsed. The {@link StreamThread} selected for this will be chosen at random.
   * <p>
   * NOTE: A best effort will be made to enforce a rebalance according to the requested schedule,
   * but there is no guarantee that another rebalance will not occur before this time has elapsed.
   * Similarly, there is no guarantee the followup rebalance will occur, and must be re-requested
   * if, for example, the requesting consumer crashes or drops out of the group. Such an event
   * is, however, guaranteed to trigger a new rebalance itself, at which point the assignor
   * can re-evaluate whether to request an additional rebalance or not.
   *
   * @param followupRebalanceDelay how long this node should wait before initiating a new rebalance
   */
  void requestFollowupRebalance(final Duration followupRebalanceDelay);

  /**
   * @return the actual deadline in objective time, using ms since the epoch, after which the
   * followup rebalance will be attempted. Equivalent to {@code 'now + followupRebalanceDelay'}
   */
  long followupRebalanceDeadline();
 }

and

NodeState
package org.apache.kafka.streams.processor.assignment;

/**
 * A read-only metadata class representing the current state of each Streams client node with at least one StreamThread participating in this rebalance
 */
public interface NodeState {
    
}

The NodeState will be wrapped up along with the other inputs to the assignor (such as the configuration and set of tasks to be assigned, as well as various utilities that may be useful) in the final new interface, the ApplicationMetadata:

ApplicationMetadata
package org.apache.kafka.streams.processor.assignment;

/**
 * A read-only metadata class representing the current state of each Streams client node with at least one StreamThread participating in this rebalance
 */
public interface ApplicationMetadata {
    /**
     * @return a map from the {@code processId} to {@link NodeState} for all Streams client nodes in this app
     */
    Map<UUID, NodeState> nodeStates();

    /**
     * Makes a remote call to fetch changelog topic end offsets and, if successful, uses the results to compute
     * task lags for each {@link NodeState}.
     *
     * @return whether the end offset fetch and lag computation was successful
     */
    boolean computeTaskLags();

    /**
     * @return a simple container class with the Streams configs relevant to assignment
     */
    AssignmentConfigs assignmentConfigs();

    /**
     * @return the set of all tasks in this topology which must be assigned to a node
     */
    Set<TaskId> allTasks();

    /**
     *
     * @return the set of stateful and changelogged tasks in this topology
     */
    Set<TaskId> statefulTasks();

    /**
     * Assign standby tasks to nodes according to the default logic.
     * <p>
     * If rack-aware client tags are configured, the rack-aware standby task assignor will be used
     *
     * @param nodeAssignments the current assignment of tasks to nodes
     */
    void defaultStandbyTaskAssignment(final Map<UUID, NodeAssignment> nodeAssignments);

    /**
     * Optimize the active task assignment for rack-awareness
     *
     * @param nodeAssignments the current assignment of tasks to nodes
     * @param tasks the set of tasks to reassign if possible. Must already be assigned to a node
     */
    void optimizeRackAwareActiveTasks(final Map<UUID, NodeAssignment> nodeAssignments, final SortedSet<TaskId> tasks);

    /**
     * Optimize the standby task assignment for rack-awareness
     *
     * @param nodeAssignments the current assignment of tasks to nodes
     */
    void optimizeRackAwareStandbyTasks(final Map<UUID, NodeAssignment> nodeAssignments); 
}

The first four methods on the ApplicationMetadata are basically just the current inputs to the #assign method. The last three methods are new APIs but pre-existing functionality, essentially presenting a clean way for users to take advantage of the current optimizations and algorithms that are utilized by the built-in assignors, so that users don't have to re-implement complex features such as rack-awareness. The #defaultStandbyTaskAssignment API will just delegate to the appropriate standby task assignor (either basic default or client tag based standby rack awareness, depending on the existence of client tags in the configuration). Similarly, the #optimizeRackAware{Active/Standby}Tasks API will just delegate to the new RackAwareTaskAssignor that is being added in KIP-925.

Last, we have the AssignmentConfigs, which are (and would remain) just a basic container class, although we will migrate from public fields to standard getters for each of the configs passed into the assignor. Going forward, when a KIP is proposed to introduce a new config intended for the assignor, it should include the appropriate getter(s) in this class as part of the accepted proposal.

AssignmentConfigs
package org.apache.kafka.streams.processor.assignment;

public class AssignmentConfigs {
    public long acceptableRecoveryLag();
    public int maxWarmupReplicas();
    public int numStandbyReplicas();
    public long probingRebalanceIntervalMs();
    public List<String> rackAwareAssignmentTags();
    public int trafficCost();
    public int nonOverlapCost();
}

Proposed Changes

No actual changes to functionality, mainly moving an internal config to the public part of StreamsConfig and bringing along a few currently-internal classes into the public API as some new interfaces and a new public assignment package. Code-wise the largest change is really the breaking up of the ClientState into the new NodeState and NodeAssignment interfaces, but that will be handled transparently to the user for all existing built-in-assignors, which will continue to work the same as before. 

Compatibility, Deprecation, and Migration Plan

Since this was formally an internal config and not part of the public API, we don't need to go through the usual deprecation path. See "Rejected Alternatives" for some slightly more nuanced discussion here

Test Plan

Mostly nothing to report here as there should already be tests in place for this config, however I will check the existing test coverage during implementation and fill in any gaps as needed to make sure it's possible to set either of the OOTB assignors (HA or Sticky) as well as a custom assignor.

Rejected Alternatives

  1. One obvious question here is whether we want to still deprecate the old internal config anyway, out of compassion for any who may already be using it despite it not being considered public. Personally I think this would be reasonable but don't feel strongly one way or another.
  2. Another possibility that was considered and ultimately decided against was whether to encompass the thread-level assignment in this KIP, and bring that into the public API or make it pluggable as well. We determined that this did not seem necessary to do as part of the initial KIP, especially considering the large scope we have already reached. However it's worth noting that a followup KIP that builds on the new public API(s) introduced here would become much more feasible should someone wish to customize the thread-level logic at some point in the future. If/when that question is brought up, we'll have to address a few other concerns we had that contributed to our decision to exclude this for now, such as validating the thread assignment for correctness according to the cooperative rebalancing protocol, or niche optimizations like transient standbys to avoid losing in-memory state, and some other subtle logic that currently resides in the last leg of the StreamsPartitionAssignor's algorithm that tackles the distribution of node tasks to threads




  • No labels