...

ZK Data Model to support host affinity:

After the rebalancing phase and before the start of processing, each stream processor will register the details of the physical host on which it runs in the localityData zookeeper node. The goal here is to separate the locality information from the JobModel itself (the JobModel will be used to hold the task assignments). There will be an abstraction (an interface), LocalityManager, used to read and write locality information for the different deployment models; a sketch appears after the node layout below. There will be two implementations of LocalityManager, viz. CoordinatorStreamBasedLocalityManager to read/write container locality information for yarn and ZkLocalityManager to read/write container locality information for standalone.


Code Block
- zkBaseRootPath/$appName-$appId-$JobName-$JobId-$stageId/
    - processors/
        - processor.000001/
        - processor.000002/
        ...
        - processor.00000N/
    - jobModels/
        - {jobModelVersion}/
            - JobModelObject
    - localityData/
        - {jobModelVersion}/
            - processor.000001/
                - physicalHostInfo
            ...
            - processor.00000N/
                - physicalHostInfo
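
The LocalityManager abstraction referenced above could look roughly as follows. This is a minimal sketch; the method names are illustrative assumptions, not the final API.

Code Block
languagejava
import java.util.Map;

// Sketch of the LocalityManager abstraction. One implementation is planned per
// deployment model: CoordinatorStreamBasedLocalityManager backed by the
// coordinator stream (yarn), and ZkLocalityManager backed by the localityData
// zookeeper node (standalone).
public interface LocalityManager {
    // Reads the processorId -> physical host assignments for the current job model version.
    Map<String, String> readContainerLocality();

    // Records the physical host on which the given processor is running.
    void writeContainerToHostMapping(String processorId, String hostName);
}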

Local store sandboxing:

In the standalone landscape, the file system location used to persist local state should be provided by users through stream processor configuration (by defining the local.store.dir configuration). The value of `local.store.dir` is expected to be preserved across processor restarts so that preexisting local state can be reused. Users are expected to run the stream processor java process with sufficient read/write permissions to access the local state directories created by any processor in the group. The local store file hierarchy/organization used in the samza-yarn deployment model, for both the high and low level APIs, will be followed in standalone.
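
As an illustration, here is a hedged sketch of how a processor might resolve a task store location from `local.store.dir`. The helper name is hypothetical, and the storeName/taskName layering shown is an assumption modeled on the samza-yarn store hierarchy.

Code Block
languagejava
import java.io.File;
import java.util.Map;

public class LocalStorePathResolver {
    // Hypothetical helper: resolves where a task's store lives on disk.
    public static File storePath(Map<String, String> config, String storeName, String taskName) {
        String baseDir = config.get("local.store.dir");
        if (baseDir == null || baseDir.isEmpty()) {
            throw new IllegalArgumentException("local.store.dir must be configured in standalone");
        }
        // The base directory must stay stable across restarts so that
        // preexisting local state can be reused.
        return new File(new File(baseDir, storeName), taskName);
    }
}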

...

Remove coordinator stream bindings from JobModel: 

JobModel is a data access object used to represent a samza job in both the yarn and standalone deployment models. In the existing implementation, JobModel requires a LocalityManager (which is tied to the coordinator stream) to read and populate processor locality assignments. However, since zookeeper is used as the JobModel persistence layer and the coordinator stream doesn’t exist in the standalone landscape, it’s essential to remove this LocalityManager binding from JobModel and make JobModel immutable. It will be ideal to store the task to preferred host assignment as a part of the job model for the following reasons:

  • Task to locality assignment information logically belongs to the JobModel itself, and persisting them together keeps things simple.

  • If the task to preferred host assignment and the JobModel are stored separately in zookeeper in standalone, we’ll run into consistency problems between these two data sinks when performing JobModel upgrades. We’ll also lose the capability to do atomic updates of the entire JobModel in zookeeper.

Any existing implementations (ClusterBasedJobCoordinator, ContainerProcessManager) that depend upon this binding for functional correctness in samza-yarn should read container locality directly from the coordinator stream instead of getting it indirectly via JobModel.
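
A minimal sketch of what an immutable, locality-free JobModel could look like; the field shapes and the ContainerModel stub are illustrative assumptions, not the exact Samza classes.

Code Block
languagejava
import java.util.Collections;
import java.util.Map;

// Immutable job representation: configuration plus task assignments, with no
// LocalityManager wired in. Components that need locality read it themselves
// (from the coordinator stream in yarn, from the localityData node in standalone).
public class JobModel {
    private final Map<String, String> config;
    private final Map<String, ContainerModel> containers; // processorId -> ContainerModel

    public JobModel(Map<String, String> config, Map<String, ContainerModel> containers) {
        this.config = Collections.unmodifiableMap(config);
        this.containers = Collections.unmodifiableMap(containers);
    }

    public Map<String, String> getConfig() { return config; }
    public Map<String, ContainerModel> getContainers() { return containers; }
}

// Minimal stub so the sketch is self-contained; the next section sketches the
// cleaned-up ContainerModel in more detail.
class ContainerModel {
    final String processorId;
    ContainerModel(String processorId) { this.processorId = processorId; }
}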

Cleaning up ContainerModel:

ContainerModel is a data access object used in samza to hold the task to system stream partition assignments generated by TaskNameGrouper implementations. ContainerModel currently has two fields (processorId and containerId) used to uniquely identify a processor within a processors group. The standalone deployment model uses the processorId field and the yarn deployment model uses the containerId field to store the unique processor id. To achieve uniformity between the two deployment models, the proposal is to remove the duplicate containerId field. This will not require any operational migration.
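
A hedged sketch of ContainerModel after the cleanup, with processorId as the sole identifier; the field shapes and the TaskModel stand-in are illustrative, not the exact Samza classes.

Code Block
languagejava
import java.util.Collections;
import java.util.Map;
import java.util.Set;

// After the cleanup: processorId is the single unique identifier in both
// deployment models; the duplicate containerId field used by yarn is removed.
public class ContainerModel {
    private final String processorId;
    private final Map<String, TaskModel> tasks; // taskName -> TaskModel

    public ContainerModel(String processorId, Map<String, TaskModel> tasks) {
        this.processorId = processorId;
        this.tasks = Collections.unmodifiableMap(tasks);
    }

    public String getProcessorId() { return processorId; }
    public Map<String, TaskModel> getTasks() { return tasks; }
}

// Stand-in for the real TaskModel (task name, input partitions, changelog partition).
class TaskModel {
    final String taskName;
    final Set<String> systemStreamPartitions;
    final int changelogPartitionId;

    TaskModel(String taskName, Set<String> systemStreamPartitions, int changelogPartitionId) {
        this.taskName = taskName;
        this.systemStreamPartitions = systemStreamPartitions;
        this.changelogPartitionId = changelogPartitionId;
    }
}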

State store restoration:

Upon processor restart, nonexistent local stores will be restored using the same restoration sequence followed in the yarn deployment model.
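
For illustration only, a minimal sketch of the kind of check that decides whether a store must be rebuilt on restart; the helper is hypothetical, and the actual restore sequence is the one yarn deployments already use.

Code Block
languagejava
import java.io.File;

public class StoreRestoreCheck {
    // Hypothetical check run before a processor starts consuming input: a
    // missing or empty store directory means the store must be restored
    // (e.g. by replaying its changelog, as in the yarn deployment model).
    public static boolean needsRestore(File storeDir) {
        String[] contents = storeDir.list(); // null if storeDir is not a directory
        return contents == null || contents.length == 0;
    }
}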

Container to physical host assignment:

When assigning a task to a stream processor in a run, the stream processor to which the task was assigned in the previous run will be preferred. If that stream processor is unavailable in the current run, the stream processors running on the task’s previous physical host will be given higher priority and favored. If neither condition can be satisfied, the task will be assigned to any available stream processor in the processor group.
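
A minimal sketch of this preference order; the matcher below is hypothetical and not the actual TaskNameGrouper implementation.

Code Block
languagejava
import java.util.Map;

public class PreferredHostMatcher {
    /**
     * Picks a processor for a task. previousProcessorId and previousHost
     * describe the task's assignment in the previous run (either may be null);
     * liveProcessorToHost maps the currently available processorIds to the
     * hosts they run on.
     */
    public static String assign(String previousProcessorId,
                                String previousHost,
                                Map<String, String> liveProcessorToHost) {
        // 1. Prefer the exact stream processor from the previous run.
        if (previousProcessorId != null && liveProcessorToHost.containsKey(previousProcessorId)) {
            return previousProcessorId;
        }
        // 2. Otherwise prefer any processor on the previous run's physical host.
        if (previousHost != null) {
            for (Map.Entry<String, String> entry : liveProcessorToHost.entrySet()) {
                if (previousHost.equals(entry.getValue())) {
                    return entry.getKey();
                }
            }
        }
        // 3. Fall back to any available processor in the group.
        return liveProcessorToHost.keySet().stream().findFirst().orElse(null);
    }
}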

Semantics of host affinity with ‘run.id’ 

When a samza user stops all the stream processors of a samza application and starts them again (in any order), it is considered a new run of the samza application. Samza supports deployment and management of multi-stage data pipeline jobs consuming from bounded (batch) as well as unbounded (streaming) data sources. Host affinity will be supported by default in all streaming scenarios, and within the same run in batch (bounded data source) scenarios. Host affinity will not be supported across multiple runs when a samza job consumes only from bounded data sources (batch scenarios in beam-runner).

...