...

  • Multi-Queue consumers
  • consumer priorities
  • out-of-order queues
  • queue browsers
  • fairness

QueueConsumerManager internals

The QueueConsumerManager (QCM for short) keeps track of the state of Consumers from the perspective of the Queue. It decides which Consumer to notify of work. To do this in a performant way, it maintains a number of lists and moves Consumers between those lists to indicate state changes. The lists it maintains are:

  • All
  • NonAcquiring
  • NotInterested
  • Interested
  • Notified

Typically we want these lists to be thread-safe and give us O(1) access/deletion if we know the element. Unfortunately, the Java standard library does not provide a data structure with those characteristics, which is why the lists are based on our own data structure, QueueConsumerNodeList.

QueueConsumerNodeList

The QueueConsumerNodeList is the underlying data structure of all of QCM's lists. It is thread-safe and allows O(1) appending and, given a pointer to an entry, O(1) deletion. It is essentially a singly linked list. To achieve O(1) deletion, entries are marked for deletion but only actually removed upon the next iteration. The rationale is that deleting an entry requires updating the previous entry's "next" pointer, but reaching the previous entry would require a doubly linked list, which is impossible to maintain in a thread-safe way without locking. Special care must be taken when removing elements from the tail, since the QueueConsumerNodeList keeps an explicit reference to it to achieve O(1) appending.

The entries of the QueueConsumerNodeList are called QueueConsumerNodeListEntries. Each entry has a reference to a QueueConsumerNode, which is the persistent entity that moves between QCM's lists and has a reference to the QueueConsumer. The QueueConsumer itself also has a reference to the QueueConsumerNode to enable O(1) deletion given a Consumer. This tightly couples the QueueConsumer and QCM classes.
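The mark-then-unlink idea described above can be illustrated with a simplified sketch. This is not the actual QueueConsumerNodeList code: the class name LazyDeleteList, its Entry type and the snapshot() method are hypothetical names chosen for illustration, and the sketch has not been validated as a production lock-free structure. It assumes a sentinel head entry and never unlinks the last entry, which is one possible way of taking the "special care" around the tail.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical, simplified stand-in for a list with O(1) append and O(1) logical delete. */
final class LazyDeleteList<T> {

    /** One list entry; "deleted" is the mark that makes deletion O(1). */
    static final class Entry<T> {
        final T item; // null only for the head sentinel
        final AtomicReference<Entry<T>> next = new AtomicReference<>();
        final AtomicBoolean deleted = new AtomicBoolean();

        Entry(T item) {
            this.item = item;
        }
    }

    private final Entry<T> head = new Entry<>(null); // sentinel, never removed
    private final AtomicReference<Entry<T>> tail = new AtomicReference<>(head);

    /** O(1) append: link the new entry behind the last entry, then advance the tail reference. */
    Entry<T> add(T item) {
        Entry<T> entry = new Entry<>(item);
        while (true) {
            Entry<T> last = tail.get();
            Entry<T> next = last.next.get();
            if (next != null) {
                // The tail reference lags behind; help advance it and retry.
                tail.compareAndSet(last, next);
            } else if (last.next.compareAndSet(null, entry)) {
                tail.compareAndSet(last, entry);
                return entry;
            }
        }
    }

    /** O(1) delete: only mark the entry; it is unlinked during a later iteration. */
    boolean remove(Entry<T> entry) {
        return entry.deleted.compareAndSet(false, true);
    }

    /** Iterates over live items and unlinks marked entries on the way. */
    List<T> snapshot() {
        List<T> result = new ArrayList<>();
        Entry<T> prev = head;
        Entry<T> cur = head.next.get();
        while (cur != null) {
            Entry<T> next = cur.next.get();
            if (cur.deleted.get()) {
                // The last entry is never unlinked because appenders link new entries behind it.
                if (next != null) {
                    prev.next.compareAndSet(cur, next);
                }
            } else {
                result.add(cur.item);
                prev = cur;
            }
            cur = next;
        }
        return result;
    }
}
```

In this sketch a caller keeps the Entry returned by add() and later passes it to remove(), which mirrors how the QueueConsumer holds a reference to its QueueConsumerNode so that it can be removed without searching the list.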

The "All" List

The All list contains all Consumers registered with the Queue. This is necessary to be able to iterate over all consumers in a thread-safe way without locking. The danger of using several lists instead of a single All list is that you might miss a Consumer if it moves between lists during iteration.

The "NonAcquiring" List

This is a list of Consumers that do not acquire messages, for example Queue Browsers. These need to be handled separately because they should always be notified about new messages. Were they kept in the same list as the acquiring Consumers, we would have to iterate over the entire list to make sure we did not miss a non-acquiring Consumer.

The "NotInterested" List

This list contains all acquiring Consumers that indicated to the Queue that they currently are not interested in doing any work (i.e., taking messages). This typically happens when a Consumer/Connection is suspended due to FlowControl/TCP backpressure.

The "Interested" List

This is the default list for acquiring Consumers. It signifies that they are ready to process messages. When a new message arrives on the Queue it will notify Consumers from this list.

The "Notified" List

Once an acquiring Consumer is notified that there is work to do, it is moved from the "Interested" list to the "Notified" list. The QCM expects such a Consumer to either indicate that it is no longer interested (e.g., it became suspended in the meantime and therefore will not do the work we expected it to) or call AbstractQueue#deliverSingleMessage. The Consumer should remain in the "Notified" list and continue to call deliverSingleMessage until deliverSingleMessage cannot deliver a message to it any more, in which case it is moved back to the "Interested" list. This is to avoid unnecessary wakeups.
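The movement of a Consumer between the lists can be summarised as a small state model. The following is a sketch only: the class ConsumerStateSketch and its method names are hypothetical, Consumers are represented by plain strings, and a synchronized map is used for brevity instead of the QueueConsumerNodeList-based lists the QCM actually maintains.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical model of the list a Consumer currently belongs to (besides "All"). */
final class ConsumerStateSketch {

    enum State { NON_ACQUIRING, NOT_INTERESTED, INTERESTED, NOTIFIED }

    private final Map<String, State> states = new LinkedHashMap<>();

    /** Acquiring Consumers start in "Interested"; browsers go to "NonAcquiring". */
    synchronized void addConsumer(String name, boolean acquires) {
        states.put(name, acquires ? State.INTERESTED : State.NON_ACQUIRING);
    }

    /** Interested -> Notified: the Consumer was told that there is work to do. */
    synchronized boolean notifyOfWork(String name) {
        return states.replace(name, State.INTERESTED, State.NOTIFIED);
    }

    /** Notified -> Interested: no further message could be delivered to the Consumer. */
    synchronized boolean nothingLeftToDeliver(String name) {
        return states.replace(name, State.NOTIFIED, State.INTERESTED);
    }

    /** Interested or Notified -> NotInterested: e.g. the Consumer became suspended. */
    synchronized boolean suspend(String name) {
        State current = states.get(name);
        if (current == State.INTERESTED || current == State.NOTIFIED) {
            states.put(name, State.NOT_INTERESTED);
            return true;
        }
        return false;
    }

    /** NotInterested -> Interested: the Consumer is ready to take messages again. */
    synchronized boolean resume(String name) {
        return states.replace(name, State.NOT_INTERESTED, State.INTERESTED);
    }

    synchronized State stateOf(String name) {
        return states.get(name);
    }
}
```

Each transition method returns false when the Consumer is not in the expected source state, so a repeated notification of an already notified Consumer is a no-op in this model.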

Handling Consumer Priorities

When deciding which Consumer to notify, the QCM should take consumer priorities into account. To do this in a performant way, it maintains a QueueConsumerNodeList per consumer priority in a list of PriorityConsumerListPairs.
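A minimal sketch of the per-priority idea follows. It makes several assumptions that are not confirmed by the text above: a larger number means a higher priority, the pairs are kept sorted by descending priority, and a plain ArrayDeque stands in for the QueueConsumerNodeList. The fields of PriorityConsumerListPair and the methods addInterested and takeNextToNotify are hypothetical, and the sketch is not thread-safe.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical sketch: one list of interested Consumers per priority level. */
final class PriorityListsSketch {

    /** Pairs a priority with its own consumer list (a Deque stands in for the real list type). */
    static final class PriorityConsumerListPair {
        final int priority;
        final Deque<String> interested = new ArrayDeque<>();

        PriorityConsumerListPair(int priority) {
            this.priority = priority;
        }
    }

    // Assumption: kept sorted by descending priority so the first non-empty list wins.
    private final List<PriorityConsumerListPair> pairs = new ArrayList<>();

    /** Adds a Consumer to the list for its priority, creating the pair if needed. */
    void addInterested(String consumer, int priority) {
        int i = 0;
        while (i < pairs.size() && pairs.get(i).priority > priority) {
            i++;
        }
        if (i == pairs.size() || pairs.get(i).priority != priority) {
            pairs.add(i, new PriorityConsumerListPair(priority));
        }
        pairs.get(i).interested.addLast(consumer);
    }

    /** Returns the next Consumer to notify from the highest non-empty priority, or null. */
    String takeNextToNotify() {
        for (PriorityConsumerListPair pair : pairs) {
            String consumer = pair.interested.pollFirst();
            if (consumer != null) {
                return consumer;
            }
        }
        return null;
    }
}
```

Keeping one list per priority means that finding a Consumer to notify only requires walking the (usually short) list of priority levels rather than sorting or scanning all Consumers.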