

Status

Current state: Draft

...

Currently, the initial socket connection timeout depends on the kernel setting tcp_syn_retries. The actual timeout value is 2 ^ (tcp_syn_retries + 1) - 1 seconds. For the reasons below, we want to control the client-side socket connection timeout directly through a client configuration.

  1. The default value of tcp_syn_retries is 6, so the default timeout is 127 seconds, which is too long in some scenarios. For example, when the user specifies a list of N bootstrap servers and no connection has yet been established between the client and the servers, the least loaded node provider will poll all the server nodes specified by the user. If M servers in the bootstrap-servers list are offline, the client may take (127 * M) seconds to connect to the cluster. In the worst case, when M = N - 1, the wait time can be several minutes.
  2. Though we could set tcp_syn_retries to a smaller value, that would change system-level network behavior and might cause other issues.
  3. Applications depending on KafkaAdminClient may want to reliably know and control the initial socket connection timeout, which helps them throw corresponding exceptions at their layer.
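As a sanity check on the arithmetic above, here is a small sketch (class and method names are illustrative, not kernel or Kafka APIs) computing the kernel-derived timeout 2 ^ (tcp_syn_retries + 1) - 1:

```java
public class SynTimeout {
    // Effective connect timeout in seconds for a given tcp_syn_retries value:
    // 2 ^ (retries + 1) - 1, i.e. the sum of the backoff intervals 1 + 2 + 4 + ...
    static int timeoutSeconds(int tcpSynRetries) {
        return (1 << (tcpSynRetries + 1)) - 1;
    }

    public static void main(String[] args) {
        // The default tcp_syn_retries = 6 yields the 127-second timeout cited above.
        System.out.println(timeoutSeconds(6)); // 127
    }
}
```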

Public Interfaces

We propose a new common client config

connections.timeout.ms: This configuration controls the maximum amount of time the client will wait for the initial socket connection to be established. If the connection is not established before the timeout elapses, the selector will drop the socket channel. The default value will be 10 seconds.
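For illustration, a minimal sketch of how a client might set the proposed config; the broker addresses are placeholders, and only connections.timeout.ms comes from this proposal:

```java
import java.util.Properties;

public class ClientConfigExample {
    // Builds client properties using the config proposed in this KIP.
    // "bootstrap.servers" is a standard client config; the timeout value
    // shown is the proposed 10-second default.
    static Properties clientProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker1:9092,broker2:9092");
        // Drop the socket channel if the connection is not established within
        // 10s, instead of waiting for the kernel's tcp_syn_retries-based timeout.
        props.setProperty("connections.timeout.ms", "10000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(clientProps().getProperty("connections.timeout.ms"));
    }
}
```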

Proposed Changes

Selector


The Selector class will now take the new config CONNECTIONS_TIMEOUT_MS_CONFIG in its constructor.

idleExpiryManager


Currently, we have an idleExpiryManager that uses the LRU algorithm to evict the oldest idle connected socket channels. Similarly, we can instantiate a new LinkedHashMap to track the socket channels that are initiating connections and evict the ones that time out.

...

Currently, all the channels are kept in the same LRU map. We will split the connected socket channels and the connecting socket channels into different LRU maps (lruConnectedConnections and lruConnectingConnections, respectively).


Here's the state transition:

When the socket channel is initiating the connection, we will put it into lruConnectingConnections.

When the connection is successfully established, we will move the channel from lruConnectingConnections into lruConnectedConnections.

In each selector poll, we will remove the oldest timed-out socket channel from both lruConnectingConnections and lruConnectedConnections, if possible.
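The state transitions above can be sketched with a plain LinkedHashMap; all names here are hypothetical and do not reflect the actual Selector internals:

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

public class ConnectingLru {
    // Hypothetical sketch of lruConnectingConnections: channel id -> time the
    // connection attempt started. Insertion order doubles as oldest-first
    // because attempts are registered in time order.
    private final LinkedHashMap<String, Long> connecting = new LinkedHashMap<>();
    private final long connectTimeoutMs;

    ConnectingLru(long connectTimeoutMs) {
        this.connectTimeoutMs = connectTimeoutMs;
    }

    // Called when a socket channel starts connecting.
    void connectionStarted(String channelId, long nowMs) {
        connecting.put(channelId, nowMs);
    }

    // Called on successful connect; the real code would move the channel into
    // the connected LRU map. Returns the recorded start time, or null.
    Long connectionEstablished(String channelId) {
        return connecting.remove(channelId);
    }

    // Called on each poll: evict attempts older than the timeout. Since the
    // map is in insertion order, we can stop at the first non-expired entry.
    int expireTimedOut(long nowMs) {
        int expired = 0;
        Iterator<Map.Entry<String, Long>> it = connecting.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (nowMs - e.getValue() < connectTimeoutMs) break;
            it.remove();
            expired++;
        }
        return expired;
    }
}
```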

LeastLoadedNodeProvider


Currently, when none of the nodes provided in the --bootstrap-server option is connected, the LeastLoadedNodeProvider will provide an unconnected node for the client. The Cluster class shuffles the nodes to balance the initial pressure, but the LeastLoadedNodeProvider will then always provide the same node, namely the last node after shuffling. Consequently, even though we may provide several bootstrap servers, the client will not be able to connect to the cluster if the one node it keeps picking is offline, regardless of whether the other servers in the --bootstrap-server list are available.


We propose changing the provider to interact with the ClusterConnectionStates to determine which node to provide when no connection exists.

ClusterConnectionStates


ClusterConnectionStates will keep the index of the most recently provided node. Meanwhile, a public API like the one below will be added to simulate round-robin node picking.

private int nextNodeIdx = 0;

public synchronized int nextNodeIdx(int nodeSize) {
    return (this.nextNodeIdx++) % nodeSize;
}

When the LeastLoadedNodeProvider iterates the node list, it can consult the ClusterConnectionStates for the index of the node it should provide.
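A minimal sketch of how the provider could combine a node list with the round-robin counter; the class name and node list are illustrative, not the actual provider code:

```java
import java.util.Arrays;
import java.util.List;

public class RoundRobinPick {
    private int nextNodeIdx = 0;

    // Mirrors the proposed ClusterConnectionStates API above: each call
    // advances the counter and wraps around the node list size.
    public synchronized int nextNodeIdx(int nodeSize) {
        return (this.nextNodeIdx++) % nodeSize;
    }

    public static void main(String[] args) {
        // Hypothetical node list; the real provider iterates Cluster nodes.
        List<String> nodes = Arrays.asList("a:9092", "b:9092", "c:9092");
        RoundRobinPick states = new RoundRobinPick();
        // Successive picks cycle through all nodes instead of repeating one.
        for (int i = 0; i < 4; i++) {
            System.out.println(nodes.get(states.nextNodeIdx(nodes.size())));
        }
    }
}
```

Because the index advances on every call, an offline node is skipped on the next attempt rather than being retried forever, which addresses the problem described above.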

To Discuss

  1. We can discuss whether we want different connections.timeout.ms values for different clients.
  2. What should the default value for connections.timeout.ms be? (Is 10s too short?)
  3. Should the Selector / IdleExpiryManager throw an exception when the initial socket channel connection times out? Personally, I don't think we need to, as we will continue trying the rest of the candidate nodes.

Compatibility, Deprecation, and Migration Plan

...