Status

Writing in progress... Not ready for review.

Current state: "Under Discussion"

Discussion thread

JIRA

Motivation

To give users more flexibility, builders should be provided for the following objects:

  • KafkaProducer
  • KafkaConsumer
  • KafkaStreams 

These builders will give an easy way to construct these objects using different arguments/combinations without having to add a new constructor every time a new parameter is required.

From a user's point of view, the builders would be used as follows:

KafkaProducer<String, MyPojo> kafkaProducer = new KafkaProducerBuilder<String, MyPojo>(<MAP_OR_PROPERTIES_OR_CONFIG>)
  .withKeySerializer(<KEY_SERIALIZER>)
  .withValueSerializer(<VALUE_SERIALIZER>)
  .withInterceptors(<LIST_OF_INTERCEPTORS>)
  .withPartitioner(<PARTITIONER>)
  .withMetricsReporter(<METRICS_REPORTER>)
  .build();  

KafkaConsumer<String, MyPojo> consumer = new KafkaConsumerBuilder<String, MyPojo>(<MAP_OR_PROPERTIES_OR_CONFIG>)
  .withKeyDeserializer(<KEY_DESERIALIZER>)
  .withValueDeserializer(<VALUE_DESERIALIZER>)
  .withInterceptors(<LIST_OF_INTERCEPTORS>)
  .withMetricsReporter(<METRICS_REPORTER>)
  .build();

KafkaStreams kafkaStreams = new KafkaStreamsBuilder(<TOPOLOGY>, <MAP_OR_PROPERTIES_OR_CONFIG>)
  .withProducerInterceptors(<LIST_OF_PRODUCER_INTERCEPTORS>)
  .withConsumerInterceptors(<LIST_OF_CONSUMER_INTERCEPTORS>)
  .withTime(<TIME>)
  .withKafkaClientSupplier(<KAFKA_CLIENT_SUPPLIER>)
  .withMetricsReporter(<METRICS_REPORTER>)
  .build();

This KIP can be seen as a continuation of KIP-832.

Proposed Changes

Three new builders will be added:

  • KafkaProducerBuilder
  • KafkaConsumerBuilder
  • KafkaStreamsBuilder

Requirements:

  • A value passed to the builder will always
    • override the configured value → single-valued parameters
    • complete (be added to) the configured values → list-valued parameters
  • Any closeable objects provided to a builder will still be closed once the client is closed, and as a result, a builder should only be used once. An exception will be thrown if build() is invoked multiple times.
  • Any configurable object provided to a builder will be configured by the client after instantiation.
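
The requirements above can be illustrated with a minimal sketch. It does not use the Kafka API: SketchClient, SketchClientBuilder, and the "serializer" and "interceptors" keys are hypothetical names chosen for illustration only. It also assumes that configured list entries come before builder-provided ones and that the exception thrown on a second build() is an IllegalStateException; the KIP does not specify either point.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Small demo of the override / complete / build-once rules. */
class BuilderSemanticsSketch {
    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        config.put("serializer", "configured-serializer");
        config.put("interceptors", List.of("configured-interceptor"));

        SketchClientBuilder builder = new SketchClientBuilder(config)
            .withSerializer("explicit-serializer")               // single value: overrides
            .withInterceptors(List.of("explicit-interceptor"));  // list value: completes

        SketchClient client = builder.build();
        System.out.println(client.serializer);
        System.out.println(client.interceptors);

        try {
            builder.build(); // second call is rejected
        } catch (IllegalStateException e) {
            System.out.println("second build() rejected: " + e.getMessage());
        }
    }
}

/** Hypothetical stand-in for a client; not part of Kafka. */
final class SketchClient {
    final String serializer;
    final List<String> interceptors;

    SketchClient(String serializer, List<String> interceptors) {
        this.serializer = serializer;
        this.interceptors = interceptors;
    }
}

/** Hypothetical builder showing the three rules; not the proposed Kafka class. */
final class SketchClientBuilder {
    private final Map<String, Object> config;
    private final List<String> interceptors = new ArrayList<>();
    private String serializer;
    private boolean built;

    SketchClientBuilder(Map<String, Object> config) {
        this.config = new HashMap<>(config); // defensive copy of the user configuration
    }

    SketchClientBuilder withSerializer(String serializer) {
        this.serializer = serializer;
        return this;
    }

    SketchClientBuilder withInterceptors(List<String> interceptors) {
        this.interceptors.addAll(interceptors);
        return this;
    }

    SketchClient build() {
        if (built) {
            // Exception type is an assumption; the KIP only says "an exception".
            throw new IllegalStateException("build() may only be invoked once");
        }
        built = true;

        // Single value: the builder value wins over the configured one.
        String effectiveSerializer =
            serializer != null ? serializer : (String) config.get("serializer");

        // List value: configured entries first (assumed order), then builder entries.
        List<String> effectiveInterceptors = new ArrayList<>();
        Object configured = config.get("interceptors");
        if (configured instanceof List<?>) {
            for (Object entry : (List<?>) configured) {
                effectiveInterceptors.add(String.valueOf(entry));
            }
        }
        effectiveInterceptors.addAll(interceptors);

        return new SketchClient(effectiveSerializer, effectiveInterceptors);
    }
}
```

In this sketch the client ends up with the explicit serializer and with both interceptors, and the second call to build() fails. Rejecting reuse keeps a closeable object from being shared by two clients and closed twice.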

To finalize KIP-378, two new constructors will be added to the TopologyTestDriver as follows:

public TopologyTestDriver(Topology topology, StreamsConfig config)
public TopologyTestDriver(Topology topology, StreamsConfig config, Instant initialWallClockTime) 

Compatibility, Deprecation, and Migration Plan

There are no compatibility issues and no migration plan is required, since this KIP only adds new builders and new constructors; existing constructors are unchanged.

Rejected Alternatives

None.

