Status
Current state: ACCEPTED
Discussion thread: http://mail-archives.apache.org/mod_mbox/samza-dev/202001.mbox/browser
JIRA: SAMZA-2373
Released: Samza 1.5
Problem
Samza operates in multi-tenant environments with cluster managers like YARN and Mesos, where a single host can run multiple Samza containers. Often, due to soft limits configured in cluster managers like YARN and the absence of dynamic workload balancing in Samza, a host ends up underperforming and it becomes desirable to move one or more containers from that host to other hosts. Today this is not possible without affecting other jobs on the hot host or manually restarting the affected job. In other use cases, such as resetting checkpoints of a single container or supporting canary or rolling bounces, the ability to restart a single container or a subset of containers without restarting the whole job is highly desirable.
...
Selectively Spin Up StandBy Containers: Samza has a Hot StandBy Containers feature for reducing stateful restoration times. Enabling this feature for a job involves at least doubling the number of containers (the simplest case, where every container has 1 standby replica). Customers are reluctant to enable this since doubling the containers increases the cost to serve. To improve adoption of this feature, we can build the ability to spin up StandBy containers for a single container or a subset of containers while the job is running; these StandBy containers can then be used for failover to reduce downtime.
...
- This system does not intend to solve the problem of Dynamic Workload Balance, i.e. Cruise Control. It may act as a building block for one later.
- Solving the canary problem for the YARN-based deployment model is out of the scope of this solution; however, the system built should be easily extensible to support canary
- This system will not have built-in intelligence to find a better host match for a container; it will make simplistic decisions per the params passed by the user.
SLA / SCALE / LIMITS (Assumptions)
- At a time, the AM for a single job will serve only one request per container; parallel requests across containers are still supported. If a control request is underway, any other requests issued on the same container will be queued. The same assumption holds for in-flight requests on standby and active containers, i.e. if any container placement request is in progress for an active container or its standby replica, all subsequent placement actions on either are queued
- Actions are de-queued and sorted in order of timestamps populated by the client and are executed in that order
- The system should be capable of scaling to be used across different jobs at the same time
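The queuing assumptions above can be sketched as follows. This is a minimal illustration with hypothetical names (PlacementActionQueue and its methods are not part of Samza): actions on the same container are serialized in client-timestamp order, at most one in flight, while different containers proceed independently.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;

/** Hypothetical sketch of the per-container queuing assumption, not the actual AM code. */
class PlacementActionQueue {
  // processorId -> pending action timestamps, ordered by the timestamp the client populated
  private final Map<String, PriorityQueue<Long>> pendingByProcessor = new HashMap<>();
  // processors that currently have an in-flight action
  private final Set<String> inFlight = new HashSet<>();

  void enqueue(String processorId, long clientTimestamp) {
    pendingByProcessor.computeIfAbsent(processorId, k -> new PriorityQueue<>()).add(clientTimestamp);
  }

  /** Returns the timestamp of the next action to run for this processor, or null if one is in flight. */
  Long dequeue(String processorId) {
    if (inFlight.contains(processorId)) {
      return null; // at most one request per container at a time
    }
    PriorityQueue<Long> q = pendingByProcessor.get(processorId);
    if (q == null || q.isEmpty()) {
      return null;
    }
    inFlight.add(processorId);
    return q.poll();
  }

  void complete(String processorId) {
    inFlight.remove(processorId);
  }
}
```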
...
Solution 1. Write locality to Coordinator Stream (Metastore) and restart job [Rejected]
This approach proposes to directly change the container locality messages (container-to-host mapping messages) in the Metadata Store (currently Kafka) and issue a restart.
Solution 2. Container Placement
...
API [Accepted]
API design
On the basis of types of Control actions, the commands are the following:
...
API | placeContainer |
Description | Active Container: Stops the container process on the source-host and starts it on:
StandBy Container: Stops the container process on the source-host and starts it on:
|
Parameters | uuid: unique identifier of a request, populated by the client; deploymentId: unique identifier of the deployed app for which the action is taken; processor-id: Samza resource id of container, e.g. 0, 1, 2; destination-host: valid hostname / “ANY_HOST” / “STANDBY”; request-expiry-timeout [optional]: timeout for any resource request to the cluster manager |
Status code | CREATED, BAD_REQUEST, ACCEPTED, IN_PROGRESS, SUCCEEDED, FAILED |
Returns | Since this is an async API nothing is returned; the status of the request can be queried by processorId |
Failure Scenarios | There are the following cases under which a request to place a container might fail:
|
...
API | containerStatus |
Description | Gives the status & info of the container placement request, e.g. whether the container is running or stopped and what control commands have been issued on it |
Parameters | processor-id: Samza resource id of container, e.g. 0, 1, 2; deploymentId: unique identifier of the deployed app for which the action is taken; uuid: unique identifier of a request |
Status code | BAD_REQUEST, SUCCEEDED |
Returns | Status of the Container placement action |
...
API | controlStandBy |
Description | Starts or Stops a standBy container for the active container |
Parameters | processor-id: Samza resource id of container, e.g. 0, 1, 2; deploymentId: unique identifier of the deployed app for which the action is taken; uuid: unique identifier of a request |
Status code | CREATED, BAD_REQUEST, ACCEPTED, IN_PROGRESS, SUCCEEDED, FAILED |
Returns | UUID for the client to query the status of the request |
Architecture
For implementing a scalable container placement control system, the proposed solution is divided into two parts:
...
- Control Plane is a channel outside the job that allows taking control actions by multiple controllers like Samza Dashboard, Startpoints controller.
- ContainerPlacementHandler is a stateless handler registered to control plane that dispatches placement actions to invoke Container Placement Service APIs
This control plane can be implemented in the following ways
Option 1: Metastore API [Preferred]
The Samza Metastore provides an API to write to the coordinator stream. One simple way to expose the Container Placement API is for the ContainerPlacementHandler to have a coordinator stream consumer polling control messages from the coordinator stream under a separate namespace (different from the one where container locality messages live) & acting on them.
...
- ContainerPlacementHandler is a stateless handler dispatching ContainerPlacementRequestMessages from Metastore to Container Placement Service & ContainerPlacementResponseMessages from Container Placement Service to metastore for external controls to query the status of an action. (PR).
- The Metastore used today in Samza is by default Kafka (the coordinator stream), which is used to store configs & container mappings & is log compacted
- ContainerPlacementRequestMessage & ContainerPlacementResponseMessage are maintained in the same namespace ("samza-place-container-v1") using NamespaceAwareMetaStore
...
The key for storing ContainerPlacementRequestMessage & ContainerPlacementResponseMessage in the Metastore is chosen to be UUID + "." + messageType (ContainerPlacementRequestMessage or ContainerPlacementResponseMessage). The value is the payload containing the ContainerPlacementRequestMessage or ContainerPlacementResponseMessage. Messages are written to and read from the Metastore through the MetadataStore abstraction. Since the metastore is eventually consistent, duplicate messages are required to be handled by the ContainerPlacementService.
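As an illustration, the key scheme described above could be composed like this; `PlacementMessageKey.keyFor` is a hypothetical helper for this sketch, not a Samza API:

```java
import java.util.UUID;

// Sketch of the metastore key scheme described above: UUID + "." + message subtype.
class PlacementMessageKey {
  static String keyFor(UUID uuid, String subType) {
    return uuid.toString() + "." + subType;
  }
}
```

For example, `keyFor(uuid, "ContainerPlacementRequestMessage")` produces keys of the shape shown in the sample key/value row later in this section.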
ContainerPlacementRequestMessage:
Key | Value | Field Description | Field Type
---|---|---|---
"UUID.subType" | uuid | Unique identifier of a request message | Required
processorId | Logical processor id 0,1,2 of the container | Required |
deploymentId | Unique identifier for a deployment | Required |
subType | Type of message, here: ContainerPlacementRequestMessage | Required |
destinationHost | Destination host where the container is desired to be moved | Required |
statusCode | Status of the current request | Required |
timestamp | The timestamp of the request message | Required |
requestExpiry | Request expiry which acts as a timeout for any resource request to the cluster resource manager | Optional |
...
Key | Value | Field Description | Field Type |
---|---|---|---|
"UUID.subType" | uuid | Unique identifier of a response message | Required |
processorId | Logical processor id 0,1,2 of the container | Required | |
deploymentId | Unique identifier for a deployment | Required | |
subType | Type of message here: ContainerPlacementResponseMessage | Required | |
destinationHost | Destination host where the container is desired to be moved | Required | |
statusCode | Status of the current response | Required |
responseMessage | Response message in conjunction to status | Required | |
timestamp | The timestamp of the response message | Required | |
requestExpiry | Request expiry which acts as a timeout for any resource request to cluster resource manager | Optional
...
Key | Value |
---|---|
[1,"samza-place-container-v1","88b0d30c-d518-4307-9e8e-c8529eb30f04.ContainerPlacementResponseMessage"] | {"processorId":"1","deploymentId":"app-attempt-001","subType":"ContainerPlacementResponseMessage","responseMessage":"Request is accepted","uuid":"88b0d30c-d518-4307-9e8e-c8529eb30f04","destinationHost":"ANY_HOST","statusCode":"ACCEPTED","timestamp":1578694070875} |
GC policy for stale messages in metastore
- One way to delete stale ContainerPlacementMessages is to delete request / responses from the previous incarnation of the job in the metastore on job restarts, this is the responsibility of ContainerPlacementService
- Once the request is complete, ContainerPlacementService can issue an async delete to the metastore
- Request / response message can be externally cleaned by a tool
Part 2. Container Placement Service
Container Placement Service is a set of APIs built around the AM to move/restart containers. The service periodically gets a queue of placement actions per container, which it issues in parallel across different containers but sequentially on any one container. Each placement request has a "deploymentId" attached to it, because if a job restarts, all placement actions queued for the previous deployment must be disregarded and deleted. Samza internally generates an id for each run of a job ("app.run.id") at the job planning phase; we can use that id as the "deploymentId" for placement requests. The solution proposes to refactor & simplify the current AM code & introduce a ContainerManager, a single entity managing container actions like start and stop for both active and standby containers. Enlisted are the functions of ContainerManager & the proposed refactoring around the AM code:
- Remove the HostAwareContainerAllocator & ContainerAllocator, simplify Container Allocator as a simple lightweight entity allocating requests to available resources (PR1, PR2)
- Introduce ContainerManager which acts as a brain for validating and issuing any actions on containers in the Job Coordinator for both active & Standby containers. (PR)
- Transfer state & validation of container launch & expired request handling from ContainerAllocator to ContainerManager
- Transfer state & lifecycle management of Container allocator & resource request on boot from ClusterResourceManager.CallBack(ContainerProcessManager) to ContainerManager
- Encapsulates logic and state related to container placement actions like move, restarts for active & standby container in ContainerManager (PR-1, TDB)
- It is ContainerManager’s duty to validate any ContainerPlacementRequestMessages & also invalidate messages from the previous deployment incarnation
- It is ContainerManager’s duty to write ContainerPlacementResponseMessages to Metastore for external control to query the status of the request
- ContainerPlacementMetadata is a metadata holder for container actions (ControlActionMetadata), e.g. request_id, current status, requested resources etc
Note: ClusterResourceManager.Callback (ContainerProcessManager) is tightly coupled with ClusterBasedJobCoordinator today. In phase 1 of the implementation, all the proposed changes will be made except for moving state & lifecycle management of the ContainerAllocator & resource requests on boot from ClusterResourceManager.Callback (ContainerProcessManager) to ContainerManager, so that this feature can be developed faster. Hence ContainerProcessManager will still be tied to ClusterBasedJobCoordinator and will intercept any container placement requests.
2.1 Container Move
2.1.1 Stateless Container Move & Stateful Container Move (without Standby)
Option 1: Reserve-Stop-Move: Request resources first and only make a move once resources are allocated by the ClusterResourceManager (Preferred)
...
Pros
...
Cons
...
- Container only moves when a preferred resource is available for the container to start
- Offers stronger move semantics more effective when used with a Workload balancer
...
- Complex implementation as compared to Option 2
Option 2: Stop-Reserve-Move: Stop the container and then make a resource request for preferred resources and attempt a start (Rejected Alternative)
...
Pros
...
Cons
...
- Easier to implement
...
- Weaker move semantics: if preferred resources are not available and the container is already stopped, there is a higher chance of moving the container to something other than the preferred host, which is ineffective for a workload balancer
Orchestration of Stateless Container Move using Option 1: The image shows a sequence of steps involved in moving a container. Note that there is a specific number of retries configured for requesting preferred hosts to ClusterResourceManager.
- ContainerPlacementHandler dispatches the move request to ContainerProcessManager
- ContainerProcessManager dispatches the placement request to ContainerManager, which validates the action and issues a preferred resource request with ContainerAllocator (Async)
- Now there are two possible scenarios
- Requested Resources are allocated by ClusterResourceManager
- In this case, ContainerManager initiates a container shutdown and waits for the ContainerProcessManager callback on successfully stopping the container
- On getting the successful stop callback, ContainerManager issues a container start request to ClusterResourceManager on the allocated resources
- If the container start request succeeds then a success notification is sent (updating the Control Action Metadata); otherwise a new resource request to start this container on the source host is issued & a failure notification is sent to the user
- Resource Request expires
- In cases where the Resource Request expires because ClusterResourceManager is not able to allocate the requested resources, a failure notification is sent and the active container remains unaffected
Failure Scenarios:
- If the preferred resources cannot be acquired, the active container is never stopped and a failure notification is sent for the ContainerPlacementRequest
- If the ContainerPlacementManager is not able to stop the active container (3.1 #1 above fails), the request is marked failed & a failure notification is sent for the ContainerPlacementRequest
- If ClusterResourceManager fails to start the stopped active container on the acquired destination host, then we attempt to start the container back on the source host and a failure notification is sent for the ContainerPlacementRequest. If the container fails to start on the source host then an attempt is made to start it on ANY_HOST
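The ordering above (reserve first, stop only after allocation, fall back on start failure) can be sketched as a small state machine. This is an illustrative sketch with hypothetical interfaces, not the actual ContainerManager code:

```java
/** Illustrative sketch of Reserve-Stop-Move ordering; all names here are hypothetical. */
enum PlacementStatus { SUCCEEDED, FAILED }

class ReserveStopMoveSketch {
  interface Cluster {
    boolean requestResource(String host);              // true once allocated, false on expiry
    boolean stopContainer(String processorId);          // true on successful stop callback
    boolean startContainer(String processorId, String host);
  }

  static PlacementStatus move(Cluster cluster, String processorId, String sourceHost, String destinationHost) {
    // 1. Reserve: request the preferred resource from the cluster manager first.
    if (!cluster.requestResource(destinationHost)) {
      return PlacementStatus.FAILED;                    // request expired; active container untouched
    }
    // 2. Stop: only now is the active container shut down.
    if (!cluster.stopContainer(processorId)) {
      return PlacementStatus.FAILED;
    }
    // 3. Move: start on the allocated destination; fall back to source host, then ANY_HOST.
    if (cluster.startContainer(processorId, destinationHost)) {
      return PlacementStatus.SUCCEEDED;
    }
    if (!cluster.startContainer(processorId, sourceHost)) {
      cluster.startContainer(processorId, "ANY_HOST");
    }
    return PlacementStatus.FAILED;                      // the move itself failed even if the container recovered
  }
}
```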
Option 3: Stateful without Standby (Spin up StandBy container & then move) (Phase 2) [Stretch]
If we expose an API to spin up a StandBy container for an active container, a client can make two requests: one to spin up a StandBy, then periodically monitor the lag for the StandBy container (using Diagnostics). Once the StandBy container has a steady lag and is ready to move, this becomes Case 1 mentioned above. (More details TBD later)
2.1.2 Stateful Container Move with Standby Enabled
The stateful container move is divided into two cases on the basis of whether the job has a StandBy container or not.
Let's take the case where the job has a stateful container C on H1 and a StandBy container C’ enabled on H2
Option 1: Stop-Reserve-Move: Initiate a Stop on C & issue a Standby failover (Preferred - Phase 1*)
In this option, C is stopped on H1 and a failover is initiated from H1 to H2 with the current semantics of failover of an active container in the Hot StandBy feature. This ensures C moves either to H2 (in the best case) or to some other host (when the Cluster Manager cannot allocate capacity on H2, i.e. H2 is under capacity)
...
Pros
...
Cons
...
- Easy to implement, ability to do this already exists
...
- C always moves from H1, and there is a slim chance that C won’t land on H2 per the current semantics (stolen-host scenario: the Cluster Manager fails to allocate H2 for C)
Option 2: Reserve-Stop-Move: Initiate a Standby move first then issue a move for Active Container (Preferred - Phase 2*)
- In this option, a request is first issued to stop C’
- Then H2 is requested from the cluster manager to start C
- A stop is issued on C only if H2 can be allocated (#2 succeeds)
- Then C’ is requested to start on ANY_HOST except for H1, H2
Two ways to achieve this:
- A client can make two calls: first move C’ to ANY_HOST apart from H2, H1 (similar to a stateless move), then move C to the host vacated by C’ (similar to a stateless move), so the state maintenance lives on the client
- Maintain state in JC to accomplish this
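The first (client-driven) variant can be sketched as two sequential placement calls. PlacementClient and the method names here are hypothetical, for illustration only:

```java
/**
 * Sketch of the client-driven variant of Option 2: first vacate the standby C' from H2,
 * then move the active C onto H2. PlacementClient is a hypothetical interface.
 */
class StandbyThenActiveMove {
  interface PlacementClient {
    // returns true when the placement action eventually succeeds
    boolean placeContainer(String processorId, String destinationHost);
  }

  static boolean moveActiveToStandbyHost(PlacementClient client, String activeId, String standbyId, String standbyHost) {
    // 1. Move the standby off its host (the text asks for ANY_HOST except H1, H2; simplified here)
    if (!client.placeContainer(standbyId, "ANY_HOST")) {
      return false; // the active container stays untouched if the standby cannot be vacated
    }
    // 2. Move the active container onto the vacated standby host
    return client.placeContainer(activeId, standbyHost);
  }
}
```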
...
Pros
...
Cons
...
- Stronger move semantics, since the active container won’t be stopped if it cannot be moved to the standby container
...
- More complex as compared to Option 1 needs to maintain more state
* Phase 1 & Phase 2 refer to the implementation phases, please see the section: Implementation Plan
Option 3: Reserve-Stop-Move: First Request H2 from Cluster Manager and initiate the failover once you get H2 as a preferred resource [Rejected Alternative]
- In this option C’ is stopped on H2 and failover is initiated from H1 to H2 with current semantics as developed by the feature Hot standby
- If the move fails, only the standby is affected
...
Pros
...
Cons
...
- Stronger move semantics are guaranteed; the active container won’t be stopped unless it can be started on H2
...
- Since there is a standby already running on H2, there is a higher chance that the cluster manager might fail to allocate H2 and the move request is ineffective (the host is running at full capacity)
- Need to design a new failover scheme hence more development time to implement
Challenges with Metastore
The Metastore today (Kafka) is at-least-once & eventually consistent, hence ContainerPlacementService has to do in-memory caching of the UUIDs of accepted actions so that it does not take a request twice when duplicates are delivered. But the in-memory cache must not be unbounded, since that could result in the job running out of memory. The size of a UUID is 16 bytes; say a job has at most 500 containers, then one request action per container results in 0.008 MB of additional memory (just the in-memory lookup). If we cache, say, the last 20K actions (which can accommodate 40 failovers of 500 containers in this scenario), the memory used will be at most 0.64 MB if we implement a FIFO cache (in-memory lookup + FIFO queue).
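A bounded FIFO de-duplication cache of the kind estimated above could look like the following sketch. The names are hypothetical (this is not the actual ContainerPlacementService code); it leans on LinkedHashMap's eldest-entry eviction to keep memory bounded:

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

/**
 * Sketch: remembers the UUIDs of the last maxEntries accepted actions so duplicate
 * deliveries from the eventually consistent metastore are ignored, with bounded memory.
 */
class DedupeCache {
  private final Set<UUID> seen;

  DedupeCache(int maxEntries) {
    // LinkedHashMap in insertion order; eldest entry is evicted FIFO once the bound is hit.
    this.seen = Collections.newSetFromMap(new LinkedHashMap<UUID, Boolean>() {
      @Override
      protected boolean removeEldestEntry(Map.Entry<UUID, Boolean> eldest) {
        return size() > maxEntries;
      }
    });
  }

  /** Returns true if the uuid is new (action should be taken), false if it is a duplicate. */
  boolean markIfNew(UUID uuid) {
    return seen.add(uuid);
  }
}
```

Note the FIFO trade-off: a very old duplicate that has already been evicted would be treated as new, which is why the cache should be sized to comfortably cover the window in which duplicates can arrive.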
Who writes the new locality mapping after a successful move?
Samza containers write their new locality message to the metadata store on a successful start (code); hence after a successful move the container writes its new locality
Orchestration of StandBy Enabled Container Move using Option 1 (Phase 1)
- ContainerPlacementHandler dispatches the move request to ContainerProcessManager
- ContainerProcessManager registers a move & initiates a failover with StandByContainerManager then the following failover sequence is followed
2.2 Container Restart
This is much simpler as compared to Move. We can either make restart equivalent to a move, or issue a stop on a container so that ContainerProcessManager tries to start that container again. Let's discuss these options in detail:
Option 1: Reserve-Stop-Start: Restart is equivalent to move on the same host [Preferred]
- In this option, resources are requested on the same host first before issuing a stop on Container
- Once the resources are accrued a container is stopped & requested to start
- So this is equivalent to move semantics above
Both stateless & stateful container restarts will be equivalent to a Stateless Container Move on the same host.
Option 2: Stop-Reserve-Start: Restart is equivalent to stopping container first then attempting to start it [Rejected Alternative]
- In this option, a container is issued a stop first
- Once the container stops, resources are requested for starting on the same host (last seen host)
- Once the resources are accrued, the container is issued a start on the same host
Usage Example:
Code Block:
place-container --deployment-id 1581635852024-5117e303 --app-name snjain-test-cp --app.id = 1 --processor-id 4 --request-expiry 10 --destination-host abc.prod.com
Code Block:
@CommandLine.Command(name = "place-container", description = "Request to move/restart container at destination-host")
public class ContainerPlacementTool {
...
  _appName = // read from command line
  _appId = // read from command line
  _deploymentId = // read from command line
  _processorId = // read from command line
  _destinationHost = // read from command line
  _requestExpiry = // read from command line
  MetadataStore metadataStore = buildMetadataStore(_appName, _appId);
  try {
    ContainerPlacementMetadataStore containerPlacementMetadataStore =
        new ContainerPlacementMetadataStore(metadataStore);
    containerPlacementMetadataStore.start();
    // Convert the optional expiry into a Duration and pass the converted value along.
    Duration requestExpiry = _requestExpiry != null ? Duration.ofSeconds(_requestExpiry) : null;
    UUID uuid = containerPlacementMetadataStore.writeContainerPlacementRequestMessage(_deploymentId, _processorId,
        _destinationHost, requestExpiry, System.currentTimeMillis());
    System.out.println("Request received; query the status using: " + uuid);
  } finally {
    metadataStore.close();
  }
}
Public Interfaces
Code Block:
/**
* Encapsulates the request or response payload information between the ContainerPlacementHandler service and external
* controllers issuing placement actions
*/
@InterfaceStability.Evolving
public abstract class ContainerPlacementMessage {
public enum StatusCode {
/**
* Indicates that the container placement action is created
*/
CREATED,
/**
* Indicates that the container placement action was rejected because request was deemed invalid
*/
BAD_REQUEST,
/**
* Indicates that the container placement action is accepted and waiting to be processed
*/
ACCEPTED,
/**
* Indicates that the container placement action is in progress
*/
IN_PROGRESS,
/**
 * Indicates that the container placement action has succeeded
*/
SUCCEEDED,
/**
 * Indicates that the container placement action has failed
*/
FAILED;
}
/**
 * UUID attached to a message, which helps identify duplicate request messages written to the metastore so that
 * actions are not retaken even if the metastore is eventually consistent
*/
protected final UUID uuid;
/**
 * Unique identifier for a deployment so messages can be invalidated across job restarts,
 * e.g. a YARN-based cluster manager should set this to the app attempt id
*/
protected final String deploymentId;
// Logical container Id 0, 1, 2
protected final String processorId;
// Destination host where container is desired to be moved
protected final String destinationHost;
// Optional request expiry which acts as a timeout for any resource request to cluster resource manager
protected final Duration requestExpiry;
// Status of the current request
protected final StatusCode statusCode;
// Timestamp of the request or response message
protected final long timestamp;
protected ContainerPlacementMessage(UUID uuid, String deploymentId, String processorId, String destinationHost,
Duration requestExpiry, StatusCode statusCode, long timestamp) {…}
}
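The status codes above imply a simple request lifecycle: CREATED → ACCEPTED (or BAD_REQUEST) → IN_PROGRESS → SUCCEEDED/FAILED. A small sketch of a transition validator follows; the `PlacementStatus` enum and the transition table are assumptions inferred from the enum comments, not part of the proposal:

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

public class PlacementStatusLifecycle {
  // Mirrors ContainerPlacementMessage.StatusCode from the proposal
  public enum PlacementStatus { CREATED, BAD_REQUEST, ACCEPTED, IN_PROGRESS, SUCCEEDED, FAILED }

  // Assumed legal transitions, inferred from the StatusCode comments above
  private static final Map<PlacementStatus, Set<PlacementStatus>> TRANSITIONS =
      new EnumMap<>(PlacementStatus.class);
  static {
    TRANSITIONS.put(PlacementStatus.CREATED, EnumSet.of(PlacementStatus.ACCEPTED, PlacementStatus.BAD_REQUEST));
    TRANSITIONS.put(PlacementStatus.ACCEPTED, EnumSet.of(PlacementStatus.IN_PROGRESS));
    TRANSITIONS.put(PlacementStatus.IN_PROGRESS, EnumSet.of(PlacementStatus.SUCCEEDED, PlacementStatus.FAILED));
    // Terminal states: no further transitions
    TRANSITIONS.put(PlacementStatus.BAD_REQUEST, EnumSet.noneOf(PlacementStatus.class));
    TRANSITIONS.put(PlacementStatus.SUCCEEDED, EnumSet.noneOf(PlacementStatus.class));
    TRANSITIONS.put(PlacementStatus.FAILED, EnumSet.noneOf(PlacementStatus.class));
  }

  public static boolean canTransition(PlacementStatus from, PlacementStatus to) {
    return TRANSITIONS.get(from).contains(to);
  }

  public static void main(String[] args) {
    System.out.println(canTransition(PlacementStatus.CREATED, PlacementStatus.ACCEPTED));  // true
    System.out.println(canTransition(PlacementStatus.SUCCEEDED, PlacementStatus.FAILED));  // false
  }
}
```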
Code Block
/**
* Encapsulates the request sent from the external controller to the JobCoordinator to take a container placement action
*/
public class ContainerPlacementRequestMessage extends ContainerPlacementMessage {
public ContainerPlacementRequestMessage(UUID uuid, String deploymentId, String processorId, String destinationHost, Duration requestExpiry, long timestamp) {...}
public ContainerPlacementRequestMessage(UUID uuid, String deploymentId, String processorId, String destinationHost, long timestamp) {...}
}
Code Block
/**
* Encapsulates the response sent from the JobCoordinator for a container placement action
*/
public class ContainerPlacementResponseMessage extends ContainerPlacementMessage {
// Returned status of the request
private String responseMessage;
public ContainerPlacementResponseMessage(UUID uuid, String deploymentId, String processorId, String destinationHost,
Duration requestExpiry, StatusCode statusCode, String responseMessage, long timestamp) {...}
public ContainerPlacementResponseMessage(UUID uuid, String deploymentId, String processorId, String destinationHost,
StatusCode statusCode, String responseMessage, long timestamp) {...}
}
Orchestration of StandBy Enabled Container Move using Option 1 (Phase 1)
- ContainerPlacementHandler dispatches the move request to ContainerProcessManager
- ContainerProcessManager registers a move & initiates a failover with StandByContainerManager, after which the standby failover sequence is followed
2.2 Container Restart
This is much simpler compared to Move. We can either make restart equivalent to a move, or issue a stop on a container and let ContainerProcessManager try to start that container again. Let's discuss these options in detail:
Option 1: Reserve-Stop-Start: Restart is equivalent to move on the same host [Preferred]
- In this option, resources are requested on the same host first before issuing a stop on Container
- Once the resources are accrued, the container is stopped & then requested to start
- So this is equivalent to move semantics above
...
Pros
- Strong Restart semantics since the container is not stopped if it cannot be restarted on the same resource
...
Cons
- For a container to restart, it will be holding 2x resources on the same host from the time resources are accrued to the time the active container is issued a stop
...
Both stateless & stateful container restarts will be equivalent to a Stateless Container Move on the same host, since the container is restarted on the host where it was already running.
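The Reserve-Stop-Start ordering can be sketched as a tiny simulation: the stop is only issued after the reservation on the same host succeeds, otherwise the running container is left untouched (the "strong restart semantics" above). All names here (`Cluster`, `reserveOnHost`, etc.) are illustrative stand-ins, not Samza APIs:

```java
public class ReserveStopStartRestart {
  /** Outcome of a restart attempt under Reserve-Stop-Start semantics. */
  public enum Outcome { RESTARTED, LEFT_RUNNING }

  /** Illustrative stand-in for the cluster manager interactions. */
  public interface Cluster {
    boolean reserveOnHost(String host);      // accrue resources on the same host
    void stopContainer(String processorId);  // stop the running active container
    void startContainer(String processorId, String host);
  }

  /** Reserve first; only stop (and restart) the container if the reservation succeeded. */
  public static Outcome restart(Cluster cluster, String processorId, String host) {
    if (!cluster.reserveOnHost(host)) {
      // Strong semantics: the container is never stopped without accrued resources
      return Outcome.LEFT_RUNNING;
    }
    cluster.stopContainer(processorId);
    cluster.startContainer(processorId, host);
    return Outcome.RESTARTED;
  }

  public static void main(String[] args) {
    Cluster alwaysFull = new Cluster() {
      public boolean reserveOnHost(String host) { return false; }
      public void stopContainer(String processorId) { throw new AssertionError("must not stop"); }
      public void startContainer(String processorId, String host) { }
    };
    System.out.println(restart(alwaysFull, "2", "host-a")); // LEFT_RUNNING
  }
}
```

Note the cost this buys strong semantics with: between the reservation and the stop, the container transiently holds 2x resources on the host, as listed under Cons.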
Option 2: Stop-Reserve-Start: Restart is equivalent to stopping container first then attempting to start it [Rejected Alternative]
- In this option, a container is issued a stop first
- Once the container stops, resources are requested for starting on the same host (last seen host)
- Once the resources are accrued, the container is issued a start on the same host
...
Pros
- The container at any point in time is only holding the resources it needs to start on a host
...
Cons
- Weaker Restart semantics since there is a likely chance of restarting this container on a host other than the source host (since the container is already stopped & the Cluster Manager may not be able to return resources on the source host)
...
Other Interfaces
Code Block
/**
 * Entity managing reads and writes to the metastore for
 * {@link org.apache.samza.container.placement.ContainerPlacementRequestMessage}
 * and {@link org.apache.samza.container.placement.ContainerPlacementResponseMessage}
 */
public class ContainerPlacementMetadataStore {
  /**
   * Writes a {@link ContainerPlacementRequestMessage} to the underlying metastore. This method should be used by
   * external controllers to issue a request to JobCoordinator
   *
   * @param deploymentId identifier of the deployment
   * @param processorId logical id of the samza container 0,1,2
   * @param destinationHost host where the container is desired to move
   * @param requestExpiry optional per request expiry timeout for requests to cluster manager
   * @param timestamp timestamp of the request
   * @return uuid generated for the request
   */
  public UUID writeContainerPlacementRequestMessage(String deploymentId, String processorId, String destinationHost, Duration requestExpiry, long timestamp) {...}

  /**
   * Reads a {@link ContainerPlacementRequestMessage} from the underlying metastore
   *
   * @param uuid uuid of the request
   * @return ContainerPlacementRequestMessage if it is present
   */
  public Optional<ContainerPlacementRequestMessage> readContainerPlacementRequestMessage(UUID uuid) {...}

  /**
   * Reads a {@link ContainerPlacementResponseMessage} from the underlying metastore
   *
   * @param uuid uuid of the request
   * @return ContainerPlacementResponseMessage if it is present
   */
  public Optional<ContainerPlacementResponseMessage> readContainerPlacementResponseMessage(UUID uuid) {...}

  /**
   * Deletes a {@link ContainerPlacementRequestMessage} if present, identified by the key {@code uuid}
   *
   * @param uuid uuid of the request
   */
  public void deleteContainerPlacementRequestMessage(UUID uuid) {..}

  /**
   * Deletes a {@link ContainerPlacementResponseMessage} if present, identified by the key {@code uuid}
   *
   * @param uuid uuid of the request
   */
  public void deleteContainerPlacementResponseMessage(UUID uuid) {..}

  /**
   * Deletes both {@link ContainerPlacementRequestMessage} and {@link ContainerPlacementResponseMessage} identified by uuid
   *
   * @param uuid uuid of the request and response message
   */
  public void deleteAllContainerPlacementMessages(UUID uuid) {...}

  /**
   * Writes a {@link ContainerPlacementResponseMessage} to the underlying metastore.
   * This method should be used by Job Coordinator only to write responses to Container Placement Action
   *
   * @param responseMessage response message
   */
  void writeContainerPlacementResponseMessage(ContainerPlacementResponseMessage responseMessage) {..}
}
Code Block
/**
 * Stateless handler that periodically dispatches {@link ContainerPlacementRequestMessage} read from Metadata store to
 * Job Coordinator
 */
public class ContainerPlacementRequestAllocator implements Runnable {
  @Override
  public void run() {...}
}
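One iteration of the allocator's run() loop boils down to: read the pending request messages, skip uuids already acted upon (guarding against duplicate reads from an eventually consistent metastore, which is exactly why the messages carry a uuid), and dispatch the rest to the Job Coordinator. A self-contained sketch under those assumptions; the dedup set and `Consumer` dispatch target are illustrative, not the proposal's implementation:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;

public class RequestAllocatorSketch {
  // uuids already dispatched in a previous iteration
  private final Set<UUID> dispatched = new HashSet<>();

  /**
   * One iteration of the allocator loop: dispatch every request whose uuid has not
   * been seen before, and report how many were actually dispatched.
   */
  public int dispatchPending(List<UUID> pendingRequestUuids, Consumer<UUID> dispatchToJobCoordinator) {
    int count = 0;
    for (UUID uuid : pendingRequestUuids) {
      // Dedupe: an eventually consistent read may return already-handled requests
      if (dispatched.add(uuid)) {
        dispatchToJobCoordinator.accept(uuid);
        count++;
      }
    }
    return count;
  }

  public static void main(String[] args) {
    RequestAllocatorSketch allocator = new RequestAllocatorSketch();
    UUID a = UUID.randomUUID(), b = UUID.randomUUID();
    System.out.println(allocator.dispatchPending(List.of(a, b), uuid -> { }));    // 2
    // Stale duplicate of `a` re-read from the metastore is skipped
    System.out.println(allocator.dispatchPending(List.of(a, b, a), uuid -> { })); // 0
  }
}
```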
Code Block
public class ContainerManager {
  /**
   * Registers a container placement action to move the running container to destination host
   *
   * @param requestMessage request containing details of placement request
   * @param containerAllocator to request physical resources
   */
  public void registerContainerPlacementAction(ContainerPlacementRequestMessage requestMessage, ContainerAllocator containerAllocator) {...}

  /**
   * Handles the container start action for both active & standby containers. This method is invoked by the allocator thread
   *
   * @param request pending request for the preferred host
   * @param preferredHost preferred host to start the container
   * @param allocatedResource resource allocated from {@link ClusterResourceManager}
   * @param resourceRequestState state of request in {@link ContainerAllocator}
   * @param allocator to request resources from {@link ClusterResourceManager}
   * @return true if the container launch is complete, false if the container launch is in progress
   */
  boolean handleContainerLaunch(SamzaResourceRequest request, String preferredHost, SamzaResource allocatedResource,
      ResourceRequestState resourceRequestState, ContainerAllocator allocator) {..}

  /**
   * Handles the container launch failure for active containers and standby (if enabled).
   *
   * @param processorId logical id of the container eg 1,2,3
   * @param containerId last known id of the container deployed
   * @param preferredHost host on which container is requested to be deployed
   * @param containerAllocator allocator for requesting resources
   */
  void handleContainerLaunchFail(String processorId, String containerId, String preferredHost, ContainerAllocator containerAllocator) {...}

  /**
   * Handles the state update on successful launch of a container
   *
   * @param processorId logical processor id of container 0,1,2
   */
  void handleContainerLaunchSuccess(String processorId) {...}

  /**
   * Handles the action to be taken after the container has been stopped.
   *
   * @param processorId logical id of the container eg 1,2,3
   * @param containerId last known id of the container deployed
   * @param preferredHost host on which container was last deployed
   * @param exitStatus exit code returned by the container
   * @param preferredHostRetryDelay delay to be incurred before requesting resources
   * @param containerAllocator allocator for requesting resources
   */
  void handleContainerStop(String processorId, String containerId, String preferredHost, int exitStatus,
      Duration preferredHostRetryDelay, ContainerAllocator containerAllocator) {..}

  /**
   * Handles an expired resource request for both active and standby containers.
   *
   * @param processorId logical id of the container
   * @param preferredHost host on which container is requested to be deployed
   * @param request pending request for the preferred host
   * @param allocator allocator for requesting resources
   * @param resourceRequestState state of request in {@link ContainerAllocator}
   */
  void handleExpiredRequest(String processorId, String preferredHost, SamzaResourceRequest request,
      ContainerAllocator allocator, ResourceRequestState resourceRequestState) {..}
}
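handleExpiredRequest is where the optional per-request requestExpiry carried in the placement message takes effect: a resource request that has outlived its expiry is treated as failed instead of waiting on the cluster manager indefinitely. A sketch of just the expiry check; the default expiry value and method names are assumptions for illustration:

```java
import java.time.Duration;

public class RequestExpiryCheck {
  // Assumed default used when no per-request expiry is supplied (illustrative value)
  static final Duration DEFAULT_EXPIRY = Duration.ofMinutes(5);

  /** True if a resource request issued at requestTimeMs has outlived its expiry at nowMs. */
  public static boolean isExpired(long requestTimeMs, Duration requestExpiry, long nowMs) {
    // Fall back to the default when the placement message carried no requestExpiry
    Duration expiry = (requestExpiry != null) ? requestExpiry : DEFAULT_EXPIRY;
    return nowMs - requestTimeMs > expiry.toMillis();
  }

  public static void main(String[] args) {
    System.out.println(isExpired(0L, Duration.ofSeconds(30), 31_000L)); // true
    System.out.println(isExpired(0L, null, 60_000L));                   // false (default 5 min)
  }
}
```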
...