...

Code Block (Java): cache() and invalidateCache() API
  /**
    * Cache this table to the built-in table service or the specified customized table service.
    *
    * This method provides a hint to Flink that the current table may be reused later, so a
    * cache should be created to avoid regenerating this table.
    *
    * The following code snippet gives an example of how this method could be used.
    *
    * {{{
    *   Table t = tEnv.fromCollection(data).as('country, 'color, 'count)
    *
    *   CachedTable t1 = t.filter('count < 100).cache()
    *   // t1 is cached after it is computed for the first time.
    *   t1.execute().print()
    *
    *   // When t1 is used again to compute t2, it may not be re-computed.
    *   Table t2 = t1.groupBy('country).select('country, 'count.sum as 'sum)
    *   t2.execute().print()
    *
    *   // Similarly, when t1 is used again to compute t3, it may not be re-computed.
    *   Table t3 = t1.groupBy('color).select('color, 'count.avg as 'avg)
    *   t3.execute().print()
    *
    * }}}
    *
    * @note The Flink optimizer may decide not to use the cache if doing so would accelerate the
    * processing, or if the cache is no longer available, for example because it has been
    * invalidated.
    * @note The table cache may be created lazily, which means the cache may only be created
    * the first time the cached table is computed.
    * @note The table cache will be cleared when the user program exits.
    * @note This method is only supported for batch tables and is treated as a no-op for stream tables.
    *
    * @return the current table with a cache hint. The original table reference is not modified
    *         by the execution of this method. If this method is called on a table that already
    *         has a cache hint, the same table object will be returned.
    */
   CachedTable cache();

2. Add a CachedTable interface that extends Table, as sketched below
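
A minimal sketch of what this interface might look like, assuming it only adds invalidateCache() (named in the code block title above) on top of Table; the exact method set and Javadoc are defined by the proposal itself.

Code Block (Java): sketch of the CachedTable interface
  import org.apache.flink.table.api.Table;

  /**
   * Sketch only: a cached table behaves like a normal Table, plus a handle to
   * release the cached intermediate result when it is no longer needed.
   */
  public interface CachedTable extends Table {

      /**
       * Invalidate the cache backing this table. Subsequent jobs that use this
       * table will recompute it instead of reading the cached result.
       */
      void invalidateCache();
  }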

...

  1. Users call table.cache(), and the client
    1. Wraps the operation of the table in a CacheOperation. The CacheOperation contains a generated IntermediateResultId.
    2. While translating the operations to rel nodes, the planner adds a CacheSink, which contains the IntermediateResultId, to the cached node in the DAG. By adding a sink to the cached node, we can make sure that the optimizer does not affect the cached node.
    3. The CacheSink is treated as a normal sink when the planner optimizes and translates the rel nodes to transformations.
  2. When the Executor (StreamGraphGenerator) translates the transformations to the stream graph, it recognizes the CacheSink, removes the CacheSink, and sets the cache flag of the upstream node.
  3. If the StreamingJobGraphGenerator sees a node with the cache flag set, it sets the result partition type to BLOCKING_PERSISTENT.
  4. The client submits the job.
  5. The JobMaster executes the job as usual. After the job finishes, the TaskExecutor promotes the BLOCKING_PERSISTENT result partitions to cluster partitions instead of releasing them (implemented in FLIP-67).
  6. After the job finishes, the JobMaster reports the information of the cluster partitions (ClusterPartitionDescriptor) back to the client in the form of a mapping of [IntermediateDataSetId -> [ClusterPartitionDescriptor]].
    1. The ClusterPartitionDescriptor should include a ShuffleDescriptor and some metadata, i.e. numberOfSubpartitions and partitionType.
    2. The ClusterPartitionDescriptor will be serialized before being sent back to the client via the JobResult and will only be deserialized in the StreamingJobGraphGenerator.
    3. The CatalogManager in the table environment maintains the mapping of CachedTable -> (IntermediateResultId, [ClusterPartitionDescriptor]), as sketched after this list.
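
The bookkeeping in step 6.c could look roughly like the sketch below. This is illustrative only: the FLIP keeps the mapping inside the CatalogManager, and the names CacheRegistry and CacheEntry, as well as the use of String keys and serialized descriptor bytes, are assumptions made for this sketch.

Code Block (Java): sketch of the CachedTable -> (IntermediateResultId, [ClusterPartitionDescriptor]) bookkeeping
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;
  import java.util.Optional;

  /** Illustrative registry; in the FLIP this mapping lives in the CatalogManager. */
  public class CacheRegistry {

      /** What is remembered per cached table. */
      public static final class CacheEntry {
          final String intermediateResultId;        // generated when cache() is called
          final List<byte[]> serializedDescriptors; // ClusterPartitionDescriptors reported by the JobMaster

          public CacheEntry(String intermediateResultId, List<byte[]> serializedDescriptors) {
              this.intermediateResultId = intermediateResultId;
              this.serializedDescriptors = serializedDescriptors;
          }
      }

      // Keyed by an identifier of the cached table (a plain String here for brevity).
      private final Map<String, CacheEntry> entries = new HashMap<>();

      public void register(String cachedTableId, CacheEntry entry) {
          entries.put(cachedTableId, entry);
      }

      public Optional<CacheEntry> lookup(String cachedTableId) {
          return Optional.ofNullable(entries.get(cachedTableId));
      }

      public void invalidate(String cachedTableId) {
          entries.remove(cachedTableId);
      }
  }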

...

  1. Later on, when the client submits another job whose DAG contains a CacheOperation, the planner
    1. looks up the available intermediate results in the CatalogManager
    2. creates a source node (CacheSource) that contains the ClusterPartitionDescriptor
    3. replaces the subtree of the cached node with the created source node
    4. The CacheSource is treated as a normal source when the planner optimizes and translates the rel nodes to transformations
  2. The StreamGraphGenerator recognizes the CacheSource, includes the ClusterPartitionDescriptor in the StreamNode, and sets the operator/driver to NoOp.
  3. When the StreamingJobGraphGenerator sees the StreamNode that contains the ClusterPartitionDescriptor, it will include the ClusterPartitionDescriptor in the JobVertex.
    1. The parallelism is set to the maximum number of subpartitions among the cluster partitions to ensure that all the subpartitions are read by the NoOp vertex (see the sketch after this list)
  4. The client submits the job
  5. The JobMaster does the following if the JobVertex contains the ClusterPartitionDescriptor
    1. It assumes that the Scheduler understands the cluster partition location
    2. Creates the InputGateDeploymentDescriptor with the ShuffleMaster
    3. Assigns the result partitions to each subtask based on locality
  6. The task managers run the given tasks as usual
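
The parallelism rule in step 3.a is simple enough to state as code. The sketch below is illustrative; the nested metadata interface is a stand-in for the ClusterPartitionDescriptor metadata described later in this document, and the accessor name is an assumption.

Code Block (Java): sketch of deriving the NoOp vertex parallelism
  import java.util.List;

  public final class CacheSourceParallelism {

      /** Minimal stand-in for the descriptor metadata; the accessor name is assumed. */
      public interface PartitionMetadata {
          int getNumberOfSubpartitions();
      }

      /** Take the maximum subpartition count so every subpartition has a consuming subtask. */
      public static int deriveParallelism(List<? extends PartitionMetadata> descriptors) {
          int parallelism = 1;
          for (PartitionMetadata descriptor : descriptors) {
              parallelism = Math.max(parallelism, descriptor.getNumberOfSubpartitions());
          }
          return parallelism;
      }
  }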

...

Please refer to FLIP-67 for the implementation details of steps 2 and 3.


In order to implement the default intermediate result storage, the following changes are needed.

  • New result partition type: BLOCKING_PERSISTENT (see the sketch after this list)
    • A BLOCKING_PERSISTENT result partition will not be deleted when the job exits.
  • JobMaster reports IntermediateDataSetID to ClusterPartitionDescriptor mapping to Client.
  • Add IntermediateDataSetID to StreamTransformation and Operator
  • TableEnvironment stores the Table → (IntermediateResultId, [ClusterPartitionDescriptor]) mapping
  • Client replaces the source of the cached node before optimization.
  • Client adds a sink to the node that should be cached.
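
For the first item above, the decision the StreamingJobGraphGenerator has to make can be sketched as follows. The helper class and method are illustrative only; ResultPartitionType is the runtime enum to which BLOCKING_PERSISTENT is added.

Code Block (Java): sketch of choosing the result partition type for a cached node
  import org.apache.flink.runtime.io.network.partition.ResultPartitionType;

  public final class CachePartitionTypeExample {

      /**
       * Cached results must outlive the job so that later jobs can consume them as
       * cluster partitions (FLIP-67); everything else stays a normal blocking partition.
       */
      public static ResultPartitionType resultPartitionTypeFor(boolean upstreamNodeIsCached) {
          return upstreamNodeIsCached
                  ? ResultPartitionType.BLOCKING_PERSISTENT
                  : ResultPartitionType.BLOCKING;
      }
  }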

The ClusterPartitionDescriptor should have the necessary information to let another job consume the intermediate result. It includes the ShuffleDescriptor, along with some metadata about the intermediate result, i.e., numberOfSubpartitions, partitionType, and the IntermediateDataSetID. Since it contains the runtime class ShuffleDescriptor, it should not be put into the flink-core module. Instead, it will be serialized before being transferred back to the client side, and only be deserialized in the StreamingJobGraphGenerator. We can put a ClusterPartitionDescriptor interface in flink-core, and keep the implementation of the ClusterPartitionDescriptor in flink-runtime. The JobResult containing the SerializedValue<ClusterPartitionDescriptor> will be sent back to the client side.
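
A minimal sketch of the runtime-side implementation, carrying exactly the fields listed above; the class name and accessor names are assumptions, and the flink-core facing interface could remain a thin marker so that no runtime class leaks into flink-core.

Code Block (Java): sketch of a runtime-side ClusterPartitionDescriptor implementation
  import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
  import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
  import org.apache.flink.runtime.shuffle.ShuffleDescriptor;

  import java.io.Serializable;

  /** Illustrative only: the accessor and class names are assumptions. */
  public class ClusterPartitionDescriptorImpl implements Serializable {

      private final ShuffleDescriptor shuffleDescriptor;
      private final int numberOfSubpartitions;
      private final ResultPartitionType partitionType;
      private final IntermediateDataSetID intermediateDataSetId;

      public ClusterPartitionDescriptorImpl(
              ShuffleDescriptor shuffleDescriptor,
              int numberOfSubpartitions,
              ResultPartitionType partitionType,
              IntermediateDataSetID intermediateDataSetId) {
          this.shuffleDescriptor = shuffleDescriptor;
          this.numberOfSubpartitions = numberOfSubpartitions;
          this.partitionType = partitionType;
          this.intermediateDataSetId = intermediateDataSetId;
      }

      public ShuffleDescriptor getShuffleDescriptor() {
          return shuffleDescriptor;
      }

      public int getNumberOfSubpartitions() {
          return numberOfSubpartitions;
      }

      public ResultPartitionType getPartitionType() {
          return partitionType;
      }

      public IntermediateDataSetID getIntermediateDataSetId() {
          return intermediateDataSetId;
      }
  }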

In per-job mode, a cluster is spun up for every submitted job and torn down when the submitted job finishes. All lingering resources (files, etc.) are cleaned up, including the cluster partitions in the TMs. Therefore, the cached table will not work in a per-job mode cluster. When a job that creates some cached tables is submitted in per-job mode, the returned ClusterPartitionDescriptors will be ignored so that the planner will not attempt to replace the subtree of the cached node.

Repartitioning is needed when the cache consumer requires the input data to be partitioned in a specific way, e.g. hash partitioning or a custom partitioning. When the StreamingJobGraphGenerator generates the job graph, it introduces a NoOp job vertex as the upstream vertex of the cache consumer and maintains the shipStrategyName of the output job edge. During execution, the task executor will make sure that the data is repartitioned.
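
To make the role of the NoOp vertex concrete: the vertex itself only forwards records, and the repartitioning is determined by the ship strategy on its output edge. The sketch below is conceptual; the class name is an assumption, and MapFunction is only used as the simplest possible forwarding operator.

Code Block (Java): conceptual NoOp forwarder
  import org.apache.flink.api.common.functions.MapFunction;

  /** Forwards every record unchanged; the partitioning happens on the output edge. */
  public class NoOpForwarder<T> implements MapFunction<T, T> {

      @Override
      public T map(T value) {
          return value;
      }
  }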

If a Task Manager instance fails, Flink will bring it up again. However, all the intermediate results which have a partition on the failed TM will become unavailable.

In this case, the consuming job will throw an exception and the job will fail. As a result, the PartitionTracker in the ResourceManager will release all the cluster partitions that are impacted (implemented in FLIP-67). The TableEnvironment will fall back and resubmit the original DAG without using the cache. The original DAG will run as an ordinary job that follows the existing recovery strategy. Note that because there is no cache available, the TableEnvironment (planner) will again create a sink to cache the result that was initially cached, so the cache will be recreated after the execution of the original DAG.

The above process is transparent to the users.
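
A rough sketch of this fallback behavior, assuming a hypothetical helper interface; the method and exception names (executeWithCache, executeOriginalDag, invalidateCacheEntriesOf, CacheUnavailableException) are illustrative and not proposed APIs.

Code Block (Java): sketch of falling back to the original DAG when the cache is unavailable
  public final class CacheFallbackExample {

      /** Hypothetical hooks into the table environment / planner. */
      public interface JobRunner {
          void executeWithCache(String jobName) throws CacheUnavailableException;
          void executeOriginalDag(String jobName);
          void invalidateCacheEntriesOf(String jobName);
      }

      public static class CacheUnavailableException extends Exception {}

      public static void run(JobRunner runner, String jobName) {
          try {
              // First attempt: cached subtrees are replaced with cache sources.
              runner.executeWithCache(jobName);
          } catch (CacheUnavailableException e) {
              // The cluster partitions backing the cache are gone (e.g. a TM failed):
              // drop the stale cache entries and resubmit the original DAG.
              runner.invalidateCacheEntriesOf(jobName);
              // The original DAG still contains the cache sink, so the cache is
              // recreated as a side effect of this run.
              runner.executeOriginalDag(jobName);
          }
      }
  }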

To explain how the optimizer affects the cache node, let us look at a very simple DAG, where a scan node is followed by a filter node, as shown below. The optimizer can push the filter down to the scan node so that the scan node produces less data for the downstream node. However, such an optimization would change the result of the scan node, which is undesired behavior if we want to cache the scan node.

[Figure: a simple DAG with a scan node followed by a filter node]

To solve the problem above, when users explicitly cache a table, we will change the DAG implicitly by adding a special sink node to the cached table, as shown below. By doing this, we are able to ensure that the result of the cache node will not be affected by the optimizer, and we can identify the job vertex that produces the cached table in the StreamingJobGraphGenerator by the special sink. With the Blink planner, when a DAG has multiple sinks, the multi-sink optimization will break the DAG into multiple RelNodeBlocks, and the cache node will be the output node of one of those RelNodeBlocks. In our example, it will be broken into three RelNodeBlocks. Then, the optimizer will run independently on each of those RelNodeBlocks, so the optimizer will not affect the result of the cache node.

[Figure: the DAG after adding a special sink node to the cached node]

Future works

...

As of now, DataStream only supports stream processing. There are some ideas about supporting both stream and batch (as a finite stream) in DataStream. Once we do that, we can add the cache API to DataStream as well.

Theoretically speaking, users could also cache a streaming table. The semantics would be to store the result somewhere (potentially with a TTL). However, caching a streaming table is usually not that useful. For simplicity, we will not support stream table caching in the first implementation. When cache() is invoked on a stream table, it will be treated as a no-op. This leaves us room to add caching for stream tables in the future without asking users to change their code.

The cached table utilizes the cluster partitions implemented in FLIP-67 to store the intermediate result in the TMs. Therefore, it is possible that a TM could run out of space to hold the intermediate results if clients cache too many tables without releasing them. When that happens, the job will fail with an exception. For simplicity, we will not implement an automatic mechanism to handle such failures in the first implementation. For now, it is up to the user to decide what to do when the failure happens. In the future, we could introduce some kind of eviction policy that releases some cluster partitions when the failure happens and re-runs the job.


Compatibility, Deprecation, and Migration Plan

This FLIP proposes a new feature in Flink. It is fully backward compatible.

Implementation Plan

In order to implement the cached table, the following changes are needed.

  • Add IntermediateDataSetID to StreamTransformation and Operator
  • CatalogManager stores the Table → (IntermediateResultId, [ClusterPartitionDescriptor]) mapping
  • Planner replaces the source of the cached node before optimization
  • Planner adds a sink to the node that should be cached
  • StreamGraphGenerator modifies the stream graph and sets the flag of the cache node accordingly
  • StreamingJobGraphGenerator sets the result partition type and ClusterPartitionDescriptor of the JobVertex.
  • JobMaster reports IntermediateDataSetID to ClusterPartitionDescriptor mapping to Client.
  • JobMaster can generate InputGateDeploymentDescriptor based on the ClusterPartitionDescriptor in the JobVertex.

The implementation should proceed in the following steps, some of which can be carried out concurrently:

  1. Implementation in Runtime
    1. The ClusterPartitionDescriptors can be generated and put into the JobResult.
    2. By using the ClusterPartitionDescriptor from step 1.a, we can execute a job that reads from the cluster partition.
  2. Implementation in StreamGraphGenerator and StreamingJobGraphGenerator
    1. Given the CacheSource and CacheSink, the JobGraph generated by the StreamingJobGraphGenerator has the ClusterPartitionDescriptor or the ResultPartitionType set correctly.
  3. Implementation in Planner
    1. Given an Operation Tree with CacheOperation, it can replace the cache node with CacheSource accordingly or add a CacheSink to the cache node.
  4. Implementation in TableEnvironment/Table
    1. Implement the public-facing classes.
    2. CatalogManager can store the ClusterPartitionDescriptor of the cached node.

Test Plan

Unit tests and Integration Tests will be added to test the proposed functionalities.

...