Discussion thread | https://lists.apache.org/thread/7vk1799ryvrz4lsm5254q64ctm89mx2l |
---|---|
Vote thread | here (<- link to https://lists.apache.org/list.html?dev@flink.apache.org) |
JIRA | here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX) |
Release | <Flink Version> |
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Currently, async table functions are special functions used only by table sources to perform async lookups. However, it is worth supporting user-defined async table functions as well. With them, SQL users can perform async operations directly, which helps maximize system throughput, especially when the job is I/O-bound.
Take a job that performs RPC calls (I/O-intensive) as an example.
Perform async operation with lookup join
Let's see how we can perform an async RPC call with lookup join:
(1) Implement an AsyncTableFunction with RPC call logic.
(2) Implement a `LookupTableSource` connector that runs with the async UDTF defined in (1).
(3) Define a DDL for this lookup table in SQL:
```sql
CREATE TEMPORARY TABLE Customers (
  id INT,
  name STRING,
  country STRING,
  zip STRING
) WITH (
  'connector' = 'custom'
);
```
(4) Run the query below:
```sql
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
  ON o.customer_id = c.id;
```
This example is taken from the Flink documentation. You can think of the lookup process as an async RPC call.
Perform async operation by join table function
Let's see how we can perform an async RPC call by join table function:
(1) Implement an AsyncTableFunction with RPC call logic.
(2) Run the query:
```sql
CREATE FUNCTION f1 AS '...';

SELECT o.order_id, o.total, T.country, T.zip
FROM Orders AS o, LATERAL TABLE(f1(o.order_id)) AS T(...);
```
As you can see, the table function join is simpler and more intuitive for users. Users do not have to wrap the async table function in a LookupTableSource, and they are not forced to use an equality join condition. In addition, user-defined async table functions accept complex parameters (e.g., a Row type) directly, instead of expressing the input through 'JOIN … ON …'.
Public Interfaces
AsyncTableFunction
AsyncTableFunction already exists (shown below); users will extend it to implement custom async table functions.
```java
@PublicEvolving
public abstract class AsyncTableFunction<T> extends UserDefinedFunction {

    @Override
    public final FunctionKind getKind() {
        return FunctionKind.ASYNC_TABLE;
    }

    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public TypeInference getTypeInference(DataTypeFactory typeFactory) {
        return TypeInferenceExtractor.forAsyncTableFunction(typeFactory, (Class) getClass());
    }
}
```
An example async table function:
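```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.types.Row;

/** Splits "a;b;c" into rows (a, b) and (c, null) on a separate thread. */
public class JavaAsyncTableFunc0 extends AsyncTableFunction<Row> {

    // Created in open() on the task side, so it is never serialized.
    private transient ExecutorService executor;

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
        this.executor = Executors.newSingleThreadExecutor();
    }

    @DataTypeHint(value = "ROW<c0 VARCHAR, c1 VARCHAR>")
    public void eval(CompletableFuture<Collection<Row>> resultFuture, String input) {
        CompletableFuture.supplyAsync(() -> split(input), executor)
                // Complete the result future on failure too; otherwise a failed
                // call would only be noticed when the async timeout fires.
                .whenComplete((rows, error) -> {
                    if (error != null) {
                        resultFuture.completeExceptionally(error);
                    } else {
                        resultFuture.complete(rows);
                    }
                });
    }

    /** Pairs up ';'-separated tokens; an odd trailing token gets a null partner. */
    private static Collection<Row> split(String input) {
        if (!input.contains(";")) {
            return Collections.emptyList();
        }
        String[] splits = input.split(";");
        List<Row> res = new ArrayList<>();
        for (int i = 0; i < splits.length; i += 2) {
            String second = i + 1 < splits.length ? splits[i + 1] : null;
            res.add(Row.of(splits[i], second));
        }
        return res;
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (executor != null) {
            executor.shutdown();
        }
    }
}
```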
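The key contract of eval is that it must complete the CompletableFuture passed as its first argument exactly once, on success or on failure. The following JDK-only sketch mirrors JavaAsyncTableFunc0 so it can run without Flink on the classpath; the class name AsyncSplitSketch and the use of a two-element List<String> in place of Row are assumptions made for illustration, not Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical JDK-only mirror of JavaAsyncTableFunc0; List<String> stands in for Row. */
final class AsyncSplitSketch implements AutoCloseable {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    /** Same shape as an AsyncTableFunction eval: the result future comes first. */
    public void eval(CompletableFuture<Collection<List<String>>> resultFuture, String input) {
        CompletableFuture.supplyAsync(() -> split(input), executor)
                // Forward both outcomes so the caller's future is always completed once.
                .whenComplete((rows, error) -> {
                    if (error != null) {
                        resultFuture.completeExceptionally(error);
                    } else {
                        resultFuture.complete(rows);
                    }
                });
    }

    /** "a;b;c" -> [a, b], [c, null]; input without ';' -> no rows. */
    static Collection<List<String>> split(String input) {
        if (!input.contains(";")) {
            return Collections.emptyList();
        }
        String[] parts = input.split(";");
        List<List<String>> rows = new ArrayList<>();
        for (int i = 0; i < parts.length; i += 2) {
            String second = i + 1 < parts.length ? parts[i + 1] : null;
            rows.add(Arrays.asList(parts[i], second));
        }
        return rows;
    }

    @Override
    public void close() {
        executor.shutdown();
    }
}
```

Routing failures into completeExceptionally means a failed call (here, a null input) is reported as soon as it happens, instead of leaving the future pending until a timeout.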
The SQL syntax is the same as the existing LATERAL TABLE syntax and describes an async correlate:
```sql
-- inner join
SELECT a, c1, c2 FROM T1, LATERAL TABLE(async_split(b)) AS T(c1, c2);

-- left join
SELECT a, c1, c2 FROM T1 LEFT JOIN LATERAL TABLE(async_split(b)) AS T(c1, c2) ON TRUE;
```
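The two queries differ only in how an input row whose function call produces no output is treated. The sketch below models that with plain Java collections; LateralJoinSketch and its correlate method are hypothetical names, and the planner's real runtime is generated code, not this loop.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Hypothetical model of LATERAL TABLE join semantics; not the planner's generated code. */
final class LateralJoinSketch {
    /**
     * Each input row is (a, b); the table function is applied to b and yields (c1, c2) rows.
     * Output rows are (a, c1, c2).
     */
    static List<List<String>> correlate(List<List<String>> t1,
                                        Function<String, List<List<String>>> tableFunction,
                                        boolean leftJoin) {
        List<List<String>> out = new ArrayList<>();
        for (List<String> row : t1) {
            String a = row.get(0);
            List<List<String>> produced = tableFunction.apply(row.get(1));
            if (produced.isEmpty() && leftJoin) {
                // LEFT JOIN ... ON TRUE keeps the input row and pads c1, c2 with nulls.
                out.add(Arrays.asList(a, null, null));
            }
            // With an inner join, an input row that produces nothing emits nothing.
            for (List<String> t : produced) {
                out.add(Arrays.asList(a, t.get(0), t.get(1)));
            }
        }
        return out;
    }
}
```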
ConfigOption
Hint
Similar to the lookup join hints, the async table function join will support hint options to configure the async operator's buffer capacity, timeout, and output order (see the rejected alternatives at the end).
option name | required | value type | default value | description |
---|---|---|---|---|
function | N | string | null | If specified, the options in this hint apply only to the named function; the other functions use the default. If not set, the options in this hint become the default for all async table functions in the same SELECT clause. |
output-mode | N | string | ordered | Either 'ordered' or 'allow_unordered'. With 'allow_unordered', the planner attempts to use AsyncDataStream.OutputMode.UNORDERED when doing so does not affect the correctness of the result; otherwise ORDERED is still used. |
capacity | N | integer | 100 | The buffer capacity of the underlying AsyncWaitOperator, i.e. the maximum number of in-flight async operations. |
timeout | N | duration | 300s | Timeout from the first invocation to the final completion of the asynchronous operation; it is reset on failover. |
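A minimal sketch of how one ASYNC_TABLE_FUNC hint's key/value pairs could be validated and filled with the defaults from the option table. The class AsyncTableFuncHintOptions is hypothetical, and its duration parser only accepts the ms/s/min/h suffixes, a subset of what Flink's own duration parsing accepts.

```java
import java.time.Duration;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

/** Hypothetical parsed form of one ASYNC_TABLE_FUNC hint (not a Flink class). */
final class AsyncTableFuncHintOptions {
    private static final Set<String> KNOWN_KEYS =
            Set.of("function", "output-mode", "capacity", "timeout");

    final String function;        // null: the hint is the SELECT-level default
    final boolean allowUnordered; // false means 'ordered'
    final int capacity;
    final Duration timeout;

    private AsyncTableFuncHintOptions(String function, boolean allowUnordered,
                                      int capacity, Duration timeout) {
        this.function = function;
        this.allowUnordered = allowUnordered;
        this.capacity = capacity;
        this.timeout = timeout;
    }

    /** Validates the hint's key/value pairs and fills in the defaults from the option table. */
    static AsyncTableFuncHintOptions fromHint(Map<String, String> options) {
        for (String key : options.keySet()) {
            if (!KNOWN_KEYS.contains(key)) {
                throw new IllegalArgumentException("Unknown hint option: " + key);
            }
        }
        String mode = options.getOrDefault("output-mode", "ordered");
        if (!mode.equals("ordered") && !mode.equals("allow_unordered")) {
            throw new IllegalArgumentException("Invalid output-mode: " + mode);
        }
        int capacity = Integer.parseInt(options.getOrDefault("capacity", "100"));
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive: " + capacity);
        }
        Duration timeout = parseDuration(options.getOrDefault("timeout", "300s"));
        return new AsyncTableFuncHintOptions(
                options.get("function"), mode.equals("allow_unordered"), capacity, timeout);
    }

    /** Accepts "<n>ms", "<n>s", "<n>min" or "<n>h" only. */
    static Duration parseDuration(String text) {
        String t = text.trim().toLowerCase(Locale.ROOT);
        if (t.endsWith("ms")) {
            return Duration.ofMillis(Long.parseLong(t.substring(0, t.length() - 2).trim()));
        }
        if (t.endsWith("min")) {
            return Duration.ofMinutes(Long.parseLong(t.substring(0, t.length() - 3).trim()));
        }
        if (t.endsWith("s")) {
            return Duration.ofSeconds(Long.parseLong(t.substring(0, t.length() - 1).trim()));
        }
        if (t.endsWith("h")) {
            return Duration.ofHours(Long.parseLong(t.substring(0, t.length() - 1).trim()));
        }
        throw new IllegalArgumentException("Unsupported duration: " + text);
    }
}
```

Rejecting unknown keys and invalid output modes up front surfaces hint typos at planning time rather than silently falling back to the defaults.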
The hint takes the form:
```sql
ASYNC_TABLE_FUNC('output-mode' = 'ordered', 'capacity' = '200', 'timeout' = '180s')
```
Examples:
```sql
SELECT /*+ ASYNC_TABLE_FUNC('output-mode' = 'ordered', 'capacity' = '200', 'timeout' = '180s') */
       a, c1, c2
FROM T1
LEFT JOIN LATERAL TABLE(async_split(b)) AS T(c1, c2) ON TRUE;

SELECT /*+ ASYNC_TABLE_FUNC('output-mode' = 'ordered', 'capacity' = '200', 'timeout' = '180s'),
           ASYNC_TABLE_FUNC('function' = 'async_split2', 'output-mode' = 'allow_unordered', 'capacity' = '100', 'timeout' = '60s') */
       a, c1, c2
FROM T1
LEFT JOIN LATERAL TABLE(async_split(b)) AS T(c1, c2) ON TRUE
JOIN LATERAL TABLE(async_split2(b)) AS T2(c3, c4) ON TRUE;
```
In the second query, async_split runs with 'output-mode' = 'ordered', 'capacity' = '200', 'timeout' = '180s', and async_split2 runs with 'output-mode' = 'allow_unordered', 'capacity' = '100', 'timeout' = '60s'. The 'function' option distinguishes between the different table function calls in the SELECT clause; omitting it makes the hint the default options for all of them.
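The example above implies a precedence: a hint naming the function wins, a hint without 'function' acts as the SELECT-level default, and the built-in defaults apply otherwise. The sketch below encodes that reading; HintResolutionSketch is hypothetical, and two details are assumptions the proposal does not spell out: the first SELECT-level hint wins if several are given, and keys missing from the chosen hint fall back to the built-in defaults rather than to the SELECT-level hint.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical resolution of the effective options for one async table function call. */
final class HintResolutionSketch {
    static final Map<String, String> BUILT_IN_DEFAULTS =
            Map.of("output-mode", "ordered", "capacity", "100", "timeout", "300s");

    static Map<String, String> resolve(List<Map<String, String>> hints, String functionName) {
        Map<String, String> specific = null;
        Map<String, String> global = null;
        for (Map<String, String> hint : hints) {
            String target = hint.get("function");
            if (target == null) {
                if (global == null) {
                    global = hint; // assumption: the first SELECT-level hint wins
                }
            } else if (target.equals(functionName) && specific == null) {
                specific = hint;
            }
        }
        Map<String, String> chosen =
                specific != null ? specific : (global != null ? global : Map.of());
        // Assumption: keys absent from the chosen hint fall back to the built-in defaults.
        Map<String, String> effective = new HashMap<>(BUILT_IN_DEFAULTS);
        chosen.forEach((key, value) -> {
            if (!key.equals("function")) {
                effective.put(key, value);
            }
        });
        return effective;
    }
}
```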
Proposed Changes
As shown above, async table functions use the existing LATERAL TABLE syntax, so the planner also translates such a statement into a CommonExecCorrelate node, and the runtime code is generated in CorrelatedCodeGenerator. There, the generator can check whether the table function's kind is FunctionKind.TABLE or FunctionKind.ASYNC_TABLE.
For FunctionKind.ASYNC_TABLE, we can generate an AsyncWaitOperator to execute the async table function. Generating this async correlate node can reuse most of the facilities in org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin#createAsyncLookupJoin.
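To make the runtime behaviour concrete, here is a heavily simplified, JDK-only model of what the async correlate operator is expected to do: bound the number of in-flight calls by the capacity, then emit results either in input order (ordered) or in completion order (unordered). AsyncCorrelateSketch is hypothetical and ignores timeouts, checkpointing, and watermarks.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

/** Hypothetical, simplified model of an async correlate (not Flink's AsyncWaitOperator). */
final class AsyncCorrelateSketch {
    static List<String> run(List<String> inputs,
                            Function<String, CompletableFuture<String>> asyncCall,
                            int capacity,
                            boolean ordered) {
        Semaphore slots = new Semaphore(capacity);
        List<CompletableFuture<String>> inFlight = new ArrayList<>();
        List<String> completionOrder = Collections.synchronizedList(new ArrayList<>());
        for (String input : inputs) {
            // Back-pressure: block while `capacity` calls are outstanding.
            slots.acquireUninterruptibly();
            inFlight.add(asyncCall.apply(input).whenComplete((result, error) -> {
                if (error == null) {
                    completionOrder.add(result);
                }
                slots.release();
            }));
        }
        // Wait until every call (and its whenComplete callback) has finished.
        CompletableFuture.allOf(inFlight.toArray(new CompletableFuture<?>[0])).join();
        if (!ordered) {
            // Unordered: results in the order the calls completed.
            return new ArrayList<>(completionOrder);
        }
        // Ordered: results in input order, regardless of completion order.
        List<String> out = new ArrayList<>();
        for (CompletableFuture<String> f : inFlight) {
            out.add(f.join());
        }
        return out;
    }
}
```

One simplification: the sketch frees a slot as soon as a call completes, whereas the real ordered queue keeps a completed result, and its slot, until every earlier result has been emitted.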
Performance
[Figure: sync version]
[Figure: async version]
In the test, each evaluation is a blocking call that takes 100 ms on average, and both the sync correlate and the async correlate run with a parallelism of 1. The async function is implemented asynchronously on a thread pool with 2 threads.
The async version's TPS is roughly double that of the sync version.
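A back-of-the-envelope estimate shows why roughly 2x is expected. It assumes each record triggers one call that takes exactly the 100 ms average, per-record overhead is negligible, and the operator's buffer capacity (default 100) exceeds the 2 pool threads, so the pool is the bottleneck:

$$
\text{TPS}_{\text{sync}} \approx \frac{1\ \text{call in flight}}{0.1\ \text{s}} = 10\ \text{records/s},
\qquad
\text{TPS}_{\text{async}} \approx \frac{2\ \text{calls in flight}}{0.1\ \text{s}} = 20\ \text{records/s}
$$

The ratio 20/10 = 2 matches the observed doubling; under the same assumptions, a larger pool would raise the bound until the buffer capacity or the external service becomes the limit.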
Compatibility, Deprecation, and Migration Plan
- Support for user-defined async table functions is new functionality and does not change existing behavior, so there are no compatibility problems.
Test Plan
Unit tests (UT) and integration tests (ITCase).
Rejected Alternatives
Job level config
```java
public static final ConfigOption<Integer> TABLE_EXEC_ASYNC_CORRELATE_BUFFER_CAPACITY =
        key("table.exec.async-udtf.buffer-capacity")
                .intType()
                .defaultValue(100)
                .withDescription(
                        "The max number of async i/o operation that the async lateral join can trigger.");

public static final ConfigOption<Duration> TABLE_EXEC_CORRELATE_TIMEOUT =
        key("table.exec.async-udtf.timeout")
                .durationType()
                .defaultValue(Duration.ofMinutes(3))
                .withDescription(
                        "The async timeout for the asynchronous operation to complete.");

public static final ConfigOption<AsyncOutputMode> TABLE_EXEC_ASYNC_CORRELATE_OUTPUT_MODE =
        key("table.exec.async-udtf.output-mode")
                .enumType(AsyncOutputMode.class)
                .defaultValue(AsyncOutputMode.ORDERED)
                .withDescription(
                        "Output mode for asynchronous operations which will convert to {@see AsyncDataStream.OutputMode}, ORDERED by default. "
                                + "If set to ALLOW_UNORDERED, will attempt to use {@see AsyncDataStream.OutputMode.UNORDERED} when it does not "
                                + "affect the correctness of the result, otherwise ORDERED will be still used.");
```
These config options are the counterparts of the lookup join options, e.g. table.exec.async-lookup.buffer-capacity.
However, hints are more flexible and avoid introducing too many similar options, so I rejected this choice.