...

  • Expose a REST endpoint (e.g., /containerHeartbeat) whose purpose is to receive periodic requests from the Samza container and respond with whether the container is in the Job Coordinator's current list of valid containers.

    ```bash
    $ curl "<host>:<port>/containerHeartbeat?executionContainerId=container_1490224420978_0323_01_000282"
    {
      "alive": true
    }
    ```
  • The endpoint could be part of the JobModelManager's servlet, which is currently used by containers to retrieve the JobModel during startup.
  • The endpoint accepts an "Execution Container ID" (e.g., the YARN container ID) and validates it against state maintained by the Job Coordinator (e.g., YarnAppState). Future implementations for other cluster managers need to implement this endpoint and expose the same validation.
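A minimal sketch of the validation logic behind the endpoint, assuming the Job Coordinator can expose its current set of valid container IDs as a Set<String> (standing in for state such as YarnAppState). The class ContainerHeartbeatHandler and its respond method are hypothetical names; wiring this into the JobModelManager's servlet is not shown.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical sketch of the Job Coordinator side of the heartbeat check.
 * validContainerIds stands in for cluster-manager state such as YarnAppState;
 * a real implementation would need a thread-safe view of that state.
 */
public class ContainerHeartbeatHandler {
  private final Set<String> validContainerIds;

  public ContainerHeartbeatHandler(Set<String> validContainerIds) {
    this.validContainerIds = validContainerIds;
  }

  /** Splits "k1=v1&k2=v2" into a map; values are not URL-decoded (YARN IDs need none). */
  static Map<String, String> parseQuery(String query) {
    Map<String, String> params = new HashMap<>();
    if (query == null || query.isEmpty()) {
      return params;
    }
    for (String pair : query.split("&")) {
      int eq = pair.indexOf('=');
      if (eq > 0) {
        params.put(pair.substring(0, eq), pair.substring(eq + 1));
      }
    }
    return params;
  }

  /** Returns the JSON body for a heartbeat request: alive only if the ID is currently valid. */
  public String respond(String query) {
    String containerId = parseQuery(query).get("executionContainerId");
    boolean alive = containerId != null && validContainerIds.contains(containerId);
    return "{\"alive\": " + alive + "}";
  }

  public static void main(String[] args) {
    ContainerHeartbeatHandler handler =
        new ContainerHeartbeatHandler(Set.of("container_1490224420978_0323_01_000282"));
    System.out.println(handler.respond("executionContainerId=container_1490224420978_0323_01_000282"));
    System.out.println(handler.respond("executionContainerId=container_1490224420978_0323_01_000999"));
  }
}
```

Missing or unknown IDs both map to "alive": false, so a container that the Job Coordinator no longer tracks is told to stop rather than left running.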

...

  • On the container side, we start a new thread that periodically polls the endpoint described above to check whether the container is valid. If it is not, we shut down the run loop and raise an error, so that the exit code is non-zero and YARN reschedules the container.
  • Calling the shutdown function on the run loop from the ContainerHeartbeat thread would cause the finally block in SamzaContainer.run() to execute, and all the shutdown functions need to run before we can exit with a non-zero code. Therefore, we need a way to indicate to the LocalContainerRunner class that the SamzaContainer exited with an exception.
    • This could involve maintaining state in LocalContainerRunner (rather than setting a member variable in SamzaContainer) to denote that we need to exit with an error code. When shutting down the LocalContainerRunner, we can check this state and exit with a non-zero exit code. Open question: this would mean that the heartbeat thread has a reference to the container instance, which seems wrong.
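A minimal sketch of the state LocalContainerRunner could keep, assuming the heartbeat thread records the failure before triggering shutdown and the runner reads it only after SamzaContainer.run() has returned. ContainerRunnerState, markFailed and exitCode are hypothetical names. An AtomicReference is used so that a write from the heartbeat thread is safely visible to the runner's thread.

```java
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical sketch: remembers why the container shut down so that the runner
 * can exit with a non-zero code after all shutdown logic in the finally block ran.
 */
public class ContainerRunnerState {
  // First error reported from any thread; null means a clean shutdown.
  private final AtomicReference<Throwable> containerError = new AtomicReference<>();

  /** Called, e.g., by the heartbeat thread just before it shuts down the run loop. */
  public void markFailed(Throwable cause) {
    containerError.compareAndSet(null, cause); // keep only the first cause
  }

  /** Exit code the runner would pass to System.exit once container.run() returns. */
  public int exitCode() {
    return containerError.get() == null ? 0 : 1;
  }

  public static void main(String[] args) throws InterruptedException {
    ContainerRunnerState state = new ContainerRunnerState();
    // Simulated heartbeat thread reporting an invalid container.
    Thread heartbeat = new Thread(() ->
        state.markFailed(new RuntimeException("Container is no longer valid")));
    heartbeat.start();
    heartbeat.join();
    // A real runner would call System.exit(state.exitCode()) here instead of printing.
    System.out.println("exit code: " + state.exitCode());
  }
}
```

Keeping the exit decision in the runner means SamzaContainer itself never calls System.exit, so its finally block always completes first.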

Public Interfaces

This would introduce a few new configs:

...

  • Set an environment variable with the "Execution Container ID" during container launch. The container can read it to make requests to the endpoint above.
  • In order to shut down the run loop from the validator thread, I plan to create a new interface called Killable:

    ```java
    public interface Killable {
      // Stops the run loop; RunLoop and AsyncRunLoop already implement shutdown().
      void shutdown();
    }
    ```

    Since shutdown() is already implemented by RunLoop and AsyncRunLoop, we can pass a Killable to the validator thread so that it can shut down the run loop if the container is invalid.
  • A new ContainerHeartbeatMonitor class accepts a ContainerHeartbeatClient (which contains the business logic for making heartbeat checks against the Job Coordinator's endpoint) and a Runnable callback.
    The ContainerHeartbeatMonitor schedules a separate thread at a fixed rate that uses the client to check whether the heartbeat is still valid. If the heartbeat fails, the passed-in Runnable is executed; it shuts down the container and sets state on LocalContainerRunner so that the process exits with a non-zero code.
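A minimal sketch of the heartbeat monitor described above, built on a single-threaded ScheduledExecutorService. ContainerHeartbeatClient is modeled here as a hypothetical one-method interface rather than a real HTTP client, the environment variable name EXECUTION_ENV_CONTAINER_ID is an assumption, and treating a failed request as transient (rather than as an invalid container) is a design choice this document does not settle.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch: polls a heartbeat client at a fixed rate and runs a callback once it reports invalid. */
public class ContainerHeartbeatMonitor {
  /** Hypothetical client; a real one would call the Job Coordinator's /containerHeartbeat. */
  public interface ContainerHeartbeatClient {
    boolean isAlive() throws Exception;
  }

  private final ContainerHeartbeatClient client;
  private final Runnable onExpired;
  private final AtomicBoolean expired = new AtomicBoolean(false);
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "container-heartbeat-monitor");
        t.setDaemon(true); // heartbeats alone should not keep the JVM alive
        return t;
      });

  public ContainerHeartbeatMonitor(ContainerHeartbeatClient client, Runnable onExpired) {
    this.client = client;
    this.onExpired = onExpired;
  }

  /** Builds the heartbeat URL from the coordinator URL and the container ID. */
  public static String heartbeatUrl(String coordinatorUrl, String containerId) {
    return coordinatorUrl + "/containerHeartbeat?executionContainerId="
        + URLEncoder.encode(containerId, StandardCharsets.UTF_8);
  }

  public void start(long periodMs) {
    scheduler.scheduleAtFixedRate(() -> {
      boolean alive;
      try {
        alive = client.isAlive();
      } catch (Exception e) {
        // Assumption: a failed request is transient; also keeps the periodic task scheduled.
        return;
      }
      if (!alive && expired.compareAndSet(false, true)) {
        onExpired.run();      // e.g. shut down the container and record a non-zero exit
        scheduler.shutdown(); // stop further checks
      }
    }, 0, periodMs, TimeUnit.MILLISECONDS);
  }

  public void stop() {
    scheduler.shutdownNow();
  }

  public static void main(String[] args) throws InterruptedException {
    // Hypothetical variable name; falls back to the example ID used in this document.
    String containerId = System.getenv().getOrDefault("EXECUTION_ENV_CONTAINER_ID",
        "container_1490224420978_0323_01_000282");
    System.out.println(heartbeatUrl("http://localhost:8080", containerId));

    CountDownLatch expiredLatch = new CountDownLatch(1);
    AtomicInteger checks = new AtomicInteger();
    // Fake client: alive for the first two checks, then reports the container as invalid.
    ContainerHeartbeatMonitor monitor = new ContainerHeartbeatMonitor(
        () -> checks.incrementAndGet() <= 2, expiredLatch::countDown);
    monitor.start(100);
    expiredLatch.await(5, TimeUnit.SECONDS);
    System.out.println("heartbeat expired after " + checks.get() + " checks");
    monitor.stop();
  }
}
```

Because the callback is a plain Runnable, a Killable run loop could be passed as killable::shutdown, possibly wrapped together with the call that records the failure on LocalContainerRunner. The try/catch matters: ScheduledExecutorService suppresses all later runs of a periodic task once it throws.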

Implementation and Test Plan

...