...
We propose to add a static initialization method to the KafkaStreams class. The method init() uses the topology and the application properties to set up the broker-side state for a Kafka Streams application.
```java
public class KafkaStreams {
    public static void init(final Topology topology, final Properties props);
}
```
...
We propose to add a new configuration to Kafka Streams to determine whether the initialization should be done automatically during a rebalance or by the user calling KafkaStreams.init().
```java
public class StreamsConfig {
    // possible values
    public static final String AUTOMATIC_INITIALIZATION = "automatic";
    public static final String USER_INITIALIZATION = "user";

    // configuration
    public static final String APPLICATION_INITIALIZATION = "application.initialization"; // default is AUTOMATIC_INITIALIZATION
}
```
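As a minimal sketch of how an application might opt in to user-driven initialization, the snippet below builds the application properties with plain string keys. The literals "application.initialization", "automatic", and "user" are copied from the proposed constants and do not exist in any released Kafka Streams version; the class name, helper names, application ID, and broker address are hypothetical.

```java
import java.util.Properties;

public class InitializationConfigSketch {
    // Literals mirror the proposed StreamsConfig constants; they are
    // part of this proposal, not of a released Kafka Streams API.
    public static final String APPLICATION_INITIALIZATION = "application.initialization";
    public static final String AUTOMATIC_INITIALIZATION = "automatic";
    public static final String USER_INITIALIZATION = "user";

    // Builds application properties that select user-driven initialization.
    public static Properties userInitializedProps(final String applicationId,
                                                  final String bootstrapServers) {
        final Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", bootstrapServers);
        props.put(APPLICATION_INITIALIZATION, USER_INITIALIZATION);
        return props;
    }

    // Resolves the effective mode, falling back to the proposed default.
    public static String effectiveMode(final Properties props) {
        return props.getProperty(APPLICATION_INITIALIZATION, AUTOMATIC_INITIALIZATION);
    }

    public static void main(final String[] args) {
        // "my-app" and "localhost:9092" are placeholder values.
        final Properties props = userInitializedProps("my-app", "localhost:9092");
        System.out.println(effectiveMode(props)); // prints "user"
    }
}
```

With these properties, the application would be expected to call the proposed KafkaStreams.init(topology, props) once before starting its clients.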
We propose to add a new exception that helps the uncaught exception handler distinguish errors caused by missing internal topics from other errors. For example, the reaction to a missing source topic (i.e., MissingSourceTopicException) might differ from the reaction to missing internal topics, because the source topics might be owned by a different team than the internal topics, so different people need to be paged.
```java
package org.apache.kafka.streams.errors;

public class MissingInternalTopicException extends StreamsException {
}
```
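The paging scenario could be sketched roughly as follows. To keep the example compilable without the Kafka Streams library, the three exception classes are local stand-ins that only mirror the inheritance described in this proposal; the OnCall enum, the route() helper, and the fallback rotation are hypothetical and not part of the proposal.

```java
public class TopicErrorRoutingSketch {
    // Local stand-ins for the Kafka Streams exception types.
    public static class StreamsException extends RuntimeException {
        public StreamsException(final String message) { super(message); }
    }
    public static class MissingSourceTopicException extends StreamsException {
        public MissingSourceTopicException(final String message) { super(message); }
    }
    public static class MissingInternalTopicException extends StreamsException {
        public MissingInternalTopicException(final String message) { super(message); }
    }

    // Hypothetical on-call rotations.
    public enum OnCall { SOURCE_TOPIC_OWNERS, INTERNAL_TOPIC_OWNERS, DEFAULT_ROTATION }

    // Decides whom to page. The cause chain is walked in case the
    // exception reaches the handler wrapped in another exception.
    public static OnCall route(final Throwable error) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            if (t instanceof MissingInternalTopicException) {
                return OnCall.INTERNAL_TOPIC_OWNERS;
            }
            if (t instanceof MissingSourceTopicException) {
                return OnCall.SOURCE_TOPIC_OWNERS;
            }
        }
        return OnCall.DEFAULT_ROTATION;
    }

    public static void main(final String[] args) {
        System.out.println(route(new MissingInternalTopicException("changelog topic missing")));
        System.out.println(route(new MissingSourceTopicException("input topic missing")));
    }
}
```

In a real application, the same instanceof checks would live inside the uncaught exception handler registered on the KafkaStreams instance.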
Proposed Changes
If configuration APPLICATION_INITIALIZATION is set to AUTOMATIC_INITIALIZATION, the internal topics will be set up during a rebalance. If the internal topics do not exist, they will be created. That corresponds to the current behavior of Kafka Streams. Users can also call KafkaStreams.init() to set up the internal topics when APPLICATION_INITIALIZATION is set to AUTOMATIC_INITIALIZATION, but the call is not necessary since the internal topics would be created anyway during the next rebalance.
If configuration APPLICATION_INITIALIZATION is set to USER_INITIALIZATION, the internal topics will not be set up during a rebalance; instead, users need to call KafkaStreams.init() to set up the internal topics. If the internal topics do not exist during a rebalance because KafkaStreams.init() was not called or one or more internal topics were deleted, a MissingInternalTopicException is thrown in each Kafka Streams client.
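The two modes can be summarized with a small in-memory model. This is only a sketch of the behavior described above, not the proposed implementation: broker-side topics are represented by a plain Set, the exception is a local stand-in, and the class name, method names, and topic names are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

public class InitializationModeSketch {
    // Local stand-in for the proposed exception type.
    public static class MissingInternalTopicException extends RuntimeException {
        public MissingInternalTopicException(final String message) { super(message); }
    }

    // Models the proposed KafkaStreams.init(): creates any missing
    // internal topics, whichever mode is configured.
    public static void init(final Set<String> requiredInternalTopics,
                            final Set<String> existingTopics) {
        existingTopics.addAll(requiredInternalTopics);
    }

    // Models the internal-topic check performed during a rebalance.
    public static void onRebalance(final String mode,
                                   final Set<String> requiredInternalTopics,
                                   final Set<String> existingTopics) {
        final Set<String> missing = new TreeSet<>(requiredInternalTopics);
        missing.removeAll(existingTopics);
        if (missing.isEmpty()) {
            return; // nothing to do in either mode
        }
        if ("automatic".equals(mode)) {
            existingTopics.addAll(missing); // stands in for topic creation
        } else if ("user".equals(mode)) {
            throw new MissingInternalTopicException("Missing internal topics: " + missing);
        } else {
            throw new IllegalArgumentException("Unknown initialization mode: " + mode);
        }
    }

    public static void main(final String[] args) {
        // Illustrative internal topic names for an application "my-app".
        final Set<String> required = new HashSet<>();
        required.add("my-app-store-changelog");
        required.add("my-app-agg-repartition");

        final Set<String> brokerTopics = new HashSet<>();
        try {
            onRebalance("user", required, brokerTopics);
        } catch (final MissingInternalTopicException e) {
            System.out.println("Rebalance failed: " + e.getMessage());
        }

        init(required, brokerTopics);                // explicit initialization
        onRebalance("user", required, brokerTopics); // now succeeds
        System.out.println("Rebalance succeeded after init()");
    }
}
```

The model also shows why calling init() in automatic mode is harmless but redundant: the rebalance would create the same topics on its own.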
Compatibility, Deprecation, and Migration Plan
...