...

This approach proposes directly changing the container locality messages (container-to-host mapping messages) in the Metadata Store (currently Kafka) and then issuing a restart.

Pros

  • Simple to implement: the current tool already does this for host-affinity-enabled jobs (since they maintain a locality mapping)

Cons

  • Needs a job restart and makes only a best-effort attempt to get the preferred hosts for containers, with no guarantee of getting them
  • If a job has standby containers enabled, this method requires changing the standby mappings in addition to the active container mappings
  • The whole job faces downtime even when it has hundreds of containers and only one of them needs to be restarted; if the job is stateful, containers may not get the requested resources on restart and may start bootstrapping
  • This solution does not scale to controllers that want to take multiple control actions on containers across several jobs, for example, an auto-sizing controller
  • This method will not work for building a Canary / Cluster Balancer

Solution 2. Container Placement

...

Service [Accepted]

API design

Based on the types of control actions, the commands are as follows:

...

  1. ContainerPlacementHandler dispatches the move request to ContainerProcessManager
  2. ContainerProcessManager dispatches the placement request to ContainerManager, which validates the action and issues a preferred resource request with ContainerAllocator (async)
  3. There are now two possible scenarios:
    1. The requested resources are allocated by ClusterResourceManager
      1. In this case, ContainerManager initiates a container shutdown and waits for the ContainerProcessManager callback confirming that the container has stopped
      2. On receiving a successful stop callback, ContainerManager issues a container start request to ClusterResourceManager on the allocated resources
      3. If the container start request succeeds, a success notification is sent (updating the Control Action Metadata); otherwise, a new resource request to start this container on the source host is issued and a failure notification is sent to the user
    2. The resource request expires
      1. If the resource request expires because ClusterResourceManager is not able to allocate the requested resources, a failure notification is sent and the active container remains unaffected
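The control flow above can be sketched in a few lines. This is a hypothetical, synchronous model, not Samza's ContainerManager: `ClusterManagerStub`, `MoveOutcome` and `ContainerMoveFlow` are illustrative names, and the asynchronous callbacks are collapsed into return values. The property it shows is the ordering: resources are secured on the destination host before the running container is stopped.

```java
import java.util.Optional;

/**
 * Hypothetical, synchronous stand-in for ClusterResourceManager/ContainerAllocator.
 * The real interactions are asynchronous callbacks; booleans keep the sketch short.
 */
interface ClusterManagerStub {
  /** Returns the allocated host, or empty if the preferred resource request expired. */
  Optional<String> requestResource(String preferredHost);

  /** Stops the active container; true once the stop callback reports success. */
  boolean stopContainer(String processorId);

  /** Starts the container on the given host; true on success. */
  boolean startContainer(String processorId, String host);
}

enum MoveOutcome { SUCCEEDED, FAILED_REQUEST_EXPIRED, FAILED_STOP, FAILED_START_FELL_BACK_TO_SOURCE }

final class ContainerMoveFlow {
  static MoveOutcome move(ClusterManagerStub clusterManager, String processorId, String sourceHost,
      String destinationHost) {
    // Ask for resources on the destination host before touching the running container
    Optional<String> allocated = clusterManager.requestResource(destinationHost);
    if (!allocated.isPresent()) {
      // Request expired: the active container is left running and the request fails
      return MoveOutcome.FAILED_REQUEST_EXPIRED;
    }
    // Resources allocated: stop the active container and wait for the stop callback
    if (!clusterManager.stopContainer(processorId)) {
      return MoveOutcome.FAILED_STOP;
    }
    // Start the container on the allocated resources
    if (clusterManager.startContainer(processorId, allocated.get())) {
      return MoveOutcome.SUCCEEDED;
    }
    // Start failed: fall back to the source host (simplified; the described flow issues a new
    // resource request for the source host) and report the move as failed
    clusterManager.startContainer(processorId, sourceHost);
    return MoveOutcome.FAILED_START_FELL_BACK_TO_SOURCE;
  }
}
```

Because the stop happens only after allocation, an expired request never costs the job any downtime for that container.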


Who writes the new locality mapping after a successful move?

On a successful start, a Samza container writes its new locality message to the metadata store (code); hence, after a successful move, the container itself writes its new locality mapping.
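A minimal sketch of that ownership, assuming an in-memory map in place of the metadata store: `LocalityStore` and `ContainerStub` are hypothetical names, not Samza classes. The placement service never writes the mapping; the container overwrites its own entry once it has started.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for the container-to-host locality mapping in the metadata store. */
final class LocalityStore {
  private final Map<String, String> processorToHost = new HashMap<>();

  void writeLocality(String processorId, String host) {
    processorToHost.put(processorId, host);
  }

  String readLocality(String processorId) {
    return processorToHost.get(processorId);
  }
}

/** Hypothetical container stub: it records its own locality only after a successful start. */
final class ContainerStub {
  private final String processorId;
  private final LocalityStore localityStore;

  ContainerStub(String processorId, LocalityStore localityStore) {
    this.processorId = processorId;
    this.localityStore = localityStore;
  }

  /** Invoked once the container has started successfully on {@code host}. */
  void onSuccessfulStart(String host) {
    // The container, not the placement service, overwrites its previous host mapping
    localityStore.writeLocality(processorId, host);
  }
}
```

In this model, a move that never reaches a successful start leaves the previous mapping untouched.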

Failure Scenarios:

  • If the preferred resources cannot be acquired, the active container is never stopped and a failure notification is sent for the ContainerPlacementRequest
  • If the ContainerPlacementManager is not able to stop the active container (3.1 #1 above fails), the request is marked failed and a failure notification is sent for the ContainerPlacementRequest
  • If ClusterResourceManager fails to start the stopped active container on the acquired destination host, the container falls back to the source host and a failure notification is sent for the ContainerPlacementRequest. If the container also fails to start on the source host, an attempt is made to start it on ANY_HOST
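The start fallback chain in the last scenario can be modeled as an ordered list of candidate hosts. This is a sketch under assumptions: `StartFallback`, the `tryStart` callback and the literal `"ANY_HOST"` placeholder are illustrative, not Samza's API. The move counts as successful only when the container ends up on the destination host; every other outcome produces a failure notification.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

final class StartFallback {
  /** Placeholder meaning "any host the cluster manager chooses" (hypothetical value). */
  static final String ANY_HOST = "ANY_HOST";

  /**
   * Tries the destination host, then the source host, then ANY_HOST, in that order.
   * tryStart is a hypothetical callback that attempts a start and reports success.
   * Returns the host the container started on, or empty if every attempt failed.
   */
  static Optional<String> startWithFallback(String destinationHost, String sourceHost, Predicate<String> tryStart) {
    List<String> candidates = Arrays.asList(destinationHost, sourceHost, ANY_HOST);
    for (String host : candidates) {
      if (tryStart.test(host)) {
        return Optional.of(host);
      }
    }
    return Optional.empty();
  }

  /** The placement request succeeds only if the container started on the destination host. */
  static boolean moveSucceeded(String destinationHost, Optional<String> startedOn) {
    return startedOn.map(destinationHost::equals).orElse(false);
  }
}
```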

...

Pros

  • At any point in time, the container holds only the resources it needs to start on a host

Cons

  • Weaker restart semantics: once the container is stopped, if the cluster manager cannot return resources on the source host, there is a real chance the container restarts on a host other than the source host

...

Usage Example:


Code Block
 place-container --deployment-id 1581635852024-5117e303 --app-name snjain-test-cp --processor-id 4 --request-expiry 10 --destination-host ltx1-app1536.stg.linkedin.com 


Code Block
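import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.Callable;
import org.apache.samza.metadatastore.MetadataStore;
import picocli.CommandLine;

@CommandLine.Command(name = "place-container", description = "Request to move/restart container at destination-host")
public class ContainerPlacementTool implements Callable<Integer> {
  @CommandLine.Option(names = "--deployment-id", required = true, description = "Deployment id of the running job")
  private String _deploymentId;
  @CommandLine.Option(names = "--app-name", required = true, description = "Name of the job")
  private String _appName;
  @CommandLine.Option(names = "--processor-id", required = true, description = "Logical container id, e.g. 0, 1, 2")
  private String _processorId;
  @CommandLine.Option(names = "--destination-host", required = true, description = "Host to move the container to")
  private String _destinationHost;
  @CommandLine.Option(names = "--request-expiry", description = "Optional resource request expiry in seconds")
  private Integer _requestExpiry;
  ...

  @Override
  public Integer call() {
    // buildMetadataStore() is elided above; it is expected to build the store for the job named by _appName
    MetadataStore metadataStore = buildMetadataStore();
    try {
      ContainerPlacementMetadataStore containerPlacementMetadataStore =
          new ContainerPlacementMetadataStore(metadataStore);
      containerPlacementMetadataStore.start();
      Duration requestExpiry = _requestExpiry != null ? Duration.ofSeconds(_requestExpiry) : null;
      // The JobCoordinator picks up this request and later writes a response under the same uuid
      UUID uuid = containerPlacementMetadataStore.writeContainerPlacementRequestMessage(_deploymentId, _processorId,
          _destinationHost, requestExpiry, System.currentTimeMillis());
      System.out.println("Request received; query its status using uuid: " + uuid);
      return 0;
    } finally {
      metadataStore.close();
    }
  }
}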


Public Interfaces


Code Block
ContainerPlacementMessage.java
import java.time.Duration;
import java.util.UUID;

/**
 * Encapsulates the request or response payload information between the ContainerPlacementHandler service and external
 * controllers issuing placement actions
 */
public abstract class ContainerPlacementMessage {

  public enum StatusCode {
    /**
     * Indicates that the container placement action is created
     */
    CREATED,

    /**
     * Indicates that the container placement action was rejected because the request was deemed invalid
     */
    BAD_REQUEST,

    /**
     * Indicates that the container placement action is accepted and waiting to be processed
     */
    ACCEPTED,

    /**
     * Indicates that the container placement action is in progress
     */
    IN_PROGRESS,

    /**
     * Indicates that the container placement action has succeeded
     */
    SUCCEEDED,

    /**
     * Indicates that the container placement action has failed
     */
    FAILED
  }

  /**
   * UUID attached to a message, used to identify duplicate request messages written to the metastore so that an
   * action is not taken again, even if the metastore is eventually consistent
   */
  protected final UUID uuid;
  /**
   * Unique identifier for a deployment so that messages can be invalidated across job restarts;
   * for example, a YARN-based cluster manager should set this to the app attempt id
   */
  protected final String deploymentId;
  // Logical container id, e.g. 0, 1, 2
  protected final String processorId;
  // Destination host where the container is desired to be moved
  protected final String destinationHost;
  // Optional request expiry that acts as a timeout for any resource request to the cluster resource manager
  protected final Duration requestExpiry;
  // Status of the current request
  protected final StatusCode statusCode;
  // Timestamp of the request or response message
  protected final long timestamp;

  protected ContainerPlacementMessage(UUID uuid, String deploymentId, String processorId, String destinationHost,
      Duration requestExpiry, StatusCode statusCode, long timestamp) {…}

}
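The `uuid` and `deploymentId` fields above exist so that a handler can skip duplicate and stale requests. A minimal sketch of that filtering, assuming the handler keeps seen uuids in memory for the lifetime of one deployment; `PlacementRequestFilter` is a hypothetical name, not Samza's ContainerPlacementHandler.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

/** Hypothetical request filter: drops stale-deployment and duplicate placement requests. */
final class PlacementRequestFilter {
  private final String currentDeploymentId;
  // In-memory only: a new deployment starts with an empty set, and older requests are rejected by deploymentId
  private final Set<UUID> seenRequests = new HashSet<>();

  PlacementRequestFilter(String currentDeploymentId) {
    this.currentDeploymentId = currentDeploymentId;
  }

  /** Returns true only for requests that belong to this deployment and have not been acted on before. */
  boolean shouldProcess(UUID uuid, String deploymentId) {
    if (!currentDeploymentId.equals(deploymentId)) {
      // Written against an earlier deployment (e.g. an older YARN app attempt): ignore it
      return false;
    }
    // Set.add returns false if the uuid is already present, so a re-read duplicate is skipped
    return seenRequests.add(uuid);
  }
}
```

Keying deduplication on the request uuid rather than on (processorId, destinationHost) lets a controller deliberately issue the same move twice while still tolerating the same message being read twice from an eventually consistent store.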

...

Code Block
ContainerPlacementResponseMessage
/**
 * Encapsulates the response sent from the JobCoordinator for a container placement action
 */
public class ContainerPlacementResponseMessage extends ContainerPlacementMessage {
  // Message returned with the response, describing the status of the request
  private String responseMessage;

  public ContainerPlacementResponseMessage(UUID uuid, String deploymentId, String processorId, String destinationHost,
      Duration requestExpiry, StatusCode statusCode, String responseMessage, long timestamp) {...}

  public ContainerPlacementResponseMessage(UUID uuid, String deploymentId, String processorId, String destinationHost,
      StatusCode statusCode, String responseMessage, long timestamp) {...}
}

Other Interfaces


Code Block
ContainerPlacementMetadataStore
/**
 * Entity managing reads and writes to the metastore for {@link org.apache.samza.container.placement.ContainerPlacementRequestMessage}
 * and {@link org.apache.samza.container.placement.ContainerPlacementResponseMessage}
 */
public class ContainerPlacementMetadataStore {

  /**
   * Writes a {@link ContainerPlacementRequestMessage} to the underlying metastore. This method should be used by
   * external controllers to issue a request to the JobCoordinator
   *
   * @param deploymentId identifier of the deployment
   * @param processorId logical id of the Samza container, e.g. 0, 1, 2
   * @param destinationHost host where the container is desired to move
   * @param requestExpiry optional per-request expiry timeout for requests to the cluster manager
   * @param timestamp timestamp of the request
   * @return uuid generated for the request
   */
  public UUID writeContainerPlacementRequestMessage(String deploymentId, String processorId, String destinationHost,
      Duration requestExpiry, long timestamp) {...}

  /**
   * Reads a {@link ContainerPlacementRequestMessage} from the underlying metastore
   * @param uuid uuid of the request
   * @return the ContainerPlacementRequestMessage, if present
   */
  public Optional<ContainerPlacementRequestMessage> readContainerPlacementRequestMessage(UUID uuid) {...}

  /**
   * Reads a {@link ContainerPlacementResponseMessage} from the underlying metastore
   * @param uuid uuid of the request
   * @return the ContainerPlacementResponseMessage, if present
   */
  public Optional<ContainerPlacementResponseMessage> readContainerPlacementResponseMessage(UUID uuid) {...}

  /**
   * Deletes the {@link ContainerPlacementRequestMessage} identified by the key {@code uuid}, if present
   * @param uuid uuid of the request
   */
  public void deleteContainerPlacementRequestMessage(UUID uuid) {...}

  /**
   * Deletes the {@link ContainerPlacementResponseMessage} identified by the key {@code uuid}, if present
   * @param uuid uuid of the request
   */
  public void deleteContainerPlacementResponseMessage(UUID uuid) {...}

  /**
   * Deletes both the {@link ContainerPlacementRequestMessage} and the {@link ContainerPlacementResponseMessage}
   * identified by uuid
   * @param uuid uuid of the request and response message
   */
  public void deleteAllContainerPlacementMessages(UUID uuid) {...}

  /**
   * Deletes all {@link ContainerPlacementMessage}s from the underlying metastore
   */
  public void deleteAllContainerPlacementMessages() {...}

  /**
   * Writes a {@link ContainerPlacementResponseMessage} to the underlying metastore.
   * This method should be used only by the JobCoordinator to write responses to container placement actions
   * @param responseMessage response message
   */
  void writeContainerPlacementResponseMessage(ContainerPlacementResponseMessage responseMessage) {...}

}
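An external controller that has written a request can poll `readContainerPlacementResponseMessage(uuid)` until the JobCoordinator reports a final status. The sketch below assumes that SUCCEEDED, FAILED and BAD_REQUEST are terminal (the enum's documentation implies this but does not state it). `PlacementStatus` mirrors `ContainerPlacementMessage.StatusCode` so the block compiles on its own, and the `readStatus` function is a hypothetical adapter around the metadata store read.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Function;

/** Mirror of ContainerPlacementMessage.StatusCode, redeclared so this sketch compiles on its own. */
enum PlacementStatus { CREATED, BAD_REQUEST, ACCEPTED, IN_PROGRESS, SUCCEEDED, FAILED }

final class PlacementStatusPoller {
  /** Assumption: no further response updates are expected after these statuses. */
  static boolean isTerminal(PlacementStatus status) {
    return status == PlacementStatus.SUCCEEDED || status == PlacementStatus.FAILED
        || status == PlacementStatus.BAD_REQUEST;
  }

  /**
   * Calls readStatus until it returns a terminal status, maxAttempts is exhausted, or the thread is
   * interrupted. Returns the result of the last read (empty if no response had been written).
   */
  static Optional<PlacementStatus> awaitTerminal(UUID uuid, Function<UUID, Optional<PlacementStatus>> readStatus,
      int maxAttempts, Duration interval) {
    Optional<PlacementStatus> last = Optional.empty();
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      last = readStatus.apply(uuid);
      if (last.isPresent() && isTerminal(last.get())) {
        return last;
      }
      try {
        Thread.sleep(interval.toMillis());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        return last;
      }
    }
    return last;
  }
}
```

Bounding the loop with `maxAttempts` keeps a controller from waiting forever if the JobCoordinator never writes a response. A real caller would map each `ContainerPlacementResponseMessage` to its status code; the accessor for `statusCode` is not part of the interfaces listed above.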

...