...
Page properties |
---|
...
|
...
|
...
...
|
...
...
|
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
A lookup join is typically used to enrich a table with data that is queried from an external system (backed by a lookup source). However, some data may not be updated in the external system in a timely manner, and users want to do a delayed retry when they encounter such unexpected 'missed' data, which cannot be implemented elegantly for now. Although Flink supports various joins (regular joins, interval joins, temporal joins...) for streams, unfortunately the lookup source is often not available as a stream (e.g., Redis).
We want to support such a retryable lookup join, covering both async and sync lookups, to solve the delayed updates issue. As pre-work for this solution, we proposed FLIP-232[1], which adds general retry support for Async I/O. We prefer to offer this retry capability via query hints, similar to the new join hints proposed in FLINK-27625[2] & FLIP-204[3].
As discussed in the mailing thread, we plan to introduce a unified hint that supports both sync|async lookup and with|without retry. The support matrix will be:
lookup support | async | retry |
---|---|---|
sync w/o retry | N | N |
sync w/ retry | N | Y |
async w/o retry | Y | N |
async w/ retry | Y | Y |
Non-targets:
- Lookup sources that can be connected as streams (other types of join can be used)
- Fixed-delay processing of all input data: async lookup with retry is not suited for it (lighter approaches should be used, e.g., pending source consumption or sync lookup with retry)
- Retry on exception (the SQL connectors should handle it)
ref:
- FLIP-232: Add Retry Support For Async I/O In DataStream API
- FLINK-27625: Add query hint for async lookup join
- FLIP-204: Introduce Hash Lookup Join
Public Interfaces
A new query hint 'LOOKUP' with different hint options ('async'='true|false', 'retry-predicate'='lookup_miss') covers all related functionalities (including FLINK-27625 and the discussion on the connector option 'lookup.async' in FLIP-221[4]). Compared to multiple hints with different subsets of functionality, a single hint may be easier for users to understand and use, and specific parameters can be found quickly in the documentation.
The available hint options:
option type | option name | optional | value type | default value | description |
---|---|---|---|---|---|
table name | table | N | string | N/A | the table name of the lookup source |
async | async | Y | boolean | N/A | value can be 'true' or 'false' to suggest that the planner choose the corresponding lookup function. If the backend lookup source does not support the suggested lookup mode, the option takes no effect. |
 | output-mode | Y | string | ordered | value can be 'ordered' or 'allow_unordered'. 'allow_unordered' means that if users allow unordered results, the planner will attempt to use AsyncDataStream.OutputMode.UNORDERED when it does not affect the correctness of the result; otherwise ORDERED is still used. It is consistent with `ExecutionConfigOptions#TABLE_EXEC_ASYNC_LOOKUP_OUTPUT_MODE`. |
 | capacity | Y | integer | 100 | the buffer capacity for the backend AsyncWaitOperator of the lookup join operator |
 | timeout | Y | duration | 300s | timeout from the first invocation to the final completion of the asynchronous operation; it may include multiple retries and is reset in case of failover |
retry | retry-predicate | Y | string | N/A | can be 'lookup_miss', which enables retry if the lookup result is empty |
 | retry-strategy | Y | string | N/A | can be 'fixed_delay' |
 | fixed-delay | Y | duration | N/A | delay time for the 'fixed_delay' strategy |
 | max-attempts | Y | integer | N/A | max attempt number of the 'fixed_delay' strategy |
For connectors that can provide both async and sync lookup, our advice to connector developers is to implement both the sync and async interfaces if both capabilities have suitable use cases; otherwise, implement only one of them. The planner prefers the async one by default, and users can suggest a mode to the planner via the LOOKUP query hint option 'async'='true|false'.
Because query hints work in a best-effort manner, if users specify a hint with an invalid option, the query plan stays unchanged. For example, if a user specifies LOOKUP('table'='customer', 'async'='true') but the backend lookup source only implements the sync lookup function, the async lookup hint takes no effect.
1. Sync and Async Mode Lookup
1.1 Sync Lookup
If the connector supports both async and sync lookup, users can set the option 'async'='false' to suggest that the planner use the sync lookup:
Code Block | ||
---|---|---|
| ||
LOOKUP('table'='dim1', 'async'='false') |
1.2 Async Lookup
If the connector supports both async and sync lookup, users can set the option 'async'='true' to suggest that the planner use the async lookup:
Code Block | ||
---|---|---|
| ||
LOOKUP('table'='dim1', 'async'='true') |
Async lookup related parameters can also be configured via hint options (this covers the join-level configuration requirement proposed in FLINK-27625). All kv hint options except the table name are optional (the job-level configuration is used if they are not set):
Code Block | ||
---|---|---|
| ||
LOOKUP('table'='dim1', 'async'='true', 'output-mode'='allow_unordered', 'capacity'='100', 'timeout'='180s') |
e.g., if the job level configuration is:
Code Block | ||
---|---|---|
| ||
table.exec.async-lookup.output-mode: ORDERED
table.exec.async-lookup.buffer-capacity: 100
table.exec.async-lookup.timeout: 180s |
then the following hints:
Code Block | ||
---|---|---|
| ||
1. LOOKUP('table'='dim1', 'async'='true', 'output-mode'='allow_unordered')
2. LOOKUP('table'='dim1', 'async'='true', 'timeout'='300s') |
are equivalent to:
Code Block | ||
---|---|---|
| ||
1. LOOKUP('table'='dim1', 'async'='true', 'output-mode'='allow_unordered', 'capacity'='100', 'timeout'='180s')
2. LOOKUP('table'='dim1', 'async'='true', 'output-mode'='ordered', 'capacity'='100', 'timeout'='300s') |
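To make the precedence rule concrete, here is a minimal Java sketch, assuming the hint options arrive as a plain string map and the job configuration as another map keyed by the option names shown above. The class `AsyncLookupOptionResolver`, its nested `AsyncOptions` holder and the seconds-only duration parsing are hypothetical illustrations, not Flink's planner code.

```java
import java.time.Duration;
import java.util.Locale;
import java.util.Map;

/**
 * Hypothetical sketch: resolves the effective async lookup options of a single lookup join.
 * Options given in the LOOKUP hint win; missing ones fall back to the job-level configuration.
 */
final class AsyncLookupOptionResolver {

    // Job-level keys, as used in the example configuration.
    static final String JOB_OUTPUT_MODE = "table.exec.async-lookup.output-mode";
    static final String JOB_CAPACITY = "table.exec.async-lookup.buffer-capacity";
    static final String JOB_TIMEOUT = "table.exec.async-lookup.timeout";

    /** Effective async options for one lookup join. */
    static final class AsyncOptions {
        final String outputMode;
        final int capacity;
        final Duration timeout;

        AsyncOptions(String outputMode, int capacity, Duration timeout) {
            this.outputMode = outputMode;
            this.capacity = capacity;
            this.timeout = timeout;
        }
    }

    static AsyncOptions resolve(Map<String, String> jobConfig, Map<String, String> hint) {
        // Hint option first, job-level value as the fallback.
        String outputMode = hint.getOrDefault("output-mode", jobConfig.get(JOB_OUTPUT_MODE));
        String capacity = hint.getOrDefault("capacity", jobConfig.get(JOB_CAPACITY));
        String timeout = hint.getOrDefault("timeout", jobConfig.get(JOB_TIMEOUT));
        return new AsyncOptions(
                // 'ORDERED' in the config and 'ordered' in a hint denote the same mode.
                outputMode.toLowerCase(Locale.ROOT),
                Integer.parseInt(capacity.trim()),
                parseSeconds(timeout));
    }

    /** Simplified parser that only accepts whole seconds such as "180s". */
    static Duration parseSeconds(String value) {
        String v = value.trim();
        if (!v.endsWith("s")) {
            throw new IllegalArgumentException("expected a value like '180s': " + value);
        }
        return Duration.ofSeconds(Long.parseLong(v.substring(0, v.length() - 1).trim()));
    }
}
```

Applied to the two hints above, the first keeps capacity 100 and timeout 180s from the job configuration, while the second falls back to the ordered output mode and capacity 100 but uses its own 300s timeout.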
2. Retry Support
The hint option 'retry-predicate'='lookup_miss' can enable retry on both sync and async lookup. Retry related hint options:
Code Block | ||
---|---|---|
| ||
'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3' |
...
e.g., retry on async lookup
Code Block | ||
---|---|---|
| ||
LOOKUP('table'='dim1', 'async'='true', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s','max-attempts'='3') |
retry on sync lookup
...
Code Block | ||
---|---|---|
| ||
LOOKUP('table'='dim1', 'async'='false', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s','max-attempts'='3') |
If the lookup source only has one capability, then the 'async' mode option can be omitted:
Code Block | ||
---|---|---|
| ||
LOOKUP('table'='dim1', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s','max-attempts'='3') |
For the retry strategy, we plan to support a fixed_delay retry strategy first, and this can be extended in the future.
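The retry-related hint options could be turned into an internal fixed-delay strategy roughly as follows. This is a sketch under assumptions: `RetryHintParser`, the `FixedDelayRetry` holder and the small duration parser are hypothetical, and an invalid or incomplete retry spec is treated as "no retry", following the best-effort rule for query hints. The `switch` on the strategy name is where further strategies could be added later.

```java
import java.time.Duration;
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical sketch: converts the retry-related LOOKUP hint options into an internal
 * fixed-delay retry strategy. Invalid or incomplete specs yield Optional.empty() (no retry).
 */
final class RetryHintParser {

    /** Stand-in for the planner's internal retry strategy; only fixed delay exists for now. */
    static final class FixedDelayRetry {
        final Duration delay;
        final int maxAttempts;

        FixedDelayRetry(Duration delay, int maxAttempts) {
            this.delay = delay;
            this.maxAttempts = maxAttempts;
        }
    }

    static Optional<FixedDelayRetry> parse(Map<String, String> hint) {
        // 'lookup_miss' (empty lookup result) is the only retry trigger.
        if (!"lookup_miss".equals(hint.get("retry-predicate"))) {
            return Optional.empty();
        }
        switch (hint.getOrDefault("retry-strategy", "")) {
            case "fixed_delay":
                return parseFixedDelay(hint);
            // New strategies would be added as further cases here.
            default:
                return Optional.empty();
        }
    }

    private static Optional<FixedDelayRetry> parseFixedDelay(Map<String, String> hint) {
        String delay = hint.get("fixed-delay");
        String attempts = hint.get("max-attempts");
        if (delay == null || attempts == null) {
            return Optional.empty(); // incomplete spec: leave the plan unchanged
        }
        try {
            Duration d = parseDuration(delay);
            int n = Integer.parseInt(attempts.trim());
            if (d.isNegative() || n < 1) {
                return Optional.empty();
            }
            return Optional.of(new FixedDelayRetry(d, n));
        } catch (IllegalArgumentException e) { // also covers NumberFormatException
            return Optional.empty();
        }
    }

    /** Simplified parser (not Flink's) for values such as "500ms", "10s" or "1min". */
    static Duration parseDuration(String value) {
        String v = value.trim();
        if (v.endsWith("ms")) {
            return Duration.ofMillis(Long.parseLong(v.substring(0, v.length() - 2).trim()));
        }
        if (v.endsWith("min")) {
            return Duration.ofMinutes(Long.parseLong(v.substring(0, v.length() - 3).trim()));
        }
        if (v.endsWith("s")) {
            return Duration.ofSeconds(Long.parseLong(v.substring(0, v.length() - 1).trim()));
        }
        throw new IllegalArgumentException("unsupported duration: " + value);
    }
}
```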
Use Case: Lookup Join Without Retry
...
Use Case: Lookup Join With Retry
Code Block | ||
---|---|---|
| ||
-- retry triggered by a lookup miss (empty result), using a 10s fixed-delay strategy with max attempts 3.
SELECT /*+ LOOKUP('table'='Customers', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */
  o.order_id, o.total, c.country, c.zip
FROM Orders AS o
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id; |
In the above example, the backend lookup source of table 'Customers' is JDBC, so the retry takes effect on the sync lookup.
Proposed Changes
The main changes for the new hint are similar to FLIP-204; once FLIP-204 is done, adding a join hint will be simpler.
- Define the hint strategy: add the 'LOOKUP' hint and the related hint parsing for simple and kv form hint options (the original hints propagate via FlinkLogicalJoin).
- Validate the hint and convert it to the table's internal RetryStrategy in the exec lookup join node.
Since FLIP-204 is not done yet, we cannot give a complete PoC here, but the changes are manageable.
1. Async Retry
Convert the parsed internal RetryStrategy into the runtime's AsyncRetryStrategy, and use the new API with retry support proposed in FLIP-232[1].
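The intended async retry behavior can be illustrated with plain JDK classes. This is not Flink's `AsyncRetryStrategy` or `AsyncWaitOperator` API from FLIP-232, only a hypothetical stand-in (`AsyncLookupRetry`) showing the semantics under stated assumptions: retry only on an empty result ('lookup_miss'), a fixed delay between attempts, `maxAttempts` counting every invocation including the first one (the exact counting is up to the real strategy), and a single overall timeout that covers all retries, matching the 'timeout' hint option.

```java
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/** Hypothetical sketch of async lookup with fixed-delay retry on lookup miss (plain JDK). */
final class AsyncLookupRetry {

    /**
     * Looks up key asynchronously; on an empty result, schedules another attempt after
     * delayMillis until maxAttempts invocations (including the first) have been made.
     */
    static <K, V> CompletableFuture<Collection<V>> lookupWithRetry(
            K key,
            Function<K, CompletableFuture<Collection<V>>> asyncLookup,
            ScheduledExecutorService timer,
            long delayMillis,
            int maxAttempts,
            long timeoutMillis) {
        CompletableFuture<Collection<V>> result = new CompletableFuture<>();
        attempt(key, asyncLookup, timer, delayMillis, maxAttempts, 1, result);
        // One deadline from the first invocation to final completion, not one per attempt.
        return result.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    private static <K, V> void attempt(
            K key,
            Function<K, CompletableFuture<Collection<V>>> asyncLookup,
            ScheduledExecutorService timer,
            long delayMillis,
            int maxAttempts,
            int attemptNo,
            CompletableFuture<Collection<V>> result) {
        if (result.isDone()) {
            return; // e.g. already timed out: stop retrying
        }
        asyncLookup.apply(key).whenComplete((rows, error) -> {
            if (error != null) {
                // Retry on exception is a non-target: propagate it and let the connector handle it.
                result.completeExceptionally(error);
            } else if (rows.isEmpty() && attemptNo < maxAttempts) {
                // Lookup miss: try again after the fixed delay.
                timer.schedule(
                        () -> attempt(key, asyncLookup, timer, delayMillis, maxAttempts, attemptNo + 1, result),
                        delayMillis,
                        TimeUnit.MILLISECONDS);
            } else {
                // Hit, or attempts exhausted: emit the last result (possibly empty).
                result.complete(rows);
            }
        });
    }
}
```

The caller owns the `ScheduledExecutorService` and must shut it down when it is no longer needed. Because the delay is a scheduled callback rather than a sleep, no thread is blocked while waiting for the external system to catch up.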
2. Sync Retry
Add a sync retry implementation to the internal LookupJoinRunner.
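For the sync path, the retry has to happen inside the lookup call itself, so the task thread blocks during each delay. A minimal sketch, assuming a generic `lookup` function and that max-attempts counts all invocations including the first one; `SyncLookupRetry` is a hypothetical helper, not the actual LookupJoinRunner change, and the sleep is injected as a `LongConsumer` so that it can be replaced in tests.

```java
import java.util.Collection;
import java.util.function.Function;
import java.util.function.LongConsumer;

/** Hypothetical sketch of sync lookup with fixed-delay retry on lookup miss. */
final class SyncLookupRetry {

    /**
     * Calls lookup(key); while the result is empty and fewer than maxAttempts invocations
     * (including the first) have been made, waits delayMillis and looks up again.
     */
    static <K, V> Collection<V> lookupWithRetry(
            K key,
            Function<K, Collection<V>> lookup,
            long delayMillis,
            int maxAttempts,
            LongConsumer sleeper) {
        Collection<V> rows = lookup.apply(key);
        for (int attempt = 2; attempt <= maxAttempts && rows.isEmpty(); attempt++) {
            sleeper.accept(delayMillis); // blocks the calling (task) thread
            rows = lookup.apply(key);
        }
        return rows; // hit, or the empty result of the last attempt
    }

    /** Default sleeper: Thread.sleep that restores the interrupt flag instead of swallowing it. */
    static void threadSleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting to retry the lookup", e);
        }
    }
}
```

In production code the sleeper would be `SyncLookupRetry::threadSleep`. Every delay stalls the whole subtask, which is one reason to prefer async lookup when the connector offers it.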
3. Prefer Async Lookup Function For The Planner
If the connector supports both async and sync lookup, the planner prefers the async one when no query hint is given. If the user gives the option value explicitly, 'async'='false' selects the sync lookup and 'async'='true' selects the async lookup.
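This selection rule, together with the best-effort handling of the 'async' hint option, can be sketched as a small decision function. `LookupModeChooser`, `LookupMode` and the boolean capability flags are hypothetical names for illustration; the planner would instead inspect which lookup functions the source actually provides.

```java
import java.util.Optional;

/** Hypothetical sketch of how the planner could pick the lookup mode for a lookup join. */
final class LookupModeChooser {

    enum LookupMode { SYNC, ASYNC }

    /**
     * @param supportsSync  whether the source provides a sync lookup function
     * @param supportsAsync whether the source provides an async lookup function
     * @param hintAsync     value of the LOOKUP hint's 'async' option, empty if not given
     */
    static LookupMode choose(boolean supportsSync, boolean supportsAsync, Optional<Boolean> hintAsync) {
        if (!supportsSync && !supportsAsync) {
            throw new IllegalArgumentException("the source does not support lookup");
        }
        if (supportsSync && supportsAsync) {
            // Both available: follow the hint, and prefer async when no hint is given.
            return hintAsync.orElse(Boolean.TRUE) ? LookupMode.ASYNC : LookupMode.SYNC;
        }
        // Only one mode available: the hint is best effort and cannot change it.
        return supportsAsync ? LookupMode.ASYNC : LookupMode.SYNC;
    }
}
```

Using `Optional<Boolean>` keeps "not given" distinct from 'false', which is what the async-by-default preference relies on.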
Compatibility, Deprecation, and Migration Plan
This feature is backward compatible and transparent to all connectors. Existing connectors that support lookup can easily enable async|sync retry via the new join hint.
Add a follow-up issue to discuss whether to remove the 'lookup.async' option in the HBase connector.
Test Plan
Extend existing cases to cover retry scenarios.
...
Code Block | ||
---|---|---|
| ||
-- Customers is backed by the JDBC connector and can be used for lookup joins
CREATE TEMPORARY TABLE Customers (
  id INT,
  name STRING,
  country STRING,
  zip STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://mysqlhost:3306/customerdb',
  'table-name' = 'customers',
  -- lookup_miss.retry-strategy
  'lookup_miss.retry-strategy' = 'fixed-delay', -- 'none' by default
  'lookup_miss.retry-strategy.fixed-delay.delay' = '10 s',
  'lookup_miss.retry-strategy.fixed-delay.max-attempts' = '3'
);

-- join operation stays the same.
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id; |
...
Code Block | ||
---|---|---|
| ||
-- ① create views for easy reading
CREATE VIEW v_first_join AS
SELECT o.order_id, o.total, o.customer_id, o.proc_time, c.country, c.zip
FROM Orders AS o
  -- Note: the original inner join changes to a left join
  LEFT JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;

-- view 2: rows whose join result is empty
CREATE VIEW v_first_join_empty AS
SELECT order_id, total, customer_id, proc_time
FROM v_first_join
WHERE zip IS NULL;

-- ② add delayed processing and a second join (similar to a retry) for rows whose join result
-- is empty, UNION ALL them with the non-empty results, and finally output to downstream
INSERT INTO downstream_table
SELECT order_id, total, country, zip FROM v_first_join WHERE zip IS NOT NULL
UNION ALL
SELECT o.order_id, o.total, c.country, c.zip
FROM TABLE(DEFER(v_first_join_empty, proc_time, 10 s)) AS o
  -- Note: back to inner join here
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id; |
References:
1. FLIP-232: Add Retry Support For Async I/O In DataStream API
2. FLINK-27625: Add query hint for async lookup join
3. FLIP-204: Introduce Hash Lookup Join
4. https://lists.apache.org/thread/1vokqdnnt01yycl7y1p74g556cc8yvtq