...

Interactive Programming in Batch Processing (Java)
Table a = ...
Table b = a.select(...).filter(...)
int max = b.max()
int min = b.min() // recompute table b from table a
Table c = b.select(('f1 - min) / (max - min)).filter(...)
c.print() // recompute table b from table a
...
if (b.count() > 10) { // recompute table b from table a
  b.select(UDF1(...)).print() // recompute table b from table a
} else {
  b.select(UDF2(...)).print()
}
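The recomputation pattern above can be illustrated with a small, self-contained model. This is only a sketch: LazyTableSketch is a hypothetical stand-in for a lazily evaluated table, not the Flink Table API, and its cache() method merely mimics the effect that explicit caching is meant to have.

```java
import java.util.Arrays;
import java.util.function.Supplier;

/** Hypothetical stand-in for a lazily evaluated table; this is not the Flink Table API. */
class LazyTableSketch {
    private final Supplier<int[]> plan; // how to (re)compute the rows from the parent table
    private boolean cacheRequested;     // set by cache()
    private int[] cachedRows;           // materialized rows, null until the first evaluation after cache()
    int evaluations;                    // how many times the plan actually ran

    LazyTableSketch(Supplier<int[]> plan) {
        this.plan = plan;
    }

    /** Asks for the result of the next evaluation to be kept and reused. */
    LazyTableSketch cache() {
        cacheRequested = true;
        return this;
    }

    private int[] rows() {
        if (cachedRows != null) {
            return cachedRows; // served from the cache, no recomputation
        }
        evaluations++;
        int[] rows = plan.get();
        if (cacheRequested) {
            cachedRows = rows;
        }
        return rows;
    }

    int max() { return Arrays.stream(rows()).max().getAsInt(); }
    int min() { return Arrays.stream(rows()).min().getAsInt(); }
    long count() { return rows().length; }

    public static void main(String[] args) {
        LazyTableSketch uncached = new LazyTableSketch(() -> new int[] {3, 9, 4});
        uncached.max(); uncached.min(); uncached.count();
        LazyTableSketch cached = new LazyTableSketch(() -> new int[] {3, 9, 4}).cache();
        cached.max(); cached.min(); cached.count();
        System.out.println(uncached.evaluations + " vs " + cached.evaluations);
    }
}
```

In this model the uncached table runs its plan once per action (three times), while the cached table runs it once and serves the later actions from the stored rows.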

...

Although the implementation details are transparent to users, some related changes to Flink internals are needed to make the default implementation work.

The architecture is illustrated below:

[Image: architecture diagram]

Each cached table consists of two pieces of information:

...

The default table service stores the metadata in the client (e.g. TableEnvironment --> CacheServiceManager) and saves the actual contents in the cache service instances running inside the Task Managers, more specifically in the network stack, which is also used by the shuffle service.

The end-to-end process is as follows:

Step 1: Start Cache Service

...

Execute JOB_1

...

  1. The CacheServiceManager opens a ServerSocket and listens on that port.
  2. The CacheServiceManager submits a Cache Service job.
    1. The Cache Service job contains only source nodes, which run the Cache Server code.
    2. The parallelism of the Cache Service job equals the number of TMs, so that each TM has one Cache Service instance running in it.
  3. Each Cache Service instance in a TM opens a server socket and registers its HOST:PORT information back to the port opened by the CacheServiceManager in step 1.
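The registration handshake in the steps above can be sketched with plain sockets. This is a minimal illustration under stated assumptions: the class and method names are hypothetical, the wire format (one "HOST:PORT" text line per instance) is invented for the example, and everything runs in a single process on the loopback interface instead of inside Task Managers.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the registration handshake; names and wire format are illustrative. */
class CacheRegistrationSketch {

    /** Instance side: open an own server socket, then report its HOST:PORT to the manager. */
    static ServerSocket startInstance(InetAddress managerHost, int managerPort) throws IOException {
        ServerSocket own = new ServerSocket(0, 50, InetAddress.getLoopbackAddress());
        try (Socket toManager = new Socket(managerHost, managerPort);
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(toManager.getOutputStream(), StandardCharsets.UTF_8), true)) {
            out.println(own.getInetAddress().getHostAddress() + ":" + own.getLocalPort());
        }
        return own;
    }

    /** Manager side: accept one connection per instance and read one HOST:PORT line from each. */
    static List<String> collectRegistrations(ServerSocket manager, int expected) throws IOException {
        List<String> endpoints = new ArrayList<>();
        while (endpoints.size() < expected) {
            try (Socket fromInstance = manager.accept();
                 BufferedReader in = new BufferedReader(
                         new InputStreamReader(fromInstance.getInputStream(), StandardCharsets.UTF_8))) {
                endpoints.add(in.readLine());
            }
        }
        return endpoints;
    }

    /**
     * Runs the manager and the instances in one thread. The instances connect first and
     * wait in the accept backlog (50), so this demo assumes at most 50 instances.
     */
    static List<String> demo(int instances) {
        List<ServerSocket> started = new ArrayList<>();
        try (ServerSocket manager = new ServerSocket(0, 50, InetAddress.getLoopbackAddress())) {
            for (int i = 0; i < instances; i++) {
                started.add(startInstance(manager.getInetAddress(), manager.getLocalPort()));
            }
            return collectRegistrations(manager, instances);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            for (ServerSocket s : started) {
                try { s.close(); } catch (IOException ignored) { }
            }
        }
    }
}
```

Calling CacheRegistrationSketch.demo(3) returns three distinct endpoint strings, one per simulated instance. A real deployment would keep the instance sockets open to serve reads and writes; the demo closes them once registration has been observed.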

...

(write cached tables)

  1. Users call table.cache(), and the client
    1. adds a Sink to the cached node in the DAG. The default IntermediateResultStorage creates a BlockingShuffleSink.
    2. computes an IntermediateResultId based on the RelNode DIGEST (DIGEST1 in this case).
    3. passes the IntermediateResultId computed in the previous step all the way from the RelNode down to the Operators in the JobVertex.
    4. sets the IntermediateDataSetId (currently random) to the IntermediateResultId.
  2. The JobGraphGenerator recognizes the BlockingShuffleSink, removes the Sink and sets the result partition type to BLOCKING_PERSISTENT.
  3. The client submits the job.
  4. The JobMaster executes the job as usual. After the job finishes, the ShuffleMaster / ShuffleService keeps the BLOCKING_PERSISTENT result partitions instead of deleting them.
  5. After the job finishes, the JobMaster reports the location of the persisted ResultPartitions to the JobManager, which then returns the mapping of [IntermediateDataSetID -> Locations] to the client as part of the JobExecutionResult.
    1. The client maintains a mapping of DIGEST -> (IntermediateDataSetID, [Locations]).
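The client-side bookkeeping described in the last step can be sketched as a small catalog keyed by digest. This is a hypothetical illustration, not an existing Flink class: the type names are invented, locations are modelled as plain strings, and deriving the id from the digest with a name-based UUID is an assumption of the sketch.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/** Hypothetical client-side catalog of cached results; not an existing Flink class. */
class CachedResultCatalogSketch {

    /** One cached result: its id and the locations of its persisted partitions. */
    static final class Entry {
        final UUID intermediateDataSetId;
        final List<String> locations;

        Entry(UUID intermediateDataSetId, List<String> locations) {
            this.intermediateDataSetId = intermediateDataSetId;
            this.locations = Collections.unmodifiableList(new ArrayList<>(locations));
        }
    }

    private final Map<String, Entry> byDigest = new HashMap<>();

    /** Derives a deterministic id from the digest (a name-based UUID is an assumption here). */
    static UUID idFor(String digest) {
        return UUID.nameUUIDFromBytes(digest.getBytes(StandardCharsets.UTF_8));
    }

    /** Called once the job result reports where the persisted partitions live. */
    void register(String digest, List<String> locations) {
        byDigest.put(digest, new Entry(idFor(digest), locations));
    }

    /** Returns the entry for the digest, or null if nothing is cached for it. */
    Entry lookup(String digest) {
        return byDigest.get(digest);
    }

    /** Drops the local metadata and returns the locations whose partitions still need deleting. */
    List<String> invalidate(String digest) {
        Entry removed = byDigest.remove(digest);
        return removed == null ? Collections.<String>emptyList() : removed.locations;
    }
}
```

Because the id is derived from the digest, a later job that produces a node with the same digest computes the same id without any coordination.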

Step 2: Execute JOB_1 (write cached tables)

  1. The TableEnvironment adds a CacheServiceSink to each cached node (a node with a corresponding Cached Table) in the DAG of JOB_1.
    1. The CacheServiceSink acts as a normal sink, as if user called writeToSink(CacheServiceSink).
    2. Each cached node will be assigned an intermediate result UUID. The UUID is used to tag the intermediate result when the CacheServiceSink writes it to the Cache Service instances.
    3. Since there will be multiple Cache Service instances, the intermediate result will be partitioned and stored in a distributed manner.
  2. The TableEnvironment submits the modified DAG to the Flink cluster.
  3. After JOB_1 finishes, the CacheServiceManager stores the [LogicalNode → intermediate result UUID] mapping in memory.

Step 3: Execute JOB_2 (read cached tables)

  1. Later on, if JOB_2 is submitted and the DAG contains a node that has already been cached (a mapping to intermediate result UUID is found), TableEnvironment replaces the node and the connected ancestor nodes with a CacheServiceSource.
    1. The CacheServiceSource has the [HOST:PORT] pairs of all the Cache Service instances.
    2. The CacheServiceSource will be reading from the Cache Service instances using the UUID corresponding to the cached intermediate results.
  2. When invalidateCache() is invoked, the CacheServiceManager directly makes an RPC call to each Cache Service instance to delete the corresponding intermediate result. It also removes the LogicalNode to intermediate result UUID mapping.
  3. When the client submits another job whose DAG contains a node of DIGEST1, the client
    1. looks up DIGEST1 in the available intermediate results.
    2. creates a Source node from the IntermediateResultStorage with the location information. The default IntermediateResultStorage creates a BlockingShuffleSource.
    3. replaces the node with DIGEST1 and its subtree with the source node created in the previous step.
  4. The JobGraphGenerator sees a BlockingShuffleSource node, replaces it with an ordinary JobVertex, sets isCacheVertex=true and adds an input edge reading from the intermediate result of IRID_1.
  5. The client submits the job.
  6. The JobMaster does the following if JobVertex.isCacheVertex() returns true:
    1. This assumes the Scheduler understands the result partition location.
    2. creates an InputGateDeploymentDescriptor (or a ShuffleDeploymentDescriptor after ShuffleMaster is available).
    3. assigns the result partitions to each subtask based on locality.
  7. Task managers will run the given tasks as usual.
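The substitution of a cached node and its subtree by a source node can be sketched on a toy plan tree. The Node type below is hypothetical and far simpler than Flink's RelNode; it only records a digest, its inputs and whether the node reads from the cache.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;

/** Hypothetical plan tree used only to illustrate the substitution; not Flink's RelNode. */
class PlanRewriteSketch {

    static final class Node {
        final String digest;
        final boolean readsFromCache;
        final List<Node> inputs;

        Node(String digest, boolean readsFromCache, Node... inputs) {
            this.digest = digest;
            this.readsFromCache = readsFromCache;
            this.inputs = Collections.unmodifiableList(Arrays.asList(inputs));
        }
    }

    /** Returns a copy of the plan in which every cached node, with its subtree, becomes a leaf source. */
    static Node rewrite(Node node, Set<String> cachedDigests) {
        if (cachedDigests.contains(node.digest)) {
            // The whole subtree below the cached node is dropped: its result is read back instead.
            return new Node(node.digest, true);
        }
        List<Node> rewritten = new ArrayList<>();
        for (Node input : node.inputs) {
            rewritten.add(rewrite(input, cachedDigests));
        }
        return new Node(node.digest, false, rewritten.toArray(new Node[0]));
    }
}
```

For a chain a -> DIGEST1 -> c with DIGEST1 cached, rewriting from c yields c with a single input that reads from the cache and has no inputs of its own, so a is never recomputed.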

Step 4: Clean up

  1. When the application exits, the TableEnvironment will be closed, all the Task Managers will exit and the intermediate results will be released.
  2. The CacheServiceManager will be closed when the TableEnvironment is closed. It will stop the Cache Service job.

  1. Users invoke Table.invalidateCache()
    1. Clients remove the intermediate result entry from local metadata.
  2. Clients send an RPC to each TM to delete the corresponding result partition.

All the cached tables will be released when the Cache Service job stops.

If a Cache Service instance or its Task Manager fails, it will be brought up again, since the Cache Service is a Flink job. However, all the intermediate results which have a partition on the failed subtask or TM will become unavailable.

In this case, the consuming job (the one using the CacheServiceSource / CacheServiceSink) will throw an exception and fail. As a result, the CacheServiceManager invalidates the caches that are impacted, and the TableEnvironment resubmits the original DAG without using the cache. Note that because no cache is available, the TableEnvironment (planner) will again add a Sink to the nodes that were initially cached, so the cache will be recreated after the execution of the original DAG.
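The fallback described above amounts to a try, invalidate, resubmit sequence. The following is a minimal sketch of that control flow only; CacheUnavailableException, Job and runWithFallback are hypothetical names, and the actual exception type and resubmission mechanics are not specified here.

```java
/** Hypothetical sketch of the fallback path; the names are illustrative, not Flink APIs. */
class CacheFallbackSketch {

    /** Signals that a cached intermediate result could not be read or written. */
    static final class CacheUnavailableException extends RuntimeException {
        CacheUnavailableException(String message) {
            super(message);
        }
    }

    /** A job that can be planned either against the cache or from the original DAG. */
    interface Job<R> {
        R run(boolean useCache);
    }

    /** Tries the cache-based plan first and falls back to the original DAG if the cache is lost. */
    static <R> R runWithFallback(Job<R> job, Runnable invalidateAffectedCaches) {
        try {
            return job.run(true);
        } catch (CacheUnavailableException e) {
            invalidateAffectedCaches.run(); // drop the metadata of the lost results
            return job.run(false);          // resubmit the original DAG; its sinks recreate the cache
        }
    }
}
```

A single retry is shown for simplicity; if the run without the cache also fails, the exception propagates to the caller as it would for any other job failure.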

...

When users explicitly cache a table, the DAG is implicitly changed, because the optimizer might have changed the cached node had it not been explicitly cached. As of now, when a node has more than one downstream node in the DAG, that node will be computed multiple times. This could be solved by introducing partial optimization for subgraphs, a feature available in Blink but not yet in Flink. This is an existing problem in Flink, and this FLIP does not attempt to address it.

We propose to run the cache service instances using a Flink Job. Doing this has a few benefits:

  1. Leaving the cache service to user space decouples it from the Flink runtime.
  2. A Flink job is a powerful primitive in Flink. This means the cache service can reuse many existing mechanisms:
    1. resource specifications
    2. monitoring framework
    3. lifecycle management
    4. state management
    5. fault tolerance 

Integration with external shuffle service (FLIP-31)

...


                | Lifecycle    | Access Pattern | Persistency | Mutability | Partitioned | Require SubPartitions
Shuffle Service | cross-stages | Scan           | volatile    | immutable  | Yes         | Yes
Cache Service   | cross-jobs   | Scan           | volatile    | immutable  | Yes         | No

With a few extensions, the default cache service could also serve as the default external shuffle service. The architecture would look something like the following:

[Image: architecture diagram]

...

immutable

Yes

No


Future work

...

Some API changes will be needed to support customized cache services. We will start another FLIP to discuss them; the changes should be small. Curious readers can read the Google doc for some ideas.

Auto cache allows the BLOCKING shuffle boundaries to be persisted for later usage. It further relieves the users from thinking about when to explicitly cache in the program.

...