Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

We propose to add a static initialization method to the KafkaStreams class. Method init() uses the topology and the application properties to set up the broker-side state for a Kafka Streams application.

Code Block
languagejava
public class KafkaStreams {

	public static void init(final Topology topology,
                            final Properties props);

}

...


We propose to add a new configuration to Kafka Streams to determine whether the initialization should be done automatically during a rebalance or by the user calling KafkaStreams.init().

Code Block
languagejava
public class StreamsConfig {

	// possible values
	public static final String AUTOMATIC_INITIALIZATION = "automatic";
	public static final String USER_INITIALIZATION = "user";

	// configuration
    public static final String APPLICATION_INITIALIZATION = "application.initialization";  // default is AUTOMATIC_INITIALIZATION 
 
}


We propose to add a new exception that helps to discriminate errors originating from missing internal topics from other errors in the uncaught exception handler. For example, reacting on a missing source topic (i.e., MissingSourceTopicException) might be different from reacting on missing internal topics, because the source topics might be owned by a different team than the internal topics, consequently different people need to be paged.

Code Block
languagejava
package org.apache.kafka.streams.errors;

public class MissingInternalTopicException extends StreamsException {
}


Proposed Changes

If configuration APPLICATION_INITIALIZATION is set to AUTOMATIC_INITIALIZATION, the internal topics will be set up during a rebalance. If the internal topics do not exist, they will be created. That corresponds to the current behavior of Kafka Streams. Users can also call KafkaStreams.init() to setup the internal topics when APPLICATION_INITIALIZATION is set to AUTOMATIC_INITIALIZATION, but the call is not necessary since the internal topics would be created anyways during the next rebalance.

If configuration APPLICATION_INITIALIZATION is set to USER_INITIALIZATION, the internal topics will not be set up during a rebalance but users need to call KafkaStreams.init() to setup the internal topics. If the internal topics do not exist during a rebalance because KafkaStreams.init() was not called or one or more internal topics were deleted, a MissingInternalTopicException is thrown in each Kafka Streams client.

Compatibility, Deprecation, and Migration Plan

...