Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Warning

The current KIP won't be implemented and will replaced by KIP-839 which will provide a better and elegant solution to the need.

Current state: "Under Discussion"

Discussion thread: https://lists.apache.org/thread/ooczzl9b1wo3gtm5z3vqokowkgo9rb48

...

Code Block
languagejava
themeRDark
public class SpringAwareStreamsConfig extends StreamsConfig {

    private final List<ProducerInterceptor<?, ?>> producerInterceptors;
    private final List<ConsumerInterceptor<?, ?>> consumerInterceptors;

    public SpringAwareStreamsConfig(Map<?, ?> props,
                                   boolean doLog,
                                   List<ProducerInterceptor<?, ?>> producerInterceptors,
                                   List<ConsumerInterceptor<?, ?>> consumerInterceptors) {
        super(props, doLog);
        this.producerInterceptors = producerInterceptors;
        this.consumerInterceptors = consumerInterceptors;
    }

    @Override
    public <T> List<T> getConfiguredInstances(List<String> classNames, Class<T> t, Map<String, Object> configOverrides) {
        final var configuredInstances = super.getConfiguredInstances(classNames, t, configOverrides);
        if (ProducerInterceptor.class.equals(t)) {
            configuredInstances.addAll((Collection<? extends T>) producerInterceptors);
        } 
        if (ConsumerInterceptor.class.equals(t)) {
            configuredInstances.addAll((Collection<? extends T>) consumerInterceptors);
        }
        return configuredInstances;
    }
}

Streams may also use another approach based on a custom KafkaClientSupplier and reusing this KIP

Code Block
languagejava
themeRDark
public class CustomKafkaClientSupplier extends DefaultKafkaClientSupplier {

    // additional dependencies given by constructor injection.

	@Override
	public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
		return new KafkaProducer<>(<CUSTOM_CONFIG>, new ByteArraySerializer(), new ByteArraySerializer());
	}

	@Override
	public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
		return new KafkaConsumer<>(<CUSTOM_CONFIG>, new ByteArrayDeserializer(), new ByteArrayDeserializer());
	}

	@Override
	public Consumer<byte[], byte[]> getRestoreConsumer(final Map<String, Object> config) {
		return new KafkaConsumer<>(<CUSTOM_CONFIG>, new ByteArrayDeserializer(), new ByteArrayDeserializer());
	}

	@Override
	public Consumer<byte[], byte[]> getGlobalConsumer(final Map<String, Object> config) {
		return new KafkaConsumer<>(<CUSTOM_CONFIG>, new ByteArrayDeserializer(), new ByteArrayDeserializer());
	}
}
...
new KafkaStreams(<TOPOLOGY>, <PROPERTIES>, <CUSTOM_KAFKA_CLIENT_SUPPLIER>);
...

See another similar KIP for additional information related to the current KIP but for streams.

...

Important note: this request is not related to Spring and would give a way to use constructed instances instead of using reflection to build these instances from a given class.

Public Interfaces

KafkaProducer

KafkaConsumer

Proposed Changes

Create 2 new constructors in KafkaProducer

...

Code Block
languagejava
themeRDark
public KafkaConsumer(ConsumerConfig config)
public KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer)

The 2 following constructors will also be added into the TopologyTestDriver

Code Block
languagejava
themeRDark
public TopologyTestDriver(Topology topology, StreamsConfig config)
public TopologyTestDriver(Topology topology, StreamsConfig config, Instant initialWallClockTime)

Compatibility, Deprecation, and Migration Plan

No compatibility issue and no migration plan needed because are required as this KIP only creates a constructor and will only create new constructors and will increase the visibility of an existing oneconstructor.

So, at the end, users will have more ways to build producers, consumers and a topology test driver without any impacts on the existing.

Rejected Alternatives

/