Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents


Status

Current state: Under DiscussionAccepted

Discussion thread: here [TBA]

JIRA: here [TBA]

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently there is a way to pass a KafkaClientSupplier  in KafkaStreams  constructor. This KIP propose proposes to add a public config in StreamsConfig  to pass the class through config and use reflection to create 

...

use a customized KafkaClientSupplier , they may still need to add required dependencies.

Public Interfaces

We will add a public field in StreamsConfig  called streams.kafkadefault.client.supplier to  to configure the KafkaClientSupplier  we use in KafkaStreams. The default value of the config will the classname

of DefaultKafkaClientSupplier .

We will also add a new public method in StreamsConfig  called getKafkaClientSupplier . It will return KafkaClientSupplier  based on the config.

Proposed Changes

In KafkaStreams  constructor, we will read the value of streams.kafkadefault.client.supplier from StreamsConfig  and it's not null  and not empty, we . We will use reflection to create an object of given class and throw

an exception if the class type isn't  KafkaClientSupplier . If user provides both the config and supply KafkaClientSupplier  in KafkaStreams  constructor, the config will be ignored and supplied

KafkaClientSupplier  will be used.

Compatibility, Deprecation, and Migration Plan

There will be no compatibility, deprecation and migration issues.

Test Plan

Unit test can capture the changes in this KIP. We can test happy path where object can be created successfully and exception case where class couldn't be found or type is wrong.

Rejected Alternatives

An alternative is that we don't need this change and user can use KafkaStreams  constructor to supply any KafkaClientSupplier  since it's a public constructor.

...

  1. Users need to change every KafkaStreams  constructor call in order to use a new KafkaClientSupplier .

So to cater for users who don't want to update their source code, we are proposing to add this config option as well.