
Status

Current state: Accepted



Discussion thread

...

...

/wkogrk9tt2bznhnj6p0slltr09dhyho5
Vote thread
JIRA: FLINK-10653

Release: 1.9


Original Design Document: https://docs.google.com/document/d/1l7yIVNH3HATP4BnjEOZFkO2CaHf1sVn_DSxS2llmkd8/edit?usp=sharing



Motivation

Shuffle is the process of transferring data between stages. It involves writing outputs on the producer side and reading inputs on the consumer side. The shuffle architecture and behavior in Flink are unified for both streaming and batch jobs. It can be improved in three dimensions:

  • Lifecycle of TaskExecutor (TE)/Task: The TE starts an internal shuffle environment for transporting partition data to the consumer side. When a task enters the FINISHED state, its produced partition might not yet be fully consumed, so the TE container should not be released until all internal partitions have been consumed. These lifecycles are clearly coupled by implicit constraints, but there is no explicit mechanism for coordinating them.

  • Lifecycle of ResultPartition: Certain features, such as fine-grained recovery and interactive programming, require flexible consumption of produced intermediate results, i.e. delayed consumption or consumption multiple times. In these cases, the shuffle service user (JM or RM) should decide when to release the produced partitions, and the shuffle API should support this. More details can be found in the design proposal that extends this FLIP.

  • Extension of writer/reader: A ResultPartition can currently only be written into local memory for streaming jobs, or into a single persistent file per subpartition for batch jobs. The current architecture makes it difficult to extend the partition writer and reader sides together. For example, a ResultPartition might be written in a sort-merge manner or to external storage, and a partition might also be transported via an external shuffle service on YARN, Kubernetes, etc., so that the TE can be released early.
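The lifecycle coupling described in the first two points can be made explicit with a small tracker on the TE side. The following Java sketch is hypothetical: the class LocalPartitionTracker and its methods are illustrative names, not Flink API. A partition is registered when its producer task finishes, it is released only when the shuffle user (JM or RM) decides so, and the TE may be freed only once no partitions remain.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch (not Flink API): makes the task / partition / TE lifecycle
// coupling explicit instead of implicit.
public class PartitionLifecycleSketch {

    static final class LocalPartitionTracker {
        // Partitions still occupying local resources on this TE.
        private final Set<String> occupying = new HashSet<>();

        // Producer task reached FINISHED; its partition may not be consumed yet.
        void onProducerFinished(String partitionId) {
            occupying.add(partitionId);
        }

        // The shuffle user (JM or RM) decides the partition is no longer needed,
        // e.g. after delayed or repeated consumption has completed.
        void release(String partitionId) {
            occupying.remove(partitionId);
        }

        // The TE container may only be freed once no partition is held locally.
        boolean canReleaseTaskExecutor() {
            return occupying.isEmpty();
        }
    }

    public static void main(String[] args) {
        LocalPartitionTracker tracker = new LocalPartitionTracker();
        tracker.onProducerFinished("partition-1");
        System.out.println(tracker.canReleaseTaskExecutor()); // false: partition still held
        tracker.release("partition-1");
        System.out.println(tracker.canReleaseTaskExecutor()); // true
    }
}
```

The point of the sketch is that "task finished" and "TE can be freed" become two separate, explicitly coordinated events rather than an implicit assumption.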

...

            boolean updatePartitionInfo(ExecutionAttemptID consumerID, PartitionInfo partitionInfo);

      }


  • ShuffleEnvironment is responsible for creating ResultPartitionWriters for the producer task and InputGates for the consumer task. Because both sides are created by the same component, this architecture supports extending matching writer and reader implementations together. It might be useful for the current ResultPartitionWriter/InputGate interfaces to extend AutoCloseable.
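To illustrate why having one component create both sides enables matched extensions, here is a minimal Java sketch. The interfaces SimpleShuffleEnvironment, SimplePartitionWriter and SimpleInputGate are hypothetical, heavily simplified stand-ins, not Flink's actual ShuffleEnvironment, ResultPartitionWriter or InputGate signatures. Records are plain strings, and the only implementation keeps one in-memory queue per partition.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

public class ShuffleEnvironmentSketch {

    /** Hypothetical producer-side writer; extends AutoCloseable as suggested above. */
    interface SimplePartitionWriter extends AutoCloseable {
        void emit(String record);
        @Override void close(); // narrowed: no checked exception
    }

    /** Hypothetical consumer-side reader; poll() returns null when no data is left. */
    interface SimpleInputGate extends AutoCloseable {
        String poll();
        @Override void close();
    }

    /** Hypothetical environment that creates matching writers and readers. */
    interface SimpleShuffleEnvironment {
        SimplePartitionWriter createWriter(String partitionId);
        SimpleInputGate createInputGate(String partitionId);
    }

    /** In-memory implementation: writer and reader share one queue per partition. */
    static final class InMemoryShuffleEnvironment implements SimpleShuffleEnvironment {
        private final Map<String, Queue<String>> partitions = new HashMap<>();

        private Queue<String> queue(String partitionId) {
            return partitions.computeIfAbsent(partitionId, id -> new ArrayDeque<>());
        }

        @Override
        public SimplePartitionWriter createWriter(String partitionId) {
            Queue<String> q = queue(partitionId);
            return new SimplePartitionWriter() {
                @Override public void emit(String record) { q.add(record); }
                @Override public void close() { /* nothing to flush in memory */ }
            };
        }

        @Override
        public SimpleInputGate createInputGate(String partitionId) {
            Queue<String> q = queue(partitionId);
            return new SimpleInputGate() {
                @Override public String poll() { return q.poll(); }
                @Override public void close() { /* nothing to release in memory */ }
            };
        }
    }

    public static void main(String[] args) {
        SimpleShuffleEnvironment env = new InMemoryShuffleEnvironment();
        // Producer side: write records, then close via try-with-resources.
        try (SimplePartitionWriter writer = env.createWriter("p0")) {
            writer.emit("a");
            writer.emit("b");
        }
        // Consumer side: drain the same partition through the matching reader.
        try (SimpleInputGate gate = env.createInputGate("p0")) {
            String record;
            while ((record = gate.poll()) != null) {
                System.out.println(record);
            }
        }
    }
}
```

Because writer and reader come from the same environment, an alternative implementation (for example one backed by files or an external shuffle service) could change both sides consistently without touching task code. Extending AutoCloseable lets tasks manage either side with try-with-resources.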

...