Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
// '+' denotes addition, '-' denotes deletion.
public interface TaskNameGrouper {
  + @Deprecated
  Set<ContainerModel> group(Set<TaskModel> tasks);

  + @Deprecated
  default Set<ContainerModel> group(Set<TaskModel> tasks, List<String> containersIds) {
    return group(tasks);
  }
  /**
   * @param taskModels, represents the taskModels generated by the SSPGrouper.
   * @param taskLocality, taskName to locationId mapping of the previous generation. 
   * @param processorLocality, processorId to locationId mapping.
   * @return the containerModels generated.   
   */  
  + Set<ContainerModel> group(Set<TaskModel> taskModels, Map<String, String> taskLocality, Map<String, String> processorLocality);
}

+ @Deprecated
public interface BalancingTaskNameGrouper extends TaskNameGrouper {
  + @Deprecated 
  Set<ContainerModel> balance(Set<TaskModel> tasks, LocalityManager localityManager);
}

public class ContainerModel {
  - @Deprecated
  - private final int containerId;
  private final String processorId;
  private final Map<TaskName, TaskModel> tasks;
  + // New field added denoting the physical locationId.
  + private final String locationId;
}

+public interface LocationIdProvider {
   +  // In case of containerized environments, LocationId is a combination of multiple fields (sliceId, containerId, hostname) instead of simple physical hostname,
   +  // This will be provided by the execution environment of the processor.
   + String getLocationId();
}


+ public interface MetadataStore {
  + // returnsGets the processorId to LocationId mappingvalue associated with the specified {@code key}.
  + public Map<String, String> readProcessorLocality(byte[] get(byte[] key);
  
  + // returnsUpdates the taskNamemapping toof LocationIdthe mapping.
  + public Map<String, String> readTaskLocality(specified key-value pair; Associates the specified {@code key} with the specified {@code value} 
  + void put(byte[] key, byte[] value);
 
  + // writesDeletes the mapping for the provided processordId to host mapping to underlying storagespecified {@code key} from this store (if such mapping exists).
  + publicvoid boolean writeProcessorLocality(Map<String, String> processorLocalityremove(byte[] key);
}

LocationId reported by the live processors of the group and last reported task locality will be used to calculate the task to container assignment in standalone. Preferred host mapping will be used for task and processor locality in case of yarn. Any new task/processor for which grouping in unknown(unavailable in preferred host/task-locality in underlying storage layer), will be treated as any_host during assignment.

...