Status

Current state: Accepted

Discussion thread: here 

JIRA: KAFKA-14318

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The fundamental unit of work in Kafka Streams is the StreamTask, with a 1:1 mapping from task to partition (technically to partition number, since in the case of e.g. joins one task may process two partitions from two different input topics). Because of this, the number of partitions of the user input topics represents an upper limit on the parallelism and ability to scale out. Currently this is effectively a permanent upper limit, as Streams assumes the partition count is fixed and will shut itself down if any changes are detected, since the internal topics will no longer match the partitioning of the user topics. The only way to increase the parallelism in today's world is by stopping Streams and manually repartitioning not just the external user topics but also any internal topics (such as repartitions and changelogs). Besides being massively inconvenient and requiring a long pause in processing, the internal topics are generally managed by Streams itself and may not all be known to the user.

...

One of the main challenges to supporting repartitioning within Streams is the fact that partitioning generally (and by default) depends on the current number of partitions. Thus, any change to the partition count will result in keys being routed to a new/different partition number, effectively losing all the state built up for them so far. To ensure correctness of results in the face of a changing partition count, there are two options: (a) manually reprocess all data in the application's history and migrate it to new topics, or (b) ensure that keys continue to be routed to the same partition number, regardless of changes to the total partition count. To keep things lightweight, the autoscaling feature introduced by this KIP will not attempt to repartition historical data or state, which can be tackled in a future KIP if there is demand. Instead, this KIP intends to solve the partition scaling problem for the subset of applications that are able to tolerate partitions being added, such as those which are stateless or statically partitioned.
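
For illustration only (this class is not part of the KIP), a statically partitioned application might derive the partition from a stable attribute embedded in the key, rather than from hash(key) % numPartitions, so that previously seen keys never move when partitions are added:

Code Block
languagejava
titleStaticPartitioningExample.java
// Hypothetical sketch of option (b): the partition is a fixed function of the key alone
// (here, a numeric tenant id embedded in keys of the form "<tenantId>-<recordId>"), so
// growing numPartitions never re-routes existing keys; only new, higher tenant ids can
// land on the newly added partitions.
public class StaticPartitioningExample {

    public static int partitionFor(final String key, final int numPartitions) {
        final int tenantId = Integer.parseInt(key.split("-")[0]);
        if (tenantId >= numPartitions) {
            // the topic has not (yet) been expanded far enough for this tenant
            throw new IllegalStateException("Not enough partitions for tenant " + tenantId);
        }
        return tenantId; // deterministic per key and independent of the total partition count
    }
}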

Public Interfaces

The only public facing changes in this KIP will be the introduction of new metrics and configs related to autoscaling. The first config is a simple feature flag that users can turn on (or off) to control whether Streams will automatically react to partition expansions upstream by expanding the internal topics to match. When this is not enabled, if input topics have their partition count changed in a way that is incompatible with the current application state (as discussed above), Streams will log the details and send the INCOMPLETE_SOURCE_TOPIC_METADATA error code to all clients, as is the case today.

In addition, there will be a new timeout config added for users to bound the maximum time that Streams will spend attempting to retry partition autoscaling, in case of failures or other edge cases. This is discussed in more detail below.

Code Block
languagejava
titleStreamsConfig.java
// default to false, priority LOW
public static final String PARTITION_AUTOSCALING_ENABLED_CONFIG = "partition.autoscaling.enabled";
private static final String PARTITION_AUTOSCALING_ENABLED_DOC = "Enable autoscaling of the partitions of internal topics which are managed by Streams. If an internal topic's partition count depends on an upstream input topic (or topics), then expanding the number of partitions on the input topic(s) will result in the internal topic(s) automatically being expanded to match.";

// default to 15 minutes, priority LOW
public static final String PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG = "partition.autoscaling.timeout.ms";
private static final String PARTITION_AUTOSCALING_TIMEOUT_MS_DOC = "The maximum amount of time in milliseconds that Streams will attempt to retry autoscaling of internal topic partitions.";
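
As a usage sketch, enabling the feature might look like the following, using the literal config names above rather than the proposed StreamsConfig constants; the application id and bootstrap address are made up for illustration:

Code Block
languagejava
titleAutoscalingConfigExample.java
import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class AutoscalingConfigExample {

    public static Properties streamsProps() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-autoscaling-app");   // hypothetical app id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // hypothetical brokers
        // new configs proposed by this KIP:
        props.put("partition.autoscaling.enabled", true);
        props.put("partition.autoscaling.timeout.ms", Duration.ofMinutes(15).toMillis());
        return props;
    }
}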

We will also provide three new INFO metrics as a means of monitoring the status and completion of autoscaling events. The first two will be subtopology-level counters for the current and expected/target number of partitions (and thus tasks/parallelism):

current-subtopology-parallelism  

expected-subtopology-parallelism  (only reported when the feature flag is turned on)

  • type = stream-subtopology-metrics
  • subtopology-id=[subtopologyId] (also called groupId sometimes)

The other metric will track failures where Streams has given up on retrying at the client level. This can be monitored to send an alert when Streams is struggling to successfully add partitions, for example due to a quota violation, and indicates an issue that should be investigated further. 

num-autoscaling-failures (only reported when the feature flag is turned on)

  • type = stream-client-metrics
  • client-id=[clientId]

Proposed Changes

As discussed above, the autoscaling feature can be enabled for any kind of application and allows Streams to handle partition expansion of its input topics. When upstream topics change in partition count, this can/will change the expected number of partitions for downstream repartition topics and changelogs. Normally these internal topics are created during the first rebalance after startup: during (any) rebalance, Streams processes the topology to resolve the intended number of partitions for all internal topics and creates those topics if necessary. If the topics already exist, their partition count is validated instead. With this KIP, when discrepancies are found during the validation step, the handling logic will branch on the feature flag: when it is enabled, Streams will attempt to "fix" any such discrepancies by requesting that the internal topics be expanded to match the intended number of partitions. Note that we will still trigger an error if internal topics are found to be over-partitioned: since the partition count of a topic can only ever be increased and never decreased, finding internal topics with more partitions than expected indicates a user error or misconfiguration and is not something Streams can fix.
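
The branching described above boils down to a simple decision per internal topic. The following is a rough sketch of that logic only; the names are hypothetical and do not correspond to actual Streams internals:

Code Block
languagejava
titleValidationSketch.java
// Illustrative decision logic for a single internal topic during the validation step.
public class ValidationSketch {

    enum Action { NOTHING, EXPAND, RAISE_ERROR }

    static Action validate(final int actualPartitions,
                           final int expectedPartitions,
                           final boolean autoscalingEnabled) {
        if (actualPartitions == expectedPartitions) {
            return Action.NOTHING;      // already matches the intended partition count
        } else if (actualPartitions > expectedPartitions) {
            return Action.RAISE_ERROR;  // over-partitioned: partitions cannot be removed, so this is a user error
        } else if (autoscalingEnabled) {
            return Action.EXPAND;       // under-partitioned and the feature flag is on: request an expansion
        } else {
            return Action.RAISE_ERROR;  // under-partitioned and the feature flag is off: fail as today
        }
    }
}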

We can rely on the fact that a change in the partitions of any topic the StreamThread is subscribed to is guaranteed to trigger a rebalance. This will occur when the metadata refreshes, so we should begin autoscaling within metadata.max.age.ms of a partition change. For source nodes with more than one source topic, multiple rebalances may be triggered if those topics are expanded out of sync with enough time between them; in that case, Streams will begin autoscaling the internal topics as soon as the first source topic is expanded.

Of course, it's always possible for something to go awry and cause the topic scaling to fail: request timeouts, internal errors, and quota violations should all be anticipated. To handle such failures, Streams will automatically retry until the operation is successful or it reaches the maximum allowed retry time. This will be configurable for users via the new partition.autoscaling.timeout.ms config, which will start counting after the first failure (rather than when the autoscaling attempt began). The timeout will be reset after any success, so in the case of interleaving success and failure we would still only time out if there is no progress at all over the full timeout that's configured.
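
To make the timeout semantics concrete, here is a minimal sketch of the bookkeeping described above (hypothetical names, not actual Streams internals): the clock starts at the first failure, any success resets it, and Streams only gives up once a full timeout passes with no progress.

Code Block
languagejava
titleAutoscalingRetryTracker.java
// Illustrative bookkeeping for partition.autoscaling.timeout.ms as described in this KIP.
public class AutoscalingRetryTracker {

    private final long timeoutMs;        // value of partition.autoscaling.timeout.ms
    private long firstFailureMs = -1L;   // -1 means no outstanding failure

    public AutoscalingRetryTracker(final long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    public void onSuccess() {
        firstFailureMs = -1L;            // any success resets the clock
    }

    public void onFailure(final long nowMs) {
        if (firstFailureMs == -1L) {
            firstFailureMs = nowMs;      // start counting at the first failure, not the first attempt
        }
    }

    public boolean shouldGiveUp(final long nowMs) {
        return firstFailureMs != -1L && nowMs - firstFailureMs > timeoutMs;
    }
}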

If the Admin#createPartitions call does fail, Streams will schedule a followup rebalance to re-attempt the autoscaling. This is because the API is not atomic and may succeed for some topics but not others, so we want to refresh the cluster metadata and refine the set of topics to expand when retrying. If the timeout has been exceeded, a warning will be logged instead of trying to create partitions and Streams will continue on. Note that if successful, we will need to trigger a final rebalance after all internal topics have been expanded in order to assign those partitions from the updated cluster metadata. The final followup will be scheduled for 10 seconds after the successful request, as the javadocs for Admin#createPartitions call out that

It may take several seconds after this method returns success for all the brokers to become aware that the partitions have been created.
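
For reference, the non-atomic nature of the expansion can be seen in how the Admin API reports results per topic. The sketch below uses the existing Admin#createPartitions and NewPartitions#increaseTo APIs; the topic names, target partition count, and bootstrap address are made up for illustration:

Code Block
languagejava
titleCreatePartitionsSketch.java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitions;

public class CreatePartitionsSketch {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical brokers

        try (final Admin admin = Admin.create(props)) {
            // request that two (made-up) internal topics be expanded to 8 partitions each
            final Map<String, NewPartitions> request = new HashMap<>();
            request.put("my-app-repartition", NewPartitions.increaseTo(8));
            request.put("my-app-store-changelog", NewPartitions.increaseTo(8));

            // the call is not atomic: inspect each topic's future individually, since some
            // expansions may succeed while others fail and have to be retried after a
            // followup rebalance refreshes the cluster metadata
            admin.createPartitions(request).values().forEach((topic, future) -> {
                try {
                    future.get();
                    System.out.println("expanded " + topic);
                } catch (final Exception e) {
                    System.out.println("failed to expand " + topic + ": " + e);
                }
            });
        }
    }
}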

The final change of note is the new metrics. The num-autoscaling-failures metric is self-explanatory, incrementing each time the partition.autoscaling.timeout.ms is exceeded and Streams has to stop retrying. The subtopology-parallelism metrics will reflect the current and expected number of partitions/tasks, and thus the degree of parallelism, for each subtopology. Note: users should roll these up and take the maximum value across all clients, since only the group leader will process any expansions that occur and the new partitions will not necessarily be noticed by all clients/threads. The point of these metrics is to provide visibility into the status of autoscaling and let users know when Streams has finished successfully expanding all internal topics, since they should wait for this before sending any records to the new input topic partitions.
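
As an example of how one might consume these metrics, the sketch below reads them from a running KafkaStreams instance via the existing KafkaStreams#metrics() API, using the metric and group names proposed above; the cross-client rollup (taking the maximum) would typically happen in an external monitoring system instead.

Code Block
languagejava
titleAutoscalingMetricsSketch.java
import java.util.Map;

import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;

public class AutoscalingMetricsSketch {

    // print the per-subtopology parallelism metrics proposed in this KIP
    public static void printParallelism(final KafkaStreams streams) {
        final Map<MetricName, ? extends Metric> metrics = streams.metrics();
        for (final Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
            final MetricName name = entry.getKey();
            if ("stream-subtopology-metrics".equals(name.group())
                    && (name.name().equals("current-subtopology-parallelism")
                        || name.name().equals("expected-subtopology-parallelism"))) {
                System.out.println(name.tags().get("subtopology-id") + " "
                        + name.name() + " = " + entry.getValue().metricValue());
            }
        }
    }
}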

Compatibility, Deprecation, and Migration Plan

Since this is a new feature there are no compatibility concerns or need for a migration plan. Note that the feature flag can be enabled and disabled again at will, without impact to operations.

Test Plan

We plan to add a new system test to the Streams suite for this feature. This will be fairly straightforward and cover starting up a Streams app, changing the number of partitions of the external topics, validating that the internal topics have expanded to match the new partition count, and finally verifying that the application has been producing correct/expected results.

The application in question should cover multiple sub-topologies and include any operators that have special dependencies and/or effects related to partitioning, such as: a Repartition, a Join, and a ForeignKeyJoin. We might also want to cover applications built via the PAPI. 
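
For reference, a minimal topology of the kind such a test might use could look like the following; the topic names and serdes are illustrative and not part of the KIP (a foreign-key join and a PAPI variant would be added on top of this):

Code Block
languagejava
titleAutoscalingTestTopology.java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Repartitioned;

public class AutoscalingTestTopology {

    public static Topology build() {
        final StreamsBuilder builder = new StreamsBuilder();

        final KStream<String, String> input =
            builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()));
        final KTable<String, String> table =
            builder.table("table-input", Consumed.with(Serdes.String(), Serdes.String()));

        input.selectKey((key, value) -> value)                                    // re-keying forces a repartition
             .repartition(Repartitioned.with(Serdes.String(), Serdes.String()))   // explicit internal repartition topic
             .join(table, (streamValue, tableValue) -> streamValue + tableValue)  // stream-table join
             .to("output", Produced.with(Serdes.String(), Serdes.String()));

        return builder.build();
    }
}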

Rejected Alternatives

  1. As mentioned briefly in the 'Motivation' section, an alternative to enforcing the static partitioning condition would be to implement some kind of manual data migration via historical reprocessing. While this would enable the feature to be used for all applications, it would have extreme downsides and make the feature itself far less useful by requiring long downtimes, as well as being incredibly resource heavy. Thus we feel it's more practical to introduce the best version of the feature by imposing a constraint: static partitioning.
  2. In the 'Proposed Changes' section, we described at a high level how Streams will react to and handle partition expansion events. One of the trickiest aspects of this feature will be getting the synchronization right: in other words, what does Streams do when it detects changes to the partition count of some topics but the expansion has not yet percolated throughout the topology?
    1. One option is to end the rebalance immediately and continue processing until the last (input) topic change is detected. However since we can only detect changes to the partition count of input topics, this wouldn't provide a sure-fire method of ensuring that the entire topology has caught up to the new number of partitions. Furthermore, due to cooperative rebalancing Kafka Streams actually does continue to process during a rebalance, so all StreamThreads except for the leader – who will be waiting for the partition expansion to complete inside the StreamPartitionAssignor's callback – will be able to remain processing existing tasks during this time. So ending the rebalance and triggering a new one will, we believe, be more of a hassle than any benefit it might provide.
    2. Another option is to wait for some constant or configured amount of time and then shut down the entire application, rather than retrying for a while and then ending the rebalance.
    3. We could introduce another new config for the wait/retry time within a rebalance; however, we feel that the existing api timeout config is sufficient, given that it is used elsewhere in the rebalance/assignment logic and that exceeding this timeout will not result in a full shutdown but rather a new rebalance attempt. It is therefore of less importance to ensure the partition expansion completes within this timeout.
  3. To prevent users from shooting themselves in the foot, the initial proposal for this KIP did not use a feature flag, and instead exposed the feature via a special static partitioner interface in order to discourage turning it on in applications that aren't compatible with autoscaling. However, since this is an advanced feature anyway, we decided to trust advanced users to know what they are doing and allow full control over this feature.
  4. Considered including a new config for a default partitioner, but this turned out to be non-trivial and was extracted for a followup PR.
  5. Within the initial static-partitioner-based proposal we had also debated whether to expose the feature explicitly via an 'enable/disable' feature flag rather than implicitly via the static.partitioner.class config. At the time we felt it was best to tie the feature's operation directly to the static partitioning constraint, and a config for the default partitioner would also have made it considerably easier for users to implement static partitioning, since the alternative would be to pass the partitioner in all over the topology and hope not to miss anywhere it's needed (which can be difficult to figure out).
  6. We also considered whether to enforce that only the default partitioner passed in via config be used, or to allow it to be overridden – though only by other static partitioners – for specific operators. Although there's no technical reason to require the same partitioner be used across the topology, we believe inconsistent partitioning is unlikely to be common and that a single partitioner is easier for users to reason about. Of course, a partitioner implementation may itself incorporate multiple different strategies for different topics and/or parts of the topology, so it is still possible to achieve the same outcome if truly desired.