
  • When auto caching is enabled (default behavior)

    Code Block
    languagejava
    titleAuto caching is enabled (default)
    TableEnvironment tEnv = ...
    Table t1 = ...
    Table t2 = t1.cache()
    ...
    tEnv.execute() // t1 is cached.
    Table t3 = t1.select(...) // cache will be used.
    Table t4 = t2.select(...) // cache will be used.
    ...
    // The following two lines of code are equivalent
    t1.invalidateCache() // cache will be released
    t2.invalidateCache() // cache will be released
    ...
    // The following two lines of code are equivalent.
    t1.select() // cache will be recreated
    t2.select() // cache will be recreated




  • When auto caching is disabled

    Code Block
    languagejava
    titleAuto caching is disabled
    TableEnvironment tEnv = ...
    Table t1 = ...
    Table t2 = t1.cache()
    ...
    tEnv.execute() // t1 is cached.
    Table t3 = t1.select(...) // cache will NOT be used. <----- difference
    Table t4 = t2.select(...) // cache will be used.
    ...
    // The following two lines of code are equivalent
    t1.invalidateCache() // cache will be released
    t2.invalidateCache() // cache will be released
    ...
    // Unlike the two lines above, the following two lines are NOT equivalent.
    t1.print() // cache will not be recreated
    t2.print() // cache will be recreated


The cached tables are available to the user application using the same TableEnvironment.

The cached intermediate results consume resources and need to be released eventually. A cached result is released in two cases.

When the TableEnvironment is closed, the resources consumed by the cached tables are also released. This usually happens when the user application exits.

Sometimes users may want to release the resources used by a cached table before the application exits. In this case, users can call invalidateCache() on a table. This immediately releases the resources used to cache that table.

In some rare cases, users may want to explicitly ignore the cached intermediate result. In this case, the user needs to give an explicit hint, such as:



Caching a streaming table is usually not that useful. For simplicity, the first implementation will not support it: when cache() is invoked on a streaming table, it is treated as a no-op. This leaves room to add caching for streaming tables in the future without asking users to change their code.

Implementation Details

To make the feature available out of the box, a default file-system-based cache service will be provided. This section describes the implementation details of the default cache service.

Although the implementation details are transparent to the users, there are some changes to Flink internals to make it work.

The architecture is illustrated below:

[Figure: architecture of the default cache service]

Each cached table consists of two pieces of information:

  • Table metadata - name, location, etc.
  • Table contents - the actual contents of the table

The default cache service stores the metadata in the client (e.g. the TableEnvironment) and saves the actual contents in the cache service instances running inside the Task Managers.

The end-to-end process is as follows:

Step 1: Start Cache Service

  1. When a DAG of JOB_1 is submitted, the TableEnvironment (planner) checks the DAG to see if any node should be cached.
  2. If there is a node to be cached, the TableEnvironment (CacheServiceManager) first starts a Cache Service job, unless one is already running. To start a Cache Service job:
    1. The CacheServiceManager opens a ServerSocket and listens on its port.
    2. The CacheServiceManager submits a Cache Service job.
      1. The Cache Service job contains only source nodes, which run the Cache Server code.
      2. The parallelism of the Cache Service job equals the number of TMs, so that each TM has one Cache Service instance running in it.
    3. Each Cache Service instance in a TM opens a server socket and registers its HOST:PORT information back to the port opened by the CacheServiceManager in step 2.1.
  3. The CacheServiceManager waits until it has collected the HOST:PORT pairs from all the Cache Service instances.
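
The registration handshake in Step 1 can be sketched with plain sockets. This is only an illustration under assumptions, not the proposed implementation: the class RegistrationSketch and its methods are hypothetical, threads stand in for Task Managers, and the wire format (one HOST:PORT text line per connection) is assumed because the FLIP does not specify it.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class RegistrationSketch {

    /** Manager side: accept one connection per expected instance and read its "HOST:PORT" line. */
    static List<String> collectEndpoints(ServerSocket registry, int expected) throws IOException {
        List<String> endpoints = new ArrayList<>();
        while (endpoints.size() < expected) {
            try (Socket peer = registry.accept();
                 BufferedReader in = new BufferedReader(new InputStreamReader(peer.getInputStream()))) {
                endpoints.add(in.readLine());
            }
        }
        return endpoints;
    }

    /** Instance side: open a data socket, then report its HOST:PORT to the manager's port. */
    static ServerSocket startInstance(InetAddress host, int managerPort) throws IOException {
        ServerSocket data = new ServerSocket(0, 50, host); // port 0 = any free port
        try (Socket toManager = new Socket(host, managerPort);
             PrintWriter out = new PrintWriter(toManager.getOutputStream(), true)) {
            out.println(host.getHostAddress() + ":" + data.getLocalPort());
        }
        return data;
    }

    /** Runs the whole handshake on the loopback interface, with threads standing in for TMs. */
    static List<String> demo(int instances) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        List<ServerSocket> dataSockets = Collections.synchronizedList(new ArrayList<>());
        try (ServerSocket registry = new ServerSocket(0, 50, loopback)) {
            registry.setSoTimeout(5_000); // do not wait forever if an instance never registers
            List<Thread> workers = new ArrayList<>();
            for (int i = 0; i < instances; i++) {
                Thread worker = new Thread(() -> {
                    try {
                        dataSockets.add(startInstance(loopback, registry.getLocalPort()));
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                });
                worker.start();
                workers.add(worker);
            }
            List<String> endpoints = collectEndpoints(registry, instances);
            for (Thread worker : workers) {
                worker.join();
            }
            return endpoints;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            for (ServerSocket socket : dataSockets) {
                try {
                    socket.close();
                } catch (IOException ignored) {
                    // best-effort cleanup in a sketch
                }
            }
        }
    }
}
```

Calling RegistrationSketch.demo(3) returns three distinct HOST:PORT strings, one per simulated instance. In the design described above, the instances would be subtasks of the Cache Service job rather than local threads.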

Step 2: Execute JOB_1 (write cached tables)

  1. The TableEnvironment adds a CacheServiceSink to each cached node (a node with a corresponding Cached Table) in the DAG of JOB_1.
    1. The CacheServiceSink acts as a normal sink, as if the user had called writeToSink(CacheServiceSink).
    2. Each cached node is assigned an intermediate result UUID. The CacheServiceSink uses the UUID to tag the intermediate result when writing it to the Cache Service instances.
    3. Since there are multiple Cache Service instances, the intermediate result is partitioned and stored in a distributed manner.
  2. The TableEnvironment submits the modified DAG to the Flink cluster.
  3. After JOB_1 finishes, the CacheServiceManager stores the [LogicalNode → intermediate result UUID] mapping in memory.
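
The write path in Step 2 can be modeled in memory as follows. This is a minimal sketch under assumptions: CacheWriteSketch and Instance are hypothetical names, logical nodes are identified by a plain string, and rows are spread by hash, whereas the FLIP only states that the result is partitioned across the instances.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

final class CacheWriteSketch {

    /** Stand-in for one Cache Service instance: result UUID -> rows of the partition stored here. */
    static final class Instance {
        final Map<UUID, List<String>> partitions = new HashMap<>();
    }

    final List<Instance> instances = new ArrayList<>();

    /** Client-side metadata: logical node (identified by name here) -> intermediate result UUID. */
    final Map<String, UUID> nodeToResult = new HashMap<>();

    CacheWriteSketch(int instanceCount) {
        for (int i = 0; i < instanceCount; i++) {
            instances.add(new Instance());
        }
    }

    /** Plays the role of the sink: tag the rows with a fresh UUID and spread them over the instances. */
    UUID write(String logicalNode, List<String> rows) {
        UUID resultId = UUID.randomUUID();
        for (String row : rows) {
            int target = Math.floorMod(row.hashCode(), instances.size()); // hash partitioning (assumed)
            instances.get(target).partitions
                    .computeIfAbsent(resultId, id -> new ArrayList<>())
                    .add(row);
        }
        nodeToResult.put(logicalNode, resultId); // recorded only once the write has finished
        return resultId;
    }

    /** Number of rows stored for a result across all instances. */
    int storedRows(UUID resultId) {
        int count = 0;
        for (Instance instance : instances) {
            count += instance.partitions.getOrDefault(resultId, Collections.emptyList()).size();
        }
        return count;
    }
}
```

The split mirrors the two pieces of information listed earlier: the contents live in the instances, while the mapping from node to UUID stays on the client side.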

Step 3: Execute JOB_2 (read cached tables)

  1. Later on, if JOB_2 is submitted and its DAG contains a node that has already been cached (a mapping to an intermediate result UUID is found), the TableEnvironment replaces the node and its connected ancestor nodes with a CacheServiceSource.
    1. The CacheServiceSource has the [HOST:PORT] pairs of all the Cache Service instances.
    2. The CacheServiceSource reads from the Cache Service instances using the UUID of the cached intermediate result.
  2. When invalidateCache() is invoked, the CacheServiceManager makes an RPC call directly to each Cache Service instance to delete the corresponding intermediate result. It also removes the [LogicalNode → intermediate result UUID] mapping.
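
The DAG rewrite in Step 3 amounts to cutting the plan at a cached node. The sketch below illustrates that idea only; PlanRewriteSketch and Node are hypothetical stand-ins for the planner's real node types, and nodes are matched by name, which is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;

final class PlanRewriteSketch {

    /** Minimal plan node: a name plus the upstream nodes it reads from. */
    static final class Node {
        final String name;
        final List<Node> inputs;

        Node(String name, Node... inputs) {
            this.name = name;
            this.inputs = Arrays.asList(inputs);
        }
    }

    /** Returns a copy of the plan in which each cached node, together with its ancestors, becomes a source. */
    static Node rewrite(Node node, Map<String, UUID> cached) {
        UUID resultId = cached.get(node.name);
        if (resultId != null) {
            // The whole upstream subtree is dropped; the source reads the result by UUID instead.
            return new Node("CacheServiceSource[" + resultId + "]");
        }
        List<Node> rewritten = new ArrayList<>();
        for (Node input : node.inputs) {
            rewritten.add(rewrite(input, cached));
        }
        return new Node(node.name, rewritten.toArray(new Node[0]));
    }

    /** Renders a plan as name(input, ...) for inspection. */
    static String render(Node node) {
        if (node.inputs.isEmpty()) {
            return node.name;
        }
        List<String> parts = new ArrayList<>();
        for (Node input : node.inputs) {
            parts.add(render(input));
        }
        return node.name + "(" + String.join(", ", parts) + ")";
    }
}
```

With a plan select(filter(scan)) and a mapping for filter, the rewritten plan has a CacheServiceSource directly under select. Removing the mapping, as invalidateCache() does, makes the same call return the original plan shape again.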

Step 4: Clean up 

  1. When the application exits, the TableEnvironment will be closed.
  2. The CacheServiceManager will be closed when TableEnvironment is closed. It will stop the Cache Service job. 
  3. All the cached tables will be released when the Cache Service job stops.

Since the Cache Service is a Flink job, the failed subtask of a failed Cache Service instance will be brought up again. However, every intermediate result that has a partition on the failed subtask becomes unavailable.

In this case, the CacheServiceSource / CacheServiceSink throws an exception and the job fails. As a result, the CacheServiceManager invalidates the affected caches, and the TableEnvironment resubmits the original DAG without using the cache. Because no cache is available, the TableEnvironment (planner) adds a CacheServiceSink to the cached nodes, so the cache is recreated after the original DAG has executed.
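
The recovery behavior described above can be summarized as a small control-flow sketch. All names here are hypothetical: CacheUnavailableException stands in for whatever exception the source or sink would actually throw, and the job is modeled as a function that is told whether to read from the cache.

```java
import java.util.Set;
import java.util.function.Function;

final class CacheFallbackSketch {

    /** Hypothetical exception standing in for a failure of the cache source or sink. */
    static final class CacheUnavailableException extends RuntimeException {
        CacheUnavailableException(String message) {
            super(message);
        }
    }

    /**
     * Runs the job against the cache when the node is cached. If the cache turns out to be
     * unavailable, the cache is invalidated and the original DAG is run instead, which
     * writes the cache again. The job receives true when it should read from the cache.
     * The node is assumed to be one the user asked to cache.
     */
    static <R> R execute(String node, Set<String> cachedNodes, Function<Boolean, R> job) {
        if (cachedNodes.contains(node)) {
            try {
                return job.apply(true);
            } catch (CacheUnavailableException e) {
                cachedNodes.remove(node); // invalidate the affected cache
            }
        }
        R result = job.apply(false); // original DAG, with the sink attached again
        cachedNodes.add(node);       // the cache has been recreated
        return result;
    }
}
```

From the caller's point of view only the result is visible, which matches the statement that the process is transparent to users.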

The above process is transparent to the users.

When a user explicitly caches a table, the DAG is implicitly changed, because the optimizer might have changed the cached node had it not been explicitly cached. As of now, when a node has more than one downstream node in the DAG, that node is computed multiple times. This could be solved by introducing partial optimization for subgraphs, a feature that is available in Blink but not yet in Flink. This is an existing problem in Flink, and this FLIP does not attempt to address it.

We propose to run the cache service instances using a Flink Job. Doing this has a few benefits:

  1. Leaving the cache service to user space decouples it from the Flink runtime.
  2. A Flink job is a powerful primitive. This means the cache service can reuse many existing mechanisms:
    1. resource specifications
    2. monitoring framework
    3. lifecycle management
    4. state management
    5. fault tolerance 

Integration with external shuffle service (FLIP-31)

To achieve auto caching, the cache service needs to be integrated with the shuffle service. This is a natural move, because the external shuffle service and the cache service inherently share a lot of similarities:


| | Lifecycle | Access Pattern | Persistency | Mutability | Partitioned | Require SubPartitions |
|---|---|---|---|---|---|---|
| Shuffle Service | cross-stages | Scan | volatile | immutable | Yes | Yes |
| Cache Service | cross-jobs | Scan | volatile | immutable | Yes | No |

With a few extensions, the default cache service could also serve as the default external shuffle service. The architecture would look something like the following:

[Figure: architecture of the cache service also serving as the external shuffle service]

The cache service will serve as storage for both cache and shuffle data. This means both the storage tier and the RPC protocol will support SubPartitions.
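
One way to picture a storage tier that serves both kinds of data is a key in which the sub-partition is optional. This is purely an illustrative assumption: StorageKeySketch is a hypothetical class, and the FLIP does not define how partitions or SubPartitions are addressed.

```java
import java.util.Objects;
import java.util.OptionalInt;
import java.util.UUID;

final class StorageKeySketch {
    final UUID resultId;
    final int partition;
    final OptionalInt subPartition; // empty for cache data, present for shuffle data

    private StorageKeySketch(UUID resultId, int partition, OptionalInt subPartition) {
        this.resultId = resultId;
        this.partition = partition;
        this.subPartition = subPartition;
    }

    /** Cache data is partitioned but, per the comparison above, needs no SubPartitions. */
    static StorageKeySketch forCache(UUID resultId, int partition) {
        return new StorageKeySketch(resultId, partition, OptionalInt.empty());
    }

    /** Shuffle data additionally addresses a sub-partition within the partition. */
    static StorageKeySketch forShuffle(UUID resultId, int partition, int subPartition) {
        return new StorageKeySketch(resultId, partition, OptionalInt.of(subPartition));
    }

    @Override
    public boolean equals(Object other) {
        if (!(other instanceof StorageKeySketch)) {
            return false;
        }
        StorageKeySketch that = (StorageKeySketch) other;
        return partition == that.partition
                && resultId.equals(that.resultId)
                && subPartition.equals(that.subPartition);
    }

    @Override
    public int hashCode() {
        return Objects.hash(resultId, partition, subPartition);
    }
}
```

With a key of this shape, the same storage and lookup code could serve both use cases, and only shuffle reads would supply a sub-partition.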

Compatibility, Deprecation, and Migration Plan

This FLIP proposes a new feature in Flink. It is fully backwards compatible.

Test Plan

Unit tests and integration tests will be added to test the proposed functionality.

Rejected Alternatives

The semantics of the cache() / invalidateCache() API have gone through extended discussion. The rejected alternative semantics are documented below:

Code Block
languagejava
titleSemantic Option 1
// Do not return anything
void cache();
// Alternatively, return the original table for chaining purpose.
Table cache();
 
// Physically uncache the table to release resource
void uncache();
 
// To explicitly ignore cache, not part of this proposal but in the future
Table hint("ignoreCache")
Code Block
languagejava
titleSemantic Option 1
TableEnvironment tEnv = ...
Table t = …
...
// cache the table
t.cache();
…
// Assume some execution physically created the cache.
tEnv.execute()
...
// The optimizer decides whether the cache will be used or not.
t.foo();
...
// Physically release the cache.
t.uncache();
  • Simple and intuitive: users only need to deal with one variable of the Table class.

  • Side effect: a table may be cached / uncached in a method invocation, while the caller does not know about this.

    Code Block
    languagejava
    titleSemantic Option 1
    {
      Table t  = …
       // cache the table
       t.cache(); 
       ...
       // The cache may be released in foo()
       foo(t);
       ...
       // cache is no longer available
       t.bar() 
    }
     
     
    void foo(Table t) {
       // cache the table
       t.cache();   
       ….
       // Physically drop the table
       t.uncache(); 
    }


Code Block
languagejava
titleSemantic Option 2
// Return CachedTable
CachedTable cache();
 
public interface CachedTable extends Table {
  // Physically uncache the table to release resource
  void uncache();
}
Code Block
languagejava
titleSemantic Option 2
TableEnvironment tEnv = ...
Table t = …
...
// cache the table
CachedTable cachedT = t.cache();
…
// Assume some execution physically created the cache.
tEnv.execute()
...
// Use the original DAG.
t.foo();
...
// Use cache
cachedT.bar()
...
// Physically release the cache.
cachedT.uncache();
  • No side effect

  • Optimizer has no chance to kick in.

  • Users have to distinguish between original table / cached table.

  • Adding auto cache becomes a backward incompatible change.

Code Block
languagejava
titleSemantic Option 3
// Return a CacheHandle
CacheHandle cache();
 
public interface CacheHandle {
  // Physically release cache resource
  void release();
}
Code Block
languagejava
titleSemantic Option 3
TableEnvironment tEnv = ...
Table t = …
...
// cache the table
CacheHandle handle1 = t.cache();
CacheHandle handle2 = t.cache();
…
// Assume some execution physically created the cache.
tEnv.execute()
...
// The optimizer decides whether the cache will be used.
t.foo();
...
// Release the first handle; the cache will not be released because handle2 has not been released.
handle1.release();
// Release the second handle; the cache will be released because all the cache handles have been released.
handle2.release();
  • No side effect

  • Users only deal with one Table variable.

  • Easy to add auto caching.

  • The behavior of t.foo() changes after t.cache(). The concern is that this could be seen as modifying table t, which goes against the immutability principle.
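
The handle semantics of Option 3 boil down to reference counting. The following is a minimal single-threaded sketch under assumptions: RefCountedCacheSketch is a hypothetical stand-in for a table, and the rule that releasing the same handle twice has no further effect is assumed rather than taken from the discussion.

```java
final class RefCountedCacheSketch {

    /** Mirrors the CacheHandle interface from Semantic Option 3. */
    interface CacheHandle {
        void release();
    }

    private int openHandles;
    private boolean cached;

    /** Each call hands out an independent handle on the same underlying cache. */
    CacheHandle cache() {
        openHandles++;
        cached = true;
        boolean[] released = {false};
        return () -> {
            if (released[0]) {
                return; // releasing the same handle twice has no further effect (assumed)
            }
            released[0] = true;
            if (--openHandles == 0) {
                cached = false; // stand-in for physically dropping the cached data
            }
        };
    }

    boolean isCached() {
        return cached;
    }
}
```

Because the cache is dropped only when the last handle is released, a callee that caches and releases the table for its own purposes cannot take the cache away from its caller, which is the side effect that Option 1 suffers from.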