...

TableEnvironment.close():

```java
/**
 * Close and clean up the table environment.
 */
void close();
```
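Here is a minimal Java sketch of the clean-up contract of close(). It assumes the environment tracks the cached intermediate results it created and releases all of them on close(). The class SketchTableEnvironment and its methods are hypothetical, not Flink API. Implementing AutoCloseable is also a design assumption: it lets try-with-resources call close() automatically.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical stand-in for a TableEnvironment that owns cached results.
// Not Flink API: it only models the clean-up contract of close().
class SketchTableEnvironment implements AutoCloseable {
    // IDs of intermediate results that are currently cached.
    private final Set<String> cachedResults = new LinkedHashSet<>();
    private boolean closed = false;

    void registerCachedResult(String id) {
        if (closed) {
            throw new IllegalStateException("environment is closed");
        }
        cachedResults.add(id);
    }

    int cachedResultCount() {
        return cachedResults.size();
    }

    boolean isClosed() {
        return closed;
    }

    /** Close and clean up the table environment: release every cached result. */
    @Override
    public void close() {
        if (closed) {
            return; // closing twice is a no-op (assumption)
        }
        for (String id : cachedResults) {
            System.out.println("releasing cached result " + id);
        }
        cachedResults.clear();
        closed = true;
    }
}

public class CloseSketch {
    public static void main(String[] args) {
        SketchTableEnvironment env = new SketchTableEnvironment();
        try (SketchTableEnvironment e = env) {
            e.registerCachedResult("t1");
            e.registerCachedResult("t2");
        } // close() runs here and releases both cached results
        System.out.println(env.isClosed() + " " + env.cachedResultCount());
    }
}
```

Treating a second close() as a no-op is an assumption as well. It keeps clean-up safe when close() can be reached through more than one code path.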

3. Add the following configuration option to control whether automatic caching is enabled.

auto.cache.enabled

The default value is true, i.e. by default auto caching is enabled.
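Here is a minimal sketch of how the option's default could be resolved, assuming the configuration is available as string key/value pairs. AutoCacheConfig and isAutoCacheEnabled are hypothetical helpers, not Flink API. Only the key name and its default value come from this proposal.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not Flink API: resolves the proposed
// "auto.cache.enabled" option from string key/value configuration.
final class AutoCacheConfig {
    static final String KEY = "auto.cache.enabled";
    static final boolean DEFAULT = true; // auto caching is on unless disabled

    private AutoCacheConfig() {}

    static boolean isAutoCacheEnabled(Map<String, String> conf) {
        String value = conf.get(KEY);
        if (value == null) {
            return DEFAULT; // option not set: fall back to the default
        }
        // Accept only "true"/"false" (case-insensitive) and reject anything else.
        if (value.equalsIgnoreCase("true")) {
            return true;
        }
        if (value.equalsIgnoreCase("false")) {
            return false;
        }
        throw new IllegalArgumentException("Invalid value for " + KEY + ": " + value);
    }
}

public class AutoCacheConfigSketch {
    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(AutoCacheConfig.isAutoCacheEnabled(conf)); // true (default)
        conf.put(AutoCacheConfig.KEY, "false");
        System.out.println(AutoCacheConfig.isAutoCacheEnabled(conf)); // false
    }
}
```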

Proposed Changes

...

  • Users can call the cache() method on a table to explicitly tell Flink to cache it.
  • The cache() method returns a new Table object with a flag set that marks it for caching.
  • The cache() method does not execute eagerly. Instead, the table is cached when the DAG that contains it runs.
  • As the application runs, if Flink can save an intermediate result at little cost, it will do so even if users did not call cache() explicitly. Such cases typically occur at shuffle boundaries.
  • Auto caching is enabled by default but can be turned off by users.
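The semantics above (a new flagged Table, no eager execution) can be modeled in a few lines of Java. This is a simplified sketch. SketchTable and SketchExecutor are hypothetical stand-ins, not the Flink Table API. A plan is reduced to a string, and execution only inspects the tables passed to it instead of walking a full DAG.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the proposed cache() semantics; not the Flink Table API.
final class SketchTable {
    final String plan;       // stands in for the table's logical plan
    final boolean cacheFlag; // true only on the object returned by cache()

    SketchTable(String plan, boolean cacheFlag) {
        this.plan = plan;
        this.cacheFlag = cacheFlag;
    }

    // Lazy: nothing runs here. The receiver is left unchanged and a new,
    // flagged Table over the same plan is returned.
    SketchTable cache() {
        return new SketchTable(plan, true);
    }
}

// Hypothetical stand-in for the environment that runs jobs.
final class SketchExecutor {
    final Set<String> cachedPlans = new HashSet<>();

    // Running a job materializes each flagged table it is given.
    // (A real planner would walk the whole DAG to find flagged nodes.)
    void execute(SketchTable... tables) {
        for (SketchTable t : tables) {
            if (t.cacheFlag) {
                cachedPlans.add(t.plan);
            }
        }
    }
}

public class LazyCacheSketch {
    public static void main(String[] args) {
        SketchExecutor env = new SketchExecutor();
        SketchTable t1 = new SketchTable("scan(orders)", false);
        SketchTable t2 = t1.cache();
        System.out.println(env.cachedPlans.isEmpty()); // true: cache() ran nothing
        env.execute(t2);
        System.out.println(env.cachedPlans.contains("scan(orders)")); // true
        System.out.println(t1.cacheFlag + " " + t2.cacheFlag); // false true
    }
}
```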

...

  • When auto caching is disabled

    Auto caching is disabled:

    ```java
    TableEnvironment tEnv = ...;
    Table t1 = ...;
    Table t2 = t1.cache();
    // ...
    tEnv.execute(); // t1 is cached.
    Table t3 = t1.select(...); // cache will NOT be used. <----- difference
    Table t4 = t2.select(...); // cache will be used.
    // ...
    // The following two lines of code are equivalent:
    t1.invalidateCache(); // cache will be released
    t2.invalidateCache(); // cache will be released
    // ...
    t1.print(); // cache will NOT be recreated <-------- difference
    t2.print(); // cache will be recreated
    ```
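The rules in the auto-caching-disabled example can be condensed into a small model: only the Table returned by cache() may read or recreate the cached result, while invalidating through either reference releases it. DisabledAutoCacheModel and its methods are hypothetical. Tables are identified by a plan string, and a boolean marks whether the reference came from cache().

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model (not Flink API) of the auto-caching-disabled rules.
// viaCache == true means the reference is the Table returned by cache().
final class DisabledAutoCacheModel {
    private final Set<String> cachedPlans = new HashSet<>();

    // Running a job creates the cache only through the cache() reference.
    void run(String plan, boolean viaCache) {
        if (viaCache) {
            cachedPlans.add(plan);
        }
    }

    // Only the cache() reference may read an existing cached result.
    boolean usesCache(String plan, boolean viaCache) {
        return viaCache && cachedPlans.contains(plan);
    }

    // invalidateCache() on either t1 or t2 releases the same cached result.
    void invalidate(String plan) {
        cachedPlans.remove(plan);
    }
}

public class AutoCacheDisabledSketch {
    public static void main(String[] args) {
        DisabledAutoCacheModel m = new DisabledAutoCacheModel();
        String plan = "t1";                           // t2 = t1.cache() shares t1's plan
        m.run(plan, true);                            // tEnv.execute(): result cached
        System.out.println(m.usesCache(plan, false)); // t3 = t1.select(...): false
        System.out.println(m.usesCache(plan, true));  // t4 = t2.select(...): true
        m.invalidate(plan);                           // t1/t2.invalidateCache()
        m.run(plan, false);                           // t1.print(): not recreated
        System.out.println(m.usesCache(plan, true));  // false
        m.run(plan, true);                            // t2.print(): recreated
        System.out.println(m.usesCache(plan, true));  // true
    }
}
```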


...

In theory, users could also cache a streaming table, with the semantics of storing the result somewhere (potentially with a TTL). However, caching a streaming table is usually not very useful. For simplicity, the first implementation will not support caching streaming tables: when cache() is invoked on a streaming table, it is treated as a no-op. This leaves room to add caching for streaming tables in the future without requiring users to change their code.
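Here is a minimal sketch of the no-op behavior, assuming a table knows whether it is streaming or batch. ModeAwareTable is hypothetical, not Flink API. Returning the receiver unchanged keeps a call such as t.cache() valid today, which is what leaves room to add real stream caching later.

```java
// Hypothetical model (not Flink API): cache() is a no-op on streaming tables
// in the first implementation, so the same user call stays valid later.
final class ModeAwareTable {
    final boolean streaming;
    final boolean cacheFlag;

    ModeAwareTable(boolean streaming, boolean cacheFlag) {
        this.streaming = streaming;
        this.cacheFlag = cacheFlag;
    }

    ModeAwareTable cache() {
        if (streaming) {
            return this; // no-op: nothing is flagged or stored
        }
        return new ModeAwareTable(false, true); // batch: new Table with the cache flag set
    }
}

public class StreamingCacheNoOpSketch {
    public static void main(String[] args) {
        ModeAwareTable stream = new ModeAwareTable(true, false);
        ModeAwareTable batch = new ModeAwareTable(false, false);
        System.out.println(stream.cache() == stream); // true
        System.out.println(batch.cache().cacheFlag);  // true
    }
}
```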

...

  1. When a DAG of JOB_1 is submitted, the TableEnvironment (planner) checks the DAG to see whether any node should be cached.
  2. If there is a node to be cached, the TableEnvironment (CacheServiceManager) first starts a Cache Service job (if one is not already running). To start a Cache Service job:
    a. The CacheServiceManager opens a ServerSocket and listens on its port.
    b. The CacheServiceManager submits a Cache Service job.
      i. The Cache Service job contains only source nodes, which run the Cache Server code.
      ii. The parallelism of the Cache Service job equals the number of TaskManagers (TMs), so that each TM runs one Cache Service instance.
    c. Each Cache Service instance opens its own server socket and registers its HOST:PORT information with the CacheServiceManager through the port opened in step 2a.
  3. The CacheServiceManager waits until it has collected the HOST:PORT pairs from all Cache Service instances.
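The registration handshake in steps 2 and 3 can be sketched with plain java.net sockets. In this simplified sketch, local threads stand in for the per-TM Cache Service instances and everything runs on the loopback address. The method names (collectEndpoints, registerInstance, runRegistration), the one-line HOST:PORT text format, and the 10-second accept timeout are all assumptions, not Flink code.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified sketch of the Cache Service registration handshake.
public class CacheServiceRegistrationSketch {

    // Step 3: accept connections until `expected` HOST:PORT lines have arrived.
    static List<String> collectEndpoints(ServerSocket manager, int expected) throws IOException {
        List<String> endpoints = new ArrayList<>();
        while (endpoints.size() < expected) {
            try (Socket conn = manager.accept();
                 BufferedReader in = new BufferedReader(
                         new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
                String line = in.readLine();
                if (line != null) {
                    endpoints.add(line.trim());
                }
            }
        }
        return endpoints;
    }

    // One Cache Service instance: open its own server socket, then report HOST:PORT.
    static ServerSocket registerInstance(int managerPort) throws IOException {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        ServerSocket own = new ServerSocket(0, 50, loopback); // ephemeral port
        try (Socket s = new Socket(loopback, managerPort);
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8), true)) {
            out.println(loopback.getHostAddress() + ":" + own.getLocalPort());
        }
        return own;
    }

    // End to end, with one simulated instance per TaskManager.
    static List<String> runRegistration(int numTaskManagers) throws Exception {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        List<ServerSocket> instanceSockets = Collections.synchronizedList(new ArrayList<>());
        List<Thread> instances = new ArrayList<>();
        try (ServerSocket manager = new ServerSocket(0, 50, loopback)) { // step 2a
            manager.setSoTimeout(10_000); // fail instead of hanging if an instance never reports
            for (int i = 0; i < numTaskManagers; i++) {
                Thread t = new Thread(() -> {
                    try {
                        instanceSockets.add(registerInstance(manager.getLocalPort()));
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                });
                t.start();
                instances.add(t);
            }
            List<String> endpoints = collectEndpoints(manager, numTaskManagers);
            for (Thread t : instances) {
                t.join();
            }
            return endpoints;
        } finally {
            synchronized (instanceSockets) {
                for (ServerSocket s : instanceSockets) {
                    s.close();
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> endpoints = runRegistration(3);
        System.out.println("collected " + endpoints.size() + " endpoints: " + endpoints);
    }
}
```

The accept timeout makes the sketch fail fast instead of waiting forever if an instance never reports. A production implementation would also need to handle instance failures and restarts.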

...