...
Create classes SystemStreamPartitionAssignmentManager and SetSSPTaskMapping to read and write the SystemStreamPartition-to-Task assignment in the coordinator stream. This will be done similarly to how ChangelogPartitionManager and SetChangelogMapping are used to read and write the Task-to-ChangeLogPartition assignment in the coordinator stream. The assignment should be written to the coordinator stream every time the job model is initialized.
2) Change
...
interface SystemStreamPartitionGrouper
With the proposal in this doc, we should deprecate the existing method group(Set<SystemStreamPartition> ssps) of the interface SystemStreamPartitionGrouper and replace it with the method group(Map<SystemStreamPartition, String> previousSystemStreamPartitionMapping, Set<SystemStreamPartition> ssps). The new method takes the SystemStreamPartition-to-Task assignment from the previous job model, which can be read from the coordinator stream.
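The interface change described above might look roughly like the sketch below. The Ssp record is a hypothetical stand-in for Samza's SystemStreamPartition so that the snippet compiles on its own, the interface name carries a Sketch suffix to mark it as illustrative, and the return type (task name to set of partitions) is an assumption, since this section only specifies the parameters.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for Samza's SystemStreamPartition (not the real class).
record Ssp(String system, String stream, int partitionId) {}

// Illustrative shape of the grouper interface after the proposed change.
interface SystemStreamPartitionGrouperSketch {
  // Existing method: kept so current groupers still compile, but marked deprecated.
  // The return type (task name -> assigned partitions) is an assumption.
  @Deprecated
  Map<String, Set<Ssp>> group(Set<Ssp> ssps);

  // Proposed method: additionally receives the SSP-to-task assignment of the
  // previous job model, as read back from the coordinator stream.
  Map<String, Set<Ssp>> group(Map<Ssp, String> previousSystemStreamPartitionMapping, Set<Ssp> ssps);
}
```

An existing grouper could satisfy the new method by ignoring the previous mapping and delegating to the old one, which keeps its behaviour unchanged.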
3) Create classes GroupByPartitionFixedTaskNum and GroupBySystemStreamPartitionFixedTaskNum
We should create two new classes, GroupByPartitionFixedTaskNum and GroupBySystemStreamPartitionFixedTaskNum, which implement the interface SystemStreamPartitionGrouper. GroupByPartitionFixedTaskNum (or GroupBySystemStreamPartitionFixedTaskNum) should group system-stream-partitions in the same way as GroupByPartition (or GroupBySystemStreamPartition) if previousSystemStreamPartitionMapping is empty (i.e. the job is run for the first time) or if the number of partitions of those streams has not changed since the job was created. Otherwise, in the case where the number of partitions of some stream has expanded since the job was created, GroupByPartitionFixedTaskNum should group partitions in such a way that 1) the number of tasks consuming from any given stream does not change before and after the partition expansion; and 2) messages with the same key in the same stream will be consumed by the same task before and after the expansion.
More specifically, GroupByPartitionFixedTaskNum.group(...) will map a given SystemStreamPartition ssp to the taskName determined by the following algorithm:
- If previousSystemStreamPartitionMapping is empty, return GroupByPartition.group(ssps).get(ssp), where ssps represents the entire set of SystemStreamPartition to be grouped.
- Calculate from the previous assignment previousSystemStreamPartitionMapping the total number of tasks that are consuming from partitions of the stream ssp.getStream(). Denote this as taskNumForStream.
- Return previousSystemStreamPartitionMapping.get(new SystemStreamPartition(ssp.getSystem(), ssp.getStream(), new Partition(ssp.getPartition().getPartitionId() % taskNumForStream))).
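The three steps above can be sketched as follows. This is a minimal illustration rather than the actual grouper: Ssp is a hypothetical stand-in for SystemStreamPartition, firstRunTaskName imitates the "Partition %d" naming of GroupByPartition, and the fallback for a stream that is absent from the previous mapping is an assumption the text does not cover.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for Samza's SystemStreamPartition (not the real class).
record Ssp(String system, String stream, int partitionId) {}

final class FixedTaskNumSketch {
  // Imitates GroupByPartition naming: one task per partition id.
  static String firstRunTaskName(Ssp ssp) {
    return String.format("Partition %d", ssp.partitionId());
  }

  // Maps a single SSP to a task name following the three steps in the text.
  static String taskNameFor(Map<Ssp, String> previousMapping, Ssp ssp) {
    // Step 1: no previous assignment, so use the plain per-partition grouping.
    if (previousMapping.isEmpty()) {
      return firstRunTaskName(ssp);
    }
    // Step 2: count the distinct tasks that previously consumed this stream.
    Set<String> tasksForStream = new HashSet<>();
    for (Map.Entry<Ssp, String> entry : previousMapping.entrySet()) {
      Ssp previous = entry.getKey();
      if (previous.system().equals(ssp.system()) && previous.stream().equals(ssp.stream())) {
        tasksForStream.add(entry.getValue());
      }
    }
    int taskNumForStream = tasksForStream.size();
    // Assumption: a stream unseen in the previous mapping is treated like a first run.
    if (taskNumForStream == 0) {
      return firstRunTaskName(ssp);
    }
    // Step 3: reuse the task of the original partition this partition folds onto.
    Ssp original = new Ssp(ssp.system(), ssp.stream(), ssp.partitionId() % taskNumForStream);
    return previousMapping.get(original);
  }

  public static void main(String[] args) {
    // Previous job model: a stream with 2 partitions, one task each.
    Map<Ssp, String> previous = new HashMap<>();
    previous.put(new Ssp("kafka", "clicks", 0), "Partition 0");
    previous.put(new Ssp("kafka", "clicks", 1), "Partition 1");
    // After expansion to 4 partitions, partitions 2 and 3 fold onto the existing tasks.
    for (int p = 0; p < 4; p++) {
      System.out.println(p + " -> " + taskNameFor(previous, new Ssp("kafka", "clicks", p)));
    }
  }
}
```

The stream and system names in main are made up for the demonstration; only the modulo lookup reflects the algorithm described in the text.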
Similarly, GroupBySystemStreamPartitionFixedTaskNum will map a given SystemStreamPartition ssp to the taskName determined by the following algorithm:
- If previousSystemStreamPartitionMapping is empty, return GroupBySystemStreamPartition.group(ssps).get(ssp), where ssps represents the entire set of SystemStreamPartition to be grouped.
- Calculate from the previous assignment previousSystemStreamPartitionMapping the total number of tasks that are consuming from partitions of the stream ssp.getStream(). Denote this as taskNumForStream.
- Return previousSystemStreamPartitionMapping.get(new SystemStreamPartition(ssp.getSystem(), ssp.getStream(), new Partition(ssp.getPartition().getPartitionId() % taskNumForStream))).
For GroupByPartitionFixedTaskNum, the same task name can also be derived directly from the previous assignment:
- Define previousSSPSetOfStream to be the subset of previousSystemStreamPartitionMapping.keys() whose stream equals ssp.getStream().
- Define previousTaskSetOfStream to be the subset of previousSystemStreamPartitionMapping.values() that are mapped to by any SSP in previousSSPSetOfStream. This is the set of tasks that were consuming from the given stream. The same set of tasks should share partitions of this stream before and after partition expansion. Note that previousTaskSetOfStream.size() should equal the number of partitions of the stream when the job was run for the first time.
- Return taskName = String.format("Partition %d", ssp.getPartition().getPartitionId() % previousTaskSetOfStream.size()). Note that the resulting task names should be the same as those returned by GroupByPartition.group(...) if the number of partitions of the input streams has not changed.
A stateful Samza job that uses GroupByPartition (or GroupBySystemStreamPartition) as its grouper class should be configured to use GroupByPartitionFixedTaskNum (or GroupBySystemStreamPartitionFixedTaskNum) in order to allow partition expansion. Note that GroupByPartitionFixedTaskNum (or GroupBySystemStreamPartitionFixedTaskNum) is backward compatible with GroupByPartition (or GroupBySystemStreamPartition) because they return the same partition-to-task assignment if the number of partitions doesn't expand. Thus the user's job should not need to bootstrap its key/value store from the changelog topic.
To help understand this algorithm, the idea is to split partitions into disjoint buckets (or groups) of partitions whose union equals the original set of partitions. The partition-to-bucket assignment ensures that messages with the same key will be produced to partitions of the same bucket. Then partitions in the same bucket will be assigned to the same task to ensure that messages with the same key will go to the same task.
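The bucket property can be checked with a small sketch. It rests on two assumptions that are not stated in this section: the producer picks a partition as hash(key) modulo the partition count, and the partition count grows by an integer multiple. Under those assumptions, a partition's bucket is its id modulo the original partition count, and a key stays in the same bucket after expansion.

```java
final class BucketSketch {
  // Assumed producer-side partitioner: non-negative hash modulo the partition count.
  static int partitionFor(String key, int partitionCount) {
    return Math.floorMod(key.hashCode(), partitionCount);
  }

  // Bucket of a partition: the original partition id it folds onto.
  static int bucketFor(int partition, int originalPartitionCount) {
    return partition % originalPartitionCount;
  }

  // True if the key lands in the same bucket before and after the expansion.
  static boolean sameBucket(String key, int originalCount, int expandedCount) {
    int before = bucketFor(partitionFor(key, originalCount), originalCount);
    int after = bucketFor(partitionFor(key, expandedCount), originalCount);
    return before == after;
  }

  public static void main(String[] args) {
    // Expanding 2 -> 4 keeps every key in its bucket; 2 -> 3 is not guaranteed to.
    for (String key : new String[] {"a", "b", "c"}) {
      System.out.println(key + ": 2->4 " + sameBucket(key, 2, 4) + ", 2->3 " + sameBucket(key, 2, 3));
    }
  }
}
```

The property follows from modular arithmetic: when the expanded count is a multiple of the original count, (h mod expanded) mod original equals h mod original for any hash h. When it is not a multiple, some keys move to a different bucket, as the key "c" does in the 2 to 3 case.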
For example, suppose the number of partitions is increased from 2 to 4 and we use GroupByPartitionFixedTaskNum as the grouper. Then partitions 0 and 2 should be mapped to the same task and partitions 1 and 3 should be mapped to the same task. The figure below shows the relation between partitions, buckets and tasks after we increase the number of partitions from 2 to 4.
...
3) Add class GroupByPartitionFixedTaskNum which implements the interface SystemStreamPartitionGrouper
4) Add class GroupBySystemStreamPartitionFixedTaskNum which implements the interface SystemStreamPartitionGrouper
...