...

Zookeeper is used in standalone for coordination between the processors of a stream application. Amongst all the available processors of a stream application, a single processor will be elected as the leader in standalone. The leader will generate the JobModel and propagate it to all the other processors in the group. A distributed barrier in zookeeper will be used to block message processing until the latest JobModel is picked up by all the processors in the group.
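To make the election step concrete, here is a minimal in-memory sketch of ZooKeeper-style leader election among processors: each processor registers what amounts to an ephemeral sequential node, and the live processor with the lowest sequence number is the leader. The class and method names are illustrative assumptions, not Samza's actual implementation, and no real ZooKeeper ensemble is involved.

```java
import java.util.*;

// Hypothetical simulation of ZooKeeper-style leader election among stream
// processors. Registration mimics creating an ephemeral sequential znode;
// the processor holding the lowest sequence number is elected leader.
public class LeaderElectionSketch {

    // processorId -> sequence number assigned at registration time
    private final Map<String, Integer> registrations = new LinkedHashMap<>();
    private int nextSequence = 0;

    // Register a processor (creates its "ephemeral sequential node").
    public void register(String processorId) {
        registrations.put(processorId, nextSequence++);
    }

    // A processor dying removes its ephemeral node.
    public void deregister(String processorId) {
        registrations.remove(processorId);
    }

    // The leader is the live processor with the lowest sequence number.
    public Optional<String> leader() {
        return registrations.entrySet().stream()
                .min(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey);
    }

    public static void main(String[] args) {
        LeaderElectionSketch group = new LeaderElectionSketch();
        group.register("P1");
        group.register("P2");
        group.register("P3");
        System.out.println(group.leader().get()); // P1 (registered first)

        group.deregister("P1"); // leader dies; leadership moves to next lowest
        System.out.println(group.leader().get()); // P2
    }
}
```

When the leader dies, its ephemeral node disappears and leadership automatically passes to the next-lowest surviving registrant, which is the property the standalone coordinator relies on.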

...

  • The number of processors is a static configuration in the yarn deployment model, and a job restart is required to change it. However, addition or deletion of a processor to a processors group in standalone is quite common and expected behavior.
  • Existing generators discard the task to physical host assignment when generating the JobModel and only use the container to preferred host assignment. However, for standalone it is essential to carry this detail (task to physical host assignment) between successive JobModel generations to produce an optimal task to processor assignment. For instance, assume stream processors P1 and P2 run on host H1 and processor P3 runs on host H3. If P1 dies, it is optimal to assign some of the tasks processed by P1 to P2. If the previous task to physical host assignment is not taken into account when generating the JobModel, this cannot be achieved.
  • In yarn, a stream processor is assigned a physical host to run on after JobModel generation. In standalone, the physical host on which a stream processor is going to run is known before the JobModel generation phase (hence a separate ContainerAllocator phase is not required in standalone).
  • In an ideal world, any TaskNameGrouper should be usable interchangeably between the yarn and standalone deployment models. Currently, only a subset of the TaskNameGroupers usable in yarn are supported in standalone.
  • Zookeeper will be used as the locality store in standalone, while the coordinator stream (kafka) is used as the locality store in yarn.
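The host-aware reassignment scenario above (P1 and P2 on H1; P1 dies; P1's tasks should move to P2 rather than to P3) can be sketched as follows. This is a simplified illustration under assumed names, not the actual TaskNameGrouper contract: tasks on surviving processors stay put, and orphaned tasks prefer a live processor on their last known host.

```java
import java.util.*;

// Minimal sketch of host-aware task reassignment (illustrative names).
// When a processor dies, its tasks are preferentially reassigned to a live
// processor on the same physical host, so persisted local state can be reused.
public class HostAwareReassignment {

    public static Map<String, String> reassign(
            Map<String, String> taskToProcessor,   // previous task -> processor
            Map<String, String> processorToHost,   // previous processor -> host
            Set<String> liveProcessors) {

        // Live processors grouped by host, used as reassignment candidates.
        Map<String, Deque<String>> hostToLive = new HashMap<>();
        for (String p : liveProcessors) {
            hostToLive.computeIfAbsent(processorToHost.get(p),
                    h -> new ArrayDeque<>()).add(p);
        }
        Deque<String> anyLive = new ArrayDeque<>(liveProcessors);

        Map<String, String> newAssignment = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : taskToProcessor.entrySet()) {
            String oldProcessor = e.getValue();
            if (liveProcessors.contains(oldProcessor)) {
                // Sticky assignment: no task movement for surviving processors.
                newAssignment.put(e.getKey(), oldProcessor);
            } else {
                // Prefer a live processor on the task's last host; else any live one.
                String host = processorToHost.get(oldProcessor);
                Deque<String> sameHost = hostToLive.getOrDefault(host, anyLive);
                newAssignment.put(e.getKey(), sameHost.peek());
            }
        }
        return newAssignment;
    }

    public static void main(String[] args) {
        Map<String, String> taskToProcessor = new LinkedHashMap<>();
        taskToProcessor.put("T1", "P1");
        taskToProcessor.put("T2", "P2");
        taskToProcessor.put("T3", "P3");
        Map<String, String> processorToHost =
                Map.of("P1", "H1", "P2", "H1", "P3", "H3");

        // P1 dies: its task T1 moves to P2 (also on H1), not to P3.
        Map<String, String> result = reassign(taskToProcessor, processorToHost,
                new HashSet<>(Arrays.asList("P2", "P3")));
        System.out.println(result); // {T1=P2, T2=P2, T3=P3}
    }
}
```

Without the previous task to host mapping, the second branch has no host to prefer and T1 could land on P3, forcing its local state to be rebuilt on H3.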

...

  • The common layer between the yarn and standalone models is the TaskNameGrouper abstraction (part of the JobModel generation phase), which will encapsulate the host-aware assignment of tasks to processors.
  • Deprecate the different flavors of existing TaskNameGrouper implementations (each primarily grouping TaskModels into containers) and provide a single unified contract that is deployment-model agnostic and supported in both standalone and yarn.
  • Introduce a MetaDataStore abstraction to store and retrieve processor and task locality for different deployment models in the appropriate storage layers. Kafka will be used as the locality storage layer for yarn, and zookeeper will be used as the storage layer for standalone.
  • In the existing implementation, only the processor locality is used to generate task to processor assignments. In the new model, both the last reported task locality and the processor locality of a stream application will be used when generating task to processor assignments in both the yarn and standalone models.
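The MetaDataStore abstraction described above could take roughly the following shape. The interface and method names here are assumptions for illustration, not the actual Samza API; the point is that locality reads and writes go through one contract so that yarn can back it with the coordinator stream (kafka) and standalone with zookeeper.

```java
import java.util.*;

// Illustrative sketch of a metadata-store contract (names are assumptions).
// Real deployments would plug in a kafka- or zookeeper-backed implementation.
interface MetadataStoreSketch {
    void put(String key, byte[] value);
    byte[] get(String key);        // null if absent
    void delete(String key);
}

// In-memory stand-in, useful for tests and for sketching the call pattern.
class InMemoryMetadataStore implements MetadataStoreSketch {
    private final Map<String, byte[]> store = new HashMap<>();
    public void put(String key, byte[] value) { store.put(key, value); }
    public byte[] get(String key) { return store.get(key); }
    public void delete(String key) { store.remove(key); }
}

public class MetadataStoreDemo {
    public static void main(String[] args) {
        MetadataStoreSketch locality = new InMemoryMetadataStore();
        // Record the last reported host for a task and for a processor;
        // the key scheme below is hypothetical.
        locality.put("task-locality/T1", "H1".getBytes());
        locality.put("processor-locality/P2", "H1".getBytes());
        System.out.println(new String(locality.get("task-locality/T1"))); // H1
    }
}
```

With such a contract, the JobModel generation phase can read both task locality and processor locality without knowing which storage layer sits underneath.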

If the leader of a stateful processors group generates an optimal, host-aware task assignment to processors within the JobModel, each follower will simply pick up its assignments from the JobModel after the rebalance phase and start processing (similar to non-stateful jobs). The goal is to guarantee that the task assignment to processors is optimal, minimizing task movement between processors. The local state of the tasks will be persisted in a directory (local.store.dir) provided through configuration by each processor.

...