| Discussion thread | |
|---|---|
| Vote thread | |
| JIRA | |
| Release | |
Motivation
Nowadays, there is a lot of unnecessary overhead in the Flink Client when it interacts with a dedicated long-running Flink Session Cluster. This works well when users interact with the Flink Session Cluster through a command line tool or a management platform like StreamPark. In these cases, users do not interact with one Flink Session Cluster very often; all operations are one-time, and all resources should be released when the operation finishes to save memory.
But in StreamingWarehouse or OLAP scenarios, users may submit lots of short-lived jobs to a dedicated Flink Session Cluster through SQL Gateway services. In this situation, these overheads have a huge impact on E2E performance: for short-lived jobs that take less than 1 s to execute in the Flink Cluster, the E2E latency may exceed 2 s.
Here are some examples of these performance issues:

- Unreused connections/threads, extra overhead of resource operations:
  - Frequently creating/destroying HA (ZK, K8s) connections for leader retrieval
  - Frequently opening/closing a Netty channel for each request
  - Frequently creating/destroying thread pools in RestClusterClient and RestClient
- Unreused instances, extra GC overhead:
  - For each operation, the Flink Client creates many new instances, such as ClusterDescriptor, RestClusterClient, ClientHighAvailabilityServices and RestClient
- Concurrency bottlenecks:
  - One global ObjectMapper instance for serialization/deserialization of all HTTP requests and responses
- Unnecessary workload:
  - For example, the fixed collect retry interval (100 ms) in CollectResultFetcher used to fetch results from the Flink Cluster. This retry operation can be very resource-consuming under high concurrency.
Therefore, I propose this FLIP to improve the Flink Client's overall performance when interacting with a dedicated Flink Session Cluster. This benefits both OLAP and StreamingWarehouse scenarios, where users frequently interact with a Flink Session Cluster. It also improves the user experience of the session debug mode: users get their debug results faster.
Benchmark
Test Design
To verify this optimization, we internally designed the following test scenario for validation.
An agent process continuously submits SQL queries to the SQL Gateway service at different concurrency levels (1, 32 and 64) and monitors the end-to-end latency.
Test Queries
Catalog
```sql
create temporary table table1 (
    val1 STRING
) WITH (
    'connector' = 'datagen',
    'number-of-rows' = '1'
);

create temporary table table2 (
    val2 STRING
) WITH (
    'connector' = 'datagen',
    'number-of-rows' = '1'
);

create temporary table table3 (
    val3 STRING
) WITH (
    'connector' = 'datagen',
    'number-of-rows' = '1'
);
```
Query
```sql
/* source query */
select val1 from table1;

/* wordcount query */
select val1, count(*) from table1 group by val1;

/* join query */
select val1, count(*)
from table1
left join table2 on val1 = val2
left join table3 on val2 = val3
group by val1;
```
Benchmark
Notice: The baseline version of Flink is not exactly the same as the community version, as it includes some of our internal enhancements, such as the HA improvement mentioned in FLIP-403.
E2E Latency
This benchmark shows that these optimizations reduce the E2E latency of short-lived queries by more than 50%.
E2E QPS
The E2E QPS also improves by more than 60% for the source and wordcount queries and by 35% for the join query.
CPU Usage
In high-concurrency scenarios, the CPU usage of the Flink cluster drops by 20% with no change in QPS when a proper retry strategy configuration is used.
| | Baseline (128 Concurrency) | Experiment (128 Concurrency) |
|---|---|---|
| Configuration | DEFAULT_RETRY_MILLIS: 100 ms | collect-strategy.type: incremental-delay<br>initial-delay: 200 ms<br>increment: 100 ms<br>max-delay: 700 ms |
| Join QPS | 47 | 48 |
| JM CPU | 17.3 | 14.5 |
| TM CPU | 9.0 | 7.2 |
Changed Public Interfaces
DeploymentOptions
| Option Name | Default Value | Description |
|---|---|---|
| execution.client-reuse | false | By enabling this, the Flink client will reuse the ClusterDescriptor, ClientHighAvailabilityServices and RestClusterClient when interacting with the same session cluster. This option will only take effect when Flink is deployed in session mode. |
RestOptions
| Option Name | Default Value | Description |
|---|---|---|
| rest.client.channel-pool | No default value | Create a channel pool for connection reuse when sending requests to the remote Netty server. |
CollectOptions
| Option Name | Default Value |
|---|---|
| collect-strategy.type | fixed-delay |
| collect-strategy.fixed-delay.attempts | Integer.MAX_VALUE |
| collect-strategy.fixed-delay.delay | 100 ms |
| collect-strategy.exponential-delay.initial-backoff | 100 ms |
| collect-strategy.exponential-delay.max-backoff | 1 s |
| collect-strategy.exponential-delay.attempts | Integer.MAX_VALUE |
| collect-strategy.incremental-delay.initial-delay | 100 ms |
| collect-strategy.incremental-delay.increment | 100 ms |
| collect-strategy.incremental-delay.max-delay | 1 s |
| collect-strategy.incremental-delay.attempts | Integer.MAX_VALUE |
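For illustration, a client could enable the proposed options roughly as follows. This is a sketch using Flink's Configuration API; the option keys are the proposed ones from the tables above, and the values mirror the experiment configuration from the benchmark above.

```java
import org.apache.flink.configuration.Configuration;

// Sketch: enable client reuse and the incremental-delay collect strategy.
Configuration conf = new Configuration();
conf.setString("execution.client-reuse", "true");
conf.setString("collect-strategy.type", "incremental-delay");
conf.setString("collect-strategy.incremental-delay.initial-delay", "200 ms");
conf.setString("collect-strategy.incremental-delay.increment", "100 ms");
conf.setString("collect-strategy.incremental-delay.max-delay", "700 ms");
```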
Proposed Change
Overview
- Reuse ClusterDescriptor when interacting with a dedicated session cluster
- Reuse ClientHighAvailabilityServices for leader retrieval with the same session cluster
- Reuse RestClusterClient when sending requests to the same session cluster
- Reuse HTTP connections between the JDBC Driver and SQL Gateway, and between the Flink Client and Flink Cluster, for the same target address
- Use ThreadLocal<ObjectMapper> instead of a global ObjectMapper instance
- Use a configurable retry strategy in CollectResultFetcher to reduce unnecessary workload
ClusterDescriptor Reuse
ClusterDescriptor is created by ClusterClientFactory. When users enable the 'execution.client-reuse' option, ClusterClientFactory returns a cached ClusterDescriptor for each ClusterID. The cache maps ClusterID to ClusterDescriptor and is configured with an expireAfterAccess time.
| Cache Properties | |
|---|---|
| Key/Value | Cache<ClusterID, ClusterDescriptor> |
| Configuration | expireAfterAccess(Duration.ofMinutes(30L)) |
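A minimal sketch of this cache, assuming Guava's CacheBuilder (which provides expireAfterAccess); the wrapper class and method names here are hypothetical:

```java
import java.time.Duration;
import java.util.concurrent.Callable;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

import org.apache.flink.client.deployment.ClusterDescriptor;

/** Hypothetical wrapper: one cached ClusterDescriptor per ClusterID. */
public class CachingClusterDescriptorFactory<ClusterID> {

    // Descriptors expire 30 minutes after the last access, as configured above.
    private final Cache<ClusterID, ClusterDescriptor<ClusterID>> descriptorCache =
            CacheBuilder.newBuilder()
                    .expireAfterAccess(Duration.ofMinutes(30L))
                    .build();

    /** Returns the cached descriptor for the cluster, creating it on first access. */
    public ClusterDescriptor<ClusterID> getOrCreate(
            ClusterID clusterId, Callable<ClusterDescriptor<ClusterID>> creator)
            throws Exception {
        return descriptorCache.get(clusterId, creator);
    }
}
```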
ClientHighAvailabilityServices Reuse
Currently, ClientHighAvailabilityServices only returns a LeaderRetrievalService, and that LeaderRetrievalService can be started only once, with a single LeaderRetrievalListener. As a result, if multiple ClusterClient instances want to listen to high-availability changes, each of them has to start its own ClientHighAvailabilityServices.
This can be improved by introducing a new interface, ReusableClientHAServices. ReusableClientHAServices returns a LeaderRetriever, which is thread-safe and can be reused across multiple ClusterClient instances for leader retrieval.
```java
/**
 * {@code ReusableClientHAServices} provides reusable high availability services that are
 * required on the client side, including {@code LeaderRetrievalService} and
 * {@code LeaderRetriever}.
 */
public interface ReusableClientHAServices extends ClientHighAvailabilityServices {

    /**
     * Get the leader retriever for the cluster.
     *
     * @return the leader retriever of the cluster.
     */
    LeaderRetriever getLeaderRetriever();
}
```
Each cached ClusterDescriptor instance will create one ReusableClientHAServices during initialization. This ReusableClientHAServices instance can be shared between multiple ClusterClient instances regardless of whether ClusterClient reuse is enabled.
RestClusterClient Reuse
At present, both JobClient and ClusterDescriptor retrieve the ClusterClient through a ClusterClientProvider and close the client after a single usage. The actual implementation of ClusterClientProvider always creates a new RestClusterClient.
When users have enabled the 'execution.client-reuse' option, this can be optimized as follows (see the sketch after this list):

- The cached ClusterDescriptor can create a RestClusterClient during initialization.
- The ClusterClientProvider created by the cached ClusterDescriptor should always return the same ClusterClient instance.
- JobClient and ClusterDescriptor should not close the client after a single usage. The ClusterClient should only be recycled when the corresponding ClusterDescriptor is closing.
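A minimal sketch of such a provider, based on the existing ClusterClientProvider interface; the class name SharedClusterClientProvider is hypothetical:

```java
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;

/** Hypothetical provider that always hands out one shared ClusterClient. */
public class SharedClusterClientProvider<T> implements ClusterClientProvider<T> {

    private final ClusterClient<T> sharedClient;

    public SharedClusterClientProvider(ClusterClient<T> sharedClient) {
        this.sharedClient = sharedClient;
    }

    @Override
    public ClusterClient<T> getClusterClient() {
        // Always return the same instance; callers must not close it after a
        // single usage. The owning ClusterDescriptor recycles it on close.
        return sharedClient;
    }
}
```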
Connection Reuse
Currently, RestClient creates a new Channel for each request. When users have enabled 'rest.client.channel-pool', RestClient will use a ChannelPoolMap to obtain the channel for sending requests.
```java
// Tuple2<String, Integer> = (targetAddress, targetPort)
ChannelPoolMap<Tuple2<String, Integer>, SimpleChannelPool> poolMap =
        new AbstractChannelPoolMap<Tuple2<String, Integer>, SimpleChannelPool>() {
            @Override
            protected SimpleChannelPool newPool(Tuple2<String, Integer> key) {
                return new SimpleChannelPool(
                        bootstrap,
                        new ChannelPoolHandler() {
                            ...
                        },
                        ChannelHealthChecker.ACTIVE);
            }
        };
```
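For illustration, sending a request through the pool could then look like this. This is a sketch of Netty's channel-pool usage; the PooledRequestSender class is hypothetical, and error handling is simplified:

```java
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.pool.ChannelPoolMap;
import io.netty.channel.pool.SimpleChannelPool;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.util.concurrent.FutureListener;

import org.apache.flink.api.java.tuple.Tuple2;

/** Illustrative sender: acquires a pooled channel, writes, then releases it. */
final class PooledRequestSender {

    private final ChannelPoolMap<Tuple2<String, Integer>, SimpleChannelPool> poolMap;

    PooledRequestSender(ChannelPoolMap<Tuple2<String, Integer>, SimpleChannelPool> poolMap) {
        this.poolMap = poolMap;
    }

    void send(String host, int port, FullHttpRequest request) {
        SimpleChannelPool pool = poolMap.get(Tuple2.of(host, port));
        pool.acquire()
                .addListener(
                        (FutureListener<Channel>)
                                acquired -> {
                                    if (acquired.isSuccess()) {
                                        Channel channel = acquired.getNow();
                                        // Hand the channel back to the pool once the
                                        // write completes, instead of closing it.
                                        channel.writeAndFlush(request)
                                                .addListener(
                                                        (ChannelFutureListener)
                                                                ignored -> pool.release(channel));
                                    }
                                });
    }
}
```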
In this case, the RestClient instance can be retrieved as a singleton and is only recycled when the client process shuts down.
RestClient will also use a ThreadLocal<ObjectMapper> instead of a global instance to improve serialization and deserialization efficiency under high concurrency.
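A minimal sketch of the per-thread mapper, assuming Jackson's ObjectMapper; the holder class shown here is illustrative:

```java
import com.fasterxml.jackson.databind.ObjectMapper;

/** Illustrative holder: one ObjectMapper per thread instead of one global instance. */
public final class PerThreadObjectMapper {

    // Each thread lazily gets its own mapper, so concurrent requests no longer
    // contend on a single shared ObjectMapper.
    private static final ThreadLocal<ObjectMapper> MAPPER =
            ThreadLocal.withInitial(ObjectMapper::new);

    private PerThreadObjectMapper() {}

    public static ObjectMapper get() {
        return MAPPER.get();
    }
}
```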
RetryStrategy in CollectResultFetcher
- CollectRetryStrategyFactory can create a Supplier<RetryStrategy> from CollectOptions.
- CollectResultFetcher will be initialized with a Supplier<RetryStrategy>.
- When the client needs the next result, CollectResultFetcher will create a new RetryStrategy and use it to keep sending CollectCoordinationRequest to the Flink Cluster until the job is finished. A sketch of the incremental-delay variant follows this list.
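A minimal sketch of the incremental-delay behavior using the defaults from the CollectOptions table; the RetryStrategy interface below is a simplified stand-in, not Flink's actual API:

```java
import java.time.Duration;
import java.util.function.Supplier;

// Simplified stand-in for the retry-strategy abstraction described above.
interface RetryStrategy {
    Duration nextDelay();
}

/** Delay grows by a fixed increment per attempt until it reaches max-delay. */
final class IncrementalDelayRetryStrategy implements RetryStrategy {

    private final Duration increment;
    private final Duration maxDelay;
    private Duration currentDelay;

    IncrementalDelayRetryStrategy(Duration initialDelay, Duration increment, Duration maxDelay) {
        this.currentDelay = initialDelay;
        this.increment = increment;
        this.maxDelay = maxDelay;
    }

    @Override
    public Duration nextDelay() {
        Duration delay = currentDelay;
        Duration next = currentDelay.plus(increment);
        // Cap the delay, matching collect-strategy.incremental-delay.max-delay.
        currentDelay = next.compareTo(maxDelay) > 0 ? maxDelay : next;
        return delay;
    }
}

// The fetcher gets a fresh strategy per result fetch via a Supplier, using the
// default values from the CollectOptions table (100 ms / 100 ms / 1 s).
class RetryStrategyExample {
    static final Supplier<RetryStrategy> STRATEGY_SUPPLIER =
            () -> new IncrementalDelayRetryStrategy(
                    Duration.ofMillis(100), Duration.ofMillis(100), Duration.ofSeconds(1));
}
```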
Migration Plan and Compatibility
These client-side optimizations are disabled by default, so there is no impact on existing users.
Test Plan
Both unit tests and integration tests will be introduced to verify this change. Performance benchmarks will also be run to prove that these optimizations do not cause any performance regression for the Flink Client.