...
...
...
JIRA:
Released: <Flink Version>
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
A lookup join is typically used to enrich a table with data queried from an external system (backed by a lookup source). However, some data may not be updated in the external system in a timely manner. When such unexpectedly 'missing' data is encountered, users may want to retry the lookup after a delay, but there is currently no elegant way to implement this. Although Flink supports various joins for streams (regular joins, interval joins, temporal joins, ...), the lookup source is unfortunately often not available as a stream (e.g., Redis).
...
As discussed in the mailing thread, we plan to introduce a unified hint that supports both sync and async lookup, each with or without retry. The support matrix will be:
lookup support | async | retry |
---|---|---|
sync w/o retry | N | N |
sync w/ retry | N | Y |
async w/o retry | Y | N |
async w/ retry | Y | Y |
Non-goals:
- Lookup sources that can be connected as streams (other types of join can be used for them)
- Fixed-delay processing of all input data: async lookup with retry is not suitable for this, and lighter-weight approaches should be used instead (e.g., pending source consumption, or sync lookup with retry)
- Retry on exceptions (this is left to the SQL connectors)
...
A new query hint, 'LOOKUP', with different hint options (e.g., 'async'='true|false' and 'retry-predicate'='lookup_miss') covers all related functionality (including FLINK-27625 and the discussion on the connector option 'lookup.async' in FLIP-221 [4]). Compared to multiple hints that each cover a subset of the functionality, a single hint may be easier for users to understand and use, and specific parameters can be found quickly in the documentation.
The available hint options
...
option type | option name | optional | value type | default value | description |
---|---|---|---|---|---|
table name | table | N | string | N/A | the table name of the lookup source |
async | async | Y | boolean | N/A | value can be 'true' or 'false' to suggest that the planner choose the corresponding lookup function. If the backend lookup source does not support the suggested lookup mode, it takes no effect. |
async | output-mode | Y | string | ordered | value can be 'ordered' or 'allow_unordered'. 'allow_unordered' means that users allow unordered results: the planner attempts to use AsyncDataStream.OutputMode.UNORDERED when this does not affect the correctness of the result, otherwise ORDERED is still used. It is consistent with `ExecutionConfigOptions#TABLE_EXEC_ASYNC_LOOKUP_OUTPUT_MODE`. |
async | capacity | Y | integer | 100 | the buffer capacity of the backend AsyncWaitOperator of the lookup join operator |
async | timeout | Y | duration | 300s | timeout from the first invocation to the final completion of the asynchronous operation; it may span multiple retries and is reset on failover |
retry | retry-predicate | Y | string | N/A | can be 'lookup_miss', which enables retry if the lookup result is empty |
retry | retry-strategy | Y | string | N/A | can be 'fixed_delay' |
retry | fixed-delay | Y | duration | N/A | delay time for the 'fixed_delay' strategy |
retry | max-attempts | Y | integer | N/A | maximum number of attempts for the 'fixed_delay' strategy |
'output-mode'='ordered|allow_unordered'
'capacity'='100'
'timeout'='180s'
'retry-strategy'='fixed_delay'
'fixed-delay'='10s'
'max-attempts'='3'
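To make the 'output-mode' semantics concrete, the following Java sketch shows one way a planner could resolve the effective mode. It is an illustration under stated assumptions, not Flink planner code: `OutputModeResolver`, `Mode` and `resolve` are hypothetical names, an absent or unrecognized hint value is assumed to fall back to the job-level setting, and "does not affect the correctness of the result" is approximated here as "the lookup join's input is insert-only".

```java
import java.util.Locale;

/** Hypothetical sketch (not Flink planner code): resolves the effective async output mode. */
final class OutputModeResolver {

    enum Mode { ORDERED, UNORDERED }

    /**
     * @param hintValue       'output-mode' hint value, or null if the hint does not set it
     * @param jobDefault      job-level value, e.g. of table.exec.async-lookup.output-mode
     * @param inputInsertOnly assumption: unordered output is treated as safe only for insert-only input
     */
    static Mode resolve(String hintValue, String jobDefault, boolean inputInsertOnly) {
        String requested = normalize(hintValue);
        if (requested == null) {
            // Absent or unrecognized hint value: best effort, keep the job-level behaviour.
            requested = normalize(jobDefault);
        }
        if ("allow_unordered".equals(requested) && inputInsertOnly) {
            return Mode.UNORDERED;
        }
        // 'ordered', no usable value at all, or reordering could change the result.
        return Mode.ORDERED;
    }

    /** Returns the lower-cased value if it names a known mode, otherwise null. */
    private static String normalize(String value) {
        if (value == null) {
            return null;
        }
        String v = value.trim().toLowerCase(Locale.ROOT);
        return v.equals("ordered") || v.equals("allow_unordered") ? v : null;
    }
}
```

'allow_unordered' is modelled as a permission rather than a command: the sketch only switches to UNORDERED when its (assumed) safety condition holds.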
For connectors that can provide both async and sync lookup, our advice to connector developers is to implement both the sync and the async interfaces if both capabilities have suitable use cases, and otherwise to implement only one of them. The planner prefers the async one by default, and users can pass 'async'='true|false' via the LOOKUP query hint to suggest a different choice to the planner.
Because query hints work in a best-effort manner, if users specify a hint with an invalid option, the query plan remains unchanged. For example, given LOOKUP('table'='customer', 'async'='true') where the backend lookup source only implements the sync lookup function, the async lookup hint takes no effect.
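The selection rule described in the two paragraphs above (async preferred by default, the 'async' option acting only as a suggestion, and the hint being ignored when the source lacks the requested capability) can be sketched in Java as follows. `LookupModeChooser`, `LookupMode` and `choose` are hypothetical names used for illustration; they are not Flink APIs.

```java
/** Hypothetical sketch of the planner's choice between the sync and async lookup functions. */
final class LookupModeChooser {

    enum LookupMode { SYNC, ASYNC }

    /**
     * @param supportsSync  whether the lookup source implements a sync lookup function
     * @param supportsAsync whether the lookup source implements an async lookup function
     * @param asyncHint     value of the 'async' hint option, or null when it is not given
     */
    static LookupMode choose(boolean supportsSync, boolean supportsAsync, Boolean asyncHint) {
        if (!supportsSync && !supportsAsync) {
            throw new IllegalArgumentException("lookup source implements no lookup function");
        }
        if (supportsSync && supportsAsync) {
            // Both are available: follow the hint if present, otherwise prefer async.
            return (asyncHint == null || asyncHint) ? LookupMode.ASYNC : LookupMode.SYNC;
        }
        // Only one capability: a conflicting hint is ignored (best effort), so the plan is unchanged.
        return supportsAsync ? LookupMode.ASYNC : LookupMode.SYNC;
    }
}
```

For the example above, `choose(true, false, Boolean.TRUE)` yields `SYNC`, because the source only implements the sync lookup function.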
...
```sql
LOOKUP('table'='dim1', 'async'='true', 'output-mode'='allow_unordered', 'capacity'='100', 'timeout'='180s')
```
For example, if the job-level configuration is:
```yaml
table.exec.async-lookup.output-mode: ORDERED
table.exec.async-lookup.buffer-capacity: 100
table.exec.async-lookup.timeout: 180s
```
then the following hints:
```sql
-- 1.
LOOKUP('table'='dim1', 'async'='true', 'output-mode'='allow_unordered')
-- 2.
LOOKUP('table'='dim1', 'async'='true', 'timeout'='300s')
```
are equivalent to:
```sql
-- 1.
LOOKUP('table'='dim1', 'async'='true', 'output-mode'='allow_unordered', 'capacity'='100', 'timeout'='180s')
-- 2.
LOOKUP('table'='dim1', 'async'='true', 'output-mode'='ordered', 'capacity'='100', 'timeout'='300s')
```
2. Retry Support
The hint option 'retry-predicate'='lookup_miss' enables retry for both sync and async lookup. Retry-related hint options:
```
'retry-strategy'='fixed_delay'
'fixed-delay'='10s'
'max-attempts'='3'
```
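For async lookup, the 'timeout' option bounds the whole operation, retries included, so the configured retry delays should leave room for the lookups themselves. The small Java sketch below works through that arithmetic. `RetryBudget` and its methods are hypothetical, and the sketch assumes (the text does not state it) that 'max-attempts' counts every lookup call, including the first. Under that assumption, 'fixed-delay'='10s' with 'max-attempts'='3' means at most two 10s waits, i.e. 20s of pure delay.

```java
import java.time.Duration;

/** Hypothetical sketch: how much of the async 'timeout' the fixed-delay retries consume. */
final class RetryBudget {

    /** Total waiting time, assuming max-attempts counts all lookup calls (one wait between calls). */
    static Duration totalRetryDelay(Duration fixedDelay, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("max-attempts must be >= 1");
        }
        return fixedDelay.multipliedBy(maxAttempts - 1);
    }

    /** True if the delays alone stay strictly below the timeout, leaving time for the lookups. */
    static boolean fitsWithin(Duration timeout, Duration fixedDelay, int maxAttempts) {
        return totalRetryDelay(fixedDelay, maxAttempts).compareTo(timeout) < 0;
    }
}
```

With 'timeout'='180s', the 20s of delay leaves ample headroom. A timeout of 15s would be exhausted before the last attempt could even start.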
...
```sql
LOOKUP('table'='dim1', 'async'='true', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3')
```
...
```sql
LOOKUP('table'='dim1', 'async'='false', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3')
```
...
```sql
LOOKUP('table'='dim1', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3')
```
For the retry strategy, we plan to support a 'fixed_delay' retry strategy first; further strategies can be added in the future.
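The retry behaviour for a sync lookup can be sketched in Java as follows. `LookupRetryStrategy`, `FixedDelayRetryStrategy`, `RetryingLookup` and `lookupWithRetry` are hypothetical names (not Flink APIs). The sketch assumes that a lookup returns a collection of rows for a key, that 'lookup_miss' means this collection is empty, and that 'max-attempts' counts every lookup call, including the first. The strategy is modelled as an interface so that strategies other than 'fixed_delay' could be plugged in later.

```java
import java.time.Duration;
import java.util.Collection;
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical extension point: further retry strategies can implement this later. */
interface LookupRetryStrategy {
    boolean canRetry(int attemptsSoFar);
    Duration backoff(int attemptsSoFar);
}

/** 'fixed_delay': the same delay before every retry; maxAttempts counts all calls (assumption). */
final class FixedDelayRetryStrategy implements LookupRetryStrategy {
    private final Duration delay;
    private final int maxAttempts;

    FixedDelayRetryStrategy(Duration delay, int maxAttempts) {
        if (delay.isNegative() || maxAttempts < 1) {
            throw new IllegalArgumentException("need delay >= 0 and maxAttempts >= 1");
        }
        this.delay = delay;
        this.maxAttempts = maxAttempts;
    }

    @Override public boolean canRetry(int attemptsSoFar) { return attemptsSoFar < maxAttempts; }
    @Override public Duration backoff(int attemptsSoFar) { return delay; }
}

final class RetryingLookup {
    /** 'lookup_miss': retry when the lookup returned no rows. */
    static final Predicate<Collection<?>> LOOKUP_MISS = Collection::isEmpty;

    static <K, R> Collection<R> lookupWithRetry(
            K key, Function<K, Collection<R>> lookup, LookupRetryStrategy strategy) {
        int attempts = 0;
        while (true) {
            Collection<R> rows = lookup.apply(key);
            attempts++;
            if (!LOOKUP_MISS.test(rows) || !strategy.canRetry(attempts)) {
                return rows; // a hit, or retries exhausted (the empty result is returned as a miss)
            }
            try {
                Thread.sleep(strategy.backoff(attempts).toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop retrying, keep the interrupt visible
                return rows;
            }
        }
    }
}
```

A blocking sleep keeps this sync sketch simple. An async lookup would instead schedule the retry with a timer so that the operator thread is not blocked while waiting.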
...
```sql
-- retry triggered by empty result, using 10s fixed-delay strategy, max attempts 3.
SELECT /*+ LOOKUP('table'='Customers', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */
  o.order_id, o.total, c.country, c.zip
FROM Orders AS o
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;
```
...
This feature is backward compatible and transparent to all connectors. Existing connectors that support lookup can easily enable sync or async retry via the new join hint.
A follow-up issue will be opened to discuss whether to remove the 'lookup.async' option from the HBase connector.
Test Plan
Existing test cases will be extended to cover retry scenarios.
...
```sql
-- Customers is backed by the JDBC connector and can be used for lookup joins
CREATE TEMPORARY TABLE Customers (
  id INT,
  name STRING,
  country STRING,
  zip STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://mysqlhost:3306/customerdb',
  'table-name' = 'customers',
  -- lookup_miss.retry-strategy
  'lookup_miss.retry-strategy' = 'fixed-delay', -- 'none' by default
  'lookup_miss.retry-strategy.fixed-delay.delay' = '10 s',
  'lookup_miss.retry-strategy.fixed-delay.max-attempts' = '3'
);

-- join operation stays the same.
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;
```
...
3. FLIP-204: Introduce Hash Lookup Join
4. https://lists.apache.org/thread/1vokqdnnt01yycl7y1p74g556cc8yvtq