
...

The partitioning algorithm should be implemented in the form hash(key) % partitionNum. Users are free to choose the hash function, but the hashed result must be taken modulo partitionNum. The default partitioning algorithms in Kafka satisfy this requirement. If a user's producer uses a custom partitioning algorithm that does not meet this requirement, Samza needs to repartition the user's topic.
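The following is a minimal sketch of a partitioner that follows this contract. The class and method names are hypothetical. String.hashCode() stands in for an arbitrary user-chosen hash. Kafka's default partitioner instead applies murmur2 to the serialized key bytes and masks the result to a non-negative value before the modulo. Math.floorMod is used here so that negative hash values still map to a valid partition.

```java
// Hypothetical partitioner illustrating the hash(key) % partitionNum contract.
final class ModuloPartitioner {
    private ModuloPartitioner() {}

    // Any hash function is allowed; String.hashCode() is used only for illustration.
    static int hash(String key) {
        return key.hashCode();
    }

    // The contract: partition = hash(key) mod partitionNum.
    // Math.floorMod keeps the result in [0, partitionNum) even for negative hashes.
    static int partitionFor(String key, int partitionNum) {
        if (partitionNum <= 0) {
            throw new IllegalArgumentException("partitionNum must be positive");
        }
        return Math.floorMod(hash(key), partitionNum);
    }

    public static void main(String[] args) {
        for (String key : new String[] {"alice", "bob", "carol"}) {
            System.out.println(key + " -> partition " + partitionFor(key, 4));
        }
    }
}
```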

...

Whenever the partition count of a keyed topic that may be consumed by Samza is increased, it should be multiplied by a power of two. Whoever increases the partition count of the Kafka topic is responsible for enforcing this guideline.
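The property behind this guideline can be checked directly. When the new partition count is a multiple of the old one, as with doubling, (h mod newNum) mod oldNum equals h mod oldNum. As a result, every partition that exists after the expansion holds only keys that previously lived in a single old partition. The following minimal sketch uses a hypothetical class name and exhaustively checks a range of integer hashes:

```java
// Hypothetical check of the key-locality property behind the expansion guideline.
final class ExpansionInvariant {
    private ExpansionInvariant() {}

    static int partition(int hash, int partitionNum) {
        return Math.floorMod(hash, partitionNum);
    }

    // True if, for every hash in [from, to), the partition after expansion maps back
    // (mod oldNum) to the partition the same hash occupied before expansion.
    static boolean preservesLocality(int oldNum, int newNum, int from, int to) {
        for (int h = from; h < to; h++) {
            int before = partition(h, oldNum);
            int after = partition(h, newNum);
            if (after % oldNum != before) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("4 -> 8: " + preservesLocality(4, 8, -10_000, 10_000)); // true
        System.out.println("4 -> 6: " + preservesLocality(4, 6, -10_000, 10_000)); // false
    }
}
```

Going from 4 to 6 partitions breaks the property. For example, hash 6 moves from old partition 2 to new partition 0, which maps back to old partition 0.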

Samza Implementation Change

...

Create the classes SystemStreamPartitionAssignmentManager and SetSSPTaskMapping to read and write the SystemStreamPartition-to-Task assignment in the coordinator stream. This mirrors how ChangelogPartitionManager and SetChangelogMapping are used to read and write the Task-to-ChangelogPartition assignment in the coordinator stream. The assignment should be written to the coordinator stream every time the job model is initialized.
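The following minimal sketch shows the intended read/write pattern. It assumes the coordinator stream behaves like an append-only log that is replayed on startup, with the latest message for a given SSP winning. The in-memory log, the message class, and the string encoding of SSPs are hypothetical stand-ins. They are not Samza's actual coordinator-stream API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a SetSSPTaskMapping message: one SSP -> task assignment.
final class SetSspTaskMappingMessage {
    final String ssp;       // illustrative encoding, e.g. "kafka.PageViewEvent.0"
    final String taskName;  // e.g. "Partition 0"

    SetSspTaskMappingMessage(String ssp, String taskName) {
        this.ssp = ssp;
        this.taskName = taskName;
    }
}

// Hypothetical manager that writes and reads the assignment through an append-only log.
final class SspAssignmentManagerSketch {
    private final List<SetSspTaskMappingMessage> log = new ArrayList<>();

    // Called whenever the job model is initialized: append one message per SSP.
    void writeMapping(Map<String, String> sspToTask) {
        for (Map.Entry<String, String> e : sspToTask.entrySet()) {
            log.add(new SetSspTaskMappingMessage(e.getKey(), e.getValue()));
        }
    }

    // Replays the whole log; a later message for the same SSP overrides an earlier one.
    Map<String, String> readMapping() {
        Map<String, String> result = new HashMap<>();
        for (SetSspTaskMappingMessage m : log) {
            result.put(m.ssp, m.taskName);
        }
        return Collections.unmodifiableMap(result);
    }
}
```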

...

With the proposal in this doc, we should deprecate the existing method group(Set<SystemStreamPartition> ssps) of the interface SystemStreamPartitionGrouper. It should be replaced with the method group(Map<SystemStreamPartition, String> previousSystemStreamPartitionMapping, Set<SystemStreamPartition> ssps). The new method takes the SystemStreamPartition-to-Task assignment from the previous job model, which can be read from the coordinator stream.
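The following is a minimal sketch of the interface after this change. To stay self-contained, it uses a simplified stand-in for SystemStreamPartition and plain strings for task names. All type names here are hypothetical. It also shows how a custom grouper that has no use for the previous assignment could implement the new method by delegating to its existing logic.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Simplified stand-in for SystemStreamPartition (hypothetical).
final class Ssp {
    final String stream;
    final int partitionId;

    Ssp(String stream, int partitionId) {
        this.stream = stream;
        this.partitionId = partitionId;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof Ssp)) return false;
        Ssp other = (Ssp) o;
        return stream.equals(other.stream) && partitionId == other.partitionId;
    }

    @Override
    public int hashCode() {
        return Objects.hash(stream, partitionId);
    }
}

// Sketch of the grouper interface after the proposed change (task names as strings).
interface SspGrouper {
    // Existing method, to be deprecated.
    @Deprecated
    Map<String, Set<Ssp>> group(Set<Ssp> ssps);

    // New method: also receives the SSP-to-task assignment of the previous job model.
    Map<String, Set<Ssp>> group(Map<Ssp, String> previousSystemStreamPartitionMapping, Set<Ssp> ssps);
}

// A hypothetical custom grouper: one task per SSP, ignoring the previous assignment.
final class OneTaskPerSspGrouper implements SspGrouper {
    @Override
    public Map<String, Set<Ssp>> group(Set<Ssp> ssps) {
        Map<String, Set<Ssp>> result = new HashMap<>();
        for (Ssp ssp : ssps) {
            result.computeIfAbsent(ssp.stream + "-" + ssp.partitionId, k -> new HashSet<>()).add(ssp);
        }
        return result;
    }

    @Override
    public Map<String, Set<Ssp>> group(Map<Ssp, String> previousSystemStreamPartitionMapping, Set<Ssp> ssps) {
        // This grouper does not need the previous assignment, so it reuses its old logic.
        return group(ssps);
    }
}
```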

We should create a new class GroupByPartitionFixedTaskNum that implements the interface SystemStreamPartitionGrouper. GroupByPartitionFixedTaskNum should group system-stream-partitions in the same way as the existing method GroupByPartition.group(...) in two cases: if previousSystemStreamPartitionMapping is empty (i.e. the job is run for the first time), or if the partition count of the input streams has not changed since the job was created. Otherwise, if the partition count of some stream has expanded since the job was created, GroupByPartitionFixedTaskNum should group partitions so that: 1) the number of tasks consuming from any given stream is the same before and after the partition expansion; and 2) messages with the same key in the same stream are consumed by the same task before and after the expansion.

More specifically, GroupByPartitionFixedTaskNum.group(...) will map a given SystemStreamPartition ssp to the taskName which is determined using the following algorithm:

- Define previousSSPSetOfStream to be the subset of previousSystemStreamPartitionMapping.keys() whose stream equals ssp.getStream().
- Define previousTaskSetOfStream to be the subset of previousSystemStreamPartitionMapping.values() to which any SSP in previousSSPSetOfStream is mapped. This is the set of tasks that were consuming from the given stream. The same set of tasks should share the partitions of this stream before and after partition expansion. Note that previousTaskSetOfStream.size() should equal the number of partitions of the stream when the job was run for the first time.
- Return taskName = String.format("TaskName-Partition %d", ssp.getPartition().getPartitionId() % previousTaskSetOfStream.size()). Note that the resulting task names should be the same as those returned by GroupByPartition.group(...) if the partition count of the input streams has not changed.
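The three steps above can be sketched as follows. The sketch uses a simplified stand-in for SystemStreamPartition (a stream name plus a partition id) and plain strings for task names. The class and method names are hypothetical, and only the new two-argument group method is shown. The task-name format string is the one given above. Falling back to the GroupByPartition behaviour for a stream with no previous assignment is an assumption of this sketch. It covers the first-run case and avoids a division by zero.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Simplified stand-in for SystemStreamPartition (hypothetical).
final class StreamPartition {
    final String stream;
    final int partitionId;

    StreamPartition(String stream, int partitionId) {
        this.stream = stream;
        this.partitionId = partitionId;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof StreamPartition)) return false;
        StreamPartition other = (StreamPartition) o;
        return stream.equals(other.stream) && partitionId == other.partitionId;
    }

    @Override
    public int hashCode() {
        return Objects.hash(stream, partitionId);
    }
}

// Sketch of the proposed GroupByPartitionFixedTaskNum algorithm (hypothetical name).
final class FixedTaskNumGrouperSketch {
    private FixedTaskNumGrouperSketch() {}

    static Map<String, Set<StreamPartition>> group(
            Map<StreamPartition, String> previousSystemStreamPartitionMapping,
            Set<StreamPartition> ssps) {
        // previousTaskSetOfStream for every stream, built once from the previous mapping.
        Map<String, Set<String>> previousTasksByStream = new HashMap<>();
        for (Map.Entry<StreamPartition, String> e : previousSystemStreamPartitionMapping.entrySet()) {
            previousTasksByStream.computeIfAbsent(e.getKey().stream, k -> new HashSet<>()).add(e.getValue());
        }

        Map<String, Set<StreamPartition>> result = new HashMap<>();
        for (StreamPartition ssp : ssps) {
            Set<String> previousTaskSetOfStream = previousTasksByStream.get(ssp.stream);
            // Assumption: with no previous assignment for this stream, behave like GroupByPartition.
            int index = (previousTaskSetOfStream == null)
                    ? ssp.partitionId
                    : ssp.partitionId % previousTaskSetOfStream.size();
            String taskName = String.format("TaskName-Partition %d", index);
            result.computeIfAbsent(taskName, k -> new HashSet<>()).add(ssp);
        }
        return result;
    }
}
```

Consider a stream that was first consumed with two partitions and has since doubled to four. Partitions 0 and 2 go to the task for partition 0, and partitions 1 and 3 go to the task for partition 1. This is the same split that the doubling property guarantees for keys.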

A stateful Samza job that uses GroupByPartition as its grouper class should be configured to use GroupByPartitionFixedTaskNum in order to allow partition expansion. GroupByPartitionFixedTaskNum is backward compatible with GroupByPartition because it does not change the partition-to-task assignment unless partitions expand. Thus the user's job should not need to rebuild (bootstrap) its key/value store from the changelog topic.

...

3) Handle partition expansion while tasks are running

In the current Samza implementation, JobCoordinator already monitors input streams for partition expansion, and it already has logic to restart containers in case of container failure. All we need to do is let JobCoordinator recalculate the JobModel and restart containers using the new JobModel when partition expansion is detected.
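The detection step can be sketched as a comparison between the partition counts recorded in the current JobModel and freshly fetched stream metadata. The helper below is hypothetical. It says nothing about how JobCoordinator actually obtains metadata or restarts containers. It only shows the decision that would trigger recomputing the JobModel.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper for the partition-expansion check described above.
final class PartitionExpansionDetector {
    private PartitionExpansionDetector() {}

    // Returns the streams whose partition count grew since the JobModel was computed.
    static Set<String> expandedStreams(Map<String, Integer> partitionCountInJobModel,
                                       Map<String, Integer> currentPartitionCount) {
        Set<String> expanded = new TreeSet<>();
        for (Map.Entry<String, Integer> e : partitionCountInJobModel.entrySet()) {
            Integer current = currentPartitionCount.get(e.getKey());
            if (current != null && current > e.getValue()) {
                expanded.add(e.getKey());
            }
        }
        return expanded;
    }

    // Any expansion means the JobModel should be recalculated and containers restarted.
    static boolean shouldRecomputeJobModel(Map<String, Integer> partitionCountInJobModel,
                                           Map<String, Integer> currentPartitionCount) {
        return !expandedStreams(partitionCountInJobModel, currentPartitionCount).isEmpty();
    }
}
```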

 

Public Interfaces

1) Deprecate the method Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> ssps) of the interface SystemStreamPartitionGrouper.

2) Add the method Map<TaskName, Set<SystemStreamPartition>> group(Map<SystemStreamPartition, String> previousSystemStreamPartitionMapping, Set<SystemStreamPartition> ssps) to the interface SystemStreamPartitionGrouper.

3) Add the class GroupByPartitionFixedTaskNum, which implements the interface SystemStreamPartitionGrouper.


Implementation and Test Plan

To be added

Compatibility, Deprecation, and Migration Plan

- Deprecate the method Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> ssps) of the interface SystemStreamPartitionGrouper.
- Users who have a custom implementation of the interface SystemStreamPartitionGrouper need to implement the newly added method.
- Users need to use the newly added GroupByPartitionFixedTaskNum as the grouper class in order to handle possible changes to the partition count of their input streams.

Rejected Alternatives

1. Allow the number of tasks to increase instead of creating a new grouper class.

...