...

  1. When users call table.cache(), the client
    1. adds a Sink to the cached node in the DAG. Adding the Sink to the cached node ensures that the optimizer cannot affect the cached node. The default IntermediateResultStorage creates a BlockingShuffleSink.
    2. generates an IntermediateResultId
    3. passes the generated IntermediateResultId all the way from the RelNode down to the Operators in the JobVertex
    4. sets the IntermediateDataSetId to the IntermediateResultId
  2. The JobGraphGenerator recognizes the BlockingShuffleSink, removes the Sink, and sets the result partition type of the producer to BLOCKING_PERSISTENT.
  3. The client submits the job.
  4. The JobMaster executes the job as usual. After the job finishes, the TaskExecutor promotes the BLOCKING_PERSISTENT result partitions to cluster partitions instead of releasing them (implemented in FLIP-67).
  5. After the job finishes, the JobMaster reports the information of the cluster partitions (ClusterPartitionDescriptor) back to the client in the form of a mapping of [IntermediateDataSetId -> [ClusterPartitionDescriptor]].
    1. The ClusterPartitionDescriptor should include a ShuffleDescriptor and some metadata, i.e., the numberOfSubpartitions and the partitionType.
    2. The ClusterPartitionDescriptor will be serialized before being sent back to the client and only deserialized in the JobGraphGenerator.
    3. The table environment on the client maintains the mapping of CachedTable -> (IntermediateResultId, [ClusterPartitionDescriptor]), as sketched after this list.
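A minimal sketch of the client-side bookkeeping described above, assuming hypothetical class names (none of these classes exist in Flink under these names); the serialized ClusterPartitionDescriptors are kept as opaque byte arrays, mirroring the rule that they are only deserialized in the JobGraphGenerator:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch of the client-side bookkeeping for table.cache().
final class InteractiveTableEnvironment {

    // Step 5.3: CachedTable -> (IntermediateResultId, [ClusterPartitionDescriptor])
    static final class CacheEntry {
        final UUID intermediateResultId;
        final List<byte[]> clusterPartitionDescriptors = new ArrayList<>();

        CacheEntry(UUID intermediateResultId) {
            this.intermediateResultId = intermediateResultId;
        }
    }

    private final Map<String, CacheEntry> cachedTables = new HashMap<>();

    // Step 1: invoked when a user calls table.cache(). Returns the generated
    // IntermediateResultId, which is propagated down to the operators.
    UUID cache(String tableName) {
        UUID intermediateResultId = UUID.randomUUID();
        cachedTables.put(tableName, new CacheEntry(intermediateResultId));
        return intermediateResultId;
    }

    // Step 5: invoked when the JobMaster reports the cluster partitions after
    // the job finishes. The descriptors stay serialized on the client and are
    // only deserialized in the JobGraphGenerator.
    void onJobFinished(Map<UUID, List<byte[]>> descriptorsByDataSetId) {
        for (CacheEntry entry : cachedTables.values()) {
            List<byte[]> descriptors =
                    descriptorsByDataSetId.get(entry.intermediateResultId);
            if (descriptors != null) {
                entry.clusterPartitionDescriptors.addAll(descriptors);
            }
        }
    }
}
```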

...

  • Add IntermediateDataSetID to StreamTransformation and Operator (see the sketch after this list)
  • TableEnvironment stores the Table → (IntermediateResultId, [ClusterPartitionDescriptor]) mapping
  • Client replaces the source of the cached node before optimization.
  • Client adds a sink to the node that should be cached.
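A sketch of the first change above, assuming illustrative field and accessor names; the real change would be made to Flink's own StreamTransformation and operator classes:

```java
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;

// Hypothetical sketch: StreamTransformation carrying an optional
// IntermediateDataSetID so the id generated on cache() can be passed down
// to the operators. Field and accessor names are illustrative.
abstract class StreamTransformation<T> {

    // Identifies the cached intermediate result this transformation produces;
    // null for transformations unrelated to caching.
    private IntermediateDataSetID intermediateDataSetId;

    void setIntermediateDataSetId(IntermediateDataSetID id) {
        this.intermediateDataSetId = id;
    }

    IntermediateDataSetID getIntermediateDataSetId() {
        return intermediateDataSetId;
    }
}
```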

To explain how the optimizer affects the cached node, let us look at a very simple DAG with only one scan node followed by a projection node. The optimizer can push the projection into the scan node so that the scan node reads less data from the source. However, such an optimization changes the result of the scan node, which is an undesired behavior if we want to cache the scan node.
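To make this concrete, the following Table API snippet shows the two nodes involved. The "Orders" table and its columns are invented for the example, and cache() is the API proposed by this FLIP; everything else is the standard Table API.

```java
import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class ProjectionPushDownExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().build());
        // Assume an "Orders" table with columns (id, product, amount) has been
        // registered beforehand, e.g. via a CREATE TABLE DDL (omitted here).
        Table orders = tEnv.from("Orders");   // the scan node
        Table ids = orders.select($("id"));   // the projection node

        // Without a guard, the optimizer pushes the projection into the scan,
        // so the scan reads only the "id" column. If orders.cache() (the API
        // proposed by this FLIP) had been called, the cached result would then
        // lack the other columns.
    }
}
```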

To solve the problem above, when users explicitly cache a table, we change the DAG implicitly by adding a special sink node to the cached node. By doing so, we can ensure that the result of the cached node is not affected by the optimizer, and we can identify the job vertex that produces the cached table in the JobGraphGenerator by the special sink. With the Blink planner, when a DAG has a node (the cached node in our case) with multiple sinks, the multi-sink optimization breaks the DAG into multiple RelNodeBlocks, and the cached node becomes the output node of one of those RelNodeBlocks. The optimizer then runs independently on each of those RelNodeBlocks, so it cannot affect the result of the cached node.

As of now, when a node has more than one downstream node in the DAG, that node will be computed multiple times. This could be solved by introducing partial optimization for subgraphs, a feature available in Blink but not yet in Flink. This is an existing problem in Flink, and this FLIP does not attempt to address it.
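For illustration, the rewrite performed by the JobGraphGenerator (step 2 of the flow above) could look like the following. All types here are simplified stand-ins for the real Flink classes, and the traversal assumes a single upstream producer per sink for brevity.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch: the JobGraphGenerator recognizing the special sink,
// removing it, and marking the producer's result partition as
// BLOCKING_PERSISTENT so it survives the job (and can be promoted to a
// cluster partition per FLIP-67).
final class CachedSinkRewriter {

    enum ResultPartitionType { PIPELINED, BLOCKING, BLOCKING_PERSISTENT }

    static final class Node {
        final String name;
        final boolean isBlockingShuffleSink;
        final Node producer; // single upstream node, for simplicity
        ResultPartitionType producedPartitionType = ResultPartitionType.PIPELINED;

        Node(String name, boolean isBlockingShuffleSink, Node producer) {
            this.name = name;
            this.isBlockingShuffleSink = isBlockingShuffleSink;
            this.producer = producer;
        }
    }

    static void rewrite(List<Node> graph) {
        for (Iterator<Node> it = graph.iterator(); it.hasNext(); ) {
            Node node = it.next();
            if (node.isBlockingShuffleSink && node.producer != null) {
                // Persist the producer's result partitions instead of
                // releasing them when the job finishes.
                node.producer.producedPartitionType =
                        ResultPartitionType.BLOCKING_PERSISTENT;
                // Drop the special sink itself from the job graph.
                it.remove();
            }
        }
    }
}
```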

Future work

...