Note that this is a cost-minimization problem, since it may not be possible to fully satisfy all three components of the cost function.

Streams Rebalance Metadata: Remember the PrevTasks

Today Streams embeds a full-fledged Consumer client, which hard-codes a ConsumerCoordinator inside. Streams then injects a StreamsPartitionAssignor into the Consumer's pluggable PartitionAssignor interface, and inside the StreamsPartitionAssignor there is also a TaskAssignor interface whose default implementation is StickyTaskAssignor. The Streams partition assignment logic today sits in the latter two classes. Hence the hierarchy today is:

```
KafkaConsumer -> ConsumerCoordinator -> StreamsPartitionAssignor -> StickyTaskAssignor
```

StreamsPartitionAssignor uses the subscription/assignment metadata byte-array field to encode additional information for sticky partitions. More specifically, on subscription:

Proposed Assignment/Rebalance Algorithm


  • balance_factor: A scalar integer value representing the target difference in number of tasks assigned to the node with the most tasks vs. the node with the least tasks. Defaults to 1. Must be at least 1.
  • acceptable_recovery_lag: A scalar integer value indicating a task lag (number of offsets to catch up) that is acceptable for immediate assignment. Defaults to 10,000 (should be well under a minute and typically a few seconds, depending on workload). Must be at least 0.
  • num_standbys: A scalar integer indicating the number of hot-standby task replicas to maintain in addition to the active processing tasks. Defaults to 0. Must be at least 0.
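The Java sketch below illustrates how the three parameters might constrain an assignment. It validates them against the stated lower bounds and defaults, then applies two checks. The class and method names are hypothetical. Two interpretations are assumptions:

- balance_factor is treated as an upper bound on the gap between the most-loaded and least-loaded node.
- a lag exactly equal to acceptable_recovery_lag still counts as acceptable.

```java
import java.util.Collections;
import java.util.Map;

/** Hypothetical holder for balance_factor, acceptable_recovery_lag and num_standbys. */
class AssignmentParams {
    final int balanceFactor;
    final long acceptableRecoveryLag;
    final int numStandbys;

    AssignmentParams(int balanceFactor, long acceptableRecoveryLag, int numStandbys) {
        // Lower bounds as stated in the parameter list.
        if (balanceFactor < 1) {
            throw new IllegalArgumentException("balance_factor must be at least 1");
        }
        if (acceptableRecoveryLag < 0) {
            throw new IllegalArgumentException("acceptable_recovery_lag must be at least 0");
        }
        if (numStandbys < 0) {
            throw new IllegalArgumentException("num_standbys must be at least 0");
        }
        this.balanceFactor = balanceFactor;
        this.acceptableRecoveryLag = acceptableRecoveryLag;
        this.numStandbys = numStandbys;
    }

    /** The stated defaults: 1, 10,000 offsets, and 0 standbys. */
    static AssignmentParams defaults() {
        return new AssignmentParams(1, 10_000L, 0);
    }

    /** Assumption: a lag equal to the threshold still counts as acceptable. */
    boolean acceptableForImmediateAssignment(long lag) {
        return lag <= acceptableRecoveryLag;
    }

    /** Assumption: balanced if the busiest and idlest nodes differ by at most balanceFactor tasks. */
    boolean isBalanced(Map<String, Integer> tasksPerNode) {
        if (tasksPerNode.isEmpty()) {
            return true;
        }
        int max = Collections.max(tasksPerNode.values());
        int min = Collections.min(tasksPerNode.values());
        return max - min <= balanceFactor;
    }
}
```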


```
T tasks
I instances
```

```
Subscription => TopicList SubscriptionInfo
   TopicList               => List<String>
   SubscriptionInfo        => Bytes

SubscriptionInfo (encoded in version 4) => VersionId LatestSupportVersionId ClientUUID PrevTasks StandbyTasks EndPoint
   VersionId               => Int32
   LatestSupportVersionId  => Int32
   ClientUUID              => 128bit
   PrevTasks               => Set<TaskId>
   StandbyTasks            => Set<TaskId>
   EndPoint                => HostInfo
```
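To make the version 4 layout concrete, here is a hypothetical Java sketch. It serializes the SubscriptionInfo fields into a ByteBuffer, in the order listed above, and reads them back. It is not Kafka's actual code. The following encodings are all assumptions made for illustration:

- each TaskId is two Int32s (topic group ID and partition).
- each task set is an Int32 count followed by its entries.
- EndPoint is a length-prefixed UTF-8 "host:port" string.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

/** Hypothetical encoder for the version 4 SubscriptionInfo fields; not Kafka's actual code. */
class SubscriptionInfoSketch {
    /** Assumed TaskId shape: (topic group ID, partition), each written as an Int32. */
    static final class TaskId {
        final int topicGroupId;
        final int partition;
        TaskId(int topicGroupId, int partition) {
            this.topicGroupId = topicGroupId;
            this.partition = partition;
        }
        @Override
        public boolean equals(Object o) {
            return o instanceof TaskId
                    && ((TaskId) o).topicGroupId == topicGroupId
                    && ((TaskId) o).partition == partition;
        }
        @Override
        public int hashCode() {
            return 31 * topicGroupId + partition;
        }
    }

    final int versionId;
    final int latestSupportVersionId;
    final UUID clientUuid;          // 128 bits, written as two Int64s
    final Set<TaskId> prevTasks;
    final Set<TaskId> standbyTasks;
    final String endPoint;          // assumed "host:port", written as length-prefixed UTF-8

    SubscriptionInfoSketch(int versionId, int latestSupportVersionId, UUID clientUuid,
                           Set<TaskId> prevTasks, Set<TaskId> standbyTasks, String endPoint) {
        this.versionId = versionId;
        this.latestSupportVersionId = latestSupportVersionId;
        this.clientUuid = clientUuid;
        this.prevTasks = prevTasks;
        this.standbyTasks = standbyTasks;
        this.endPoint = endPoint;
    }

    ByteBuffer encode() {
        byte[] endPointBytes = endPoint.getBytes(StandardCharsets.UTF_8);
        int size = 4 + 4 + 16                      // VersionId, LatestSupportVersionId, ClientUUID
                + 4 + 8 * prevTasks.size()         // count + (Int32, Int32) per task
                + 4 + 8 * standbyTasks.size()
                + 4 + endPointBytes.length;        // length + bytes
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putInt(versionId).putInt(latestSupportVersionId);
        buf.putLong(clientUuid.getMostSignificantBits()).putLong(clientUuid.getLeastSignificantBits());
        writeTasks(buf, prevTasks);
        writeTasks(buf, standbyTasks);
        buf.putInt(endPointBytes.length).put(endPointBytes);
        buf.flip(); // switch the buffer from writing to reading
        return buf;
    }

    static SubscriptionInfoSketch decode(ByteBuffer buf) {
        int versionId = buf.getInt();
        int latestSupportVersionId = buf.getInt();
        long msb = buf.getLong();
        long lsb = buf.getLong();
        Set<TaskId> prevTasks = readTasks(buf);
        Set<TaskId> standbyTasks = readTasks(buf);
        byte[] endPointBytes = new byte[buf.getInt()];
        buf.get(endPointBytes);
        return new SubscriptionInfoSketch(versionId, latestSupportVersionId, new UUID(msb, lsb),
                prevTasks, standbyTasks, new String(endPointBytes, StandardCharsets.UTF_8));
    }

    private static void writeTasks(ByteBuffer buf, Set<TaskId> tasks) {
        buf.putInt(tasks.size());
        for (TaskId t : tasks) {
            buf.putInt(t.topicGroupId).putInt(t.partition);
        }
    }

    private static Set<TaskId> readTasks(ByteBuffer buf) {
        int count = buf.getInt();
        Set<TaskId> tasks = new HashSet<>();
        for (int i = 0; i < count; i++) {
            int topicGroupId = buf.getInt();
            int partition = buf.getInt();
            tasks.add(new TaskId(topicGroupId, partition));
        }
        return tasks;
    }
}
```

The sets are written in iteration order, so the encoded bytes are only meaningful as sets; the decoded PrevTasks and StandbyTasks compare equal to the originals regardless of order.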


An Elasticsearch cluster is a collection of nodes, each an individual instance of Elasticsearch. At the cluster scope, there is a master node that is responsible for coordinating cluster changes such as adding or removing nodes, indices, etc. Each node holds one or more shards, and each shard is a Lucene index holding part of the data of one Elasticsearch index. An index is divided up into shards across one or more nodes, so the work for that index is distributed across its shards. Each shard has a primary copy responsible for writes, and zero or more replica copies that can also serve reads.

The rough translations to Streams are shown in the table below. Note that the comparisons are drawn as they relate to load balancing, rather than by literal definition. For example, an index is really more like a store. However, for our purposes it is more useful to think of it as an entire subtopology, because:

- each index/subtopology is an independent job.
- each has some inherent "weight", such as the number of stores for a subtopology.
- its work is partitioned and distributed independently into some number of shards/tasks.

The analogies are fairly close, and Elasticsearch has to solve a load balancing problem similar to the one Streams faces. One main high-level difference is that replica shards are presumed to be in sync with the primary (active) shards. This removes the complexity of "restore completeness" from their challenge.



| Elasticsearch | Streams      |
|---------------|--------------|
| master node   | group leader |
| primary shard | active task  |
| replica shard | standby task |

Elasticsearch actually breaks the problem down into two separate processes: allocation and rebalancing. Allocation refers to the assignment of (unallocated) shards to nodes. Rebalancing occurs separately and involves moving already-allocated shards around. By default, rebalancing can only occur once all shards are active. This can be configured to allow rebalancing once all primary shards are active, or always. Multiple rebalances can take place concurrently, up to a configurable maximum (2 by default). This limit applies only to "load balancing" rebalances, not to moves forced by environmental (user-defined) constraints. You can also dynamically enable or disable rebalancing for either type of shard (primary or replica).
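The two gates on load-balancing rebalances described above can be modeled roughly as follows. This is a hypothetical sketch, not Elasticsearch code. The enum constants mirror the values of Elasticsearch's cluster.routing.allocation.allow_rebalance setting. As the paragraph notes, moves forced by allocation constraints are assumed not to count against the concurrency limit.

```java
/** Mirrors the values of Elasticsearch's cluster.routing.allocation.allow_rebalance setting. */
enum AllowRebalance { ALWAYS, INDICES_PRIMARIES_ACTIVE, INDICES_ALL_ACTIVE }

/** Hypothetical model of when load-balancing moves may start; not Elasticsearch's implementation. */
class RebalanceGate {
    private final AllowRebalance policy;
    private final int maxConcurrentRebalances; // the text above gives 2 as the default

    RebalanceGate(AllowRebalance policy, int maxConcurrentRebalances) {
        this.policy = policy;
        this.maxConcurrentRebalances = maxConcurrentRebalances;
    }

    /** Whether load-balancing moves may be considered at all in the current cluster state. */
    boolean rebalanceAllowed(boolean allPrimariesActive, boolean allShardsActive) {
        switch (policy) {
            case ALWAYS:
                return true;
            case INDICES_PRIMARIES_ACTIVE:
                return allPrimariesActive;
            case INDICES_ALL_ACTIVE:
            default:
                return allShardsActive;
        }
    }

    /**
     * How many additional load-balancing moves may start, given how many are already in flight.
     * Moves forced by allocation constraints are not counted against this limit.
     */
    int movesThatMayStart(int loadBalancingMovesInFlight) {
        return Math.max(0, maxConcurrentRebalances - loadBalancingMovesInFlight);
    }
}
```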

The allocation algorithm is as follows – note that this applies only to placement of unassigned shards, but nodes may have other shards already assigned to them.

  1. Group shards by index, then sort by shard ID to get the order of shard allocation. First all primary shards are allocated, then one replica for each shard of each index, repeating if the number of replicas is greater than one.
  2. For each shard, build a list of possible nodes. Nodes may be eliminated from consideration based on user config (e.g. allocation filtering) or various constraints:
     • no two copies of a shard may be allocated to the same node.
     • adequate remaining disk space.
     • forced awareness.
     • max retries.
  3. If step 2 returns no nodes, the shard will be retried later (possibly after a rebalance). Otherwise, we calculate the weight each node would have if given the shard, and allocate the shard to the node with the lowest weight. The weighting function depends on two settings: indexBalance (0.55 by default) and shardBalance (0.45 by default). The total weight is the average of the shard and index weights, weighted by the normalized shardBalance and indexBalance respectively. This is computed as:
```java
private float weight(Balancer balancer, ModelNode node, String index, int numAdditionalShards) {
    // how far this node's total shard count would be from the cluster-wide average
    final float weightShard = node.numShards() + numAdditionalShards - balancer.avgShardsPerNode();
    // how far this node's shard count for the given index would be from that index's average
    final float weightIndex = node.numShards(index) + numAdditionalShards - balancer.avgShardsPerNode(index);
    return theta0 * weightShard + theta1 * weightIndex;
}
```

where theta0 and theta1 are precomputed from the two settings:

```java
theta0 = shardBalance / (indexBalance + shardBalance);
theta1 = indexBalance / (indexBalance + shardBalance);
```
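The allocation steps can be sketched as a greedy loop. It places each unassigned shard on the eligible node with the lowest weight, using the same formula as weight() with numAdditionalShards fixed to 1. This is a simplified, hypothetical model:

- only the "no two copies of a shard on one node" constraint from step 2 is enforced.
- other deciders, such as disk space and awareness, are omitted.
- breaking ties by node name is an assumption.

The average terms are the same for every candidate node, so the choice effectively depends only on each node's current shard counts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical, simplified model of allocation step 3; not Elasticsearch's implementation. */
class GreedyAllocator {
    private final float theta0; // share of the weight given to total shard-count balance
    private final float theta1; // share of the weight given to per-index balance
    // node name -> shard copies it holds, each encoded as "index#shardId"; sorted for deterministic ties
    private final Map<String, List<String>> assigned = new TreeMap<>();

    GreedyAllocator(List<String> nodes, float indexBalance, float shardBalance) {
        float sum = indexBalance + shardBalance;
        this.theta0 = shardBalance / sum;
        this.theta1 = indexBalance / sum;
        for (String node : nodes) {
            assigned.put(node, new ArrayList<>());
        }
    }

    private int numShards(String node) {
        return assigned.get(node).size();
    }

    private int numShards(String node, String index) {
        int count = 0;
        for (String copy : assigned.get(node)) {
            if (copy.startsWith(index + "#")) {
                count++;
            }
        }
        return count;
    }

    private float avgShardsPerNode() {
        int total = 0;
        for (List<String> copies : assigned.values()) {
            total += copies.size();
        }
        return (float) total / assigned.size();
    }

    private float avgShardsPerNode(String index) {
        int total = 0;
        for (String node : assigned.keySet()) {
            total += numShards(node, index);
        }
        return (float) total / assigned.size();
    }

    /** Same shape as weight() above, with numAdditionalShards = 1. */
    float weight(String node, String index) {
        float weightShard = numShards(node) + 1 - avgShardsPerNode();
        float weightIndex = numShards(node, index) + 1 - avgShardsPerNode(index);
        return theta0 * weightShard + theta1 * weightIndex;
    }

    /** Places one copy of the shard; returns the chosen node, or null if no node is eligible. */
    String allocate(String index, int shardId) {
        String key = index + "#" + shardId;
        String best = null;
        float bestWeight = Float.POSITIVE_INFINITY;
        for (String node : assigned.keySet()) {
            if (assigned.get(node).contains(key)) {
                continue; // step 2: never two copies of the same shard on one node
            }
            float w = weight(node, index);
            if (w < bestWeight) { // strict '<' keeps the alphabetically first node on ties
                bestWeight = w;
                best = node;
            }
        }
        if (best != null) {
            assigned.get(best).add(key);
        }
        return best; // null: per step 3, the shard would be retried later
    }
}
```

Consider two nodes and index "a" with two shards and one replica each, allocated primaries first (per step 1). The primaries land on n1 and n2, and each replica lands on the opposite node. A third copy of shard 0 then has no eligible node.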

The shardBalance and indexBalance settings can be thought of as controlling the tendency to equalize across nodes either the number of shards (shardBalance) or the number of shards per index (indexBalance). In Streams terms, the former is analogous to assigning a balanced number of tasks to each StreamThread. The latter is analogous to spreading the tasks of a given subtopology across StreamThreads. The weighting formula used by Elasticsearch indicates there is some tradeoff between the two. However, that tradeoff applies only to Elasticsearch's allocation, where some shards have already been allocated to some subset of nodes. For example, if you add a new node and a new index, you could allocate all the new shards to the empty node for shard-count balance, or give each node one of the new shards for index balance, but likely not both.

If you start with a "clean assignment" where all shards must be (re)allocated, both can be balanced at once: simply group the shards by index, then loop through the nodes, assigning shards from the list in round-robin fashion. This would be the case when Streams first starts up. However, the more "sticky" the assignment in Streams the better, so we tend to avoid this kind of simple round-robin assignment on subsequent rebalances.

So the tradeoff in Streams is between balance (however that is defined) and stickiness. In Elasticsearch, the tradeoff is first between types of balance, with stickiness effectively a constraint (of allocation); stickiness is then (potentially) traded for further overall balance by the rebalancing operation.

One interesting thing to note is that, while Streams effectively enforces "total balance" (defined as the number of tasks across threads), this is actually configurable in Elasticsearch: you can tune the cluster to be more or less aggressive about balancing shards across nodes. This might be useful if users just want to get up and running again as fast as possible and would be satisfied with a slightly imbalanced workload.
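The clean-assignment case can be sketched as a single round-robin pass over the shards grouped by index. This sketch carries the rotation position across index groups, which is a hypothetical implementation choice. That keeps node totals within one of each other. Dealing each index's shards consecutively also keeps per-index counts within one. The same logic would apply to Streams tasks grouped by subtopology and dealt out to threads.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical round-robin "clean assignment" of shards (or tasks) grouped by index (or subtopology). */
class RoundRobinAssignor {
    /**
     * @param shardsPerIndex index name -> number of shards, iterated in the order groups are dealt
     * @param nodes          node names, in rotation order
     * @return node name -> assigned "index#shardId" entries
     */
    static Map<String, List<String>> assign(Map<String, Integer> shardsPerIndex, List<String> nodes) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String node : nodes) {
            result.put(node, new ArrayList<>());
        }
        int next = 0; // position of the next node in the rotation; carried across index groups
        for (Map.Entry<String, Integer> group : shardsPerIndex.entrySet()) {
            for (int shard = 0; shard < group.getValue(); shard++) {
                result.get(nodes.get(next)).add(group.getKey() + "#" + shard);
                next = (next + 1) % nodes.size();
            }
        }
        return result;
    }
}
```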

Of course, the workload in Streams is already potentially unbalanced because our definition of balance is simple and does not account for degree of statefulness. But maybe assigning weights that account for "stickiness" – how much would this instance have to catch up? – and "load" – some measure of the number of stores – would be useful for distributing tasks. 
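One way to read this suggestion is as a per-task cost with two terms, with the task placed on the cheapest instance:

- a "stickiness" term: how far the instance would have to catch up.
- a "load" term: the stores the instance already hosts.

The sketch below is purely hypothetical. The linear form, the two weights, and treating a missing lag entry as a full restore are all assumptions for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stickiness-plus-load placement; the linear cost and both weights are assumptions. */
class WeightedTaskPlacer {
    private final double lagWeight;  // cost per offset an instance would have to restore
    private final double loadWeight; // cost per store the instance already hosts
    // instance name -> number of stores already placed on it (sorted for deterministic ties)
    private final Map<String, Integer> storesPerInstance = new TreeMap<>();

    WeightedTaskPlacer(double lagWeight, double loadWeight, Iterable<String> instances) {
        this.lagWeight = lagWeight;
        this.loadWeight = loadWeight;
        for (String instance : instances) {
            storesPerInstance.put(instance, 0);
        }
    }

    /**
     * Places a task with the given number of stores on the cheapest instance.
     * An instance missing from lagByInstance is assumed to need a full restore of fullRestoreLag offsets.
     */
    String place(int taskStores, Map<String, Long> lagByInstance, long fullRestoreLag) {
        String best = null;
        double bestCost = Double.POSITIVE_INFINITY;
        for (Map.Entry<String, Integer> e : storesPerInstance.entrySet()) {
            long lag = lagByInstance.getOrDefault(e.getKey(), fullRestoreLag);
            double cost = lagWeight * lag + loadWeight * e.getValue();
            if (cost < bestCost) {
                bestCost = cost;
                best = e.getKey();
            }
        }
        if (best != null) {
            storesPerInstance.merge(best, taskStores, Integer::sum);
        }
        return best;
    }
}
```

With a large load weight, an instance that already hosts many stores loses a task to a slightly more lagged but idle instance. With a large lag weight, the assignment stays sticky even at the cost of balance.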


YARN (“Yet Another Resource Negotiator”) is a cluster management technology introduced in Hadoop 2.0 that decouples resource management from job scheduling. The YARN framework consists of three kinds of component:

- a global master daemon called the ResourceManager.
- a per-application ApplicationMaster.
- per-node NodeManagers.

NodeManagers are responsible for launching containers, running and managing processes within them, and monitoring and reporting their resource usage to the ResourceManager. The ResourceManager in turn keeps track of live NodeManagers and arbitrates resources among competing applications. The ApplicationMaster manages the application life cycle, negotiating resources from the ResourceManager and triggering NodeManagers to begin executing tasks for that application.

The ResourceManager itself has two main components: the Scheduler and the ApplicationsManager. The ApplicationsManager accepts job submissions and is responsible for managing each application's ApplicationMaster (including starting and restarting it if necessary). The Scheduler is then free to allocate resources to applications without concerning itself with monitoring applications or restarting failed tasks. The actual scheduling policy is pluggable and determines how the cluster resources are partitioned.

This scheduling, however, is fairly orthogonal to the balancing/restoring problem we face. It concerns how to distribute new or freed resources to existing jobs, rather than how to distribute all existing "jobs" (tasks) to all available "resources" (instances/threads). The interesting difference is that in Streams, a new resource (a fresh instance) requires revoking tasks from another instance, whereas in YARN a new resource can simply be handed out.

Maybe the right way to look at it is to consider YARN resources as Streams tasks, and YARN jobs as Streams instances (or threads; we just use "instance" for now). In this view, one resource/task can only ever be used by one job/run on one instance, while new instances/jobs are started and stopped and require some (re)allocation of resources/tasks.

Flink jobs are composed of operators. A chain of operators forms a task, which can be executed on threads as one or more parallel subtasks. The total number of subtasks is the parallelism of that operator. A Flink task corresponds to a Streams subtopology, and a Flink subtask corresponds to a Streams task. In the following section, "task" will refer to a Flink task, and any references to a Streams task will be labelled as such.

The Flink runtime consists of two types of processes:

- the JobManager, which schedules tasks and coordinates checkpoints, failure recovery, etc.
- one or more TaskManagers, which execute the subtasks and buffer/exchange streams of data.

Like Streams app instances, TaskManagers are brought up, connect to the JobManager to announce themselves as available, and are then assigned work. Each TaskManager has one or more task slots, each corresponding to a fixed subset of the TaskManager's resources. Subtasks within a TaskManager share network connections, similar to how Streams tasks share a StreamThread's consumer.

Also similarly to StreamThreads executing one or more Streams tasks, Flink allows subtasks of the same job to share task slots (note that a single task slot may use one or more threads, though Flink does recommend matching the number of task slots to the number of CPU cores). This can be controlled to a hard or soft degree by the user defined CoLocationGroup (which subtasks must share a slot) and SlotSharingGroup (which subtasks can share a slot). 

Flink integrates with other cluster resource managers, including YARN, but can also be run as a standalone cluster. Unlike typical YARN applications, Flink jobs are generally continuous (like Streams or Elasticsearch). The "load balancing" aspect is therefore more important than the "scheduling/distributing transient resources" aspect, which brings Flink closer to Streams. Unlike Streams, however, Flink has no notion of standby tasks. Instead, high availability requires some distributed storage for checkpoints to be saved to and recovered from. This greatly simplifies the assignment logic relative to the Streams/KIP-441 case.

Old Version of KIP-441

