Table of Contents |
---|
Status
Warning |
---|
The current KIP won't be implemented and will replaced by KIP-839 which will provide a better and elegant solution to the need. |
Current state: "Under Discussion"
Discussion thread: https://lists.apache.org/thread/ooczzl9b1wo3gtm5z3vqokowkgo9rb48
...
To allow implementing Spring managed interceptors for producers and consumers, both, the KafkaProducer and the KafkaConsumer should expose a constructor constructors taking a config instance (ProducerConfig and ConsumerConfig) instead of a configuration given as a Properties or as a Map.
...
Code Block | ||||
---|---|---|---|---|
| ||||
public class SpringAwareStreamsConfig extends StreamsConfig { private final List<ProducerInterceptor<?, ?>> producerInterceptors; private final List<ConsumerInterceptor<?, ?>> consumerInterceptors; public SpringAwareStreamsConfig(Map<?, ?> props, boolean doLog, List<ProducerInterceptor<?, ?>> producerInterceptors, List<ConsumerInterceptor<?, ?>> consumerInterceptors) { super(props, doLog); this.producerInterceptors = producerInterceptors; this.consumerInterceptors = consumerInterceptors; } @Override public <T> List<T> getConfiguredInstances(List<String> classNames, Class<T> t, Map<String, Object> configOverrides) { final var configuredInstances = super.getConfiguredInstances(classNames, t, configOverrides); if (ProducerInterceptor.class.equals(t)) { configuredInstances.addAll((Collection<? extends T>) producerInterceptors); } if (ConsumerInterceptor.class.equals(t)) { configuredInstances.addAll((Collection<? extends T>) consumerInterceptors); } return configuredInstances; } } |
Streams may also use another approach based on a custom KafkaClientSupplier and reusing this KIP
Code Block | ||||
---|---|---|---|---|
| ||||
public class CustomKafkaClientSupplier extends DefaultKafkaClientSupplier {
// additional dependencies given by constructor injection.
@Override
public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
return new KafkaProducer<>(<CUSTOM_CONFIG>, new ByteArraySerializer(), new ByteArraySerializer());
}
@Override
public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
return new KafkaConsumer<>(<CUSTOM_CONFIG>, new ByteArrayDeserializer(), new ByteArrayDeserializer());
}
@Override
public Consumer<byte[], byte[]> getRestoreConsumer(final Map<String, Object> config) {
return new KafkaConsumer<>(<CUSTOM_CONFIG>, new ByteArrayDeserializer(), new ByteArrayDeserializer());
}
@Override
public Consumer<byte[], byte[]> getGlobalConsumer(final Map<String, Object> config) {
return new KafkaConsumer<>(<CUSTOM_CONFIG>, new ByteArrayDeserializer(), new ByteArrayDeserializer());
}
}
...
new KafkaStreams(<TOPOLOGY>, <PROPERTIES>, <CUSTOM_KAFKA_CLIENT_SUPPLIER>);
... |
See another similar KIP for additional information related to the current KIP but for streams.
...
Important note: this request is not related to Spring and would give a way to use constructed instances instead of using reflection to build these instances from a given class.
Public Interfaces
KafkaProducer
KafkaConsumer
Proposed Changes
Create a 2 new constructor constructors in KafkaProducer
Code Block | ||||
---|---|---|---|---|
| ||||
public KafkaProducer(ProducerConfig config)
public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) |
and create a new constructor + increase the visibility of an existing constructor in KafkaConsumer
Code Block | ||||
---|---|---|---|---|
| ||||
public KafkaConsumer(ConsumerConfig config)
public KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) |
The 2 following constructors will also be added into the TopologyTestDriver
Code Block | ||||
---|---|---|---|---|
| ||||
public TopologyTestDriver(Topology topology, StreamsConfig config)
public TopologyTestDriver(Topology topology, StreamsConfig config, Instant initialWallClockTime) |
Compatibility, Deprecation, and Migration Plan
No compatibility issue and no migration plan needed because are required as this KIP only creates a constructor and will only create new constructors and will increase the visibility of an existing oneconstructor.
So, at the end, users will have more ways to build producers, consumers and a topology test driver without any impacts on the existing.
Rejected Alternatives
/