...

StreamsConfig.java
// default to false, priority LOW
public static final String PARTITION_AUTOSCALING_ENABLED_CONFIG = "partition.autoscaling.enabled";
private static final String PARTITION_AUTOSCALING_ENABLED_DOC = "Enable autoscaling the partitions of internal topics which are managed by Streams. If an internal topic's partition count depends on an upstream input topic (or topics), then expanding the number of partitions on the input topic(s) will result in the internal topic(s) automatically being expanded to match.";

// default to 15 minutes, priority LOW
public static final String PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG = "partition.autoscaling.timeout.ms";
private static final String PARTITION_AUTOSCALING_TIMEOUT_MS_DOC = "The maximum amount of time in milliseconds that Streams will attempt to retry autoscaling of internal topic partitions.";
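
As a minimal sketch of how an application might opt in, the snippet below builds a Properties object with the two proposed settings. The config names are written as string literals taken from the definitions above, since the constants may not exist in the StreamsConfig class on a given classpath. The class name AutoscalingConfigExample, the helper autoscalingProps, and the application.id and bootstrap.servers values are hypothetical and exist only for illustration.

```java
import java.util.Properties;

final class AutoscalingConfigExample {
    // Names copied from the proposed StreamsConfig constants above.
    static final String PARTITION_AUTOSCALING_ENABLED_CONFIG = "partition.autoscaling.enabled";
    static final String PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG = "partition.autoscaling.timeout.ms";

    // Hypothetical helper: builds client properties with autoscaling configured.
    static Properties autoscalingProps(boolean enabled, long timeoutMs) {
        if (timeoutMs < 0) {
            throw new IllegalArgumentException("timeoutMs must be non-negative");
        }
        Properties props = new Properties();
        props.setProperty("application.id", "autoscaling-demo");  // placeholder value
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder value
        props.setProperty(PARTITION_AUTOSCALING_ENABLED_CONFIG, Boolean.toString(enabled));
        props.setProperty(PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG, Long.toString(timeoutMs));
        return props;
    }

    public static void main(String[] args) {
        // Enable the feature and raise the retry timeout from the proposed
        // 15-minute default to 30 minutes.
        Properties props = autoscalingProps(true, 30 * 60 * 1000L);
        System.out.println(props.getProperty(PARTITION_AUTOSCALING_ENABLED_CONFIG));
        System.out.println(props.getProperty(PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG));
    }
}
```

The resulting Properties would be passed to the Streams client in the usual way; leaving both settings unset keeps the feature off, per the proposed defaults.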

We will also provide three new INFO metrics as a means of monitoring the status and completion of autoscaling events. The first two will be subtopology-level counters for the current and expected/target number of partitions (and thus tasks/parallelism):

current-subtopology-parallelism  

expected-subtopology-parallelism  (only reported when feature flag is turned on)

  • type = stream-subtopology-metrics
  • subtopology-id=[subtopologyId] (also called groupId sometimes)

The other metric will track failures where Streams has given up on retrying at the client level. This can be monitored to send an alert when Streams is struggling to successfully add partitions, for example due to a quota violation, and indicates an issue that should be investigated further. 

num-autoscaling-failures  (only reported when the feature flag is turned on)

  • type = stream-client-metrics
  • client-id=[clientId]
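
One way these three metrics could be combined into a single health signal is sketched below. It assumes the metric values have already been read into plain maps keyed by subtopology id; how they are fetched (JMX, a metrics reporter, and so on) is left out. The class AutoscalingStatusCheck, the Status enum, and the rule that current below expected means an autoscaling event is still in progress are illustrative assumptions, not part of the proposal.

```java
import java.util.Map;

final class AutoscalingStatusCheck {
    enum Status { COMPLETE, IN_PROGRESS, FAILED }

    // current:     current-subtopology-parallelism per subtopology id
    // expected:    expected-subtopology-parallelism per subtopology id
    // numFailures: num-autoscaling-failures for the client
    static Status evaluate(Map<String, Integer> current,
                           Map<String, Integer> expected,
                           long numFailures) {
        // Any failure means Streams gave up retrying and should raise an alert.
        if (numFailures > 0) {
            return Status.FAILED;
        }
        // Assumption: a subtopology below its target is still being scaled.
        for (Map.Entry<String, Integer> target : expected.entrySet()) {
            Integer now = current.get(target.getKey());
            if (now == null || now < target.getValue()) {
                return Status.IN_PROGRESS;
            }
        }
        return Status.COMPLETE;
    }

    public static void main(String[] args) {
        Map<String, Integer> current = Map.of("0", 4, "1", 4);
        Map<String, Integer> expected = Map.of("0", 8, "1", 4);
        // Subtopology "0" has 4 of its 8 target partitions.
        System.out.println(evaluate(current, expected, 0L));
    }
}
```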

...

Of course, it's always possible for something to go awry and cause the topic scaling to fail: request timeouts, internal errors, and quota violations should all be anticipated. To handle such failures, Streams will automatically retry until the operation is successful or it reaches the maximum allowed retry time. This maximum will be configurable for users via the new partition.autoscaling.timeout.ms config, and the timer will start counting after the first failure (rather than when the autoscaling attempt began). The timeout will be reset after any success, so in the case of interleaved successes and failures we would still only time out if there is no progress at all over the full timeout that's configured.
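
The timeout semantics described above (the clock starts at the first failure and resets on any success) can be sketched as a small state holder. This is an illustration rather than the Streams implementation: the class AutoscalingRetryTimer and its method names are hypothetical, timestamps are passed in explicitly to keep the example deterministic, and treating an elapsed time exactly equal to the timeout as expired is an assumption.

```java
final class AutoscalingRetryTimer {
    private final long timeoutMs;
    // Timestamp of the first failure since the last success; -1 means none.
    private long firstFailureMs = -1L;

    AutoscalingRetryTimer(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    // Records a failed attempt. Returns true if another retry is allowed,
    // false once the full timeout has passed without any success.
    boolean recordFailure(long nowMs) {
        if (firstFailureMs < 0) {
            firstFailureMs = nowMs; // the clock starts at the first failure
        }
        return nowMs - firstFailureMs < timeoutMs;
    }

    // Any success counts as progress and resets the clock.
    void recordSuccess() {
        firstFailureMs = -1L;
    }

    public static void main(String[] args) {
        AutoscalingRetryTimer timer = new AutoscalingRetryTimer(900_000L); // 15 minutes
        System.out.println(timer.recordFailure(0L));         // true: first failure starts the clock
        System.out.println(timer.recordFailure(600_000L));   // true: 10 minutes elapsed
        timer.recordSuccess();                               // progress resets the clock
        System.out.println(timer.recordFailure(1_000_000L)); // true: clock restarts here
        System.out.println(timer.recordFailure(1_900_000L)); // false: 15 minutes with no success
    }
}
```

Because the clock only runs between a failure and the next success, a long autoscaling event that keeps making partial progress never times out under this sketch, which matches the behavior the paragraph describes.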

If the Admin#createPartitions call does fail, Streams will schedule a follow-up rebalance to re-attempt the autoscaling. This is because the API is not atomic and may succeed for some topics but not others, so we want to refresh the cluster metadata and refine the set of topics to expand when retrying. If the timeout has been exceeded, Streams will log a warning instead of trying to create partitions and will continue on. Note that if the request succeeds, we will need to trigger a final rebalance after all internal topics have been expanded in order to assign those partitions from the updated cluster metadata. The final follow-up will be scheduled for 10 seconds after the successful request, as the javadocs for Admin#createPartitions call out that

...