
Status: ...

Discussion thread: /wkogrk9tt2bznhnj6p0slltr09dhyho5
Vote thread: ...
JIRA: FLINK-10653 (ASF JIRA)
Release: 1.9

Original Design Document: https://docs.google.com/document/d/1l7yIVNH3HATP4BnjEOZFkO2CaHf1sVn_DSxS2llmkd8/edit?usp=sharing

JIRA: FLINK-10653

Released: Flink 1.9

Motivation

Shuffle is the process of data transfer between stages, which involves writing outputs on the producer side and reading inputs on the consumer side. The shuffle architecture and behavior in Flink are unified for both streaming and batch jobs. They can be improved in the following dimensions:

  • Lifecycle of TaskExecutor (TE)/Task/ResultPartition: The TE starts an internal shuffle service environment for transporting partition data to the consumer side. When a task enters the FINISHED state, its produced partition might not be fully consumed yet. Therefore the TE container should not be freed until all of its internal partitions are consumed. There are implicit coupled constraints among these lifecycles, but no specific mechanism for coordinating them.

  • Lifecycle of ResultPartition: Certain features, like fine-grained recovery and interactive programming, require flexible consumption of produced intermediate results: delayed consumption or consumption multiple times. In these cases, the shuffle service user (JM or RM) should decide when to release the produced partitions, and the shuffle API should support this (more details in the design proposal to extend this FLIP).

  • Extension of writer/reader: A ResultPartition can only be written into local memory for streaming jobs and into a single persistent file per subpartition for batch jobs. It is difficult to extend the partition writer and reader sides together based on the current architecture. E.g. a ResultPartition might be written in a sort&merge way or to external storage, and partitions might also be transported via an external shuffle service on YARN, Kubernetes etc. in order to release the TE early.

Proposed Changes

We propose a pluggable ShuffleService architecture for managing partitions on the JobMaster (JM) side and extending adaptive writers/readers on the TaskExecutor (TE) side.

(1) Shuffle Service Factory

public interface ShuffleServiceFactory<SD extends ShuffleDescriptor, P extends ResultPartitionWriter, G extends InputGate> {

      ShuffleMaster<SD> createShuffleMaster(Configuration configuration);

      ShuffleEnvironment<P, G> createShuffleEnvironment(ShuffleEnvironmentContext shuffleEnvironmentContext);

}

  • The shuffle service factory creates the ShuffleMaster (JM side) and the ShuffleEnvironment (TE side). The Flink config could also contain specific shuffle configuration like ports etc. A specific factory implementation defines how the ShuffleMaster and ShuffleEnvironment communicate with each other; if the shuffle is channel-based, it can behave in a similar way as now. A minimal factory sketch follows after this list.

  • We could support a cluster-level config for the factory class name in the first version. Later we could further support job- or edge-level config by introducing a predefined ShuffleType. The cluster config could contain factory implementations for each supported ShuffleType, or fall back to the default for some types.
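To make the factory contract concrete, here is a minimal, hypothetical sketch of a custom implementation against the interface above (MyShuffleServiceFactory, MyShuffleDescriptor, MyShuffleMaster and MyShuffleEnvironment are made-up names, not part of this proposal):

public class MyShuffleServiceFactory implements ShuffleServiceFactory<MyShuffleDescriptor, ResultPartitionWriter, InputGate> {

      // JM side: create the global partition manager. The Flink configuration
      // may carry shuffle-specific settings such as ports.
      public ShuffleMaster<MyShuffleDescriptor> createShuffleMaster(Configuration configuration) {
            return new MyShuffleMaster(configuration);
      }

      // TE side: create the local environment that serves partition writers and readers.
      public ShuffleEnvironment<ResultPartitionWriter, InputGate> createShuffleEnvironment(ShuffleEnvironmentContext shuffleEnvironmentContext) {
            return new MyShuffleEnvironment(shuffleEnvironmentContext);
      }
}

How the two sides communicate (e.g. over a netty channel as today) stays entirely inside such an implementation.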

(2) Shuffle Master (JM side)

The JM process creates the ShuffleMaster from the configured shuffle service factory per cluster, and is thus responsible for its lifecycle. The ShuffleMaster is a global manager for partitions, which means decoupling the partition's lifecycle from the task's. This brings natural benefits for the following improvements.

  • Task failover: If the consumer task fails or its TE crashes, the JM could ask the ShuffleMaster whether the producer's partition is still available. If the partition is available for consuming, the producer task might not need to be restarted, which narrows down the failover region to reduce failover cost.

  • Partition cleanup: When all the consumer tasks are done (in FINISHED state), the producer's partition can be manually deregistered with the ShuffleMaster to clean up resources. In case of external storage, partitions are at risk of lingering after job/cluster failures. A TTL mechanism is one option for handling this issue. The ShuffleMaster could also provide an explicit way for manually triggering removal of unused partitions.

In the first version, we only focus on migrating the current existing process onto the new ShuffleMaster architecture. So we define only the most basic necessary methods below; the above-mentioned improvements can be taken forward step by step, in order of priority, by extending the ShuffleMaster with more features.

public interface ShuffleMaster<T extends ShuffleDescriptor> {

      CompletableFuture<T> registerPartitionWithProducer(PartitionDescriptor partitionDescriptor, ProducerDescriptor producerDescriptor);

      void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor);

}


  • PartitionDescriptor and ProducerDescriptor are introduced for wrapping all the abstract information which the JM can provide from the job/execution graph: partition id, type, parallelism etc. and producer location, execution id, address, data port etc. These parameters are derived from the graph and the execution mode, so they are rather general and do not belong to a particular shuffle implementation.

  • When a producer execution is scheduled for deployment, a ProducerDescriptor and a PartitionDescriptor are created to register the producer's partition with the ShuffleMaster. The ShuffleMaster transforms the abstract descriptors into a specific ShuffleDescriptor, which is also cached for the consumer vertex if the consumer is not deployed yet (a sketch of this flow follows after this list).

  • The ShuffleDescriptor is then put into the ResultPartitionDeploymentDescriptor for submitting the producer task, and as a known producer inside the InputGateDeploymentDescriptor for submitting the consumer task. It can contain specific partition config for the ShuffleEnvironment on the TE side to serve the partition writer and reader.

  • A special UnknownShuffleDescriptor could be used in the InputGateDeploymentDescriptor if the producer location is unknown during the deployment of the consumer. The JM can update it on the consumer side by sending the specific ShuffleDescriptor in the partition infos when the producer is deployed.
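As a hedged illustration of this registration flow, a trivial ShuffleMaster sketch that produces its descriptor synchronously (MyShuffleMaster and MyShuffleDescriptor are hypothetical names):

public class MyShuffleMaster implements ShuffleMaster<MyShuffleDescriptor> {

      // Transform the abstract descriptors (partition id/type, producer address,
      // data port etc.) into the shuffle-specific descriptor used for deployment.
      public CompletableFuture<MyShuffleDescriptor> registerPartitionWithProducer(PartitionDescriptor partitionDescriptor, ProducerDescriptor producerDescriptor) {
            return CompletableFuture.completedFuture(new MyShuffleDescriptor(partitionDescriptor, producerDescriptor));
      }

      // Free whatever this shuffle implementation keeps for the partition,
      // e.g. files on external storage.
      public void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor) {
      }
}

The returned future allows an implementation to contact external systems asynchronously before the deployment descriptors are built.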

(3) Shuffle Environment (TE side)

The TE process creates the ShuffleEnvironment from the configured shuffle factory per cluster, and is thus responsible for its lifecycle. Considering a future ShuffleType config on job/edge level, the TE could keep a registry of shuffle factories per ShuffleType.

If the TE is responsible for transporting partition data to the consumer side, the produced partitions occupy local TE resources. In this case the TE can be released only when all of its internal tasks are in FINISHED state and all produced partitions occupying local resources have been consumed and released. The ShuffleEnvironment could provide the information about locally unreleased partitions to decide whether the producer TE can be released (a sketch of this check follows the interface below).

public interface ShuffleEnvironment<P extends ResultPartitionWriter, G extends InputGate> extends AutoCloseable {

      int start(); // returns data port for shuffle data exchange connection

      Collection<P> createResultPartitionWriters(...);

      void releasePartitionsLocally(Collection<ResultPartitionID> partitionIds);

      Collection<ResultPartitionID> getPartitionsOccupyingLocalResources();

      Collection<G> createInputGates(...);

      boolean updatePartitionInfo(ExecutionAttemptID consumerID, PartitionInfo partitionInfo);

}
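For example, the TE-release decision described above could combine the task states with the environment's view of local partitions. A minimal sketch against the interface above (the helper itself is hypothetical):

// A TE can only be freed once all its tasks are FINISHED and no produced
// partition still occupies local TE resources.
static boolean canReleaseTaskExecutor(ShuffleEnvironment<?, ?> environment, boolean allTasksFinished) {
      return allTasksFinished && environment.getPartitionsOccupyingLocalResources().isEmpty();
}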


  • The ShuffleEnvironment is responsible for creating ResultPartitionWriters for producer tasks and InputGates for consumer tasks. Therefore this architecture can support matched extension of the writer and reader sides together. It might be useful for the current ResultPartitionWriter/InputGate interfaces to extend AutoCloseable.

  • Similar to how it is implemented currently, the scheduler/EG in the JM can decide whether and when to update partition info on the consumer side, e.g. always for pipelined partitions and on task finish for blocking partitions. The producer task can also send a notification to the JM when something has been produced in a pipelined partition, as now. The consumer's ShuffleEnvironment provides the way of updating the internal input gates with known partition infos.

  • The ShuffleEnvironment should also consider the transport between producer and consumer, e.g. via the netty-based network stack as the current default. So the ShuffleEnvironment might substitute the NetworkEnvironment in TaskManagerServices.

Future Improvement

The current ResultPartitionWriter and InputGate both operate on buffer units containing serialized record data. Certain ShuffleEnvironment implementations might benefit from operating on serialized or even raw records directly (e.g. for partially sort-merging partition data).

  • Abstract RecordWriter/RecordReader interfaces for handling serialized/raw records (sketched below).

  • The ShuffleEnvironment could be further refactored to return RecordWriter/RecordReader.
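A possible shape for such record-oriented interfaces, purely speculative at this point (names, generics and signatures are illustrative only, not defined by this FLIP):

// T is the raw or serialized record type.
interface RecordWriter<T> extends AutoCloseable {
      void emit(T record) throws IOException;   // write one record into the partition
      void flush() throws IOException;          // make buffered records visible to consumers
}

interface RecordReader<T> extends AutoCloseable {
      T next() throws IOException;              // returns null once the input is exhausted
}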

Public Interfaces

  • In the first version, the class name which implements the ShuffleServiceFactory interface is configured by the shuffle-service-factory.class parameter on Flink cluster level (see the example after this list).
  • In the second version, it might support job/edge-level ShuffleType config for specific ShuffleServiceFactory implementations.
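For instance, the cluster-level option could look as follows in flink-conf.yaml (the fully qualified class name is shown for illustration; it refers to the default Netty implementation mentioned in the next section):

shuffle-service-factory.class: org.apache.flink.runtime.shuffle.NettyShuffleServiceFactory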

Compatibility, Deprecation, and Migration Plan

  • In the first version, the default Netty implementation of the ShuffleServiceFactory is compatible with the current existing behavior.

  • In the second or a later version, we can add other implementations like YarnShuffleServiceFactory/KubernetesShuffleServiceFactory to be configured based on the cluster environment and user requirements.

Implementation Plan

All the mentioned related work could be done in at least two versions. The first version implements the most basic architecture so that the following versions strictly build upon it.

First MVP: Refactoring to Shuffle API (Flink 1.9)

  • Implement ProducerDescriptor and PartitionDescriptor for covering the necessary abstract info.

  • Implement the ShuffleDescriptor generated from ProducerDescriptor and PartitionDescriptor.

  • Define the ShuffleMaster interface and create a simple implementation on the JM side which relies on the currently implemented NetworkEnvironment on the TE side.

  • Define the ShuffleServiceFactory interface for creating the ShuffleMaster.

  • Introduce a Flink configuration option for the ShuffleServiceFactory implementation. The default value for it could be NettyShuffleServiceFactory.

...

  • Define the ShuffleEnvironment interface and give a default implementation on the TE side.

  • Reuse the shuffle-related components from the NetworkEnvironment to implement the default NettyShuffleEnvironment.

  • Add the ShuffleEnvironment factory method to the ShuffleServiceFactory interface.

  • Respect the feature flag in the Flink configuration option for the ShuffleServiceFactory.

  • Move notifyPipelinedConsumers outside of ResultPartitionWriter to make it not shuffle-specific.

  • Set the shuffle implementation config parameter to the default netty-based implementation from FLINK-11391 and FLINK-11392, instead of <none>, which meant a feature flag to use the previous non-pluggable legacy implementation. The legacy and feature flag code should be removed.

Next steps

...

  • Implement partition deregistration and cleanup logic via the ShuffleMaster.

...

  • Improve TE release by checking via the ShuffleMaster whether partitions have been consumed.

...

  • Support job/edge level config for ShuffleType.

  • Abstract RecordWriter/Reader interfaces for handling raw records.

  • Refactor the ShuffleEnvironment interface for returning RecordWriter/Reader.

  • Adjust the processes in StreamInputProcessor and RecordWriter based on the Writer/Reader interfaces.

  • Extend to YarnShuffleServiceFactory/KubernetesShuffleServiceFactory implementations based on the new interfaces.

Rejected Alternatives

None so far.