...

Cached tables are available to the user application that runs in the same Flink cluster and uses the same TableEnvironment.

...

Sometimes users may want to release the resources used by a cached table before the application exits. In this case, users can call invalidateCache() on the table, which immediately releases the resources used to cache it.
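To make the user-visible contract concrete, here is a minimal in-memory sketch. It is not the Flink Table API: `ToyCachingEnvironment`, `define`, `execute` and `isMaterialized` are hypothetical names, and only `invalidateCache` mirrors the method named in the text. The sketch assumes that a table stays marked for caching after invalidation, so the next job simply recomputes it.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

/** Hypothetical in-memory model of the caching contract; NOT the Flink Table API. */
final class ToyCachingEnvironment {
    private final Map<String, Supplier<List<Integer>>> definitions = new HashMap<>();
    private final Set<String> markedForCaching = new HashSet<>();
    // Materialized contents of cached tables; stands in for the Cache Service storage.
    private final Map<String, List<Integer>> materialized = new HashMap<>();
    int computations = 0;

    void define(String table, Supplier<List<Integer>> computation) {
        definitions.put(table, computation);
    }

    /** Marks a table for caching; nothing is computed until a job uses it. */
    void cache(String table) {
        markedForCaching.add(table);
    }

    /** Runs a "job" reading the table, reusing cached contents when they exist. */
    List<Integer> execute(String table) {
        List<Integer> cached = materialized.get(table);
        if (cached != null) {
            return cached;
        }
        computations++;
        List<Integer> result = definitions.get(table).get();
        if (markedForCaching.contains(table)) {
            materialized.put(table, result);
        }
        return result;
    }

    /** Releases the cached contents immediately (assumption: the table stays marked). */
    void invalidateCache(String table) {
        materialized.remove(table);
    }

    boolean isMaterialized(String table) {
        return materialized.containsKey(table);
    }
}
```

In this model the second `execute` call reuses the materialized contents, while the first call after `invalidateCache` computes the table again.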

Implementation Details

...

To make the feature available out of the box, a default file-system-based cache service will be provided. This section describes the implementation details of the default table service.

Although the implementation details are transparent to the users, there are some changes to Flink internals to make it work.

The architecture is illustrated below:

...

The default table service stores the metadata in the client (e.g. the TableEnvironment) and the actual contents in the Cache Service instances running inside the TaskManagers.
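The split between client-side metadata and TM-side contents can be sketched with two plain classes in a single JVM. This is a minimal model under stated assumptions: `InMemoryCacheInstance` and `ClientCacheMetadata` are hypothetical, rows are plain strings, and rows are hash-partitioned across instances by `hashCode()`, which is only one possible partitioning scheme. The client keeps nothing but the node-to-UUID mapping.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/** Hypothetical TM-side store: one partition per intermediate result UUID. */
final class InMemoryCacheInstance {
    private final Map<UUID, List<String>> partitions = new HashMap<>();

    void append(UUID resultId, String row) {
        partitions.computeIfAbsent(resultId, id -> new ArrayList<>()).add(row);
    }

    List<String> read(UUID resultId) {
        return new ArrayList<>(partitions.getOrDefault(resultId, Collections.emptyList()));
    }

    void delete(UUID resultId) {
        partitions.remove(resultId);
    }
}

/** Hypothetical client-side metadata: the node-to-UUID mapping only, never the rows. */
final class ClientCacheMetadata {
    final Map<String, UUID> nodeToResult = new HashMap<>();
    private final List<InMemoryCacheInstance> instances;

    ClientCacheMetadata(List<InMemoryCacheInstance> instances) {
        this.instances = instances;
    }

    /** Sink side: hash-partition the rows across all instances under a fresh UUID. */
    UUID write(String logicalNode, List<String> rows) {
        UUID resultId = UUID.randomUUID();
        for (String row : rows) {
            int target = Math.floorMod(row.hashCode(), instances.size());
            instances.get(target).append(resultId, row);
        }
        nodeToResult.put(logicalNode, resultId);
        return resultId;
    }

    /** Source side: gather the partitions from every instance. */
    List<String> read(String logicalNode) {
        UUID resultId = nodeToResult.get(logicalNode);
        if (resultId == null) {
            throw new IllegalStateException("not cached: " + logicalNode);
        }
        List<String> rows = new ArrayList<>();
        for (InMemoryCacheInstance instance : instances) {
            rows.addAll(instance.read(resultId));
        }
        return rows;
    }

    /** Invalidation: delete the partitions on every instance, then drop the mapping. */
    void invalidate(String logicalNode) {
        UUID resultId = nodeToResult.remove(logicalNode);
        if (resultId != null) {
            for (InMemoryCacheInstance instance : instances) {
                instance.delete(resultId);
            }
        }
    }
}
```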

The end-to-end process is as follows:

  1. When the DAG of JOB_1 is submitted, the TableEnvironment (planner) checks the DAG to see whether any node should be cached.
  2. If there is a node to be cached, the TableEnvironment (CacheServiceManager) first starts a Cache Service job, if that job has not been started yet. To start a Cache Service job:
    1. The CacheServiceManager opens a ServerSocket and listens on its port.
    2. The CacheServiceManager submits a Cache Service job.
      1. The Cache Service job contains only source nodes, which run the Cache Server code.
      2. The parallelism of the Cache Service job equals the number of TMs, so that each TM runs exactly one Cache Service instance.
    3. Each Cache Service instance in a TM opens its own server socket and registers its HOST:PORT information with the port opened by the CacheServiceManager in step 2a.
  3. The CacheServiceManager waits until it has collected the HOST:PORT pairs from all the Cache Service instances.
  4. The TableEnvironment adds a CacheServiceSink to each cached node (a node with a corresponding Cached Table) in the DAG.
    1. The CacheServiceSink acts as a normal sink, as if the user had called writeToSink(CacheServiceSink).
    2. Each cached node is assigned an intermediate result UUID. The CacheServiceSink uses this UUID to tag the intermediate result when writing it to the Cache Service instances.
    3. Since there are multiple Cache Service instances, the intermediate result is partitioned and stored across them in a distributed manner.
  5. The TableEnvironment submits the modified DAG to the Flink cluster.
  6. After JOB_1 finishes, the CacheServiceManager stores the [LogicalNode → intermediate result UUID] mapping in memory.
  7. Later, if JOB_2 is submitted and its DAG contains a node that has already been cached (i.e. a mapping to an intermediate result UUID is found), the TableEnvironment replaces that node and its connected ancestor nodes with a CacheServiceSource.
    1. The CacheServiceSource holds the [HOST:PORT] pairs of all the Cache Service instances.
    2. The CacheServiceSource reads from the Cache Service instances using the UUID of the cached intermediate result.
  8. When invalidateCache() is invoked, the CacheServiceManager directly makes an RPC call to each Cache Service instance to delete the corresponding intermediate result. It also removes the mapping from the LogicalNode to the intermediate result UUID.
  9. When the application exits, the CacheServiceManager is closed together with the TableEnvironment. Closing it stops the Cache Service job, so all the cached tables are released.
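The registration handshake in steps 2 and 3 could look roughly like the following sketch built on `java.net.ServerSocket`. Assumptions: `RegistrationListener` and `CacheInstanceRegistration` are hypothetical names, each instance reports its endpoint as one `HOST:PORT` text line, and everything runs on the loopback interface so the sketch can execute in a single JVM. How the manager's port reaches the Cache Service job is not specified in the text.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical manager side of steps 2-3: listen, then collect one HOST:PORT per TM. */
final class RegistrationListener implements AutoCloseable {
    private final ServerSocket serverSocket;

    RegistrationListener() {
        try {
            // Port 0 asks the OS for a free port; loopback keeps the sketch local.
            serverSocket = new ServerSocket(0, 50, InetAddress.getLoopbackAddress());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** The port that would be handed to the Cache Service job (assumed job parameter). */
    int port() {
        return serverSocket.getLocalPort();
    }

    /** Blocks until `expected` instances (one per TM) have reported their endpoint. */
    List<String> awaitRegistrations(int expected) {
        List<String> endpoints = new ArrayList<>();
        try {
            while (endpoints.size() < expected) {
                try (Socket socket = serverSocket.accept();
                     BufferedReader in = new BufferedReader(
                             new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))) {
                    endpoints.add(in.readLine());
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return endpoints;
    }

    @Override
    public void close() {
        try {
            serverSocket.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

/** Hypothetical instance side: open a data socket, then register HOST:PORT with the manager. */
final class CacheInstanceRegistration {
    static ServerSocket openAndRegister(int managerPort) {
        try {
            ServerSocket dataSocket = new ServerSocket(0, 50, InetAddress.getLoopbackAddress());
            // A real TM would report an address reachable from other machines, not loopback.
            String endpoint = dataSocket.getInetAddress().getHostAddress() + ":" + dataSocket.getLocalPort();
            try (Socket socket = new Socket(InetAddress.getLoopbackAddress(), managerPort);
                 PrintWriter out = new PrintWriter(
                         new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8), true)) {
                out.println(endpoint);
            }
            return dataSocket;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Three threads calling `openAndRegister(listener.port())` can stand in for three TMs, with `awaitRegistrations(3)` returning their endpoints. Note that this sketch waits forever if an instance never registers; a real implementation would presumably need a timeout.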
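Steps 4 to 8 amount to a rewrite of the logical plan driven by the [LogicalNode → intermediate result UUID] mapping. The sketch below models this on a toy plan tree. `PlanNode`, `CachePlanRewriter` and the string rendering of plans are hypothetical, and it assumes that a node's `digest` identifies its whole subtree, so that the same node can be recognized again in JOB_2.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/** Hypothetical logical plan node; `digest` is assumed to identify the whole subtree. */
final class PlanNode {
    final String digest;
    final boolean markedForCache;
    final List<PlanNode> inputs;

    PlanNode(String digest, boolean markedForCache, List<PlanNode> inputs) {
        this.digest = digest;
        this.markedForCache = markedForCache;
        this.inputs = inputs;
    }
}

/** Hypothetical planner-side rewrite covering steps 4, 6, 7 and the client half of step 8. */
final class CachePlanRewriter {
    /** Step 6: [logical node -> intermediate result UUID] for results that are ready. */
    final Map<String, UUID> completed = new HashMap<>();
    /** Step 4: UUIDs assigned to the sinks of the job currently being planned. */
    final Map<String, UUID> pendingSinks = new HashMap<>();

    /** Returns a printable form of the rewritten plan. */
    String rewrite(PlanNode node) {
        UUID cached = completed.get(node.digest);
        if (cached != null) {
            // Step 7: the node and its upstream (ancestor) nodes collapse into one source.
            return "CacheServiceSource(" + cached + ")";
        }
        List<String> rewrittenInputs = new ArrayList<>();
        for (PlanNode input : node.inputs) {
            rewrittenInputs.add(rewrite(input));
        }
        if (node.markedForCache) {
            // Step 4: attach a sink tagged with a fresh UUID; downstream operators still consume the node.
            pendingSinks.put(node.digest, UUID.randomUUID());
        }
        return node.digest + rewrittenInputs;
    }

    /** Step 6: only after the job finishes do the pending results become usable. */
    void onJobFinished() {
        completed.putAll(pendingSinks);
        pendingSinks.clear();
    }

    /** Step 8 (client half): forget the mapping; deleting the data is not modelled here. */
    void invalidate(String digest) {
        completed.remove(digest);
    }
}
```

For a plan `Agg ← Filter ← Scan` with `Filter` marked for caching, the first rewrite keeps the full plan and registers a sink on `Filter`. After `onJobFinished()`, the next rewrite yields `Agg` over a `CacheServiceSource`, and after `invalidate("Filter")` the full plan, with a new sink, comes back.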

If a Cache Service instance fails, the failed subtask is brought up again, because the Cache Service is itself a Flink job. However, every intermediate result that has a partition on the failed subtask becomes unavailable.

In this case, the CacheServiceSource or CacheServiceSink throws an exception and the job fails. The CacheServiceManager then invalidates the affected caches, and the TableEnvironment resubmits the original DAG without using them. Because no cache is available for those nodes, the TableEnvironment (planner) adds a CacheServiceSink to the cached nodes again, so the caches are recreated when the original DAG executes.
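The recovery path can be summarized as a submit-and-retry wrapper. This is a minimal sketch that assumes a hypothetical `CachePartitionLostException` carrying the names of the affected caches, and a caller-supplied function that plans and runs the DAG against a given set of usable caches and reports which caches its sinks wrote. It retries once and lets a second failure propagate.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Function;

/** Hypothetical error raised by a CacheServiceSource/Sink when a cache partition is gone. */
final class CachePartitionLostException extends RuntimeException {
    final Set<String> affectedCaches;

    CachePartitionLostException(Set<String> affectedCaches) {
        super("lost cache partitions for " + affectedCaches);
        this.affectedCaches = affectedCaches;
    }
}

/** Hypothetical client-side failover for a single job submission. */
final class CacheAwareSubmitter {
    /** Caches whose partitions are all believed to be available. */
    final Set<String> availableCaches = new HashSet<>();

    /**
     * @param planAndRun plans the DAG using only the given caches, runs it, and returns the
     *                   caches that its CacheServiceSinks wrote
     */
    void submit(Function<Set<String>, Set<String>> planAndRun) {
        Set<String> written;
        try {
            written = planAndRun.apply(new HashSet<>(availableCaches));
        } catch (CachePartitionLostException e) {
            // Invalidate only the impacted caches, then resubmit the original DAG without them.
            availableCaches.removeAll(e.affectedCaches);
            written = planAndRun.apply(new HashSet<>(availableCaches));
        }
        // Sinks re-attached by the planner recreate the invalidated caches.
        availableCaches.addAll(written);
    }
}
```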

The above process is transparent to the users.

When users explicitly cache a table, the DAG is implicitly changed, because the optimizer might have changed the cached node had it not been explicitly cached. Currently, when a node has more than one downstream node in the DAG, that node is computed multiple times. This could be solved by introducing partial optimization for subgraphs, a feature available in Blink but not yet in Flink. This is an existing problem in Flink, and this FLIP does not attempt to address it.
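The recomputation issue can be illustrated with a counter. In this hypothetical sketch, `SharedUpstreamNode` stands for a node with two downstream consumers. With `reuseResult` set to false, each consumer triggers its own evaluation, mirroring the current behavior described above. The `true` variant only stands for the outcome that partial subgraph optimization would aim for; it does not model how Blink implements it.

```java
import java.util.function.LongSupplier;

/** Hypothetical model of a DAG node feeding several downstream consumers. */
final class SharedUpstreamNode {
    private final LongSupplier computation;
    private final boolean reuseResult;
    private Long result; // filled only when reuseResult is true
    int evaluations = 0;

    SharedUpstreamNode(LongSupplier computation, boolean reuseResult) {
        this.computation = computation;
        this.reuseResult = reuseResult;
    }

    /** Called once per downstream consumer. */
    long consume() {
        if (reuseResult && result != null) {
            return result;
        }
        evaluations++;
        long value = computation.getAsLong();
        if (reuseResult) {
            result = value;
        }
        return value;
    }
}
```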

...