Problem Statement

The Java Broker currently performs no throttling of producing clients.  In combination with the way that the Java Broker holds every transient message in memory until consumed, we can encounter scenarios where the Java broker runs out of heap space.  For example, if a producer P sends messages at a rate of 100msg/s to a queue Q, but the only consumer, C, of queue Q processes messages at a rate of 10msg/s, then Q will grow at a rate of 90msg/s until such time as the broker runs out of heap space. 

Tactically we may attempt to solve the problem of Queues becoming overfull, and thus causing out of memory exceptions, without attempting to solve the totality of out of memory issues. 

Analysis

AMQP0-8/0-9/0-9-1 provides no mechanism for throttling producers of messages based on credit (either for a given destination, or even at the granualrity of a session).  There are two mechanisms available to throttle a producing client - the use of TCP flow control, and the use of the AMQP Channel.Flow command. 

The use of TCP flow control throttles the producer to the rate at which the Broker can process the incomming messages, but does not address the throttling of the producer to the consumption of messages by a third part consuming client.

The Channel.Flow command instructs the recipient to either cease (or resume) sending messages.  The receiver of the command should send Channel.Flow-ok once the flow command has been received.

In AMQP0-9-1  and earlier we cannot determine prior to a producer sending a message, which queues a producer wishes to send to.  Thus we are limitted in general to a reactive flow control - that is, when a producer attempts to send to an overful queue we can request that the sender send no more messages, my issuing a Channel.Flow.  Further, since many messages may already be "on-the-wire" by the time our Channel.Flow is received, we cannot guarantee by how much the producer may "overfill" the queue before it ceases publishing.

Proposal

Allow each queue on the Java Broker to be configured with a "full" size.  Implement flow control such that the publisher of a message which is enqueued on a "full" queue is immediately sent a Channel.Flow command to cease publication.  Monitor queue sizes such that when an "overfull" Queue has available space, then sessions which are blocked waiting for this event are free to send messages again. 

Ensure that the Java Client respects the Channel.Flow command, and causes all attempts to send Messages to block, until the session is unflowed.

Design

Broker Changes

Add the following configurable properties to Queues:

capacity: size in bytes at which the queue is thought to be full (and thus publishes which send messages which take the total queue size above this mark will be blocked).  Default 0 (no maximum)
flowResumeCapacity: the queue size at which producers are unflowed (defaulted to capacity)

Like other such values these may be set on individual queues in the config, or on a per-virtualhost basis.

Alter the following files in the org.apache.qpid.server.configuration package to set the queue properties based on this configuration:

VirtualHostConfiguration
QueueConfiguration
ServerConfiguration

Alter the AMQQueue.java interface to add the following method

    /** Post enqueue check to ensure that the queue is not overfull.  If the queue is overfull
        then request the channel to begin flow control */

    void checkCapacity(AMQChannel channel);

Update the following two classes in package org.apache.qpid.server.txn to call checkCapacity on the queue after they have enqueued a message

LocalTransactionalContext
NonTransactionalContext

Add the following code to AMQChannel in package org.apache.qpid.server

    /** The set of queues on which the session is currently blocking.  Only a session blocking on no queues can be unblocked */
    private final ConcurrentMap<AMQQueue, Boolean> _blockingQueues = new ConcurrentHashMap<AMQQueue, Boolean>();

    /** Toggle to indicate whether the session is currently being blocked by an overfull queue condition or not */
    private final AtomicBoolean _blocking = new AtomicBoolean(false);


    /** Add the given queue to the set of those which the session is blocking on (ignore if we are already blocking on this queue)
        if this moves us from being unblocked to blocked, issue a flow command */
    public void block(AMQQueue queue)
    {
        if(_blockingQueues.putIfAbsent(queue, Boolean.TRUE) == null)
        {
            if(_blocking.compareAndSet(false,true))
            {
                flow(false);
            }
        }
    }

    /** Remove the given queue to the set of those which the session is blocking on (ignore if we are no longer blocking on this queue)
        If this moves us from a blocking to an unblocked condition, allow client to resume publishing by issuing a flow */
    public void unblock(AMQQueue queue)
    {
        if(_blockingQueues.remove(queue))
        {
            if(_blocking.compareAndSet(true,false))
            {
                flow(true);
            }
        }
    }

    /** Send a Channel.Flow command to the client */
    private void flow(boolean flow)
    {
        MethodRegistry methodRegistry = _session.getMethodRegistry();
        AMQMethodBody responseBody = methodRegistry.createChannelFlowBody(flow);
        _session.writeFrame(responseBody.generateFrame(_channelId));
    }

Modify SimpleAMQQueue to perform the capacity check, and also to unblock blocked channels when the queue reduces in size.

    private final ConcurrentMap<AMQChannel, Boolean> _blockedChannels = new ConcurrentHashMap<AMQChannel, Boolean>();


    public void checkCapacity(AMQChannel channel)
    {
        if(_capcity != 0L && _atomicQueueSize.get() > _capacity)
        {
            if(_blockedChannels.putIfAbsent(channel, Boolean.TRUE)==null)
            {
                channel.block(this);
            }

            // guard against race condition where consumer takes messages, decreasing queue size message
            // but not seeing that the queue was blocked so not issuing unblock
            if(_atomicQueueSize.get() <= _flowResumeCapacity)
            {
               channel.unblock(this);
               _blockedChannels.remove(channel);

            }

        }
    }

    private void decrementQueueSize(final QueueEntry entry)
    {
        getAtomicQueueSize().addAndGet(-entry.getMessage().getSize());
        checkFreeCapacity();
    }

    private void checkFreeCapacity()
    {
        if(_capacity != 0L && !_blockedChannels.isEmpty() && _atomicQueueSize.get() <= _flowResumeCapacity)
        {
            for(AMQChannel c : _blockedChannels.keySet())
            {
                c.unblock(this);
                _blockedChannels.remove(c);
            }
        }
    }


    

Client Changes

Hook in the existing handler for ChannelFlow commands by altering the dispatchChannelFlow method in the ClientMethodDispatcherImpl class

    public boolean dispatchChannelFlow(ChannelFlowBody body, int channelId) throws AMQException
    {
        _channelFlowMethodHandler.methodReceived(_session, body, channelId);
        return true;
    }

Logging

Additionally logging messages should be emitted

1) on the broker each time the queue issues an overfull request to a session to start flow control
2) on the client every time it receives a flow control command from the broker
3) on the client every time it attempts to send a message but finds itself blocked by broker flow control - in particular this message should repeat periodically untill the message is sent

  • No labels

1 Comment

  1. Hi Rob, Looks good. Is the plan only to provide a blocking send?