Discussion thread: https://lists.apache.org/thread/ccsv66ygffgqbv956bnknbpllj4t24kj
Vote thread: https://lists.apache.org/thread/b1hqnlxkyqnqt1sn1dww7jw3bdrs030o
JIRA


Release

Motivation

Currently, the Flink Client incurs a lot of unnecessary overhead when interacting with a dedicated long-running Flink Session Cluster. This works well when users interact with the Flink Session Cluster through a command line tool or a management platform like StreamPark. In these cases, users do not interact with any one Flink Session Cluster very often: all operations are one-time, and all resources are released when the operation finishes to save memory.


But in StreamingWarehouse or OLAP scenarios, users may submit many short-lived jobs to a dedicated Flink Session Cluster through SQL Gateway services. In this situation, these overheads have a huge impact on E2E performance: for short-lived jobs that take less than 1s to execute in the Flink Cluster, E2E latency may exceed 2s.


Here are some cases of performance issues:

  • Connections/threads are not reused, causing extra resource-management overhead:

    • Frequently creating/destroying HA (ZooKeeper, Kubernetes) connections for leader retrieval

    • Frequently opening/closing a Netty channel for each request

    • Frequently creating/destroying thread pools in RestClusterClient and RestClient

  • Instances are not reused, causing extra GC overhead:

    • For each operation, the Flink Client creates many new instances, such as ClusterDescriptor, RestClusterClient, ClientHighAvailabilityServices and RestClient

  • Concurrency bottlenecks:

    • One global ObjectMapper instance is used for serialization/deserialization of all HTTP requests and responses

  • Unnecessary workload:

    • For example, the fixed collect retry interval (100 ms) in CollectResultFetcher used to fetch results from the Flink Cluster. This retry operation can be very resource-consuming under high concurrency.


Therefore, I propose this FLIP to improve the Flink Client's overall performance when interacting with dedicated Flink Session Clusters. This benefits both OLAP and StreamingWarehouse scenarios, in which users frequently interact with a Flink Session Cluster. It also improves the user experience of session debug mode, since users get their debug results faster.

Proof of Concept

To verify the effect of the Flink Client performance improvements in interactive scenarios, we designed an internal interactive testing experiment under high concurrency. During the experiment, we monitored the change in E2E latency, QPS and CPU usage when using the improved Flink Client.


Experiment

An agent process uses the Flink JDBC Driver to continuously submit short-lived queries to the SQL Gateway service under different concurrency levels (1, 32, 64 and more) and monitors the end-to-end latency.

Catalog DDL

create temporary table table1 (
    val1 STRING
) WITH (
      'connector' = 'datagen',
      'number-of-rows' = '1'
);

create temporary table table2 (
    val2 STRING
) WITH (
      'connector' = 'datagen',
      'number-of-rows' = '1'
);

create temporary table table3 (
    val3 STRING
) WITH (
      'connector' = 'datagen',
      'number-of-rows' = '1'
);

DQL

Query Type

SQL

source

select val1 from table1;

wordcount

select val1, count(*) from table1
    group by val1;

join

select val1, count(*) from table1
    left join table2 on val1=val2
    left join table3 on val2=val3
    group by val1;


Benchmark

Notice: To test the performance bottleneck of the Flink Client in interactive scenarios, we used as the baseline a Flink Cluster running with more scheduling optimizations than the community version (e.g., the HA improvements mentioned in FLIP-403).

E2E Latency

This benchmark shows that these optimizations reduce the E2E latency of short-lived queries by more than 50%.

E2E QPS

The E2E QPS also improves by more than 60% for the source/wordcount queries and by 35% for the join query.

CPU Usage

In high-concurrency scenarios, the CPU usage of the Flink cluster was reduced by 20%, with no degradation in QPS, by using a proper retry strategy configuration.


                    Baseline (128 Concurrency)     Experiment (128 Concurrency)
Retry Strategy      DEFAULT_RETRY_MILLIS=100 ms    Optimized retry strategy
Join QPS            47                             48
JM CPU Usage AVG    17.3                           14.5
TM CPU Usage AVG    9.0                            7.2

Changed Public Interfaces

DeploymentOptions

Option Name: execution.interactive-client

Default Value: false

Description: Enabling this means the Flink Client will be used in an interactive scenario. It should reuse all the necessary resources to improve interactive performance.

Notice: This option only takes effect when Flink is deployed in session mode.

CollectOptions

These options configure the ExponentialBackoffRetryStrategy used for collecting results: the delay starts at the initial backoff and keeps increasing (multiplying by the backoff multiplier) until the maximum backoff is reached.

Option Name                                             Default Value
collect-strategy.exponential-delay.initial-backoff      100 ms
collect-strategy.exponential-delay.max-backoff          1 s
collect-strategy.exponential-delay.backoff-multiplier   1.0
collect-strategy.exponential-delay.attempts             Integer.MAX_VALUE
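For illustration, the options above might be combined in flink-conf.yaml as follows. This is a hypothetical configuration sketch: the multiplier is raised above the 1.0 default so that the delay actually grows, and the duration syntax follows Flink's usual notation.

```yaml
# Enable interactive-client resource reuse (session mode only).
execution.interactive-client: true

# Collect retries start at 100 ms, double each time, and cap at 1 s.
collect-strategy.exponential-delay.initial-backoff: 100ms
collect-strategy.exponential-delay.max-backoff: 1s
collect-strategy.exponential-delay.backoff-multiplier: 2.0
collect-strategy.exponential-delay.attempts: 2147483647
```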

Proposed Change

Overview

  • Reuse ClusterDescriptor when interacting with a dedicated session cluster
  • Reuse ClientHighAvailabilityServices for leader retrieval with the same session cluster
  • Reuse RestClusterClient when sending requests to the same session cluster
  • Reuse the HTTP connections between JDBC Driver to SQL Gateway and Flink Client to Flink Cluster with the same target address
  • Use ThreadLocal<ObjectMapper> instead of a global ObjectMapper instance
  • Use a configurable retry strategy in CollectResultFetcher to reduce unnecessary workload


ClusterDescriptor Reuse

ClusterDescriptor is created by ClusterClientFactory. When users have enabled the 'execution.interactive-client' option, ClusterClientFactory will return a cached ClusterDescriptor for each ClusterID. The key and value in the cache would be ClusterID and ClusterDescriptor. The cache will be configured with an expireAfterAccess time.

Cache Properties

Key / Value: Cache<ClusterID, ClusterDescriptor>

Configuration: expireAfterAccess(Duration.ofMinutes(30L))
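The expire-after-access semantics of this cache can be sketched as follows. This is a minimal hand-rolled sketch with generic key/value types standing in for ClusterID and ClusterDescriptor; the actual implementation could simply use a cache library such as Guava's `CacheBuilder.expireAfterAccess`. The clock is passed in explicitly so the behavior is easy to verify.

```java
import java.time.Duration;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Sketch of a cache whose entries expire when not accessed for a given duration. */
public final class DescriptorCache<K, V> {
    private final long ttlMillis;
    private final Map<K, Entry<V>> cache = new ConcurrentHashMap<>();

    private static final class Entry<V> {
        final V value;
        volatile long lastAccess;
        Entry(V value, long now) { this.value = value; this.lastAccess = now; }
    }

    public DescriptorCache(Duration expireAfterAccess) {
        this.ttlMillis = expireAfterAccess.toMillis();
    }

    /** Returns the cached value, creating it via the factory on a miss or after expiry. */
    public V get(K key, Function<K, V> factory, long nowMillis) {
        evictExpired(nowMillis);
        Entry<V> e = cache.computeIfAbsent(key, k -> new Entry<>(factory.apply(k), nowMillis));
        e.lastAccess = nowMillis; // every read refreshes the expiry deadline
        return e.value;
    }

    private void evictExpired(long nowMillis) {
        for (Iterator<Entry<V>> it = cache.values().iterator(); it.hasNext(); ) {
            if (nowMillis - it.next().lastAccess > ttlMillis) {
                it.remove();
            }
        }
    }
}
```

A second lookup within the TTL returns the same descriptor instance; a lookup after 30 idle minutes creates a fresh one.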


ClientHighAvailabilityServices Reuse

Currently, ClientHighAvailabilityServices only returns a LeaderRetrievalService, which can be started only once with a LeaderRetrievalListener. As a result, if multiple ClusterClient instances need to listen to high-availability changes, each has to start its own ClientHighAvailabilityServices.


In interactive scenarios, the Flink Client should reuse the most recent leader address across multiple ClusterClient instances to avoid unnecessary HA operation overhead. The existing LeaderRetriever implementation fits this need very well, even though it lacks an abstraction, so a new interface ReusableClientHAServices can be introduced here. It returns a LeaderRetriever instance that is thread-safe and can be reused across multiple ClusterClient instances.

/**
 * {@code ReusableClientHAServices} provides the reusable high availability services that are
 * required on the client side, including {@code LeaderRetrievalService} and {@code LeaderRetriever}.
 */
public interface ReusableClientHAServices extends ClientHighAvailabilityServices {

    /**
     * Get the leader retriever for the cluster.
     *
     * @return the leader retriever of the cluster.
     */
    LeaderRetriever getLeaderRetriever();
}

Each cached ClusterDescriptor instance creates one ReusableClientHAServices during initialization. This ReusableClientHAServices instance can be shared between multiple ClusterClient instances, regardless of whether ClusterClient reuse is enabled.

RestClusterClient Reuse

At present, both JobClient and ClusterDescriptor retrieve the ClusterClient through a ClusterClientProvider and close the client after a single usage. The current implementation of ClusterClientProvider always creates a new RestClusterClient.


When users have enabled the 'execution.interactive-client' option, this can be optimized as follows:

  • The cached ClusterDescriptor can create a RestClusterClient during initialization

  • ClusterClientProvider created by cached ClusterDescriptor should always return the same ClusterClient instance

  • JobClient and ClusterDescriptor should not close the client after a single usage. ClusterClient should only be recycled when the corresponding ClusterDescriptor is closing.
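The provider behavior described in the bullets above can be sketched as follows. The `ClusterClient` and `ClusterClientProvider` interfaces here are stand-ins, since the sketch does not depend on Flink's actual classes: the provider hands out one shared client and only closes it when the owning descriptor closes.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Stand-in for Flink's ClusterClient; only the lifecycle matters for this sketch. */
interface ClusterClient extends AutoCloseable {
    @Override
    void close();
}

/** Stand-in for Flink's ClusterClientProvider. */
interface ClusterClientProvider {
    ClusterClient getClusterClient();
}

/**
 * A provider that always hands out the same shared client. Callers must NOT close
 * the returned client; it is recycled only when the owning ClusterDescriptor closes.
 */
final class SharedClusterClientProvider implements ClusterClientProvider, AutoCloseable {
    private final ClusterClient sharedClient;
    private final AtomicBoolean closed = new AtomicBoolean(false);

    SharedClusterClientProvider(ClusterClient sharedClient) {
        this.sharedClient = sharedClient;
    }

    @Override
    public ClusterClient getClusterClient() {
        if (closed.get()) {
            throw new IllegalStateException("Owning ClusterDescriptor is closed");
        }
        return sharedClient; // same instance for every caller
    }

    /** Called once, when the owning ClusterDescriptor closes. */
    @Override
    public void close() {
        if (closed.compareAndSet(false, true)) {
            sharedClient.close();
        }
    }
}
```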

Connection Reuse

Currently, RestClient will create a new Netty Channel for each request. In interactive mode, RestClient should use a ChannelPool to reuse the Channel with the same target. Channels in the ChannelPool should be recycled when they are inactive for a period of time or when the client process is shutting down.

// Tuple2<String, Integer> = (targetAddress, targetPort)
ChannelPoolMap<Tuple2<String, Integer>, SimpleChannelPool> poolMap =
        new AbstractChannelPoolMap<Tuple2<String, Integer>, SimpleChannelPool>() {
            @Override
            protected SimpleChannelPool newPool(Tuple2<String, Integer> key) {
                return new SimpleChannelPool(
                        bootstrap,
                        new ChannelPoolHandler() {
                            @Override
                            public void channelCreated(Channel ch) {
                                // install the REST client pipeline on newly created channels
                            }

                            @Override
                            public void channelAcquired(Channel ch) {}

                            @Override
                            public void channelReleased(Channel ch) {}
                        },
                        // only hand out channels that are still active
                        ChannelHealthChecker.ACTIVE);
            }
        };


Also, RestClient should use a ThreadLocal<ObjectMapper> instead of a global instance to improve serialization and deserialization efficiency for HTTP requests and responses under high concurrency.
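The per-thread instance pattern can be sketched as follows. `JsonCodec` is a stand-in for Jackson's ObjectMapper, since the pattern itself is independent of the concrete serializer: instead of all request threads contending on one shared global object, each thread lazily gets its own instance on first access.

```java
/** Sketch of the ThreadLocal pattern proposed for RestClient. */
public final class PerThreadCodec {

    /** Stand-in for Jackson's ObjectMapper. */
    public static final class JsonCodec {
    }

    // Each thread lazily creates its own codec on first access, so a single
    // shared instance can no longer become a concurrency bottleneck.
    private static final ThreadLocal<JsonCodec> CODEC =
            ThreadLocal.withInitial(JsonCodec::new);

    public static JsonCodec get() {
        return CODEC.get();
    }
}
```

Repeated calls from the same thread return the same instance, while each new thread receives its own.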

RetryStrategy in CollectResultFetcher

  1. CollectRetryStrategyFactory can create a Supplier<RetryStrategy> from CollectOptions

  2. Each CollectResultFetcher will be initialized with a Supplier<RetryStrategy>

  3. When the client needs the next result, CollectResultFetcher will create a new RetryStrategy and use this strategy to keep sending CollectCoordinationRequest to Flink Cluster until the job is finished
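The strategy created in step 3 could behave as in this self-contained sketch. It is a hand-rolled exponential delay using stand-in types, not Flink's actual RetryStrategy classes: each call to nextBackoff() returns the current delay and multiplies it by the backoff multiplier, capped at the maximum backoff, until the configured number of attempts is exhausted.

```java
import java.time.Duration;

/**
 * Sketch of the exponential-delay retry strategy configured by the
 * collect-strategy.exponential-delay.* options described earlier.
 */
public final class ExponentialDelaySketch {
    private final Duration maxBackoff;
    private final double multiplier;
    private final int maxAttempts;

    private Duration currentBackoff;
    private int attempt;

    public ExponentialDelaySketch(
            Duration initialBackoff, Duration maxBackoff, double multiplier, int maxAttempts) {
        this.currentBackoff = initialBackoff;
        this.maxBackoff = maxBackoff;
        this.multiplier = multiplier;
        this.maxAttempts = maxAttempts;
    }

    /**
     * Returns the delay before the next CollectCoordinationRequest,
     * or null when all attempts are exhausted.
     */
    public Duration nextBackoff() {
        if (attempt >= maxAttempts) {
            return null;
        }
        attempt++;
        Duration backoff = currentBackoff;
        long next = (long) (currentBackoff.toMillis() * multiplier);
        currentBackoff = Duration.ofMillis(Math.min(next, maxBackoff.toMillis()));
        return backoff;
    }
}
```

With an initial backoff of 100 ms, a multiplier of 2.0 and a 1 s cap, the delays are 100 ms, 200 ms, 400 ms, 800 ms, then 1 s for every remaining attempt.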

Migration Plan and Compatibility

These client-side optimizations are disabled by default, so there is no impact on existing users.

Test Plan

Both unit tests and integration tests will be introduced to verify this change. Performance benchmarks will also be run to prove that these optimizations do not cause any performance regression for the Flink Client.

Rejected Alternatives