Status

Current state: Under Discussion

Discussion thread: here 

JIRA: KAFKA-15045

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Back in KIP-441, we introduced a new task assignor, the HighAvailabilityTaskAssignor, which prioritizes stateful task availability over stickiness, but kept the old assignment logic, which prioritizes stickiness, as the StickyTaskAssignor. As a safeguard, we added a backdoor internal config for the task assignor, the idea being that we could recommend it to users in case of critical bugs in the new assignor. Over time, however, it has become clear that there are valid use cases for preferring the old StickyTaskAssignor over the new HighAvailabilityTaskAssignor, primarily relating to the extreme task shuffling that the HighAvailabilityTaskAssignor can induce in less stable environments. As the overall assignment logic is becoming more and more sophisticated, now is a good time to finally promote the internal task assignor config to a full-fledged public API.

Public Interfaces

The primary change here will be introducing a new public config for the active task assignor class. Note that the standby task assignor will remain an internal implementation detail.

StreamsConfig
/** {@code active.task.assignor.class} */
public static final String ACTIVE_TASK_ASSIGNOR_CLASS_CONFIG = "active.task.assignor.class";
private static final String ACTIVE_TASK_ASSIGNOR_CLASS_DOC = "A task assignor class or class name implementing the <code>TaskAssignor</code> interface that will be used to assign active tasks to client nodes during a rebalance. Note that tasks will be distributed equally among the StreamThreads within each client with maximal stickiness, i.e. returning as many tasks as possible to their previous owner on that client. Defaults to the <code>HighAvailabilityTaskAssignor</code> class.";
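To illustrate how an application might opt back into the sticky assignor once this config exists, here is a minimal sketch using plain java.util.Properties. The config key is the one proposed above and is not available in any release unless this KIP is accepted. The assignor class name is its current internal location and may change. The application id and broker address are placeholders.

```java
import java.util.Properties;

final class AssignorConfigExample {
    // Config key proposed in this KIP; assumed here, not yet part of StreamsConfig.
    static final String ACTIVE_TASK_ASSIGNOR_CLASS_CONFIG = "active.task.assignor.class";

    // Current (internal) fully qualified name of the sticky assignor; its package may change.
    static final String STICKY_ASSIGNOR =
        "org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor";

    // Builds the properties that would be handed to a Kafka Streams application.
    static Properties streamsProperties() {
        Properties props = new Properties();
        props.put("application.id", "assignor-demo");      // placeholder application id
        props.put("bootstrap.servers", "localhost:9092");  // placeholder broker address
        props.put(ACTIVE_TASK_ASSIGNOR_CLASS_CONFIG, STICKY_ASSIGNOR);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProperties().getProperty(ACTIVE_TASK_ASSIGNOR_CLASS_CONFIG));
    }
}
```

Leaving the key unset would keep the default described in the doc string, namely the HighAvailabilityTaskAssignor.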

We will also remove the old internal config, which we can do without deprecation as this was an internal config and thus by definition not part of the public API.

StreamsConfig
public static class InternalConfig {
    // This is settable in the main Streams config, but it's a private API for now
    public static final String INTERNAL_TASK_ASSIGNOR_CLASS = "internal.task.assignor.class";
}

Lastly, we will need to move the TaskAssignor interface from o.a.k.streams.processor.internals.assignment to a public package, as well as the AssignmentConfigs container class and ClientState which both appear in the TaskAssignor#assign method. These will be placed in a new (public API) package that mirrors their old internal package, specifically org.apache.kafka.streams.processor.assignment.

TaskAssignor
package org.apache.kafka.streams.processor.assignment;

public interface TaskAssignor {
    /**
     * @return whether the generated assignment requires a followup probing rebalance to satisfy all conditions
     */
    boolean assign(Map<UUID, ClientState> clients,
                   Set<TaskId> allTaskIds,
                   Set<TaskId> statefulTaskIds,
                   AssignmentConfigs configs);
}
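As a rough idea of what a custom assignor plugged in through the new config could look like, the sketch below implements the interface with a naive round-robin strategy. To keep it compilable on its own, TaskId, ClientState, AssignmentConfigs and TaskAssignor are redeclared here as simplified, hypothetical stand-ins. The real classes carry far more state (lags, previous ownership, standbys), which this example ignores.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;

final class RoundRobinAssignorSketch {
    /** Simplified stand-in for the real TaskId (subtopology + partition). */
    static final class TaskId implements Comparable<TaskId> {
        final int subtopology;
        final int partition;
        TaskId(int subtopology, int partition) { this.subtopology = subtopology; this.partition = partition; }
        @Override public int compareTo(TaskId o) {
            int c = Integer.compare(subtopology, o.subtopology);
            return c != 0 ? c : Integer.compare(partition, o.partition);
        }
        @Override public boolean equals(Object o) { return o instanceof TaskId && compareTo((TaskId) o) == 0; }
        @Override public int hashCode() { return 31 * subtopology + partition; }
    }

    /** Simplified stand-in for ClientState: only tracks assigned active tasks. */
    static final class ClientState {
        private final Set<TaskId> activeTasks = new TreeSet<>();
        void assignActive(TaskId task) { activeTasks.add(task); }
        Set<TaskId> activeTasks() { return Collections.unmodifiableSet(activeTasks); }
    }

    /** Empty stand-in; this sketch does not consult any configs. */
    static final class AssignmentConfigs { }

    /** Same shape as the interface proposed above, using the stand-in types. */
    interface TaskAssignor {
        boolean assign(Map<UUID, ClientState> clients, Set<TaskId> allTaskIds,
                       Set<TaskId> statefulTaskIds, AssignmentConfigs configs);
    }

    /** Hands out tasks to clients in turn, ignoring stickiness and state. */
    static final class RoundRobinAssignor implements TaskAssignor {
        @Override
        public boolean assign(Map<UUID, ClientState> clients, Set<TaskId> allTaskIds,
                              Set<TaskId> statefulTaskIds, AssignmentConfigs configs) {
            // Sort clients and tasks so the result is deterministic across runs.
            List<UUID> clientIds = new ArrayList<>(new TreeSet<>(clients.keySet()));
            if (clientIds.isEmpty()) {
                return false;
            }
            int next = 0;
            for (TaskId task : new TreeSet<>(allTaskIds)) {
                clients.get(clientIds.get(next % clientIds.size())).assignActive(task);
                next++;
            }
            // This strategy never places warm-up tasks, so it never asks for a probing rebalance.
            return false;
        }
    }
}
```

A real implementation would likely consult the previous ownership recorded in each ClientState before moving stateful tasks, since that is what determines how much state has to be restored.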

Next we have the AssignmentConfigs, which are (and would remain) just a basic container class, although we will migrate from public fields to standard getters for each of the configs passed into the assignor. Going forward, when a KIP is proposed to introduce a new config intended for the assignor, it should include the appropriate getter(s) in this class as part of the accepted proposal.

AssignmentConfigs
package org.apache.kafka.streams.processor.assignment;

public class AssignmentConfigs {
    public long acceptableRecoveryLag();
    public int maxWarmupReplicas();
    public int numStandbyReplicas();
    public long probingRebalanceIntervalMs();
    public List<String> rackAwareAssignmentTags();
}
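The move from public fields to getters could look roughly like the following immutable container. This is only a sketch: the class name is changed to avoid implying it is the final code, and the constructor signature is an assumption, since the proposal specifies only the getters.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class AssignmentConfigsSketch {
    private final long acceptableRecoveryLag;
    private final int maxWarmupReplicas;
    private final int numStandbyReplicas;
    private final long probingRebalanceIntervalMs;
    private final List<String> rackAwareAssignmentTags;

    // Constructor shape is hypothetical; the KIP only defines the getters below.
    AssignmentConfigsSketch(long acceptableRecoveryLag,
                            int maxWarmupReplicas,
                            int numStandbyReplicas,
                            long probingRebalanceIntervalMs,
                            List<String> rackAwareAssignmentTags) {
        this.acceptableRecoveryLag = acceptableRecoveryLag;
        this.maxWarmupReplicas = maxWarmupReplicas;
        this.numStandbyReplicas = numStandbyReplicas;
        this.probingRebalanceIntervalMs = probingRebalanceIntervalMs;
        // Defensive copy so callers cannot mutate the configs after construction.
        this.rackAwareAssignmentTags =
            Collections.unmodifiableList(new ArrayList<>(rackAwareAssignmentTags));
    }

    public long acceptableRecoveryLag() { return acceptableRecoveryLag; }
    public int maxWarmupReplicas() { return maxWarmupReplicas; }
    public int numStandbyReplicas() { return numStandbyReplicas; }
    public long probingRebalanceIntervalMs() { return probingRebalanceIntervalMs; }
    public List<String> rackAwareAssignmentTags() { return rackAwareAssignmentTags; }
}
```

Hiding the fields behind getters means a future KIP can add a config by adding one method, without exposing mutable state to custom assignors.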

And finally, the existing ClientState class will be moved as-is to the new public package:

ClientState
package org.apache.kafka.streams.processor.assignment;

public class ClientState {
    // no changes, see link above for details on what is in this class
}

Proposed Changes

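There are no actual changes to functionality. This KIP simply moves an internal config to the public part of StreamsConfig and moves the TaskAssignor interface, along with the AssignmentConfigs and ClientState classes it depends on, from an internal package to a public one.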

Compatibility, Deprecation, and Migration Plan

Since this was formerly an internal config and not part of the public API, we don't need to go through the usual deprecation path. See "Rejected Alternatives" for some discussion here.

Test Plan

There is mostly nothing to report here, as there should already be tests in place for this config. However, I will check the existing test coverage during implementation and fill in any gaps as needed, to make sure it's possible to set either of the out-of-the-box assignors (HighAvailabilityTaskAssignor or StickyTaskAssignor) as well as a custom assignor.

Rejected Alternatives

  1. I think the only real open question here is whether we still want to deprecate the old internal config anyway, out of compassion for anyone who may already be using it despite it not being considered public. Personally I think this would be reasonable, but I don't feel strongly one way or the other.
  2. One more potential alternative would be to introduce a second public config for the standby task assignment. However, there doesn't seem to be strong motivation for this at this point in time, so I would save it for a followup KIP if/when it becomes necessary.



