Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • unclear threading model
    This lead to hard to test code with many sleeps and sporadic test failures
  • over-reliance on StateChangeListeners
    Made the code hard to debug and reason about

Design Goals and Considerations

In no particular order:

  • DG1: Clear Threading model
  • DG2: Prefer direct invocation over indirect stateChangeListeners
  • DG3: Avoid spurious wake-ups
  • DG4: Reduce delay incurred by wake-ups
  • DG5: Ensure some level of fairness between consumers
  • DG6: Respect consumer priorities 
  • DG7: Work with out-of-order queues (e.g., SortedQueue, PriorityQueue)
  • DG8:  Handle QueueBrowsers 
  • DG9: Work with Multi-Queue consumers

High-level Overview

The main players are

  • Queue
    • QueueConsumerManager
    ConsumerTarget
  • (Queue-)Consumers

The ConsumerTarget is the broker-side representation of a consuming client. Due to multi-queue consumers a ConsumerTarget has one or more Consumers associated with one queue each.

...

A Consumer MUST notify the Queue when it is ready to do some work. When notified by a Queue of available work. A Consumer MUST either notify try to pull messages of said Queue until either it notifies the Queue that it is no longer interested OR try to pull a message of said Queue.

Thread Model

Consumers are always invoked from the consuming connection's IO-Thread whereas the Queue might be invoked from different threads (producing connection's IO-Thread, Housekeeping thread for held or TTLed messages, a consuming connection's IO-Thread in case for message reject).

The interface between Consumers and Queues are

  • AbstractQueue#setNotifyWorkDesired
  • QueueConsumer#notifyWork
  • AbstractQueue#deliverSingleMessage

These methods MUST be thread-safe and SHOULD be lock free.

Simple Flow

there are no more messages available on the Queue (i.e., the Queue does not return a message).

 

Simple Flow

 

  1. Message arrives on the Queue
  2. The Queue notifies some interested Consumers that there is work to be done
  3. The Consumers notify their ConsumerTarget that they would like to do work
  4. The ConsumerTargets notify their Session that they would like to do work
  5. The Sessions notify their Connections that they would like to do work
  6. The Connections schedule themselves. This is the switch from the incoming Thread to the IO-Thread.
  7. The Scheduler kicks off a IO-Thread to process the work of a Connection
  8. The Connection iterates over its Sessions that want to do work
  9. The Sessions iterate over its ConsumerTargets that want to do work
  10. The ConsumerTargets iterate over its Consumers that want to do work
  11. The Consumer tries to pulls a message from the Queue
  12. If successful the message is put on the IO-buffer to be sent down the wire

Design Goals

  • Avoid spurious wake-ups
  • Reduce delay incurred by wake-ups
  • Ensure some level of fairness between consumers

Corner Cases and Things to Remember

...

Threading Model

Consumers are always invoked from the consuming connection's IO-Thread whereas the Queue might be invoked from different threads (producing connection's IO-Thread, Housekeeping thread for held or TTLed messages, a consuming connection's IO-Thread in case for message reject).

These are the interfaces between Consumers and Queues and the scenarios when they are called.

  • AbstractQueue#setNotifyWorkDesired
    Called by the Consumer to notify the Queue whether it is interested in doing work or not.
    • FlowControl
    • Credit
    • TCP backpressure
  • QueueConsumer#notifyWork
    Called by the Queue to notify the Consumer that there is potentially work available.
    • Consumer becomes Interested
    • A new message arrives
    • A previously unavailable (acquired, held, blocked by message grouping) message becomes available
    • A notified consumer did not do the work we expected it to do we need to notify someone else 
    • A high priority consumer becomes uninterested and thus allows a low priority consumer to consume messages
  • AbstractQueue#deliverSingleMessage
    Called by the Consumer to get a message from the Queue.
    • A consumer was notified and now tries to pull a message of a queue

...

These methods MUST be thread-safe and SHOULD be lock free.


QueueConsumerManager internals

...

Typically we want these lists to be thread-safe and give us O(1) access/deletion if we know the element and O(1) size information. Unfortunately there does not exist a data structure in the Java standard library with those characteristics which is why they are based on our own data structure QueueConsumerNodeList.

...