Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Generally speaking, an application may consist of one or more jobs, and those jobs may want to share data with one another. In Flink, the jobs in the same application are independent and share nothing among themselves. If a Flink application involves several sequential steps, each step (as an independent job) has to write its intermediate results to an external sink so that the following step (job) can read them as a source.

Although functionality-wise this works, this programming paradigm has a few shortcomings:

  1. In order to share a result, a sink must be provided.
  2. Complicated applications become inefficient due to a large amount of IO on the intermediate results.
  3. User experience is weakened for users of the programming API (SQL users are not affected because the temporary tables are created by the framework).

It turns out that interactive programming support is critical to the user experience on Flink in batch processing scenarios. The following code gives an example:

Code Block
languagejava
titleInteractive Programming in Batch Processing
Table a = ...
Table b = a.select(...).filter(...)
int max = b.max()
int min = b.min() // recompute table b from table a
Table e = b.select("(f1 - min)/(max - min)").filter(...)
e.print() // recompute table b from table a
...
if (b.count() > 10) { // recompute table b from table a
  b.select(UDF1(...)).print() // recompute table b from table a
} else {
  b.select(UDF2(...)).print()
}

In the above code, because b is not cached, it is recomputed from scratch every time it is referenced later in the program.
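
The cost of this recomputation can be illustrated with a small toy model in plain Java. It is only a sketch: LazyTable and RecomputeDemo are hypothetical names and do not use or mirror any Flink API. The model counts how often the plan of b is executed when three actions (max, min, count) are run on it, with and without a cached copy.

```java
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/** Toy stand-in for a lazily evaluated table; not a Flink class. */
final class LazyTable {
    private final Supplier<List<Integer>> plan; // recipe that recomputes the rows
    private final boolean cacheEnabled;
    private List<Integer> cachedRows;           // stays null until the first computation
    int computations = 0;                       // how many times the plan was executed

    LazyTable(Supplier<List<Integer>> plan, boolean cacheEnabled) {
        this.plan = plan;
        this.cacheEnabled = cacheEnabled;
    }

    /** Returns the rows, reusing the cached copy when caching is enabled. */
    List<Integer> collect() {
        if (cacheEnabled && cachedRows != null) {
            return cachedRows;
        }
        computations++;
        List<Integer> rows = plan.get();
        if (cacheEnabled) {
            cachedRows = rows;
        }
        return rows;
    }
}

final class RecomputeDemo {
    /** Runs three actions on b and reports how often b was computed. */
    static int computationsOfB(boolean cacheEnabled) {
        List<Integer> a = List.of(3, 8, 15, 42, 7);
        LazyTable b = new LazyTable(
                () -> a.stream().filter(x -> x > 5).collect(Collectors.toList()),
                cacheEnabled);
        int max = b.collect().stream().mapToInt(Integer::intValue).max().getAsInt();
        int min = b.collect().stream().mapToInt(Integer::intValue).min().getAsInt();
        int count = b.collect().size();
        System.out.println("max=" + max + " min=" + min + " count=" + count);
        return b.computations;
    }

    public static void main(String[] args) {
        System.out.println("computations without cache: " + computationsOfB(false));
        System.out.println("computations with cache: " + computationsOfB(true));
    }
}
```

In this model b is computed three times without the cache and once with it, which is the saving the proposal is after.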

To address the above issues, we propose to add support for interactive programming in the Flink Table API.


Public Interfaces

1. Add the following two new methods to the Flink Table class.

Code Block
languagejava
titlecache() and invalidateCache() API
  /**
    * Cache this table to builtin table service or the specified customized table service.
    *
    * This method provides a hint to Flink that the current table may be reused later so a
    * cache should be created to avoid regenerating this table.
    *
    * The following code snippet gives an example of how this method could be used.
    *
    * {{{
    *   val t = tEnv.fromCollection(data).as('country, 'color, 'count)
    *
    *   val t1 = t.filter('count < 100).cache()
    *   // t1 is cached after it is computed for the first time.
    *   val x = t1.collect().size
    *
    *   // When t1 is used again to compute t2, it may not be re-computed.
    *   val t2 = t1.groupBy('country).select('country, 'count.sum as 'sum)
    *   val res2 = t2.collect()
    *   res2.foreach(println)
    *
    *   // Similarly when t1 is used again to compute t3, it may not be re-computed.
    *   val t3 = t1.groupBy('color).select('color, 'count.avg as 'avg)
    *   val res3 = t3.collect()
    *   res3.foreach(println)
    *
    * }}}
    *
    * @note Flink optimizer may decide to not use the cache if doing that will accelerate the
    * processing, or if the cache is no longer available for reasons such as the cache has
    * been invalidated.
    * @note The table cache could be created lazily. That means the cache may be created the
    * first time the cached table is computed.
    * @note The table cache will be cleared when the user program exits.
    * @note This method is only supported for batch tables; it is treated as a No-Op for stream tables.
    *
    * @return the current table with a cache hint. The original table reference is not modified
    *               by the execution of this method. If this method is called on a table with a cache
    *               hint, the same table object is returned.
    */
  def cache(): Table


  /**
    * Manually invalidate the cache of this table to release the physical resources. Users are
    * not required to invoke this method to release physical resource unless they want to. The
    * table caches are cleared when user program exits.
    *
    * @note After invalidation, the cache may be re-created if this table is used again.
    */
  def invalidateCache(): Unit

...

Code Block
languagejava
titleTableEnvironment.close()
/**
  * Close and clean up the table environment. All the table caches should be released physically.
  */
void close();

3. Add the following configuration to control whether automatic caching is enabled.

auto.caching.enabled


Proposed Changes

As mentioned in the Motivation section, the key idea of the FLIP is to allow intermediate results to be cached, so that later references to a result do not cause duplicate computation. To achieve that, we need to introduce Cached Tables.

The cached tables are tables whose contents are saved by Flink as the user application runs. A cached Table can be created in two ways:

  • Users can call cache() method on a table to explicitly tell Flink to cache a Table.
  • The cache() method returns a new Table object with a flag set.
  • The cache() method does not execute eagerly. Instead, the table will be cached when the DAG that contains the cached table runs.

...


  • As the application runs, if Flink can save an intermediate result with little cost, Flink will do that even if users did not call cache() explicitly. Such cases typically occur at shuffle boundaries.
  • Auto caching will be enabled by default but could be turned off by users.

The semantics of the cache() method differ slightly depending on whether auto caching is enabled or not.

When auto caching is enabled (default behavior)

Code Block
languagejava
TableEnvironment tEnv = ...
Table t1 = ...
Table t2 = t1.cache()
...
tEnv.execute() // t1 is cached.
Table t3 = t1.select(...) // cache will NOT be used.
Table t4 = t2.select(...) // cache will be used.
...
// The following two lines of code are equivalent
t1.invalidateCache() // cache will be released
t2.invalidateCache() // cache will be released
...
t1.print() // cache will NOT be recreated
t2.print() // cache will be recreated

When auto caching is disabled

Code Block
languagejava
titleAuto caching is disabled
TableEnvironment tEnv = ...
Table t1 = ...
Table t2 = t1.cache()
...
tEnv.execute() // t1 is cached.
Table t3 = t1.select(...) // cache will NOT be used. <----- difference
Table t4 = t2.select(...) // cache will be used.
...
// The following two lines of code are equivalent
t1.invalidateCache() // cache will be released
t2.invalidateCache() // cache will be released
...
t1.print() // cache will NOT be recreated <-------- difference
t2.print() // cache will be recreated
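
The behavior in the listing above (only the table returned by cache() reads or recreates the cache, while invalidateCache() works on either reference) can be modeled with a few lines of plain Java. This is a sketch under assumptions: CacheStore and SketchTable are hypothetical names, the cache is reduced to a set of plan ids, and nothing here is the actual Flink implementation.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical store that only remembers which plans currently have a cache. */
final class CacheStore {
    private final Set<String> cachedPlans = new HashSet<>();

    boolean isCached(String planId) { return cachedPlans.contains(planId); }
    void create(String planId) { cachedPlans.add(planId); }
    void release(String planId) { cachedPlans.remove(planId); }
}

/** Hypothetical table handle; t1 and t1.cache() share the same plan id. */
final class SketchTable {
    private final String planId;
    private final boolean cacheHint;
    private final CacheStore store;

    SketchTable(String planId, boolean cacheHint, CacheStore store) {
        this.planId = planId;
        this.cacheHint = cacheHint;
        this.store = store;
    }

    /** Returns a handle with the cache hint set; a hinted table returns itself. */
    SketchTable cache() {
        return cacheHint ? this : new SketchTable(planId, true, store);
    }

    /** Releases the cache; equivalent on the original and on the hinted handle. */
    void invalidateCache() {
        store.release(planId);
    }

    /** Runs the table and returns true if the result was read from the cache. */
    boolean execute() {
        if (!cacheHint) {
            return false;             // no hint: the cache is neither used nor created
        }
        if (store.isCached(planId)) {
            return true;              // hinted and cached: read the cache
        }
        store.create(planId);         // hinted but not cached: compute and (re)create lazily
        return false;
    }

    public static void main(String[] args) {
        CacheStore store = new CacheStore();
        SketchTable t1 = new SketchTable("t1", false, store);
        SketchTable t2 = t1.cache();
        System.out.println("first run of t2 read the cache: " + t2.execute());
        System.out.println("t1 read the cache: " + t1.execute());
        System.out.println("second run of t2 read the cache: " + t2.execute());
    }
}
```

Keeping the hint on a separate handle means existing references to t1 never change behavior, which matches the note that the original table reference is not modified by cache().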

The cached tables are available to the user application using the same TableEnvironment.

The cached intermediate results will consume some resources and need to be released eventually. The cached result will be released in two cases.

When the TableEnvironment is closed, the resources consumed by the cached tables will also be released. This usually happens when the user application exits.

Sometimes users may want to release the resource used by a cached table before the application exits. In this case, users can call invalidateCache() on a table. This will immediately release the resources used to cache that table.

In some rare cases, users may want to explicitly ignore the cached intermediate result. In this case, users need to give an explicit hint, such as:

table.hint("ignoreCache")

Flink does not have a hint mechanism yet, so until one is available users are not able to explicitly ignore a cached intermediate result.

Theoretically speaking, users can also cache a streaming table. The semantics would be to store the result somewhere (potentially with a TTL). However, caching a streaming table is usually not that useful. For simplicity, we will not support stream table caching in the first implementation. When cache() is invoked on a stream table, it will be treated as a No-Op. This leaves us room to add caching for stream tables in the future without asking users to change their code.

Implementation Details

To make the feature available out of the box, a default file-system-based cache service will be provided, which utilizes the cluster partitions implemented in FLIP-67. This section describes the implementation details of the default table service.

Although the implementation details are transparent to the users, there are some related changes to make the default implementation work.

The architecture is illustrated below:

[Figure: architecture of the default table service]

Each cached table consists of two pieces of information:

  • Table metadata - name, location, etc.
  • Table contents - the actual contents of the table

The default table service stores the metadata in the client (e.g. TableEnvironment) and the actual contents are stored in the Task Managers as cluster partitions (FLIP-67).

The end to end process is the following:


Step 1: Execute JOB_1 (write cached tables)

  1. When users call table.cache(), the client
    1. adds a Sink to the cached node in the DAG. The default IntermediateResultStorage creates a BlockingShuffleSink
    2. generates an IntermediateResultId
    3. passes the generated IntermediateResultId all the way from the RelNode down to the Operators in the JobVertex
    4. sets the IntermediateDataSetId to the IntermediateResultId
  2. The JobGraphGenerator recognizes the BlockingShuffleSink, removes the Sink and sets the result partition type of the producer to BLOCKING_PERSISTENT
  3. The client submits the job
  4. The JobMaster executes the job as usual. After the job finishes, the TaskExecutor promotes the BLOCKING_PERSISTENT result partitions to cluster partitions instead of releasing them (implemented in FLIP-67)
  5. After the job finishes, the JobMaster reports the information of the cluster partitions (ClusterPartitionDescriptor) back to the client in the form of a mapping of [IntermediateDataSetId -> [ClusterPartitionDescriptor]]
    1. The ClusterPartitionDescriptor should include a ShuffleDescriptor and some metadata, i.e. numberOfSubpartitions and partitionType
    2. The ClusterPartitionDescriptor is serialized before it is sent back to the client and is only deserialized in the JobGraphGenerator
    3. The table environment on the client maintains the mapping of CachedTable -> (IntermediateResultId, [ClusterPartitionDescriptor])
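
The client-side bookkeeping in Step 1 can be sketched as follows. This is a minimal illustration, not the proposed implementation: CachedTableRegistry, its method names, the String table key and the use of UUID for the IntermediateResultId are all assumptions made for the example. Descriptors are kept as opaque byte arrays to reflect that the client does not deserialize them.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

/** Hypothetical client-side metadata store for cached tables. */
final class CachedTableRegistry {

    /** What the client keeps per cached table: the id and the still-serialized descriptors. */
    static final class Entry {
        final UUID intermediateResultId;
        final List<byte[]> serializedDescriptors;

        Entry(UUID intermediateResultId, List<byte[]> serializedDescriptors) {
            this.intermediateResultId = intermediateResultId;
            this.serializedDescriptors = serializedDescriptors;
        }
    }

    private final Map<String, UUID> pending = new HashMap<>();    // cache() called, job not finished yet
    private final Map<String, Entry> available = new HashMap<>(); // cache is ready to be read

    /** Step 1.1: generate the id that is passed down to the operators. */
    UUID onCacheRequested(String tableKey) {
        UUID id = UUID.randomUUID();
        pending.put(tableKey, id);
        return id;
    }

    /** Step 1.5: record the [id -> descriptors] mapping reported after the job finishes. */
    void onJobFinished(Map<UUID, List<byte[]>> reportedPartitions) {
        pending.entrySet().removeIf(e -> {
            List<byte[]> descriptors = reportedPartitions.get(e.getValue());
            if (descriptors == null) {
                return false; // this table was not produced by the finished job
            }
            available.put(e.getKey(), new Entry(e.getValue(), descriptors));
            return true;
        });
    }

    /** Used when planning a later job: is there a cache for this table? */
    Optional<Entry> lookup(String tableKey) {
        return Optional.ofNullable(available.get(tableKey));
    }

    public static void main(String[] args) {
        CachedTableRegistry registry = new CachedTableRegistry();
        UUID id = registry.onCacheRequested("t1");
        registry.onJobFinished(Map.of(id, List.of(new byte[] {1}, new byte[] {2})));
        System.out.println("t1 cached: " + registry.lookup("t1").isPresent());
    }
}
```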

Step 2: Execute JOB_2 (read cached tables)

  1. Later on, when the client submits another job whose DAG contains a cached node, the client
    1. looks up the available intermediate results
    2. creates a Source node (BlockingShuffleSource) that contains the ClusterPartitionDescriptor
    3. replaces the subtree of the cached node with the source node created
  2. The JobGraphGenerator sees a BlockingShuffleSource node, sets its downstream node’s cluster partition input and sets the operator/driver to NoOp. It will then remove itself from the JobGraph
    1. The parallelism is set to the maximum number of subpartitions among the cluster partitions. This ensures that all the subpartitions are read by the NoOp vertex
    2. The shipStrategyName field in the output edge of the NoOp vertex contains information about how the records should be partitioned
  3. The client submits the job
  4. The JobMaster does the following if the JobVertex has the cluster partition input set
    1. It assumes that the Scheduler understands the cluster partition location
    2. It creates the InputGateDeploymentDescriptor with the ShuffleMaster
    3. It assigns the result partitions to each subtask based on locality
  5. Task managers will run the given tasks as usual
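
The plan rewrite in Step 2 (replacing the subtree of a cached node with a source node) and the parallelism rule (maximum number of subpartitions) can be sketched on a toy plan tree. PlanNode and CachedPlanRewriter are hypothetical names, and identifying nodes by a plain name is an assumption made to keep the example small. The real planner works on RelNodes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/** Toy plan node: a name plus its input nodes. */
final class PlanNode {
    final String name;
    final List<PlanNode> inputs;

    PlanNode(String name, List<PlanNode> inputs) {
        this.name = name;
        this.inputs = inputs;
    }

    /** Renders the tree as name(input, ...). */
    String render() {
        if (inputs.isEmpty()) {
            return name;
        }
        return name + inputs.stream()
                .map(PlanNode::render)
                .collect(Collectors.joining(", ", "(", ")"));
    }
}

final class CachedPlanRewriter {
    /** Returns a copy of the plan in which every cached node, with its whole subtree, becomes a source. */
    static PlanNode replaceCachedSubtrees(PlanNode node, Set<String> cachedNodes) {
        if (cachedNodes.contains(node.name)) {
            // The inputs of a cached node are dropped: they no longer need to run.
            return new PlanNode("BlockingShuffleSource[" + node.name + "]", List.of());
        }
        List<PlanNode> rewritten = new ArrayList<>();
        for (PlanNode input : node.inputs) {
            rewritten.add(replaceCachedSubtrees(input, cachedNodes));
        }
        return new PlanNode(node.name, rewritten);
    }

    /** Parallelism of the reading vertex: the largest subpartition count, so no subpartition is left unread. */
    static int sourceParallelism(List<Integer> subpartitionCounts) {
        return subpartitionCounts.stream().mapToInt(Integer::intValue).max().orElse(1);
    }

    public static void main(String[] args) {
        PlanNode a = new PlanNode("a", List.of());
        PlanNode b = new PlanNode("b", List.of(a));
        PlanNode plan = new PlanNode("select", List.of(b));
        System.out.println(plan.render());
        System.out.println(replaceCachedSubtrees(plan, Set.of("b")).render());
    }
}
```

For the plan select(b(a)) with b cached, the rewritten plan is select(BlockingShuffleSource[b]): the scan of a disappears from the job entirely.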

Step 3: Clean up

  1. When the application exits, all the Task Managers will exit and the intermediate results will be released.

When users invoke Table.invalidateCache():

  1. Clients remove the intermediate result entry in the TableEnvironment.
  2. Clients delete the corresponding cluster partitions with the REST API provided in FLIP-67.
  3. The cluster partitions will then be released by the Task Managers that hold the cluster partitions.

Please refer to FLIP-67 for the implementation detail of steps 2 and 3.
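
The ordering of the invalidation steps (drop the local metadata first, then ask the cluster to delete the partitions) can be sketched as below. The ClusterPartitionClient interface is a hypothetical placeholder for the REST call described in FLIP-67. Its name and signature are assumptions, not an existing Flink API.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the client-side part of Table.invalidateCache(). */
final class InvalidateCacheSketch {

    /** Placeholder for the FLIP-67 REST call that deletes cluster partitions (assumed shape). */
    interface ClusterPartitionClient {
        void deletePartitions(String intermediateResultId);
    }

    private final Map<String, String> cachedTables = new HashMap<>(); // table key -> result id
    private final ClusterPartitionClient client;

    InvalidateCacheSketch(ClusterPartitionClient client) {
        this.client = client;
    }

    void register(String tableKey, String intermediateResultId) {
        cachedTables.put(tableKey, intermediateResultId);
    }

    boolean isCached(String tableKey) {
        return cachedTables.containsKey(tableKey);
    }

    /** Returns false if the table had no cache, true if a delete request was issued. */
    boolean invalidate(String tableKey) {
        // Step 1: remove the local entry so that no later job plans against this cache.
        String resultId = cachedTables.remove(tableKey);
        if (resultId == null) {
            return false;
        }
        // Step 2: request deletion; step 3 (the actual release) happens on the Task Managers.
        client.deletePartitions(resultId);
        return true;
    }

    public static void main(String[] args) {
        InvalidateCacheSketch env =
                new InvalidateCacheSketch(id -> System.out.println("delete requested for " + id));
        env.register("t1", "irid-1");
        System.out.println("invalidated: " + env.invalidate("t1"));
    }
}
```

Removing the local entry before issuing the delete means a concurrent planning step on the same client cannot pick up a cache that is about to disappear.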

Repartitioning is needed when the cache consumer requires the input data to be partitioned in a specific way, e.g. hash partitioning or custom partitioning. When the JobGraphGenerator generates the job graph, it introduces a NoOp job vertex as the upstream vertex of the cache consumer and maintains the shipStrategyName of the output job edge. During execution, the task executor will make sure that the data is repartitioned.

If a Task Manager instance fails, Flink will bring it up again. However, all the intermediate results which have a partition on the failed TM will become unavailable.

In this case, the consuming job will throw an exception and the job will fail. As a result, the PartitionTracker in the ResourceManager will release all the cluster partitions that are impacted (implemented in FLIP-67). The TableEnvironment will resubmit the original DAG without using the cache. Note that because there is no cache available, the TableEnvironment (planner) will again create a Sink to cache the result that was initially cached, therefore the cache will be recreated after the execution of the original DAG.

The above process is transparent to the users.

Users can specify the maximum number of retries before giving up by the following configuration.

cache.retries.max

When the retries maximum is reached, an exception is thrown and it is up to the users to handle the failure.
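
The recovery behavior described above can be sketched as a small retry loop. This is an illustration under assumptions: CacheRetrySketch and CacheAwareJob are hypothetical names, failures are modeled as RuntimeException, and the retry limit is interpreted as the number of resubmissions allowed after the first failed attempt, which the text does not specify.

```java
/** Hypothetical sketch of falling back to the original DAG when a cached partition is lost. */
final class CacheRetrySketch {

    /** A job that can be planned either against the cache or from the original DAG. */
    interface CacheAwareJob<T> {
        T run(boolean useCache);
    }

    /**
     * Runs the job against the cache first. On failure the cache is released and the
     * original DAG is resubmitted, at most maxRetries times (assumed meaning of cache.retries.max).
     */
    static <T> T runWithCacheFallback(CacheAwareJob<T> job, Runnable releaseCache, int maxRetries) {
        boolean useCache = true;
        RuntimeException lastFailure = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return job.run(useCache);
            } catch (RuntimeException e) {
                lastFailure = e;
                releaseCache.run(); // drop the impacted partitions; assumed to be idempotent
                useCache = false;   // resubmit the original DAG, which recreates the cache
            }
        }
        // The limit is reached: surface the failure and leave handling to the user.
        throw new IllegalStateException("giving up after " + maxRetries + " retries", lastFailure);
    }

    public static void main(String[] args) {
        String result = runWithCacheFallback(
                useCache -> {
                    if (useCache) {
                        throw new RuntimeException("cluster partition lost");
                    }
                    return "recomputed from the original DAG";
                },
                () -> System.out.println("releasing cache"),
                3);
        System.out.println(result);
    }
}
```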

In order to implement the default intermediate result storage, the following changes are needed.

...

  • New result partition type: BLOCKING_PERSISTENT
    • A BLOCKING_PERSISTENT result partition will not be deleted when the job exits.
  • JobMaster executes JobGraph without Sink or Source node.
  • JobMaster reports IntermediateDataSetID to ClusterPartitionDescriptor mapping to Client.
  • RPC call in TaskManagers to delete a ResultPartition. (As a part of FLIP-31)
  • Add IntermediateDataSetID to StreamTransformation and Operator
    • This is required in order to map ResultPartitionDescriptor to DIGEST.
  • TableEnvironment stores the Table → (IntermediateResultId, [ClusterPartitionDescriptor]) mapping
  • Client replaces the source of the cached node before optimization.
  • Client adds a sink to the node that should be cached.

When users explicitly cache a table, the DAG is implicitly changed. This is because the optimizer might have changed the cached node if it had not been explicitly cached. As of now, when a node has more than one downstream node in the DAG, that node will be computed multiple times. This could be solved by introducing partial optimization for subgraphs, which is a feature available in Blink but not in Flink yet. This is an existing problem in Flink and this FLIP does not attempt to address that.

Integration with external shuffle service (FLIP-31)

To achieve auto caching, the cache service needs to be integrated with shuffle service. This is sort of a natural move. Inherently, external shuffle service and cache service share a lot of similarities:

  • Shuffle Service - Lifecycle: cross-stages; Access Pattern: Scan; Persistency: volatile; Mutability: immutable
  • Cache Service - Lifecycle: cross-jobs; Access Pattern: Scan; Persistency: volatile; Mutability: immutable

Future works

NOTE: There are a few phases to the default intermediate result storage.

Phase 1

  • Goal: Explicit cache, i.e. Table.cache()
  • Goal: Explicit cache removal, i.e. Table.invalidateCache(). Prerequisite: RPC in TaskManager to remove result partitions.
  • Goal: Locality for default intermediate result storage. Prerequisite: Support of locality hint from ShuffleMaster

Phase 2

  • Goal: Pluggable external intermediate result storage.

Phase 3

  • Goal: Auto cache support, i.e. cache at shuffle boundaries. Prerequisite: Old result partition eviction mechanism for ShuffleMaster / ShuffleService

Long Term

  • Goal: Locality in general for external intermediate result storage and external shuffle service. Prerequisite: Custom locality preference mechanism in Flink
  • Goal: Support of sophisticated optimization (use or not use cache). Prerequisite: Statistics on the intermediate results.
  • Goal: Cross-application intermediate result sharing. Prerequisite: External catalog service

This FLIP is focusing on phase 1. We will have new FLIPs for Phase 2 and Phase 3 in the future.

As of now DataStream only supports stream processing. There are ideas about supporting both stream and batch (as finite streams) in DataStream. Once that is done, we can add the cache API to DataStream as well.

Compatibility, Deprecation, and Migration Plan

This FLIP proposes a new feature in Flink. It is fully backwards compatible.

...