...

 

NOTE: There are a few phases to the default intermediate result storage, each with its own goals and prerequisites:

Phase 1

  • Explicit cache, i.e. Table.cache()
  • Explicit cache removal, i.e. Table.invalidateCache() (prerequisite: RPC in the TaskManager to remove result partitions)
  • Locality for the default intermediate result storage (prerequisite: support of locality hints from the ShuffleMaster)

Phase 2

  • Pluggable external intermediate result storage

Phase 3

  • Implicit cache support, i.e. caching at shuffle boundaries (prerequisite: old result partition eviction mechanism for the ShuffleMaster / ShuffleService)

Long Term

  • Locality in general for external intermediate result storage and the external shuffle service (prerequisite: custom locality preference mechanism in Flink)
  • Support of sophisticated optimization, i.e. deciding whether or not to use the cache (prerequisite: statistics on the intermediate results)
  • Cross-application intermediate result sharing (prerequisite: external catalog service)

The following section describes Phase 1, which does not support pluggable intermediate result storage or auto caching.

The architecture is illustrated below:

(Architecture diagram)

Each cached table consists of two pieces of information:

  • Table metadata - name, location, etc.
  • Table contents - the actual contents of the table

The default table service stores the metadata in the client (e.g. TableEnvironment --> CacheServiceManager) and saves the actual contents in the cache service instances running inside the Task Managers, more specifically in the network stack, which is also used by the shuffle service.
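To make this split concrete, the following is a minimal sketch, in plain Java rather than actual Flink classes, of the bookkeeping a client-side CacheServiceManager could keep. The class, method and field names are illustrative assumptions; only the idea of mapping a table's DIGEST to the persisted result partitions and their locations comes from this design.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Sketch only: client-side metadata for cached tables. The contents themselves stay in the TMs. */
public class CacheServiceManagerSketch {

    /** One cached intermediate result, identified by the digest of the node that produced it. */
    static final class CacheEntry {
        final String intermediateDataSetId;    // id of the persisted result partitions
        final List<String> partitionLocations; // addresses of the TMs holding those partitions

        CacheEntry(String intermediateDataSetId, List<String> partitionLocations) {
            this.intermediateDataSetId = intermediateDataSetId;
            this.partitionLocations = partitionLocations;
        }
    }

    // DIGEST -> (IntermediateDataSetID, [Locations]); kept purely on the client side.
    private final Map<String, CacheEntry> cachedResults = new HashMap<>();

    void registerCachedResult(String digest, String dataSetId, List<String> locations) {
        cachedResults.put(digest, new CacheEntry(dataSetId, locations));
    }

    Optional<CacheEntry> lookup(String digest) {
        return Optional.ofNullable(cachedResults.get(digest));
    }

    void invalidate(String digest) {
        cachedResults.remove(digest);
    }
}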

The end-to-end process is the following:

Step 1: Execute JOB_1 (write cached tables)

  1. Users call table.cache(); the client
    a. adds a Sink to the cached node in the DAG. The default IntermediateResultStorage creates a BlockingShuffleSink.
    b. computes an IntermediateResultId based on the RelNode DIGEST (DIGEST1 in this case).
    c. passes the IntermediateResultId created in 1b all the way from the RelNode down to the Operators in the JobVertex.
    d. sets the IntermediateDataSetId (currently random) to the IntermediateResultId.
  2. The JobGraphGenerator recognizes the BlockingShuffleSink, removes the Sink and sets the result partition type to BLOCKING_PERSISTENT.
  3. The client submits the job.
  4. The JobMaster executes the job as usual. After the job finishes, the ShuffleMaster / ShuffleService keeps the BLOCKING_PERSISTENT result partitions instead of deleting them.
  5. After the job finishes, the JobMaster reports the locations of the persisted ResultPartitions to the JobManager, which then returns the mapping of [IntermediateDataSetID -> Locations] to the client as a part of the JobExecutionResult.
    a. The client maintains a mapping of DIGEST -> (IntermediateDataSetID, [Locations]).
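Steps 1b and 1d above replace the currently random IntermediateDataSetId with an id derived from the RelNode DIGEST. The exact derivation is not specified in this design; the snippet below is only one possible, hypothetical way to produce a stable id from a digest string, so that the same logical subtree maps to the same persisted result across jobs.

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.UUID;

/** Illustrative sketch: a deterministic id derived from the plan digest instead of a random one. */
public final class IntermediateResultIds {

    private IntermediateResultIds() {}

    /** Derives a UUID-shaped id from the RelNode digest (for illustration only). */
    public static UUID fromDigest(String relNodeDigest) {
        try {
            byte[] hash = MessageDigest.getInstance("SHA-256")
                    .digest(relNodeDigest.getBytes(StandardCharsets.UTF_8));
            // UUID.nameUUIDFromBytes folds the hash into a fixed-size id.
            return UUID.nameUUIDFromBytes(hash);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 not available", e);
        }
    }

    public static void main(String[] args) {
        String digest1 = "LogicalAggregate(group=[{0}], cnt=[COUNT()])"; // e.g. DIGEST1
        // The same digest always yields the same id, so JOB_2 can find the result written by JOB_1.
        System.out.println(fromDigest(digest1));
        System.out.println(fromDigest(digest1).equals(fromDigest(digest1))); // true
    }
}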

Step 2: Execute JOB_2 (read cached tables)

  1. Later on, when the client submits another job whose DAG contains a node with DIGEST1, the client
    a. looks up DIGEST1 in the available intermediate results.
    b. creates a Source node from IntermediateResultStorage with the location information. The default IntermediateResultStorage creates a BlockingShuffleSource.
    c. replaces the node with DIGEST1 and its subtree with the source node created in 1b.
  2. The JobGraphGenerator sees the BlockingShuffleSource node, replaces it with an ordinary JobVertex, sets isCacheVertex=true and adds an input edge reading from the intermediate result IRID_1.
  3. The client submits the job.
  4. The JobMaster does the following if JobVertex.isCacheVertex() returns true (this assumes the Scheduler understands result partition locations):
    a. creates the InputGateDeploymentDescriptor (or the ShuffleDeploymentDescriptor once the ShuffleMaster is available).
    b. assigns the result partitions to each subtask based on locality.
  5. Task Managers run the given tasks as usual.
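The subtree replacement in step 1c of Step 2 is the key client-side change on the read path. The sketch below illustrates the idea with a made-up plan node type; it is not the planner code, just a recursive rewrite that swaps any subtree whose root digest is cached for a source reading the persisted partitions.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Illustrative sketch of replacing cached subtrees with cache sources before optimization. */
public final class CachedSubtreeRewriter {

    /** Minimal stand-in for a plan node identified by its digest. */
    static final class PlanNode {
        final String digest;
        final List<PlanNode> inputs;

        PlanNode(String digest, List<PlanNode> inputs) {
            this.digest = digest;
            this.inputs = inputs;
        }
    }

    /** Returns a new plan in which cached subtrees are replaced by cache-source nodes. */
    static PlanNode rewrite(PlanNode node, Map<String, String> cachedDigestToDataSetId) {
        String dataSetId = cachedDigestToDataSetId.get(node.digest);
        if (dataSetId != null) {
            // The whole subtree is dropped; a source reading the persisted result takes its place.
            return new PlanNode("CacheSource(" + dataSetId + ")", new ArrayList<>());
        }
        List<PlanNode> rewrittenInputs = new ArrayList<>();
        for (PlanNode input : node.inputs) {
            rewrittenInputs.add(rewrite(input, cachedDigestToDataSetId));
        }
        return new PlanNode(node.digest, rewrittenInputs);
    }

    private CachedSubtreeRewriter() {}
}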

Step 3: Clean up 

  1. When the application exits, all the Task Managers will exit and the intermediate results will be released.


  1. Users invoke Table.invalidateCache().
    a. The client removes the intermediate result entry from its local metadata.
  2. The client sends an RPC to each TM to delete the corresponding result partitions.
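A rough sketch of this explicit invalidation path is shown below, with a made-up TaskManagerGateway interface standing in for the TM-side RPC; all names are illustrative, not the actual Flink interfaces.

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch only: drop the client-side metadata entry, then release the partitions on each TM. */
public final class CacheInvalidationSketch {

    /** Illustrative stand-in for the TM-side RPC that deletes result partitions. */
    interface TaskManagerGateway {
        void releaseResultPartitions(String intermediateDataSetId);
    }

    static final class CachedResult {
        final String intermediateDataSetId;
        final List<TaskManagerGateway> hostingTaskManagers;

        CachedResult(String intermediateDataSetId, List<TaskManagerGateway> hostingTaskManagers) {
            this.intermediateDataSetId = intermediateDataSetId;
            this.hostingTaskManagers = hostingTaskManagers;
        }
    }

    private final Map<String, CachedResult> cachedResults = new ConcurrentHashMap<>();

    /** Called when the user invokes Table.invalidateCache() for the table with this digest. */
    void invalidate(String digest) {
        CachedResult result = cachedResults.remove(digest); // remove the local metadata entry
        if (result == null) {
            return; // nothing cached under this digest
        }
        for (TaskManagerGateway tm : result.hostingTaskManagers) {
            tm.releaseResultPartitions(result.intermediateDataSetId); // RPC to each TM
        }
    }
}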

If a Task Manager instance fails, Flink will bring it up again. However, all the intermediate results that have a partition on the failed TM will become unavailable.

In this case, the consuming job will throw an exception and fail. As a result, the CacheServiceManager invalidates the caches that are impacted, and the TableEnvironment resubmits the original DAG without using the cache. Note that because no cache is available, the TableEnvironment (planner) will again create a Sink to cache the result that was initially cached; therefore the cache will be recreated after the original DAG is executed.
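The recovery behaviour amounts to "try the plan rewritten with cache sources, and on a cache read failure invalidate the affected entries and fall back to the original plan". The sketch below expresses that control flow with illustrative interfaces; it is not the actual TableEnvironment logic.

/** Sketch only: fall back to the original DAG when a cached partition has become unavailable. */
public final class CacheFallbackSketch {

    interface Plan {}

    interface Executor {
        void execute(Plan plan) throws Exception;
    }

    interface CacheMetadata {
        Plan rewriteWithCaches(Plan originalPlan);        // substitute cached subtrees with sources
        void invalidateCachesUsedBy(Plan rewrittenPlan);  // drop the entries the failed job read
    }

    static void executeWithCacheFallback(Plan originalPlan, CacheMetadata caches, Executor executor)
            throws Exception {
        Plan rewritten = caches.rewriteWithCaches(originalPlan);
        try {
            executor.execute(rewritten);
        } catch (Exception cacheReadFailure) {
            // Simplification: treat any failure as a missing-cache failure for illustration.
            // The original DAG still contains the caching sinks, so re-running it both
            // recomputes the result and repopulates the cache.
            caches.invalidateCachesUsedBy(rewritten);
            executor.execute(originalPlan);
        }
    }

    private CacheFallbackSketch() {}
}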

The above process is transparent to the users.
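As an illustration of what that transparency looks like from the user's side, the snippet below sketches the intended usage, assuming the Table.cache() / Table.invalidateCache() API proposed in this FLIP and, for illustration, that cache() returns a Table handle. The environment setup and expression syntax shown are from a newer Flink Table API and are examples only; the cache()/invalidateCache() calls do not exist as-is.

import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class CachedTableUsageSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // "Orders" is assumed to be a registered table with columns customerId and amount.
        Table aggregated = tEnv.from("Orders")
                .groupBy($("customerId"))
                .select($("customerId"), $("amount").sum().as("total"));

        // Proposed API: mark the expensive subtree as cached (DIGEST1 in the description above).
        Table cached = aggregated.cache();

        // JOB_1: materializes the BLOCKING_PERSISTENT result partitions as a side effect.
        cached.filter($("total").isGreater(1000)).execute().print();

        // JOB_2: the plan contains the same DIGEST, so the cached partitions are read
        // instead of re-computing the aggregation.
        cached.filter($("total").isLess(100)).execute().print();

        // Proposed API: release the cached partitions once they are no longer needed.
        cached.invalidateCache();
    }
}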

In order to implement the default intermediate result storage, the following changes are needed.

  • New result partition type: BLOCKING_PERSISTENT (see the sketch after this list)
    • A BLOCKING_PERSISTENT result partition will not be deleted when the job exits.
  • JobMaster executes JobGraph without Sink or Source node.
  • JobMaster reports IntermediateDataSetID to ResultPartitionDescriptor mapping to JobManager.
  • JobManager returns the DIGEST to ResultPartitionDescriptor mapping to clients in JobExecutionResult.
  • RPC call in TaskManagers to delete a ResultPartition. (As a part of FLIP-31)
  • Replace the random IntermediateDataSetID with one derived from the RelNode DIGEST.
  • Add IntermediateDataSetID (derived from DIGEST) to StreamTransformation and Operator
    • This is required in order to map ResultPartitionDescriptor to DIGEST.
  • Clients store the DIGEST -> (IntermediateResultId, IntermediateResultDescriptor) mapping.
  • Clients replace the subtree matching a cached DIGEST with a source before optimization.
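The first change in the list above, the new BLOCKING_PERSISTENT result partition type, can be summarized conceptually as follows. This is only a sketch of the intended semantics, not the actual ResultPartitionType enum, which carries more constants and attributes.

// Conceptual sketch only: how BLOCKING_PERSISTENT differs from the existing types.
enum SketchedResultPartitionType {
    PIPELINED,           // streamed to consumers while it is being produced
    BLOCKING,            // fully produced first, released once the consuming job has read it
    BLOCKING_PERSISTENT  // fully produced first, kept after the job exits until explicitly deleted
}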

When users explicitly cache a table, the DAG is implicitly changed, because the optimizer might have modified the cached node if it were not explicitly cached. As of now, when a node has more than one downstream node in the DAG, that node will be computed multiple times. This could be solved by introducing partial optimization for subgraphs, which is a feature available in Blink but not yet in Flink. This is an existing problem in Flink, and this FLIP does not attempt to address it.
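For example (illustrative Table API code, using the same setup and assumptions as the usage sketch above):

// `base` has two downstream nodes in the DAG.
Table base = tEnv.from("Orders")
        .groupBy($("customerId"))
        .select($("customerId"), $("amount").sum().as("total"));

Table bigSpenders   = base.filter($("total").isGreater(1000));
Table smallSpenders = base.filter($("total").isLess(100));

// If both branches are executed as part of the same DAG, `base` is evaluated once per
// consumer today; an explicit base.cache() (or future partial optimization of subgraphs)
// would let both branches read a single persisted intermediate result instead.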

...