
Status

Current state: Accepted

Discussion thread: http://mail-archives.apache.org/mod_mbox/samza-dev/201802.mbox/%3CCAFvExu1GHnphidP_wRriMey-T7Hss4AqAxscOoBFUHuMR5sq%3DQ%40mail.gmail.com%3E

...

// '+' denotes addition, '-' denotes deletion.
public interface TaskNameGrouper {
  + @Deprecated
  Set<ContainerModel> group(Set<TaskModel> tasks);

  + @Deprecated
  default Set<ContainerModel> group(Set<TaskModel> tasks, List<String> containersIds) {
    return group(tasks);
  }
  /**
   * @param taskModels the taskModels generated by the SSPGrouper.
   * @param taskLocality the taskName-to-locationId mapping of the previous generation.
   * @param processorLocality the processorId-to-locationId mapping.
   * @return the generated containerModels.
   */
  + Set<ContainerModel> group(Set<TaskModel> taskModels, Map<String, String> taskLocality, Map<String, String> processorLocality);
}

+ @Deprecated
public interface BalancingTaskNameGrouper extends TaskNameGrouper {
  + @Deprecated 
  Set<ContainerModel> balance(Set<TaskModel> tasks, LocalityManager localityManager);
}

public class ContainerModel {
  - @Deprecated
  - private final int containerId;
  private final String processorId;
  private final Map<TaskName, TaskModel> tasks;
  + // New field added denoting the physical locationId.
  + private final String locationId;
}

+ public interface LocationIdProvider {
  + // In containerized environments, the locationId is a combination of multiple fields
  + // (sliceId, containerId, hostname) rather than just the physical hostname.
  + // It is provided by the execution environment of the processor.
  + String getLocationId();
}


+ public interface MetadataStore {
  + // Gets the value associated with the specified {@code key}.
  + byte[] get(byte[] key);

  + // Associates the specified {@code key} with the specified {@code value}, updating any existing mapping.
  + void put(byte[] key, byte[] value);

  + // Deletes the mapping for the specified {@code key} from this store (if such a mapping exists).
  + void remove(byte[] key);
}
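As a minimal sketch (not part of the proposal), the key-value MetadataStore above could be backed by an in-memory map in unit tests. InMemoryMetadataStore, the "processor-locality/" key prefix, and storing a locationId as UTF-8 bytes are all hypothetical; the interface is redeclared locally so the example compiles on its own.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class InMemoryMetadataStoreExample {

  // Local copy of the proposed get/put/remove contract so the sketch is self-contained.
  public interface MetadataStore {
    byte[] get(byte[] key);
    void put(byte[] key, byte[] value);
    void remove(byte[] key);
  }

  // Hypothetical in-memory implementation, intended only for tests.
  public static class InMemoryMetadataStore implements MetadataStore {
    // Keys are wrapped in ByteBuffer because ByteBuffer.equals/hashCode compare contents,
    // whereas byte[] uses identity-based equals/hashCode.
    private final Map<ByteBuffer, byte[]> store = new ConcurrentHashMap<>();

    private static ByteBuffer wrap(byte[] key) {
      // Defensive copy so later mutation of the caller's array cannot corrupt the map.
      return ByteBuffer.wrap(key.clone());
    }

    @Override
    public byte[] get(byte[] key) {
      byte[] value = store.get(wrap(key));
      return value == null ? null : value.clone();
    }

    @Override
    public void put(byte[] key, byte[] value) {
      store.put(wrap(key), value.clone());
    }

    @Override
    public void remove(byte[] key) {
      store.remove(wrap(key));
    }
  }

  private static byte[] utf8(String s) {
    return s.getBytes(StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    MetadataStore store = new InMemoryMetadataStore();
    // Hypothetical key layout: one entry per processor; the value is its locationId.
    store.put(utf8("processor-locality/processor-1"), utf8("host-a"));
    // A different array with the same contents finds the stored value.
    byte[] value = store.get(utf8("processor-locality/processor-1"));
    System.out.println(new String(value, StandardCharsets.UTF_8)); // host-a
    store.remove(utf8("processor-locality/processor-1"));
    System.out.println(store.get(utf8("processor-locality/processor-1")) == null); // true
  }
}
```

Wrapping keys is the main design point: a plain HashMap<byte[], byte[]> would never find a value stored under a different array with equal contents.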

The locationIds reported by the live processors of the group, together with the last reported task locality, will be used to compute the task-to-container assignment in standalone. In YARN, the preferred-host mapping will be used for both task and processor locality. Any new task or processor whose locality is unknown (that is, unavailable in the preferred-host mapping or in the task locality persisted in the underlying storage layer) will be treated as ANY_HOST during assignment.
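The assignment rule described above can be sketched as follows. This is a hypothetical illustration, not Samza's grouper: it works on plain task-name and processorId strings instead of TaskModel/ContainerModel, and the per-processor cap of ceil(tasks / processors) is an assumed balancing rule that the proposal does not specify.

```java
import java.util.*;

public class LocalityAwareAssignmentSketch {

  // Hypothetical helper: maps each task to a live processor, preferring a processor whose
  // locationId matches the task's last reported locationId. Returns processorId -> task names.
  public static Map<String, List<String>> assign(Set<String> taskNames,
                                                 Map<String, String> taskLocality,
                                                 Map<String, String> processorLocality) {
    if (processorLocality.isEmpty()) {
      throw new IllegalArgumentException("No live processors");
    }
    // Sorted iteration keeps the result deterministic across runs.
    Map<String, List<String>> assignment = new TreeMap<>();
    Map<String, List<String>> processorsByLocation = new HashMap<>();
    for (String processorId : new TreeSet<>(processorLocality.keySet())) {
      assignment.put(processorId, new ArrayList<>());
      processorsByLocation
          .computeIfAbsent(processorLocality.get(processorId), k -> new ArrayList<>())
          .add(processorId);
    }
    // Assumed balancing rule: at most ceil(tasks / processors) tasks per processor.
    int capacity = (taskNames.size() + assignment.size() - 1) / assignment.size();

    List<String> anyHost = new ArrayList<>();
    for (String task : new TreeSet<>(taskNames)) {
      String location = taskLocality.get(task);
      List<String> candidates = location == null
          ? List.of()
          : processorsByLocation.getOrDefault(location, List.of());
      String chosen = leastLoaded(candidates, assignment, capacity);
      if (chosen == null) {
        anyHost.add(task); // unknown locality, or no live processor there with spare capacity
      } else {
        assignment.get(chosen).add(task);
      }
    }
    // ANY_HOST tasks go to the least-loaded processor overall; one always has spare capacity.
    for (String task : anyHost) {
      assignment.get(leastLoaded(assignment.keySet(), assignment, capacity)).add(task);
    }
    return assignment;
  }

  // Returns the candidate with the fewest tasks that is still below capacity, or null.
  private static String leastLoaded(Collection<String> candidates,
                                    Map<String, List<String>> assignment, int capacity) {
    String best = null;
    for (String p : candidates) {
      int load = assignment.get(p).size();
      if (load < capacity && (best == null || load < assignment.get(best).size())) {
        best = p;
      }
    }
    return best;
  }

  public static void main(String[] args) {
    Map<String, List<String>> result = assign(
        Set.of("Partition 0", "Partition 1", "Partition 2"),
        Map.of("Partition 0", "host-a", "Partition 1", "host-b"), // Partition 2 has no history
        Map.of("proc-1", "host-a", "proc-2", "host-c"));          // no live processor on host-b
    System.out.println(result); // {proc-1=[Partition 0, Partition 2], proc-2=[Partition 1]}
  }
}
```

In the example, Partition 0 keeps its host, while Partition 1 (its host is gone) and Partition 2 (no recorded locality) fall back to ANY_HOST and are spread by load.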

...

  • Modify the existing interfaces and classes as per the proposed solution.

  • Add unit tests to test and validate compatibility and functional correctness. 

  • Add integration tests to the Samza standalone samples to verify the host affinity feature.

  • Add an integration test to verify that partition movements are minimal during a rolling upgrade.

  • Verify compatibility: Jackson, a Java serialization/deserialization library, is used to convert data model objects in Samza to JSON and back. After the containerId field is removed from ContainerModel, verify that old ContainerModel data can still be deserialized with the new ContainerModel spec.

  • Some TaskNameGrouper implementations (for instance, GroupByContainerCount) assume that the integer containerId in ContainerModel is comparable. Modify the existing TaskNameGrouper implementations to take a collection of string processorIds instead of assuming that containerId is an integer in the interval [0, N-1], without changing their functionality.
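One way to approach the last item, sketched here under stated assumptions, is to replace arithmetic on integer containerIds with an index into a sorted list of string processorIds. ProcessorIdRoundRobinSketch.group is a hypothetical helper operating on plain task-name strings, not the actual GroupByContainerCount code, and round-robin over sorted task names is an assumed distribution policy.

```java
import java.util.*;

public class ProcessorIdRoundRobinSketch {

  // Hypothetical grouper: distributes tasks round-robin over string processorIds.
  // Returns processorId -> task names, in sorted processorId order.
  public static Map<String, List<String>> group(Collection<String> taskNames,
                                                Collection<String> processorIds) {
    if (processorIds.isEmpty()) {
      throw new IllegalArgumentException("processorIds must not be empty");
    }
    // Sorting both inputs makes the grouping deterministic without assuming ids in [0, N-1].
    List<String> processors = new ArrayList<>(new TreeSet<>(processorIds));
    List<String> tasks = new ArrayList<>(new TreeSet<>(taskNames));

    Map<String, List<String>> result = new LinkedHashMap<>();
    for (String processorId : processors) {
      result.put(processorId, new ArrayList<>());
    }
    // The position in the sorted list replaces the old integer containerId.
    for (int i = 0; i < tasks.size(); i++) {
      result.get(processors.get(i % processors.size())).add(tasks.get(i));
    }
    return result;
  }

  public static void main(String[] args) {
    System.out.println(group(
        List.of("Partition 0", "Partition 1", "Partition 2"),
        List.of("processor-y", "processor-x")));
    // {processor-x=[Partition 0, Partition 2], processor-y=[Partition 1]}
  }
}
```

Because processorIds are sorted as strings, the order is lexicographic ("10" sorts before "2"); the sketch relies only on the order being deterministic, not on the ids being contiguous integers.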

...