Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Current state: Under Discussion

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread] 

JIRA: KAFKA-14318

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The fundamental unit of work in Kafka Streams is the StreamTask, with a 1:1 mapping from task to partition (technically partition number as in the case of e.g. joins, one task may process two partitions from two different input topics). Because of this, the number of partitions of the user input topics represent an upper limit on the parallelism and ability to scale out. Currently this is effectively a permanent upper limit as Streams assumes the partition count is fixed, and will shut itself down if any changes are detected as the internal topics will no longer match the partitioning of the user topics. The only way to increase the parallelism in today's world is by stopping Streams and manually repartitioning not just the external user topics but also any internal topics (such as repartitions and changelogs). Besides being massively inconvenient and requiring a long pause in processing, the internal topics are generally to be managed by Streams itself and may not all be known to the user.

Therefore, we intend to introduce automatic and live scaling out by expanding the partition count to match any increase in the partitions on the external user topics. Users can simply repartition the topics they manage, and Streams will detect this and scale itself to match without any downtime. 

One of the main challenges to supporting repartitioning within Streams is the fact that partitioning generally (and by default) depends on the current number of partitions. Thus, any change to the partition count will result in keys being routed to a new/different partition number, effectively losing all the state it had built up so far. To ensure correctness of results in the face of changing partition count, there are two options: (a) manually reprocess all data in the application's history and migrate it to new topics, or (b) ensure that keys continue to be routed to the same partition number, regardless of changes to the total partition count. In this KIP we choose to go with option b, and will support live autoscaling in Kafka Streams for applications with static partitioning.

Public Interfaces

Since enabling the new autoscaling feature will require static partitioning throughout the application to ensure correctness, we propose to expose this feature through such a partitioner. Specifically, a new high-level StreamsConfig will be introduced that accepts a class or class name, similar to existing class-based configs such as the rocksdb.config.setter:

Code Block
languagejava
titleStreamsConfig.java
public static final String STATIC_PARTITIONER_CLASS_CONFIG = "static.partitioner.class";
private static final String STATIC_PARTITIONER_CLASS_DOC = "Enable autoscaling of internal topic partitions by passing in a static partitioner class or class name that implements the <code>org.apache.kafka.streams.processor.StaticStreamPartitioner</code> interface. This partitioner will be enforced as the default partitioner throughout the application, and cannot be overridden by other <code>StreamPartitioner</code> implementations for specific operators.";

Autoscaling will be automatically enabled for any application that passes in a valid class which implements the new StaticStreamPartitioner interface:


Code Block
languagejava
titleorg.apache.kafka.streams.processor.StaticStreamPartitioner.java
/**
 * A special implementation of the {@link StreamPartitioner} interface which enforces that keys are "statically
 * partitioned" ie sent to the same partition regardless of changes to the underlying topic partition count.
 * Implementing this and passing it in to the {@code static.partitioner.class} config will enable automatic
 * partition scaling for the Streams application.
 * 
 * Automatic partition scaling for Streams applications with static partitioning means that Streams will detect 
 * any changes (increases) to the external user topics' partition counts, and automatically adjust the partitions
 * of any internal topics managed by Streams. 
 * 
 * Besides the static partitioning constraint, to best utilize this feature we recommend that users perform any
 * partition expansions starting with the most downstream output topics, and working upstream from there to the 
 * input topics. This will help Streams to stabilize with the least amount of disruption and/or downtime.
 * 
 * It is also recommended to avoid sending any records with keys that are to go into the new partitions until after
 * Streams has fully processed the expansion. The {@link #partitionExpansion(int, int)} method can be implemented
 * as a callback to be notified when Streams has finished expanding any internal topics.
 */
public interface StaticStreamPartitioner<K, V> {

    /**
     * Determine the partition number for a record with the given key and the current number of partitions.
     * The partition number must be static and deterministic by key, meaning that any and all future
     * records with the same key will always be placed into the same partition, regardless of a change
     * in the number of partitions. In other words, should the number of partitions increase, only new keys
     * may be routed to the new partitions; all keys that have been seen before should end up in their original
     * partitions. It is up to the user to enforce this.
     *
     * If the partitioning is not static, autoscaling of internal topics may cause incorrect results if keys
     * end up getting routed to different partitions when the {@code numPartitions} changes.
     *
     * @param topic the topic name this record is sent to
     * @param key the key of the record
     * @param numPartitions the total number of partitions
     * @return an integer between 0 and {@code numPartitions - 1}
     *
     * @throws TaskMigratedException if the partitioner requires more partitions/wants to send the given key to
     *                               a partition that doesn't exist yet/is higher than the upper limit of
     *                               {@code numPartitions-1}. This should only happen when the user failed to wait
     *                               for Streams to scale out/restabilize after a partition expansion, thus throwing
     *                               this exception tells Streams to rebalance and recheck the number of partitions
     *                               throughout the topology.
    int staticPartition(String topic, K key, int numPartitions);

    /**
     * An optional callback for the user to take some action when Streams has detected and finished reacting to
     * a change in the number of partitions. This callback will be invoked once the application has re-stabilized
     * to a consistent number of partitions across the topology.
     *
     * Note: this will only be triggered once per rebalance/partition expansion, for the member which is the group 
     * leader for that rebalance. In other words, expect only a single StreamThreads in a single application instance
     * to invoke this callback.
     *
     * @param oldNumPartitions
     * @param newNumPartitions
     */
    default void onPartitionExpansion(int oldNumPartitions, int newNumPartitions) { }

    @Override
    default Integer partition(String topic, K key, V value, int numPartitions) {
        return staticPartition(topic, key, numPartitions);
    }

}

Note that the new StaticStreamPartitioner interface differs from the regular StreamPartitioner interface in that only the key, rather than the key and value, are passed in to the partitioner. This is to help enforce static partitioning, which should be deterministic for a given key and not depend on the record values.

We will also provide a new callback that the user can implement in order to take some action and/or as a hook to signal when Streams has processed a partition expansion event.

Proposed Changes

As discussed above, the autoscaling feature can (and will) be enabled by passing in a partitioner class implementing the StaticStreamPartitioner interface. This will become the default partitioner used at each operator, and cannot be overridden – if a user attempts to pass in a custom partitioner anywhere else in the topology, we will log a warning and ignore it. 

...