...
We introduce a way to carry the caching information from the DataStream (Transformation) to the StreamGraph and then to the JobGraph. The caching information includes whether the intermediate result of the transformation should be cached, and an IntermediateDataSetDescriptor, a descriptor that contains the information needed to read the cached intermediate result. The IntermediateDataSetDescriptor includes the IntermediateDataSetId and a set of ShuffleDescriptors, one for each partition, at runtime.
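As a rough illustration, the descriptor can be thought of as the structure below. This is a minimal Python sketch with hypothetical field names; the real IntermediateDataSetDescriptor and ShuffleDescriptor are Flink-internal Java classes.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class ShuffleDescriptor:
    """Hypothetical stand-in: describes how to read one partition at runtime."""
    partition_id: int   # identifies one partition of the intermediate result
    location: str       # where that partition can be read from


@dataclass
class IntermediateDataSetDescriptor:
    """Hypothetical stand-in for the descriptor of a cached intermediate result."""
    intermediate_data_set_id: str                       # the IntermediateDataSetId
    # one ShuffleDescriptor per partition of the cached result
    shuffle_descriptors: List[ShuffleDescriptor] = field(default_factory=list)
```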
DataStream -> Stream Graph
We expose the cache method on the DataStream API. The DataStream API builds a set of transformations under the hood, so we introduce two internal classes, CacheTransformation and CacheTransformationTranslator. A CacheTransformation has one and only one PhysicalTransformation as input. The transformation implies that the intermediate result of the input transformation should be cached, or has already been cached. The CacheTransformation contains the IntermediateDataSetID of the cached intermediate result and an optional IntermediateDataSetDescriptor that is used to read the cached intermediate result. The IntermediateDataSetDescriptor is populated after the cache has been created.
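A minimal sketch of this shape (hypothetical Python stand-ins; the actual CacheTransformation is a Java class in Flink):

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class PhysicalTransformation:
    """Hypothetical stand-in for the single physical input."""
    name: str
    parallelism: int


@dataclass
class CacheTransformation:
    """Hypothetical stand-in: wraps exactly one PhysicalTransformation."""
    input: PhysicalTransformation
    intermediate_data_set_id: str
    # Stands in for the optional IntermediateDataSetDescriptor; it is None
    # until the cache has been created by a previous job run.
    descriptor: Optional[str] = None
```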
...
When translating the CacheTransformation, if the intermediate result of the input has not been cached yet, a virtual cache stream node, CacheNode, is added to the StreamGraph. The CacheNode's upstream is the input of the CacheTransformation, and the two have the same parallelism. The CacheNode's chaining strategy is set to HEAD so that it is not chained with the upstream node and the intermediate result is actually created.
If the intermediate result of the input has already been cached, we add a source node, CacheReaderSource, which carries the IntermediateDataSetDescriptor, the descriptor needed to read the cached intermediate result. Its parallelism is the same as the parallelism of the StreamNode that created the cache in the previous job. The transformations before the CacheTransformation are ignored.
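The two translation branches can be sketched as below. This is a simplified illustration, not the real CacheTransformationTranslator; the stream graph is modeled as a plain list and CacheT is a hypothetical stand-in for CacheTransformation.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class CacheT:
    """Minimal stand-in for CacheTransformation."""
    input_name: str
    input_parallelism: int
    descriptor: Optional[str] = None  # set once the cache already exists


def translate_cache_transformation(stream_graph: list, t: CacheT) -> None:
    """Sketch of the two branches of CacheTransformationTranslator."""
    if t.descriptor is None:
        # Cache not yet created: add a virtual CacheNode with the same
        # parallelism as its input; chaining strategy HEAD prevents it from
        # being chained with the upstream node, so the intermediate result
        # is materialized.
        stream_graph.append({
            "kind": "CacheNode",
            "upstream": t.input_name,
            "parallelism": t.input_parallelism,
            "chaining": "HEAD",
        })
    else:
        # Cache already exists: add a CacheReaderSource carrying the
        # descriptor; the transformations before the CacheTransformation
        # are ignored entirely.
        stream_graph.append({
            "kind": "CacheReaderSource",
            "descriptor": t.descriptor,
            "parallelism": t.input_parallelism,
        })
```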
Stream Graph -> JobGraph
During JobGraph translation, multiple StreamNodes may be chained together to form a JobVertex. While translating the StreamGraph to the JobGraph, if the translator sees a virtual CacheNode, it knows that the intermediate result of the CacheNode's input should be cached. Since the chaining strategy of the virtual cache stream node is HEAD, the CacheNode forms its own JobVertex, and a JobEdge connects the upstream node to that JobVertex. The translator sets the ResultPartitionType of this JobEdge to BLOCKING_PERSISTENT.
On the other hand, if the translator sees a CacheReaderSource, it passes the IntermediateDataSetDescriptor from the CacheReaderSource to the JobVertex.
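The edge-typing decision can be sketched as follows (a hypothetical helper, not Flink's actual translator code; the partition-type strings mirror Flink's ResultPartitionType names):

```python
def build_job_edge(upstream_vertex: str, downstream_kind: str) -> dict:
    """Sketch: a JobEdge into a CacheNode-headed JobVertex is marked
    BLOCKING_PERSISTENT so the produced result outlives the job; other
    edges keep the default type (PIPELINED here for illustration)."""
    result_partition_type = (
        "BLOCKING_PERSISTENT" if downstream_kind == "CacheNode" else "PIPELINED"
    )
    return {
        "source": upstream_vertex,
        "result_partition_type": result_partition_type,
    }
```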
Translation Process
In the section below, we put the above translation steps together to demonstrate the translation from DataStream to JobGraph with a concrete example.
Create Cache
The code snippet below is a Flink job that creates the cached intermediate result.
```python
cached_src = env.from_source(...) \
                .cache()
cached_map = cached_src.map(...) \
                       .cache()
cached_map.key_by(...) \
          .reduce(...) \
          .print()
env.execute()
```
Use Cache
The code snippet below is a Flink job that reads the cached intermediate result created by the previous job.
```python
cached_src.print()
cached_map.key_by(...) \
          .reduce(...) \
          .print()
env.execute()
```
...
source that reads the cached intermediate result. The translator passes the descriptor to the JobVertex.
Runtime
Execution Graph
At runtime, the JobGraph becomes an ExecutionGraph, a runtime representation that contains all the information in the JobGraph. When the job finishes, it returns the IntermediateDataSetDescriptor via the JobExecutionResult.
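The hand-back of descriptors at job completion might look like the sketch below. The class and method names here are hypothetical stand-ins, not the actual JobExecutionResult API; the idea is simply a map from IntermediateDataSetID to descriptor that the client can consult when submitting the next job.

```python
class JobExecutionResult:
    """Hypothetical stand-in: carries the descriptors of caches the job created."""

    def __init__(self):
        # IntermediateDataSetID -> descriptor of the cached intermediate result
        self._descriptors = {}

    def add_descriptor(self, dataset_id: str, descriptor: str) -> None:
        """Recorded by the runtime when a BLOCKING_PERSISTENT result finishes."""
        self._descriptors[dataset_id] = descriptor

    def get_intermediate_data_set_descriptor(self, dataset_id: str):
        """Read by the client to populate the CacheTransformation for reuse."""
        return self._descriptors.get(dataset_id)
```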
...