Status
Current state: Under Discussion
Discussion thread: https://lists.apache.org/thread/9k1sl2519kh2n3yttwqc00p07xdfns3h
JIRA:
Released: <Flink Version>
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
A lookup join is typically used to enrich a table with data queried from an external system (backed by a lookup source). However, some data may not be updated in the external system in a timely manner, and users want to retry after a delay when they encounter such unexpectedly 'missed' data, which cannot be implemented elegantly for now. Although Flink supports various joins (regular joins, interval joins, temporal joins...) for streams, the lookup source is unfortunately often not available as a stream (e.g., Redis).
We want to support such a retryable lookup join, covering both async and sync lookups, to solve the delayed-update issue. As groundwork for this solution, we proposed FLIP-232[1], which adds general retry support for Async I/O. We prefer to offer this retry capability via query hints, similar to the new join hints proposed in FLINK-27625[2] & FLIP-204[3].
As discussed in the mailing thread, we plan to introduce a unified hint that supports both sync|async lookup and with|without retry. The support matrix will be:
lookup support | async | retry |
sync w/o retry | N | N |
sync w/ retry | N | Y |
async w/o retry | Y | N |
async w/ retry | Y | Y |
Non-goals:
- Lookup sources that can be connected as streams (these can use other types of join)
- Async lookup with retry is not suited to applying a fixed delay to all input data (use a lighter approach instead, e.g., pausing source consumption or using sync lookup with retry)
- Retry on exception is not supported (this is left to the SQL connectors)
Public Interfaces
A new query hint: 'LOOKUP' with different hint options ('async'='true|false', 'miss-retry'='true|false') to cover all related functionality (including FLINK-27625 and the discussion on the connector option 'lookup.async' in FLIP-221[4]). Compared to multiple hints with different subsets of functionality, a single hint may be easier for users to understand and use, and specific parameters can be found quickly in the documentation.
The available hint options for each mode:
mode | support hint options |
async | 'async'='true' 'output-mode'='ordered|allow-unordered' 'capacity'='100' 'timeout'='180s' |
retry | 'miss-retry'='true' 'retry-strategy'='fixed-delay' 'delay'='10s' 'max-attempts'='3' |
For connectors that can support both async and sync lookup, we advise connector developers to implement both the sync and async interfaces if both capabilities have suitable use cases; the planner will prefer the async one by default, and users can pass the option value 'async'='true|false' via the LOOKUP query hint as a suggestion to the planner. Otherwise, choose one interface to implement.
Because query hints work in a best-effort manner, if a user specifies a hint with an invalid option, the query plan stays unchanged. E.g., with LOOKUP('table'='customer', 'async'='true') where the backing lookup source implements only the sync lookup function, the async lookup hint takes no effect.
1. Sync and Async Mode Lookup
1.1 Sync Lookup
If the connector supports both async and sync lookup, users can set the option value 'async'='false' to suggest that the planner use the sync lookup:
LOOKUP('table'='dim1', 'async'='false')
1.2 Async Lookup
If the connector supports both async and sync lookup, users can set the option value 'async'='true' to suggest that the planner use the async lookup:
LOOKUP('table'='dim1', 'async'='true')
Async lookup related parameters can also be configured via hint options (this covers the join-level configuration requirement proposed in FLINK-27625). All key-value hint options except the table name are optional (the job-level configuration is used if they are not set):
LOOKUP('table'='dim1', 'async'='true', 'output-mode'='allow-unordered', 'capacity'='100', 'timeout'='180s')
e.g., if the job level configuration is:
table.exec.async-lookup.output-mode: ORDERED
table.exec.async-lookup.buffer-capacity: 100
table.exec.async-lookup.timeout: 180s
then the following hints:
1. LOOKUP('table'='dim1', 'async'='true', 'output-mode'='allow-unordered')
2. LOOKUP('table'='dim1', 'async'='true', 'timeout'='300s')
are equivalent to:
1. LOOKUP('table'='dim1', 'async'='true', 'output-mode'='allow-unordered', 'capacity'='100', 'timeout'='180s')
2. LOOKUP('table'='dim1', 'async'='true', 'output-mode'='ordered', 'capacity'='100', 'timeout'='300s')
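The fallback from hint options to job-level configuration described above can be sketched as a simple map merge. This is only an illustration, not the planner's actual code: the class `AsyncLookupOptionResolver` and its `resolve` method are hypothetical names, and the job-level values are assumed to have already been normalized to the hint option keys ('output-mode', 'capacity', 'timeout').

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper (not part of Flink): overlays LOOKUP hint options on job-level defaults. */
final class AsyncLookupOptionResolver {

    // Async options that a LOOKUP hint may override, as listed in this proposal.
    private static final String[] ASYNC_KEYS = {"output-mode", "capacity", "timeout"};

    /**
     * Returns a copy of the job-level defaults in which every async option
     * present in the hint replaces the corresponding default.
     * Assumption: jobDefaults already uses the hint option keys.
     */
    static Map<String, String> resolve(
            Map<String, String> jobDefaults, Map<String, String> hintOptions) {
        Map<String, String> effective = new HashMap<>(jobDefaults);
        for (String key : ASYNC_KEYS) {
            String value = hintOptions.get(key);
            if (value != null) {
                effective.put(key, value);
            }
        }
        return effective;
    }
}
```

With the job-level configuration from the example, a hint that only sets 'timeout'='300s' would resolve to 'output-mode'='ordered', 'capacity'='100', 'timeout'='300s'.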
2. Retry Support
The hint option 'miss-retry'='true' can enable retry on both sync and async lookup. Retry related hint options:
'retry-strategy'='fixed-delay' 'delay'='10s' 'max-attempts'='3'
e.g., retry on async lookup:
LOOKUP('table'='dim1', 'async'='true', 'miss-retry'='true', 'retry-strategy'='fixed-delay', 'delay'='10s', 'max-attempts'='3')
retry on sync lookup:
LOOKUP('table'='dim1', 'async'='false', 'miss-retry'='true', 'retry-strategy'='fixed-delay', 'delay'='10s', 'max-attempts'='3')
If the lookup source has only one capability, the 'async' mode option can be omitted:
LOOKUP('table'='dim1', 'miss-retry'='true', 'retry-strategy'='fixed-delay', 'delay'='10s', 'max-attempts'='3')
For the retry strategy, we plan to support a fixed-delay retry strategy first, and this can be extended in the future.
Use Case: Lookup Join Without Retry
-- enrich each order with customer information
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;
Use Case: Lookup Join With Retry
-- retry triggered by empty result, using 10s fixed-delay strategy, max attempts 3.
SELECT /*+ LOOKUP('table'='Customers', 'miss-retry'='true', 'retry-strategy'='fixed-delay', 'delay'='10s', 'max-attempts'='3') */
  o.order_id, o.total, c.country, c.zip
FROM Orders AS o
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;
In the above example, the backing lookup source of table 'Customers' is JDBC, so the retry takes effect on the sync lookup.
Proposed Changes
The main changes for the new hint are similar to those in FLIP-204; once FLIP-204 is done, adding a join hint will be simpler.
- Define the hint strategy: add 'LOOKUP' and the related parsing of key-value hint options. (Original hints propagate via FlinkLogicalJoin.)
- Validate the hint and convert it to the table module's internal RetryStrategy in the exec lookup join node.
Since FLIP-204 is not done yet, we cannot give a complete PoC here, but the scope of the changes is well contained.
1. Async Retry
Convert the parsed internal RetryStrategy into the runtime's AsyncRetryStrategy, and use the new API with retry support proposed in FLIP-232[1].
2. Sync Retry
Add a sync retry implementation to the internal LookupJoinRunner.
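A minimal sketch of what a fixed-delay retry on a lookup miss could look like for the sync path. It is not the LookupJoinRunner implementation: the class `SyncRetryingLookup` and its method are hypothetical, the lookup is modeled as a plain `Function`, and 'max-attempts' is assumed to count the initial lookup as the first attempt (the proposal does not spell this out).

```java
import java.util.Collection;
import java.util.Collections;
import java.util.function.Function;

/** Hypothetical sketch (not Flink code): fixed-delay retry when a sync lookup returns nothing. */
final class SyncRetryingLookup {

    /**
     * Calls the lookup, and while the result is empty repeats it after a fixed delay.
     * Assumption: maxAttempts includes the first lookup, so maxAttempts = 3 means
     * at most two retries. Exceptions thrown by the lookup are not retried.
     */
    static <K, V> Collection<V> lookupWithRetry(
            Function<K, Collection<V>> lookup, K key, int maxAttempts, long delayMillis) {
        Collection<V> result = lookup.apply(key);
        int attempts = 1;
        while (isMiss(result) && attempts < maxAttempts) {
            try {
                // Blocks the calling thread, which is what delays processing in sync mode.
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break; // stop retrying if the task is being cancelled
            }
            result = lookup.apply(key);
            attempts++;
        }
        return result == null ? Collections.<V>emptyList() : result;
    }

    private static <V> boolean isMiss(Collection<V> result) {
        return result == null || result.isEmpty();
    }
}
```

Because the sleep blocks the calling thread, a sync retry delays all records behind the missed one, which is why it behaves differently from the async retry in FLIP-232.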
3. Prefer Async Lookup Function For The Planner
If the connector supports both async and sync lookup, the planner will prefer the async one when no query hint is given. If the user gives the option value explicitly, 'async'='false' will use the sync lookup and 'async'='true' will use the async lookup.
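The selection rule above, combined with the best-effort behavior of query hints, can be sketched as follows. The class `LookupModeChooser` and its `Mode` enum are hypothetical and only illustrate the decision; the hint is modeled as a nullable `Boolean`, where null means no 'async' option was given.

```java
/** Hypothetical sketch (not planner code): choosing between sync and async lookup. */
final class LookupModeChooser {

    enum Mode { SYNC, ASYNC }

    /**
     * Picks the lookup mode from the connector's capabilities and the optional hint.
     * A hint asking for a mode the connector does not implement is ignored
     * (best effort), and the default choice applies: async first, then sync.
     */
    static Mode choose(boolean supportsSync, boolean supportsAsync, Boolean asyncHint) {
        if (!supportsSync && !supportsAsync) {
            throw new IllegalArgumentException("The table is not a lookup source");
        }
        if (asyncHint != null) {
            if (asyncHint && supportsAsync) {
                return Mode.ASYNC;
            }
            if (!asyncHint && supportsSync) {
                return Mode.SYNC;
            }
            // Requested mode is unavailable: fall through to the default.
        }
        return supportsAsync ? Mode.ASYNC : Mode.SYNC;
    }
}
```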
Compatibility, Deprecation, and Migration Plan
This feature is backwards compatible and transparent to all connectors. Existing connectors that support lookup can easily enable async|sync retry via the new join hint.
Add a follow-up issue to discuss whether to remove the 'lookup.async' option in the HBase connector.
Test Plan
Extend existing cases to cover retry scenarios.
Rejected Alternatives
Candidate 1: Set Retry Strategy Via Table DDL's With Options
-- Customers is backed by the JDBC connector and can be used for lookup joins
CREATE TEMPORARY TABLE Customers (
  id INT,
  name STRING,
  country STRING,
  zip STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://mysqlhost:3306/customerdb',
  'table-name' = 'customers',
  -- lookup-miss.retry-strategy
  'lookup-miss.retry-strategy' = 'fixed-delay', -- 'none' by default
  'lookup-miss.retry-strategy.fixed-delay.delay' = '10 s',
  'lookup-miss.retry-strategy.fixed-delay.max-attempts' = '3'
);
-- join operation stays the same.
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;
Reject Reason:
It is impossible to use different retry strategies for two joins against the same table if the retry strategy is bound to the table DDL, and the retry behavior relates more to the join operation than to the table.
Candidate 2: Introduce A New 'DEFER' SQL Operator
The SQL standard has no operator that can express a 'delayed retry join' or a retry operation, and it is hard to define clear semantics for such a retryable join operator.
Based on the idea that SQL operators are orthogonal and composable, a delayed-processing table-valued function (window TVF) 'DEFER' can be introduced: the input data is delayed for a declared length of time before being sent out.
DEFER(table-identifier, time-descriptor, duration)
-- table-identifier: specify the table name
-- time-descriptor: proctime or rowtime
-- duration: delayed duration
Use Case: Combine The DEFER and Join Operation To Simulate One Retry
If more retries are needed, the query has to change as well; a procedure (Transact-SQL) may be more suitable here.
-- ① create views for easy reading
CREATE VIEW v_first_join AS
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
  -- Note: the original inner join changes to a left join
  LEFT JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;
-- view2: rows whose join result is empty
CREATE VIEW v_first_join_empty AS
SELECT order_id, total, country
FROM v_first_join
WHERE zip IS NULL;
-- ② add delayed processing and a join operation (similar to a retry) for rows whose join result is empty,
-- UNION ALL with the non-empty results, and finally output to downstream
INSERT INTO downstream_table
SELECT * FROM v_first_join WHERE zip IS NOT NULL
UNION ALL
SELECT o.order_id, o.total, c.country, c.zip
FROM TABLE(DEFER(v_first_join_empty, proc_time, 10 s)) AS o
  -- Note: back to inner join here
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;
Reject Reason:
It's hard for users to use, and queries can be overly complex when more retries are needed, even though the semantics are clean enough.
References:
1. FLIP-232: Add Retry Support For Async I/O In DataStream API
2. FLINK-27625: Add query hint for async lookup join
3. FLIP-204: Introduce Hash Lookup Join
4. https://lists.apache.org/thread/1vokqdnnt01yycl7y1p74g556cc8yvtq