You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 3 Next »

Motivation

Currently, when performing cascading connections in Flink, there is a pain point of record amplification. This FLIP aims to effectively solve this problem .

As mentioned in discussion,  record amplification would be quite pronounced when using Flink to perform join operations with outer joins on 10 tables.  An update to the first table will trigger an exponential increase in the number of records. Specifically, the first join operation will generate two records: a delete and a insert. Subsequent join operations double the number of incoming records, so that the second operation yields four records. This pattern continues until the last join operation generates 2^10 which is 1024 records. A diagram illustrating the record amplification is shown as below. This cumulative effect of record amplification can lead to a significant decrease in system performance and a sharp increase in resource consumption.

Public Interfaces

An new option to control minibatch join is added.

Key

Default

Type

table.exec.stream.join.mini-batch-enabled

false

boolean

In addition,  other three options is required to set to make minibatch join work. The example is as following. Go to options for more details about other three options .

table.exec.stream.join.mini-batch-enabled: true
table.exec.mini-batch.enabled: true
table.exec.mini-batch.allow-latency: 2s
table.exec.mini-batch.size: 50000

The rationale behind not utilizing table.exec.mini-batch.enabled' for this optimization is that this option does not control the regular join in current version. If utilizing this option to control minibatch join and job upgrade to new version with this option enabled, it could potentially alter the join behavior due to lost of complete changelog. This change would constitute a deviation from the anticipated behavior in scenarios where a complete changelog is requisite for the correctness.

Proposed Changes

A new operator called MiniBatchStreamingJoinOperator which inherits from StreamingJoinOperator is introduced. It now supports four types of joins: left join, right join, full join and inner join (not support semi / anti join). It will use a block of memory as a minibatch to store data input from the left and right streams like HeapBufferedBundle for GroupAggregate operation. The records in the minibatch is processed when triggered by minibatch size or watermark.

The explaination of 3 points of optimization for three scenarios could be referenced in the appendix and the nexmark metrics is also listed. Go to Appendix for more details.

Compatibility, Deprecation, and Migration Plan

The proposal exclusively impacts the functionality of the streamingJoinOperator when handling paired records. A switch is used to make sure streamingJoinOperator works as before when disabling minibatch join. 

Test Plan

The test will focus on

  • Functionality Test

    • The operator-level MiniBatchStreamingJoinOperator works as expected.

    • The BufferBundle works as expected.

Rejected Alternatives

N/A


  • No labels