...

Released: <Samza version>

Problem and Goal

The input system used by Samza may need to expand the number of partitions of its input streams for various reasons. For example, Kafka generally needs to limit the maximum size of a partition to scale up its performance. So if the average byte-in rate or the retention time of a Kafka topic doubles, the topic's partition count needs to be expanded to reduce the partition size. However, Samza currently cannot handle partition expansion of its input streams. Suppose a user is running a stateful job that uses GroupByPartition as the grouper, and the number of partitions of the input stream increases from 2 to 4. For a period of time after the expansion, messages with a given key may exist in both partition 1 and partition 3 (this can happen with Kafka, for example). GroupByPartition assigns partitions 1 and 3 to different tasks. As a result, messages with the same key may be handled by different tasks, containers, or processes, and their state will be stored in different changelog partitions. The output from Samza may therefore be wrong, because this violates an assumption Samza relies on for stateful jobs: all messages with the same key are processed by the same task.
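The following Java sketch makes the example concrete. It simulates a partitioner of the form hash(key) % partitionCount on integer key hashes and models GroupByPartition simply as "one task per partition id". The class and method names are hypothetical, and the sketch does not use any Samza or Kafka API.

```java
import java.util.Set;
import java.util.TreeSet;

public class PartitionExpansionDemo {
    // Assumed partitioner of the form hash(key) % partitionCount; floorMod keeps
    // the result non-negative even if a hash happens to be negative.
    static int partitionFor(int keyHash, int partitionCount) {
        return Math.floorMod(keyHash, partitionCount);
    }

    // Collects the post-expansion partitions of every key hash in [0, maxHash)
    // that belonged to oldPartition before the expansion.
    static Set<Integer> newPartitionsOf(int oldPartition, int oldCount, int newCount, int maxHash) {
        Set<Integer> result = new TreeSet<>();
        for (int h = 0; h < maxHash; h++) {
            if (partitionFor(h, oldCount) == oldPartition) {
                result.add(partitionFor(h, newCount));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // A key whose hash is 3: older messages sit in partition 1, newer ones in partition 3.
        int keyHash = 3;
        System.out.println("before: " + partitionFor(keyHash, 2) + ", after: " + partitionFor(keyHash, 4));
        // With one task per partition, the keys of old partition 1 are now split
        // across two tasks (and two changelog partitions).
        System.out.println("old partition 1 -> " + newPartitionsOf(1, 2, 4, 100));
    }
}
```

Running `main` prints `before: 1, after: 3` and `old partition 1 -> [1, 3]`.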

The goal of this proposal is to enable partition expansion of the input streams.

Motivation

This design doc provides a solution for increasing the number of partitions of the input streams of a stateful Samza job while still ensuring that the job's output is correct. The solution should work for all input systems that satisfy the operational requirements described below: 1) the hash algorithm is implemented in the form hash(key) % partitionNum, and 2) partitionNum is multiplied by a power of two whenever it is increased. The motivations for increasing the number of partitions of a Kafka topic include 1) limiting the maximum size of a partition in order to improve broker performance and 2) increasing the throughput of the Kafka consumer in the Samza container. Other input systems may have different reasons for partition expansion.
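These two requirements guarantee that expansion never moves a key between the residue classes of the original partition count. Write $h$ for hash(key) (assumed non-negative), $N$ for the original partition count, and $N' = 2^k N$ for the expanded count. Then:

```latex
\begin{aligned}
h &= q\,N' + (h \bmod N'), \qquad q = \lfloor h / N' \rfloor \\
(h \bmod N') \bmod N &= (h - q\,2^k N) \bmod N = h \bmod N
\end{aligned}
```

So a key that was in partition $p$ before the expansion lands in one of the partitions $p, p + N, \dots, p + (2^k - 1)N$ afterwards.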

The solution proposed in this doc keeps the number of tasks unchanged and uses a suitable partition-to-task assignment so that the Samza output remains correct after partition expansion. An alternative solution is to let the number of tasks increase after partition expansion and use a suitable task-to-container assignment to keep the output correct. This second solution, which allows task expansion, is needed to scale up the performance of Samza, and it would also allow partition expansion for stateful jobs that don't use join operations. However, it is much more complicated to design and implement than the solution in this doc. It also doesn't enable partition expansion for stateful Samza jobs that use join operations (see the Rejected Alternative section), which this proposal does address. The two solutions therefore don't replace each other and can be designed independently. We plan to use the solution described in this doc to enable partition expansion as low-hanging fruit. Task expansion is out of the scope of this proposal and will be addressed in a future SEP.
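The following is a minimal Java sketch of one partition-to-task assignment that keeps the task count fixed. Each partition p of the expanded stream is assigned to task p % N, where N is the original partition count. This is an illustration under the operational requirements above, not Samza's grouper API: `FixedTaskAssignment`, `taskFor`, and `assign` are hypothetical names, and tasks are plain integers rather than Samza task names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class FixedTaskAssignment {
    // Hypothetical helper: maps a partition of the expanded stream back to one
    // of the original tasks (task i originally consumed partition i).
    static int taskFor(int partition, int originalCount) {
        return partition % originalCount;
    }

    // Groups all partitions of the expanded stream into originalCount tasks.
    static Map<Integer, List<Integer>> assign(int newCount, int originalCount) {
        // Enforce the operational requirement: newCount = originalCount * 2^k.
        if (newCount % originalCount != 0 || Integer.bitCount(newCount / originalCount) != 1) {
            throw new IllegalArgumentException("partition count must grow by a power of two");
        }
        Map<Integer, List<Integer>> tasks = new TreeMap<>();
        for (int p = 0; p < newCount; p++) {
            tasks.computeIfAbsent(taskFor(p, originalCount), t -> new ArrayList<>()).add(p);
        }
        return tasks;
    }

    public static void main(String[] args) {
        // Expanding from 2 to 4 partitions still yields exactly 2 tasks.
        System.out.println(assign(4, 2)); // {0=[0, 2], 1=[1, 3]}
    }
}
```

A key from original partition p can only move to a partition whose id is congruent to p modulo the original count. With this assignment, all of its messages would therefore stay with the same task and, since the task count is unchanged, the same changelog partition.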

 

Proposed Changes

Operational Requirement of the Input System

...