...

  1. In the embedded samza library model, users are expected to perform manual garbage collection of unused local state stores (to reduce the disk footprint) on the nodes.

Proposed Changes

JobModel is the data model that represents a samza job. The hierarchy of the JobModel is that jobs have containers, and containers have tasks. Each data model contains relevant information, such as an id, partition information, etc. Zookeeper is used as the JobModel store in standalone, and the coordinator stream (kafka topic) is used as the JobModel store in yarn.
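
As a rough illustration, the hierarchy described above can be sketched in Java as follows; the class and field names are simplified approximations, not the exact Samza data model classes:

// A rough sketch of the JobModel hierarchy (job -> containers -> tasks). The class and
// field names below are illustrative, not the exact Samza data model definitions.
import java.util.Map;
import java.util.Set;

final class TaskModel {
  final String taskName;              // e.g. "Partition 0"
  final Set<Integer> inputPartitions; // input partitions owned by this task
  TaskModel(String taskName, Set<Integer> inputPartitions) {
    this.taskName = taskName;
    this.inputPartitions = inputPartitions;
  }
}

final class ContainerModel {
  final String processorId;           // id of the container/processor
  final Map<String, TaskModel> tasks; // taskName -> TaskModel
  ContainerModel(String processorId, Map<String, TaskModel> tasks) {
    this.processorId = processorId;
    this.tasks = tasks;
  }
}

final class JobModel {
  final Map<String, String> config;             // job configuration
  final Map<String, ContainerModel> containers; // processorId -> ContainerModel
  JobModel(Map<String, String> config, Map<String, ContainerModel> containers) {
    this.config = config;
    this.containers = containers;
  }
}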

In the existing implementation, host affinity in yarn is accomplished via two phases:
A. The ApplicationMaster (JobCoordinator) in the yarn deployment model generates the JobModel (the optimal processor-to-task assignment) and saves the JobModel in the coordinator stream (kafka topic).
B. The ContainerAllocator phase (which happens after JobModel generation) requests resources (physical hosts) from the cluster manager for each processor, to facilitate execution of the processors in the JobModel.
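
As a rough illustration of phase B, the sketch below requests a container on each processor's last-known host and falls back to any host when no locality information exists; ClusterResourceManager and the locality map here are hypothetical stand-ins rather than the actual samza/yarn interfaces:

// Hedged sketch of the ContainerAllocator phase: for every processor in the generated
// JobModel, request a container on the host it last ran on (preferred-host request),
// falling back to any host when no locality information is available.
import java.util.Map;
import java.util.Set;

interface ClusterResourceManager {
  void requestResources(String processorId, String preferredHost);
}

final class ContainerAllocatorSketch {
  static final String ANY_HOST = "ANY_HOST";

  static void requestContainers(Set<String> processorIds,
                                Map<String, String> processorToLastSeenHost,
                                ClusterResourceManager clusterManager) {
    for (String processorId : processorIds) {
      String preferredHost = processorToLastSeenHost.getOrDefault(processorId, ANY_HOST);
      clusterManager.requestResources(processorId, preferredHost);
    }
  }
}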

Zookeeper is used in standalone for coordination amongst the processors of a stream application. Amongst all the available processors of a stream application, a single processor will be chosen as the leader. The leader will generate the JobModel and propagate it to all the other processors in the group. A distributed barrier in zookeeper will be used to block processing until the most recent JobModel is picked up by all the processors in the group.
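
The sketch below illustrates this coordination flow using the Apache Curator LeaderLatch and DistributedBarrier recipes; the Zookeeper paths and the publish/read helpers are simplified assumptions and do not reflect Samza's actual Zookeeper layout:

// Hedged sketch of the standalone coordination flow: leader election via LeaderLatch and
// a distributed barrier that blocks processing until the new JobModel has been published.
// Simplified; a real implementation would scope the barrier to a specific JobModel version
// to avoid races between successive generations.
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class StandaloneCoordinationSketch {
  public static void main(String[] args) throws Exception {
    CuratorFramework zkClient =
        CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(1000, 3));
    zkClient.start();

    // Every processor joins the election; exactly one of them ends up as the leader.
    LeaderLatch leaderLatch = new LeaderLatch(zkClient, "/app/leader");
    leaderLatch.start();

    // Every processor raises the barrier so nobody starts before the JobModel is published.
    DistributedBarrier jobModelBarrier = new DistributedBarrier(zkClient, "/app/jobmodel-barrier");
    jobModelBarrier.setBarrier();

    boolean isLeader = leaderLatch.await(10, TimeUnit.SECONDS);
    if (isLeader) {
      publishJobModel(zkClient);       // hypothetical helper: write the JobModel to Zookeeper
      jobModelBarrier.removeBarrier(); // lift the barrier once the JobModel is available
    } else {
      jobModelBarrier.waitOnBarrier(); // followers block until the leader publishes
      readJobModel(zkClient);          // hypothetical helper: read the latest JobModel
    }
    // ... start or resume processing with the agreed-upon JobModel ...
  }

  private static void publishJobModel(CuratorFramework zkClient) { /* illustrative stub */ }
  private static void readJobModel(CuratorFramework zkClient) { /* illustrative stub */ }
}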

Here is a list of the important and notable differences in processor and JobModel generation semantics between the yarn and standalone deployment models:

  • The number of processors is a static configuration in the yarn deployment model, and a job restart is required to change it. However, adding or removing a processor from a processor group in standalone is quite common and an expected behavior.
  • Existing JobModel generators discard the task-to-physical-host assignment when generating the JobModel. However, for standalone it is essential to carry this detail (task-to-physical-host assignment) across successive JobModel generations to accomplish optimal task-to-processor assignment. For instance, assume stream processors P1 and P2 run on host H1 and processor P3 runs on host H3. If P1 dies, it is optimal to assign some of the tasks processed by P1 to P2. If the previous task-to-physical-host assignment is not taken into account when generating the JobModel, this cannot be achieved (see the sketch after this list).
  • In yarn, a processor is assigned a physical host to run on after JobModel generation. In standalone, the physical host on which a processor is going to run is known before the JobModel generation phase.
  • In an ideal world, any TaskNameGrouper implementation should be usable interchangeably between the yarn and standalone deployment models. However, in the existing setup, only a subset of the TaskNameGrouper implementations usable in yarn are supported in standalone.
  • Zookeeper will be used as the locality store in standalone, and the coordinator stream (kafka) is used as the locality store in yarn.
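
The sketch below illustrates the second bullet above: when regenerating the JobModel, tasks of a dead processor are preferentially reassigned to a surviving processor on the same host (e.g. P1's tasks on H1 move to P2, which also runs on H1). The types and method here are illustrative, not the actual TaskNameGrouper interface:

// Hedged sketch of host-affinity-aware task reassignment using the previous
// task-to-host mapping from the locality store.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class HostAwareReassignmentSketch {

  /**
   * @param previousTaskToHost  task name -> host the task last ran on (from the locality store)
   * @param liveProcessorToHost surviving processor id -> host it currently runs on
   * @return task name -> processor id to use in the new JobModel
   */
  static Map<String, String> reassignTasks(Map<String, String> previousTaskToHost,
                                           Map<String, String> liveProcessorToHost) {
    // Index surviving processors by host so a same-host processor can be preferred.
    Map<String, List<String>> hostToProcessors = new HashMap<>();
    liveProcessorToHost.forEach((processor, host) ->
        hostToProcessors.computeIfAbsent(host, h -> new ArrayList<>()).add(processor));

    List<String> allProcessors = new ArrayList<>(liveProcessorToHost.keySet());
    Map<String, String> assignment = new HashMap<>();
    int cursor = 0; // round-robin cursor for tasks whose previous host has no live processor

    for (Map.Entry<String, String> entry : previousTaskToHost.entrySet()) {
      String task = entry.getKey();
      List<String> sameHostProcessors = hostToProcessors.get(entry.getValue());
      if (sameHostProcessors != null && !sameHostProcessors.isEmpty()) {
        // Keep the task on its previous host by picking one of the processors still running there.
        assignment.put(task,
            sameHostProcessors.get(Math.floorMod(task.hashCode(), sameHostProcessors.size())));
      } else {
        // No processor left on that host: fall back to spreading the task across all processors.
        assignment.put(task, allProcessors.get(cursor++ % allProcessors.size()));
      }
    }
    return assignment;
  }
}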

Overall high-level changes:

...