...

Code Block
public interface PartitionCircuitBreaker extends Configurable, Closeable {

    /**
     * Configure this class.
     *
     * @param configs configs in key/value pairs
     */
    void configure(Map<String, ?> configs);

    /**
     * @param circuitBreakManager the manager that holds the partition metadata for this mechanism
     */
    void setCircuitBreakManager(CircuitBreakManager circuitBreakManager);

    /**
     * @param context network congestion status of the partition and node
     * @return
     */
    boolean onSend(ProducerStatusContext context);

    /**
     * @param context network congestion status of the partition and node
     * @param topicPartition information of the partition
     * @param exception
     * @return
     */
    boolean onComplete(ProducerStatusContext context, TopicPartition topicPartition, Exception exception);

}


Add a CircuitBreakManager class that manages the partition metadata related to this mechanism.

...

Code Block
/**
 * Implementation of a failure-rate based circuit breaker
 */
public class FailureRateCircuitBreaker implements ProducerCircuitBreaker {

    /**
     * Minimum message count required to enable the breaker
     */
    private final int triggerMessageCount;

    /**
     * Failure ratio, as a percentage, that triggers the circuit breaker
     */
    private final int triggerFailurePerc;

    /**
     * Interval to wait before retrying a partition after it is muted
     */
    private final int muteRetryInterval;
}
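
The way the three fields above could interact can be sketched as follows. This is a minimal, self-contained illustration and not the proposed implementation: the class name FailureRateSketch, the String partition keys, the explicit nowMs argument, the choice to reset counters once a partition is muted, and the assumption that the retry interval is in milliseconds are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of failure-rate based muting. Only the three
 * threshold names come from the proposal; everything else is assumed.
 */
class FailureRateSketch {
    private final int triggerMessageCount;   // minimum completions before the breaker may trip
    private final int triggerFailurePerc;    // failure percentage (0-100) that trips the breaker
    private final long muteRetryIntervalMs;  // assumed unit: milliseconds

    // Per-partition counters: [0] = total completions, [1] = failures.
    private final Map<String, long[]> counters = new HashMap<>();
    // Partition -> timestamp (ms) at which the mute expires.
    private final Map<String, Long> mutedUntil = new HashMap<>();

    FailureRateSketch(int triggerMessageCount, int triggerFailurePerc, long muteRetryIntervalMs) {
        this.triggerMessageCount = triggerMessageCount;
        this.triggerFailurePerc = triggerFailurePerc;
        this.muteRetryIntervalMs = muteRetryIntervalMs;
    }

    /** Records one completed send and returns true if the partition is muted afterwards. */
    boolean onComplete(String partition, boolean failed, long nowMs) {
        long[] c = counters.computeIfAbsent(partition, p -> new long[2]);
        c[0]++;
        if (failed) {
            c[1]++;
        }
        // Trip only when enough messages were seen AND the failure percentage reaches the threshold.
        if (c[0] >= triggerMessageCount && c[1] * 100 >= (long) triggerFailurePerc * c[0]) {
            mutedUntil.put(partition, nowMs + muteRetryIntervalMs);
            counters.remove(partition); // assumption: start a fresh window after muting
        }
        return isMuted(partition, nowMs);
    }

    /** A partition stays muted until the retry interval has elapsed. */
    boolean isMuted(String partition, long nowMs) {
        Long until = mutedUntil.get(partition);
        if (until == null) {
            return false;
        }
        if (nowMs >= until) {
            mutedUntil.remove(partition); // interval elapsed: allow sends to be retried
            return false;
        }
        return true;
    }
}
```

With triggerMessageCount = 4 and triggerFailurePerc = 50, a partition that sees two failures in its first four completions would be muted by this sketch, and would become eligible again once the retry interval has passed.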

A new class PartitionsCircuitBreaker will be added and included as a member of org.apache.kafka.common.Cluster. When a breaker is in effect, it will filter out muted partitions when calculating availablePartitionsForTopic.

Code Block
public final class Cluster {
    /**
     * Circuit breaker used in the cluster
     */
    private final PartitionsCircuitBreaker partitionsCircuitBreaker;
}
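
The filtering step described above can be illustrated with a small standalone sketch. It does not use the real Cluster class: MutedPartitionFilter and filterMuted are hypothetical names, and partitions are represented as plain integer ids for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical sketch: drops muted partitions from a list of available ones.
 * In the proposal this logic would sit behind availablePartitionsForTopic.
 */
class MutedPartitionFilter {

    /** Returns the available partitions that are not muted, preserving their order. */
    static List<Integer> filterMuted(List<Integer> availablePartitions, Set<Integer> mutedPartitions) {
        List<Integer> result = new ArrayList<>();
        for (Integer partition : availablePartitions) {
            if (!mutedPartitions.contains(partition)) {
                result.add(partition);
            }
        }
        return result;
    }
}
```

The text above does not say what should happen when every partition of a topic is muted. This sketch simply returns an empty list in that case; a real implementation might need a fallback.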

Proposed Changes

We propose to add a configuration-driven circuit breaking mechanism that allows the Kafka client to "mute" partitions when certain conditions are met. The mechanism adds callbacks to the Sender class workflow that allow partitions to be filtered based on a given policy.

In addition to the interface, we provide a default implementation that uses failure rate as the condition. The client can choose an implementation that fits a particular failure scenario. Several parameters together define the behavior of the default circuit breaker:

...