Status

Current state: Voting

Discussion thread: here

JIRA: KAFKA-12793

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

When Kafka is used to build data pipelines in mission-critical business scenarios, availability and throughput are the most important operational goals, and they need to be maintained in the presence of transient or permanent local failures. One typical situation that requires Ops intervention is disk failure: some partitions have long write latency caused by extremely high disk utilization. Since all partitions share the same buffer under the current producer thread model, the buffer fills up quickly and eventually the good partitions are impacted as well. The cluster-level success rate and timeout ratio degrade until the local infrastructure issue is resolved.

...

New producer config options are added:

  • enable.mute.partition: when set to 'true', the Producer calls the 'initialize' method of ProducerInterceptor and Partitioner with the ProducerMuteManager during construction

  • producer.circuit.breaker.xxx: customized parameters used by the implementation class (optional)

...
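As a rough illustration of how the new option might be supplied, the sketch below builds an ordinary producer `Properties` object. The option name `enable.mute.partition` comes from this proposal and is not part of any released Kafka client; the helper class `MuteConfigExample` and the serializer choices are hypothetical.

```java
import java.util.Properties;

// Hypothetical helper, for illustration only.
final class MuteConfigExample {
    // Option name as proposed in this KIP; assumed, not a released client config.
    static final String ENABLE_MUTE_PARTITION = "enable.mute.partition";

    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Ask the producer to hand a mute manager to the interceptor and partitioner.
        props.setProperty(ENABLE_MUTE_PARTITION, "true");
        return props;
    }
}
```

The resulting properties would be passed to the producer constructor in the usual way; clients that do not know the option would simply ignore or warn about it.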


Add a ProducerMuteManager class that manages the partition metadata related to this mechanism

Code Block
languagejava
linenumberstrue
/**
 * A class which maintains the set of muted TopicPartitions. It also keeps the number of
 * accumulated batches and in-flight requests per TopicPartition.
 * Only method signatures are listed here; method bodies are omitted.
 */
public class ProducerMuteManager implements Closeable {

    /**
     * Mute a TopicPartition.
     *
     * @param topicPartition the partition to mute
     */
    public void mute(TopicPartition topicPartition);

    /**
     * Unmute a TopicPartition.
     *
     * @param topicPartition the partition to unmute
     */
    public void unmute(TopicPartition topicPartition);

    /**
     * Check whether a TopicPartition is currently muted.
     */
    public boolean isMute(TopicPartition topicPartition);

    /**
     * Return the muted TopicPartitions.
     *
     * @return the set of muted partitions
     */
    public Set<TopicPartition> getMutedPartitions();

    public void close();

    /**
     * Return the number of accumulated batches per TopicPartition.
     *
     * @return the accumulated batch count for each partition
     */
    public Map<TopicPartition, Integer> getAccumulatedBatches();

    /**
     * Update the in-flight batches per TopicPartition.
     */
    void setInFlightBatches(Map<TopicPartition, List<ProducerBatch>> inFlightBatches);

    /**
     * Return the number of in-flight requests per TopicPartition.
     *
     * @return the request count for each partition
     */
    public Map<TopicPartition, Integer> getInFlightRequestCount();
}
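To make the intended bookkeeping concrete, here is a minimal in-memory sketch of a mute manager. It is not the proposed implementation: `TopicPartitionKey` is a stand-in for `org.apache.kafka.common.TopicPartition` so the example compiles on its own, and `requestSent` / `requestCompleted` are hypothetical helpers for maintaining the in-flight counts.

```java
import java.io.Closeable;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Stand-in for org.apache.kafka.common.TopicPartition (assumption for this sketch).
final class TopicPartitionKey {
    final String topic;
    final int partition;

    TopicPartitionKey(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override public boolean equals(Object o) {
        if (!(o instanceof TopicPartitionKey)) return false;
        TopicPartitionKey other = (TopicPartitionKey) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override public int hashCode() {
        return Objects.hash(topic, partition);
    }
}

// Illustrative only: tracks muted partitions and in-flight request counts in memory.
final class InMemoryMuteManager implements Closeable {
    private final Set<TopicPartitionKey> muted = ConcurrentHashMap.newKeySet();
    private final Map<TopicPartitionKey, Integer> inFlight = new ConcurrentHashMap<>();

    void mute(TopicPartitionKey tp) { muted.add(tp); }

    void unmute(TopicPartitionKey tp) { muted.remove(tp); }

    boolean isMute(TopicPartitionKey tp) { return muted.contains(tp); }

    Set<TopicPartitionKey> getMutedPartitions() { return Set.copyOf(muted); }

    // Hypothetical helper: count one more in-flight request for the partition.
    void requestSent(TopicPartitionKey tp) { inFlight.merge(tp, 1, Integer::sum); }

    // Hypothetical helper: count one fewer; the entry is dropped when it reaches zero.
    void requestCompleted(TopicPartitionKey tp) {
        inFlight.computeIfPresent(tp, (k, v) -> v > 1 ? v - 1 : null);
    }

    Map<TopicPartitionKey, Integer> getInFlightRequestCount() { return Map.copyOf(inFlight); }

    @Override public void close() {
        muted.clear();
        inFlight.clear();
    }
}
```

Concurrent collections are used here on the assumption that the sender thread and application threads may both touch the manager; the proposal itself does not state a threading model.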


Add an 'initialize' method in the Partitioner interface

Code Block
languagejava
linenumberstrue
public interface Partitioner extends Configurable, Closeable {

    /**
     * This method is called when the Producer is built, to initialize the partition mute manager.
     *
     * @param producerMuteManager the producer's partition mute manager
     */
    default void initialize(ProducerMuteManager producerMuteManager) {
    }

...

int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
}
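One way a custom Partitioner could use the mute information is to hash the key over the unmuted partitions only. The sketch below shows that selection step in isolation; `MuteAwarePartitionChooser` and its `choose` method are hypothetical names, partitions are represented as plain integers, and the fallback when every partition is muted is an assumption rather than something the proposal specifies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Illustrative selection logic only; not the proposed Partitioner implementation.
final class MuteAwarePartitionChooser {

    /**
     * Picks a partition for the given key hash, skipping muted partitions
     * whenever at least one unmuted partition exists.
     */
    static int choose(int keyHash, int numPartitions, Set<Integer> muted) {
        List<Integer> candidates = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            if (!muted.contains(p)) {
                candidates.add(p);
            }
        }
        if (candidates.isEmpty()) {
            // Assumed fallback: with everything muted, behave as if nothing were muted.
            return Math.floorMod(keyHash, numPartitions);
        }
        return candidates.get(Math.floorMod(keyHash, candidates.size()));
    }
}
```

With four partitions and a key hash of 5, this picks partition 1 when nothing is muted and partition 3 while partition 1 is muted, which shows how a key can move to a different partition during a mute period.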


Add an 'initialize' method in the ProducerInterceptor interface

Code Block
languagejava
linenumberstrue
public interface ProducerInterceptor<K, V> extends Configurable, Closeable {

    /**
     * This method is called when the Producer is built, to initialize the partition mute manager.
     *
     * @param producerMuteManager the producer's partition mute manager
     */
    default void initialize(ProducerMuteManager producerMuteManager) {
    }

    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);

    public void onAcknowledgement(RecordMetadata metadata, Exception exception);
}

A new class PartitionsCircuitBreaker will be added and included as a member of org.apache.kafka.common.Cluster. When a breaker is in effect, it will filter out muted partitions when calculating availablePartitionsForTopic.

Code Block
languagejava
linenumberstrue
public final class Cluster {
    /**
     * Circuit breaker used in the cluster
     */
    private final PartitionsCircuitBreaker partitionsCircuitBreaker;
}

Proposed Changes

We propose to add a configuration-driven circuit breaking mechanism that allows the Kafka client to 'mute' partitions when a certain condition is met. The mechanism adds callbacks to the Sender class workflow that allow partitions to be filtered based on a given policy.

In addition to the interface, we provide a default implementation that uses failure rate as the condition. The client can choose a proper implementation that fits a special failure scenario.

Client-side custom implementation of Partitioner and ProducerInterceptor:

  • Customize the implementation of ProducerInterceptor, and choose the strategy for muting partitions.

  • Customize the implementation of Partitioner, and choose the strategy for filtering partitions.

Several parameters together define the behavior of the default circuit breaker:

  • The breaker is enabled only after a certain number (producer.circuit.breaker.trigger.message.count) of messages have been sent

  • The breaker is triggered when a partition's failure rate exceeds a threshold (producer.circuit.breaker.trigger.failure.perc)

  • A muted partition will be monitored and reset upon successful writes after a period (producer.circuit.breaker.mute.retry.interval)

  • When producer.circuit.breaker.enable.mute.inflight.full is set and max.in.flight.requests.per.connection is set to 1 (sequential messages), muting under in-flight congestion is enabled
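The interplay of the message-count, failure-percentage and retry-interval parameters can be sketched as a small per-partition tracker. This is only one possible reading of the rules above, not the default implementation: the class `FailureRateTracker` is hypothetical, partitions are plain integers, the retry interval is assumed to be in milliseconds, and "reset upon successful writes" is interpreted as "the first successful write after the interval clears the mute".

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of failure-rate based muting; names and units are assumptions.
final class FailureRateTracker {
    private final int triggerMessageCount;  // producer.circuit.breaker.trigger.message.count
    private final int triggerFailurePerc;   // producer.circuit.breaker.trigger.failure.perc
    private final long muteRetryIntervalMs; // producer.circuit.breaker.mute.retry.interval (ms assumed)

    private static final class Stats {
        int sent;
        int failed;
        long mutedAtMs = -1; // -1 means "not muted"
    }

    private final Map<Integer, Stats> statsByPartition = new HashMap<>();

    FailureRateTracker(int triggerMessageCount, int triggerFailurePerc, long muteRetryIntervalMs) {
        this.triggerMessageCount = triggerMessageCount;
        this.triggerFailurePerc = triggerFailurePerc;
        this.muteRetryIntervalMs = muteRetryIntervalMs;
    }

    /** Records one completed send and returns whether the partition is muted afterwards. */
    boolean onComplete(int partition, boolean failed, long nowMs) {
        Stats s = statsByPartition.computeIfAbsent(partition, p -> new Stats());
        if (s.mutedAtMs >= 0) {
            // Muted: a successful write after the retry interval resets the partition.
            if (!failed && nowMs - s.mutedAtMs >= muteRetryIntervalMs) {
                statsByPartition.put(partition, new Stats());
                return false;
            }
            return true;
        }
        s.sent++;
        if (failed) {
            s.failed++;
        }
        // Trigger only after enough messages, and only when the failure rate exceeds the threshold.
        boolean enoughMessages = s.sent >= triggerMessageCount;
        boolean overThreshold = (long) s.failed * 100 > (long) triggerFailurePerc * s.sent;
        if (enoughMessages && overThreshold) {
            s.mutedAtMs = nowMs;
        }
        return s.mutedAtMs >= 0;
    }

    boolean isMuted(int partition) {
        Stats s = statsByPartition.get(partition);
        return s != null && s.mutedAtMs >= 0;
    }
}
```

In a real client the mute decision would be forwarded to the mute manager rather than kept inside the tracker; the state is held locally here only to keep the example self-contained.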

Muting partitions has an impact when the topic contains keyed messages, as messages with the same key may be written to more than one partition during the recovery period. We believe this can be an explicit trade-off the application makes between availability and message ordering.

Compatibility, Deprecation, and Migration Plan

Rejected Alternatives

The proposed solution is only beneficial to applications whose Kafka clients are upgraded to the new version. Large organizations almost surely have mixed clients, which will not all be protected. A similar mechanism can also be implemented on the server side and benefit all clients regardless of their version. We argue that client-side circuit breaking and server-side broker high availability are complementary rather than conflicting. On one hand, it is unlikely (or extremely expensive) to implement broker HA in the control plane; on the other hand, we have often seen client-side mechanisms used to mitigate network problems between client and broker.