...

  • Add IntermediateDataSetID to StreamTransformation and Operator.
  • TableEnvironment stores the Table → (IntermediateResultId, [ClusterPartitionDescriptor]) mapping.
  • Client replaces the source of the cached node before optimization.
  • Client adds a sink to the node that should be cached.
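To make the source-replacement step concrete, here is a minimal, dependency-free sketch of a client-side registry that holds the Table → (IntermediateResultId, [ClusterPartitionDescriptor]) mapping and, before optimization, swaps a cached subtree for a leaf that reads the cached result. It is a toy model, not Flink code: only the type names IntermediateResultId and ClusterPartitionDescriptor come from the text above; their fields, the hypothetical classes CachedResult, PlanNode and CacheRegistry, and keying tables by plan-node name are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-ins: only the type names IntermediateResultId and
// ClusterPartitionDescriptor come from the design; their fields are assumptions.
record IntermediateResultId(String value) {}
record ClusterPartitionDescriptor(String host, int partitionIndex) {}
record CachedResult(IntermediateResultId resultId, List<ClusterPartitionDescriptor> partitions) {}

// Toy logical-plan node; cacheSource is set when the node reads a cached result.
record PlanNode(String name, List<PlanNode> inputs, Optional<CachedResult> cacheSource) {
    static PlanNode op(String name, List<PlanNode> inputs) {
        return new PlanNode(name, inputs, Optional.empty());
    }
}

// Hypothetical registry playing the role of the TableEnvironment's
// Table -> (IntermediateResultId, [ClusterPartitionDescriptor]) mapping.
// Tables are keyed by plan-node name purely for illustration.
final class CacheRegistry {
    private final Map<String, CachedResult> cached = new HashMap<>();

    void register(String tableName, CachedResult result) {
        cached.put(tableName, result);
    }

    // Before optimization: replace every cached subtree with a leaf that reads the cached result.
    PlanNode replaceCachedSources(PlanNode node) {
        CachedResult hit = cached.get(node.name());
        if (hit != null) {
            return new PlanNode(node.name(), List.of(), Optional.of(hit));
        }
        List<PlanNode> newInputs = new ArrayList<>();
        for (PlanNode in : node.inputs()) {
            newInputs.add(replaceCachedSources(in));
        }
        return new PlanNode(node.name(), newInputs, node.cacheSource());
    }
}

final class CacheDemo {
    public static void main(String[] args) {
        PlanNode scan = PlanNode.op("scan", List.of());
        PlanNode filter = PlanNode.op("filter", List.of(scan));
        PlanNode project = PlanNode.op("project", List.of(filter));

        CacheRegistry registry = new CacheRegistry();
        registry.register("filter", new CachedResult(
                new IntermediateResultId("result-1"),
                List.of(new ClusterPartitionDescriptor("taskmanager-1", 0))));

        PlanNode cachedLeaf = registry.replaceCachedSources(project).inputs().get(0);
        System.out.println(cachedLeaf.inputs().isEmpty());        // true: the scan subtree is gone
        System.out.println(cachedLeaf.cacheSource().isPresent()); // true: the leaf reads the cached result
    }
}
```

Because the replacement happens before optimization, the optimizer only ever sees a plain source in place of the cached subtree, so it has nothing upstream of the cached table to rewrite.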

...

To explain how the optimizer affects the cached node, let's look at a very simple DAG with only one scan node followed by a filter node. The optimizer can push the filter down into the scan node, so that the scan node produces less data for the downstream node. The final result of the query is unchanged, but the output of the scan node itself is now different, which is undesirable if we want to cache the scan node.
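The effect of filter push-down on the scan's own output can be shown with a toy model over an in-memory list. This is only an illustration of the idea, not planner code; the hypothetical class PushdownDemo and its scan/filter methods are assumptions that stand in for the real operators.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

final class PushdownDemo {
    // Toy scan: reads every row of the source, applying a pushed-down predicate if one is given.
    static List<Integer> scan(List<Integer> source, Predicate<Integer> pushedDown) {
        return source.stream().filter(pushedDown).collect(Collectors.toList());
    }

    // Toy downstream filter operator.
    static List<Integer> filter(List<Integer> input, Predicate<Integer> predicate) {
        return input.stream().filter(predicate).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(1, 2, 3, 4, 5, 6);
        Predicate<Integer> isEven = x -> x % 2 == 0;

        // Unoptimized plan: the scan emits everything and the filter runs downstream.
        List<Integer> scanOut = scan(source, x -> true);
        List<Integer> result = filter(scanOut, isEven);

        // Optimized plan: the filter has been pushed into the scan.
        List<Integer> pushedScanOut = scan(source, isEven);

        System.out.println(result);        // [2, 4, 6]  query result
        System.out.println(pushedScanOut); // [2, 4, 6]  same query result after push-down
        System.out.println(scanOut);       // [1, 2, 3, 4, 5, 6]  what caching the scan should store
    }
}
```

If the scan node were cached from the optimized plan, the cache would hold only the even rows, and any later query reading the "cached scan" would silently miss data.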

To solve this problem, when users explicitly cache a table, we implicitly change the DAG by adding a special sink node to the cached table. This ensures that the result of the cached node is not affected by the optimizer, and it lets us identify the job vertex that produces the cached table in JobGraphGenerator by that special sink. With the Blink planner, when a DAG has a node (the cached node in our case) whose output feeds multiple sinks, the multi-sink optimization breaks the DAG into multiple RelNodeBlocks, and the cached node becomes the output node of one of those RelNodeBlocks. The optimizer then runs independently on each RelNodeBlock, so it cannot change the result of the cached node.
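The block-splitting idea can be sketched on a toy DAG: a node that feeds more than one consumer (or feeds none, i.e. a sink) becomes the output of its own block, and traversal for a block stops at other block outputs. This is a simplified model of the behaviour described above, not the Blink planner's RelNodeBlock code; the hypothetical class BlockSplitDemo, its methods, and the node names are assumptions.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Toy DAG: each node maps to the names of the nodes it reads from.
final class BlockSplitDemo {
    private final Map<String, List<String>> inputs = new LinkedHashMap<>();

    void addNode(String name, String... from) {
        inputs.put(name, Arrays.asList(from));
    }

    // Number of downstream consumers of each node.
    Map<String, Integer> consumerCounts() {
        Map<String, Integer> counts = new HashMap<>();
        for (String n : inputs.keySet()) counts.putIfAbsent(n, 0);
        for (List<String> ins : inputs.values()) {
            for (String in : ins) counts.merge(in, 1, Integer::sum);
        }
        return counts;
    }

    // Block outputs: sinks (no consumers) and shared nodes (more than one consumer).
    Set<String> blockOutputs() {
        Set<String> outs = new TreeSet<>();
        consumerCounts().forEach((n, c) -> { if (c != 1) outs.add(n); });
        return outs;
    }

    // Nodes in the block rooted at `output`; the walk stops at other block outputs,
    // so an optimizer working on this block cannot rewrite nodes beyond that boundary.
    Set<String> block(String output) {
        Set<String> outs = blockOutputs();
        Set<String> members = new TreeSet<>();
        Deque<String> stack = new ArrayDeque<>(List.of(output));
        while (!stack.isEmpty()) {
            String n = stack.pop();
            if (!members.add(n)) continue;
            for (String in : inputs.get(n)) {
                if (!outs.contains(in)) stack.push(in);
            }
        }
        return members;
    }

    public static void main(String[] args) {
        BlockSplitDemo dag = new BlockSplitDemo();
        dag.addNode("scan");
        dag.addNode("filter", "scan");
        dag.addNode("userSink", "filter");
        System.out.println(dag.block("userSink"));  // [filter, scan, userSink]: push-down possible

        dag.addNode("cacheSink", "scan");           // special sink added for the cached table
        System.out.println(dag.blockOutputs());     // [cacheSink, scan, userSink]
        System.out.println(dag.block("userSink"));  // [filter, userSink]: scan is now a boundary
    }
}
```

Adding the cache sink gives the scan a second consumer, which is exactly what turns it into a block output; the filter can no longer be pushed into it because the two nodes now live in different blocks.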

...