Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleContainerPlacementMessage.java
linenumberstrue
/**
* Encapsulates the request or response payload information between the ContainerPlacementHandler service and external
* controllers issuing placement actions.
*
* All fields are declared {@code protected final}; concrete request and response messages extend this class.
*/
public abstract class ContainerPlacementMessage {

/**
* Status of a container placement action as carried by a request or response message.
*/
public enum StatusCode {
 /**
  * Indicates that the container placement action is created
  */
 CREATED,

 /**
  * Indicates that the container placement action was rejected because the request was deemed invalid
  */
 BAD_REQUEST,

 /**
  * Indicates that the container placement action is accepted and waiting to be processed
  */
 ACCEPTED,

 /**
  * Indicates that the container placement action is in progress
  */
 IN_PROGRESS,

 /**
  * Indicates that the container placement action has succeeded
  */
 SUCCEEDED,

 /**
  * Indicates that the container placement action has failed
  */
 FAILED;
}

/**
* UUID attached to a message. It helps identify duplicate request messages written to the metastore so that actions
* are not retaken even if the metastore is eventually consistent
*/
protected final UUID uuid;
/**
* Unique identifier for a deployment so that messages can be invalidated across job restarts,
* e.g. a YARN-based cluster manager should set this to the app attempt id
*/
protected final String applicationId;
// Logical container id, e.g. 0, 1, 2
protected final String processorId;
// Destination host to which the container is desired to be moved
protected final String destinationHost;
// Optional request expiry which acts as a timeout for any resource request to the cluster resource manager
protected final Duration requestExpiry;
// Status of the current request
protected final StatusCode statusCode;
// Timestamp of the request or response message
protected final long timestamp;

/**
* Creates a message from its individual parts. The constructor body is elided in this snippet.
*
* @param uuid identifier of the message, used to detect duplicate requests
* @param applicationId unique identifier of the deployment
* @param processorId logical container id, e.g. 0, 1, 2
* @param destinationHost host to which the container is desired to be moved
* @param requestExpiry optional timeout for resource requests to the cluster resource manager
* @param statusCode status of the current request
* @param timestamp timestamp of the request or response message
*/
protected ContainerPlacementMessage(UUID uuid, String applicationId, String processorId, String destinationHost,
   Duration requestExpiry, StatusCode statusCode, long timestamp) {…}

}


Code Block
languagejava
titleContainerPlacementRequestMessage
linenumberstrue
/**
* Encapsulates the request sent from the external controller to the JobCoordinator to take a container placement action
*/
public class ContainerPlacementRequestMessage extends ContainerPlacementMessage {

/**
* Creates a placement request that carries an explicit request expiry. The constructor body is elided in this snippet.
*
* @param uuid identifier of the request message
* @param applicationId unique identifier of the deployment
* @param processorId logical container id, e.g. 0, 1, 2
* @param destinationHost host to which the container is desired to be moved
* @param requestExpiry timeout for resource requests issued on behalf of this request
* @param timestamp timestamp of the request
*/
public ContainerPlacementRequestMessage(UUID uuid, String applicationId, String processorId, String destinationHost, Duration requestExpiry, long timestamp) {...}

/**
* Creates a placement request without a request expiry argument. The constructor body is elided in this snippet.
* NOTE(review): presumably the expiry is left unset (null) in this variant; confirm against the implementation.
*
* @param uuid identifier of the request message
* @param applicationId unique identifier of the deployment
* @param processorId logical container id, e.g. 0, 1, 2
* @param destinationHost host to which the container is desired to be moved
* @param timestamp timestamp of the request
*/
public ContainerPlacementRequestMessage(UUID uuid, String applicationId, String processorId, String destinationHost, long timestamp) {...}
}


Code Block
languagejava
titleContainerPlacementResponseMessage
linenumberstrue
/**
* Encapsulates the response sent from the JobCoordinator for a container placement action
*/
public class ContainerPlacementResponseMessage extends ContainerPlacementMessage {
 // Human-readable message returned along with the status of the request
 private String responseMessage;

 /**
  * Creates a response that carries an explicit request expiry. The constructor body is elided in this snippet.
  *
  * @param uuid identifier of the request this response belongs to
  * @param applicationId unique identifier of the deployment
  * @param processorId logical container id, e.g. 0, 1, 2
  * @param destinationHost host to which the container is desired to be moved
  * @param requestExpiry timeout for resource requests issued on behalf of the request
  * @param statusCode status of the container placement action
  * @param responseMessage message returned with the status
  * @param timestamp timestamp of the response
  */
 public ContainerPlacementResponseMessage(UUID uuid, String applicationId, String processorId, String destinationHost,
     Duration requestExpiry, StatusCode statusCode, String responseMessage, long timestamp) {...}

 /**
  * Creates a response without a request expiry argument. The constructor body is elided in this snippet.
  * NOTE(review): the previous revision of this constructor delegated to the full constructor with a null
  * request expiry; confirm the timestamp-taking variant does the same.
  *
  * @param uuid identifier of the request this response belongs to
  * @param applicationId unique identifier of the deployment
  * @param processorId logical container id, e.g. 0, 1, 2
  * @param destinationHost host to which the container is desired to be moved
  * @param statusCode status of the container placement action
  * @param responseMessage message returned with the status
  * @param timestamp timestamp of the response
  */
 public ContainerPlacementResponseMessage(UUID uuid, String applicationId, String processorId, String destinationHost,
     StatusCode statusCode, String responseMessage, long timestamp) {...}
}


