Status
Current state: ACCEPTED
Discussion thread: http://mail-archives.apache.org/mod_mbox/samza-dev/202001.mbox/browser
JIRA: SAMZA-2373
Released: Samza 1.5
Problem
Samza operates in multi-tenant environments with cluster managers such as Yarn and Mesos, where a single host can run multiple Samza containers. Because cluster managers like Yarn are often configured with soft limits and Samza has no notion of dynamic workload balancing, a host can end up underperforming, and it becomes desirable to move one or more containers from that host to other hosts. Today this is not possible without affecting other jobs on the hot host or manually restarting the affected job. In other use cases, such as resetting the checkpoints of a single container or supporting canary or rolling bounces, the ability to restart a single container or a subset of containers without restarting the whole job is highly desirable.
...
This approach proposes directly changing the container locality messages (container-to-host mapping messages) in the Metadata Store (currently Kafka) and then issuing a restart.
Pros | Cons |
|
|
Solution 2. Container Placement
...
API [Accepted]
API design
Based on the types of control actions, the commands are the following:
...
API | containerStatus |
Description | Gives the status and details of a container placement request, for example whether the container is running or stopped and which control commands have been issued on it |
Parameters | processor-id: Samza resource id of the container, e.g. 0, 1, 2; deploymentId: unique identifier of the deployed app for which the action is taken |
Status code | BAD_REQUEST, SUCCEEDED |
Returns | Status of the Container placement action |
...
- ContainerPlacementHandler dispatches the move request to ContainerProcessManager
- ContainerProcessManager dispatches the placement request to ContainerManager, which validates the action and issues a preferred resource request with ContainerAllocator (async)
- Now there are two possible scenarios
  - Requested resources are allocated by ClusterResourceManager
    - In this case, ContainerManager initiates a container shutdown and waits for the ContainerProcessManager callback on successfully stopping the container
    - On getting the successful stop callback, ContainerManager issues a container start request to ClusterResourceManager on the allocated resources
    - If the container start request succeeds, a success notification is sent (updating the Control Action Metadata); otherwise a new resource request to start this container on the source host is issued and a failure notification is sent to the user
  - Resource request expires
    - When the resource request expires because ClusterResourceManager is not able to allocate the requested resources, a failure notification is sent and the active container remains unaffected
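The move flow above can be read as a small decision procedure. The following is a minimal sketch of that reading, not the Samza implementation: `MoveFlowSketch`, `Outcome`, and the three boolean inputs are hypothetical names that stand in for the asynchronous callbacks from ClusterResourceManager and ContainerProcessManager.

```java
/**
 * Hypothetical sketch of the move flow; not Samza code. The boolean inputs
 * stand in for the asynchronous callbacks described in the text.
 */
final class MoveFlowSketch {

  /** Outcome of one simulated move request. */
  static final class Outcome {
    final String status;                  // "SUCCEEDED" or "FAILED"
    final String nextHost;                // host the container runs on, or is requested on next
    final boolean activeContainerStopped; // whether the running container was stopped

    Outcome(String status, String nextHost, boolean activeContainerStopped) {
      this.status = status;
      this.nextHost = nextHost;
      this.activeContainerStopped = activeContainerStopped;
    }
  }

  static Outcome move(String sourceHost, String destinationHost,
      boolean resourcesAllocated, boolean stopSucceeded, boolean startSucceeded) {
    if (!resourcesAllocated) {
      // Resource request expired: report failure, leave the active container alone.
      return new Outcome("FAILED", sourceHost, false);
    }
    if (!stopSucceeded) {
      // The stop callback reported a failure: the request is marked failed.
      return new Outcome("FAILED", sourceHost, false);
    }
    if (startSucceeded) {
      // Container started on the allocated resources: report success.
      return new Outcome("SUCCEEDED", destinationHost, true);
    }
    // Start on the destination failed: request the source host again, report failure.
    return new Outcome("FAILED", sourceHost, true);
  }

  public static void main(String[] args) {
    Outcome outcome = move("host-a", "host-b", true, true, true);
    System.out.println(outcome.status + " on " + outcome.nextHost);
  }
}
```

The point of the sketch is the ordering: the active container is only stopped after the destination resources have been allocated, so an expired resource request never disturbs a running container.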
Who writes the new locality mapping after a successful move?
On a successful start, each Samza container writes its new locality message to the metadata store (code); hence, after a successful move, the container writes its new locality.
Failure Scenarios:
- If the preferred resources cannot be acquired, the active container is never stopped and a failure notification is sent for the ContainerPlacementRequest
- If the ContainerPlacementManager is not able to stop the active container (3.1 #1 above fails), the request is marked failed and a failure notification is sent for the ContainerPlacementRequest
- If ClusterResourceManager fails to start the stopped active container on the acquired destination host, the container falls back to the source host and a failure notification is sent for the ContainerPlacementRequest. If the container fails to start on the source host, an attempt is made to start it on ANY_HOST
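The last failure scenario implies an ordered fallback: destination host, then source host, then ANY_HOST. A minimal sketch of that ordering follows. `FallbackStartSketch`, `startWithFallback`, and the `tryStart` predicate are hypothetical and only illustrate the order of attempts; the real start requests are asynchronous and go through ClusterResourceManager.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

/** Hypothetical sketch of the start fallback order; not Samza code. */
final class FallbackStartSketch {
  static final String ANY_HOST = "ANY_HOST";

  /**
   * Tries the destination host, then the source host, then ANY_HOST.
   * tryStart is an assumed stand-in for "start request succeeded on this host".
   * Returns the host the container started on, or empty if every attempt failed.
   */
  static Optional<String> startWithFallback(String destinationHost, String sourceHost,
      Predicate<String> tryStart) {
    List<String> attemptOrder = Arrays.asList(destinationHost, sourceHost, ANY_HOST);
    for (String host : attemptOrder) {
      if (tryStart.test(host)) {
        return Optional.of(host);
      }
    }
    return Optional.empty();
  }

  /** The placement request only succeeds if the container started on the destination. */
  static boolean placementFailed(String destinationHost, Optional<String> startedOn) {
    return !startedOn.isPresent() || !startedOn.get().equals(destinationHost);
  }

  public static void main(String[] args) {
    // Simulate a destination host that refuses the start request.
    Optional<String> startedOn = startWithFallback("host-b", "host-a", host -> !host.equals("host-b"));
    System.out.println("started on " + startedOn.orElse("nowhere")
        + ", placement failed: " + placementFailed("host-b", startedOn));
  }
}
```

Note that a fallback start still counts as a failed placement request: the container is running again, but not where the caller asked for it.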
...
Pros | Cons |
|
|
...
Usage Example:
Code Block |
---|
place-container --deployment-id 1581635852024-5117e303 --app-name snjain-test-cp --app.id = 1 --processor-id 4 --request-expiry 10 --destination-host abc.prod.com |
Code Block |
---|
@CommandLine.Command(name = "place-container", description = "Request to move/restart container at destination-host")
public class ContainerPlacementTool {
  ...
  _appName = // read from commandline
  _appId = // read from commandline
  _deploymentId = // read from commandline
  _processorId = // read from commandline
  _destinationHost = // read from commandline
  _requestExpiry = // read from commandline
  MetadataStore metadataStore = buildMetadataStore(_appName, _appId);
  try {
    ContainerPlacementMetadataStore containerPlacementMetadataStore =
        new ContainerPlacementMetadataStore(metadataStore);
    containerPlacementMetadataStore.start();
    // Convert the optional expiry (in seconds) to a Duration; null means no per-request expiry
    Duration requestExpiry = _requestExpiry != null ? Duration.ofSeconds(_requestExpiry) : null;
    // Pass the converted Duration, not the raw command-line value
    UUID uuid = containerPlacementMetadataStore.writeContainerPlacementRequestMessage(_deploymentId, _processorId,
        _destinationHost, requestExpiry, System.currentTimeMillis());
    System.out.println("Request received, query the status using: " + uuid);
  } finally {
    metadataStore.close();
  }
} |
Public Interfaces
Code Block |
---|
/**
 * Encapsulates the request or response payload information between the ContainerPlacementHandler service and external
 * controllers issuing placement actions
 */
@InterfaceStability.Evolving
public abstract class ContainerPlacementMessage {

  public enum StatusCode {
    /**
     * Indicates that the container placement action is created
     */
    CREATED,
    /**
     * Indicates that the container placement action was rejected because the request was deemed invalid
     */
    BAD_REQUEST,
    /**
     * Indicates that the container placement action is accepted and waiting to be processed
     */
    ACCEPTED,
    /**
     * Indicates that the container placement action is in progress
     */
    IN_PROGRESS,
    /**
     * Indicates that the container placement action has succeeded
     */
    SUCCEEDED,
    /**
     * Indicates that the container placement action has failed
     */
    FAILED;
  }

  /**
   * UUID attached to a message which helps in identifying duplicate request messages written to metastore and not
   * retake actions even if metastore is eventually consistent
   */
  protected final UUID uuid;
  /**
   * Unique identifier for a deployment so messages can be invalidated across job restarts,
   * for example a Yarn-based cluster manager should set this to the app attempt id
   */
  protected final String deploymentId;
  // Logical container Id 0, 1, 2
  protected final String processorId;
  // Destination host where container is desired to be moved
  protected final String destinationHost;
  // Optional request expiry which acts as a timeout for any resource request to cluster resource manager
  protected final Duration requestExpiry;
  // Status of the current request
  protected final StatusCode statusCode;
  // Timestamp of the request or response message
  protected final long timestamp;

  protected ContainerPlacementMessage(UUID uuid, String deploymentId, String processorId, String destinationHost,
      Duration requestExpiry, StatusCode statusCode, long timestamp) {…}
} |
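The `uuid` and `deploymentId` fields are described as guards against duplicate and stale messages. One way a consumer of these messages could apply them is sketched below. This is an assumption about usage, not the JobCoordinator's actual logic; `PlacementRequestFilterSketch` and `shouldAct` are hypothetical names.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

/** Hypothetical sketch of duplicate and stale request filtering; not Samza code. */
final class PlacementRequestFilterSketch {
  private final String currentDeploymentId;
  private final Set<UUID> seenRequests = new HashSet<>();

  PlacementRequestFilterSketch(String currentDeploymentId) {
    this.currentDeploymentId = currentDeploymentId;
  }

  /**
   * Returns true only the first time a request for the current deployment is seen.
   * Requests from another deployment are treated as stale and ignored.
   */
  boolean shouldAct(UUID uuid, String deploymentId) {
    if (!currentDeploymentId.equals(deploymentId)) {
      return false; // written for a previous deployment, e.g. before a job restart
    }
    return seenRequests.add(uuid); // Set.add returns false for a UUID already processed
  }

  public static void main(String[] args) {
    PlacementRequestFilterSketch filter = new PlacementRequestFilterSketch("deployment-1");
    UUID request = UUID.randomUUID();
    System.out.println(filter.shouldAct(request, "deployment-1")); // first delivery
    System.out.println(filter.shouldAct(request, "deployment-1")); // re-delivered duplicate
  }
}
```

Keying on the UUID rather than on the message contents means that a request re-read from an eventually consistent metastore is not acted on twice, while a new request for the same processor and host still is.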
...
Code Block |
---|
/**
 * Encapsulates the response sent from the JobCoordinator for a container placement action
 */
public class ContainerPlacementResponseMessage extends ContainerPlacementMessage {
  // Returned status of the request
  private String responseMessage;

  public ContainerPlacementResponseMessage(UUID uuid, String deploymentId, String processorId,
      String destinationHost, Duration requestExpiry, StatusCode statusCode, String responseMessage,
      long timestamp) {...}

  public ContainerPlacementResponseMessage(UUID uuid, String deploymentId, String processorId,
      String destinationHost, StatusCode statusCode, String responseMessage, long timestamp) {...}
} |
Other Interfaces
Code Block |
---|
/**
 * Entity managing reads and writes to the metastore for
 * {@link org.apache.samza.container.placement.ContainerPlacementRequestMessage}
 * and {@link org.apache.samza.container.placement.ContainerPlacementResponseMessage}
 */
public class ContainerPlacementMetadataStore {

  /**
   * Writes a {@link ContainerPlacementRequestMessage} to the underlying metastore. This method should be used by
   * external controllers to issue a request to JobCoordinator
   *
   * @param deploymentId identifier of the deployment
   * @param processorId logical id of the samza container 0,1,2
   * @param destinationHost host where the container is desired to move
   * @param requestExpiry optional per request expiry timeout for requests to cluster manager
   * @param timestamp timestamp of the request
   * @return uuid generated for the request
   */
  public UUID writeContainerPlacementRequestMessage(String deploymentId, String processorId,
      String destinationHost, Duration requestExpiry, long timestamp) {...}

  /**
   * Reads a {@link ContainerPlacementRequestMessage} from the underlying metastore
   * @param uuid uuid of the request
   * @return ContainerPlacementRequestMessage if it is present
   */
  public Optional<ContainerPlacementRequestMessage> readContainerPlacementRequestMessage(UUID uuid) {...}

  /**
   * Reads a {@link ContainerPlacementResponseMessage} from the underlying metastore
   * @param uuid uuid of the request
   * @return ContainerPlacementResponseMessage if it is present
   */
  public Optional<ContainerPlacementResponseMessage> readContainerPlacementResponseMessage(UUID uuid) {...}

  /**
   * Deletes a {@link ContainerPlacementRequestMessage} if present identified by the key {@code uuid}
   * @param uuid uuid of the request
   */
  public void deleteContainerPlacementRequestMessage(UUID uuid) {...}

  /**
   * Deletes a {@link ContainerPlacementResponseMessage} if present identified by the key {@code uuid}
   * @param uuid uuid of the request
   */
  public void deleteContainerPlacementResponseMessage(UUID uuid) {...}

  /**
   * Deletes both {@link ContainerPlacementRequestMessage} and {@link ContainerPlacementResponseMessage}
   * identified by uuid
   * @param uuid uuid of request and response message
   */
  public void deleteAllContainerPlacementMessages(UUID uuid) {...}

  /**
   * Deletes all {@link ContainerPlacementMessage}
   */
  public void deleteAllContainerPlacementMessages() {...}

  /**
   * Writes a {@link ContainerPlacementResponseMessage} to the underlying metastore.
   * This method should be used by Job Coordinator only to write responses to Container Placement Action
   * @param responseMessage response message
   */
  void writeContainerPlacementResponseMessage(ContainerPlacementResponseMessage responseMessage) {...}
} |
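To make the request and response lifecycle of this store concrete, the sketch below models the same contract with two in-memory maps keyed by UUID. It is a hypothetical stand-in for illustration only: `InMemoryPlacementStoreSketch` and its method names are invented, messages are reduced to plain strings, and nothing here touches the real metastore.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

/** Hypothetical in-memory model of the request/response contract; not Samza code. */
final class InMemoryPlacementStoreSketch {
  private final Map<UUID, String> requests = new HashMap<>();
  private final Map<UUID, String> responses = new HashMap<>();

  /** Controller side: records a request and returns the UUID used to track it. */
  UUID writeRequest(String processorId, String destinationHost) {
    UUID uuid = UUID.randomUUID();
    requests.put(uuid, processorId + " -> " + destinationHost);
    return uuid;
  }

  /** Job coordinator side: records the latest status for a request. */
  void writeResponse(UUID uuid, String statusCode) {
    responses.put(uuid, statusCode);
  }

  Optional<String> readRequest(UUID uuid) {
    return Optional.ofNullable(requests.get(uuid));
  }

  /** Empty until the coordinator has written a response for this UUID. */
  Optional<String> readResponse(UUID uuid) {
    return Optional.ofNullable(responses.get(uuid));
  }

  /** Removes both the request and the response for this UUID. */
  void deleteAll(UUID uuid) {
    requests.remove(uuid);
    responses.remove(uuid);
  }

  public static void main(String[] args) {
    InMemoryPlacementStoreSketch store = new InMemoryPlacementStoreSketch();
    UUID uuid = store.writeRequest("4", "abc.prod.com");
    System.out.println("before response: " + store.readResponse(uuid).orElse("no response yet"));
    store.writeResponse(uuid, "SUCCEEDED");
    System.out.println("after response: " + store.readResponse(uuid).orElse("no response yet"));
  }
}
```

In this model a caller tracks a request by holding on to the returned UUID and reading the response until one is present, which mirrors the `Optional` return types of the read methods above.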
...