Status

Current state: UNDER DISCUSSION

Discussion thread: http://mail-archives.apache.org/mod_mbox/samza-dev/201705.mbox/%3CCANxwKLaVro6MBvUJW2RvoNLDO9-G87Y3Ox%2B5W66K_CxBqeVfgQ%40mail.gmail.com%3E

JIRA: SAMZA-871

Released:  <Samza version>

Problem

Right now, Samza relies on YARN to detect whether a container is alive and valid. This has a few problems: YARN-based liveness detection fails when the NM crashes, causing the container to be rescheduled on a different host without the old container being killed, which leads to double processing of messages. We need a way to ensure that invalid containers are killed so that duplicate container launches are handled.

The proposal is to solve this by implementing a JobCoordinator HTTP endpoint that provides a heartbeat between the containers and the JobCoordinator.

Motivation

With a direct heartbeat mechanism between the JobCoordinator and the SamzaContainer, we can be agnostic to the YARN RM/NM/AM sync status. It is also simple to implement and understand due to its synchronous flow.

Proposed Changes

JobCoordinator side

  • Expose a REST endpoint (e.g. /containerHeartbeat) whose purpose is to receive periodic requests from the Samza container and respond with whether the container is in the Job Coordinator's current list of valid containers.

    Code Block (bash)
    $ curl <host>:<port>/containerHeartbeat?executionContainerId=container_1490224420978_0323_01_000282
    {
        "alive": true
    }
  • The endpoint could be part of the JobModelManager's servlet, which the containers currently use to retrieve the JobModel during startup.
  • The endpoint accepts an "executionContainerId" (e.g. the YARN container ID) and validates it against state maintained by the Job Coordinator (e.g. YarnAppState); future implementations for other cluster managers need to implement this endpoint and expose the same validation.

Container side

  • In the LocalContainerRunner we can start a monitor that periodically polls the above endpoint to check whether the container is still valid.
    This new ContainerHeartbeatMonitor class accepts a callback and a ContainerHeartbeatClient (which implements the logic for making heartbeat checks against the JC endpoint).
    The ContainerHeartbeatMonitor schedules a thread at a fixed rate that uses the client to check whether the heartbeat is still valid. On heartbeat failure the callback is executed, shutting down the container and setting state on the LocalContainerRunner so that the main thread exits with a non-zero code. A sketch of this monitor follows below.
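
A minimal sketch of what this monitor could look like, assuming a ContainerHeartbeatClient whose isAlive() method wraps the HTTP call above (the method name and polling interval are illustrative, not a final API):

Code Block (java)
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of the proposed monitor, not final code.
public class ContainerHeartbeatMonitor {
  private static final long POLL_INTERVAL_MS = 60_000; // illustrative interval

  private final Runnable onContainerInvalid; // shuts down the container
  private final ContainerHeartbeatClient client;
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();

  public ContainerHeartbeatMonitor(Runnable onContainerInvalid,
      ContainerHeartbeatClient client) {
    this.onContainerInvalid = onContainerInvalid;
    this.client = client;
  }

  public void start() {
    scheduler.scheduleAtFixedRate(() -> {
      // The client issues GET /containerHeartbeat?executionContainerId=...
      // and reports whether the JC still considers this container valid.
      if (!client.isAlive()) {
        onContainerInvalid.run(); // triggers shutdown with a non-zero exit code
        scheduler.shutdown();
      }
    }, 0, POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
  }

  public void stop() {
    scheduler.shutdown();
  }

  // Assumed shape of the client, per the description above.
  public interface ContainerHeartbeatClient {
    boolean isAlive();
  }
}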

Public Interfaces

  • Set an environment variable "EXECUTION_ENV_CONTAINER_ID" (e.g. the YARN container ID) during container launch. The container reads this value and uses it when making requests to the above endpoint, as illustrated below.
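
For illustration, the container side could read the variable and hand it to the heartbeat client described earlier (jobCoordinatorBaseUrl and the client constructor shown here are assumptions, not a final API):

Code Block (java)
// Read the execution-environment container ID injected at launch time.
String executionContainerId = System.getenv("EXECUTION_ENV_CONTAINER_ID");
// Hypothetical constructor: the client appends the ID as the
// executionContainerId query parameter on /containerHeartbeat requests.
ContainerHeartbeatClient heartbeatClient =
    new ContainerHeartbeatClient(jobCoordinatorBaseUrl, executionContainerId);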

Implementation and Test Plan

  • Introduce the new REST endpoint for YARN.
  • Implement the heartbeat monitor on the container side, responsible for killing the container when needed.
  • Set up metrics for the number of invalid containers that made a call to the endpoint.
  • Add unit tests to verify compatibility.

Compatibility, Deprecation, and Migration Plan

The changes in this SEP currently target only the YARN deployment model and are not intended for Standalone.

Rejected Alternatives

...

does not allow the partition count of the input stream to increase after a stateful job is created. This causes problems when Kafka is used as the input system, because we need to expand the partitions of an existing topic as its byte-in rate grows over time in order to limit the size of the largest partition; a Kafka broker may have performance issues if a given partition becomes too large.

Motivation

This design doc provides a solution for increasing the partition count of the input streams of a stateful Samza job while still ensuring the correctness of the job's output. The solution should work when Kafka is used as the input system, and we expect it to work similarly with other input systems. The motivation for increasing the partition count of a Kafka topic is to 1) improve the performance of the Kafka broker and 2) increase the throughput of the Kafka consumer in the Samza container.

Proposed Changes


Kafka Operational Requirement

1) Partitioning algorithms in producer

The partitioning algorithm must be of the form hash(key) % partitionNum. Users are free to choose the hash function, but the hashed result must be taken modulo the partition count. This requirement is satisfied by the default partitioning algorithm in Kafka. Samza needs to repartition the user's topic if the user's producer uses a customized partitioning algorithm that does not meet this requirement; a conforming partitioner is sketched below.
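
To illustrate the requirement, here is a minimal sketch of a conforming custom partitioner written against Kafka's producer Partitioner interface; the key.hashCode() hash is just an example, and only the final modulo is the mandatory part:

Code Block (java)
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

// Sketch of a partitioner in the required hash(key) % partitionNum form.
public class ModuloPartitioner implements Partitioner {
  @Override
  public int partition(String topic, Object key, byte[] keyBytes,
                       Object value, byte[] valueBytes, Cluster cluster) {
    int numPartitions = cluster.partitionsForTopic(topic).size();
    // Any deterministic hash works; the mandatory part is the final modulo.
    int hash = key.hashCode() & 0x7fffffff; // force a non-negative value
    return hash % numPartitions;
  }

  @Override
  public void close() {}

  @Override
  public void configure(Map<String, ?> configs) {}
}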

2) Topic management

The partition count of any keyed topic that may be consumed by Samza should always be multiplied by a power of two when it is increased (e.g. a topic created with 8 partitions may grow to 16, 32, or 64 partitions, but never to 12). This guideline should be enforced by whoever may increase the topic's partition count.

Samza Implementation Change

1) Store SystemStreamPartition-to-Task assignment in the coordinator stream

Create classes SystemStreamPartitionAssignmentManager and SetSSPTaskMapping to read and write the SystemStreamPartition-to-Task assignment in the coordinator stream, similar to how ChangelogPartitionManager and SetChangelogMapping are used to read and write the Task-to-ChangeLogPartition assignment. The assignment should be written to the coordinator stream every time the job model is initialized, as sketched below.
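
A rough sketch of the write path; SystemStreamPartitionAssignmentManager, SetSSPTaskMapping, and the writeSSPTaskMapping(...) method are names proposed here, and their exact signatures are placeholders:

Code Block (java)
import java.util.Map;
import org.apache.samza.system.SystemStreamPartition;

// Illustrative only: persist the SSP-to-Task assignment every time the
// job model is initialized, one mapping message per SSP.
void writeAssignment(SystemStreamPartitionAssignmentManager manager,
    Map<SystemStreamPartition, String> sspToTaskAssignment) {
  for (Map.Entry<SystemStreamPartition, String> entry : sspToTaskAssignment.entrySet()) {
    // Emits a SetSSPTaskMapping message to the coordinator stream,
    // keyed by the SSP, with the task name as the value.
    manager.writeSSPTaskMapping(entry.getKey(), entry.getValue());
  }
}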

2) Provide the SystemStreamPartition-to-Task assignment from previous job model to SystemStreamPartitionGrouper.group(...)

Currently Samza uses various implementations of the interface SystemStreamPartitionGrouper to derive the SystemStreamPartition-to-Task assignment via group(Set<SystemStreamPartition> ssps). With the proposal in this doc, we should deprecate the existing method group(Set<SystemStreamPartition> ssps) in the interface SystemStreamPartitionGrouper and replace it with the method group(Map<SystemStreamPartition, String> previousSystemStreamPartitionMapping, Set<SystemStreamPartition> ssps). The new method takes the SystemStreamPartition-to-Task assignment from the previous job model, which can be read from the coordinator stream.

The SystemStreamPartition-to-Task assignment from the previous job model can be used to detect partition number change of the input stream and help determine the new SystemStreamPartition-to-Task assignment for the new job model. The new SystemStreamPartition-to-Task assignment should ensure that partitions which may have messages with the same key will be mapped to the same task. For example, if partition is increased from 2 to 4, partitions 0 and 2 should be mapped to the same task and partitions 1 and 3 should be mapped to the same task.

Currently we have three implementations of the interface SystemStreamPartitionGrouper: AllSspToSingleTaskGrouper, GroupByPartition and GroupBySystemStreamPartition. We don't need the new information in AllSspToSingleTaskGrouper.group(...) because it already ensures that partitions which may have messages with the same key are mapped to the same task. Nor do we need it in GroupBySystemStreamPartition.group(...), because that grouper should not be used for stateful jobs. Thus this proposal only uses previousSystemStreamPartitionMapping in GroupByPartition.group(...) to determine the new SystemStreamPartition-to-Task assignment.

Given a SystemStreamPartition ssp, GroupByPartition.group(...) determines the taskName that ssp should be mapped to using the algorithm below:

- Define previousSSPSetOfStream as the subset of previousSystemStreamPartitionMapping.keys() whose stream equals ssp.getStream().
- Define previousTaskSetOfStream as the subset of previousSystemStreamPartitionMapping.values() that any SSP in previousSSPSetOfStream maps to.
- Treat previousTaskSetOfStream.size() as the number of partitions of the stream ssp.getStream() when the job was first created.
- Map ssp to the task named String.format("Partition %d", ssp.getPartition().getPartitionId() % previousTaskSetOfStream.size()).

This algorithm guarantees that partitions which may have messages with the same key are mapped to the same task after the partition count of the input Kafka topic has increased, provided the Kafka Operational Requirement described above is enforced. A minimal sketch of the computation follows.
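
The sketch below illustrates the algorithm; the wrapper class and helper name are hypothetical, and only the set construction and the final modulo come from the steps above:

Code Block (java)
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.samza.system.SystemStreamPartition;

// Hypothetical helper illustrating the task-name computation inside
// GroupByPartition.group(...).
public class GroupByPartitionSketch {
  static String taskNameFor(SystemStreamPartition ssp,
      Map<SystemStreamPartition, String> previousSystemStreamPartitionMapping) {
    // Tasks that owned any partition of this stream in the previous job model.
    Set<String> previousTaskSetOfStream = new HashSet<>();
    for (Map.Entry<SystemStreamPartition, String> e :
        previousSystemStreamPartitionMapping.entrySet()) {
      if (e.getKey().getStream().equals(ssp.getStream())) {
        previousTaskSetOfStream.add(e.getValue());
      }
    }
    // Its size equals the stream's partition count when the job was created,
    // so partitions p, p + N, p + 2N, ... all land on the same task.
    int originalPartitionCount = previousTaskSetOfStream.size();
    return String.format("Partition %d",
        ssp.getPartition().getPartitionId() % originalPartitionCount);
  }
}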

3) Handle partition expansion while tasks are running

Samza should monitor the partition count of the input streams. When the partition count of any input stream changes, the container should query the Job Coordinator (referred to as JC below) via an HTTP request for the set of partitions its tasks should consume. The JC should return the new Job Model, which derives the new SystemStreamPartition-to-Task assignment as described above. The container should then update its consumers to consume the new set of partitions and update its tasks to commit offsets for the new set of partitions.

Public Interfaces

1) Deprecate the method Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> ssps) of the interface SystemStreamPartitionGrouper.

2) Add the method Map<TaskName, Set<SystemStreamPartition>> group(Map<SystemStreamPartition, String> previousSystemStreamPartitionMapping, Set<SystemStreamPartition> ssps) to the interface SystemStreamPartitionGrouper. Taken together, the interface would look roughly as sketched below.
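
A sketch of the resulting interface; the imports are the existing Samza classes, and only the two signatures above are part of the proposal:

Code Block (java)
import java.util.Map;
import java.util.Set;
import org.apache.samza.container.TaskName;
import org.apache.samza.system.SystemStreamPartition;

public interface SystemStreamPartitionGrouper {
  // Existing method, deprecated by this proposal.
  @Deprecated
  Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> ssps);

  // New method: additionally receives the SSP-to-Task assignment from the
  // previous job model, read from the coordinator stream.
  Map<TaskName, Set<SystemStreamPartition>> group(
      Map<SystemStreamPartition, String> previousSystemStreamPartitionMapping,
      Set<SystemStreamPartition> ssps);
}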

The interface required to handle partition expansion while tasks are running will be specified later.

Implementation and Test Plan

To be added

Compatibility, Deprecation, and Migration Plan

Users who have a custom implementation of the interface SystemStreamPartitionGrouper need to implement the new method. The proposed solution is backward compatible and can be deployed without a specific migration plan.

Rejected Alternatives