...

After the rebalancing phase, and before the start of processing, each stream processor will register the details of the physical host on which it runs in the localityData zookeeper node. The goal here is to separate the locality information from the JobModel itself (the JobModel will be used to hold only the task assignments). A MetadataStore abstraction will be used to read and write locality information for the different deployment models in the appropriate storage layers. There will be two implementations of MetadataStore: CoordinatorStreamBasedMetadataStore, to read/write container locality information for yarn, and ZkMetadataStore, to read/write container locality information in zookeeper for standalone. In the case of standalone, the last known physical host on which each samza task ran will be stored in zookeeper and then used to assign tasks to stream processors. Each stream processor will update the task locality of the tasks assigned to it before it begins processing (this is analogous to the behaviour in yarn, where locality is updated in SamzaContainer as part of the startup sequence).
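
To make the registration step concrete, here is a minimal sketch of what a stream processor could run before it starts processing, assuming the MetadataStore and LocationIdProvider interfaces proposed below; the ProcessorLocalityRegistration class itself is hypothetical and only illustrates the flow.

Code Block (java)
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: registers the processor's physical location before processing begins.
public class ProcessorLocalityRegistration {

  public static void registerLocality(MetadataStore metadataStore,
                                      LocationIdProvider locationIdProvider,
                                      String processorId) {
    // Read the processorId -> locationId mapping persisted by previous generations.
    Map<String, String> processorLocality = new HashMap<>(metadataStore.readProcessorLocality());

    // Record the location of this processor, as reported by the execution environment.
    processorLocality.put(processorId, locationIdProvider.getLocationId());

    // Persist the updated mapping (zookeeper for standalone, coordinator stream for yarn,
    // depending on the MetadataStore implementation in use).
    metadataStore.writeProcessorLocality(processorLocality);
  }
}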

...

Code Block (java)
// '+' denotes addition, '-' denotes deletion.
public interface TaskNameGrouper {
  + @Deprecated
  Set<ContainerModel> group(Set<TaskModel> tasks);

  + @Deprecated
  default Set<ContainerModel> group(Set<TaskModel> tasks, List<String> containersIds) {
    return group(tasks);
  }
  /**
   * @param taskModels the taskModels generated by the SSPGrouper.
   * @param taskLocality the taskName to locationId mapping of the previous generation.
   * @param processorLocality the processorId to locationId mapping.
   * @return the optimal containerModels generated.
   */
  + Set<ContainerModel> group(Set<TaskModel> taskModels, Map<String, String> taskLocality, Map<String, String> processorLocality);
}

+ @Deprecated
public interface BalancingTaskNameGrouper extends TaskNameGrouper {
  + @Deprecated 
  Set<ContainerModel> balance(Set<TaskModel> tasks, LocalityManager localityManager);
}

public class ContainerModel {
  - @Deprecated
  - private final int containerId;
  private final String processorId;
  private final Map<TaskName, TaskModel> tasks;
  + // New field added denoting the physical locationId.
  + private final String locationId;
}

+ public interface LocationIdProvider {
  // In containerized environments, LocationId is a combination of multiple fields (sliceId, containerId, hostname)
  // instead of a simple physical hostname. It will be provided by the execution environment of the processor.
  + String getLocationId();
}


+ public interface MetadataStore {
  // returns the processorId to LocationId mapping.
  + public Map<String, String> readProcessorLocality();

  // returns the taskName to LocationId mapping.
  + public Map<String, String> readTaskLocality();

  // writes the provided processorId to LocationId mapping to the underlying storage.
  + public boolean writeProcessorLocality(Map<String, String> processorLocality);
}
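
As an illustration of the LocationIdProvider contract above, the sketch below shows one possible implementation for a containerized environment that combines sliceId, containerId and hostname into a single LocationId. The class name and the environment variables it reads are assumptions made for illustration; the proposal only requires that getLocationId() return an identifier supplied by the execution environment.

Code Block (java)
import java.net.InetAddress;
import java.net.UnknownHostException;

// Illustrative sketch only: not part of the proposed API.
public class ContainerLocationIdProvider implements LocationIdProvider {

  @Override
  public String getLocationId() {
    // The environment variable names below are assumptions for this sketch.
    String sliceId = System.getenv().getOrDefault("SLICE_ID", "unknown-slice");
    String containerId = System.getenv().getOrDefault("CONTAINER_ID", "unknown-container");
    String hostname;
    try {
      hostname = InetAddress.getLocalHost().getHostName();
    } catch (UnknownHostException e) {
      hostname = "unknown-host";
    }
    // Combine the fields into a single LocationId string.
    return String.join("_", sliceId, containerId, hostname);
  }
}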

In yarn, the preferred host mapping stored in the coordinator stream will be used to calculate the assignment of the current generation. In standalone, the LocationId registered by the live processors and the task locality of the previous generation will be used to calculate the assignment of the current generation. The preferred host mapping will be used for the task and processor locality of processors and tasks that remain unchanged between successive generations. Any new task/processor for which the grouping is unknown (i.e., unavailable in the preferred host/task-locality mapping in the underlying storage layer) will be treated as any_host during assignment. A simplified grouper illustrating this is sketched below.
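
The sketch is not the proposed implementation: the class name, the round-robin fallback for any_host tasks, and the ContainerModel constructor used here are assumptions. It only shows how taskLocality and processorLocality could be combined so that a task stays on its previous location when a live processor reports the same LocationId.

Code Block (java)
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative sketch only: a locality-aware TaskNameGrouper.
public class LocalityAwareGrouperSketch implements TaskNameGrouper {

  @Override
  public Set<ContainerModel> group(Set<TaskModel> taskModels,
                                   Map<String, String> taskLocality,
                                   Map<String, String> processorLocality) {
    // locationId -> processorIds of the live processors reporting that location.
    Map<String, List<String>> processorsByLocation = new HashMap<>();
    processorLocality.forEach((processorId, locationId) ->
        processorsByLocation.computeIfAbsent(locationId, k -> new ArrayList<>()).add(processorId));

    // processorId -> tasks assigned to it.
    Map<String, Map<TaskName, TaskModel>> assignments = new HashMap<>();
    processorLocality.keySet().forEach(processorId -> assignments.put(processorId, new HashMap<>()));

    List<String> processorIds = new ArrayList<>(processorLocality.keySet());
    int nextAnyHostProcessor = 0;

    for (TaskModel taskModel : taskModels) {
      String previousLocation = taskLocality.get(taskModel.getTaskName().getTaskName());
      List<String> colocatedProcessors = processorsByLocation.get(previousLocation);
      String chosenProcessor;
      if (colocatedProcessors != null && !colocatedProcessors.isEmpty()) {
        // The task's previous-generation location matches a live processor: keep it local.
        chosenProcessor = colocatedProcessors.get(0);
      } else {
        // New or unknown task: treat as any_host and assign round-robin.
        chosenProcessor = processorIds.get(nextAnyHostProcessor++ % processorIds.size());
      }
      assignments.get(chosenProcessor).put(taskModel.getTaskName(), taskModel);
    }

    Set<ContainerModel> containerModels = new HashSet<>();
    assignments.forEach((processorId, tasks) ->
        // Assumed ContainerModel(processorId, tasks, locationId) constructor for this sketch.
        containerModels.add(new ContainerModel(processorId, tasks, processorLocality.get(processorId))));
    return containerModels;
  }

  @Override
  public Set<ContainerModel> group(Set<TaskModel> tasks) {
    // Deprecated grouping without locality information is not supported by this sketch.
    throw new UnsupportedOperationException();
  }
}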

Here are a few reasons supporting the modification of the TaskNameGrouper interface and the removal of LocalityManager from its interface methods:

...