...

The rough translations to Streams are shown in the table below. Note that the comparisons are drawn with respect to load balancing rather than literal definition. For example, an index is really more like a store; however, for our purposes it is more useful to think of it as an entire subtopology, in that each index/subtopology is an independent job that has some inherent "weight" (such as the number of stores for a subtopology) and whose work is partitioned and distributed independently into some number of shards/tasks. The analogies are fairly close, and Elasticsearch has to solve a load balancing problem similar to the one that Streams faces. One main high-level difference to point out is that replica shards are presumed to be in sync with the primary shards, which removes the complexity of "restore completeness" from their challenge.

Elasticsearch   | Streams
index           | subtopology
master node     | group leader
node            | instance
primary shard   | active task
replica shard   | standby task
shard           | task

Elasticsearch actually breaks the problem down into two separate processes: allocation and rebalancing. Allocation refers to the assignment of unallocated shards to nodes, while rebalancing happens separately and involves moving already-allocated shards between nodes. By default, rebalancing can only occur once all shards are allocated; this can be configured to allow rebalancing once only the primary (active) shards are allocated, or always. Multiple rebalances can take place concurrently, up to a configurable maximum (default 2). Note that this limit applies only to "load balancing" rebalances, not to relocations forced by environmental (user-defined) constraints. Rebalancing can also be dynamically enabled or disabled for either type of shard (primary or replica).
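To make the two-phase split concrete, here is a toy model of allocation followed by a gated, concurrency-capped rebalance round. This is not Elasticsearch's code or algorithm: the class, the function names and the "move from busiest to idlest node" heuristic are hypothetical simplifications (the real balancer uses a weighted cost function). Only the gate's mode names mirror the real `cluster.routing.allocation.allow_rebalance` values (`always`, `indices_primaries_active`, `indices_all_active`), and the cap mirrors `cluster.routing.allocation.cluster_concurrent_rebalance` (default 2).

```python
from collections import Counter
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class Shard:
    index: str
    shard_id: int
    primary: bool
    node: Optional[str] = None  # None means the shard is unallocated


def _copies_on(shards: List[Shard], shard: Shard) -> set:
    """Nodes already holding a copy (primary or replica) of the same shard."""
    return {s.node for s in shards
            if s.node is not None and (s.index, s.shard_id) == (shard.index, shard.shard_id)}


def allocate(shards: List[Shard], nodes: List[str]) -> None:
    """Allocation phase (hypothetical heuristic): put each unallocated shard on the
    least-loaded node, never co-locating two copies of the same shard."""
    for shard in shards:
        if shard.node is not None:
            continue
        load = Counter(s.node for s in shards if s.node is not None)
        candidates = [n for n in nodes if n not in _copies_on(shards, shard)]
        if candidates:  # otherwise the shard simply stays unallocated
            shard.node = min(candidates, key=lambda n: load[n])


def rebalance_allowed(shards: List[Shard], mode: str = "indices_all_active") -> bool:
    """Gate modelled on the allow_rebalance modes (mode names from Elasticsearch)."""
    if mode == "always":
        return True
    if mode == "indices_primaries_active":
        return all(s.node is not None for s in shards if s.primary)
    return all(s.node is not None for s in shards)  # default: indices_all_active


def rebalance_round(shards: List[Shard], nodes: List[str], max_concurrent: int = 2,
                    mode: str = "indices_all_active") -> List[Tuple[Shard, str, str]]:
    """Rebalancing phase: relocate at most `max_concurrent` already-allocated shards
    from the busiest to the idlest node, stopping once loads differ by at most one."""
    moves: List[Tuple[Shard, str, str]] = []
    if not rebalance_allowed(shards, mode):
        return moves
    load = Counter({n: 0 for n in nodes})
    load.update(s.node for s in shards if s.node is not None)
    for shard in shards:
        if len(moves) >= max_concurrent:
            break
        busiest = max(nodes, key=lambda n: load[n])
        idlest = min(nodes, key=lambda n: load[n])
        if load[busiest] - load[idlest] <= 1:
            break  # balanced enough; nothing more to do this round
        if shard.node != busiest or idlest in _copies_on(shards, shard):
            continue
        moves.append((shard, busiest, idlest))
        shard.node = idlest
        load[busiest] -= 1
        load[idlest] += 1
    return moves
```

For example, allocating two primaries and two replicas of a hypothetical `logs` index onto `n1`/`n2`, then adding `n3`, yields a single relocation from `n1` to `n3`: allocation and rebalancing stay independent, and the rebalance only runs after the gate is satisfied.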

...

Flink integrates with other cluster resource managers, including YARN, but can also be run as a standalone cluster. Unlike typical YARN applications, Flink jobs are generally continuous (like Streams or Elasticsearch), so the "load balancing" aspect matters more than the "scheduling/distributing transient resources" aspect. This brings it closer to Streams; unlike Streams, however, Flink has no notion of standby tasks. Instead, high availability requires some distributed storage to which checkpoints are saved and from which they are recovered. This greatly simplifies the assignment logic relative to the Streams/KIP-441 case.


Samza

Of all the other systems we've looked at, Samza is the closest to Kafka Streams.

Tasks and Containers

Like Streams, Samza's unit of parallelism is the task. Samza assigns each task a partition of its input stream(s). Tasks in Samza also perform periodic checkpointing, which allows processing to resume from the latest checkpointed offset on a different worker in case of failure. Tasks are hosted in a container: the container is the physical unit of work, while the task is the logical unit of work. Each container runs one or more tasks, and containers are distributed across different hosts.
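The task/container split can be pictured as two mapping steps: partitions to tasks (logical), then tasks to containers (physical). The sketch below is a simplified assumption rather than Samza's implementation: the first step follows the idea of one task per partition number consuming that partition from every input stream, and the round-robin spread over containers and all function names are hypothetical.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def group_by_partition(input_streams: Dict[str, int]) -> Dict[int, List[Tuple[str, int]]]:
    """Logical units: one task per partition number; each task consumes that
    partition from every input stream that has it."""
    tasks: Dict[int, List[Tuple[str, int]]] = defaultdict(list)
    for stream, partition_count in input_streams.items():
        for p in range(partition_count):
            tasks[p].append((stream, p))
    return dict(tasks)


def assign_to_containers(task_ids, container_count: int) -> Dict[int, List[int]]:
    """Physical units: spread tasks round-robin over containers (hypothetical policy)."""
    if container_count <= 0:
        raise ValueError("need at least one container")
    containers: Dict[int, List[int]] = {c: [] for c in range(container_count)}
    for i, task in enumerate(sorted(task_ids)):
        containers[i % container_count].append(task)
    return containers
```

With two 4-partition input streams, this produces four tasks, each reading the same partition number from both streams, packed two per container when two containers are available.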

Coordinator

To handle the distribution of tasks to containers, Samza uses a Coordinator. The Coordinator also monitors the containers; when it detects a failed container, it redistributes that container's tasks to the remaining healthy containers. The Coordinator is pluggable, which lets Samza run either in standalone/embedded mode or in cluster mode with a cluster manager such as YARN.
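The failover behaviour described above amounts to "take the failed container's tasks and hand them to the survivors". A minimal sketch, assuming a hypothetical least-loaded placement policy; Samza's actual Coordinator logic may differ.

```python
from typing import Dict, List


def handle_container_failure(assignment: Dict[int, List[int]], failed: int) -> Dict[int, List[int]]:
    """Remove the failed container and give each of its tasks to the healthy
    container currently running the fewest tasks (ties go to the lowest id).
    Mutates and returns `assignment`; the policy is a hypothetical assumption."""
    orphaned = assignment.pop(failed)
    if not assignment:
        raise RuntimeError("no healthy containers left")
    for task in sorted(orphaned):
        target = min(assignment, key=lambda c: (len(assignment[c]), c))
        assignment[target].append(task)
    return assignment
```

Note that this policy only balances task counts; it is blind to where each task's state lives, which is exactly the gap that host affinity and KIP-441's standby-aware assignment try to address.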

State

Samza also supports stateful operations and uses RocksDB to implement its persistent storage, although storage engines are pluggable as well. Each task has its own state store. As in Streams, state stores use (compacted) changelog topics for fault tolerance.
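Restoring a store from its changelog means replaying every record in offset order, so restore time grows with the changelog's length. A minimal sketch of that replay, assuming a plain dict stands in for RocksDB and a `None` value plays the role of a Kafka tombstone (delete marker); the function name is hypothetical.

```python
from typing import Any, Dict, Iterable, Optional, Tuple


def restore_from_changelog(records: Iterable[Tuple[Any, Optional[Any]]]) -> Tuple[Dict[Any, Any], int]:
    """Rebuild a task's key-value store by replaying its changelog in offset order.
    Later records overwrite earlier ones; a None value deletes the key.
    Returns the rebuilt store and how many records had to be replayed."""
    store: Dict[Any, Any] = {}
    replayed = 0
    for key, value in records:
        replayed += 1
        if value is None:
            store.pop(key, None)
        else:
            store[key] = value
    return store, replayed
```

Compaction bounds the work to roughly one record per key, but for a large store that can still be a long replay, which is the startup cost host affinity tries to avoid.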

Host Affinity

Restoring a store by reading its entire changelog can make startup slow. To mitigate this, Samza persists metadata recording the host that each task is running on. When users enable the job.host-affinity.enabled configuration, Samza attempts to place the container for a given task (or tasks) on the same host every time the job is deployed, so that existing local state can be reused rather than rebuilt from the changelog. Host affinity is best-effort: in some cases, a container will be assigned to another available resource instead.
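Best-effort host affinity can be sketched as "try the remembered host first, otherwise take whatever is free". In this sketch the flag mirrors job.host-affinity.enabled, but the function, its parameters and the one-container-per-host simplification are hypothetical assumptions, not Samza's placement code.

```python
from typing import Dict, Iterable, List


def place_containers(containers: Iterable[int], last_host: Dict[int, str],
                     free_hosts: Iterable[str],
                     host_affinity_enabled: bool = True) -> Dict[int, str]:
    """Place each container on its previous host when that host is still free;
    otherwise fall back to the next free host. Assumes one container per host."""
    free: List[str] = list(free_hosts)
    placement: Dict[int, str] = {}
    pending: List[int] = []
    for c in containers:
        preferred = last_host.get(c) if host_affinity_enabled else None
        if preferred is not None and preferred in free:
            placement[c] = preferred  # local state on this host can be reused
            free.remove(preferred)
        else:
            pending.append(c)
    for c in pending:  # best-effort fallback: state must be restored elsewhere
        if not free:
            raise RuntimeError(f"no host available for container {c}")
        placement[c] = free.pop(0)
    return placement
```

If container 1 previously ran on a host that is no longer available, it lands on a new host and must restore from its changelog, while containers whose hosts survived keep their local stores.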


Given how close Samza's architecture is to Kafka Streams, IMHO it makes sense to take a quick look at the algorithm the Coordinator uses for load distribution and stateful tasks.


Old Version of KIP-441

----------------------------------------------------------------------------------------------------------------------------

...