...
We will also create class DefaultPartitionExpansionAlgorithm that implements this interface. Given the current partition index currentPartitionIdx and the initial partition count initialPartitionCount, this class returns currentPartitionIdx % initialPartitionCount as the index of the corresponding partition before ANY partition expansion.
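As a minimal sketch of the modulo rule described above: the interface definition is not shown in this part of the page, so the three-argument signature below is an assumption inferred from how getPartitionBeforeExpansion is called later in this proposal. The wrapper class name PartitionExpansionSketch is hypothetical.

```java
public class PartitionExpansionSketch {

    /** Assumed shape of the interface; the actual definition may differ. */
    public interface InputStreamPartitionExpansionAlgorithm {
        int getPartitionBeforeExpansion(int currentPartitionIdx,
                                        int currentPartitionCount,
                                        int initialPartitionCount);
    }

    /** Default rule from the text: currentPartitionIdx % initialPartitionCount. */
    public static class DefaultPartitionExpansionAlgorithm
            implements InputStreamPartitionExpansionAlgorithm {
        @Override
        public int getPartitionBeforeExpansion(int currentPartitionIdx,
                                               int currentPartitionCount,
                                               int initialPartitionCount) {
            // currentPartitionCount is not needed by the default rule; it is kept
            // in the signature so other algorithms could make use of it.
            return currentPartitionIdx % initialPartitionCount;
        }
    }

    public static void main(String[] args) {
        InputStreamPartitionExpansionAlgorithm algorithm =
                new DefaultPartitionExpansionAlgorithm();
        // A stream expanded from 2 to 4 partitions.
        for (int idx = 0; idx < 4; idx++) {
            System.out.println(idx + " -> " + algorithm.getPartitionBeforeExpansion(idx, 4, 2));
        }
    }
}
```

With 2 initial partitions, indices 2 and 3 map back to 0 and 1 respectively.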
3) Add
...
interface SystemStreamPartitionGrouperFixedTasks
The new interface should extend the existing interface SystemStreamPartitionGrouper. It should include a new method group(Map<SystemStreamPartition, String> previousSystemStreamPartitionMapping, Set<SystemStreamPartition> ssps). The new method takes the SystemStreamPartition-to-Task assignment from the previous job model, which can be read from the coordinator stream.
4) Create
...
class GroupByPartitionFixedTasks and
...
GroupBySystemStreamPartitionFixedTasks
We should create two new classes, GroupByPartitionFixedTasks and GroupBySystemStreamPartitionFixedTasks, which implement the interface SystemStreamPartitionGrouperFixedTasks. GroupByPartitionFixedTasks (or GroupBySystemStreamPartitionFixedTasks) should group system-stream-partitions in the same way as GroupByPartition (or GroupBySystemStreamPartition) if previousSystemStreamPartitionMapping is empty (i.e. the job is run for the first time) or if the partition count of those streams has not changed since the job was created. Otherwise, GroupByPartitionFixedTasks should group partitions in such a way that 1) the number of tasks consuming from any given stream does not change before and after the partition expansion; and 2) messages with the same key in the same stream will be consumed by the same task before and after the expansion.
More specifically, GroupByPartitionFixedTasks will map a given SystemStreamPartition ssp to the taskName which is determined using the following algorithm:
- If previousSystemStreamPartitionMapping is empty, return GroupByPartition.group(ssps).get(ssp), where ssps represents the entire set of SystemStreamPartition to be grouped.
- Calculate from the previous assignment previousSystemStreamPartitionMapping the total number of tasks that are consuming from partitions of the stream ssp.getStream(). Denote this as taskNumForStream.
- Determine the partition corresponding to the ssp before ANY partition expansion using this: previousPartitionIdx = InputStreamPartitionExpansionAlgorithm.getPartitionBeforeExpansion(ssp.getPartition(), currentPartitionCountForStream, taskNumForStream)
- Return previousSystemStreamPartitionMapping.get(new SystemStreamPartition(ssp.getSystem(), ssp.getStream(), previousPartitionIdx))
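The steps above can be sketched in self-contained Java as follows. This is an illustration under stated assumptions, not the proposed implementation: Ssp is a hypothetical stand-in for SystemStreamPartition, defaultTaskName stands in for GroupByPartition's naming (assumed here to be "Partition N"), and the default modulo rule stands in for the InputStreamPartitionExpansionAlgorithm call. The fallback for a stream that is absent from the previous mapping is also an assumption, since the text does not cover that case.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

public class FixedTasksGrouperSketch {

    /** Hypothetical stand-in for Samza's SystemStreamPartition. */
    public static final class Ssp {
        final String system;
        final String stream;
        final int partition;

        public Ssp(String system, String stream, int partition) {
            this.system = system;
            this.stream = stream;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Ssp)) {
                return false;
            }
            Ssp other = (Ssp) o;
            return system.equals(other.system)
                    && stream.equals(other.stream)
                    && partition == other.partition;
        }

        @Override
        public int hashCode() {
            return Objects.hash(system, stream, partition);
        }
    }

    /** Stand-in for the task name GroupByPartition would assign (assumed format). */
    static String defaultTaskName(Ssp ssp) {
        return "Partition " + ssp.partition;
    }

    public static String taskNameFor(Ssp ssp, Map<Ssp, String> previousMapping) {
        // Step 1: first run of the job, so behave like GroupByPartition.
        if (previousMapping.isEmpty()) {
            return defaultTaskName(ssp);
        }
        // Step 2: count the distinct tasks that consumed this stream previously.
        Set<String> tasksForStream = new HashSet<>();
        for (Map.Entry<Ssp, String> entry : previousMapping.entrySet()) {
            Ssp previous = entry.getKey();
            if (previous.system.equals(ssp.system) && previous.stream.equals(ssp.stream)) {
                tasksForStream.add(entry.getValue());
            }
        }
        int taskNumForStream = tasksForStream.size();
        if (taskNumForStream == 0) {
            // Assumption: a stream unseen in the previous mapping gets default grouping.
            return defaultTaskName(ssp);
        }
        // Step 3: partition index before ANY expansion (default modulo rule).
        int previousPartitionIdx = ssp.partition % taskNumForStream;
        // Step 4: reuse the task that owned that original partition.
        return previousMapping.get(new Ssp(ssp.system, ssp.stream, previousPartitionIdx));
    }

    public static void main(String[] args) {
        Map<Ssp, String> previous = new HashMap<>();
        previous.put(new Ssp("kafka", "orders", 0), "Partition 0");
        previous.put(new Ssp("kafka", "orders", 1), "Partition 1");
        for (int p = 0; p < 4; p++) {
            System.out.println(p + " -> " + taskNameFor(new Ssp("kafka", "orders", p), previous));
        }
    }
}
```

Because step 2 counts distinct tasks rather than partitions, taskNumForStream stays at the original partition count across repeated expansions, so a second expansion still resolves to the original task.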
Similarly, GroupBySystemStreamPartitionFixedTasks will map a given SystemStreamPartition ssp to the taskName which is determined using the following algorithm:
...
A stateful Samza job that uses GroupByPartition (or GroupBySystemStreamPartition) as its grouper class should be configured to use GroupByPartitionFixedTasks (or GroupBySystemStreamPartitionFixedTasks) in order to allow partition expansion. Note that GroupByPartitionFixedTasks (or GroupBySystemStreamPartitionFixedTasks) is backward compatible with GroupByPartition (or GroupBySystemStreamPartition) because they return the same partition-to-task assignment if the partition count does not expand. Thus the user's job should not need to bootstrap its key/value store from the changelog topic.
...
For example, suppose the partition count is increased from 2 to 4 and we use GroupByPartitionFixedTasks as the grouper. Partitions 0 and 2 should then be mapped to the same task, and partitions 1 and 3 should be mapped to the same task. The figure below shows the relation between partitions, buckets and tasks after we increase the partition count from 2 to 4.
...
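To make the 2-to-4 expansion example concrete, the following sketch groups the current partition indices by their pre-expansion index, assuming the default modulo rule. The class and method names (ExpansionExample, partitionsByOriginal) are hypothetical and used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ExpansionExample {

    /** Groups partition indices 0..currentPartitionCount-1 by their original partition index. */
    public static Map<Integer, List<Integer>> partitionsByOriginal(int currentPartitionCount,
                                                                   int initialPartitionCount) {
        Map<Integer, List<Integer>> groups = new TreeMap<>();
        for (int p = 0; p < currentPartitionCount; p++) {
            // Default rule: partition p descends from partition p % initialPartitionCount.
            groups.computeIfAbsent(p % initialPartitionCount, k -> new ArrayList<>()).add(p);
        }
        return groups;
    }

    public static void main(String[] args) {
        // Each key is an original partition, and therefore one task under GroupByPartitionFixedTasks.
        System.out.println(partitionsByOriginal(4, 2)); // prints {0=[0, 2], 1=[1, 3]}
    }
}
```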
Public Interfaces
1) Add interface SystemStreamPartitionGrouperFixedTasks with the following definition:
Code Block | ||
---|---|---|
| ||
public interface SystemStreamPartitionGrouperFixedTasks extends SystemStreamPartitionGrouper { Map<TaskName, Set<SystemStreamPartition>> group(Map<SystemStreamPartition, String> previousSystemStreamPartitionMapping, Set<SystemStreamPartition> ssps); } |
3) Add class GroupByPartitionFixedTasks which implements the interface SystemStreamPartitionGrouperFixedTasks
4) Add class GroupBySystemStreamPartitionFixedTasks which implements the interface SystemStreamPartitionGrouperFixedTasks
5) Add interface InputStreamPartitionExpansionAlgorithm with the following definition:
...
- The change made in this proposal is both source backward-compatible and binary backward-compatible. Existing users' code can compile and run correctly without change.
- Users who want to enable partition expansion for their input streams can do the following:
  - Set the grouper class to GroupByPartitionFixedTasks if the job is using GroupByPartition as its grouper class
  - Set the grouper class to GroupBySystemStreamPartitionFixedTasks if the job is using GroupBySystemStreamPartition as its grouper class
  - Change the custom grouper class implementation to extend the new interface if the job is using a custom grouper class implementation
  - Set job.coordinator.monitor-partition-change to true in the job configuration
  - Run ConfigManager
...