

Status

Current state: Under Discussion

Discussion thread:

JIRA:

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

When Kafka is used to build data pipelines in mission-critical business scenarios, availability and throughput are the most important operational goals and must be maintained in the presence of transient local failures. In one typical situation, some partitions have long write latency caused by extremely high disk utilization; since all partitions share the same buffer under the current producer thread model, the buffer fills up quickly and eventually the healthy partitions are impacted as well. The cluster-level success rate and timeout ratio degrade until the local infrastructure issue is resolved.

One way to mitigate this issue is to add a client-side mechanism that short-circuits problematic partitions during transient failures. A similar approach is applied in other distributed systems and RPC frameworks.

Public Interfaces

A new producer config option is added:

  • key.circuit.breaker: the class name of the PartitionCircuitBreaker implementation to use
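
For example, a producer could plug in the default implementation via this key. This is a minimal sketch; the fully-qualified package of the default class is an assumption here, as only the class name is defined in this KIP:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Config key proposed in this KIP; the package below is hypothetical.
props.put("key.circuit.breaker",
        "org.apache.kafka.clients.producer.internals.FailureRatePartitionCircuitBreaker");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);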

Add a PartitionCircuitBreaker interface:

import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;

public interface PartitionCircuitBreaker {

    /**
     * Configure this class.
     *
     * @param configs configs in key/value pairs
     */
    void configure(Map<String, ?> configs);

    /**
     * Callback invoked when a write to a partition fails.
     *
     * @param topicPartition the partition the failed write targeted
     * @param cluster        the current cluster metadata
     */
    void onError(TopicPartition topicPartition, Cluster cluster);

    /**
     * Callback invoked when a write to a partition succeeds.
     *
     * @param topicPartition the partition the successful write targeted
     * @param cluster        the current cluster metadata
     */
    void onSuccess(TopicPartition topicPartition, Cluster cluster);

    /**
     * Returns the currently muted partitions.
     *
     * @return the set of muted partitions
     */
    Set<TopicPartition> partitions();
}
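
The KIP does not spell out where these callbacks fire; presumably the producer invokes them when it learns the outcome of a produce request for a partition. A rough, hypothetical call site might look like this (the method and its surroundings are illustrative, not part of the proposal):

// Hypothetical call site: invoked by the producer when a produce
// request for a given partition completes.
void completeBatch(TopicPartition tp, Exception exception, Cluster cluster,
                   PartitionCircuitBreaker breaker) {
    if (exception == null)
        breaker.onSuccess(tp, cluster);   // feeds the success counter
    else
        breaker.onError(tp, cluster);     // feeds the failure counter
}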

Implementation of the default failure-rate-based circuit breaker:

/**
 * Implementation of a failure-rate-based circuit breaker.
 */
public class FailureRatePartitionCircuitBreaker implements PartitionCircuitBreaker {

    /**
     * Minimal message count to enable the breaker.
     */
    private final int triggerMessageCount;

    /**
     * Failure ratio percentage that triggers the circuit breaker.
     */
    private final int triggerFailurePerc;

    /**
     * Retry interval after a partition is muted.
     */
    private final int muteRetryInterval;

    /**
     * Total mute period.
     */
    private final long mutePeriod;

    /**
     * Maximum percentage of muted partitions per topic.
     */
    private final int muteMaxPartitionPerc;
}

A PartitionCircuitBreaker will be included as a member of org.apache.kafka.common.Cluster. When a breaker is in effect, it will filter out muted partitions when calculating availablePartitionsForTopic.

public final class Cluster {

    /**
     * Circuit breaker used in the cluster.
     */
    private final PartitionCircuitBreaker partitionCircuitBreaker;
}
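
A minimal sketch of how that filtering could work inside Cluster follows; the availablePartitionsByTopic field and the exact wiring are assumptions for illustration (the real Cluster maintains precomputed per-topic maps):

// Hypothetical sketch: how Cluster#availablePartitionsForTopic could skip
// muted partitions. The availablePartitionsByTopic map is assumed here.
public List<PartitionInfo> availablePartitionsForTopic(String topic) {
    List<PartitionInfo> available = availablePartitionsByTopic.get(topic);
    if (partitionCircuitBreaker == null || available == null)
        return available;
    Set<TopicPartition> muted = partitionCircuitBreaker.partitions();
    List<PartitionInfo> filtered = new ArrayList<>(available.size());
    for (PartitionInfo p : available) {
        if (!muted.contains(new TopicPartition(p.topic(), p.partition())))
            filtered.add(p);
    }
    return filtered;
}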

Proposed Changes

We propose to add a configuration-driven circuit-breaking mechanism that allows the Kafka client to mute partitions when a certain condition is met. In addition to the interface, we provide a default implementation that uses the failure rate as the condition. Clients can choose the implementation that fits their specific failure scenarios.

Several parameters together define the behavior of the default circuit breaker (a sketch of how they might interact follows the list):

  • The breaker is enabled only after a certain number (triggerMessageCount) of messages have been sent

  • The breaker is triggered when a partition's failure rate exceeds the threshold triggerFailurePerc

  • The partition is muted on the client side for a total period of mutePeriod

  • A muted partition is monitored and reset upon a successful write after muteRetryInterval

  • The percentage of partitions in a topic that can be muted is capped at muteMaxPartitionPerc
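
The following is a minimal sketch of how these parameters could drive the mute/unmute decisions. The per-partition Stats bookkeeping, the timing logic, and the omission of the muteMaxPartitionPerc cap are simplifications for illustration; the actual implementation may differ:

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;

// Illustrative sketch of the default breaker's bookkeeping, not the
// actual implementation. Stats and its timing fields are assumptions.
public class FailureRatePartitionCircuitBreaker implements PartitionCircuitBreaker {

    // Per-partition counters and mute deadlines.
    private static class Stats {
        long successes;
        long failures;
        long muteDeadline;  // end of the mute period; 0 when not muted
        long nextRetryAt;   // earliest time a probe write may unmute
    }

    private final Map<TopicPartition, Stats> stats = new ConcurrentHashMap<>();

    // Placeholder defaults; real values would come from configure().
    private int triggerMessageCount = 1000;
    private int triggerFailurePerc = 50;
    private int muteRetryInterval = 1_000;  // ms
    private long mutePeriod = 60_000;       // ms

    @Override
    public void configure(Map<String, ?> configs) {
        // Parse the parameters above from configs (keys are not defined by this KIP).
    }

    @Override
    public void onError(TopicPartition tp, Cluster cluster) {
        Stats s = stats.computeIfAbsent(tp, k -> new Stats());
        synchronized (s) {
            s.failures++;
            long total = s.successes + s.failures;
            // Trip the breaker once enough messages have been seen and the
            // failure percentage crosses the threshold.
            if (s.muteDeadline == 0
                    && total >= triggerMessageCount
                    && s.failures * 100 >= total * (long) triggerFailurePerc) {
                long now = System.currentTimeMillis();
                s.muteDeadline = now + mutePeriod;
                s.nextRetryAt = now + muteRetryInterval;
            }
        }
    }

    @Override
    public void onSuccess(TopicPartition tp, Cluster cluster) {
        Stats s = stats.computeIfAbsent(tp, k -> new Stats());
        synchronized (s) {
            s.successes++;
            // A successful probe write after the retry interval resets the breaker.
            if (s.muteDeadline > 0 && System.currentTimeMillis() >= s.nextRetryAt) {
                s.muteDeadline = 0;
                s.successes = 0;
                s.failures = 0;
            }
        }
    }

    @Override
    public Set<TopicPartition> partitions() {
        long now = System.currentTimeMillis();
        // Report partitions still inside their mute period.
        // (The muteMaxPartitionPerc cap is omitted in this sketch.)
        return stats.entrySet().stream()
                .filter(e -> e.getValue().muteDeadline > now)
                .map(Map.Entry::getKey)
                .collect(Collectors.toSet());
    }
}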

Muting partitions has an impact when a topic contains keyed messages, as records with the same key will be written to more than one partition during the recovery period. We believe this can be an explicit trade-off the application makes between availability and message ordering.

Compatibility, Deprecation, and Migration Plan

Rejected Alternatives

