...

As the DataStream API now supports batch execution mode, users are increasingly using it to run batch jobs. Interactive programming is an important use case of Flink batch processing, and the ability to cache the intermediate results of a DataStream is crucial to the interactive programming experience. For example, a machine learning scientist may want to interactively explore a bounded data source in a notebook with PyFlink. Let's look at the example below:

Code Block
languagepy
sample = env.read_text_file(...)

# Print out some records for a sanity check
print(sample.execute_and_collect(limit=10))

# Expensive data processing
preprocessed = sample.flat_map(...)

# Explore the distribution of positive and negative samples
sample_cnt = preprocessed.count()
positive_cnt = preprocessed.filter(...).count()

# Explore the dataset
preprocessed.key_by(...) \
    .reduce(...) \
    .execute_and_collect()

...

Therefore, we propose to support caching a DataStream in batch execution mode. We believe users can benefit greatly from this change, and that it will encourage them to use the DataStream API for their interactive batch processing work.

Public Interfaces

We introduce the following public API to the DataStream API: a cache method on the SingleOutputStreamOperator class.

Code Block
languagejava
public class SingleOutputStreamOperator<T> extends DataStream<T> {
    
    ...
        

    /**
     * Cache the intermediate result of the transformation. Only jobs running in batch mode
     * with blocking shuffle mode can create a cache; otherwise, an exception will be thrown.
     * The cache is generated lazily, the first time the intermediate result is computed.
     * The cache is cleared when {@link CachedDataStream#invalidateCache()} is called or when
     * the {@link StreamExecutionEnvironment} is closed.
     *
     * @return A CachedDataStream that can be used in later jobs to reuse the cached
     *     intermediate result.
     */
    public CachedDataStream<T> cache();
}
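
For illustration, here is a minimal Java sketch of how a batch job might use the proposed method. Everything except cache() and CachedDataStream is existing DataStream API; the job body itself is only illustrative.

Code Block
languagejava
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

// First job: compute the filter and lazily materialize the cache.
CachedDataStream<Long> cached = env.fromSequence(1L, 1000L)
        .filter(x -> x % 2 == 0)
        .cache();
cached.executeAndCollect(10);

// Later jobs: consume the cached intermediate result instead of re-computing it.
cached.filter(x -> x > 100).executeAndCollect(10);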

...

Code Block
languagejava
/**
 * {@link CachedDataStream} represents a {@link DataStream} whose intermediate result is
 * cached the first time it is computed. The cached intermediate result can be used in
 * later jobs that consume the same {@link CachedDataStream}, to avoid re-computing the
 * intermediate result.
 *
 * @param <T> The type of the elements in this stream.
 */
public class CachedDataStream<T> extends DataStream<T> implements AutoCloseable {
    
    /**
     * Invalidate the cached intermediate result of this DataStream to release the physical
     * resources. Users are not required to invoke this method to release the physical
     * resources unless they want to release them eagerly. The CachedDataStream must not be
     * used after it is closed; otherwise, an exception will be thrown.
     */
    @Override
    public void close() throws Exception {
        ...
    }
}
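
Because CachedDataStream implements AutoCloseable, the cache can also be released explicitly once it is no longer needed. The sketch below assumes the env and cached stream from the example above; releasing eagerly is optional, since closing the StreamExecutionEnvironment clears the cache as well.

Code Block
languagejava
cached.executeAndCollect(10); // first job: computes and materializes the cache
cached.executeAndCollect(10); // second job: served from the cache

// Optional: eagerly release the physical resources backing the cache.
// Using the CachedDataStream after this point throws an exception.
cached.close();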

...

Code Block
languagepy
# Cache the sample so that it does not have to be read from the file system in later jobs.
sample = env.read_text_file(...).cache()

# Print out some records for a sanity check; the sample is cached after the job finishes.
print(sample.execute_and_collect(limit=10))

# Read the cached sample, run the expensive data processing, and cache the result.
preprocessed = sample.flat_map(...).cache()

# Check that the preprocessing produces the correct result; the preprocessed result is cached after the job finishes.
print(preprocessed.execute_and_collect(limit=10))

# Explore the distribution of positive and negative samples, using the cached preprocessed result.
sample_cnt = preprocessed.count()
positive_cnt = preprocessed.filter(...).count()

# Explore the dataset with the cached preprocessed result.
preprocessed.key_by(...) \
    .reduce(...) \
    .execute_and_collect()
    
# Close the StreamExecutionEnvironment
env.close()

...

If the intermediate result of the input has been cached, we add a source node that contains the descriptor for reading the cached intermediate result. Its parallelism is the same as the parallelism of the StreamNode that created the cache in the previous job. The transformations before the CacheTransformation are ignored.
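
A rough sketch of this translation step is shown below. CacheTransformation and StreamNode come from the proposal and the existing stream graph model, while the translator shape, method names, and descriptor accessors are hypothetical and only illustrate the behavior described above.

Code Block
languagejava
// Hypothetical sketch: translating a CacheTransformation whose intermediate
// result was already produced by a previous job (all method names illustrative).
private void translateCacheRead(CacheTransformation<?> cache, StreamGraph streamGraph) {
    // Add a source node carrying the descriptor of the cached intermediate result.
    StreamNode cacheSource = streamGraph.addCacheSourceNode(cache.getCacheDescriptor());

    // Run the source with the parallelism of the StreamNode that created the
    // cache in the previous job.
    cacheSource.setParallelism(cache.getOriginalParallelism());

    // Transformations upstream of the CacheTransformation are skipped entirely;
    // downstream transformations consume the new cache source instead.
}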

Stream Graph -> JobGraph

...