ID: IEP-23
Author:
Sponsor:
Created: 18 Jun 2018
Status: COMPLETED


Motivation

Cache operations in thin clients currently suffer from increased latency, which can also limit throughput when synchronous operations are used. The cause is an additional network hop from client to server: a thin client cannot determine the physical location of the data, so it sends every cache request to a random server node, which then re-routes the request to the node that owns the data.

The proposed solution is to implement "Affinity Awareness" for thin clients, so that they send each cache request directly to the server node that contains the data. This has the potential to substantially reduce mean latency for thin clients.

Description

This section describes how the solution can be implemented.

Connection

  1. On startup, the thin client connects to all nodes listed by the user in the client configuration.
  2. During the handshake, the server returns its node UUID to the client.
  3. By the end of the startup procedure, the client has open connections to all available server nodes and the following mapping (nodeMap): [UUID => Connection].

Connecting to all nodes helps to identify which nodes are available, but it can cause a significant delay when the thin client is used with a large cluster and a long IP list provided by the user. To reduce this delay, connections can be established asynchronously.

Changes in format of handshake protocol message

The format of the handshake messages is described in the thin client binary protocol documentation. Only the successful response message is changed.

The changes are shown below. The added field is the UUID of the server node.

| Field type | Field description |
|---|---|
| int | Success message length, 1. |
| byte | Success flag, 1. |
| UUID | UUID of the server node. (added) |
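
As an illustration, a client might decode the extended success response roughly as follows. This is a minimal sketch, not the protocol definition: it assumes little-endian integers and assumes that the UUID is sent as two raw 64-bit halves (most significant first) right after the success flag. The table above does not fix the UUID encoding, and the name `parse_handshake_success` is hypothetical.

```python
import struct
import uuid


def parse_handshake_success(data):
    """Decode [int length][byte success][UUID]; return (ok, node_uuid)."""
    (length,) = struct.unpack_from("<i", data, 0)
    payload = data[4:4 + length]
    if length < 1 or len(payload) < length:
        raise ValueError("truncated handshake response")
    ok = payload[0] == 1
    node_uuid = None
    # Assumption: 1 flag byte + 16 UUID bytes; older servers send the flag only.
    if ok and length >= 17:
        msb, lsb = struct.unpack_from("<QQ", payload, 1)
        node_uuid = uuid.UUID(int=(msb << 64) | lsb)
    return ok, node_uuid
```

A response without the UUID yields `(True, None)`, which lets the client fall back to the old behaviour for servers that do not support the feature.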

Connection algorithm written in pseudo-code

Pseudo code
Map<UUID, TcpConnection> connect(ClientConfig cfg)
{
	Map<UUID, TcpConnection> nodeMap = new Map();

	// The synchronous case is shown here because it is easier to read, but the
	// same operation can be performed asynchronously.
	for (addr: cfg.GetAddresses()) {
		TcpConnection conn = establishConnection(addr);

		if (conn.isGood()) {
			HandshakeResult handshakeRes = handshake(conn);

			if (handshakeRes.isOk()) {
				UUID nodeUuid = handshakeRes.nodeUuid();

				if (nodeMap.contains(nodeUuid)) {
					// This can happen if the same node has several IPs.
					// It makes sense to keep the fresher connection alive.
					nodeMap.get(nodeUuid).disconnect();
				}

				nodeMap.put(nodeUuid, conn);
			} else {
				conn.disconnect();
			}
		}
	}

	if (nodeMap.isEmpty()) {
		// This is the only case which leads to the failure of the whole operation.
		// If at least one connection has been established, we can work with the cluster.
		reportFailure("Cannot establish a connection to the cluster");
	}


	return nodeMap;
}
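
The asynchronous variant mentioned in the comments could look roughly like the sketch below. It is only an illustration under stated assumptions: `open_and_handshake` is a hypothetical callable that opens a connection, performs the handshake and returns `(node_uuid, connection)`, or raises `OSError` on failure, and connections are assumed to expose `close()`.

```python
from concurrent.futures import ThreadPoolExecutor


def connect_all(addresses, open_and_handshake, max_workers=8):
    """Connect to every address in parallel and return {node_uuid: connection}."""
    node_map = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(open_and_handshake, addr) for addr in addresses]
        for future in futures:  # results are consumed in address order
            try:
                node_uuid, conn = future.result()
            except OSError:
                continue  # a single unreachable address is not fatal
            previous = node_map.get(node_uuid)
            if previous is not None:
                previous.close()  # the same node was reached via another address
            node_map[node_uuid] = conn
    if not node_map:
        # The only case that fails the whole operation.
        raise ConnectionError("Cannot establish a connection to the cluster")
    return node_map
```

The connection attempts run concurrently, so the total startup delay is bounded by the slowest attempts rather than by the sum of all of them.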

Cache affinity mapping acquiring

To route a request to the primary node for a key, the client must know the partition mapping of the cache. A client may need the affinity mapping for one cache or for several caches at once, so it makes sense to allow several caches in a single request. Partition mappings of several caches are also often identical, so as an optimization each partition mapping in the response carries the list of caches to which it applies. The partitions request can thus be described by the following steps:

  1. The thin client sends a Cache Partitions Request for one or several caches to a random connected node.
  2. The Cache Partitions Response contains one or more mappings [[cacheCfg] => [nodeUuid => partition]].
  3. If one of the caches in the request does not use the default affinity function (RendezvousAffinityFunction), or a custom AffinityKeyMapper is set, the Cache Partitions Response also lists the caches for which this IEP is not applicable.
  4. After receiving the Cache Partitions Response, the client decomposes it internally and uses it to update the following maps:
    1. cacheKeyMap: [cacheId => [typeId => affinityFieldId]].
    2. distributionMap: [cacheId => partitionMap].
    3. partitionMap: [partition => nodeUuid]. Note that the response does not contain partitionMap itself; its data is grouped differently to reduce the size of the message (see point 2). Upon receiving the Cache Partitions Response, the client should re-format this data into partitionMap to significantly reduce the latency of the following chain of lookups: distributionMap(cacheId) => partitionMap, then partitionMap(partition) => nodeUuid.
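
The re-formatting described in step 4 can be sketched as follows. The function names are hypothetical, and plain dictionaries stand in for whatever map types a real client would use.

```python
def build_partition_map(node_partitions):
    """Invert {node_uuid: [partition, ...]} into {partition: node_uuid}."""
    partition_map = {}
    for node_uuid, partitions in node_partitions.items():
        for partition in partitions:
            partition_map[partition] = node_uuid
    return partition_map


def apply_mapping(distribution_map, cache_ids, node_partitions):
    """Store one shared partition map for every cache the mapping applies to."""
    partition_map = build_partition_map(node_partitions)
    for cache_id in cache_ids:
        distribution_map[cache_id] = partition_map
```

Caches that share a mapping share a single partitionMap object, so the optimization made in the message format carries over to the client's memory use.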

See the proposed Cache Partitions Request and Response message formats below.

Cache Partitions Request

| Field type | Description |
|---|---|
| Header | Request header, in the standard request header format of the binary client protocol. |
| int | Number of caches N to get partition mappings for. |
| int | Cache ID #1 |
| int | Cache ID #2 |
| ... | ... |
| int | Cache ID #N |
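
For illustration, the body of this request (everything after the header) could be encoded as shown below. This is a sketch that assumes little-endian signed 32-bit integers; the request header is deliberately left out, and `encode_cache_partitions_body` is a hypothetical name.

```python
import struct


def encode_cache_partitions_body(cache_ids):
    """Encode [int N][int cacheId] * N; the request header is not included."""
    n = len(cache_ids)
    return struct.pack(f"<{n + 1}i", n, *cache_ids)
```

For example, `encode_cache_partitions_body([1, -2])` produces 12 bytes: the count 2 followed by the two cache IDs.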

Cache Partitions Response

| Field type | Description |
|---|---|
| Header | Response header, in the standard response header format of the binary client protocol. |
| long | Topology Affinity Version. |
| int | Minor Topology Affinity Version. |
| int | Number of cache mappings J that describe all the caches listed in the request. |
| Partition Mapping | Partition mapping #1. [[cacheId] => [nodeUuid => partition]]. See format below. |
| Partition Mapping | Partition mapping #2 |
| ... | ... |
| Partition Mapping | Partition mapping #J |

Partition Mapping

| Field type | Description |
|---|---|
| bool | Applicable. Flag that shows whether standard affinity is used for the caches. |
| int | Number K of caches for which this mapping is applicable |
| int | Cache ID #1 |
| Cache key configuration | Key configuration for cache #1. Present only if Applicable is true. |
| int | Cache ID #2 |
| Cache key configuration | Key configuration for cache #2. Present only if Applicable is true. |
| ... | ... |
| int | Cache ID #K |
| Cache key configuration | Key configuration for cache #K. Present only if Applicable is true. |
| int | Number L of nodes. Present only if Applicable is true. |
| Node Partitions | Partitions of the node #1. Present only if Applicable is true. |
| Node Partitions | Partitions of the node #2. Present only if Applicable is true. |
| ... | ... |
| Node Partitions | Partitions of the node #L. Present only if Applicable is true. |

Cache key configuration

| Field type | Description |
|---|---|
| int | Number R of key configurations |
| int | Key type ID #1 |
| int | Affinity Key Field ID #1 |
| int | Key type ID #2 |
| int | Affinity Key Field ID #2 |
| ... | ... |
| int | Key type ID #R |
| int | Affinity Key Field ID #R |

Node Partitions

| Field type | Description |
|---|---|
| UUID | UUID of the node |
| int | Number of partitions M associated with the node |
| int | Partition #1 for node. |
| int | Partition #2 for node. |
| ... | ... |
| int | Partition #M for node. |
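
Reading one Partition Mapping structure, including the nested Cache key configuration and Node Partitions structures, might look like the sketch below. It assumes little-endian integers, a one-byte bool, and a UUID sent as two raw 64-bit halves; none of these encodings is fixed by the tables above. `Reader` and `read_partition_mapping` are hypothetical helpers.

```python
import struct
import uuid


class Reader:
    """Sequential little-endian reader over a bytes buffer."""

    def __init__(self, data):
        self.data, self.pos = data, 0

    def read(self, fmt):
        values = struct.unpack_from("<" + fmt, self.data, self.pos)
        self.pos += struct.calcsize("<" + fmt)
        return values[0] if len(values) == 1 else values

    def read_uuid(self):
        msb, lsb = self.read("QQ")  # assumed UUID layout: two 64-bit halves
        return uuid.UUID(int=(msb << 64) | lsb)


def read_partition_mapping(r):
    """Return (applicable, {cache_id: {type_id: field_id}}, {node: [parts]})."""
    applicable = r.read("?")
    key_cfg, node_parts = {}, {}
    for _ in range(r.read("i")):              # K caches
        cache_id = r.read("i")
        key_cfg[cache_id] = {}
        if applicable:
            for _ in range(r.read("i")):      # R key configurations
                type_id, field_id = r.read("ii")
                key_cfg[cache_id][type_id] = field_id
    if applicable:
        for _ in range(r.read("i")):          # L nodes
            node = r.read_uuid()
            count = r.read("i")               # M partitions
            node_parts[node] = [r.read("i") for _ in range(count)]
    return applicable, key_cfg, node_parts
```

When Applicable is false, only the cache IDs are read, which matches the "Present only if Applicable is true" notes in the table.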

Changes to cache operations with single key

When the user performs a key-based cache operation, the thin client makes a best effort to send the request to the node that stores the data.

  1. The client determines the partitionMap for the cache using cacheId and distributionMap: distributionMap(cacheId) => partitionMap. For details about partitionMap and distributionMap, see section "Cache affinity mapping acquiring".
  2. If there is no partitionMap for the cache, the feature is not applicable to this cache. Go to step 7.
  3. Given a key by the user, the client checks its cacheKeyMap (see section "Cache affinity mapping acquiring", step 4.1) to find out whether the cache is configured to calculate the partition from a specific field of the key.
  4. The client applies its internal implementation of the rendezvous affinity function to the affinity key field, if one is configured, or to the whole key otherwise, to calculate the partition for the key.
  5. Using partitionMap, the client gets the nodeUuid of the primary node for the key.
  6. Using the nodeMap described in section "Connection", the client checks whether there is an active connection nodeConnection to the node with that nodeUuid.
  7. If there is no connection to the primary node for the key, or if the primary node cannot be determined, the client takes a random nodeConnection from nodeMap.
  8. The connection nodeConnection is used to send the request.
  9. If an error occurs while sending the request, another random connection is used to send it. The failed connection should be removed from nodeMap. This is not an error for the user, though logging a warning may be a good idea in this case.
  10. If no nodes are left, an error is reported to the user.
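
Steps 3 to 5 can be sketched as below. The partition formula is an assumption: it follows what the Java RendezvousAffinityFunction is understood to do (mask the mixed hash when the partition count is a power of two, otherwise take the absolute remainder), and it must be verified against the server before use. The key hash codes, the partition count (assumed to be known to the client, 1024 by default) and all function names are hypothetical inputs of this sketch.

```python
def partition_for_hash(hash_code, parts=1024):
    """Map a 32-bit key hash code to a partition number in [0, parts)."""
    h = hash_code & 0xFFFFFFFF                    # view the hash as unsigned 32-bit
    if (parts & (parts - 1)) == 0:                # power of two: mask the mixed hash
        return (h ^ (h >> 16)) & (parts - 1)
    signed = h - (1 << 32) if h >= (1 << 31) else h
    return abs(signed) % parts                    # magnitude of the signed remainder


def affinity_hash(type_id, key_hash, field_hashes, key_cfg):
    """Use the affinity key field's hash if one is configured, else the key's."""
    field_id = key_cfg.get(type_id)
    return key_hash if field_id is None else field_hashes[field_id]


# Usage: cache with 4 partitions; key type 100 uses field 7 as its affinity key.
partition_map = {0: "node-A", 1: "node-B", 2: "node-A", 3: "node-B"}
h = affinity_hash(type_id=100, key_hash=12345, field_hashes={7: 6}, key_cfg={100: 7})
primary = partition_map[partition_for_hash(h, parts=4)]
```

The client-side result is only useful if it matches the server's calculation exactly, which is why the feature is limited to the default affinity function.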

Request Sending Algorithm in pseudo code

Pseudo code
Response sendRequest(int cacheId, CacheKey key, Message msg)
{
	UUID nodeUuid = null;
	Connection nodeConnection = null;

	if (!distributionMap.contains(cacheId))
		updateCachePartitions(cacheId); // See "Cache affinity mapping acquiring"

	PartitionMap partitionMap = distributionMap.get(cacheId);

	// A missing or empty partitionMap means the feature is not applicable to the cache.
	if (partitionMap != null && !partitionMap.empty()) {
		Object affinityKey = key;

		Map<Integer, Integer> keyAffinityMap = cacheKeyMap.get(cacheId);
		Integer affinityKeyId = keyAffinityMap.get(key.typeId());

		// Use the affinity key field instead of the whole key if one is configured.
		if (affinityKeyId != null)
			affinityKey = key.getFieldById(affinityKeyId);

		int partition = RendezvousAffinityFunction(affinityKey);

		nodeUuid = partitionMap.get(partition);

		nodeConnection = nodeMap.get(nodeUuid);
	}

	if (nodeConnection == null)
		nodeUuid, nodeConnection = nodeMap.getRandom();

	while (true) {
		try {
			Response rsp = nodeConnection.send(msg);

			return rsp;
		}
		catch(err) {
			logWarning(err);

			nodeConnection.disconnect();

			nodeMap.remove(nodeUuid);

			// reportErrorToUser() is expected to abort the operation, e.g. by throwing.
			if (nodeMap.isEmpty())
				reportErrorToUser("Cluster is unavailable");

			nodeUuid, nodeConnection = nodeMap.getRandom();
		}
	}
}
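
The failover part of the algorithm above can also be written as a small runnable sketch. It assumes that connections expose `send(msg)` and `close()` and signal failures with `OSError`; these names, like `send_with_failover` itself, are hypothetical.

```python
import random


def send_with_failover(node_map, preferred_uuid, msg, log=print):
    """Send msg to the preferred node, falling back to random nodes on errors.

    node_map is {node_uuid: connection}. Failed connections are closed and
    removed from node_map.
    """
    node_uuid = preferred_uuid if preferred_uuid in node_map else None
    while node_map:
        if node_uuid is None:
            node_uuid = random.choice(list(node_map))
        conn = node_map[node_uuid]
        try:
            return conn.send(msg)
        except OSError as err:
            log(f"request to {node_uuid} failed: {err}")  # a warning, not an error
            conn.close()
            del node_map[node_uuid]
            node_uuid = None
    raise ConnectionError("Cluster is unavailable")
```

Passing `preferred_uuid=None` covers the case where the primary node cannot be determined; an error is raised only when no connections are left.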

Updating partition mapping

It is important for the client to keep the partition mapping up to date. To ensure this, the following changes are proposed:

  1. Changes to the standard response message header. A flag is added to every response that shows whether the Affinity Topology Version of the cluster has changed since the last request from the client.
  2. If the Affinity Topology Version of the cluster has changed since the last request from the client, the new Affinity Topology Version is also included in the response.

This mechanism allows the client to passively detect changes of the Affinity Topology. In the most common case it adds an overhead of only 1 bit, indicating that the Affinity Topology has not changed.

Note that every server node tracks its own Last Reported Affinity Topology Version. This means that every change of the Affinity Topology Version is likely to be reported to the client multiple times, once by every server node. However, since the standard response header then contains the Affinity Topology Version, it is trivial for the client to tell whether it needs to update its mappings or whether an already known change is being reported.

Once the client detects a change in the Affinity Topology, it does the following:

  1. Updates distributionMap and partitionMap (preferably asynchronously) for all active caches. This may also be done "on demand", on the first call to the Cache instance.
  2. Tries to establish connections to the hosts that are not yet connected. This is required because the topology change may be caused by a new node joining the cluster.
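
Telling a new change apart from one that another node has already reported can be sketched as follows. `TopologyTracker` is a hypothetical helper, and the initial version (-1, -1) is an assumption chosen to sort below any real version.

```python
class TopologyTracker:
    """Remembers the newest Affinity Topology Version seen by the client."""

    def __init__(self):
        self.version = (-1, -1)  # (major, minor), assumed lower than any real one

    def on_response(self, changed, major=None, minor=None):
        """Return True when the partition mappings need to be refreshed."""
        if not changed:
            return False
        reported = (major, minor)
        if reported <= self.version:
            return False         # another node repeating an already known change
        self.version = reported
        return True
```

The client would call `on_response` for every response header and start the two steps above only when it returns True.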

Standard response message header changes

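The format of the standard response messages is described in the thin client binary protocol documentation.

The changes are shown below. The "Flags" field and the two Affinity Topology Version fields are added, and the status code and error message become conditional on the "Error" flag.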

| Field type | Description |
|---|---|
| int | Length of the response message. |
| long | Request ID. |
| short | Flags. ("Error" and "Affinity Topology Changed" flags are proposed) |
| long | Topology Affinity Version. (present only when "Affinity Topology Changed" flag is set) |
| int | Minor Topology Affinity Version. (present only when "Affinity Topology Changed" flag is set) |
| int | Status Code (present only when "Error" flag is set) |
| String | Error message (present only when "Error" flag is set) |

As shown above, a new "Flags" field is proposed in order to reduce the size of the success response message (the most common case).
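
A client could read the proposed header along the lines of the sketch below. The bit values of the two flags are assumptions (the table does not assign them), integers are assumed to be little-endian, and a status of 0 is assumed to mean success. The error message is not decoded because its string encoding is not described here; the returned offset points at whatever follows the status code.

```python
import struct

FLAG_ERROR = 0x1                      # assumed bit value
FLAG_AFFINITY_TOPOLOGY_CHANGED = 0x2  # assumed bit value


def parse_response_header(data):
    """Return (request_id, topology_version, status, offset_of_rest)."""
    _length, request_id, flags = struct.unpack_from("<iqh", data, 0)
    pos = struct.calcsize("<iqh")
    topology_version = None
    status = 0                        # assumed to mean success
    if flags & FLAG_AFFINITY_TOPOLOGY_CHANGED:
        topology_version = struct.unpack_from("<qi", data, pos)  # (major, minor)
        pos += struct.calcsize("<qi")
    if flags & FLAG_ERROR:
        (status,) = struct.unpack_from("<i", data, pos)
        pos += 4
    return request_id, topology_version, status, pos
```

In the common success case with an unchanged topology, only the length, the request ID and the two-byte flags field are read.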

Benchmarks

Benchmarks performed with a prototype implementation in the C++ thin client showed good performance improvements on a 3-node cluster.

Risks and Assumptions

  1. The proposed algorithm can introduce a significant delay on startup when the thin client is used with a large cluster and a long IP list provided by the user. To reduce this delay, connections can be established asynchronously.
  2. It may be necessary to limit the maximum number of open connections in order to use the feature on large clusters. However, this reduces the efficiency of the proposed enhancement on large clusters.
  3. The enhancement does not improve performance on a cluster with a non-standard affinity function. However, it does not reduce performance in this case either.
  4. The feature has not yet been tested with large clusters.

Discussion Links

Initial proposal discussion: http://apache-ignite-developers.2346864.n4.nabble.com/Best-Effort-Affinity-for-thin-clients-td31574.html
