...

Info

isAbsolutePreferredDistribution = num.standby.replicas <= min(F(all client tags)) - 1 // - 1 is for active task

where F is a function that returns the distinct count of tag values grouped by tag key. Examples:


F(cluster:[K8s_Cluster1, K8s_Cluster2, K8s_Cluster3], zone:[eu-central-1a, eu-central-1b, eu-central-1c]) will return [cluster: 3, zone: 3]


F(cluster:[K8s_Cluster1, K8s_Cluster2], zone:[eu-central-1a, eu-central-1b, eu-central-1c]) will return [cluster: 2, zone: 3]


F(cluster:[K8s_Cluster1, K8s_Cluster2], zone:[eu-central-1a]) will return [cluster: 2, zone: 1]

1. Formula for determining if Absolute Preferred distribution is possible
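
To make the formula above concrete, here is a minimal Java sketch of F and the Absolute Preferred check, assuming the client tags are available as a map from tag key to the set of distinct values seen across all clients. The class and method names are illustrative only and are not part of any proposed public API.

Code Block
languagejava
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public final class AbsolutePreferredCheck {

    // F: distinct count of tag values grouped by tag key, e.g.
    // {cluster=[K8s_Cluster1, K8s_Cluster2], zone=[eu-central-1a]} -> {cluster=2, zone=1}
    static Map<String, Integer> distinctTagValueCounts(final Map<String, Set<String>> allClientTags) {
        final Map<String, Integer> counts = new HashMap<>();
        allClientTags.forEach((tagKey, tagValues) -> counts.put(tagKey, tagValues.size()));
        return counts;
    }

    // Absolute Preferred distribution is possible only if every tag dimension has
    // enough distinct values for all standby tasks plus the active task.
    static boolean isAbsolutePreferredDistribution(final int numStandbyReplicas,
                                                   final Map<String, Set<String>> allClientTags) {
        final int minDistinctValues = distinctTagValueCounts(allClientTags).values().stream()
                .mapToInt(Integer::intValue)
                .min()
                .orElse(0);
        return numStandbyReplicas <= minDistinctValues - 1; // - 1 is for the active task
    }
}

For the first example above, F yields [cluster: 3, zone: 3], so min(F) - 1 = 2 and the check passes for any num.standby.replicas <= 2.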

...

Kafka Streams will be eligible to perform Partially Preferred standby task distribution when at least one of the instance tags has enough unique values to accommodate num.standby.replicas standby tasks in addition to the active task. So the formula for determining whether Partially Preferred standby task allocation is possible looks like this:

Info

isPartiallyPreferredDistribution = num.standby.replicas <= max(F(all client tags)) - 1 // - 1 is for active task

where F is a function that returns the distinct count of tag values grouped by tag key. Examples:


F(cluster:[K8s_Cluster1, K8s_Cluster2, K8s_Cluster3], zone:[eu-central-1a, eu-central-1b, eu-central-1c]) will return [cluster: 3, zone: 3]

F(cluster:[K8s_Cluster1, K8s_Cluster2], zone:[eu-central-1a, eu-central-1b, eu-central-1c]) will return [cluster: 2, zone: 3]

F(cluster:[K8s_Cluster1, K8s_Cluster2], zone:[eu-central-1a]) will return [cluster: 2, zone: 1]

2. Formula for determining if Partially Preferred distribution is possible
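
The Partially Preferred check differs from the Absolute Preferred one only in aggregating F with max instead of min. A minimal sketch, following the same illustrative conventions as the previous snippet:

Code Block
languagejava
import java.util.Map;
import java.util.Set;

public final class PartiallyPreferredCheck {

    // Partially Preferred distribution is possible if at least one tag dimension has
    // enough distinct values for all standby tasks plus the active task.
    static boolean isPartiallyPreferredDistribution(final int numStandbyReplicas,
                                                    final Map<String, Set<String>> allClientTags) {
        final int maxDistinctValues = allClientTags.values().stream()
                .mapToInt(Set::size)
                .max()
                .orElse(0);
        return numStandbyReplicas <= maxDistinctValues - 1; // - 1 is for the active task
    }
}

For the last example above (cluster: 2, zone: 1), max(F) - 1 = 1, so a single standby can still be placed in a different cluster, but two standbys cannot both get a preferred placement.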

...

  1. Node-5 (different cluster, different zone), LL([Node-3, Node-6]; different zones but in the same cluster as the previous standby or active task)
  2. Node-6 (different cluster, different zone), LL([Node-2, Node-5]; different zones but in the same cluster as the previous standby or active task)

Where LL is a function determining the least-loaded client based on active + standby task assignment.

As previously mentioned, in both cases Kafka Streams will prefer to distribute standby tasks over different zones, since zone has higher precedence than cluster in the standby.replicas.awareness configuration. In the case of scenario 1, both Node-3 and Node-6 are in a different zone (eu-central-1c) than Node-1 (eu-central-1a) and Node-5 (eu-central-1b). As a result, the overall task distribution will span three availability zones: the active task on Node-1 (eu-central-1a) and standby tasks on Node-5 (eu-central-1b) and on Node-3 (eu-central-1c) OR Node-6 (eu-central-1c).
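
For illustration, here is a minimal sketch of what an LL helper could look like. ClientLoad is a hypothetical stand-in for the per-client bookkeeping Kafka Streams maintains during assignment; it is not an actual public class.

Code Block
languagejava
import java.util.Collection;
import java.util.Comparator;
import java.util.Optional;

public final class LeastLoaded {

    // Hypothetical view of a client's current assignment, used only for this sketch.
    interface ClientLoad {
        String clientId();
        int activeTaskCount();
        int standbyTaskCount();
    }

    // LL: choose the candidate with the fewest assigned tasks (active + standby).
    static Optional<ClientLoad> leastLoaded(final Collection<ClientLoad> candidates) {
        return candidates.stream()
                .min(Comparator.comparingInt((ClientLoad c) -> c.activeTaskCount() + c.standbyTaskCount()));
    }
}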


The Least Preferred Standby Task Distribution

...

With the setup presented above, we can't distribute the second standby task in a different zone, as requested by the standby.replicas.awareness configuration, because there are only two distinct zones available (and one is reserved for the active task). In this case, Kafka Streams will default to using the least-loaded client to allocate the remaining standby task.
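
Below is a sketch of that fallback under the simplifying assumption that only the zone tag matters. ClientInfo and chooseStandbyClient are hypothetical names, and the real assignor walks all configured tag dimensions in their precedence order rather than looking at the zone alone.

Code Block
languagejava
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

public final class StandbyFallback {

    // Hypothetical per-client view, used only for this sketch.
    interface ClientInfo {
        String clientId();
        String zone();        // value of the client's "zone" tag
        int totalTaskCount(); // active + standby tasks
    }

    // Prefer clients whose zone is not yet used by the active task or already-assigned
    // standbys; if no such client exists (as in the setup above), fall back to the
    // least-loaded client among all candidates.
    static Optional<ClientInfo> chooseStandbyClient(final Collection<ClientInfo> candidates,
                                                    final Set<String> usedZones) {
        final List<ClientInfo> inDifferentZone = candidates.stream()
                .filter(client -> !usedZones.contains(client.zone()))
                .collect(Collectors.toList());
        final Collection<ClientInfo> pool = inDifferentZone.isEmpty() ? candidates : inDifferentZone;
        return pool.stream().min(Comparator.comparingInt(ClientInfo::totalTaskCount));
    }
}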

...

  • The initial idea was to introduce two configurations in StreamsConfig: rack.id, which defines the rack of the Kafka Streams instance, and standby.task.assignor, a class that implements the RackAwareStandbyTaskAssignor interface.

    The signature of RackAwareStandbyTaskAssignor was the following:

    Code Block
    languagejava
    import java.util.Map;
    import java.util.Set;

    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.processor.TaskId;

    public interface RackAwareStandbyTaskAssignor {
    
        /**
         * Computes the desired standby task distribution for different {@link StreamsConfig#RACK_ID_CONFIG}s.
         * @param sourceTasks - Source {@link TaskId}s with their corresponding rack IDs that are eligible for standby task creation.
         * @param clientRackIds - Client rack IDs that were received during assignment.
         * @return - Map of rack IDs to the set of {@link TaskId}s. The return value can be used by a {@link TaskAssignor}
         *           implementation to decide whether a {@link TaskId} can be assigned to a client located in a given rack.
         */
        Map<String, Set<TaskId>> computeStandbyTaskDistribution(final Map<TaskId, String> sourceTasks,
                                                                final Set<String> clientRackIds);
    }
    
    

    By injecting a custom implementation of the RackAwareStandbyTaskAssignor interface, users could hint to Kafka Streams where to allocate certain standby tasks when more complex processing logic was required, for example, parsing rack.id, which can be a combination of multiple identifiers (as seen in the previous examples where we have cluster and zone tags). A hypothetical sketch of such an implementation is shown after this list.

    The above-mentioned idea was abandoned because it is easier and more user-friendly to let users control standby task allocation through configuration options alone instead of forcing them to implement a custom interface.

  • The second approach was to refactor the TaskAssignor interface to be more user-friendly and expose it as a public interface. Users could then implement custom TaskAssignor logic and set it via StreamsConfig. With this, Kafka Streams users would effectively be in control of active and standby task allocation.
    Similarly to the point above, this approach was also rejected because it is more complex.
    Even though there is more-or-less agreement on the usefulness of a pluggable TaskAssignor interface, it was decided to cut it from this KIP's scope and prepare a separate KIP for that feature.
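
To make the first rejected alternative concrete, below is a hypothetical implementation of the RackAwareStandbyTaskAssignor interface shown earlier. The class name, the assumed rack.id format of "<cluster>-<zone>" (e.g. K8s_Cluster1-eu-central-1a), and the eligibility rule are illustrative assumptions rather than part of the abandoned proposal: the sketch simply marks a task as assignable to a rack whenever that rack's zone differs from the zone of the rack hosting the task's active copy.

Code Block
languagejava
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.streams.processor.TaskId;

// Hypothetical example only: parses a composite rack.id of the form "<cluster>-<zone>"
// and allows a standby for a task on any rack whose zone differs from the source rack's zone.
public class ZoneAwareStandbyTaskAssignor implements RackAwareStandbyTaskAssignor {

    // assumes the cluster identifier contains no '-', so the zone is everything after the first '-'
    private static String zoneOf(final String rackId) {
        return rackId.substring(rackId.indexOf('-') + 1);
    }

    @Override
    public Map<String, Set<TaskId>> computeStandbyTaskDistribution(final Map<TaskId, String> sourceTasks,
                                                                   final Set<String> clientRackIds) {
        final Map<String, Set<TaskId>> distribution = new HashMap<>();
        for (final String clientRackId : clientRackIds) {
            final Set<TaskId> eligibleTasks = new HashSet<>();
            for (final Map.Entry<TaskId, String> sourceTask : sourceTasks.entrySet()) {
                // a standby may be placed on this rack only if it sits in a different
                // zone than the rack hosting the task's active copy
                if (!zoneOf(sourceTask.getValue()).equals(zoneOf(clientRackId))) {
                    eligibleTasks.add(sourceTask.getKey());
                }
            }
            distribution.put(clientRackId, eligibleTasks);
        }
        return distribution;
    }
}

Even this trivial rule requires parsing conventions and extra bookkeeping on the user's side, which illustrates why configuration-only control was preferred over a pluggable interface.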