...

The only public-facing changes in this KIP are the introduction of two new metrics and three new configs related to autoscaling. The first config is a simple feature flag that users can turn on (or off) to control whether Streams automatically reacts to upstream partition expansions by expanding the internal topics to match. When the flag is not enabled and input topics have their partition count changed in a way that is incompatible with the current application state (as discussed above), Streams will log the details and send the INCOMPLETE_SOURCE_TOPIC_METADATA error code to all clients, as is the case today.

Since one of the main use cases this KIP is targeting revolves around static partitioning, we want to make it as easy as possible for users to implement this and use the feature correctly. To that end, we will also introduce a new config that allows you to set a default StreamPartitioner class for the application, rather than requiring one to be passed in at every sink node, which is error-prone and frankly just annoying. The default will be applied across the topology but may still be overridden by other custom partitioners at specific nodes, if so desired.
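To make the static partitioning idea concrete, here is a minimal sketch of a partitioner whose output for a given key never changes when the partition count grows. It is illustrative only: `KeyPartitioner` is a hypothetical local stand-in for `org.apache.kafka.streams.processor.StreamPartitioner` (whose exact method signature differs between Kafka versions), and the region names and lookup table are invented for the example.

```java
import java.util.Map;

// Hypothetical stand-in for org.apache.kafka.streams.processor.StreamPartitioner,
// declared locally so the sketch compiles without Kafka on the classpath.
interface KeyPartitioner<K, V> {
    int partition(String topic, K key, V value, int numPartitions);
}

// Static partitioning: each key is pinned to a fixed partition through a lookup
// table, so the result does not depend on the current partition count.
final class RegionPartitioner implements KeyPartitioner<String, String> {

    // Assumed mapping for illustration; a real application would own this table.
    private static final Map<String, Integer> REGION_TO_PARTITION =
        Map.of("us-east", 0, "us-west", 1, "eu-central", 2);

    @Override
    public int partition(String topic, String key, String value, int numPartitions) {
        Integer partition = REGION_TO_PARTITION.get(key);
        if (partition == null) {
            throw new IllegalArgumentException("No partition assigned for key: " + key);
        }
        if (partition >= numPartitions) {
            // The topic has not yet been expanded far enough to hold this key.
            throw new IllegalStateException(
                "Partition " + partition + " does not exist in topic " + topic);
        }
        return partition;
    }
}
```

Because the mapping ignores `numPartitions` except as a bounds check, adding partitions to the topic only creates room for new keys and never moves existing ones.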

Finally, there will be a new timeout config that lets users bound the maximum time that Streams will spend attempting to retry partition autoscaling, in case of failures or other edge cases. This is discussed in more detail below.
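The general idea of bounding retries by a timeout can be sketched as follows. This is a generic illustration, not how Streams implements autoscaling retries: the class name, method name, and fixed backoff are all assumptions made for the example.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical helper: retries an operation until it succeeds or a deadline passes.
final class BoundedRetry {

    // Returns true if the attempt succeeded before timeoutMs elapsed, false otherwise.
    static boolean retryUntilDeadline(BooleanSupplier attempt, long timeoutMs, long backoffMs) {
        final long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (true) {
            if (attempt.getAsBoolean()) {
                return true;
            }
            // Compare via subtraction so the check is safe against nanoTime overflow.
            if (System.nanoTime() - deadlineNanos >= 0) {
                return false;
            }
            try {
                Thread.sleep(backoffMs);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

With such a bound in place, a persistent failure surfaces after a known maximum delay instead of retrying indefinitely.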

Code Block (Java): StreamsConfig.java
// default to false, priority LOW
public static final String PARTITION_AUTOSCALING_ENABLED_CONFIG = "partition.autoscaling.enabled";
private static final String PARTITION_AUTOSCALING_ENABLED_DOC = "Enable autoscaling the partitions of internal topics which are managed by Streams. If an internal topic's partition count depends on an upstream input topic (or topics), then expanding the number of partitions on the input topic(s) will result in the internal topic(s) automatically being expanded to match.";

// default to 15 minutes, priority LOW
public static final String PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG = "partition.autoscaling.timeout.ms";
private static final String PARTITION_AUTOSCALING_TIMEOUT_MS_DOC = "The maximum amount of time in milliseconds that Streams will attempt to retry autoscaling of internal topic partitions.";

// default to null/none, priority LOW
public static final String DEFAULT_STREAM_PARTITIONER_CLASS_CONFIG = "default.stream.partitioner.class";
private static final String DEFAULT_STREAM_PARTITIONER_CLASS_DOC = "Default partitioning class that implements the <code>org.apache.kafka.streams.processor.StreamPartitioner</code> interface. Will be used for all sink nodes unless overridden by another custom partitioner";
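As a rough usage sketch, an application could opt in by setting the three proposed config keys in its Streams properties. The keys are the ones proposed above and may not exist in a given Kafka release; the application id, bootstrap address, 5-minute timeout, and the `com.example.RegionPartitioner` class name are placeholders chosen for illustration.

```java
import java.time.Duration;
import java.util.Properties;

final class AutoscalingConfigExample {

    // Builds Streams properties that enable the proposed autoscaling feature.
    static Properties autoscalingProps() {
        Properties props = new Properties();
        // Placeholder values for the standard required configs.
        props.setProperty("application.id", "autoscaling-demo");
        props.setProperty("bootstrap.servers", "localhost:9092");

        // Feature flag proposed in this KIP (defaults to false).
        props.setProperty("partition.autoscaling.enabled", "true");
        // Bound retries to 5 minutes instead of the proposed 15-minute default.
        props.setProperty("partition.autoscaling.timeout.ms",
            String.valueOf(Duration.ofMinutes(5).toMillis()));
        // Hypothetical user class implementing StreamPartitioner.
        props.setProperty("default.stream.partitioner.class",
            "com.example.RegionPartitioner");
        return props;
    }

    public static void main(String[] args) {
        autoscalingProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Plain string keys are used here so the sketch compiles without the Kafka Streams dependency; in an application the `StreamsConfig` constants would normally be referenced instead.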

We will also provide two new INFO metrics as a means of monitoring the status and completion of autoscaling events. The first will be a subtopology-level counter for the number of partitions (and thus tasks/parallelism):

...

  1. As mentioned briefly in the 'Motivation' section, an alternative to enforcing the static partitioning condition would be to implement some kind of manual data migration via historical reprocessing. While this would enable the feature to be used for all applications, it would have severe downsides: it would require long downtimes and be extremely resource-intensive, making the feature itself far less useful. Thus we feel it's more practical to introduce the best version of the feature by imposing a constraint: static partitioning.
  2. To prevent users from shooting themselves in the foot, the initial proposal for this KIP did not use a feature flag, and instead exposed the feature via a special static partitioner interface to discourage turning it on in applications that aren't compatible with autoscaling. However, since this is an advanced feature anyway, we decided to trust advanced users to know what they are doing and allow full control over this feature.
  3. We considered including a new config for a default partitioner, but this turned out to be non-trivial and was extracted for a follow-up PR.