...

  1. In the embedded samza library model, users are expected to perform manual garbage collection of unused local state stores (to reduce the disk footprint) on nodes.
  2. Monitoring and handling the increase/decrease of input stream partitions of a stateful standalone stream application is considered out of scope for this feature.

...

In standalone, locality information of the stream processors will be stored separately from the JobModel. The JobModel will be used to hold just the task assignments (processor to task assignment and task to system stream partition assignment) in standalone. Each stream processor, during its startup phase, will store the physical host on which it runs into an appropriate zookeeper locality node (this is synonymous with the existing behavior in yarn). The MetadataStore abstraction will be used to read and write stream processor locality information to the appropriate storage layer for each deployment model. There will be two implementations of MetadataStore, viz. CoordinatorStreamBasedMetadataStore, which reads/writes processor locality information to the coordinator stream (a kafka topic) for yarn, and ZkMetadataStore, which reads/writes processor locality information to zookeeper for standalone. Local state of the tasks will be persisted by each processor in a directory (local.store.dir) provided through configuration.
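To make the abstraction concrete, below is a minimal sketch of what such a MetadataStore interface could look like, assuming a simple string-key/byte-value contract; the method names and signatures are illustrative assumptions, not the final API.

```java
// Illustrative sketch of the MetadataStore abstraction described above; the exact
// method names, signatures, and key/value types are assumptions for this example.
public interface MetadataStore {

  /** Initialize connections to the underlying storage layer
      (coordinator stream for yarn, zookeeper for standalone). */
  void init();

  /** Read the value (e.g. host name) associated with the given key (e.g. processorId). */
  byte[] get(String key);

  /** Write processor locality information, e.g. put(processorId, hostName). */
  void put(String key, byte[] value);

  /** Flush pending writes and release resources. */
  void close();
}

// Two implementations back the same abstraction for the two deployment models:
//   CoordinatorStreamBasedMetadataStore -> coordinator stream (kafka topic) for yarn
//   ZkMetadataStore                     -> zookeeper locality nodes for standalone
```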

...

Remove coordinator stream bindings from JobModel: 

JobModel is a data access object used to represent a samza job in both the yarn and standalone deployment models. With the existing implementation, JobModel requires a LocalityManager (which is tied to the coordinator stream) to read and populate processor locality assignments. However, since zookeeper is used as the JobModel persistence layer and the coordinator stream doesn't exist in the standalone landscape, it's essential to remove this LocalityManager binding from JobModel and make JobModel immutable. Any existing implementations (ClusterBasedJobCoordinator, ContainerProcessManager) that depend upon this binding for functional correctness in samza-yarn should directly read container locality from the coordinator stream instead of getting it indirectly via JobModel.
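The following is a minimal sketch, under the assumption that JobModel is reduced to an immutable value object holding only the task assignments; Config and ContainerModel refer to the existing Samza model classes, and the exact shape shown here is illustrative.

```java
import java.util.Collections;
import java.util.Map;

import org.apache.samza.config.Config;
import org.apache.samza.job.model.ContainerModel;

// Illustrative sketch: JobModel as an immutable value object that holds only the
// task assignments, with no LocalityManager / coordinator stream dependency.
public class JobModel {
  private final Config config;
  private final Map<String, ContainerModel> containers; // keyed by processorId

  public JobModel(Config config, Map<String, ContainerModel> containers) {
    this.config = config;
    this.containers = Collections.unmodifiableMap(containers);
  }

  public Config getConfig() {
    return config;
  }

  public Map<String, ContainerModel> getContainers() {
    return containers;
  }
}
```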

Cleaning up ContainerModel:

ContainerModel is a data access object used in samza for holding the task to system stream partition assignments generated by TaskNameGrouper implementations. ContainerModel currently has two fields (processorId and containerId) used to uniquely identify a processor in a processors group. The standalone deployment model uses the processorId field and the yarn deployment model uses the containerId field to store the unique processor identifier. To achieve uniformity between the two deployment models, the proposal is to remove the duplicate containerId field. This will not require any operational migration.
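A minimal sketch of the cleaned-up ContainerModel is shown below, assuming processorId alone identifies the processor and the containerId field is dropped; TaskName and TaskModel refer to the existing Samza classes, and the exact shape is illustrative.

```java
import java.util.Collections;
import java.util.Map;

import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.TaskModel;

// Illustrative sketch of the cleaned-up ContainerModel: the duplicate containerId
// field is removed and processorId alone identifies the processor in the group.
public class ContainerModel {
  private final String processorId;              // unique within the processor group
  private final Map<TaskName, TaskModel> tasks;  // task -> system stream partition assignments

  public ContainerModel(String processorId, Map<TaskName, TaskModel> tasks) {
    this.processorId = processorId;
    this.tasks = Collections.unmodifiableMap(tasks);
  }

  public String getProcessorId() {
    return processorId;
  }

  public Map<TaskName, TaskModel> getTasks() {
    return tasks;
  }
}
```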

State store restoration:

 Upon processor restart, any local state stores that are not present on the host will be restored using the same restoration sequence followed in the yarn deployment model.

Container to physical host assignment:

When assigning tasks to stream processors in a run, the stream processor to which a task was assigned in the previous run will be preferred. If that stream processor is unavailable in the current run, stream processors running on the physical host used in the previous run will be given higher priority and favored. If neither of these conditions can be met, the task will be assigned to any stream processor available in the processor group, as sketched below.
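The sketch below illustrates this preference order; the class, method, and map names are hypothetical helpers for this example and are not part of Samza's API.

```java
import java.util.Map;
import java.util.Optional;

// Illustrative sketch of the preference order described above:
// previous processor > any processor on the previous physical host > any available processor.
public class TaskToProcessorMatcher {

  public static String pickProcessor(
      String taskName,
      Map<String, String> previousTaskToProcessor,   // taskName -> processorId from the previous run
      Map<String, String> previousProcessorToHost,   // processorId -> host from the previous run
      Map<String, String> currentProcessorToHost) {  // live processors in the current run -> host

    String previousProcessor = previousTaskToProcessor.get(taskName);

    // 1. Prefer the exact processor the task was assigned to in the previous run.
    if (previousProcessor != null && currentProcessorToHost.containsKey(previousProcessor)) {
      return previousProcessor;
    }

    // 2. Otherwise prefer any processor running on the task's previous physical host.
    String previousHost = previousProcessor == null ? null : previousProcessorToHost.get(previousProcessor);
    if (previousHost != null) {
      Optional<String> sameHostProcessor = currentProcessorToHost.entrySet().stream()
          .filter(e -> previousHost.equals(e.getValue()))
          .map(Map.Entry::getKey)
          .findFirst();
      if (sameHostProcessor.isPresent()) {
        return sameHostProcessor.get();
      }
    }

    // 3. Fall back to any available processor in the group (assumes a non-empty group).
    return currentProcessorToHost.keySet().iterator().next();
  }
}
```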

Semantics of host affinity with ‘run.id’ 

The strategy to determine whether the state from the previous stream application run continues in the current run will vary for different deployment environments and input sources. The semantic meaning of run.id is the continuation of state (viz. state-store, checkpoint, config, task-assignments) across stream application restarts. Host affinity will be supported only within the same run.id of an application.

...