...

```java
public class StreamExecutionEnvironment implements AutoCloseable {

    ...

    /**
     * Closes and cleans up the execution environment. All cached
     * intermediate results are physically released.
     */
    @Override
    public void close() throws Exception {
        ...
    }
}
```
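Because the environment implements AutoCloseable, Java callers can scope it with try-with-resources so that close() runs automatically. The sketch below is a hypothetical Python analogue for illustration only: `ToyExecutionEnvironment` and its `cache()` and `cached_ids()` methods are invented names, not Flink API. A context manager plays the role of try-with-resources, and `close()` drops every cached result, mirroring the Javadoc above.

```python
# Hypothetical toy model, not Flink code: an environment that tracks the
# intermediate results it has cached and releases all of them on close().
class ToyExecutionEnvironment:
    def __init__(self):
        # Maps a cache id to the materialized result it stands for.
        self._cached = {}
        self.closed = False

    def cache(self, cache_id, records):
        """Materialize `records` under `cache_id` (stands in for a cluster partition)."""
        if self.closed:
            raise RuntimeError("environment is closed")
        self._cached[cache_id] = list(records)
        return self._cached[cache_id]

    def cached_ids(self):
        return sorted(self._cached)

    def close(self):
        # Release every cached intermediate result; calling close() twice is harmless.
        self._cached.clear()
        self.closed = True

    # The context-manager hooks play the role that AutoCloseable plays
    # for try-with-resources in Java.
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False  # do not swallow exceptions


with ToyExecutionEnvironment() as env:
    env.cache("sample", ["a", "b", "c"])
    env.cache("preprocessed", ["A", "B"])
    print(env.cached_ids())  # ['preprocessed', 'sample']

print(env.cached_ids())  # []
print(env.closed)  # True
```

Tying the cache lifetime to the environment means that leaving the `with` block (or the Java try block) is enough to free the cached results, even if a job raises an exception.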

Example Usage

This section shows how the example from the Motivation section can be written with the proposed API.

```python
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# Cache the sample so that later jobs do not have to read it from the file system again.
sample = env.read_text_file(...).cache()

# Print out some records as a sanity check; the sample is cached once this job finishes.
print(sample.execute_and_collect(limit=10))

# Read the cached sample, run the expensive data processing and cache the result.
preprocessed = sample.flat_map(...).cache()

# Check whether the preprocessing produces the correct result; the preprocessed result is cached once this job finishes.
print(preprocessed.execute_and_collect(limit=10))

# Explore the distribution of positive and negative samples using the cached preprocessed result.
sample_cnt = preprocessed.count()
positive_cnt = preprocessed.filter(...).count()

# Explore the dataset with the cached preprocessed result.
(preprocessed.key_by(...)
    .reduce(...)
    .execute_and_collect())

# Close the StreamExecutionEnvironment, which releases the cached results.
env.close()
```

In the above example, because the user caches both the sample and the preprocessed result, the sample is read from the file system only once and the expensive preprocessing runs only once.
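To make the saving concrete, the following hypothetical simulation (plain Python, no Flink involved) counts how often the source is read and how often the preprocessing runs across the first four jobs of the example, once without caching and once with it. The helper names and the tiny labelled dataset are invented for illustration; each "job" simply re-evaluates its upstream pipeline unless a cached result is available.

```python
# Hypothetical simulation, not Flink code: count how often the source is read
# and how often the expensive preprocessing runs, with and without caching.
from collections import Counter

calls = Counter()

def read_source():
    calls["read"] += 1
    return ["1 pos", "2 neg", "3 pos", "4 pos"]  # made-up sample data

def preprocess(lines):
    calls["preprocess"] += 1
    return [line.split() for line in lines]

def run_jobs(use_cache):
    calls.clear()
    cache = {}

    def sample():
        if use_cache:
            if "sample" not in cache:
                cache["sample"] = read_source()
            return cache["sample"]
        return read_source()

    def preprocessed():
        if use_cache:
            if "preprocessed" not in cache:
                cache["preprocessed"] = preprocess(sample())
            return cache["preprocessed"]
        return preprocess(sample())

    # Each statement below stands for one job in the example.
    sample()[:10]                  # sanity check on the sample
    preprocessed()[:10]            # sanity check on the preprocessing
    total = len(preprocessed())    # sample_cnt
    positive = sum(1 for _, label in preprocessed() if label == "pos")  # positive_cnt
    return total, positive, dict(calls)

print(run_jobs(use_cache=False))  # (4, 3, {'read': 4, 'preprocess': 3})
print(run_jobs(use_cache=True))   # (4, 3, {'read': 1, 'preprocess': 1})
```

Both runs compute the same counts; only the number of source reads and preprocessing passes differs.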

Proposed Changes

This FLIP leverages the cluster partitions introduced in FLIP-67, so it should require few changes to the Flink runtime. Most of the changes are expected to be in the translation from DataStream to JobGraph.
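One way to picture the translation-side change is a toy graph rewrite: when a cached stream's result already exists as a cluster partition, the translator can read that partition instead of recomputing the upstream subgraph; otherwise it computes the stream and keeps its output as a persistent partition for later jobs. The sketch below assumes a linear pipeline and uses invented names (`Node`, `translate`, the `+persist` and `read-cached(...)` vertex labels); it illustrates the idea and is not Flink's actual translator.

```python
# Hypothetical sketch, not Flink's translator: how a DataStream-to-JobGraph
# translation could reuse or produce cached results (cluster partitions).
from dataclasses import dataclass
from typing import List, Optional, Set


@dataclass
class Node:
    name: str
    upstream: Optional["Node"] = None
    cached: bool = False  # set by a cache() call on this stream


def translate(node: Node, existing_partitions: Set[str]) -> List[str]:
    """Return the job's vertices, source first, for a linear pipeline ending at `node`."""
    if node.cached and node.name in existing_partitions:
        # The result already exists on the cluster: read it instead of
        # recomputing the whole upstream subgraph.
        return [f"read-cached({node.name})"]
    vertices = translate(node.upstream, existing_partitions) if node.upstream else []
    if node.cached:
        # First job that needs this stream: compute it and keep its output
        # as a persistent (cluster) partition for later jobs.
        vertices.append(f"{node.name}+persist")
    else:
        vertices.append(node.name)
    return vertices


source = Node("read_text_file", cached=True)  # the cached sample
preprocessed = Node("flat_map", upstream=source, cached=True)
count = Node("count", upstream=preprocessed)

print(translate(count, set()))
# ['read_text_file+persist', 'flat_map+persist', 'count']
print(translate(count, {"read_text_file", "flat_map"}))
# ['read-cached(flat_map)', 'count']
```

In this picture the runtime only has to keep partitions alive and serve them, which is consistent with most of the work landing in the translation layer.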

...