Status
Current state: Under Discussion
Discussion thread:
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
When Kafka is used to build data pipelines in mission-critical business scenarios, availability and throughput are the most important operational goals and must be maintained in the presence of transient local failures. In one typical situation, some partitions have long write latencies caused by extremely high disk utilization; since all partitions share the same buffer under the current producer threading model, the buffer fills up quickly and eventually the healthy partitions are impacted as well. The cluster-level success rate and timeout ratio then degrade until the local infrastructure issue is resolved.
One way to mitigate this issue is to add a client-side mechanism that short-circuits problematic partitions during transient failures. A similar approach is used in other distributed systems and RPC frameworks.
Public Interfaces
New producer config options are added:
producer.circuit.breaker.class: class name of the circuit breaker implementation class
producer.circuit.breaker.xxx: customized parameters used by the implementation class (optional)
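As an illustration, a producer might wire these options up as follows. The implementation class's package is an assumption, and the three pass-through parameters shown are the ones used by the default failure-rate breaker described later in this KIP:

```java
import java.util.Properties;

public class CircuitBreakerConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // The fully qualified package of the default implementation is an assumption.
        props.put("producer.circuit.breaker.class",
                "org.apache.kafka.clients.producer.internals.FailureRateCircuitBreaker");
        // Implementation-specific "producer.circuit.breaker.xxx" parameters are
        // passed through to the implementation's configure() method.
        props.put("producer.circuit.breaker.trigger.message.count", "1000");
        props.put("producer.circuit.breaker.trigger.failure.perc", "30");
        props.put("producer.circuit.breaker.mute.retry.interval", "60000");
        System.out.println(props.getProperty("producer.circuit.breaker.class"));
    }
}
```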
Add a PartitionCircuitBreaker interface
```java
public interface PartitionCircuitBreaker extends Configurable, Closeable {

    /**
     * Configure this class.
     *
     * @param configs configs in key/value pairs
     */
    void configure(Map<String, ?> configs);

    /**
     * @param circuitBreakManager the manager used to mute and unmute partitions
     */
    void setCircuitBreakManager(CircuitBreakManager circuitBreakManager);

    /**
     * @param context network congestion status of the partition and node
     * @return whether the partition is available for sending
     */
    boolean onSend(ProducerStatusContext context);

    /**
     * @param context        network congestion status of the partition and node
     * @param topicPartition information of the partition
     * @param exception      the exception raised by the completed send, if any
     * @return whether the partition remains available
     */
    boolean onComplete(ProducerStatusContext context, TopicPartition topicPartition, Exception exception);
}
```
Add a CircuitBreakManager class that manages the partition metadata related to this mechanism
```java
public final class CircuitBreakManager implements Closeable {

    /**
     * Mute partitions by nodeId.
     */
    void mute(int nodeId);

    /**
     * Unmute partitions by nodeId.
     */
    void unmute(int nodeId);

    /**
     * Mute a partition.
     */
    void mute(TopicPartition topicPartition);

    /**
     * Unmute a partition.
     */
    void unmute(TopicPartition topicPartition);

    /**
     * Return available partitions.
     */
    List<PartitionInfo> availablePartitionsForTopic(String topic);

    /**
     * Return muted partitions.
     */
    List<PartitionInfo> mutedPartitionsForTopic(String topic);

    /**
     * Get the list of available partitions whose leader is this node.
     *
     * @param nodeId The node id
     * @return available partitions of the designated nodeId
     */
    Map<String, List<PartitionInfo>> availablePartitionsForNode(int nodeId);

    /**
     * Get the list of muted partitions whose leader is this node.
     *
     * @param nodeId The node id
     * @return muted partitions of the designated nodeId
     */
    Map<String, List<PartitionInfo>> mutedPartitionsForNode(int nodeId);
}
```
Add a ProducerStatusContext class that describes the current network-congestion-related state of the KafkaProducer
```java
public class ProducerStatusContext {

    /**
     * Current time.
     */
    private long now;

    /**
     * In-flight requests corresponding to each broker node.
     */
    private Map<Integer, Integer> inFlightRequests;

    /**
     * Current in-flight batches corresponding to each partition.
     */
    private Map<TopicPartition, Integer> inFlightBatches;
}
```
Implementation of the default rate based circuit breaker:
```java
/**
 * Implementation of a failure-rate-based circuit breaker.
 */
public class FailureRateCircuitBreaker implements PartitionCircuitBreaker {

    /**
     * Minimal message count to enable the breaker.
     */
    private final int triggerMessageCount;

    /**
     * Failure ratio percentage that triggers the circuit breaker.
     */
    private final int triggerFailurePerc;

    /**
     * Retry time after a partition is muted.
     */
    private final int muteRetryInterval;
}
```
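The KIP leaves the internal bookkeeping unspecified. As an illustration only, a self-contained sketch of how onComplete-style accounting could decide when a partition's failure rate crosses the threshold; the per-partition counters and synchronization here are assumptions, not the actual implementation:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of the failure-rate check behind FailureRateCircuitBreaker.
// Only the three tuning parameters come from the KIP; the rest is assumed.
public class FailureRateSketch {
    private final int triggerMessageCount;   // producer.circuit.breaker.trigger.message.count
    private final int triggerFailurePerc;    // producer.circuit.breaker.trigger.failure.perc
    private final Map<String, long[]> stats = new ConcurrentHashMap<>(); // {total, failed}

    public FailureRateSketch(int triggerMessageCount, int triggerFailurePerc) {
        this.triggerMessageCount = triggerMessageCount;
        this.triggerFailurePerc = triggerFailurePerc;
    }

    // Record one send result; return true if the partition should be muted.
    public boolean onComplete(String topicPartition, Exception exception) {
        long[] s = stats.computeIfAbsent(topicPartition, k -> new long[2]);
        synchronized (s) {
            s[0]++;                              // total sends
            if (exception != null) s[1]++;       // failed sends
            if (s[0] < triggerMessageCount) return false;  // not enough samples yet
            return s[1] * 100 >= s[0] * (long) triggerFailurePerc;
        }
    }

    public static void main(String[] args) {
        FailureRateSketch breaker = new FailureRateSketch(10, 50);
        boolean muted = false;
        for (int i = 0; i < 10; i++) {
            // 6 of 10 sends fail: a 60% failure rate exceeds the 50% threshold
            muted = breaker.onComplete("topic-0", i < 6 ? new Exception("timeout") : null);
        }
        System.out.println(muted);
    }
}
```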
A new class PartitionsCircuitBreaker will be added and included as a member of org.apache.kafka.common.Cluster. When a breaker is in effect, it will filter out muted partitions when calculating availablePartitionsForTopic.
```java
public final class Cluster {

    /**
     * Circuit breaker used in the cluster.
     */
    private final PartitionsCircuitBreaker partitionsCircuitBreaker;
}
```
Proposed Changes
We propose to add a configuration-driven circuit-breaking mechanism that allows the Kafka client to 'mute' partitions when certain conditions are met. The mechanism adds callbacks to the Sender class workflow that allow partitions to be filtered according to a pluggable policy.
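The filtering step in the Sender can be pictured with a toy, self-contained simulation; the class and variable names here are illustrative and do not reflect the actual Sender code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Toy simulation of the proposed Sender-side filtering: partitions that the
// breaker has muted are skipped when draining batches.
public class SenderFilterSketch {
    public static void main(String[] args) {
        List<String> readyPartitions = List.of("t-0", "t-1", "t-2");
        Set<String> muted = Set.of("t-1"); // decided by the breaker's onSend/onComplete
        List<String> drained = new ArrayList<>();
        for (String tp : readyPartitions) {
            if (muted.contains(tp)) continue; // short-circuit the problematic partition
            drained.add(tp);
        }
        System.out.println(drained);
    }
}
```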
In addition to the interface, we provide a default implementation that uses the failure rate as the condition. The client can choose the implementation that best fits a particular failure scenario. Several parameters together define the behavior of the default circuit breaker:
The breaker is enabled only after a certain number (producer.circuit.breaker.trigger.message.count) of messages have been sent
The breaker is triggered when the failure rate of a partition exceeds a threshold (producer.circuit.breaker.trigger.failure.perc)
A muted partition will be monitored and reset upon successful writes after a period (producer.circuit.breaker.mute.retry.interval)
When producer.circuit.breaker.enable.mute.inflight.full is set and max.in.flight.requests.per.connection is set to 1 (sequential messages), muting under in-flight congestion is enabled
Muting partitions has an impact when the topic contains keyed messages, as messages will be written to more than one partition during the recovery period. We believe this can be an explicit trade-off the application makes between availability and message ordering.
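To illustrate the ordering trade-off, a hypothetical sketch of re-routing a keyed record away from its muted home partition; the fallback strategy shown (probing the next available partition) is an assumption for illustration, not specified by this KIP:

```java
import java.util.List;

public class MutedPartitionRouting {
    // Pick a partition for a keyed record, skipping muted partitions.
    static int choosePartition(int keyHash, int numPartitions, List<Integer> muted) {
        int home = keyHash % numPartitions;
        if (!muted.contains(home)) {
            return home; // normal case: the key always maps to its home partition
        }
        // Home partition is muted: re-route to the next available partition.
        // This is what breaks per-key ordering during the recovery period.
        for (int i = 1; i < numPartitions; i++) {
            int candidate = (home + i) % numPartitions;
            if (!muted.contains(candidate)) return candidate;
        }
        return home; // all partitions muted: fall back to the home partition
    }

    public static void main(String[] args) {
        int keyHash = 7; // hypothetical hash of a record key
        System.out.println(choosePartition(keyHash, 4, List.of()));  // home partition
        System.out.println(choosePartition(keyHash, 4, List.of(3))); // re-routed
    }
}
```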
Compatibility, Deprecation, and Migration Plan
Rejected Alternatives
The proposed solution only benefits applications whose Kafka clients are upgraded to the new version. Large organizations almost surely have mixed clients, not all of which will be protected. A similar mechanism could also be implemented on the server side and benefit all clients regardless of their version. We argue that client-side circuit breaking and server-side broker high availability are complementary rather than conflicting. On one hand, it is unlikely (or extremely expensive) to implement broker HA in the control plane; on the other hand, we have often seen client-side mechanisms used to mitigate network problems between clients and brokers.