Page properties

...


Discussion thread: https://...bb0kqjs8co3hhmtklmwptws4fc4rz810
JIRA: FLINK-28779 (ASF JIRA)

Release: 1.16


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

We want to support retryable lookup joins, covering both async and sync lookups, to solve the delayed-update issue. As preparatory work for this solution, we proposed FLIP-232[1], which adds general retry support for Async I/O. We prefer to offer this retry capability via query hints, similar to the new join hints proposed in FLINK-27625[2] and FLIP-204[3].

As discussed in the mailing list thread, we plan to introduce a unified hint that supports both sync and async lookup, with or without retry. The support matrix is:

lookup support | async | retry
sync w/o retry | N | N
sync w/ retry | N | Y
async w/o retry | Y | N
async w/ retry | Y | Y


Non-goals:

  1. Lookup sources that can be connected as streams (these can use other types of join).
  2. Async lookup with retry is not meant to provide a fixed processing delay for all input data (use a lighter-weight approach instead, e.g., delaying source consumption or using sync lookup with retry).
  3. Retry on exception is not supported (the SQL connectors should handle it).

Public Interfaces


A new query hint, 'LOOKUP', with different hint options ('async'='true|false', 'miss-retry'='true|false') covers all related functionality (including FLINK-27625 and the discussion on the connector option 'lookup.async' in FLIP-221[4]). Compared to multiple hints that each cover a different subset of the functionality, a single hint may be easier for users to understand and use, and specific parameters can be found quickly in the documentation.

The available hint options:

option type | option name | optional | value type | default value | description
table name | table | N | string | N/A | the table name of the lookup source
async | async | Y | boolean | N/A | value can be 'true' or 'false' to suggest that the planner choose the corresponding lookup function. If the backend lookup source does not support the suggested lookup mode, it takes no effect.
async | output-mode | Y | string | ordered | value can be 'ordered' or 'allow_unordered'. 'allow_unordered' means that if users allow unordered results, the planner will attempt to use AsyncDataStream.OutputMode.UNORDERED when it does not affect the correctness of the result; otherwise ORDERED is still used. This is consistent with `ExecutionConfigOptions#TABLE_EXEC_ASYNC_LOOKUP_OUTPUT_MODE`.
async | capacity | Y | integer | 100 | the buffer capacity for the backend asyncWaitOperator of the lookup join operator.
async | timeout | Y | duration | 300s | timeout from the first invocation to the final completion of the asynchronous operation; it may include multiple retries and is reset in case of failover.
retry | retry-predicate | Y | string | N/A | can be 'lookup_miss', which enables retry if the lookup result is empty.
retry | retry-strategy | Y | string | N/A | can be 'fixed_delay'
retry | fixed-delay | Y | duration | N/A | delay time for the 'fixed_delay' strategy
retry | max-attempts | Y | integer | N/A | max attempt number of the 'fixed_delay' strategy

For connectors that can support both async and sync lookup, our advice to connector developers is to implement both the sync and async interfaces if both capabilities have suitable use cases, and otherwise to choose one interface to implement. When both are implemented, the planner prefers the async one by default, and users can pass 'async'='true|false' via the LOOKUP query hint to suggest a mode to the planner.

Because query hints work in a best-effort manner, if a user specifies a hint option that cannot be applied, the query plan stays unchanged. For example, with LOOKUP('table'='customer', 'async'='true'), if the backend lookup source only implements the sync lookup function, the async lookup hint takes no effect.

1. Sync and Async Mode Lookup

1.1 Sync Lookup

If the connector has both capabilities of async and sync lookup, users can give the option value 'async'='false' to suggest the planner to use the sync lookup:

Code Block
languagesql
LOOKUP('table'='dim1', 'async'='false')

1.2 Async Lookup

If the connector has both capabilities of async and sync lookup, users can give the option value 'async'='true' to suggest the planner to use the async lookup:

Code Block
languagesql
LOOKUP('table'='dim1', 'async'='true')

Async lookup parameters can also be configured via hint options (this covers the join-level configuration requirement proposed in FLINK-27625). All kv hint options except the table name are optional (the job-level configuration is used if an option is not set):

Code Block
languagesql
LOOKUP('table'='dim1', 'async'='true', 'output-mode'='allow_unordered', 'capacity'='100', 'timeout'='180s')

e.g., if the job level configuration is:

Code Block
languagesql
table.exec.async-lookup.output-mode: ORDERED
table.exec.async-lookup.buffer-capacity: 100
table.exec.async-lookup.timeout: 180s


then the following hints:

Code Block
languagesql
1. LOOKUP('table'='dim1', 'async'='true', 'output-mode'='allow_unordered')
2. LOOKUP('table'='dim1', 'async'='true', 'timeout'='300s')

are equivalent to:

Code Block
languagesql
1. LOOKUP('table'='dim1', 'async'='true', 'output-mode'='allow_unordered', 'capacity'='100', 'timeout'='180s')
2. LOOKUP('table'='dim1', 'async'='true', 'output-mode'='ordered', 'capacity'='100', 'timeout'='300s')
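
The fallback from hint options to the job-level configuration can be sketched in Java as follows. This is only an illustration under stated assumptions, not the planner's code: the class `AsyncLookupOptionsSketch`, its fields, and the helpers `resolve` and `parseSeconds` are hypothetical names, and the duration parser only understands whole-second values such as '180s'.

```java
import java.time.Duration;
import java.util.Map;

/** Hypothetical holder for the three async lookup settings discussed above. */
final class AsyncLookupOptionsSketch {
    final String outputMode;
    final int capacity;
    final Duration timeout;

    AsyncLookupOptionsSketch(String outputMode, int capacity, Duration timeout) {
        this.outputMode = outputMode;
        this.capacity = capacity;
        this.timeout = timeout;
    }

    /** Hint options win; anything the hint leaves out falls back to the job-level value. */
    static AsyncLookupOptionsSketch resolve(
            Map<String, String> hintOptions, AsyncLookupOptionsSketch jobLevel) {
        String outputMode = hintOptions.getOrDefault("output-mode", jobLevel.outputMode);
        int capacity = hintOptions.containsKey("capacity")
                ? Integer.parseInt(hintOptions.get("capacity"))
                : jobLevel.capacity;
        Duration timeout = hintOptions.containsKey("timeout")
                ? parseSeconds(hintOptions.get("timeout"))
                : jobLevel.timeout;
        return new AsyncLookupOptionsSketch(outputMode, capacity, timeout);
    }

    /** Simplified parser (assumption): accepts only whole seconds such as "180s". */
    static Duration parseSeconds(String value) {
        if (!value.endsWith("s")) {
            throw new IllegalArgumentException("expected a value like '180s': " + value);
        }
        return Duration.ofSeconds(Long.parseLong(value.substring(0, value.length() - 1)));
    }
}
```

With job-level values of ordered, 100 and 180s, resolving a hint that only sets 'timeout'='300s' yields ordered, 100 and 300s, which mirrors the second equivalence in the example.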

2. Retry Support

The hint option 'retry-predicate'='lookup_miss' enables retry on both sync and async lookup. Retry-related hint options:

Code Block
languagesql
'retry-strategy'='fixed_delay',
'fixed-delay'='10s',
'max-attempts'='3'

...

e.g., retry on async lookup

Code Block
languagesql
LOOKUP('table'='dim1', 'async'='true', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s','max-attempts'='3')

retry on sync lookup

Code Block
languagesql
LOOKUP('table'='dim1', 'async'='false', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s','max-attempts'='3')

If the lookup source only has one capability, then the 'async' mode option can be omitted:

Code Block
languagesql
LOOKUP('table'='dim1', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s','max-attempts'='3')

For the retry strategy, we plan to support a fixed_delay retry strategy first, and this can be extended in the future.

Use Case: Lookup Join Without Retry

...

Use Case: Lookup Join With Retry

Code Block
languagesql
-- retry triggered by a lookup miss (empty result), using a 10s fixed_delay strategy, max attempts 3.
SELECT /*+ LOOKUP('table'='Customers', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */
  o.order_id, o.total, c.country, c.zip
FROM Orders AS o
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;

In the above example, the backend lookup source of table 'Customers' is JDBC, so the retry takes effect on the sync lookup.

Proposed Changes

The main change for the new hint is similar to FLIP-204, and once FLIP-204 is done, adding a join hint will be simpler.

  1. Define the hint strategy: add 'LOOKUP' and the related hint parsing for simple and kv-form hint options. (Original hints propagate via FlinkLogicalJoin.)
  2. Validate the hint and convert it to the table's internal RetryStrategy in the exec lookup join node.

Since FLIP-204 is not done yet, we cannot give a complete PoC here, but the changes are manageable.

1. Async Retry

Convert the parsed internal RetryStrategy into the runtime's AsyncRetryStrategy, and use the new API with retry support proposed in FLIP-232[1].
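
To illustrate the behaviour that a fixed_delay strategy with a lookup_miss predicate is expected to produce on the async path, here is a minimal self-contained sketch built on the JDK's CompletableFuture. It does not use the FLIP-232 API or any Flink class: `AsyncLookupRetrySketch`, `lookupWithRetry` and the `timer` parameter are hypothetical names, the overall 'timeout' option is not modelled, and exceptions are passed through without retry.

```java
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/** Hypothetical sketch: retry an async lookup after a fixed delay while the result is empty. */
final class AsyncLookupRetrySketch {

    static <K, R> CompletableFuture<Collection<R>> lookupWithRetry(
            Function<K, CompletableFuture<Collection<R>>> asyncLookup,
            K key,
            int remainingRetries,
            long fixedDelayMillis,
            ScheduledExecutorService timer) {
        return asyncLookup.apply(key).thenCompose(rows -> {
            boolean lookupMiss = rows == null || rows.isEmpty();
            if (!lookupMiss || remainingRetries <= 0) {
                // Either a hit, or the retry budget is used up: emit what we have.
                return CompletableFuture.completedFuture(rows);
            }
            CompletableFuture<Collection<R>> retried = new CompletableFuture<>();
            // Schedule the next attempt instead of blocking the calling thread.
            timer.schedule(() -> {
                lookupWithRetry(asyncLookup, key, remainingRetries - 1, fixedDelayMillis, timer)
                        .whenComplete((result, error) -> {
                            if (error != null) {
                                retried.completeExceptionally(error);
                            } else {
                                retried.complete(result);
                            }
                        });
            }, fixedDelayMillis, TimeUnit.MILLISECONDS);
            return retried;
        });
    }
}
```

The sketch treats its `remainingRetries` argument as the number of retries after the first lookup; whether 'max-attempts' counts the first lookup is an assumption here, not something this proposal specifies.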

2. Sync Retry

Add a sync retry implementation for the internal LookupJoinRunner.
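
A sync retry loop of this kind could look roughly like the sketch below. It is not the LookupJoinRunner code: `SyncLookupRetrySketch` and `lookupWithRetry` are hypothetical names, the lookup is modelled as a plain `Function`, and `maxRetries` is assumed to count retries after the first lookup.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.function.Function;

/** Hypothetical sketch: blocking lookup that retries after a fixed delay on an empty result. */
final class SyncLookupRetrySketch {

    static <K, R> Collection<R> lookupWithRetry(
            Function<K, Collection<R>> lookup, K key, int maxRetries, long fixedDelayMillis) {
        Collection<R> rows = lookup.apply(key);
        int retries = 0;
        // 'lookup_miss' predicate: only an empty result triggers a retry, exceptions do not.
        while ((rows == null || rows.isEmpty()) && retries < maxRetries) {
            try {
                Thread.sleep(fixedDelayMillis); // 'fixed_delay' strategy
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt flag and stop retrying
                break;
            }
            rows = lookup.apply(key);
            retries++;
        }
        return rows == null ? Collections.emptyList() : rows;
    }
}
```

Because the loop sleeps on the calling thread, every miss delays the records behind it, which is why a sync retry suits cases where blocking the input is acceptable.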

3. Prefer Async Lookup Function For The Planner

If the connector supports both async and sync lookup, the planner prefers the async one when no query hint is given. If the user gives the option value explicitly, 'async'='false' selects the sync lookup and 'async'='true' selects the async lookup.
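
The selection rule, together with the best-effort behaviour of the hint, can be summarised in a small decision function. This is a sketch under assumptions, not planner code: `LookupModeSketch`, `Mode` and `choose` are hypothetical names, and a `null` hint value stands for "no 'async' option given".

```java
/** Hypothetical sketch of how the lookup mode could be chosen. */
final class LookupModeSketch {

    enum Mode { SYNC, ASYNC }

    /**
     * @param supportsSync  the connector implements the sync lookup function
     * @param supportsAsync the connector implements the async lookup function
     * @param asyncHint     value of the 'async' hint option, or null if not specified
     */
    static Mode choose(boolean supportsSync, boolean supportsAsync, Boolean asyncHint) {
        if (!supportsSync && !supportsAsync) {
            throw new IllegalArgumentException("the table is not a lookup source");
        }
        if (supportsSync && supportsAsync) {
            if (asyncHint == null) {
                return Mode.ASYNC; // planner default when no hint is given
            }
            return asyncHint ? Mode.ASYNC : Mode.SYNC;
        }
        // Only one capability exists: the hint is best effort and takes no effect.
        return supportsAsync ? Mode.ASYNC : Mode.SYNC;
    }
}
```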

Compatibility, Deprecation, and Migration Plan

This feature is backward compatible and transparent to all connectors. Existing connectors that support lookup can easily enable async or sync retry via the new query hint.

Add a follow-up issue to discuss whether to remove the 'lookup.async' option in the HBase connector.

Test Plan

Extend existing cases to cover retry scenarios.

...

Code Block
languagesql
-- Customers is backed by the JDBC connector and can be used for lookup joins
CREATE TEMPORARY TABLE Customers (
  id INT,
  name STRING,
  country STRING,
  zip STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://mysqlhost:3306/customerdb',
  'table-name' = 'customers',
  -- lookup_miss.retry-strategy
  'lookup_miss.retry-strategy'='fixed-delay', -- 'none' by default
  'lookup_miss.retry-strategy.fixed-delay.delay' = '10 s',
  'lookup_miss.retry-strategy.fixed-delay.max-attempts' = '3'
);

-- join operation stays the same.
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;

...

Code Block
languagesql
-- ① create views for easy reading
CREATE VIEW v_first_join AS
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
-- Note: the original inner join is changed to a left join
LEFT JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;

-- second view: rows whose join result is empty
CREATE VIEW v_first_join_empty AS
SELECT order_id, total, country FROM v_first_join WHERE zip IS NULL;

-- ② add delayed processing and a second join (similar to retry) for rows whose join result is empty,
--    UNION ALL them with the non-empty results, and output to downstream
INSERT INTO downstream_table
SELECT * FROM v_first_join WHERE zip IS NOT NULL

UNION ALL

SELECT o.order_id, o.total, c.country, c.zip
FROM TABLE(DEFER(v_first_join_empty, proc_time, 10 s)) AS o
-- Note: back to inner join here
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;


References:

1. FLIP-232: Add Retry Support For Async I/O In DataStream API

2. FLINK-27625: Add query hint for async lookup join

3. FLIP-204: Introduce Hash Lookup Join

4. Discussion on the connector option 'lookup.async' in FLIP-221: https://lists.apache.org/thread/1vokqdnnt01yycl7y1p74g556cc8yvtq