Status

Current state: Under Discussion

Discussion thread: TBD

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Like Kafka brokers, Kafka Streams instances can be grouped into racks. When a standby task is placed in a different rack from its corresponding active task, the application gains fault tolerance and recovers faster if the rack hosting the active task goes down.

Currently, Kafka Streams offers no way to ensure that standby tasks are distributed across different racks.

Moreover, it would be desirable to allow custom logic for standby task distribution when rack identification is more complex (e.g., the rack ID is a concatenation of cluster and availability zone, in AWS terms) and additional parsing and processing is needed before deciding how to distribute standby tasks over racks.

This KIP aims to give Kafka Streams users the ability to assign a rack ID to each Kafka Streams instance and to use that information to place standby tasks in different racks (by implementing custom logic if necessary).

Public Interfaces

This KIP introduces new configuration options in StreamsConfig, changes to the SubscriptionInfoData protocol, and a new public interface for assigning standby tasks across racks.

Changes in SubscriptionInfoData protocol

The SubscriptionInfoData protocol will get a version bump and a new field that encodes the rack ID of the Kafka Streams instance. The encoded rack ID will later be used by the RackStandbyTaskAssignor implementation, which is called by the TaskAssignor implementation, to distribute standby tasks over different racks.

{
  "name": "SubscriptionInfoData",
  "validVersions": "1-10", // NEW: version bump to 10
  "fields": [
    ...
    {
      "name": "rackId", // NEW
      "versions": "10+",
      "type": "bytes"
    }
  ],
  ...
}
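
The schema above declares rackId as bytes but does not say how the string is encoded. The following is a minimal sketch only, assuming UTF-8 encoding and assuming that an empty payload means "no rack configured"; neither assumption comes from this KIP, and RackIdCodec is a hypothetical helper name rather than an existing Kafka class.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Hypothetical helper, not part of Kafka Streams: illustrates one way to move
// a rack ID in and out of a "bytes" field.
final class RackIdCodec {

    private RackIdCodec() { }

    // Assumption: an unset rack.id is sent as an empty byte array.
    static byte[] encode(final String rackId) {
        return rackId == null ? new byte[0] : rackId.getBytes(StandardCharsets.UTF_8);
    }

    // Assumption: an empty or missing payload means the sender has no rack ID,
    // for example because it uses a subscription version older than 10.
    static Optional<String> decode(final byte[] encoded) {
        if (encoded == null || encoded.length == 0) {
            return Optional.empty();
        }
        return Optional.of(new String(encoded, StandardCharsets.UTF_8));
    }
}
```

Returning an Optional makes the "rack ID absent" case explicit for the assignor, which matters during a rolling upgrade when some instances may not yet send the new field.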


Changes in StreamsConfig

To let Kafka Streams users define their desired standby task distribution, we need to add two new configuration options to StreamsConfig.

rack.id - the rack identifier of the Kafka Streams instance

rack.standby.task.assignor - the class that implements the RackStandbyTaskAssignor interface (see below)

    public static final String RACK_ID_CONFIG = "rack.id";
    private static final String RACK_ID_DOC = "An identifier for the rack of an instance of Kafka Streams. " +
                                              "When set, the default implementation of org.apache.kafka.streams.RackStandbyTaskAssignor " +
                                              "will try to distribute standby tasks in a different rack from the corresponding active task.";

    public static final String RACK_STANDBY_TASK_ASSIGNOR_CLASS_CONFIG = "rack.standby.task.assignor";
    public static final String RACK_STANDBY_TASK_ASSIGNOR_CLASS_DOC = "Class that implements RackStandbyTaskAssignor interface, " +
                                                                      "that is used when deciding on which racks standby tasks can be created.";

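As an illustration, an application could set the two proposed options roughly as follows. This is a sketch under assumptions: the keys "rack.id" and "rack.standby.task.assignor" are the ones proposed above and do not exist in released Kafka Streams, and the application ID, bootstrap address, and the RackAwareConfigExample class are placeholder names chosen for this example.

```java
import java.util.Properties;

// Hypothetical example class; only builds the Properties object.
final class RackAwareConfigExample {

    private RackAwareConfigExample() { }

    static Properties rackAwareProperties(final String rackId) {
        final Properties props = new Properties();
        // Existing Kafka Streams settings (placeholder values).
        props.put("application.id", "rack-aware-app");
        props.put("bootstrap.servers", "localhost:9092");
        props.put("num.standby.replicas", "1");
        // Proposed by this KIP: the rack this instance runs in.
        props.put("rack.id", rackId);
        // Proposed by this KIP: the assignor class, here the default from this page.
        props.put("rack.standby.task.assignor",
                  "org.apache.kafka.streams.DefaultRackStandbyTaskAssignor");
        return props;
    }
}
```

Each instance would pass its own rack ID, for example the availability zone it is deployed in, while all instances of the application share the same assignor class.
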
New RackStandbyTaskAssignor interface

To give users control over which racks a standby task can be allocated to, we will introduce a new public interface that Kafka Streams users can implement when the default implementation is not enough.

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams;

import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;

import java.util.Map;
import java.util.Set;

public interface RackStandbyTaskAssignor {

    /**
     * Computes the desired standby task distribution over the different {@link StreamsConfig#RACK_ID_CONFIG} values.
     * @param sourceTasks - Source {@link TaskId}s that are eligible for standby task creation, mapped to their corresponding rack IDs.
     * @param clientRackIds - Rack IDs of the clients that were received during assignment.
     * @return - Map of rack IDs to sets of {@link TaskId}s. The return value can be used by the {@link TaskAssignor}
     *           implementation to decide whether a {@link TaskId} can be assigned to a client located in a given rack.
     */
    Map<String, Set<TaskId>> assignStandbyTaskToRacks(final Map<TaskId, String> sourceTasks,
                                                      final Set<String> clientRackIds);
}
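
To illustrate the custom-logic case from the Motivation section, the sketch below treats the rack ID as "cluster/zone" and only allows a standby on racks whose zone differs from the zone of the source task. Everything here is an assumption for illustration: the "cluster/zone" format, the ZoneAwareStandbyAssignment class, and the zoneOf helper are hypothetical, and a generic type T stands in for TaskId so that the example compiles without Kafka on the classpath. A real implementation would implement RackStandbyTaskAssignor directly.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of custom rack logic; T stands in for TaskId.
final class ZoneAwareStandbyAssignment {

    private ZoneAwareStandbyAssignment() { }

    // Assumed rack ID format: "<cluster>/<zone>". Without a '/', the whole ID is the zone.
    static String zoneOf(final String rackId) {
        return rackId.substring(rackId.lastIndexOf('/') + 1);
    }

    static <T> Map<String, Set<T>> assignStandbyTaskToRacks(final Map<T, String> sourceTasks,
                                                            final Set<String> clientRackIds) {
        final Map<String, Set<T>> distribution = new HashMap<>();
        // Every known rack gets an entry, even if no standby task is allowed there.
        for (final String rackId : clientRackIds) {
            distribution.put(rackId, new HashSet<>());
        }
        for (final Map.Entry<T, String> sourceTask : sourceTasks.entrySet()) {
            final String sourceZone = zoneOf(sourceTask.getValue());
            for (final String rackId : clientRackIds) {
                // A rack is eligible only if it lies in a different zone than the source task.
                if (!zoneOf(rackId).equals(sourceZone)) {
                    distribution.get(rackId).add(sourceTask.getKey());
                }
            }
        }
        return Collections.unmodifiableMap(distribution);
    }
}
```

With this logic, two racks that belong to different clusters but share a zone are treated as one failure domain, which plain string comparison of rack IDs would not capture.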


The default implementation of the interface ensures that the standby tasks for sourceTasks are assigned to racks other than the rack of the corresponding source task. Example:

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams;

import org.apache.kafka.streams.processor.TaskId;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class DefaultRackStandbyTaskAssignor implements RackStandbyTaskAssignor {

    @Override
    public Map<String, Set<TaskId>> assignStandbyTaskToRacks(final Map<TaskId, String> sourceTasks,
                                                             final Set<String> clientRackIds) {
        final Map<String, Set<TaskId>> standbyTaskDistribution = new HashMap<>();
        for (final Map.Entry<TaskId, String> sourceTaskEntry : sourceTasks.entrySet()) {
            final TaskId taskId = sourceTaskEntry.getKey();
            final String taskRackId = sourceTaskEntry.getValue();

            for (final String clientRackId : clientRackIds) {
                // Make sure every rack has an entry, without discarding tasks already assigned to it.
                final Set<TaskId> rackTasks = standbyTaskDistribution.computeIfAbsent(clientRackId, k -> new HashSet<>());
                // A standby task may only be placed in a rack other than its source task's rack.
                if (!taskRackId.equals(clientRackId)) {
                    rackTasks.add(taskId);
                }
            }
        }

        return Collections.unmodifiableMap(standbyTaskDistribution);
    }
}

 

Proposed Changes

As described in the Public Interfaces section, the proposal consists of:

  1. Allowing users to specify the rack ID of the Kafka Streams instance by setting the rack.id streams config.
  2. Encoding the configured rack.id in the SubscriptionInfoData payload.
  3. StreamsPartitionAssignor#assign will set the received rack.id on org.apache.kafka.streams.processor.internals.assignment.ClientState, and all client states will then be passed on to the TaskAssignor implementation.
  4. When assigning standby tasks to clients, TaskAssignor#assign will invoke the configured RackStandbyTaskAssignor class, which returns the desired RackID => Set(TaskId...) distribution.
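
The last step can be sketched as a simple eligibility check on the returned distribution. This is not the actual TaskAssignor logic: StandbyEligibility and canHostStandby are hypothetical names, the generic type T stands in for TaskId, and treating a client without a rack ID as unconstrained is an assumption that this KIP does not specify.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of how an assignor might consult the RackID => Set(TaskId...) map.
final class StandbyEligibility {

    private StandbyEligibility() { }

    static <T> boolean canHostStandby(final Map<String, Set<T>> rackToTasks,
                                      final String clientRackId,
                                      final T taskId) {
        // Assumption: a client that did not report a rack ID is not restricted.
        if (clientRackId == null) {
            return true;
        }
        // A client may host the standby only if its rack lists the task.
        return rackToTasks.getOrDefault(clientRackId, Collections.emptySet()).contains(taskId);
    }
}
```

A task assignor could run this check for each candidate client before placing a standby task, and fall back to its existing placement rules when no client passes.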

Compatibility, Deprecation, and Migration Plan

N/A

Rejected Alternatives

N/A
