Status

Current state: Under Discussion

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The fundamental unit of work in Kafka Streams is the StreamTask, with a 1:1 mapping from task to partition (more precisely, to partition number: in the case of e.g. joins, one task may process two same-numbered partitions from two different input topics). Because of this, the number of partitions of the user input topics represents an upper limit on the parallelism and ability to scale out. Currently this is effectively a permanent upper limit, as Streams assumes the partition count is fixed and will shut itself down if any changes are detected, since the internal topics will no longer match the partitioning of the user topics. The only way to increase the parallelism today is to stop Streams and manually repartition not just the external user topics but also any internal topics (such as repartition and changelog topics). Besides being massively inconvenient and requiring a long pause in processing, this is error-prone: the internal topics are meant to be managed by Streams itself and may not all be known to the user.

...

One of the main challenges to supporting repartitioning within Streams is the fact that partitioning generally (and by default) depends on the current number of partitions. Thus, any change to the partition count will result in keys being routed to a new/different partition number, effectively losing all the state built up for those keys so far. To ensure correctness of results in the face of a changing partition count, there are two options: (a) manually reprocess all data in the application's history and migrate it to new topics, or (b) ensure that keys continue to be routed to the same partition number, regardless of changes to the total partition count. To keep things lightweight, the autoscaling feature introduced by this KIP will not attempt to repartition historical data or state; that can be tackled in a future KIP if there is demand. Instead, this KIP intends to solve the partition scaling problem for the subset of applications that can tolerate partitions being added, such as those which are stateless or statically partitioned.
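
To make option (b) concrete, below is a minimal sketch of what a statically-partitioned application might plug in: a custom StreamPartitioner that hashes into the topic's original partition count rather than the current one, so existing keys keep their partition as the topic grows. The class name and fixed-count constructor are purely illustrative and not part of this KIP; the sketch uses the existing single-argument StreamPartitioner#partition method.

    import org.apache.kafka.streams.processor.StreamPartitioner;

    // Illustrative sketch: route every key by the topic's ORIGINAL partition
    // count, so that adding partitions never changes where an existing key goes.
    // New partitions only receive data from producers that target them directly.
    public class StaticPartitioner<V> implements StreamPartitioner<String, V> {
        private final int originalPartitionCount; // fixed when the app was first deployed

        public StaticPartitioner(final int originalPartitionCount) {
            this.originalPartitionCount = originalPartitionCount;
        }

        @Override
        public Integer partition(final String topic, final String key, final V value, final int numPartitions) {
            // Deliberately ignore numPartitions: routing stays stable as the topic expands.
            return (key.hashCode() & 0x7fffffff) % originalPartitionCount;
        }
    }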

Public Interfaces

The only public-facing changes in this KIP will be the introduction of new metrics and three new configs related to autoscaling. The first is a simple feature flag that users can turn on (or off) to control whether Streams will automatically react to upstream partition expansions by expanding the internal topics to match. When this is not enabled, if input topics have their partition count changed in a way that is incompatible with the current application state (as discussed above), Streams will log the details and send the INCOMPLETE_SOURCE_TOPIC_METADATA error code to all clients, as is the case today.
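
For illustration only, enabling the feature might look like the sketch below. The exact flag name is an assumption here, inferred from the partition.autoscaling prefix of the timeout config discussed later in this document, and the timeout value shown is likewise just an example.

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-autoscaling-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // Assumed flag name, based on this KIP's partition.autoscaling.* naming:
    props.put("partition.autoscaling.enabled", true);
    // Named later in this KIP: how long Streams keeps retrying internal topic expansion.
    props.put("partition.autoscaling.timeout.ms", 15 * 60 * 1000L);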

...

The final change of note is the pair of new metrics. The num-autoscaling-failures metric is self-explanatory, incrementing each time the partition.autoscaling.timeout.ms is exceeded and Streams has to stop retrying. The subtopology-parallelism metric will reflect the current number of partitions/tasks, and thus the degree of parallelism, for each subtopology. Note: users should roll this up and take the maximum value across all clients, since only the group leader will process any expansions that occur and the new partitions will not necessarily be noticed by all clients/threads. The point of this metric is to provide visibility into the status of autoscaling and let users know when Streams has successfully finished expanding all internal topics, since they should wait for this before sending any records to the new input topic partitions.
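
As a sketch of the suggested roll-up, each client could report its local maximum of this metric and the monitoring system would then take the max across clients. The metric is looked up purely by name below; its group and tags are not specified in this section, so treat those details as assumptions.

    import java.util.Map;
    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.streams.KafkaStreams;

    // Report this client's max subtopology-parallelism; the cross-client max
    // (reflecting the actual partition count after an expansion) is computed
    // downstream in the metrics backend.
    static long maxSubtopologyParallelism(final KafkaStreams streams) {
        long max = 0;
        for (final Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
            if ("subtopology-parallelism".equals(entry.getKey().name())) {
                max = Math.max(max, ((Number) entry.getValue().metricValue()).longValue());
            }
        }
        return max;
    }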

Compatibility, Deprecation, and Migration Plan

Since this is a new feature there are no compatibility concerns or need for a migration plan. Note that the feature flag can be enabled and disabled again at will, without impacting operations.

Test Plan

We plan to add a new system test to the Streams suite for this feature. This will be fairly straightforward and cover starting up a Streams app, changing the number of partitions for the external topics (following the procedure laid out above), validating that the internal topics have expanded to match the new partition count, and finally verifying that the application has been producing correct/expected results.

The application in question should cover multiple sub-topologies and include any operators that have special dependencies and/or effects related to partitioning, such as a Repartition, a Join, and a ForeignKeyJoin. We might also want to cover applications built via the PAPI.
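
For reference, a rough sketch of the kind of DSL test topology this implies is shown below, touching a repartition, a windowed stream-stream join, and a foreign-key table-table join. All topic names are illustrative and default serdes are assumed.

    import java.time.Duration;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.*;

    final StreamsBuilder builder = new StreamsBuilder();

    // Sub-topology 1: changing the key forces a repartition topic.
    final KStream<String, String> input = builder.stream("input");
    final KStream<String, String> rekeyed = input
            .selectKey((k, v) -> v)
            .repartition(Repartitioned.as("rekeyed"));

    // Sub-topology 2: a windowed stream-stream join (creates internal state/changelog topics).
    final KStream<String, String> other = builder.stream("other-input");
    rekeyed.join(
            other,
            (left, right) -> left + "," + right,
            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)))
        .to("join-output");

    // Sub-topology 3: a foreign-key table-table join (creates subscription/response topics).
    final KTable<String, String> leftTable = builder.table("left-table");
    final KTable<String, String> rightTable = builder.table("right-table");
    leftTable.join(rightTable, value -> value /* FK extractor */, (l, r) -> l + "|" + r)
        .toStream()
        .to("fk-join-output");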

Rejected Alternatives

  1. As mentioned briefly in the 'Motivation' section, an alternative to enforcing the static partitioning condition would be to implement some kind of manual data migration via historical reprocessing. While this would enable the feature to be used for all applications, it would have extreme downsides and make the feature itself far less useful by requiring long downtimes, as well as being incredibly resource-heavy. Thus we feel it's more practical to introduce the best version of the feature by imposing a constraint: static partitioning.
  2. To prevent users from shooting themselves in the foot, the initial proposal for this KIP chose not to use a feature flag, and instead exposed the feature via a special static partitioner interface to discourage turning it on in applications that aren't compatible with autoscaling. However, since this is an advanced feature anyway, we decided to trust advanced users to know what they are doing and allow full control over this feature.