...
```
- zkBaseRootPath/$appName-$appId-$JobName-$JobId-$stageId/
    - processors/
        - processor.000001/
            - locationId1 (stored as value in processor zookeeper node)
        - processor.000002/
            - locationId2 (stored as value in processor zookeeper node)
        ...
        - processor.00000N/
            - locationIdN (stored as value in processor zookeeper node)
    - jobModels/
        - {jobModelVersion}
            JobModelObject
    - localityData
        - task01/
            - locationId1 (stored as value in task zookeeper node)
        - task02/
            - locationId2 (stored as value in task zookeeper node)
        ...
        - task0N/
            - locationIdN (stored as value in task zookeeper node)
```
Local store sandboxing:
In standalone deployments, users must provide the file system location used to persist local state through the stream processor configuration, by defining `local.store.dir`. The value of `local.store.dir` is expected to be preserved across processor restarts so that preexisting local state can be reused. The stream processor Java process is expected to be configured by the user to run with sufficient read/write permissions to access the local state directories created by any processor in the group. Standalone will follow the same local store file hierarchy that the samza-yarn deployment model uses for both the high-level and low-level APIs.
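As an illustration of the expectations above, a processor could validate `local.store.dir` at startup roughly as follows. This is a minimal sketch, not Samza code: the class `LocalStoreDirResolver` and its `resolve` method are hypothetical names, and the configuration is assumed to be available as a plain `Map<String, String>`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;

// Hypothetical helper (not part of Samza): resolves and validates the
// directory configured through local.store.dir.
final class LocalStoreDirResolver {
    static final String LOCAL_STORE_DIR = "local.store.dir";

    static Path resolve(Map<String, String> config) {
        String dir = config.get(LOCAL_STORE_DIR);
        if (dir == null || dir.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing required configuration: " + LOCAL_STORE_DIR);
        }
        Path path = Paths.get(dir.trim()).toAbsolutePath().normalize();
        try {
            // Leaves an existing directory (and any state in it) untouched,
            // so state written before a restart can be reused.
            Files.createDirectories(path);
        } catch (IOException e) {
            throw new UncheckedIOException("Unable to create " + path, e);
        }
        // The process must be able to both read and write the local state.
        if (!Files.isReadable(path) || !Files.isWritable(path)) {
            throw new IllegalStateException("Insufficient read/write permissions on " + path);
        }
        return path;
    }
}
```

Because the sketch only creates the directory when it is absent, pointing a restarted processor at the same `local.store.dir` value leaves the previously written state in place.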
...
```diff
// '+' denotes addition, '-' denotes deletion.
public interface TaskNameGrouper {
+  @Deprecated
   Set<ContainerModel> group(Set<TaskModel> tasks);

+  @Deprecated
   default Set<ContainerModel> group(Set<TaskModel> tasks, List<String> containersIds) {
     return group(tasks);
   }

+  Set<ContainerModel> group(Set<TaskModel> taskModels, Map<String, LocationId> taskLocality, Map<String, LocationId> processorLocality);
}

+ @Deprecated
public interface BalancingTaskNameGrouper extends TaskNameGrouper {
+  @Deprecated
   Set<ContainerModel> balance(Set<TaskModel> tasks, LocalityManager localityManager);
}

public class ContainerModel {
-  @Deprecated
-  private final int containerId;
   private final String processorId;
   private final Map<TaskName, TaskModel> tasks;
+  // New field added denoting the physical locationId.
+  private final String locationId;
}

+public interface LocationIdProvider {
+  // In case of containerized environments, LocationId is a combination of multiple fields
+  // (sliceId, containerId, hostname) instead of a simple physical hostname.
+  // Using a class to represent that, rather than a primitive string.
+  // This will be provided by the execution environment.
+  LocationId getLocationId();
 }

+ public interface LocalityManager {
   // returns the processorId to LocationId mapping.
+  public Map<String, LocationId> readProcessorLocality();

   // returns the taskName to LocationId mapping.
+  public Map<String, LocationId> readTaskLocality();

   // writes the provided processorId to host mapping to underlying storage.
+  public boolean writeProcessorLocality(Map<String, LocationId> processorLocality);
 }
```
For YARN, the preferred host mapping in the coordinator stream will be used as the locality of processors and tasks that are unchanged between successive generations. If tasks or processors are added in a run, in YARN they will be assigned to any new host.
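The locality-preserving behaviour described above can be sketched as follows. This is an illustrative sketch under stated assumptions, not the proposed implementation: task names, processor IDs and location IDs are plain strings standing in for `TaskModel`, `ContainerModel` and `LocationId`, the class name `LocalityAwareGroupingSketch` is hypothetical, and the round-robin fallback for tasks without a usable previous location is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch (not Samza code) of grouping tasks by previous locality.
final class LocalityAwareGroupingSketch {

    // Returns processorId -> task names assigned to that processor.
    static Map<String, List<String>> group(Set<String> tasks,
                                           Map<String, String> taskLocality,
                                           Map<String, String> processorLocality) {
        if (processorLocality.isEmpty()) {
            throw new IllegalArgumentException("At least one processor is required");
        }
        // Sorted containers keep the result deterministic.
        Map<String, List<String>> assignment = new TreeMap<>();
        Map<String, String> processorByLocation = new HashMap<>();
        for (String processorId : new TreeSet<>(processorLocality.keySet())) {
            assignment.put(processorId, new ArrayList<>());
            // If several processors share a location, the first one (by ID) wins.
            processorByLocation.putIfAbsent(processorLocality.get(processorId), processorId);
        }
        List<String> processors = new ArrayList<>(assignment.keySet());
        int next = 0;
        for (String task : new TreeSet<>(tasks)) {
            String previousLocation = taskLocality.get(task);
            String processorId =
                previousLocation == null ? null : processorByLocation.get(previousLocation);
            if (processorId == null) {
                // Assumption: new tasks, or tasks whose previous location has no
                // live processor, are spread round-robin over all processors.
                processorId = processors.get(next++ % processors.size());
            }
            assignment.get(processorId).add(task);
        }
        return assignment;
    }
}
```

For example, with processors `p1` on `hostB` and `p2` on `hostA`, and tasks `t1` (last on `hostA`), `t2` (last on `hostB`) and a new task `t3`, the sketch keeps `t1` on `p2` and `t2` on `p1`, and places `t3` on `p1` through the fallback.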
Here are a few reasons supporting the modification of the TaskNameGrouper interface and the removal of LocalityManager from its methods:
...