Status

Current state: Under Discussion

Discussion thread: here

JIRA: KAFKA-10686, KAFKA-6718

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Similar to Kafka brokers and many other distributed systems, Kafka Streams instances can be grouped in different racks. When a standby task is placed in a different rack than its corresponding active task, it provides fault tolerance and faster recovery if the active task's rack goes down.

As of now, there is no way to ensure that standby tasks are distributed across different Kafka Streams racks.

Moreover, it would be desirable to allow custom logic for the standby task distribution when rack identification is more complex (e.g., the rack ID is a concatenation of cluster and availability zone, in AWS terms) and additional parsing and processing is needed before deciding on the desired standby task distribution over different racks.

This KIP aims to give Kafka Streams users the ability to assign a rack ID to each Kafka Streams instance and to use that information to assign standby tasks to different racks (implementing custom logic if necessary).

Public Interfaces

This KIP introduces new configuration options in StreamsConfig, changes to SubscriptionInfoData, and a new public interface for standby task assignment over different racks.

Changes in SubscriptionInfoData protocol

The SubscriptionInfoData protocol gets a version bump and a new field that encodes the rack ID of the Kafka Streams instance. The encoded rack ID is later used by the RackAwareStandbyTaskAssignor implementation (which is called by the TaskAssignor implementation) to distribute standby tasks over different racks.

Code Block
{
  "name": "SubscriptionInfoData",
  "validVersions": "1-10", // NEW: version bump to 10
  "fields": [
    ...
    {
      "name": "rackId", // NEW
      "versions": "10+",
      "type": "bytes"
    }
  ],
  ...
}
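The new field is typed as bytes, so the rack ID string has to be encoded before it is sent. A minimal sketch of such a round trip follows. The RackIdCodec class is hypothetical, and both the UTF-8 encoding and the "empty array means no rack ID" convention are assumptions that the KIP does not state.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Kafka Streams.
final class RackIdCodec {

    // Assumption: rack IDs are encoded as UTF-8; an unset rack ID becomes an empty array.
    static byte[] encode(final String rackId) {
        return rackId == null ? new byte[0] : rackId.getBytes(StandardCharsets.UTF_8);
    }

    // Assumption: an empty or missing payload means the instance has no rack ID.
    static String decode(final byte[] bytes) {
        return bytes == null || bytes.length == 0 ? null : new String(bytes, StandardCharsets.UTF_8);
    }
}
```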

Changes in StreamsConfig

To give Kafka Streams users the ability to define their desired standby task distribution, we need to add two new configuration options to StreamsConfig.

rack.id - configuration that assigns a rack identifier to a Kafka Streams instance

rack.standby.task.assignor - class that implements the RackAwareStandbyTaskAssignor interface (see below)
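As a rough illustration, the two options could be set like any other Streams property. The sketch below uses plain java.util.Properties so that it does not depend on constants that do not exist yet. The application ID, broker address and assignor class name are placeholders.

```java
import java.util.Properties;

// Hypothetical example class; only the two "rack.*" keys come from this KIP.
final class RackIdConfigExample {

    static final String RACK_ID_CONFIG = "rack.id";
    static final String RACK_STANDBY_TASK_ASSIGNOR_CLASS_CONFIG = "rack.standby.task.assignor";

    static Properties streamsProperties(final String rackId, final String assignorClassName) {
        final Properties props = new Properties();
        props.put("application.id", "rack-aware-app");     // placeholder
        props.put("bootstrap.servers", "localhost:9092");  // placeholder
        props.put("num.standby.replicas", "1");
        props.put(RACK_ID_CONFIG, rackId);
        props.put(RACK_STANDBY_TASK_ASSIGNOR_CLASS_CONFIG, assignorClassName);
        return props;
    }
}
```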

Several distributed systems implement rack awareness. The sections below explore how they do so and what kind of guarantees they aim to provide.

Elasticsearch

Rack awareness in Elasticsearch works by assigning a list of tags/attributes, called awareness attributes, to each node in the cluster. When Elasticsearch knows the nodes' rack specification, it distributes the primary shard and its replica shards to minimize the risk of losing all shard copies in the event of a failure. Besides defining an arbitrary list of tags/attributes for each node, Elasticsearch provides a means of setting which tags/attributes it must consider when balancing the shards across the racks.

Example:

Code Block
languageyml
node.attr.rack_id: rack_one
node.attr.cluster_id: cluster_one
cluster.routing.allocation.awareness.attributes: rack_id,cluster_id

In addition, Elasticsearch provides a "forced awareness" configuration, a safeguard that prevents racks from being overloaded in case of a failure. By default, if one location fails, Elasticsearch assigns all of the missing replica shards to the remaining locations. With limited resources, a single rack might be unable to host all of the shards. The cluster.routing.allocation.awareness.force.* settings can be used to prevent Elasticsearch from allocating replicas until nodes are available in another location.

Example:

Code Block
languageyml
cluster.routing.allocation.awareness.attributes: zone
cluster.routing.allocation.awareness.force.zone.values: zone1,zone2

In the example above, if we start two nodes with node.attr.zone set to zone1 and create an index with five shards and one replica, Elasticsearch creates the index and allocates the five primary shards but no replicas. Replicas are only allocated once nodes with node.attr.zone set to zone2 are available.

Hadoop

In Hadoop, a rack is a physical collection of nodes in the cluster, and rack awareness is a means of both fault tolerance and optimization. The idea is that a read/write operation within one rack is cheaper than one that spans multiple racks. With the rack information, the NameNode chooses the closest DataNode when performing a read/write operation, which reduces network traffic.

A rack can have multiple DataNodes storing file blocks and replicas. A Hadoop cluster with a replication factor of 3 will automatically write a particular file block to 2 different DataNodes in the same rack, plus one in a different rack for redundancy.

Rack awareness in the Hadoop cluster has to comply with the following policies:

  • There should not be more than 1 replica on the same DataNode.
  • More than 2 replicas of a single block are not allowed on the same rack.
  • The number of racks used inside a Hadoop cluster must be smaller than the number of replicas.

Redis

Rack "awareness" in Redis is called "Rack-zone awareness" and it's very similar to Kafka Broker's rack awareness. Rack-zone awareness only works in a clustered Redis deployment, and it's an enterprise feature.

Rack-zone awareness works by assigning a rack-zone ID to each node. This ID is used to map the node to a physical rack or logical zone (an AWS availability zone, for instance). When appropriate IDs are set, the cluster ensures that leader shards, corresponding replica shards, and associated endpoints are placed on nodes in different racks/zones.

In the event of a rack failure, the remaining racks' replicas and endpoints will be promoted. This approach ensures high availability when a rack or zone fails.

Proposed Changes

This KIP proposes to implement semantics in Kafka Streams similar to those in Elasticsearch. Rack awareness semantics in Elasticsearch seem the most flexible and can cover more complex use cases, such as multi-dimensional rack awareness. To achieve this, this KIP proposes to introduce a new config prefix in StreamsConfig that will be used to retrieve the user-defined instance tags of the Kafka Streams instance.

Code Block
languagejava
/**
 * Prefix used to add arbitrary tags to a Kafka Streams instance as key-value pairs.
 * Example:
 * instance.tag.zone=zone1
 * instance.tag.cluster=cluster1
 */
@SuppressWarnings("WeakerAccess")
public static final String INSTANCE_TAG_PREFIX = "instance.tag.";
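To make the prefix concrete, here is a minimal sketch of how instance tags might be collected from the configuration. InstanceTagParser is a hypothetical helper. It only assumes that every property whose name starts with instance.tag. contributes one tag, with the remainder of the name as the tag key.

```java
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical helper, not part of Kafka Streams.
final class InstanceTagParser {

    static final String INSTANCE_TAG_PREFIX = "instance.tag.";

    // Returns tag key -> tag value for every property that starts with the prefix.
    static Map<String, String> instanceTags(final Properties props) {
        final Map<String, String> tags = new TreeMap<>();
        for (final String name : props.stringPropertyNames()) {
            // Skip the bare prefix itself, which would yield an empty tag key.
            if (name.startsWith(INSTANCE_TAG_PREFIX) && name.length() > INSTANCE_TAG_PREFIX.length()) {
                tags.put(name.substring(INSTANCE_TAG_PREFIX.length()), props.getProperty(name));
            }
        }
        return tags;
    }
}
```

With instance.tag.zone=zone1 and instance.tag.cluster=cluster1 set, the method returns a map with the entries cluster=cluster1 and zone=zone1. Properties without the prefix are ignored.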

We will also add a new configuration option in StreamsConfig, which will be the means of setting which tags Kafka Streams must take into account when balancing the standby tasks across the racks.

Code Block
languagejava
public static final String STANDBY_TASK_ASSIGNMENT_AWARENESS_CONFIG = "standby.task.assignment.awareness";
public static final String STANDBY_TASK_ASSIGNMENT_AWARENESS_DOC = "List of instance tag keys used to distribute standby replicas across Kafka Streams instances." +
                                                                   " Tag keys must be set in an order of precedence." +
                                                                   " When configured, Kafka Streams will make a best effort to distribute" +
                                                                   " the standby tasks over each instance tag dimension.";

Code Block
languagejava
    public static final String RACK_ID_CONFIG = "rack.id";
    private static final String RACK_ID_DOC = "An identifier for the rack of an instance of Kafka Streams. " +
                                              "When set, the default implementation of org.apache.kafka.streams.RackAwareStandbyTaskAssignor of the Kafka Streams " +
                                              "will try to distribute standby tasks in different rack compared to corresponding active task.";

    public static final String RACK_STANDBY_TASK_ASSIGNOR_CLASS_CONFIG = "rack.standby.task.assignor";
    public static final String RACK_STANDBY_TASK_ASSIGNOR_CLASS_DOC = "Class that implements RackAwareStandbyTaskAssignor interface, " +
                                                                      "that is used when deciding on which racks standby tasks can be created.";

New RackAwareStandbyTaskAssignor interface

To give users control over which racks standby tasks can be allocated to, we will introduce a new public interface that Kafka Streams users can implement when the default implementation is not enough.

Code Block
languagejava
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams;

import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;

import java.util.Map;
import java.util.Set;

public interface RackAwareStandbyTaskAssignor {

    /**
     * Computes the desired standby task distribution across the different {@link StreamsConfig#RACK_ID_CONFIG}s.
     * @param sourceTasks - Source {@link TaskId}s with their corresponding rack IDs that are eligible for standby task creation.
     * @param clientRackIds - Client rack IDs that were received during assignment.
     * @return - Map of rack IDs to sets of {@link TaskId}s. The return value can be used by a {@link TaskAssignor}
     *           implementation to decide if the {@link TaskId} can be assigned to a client that is located in a given rack.
     */
    Map<String, Set<TaskId>> assignStandbyTaskToRacks(final Map<TaskId, String> sourceTasks,
                                                      final Set<String> clientRackIds);
}

The default implementation of the interface will make sure that standby tasks for sourceTasks are assigned in different racks. Example:

Code Block
languagejava
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams;

import org.apache.kafka.streams.processor.TaskId;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class DefaultRackAwareStandbyTaskAssignor implements RackAwareStandbyTaskAssignor {

    @Override
    public Map<String, Set<TaskId>> assignStandbyTaskToRacks(final Map<TaskId, String> sourceTasks,
                                                             final Set<String> clientRackIds) {
        final Map<String, Set<TaskId>> standbyTaskDistribution = new HashMap<>();
        for (final Map.Entry<TaskId, String> sourceTaskEntry : sourceTasks.entrySet()) {
            final TaskId taskId = sourceTaskEntry.getKey();
            final String taskRackId = sourceTaskEntry.getValue();

            for (final String clientRackId : clientRackIds) {
                // Ensure every rack has an entry without discarding tasks added for it earlier.
                final Set<TaskId> rackTasks = standbyTaskDistribution.computeIfAbsent(clientRackId, rackId -> new HashSet<>());
                if (!taskRackId.equals(clientRackId)) {
                    // A standby may only be placed in a rack other than the active task's rack.
                    rackTasks.add(taskId);
                }
            }
        }

        return Collections.unmodifiableMap(standbyTaskDistribution);
    }
}
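The intended result of the default distribution is easiest to see on a small input. The sketch below is a simplified stand-in, not the class above. It uses plain String task IDs so that it runs without Kafka on the classpath, and it always creates an entry for every rack, which is an assumption about the desired behavior.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Simplified, hypothetical stand-in that uses String task IDs instead of TaskId.
final class StandbyRackDistributionSketch {

    // sourceTasks maps task ID -> rack of the active task; the result maps rack -> eligible standby tasks.
    static Map<String, Set<String>> assignStandbyTaskToRacks(final Map<String, String> sourceTasks,
                                                             final Set<String> clientRackIds) {
        final Map<String, Set<String>> distribution = new TreeMap<>();
        for (final String rackId : clientRackIds) {
            distribution.put(rackId, new TreeSet<>());
        }
        for (final Map.Entry<String, String> task : sourceTasks.entrySet()) {
            for (final String rackId : clientRackIds) {
                // A rack is eligible for a standby only if the active task runs elsewhere.
                if (!rackId.equals(task.getValue())) {
                    distribution.get(rackId).add(task.getKey());
                }
            }
        }
        return distribution;
    }
}
```

For tasks 0_0 (active in rack-a) and 0_1 (active in rack-b) and racks rack-a, rack-b and rack-c, the result is {rack-a=[0_1], rack-b=[0_0], rack-c=[0_0, 0_1]}. Each task is eligible for a standby in every rack except the one hosting its active copy.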

 

Proposed Changes

As described in the Public Interfaces section, the proposal consists of:

...


Example of standby task allocation

Absolute Preferred Standby Task Distribution

Suppose we have the following infrastructure setup: three Kubernetes clusters, let's call them K8s_Cluster1, K8s_Cluster2 and K8s_Cluster3, each spanning three availability zones: eu-central-1a, eu-central-1b, eu-central-1c.

Our use case is to distribute the standby tasks across different Kubernetes clusters as well as AZs, so that we can tolerate both Kubernetes cluster and AZ failures.

With the new configuration options presented in this KIP, we will have the following:

Node-1:
instance.tag.cluster: K8s_Cluster1
instance.tag.zone: eu-central-1a
standby.task.assignment.awareness: zone,cluster
num.standby.replicas: 2

Node-2:
instance.tag.cluster: K8s_Cluster1
instance.tag.zone: eu-central-1b
standby.task.assignment.awareness: zone,cluster
num.standby.replicas: 2

Node-3:
instance.tag.cluster: K8s_Cluster1
instance.tag.zone: eu-central-1c
standby.task.assignment.awareness: zone,cluster
num.standby.replicas: 2

Node-4:
instance.tag.cluster: K8s_Cluster2
instance.tag.zone: eu-central-1a
standby.task.assignment.awareness: zone,cluster
num.standby.replicas: 2

Node-5:
instance.tag.cluster: K8s_Cluster2
instance.tag.zone: eu-central-1b
standby.task.assignment.awareness: zone,cluster
num.standby.replicas: 2

Node-6:
instance.tag.cluster: K8s_Cluster2
instance.tag.zone: eu-central-1c
standby.task.assignment.awareness: zone,cluster
num.standby.replicas: 2

Node-7:
instance.tag.cluster: K8s_Cluster3
instance.tag.zone: eu-central-1a
standby.task.assignment.awareness: zone,cluster
num.standby.replicas: 2

Node-8:
instance.tag.cluster: K8s_Cluster3
instance.tag.zone: eu-central-1b
standby.task.assignment.awareness: zone,cluster
num.standby.replicas: 2

Node-9:
instance.tag.cluster: K8s_Cluster3
instance.tag.zone: eu-central-1c
standby.task.assignment.awareness: zone,cluster
num.standby.replicas: 2


With the infrastructure topology and configuration presented above, we can easily achieve Absolute Preferred standby task distribution, because the total number of tasks that we have to allocate for any given stateful task is three (1 active task + 2 standby tasks) and the number of unique values for each tag is also three. The formula for determining whether Absolute Preferred standby task allocation is achievable can be something like this:

num.standby.replicas <= (allInstanceTags.values().stream().mapToInt(Set::size).min().orElse(0) - 1) // -1 is for active task

Where allInstanceTags is a map from each instance tag key to the set of distinct values seen across all clients, with the signature Map<String, Set<String>>.
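The check can be sketched as a small helper. AbsolutePreferredCheck is a hypothetical class name. The sketch takes the minimum over the set sizes with min(), because a reduce with identity 0 and Math::min would always yield 0 for non-negative sizes.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of Kafka Streams.
final class AbsolutePreferredCheck {

    // allInstanceTags maps a tag key (e.g. "zone") to the distinct values seen across all clients.
    static boolean isAchievable(final int numStandbyReplicas, final Map<String, Set<String>> allInstanceTags) {
        final int minDistinctValues = allInstanceTags.values().stream()
            .mapToInt(Set::size)
            .min()
            .orElse(0); // no tags configured: nothing to be aware of
        return numStandbyReplicas <= minDistinctValues - 1; // -1 is for the active task
    }
}
```

For the nine-node setup above (three clusters, three zones, two standby replicas) the check evaluates 2 <= 3 - 1 and returns true. With only two clusters it evaluates 2 <= 2 - 1 and returns false.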

Partially Preferred Standby Task Distribution

Suppose we have the following infrastructure setup: two Kubernetes clusters, let's call them K8s_Cluster1 and K8s_Cluster2, each spanning three availability zones: eu-central-1a, eu-central-1b, eu-central-1c.

Our use case is to distribute the standby tasks across different Kubernetes clusters as well as AZs, so that we can tolerate both Kubernetes cluster and AZ failures.

With the new configuration options presented in this KIP, we will have the following:

Node-1:
instance.tag.cluster: K8s_Cluster1
instance.tag.zone: eu-central-1a
standby.task.assignment.awareness: zone,cluster
num.standby.replicas: 2

Node-2:
instance.tag.cluster: K8s_Cluster1
instance.tag.zone: eu-central-1b
standby.task.assignment.awareness: zone,cluster
num.standby.replicas: 2

Node-3:
instance.tag.cluster: K8s_Cluster1
instance.tag.zone: eu-central-1c
standby.task.assignment.awareness: zone,cluster
num.standby.replicas: 2

Node-4:
instance.tag.cluster: K8s_Cluster2
instance.tag.zone: eu-central-1a
standby.task.assignment.awareness: zone,cluster
num.standby.replicas: 2

Node-5:
instance.tag.cluster: K8s_Cluster2
instance.tag.zone: eu-central-1b
standby.task.assignment.awareness: zone,cluster
num.standby.replicas: 2

Node-6:
instance.tag.cluster: K8s_Cluster2
instance.tag.zone: eu-central-1c
standby.task.assignment.awareness: zone,cluster
num.standby.replicas: 2


With the infrastructure topology presented above, we can't achieve Absolute Preferred standby task distribution, because we only have two unique cluster tags in the topology. Absolute Preferred distribution could have been achieved if we had a third Kubernetes cluster (K8s_Cluster3) spanning three AZs (as seen in the previous section).

Even though we can't achieve Absolute Preferred standby task distribution, with the configuration presented above we can still achieve Partially Preferred distribution.

Partially Preferred distribution can be achieved by distributing standby tasks over different zone tags. Zone has higher precedence than cluster in the standby.task.assignment.awareness configuration; therefore, when the Absolute Preferred check (num.standby.replicas <= smallest number of unique values across all tags - 1) returns false, Kafka Streams prefers to distribute standby tasks over different zones rather than different clusters. Kafka Streams is eligible to perform Partially Preferred standby task distribution when at least one instance tag has more unique values than num.standby.replicas (one value is reserved for the active task). The formula for determining whether Partially Preferred standby task allocation is doable looks like this:

num.standby.replicas <= (allInstanceTags.values().stream().map(Set::size).reduce(0, Math::max) - 1) // -1 is for active task

Where allInstanceTags is a map from each instance tag key to the set of distinct values seen across all clients, with the signature Map<String, Set<String>>.
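Because tag keys are listed in order of precedence, it can also be useful to know which tag makes the partial distribution possible. The sketch below is one hypothetical way to find it. PartiallyPreferredCheck and firstUsableTag are illustrative names, and the awareness argument stands for the parsed standby.task.assignment.awareness list.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical helper, not part of Kafka Streams.
final class PartiallyPreferredCheck {

    // Returns the highest-precedence tag with enough distinct values for the active task
    // plus all standbys, or empty if no single tag can satisfy the request.
    static Optional<String> firstUsableTag(final int numStandbyReplicas,
                                           final List<String> awareness,
                                           final Map<String, Set<String>> allInstanceTags) {
        return awareness.stream()
            .filter(tag -> numStandbyReplicas
                <= allInstanceTags.getOrDefault(tag, Collections.emptySet()).size() - 1)
            .findFirst();
    }
}
```

For the six-node setup above (awareness zone,cluster, three zones, two clusters, two standby replicas) the method returns zone. If only two zones and two clusters exist, it returns an empty Optional, which corresponds to the case where neither formula holds.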

Assuming active stateful task 0_0 is in Node-1, Partially Preferred standby task distribution will look like this:

  1. Node-5 (different cluster, different zone), LL([Node-3, Node-6])
  2. Node-6 (different cluster, different zone), LL([Node-2, Node-5])

Where LL is a function determining the least-loaded client based on active + standby task assignment.

The Least Preferred Standby Task Distribution

The Least Preferred standby task distribution applies when neither the Absolute Preferred nor the Partially Preferred standby task distribution can be satisfied.

Suppose we have the following infrastructure setup: two Kubernetes clusters, let's call them K8s_Cluster1 and K8s_Cluster2, each spanning two availability zones: eu-central-1a and eu-central-1b.

With the new configuration options presented in this KIP, we will have the following:

Node-1:
instance.tag.cluster: K8s_Cluster1
instance.tag.zone: eu-central-1a
standby.task.assignment.awareness: zone,cluster
num.standby.replicas: 2

Node-2:
instance.tag.cluster: K8s_Cluster1
instance.tag.zone: eu-central-1b
standby.task.assignment.awareness: zone,cluster
num.standby.replicas: 2

Node-3:
instance.tag.cluster: K8s_Cluster2
instance.tag.zone: eu-central-1a
standby.task.assignment.awareness: zone,cluster
num.standby.replicas: 2

Node-4:
instance.tag.cluster: K8s_Cluster2
instance.tag.zone: eu-central-1b
standby.task.assignment.awareness: zone,cluster
num.standby.replicas: 2

With the setup presented above, we can't place the second standby task in a different zone as requested by the standby.task.assignment.awareness configuration, because only two distinct zones are available (and one is reserved for the active task). In this case Kafka Streams will fall back to the least-loaded client to allocate the remaining standby task.

Assuming active stateful task 0_0 is in Node-1, the Least Preferred standby task distribution will look like this:

  1. Node-4 (different cluster, different zone), LL([Node-2, Node-3])

Where LL is a function determining the least-loaded client based on active + standby task assignment.
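The three tiers can be read as one greedy procedure: for each standby, try to differ from the already used clients on every awareness tag, then drop the lowest-precedence tag and retry, and finally fall back to any unused client, always choosing the least-loaded candidate. The sketch below is one possible reading of the examples in this section, not the algorithm Kafka Streams implements. The Client type, the name-based tie-break and the load values are assumptions.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch, not part of Kafka Streams.
final class StandbyPlacementSketch {

    // Assumed client description: name, instance tags and current task load.
    static final class Client {
        final String name;
        final Map<String, String> tags;
        final int load;

        Client(final String name, final Map<String, String> tags, final int load) {
            this.name = name;
            this.tags = tags;
            this.load = load;
        }
    }

    // Returns the names of the clients chosen for the standbys of a task that is active on `active`.
    static List<String> placeStandbys(final Client active, final List<Client> clients,
                                      final List<String> awareness, final int numStandbys) {
        final List<Client> used = new ArrayList<>();
        used.add(active);
        final List<String> chosen = new ArrayList<>();
        for (int i = 0; i < numStandbys; i++) {
            Client pick = null;
            // Honor all tags first, then progressively drop the lowest-precedence tag.
            for (int tagCount = awareness.size(); tagCount >= 0 && pick == null; tagCount--) {
                final List<String> tagsToHonor = awareness.subList(0, tagCount);
                pick = clients.stream()
                    .filter(c -> used.stream().noneMatch(u -> u.name.equals(c.name)))
                    .filter(c -> differsOnAll(c, used, tagsToHonor))
                    // LL: least-loaded client; ties are broken by name (an assumption).
                    .min(Comparator.comparingInt((Client c) -> c.load).thenComparing(c -> c.name))
                    .orElse(null);
            }
            if (pick == null) {
                break; // fewer clients than requested standbys
            }
            used.add(pick);
            chosen.add(pick.name);
        }
        return chosen;
    }

    private static boolean differsOnAll(final Client candidate, final List<Client> used,
                                        final List<String> tagKeys) {
        for (final String key : tagKeys) {
            for (final Client u : used) {
                if (Objects.equals(u.tags.get(key), candidate.tags.get(key))) {
                    return false;
                }
            }
        }
        return true;
    }
}
```

With the four-node setup above, the active task on Node-1 and awareness zone,cluster, the first standby goes to Node-4, the only client that differs in both zone and cluster. No client is left in a third zone, so the second standby goes to the less loaded of Node-2 and Node-3.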

...


Compatibility, Deprecation, and Migration Plan

N/A

Rejected Alternatives

N/A