
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Table of Contents

Motivation

As the DataStream API now supports batch execution mode, we see users using the DataStream API to run batch jobs. An interactive programming experience is critical to the user experience of batch processing on Flink, and the ability to cache the intermediate result of a DataStream is crucial to interactive programming. For example, a machine learning scientist may want to interactively explore a bounded data source in a notebook with PyFlink. Let's have a look at the example below:

Code Block
languagepy
sample = env.read_text_file(...)

# Print out some records for sanity check
print(sample.execute_and_collect(limit=10))

# Expensive data processing
preprocessed = sample.flat_map(...)

# Explore the distribution of positive and negative samples
sample_cnt = preprocessed.count()
positive_cnt = preprocessed.filter(...).count()

# Explore dataset
preprocessed.key_by(...) \
    .reduce(...) \
    .execute_and_collect()

In the example above, since preprocessed is not cached, it is recomputed from scratch every time it is used in a batch job. This is not only a waste of computation resources but also hurts the user experience, and it gets worse as the computation graph gets deeper.


Therefore, we propose to support caching a DataStream in batch execution mode. We believe users can benefit a lot from this change, and it will encourage them to use the DataStream API for their interactive batch processing work. We are also aware that many interactive-programming users tend to use the high-level Table/SQL API. Therefore, after we have the cache in the DataStream API, we can continue where FLIP-36 left off and introduce the cache to the Table/SQL API, which will help a wider range of users.

Public Interfaces

There are several key specs regarding the cache mechanism we want to introduce:

  1. Only blocking intermediate results (IR) can be cached.
  2. Only transformations that create physical operations can be cached. Logical transformations, such as keyBy, union, and rescaling, only affect how the IR is partitioned. Caching a logical transformation would be the same as caching the physical transformation that precedes it, so it doesn't make much sense to cache a logical transformation (see the sketch after this list).
  3. The consumer of the cached IR should not assume how the cached IR is partitioned.
  4. Cached IR is created lazily. That means the cache is created the first time the cached IR is computed.
  5. Cached IR is immutable, meaning once the IR is cached, its data cannot be updated.
  6. The cached IR is only available to the user application that uses the same StreamExecutionEnvironment, and the life cycle of the cached IR is bound to that StreamExecutionEnvironment.
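To make spec 2 concrete, here is a hedged sketch against the proposed Java API (env is assumed to be a StreamExecutionEnvironment running in batch mode, and the lambdas stand in for real logic): map creates a physical operation whose result can be cached, while keyBy only re-partitions the intermediate result and therefore exposes no cache method.

Code Block
languagejava
// Sketch only, assuming the cache() method proposed below.
DataStream<String> source = env.fromCollection(Arrays.asList("a", "b", "c"));

// Allowed: map() creates a physical operation, so its result can be cached.
CachedDataStream<String> cachedMap = source.map(v -> v).cache();

// keyBy() is a logical transformation and has no cache() method. Caching
// before the keyBy(), as done above, is the equivalent supported pattern.
cachedMap.keyBy(v -> v).reduce((a, b) -> a + b).print();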


From the specs above, we want to introduce the following public API in the DataStream API. We add a cache method to the SingleOutputStreamOperator class.

Code Block
languagejava
public class SingleOutputStreamOperator<T> extends DataStream<T> {
    
    ...
        

    /**
     * Cache the intermediate result of the transformation. Only jobs running in batch mode with
     * the blocking shuffle mode can create a cache. Otherwise, an exception will be thrown. The
     * cache is generated lazily, at the first time the intermediate result is computed. The
     * cache will be cleared when {@link CachedDataStream#close()} is called or when the
     * {@link StreamExecutionEnvironment} is closed.
     *
     * @return A CachedDataStream that can be used in later jobs to consume the cached
     *     intermediate result.
     */
    public CachedDataStream<T> cache();
}


We introduce CachedDataStream, which extends DataStream and implements the AutoCloseable interface.

Code Block
languagejava
/**
 * {@link CachedDataStream} represents a {@link DataStream} whose intermediate result will be
 * cached the first time it is computed. The cached intermediate result can be used in later
 * jobs that use the same {@link CachedDataStream}, to avoid re-computing the intermediate
 * result.
 *
 * @param <T> The type of the elements in this stream.
 */
public class CachedDataStream<T> extends DataStream<T> implements AutoCloseable {
    
    /**
     * Invalidate the cached intermediate result of this DataStream and release the physical
     * resources. Users are not required to invoke this method, since all cached intermediate
     * results are released when the {@link StreamExecutionEnvironment} is closed. The
     * CachedDataStream should not be used after it is closed; otherwise, an exception will be
     * thrown.
     */
    @Override
    public void close() throws Exception {
        ...
    }
}


StreamExecutionEnvironment implements the AutoCloseable interface.

Code Block
languagejava
public class StreamExecutionEnvironment implements AutoCloseable {
    
    ...

    
    /**
     * Close and clean up the execution environment. All the cached intermediate results will be
     * released physically.
     */
    @Override
    public void close() throws Exception {
        ...
    }
}
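Putting the three interfaces together, the intended usage looks roughly like the following. This is a hedged end-to-end sketch; the source, functions, and batch-mode setup are placeholders and not part of the proposal.

Code Block
languagejava
// Sketch only: end-to-end usage of the proposed API.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

// The first execute() computes the map result and creates the cache.
CachedDataStream<Integer> cached =
        env.fromCollection(Arrays.asList(1, 2, 3)).map(v -> v * 2).cache();
cached.print();
env.execute();

// A later job in the same environment consumes the cache instead of
// re-computing the map.
cached.keyBy(v -> v % 2).reduce(Integer::sum).print();
env.execute();

// Closing the environment releases all cached intermediate results.
env.close();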


Proposed Changes

The FLIP will leverage the Cluster Partition introduced in FLIP-67. Therefore, this FLIP should not require many changes to the Flink runtime. Most of the changes are in the translation from DataStream to JobGraph.

JobGraph Translation

We introduce a way to carry the caching information from the DataStream (Transformation) to the StreamGraph and then to the JobGraph. The caching information includes whether the intermediate result of the operation should be cached, and an IntermediateDataSetDescriptor that is used to read the cached intermediate result.

The IntermediateDataSetDescriptor includes the IntermediateDataSetId and a set of ShuffleDescriptors, one for each partition.
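The following is a minimal sketch of what such a descriptor could look like; the exact fields, constructor, and accessor names are assumptions derived from the description above, not part of the FLIP.

Code Block
languagejava
/**
 * Sketch only: a serializable descriptor for consuming a cached intermediate result.
 */
public class IntermediateDataSetDescriptor implements Serializable {

    /** Identifies the cached intermediate result. */
    private final IntermediateDataSetID intermediateDataSetId;

    /** One shuffle descriptor per partition of the cached intermediate result. */
    private final Set<ShuffleDescriptor> shuffleDescriptors;

    public IntermediateDataSetDescriptor(
            IntermediateDataSetID intermediateDataSetId,
            Set<ShuffleDescriptor> shuffleDescriptors) {
        this.intermediateDataSetId = intermediateDataSetId;
        this.shuffleDescriptors = shuffleDescriptors;
    }

    public IntermediateDataSetID getIntermediateDataSetId() {
        return intermediateDataSetId;
    }

    public Set<ShuffleDescriptor> getShuffleDescriptors() {
        return shuffleDescriptors;
    }
}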

DataStream

We expose the cache method in the DataStream API. The DataStream API builds a set of transformations under the hood. Therefore, we introduce two internal classes, CacheTransformation and CacheTransformationTranslator. A CacheTransformation has one and only one PhysicalTransformation as its input. It implies that the intermediate result of the input transformation should be cached or has been cached. The CacheTransformation contains the IntermediateDataSetID of the cached intermediate result and an optional IntermediateDataSetDescriptor that is used to read the cached intermediate result. The IntermediateDataSetDescriptor is populated after the cache has been created.
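A minimal sketch of the CacheTransformation, assuming field names that are not spelled out in this FLIP:

Code Block
languagejava
/**
 * Sketch only: the internal transformation that marks its input for caching.
 */
public class CacheTransformation<T> extends Transformation<T> {

    /** The one and only physical input whose intermediate result is cached. */
    private final PhysicalTransformation<T> input;

    /** Identifies the cached intermediate result. */
    private final IntermediateDataSetID intermediateDataSetId;

    /** Populated once the cache has been created by a previous execution. */
    @Nullable private IntermediateDataSetDescriptor intermediateDataSetDescriptor;

    ...
}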

Stream Graph

When translating a CacheTransformation, if the intermediate result of its input has not been created yet, we add a virtual cache stream node, CacheNode, to the StreamGraph. The CacheNode's upstream is the input of the CacheTransformation, and they have the same parallelism. Its chaining strategy is set to HEAD so that it is not chained with the upstream node and the intermediate result is created.

If the intermediate result of the input has already been created, we instead add a source node, CacheReaderSource, which includes the IntermediateDataSetDescriptor, to read the cached intermediate result. Its parallelism is the same as the parallelism of the StreamNode that created the cache in the previous job.
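The two cases can be summarized with the following hedged pseudocode for the CacheTransformationTranslator; all method names other than the class names above are assumptions.

Code Block
languagejava
// Hedged pseudocode of CacheTransformationTranslator.
if (cacheTransformation.getIntermediateDataSetDescriptor() == null) {
    // First execution: the cache does not exist yet. Add a virtual CacheNode
    // behind the input, with the same parallelism and ChainingStrategy.HEAD,
    // so that the input's intermediate result is materialized.
    streamGraph.addCacheNode(inputNodeId, parallelism, ChainingStrategy.HEAD);
} else {
    // The cache exists: add a CacheReaderSource that consumes the cached
    // partitions via the descriptor, at the parallelism of the original cache.
    streamGraph.addCacheReaderSource(
            cacheTransformation.getIntermediateDataSetDescriptor(), parallelism);
}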

JobGraph

During JobGraph translation, multiple StreamNodes may be chained together to form a JobVertex. While translating the StreamGraph to the JobGraph, if the translator sees a virtual CacheNode, it knows that the intermediate result of the CacheNode's input should be cached. Since the chaining strategy of the virtual cache stream node is HEAD, the CacheNode forms its own JobVertex, and there is a JobEdge connecting the upstream node and this JobVertex. The ResultPartitionType of that JobEdge is set to BLOCKING_PERSISTENT.

On the other hand, if the translator sees a CacheReaderSource, it passes the IntermediateDataSetDescriptor from the CacheReaderSource to the JobVertex.
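In terms of the existing JobGraph API, the cache-producing edge could be wired roughly as follows; the vertex variables and the distribution pattern are assumptions of this sketch.

Code Block
languagejava
// Hedged sketch: the CacheNode becomes its own JobVertex (its chaining
// strategy is HEAD) and is connected to the upstream producer by a JobEdge
// whose result partition is persisted.
JobEdge edge = cacheVertex.connectNewDataSetAsInput(
        upstreamVertex,
        DistributionPattern.POINTWISE,
        ResultPartitionType.BLOCKING_PERSISTENT);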

Translation Process

In the sections below, we put the above translation steps together to demonstrate the translation from DataStream to JobGraph with a concrete example.

Create Cache

The code snippet below is a Flink job that creates the cached intermediate result.

Code Block
languagepy
cached_src = env.from_source(...) \
    .cache()

cached_map = cached_src.map(...) \
    .cache()

cached_map.key_by(...) \
    .reduce(...) \
    .print()

env.execute()

[Figure: the generated JobGraph for the cache-creating job]

Use Cache

The code snippet below is a Flink job that reads the cached intermediate result created by the previous job.

Code Block
languagepy
cached_src.print()

cached_map.key_by(...) \
    .reduce(...) \
    .print()

env.execute()

[Figure: the generated JobGraph for the cache-consuming job]

Runtime

Execution Graph

At runtime, the JobGraph becomes an ExecutionGraph; the ExecutionGraph is the runtime representation and contains all the information in the JobGraph. When the job finishes, the IntermediateDataSetDescriptor is returned via the JobExecutionResult.
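On the client side, this could surface roughly as follows; the accessor name on JobExecutionResult is an assumption of this sketch, not an existing API.

Code Block
languagejava
// Hedged sketch: after the job finishes, collect the descriptors of the
// newly created caches so that later jobs can consume them.
JobExecutionResult result = env.execute();
Map<IntermediateDataSetID, IntermediateDataSetDescriptor> descriptors =
        result.getIntermediateDataSetDescriptors(); // assumed accessor
// The StreamExecutionEnvironment stores these descriptors and injects them
// into the CacheTransformations of subsequent jobs.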

TaskDeploymentDescriptor

While generating the TaskDeploymentDescriptor, if we find an IntermediateDataSetDescriptor in the JobVertex, we create an InputGateDeploymentDescriptor so that the task consumes the cached intermediate result.

Deployment Mode

  • Application Mode: Application mode allows submitting an application that consists of multiple jobs. In application mode, the cached intermediate result can be used by multiple jobs in the application. When the application finishes, the Flink cluster is torn down, which cleans up all the caches.
  • Per-job Mode: In per-job mode, a dedicated Flink cluster is created when a job is submitted, and the cluster is torn down when the job finishes. As we leverage the cluster partition to cache the intermediate result in the TaskManager, the life cycle of the intermediate result is bound to the life cycle of the TaskManager. Therefore, it is not possible to reuse the intermediate result across multiple jobs.
  • Session Mode: In session mode, jobs are submitted to a long-running cluster. Therefore, cached intermediate results can be used by multiple jobs of the same application that use the same StreamExecutionEnvironment. Since the life cycle of the Flink session cluster is independent of the life cycle of the user application, the user application is responsible for closing the StreamExecutionEnvironment so that the cached intermediate results can be released. Otherwise, those cached intermediate results are leaked (see the sketch below).
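For session mode in particular, wrapping the environment in try-with-resources is a simple way to guarantee that cleanup; a minimal sketch, assuming the AutoCloseable StreamExecutionEnvironment proposed above:

Code Block
languagejava
// Sketch: the session cluster outlives the application, so the application
// must close the environment to release its caches on the cluster.
try (StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()) {
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    CachedDataStream<String> cached =
            env.fromCollection(Arrays.asList("a", "b")).map(v -> v).cache();
    cached.print();
    env.execute();
    // ... further jobs reusing the cache ...
} // All cached intermediate results of this environment are released here.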

Failover

If a TaskManager instance fails, Flink can bring it up again. However, all the intermediate results that have a partition on the failed TaskManager become unavailable.

In this case, the consuming job throws an exception and fails. At the same time, the PartitionTracker in the ResourceManager releases all the cluster partitions that are impacted (implemented in FLIP-67). The StreamExecutionEnvironment then falls back and re-submits the original job as if the cache had not been created. The original job runs as an ordinary job following the existing recovery strategy, and the cache is recreated after the execution of the original job. This process is transparent to the user.

Compatibility, Deprecation, and Migration Plan

This FLIP proposes a new feature in Flink. It is fully backward compatible.

Test Plan

We will provide unit tests to validate the proposed changes.

Rejected Alternatives

There are no rejected alternatives to be listed here yet.