...

    package org.apache.kafka.streams.errors;

    import java.util.List;

    // Base class for all errors raised while setting up internal topics.
    public class InternalTopicSetupException extends StreamsException { }

    // Thrown when some but not all internal topics are absent.
    public class MissingInternalTopicException extends InternalTopicSetupException {
        // Names of the absent internal topics.
        public List<String> topics();
    }

    // Thrown when some internal topics are misconfigured.
    public class MisconfiguredInternalTopicException extends InternalTopicSetupException { }

    // Thrown when all internal topics are already set up.
    public class InternalTopicsAlreadySetupException extends InternalTopicSetupException { }

We propose to add an initialization method to the KafkaStreams class.

    import java.time.Duration;

    public class KafkaStreams {

        /**
         * Initializes broker-side state.
         *
         * If some but not all of the internal topics are absent it throws a MissingInternalTopicException.
         * If some of the internal topics are misconfigured it throws a MisconfiguredInternalTopicException.
         * If all internal topics are already set up it throws an InternalTopicsAlreadySetupException.
         */
        public void init();

        /**
         * Initializes broker-side state.
         *
         * If some but not all of the internal topics are absent it throws a MissingInternalTopicException.
         * If some of the internal topics are misconfigured it throws a MisconfiguredInternalTopicException.
         * If the initialization has not concluded after duration timeout this method throws a TimeoutException.
         */
        public void init(final Duration timeout);

        /**
         * Initializes broker-side state.
         *
         * This method takes parameters that specify which internal topics to set up if some
         * but not all of them are absent.
         */
        public void init(final InitParameters initParameters);

        /**
         * Initializes broker-side state.
         *
         * This method takes parameters that specify which internal topics to set up if some
         * but not all of them are absent.
         * If the initialization has not concluded after duration timeout this method throws a TimeoutException.
         */
        public void init(final InitParameters initParameters, final Duration timeout);

        public class InitParameters {
            public static InitParameters initParameters(); // specifies to disable the setup of internal topics if some topics are missing
            public InitParameters enableSetupInternalTopicsIfIncomplete(); // specifies to set up repartition and changelog topics if some are missing
            public InitParameters disableSetupInternalTopicsIfIncomplete(); // specifies to throw if some but not all repartition or changelog topics are missing
            public boolean setupInternalTopicsIfIncompleteEnabled(); // getter
        }
    }

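To make the interplay between the `init` overloads and `InitParameters` more concrete, the following is a minimal sketch of the documented semantics as a toy model. It does not use the Kafka Streams library, since `init()` is only proposed here: the classes are local stand-ins that mirror the proposed names, and the static `init(required, existing, params)` helper and the topic names are hypothetical. The sketch assumes that the "already set up" check also applies when `InitParameters` are passed, and it deliberately leaves out misconfiguration checks and timeouts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class InitSemanticsSketch {

    // Local stand-ins for the proposed exceptions (not the real Kafka classes).
    static class InternalTopicSetupException extends RuntimeException {
        InternalTopicSetupException(String message) { super(message); }
    }

    static class MissingInternalTopicException extends InternalTopicSetupException {
        private final List<String> topics;
        MissingInternalTopicException(List<String> topics) {
            super("Missing internal topics: " + topics);
            this.topics = topics;
        }
        List<String> topics() { return topics; }
    }

    static class InternalTopicsAlreadySetupException extends InternalTopicSetupException {
        InternalTopicsAlreadySetupException() { super("All internal topics already exist"); }
    }

    // Mirrors the proposed InitParameters; setup-if-incomplete is disabled by default.
    static class InitParameters {
        private boolean setupIfIncomplete = false;
        static InitParameters initParameters() { return new InitParameters(); }
        InitParameters enableSetupInternalTopicsIfIncomplete() { setupIfIncomplete = true; return this; }
        InitParameters disableSetupInternalTopicsIfIncomplete() { setupIfIncomplete = false; return this; }
        boolean setupInternalTopicsIfIncompleteEnabled() { return setupIfIncomplete; }
    }

    // Toy model of init(): 'existing' stands in for the topics present on the brokers.
    static void init(Set<String> required, Set<String> existing, InitParameters params) {
        List<String> missing = new ArrayList<>();
        for (String topic : required) {
            if (!existing.contains(topic)) {
                missing.add(topic);
            }
        }
        if (missing.isEmpty()) {
            throw new InternalTopicsAlreadySetupException();
        }
        // "Incomplete" means some, but not all, of the required topics are absent.
        boolean incomplete = missing.size() < required.size();
        if (incomplete && !params.setupInternalTopicsIfIncompleteEnabled()) {
            throw new MissingInternalTopicException(missing);
        }
        existing.addAll(missing); // stands in for creating the absent topics
    }

    public static void main(String[] args) {
        Set<String> required = new TreeSet<>(List.of("app-repartition", "app-changelog"));
        Set<String> brokers = new TreeSet<>(List.of("app-changelog"));
        try {
            init(required, brokers, InitParameters.initParameters());
        } catch (MissingInternalTopicException e) {
            System.out.println("missing: " + e.topics()); // prints "missing: [app-repartition]"
        }
        init(required, brokers, InitParameters.initParameters().enableSetupInternalTopicsIfIncomplete());
        System.out.println("after setup: " + brokers); // prints "after setup: [app-changelog, app-repartition]"
    }
}
```

With the default parameters the partially initialized state is reported rather than repaired, which matches the intent of making topic setup an explicit user decision; enabling setup-if-incomplete opts in to recreating only the absent topics.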
...
Migrating from the current behavior to INTERNAL_TOPIC_SETUP set to MANUAL_SETUP can be done without any specific migration plan. Users need to set INTERNAL_TOPIC_SETUP to MANUAL_SETUP and change their code to call KafkaStreams#init() accordingly.
Rejected Alternatives
- Persist a flag for the first-ever rebalance broker side: This approach was rejected because it would require changes on the brokers, which we believe can be avoided while still getting a good solution with the approach proposed in this KIP.
- Use committed offsets for a repartition topic to verify whether the repartition topic existed: This approach would not work since committed offsets are removed when a topic is deleted.
...