...

So in your Java code you can do

Code Block

Endpoint endpoint = context.getEndpoint("activemq:my.queue");
PollingConsumer consumer = endpoint.createPollingConsumer();
Exchange exchange = consumer.receive();

...

|| Method name || Description ||
| receive() | Waits until a message is available and then returns it; potentially blocking forever |
| receive(long) | Attempts to receive a message exchange, waiting up to the given timeout and returning null if no message exchange could be received within the time available |
| receiveNoWait() | Attempts to receive a message exchange immediately without waiting and returning null if a message exchange is not available yet |
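
The three receive variants listed above map closely onto the blocking, timed, and non-blocking operations of a java.util.concurrent.BlockingQueue. The following is a minimal sketch of those semantics using only the JDK; the class ReceiveModesSketch and its deliver helper are hypothetical names for illustration and are not part of Camel.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative stand-in for a polling consumer; NOT a Camel class.
class ReceiveModesSketch {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // Hypothetical helper so the demo can hand a message to the consumer.
    void deliver(String message) {
        queue.add(message);
    }

    // Like receive(): blocks until a message is available, potentially forever.
    String receive() {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
    }

    // Like receive(long): waits up to timeoutMillis, returns null on timeout.
    String receive(long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
    }

    // Like receiveNoWait(): returns immediately, null if nothing is available.
    String receiveNoWait() {
        return queue.poll();
    }

    public static void main(String[] args) {
        ReceiveModesSketch consumer = new ReceiveModesSketch();
        System.out.println(consumer.receiveNoWait()); // prints null
        System.out.println(consumer.receive(50));     // prints null after the timeout
        consumer.deliver("hello");
        System.out.println(consumer.receive());       // prints hello
    }
}
```

In practice this means callers of receive(long) and receiveNoWait() should always check for null before using the result.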

EventDrivenPollingConsumer Options

The EventDrivenPollingConsumer (the default implementation) supports the following options:

Wiki Markup
{div:class=confluenceTableSmall}
|| Option || Default || Description ||
| {{pollingConsumerQueueSize}} | {{1000}} | *Camel 2.14/2.13.1:* The size of the internal handoff queue between the polling consumer and the producers sending data into the queue. |
| {{pollingConsumerBlockWhenFull}} | {{true}} | *Camel 2.14/2.13.1:* Whether to block any producer if the internal queue is full. | 
{div}

Notice that some Camel Components have their own implementation of PollingConsumer and therefore do not support the options above.

You can configure these options in endpoint URIs, as shown below:

Code Block
Endpoint endpoint = context.getEndpoint("file:inbox?pollingConsumerQueueSize=50");
PollingConsumer consumer = endpoint.createPollingConsumer();
Exchange exchange = consumer.receive(5000);
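
To illustrate what a bounded handoff queue and the block-when-full choice mean for a producer, here is a small JDK-only sketch. HandoffQueueSketch and its methods are hypothetical names, and rejecting with an exception in the non-blocking case is an assumption made for this model rather than a statement about Camel's internals.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Simplified model of a bounded handoff queue; NOT Camel's implementation.
class HandoffQueueSketch {
    private final BlockingQueue<String> queue;
    private final boolean blockWhenFull;

    HandoffQueueSketch(int queueSize, boolean blockWhenFull) {
        this.queue = new ArrayBlockingQueue<>(queueSize);
        this.blockWhenFull = blockWhenFull;
    }

    // Producer side: hand a message over to the polling consumer.
    void handOff(String message) {
        if (blockWhenFull) {
            try {
                queue.put(message); // waits until the consumer frees a slot
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for space", e);
            }
        } else {
            // Assumption in this sketch: a full queue rejects the message.
            queue.add(message); // throws IllegalStateException when full
        }
    }

    // Consumer side: take a message if one is waiting, else null.
    String receiveNoWait() {
        return queue.poll();
    }

    int pending() {
        return queue.size();
    }

    public static void main(String[] args) throws Exception {
        HandoffQueueSketch rejecting = new HandoffQueueSketch(1, false);
        rejecting.handOff("a");
        try {
            rejecting.handOff("b");
        } catch (IllegalStateException e) {
            System.out.println("rejected: queue is full");
        }

        HandoffQueueSketch blocking = new HandoffQueueSketch(1, true);
        blocking.handOff("a");
        Thread producer = new Thread(() -> blocking.handOff("b"));
        producer.start();          // the producer waits inside put()
        Thread.sleep(100);
        System.out.println("producer waiting: " + producer.isAlive());
        blocking.receiveNoWait();  // frees a slot so the producer can finish
        producer.join();
        System.out.println("pending after handoff: " + blocking.pending());
    }
}
```

A small queue size combined with blocking applies back-pressure to producers, while a large queue trades memory for fewer stalls.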

ConsumerTemplate

The ConsumerTemplate is a template much like Spring's JmsTemplate or JdbcTemplate supporting the Polling Consumer EIP. With the template you can consume Exchanges from an Endpoint.

The template supports the three operations above and also includes convenience methods for returning the body, such as receiveBody.
The example from above using ConsumerTemplate is:

Code Block

Exchange exchange = consumerTemplate.receive("activemq:my.queue");

Or, to extract the body directly, you can do:

Code Block

Object body = consumerTemplate.receiveBody("activemq:my.queue");

You can also pass the expected body type as a parameter and have the body returned as that type:

Code Block

String body = consumerTemplate.receiveBody("activemq:my.queue", String.class);

You get hold of a ConsumerTemplate from the CamelContext with the createConsumerTemplate operation:

Code Block

ConsumerTemplate consumer = context.createConsumerTemplate();

...

...

For example, to let an FTP consumer back off if it has been idle for a while, you can do:

Code Block

    from("ftp://myserver?username=foo&password=secret&delete=true&delay=5s&backoffMultiplier=6&backoffIdleThreshold=5")
      .to("bean:processFile");
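
The interplay of the idle threshold and the multiplier can be sketched as a small simulation. This is a simplified model under the assumption that, once the number of consecutive idle polls reaches backoffIdleThreshold, the next backoffMultiplier scheduled polls are skipped before polling resumes. BackoffSketch and its methods are hypothetical names and are not Camel classes.

```java
// Simplified model of idle backoff for a scheduled poller; NOT Camel code.
class BackoffSketch {
    private final int backoffMultiplier;
    private final int backoffIdleThreshold;
    private int idleCounter;
    private int backoffCounter;

    BackoffSketch(int backoffMultiplier, int backoffIdleThreshold) {
        this.backoffMultiplier = backoffMultiplier;
        this.backoffIdleThreshold = backoffIdleThreshold;
    }

    // Called on every scheduled tick; true means "actually poll now".
    boolean shouldPoll() {
        if (backoffMultiplier > 0 && idleCounter >= backoffIdleThreshold) {
            if (backoffCounter < backoffMultiplier) {
                backoffCounter++;
                return false; // still backing off: skip this tick
            }
            // Backoff period is over: reset the counters and poll again.
            idleCounter = 0;
            backoffCounter = 0;
        }
        return true;
    }

    // Records the outcome of an actual poll.
    void onPollResult(int polledMessages) {
        idleCounter = (polledMessages == 0) ? idleCounter + 1 : 0;
    }

    // Runs the given number of ticks against a source that never has
    // messages and returns how many ticks resulted in an actual poll.
    static int countPollsWhenAlwaysIdle(int ticks, int multiplier, int threshold) {
        BackoffSketch backoff = new BackoffSketch(multiplier, threshold);
        int polls = 0;
        for (int i = 0; i < ticks; i++) {
            if (backoff.shouldPoll()) {
                polls++;
                backoff.onPollResult(0);
            }
        }
        return polls;
    }

    public static void main(String[] args) {
        // backoffMultiplier=6, backoffIdleThreshold=5, as in the URI above
        System.out.println(countPollsWhenAlwaysIdle(12, 6, 5)); // prints 6
    }
}
```

Under this model, with delay=5s an always-idle endpoint is polled five times, then left alone for six ticks (roughly 30 seconds) before the next attempt.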

...

For instance, if we want to provide a retry feature to a scheduled consumer, we can implement the PollingConsumerPollStrategy interface and put the retry logic in the rollback method. Let's retry up to 3 times:

Code Block

    public boolean rollback(Consumer consumer, Endpoint endpoint, int retryCounter, Exception e) throws Exception {
        if (retryCounter < 3) {
            // return true to tell Camel that it should retry the poll immediately
            return true;
        }
        // okay we give up do not retry anymore
        return false;
    }
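
To see how the boolean returned from rollback drives retries, here is a JDK-only sketch of a poll loop. It is a simplified model, not Camel's scheduler: RetryLoopSketch, RollbackStrategy, and pollWithRetry are hypothetical names, and the callback is trimmed down to just the retry counter and the exception.

```java
import java.util.concurrent.Callable;

// Simplified model of a poll loop that consults a rollback callback.
class RetryLoopSketch {

    // Hypothetical, trimmed-down stand-in for the rollback method above.
    interface RollbackStrategy {
        boolean rollback(int retryCounter, Exception cause);
    }

    // Runs the poll until it succeeds or the strategy gives up.
    // Returns the total number of attempts that were made.
    static int pollWithRetry(Callable<Integer> poll, RollbackStrategy strategy) {
        int attempts = 0;
        int retryCounter = 0;
        boolean done = false;
        while (!done) {
            attempts++;
            try {
                poll.call();
                done = true; // the poll succeeded
            } catch (Exception e) {
                // true from rollback means "retry immediately"
                done = !strategy.rollback(retryCounter, e);
                retryCounter++;
            }
        }
        return attempts;
    }

    public static void main(String[] args) {
        RollbackStrategy upToThreeRetries = (retryCounter, cause) -> retryCounter < 3;
        int attempts = pollWithRetry(() -> {
            throw new IllegalStateException("server unavailable");
        }, upToThreeRetries);
        System.out.println(attempts); // prints 4: the first attempt plus 3 retries
    }
}
```

Because the retry happens immediately, a strategy like this suits short-lived glitches; for longer outages it is usually better to return false and wait for the next scheduled poll.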

Notice that we are given the Consumer as a parameter. We could use this to restart the consumer as we can invoke stop and start:

Code Block

    // an error occurred, so let's restart the consumer; that may resolve the issue
    consumer.stop();
    consumer.start();

...

To configure an Endpoint to use a custom PollingConsumerPollStrategy you use the option pollStrategy. For example in the file consumer below we want to use our custom strategy defined in the Registry with the bean id myPoll:

Code Block

    from("file://inbox/?pollStrategy=#myPoll").to("activemq:queue:inbox");


See Also