Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Status

Current state: [ UNDER DISCUSSION ]ACCEPTED

Discussion threadhttp://mail-archives.apache.org/mod_mbox/samza-dev/202001.mbox/browser

JIRASAMZA-2373

Released: TBDSamza 1.5

Problem

Samza operates in a multi-tenant environment with cluster managers like Yarn and Mesos where a single host can run multiple Samza containers. Often due to soft limits configured for cluster managers like Yarn and no notion of dynamic workload balancing in Samza a host lands in a situation where it is underperforming and it is desired to move one or more containers from that host to other hosts. Today this is not possible without affecting other jobs on the hot host or restarting the affected job manually. In other use cases like resetting checkpoints of a single container or supporting canary or rolling bounces the ability to restart a single or a subset of containers without restarting the whole job is highly desirable. 

...

Selectively Spin StandBy Containers: Samza has a feature of Hot StandBy Containers for reducing stateful restoration times. Enabling this feature for a job involves doubling the containers at the least (simplest case where every container has 1 standby replica enabled). Customers are reluctant to enable this since doubling the containers increases the cost to serve. To improve the adoption for this feature we can build the ability to spin up StandBy Containers for a single or a subset of containers while the job is running, these StandBy Containers then can be used for failover to reduce downtime.  

...

  • This system does not intend to solve the problem of Dynamic Workload Balance i.e  Cruise control. It may act as a building block for one later.
  • Solving the canary problem for YARN based deployment model is out of the scope of this solution however system built should be easily extensible to support canary 
  • This system will not have built-in intelligence to find a better match for the host for a container it will make simplistic decisions as per params passed by the user.

SLA / SCALE / LIMITS (Assumptions)

  • At a time AM for a single job will only serve one request per container, parallel requests across containers are still supported. If a control request is underway any other requests issued on the same container will be queued. Same assumption holds for in-flight requests on standby and active i.e if any container placement request is in-progress for an active or its standby replica, all subsequent placement actions on either are queued
  • Actions are de-queued and sorted in order of timestamps populated by the client and are executed in that order
  • The system should be capable of scaling to be used across different jobs at the same time 

...

Solution 1. Write locality to Coordinator Stream (Metastore) and restart job [Rejected]

This approach proposes to directly change the container locality messages (Container to host mapping messages) in the Metadata Store (currently Kafka) and issuing a restart

Pros

Cons

  • Simple to implement the current tool does that for host

Pros

Cons

  • Simple to implement the current tool does that for host affinity enabled jobs (since they maintain locality mapping)
  • Needs a job restart and does a best effort to get preferred hosts for containers but has no guarantee on getting them
  • If a job has standby containers enabled, this method involves changing standby mapping in addition to active container mappings 
  • Job faces downtime when the job has hundreds of containers and only one of them needs to be restarted, if it is stateful there is a likelihood that containers might not get the new asked resource on the restart and start bootstrapping
  • This solution is not scalable to be used by Controllers who want to take multiple control actions on containers across several jobs, for example, auto-sizing controller
  • This method will not be work for building Canary / Cluster Balancer

Solution 2. Container Placement

...

API [Accepted]

API design

On the basis of types of Control actions, the commands are the following:       

...

API

placeContainer

Description

Active Container: Stops container process on source-host and starts it for 

  1. Stateless Job on either
    1. Destination-host (destination host can be source as well)
    2. Any host (destination-host = ANY_HOST)
  2. Stateful Job on either 
    1. Destination-host (if specified, destination host can be source as well)
    2. Standby Container (destination-host = STANDBY)
    3. Any host (destination-host = ANY_HOST)

StandBy Container: Stops container process on source-host and starts it on:

    1. Destination-host (if specified & matches StandBy Constraints)
    2. Any host (otherwise which matches StandBy Constraints)

Parameters

uuid: unique identifier of a request, populated by the client

applicationIddeploymentId: unique identifier of the deployed app for which the action is taken

processor-id: Samza resource id of container e.g 0, 1, 2 

destination-host: valid hostname / “ANY_HOST” / “STANDBY”

request-expiry-timeout: [optional]: timeout for any resource request to the cluster manager 

Status code

CREATED, BAD_REQUEST, ACCEPTED, IN_PROGRESS, SUCCEEDED, FAILED

Returns

Since this is an ASYNC API nothing is returned, UUID for the client to query the status of the request can be queried by processorId

Failure Scenarios

There are following cases under which a request to place container might fail:

  1. When an active container stop fails, in this case, we mark the request failed
  2. When requested resources cannot be obtained from the cluster manager, in this case, we mark the request failed
  3. When stopped active container fails to start on destination host in that case we mark the request failed and attempt to start on the source host, failure to do so results in starting the same on ANY_HOST

