Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, async table functions are special functions that a table source uses to perform async lookups. However, it is also worth supporting user-defined async table functions. With this support, SQL users can run async operations directly, which helps maximize system throughput, especially in IO-bound cases.

Take a job that performs RPC calls (IO intensive) as an example.

Perform async operation with lookup join 

Let's see how we can perform an async RPC call with lookup join:

(1) Implement an AsyncTableFunction with RPC call logic. 

(2) Implement a `LookupTableSource` connector that runs the async UDTF defined in (1).

(3) Define the DDL of this lookup table in SQL:

CREATE TEMPORARY TABLE Customers (
  id INT,
  name STRING,
  country STRING,
  zip STRING
) WITH (
  'connector' = 'custom'
);

(4) Run the query below:

SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;

This example is taken from the documentation. You can think of the lookup process as an async RPC call.

Perform async operation by join table function

Let's see how we can perform an async RPC call by join table function:

(1) Implement an AsyncTableFunction with RPC call logic.  

(2) Run the following statements:

CREATE FUNCTION f1 AS '...' ;

SELECT o.order_id, o.total, c.country, c.zip FROM Orders AS o, LATERAL TABLE (f1(o.order_id)) AS c(...);


As you can see, the version that joins with a table function is simpler and more intuitive. Users do not have to wrap the async table function in a LookupTableSource, and they are not forced to join on an equality condition. In addition, user-defined async table functions allow complex parameters (e.g., of Row type) to be passed to the function instead of going through 'join … on ...'.

Public Interfaces

AsyncTableFunction

AsyncTableFunction already exists, as shown below. It will be the extension point for users to implement custom async table functions.

@PublicEvolving
public abstract class AsyncTableFunction<T> extends UserDefinedFunction {

    @Override
    public final FunctionKind getKind() {
        return FunctionKind.ASYNC_TABLE;
    }

    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public TypeInference getTypeInference(DataTypeFactory typeFactory) {
        return TypeInferenceExtractor.forAsyncTableFunction(typeFactory, (Class) getClass());
    }
}

An example async table function:

    public static class JavaAsyncTableFunc0 extends AsyncTableFunction<Row> {

        private ExecutorService executor;

        @Override
        public void open(FunctionContext context) throws Exception {
            super.open(context);
            this.executor = Executors.newSingleThreadExecutor();
        }

        @DataTypeHint(value = "ROW<c0 VARCHAR, c1 VARCHAR>")
        public void eval(CompletableFuture<Collection<Row>> resultFuture, String input) {
            CompletableFuture.supplyAsync(
                            () -> {
                                // Inputs without a ';' separator produce no rows.
                                if (!input.contains(";")) {
                                    return Collections.<Row>emptyList();
                                }
                                // Pair up consecutive tokens; an unpaired last token gets a null c1.
                                String[] splits = input.split(";");
                                List<Row> res = new ArrayList<>();
                                for (int i = 0; i < splits.length; i += 2) {
                                    String second = i + 1 < splits.length ? splits[i + 1] : null;
                                    res.add(Row.of(splits[i], second));
                                }
                                return res;
                            },
                            executor)
                    .whenComplete(
                            (rows, error) -> {
                                // Propagate failures so that resultFuture never hangs.
                                if (error != null) {
                                    resultFuture.completeExceptionally(error);
                                } else {
                                    resultFuture.complete(rows);
                                }
                            });
        }

        @Override
        public void close() throws Exception {
            super.close();
            if (executor != null) {
                executor.shutdown();
            }
        }
    }
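
To make the behaviour of the example concrete, the sketch below reproduces the pairing rule of eval in plain Java, without any Flink dependency. The class name SplitPairsSketch, the extra executor parameter, and the use of a two-element List<String> in place of Row are assumptions made only for illustration; they are not part of the proposal.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical, Flink-free stand-in for JavaAsyncTableFunc0: a two-element
// List<String> plays the role of a Row with fields (c0, c1).
final class SplitPairsSketch {

    // Same pairing rule as the eval method above: consecutive tokens form
    // one row, and a trailing unpaired token gets null as its second field.
    static List<List<String>> splitPairs(String input) {
        if (!input.contains(";")) {
            return Collections.emptyList();
        }
        String[] splits = input.split(";");
        List<List<String>> rows = new ArrayList<>();
        for (int i = 0; i < splits.length; i += 2) {
            String second = (i + 1 < splits.length) ? splits[i + 1] : null;
            rows.add(Arrays.asList(splits[i], second));
        }
        return rows;
    }

    // Mirrors the eval contract: the caller hands in a future, and the
    // function completes it from another thread with rows or with an error.
    static void eval(
            CompletableFuture<Collection<List<String>>> resultFuture,
            String input,
            ExecutorService executor) {
        CompletableFuture.supplyAsync(() -> splitPairs(input), executor)
                .whenComplete((rows, error) -> {
                    if (error != null) {
                        resultFuture.completeExceptionally(error);
                    } else {
                        resultFuture.complete(rows);
                    }
                });
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<Collection<List<String>>> future = new CompletableFuture<>();
            eval(future, "a;b;c", executor);
            System.out.println(future.join()); // prints [[a, b], [c, null]]
        } finally {
            executor.shutdown();
        }
    }
}
```

For the input "a;b;c" the function therefore yields two rows, (a, b) and (c, null), while an input without any ';' yields no rows at all.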


The SQL syntax for the async correlate is the same as the existing LATERAL TABLE syntax:

-- inner join
SELECT a, c1, c2
FROM T1, LATERAL TABLE (async_split(b)) AS T(c1, c2)

-- left join
SELECT a, c1, c2
FROM T1
LEFT JOIN LATERAL TABLE (async_split(b)) AS T(c1, c2) ON TRUE

ConfigOption

Hint

Similar to the hint options of the lookup join, the async join with a table function will also support hint options to configure the buffer capacity, timeout, and output order of the async operator (see the rejected alternatives at the end).

option name | required | value type | default value | description
function | N | string | null | If specified, the options in the hint are applied to the target function only, and other functions use the default. If not set, all functions in a single SELECT clause use the options in the hint as their default.
output-mode | N | string | ordered | Either 'ordered' or 'allow_unordered'. With 'allow_unordered', the planner attempts to use AsyncDataStream.OutputMode.UNORDERED when this does not affect the correctness of the result; otherwise ORDERED is still used.
capacity | N | integer | 100 | The buffer capacity of the AsyncWaitOperator that backs the async operator.
timeout | N | duration | 300s | Timeout from the first invocation to the final completion of the asynchronous operation; it is reset in case of failover.

ASYNC_TABLE_FUNC('output-mode' = 'ordered', 'capacity' = '200', 'timeout' = '180s')


SELECT /*+ ASYNC_TABLE_FUNC('output-mode' = 'ordered', 'capacity' = '200', 'timeout' = '180s') */ a, c1, c2
FROM T1
LEFT JOIN LATERAL TABLE (async_split(b)) AS T(c1, c2) ON true

SELECT /*+ ASYNC_TABLE_FUNC('output-mode' = 'ordered', 'capacity' = '200', 'timeout' = '180s'), ASYNC_TABLE_FUNC('function' = 'async_split2', 'output-mode' = 'allow_unordered', 'capacity' = '100', 'timeout' = '60s') */ a, c1, c2
FROM T1
LEFT JOIN LATERAL TABLE (async_split(b)) AS T(c1, c2) ON true
JOIN LATERAL TABLE(async_split2(b)) AS T2(c3, c4) ON TRUE

In this query, async_split runs with the options 'output-mode' = 'ordered', 'capacity' = '200', 'timeout' = '180s', and async_split2 runs with the options 'output-mode' = 'allow_unordered', 'capacity' = '100', 'timeout' = '60s'. The function option distinguishes the different table function calls in the SELECT clause. A hint that omits the function option acts as the default for all functions.
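
The resolution rule described above can be sketched in plain Java as follows. The class AsyncHintResolverSketch and its methods are hypothetical and are not planner code. The sketch also assumes that a function-specific hint takes precedence over a hint without the function option, and that options missing from the chosen hint fall back to the defaults listed in the option table; the proposal does not spell out either point.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical resolver illustrating the hint semantics; not planner code.
final class AsyncHintResolverSketch {

    // Built-in defaults, taken from the option table.
    static Map<String, String> builtInDefaults() {
        Map<String, String> defaults = new LinkedHashMap<>();
        defaults.put("output-mode", "ordered");
        defaults.put("capacity", "100");
        defaults.put("timeout", "300s");
        return defaults;
    }

    // Builds one hint from alternating keys and values.
    static Map<String, String> hint(String... keyValues) {
        Map<String, String> options = new LinkedHashMap<>();
        for (int i = 0; i + 1 < keyValues.length; i += 2) {
            options.put(keyValues[i], keyValues[i + 1]);
        }
        return options;
    }

    // Picks the hint that targets functionName, else the hint without a
    // 'function' option (assumed precedence), and overlays it on the defaults.
    static Map<String, String> resolve(List<Map<String, String>> hints, String functionName) {
        Map<String, String> global = null;
        Map<String, String> specific = null;
        for (Map<String, String> hint : hints) {
            String target = hint.get("function");
            if (target == null) {
                global = hint;
            } else if (target.equals(functionName)) {
                specific = hint;
            }
        }
        Map<String, String> effective = builtInDefaults();
        Map<String, String> chosen = (specific != null) ? specific : global;
        if (chosen != null) {
            effective.putAll(chosen);
            effective.remove("function");
        }
        return effective;
    }

    public static void main(String[] args) {
        List<Map<String, String>> hints = Arrays.asList(
                hint("output-mode", "ordered", "capacity", "200", "timeout", "180s"),
                hint("function", "async_split2", "output-mode", "allow_unordered",
                        "capacity", "100", "timeout", "60s"));
        // prints {output-mode=ordered, capacity=200, timeout=180s}
        System.out.println(resolve(hints, "async_split"));
        // prints {output-mode=allow_unordered, capacity=100, timeout=60s}
        System.out.println(resolve(hints, "async_split2"));
    }
}
```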


Proposed Changes

As shown above, the async table function is supported through the LATERAL TABLE syntax, so the planner will also translate this statement into a CommonExecCorrelate node, and the runtime code should be generated in CorrelateCodeGenerator. In CorrelateCodeGenerator, the kind of the table function is known to be either FunctionKind.TABLE or FunctionKind.ASYNC_TABLE.

For FunctionKind.ASYNC_TABLE, we can generate an AsyncWaitOperator to execute the async table function. Most of the facilities in org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin#createAsyncLookupJoin can be reused to generate the async correlate node.
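
To illustrate what the output-mode option means for such an operator, the following single-threaded sketch contrasts the two emission orders using manually completed futures. It is a simplified model under stated assumptions, not the AsyncWaitOperator implementation; the class OutputModeSketch and its method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Simplified, single-threaded model of ordered vs. unordered emission.
final class OutputModeSketch {

    // Unordered: every result is emitted as soon as its own future completes.
    static List<String> attachUnordered(List<CompletableFuture<String>> inFlight) {
        List<String> out = new ArrayList<>();
        for (CompletableFuture<String> future : inFlight) {
            future.thenAccept(out::add);
        }
        return out;
    }

    // Ordered: futures wait in arrival order, and only a completed head of
    // the queue may be emitted, so later results are held back if necessary.
    static List<String> attachOrdered(List<CompletableFuture<String>> inFlight) {
        List<String> out = new ArrayList<>();
        Deque<CompletableFuture<String>> queue = new ArrayDeque<>(inFlight);
        Runnable drain = () -> {
            while (!queue.isEmpty() && queue.peek().isDone()) {
                out.add(queue.poll().join());
            }
        };
        for (CompletableFuture<String> future : inFlight) {
            future.thenRun(drain);
        }
        return out;
    }

    public static void main(String[] args) {
        CompletableFuture<String> first = new CompletableFuture<>();
        CompletableFuture<String> second = new CompletableFuture<>();
        CompletableFuture<String> third = new CompletableFuture<>();
        List<CompletableFuture<String>> inFlight = Arrays.asList(first, second, third);

        List<String> unordered = attachUnordered(inFlight);
        List<String> ordered = attachOrdered(inFlight);

        // The requests finish out of order: second, third, then first.
        second.complete("r2");
        third.complete("r3");
        first.complete("r1");

        System.out.println(unordered); // prints [r2, r3, r1]
        System.out.println(ordered);   // prints [r1, r2, r3]
    }
}
```

In ordered mode a slow first request delays everything behind it, which is why 'allow_unordered' can improve throughput when the result does not depend on the input order.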

Performance

sync version

[Figure: benchmark result of the sync version]

async version

[Figure: benchmark result of the async version]

In the test, the evaluation is a blocking call that takes 100 ms on average, and both the sync correlate and the async correlate run with a parallelism of 1. The async function is implemented asynchronously with a thread pool of 2 threads.

The TPS of the async version is about double that of the sync version.
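
The factor of two follows from the setup: with two pool threads, two 100 ms blocking calls can overlap, whereas the sync version handles one call at a time. The sketch below reproduces this effect outside Flink. The class CorrelateThroughputSketch is hypothetical, Thread.sleep stands in for the blocking evaluation, and the number of in-flight requests is not bounded by a capacity option, so this is only a rough model of the benchmark rather than the benchmark itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Rough model of the sync vs. async comparison, using plain JDK classes.
final class CorrelateThroughputSketch {

    // Stand-in for the blocking evaluation (for example an RPC call).
    static int blockingCall(int record, long latencyMillis) {
        try {
            Thread.sleep(latencyMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return record;
    }

    // Sync: records are processed one after another on the calling thread.
    // Returns the elapsed time in milliseconds.
    static long runSync(int records, long latencyMillis) {
        long start = System.nanoTime();
        for (int i = 0; i < records; i++) {
            blockingCall(i, latencyMillis);
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    // Async: all records are handed to a fixed thread pool, and the caller
    // only waits for the completion of all futures.
    static long runAsync(int records, long latencyMillis, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            long start = System.nanoTime();
            List<CompletableFuture<Integer>> futures = new ArrayList<>();
            for (int i = 0; i < records; i++) {
                final int record = i;
                futures.add(CompletableFuture.supplyAsync(
                        () -> blockingCall(record, latencyMillis), pool));
            }
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            return (System.nanoTime() - start) / 1_000_000;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int records = 20;
        long syncMillis = runSync(records, 100);
        long asyncMillis = runAsync(records, 100, 2);
        System.out.println("sync:  " + syncMillis + " ms for " + records + " records");
        System.out.println("async: " + asyncMillis + " ms for " + records + " records");
    }
}
```

With these parameters the sync run needs roughly 20 x 100 ms and the async run roughly half of that, although the exact numbers depend on the machine.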


Compatibility, Deprecation, and Migration Plan

  • A new kind of user-defined function is introduced, so there is no compatibility problem.

Test Plan

UT/ITCase

Rejected Alternatives

Job level config

public static final ConfigOption<Integer> TABLE_EXEC_ASYNC_CORRELATE_BUFFER_CAPACITY =
        key("table.exec.async-udtf.buffer-capacity")
                .intType()
                .defaultValue(100)
                .withDescription(
                        "The maximum number of async I/O operations that the async lateral join can trigger.");

public static final ConfigOption<Duration> TABLE_EXEC_ASYNC_CORRELATE_TIMEOUT =
        key("table.exec.async-udtf.timeout")
                .durationType()
                .defaultValue(Duration.ofMinutes(3))
                .withDescription(
                        "The async timeout for the asynchronous operation to complete.");

public static final ConfigOption<AsyncOutputMode> TABLE_EXEC_ASYNC_CORRELATE_OUTPUT_MODE =
        key("table.exec.async-udtf.output-mode")
                .enumType(AsyncOutputMode.class)
                .defaultValue(AsyncOutputMode.ORDERED)
                .withDescription(
                        "Output mode for asynchronous operations which will convert to {@see AsyncDataStream.OutputMode}, ORDERED by default. "
                                + "If set to ALLOW_UNORDERED, will attempt to use {@see AsyncDataStream.OutputMode.UNORDERED} when it does not "
                                + "affect the correctness of the result, otherwise ORDERED will be still used.");

These config options are the counterparts of the lookup join options, e.g. table.exec.async-lookup.buffer-capacity. However, hints are more flexible and avoid introducing too many similar options, so this alternative was abandoned.


