Status

Current state: ACCEPTED

Discussion thread: http://mail-archives.apache.org/mod_mbox/samza-dev/202001.mbox/browser

JIRA: SAMZA-2373

Released: Samza 1.5

Problem

Samza operates in a multi-tenant environment with cluster managers like YARN and Mesos, where a single host can run multiple Samza containers. Because cluster managers like YARN are often configured with soft limits and Samza has no notion of dynamic workload balancing, a host can end up underperforming, and it becomes desirable to move one or more containers from that host to other hosts. Today this is not possible without affecting other jobs on the hot host or manually restarting the affected job. In other use cases, such as resetting the checkpoints of a single container or supporting canary or rolling bounces, the ability to restart a single container or a subset of containers without restarting the whole job is highly desirable.

...

Selectively Spin StandBy Containers: Samza has a Hot StandBy Containers feature for reducing stateful restoration times. Enabling this feature for a job at least doubles the number of containers (in the simplest case, every container has one standby replica). Customers are reluctant to enable it since doubling the containers increases the cost to serve. To improve adoption of this feature, we can build the ability to spin up StandBy Containers for a single container or a subset of containers while the job is running; these StandBy Containers can then be used for failover to reduce downtime.

...

  • This system does not intend to solve the problem of Dynamic Workload Balancing, i.e. Cruise Control. It may act as a building block for one later.
  • Solving the canary problem for the YARN-based deployment model is out of scope for this solution; however, the system should be easily extensible to support canary.
  • This system will not have built-in intelligence to find a better host for a container; it will make simplistic decisions based on the parameters passed by the user.

SLA / SCALE / LIMITS (Assumptions)

  • At any time, the AM for a single job serves only one request per container; parallel requests across containers are still supported. If a control request is underway, any other requests issued on the same container are queued. The same assumption holds for in-flight requests on a standby and its active container, i.e. if a container placement request is in progress for an active container or its standby replica, all subsequent placement actions on either are queued.
  • Actions are de-queued, sorted by the timestamps populated by the client, and executed in that order.
  • The system should be capable of scaling to be used across different jobs at the same time.
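
The queueing assumptions above can be illustrated with a minimal sketch. It is not the AM implementation: the PlacementAction class, its field names and the next_actions helper are hypothetical, and the coupling between an active container and its standby replica is not modelled here.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class PlacementAction:
    """Hypothetical shape of a queued action; field names are illustrative."""
    uuid: str
    processor_id: str
    timestamp: int  # populated by the client, epoch millis


def next_actions(pending, in_flight):
    """Pick at most one runnable action per container, oldest timestamp first.

    pending:   iterable of PlacementAction waiting to be executed
    in_flight: set of processor ids that already have a request underway
    """
    by_container = defaultdict(list)
    for action in pending:
        by_container[action.processor_id].append(action)

    runnable = []
    for processor_id, actions in by_container.items():
        if processor_id in in_flight:
            continue  # a request is underway, everything else stays queued
        runnable.append(min(actions, key=lambda a: a.timestamp))
    # Different containers can proceed in parallel; order them by client timestamp.
    return sorted(runnable, key=lambda a: a.timestamp)
```

Requests for a container that already has an in-flight action simply remain in the pending collection until a later call.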

...

Solution 1. Write locality to Coordinator Stream (Metastore) and restart job [Rejected]

This approach proposes directly changing the container locality messages (container-to-host mapping messages) in the Metadata Store (currently Kafka) and issuing a restart.

Pros

  • Simple to implement; the current tool does this for host-affinity-enabled jobs (since they maintain a locality mapping)

Cons

  • Needs a job restart and makes a best effort to get the preferred hosts for containers, with no guarantee of getting them
  • If a job has standby containers enabled, this method involves changing the standby mappings in addition to the active container mappings
  • The job faces downtime even when it has hundreds of containers and only one of them needs to be restarted; if it is stateful, containers might not get the newly requested resources on restart and would start bootstrapping
  • This solution does not scale for controllers that want to take multiple control actions on containers across several jobs, for example an auto-sizing controller
  • This method will not work for building a Canary / Cluster Balancer

Solution 2. Container Placement

...

API [Accepted]

API design

On the basis of types of Control actions, the commands are the following:       

...

API

placeContainer

Description

Active Container: Stops the container process on source-host and starts it for:

  1. Stateless Job on either
    1. Destination-host (destination host can be source as well)
    2. Any host (destination-host = ANY_HOST)
  2. Stateful Job on either 
    1. Destination-host (if specified, destination host can be source as well)
    2. Standby Container (destination-host = STANDBY)
    3. Any host (destination-host = ANY_HOST)

StandBy Container: Stops the container process on source-host and starts it on:

    1. Destination-host (if specified & matches StandBy Constraints)
    2. Any host (otherwise which matches StandBy Constraints)

Parameters

uuid: unique identifier of a request, populated by the client

deploymentId: unique identifier of the deployed app for which the action is taken

processor-id: Samza resource id of container e.g 0, 1, 2 

destination-host: valid hostname / “ANY_HOST” / “STANDBY”

request-expiry-timeout: [optional]: timeout for any resource request to the cluster manager 

Status code

CREATED, BAD_REQUEST, ACCEPTED, IN_PROGRESS, SUCCEEDED, FAILED

Returns

Since this is an ASYNC API, nothing is returned. The UUID that the client uses to query the status of the request can be looked up by processorId.
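
A minimal sketch of how a placeContainer request could be validated against the parameters above. The PlaceContainerRequest class and the validate function are hypothetical. That a passing request maps to ACCEPTED, and that STANDBY is rejected unless the job is stateful with a standby enabled, are assumptions drawn from the description rather than confirmed behaviour.

```python
from dataclasses import dataclass
from typing import Optional

ANY_HOST = "ANY_HOST"
STANDBY = "STANDBY"


@dataclass(frozen=True)
class PlaceContainerRequest:
    """Hypothetical request holder mirroring the parameter list above."""
    uuid: str
    deployment_id: str
    processor_id: str
    destination_host: str  # hostname, ANY_HOST or STANDBY
    request_expiry_timeout_ms: Optional[int] = None  # optional parameter


def validate(request, is_stateful, has_standby):
    """Return "BAD_REQUEST" or "ACCEPTED" (status mapping is an assumption)."""
    required = (request.uuid, request.deployment_id,
                request.processor_id, request.destination_host)
    if not all(required):
        return "BAD_REQUEST"
    # STANDBY is only listed as a destination for stateful jobs.
    if request.destination_host == STANDBY and not (is_stateful and has_standby):
        return "BAD_REQUEST"
    timeout = request.request_expiry_timeout_ms
    if timeout is not None and timeout <= 0:
        return "BAD_REQUEST"
    return "ACCEPTED"
```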

Failure Scenarios

A request to place a container might fail in the following cases:

  1. An active container stop fails; in this case, the request is marked failed
  2. The requested resources cannot be obtained from the cluster manager; in this case, the request is marked failed
  3. The stopped active container fails to start on the destination host; in this case, the request is marked failed and an attempt is made to start the container on the source host. If that also fails, the container is started on ANY_HOST

...

API

containerStatus

Description

Gives the status & info of a container placement request, e.g. whether the container is running or stopped and which control commands have been issued on it

Parameters

processor-id: Samza resource id of container e.g 0, 1, 2 

deploymentId: unique identifier of the deployed app for which the action is taken

uuid: unique identifier of a request

Status code

BAD_REQUEST, SUCCEEDED

Returns

Status of the Container placement action 

...

API

controlStandBy

Description

Starts or Stops a standBy container for the active container

Parameters

processor-id: Samza resource id of container e.g 0, 1, 2 

deploymentId: unique identifier of the deployed app for which the action is taken

uuid: unique identifier of a request

Status code

CREATED, BAD_REQUEST, ACCEPTED, IN_PROGRESS, SUCCEEDED, FAILED

Returns

UUID for the client to query the status of the request

Architecture

For implementing a scalable container placement control system, the proposed solution is divided into two parts:

...

  1. Control Plane is a channel outside the job that allows taking control actions by multiple controllers like Samza Dashboard, Startpoints controller. 
  2. ContainerPlacementHandler is a stateless handler registered to control plane that dispatches placement actions to invoke Container Placement Service APIs



This control plane can be implemented in the following ways 

Option 1: Samza Metastore API [Preferred]

Samza Metastore provides an API to write to the coordinator stream. One simple way to expose the Container Placement API is for the ContainerPlacementHandler to have a coordinator stream consumer that polls control messages from the coordinator stream under a separate namespace (different from the one where the locality messages of containers live) & acts on them.

Pros

  • No need to build Authentication & Authorization; already handled by the Metadata auth service
  • No need to enable rate limiting; since requests are queued, the flow of requests can be regulated on the consumer side

Cons

  • If the AM dies, there can still be queued requests in the Coordinator Stream; such requests have to be handled across AM restarts
  • The coordinator stream is log compacted, so control messages written to it need to be deleted to prevent it from growing too large, which can affect job start times

...

  1. ContainerPlacementHandler is a stateless handler that dispatches ContainerPlacementRequestMessages from the Metastore to the Container Placement Service & ContainerPlacementResponseMessages from the Container Placement Service to the Metastore, so that external controllers can query the status of an action. (PR)
  2. The Metastore used by default in Samza today is Kafka (coordinator stream), which stores configs & container mappings & is log compacted
  3. ContainerPlacementRequestMessage & ContainerPlacementResponseMessage are maintained in the same namespace using NamespaceAwareMetaStore ("samza-place-container-v1")

...

The key for storing a ContainerPlacementRequestMessage or ContainerPlacementResponseMessage in the Metastore is UUID + "." + messageType (ContainerPlacementResponseMessage or ContainerPlacementRequestMessage). The value is the payload containing the ContainerPlacementRequestMessage or ContainerPlacementResponseMessage. Messages are written to and read from the Metastore through the MetadataStore abstraction. Since the metastore is eventually consistent, ContainerPlacementService is required to handle duplicate messages.

ContainerPlacementRequestMessage:

Key: "UUID.subType"

Value | Field Description | Field Type
uuid | Unique identifier of a request message | Required
processorId | Logical processor id (0, 1, 2) of the container | Required
deploymentId | Unique identifier for a deployment | Required
subType | Type of message, here: ContainerPlacementRequestMessage | Required
destinationHost | Destination host where the container is desired to be moved | Required
timestamp | The timestamp of the request message | Required
requestExpiry | Request expiry, which acts as a timeout for any resource request to the cluster resource manager | Optional

...

Key: "UUID.subType"

Value | Field Description | Field Type
uuid | Unique identifier of a response message | Required
processorId | Logical processor id (0, 1, 2) of the container | Required
deploymentId | Unique identifier for a deployment | Required
subType | Type of message, here: ContainerPlacementResponseMessage | Required
destinationHost | Destination host where the container is desired to be moved | Required
statusCode | Status of the current response | Required
responseMessage | Response message in conjunction with the status | Required
timestamp | The timestamp of the response message | Required
requestExpiry | Request expiry, which acts as a timeout for any resource request to the cluster resource manager | Optional

...

Key: [1,"samza-place-container-v1","88b0d30c-d518-4307-9e8e-c8529eb30f04.ContainerPlacementResponseMessage"]

Value: {"processorId":"1","deploymentId":"app-atttempt-001","subType":"ContainerPlacementResponseMessage","responseMessage":"Request is accepted","uuid":"88b0d30c-d518-4307-9e8e-c8529eb30f04","destinationHost":"ANY_HOST","statusCode":"ACCEPTED","timestamp":1578694070875}
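
As an illustration of the key scheme, the following sketch rebuilds the last element of the example key from the example value and round-trips the payload through JSON. The helper names are hypothetical, and treating the stored value as UTF-8 JSON bytes is an assumption based only on the example shown.

```python
import json

NAMESPACE = "samza-place-container-v1"  # namespace named in the text above


def metastore_key(uuid, message_type):
    """Build the key as UUID + "." + messageType."""
    return f"{uuid}.{message_type}"


def encode(message):
    """Serialize a message dict to JSON bytes (wire format is an assumption)."""
    return json.dumps(message, separators=(",", ":")).encode("utf-8")


def decode(raw):
    """Parse JSON bytes back into a message dict."""
    return json.loads(raw.decode("utf-8"))


# The example response message shown above.
response = {
    "processorId": "1",
    "deploymentId": "app-atttempt-001",
    "subType": "ContainerPlacementResponseMessage",
    "responseMessage": "Request is accepted",
    "uuid": "88b0d30c-d518-4307-9e8e-c8529eb30f04",
    "destinationHost": "ANY_HOST",
    "statusCode": "ACCEPTED",
    "timestamp": 1578694070875,
}

key = metastore_key(response["uuid"], response["subType"])
```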

Challenges with Metastore

The Metastore today (Kafka) is at-least-once & eventually consistent, hence ContainerPlacementService has to cache the UUIDs of accepted actions in memory so that it does not act on one request twice when duplicates are delivered. This cache must be bounded, since an unbounded cache can cause the job to run out of memory. A UUID is 16 bytes. If a job has, say, at most 500 containers, then one request action per container for 500 containers adds 0.008 MB of memory (in-memory lookup only). If we cache, say, the last 20K actions (which can accommodate 40 failovers of 500 containers in the current scenario), the memory used will be at most 0.64 MB with a FIFO cache (in-memory lookup + FIFO queue).
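
A minimal sketch of such a bounded FIFO cache, together with the arithmetic from the paragraph above. The class name and method are hypothetical. The byte figures count only the raw 16-byte UUID payload, as the text does, and do not reflect the object overhead of any particular runtime.

```python
from collections import deque


class SeenRequestCache:
    """Bounded FIFO record of accepted request UUIDs.

    A set gives the in-memory lookup; a deque gives FIFO eviction order.
    """

    def __init__(self, capacity=20_000):
        if capacity <= 0:
            raise ValueError("capacity must be positive")
        self._capacity = capacity
        self._order = deque()
        self._seen = set()

    def first_time(self, uuid):
        """Return True the first time a UUID is offered, False for a duplicate."""
        if uuid in self._seen:
            return False
        if len(self._order) == self._capacity:
            self._seen.discard(self._order.popleft())  # evict the oldest UUID
        self._order.append(uuid)
        self._seen.add(uuid)
        return True


UUID_BYTES = 16
raw_bytes_500 = 500 * UUID_BYTES          # 8,000 bytes, i.e. 0.008 MB
raw_bytes_20k = 20_000 * UUID_BYTES * 2   # lookup + queue: 640,000 bytes, i.e. 0.64 MB
```

Once a UUID has been evicted, a late duplicate of it would be treated as new, so the capacity should comfortably exceed the number of requests that can still be redelivered.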

GC policy for stale messages in metastore

  1. One way to delete stale ContainerPlacementMessages is to delete the requests/responses from the previous incarnation of the job in the metastore on job restarts; this is the responsibility of ContainerPlacementService
  2. Once a request is complete, ContainerPlacementService can issue an async delete to clean up the request from the metastore
  3. Request/response messages can be cleaned up externally by a tool

Part 2. Container Placement Service

Container Placement Service is a set of APIs built around the AM to move/restart containers. It periodically gets a queue of placement actions per container, which it issues in parallel across different containers but sequentially on any one container. Each placement request has a "deploymentId" attached to it because, if a job restarts, all the placement actions queued for the previous deployment must be disregarded and deleted. Samza internally generates an id for each run of a job ("app.run.id") at the job planning phase; we can use that id as the "deploymentId" for placement requests.
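
The deploymentId check described above can be sketched as a simple partition of queued requests. The function name is hypothetical, and requests are represented as plain dicts carrying the "deploymentId" field from the message tables; how stale requests are then deleted is left out.

```python
def partition_by_deployment(requests, current_deployment_id):
    """Split requests into (current, stale) using their deploymentId field.

    Requests from a previous deployment are returned separately so the
    caller can disregard and delete them.
    """
    current, stale = [], []
    for request in requests:
        if request.get("deploymentId") == current_deployment_id:
            current.append(request)
        else:
            stale.append(request)
    return current, stale
```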

The solution proposes to refactor & simplify the current AM code & introduce a ContainerManager, a single entity managing container actions like start & stop for both active and standby containers. The functions of ContainerManager & the proposed refactoring of the AM code are listed below:

  • Remove the HostAwareContainerAllocator & ContainerAllocator, simplify Container Allocator as a simple lightweight entity allocating requests to available resources (PR1, PR2)
  • Introduce ContainerManager which acts as a brain for validating and issuing any actions on containers in the Job Coordinator for both active & Standby containers. (PR)
    • Transfer state & validation of container launch & expired request handling from ContainerAllocator to ContainerManager
    • Transfer state & lifecycle management of Container allocator & resource request on job start (reading locality mapping) from ClusterResourceManager.CallBack(ContainerProcessManager) to ContainerManager*
  • Encapsulate logic and state related to container placement actions like moves & restarts for active & standby containers in ContainerManager (PR-1, TBD)
    • It is ContainerManager’s duty to validate any ContainerPlacementRequestMessages & also invalidate messages from the previous deployment incarnation
    • It is ContainerManager’s duty to write ContainerPlacementResponseMessages to the Metastore for the external controller to query the status of the request
    • ContainerPlacementMetadata is a metadata holder for container actions (ControlActionMetadata), e.g. request_id, current status, requested resources, etc.



Note: *ClusterResourceManager.Callback (ContainerProcessManager) is tightly coupled with ClusterBasedJobCoordinator today. In phase 1 of the implementation, all the proposed changes will be made except for moving state & lifecycle management of the Container Allocator & the resource requests on job start (reading the locality mapping) from ClusterResourceManager.Callback (ContainerProcessManager) to ContainerManager, so that this feature can be developed faster. Hence ContainerProcessManager will still be tied to ClusterBasedJobCoordinator and will intercept any container placement requests.

2.1 Container Move

2.1.1 Stateless Container Move & Stateful Container Move (without Standby)

Option 1: Reserve-Stop-Move: Request resources first and only make a move once resources are allocated by ClusterResourceManager (Preferred)

Pros

  • The container only moves when a preferred resource is available for the container to start
  • Offers stronger move semantics, which are more effective when used with a Workload balancer

Cons

  • Complex implementation compared to Option 2


Option 2: Stop-Reserve-Move: Stop the container and then make a resource request for preferred resources and attempt a start (Rejected Alternative)

Pros

  • Easier to implement

Cons

  • Weaker move semantics: if preferred resources are not available, the container is already stopped, so there is a higher chance of it moving to something other than the preferred host, which is ineffective for a workload balancer

Orchestration of Stateless Container Move using Option 1

The image shows the sequence of steps involved in moving a container. Note that a specific number of retries is configured for requesting preferred hosts from ClusterResourceManager.

  1. ContainerPlacementHandler dispatches the move request to ContainerProcessManager
  2. ContainerProcessManager dispatches the placement request to ContainerManager, which validates the action and issues a preferred resource request with ContainerAllocator (async)
  3. Now there are two possible scenarios
    1. The requested resources are allocated by ClusterResourceManager
      1. In this case, ContainerManager initiates a container shutdown and waits for the ContainerProcessManager callback on successfully stopping the container
      2. On getting a successful stop callback, ContainerManager issues a container start request to ClusterResourceManager on the allocated resources
      3. If the container start request succeeds, a success notification is sent (updating the Control Action Metadata); otherwise, a new resource request to start this container on the source host is issued & a failure notification is sent to the user
    2. The resource request expires
      1. When the resource request expires because ClusterResourceManager is not able to allocate the requested resources, a failure notification is sent and the active container remains unaffected


Who writes the new locality mapping after a successful move?

A Samza container writes its new locality message to the metadata store on a successful start (code); hence, after a successful move, the container writes its new locality.

Failure Scenarios:

  • If the preferred resources cannot be acquired, the active container is never stopped and a failure notification is sent for the ContainerPlacementRequest
  • If ContainerManager is not able to stop the active container (3.1 #1 above fails), the request is marked failed & a failure notification is sent for the ContainerPlacementRequest
  • If ClusterResourceManager fails to start the stopped active container on the acquired destination host, the container falls back to the source host and a failure notification is sent for the ContainerPlacementRequest. If the container fails to start on the source host, an attempt is made to start it on ANY_HOST
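
The Reserve-Stop-Move flow and its failure scenarios can be summarised in a synchronous sketch. The real flow is asynchronous and callback driven; the cluster object, its three methods, the FakeCluster stand-in and the default retry count are all hypothetical and exist only to make the control flow concrete.

```python
class FakeCluster:
    """In-memory stand-in used only to exercise the sketch below."""

    def __init__(self, allocatable, startable):
        self.allocatable = set(allocatable)  # hosts where a resource request succeeds
        self.startable = set(startable)      # hosts where a container start succeeds

    def request_resource(self, host):
        return host in self.allocatable

    def stop(self, processor_id):
        return True

    def start(self, processor_id, host):
        return host in self.startable


def reserve_stop_move(cluster, processor_id, source_host, destination_host, max_retries=3):
    """Return (status, host) where host is where the container ends up (or None)."""
    # 1. Reserve: the active container is untouched until resources are allocated.
    reserved = any(cluster.request_resource(destination_host) for _ in range(max_retries))
    if not reserved:
        return "FAILED", source_host
    # 2. Stop: if the stop fails, the request is marked failed.
    if not cluster.stop(processor_id):
        return "FAILED", source_host
    # 3. Move: start on the reserved destination.
    if cluster.start(processor_id, destination_host):
        return "SUCCEEDED", destination_host
    # Fallback: source host first, then ANY_HOST; the request stays failed.
    for host in (source_host, "ANY_HOST"):
        if cluster.request_resource(host) and cluster.start(processor_id, host):
            return "FAILED", host
    return "FAILED", None
```

In the fallback branch the request is reported as failed even when the container comes back up, matching the failure scenarios listed above.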



[Image: sequence of steps involved in moving a container]

Option 3: Stateful without Standby (Spin Up StandBy container & then move) (Phase 2) [Stretch]

If we expose an API to spin up a StandBy Container for an active container, a client can make two requests. The first spins up a StandBy Container, after which the client periodically monitors the lag of the StandBy Container (using Diagnostics). Once the StandBy Container has a steady lag and is ready to move, this case becomes Case 1 mentioned above. (More details TBD later)

 

2.1.2 Stateful Container Move with Standby Enabled

Stateful container moves are divided into two cases on the basis of whether the job has a StandBy Container for the container or not


Consider a job that has a stateful container C on H1 and a StandBy container C’ enabled on H2

Option 1: Stop-Reserve-Move: Initiate a Stop on C & issue a Standby failover (Preferred - Phase 1*)

In this option, C is stopped on H1

Option 2: Reserve-Stop-Move: Initiate a Standby move first then issue a move for Active Container (Preferred - Phase 2*)

  1. In this option request is first issued to stop C’ 
  2. Then request H2 from cluster manager to start C 
  3. Issue a stop on C only H2 can be allocated (#2 succeeds). 
  4. Then request C’ to start ANY_HOST except for H1, H2 

Two ways to achieve this: 

  • A client can make two calls for this first move C’ to ANY_HOST apart from H2, H1 (similar to a stateless move), then move the request of C to C’ (similar to a stateless move) so the state maintenance of this lives on the client
  • Maintain state in JC to accomplish this 

...

Pros

...

Cons

...

  • Stronger move semantics, since the active container won’t be stopped if it cannot be moved to the standby container

...

  • More complex as compared to Option 1 needs to maintain more state  

* Phase 1 & Phase 2 refer to the implementation phases, please see the section: Implementation Plan 

Option 3: Reserve-Stop-Move: First Request H2 from Cluster Manager and initiate the failover once you get H2 as a preferred resource [Rejected Alternative]

...

and failover is initiated from H1 to H2 with current semantics

...

of Failover of an active Container in the Hot standby feature. This ensures C moves either to H2 (in the best case) or to some other hosts (when Cluster Manager cannot allocate capacity on H2, i.e H2 is under capacity)

Pros

Cons

  • Stronger move semantics are guaranteed, the active container won’t be stopped unless it can be started with H2
  • Since there is a standby already running on H2, there is a higher chance that cluster manager might fail to allocate for H2 and move requests are ineffective (Host is running full capacity)
  • Need to design a new failover scheme hence more development time to implement

Orchestration of StandBy Enabled Container Move using Option 1 (Phase 1)

  1. ContainerPlacementHandler dispatches the move request to ContainerProcessManager
  2. ContainerProcessManager registers a move & initiates a failover with StandByContainerManager then the following failover sequence is followed

2.2 Container Restart

This is much simpler as compared to Move. We can either make restart equivalent to a move or issue a stop a container and ContainerProcessManager will try to start that container again. Let's discuss these options in detail:

Option 1: Reserve-Stop-Start: Restart is equivalent to move on the same host [Preferred]

  • In this option, resources are requested on the same host first before issuing a stop on Container
  • Once the resources are accrued a container is stopped & requested to start 
  • So this is equivalent to move semantics above 

...

Pros

...

Cons

...

  • Strong Restart semantics since the container is not stopped if it cannot be restarted on the same resource

...

  • For a container to restart, it will be holding 2x resources on the same host for the time it has accrued resources to the time when the active container is issued a stop
  • Easy to implement, ability to do this already exists 
  • C always moves from H1 and there a slim chance that C won’t land on H2 as per the current semantics (stolen-host scenario: Cluster Manager fails to allocate H2 for C)


Option 2: Reserve-Stop-Move: Initiate a Standby move first then issue a move for Active Container (Preferred - Phase 2*)

  1. In this option, a request is first issued to stop C’
  2. Then H2 is requested from the cluster manager to start C
  3. A stop is issued on C only if H2 can be allocated (step 2 succeeds)
  4. Then C’ is requested to start on ANY_HOST except H1 and H2

Two ways to achieve this:

  • A client makes two calls: first it moves C’ to ANY_HOST other than H1 and H2 (similar to a stateless move), then it moves C to the host that C’ vacated (similar to a stateless move). The state for this sequence lives on the client
  • Maintain state in JC to accomplish this
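The four steps above can be sketched as a small simulation. This is only an illustration under stated assumptions: the `ClusterSketch` interface, the `ReserveStopMoveSketch` class, and the log strings are hypothetical and are not Samza APIs. The point it shows is that the active container C is stopped only after H2 has been allocated.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the cluster manager; not a Samza API.
interface ClusterSketch {
  // Returns true if resources on the given host could be reserved.
  boolean allocate(String host);

  // Returns some host that is not in the excluded set.
  String allocateAnyHostExcept(Set<String> excludedHosts);
}

class ReserveStopMoveSketch {
  // Ordered record of the actions taken, so the sequence can be inspected.
  final List<String> log = new ArrayList<>();

  // Moves active container C from h1 to h2, where standby C' currently runs on h2.
  boolean move(ClusterSketch cluster, String h1, String h2) {
    log.add("stop C' on " + h2);                         // step 1
    if (!cluster.allocate(h2)) {                         // step 2
      // Step 3 is skipped: the active container is never stopped.
      log.add("could not allocate " + h2 + "; C keeps running on " + h1);
      return false;
    }
    log.add("stop C on " + h1);                          // step 3
    log.add("start C on " + h2);
    Set<String> excluded = new HashSet<>(Arrays.asList(h1, h2));
    log.add("start C' on " + cluster.allocateAnyHostExcept(excluded)); // step 4
    return true;
  }
}
```

When the allocation in step 2 fails, only the standby has been touched, which is the stronger move semantics this option aims for.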

Pros

  • Stronger move semantics, since the active container won’t be stopped if it cannot be moved to the standby container

Cons

  • More complex than Option 1, since it needs to maintain more state

* Phase 1 & Phase 2 refer to the implementation phases; please see the section Implementation Plan


Option 3: Reserve-Stop-Move: First Request H2 from Cluster Manager and initiate the failover once you get H2 as a preferred resource

  • C’ is stopped on H2, and failover from H1 to H2 is initiated with the current semantics of the Hot StandBy feature
  • If the move fails, only the standby is affected

Pros

  • Stronger move semantics are guaranteed: the active container won’t be stopped unless it can be started on H2

Cons

  • Since a standby is already running on H2, there is a higher chance that the cluster manager fails to allocate H2 and the move request is ineffective (the host is running at full capacity)
  • A new failover scheme needs to be designed, hence more development time to implement

Orchestration of StandBy Enabled Container Move using Option 1 (Phase 1)

  1. ContainerPlacementHandler dispatches the move request to ContainerProcessManager
  2. ContainerProcessManager registers a move & initiates a failover with StandByContainerManager; the following failover sequence is then followed

2.2 Container Restart

This is much simpler than Move. We can either make restart equivalent to a move, or issue a stop on the container and let ContainerProcessManager try to start that container again. Let's discuss these options in detail:

Option 1: Reserve-Stop-Start: Restart is equivalent to move on the same host [Preferred]

  • In this option, resources are requested on the same host before a stop is issued on the container
  • Once the resources are accrued, the container is stopped and then requested to start
  • So this is equivalent to the move semantics above

Pros

  • Strong restart semantics, since the container is not stopped if it cannot be restarted on the same resource

Cons

  • To restart, a container holds 2x resources on the same host from the time it has accrued resources until the active container is issued a stop

Both stateless & stateful container restarts will be equivalent to Stateless Container Move on the same host. Since the container is restarted on the same host then


Option 2: Stop-Reserve-Start: Restart is equivalent to stopping container first then attempting to start it [Rejected Alternative]

  • In this option, a container is issued a stop first
  • Once the container stops, resources are requested for starting on the same host (last seen host)
  • Once the resources are accrued, the container is issued a start on the same host

Pros

  • At any point in time, the container holds only the resources it needs to start on a host

Cons

  • Weaker restart semantics, since the container may well restart on a host other than the source host (the container is already stopped when the Cluster Manager cannot return resources)
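The difference between the two restart options can be sketched as follows. This is a minimal illustration, not the Samza implementation: the `RestartSketch` class, its `Outcome` enum, and the `canAllocate` predicate (standing in for the cluster manager) are all hypothetical names introduced here.

```java
import java.util.function.Predicate;

class RestartSketch {
  enum Outcome { RESTARTED_ON_SAME_HOST, STILL_RUNNING_UNCHANGED, STOPPED_NEEDS_OTHER_HOST }

  // Option 1 (Reserve-Stop-Start): reserve first, stop only after the reservation succeeds.
  static Outcome reserveStopStart(String host, Predicate<String> canAllocate) {
    if (!canAllocate.test(host)) {
      // Nothing was stopped, so a failed reservation leaves the container running.
      return Outcome.STILL_RUNNING_UNCHANGED;
    }
    // Between the reservation and the stop, the container holds 2x resources on the host.
    return Outcome.RESTARTED_ON_SAME_HOST;
  }

  // Option 2 (Stop-Reserve-Start): stop first, then try to reserve the last seen host.
  static Outcome stopReserveStart(String host, Predicate<String> canAllocate) {
    // The container is already stopped when the reservation is attempted.
    return canAllocate.test(host)
        ? Outcome.RESTARTED_ON_SAME_HOST
        : Outcome.STOPPED_NEEDS_OTHER_HOST;
  }
}
```

With a predicate that always refuses the host, Option 1 leaves the container untouched while Option 2 leaves it stopped and in need of another host, which is why Option 2 is described as having weaker restart semantics.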

Usage Example:


Code Block
languagebash
 place-container --deployment-id 1581635852024-5117e303 --app-name snjain-test-cp --app.id = 1 --processor-id 4 --request-expiry 10 --destination-host abc.prod.com


Code Block
@CommandLine.Command(name = "place-container", description = "Request to move/restart container at destination-host")
public class ContainerPlacementTool {
    ...
    _appName = // read from commandline
    _appId = // read from commandline
    _deploymentId = // read from commandline
    _processorId = // read from commandline
    _destinationHost = // read from commandline
    _requestExpiry = // read from commandline
    
    MetadataStore metadataStore = buildMetadataStore(_appName, _appId);
    try {
      ContainerPlacementMetadataStore containerPlacementMetadataStore =
          new ContainerPlacementMetadataStore(metadataStore);
      containerPlacementMetadataStore.start();
      // Convert the optional expiry (in seconds) into a Duration; null means no per-request expiry
      Duration requestExpiry = _requestExpiry != null ? Duration.ofSeconds(_requestExpiry) : null;
      UUID uuid = containerPlacementMetadataStore.writeContainerPlacementRequestMessage(_deploymentId, _processorId,
          _destinationHost, requestExpiry, System.currentTimeMillis());
      System.out.println("Request received; query the status using: " + uuid);
    } finally {
      metadataStore.close();
    }
}

...


Public Interfaces


Code Block
languagejava
titleContainerPlacementMessage.java
linenumberstrue
/**
* Encapsulates the request or response payload information between the ContainerPlacementHandler service and external
* controllers issuing placement actions
*/
@InterfaceStability.Evolving
public abstract class ContainerPlacementMessage {

public enum StatusCode {
 /**
  * Indicates that the container placement action is created
  */
 CREATED,

 /**
  * Indicates that the container placement action was rejected because request was deemed invalid
  */
 BAD_REQUEST,

 /**
  * Indicates that the container placement action is accepted and waiting to be processed
  */
 ACCEPTED,

 /**
  * Indicates that the container placement action is in progress
  */
 IN_PROGRESS,

 /**
  * Indicates that the container placement action has succeeded
  */
 SUCCEEDED,

 /**
  * Indicates that the container placement action has failed
  */
 FAILED;
}

/**
* UUID attached to a message; it helps identify duplicate request messages written to the metastore so that
* actions are not retaken even if the metastore is eventually consistent
*/
protected final UUID uuid;
/**
* Unique identifier for a deployment so messages can be invalidated across job restarts,
* e.g. a YARN-based cluster manager should set this to the app attempt id
*/
protected final String deploymentId;
// Logical container Id 0, 1, 2
protected final String processorId;
// Destination host where container is desired to be moved
protected final String destinationHost;
// Optional request expiry which acts as a timeout for any resource request to cluster resource manager
protected final Duration requestExpiry;
// Status of the current request
protected final StatusCode statusCode;
// Timestamp of the request or response message
protected final long timestamp;

protected ContainerPlacementMessage(UUID uuid, String deploymentId, String processorId, String destinationHost,
   Duration requestExpiry, StatusCode statusCode, long timestamp) {…}

}
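The proposal lists the status codes but does not spell out which transitions between them are legal. The sketch below encodes one plausible ordering purely as an assumption, to show how a client might tell terminal states from in-flight ones. The `StatusLifecycleSketch` class and its transition table are hypothetical, and the enum is redeclared locally so the example is self-contained.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

class StatusLifecycleSketch {
  // Local copy of the status codes listed above, so the sketch compiles on its own.
  enum StatusCode { CREATED, BAD_REQUEST, ACCEPTED, IN_PROGRESS, SUCCEEDED, FAILED }

  // Assumed transitions, for illustration only; the proposal does not define them.
  static final Map<StatusCode, Set<StatusCode>> NEXT = new EnumMap<>(StatusCode.class);

  static {
    NEXT.put(StatusCode.CREATED, EnumSet.of(StatusCode.ACCEPTED, StatusCode.BAD_REQUEST));
    NEXT.put(StatusCode.ACCEPTED, EnumSet.of(StatusCode.IN_PROGRESS));
    NEXT.put(StatusCode.IN_PROGRESS, EnumSet.of(StatusCode.SUCCEEDED, StatusCode.FAILED));
    NEXT.put(StatusCode.BAD_REQUEST, EnumSet.noneOf(StatusCode.class));
    NEXT.put(StatusCode.SUCCEEDED, EnumSet.noneOf(StatusCode.class));
    NEXT.put(StatusCode.FAILED, EnumSet.noneOf(StatusCode.class));
  }

  // True if the assumed table allows moving from one status to the other.
  static boolean canTransition(StatusCode from, StatusCode to) {
    return NEXT.get(from).contains(to);
  }

  // A status is terminal when no further transition is allowed from it.
  static boolean isTerminal(StatusCode code) {
    return NEXT.get(code).isEmpty();
  }
}
```

A client polling for a response could stop once `isTerminal` returns true for the latest status it has read.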

...

Code Block
languagejava
titleContainerPlacementRequestMessage
linenumberstrue
/**
* Encapsulates the request sent from the external controller to the JobCoordinator to take a container placement action
*/
public class ContainerPlacementRequestMessage extends ContainerPlacementMessage {

public ContainerPlacementRequestMessage(UUID uuid, String deploymentId, String processorId, String destinationHost, Duration requestExpiry, long timestamp) {...}

public ContainerPlacementRequestMessage(UUID uuid, String deploymentId, String processorId, String destinationHost, long timestamp) {...}
}

...

Code Block
languagejava
titleContainerPlacementResponseMessage
linenumberstrue
/**
* Encapsulates the response sent from the JobCoordinator for a container placement action
*/
public class ContainerPlacementResponseMessage extends ContainerPlacementMessage {
 // Returned status of the request
 private String responseMessage;

 public ContainerPlacementResponseMessage(UUID uuid, String deploymentId, String processorId, String destinationHost,
     Duration requestExpiry, StatusCode statusCode, String responseMessage, long timestamp) {...}

 public ContainerPlacementResponseMessage(UUID uuid, String deploymentId, String processorId, String destinationHost,
     StatusCode statusCode, String responseMessage, long timestamp) {...}
}


Other Interfaces


Code Block
languagejava
titleContainerPlacementMetadataStore
linenumberstrue
/**
 * Entity managing read writes to the metastore for {@link org.apache.samza.container.placement.ContainerPlacementRequestMessage}
 * and {@link org.apache.samza.container.placement.ContainerPlacementResponseMessage}
 */
public class ContainerPlacementMetadataStore {

/**
 * Writes a {@link ContainerPlacementRequestMessage} to the underlying metastore. This method should be used by external controllers
 * to issue a request to JobCoordinator
 *
 * @param deploymentId identifier of the deployment
 * @param processorId logical id of the samza container 0,1,2
 * @param destinationHost host where the container is desired to move
 * @param requestExpiry optional per request expiry timeout for requests to cluster manager
 * @param timestamp timestamp of the request
 * @return uuid generated for the request
 */
public UUID writeContainerPlacementRequestMessage(String deploymentId, String processorId, String destinationHost,
      Duration requestExpiry, long timestamp) {...}

/**
 * Reads a {@link ContainerPlacementRequestMessage} from the underlying metastore
 * @param uuid uuid of the request
 * @return ContainerPlacementRequestMessage if it is present
 */
public Optional<ContainerPlacementRequestMessage> readContainerPlacementRequestMessage(UUID uuid) {...}

/**
 * Reads a {@link ContainerPlacementResponseMessage} from the underlying metastore
 * @param uuid uuid of the request
 * @return ContainerPlacementResponseMessage if it is present
 */
public Optional<ContainerPlacementResponseMessage> readContainerPlacementResponseMessage(UUID uuid) {...}

/**
 * Deletes a {@link ContainerPlacementRequestMessage} if present identified by the key {@code uuid}
 * @param uuid uuid of the request
 */
public void deleteContainerPlacementRequestMessage(UUID uuid) {..}

/**
 * Deletes a {@link ContainerPlacementResponseMessage} if present identified by the key {@code uuid}
 * @param uuid uuid of the request
 */
public void deleteContainerPlacementResponseMessage(UUID uuid) {..}

/**
 * Deletes both {@link ContainerPlacementRequestMessage} and {@link ContainerPlacementResponseMessage} identified by uuid
 * @param uuid uuid of request and response message
 */
public void deleteAllContainerPlacementMessages(UUID uuid) {...}

/**
 * Deletes all {@link ContainerPlacementMessage}
 */
public void deleteAllContainerPlacementMessages() {...}

/**
 * Writes a {@link ContainerPlacementResponseMessage} to the underlying metastore. 
 * This method should be used by Job Coordinator only to write responses to Container Placement Action
 * @param responseMessage response message
 */
void writeContainerPlacementResponseMessage(ContainerPlacementResponseMessage responseMessage) {..}

}
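To make the read, write, and delete contract above concrete, here is a minimal in-memory sketch keyed by UUID. It is an assumption-laden stand-in, not the Samza class: `InMemoryPlacementStoreSketch`, its method names, and the plain-string payloads are hypothetical, and a real store would persist serialized messages in the metastore.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

class InMemoryPlacementStoreSketch {
  // Requests and responses are kept separately but share the same UUID key.
  private final Map<UUID, String> requests = new ConcurrentHashMap<>();
  private final Map<UUID, String> responses = new ConcurrentHashMap<>();

  // Writes a request and returns the UUID generated for it.
  UUID writeRequest(String deploymentId, String processorId, String destinationHost) {
    UUID uuid = UUID.randomUUID();
    requests.put(uuid, deploymentId + "/" + processorId + "->" + destinationHost);
    return uuid;
  }

  // Writes (or overwrites) the response for an existing request UUID.
  void writeResponse(UUID uuid, String status) {
    responses.put(uuid, status);
  }

  Optional<String> readRequest(UUID uuid) {
    return Optional.ofNullable(requests.get(uuid));
  }

  Optional<String> readResponse(UUID uuid) {
    return Optional.ofNullable(responses.get(uuid));
  }

  // Deletes both the request and the response stored under one UUID.
  void deleteAll(UUID uuid) {
    requests.remove(uuid);
    responses.remove(uuid);
  }
}
```

Sharing one UUID between a request and its response lets a client look up the outcome with the same identifier it received when it wrote the request.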


Code Block
languagejava
titleContainerPlacementRequestAllocator
linenumberstrue
/**
 * Stateless handler that periodically dispatches {@link ContainerPlacementRequestMessage} read from Metadata store to Job Coordinator
 */
public class ContainerPlacementRequestAllocator implements Runnable {

@Override
  public void run() {...}
}
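One polling pass of such a dispatcher might look like the sketch below. It illustrates two ideas stated in the message Javadoc: duplicate requests are recognized by UUID, and requests from an earlier deployment are invalidated by deployment id. The `RequestDispatchSketch` class, its nested `Request` type, and the `poll` method are hypothetical; note that the dedup set makes this sketch stateful, whereas the proposal describes the real handler as stateless.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;

class RequestDispatchSketch {
  // Hypothetical, trimmed-down request: just the fields needed for filtering.
  static final class Request {
    final UUID uuid;
    final String deploymentId;
    final String processorId;

    Request(UUID uuid, String deploymentId, String processorId) {
      this.uuid = uuid;
      this.deploymentId = deploymentId;
      this.processorId = processorId;
    }
  }

  private final String currentDeploymentId;
  private final Set<UUID> seen = new HashSet<>();
  // Requests handed on for processing, in dispatch order.
  final List<Request> dispatched = new ArrayList<>();

  RequestDispatchSketch(String currentDeploymentId) {
    this.currentDeploymentId = currentDeploymentId;
  }

  // One polling pass; a periodic handler would call this repeatedly from run().
  void poll(List<Request> pending) {
    for (Request request : pending) {
      if (!currentDeploymentId.equals(request.deploymentId)) {
        continue; // stale: written for a different deployment
      }
      if (!seen.add(request.uuid)) {
        continue; // duplicate: this UUID was already dispatched
      }
      dispatched.add(request);
    }
  }
}
```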


Code Block
languagejava
titleContainerManager
linenumberstrue
public class ContainerManager {
/**
* Registers a container placement action to move the running container to destination host
*
* @param requestMessage request containing details of placement request
* @param containerAllocator to request physical resources
*/
public void registerContainerPlacementAction(ContainerPlacementRequestMessage requestMessage, ContainerAllocator containerAllocator) {...}

/**
* Handles the container start action for both active & standby containers. This method is invoked by the allocator thread
*
* @param request pending request for the preferred host
* @param preferredHost preferred host to start the container
* @param allocatedResource resource allocated from {@link ClusterResourceManager}
* @param resourceRequestState state of request in {@link ContainerAllocator}
* @param allocator to request resources from {@link ClusterResourceManager}
*
* @return true if the container launch is complete, false if the container launch is in progress. 
*/
boolean handleContainerLaunch(SamzaResourceRequest request, String preferredHost, SamzaResource allocatedResource,
   ResourceRequestState resourceRequestState, ContainerAllocator allocator) {..}

/**
* Handle the container launch failure for active containers and standby (if enabled).
*
* @param processorId logical id of the container eg 1,2,3
* @param containerId last known id of the container deployed
* @param preferredHost host on which container is requested to be deployed
* @param containerAllocator allocator for requesting resources
*/
void handleContainerLaunchFail(String processorId, String containerId, String preferredHost,
   ContainerAllocator containerAllocator) {...}

/**
* Handles the state update on successful launch of a container
*
* @param processorId logical processor id of container 0,1,2
*/
void handleContainerLaunchSuccess(String processorId) {...}

/**
* Handles the action to be taken after the container has been stopped.
*
* @param processorId logical id of the container eg 1,2,3
* @param containerId last known id of the container deployed
* @param preferredHost host on which container was last deployed
* @param exitStatus exit code returned by the container
* @param preferredHostRetryDelay delay to be incurred before requesting resources
* @param containerAllocator allocator for requesting resources
*/
void handleContainerStop(String processorId, String containerId, String preferredHost, int exitStatus,
   Duration preferredHostRetryDelay, ContainerAllocator containerAllocator) {..}

/**
* Handles an expired resource request for both active and standby containers.
*
* @param processorId logical id of the container
* @param preferredHost host on which container is requested to be deployed
* @param request pending request for the preferred host
* @param allocator allocator for requesting resources
* @param resourceRequestState state of request in {@link ContainerAllocator}
*/
void handleExpiredRequest(String processorId, String preferredHost,
   SamzaResourceRequest request, ContainerAllocator allocator, ResourceRequestState resourceRequestState) {..}
}
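The optional per-request expiry can be checked with simple timestamp arithmetic, as in the sketch below. This is a hypothetical helper, not part of `ContainerManager`: the `RequestExpirySketch` class, the `isExpired` method, and the fallback to a default expiry when none is supplied are all assumptions made for illustration.

```java
import java.time.Duration;

class RequestExpirySketch {
  // Returns true once the request has been pending for longer than its expiry.
  // Assumption: a null requestExpiry falls back to defaultExpiry.
  static boolean isExpired(long requestTimestampMs, Duration requestExpiry,
      Duration defaultExpiry, long nowMs) {
    Duration effective = requestExpiry != null ? requestExpiry : defaultExpiry;
    return nowMs - requestTimestampMs > effective.toMillis();
  }
}
```

For example, a request written at time 0 with a 10 second expiry counts as expired at 10,001 ms, while the same request without an explicit expiry and with a 60 second default does not.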

...