Discussion thread


Released: TBD (Samza 1.5)


Samza operates in multi-tenant environments with cluster managers like Yarn and Mesos, where a single host can run multiple Samza containers. Because cluster managers like Yarn are often configured with soft limits and Samza has no notion of dynamic workload balancing, a host can end up underperforming, and it becomes desirable to move one or more containers from that host to other hosts. Today this is not possible without affecting other jobs on the hot host or manually restarting the affected job. In other use cases, such as resetting the checkpoints of a single container or supporting canary or rolling bounces, the ability to restart a single container or a subset of containers without restarting the whole job is highly desirable.


This approach proposes directly changing the container locality messages (container-to-host mapping messages) in the Metadata Store (currently Kafka) and then issuing a restart.



  • Simple to implement: the current tool already does this for host-affinity-enabled jobs (since they maintain a locality mapping)
  • Needs a job restart and makes a best effort to get the preferred hosts for containers, with no guarantee of getting them
  • If a job has standby containers enabled, this method involves changing the standby mapping in addition to the active container mappings
  • The job faces downtime even when it has hundreds of containers and only one of them needs to be restarted; if the job is stateful, containers might not get the newly requested resources on restart and would start bootstrapping
  • This solution does not scale for controllers that want to take multiple control actions on containers across several jobs, for example, an auto-sizing controller
  • This method will not work for building a Canary or Cluster Balancer

Solution 2. Container Placement


API [Accepted]

API design

Based on the types of control actions, the commands are the following:





Gives the status and info of a container placement request, for example whether the container is running or stopped and which control commands have been issued on it


processor-id: Samza resource ID of the container, e.g. 0, 1, 2

deploymentId: unique identifier of the deployed app for which the action is taken

Status code



Status of the Container placement action 


  1. ContainerPlacementHandler dispatches the move request to ContainerProcessManager
  2. ContainerProcessManager dispatches the placement request to ContainerManager, which validates the action and issues a preferred resource request with ContainerAllocator (async)
  3. Now there are two possible scenarios
    1. The requested resources are allocated by ClusterResourceManager
      1. In this case, ContainerManager initiates a container shutdown and waits for the ContainerProcessManager callback on successfully stopping the container
      2. On getting the successful stop callback, ContainerManager issues a container start request to ClusterResourceManager on the allocated resources
      3. If the container start request succeeds, a success notification is sent (updating the Control Action Metadata); otherwise a new resource request to start this container on the source host is issued and a failure notification is sent to the user
    2. The resource request expires
      1. If the resource request expires because ClusterResourceManager is not able to allocate the requested resources, a failure notification is sent and the active container remains unaffected
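
The ordering of the steps above can be sketched in a few lines of Java. This is only an illustration under stated assumptions: `MoveFlowSketch`, `FakeCluster`, and `Outcome` are hypothetical names, not Samza classes, and the real flow is asynchronous and callback-driven rather than a sequence of blocking calls. The point it shows is that the active container is stopped only after the destination resources have been allocated.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Illustrative, synchronous sketch of the move flow; none of these types are Samza classes. */
class MoveFlowSketch {
  enum Outcome { SUCCEEDED, FAILED }

  /** Hypothetical in-memory cluster: only hosts in freeHosts can be allocated; hosts in brokenHosts fail starts. */
  static class FakeCluster {
    final Set<String> freeHosts = new HashSet<>();
    final Set<String> brokenHosts = new HashSet<>();
    final List<String> events = new ArrayList<>();

    boolean allocate(String host) {
      boolean granted = freeHosts.contains(host);
      events.add((granted ? "allocated " : "expired ") + host);
      return granted;
    }

    boolean stop(String processorId) {
      events.add("stopped " + processorId);
      return true;
    }

    boolean start(String processorId, String host) {
      boolean started = !brokenHosts.contains(host);
      events.add((started ? "started " : "start-failed ") + processorId + " on " + host);
      return started;
    }
  }

  static Outcome move(FakeCluster cluster, String processorId, String sourceHost, String destinationHost) {
    // Request the preferred resources first; the active container keeps running meanwhile.
    if (!cluster.allocate(destinationHost)) {
      return Outcome.FAILED; // request expired, active container unaffected
    }
    // Resources are held, so it is now safe to stop the active container.
    if (!cluster.stop(processorId)) {
      return Outcome.FAILED;
    }
    if (cluster.start(processorId, destinationHost)) {
      return Outcome.SUCCEEDED;
    }
    // Start failed on the destination: ask for the source host again and report failure.
    if (cluster.allocate(sourceHost)) {
      cluster.start(processorId, sourceHost);
    }
    return Outcome.FAILED;
  }

  public static void main(String[] args) {
    FakeCluster cluster = new FakeCluster();
    cluster.freeHosts.add("host-b");
    System.out.println(move(cluster, "4", "host-a", "host-b"));
    System.out.println(cluster.events);
  }
}
```

When the destination host is not in `freeHosts`, `move` returns `FAILED` without ever calling `stop`, which mirrors the expired-request scenario.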

Who writes the new locality mapping after a successful move?

A Samza container writes its new locality message to the metadata store on a successful start (code); hence, after a successful move, the container writes its new locality.

Failure Scenarios:

  • If the preferred resources cannot be acquired, the active container is never stopped and a failure notification is sent for the ContainerPlacementRequest
  • If the ContainerPlacementManager is not able to stop the active container (3.1 #1 above fails), the request is marked failed and a failure notification is sent for the ContainerPlacementRequest
  • If ClusterResourceManager fails to start the stopped active container on the acquired destination host, the container falls back to the source host and a failure notification is sent for the ContainerPlacementRequest. If the container fails to start on the source host, an attempt is made to start it on ANY_HOST
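
The fallback order in the last scenario (destination host, then source host, then ANY_HOST) can be expressed as a small helper. This is a sketch under assumptions: `StartFallbackSketch` and its methods are hypothetical, the `tryStart` predicate stands in for a real start attempt against the cluster manager, and the string value used for `ANY_HOST` is assumed for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

/** Illustrative sketch of the start fallback order; not Samza code. */
class StartFallbackSketch {
  // Assumed placeholder meaning "no host preference".
  static final String ANY_HOST = "ANY_HOST";

  /** Returns the first host, in fallback order, on which the (hypothetical) start attempt succeeds. */
  static Optional<String> startWithFallback(String destinationHost, String sourceHost,
      Predicate<String> tryStart) {
    List<String> fallbackOrder = Arrays.asList(destinationHost, sourceHost, ANY_HOST);
    for (String host : fallbackOrder) {
      if (tryStart.test(host)) {
        return Optional.of(host);
      }
    }
    return Optional.empty();
  }

  /** The placement request counts as successful only if the container started on the requested destination. */
  static boolean requestSucceeded(String destinationHost, Optional<String> actualHost) {
    return actualHost.isPresent() && actualHost.get().equals(destinationHost);
  }

  public static void main(String[] args) {
    // Simulate a destination host that cannot start the container.
    Optional<String> host = startWithFallback("host-b", "host-a", h -> !h.equals("host-b"));
    System.out.println("started on: " + host + ", request succeeded: " + requestSucceeded("host-b", host));
  }
}
```

Note that a container that comes back on the source host or on ANY_HOST is running again, yet the placement request itself is still reported as failed.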




  • The container at any point in time is only holding resources it needs to start on a host 
  • Weaker restart semantics, since the container is likely to restart on a host other than the source host (the container is stopped and the Cluster Manager may not be able to return the resources)


Usage Example:

Code Block
 place-container --deployment-id 1581635852024-5117e303 --app-name snjain-test-cp --app-id 1 --processor-id 4 --request-expiry 10 --destination-host

Code Block
@CommandLine.Command(name = "place-container", description = "Request to move/restart container at destination-host")
public class ContainerPlacementTool {
    // The values below are read from the command line
    private String _appName;
    private String _appId;
    private String _deploymentId;
    private String _processorId;
    private String _destinationHost;
    private Integer _requestExpiry; // optional, in seconds

    public void run() {
      MetadataStore metadataStore = buildMetadataStore(_appName, _appId);
      try {
        ContainerPlacementMetadataStore containerPlacementMetadataStore =
            new ContainerPlacementMetadataStore(metadataStore);
        // The request expiry is optional; pass null when it is not set
        Duration requestExpiry = _requestExpiry != null ? Duration.ofSeconds(_requestExpiry) : null;
        UUID uuid = containerPlacementMetadataStore.writeContainerPlacementRequestMessage(_deploymentId, _processorId,
            _destinationHost, requestExpiry, System.currentTimeMillis());
        System.out.println("Request received, query the status using: " + uuid);
      } finally {
        // Release the metadata store (assumed cleanup step)
        metadataStore.close();
      }
    }
}

Public Interfaces

Code Block
/**
 * Encapsulates the request or response payload information between the ContainerPlacementHandler service and external
 * controllers issuing placement actions
 */
public abstract class ContainerPlacementMessage {

  public enum StatusCode {
    /**
     * Indicates that the container placement action is created
     */
    CREATED,

    /**
     * Indicates that the container placement action was rejected because the request was deemed invalid
     */
    BAD_REQUEST,

    /**
     * Indicates that the container placement action is accepted and waiting to be processed
     */
    ACCEPTED,

    /**
     * Indicates that the container placement action is in progress
     */
    IN_PROGRESS,

    /**
     * Indicates that the container placement action has succeeded
     */
    SUCCEEDED,

    /**
     * Indicates that the container placement action has failed
     */
    FAILED
  }

  /**
   * UUID attached to a message, which helps in identifying duplicate request messages written to the metastore and
   * in not retaking actions even if the metastore is eventually consistent
   */
  protected final UUID uuid;
  /**
   * Unique identifier for a deployment so messages can be invalidated across job restarts,
   * e.g. a Yarn-based cluster manager should set this to the app attempt id
   */
  protected final String deploymentId;
  // Logical container id: 0, 1, 2
  protected final String processorId;
  // Destination host to which the container should be moved
  protected final String destinationHost;
  // Optional request expiry, which acts as a timeout for any resource request to the cluster resource manager
  protected final Duration requestExpiry;
  // Status of the current request
  protected final StatusCode statusCode;
  // Timestamp of the request or response message
  protected final long timestamp;

  protected ContainerPlacementMessage(UUID uuid, String deploymentId, String processorId, String destinationHost,
     Duration requestExpiry, StatusCode statusCode, long timestamp) {...}
}
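
The comments on `uuid` and `deploymentId` describe two checks a consumer of these messages would make: ignore requests written for an older deployment, and do not act twice on the same request. A minimal sketch of those checks follows, assuming an in-memory set of seen UUIDs. `RequestDedupSketch` and its `Decision` enum are hypothetical names for illustration and are not part of the proposed interfaces.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

/** Illustrative sketch of deployment-id validation and UUID-based de-duplication; not Samza code. */
class RequestDedupSketch {
  // Hypothetical outcomes used only in this sketch.
  enum Decision { ACCEPTED, BAD_REQUEST, DUPLICATE }

  private final String currentDeploymentId;
  private final Set<UUID> seenRequests = new HashSet<>();

  RequestDedupSketch(String currentDeploymentId) {
    this.currentDeploymentId = currentDeploymentId;
  }

  Decision handle(UUID uuid, String deploymentId) {
    // A request written for a previous deployment (e.g. before a job restart) is no longer valid.
    if (!currentDeploymentId.equals(deploymentId)) {
      return Decision.BAD_REQUEST;
    }
    // Set.add returns false when the UUID was already recorded, i.e. the message was seen before.
    if (!seenRequests.add(uuid)) {
      return Decision.DUPLICATE;
    }
    return Decision.ACCEPTED;
  }

  public static void main(String[] args) {
    RequestDedupSketch handler = new RequestDedupSketch("1581635852024-5117e303");
    UUID request = UUID.randomUUID();
    System.out.println(handler.handle(request, "1581635852024-5117e303"));
    System.out.println(handler.handle(request, "1581635852024-5117e303"));
    System.out.println(handler.handle(UUID.randomUUID(), "some-older-deployment"));
  }
}
```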



Code Block
/**
 * Encapsulates the response sent from the JobCoordinator for a container placement action
 */
public class ContainerPlacementResponseMessage extends ContainerPlacementMessage {
  // Returned status of the request
  private String responseMessage;

  public ContainerPlacementResponseMessage(UUID uuid, String deploymentId, String processorId, String destinationHost,
      Duration requestExpiry, StatusCode statusCode, String responseMessage, long timestamp) {...}

  public ContainerPlacementResponseMessage(UUID uuid, String deploymentId, String processorId, String destinationHost,
      StatusCode statusCode, String responseMessage, long timestamp) {...}
}

Other Interfaces

Code Block
/**
 * Entity managing reads and writes to the metastore for {@link org.apache.samza.container.placement.ContainerPlacementRequestMessage}
 * and {@link org.apache.samza.container.placement.ContainerPlacementResponseMessage}
 */
public class ContainerPlacementMetadataStore {

  /**
   * Writes a {@link ContainerPlacementRequestMessage} to the underlying metastore. This method should be used by external controllers
   * to issue a request to JobCoordinator
   * @param deploymentId identifier of the deployment
   * @param processorId logical id of the samza container 0,1,2
   * @param destinationHost host where the container is desired to move
   * @param requestExpiry optional per request expiry timeout for requests to cluster manager
   * @param timestamp timestamp of the request
   * @return uuid generated for the request
   */
  public UUID writeContainerPlacementRequestMessage(String deploymentId, String processorId, String destinationHost,
        Duration requestExpiry, long timestamp) {...}

  /**
   * Reads a {@link ContainerPlacementRequestMessage} from the underlying metastore
   * @param uuid uuid of the request
   * @return ContainerPlacementRequestMessage if it is present
   */
  public Optional<ContainerPlacementRequestMessage> readContainerPlacementRequestMessage(UUID uuid) {...}

  /**
   * Reads a {@link ContainerPlacementResponseMessage} from the underlying metastore
   * @param uuid uuid of the request
   * @return ContainerPlacementResponseMessage if it is present
   */
  public Optional<ContainerPlacementResponseMessage> readContainerPlacementResponseMessage(UUID uuid) {...}

  /**
   * Deletes a {@link ContainerPlacementRequestMessage} if present identified by the key {@code uuid}
   * @param uuid uuid of the request
   */
  public void deleteContainerPlacementRequestMessage(UUID uuid) {...}

  /**
   * Deletes a {@link ContainerPlacementResponseMessage} if present identified by the key {@code uuid}
   * @param uuid uuid of the request
   */
  public void deleteContainerPlacementResponseMessage(UUID uuid) {...}

  /**
   * Deletes both {@link ContainerPlacementRequestMessage} and {@link ContainerPlacementResponseMessage} identified by uuid
   * @param uuid uuid of request and response message
   */
  public void deleteAllContainerPlacementMessages(UUID uuid) {...}

  /**
   * Deletes all {@link ContainerPlacementMessage}
   */
  public void deleteAllContainerPlacementMessages() {...}

  /**
   * Writes a {@link ContainerPlacementResponseMessage} to the underlying metastore.
   * This method should be used by Job Coordinator only to write responses to Container Placement Action
   * @param responseMessage response message
   */
  void writeContainerPlacementResponseMessage(ContainerPlacementResponseMessage responseMessage) {...}
}
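
An external controller that has written a request would typically poll for the matching response by UUID until the action reaches a terminal status. The sketch below shows one way to do that, under assumptions: `StatusPollingSketch`, `InMemoryResponses`, and the local `Status` enum are hypothetical stand-ins so the example runs without Samza on the classpath. A real controller would call `readContainerPlacementResponseMessage(uuid)` on the store described above instead.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative sketch of polling for a placement response by UUID; not Samza code. */
class StatusPollingSketch {
  // Local stand-in for the status codes of a placement action (names assumed for this sketch).
  enum Status { ACCEPTED, IN_PROGRESS, SUCCEEDED, FAILED }

  /** Hypothetical in-memory stand-in for the response side of the metadata store. */
  static class InMemoryResponses {
    private final Map<UUID, Status> responses = new ConcurrentHashMap<>();

    void write(UUID uuid, Status status) {
      responses.put(uuid, status);
    }

    Optional<Status> read(UUID uuid) {
      return Optional.ofNullable(responses.get(uuid));
    }
  }

  static boolean isTerminal(Status status) {
    return status == Status.SUCCEEDED || status == Status.FAILED;
  }

  /** Polls until a terminal status is seen or attempts run out; returns the last status observed, if any. */
  static Optional<Status> awaitTerminal(InMemoryResponses store, UUID uuid, int maxAttempts, long sleepMillis) {
    Optional<Status> last = Optional.empty();
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      last = store.read(uuid);
      if (last.isPresent() && isTerminal(last.get())) {
        return last;
      }
      try {
        Thread.sleep(sleepMillis);
      } catch (InterruptedException e) {
        // Preserve the interrupt flag and stop polling.
        Thread.currentThread().interrupt();
        return last;
      }
    }
    return last;
  }

  public static void main(String[] args) {
    InMemoryResponses store = new InMemoryResponses();
    UUID uuid = UUID.randomUUID();
    store.write(uuid, Status.SUCCEEDED);
    System.out.println(awaitTerminal(store, uuid, 5, 10L));
  }
}
```

Bounding the number of attempts keeps the caller from waiting forever if no response is ever written, for example when the request was issued against a stale deployment.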

