
...

Page properties

...


Discussion thread

...

Vote thread: https://lists.apache.org/thread/bb0kqjs8co3hhmtklmwptws4fc4rz810
JIRA: FLINK-28779
Release: 1.16


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

A lookup join is typically used to enrich a table with data queried from an external system (backed by a lookup source). Some data, however, may not be updated in the external system in a timely manner, and users want to retry after a delay when they encounter such unexpectedly 'missed' data. There is currently no elegant way to do this. Although Flink supports various joins (regular joins, interval joins, temporal joins...) for streams, the lookup source is often not available as a stream (e.g., Redis).

...

As discussed in the mailing thread, we're planning to introduce a unified hint that supports both sync|async lookup and with|without retry. The support matrix is:

| lookup support | async | retry |
|---|---|---|
| sync w/o retry | N | N |
| sync w/ retry | N | Y |
| async w/o retry | Y | N |
| async w/ retry | Y | Y |


Non-goals:

  1. Lookup sources that can be connected as streams (these can use other types of join).
  2. Async lookup with retry is not suited to applying a fixed processing delay to all input data (lighter approaches should be used instead, e.g., delaying source consumption or using sync lookup with retry).
  3. Retry on exception is not supported (the SQL connectors should handle it).

...

The available hint options:

| option type | option name | optional | value type | default value | description |
|---|---|---|---|---|---|
| table name | table | N | string | N/A | the table name of the lookup source |
| async | async | Y | boolean | N/A | value can be 'true' or 'false' to suggest that the planner choose the corresponding lookup function. If the backend lookup source does not support the suggested lookup mode, the option has no effect. |
| async | output-mode | Y | string | ordered | value can be 'ordered' or 'allow_unordered'. 'allow_unordered' means that if users allow unordered results, the planner will attempt to use AsyncDataStream.OutputMode.UNORDERED when it does not affect the correctness of the result; otherwise ORDERED is still used. It is consistent with `ExecutionConfigOptions#TABLE_EXEC_ASYNC_LOOKUP_OUTPUT_MODE`. |
| async | capacity | Y | integer | 100 | the buffer capacity of the backend asyncWaitOperator of the lookup join operator |
| async | timeout | Y | duration | 300s | timeout from the first invocation to the final completion of the asynchronous operation; may include multiple retries, and is reset in case of failover |
| retry | retry-predicate | Y | string | N/A | can be 'lookup_miss', which enables retry if the lookup result is empty |
| retry | retry-strategy | Y | string | N/A | can be 'fixed_delay' |
| retry | fixed-delay | Y | duration | N/A | delay time for the 'fixed_delay' strategy |
| retry | max-attempts | Y | integer | N/A | maximum number of attempts for the 'fixed_delay' strategy |
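To make the option table more concrete, here is a minimal sketch that combines the async and retry options in a single query. The Orders and Customers tables and their columns are illustrative assumptions, and the option values are arbitrary. The 180s timeout is chosen so that it comfortably covers three attempts spaced 10s apart, since the timeout spans all retries.

```sql
-- Illustrative only: Orders/Customers and their columns are assumed tables.
-- Async lookup that may emit results out of order when that is safe,
-- retrying empty lookups with a fixed 10s delay, at most 3 attempts.
SELECT /*+ LOOKUP('table'='Customers',
                  'async'='true',
                  'output-mode'='allow_unordered',
                  'capacity'='100',
                  'timeout'='180s',
                  'retry-predicate'='lookup_miss',
                  'retry-strategy'='fixed_delay',
                  'fixed-delay'='10s',
                  'max-attempts'='3') */
  o.order_id, o.total, c.country, c.zip
FROM Orders AS o
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;
```

Omitting the async-related options would leave the choice of lookup function to the planner while keeping the same retry behavior.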



For connectors that are capable of both async and sync lookup, our advice to connector developers is to implement both the sync and async interfaces if both capabilities have suitable use cases. The planner prefers the async one by default, and users can set 'async'='true|false' via the LOOKUP query hint to suggest a choice to the planner. Otherwise, choose one interface to implement.
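As a sketch of the suggestion mechanism described above: if a connector implements both interfaces, a user who prefers the sync lookup function could write the hint as follows. The table and column names are assumed for illustration. If the connector only implements the async interface, the suggestion has no effect.

```sql
-- Illustrative only: Orders/Customers and their columns are assumed tables.
-- Suggest the sync lookup function instead of the planner's default (async).
SELECT /*+ LOOKUP('table'='Customers', 'async'='false') */
  o.order_id, o.total, c.country, c.zip
FROM Orders AS o
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;
```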

...

Code Block
languagesql
LOOKUP('table'='dim1', 'async'='true', 'output-mode'='allow_unordered', 'capacity'='100', 'timeout'='180s')

...

Code Block
languagesql
1. LOOKUP('table'='dim1', 'async'='true', 'output-mode'='allow_unordered')
2. LOOKUP('table'='dim1', 'async'='true', 'timeout'='300s')

...

Code Block
languagesql
1. LOOKUP('table'='dim1', 'async'='true', 'output-mode'='allow_unordered', 'capacity'='100', 'timeout'='180s')
2. LOOKUP('table'='dim1', 'async'='true', 'output-mode'='ordered', 'capacity'='100', 'timeout'='300s')

2. Retry Support

The hint option 'retry-predicate'='lookup_miss' enables retry on both sync and async lookup. Retry-related hint options:

Code Block
languagesql
'retry-strategy'='fixed_delay'
'fixed-delay'='10s'
'max-attempts'='3'

...

Code Block
languagesql
LOOKUP('table'='dim1', 'async'='true', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3')

...

Code Block
languagesql
LOOKUP('table'='dim1', 'async'='false', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3')

...

Code Block
languagesql
LOOKUP('table'='dim1', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3')

For the retry strategy, we plan to support only a fixed_delay strategy at first; further strategies can be added in the future.

...

Code Block
languagesql
-- Retry triggered by an empty lookup result, using a 10s fixed_delay strategy with at most 3 attempts.
SELECT /*+ LOOKUP('table'='Customers', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */
  o.order_id, o.total, c.country, c.zip
FROM Orders AS o
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;

...

This feature is backward compatible and transparent to all connectors. Existing connectors that support lookup can easily enable async|sync retry via the new join hint.

Add a follow-up issue to discuss whether to remove the 'lookup.async' option in the HBase connector.

Test Plan

Extend existing cases to cover retry scenarios.

...

Code Block
languagesql
-- Customers is backed by the JDBC connector and can be used for lookup joins
CREATE TEMPORARY TABLE Customers (
  id INT,
  name STRING,
  country STRING,
  zip STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://mysqlhost:3306/customerdb',
  'table-name' = 'customers',
  -- lookup_miss.retry-strategy
  'lookup_miss.retry-strategy' = 'fixed-delay', -- 'none' by default
  'lookup_miss.retry-strategy.fixed-delay.delay' = '10 s',
  'lookup_miss.retry-strategy.fixed-delay.max-attempts' = '3'
);

-- join operation stays the same.
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;

...