...

Code Block
languagejava
- zkBaseRootPath/$appName-$appId-$JobName-$JobId-$stageId/
    - processors/
        - processor.000001/
        - processor.000002/
        ...
        - processor.00000N/
    - jobModels/
        - {jobModelVersion}
            JobModelObject
    - localityData
        - task01/(dataNode)locationId
           - locationId1
        - task02/(dataNode)locationId
           - locationId2
        ... 
        - task0N/(dataNode)locationId
           - locationIdN
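
To make the layout above concrete, the sketch below only constructs the corresponding zookeeper path strings for a given job; the helper class name and the exact string formatting are illustrative assumptions and not part of the proposal.

Code Block
languagejava
// Illustrative helper that builds the zookeeper paths shown in the layout above.
// The class name and formatting details are assumptions, not part of the proposal.
public class ZkJobPaths {
  private final String jobRoot; // zkBaseRootPath/$appName-$appId-$JobName-$JobId-$stageId

  public ZkJobPaths(String zkBaseRootPath, String appName, String appId,
                    String jobName, String jobId, String stageId) {
    this.jobRoot = String.format("%s/%s-%s-%s-%s-%s",
        zkBaseRootPath, appName, appId, jobName, jobId, stageId);
  }

  // Parent node under which each processor registers a sequential child
  // (processor.000001, processor.000002, ...).
  public String processorsPath() {
    return jobRoot + "/processors";
  }

  // Node holding the serialized JobModel for a given jobModelVersion.
  public String jobModelPath(String jobModelVersion) {
    return jobRoot + "/jobModels/" + jobModelVersion;
  }

  // Per-task locality node storing the locationId of the task's last known host.
  public String taskLocalityPath(String taskName) {
    return jobRoot + "/localityData/" + taskName;
  }
}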

Local store sandboxing:

In the standalone landscape, the file system location used to persist local state should be provided by the user through stream processor configuration (by defining the `local.store.dir` configuration). The `local.store.dir` configuration is expected to be preserved across processor restarts so that preexisting local state can be reused. The stream processor Java process is expected to be configured by the user with sufficient read/write permissions to access the local state directories created by any processor in the group. The local store file hierarchy/organization followed in the samza-yarn deployment model, for both the high- and low-level APIs, will be followed in standalone as well.
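
As an illustration, a processor could resolve a per-store, per-task directory under the configured base path roughly as follows; the class, the config access, and the helper name are hypothetical sketches and not actual Samza APIs.

Code Block
languagejava
// Hedged sketch: resolving a store directory under the user-provided local.store.dir.
// Config access and helper names are illustrative, not actual Samza APIs.
import java.io.File;
import java.util.Map;

public class LocalStoreDirectoryResolver {
  private final String localStoreBaseDir;

  public LocalStoreDirectoryResolver(Map<String, String> config) {
    // "local.store.dir" is the user-provided base directory that must survive restarts.
    this.localStoreBaseDir = config.get("local.store.dir");
  }

  // Returns the directory for a given store and task, mirroring the per-store/per-task
  // layout used in the samza-yarn deployment model.
  public File resolveStoreDirectory(String storeName, String taskName) {
    File dir = new File(new File(localStoreBaseDir, storeName), taskName);
    dir.mkdirs(); // the processor JVM must have read/write permissions here
    return dir;
  }
}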

...

Remove coordinator stream bindings from JobModel: 

JobModel is a data access object used to represent a samza job in both the yarn and standalone deployment models. In the existing implementation, JobModel requires a LocalityManager (which is tied to the coordinator stream) to read and populate processor locality assignments. However, since zookeeper is used as the JobModel persistence layer and the coordinator stream doesn't exist in the standalone landscape, it's essential to remove this LocalityManager binding from JobModel and make JobModel immutable. Any existing implementations (ClusterBasedJobCoordinator, ContainerProcessManager) that depend upon this binding for functional correctness in samza-yarn should directly read container locality from the coordinator stream instead of getting it indirectly via JobModel.
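
A minimal sketch of what an immutable JobModel could look like once the LocalityManager binding is removed is shown below; the field and accessor names are illustrative only.

Code Block
languagejava
// Sketch of an immutable JobModel with no LocalityManager/coordinator-stream dependency.
// Field and method names are illustrative, not the definitive implementation.
import java.util.Collections;
import java.util.Map;

public class JobModel {
  private final Map<String, String> config;              // job configuration
  private final Map<String, ContainerModel> containers;  // processorId -> ContainerModel

  public JobModel(Map<String, String> config, Map<String, ContainerModel> containers) {
    this.config = Collections.unmodifiableMap(config);
    this.containers = Collections.unmodifiableMap(containers);
  }

  public Map<String, String> getConfig() {
    return config;
  }

  public Map<String, ContainerModel> getContainers() {
    return containers;
  }
}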

Cleaning up ContainerModel:

ContainerModel is a data access object used in samza for holding the task to system stream partition assignments generated by TaskNameGrouper implementations. ContainerModel currently has two fields (processorId and containerId) used to uniquely identify a processor in a processors group. The standalone deployment model uses the processorId field and the yarn deployment model uses the containerId field to store the unique processor identifier. To achieve uniformity between the two deployment models, the proposal is to remove the duplicate containerId field. This will not require any operational migration.

State store restoration:

Upon processor restart, any nonexistent local stores will be restored using the same restoration sequence followed in the yarn deployment model.

Container to physical host assignment:

When assigning tasks to a stream processor in a run, the stream processor to which the task was assigned in the previous run will be preferred. If that stream processor is unavailable in the current run, stream processors running on the same physical host as in the previous run will be given higher priority and favored. If neither of these conditions can be met, the task will be assigned to any available stream processor in the processor group.
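
A minimal sketch of this preference ordering is shown below; the class, helper names, and locality lookups are hypothetical and only illustrate the previous-processor / previous-host / any-processor fallback.

Code Block
languagejava
// Sketch of the assignment preference order: previous processor, then a processor on the
// previous host, then any available processor. Names here are illustrative only.
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class TaskToProcessorMatcher {

  // availableProcessors: processors alive in the current run (assumed non-empty).
  // previousProcessorId/previousLocationId: assignment recorded from the previous run.
  public static String pickProcessor(List<String> availableProcessors,
                                     String previousProcessorId,
                                     String previousLocationId,
                                     Map<String, String> processorToLocationId) {
    // 1. Prefer the exact processor the task ran on in the previous run.
    if (availableProcessors.contains(previousProcessorId)) {
      return previousProcessorId;
    }
    // 2. Otherwise prefer a processor running on the same physical host (locationId).
    Optional<String> sameHost = availableProcessors.stream()
        .filter(p -> previousLocationId != null
            && previousLocationId.equals(processorToLocationId.get(p)))
        .findFirst();
    if (sameHost.isPresent()) {
      return sameHost.get();
    }
    // 3. Fall back to any available processor in the group.
    return availableProcessors.get(0);
  }
}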

Semantics of host affinity with ‘run.id’ 

When a samza user stops all the stream processors of a samza application and starts them again (in any order), it is considered a new run of the samza application. The strategy to determine continuation of state within a samza application varies across deployment environments and input sources. The semantic meaning of run.id is the continuation of state (viz. state-store, checkpoint, config, task-assignments). Samza supports deployment and management of multi-stage data pipeline jobs consuming from bounded (batch) as well as unbounded (streaming) data sources. Host affinity will be supported by default in all streaming scenarios. When a samza job consumes only from bounded data sources (batching scenarios in beam-runner), host affinity will not be supported across multiple runs; it will only be supported within the same run.id of an application.

Public Interfaces

Code Block
languagejava
// '+' denotes addition, '-' denotes deletion.
public interface TaskNameGrouper {
  + @Deprecated
  Set<ContainerModel> group(Set<TaskModel> tasks);

  + @Deprecated
  default Set<ContainerModel> group(Set<TaskModel> tasks, List<String> containersIds) {
    return group(tasks);
  }
  +   Set<ContainerModel> group(Set<TaskModel> currentGenerationTaskModels, List<String> currentGenerationProcessorIds, Set<ContainerModel> previousGenerationContainerModels);
}

+ @Deprecated
public interface BalancingTaskNameGrouper extends TaskNameGrouper {
  + @Deprecated 
  Set<ContainerModel> balance(Set<TaskModel> tasks, LocalityManager localityManager);
}

public class ContainerModel {
  - @Deprecated
  - private final int containerId;
  private final String processorId;
  private final Map<TaskName, TaskModel> tasks;
  + // New field added denoting the physical locationId.
  + private final String locationId;
}

+public interface LocationIdProvider {
   LocationId getLocationId();
}

+ public class LocalityInfo { 
+  // In containerized environments, LocationId is a combination of multiple fields (sliceId, containerId, hostname) instead of a simple physical hostname.
+  // Using a class to represent that, rather than a primitive string. This will be provided by the execution environment.
+  private Map<String, LocationId> taskLocality;
+  public Map<String, LocationId> getLocality();  
}

+ public interface LocalityManager {
   // returns processorId to host mapping.
  + public LocalityInfo readProcessorLocality();
  // writes the provided processorId to host mapping to the underlying storage.
  + public boolean writeProcessorLocality(LocalityInfo localityInfo);
}
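
For illustration, the sketch below shows how a component could consume the proposed LocalityManager and LocationIdProvider interfaces when preparing locality-aware assignments; the class and method names in this sketch are hypothetical and not part of the proposed interfaces.

Code Block
languagejava
// Hypothetical usage sketch of the proposed interfaces above; class and variable names
// are illustrative and not part of the proposal.
import java.util.Map;

public class LocalityBookkeepingExample {

  // Reads the previous processor locality and the current processor's location so that
  // a TaskNameGrouper implementation could favor assignments preserving host affinity.
  public static void recordLocality(LocalityManager localityManager,
                                    LocationIdProvider locationIdProvider,
                                    String processorId) {
    // Previous generation's processorId -> LocationId assignments.
    LocalityInfo previous = localityManager.readProcessorLocality();
    Map<String, LocationId> previousLocality = previous.getLocality();

    // LocationId of the host/container this processor is currently running on,
    // provided by the execution environment.
    LocationId currentLocation = locationIdProvider.getLocationId();

    System.out.println("Processor " + processorId + " at " + currentLocation
        + ", previous locality entries: " + previousLocality.size());
  }
}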

...