To be Reviewed By: 7 April 2020

Authors: Dan Smith

Status: Draft | Discussion | Active | Dropped | Superseded

Superseded by: N/A

Related: N/A

Problem

WORK IN PROGRESS

Geode uses UDP messaging through JGroups for its peer-to-peer (P2P) messaging related to membership. However, Geode does not actually use the JGroups group membership system, only its reliable UDP messaging. Using UDP messaging through JGroups has a couple of issues:

  • We implemented our own non-standard encryption and key exchange system on top of JGroups UDP messaging in order to ensure that all P2P messages are encrypted. See Secure UDP Communication in Geode. This adds complexity to the configuration by requiring users to also set a separate UDP encryption property, and it adds the risk of security and functional issues with our implementation. We recently discussed deprecating this property on the mailing list because of its functional issues; see this thread.
  • JGroups does not support rolling upgrades, which makes it more difficult to upgrade JGroups.

Configuring encryption for UDP adds an additional layer of complexity to securing P2P communications. Rather than continue using UDP, we would like to replace the UDP messaging for membership with a TCP-based messaging system. This will allow us to use standard encryption protocols for all peer-to-peer messaging, and it will remove our dependence on JGroups in the long term.

Goals

  • Set us up for removing JGroups as a dependency in future versions by moving to a protocol that does not require JGroups
  • Support rolling upgrades from the old JGroups protocol to the new TCP-based protocol
  • Use the existing SSL settings to control how membership messages are encrypted
  • Be as reliable in the face of networking failures as the previous protocol

Anti-Goals

  • It is not a goal to replace the existing, separate peer-to-peer TCP messaging system that is used for cache operations. The new messaging system is initially targeted only at replacing our use of JGroups UDP for membership messages.

Design

All of the messaging related to membership is handled by the JGroupsMessenger class, which implements the Messenger interface. We will create a new implementation of Messenger that uses TCP sockets, rather than JGroups and UDP sockets.

The key methods on the Messenger interface are below:

```java
/**
 * Start the server and start listening for messages.
 */
void start();

/**
 * Returns the endpoint ID for this member.
 */
InternalDistributedMember getMemberID();

/**
 * Sends an asynchronous message. Returns destinations that did not receive the message due to no
 * longer being in the view.
 */
Set<InternalDistributedMember> send(DistributionMessage m);

/**
 * Adds a handler for the given class/interface of messages.
 */
void addHandler(Class c, MessageHandler h);
```

The start method of the Messenger is responsible for starting a server that listens on a socket. The getMemberID() method returns an InternalDistributedMember. Currently, the InternalDistributedMember exposes the host and port that the Messenger is listening on through its getHost and getPort methods.

The send method takes a DistributionMessage. DistributionMessage has a getRecipients method, which returns an array of InternalDistributedMember objects. The messenger can send messages to those recipients because they have the host and port information.
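
As a minimal sketch of the send contract, assuming a simplified model, the messenger could split a message's recipients into those still in the view (handed to a per-destination queue that owns delivery and retries) and those it reports back as undeliverable. `ViewAwareSender`, the `enqueue` callback, and the use of `java.net.InetSocketAddress` as a stand-in for the host and port carried by InternalDistributedMember are all hypothetical, not Geode APIs.

```java
import java.net.InetSocketAddress;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;

// Hypothetical sketch: InetSocketAddress stands in for the host/port in InternalDistributedMember.
class ViewAwareSender {
  private final Set<InetSocketAddress> currentView;
  // Hands a message to the per-destination queue that owns delivery and retries.
  private final BiConsumer<InetSocketAddress, Object> enqueue;

  ViewAwareSender(Set<InetSocketAddress> currentView,
                  BiConsumer<InetSocketAddress, Object> enqueue) {
    this.currentView = currentView;
    this.enqueue = enqueue;
  }

  /** Queues the message for every recipient in the view and returns the recipients that are not. */
  Set<InetSocketAddress> send(List<InetSocketAddress> recipients, Object message) {
    Set<InetSocketAddress> notInView = new LinkedHashSet<>();
    for (InetSocketAddress recipient : recipients) {
      if (currentView.contains(recipient)) {
        enqueue.accept(recipient, message); // asynchronous: send() does not wait for delivery
      } else {
        notInView.add(recipient);
      }
    }
    return notInView;
  }
}
```

Returning the not-in-view set rather than throwing mirrors the Javadoc on `send` above: the caller learns which destinations will never receive the message.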

Discovery of the InternalDistributedMembers happens outside the Messenger (in GMSJoinLeave), and we won't have to modify that functionality. The new messenger just needs to know how to start a TCP server, generate an InternalDistributedMember that contains the contact information for that server, and use the InternalDistributedMember objects of other members to send messages to them.

Implement a TCP Server Using Netty

Our proposal for the new Messenger is to implement a TCP server and client using Netty. We will implement a Netty ChannelInboundHandlerAdapter that deserializes Geode messages and passes them to the handlers registered with addHandler. The host and port of the Netty server can be included in InternalDistributedMember or shared in other ways (see the Backwards Compatibility and Upgrade Path section, below). The Netty server and client will use the existing cluster SSL configuration, so if cluster SSL is enabled no additional properties will be required. See https://geode.apache.org/docs/guide/latest/managing/security/implementing_ssl.html for information on the relevant properties. The server socket will use the existing bind-address and membership-port-range properties to determine its address and port.
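
TCP delivers a byte stream rather than discrete datagrams, so the inbound handler has to find message boundaries before it can deserialize anything. A common approach, and the one this sketch assumes, is a 4-byte length prefix. Netty provides `LengthFieldPrepender` and `LengthFieldBasedFrameDecoder` for this. The sketch below uses only the JDK so it runs standalone. `FrameCodec` and `MAX_FRAME` are hypothetical, and the proposal does not specify a wire format.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

class FrameCodec {
  static final int MAX_FRAME = 1 << 20; // hypothetical limit used to reject corrupt lengths

  /** Prepends a 4-byte big-endian length to an already-serialized message. */
  static ByteBuffer encode(byte[] serializedMessage) {
    ByteBuffer frame = ByteBuffer.allocate(4 + serializedMessage.length);
    frame.putInt(serializedMessage.length).put(serializedMessage);
    frame.flip();
    return frame;
  }

  private ByteBuffer accumulator = ByteBuffer.allocate(1024); // kept in write mode between calls

  /** Feeds bytes read from the socket and returns every message completed by them. */
  List<byte[]> decode(ByteBuffer incoming) {
    if (accumulator.remaining() < incoming.remaining()) {
      // Grow so the buffered partial frame plus the new bytes fit.
      ByteBuffer bigger = ByteBuffer.allocate(accumulator.position() + incoming.remaining());
      accumulator.flip();
      bigger.put(accumulator);
      accumulator = bigger;
    }
    accumulator.put(incoming);
    accumulator.flip(); // switch to read mode
    List<byte[]> messages = new ArrayList<>();
    while (accumulator.remaining() >= 4) {
      int length = accumulator.getInt(accumulator.position()); // peek at the length prefix
      if (length < 0 || length > MAX_FRAME) {
        throw new IllegalStateException("corrupt frame length: " + length);
      }
      if (accumulator.remaining() < 4 + length) {
        break; // partial frame: wait for more bytes
      }
      accumulator.getInt(); // consume the prefix
      byte[] message = new byte[length];
      accumulator.get(message);
      messages.add(message);
    }
    accumulator.compact(); // keep any unread partial frame and return to write mode
    return messages;
  }
}
```

In the Netty version, this logic would sit in the pipeline ahead of the handler that deserializes Geode messages. That way the handler only ever sees whole frames.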

The Netty-based messenger will maintain one connection to each peer. When sending a message, the messenger will create a connection to any destination that does not have one yet. Once a connection is established, it will remain open as long as that member is still in the view. If I/O errors or timeouts happen when writing messages, the messenger will close the connection, establish a new connection, and retry. Messages will continue to be retried until we receive a view indicating that the member is no longer in the view. We will therefore guarantee the order of message delivery. ??? What about data that was already in the socket write buffer but never transmitted? We need to make sure we transmit these messages as well. Does this mean we need our own ack protocol outside of TCP? Or do we just keep a window of messages that will always be retransmitted when a connection fails?
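
The retry policy above can be sketched as a per-peer loop. The loop writes the frame. On an I/O error it closes the connection and opens a new one, then tries again. It gives up only once the peer has left the view. `PeerConnection`, `PeerConnectionFactory`, `RetryingPeerSender`, and the `stillInView` check are hypothetical names. The sketch does not resolve the open question about bytes lost in the socket write buffer: a successful `write` here means only that the bytes were handed to the connection.

```java
import java.io.IOException;
import java.util.function.BooleanSupplier;

// Hypothetical abstraction over one TCP connection to a peer.
interface PeerConnection {
  void write(byte[] frame) throws IOException;
  void close();
}

// Hypothetical factory that opens a new connection to one fixed peer.
interface PeerConnectionFactory {
  PeerConnection connect() throws IOException;
}

class RetryingPeerSender {
  private final PeerConnectionFactory factory;
  private final BooleanSupplier stillInView;
  private final long retryDelayMillis;
  private PeerConnection connection; // at most one open connection per peer

  RetryingPeerSender(PeerConnectionFactory factory, BooleanSupplier stillInView,
                     long retryDelayMillis) {
    this.factory = factory;
    this.stillInView = stillInView;
    this.retryDelayMillis = retryDelayMillis;
  }

  /** Returns true once the frame is written, or false if the peer left the view first. */
  boolean send(byte[] frame) {
    while (stillInView.getAsBoolean()) {
      try {
        if (connection == null) {
          connection = factory.connect(); // (re)establish the connection lazily
        }
        connection.write(frame);
        return true;
      } catch (IOException e) {
        if (connection != null) {
          connection.close(); // drop the broken connection; the next attempt opens a fresh one
          connection = null;
        }
        try {
          Thread.sleep(retryDelayMillis); // simple fixed delay between attempts
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          return false;
        }
      }
    }
    return false;
  }
}
```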

Because we are retrying messages, we can end up delivering messages more than once, whereas the old JGroups protocol ensured exactly-once delivery. ??? Do membership messages need exactly-once delivery? What if we end up sending regular messages over this channel? If we need exactly-once delivery, we also need to add sequence numbers to messages and drop duplicates on the receiving side.
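
If exactly-once delivery turns out to be required, the receiving side of the sequence-number approach is small. The receiver remembers the highest sequence number it has delivered from each sender and drops anything at or below it. This sketch assumes each sender numbers its messages 1, 2, 3 and so on, and that retransmissions are replayed in order over a single connection, so gaps do not occur. `DuplicateFilter` and the string sender IDs are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

class DuplicateFilter {
  // Highest sequence number already delivered, per sender (hypothetical string IDs).
  private final Map<String, Long> lastDelivered = new HashMap<>();

  /** Returns true if the message should be delivered, false if it is a retransmitted duplicate. */
  boolean accept(String sender, long sequence) {
    long last = lastDelivered.getOrDefault(sender, 0L);
    if (sequence <= last) {
      return false; // already delivered before the connection failed and was re-established
    }
    lastDelivered.put(sender, sequence);
    return true;
  }

  /** Forgets a sender once it is no longer in the view. */
  void memberDeparted(String sender) {
    lastDelivered.remove(sender);
  }
}
```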

Messages received by the Netty server will be dispatched from Netty event loop threads. For this reason, message processing must not block, or it will prevent other messages from being received. The old JGroupsMessenger dispatched messages using a single JGroups receiver thread.
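
One way to keep message processing off the Netty event loop is to hand each decoded message to a separate executor and look up its handler there. This sketch assumes a single-threaded executor, which preserves arrival order in much the same way the old single JGroups receiver thread did. `MessageDispatcher` and the `Consumer`-based handlers are hypothetical simplifications of `addHandler` and `MessageHandler`. Handler lookup falls back to superclasses and interfaces because `addHandler` registers a handler for a class or interface of messages.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class MessageDispatcher {
  private final Map<Class<?>, Consumer<Object>> handlers = new ConcurrentHashMap<>();
  // One processing thread: handlers never run on the I/O thread, and run in arrival order.
  private final ExecutorService processor = Executors.newSingleThreadExecutor();

  void addHandler(Class<?> type, Consumer<Object> handler) {
    handlers.put(type, handler);
  }

  /** Called from an event-loop thread; only enqueues work, so it returns immediately. */
  void dispatch(Object message) {
    processor.execute(() -> {
      Consumer<Object> handler = findHandler(message.getClass());
      if (handler != null) {
        handler.accept(message); // a real implementation would log unhandled messages
      }
    });
  }

  private Consumer<Object> findHandler(Class<?> type) {
    Consumer<Object> exact = handlers.get(type);
    if (exact != null) {
      return exact;
    }
    for (Map.Entry<Class<?>, Consumer<Object>> entry : handlers.entrySet()) {
      if (entry.getKey().isAssignableFrom(type)) {
        return entry.getValue(); // handler registered for a superclass or interface
      }
    }
    return null;
  }

  /** Stops accepting work and waits for queued messages to finish processing. */
  boolean shutdownAndWait(long timeoutMillis) {
    processor.shutdown();
    try {
      return processor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```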

Backwards Compatibility and Upgrade Path

We need to be able to do a rolling upgrade from the old JGroups-based protocol to the new protocol. We will need to continue to support rolling upgrades for a certain range of versions before we can drop JGroups.

To accomplish this, a member will need to listen for connections on both protocols when it initially starts up. We will create a delegating Messenger, a VersionAwareMessenger, that contains both a JGroupsMessenger and a NettyMessenger. It can install handlers in both of them and decide which Messenger to use when sending a message based on the version of the recipient. If a member receives a view that contains no members that require the old protocol, it could shut down the JGroupsMessenger.
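
The routing decision inside the proposed VersionAwareMessenger can be sketched as follows. The string member IDs, the integer version ordinals, `VersionRouting`, and `FIRST_TCP_VERSION` are hypothetical placeholders. The proposal does not say which Geode release will introduce the new protocol.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class VersionRouting {
  // Hypothetical ordinal of the first release that ships the TCP (Netty) messenger.
  static final int FIRST_TCP_VERSION = 100;

  enum Transport { JGROUPS_UDP, NETTY_TCP }

  static Transport transportFor(int recipientVersion) {
    return recipientVersion >= FIRST_TCP_VERSION ? Transport.NETTY_TCP : Transport.JGROUPS_UDP;
  }

  /** Groups a message's recipients (ID to version) by the messenger that should deliver to them. */
  static Map<Transport, List<String>> route(Map<String, Integer> recipientVersions) {
    return recipientVersions.entrySet().stream()
        .collect(Collectors.groupingBy(
            e -> transportFor(e.getValue()),
            Collectors.mapping(Map.Entry::getKey, Collectors.toList())));
  }

  /** Once no member of the view needs the old protocol, the JGroups messenger can be stopped. */
  static boolean canStopJGroups(Collection<Integer> viewMemberVersions) {
    return viewMemberVersions.stream().allMatch(v -> v >= FIRST_TCP_VERSION);
  }
}
```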

There is an issue with the need to listen on two separate ports and distribute both ports to other members, because InternalDistributedMember currently only supports a single port field. Currently, we distribute the JGroups UDP server port as part of the InternalDistributedMember. InternalDistributedMembers are sent as part of view messages and find-coordinator requests to the locator, which allows all members to discover the listening ports of other members (see GMSJoinLeave message sequence diagrams). We need to distribute the new TCP port as well. There are a few options we are evaluating:

  1. Bind to the same port number for both UDP and TCP traffic. Since one is a UDP port and the other is a TCP port, they can both be open at the same time. Newer members would use TCP. This would require finding a port number that is available for both protocols. This is the option we prefer to try first.
  2. Add another port field to InternalDistributedMember, or encode the second port in some other field of InternalDistributedMember, for example by using part of the UUID bytes. Encoding it in the UUID is kind of hacky.
  3. Pass the new membership port around outside of InternalDistributedMember. This would probably involve sending it as part of the FindCoordinatorResponse, as well as including it in the GMSMembershipView.

For the last two options, we will need to version either InternalDistributedMember itself or the FindCoordinatorResponse and GMSMembershipView classes. Old members will only receive the UDP port; newer members will receive both ports.
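
The first option depends on finding a port number within membership-port-range that is free for both protocols. A minimal JDK-only probe is sketched below. `DualProtocolPortFinder` is a hypothetical helper. Because it closes both sockets before returning, another process could still take the port before the messengers bind it. A real implementation would more likely keep the bound sockets and hand them to JGroups and Netty.

```java
import java.io.IOException;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.ServerSocket;

class DualProtocolPortFinder {
  /** Returns the first port in [low, high] that can be bound for both TCP and UDP, or -1. */
  static int findPortForBothProtocols(InetAddress bindAddress, int low, int high) {
    for (int port = low; port <= high; port++) {
      try (ServerSocket tcp = new ServerSocket(port, 50, bindAddress);
           DatagramSocket udp = new DatagramSocket(port, bindAddress)) {
        return port; // both binds succeeded; the sockets are closed on return
      } catch (IOException e) {
        // In use for at least one protocol: try the next port.
      }
    }
    return -1;
  }
}
```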

Handling TCP connection failures

The Messenger is used to send messages to destinations that may or may not be part of the current membership view. If a member is in the view, the Messenger needs to keep trying to deliver messages to that member as long as that member is still in the view. Because individual TCP connections can fail, we must implement a reliability layer above TCP that will continue to retry messages until a member is removed from the view. This layer needs to be able to:

  • Reestablish a connection to the destination if the existing TCP connection fails.
  • Retransmit messages that may not have been received.
  • Prevent duplication of retransmitted messages on the receiver side

To accomplish these goals, we will implement a layer that attaches an increasing sequence number to each message and saves messages on the sender until their sequence numbers are acknowledged.
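
On the sending side, this layer can be sketched as a window of unacknowledged messages keyed by sequence number. `RetransmitWindow` is a hypothetical name. The sketch assumes cumulative acknowledgements, where an ack of n covers every message up to n; the proposal does not specify the ack scheme.

```java
import java.util.SortedMap;
import java.util.TreeMap;

class RetransmitWindow {
  private long nextSequence = 1;
  private final TreeMap<Long, byte[]> unacked = new TreeMap<>();

  /** Assigns the next sequence number and keeps the message until it is acknowledged. */
  synchronized long add(byte[] message) {
    long sequence = nextSequence++;
    unacked.put(sequence, message);
    return sequence;
  }

  /** Cumulative ack: the receiver has every message up to and including upTo. */
  synchronized void acknowledge(long upTo) {
    unacked.headMap(upTo, true).clear(); // the view is backed by the map, so this removes entries
  }

  /** Messages to resend, in sequence order, after a connection is re-established. */
  synchronized SortedMap<Long, byte[]> pendingForRetransmit() {
    return new TreeMap<>(unacked); // defensive copy
  }
}
```

On the receiving side, the acknowledgement could simply be the highest sequence number delivered so far from that sender. Duplicate detection needs to remember that same number.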

One concern about switching to a TCP-based protocol is that network outages may result in TCP sockets hanging on read or write operations. We need to ensure that, if a connection to one member is blocked, we still send messages to the other members. Each destination will have its own queue of messages to be sent, and adding to one queue should never block.
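
The per-destination queues can be sketched with one unbounded queue and one sending thread per destination. A write that hangs on one socket then stalls only that destination's thread, and enqueueing never blocks the caller. `PerDestinationQueues`, the string destinations, the `transmit` callback, and the thread-per-destination choice are assumptions for illustration. With Netty, the same isolation would more likely come from its non-blocking writes, and a real implementation would need to bound memory for a destination that stops draining.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.BiConsumer;

class PerDestinationQueues {
  private final Map<String, LinkedBlockingQueue<String>> queues = new ConcurrentHashMap<>();
  private final BiConsumer<String, String> transmit; // may block, e.g. on a hung socket

  PerDestinationQueues(BiConsumer<String, String> transmit) {
    this.transmit = transmit;
  }

  /** Never blocks: the queue is unbounded and each destination is drained by its own thread. */
  void enqueue(String destination, String message) {
    queues.computeIfAbsent(destination, this::startSender).offer(message);
  }

  private LinkedBlockingQueue<String> startSender(String destination) {
    LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
    Thread sender = new Thread(() -> {
      try {
        while (true) {
          transmit.accept(destination, queue.take()); // blocks only this destination's thread
        }
      } catch (InterruptedException e) {
        // Sender stopped, e.g. because the destination left the view.
      }
    }, "sender-" + destination);
    sender.setDaemon(true);
    sender.start();
    return queue;
  }
}
```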

The Messenger is also used by membership to check for a quorum of members after a network partition event, so we need to retain the ability to send a ping message to all members and check for responses within a certain time window.
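
The quorum check can be sketched as pinging every member concurrently and counting the replies that arrive within a deadline. The `ping` function, the member names, `QuorumCheck`, and the simple majority-of-members rule are assumptions for this sketch. The proposal does not describe how Geode actually computes a quorum.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

class QuorumCheck {
  /**
   * Pings every other member at once and waits about timeoutMillis in total.
   * Returns true if a majority of the cluster (counting this member) responded.
   */
  static boolean hasQuorum(List<String> otherMembers,
                           Function<String, CompletableFuture<Boolean>> ping,
                           long timeoutMillis) {
    List<CompletableFuture<Boolean>> replies = otherMembers.stream()
        .map(member -> ping.apply(member)
            .exceptionally(failure -> false) // a failed ping counts as no response
            .completeOnTimeout(false, timeoutMillis, TimeUnit.MILLISECONDS))
        .collect(Collectors.toList());
    CompletableFuture.allOf(replies.toArray(new CompletableFuture[0])).join();
    long responders = 1 + replies.stream().filter(reply -> reply.join()).count(); // 1 = this member
    int clusterSize = otherMembers.size() + 1;
    return responders * 2 > clusterSize;
  }
}
```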

Changes and Additions to Public Interfaces

Because we are eventually getting rid of UDP messaging, the following UDP-related gemfire properties will be deprecated:

  • udp-fragment-size
  • udp-recv-buffer-size
  • udp-send-buffer-size
  • security-udp-dhalgo
  • mcast-address
  • mcast-ttl
  • mcast-flow-control
  • mcast-port
  • mcast-recv-buffer-size
  • mcast-send-buffer-size
  • disable-tcp

Performance Impact

This proposal is only targeted at replacing the UDP messages used for membership events, which are fairly low traffic. Region operations like puts and gets go over separate TCP connections that are not going to be changed as part of this proposal. Therefore we don't anticipate any performance impact for region operations.

In the future we could consider switching all peer-to-peer messages to flow through this new TCP messaging system. We would only do that if the new system has similar or better performance than the old system.

Alternatives

Use DTLS


One option to address concerns about using a custom encryption protocol would be to continue using JGroups, but remove our old security-udp-dhalgo property in favor of using DTLS. However, this approach is complicated by the fact that the JDK does not have DTLS support built in to Java 8, which would require us to backport and maintain an implementation. In addition, this would not fix the problem of being tied to an outdated JGroups stack that cannot be upgraded.

Use the existing P2P TCP socket code

On the face of it, using our existing TCP sockets for membership messages as well seems attractive. However, this code as written depends on the membership system to bootstrap itself. For example, it will not even send messages until the member has completely joined and some startup messages are exchanged. It would take some refactoring to be able to send membership messages over this system in order to bootstrap membership. The existing P2P socket code is also fairly old, and we feel a better path forward is to move towards a new system based on Netty that is less entangled with the rest of the system.

FAQ

Errata