Code Block
languagejava
titleContainerPlacementMetadataStore
linenumberstrue
/**
 * Entity managing read writes to the metastore for {@link org.apache.samza.container.placement.ContainerPlacementRequestMessage}
 * and {@link org.apache.samza.container.placement.ContainerPlacementResponseMessage}
 */
public class ContainerPlacementMetadataStore {

/**
 * Writes a {@link ContainerPlacementRequestMessage} to the underlying metastore.
 * This method should be used by external controllers to issue a request to JobCoordinator
 *
 * @param deploymentId identifier of the deployment
 * @param processorId logical id of the samza container 0,1,2
 * @param destinationHost host where the container is desired to move
 * @param requestExpiry optional per request expiry timeout for requests to cluster manager
 * @param timestamp timestamp of the request
 * @return uuid generated for the request
 */
public UUID writeContainerPlacementRequestMessage(String deploymentId, String processorId, String destinationHost,
      Duration requestExpiry, long timestamp) {...}

/**
 * Reads a {@link ContainerPlacementRequestMessage} from the underlying metastore
 *
 * @param uuid uuid of the request
 * @return the ContainerPlacementRequestMessage if it is present
 */
public Optional<ContainerPlacementRequestMessage> readContainerPlacementRequestMessage(UUID uuid) {...}

/**
 * Reads a {@link ContainerPlacementResponseMessage} from the underlying metastore
 *
 * @param uuid uuid of the request
 * @return the ContainerPlacementResponseMessage if it is present
 */
public Optional<ContainerPlacementResponseMessage> readContainerPlacementResponseMessage(UUID uuid) {...}

/**
 * Deletes a {@link ContainerPlacementRequestMessage} if present identified by the key {@code uuid}
 *
 * @param uuid uuid of the request
 */
public void deleteContainerPlacementRequestMessage(UUID uuid) {...}

/**
 * Deletes a {@link ContainerPlacementResponseMessage} if present identified by the key {@code uuid}
 *
 * @param uuid uuid of the request
 */
public void deleteContainerPlacementResponseMessage(UUID uuid) {...}

/**
 * Deletes both {@link ContainerPlacementRequestMessage} and {@link ContainerPlacementResponseMessage}
 * identified by uuid
 *
 * @param uuid uuid of request and response message
 */
public void deleteAllContainerPlacementMessages(UUID uuid) {...}

/**
 * Writes a {@link ContainerPlacementResponseMessage} to the underlying metastore.
 * This method should be used by Job Coordinator only to write responses to Container Placement Action
 *
 * @param responseMessage response message
 */
void writeContainerPlacementResponseMessage(ContainerPlacementResponseMessage responseMessage) {...}

}


Code Block
languagejava
titleContainerManager
linenumberstrue
/**
* Declares the entry point for registering container placement actions together with the handlers for container
* lifecycle events: launch, launch failure, launch success, stop and expired resource requests.
* Method bodies are elided in this snippet.
*/
public class ContainerManager {
/**
* Registers a container placement action to move the running container to destination host
*
* @param requestMessage request containing details of placement request
* @param containerAllocator to request physical resources
*/
public void registerContainerPlacementAction(ContainerPlacementRequestMessage requestMessage, ContainerAllocator containerAllocator) {...}

/**
* Handles the container start action for both active and standby containers. This method is invoked by the
* allocator thread
*
* @param request pending request for the preferred host
* @param preferredHost preferred host to start the container
* @param allocatedResource resource allocated from {@link ClusterResourceManager}
* @param resourceRequestState state of request in {@link ContainerAllocator}
* @param allocator to request resources from {@link ClusterResourceManager}
*
* @return true if the container launch is complete, false if the container launch is in progress.
*/
boolean handleContainerLaunch(SamzaResourceRequest request, String preferredHost, SamzaResource allocatedResource,
   ResourceRequestState resourceRequestState, ContainerAllocator allocator) {..}

/**
* Handles the container launch failure for active containers and standby containers (if enabled).
*
* @param processorId logical id of the container e.g. 1,2,3
* @param containerId last known id of the container deployed
* @param preferredHost host on which container is requested to be deployed
* @param containerAllocator allocator for requesting resources
*/
void handleContainerLaunchFail(String processorId, String containerId, String preferredHost,
   ContainerAllocator containerAllocator) {...}

/**
* Handles the state update on successful launch of a container
*
* @param processorId logical processor id of container 0,1,2
*/
void handleContainerLaunchSuccess(String processorId) {...}

/**
* Handles the action to be taken after the container has been stopped.
*
* @param processorId logical id of the container e.g. 1,2,3
* @param containerId last known id of the container deployed
* @param preferredHost host on which container was last deployed
* @param exitStatus exit code returned by the container
* @param preferredHostRetryDelay delay to be incurred before requesting resources
* @param containerAllocator allocator for requesting resources
*/
void handleContainerStop(String processorId, String containerId, String preferredHost, int exitStatus,
   Duration preferredHostRetryDelay, ContainerAllocator containerAllocator) {..}

/**
* Handles an expired resource request for both active and standby containers.
*
* @param processorId logical id of the container
* @param preferredHost host on which container is requested to be deployed
* @param request pending request for the preferred host
* @param allocator allocator for requesting resources
* @param resourceRequestState state of request in {@link ContainerAllocator}
*/
void handleExpiredRequest(String processorId, String preferredHost,
   SamzaResourceRequest request, ContainerAllocator allocator, ResourceRequestState resourceRequestState) {..}
}

...