Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

When client.tag.* dimensions are configured, Kafka Streams will read this information from the configuration and encode it into SubscriptionInfoData into SubscriptionInfoData as key-value pairs. SubscriptionInfoData will be bumped to version 10


Code Block
{SubscriptionInfoData 
=> Version "name": "SubscriptionInfoData",
  // version bump
  "validVersions": "1-10",
  "fields": [
    ...
    {
LatestSupportedVersion ProcessId PrevTasks StandbyTasks UserEndPoint TaskOffsetSums UniqueField ErrorCode ClientTags
 
   Version              "name": "clientTags",
  => Int32
   "versions": "10+",
  LatestSupportedVersion  => Int32
   ProcessId "type": "[]ClientTag"
    }
  ],
  "commonStructs": [
    {=> UUID
   PrevTasks   "name": "ClientTag",
      "versions": "1+",
    => List<TaskId>
 "fields": [
 StandbyTasks       {
     => List<TaskId>
   UserEndPoint "name": "key",
          "versions": "1+",=> Bytes
   TaskOffsetSums       "type": "bytes"
     => List<TaskOffsetSum>
   UniqueField      },
       => {Int8
   ErrorCode        "name": "value",
      => Int32
   "versions": "1+",
ClientTags           "type": "bytes"
  => List<ClientTag> // new change

Where is the struct with the following signature

Code Block
ClientTag => }Key Value
 
    Key ]
   => },Bytes
   Value ...
 => ]
}Bytes


Kafka Streams's Task Assignor will make a decision on how to distribute standby tasks over the available clients based on encoded clientTags within the subscription info and configured task.assignment.rack.awareness

Info
Standby task distribution algorithm is not specified in this KIP, but is left as an implementation detail. However, every distribution algorithm must handle gracefully when ideal standby task distribution is not possible; In that case, Kafka Streams must not fail the assignment but try to find the subsequent most optimal distribution. The ideal distribution means there is no repeated client dimension amongst clients assigned to the active task and all standby tasks.

Benefits of tags vs single rack.id configuration

Defining multiple client.tag with combination of task.assignment.rack.awareness gives more flexibility, which otherwise could have been only possible with pluggable custom logic Kafka Streams's user must provide (it is briefly described in "Rejected Alternatives" section).

For instance, if we append multiple tags to form a single rack, it may not give desired distribution to the user if the infrastructure topology is more complex. Let us consider the following example with appending multiple tags to form the single rack.

Code Block
Node-1:
rack.id: K8s_Cluster1-eu-central-1a
num.standby.replicas: 1

Node-2:
rack.id: K8s_Cluster1-eu-central-1b
num.standby.replicas: 1

Node-3:
rack.id: K8s_Cluster1-eu-central-1c
num.standby.replicas: 1

Node-4:
rack.id: K8s_Cluster2-eu-central-1a
num.standby.replicas: 1

Node-5:
rack.id: K8s_Cluster2-eu-central-1b
num.standby.replicas: 1

Node-6:
rack.id: K8s_Cluster2-eu-central-1c
num.standby.replicas: 1

In the example mentioned above, we have three AZs and two Kubernetes clusters. Our use-case is to distribute standby task in the different Kubernetes cluster and different availability zone. For instance, if the active task is in Node-1 (K8s_Cluster1-eu-central-1a), the corresponding standby task should be in either on Node-5 (K8s_Cluster2-eu-central-1b) or on Node-6 (K8s_Cluster2-eu-central-1c).

Unfortunately, without custom logic provided by the user, this would be very hard to achieve with a single rack.id configuration. Because without any input from the user, Kafka Streams might as well allocate standby task for the active task either:

  • In the same Kubernetes cluster and different AZ (Node-2, Node-3)
  • In the different Kubernetes cluster but the same AZ (Node-4)

On the other hand, with the combination of the new "client.tag.*" and "task.assignment.rack.awareness" configurations, standby task distribution algorithm will be able to figure out what will be the most optimal distribution by balancing the standby tasks over each client.tag dimension individually. And it can be achieved by simply providing necessary configurations to Kafka Streams.

Changes in HighAvailabilityTaskAssignor

Implementation of this KIP must not affect HighAvailabilityTaskAssignor in a breaking way, meaning that all the existing behavior should stay unchanged (e.g., when new configurations are not specified). Once required configurations are set, the main change should happen within the code that deals with standby task allocation, specifically:

HighAvailabilityTaskAssignor#assignStandbyReplicaTasks and HighAvailabilityTaskAssignor#assignStandbyTaskMovements

Compatibility, Deprecation, and Migration Plan

The changes proposed by this KIP shouldn't affect previously setup applications. Since we introduce new configuration options, existing ones shouldn't be affected by this change.

Rejected Alternatives

The initial idea was to introduce two configurations in StreamsConfig, rack.id, which defines the rack of the Kafka Streams instance and standby.task.assignor - class that implements RackAwareStandbyTaskAssignor interface. 

The signature of RackAwareStandbyTaskAssignor was the following:

Code Block
languagejava
public interface RackAwareStandbyTaskAssignor {

