...


...


Discussion

...

thread https://

...

...

...

...

gtvjl293s7mbm48h1nd47bhv4oqqjto5
JIRA: FLINK-27521

Release: 1.16

...


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

We introduce the cache method to the SingleOutputStreamOperator class. To allow caching the side output of a SingleOutputStreamOperator, the getSideOutput method now returns a SingleOutputStreamOperator.

Code Block
languagejava
public class SingleOutputStreamOperator<T> extends DataStream<T> {

    ...

    /**
     * Caches the intermediate result of the transformation. Only jobs running in batch mode
     * with blocking shuffle mode can create a cache; otherwise, an exception is thrown. The
     * cache is generated lazily the first time the intermediate result is computed. The cache
     * is cleared when {@link CachedDataStream#invalidateCache()} is called or the
     * {@link StreamExecutionEnvironment} is closed.
     *
     * @return a CachedDataStream that can be used in later jobs to reuse the cached
     *     intermediate result.
     */
    public CachedDataStream cache() {
        ...
    }

    public <X> SingleOutputStreamOperator<X> getSideOutput(OutputTag<X> sideOutputTag) {
        ...
    }
}

We introduce CachedDataStream, which extends DataStream and implements the AutoCloseable interface.

...

When translating the CacheTransformation, if the intermediate result of the input has not been cached, a stream node with the same parallelism as its input is added to the StreamGraph. Its chaining strategy is set to HEAD so that it is not chained with the upstream node and the intermediate result is created.

...

During JobGraph translation, multiple StreamNodes may be chained together to form a JobVertex. While translating the StreamGraph to a JobGraph, if the translator sees a node whose intermediate result should be cached, it sets the ResultPartitionType of the JobEdge to BLOCKING_PERSISTENT.
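
The edge-type rule above can be illustrated with a minimal, self-contained sketch. The Edge class, the upstreamShouldBeCached flag, and the reduced ResultPartitionType enum are hypothetical stand-ins introduced for illustration; they are not Flink's StreamGraph or JobGraph classes. Only the name BLOCKING_PERSISTENT is taken from the text.

```java
// Toy model only: simplified stand-ins, not Flink's StreamGraph/JobGraph classes.
final class PartitionTypeSketch {

    /** Reduced, hypothetical set of partition types relevant to this rule. */
    enum ResultPartitionType { PIPELINED, BLOCKING, BLOCKING_PERSISTENT }

    /** Hypothetical edge between an upstream node and its consumer. */
    static final class Edge {
        final boolean upstreamShouldBeCached;
        ResultPartitionType partitionType;

        Edge(boolean upstreamShouldBeCached, ResultPartitionType defaultType) {
            this.upstreamShouldBeCached = upstreamShouldBeCached;
            this.partitionType = defaultType;
        }
    }

    /** Edges leaving a node whose result should be cached become BLOCKING_PERSISTENT. */
    static void translate(Edge edge) {
        if (edge.upstreamShouldBeCached) {
            edge.partitionType = ResultPartitionType.BLOCKING_PERSISTENT;
        }
        // Otherwise the edge keeps whatever type it already had.
    }

    public static void main(String[] args) {
        Edge cached = new Edge(true, ResultPartitionType.BLOCKING);
        Edge plain = new Edge(false, ResultPartitionType.BLOCKING);
        translate(cached);
        translate(plain);
        System.out.println(cached.partitionType); // BLOCKING_PERSISTENT
        System.out.println(plain.partitionType);  // BLOCKING
    }
}
```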

...

  • Application Mode: Application mode allows submission of an application that consists of multiple jobs. In application mode, the cached intermediate result can be used by multiple jobs in the application. When the application finishes, the Flink cluster is torn down, which cleans up all caches.
  • Per-job Mode: In per-job mode, a dedicated Flink cluster is created when a job is submitted, and the cluster is torn down when the job finishes. As we are leveraging the Cluster Partition to cache the intermediate result in the TM, the life cycle of the intermediate result is bound to the life cycle of the TM. It is not possible to reuse the intermediate result across multiple jobs.
  • Session Mode: In session mode, jobs are submitted to a long-running cluster. Therefore, cached intermediate results can be used by multiple jobs in the same application using the same StreamExecutionEnvironment. Since the life cycle of the Flink session cluster is independent of the life cycle of the user application, the user application is responsible for closing the StreamExecutionEnvironment so that the cached intermediate results can be released. Otherwise, those cached intermediate results are leaked.
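
The session-mode responsibility described in the last bullet can be sketched with a small toy model. SketchCluster and SketchEnvironment are hypothetical names used only here; they are not Flink classes, and the sketch assumes nothing about how Flink tracks caches internally. It only shows why closing the environment matters when the cluster outlives the application.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model only: SketchCluster and SketchEnvironment are hypothetical, not Flink classes.
final class SessionCacheSketch {

    /** Stand-in for a long-running session cluster that holds cached results. */
    static final class SketchCluster {
        final Set<String> cachedResults = new HashSet<>();
    }

    /** Stand-in for an environment that owns the caches it created. */
    static final class SketchEnvironment implements AutoCloseable {
        private final SketchCluster cluster;
        private final Set<String> ownedCaches = new HashSet<>();

        SketchEnvironment(SketchCluster cluster) {
            this.cluster = cluster;
        }

        /** Registers a cached result on the cluster and remembers that this environment owns it. */
        void cache(String resultId) {
            ownedCaches.add(resultId);
            cluster.cachedResults.add(resultId);
        }

        /** Releases every cached result this environment created. */
        @Override
        public void close() {
            cluster.cachedResults.removeAll(ownedCaches);
            ownedCaches.clear();
        }
    }

    public static void main(String[] args) {
        SketchCluster cluster = new SketchCluster();
        try (SketchEnvironment env = new SketchEnvironment(cluster)) {
            env.cache("result-1");
            System.out.println(cluster.cachedResults.size()); // 1 while the environment is open
        }
        System.out.println(cluster.cachedResults.size()); // 0 after close()
    }
}
```

If the environment in this model is never closed, the entry stays on the cluster indefinitely, which corresponds to the leak described above.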

Failover

If a TaskManager instance fails and the partitions are stored on the TM, all intermediate results that have a partition on the failed TM become unavailable. If the partitions are not stored on the TM, e.g. when a remote shuffle service is used, a TM failure does not affect the intermediate results. However, if some partitions are lost at the remote shuffle service, the intermediate result becomes unavailable.

In both cases, the consuming job throws an exception and fails. At the same time, the PartitionTracker in the ResourceManager releases all the cluster partitions that are impacted (implemented in FLIP-67). The StreamExecutionEnvironment falls back and re-submits the original job as if the cache had not been created. The original job runs as an ordinary job that follows the existing recovery strategy, and the cache is recreated after the original job has executed. This process is transparent to users.
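
The fallback behaviour above can be modelled in a few lines. This is a simplified sketch under stated assumptions, not Flink's implementation: CacheFallbackSketch, losePartitions, and the originalJob supplier are hypothetical names, and the real mechanism involves job failure and re-submission rather than a null check.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

// Toy model only: hypothetical names, not Flink classes.
final class CacheFallbackSketch<T> {

    private final Supplier<List<T>> originalJob; // stands in for re-submitting the original job
    private List<T> cachedPartitions;            // null means the cache is unavailable
    private int originalJobRuns;

    CacheFallbackSketch(Supplier<List<T>> originalJob) {
        this.originalJob = originalJob;
    }

    /** Returns the cached result, re-running the original job if the cache is missing. */
    List<T> consume() {
        if (cachedPartitions == null) {
            // Fallback: behave as if the cache had never been created, then recreate it.
            originalJobRuns++;
            cachedPartitions = Collections.unmodifiableList(new ArrayList<>(originalJob.get()));
        }
        return cachedPartitions;
    }

    /** Simulates a TM failure or partitions lost at a remote shuffle service. */
    void losePartitions() {
        cachedPartitions = null;
    }

    int originalJobRuns() {
        return originalJobRuns;
    }
}
```

In this model, a caller only ever invokes consume(); whether the result came from the cache or from a re-run is not visible to it, which is the sense in which the process is transparent.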

...