Approach to addressing Dispatcher issues.

Looking at the current rollback implementation, we have two different approaches: one for 0-8 and one for 0-10. The change is being driven by a race condition in the 0-8 path that can change the delivery order of a single message. The 0-10 implementation works for the receive() case but deadlocks in the onMessage() case.

Recent work in the Python client suggests that a better approach to implementing a client is to use fewer locks and to remove the distinction between the onMessage() and receive() cases. Moving the Message Listener thread into the application space brings a large simplification, as there is then no difference between the synchronous (receive()) and asynchronous (onMessage()) cases. However, this change is much larger than can reasonably be scoped into 0.6.

Given that the desired solution is too large, and that we have a solution that works for the synchronous 0-10 case, the simplest approach is to address the race condition and then move the logic up so that it is shared between 0-8 and 0-10.

How the 0-10 synchronous approach addresses rollback.

To ensure that all received messages have been processed, the session is stopped and then a special marker entry is added to the IO/Dispatcher _queue; the caller then waits until that entry has been processed. This is done in syncDispatchQueue():

syncDispatchQueue()
    void syncDispatchQueue()
    {
        final CountDownLatch signal = new CountDownLatch(1);
        _queue.add(new Dispatchable() {
            public void dispatch(AMQSession ssn)
            {
                signal.countDown();
            }
        });
        try
        {
            signal.await();
        }
        catch (InterruptedException e)
        {
            throw new RuntimeException(e);
        }
    }

This is tested by RollbackOrderTest.testOrderingAfterRollback().
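The marker technique used by syncDispatchQueue() can be reproduced outside the client. The sketch below is a hypothetical, self-contained model: MiniDispatcher, its processed list and the no-argument Dispatchable are illustrative stand-ins, not the client's classes. It assumes, as the text implies, that a single Dispatcher thread drains _queue in FIFO order, so the marker can only run after every entry queued before it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified stand-in: the client's Dispatchable takes an AMQSession argument.
interface Dispatchable
{
    void dispatch();
}

// Hypothetical model of the Dispatcher: a single thread drains _queue in FIFO order.
class MiniDispatcher implements Runnable
{
    final BlockingQueue<Dispatchable> _queue = new LinkedBlockingQueue<>();
    final List<String> processed = Collections.synchronizedList(new ArrayList<>());

    public void run()
    {
        try
        {
            while (!Thread.currentThread().isInterrupted())
            {
                _queue.take().dispatch();
            }
        }
        catch (InterruptedException e)
        {
            // Interrupted while waiting for work: let the thread exit.
        }
    }

    // Same barrier idea as syncDispatchQueue(): with one FIFO consumer, the
    // marker runs only after every entry that was queued before it.
    void syncDispatchQueue()
    {
        final CountDownLatch signal = new CountDownLatch(1);
        _queue.add(signal::countDown);
        try
        {
            signal.await();
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new RuntimeException(e);
        }
    }

    // Queues three "messages", starts the dispatcher, then syncs from the calling thread.
    static List<String> demo()
    {
        final MiniDispatcher d = new MiniDispatcher();
        for (int i = 1; i <= 3; i++)
        {
            final String msg = "msg-" + i;
            d._queue.add(() -> d.processed.add(msg));
        }
        Thread t = new Thread(d, "Dispatcher");
        t.setDaemon(true);
        t.start();
        d.syncDispatchQueue(); // returns only after msg-1..msg-3 have been dispatched
        List<String> snapshot = new ArrayList<>(d.processed);
        t.interrupt();
        return snapshot;
    }
}
```

Because the caller is not the Dispatcher thread, await() is safe here: some other thread is guaranteed to reach the marker.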

Nature of deadlock

The problem is that the test only covers the synchronous receive() case. Adding a testOrderingAfterRollbackOnMessage() test, which calls rollback() from the Dispatcher thread, exposes the deadlock. When the Dispatcher thread calls syncDispatchQueue(), it places an entry on _queue and then calls await() to wait for that entry to be processed. However, the Dispatcher thread is the thread that does the processing, so suspending it means that the entry will never be processed.
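The deadlock can be reproduced in isolation. In this hypothetical sketch (DeadlockDemo is not a client class), the only thread that would ever drain the queue is the same thread that enqueues the marker and then waits on it. A timed await() replaces the unbounded one so that the demonstration terminates instead of hanging.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical reproduction of the deadlock: the waiting thread is the only consumer.
class DeadlockDemo
{
    // Returns true if the marker was processed before the timeout expired.
    static boolean syncFromDispatcherThread(long timeoutMillis)
    {
        final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        final boolean[] processed = new boolean[1];
        Thread dispatcher = new Thread(() -> {
            // Stand-in for onMessage() -> rollback() -> syncDispatchQueue() on the Dispatcher.
            CountDownLatch signal = new CountDownLatch(1);
            queue.add(signal::countDown);
            try
            {
                // Nothing else takes from the queue, so this can only time out.
                processed[0] = signal.await(timeoutMillis, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
            }
        }, "Dispatcher");
        dispatcher.start();
        try
        {
            dispatcher.join();
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
        }
        return processed[0];
    }
}
```

With the unbounded await() used by syncDispatchQueue(), the same situation blocks forever rather than returning false.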

Steps to address issue.

A similar issue has already been encountered and addressed: the startDispatcherIfNecessary() method checks whether the current thread is the Dispatcher thread.

AMQSession.startDispatcherIfNecessary()
    void startDispatcherIfNecessary()
    {
        //If we are the dispatcher then we don't need to check we are started
        if (Thread.currentThread() == _dispatcherThread)
        {
            return;
        }
    ...

The change to syncDispatchQueue() is to call await() only if the current thread is not the Dispatcher thread. If it is the Dispatcher thread, it should instead process the contents of _queue directly.
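A minimal sketch of the proposed check, assuming a _dispatcherThread field like the one used by startDispatcherIfNecessary(). ReentrantSyncDispatcher and its Runnable-based queue are hypothetical simplifications. In particular, this sketch simply runs each drained entry, whereas the real change must reject/release messages rather than dispatch them to a consumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical model; not the client's AMQSession.
class ReentrantSyncDispatcher
{
    final BlockingQueue<Runnable> _queue = new LinkedBlockingQueue<>();
    volatile Thread _dispatcherThread;

    void syncDispatchQueue()
    {
        if (Thread.currentThread() == _dispatcherThread)
        {
            // We are the Dispatcher: awaiting would deadlock, so drain _queue inline.
            Runnable next;
            while ((next = _queue.poll()) != null)
            {
                next.run();
            }
            return;
        }
        // Any other thread: enqueue a marker and wait for the Dispatcher to reach it.
        final CountDownLatch signal = new CountDownLatch(1);
        _queue.add(signal::countDown);
        try
        {
            signal.await();
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    // Calls syncDispatchQueue() from the Dispatcher thread, as onMessage() -> rollback() would.
    static List<String> demo()
    {
        final ReentrantSyncDispatcher d = new ReentrantSyncDispatcher();
        final List<String> processed = new ArrayList<>();
        d._queue.add(() -> processed.add("msg-1"));
        d._queue.add(() -> processed.add("msg-2"));
        Thread t = new Thread(() -> {
            d._dispatcherThread = Thread.currentThread();
            d.syncDispatchQueue(); // drains inline instead of blocking
            processed.add("sync-returned");
        }, "Dispatcher");
        t.start();
        try
        {
            t.join(5000);
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
        }
        return processed;
    }
}
```

The guard mirrors the existing check in startDispatcherIfNecessary(), so the two methods use the same notion of "the Dispatcher thread".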

As long as the rollback mark (_rollbackMark) is set correctly before syncDispatchQueue() is called, the messages in _queue will be correctly rejected/released. After that, the rollback can proceed as before by calling _dispatcher.rollback(). There is no danger of a single message being left stuck on the Dispatcher thread, as we have just ensured that _queue was fully processed.

When _queue is drained in this way, processing must differ from the normal run, because no message may be dispatched to a consumer. Although _rollbackMark should cover all messages in _queue, any other message found there should also be rejected/released rather than dispatched to a consumer. We do not expect any such messages in the queue; however, this error case should still be covered.
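The drain described above can be sketched as a loop that never hands a message to a consumer. This is a hypothetical model: the Delivery class and the released/unexpected lists are illustrative, and it assumes delivery tags increase monotonically so that a tag above _rollbackMark identifies a message that arrived after the mark was set.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class RollbackDrain
{
    // Hypothetical delivery record; the real client queues full message objects.
    static final class Delivery
    {
        final long deliveryTag;

        Delivery(long deliveryTag)
        {
            this.deliveryTag = deliveryTag;
        }
    }

    final Queue<Delivery> _queue = new ArrayDeque<>();
    final List<Long> released = new ArrayList<>();   // tags handed back for redelivery
    final List<Long> unexpected = new ArrayList<>(); // tags beyond the mark (error case)
    long _rollbackMark = -1;                         // assumed: highest tag covered by the rollback

    // Drain used during rollback: every entry is released, none is dispatched.
    void drainForRollback()
    {
        Delivery d;
        while ((d = _queue.poll()) != null)
        {
            if (d.deliveryTag > _rollbackMark)
            {
                // Not expected, but still released rather than dispatched to a consumer.
                unexpected.add(d.deliveryTag);
            }
            released.add(d.deliveryTag);
        }
    }
}
```

For example, with tags 1 to 4 queued and _rollbackMark set to 3, all four tags end up in released, and tag 4 is also recorded in unexpected so the error case can be logged.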

Other usages of syncDispatchQueue() to investigate

Searching for usages of syncDispatchQueue() yields three hits:

  1. AMQSession.failoverPrep()
  2. AMQSession_0_10.releaseForRollback()
  3. BasicMessageConsumer_0_10.getMessageFromQueue(long)

All three are currently in the 0-10 codebase (as expected). Nothing in these changes affects the other two usages, failoverPrep() and getMessageFromQueue(long): getMessageFromQueue(long) is part of the JMS receive() calls, and failoverPrep() is called from the IO layer when failover needs to occur. Only releaseForRollback() can be called from onMessage() and is therefore subject to the new logic.

Changes to 0-8 code path to use the 0-10 rollback functionality.

Currently releaseForRollback() is called from the AMQSession.rollback() method:

AMQSession.rollback()
    public void rollback() throws JMSException
    {
    ...
                releaseForRollback();

    ...

In the 0-10 code path, the dispatcher is synchronized and rolled back before the messages are released:

0-10.releaseForRollback()
    public void releaseForRollback()
    {
        startDispatcherIfNecessary();
        syncDispatchQueue();
        _dispatcher.rollback();
        ...

If these steps are moved up into AMQSession.rollback(), the 0-8 and 0-10 components need only perform their block rejects/releases, and the 0-8 path need not call AMQSession.rollback().
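Moving the shared steps up into AMQSession.rollback() amounts to a template method. The class names below are hypothetical; the calls list only records the order of the steps, the other (elided) parts of rollback() are omitted, and dispatcherRollback() stands in for _dispatcher.rollback().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical shape of the shared rollback; method names mirror the document.
abstract class SessionSketch
{
    final List<String> calls = new ArrayList<>();

    // Shared by 0-8 and 0-10: the dispatcher steps run once, in one place.
    public final void rollback()
    {
        startDispatcherIfNecessary();
        syncDispatchQueue();
        dispatcherRollback();
        releaseForRollback(); // protocol-specific block reject/release only
    }

    void startDispatcherIfNecessary() { calls.add("startDispatcherIfNecessary"); }

    void syncDispatchQueue() { calls.add("syncDispatchQueue"); }

    void dispatcherRollback() { calls.add("dispatcher.rollback"); }

    abstract void releaseForRollback();
}

// Illustrative 0-8 subclass: only its own reject/release step remains.
class Session08Sketch extends SessionSketch
{
    void releaseForRollback() { calls.add("0-8 releaseForRollback"); }
}

// Illustrative 0-10 subclass: the sync/rollback calls move out of releaseForRollback().
class Session010Sketch extends SessionSketch
{
    void releaseForRollback() { calls.add("0-10 releaseForRollback"); }
}
```

Making rollback() final in this sketch keeps the ordering (sync and dispatcher rollback before any release) from being reintroduced differently in each protocol subclass.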

Potential improvements.

Part of the goal of this (admittedly less than elegant) solution is to share as much code as possible between 0-8 and 0-10. While this will align the rollback process, the mechanism for recording the deliveryTags of delivered messages is still duplicated between the two protocols: 0-10 uses _txRangeSet and 0-8 uses _deliveredMessageTags. Consolidating these may not be necessary, but since both perform the same function, it would benefit the client to investigate whether the RangeSet could be used in the 0-8 code path.
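To illustrate why a range set suits delivery tags, here is a minimal, hypothetical range set over long tags built on java.util.TreeMap. It is not the client's RangeSet class and makes no claim about its API; it only shows that runs of consecutive tags collapse into single ranges instead of being stored one by one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Minimal range set of delivery tags; NOT the client's RangeSet class.
class TagRangeSet
{
    // start -> inclusive end; ranges never overlap or touch
    private final TreeMap<Long, Long> ranges = new TreeMap<>();

    void add(long tag)
    {
        Map.Entry<Long, Long> floor = ranges.floorEntry(tag);
        if (floor != null && floor.getValue() >= tag)
        {
            return; // tag already covered
        }
        long start = tag;
        long end = tag;
        if (floor != null && floor.getValue() == tag - 1)
        {
            start = floor.getKey(); // extend the range that ends just below
        }
        Long nextEnd = ranges.get(tag + 1);
        if (nextEnd != null)
        {
            ranges.remove(tag + 1); // merge with the range that starts just above
            end = nextEnd;
        }
        ranges.put(start, end);
    }

    List<String> toStrings()
    {
        List<String> out = new ArrayList<>();
        for (Map.Entry<Long, Long> e : ranges.entrySet())
        {
            out.add("[" + e.getKey() + "-" + e.getValue() + "]");
        }
        return out;
    }
}
```

Adding tags 1, 2, 3 and 5 yields the two ranges [1-3] and [5-5]; adding 4 afterwards merges them into [1-5].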