...

API

containerStatus

Description

Gives the status & info of the container placement request, for ex is it running, stopped what control commands are issued on it

Parameters

processor-id: Samza resource id of container e.g 0, 1, 2 

applicationIddeploymentId: unique identifier of the deployed app for which the action is taken

uuid: unique identifier of a request

Status code

BAD_REQUEST, SUCCEEDED

Returns

Status of the Container placement action 

...

API

controlStandBy

Description

Starts or Stops a standBy container for the active container

Parameters

processor-id: Samza resource id of container e.g 0, 1, 2 

applicationIddeploymentId: unique identifier of the deployed app for which the action is takenuuid: unique identifier of a request

Status code

CREATED, BAD_REQUEST, ACCEPTED, IN_PROGRESS, SUCCEEDED, FAILED

Architecture

Returns UUID for the client to query the status of the request

Architecture

For For implementing a scalable container placement control system, the proposed solution is divided into two parts:

...

Samza Metastore provides an API to write to the coordinator stream. One simple way to expose Container Placement API is, Container Placement handler can have a coordinator stream consumer polling control messages from the coordinator stream

under a separate namespace (different than the one where locality messages of containers live)  & acting on them.

Pros

Cons

  • No need to build Authentication & Authorization, already handled by the Metadata auth service
  • No need to enable Rate limiting since requests are queued so the flow of requests can be regulated at the consumer side
  • If AM dies there can be still queued requests in Coordinator Stream, such requests have to be handled across AM restarts
  • Coordinator stream is log compacted so control messages written to the coordinator stream need to be deleted to prevent it from growing to large sizes which can affect job start times

...

  1. ContainerPlacementHandler is a stateless handler dispatching ContainerPlacementRequestMessages from Metastore to Container Placement Service & ContainerPlacementResponseMessages from Container Placement Service to metastore for external controls to query the status of an action. (PR). 
  2. Metastore used today by in Samza by default is Kafka (coordinator stream) which is used to store configs & container mappings & is log compacted
  3. ContainerPlacementRequestMessage & ContainerPlacementResponseMessage are maintained in namespace using in same namespace using NamespaceAwareMetaStore ("samza-place-container-v1")

...

KeyValueField DescriptionField Type
"UUID.subType"uuidUnique identifier of a response messageRequired

processorId 

Logical processor id 0,1,2 of the containerRequired

deploymentIdUnique identifier for a deploymentRequired

subTypeType of message here: ContainerPlacementRequestMessageRequired

destinationHostDestination host where the container is desired to be movedRequired

statusCodeStatus of the requestRequired

timestampThe timestamp of the response messageRequired

requestExpiryEequest Request expiry which acts as a timeout for any resource request to cluster resource managerOptional

...

KeyValueField DescriptionField Type
"UUID.subType"uuidUnique identifier of a response messageRequired

processorId 

Logical processor id 0,1,2 of the containerRequired

deploymentIdUnique identifier for a deploymentRequired

subTypeType of message here: ContainerPlacementResponseMessageRequired

destinationHostDestination host where the container is desired to be movedRequired

statusCodeStatus of the responseRequired

responseMessageResponse message in conjunction to statusRequired

timestampThe timestamp of the response messageRequired

requestExpiryEequest Request expiry which acts as a timeout for any resource request to cluster resource managerOptional

...

  • Remove the HostAwareContainerAllocator & ContainerAllocator, simplify Container Allocator as a simple lightweight entity allocating requests to available resources (PR1, PR2)
  • Introduce ContainerManager which acts as a brain for validating and issuing any actions on containers in the Job Coordinator for both active & Standby containers. (PR)
    • Transfer state & validation of container launch & expired request handling from ContainerAllocator to ContainerManager
    • Transfer state & lifecycle management of Container allocator & resource request on job start (reading locality mapping) from ClusterResourceManager.CallBack(ContainerProcessManager) to ContainerManager*
  • Encapsulates logic and state related to container placement actions like move, restarts for active & standby container in ContainerManager (PR-1, TDB)
    • It is ContainerManager’s duty to validate any ContainerPlacementRequestMessages & also invalidate messages from the previous deployment incarnation
    • It is ContainerManager’s duty to write ContainerPlacementResponseMessages to Metastore for the external controller to query the status of the request
    • ContainerPlacementMetadata is a metadata holder for container actions (ControlActionMetadata) forex requestfor a request_id, current status, requested resources etc

