Status

Current state: Under Discussion. This KIP currently contains mostly architectural details; finer details will be added once we agree on the core design.

Discussion thread:

JIRA:

Motivation

Currently, if a Kafka cluster loses a broker, there is no mechanism to transfer replicas from the failed node to other brokers in the cluster other than manually triggering ReassignPartitionsCommand. This is especially a problem when operating clusters with replication factor = 2: the loss of a single broker means there is effectively no remaining redundancy, which is not desirable. Automatic rebalancing upon failure is important to preserve redundancy within the cluster. This KIP effectively automates the commands that operators typically run by hand after a failure.

Public Interfaces

Proposed Changes

This problem breaks down into two parts: discovery of failed brokers, and mitigation. The design for each is documented separately.

Part 1: Failure Discovery

How do we discover that a broker has failed? It is particularly important to avoid false alarms, i.e. if a broker is being bounced, it should not count as a failure and prematurely trigger a rebalance.

We feel that the following approach is better than a periodic scan (documented in the Rejected Alternatives section).

Explicit notification

The KafkaController receives an onBrokerFailure callback every time a broker dies. This is an explicit signal that we should leverage, since it is also what drives leadership changes. The proposal is to introduce a new path in ZooKeeper that is managed only by the controller. The onBrokerFailure callback can create a znode under /admin/failed_brokers, carrying a timestamp corresponding to when the callback was first received by a controller.
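
As a rough illustration of the write path, the sketch below uses the plain ZooKeeper Java client to record the first failure time for a broker under /admin/failed_brokers. This is not the actual implementation: the class name, the bare millisecond-timestamp payload, and the assumption that the parent path already exists are all illustrative; the real controller would likely use a versioned JSON payload and its own ZooKeeper utilities.

    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;

    import java.nio.charset.StandardCharsets;

    public class FailedBrokerRegistry {
        private final ZooKeeper zk;

        public FailedBrokerRegistry(ZooKeeper zk) {
            this.zk = zk;
        }

        // Record when we first saw this broker fail. Assumes /admin/failed_brokers already exists.
        public void markFailed(int brokerId) throws KeeperException, InterruptedException {
            String path = "/admin/failed_brokers/" + brokerId;
            byte[] firstSeenMs = Long.toString(System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8);
            try {
                // Persistent (not ephemeral) so the record survives controller failover.
                zk.create(path, firstSeenMs, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            } catch (KeeperException.NodeExistsException e) {
                // A previous callback (or a previous controller) already recorded the failure;
                // keep the original timestamp so the failure interval is not reset.
            }
        }
    }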

...

The list of dead nodes needs to be persisted so that it survives controller handoffs; any newly elected controller can simply pick up the persisted list from ZooKeeper and reuse the initial failure times.
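
A minimal sketch of the corresponding recovery read path, assuming the payload format from the earlier sketch (one znode per failed broker, containing a bare millisecond timestamp):

    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.ZooKeeper;

    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.Map;

    public class FailedBrokerLoader {
        // Rebuild the failed-broker map (broker id -> first failure time) after a controller handoff.
        public static Map<Integer, Long> load(ZooKeeper zk) throws KeeperException, InterruptedException {
            Map<Integer, Long> failedAtMs = new HashMap<>();
            for (String child : zk.getChildren("/admin/failed_brokers", false)) {
                byte[] data = zk.getData("/admin/failed_brokers/" + child, false, null);
                // Reuse the originally recorded failure time rather than restarting the clock.
                failedAtMs.put(Integer.parseInt(child),
                               Long.parseLong(new String(data, StandardCharsets.UTF_8)));
            }
            return failedAtMs;
        }
    }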

Part 2: Mitigation

Once the failure detection piece is solved, we need to decide how best to move replicas off the failed host. If a broker has been deemed failed, the controller should generate the list of topic-partitions that it owned. On large brokers such as the ones we run at LinkedIn, a failed broker likely hosted several hundred partitions. We would saturate the network if we bootstrapped all of these at the same time, so some throttling is needed.
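
One simple way to throttle is to split the failed broker's partition list into fixed-size chunks and reassign one chunk at a time. The sketch below is illustrative only; the chunk size would presumably come from a new config.

    import java.util.ArrayList;
    import java.util.List;

    public class PartitionChunker {
        // Split the failed broker's partitions into fixed-size chunks so reassignment can be throttled.
        public static <T> List<List<T>> chunk(List<T> partitions, int chunkSize) {
            List<List<T>> chunks = new ArrayList<>();
            for (int i = 0; i < partitions.size(); i += chunkSize) {
                chunks.add(new ArrayList<>(partitions.subList(i, Math.min(i + chunkSize, partitions.size()))));
            }
            return chunks;
        }
    }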

...

We recommend the second approach. 

Reassign Partitions

Partition reassignment is an existing Kafka feature used to move partition replicas between hosts. Unless there is a strong reason not to, we should leverage this mechanism rather than reinvent it.

...

The controller processes the chunks sequentially and deletes each chunk once it has been processed.
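
Assuming each chunk reuses the existing reassign-partitions JSON format (the topic names and broker ids below are made up), a chunk that moves replicas off a failed broker might look like:

    {"version":1,
     "partitions":[{"topic":"orders","partition":0,"replicas":[1,2]},
                   {"topic":"orders","partition":7,"replicas":[2,4]}]}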

Workflow

Let's walk through a sample sequence of events.

...

  • If the failed broker comes back online.
  • After the reassign tasks get scheduled, the znode can be deleted since the tasks themselves are persisted in ZK.

Controller failures

If a controller fails, the newly elected controller should rebuild its internal state based on the znodes in /admin/failed_brokers and the auto-scheduled reassignment tasks.

...

  1. The new controller can simply compare the assignments with the failed brokers. If an assignment has a reference to a failed broker, the controller knows which failed node it was created for. If it finds all the tasks for a failed node, it can assume the assignments are sufficient and delete the znode for that broker.
  2. It should also be fine to let the controller simply reschedule the reassignment tasks. Since tasks are executed sequentially, the previously scheduled tasks complete before the rescheduled ones. Before executing any task, the controller checks whether the assignment is already satisfied; if it is, the task is skipped (see the sketch below).

Option 2 is simpler.
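
For option 2, the "already satisfied" check is essentially a comparison of the partition's current replica set against the target replica set from the task. A minimal sketch, ignoring replica ordering and preferred leadership, which the real check may need to account for:

    import java.util.HashSet;
    import java.util.List;

    public class ReassignmentCheck {
        // A rescheduled task can be skipped if the current replica set already matches the target.
        public static boolean alreadySatisfied(List<Integer> currentReplicas, List<Integer> targetReplicas) {
            return new HashSet<>(currentReplicas).equals(new HashSet<>(targetReplicas));
        }
    }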

Quotas

Should we throttle replica bootstrap traffic? This is tangentially related, but we could throttle inter-broker replication traffic to, say, 50% of the available network bandwidth. Regardless of the number of failed replicas, this would impose a hard limit on the total amount of network traffic caused by auto-rebalancing. Given that we plan to move only a bounded set of partitions at a time, this shouldn't really be a problem; it is more of a nice-to-have.
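
For illustration only, a crude per-second byte-rate throttle that a replica fetcher could consult before issuing fetches might look like the following; the actual quota mechanism, if we add one, would be defined separately in this KIP.

    public class SimpleThrottle {
        private final long bytesPerSec;
        private long windowStartMs = System.currentTimeMillis();
        private long bytesInWindow = 0;

        public SimpleThrottle(long bytesPerSec) {
            this.bytesPerSec = bytesPerSec;
        }

        // Block until the requested bytes fit under the configured rate for the current one-second window.
        public synchronized void acquire(long bytes) throws InterruptedException {
            long now = System.currentTimeMillis();
            if (now - windowStartMs >= 1000) {
                windowStartMs = now;
                bytesInWindow = 0;
            }
            if (bytesInWindow + bytes > bytesPerSec) {
                long remainingMs = 1000 - (now - windowStartMs);
                if (remainingMs > 0) {
                    Thread.sleep(remainingMs);
                }
                windowStartMs = System.currentTimeMillis();
                bytesInWindow = 0;
            }
            bytesInWindow += bytes;
        }
    }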

Auto-Expansion

This KIP does not really solve the problem of load balancing existing assignments within a cluster. It should be possible to ensure even partition distribution whenever new nodes are added, i.e. if I expand a cluster it would be nice not to have to move partitions onto the new brokers manually. This KIP does not attempt to solve that problem; rather, it makes it possible to tackle auto-expansion in the future, since the same pattern could be reused.

Configs

The following configs can be added.

...

In addition, we can also add a quota config for replica fetchers if we choose to tackle that in this KIP.

Compatibility, Deprecation, and Migration Plan

This should not impact existing users at all. Old behavior can be preserved by simply setting the failure interval to -1.
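
For example, the opt-out might look like the following in server.properties. The property name below is purely hypothetical; the actual name is defined in the Configs section above.

    # Hypothetical property name, for illustration only.
    # -1 disables automatic rebalancing on broker failure and preserves today's behavior.
    broker.failure.rebalance.interval.ms=-1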

...

  • Older versions of the reassign partitions tooling may no longer work; new jars corresponding to this release will have to be used.
  • While upgrading the cluster, we should make sure not to manually generate any reassignment tasks. The new controller may not be able to parse those tasks, so they would need to be rescheduled.

Rejected Alternatives

Periodic Scans

In this approach, the controller identifies dead replicas by periodically iterating through all topics and identifying failed replicas based on the information under /topics/<topic_name>. If any replica is not online, the corresponding broker is written to /admin/failed_brokers. The rest is similar to the approach described above. This does not rely on receiving an onBrokerFailure callback. However, since a significant amount of controller behavior already depends on getting these callbacks right, it is perhaps more apt to use them as a direct signal. There are a few other challenges with this approach (a sketch of the scan follows the list below):

  • Since the failure detection thread only sees a snapshot, it is possible for it to think certain replicas are dead when they are in fact not. For example, a node that is bounced twice can be recognized as a failed node. This can happen during normal operation, e.g. when rolling back a broker soon after a deployment.
  • There is also a delay to this approach, bounded by the frequency of the scan. Frequent scans can cause a high volume of ZooKeeper activity, since the scan scales with the number of topics rather than the number of replicas. For clusters with a large number of partitions (40k+ topics at LinkedIn), the controller will read several hundred znodes on each scan.
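
A rough sketch of what one scan pass could compute, assuming the controller already caches the replica assignment per partition and the set of live broker ids (both inputs are hypothetical accessors here):

    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class PeriodicFailureScan {
        // Any broker that is assigned a replica but is not registered as live is considered failed.
        public static Set<Integer> findDeadBrokers(Map<String, List<Integer>> replicasByPartition,
                                                   Set<Integer> liveBrokerIds) {
            Set<Integer> dead = new HashSet<>();
            for (List<Integer> replicas : replicasByPartition.values()) {
                for (int brokerId : replicas) {
                    if (!liveBrokerIds.contains(brokerId)) {
                        dead.add(brokerId);
                    }
                }
            }
            return dead;  // Callers would then write these ids to /admin/failed_brokers.
        }
    }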

Fast detection

After some discussion, we decided against this proposal because of the added complexity. It is still worth discussing, though.

...