...

The partitioning algorithm should be implemented in the form hash(key) % partitionNum. Users are free to choose the hash function, but the hashed result must be reduced modulo partitionNum. The default partitioning algorithms in Kafka satisfy this requirement. If a user’s producer implementation uses a customized partitioning algorithm that does not meet this requirement, Samza needs to repartition the user’s topic.

2) Topic management 

The partition count of any keyed topic that may be consumed by Samza should always be multiplied by a power of two whenever it is increased. This guideline should be enforced by whoever increases the partition count of the topic.
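To illustrate why these two requirements work together, the following minimal Java sketch partitions keys as hash(key) % partitionNum and prints where each key lands before and after the partition count is doubled. The class name ModuloPartitioner, the hash function and the sample keys are assumptions made for this example only; they are not part of Samza or Kafka.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class ModuloPartitioner {
    // Hypothetical hash function: any deterministic hash is acceptable,
    // as long as its result is reduced modulo partitionNum below.
    static int hash(String key) {
        return Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
    }

    // hash(key) % partitionNum, using floorMod so that a negative hash
    // still yields a partition index in [0, partitionNum).
    static int partitionFor(String key, int partitionNum) {
        return Math.floorMod(hash(key), partitionNum);
    }

    public static void main(String[] args) {
        int oldCount = 2;
        int newCount = oldCount * 2; // multiplied by a power of two
        for (String key : new String[] {"alice", "bob", "carol", "dave"}) {
            int before = partitionFor(key, oldCount);
            int after = partitionFor(key, newCount);
            // after % oldCount always equals before, so the old partition
            // of a key can be recovered from its new partition.
            System.out.printf("%s: partition %d -> %d (new %% old count = %d)%n",
                    key, before, after, after % oldCount);
        }
    }
}
```

Because the new partition index modulo the old partition count always equals the old partition index, every key that used to share a partition stays within a predictable group of new partitions.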

...

1) Store SystemStreamPartition-to-Task assignment in the coordinator stream

 

Create the classes SystemStreamPartitionAssignmentManager and SetSSPTaskMapping to read and write the SystemStreamPartition-to-Task assignment in the coordinator stream. This will be done similarly to how ChangelogPartitionManager and SetChangelogMapping are used to read and write the Task-to-ChangeLogPartition assignment in the coordinator stream. The assignment should be written to the coordinator stream every time the job model is initialized.

...

Currently we have three implementations of the interface SystemStreamPartitionGrouper: AllSspToSingleTaskGrouper, GroupByPartition and GroupBySystemStreamPartition. We don't need to use the new information in AllSspToSingleTaskGrouper.group(...) because it already ensures that partitions which may have messages with the same key are mapped to the same task. We don't need to use this information in GroupBySystemStreamPartition.group(...) either, because that grouper should not be used for stateful jobs. Thus this proposal will only use previousSystemStreamPartitionMapping in GroupByPartition.group(...) to determine the new SystemStreamPartition-to-Task assignment.

 

Given SystemStreamPartition ssp, GroupByPartition.group(...) will determine the taskName that this ssp should be mapped to using the algorithm below:

...

This algorithm guarantees that, after the partition count of the input Kafka topic has increased, partitions which may have messages with the same key will be mapped to the same task, provided that the Kafka Operational Requirement described above is enforced.

To help understand this algorithm, the idea is to split the partitions into disjoint buckets (or groups) of partitions whose union equals the original set of partitions. The partition-to-bucket assignment ensures that messages with the same key are produced to partitions of the same bucket, even if they end up in different partitions. Partitions can then be assigned to tasks in units of buckets, which ensures that messages with the same key go to the same task. The figure below shows the relation between partitions, buckets and tasks after we increase the partition count from 2 to 4.

 

[Figure: samza.pdf]
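The bucket idea can be sketched in a few lines of Java. This is an illustration only and not the proposal's algorithm text: it assumes that the bucket of a partition is its index modulo the previous partition count, that the previous partitions are numbered 0 to N-1, and it simplifies the SystemStreamPartition-to-Task mapping to a map from partition index to task name. The class name BucketAssignmentSketch and the task names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class BucketAssignmentSketch {
    /**
     * Assigns each partition of the expanded topic to a task.
     *
     * previousMapping is a simplified stand-in for the previous
     * SystemStreamPartition-to-Task mapping: partition index -> task name,
     * with keys assumed to be 0 .. previousMapping.size() - 1.
     */
    static Map<Integer, String> assign(Map<Integer, String> previousMapping,
                                       int newPartitionCount) {
        int previousCount = previousMapping.size();
        Map<Integer, String> result = new LinkedHashMap<>();
        for (int partition = 0; partition < newPartitionCount; partition++) {
            // Assumption: partitions with the same index modulo the previous
            // count form one bucket and may hold messages with the same key.
            int bucket = partition % previousCount;
            // The whole bucket follows the task that owned the old partition.
            result.put(partition, previousMapping.get(bucket));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, String> previous = new LinkedHashMap<>();
        previous.put(0, "Partition 0");
        previous.put(1, "Partition 1");
        System.out.println(assign(previous, 4));
    }
}
```

For an increase from 2 to 4 partitions this prints {0=Partition 0, 1=Partition 1, 2=Partition 0, 3=Partition 1}: partitions 0 and 2 form one bucket and partitions 1 and 3 form the other, and each bucket stays with a single task.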

 


3) Handle partition expansion while tasks are running

Samza should monitor the partition count of the input streams for changes. When the partition count of any input stream has changed, the container should send an HTTP request to the Job Coordinator (referred to as JC below) to query the set of partitions its tasks should consume. JC should return the new Job Model, which derives the new SystemStreamPartition-to-Task assignment as described above. The container should then update its consumer to consume from the new set of partitions and update its tasks to commit offsets for the new set of partitions.
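The change-detection step might look roughly like the Java sketch below. Everything in it is hypothetical: the class PartitionCountMonitorSketch, the supplier that reports the current partition count and the callback that stands in for "query JC and apply the new Job Model" are placeholders for this example, not existing Samza APIs. Scheduling, HTTP and consumer updates are deliberately left out.

```java
import java.util.function.IntConsumer;
import java.util.function.IntSupplier;

final class PartitionCountMonitorSketch {
    // Hypothetical source of the current partition count of one input stream.
    private final IntSupplier currentPartitionCount;
    // Hypothetical reaction, e.g. request the new Job Model from JC.
    private final IntConsumer onChange;
    private int lastSeenCount;

    PartitionCountMonitorSketch(IntSupplier currentPartitionCount, IntConsumer onChange) {
        this.currentPartitionCount = currentPartitionCount;
        this.onChange = onChange;
        this.lastSeenCount = currentPartitionCount.getAsInt();
    }

    /** Checks once; fires the callback and returns true if the count changed. */
    boolean checkOnce() {
        int now = currentPartitionCount.getAsInt();
        if (now == lastSeenCount) {
            return false;
        }
        lastSeenCount = now;
        onChange.accept(now);
        return true;
    }

    public static void main(String[] args) {
        int[] count = {2}; // simulated partition count of the input topic
        PartitionCountMonitorSketch monitor = new PartitionCountMonitorSketch(
                () -> count[0],
                n -> System.out.println("partition count changed to " + n));
        monitor.checkOnce(); // no change, nothing printed
        count[0] = 4;        // topic expanded from 2 to 4 partitions
        monitor.checkOnce(); // callback fires once
    }
}
```

In a running container, checkOnce() would be invoked periodically, and the callback would be the place to fetch the new Job Model and reconfigure the consumer and the tasks.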

Public Interfaces

1) Deprecate the method Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> ssps) of the interface SystemStreamPartitionGrouper

...