
Discussion thread: https://lists.apache.org/thread/pd9gzslq20dtzzfphxqvwhc43hrzo2y1
Vote thread: https://lists.apache.org/thread/xc0c4zsq4qlyot4vcbdxznr9ocnd0pm0
JIRA: FLINK-34219

Release: 1.19

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, cascading joins in Flink suffer from record amplification. This FLIP aims to solve this problem effectively.

As mentioned in the discussion thread, record amplification becomes pronounced when Flink performs outer joins across 10 tables. A single update to the first table triggers an exponential increase in the number of records. Specifically, the first join emits two records: a delete and an insert. Each subsequent join doubles the number of records it receives, so the second join emits four records. This pattern continues until the last join emits 2^10 = 1024 records. A diagram illustrating the record amplification is shown below. The cumulative effect of this amplification can significantly degrade system performance and sharply increase resource consumption.
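The arithmetic above can be checked with a short sketch. It only assumes, as the paragraph does, that every cascaded join doubles the records it receives, so join n emits 2^n records for one upstream update; it models record counts, not actual join semantics, and the class name is illustrative.

```java
// Sketch: records emitted by each cascaded outer join for a single update,
// assuming every join stage doubles the number of records it receives.
public class RecordAmplification {

    /** Records emitted by the join at position {@code stage} (1-based). */
    static long recordsAfterJoin(int stage) {
        // The first join emits 2 records (a delete and an insert); each
        // further join doubles its input, so join n emits 2^n records.
        return 1L << stage;
    }

    public static void main(String[] args) {
        for (int stage = 1; stage <= 10; stage++) {
            System.out.printf("join %2d -> %4d records%n", stage, recordsAfterJoin(stage));
        }
    }
}
```

The last line printed is `join 10 -> 1024 records`, matching the figure quoted above.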

Public Interfaces

A new option to control mini-batch join is added:

Key                                          Default   Type
table.exec.stream.join.mini-batch-enabled    false     boolean

There are no other changes to public interfaces.

In addition, three existing mini-batch options must also be set for mini-batch join to work. An example configuration follows; see options for more details about these three options.

table.exec.stream.join.mini-batch-enabled: true
table.exec.mini-batch.enabled: true
table.exec.mini-batch.allow-latency: 2s
table.exec.mini-batch.size: 50000
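The requirement that all four settings be present can be expressed as a simple check. The sketch below is a hypothetical helper, not part of Flink: it treats the keys above as a plain string map, requires both boolean switches to be true, and requires a latency value and a positive batch size. It does not parse the latency duration and does not reproduce how Flink actually validates these options.

```java
import java.util.Map;

// Hypothetical helper, not a Flink API: decides whether a configuration map
// contains everything this FLIP says mini-batch join needs.
public class MiniBatchJoinConfigCheck {

    static boolean miniBatchJoinEffective(Map<String, String> conf) {
        // Both switches must be explicitly true.
        boolean joinSwitch = Boolean.parseBoolean(
            conf.getOrDefault("table.exec.stream.join.mini-batch-enabled", "false"));
        boolean miniBatchSwitch = Boolean.parseBoolean(
            conf.getOrDefault("table.exec.mini-batch.enabled", "false"));
        // Simplification: only checks that a latency value is present,
        // without parsing the duration string (e.g. "2s").
        String latency = conf.get("table.exec.mini-batch.allow-latency");
        boolean hasLatency = latency != null && !latency.isBlank();
        // The batch size must be a positive number; missing or invalid counts as unset.
        boolean positiveSize;
        try {
            positiveSize = Long.parseLong(conf.getOrDefault("table.exec.mini-batch.size", "-1")) > 0;
        } catch (NumberFormatException e) {
            positiveSize = false;
        }
        return joinSwitch && miniBatchSwitch && hasLatency && positiveSize;
    }

    public static void main(String[] args) {
        Map<String, String> conf = Map.of(
            "table.exec.stream.join.mini-batch-enabled", "true",
            "table.exec.mini-batch.enabled", "true",
            "table.exec.mini-batch.allow-latency", "2s",
            "table.exec.mini-batch.size", "50000");
        System.out.println(miniBatchJoinEffective(conf)); // prints true
    }
}
```

Setting only `table.exec.stream.join.mini-batch-enabled` makes the check return false, reflecting that the new option alone does not enable mini-batch join.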

The reason for not reusing 'table.exec.mini-batch.enabled' to control this optimization is that, in the current version, this option does not affect regular joins. If it were reused, a job upgraded to the new version with this option already enabled would switch to mini-batch join, which could alter the join behavior because the complete changelog would no longer be emitted. That would deviate from the expected behavior in scenarios where correctness depends on a complete changelog.

Proposed Changes

A new operator called MiniBatchStreamingJoinOperator, which inherits from StreamingJoinOperator, is introduced. It supports four join types: left join, right join, full join, and inner join; semi and anti joins are not supported. It uses a block of memory as a mini-batch to store records from the left and right input streams, similar to HeapBufferedBundle for the GroupAggregate operation. The records in the mini-batch are processed when a trigger fires, either because the mini-batch size is reached or because a watermark arrives.
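The buffering described above can be illustrated with a small sketch. It assumes records from both inputs are collected into in-memory lists and handed to a join routine when either the combined count reaches a configured size or a watermark arrives. The class, its methods, and the callback are hypothetical and do not reproduce Flink's MiniBatchStreamingJoinOperator or HeapBufferedBundle; the actual join logic and the optimizations described in the Appendix are left to the callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Minimal sketch of the buffering idea behind a mini-batch join operator.
// Hypothetical names; not Flink's MiniBatchStreamingJoinOperator.
public class MiniBatchJoinBuffer<L, R> {

    private final int maxBatchSize;
    private final List<L> leftBuffer = new ArrayList<>();
    private final List<R> rightBuffer = new ArrayList<>();
    // Receives the buffered left and right records once per flush.
    private final BiConsumer<List<L>, List<R>> processBatch;

    public MiniBatchJoinBuffer(int maxBatchSize, BiConsumer<List<L>, List<R>> processBatch) {
        this.maxBatchSize = maxBatchSize;
        this.processBatch = processBatch;
    }

    public void addLeft(L record) {
        leftBuffer.add(record);
        flushIfFull();
    }

    public void addRight(R record) {
        rightBuffer.add(record);
        flushIfFull();
    }

    /** Watermark trigger: flush whatever is buffered. */
    public void onWatermark() {
        flush();
    }

    /** Size trigger: flush once the combined buffer reaches the configured size. */
    private void flushIfFull() {
        if (leftBuffer.size() + rightBuffer.size() >= maxBatchSize) {
            flush();
        }
    }

    private void flush() {
        if (leftBuffer.isEmpty() && rightBuffer.isEmpty()) {
            return;
        }
        // Hand over copies so the buffers can be cleared immediately.
        processBatch.accept(new ArrayList<>(leftBuffer), new ArrayList<>(rightBuffer));
        leftBuffer.clear();
        rightBuffer.clear();
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        MiniBatchJoinBuffer<String, String> buffer =
            new MiniBatchJoinBuffer<>(3, (l, r) -> log.add(l + " | " + r));
        buffer.addLeft("a1");
        buffer.addRight("b1");
        buffer.addLeft("a2");   // third buffered record: size trigger fires
        buffer.addRight("b2");
        buffer.onWatermark();   // watermark flushes the remainder
        log.forEach(System.out::println);
    }
}
```

Firing on either condition bounds memory use through the batch size and bounds added latency through the watermark interval, which mirrors the two trigger paths named in the paragraph above.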

The following is the UML diagram of MiniBatchStreamingJoinOperator.

[Figure: UML diagram of MiniBatchStreamingJoinOperator]

The three optimization points and their corresponding scenarios are explained in the Appendix, which also lists the Nexmark metrics. Details of the POC can be found under Implementation.

Compatibility, Deprecation, and Migration Plan

...