Status

Current state: UNDER DISCUSSION

...

JobModel is the data model in samza that logically represents a samza job. In the JobModel hierarchy, a samza job has one or more containers (ContainerModel), and each container has one or more tasks (TaskModel). Each data model contains relevant information, such as its logical id, partition information, etc. The existing host affinity implementation in yarn is accomplished through the following two phases:

  • JobModel generation phase: The ApplicationMaster (JobCoordinator) in the yarn deployment model generates the JobModel (the optimal processor to task assignment) and persists it in the coordinator stream (kafka topic) associated with the samza job.
  • ContainerAllocator phase: This happens after the JobModel generation phase. It schedules each processor to run on a physical host by coordinating with the underlying ClusterManager, and orchestrates the execution of the processor.

...

  • The number of processors is a static configuration in the yarn deployment model, and a job restart is required to change it. In standalone, however, adding a processor to or removing one from a processor group is quite common and is expected behavior.
  • In yarn, a processor is assigned a physical host by the ContainerAllocator after the JobModel generation phase. In standalone, the physical host on which a stream processor is going to run is known before the JobModel generation phase (the ContainerAllocator phase is not needed in standalone to associate the processor with a physical host).

Overall high level changes:

  • The common layer between the yarn and standalone models is the TaskNameGrouper abstraction (part of the JobModel generation phase), which will encapsulate the host-aware assignment of tasks to processors. In the existing implementation, only the processor locality is used to generate task to processor assignments. In the new model, both the last reported task locality and the processor locality of a stream application will be used when generating task to processor assignments, in both the yarn and standalone models.
  • Deprecate the different existing flavors of TaskNameGrouper implementations (each of which primarily groups TaskModels into containers) and provide a single unified contract that is agnostic to, and supported in, the different deployment models (standalone/yarn).
  • Introduce a MetadataStore abstraction to store and retrieve processor and task locality for the different deployment models in the appropriate storage layer. Kafka will be used as the locality storage layer for yarn, and zookeeper will be used as the storage layer for standalone.
  • A new abstraction, LocationIdProvider, is introduced as part of this change to generate the locationId for a physical execution environment. Here are a few reasons for introducing a new abstraction to generate locationId rather than using processorId as locationId:
    • LocationId denotes the physical execution environment required to run a stream processor, and uniquely identifies an environment among all available physical execution environments. ProcessorId uniquely identifies a stream processor in a processor group. ProcessorId and locationId are two different, logically orthogonal concepts which cannot be unified.
    • The standalone model supports running multiple stream processors from a single JVM on a physical host. If a stream processor (P1) running on a physical host (H) dies, it is optimal to redistribute the tasks of the dead processor (P1) to the other processors running on the same host (H). If processorId is used as locationId, this optimal assignment cannot be generated (since the task to locationId association is not maintained).
    • In the LinkedIn execution environment, locationId will be a composite key composed of sliceID and sliceInstanceId. In kubernetes, locationId will be the containerId (obtained through the POD API).
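
The redistribution argument above can be illustrated with a small sketch. This is not the Samza implementation: the class `LocalityAwareAssignmentSketch`, the method `assign`, the least-loaded tie-break and the round-robin fallback are all assumptions made for this example, and tasks and processors are represented as plain strings.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch, not Samza code: assigns each task to a live processor
// running on the task's last reported locationId, else falls back to round-robin.
final class LocalityAwareAssignmentSketch {

  // taskLocality: taskName -> locationId of the previous generation.
  // processorLocality: live processorId -> locationId.
  static Map<String, List<String>> assign(Map<String, String> taskLocality,
                                          Map<String, String> processorLocality) {
    // Sorted maps keep the result deterministic for the example.
    Map<String, List<String>> assignment = new TreeMap<>();
    Map<String, List<String>> processorsByLocation = new HashMap<>();
    for (Map.Entry<String, String> e : new TreeMap<>(processorLocality).entrySet()) {
      assignment.put(e.getKey(), new ArrayList<>());
      processorsByLocation.computeIfAbsent(e.getValue(), k -> new ArrayList<>()).add(e.getKey());
    }
    List<String> allProcessors = new ArrayList<>(assignment.keySet());
    if (allProcessors.isEmpty()) {
      throw new IllegalArgumentException("at least one live processor is required");
    }
    int next = 0; // round-robin cursor for tasks whose previous location has no live processor
    for (Map.Entry<String, String> e : new TreeMap<>(taskLocality).entrySet()) {
      List<String> candidates = processorsByLocation.get(e.getValue());
      String chosen;
      if (candidates != null) {
        // Prefer the least loaded processor on the task's previous location.
        chosen = candidates.get(0);
        for (String p : candidates) {
          if (assignment.get(p).size() < assignment.get(chosen).size()) {
            chosen = p;
          }
        }
      } else {
        chosen = allProcessors.get(next++ % allProcessors.size());
      }
      assignment.get(chosen).add(e.getKey());
    }
    return assignment;
  }

  public static void main(String[] args) {
    // P1 (previously on host H) has died; P2 still runs on H and P3 runs on host G.
    Map<String, String> taskLocality = new HashMap<>();
    taskLocality.put("task-0", "H");
    taskLocality.put("task-1", "H");
    taskLocality.put("task-2", "G");
    Map<String, String> processorLocality = new HashMap<>();
    processorLocality.put("P2", "H");
    processorLocality.put("P3", "G");
    System.out.println(assign(taskLocality, processorLocality));
  }
}
```

Because the task to locationId mapping is kept separately from processorId, the tasks that last ran on H stay on H (now on P2) even though the processor that owned them is gone. Keyed by processorId alone, that information would be lost with P1.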

 

...

 

[Image: Standalone host affinity]

Zookeeper is used in standalone for coordination between the stream processors of a stream application. Among all the available processors of a stream application, a single processor will be elected as the leader. In the standalone deployment model, the JobModel is stored in zookeeper. The leader will generate the JobModel and propagate it to all the other processors in the group. A distributed barrier in zookeeper will be used to block message processing until the latest JobModel has been picked up by all the processors in the group.
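
The barrier semantics described above can be sketched in a single process. This example does not use zookeeper: it substitutes `java.util.concurrent.CountDownLatch` for the distributed barrier, and the names `JobModelBarrierSketch` and `runGeneration` are hypothetical. It only shows the ordering guarantee, namely that no processor starts processing until every processor has acknowledged the new JobModel version.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical in-process stand-in for the zookeeper barrier; threads play the role of processors.
final class JobModelBarrierSketch {

  // Returns the event log. Every "ack" entry precedes every "process" entry.
  static List<String> runGeneration(int processorCount, String jobModelVersion) {
    List<String> events = new CopyOnWriteArrayList<>();
    CountDownLatch barrier = new CountDownLatch(processorCount);
    // One thread per processor, so all of them can wait on the barrier concurrently.
    ExecutorService pool = Executors.newFixedThreadPool(processorCount);
    for (int i = 0; i < processorCount; i++) {
      String processorId = "P" + i;
      pool.submit(() -> {
        // Step 1: the processor picks up the latest JobModel and acknowledges it.
        events.add("ack:" + processorId + ":" + jobModelVersion);
        barrier.countDown();
        try {
          // Step 2: block until every processor in the group has acknowledged.
          barrier.await();
          // Step 3: only now resume message processing.
          events.add("process:" + processorId);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
        throw new IllegalStateException("processors did not pass the barrier in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for processors", e);
    }
    return events;
  }

  public static void main(String[] args) {
    System.out.println(runGeneration(3, "v2"));
  }
}
```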


ZK Data Model to support host affinity:

...

Code Block
languagejava
// '+' denotes addition, '-' denotes deletion.
public interface TaskNameGrouper {
  + @Deprecated
  Set<ContainerModel> group(Set<TaskModel> tasks);

  + @Deprecated
  default Set<ContainerModel> group(Set<TaskModel> tasks, List<String> containersIds) {
    return group(tasks);
  }
  /**
   * @param taskModels the taskModels generated by the SSPGrouper.
   * @param taskLocality the taskName to locationId mapping of the previous generation.
   * @param processorLocality the processorId to locationId mapping.
   * @return the optimal containerModels generated.
   */
  + Set<ContainerModel> group(Set<TaskModel> taskModels, Map<String, String> taskLocality, Map<String, String> processorLocality);
}

+ @Deprecated
public interface BalancingTaskNameGrouper extends TaskNameGrouper {
  + @Deprecated 
  Set<ContainerModel> balance(Set<TaskModel> tasks, LocalityManager localityManager);
}

public class ContainerModel {
  - @Deprecated
  - private final int containerId;
  private final String processorId;
  private final Map<TaskName, TaskModel> tasks;
  + // New field added denoting the physical locationId.
  + private final String locationId;
}

+public interface LocationIdProvider {
   +  // In containerized environments, locationId is a combination of multiple fields (sliceId, containerId, hostname) rather than a simple physical hostname.
   +  // It will be provided by the execution environment of the processor.
   + String getLocationId();
}


+ public interface MetadataStore {
   // returns the processorId to LocationId mapping.
  + public Map<String, String> readProcessorLocality();

  // returns the taskName to LocationId mapping.
  + public Map<String, String> readTaskLocality();
 
  // writes the provided processorId to LocationId mapping to the underlying storage.
  + public boolean writeProcessorLocality(Map<String, String> processorLocality);
}
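
For unit tests or local experiments, the MetadataStore contract above could be backed by plain in-memory maps. The following is a sketch under stated assumptions, not part of the proposal: the interface is re-declared locally as `MetadataStoreSketch` so the snippet compiles on its own, `InMemoryMetadataStore` is a hypothetical name, task locality is seeded through the constructor because the contract above lists no write method for it, and writes are assumed to merge into the existing mapping rather than replace it.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Local copy of the proposed contract so that this sketch is self-contained.
interface MetadataStoreSketch {
  Map<String, String> readProcessorLocality();
  Map<String, String> readTaskLocality();
  boolean writeProcessorLocality(Map<String, String> processorLocality);
}

// Hypothetical in-memory store; the proposal itself targets kafka (yarn) and zookeeper (standalone).
final class InMemoryMetadataStore implements MetadataStoreSketch {
  private final Map<String, String> processorLocality = new HashMap<>();
  private final Map<String, String> taskLocality;

  InMemoryMetadataStore(Map<String, String> initialTaskLocality) {
    this.taskLocality = new HashMap<>(initialTaskLocality);
  }

  @Override
  public synchronized Map<String, String> readProcessorLocality() {
    // Return a read-only snapshot so callers cannot mutate the stored state.
    return Collections.unmodifiableMap(new HashMap<>(processorLocality));
  }

  @Override
  public synchronized Map<String, String> readTaskLocality() {
    return Collections.unmodifiableMap(new HashMap<>(taskLocality));
  }

  @Override
  public synchronized boolean writeProcessorLocality(Map<String, String> update) {
    if (update == null) {
      return false; // nothing to write
    }
    // Assumption: a write merges into the existing processorId to locationId mapping.
    processorLocality.putAll(update);
    return true;
  }

  public static void main(String[] args) {
    InMemoryMetadataStore store =
        new InMemoryMetadataStore(Collections.singletonMap("task-0", "H"));
    store.writeProcessorLocality(Collections.singletonMap("P2", "H"));
    System.out.println(store.readTaskLocality() + " " + store.readProcessorLocality());
  }
}
```

A grouper under test can then read both mappings from the store and pass them to the new three-argument `group` method.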

...