...

  1. When users call table.cache(), the client
    1. adds a Sink to the cached node in the DAG. Adding the Sink to the cached node ensures that the optimizer will not affect the cached node. The default IntermediateResultStorage creates a BlockingShuffleSinkCacheSink.
    2. generates an IntermediateResultId
    3. passes the created IntermediateResultId all the way from the RelNode down to the Operators in the JobVertex
    4. sets the IntermediateDataSetId to the IntermediateResultId
  2. The JobGraphGenerator recognizes the BlockingShuffleSinkCacheSink, removes the Sink and sets the result partition type of the producer to BLOCKING_PERSISTENT
  3. The client submits the job
  4. The JobMaster executes the job as usual. After the job finishes, the TaskExecutor promotes the BLOCKING_PERSISTENT result partitions to cluster partitions instead of releasing them (implemented in FLIP-67)
  5. After the job finishes, the JobMaster reports the information of the cluster partitions (ClusterPartitionDescriptor) back to the client in the form of a mapping of [IntermediateDataSetId -> [ClusterPartitionDescriptor]]
    1. The ClusterPartitionDescriptor should include a ShuffleDescriptor and some metadata, namely numberOfSubpartitions and partitionType
    2. The ClusterPartitionDescriptor will be serialized before being sent back to the client and will only be deserialized in the JobGraphGenerator
    3. The table environment on the client maintains a mapping of CachedTable -> (IntermediateResultId, [ClusterPartitionDescriptor])
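The reporting and bookkeeping in steps 4 and 5 can be sketched in plain Java. All class and field names below are illustrative of the data structures this FLIP describes, not the actual Flink API:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

// Illustrative stand-in for the id generated when table.cache() is called.
class IntermediateResultId {
    final UUID id = UUID.randomUUID();
}

// Illustrative stand-in for the descriptor reported by the JobMaster:
// a ShuffleDescriptor plus metadata (numberOfSubpartitions, partitionType).
class ClusterPartitionDescriptor {
    final int numberOfSubpartitions;
    final String partitionType; // e.g. "BLOCKING_PERSISTENT"
    ClusterPartitionDescriptor(int numberOfSubpartitions, String partitionType) {
        this.numberOfSubpartitions = numberOfSubpartitions;
        this.partitionType = partitionType;
    }
}

// What the table environment keeps per cached table.
class CacheEntry {
    final IntermediateResultId resultId;
    final List<ClusterPartitionDescriptor> partitions;
    CacheEntry(IntermediateResultId resultId, List<ClusterPartitionDescriptor> partitions) {
        this.resultId = resultId;
        this.partitions = partitions;
    }
}

// Hypothetical client-side registry holding the
// CachedTable -> (IntermediateResultId, [ClusterPartitionDescriptor]) mapping.
class CachedTableRegistry {
    private final Map<String, CacheEntry> cache = new HashMap<>();

    // Called when the JobMaster reports the cluster partitions after the job finishes.
    void register(String table, IntermediateResultId id, List<ClusterPartitionDescriptor> descs) {
        cache.put(table, new CacheEntry(id, descs));
    }

    // Called when a later job contains the cached node.
    Optional<CacheEntry> lookup(String table) {
        return Optional.ofNullable(cache.get(table));
    }
}
```

A lookup miss simply means no intermediate result is available and the subtree must be computed from scratch.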

...

  1. Later on, when the client submits another job whose DAG contains a cached node, the client
    1. looks up the available intermediate results
    2. creates a Source node (BlockingShuffleSourceCacheSource) that contains the ClusterPartitionDescriptor
    3. replaces the subtree of the cached node with the newly created source node
  2. When the JobGraphGenerator sees a BlockingShuffleSourceCacheSource node, it sets the cluster partition input of the downstream node, sets the operator/driver to NoOp, and then removes the source node from the JobGraph
    1. The parallelism is set to the maximum number of subpartitions among the cluster partitions, which ensures that all the subpartitions are read by the NoOp vertex
    2. The shipStrategyName field in the output edge of the NoOp vertex contains the information about how the records should be partitioned
  3. The client submits the job
  4. The JobMaster does the following if the JobVertex has its cluster partition input set
    1. assumes that the Scheduler understands the cluster partition locations
    2. creates the InputGateDeploymentDescriptor with the ShuffleMaster
    3. assigns the result partitions to each subtask based on locality
  5. The TaskManagers run the given tasks as usual
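Step 2.1 above, deriving the NoOp vertex's parallelism from the cluster partitions, can be sketched as follows (hypothetical names, not the actual JobGraphGenerator code):

```java
import java.util.List;

// Illustrative stand-in for the reported descriptor; only the
// subpartition count matters for this sketch.
class ClusterPartitionDescriptor {
    final int numberOfSubpartitions;
    ClusterPartitionDescriptor(int numberOfSubpartitions) {
        this.numberOfSubpartitions = numberOfSubpartitions;
    }
}

class CacheSourceTranslator {
    // The parallelism is the maximum subpartition count among the cluster
    // partitions, so every subpartition is read by some NoOp subtask.
    static int deriveParallelism(List<ClusterPartitionDescriptor> partitions) {
        return partitions.stream()
                .mapToInt(p -> p.numberOfSubpartitions)
                .max()
                .orElseThrow(() -> new IllegalArgumentException("no cluster partitions"));
    }
}
```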

...

To explain how the optimizer affects the cached node, let us look at a very simple DAG in which a scan node is followed by a filter node. The optimizer can push the filter into the scan node so that the scan node produces less data for the downstream node. However, such an optimization changes the result of the scan node, which is undesirable if we want to cache the scan node.
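A toy illustration of the problem in plain Java (not Flink code): once the filter is pushed into the scan, the scan's output is no longer the full table, so caching that output would hand other consumers incomplete data.

```java
import java.util.List;
import java.util.stream.Collectors;

class PushdownExample {
    // Scan without pushdown: emits every row of the table.
    static List<Integer> scan(List<Integer> table) {
        return table;
    }

    // Scan with a hypothetical filter (value > 10) pushed down: emits fewer
    // rows, so its result differs from the plain scan's result.
    static List<Integer> scanWithPushedFilter(List<Integer> table) {
        return table.stream().filter(x -> x > 10).collect(Collectors.toList());
    }
}
```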


To solve this problem, when users explicitly cache a table, we implicitly change the DAG by adding a special sink node to the cached node. This ensures that the result of the cached node will not be affected by the optimizer, and it lets the JobGraphGenerator identify the job vertex that produces the cached table by the special sink. With the Blink planner, when a DAG has a node with multiple sinks (the cache node in our case), the multi-sink optimization breaks the DAG into multiple RelNodeBlocks, and the cache node becomes the output node of one of those RelNodeBlocks. In our example, the DAG is broken into three RelNodeBlocks. The optimizer then runs independently on each of those RelNodeBlocks, so it cannot affect the result of the cache node.
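The block splitting can be sketched as a toy graph computation (names are illustrative, not the Blink planner's actual API): a node that feeds more than one consumer, such as the cache node, or that is itself a sink, terminates a block, and the optimizer then runs on each block independently.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class RelNodeBlockSplitter {
    // consumers: node -> list of nodes that read its output. A node with no
    // consumers (a sink) or with multiple consumers ends a block.
    static Set<String> blockOutputs(Map<String, List<String>> consumers) {
        Set<String> outputs = new HashSet<>();
        for (Map.Entry<String, List<String>> e : consumers.entrySet()) {
            if (e.getValue().isEmpty() || e.getValue().size() > 1) {
                outputs.add(e.getKey());
            }
        }
        return outputs;
    }
}
```

For the example DAG (scan feeding the cache node, which feeds both the added special sink and a filter that feeds the original sink), this yields three block outputs, matching the three RelNodeBlocks described above.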


Future work

...