Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

We introduce a way to carry the caching information from DataStream(Transformation) to StreamGraph to JobGraph. The caching information includes whether the intermediate result of the operation transformation should be cached and an IntermediateDataSetDescriptora descriptor that contains information to read the cached intermediate result . The IntermediateDataSetDescriptor includes the IntermediateDataSetId and a set of ShuffleDescriptors, one for each partitionat runtime.

DataStream

...

-> Stream Graph

When users write a Flink job with the DataStream API, We expose the cache method at the DataStream API to the user. The DataStream API builds a set of transformations under the hood. Therefore, we will introduce two internal classes CacheTransformation and CacheTransformationTranslator. The CacheTransformation has one and only one PhysicalTransformation as input. It a new transformation CacheTransformation. The transformation implies that the intermediate result of the input transformation should be cached or has been cached. The CacheTransforamtion contains the IntermediateDataSetID of the cache intermediate result and an optional IntermediateDataSetDescriptor that is used to read the cached intermediate result. The IntermediateDataSetDescriptor will be populated after the cache is created.

...

When translating the CacheTransformation, if the intermediate result of the input has not been createdcached, we will add a virtual cache stream node , CacheNode, with the sample parallelism as its input is added to the StreamGraph. The CacheNode's upstream is the input of the CacheTransformation and they have the same parallelism. Its chaining strategy is set to HEAD so that it will not be chained with the upstream node and the intermediate result will be created.

If the intermediate result of the input has been createdcached, we add a source node, CacheReaderSource, which includes the IntermediateDataSetDescriptor, descriptor to read the cached intermediate result. Its parallelism is the same as the parallelism of the StreamNode that create creates the cache in the previous job. The transformations before the CacheTransformation is ignored.

Stream Graph -> JobGraph

During JobGraph translation, multiple StreamNodes may be chaining together to form a JobVertex. While translating the StreamGraph to JobGraph, if the translator sees a virtual CacheNode, it knows the node whose intermediate result of the input of the CacheNode should be cached. Since the chaining strategy of the virtual cache stream node is HEAD, the CacheNode will form a JobVertex and there is a JobEdge connects the upstream node and the JobVertex. The , it sets the ResultPartitionType of the JobEdge is set to BLOCKING_PERSISTENT.

On other hand, if it sees the CacheReaderSource. The translator should pass the IntermediateDataSetDescriptor from the CacheReaderSource to the JobVertex.

Translation Process

In the below section, we put the above translation process together to demonstrate the translation from DataStream to JobGraph with a concret example.

Create Cache

The code snippet below is a flink job that create the cache intermediate result.

Code Block
languagepy
cached_src = env.from_source(...)
    .cache()

cached_map = cached_src.map(...)
    .cache()

cached_map.key_by(...)
    .reduce(...)
    .print()

env.execute()

Image Removed

Use Cache

The code snippet below is a flink job that read the cache intermediate result created by the last job.

Code Block
languagepy
cached_src.print()

cached_map.key_by(...)
    .reduce(...)
    .print()

env.execute()

...

source that read the cached intermediate result. The translator passes the descriptor to JobVertex.


Image Added



Runtime

Execution Graph

In runtime JobGraph becomes ExecutionGraph, the ExecutionGraph is a runtime representation and it contains all the information in JobGraph. When the job is finished, it returns the IntermediateDataSetDescriptor via the JobExecutionResult.

...