
Status

Current state: UNDER DISCUSSION

Discussion thread: <link to mailing list DISCUSS thread>

JIRA: SAMZA-871

Released: 

Problem

Right now, Samza relies on YARN to detect whether a container is alive and valid. This approach has a known failure mode: when the NM crashes, YARN-based liveness detection fails and the container is rescheduled on a different host without the old container being killed, leading to duplicate processing of messages. We need a way to ensure that invalid containers are killed so that duplicate containers are handled correctly.

The proposal is to solve this by implementing a JobCoordinator HTTP endpoint for a heartbeat between the containers and the JobCoordinator.

Motivation

With a direct heartbeat mechanism between the JobCoordinator and the SamzaContainer, we can be agnostic to the YARN RM/NM/AM sync status. It is also simple to implement and understand due to its synchronous flow.

Proposed Changes

JobCoordinator side

  • Expose a REST endpoint (eg: /isContainerValid) whose purpose is to receive periodic requests from the Samza container and respond with whether the container is in the Job Coordinator's current list of valid containers.

    $ curl <host>:<port>/isContainerValid?execution_resource_id=container_1490224420978_0323_01_000282
    {
    	"isValid": true
    }
  • Endpoint could be a part of the JobModelManager's servlet which is currently used for retrieving the JobModel by the containers during startup.
  • Endpoint can accept an "Execution Resource ID" (eg: YARN container ID) and validate it against state maintained by the Job Coordinator (eg: YarnAppState); future implementations for other cluster managers need to implement this endpoint and expose the same validation.
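The validation logic behind the endpoint can be sketched as follows. This is a minimal, hypothetical sketch: `ContainerValidationHandler` and its method names are illustrative, not actual Samza classes; the real endpoint would live in the JobModelManager's servlet and consult state such as YarnAppState rather than a plain set.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the validation logic behind /isContainerValid.
public class ContainerValidationHandler {
  private final Set<String> runningContainerIds;

  public ContainerValidationHandler(Set<String> runningContainerIds) {
    this.runningContainerIds = runningContainerIds;
  }

  /** Returns the JSON response body for the given execution resource id. */
  public String handle(String executionResourceId) {
    boolean isValid = runningContainerIds.contains(executionResourceId);
    return "{\"isValid\": " + isValid + "}";
  }

  public static void main(String[] args) {
    Set<String> valid = new HashSet<>();
    valid.add("container_1490224420978_0323_01_000282");
    ContainerValidationHandler handler = new ContainerValidationHandler(valid);
    // Known container id -> {"isValid": true}
    System.out.println(handler.handle("container_1490224420978_0323_01_000282"));
    // Unknown container id -> {"isValid": false}
    System.out.println(handler.handle("container_0000000000000_0000_01_000001"));
  }
}
```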

Container side

  • On the container side, we start a new thread that periodically polls the endpoint described above to check if the container is valid. If it is not, we shut down the run loop and raise an error (so that the exit code is non-zero and YARN reschedules the container).
  • Calling the shutdown function on the run loop from the ContainerValidator thread causes the finally block in SamzaContainer.run() to execute. We need all the shutdown functions to execute before we can exit with a non-zero code, so we need a way to indicate to the LocalContainerRunner class that the SamzaContainer exited with an exception.
    • This could involve setting a member variable in SamzaContainer to denote that an exception was raised. When shutting down the LocalContainerRunner, we can check this variable and exit with a non-zero exit code. This would mean that the container validator thread holds a reference to the container instance (which seems wrong).
    • ???
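The container-side flow above can be sketched as a small polling component. This is an illustrative sketch, not the actual implementation: `ContainerValidator`, `isValidCheck`, and `wasInvalid()` are hypothetical names; in the proposal the check would be an HTTP GET against /isContainerValid, the shutdown runnable would be the run loop's shutdown(), and LocalContainerRunner would inspect the flag to choose the exit code.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Hypothetical sketch of the container-side validator thread.
public class ContainerValidator {
  private final Supplier<Boolean> isValidCheck; // e.g. wraps the HTTP call
  private final Runnable shutdownRunLoop;       // e.g. the run loop's shutdown()
  private final AtomicBoolean invalidDetected = new AtomicBoolean(false);
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();

  public ContainerValidator(Supplier<Boolean> isValidCheck, Runnable shutdownRunLoop) {
    this.isValidCheck = isValidCheck;
    this.shutdownRunLoop = shutdownRunLoop;
  }

  /** One validation pass; also scheduled periodically by start(). */
  void runCheckOnce() {
    if (!isValidCheck.get()) {
      invalidDetected.set(true); // LocalContainerRunner can inspect this flag
      shutdownRunLoop.run();     // triggers the finally block in SamzaContainer.run()
      scheduler.shutdown();
    }
  }

  public void start(long periodMs) {
    scheduler.scheduleAtFixedRate(this::runCheckOnce, periodMs, periodMs, TimeUnit.MILLISECONDS);
  }

  /** True if the container was found invalid; exit non-zero in that case. */
  public boolean wasInvalid() {
    return invalidDetected.get();
  }
}
```

Abstracting the HTTP call behind a `Supplier<Boolean>` keeps the polling and shutdown logic testable without a live JobCoordinator.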

Public Interfaces

This would introduce a few new configs:

  • `job.container.validator.enabled` to control whether the container-side validator is enabled
  • `job.container.validator.schedule.ms` to control the period between validation checks from the container to the JobCoordinator
  • Set an environment variable with the "Execution Resource ID" during container launch; the container reads it to make requests to the endpoint above.
  • In order to shut down the run loop from the validator thread, I plan to create a new interface called Killable: 

    public interface Killable {
      void shutdown();
    }


    Since shutdown() is already implemented by RunLoop and AsyncRunLoop, we can pass a Killable to the validator thread to shut down the run loop if the container is invalid.
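Assuming the proposed config names above, a job's configuration might include the following (values are illustrative defaults, not part of the proposal):

```
job.container.validator.enabled=true
job.container.validator.schedule.ms=60000
```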

Implementation and Test Plan

  • Introduce the new REST endpoints for YARN.
  • Implement validator on container side and check for reschedules after a container shutdown.
  • Setup metrics for number of invalid containers that made a call to the endpoint.
  • Add unit tests to test/verify compatibility

Compatibility, Deprecation, and Migration Plan

The changes in this SEP currently target only the YARN deployment model and are not intended for Standalone.

Rejected Alternatives

  • Use CoordinatorStream as the heart beat channel. 
    • Pros: uses an async pub-sub model to avoid timeouts in sync methods; easy to scale to a large number of containers
    • Cons: the protocol is more complex to implement, and message/token delivery latency may be uncertain, making the heartbeat process much longer.
