...

Public Interfaces

We propose to add new exceptions that help to discriminate errors originating from missing internal topics from other errors in the uncaught exception handler. For example, reacting to a missing source topic (i.e., MissingSourceTopicException) might differ from reacting to missing internal topics, because the process for re-creating source topics might differ from the process for re-creating internal topics. Furthermore, source topics might be owned by a different team than the internal topics, so different people need to be paged.

Code Block
languagejava
package org.apache.kafka.streams.errors;

public class InternalTopicInitException extends StreamsException {
	public List<String> topics();
}

public class MissingInternalTopicException extends InternalTopicInitException {
}

public class MisconfiguredInternalTopicException extends InternalTopicInitException {
}
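
To illustrate how the hierarchy above could be used to route errors, here is a minimal, self-contained sketch. It does not use the Kafka Streams library: the exception classes are local stand-ins that mirror the proposed names, and the `PagingTarget` enum and `route` method are hypothetical names introduced only for this example. A real application would perform a similar check inside its uncaught exception handler.

```java
import java.util.List;

final class InternalTopicErrorRouting {

    // Local stand-ins that mirror the proposed hierarchy (assumption: not the real Kafka classes).
    static class StreamsException extends RuntimeException {
        StreamsException(final String message) { super(message); }
    }

    static class MissingSourceTopicException extends StreamsException {
        MissingSourceTopicException(final String message) { super(message); }
    }

    static class InternalTopicInitException extends StreamsException {
        private final List<String> topics;

        InternalTopicInitException(final String message, final List<String> topics) {
            super(message);
            this.topics = List.copyOf(topics);
        }

        // Names of the internal topics involved in the failure.
        public List<String> topics() { return topics; }
    }

    static class MissingInternalTopicException extends InternalTopicInitException {
        MissingInternalTopicException(final String message, final List<String> topics) {
            super(message, topics);
        }
    }

    static class MisconfiguredInternalTopicException extends InternalTopicInitException {
        MisconfiguredInternalTopicException(final String message, final List<String> topics) {
            super(message, topics);
        }
    }

    // Hypothetical paging targets, used only for illustration.
    enum PagingTarget { SOURCE_TOPIC_OWNERS, STREAMS_APP_OWNERS, DEFAULT_ON_CALL }

    // Decides who to page based on the type of the error.
    static PagingTarget route(final Throwable error) {
        if (error instanceof MissingSourceTopicException) {
            return PagingTarget.SOURCE_TOPIC_OWNERS;
        }
        // The common superclass covers both missing and misconfigured internal topics.
        if (error instanceof InternalTopicInitException) {
            return PagingTarget.STREAMS_APP_OWNERS;
        }
        return PagingTarget.DEFAULT_ON_CALL;
    }

    public static void main(final String[] args) {
        final InternalTopicInitException error = new MissingInternalTopicException(
            "internal topic missing", List.of("my-app-store-changelog"));
        System.out.println(route(error) + " " + error.topics());
    }
}
```

Checking against the common superclass lets a handler treat all internal topic problems alike, while the two subclasses remain available when missing and misconfigured topics need different reactions.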


We propose to add an initialization method to the KafkaStreams class.

Code Block
languagejava
public class KafkaStreams {

    /**
     * Initializes broker-side state.
     *
     * If some but not all of the internal topics are absent, it throws an exception.
     */
    public void init();

    /**
     * Initializes broker-side state.
     *
     * If some but not all of the internal topics are absent, it throws an exception.
     */
    public void init(final Duration timeout);

    /**
     * Initializes broker-side state.
     *
     * This method takes parameters that specify which internal topics to set up if some
     * but not all of them are absent.
     */
    public void init(final InitParameters initParameters);

    public void init(final InitParameters initParameters, final Duration timeout);

    public class InitParameters {

        public static InitParameters initParameters();                        // creates parameters that disable all setup of internal topics

        public InitParameters enableSetupInternalTopicsIfIncomplete();        // specifies to set up repartition and changelog topics if some are missing
        public InitParameters disableSetupInternalTopicsIfIncomplete();       // specifies to throw if some but not all repartition or changelog topics are missing
        public boolean setupInternalTopicsIfIncompleteEnabled();              // returns whether setup of incomplete internal topics is enabled
    }
}
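
The behavior described in the Javadoc above can be modeled with a small, self-contained sketch. It is not the proposed implementation and does not talk to a broker: `InitSketch` and `topicsToCreate` are hypothetical names, `InitParameters` and `MissingInternalTopicException` are local stand-ins mirroring the proposed API, and the sketch assumes that `init()` creates all internal topics when none exist and does nothing when all exist.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class InitSketch {

    // Stand-in mirroring the proposed InitParameters (assumption: not the real class).
    static final class InitParameters {
        private boolean setupIfIncomplete = false;

        static InitParameters initParameters() { return new InitParameters(); }

        InitParameters enableSetupInternalTopicsIfIncomplete() {
            setupIfIncomplete = true;
            return this;
        }

        InitParameters disableSetupInternalTopicsIfIncomplete() {
            setupIfIncomplete = false;
            return this;
        }

        boolean setupInternalTopicsIfIncompleteEnabled() { return setupIfIncomplete; }
    }

    // Stand-in for the proposed exception, carrying the missing topic names.
    static final class MissingInternalTopicException extends RuntimeException {
        private final List<String> topics;

        MissingInternalTopicException(final List<String> topics) {
            super("Missing internal topics: " + topics);
            this.topics = topics;
        }

        List<String> topics() { return topics; }
    }

    // Returns the internal topics this model of init() would create,
    // or throws if some but not all are absent and setup is disabled.
    static List<String> topicsToCreate(final Set<String> required,
                                       final Set<String> existing,
                                       final InitParameters params) {
        final Set<String> missing = new LinkedHashSet<>(required);
        missing.removeAll(existing);
        final boolean incomplete = !missing.isEmpty() && missing.size() < required.size();
        if (incomplete && !params.setupInternalTopicsIfIncompleteEnabled()) {
            throw new MissingInternalTopicException(List.copyOf(missing));
        }
        return List.copyOf(missing);
    }

    public static void main(final String[] args) {
        final Set<String> required =
            new LinkedHashSet<>(List.of("app-a-repartition", "app-b-changelog"));
        final Set<String> existing = Set.of("app-a-repartition");

        // With setup enabled, only the absent topic is scheduled for creation.
        System.out.println(topicsToCreate(required, existing,
            InitParameters.initParameters().enableSetupInternalTopicsIfIncomplete()));

        // With default parameters, an incomplete set of topics is reported as an error.
        try {
            topicsToCreate(required, existing, InitParameters.initParameters());
        } catch (final MissingInternalTopicException e) {
            System.out.println("init failed: " + e.topics());
        }
    }
}
```

Keeping the default strict means that a partially deleted set of internal topics is surfaced to the operator instead of being silently re-created, which is the case the proposed exceptions are meant to expose.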

...