...

If the leader of the stateful processor group generates an optimal, host-aware assignment of tasks to processors in the JobModel, each follower picks up its assignments from the JobModel and begins processing after the rebalance phase (as with non-stateful jobs). The goal is to guarantee that the assignment of tasks to processors is optimal and minimizes task movement between processors. Each processor persists the local state of its tasks in a directory (local.store.dir) supplied through configuration.
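The idea of a host-aware assignment that limits task movement can be illustrated with a small sketch. This is not the assignment algorithm of the proposal; it is a simple greedy heuristic under stated assumptions: the class name HostAwareAssignmentSketch, the method assign, and the input maps (task name to last known locationId, processor id to locationId) are all hypothetical and chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; not part of Samza.
final class HostAwareAssignmentSketch {

    /**
     * Greedy, host-aware assignment.
     *
     * @param tasks             names of all tasks to assign
     * @param lastKnownLocation task name -> locationId where the task last ran (may lack entries)
     * @param processorLocation processor id -> locationId where the processor runs
     * @return processor id -> tasks assigned to it
     */
    static Map<String, List<String>> assign(List<String> tasks,
                                            Map<String, String> lastKnownLocation,
                                            Map<String, String> processorLocation) {
        if (processorLocation.isEmpty()) {
            throw new IllegalArgumentException("at least one processor is required");
        }
        // Per-processor cap (ceiling of tasks / processors) keeps the load balanced.
        int capacity = (tasks.size() + processorLocation.size() - 1) / processorLocation.size();

        // TreeMap gives a deterministic iteration order over processor ids.
        Map<String, List<String>> assignment = new TreeMap<>();
        for (String processorId : processorLocation.keySet()) {
            assignment.put(processorId, new ArrayList<>());
        }

        // Pass 1: keep a task on the host where it last ran, if a processor there has room.
        List<String> unplaced = new ArrayList<>();
        for (String task : tasks) {
            String preferred = lastKnownLocation.get(task);
            String target = null;
            if (preferred != null) {
                for (Map.Entry<String, List<String>> e : assignment.entrySet()) {
                    boolean sameHost = preferred.equals(processorLocation.get(e.getKey()));
                    if (sameHost && e.getValue().size() < capacity) {
                        target = e.getKey();
                        break;
                    }
                }
            }
            if (target == null) {
                unplaced.add(task);
            } else {
                assignment.get(target).add(task);
            }
        }

        // Pass 2: place the remaining tasks on the least-loaded processor.
        for (String task : unplaced) {
            String least = null;
            for (Map.Entry<String, List<String>> e : assignment.entrySet()) {
                if (least == null || e.getValue().size() < assignment.get(least).size()) {
                    least = e.getKey();
                }
            }
            assignment.get(least).add(task);
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, String> last = new HashMap<>();
        last.put("t1", "hostA");
        last.put("t2", "hostB");
        last.put("t3", "hostA");
        Map<String, String> processors = new HashMap<>();
        processors.put("p1", "hostA");
        processors.put("p2", "hostB");
        // prints {p1=[t1, t3], p2=[t2, t4]}
        System.out.println(assign(Arrays.asList("t1", "t2", "t3", "t4"), last, processors));
    }
}
```

In this sketch a task moves only when no processor on its previous host has spare capacity, which is one simple way to trade balance against state locality. A real implementation may weigh these differently.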

Standalone host affinity




ZK Data Model to support host affinity:

After the rebalancing phase and before processing starts, each stream processor registers the details of the physical host on which it runs in the localityData ZooKeeper node. The goal is to keep the locality information separate from the JobModel itself (the JobModel holds the task assignments). A MetadataStore abstraction will be used to read and write locality information in the storage layer appropriate to each deployment model. There will be two implementations of MetadataStore: CoordinatorStreamBasedMetadataStore, which reads and writes container locality information for YARN, and ZkMetadataStore, which reads and writes container locality information in ZooKeeper for standalone. In standalone, the last known physical host on which each Samza task ran is stored in ZooKeeper and is then used to assign tasks to stream processors. A stream processor updates the locality of the tasks assigned to it before it begins processing (this mirrors the behaviour in YARN, where locality is updated in SamzaContainer as part of the startup sequence).

- zkBaseRootPath/$appName-$appId-$JobName-$JobId-$stageId/
    - processors/
        - processor.000001/
            locationId1(stored as value in processor zookeeper node)
        - processor.000002/
            locationId2(stored as value in processor zookeeper node)
        ...
        - processor.00000N/
            locationIdN(stored as value in processor zookeeper node)
    - jobModels/
        - {version}
            JobModelObject(stored as value in jobmodels version zookeeper node)
    - barriers/
        - {version}
            barrier_state(stored as value in barriers version zookeeper node)
    - localityData/
        - task01/
            locationId1(stored as value in task zookeeper node)
        - task02/
            locationId2(stored as value in task zookeeper node)
        ...
        - task0N/
            locationIdN(stored as value in task zookeeper node)
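
The localityData layout above can be sketched as a small key-value abstraction. This is a minimal illustration under assumptions, not the Samza MetadataStore interface or the ZkMetadataStore implementation: the names LocalityDataSketch, LocalityStore, InMemoryLocalityStore, taskLocalityPath, recordTaskLocality and readTaskLocality are hypothetical, and an in-memory map stands in for ZooKeeper so the example runs without a cluster.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration only; not part of Samza.
final class LocalityDataSketch {

    /** Assumed minimal store contract: a value stored under a node path. */
    interface LocalityStore {
        void put(String path, byte[] value);

        byte[] get(String path); // returns null when the node does not exist
    }

    /** In-memory stand-in for a ZooKeeper-backed store. */
    static final class InMemoryLocalityStore implements LocalityStore {
        private final Map<String, byte[]> nodes = new ConcurrentHashMap<>();

        @Override
        public void put(String path, byte[] value) {
            nodes.put(path, value);
        }

        @Override
        public byte[] get(String path) {
            return nodes.get(path);
        }
    }

    /** Builds the node path <jobRoot>/localityData/<taskName>. */
    static String taskLocalityPath(String jobRoot, String taskName) {
        return jobRoot + "/localityData/" + taskName;
    }

    /** Called by a processor before it starts processing: record where the task runs. */
    static void recordTaskLocality(LocalityStore store, String jobRoot,
                                   String taskName, String locationId) {
        store.put(taskLocalityPath(jobRoot, taskName),
                  locationId.getBytes(StandardCharsets.UTF_8));
    }

    /** Returns the last known locationId of the task, or null if none was recorded. */
    static String readTaskLocality(LocalityStore store, String jobRoot, String taskName) {
        byte[] value = store.get(taskLocalityPath(jobRoot, taskName));
        return value == null ? null : new String(value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        LocalityStore store = new InMemoryLocalityStore();
        String jobRoot = "/zkBaseRootPath/exampleJob"; // placeholder root path
        recordTaskLocality(store, jobRoot, "task01", "locationId1");
        System.out.println(readTaskLocality(store, jobRoot, "task01")); // prints locationId1
    }
}
```

Keeping the locality values under their own subtree, as in this sketch, means they can be read or updated without touching the jobModels nodes.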

...