...

StreamsConfig.java
// default to false, priority LOW
public static final String PARTITION_AUTOSCALING_ENABLED_CONFIG = "partition.autoscaling.enabled";
private static final String PARTITION_AUTOSCALING_ENABLED_DOC = "Enable autoscaling of the partitions of internal topics that are managed by Streams. If an internal topic's partition count depends on an upstream input topic (or topics), then expanding the number of partitions on the input topic(s) will result in the internal topic(s) automatically being expanded to match.";

// default to 15 minutes, priority LOW
public static final String PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG = "partition.autoscaling.timeout.ms";
private static final String PARTITION_AUTOSCALING_TIMEOUT_MS_DOC = "The maximum amount of time in milliseconds that Streams will attempt to retry autoscaling of internal topic partitions.";

// default to null/none, priority LOW
public static final String DEFAULT_STREAM_PARTITIONER_CLASS_CONFIG = "default.stream.partitioner.class";
private static final String DEFAULT_STREAM_PARTITIONER_CLASS_DOC = "Default partitioning class that implements the <code>org.apache.kafka.streams.processor.StreamPartitioner</code> interface. Will be used for all sink nodes unless overridden by another custom partitioner.";
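As an illustration, an application could opt into the feature by setting the proposed keys in its Streams configuration. This is a minimal sketch that uses plain java.util.Properties and string literals, so it compiles without Kafka on the classpath. The keys and the 15-minute default come from the proposal above and are not a released Kafka API. The class name AutoscalingConfigExample is hypothetical.

```java
import java.time.Duration;
import java.util.Properties;

public class AutoscalingConfigExample {

    // Config keys proposed in this KIP (assumed names, not a released Kafka API).
    static final String PARTITION_AUTOSCALING_ENABLED_CONFIG = "partition.autoscaling.enabled";
    static final String PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG = "partition.autoscaling.timeout.ms";

    // Builds a Properties object that opts into autoscaling with the given retry timeout.
    static Properties autoscalingProps(Duration timeout) {
        Properties props = new Properties();
        // The feature is proposed to default to false, so it must be enabled explicitly.
        props.put(PARTITION_AUTOSCALING_ENABLED_CONFIG, "true");
        // The timeout is expressed in milliseconds; the proposed default is 15 minutes.
        props.put(PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG, Long.toString(timeout.toMillis()));
        return props;
    }

    public static void main(String[] args) {
        Properties props = autoscalingProps(Duration.ofMinutes(15));
        System.out.println(props.getProperty(PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG)); // prints 900000
    }
}
```

In a real application these entries would be merged into the same Properties passed to the KafkaStreams constructor.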

...

subtopology-parallelism  

  • type = stream-subtopology-metrics
  • subtopology-id=[subtopologyId] (sometimes also called groupId)

The other metric tracks autoscaling failures where Streams has given up on retrying at the client level. It is only reported when the feature flag is turned on.

num-autoscaling-failures 

  • type = stream-client-metrics
  • client-id=[clientId]
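One way to think about "given up on retrying" is sketched below. This is a hypothetical model, not the proposed implementation. It assumes that each expansion attempt (for example, one per rebalance) reports its result, that the retry window starts at the first failed attempt for a topic, and that a single client-level counter stands in for num-autoscaling-failures. The class AutoscalingRetryTracker, its methods, and the topic name are illustrative only. The tracker would only be created when the feature flag is on, which matches the metric being reported only in that case.

```java
import java.util.HashMap;
import java.util.Map;

public class AutoscalingRetryTracker {

    // Result of recording one expansion attempt.
    enum Outcome { SUCCEEDED, RETRY_LATER, GAVE_UP }

    private final long timeoutMs;
    // Time of the first failed attempt for each internal topic that is still being retried.
    private final Map<String, Long> firstFailureMs = new HashMap<>();
    // Stand-in for the client-level num-autoscaling-failures metric.
    private long numAutoscalingFailures = 0;

    AutoscalingRetryTracker(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    // Records the result of one expansion attempt for the given topic at time nowMs.
    Outcome recordAttempt(String topic, boolean succeeded, long nowMs) {
        if (succeeded) {
            firstFailureMs.remove(topic); // reset the retry window for this topic
            return Outcome.SUCCEEDED;
        }
        long first = firstFailureMs.computeIfAbsent(topic, t -> nowMs);
        if (nowMs - first >= timeoutMs) {
            // Timeout reached: stop retrying this topic and count one client-level failure.
            firstFailureMs.remove(topic);
            numAutoscalingFailures++;
            return Outcome.GAVE_UP;
        }
        return Outcome.RETRY_LATER;
    }

    long numAutoscalingFailures() {
        return numAutoscalingFailures;
    }

    public static void main(String[] args) {
        long timeoutMs = 15 * 60 * 1000L; // proposed default: 15 minutes
        AutoscalingRetryTracker tracker = new AutoscalingRetryTracker(timeoutMs);
        System.out.println(tracker.recordAttempt("app-repartition", false, 0L));        // RETRY_LATER
        System.out.println(tracker.recordAttempt("app-repartition", false, timeoutMs)); // GAVE_UP
        System.out.println(tracker.numAutoscalingFailures());                           // 1
    }
}
```

Keying the retry window by topic means one stubborn topic does not reset or extend the timeout for others. Whether Streams tracks this per topic or per rebalance is not specified here.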

Proposed Changes

As discussed above, the autoscaling feature can be enabled for any kind of application and allows Streams to handle partition expansion of its input topics. When upstream topics change in partition count, this can change the expected number of partitions for downstream repartition topics and changelogs. Normally these internal topics are created during the first rebalance after startup: during any rebalance, Streams processes the topology to resolve the intended number of partitions for all topics and creates those topics if necessary. If the topics already exist, Streams validates their partition count instead.

With this KIP, when discrepancies are found during the validation step, the handling logic branches on the feature flag. When the flag is enabled, Streams will attempt to "fix" any such discrepancy by requesting that the internal topic be expanded to the intended number of partitions. Note that Streams will still trigger an error if an internal topic is found to be over-partitioned. Since a topic's partition count can only be increased and never decreased, an internal topic with more than the expected number of partitions indicates a user error or misconfiguration, which Streams cannot fix.
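The decision described above can be sketched as a small function over the existing and intended partition counts. This is a hypothetical illustration, not Streams' actual internal topic manager. The class and enum names are invented, and it assumes that an under-partitioned topic with the flag disabled keeps the pre-KIP behavior of failing validation.

```java
import java.util.OptionalInt;

public class InternalTopicValidationSketch {

    // Possible outcomes of checking one internal topic against its intended partition count.
    enum Action { CREATE, NONE, EXPAND, FAIL }

    // actualPartitions is empty when the internal topic does not exist yet.
    static Action decide(OptionalInt actualPartitions, int intendedPartitions, boolean autoscalingEnabled) {
        if (!actualPartitions.isPresent()) {
            return Action.CREATE; // missing topic: create it with the intended count
        }
        int actual = actualPartitions.getAsInt();
        if (actual == intendedPartitions) {
            return Action.NONE; // validation passes, nothing to do
        }
        if (actual > intendedPartitions) {
            // Partition counts can only grow, so over-partitioning is a misconfiguration Streams cannot fix.
            return Action.FAIL;
        }
        // Under-partitioned: expand only when the feature flag is on, otherwise report an error as before.
        return autoscalingEnabled ? Action.EXPAND : Action.FAIL;
    }

    public static void main(String[] args) {
        System.out.println(decide(OptionalInt.of(4), 8, true));   // EXPAND
        System.out.println(decide(OptionalInt.of(4), 8, false));  // FAIL
        System.out.println(decide(OptionalInt.of(16), 8, true));  // FAIL
        System.out.println(decide(OptionalInt.empty(), 8, true)); // CREATE
    }
}
```

The over-partitioned check comes before the flag check so that enabling autoscaling never masks a topic that cannot be corrected.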

...