
Status

Current state: Under Discussion

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Similar to Kafka brokers, Kafka Streams instances can be grouped into different racks. When a standby task is placed on a different rack from its corresponding active task, it provides fault tolerance and faster recovery if the rack hosting the active task goes down.

...

This KIP aims to give Kafka Streams users the ability to assign a rack ID to each Kafka Streams instance and to use that information to place standby tasks on different racks (by implementing custom logic, if necessary).

Public Interfaces

This KIP introduces new configuration options in StreamsConfig, changes to the SubscriptionInfoData protocol, and a new public interface for assigning standby tasks across different racks.

Changes in SubscriptionInfoData protocol

The SubscriptionInfoData protocol will get a version bump and a new field that encodes the rack ID of the Kafka Streams instance. The encoded rack ID will later be used by the RackStandbyTaskAssignor implementation (which is invoked by the TaskAssignor implementation) to distribute standby tasks over different racks.
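To illustrate why the version bump matters, the following is a minimal sketch of a versioned payload that carries the rack ID only from a new version onwards, so that subscriptions from older instances still decode (with an unknown rack). The class name RackSubscriptionSketch, the byte layout, and the version constant are hypothetical; the real SubscriptionInfoData is generated from a schema and is not reproduced here.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: a versioned subscription payload that carries the rack ID only
// from a new protocol version onwards. Class name, layout and version number are
// illustrative assumptions, not the generated SubscriptionInfoData code.
final class RackSubscriptionSketch {
    // Placeholder for "first version that includes the rack ID field".
    static final int MIN_VERSION_WITH_RACK_ID = 10;

    final int version;
    final String rackId; // null when rack.id is not configured or not transmitted

    RackSubscriptionSketch(final int version, final String rackId) {
        this.version = version;
        this.rackId = rackId;
    }

    ByteBuffer encode() {
        final boolean hasRackField = version >= MIN_VERSION_WITH_RACK_ID;
        final byte[] rack = rackId == null ? null : rackId.getBytes(StandardCharsets.UTF_8);
        final int size = Integer.BYTES
            + (hasRackField ? Integer.BYTES + (rack == null ? 0 : rack.length) : 0);
        final ByteBuffer buffer = ByteBuffer.allocate(size);
        buffer.putInt(version);
        if (hasRackField) {
            // Length-prefixed UTF-8 string; a length of -1 means "no rack configured".
            if (rack == null) {
                buffer.putInt(-1);
            } else {
                buffer.putInt(rack.length);
                buffer.put(rack);
            }
        }
        buffer.flip();
        return buffer;
    }

    static RackSubscriptionSketch decode(final ByteBuffer buffer) {
        final int version = buffer.getInt();
        String rackId = null;
        // Payloads from older versions have no rack field, so the rack stays unknown.
        if (version >= MIN_VERSION_WITH_RACK_ID) {
            final int length = buffer.getInt();
            if (length >= 0) {
                final byte[] bytes = new byte[length];
                buffer.get(bytes);
                rackId = new String(bytes, StandardCharsets.UTF_8);
            }
        }
        return new RackSubscriptionSketch(version, rackId);
    }
}
```

In this sketch an instance that still sends an older version simply has no rack, which a rack-aware assignor would then have to treat as "rack unknown".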

Changes in StreamsConfig

To let Kafka Streams users define their desired standby task distribution, we need to add two new configuration options to StreamsConfig.

...

Code Block
languagejava
    public static final String RACK_ID_CONFIG = "rack.id";
    private static final String RACK_ID_DOC = "An identifier for the rack of an instance of Kafka Streams. " +
                                              "When set, the default implementation org.apache.kafka.streams.DefaultRackStandbyTaskAssignor " +
                                              "will try to place standby tasks on racks other than the rack of the corresponding active task.";

    public static final String RACK_STANDBY_TASK_ASSIGNOR_CLASS_CONFIG = "rack.standby.task.assignor";
    public static final String RACK_STANDBY_TASK_ASSIGNOR_CLASS_DOC = "Class that implements the RackStandbyTaskAssignor interface, " +
                                                                      "used to decide on which racks standby tasks can be created.";
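As a usage sketch, this is how one instance might be configured under this proposal. The keys "rack.id" and "rack.standby.task.assignor" are the names proposed above and are written as plain strings because they are not part of a released StreamsConfig; the helper class RackAwareStreamsConfigExample and the values used are hypothetical.

```java
import java.util.Properties;

// Sketch of the properties one Kafka Streams instance might use under this proposal.
// "rack.id" and "rack.standby.task.assignor" are the names proposed in this KIP;
// they are plain strings here because they are not part of a released StreamsConfig.
final class RackAwareStreamsConfigExample {
    static Properties streamsProperties(final String rackId) {
        final Properties props = new Properties();
        props.setProperty("application.id", "rack-aware-app");     // existing Streams config
        props.setProperty("bootstrap.servers", "localhost:9092");  // existing Streams config
        props.setProperty("num.standby.replicas", "1");            // standbys must be enabled
        props.setProperty("rack.id", rackId);                      // proposed: this instance's rack
        // Proposed: the placement policy class; here the default implementation from this KIP.
        props.setProperty("rack.standby.task.assignor",
                          "org.apache.kafka.streams.DefaultRackStandbyTaskAssignor");
        return props;
    }
}
```

Each instance would pass its own rack ID (for example, the availability zone it runs in), while the remaining properties stay identical across the application.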

New RackStandbyTaskAssignor interface

To give users flexibility in controlling which racks standby tasks can be allocated to, we will introduce a new public interface that Kafka Streams users can implement when the default implementation is not sufficient.

...

Code Block
languagejava
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams;

import org.apache.kafka.streams.processor.TaskId;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/**
 * Default rack-aware placement: a task may have standbys on every rack
 * except the rack that hosts its active task.
 */
public class DefaultRackStandbyTaskAssignor implements RackStandbyTaskAssignor {

    @Override
    public Map<String, Set<TaskId>> assignStandbyTaskToRacks(final Map<TaskId, String> sourceTasks,
                                                             final Set<String> clientRackIds) {
        final Map<String, Set<TaskId>> standbyTaskDistribution = new HashMap<>();
        // Every known rack gets an entry up front, even if it ends up hosting no standby.
        // (Resetting the entry inside the loop below would drop tasks added for earlier tasks.)
        for (final String clientRackId : clientRackIds) {
            standbyTaskDistribution.put(clientRackId, new HashSet<>());
        }

        for (final Map.Entry<TaskId, String> sourceTaskEntry : sourceTasks.entrySet()) {
            final TaskId taskId = sourceTaskEntry.getKey();
            // Rack of the active task; may be null if that client did not set rack.id.
            final String taskRackId = sourceTaskEntry.getValue();

            for (final String clientRackId : clientRackIds) {
                // Never allow a standby on the same rack as its active task.
                if (!Objects.equals(taskRackId, clientRackId)) {
                    standbyTaskDistribution.get(clientRackId).add(taskId);
                }
            }
        }

        return Collections.unmodifiableMap(standbyTaskDistribution);
    }
}
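The default implementation makes every rack except the active task's rack eligible. As an illustration of the "custom logic" case, the following self-contained sketch limits each task to a single standby rack, chosen round-robin so that standby load is spread across racks. Because the interface definition is not shown on this page, RackStandbyAssignorSketch is a hypothetical stand-in that mirrors the method used by DefaultRackStandbyTaskAssignor, with String task IDs (such as "0_1") in place of TaskId; the sketch also assumes all rack IDs are non-null.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical stand-in for RackStandbyTaskAssignor, mirroring the method used by
// DefaultRackStandbyTaskAssignor but with String task IDs instead of TaskId.
interface RackStandbyAssignorSketch {
    Map<String, Set<String>> assignStandbyTaskToRacks(Map<String, String> sourceTasks,
                                                      Set<String> clientRackIds);
}

// Illustrative custom policy: each task is eligible on exactly one rack other than its
// active rack, chosen round-robin over the sorted racks. Assumes non-null rack IDs.
final class SingleRackStandbyAssignor implements RackStandbyAssignorSketch {
    @Override
    public Map<String, Set<String>> assignStandbyTaskToRacks(final Map<String, String> sourceTasks,
                                                             final Set<String> clientRackIds) {
        final List<String> racks = new ArrayList<>(new TreeSet<>(clientRackIds));
        final Map<String, Set<String>> distribution = new TreeMap<>();
        for (final String rack : racks) {
            distribution.put(rack, new TreeSet<>());
        }
        int next = 0; // index of the rack to try first for the next task
        // Visit tasks in sorted order so the result is deterministic.
        for (final String taskId : new TreeSet<>(sourceTasks.keySet())) {
            final String activeRack = sourceTasks.get(taskId);
            for (int offset = 0; offset < racks.size(); offset++) {
                final int index = (next + offset) % racks.size();
                final String candidate = racks.get(index);
                if (!Objects.equals(candidate, activeRack)) {
                    distribution.get(candidate).add(taskId);
                    next = (index + 1) % racks.size();
                    break;
                }
            }
        }
        return distribution;
    }
}
```

With racks rack-a, rack-b and rack-c and tasks 0_0, 0_1 and 0_2 active on rack-a, rack-b and rack-c respectively, this policy makes 0_0 eligible only on rack-b, 0_1 only on rack-c, and 0_2 only on rack-a. Compared with the default, it trades some placement freedom for a more even spread of standbys.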

 

Proposed Changes

As described in the Public Interfaces section, the proposal consists of:

  1. Allowing users to specify the rack ID of a Kafka Streams instance by setting the rack.id streams config.
  2. The configured rack.id will be encoded in the SubscriptionInfoData payload.
  3. StreamsPartitionAssignor#assign will assign the received rack.id to org.apache.kafka.streams.processor.internals.assignment.ClientState, and then all client states will be passed to the TaskAssignor implementation.
  4. When assigning standby tasks to clients, TaskAssignor#assign will invoke the configured RackStandbyTaskAssignor class, which returns the desired RackID => Set(TaskId... ) distribution.
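Step 4 can be sketched as follows: the rack-level distribution returned by the RackStandbyTaskAssignor is turned into concrete standby placements on clients. This is a hypothetical illustration, not the TaskAssignor code: ClientInfo stands in for the internal ClientState, task IDs are plain strings, one standby per task is assumed (num.standby.replicas = 1), and choosing the least-loaded eligible client is an assumed tie-breaking policy.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch of step 4: placing standbys on clients using the rack-level
// distribution from a RackStandbyTaskAssignor. ClientInfo stands in for ClientState;
// one standby per task is assumed.
final class StandbyPlacementSketch {
    static final class ClientInfo {
        final String clientId;
        final String rackId; // as decoded from the client's subscription
        final Set<String> activeTasks = new TreeSet<>();
        final Set<String> standbyTasks = new TreeSet<>();

        ClientInfo(final String clientId, final String rackId) {
            this.clientId = clientId;
            this.rackId = rackId;
        }

        int load() {
            return activeTasks.size() + standbyTasks.size();
        }
    }

    // rackDistribution: rack ID -> tasks that may have a standby on that rack.
    static void assignStandbys(final List<ClientInfo> clients,
                               final Map<String, Set<String>> rackDistribution) {
        // Invert to task -> allowed racks; TreeMap keeps the task order deterministic.
        final Map<String, Set<String>> allowedRacks = new TreeMap<>();
        for (final Map.Entry<String, Set<String>> entry : rackDistribution.entrySet()) {
            for (final String task : entry.getValue()) {
                allowedRacks.computeIfAbsent(task, t -> new HashSet<>()).add(entry.getKey());
            }
        }
        for (final Map.Entry<String, Set<String>> entry : allowedRacks.entrySet()) {
            final String task = entry.getKey();
            ClientInfo best = null;
            for (final ClientInfo client : clients) {
                // Only clients in an allowed rack that do not own the active task qualify.
                if (!entry.getValue().contains(client.rackId) || client.activeTasks.contains(task)) {
                    continue;
                }
                // Prefer the least-loaded client; ties keep the earlier client in the list.
                if (best == null || client.load() < best.load()) {
                    best = client;
                }
            }
            if (best != null) {
                best.standbyTasks.add(task);
            }
        }
    }
}
```

Keeping rack eligibility (decided by the pluggable RackStandbyTaskAssignor) separate from client selection (done by the TaskAssignor) means a custom rack policy never has to know about individual clients or their load.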

Compatibility, Deprecation, and Migration Plan

N/A

Rejected Alternatives

N/A