...


Remove coordinator stream bindings from JobModel: 

JobModel is a data access object used to represent a Samza job in both the YARN and standalone deployment models. In the existing implementation, JobModel requires LocalityManager (which is tied to the coordinator stream) to read and populate processor locality assignments. However, in standalone, ZooKeeper is used as the JobModel persistence layer and the coordinator stream doesn’t exist, so it’s essential to remove this LocalityManager binding from JobModel and make JobModel immutable. Storing the task-to-preferred-host assignment as part of the JobModel is preferable for the following reasons:

  • Task-to-locality assignment information logically belongs to the JobModel itself, and persisting the two together keeps things simpler.

  • If the task-to-preferred-host assignment and the JobModel are stored separately in ZooKeeper in standalone, we’ll run into consistency problems between these two data sinks when performing JobModel upgrades. We’ll also lose the ability to update the entire JobModel atomically in ZooKeeper.
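To illustrate why embedding locality in the JobModel makes it possible to update everything atomically, here is a minimal sketch built on simplified, hypothetical record types (not Samza's actual classes): each ContainerModel carries its hostName, the JobModel is an unmodifiable value, and an upgrade produces a new instance instead of mutating shared state, so tasks and locality could be serialized to ZooKeeper as one value in a single write. The methods `hostOf` and `withContainer` are illustrative names only. Records require Java 16 or later.

```java
import java.util.HashMap;
import java.util.Map;

public class ImmutableJobModelSketch {
  // Hypothetical, simplified stand-ins for Samza's TaskModel/ContainerModel/JobModel.
  record TaskModel(String taskName) {}

  // hostName is the physical host the processor ran on; it travels with the model.
  record ContainerModel(String processorId, String hostName, Map<String, TaskModel> tasks) {
    ContainerModel {
      tasks = Map.copyOf(tasks); // defensive, unmodifiable copy
    }
  }

  record JobModel(Map<String, ContainerModel> containers) {
    JobModel {
      containers = Map.copyOf(containers); // JobModel cannot be mutated after construction
    }

    // Locality is read from the model itself; no LocalityManager is involved.
    String hostOf(String processorId) {
      ContainerModel container = containers.get(processorId);
      return container == null ? null : container.hostName();
    }

    // An "upgrade" returns a new JobModel and leaves the current one untouched,
    // so the whole value (tasks + locality) can be persisted together.
    JobModel withContainer(ContainerModel container) {
      Map<String, ContainerModel> next = new HashMap<>(containers);
      next.put(container.processorId(), container);
      return new JobModel(next);
    }
  }

  public static void main(String[] args) {
    JobModel v1 = new JobModel(Map.of(
        "p0", new ContainerModel("p0", "host-a", Map.of("t0", new TaskModel("t0")))));
    JobModel v2 = v1.withContainer(
        new ContainerModel("p1", "host-b", Map.of("t1", new TaskModel("t1"))));
    System.out.println(v1.containers().size() + " " + v2.containers().size()); // 1 2
    System.out.println(v2.hostOf("p1")); // host-b
  }
}
```

Because `v1` is never modified, readers holding the old model keep a consistent view while the new one is written.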

Existing components in samza-yarn (ClusterBasedJobCoordinator, ContainerProcessManager) that depend on this binding for functional correctness should read container locality directly from the coordinator stream instead of obtaining it indirectly via JobModel.

Cleaning up ContainerModel:

ContainerModel is a data access object used in Samza to hold the task-to-system-stream-partition assignments generated by TaskNameGrouper implementations. ContainerModel currently has two fields (processorId and containerId) used to uniquely identify a processor in a processor group: the standalone deployment model uses the processorId field, while the YARN deployment model uses the containerId field to store the unique processor identifier. To achieve uniformity between the two deployment models, the proposal is to remove the redundant containerId field. This change will not require any operational migration.

State store restoration:

Upon processor restart, local stores that do not exist on disk will be restored using the same restoration sequence followed in the YARN deployment model.

Container to physical host assignment:

When assigning tasks to stream processors in a run, the processor to which a task was assigned in the previous run will be preferred. If that processor is unavailable in the current run, processors running on the task’s physical host from the previous run will be favored. If neither condition is met, the task will be assigned to any stream processor available in the processor group.
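The three-level preference above can be sketched as follows. This is a minimal illustration, not Samza's grouper: the method name `assign` and the map-based inputs are hypothetical, tasks and processors are plain strings, and the tie-breaking rules (lowest processor id on the previous host, round-robin over sorted processor ids for the final fallback) are assumptions. It also makes no attempt to balance load across processors, which a real TaskNameGrouper would need to do.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LocalityAwareAssignmentSketch {

  /**
   * Assigns each task to a current processor, preferring (1) the task's previous
   * processor, then (2) a processor on the task's previous physical host, then
   * (3) any available processor, chosen round-robin. Load balancing is not attempted.
   */
  static Map<String, String> assign(
      List<String> tasks,
      Map<String, String> previousTaskToProcessor,  // taskName -> processorId, previous run
      Map<String, String> previousProcessorToHost,  // processorId -> host, previous run
      Map<String, String> currentProcessorToHost) { // processorId -> host, current run
    if (currentProcessorToHost.isEmpty()) {
      throw new IllegalArgumentException("no processors available");
    }
    // A sorted processor list gives a deterministic fallback order.
    List<String> allProcessors = new ArrayList<>(currentProcessorToHost.keySet());
    Collections.sort(allProcessors);

    // host -> processors currently running on that host (in sorted id order).
    Map<String, List<String>> processorsByHost = new HashMap<>();
    for (String processorId : allProcessors) {
      processorsByHost
          .computeIfAbsent(currentProcessorToHost.get(processorId), host -> new ArrayList<>())
          .add(processorId);
    }

    Map<String, String> assignment = new LinkedHashMap<>();
    int next = 0; // round-robin cursor for step (3)
    for (String task : tasks) {
      String previousProcessor = previousTaskToProcessor.get(task);
      String previousHost =
          previousProcessor == null ? null : previousProcessorToHost.get(previousProcessor);

      if (previousProcessor != null && currentProcessorToHost.containsKey(previousProcessor)) {
        assignment.put(task, previousProcessor);                                // (1) same processor
      } else if (previousHost != null && processorsByHost.containsKey(previousHost)) {
        assignment.put(task, processorsByHost.get(previousHost).get(0));        // (2) same host
      } else {
        assignment.put(task, allProcessors.get(next++ % allProcessors.size())); // (3) any processor
      }
    }
    return assignment;
  }

  public static void main(String[] args) {
    Map<String, String> result = assign(
        List.of("t0", "t1", "t2"),
        Map.of("t0", "p0", "t1", "p1", "t2", "p2"),
        Map.of("p0", "host-a", "p1", "host-b", "p2", "host-c"),
        Map.of("p0", "host-a", "p3", "host-b", "p4", "host-d"));
    System.out.println(result); // {t0=p0, t1=p3, t2=p0}
  }
}
```

In the example, t0 keeps its processor, t1 moves to p3 because p3 runs on t1's previous host, and t2 falls back to any processor because host-c is no longer in the group.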

...

```java
// '+' denotes addition, '-' denotes deletion.
public interface TaskNameGrouper {
  + @Deprecated
  Set<ContainerModel> group(Set<TaskModel> tasks);

  + @Deprecated
  default Set<ContainerModel> group(Set<TaskModel> tasks, List<String> containerIds) {
    return group(tasks);
  }

  + Set<ContainerModel> group(Set<TaskModel> currentGenerationTaskModels,
  +                           List<String> currentGenerationProcessorIds,
  +                           Set<ContainerModel> previousGenerationContainerModels);
}

public interface BalancingTaskNameGrouper extends TaskNameGrouper {
  + @Deprecated
  Set<ContainerModel> balance(Set<TaskModel> tasks, LocalityManager localityManager);
}

public class ContainerModel {
  - @Deprecated
  - private final int containerId;
  private final String processorId;
  private final Map<TaskName, TaskModel> tasks;
  + // New field denoting the physical host name.
  + private final String hostName;
```
...
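With the proposed three-argument `group` method, a grouper no longer needs a LocalityManager: because each previous-generation ContainerModel now carries its hostName, the previous run's locality can be derived from `previousGenerationContainerModels` alone. The sketch below assumes a simplified, hypothetical ContainerModel record that holds task names as strings instead of TaskModels; the helper names `previousTaskToProcessor` and `previousProcessorToHost` are illustrative and not part of the proposed interface.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class PreviousGenerationLocalitySketch {
  // Hypothetical simplified ContainerModel mirroring the proposed fields
  // (processorId, tasks, hostName); task names stand in for TaskModels.
  record ContainerModel(String processorId, String hostName, Set<String> taskNames) {}

  // taskName -> processorId that owned the task in the previous generation.
  static Map<String, String> previousTaskToProcessor(Set<ContainerModel> previous) {
    Map<String, String> result = new HashMap<>();
    for (ContainerModel container : previous) {
      for (String taskName : container.taskNames()) {
        result.put(taskName, container.processorId());
      }
    }
    return result;
  }

  // processorId -> physical host of that processor in the previous generation.
  static Map<String, String> previousProcessorToHost(Set<ContainerModel> previous) {
    Map<String, String> result = new HashMap<>();
    for (ContainerModel container : previous) {
      result.put(container.processorId(), container.hostName());
    }
    return result;
  }

  public static void main(String[] args) {
    Set<ContainerModel> previous = Set.of(
        new ContainerModel("p0", "host-a", Set.of("t0", "t1")),
        new ContainerModel("p1", "host-b", Set.of("t2")));
    System.out.println(previousTaskToProcessor(previous).get("t1")); // p0
    System.out.println(previousProcessorToHost(previous).get("p1")); // host-b
  }
}
```

These two maps are exactly the inputs a locality-aware assignment step needs, which is why the previous generation's ContainerModels can replace the LocalityManager parameter.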

```java
// '+' denotes addition, '-' denotes deletion.
public interface TaskNameGrouper {
  + @Deprecated
  Set<ContainerModel> group(Set<TaskModel> tasks);

  + @Deprecated
  default Set<ContainerModel> group(Set<TaskModel> tasks, List<String> containerIds) {
    return group(tasks);
  }

  + Set<ContainerModel> group(Set<TaskModel> taskModels, List<String> containerIds, LocalityManager localityManager);
}

public interface BalancingTaskNameGrouper extends TaskNameGrouper {
  + @Deprecated
  Set<ContainerModel> balance(Set<TaskModel> tasks, LocalityManager localityManager);
}
```

...