
Status

Current state: Under Discussion

Discussion thread: here 

JIRA: KAFKA-14318

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The fundamental unit of work in Kafka Streams is the StreamTask, with a 1:1 mapping from task to partition (technically to partition number, since in the case of e.g. joins one task may process two partitions from two different input topics). Because of this, the number of partitions of the user input topics represents an upper limit on the parallelism and the ability to scale out. Currently this is effectively a permanent upper limit, as Streams assumes the partition count is fixed and will shut itself down if any changes are detected, since the internal topics will no longer match the partitioning of the user topics. The only way to increase the parallelism today is to stop Streams and manually repartition not just the external user topics but also any internal topics (such as repartitions and changelogs). Besides being massively inconvenient and requiring a long pause in processing, this is error-prone: the internal topics are generally meant to be managed by Streams itself and may not all be known to the user.

Therefore, we intend to introduce automatic and live scaling out by expanding the partition count to match any increase in the partitions on the external user topics. Users can simply repartition the topics they manage, and Streams will detect this and scale itself to match without any downtime. 

One of the main challenges to supporting repartitioning within Streams is the fact that partitioning generally (and by default) depends on the current number of partitions. Thus, any change to the partition count will result in keys being routed to a new/different partition number, effectively losing all the state that had been built up for them so far. To ensure correctness of results in the face of a changing partition count, there are two options: (a) manually reprocess all data in the application's history and migrate it to new topics, or (b) ensure that keys continue to be routed to the same partition number, regardless of changes to the total partition count. In this KIP we choose to go with option (b), and will support live autoscaling in Kafka Streams for applications with static partitioning.

Public Interfaces

Since enabling the new autoscaling feature will require static partitioning throughout the application to ensure correctness, we propose to expose this feature through a static partitioner. Specifically, a new high-level StreamsConfig will be introduced that accepts a class or class name, similar to existing class-based configs such as rocksdb.config.setter:

StreamsConfig.java
public static final String STATIC_PARTITIONER_CLASS_CONFIG = "static.partitioner.class";
private static final String STATIC_PARTITIONER_CLASS_DOC = "Enable autoscaling of internal topic partitions by passing in a static partitioner class or class name that implements the <code>org.apache.kafka.streams.processor.StaticStreamPartitioner</code> interface. This partitioner will be enforced as the default partitioner throughout the application, and cannot be overridden by other <code>StreamPartitioner</code> implementations for specific operators.";
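
For example, a user might enable the feature as follows (a minimal sketch; MyStaticPartitioner is a hypothetical user-supplied class implementing the new interface below, and the topic names are placeholders):

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class AutoscalingAppExample {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-autoscaling-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Passing a static partitioner class enables autoscaling and makes this
        // partitioner the default throughout the topology.
        // MyStaticPartitioner is a hypothetical implementation of StaticStreamPartitioner.
        props.put(StreamsConfig.STATIC_PARTITIONER_CLASS_CONFIG, MyStaticPartitioner.class.getName());

        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic");  // trivial placeholder topology

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}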

Autoscaling will be automatically enabled for any application that passes in a valid class which implements the new StaticStreamPartitioner interface:

org.apache.kafka.streams.processor.StaticStreamPartitioner.java
/**
 * A special implementation of the {@link StreamPartitioner} interface which enforces that keys are "statically
 * partitioned" ie sent to the same partition regardless of changes to the underlying topic partition count.
 * Implementing this and passing it in to the {@code static.partitioner.class} config will enable automatic
 * partition scaling for the Streams application.
 * 
 * Automatic partition scaling for Streams applications with static partitioning means that Streams will detect 
 * any changes (increases) to the external user topics' partition counts, and automatically adjust the partitions
 * of any internal topics managed by Streams. 
 * 
 * Besides the static partitioning constraint, to best utilize this feature we recommend that users perform any
 * partition expansions starting with the most downstream output topics, and working upstream from there to the 
 * input topics. This will help Streams to stabilize with the least amount of disruption and/or downtime.
 * 
 * It is also recommended to avoid sending any records with keys that are to go into the new partitions until after
 * Streams has fully processed the expansion. The {@link #onPartitionExpansion(int, int)} method can be implemented
 * as a callback to be notified when Streams has finished expanding any internal topics.
 */
public interface StaticStreamPartitioner<K, V> extends StreamPartitioner<K, V> {

    /**
     * Determine the partition number for a record with the given key and the current number of partitions.
     * The partition number must be static and deterministic by key, meaning that any and all future
     * records with the same key will always be placed into the same partition, regardless of a change
     * in the number of partitions. In other words, should the number of partitions increase, only new keys
     * may be routed to the new partitions; all keys that have been seen before should end up in their original
     * partitions. It is up to the user to enforce this.
     *
     * If the partitioning is not static, autoscaling of internal topics may cause incorrect results if keys
     * end up getting routed to different partitions when the {@code numPartitions} changes.
     *
     * @param topic the topic name this record is sent to
     * @param key the key of the record
     * @param value the value of the record
     * @param numPartitions the total number of partitions
     * @return an integer between 0 and {@code numPartitions - 1}
     *
     * @throws TaskMigratedException if the partitioner wants to send the given key to a partition that
     *                               doesn't exist yet, i.e. one above the current upper limit of
     *                               {@code numPartitions - 1}. This should only happen when the user failed
     *                               to wait for Streams to scale out/restabilize after a partition expansion;
     *                               throwing this exception tells Streams to trigger a rebalance and recheck
     *                               the number of partitions throughout the topology.
     */
    int staticPartition(String topic, K key, V value, int numPartitions);

    /**
     * An optional callback for the user to take some action when Streams has detected and finished reacting to
     * a change in the number of partitions. This callback will be invoked once the application has re-stabilized
     * to a consistent number of partitions across the topology.
     *
     * Note: this will only be triggered once per rebalance/partition expansion, and only on the member that
     * is the group leader of that rebalance. In other words, expect only a single StreamThread in a single
     * application instance to invoke this callback.
     *
     * @param oldNumPartitions the partition count before the expansion
     * @param newNumPartitions the partition count after the expansion
     */
    default void onPartitionExpansion(int oldNumPartitions, int newNumPartitions) { }

    @Override
    default Integer partition(String topic, K key, V value, int numPartitions) {
        return staticPartition(topic, key, value, numPartitions);
    }

}

Note that we will not provide a default implementation of the new StaticStreamPartitioner interface for users to plug in, again to ensure they have consciously thought through the partitioning of their specific use case and are aware of how the autoscaling feature may impact their application if used in an inappropriate setting. Of course, there is (unfortunately) nothing we can do to prevent users from consciously abusing this, or from innocently plugging in a poorly implemented static partitioner. There is only so much handholding we can do in practice, and most if not all features give users some way to shoot themselves in the foot if they really try, so we won't do any explicit verification of the custom partitioners. Instead, the focus will be on clear and detailed documentation for all new APIs, so that users have the best chance of utilizing the feature correctly.
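
That said, purely for illustration, a hedged sketch of what a user-supplied static partitioner might look like is shown below. The key format, class name, and partitioning scheme are all hypothetical (the sketch assumes keys are strings of the form "<partition>-<id>", so the key itself fixes the partition independently of the current partition count); this is not something the KIP itself would ship:

import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.StaticStreamPartitioner;

/**
 * Illustrative sketch only: whoever creates the keys decides up front which partition each
 * key belongs to, so the key-to-partition mapping never depends on the partition count.
 */
public class KeyPrefixStaticPartitioner implements StaticStreamPartitioner<String, Object> {

    @Override
    public int staticPartition(final String topic, final String key, final Object value, final int numPartitions) {
        final int partition = Integer.parseInt(key.substring(0, key.indexOf('-')));
        if (partition >= numPartitions) {
            // The key targets a partition Streams has not caught up to yet; per the interface
            // contract this tells Streams to rebalance and recheck the partition counts.
            throw new TaskMigratedException("Partition " + partition + " does not exist yet for topic "
                + topic + " (current count: " + numPartitions + ")");
        }
        return partition;
    }

    @Override
    public void onPartitionExpansion(final int oldNumPartitions, final int newNumPartitions) {
        // Optional hook: e.g. signal that records keyed for the new partitions may now be sent.
    }
}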

We will also provide a new callback that the user can implement to take some action, and/or use as a hook to be notified when Streams has finished processing a partition expansion event.

Proposed Changes

As discussed above, the autoscaling feature can (and will) be enabled by passing in a partitioner class implementing the StaticStreamPartitioner interface. This will become the default partitioner used at each operator, and cannot be overridden – if a user attempts to pass in a custom partitioner anywhere else in the topology, we will log a warning and ignore it. 

The advised strategy for users to proceed with a partition expansion is described in the javadocs for the interface and summarized as follows: begin with the most downstream output topic and work your way upstream to the input topics. A hook via the StaticStreamPartitioner's #onPartitionExpansion callback can be used to detect when Streams has finished adjusting to a partition expansion event.
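
As a hedged sketch of this ordering (the topic names and target partition count below are hypothetical, and the AdminClient calls are just one possible way to trigger the expansion), the procedure might look something like this:

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitions;

public class DownstreamFirstExpansion {

    public static void main(final String[] args) throws Exception {
        final Properties adminProps = new Properties();
        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(adminProps)) {
            // 1. Expand the most downstream (output) topic first ...
            admin.createPartitions(Map.of("output-topic", NewPartitions.increaseTo(12))).all().get();

            // 2. ... then work upstream, finishing with the external input topics last.
            // Ideally, wait for the onPartitionExpansion callback (or otherwise confirm that
            // Streams has stabilized) before sending records keyed for the new partitions.
            admin.createPartitions(Map.of("input-topic", NewPartitions.increaseTo(12))).all().get();
        }
    }
}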

Of course, there may be errors, stalls, or other problems that impact the ability to scale out "in order" as outlined above. We should anticipate that for any given rebalance, there may be inconsistencies in the partitioning across the topology. Without the autoscaling feature, this would result in a full application shutdown. With the feature enabled, however, we will instead wait and retry according to the same timeout config used elsewhere, ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG. If the application topology still remains unevenly partitioned after this timeout has elapsed, a followup rebalance will be scheduled according to the Streams config probing.rebalance.interval.ms. No repartitioning of internal topics will be performed until all external user topics have finished being expanded and have stabilized on the same partition count (excluding any discrepancies due to intentional differences via an explicit Repartition operation). The onPartitionExpansion callback will similarly be invoked only once the entire topology has stabilized on a new partition count.
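
For reference, a minimal sketch of how a user might tune these two existing configs (continuing the props example above; the values are arbitrary, and we assume the consumer-prefixed form is how the API timeout reaches the assignor):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

// How long the assignor waits for the topology to stabilize on a consistent partition
// count within a single rebalance, and how soon a followup rebalance is scheduled if it
// does not stabilize in time.
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG), 60_000);
props.put(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, 60_000L);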

Compatibility, Deprecation, and Migration Plan

Since this is a new feature, there are no compatibility concerns and no need for a migration plan. However, it should be noted that the partitioning strategy of an existing application should not be changed (its existing state was built up under the old partitioning), so this feature will generally be limited to new applications only.

Test Plan

We plan to add a new system test to the Streams suite for this feature. This will be fairly straightforward and cover starting up a Streams app, changing the number of partitions for external topics (following the procedure laid out above), validating that the internal topics have expanded to match the new partition count, and finally verifying that the application has been producing correct/expected results. 

The application in question should cover multiple sub-topologies and include any operators that have special dependencies and/or effects related to partitioning, such as: a Repartition, a Join, and a ForeignKeyJoin.
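
A hedged sketch of what such a test topology might look like (topic names, types, and join logic here are hypothetical placeholders; the shape of the topology is what matters):

import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Repartitioned;

public class AutoscalingTestTopology {

    public static Topology build() {
        final StreamsBuilder builder = new StreamsBuilder();

        final KStream<String, String> left = builder.stream("input-left");
        final KStream<String, String> right = builder.stream("input-right");
        final KTable<String, String> table = builder.table("input-table");
        final KTable<String, String> other = builder.table("other-table");

        // Explicit repartition: creates its own internal repartition topic.
        final KStream<String, String> repartitioned =
            left.repartition(Repartitioned.as("explicit-repartition"));

        // Stream-stream join: requires co-partitioned inputs and creates windowed state stores.
        repartitioned
            .join(right, (l, r) -> l + r, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)))
            .to("joined-output");

        // Foreign-key table-table join: uses internal subscription/response topics.
        table
            .join(other, value -> value, (v, o) -> v + o)
            .toStream()
            .to("fk-join-output");

        return builder.build();
    }
}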

Rejected Alternatives

  1. As mentioned briefly in the 'Motivation' section, an alternative to enforcing the static partitioning condition would be to implement some kind of manual data migration via historical reprocessing. While this would enable the feature to be used for all applications, it would have extreme downsides that make the feature itself far less useful: it would require long downtimes and be incredibly resource heavy. Thus we feel it's more practical to introduce the best version of the feature by imposing a constraint: static partitioning.
  2. A second important possibility to consider here is whether to open up the feature to work with applications that are stateless, rather than requiring strict static partitioning. Technically, stateless applications would continue to produce correct results in the face of repartitioning like this, since they by definition do not care about the key's history. However, as mentioned during the KIP discussion, we worry about users getting into trouble should we allow this, if for example the application feeds into a stateful application downstream. Thus we are leaning towards enabling autoscaling for static partitioning cases only (though future versions could expand to include stateless apps as well).
  3. In the 'Proposed Changes' section, we described at a high level how Streams will react to and handle partition expansion events. One of the trickiest aspects of this feature will be getting the synchronization right: in other words, what does Streams do when it detects changes to the partition count of some topics but the expansion has not yet percolated throughout the topology?
    1. One option is to end the rebalance immediately and continue processing until the last (input) topic change is detected. However, since we can only detect changes to the partition count of input topics, this wouldn't provide a sure-fire method of ensuring that the entire topology has caught up to the new number of partitions. Furthermore, due to cooperative rebalancing, Kafka Streams actually does continue to process during a rebalance, so all StreamThreads except for the leader – which will be waiting for the partition expansion to complete inside the StreamPartitionAssignor's callback – will be able to continue processing existing tasks during this time. So ending the rebalance and triggering a new one would, we believe, be more of a hassle than any benefit it might provide.
    2. Another option is to wait for some constant or configured amount of time and then shut down the entire application, rather than retrying for a while and then ending the rebalance.
    3. We could introduce another new config for the wait/retry time within a rebalance. However, we feel that the existing API timeout config is sufficient: it is already used elsewhere in the rebalance/assignment logic, and since exceeding it will not result in a full shutdown but rather a new rebalance attempt, it is less important to ensure that the partition expansion completes within this timeout.
  4. API alternatives:
    1. One possibility was to expose this explicitly via an 'enable/disable' feature flag, rather than implicitly via the static.partitioner.class config. However, because of the static partitioning constraint, we felt it was best to tie the feature's operation directly to this constraint. In addition to helping Streams ensure the user is actually partitioning statically, adding a config for the default partitioner makes it considerably easier for the user to implement static partitioning, as the alternative would be to pass the partitioner in all over the topology and hope not to miss anywhere it's needed (which can be difficult to figure out).
    2. Another option we considered was whether to enforce that only the default partitioner passed in via config be used, or to allow it to be overridden – though only by other static partitioners – for specific operators. Although there's no technical reason to require that the same partitioner be used across the topology, we believe inconsistent partitioning is unlikely to be common, and that a single partitioner is easier for users to reason about. Of course, the partitioner implementation may itself incorporate multiple different strategies for different topics and/or parts of the topology, so it is still possible to achieve the same outcome if truly desired.