Discussion thread: https://lists.apache.org/thread/pd9gzslq20dtzzfphxqvwhc43hrzo2y1
Vote thread: https://lists.apache.org/thread/xc0c4zsq4qlyot4vcbdxznr9ocnd0pm0
JIRA: FLINK-34219
Release: 1.19

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, cascaded joins in Flink suffer from record amplification. This FLIP aims to solve this problem.

As mentioned in the discussion, record amplification is especially pronounced when Flink performs cascaded outer joins across 10 tables. An update to the first table triggers an exponential increase in the number of records. Specifically, the first join operation generates two records: a delete and an insert. Each subsequent join operation doubles the number of incoming records, so the second operation yields four records. This pattern continues until the last join operation generates 2^10, that is, 1024 records. A diagram illustrating the record amplification is shown below. This cumulative amplification can significantly degrade system performance and sharply increase resource consumption.
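The doubling described above can be checked with a few lines of arithmetic. The following is a minimal sketch, not part of the FLIP: the function name `records_per_join` and the `fanout` parameter are hypothetical, and it assumes that every join in the cascade turns each incoming record into exactly two outgoing records (a delete and an insert) across ten join operations.

```python
def records_per_join(num_joins, fanout=2):
    """Return the number of records emitted by each join in a cascade.

    Assumption: every incoming record becomes `fanout` outgoing records
    (one delete plus one insert), as in the outer-join scenario above.
    """
    counts = []
    records = 1  # a single update arriving at the first table
    for _ in range(num_joins):
        records *= fanout  # each join doubles what it receives
        counts.append(records)
    return counts


counts = records_per_join(10)
print(counts)      # records emitted by join 1, join 2, ..., join 10
print(counts[-1])  # records emitted by the last join
```

Under these assumptions the last entry equals 2^10 = 1024, matching the figure quoted in the text. Real workloads may amplify less, since not every record necessarily produces a delete/insert pair at every join.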

Public Interfaces

A new option to control minibatch join is added.

...

Key                                         Default   Type
table.exec.stream.join.mini-batch-enabled   false     boolean

There are no other changes to public interfaces.

In addition, three other options must be set for minibatch join to work, as in the following example. Go to options for more details about these three options.

table.exec.stream.join.mini-batch-enabled: true
table.exec.mini-batch.enabled: true
table.exec.mini-batch.allow-latency: 2s
table.exec.mini-batch.size: 50000

...

Proposed Changes

A new operator called MiniBatchStreamingJoinOperator, which inherits from StreamingJoinOperator, is introduced. It currently supports four types of joins: left join, right join, full join and inner join. Semi and anti joins are not supported.

...