...

  1. In order to share a result, a sink must be provided.
  2. Complicated applications become inefficient due to the large amount of IO on intermediate results.
  3. The user experience is degraded for users of the programming API (SQL users are not affected because the temporary tables are created by the framework).

...

To address the above issues, we propose to add support for interactive programming in Flink Table API.


Public Interfaces

1. Add the following two new methods to the Flink Table class.
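
A hedged sketch of what the two methods could look like (the exact signatures are elided from this excerpt; the cache() / invalidateCache() names come from the API discussion later in this document, and the return types are assumptions):

Code Block
languagejava
titleTable caching API (sketch)
public interface Table {
    // ... existing Table methods ...

    // Caches the result of this table as an intermediate result so that
    // later jobs referencing this table can reuse it instead of recomputing it.
    Table cache();

    // Releases the cached intermediate result of this table.
    void invalidateCache();
}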

...

  • Table metadata - name, location, etc.
  • Table contents - the actual contents of the table

The default table service stores the metadata of the cached table in the client (e.g. the CatalogManager of the TableEnvironment), while the actual contents are stored in the Task Managers as cluster partitions (FLIP-67).

...

  1. Later on, when the client submits another job whose DAG contains a CacheOperation, the planner (see the sketch after this list)
    1. looks up the available intermediate results in the CatalogManager
    2. creates a Source node (CacheSource) that contains the ClusterPartitionDescriptor
    3. replaces the subtree of the cached node with the created source node
    4. treats the CacheSource as a normal source when it optimizes and translates the rel nodes to transformations
  2. The StreamGraphGenerator recognizes the CacheSource, includes the ClusterPartitionDescriptor in the StreamNode, and sets the operator/driver to NoOp.
  3. When the JobGraphGenerator sees a StreamNode that contains a ClusterPartitionDescriptor, it will include the ClusterPartitionDescriptor in the JobVertex and set the operator/driver to NoOp.
    1. The parallelism is set to the maximum number of subpartitions among the cluster partitions to ensure that all the subpartitions are read by the NoOp vertex.
  4. The client submits the job.
  5. The JobMaster does the following if the JobVertex contains a ClusterPartitionDescriptor:
    1. assumes the Scheduler understands the cluster partition locations
    2. creates the InputGateDeploymentDescriptor with the ShuffleMaster
    3. assigns the result partitions to each subtask based on locality
  6. Task Managers will run the given tasks as usual.
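
A minimal sketch of the subtree replacement in step 1, assuming illustrative stand-in types (Node, CacheSource, CacheRewriter, and the lookup map are simplifications for this document, not actual Flink planner classes):

Code Block
languagejava
titleCached subtree replacement (sketch)
import java.util.HashMap;
import java.util.Map;

final class ClusterPartitionDescriptor {}  // stand-in for the FLIP-67 descriptor

interface Node {}  // stand-in for a planner node

final class CacheSource implements Node {
    final ClusterPartitionDescriptor descriptor;
    CacheSource(ClusterPartitionDescriptor descriptor) { this.descriptor = descriptor; }
}

final class CacheRewriter {
    // Cached nodes mapped to the descriptors of their intermediate results,
    // as looked up in the CatalogManager.
    private final Map<Node, ClusterPartitionDescriptor> cachedResults = new HashMap<>();

    Node rewrite(Node node) {
        ClusterPartitionDescriptor descriptor = cachedResults.get(node);
        if (descriptor != null) {
            // Replace the whole subtree of the cached node with a source
            // that reads the existing cluster partitions.
            return new CacheSource(descriptor);
        }
        // In the real planner the children would be rewritten recursively.
        return node;
    }
}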

...

Please refer to FLIP-67 for the implementation details of steps 2 and 3.

...

In per-job mode, a cluster is spun up for every submitted job and torn down when the submitted job finishes. All lingering resources (files, etc.) are cleaned up, including the cluster partitions in the TMs. Therefore, the cached table will not work in a per-job mode cluster. When a job that creates some cached tables is submitted in per-job mode, the returned ClusterPartitionDescriptors will be ignored so that the planner will not attempt to replace the subtree of the cache node.

Repartitioning is needed when the cache consumer requires the input data to be partitioned in a specific way, e.g. hash partition or custom partition. When the JobGraphGenerator generates the job graph, it introduces a NoOp job vertex as the upstream vertex of the cache consumer and maintains the shipStrategyName of the output job edge. During execution, the task executor will make sure that the data is repartitioned.

If a Task Manager instance fails, Flink will bring it up again. However, all the intermediate results which have a partition on the failed TM will become unavailable.

In this case, the consuming job will throw an exception and fail. As a result, the PartitionTracker in the ResourceManager will release all the cluster partitions that are impacted (implemented in FLIP-67). The TableEnvironment will fall back and resubmit the original DAG without using the cache. The original DAG will run as an ordinary job that follows the existing recovery strategy. Note that because there is no cache available, the TableEnvironment (planner) will again create a sink to cache the result that was initially cached; therefore, the cache will be recreated after the execution of the original DAG.

The above process is transparent to the users.
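
A hedged sketch of this fallback logic; all names below (CacheFallback, CachedPartitionLostException, the Runnable jobs) are hypothetical placeholders, not actual Flink APIs:

Code Block
languagejava
titleCache fallback (sketch)
final class CacheFallback {
    static final class CachedPartitionLostException extends RuntimeException {}

    static void executeWithFallback(Runnable cachedJob, Runnable originalJob) {
        try {
            // First attempt: the DAG in which cached subtrees have been
            // replaced by sources reading the cluster partitions.
            cachedJob.run();
        } catch (CachedPartitionLostException e) {
            // By now the PartitionTracker in the ResourceManager has released
            // the impacted cluster partitions. Rerunning the original DAG also
            // recreates the cache, since it still contains the cache sink.
            originalJob.run();
        }
    }
}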

To explain how the optimizer affects the cache node, let us look at a very simple DAG, where a scan node is followed by a filter node, as shown below. The optimizer can push the filter down into the scan node so that the scan node produces less data for the downstream node. However, such an optimization changes the result of the scan node, which is undesired behavior if we want to cache the scan node.

[Figure: a scan node followed by a filter node, with the filter pushed down into the scan]
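
To make the problem concrete, a small illustrative snippet (it assumes the cache() method proposed above and the Table API expression DSL; the "Orders" table and "amount" column are hypothetical):

Code Block
languagejava
titleFilter pushdown vs. cache (illustration)
// The user wants the full scan cached.
Table scan = tEnv.from("Orders").cache();
// This filter must not be pushed into the cached scan; otherwise the
// cache would contain only the filtered rows instead of the full scan.
Table filtered = scan.filter($("amount").isGreater(100));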

To solve the problem above, when users explicitly cache a table, we will implicitly change the DAG by adding a special sink node to the cached table, as shown below. By doing this, we ensure that the result of the cache node is not affected by the optimizer, and we can identify the job vertex that produces the cached table in the JobGraphGenerator by the special sink. With the Blink planner, when a DAG has multiple sinks, the multi-sink optimization breaks the DAG into multiple RelNodeBlocks; the cache node will then be the output node of one of those RelNodeBlocks. In our example, the DAG is broken into three RelNodeBlocks. The optimizer then runs independently on each of those RelNodeBlocks, so it will not affect the result of the cache node.

[Figure: the same DAG with a special sink node added to the cached table]

In order to implement the default intermediate result storage, the following changes are needed.

  • New result partition type: BLOCKING_PERSISTENT (see the sketch after this list)
    • A BLOCKING_PERSISTENT result partition will not be deleted when the job exits.
  • JobMaster reports the IntermediateDataSetID to ClusterPartitionDescriptor mapping to the Client.
  • Add IntermediateDataSetID to StreamTransformation and Operator.
  • TableEnvironment stores the Table → (IntermediateResultId, [ClusterPartitionDescriptor]) mapping.
  • Client replaces the source of the cached node before optimization.
  • Client adds a sink to the node that should be cached.
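
A minimal sketch of the first item, assuming a simplified version of Flink's ResultPartitionType enum (the real enum carries more fields; this only illustrates the intent of the new value):

Code Block
languagejava
titleBLOCKING_PERSISTENT (sketch)
// Simplified illustration: a blocking result partition whose data outlives
// the job that produced it, so later jobs can consume it as a cache.
public enum ResultPartitionType {
    PIPELINED,
    BLOCKING,
    // New: like BLOCKING, but not released when the producing job finishes;
    // tracked as a cluster partition instead (FLIP-67).
    BLOCKING_PERSISTENT
}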

Future works

The most intuitive way to use the cached table with SQL is to create a temporary view. We can introduce a new keyword CACHED and combine it with the CREATE TEMPORARY VIEW syntax. For example, the following SQL will register the CachedTable object in the catalog, and later SQL statements can refer to the temporary view in order to use the cached table. Such syntax aligns well with caching in the Table API, as the following SQL has a similar effect to running tEnv.registerTemporaryView("CachedTable", table.cache()) in the Table API.

Code Block
languagesql
titleSQL integration
CREATE CACHED TEMPORARY VIEW CachedTable AS SELECT *
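
For comparison, the Table API counterpart mentioned above would look roughly like the following (the "Orders" source table is hypothetical):

Code Block
languagejava
titleTable API counterpart (sketch)
// Cache the table and register it as a temporary view so that later SQL
// statements can refer to it by name.
Table table = tEnv.sqlQuery("SELECT * FROM Orders");
tEnv.registerTemporaryView("CachedTable", table.cache());
// Later statements read the cached intermediate result instead of
// recomputing the query.
Table reuse = tEnv.sqlQuery("SELECT COUNT(*) FROM CachedTable");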

As of now, DataStream only supports stream processing. There are some ideas about supporting both stream and batch (as a finite stream) in DataStream. Once that happens, we can add the cache API to DataStream as well.

Theoretically speaking, users can also cache a streaming table. The semantics would be to store the result somewhere (potentially with a TTL). However, caching a streaming table is usually not that useful. For simplicity, we would not support stream table caching in the first implementation. When cache() is invoked on a stream table, it will be treated as a no-op. This leaves us room to add caching for stream tables in the future without asking users to change their code.
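
A hedged sketch of that no-op behavior (isStreamingMode() and addCacheSink() are illustrative placeholders, not actual Flink internals):

Code Block
languagejava
titleNo-op for stream tables (sketch)
public Table cache() {
    if (isStreamingMode()) {
        // No-op for stream tables: user code keeps working unchanged if
        // stream table caching is supported later.
        return this;
    }
    // Batch case: implicitly add the special cache sink described above.
    return addCacheSink(this);
}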

The cached table utilizes the cluster partitions implemented in FLIP-67 to store the intermediate result in the TMs. Therefore, it is possible that the TMs could run out of space to hold the intermediate results if clients cache too many tables without releasing them. When that happens, the job will fail with an exception. For simplicity, we would not implement an automatic mechanism to handle such failures in the first implementation. For now, it is up to the user to decide what to do when the failure happens. In the future, we could introduce some kind of eviction policy to release some cluster partitions when the failure happens and re-run the job.

...


In some cases, users may want to plug in their own cache service. In the future, we could add support for that.

Some API changes will be needed to support a customized cache service. We will start another FLIP to discuss that; the changes should not be large. Curious readers can read the Google doc for some ideas.

Auto caching allows the BLOCKING shuffle boundaries to be persisted for later usage. It further relieves users from having to think about when to cache explicitly in the program.


Compatibility, Deprecation, and Migration Plan

...

The semantics of the cache() / invalidateCache() API have gone through extended discussions. The rejected alternative semantics are documented below:

Code Block
languagejava
titleAPI Option 1
// Do not return anything
void cache();
// Alternatively, return the original table for chaining purpose.
Table cache();
 
// Physically uncache the table to release resource
void uncache();
 
// To explicitly ignore the cache, not part of this proposal but in the future
Table hint("ignoreCache")

...

Code Block
languagejava
titleSemantic Option 1
TableEnvironment tEnv = ...
Table t = …
...
// cache the table
t.cache();
...
// Assume some execution physically created the cache.
tEnv.execute()
...
// The optimizer decides whether the cache will be used or not.
t.foo();
...
// Physically release the cache.
t.uncache();
  • Simple and intuitive, users only need to deal with one variable of the Table class

  • Side effect: a table may be cached / uncached in a method invocation without the caller knowing about it.

...

Code Block
languagejava
titleSide Effect Option 1
{
  Table t = …
  // cache the table
  t.cache();
  ...
  // The cache may be released in foo()
  foo(t);
  ...
  // cache is no longer available
  t.bar()
}
 
void foo(Table t) {
  // cache the table
  t.cache();
  ….
  // Physically drop the table
  t.uncache();
}

...

  • No side effect

  • Optimizer has no chance to kick in.

  • Users have to distinguish between original table / cached table.

  • Adding auto cache becomes a backward incompatible change.


...

Code Block
languagejava
titleAPI Option 3
// Return a CacheHandle
CacheHandle cache();
 
public interface CacheHandle {
  // Physically release cache resource
  void release();
}
Code Block
languagejava
titleSemantic Option 3
TableEnvironment tEnv = ...
Table t = …
...
// cache the table
CacheHandle handle1 = t.cache();
CacheHandle handle2 = t.cache();
…
// Assume some execution physically created the cache.
tEnv.execute()
...
// The optimizer decides whether the cache will be used.
t.foo();
...
// Release the first handle. The cache will not be released because handle2 has not been released.
handle1.release();
// Release the second handle. The cache will be released because all the cache handles have been released.
handle2.release();
  • No side effect

  • Users only deal with the Table variable.

  • Easy to add auto caching.

  • The behavior of t.foo() changes after t.cache(); the concern is that this could be considered as "modifying" table t, which is against the immutability principle.

...