...

  1. Support stateful stream processing in standalone stream applications.
  2. Minimize partition movements amongst stateful processors in the rebalance phase.
  3. Existing generators discard the task to physical host assignment when generating the JobModel and use only the container to processor to preferred host assignment. For standalone, however, it is essential to carry this detail (the task to physical host assignment) across successive JobModel generations in order to generate an optimal task to processor assignment. For instance, assume stream processors P1 and P2 run on host H1, and processors P3 and P4 run on host H3. If P1 dies, it is optimal to assign some of the tasks processed by P1 to P2, since P2 runs on the same host. This cannot be achieved unless the previous task to physical host assignment is taken into account when generating the JobModel.
  4. In an ideal world, any TaskNameGrouper should be usable interchangeably between the yarn and standalone deployment models. Currently, only a subset of the TaskNameGroupers usable in yarn are supported in standalone.
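
The P1/P2 example in item 3 can be illustrated with a minimal sketch. This is not Samza's grouper implementation: the class `HostAwareReassignment`, the method `reassign`, and the task names T1..T5 are hypothetical and chosen only for illustration. It assumes the previous task to processor assignment and the processor to host mapping are both available, and moves the tasks of a dead processor to surviving processors on the same host, falling back to all survivors when none share that host.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; not part of the Samza API.
class HostAwareReassignment {

    // Returns a new processor -> tasks assignment in which the tasks of
    // deadProcessor are spread round-robin over survivors on the same host.
    // If no survivor shares that host, all survivors are used instead.
    static Map<String, List<String>> reassign(
            Map<String, List<String>> previousAssignment,
            Map<String, String> processorToHost,
            String deadProcessor) {
        String deadHost = processorToHost.get(deadProcessor);
        Map<String, List<String>> next = new TreeMap<>();
        List<String> sameHost = new ArrayList<>();

        // Iterate in sorted order so the result is deterministic.
        for (Map.Entry<String, List<String>> e : new TreeMap<>(previousAssignment).entrySet()) {
            String processor = e.getKey();
            if (processor.equals(deadProcessor)) {
                continue; // the dead processor keeps no tasks
            }
            next.put(processor, new ArrayList<>(e.getValue()));
            if (deadHost != null && deadHost.equals(processorToHost.get(processor))) {
                sameHost.add(processor);
            }
        }

        List<String> targets = sameHost.isEmpty() ? new ArrayList<>(next.keySet()) : sameHost;
        if (targets.isEmpty()) {
            return next; // no survivors at all; nothing can be reassigned
        }

        List<String> orphans = previousAssignment.getOrDefault(deadProcessor, List.of());
        for (int i = 0; i < orphans.size(); i++) {
            next.get(targets.get(i % targets.size())).add(orphans.get(i));
        }
        return next;
    }

    public static void main(String[] args) {
        Map<String, List<String>> previous = Map.of(
                "P1", List.of("T1", "T2"),
                "P2", List.of("T3"),
                "P3", List.of("T4"),
                "P4", List.of("T5"));
        Map<String, String> hosts = Map.of("P1", "H1", "P2", "H1", "P3", "H3", "P4", "H3");
        // P1 dies: its tasks move to P2, the only survivor on H1.
        System.out.println(reassign(previous, hosts, "P1"));
    }
}
```

Without the processor to host mapping from the previous generation, the tasks T1 and T2 could just as easily land on P3 or P4, which is the situation item 3 describes.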

...

  • JobModel generation phase: In the yarn deployment model, the ApplicationMaster (JobCoordinator) generates the JobModel (processor container to task assignment) for the Samza job.
  • ContainerAllocator phase: This phase follows the JobModel generation phase. It schedules each processor container to run on a physical host by coordinating with the underlying ClusterManager, and orchestrates the execution of the processor.

...

  • The number of processors is a static configuration in the yarn deployment model, and a job restart is required to change it. In standalone, however, adding a processor to or removing one from a processor group is common and expected behavior.
  • In yarn, a processor is assigned a physical host by the ContainerAllocator after the JobModel generation phase. In standalone, the physical host on which a stream processor will run is known before the JobModel generation phase (no ContainerAllocator phase is needed in standalone to associate the processor with the physical host).

...

  • Deprecate the existing flavors of the TaskNameGrouper implementations (each of which primarily groups TaskModels into containers) and provide a single unified contract. The common layer between the yarn and standalone models is the TaskNameGrouper abstraction (part of the JobModel generation phase), which will encapsulate the host-aware assignment of tasks to processors. In the existing implementation, only the processor locality is used to generate the task to processor assignments. In the new model, both the last reported task locality and the processor locality of a stream application will be used when generating task to processor assignments, in both the yarn and standalone models.
  • Introduce a MetaDataStore abstraction to store and retrieve processor and task locality for the different deployment models in appropriate storage layers. Kafka will be used as the locality storage layer for yarn, and zookeeper will be used as the storage layer for standalone.
  • A new abstraction, LocationIdProvider, is introduced as part of this change to generate a locationId for a physical execution environment. All processors of an application registered from the same locationId should be able to share (read/write) their local state stores: any store created by a processor running from a locationId is readable and writable by other processors running from the same locationId. Any custom LocationIdProvider is expected to honor this contract when generating the locationId. Here are a few reasons for introducing a new abstraction to generate the locationId rather than using the processorId as the locationId.

...