Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

We propose to add two new exceptions MissingInternalTopicsException and MisconfiguredInternalTopicExceptionMissingInternalTopicsException is thrown when a required internal topic does not exist and MisconfiguredInternalTopicException is thrown when the internal topic is not configured as expected. In general, misconfigurations are configurations that differ from the default values specified by Kafka Streams or from configurations specified in user code. Those misconfigurations typically occur if users change configurations directly on the brokers. For example, a changelog topic for a non-windowed state store is regarded as misconfigured if its retention period is not set to unlimited. Another example for a misconfigured internal topic is a repartition topic with the wrong number of partitions. In future, we might discover other configurations that are critical to data loss. In such a case, Kafka Streams will also throw a MisconfiguredInternalTopicException without the need of a new KIP. The exception will contain a detailed explanation of what is wrong and how to fix it. The intended recipient of these exceptions is the operator, not software.

These exceptions help to discriminate errors originating from missing or misconfigured internal topics from other errors in the uncaught exception handler. For example, reacting on a missing source topic (i.e., MissingSourceTopicException) might be different from reacting on a missing or misconfigured internal topic, because the process for re-creating source topics might differ from the process for re-creating internal topics. Furthermore, source topics might be owned by a different team than the internal topics, consequently different people need to be paged. Exceptions MissingInternalTopicsException and MisconfiguredInternalTopicException will be thrown during explicit initialization and during a rebalance when the internal topics are verified. Both exceptions are fatal. 

...

If configuration INTERNAL_TOPIC_SETUP is set to AUTOMATIC_SETUP, the group leader will set up the internal topics during a rebalance. If internal topics were deleted between rebalances, the group leader will create the deleted internal topics during the rebalance. That corresponds to the current behavior of Kafka Streams. Users can also call KafkaStreams#init() to set up the internal topics when INTERNAL_TOPIC_SETUP is set to AUTOMATIC_SETUP, but the call is not necessary since the internal topics would be created anyways during the next rebalance. Additionally, misconfigurations will be automatically rectified if possible and/or logged.

If configuration INTERNAL_TOPIC_SETUP is set to MANUAL_SETUP, the group leader will not set up internal topics during a rebalance but users need to call KafkaStreams#init() to set up the internal topics. If the internal topics do not exist during a rebalance because KafkaStreams#init() has not been called or one or more internal topics have been deleted, a MissingInternalTopicsException is thrown in each Kafka Streams client. If during a rebalance the group leader realizes that an internal topic is misconfigured, a MisconfiguredInternalTopicException is thrown in each Kafka Streams client.

...