    /**
     * Computes desired standby task distribution for a different {@link StreamsConfig#RACK_ID_CONFIG}s.
     * @param sourceTasks - Source {@link TaskId}s with a corresponding rack IDs that are eligible for standby task creation.
     * @param clientRackIds - Client rack IDs that were received during assignment.
     * @return - Map of the rack IDs to set of {@link TaskId}s. The return value can be used by {@link TaskAssignor}
     *           implementation to decide if the {@link TaskId} can be assigned to a client that is located in a given rack.
     */
    Map<String, Set<TaskId>> computeStandbyTaskDistribution(final Map<TaskId, String> sourceTasks,
                                                            final Set<String> clientRackIds);
}

By injecting custom implementation of RackAwareStandbyTaskAssignor interface, users could hint Kafka Streams where to allocate certain standby tasks when more complex processing logic was required — for example, parsing rack.id, which can be a combination of multiple identifiers (as seen in the previous examples where we have cluster and zone tags).

Changes in HighAvailabilityTaskAssignor

Implementation of this KIP must not affect HighAvailabilityTaskAssignor in a breaking way, meaning that all the existing behavior should stay unchanged (e.g., when new configurations are not specified). Once required configurations are set, the main change should happen within the code that deals with standby task allocation.

Compatibility, Deprecation, and Migration Plan

The changes proposed by this KIP shouldn't affect previously setup applications. Since we introduce new configuration options, existing ones shouldn't be affected by this change.

Rejected Alternatives

  • The initial idea was to introduce two configurations in StreamsConfig, rack.id, which defines the rack of the Kafka Streams instance and standby.task.assignor - class that implements RackAwareStandbyTaskAssignor interface. 

    The signature of RackAwareStandbyTaskAssignor was the following:

    Code Block
    languagejava
    public interface RackAwareStandbyTaskAssignor {
    
        /**
         * Computes desired standby task distribution for a different {@link StreamsConfig#RACK_ID_CONFIG}s.
         * @param sourceTasks - Source {@link TaskId}s with a corresponding rack IDs that are eligible for standby task creation.
         * @param clientRackIds - Client rack IDs that were received during assignment.
         * @return - Map of the rack IDs to set of {@link TaskId}s. The return value can be used by {@link TaskAssignor}
         *           implementation to decide if the {@link TaskId} can be assigned to a client that is located in a given rack.
         */
        Map<String, Set<TaskId>> computeStandbyTaskDistribution(final Map<TaskId, String> sourceTasks,
                                                                final Set<String> clientRackIds);
    }
    

    By injecting custom implementation of RackAwareStandbyTaskAssignor interface, users could hint Kafka Streams where to allocate certain standby tasks when more complex processing logic was required — for example, parsing rack.id, which can be a combination of multiple identifiers (as seen in the previous examples where we have cluster and zone tags).

    The above mentioned idea was abandoned because it's easier and more user-friendly to let users control standby task allocation with just configuration options instead of forcing them to implement a custom interface. 

  • The second approach was to refactor TaskAssignor interface to be more user-friendly and expose it as a public interface. Users then could implement custom TaskAssignor logic and set it via StreamsConfig. With this, Kafka Streams users would effectively be in control of Active and Standby task allocation.
    Similarly to the point above, this approach also was rejected because it's more complex.
    Even though it's more-or-less agreed on the pluggable TaskAssignor interface's usefulness, it was decided to cut it out of this KIP's scope and prepare a separate one for that feature.

Client tags vs single rack.id configuration

Defining multiple client.tag with combination of task.assignment.rack.awareness gives more flexibility, which otherwise could have been only possible with pluggable custom logic Kafka Streams's user must provide (it is briefly described in "Rejected Alternatives" section).

For instance, if we append multiple tags to form a single rack, it may not give desired distribution to the user if the infrastructure topology is more complex. Let us consider the following example with appending multiple tags to form the single rack.


Code Block
Node-1:
rack.id: K8s_Cluster1-eu-central-1a
num.standby.replicas: 1

Node-2:
rack.id: K8s_Cluster1-eu-central-1b
num.standby.replicas: 1

Node-3:
rack.id: K8s_Cluster1-eu-central-1c
num.standby.replicas: 1

Node-4:
rack.id: K8s_Cluster2-eu-central-1a
num.standby.replicas: 1

Node-5:
rack.id: K8s_Cluster2-eu-central-1b
num.standby.replicas: 1

Node-6:
rack.id: K8s_Cluster2-eu-central-1c
num.standby.replicas: 1


In the example mentioned above, we have three AZs and two Kubernetes clusters. Our use-case is to distribute standby task in the different Kubernetes cluster and different availability zone. For instance, if the active task is in Node-1 (K8s_Cluster1-eu-central-1a), the corresponding standby task should be in either on Node-5 (K8s_Cluster2-eu-central-1b) or on Node-6 (K8s_Cluster2-eu-central-1c).

Unfortunately, without custom logic provided by the user, this would be very hard to achieve with a single rack.id configuration. Because without any input from the user, Kafka Streams might as well allocate standby task for the active task either:

  • In the same Kubernetes cluster and different AZ (Node-2, Node-3)
  • In the different Kubernetes cluster but the same AZ (Node-4)

On the other hand, with the combination of the new "client.tag.*" and "task.assignment.rack.awareness" configurations, standby task distribution algorithm will be able to figure out what will be the most optimal distribution by balancing the standby tasks over each client.tag dimension individually. And it can be achieved by simply providing necessary configurations to Kafka Streams

...

.