...

The input system used by Samza may need to expand the number of partitions of its input streams for various reasons. For example, Kafka generally needs to limit the maximum size of each partition to maintain broker performance. Thus, if the average byte-in rate or the retention time of a Kafka topic has doubled, the number of partitions of the topic needs to be expanded to keep the per-partition size bounded.

However, Samza currently is not able to handle partition expansion of the input streams for stateful jobs. For example, suppose a user is running a stateful job with GroupByPartition as the grouper and the partition count of the input stream has increased from 2 to 4. For a period of time after the expansion, it is possible (e.g. with Kafka) that messages with a given key exist in both partition 1 and partition 3. Because GroupByPartition assigns partitions 1 and 3 to different tasks, messages with the same key may be handled by different tasks/containers/processes and their state will be stored in different changelog partitions, which violates the assumption Samza relies on for stateful jobs. In particular, partition 3 may be assigned to a new task after the expansion; because that task does not have the previous state to resume from, a key-based counter would wrongfully start from 0 for a key that was reassigned from partition 1 to partition 3, instead of correctly continuing from the count previously held by the task that owned partition 1. The output of the Samza job may therefore be wrong.

The goal of this proposal is to enable partition expansion of the input streams while still allowing stateful jobs in Samza to produce the correct result.

Motivation

This design doc provides a solution to increase the partition count of the input streams of a stateful Samza job while still ensuring the correctness of the Samza job's output. The solution works for all input systems that satisfy the operational requirements described below, i.e. 1) the partitioning algorithm is implemented in the form hash(key) % partitionNum, and 2) partitionNum is only ever multiplied by a power of two when it is increased. The motivation for increasing the partition count of a Kafka topic includes 1) limiting the maximum size of a partition in order to improve broker performance and 2) increasing the throughput of the Kafka consumers in the Samza containers. Other input systems may have different reasons for partition expansion.

The solution proposed here keeps the task count unchanged and uses a proper partition-to-task assignment to make sure the Samza output is correct after partition expansion. An alternative solution is to allow the task count to increase after partition expansion and use a proper task-to-container assignment to keep the output correct. That second solution, which allows task expansion, is ultimately needed to scale up the performance of Samza, and it would also allow partition expansion for stateful jobs that don't use the join operation. However, it is much more complicated to design and implement than the solution proposed in this doc, and it does not enable partition expansion for stateful Samza jobs that use the join operation (see the Rejected Alternatives section), which this proposal does address. Thus the two solutions don't replace each other and can be designed independently. We plan to use the solution described in this doc to enable partition expansion as a low-hanging fruit. Task expansion is out of the scope of this proposal and will be addressed in a future SEP.

...

The partitioning algorithm should be implemented in the form hash(key) % partitionNum. The user is free to choose the hash function, but the hashed result must be taken modulo partitionNum.

Note that this requirement is satisfied by the default partitioning algorithm in Kafka. Samza needs to repartition the user's stream if the user's producer implementation uses a customized partitioning algorithm that doesn't meet this requirement.
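As an illustration only (not part of the proposal), a producer-side partitioner satisfying this requirement could look like the sketch below; the class name and hash function are arbitrary assumptions, but the final modulo by the partition count is the property the proposal relies on:

```java
public class ModuloPartitioner {

  // Any deterministic hash of the key works; String.hashCode() is used here purely as an example.
  private static int hash(String key) {
    return key.hashCode();
  }

  // Required form: hash(key) % partitionNum, with the sign bit cleared so the result is non-negative.
  public static int partition(String key, int partitionNum) {
    return (hash(key) & 0x7fffffff) % partitionNum;
  }

  public static void main(String[] args) {
    System.out.println(partition("member-42", 2)); // partition chosen before expansion (2 partitions)
    System.out.println(partition("member-42", 4)); // partition chosen after expansion (4 partitions)
  }
}
```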

...

The underlying system should ensure that messages with the same key from the same producer are always consumed in the order they are produced before and after the partition expansion. 

Note that Kafka does not currently satisfy this requirement. For example, say the partition count is expanded from 2 to 4. Messages with a given key may be produced to partition 1 before the expansion and to partition 3 after it. Because Kafka consumers do not guarantee message ordering across partitions, it is possible that Samza consumes messages with that key from partition 3 before those from partition 1, which violates the requirement. Thus additional development work is needed in Kafka to meet this requirement. We will provide the link to the KIP in this doc once it is available.

Samza Implementation Change

...

With the proposal in this doc, we should deprecate the existing method group(Set<SystemStreamPartition> ssps) of the interface SystemStreamPartitionGrouper and replace it with the method group(Map<SystemStreamPartition, String> previousSystemStreamPartitionMapping, Set<SystemStreamPartition> ssps). The new method additionally takes the SystemStreamPartition-to-Task assignment from the previous job model, which can be read from the coordinator stream.
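For concreteness, a minimal sketch of the updated interface is shown below. It only restates the two signatures quoted above; the imports and the use of String (rather than TaskName) for the values of the previous assignment follow the method signature given in this doc:

```java
import java.util.Map;
import java.util.Set;

import org.apache.samza.container.TaskName;
import org.apache.samza.system.SystemStreamPartition;

public interface SystemStreamPartitionGrouper {

  // Existing method, to be deprecated by this proposal.
  @Deprecated
  Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> ssps);

  // New method: additionally receives the SystemStreamPartition-to-TaskName assignment from
  // the previous job model (read from the coordinator stream), so that the grouper can keep
  // each key on the task that already holds its state.
  Map<TaskName, Set<SystemStreamPartition>> group(
      Map<SystemStreamPartition, String> previousSystemStreamPartitionMapping,
      Set<SystemStreamPartition> ssps);
}
```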

3) Create class GroupByPartitionWithFixedTaskNum and GroupBySystemStreamPartitionWithFixedTaskNum

We should create two new classes, GroupByPartitionWithFixedTaskNum and GroupBySystemStreamPartitionWithFixedTaskNum, which implement the interface SystemStreamPartitionGrouper. GroupByPartitionWithFixedTaskNum (or GroupBySystemStreamPartitionWithFixedTaskNum) should group system-stream-partitions in the same way as GroupByPartition (or GroupBySystemStreamPartition) if previousSystemStreamPartitionMapping is empty (i.e. the job is run for the first time) or if the partition counts of those streams have not changed since the job was created. Otherwise, GroupByPartitionWithFixedTaskNum should group partitions in such a way that 1) the number of tasks consuming from any given stream does not change before and after the partition expansion; and 2) messages with the same key in the same stream are consumed by the same task before and after the expansion.

More specifically, GroupByPartitionWithFixedTaskNum will map a given SystemStreamPartition ssp to the taskName determined using the following algorithm (a sketch of the resulting implementation is shown after the list):

- If previousSystemStreamPartitionMapping is empty, return GroupByPartition.group(ssps).get(ssp), where ssps represents the entire set of SystemStreamPartition to be grouped.
- Calculate from the previous assignment previousSystemStreamPartitionMapping the total number of tasks that are consuming from partitions of the stream ssp.getStream(). Denote this as taskNumForStream.
- return previousSystemStreamPartitionMapping.get(new SystemStreamPartition(ssp.getSystem(), ssp.getStream(), ssp.getPartition() % taskNumForStream))
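A minimal sketch of this algorithm, under the assumption that the extended interface above is used and that the existing GroupByPartition grouper is available for the first-run case, might look like the following (package and import details are illustrative):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.samza.Partition;
import org.apache.samza.container.TaskName;
import org.apache.samza.system.SystemStreamPartition;

public class GroupByPartitionWithFixedTaskNum implements SystemStreamPartitionGrouper {

  @Override
  @Deprecated
  public Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> ssps) {
    // Without a previous assignment, behave exactly like the existing GroupByPartition grouper.
    return new GroupByPartition().group(ssps);
  }

  @Override
  public Map<TaskName, Set<SystemStreamPartition>> group(
      Map<SystemStreamPartition, String> previousSystemStreamPartitionMapping,
      Set<SystemStreamPartition> ssps) {
    // First run of the job: fall back to the existing GroupByPartition behavior.
    if (previousSystemStreamPartitionMapping.isEmpty()) {
      return new GroupByPartition().group(ssps);
    }

    // Count how many tasks were consuming each stream in the previous job model (taskNumForStream).
    Map<String, Set<String>> tasksPerStream = new HashMap<>();
    for (Map.Entry<SystemStreamPartition, String> entry : previousSystemStreamPartitionMapping.entrySet()) {
      tasksPerStream.computeIfAbsent(entry.getKey().getStream(), s -> new HashSet<>()).add(entry.getValue());
    }

    Map<TaskName, Set<SystemStreamPartition>> groups = new HashMap<>();
    for (SystemStreamPartition ssp : ssps) {
      int taskNumForStream = tasksPerStream.get(ssp.getStream()).size();
      // Map the (possibly expanded) partition back to the partition that owned its keys before
      // the expansion, and reuse that partition's previous task.
      SystemStreamPartition previousOwner = new SystemStreamPartition(
          ssp.getSystem(), ssp.getStream(),
          new Partition(ssp.getPartition().getPartitionId() % taskNumForStream));
      TaskName taskName = new TaskName(previousSystemStreamPartitionMapping.get(previousOwner));
      groups.computeIfAbsent(taskName, t -> new HashSet<>()).add(ssp);
    }
    return groups;
  }
}
```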

Similarly, GroupBySystemStreamPartitionWithFixedTaskNum will map a given SystemStreamPartition ssp to the taskName determined using the following algorithm:

...

Stateful Samza jobs that are using GroupByPartition (or GroupBySystemStreamPartition) as the grouper class should be configured to use GroupByPartitionWithFixedTaskNum (or GroupBySystemStreamPartitionWithFixedTaskNum) in order to allow partition expansion. Note that GroupByPartitionWithFixedTaskNum (or GroupBySystemStreamPartitionWithFixedTaskNum) is backward compatible with GroupByPartition (or GroupBySystemStreamPartition) because they return the same partition-to-task assignment if the partitions don't expand. Thus the user's job should not need to bootstrap its key/value store from the changelog topic.
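As a hypothetical example of this configuration change (the "WithFixedTaskNum" factory class name below is an illustrative assumption, not a name defined by this proposal), a job currently using the default grouper would switch its grouper factory roughly as follows:

```properties
# Before: Samza's default grouper factory.
job.systemstreampartition.grouper.factory=org.apache.samza.container.grouper.stream.GroupByPartitionFactory

# After: a factory for the expansion-aware grouper proposed in this doc
# (the exact factory class name is an assumption for illustration).
job.systemstreampartition.grouper.factory=org.apache.samza.container.grouper.stream.GroupByPartitionWithFixedTaskNumFactory
```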

...

For example, suppose the partition count is increased from 2 to 4 and we use GroupByPartitionWithFixedTaskNum as the grouper: partitions 0 and 2 should be mapped to the same task, and partitions 1 and 3 should be mapped to the same task. The figure below shows the relation between partitions, buckets and tasks after we increase the partition count from 2 to 4.
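To see why this mapping keeps each key on the task that holds its state, consider as an illustration a key whose hash value is 7 (an arbitrary number, not from the original example). Before the expansion the key is produced to partition 7 % 2 = 1 and handled by the task owning partition 1; after the expansion it is produced to partition 7 % 4 = 3, which GroupByPartitionWithFixedTaskNum maps back to task 3 % 2 = 1. In general, (hash(key) % 4) % 2 == hash(key) % 2 whenever the new partition count is a multiple of the old one, which the power-of-two expansion requirement guarantees.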

...

2) Add method Map<TaskName, Set<SystemStreamPartition>> group(Map<SystemStreamPartition, String> previousSystemStreamPartitionMapping, Set<SystemStreamPartition> ssps) in the interface SystemStreamPartitionGrouper.

3) Add class GroupByPartitionWithFixedTaskNum which implements the interface SystemStreamPartitionGrouper

4) Add class GroupBySystemStreamPartitionWithFixedTaskNum which implements the interface SystemStreamPartitionGrouper

...

- Deprecate the method Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> ssps) of the interface SystemStreamPartitionGrouper.
- Users need to implement the newly-added method if they have a custom implementation of the interface SystemStreamPartitionGrouper.
- Users need to use the newly-added GroupByPartitionWithFixedTaskNum (or GroupBySystemStreamPartitionWithFixedTaskNum) as the grouper class in order to deal with possible partition-count changes of their input streams.

Rejected Alternatives

1. Allow task number to increase instead of creating a new grouper class.

...