

Discussion thread: here (<- link to https://lists.apache.org/list.html?dev@flink.apache.org)
Vote thread: here (<- link to https://lists.apache.org/list.html?dev@flink.apache.org)
JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)
Release: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

A common UDF type is the ScalarFunction. It works well for CPU-intensive operations, but less well for IO-bound or otherwise long-running computations.
One example is a remote call to an external system, where networking, serialization, and database lookups may dominate the execution time.
A StreamTask has a single thread that serially executes its operators and the function calls they contain. In the remote call case, each of these calls blocks that thread until it returns.
If each call takes, say, 1 second or more, throughput is capped at roughly one record per second per parallel instance, and backpressure can build up in upstream operators.
There are two ways to address this: increase the parallelism of the query (at a higher resource cost and overhead), or fire off many requests asynchronously and concurrently and receive the results as they complete.
This FLIP takes the latter approach by introducing AsyncScalarFunction, a new UDF type that allows function calls to be issued concurrently.

Scope

To limit the scope for consideration, the goal is to allow AsyncScalarFunctions to be used in:

  • Projections (E.g. SELECT func(f1) FROM ...)

  • Simple Conditions (E.g. SELECT ... FROM T1 WHERE func(f1) > 10)

Out of consideration:

  • Join Conditions (E.g. SELECT ... FROM T1 INNER JOIN T2 ON func(f1) = func(f2))

  • Subquery Conditions (E.g. SELECT ... WHERE func(f1) IN (SELECT id FROM Table))

  • Aggregate Projections (E.g. SELECT f1, func(count(*)) FROM t1 GROUP BY f1)

  • Other places where non-deterministic ordering or other considerations may complicate execution or break SQL semantics.

Regarding changelog modes, the async operator will support the following incoming row kinds:

  • Insert (+I)

  • Update After (+U)

Rows of both kinds can be passed on to the UDF and executed. The other row kinds (-U and -D) won't be supported.
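The restriction above can be pictured as a simple guard on each row's kind. This is a minimal sketch, assuming a runtime check before eval is invoked; the FLIP does not say whether unsupported kinds are rejected at planning time or at runtime. The Kind enum is a local stand-in that mirrors the four kinds of Flink's RowKind so the block compiles without Flink, and the guard and its exception are hypothetical.

```java
import java.util.EnumSet;
import java.util.Set;

/** Sketch: gate incoming rows on their changelog kind before calling an async UDF. */
public class AsyncRowKindCheck {

    /** Hypothetical local stand-in mirroring the four changelog kinds of Flink's RowKind. */
    enum Kind {
        INSERT("+I"), UPDATE_BEFORE("-U"), UPDATE_AFTER("+U"), DELETE("-D");

        final String shortString;

        Kind(String shortString) {
            this.shortString = shortString;
        }
    }

    /** Only +I and +U rows are passed on to the async function. */
    static final Set<Kind> SUPPORTED = EnumSet.of(Kind.INSERT, Kind.UPDATE_AFTER);

    static boolean isSupported(Kind kind) {
        return SUPPORTED.contains(kind);
    }

    /** Hypothetical guard; the exception type is illustrative only. */
    static void checkSupported(Kind kind) {
        if (!isSupported(kind)) {
            throw new UnsupportedOperationException(
                    "Row kind " + kind.shortString + " is not supported by async scalar functions");
        }
    }
}
```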



Public Interfaces

The primary public class is AsyncScalarFunction, the base class of all async scalar functions. It is parameterized with the return type of the eval call, similar to the definition of AsyncTableFunction.

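import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.FunctionKind;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.types.extraction.TypeInferenceExtractor;
import org.apache.flink.table.types.inference.TypeInference;

// Abstract: concrete subclasses supply the eval methods.
public abstract class AsyncScalarFunction<T> extends UserDefinedFunction {
    @Override
    public final FunctionKind getKind() {
        return FunctionKind.ASYNC_SCALAR;
    }

    @Override
    public TypeInference getTypeInference(DataTypeFactory typeFactory) {
        // Extract argument and result types from the subclass, as is done for other UDF kinds.
        return TypeInferenceExtractor.forAsyncScalarFunction(typeFactory, getClass());
    }
}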

An example implementing class could be the following:

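import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.table.functions.FunctionContext;

public class RemoteCallFunction extends AsyncScalarFunction<String> {

    // ExternalClient stands for the client library of the remote system.
    private ExternalClient client;
    private ExecutorService executor;

    public RemoteCallFunction() {
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        client = new ExternalClient();
        // getJobParameter returns a String, so the pool size is parsed explicitly.
        executor = Executors.newFixedThreadPool(
            Integer.parseInt(context.getJobParameter("in-flight-requests", "10")));
    }

    @Override
    public void close() throws Exception {
        client.close();
        executor.shutdownNow();
    }

    // The future is the 0th parameter; the remaining parameters are the SQL arguments.
    public final void eval(
            CompletableFuture<String> future,
            String param1,
            int param2) {
        // Run the blocking RPC on the executor so that eval returns immediately.
        executor.submit(() -> {
            try {
                String resp = client.rpc(param1, param2);
                future.complete(resp);
            } catch (Throwable t) {
                future.completeExceptionally(t);
            }
        });
    }
}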


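As with the standard ScalarFunction, the function defines an eval method. Unlike ScalarFunction, its 0th parameter is a future for the result (here CompletableFuture<String> future), which the implementation completes, normally or exceptionally, once the result is available. This is the primary method used to invoke the async functionality.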
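To illustrate this calling convention, here is a plain-Java sketch (no Flink dependency) of how a caller might invoke a future-based eval for several rows without blocking between calls. FakeRemoteCall and callAll are hypothetical stand-ins for RemoteCallFunction and the operator; the proposed operator would additionally apply the buffer-capacity, timeout, and retry options of this FLIP, which are not modeled here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Sketch: driving a future-based eval method for many inputs at once. */
public class AsyncEvalDriver {

    /** Hypothetical stand-in for RemoteCallFunction; the "remote call" just sleeps briefly. */
    static class FakeRemoteCall {
        private final ExecutorService executor = Executors.newFixedThreadPool(4);

        public void eval(CompletableFuture<String> future, String param1, int param2) {
            executor.submit(() -> {
                try {
                    Thread.sleep(50); // simulated network latency
                    future.complete(param1 + ":" + param2);
                } catch (Throwable t) {
                    future.completeExceptionally(t);
                }
            });
        }

        public void close() {
            executor.shutdownNow();
        }
    }

    /** Issues one eval per input without waiting, then joins all results in input order. */
    static List<String> callAll(FakeRemoteCall fn, List<String> inputs) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (int i = 0; i < inputs.size(); i++) {
            CompletableFuture<String> future = new CompletableFuture<>();
            fn.eval(future, inputs.get(i), i); // returns immediately; work runs on the executor
            futures.add(future);
        }
        List<String> results = new ArrayList<>();
        for (CompletableFuture<String> f : futures) {
            results.add(f.join());
        }
        return results;
    }
}
```

With four worker threads and a simulated 50 ms latency, the four calls in a batch of four overlap instead of running back to back, which is the throughput effect described in the Motivation section.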

New configuration options will be introduced for this functionality, similar in nature to table.exec.async-lookup.*:

table.exec.async-scalar.catalog.db.func-name.buffer-capacity: 10
table.exec.async-scalar.catalog.db.func-name.timeout: 30s
table.exec.async-scalar.catalog.db.func-name.output-mode: ORDERED
table.exec.async-scalar.catalog.db.func-name.retry-strategy: FIXED_DELAY
table.exec.async-scalar.catalog.db.func-name.fixed-delay: 10s
table.exec.async-scalar.catalog.db.func-name.max-attempts: 3
table.exec.async-scalar.system.func-name.buffer-capacity: 10
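A small helper makes the key layout of these examples concrete: catalog.db.func-name for a function registered in a catalog, and system.func-name for a system function. The class and method names are hypothetical; only the key format comes from the examples above.

```java
/** Hypothetical helper that builds the scoped option keys shown in the examples. */
public class AsyncScalarOptionKeys {

    static final String PREFIX = "table.exec.async-scalar.";

    /** Key for a function registered as catalog.db.funcName. */
    static String functionKey(String catalog, String db, String funcName, String option) {
        return PREFIX + catalog + "." + db + "." + funcName + "." + option;
    }

    /** Key for a system function, which uses the special "system" scope instead of catalog.db. */
    static String systemKey(String funcName, String option) {
        return PREFIX + "system." + funcName + "." + option;
    }
}
```

The resulting strings are ordinary configuration keys, so they could be set like any other table option, for example through TableConfig#set(String, String).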


These options are scoped by the catalog, database, and function name under which the function is registered in the table environment, so that each definition can be configured individually. A system function is configured with the special scope system instead, as in the last example.
The options have the following meanings:

Each option name below is prefixed with table.exec.async-scalar.catalog.db.func-name (or table.exec.async-scalar.system.func-name for a system function).

  • buffer-capacity: The number of outstanding requests the operator allows at once.

  • timeout: The time that can pass before a restart strategy is triggered.

  • output-mode: ORDERED emits results in input order. The more performant unordered option is only used where the planner deems it possible.

  • retry-strategy: The retry strategy to use. FIXED_DELAY retries after a fixed amount of time.

  • fixed-delay: The time to wait between retries for the FIXED_DELAY strategy.

  • max-attempts: The maximum number of attempts while retrying.
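The FIXED_DELAY semantics can be sketched with plain CompletableFuture chaining (Java 9+, for delayedExecutor and failedFuture). FixedDelayRetry is hypothetical and not the planned operator code. Because the FLIP does not say whether max-attempts counts the first call, this sketch assumes it does, so max-attempts: 3 means at most three calls in total.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical sketch of FIXED_DELAY retry semantics on top of CompletableFuture. */
public class FixedDelayRetry {

    /**
     * Runs attempt until it succeeds or maxAttempts calls have failed, waiting fixedDelayMs
     * between attempts. Assumes maxAttempts counts the first call.
     */
    static <T> CompletableFuture<T> retry(
            Supplier<CompletableFuture<T>> attempt, int maxAttempts, long fixedDelayMs) {
        CompletableFuture<T> result = new CompletableFuture<>();
        runAttempt(attempt, 1, maxAttempts, fixedDelayMs, result);
        return result;
    }

    private static <T> void runAttempt(
            Supplier<CompletableFuture<T>> attempt,
            int attemptNo,
            int maxAttempts,
            long fixedDelayMs,
            CompletableFuture<T> result) {
        CompletableFuture<T> current;
        try {
            current = attempt.get();
        } catch (Throwable t) {
            current = CompletableFuture.failedFuture(t); // a synchronous throw counts as a failed attempt
        }
        current.whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (attemptNo >= maxAttempts) {
                result.completeExceptionally(error); // out of attempts: surface the last failure
            } else {
                // Schedule the next attempt after the fixed delay without blocking a thread.
                Executor delayed =
                        CompletableFuture.delayedExecutor(fixedDelayMs, TimeUnit.MILLISECONDS);
                delayed.execute(
                        () -> runAttempt(attempt, attemptNo + 1, maxAttempts, fixedDelayMs, result));
            }
        });
    }
}
```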


Proposed Changes

The Python calc rules served as one of the main inspirations for the planner changes. Most of the split rules (rules that split a complex calc node into multiple simpler calc nodes) will be generalized and shared between the two, since remote Python calls and async calls share much of the same structure. Done correctly, this will simplify the async operator so that it only has to handle FlinkLogicalCalcs that contain async UDF calls in projections and no other calc logic (non-async calls, field accesses, conditions). The high-level motivation is that anything that comes after an async call is easier to chain as a series of operators than to handle internally within a single operator.

Specifically, PythonCalcSplitRuleBase will be generalized into RemoteCalcSplitRuleBase. It will be parameterized with a RemoteCalcCallFinder, which analyzes RexNodes to look for Python or async calls.

import org.apache.calcite.rex.RexNode;

public interface RemoteCalcCallFinder {
    // This RexNode contains either directly or indirectly a remote call
    // of the specified type.
    boolean containsRemoteCall(RexNode node);
    // This RexNode contains either directly or indirectly a call which is not
    // the specified remote type.
    boolean containsNonRemoteCall(RexNode node);
    // This RexNode is a remote call of the specified type.
    boolean isRemoteCall(RexNode node);
    // This RexNode is a call that is not the specified type.
    boolean isNonRemoteCall(RexNode node);
}
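To make the four predicates concrete, here is a sketch of what an async call finder could compute, written over a toy expression tree instead of Calcite's RexNode so that it runs standalone. Expr, FieldRef, Call, and the async flag are hypothetical; a real implementation would inspect RexCall operators and the function kind of the called UDF.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of the call-finder predicates over a toy expression tree. */
public class ToyAsyncCallFinder {

    /** Stand-in for RexNode: either a field reference or a call with operands. */
    interface Expr {}

    static final class FieldRef implements Expr {
        final int index;

        FieldRef(int index) {
            this.index = index;
        }
    }

    static final class Call implements Expr {
        final String function;
        final boolean async; // in Flink this would be derived from the function kind
        final List<Expr> operands;

        Call(String function, boolean async, Expr... operands) {
            this.function = function;
            this.async = async;
            this.operands = Arrays.asList(operands);
        }
    }

    /** The node itself is an async call. */
    static boolean isRemoteCall(Expr e) {
        return e instanceof Call && ((Call) e).async;
    }

    /** The node itself is a call that is not async. */
    static boolean isNonRemoteCall(Expr e) {
        return e instanceof Call && !((Call) e).async;
    }

    /** The node is, or contains at any depth, an async call. */
    static boolean containsRemoteCall(Expr e) {
        if (isRemoteCall(e)) {
            return true;
        }
        return e instanceof Call
                && ((Call) e).operands.stream().anyMatch(ToyAsyncCallFinder::containsRemoteCall);
    }

    /** The node is, or contains at any depth, a non-async call. */
    static boolean containsNonRemoteCall(Expr e) {
        if (isNonRemoteCall(e)) {
            return true;
        }
        return e instanceof Call
                && ((Call) e).operands.stream().anyMatch(ToyAsyncCallFinder::containsNonRemoteCall);
    }
}
```

For an expression such as upper(func(f1)), where only func is async, both containsRemoteCall and containsNonRemoteCall hold. That mixed case is presumably what a split rule would break apart, so that func(f1) ends up alone in its own calc and upper is applied in a downstream one.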


The RemoteCalcCallFinder interface allows for separate PythonCalcCallFinder and AsyncCalcCallFinder implementations.
The rules we intend to adopt split a FlinkLogicalCalc into two (or, ultimately, more) FlinkLogicalCalcs that feed into one another. The async split rules shared with Python will be:



Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Test Plan

Describe in a few sentences how the FLIP will be tested. We are mostly interested in system tests (since unit tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
