
Status

Current state: Draft Under Discussion

Discussion thread: here 

JIRA: KAFKA-15045

...

Back in KIP-441, we introduced a new task assignor, the HighAvailabilityTaskAssignor, which prioritizes stateful task availability over stickiness. We also kept the old assignment logic, which prioritizes stickiness, as the StickyTaskAssignor. As a safeguard, we added a backdoor internal config for the task assignor, so that we could recommend it to users in case of critical bugs in the new assignor. Over time, however, it has become clear that there are valid use cases for preferring the old StickyTaskAssignor over the new HighAvailabilityTaskAssignor, primarily because the HighAvailabilityTaskAssignor may induce extreme task shuffling in less stable environments. As the overall assignment logic becomes increasingly sophisticated, now is a good time to promote the internal task assignor config to a full-fledged public API.


The original proposal was simply to introduce a new public config for the task assignor class, while keeping the thread-level assignment an internal implementation detail (see "Rejected Alternatives" for further thoughts and reasoning).

There are also reasonable scenarios where users may want to plug in their own task assignor. For example, a user may want to configure their own static assignment to work around an issue in the available assignors. Alternatively, they may want to implement their own assignment logic that considers metrics they have collected, or the task migration cost of their specific processors.

Finally, there are good reasons for a user to want to extend or modify the behaviour of the Kafka Streams partition assignor beyond just the task assignment. For example, a user may want to plug in their own logic for initializing resources, much as the StreamsPartitionAssignor initializes internal topics.

With these motivations in mind, we propose adding a new configurable interface for plugging custom behaviour into the StreamsPartitionAssignor. The config that supplies this interface will supplant the existing internal task assignor config. In this KIP, we limit the scope of the interface to supplying a custom task assignor. However, it gives future KIPs a place to add further points at which users can plug in custom behaviour.

Public Interfaces

We will introduce a new config that supplies an instance of StreamsPartitionAssignorPlugin (discussed below):

Code Block
languagejava
titleStreamsConfig
/** {@code partition.assignor.plugin.class} */
public static final String PARTITION_ASSIGNOR_PLUGIN_CLASS_CONFIG = "partition.assignor.plugin.class";
private static final String PARTITION_ASSIGNOR_PLUGIN_CLASS_DOC = "A partition assignor plugin class or class name implementing the {@link StreamsPartitionAssignorPlugin} interface."
        + " Defaults to the {@link DefaultPartitionAssignorPlugin} class.";
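As an illustration, a user might enable a custom plugin by setting the proposed config alongside the usual Streams properties. This is a minimal sketch. The config key is the one proposed above and does not exist in released Kafka Streams versions. The names com.example.MyAssignorPlugin and PluginConfigExample are hypothetical. Plain string keys are used so that the snippet compiles without a Kafka dependency; real code would normally use the StreamsConfig constants instead.

```java
import java.util.Properties;

final class PluginConfigExample {
    // Key proposed in this KIP (not available in released Kafka Streams versions).
    static final String PARTITION_ASSIGNOR_PLUGIN_CLASS_CONFIG = "partition.assignor.plugin.class";

    // Builds Streams properties that point the partition assignor at a custom plugin class.
    static Properties streamsProps(String pluginClassName) {
        Properties props = new Properties();
        props.setProperty("application.id", "my-app");
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty(PARTITION_ASSIGNOR_PLUGIN_CLASS_CONFIG, pluginClassName);
        return props;
    }
}
```

Leaving the key unset would fall back to the default plugin, DefaultPartitionAssignorPlugin.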

We will also remove the old internal config. We can do this without a deprecation period because it was an internal config and therefore, by definition, not part of the public API (also discussed in more detail in "Rejected Alternatives").

Code Block
languagejava
titleStreamsConfig
public static class InternalConfig {
    // ...

    // This will be removed
    public static final String INTERNAL_TASK_ASSIGNOR_CLASS = "internal.task.assignor.class";
}

StreamsPartitionAssignorPlugin is an interface that lets users plug custom behavior into the various phases of partition assignment performed by the StreamsPartitionAssignor. In this KIP, StreamsPartitionAssignorPlugin has only one method, which supplies a task assignor. Note that the thread-level assignment will remain a non-configurable internal implementation detail of the partition assignor (see "Rejected Alternatives" for further thoughts and reasoning).

Code Block
languagejava
titleStreamsConfig
import java.util.Optional;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.streams.processor.assignment.TaskAssignor;

public interface StreamsPartitionAssignorPlugin extends Configurable {
    /**
     * @return An implementation of TaskAssignor that will be used to assign tasks to nodes. If this
     *         method returns Optional.empty(), the {@link HighAvailabilityTaskAssignor} is used.
     */
    default Optional<TaskAssignor> taskAssignor() {
        return Optional.empty();
    }
}
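The following rough sketch shows how a user might implement the plugin. It returns a custom assignor only when a hypothetical config flag is set. Otherwise it returns Optional.empty(), which falls back to the default. To keep the sketch self-contained, Configurable and TaskAssignor are local stand-ins. Configurable mirrors the single configure(Map<String, ?>) method of org.apache.kafka.common.Configurable. TaskAssignor is left empty because its methods are not covered in this section. MyAssignorPlugin, StaticTaskAssignor, and the my.static.assignment.enabled key are all hypothetical.

```java
import java.util.Map;
import java.util.Optional;

// Local stand-in mirroring org.apache.kafka.common.Configurable.
interface Configurable {
    void configure(Map<String, ?> configs);
}

// Local stand-in; the methods of the real TaskAssignor are omitted.
interface TaskAssignor {
}

// Mirrors the plugin interface proposed in this KIP.
interface StreamsPartitionAssignorPlugin extends Configurable {
    default Optional<TaskAssignor> taskAssignor() {
        return Optional.empty();
    }
}

// Hypothetical custom assignor, e.g. one enforcing a fixed, user-defined layout.
final class StaticTaskAssignor implements TaskAssignor {
}

// Hypothetical plugin that opts into StaticTaskAssignor via its own config flag.
final class MyAssignorPlugin implements StreamsPartitionAssignorPlugin {
    static final String STATIC_ASSIGNMENT_ENABLED = "my.static.assignment.enabled";

    private boolean useStaticAssignment = false;

    @Override
    public void configure(Map<String, ?> configs) {
        Object flag = configs.get(STATIC_ASSIGNMENT_ENABLED);
        useStaticAssignment = flag != null && Boolean.parseBoolean(flag.toString());
    }

    @Override
    public Optional<TaskAssignor> taskAssignor() {
        // Returning Optional.empty() keeps the default HighAvailabilityTaskAssignor.
        return useStaticAssignment ? Optional.of(new StaticTaskAssignor()) : Optional.empty();
    }
}
```

Because taskAssignor() has a default implementation, a plugin that does not override it keeps today's default behavior.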

To let users actually plug in their own assignor by implementing taskAssignor, we will need to move the TaskAssignor interface from o.a.k.streams.processor.internals.assignment to a public package. Some of its supporting classes must move with it, such as the assignment configs container class and ClientState, both of which appear in the TaskAssignor#assign method (although both will be heavily refactored, as discussed below). All of these new public APIs will be placed in a new non-internal package that mirrors their old internal package, specifically org.apache.kafka.streams.processor.assignment.

...

Code Block
languagejava
titleAssignmentConfigs
package org.apache.kafka.streams.processor.assignment;

import java.util.List;

public class AssignmentConfigs {
    // Public accessors only; fields, constructors, and method bodies are omitted.
    public long acceptableRecoveryLag();
    public int maxWarmupReplicas();
    public int numStandbyReplicas();
    public long probingRebalanceIntervalMs();
    public List<String> rackAwareAssignmentTags();
    public int trafficCost();
    public int nonOverlapCost();
}
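The sketch below illustrates how a custom TaskAssignor might consume these configs. It uses acceptableRecoveryLag to decide which clients are caught up enough on a stateful task to be good candidates for its active copy. It is a simplified example, not the built-in logic. SimpleAssignmentConfigs and CaughtUpClients are hypothetical, and the per-client lags are assumed to be already known.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in exposing only the accessor this sketch needs.
final class SimpleAssignmentConfigs {
    private final long acceptableRecoveryLag;

    SimpleAssignmentConfigs(long acceptableRecoveryLag) {
        this.acceptableRecoveryLag = acceptableRecoveryLag;
    }

    long acceptableRecoveryLag() {
        return acceptableRecoveryLag;
    }
}

final class CaughtUpClients {
    // Returns clients whose lag on a stateful task is within acceptableRecoveryLag,
    // ordered from least to most lag; ties are broken by client id for determinism.
    static List<String> caughtUp(Map<String, Long> lagByClient, SimpleAssignmentConfigs configs) {
        return lagByClient.entrySet().stream()
                .filter(e -> e.getValue() <= configs.acceptableRecoveryLag())
                .sorted((a, b) -> {
                    int byLag = Long.compare(a.getValue(), b.getValue());
                    return byLag != 0 ? byLag : a.getKey().compareTo(b.getKey());
                })
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }
}
```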


Finally, as part of this change, we are moving some of the behavior that can fail into the task assignor. In particular, we are moving the logic that computes lags for stateful tasks into the implementation of ApplicationMetadata.computeTaskLags. This means we need a way to tell the StreamsPartitionAssignor that it should retain the same assignment and schedule a follow-up rebalance. To do this, we will add a new exception type, StreamsAssignorRetryableException. If the TaskAssignor throws this exception, the StreamsPartitionAssignor catches it, performs a fallback assignment, and schedules a follow-up rebalance.
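The catch-and-fallback flow might look roughly like the following sketch, which rests on several assumptions. StreamsAssignorRetryableException is modeled as an unchecked exception, because its superclass is not specified here. SimpleTaskAssignor, AssignmentOutcome, and AssignorDriver are hypothetical simplifications. The fallback simply retains the previous client-to-tasks assignment.

```java
import java.util.List;
import java.util.Map;

// Assumed to be unchecked for this sketch.
class StreamsAssignorRetryableException extends RuntimeException {
    StreamsAssignorRetryableException(String message) {
        super(message);
    }
}

// Hypothetical simplified assignor: returns client id -> assigned task ids.
interface SimpleTaskAssignor {
    Map<String, List<String>> assign();
}

// Hypothetical result holder: the assignment to hand out, plus whether a follow-up rebalance is needed.
final class AssignmentOutcome {
    final Map<String, List<String>> assignment;
    final boolean followupRebalanceScheduled;

    AssignmentOutcome(Map<String, List<String>> assignment, boolean followupRebalanceScheduled) {
        this.assignment = assignment;
        this.followupRebalanceScheduled = followupRebalanceScheduled;
    }
}

final class AssignorDriver {
    static AssignmentOutcome assignOrFallback(SimpleTaskAssignor assignor,
                                              Map<String, List<String>> previousAssignment) {
        try {
            return new AssignmentOutcome(assignor.assign(), false);
        } catch (StreamsAssignorRetryableException e) {
            // Retryable failure (e.g. lags not yet available): keep the previous
            // assignment and ask for another rebalance later.
            return new AssignmentOutcome(previousAssignment, true);
        }
    }
}
```

Any other exception type is deliberately not caught here, so non-retryable failures still propagate.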

Proposed Changes

There are no functional changes. This KIP mainly replaces an internal config with a public one in StreamsConfig. It also moves a few currently-internal classes into the public API, as new interfaces in a new public assignment package. Code-wise, the largest change is breaking up ClientState into the new NodeState and NodeAssignment interfaces. This will be handled transparently to the user for all existing built-in assignors, which will continue to work as before.

...

  1. One obvious question is whether we should still deprecate the old internal config anyway, as a courtesy to anyone who may already be using it even though it was never considered public. Personally I think this would be reasonable, but I don't feel strongly one way or the other.
  2. Another possibility that was considered and ultimately rejected was to include the thread-level assignment in this KIP, bringing it into the public API and making it pluggable as well. We determined that this was not necessary for the initial KIP, especially given the large scope it has already reached. However, it is worth noting that a follow-up KIP that builds on the new public API(s) introduced here would make this much more feasible, should someone wish to customize the thread-level logic in the future. If and when that question comes up, we will have to address a few other concerns that contributed to our decision to exclude it for now. These include validating the thread assignment for correctness according to the cooperative rebalancing protocol, and niche optimizations like transient standbys to avoid losing in-memory state. There is also other subtle logic that currently resides in the last leg of the StreamsPartitionAssignor's algorithm, which handles the distribution of a node's tasks to its threads.
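To give a sense of the thread-level step that stays internal, here is a deliberately simplified sketch of distributing a node's tasks across its threads. Thread loads stay within one task of each other, and each task goes back to its previous thread where that does not break the balance. StickyThreadDistributor is hypothetical and does not reflect the actual StreamsPartitionAssignor code. In particular, it ignores the cooperative-protocol validation and standby handling mentioned above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class StickyThreadDistributor {
    // Returns thread id -> task ids. previousOwner may omit tasks that had no previous thread.
    static Map<String, List<String>> distribute(List<String> tasks,
                                                List<String> threads,
                                                Map<String, String> previousOwner) {
        if (threads.isEmpty()) {
            throw new IllegalArgumentException("threads must not be empty");
        }
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String thread : threads) {
            result.put(thread, new ArrayList<>());
        }
        int floor = tasks.size() / threads.size();
        List<String> unassigned = new ArrayList<>();

        // Phase 1: return tasks to their previous thread, up to floor tasks per thread.
        for (String task : tasks) {
            List<String> owned = result.get(previousOwner.get(task));
            if (owned != null && owned.size() < floor) {
                owned.add(task);
            } else {
                unassigned.add(task);
            }
        }

        // Phase 2: give each remaining task to a least-loaded thread,
        // preferring its previous thread when that thread is among the least loaded.
        for (String task : unassigned) {
            int minLoad = Integer.MAX_VALUE;
            for (List<String> owned : result.values()) {
                minLoad = Math.min(minLoad, owned.size());
            }
            List<String> previous = result.get(previousOwner.get(task));
            if (previous != null && previous.size() == minLoad) {
                previous.add(task);
                continue;
            }
            for (String thread : threads) {
                if (result.get(thread).size() == minLoad) {
                    result.get(thread).add(task);
                    break;
                }
            }
        }
        return result;
    }
}
```

Capping the sticky phase at the floor of tasks per thread is what keeps the final loads balanced. The trade-off is that some tasks may move off their previous thread.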

...