...

Geode uses UDP messaging through JGroups for its peer-to-peer (P2P) messaging related to membership. Geode does not actually use the JGroups group membership system, just its reliable UDP messaging. Using UDP messaging through JGroups has a couple of issues:

  • JGroups does not have a UDP encryption capability that works for our use case, so we implemented our own non-standard encryption and key exchange system on top of JGroups UDP messaging in order to ensure that all P2P messages are encrypted. See Secure UDP Communication in Geode. This adds complexity to the configuration by requiring users to set a separate UDP encryption property, and it adds the risk of security and functional issues in our own implementation. We have recently been discussing deprecating this property on the mailing list because of its functional issues - see this thread.
  • JGroups does not support rolling upgrades, which makes it more difficult to upgrade JGroups.

We are currently only using JGroups to provide reliable UDP communication. We would like to replace the UDP messaging for membership with a TCP-based messaging system. This will give us reliable messaging, allow us to use standard encryption protocols for all peer-to-peer messaging, and remove our dependence on JGroups in the long term.

Goals

  • Set us up for removing JGroups as a dependency in future versions by moving to a protocol that does not require JGroups
  • Support rolling upgrades from the old JGroups protocol to the new TCP-based protocol
  • Use the existing SSL settings to control how membership messages are encrypted
  • Be as reliable in the face of networking failures as the previous protocol

...

All messaging related to membership is handled by the JGroupsMessenger class, which implements the Messenger interface. We will create a new implementation of Messenger that uses TCP sockets rather than JGroups and UDP sockets.

The key methods on the Messenger interface are shown below:

Code Block
/**
 * Start the server and start listening for messages.
 */
void start();

/**
 * returns the endpoint ID for this member
 */
InternalDistributedMember getMemberID();

/**
 * sends an asynchronous message. Returns destinations that did not receive the message due to no
 * longer being in the view
 */
Set<InternalDistributedMember> send(DistributionMessage m);

/**
 * adds a handler for the given class/interface of messages
 */
void addHandler(Class c, MessageHandler h);

/**
 * called when a new view is installed by Membership
 */
void installView(NetView v);

The start() method of the messenger is responsible for starting a server that listens on a socket. The getMemberID() method returns an InternalDistributedMember. Currently the InternalDistributedMember contains the host and port that the Messenger is listening on in its getHost() and getPort() methods.

The send() method takes a DistributionMessage. DistributionMessage has a getRecipients() method, which returns an array of InternalDistributedMember objects. The messenger can send messages to those recipients because they have the host and port information.

The installView() method lets the messenger determine which members are no longer in the view, so that it can stop trying to transmit messages to them.

Discovery of the view happens outside the Messenger, in GMSJoinLeave. We won't have to modify that functionality. The new messenger just needs to know how to start a TCP server, generate an InternalDistributedMember that contains the contact information for that server, and use the InternalDistributedMember objects of other members to send messages to them.
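For orientation, a minimal usage sketch of this interface is shown below. HeartbeatMessage is used only as an example of a membership message type, and the processMessage callback name on MessageHandler is assumed.

Code Block
// Illustrative only: register a handler for one kind of membership message,
// then send a message to another member.
messenger.addHandler(HeartbeatMessage.class, new MessageHandler() {
  @Override
  public void processMessage(DistributionMessage message) {
    // react to the incoming heartbeat; must not block the dispatch thread
  }
});

HeartbeatMessage heartbeat = new HeartbeatMessage();
heartbeat.setRecipient(otherMember);   // otherMember is an InternalDistributedMember
Set<InternalDistributedMember> notSent = messenger.send(heartbeat);
// notSent contains recipients that are no longer in the view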

Implement a TCP Server Using Netty

Our proposal for the new Messenger is to implement a TCP server and client using Netty. We will implement a Netty ChannelInboundHandlerAdapter that deserializes Geode messages and passes them to handlers registered with addHandler().

The Netty server and client will use the existing cluster SSL configuration, so if cluster SSL is enabled no additional properties will be required. See https://geode.apache.org/docs/guide/latest/managing/security/implementing_ssl.html for information on the relevant properties.
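As a rough sketch of what the receiving side could look like: the class below is illustrative only, NettyMessengerServer and GeodeMessageDecoder are hypothetical names, the processMessage callback on MessageHandler is assumed, and the SslContext would be built from the existing cluster SSL properties.

Code Block
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.ssl.SslContext;

// Hypothetical receiving side of a Netty-based messenger. DistributionMessage and
// MessageHandler are the existing Geode membership types (imports omitted).
public class NettyMessengerServer {

  private final Map<Class<?>, MessageHandler> handlers = new ConcurrentHashMap<>();

  public void addHandler(Class<?> messageClass, MessageHandler handler) {
    handlers.put(messageClass, handler);
  }

  /** Starts the server on an ephemeral port and returns the port it is listening on. */
  public int start(SslContext sslContext) throws InterruptedException {
    EventLoopGroup boss = new NioEventLoopGroup(1);
    EventLoopGroup workers = new NioEventLoopGroup();
    ServerBootstrap bootstrap = new ServerBootstrap()
        .group(boss, workers)
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<SocketChannel>() {
          @Override
          protected void initChannel(SocketChannel ch) {
            if (sslContext != null) {
              // sslContext is built from the existing cluster SSL properties
              ch.pipeline().addLast(sslContext.newHandler(ch.alloc()));
            }
            ch.pipeline()
                // length-prefixed framing so each frame is one serialized message
                .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
                // GeodeMessageDecoder is hypothetical: a ByteToMessageDecoder that
                // deserializes a frame into a DistributionMessage
                .addLast(new GeodeMessageDecoder())
                .addLast(new SimpleChannelInboundHandler<DistributionMessage>() {
                  @Override
                  protected void channelRead0(ChannelHandlerContext ctx, DistributionMessage msg) {
                    // Runs on a Netty event loop thread, so handlers must not block.
                    // Exact-class lookup is a simplification; the real registry would
                    // also match interfaces and superclasses.
                    MessageHandler handler = handlers.get(msg.getClass());
                    if (handler != null) {
                      handler.processMessage(msg);
                    }
                  }
                });
          }
        });
    Channel serverChannel = bootstrap.bind(0).sync().channel();
    return ((InetSocketAddress) serverChannel.localAddress()).getPort();
  }
}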

The host and port of this new TCP server socket need to be shared with other members. Currently, we distribute the jgroups UDP server port as part of the InternalDistributedMember. Those member ids are sent as part of view messages, which allow all members to discover the listening port of other members (see GMSJoinLeave message sequence diagrams). We propose adding a new port field to the InternalDistributedMember to keep track of this new port. (See the Rolling Upgrade section, below.)
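As a rough sketch, the addition might look like this; the field and accessor names are hypothetical, and the real change also needs versioned serialization so that older members can ignore the new field.

Code Block
// Hypothetical addition to the member identifier data. Not the final field name.
private int membershipTcpPort;   // port of the new TCP-based membership server

/** Returns the port of this member's TCP-based membership server. */
public int getMembershipTcpPort() {
  return membershipTcpPort;
}

public void setMembershipTcpPort(int port) {
  this.membershipTcpPort = port;
}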

The current Messenger interface is somewhat tied to Geode-specific group messaging. It has the concept of a view, and methods related to returning a QuorumChecker and performing a state flush. In order to separate group membership from the messaging system, we will create a new module and a new interface for the messenger that is strictly focused on point-to-point messaging of Geode objects: UnicastMessenger (a sketch follows). This should also make it easier to swap out different messaging implementations if we want to experiment with them.
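A first cut of that interface might look something like the sketch below; the names mirror the class diagram further down, while the Address type and the stop() method are assumptions about details that are still open.

Code Block
/**
 * Sketch of a point-to-point messenger that knows nothing about views or
 * group membership. Address is a placeholder for whatever endpoint
 * identifier (host/port) we settle on.
 */
public interface UnicastMessenger {

  /** Starts the server and returns the address it is listening on. */
  Address start();

  /** Sends a message to a single destination. */
  void send(Address destination, Object message);

  /** Registers a handler for the given class/interface of messages. */
  void addHandler(Class<?> messageClass, MessageHandler handler);

  /** Stops the server and closes any open connections. */
  void stop();
}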

In order to support rolling upgrades, we will need to continue running the old Messenger implementation that uses JGroups until we drop support for old versions. Therefore, we will also need a BackwardsCompatibleMessenger that wraps the new Messenger and the JGroupsMessenger. The class diagram will look something like this (a delegation sketch follows the diagram):

PlantUML
@startuml
interface Messenger {
  "Existing interface, has group messaging capabilities"
  + installView
  + start
  + send(MemberID, Message)
  + addHandler(Class, MessageHandler)
}
interface UnicastMessenger {
  "Point to point messenger"
  + start(): Address
  + send(Address, Object message)
  + addHandler(Class, MessageHandler)
}
Messenger <|-- MessengerImpl
Messenger <|-- JGroupsMessenger
Messenger <|-- BackwardsCompatibleMessenger
BackwardsCompatibleMessenger *-- JGroupsMessenger
BackwardsCompatibleMessenger *-- MessengerImpl
MessengerImpl *-- UnicastMessenger
UnicastMessenger <|-- NettyUnicastMessenger
@enduml
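
To make the wrapping concrete, here is a minimal sketch of how the send path of BackwardsCompatibleMessenger could route between the two transports. The supportsTcpMessaging() version check is an assumption; the real criterion for choosing a transport per destination is still to be decided.

Code Block
// Sketch only: route each send through the old or the new transport.
public class BackwardsCompatibleMessenger {

  private final Messenger jgroupsMessenger;  // existing JGroupsMessenger (UDP)
  private final Messenger tcpMessenger;      // new Netty-based MessengerImpl (TCP)

  public BackwardsCompatibleMessenger(Messenger jgroupsMessenger, Messenger tcpMessenger) {
    this.jgroupsMessenger = jgroupsMessenger;
    this.tcpMessenger = tcpMessenger;
  }

  public Set<InternalDistributedMember> send(DistributionMessage m) {
    // If any recipient is running a version that only speaks the JGroups/UDP
    // protocol, fall back to the old transport for this message.
    for (InternalDistributedMember recipient : m.getRecipients()) {
      if (!supportsTcpMessaging(recipient)) {
        return jgroupsMessenger.send(m);
      }
    }
    return tcpMessenger.send(m);
  }

  private boolean supportsTcpMessaging(InternalDistributedMember member) {
    // Hypothetical: compare the member's version with the first release that
    // ships the TCP-based messenger.
    return false;
  }
}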


The NettyUnicastMessenger will maintain one connection to each peer. When sending a message, the NettyUnicastMessenger will create a connection to any destination for which no connection exists yet. Once a connection is established, it will remain open until the messenger is told to shut it down.
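A minimal sketch of that connection management is shown below, assuming a Netty client Bootstrap (clientBootstrap) and the hypothetical getMembershipTcpPort() accessor mentioned earlier.

Code Block
// Sketch: one cached connection per peer, created lazily on first send.
private final ConcurrentMap<InternalDistributedMember, ChannelFuture> connections =
    new ConcurrentHashMap<>();

private Channel getOrConnect(InternalDistributedMember member) throws InterruptedException {
  ChannelFuture future = connections.computeIfAbsent(member,
      m -> clientBootstrap.connect(m.getHost(), m.getMembershipTcpPort()));
  return future.sync().channel();   // wait until the connection is established
}

private void closeConnection(InternalDistributedMember member) {
  // called when the messenger is shut down (or a connection is known to be dead)
  ChannelFuture future = connections.remove(member);
  if (future != null) {
    future.channel().close();
  }
}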

Messages received by the NettyUnicastMessenger will be dispatched on Netty event loop threads. For this reason, it is important that message processing not block, or it will prevent other messages from being received. The old JGroupsMessenger dispatched messages using a single JGroups dispatcher thread.


Handling TCP connection failures

...