You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 12 Next »

Status

Current state: UNDER DISCUSSION

Discussion thread<link to mailing list DISCUSS thread>

JIRA: SAMZA-871

Released: 

Problem

Right now, Samza relies on YARN to detect whether a container is alive/valid or not. This has a few problems as the YARN based liveness detection fails when the NM crashes, causing the container to be rescheduled on a different host without killing the old container, leading to double processing of messages. We need a way to make sure that invalid containers are killed in order to handle duplicate containers being launched.

The proposal is to solve this by implementing a JobCoordinator HTTP endpoint for a heart beat between the containers and the JobCoordinator.

Motivation

With the direct heart beat mechanism between the JobCoordinator and SamzaContainer, we can be agnostic to whatever the YARN RM/NM/AM sync status is. It is also simple to implement and understand due to its synchronous flow. 

Proposed Changes

JobCoordinator side

  • Expose a REST endpoint (eg: /containerHeartbeat) who's purpose is to get requests from the Samza container periodically and respond back weather the container is in the Job Coordinator's current list of valid containers.

    $ curl <host>:<port>/containerHeartbeat?executionContainerId=container_1490224420978_0323_01_000282
    {
    	alive: true
    }
  • Endpoint could be a part of the JobModelManager's servlet which is currently used for retrieving the JobModel by the containers during startup.
  • Endpoint can accept a "Execution Container ID" (eg: YARN container ID) and validate it against state maintained by the Job Coordinator (eg: YarnAppState) and future implementations of other cluster managers need to implement this endpoint and expose the same validation.

Container side

  • On the container side we start a new thread that periodically polls this endpoint described above to check if the container is valid. If its not, we shutdown the run loop and raise an error (so that the exit code is non 0 so that YARN reschedules the container)
  • The plan is to setup a monitor in the LocalContainerRunner class that schedules a thread to check the above endpoint at regular intervals. On failure the thread modifies state on the LocalContainerRunner to denote that there was an error. This state is checked during exit in the LocalContainerRunner to exit with a non-zero code.

Public Interfaces

This would introduce a few new configs:

  • set an environment variable with the "Execution Container ID" during container launch. This can be read from the container to make requests to the above endpoint.
  • A new ContainerHeartbeatMonitor class that accepts a ContainerHeartbeatClient (which has the business logic to make heartbeat checks on the JC endpoint) and a Runnable callback.
    The ContainerHeartbeatMonitor schedules a separate thread at a fixed rate which uses the client to check if the heartbeat is still valid. On failure of the heartbeat, the passed in Runnable is executed, which is used to shutdown the container and set state on LocalContainerRunner to shutdown with non-zero code.

Implementation and Test Plan

  • Introduce the new REST endpoints for YARN.
  • Implement validator on container side and check for reschedules after a container shutdown.
  • Setup metrics for number of invalid containers that made a call to the endpoint.
  • Add unit tests to test/verify compatibility

Compatibility, Deprecation, and Migration Plan

The changes in this SEP currently only targets the YARN deployment model and is not intended for Standalone.

Rejected Alternatives

  • Use CoordinatorStream as the heart beat channel. 
    • Pros: use async pub-sub model to avoid timeouts in sync methods, easy to scale to a large number of containers
    • Cons: protocol is more complex to implement, message/token delivery latency maybe uncertain and make the heart beat process much longer.

 

  • No labels