Discussion thread | https://lists.apache.org/thread/ccsv66ygffgqbv956bnknbpllj4t24kj |
---|---|
Vote thread | https://lists.apache.org/thread/b1hqnlxkyqnqt1sn1dww7jw3bdrs030o |
JIRA | |
Release | |
...
Here are some cases of performance issues:
- Connections/threads are not reused, causing extra overhead for resource operations:
  - Frequently creating/destroying HA (ZK, K8S) connections for leader retrieval
  - Frequently opening/closing a Netty channel for each request
  - Frequently creating/destroying the ThreadPool in RestClusterClient and RestClient
- Instances are not reused, causing extra GC overhead:
  - For each operation, Flink Client creates many new instances such as ClusterDescriptor, RestClusterClient, ClientHighAvailabilityServices and RestClient
- Concurrency bottlenecks:
  - One global ObjectMapper instance handles data serialization/deserialization for all HTTP requests and responses
- Unnecessary workload:
  - For example, CollectResultFetcher uses a fixed collect retry interval (100 ms) to fetch results from the Flink Cluster. This retry operation can be very resource-consuming under high concurrency.
...
An agent process uses the Flink JDBC Driver to continuously submit short-lived queries to the SQL Gateway service at different concurrency levels (1, 32, 64 and more) and monitors the end-to-end latency.
...
Catalog DDL
create temporary table table1 ( val1 STRING ) WITH ( 'connector' = 'datagen', 'number-of-rows' = '1' );
create temporary table table2 ( val2 STRING ) WITH ( 'connector' = 'datagen', 'number-of-rows' = '1' );
create temporary table table3 ( val3 STRING ) WITH ( 'connector' = 'datagen', 'number-of-rows' = '1' );
DQL
Query Type | SQL | JobGraph | |||
---|---|---|---|---|---|
source |
|
...
|
...
wordcount |
|
...
join |
|
Benchmark
Notice: To isolate the performance bottleneck of Flink Client in interactive scenarios, we used as the baseline a version of Flink Cluster with more scheduling optimizations than the community version (e.g. the HA improvement mentioned in FLIP-403).
...
In high-concurrency scenarios, a properly configured retry strategy reduced the CPU usage of the Flink cluster by up to 20% with no degradation in QPS.
 | Baseline (128 Concurrency) | Experiment (128 Concurrency) |
---|---|---|
Retry Strategy | DEFAULT_RETRY_MILLIS=100 ms | Optimized retry strategy: collect-strategy.type = incremental-delay, initial-delay = 200 ms, increment = 100 ms, max-delay = 700 ms |
Join QPS | 47 | 48 |
JM CPU | 17.3 | 14.5 |
TM CPU | 9.0 | 7.2 |
Changed Public Interfaces
DeploymentOptions
Option Name | Default Value | Description |
execution.interactive-client | false | Enabling this means the Flink client will be used in an interactive scenario. It should reuse all the necessary resources to improve interactive performance. Notice: This option will only take effect when Flink is deployed in session-mode. |
CollectOptions
Option Name | Default Value | Description |
---|---|---|
collect-strategy.type | fixed-delay | The retry strategy used by CollectResultFetcher. The default RetryStrategy is FixedDelayCollectStrategy, which means CollectResultFetcher will keep fetching results from the sink operator at fixed intervals. |
collect-strategy.fixed-delay.attempts | Integer.MAX_VALUE | Config options for FixedRetryStrategy; the strategy retries at a fixed delay. |
collect-strategy.fixed-delay.delay | 100 ms | |
collect-strategy.exponential-delay.initial-backoff | 100 ms | Config options for the exponential-delay strategy. |
collect-strategy.exponential-delay.max-backoff | 1 s | |
collect-strategy.exponential-delay. | Integer.MAX_VALUE | |
collect-strategy.incremental-delay.initial-delay | 100 ms | Config options for IncrementalDelayRetryStrategy; the strategy increases the delay by a fixed increment. |
backoff-multiplier | 1.0 | |
collect-strategy.exponential-delay. | 100 ms | |
collect-strategy.incremental-delay.max-delay | 1 s | |
attempts | Integer.MAX_VALUE | |
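To make the incremental-delay behaviour concrete, here is a minimal sketch of how such a schedule could be computed. The class name `IncrementalDelaySketch` and its method are hypothetical and written only for illustration; this is not Flink's actual retry strategy class. It assumes the delay grows by a fixed increment per attempt and is capped at the maximum delay.

```java
import java.time.Duration;

/** Hypothetical illustration of an incremental-delay schedule; not Flink's actual class. */
final class IncrementalDelaySketch {
    private final Duration initialDelay;
    private final Duration increment;
    private final Duration maxDelay;

    IncrementalDelaySketch(Duration initialDelay, Duration increment, Duration maxDelay) {
        this.initialDelay = initialDelay;
        this.increment = increment;
        this.maxDelay = maxDelay;
    }

    /** Returns the delay before the given 0-based retry attempt, capped at maxDelay. */
    Duration delayForAttempt(int attempt) {
        if (attempt < 0) {
            throw new IllegalArgumentException("attempt must be >= 0");
        }
        // initialDelay + attempt * increment, never exceeding maxDelay
        Duration candidate = initialDelay.plus(increment.multipliedBy(attempt));
        return candidate.compareTo(maxDelay) > 0 ? maxDelay : candidate;
    }
}
```

With the benchmark settings (initial-delay 200 ms, increment 100 ms, max-delay 700 ms), this schedule yields 200, 300, 400, 500, 600 and then 700 ms for every later attempt, so an idle fetcher polls far less often than with a fixed 100 ms interval.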
Proposed Change
Overview
- Reuse ClusterDescriptor when interacting with a dedicated session cluster
- Reuse ClientHighAvailabilityServices for leader retrieval with the same session cluster
- Reuse RestClusterClient when sending requests to the same session cluster
- Reuse the HTTP connections between JDBC Driver to SQL Gateway and Flink Client to Flink Cluster with the same target address
- Use ThreadLocal<ObjectMapper> instead of a global ObjectMapper instance
- Use a configurable retry strategy in CollectResultFetcher to reduce unnecessary workload
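The per-thread mapper idea from the list above can be sketched with the JDK alone. In the example below, `Mapper` is a hypothetical stand-in for Jackson's ObjectMapper (used so the snippet has no external dependency), and `PerThreadMapperSketch` is an illustrative name, not a class from Flink.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration of replacing one shared instance with one instance per thread. */
final class PerThreadMapperSketch {

    /** Stand-in for a serializer such as Jackson's ObjectMapper (assumption for illustration). */
    static final class Mapper {
        static final AtomicInteger CREATED = new AtomicInteger();
        final int id = CREATED.incrementAndGet();
    }

    // Each thread lazily creates its own Mapper on first use and keeps reusing it.
    private static final ThreadLocal<Mapper> MAPPER = ThreadLocal.withInitial(Mapper::new);

    static Mapper mapper() {
        return MAPPER.get();
    }

    private PerThreadMapperSketch() {}
}
```

The trade-off is memory for contention: each thread holds its own instance for as long as the thread lives, which is usually acceptable for a bounded pool of request threads.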
...
ClusterDescriptor is created by ClusterClientFactory. When users have enabled the 'execution.interactive-client' option, ClusterClientFactory will return a cached ClusterDescriptor for each ClusterID. The key and value in the cache are ClusterID and ClusterDescriptor. The cache will be configured with an expireAfterAccess time.
Cache Properties | |
---|---|
Key Value | Cache<ClusterID, ClusterDescriptor> |
Configuration | expireAfterAccess(Duration.ofMinutes(30L)) |
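The expire-after-access semantics in the table can be illustrated with a small JDK-only sketch. `ExpireAfterAccessCache` is a hypothetical class written for this example; an actual implementation would more likely rely on a caching library's `expireAfterAccess` setting. The injectable nano clock is an assumption added to make the behaviour easy to test.

```java
import java.time.Duration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.LongSupplier;

/** Hypothetical minimal cache whose entries expire a fixed time after their last access. */
final class ExpireAfterAccessCache<K, V> {

    private static final class Entry<V> {
        final V value;
        volatile long lastAccessNanos;

        Entry(V value, long now) {
            this.value = value;
            this.lastAccessNanos = now;
        }
    }

    private final ConcurrentHashMap<K, Entry<V>> entries = new ConcurrentHashMap<>();
    private final long ttlNanos;
    private final LongSupplier nanoClock;

    ExpireAfterAccessCache(Duration ttl, LongSupplier nanoClock) {
        this.ttlNanos = ttl.toNanos();
        this.nanoClock = nanoClock;
    }

    /** Returns the cached value, or loads a new one if the entry is missing or expired. */
    V get(K key, Function<K, V> loader) {
        long now = nanoClock.getAsLong();
        // compute() runs atomically per key, so concurrent callers share one instance.
        Entry<V> entry = entries.compute(key, (k, existing) -> {
            if (existing == null || now - existing.lastAccessNanos >= ttlNanos) {
                return new Entry<>(loader.apply(k), now);
            }
            existing.lastAccessNanos = now; // every access extends the entry's lifetime
            return existing;
        });
        return entry.value;
    }
}
```

In production code the clock would typically be `System::nanoTime`, and an expired value that holds resources (as a ClusterDescriptor does) would also need to be closed on eviction, which this sketch omits.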
ClientHighAvailabilityServices Reuse
Currently, ClientHighAvailabilityServices only returns a LeaderRetrievalService, and that LeaderRetrievalService can only be started once with a single LeaderRetrievalListener. As a result, if multiple ClusterClient instances need to listen for high availability changes, each of them has to start its own ClientHighAvailabilityServices.
...
Test Plan
Both unit tests and integration tests will be introduced to verify this change. There will also be performance benchmarks to prove that these optimizations do not cause any performance regression for Flink Client.
...