Discussion thread | https://lists.apache.org/thread/ccsv66ygffgqbv956bnknbpllj4t24kj
---|---
Vote thread | https://lists.apache.org/thread/b1hqnlxkyqnqt1sn1dww7jw3bdrs030o
JIRA |
Release |
...
    create temporary table table1 (
      val1 STRING
    ) WITH (
      'connector' = 'datagen',
      'number-of-rows' = '1'
    );

    create temporary table table2 (
      val2 STRING
    ) WITH (
      'connector' = 'datagen',
      'number-of-rows' = '1'
    );

    create temporary table table3 (
      val3 STRING
    ) WITH (
      'connector' = 'datagen',
      'number-of-rows' = '1'
    );
DQL
Query Type | SQL | JobGraph
---|---|---
source | |
wordcount | |
join | |
Benchmark
Notice: To test the performance bottleneck of the Flink Client in interactive scenarios, we used as the baseline a Flink cluster version with more scheduling optimizations than the community version (e.g. the HA improvement mentioned in FLIP-403).
...
In high-concurrency scenarios, a properly configured retry strategy reduced the CPU usage of the Flink cluster by roughly 16% on the JM and 20% on the TM, with no degradation in QPS.
Metric | Baseline (128 Concurrency) | Experiment (128 Concurrency)
---|---|---
Retry Strategy | DEFAULT_RETRY_MILLIS=100 ms | Optimized retry strategy
Join QPS | 47 | 48
JM CPU | 17.3 | 14.5
TM CPU | 9.0 | 7.2
Changed Public Interfaces
DeploymentOptions
Option Name | Default Value | Description
---|---|---
execution.interactive-client | false | Enabling this means the Flink client will be used in an interactive scenario, so it should reuse all the necessary resources to improve interactive performance. Notice: This option only takes effect when Flink is deployed in session mode.
CollectOptions
Option Name | Default Value | Description
---|---|---
collect-strategy.exponential-delay.initial-backoff | 100 ms | Config options for
collect-strategy.exponential-delay.max-backoff | 1 s |
collect-strategy.exponential-delay.backoff-multiplier | 1.0 |
collect-strategy.exponential-delay.attempts | Integer.MAX_VALUE |
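To make the interplay of the four options concrete, here is a minimal sketch of how an exponential-delay strategy could turn them into a wait time. The class name `ExponentialDelaySketch`, the formula `min(initial * multiplier^attempt, max)`, and the reading of `attempts` as an upper bound on retries are assumptions for illustration; the FLIP text above does not spell out the exact semantics.

```java
import java.time.Duration;

/** Illustrative sketch only; not the FLIP's actual retry strategy implementation. */
final class ExponentialDelaySketch {
    private final Duration initialBackoff;
    private final Duration maxBackoff;
    private final double multiplier;
    private final int maxAttempts;

    ExponentialDelaySketch(
            Duration initialBackoff, Duration maxBackoff, double multiplier, int maxAttempts) {
        if (multiplier < 1.0) {
            throw new IllegalArgumentException("multiplier must be >= 1.0");
        }
        this.initialBackoff = initialBackoff;
        this.maxBackoff = maxBackoff;
        this.multiplier = multiplier;
        this.maxAttempts = maxAttempts;
    }

    /** Delay before retry number {@code attempt} (0-based), capped at maxBackoff. */
    Duration delayFor(int attempt) {
        if (attempt < 0) {
            throw new IllegalArgumentException("attempt must be >= 0");
        }
        // Assumed formula: initial * multiplier^attempt, never above the cap.
        double uncapped = initialBackoff.toMillis() * Math.pow(multiplier, attempt);
        long capped = (long) Math.min(uncapped, (double) maxBackoff.toMillis());
        return Duration.ofMillis(capped);
    }

    /** Assumed meaning of "attempts": retries are allowed while attempt < maxAttempts. */
    boolean canRetry(int attempt) {
        return attempt < maxAttempts;
    }

    public static void main(String[] args) {
        // Defaults from the table: multiplier 1.0 keeps the delay constant at 100 ms.
        ExponentialDelaySketch defaults = new ExponentialDelaySketch(
                Duration.ofMillis(100), Duration.ofSeconds(1), 1.0, Integer.MAX_VALUE);
        // A multiplier of 2.0 (hypothetical tuning) doubles the delay up to the 1 s cap.
        ExponentialDelaySketch doubling = new ExponentialDelaySketch(
                Duration.ofMillis(100), Duration.ofSeconds(1), 2.0, Integer.MAX_VALUE);
        for (int attempt = 0; attempt < 6; attempt++) {
            System.out.println(attempt + ": "
                    + defaults.delayFor(attempt).toMillis() + " ms vs "
                    + doubling.delayFor(attempt).toMillis() + " ms");
        }
    }
}
```

Under these assumptions, the default values behave like a fixed 100 ms retry interval, and only a multiplier above 1.0 makes the client poll less often as a query keeps returning no new results.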
Proposed Change
Overview
- Reuse ClusterDescriptor when interacting with a dedicated session cluster
- Reuse ClientHighAvailabilityServices for leader retrieval with the same session cluster
- Reuse RestClusterClient when sending requests to the same session cluster
- Reuse the HTTP connections from the JDBC Driver to the SQL Gateway and from the Flink Client to the Flink Cluster when the target address is the same
- Use ThreadLocal<ObjectMapper> instead of a global ObjectMapper instance
- Use a configurable retry strategy in CollectResultFetcher to reduce unnecessary workload
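The ThreadLocal item in the list above can be sketched as follows. To keep the example dependency-free, `StandInMapper` is a hypothetical placeholder for Jackson's `ObjectMapper`, and the holder class `ThreadLocalMapperSketch` is likewise invented for illustration; only the `ThreadLocal.withInitial` pattern itself is the point.

```java
/** Illustrative sketch only; StandInMapper is a hypothetical stand-in for ObjectMapper. */
final class ThreadLocalMapperSketch {

    /** Placeholder type so the sketch compiles without Jackson on the classpath. */
    static final class StandInMapper {
        String writeField(String key, String value) {
            return "{\"" + key + "\":\"" + value + "\"}";
        }
    }

    // One mapper per thread, created lazily on the first get() in that thread.
    private static final ThreadLocal<StandInMapper> MAPPER =
            ThreadLocal.withInitial(StandInMapper::new);

    static StandInMapper mapper() {
        return MAPPER.get();
    }

    public static void main(String[] args) throws InterruptedException {
        StandInMapper first = mapper();
        StandInMapper again = mapper();

        StandInMapper[] fromOtherThread = new StandInMapper[1];
        Thread worker = new Thread(() -> fromOtherThread[0] = mapper());
        worker.start();
        worker.join();

        System.out.println(first == again);              // true: same thread, same instance
        System.out.println(first == fromOtherThread[0]); // false: each thread has its own
    }
}
```

The trade-off in this pattern is one mapper instance per thread in exchange for no shared instance between threads.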
...
ClusterDescriptor is created by ClusterClientFactory. When users have enabled the 'execution.interactive-client' option, ClusterClientFactory will return a cached ClusterDescriptor for each ClusterID. The key and value in the cache would be ClusterID and ClusterDescriptor. The cache will be configured with an expireAfterAccess time.
Cache Properties | |
---|---|
Key Value | Cache<ClusterID, ClusterDescriptor> |
Configuration | expireAfterAccess(Duration.ofMinutes(30L)) |
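The expire-after-access behaviour described above can be illustrated with a small stdlib-only sketch. The table's `Cache<ClusterID, ClusterDescriptor>` notation suggests a cache library, but this example does not use one: `ExpiringCacheSketch`, its injected clock, and the use of `String` and `Object` in place of ClusterID and ClusterDescriptor are all assumptions made so the code runs on its own.

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.LongSupplier;

/** Illustrative sketch only; not the cache implementation the FLIP proposes. */
final class ExpiringCacheSketch<K, V> {

    private static final class Entry<T> {
        final T value;
        volatile long lastAccessNanos;

        Entry(T value, long now) {
            this.value = value;
            this.lastAccessNanos = now;
        }
    }

    private final Map<K, Entry<V>> entries = new ConcurrentHashMap<>();
    private final long ttlNanos;
    private final LongSupplier nanoClock;

    ExpiringCacheSketch(Duration expireAfterAccess, LongSupplier nanoClock) {
        this.ttlNanos = expireAfterAccess.toNanos();
        this.nanoClock = nanoClock;
    }

    /** Returns the cached value, or loads a fresh one if absent or idle for too long. */
    V get(K key, Function<K, V> loader) {
        long now = nanoClock.getAsLong();
        Entry<V> entry = entries.compute(key, (k, old) -> {
            if (old == null || now - old.lastAccessNanos >= ttlNanos) {
                // Expired entries are replaced lazily on access; there is no background eviction.
                return new Entry<>(loader.apply(k), now);
            }
            old.lastAccessNanos = now; // every hit pushes the expiry further out
            return old;
        });
        return entry.value;
    }

    int size() {
        return entries.size();
    }

    public static void main(String[] args) {
        long[] now = {0L}; // manual clock so the example does not need to sleep
        ExpiringCacheSketch<String, Object> cache =
                new ExpiringCacheSketch<>(Duration.ofMinutes(30), () -> now[0]);

        Object first = cache.get("cluster-a", id -> new Object());
        now[0] += Duration.ofMinutes(29).toNanos();
        System.out.println(first == cache.get("cluster-a", id -> new Object())); // true
        now[0] += Duration.ofMinutes(30).toNanos();
        System.out.println(first == cache.get("cluster-a", id -> new Object())); // false
    }
}
```

Since ClusterDescriptor is a closeable resource, a real cache would presumably also need to close descriptors when they expire; the sketch leaves that out.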
ClientHighAvailabilityServices Reuse
...