...

Public Interfaces

We propose to add a new exception that helps to distinguish errors caused by missing internal topics from other errors in the uncaught exception handler. For example, the reaction to a missing source topic (i.e., MissingSourceTopicException) might differ from the reaction to missing internal topics, because the process for re-creating source topics might differ from the process for re-creating internal topics. Furthermore, source topics might be owned by a different team than the internal topics, so different people might need to be paged.

Code Block
languagejava
package org.apache.kafka.streams.errors;

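// Thrown when one or more internal topics (repartition or changelog topics) are missing.
public class MissingInternalTopicException extends StreamsException {
    public MissingInternalTopicException(final String message) {
        super(message);
    }
}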
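To illustrate how the new exception could be told apart from MissingSourceTopicException when deciding whom to page, here is a minimal, self-contained sketch. The exception classes are local stand-ins rather than the Kafka classes, so the example compiles without Kafka on the classpath; the `Team` enum and the `TopicErrorRouter.teamToPage` method are hypothetical names chosen for this illustration.

```java
// Minimal stand-ins for the Kafka Streams exception types (assumption: not the real classes).
class StreamsException extends RuntimeException {
    StreamsException(final String message) { super(message); }
    StreamsException(final String message, final Throwable cause) { super(message, cause); }
}

class MissingSourceTopicException extends StreamsException {
    MissingSourceTopicException(final String message) { super(message); }
}

class MissingInternalTopicException extends StreamsException {
    MissingInternalTopicException(final String message) { super(message); }
}

// Hypothetical: who should be paged for a given error.
enum Team { SOURCE_TOPIC_OWNERS, STREAMS_APP_OWNERS, ON_CALL }

class TopicErrorRouter {
    // Hypothetical helper. Walks the cause chain in case the error arrives wrapped.
    static Team teamToPage(final Throwable error) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            if (t instanceof MissingInternalTopicException) {
                return Team.STREAMS_APP_OWNERS;   // internal topics belong to the application
            }
            if (t instanceof MissingSourceTopicException) {
                return Team.SOURCE_TOPIC_OWNERS;  // source topics may be owned by another team
            }
        }
        return Team.ON_CALL;                      // any other error
    }
}
```

A routing function like this could be called from the handler registered with KafkaStreams#setUncaughtExceptionHandler; walking the cause chain is a defensive choice rather than something the proposal requires.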


We propose to add an initialization method to the KafkaStreams class. Method init() sets up the broker-side state for a Kafka Streams application. It needs to be called from at least one Kafka Streams client.

Code Block
languagejava
public class KafkaStreams {

    /**
     * Initializes broker-side state.
     *
     * If some but not all of the internal topics are absent, it throws a MissingInternalTopicException.
     */
    public void init();

    /**
     * Initializes broker-side state.
     *
     * This method takes parameters that specify which internal topics to set up if some
     * but not all of them are absent.
     */
    public void init(final InitParameters initParameters);

    public static class InitParameters {

        public static InitParameters initParameters();                         // specifies to disable all setup of internal topics

        public InitParameters enableSetupRepartitionTopicsIfIncomplete();      // specifies to set up repartition topics if some are missing
        public InitParameters disableSetupRepartitionTopicsIfIncomplete();     // specifies to throw if some but not all repartition topics are missing
        public boolean setupRepartitionTopicsIfIncompleteEnabled();            // getter
        public InitParameters enableSetupChangelogTopicsIfIncomplete();        // specifies to set up changelog topics if some are missing
        public InitParameters disableSetupChangelogTopicsIfIncomplete();       // specifies to throw if some but not all changelog topics are missing
        public boolean setupChangelogTopicsIfIncompleteEnabled();              // getter
        public InitParameters enableSetupInternalTopicsIfIncomplete();         // specifies to set up repartition and changelog topics if some are missing
        public InitParameters disableSetupInternalTopicsIfIncomplete();        // specifies to throw if some but not all repartition or changelog topics are missing
        public boolean setupInternalTopicsIfIncompleteEnabled();               // getter
    }
}
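The fluent semantics of InitParameters can be made concrete with a self-contained sketch. Only the method names come from the proposal. The immutable implementation is hypothetical, and so is the assumption that the "InternalTopics" methods toggle both the repartition and the changelog flag, with setupInternalTopicsIfIncompleteEnabled() returning true only when both are enabled. The class is written as a package-private top-level class so the sketch compiles on its own.

```java
// Hypothetical implementation of the proposed InitParameters; only the method names come from the KIP.
final class InitParameters {
    private final boolean setupRepartitionTopicsIfIncomplete;
    private final boolean setupChangelogTopicsIfIncomplete;

    private InitParameters(final boolean repartition, final boolean changelog) {
        this.setupRepartitionTopicsIfIncomplete = repartition;
        this.setupChangelogTopicsIfIncomplete = changelog;
    }

    // Default: setup of incomplete internal topics is disabled for both topic types.
    static InitParameters initParameters() {
        return new InitParameters(false, false);
    }

    InitParameters enableSetupRepartitionTopicsIfIncomplete() {
        return new InitParameters(true, setupChangelogTopicsIfIncomplete);
    }

    InitParameters disableSetupRepartitionTopicsIfIncomplete() {
        return new InitParameters(false, setupChangelogTopicsIfIncomplete);
    }

    boolean setupRepartitionTopicsIfIncompleteEnabled() {
        return setupRepartitionTopicsIfIncomplete;
    }

    InitParameters enableSetupChangelogTopicsIfIncomplete() {
        return new InitParameters(setupRepartitionTopicsIfIncomplete, true);
    }

    InitParameters disableSetupChangelogTopicsIfIncomplete() {
        return new InitParameters(setupRepartitionTopicsIfIncomplete, false);
    }

    boolean setupChangelogTopicsIfIncompleteEnabled() {
        return setupChangelogTopicsIfIncomplete;
    }

    // Assumption: "internal topics" means both repartition and changelog topics.
    InitParameters enableSetupInternalTopicsIfIncomplete() {
        return new InitParameters(true, true);
    }

    InitParameters disableSetupInternalTopicsIfIncomplete() {
        return new InitParameters(false, false);
    }

    boolean setupInternalTopicsIfIncompleteEnabled() {
        return setupRepartitionTopicsIfIncomplete && setupChangelogTopicsIfIncomplete;
    }
}
```

Returning a new instance from every method keeps a shared default such as initParameters() from being modified by accident; a mutable builder that returns `this` would work equally well.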


We propose to add a new configuration to Kafka Streams that determines whether the internal topics are set up automatically during a rebalance or by users calling KafkaStreams#init().

Code Block
languagejava
public class StreamsConfig {

    // possible values
    public static final String AUTOMATIC_SETUP = "automatic";
    public static final String MANUAL_SETUP = "manual";

    // configuration
    public static final String INTERNAL_TOPIC_SETUP = "internal.topics.setup";  // default is AUTOMATIC_SETUP

}
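As a sketch of how a client could read the new setting, the snippet below uses plain java.util.Properties with the proposed key "internal.topics.setup" and the values "automatic" and "manual". The `InternalTopicSetupConfig` class and its `resolveInternalTopicSetup` helper are hypothetical; inside Kafka Streams, defaulting and validation would more likely be handled by the StreamsConfig definition itself.

```java
import java.util.Properties;

final class InternalTopicSetupConfig {
    // Key and values as proposed for StreamsConfig.
    static final String INTERNAL_TOPIC_SETUP = "internal.topics.setup";
    static final String AUTOMATIC_SETUP = "automatic";
    static final String MANUAL_SETUP = "manual";

    // Hypothetical helper: returns the configured mode, defaulting to AUTOMATIC_SETUP,
    // and rejects any value other than the two proposed ones.
    static String resolveInternalTopicSetup(final Properties props) {
        final String value = props.getProperty(INTERNAL_TOPIC_SETUP, AUTOMATIC_SETUP);
        if (!AUTOMATIC_SETUP.equals(value) && !MANUAL_SETUP.equals(value)) {
            throw new IllegalArgumentException(
                "Invalid value for " + INTERNAL_TOPIC_SETUP + ": " + value);
        }
        return value;
    }
}
```

Opting into manual setup would then amount to `props.setProperty(INTERNAL_TOPIC_SETUP, MANUAL_SETUP)`; leaving the key unset keeps the automatic behavior.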


Proposed Changes

If configuration INTERNAL_TOPIC_SETUP is set to AUTOMATIC_SETUP, the internal topics will be set up during a rebalance: if they do not exist, they will be created. This corresponds to the current behavior of Kafka Streams. Users can also call KafkaStreams#init() to set up the internal topics when INTERNAL_TOPIC_SETUP is set to AUTOMATIC_SETUP, but the call is not necessary since the internal topics would be created anyway during the next rebalance.

If configuration INTERNAL_TOPIC_SETUP is set to MANUAL_SETUP, the internal topics will not be set up during a rebalance; instead, users need to call KafkaStreams#init() to set up the internal topics. If the internal topics do not exist during a rebalance, because KafkaStreams#init() has not been called or because one or more internal topics have been deleted, a MissingInternalTopicException is thrown in each Kafka Streams client.
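The rebalance-time behavior for the two settings can be summarized in a small, self-contained sketch. Topic names are plain strings, the broker-side state is modeled as a mutable Set, and the `RebalanceTopicSetup.onRebalance` method and the local exception class are hypothetical stand-ins; a real implementation would list and create topics through the admin client.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Stand-in for the proposed exception (assumption: not the Kafka class).
class MissingInternalTopicException extends RuntimeException {
    MissingInternalTopicException(final String message) { super(message); }
}

final class RebalanceTopicSetup {
    // Hypothetical: "brokerTopics" models the topics that exist on the brokers
    // and is mutated when topics are created.
    static void onRebalance(final String internalTopicSetup,
                            final Set<String> requiredInternalTopics,
                            final Set<String> brokerTopics) {
        final Set<String> missing = new LinkedHashSet<>(requiredInternalTopics);
        missing.removeAll(brokerTopics);
        if (missing.isEmpty()) {
            return;                          // all internal topics already exist
        }
        if ("automatic".equals(internalTopicSetup)) {
            brokerTopics.addAll(missing);    // current behavior: create the missing topics
        } else {
            // "manual": never create internal topics during a rebalance
            throw new MissingInternalTopicException("Missing internal topics: " + missing);
        }
    }
}
```

In manual mode the sketch throws whether all or only some internal topics are missing, since a rebalance cannot tell a client that never called init() apart from one whose topics were deleted.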

Method KafkaStreams#init() sets up the broker-side state for a Kafka Streams application. If the method:

  • does not find any internal topics for the Kafka Streams client on the brokers, it will set up all internal topics
  • finds all internal topics for the Kafka Streams client on the brokers, it will not set up any internal topics
  • finds some but not all of the internal topics, it will
    • throw MissingInternalTopicException if KafkaStreams#init() is called without any parameters or if the parameters specify to throw for the missing internal topics
    • set up the missing internal topics if the parameters passed to KafkaStreams#init() specify so
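The decision logic of KafkaStreams#init() can be sketched per topic type as follows. The `InitSketch.topicsToCreate` method, the split of the required topics into repartition and changelog sets, the two boolean flags standing in for InitParameters, and the local exception class are all hypothetical. The sketch also assumes that all parameter checks happen before anything is created, so a throwing call leaves the broker-side state untouched.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Stand-in for the proposed exception (assumption: not the Kafka class).
class MissingInternalTopicException extends RuntimeException {
    MissingInternalTopicException(final String message) { super(message); }
}

final class InitSketch {
    // Hypothetical: returns the internal topics init() would set up; "existing" models the brokers.
    static Set<String> topicsToCreate(final Set<String> repartitionTopics,
                                      final Set<String> changelogTopics,
                                      final Set<String> existing,
                                      final boolean setupRepartitionIfIncomplete,
                                      final boolean setupChangelogIfIncomplete) {
        final Set<String> required = new LinkedHashSet<>(repartitionTopics);
        required.addAll(changelogTopics);
        final Set<String> missing = new LinkedHashSet<>(required);
        missing.removeAll(existing);

        // All present: nothing to do. None present: set up everything.
        if (missing.isEmpty() || missing.size() == required.size()) {
            return missing;
        }
        // Some but not all internal topics exist: consult the parameters per topic type.
        final Set<String> missingRepartition = new LinkedHashSet<>(repartitionTopics);
        missingRepartition.retainAll(missing);
        final Set<String> missingChangelog = new LinkedHashSet<>(changelogTopics);
        missingChangelog.retainAll(missing);
        if (!missingRepartition.isEmpty() && !setupRepartitionIfIncomplete) {
            throw new MissingInternalTopicException("Missing repartition topics: " + missingRepartition);
        }
        if (!missingChangelog.isEmpty() && !setupChangelogIfIncomplete) {
            throw new MissingInternalTopicException("Missing changelog topics: " + missingChangelog);
        }
        return missing;   // set up only the missing topics
    }
}
```

With only the changelog flag enabled, a missing changelog topic is returned for setup, while a missing repartition topic makes the same call throw; calling with both flags disabled corresponds to the parameterless init().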

For example, if a changelog topic is missing and init(InitParameters.initParameters().enableSetupChangelogTopicsIfIncomplete()) is called, the missing changelog topic will be set up. However, if a repartition topic is missing, the same call will throw. If KafkaStreams#init() is called without parameters, the call will throw if some internal topics are missing while others exist.

In addition to setting up internal topics, KafkaStreams#init() will perform all checks that are currently done during a rebalance, including checks for source and sink topics.

...

Since we introduce configuration INTERNAL_TOPIC_SETUP with default value AUTOMATIC_SETUP, which preserves the current Kafka Streams behavior, this KIP should not affect backward compatibility.

...

Migrating from the current behavior to INTERNAL_TOPIC_SETUP set to MANUAL_SETUP can be done without any specific migration plan: users only need to set INTERNAL_TOPIC_SETUP to MANUAL_SETUP.

Rejected Alternatives

  • Persist a flag for the first-ever rebalance on the broker side: This approach was rejected because it would require changes on the brokers, which we think we can avoid while still getting a good solution with the approach proposed in this KIP.
  • Use committed offsets for a repartition topic to verify if a repartition topic existed: This approach would not work since committed offsets are removed when a topic is deleted.

...