...
Create classes SystemStreamPartitionAssignmentManager and SetSSPTaskMapping to read and write the SystemStreamPartition-to-Task assignment in the coordinator stream. This will be done similarly to how ChangelogPartitionManager and SetChangelogMapping are used to read and write the Task-to-ChangeLogPartition assignment in the coordinator stream. The assignment should be written to the coordinator stream every time the job model is initialized.
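The read/write path can be sketched as follows. This is a minimal illustration under stated assumptions, not the proposed implementation: InMemoryCoordinatorStream, the message type string, and the "system.stream.partition" string encoding of SSPs are all hypothetical stand-ins for the real coordinator stream and SystemStreamPartition. The sketch assumes that replaying the stream applies the latest message for each SSP.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for the coordinator stream: an append-only log
// in which the latest message for a given key wins when the log is replayed.
final class InMemoryCoordinatorStream {
  static final class Message {
    final String type;
    final String key;
    final String value;

    Message(String type, String key, String value) {
      this.type = type;
      this.key = key;
      this.value = value;
    }
  }

  private final List<Message> log = new ArrayList<>();

  void send(Message message) {
    log.add(message);
  }

  List<Message> readAll() {
    return Collections.unmodifiableList(log);
  }
}

// Sketch of SystemStreamPartitionAssignmentManager; SSPs are encoded as
// "system.stream.partition" strings to keep the example self-contained.
final class SystemStreamPartitionAssignmentManager {
  // Hypothetical message type playing the role of SetSSPTaskMapping.
  static final String SET_SSP_TASK_MAPPING = "set-ssp-task-mapping";

  private final InMemoryCoordinatorStream stream;

  SystemStreamPartitionAssignmentManager(InMemoryCoordinatorStream stream) {
    this.stream = stream;
  }

  // Writes one message per SSP; intended to run each time the job model is initialized.
  void writeAssignment(Map<String, String> sspToTask) {
    for (Map.Entry<String, String> e : sspToTask.entrySet()) {
      stream.send(new InMemoryCoordinatorStream.Message(SET_SSP_TASK_MAPPING, e.getKey(), e.getValue()));
    }
  }

  // Replays the log; a later message for the same SSP overwrites an earlier one.
  Map<String, String> readAssignment() {
    Map<String, String> assignment = new HashMap<>();
    for (InMemoryCoordinatorStream.Message m : stream.readAll()) {
      if (SET_SSP_TASK_MAPPING.equals(m.type)) {
        assignment.put(m.key, m.value);
      }
    }
    return assignment;
  }
}
```

With latest-wins replay, an SSP that disappears from the job model would keep its last mapping unless an explicit deletion message were also written; this sketch omits that case.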
2)
...
Add interface SystemStreamPartitionGrouperWithFixedTaskNum. The new interface should extend the existing interface SystemStreamPartitionGrouper and include a new method group(Map<SystemStreamPartition, String> previousSystemStreamPartitionMapping, Set<SystemStreamPartition> ssps). With the proposal in this doc, we should deprecate the existing method group(Set<SystemStreamPartition> ssps) of the interface SystemStreamPartitionGrouper and replace it with this new method. The new method takes the SystemStreamPartition-to-Task assignment from the previous job model, which can be read from the coordinator stream.
...
We should create two new classes, GroupByPartitionWithFixedTaskNum and GroupBySystemStreamPartitionWithFixedTaskNum, which implement the interface SystemStreamPartitionGrouperWithFixedTaskNum. GroupByPartitionWithFixedTaskNum (or GroupBySystemStreamPartitionWithFixedTaskNum) should group system-stream-partitions in the same way as GroupByPartition (or GroupBySystemStreamPartition) if previousSystemStreamPartitionMapping is empty (i.e. the job is run for the first time) or if the partition count of those streams has not changed since the job was created. Otherwise, both classes should group partitions in such a way that 1) the number of tasks consuming from any given stream does not change before and after the partition expansion; and 2) messages with the same key in the same stream will be consumed by the same task before and after the expansion.
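The expanded-partition branch can be sketched as follows, assuming (as with Kafka's default hash partitioning of keyed messages) that a message's partition is hash(key) % partitionCount, and that a stream's new partition count M is a multiple of its old count N. Under those assumptions hash % N == (hash % M) % N, so new partition p can inherit the task of old partition p % N, which keeps both the task count and the key-to-task routing unchanged. Ssp and FixedTaskNumGrouping are hypothetical simplifications, not Samza classes, and the first-run case (delegating to GroupByPartition or GroupBySystemStreamPartition) is not shown.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Simplified stand-in for Samza's SystemStreamPartition so the sketch compiles on its own.
final class Ssp {
  final String system;
  final String stream;
  final int partition;

  Ssp(String system, String stream, int partition) {
    this.system = system;
    this.stream = stream;
    this.partition = partition;
  }

  String systemStream() {
    return system + "." + stream;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof Ssp)) return false;
    Ssp other = (Ssp) o;
    return partition == other.partition && system.equals(other.system) && stream.equals(other.stream);
  }

  @Override
  public int hashCode() {
    return Objects.hash(system, stream, partition);
  }
}

// Hypothetical helper for the "partitions expanded" branch of the proposed groupers.
final class FixedTaskNumGrouping {
  // previous: SSP -> task name from the previous job model.
  // Assumes each stream's old partitions were 0..N-1 and its new count is a multiple of N.
  static Map<String, Set<Ssp>> group(Map<Ssp, String> previous, Set<Ssp> current) {
    Map<String, Integer> previousCounts = new HashMap<>();
    for (Ssp ssp : previous.keySet()) {
      previousCounts.merge(ssp.systemStream(), 1, Integer::sum);
    }
    Map<String, Set<Ssp>> tasks = new HashMap<>();
    for (Ssp ssp : current) {
      Integer oldCount = previousCounts.get(ssp.systemStream());
      if (oldCount == null) {
        // Streams absent from the previous job model are out of scope for this sketch.
        throw new IllegalArgumentException("No previous assignment for " + ssp.systemStream());
      }
      // Partition p of the expanded stream inherits the task of old partition p % N.
      Ssp origin = new Ssp(ssp.system, ssp.stream, ssp.partition % oldCount);
      tasks.computeIfAbsent(previous.get(origin), t -> new HashSet<>()).add(ssp);
    }
    return tasks;
  }
}
```

Because the previous mapping already encodes each grouper's own assignment, the same inheritance rule can serve both GroupByPartitionWithFixedTaskNum and GroupBySystemStreamPartitionWithFixedTaskNum in this sketch. When a stream's partition count is unchanged, p % N == p, so the previous assignment is reproduced exactly.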
...
In the current Samza implementation, JobCoordinator already monitors input streams for partition expansion, and it already has logic to restart containers in case of container failure. All we need to do is let JobCoordinator re-calculate JobModel and restart containers when partition expansion is detected. When JobCoordinator detects partition expansion of any input stream, it should shut down all containers using the off-the-shelf YARN API, re-calculate JobModel, and restart the containers using the new JobModel.
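The restart flow can be sketched as below. ContainerController, PartitionExpansionHandler, and the partition-count Supplier are hypothetical stand-ins for the YARN container calls and the stream metadata lookup that JobCoordinator already performs; re-calculating JobModel is reduced to incrementing a version number.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical hook standing in for the YARN container lifecycle calls.
interface ContainerController {
  void shutdownAllContainers();

  void startContainers(int jobModelVersion);
}

// Sketch of the flow: detect expansion, stop all containers, recompute the model, restart.
final class PartitionExpansionHandler {
  private final Supplier<Map<String, Integer>> partitionCounts; // stream -> partition count (assumed lookup)
  private final ContainerController containers;
  private Map<String, Integer> knownCounts;
  private int jobModelVersion = 0;

  PartitionExpansionHandler(Supplier<Map<String, Integer>> partitionCounts, ContainerController containers) {
    this.partitionCounts = partitionCounts;
    this.containers = containers;
    this.knownCounts = new HashMap<>(partitionCounts.get());
  }

  // Meant to be invoked periodically by the existing monitor; returns true if containers restarted.
  boolean checkOnce() {
    Map<String, Integer> latest = partitionCounts.get();
    boolean expanded = false;
    for (Map.Entry<String, Integer> e : latest.entrySet()) {
      Integer previous = knownCounts.get(e.getKey());
      if (previous != null && e.getValue() > previous) {
        expanded = true;
      }
    }
    if (!expanded) {
      return false;
    }
    containers.shutdownAllContainers();
    knownCounts = new HashMap<>(latest);
    // Placeholder for re-calculating JobModel with a fixed-task-num grouper.
    jobModelVersion++;
    containers.startContainers(jobModelVersion);
    return true;
  }
}
```

Shutting down every container before restarting, rather than patching running containers, keeps the sketch aligned with the proposal's choice to reuse the existing restart logic.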
Public Interfaces
1) Deprecate the method Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> ssps) of the interface SystemStreamPartitionGrouper
2) Add interface SystemStreamPartitionGrouperWithFixedTaskNum with the following definition:
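```java
import java.util.Map;
import java.util.Set;
import org.apache.samza.container.TaskName;
import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper;
import org.apache.samza.system.SystemStreamPartition;

public interface SystemStreamPartitionGrouperWithFixedTaskNum extends SystemStreamPartitionGrouper {
  // Groups ssps into tasks using the SSP-to-task assignment of the previous job model
  // (read from the coordinator stream), so the number of tasks stays fixed after partition expansion.
  Map<TaskName, Set<SystemStreamPartition>> group(
      Map<SystemStreamPartition, String> previousSystemStreamPartitionMapping,
      Set<SystemStreamPartition> ssps);
}
```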
3) Add class GroupByPartitionWithFixedTaskNum, which implements the interface SystemStreamPartitionGrouperWithFixedTaskNum
4) Add class GroupBySystemStreamPartitionWithFixedTaskNum, which implements the interface SystemStreamPartitionGrouperWithFixedTaskNum
Test Plan
To be added
Compatibility, Deprecation, and Migration Plan
- The change made in this proposal is both source and binary backward-compatible: existing user code compiles and runs correctly without change.
- Users who want to enable partition expansion for their input streams can do the following:
  - Set the grouper class to GroupByPartitionWithFixedTaskNum if the job uses GroupByPartition as its grouper class.
  - Set the grouper class to GroupBySystemStreamPartitionWithFixedTaskNum if the job uses GroupBySystemStreamPartition as its grouper class.
  - Change their custom grouper class to implement the new interface SystemStreamPartitionGrouperWithFixedTaskNum if the job uses a custom implementation of the interface SystemStreamPartitionGrouper. Such users need to implement the newly-added method.
- The method Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> ssps) of the interface SystemStreamPartitionGrouper is deprecated.
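For jobs using the built-in groupers, the switch can be sketched as a config rewrite. The key job.systemstreampartition.grouper.factory and the factories GroupByPartitionFactory and GroupBySystemStreamPartitionFactory are Samza's existing grouper configuration; the ...WithFixedTaskNumFactory class names are hypothetical, since this proposal names only the grouper classes, not their factories.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: swap a built-in grouper factory for its (hypothetical) fixed-task-num counterpart.
final class GrouperMigration {
  static final String GROUPER_FACTORY_KEY = "job.systemstreampartition.grouper.factory";
  private static final String PKG = "org.apache.samza.container.grouper.stream.";

  // Existing factory -> hypothetical factory for the proposed grouper (not defined by this SEP).
  private static final Map<String, String> REPLACEMENTS = Map.of(
      PKG + "GroupByPartitionFactory", PKG + "GroupByPartitionWithFixedTaskNumFactory",
      PKG + "GroupBySystemStreamPartitionFactory", PKG + "GroupBySystemStreamPartitionWithFixedTaskNumFactory");

  // Returns an updated copy. Custom groupers and a missing key are left unchanged,
  // because custom groupers must be ported to the new interface by hand.
  static Map<String, String> enablePartitionExpansion(Map<String, String> config) {
    Map<String, String> updated = new HashMap<>(config);
    String current = config.get(GROUPER_FACTORY_KEY);
    String replacement = current == null ? null : REPLACEMENTS.get(current);
    if (replacement != null) {
      updated.put(GROUPER_FACTORY_KEY, replacement);
    }
    return updated;
  }
}
```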
Rejected Alternatives
1. Allow task number to increase instead of creating a new grouper class.
Allowing the task number to increase is useful since it can improve the performance of a given Samza job. However, this feature alone does not solve the problem of allowing partition expansion. For example, say we have a job that joins two streams, both of which have 3 partitions. If the partition count of one stream increases from 3 to 6, we would still want the task number to remain 3 to make sure that messages with the same key from both streams will be handled by the same task. This needs to be done with the new grouper classes proposed in this doc.
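The arithmetic behind this join example can be checked with a small sketch, assuming keys are partitioned by hash % partitionCount and that a task handles partition p % taskCount of every stream (with 6 tasks this reduces to one task per partition, as with GroupByPartition). JoinColocation is a hypothetical helper, not part of the proposal.

```java
// Hypothetical helper illustrating co-location of join keys across two streams.
final class JoinColocation {
  // partition = hash % partitionCount (assumed producer behavior), then task = partition % taskCount.
  static int taskFor(int keyHash, int partitionCount, int taskCount) {
    int partition = Math.floorMod(keyHash, partitionCount);
    return partition % taskCount;
  }

  // Counts key hashes in [0, n) whose messages from the two streams land in different tasks.
  static int mismatches(int n, int partitionsA, int partitionsB, int taskCount) {
    int count = 0;
    for (int h = 0; h < n; h++) {
      if (taskFor(h, partitionsA, taskCount) != taskFor(h, partitionsB, taskCount)) {
        count++;
      }
    }
    return count;
  }
}
```

For key hashes 0 through 11, mismatches(12, 3, 6, 3) is 0, while mismatches(12, 3, 6, 6) is 6: with six tasks, a key whose hash modulo 6 is 3, 4, or 5 is read from the 3-partition stream by task hash % 3 but from the 6-partition stream by task hash % 6.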
2. Add the new method to the existing interface SystemStreamPartitionGrouper instead of creating a new interface.
The advantage of this alternative solution is that it requires fewer interface classes and a shallower class hierarchy. But we choose to follow the existing pattern of interface extension in Samza, as we do with BalancingTaskNameGrouper.
3. Add a new config and class for users to specify the new-partition to old-partition mapping strategy based on the input system.
The current proposal relies on the input system to meet the specified operational requirements. While these requirements can be satisfied by Kafka, they may or may not be satisfied by other systems such as Kinesis. We could support partition expansion for input systems other than Kafka if users were able to express the new-partition to old-partition mapping strategy. However, since we currently don't know how users would use such a config or class, we choose to keep the current SEP simple and only add a new config or class when we have a specific use case for it.