...

  1. ContainerPlacementHandler dispatches the move request to ContainerProcessManager 
  2. ContainerProcessManager dispatches placement request to Containermanager which validates the action and issues a preferred resource request with ContainerAllocator (Async)
  3. Now there are two possible scenarios
    1. Requested Resources are allocated by ClusterResourceManager 
      1. In this case, ContainerManager initiates a container shutdown and waits for the ContainerProcessManager callback on successfully stopping the container
      2. On getting successful stop callback, ContainerManager issues a start container start request to ClusterResourceManage  on allocated resources 
      3. If the container start request succeeds then a success notification is sent (updating the Control Action Metadata) otherwise a new resource request to start this container on source host is issued & and a failure notification is sent to the user
  4. Resource Request expires
    1. In cases where Resource Request expires since ClusterResourceManager is not able to allocate requested resources a failure notification is sent and the active container remains unaffected


Who writes the new locality mapping after a successful move?

Samza container on a successful start write their new locality message to the metadata store (code), hence after a successful move container writes its new locality

Failure Scenarios:

  • If the preferred resources are not able to be accrued the acquired the active container is never stopped and a failure notification is sent for the ContainerPacementRequest
  • If the ContainerPlacementManager is not able to stop the active container (3.1 #1 above fails) in that  case the request is marked failed & a failure notification is sent for the ContainerPacementRequest
  • If ClusterResourceManager fails to start the stopped active container on the accrued destination host, then we attempt to start the container back on the container fallbacks to source host and a failure notification is are sent for the ContainerPacementRequest. If a container fails to start on source host then an attempt is made to start on ANY_HOST

...

Pros

Cons

  • The container at any point in time is only holding resources it needs to start on a host 
  • Weaker Restart semantics since there is a likely chance of restarting this container on any other host then the source host (since container is stopped & when Cluster Manager cannot return resources)is a likely chance of restarting this container on any other host then the source host (since container is stopped & when Cluster Manager cannot return resources)

Usage Example:


Code Block
languagebash
 place-container --deployment-id 1581635852024-5117e303 --app-name snjain-test-cp --app.id = 1 --processor-id 4 --request-expiry 10 --destination-host abc.prod.com


Code Block
@CommandLine.Command(name = "place-container", description = "Request to move/restart container at destination-host")
public class ContainerPlacementTool {
    ...
    _appName = // read from commandline
    _appId = // read from commandline
    _deploymentId = // read from commandline
    _processorId = // read from commandline
    _destinationHost = // read from commandline
    _requestExpiry = // read from commandline
    
    MetadataStore metadataStore = buildMetadataStore(_appName, _appId);
    try {
      ContainerPlacementMetadataStore containerPlacementMetadataStore =
          new ContainerPlacementMetadataStore(metadataStore);
      containerPlacementMetadataStore.start();
      Duration requestExpiry = _requestExpiry != null ? Duration.ofSeconds(_requestExpiry) : null;
      UUID uuid = containerPlacementMetadataStore.writeContainerPlacementRequestMessage(_deploymentId, _processorId,
          _destinationHost, _requestExpiry, System.currentTimeMillis());
      System.out.println("Request received query the status using: " + uuid);
    } finally {
      metadataStore.close();
   }
}


Public Interfaces


Code Block
languagejava
titleContainerPlacementMessage.java
linenumberstrue
/**
* Encapsulates the request or response payload information between the ContainerPlacementHandler service and external
* controllers issuing placement actions
*/
@InterfaceStability.Evolving
public abstract class ContainerPlacementMessage {

public enum StatusCode {
 /**
  * Indicates that the container placement action is created
  */
 CREATED,

 /**
  * Indicates that the container placement action was rejected because request was deemed invalid
  */
 BAD_REQUEST,

 /**
  * Indicates that the container placement action is accepted and waiting to be processed
  */
 ACCEPTED,

 /**
  * Indicates that the container placement action is in progress
  */
 IN_PROGRESS,

 /**
  * Indicates that the container placement action is in progress
  */
 SUCCEEDED,

 /**
  * Indicates that the container placement action is in failed
  */
 FAILED;
}

/**
* UUID attached to a message which helps in identifying duplicate request messages written to metastore and not
* retake actions even if metastore is eventually consistent
*/
protected final UUID uuid;
/**
* Unique identifier for a deployment so messages can be invalidated across a job restarts
* for ex yarn bases cluster manager should set this to app attempt id
*/
protected final String applicationIddeploymentId;
// Logical container Id 0, 1, 2
protected final String processorId;
// Destination host where container is desired to be moved
protected final String destinationHost;
// Optional request expiry which acts as a timeout for any resource request to cluster resource manager
protected final Duration requestExpiry;
// Status of the current request
protected final StatusCode statusCode;
// Timestamp of the request or response message
protected final long timestamp;

protected ContainerPlacementMessage(UUID uuid, String applicationIddeploymentId, String processorId, String destinationHost,
   Duration requestExpiry, StatusCode statusCode, long timestamp) {…}

}

...

Code Block
languagejava
titleContainerPlacementRequestMessage
linenumberstrue
/**
* Encapsulates the request sent from the external controller to the JobCoordinator to take a container placement action
*/
public class ContainerPlacementRequestMessage extends ContainerPlacementMessage {

public ContainerPlacementRequestMessage(UUID uuid, String applicationIddeploymentId, String processorId, String destinationHost, Duration requestExpiry, long timestamp) {...}

public ContainerPlacementRequestMessage(UUID uuid, String applicationIddeploymentId, String processorId, String destinationHost, long timestamp) {...}
}

...

Code Block
languagejava
titleContainerPlacementResponseMessage
linenumberstrue
/**
* Encapsulates the response sent from the JobCoordinator for a container placement action
*/
public class ContainerPlacementResponseMessage extends ContainerPlacementMessage {
 // Returned status of the request
 private String responseMessage;

 public ContainerPlacementResponseMessage(UUID uuid, String applicationIddeploymentId, String processorId, String destinationHost,
     Duration requestExpiry, StatusCode statusCode, String responseMessage, long timestamp) {...}

 public ContainerPlacementResponseMessage(UUID uuid, String applicationIddeploymentId, String processorId, String destinationHost,
     StatusCode statusCode, String responseMessage, long timestamp) {...}


Other Interfaces


Code Block
languagejava
titleContainerPlacementMetadataStore
linenumberstrue
/**
 * Entity managing read writes to the metastore for {@link org.apache.samza.container.placement.ContainerPlacementRequestMessage}
 * and {@link org.apache.samza.container.placement.ContainerPlacementResponseMessage}
 */
public class ContainerPlacementMetadataStore {

/**
 * Writes a {@link ContainerPlacementRequestMessage} to the underlying metastore. This method should be used by external controllers
 * to issue a request to JobCoordinator
 *
 * @param deploymentId identifier of the deployment
 * @param processorId logical id of the samza container 0,1,2
 * @param destinationHost host where the container is desired to move
 * @param requestExpiry optional per request expiry timeout for requests to cluster manager
 * @param timestamp timestamp of the request
 * @return uuid generated for the request
 */
public UUID writeContainerPlacementRequestMessage(String deploymentId, String processorId, String destinationHost,
      Duration requestExpiry, long timestamp) {...}

/**
 * Reads a {@link ContainerPlacementRequestMessage} from the underlying metastore
 * @param uuid uuid of the request
 * @return ContainerPlacementRequestMessage is its present
 */
public Optional<ContainerPlacementRequestMessage> readContainerPlacementRequestMessage(UUID uuid) {...}

/**
 * Reads a {@link ContainerPlacementResponseMessage} from the underlying metastore
 * @param uuid uuid of the request
 * @return ContainerPlacementResponseMessage is its present
 */
public Optional<ContainerPlacementResponseMessage> readContainerPlacementResponseMessage(UUID uuid) {..}

/**
 * Deletes a {@link ContainerPlacementRequestMessage} if present identified by the key {@code uuid}
 * @param uuid uuid of the request
 */
public void deleteContainerPlacementRequestMessage(UUID uuid) {..}

/**
 * Deletes a {@link ContainerPlacementResponseMessage} if present identified by the key {@code uuid}
 * @param uuid uuid of the request
 */
public void deleteContainerPlacementResponseMessage(UUID uuid) {..}

/**
 * Deletes both {@link ContainerPlacementRequestMessage} and {@link ContainerPlacementResponseMessage} identified by uuid
 * @param uuid uuid of request and response message
 */
public void deleteAllContainerPlacementMessages(UUID uuid) {...}

/**
 * Deletes all {@link ContainerPlacementMessage}
 * @param uuid uuid of the request or response message
 */
public void deleteAllContainerPlacementMessages(UUID uuid) {..}

/**
 * Writes a {@link ContainerPlacementResponseMessage} to the underlying metastore. 
 * This method should be used by Job Coordinator only to write responses to Container Placement Action
 * @param responseMessage response message
 */
void writeContainerPlacementResponseMessage(ContainerPlacementResponseMessage responseMessage) {..}

}

...