...

Released: <Samza version>

Problem

The input system used by Samza may need to expand the number of partitions of an input stream after a stateful job has been created. For example, Kafka generally needs to limit the maximum size of a partition to keep broker performance acceptable, so the number of partitions of a Kafka topic needs to be increased to reduce the per-partition size when the average byte-in rate or retention time of the topic grows over time. However, Samza currently cannot handle partition expansion of its input streams. For example, if a user runs a stateful job that uses GroupByPartition as the grouper and the partition count of the input stream increases from 2 to 4, then for a period of time after the expansion it is possible (e.g. with Kafka) that messages with a given key exist in both partition 1 and partition 3. Because GroupByPartition assigns partition 1 and partition 3 to different tasks, messages with the same key may be handled by different tasks/containers/processes and their state will be stored in different changelog partitions. The output from Samza may therefore be wrong, because this violates the assumption Samza makes for stateful jobs.
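
To make this concrete, the sketch below (illustration only; the class, method and key names are hypothetical and not from Samza or this proposal) shows why a hash(key) % partitionNum partitioner can route a key that used to go to partition 1 to partition 3 once the partition count doubles from 2 to 4:

    public class PartitionExpansionExample {
      // non-negative hash(key) % partitionNum, the form required by this proposal
      static int partitionFor(String key, int partitionNum) {
        return (key.hashCode() & Integer.MAX_VALUE) % partitionNum;
      }

      public static void main(String[] args) {
        String key = "someKey";
        // If hash(key) % 2 == 1, then hash(key) % 4 is either 1 or 3, so a key
        // whose messages went to partition 1 before the expansion may go to
        // partition 3 afterwards. GroupByPartition maps partitions 1 and 3 to
        // different tasks, splitting the key's state across changelog partitions.
        System.out.println("before expansion: partition " + partitionFor(key, 2));
        System.out.println("after expansion:  partition " + partitionFor(key, 4));
      }
    }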

Motivation

This design doc provides a solution to increase the partition count of the input streams of a stateful Samza job while still ensuring the correctness of the Samza job's output. The solution works for any input system that satisfies the operational requirements described below, i.e. 1) the partitioning algorithm is implemented in the form of hash(key) % partitionNum and 2) partitionNum is always multiplied by a power of two when it is increased. In the rest of the document we assume Kafka is the input system. The motivation for increasing the partition count of a Kafka topic includes 1) limiting the maximum size of a partition in order to improve broker performance and 2) increasing the throughput of the Kafka consumers in the Samza containers. Other input systems may have different reasons for partition expansion.

Proposed Changes

...

The current solution keeps the task number unchanged and uses a proper partition-to-task assignment to make sure the Samza output remains correct after partition expansion. An alternative solution is to allow the task number to increase after partition expansion and use a proper task-to-container assignment to ensure correctness. The second solution, which allows task expansion, is needed in order to scale up the performance of a Samza job, and it would also allow partition expansion for stateful jobs that don't use a join operation. However, it is much more complicated to design and implement than the solution in this doc, and it does not enable partition expansion for stateful Samza jobs that use a join operation (see the Rejected Alternatives section), which this proposal does address. Thus the two solutions don't replace each other and can be designed independently. We plan to use the solution described in this doc to enable partition expansion as a low-hanging fruit; later we will come up with a separate proposal to address task expansion.

 


Operational Requirement of the Input System

1) Partitioning algorithms in producer

The partitioning algorithm should be implemented in the form of hash(key) % partitionNum. The user is free to choose the hash function, but the hashed result must be taken modulo partitionNum.

Note that this requirement is satisfied by the default partitioning algorithm in Kafka. Samza needs to repartition the user's stream if the user uses a customized partitioning algorithm in their producer implementation that doesn't meet this requirement.
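
As a rough sketch (assuming a Kafka producer; ModuloPartitioner is a hypothetical name, and null keys are ignored for brevity), a custom partitioner that satisfies this requirement could look like the following:

    import java.util.Map;
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.utils.Utils;

    // Hypothetical partitioner: any hash may be used, as long as the result is
    // taken modulo the topic's current partition count.
    public class ModuloPartitioner implements Partitioner {

      @Override
      public int partition(String topic, Object key, byte[] keyBytes,
                           Object value, byte[] valueBytes, Cluster cluster) {
        int partitionNum = cluster.partitionsForTopic(topic).size();
        // murmur2 is the hash used by Kafka's default partitioner for keyed
        // messages; hash(key) % partitionNum is the part that matters here.
        return Utils.toPositive(Utils.murmur2(keyBytes)) % partitionNum;
      }

      @Override
      public void configure(Map<String, ?> configs) { }

      @Override
      public void close() { }
    }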

2)

...

Stream management 

The partition count of any keyed stream that is used as an input stream of a stateful job should always be multiplied by a power of two when it is increased. This guideline should be enforced by whoever increases the partition count of the Kafka topic.

Note that this can be trivially supported in Kafka.
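
For illustration only (the class and method names below are hypothetical, not part of this proposal or of Kafka), the rule can be checked before expanding a topic as follows:

    public class PartitionExpansionCheck {
      // Valid: the new count is the old count multiplied by a power of two,
      // e.g. 4 -> 8 or 4 -> 16; invalid: 4 -> 6.
      static boolean isValidExpansion(int oldPartitionNum, int newPartitionNum) {
        if (newPartitionNum < oldPartitionNum || newPartitionNum % oldPartitionNum != 0) {
          return false;
        }
        // the expansion factor must be a power of two, i.e. have exactly one bit set
        return Integer.bitCount(newPartitionNum / oldPartitionNum) == 1;
      }
    }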

3) Order of message consumption

The underlying system should ensure that messages with the same key from the same producer are always consumed in the order they are produced before and after the partition expansion. 

Note that this requirement is not currently satisfied by Kafka. For example, say the partition count is expanded from 2 to 4. Messages with a given key may be produced to partition 1 before the expansion and to partition 3 after it. Because the Kafka consumer does not guarantee the order of message delivery across partitions, it is possible that Samza consumes messages with that key from partition 3 before those from partition 1, which violates the requirement. Thus additional development work is needed in Kafka to meet this requirement.

Samza Implementation Change

...

Allowing the task number to increase is useful because it increases the performance of a given Samza job. However, this feature alone does not solve the problem of allowing partition expansion. For example, say we have a job that joins two streams, both of which have 3 partitions. If the partition count of one stream increases from 3 to 6, we still want the task number to remain 3 to make sure that messages with the same key from both streams are handled by the same task. This needs to be done with the new grouper classes proposed in this doc.
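
For illustration only (the class and method names below are hypothetical and are not the grouper classes proposed in this doc), the partition-to-task assignment that keeps the task number fixed boils down to mapping each expanded partition back to the task that owned the partition it was split from:

    public class FixedTaskAssignmentSketch {
      // Assign an expanded partition back to the task that owned the partition
      // it was split from: task = partition % originalTaskCount.
      static int taskForPartition(int partition, int originalTaskCount) {
        return partition % originalTaskCount;
      }

      public static void main(String[] args) {
        // Join example above: one stream expands from 3 to 6 partitions while
        // the task number stays 3. Partitions 3, 4 and 5 map back to tasks
        // 0, 1 and 2, so messages with the same key from both streams are
        // still handled by the same task (given the hash(key) % partitionNum
        // and power-of-two requirements).
        for (int partition = 0; partition < 6; partition++) {
          System.out.println("partition " + partition + " -> task " + taskForPartition(partition, 3));
        }
      }
    }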