...

- zkBaseRootPath/$appName-$appId-$JobName-$JobId-$stageId/
    - processors/
        - processor.000001/
            - locationId1 (stored as value in processor zookeeper node)
        - processor.000002/
            - locationId2 (stored as value in processor zookeeper node)
        ...
        - processor.00000N/
            - locationIdN (stored as value in processor zookeeper node)
    - jobModels/
        - {jobModelVersion}/
            JobModelObject
    - localityData/
        - task01/
            - locationId1 (stored as value in task zookeeper node)
        - task02/
            - locationId2 (stored as value in task zookeeper node)
        ...
        - task0N/
            - locationIdN (stored as value in task zookeeper node)
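The path layout above can be sketched as a small string-building helper. The class `ZkLocalityPaths` and its method names are hypothetical and exist only for illustration. It does not talk to ZooKeeper, and processor node names (which ZooKeeper assigns when sequential nodes are created) are passed in rather than generated.

```java
/**
 * Hypothetical helper that builds the ZooKeeper paths of the layout above.
 * Only string construction is shown; no ZooKeeper client is involved.
 */
final class ZkLocalityPaths {
  private final String root;

  ZkLocalityPaths(String zkBaseRootPath, String appName, String appId,
                  String jobName, String jobId, String stageId) {
    // zkBaseRootPath/$appName-$appId-$JobName-$JobId-$stageId
    this.root = zkBaseRootPath + "/" + String.join("-", appName, appId, jobName, jobId, stageId);
  }

  /** Parent of the per-processor nodes. */
  String processorsPath() {
    return root + "/processors";
  }

  /** Node whose value holds the processor's locationId, e.g. "processor.000001". */
  String processorPath(String processorNodeName) {
    return processorsPath() + "/" + processorNodeName;
  }

  /** Node that holds the JobModel published for the given version. */
  String jobModelPath(String jobModelVersion) {
    return root + "/jobModels/" + jobModelVersion;
  }

  /** Node whose value holds the locationId the task last ran on. */
  String taskLocalityPath(String taskName) {
    return root + "/localityData/" + taskName;
  }
}
```

For example, `new ZkLocalityPaths("/samza", "app", "1", "job", "2", "stage0").taskLocalityPath("task01")` yields `/samza/app-1-job-2-stage0/localityData/task01`.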

Local store sandboxing:

In a standalone deployment, users should provide the file system location for persisting local state through the stream processor configuration, by defining `local.store.dir`. The value of `local.store.dir` is expected to be preserved across processor restarts so that preexisting local state can be reused. Users are expected to run the stream processor Java process with sufficient read/write permissions to access the local state directories created by any processor in the group. Standalone will follow the same local store file hierarchy that the Samza on YARN deployment model uses, for both the high-level and low-level APIs.
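A minimal sketch of how a standalone processor might resolve per-store, per-task directories under `local.store.dir` and check that it can access them. The class name is hypothetical, configuration is modeled as a plain `Map<String, String>`, and the `<local.store.dir>/<storeName>/<taskName>` layout (with spaces replaced by underscores) is an assumption about the YARN-side hierarchy that should be checked against Samza's storage code.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;

/** Hypothetical helper resolving local store directories from local.store.dir. */
final class LocalStoreDirResolver {
  static final String LOCAL_STORE_DIR = "local.store.dir";

  private final Path baseDir;

  LocalStoreDirResolver(Map<String, String> config) {
    String dir = config.get(LOCAL_STORE_DIR);
    if (dir == null || dir.isEmpty()) {
      // The user is expected to supply a location that survives processor restarts.
      throw new IllegalArgumentException(LOCAL_STORE_DIR + " must be configured");
    }
    this.baseDir = Paths.get(dir);
  }

  /** Assumed layout: <local.store.dir>/<storeName>/<taskName>, spaces replaced by '_'. */
  Path storeDir(String storeName, String taskName) {
    return baseDir.resolve(storeName.replace(' ', '_')).resolve(taskName.replace(' ', '_'));
  }

  /** True if the base directory exists and this process can read and write it. */
  boolean hasReadWriteAccess() {
    return Files.isDirectory(baseDir) && Files.isReadable(baseDir) && Files.isWritable(baseDir);
  }
}
```

Because every processor in the group resolves paths the same way, a processor that inherits a task after a restart would look in the same directory the previous owner used, provided the permission requirement above is met.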

...

// '+' denotes addition, '-' denotes deletion.
public interface TaskNameGrouper {
  + @Deprecated
  Set<ContainerModel> group(Set<TaskModel> tasks);

  + @Deprecated
  default Set<ContainerModel> group(Set<TaskModel> tasks, List<String> containersIds) {
    return group(tasks);
  }
  + Set<ContainerModel> group(Set<TaskModel> taskModels, Map<String, LocationId> taskLocality, Map<String, LocationId> processorLocality);
}

+ @Deprecated
public interface BalancingTaskNameGrouper extends TaskNameGrouper {
  + @Deprecated 
  Set<ContainerModel> balance(Set<TaskModel> tasks, LocalityManager localityManager);
}

public class ContainerModel {
  - @Deprecated
  - private final int containerId;
  private final String processorId;
  private final Map<TaskName, TaskModel> tasks;
  + // New field added denoting the physical locationId.
  + private final String locationId;
}

+ public interface LocationIdProvider {
  + // In containerized environments, a LocationId combines multiple fields (sliceId, containerId, hostname)
  + // instead of being a plain physical hostname, so it is represented as a class rather than a primitive string.
  + // The execution environment provides the implementation.
  + LocationId getLocationId();
}

+ public interface LocalityManager {
  + // Returns the processorId to LocationId mapping.
  + public Map<String, LocationId> readProcessorLocality();

  + // Returns the taskName to LocationId mapping.
  + public Map<String, LocationId> readTaskLocality();

  + // Writes the provided processorId to LocationId mapping to the underlying storage.
  + public boolean writeProcessorLocality(Map<String, LocationId> processorLocality);
}
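A sketch of why a class fits better than a primitive string for `LocationId`: a composite value type with value equality keeps two processors on the same hostname but in different slices or containers distinguishable. The fields follow the `LocationIdProvider` comment; the class shapes, the `/`-separated `toString()` form, and `FixedLocationIdProvider` are hypothetical.

```java
import java.util.Objects;

/** Hypothetical composite LocationId for containerized environments. */
final class LocationId {
  private final String sliceId;
  private final String containerId;
  private final String hostname;

  LocationId(String sliceId, String containerId, String hostname) {
    this.sliceId = Objects.requireNonNull(sliceId);
    this.containerId = Objects.requireNonNull(containerId);
    this.hostname = Objects.requireNonNull(hostname);
  }

  String getHostname() {
    return hostname;
  }

  // Value equality: all three fields must match, not just the hostname.
  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof LocationId)) return false;
    LocationId other = (LocationId) o;
    return sliceId.equals(other.sliceId)
        && containerId.equals(other.containerId)
        && hostname.equals(other.hostname);
  }

  @Override
  public int hashCode() {
    return Objects.hash(sliceId, containerId, hostname);
  }

  /** Assumed serialized form, e.g. for the value of a ZooKeeper node. */
  @Override
  public String toString() {
    return sliceId + "/" + containerId + "/" + hostname;
  }
}

interface LocationIdProvider {
  LocationId getLocationId();
}

/** Hypothetical provider built from values the execution environment hands in. */
final class FixedLocationIdProvider implements LocationIdProvider {
  private final LocationId locationId;

  FixedLocationIdProvider(String sliceId, String containerId, String hostname) {
    this.locationId = new LocationId(sliceId, containerId, hostname);
  }

  @Override
  public LocationId getLocationId() {
    return locationId;
  }
}
```

Comparing two such ids on the same host but different slices returns `false`, which a bare hostname string could not express.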
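One way a grouper could use the new `group(taskModels, taskLocality, processorLocality)` signature is sketched below. It is a hypothetical, simplified illustration, not the Samza implementation: `TaskModel` is reduced to a task name, `LocationId` to a `String`, and `ContainerModel` to a processorId-to-task-names map. A task stays on a processor that reports the location it last ran on, up to an even share per processor; every remaining task goes to the least-loaded processor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical locality-aware grouping; LocationId is simplified to a String. */
final class LocalityAwareGrouper {
  /** Returns processorId -> assigned task names. */
  static Map<String, List<String>> group(Set<String> taskNames,
                                         Map<String, String> taskLocality,
                                         Map<String, String> processorLocality) {
    if (processorLocality.isEmpty()) {
      throw new IllegalArgumentException("at least one processor is required");
    }
    // TreeMap/TreeSet keep the result deterministic for a given input.
    Map<String, List<String>> assignment = new TreeMap<>();
    for (String processorId : processorLocality.keySet()) {
      assignment.put(processorId, new ArrayList<>());
    }
    // ceil(tasks / processors): locality is honored only up to this share.
    int capacity = (taskNames.size() + assignment.size() - 1) / assignment.size();

    List<String> unassigned = new ArrayList<>();
    for (String task : new TreeSet<>(taskNames)) {
      String previousLocation = taskLocality.get(task);
      String target = null;
      for (Map.Entry<String, List<String>> e : assignment.entrySet()) {
        boolean sameLocation = previousLocation != null
            && previousLocation.equals(processorLocality.get(e.getKey()));
        if (sameLocation && e.getValue().size() < capacity) {
          target = e.getKey();
          break;
        }
      }
      if (target != null) {
        assignment.get(target).add(task);
      } else {
        unassigned.add(task);
      }
    }
    // Tasks without a usable previous location are balanced across processors;
    // ties go to the smallest processorId because the map is sorted.
    for (String task : unassigned) {
      String leastLoaded = null;
      for (Map.Entry<String, List<String>> e : assignment.entrySet()) {
        if (leastLoaded == null || e.getValue().size() < assignment.get(leastLoaded).size()) {
          leastLoaded = e.getKey();
        }
      }
      assignment.get(leastLoaded).add(task);
    }
    return assignment;
  }
}
```

Capping locality at an even share is one possible trade-off: it keeps the group balanced when processors leave or join, at the cost of moving some tasks off the location they last ran on.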

For YARN, the preferred-host mapping in the coordinator stream will be used, unchanged, to preserve the locality of processors and tasks between successive generations. Tasks or processors added in a run have no preferred-host entry yet, so YARN will assign them to any new host.

Here are a few reasons for modifying the TaskNameGrouper interface and removing LocalityManager from its method signatures:

...