Status
Current state: ACCEPTED
Discussion thread: http://mail-archives.apache.org/mod_mbox/samza-dev/202001.mbox/browser
JIRA: SAMZA-2373
Released: Samza 1.5
Problem
Samza operates in multi-tenant environments with cluster managers like YARN and Mesos, where a single host can run multiple Samza containers. Often, due to soft limits configured for cluster managers like YARN and the absence of dynamic workload balancing in Samza, a host lands in a situation where it is underperforming, and it is desirable to move one or more containers from that host to other hosts. Today this is not possible without affecting other jobs on the hot host or manually restarting the affected job. In other use cases, like resetting checkpoints of a single container or supporting canary or rolling bounces, the ability to restart a single container or a subset of containers without restarting the whole job is highly desirable.
...
- This system does not intend to solve the problem of dynamic workload balancing (i.e. Cruise Control). It may act as a building block for one later.
- Solving the canary problem for the YARN-based deployment model is out of the scope of this solution; however, the system built should be easily extensible to support canary.
- This system will not have built-in intelligence to find a better host match for a container; it will make simplistic decisions as per the params passed by the user.
SLA / SCALE / LIMITS (Assumptions)
- At a time, the AM for a single job will only serve one request per container; parallel requests across containers are still supported. If a control request is underway, any other requests issued on the same container will be queued. The same assumption holds for in-flight requests on standby and active containers, i.e. if any container placement request is in progress for an active container or its standby replica, all subsequent placement actions on either are queued.
- Actions are de-queued, sorted in order of the timestamps populated by the client, and executed in that order.
- The system should be capable of scaling to be used across different jobs at the same time
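The queueing and ordering assumptions above can be sketched as follows. This is a hypothetical, in-memory illustration; the class and method names are not Samza APIs:

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Set;

// Hypothetical sketch, not Samza code: serialize placement actions per container
// and drain them in client-timestamp order, so at most one request per container
// is in flight at a time while requests across containers proceed in parallel.
class PlacementActionQueue {
  static class Action {
    final String processorId;
    final long timestamp; // populated by the client
    Action(String processorId, long timestamp) {
      this.processorId = processorId;
      this.timestamp = timestamp;
    }
  }

  private final Map<String, PriorityQueue<Action>> pending = new HashMap<>();
  private final Set<String> inFlight = new HashSet<>();

  // Enqueue an action; it waits behind any in-flight action on the same container.
  void submit(Action action) {
    pending.computeIfAbsent(action.processorId,
        k -> new PriorityQueue<>(Comparator.comparingLong((Action a) -> a.timestamp))).add(action);
  }

  // De-queue the oldest pending action for a container, unless one is already running.
  Optional<Action> poll(String processorId) {
    if (inFlight.contains(processorId)) {
      return Optional.empty();
    }
    PriorityQueue<Action> queue = pending.get(processorId);
    if (queue == null || queue.isEmpty()) {
      return Optional.empty();
    }
    inFlight.add(processorId);
    return Optional.of(queue.poll());
  }

  // Mark the in-flight action on a container as finished, unblocking the next one.
  void complete(String processorId) {
    inFlight.remove(processorId);
  }
}
```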
...
Solution 1. Write locality to Coordinator Stream (Metastore) and restart job [Rejected]
This approach proposes to directly change the container locality messages (container-to-host mapping messages) in the Metadata Store (currently Kafka) and then restart the job.
Solution 2. Container Placement
...
API [Accepted]
API design
On the basis of the types of control actions, the commands are the following:
...
API | placeContainer
---|---
Description | Active Container: stops the container process on the source host and starts it on the requested destination. StandBy Container: stops the standby container process on the source host and starts it on the requested destination.
Parameters | uuid: unique identifier of a request, populated by the client; deploymentId: unique identifier of the deployed app for which the action is taken; processor-id: Samza resource id of the container, e.g. 0, 1, 2; destination-host: valid hostname / "ANY_HOST" / "STANDBY"; request-expiry-timeout [optional]: timeout for any resource request to the cluster manager
Status code | CREATED, BAD_REQUEST, ACCEPTED, IN_PROGRESS, SUCCEEDED, FAILED
Returns | Since this is an async API, nothing is returned synchronously; a UUID is generated with which the client can query the status of the request (the status can also be queried by processorId)
Failure Scenarios | The following cases can cause a container placement request to fail:
...
API | containerStatus
---|---
Description | Gives the status & info of a container placement request: for example, whether the container is running or stopped, and what control commands have been issued on it
Parameters | processor-id: Samza resource id of the container, e.g. 0, 1, 2; deploymentId: unique identifier of the deployed app for which the action is taken; uuid: unique identifier of a request
Status code | BAD_REQUEST, SUCCEEDED
Returns | Status of the container placement action
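The async submit-then-poll interaction that these APIs imply can be sketched as follows. The in-memory map is a stand-in for the metastore; all names here are illustrative, not Samza APIs:

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch, not Samza code: the client submits a request, receives a
// UUID, and later polls the status by that UUID. The in-memory map stands in for
// the metastore that the real implementation would read and write.
class StatusPoller {
  private final Map<UUID, String> responses = new ConcurrentHashMap<>();

  // Real code would write a placement request message keyed by a fresh uuid.
  UUID submitRequest() {
    return UUID.randomUUID();
  }

  // Real code: the JobCoordinator writes a response message for this uuid.
  void recordResponse(UUID uuid, String status) {
    responses.put(uuid, status);
  }

  // Empty until the JobCoordinator has written a response for this uuid.
  Optional<String> pollStatus(UUID uuid) {
    return Optional.ofNullable(responses.get(uuid));
  }
}
```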
...
API | controlStandBy
---|---
Description | Starts or stops a StandBy container for the given active container
Parameters | processor-id: Samza resource id of the container, e.g. 0, 1, 2; deploymentId: unique identifier of the deployed app for which the action is taken; uuid: unique identifier of a request
Status code | CREATED, BAD_REQUEST, ACCEPTED, IN_PROGRESS, SUCCEEDED, FAILED
Returns | UUID for the client to query the status of the request
Architecture
To implement a scalable container placement control system, the proposed solution is divided into two parts:
...
The Samza Metastore provides an API to write to the coordinator stream. One simple way to expose the Container Placement API is to have the ContainerPlacementHandler run a coordinator stream consumer that polls control messages from the coordinator stream under a separate namespace (different from the one where container locality messages live) and acts on them.
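The namespace separation described above might look like this in terms of keys. The key format shown here is purely hypothetical; Samza's actual key layout may differ:

```java
// Illustrative sketch: placement messages live under their own namespace in the
// coordinator stream, separate from the namespace holding container locality
// messages. The key format shown here is hypothetical, not Samza's actual layout.
class NamespacedKeys {
  static final String PLACEMENT_NAMESPACE = "samza-place-container-v1";

  // Scope a metastore key to a namespace so a consumer can filter placement
  // messages without touching locality entries.
  static String key(String namespace, String uuid) {
    return namespace + "/" + uuid;
  }

  static boolean isPlacementMessage(String key) {
    return key.startsWith(PLACEMENT_NAMESPACE + "/");
  }
}
```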
...
- ContainerPlacementHandler is a stateless handler dispatching ContainerPlacementRequestMessages from the Metastore to the Container Placement Service, and ContainerPlacementResponseMessages from the Container Placement Service back to the Metastore so that external controllers can query the status of an action (PR).
- The Metastore used by default in Samza today is Kafka (the coordinator stream), which stores configs & container mappings and is log compacted.
- ContainerPlacementRequestMessage & ContainerPlacementResponseMessage are maintained in the same namespace ("samza-place-container-v1") using NamespaceAwareMetaStore.
...
- ContainerPlacementHandler dispatches the move request to ContainerProcessManager
- ContainerProcessManager dispatches the placement request to ContainerManager, which validates the action and issues a preferred resource request with ContainerAllocator (async)
- Now there are two possible scenarios:
  - Requested resources are allocated by ClusterResourceManager
    - In this case, ContainerManager initiates a container shutdown and waits for the ContainerProcessManager callback on successfully stopping the container
    - On getting the successful stop callback, ContainerManager issues a container start request to ClusterResourceManager on the allocated resources
    - If the container start request succeeds, a success notification is sent (updating the control action metadata); otherwise a new resource request to start this container on the source host is issued, and a failure notification is sent to the user
  - Resource request expires
    - In cases where the resource request expires because ClusterResourceManager is not able to allocate the requested resources, a failure notification is sent and the active container remains unaffected
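One plausible reading of the status codes against this flow is the following transition graph. This is an illustrative sketch, not Samza code, and the exact set of legal transitions is an assumption:

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;

// Illustrative sketch, not Samza code: a plausible transition graph for the
// status codes against the flow above. A request moves CREATED -> ACCEPTED ->
// IN_PROGRESS and terminates in SUCCEEDED or FAILED; BAD_REQUEST is terminal
// when validation rejects the request. The exact graph is an assumption.
class PlacementStatus {
  enum StatusCode { CREATED, BAD_REQUEST, ACCEPTED, IN_PROGRESS, SUCCEEDED, FAILED }

  private static final Map<StatusCode, EnumSet<StatusCode>> LEGAL = new EnumMap<>(StatusCode.class);
  static {
    LEGAL.put(StatusCode.CREATED, EnumSet.of(StatusCode.ACCEPTED, StatusCode.BAD_REQUEST));
    LEGAL.put(StatusCode.ACCEPTED, EnumSet.of(StatusCode.IN_PROGRESS));
    LEGAL.put(StatusCode.IN_PROGRESS, EnumSet.of(StatusCode.SUCCEEDED, StatusCode.FAILED));
    LEGAL.put(StatusCode.BAD_REQUEST, EnumSet.noneOf(StatusCode.class));
    LEGAL.put(StatusCode.SUCCEEDED, EnumSet.noneOf(StatusCode.class));
    LEGAL.put(StatusCode.FAILED, EnumSet.noneOf(StatusCode.class));
  }

  static boolean canTransition(StatusCode from, StatusCode to) {
    return LEGAL.get(from).contains(to);
  }
}
```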
Who writes the new locality mapping after a successful move?
Samza containers write their new locality message to the metadata store on a successful start (code); hence, after a successful move, the container writes its new locality itself.
Failure Scenarios:
- If the preferred resources cannot be acquired, the active container is never stopped and a failure notification is sent for the ContainerPlacementRequest
- If the ContainerPlacementManager is not able to stop the active container (step 3.1 #1 above fails), the request is marked failed & a failure notification is sent for the ContainerPlacementRequest
- If ClusterResourceManager fails to start the stopped active container on the acquired destination host, the container falls back to the source host and a failure notification is sent for the ContainerPlacementRequest. If the container also fails to start on the source host, an attempt is made to start it on ANY_HOST
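The fallback order implied by the last scenario (destination host, then source host, then ANY_HOST) can be sketched as follows. This is illustrative only; `tryStart` stands in for the actual container start request to ClusterResourceManager:

```java
import java.util.Arrays;
import java.util.Optional;
import java.util.function.Predicate;

// Illustrative sketch, not Samza code: the fallback order for restarting a
// stopped active container. tryStart stands in for the actual container start
// request to ClusterResourceManager.
class StartFallback {
  // Returns the host the container ended up on, or empty if every attempt failed
  // (each failed attempt would also trigger a failure notification).
  static Optional<String> start(String destinationHost, String sourceHost, Predicate<String> tryStart) {
    for (String host : Arrays.asList(destinationHost, sourceHost, "ANY_HOST")) {
      if (tryStart.test(host)) {
        return Optional.of(host);
      }
    }
    return Optional.empty();
  }
}
```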
...
Usage Example:
```shell
place-container --deployment-id 1581635852024-5117e303 --app-name snjain-test-cp --app.id 1 --processor-id 4 --request-expiry 10 --destination-host abc.prod.com
```
```java
@CommandLine.Command(name = "place-container", description = "Request to move/restart container at destination-host")
public class ContainerPlacementTool {
  ...
  _appName = // read from command line
  _appId = // read from command line
  _deploymentId = // read from command line
  _processorId = // read from command line
  _destinationHost = // read from command line
  _requestExpiry = // read from command line
  MetadataStore metadataStore = buildMetadataStore(_appName, _appId);
  try {
    ContainerPlacementMetadataStore containerPlacementMetadataStore =
        new ContainerPlacementMetadataStore(metadataStore);
    containerPlacementMetadataStore.start();
    Duration requestExpiry = _requestExpiry != null ? Duration.ofSeconds(_requestExpiry) : null;
    UUID uuid = containerPlacementMetadataStore.writeContainerPlacementRequestMessage(_deploymentId, _processorId,
        _destinationHost, requestExpiry, System.currentTimeMillis());
    System.out.println("Request received, query the status using: " + uuid);
  } finally {
    metadataStore.close();
  }
}
```
Public Interfaces
```java
/**
 * Encapsulates the request or response payload information between the ContainerPlacementHandler service and external
 * controllers issuing placement actions
 */
@InterfaceStability.Evolving
public abstract class ContainerPlacementMessage {

  public enum StatusCode {
    /** Indicates that the container placement action is created */
    CREATED,
    /** Indicates that the container placement action was rejected because the request was deemed invalid */
    BAD_REQUEST,
    /** Indicates that the container placement action is accepted and waiting to be processed */
    ACCEPTED,
    /** Indicates that the container placement action is in progress */
    IN_PROGRESS,
    /** Indicates that the container placement action has succeeded */
    SUCCEEDED,
    /** Indicates that the container placement action has failed */
    FAILED;
  }

  /**
   * UUID attached to a message, which helps in identifying duplicate request messages written to the metastore and in
   * not retaking actions even if the metastore is eventually consistent
   */
  protected final UUID uuid;
  /**
   * Unique identifier for a deployment, so messages can be invalidated across job restarts;
   * for ex a YARN-based cluster manager should set this to the app attempt id
   */
  protected final String deploymentId;
  // Logical container id: 0, 1, 2
  protected final String processorId;
  // Destination host where the container is desired to be moved
  protected final String destinationHost;
  // Optional request expiry, which acts as a timeout for any resource request to the cluster resource manager
  protected final Duration requestExpiry;
  // Status of the current request
  protected final StatusCode statusCode;
  // Timestamp of the request or response message
  protected final long timestamp;

  protected ContainerPlacementMessage(UUID uuid, String deploymentId, String processorId,
      String destinationHost, Duration requestExpiry, StatusCode statusCode, long timestamp) {…}
}
```
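The UUID-based de-duplication that the javadoc on `uuid` alludes to can be sketched as follows. This is illustrative only, not the actual Samza implementation:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

// Illustrative sketch, not Samza code: ignore a request whose UUID has already
// been observed, guarding against re-taking an action when an eventually
// consistent metastore serves the same request message twice.
class UuidDeduper {
  private final Set<UUID> seen = new HashSet<>();

  // Returns true the first time a UUID is observed, false for duplicates.
  boolean firstObservation(UUID uuid) {
    return seen.add(uuid);
  }
}
```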
...
```java
/**
 * Encapsulates the request sent from the external controller to the JobCoordinator to take a container placement action
 */
public class ContainerPlacementRequestMessage extends ContainerPlacementMessage {

  public ContainerPlacementRequestMessage(UUID uuid, String deploymentId, String processorId,
      String destinationHost, Duration requestExpiry, long timestamp) {...}

  public ContainerPlacementRequestMessage(UUID uuid, String deploymentId, String processorId,
      String destinationHost, long timestamp) {...}
}
```
...
```java
/**
 * Encapsulates the response sent from the JobCoordinator for a container placement action
 */
public class ContainerPlacementResponseMessage extends ContainerPlacementMessage {
  // Returned status of the request
  private String responseMessage;

  public ContainerPlacementResponseMessage(UUID uuid, String deploymentId, String processorId,
      String destinationHost, Duration requestExpiry, StatusCode statusCode, String responseMessage, long timestamp) {...}

  public ContainerPlacementResponseMessage(UUID uuid, String deploymentId, String processorId,
      String destinationHost, StatusCode statusCode, String responseMessage, long timestamp) {...}
}
```
Other Interfaces
```java
/**
 * Entity managing reads and writes to the metastore for {@link org.apache.samza.container.placement.ContainerPlacementRequestMessage}
 * and {@link org.apache.samza.container.placement.ContainerPlacementResponseMessage}
 */
public class ContainerPlacementMetadataStore {

  /**
   * Writes a {@link ContainerPlacementRequestMessage} to the underlying metastore. This method should be used by external
   * controllers to issue a request to the JobCoordinator
   *
   * @param deploymentId identifier of the deployment
   * @param processorId logical id of the samza container: 0, 1, 2
   * @param destinationHost host where the container is desired to move
   * @param requestExpiry optional per-request expiry timeout for requests to the cluster manager
   * @param timestamp timestamp of the request
   * @return uuid generated for the request
   */
  public UUID writeContainerPlacementRequestMessage(String deploymentId, String processorId,
      String destinationHost, Duration requestExpiry, long timestamp) {...}

  /**
   * Reads a {@link ContainerPlacementRequestMessage} from the underlying metastore
   * @param uuid uuid of the request
   * @return ContainerPlacementRequestMessage if it is present
   */
  public Optional<ContainerPlacementRequestMessage> readContainerPlacementRequestMessage(UUID uuid) {...}

  /**
   * Reads a {@link ContainerPlacementResponseMessage} from the underlying metastore
   * @param uuid uuid of the request
   * @return ContainerPlacementResponseMessage if it is present
   */
  public Optional<ContainerPlacementResponseMessage> readContainerPlacementResponseMessage(UUID uuid) {..}

  /**
   * Deletes a {@link ContainerPlacementRequestMessage} if present, identified by the key {@code uuid}
   * @param uuid uuid of the request
   */
  public void deleteContainerPlacementRequestMessage(UUID uuid) {..}

  /**
   * Deletes a {@link ContainerPlacementResponseMessage} if present, identified by the key {@code uuid}
   * @param uuid uuid of the request
   */
  public void deleteContainerPlacementResponseMessage(UUID uuid) {..}

  /**
   * Deletes both {@link ContainerPlacementRequestMessage} and {@link ContainerPlacementResponseMessage} identified by uuid
   * @param uuid uuid of the request and response message
   */
  public void deleteAllContainerPlacementMessages(UUID uuid) {...}

  /**
   * Writes a {@link ContainerPlacementResponseMessage} to the underlying metastore.
   * This method should be used by the Job Coordinator only to write responses to a container placement action
   * @param responseMessage response message
   */
  void writeContainerPlacementResponseMessage(ContainerPlacementResponseMessage responseMessage) {..}
}
```
...