Status
Current state: Accepted
Discussion thread: https://lists.apache.org/x/thread.html/rf65435729171c40a001a394e9f6170e7a2c24b6c9424a346954cd0e1@%3Cdev.kafka.apache.org%3E
JIRA:
...
We propose to move the creation of the internal topics to an initialization method that users can call before they start their application for the first time. That way, users have full control over when the internal topics are created, and Kafka Streams can reliably notify users about possible data loss by throwing an exception when internal topics unexpectedly do not exist or are misconfigured. Additionally, we propose to add a configuration that allows users to turn the automatic creation of internal topics on or off. The configuration is needed for backward compatibility, but also to keep the first steps with Kafka Streams simple for new users.
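To make the proposed switch concrete, the following minimal sketch models the setting as a Java enum. It assumes a hypothetical config key internal.topics.setup and assumes that AUTOMATIC_SETUP is the default, which would match the backward-compatibility goal stated above. Only the value names AUTOMATIC_SETUP and MANUAL_SETUP come from this KIP; the key string and the fromProperties helper are illustrative and not part of the Kafka Streams API.

```java
import java.util.Locale;
import java.util.Properties;

// Hypothetical model of the proposed INTERNAL_TOPIC_SETUP configuration (not the Kafka Streams API).
enum InternalTopicSetup {
    AUTOMATIC_SETUP, // the group leader creates internal topics during rebalances (current behavior)
    MANUAL_SETUP;    // users must call KafkaStreams#init() before the first start

    // Hypothetical config key; the actual key string is not specified here.
    static final String CONFIG_KEY = "internal.topics.setup";

    // Reads the setting, defaulting to AUTOMATIC_SETUP for backward compatibility (assumption).
    static InternalTopicSetup fromProperties(Properties props) {
        String value = props.getProperty(CONFIG_KEY);
        if (value == null) {
            return AUTOMATIC_SETUP;
        }
        try {
            return valueOf(value.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Invalid value '" + value + "' for " + CONFIG_KEY
                    + "; expected AUTOMATIC_SETUP or MANUAL_SETUP", e);
        }
    }
}
```

Defaulting to automatic setup keeps existing applications working unchanged, while operators who want strict control can opt in to manual setup.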
...
We propose to add two new exceptions, namely MissingInternalTopicsException and MisconfiguredInternalTopicException. MissingInternalTopicsException is thrown when a required internal topic does not exist, and MisconfiguredInternalTopicException is thrown when an internal topic is not configured as expected. In general, misconfigurations are configurations that differ from the default values specified by Kafka Streams or from configurations specified in user code, where configurations in user code take precedence over Streams' default values. Such misconfigurations typically occur if users change configurations directly on the brokers. For example, a changelog topic for a non-windowed state store is regarded as misconfigured if its retention period is not set to unlimited. Another example of a misconfigured internal topic is a repartition topic with the wrong number of partitions. In the future, we might discover other misconfigurations that could lead to data loss. In such a case, Kafka Streams will also throw a MisconfiguredInternalTopicException without the need for a new KIP. The exception will contain a detailed explanation of what is wrong and how to fix it. The intended recipient of these exceptions is the operator, not software.
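The following sketch illustrates the kind of check described above by comparing the internal topics that Kafka Streams expects with what exists on the brokers. The exception classes are local stand-ins for the proposed ones, TopicSpec and InternalTopicValidator are hypothetical, and, following the example above, an unlimited retention period is modeled as retention.ms=-1 (Kafka's value for no time limit). It is a sketch under these assumptions, not the actual verification logic.

```java
import java.util.List;
import java.util.Map;

// Hypothetical local stand-ins for the two exceptions proposed in this KIP.
class MissingInternalTopicsException extends RuntimeException {
    MissingInternalTopicsException(String message) { super(message); }
}

class MisconfiguredInternalTopicException extends RuntimeException {
    MisconfiguredInternalTopicException(String message) { super(message); }
}

// Partition count plus topic-level configs, either expected by Streams or observed on the brokers.
record TopicSpec(int partitions, Map<String, String> configs) {}

final class InternalTopicValidator {
    private InternalTopicValidator() {}

    // Throws if any expected internal topic is missing or deviates from its expected spec.
    static void validate(Map<String, TopicSpec> expected, Map<String, TopicSpec> actual) {
        List<String> missing = expected.keySet().stream()
                .filter(name -> !actual.containsKey(name))
                .sorted()
                .toList();
        if (!missing.isEmpty()) {
            throw new MissingInternalTopicsException("Internal topics " + missing
                    + " do not exist; if they were deleted, state may have been lost.");
        }
        for (Map.Entry<String, TopicSpec> entry : expected.entrySet()) {
            String name = entry.getKey();
            TopicSpec want = entry.getValue();
            TopicSpec have = actual.get(name);
            if (have.partitions() != want.partitions()) {
                throw new MisconfiguredInternalTopicException("Internal topic " + name + " has "
                        + have.partitions() + " partitions, but " + want.partitions() + " are expected.");
            }
            // Only configs that Streams defaults or user code specify are compared.
            for (Map.Entry<String, String> config : want.configs().entrySet()) {
                String actualValue = have.configs().get(config.getKey());
                if (!config.getValue().equals(actualValue)) {
                    throw new MisconfiguredInternalTopicException("Internal topic " + name + " has "
                            + config.getKey() + "=" + actualValue + ", but " + config.getValue()
                            + " is expected; change the topic config on the brokers to fix this.");
                }
            }
        }
    }
}
```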
These exceptions, MissingInternalTopicsException and MisconfiguredInternalTopicException, help to distinguish errors originating from missing or misconfigured internal topics from other errors in the uncaught exception handler. For example, reacting to a missing source topic (i.e., MissingSourceTopicException) might be different from reacting to a missing or misconfigured internal topic, because the process for re-creating source topics might differ from the process for re-creating internal topics. Furthermore, source topics might be owned by a different team than the internal topics; consequently, different people need to be paged. The exceptions MissingInternalTopicsException and MisconfiguredInternalTopicException will be thrown during explicit initialization and during a rebalance when the internal topics are verified. Both exceptions are fatal.
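A minimal sketch of the kind of routing these exceptions enable, using hypothetical local stand-ins for MissingSourceTopicException and the two proposed exceptions, plus a hypothetical OnCall enum of paging targets. In an actual application, a function like route could be called from a StreamsUncaughtExceptionHandler, which then returns a StreamThreadExceptionResponse; that wiring is omitted so the example stays self-contained.

```java
// Hypothetical local stand-ins for the Kafka Streams exception types discussed above.
class MissingSourceTopicException extends RuntimeException {
    MissingSourceTopicException(String message) { super(message); }
}

class MissingInternalTopicsException extends RuntimeException {
    MissingInternalTopicsException(String message) { super(message); }
}

class MisconfiguredInternalTopicException extends RuntimeException {
    MisconfiguredInternalTopicException(String message) { super(message); }
}

// Hypothetical paging targets: source topics and internal topics may be owned by different teams.
enum OnCall { SOURCE_TOPIC_OWNERS, STREAMS_OPERATORS, DEFAULT }

final class TopicErrorRouter {
    private TopicErrorRouter() {}

    // Decides whom to page. Walks the cause chain because the error may arrive wrapped
    // in another exception; the depth bound guards against cyclic cause chains.
    static OnCall route(Throwable error) {
        int depth = 0;
        for (Throwable t = error; t != null && depth < 16; t = t.getCause(), depth++) {
            if (t instanceof MissingSourceTopicException) {
                return OnCall.SOURCE_TOPIC_OWNERS;
            }
            if (t instanceof MissingInternalTopicsException
                    || t instanceof MisconfiguredInternalTopicException) {
                return OnCall.STREAMS_OPERATORS;
            }
        }
        return OnCall.DEFAULT;
    }
}
```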
Additionally, we propose to add an exception InternalTopicsAlreadySetupException that is thrown if users attempt to initialize an already initialized application. InternalTopicsAlreadySetupException will only be thrown during explicit initialization. Such a behavior will ensure that users explicitly initialize the application only when:
...
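The explicit initialization step could look roughly like the following sketch. It assumes that an application counts as already initialized as soon as any of its internal topics exists; that definition, the in-memory TopicRegistry, and the ExplicitInit helper are hypothetical and only stand in for KafkaStreams#init() and the brokers.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical local stand-in for the exception proposed in this KIP.
class InternalTopicsAlreadySetupException extends RuntimeException {
    InternalTopicsAlreadySetupException(String message) { super(message); }
}

// In-memory stand-in for the set of topics that exist on the brokers (hypothetical).
final class TopicRegistry {
    private final Set<String> topics = new HashSet<>();

    boolean exists(String topic) { return topics.contains(topic); }

    void create(String topic) { topics.add(topic); }
}

final class ExplicitInit {
    private ExplicitInit() {}

    // Sketch of an init() step: creates all internal topics, but refuses to run if any
    // of them already exists (assumed definition of "already initialized").
    static void init(TopicRegistry registry, List<String> internalTopics) {
        List<String> existing = internalTopics.stream().filter(registry::exists).toList();
        if (!existing.isEmpty()) {
            throw new InternalTopicsAlreadySetupException("Internal topics " + existing
                    + " already exist; the application has already been initialized.");
        }
        internalTopics.forEach(registry::create);
    }
}
```

Under this assumption, a setup in which only some internal topics survive is reported to the operator instead of being silently completed, which fits the goal of surfacing possible data loss.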
If configuration INTERNAL_TOPIC_SETUP is set to AUTOMATIC_SETUP, the group leader will set up the internal topics during a rebalance. If internal topics were deleted between rebalances, the group leader will create the deleted internal topics during the rebalance. That corresponds to the current behavior of Kafka Streams. Users can also call KafkaStreams#init() to set up the internal topics when INTERNAL_TOPIC_SETUP is set to AUTOMATIC_SETUP, but the call is not necessary since the internal topics would be created anyway during the next rebalance. Additionally, misconfigurations will be rectified automatically if possible and/or logged.
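A sketch of the leader-side step under AUTOMATIC_SETUP, using a hypothetical in-memory model of the cluster. It assumes that topic-level configs (such as retention.ms) can be rectified in place, while a wrong partition count is only logged; this KIP does not enumerate which misconfigurations are rectifiable, so that split is an assumption.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a topic: partition count plus mutable topic-level configs.
final class Topic {
    final int partitions;
    final Map<String, String> configs;

    Topic(int partitions, Map<String, String> configs) {
        this.partitions = partitions;
        this.configs = new HashMap<>(configs);
    }
}

final class AutomaticSetup {
    private AutomaticSetup() {}

    // Leader-side step during a rebalance under AUTOMATIC_SETUP (sketch).
    // Mutates `cluster` and returns the messages that would be logged.
    static List<String> reconcile(Map<String, Topic> expected, Map<String, Topic> cluster) {
        List<String> log = new ArrayList<>();
        for (Map.Entry<String, Topic> entry : expected.entrySet()) {
            String name = entry.getKey();
            Topic want = entry.getValue();
            Topic have = cluster.get(name);
            if (have == null) {
                // Never created, or deleted since the last rebalance: (re-)create it.
                cluster.put(name, new Topic(want.partitions, want.configs));
                continue;
            }
            // Topic-level configs can be altered in place, so rectify and log them.
            for (Map.Entry<String, String> config : want.configs.entrySet()) {
                String current = have.configs.get(config.getKey());
                if (!config.getValue().equals(current)) {
                    have.configs.put(config.getKey(), config.getValue());
                    log.add("Rectified " + config.getKey() + " of " + name
                            + " from " + current + " to " + config.getValue());
                }
            }
            // Assumption: a wrong partition count is not changed automatically, only logged.
            if (have.partitions != want.partitions) {
                log.add("Internal topic " + name + " has " + have.partitions
                        + " partitions, expected " + want.partitions);
            }
        }
        return log;
    }
}
```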
If configuration INTERNAL_TOPIC_SETUP is set to MANUAL_SETUP, the group leader will not set up internal topics during a rebalance; instead, users need to call KafkaStreams#init() to set up the internal topics. If the internal topics do not exist during a rebalance because KafkaStreams#init() has not been called or because one or more internal topics have been deleted, a MissingInternalTopicsException is thrown in each Kafka Streams client. If during a rebalance the group leader detects that an internal topic is misconfigured, a MisconfiguredInternalTopicException is thrown in each Kafka Streams client.
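Under MANUAL_SETUP, the leader only verifies, and the outcome has to reach every client. The following sketch assumes the leader attaches an error code to each member's assignment, which each client then converts into the corresponding exception. SetupError, ManualSetupRebalance, and the exception stand-ins are hypothetical, and only partition counts are checked for brevity.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical local stand-ins for the two exceptions proposed in this KIP.
class MissingInternalTopicsException extends RuntimeException {
    MissingInternalTopicsException(String message) { super(message); }
}

class MisconfiguredInternalTopicException extends RuntimeException {
    MisconfiguredInternalTopicException(String message) { super(message); }
}

// Hypothetical error code that the group leader attaches to every member's assignment.
enum SetupError { NONE, MISSING_INTERNAL_TOPICS, MISCONFIGURED_INTERNAL_TOPIC }

final class ManualSetupRebalance {
    private ManualSetupRebalance() {}

    // Leader side under MANUAL_SETUP: only verify, never create.
    static SetupError verify(Map<String, Integer> expectedPartitions, Map<String, Integer> actualPartitions) {
        for (String topic : expectedPartitions.keySet()) {
            if (!actualPartitions.containsKey(topic)) {
                return SetupError.MISSING_INTERNAL_TOPICS;
            }
        }
        for (Map.Entry<String, Integer> entry : expectedPartitions.entrySet()) {
            if (!entry.getValue().equals(actualPartitions.get(entry.getKey()))) {
                return SetupError.MISCONFIGURED_INTERNAL_TOPIC;
            }
        }
        return SetupError.NONE;
    }

    // Leader side: every member receives the same error code, so every client fails.
    static Map<String, SetupError> assign(List<String> memberIds, SetupError error) {
        return memberIds.stream().collect(Collectors.toMap(id -> id, id -> error));
    }

    // Member side: each Kafka Streams client turns the error code into the fatal exception.
    static void onAssignment(SetupError error) {
        switch (error) {
            case MISSING_INTERNAL_TOPICS -> throw new MissingInternalTopicsException(
                    "Internal topics are missing; call KafkaStreams#init() if the application was never initialized.");
            case MISCONFIGURED_INTERNAL_TOPIC -> throw new MisconfiguredInternalTopicException(
                    "An internal topic is misconfigured; see the group leader's verification result.");
            case NONE -> { }
        }
    }
}
```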
...