

Status

Current state: Under Discussion

Discussion thread: here

JIRA: 

Released: TBD

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Generally speaking, an application may consist of one or more jobs, and those jobs may need to share data with each other. In Flink, the jobs in the same application are independent and share nothing among themselves. If a Flink application involves several sequential steps, each step (as an independent job) has to write its intermediate results to an external sink, so that the following step (job) can read them back as sources.

Although this works functionality-wise, this programming paradigm has a few shortcomings:

  1. In order to share a result, a sink must be provided.

  2. Complicated applications become inefficient due to the large amount of IO on intermediate results.

  3. The user experience suffers for users of the programming API (SQL users are not affected because the temporary tables are created by the framework).

It turns out that interactive programming support is critical to the user experience on Flink in batch processing scenarios. The following code gives an example:

Interactive Programming in Batch Processing
Table a = ...
Table b = a.select(...).filter(...)
int max = b.max()
int min = b.min() // recompute table b from table a
Table c = b.select(('f1 - min)/(max - min)).filter(...)
c.print() // recompute table b from table a
...
if (b.count() > 10) { // recompute table b from table a
  b.select(UDF1(...)).print() // recompute table b from table a
} else {
  b.select(UDF2(...)).print()
}

In the above code, because b is not cached, it will be recomputed from scratch each time it is referred to later in the program.

To address the above issues, we propose to add support for interactive programming to the Flink Table API.
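
For illustration, with the proposed cache() method (introduced below) the example above could be rewritten so that table b is only computed once. This is a sketch against the proposed API, not final syntax:

Interactive Programming with a Cached Table (sketch)
Table a = ...
Table b = a.select(...).filter(...).cache() // hint Flink to cache table b
int max = b.max() // b is computed here and its cache is created lazily
int min = b.min() // may be served from the cache of b
Table c = b.select(('f1 - min)/(max - min)).filter(...)
c.print() // may be served from the cache of b
...
if (b.count() > 10) { // may be served from the cache of b
  b.select(UDF1(...)).print() // may be served from the cache of b
} else {
  b.select(UDF2(...)).print()
}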

Public Interfaces

1. Add the following two new methods to the Flink Table class.

cache() and invalidateCache() API
  /**
    * Cache this table to builtin table service or the specified customized table service.
    *
    * This method provides a hint to Flink that the current table may be reused later, so a
    * cache should be created to avoid regenerating this table.
    *
    * The following code snippet gives an example of how this method could be used.
    *
    * {{{
    *   val t = tEnv.fromCollection(data).as('country, 'color, 'count)
    *
    *   val t1 = t.filter('count < 100).cache()
    *   // t1 is cached after it is computed for the first time.
    *   val x = t1.collect().size
    *
    *   // When t1 is used again to compute t2, it may not be re-computed.
    *   val t2 = t1.groupBy('country).select('country, 'count.sum as 'sum)
    *   val res2 = t2.collect()
    *   res2.foreach(println)
    *
    *   // Similarly when t1 is used again to compute t3, it may not be re-computed.
    *   val t3 = t1.groupBy('color).select('color, 'count.avg as 'avg)
    *   val res3 = t3.collect()
    *   res3.foreach(println)
    *
    * }}}
    *
    * @note The Flink optimizer may decide not to use the cache if doing so would accelerate the
    * processing, or if the cache is no longer available, for example because it has been
    * invalidated.
    * @note The table cache could be created lazily. That means the cache may only be created
    * the first time the cached table is computed.
    * @note The table cache will be cleared when the user program exits.
    *
    * @return the current table with a cache hint. The original table reference is not modified
    *         by the execution of this method.
    */
  def cache(): Table


  /**
    * Manually invalidate the cache of this table to release the physical resources. Users are
    * not required to invoke this method to release physical resources; the table caches are
    * cleared automatically when the user program exits.
    *
    * @note After the cache is invalidated, it may be re-created if this table is used again.
    */
  def invalidateCache(): Unit
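
The following sketch (using the proposed API above, not final syntax) shows how cache() and invalidateCache() could be used together:

cache() and invalidateCache() usage (sketch)
TableEnvironment tEnv = ...
Table t = ...
Table cached = t.filter(...).cache() // hint Flink to cache the filtered table
...
tEnv.execute() // the cache of `cached` is created when the DAG runs.
Table t1 = cached.select(...) // may be served from the cache instead of being recomputed.
...
cached.invalidateCache() // explicitly release the physical resources of the cache.
Table t2 = cached.select(...) // the cache may be re-created if `cached` is used again.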

2. Add a close method to the TableEnvironment

TableEnvironment.close()
/**
  * Close and clean up the table environment.
  */
void close();
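
A sketch of how close() could be used to release all cached tables when the application finishes (again using the proposed API, not final syntax):

TableEnvironment.close() usage (sketch)
TableEnvironment tEnv = ...
try {
  Table t1 = ...
  Table t2 = t1.cache()
  ...
  tEnv.execute() // t1 is cached.
  ...
} finally {
  tEnv.close() // release the resources of all tables cached through this TableEnvironment.
}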

Proposed Changes

As mentioned in the motivation section, the key idea of this FLIP is to allow intermediate results to be cached, so that later references to those results do not lead to duplicate computation. To achieve that, we need to introduce Cached Tables.

The cached tables are tables whose contents are saved by Flink as the user application runs. A cached Table can be created in two ways:

  • Users can call the cache() method on a table to explicitly tell Flink to cache it.
    • The cache() method returns a new Table object with a flag set.
    • The cache() method does not execute eagerly. Instead, the table will be cached when the DAG that contains it runs.
  • As the application runs, if Flink can save an intermediate result at little cost, it will do so even if users did not call cache() explicitly. Such cases typically occur at shuffle boundaries.
    • Auto caching will be enabled by default but can be turned off by users.

The semantics of the cache() method differ slightly depending on whether auto caching is enabled or not.

  • When auto caching is enabled (default behavior)

    Auto caching is enabled (default)
    TableEnvironment tEnv = ...
    Table t1 = ...
    Table t2 = t1.cache()
    ...
    tEnv.execute() // t1 is cached.
    Table t3 = t1.select(...) // cache will be used.
    Table t4 = t2.select(...) // cache will be used.
  • When auto caching is disabled

    Auto caching is disabled
    TableEnvironment tEnv = ...
    Table t1 = ...
    Table t2 = t1.cache()
    ...
    tEnv.execute() // t1 is cached.
    Table t3 = t1.select(...) // cache will NOT be used. <----- difference
    Table t4 = t2.select(...) // cache will be used.

The cached tables are available to the user application using the same TableEnvironment.

The cached intermediate results consume some resources and need to be released eventually. The cached results will be released in two cases:

  1. When the TableEnvironment is closed, the resources consumed by the cached tables are also released. This usually happens when the user application exits.

  2. Sometimes users may want to release the resources used by a cached table before the application exits. In that case, they can call invalidateCache() on the table, which immediately releases the resources used to cache that table.

Implementation Details

To make the feature available out of the box, a default file system based cache service will be provided. The architecture is illustrated below.

Each cached table consists of two pieces of information:

  • Table metadata - name, location, etc.
  • Table contents - the actual contents of the table

The default table service stores the metadata in the client (e.g. the TableEnvironment) and saves the actual contents in the cache service instances running inside the Task Managers.
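
As an illustration only, the metadata kept in the client for each cached table might look roughly like the following sketch; the class and field names are hypothetical and not part of this proposal:

Cached table metadata (hypothetical sketch)
class CachedTableMetadata {
  String tableId;                  // identifier of the cached table known to the TableEnvironment
  List<String> contentLocations;   // where the cache service instances in the Task Managers store the contents
  boolean invalidated;             // whether invalidateCache() has been called on the table
}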

To achieve auto caching, the cache service needs to be integrated with the shuffle service. This is a natural move, since the external shuffle service and the cache service inherently share a lot of similarity.


Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users? 
  • If we are changing behavior how will we phase out the older behavior? 
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Test Plan

Describe in few sentences how the FLIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
