
Highly Available Client Event Mechanism

One of the key features of Geode is reliable asynchronous event notification. The publish/subscribe system offers a data distribution service where new events are published into the system and routed to all interested subscribers in a reliable manner.

Clients can subscribe by registering interest in keys or by using continuous queries (CQs).

The product achieves reliability and high availability (HA) of subscription events using HARegionQueue.

HARegionQueue

HARegionQueue is an implementation of a queue using a Geode region as the underlying data structure.

When a client connects to a server with the subscription-enabled flag set, an HARegionQueue for that client proxy is created on the server side.
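To make the "queue on top of a region" idea concrete, here is a minimal sketch that assumes a plain `HashMap` stands in for the backing region and that positions are simple increasing `long` keys. The class `MapBackedQueueSketch` and its methods are hypothetical names for illustration; they are not Geode's API and leave out everything HARegionQueue does beyond basic FIFO behaviour.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration: a FIFO queue whose storage is a map keyed by position.
class MapBackedQueueSketch<E> {
    // Stands in for the backing region: position -> event.
    private final Map<Long, E> region = new HashMap<>();
    // Position given to the most recently added event (0 means nothing added yet).
    private long tailKey = 0;
    // Lowest position that may still hold an event.
    private long headKey = 1;

    // Appends an event at the next position and returns that position.
    synchronized long put(E event) {
        tailKey++;
        region.put(tailKey, event);
        return tailKey;
    }

    // Returns the oldest event without removing it, or null if the queue is empty.
    synchronized E peek() {
        advanceHead();
        return region.get(headKey);
    }

    // Removes and returns the oldest event, or null if the queue is empty.
    synchronized E remove() {
        advanceHead();
        E event = region.remove(headKey);
        if (event != null) {
            headKey++;
        }
        return event;
    }

    synchronized int size() {
        return region.size();
    }

    // Skips positions whose entries are no longer present.
    private void advanceHead() {
        while (headKey <= tailKey && !region.containsKey(headKey)) {
            headKey++;
        }
    }
}
```

Keeping the events in a map rather than a linked structure is what lets a region, with its existing replication machinery, serve as the storage.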


Class Diagrams


Server to Client Queues - Class Diagram

  • CacheClientNotifier: Singleton for a Cache.
  • CacheClientProxy: One for each client. Keeps track of the subscriptions for the client.
  • MessageDispatcher: Thread that reads events from the queue.
  • HARegionQueue: Holds the list of events to dispatch to a single client.
  • HAContainerMap: Holds the actual values for events to dispatch. Used to store only a single copy of each value across all queues.
  • HARegion

This next diagram focuses on cardinality:

  • between HARegionQueue and DACE (DispatchedAndCurrentEvents) objects (through the mis-named HARegionQueue.eventsMap)
  • between HARegion and the two kinds of entries it holds: <Position,Conflatable> and <ThreadIdentifier,SequenceID>

In both cases, the Java Maps are depicted as associations in the diagram (the Map objects are not shown explicitly). So, for instance HARegionQueue.eventsMap is a Map. It's depicted as a 1-to-(0..1 per Position) association from HARegionQueue to Position. Similarly for HARegion to Conflatable and HARegion to SequenceID.

The diagram introduces two types not actually present in the Java code. Both are depicted as subtypes of Long: SequenceID to stand for an event's sequence id; and Position to stand for a position in a queue. In the source code these are both just Longs.


HARegionQueue - Class Diagram: HARegion and DACE Cardinalities

  • «design class» SequenceID and «design class» Position (both shown as subtypes of Long)
  • HARegionQueue: Position tailKey
  • DispatchedAndCurrentEvents: SequenceID lastDispatchedSequenceID
  • EventID: byte[] membershipId, long threadId, SequenceID sequenceId, int bucketId
  • ThreadIdentifier: byte[] membershipId, long threadId
  • Conflatable, ClientMessage, HAEventWrapper, ConflatableObject, ClientUpdateMessage, ClientUpdateMessageImpl, ClientInstantiatorMessage, ClientMarkerMessageImpl
  • RegionQueue, BlockingHARegionQueue, DurableHARegionQueue
  • DistributedRegion, HARegion
  • Association labels: 0..1 per Position; 0..1 per ThreadIdentifier

Sequence Diagrams


Put into Queue (for Partitioned Regions) - Sequence Diagram

  • Participants: user, PR Primary, PR Secondary, Adjunct Receiver, CacheClientNotifier, CacheClientProxy, MessageDispatcher, HARegionQueue
  • Messages: put, UpdateMessage, PutMessage, notifyClients(), deliverMessage(), enqueueMessage(), basicPut()
  • Note: The primary notifies all members that have clients with interest.
  • Note: All members with interest call notifyClients locally.


Adding events to HARegionQueue:

When an operation is performed on the cache, “LocalRegion.notifyBridgeClients()” is called to deliver events to the interested clients. The list of interested clients is obtained by calling “generateLocalFilterRouting()”, and events are queued through “CacheClientNotifier.notifyClients()”.

For each client whose registered interest the event satisfies, a ClientMessage is created and added to that client's HARegionQueue (“CacheClientProxy.enqueueMessage()”). Once the event is added to the HARegionQueue, the cache operation thread returns to the caller.
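The routing step described above can be sketched as follows. This is a simplified model that assumes key-based interest only (no CQs or regular expressions) and uses plain strings for events; `InterestRoutingSketch`, `registerInterest`, `notifyClients`, and `queueOf` are hypothetical names chosen for the example and do not mirror the signatures of the Geode classes mentioned in the text.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration: deliver an event only to the queues of interested clients.
class InterestRoutingSketch {
    // clientId -> keys that client registered interest in.
    private final Map<String, Set<String>> interestByClient = new HashMap<>();
    // clientId -> that client's queue of pending events.
    private final Map<String, List<String>> queueByClient = new HashMap<>();

    synchronized void registerInterest(String clientId, String key) {
        interestByClient.computeIfAbsent(clientId, id -> new HashSet<>()).add(key);
        queueByClient.computeIfAbsent(clientId, id -> new ArrayList<>());
    }

    // Enqueues the event for every interested client and returns how many queues received it.
    synchronized int notifyClients(String key, String value) {
        int enqueued = 0;
        for (Map.Entry<String, Set<String>> entry : interestByClient.entrySet()) {
            if (entry.getValue().contains(key)) {
                queueByClient.get(entry.getKey()).add(key + "=" + value);
                enqueued++;
            }
        }
        return enqueued;
    }

    // Returns a copy of the client's pending events, oldest first.
    synchronized List<String> queueOf(String clientId) {
        List<String> queue = queueByClient.get(clientId);
        return queue == null ? new ArrayList<>() : new ArrayList<>(queue);
    }
}
```

In this model the caller of `notifyClients` returns as soon as the event is in the per-client queues, which matches the point that the cache operation thread does not wait for delivery.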

There are cases where the events are not simply added to the HARegionQueue. Specifically, there currently are two cases where the event is put into a temporary queue.

1. The server is providing an initial image (see HARegionQueue.giiQueue)
2. The server is initializing the message dispatcher as part of the client queue initialization logic (see CacheClientProxy.queuedEvents)

After these operations are completed, the temporary queues are drained and their events are added to the HARegionQueue. The diagrams below show the different put paths in detail.


These diagrams show more detail around the special handling during client queue initialization or providing an initial image.
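As a rough illustration of the temporary-queue handling, the sketch below parks events in a side queue while an "initializing" flag is set and drains them, in arrival order, once initialization finishes. It is a simplified model with hypothetical names (`InitializingQueueSketch`, `finishInitialization`); the real logic in HARegionQueue.giiQueue and CacheClientProxy.queuedEvents is more involved.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical illustration: buffer events during initialization, then drain them.
class InitializingQueueSketch {
    // Holds events that arrive while initialization is still in progress.
    private final Queue<String> temporaryQueue = new ArrayDeque<>();
    // Stands in for the client's main queue.
    private final List<String> mainQueue = new ArrayList<>();
    private boolean initializing = true;

    synchronized void enqueue(String event) {
        if (initializing) {
            temporaryQueue.add(event);
        } else {
            mainQueue.add(event);
        }
    }

    // Moves buffered events to the main queue in arrival order, then switches to direct puts.
    synchronized void finishInitialization() {
        String event;
        while ((event = temporaryQueue.poll()) != null) {
            mainQueue.add(event);
        }
        initializing = false;
    }

    synchronized List<String> mainQueueSnapshot() {
        return new ArrayList<>(mainQueue);
    }
}
```

Holding one lock across both the drain and the flag change keeps a late event from overtaking the buffered ones in this sketch.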

Delivering/Dispatching Events to Client:

Client events are delivered asynchronously to the clients by the MessageDispatcher thread ("CacheClientProxy.MessageDispatcher"). The dispatcher thread peeks an event from the HARegionQueue and delivers it to the client.
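The asynchronous hand-off can be sketched with a producer that enqueues and returns immediately and a separate thread that delivers. This is a simplification under stated assumptions: it uses a `LinkedBlockingQueue` and removes each event as it is taken, whereas the text describes peeking and removing later on acknowledgement. `DispatcherSketch` and its methods are hypothetical names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical illustration: a dispatcher thread that delivers queued events to one client.
class DispatcherSketch {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final Consumer<String> client;
    private final Thread thread;
    private volatile boolean running = true;

    DispatcherSketch(Consumer<String> client) {
        this.client = client;
        this.thread = new Thread(this::dispatchLoop, "dispatcher-sketch");
        this.thread.setDaemon(true);
    }

    void start() {
        thread.start();
    }

    // Called by the cache operation thread; returns without waiting for delivery.
    void enqueue(String event) {
        queue.add(event);
    }

    // Asks the dispatcher to stop once the queue is empty and waits for it to finish.
    void stopAndDrain() {
        running = false;
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void dispatchLoop() {
        try {
            while (running || !queue.isEmpty()) {
                // Wait briefly for an event so the loop can notice a stop request.
                String event = queue.poll(10, TimeUnit.MILLISECONDS);
                if (event != null) {
                    client.accept(event);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```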

Ack from Client:

Client:
Upon receiving the event, the client updates its local cache or invokes the CqListeners (“CacheClientUpdater.processMessages()”).

The client also periodically sends the server an acknowledgement containing the received event ID (thread ID, sequence ID), so that the events can be removed from the HARegionQueue.

Server:
The server processes the ack command from the client and maintains it in the HARegionQueue.ackedEvents map.

Removing Events:

An event is removed from the HARegionQueue based on the client acknowledgement (“HARegionQueue.remove()”).
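A minimal sketch of acknowledgement-driven removal follows. It assumes that an acknowledgement for (thread ID, sequence ID) covers every queued event from that thread with a sequence ID at or below the acknowledged one; that cumulative semantics is an assumption made for the example, and `AckRemovalSketch` with its methods are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration: remove queued events once the client acknowledges them.
class AckRemovalSketch {
    // threadId -> (sequenceId -> event), sorted so a range of old events can be dropped at once.
    private final Map<Long, TreeMap<Long, String>> eventsByThread = new HashMap<>();

    synchronized void put(long threadId, long sequenceId, String event) {
        eventsByThread.computeIfAbsent(threadId, id -> new TreeMap<>()).put(sequenceId, event);
    }

    // Removes every event of the thread up to and including sequenceId; returns how many were removed.
    synchronized int acknowledge(long threadId, long sequenceId) {
        TreeMap<Long, String> events = eventsByThread.get(threadId);
        if (events == null) {
            return 0;
        }
        // headMap returns a live view, so clearing it removes the entries from the backing map.
        Map<Long, String> acknowledged = events.headMap(sequenceId, true);
        int removed = acknowledged.size();
        acknowledged.clear();
        return removed;
    }

    synchronized int size() {
        int total = 0;
        for (TreeMap<Long, String> events : eventsByThread.values()) {
            total += events.size();
        }
        return total;
    }
}
```

Grouping by thread first reflects the structure of the event ID; it also makes a repeated acknowledgement harmless, because nothing at or below that sequence ID remains.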

Reliable Event Delivery:

Reliable event delivery (in case of node failure) is achieved through the subscription redundancy level. By setting a redundancy level, a client can have its queues created on multiple server nodes.

With redundancy set, HARegionQueues are created on multiple servers. One of the queues is treated as the primary queue and the others as secondary queues.

When a cache operation is performed, it is distributed to the other nodes. Based on the interest subscription, the events are added to the primary and secondary HARegionQueues (as explained above). The HARegionQueues themselves do not replicate events among one another.

Events are dispatched to clients from the primary HARegionQueue. If the node hosting the primary queue goes down, one of the nodes hosting a secondary queue becomes the primary and starts dispatching events. In this scenario duplicate events could be sent to the client. The client checks for duplicate events and ignores them.
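The client-side duplicate check after a failover can be sketched as below. The example assumes that sequence IDs increase per originating thread and that the thread ID alone identifies the source (the membership ID is left out for brevity); both are simplifying assumptions, and `DuplicateFilterSketch` is a hypothetical name rather than a Geode class.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration: ignore events that were already received before a failover.
class DuplicateFilterSketch {
    // threadId -> highest sequence ID already applied for that thread.
    private final Map<Long, Long> highestSeenByThread = new HashMap<>();

    // Returns true if the event is new and should be applied, false if it is a duplicate.
    synchronized boolean accept(long threadId, long sequenceId) {
        Long highest = highestSeenByThread.get(threadId);
        if (highest != null && sequenceId <= highest) {
            return false;
        }
        highestSeenByThread.put(threadId, sequenceId);
        return true;
    }
}
```

A new primary that redelivers events the old primary had already sent simply produces `false` results here, so the client's cache and listeners see each event once.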

When the message is delivered and successfully removed from the primary HARegionQueue, the acknowledgement is sent to other servers hosting the queues (“QueueRemovalMessage”).

Creating Secondary HAQueues:

When secondary queues are created, they perform an initial data replication from one of the existing queues in order to be in sync with the primary queue (using the backing region’s GII mechanism - “HARegionQueue.createHARegion()”).

During queue initialization, any events satisfying the interest criteria are queued temporarily in “CacheClientProxy.queuedEvents”. Once the initialization is over, the events from the temporary queue are added to the HARegionQueue for delivery.

After events are delivered and removed from the primary queue, the primary node sends a “QueueRemovalMessage” to the nodes hosting the secondary queues. As part of message processing, each secondary queue removes the events from its queue (“QueueRemovalMessage.process().removeDispatchedEvents()”).

The diagram below shows a simplified view of the queue removal logic.

Memory Optimization:

Depending on the number of client queues and the rate at which events are delivered, the individual queues may consume a lot of memory. To reduce the memory footprint, the events are stored in a container (“HAContainer”) and all the client queues refer to it instead of holding their own copies (“HARegionQueue.putEntryConditionallyIntoHAContainer()”).

The reason for doing this is that in most cases clients have common interests, so keeping a single copy of the client message reduces memory use significantly.

Once an event has been delivered to all the interested clients, it is removed from the HAContainer (“HARegionQueue.decAndRemoveFromHAContainer()”).

Code path for adding event to HAContainer:
HARegionQueue.basicPut()->putObject()->putEventInHARegion()->putEntryConditionallyIntoHAContainer()
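The shared-container idea amounts to reference counting, which the following sketch illustrates. It assumes string event IDs and payloads and a single counter per event; `SharedEventContainerSketch`, `putConditionally`, and `decrementAndRemove` are hypothetical names loosely modelled on the method names above, not their actual signatures.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration: one shared copy of each event, reference-counted across client queues.
class SharedEventContainerSketch {
    private static final class Holder {
        final String payload;
        int refCount;

        Holder(String payload) {
            this.payload = payload;
        }
    }

    // eventId -> shared payload plus the number of queues still referring to it.
    private final Map<String, Holder> container = new HashMap<>();

    // Stores the payload only if the event is not present yet; always returns the shared copy.
    synchronized String putConditionally(String eventId, String payload) {
        Holder holder = container.computeIfAbsent(eventId, id -> new Holder(payload));
        holder.refCount++;
        return holder.payload;
    }

    // Drops one reference and removes the entry when no queue refers to it any more.
    synchronized void decrementAndRemove(String eventId) {
        Holder holder = container.get(eventId);
        if (holder == null) {
            return;
        }
        holder.refCount--;
        if (holder.refCount == 0) {
            container.remove(eventId);
        }
    }

    synchronized int refCount(String eventId) {
        Holder holder = container.get(eventId);
        return holder == null ? 0 : holder.refCount;
    }

    synchronized int size() {
        return container.size();
    }
}
```

With many clients interested in the same keys, memory use then grows with the number of distinct events rather than with events multiplied by clients.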

Event Conflation:

Clients can configure events to be conflated: older events for the same key in the HARegionQueue are destroyed and the new event is added to the tail of the queue.

This is done at the end of “HARegionQueue.putObject()”.
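Conflation can be sketched with an insertion-ordered map: removing the existing entry for a key before re-inserting it moves the key to the tail. The example assumes conflation is keyed on the entry key alone and uses strings for values; `ConflatingQueueSketch` is a hypothetical name for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration: keep only the latest event per key, positioned at the tail.
class ConflatingQueueSketch {
    // Insertion-ordered map: iteration order is the queue order, oldest first.
    private final Map<String, String> latestByKey = new LinkedHashMap<>();

    synchronized void put(String key, String value) {
        // Destroy any older event for this key, then add the new one at the tail.
        latestByKey.remove(key);
        latestByKey.put(key, value);
    }

    // Returns the pending events in queue order and empties the queue.
    synchronized List<String> drain() {
        List<String> events = new ArrayList<>();
        for (Map.Entry<String, String> entry : latestByKey.entrySet()) {
            events.add(entry.getKey() + "=" + entry.getValue());
        }
        latestByKey.clear();
        return events;
    }
}
```

The trade-off is that a slow client sees only the most recent value for a key and skips the intermediate updates.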


