...
This approach proposes directly changing the container locality messages (container-to-host mapping messages) in the Metadata Store (currently Kafka) and then issuing a restart.
Pros | Cons |
---|---|
Solution 2. Container Placement
...
Service [Accepted]
API design
Based on the types of control actions, the commands are as follows:
...
- ContainerPlacementHandler dispatches the move request to ContainerProcessManager
- ContainerProcessManager dispatches the placement request to ContainerManager, which validates the action and issues a preferred resource request with ContainerAllocator (asynchronously)
- There are now two possible scenarios:
  - Requested resources are allocated by ClusterResourceManager
    - ContainerManager initiates a container shutdown and waits for the ContainerProcessManager callback confirming that the container stopped successfully
    - On receiving the successful stop callback, ContainerManager issues a container start request to ClusterResourceManager on the allocated resources
    - If the container start request succeeds, a success notification is sent (updating the Control Action Metadata); otherwise, a new resource request to start this container on the source host is issued and a failure notification is sent to the user
  - Resource request expires
    - If the resource request expires because ClusterResourceManager cannot allocate the requested resources, a failure notification is sent and the active container remains unaffected
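The two scenarios above can be summarized as a small decision function. This is a minimal, hypothetical sketch: `MoveFlowSketch`, `ResourceOutcome` and `Action` are illustrative names, not Samza classes, and the asynchronous callbacks are flattened into plain inputs. It also assumes the stop callback reports success, since stop failures are handled separately.

```java
import java.util.ArrayList;
import java.util.List;

final class MoveFlowSketch {
  // Hypothetical outcome of the asynchronous preferred resource request
  enum ResourceOutcome { ALLOCATED, EXPIRED }

  // Hypothetical actions taken by ContainerManager, in the order they happen
  enum Action { STOP_ACTIVE_CONTAINER, START_ON_DESTINATION, REQUEST_RESOURCES_ON_SOURCE_HOST, NOTIFY_SUCCEEDED, NOTIFY_FAILED }

  private MoveFlowSketch() {}

  /** Returns the ordered actions for one move request (assumes the stop callback reports success). */
  static List<Action> handle(ResourceOutcome outcome, boolean startOnDestinationSucceeded) {
    List<Action> actions = new ArrayList<>();
    if (outcome == ResourceOutcome.EXPIRED) {
      // Scenario 2: nothing was stopped, so the active container keeps running
      actions.add(Action.NOTIFY_FAILED);
      return actions;
    }
    // Scenario 1: stop the active container first, then start it on the allocated resources
    actions.add(Action.STOP_ACTIVE_CONTAINER);
    actions.add(Action.START_ON_DESTINATION);
    if (startOnDestinationSucceeded) {
      actions.add(Action.NOTIFY_SUCCEEDED);
    } else {
      // Fall back to the source host and report the failed move
      actions.add(Action.REQUEST_RESOURCES_ON_SOURCE_HOST);
      actions.add(Action.NOTIFY_FAILED);
    }
    return actions;
  }
}
```

Modeling the flow as an ordered list of actions makes the key invariant easy to check: the active container is only stopped after resources have actually been allocated.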
Who writes the new locality mapping after a successful move?
On a successful start, a Samza container writes its new locality message to the metadata store (code); hence, after a successful move, the container itself writes its new locality mapping.
Failure Scenarios:
- If the preferred resources cannot be acquired, the active container is never stopped and a failure notification is sent for the ContainerPlacementRequest
- If the ContainerPlacementManager is not able to stop the active container (3.1 #1 above fails), the request is marked as failed and a failure notification is sent for the ContainerPlacementRequest
- If ClusterResourceManager fails to start the stopped active container on the acquired destination host, the container falls back to the source host and a failure notification is sent for the ContainerPlacementRequest. If the container also fails to start on the source host, an attempt is made to start it on ANY_HOST
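The fallback order in the last scenario (destination host, then source host, then ANY_HOST) can be sketched as an ordered list of start attempts made after the active container has been stopped. This is a hypothetical, synchronous simplification: the `tryStart` predicate stands in for the real asynchronous ClusterResourceManager interaction, and the `ANY_HOST` string constant and `PlacementFallbackSketch` class are assumptions for illustration.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

final class PlacementFallbackSketch {
  // Stand-in for "let the cluster manager pick any host"
  static final String ANY_HOST = "ANY_HOST";

  /** Where the container ended up (if anywhere) and whether the placement request itself succeeded. */
  static final class Result {
    final Optional<String> runningOn;
    final boolean requestSucceeded;

    Result(Optional<String> runningOn, boolean requestSucceeded) {
      this.runningOn = runningOn;
      this.requestSucceeded = requestSucceeded;
    }
  }

  private PlacementFallbackSketch() {}

  /** Tries the destination host, then the source host, then ANY_HOST, stopping at the first successful start. */
  static Result startWithFallback(String destinationHost, String sourceHost, Predicate<String> tryStart) {
    List<String> order = List.of(destinationHost, sourceHost, ANY_HOST);
    for (int i = 0; i < order.size(); i++) {
      String host = order.get(i);
      if (tryStart.test(host)) {
        // Only a start on the destination host (the first attempt) counts as a successful placement;
        // any fallback keeps the container running but still yields a failure notification
        return new Result(Optional.of(host), i == 0);
      }
    }
    // The container could not be started anywhere
    return new Result(Optional.empty(), false);
  }
}
```

Comparing by attempt index rather than by host name keeps the result correct even when the destination and source hosts are the same (a restart in place).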
...
Pros | Cons |
---|---|
...
Usage Example:
```shell
place-container --deployment-id 1581635852024-5117e303 --app-name snjain-test-cp --processor-id 4 --request-expiry 10 --destination-host ltx1-app1536.stg.linkedin.com
```
```java
@CommandLine.Command(name = "place-container", description = "Request to move/restart container at destination-host")
public class ContainerPlacementTool {
  ...
  MetadataStore metadataStore = buildMetadataStore();
  // _deploymentId, _processorId, _destinationHost and _requestExpiry (seconds, optional) are read from the command line
  try {
    ContainerPlacementMetadataStore containerPlacementMetadataStore =
        new ContainerPlacementMetadataStore(metadataStore);
    containerPlacementMetadataStore.start();
    // A missing --request-expiry flag is passed on as a null (absent) expiry
    Duration requestExpiry = _requestExpiry != null ? Duration.ofSeconds(_requestExpiry) : null;
    // Writes the request for the JobCoordinator; its response is looked up later by this uuid
    UUID uuid = containerPlacementMetadataStore.writeContainerPlacementRequestMessage(_deploymentId, _processorId,
        _destinationHost, requestExpiry, System.currentTimeMillis());
    System.out.println("Request received; query the status using: " + uuid);
  } finally {
    samzaJob.close();
    metadataStore.close();
  }
}
```
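The tool prints the request UUID and returns immediately, so a caller has to poll for the outcome. Below is a minimal sketch of such a poll loop. It assumes a `readStatus` supplier that would wrap `readContainerPlacementResponseMessage(uuid)` and extract the status code (the accessor for that field is not shown in this document), and it treats BAD_REQUEST, SUCCEEDED and FAILED as terminal, which is an assumption based on the StatusCode descriptions. `PlacementStatusPoller` is a hypothetical name.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.function.Supplier;

final class PlacementStatusPoller {
  // Mirrors ContainerPlacementMessage.StatusCode so that this sketch compiles on its own
  enum StatusCode { CREATED, BAD_REQUEST, ACCEPTED, IN_PROGRESS, SUCCEEDED, FAILED }

  private PlacementStatusPoller() {}

  // Assumption: a rejected, succeeded or failed request will not change status again
  static boolean isTerminal(StatusCode status) {
    return status == StatusCode.BAD_REQUEST || status == StatusCode.SUCCEEDED || status == StatusCode.FAILED;
  }

  /**
   * Calls readStatus up to maxAttempts times, sleeping for interval between calls.
   * An empty Optional from readStatus means no response has been written yet.
   * Returns the first terminal status seen, or Optional.empty() if none was observed.
   */
  static Optional<StatusCode> awaitTerminal(Supplier<Optional<StatusCode>> readStatus, int maxAttempts,
      Duration interval) {
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      Optional<StatusCode> status = readStatus.get();
      if (status.isPresent() && isTerminal(status.get())) {
        return status;
      }
      if (attempt + 1 < maxAttempts) {
        try {
          Thread.sleep(interval.toMillis());
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // keep the interrupt flag and stop polling
          return Optional.empty();
        }
      }
    }
    return Optional.empty();
  }
}
```

Bounding the loop by `maxAttempts` keeps a CLI caller from hanging forever if no response is ever written, for example when the request targets a stale deployment id.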
Public Interfaces
```java
import java.time.Duration;
import java.util.UUID;

/**
 * Encapsulates the request or response payload information between the ContainerPlacementHandler service and external
 * controllers issuing placement actions
 */
public abstract class ContainerPlacementMessage {

  public enum StatusCode {
    /** Indicates that the container placement action is created */
    CREATED,
    /** Indicates that the container placement action was rejected because the request was deemed invalid */
    BAD_REQUEST,
    /** Indicates that the container placement action is accepted and waiting to be processed */
    ACCEPTED,
    /** Indicates that the container placement action is in progress */
    IN_PROGRESS,
    /** Indicates that the container placement action succeeded */
    SUCCEEDED,
    /** Indicates that the container placement action failed */
    FAILED;
  }

  /**
   * UUID attached to a message, which helps identify duplicate request messages written to the metastore so that
   * actions are not retaken even if the metastore is eventually consistent
   */
  protected final UUID uuid;
  /**
   * Unique identifier for a deployment, so that messages can be invalidated across job restarts;
   * for example, a YARN-based cluster manager should set this to the app attempt id
   */
  protected final String deploymentId;
  // Logical container id 0, 1, 2
  protected final String processorId;
  // Destination host where the container is desired to be moved
  protected final String destinationHost;
  // Optional request expiry, which acts as a timeout for any resource request to the cluster resource manager
  protected final Duration requestExpiry;
  // Status of the current request
  protected final StatusCode statusCode;
  // Timestamp of the request or response message
  protected final long timestamp;

  protected ContainerPlacementMessage(UUID uuid, String deploymentId, String processorId, String destinationHost,
      Duration requestExpiry, StatusCode statusCode, long timestamp) {…}
}
```
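The `uuid` and `deploymentId` fields carry two guarantees described in their comments: a duplicate request is not acted on twice, and requests written for an earlier deployment are ignored after a restart. A hypothetical handler-side filter illustrating both checks might look like the following. `PlacementRequestFilter` is an illustrative name, and keeping processed UUIDs in memory is a simplification that assumes a JobCoordinator restart always comes with a new deployment id (as suggested for the YARN app attempt id), which invalidates older requests anyway.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;

final class PlacementRequestFilter {
  private final String currentDeploymentId;
  // UUIDs already acted on during this deployment (in memory only, see assumption above)
  private final Set<UUID> processed = new HashSet<>();

  PlacementRequestFilter(String currentDeploymentId) {
    this.currentDeploymentId = Objects.requireNonNull(currentDeploymentId);
  }

  /** Returns true only for a request that belongs to this deployment and has not been acted on yet. */
  boolean shouldProcess(UUID uuid, String deploymentId) {
    if (!currentDeploymentId.equals(deploymentId)) {
      return false; // written for an earlier deployment (e.g. a previous app attempt): ignore it
    }
    return processed.add(uuid); // Set.add returns false when the same request is read again
  }
}
```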
...
```java
import java.time.Duration;
import java.util.UUID;

/**
 * Encapsulates the response sent from the JobCoordinator for a container placement action
 */
public class ContainerPlacementResponseMessage extends ContainerPlacementMessage {
  // Returned status message of the request
  private String responseMessage;

  public ContainerPlacementResponseMessage(UUID uuid, String deploymentId, String processorId, String destinationHost,
      Duration requestExpiry, StatusCode statusCode, String responseMessage, long timestamp) {...}

  public ContainerPlacementResponseMessage(UUID uuid, String deploymentId, String processorId, String destinationHost,
      StatusCode statusCode, String responseMessage, long timestamp) {...}
}
```
Other Interfaces
```java
import java.time.Duration;
import java.util.Optional;
import java.util.UUID;

/**
 * Entity managing reads and writes to the metastore for
 * {@link org.apache.samza.container.placement.ContainerPlacementRequestMessage}
 * and {@link org.apache.samza.container.placement.ContainerPlacementResponseMessage}
 */
public class ContainerPlacementMetadataStore {

  /**
   * Writes a {@link ContainerPlacementRequestMessage} to the underlying metastore. This method should be used by
   * external controllers to issue a request to the JobCoordinator
   *
   * @param deploymentId identifier of the deployment
   * @param processorId logical id of the samza container 0,1,2
   * @param destinationHost host where the container is desired to move
   * @param requestExpiry optional per-request expiry timeout for requests to the cluster manager
   * @param timestamp timestamp of the request
   * @return uuid generated for the request
   */
  public UUID writeContainerPlacementRequestMessage(String deploymentId, String processorId, String destinationHost,
      Duration requestExpiry, long timestamp) {...}

  /**
   * Reads a {@link ContainerPlacementRequestMessage} from the underlying metastore
   * @param uuid uuid of the request
   * @return ContainerPlacementRequestMessage if it is present
   */
  public Optional<ContainerPlacementRequestMessage> readContainerPlacementRequestMessage(UUID uuid) {...}

  /**
   * Reads a {@link ContainerPlacementResponseMessage} from the underlying metastore
   * @param uuid uuid of the request
   * @return ContainerPlacementResponseMessage if it is present
   */
  public Optional<ContainerPlacementResponseMessage> readContainerPlacementResponseMessage(UUID uuid) {...}

  /**
   * Deletes a {@link ContainerPlacementRequestMessage}, if present, identified by the key {@code uuid}
   * @param uuid uuid of the request
   */
  public void deleteContainerPlacementRequestMessage(UUID uuid) {...}

  /**
   * Deletes a {@link ContainerPlacementResponseMessage}, if present, identified by the key {@code uuid}
   * @param uuid uuid of the request
   */
  public void deleteContainerPlacementResponseMessage(UUID uuid) {...}

  /**
   * Deletes both the {@link ContainerPlacementRequestMessage} and the {@link ContainerPlacementResponseMessage}
   * identified by {@code uuid}
   * @param uuid uuid of the request and response messages
   */
  public void deleteAllContainerPlacementMessages(UUID uuid) {...}

  /**
   * Deletes all {@link ContainerPlacementMessage}s
   */
  public void deleteAllContainerPlacementMessages() {...}

  /**
   * Writes a {@link ContainerPlacementResponseMessage} to the underlying metastore.
   * This method should be used by the JobCoordinator only, to write responses to container placement actions
   * @param responseMessage response message
   */
  void writeContainerPlacementResponseMessage(ContainerPlacementResponseMessage responseMessage) {...}
}
```
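To make the store's contract concrete, here is an in-memory stand-in that keeps requests and responses in separate maps keyed by the same UUID, so deleting all messages for one UUID removes both halves of a single exchange. This is a hypothetical sketch for local experiments only: the class name, the simplified `Request`/`Response` holders, generating the UUID with `UUID.randomUUID()`, and the use of plain maps instead of the Samza `MetadataStore` are all assumptions, not the actual implementation.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

final class InMemoryPlacementStore {
  // Simplified stand-in for ContainerPlacementRequestMessage
  static final class Request {
    final String deploymentId;
    final String processorId;
    final String destinationHost;
    final Duration requestExpiry; // null when no per-request expiry was given
    final long timestamp;

    Request(String deploymentId, String processorId, String destinationHost, Duration requestExpiry, long timestamp) {
      this.deploymentId = deploymentId;
      this.processorId = processorId;
      this.destinationHost = destinationHost;
      this.requestExpiry = requestExpiry;
      this.timestamp = timestamp;
    }
  }

  // Simplified stand-in for ContainerPlacementResponseMessage
  static final class Response {
    final String statusCode;
    final String responseMessage;

    Response(String statusCode, String responseMessage) {
      this.statusCode = statusCode;
      this.responseMessage = responseMessage;
    }
  }

  // Requests and responses live in separate namespaces but share the request's UUID as key
  private final Map<UUID, Request> requests = new HashMap<>();
  private final Map<UUID, Response> responses = new HashMap<>();

  /** Written by an external controller; the returned UUID is later used to look up the response. */
  UUID writeRequest(String deploymentId, String processorId, String destinationHost, Duration requestExpiry,
      long timestamp) {
    UUID uuid = UUID.randomUUID();
    requests.put(uuid, new Request(deploymentId, processorId, destinationHost, requestExpiry, timestamp));
    return uuid;
  }

  /** Written on the JobCoordinator side under the UUID of the request it answers. */
  void writeResponse(UUID uuid, Response response) {
    responses.put(uuid, response);
  }

  Optional<Request> readRequest(UUID uuid) {
    return Optional.ofNullable(requests.get(uuid));
  }

  Optional<Response> readResponse(UUID uuid) {
    return Optional.ofNullable(responses.get(uuid));
  }

  /** Removes both the request and the response stored under one UUID. */
  void deleteAll(UUID uuid) {
    requests.remove(uuid);
    responses.remove(uuid);
  }

  /** Removes every stored message. */
  void deleteAll() {
    requests.clear();
    responses.clear();
  }
}
```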
...