...

Code Block
language: java
title: org.apache.kafka.streams.processor.StaticStreamPartitioner.java
/**
 * A special implementation of the {@link StreamPartitioner} interface which enforces that keys are "statically
 * partitioned" ie sent to the same partition regardless of changes to the underlying topic partition count.
 * Implementing this and passing it in to the {@code static.partitioner.class} config will enable automatic
 * partition scaling for the Streams application.
 * 
 * Automatic partition scaling for Streams applications with static partitioning means that Streams will detect 
 * any changes (increases) to the external user topics' partition counts, and automatically adjust the partitions
 * of any internal topics managed by Streams. 
 * 
 * Besides the static partitioning constraint, to best utilize this feature we recommend that users perform any
 * partition expansions starting with the most downstream output topics, and working upstream from there to the 
 * input topics. This will help Streams to stabilize with the least amount of disruption and/or downtime.
 * 
 * It is also recommended to avoid sending any records with keys that are to go into the new partitions until after
 * Streams has fully processed the expansion. The {@link #onPartitionExpansion(int, int)} method can be implemented
 * as a callback to be notified when Streams has finished expanding any internal topics.
 */
public interface StaticStreamPartitioner<K, V> extends StreamPartitioner<K, V> {

    /**
     * Determine the partition number for a record with the given key and the current number of partitions.
     * The partition number must be static and deterministic by key, meaning that any and all future
     * records with the same key will always be placed into the same partition, regardless of a change
     * in the number of partitions. In other words, should the number of partitions increase, only new keys
     * may be routed to the new partitions; all keys that have been seen before should end up in their original
     * partitions. It is up to the user to enforce this.
     *
     * If the partitioning is not static, autoscaling of internal topics may cause incorrect results if keys
     * end up getting routed to different partitions when the {@code numPartitions} changes.
     *
     * @param topic the topic name this record is sent to
     * @param key the key of the record
     * @param numPartitions the total number of partitions
     * @return an integer between 0 and {@code numPartitions - 1}
     *
     * @throws TaskMigratedException if the partitioner requires more partitions, i.e. wants to send the given key to
     *                               a partition that doesn't exist yet/is higher than the upper limit of
     *                               {@code numPartitions - 1}. This should only happen when the user failed to wait
     *                               for Streams to scale out/restabilize after a partition expansion; throwing
     *                               this exception tells Streams to rebalance and recheck the number of partitions
     *                               throughout the topology.
     */
    int staticPartition(String topic, K key, int numPartitions);

    /**
     * An optional callback for the user to take some action when Streams has detected and finished reacting to
     * a change in the number of partitions. This callback will be invoked once the application has re-stabilized
     * to a consistent number of partitions across the topology.
     *
     * Note: this will only be triggered once per rebalance/partition expansion, for the member which is the group
     * leader for that rebalance. In other words, expect only a single StreamThread, in a single application instance,
     * to invoke this callback.
     *
     * @param oldNumPartitions the number of partitions before the expansion
     * @param newNumPartitions the number of partitions after the expansion
     */
    default void onPartitionExpansion(int oldNumPartitions, int newNumPartitions) { }

    @Override
    default Integer partition(String topic, K key, V value, int numPartitions) {
        return staticPartition(topic, key, numPartitions);
    }

}
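
To make the contract above concrete, the following is a minimal sketch of a possible implementation, assuming keys are long customer ids that are assigned to fixed ranges of 1,000 ids per partition at creation time. The class name, the range-per-partition scheme, and the log output are illustrative assumptions only and not part of this KIP.

Code Block
language: java
title: CustomerIdPartitioner.java (illustrative example)
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.StaticStreamPartitioner;

public class CustomerIdPartitioner implements StaticStreamPartitioner<Long, String> {

    @Override
    public int staticPartition(final String topic, final Long key, final int numPartitions) {
        // Map ids 0-999 to partition 0, 1000-1999 to partition 1, and so on. The result
        // depends only on the key, never on numPartitions, so it stays static as the
        // topic's partition count grows.
        final int partition = (int) (key / 1000L);
        if (partition >= numPartitions) {
            // The key belongs to a partition that doesn't exist yet: tell Streams to
            // rebalance and recheck the partition counts throughout the topology.
            throw new TaskMigratedException("Partition " + partition + " is not available yet");
        }
        return partition;
    }

    @Override
    public void onPartitionExpansion(final int oldNumPartitions, final int newNumPartitions) {
        // Streams has finished expanding its internal topics, so it is now safe to begin
        // sending records with keys destined for the new partitions.
        System.out.println("Internal topics expanded from " + oldNumPartitions
            + " to " + newNumPartitions + " partitions");
    }
}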

...

  1. As mentioned briefly in the 'Motivation' section, an alternative to enforcing the static partitioning condition would be to implement some kind of manual data migration via historical reprocessing. While this would enable the feature to be used for all applications, it would come with severe downsides: it would require long downtimes and be extremely resource heavy, making the feature itself far less useful. Thus we feel it's more practical to introduce the best version of the feature by imposing a constraint: static partitioning.
  2. In the 'Proposed Changes' section, we described at a high level how Streams will react to and handle partition expansion events. One of the trickiest aspects of this feature will be getting the synchronization right: in other words, what does Streams do when it detects changes to the partition count of some topics but the expansion has not yet percolated throughout the topology?
    1. One option is to end the rebalance immediately and continue processing until the last (input) topic change is detected. However, since we can only detect changes to the partition count of input topics, this wouldn't provide a sure-fire method of ensuring that the entire topology has caught up to the new number of partitions. Furthermore, due to cooperative rebalancing Kafka Streams actually does continue to process during a rebalance, so all StreamThreads except for the leader – who will be waiting for the partition expansion to complete inside the StreamPartitionAssignor's callback – will be able to continue processing existing tasks during this time. So ending the rebalance and triggering a new one would, we believe, be more of a hassle than any benefit it might provide.
    2. Another option is to wait for some constant or configured amount of time and then shut down the entire application, rather than retrying for a while and then ending the rebalance.
    3. We could introduce another new config for the wait/retry time within a rebalance. However, we feel the existing api timeout config is sufficient: it is already used elsewhere in the rebalance/assignment logic, and since exceeding this timeout results in a new rebalance attempt rather than a full shutdown, it is less important to guarantee that the partition expansion completes within this timeout.
  3. API alternatives:
    1. One possibility was to expose this explicitly via an 'enable/disable' feature flag, rather than implicitly via the static.partitioner.class config. However, because of the static partitioning constraint we felt it was best to tie the feature's operation directly to this constraint. In addition to helping Streams ensure the user is actually partitioning statically, adding a config for the default partitioner makes it considerably easier for the user to implement static partitioning, as the alternative would be to pass the partitioner in all over the topology and hope not to miss anywhere it's needed (which can be difficult to figure out). A configuration sketch follows this list.
    2. Another option we considered was whether to enforce that only the default partitioner passed in via config be used, or to allow it to be overridden – though only by other static partitioners – for specific operators. Although there's no technical reason to require the same partitioner be used across the topology, we believe inconsistent partitioning is unlikely to be common, and a single partitioner is easier for users to reason about. Of course, the partitioner implementation may itself incorporate multiple different strategies for different topics and/or parts of the topology, so it is still possible to achieve the same outcome if truly desired.
    3. In designing the new StaticStreamPartitioner interface, we decided to remove the 'value' parameter from the method that computes the partition number for a record. Although the value may provide useful information without necessarily being used to compute the resultant partition, we worry that passing it in would encourage users to incorporate it into the partition computation, which breaks the static partitioning constraint. Thus, we removed it.
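
To show how the feature would be wired in end to end, here is a minimal configuration sketch using the illustrative CustomerIdPartitioner above. The application id, the bootstrap servers, and the use of a plain string for the proposed static.partitioner.class config (rather than a StreamsConfig constant) are assumptions for the example.

Code Block
language: java
title: Enabling autoscaling via static.partitioner.class (illustrative example)
import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class AutoscalingExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "autoscaling-example"); // assumed app id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // assumed brokers
        // Proposed config from this KIP: setting a static partitioner as the default
        // partitioner for the whole topology enables autoscaling of internal topics.
        props.put("static.partitioner.class", CustomerIdPartitioner.class.getName());

        final StreamsBuilder builder = new StreamsBuilder();
        // ... build the topology; operators use the configured partitioner by default ...
        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}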