Highly Available Client Event Mechanism

One of the key features of Geode is reliable asynchronous event notification. The publish/subscribe system offers a data distribution service where new events are published into the system and routed to all interested subscribers in a reliable manner.

Clients can register interest using keys or continuous queries (CQs).

The product achieves reliability and high availability (HA) of subscription events using HARegionQueue.

HARegionQueue

HARegionQueue is an implementation of a queue using a Geode region as the underlying data structure.

When a client connects to the server with the subscription-enabled flag set, an HARegionQueue for that client proxy is created on the server side.
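The idea of a queue built on top of a region can be illustrated with a minimal sketch, assuming the region behaves like a map from a monotonically increasing position to an event. The class name RegionBackedQueueSketch and its methods are hypothetical and are not the Geode implementation; only the notion of a tail key comes from the class diagram on this page.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model, not Geode code: a FIFO queue whose only storage is a
// key/value map (standing in for a region), keyed by queue position.
final class RegionBackedQueueSketch<E> {
    private final Map<Long, E> region = new HashMap<>();
    private long headKey = 0; // position of the oldest event still queued
    private long tailKey = 0; // next position to hand out

    // Appends an event at the tail and returns the position it was stored under.
    synchronized long put(E event) {
        long position = tailKey++;
        region.put(position, event);
        return position;
    }

    // Returns the oldest queued event without removing it, or null if empty.
    synchronized E peek() {
        return region.get(headKey);
    }

    // Removes and returns the oldest queued event, or null if empty.
    synchronized E remove() {
        if (headKey == tailKey) {
            return null;
        }
        return region.remove(headKey++);
    }

    synchronized int size() {
        return region.size();
    }
}
```

Because every entry lives in the map, anything that can replicate or recover the map (in Geode's case, the region machinery) can also replicate or recover the queue.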


Class Diagrams


PlantUML
skinparam dpi 80

@startuml
hide empty members
title Server to Client Queues - Class Diagram
class CacheClientNotifier {
  Singleton for a Cache.
}
class CacheClientProxy {
  One for each client.
  Keeps track of the
  subscriptions for the client.
}
class MessageDispatcher {
  Thread that reads events from the queue.
}
class HARegionQueue {
  Holds the list of events to
  dispatch to a single client.
}
class HAContainerMap {
  Holds the actual values for events to dispatch.
  Used to store only a single copy of each value
  across all queues.
}
interface HAContainerMap
CacheClientNotifier "1   " o-- "*" CacheClientProxy
CacheClientProxy o-- MessageDispatcher
CacheClientNotifier o-- HAContainerMap
MessageDispatcher o-- HARegionQueue
HARegionQueue "*" --> "1" HAContainerMap
HARegionQueue o-- HARegion
@enduml



This next diagram focuses on cardinality:

  • between HARegionQueue and DACE (DispatchedAndCurrentEvents) objects (through the mis-named HARegionQueue.eventsMap)
  • between HARegion and the two kinds of entries it holds: <Position,Conflatable> and <ThreadIdentifier,SequenceID>

In both cases, the Java Maps are depicted as associations in the diagram (the Map objects are not shown explicitly). For instance, HARegionQueue.eventsMap is a Map; it is depicted as a 1-to-(0..1 per ThreadIdentifier) association from HARegionQueue to DispatchedAndCurrentEvents. The same convention applies to the associations from HARegion to Conflatable and from HARegion to SequenceID.

The diagram introduces two types not actually present in the Java code. Both are depicted as subtypes of Long: SequenceID to stand for an event's sequence id; and Position to stand for a position in a queue. In the source code these are both just Longs.
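To make the "0..1 per ThreadIdentifier" cardinality concrete, here is a minimal sketch of a map from a thread identifier to a per-thread bookkeeping object. All class names ending in Sketch are hypothetical; the fields mirror the ones named in the diagram, but the real Geode classes differ in detail.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for ThreadIdentifier: equality is by content, so two
// instances describing the same member and thread map to the same entry.
final class ThreadIdSketch {
    private final byte[] membershipId;
    private final long threadId;

    ThreadIdSketch(byte[] membershipId, long threadId) {
        this.membershipId = membershipId.clone();
        this.threadId = threadId;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof ThreadIdSketch)) {
            return false;
        }
        ThreadIdSketch other = (ThreadIdSketch) o;
        return threadId == other.threadId
            && Arrays.equals(membershipId, other.membershipId);
    }

    @Override
    public int hashCode() {
        return 31 * Arrays.hashCode(membershipId) + Long.hashCode(threadId);
    }
}

// Hypothetical stand-in for DispatchedAndCurrentEvents (DACE).
final class DaceSketch {
    long lastDispatchedSequenceId = -1L;          // a plain long, as in the source
    final Set<Long> positions = new TreeSet<>();  // queue positions still held
}

// At most one DACE per thread identifier, created on first use.
final class EventsMapSketch {
    private final Map<ThreadIdSketch, DaceSketch> eventsMap = new HashMap<>();

    DaceSketch recordPosition(ThreadIdSketch thread, long position) {
        DaceSketch dace = eventsMap.computeIfAbsent(thread, k -> new DaceSketch());
        dace.positions.add(position);
        return dace;
    }

    int threadCount() {
        return eventsMap.size();
    }
}
```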


PlantUML
@startuml

title HARegionQueue—Class Diagram\nHARegion and DACE Cardinalities

class SequenceID <<design class>>
Long <|-- SequenceID
class Position <<design class>>
Long <|-- Position

class HARegionQueue {
  Position tailKey
}

class DispatchedAndCurrentEvents {
 SequenceID lastDispatchedSequenceID
}

' these are together because I want to highlight the similarity (potential subtype relationship)
together {

  ' putting this first causes it to be laid out to the right of ThreadIdentifier, making the
  ' association (line) from Conflatable shorter
  class EventID {
    byte[]     membershipId
    long       threadId
    SequenceID sequenceId
    int        bucketId
  }

  class ThreadIdentifier {
    byte[]     membershipId
    long       threadId
  }
}

interface Conflatable

' these subtypes of Conflatable are grouped together
together {
  interface ClientMessage
  class HAEventWrapper
  class ConflatableObject
}

Conflatable <|-- HAEventWrapper

Conflatable <|-- ConflatableObject

Conflatable <|-- ClientMessage

interface ClientUpdateMessage

ClientMessage <|-- ClientUpdateMessage
ClientUpdateMessage <|-- ClientUpdateMessageImpl
ClientUpdateMessageImpl <|-- ClientInstantiatorMessage


ClientMessage <|-- ClientMarkerMessageImpl

Conflatable --> EventID

interface RegionQueue
RegionQueue <|-- HARegionQueue
HARegionQueue <|-- BlockingHARegionQueue
BlockingHARegionQueue <|-- DurableHARegionQueue
DistributedRegion <|-- HARegion

' while method signatures in HARegionQueue take Object, there's downcasting to Conflatable
HARegion "*" *-- "0..1 per Position" Conflatable

HARegion "*" *-- "0..1 per ThreadIdentifier" SequenceID

HARegionQueue *-- HARegion

HARegionQueue "1" *-- "0..1 per ThreadIdentifier" DispatchedAndCurrentEvents

DispatchedAndCurrentEvents "1" *-- "0..1 per Position" Position

' this hidden assoc changes layout such that cardinality text on SequenceID and Position isn't overlapping as much
ConflatableObject -[hidden]- HARegion

@enduml



Sequence Diagrams


PlantUML
skinparam dpi 80

title Put into Queue (for Partitioned Regions)
actor user
user -> "PR Primary" : put
"PR Primary" -> "PR Secondary" : UpdateMessage
"PR Primary" -> "Adjunct Receiver": PutMessage
note right
  The primary notifies all members that have
  clients with interest
end note
"PR Primary" -> CacheClientNotifier: notifyClients()
"Adjunct Receiver" -> CacheClientNotifier: notifyClients()
note right 
  All members with interest 
  call notifyClients locally
end note
"PR Secondary" -> CacheClientNotifier: notifyClients()
CacheClientNotifier -> CacheClientProxy: deliverMessage()
CacheClientProxy -> MessageDispatcher : enqueueMessage()
MessageDispatcher -> HARegionQueue: basicPut()
 



Adding events to HARegionQueue:

When an operation is performed on the cache, “LocalRegion.notifyBridgeClients()” is called to deliver events to the interested clients. The list of interested clients is obtained by calling “generateLocalFilterRouting()”, and events are queued through “CacheClientNotifier.notifyClients()”.

Based on the interest that the event satisfies for a client, a ClientMessage is created and added to the HARegionQueue of that client (“CacheClientProxy.enqueueMessage()”). Once the event is added to the HARegionQueue, the cache operation thread returns to the caller.

There are cases where the events are not simply added to the HARegionQueue. Specifically, there currently are two cases where the event is put into a temporary queue.

1. The server is providing an initial image to another member (see HARegionQueue.giiQueue)
2. A client is in the process of registering and its message dispatcher/queue are not fully created and initialized

After these operations are completed, the temporary queues are drained and the events are added to the HARegionQueue. The diagrams below show the different put paths in detail.
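The buffer-then-drain behavior described above can be sketched as follows. This is a simplified model under the assumption that a single lock guards both the temporary buffer and the real queue; the class InitializingProxySketch and its method names are hypothetical, and the real code paths have more states than this.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model: events that arrive before the client queue is ready are
// parked in a temporary buffer and moved to the real queue once it is ready.
final class InitializingProxySketch {
    private final Queue<String> queuedEvents = new ArrayDeque<>(); // temporary buffer
    private final List<String> haQueue = new ArrayList<>();        // stands in for HARegionQueue
    private boolean initialized = false;

    synchronized void deliver(String event) {
        if (initialized) {
            haQueue.add(event);
        } else {
            queuedEvents.add(event);
        }
    }

    // Drains the buffer in arrival order, then lets later events go straight
    // to the queue. Holding the lock keeps the ordering intact.
    synchronized void finishInitialization() {
        while (!queuedEvents.isEmpty()) {
            haQueue.add(queuedEvents.poll());
        }
        initialized = true;
    }

    synchronized List<String> snapshot() {
        return new ArrayList<>(haQueue);
    }
}
```

Draining and flipping the flag under the same lock is what prevents an event that arrives during the drain from overtaking older buffered events.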


[Diagram: put paths into the HARegionQueue]

These diagrams show more detail around the special handling during client queue initialization or providing an initial image.



[Diagrams: special handling during client queue initialization and while providing an initial image]

Delivering/Dispatching Events to Client:

Events are delivered to clients asynchronously by the MessageDispatcher thread ("CacheClientProxy.MessageDispatcher"). The dispatcher thread peeks an event from the HARegionQueue and delivers it to the client.

Ack from Client:

Client:
Upon receiving the event, the client updates its local cache or invokes the CqListeners (“CacheClientUpdater.processMessages()”).

The client also periodically sends the server an acknowledgement containing the received event ID (thread ID, sequence ID), so that the server can remove the corresponding events from the HARegionQueue.

Server:
The server processes the ack command from the client and records it in the HARegionQueue.ackedEvents map.

Removing Events:

Events are removed from the HARegionQueue based on the client acknowledgement (“HARegionQueue.remove()”).
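The ack and removal steps above can be sketched as follows. The sketch assumes that an acknowledgement for sequence ID N from a thread covers every earlier event from the same thread; that cumulative interpretation, the class AckRemovalSketch, and its method names are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of ack-driven removal, not the Geode implementation.
final class AckRemovalSketch {
    // Queue position -> {threadId, sequenceId}; iteration order is queue order.
    private final Map<Long, long[]> queue = new LinkedHashMap<>();
    // threadId -> highest sequence ID acknowledged by the client.
    private final Map<Long, Long> ackedEvents = new HashMap<>();
    private long tailKey = 0;

    void put(long threadId, long sequenceId) {
        queue.put(tailKey++, new long[] {threadId, sequenceId});
    }

    // Records a client acknowledgement; an older ack never lowers the mark.
    void ack(long threadId, long sequenceId) {
        ackedEvents.merge(threadId, sequenceId, Long::max);
    }

    // Removes every queued event covered by an ack; returns how many were removed.
    int removeAcked() {
        int removed = 0;
        for (Iterator<long[]> it = queue.values().iterator(); it.hasNext();) {
            long[] event = it.next();
            Long acked = ackedEvents.get(event[0]);
            if (acked != null && event[1] <= acked) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    int size() {
        return queue.size();
    }
}
```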

Reliable Event Delivery:

Reliable event delivery (in case of node failure) is achieved through the subscription redundancy level. By setting the redundancy level, a client can have its queues created on multiple server nodes.

With redundancy set, HARegionQueues are created on multiple servers. One of the queues is treated as the primary queue and the others as secondary queues.

When a cache operation is performed, it is distributed to the other nodes. Based on the interest subscription, the events are added to both the primary and the secondary HARegionQueues (as explained above). The HARegionQueues themselves do not replicate events between each other.

Events are dispatched to clients from the primary HARegionQueue. If the node hosting the primary queue goes down, one of the nodes hosting a secondary queue becomes the primary and starts dispatching events. In this scenario, duplicate events may be sent to the client. The client checks for duplicate events and ignores them.
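A client-side duplicate check of the kind described above can be sketched as follows, assuming that sequence IDs increase monotonically per originating thread. The class DuplicateFilterSketch is hypothetical and only illustrates the idea; it is not the check Geode performs.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: remember the highest sequence ID seen per thread and
// reject anything at or below it.
final class DuplicateFilterSketch {
    private final Map<Long, Long> lastSeen = new HashMap<>(); // threadId -> highest sequenceId applied

    // Returns true if the event is new and should be applied, false if it is a duplicate.
    boolean accept(long threadId, long sequenceId) {
        Long previous = lastSeen.get(threadId);
        if (previous != null && sequenceId <= previous) {
            return false;
        }
        lastSeen.put(threadId, sequenceId);
        return true;
    }
}
```

After a failover, the new primary may resend events that the old primary had already delivered but not yet reported as removed; a filter like this lets the client drop them without involving the server.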

When the message is delivered and successfully removed from the primary HARegionQueue, the acknowledgement is sent to other servers hosting the queues (“QueueRemovalMessage”).

Creating Secondary HAQueues:

When a secondary queue is created, it performs an initial data replication from one of the existing queues in order to be in sync with the primary queue (using the backing region’s GII mechanism, “HARegionQueue.createHARegion()”).

During queue initialization, any events satisfying the interest criteria are queued temporarily in “CacheClientProxy.queuedEvents”. Once the initialization is over, the events from the temporary queue are added to HARegionQueue for delivery.

After events are delivered and removed from the primary queue, the primary node sends a “QueueRemovalMessage” to the nodes hosting the secondary queues. As part of message processing, each secondary queue removes those events from its own queue (“QueueRemovalMessage.process().removeDispatchedEvents()”).

The diagram below shows a simplified view of the queue removal logic.

[Diagram: simplified queue removal logic]
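The interplay between primary dispatch and secondary cleanup can be sketched as follows. The sketch assumes the removal message carries, per thread, the last sequence ID dispatched by the primary; that message shape, the class RedundantQueueSketch, and its method names are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: the same class plays the primary or a secondary queue.
final class RedundantQueueSketch {
    final List<long[]> events = new ArrayList<>(); // each entry is {threadId, sequenceId}

    void put(long threadId, long sequenceId) {
        events.add(new long[] {threadId, sequenceId});
    }

    // Primary side: dispatch up to 'count' events, remove them locally, and
    // build a removal message (threadId -> last dispatched sequence ID).
    Map<Long, Long> dispatch(int count) {
        Map<Long, Long> removalMessage = new HashMap<>();
        for (int i = 0; i < count && !events.isEmpty(); i++) {
            long[] event = events.remove(0);
            removalMessage.merge(event[0], event[1], Long::max);
        }
        return removalMessage;
    }

    // Secondary side: drop every event the primary has already dispatched.
    void processRemoval(Map<Long, Long> removalMessage) {
        events.removeIf(event -> {
            Long last = removalMessage.get(event[0]);
            return last != null && event[1] <= last;
        });
    }
}
```

If the primary fails between dispatching and sending the removal message, the secondary still holds the dispatched events, which is why duplicates can reach the client after failover.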

Memory Optimization:

Depending on the number of client queues and the rate at which events are delivered, the individual queues may consume a lot of memory. To reduce the memory footprint, the events are stored in a shared container (“HAContainer”), and all client queues refer to it instead of keeping their own copies (“HARegionQueue.putEntryConditionallyIntoHAContainer()”).

The reason for doing this is that, in most cases, clients have interests in common, so keeping a single copy of each client message reduces memory use significantly.

Once an event has been delivered to all the interested clients, it is removed from the HAContainer (“HARegionQueue.decAndRemoveFromHAContainer()”).

Code path for adding event to HAContainer:
HARegionQueue.basicPut()->putObject()->putEventInHARegion()->putEntryConditionallyIntoHAContainer()
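The shared-copy idea can be sketched with simple reference counting. The class SharedContainerSketch, its String-based event IDs, and its method names are hypothetical; the sketch only illustrates storing one copy per event and dropping it when the last queue is done with it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a container shared by all client queues.
final class SharedContainerSketch {
    private static final class Entry {
        final String message;
        int refCount;

        Entry(String message) {
            this.message = message;
        }
    }

    private final Map<String, Entry> container = new HashMap<>(); // eventId -> shared entry

    // Called by each client queue when it enqueues the event. The message is
    // stored only the first time; later callers get the already stored copy.
    synchronized String putConditionally(String eventId, String message) {
        Entry entry = container.computeIfAbsent(eventId, k -> new Entry(message));
        entry.refCount++;
        return entry.message;
    }

    // Called by a client queue once it no longer needs the event. The entry
    // is dropped when the last referencing queue releases it.
    synchronized void decAndRemove(String eventId) {
        Entry entry = container.get(eventId);
        if (entry != null && --entry.refCount == 0) {
            container.remove(eventId);
        }
    }

    synchronized int size() {
        return container.size();
    }
}
```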

Event Conflation:

Clients can configure events to be conflated. With conflation, when a new event arrives for a key, the older event for that key in the HARegionQueue is destroyed and the new event is added to the tail of the queue.

This is done at the end of “HARegionQueue.putObject()”.
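Conflation can be sketched as follows, assuming conflation is keyed on the entry key and decided per event. The class ConflatingQueueSketch and its methods are hypothetical; as in the description above, the new event is appended first and the older event for the same key is destroyed afterwards.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a conflating queue, not the Geode implementation.
final class ConflatingQueueSketch {
    private final Map<Long, String[]> queue = new LinkedHashMap<>();  // position -> {key, value}
    private final Map<String, Long> latestPosition = new HashMap<>(); // key -> position of newest event
    private long tailKey = 0;

    void put(String key, String value, boolean conflate) {
        long position = tailKey++;
        queue.put(position, new String[] {key, value}); // new event goes to the tail
        if (conflate) {
            Long old = latestPosition.put(key, position);
            if (old != null) {
                queue.remove(old); // destroy the superseded event
            }
        }
    }

    // Values in queue order, oldest first.
    List<String> values() {
        List<String> result = new ArrayList<>();
        for (String[] event : queue.values()) {
            result.add(event[1]);
        }
        return result;
    }
}
```

A slow client then receives only the latest value for a frequently updated key, at the cost of not seeing the intermediate updates.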