...

Code Block
languagejava
/**
 * Prefix used to add arbitrary tags to a Kafka Streams instance as key-value pairs.
 * Example:
 * client.tag.zone=zone1
 * client.tag.cluster=cluster1
 */
@SuppressWarnings("WeakerAccess")
public static final String CLIENT_TAG_PREFIX = "client.tag.";

We will also add a new configuration option in StreamsConfig, which will be the means of setting which tags Kafka Streams must take into account when balancing the standby tasks across the racks.

Code Block
languagejava
public static final String STANDBY_REPLICA_AWARENESS_CONFIG = "standby.replicas.awareness";
public static final String STANDBY_REPLICA_AWARENESS_DOC = "List of client tag keys used to distribute standby replicas across Kafka Streams instances." +
                                                           " Tag keys must be set in an order of precedence." +
                                                           " When configured, Kafka Streams will make a best effort to distribute" +
                                                           " the standby tasks over each client tag dimension.";

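For illustration, here is a minimal sketch of how the proposed configuration might be set on a single Kafka Streams instance; the application id, bootstrap servers, and tag values below are placeholders, not part of this KIP.

Code Block
languagejava
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");       // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
// Arbitrary key-value tags describing where this instance runs (CLIENT_TAG_PREFIX).
props.put("client.tag.cluster", "K8s_Cluster1");
props.put("client.tag.zone", "eu-central-1a");
// Tag keys, in order of precedence, considered when distributing standby tasks.
props.put("standby.replicas.awareness", "zone,cluster");
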
...

With the new configuration options presented in this KIP, we will have the following:

Info
iconfalse

Node-1:
client.tag.cluster: K8s_Cluster1
client.tag.zone: eu-central-1a
standby.replicas.awareness: zone,cluster
num.standby.replicas: 2

Node-2:
client.tag.cluster: K8s_Cluster1
client.tag.zone: eu-central-1b
standby.replicas.awareness: zone,cluster
num.standby.replicas: 2

Node-3:
client.tag.cluster: K8s_Cluster1
client.tag.zone: eu-central-1c
standby.replicas.awareness: zone,cluster
num.standby.replicas: 2

Node-4:
client.tag.cluster: K8s_Cluster2
client.tag.zone: eu-central-1a
standby.replicas.awareness: zone,cluster
num.standby.replicas: 2

Node-5:
client.tag.cluster: K8s_Cluster2
client.tag.zone: eu-central-1b
standby.replicas.awareness: zone,cluster
num.standby.replicas: 2

Node-6:
client.tag.cluster: K8s_Cluster2
client.tag.zone: eu-central-1c
standby.replicas.awareness: zone,cluster
num.standby.replicas: 2

Node-7:
client.tag.cluster: K8s_Cluster3
client.tag.zone: eu-central-1a
standby.replicas.awareness: zone,cluster
num.standby.replicas: 2

Node-8:
client.tag.cluster: K8s_Cluster3
client.tag.zone: eu-central-1b
standby.replicas.awareness: zone,cluster
num.standby.replicas: 2

Node-9:
client.tag.cluster: K8s_Cluster3
client.tag.zone: eu-central-1c
standby.replicas.awareness: zone,cluster
num.standby.replicas: 2

...

With the infrastructure topology and configuration presented above, we can easily achieve Absolute Preferred standby task distribution. Absolute Preferred standby task distribution is achievable because we have to allocate three tasks for any given stateful task (1 active task + 2 standby tasks), and this matches the number of unique values for each tag (three clusters and three zones). So the formula for determining if Absolute Preferred standby task allocation is achievable can be something like this:

Info
iconfalse

isAbsolutePreferredDistribution = num.standby.replicas <= min(F(all client tags)) - 1 // - 1 is for active task

where F is a function for getting the distinct count of tag values grouped by tag key. Example:

F(cluster:[K8s_Cluster1, K8s_Cluster2, K8s_Cluster3], zone:[eu-central-1a, eu-central-1b, eu-central-1c]) will return 3

F(cluster:[K8s_Cluster1, K8s_Cluster2], zone:[eu-central-1a, eu-central-1b, eu-central-1c]) will return 2

F(cluster:[K8s_Cluster1, K8s_Cluster2], zone:[eu-central-1a]) will return 1

1. Formula for determining if Absolute Preferred distribution is possible
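
To make formula [1] concrete, below is a minimal sketch of how the check could be evaluated, assuming the assignor has already grouped every client tag key to the set of distinct values observed across all Kafka Streams instances. The class and parameter names (StandbyDistributionChecks, allClientTags, numStandbyReplicas) are illustrative only and not part of the proposed public API.

Code Block
languagejava
import java.util.Map;
import java.util.Set;

public final class StandbyDistributionChecks {

    // F(all client tags) yields the distinct count of tag values per tag key, e.g.
    // {cluster=[K8s_Cluster1, K8s_Cluster2, K8s_Cluster3], zone=[eu-central-1a, eu-central-1b, eu-central-1c]}
    // gives per-key counts {cluster=3, zone=3}.
    // Absolute Preferred distribution is possible only if even the most constrained tag dimension
    // (the minimum distinct count) can host the active task plus every standby replica.
    public static boolean isAbsolutePreferredDistribution(final Map<String, Set<String>> allClientTags,
                                                          final int numStandbyReplicas) {
        final int minDistinctTagValues = allClientTags.values().stream()
            .mapToInt(Set::size)
            .min()
            .orElse(0);
        return numStandbyReplicas <= minDistinctTagValues - 1; // - 1 is for the active task
    }
}

For the nine-node topology above, both cluster and zone have three distinct values, so min(F) = 3 and two standby replicas satisfy 2 <= 3 - 1.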

Assuming active stateful task 0_0 is in Node-1, Absolute Preferred standby task distribution might look like this:

  1. Node-5 (different cluster, different zone), Node-9 (different cluster, different zone)
  2. Node-6 (different cluster, different zone), Node-8 (different cluster, different zone)

Partially Preferred Standby Task Distribution

Suppose we have the following infrastructure setup: two Kubernetes clusters, let us call them K8s_Cluster1 and K8s_Cluster2, each spanning three availability zones: eu-central-1a, eu-central-1b, eu-central-1c.

Our use-case is similar to the previous section - to have a distribution of the standby tasks across different Kubernetes clusters and AZs so we can be Kubernetes cluster and AZ failure tolerant.

With the new configuration options presented in this KIP, we will have the following:

Info
iconfalse

Node-1:
client.tag.cluster: K8s_Cluster1
client.tag.zone: eu-central-1a
standby.replicas.awareness: zone,cluster
num.standby.replicas: 2

Node-2:
client.tag.cluster: K8s_Cluster1
client.tag.zone: eu-central-1b
standby.replicas.awareness: zone,cluster
num.standby.replicas: 2

Node-3:
client.tag.cluster: K8s_Cluster1
client.tag.zone: eu-central-1c
standby.replicas.awareness: zone,cluster
num.standby.replicas: 2

Node-4:
client.tag.cluster: K8s_Cluster2
client.tag.zone: eu-central-1a
standby.replicas.awareness: zone,cluster
num.standby.replicas: 2

Node-5:
client.tag.cluster: K8s_Cluster2
client.tag.zone: eu-central-1b
standby.replicas.awareness: zone,cluster
num.standby.replicas: 2

Node-6:
client.tag.cluster: K8s_Cluster2
client.tag.zone: eu-central-1c
standby.replicas.awareness: zone,cluster
num.standby.replicas: 2


With the infrastructure topology presented above, we can't achieve Absolute Preferred standby task distribution because we only have two unique cluster tags in the topology. The Absolute Preferred distribution could have been achieved with a third Kubernetes cluster (K8s_Cluster3) spanned across three AZs (as seen in the previous section).

Even though we can't achieve Absolute Preferred standby task distribution with the configuration presented above, we can still achieve Partially Preferred distribution.

Partially Preferred distribution can be achieved by distributing standby tasks over different zone tags. Zone has higher precedence than cluster in the standby.replicas.awareness configuration; therefore, Kafka Streams will prefer to distribute standby tasks over different zones rather than different clusters when the Absolute Preferred distribution check formula [1] returns false.


Kafka Streams will be eligible to perform Partially Preferred standby task distribution when the number of unique values of at least one client tag is >= num.standby.replicas. So the formula for determining if Partially Preferred standby task allocation is doable will look like this:

Info
iconfalse

isPartiallyPreferredDistribution = num.standby.replicas <= max(F(all client tags)) - 1 // - 1 is for active task

where F is a function for getting the distinct count of tag values grouped by tag key. Example:


F(cluster:[K8s_Cluster1, K8s_Cluster2, K8s_Cluster3], zone:[eu-central-1a, eu-central-1b, eu-central-1c]) will return 3

F(cluster:[K8s_Cluster1, K8s_Cluster2], zone:[eu-central-1a, eu-central-1b, eu-central-1c]) will return 2

F(cluster:[K8s_Cluster1, K8s_Cluster2], zone:[eu-central-1a]) will return 1

2. Formula for determining if Partially Preferred distribution is possible
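
Under the same assumptions as the sketch for formula [1], the Partially Preferred check differs only in taking the maximum distinct count over the tag dimensions instead of the minimum; the method below is meant as a companion to the illustrative StandbyDistributionChecks sketch above, not as the proposed implementation.

Code Block
languagejava
// Partially Preferred distribution is possible if at least one tag dimension
// (the maximum distinct count) can host the active task plus every standby replica.
public static boolean isPartiallyPreferredDistribution(final Map<String, Set<String>> allClientTags,
                                                       final int numStandbyReplicas) {
    final int maxDistinctTagValues = allClientTags.values().stream()
        .mapToInt(Set::size)
        .max()
        .orElse(0);
    return numStandbyReplicas <= maxDistinctTagValues - 1; // - 1 is for the active task
}

For the two-cluster topology above, F returns 2 for cluster and 3 for zone, so max(F) = 3 and 2 <= 3 - 1 holds, while the Absolute Preferred check fails because min(F) = 2.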

...

  1. Node-5 (different cluster, different zone), LL([Node-3, Node-6])
  2. Node-6 (different cluster, different zone), LL([Node-2, Node-5])

...

With the new configuration options presented in this KIP, we will have the following:

Info
iconfalse

Node-1:
client.tag.cluster: K8s_Cluster1
client.tag.zone: eu-central-1a
standby.replicas.awareness: zone,cluster
num.standby.replicas: 2

Node-2:
client.tag.cluster: K8s_Cluster1
client.tag.zone: eu-central-1b
standby.replicas.awareness: zone,cluster
num.standby.replicas: 2

Node-3:
client.tag.cluster: K8s_Cluster2
client.tag.zone: eu-central-1a
standby.replicas.awareness: zone,cluster
num.standby.replicas: 2

Node-4:
client.tag.cluster: K8s_Cluster2
client.tag.zone: eu-central-1b
standby.replicas.awareness: zone,cluster
num.standby.replicas: 2

With the setup presented above, we can't distribute the second standby task in a different zone as requested by the standby.replicas.awareness configuration, because there are only two distinct zones available (and one will be reserved for the active task). In this case, Kafka Streams will default to using the Least Loaded client to allocate the remaining standby task.
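
As a worked example with the illustrative checks sketched earlier (not part of the proposed API), this four-node topology has only two distinct values per tag, so both checks return false and the assignor has to fall back to the least loaded client for the second standby task.

Code Block
languagejava
// Tags observed across Node-1..Node-4: two clusters and two zones.
final Map<String, Set<String>> allClientTags = Map.of(
    "cluster", Set.of("K8s_Cluster1", "K8s_Cluster2"),
    "zone", Set.of("eu-central-1a", "eu-central-1b"));

// num.standby.replicas = 2, but max(F) = 2 and 2 <= 2 - 1 is false.
StandbyDistributionChecks.isAbsolutePreferredDistribution(allClientTags, 2);  // false
StandbyDistributionChecks.isPartiallyPreferredDistribution(allClientTags, 2); // false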

...