Status

Current state: Under Discussion

Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

A lookup join is typically used to enrich a table with data queried from an external system (backed by a lookup source). However, some data may not be updated in the external system in a timely manner, and users want to perform a delayed retry when they encounter such unexpectedly 'missing' data; currently there is no elegant way to implement this. Although Flink supports various joins for streams (regular joins, interval joins, temporal joins, ...), the lookup source is unfortunately often not available as a stream (e.g., Redis).

We want to support such a retryable lookup join to solve the delayed-update issue. As preparatory work for this solution, we proposed FLIP-232[1], which adds general retry support for Async I/O. We prefer to offer this retry capability via query hints, similar to the new join hints proposed in FLINK-27625[2] and FLIP-204[3].


Non-goals:

  1. Lookup sources that can be connected as streams (other types of join can be used)
  2. A fixed processing delay for all input data (other, lighter-weight approaches should be used)

References:

  1. FLIP-232: Add Retry Support For Async I/O In DataStream API
  2. FLINK-27625: Add query hint for async lookup join
  3. FLIP-204: Introduce Hash Lookup Join

Public Interfaces

New query hint: 'ASYNC_LOOKUP_RETRY'

Only the key-value (kv) form is supported:

ASYNC_LOOKUP_RETRY('table'='tableName', 'predicate'='retry-predicate', 'strategy'='retry-strategy', 'delay'='duration','max-attempts'='int')
options:
table: table name of lookup source
predicate: empty-result|exception|empty-result-or-exception
strategy: fixed-delay
delay: duration, e.g., 10s
max-attempts: integer, e.g., 3

e.g.,

ASYNC_LOOKUP_RETRY('table'='dim1', 'predicate'='empty-result', 'strategy'='fixed-delay', 'delay'='10s','max-attempts'='3')
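
The option list above could be validated along the following lines. This is only a minimal sketch, not the planner's actual code: the class name RetryHintOptions, the rule that all five options are mandatory, and the simplified duration parser (a number followed by ms, s, min or h) are assumptions made for illustration.

```java
import java.time.Duration;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

// Hypothetical holder for validated ASYNC_LOOKUP_RETRY options; not a Flink class.
final class RetryHintOptions {
    // Predicate values listed in this proposal.
    static final Set<String> PREDICATES =
            Set.of("empty-result", "exception", "empty-result-or-exception");

    final String table;
    final String predicate;
    final String strategy;
    final Duration delay;
    final int maxAttempts;

    private RetryHintOptions(
            String table, String predicate, String strategy, Duration delay, int maxAttempts) {
        this.table = table;
        this.predicate = predicate;
        this.strategy = strategy;
        this.delay = delay;
        this.maxAttempts = maxAttempts;
    }

    // Assumption: every option is required; the proposal does not define defaults.
    static RetryHintOptions fromHint(Map<String, String> kv) {
        String table = require(kv, "table");
        String predicate = require(kv, "predicate");
        if (!PREDICATES.contains(predicate)) {
            throw new IllegalArgumentException("Unsupported predicate: " + predicate);
        }
        String strategy = require(kv, "strategy");
        if (!"fixed-delay".equals(strategy)) {
            throw new IllegalArgumentException("Unsupported strategy: " + strategy);
        }
        Duration delay = parseDuration(require(kv, "delay"));
        // NumberFormatException is an IllegalArgumentException, so bad integers are rejected too.
        int maxAttempts = Integer.parseInt(require(kv, "max-attempts"));
        if (maxAttempts <= 0) {
            throw new IllegalArgumentException("max-attempts must be positive: " + maxAttempts);
        }
        return new RetryHintOptions(table, predicate, strategy, delay, maxAttempts);
    }

    private static String require(Map<String, String> kv, String key) {
        String value = kv.get(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing option: " + key);
        }
        return value.trim();
    }

    // Simplified parser for values such as "10s" or "10 s"; real duration parsing may accept more units.
    static Duration parseDuration(String text) {
        String t = text.trim().toLowerCase(Locale.ROOT);
        int i = 0;
        while (i < t.length() && Character.isDigit(t.charAt(i))) {
            i++;
        }
        if (i == 0) {
            throw new IllegalArgumentException("Invalid duration: " + text);
        }
        long amount = Long.parseLong(t.substring(0, i));
        String unit = t.substring(i).trim();
        switch (unit) {
            case "ms": return Duration.ofMillis(amount);
            case "s": return Duration.ofSeconds(amount);
            case "min": return Duration.ofMinutes(amount);
            case "h": return Duration.ofHours(amount);
            default: throw new IllegalArgumentException("Unsupported duration unit: " + text);
        }
    }
}
```

Calling RetryHintOptions.fromHint with the key-value pairs of the example hint would yield a 10-second delay and 3 attempts, while an unknown predicate or a missing option is rejected with an IllegalArgumentException.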

Since this hint covers the functionality of the 'ASYNC_LOOKUP' hint and extends it with retry capability, the kv options of 'ASYNC_LOOKUP' also work.

e.g.,

ASYNC_LOOKUP_RETRY('table'='dim1', 'output-mode'='allow-unordered', 'capacity'='100', 'timeout'='180s', 'predicate'='empty-result', 'strategy'='fixed-delay', 'delay'='10s','max-attempts'='3')


Why not merge the two hints into a single one?

  1. Adding the 'RETRY' suffix to the hint name shortens the option names (it avoids a 'retry-' prefix on every option).
    1. Compare the two hints:

    ASYNC_LOOKUP_RETRY('table'='dim1', 'predicate'='empty-result', 'strategy'='fixed-delay'...
    
    vs.
    
    ASYNC_LOOKUP('table'='dim1', 'retry-predicate'='empty-result', 'retry-strategy'='fixed-delay'...
  2. Considering option length, we may still be able to support a simple-form option for ASYNC_LOOKUP without retry later if needed, but not for the hint with retry, since it has too many options. Using two hints keeps that possibility open.

Use Case: Lookup Join Without Retry

flink example

-- enrich each order with customer information
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;

Use Case: Lookup Join With Retry

-- async retry triggered by an empty result, using a fixed-delay strategy with a 10s delay and at most 3 attempts
SELECT /*+ ASYNC_LOOKUP_RETRY('table'='Customers', 'predicate'='empty-result', 'strategy'='fixed-delay', 'delay'='10s', 'max-attempts'='3') */
  o.order_id, o.total, c.country, c.zip
FROM Orders AS o
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;

Proposed Changes

The main changes for the new hint are similar to those in FLIP-204; once FLIP-204 is done, adding a join hint will be simpler.

  1. Define the hint strategy: add 'ASYNC_LOOKUP_RETRY' and the related hint parsing for simple and kv form hint options. (Original hints propagate via FlinkLogicalJoin.)
  2. Validate the hint and convert it to an AsyncRetryStrategy in the exec lookup join node.

Since FLIP-204 is not done yet, we cannot give a complete PoC here, but the changes are manageable.
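
To illustrate the conversion step, the retry decision that the hint options imply could look roughly like the sketch below. It is a self-contained stand-in and does not use the actual AsyncRetryStrategy interface from FLIP-232; the class name FixedDelayRetrySketch, its method names, and the convention that currentAttempts starts at 1 and retries are allowed while it does not exceed max-attempts are all assumptions.

```java
import java.util.Collection;

// Stand-in for illustration only; it is not Flink's AsyncRetryStrategy.
final class FixedDelayRetrySketch {
    private final boolean retryOnEmpty;
    private final boolean retryOnException;
    private final long delayMillis;
    private final int maxAttempts;

    private FixedDelayRetrySketch(
            boolean retryOnEmpty, boolean retryOnException, long delayMillis, int maxAttempts) {
        this.retryOnEmpty = retryOnEmpty;
        this.retryOnException = retryOnException;
        this.delayMillis = delayMillis;
        this.maxAttempts = maxAttempts;
    }

    // Maps the hint's 'predicate' value onto the two retry triggers.
    static FixedDelayRetrySketch fromOptions(String predicate, long delayMillis, int maxAttempts) {
        boolean onEmpty =
                predicate.equals("empty-result") || predicate.equals("empty-result-or-exception");
        boolean onException =
                predicate.equals("exception") || predicate.equals("empty-result-or-exception");
        if (!onEmpty && !onException) {
            throw new IllegalArgumentException("Unsupported predicate: " + predicate);
        }
        return new FixedDelayRetrySketch(onEmpty, onException, delayMillis, maxAttempts);
    }

    // Assumed counting convention: currentAttempts is 1 for the first retry decision.
    boolean canRetry(int currentAttempts) {
        return currentAttempts <= maxAttempts;
    }

    // Fixed-delay strategy: the backoff is the same for every attempt.
    long backoffMillis(int currentAttempts) {
        return delayMillis;
    }

    // Decides whether a finished lookup (rows, or a failure) should be retried.
    boolean shouldRetry(Collection<?> rows, Throwable error, int currentAttempts) {
        if (!canRetry(currentAttempts)) {
            return false;
        }
        if (error != null) {
            return retryOnException;
        }
        return retryOnEmpty && (rows == null || rows.isEmpty());
    }
}
```

With the options from the retry use case ('empty-result', 10s, 3 attempts), an empty lookup result would be retried after a 10-second delay until the attempt budget is used up, whereas a lookup that fails with an exception would not be retried because the predicate does not include 'exception'.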

Compatibility, Deprecation, and Migration Plan

This feature is backward compatible and transparent to connectors. Existing connectors that implement AsyncTableFunction can easily enable async retry via the new join hint.

Test Plan

Extend existing cases to cover retry scenarios.

Rejected Alternatives

Candidate 1:

Reject Reason:
If the retry strategy is bound to the table DDL, it is impossible to use different retry strategies for two joins against the same table; moreover, the retry behavior relates more to the join operation than to the table.

Set Retry Strategy Via Table DDL's WITH Options

-- Customers is backed by the JDBC connector and can be used for lookup joins
CREATE TEMPORARY TABLE Customers (
  id INT,
  name STRING,
  country STRING,
  zip STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://mysqlhost:3306/customerdb',
  'table-name' = 'customers',
  -- lookup-miss.retry-strategy
  'lookup-miss.retry-strategy'='fixed-delay', -- 'none' by default
  'lookup-miss.retry-strategy.fixed-delay.delay' = '10 s',
  'lookup-miss.retry-strategy.fixed-delay.max-attempts' = '3'
);

-- join operation stays the same.
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;


Candidate 2:

Reject Reason:
It's hard for users to use, and queries can be overly complex when more retries are needed, even though the semantics are clean enough.

Introduce A New 'DEFER' SQL Operator

The SQL standard has no operator that can express a 'delayed retry join' or a retry operation, and it is hard to define clear semantics for such a retryable join operator as an extension.

Based on the idea that SQL operators are orthogonal and composable, a delayed-processing table-valued function (window TVF), 'DEFER', can be introduced: the input data is delayed for a declared length of time before being sent out.

DEFER(table-identifier, time-descriptor, duration)
-- table-identifier: the table name
-- time-descriptor: proctime or rowtime
-- duration: the delay duration


Use Case: Combine The DEFER and Join Operation To Simulate One Retry

If more retries are needed, the query has to change as well; a procedure (Transact-SQL) may be more suitable here.

-- ①  create views for easy reading
CREATE VIEW v_first_join AS
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
-- Note: the original inner join is changed to a left join
LEFT JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;

-- second view: rows whose join result is empty
CREATE VIEW v_first_join_empty AS
SELECT order_id, total, country FROM v_first_join WHERE zip IS NULL;

-- ② Add delayed processing and a second join (similar to a retry) for rows whose join result is empty, UNION ALL them with the non-empty results, and finally write the output downstream
INSERT INTO downstream_table
SELECT * FROM v_first_join WHERE zip IS NOT NULL

UNION ALL

SELECT o.order_id, o.total, c.country, c.zip
FROM TABLE(DEFER(v_first_join_empty, proc_time, 10 s)) AS o
-- Note: back to inner join here
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;






