...

Create classes SystemStreamPartitionAssignmentManager and SetSSPTaskMapping to read and write the SystemStreamPartition-to-Task assignment in the coordinator stream. This will be done similarly to how ChangelogPartitionManager and SetChangelogMapping are used to read and write the Task-to-ChangeLogPartition assignment in the coordinator stream. The assignment should be written to the coordinator stream every time the job model is initialized.

2)

...

Change interface SystemStreamPartitionGrouper and create class GroupByPartitionFixedTaskNum 

Currently Samza uses various implementations of the method SystemStreamPartitionGrouper.group(Set<SystemStreamPartition> ssps) to derive the SystemStreamPartition-to-Task assignment. With the proposal in this doc, we should deprecate the existing method group(Set<SystemStreamPartition> ssps) of the interface SystemStreamPartitionGrouper and replace it with the method group(Map<SystemStreamPartition, String> previousSystemStreamPartitionMapping, Set<SystemStreamPartition> ssps). The new method takes the SystemStreamPartition-to-Task assignment from the previous job model, which can be read from the coordinator stream.

The SystemStreamPartition-to-Task assignment from the previous job model can be used to detect a change in the partition count of the input stream and to help determine the SystemStreamPartition-to-Task assignment for the new job model. The new assignment should ensure that partitions which may have messages with the same key will be mapped to the same task. For example, if the partition count is increased from 2 to 4, partitions 0 and 2 should be mapped to the same task and partitions 1 and 3 should be mapped to the same task.

Currently we have three implementations of the interface SystemStreamPartitionGrouper, i.e. AllSspToSingleTaskGrouper, GroupByPartition and GroupBySystemStreamPartition. We don't need to use the new information in AllSspToSingleTaskGrouper.group(...) because it already ensures that partitions which may have messages with the same key will be mapped to the same task. We don't need to use this information in GroupBySystemStreamPartition.group(...) either, because it should not be used for stateful jobs. Thus this proposal will only use previousSystemStreamPartitionMapping in GroupByPartition.group(...) to determine the new SystemStreamPartition-to-Task assignment.

We should create a new class GroupByPartitionFixedTaskNum which implements the interface SystemStreamPartitionGrouper. GroupByPartitionFixedTaskNum should group system-stream-partitions in the same way as the existing GroupByPartition.group(...) if previousSystemStreamPartitionMapping is empty (i.e. the job is run for the first time) or if the partition count of those streams has not changed since the job was created. Otherwise, in the case where the partition count of some stream has expanded since the job was created, GroupByPartitionFixedTaskNum should group partitions in such a way that 1) the number of tasks consuming from any given stream does not change before and after the partition expansion; and 2) messages with the same key in the same stream will be consumed by the same task before and after the expansion.

More specifically, GroupByPartitionFixedTaskNum will map a given SystemStreamPartition ssp to the taskName which is determined using the following algorithm:

- Define previousSSPSetOfStream to be the subset of previousSystemStreamPartitionMapping.keys() whose stream equals ssp.getStream().
- Define previousTaskSetOfStream to be the subset of previousSystemStreamPartitionMapping.values() which are mapped to by any SSP in previousSSPSetOfStream. This is the set of tasks that were consuming from the given stream. The same set of tasks should share partitions of this stream after partition expansion. Note that previousTaskSetOfStream.size() should equal the number of partitions of the stream ssp.getStream() when the job was run for the first time.
- Return taskName = String.format("Partition %d", ssp.getPartition().getPartitionId() % previousTaskSetOfStream.size())

This algorithm guarantees that partitions which may have messages with the same key will be mapped to the same task when the partition count of the input Kafka topic has increased, provided that the Kafka Operational Requirement described above is enforced.
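The steps above can be sketched in Java as follows. This is a minimal illustration, not the proposed Samza implementation: the class FixedTaskNumSketch and the simplified Ssp type are hypothetical stand-ins for the real Samza classes, and the "Partition %d" task-name format is assumed so that names match those produced by GroupByPartition. When no previous assignment exists for the stream, the sketch falls back to one task per partition.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

final class FixedTaskNumSketch {

    /** Simplified, hypothetical stand-in for Samza's SystemStreamPartition. */
    static final class Ssp {
        final String stream;
        final int partitionId;

        Ssp(String stream, int partitionId) {
            this.stream = stream;
            this.partitionId = partitionId;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Ssp)) {
                return false;
            }
            Ssp other = (Ssp) o;
            return stream.equals(other.stream) && partitionId == other.partitionId;
        }

        @Override
        public int hashCode() {
            return Objects.hash(stream, partitionId);
        }
    }

    /** Returns the task name for ssp, given the assignment of the previous job model. */
    static String taskNameFor(Map<Ssp, String> previousMapping, Ssp ssp) {
        // Collect the tasks that consumed from this stream in the previous job model.
        Set<String> previousTaskSetOfStream = new HashSet<>();
        for (Map.Entry<Ssp, String> entry : previousMapping.entrySet()) {
            if (entry.getKey().stream.equals(ssp.stream)) {
                previousTaskSetOfStream.add(entry.getValue());
            }
        }
        // First run (no previous assignment for this stream): one task per partition.
        if (previousTaskSetOfStream.isEmpty()) {
            return String.format("Partition %d", ssp.partitionId);
        }
        // Otherwise keep the task count fixed and fold new partitions onto old tasks.
        return String.format("Partition %d", ssp.partitionId % previousTaskSetOfStream.size());
    }

    public static void main(String[] args) {
        Map<Ssp, String> previous = new HashMap<>();
        previous.put(new Ssp("input", 0), "Partition 0");
        previous.put(new Ssp("input", 1), "Partition 1");
        // The stream has expanded from 2 to 4 partitions.
        for (int p = 0; p < 4; p++) {
            System.out.println("input/" + p + " -> " + taskNameFor(previous, new Ssp("input", p)));
        }
    }
}
```

With the previous mapping shown in main, partitions 2 and 3 are folded onto the tasks that already own partitions 0 and 1, so the task count stays at 2.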

A stateful Samza job which uses GroupByPartition as its grouper class should be configured to use GroupByPartitionFixedTaskNum in order to allow partition expansion. Note that GroupByPartitionFixedTaskNum is backward compatible with GroupByPartition because it doesn't change the partition-to-task assignment if the partition count doesn't expand. Thus the user's job should not need to rebuild its key/value store from the changelog topic.

To help understand this algorithm, the idea is to split partitions into disjoint buckets (or groups) of partitions where the union of those buckets equals the original set of partitions. The partition-to-bucket assignment ensures that messages with the same key will be produced to partitions of the same bucket, even if they end up in different partitions. Partitions in the same bucket will then be assigned to the same task, to ensure that messages with the same key will go to the same task.

For example, if the partition count is increased from 2 to 4, partitions 0 and 2 should be mapped to the same task and partitions 1 and 3 should be mapped to the same task. The figure below shows the relation between partitions, buckets and tasks after we increase the partition count from 2 to 4.

...

[Figure: samza.pdf (relation between partitions, buckets and tasks after the partition count is increased from 2 to 4)]
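The bucket idea can be checked with a small worked example. The sketch below assumes that the producer chooses a partition as hash(key) modulo the partition count, and that the partition count only grows to a multiple of its original value. Both are assumptions made for illustration. The class PartitionBucketSketch and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class PartitionBucketSketch {

    /** Assumed producer behavior: non-negative hash of the key modulo the partition count. */
    static int partitionFor(String key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    /** Splits partitions 0..partitionCount-1 into bucketCount disjoint buckets. */
    static Map<Integer, List<Integer>> buckets(int partitionCount, int bucketCount) {
        Map<Integer, List<Integer>> result = new TreeMap<>();
        for (int p = 0; p < partitionCount; p++) {
            result.computeIfAbsent(p % bucketCount, b -> new ArrayList<>()).add(p);
        }
        return result;
    }

    public static void main(String[] args) {
        // Partition count increased from 2 to 4; the bucket count stays at 2.
        System.out.println(buckets(4, 2)); // prints {0=[0, 2], 1=[1, 3]}

        // A key's partition may change after expansion, but its bucket does not,
        // because 2 divides 4.
        for (String key : new String[] {"alice", "bob", "carol"}) {
            int before = partitionFor(key, 2);
            int after = partitionFor(key, 4);
            System.out.println(key + ": partition " + before + " -> " + after
                + ", bucket " + (after % 2));
        }
    }
}
```

Because each bucket is consumed by exactly one task, a key that moves from partition 0 to partition 2 is still processed by the same task.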

...



3) Handle partition expansion while tasks are running

In the current Samza implementation, JobCoordinator already monitors partition expansion of input streams, and it already has logic to restart a container in case of container failure. All we need to do is to let JobCoordinator re-calculate the JobModel and restart the containers using the new JobModel when partition expansion is detected.

Samza should monitor changes in the partition count of the input streams. When the partition count of any input stream has changed, the container should query the Job Coordinator (referred to as JC below) for the set of partitions its tasks should consume, by sending an HTTP request to JC. JC should return the new Job Model, which derives the new SystemStreamPartition-to-Task assignment as described above. The container should then update its consumer to consume from the new set of partitions and update its tasks to commit offsets for the new set of partitions.
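The detection step can be pictured as a comparison between the partition counts recorded for the previous job model and the counts currently reported by the input system. The sketch below is illustrative only: PartitionExpansionCheckSketch and expandedStreams are hypothetical names, not existing Samza APIs, and how the two maps are obtained is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class PartitionExpansionCheckSketch {

    /** Returns the streams whose partition count has grown since the previous job model. */
    static Set<String> expandedStreams(Map<String, Integer> previousCounts,
                                       Map<String, Integer> currentCounts) {
        Set<String> expanded = new TreeSet<>();
        for (Map.Entry<String, Integer> entry : currentCounts.entrySet()) {
            Integer before = previousCounts.get(entry.getKey());
            // Streams without a previous count are new inputs, not expansions.
            if (before != null && entry.getValue() > before) {
                expanded.add(entry.getKey());
            }
        }
        return expanded;
    }

    public static void main(String[] args) {
        Map<String, Integer> previous = new HashMap<>();
        previous.put("orders", 2);
        previous.put("payments", 3);

        Map<String, Integer> current = new HashMap<>();
        current.put("orders", 4);
        current.put("payments", 3);

        Set<String> expanded = expandedStreams(previous, current);
        if (!expanded.isEmpty()) {
            // In the proposal, this is the point where the job model would be
            // re-calculated and the containers restarted with it.
            System.out.println("Partition expansion detected for: " + expanded);
        }
    }
}
```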

Public Interfaces

1) Deprecate the method Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> ssps) of the interface SystemStreamPartitionGrouper

2) Add method Map<TaskName, Set<SystemStreamPartition>> group(Map<SystemStreamPartition, String> previousSystemStreamPartitionMapping, Set<SystemStreamPartition> ssps) in the interface SystemStreamPartitionGrouper. The interface required to handle partition expansion while tasks are running will be specified later.

3) Add class GroupByPartitionFixedTaskNum which implements the interface SystemStreamPartitionGrouper
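The interface change listed above could look roughly like the sketch below. TaskName and SystemStreamPartition are reduced to hypothetical stand-ins so that the example compiles on its own, and SingleTaskGrouper with its "Task-0" name is an invented custom grouper that shows one way an existing implementation might adopt the new method by delegating the deprecated one to it.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class GrouperInterfaceSketch {

    /** Hypothetical stand-in for Samza's TaskName. */
    static final class TaskName {
        final String name;

        TaskName(String name) {
            this.name = name;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof TaskName && ((TaskName) o).name.equals(name);
        }

        @Override
        public int hashCode() {
            return name.hashCode();
        }
    }

    /** Hypothetical stand-in for Samza's SystemStreamPartition (identity equality). */
    static final class SystemStreamPartition {
        final String stream;
        final int partitionId;

        SystemStreamPartition(String stream, int partitionId) {
            this.stream = stream;
            this.partitionId = partitionId;
        }
    }

    interface SystemStreamPartitionGrouper {
        /** Existing method, to be deprecated. */
        @Deprecated
        Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> ssps);

        /** New method, which also receives the assignment of the previous job model. */
        Map<TaskName, Set<SystemStreamPartition>> group(
            Map<SystemStreamPartition, String> previousSystemStreamPartitionMapping,
            Set<SystemStreamPartition> ssps);
    }

    /** Invented custom grouper that ignores the previous mapping. */
    static final class SingleTaskGrouper implements SystemStreamPartitionGrouper {
        @Override
        @Deprecated
        public Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> ssps) {
            // The old entry point delegates to the new one with an empty previous mapping.
            return group(Collections.<SystemStreamPartition, String>emptyMap(), ssps);
        }

        @Override
        public Map<TaskName, Set<SystemStreamPartition>> group(
                Map<SystemStreamPartition, String> previousSystemStreamPartitionMapping,
                Set<SystemStreamPartition> ssps) {
            Map<TaskName, Set<SystemStreamPartition>> result = new HashMap<>();
            result.put(new TaskName("Task-0"), new HashSet<>(ssps));
            return result;
        }
    }
}
```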


Implementation and Test Plan

To be added

Compatibility, Deprecation, and Migration Plan

Users need to implement the new method if they have a custom implementation of the interface SystemStreamPartitionGrouper. The proposed solution is backward compatible and can be deployed without a specific migration plan.

...

To be added

Rejected Alternatives

1. Allow task number to increase instead of creating a new grouper class.

Allowing the task number to increase is useful since it increases the performance of a given Samza job. However, this feature alone does not solve the problem of allowing partition expansion. For example, say we have a job that joins two streams, both of which have 3 partitions. If the partition count of one stream increases from 3 to 6, we still want the task number to remain 3 to make sure that messages with the same key from both streams will be handled by the same task. This needs to be done with the new grouper class proposed in this doc.

2. Change the existing implementation of GroupByPartition instead of creating a new grouper class.

To be discussed.