Status
Current state: "Under Discussion"
Discussion thread: https://lists.apache.org/thread/ooczzl9b1wo3gtm5z3vqokowkgo9rb48
JIRA: KAFKA-13864
Motivation
To allow implementing Spring-managed interceptors for producers and consumers, both KafkaProducer and KafkaConsumer should expose a constructor that takes a config instance (ProducerConfig or ConsumerConfig) instead of a configuration given as a Properties or a Map.
Having access to the configuration object would make it possible to override the existing method that creates interceptors from the configured class names.
This would also open the door to other use cases that currently rely on a class name to obtain a concrete instance (partitioner, metrics reporter, etc.).
See the following code for a concrete example:
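import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerInterceptor;

public class SpringAwareProducerConfig extends ProducerConfig {

    // Interceptor instances built outside Kafka (for example, Spring beans).
    private final List<ProducerInterceptor<?, ?>> producerInterceptors;

    public SpringAwareProducerConfig(Map<?, ?> props, boolean doLog, List<ProducerInterceptor<?, ?>> producerInterceptors) {
        super(props, doLog);
        this.producerInterceptors = producerInterceptors;
    }

    @Override
    @SuppressWarnings("unchecked") // safe: T is ProducerInterceptor when the cast is reached
    public <T> List<T> getConfiguredInstances(List<String> classNames, Class<T> t, Map<String, Object> configOverrides) {
        // Let Kafka build the instances configured by class name first.
        final var configuredInstances = super.getConfiguredInstances(classNames, t, configOverrides);
        // Then append the pre-built interceptors.
        if (ProducerInterceptor.class.equals(t)) {
            configuredInstances.addAll((Collection<? extends T>) producerInterceptors);
        }
        return configuredInstances;
    }
}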
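import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;

public class SpringAwareConsumerConfig extends ConsumerConfig {

    // Interceptor instances built outside Kafka (for example, Spring beans).
    private final List<ConsumerInterceptor<?, ?>> consumerInterceptors;

    public SpringAwareConsumerConfig(Map<?, ?> props, boolean doLog, List<ConsumerInterceptor<?, ?>> consumerInterceptors) {
        super(props, doLog);
        this.consumerInterceptors = consumerInterceptors;
    }

    @Override
    @SuppressWarnings("unchecked") // safe: T is ConsumerInterceptor when the cast is reached
    public <T> List<T> getConfiguredInstances(List<String> classNames, Class<T> t, Map<String, Object> configOverrides) {
        // Let Kafka build the instances configured by class name first.
        final var configuredInstances = super.getConfiguredInstances(classNames, t, configOverrides);
        // Then append the pre-built interceptors.
        if (ConsumerInterceptor.class.equals(t)) {
            configuredInstances.addAll((Collection<? extends T>) consumerInterceptors);
        }
        return configuredInstances;
    }
}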
Also see the Spring Kafka issue that motivated this KIP:
https://github.com/spring-projects/spring-kafka/issues/2244
Kafka Streams is not affected by this issue because KafkaStreams already exposes a constructor that takes a StreamsConfig object.
Concrete working example for Streams:
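import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.streams.StreamsConfig;

public class SpringAwareStreamsConfig extends StreamsConfig {

    // Interceptor instances built outside Kafka (for example, Spring beans).
    private final List<ProducerInterceptor<?, ?>> producerInterceptors;
    private final List<ConsumerInterceptor<?, ?>> consumerInterceptors;

    public SpringAwareStreamsConfig(Map<?, ?> props, boolean doLog, List<ProducerInterceptor<?, ?>> producerInterceptors, List<ConsumerInterceptor<?, ?>> consumerInterceptors) {
        super(props, doLog);
        this.producerInterceptors = producerInterceptors;
        this.consumerInterceptors = consumerInterceptors;
    }

    @Override
    @SuppressWarnings("unchecked") // safe: T matches the interceptor type when each cast is reached
    public <T> List<T> getConfiguredInstances(List<String> classNames, Class<T> t, Map<String, Object> configOverrides) {
        // Let Kafka build the instances configured by class name first.
        final var configuredInstances = super.getConfiguredInstances(classNames, t, configOverrides);
        // Then append the pre-built interceptors of the requested type.
        if (ProducerInterceptor.class.equals(t)) {
            configuredInstances.addAll((Collection<? extends T>) producerInterceptors);
        }
        if (ConsumerInterceptor.class.equals(t)) {
            configuredInstances.addAll((Collection<? extends T>) consumerInterceptors);
        }
        return configuredInstances;
    }
}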
For additional background, see the following KIP, which addresses a similar need for Streams:
KIP-378: Enable Dependency Injection for Kafka Streams handlers
Adding a new constructor and increasing the visibility of an existing one is straightforward and does not affect existing code. It simply gives an additional way to build a producer or a consumer. This approach is also aligned with Streams, which already has this kind of constructor.
Important note: this request is not specific to Spring. It would give any caller a way to supply already constructed instances instead of having them built by reflection from a class name.
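The pattern can be illustrated without Spring or Kafka on the classpath. The following is a minimal sketch, not the Kafka implementation: Interceptor, BaseConfig, InstanceAwareConfig and the other names are hypothetical stand-ins, and the two-argument getConfiguredInstances is a simplified analogue of the method overridden in the examples above. It shows a config subclass appending an instance that needs a constructor argument, and therefore could not be created from a class name alone, to the instances built by reflection.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an interceptor type; not the Kafka interface.
interface Interceptor {
    String name();
}

// Can be built by reflection because it has a no-arg constructor.
class ReflectiveInterceptor implements Interceptor {
    public ReflectiveInterceptor() { }

    @Override
    public String name() { return "reflective"; }
}

// Requires a dependency, so it cannot be created from a class name alone.
class InjectedInterceptor implements Interceptor {
    private final String dependency;

    InjectedInterceptor(String dependency) { this.dependency = dependency; }

    @Override
    public String name() { return "injected:" + dependency; }
}

// Hypothetical stand-in for a config class that instantiates plugins by class name.
class BaseConfig {
    public <T> List<T> getConfiguredInstances(List<String> classNames, Class<T> type) {
        List<T> instances = new ArrayList<>();
        for (String className : classNames) {
            try {
                Object o = Class.forName(className).getDeclaredConstructor().newInstance();
                instances.add(type.cast(o));
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException("Cannot instantiate " + className, e);
            }
        }
        return instances;
    }
}

// Adds pre-built instances after the reflectively created ones.
class InstanceAwareConfig extends BaseConfig {
    private final List<Interceptor> prebuilt;

    InstanceAwareConfig(List<Interceptor> prebuilt) { this.prebuilt = List.copyOf(prebuilt); }

    @Override
    public <T> List<T> getConfiguredInstances(List<String> classNames, Class<T> type) {
        List<T> instances = super.getConfiguredInstances(classNames, type);
        if (Interceptor.class.equals(type)) {
            for (Interceptor interceptor : prebuilt) {
                instances.add(type.cast(interceptor));
            }
        }
        return instances;
    }
}

class InjectedInstancesDemo {
    // Returns the names of all interceptors the config hands out.
    static List<String> run() {
        BaseConfig config = new InstanceAwareConfig(List.of(new InjectedInterceptor("dataSource")));
        List<String> names = new ArrayList<>();
        for (Interceptor interceptor : config.getConfiguredInstances(
                List.of(ReflectiveInterceptor.class.getName()), Interceptor.class)) {
            names.add(interceptor.name());
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [reflective, injected:dataSource]
    }
}
```

The client code that consumes the config never needs to know where an instance came from. This is why exposing a constructor that accepts the config object is enough to enable the use case.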
Public Interfaces
KafkaProducer
KafkaConsumer
Proposed Changes
Create a new constructor in KafkaProducer
public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer)
and increase the visibility of an existing constructor in KafkaConsumer
public KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer)
Compatibility, Deprecation, and Migration Plan
There is no compatibility issue and no migration plan is needed, because this KIP only adds a constructor and increases the visibility of an existing one.
Rejected Alternatives
None.