...
- unclear threading model
This led to hard-to-test code with many sleeps and sporadic test failures.
- over-reliance on StateChangeListeners
This made the code hard to debug and reason about.
Design Goals and Considerations
In no particular order:
- DG1: Clear threading model
- DG2: Prefer direct invocation over indirect StateChangeListeners
- DG3: Avoid spurious wake-ups
- DG4: Reduce delay incurred by wake-ups
- DG5: Ensure some level of fairness between consumers
- DG6: Respect consumer priorities
- DG7: Work with out-of-order queues (e.g., SortedQueue, PriorityQueue)
- DG8: Handle QueueBrowsers
- DG9: Work with multi-queue consumers
High-level Overview
The main players are:
- Queue
- QueueConsumerManager
- (Queue-)Consumers
The ConsumerTarget is the broker-side representation of a consuming client. Because of multi-queue consumers, a ConsumerTarget has one or more Consumers, each associated with exactly one Queue.
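As a rough illustration of the relationship between a ConsumerTarget and its Consumers, the following Java sketch keeps one Consumer per Queue inside a ConsumerTarget. The `Queue`, `Consumer` and `ConsumerTarget` types are hypothetical simplifications, not Broker-J's actual classes, and `subscribe` is an invented method name.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ConsumerTargetSketch {

    // Hypothetical, simplified types for illustration only.
    record Queue(String name) {}

    // One Consumer exists per (ConsumerTarget, Queue) pair.
    record Consumer(ConsumerTarget target, Queue queue) {}

    static final class ConsumerTarget {
        private final Map<Queue, Consumer> consumersByQueue = new LinkedHashMap<>();

        // A single-queue client subscribes once; a multi-queue client once per queue.
        // Subscribing to the same queue again returns the existing Consumer.
        Consumer subscribe(Queue queue) {
            return consumersByQueue.computeIfAbsent(queue, q -> new Consumer(this, q));
        }

        // Unmodifiable snapshot of this target's Consumers, in subscription order.
        List<Consumer> consumers() {
            return List.copyOf(consumersByQueue.values());
        }
    }
}
```

Keying the map by Queue makes the "one Consumer per Queue" rule structural rather than something callers have to remember.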
...
A Consumer MUST notify the Queue when it is ready to do some work. When notified by a Queue of available work, a Consumer MUST try to pull messages from that Queue until either it notifies the Queue that it is no longer interested OR there are no more messages available on the Queue (i.e., the Queue does not return a message).
Thread Model
Consumers are always invoked from the consuming connection's IO-Thread, whereas the Queue might be invoked from different threads (the producing connection's IO-Thread, the Housekeeping thread for held or TTLed messages, or a consuming connection's IO-Thread in case of a message reject).
The interfaces between Consumers and Queues are:
- AbstractQueue#setNotifyWorkDesired
- QueueConsumer#notifyWork
- AbstractQueue#deliverSingleMessage
These methods MUST be thread-safe and SHOULD be lock free.
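The Consumer contract (tell the Queue when ready; once notified, keep pulling until either the Consumer declares itself uninterested or the Queue returns no message) can be sketched in Java as below. `SketchQueue` and `SketchConsumer` are hypothetical stand-ins; their methods only borrow the names `setNotifyWorkDesired` and `deliverSingleMessage` and do not reproduce the real `AbstractQueue` signatures. Credit is used as the assumed reason for losing interest, and the queue tracks a single consumer to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

public class ConsumerContractSketch {

    // Hypothetical queue tracking the interest of a single consumer.
    static final class SketchQueue {
        final Queue<String> messages = new ConcurrentLinkedQueue<>();
        final AtomicBoolean notifyWorkDesired = new AtomicBoolean();

        void setNotifyWorkDesired(SketchConsumer consumer, boolean desired) {
            notifyWorkDesired.set(desired);
        }

        // Returns null when no message is available.
        String deliverSingleMessage(SketchConsumer consumer) {
            return messages.poll();
        }
    }

    static final class SketchConsumer {
        final SketchQueue queue;
        final List<String> sent = new ArrayList<>();
        int credit;

        SketchConsumer(SketchQueue queue, int credit) {
            this.queue = queue;
            this.credit = credit;
            // A Consumer MUST tell the Queue when it is ready to do work.
            queue.setNotifyWorkDesired(this, credit > 0);
        }

        // Would run on the consuming connection's IO thread after the Queue notified us.
        void processPending() {
            while (true) {
                if (credit == 0) {
                    // Exit 1: out of credit, so tell the Queue we are no longer interested.
                    queue.setNotifyWorkDesired(this, false);
                    return;
                }
                String message = queue.deliverSingleMessage(this);
                if (message == null) {
                    // Exit 2: the Queue returned no message; stay interested for later notifications.
                    return;
                }
                credit--;
                sent.add(message);
            }
        }
    }
}
```

Every return from `processPending` corresponds to one of the two exits the contract allows, so the Queue never has to guess whether a notified Consumer is still working.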
Simple Flow
- A message arrives on the Queue
- The Queue notifies some interested Consumers that there is work to be done
- The Consumers notify their ConsumerTarget that they would like to do work
- The ConsumerTargets notify their Session that they would like to do work
- The Sessions notify their Connection that they would like to do work
- The Connections schedule themselves. This is the switch from the incoming thread to the IO-Thread.
- The Scheduler kicks off an IO-Thread to process the work of a Connection
- The Connection iterates over its Sessions that want to do work
- Each Session iterates over its ConsumerTargets that want to do work
- Each ConsumerTarget iterates over its Consumers that want to do work
- The Consumer tries to pull a message from the Queue
- If successful, the message is put on the IO buffer to be sent down the wire
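The Simple Flow can be sketched as a single-threaded Java simulation. Everything here is hypothetical: the shared `Node` base class, the `wantsWork` flags, the `Scheduler` that only runs tasks when `drain()` is called (standing in for the IO thread pool) and the simplified `MessageQueue`, which notifies every consumer rather than a chosen subset, are assumptions made for illustration, not Broker-J's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

public class SimpleFlowSketch {

    // Stand-in for the IO thread pool: scheduled tasks only run when drain() is called.
    static final class Scheduler {
        final Queue<Runnable> pending = new ConcurrentLinkedQueue<>();
        void schedule(Runnable task) { pending.add(task); }
        void drain() {
            Runnable task;
            while ((task = pending.poll()) != null) task.run();
        }
    }

    // Shared shape of Connection, Session, ConsumerTarget and Consumer in this sketch.
    abstract static class Node {
        final Node parent;
        final List<Node> children = new ArrayList<>();
        final AtomicBoolean wantsWork = new AtomicBoolean();

        Node(Node parent) {
            this.parent = parent;
            if (parent != null) parent.children.add(this);
        }

        // Propagate "I would like to do work" upwards, stopping at a level that is already flagged.
        void notifyWork() {
            if (wantsWork.compareAndSet(false, true)) {
                if (parent != null) parent.notifyWork();
                else onTopLevelWork();
            }
        }

        void onTopLevelWork() {}

        // Runs on the IO thread: clear the flag first, then visit only children that asked for work.
        void processPending() {
            wantsWork.set(false);
            for (Node child : children) {
                if (child.wantsWork.get()) child.processPending();
            }
        }
    }

    static final class Connection extends Node {
        final Scheduler scheduler;
        final List<String> ioBuffer = new ArrayList<>();

        Connection(Scheduler scheduler) {
            super(null);
            this.scheduler = scheduler;
        }

        // The Connection schedules itself: the switch from the incoming thread to the IO thread.
        @Override
        void onTopLevelWork() { scheduler.schedule(this::processPending); }
    }

    static final class Session extends Node { Session(Connection c) { super(c); } }

    static final class ConsumerTarget extends Node { ConsumerTarget(Session s) { super(s); } }

    static final class Consumer extends Node {
        final MessageQueue queue;
        final Connection connection;

        Consumer(ConsumerTarget target, MessageQueue queue) {
            super(target);
            this.queue = queue;
            this.connection = (Connection) target.parent.parent; // ConsumerTarget -> Session -> Connection
            queue.consumers.add(this);
        }

        // Pull until the queue returns nothing; each message goes onto the connection's IO buffer.
        @Override
        void processPending() {
            wantsWork.set(false);
            String message;
            while ((message = queue.deliverSingleMessage()) != null) connection.ioBuffer.add(message);
        }
    }

    static final class MessageQueue {
        final Queue<String> messages = new ConcurrentLinkedQueue<>();
        final List<Consumer> consumers = new CopyOnWriteArrayList<>();

        void enqueue(String message) {
            messages.add(message);
            // Simplification: notify every registered consumer.
            for (Consumer consumer : consumers) consumer.notifyWork();
        }

        String deliverSingleMessage() { return messages.poll(); }
    }
}
```

Because each level only propagates a notification when its `wantsWork` flag flips from false to true, a second message arriving before the IO thread runs does not schedule the Connection a second time. Under the assumptions of this sketch, that is one way to approach DG3 (avoid spurious wake-ups).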
Design Goals
- Avoid spurious wake-ups
- Reduce delay incurred by wake-ups
- Ensure some level of fairness between consumers
Corner Cases and Things to Remember
...
Threading Model
Consumers are always invoked from the consuming connection's IO-Thread, whereas the Queue might be invoked from different threads (the producing connection's IO-Thread, the Housekeeping thread for held or TTLed messages, or a consuming connection's IO-Thread in case of a message reject).
These are the interfaces between Consumers and Queues and the scenarios in which they are called.
- AbstractQueue#setNotifyWorkDesired
Called by the Consumer to notify the Queue whether it is interested in doing work or not. Scenarios:
  - FlowControl
  - Credit
  - TCP backpressure
- QueueConsumer#notifyWork
Called by the Queue to notify the Consumer that there is potentially work available. Scenarios:
  - A Consumer becomes interested
  - A new message arrives
  - A previously unavailable (acquired, held, blocked by message grouping) message becomes available
  - A notified Consumer did not do the work we expected it to do, so we need to notify someone else
  - A high-priority Consumer becomes uninterested and thus allows a low-priority Consumer to consume messages
- AbstractQueue#deliverSingleMessage
Called by the Consumer to get a message from the Queue. Scenarios:
  - A Consumer was notified and now tries to pull a message off the Queue
...
These methods MUST be thread-safe and SHOULD be lock free.
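A minimal sketch of how a Queue might track interest without locks and honour consumer priorities when notifying is shown below. It is an assumption-laden illustration, not Broker-J's implementation: `PriorityAwareQueue`, the per-consumer `notifyWorkDesired` flag, the `notifications` counter and the convention that a larger number means a higher priority are all hypothetical. Interest changes use `AtomicBoolean.compareAndSet`, and iteration over the consumer list uses `CopyOnWriteArrayList` snapshots, so the notification path takes no locks (only adding a consumer does).

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class InterestTrackingSketch {

    // Hypothetical consumer; a real notifyWork would hand off to the consumer's IO thread.
    static final class Consumer {
        final int priority; // assumption: a larger value means a higher priority
        final AtomicBoolean notifyWorkDesired = new AtomicBoolean();
        final AtomicInteger notifications = new AtomicInteger();

        Consumer(int priority) { this.priority = priority; }

        void notifyWork() { notifications.incrementAndGet(); }
    }

    static final class PriorityAwareQueue {
        final Queue<String> messages = new ConcurrentLinkedQueue<>();
        // Iteration works on a snapshot without locking; only adding a consumer takes a lock.
        final List<Consumer> consumers = new CopyOnWriteArrayList<>();

        void addConsumer(Consumer consumer) { consumers.add(consumer); }

        void enqueue(String message) {
            messages.add(message);
            notifyHighestPriorityInterested();
        }

        // Safe to call from any thread; compareAndSet makes repeated calls with the same value no-ops.
        void setNotifyWorkDesired(Consumer consumer, boolean desired) {
            if (consumer.notifyWorkDesired.compareAndSet(!desired, desired)) {
                // A newly interested consumer may be eligible for work; an uninterested
                // high-priority consumer may let lower-priority consumers take over.
                notifyHighestPriorityInterested();
            }
        }

        // Best effort: notify the interested consumers sharing the highest priority, if there is work.
        void notifyHighestPriorityInterested() {
            if (messages.isEmpty()) {
                return;
            }
            int best = Integer.MIN_VALUE;
            for (Consumer consumer : consumers) {
                if (consumer.notifyWorkDesired.get()) {
                    best = Math.max(best, consumer.priority);
                }
            }
            for (Consumer consumer : consumers) {
                if (consumer.notifyWorkDesired.get() && consumer.priority == best) {
                    consumer.notifyWork();
                }
            }
        }
    }
}
```

Interest can change between the two loops, so a notification may reach a Consumer that has just lost interest; the "notified consumer did not do the work" scenario is what would cover that. The sketch also re-notifies the highest-priority Consumers on every interest change, which can cause the spurious wake-ups that DG3 asks a real implementation to avoid.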
QueueConsumerManager internals
...
Typically we want these lists to be thread-safe and to give us O(1) access and deletion when we already hold the element, as well as O(1) size information. Unfortunately, no data structure in the Java standard library has all of these characteristics, which is why they are based on our own data structure, QueueConsumerNodeList.
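One way to meet those requirements is sketched below: a lock-free, append-only linked list where `add` returns a node handle, deletion is a logical mark on that node, and an `AtomicInteger` keeps the size. This is an assumption about a plausible design, not the actual QueueConsumerNodeList implementation; the names `LazyDeleteList` and `Node` are hypothetical, and the sketch never physically unlinks deleted nodes, which a real structure would need to do to reclaim memory.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class NodeListSketch {

    static final class Node<T> {
        final T value;
        final AtomicReference<Node<T>> next = new AtomicReference<>();
        final AtomicBoolean deleted = new AtomicBoolean();

        Node(T value) { this.value = value; }
    }

    static final class LazyDeleteList<T> {
        private final Node<T> head = new Node<>(null); // sentinel, never returned to callers
        private final AtomicReference<Node<T>> tail = new AtomicReference<>(head);
        private final AtomicInteger size = new AtomicInteger();

        // Lock-free append using a tail CAS (Michael-Scott style); the returned node allows O(1) removal.
        Node<T> add(T value) {
            Node<T> node = new Node<>(value);
            while (true) {
                Node<T> last = tail.get();
                Node<T> next = last.next.get();
                if (next == null) {
                    // Nodes are never unlinked, so a node whose next is null is the real last node.
                    if (last.next.compareAndSet(null, node)) {
                        tail.compareAndSet(last, node); // failure is fine: another thread moved it on
                        size.incrementAndGet();
                        return node;
                    }
                } else {
                    tail.compareAndSet(last, next); // help a lagging tail forward, then retry
                }
            }
        }

        // O(1) logical deletion; only the thread that flips the flag decrements the size.
        boolean remove(Node<T> node) {
            if (node.deleted.compareAndSet(false, true)) {
                size.decrementAndGet();
                return true;
            }
            return false;
        }

        // O(1), weakly consistent while other threads are adding or removing.
        int size() { return size.get(); }

        // Weakly consistent traversal that skips logically deleted nodes.
        List<T> snapshot() {
            List<T> result = new ArrayList<>();
            for (Node<T> n = head.next.get(); n != null; n = n.next.get()) {
                if (!n.deleted.get()) result.add(n.value);
            }
            return result;
        }
    }
}
```

Handing the caller a node on `add` is what makes deletion O(1): the Queue can keep the node for each Consumer and mark it deleted without searching the list.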
...