...

Code Block
languagejava
package org.apache.kafka.streams.errors;

public class InternalTopicSetupException extends StreamsException {
}

public class MissingInternalTopicException extends InternalTopicSetupException {
    // names of the internal topics that are missing
    public List<String> topics();
}

public class MisconfiguredInternalTopicException extends InternalTopicSetupException {
}

public class InternalTopicsAlreadySetupException extends InternalTopicSetupException {
}


We propose to add an initialization method to the KafkaStreams class.

Code Block
languagejava
public class KafkaStreams {

    /**
     * Initializes broker-side state.
     *
     * If some but not all of the internal topics are absent, it throws a MissingInternalTopicException.
     * If some of the internal topics are misconfigured, it throws a MisconfiguredInternalTopicException.
     * If all internal topics are already set up, it throws an InternalTopicsAlreadySetupException.
     */
    public void init();

    /**
     * Initializes broker-side state.
     *
     * If some but not all of the internal topics are absent, it throws a MissingInternalTopicException.
     * If some of the internal topics are misconfigured, it throws a MisconfiguredInternalTopicException.
     * If the initialization has not concluded after the given timeout, it throws a TimeoutException.
     */
    public void init(final Duration timeout);

    /**
     * Initializes broker-side state.
     *
     * This method takes parameters that specify whether to set up the internal topics
     * if some but not all of them are absent.
     */
    public void init(final InitParameters initParameters);

    /**
     * Initializes broker-side state.
     *
     * This method takes parameters that specify whether to set up the internal topics
     * if some but not all of them are absent.
     * If the initialization has not concluded after the given timeout, it throws a TimeoutException.
     */
    public void init(final InitParameters initParameters, final Duration timeout);

    public class InitParameters {

        // creates parameters that disable the setup of internal topics if some topics are missing
        public static InitParameters initParameters();

        // specifies to set up repartition and changelog topics if some are missing
        public InitParameters enableSetupInternalTopicsIfIncomplete();

        // specifies to throw if some but not all repartition or changelog topics are missing
        public InitParameters disableSetupInternalTopicsIfIncomplete();

        // getter
        public boolean setupInternalTopicsIfIncompleteEnabled();
    }
}
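The decision logic that the Javadoc above describes can be sketched as follows. This is only an illustration under stated assumptions, not the proposed implementation: the nested exception classes are hypothetical stand-ins for the proposed ones, topicsToCreate is a hypothetical helper, the sketch assumes that init() creates all internal topics when none of them exist, and the misconfiguration check is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

final class InitSketch {

    // Hypothetical stand-ins for the exception types proposed above.
    static class InternalTopicSetupException extends RuntimeException {
        InternalTopicSetupException(final String message) {
            super(message);
        }
    }

    static class MissingInternalTopicException extends InternalTopicSetupException {
        private final List<String> topics;

        MissingInternalTopicException(final List<String> topics) {
            super("Missing internal topics: " + topics);
            this.topics = List.copyOf(topics);
        }

        List<String> topics() {
            return topics;
        }
    }

    static class InternalTopicsAlreadySetupException extends InternalTopicSetupException {
        InternalTopicsAlreadySetupException() {
            super("All internal topics are already set up");
        }
    }

    /**
     * Hypothetical helper: returns the internal topics that init() would create,
     * or throws one of the exceptions described in the Javadoc above.
     */
    static List<String> topicsToCreate(final Set<String> required,
                                       final Set<String> existing,
                                       final boolean setupIfIncomplete) {
        // Collect missing topics in sorted order so the result is deterministic.
        final List<String> missing = new ArrayList<>();
        for (final String topic : new TreeSet<>(required)) {
            if (!existing.contains(topic)) {
                missing.add(topic);
            }
        }
        if (missing.isEmpty()) {
            throw new InternalTopicsAlreadySetupException();
        }
        // Assumption: a completely fresh application always gets its topics created.
        final boolean noneExist = missing.size() == required.size();
        if (noneExist || setupIfIncomplete) {
            return missing;
        }
        // Some but not all topics are absent and setup-if-incomplete is disabled.
        throw new MissingInternalTopicException(missing);
    }

    public static void main(final String[] args) {
        final Set<String> required = Set.of("app-repartition", "app-changelog");
        System.out.println("fresh start creates: " + topicsToCreate(required, Set.of(), false));
        try {
            topicsToCreate(required, Set.of("app-changelog"), false);
        } catch (final MissingInternalTopicException e) {
            System.out.println("incomplete setup, missing: " + e.topics());
        }
    }
}
```

Passing true for setupIfIncomplete corresponds to enableSetupInternalTopicsIfIncomplete(): in the partially missing case the helper then returns the missing topics instead of throwing.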

...

Migrating from the current behavior to INTERNAL_TOPIC_SETUP set to MANUAL_SETUP does not require a specific migration plan. Users need to set INTERNAL_TOPIC_SETUP to MANUAL_SETUP and change their code to call KafkaStreams#init() accordingly.
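One way the code change could look on the application side is sketched below. It is a sketch under assumptions rather than a prescribed pattern: Streams is a hypothetical minimal view of KafkaStreams that exposes only init() and start(), the nested exception is a stand-in for the proposed InternalTopicsAlreadySetupException, and treating "already set up" as a normal condition before starting is one possible policy, not something this KIP mandates.

```java
final class ManualSetupStartup {

    // Hypothetical stand-in for the proposed InternalTopicsAlreadySetupException.
    static class InternalTopicsAlreadySetupException extends RuntimeException {
    }

    // Hypothetical minimal view of KafkaStreams with only the calls used here.
    interface Streams {
        void init();

        void start();
    }

    /**
     * Calls init() before start().
     * Returns true if init() completed, false if the topics were already set up.
     * Any other exception from init() propagates and start() is not called.
     */
    static boolean initThenStart(final Streams streams) {
        boolean initialized;
        try {
            streams.init();
            initialized = true;
        } catch (final InternalTopicsAlreadySetupException e) {
            // Assumed policy: an earlier run or another instance already set up the topics.
            initialized = false;
        }
        streams.start();
        return initialized;
    }
}
```

With this policy, MissingInternalTopicException and MisconfiguredInternalTopicException still reach the caller, so a partially deleted or misconfigured set of internal topics prevents the application from starting.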

Rejected Alternatives

  • Persist a flag for the first-ever rebalance on the broker side: This approach was rejected because it would require changes on the brokers, which the approach proposed in this KIP avoids while still providing a good solution.
  • Use committed offsets for a repartition topic to verify if a repartition topic existed: This approach would not work since committed offsets are removed when a topic is deleted.

...