Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.


Discussion threadhttps://lists.apache.org/thread/ccsv66ygffgqbv956bnknbpllj4t24kj
Vote threadhttps://lists.apache.org/thread/b1hqnlxkyqnqt1sn1dww7jw3bdrs030o
JIRA

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-33683

Release

...

Here are some cases of performance issues:

  • Unreused Not reused connections/threads, extra overhead of resource operations :

    • Frequently create/detroy destroy new HA(ZK、K8S) connections for leader retrieval

    • Frequently open/close Netty channel for each request

    • Frequently create/destroy ThreadPool in RestClusterClient and RestClient

  • Unreused Not reused instances, extra GC overhead:

    • For each operation, Flink Client creates a lot of new instances like: ClusterDescriptor, RestClusterClient, ClientHighAvailabilityServices and RestClient

  • Concurrency bottlenecks:

    • One global ObjectMapper instance for data serialization/deserialization for all http requests and responses

  • Unnecessary workload:

    • For example: fixed collect retry interval(100 ms) in CollectResultFetcher to fetch result from Flink Cluster. This retry operation could be very resource consuming when executing under high concurrency.

...

An agent process will use Flink JDBC Driver to continuously submit short-lived queries to the SQL Gateway service under different concurrency (1 concurrency, 32 concurrency, 64 concurrency and more) and monitor the end-to-end Latency.

...

Catalog DDL

Code Block
languagesql
create temporary table table1 (
    val1 STRING
) WITH (
      'connector' = 'datagen',
      'number-of-rows' = '1'
);

create temporary table table2 (
    val2 STRING
) WITH (
      'connector' = 'datagen',
      'number-of-rows' = '1'
);

create temporary table table3 (
    val3 STRING
) WITH (
      'connector' = 'datagen',
      'number-of-rows' = '1'
);

DQL

Query Type

SQL

JobGraph

source


Code Block
languagesql

...

select val1 from table1;

...


Image Added

wourdcount


Code Block
languagesql
select val1, count(*) from table1 
	 group by val1;

...



Image Added

join


Code Block
languagesql
select val1, count(*) from table1 
    left join table2 on val1=val2 
    left join table3 on val2=val3 
group by val1;



Image Added

Benchmark

Notice: To test the performance bottleneck of Flink Client in interactive scenarios, we used a version of Flink Cluster running with more scheduling optimizations than the community(e.g. HA improvement mentioned in FLIP-403) as the baseline.

...

In high concurrency scenarios, the CPU usage of the Flink cluster was reduced by 20% with no performance degradation in QPS by using proper retry strategy configuration.

initial-delay

200 ms

increment

100 ms

max-delay

700 ms


Baseline(128 Concurrency)Experiment(128 Concurrency)
Retry StrategyDEFAULT_RETRY_MILLIS=100 ms

collect-strategy.type

incremental-delay

Optimized retry strategy

Join QPS

47

48

JM CPU
Usage AVG

17.3

Image Removed

Image Added

14.5
Image Removed

Image Added

TM CPU
Usage AVG

9.0

7.2

Changed Public Interfaces

DeploymentOptions

Option Name

Default Value

Description

execution.interactive-client

false

Enabling this means the Flink client will be used in an interactive scenario. It should reuse all the necessary resources to improve interactive performance.

Notice: This option will only take effect when Flink is deployed in session-mode.

CollectOptions

Option Name

Default Value

Description

collect-strategy.type

fixed-delay

Config options for RetryStrategy in CollectResultFetcher.
The default RetryStrategy is FixedDelayCollectStrategy, which means CollectResultFetcher will keep fetch results from sinkoperator at fixed intervals.

collect-strategy.

fixed-delay.attempts

Integer.MAX_VALUE

Config options for FixedRetryStrategy, the strategy will retries at a fixed delay.

collect-strategy.fixed-delay.delay

100 ms

collect-strategy.

exponential-delay.initial-backoff

100 ms

Config options for ExponentialBackoffRetryStrategy

, the strategy will increase the delay exponentially with a cap

. The delay strategy starts at initial value and keeps increasing (multiplying by backoff multiplier) until maximum delay is reached.

collect-strategy.exponential-delay.max-backoff

1 s

collect-strategy.exponential-delay.

attempts

Integer.MAX_VALUE

collect-strategy.incremental-delay.initial-delay

100 ms

Config options for IncrementalDelayRetryStrategy, the strategy will increase the delay with a fixed increment.

backoff-multiplier

1.0

collect-strategy.

incremental-delay.increment

100 ms

collect-strategy.incremental-delay.max-delay

1 s

collect-strategy.incremental

exponential-delay.attempts

Integer.MAX_VALUE


Proposed Change

Overview

  • Reuse ClusterDescriptor when interacting with a dedicated session cluster
  • Reuse ClientHighAvailabilityServices for leader retrieval with the same session cluster
  • Reuse RestClusterClient when sending requests to the same session cluster
  • Reuse the HTTP connections between JDBC Driver to SQL Gateway and Flink Client to Flink Cluster with the same target address
  • Use ThreadLocal<ObjectMapper> instead of a global ObjectMapper instance
  • Use a configurable retry strategy in CollectResultFetcher to reduce unnecessary workload

...

ClusterDescriptor is created by ClusterClientFactory. When users have enabled the 'execution.interactive-client' option, ClusterClientFactory will return a cached ClusterDescriptor for each ClusterID. The key and value in the cache would be ClusterID and ClusterDescriptor. The cache will be configured with an expireAfterAccess time.

Cache Properties

Key Value

Cache<ClusterID, ClusterDescriptor>

Configuration

expireAfterAccess(Duration.ofMinutes(30L))


ClientHighAvailabilityServices Reuse

...

Test Plan

Both Unit Test & Intergration Integration Test will be introduced to verify this change. There will also be performance benchmarks to prove that these optimizations will not cause any performance regression for Flink Client.

...