...

Released: <Samza version>

Problem and Goal

Right now, Samza does not allow the number of partitions of an input stream to increase after a stateful job is created. This causes problems when Kafka is used as the input system, because the partitions of an existing topic need to be expanded for various reasons. For example, Kafka generally needs to limit the maximum size of each partition to scale up its performance. Thus the number of partitions of a Kafka topic needs to be expanded, to reduce the per-partition size, if the average byte-in rate or the retention time of the topic increases over time.

However, Samza currently is not able to handle partition expansion of the input streams for stateful jobs. For example, if a user is running a stateful job with GroupByPartition as the grouper and the partition count of the input stream has increased from 2 to 4, then partition 3 may be assigned to a new task after the expansion. Because the new task handling partition 3 does not have the previous state to resume from, a key-based counter would wrongly restart from 0 for a key that was reassigned from partition 1 to partition 3 by the expansion, instead of correctly continuing from the previous count held by task 1.

The goal of this proposal is to enable partition expansion of the input streams while still allowing stateful jobs in Samza to produce the correct result.

Motivation

This design doc provides a solution to increase the partition number of the input streams of a stateful Samza job while still ensuring the correctness of the job's output. The solution works for all input systems that satisfy the operational requirements described below, i.e. 1) the partitioning algorithm is implemented in the form of hash(key) % partitionNum and 2) partitionNum is always multiplied by a power of two when it is increased. The motivation for increasing the partition number of a Kafka topic includes 1) limiting the maximum size of a partition in order to improve broker performance and 2) increasing the throughput of the Kafka consumers in the Samza containers. Other input systems may have different reasons for partition expansion.


The current solution keeps the task number unchanged and uses a proper partition-to-task assignment to make sure the Samza output is correct after partition expansion. An alternative solution is to allow the task number to increase after partition expansion and use a proper task-to-container assignment to keep the Samza output correct. The second solution, which allows task expansion, is needed in order to scale up the performance of Samza, and it would also allow partition expansion for stateful jobs that don't join co-partitioned streams. However, the second solution is much more complicated to design and implement than the solution proposed in this doc, and it does not enable partition expansion for stateful Samza jobs that do join co-partitioned streams (see the Rejected Alternatives section), which this proposal can address. Thus the two solutions don't replace each other and can be designed independently. We plan to use the first solution, described in this doc, to enable partition expansion as low-hanging fruit. Task expansion is out of the scope of this proposal and will be addressed in a future SEP.

 

Proposed Changes

Operational Requirement of the Input System

1) New partition to old partition mapping algorithm

Partition expansion can be supported after this proposal if and only if the partition expansion algorithm of the input system meets the following requirements:

- Let's say the partition count of a given stream in the input system has been expanded from previousPartitionCount to currentPartitionCount. Then the messages that are currently mapped to a partition with index X < previousPartitionCount must also have been mapped to the partition with the same index X before the expansion (suppose partition indexes start from 0).
- There exists a function such that, given previousPartitionCount, currentPartitionCount and a partitionIndex, the function can deterministically determine the index X of the partition to which all messages currently mapped to partition partitionIndex would have been mapped before the partition expansion(s).

For example, this requirement is satisfied by Kafka when the following two requirements are met:

- User uses default partitioning algorithm of Kafka. This means the partitioning algorithm is implemented in the form of hash(key) % partitionCount.
- Partition number of any Kafka topic is always multiplied by a power of the same factor (e.g. 2) when we expand the partition.
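For intuition, here is a small self-contained sketch (not Samza code; the partitioner below merely mimics the default hash(key) % partitionCount scheme) showing why power-of-two expansion keeps the mapping recoverable: a key's new partition, taken mod the old partition count, is exactly its old partition.

```java
public class ExpansionMappingDemo {
    // Mimics Kafka's default partitioner: hash(key) % partitionCount.
    // Math.floorMod keeps the result non-negative for negative hash codes.
    static int partition(String key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        int oldCount = 2;
        int newCount = 4; // expanded by a power of two
        for (String key : new String[]{"user-1", "user-2", "order-42"}) {
            int before = partition(key, oldCount);
            int after = partition(key, newCount);
            // Because newCount is a multiple of oldCount,
            // (h % newCount) % oldCount == h % oldCount, so the old
            // partition is always recoverable from the new one.
            assert after % oldCount == before;
            System.out.println(key + ": partition " + before + " -> " + after);
        }
    }
}
```

Note that this property would break if the partition count were increased by a factor that is not a multiple of the old count, which is why the power-of-two guideline below matters.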

2) Stream management

The partition count of any keyed stream that is used as an input stream of a stateful Samza job should always be multiplied by a power of two when it is increased. This guideline should be enforced by whoever may increase the partition count of the stream.

Note that this can be trivially supported in Kafka.

3) Order of message consumption

The underlying system should ensure that messages with the same key from the same producer are always consumed in the order they are produced before and after the partition expansion. 

Note that this requirement is not currently supported in Kafka. For example, say the partition count is expanded from 2 to 4. Messages with a given key may be produced to partition 1 before the expansion and to partition 3 after it. Because the Kafka consumer does not guarantee order of message delivery across partitions, it is possible that Samza consumes messages with that key from partition 3 before those from partition 1, which violates the requirement. Thus additional development work is needed in Kafka to meet this requirement. We will provide in this doc the link to the KIP once it is available.

Samza Implementation Change

...

Create classes SystemStreamPartitionAssignmentManager and SetSSPTaskMapping to read and write the SystemStreamPartition-to-Task assignment in the coordinator stream. This will be done similarly to how ChangelogPartitionManager and SetChangelogMapping are used to read and write the Task-to-ChangeLogPartition assignment in the coordinator stream. The assignment should be written to the coordinator stream every time the job model is initialized.

2) Add interface InputStreamPartitionExpansionAlgorithm and class DefaultPartitionExpansionAlgorithm

This interface includes the method int getPartitionBeforeExpansion(int currentPartitionIdx, int currentPartitionCount, int initialPartitionCount). This method should meet two requirements:

- This method returns the index of the partition that would contain all messages of the partition currentPartitionIdx if the input stream had never been expanded.
- If currentPartitionIdx < initialPartitionCount (suppose partition indexes start from 0), then the corresponding partition index before the expansion should be currentPartitionIdx itself.

Users can provide custom implementation of this interface and specify it to be used by JobCoordinator to calculate JobModel. It allows Samza to support task expansion for any input system whose partition expansion algorithm can be expressed by this method.

We will also create class DefaultPartitionExpansionAlgorithm that implements this interface. Given current partition index currentPartitionIdx and the initial partition count initialPartitionCount, this class returns currentPartitionIdx % initialPartitionCount as the index of the corresponding partition before ANY partition expansion.
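A minimal sketch of the proposed interface and its default implementation (the interface, class and method names are the ones proposed in this doc; package declaration and Samza plumbing are omitted):

```java
// Sketch of the proposed interface and its default implementation.
interface InputStreamPartitionExpansionAlgorithm {
    int getPartitionBeforeExpansion(int currentPartitionIdx, int currentPartitionCount, int initialPartitionCount);
}

class DefaultPartitionExpansionAlgorithm implements InputStreamPartitionExpansionAlgorithm {
    @Override
    public int getPartitionBeforeExpansion(int currentPartitionIdx, int currentPartitionCount, int initialPartitionCount) {
        // With hash(key) % partitionCount partitioning and power-of-two expansion,
        // messages now in partition X were originally in partition X % initialPartitionCount.
        return currentPartitionIdx % initialPartitionCount;
    }
}
```

For example, getPartitionBeforeExpansion(3, 4, 2) returns 1: messages now in partition 3 were in partition 1 before the stream expanded from 2 to 4 partitions.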

3) Add interface SystemStreamPartitionGrouperFixedTasks

The new interface should extend the existing interface SystemStreamPartitionGrouper. Currently Samza uses various implementations of SystemStreamPartitionGrouper.group(Set<SystemStreamPartition> ssps) to derive the SystemStreamPartition-to-Task assignment. The new interface adds the method group(Map<SystemStreamPartition, String> previousSystemStreamPartitionMapping, Set<SystemStreamPartition> ssps), which additionally takes the SystemStreamPartition-to-Task assignment from the previous job model; this assignment can be read from the coordinator stream.

The SystemStreamPartition-to-Task assignment from the previous job model can be used to detect partition number change of the input stream and help determine the new SystemStreamPartition-to-Task assignment for the new job model. The new SystemStreamPartition-to-Task assignment should ensure that partitions which may have messages with the same key will be mapped to the same task. For example, if partition is increased from 2 to 4, partitions 0 and 2 should be mapped to the same task and partitions 1 and 3 should be mapped to the same task.

Currently we have three implementations of the interface SystemStreamPartitionGrouper, i.e. AllSspToSingleTaskGrouper, GroupByPartition and GroupBySystemStreamPartition. We don't need to use the new information in AllSspToSingleTaskGrouper.group(...) because it already ensures that partitions which may have messages with the same key are mapped to the same task. We don't need to use this information in GroupBySystemStreamPartition.group(...) either, because that grouper should not be used for stateful jobs. Thus this proposal will only use previousSystemStreamPartitionMapping in GroupByPartition.group(...) to determine the new SystemStreamPartition-to-Task assignment.

Given SystemStreamPartition ssp, GroupByPartition.group(...) will determine the taskName that this ssp should be mapped to using the algorithm below:

- Define previousSSPSetOfStream to be the subset of previousSystemStreamPartitionMapping.keys() whose stream equals ssp.getStream().
- Define previousTaskSetOfStream to be the subset of previousSystemStreamPartitionMapping.values() that any SSP in previousSSPSetOfStream is mapped to.
- Take previousTaskSetOfStream.size() as the number of partitions of the stream ssp.getStream() when the job was first created.
- Return String.format("Partition %d", ssp.getPartition().getPartitionId() % previousTaskSetOfStream.size()) as the taskName.

4) Create class GroupByPartitionFixedTasks and GroupBySystemStreamPartitionFixedTasks

We should create two new classes, GroupByPartitionFixedTasks and GroupBySystemStreamPartitionFixedTasks, which implement the interface SystemStreamPartitionGrouperFixedTasks. GroupByPartitionFixedTasks (or GroupBySystemStreamPartitionFixedTasks) should group system-stream-partitions in the same way as GroupByPartition (or GroupBySystemStreamPartition) if previousSystemStreamPartitionMapping is empty (i.e. the job is run for the first time) or if the partition count of those streams has not changed since the job was created. Otherwise, GroupByPartitionFixedTasks should group partitions in such a way that 1) the number of tasks consuming from any given stream does not change before and after the partition expansion; and 2) messages with the same key in the same stream will be consumed by the same task before and after the expansion.

More specifically, GroupByPartitionFixedTasks will map a given SystemStreamPartition ssp to the taskName which is determined using the following algorithm:

- If previousSystemStreamPartitionMapping is empty, return GroupByPartition.group(ssps).get(ssp), where ssps represents the entire set of SystemStreamPartitions to be grouped.
- Calculate from the previous assignment previousSystemStreamPartitionMapping the total number of tasks that are consuming from partitions of the stream ssp.getStream(). Denote this as taskNumForStream.
- Determine the partition corresponding to the ssp before ANY partition expansion using:
previousPartitionIdx = InputStreamPartitionExpansionAlgorithm.getPartitionBeforeExpansion(ssp.getPartition().getPartitionId(), currentPartitionCountForStream, taskNumForStream)
- Return previousSystemStreamPartitionMapping.get(new SystemStreamPartition(ssp.getSystem(), ssp.getStream(), previousPartitionIdx))
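The algorithm above can be sketched as follows. This is an illustrative simplification, not Samza code: plain Integer partition ids and String task names stand in for Samza's SystemStreamPartition and TaskName types, and the modulo mapping inlines DefaultPartitionExpansionAlgorithm.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified sketch of the GroupByPartitionFixedTasks assignment logic for a
// single stream. Integer partition ids / String task names stand in for
// Samza's SystemStreamPartition / TaskName.
public class FixedTasksGroupingSketch {

    static String taskFor(int partitionIdx, Map<Integer, String> previousMapping) {
        // taskNumForStream: number of distinct tasks previously consuming the stream.
        int taskNumForStream = (int) previousMapping.values().stream().distinct().count();
        // Partition this ssp corresponded to before ANY expansion
        // (inlined DefaultPartitionExpansionAlgorithm).
        int previousPartitionIdx = partitionIdx % taskNumForStream;
        // Reuse the task that owned that original partition.
        return previousMapping.get(previousPartitionIdx);
    }

    public static void main(String[] args) {
        // Previous job model: the stream had 2 partitions, one task each.
        Map<Integer, String> previous = new HashMap<>();
        previous.put(0, "Partition 0");
        previous.put(1, "Partition 1");
        // After expanding to 4 partitions: 0 and 2 share a task, 1 and 3 share a task.
        for (int p = 0; p < 4; p++) {
            System.out.println("partition " + p + " -> " + taskFor(p, previous));
            // partition 2 -> Partition 0, partition 3 -> Partition 1
        }
    }
}
```

The key point the sketch shows is that the task count stays fixed at the original partition count, so state keyed by task keeps living on the task that has always owned the corresponding key range.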

Similarly, GroupBySystemStreamPartitionFixedTasks will map a given SystemStreamPartition ssp to the taskName which is determined using the following algorithm:

- If previousSystemStreamPartitionMapping is empty, return GroupBySystemStreamPartition.group(ssps).get(ssp), where ssps represents the entire set of SystemStreamPartitions to be grouped.
- Calculate from the previous assignment previousSystemStreamPartitionMapping the total number of tasks that are consuming from partitions of the stream ssp.getStream(). Denote this as taskNumForStream.
- Determine the partition corresponding to the ssp before ANY partition expansion using:
previousPartitionIdx = InputStreamPartitionExpansionAlgorithm.getPartitionBeforeExpansion(ssp.getPartition().getPartitionId(), currentPartitionCountForStream, taskNumForStream)
- Return previousSystemStreamPartitionMapping.get(new SystemStreamPartition(ssp.getSystem(), ssp.getStream(), previousPartitionIdx))

A stateful Samza job that is using GroupByPartition (or GroupBySystemStreamPartition) as its grouper class should be configured to use GroupByPartitionFixedTasks (or GroupBySystemStreamPartitionFixedTasks) in order to allow partition expansion. Note that GroupByPartitionFixedTasks (or GroupBySystemStreamPartitionFixedTasks) is backward compatible with GroupByPartition (or GroupBySystemStreamPartition), because they return the same partition-to-task assignment if the partition count doesn't change. Thus the user's job should not need to bootstrap its key/value store from the changelog topic.

The user should also provide a custom implementation of InputStreamPartitionExpansionAlgorithm and specify it in the config if DefaultPartitionExpansionAlgorithm does not match the partition expansion algorithm used by the job's input system. This algorithm guarantees that, when the partition count of an input stream has increased, partitions which may contain messages with the same key will be mapped to the same task, provided the operational requirements described above are enforced.

To help understand this algorithm: the idea is to split partitions into disjoint buckets of partitions whose union equals the original set of partitions. The partition-to-bucket assignment ensures that messages with the same key are always produced to partitions of the same bucket, even if they can be in different partitions. Partitions in the same bucket are then assigned to the same task, so that messages with the same key always go to the same task.

For example, suppose the partition count is increased from 2 to 4 and we use GroupByPartitionFixedTasks as the grouper; then partitions 0 and 2 will be mapped to the same task, and partitions 1 and 3 will be mapped to the same task. The figure below shows the relation between partitions, buckets and tasks after we increase the partition count from 2 to 4.

...

[Figure: samza.pdf — partitions, buckets and tasks after expanding the partition count from 2 to 4]

 

...



5) Handle partition expansion while tasks are running

The JobCoordinator already monitors the partition count of the input streams in the current Samza implementation. When the JobCoordinator detects partition expansion of any input stream, it should re-calculate the JobModel, shut down all containers using the off-the-shelf Yarn API, wait for the callback confirming that these containers have been shut down, and restart the containers using the new JobModel.


Public Interfaces

Note that the interface required to handle partition expansion while tasks are running will be specified later.

1) Add interface SystemStreamPartitionGrouperFixedTasks with the following definition:

```java
public interface SystemStreamPartitionGrouperFixedTasks extends SystemStreamPartitionGrouper {
  Map<TaskName, Set<SystemStreamPartition>> group(Map<SystemStreamPartition, String> previousSystemStreamPartitionMapping, Set<SystemStreamPartition> ssps);
}
```


2) Add class GroupByPartitionFixedTasks which implements the interface SystemStreamPartitionGrouperFixedTasks

3) Add class GroupBySystemStreamPartitionFixedTasks which implements the interface SystemStreamPartitionGrouperFixedTasks

4) Add interface InputStreamPartitionExpansionAlgorithm with the following definition:

```java
public interface InputStreamPartitionExpansionAlgorithm {
  int getPartitionBeforeExpansion(int currentPartitionIdx, int currentPartitionCount, int initialPartitionCount);
}
```


This method returns the index of the partition that would contain all messages of the partition currentPartitionIdx if the input stream had NEVER been expanded.

5) Add class DefaultPartitionExpansionAlgorithm which implements the interface InputStreamPartitionExpansionAlgorithm. Given the current partition index currentPartitionIdx and the initial partition count initialPartitionCount, this class returns currentPartitionIdx % initialPartitionCount as the index of the corresponding partition before ANY partition expansion.

6) Add config job.systemstreampartition.expansion.algorithm. This config specifies the canonical name of a class that implements the interface InputStreamPartitionExpansionAlgorithm. Its default value is the canonical name of the class DefaultPartitionExpansionAlgorithm.


...

Test Plan

To be added

Compatibility, Deprecation, and Migration Plan

- The change made in this proposal is both source backward-compatible and binary backward-compatible. Existing users' code can compile and run correctly without change, so the proposal can be deployed without a specific migration plan.
- For users who want to enable partition expansion for its input streams, they can do the following:
  - Set grouper class to GroupByPartitionFixedTasks if the job is using GroupByPartition as grouper class
  - Set grouper class to GroupBySystemStreamPartitionFixedTasks if the job is using GroupBySystemStreamPartition as grouper class 
  - Change their custom grouper class implementation to extend the new interface if the job is using a custom grouper class implementation.
  - Set job.coordinator.monitor-partition-change to true in the job configuration
  - Run ConfigManager
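Putting the steps above together, a job configuration enabling partition expansion might look like the fragment below. The grouper factory key follows Samza's existing job.systemstreampartition.grouper.factory config, but the factory class names for the new groupers are illustrative, since this doc does not specify them; the other two keys are the configs referenced in this proposal.

```properties
# Hypothetical sketch: class names for the new grouper factory are illustrative.
job.systemstreampartition.grouper.factory=org.apache.samza.container.grouper.stream.GroupByPartitionFixedTasksFactory
job.systemstreampartition.expansion.algorithm=org.apache.samza.container.grouper.stream.DefaultPartitionExpansionAlgorithm
job.coordinator.monitor-partition-change=true
```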

Rejected Alternatives

1. Add the new method to the existing interface SystemStreamPartitionGrouper instead of creating a new interface.

The advantage of this alternative is that it requires fewer interface classes and a shorter class hierarchy. But we choose to follow the existing pattern of interface extension in Samza, as we do with BalancingTaskNameGrouper.

2. Use additional repartitioning stage to repartition data from input stream to another internal stream of the old partition count

Here are the pros and cons of the extra re-partitioning stage in comparison to the current proposal.
Pros:
- It doesn't require the owner of the Samza job to know the partitioning algorithm used for the input stream. If the owner of the Samza job is in a different organization than the producer of the input stream, this solution frees the two organizations from having to coordinate with each other.
- It doesn't require the owner of the Samza job to specify the partitioning algorithm used for the input stream. Thus less config.
Cons:
- User has to make code change on their side to use the new fluent API.
- The extra repartitioning stage would potentially increase latency.
- The extra repartitioning stage would incur additional cost due to the extra internal topic. The cost is probably not that much with the new trim() API in Kafka, if Samza uses Kafka to store the internal topic. But the cost may be doubled if Samza uses another input system that doesn't provide a trim() API to delete data on demand.

It seems reasonable to adopt a hybrid solution: we still implement the current proposal in SEP-5, so that we enable partition expansion without incurring extra latency/cost and without requiring users to change their code, and users can add the extra repartitioning stage if coordination among different organizations is indeed a concern.

Future work

1. Allow task number to increase instead of creating a new grouper class.

Allowing the task number to increase is useful since it increases the performance of a given Samza job. However, this feature alone does not solve the problem of partition expansion. For example, say we have a job that joins two streams, both of which have 3 partitions. If the partition number of one stream increases from 3 to 6, we would still want the task number to remain 3, to make sure that messages with the same key from both streams are handled by the same task. This needs to be done with the new grouper classes proposed in this doc.

 

 

...