Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Status

Current state: [ UNDER DISCUSSION ACCEPTED ]

Discussion thread: <link to mailing list DISCUSS thread>: https://lists.apache.org/thread/vjtl5fnf64kpkoxc591466y92dlt2bsb

Voting thread: https://lists.apache.org/thread/2v4gxtc6yoy77lkn2xg4n0dgj3t2mpgq

JIRASAMZA-2687

Released: 

Problem

Some Samza jobs hit throughput issues which cannot be alleviated by sizing the job. One such issue often faced is the inability to scale up task count (and hence container count) beyond the number of input partitions. The need for this arises due to the process-time of the job’s logic which is not under Samza’s control. Then the only way to address the job’s low throughput is to manually increase the partition count of the input streams achieving a higher number of tasks. This design doc aims to solve the problem of throughput dependency on input partition count by introducing virtual tasks which are similar to tasks but consume a portion of the input partition.

...

  1. Job's scale is no longer limited by input partition count.
    1. Large process times arising from complex and long job DAG can be supported without causing lag. 
    2. Improve stateful job throughput which SEP-5: Enable partition expansion of input streams does not solve. SEP-5 allows the repartitioning of the stateful job’s input  without losing all the state. It does not allow for more tasks within the job but rather allows the same number of tasks to consume the extra number of input partitions.
  2. The streaming storage (aka input streams) is no longer forced to use the partitioning required to support the required processing parallelism.

Scope

This SEP focusses on designing elasticity for stateless job only. Elasticity for stateful jobs will be a future work and there will be a follow up SEP for it.

Requirements

The proposed solution should meet the following requirements.

  1. Support scale up and scale down of the job to increase throughput without limiting the scale up to the input streams’ partition counts. 
    1. Overcome the need for input stream repartitioning when the job faces lag.
  2. Samza guarantees should be upheld:
    1. No data loss while scaling up and scaling down the job.
    2. In-order processing of messages with the same key.
  3. The solution should be agnostic to input systems, output systems, intermediate/metadata system and  the cluster-manager as well.
  4. It should provide interfaces/api/configs for autosizing controller to size the job.
  5. The solution should work for all stateless Samza jobs.
  6. Stateful Design should be extensible for stateful jobs should be able to increase the number of (virtual) tasks beyond the number of input partitions while preserving state.

Assumptions:

  1. This work assumes Future stateful job elasticity work will need the assumption that all state of a job is keyed by input key or by a state key obtained by applying a 1:1 function to input key (to figure out for a given state key which input key it belongs to).
  2. Stateful job support is in future work will be limited to jobs using beam api and blob store based state backup and restore.

...

When two virtual tasks consuming two SSPKb of the same SSP are in a single container, their checkpoints will give different offsets for the SystemConsumers (aka consuming layer) to start consumption for a given SSP. In this case, the consuming layer takes the lower/older of the two offsets to start consuming from. This has the potential downside of duplication processing of some messages by the The virtual task which had a checkpoint with a larger/newer offset in the SSP . However, this is acceptable due to Samza's at least once guaranteewill filter out or ignore the messages w/ lower offset and start consuming when the offset matches its checkpoint.

Splitting and Merging of v.tasks

...

  1. Grouper changes to optimize the virtual-task assignment to containers such that virtual-tasks consuming from the same input partition are assigned to the same container so that the input consumption costs are lower and minimize consumption of same partition by several containers.
  2. Minimize the deserialization costs by deserializing only those that are meant to be processed by the container. Minimizing at consumption layer aka not even consuming the messages of the partition that will not be processed by the container is not possible because we do not know before hand which messages are to be processed by a container and in most input systems, subscription is at the input partition level and not sub-partition level.

Rejected Solutions

All-to-all partitioning scheme

Limitations of current design

Consider the scenario where two virtual tasks consuming the same SSP are in a container. If one of the virtual tasks is much faster than the other virtual task then in this design, the overall throughput will be limited to the slower virtual task throughput. This is because they share the same SystemConsumer in the container which subscribes to the input SSP at a partition level and hence needs to consume at the rate of the slower virtual task. However, since both virtual tasks are in the same container, there is no increased consumption cost on the underlying input system. Throughput can be increased by moving the faster virtual task into a different container but this would incur a higher consumption cost on the input system. This is a known tradeoff in this design. This can be addressed in the future via better grouper mechanism or container assignment. 

Rejected Solutions

All-to-all partitioning scheme

  1. Idea: Task split with all virtual tasks consuming all partitions within an input stream. Note that this is different from the proposed solution in terms of how many SSPs a virtual task consumes. For eg If the input stream had partitions 0,1,2,4 originally with 2 containers. Tasks a,b in container 1 read 0,1 respectively Idea: Task split with all virtual tasks consuming all partitions within an input stream. Note that this is different from the proposed solution in terms of how many SSPs a virtual task consumes. For eg If the input stream had partitions 0,1,2,4 originally with 2 containers. Tasks a,b in container 1 read 0,1 respectively and tasks c,d in container 2 read 2,4. So container 1 reads 0,1 and container 2 reads reads 2,4.  With this rejected solution - there will be 8 v.tasks in say 2 containers but each v.task needs to read all 4 partitions - aka v.task a1 will read 0,1,2,4 and so on. This leads to both container 1 and 2 reading all 0,1,2,4 .  With this rejected solution - there will be 8 v.tasks in say 2 containers but each v.task needs to read all 4 partitions - aka v.task a1 will read 0,1,2,4 and so on. This leads to both container 1 and 2 reading all 0,1,2,4 partitions.
  2. Pros:
    1. Solves the hot partition problem.
  3. Cons:
    1. Consumption will overwhelm the input system.
    2. Overhead of determining which input messages to process and which to ignore.

Per-key parallelism

  1. partitions.
  2. Pros:
    1. Solves the hot partition problem.
  3. Cons:
    1. Consumption will overwhelm the input system.
    2. Overhead of determining which input messages to process and which to ignore.

Per-key parallelism

  1. Idea: Improve parallelism such that one key gets processed by one unit (split task or thread within a task) within the container. 
  2. Pros:
    1. Maintains ordering within the key. Proposed solution also guarantees this by virtue of using a key to determine the processing virtual task.
  3. Cons:
    1. Number of keys within an input stream is not known apriori making it impossible to configure the number of threads or virtual tasks at the start of the job.
    2. Additionally, usually number of keys in an input stream is much much larger than typical number of tasks which could lead to a lot of metadata overhead.

KeyOrderedExecutor

  1. Idea: Improve parallelism such that a container has a fixed thread pool and each thread consumes a set of keys leading to in order execution within keys.
  2. Pros:
    1. Similar to elasticity design above and simpler design.
  3. Cons:
    1. Elasticity design above is integrated into the engine framework and has the advantage of checkpointing per key bucket.
    2. Additionally, elasticity design allows for higher throughput than KeyOrderedExecutor if the virtual tasks consuming a single SSP are spread across multiple containers
  4. Idea: Improve parallelism such that one key gets processed by one unit (split task or thread within a task) within the container. 
  5. Pros:
    1. Maintains ordering within the key. Proposed solution also guarantees this by virtue of using a key to determine the processing virtual task.
  6. Cons:
  7. Number of keys within an input stream is not known apriori making it impossible to configure the number of threads or virtual tasks at the start of the job.
  8. Additionally, usually number of keys in an input stream is much much larger than typical number of tasks which could lead to a lot of metadata overhead
    1. .

Public Interfaces

The following changes to public interfaces will arise out of this feature. 

...