...
The following producer config options are added:
producer.circuit.breaker.class: class name of the breaker class
producer.circuit.breaker.xxx: customized parameters used by the implementation class (optional)
enable.mute.partition
Add a ProducerMuteManager class that manages the partition metadata related to this mechanism

    /**
     * A class which maintains the muted TopicPartitions. It also keeps the number of
     * accumulated batches and in-flight requests per TopicPartition.
     */
    public class ProducerMuteManager implements Closeable {

        /**
         * Mute a TopicPartition.
         *
         * @param topicPartition
         */
        public void mute(TopicPartition topicPartition);

        /**
         * Unmute a TopicPartition.
         *
         * @param topicPartition
         */
        public void unmute(TopicPartition topicPartition);

        public boolean isMute(TopicPartition topicPartition);

        /**
         * Return the muted TopicPartitions.
         *
         * @return
         */
        public Set<TopicPartition> getMutedPartitions();

        public void close();

        /**
         * Return the number of accumulated batches per TopicPartition.
         *
         * @return
         */
        public Map<TopicPartition, Integer> getAccumulatedBatches();

        void setInFlightBatches(Map<TopicPartition, List<ProducerBatch>> inFlightBatches);

        /**
         * Return the number of in-flight requests per TopicPartition.
         *
         * @return The request count.
         */
        public Map<TopicPartition, Integer> getInFlightRequestCount();
    }

PartitionCircuitBreaker interface

    public interface PartitionCircuitBreaker extends Configurable, Closeable {

        /**
         * Configure this class.
         *
         * @param configs configs in key/value pairs
         */
        void configure(Map<String, ?> configs);

        /**
         * @param circuitBreakManager
         */
        void setCircuitBreakManager(CircuitBreakManager circuitBreakManager);

        /**
         * @param context network congestion status of the partition and node
         * @return
         */
        boolean onSend(ProducerStatusContext context);

        /**
         * @param context network congestion status of the partition and node
         * @param topicPartition information of the partition
         * @param exception
         * @return
         */
        boolean onComplete(ProducerStatusContext context, TopicPartition topicPartition, Exception exception);
    }

Add a CircuitBreakManager class that manages the partition metadata related to this mechanism

    public final class CircuitBreakManager implements Closeable {

        /** Mute partitions by nodeId. */
        void mute(int nodeId);

        /** Unmute partitions by nodeId. */
        void unmute(int nodeId);

        /** Mute a partition. */
        void mute(TopicPartition topicPartition);

        /** Unmute a partition. */
        void unmute(TopicPartition topicPartition);

        /** Return the available partitions of a topic. */
        List<PartitionInfo> availablePartitionsForTopic(String topic);

        /** Return the muted partitions of a topic. */
        List<PartitionInfo> mutedPartitionsForTopic(String topic);

        /**
         * Get the list of available partitions whose leader is this node.
         *
         * @param nodeId The node id
         * @return available partitions of designated nodeId
         */
        Map<String, List<PartitionInfo>> availablePartitionsForNode(int nodeId);

        /**
         * Get the list of muted partitions whose leader is this node.
         *
         * @param nodeId The node id
         * @return muted partitions of designated nodeId
         */
        Map<String, List<PartitionInfo>> mutedPartitionsForNode(int nodeId);
    }
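To make the mute bookkeeping concrete, the following is a minimal in-memory sketch of the `mute` / `unmute` / `isMute` / `getMutedPartitions` operations listed above. It is not the implementation proposed here: `TopicPartitionKey` is a hypothetical stand-in for Kafka's `TopicPartition` so that the example compiles on its own, and `InMemoryMuteManager` is an assumed name.

```java
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
final class TopicPartitionKey {
    final String topic;
    final int partition;

    TopicPartitionKey(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TopicPartitionKey)) return false;
        TopicPartitionKey other = (TopicPartitionKey) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }
}

// Assumed in-memory bookkeeping of muted partitions (illustration only).
final class InMemoryMuteManager {
    // Concurrent set, since sender and application threads may both touch it.
    private final Set<TopicPartitionKey> muted = ConcurrentHashMap.newKeySet();

    void mute(TopicPartitionKey tp) { muted.add(tp); }

    void unmute(TopicPartitionKey tp) { muted.remove(tp); }

    boolean isMute(TopicPartitionKey tp) { return muted.contains(tp); }

    // Read-only view so callers cannot change the mute state directly.
    Set<TopicPartitionKey> getMutedPartitions() {
        return Collections.unmodifiableSet(muted);
    }

    void close() { muted.clear(); }
}
```

A concurrent set is used here on the assumption that the mute state is read by the partitioner while another thread updates it; a real implementation may synchronize differently.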
Add an 'initialize' method to the Partitioner interface

    public interface Partitioner extends Configurable, Closeable {

        /**
         * This method is called when the Producer is built to initialize the partition mute manager
         *
         * @param producerMuteManager
         */
        default void initialize(ProducerMuteManager producerMuteManager) {
        }

        ...

        int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
    }

ProducerStatusContext class

    public class ProducerStatusContext {

        /** current time */
        private long now;

        /** InFlight Requests corresponding to the current broker node */
        private Map<Integer, Integer> inFlightRequests;

        /** current inFlightBatches corresponding to each partition */
        private Map<TopicPartition, Integer> inFlightBatches;
    }
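As an illustration of how a partitioner could use the mute information it receives through `initialize`, the sketch below picks a partition while skipping muted ones. It is an assumption-laden example, not the proposed implementation: partitions are plain integers instead of Kafka's `PartitionInfo`, the class name `MuteAwareChooser` is hypothetical, and falling back to all partitions when everything is muted is just one possible policy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical helper; not part of the Kafka API.
final class MuteAwareChooser {

    /**
     * Picks a partition for the given key hash, preferring partitions that are not muted.
     * Falls back to the full partition range when every partition is muted.
     * Assumes numPartitions > 0.
     */
    static int choose(int keyHash, int numPartitions, Set<Integer> mutedPartitions) {
        List<Integer> candidates = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            if (!mutedPartitions.contains(p)) {
                candidates.add(p);
            }
        }
        if (candidates.isEmpty()) {
            // Everything is muted: do not block the producer, use all partitions.
            for (int p = 0; p < numPartitions; p++) {
                candidates.add(p);
            }
        }
        // floorMod keeps the index non-negative for negative hashes.
        return candidates.get(Math.floorMod(keyHash, candidates.size()));
    }
}
```

Note that skipping muted partitions changes the key-to-partition mapping while a partition is muted, so a policy like this only suits workloads that can tolerate relaxed per-key ordering.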
Add an 'initialize' method to the ProducerInterceptor interface

    public interface ProducerInterceptor<K, V> extends Configurable, Closeable {

        /**
         * This method is called when the Producer is built to initialize the partition mute manager
         *
         * @param producerMuteManager
         */
        default void initialize(ProducerMuteManager producerMuteManager) {
        }

        public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);

        public void onAcknowledgement(RecordMetadata metadata, Exception exception);
    }

FailureRateCircuitBreaker class

    /**
     * Implementation of failure rate based circuit breaker
     */
    public class FailureRateCircuitBreaker implements ProducerCircuitBreaker {

        /** Minimal message count to enable the breaker */
        private final int triggerMessageCount;

        /** Failure ratio percentage that triggers the circuit breaker */
        private final int triggerFailurePerc;

        /** Retry time after a partition is muted */
        private final int muteRetryInterval;
    }

A new class PartitionsCircuitBreaker will be added and included as a member of org.apache.kafka.common.Cluster. When a breaker is in effect, it will filter out muted partitions when calculating availablePartitionsForTopic.

    public final class Cluster {

        /** Circuit breaker used in the cluster */
        private final PartitionsCircuitBreaker partitionsCircuitBreaker;
    }
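The three FailureRateCircuitBreaker parameters (minimal message count, failure percentage, retry interval) suggest a policy along the following lines. This is a hedged sketch for a single partition, not the proposed implementation: the class name `FailureRateTracker`, the millisecond unit for the retry interval, and the choice to reset the counters when the breaker trips are all assumptions made for illustration.

```java
// Assumed per-partition failure-rate tracker (illustration only).
final class FailureRateTracker {
    private final int triggerMessageCount;   // minimal number of sends before the breaker may trip
    private final int triggerFailurePerc;    // failure percentage (0-100) that trips the breaker
    private final long muteRetryIntervalMs;  // assumed unit: milliseconds

    private long total;
    private long failed;
    private long mutedUntilMs = Long.MIN_VALUE;

    FailureRateTracker(int triggerMessageCount, int triggerFailurePerc, long muteRetryIntervalMs) {
        this.triggerMessageCount = triggerMessageCount;
        this.triggerFailurePerc = triggerFailurePerc;
        this.muteRetryIntervalMs = muteRetryIntervalMs;
    }

    /** Records one completed send; returns true if the partition should now be muted. */
    boolean onComplete(boolean success, long nowMs) {
        total++;
        if (!success) {
            failed++;
        }
        // Integer form of: failed / total >= triggerFailurePerc / 100
        boolean tripped = total >= triggerMessageCount
                && failed * 100 >= (long) triggerFailurePerc * total;
        if (tripped) {
            mutedUntilMs = nowMs + muteRetryIntervalMs;
            total = 0;   // start a fresh window after tripping
            failed = 0;
        }
        return tripped;
    }

    /** True while the retry interval after the last trip has not yet elapsed. */
    boolean isMuted(long nowMs) {
        return nowMs < mutedUntilMs;
    }
}
```

With `new FailureRateTracker(4, 50, 1000)`, two failures out of the first four completed sends trip the breaker, and the partition stays muted for the following 1000 ms.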
Proposed Changes
We propose to add a configuration-driven circuit breaking mechanism that allows the Kafka client to ‘mute’ partitions when certain conditions are met. The mechanism adds callbacks to the Sender class workflow that allow partitions to be filtered based on a configured policy.
...