...

As the DataStream API now supports batch execution mode, we see users using the DataStream API to run batch jobs. Interactive programming is an important use case of Flink batch processing, and a good interactive programming experience is critical to the overall user experience of Flink. The ability to cache the intermediate results of a DataStream is crucial to interactive programming. For example, a machine learning scientist may want to interactively explore a bounded data source in a notebook with PyFlink. Let's have a look at the example below:

Code Block
languagepy
sample = env.read_text_file(...)

# Print out some records for sanity check
print(sample.execute_and_collect(limit=10))

# Expensive data processing
preprocessed = sample.flat_map(...)

# Explore the distribution of positive and negative samples
sample_cnt = preprocessed.count()
positive_cnt = preprocessed.filter(...).count()

# Explore the dataset
preprocessed.key_by(...) \
    .reduce(...) \
    .execute_and_collect()

In the example above, since the preprocessed data is not cached, it is recomputed from scratch every time it is used in a later batch job. This not only wastes computation resources but also hurts the user experience, and it gets worse as the computation graph gets deeper.

Therefore, we propose to support caching a DataStream in batch execution mode. We believe that users can benefit a lot from this change, and that it will encourage them to use the DataStream API for their interactive batch processing work. We are also aware that many interactive programming users tend to use the high-level Table/SQL API. Therefore, after we have the cache in the DataStream API, we can continue where FLIP-36 left off and introduce the cache to the Table/SQL API, which will help a wider range of users.

Public Interfaces

There are several key specs regarding the cache mechanism we want to introduce:

  1. Only blocking intermediate results can be cached.
  2. Only transformations that create physical operations can be cached. Logical transformations, such as keyBy, union, and rescaling, only affect how the intermediate result (IR) is partitioned. Caching a logical transformation would be the same as caching the physical transformation that precedes it. Therefore, it doesn't make much sense to cache a logical transformation (see the sketch after this list).
  3. The consumer of the cached IR should not assume how the cached IR is partitioned.
  4. Cached IR is created lazily. That means the cache is created the first time the IR is computed.
  5. Cached IR is immutable, meaning that once the IR is cached, its data cannot be updated.
  6. The cached IR is available to the user application using the same StreamExecutionEnvironment, and the life cycle of the cached IR is bound to that StreamExecutionEnvironment.
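
To illustrate spec 2, here is a minimal sketch, assuming the cache method proposed in the next section and assuming the returned CachedDataStream can be consumed like a regular DataStream. The cache is placed on the physical map transformation; the keyBy that follows it only re-partitions the cached IR.

Code Block
languagejava
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

// map creates a physical operation, so its blocking intermediate result can be cached.
SingleOutputStreamOperator<Integer> mapped =
        env.fromElements(1, 2, 3, 4)
                .map(x -> x * 2)
                .returns(Types.INT);

CachedDataStream cached = mapped.cache();

// keyBy is a logical transformation; it only affects how the cached IR is
// partitioned and cannot be cached itself.
cached.keyBy(x -> x % 2, Types.INT)
        .reduce(Integer::sum)
        .executeAndCollect(10);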

From the specs above, we introduce the following public API to the DataStream API: a cache method on the SingleOutputStreamOperator class.

Code Block
languagejava
public class SingleOutputStreamOperator<T> extends DataStream<T> {
    
    ...
        

    /**
     * Cache the intermediate result of the transformation. Only jobs running in batch mode with
     * blocking shuffle mode can create the cache. Otherwise, an exception will be thrown. The
     * cache is generated lazily, at the first time the intermediate result is computed. The
     * cache will be cleared when {@link CachedDataStream#invalidateCache()} is called or when
     * the {@link StreamExecutionEnvironment} is closed.
     *
     * @return CachedDataStream that can be used in later jobs to consume the cached intermediate
     *     result.
     */
    public CachedDataStream cache();
}
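
As a usage sketch, a program built on this API could look like the following. This assumes that the CachedDataStream returned by cache can be consumed like a regular DataStream and that invalidateCache behaves as described in the Javadoc above. The first job that consumes the intermediate result populates the cache lazily; later jobs submitted from the same StreamExecutionEnvironment read from the cache instead of recomputing the preprocessing.

Code Block
languagejava
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

// Expensive preprocessing; a physical transformation whose blocking
// intermediate result can be cached.
SingleOutputStreamOperator<String> preprocessed =
        env.fromElements("a", "bb", "ccc")
                .map(String::toUpperCase)
                .returns(Types.STRING);

CachedDataStream cached = preprocessed.cache();

// First job: computes the preprocessing and lazily populates the cache.
List<String> samples = cached.executeAndCollect(10);

// Later job in the same StreamExecutionEnvironment: consumes the cached
// intermediate result instead of recomputing the preprocessing.
cached.filter(v -> v.length() > 1).executeAndCollect(10);

// Release the cached intermediate result once it is no longer needed.
cached.invalidateCache();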

...