

Status

Current state: UNDER DISCUSSION

Discussion thread: http://mail-archives.apache.org/mod_mbox/samza-dev/201705.mbox/%3CCANxwKLaVro6MBvUJW2RvoNLDO9-G87Y3Ox%2B5W66K_CxBqeVfgQ%40mail.gmail.com%3E

JIRA: SAMZA-871

Released: <Samza version>

Problem

Right now, Samza does not allow the partition count of an input stream to increase after a stateful job is created. This causes problems when Kafka is used as the input system, because the partition count of an existing topic must be expanded as the topic's byte-in rate grows over time, in order to limit the size of the largest partition in Kafka. A Kafka broker may have performance issues if a given partition is too large.

Motivation

This design doc provides a solution for increasing the partition count of the input streams of a stateful Samza job while still ensuring the correctness of the Samza job's output. The solution should work when Kafka is used as the input system, and we expect it to work similarly with other input systems. The motivation for increasing the partition count of a Kafka topic is 1) to improve the performance of the Kafka broker and 2) to increase the throughput of the Kafka consumer in the Samza container.

Proposed Changes


Kafka Operational Requirement

1) Partitioning algorithms in producer

The partitioning algorithm should be implemented in the form hash(key) % partitionNum. The user is free to choose the hash function, but the hashed result must be taken modulo partitionNum. This requirement is satisfied by the default partitioning algorithms in Kafka. Samza needs to repartition the user's topic if the user's producer implementation uses a customized partitioning algorithm that doesn't meet this requirement.
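
A minimal sketch of a partitioner that satisfies this requirement is shown below. The class name ModuloPartitioner and the choice of Arrays.hashCode as the hash function are assumptions made for illustration only; this is not Kafka's or Samza's own code, and any deterministic hash could be substituted.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical helper for illustration; not part of Kafka or Samza. */
final class ModuloPartitioner {
    private ModuloPartitioner() {}

    /** Any deterministic hash is acceptable; Arrays.hashCode is an arbitrary choice. */
    static int hash(String key) {
        return Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
    }

    /** Computes hash(key) % partitionNum, kept non-negative via floorMod. */
    static int partitionFor(String key, int partitionNum) {
        if (partitionNum <= 0) {
            throw new IllegalArgumentException("partitionNum must be positive");
        }
        return Math.floorMod(hash(key), partitionNum);
    }
}
```

Math.floorMod is used rather than the % operator so that a negative hash value still yields a partition in the range [0, partitionNum).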

2) Topic management

 

The partition count of any keyed topic that may be consumed by Samza should always be multiplied by a power of two when it is increased. This guideline should be enforced by whoever increases the partition count of the topic.
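
The following sketch illustrates how the two operational requirements interact, assuming a producer that partitions by hash(key) % partitionNum. When the partition count grows from oldCount to oldCount multiplied by a power of two, the new partition of a key, taken modulo oldCount, equals the key's old partition. The class and method names are hypothetical.

```java
/** Hypothetical checks for illustration; not part of Kafka or Samza. */
final class PartitionExpansionCheck {
    private PartitionExpansionCheck() {}

    /** True if newCount equals oldCount times a power of two (a factor of 1 means no change). */
    static boolean isPowerOfTwoMultiple(int oldCount, int newCount) {
        if (oldCount <= 0 || newCount < oldCount || newCount % oldCount != 0) {
            return false;
        }
        int factor = newCount / oldCount;
        return (factor & (factor - 1)) == 0;
    }

    /** True if the key's new partition, taken modulo oldCount, equals its old partition. */
    static boolean keyStaysInSameGroup(int hash, int oldCount, int newCount) {
        int oldPartition = Math.floorMod(hash, oldCount);
        int newPartition = Math.floorMod(hash, newCount);
        return newPartition % oldCount == oldPartition;
    }
}
```

For example, with oldCount = 2 and newCount = 8, a key that used to land in partition 1 can only land in partition 1, 3, 5 or 7 afterwards.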

Samza Implementation Change

1) Store SystemStreamPartition-to-Task assignment in the coordinator stream

 

Create the classes SystemStreamPartitionAssignmentManager and SetSSPTaskMapping to read and write the SystemStreamPartition-to-Task assignment in the coordinator stream. This will be done similarly to how ChangelogPartitionManager and SetChangelogMapping are used to read and write the Task-to-ChangeLogPartition assignment in the coordinator stream. The assignment should be written to the coordinator stream every time the job model is initialized.

2) Provide the SystemStreamPartition-to-Task assignment from previous job model to SystemStreamPartitionGrouper.group(...)

Currently Samza uses various implementations of the interface method SystemStreamPartitionGrouper.group(Set<SystemStreamPartition> ssps) to derive the SystemStreamPartition-to-Task assignment. With the proposal in this doc, we should deprecate the existing method group(Set<SystemStreamPartition> ssps) in the interface SystemStreamPartitionGrouper and replace it with the method group(Map<SystemStreamPartition, String> previousSystemStreamPartitionMapping, Set<SystemStreamPartition> ssps). The new method takes the SystemStreamPartition-to-Task assignment from the previous job model, which can be read from the coordinator stream.

The SystemStreamPartition-to-Task assignment from the previous job model can be used to detect a change in the partition count of the input stream and to help determine the SystemStreamPartition-to-Task assignment for the new job model. The new assignment should ensure that partitions which may have messages with the same key are mapped to the same task. For example, if the partition count is increased from 2 to 4, partitions 0 and 2 should be mapped to the same task, and partitions 1 and 3 should be mapped to the same task.

Currently we have three implementations of the interface SystemStreamPartitionGrouper: AllSspToSingleTaskGrouper, GroupByPartition and GroupBySystemStreamPartition. We don't need to use the new information in AllSspToSingleTaskGrouper.group(...), because it already ensures that partitions which may have messages with the same key are mapped to the same task. We don't need to use it in GroupBySystemStreamPartition.group(...) either, because that grouper should not be used for stateful jobs. Thus this proposal will only use previousSystemStreamPartitionMapping in GroupByPartition.group(...) to determine the new SystemStreamPartition-to-Task assignment.

 

Given SystemStreamPartition ssp, GroupByPartition.group(...) will determine the taskName that this ssp should be mapped to using the algorithm below:

- Define previousSSPSetOfStream to be the subset of previousSystemStreamPartitionMapping.keys() whose stream equals ssp.getStream()
- Define previousTaskSetOfStream to be the subset of previousSystemStreamPartitionMapping.values() that are mapped to by any SSP in previousSSPSetOfStream
- Take previousTaskSetOfStream.size() as the number of partitions of the stream ssp.getStream() when the job was first created
- Return String.format("Partition %d", ssp.getPartition().getPartitionId() % previousTaskSetOfStream.size()) as the taskName

If the Kafka Operational Requirement described above is enforced, this algorithm guarantees that partitions which may have messages with the same key are mapped to the same task after the partition count of the input Kafka topic has increased.
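
The steps above can be sketched as follows. This is an illustration under stated assumptions, not the actual GroupByPartition code: Ssp is a hypothetical stand-in for Samza's SystemStreamPartition that carries only a stream name and a partition id, task names are plain strings, and the fallback used when a stream has no previous mapping (one task per partition) is an assumption that the text above does not specify.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/** Hypothetical stand-in for SystemStreamPartition (stream name and partition id only). */
final class Ssp {
    final String stream;
    final int partitionId;

    Ssp(String stream, int partitionId) {
        this.stream = stream;
        this.partitionId = partitionId;
    }

    @Override public boolean equals(Object o) {
        if (!(o instanceof Ssp)) return false;
        Ssp other = (Ssp) o;
        return stream.equals(other.stream) && partitionId == other.partitionId;
    }

    @Override public int hashCode() {
        return Objects.hash(stream, partitionId);
    }
}

final class GroupByPartitionSketch {
    private GroupByPartitionSketch() {}

    /** Derives the task name for one SSP from the previous SSP-to-task mapping. */
    static String taskNameFor(Map<Ssp, String> previousMapping, Ssp ssp) {
        // Tasks that previously consumed any partition of this SSP's stream.
        Set<String> previousTaskSetOfStream = new HashSet<>();
        for (Map.Entry<Ssp, String> entry : previousMapping.entrySet()) {
            if (entry.getKey().stream.equals(ssp.stream)) {
                previousTaskSetOfStream.add(entry.getValue());
            }
        }
        // Assumption: with no previous mapping, use one task per partition.
        if (previousTaskSetOfStream.isEmpty()) {
            return String.format("Partition %d", ssp.partitionId);
        }
        // The task count stands in for the stream's original partition count.
        return String.format("Partition %d", ssp.partitionId % previousTaskSetOfStream.size());
    }

    /** Groups all SSPs by the task name derived above. */
    static Map<String, Set<Ssp>> group(Map<Ssp, String> previousMapping, Set<Ssp> ssps) {
        Map<String, Set<Ssp>> result = new HashMap<>();
        for (Ssp ssp : ssps) {
            result.computeIfAbsent(taskNameFor(previousMapping, ssp), k -> new HashSet<>()).add(ssp);
        }
        return result;
    }
}
```

With a previous mapping of partitions 0 and 1 of one stream to "Partition 0" and "Partition 1", grouping partitions 0 through 3 of that stream places partitions 0 and 2 in "Partition 0" and partitions 1 and 3 in "Partition 1", which matches the 2-to-4 example given earlier.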

3) Handle partition expansion while tasks are running

Samza should monitor the partition count of the input streams for changes. When the partition count of any input stream has changed, the container should query the Job Coordinator (referred to as JC below) for the set of partitions its tasks should consume by sending an HTTP request to the JC. The JC should return the new Job Model, which derives the new SystemStreamPartition-to-Task assignment as described above. The container should then update its consumer to consume from the new set of partitions and update its tasks to commit offsets for the new set of partitions.
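
As a rough illustration of the detection step only, the sketch below compares the last known partition counts with freshly observed ones and reports the streams whose count differs. All names here are hypothetical, and how the counts are obtained and how the JC is queried are deliberately left out, since that interface is not yet specified.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper for illustration; not an existing Samza class. */
final class PartitionCountMonitorSketch {
    private PartitionCountMonitorSketch() {}

    /**
     * Returns the names of streams whose partition count differs between the
     * two snapshots. Streams absent from lastSeen are ignored.
     */
    static Set<String> streamsWithChangedPartitionCount(
            Map<String, Integer> lastSeen, Map<String, Integer> current) {
        Set<String> changed = new TreeSet<>();
        for (Map.Entry<String, Integer> entry : current.entrySet()) {
            Integer previous = lastSeen.get(entry.getKey());
            if (previous != null && !previous.equals(entry.getValue())) {
                changed.add(entry.getKey());
            }
        }
        return changed;
    }
}
```

A non-empty result would be the trigger for the container to request the new Job Model from the JC.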

Public Interfaces

1) Deprecate the method Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> ssps) of the interface SystemStreamPartitionGrouper

2) Add method Map<TaskName, Set<SystemStreamPartition>> group(Map<SystemStreamPartition, String> previousSystemStreamPartitionMapping, Set<SystemStreamPartition> ssps) in the interface SystemStreamPartitionGrouper.
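
Taken together, the two changes would leave the interface looking roughly like the sketch below. SystemStreamPartition and TaskName are declared here as empty stand-ins so that the snippet compiles without Samza on the classpath; in Samza they are existing classes, and the Javadoc wording is illustrative.

```java
import java.util.Map;
import java.util.Set;

/** Empty stand-ins so the sketch compiles on its own; not the real Samza classes. */
final class SystemStreamPartition {}
final class TaskName {}

interface SystemStreamPartitionGrouper {
    /** Existing method, to be deprecated by this proposal. */
    @Deprecated
    Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> ssps);

    /**
     * Proposed method. previousSystemStreamPartitionMapping is the
     * SystemStreamPartition-to-Task assignment of the previous job model.
     */
    Map<TaskName, Set<SystemStreamPartition>> group(
            Map<SystemStreamPartition, String> previousSystemStreamPartitionMapping,
            Set<SystemStreamPartition> ssps);
}
```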

The interface required to handle partition expansion while tasks are running will be specified later.

Implementation and Test Plan

To be added

Compatibility, Deprecation, and Migration Plan

Users who have a custom implementation of the interface SystemStreamPartitionGrouper need to implement the new method. The proposed solution is backward compatible and can be deployed without a specific migration plan.

Rejected Alternatives

 
