...

The existing implementation of host affinity in YARN is accomplished in the following two phases:
A. The ApplicationMaster (JobCoordinator) in the YARN deployment model generates the JobModel (the optimal processor-to-task assignment) and saves the JobModel in the coordinator stream (a Kafka topic) of the Samza job.
B. The ContainerAllocator phase (which happens after JobModel generation) requests physical hosts (resources) from the cluster manager to facilitate execution of the processors in the JobModel.
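The two phases above can be sketched as follows. This is a minimal, hypothetical illustration (the class and method names are not actual Samza APIs); an in-memory map stands in for the coordinator stream, and phase B simply looks up the preferred host that phase A persisted.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the two YARN phases; names are hypothetical,
// not the actual Samza classes.
class HostAffinityPhases {
  // Stand-in for the coordinator stream (a Kafka topic in a real job).
  static final Map<String, String> coordinatorStream = new HashMap<>();

  // Phase A: the ApplicationMaster persists the container -> preferred
  // host assignment from the generated JobModel.
  static void persistJobModel(Map<String, String> containerToHost) {
    coordinatorStream.putAll(containerToHost);
  }

  // Phase B: the ContainerAllocator reads the preferred host for each
  // container and would request it from the cluster manager.
  static String preferredHostFor(String containerId) {
    return coordinatorStream.getOrDefault(containerId, "ANY_HOST");
  }
}
```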

...

  • The number of processors is a static configuration in the YARN deployment model, and a job restart is required to change it. In standalone, however, the addition or deletion of a processor in a processor group is quite common and expected behavior.
  • The existing generators discard the task-to-physical-host assignment when generating the JobModel and use only the container-to-preferred-host assignment. For standalone, it is essential to carry this detail (the task to physical host assignment) across successive JobModel generations to produce an optimal task-to-processor assignment. For instance, assume stream processors P1 and P2 run on host H1 and processor P3 runs on host H3. If P1 dies, it is optimal to assign some of the tasks previously processed by P1 to P2. If the previous task-to-physical-host assignment is not taken into account when generating the JobModel, this cannot be achieved.
  • In YARN, a stream processor is assigned a physical host to run on after JobModel generation. In standalone, the physical host on which a stream processor will run is known before the JobModel generation phase.
  • Ideally, any TaskNameGrouper should be usable interchangeably between the YARN and standalone deployment models. Currently, only a subset of the TaskNameGroupers usable in YARN are supported in standalone.
  • Zookeeper will be used as the locality store in standalone, and the coordinator stream (Kafka) is used as the locality store in YARN.
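The P1/P2/P3 scenario above can be made concrete with a small sketch. This is a hypothetical helper, not a Samza API: given each task's previous physical host and the hosts of the still-live processors, tasks stick to a processor on their previous host when one exists, so P1's tasks land on P2 (also on H1) rather than moving across hosts.

```java
import java.util.*;

// Hypothetical sketch: assign tasks to live processors, preferring a
// processor on the task's previous physical host.
class StickyAssignment {
  static Map<String, String> assign(Map<String, String> taskToPrevHost,
                                    Map<String, String> processorToHost) {
    if (processorToHost.isEmpty()) {
      return Map.of();
    }
    // Index live processors by the host they run on.
    Map<String, String> hostToProcessor = new HashMap<>();
    processorToHost.forEach((proc, host) -> hostToProcessor.putIfAbsent(host, proc));

    Map<String, String> taskToProcessor = new TreeMap<>();
    Iterator<String> fallback = processorToHost.keySet().iterator();
    for (Map.Entry<String, String> e : taskToPrevHost.entrySet()) {
      String local = hostToProcessor.get(e.getValue());
      if (local != null) {
        // A live processor still runs on the task's previous host: stay local.
        taskToProcessor.put(e.getKey(), local);
      } else {
        // Otherwise round-robin over the remaining processors.
        if (!fallback.hasNext()) {
          fallback = processorToHost.keySet().iterator();
        }
        taskToProcessor.put(e.getKey(), fallback.next());
      }
    }
    return taskToProcessor;
  }
}
```

With P1 dead, tasks previously on H1 are reassigned to P2 (the surviving processor on H1), which is exactly the outcome that is lost if the previous task-to-host assignment is discarded.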

...

  • The common layer between the YARN and standalone models is the TaskNameGrouper abstraction (part of the JobModel generation phase), which will encapsulate the host-aware assignment of tasks to processors.
  • Deprecate the different flavors of existing TaskNameGrouper implementations (each primarily grouping TaskModels into containers) and provide a single unified contract that is agnostic to, and supported in, the different deployment models (standalone/YARN).
  • Introduce a MetaDataStore abstraction which will be used to store and retrieve locality information for the different deployment models in the appropriate storage layers (Kafka will be used as the locality storage layer for YARN, and Zookeeper will be used as the storage layer for standalone).
  • Utilize the task-to-preferred-host mapping of a stream application, along with the processor-to-preferred-host mapping, when generating the ContainerModels in both YARN and standalone modes, in particular when the container count changes between runs.
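The two proposed abstractions might look roughly like the sketch below. The interface names and signatures here are hypothetical and may differ from the actual Samza contracts; the in-memory store is only for illustration, since real deployments would back it with the coordinator stream (YARN) or Zookeeper (standalone).

```java
import java.util.*;

// Hypothetical key-value contract for locality information; the real
// Samza interface may differ in naming and signatures.
interface MetadataStore {
  byte[] get(String key);
  void put(String key, byte[] value);
}

// In-memory stand-in for illustration; real deployments would back this
// with Kafka (YARN) or Zookeeper (standalone).
class InMemoryMetadataStore implements MetadataStore {
  private final Map<String, byte[]> entries = new HashMap<>();
  public byte[] get(String key) { return entries.get(key); }
  public void put(String key, byte[] value) { entries.put(key, value); }
}

// A single deployment-agnostic grouper contract: locality is read from
// the MetadataStore rather than being wired to one deployment model.
interface TaskNameGrouper {
  Map<Integer, Set<String>> group(Set<String> taskNames, MetadataStore localityStore);
}
```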

If the leader of a stateful processor group generates an optimal task-to-processor assignment in the JobModel, each follower will simply pick up its assignments from the JobModel after the rebalance phase and start processing (similar to non-stateful jobs). The goal is to guarantee an optimal assignment that minimizes task movement between processors. The local state of the tasks will be persisted in a directory (local.store.dir) provided through configuration by each processor.
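"Minimizing task movement" can be made precise with a small helper. This is an illustrative metric, not a Samza API: it counts how many tasks changed processors between two successive JobModel generations, which is the quantity an optimal grouper tries to keep small (each moved task of a stateful job must restore its local state on the new processor).

```java
import java.util.Map;

// Illustrative metric (not a Samza API): number of tasks assigned to a
// different processor in the new JobModel than in the previous one.
class TaskMovement {
  static long count(Map<String, String> prevTaskToProcessor,
                    Map<String, String> nextTaskToProcessor) {
    return nextTaskToProcessor.entrySet().stream()
        .filter(e -> prevTaskToProcessor.containsKey(e.getKey())
                  && !prevTaskToProcessor.get(e.getKey()).equals(e.getValue()))
        .count();
  }
}
```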

...