You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 5 Next »

Status

Current state: UNDER DISCUSSION

Discussion thread<link to mailing list DISCUSS thread>

JIRA: SAMZA-871

Released: 

Problem

(taken from SAMZA-871)

Right now, Samza relies on YARN to detect whether a container is alive or not. This has a few problems:
1) with the effort to make standalone Samza (SAMZA-516) and make Samza more pluggable w/ other distributed cluster management system (like Mesos, Kubernetes), we need to make the container liveness detection independent.
2) YARN based liveness detection has also created problems w/ leaking containers when NM crashed. It creates a dilemma:
In the case that NM can be restarted quickly, we would like to keep the container alive w/o being affected by NM goes down since that saves ongoing work. yarn.nodemanager.recovery.enabled=true
However, when RM loses the heart beat from NM and determines that the container is "dead", we truly need to make sure to kill the container to avoid duplicate containers being launched, since AM has no other way to know whether the container is actually alive or not.

Motivation

If we implement a direct heart beat mechanism between Samza JobCoordinator and SamzaContainer, we can be agnostic to whatever the YARN RM/NM/AM sync status is.

Proposed Changes

JobCoordinator side

  • Expose a REST endpoint (eg: /isContainerValid) who's purpose is to get requests from the Samza container periodically and respond back weather the container is in the Job Coordinator's current list of valid containers.
  • Endpoint could be a part of the JobModelManager's servlet which is currently used for retrieving the JobModel by the containers during startup.
  • Endpoint can accept a "Execution Resource ID" (eg: YARN container ID) and validate it against state maintained by the Job Coordinator (eg: YarnAppState) and future implementations of other cluster managers need to implement this endpoint and expose the same validation.

Container side

  • On the container side we start a new thread that periodically polls this endpoint described above to check if the container is valid. If its not, we shutdown the run loop and raise an error (so that the exit code is non 0 so that YARN reschedules the container)
  • Since raising an exception may not be the ideal way to shutdown the container (skips all the shutdowns in the finally block). It may be useful to set a member variable in SamzaContainer to denote that an exception was raised. When shutting down in the LocalContainerRunner we can check for this variable and exit with a non-zero exit code.

Public Interfaces

This would introduce a few new configs:

  • `job.container.validator.enabled` to control if we should enable the container validator on the container side
  • `job.container.validator.schedule.ms` to control the time period between validation checks between container and JobCoordinator
  • set an environment variable with the "Execution Resource ID" during container launch. This can be read from the container to make requests to the above endpoint.
  • Implement a new interface "Killable" which has "shutdown()" as a method. This can be implemented the RunLoop and AsyncRunLoop. The reason for this is so that the run loop can be passed to the validator thread to call shutdown.

Implementation and Test Plan

  • Introduce the new REST endpoints for YARN.
  • Implement validator on container side and check for reschedules after a container shutdown.
  • Setup metrics for number of invalid containers that made a call to the endpoint.
  • Add unit tests to test/verify compatibility

Compatibility, Deprecation, and Migration Plan

The changes in this SEP currently only targets the YARN deployment model and is not intended for Standalone.

Rejected Alternatives

  • Use CoordinatorStream as the heart beat channel. 
    • Pros: use async pub-sub model to avoid timeouts in sync methods, easy to scale to a large number of containers
    • Cons: protocol is more complex to implement, message/token delivery latency maybe uncertain and make the heart beat process much longer.

 

  • No labels