Status

Current state: Accepted

Discussion thread: http://mail-archives.apache.org/mod_mbox/samza-dev/201802.mbox/%3CCAFvExu1GHnphidP_wRriMey-T7Hss4AqAxscOoBFUHuMR5sq%3DQ%40mail.gmail.com%3E

...

  1. Support stateful stream processing in standalone stream applications.
  2. Minimize partition movements amongst stateful processors in the rebalance phase.

Non Goals

  1. In the embedded samza library model, users are expected to perform manual garbage collection of unused local state stores(to reduce the disk footprint) on nodes.
  2. Monitoring and handling the increase/decrease of input stream partitions of a stateful standalone stream application is out of scope for this feature.

Proposed Changes

JobModel is the data model in samza that logically represents a samza job. The JobModel hierarchy is that a samza job has one to many containers(ContainerModel), and each container has one to many tasks(TaskModel). Each data model contains relevant information, such as logical id, partition information, etc. In the standalone deployment model, the JobModel is stored in zookeeper. Coordinator stream(kafka topic) is used to store the JobModel in the yarn deployment model. A simplified sketch of this hierarchy is shown after the list below.

  1. Existing generators discard the task to physical host assignment when generating the JobModel and only use the processor to preferred host assignment. However, for standalone it’s essential to consider this detail(task to physical host assignment) between successive job model generations to generate an optimal task to processor assignment. For instance, let’s assume stream processors P1 and P2 run on host H1 and processors P3 and P4 run on host H3. If P1 dies, it is optimal to assign some of the tasks processed by P1 to P2. If the previous task to physical host assignment is not taken into account when generating the JobModel, this cannot be achieved.
  2. In an ideal world, any TaskNameGrouper should be usable interchangeably between the yarn and standalone deployment models. Currently, only a subset of the TaskNameGroupers usable in yarn are supported in standalone.
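
For illustration, here is a minimal sketch of the JobModel hierarchy described above; the class shapes and field names are simplified assumptions and do not mirror the exact samza data model classes.

Code Block
languagejava
import java.util.Map;
import java.util.Set;

// Simplified sketch of the JobModel hierarchy described above; the field and type names are
// illustrative assumptions and do not mirror the exact samza classes.
public class JobModelHierarchySketch {

  // A samza job is logically represented by a JobModel: processorId -> ContainerModel.
  public static class JobModel {
    private final Map<String, ContainerModel> containers;
    public JobModel(Map<String, ContainerModel> containers) { this.containers = containers; }
  }

  // A container/processor owns one to many tasks: taskName -> TaskModel.
  public static class ContainerModel {
    private final String processorId;
    private final Map<String, TaskModel> tasks;
    public ContainerModel(String processorId, Map<String, TaskModel> tasks) {
      this.processorId = processorId;
      this.tasks = tasks;
    }
  }

  // A task processes a set of input partitions and carries its own logical id.
  public static class TaskModel {
    private final String taskName;
    private final Set<String> inputPartitions;
    public TaskModel(String taskName, Set<String> inputPartitions) {
      this.taskName = taskName;
      this.inputPartitions = inputPartitions;
    }
  }
}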

Existing host affinity implementation in yarn is accomplished through the following two phases:

  • JobModel generation phase: ApplicationMaster(JobCoordinator) in the yarn deployment model generates the JobModel(container to task assignment) for the samza job.
  • ContainerAllocator phase: This happens after the JobModel generation phase and schedules each container to run on a physical host by coordinating with the underlying ClusterManager and orchestrates the execution of the processor.

Zookeeper is used in standalone for coordination between the processors of a stream application. Amongst all the available processors of a stream application, a single processor will be elected as a leader in standalone. The leader will generate the JobModel and propagate the JobModel to all the other processors in the group. Distributed barrier in zookeeper will be used to block the message processing until the latest JobModel is picked by all the processors in the group.

Here’re the list of important and notable differences in processor and JobModel generation semantics between yarn and standalone deployment model:

  • Number of processors is a static configuration in yarn deployment model and a job restart is required to change the number of processors. However, an addition/deletion of a processor to a processors group in standalone is quite common and an expected behavior.
  • Existing generators discard the task to physical host assignment when generating the JobModel and only uses container to preferred host assignment. However, for standalone it’s essential to consider this detail(task to physical host assignment) between successive job model generations to generate optimal task to processor assignment. For instance, let’s assume stream processors P1, P2 runs on host H1 and processor P3 runs on host H3. If P1 dies, it is optimal to assign some of the tasks processed by P1 to P2. If previous task to physical host assignment is not taken into account when generating JobModel, this cannot be achieved.
  • A stream processor is assigned a physical host to run after the JobModel generation in yarn. Physical host in which a stream processor is going to run is known before the JobModel generation phase in standalone(hence a separate ContainerAllocator phase is not required in standalone).
  • In an ideal world, any TaskNameGrouper should be usable interchangeably between yarn and standalone deployment models. Currently only a subset of TaskNameGrouper’s usable in yarn  are supported in standalone.
  • Zookeeper will be used as locality store in standalone and coordinator stream(kafka) is used as locality store in yarn.

Overall high level changes:

  • The common layer between yarn and standalone model is the TaskNameGrouper abstraction(which is part of JobModel generation phase) which will encapsulate the host aware task assignment to processors.
  • Deprecate different flavors of existing TaskNameGrouper implementations(each one of them primarily grouping TaskModel into containers) and provide a single unified contract which is agnostic and supported in different deployment models(standalone/yarn).
  • Introduction of MetaDataStore abstraction to store and retrieve processor and task locality for different deployment models in appropriate storage layers. Kafka be will be used as locality storage layer for yarn and zookeeper will be used as storage layer for standalone.
  • In the existing implementation, only the processor locality will be used to generate task to processor assignments. In the new model, both the last reported task locality and processor locality of a stream application will be used when generating task to processor assignments in both the yarn and standalone models.

If the leader of the stateful processors group generates an optimal, host-aware task assignment to processors within the JobModel, each follower will pick up its appropriate assignment from the JobModel and begin processing after the rebalance phase(similar to non-stateful jobs). The goal is to guarantee that the task assignment to processors is optimal and minimizes task movement between the processors.
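
To make the flow above concrete, here is a minimal illustrative sketch of the leader/follower rebalance sequence; the Coordination interface and its method names are hypothetical assumptions and are not the samza-zk APIs.

Code Block
languagejava
// Illustrative sketch of the leader/follower rebalance flow described above. The interface and
// method names here are hypothetical and do not correspond to the samza-zk implementation.
public class RebalanceFlowSketch {

  // Hypothetical view of the zookeeper-backed coordination primitives used during a rebalance.
  interface Coordination {
    boolean isLeader();                                      // outcome of the zookeeper leader election
    void publishJobModel(Object jobModel, String version);   // leader writes the new JobModel to zookeeper
    Object readJobModel(String version);                     // followers read the JobModel published by the leader
    void awaitBarrier(String version);                       // distributed barrier: blocks until the whole group has this version
  }

  void onRebalance(Coordination coordination, String newJobModelVersion) {
    if (coordination.isLeader()) {
      // Leader: regenerate the JobModel (host-aware task to processor assignment) and publish it.
      Object jobModel = generateHostAwareJobModel();
      coordination.publishJobModel(jobModel, newJobModelVersion);
    }
    // Every processor, leader included, blocks on the distributed barrier until the latest
    // JobModel version has been picked up by all processors in the group.
    coordination.awaitBarrier(newJobModelVersion);
    // Each processor picks up its own assignment from the JobModel and resumes processing.
    startProcessing(coordination.readJobModel(newJobModelVersion));
  }

  Object generateHostAwareJobModel() { return new Object(); /* host-aware grouping happens here */ }
  void startProcessing(Object jobModel) { /* resume message processing with the new assignment */ }
}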


Here’re the list of important and notable differences in processor and JobModel generation semantics between yarn and standalone deployment model:

  • The number of containers is a static configuration in the yarn deployment model and a job restart is required to change it. However, the addition/deletion of a processor to a processor group in standalone is quite common and expected behavior.
  • A container is assigned a physical host by the ContainerAllocator after the JobModel generation phase in yarn. The physical host on which a processor is going to run is known before the JobModel generation phase in standalone(the ContainerAllocator phase is not needed in standalone to associate the processor with the physical host).

Overall high level changes:

  • Deprecate the different existing flavors of the TaskNameGrouper implementations(each one of them primarily grouping TaskModel into containers) and provide a single unified contract. The common layer between yarn and standalone model is the TaskNameGrouper abstraction(which is part of JobModel generation phase) which will encapsulate the host aware task assignment to processors. In the existing implementation, only the processor locality is used to generate the task to processor assignments. In the new model, both the last reported task locality and processor locality of a stream application will be used when generating task to processor assignments in both the yarn and standalone models.
  • Introduction of a MetadataStore abstraction to store and retrieve processor and task locality for different deployment models in appropriate storage layers. Kafka will be used as the locality storage layer for yarn and zookeeper will be used as the storage layer for standalone.
  • A new abstraction, LocationIdProvider, is introduced as a part of this change to generate the locationId for a physical execution environment. All the processors of an application registered from a locationId should be able to share(read/write) their local state stores. Any store created by a processor running from a locationId should be readable/writable by other processors running from the same locationId. Any custom LocationIdProvider is expected to honor this contract when generating the locationId. Here are a few reasons for introducing a new abstraction to generate the locationId rather than using processorId as the locationId(an illustrative sketch follows this list).
    • LocationId denotes the physical execution environment required to run a stream processor. LocationId is used to uniquely identify an environment amongst all available physical execution environments, whereas processorId is used to uniquely identify a stream processor within a processors group. ProcessorId and locationId are two different, logically orthogonal concepts which cannot be unified.

    • The standalone model supports running multiple stream processors from a single JVM on a physical host. If a stream processor running on a physical host dies, it’s optimal to redistribute the tasks of the dead processor to the other processors running on that host. If processorId is used as the locationId, this optimal redistribution cannot be achieved(since the task to locationId association is not maintained).

    • In the LinkedIn execution environment, locationId will be a composite key comprised of sliceId and sliceInstanceId. In the case of kubernetes, locationId will be the containerId(which will be obtained through the POD API).
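
As referenced above, here is a minimal sketch of how a LocationIdProvider could behave for a plain(non-containerized) host; the class name HostnameLocationIdProvider is a hypothetical illustration, not part of the proposal.

Code Block
languagejava
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical locationId generation for a plain(non-containerized) host: every processor on the
// same machine reports the same locationId, so their local state stores can be shared.
public class HostnameLocationIdProvider {

  // Mirrors the proposed single-method LocationIdProvider abstraction.
  public String getLocationId() {
    try {
      return InetAddress.getLocalHost().getHostName();
    } catch (UnknownHostException e) {
      throw new IllegalStateException("Unable to resolve the hostname for locationId generation.", e);
    }
  }
}

// In a containerized environment the locationId would instead be a composite key, for example
// sliceId + "_" + sliceInstanceId, or the container id obtained from the environment.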

 

[Image: Standalone host affinity]



ZK Data Model to support host affinity:

...

Code Block
languagejava
// '+' denotes addition, '-' denotes deletion.
public interface TaskNameGrouper {
  + @Deprecated
  Set<ContainerModel> group(Set<TaskModel> tasks);

  + @Deprecated
  default Set<ContainerModel> group(Set<TaskModel> tasks, List<String> containersIds) {
    return group(tasks);
  }
  /**
   * @param taskModels, represents the taskModels generated by the SSPGrouper.
   * @param taskLocality, taskName to locationId mapping of the previous generation. 
   * @param processorLocality, processorId to locationId mapping.
   * @return the optimal containerModels generated.   
   */  
  + Set<ContainerModel> group(Set<TaskModel> taskModels, Map<String, String> taskLocality, Map<String, String> processorLocality);
}

+ @Deprecated
public interface BalancingTaskNameGrouper extends TaskNameGrouper {
  + @Deprecated 
  Set<ContainerModel> balance(Set<TaskModel> tasks, LocalityManager localityManager);
}

public class ContainerModel {
  - @Deprecated
  - private final int containerId;
  private final String processorId;
  private final Map<TaskName, TaskModel> tasks;
  + // New field added denoting the physical locationId.
  + private final String locationId;
}

+public interface LocationIdProvider {
   +  // In case of containerized environments, LocationId is a combination of multiple fields (sliceId, containerId, hostname) instead of simple physical hostname,
   +  // This will be provided by the execution environment of the processor.
   + String getLocationId();
}


+ public interface MetadataStore {
  + // Gets the value associated with the specified {@code key}.
  + byte[] get(byte[] key);

  + // Updates the mapping of the specified key-value pair; associates the specified {@code key} with the specified {@code value}.
  + void put(byte[] key, byte[] value);

  + // Deletes the mapping for the specified {@code key} from this store(if such a mapping exists).
  + void remove(byte[] key);
}
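
For illustration only, a trivial in-memory implementation of the byte[] based MetadataStore contract above could look like the following sketch; the class name InMemoryMetadataStore is hypothetical and real deployments would back the contract with the coordinator stream(yarn) or zookeeper(standalone).

Code Block
languagejava
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Trivial in-memory sketch of the MetadataStore contract defined above(useful for tests only).
public class InMemoryMetadataStore implements MetadataStore {
  // Keys are converted to strings because byte arrays do not have value-based equality in java.
  private final ConcurrentMap<String, byte[]> store = new ConcurrentHashMap<>();

  @Override
  public byte[] get(byte[] key) {
    return store.get(new String(key, StandardCharsets.UTF_8));
  }

  @Override
  public void put(byte[] key, byte[] value) {
    store.put(new String(key, StandardCharsets.UTF_8), value);
  }

  @Override
  public void remove(byte[] key) {
    store.remove(new String(key, StandardCharsets.UTF_8));
  }
}

How locality entries are encoded into keys and values(for example, a task name key mapping to a locationId value) is left to the callers of the store.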

The locationId reported by the live processors of the group and the last reported task locality will be used to calculate the task to container assignment in standalone. The preferred host mapping will be used for task and processor locality in the case of yarn. Any new task/processor for which the grouping is unknown(unavailable in the preferred host/task-locality mapping in the underlying storage layer) will be treated as any_host during assignment.
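
A minimal sketch of how a host-aware TaskNameGrouper could use these inputs is shown below. The greedy assignment, the HostAwareTaskNameGrouperSketch class name, the TaskModel#getTaskName() accessor and the ContainerModel(processorId, locationId, tasks) constructor are assumptions for illustration; this is not the exact algorithm being proposed.

Code Block
languagejava
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative host-aware grouping only. Assumes the new group(taskModels, taskLocality,
// processorLocality) contract above, a TaskModel#getTaskName() accessor and a
// ContainerModel(processorId, locationId, tasks) constructor.
public class HostAwareTaskNameGrouperSketch implements TaskNameGrouper {

  @Override
  public Set<ContainerModel> group(Set<TaskModel> tasks) {
    throw new UnsupportedOperationException("Deprecated; use the locality-aware group method.");
  }

  @Override
  public Set<ContainerModel> group(Set<TaskModel> taskModels,
                                   Map<String, String> taskLocality,
                                   Map<String, String> processorLocality) {
    // locationId -> processorIds currently running there.
    Map<String, List<String>> processorsByLocation = new HashMap<>();
    // processorId -> tasks assigned to it so far.
    Map<String, Map<TaskName, TaskModel>> assignments = new HashMap<>();
    processorLocality.forEach((processorId, locationId) -> {
      processorsByLocation.computeIfAbsent(locationId, k -> new ArrayList<>()).add(processorId);
      assignments.put(processorId, new HashMap<>());
    });

    for (TaskModel task : taskModels) {
      String previousLocation = taskLocality.get(task.getTaskName().getTaskName());
      List<String> colocated = processorsByLocation.getOrDefault(previousLocation, Collections.emptyList());
      // Prefer a live processor on the task's previously reported location; otherwise fall back
      // to any_host semantics and pick the least loaded processor.
      String chosen = colocated.stream()
          .min(Comparator.comparingInt((String p) -> assignments.get(p).size()))
          .orElseGet(() -> assignments.keySet().stream()
              .min(Comparator.comparingInt((String p) -> assignments.get(p).size()))
              .orElseThrow(() -> new IllegalStateException("No live processors to assign tasks to.")));
      assignments.get(chosen).put(task.getTaskName(), task);
    }

    Set<ContainerModel> containerModels = new HashSet<>();
    assignments.forEach((processorId, processorTasks) ->
        containerModels.add(new ContainerModel(processorId, processorLocality.get(processorId), processorTasks)));
    return containerModels;
  }
}

Tasks whose previous location has no live processor fall back to the least loaded processor, which matches the any_host treatment described above.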

Here are a few reasons supporting the modification of the TaskNameGrouper interface and removing LocalityManager from the interface methods:

  • The multiple group methods in the TaskNameGrouper interface and the additional balance method in BalancingTaskNameGrouper are logically synonymous with each other and exist to generate ContainerModels based upon the input task models and past locality assignments. It’s sensible to combine them into one interface method with adequate parameters and simplify things.

  • Any future TaskNameGrouper implementation could hold references to LocalityManager(a live object) and create object hierarchies based upon that reference. This will clutter the ownership of LocalityManager and could potentially create an unintentional resource leak.

  • Logically, a TaskNameGrouper implementation would just require the previous generation’s container models(to get the previous task to preferred host mapping and the previous task to systemstreampartition mapping), which can be passed in through the interface method to generate the new mapping. Any modifications to existing assignments should be done outside of the TaskNameGrouper implementation. This will make any implementation a pure function that simply operates on the passed-in data.

...

  • Modify the existing interfaces and classes as per the proposed solution.

  • Add unit tests to validate compatibility and functional correctness.

  • Add integration tests in samza standalone samples to verify the host affinity feature. 

  • Add an integration test in samza standalone samples to verify that there are minimal partition movements during a rolling upgrade.

  • Verify compatibility - Jackson, a java serialization/deserialization library, is used to convert data model objects in samza into JSON and back. After removing the containerId field from ContainerModel, it should be verified that deserialization of old ContainerModel data with the new ContainerModel spec works(a hedged example follows this list).

  • Some TaskNameGrouper implementations assume the comparability of the integer containerId present in ContainerModel(for instance, GroupByContainerCount, a TaskNameGrouper implementation). Modify the existing TaskNameGrouper implementations to take in a collection of string processorIds, as opposed to assuming that containerId is an integer lying within the [0, N-1] interval(without incurring any change in functionality).
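
As referenced in the compatibility item above, here is a hedged sketch of how the Jackson verification could look; the JSON payload, the field names and the NewContainerModel stand-in class are assumptions for illustration, not the exact samza serialization format.

Code Block
languagejava
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

// Illustrative compatibility check: legacy ContainerModel JSON still carries a containerId field,
// while the new model only has processorId and tasks. Field names below are assumptions.
public class ContainerModelCompatibilityCheck {

  // Simplified stand-in for the new ContainerModel spec.
  public static class NewContainerModel {
    public String processorId;
    public java.util.Map<String, Object> tasks;
  }

  public static void main(String[] args) throws Exception {
    String legacyJson = "{\"containerId\": 0, \"processorId\": \"0\", \"tasks\": {}}";
    // Ignoring unknown properties lets old payloads (with containerId) deserialize into the new spec.
    ObjectMapper mapper = new ObjectMapper()
        .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    NewContainerModel model = mapper.readValue(legacyJson, NewContainerModel.class);
    System.out.println("Deserialized processorId: " + model.processorId);
  }
}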

...

LocalityManager will be turned into an interface and there will be two implementations of LocalityManager, viz. CoordinatorStreamBasedLocalityManager to read/write container locality information for yarn and ZkLocalityManager to read/write container locality information for standalone.

Cons: 

...

  • Any TaskNameGrouper implementation could hold references to LocalityManager(a live object) and create object hierarchies based upon that reference. This will clutter the ownership of LocalityManager and could potentially create an unintentional resource leak.

Approach 2

GroupByContainerIds is the only TaskNameGrouper currently supported in standalone. Implement the host-aware task to stream processor assignment for standalone in GroupByContainerIds.

...