Status
Current state: UNDER DISCUSSION
Discussion thread:
JIRA: SAMZA-1293
Released: <Samza version>
Problem
Right now, Samza does not allow the partition count of an input stream to increase after a stateful job is created. This causes problems when Kafka is used as the input system, because we need to expand the partitions of an existing topic as its byte-in rate grows over time, in order to limit the size of the largest partition in Kafka. Kafka brokers may have performance issues if the size of a given partition is too large.
Motivation
This design doc provides a solution to increase the partition number of the input streams of a stateful Samza job while still ensuring the correctness of the Samza job's output. The solution works for any input system that satisfies the operational requirements described below, i.e. the partitioning algorithm is implemented in the form of hash(key) % partitionNum, and partitionNum is always multiplied by a power of two when it is increased. In the rest of the document we assume Kafka as the input system. The motivation for increasing the partition number of a Kafka topic is 1) to improve the performance of the Kafka brokers and 2) to increase the throughput of the Kafka consumers in the Samza containers.
Proposed Changes
Kafka Operational Requirement
1) Partitioning algorithm in the producer
The partitioning algorithm should be implemented in the form of hash(key) % partitionNum. The user is free to choose the hash function, but the hashed result must be taken modulo partitionNum. This requirement is satisfied by the default partitioning algorithm in Kafka. Samza needs to repartition the user's topic if the user's producer implementation uses a customized partitioning algorithm that does not meet this requirement.
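A minimal sketch of a partitioner in the required form. The hash function is arbitrary (String.hashCode here), but the result must be reduced with % partitionNum; the class and method names are illustrative, not Samza or Kafka API.

```java
// Illustrative partitioner in the required hash(key) % partitionNum form.
// String.hashCode stands in for whatever hash function the producer uses.
public class ModPartitioner {
    public static int partition(String key, int partitionNum) {
        int hash = key.hashCode() & 0x7fffffff; // force non-negative
        return hash % partitionNum;
    }

    public static void main(String[] args) {
        System.out.println(partition("member-42", 8));
    }
}
```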
2) Topic management
The partition number of any keyed topic that may be consumed by Samza should always be multiplied by a power of two when it is increased. This guideline should be enforced by whoever may increase the partition number of the Kafka topic.
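The two requirements together guarantee that a key's new partition, reduced modulo the old partition count, recovers its original partition: (h % (k*m)) % m == h % m whenever the new count k*m is a multiple of the old count m. A small check of this property, using plain ints in place of real keys:

```java
// Verifies the modulo-compatibility that the two requirements rely on:
// when the partition count is multiplied (here doubled, 4 -> 8), the new
// partition of every hash, taken modulo the old count, equals the old
// partition of that hash.
public class ExpansionCheck {
    static int partition(int hash, int partitionNum) {
        return (hash & 0x7fffffff) % partitionNum;
    }

    public static void main(String[] args) {
        int oldNum = 4, newNum = 8; // power-of-two multiple of oldNum
        for (int hash = 0; hash < 1000; hash++) {
            if (partition(hash, newNum) % oldNum != partition(hash, oldNum)) {
                throw new AssertionError("inconsistent at hash " + hash);
            }
        }
        System.out.println("consistent");
    }
}
```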
Samza Implementation Change
1) Store the SystemStreamPartition-to-Task assignment in the coordinator stream
Create classes SystemStreamPartitionAssignmentManager and SetSSPTaskMapping to read and write the SystemStreamPartition-to-Task assignment in the coordinator stream. This will be done similarly to how ChangelogPartitionManager and SetChangelogMapping are used to read and write the Task-to-ChangeLogPartition assignment in the coordinator stream. The assignment should be written to the coordinator stream every time the job model is initialized.
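A hypothetical in-memory model of the mapping this would persist: one entry per SystemStreamPartition, valued by the owning task name. Samza's real types and coordinator-stream message classes are replaced by plain strings here; this is only a sketch of the data being stored, not the actual SystemStreamPartitionAssignmentManager API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the SSP-to-Task assignment kept in the
// coordinator stream. Plain strings stand in for Samza's
// SystemStreamPartition and TaskName types.
public class SspTaskMappingSketch {
    private final Map<String, String> sspToTask = new HashMap<>();

    // Written each time the job model is initialized.
    void writeAssignment(String ssp, String taskName) {
        sspToTask.put(ssp, taskName);
    }

    // Read back when the next job model is computed.
    Map<String, String> readAssignments() {
        return new HashMap<>(sspToTask);
    }

    public static void main(String[] args) {
        SspTaskMappingSketch m = new SspTaskMappingSketch();
        m.writeAssignment("kafka.PageView.0", "Partition 0");
        System.out.println(m.readAssignments());
    }
}
```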
2) Change interface SystemStreamPartitionGrouper and create class GroupByPartitionFixedTaskNum
With the proposal in this doc, we should deprecate the existing method group(Set<SystemStreamPartition> ssps) of the interface SystemStreamPartitionGrouper and replace it with the method group(Map<SystemStreamPartition, String> previousSystemStreamPartitionMapping, Set<SystemStreamPartition> ssps). The new method takes the SystemStreamPartition-to-Task assignment from the previous job model, which can be read from the coordinator stream.
We should create a new class GroupByPartitionFixedTaskNum which implements the interface SystemStreamPartitionGrouper. GroupByPartitionFixedTaskNum should group system-stream-partitions in the same way as the existing method GroupByPartition.group(...) if previousSystemStreamPartitionMapping is empty (i.e. the job is run for the first time) or if the partition number of those streams has not changed since the job was created. Otherwise, in the case where the partitions of some stream have expanded since the job was created, GroupByPartitionFixedTaskNum should group partitions in such a way that 1) the number of tasks consuming from any given stream does not change before and after the partition expansion; and 2) messages with the same key in the same stream will be consumed by the same task before and after the expansion.
More specifically, GroupByPartitionFixedTaskNum.group(...) will map a given SystemStreamPartition ssp to a taskName determined by the following algorithm:
- Define previousSSPSetOfStream to be the subset of previousSystemStreamPartitionMapping.keys() whose stream equals ssp.getStream().
- Define previousTaskSetOfStream to be the subset of previousSystemStreamPartitionMapping.values() that any SSP in previousSSPSetOfStream is mapped to. This is the set of tasks that were consuming from the given stream; the same set of tasks should share the partitions of this stream before and after the partition expansion. Note that previousTaskSetOfStream.size() should equal the number of partitions of the stream when the job was run for the first time.
- Return taskName = String.format("Partition %d", ssp.getPartition().getPartitionId() % previousTaskSetOfStream.size()). Note that the resulting task names are the same as those returned by GroupByPartition.group(...) if the partition number of the input streams has not changed.
A stateful Samza job that is using GroupByPartition as its grouper class should be configured to use GroupByPartitionFixedTaskNum in order to allow partition expansion. Note that GroupByPartitionFixedTaskNum is backward compatible with GroupByPartition because it doesn't change the partition-to-task assignment if partitions don't expand. Thus the user's job should not need to bootstrap its key/value store from the changelog topic.
To help understand this algorithm: the idea is to split partitions into disjoint buckets (or groups) of partitions whose union equals the original set of partitions. The partition-to-bucket assignment ensures that messages with the same key will be produced to partitions of the same bucket. Partitions in the same bucket are then assigned to the same task, so that messages with the same key go to the same task.
For example, if the partition number is increased from 2 to 4, partitions 0 and 2 should be mapped to the same task, and partitions 1 and 3 should be mapped to the same task. The figure below shows the relation between partitions, buckets and tasks after we increase the partition number from 2 to 4.
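The 2-to-4 example can be checked end to end: for every key hash, the task chosen before the expansion (partition modulo 2) equals the task chosen after it (new partition, bucketed by modulo 2). Plain ints stand in for real keys.

```java
// End-to-end check of the bucket idea for the 2 -> 4 expansion: a key's
// task, derived as (partition % originalPartitionCount), is the same
// before and after the expansion.
public class BucketDemo {
    static int partition(int hash, int n) { return (hash & 0x7fffffff) % n; }
    static int task(int partition, int originalPartitions) {
        return partition % originalPartitions;
    }

    public static void main(String[] args) {
        for (int hash = 0; hash < 1000; hash++) {
            int taskBefore = task(partition(hash, 2), 2); // 2 partitions
            int taskAfter  = task(partition(hash, 4), 2); // expanded to 4
            if (taskBefore != taskAfter) throw new AssertionError();
        }
        System.out.println("same task before and after expansion");
    }
}
```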
3) Handle partition expansion while tasks are running
JobCoordinator already monitors partition expansion of input streams in the current Samza implementation, and it already has logic to restart containers in case of container failure. All we need to do is let JobCoordinator re-calculate the JobModel and restart the containers using the new JobModel when partition expansion is detected.
Public Interfaces
1) Deprecate the method Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> ssps) of the interface SystemStreamPartitionGrouper.
2) Add the method Map<TaskName, Set<SystemStreamPartition>> group(Map<SystemStreamPartition, String> previousSystemStreamPartitionMapping, Set<SystemStreamPartition> ssps) to the interface SystemStreamPartitionGrouper.
3) Add the class GroupByPartitionFixedTaskNum which implements the interface SystemStreamPartitionGrouper.
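The resulting interface would look roughly as follows. The method signatures are the ones listed above; the stub TaskName and SystemStreamPartition classes are stand-ins for Samza's real types so the sketch compiles on its own.

```java
import java.util.Map;
import java.util.Set;

// Stand-ins for Samza's real TaskName and SystemStreamPartition types.
class TaskName {}
class SystemStreamPartition {}

// The grouper interface after this change: the old single-argument
// group(...) is deprecated, and the new overload receives the previous
// SSP-to-Task assignment read from the coordinator stream.
interface SystemStreamPartitionGrouper {
    @Deprecated
    Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> ssps);

    Map<TaskName, Set<SystemStreamPartition>> group(
        Map<SystemStreamPartition, String> previousSystemStreamPartitionMapping,
        Set<SystemStreamPartition> ssps);
}

public class GrouperInterfaceSketch {
    public static void main(String[] args) {
        // The interface exposes exactly the two methods described above.
        System.out.println(SystemStreamPartitionGrouper.class.getMethods().length);
    }
}
```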
Test Plan
To be added
Compatibility, Deprecation, and Migration Plan
- Deprecate the method Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> ssps) of the interface SystemStreamPartitionGrouper.
- Users need to implement the newly-added method if they have a custom implementation of the interface SystemStreamPartitionGrouper.
- Users need to use the newly-added GroupByPartitionFixedTaskNum as the grouper class in order to handle possible partition number changes of their input streams.
Rejected Alternatives
1. Allow task number to increase instead of creating a new grouper class.
Allowing the task number to increase is useful since it increases the performance of a given Samza job. However, this feature alone does not solve the problem of partition expansion. For example, say we have a job that joins two streams, both of which have 3 partitions. If the partition number of one stream increases from 3 to 6, we still want the task number to remain 3 so that messages with the same key from both streams will be handled by the same task. This requires the new grouper class proposed in this doc.
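The join scenario above can be checked numerically: with both streams bucketed into 3 tasks via modulo 3, a key lands on the same task whether it comes from the 3-partition stream or the expanded 6-partition stream. Plain ints stand in for real keys.

```java
// Worked example for the join scenario: stream A keeps 3 partitions,
// stream B expands from 3 to 6, and the task count stays fixed at 3.
// Mapping partition -> task via "% 3" keeps co-partitioned keys together.
public class JoinExpansionDemo {
    static int partition(int hash, int n) { return (hash & 0x7fffffff) % n; }
    static int task(int partition) { return partition % 3; }

    public static void main(String[] args) {
        for (int hash = 0; hash < 1000; hash++) {
            int taskA = task(partition(hash, 3)); // stream A: 3 partitions
            int taskB = task(partition(hash, 6)); // stream B: expanded to 6
            if (taskA != taskB) throw new AssertionError();
        }
        System.out.println("joined keys stay co-located");
    }
}
```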