Discussion thread: https://lists.apache.org/thread/ccsv66ygffgqbv956bnknbpllj4t24kj
Vote thread: https://lists.apache.org/thread/b1hqnlxkyqnqt1sn1dww7jw3bdrs030o
JIRA: FLINK-33683

Release

...

create temporary table table1 (
    val1 STRING
) WITH (
      'connector' = 'datagen',
      'number-of-rows' = '1'
);

create temporary table table2 (
    val2 STRING
) WITH (
      'connector' = 'datagen',
      'number-of-rows' = '1'
);

create temporary table table3 (
    val3 STRING
) WITH (
      'connector' = 'datagen',
      'number-of-rows' = '1'
);

DQL

Query Type

SQL

JobGraph

source

select val1 from table1;

[Image: JobGraph of the source query]

wordcount

select val1, count(*) from table1
    group by val1;

[Image: JobGraph of the wordcount query]

join

select val1, count(*) from table1
    left join table2 on val1=val2
    left join table3 on val2=val3
group by val1;

[Image: JobGraph of the join query]

Benchmark

Notice: To isolate the performance bottleneck of the Flink Client in interactive scenarios, the baseline is a Flink cluster version that includes more scheduling optimizations than the community version (e.g. the HA improvement mentioned in FLIP-403).

...

In high-concurrency scenarios, a properly configured retry strategy reduced the average CPU usage of the Flink cluster by roughly 16% on the JobManager (17.3 to 14.5) and 20% on the TaskManager (9.0 to 7.2), with no degradation in QPS.


| Metric | Baseline (128 Concurrency) | Experiment (128 Concurrency) |
|---|---|---|
| Retry Strategy | DEFAULT_RETRY_MILLIS=100 ms | Optimized retry strategy |
| Join QPS | 47 | 48 |
| JM CPU Usage AVG | 17.3 | 14.5 |
| TM CPU Usage AVG | 9.0 | 7.2 |

Changed Public Interfaces

DeploymentOptions

| Option Name | Default Value | Description |
|---|---|---|
| execution.interactive-client | false | Enabling this means the Flink client will be used in an interactive scenario. It should reuse all the necessary resources to improve interactive performance. Notice: This option will only take effect when Flink is deployed in session-mode. |

CollectOptions

| Option Name | Default Value |
|---|---|
| collect-strategy.exponential-delay.initial-backoff | 100 ms |
| collect-strategy.exponential-delay.max-backoff | 1 s |
| collect-strategy.exponential-delay.backoff-multiplier | 1.0 |
| collect-strategy.exponential-delay.attempts | Integer.MAX_VALUE |

Description (applies to all four options): Config options for ExponentialBackoffRetryStrategy. The delay strategy starts at the initial value and keeps increasing (multiplying by the backoff multiplier) until the maximum delay is reached.
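The delay schedule described for these options can be illustrated with a minimal Java sketch. It is not Flink's ExponentialBackoffRetryStrategy: the class name `BackoffSketch`, its method names, and the multiplier of 2.0 in the second example are assumptions made purely for illustration. It only shows how an initial backoff, a multiplier, and a maximum backoff could combine, and that the default multiplier of 1.0 yields a constant 100 ms delay.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration only; not Flink's ExponentialBackoffRetryStrategy. */
public class BackoffSketch {

    /** Delay before retry number {@code attempt} (0-based): initial * multiplier^attempt, capped at max. */
    public static Duration delayFor(int attempt, Duration initial, Duration max, double multiplier) {
        double millis = initial.toMillis() * Math.pow(multiplier, attempt);
        // Capping in double space also keeps very large attempts from overflowing.
        long capped = (long) Math.min(millis, (double) max.toMillis());
        return Duration.ofMillis(capped);
    }

    /** Returns the first {@code n} delays in milliseconds. */
    public static List<Long> firstDelaysMillis(int n, Duration initial, Duration max, double multiplier) {
        List<Long> delays = new ArrayList<>();
        for (int attempt = 0; attempt < n; attempt++) {
            delays.add(delayFor(attempt, initial, max, multiplier).toMillis());
        }
        return delays;
    }

    public static void main(String[] args) {
        Duration initial = Duration.ofMillis(100); // default initial-backoff from the table
        Duration max = Duration.ofSeconds(1);      // default max-backoff from the table

        // Default multiplier 1.0: the delay never grows.
        System.out.println(firstDelaysMillis(5, initial, max, 1.0)); // [100, 100, 100, 100, 100]

        // Assumed multiplier 2.0: the delay doubles until it hits the 1 s cap.
        System.out.println(firstDelaysMillis(6, initial, max, 2.0)); // [100, 200, 400, 800, 1000, 1000]
    }
}
```

With a multiplier above 1.0, a long-running query is polled less and less often, which is one plausible way the fetcher could avoid unnecessary requests.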


Proposed Change

Overview

  • Reuse ClusterDescriptor when interacting with a dedicated session cluster
  • Reuse ClientHighAvailabilityServices for leader retrieval with the same session cluster
  • Reuse RestClusterClient when sending requests to the same session cluster
  • Reuse the HTTP connections from JDBC Driver to SQL Gateway and from Flink Client to Flink Cluster when the target address is the same
  • Use ThreadLocal<ObjectMapper> instead of a global ObjectMapper instance
  • Use a configurable retry strategy in CollectResultFetcher to reduce unnecessary workload
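The ThreadLocal item in the list above can be sketched as follows. To stay dependency-free, the sketch uses a hypothetical `MapperStandIn` class in place of Jackson's ObjectMapper; the class and method names are assumptions for illustration and do not reflect how the proposal wires this into Flink. It only demonstrates the property being relied on: each thread lazily gets its own instance and reuses it on later calls.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical illustration of per-thread instances; names are not from Flink. */
public class ThreadLocalMapperSketch {

    /** Stand-in for an ObjectMapper so the sketch needs no external library. */
    public static final class MapperStandIn {}

    // One lazily created instance per thread instead of a single global instance.
    private static final ThreadLocal<MapperStandIn> MAPPER =
            ThreadLocal.withInitial(MapperStandIn::new);

    /** Returns the calling thread's own instance. */
    public static MapperStandIn mapper() {
        return MAPPER.get();
    }

    /** Fetches the instance seen by a freshly started thread. */
    public static MapperStandIn mapperOnNewThread() {
        AtomicReference<MapperStandIn> seen = new AtomicReference<>();
        Thread worker = new Thread(() -> seen.set(mapper()));
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for worker", e);
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(mapper() == mapper());            // true: same thread, same instance
        System.out.println(mapper() == mapperOnNewThread()); // false: another thread, another instance
    }
}
```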

...

ClusterDescriptor is created by ClusterClientFactory. When the 'execution.interactive-client' option is enabled, ClusterClientFactory will return a cached ClusterDescriptor for each ClusterID. The cache maps ClusterID (key) to ClusterDescriptor (value) and is configured with an expireAfterAccess time.

Cache Properties

| Property | Value |
|---|---|
| Key / Value | Cache<ClusterID, ClusterDescriptor> |
| Configuration | expireAfterAccess(Duration.ofMinutes(30L)) |
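The expire-after-access behaviour named in the cache configuration can be illustrated with a small stdlib-only sketch. This is an assumption-laden stand-in, not the proposed implementation: the class `ExpireAfterAccessCacheSketch`, the injected nano clock, and the string key are all hypothetical, and a real cache would presumably come from a cache library and also release evicted descriptors. The sketch only shows that an entry stays alive as long as it is accessed within the 30-minute window and is rebuilt once that window has passed.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;

/** Hypothetical expire-after-access cache; illustrates semantics only. */
public class ExpireAfterAccessCacheSketch<K, V> {

    private static final class Entry<V> {
        final V value;
        long lastAccessNanos;

        Entry(V value, long now) {
            this.value = value;
            this.lastAccessNanos = now;
        }
    }

    private final Map<K, Entry<V>> entries = new HashMap<>();
    private final long ttlNanos;
    private final LongSupplier nanoClock; // injected so expiry can be exercised without sleeping

    public ExpireAfterAccessCacheSketch(Duration ttl, LongSupplier nanoClock) {
        this.ttlNanos = ttl.toNanos();
        this.nanoClock = nanoClock;
    }

    /** Returns the cached value, or loads a fresh one if absent or idle for at least the TTL. */
    public synchronized V get(K key, Function<K, V> loader) {
        long now = nanoClock.getAsLong();
        Entry<V> entry = entries.get(key);
        if (entry == null || now - entry.lastAccessNanos >= ttlNanos) {
            entry = new Entry<>(loader.apply(key), now);
            entries.put(key, entry);
        } else {
            entry.lastAccessNanos = now; // every access restarts the idle timer
        }
        return entry.value;
    }

    public static void main(String[] args) {
        long[] fakeNow = {0L};
        ExpireAfterAccessCacheSketch<String, Object> cache =
                new ExpireAfterAccessCacheSketch<>(Duration.ofMinutes(30), () -> fakeNow[0]);

        Object first = cache.get("cluster-a", id -> new Object());
        fakeNow[0] += Duration.ofMinutes(29).toNanos();
        Object second = cache.get("cluster-a", id -> new Object());
        fakeNow[0] += Duration.ofMinutes(31).toNanos();
        Object third = cache.get("cluster-a", id -> new Object());

        System.out.println(first == second); // true: accessed within 30 minutes, reused
        System.out.println(second == third); // false: idle for 31 minutes, rebuilt
    }
}
```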


ClientHighAvailabilityServices Reuse

...