...

Kafka Streams uses repartition topics to repartition the data when a key-based operation, e.g., an aggregation or a join, follows a key-changing operation, e.g., a map. Additionally, Kafka Streams uses changelog topics to replicate the data in its state stores for fault tolerance. If any required repartition topic or changelog topic does not exist, it is created during a rebalance; more precisely, these internal topics are created during the computation of the assignment. Consequently, if one of the internal topics is deleted between rebalances, it is silently recreated as an empty topic. If the deletion of the internal topics was a mistake or – even worse – a malicious act, data might be lost. Kafka Streams users would either not notice the loss at all or notice it too late to limit the damage by stopping processing.

...

. Deletion of internal topics may happen by mistake or – even worse – as a malicious act. This problem could be avoided by creating the internal topics only once, during the first-ever rebalance of the application (or after an application reset). However, determining the first-ever rebalance of an application is not always straightforward. For example, if all Kafka Streams clients of a Kafka Streams application are stopped, an internal topic is deleted, and then the Kafka Streams application is restarted, the deleted topic would be silently recreated as an empty topic, because we do not have any information that we could leverage to recognize that the first rebalance after the restart is not the first-ever rebalance of the application.

We propose to move the creation of the internal topics to an initialization method that users can call before they start their application for the first time. That way, users have full control over when the internal topics are created, and Kafka Streams can reliably notify users about possible data loss by throwing an exception when internal topics unexpectedly do not exist. Additionally, we propose to add a configuration that allows users to turn automatic creation of internal topics on or off. This configuration is needed for backward compatibility, but also to keep the first steps with Kafka Streams simple for new users.

Public Interfaces

We propose to add a static initialization method to the KafkaStreams class.

Code Block
languagejava
public class KafkaStreams {

	public static void init(final Topology topology,
                            final Properties props);

}

Method init() uses the topology and the application properties to set up the broker-side state, i.e., the internal topics, for a Kafka Streams application.
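For illustration, a deployment could call the proposed init() method once before the first start of the application. The following is a sketch of how the proposed API might be used; the application ID, bootstrap server, and topic names are hypothetical, and the snippet does not compile against any released Kafka Streams version because init() is only proposed here.

Code Block
languagejava
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");      // hypothetical application ID
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");      // hypothetical broker address

final StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic")                                          // hypothetical source topic
       .groupBy((key, value) -> value)                                 // key-changing operation -> repartition topic
       .count();                                                       // stateful operation -> changelog topic
final Topology topology = builder.build();

// Proposed: create all repartition and changelog topics once,
// before the application is started for the first time.
KafkaStreams.init(topology, props);

final KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();

Because init() is static and only needs the topology and the configuration, it can also be invoked from a separate operational tool before any Kafka Streams client is deployed.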


We propose to add a new configuration to Kafka Streams to determine whether the initialization should be done automatically during a rebalance or by the user calling KafkaStreams.init().

Code Block
languagejava
public class StreamsConfig {

    public static final String AUTOMATIC_INITIALIZATION = "automatic";
    public static final String USER_INITIALIZATION = "user";

    public static final String APPLICATION_INITIALIZATION = "application.initialization";

}
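With the proposed configuration, an application that wants to manage internal-topic creation itself would set the new property before starting its clients. This is a sketch using the constant names proposed above:

Code Block
languagejava
final Properties props = new Properties();
// Disable automatic creation of internal topics during rebalances;
// the user then becomes responsible for calling KafkaStreams.init()
// before the first start of the application.
props.put(StreamsConfig.APPLICATION_INITIALIZATION, StreamsConfig.USER_INITIALIZATION);

Leaving the property at AUTOMATIC_INITIALIZATION would preserve the current behavior, which keeps existing deployments and first experiments with Kafka Streams unchanged.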


We propose to add a new exception that helps to distinguish errors originating from missing internal topics from other errors in the uncaught exception handler. For example, reacting to a missing source topic (i.e., a MissingSourceTopicException) might differ from reacting to missing internal topics, because the source topics might be owned by a different team than the internal topics; consequently, different people need to be paged.

Code Block
languagejava
package org.apache.kafka.streams.errors;

public class MissingInternalTopicException extends StreamsException {
}
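An uncaught exception handler could then branch on the new exception type. The following sketch assumes the StreamsUncaughtExceptionHandler API; the chosen responses are only an example of how an operator might react, not part of this proposal:

Code Block
languagejava
streams.setUncaughtExceptionHandler(exception -> {
    if (exception instanceof MissingInternalTopicException) {
        // A missing internal topic indicates possible data loss:
        // stop all clients of the application so the damage can be
        // assessed, and page the team that owns the internal topics.
        return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
    }
    // Other errors (e.g., a MissingSourceTopicException) may warrant a
    // different reaction, since source topics might be owned by another team.
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
});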


Proposed Changes



Compatibility, Deprecation, and Migration Plan

...