Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
As the DataStream API now supports batch execution mode, we see users using the DataStream API to run batch jobs. Interactive programming is an important use case of Flink batch processing, and the ability to cache intermediate results of a DataStream is crucial to the interactive programming experience. For example, a machine learning scientist may want to interactively explore a bounded data source in a notebook with PyFlink. Consider the example below:
sample = env.read_text_file(...)

# Print out some records for a sanity check
print(sample.execute_and_collect(limit=10))

# Expensive data processing
preprocessed = sample.flat_map(...)

# Explore the distribution of positive and negative samples
sample_cnt = preprocessed.count()
positive_cnt = preprocessed.filter(...).count()

# Explore the dataset
preprocessed.key_by(...).reduce(...).execute_and_collect()
In the example above, since the preprocessed data is not cached, it is recomputed from scratch every time it is used in a later batch job. This not only wastes computation resources but also hurts the user experience, and it gets worse as the computation graph gets deeper.
Therefore, we propose to support caching a DataStream in batch execution mode. We believe that users can benefit a lot from this change, and that it will encourage them to use the DataStream API for their interactive batch processing work.
Public Interfaces
We introduce the cache method to the SingleOutputStreamOperator class. In order to allow caching the side output of the SingleOutputStreamOperator, we let the getSideOutput method return a SingleOutputStreamOperator.
public class SingleOutputStreamOperator<T> extends DataStream<T> {
    ...

    /**
     * Caches the intermediate result of the transformation. Only jobs running in batch mode with
     * blocking shuffle mode can create a cache. Otherwise, an exception will be thrown. The cache
     * is generated lazily the first time the intermediate result is computed. The cache will
     * be cleared when {@link CachedDataStream#close()} is called or the
     * {@link StreamExecutionEnvironment} is closed.
     *
     * @return CachedDataStream that can be used in later jobs to reuse the cached intermediate
     *     result.
     */
    public CachedDataStream<T> cache() {
        ...
    }

    public <X> SingleOutputStreamOperator<X> getSideOutput(OutputTag<X> sideOutputTag) {
        ...
    }
}
We introduce CachedDataStream, which extends DataStream and implements the AutoCloseable interface.
/**
 * {@link CachedDataStream} represents a {@link DataStream} whose intermediate result will be
 * cached the first time it is computed. The cached intermediate result can be used in
 * later jobs that use the same {@link CachedDataStream} to avoid re-computing the intermediate
 * result.
 *
 * @param <T> The type of the elements in this stream.
 */
public class CachedDataStream<T> extends DataStream<T> implements AutoCloseable {

    /**
     * Invalidates the cached intermediate result of this DataStream to release the physical
     * resources. Users are not required to invoke this method to release physical resources
     * unless they want to. The CachedDataStream should not be used after it is closed.
     * Otherwise, an exception will be thrown.
     */
    @Override
    public void close() throws Exception {
        ...
    }
}
StreamExecutionEnvironment implements the AutoCloseable interface.
public class StreamExecutionEnvironment implements AutoCloseable {
    ...

    /**
     * Close and clean up the execution environment. All the
     * cached intermediate results will be released physically.
     */
    @Override
    public void close() throws Exception {
        ...
    }
}
Example Usage
In this section, we show how the example in Motivation can use the provided API.
# Cache the sample so that later jobs do not have to read it from the file system again.
sample = env.read_text_file(...).cache()

# Print out some records for a sanity check; sample is cached after the job is finished.
print(sample.execute_and_collect(limit=10))

# Read the cached sample, run the expensive data processing and cache the result.
preprocessed = sample.flat_map(...).cache()

# Check that the preprocessing produces the correct result; the preprocessed result is
# cached after the job is finished.
print(preprocessed.execute_and_collect(limit=10))

# Explore the distribution of positive and negative samples, using the cached preprocessed result.
sample_cnt = preprocessed.count()
positive_cnt = preprocessed.filter(...).count()

# Explore the dataset with the cached preprocessed result.
preprocessed.key_by(...).reduce(...).execute_and_collect()

# Close the StreamExecutionEnvironment
env.close()
In the above example, since the user caches both the sample and the preprocessed result, the sample is read from the file system only once and the expensive preprocessing runs only once.
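The caching semantics described above (lazy creation by the first job, reuse by later jobs, release on close) can be illustrated with a small toy model. This is only a sketch in plain Python with no Flink dependency: ToyEnvironment, ToyStream, cache_store and expensive are hypothetical names invented for illustration and are not part of the proposed API.

```python
class ToyEnvironment:
    """Hypothetical stand-in for StreamExecutionEnvironment (not the Flink API)."""

    def __init__(self):
        self.cache_store = {}  # maps a cached stream to its materialized records
        self.closed = False

    def close(self):
        # Closing the environment releases every cached intermediate result.
        self.cache_store.clear()
        self.closed = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()


class ToyStream:
    """Hypothetical bounded stream whose records come from a compute function."""

    def __init__(self, env, compute):
        self.env = env
        self._compute = compute
        self._cache_requested = False

    def cache(self):
        # Only marks the stream; the cache is produced lazily by the first job.
        self._cache_requested = True
        return self

    def map(self, fn):
        return ToyStream(self.env, lambda: [fn(x) for x in self.collect()])

    def collect(self):
        if not self._cache_requested:
            return self._compute()
        if self not in self.env.cache_store:
            self.env.cache_store[self] = self._compute()
        return list(self.env.cache_store[self])


calls = {"expensive": 0}


def expensive(x):
    calls["expensive"] += 1
    return x * 2


with ToyEnvironment() as env:
    preprocessed = ToyStream(env, lambda: [1, 2, 3]).map(expensive).cache()
    first = preprocessed.collect()                        # job 1 creates the cache
    second = preprocessed.map(lambda x: x + 1).collect()  # job 2 reuses the cache
    cached_while_open = len(env.cache_store)
```

In this sketch the expensive function is applied to each record exactly once even though two "jobs" consume the preprocessed stream, and leaving the with block empties the cache store.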
Proposed Changes
This FLIP will leverage the Cluster Partition introduced in FLIP-67. Therefore, it should not require many changes to the Flink runtime. Most of the changes should be in the translation from DataStream to JobGraph.
JobGraph Translation
We introduce a way to carry the caching information from DataStream(Transformation) to StreamGraph to JobGraph. The information includes whether the intermediate result of the transformation should be cached and a descriptor that contains information to read the cached intermediate result at runtime.
DataStream -> Stream Graph
When users write a Flink job with the DataStream API, the DataStream API builds a set of transformations under the hood. Therefore, we will introduce a new transformation, CacheTransformation. The transformation implies that the intermediate result of the input transformation should be cached or has been cached.
When translating the CacheTransformation, if the intermediate result of the input has not been cached, a stream node with the same parallelism as its input is added to the StreamGraph. Its chaining strategy is set to HEAD so that it will not be chained with the upstream node and the intermediate result will be created.
If the intermediate result of the input has been cached, we add a source node, which includes the descriptor to read the cached intermediate result. Its parallelism is the same as the parallelism of the StreamNode that created the cache in the previous job. The transformations before the CacheTransformation are ignored.
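The two translation branches described above can be sketched with a toy translator. This is a simplified illustration in plain Python, not Flink code: the Transformation, CacheTransformation and StreamNode dataclasses, the translate function, the node names and the descriptor string are all assumptions made for the example, and the pipeline is assumed to be linear.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Transformation:
    name: str
    parallelism: int
    input: Optional["Transformation"] = None


@dataclass
class CacheTransformation(Transformation):
    # Hypothetical field: set once a previous job has produced the cache.
    descriptor: Optional[str] = None


@dataclass
class StreamNode:
    name: str
    parallelism: int
    chaining: str = "ALWAYS"
    descriptor: Optional[str] = None


def translate(t: Transformation) -> List[StreamNode]:
    """Translate a linear chain of transformations into stream nodes."""
    if isinstance(t, CacheTransformation):
        if t.descriptor is not None:
            # Cache exists: emit a source that reads it and ignore everything upstream.
            return [StreamNode("cache-source", t.parallelism, descriptor=t.descriptor)]
        # Cache not yet produced: add a HEAD node with the input's parallelism.
        return translate(t.input) + [
            StreamNode("cache", t.input.parallelism, chaining="HEAD")
        ]
    upstream = translate(t.input) if t.input is not None else []
    return upstream + [StreamNode(t.name, t.parallelism)]


source = Transformation("source", 4)
flat_map = Transformation("flat_map", 4, source)
cache = CacheTransformation("cache", 4, flat_map)
filter_ = Transformation("filter", 2, cache)

first_job = [n.name for n in translate(filter_)]
cache.descriptor = "descriptor-from-job-1"  # pretend job 1 finished
second_job = [n.name for n in translate(filter_)]
```

In the first job all four nodes are present; once the descriptor is known, the second job starts from the cache-reading source and the source and flat_map transformations are dropped.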
Stream Graph -> JobGraph
During JobGraph translation, multiple StreamNodes may be chained together to form a JobVertex. While translating the StreamGraph to JobGraph, if the translator sees a node whose intermediate result should be cached, it sets the ResultPartitionType of the JobEdge to BLOCKING_PERSISTENT.
On the other hand, if it sees a source that reads a cached intermediate result, the translator passes the descriptor to the JobVertex.
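The two rules for the StreamGraph to JobGraph step can be sketched as follows. This is a toy model under simplifying assumptions: the pipeline is linear, no chaining takes place (each node becomes its own vertex), and all non-cached edges are plain BLOCKING. The Node dataclass, its cache_output flag and the to_job_graph function are hypothetical names used only for illustration.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class Node:
    name: str
    cache_output: bool = False        # intermediate result of this node should be cached
    descriptor: Optional[str] = None  # set on a source that reads a cached result


def to_job_graph(nodes: List[Node]) -> Tuple[List[dict], List[Tuple[str, str, str]]]:
    """Build vertices and edges for a linear pipeline, one vertex per node."""
    # A cache-reading source hands its descriptor over to the vertex.
    vertices = [{"name": n.name, "descriptor": n.descriptor} for n in nodes]
    edges = []
    for up, down in zip(nodes, nodes[1:]):
        # The edge leaving a node whose result should be cached is persisted.
        ptype = "BLOCKING_PERSISTENT" if up.cache_output else "BLOCKING"
        edges.append((up.name, down.name, ptype))
    return vertices, edges


# Job 1: produces the cache.
_, producer_edges = to_job_graph(
    [Node("source"), Node("flat_map", cache_output=True), Node("cache")]
)

# Job 2: consumes the cache through a source carrying the descriptor.
consumer_vertices, consumer_edges = to_job_graph(
    [Node("cache-source", descriptor="descriptor-from-job-1"), Node("filter")]
)
```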
Runtime
Execution Graph
At runtime, the JobGraph becomes an ExecutionGraph. The ExecutionGraph is a runtime representation that contains all the information in the JobGraph. When the job is finished, it returns the IntermediateDataSetDescriptor via the JobExecutionResult.
TaskDeploymentDescriptor
When generating the TaskDeploymentDescriptor, if we find the IntermediateDataSetDescriptor in the JobVertex, we create an InputGateDescriptor so that the task will consume the cached intermediate result.
Deployment Mode
- Application Mode: Application mode allows submission of an application that consists of multiple jobs. In application mode, the cached intermediate result can be used by multiple jobs in the application. When the application finishes, the Flink cluster will be torn down, which will clean all the cache.
- Per-job Mode: In per-job mode, a dedicated Flink cluster is created when a job is submitted and the cluster is torn down when the job is finished. As we are leveraging the Cluster Partition to cache the intermediate result in the TM, the life cycle of the intermediate result is bound to the life cycle of the TM. It is therefore not possible to reuse the intermediate result across multiple jobs.
- Session Mode: In session mode, jobs are submitted to a long-running cluster. Therefore, cached intermediate results can be used by multiple jobs in the same application using the same StreamExecutionEnvironment. Since the life cycle of the Flink session cluster is independent of the life cycle of the user application, the user application is responsible for closing the StreamExecutionEnvironment so that the cached intermediate results can be released. Otherwise, those cached intermediate results are leaked.
Failover
If a TaskManager instance fails and the partitions are stored on the TM, all the intermediate results that have a partition on the failed TM will become unavailable. If the partitions are not stored on the TM, e.g. when a remote shuffle service is used, a TM failure will not affect the intermediate results. However, if some partitions are lost at the remote shuffle service, the intermediate result will become unavailable.
In both cases, the consuming job will throw an exception and the job will fail. At the same time, PartitionTracker in ResourceManager will release all the cluster partitions that are impacted (implemented in FLIP-67). The StreamExecutionEnvironment will fall back and re-submit the original job as if the cache hasn't been created. The original job will run as an ordinary job that follows the existing recovery strategy. And the cache will be recreated after the execution of the original job. The above process is transparent to the users.
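The fallback behaviour described above can be sketched as a simple try-and-resubmit pattern. This is a toy model and not the proposed implementation: CacheLostError, execute_with_fallback, the cache dictionary and the two job functions are hypothetical names, and a real TM failure is simulated by clearing the dictionary.

```python
class CacheLostError(Exception):
    """Hypothetical error raised when a cached intermediate result is unavailable."""


cache = {}            # stands in for the cluster partitions holding the cache
stats = {"runs": 0}   # counts how often the original job is executed


def original_job():
    # Runs the full pipeline and (re)creates the cache as a side effect.
    stats["runs"] += 1
    result = [x * 2 for x in [1, 2, 3]]
    cache["preprocessed"] = result
    return result


def cached_job():
    # Reads the cached intermediate result; fails if the partitions are gone.
    if "preprocessed" not in cache:
        raise CacheLostError("cached intermediate result is unavailable")
    return cache["preprocessed"]


def execute_with_fallback(run_with_cache, run_original):
    """Try the cache-consuming job first; fall back to the original job on cache loss."""
    try:
        return run_with_cache()
    except CacheLostError:
        return run_original()


original_job()   # first job creates the cache
cache.clear()    # simulate losing the partitions, e.g. a TM failure
recovered = execute_with_fallback(cached_job, original_job)
```

After the fallback, the caller still receives the expected records and the cache is available again, which is what makes the process transparent to the user in this sketch.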
Compatibility, Deprecation, and Migration Plan
This FLIP proposes a new feature in Flink. It is fully backward compatible.
Test Plan
We will provide unit tests to validate the proposed changes.
Rejected Alternatives
There are no rejected alternatives to list here yet.