...

In standalone, locality information of the stream processors will be stored separately from the JobModel. The JobModel will hold only the task assignments (processor to task assignment and task to system stream partition assignment). During its startup phase, each stream processor will store the physical host on which it runs in the appropriate zookeeper locality node (this mirrors the existing behavior in yarn). The MetadataStore abstraction will be used to read and write stream processor locality information in the storage layer appropriate to each deployment model. There will be two implementations of MetadataStore: CoordinatorStreamBasedMetadataStore, which reads and writes processor locality information in the coordinator stream (a kafka topic) for yarn, and ZkMetadataStore, which reads and writes processor locality information in zookeeper for standalone. Each processor will persist the local state of its tasks in a directory (local.store.dir) provided through configuration.

- zkBaseRootPath/$appName-$appId-$JobName-$JobId-$stageId/
    - processors/
        - processor.000001/
            locationId1(stored as value in processor zookeeper node)
        - processor.000002/
            locationId2(stored as value in processor zookeeper node)
        ...
        - processor.00000N/
            locationIdN(stored as value in processor zookeeper node)
    - jobModels/
        - {version}
            JobModelObject(stored as value in jobmodels version zookeeper node)
    - barriers/
        - {version}
            barrier_state(stored as value in barriers version zookeeper node)
    - localityData/
        - task01/
           locationId1(stored as value in task zookeeper node)
        - task02/
           locationId2(stored as value in task zookeeper node)
        ... 
        - task0N/
           locationIdN(stored as value in task zookeeper node)
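
The node layout above can be read as a simple path-building rule. The following is a minimal sketch of that rule, not the actual Samza implementation: the class name LocalityPaths and its method names are hypothetical, and the only things taken from the layout are the segment names (processors, jobModels, barriers, localityData) and the dash-joined job root.

```java
// Hypothetical helper (not part of Samza): builds the zookeeper node paths
// shown in the layout above from the identifiers that make up the job root.
final class LocalityPaths {
  private final String jobRoot;

  LocalityPaths(String zkBaseRootPath, String appName, String appId,
                String jobName, String jobId, String stageId) {
    // zkBaseRootPath/$appName-$appId-$JobName-$JobId-$stageId
    this.jobRoot = zkBaseRootPath + "/"
        + String.join("-", appName, appId, jobName, jobId, stageId);
  }

  // Node whose value holds the locationId reported by a processor.
  String processorNode(String processorNodeName) {
    return jobRoot + "/processors/" + processorNodeName;
  }

  // Node whose value holds the last known locationId of a task.
  String taskLocalityNode(String taskName) {
    return jobRoot + "/localityData/" + taskName;
  }

  // Node whose value holds the serialized JobModel of a given version.
  String jobModelNode(String version) {
    return jobRoot + "/jobModels/" + version;
  }

  // Node whose value holds the barrier state of a given version.
  String barrierNode(String version) {
    return jobRoot + "/barriers/" + version;
  }
}
```

Keeping processor locality (processors/) and task locality (localityData/) under separate subtrees reflects the proposal's point that locality is stored apart from the JobModel.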

...

Remove coordinator stream bindings from JobModel: 

JobModel is a data access object used to represent a samza job in both the yarn and standalone deployment models. In the existing implementation, JobModel requires LocalityManager (which is tied to the coordinator stream) to read and populate processor locality assignments. However, since zookeeper is used as the JobModel persistence layer and the coordinator stream does not exist in standalone, it is essential to remove this LocalityManager binding from JobModel and make JobModel immutable. Any existing implementations (ClusterBasedJobCoordinator, ContainerProcessManager) that depend on this binding for functional correctness in samza-yarn should read container locality directly from the coordinator stream instead of getting it indirectly via JobModel.

Cleaning up ContainerModel:

ContainerModel is a data access object used in samza to hold the task to system stream partition assignments generated by TaskNameGrouper implementations. ContainerModel currently has two fields (processorId and containerId) used to uniquely identify a processor in a processor group. The standalone deployment model uses the processorId field and the yarn deployment model uses the containerId field to store the unique processor identifier. To achieve uniformity between the two deployment models, the proposal is to remove the duplicate containerId field. This will not require any operational migration.

State store restoration:

Upon processor restart, local stores that do not exist on the host will be restored using the same restoration sequence followed in the yarn deployment model.

Container to physical host assignment:

When assigning tasks to stream processors in a run, the stream processor to which a task was assigned in the previous run will be preferred. If that stream processor is unavailable in the current run, stream processors running on the physical host used in the previous run will be given the next highest priority. If neither condition is met, the task will be assigned to any stream processor available in the processor group.
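
The three-level preference described above can be sketched for a single task as follows. This is an illustrative sketch under stated assumptions, not the grouper implementation proposed in this document: the class PreferredProcessorSelector and its parameters are hypothetical, ties are broken by sorted processorId purely to keep the result deterministic, and load balancing across processors is ignored.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical sketch: picks a processor for one task using the
// three-level preference (same processor, same location, any processor).
final class PreferredProcessorSelector {

  /**
   * @param previousProcessorId processor that owned the task in the previous run, or null
   * @param previousLocationId  locationId the task ran on in the previous run, or null
   * @param liveProcessorLocality processorId to locationId mapping of the live processors
   * @return the chosen processorId, or empty if no processor is live
   */
  static Optional<String> select(String previousProcessorId,
                                 String previousLocationId,
                                 Map<String, String> liveProcessorLocality) {
    // Sort by processorId so the choice is deterministic (an assumption of this sketch).
    TreeMap<String, String> live = new TreeMap<>(liveProcessorLocality);

    // 1. Prefer the processor the task was assigned to in the previous run.
    if (previousProcessorId != null && live.containsKey(previousProcessorId)) {
      return Optional.of(previousProcessorId);
    }
    // 2. Otherwise prefer a processor running on the previous physical location.
    if (previousLocationId != null) {
      for (Map.Entry<String, String> entry : live.entrySet()) {
        if (previousLocationId.equals(entry.getValue())) {
          return Optional.of(entry.getKey());
        }
      }
    }
    // 3. Otherwise fall back to any available processor in the group.
    return live.keySet().stream().findFirst();
  }
}
```

A real grouper would also have to balance the number of tasks per processor; this sketch only shows the order in which candidates are considered.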

Semantics of host affinity with ‘run.id’ 

The strategy for determining whether state from the previous application run continues in the current run varies across deployment environments and input sources. The semantic meaning of run.id is the continuation of states (viz. state-store, checkpoint, config, task-assignments) across application restarts. Host affinity will be supported only within the same run.id of an application.
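
One way to read the run.id rule is as a gate on the stored task locality: locality from a previous run is honored only if that run carried the same run.id. The sketch below assumes the previous run.id is available alongside the stored locality. The class RunIdLocalityFilter and its parameters are hypothetical and are not defined by this proposal.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical sketch: stored task locality is only usable for host affinity
// when the previous and current runs share the same run.id.
final class RunIdLocalityFilter {

  /**
   * @param previousRunId run.id recorded with the stored locality, or null if none
   * @param currentRunId  run.id of the current application run
   * @param storedTaskLocality taskName to locationId mapping read from storage
   * @return the stored mapping if the run.id matches, otherwise an empty mapping
   */
  static Map<String, String> usableTaskLocality(String previousRunId,
                                                String currentRunId,
                                                Map<String, String> storedTaskLocality) {
    if (previousRunId != null && previousRunId.equals(currentRunId)) {
      return storedTaskLocality;
    }
    // Different (or unknown) run.id: state does not continue, so ignore old locality.
    return Collections.emptyMap();
  }
}
```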

...

// '+' denotes addition, '-' denotes deletion.
public interface TaskNameGrouper {
  + @Deprecated
  Set<ContainerModel> group(Set<TaskModel> tasks);

  + @Deprecated
  default Set<ContainerModel> group(Set<TaskModel> tasks, List<String> containersIds) {
    return group(tasks);
  }
  /**
   * @param taskModels the taskModels generated by the SSPGrouper.
   * @param taskLocality taskName to locationId mapping of the previous generation.
   * @param processorLocality processorId to locationId mapping.
   * @return the optimal containerModels generated.
   */  
  + Set<ContainerModel> group(Set<TaskModel> taskModels, Map<String, String> taskLocality, Map<String, String> processorLocality);
}

+ @Deprecated
public interface BalancingTaskNameGrouper extends TaskNameGrouper {
  + @Deprecated 
  Set<ContainerModel> balance(Set<TaskModel> tasks, LocalityManager localityManager);
}

public class ContainerModel {
  - @Deprecated
  - private final int containerId;
  private final String processorId;
  private final Map<TaskName, TaskModel> tasks;
  + // New field added denoting the physical locationId.
  + private final String locationId;
}

+public interface LocationIdProvider {
   +  // In containerized environments, LocationId is a combination of multiple fields (sliceId, containerId, hostname) instead of a simple physical hostname.
   +  // This will be provided by the execution environment of the processor.
   + String getLocationId();
}


+ public interface MetadataStore {
   // returns the processorId to LocationId mapping.
  + public Map<String, String> readProcessorLocality();

  // returns the taskName to LocationId mapping.
  + public Map<String, String> readTaskLocality();
 
  // writes the provided processorId to host mapping to underlying storage.
  + public boolean writeProcessorLocality(Map<String, String> processorLocality);
}

The LocationId reported by the live processors of the group and the last reported task locality of the previous generation will be used to calculate the task to container assignment of the current generation in standalone. The preferred host mapping will be used for task and processor locality in yarn. Any new task or processor whose grouping is unknown (unavailable in the preferred host mapping or task locality in the underlying storage layer) will be treated as any_host during assignment.
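
To make the data flow concrete, the sketch below shows an in-memory stand-in for the MetadataStore interface and the any_host fallback for tasks with no recorded locality. It is an illustration under assumptions, not one of the two implementations named in this proposal: InMemoryMetadataStore and PreferredLocations are hypothetical names, the interface is repeated only so the block compiles on its own, and the literal value of the ANY_HOST constant is assumed.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Copy of the proposed interface, repeated so this sketch is self-contained.
interface MetadataStore {
  Map<String, String> readProcessorLocality();
  Map<String, String> readTaskLocality();
  boolean writeProcessorLocality(Map<String, String> processorLocality);
}

// Hypothetical in-memory store, for illustration only.
final class InMemoryMetadataStore implements MetadataStore {
  private final Map<String, String> processorLocality = new HashMap<>();
  private final Map<String, String> taskLocality;

  InMemoryMetadataStore(Map<String, String> previousTaskLocality) {
    this.taskLocality = new HashMap<>(previousTaskLocality);
  }

  @Override
  public Map<String, String> readProcessorLocality() {
    return Collections.unmodifiableMap(new HashMap<>(processorLocality));
  }

  @Override
  public Map<String, String> readTaskLocality() {
    return Collections.unmodifiableMap(new HashMap<>(taskLocality));
  }

  @Override
  public boolean writeProcessorLocality(Map<String, String> locality) {
    processorLocality.putAll(locality);
    return true;
  }
}

// Hypothetical helper: resolves each task's preferred location, falling back
// to any_host when the task has no locality in the underlying storage.
final class PreferredLocations {
  static final String ANY_HOST = "any_host"; // assumed literal value

  static Map<String, String> resolve(List<String> taskNames,
                                     Map<String, String> taskLocality) {
    Map<String, String> preferred = new LinkedHashMap<>();
    for (String taskName : taskNames) {
      preferred.put(taskName, taskLocality.getOrDefault(taskName, ANY_HOST));
    }
    return preferred;
  }
}
```

In this sketch a grouper would call readTaskLocality() and readProcessorLocality() once per generation and pass both maps to the new group(taskModels, taskLocality, processorLocality) method.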

...