You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 8 Next »

Status

Current state: "Under Discussion"

Discussion thread: https://lists.apache.org/thread/ooczzl9b1wo3gtm5z3vqokowkgo9rb48

JIRA: KAFKA-13864

Motivation

To allow implementing Spring managed interceptors for producers and consumers, both, the KafkaProducer and the KafkaConsumer should expose constructors taking a config instance (ProducerConfig and ConsumerConfig) instead of a configuration given as a Properties or as a Map.

Having an access to the configuration object would give a way to override the existing method used to get interceptors from given class names.

This would also open the door for other uses cases currently using a class to get a concrete instance (partitioner, metrics report, etc).

A possible alternative would be to create more constructors with an additional parameter to specify the interceptors but this would increase the number of constructors without having a way to build instances for other types.

See the following portion of code for a concrete example

public class SpringAwareProducerConfig extends ProducerConfig {

    private final List<ProducerInterceptor<?, ?>> producerInterceptors;

    public SpringAwareProducerConfig(Map<?, ?> props,
                                     boolean doLog,
                                     List<ProducerInterceptor<?, ?>> producerInterceptors) {
        super(props, doLog);
        this.producerInterceptors = producerInterceptors;
    }

    @Override
    public <T> List<T> getConfiguredInstances(List<String> classNames, Class<T> t, Map<String, Object> configOverrides) {
        final var configuredInstances = super.getConfiguredInstances(classNames, t, configOverrides);
        if (ProducerInterceptor.class.equals(t)) {
            configuredInstances.addAll((Collection<? extends T>) producerInterceptors);
        }
        return configuredInstances;
    }
}
public class SpringAwareConsumerConfig extends ConsumerConfig {

    private final List<ConsumerInterceptor<?, ?>> consumerInterceptors;

    public SpringAwareConsumerConfig(Map<?, ?> props,
                                     boolean doLog,                               
                                     List<ConsumerInterceptor<?, ?>> consumerInterceptors) {
        super(props, doLog);
        this.consumerInterceptors = consumerInterceptors;
    }

    @Override
    public <T> List<T> getConfiguredInstances(List<String> classNames, Class<T> t, Map<String, Object> configOverrides) {
        final var configuredInstances = super.getConfiguredInstances(classNames, t, configOverrides);
        if (ConsumerInterceptor.class.equals(t)) {
            configuredInstances.addAll((Collection<? extends T>) consumerInterceptors);
        }
        return configuredInstances;
    }
}

Also see the Spring Kafka issue for the real motivation behind this KIP

https://github.com/spring-projects/spring-kafka/issues/2244

Kafka streams is not concerned by this issue as the KafkaStreams object is already exposing a constructor receiving a StreamsConfig object.

Concrete working example for Streams

public class SpringAwareStreamsConfig extends StreamsConfig {

    private final List<ProducerInterceptor<?, ?>> producerInterceptors;
    private final List<ConsumerInterceptor<?, ?>> consumerInterceptors;

    public SpringAwareStreamsConfig(Map<?, ?> props,
                                   boolean doLog,
                                   List<ProducerInterceptor<?, ?>> producerInterceptors,
                                   List<ConsumerInterceptor<?, ?>> consumerInterceptors) {
        super(props, doLog);
        this.producerInterceptors = producerInterceptors;
        this.consumerInterceptors = consumerInterceptors;
    }

    @Override
    public <T> List<T> getConfiguredInstances(List<String> classNames, Class<T> t, Map<String, Object> configOverrides) {
        final var configuredInstances = super.getConfiguredInstances(classNames, t, configOverrides);
        if (ProducerInterceptor.class.equals(t)) {
            configuredInstances.addAll((Collection<? extends T>) producerInterceptors);
        } 
        if (ConsumerInterceptor.class.equals(t)) {
            configuredInstances.addAll((Collection<? extends T>) consumerInterceptors);
        }
        return configuredInstances;
    }
}

Streams may also use another approach based on a custom KafkaClientSupplier and reusing this KIP

public class CustomKafkaClientSupplier extends DefaultKafkaClientSupplier {

    // additional dependencies given by constructor injection.

	@Override
	public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
		return new KafkaProducer<>(<CUSTOM_CONFIG>, new ByteArraySerializer(), new ByteArraySerializer());
	}

	@Override
	public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
		return new KafkaConsumer<>(<CUSTOM_CONFIG>, new ByteArrayDeserializer(), new ByteArrayDeserializer());
	}

	@Override
	public Consumer<byte[], byte[]> getRestoreConsumer(final Map<String, Object> config) {
		return new KafkaConsumer<>(<CUSTOM_CONFIG>, new ByteArrayDeserializer(), new ByteArrayDeserializer());
	}

	@Override
	public Consumer<byte[], byte[]> getGlobalConsumer(final Map<String, Object> config) {
		return new KafkaConsumer<>(<CUSTOM_CONFIG>, new ByteArrayDeserializer(), new ByteArrayDeserializer());
	}
}
...
new KafkaStreams(<TOPOLOGY>, <PROPERTIES>, <CUSTOM_KAFKA_CLIENT_SUPPLIER>);
...

See another similar KIP for additional information related to the current KIP but for streams.

KIP-378: Enable Dependency Injection for Kafka Streams handlers

Having a new constructor and increasing the visibility of an existing constructor is quite straight and does not impact the existing. It gives an additional way to build a producer and a consumer. This approach is also align with Streams already having such kind of constructor.

Important note: this request is not related to Spring and would give a way to use constructed instances instead of using reflection to build these instances from a given class.

Public Interfaces

KafkaProducer

KafkaConsumer

Proposed Changes

Create 2 new constructors in KafkaProducer

public KafkaProducer(ProducerConfig config)
public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer)

and create a new constructor + increase the visibility of an existing constructor in KafkaConsumer

public KafkaConsumer(ConsumerConfig config)
public KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer)

Compatibility, Deprecation, and Migration Plan

No compatibility issue and no migration plan needed because this KIP only creates a constructor and increase the visibility of an existing one.

Rejected Alternatives

/


  • No labels