RFC Title

To be Reviewed By: Thursday 28th May 2020

Authors: Alberto Bustamante Reyes ( alberto.bustamante.reyes@est.tech )

Status: in development

Superseded by: N/A

Related: N/A

Problem

This RFC aims to solve a problem observed in serial gateway senders when the receivers they connect to are all configured with the same host and port.

The reason for such a setup (already discussed in the GEODE-7565 RFC) is deploying a Geode cluster on Kubernetes, where all gateway receivers are reachable from the outside world on the same VIP and port. Other configurations (a different hostname and/or port for each gateway receiver) are costly in cloud-native environments from an operations, maintenance, and resource perspective, and they also limit some important use cases, such as scaling.

Currently, it is possible to configure gateway receivers this way, but some problems arising from this configuration must be solved before Geode can state that it supports it.

In this scenario, when a serial gateway sender uses more than one dispatcher thread, it is not possible to ensure that all of its threads connect to the same receiver. Because only the host and port are used to establish each connection, any connection may be routed to any of the receivers sharing that host and port.


Anti-Goals

This RFC does not aim to solve any issue arising from this configuration other than the one described above.

Solution

The SerialGatewaySenderImpl class contains an AbstractGatewaySenderEventProcessor object called eventProcessor. When more than one dispatcher thread is used, eventProcessor is a RemoteConcurrentSerialGatewaySenderEventProcessor (a subclass of ConcurrentSerialGatewaySenderEventProcessor). The eventProcessor object holds a list of SerialGatewaySenderEventProcessor objects, one per dispatcher thread.

The proposed solution changes how ConcurrentSerialGatewaySenderEventProcessor initializes its SerialGatewaySenderEventProcessor processors (each one running in a separate thread). The number of processors is set by the dispatcher-threads attribute when the serial gateway sender is created. Currently, all of them are started in parallel. With this change, one thread will be started first. Once it is connected, the member id of its receiver is known and can be passed to the rest of the dispatcher threads, which are then started in parallel. When getting a connection to the receiver, each of them will check whether the receiver it is connected to has the expected member id. If not, it can retry the connection until it is connected to the same receiver as the first dispatcher thread.
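The initialization order described above can be sketched in plain Java. This is a hypothetical, self-contained model rather than Geode code: SequencedDispatcherStartup, startDispatchers, connectTo, and the bounded retry count are illustrative assumptions, and each "connection" is modelled as the member id returned by a simulated load balancer that routes every attempt to a random receiver behind the shared host and port.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;

/**
 * Hypothetical model of the proposed startup order (not Geode code). Each call to the
 * connector simulates opening a connection to the shared host:port and returns the
 * member id of whichever receiver the load balancer routed it to.
 */
final class SequencedDispatcherStartup {

  /** Reconnects until the receiver's member id matches the expected one, up to maxRetries. */
  static String connectTo(Supplier<String> connector, String expectedMemberId, int maxRetries) {
    for (int attempt = 0; attempt <= maxRetries; attempt++) {
      String memberId = connector.get();
      if (memberId.equals(expectedMemberId)) {
        return memberId;
      }
      // Wrong receiver: a real dispatcher would close this connection before retrying.
    }
    throw new IllegalStateException("could not reach receiver " + expectedMemberId);
  }

  /** Connects dispatcher 0 first, then starts dispatchers 1..n-1 in parallel. */
  static List<String> startDispatchers(
      int dispatcherThreads, Supplier<String> connector, int maxRetries) {
    // Step 1: the first dispatcher accepts whichever receiver it reaches.
    String expected = connector.get();
    List<String> connectedTo = new ArrayList<>();
    connectedTo.add(expected);

    // Step 2: the remaining dispatchers start in parallel and verify the member id.
    ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, dispatcherThreads - 1));
    try {
      List<Future<String>> results = new ArrayList<>();
      for (int i = 1; i < dispatcherThreads; i++) {
        results.add(pool.submit(() -> connectTo(connector, expected, maxRetries)));
      }
      for (Future<String> result : results) {
        connectedTo.add(result.get());
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    } finally {
      pool.shutdown();
    }
    return connectedTo;
  }

  public static void main(String[] args) {
    List<String> receivers = List.of("receiver-a", "receiver-b", "receiver-c");
    Random random = new Random(42); // java.util.Random is safe to share across threads
    // Stands in for a Kubernetes service that routes each new connection to a random receiver.
    Supplier<String> sharedHostAndPort = () -> receivers.get(random.nextInt(receivers.size()));
    System.out.println(startDispatchers(5, sharedHostAndPort, 100));
  }
}
```

In the real sender, a rejected connection would also have to be closed and returned to the pool before retrying; the sketch leaves that out, and capping the retries is an assumption of the sketch rather than part of the proposal.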

As this change in thread initialization is only needed in a very specific use case, a new boolean option, --receivers-sharing-ip-and-port, will be added to the create gateway-sender command to enable it, with a default value of false. If it is not specified or is set to false, the dispatcher threads will be initialized in parallel, as they are today.

Changes and Additions to Public Interfaces

A new method will be added to GatewaySenderFactory to set the value of the new parameter receivers-sharing-ip-and-port.
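A minimal sketch of how the new setting could sit next to existing fluent setters such as setDispatcherThreads, assuming the method is named setReceiversSharingIpAndPort after the gfsh option (the RFC does not give the method name). GatewaySenderFactorySketch and StartupMode are hypothetical stand-ins, not Geode types; the sketch only shows the default of false and which initialization path each combination of settings would select.

```java
/**
 * Hypothetical stand-in for GatewaySenderFactory, modelling only the settings relevant
 * to this RFC. setReceiversSharingIpAndPort is the proposed method; its name is assumed.
 */
final class GatewaySenderFactorySketch {
  enum StartupMode { ALL_PARALLEL, FIRST_THEN_PARALLEL }

  private int dispatcherThreads = 5; // assumed default for this sketch
  private boolean receiversSharingIpAndPort = false; // proposed default

  GatewaySenderFactorySketch setDispatcherThreads(int threads) {
    if (threads < 1) {
      throw new IllegalArgumentException("dispatcher-threads must be at least 1");
    }
    this.dispatcherThreads = threads;
    return this;
  }

  GatewaySenderFactorySketch setReceiversSharingIpAndPort(boolean sharing) {
    this.receiversSharingIpAndPort = sharing;
    return this;
  }

  /** Which initialization path a serial sender built from these settings would take. */
  StartupMode startupMode() {
    // The sequenced start only changes anything when there is more than one dispatcher.
    return receiversSharingIpAndPort && dispatcherThreads > 1
        ? StartupMode.FIRST_THEN_PARALLEL
        : StartupMode.ALL_PARALLEL;
  }
}
```

Falling back to the parallel start when there is only one dispatcher thread is a choice made in this sketch; the RFC only specifies that the sequenced start is used when the option is set to true.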

Performance Impact

As the new initialization mode is optional and disabled by default, it will have no impact in most cases. Only in the scenario this change targets (several receivers sharing the same host and port) could serial gateway sender initialization take longer, because the first thread must connect before the others are started, and the remaining threads may need several retries before they connect to the right receiver.

Backwards Compatibility and Upgrade Path

No impacts have been identified.

Prior Art

GEODE-7565 (RFC link) already introduced changes to allow several gateway receivers to use the same host and port. In that case, the changes allowed locators to know that several servers share that configuration, so that if one of them goes down, replication is not considered down.

The change proposed in this RFC allows more than one dispatcher thread to be used in that setup, increasing performance.


FAQ

N/A

Errata

N/A


8 Comments

  1. No comments received after several days in "under discussion" state.

    I have moved the RFC to "in development" and created a Jira ticket for the code ( GEODE-8202 )

  2. Thanks for contributing this. I have a few questions regarding it.

    All serial sender dispatcher threads currently connect to the same remote ServerLocation (based on the first dispatcher thread's connection). This is controlled by a synchronized block in GatewaySenderEventRemoteDispatcher.initializeConnection.

    The first GatewaySenderEventRemoteDispatcher thread in the primary sender acquires a connection to a random receiver and saves the ServerLocation in the AbstractGatewaySender. Then, each other thread uses that same ServerLocation.

    Event Processor for GatewaySender_ny.4: GatewaySenderEventRemoteDispatcher.initializeConnection about to get server
    Event Processor for GatewaySender_ny.4: GatewaySenderEventRemoteDispatcher.initializeConnection got server=null
    Event Processor for GatewaySender_ny.4: GatewaySenderEventRemoteDispatcher.initializeConnection about to acquire connection
    Event Processor for GatewaySender_ny.4: GatewaySenderEventRemoteDispatcher.initializeConnection acquired connection=Pooled Connection to 127.0.0.1:5239: Connection[127.0.0.1:5239]@409550082
    Event Processor for GatewaySender_ny.4: GatewaySenderEventRemoteDispatcher.initializeConnection set sender server=127.0.0.1:5239

    Event Processor for GatewaySender_ny.0: GatewaySenderEventRemoteDispatcher.initializeConnection about to get server
    Event Processor for GatewaySender_ny.0: GatewaySenderEventRemoteDispatcher.initializeConnection got server=127.0.0.1:5239
    Event Processor for GatewaySender_ny.0: GatewaySenderEventRemoteDispatcher.initializeConnection about to acquire connection to server=127.0.0.1:5239
    Event Processor for GatewaySender_ny.0: GatewaySenderEventRemoteDispatcher.initializeConnection acquired connection=Pooled Connection to 127.0.0.1:5239: Connection[127.0.0.1:5239]@1723700142

    The ServerLocation is just a wrapper around a host and port. It is set in the Endpoint as soon as a connection is made, as this stack trace shows:

    java.lang.Exception: Stack trace
    at java.lang.Thread.dumpStack(Thread.java:1333)
    at org.apache.geode.cache.client.internal.Endpoint.<init>(Endpoint.java:43)
    at org.apache.geode.cache.client.internal.EndpointManagerImpl.referenceEndpoint(EndpointManagerImpl.java:67)
    at org.apache.geode.cache.client.internal.ConnectionImpl.connect(ConnectionImpl.java:113)
    at org.apache.geode.cache.client.internal.ConnectionConnector.connectClientToServer(ConnectionConnector.java:75)
    at org.apache.geode.cache.client.internal.ConnectionFactoryImpl.createClientToServerConnection(ConnectionFactoryImpl.java:118)
    at org.apache.geode.cache.client.internal.ConnectionFactoryImpl.createClientToServerConnection(ConnectionFactoryImpl.java:223)
    at org.apache.geode.cache.client.internal.pooling.ConnectionManagerImpl.createPooledConnection(ConnectionManagerImpl.java:196)
    at org.apache.geode.cache.client.internal.pooling.ConnectionManagerImpl.createPooledConnection(ConnectionManagerImpl.java:190)
    at org.apache.geode.cache.client.internal.pooling.ConnectionManagerImpl.borrowConnection(ConnectionManagerImpl.java:273)
    at org.apache.geode.cache.client.internal.PoolImpl.acquireConnection(PoolImpl.java:931)
    at org.apache.geode.internal.cache.wan.GatewaySenderEventRemoteDispatcher.initializeConnection(GatewaySenderEventRemoteDispatcher.java:398)
    at org.apache.geode.internal.cache.wan.GatewaySenderEventRemoteDispatcher.<init>(GatewaySenderEventRemoteDispatcher.java:89)
    at org.apache.geode.internal.cache.wan.serial.RemoteSerialGatewaySenderEventProcessor.initializeEventDispatcher(RemoteSerialGatewaySenderEventProcessor.java:44)
    at org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor.setRunningStatus(AbstractGatewaySenderEventProcessor.java:1141)
    at org.apache.geode.internal.cache.wan.serial.SerialGatewaySenderEventProcessor.run(SerialGatewaySenderEventProcessor.java:192)

    In this scenario, are the host and port for every ServerLocation the same for all remote receivers?

    If so, what will you do to change that since that is what is stored in the AbstractGatewaySender? Can you use something in the actual Socket since that is available in the stack?

    If not, why does the first thread have to start and connect before the others? Even though they start in parallel, the synchronization ensures that all threads connect to the same remote receiver.
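    The "first dispatcher wins" behavior described in this comment can be modelled with a small sketch. Location and SenderLocationHolder are simplified, hypothetical stand-ins for ServerLocation and the field kept in AbstractGatewaySender, not the actual Geode code. The lock serializes the threads and makes them all reuse the first recorded location, but when every receiver advertises the same host and port, that location cannot tell the receivers apart.

```java
import java.util.Objects;

/** Simplified, hypothetical stand-in for ServerLocation: a host and a port, nothing more. */
final class Location {
  final String host;
  final int port;

  Location(String host, int port) {
    this.host = host;
    this.port = port;
  }

  @Override
  public boolean equals(Object o) {
    return o instanceof Location && ((Location) o).port == port && ((Location) o).host.equals(host);
  }

  @Override
  public int hashCode() {
    return Objects.hash(host, port);
  }

  @Override
  public String toString() {
    return host + ":" + port;
  }
}

/** Hypothetical model of the sender-level field that the first dispatcher thread sets. */
final class SenderLocationHolder {
  private final Object connectionLock = new Object();
  private Location serverLocation; // guarded by connectionLock

  /** The first caller's candidate is recorded; every later caller gets that location back. */
  Location locationFor(Location candidate) {
    synchronized (connectionLock) {
      if (serverLocation == null) {
        serverLocation = candidate;
      }
      return serverLocation;
    }
  }
}
```

    If every receiver uses the same --hostname-for-senders address, a single Location such as 10.101.212.191:32000 describes all of them, so reusing it pins the address but not the receiver the load balancer picks.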

    1. Hi Barry Oglesby

      Thanks for your comment, and sorry for the late reply; the wiki did not notify me about it.

      In this scenario, are the host and port for every ServerLocation the same for all remote receivers?

      Yes, all receivers use the same value for the --hostname-for-senders parameter.

      If so, what will you do to change that since that is what is stored in the AbstractGatewaySender? Can you use something in the actual Socket since that is available in the stack?

      If not, why does the first thread have to start and connect before the others? Even though they start in parallel, the synchronization ensures that all threads connect to the same remote receiver.

      As all receivers share the same host and port, when a sender connects to that host and port you cannot know which receiver you will reach, because the routing is done outside the Geode cluster (by Kubernetes in our case). This is why I propose connecting the first dispatcher thread before the others: we have different receivers behind the same host and port, and it does not matter which one we connect to, but we have to ensure that all threads connect to the same one. So we connect one thread and get the member id of its receiver, and the other threads can then tell whether they are connected to the same receiver as the first thread.

      In the current implementation, all threads connect to the same remote receiver because a host and port identify a unique server, so all the threads can connect in parallel. But in this scenario, uniquely identifying a server requires the host, port, and member id.
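      To make the identity point concrete, here is a hypothetical ReceiverKey that combines host, port, and member id. It is not a Geode type, and the member id strings are invented placeholders rather than Geode's real member id format. Two receivers behind the same Kubernetes service share an endpoint but have different keys.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

/** Hypothetical identity for a gateway receiver: host and port plus member id. */
final class ReceiverKey {
  final String host;
  final int port;
  final String memberId;

  ReceiverKey(String host, int port, String memberId) {
    this.host = host;
    this.port = port;
    this.memberId = memberId;
  }

  /** True when both keys point at the same host:port, which is all a ServerLocation compares. */
  boolean sameEndpoint(ReceiverKey other) {
    return host.equals(other.host) && port == other.port;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof ReceiverKey)) {
      return false;
    }
    ReceiverKey k = (ReceiverKey) o;
    return port == k.port && host.equals(k.host) && memberId.equals(k.memberId);
  }

  @Override
  public int hashCode() {
    return Objects.hash(host, port, memberId);
  }

  public static void main(String[] args) {
    ReceiverKey a = new ReceiverKey("10.101.212.191", 32000, "receiver-a");
    ReceiverKey b = new ReceiverKey("10.101.212.191", 32000, "receiver-b");
    Set<ReceiverKey> distinct = new HashSet<>(Set.of(a, b));
    // prints "true false 2"
    System.out.println(a.sameEndpoint(b) + " " + a.equals(b) + " " + distinct.size());
  }
}
```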

      1. I think the code is already doing part of what you're talking about. The current code already prevents the sender dispatcher threads from connecting to different servers using the synchronization.

        Here are some thread stacks showing that behavior.

        One Event Processor will have the synchronization and be getting the connection:

        "Event Processor for GatewaySender_ny.3" #65 daemon prio=5 os_prio=31 tid=0x00007f8e949ba000 nid=0xbc03 waiting on condition [0x000070000e05a000]
        java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at org.apache.geode.internal.cache.wan.GatewaySenderEventRemoteDispatcher.initializeConnection(GatewaySenderEventRemoteDispatcher.java:398)
        - locked <0x00000007a3433bf8> (a java.lang.Object)
        at org.apache.geode.internal.cache.wan.GatewaySenderEventRemoteDispatcher.<init>(GatewaySenderEventRemoteDispatcher.java:89)
        at org.apache.geode.internal.cache.wan.serial.RemoteSerialGatewaySenderEventProcessor.initializeEventDispatcher(RemoteSerialGatewaySenderEventProcessor.java:44)
        at org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor.setRunningStatus(AbstractGatewaySenderEventProcessor.java:1141)
        at org.apache.geode.internal.cache.wan.serial.SerialGatewaySenderEventProcessor.run(SerialGatewaySenderEventProcessor.java:192)

        The remaining Event Processors will be BLOCKED waiting for the synchronization:

        "Event Processor for GatewaySender_ny.4" #67 daemon prio=5 os_prio=31 tid=0x00007f8e94a2a800 nid=0x14303 waiting for monitor entry [0x000070000e15d000]
        java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.geode.internal.cache.wan.GatewaySenderEventRemoteDispatcher.initializeConnection(GatewaySenderEventRemoteDispatcher.java:383)
        - waiting to lock <0x00000007a3433bf8> (a java.lang.Object)
        at org.apache.geode.internal.cache.wan.GatewaySenderEventRemoteDispatcher.<init>(GatewaySenderEventRemoteDispatcher.java:89)
        at org.apache.geode.internal.cache.wan.serial.RemoteSerialGatewaySenderEventProcessor.initializeEventDispatcher(RemoteSerialGatewaySenderEventProcessor.java:44)
        at org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor.setRunningStatus(AbstractGatewaySenderEventProcessor.java:1141)
        at org.apache.geode.internal.cache.wan.serial.SerialGatewaySenderEventProcessor.run(SerialGatewaySenderEventProcessor.java:192)

        "Event Processor for GatewaySender_ny.2" #63 daemon prio=5 os_prio=31 tid=0x00007f8e95a9d800 nid=0xba03 waiting for monitor entry [0x000070000df57000]
        java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.geode.internal.cache.wan.GatewaySenderEventRemoteDispatcher.initializeConnection(GatewaySenderEventRemoteDispatcher.java:383)
        - waiting to lock <0x00000007a3433bf8> (a java.lang.Object)
        at org.apache.geode.internal.cache.wan.GatewaySenderEventRemoteDispatcher.<init>(GatewaySenderEventRemoteDispatcher.java:89)
        at org.apache.geode.internal.cache.wan.serial.RemoteSerialGatewaySenderEventProcessor.initializeEventDispatcher(RemoteSerialGatewaySenderEventProcessor.java:44)
        at org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor.setRunningStatus(AbstractGatewaySenderEventProcessor.java:1141)
        at org.apache.geode.internal.cache.wan.serial.SerialGatewaySenderEventProcessor.run(SerialGatewaySenderEventProcessor.java:192)

        "Event Processor for GatewaySender_ny.1" #61 daemon prio=5 os_prio=31 tid=0x00007f8e8d19a000 nid=0x14403 waiting for monitor entry [0x000070000de54000]
        java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.geode.internal.cache.wan.GatewaySenderEventRemoteDispatcher.initializeConnection(GatewaySenderEventRemoteDispatcher.java:383)
        - waiting to lock <0x00000007a3433bf8> (a java.lang.Object)
        at org.apache.geode.internal.cache.wan.GatewaySenderEventRemoteDispatcher.<init>(GatewaySenderEventRemoteDispatcher.java:89)
        at org.apache.geode.internal.cache.wan.serial.RemoteSerialGatewaySenderEventProcessor.initializeEventDispatcher(RemoteSerialGatewaySenderEventProcessor.java:44)
        at org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor.setRunningStatus(AbstractGatewaySenderEventProcessor.java:1141)
        at org.apache.geode.internal.cache.wan.serial.SerialGatewaySenderEventProcessor.run(SerialGatewaySenderEventProcessor.java:192)

        "Event Processor for GatewaySender_ny.0" #57 daemon prio=5 os_prio=31 tid=0x00007f8e8d255800 nid=0x14503 waiting for monitor entry [0x000070000dd51000]
        java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.geode.internal.cache.wan.GatewaySenderEventRemoteDispatcher.initializeConnection(GatewaySenderEventRemoteDispatcher.java:383)
        - waiting to lock <0x00000007a3433bf8> (a java.lang.Object)
        at org.apache.geode.internal.cache.wan.GatewaySenderEventRemoteDispatcher.<init>(GatewaySenderEventRemoteDispatcher.java:89)
        at org.apache.geode.internal.cache.wan.serial.RemoteSerialGatewaySenderEventProcessor.initializeEventDispatcher(RemoteSerialGatewaySenderEventProcessor.java:44)
        at org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor.setRunningStatus(AbstractGatewaySenderEventProcessor.java:1141)
        at org.apache.geode.internal.cache.wan.serial.SerialGatewaySenderEventProcessor.run(SerialGatewaySenderEventProcessor.java:192)

        The Event Processor thread that gets the connection will set the ServerLocation in the Endpoint in the stack I posted before. In the current code, that ServerLocation is the generic host and port.

        I think the change that needs to be made is in the ConnectionImpl.connect method to create a ServerLocation with the actual host and port rather than the generic one passed into it.

        The ConnectionImpl.connect signature is:

        public ServerQueueStatus connect(EndpointManager endpointManager, ServerLocation location, ...

        The ServerLocation passed into this method is the generic one.

        The connect implementation does:

        theSocket = sc.forClient().connect(new HostAndPort(location.getHostName(), location.getPort()), ...
        ...
        endpoint = endpointManager.referenceEndpoint(location, this.status.getMemberId());

        I think theSocket will have the host and port of the actual server rather than the generic one. If so, rather than passing the input ServerLocation as the argument to referenceEndpoint, I think it will have to be the host / port of the actual socket.

        That will mean the Endpoint created in EndpointManagerImpl.referenceEndpoint will have the actual ServerLocation:

        endpoint = new Endpoint(this, ds, server, stats, memberId);

        And this call in GatewaySenderEventRemoteDispatcher.initializeConnection will get the actual host/port and store it in the AbstractGatewaySender serverLocation:

        if (sender.getServerLocation() == null) {
          sender.setServerLocation(con.getServer());
        }

        And this call in GatewaySenderEventRemoteDispatcher.initializeConnection will get the actual host/port from the AbstractGatewaySender:

        ServerLocation server = this.sender.getServerLocation();
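        The suggestion above relies on reading the peer address of the connected socket. This self-contained sketch (SocketPeerAddress is a hypothetical helper, not Geode code) shows the standard java.net calls involved, using a loopback ServerSocket so it needs no network. Note that getRemoteSocketAddress() reports the address the client dialed; when connections go through a Kubernetes Service, that is the service address rather than the receiver pod's, which matches what the later reply in this thread observed.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

/** Hypothetical helper: the host and port a connected client socket is talking to. */
final class SocketPeerAddress {

  static InetSocketAddress peerOf(Socket socket) {
    // For a connected TCP socket this is the remote address that was dialed.
    return (InetSocketAddress) socket.getRemoteSocketAddress();
  }

  public static void main(String[] args) throws IOException {
    InetAddress loopback = InetAddress.getLoopbackAddress();
    // Port 0 lets the OS pick a free port; a backlog of 1 is enough for one client.
    try (ServerSocket server = new ServerSocket(0, 1, loopback);
        Socket client = new Socket(loopback, server.getLocalPort());
        Socket accepted = server.accept()) {
      InetSocketAddress peer = peerOf(client);
      System.out.println(peer.getAddress().getHostAddress() + ":" + peer.getPort());
    }
  }
}
```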

        1. You are referring to "actual server" and "generic server", but I'm not sure what you mean.

          In the setup I'm describing, the IPs of the servers inside the Kubernetes cluster are not used, as they are not reachable from outside the cluster. There is only one host and port reachable from outside the cluster, which is used in the "--hostname-for-senders" option when receivers are started. That host and port is exposed by Kubernetes, and it is Kubernetes that load-balances incoming connections, which is why Geode cannot guarantee which server a connection will reach.




          1. By generic, I meant the one host and port reachable from outside the cluster. By actual, I meant the server that is actually connected to.

            1. OK, thanks Barrett. I have checked what you suggested about the socket, but the IP the socket reports is the service IP, not the server IP.

              I added a log when the sender is created:

              [info 2020/06/11 07:49:18.866 GMT <Event Processor for GatewaySender_sender-to-2.0> tid=0x3e] alb3rtobr - theSocket.getInetAddress()=receiver-site2-service.geode-cluster-2.svc.cluster.local/10.101.212.191

              As shown below, that is the IP of the receivers' service in the other cluster:

              $ kubectl --namespace=geode-cluster-2 get service receiver-site2-service
              NAME                    TYPE      CLUSTER-IP      EXTERNAL-IP  PORT(S)          AGE
              receiver-site2-service  NodePort  10.101.212.191  <none>       32000:32000/TCP  49m

  3. I would like to better understand the name of the property you propose, `--receivers-sharing-ip-and-port`, and the part of the explanation that says "Sets whether or not the receivers are sharing the same ip and port."
    If I understand the RFC correctly, the general context is an environment in which multiple receivers specify a duplicate ip:port combination (not exactly 'sharing'), and the purpose of the property is to say to gfsh, "When you create this sender, be sure that all of its dispatcher threads connect to the same receiver."
    Also, it sounds like this option would work (though it would be unnecessary) if there was only one receiver up and running.
    So I think this property needs a more precise name, such as `--enforce-all-threads-same-receiver`.