Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Migrated to Confluence 4.0

...

Polling

...

Consumer

...

Camel

...

supports

...

implementing

...

the

...

Polling

...

Consumer

...

from

...

the

...

EIP

...

patterns

...

using

...

the PollingConsumer interface which can be created via the Endpoint.createPollingConsumer() method.

Image Added

So in your Java code you can do

Code Block

Endpoint endpoint = context.getEndpoint("activemq:my.queue");
PollingConsumer consumer = endpoint.createPollingConsumer();
Exchange exchange = consumer.receive();

The ConsumerTemplate (discussed below) is also available.

There are 3 main polling methods on PollingConsumer

Method name

Description

receive()

Waits until a message is available and then returns it; potentially blocking forever

receive(long)

Attempts to receive a message exchange, waiting up to the given timeout and returning null if no message exchange could be received within the time available

receiveNoWait()

Attempts to receive a message exchange immediately without waiting and returning null if a message exchange is not available yet

ConsumerTemplate

The ConsumerTemplate is a template much like Spring's JmsTemplate or JdbcTemplate supporting the Polling Consumer EIP. With the template you can consume Exchanges from an Endpoint.

The template supports the 3 operations above, but also including convenient methods for returning the body, etc consumeBody.
The example from above using ConsumerTemplate is:

Code Block
 [PollingConsumer|http://camel.apache.org/maven/current/camel-core/apidocs/org/apache/camel/PollingConsumer.html] interface which can be created via the [Endpoint.createPollingConsumer()|http://camel.apache.org/maven/current/camel-core/apidocs/org/apache/camel/Endpoint.html#createPollingConsumer()] method.

!http://www.enterpriseintegrationpatterns.com/img/PollingConsumerSolution.gif!

So in your Java code you can do

{code}
Endpoint endpoint = context.getEndpoint("activemq:my.queue");
PollingConsumer consumer = endpoint.createPollingConsumer();
Exchange exchange = consumer.receive();
{code}

The {{ConsumerTemplate}} (discussed below) is also available.

There are 3 main polling methods on [PollingConsumer|http://camel.apache.org/maven/current/camel-core/apidocs/org/apache/camel/PollingConsumer.html]

|| Method name || Description ||
| [receive()|http://camel.apache.org/maven/current/camel-core/apidocs/org/apache/camel/PollingConsumer.html#receive()] | Waits until a message is available and then returns it; potentially blocking forever |
| [receive(long)|http://camel.apache.org/maven/current/camel-core/apidocs/org/apache/camel/PollingConsumer.html#receive(long)] | Attempts to receive a message exchange, waiting up to the given timeout and returning null if no message exchange could be received within the time available |
| [receiveNoWait()|http://camel.apache.org/maven/current/camel-core/apidocs/org/apache/camel/PollingConsumer.html#receiveNoWait()] | Attempts to receive a message exchange immediately without waiting and returning null if a message exchange is not available yet |

h3. ConsumerTemplate

The {{ConsumerTemplate}} is a template much like Spring's JmsTemplate or JdbcTemplate supporting the [Polling Consumer] EIP. With the template you can consume [Exchange]s from an [Endpoint].

The template supports the 3 operations above, but also including convenient methods for returning the body, etc {{consumeBody}}.
The example from above using ConsumerTemplate is:
{code}
Exchange exchange = consumerTemplate.receive("activemq:my.queue");
{code}

Or

...

to

...

extract

...

and

...

get

...

the

...

body

...

you

...

can

...

do:

{
Code Block
}
Object body = consumerTemplate.receiveBody("activemq:my.queue");
{code}

And

...

you

...

can

...

provide

...

the

...

body

...

type

...

as

...

a

...

parameter

...

and

...

have

...

it

...

returned

...

as

...

the

...

type:

{
Code Block
}
String body = consumerTemplate.receiveBody("activemq:my.queue", String.class);
{code}

You

...

get

...

hold

...

of a ConsumerTemplate from the CamelContext with the createConsumerTemplate operation:

Code Block
 a {{ConsumerTemplate}} from the {{CamelContext}} with the {{createConsumerTemplate}} operation:
{code}
ConsumerTemplate consumer = context.createConsumerTemplate();
{code}

h4. Using ConsumerTemplate with Spring DSL
With the Spring DSL we can declare the consumer in the CamelContext with the *consumerTemplate* tag, just like the ProducerTemplate. The example below illustrates this:

Using ConsumerTemplate with Spring DSL

With the Spring DSL we can declare the consumer in the CamelContext with the consumerTemplate tag, just like the ProducerTemplate. The example below illustrates this:

Wiki Markup
{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/SpringConsumerTemplateTest-context.xml}

Then

...

we

...

can

...

get

...

leverage

...

Spring

...

to

...

inject

...

the

...

ConsumerTemplate

...

in

...

our

...

java

...

class.

...

The

...

code

...

below

...

is

...

part

...

of

...

an

...

unit

...

test

...

but

...

it

...

shows

...

how

...

the

...

consumer

...

and

...

producer

...

can

...

work

...

together.

Wiki Markup
 
{snippet:id=e1|lang=java|url=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/SpringConsumerTemplateTest.java}

h4. 

Timer

...

based

...

polling

...

consumer

...

In

...

this

...

sample

...

we

...

use

...

a

...

Timer

...

to

...

schedule

...

a

...

route

...

to

...

be

...

started

...

every

...

5th

...

second

...

and

...

invoke

...

our

...

bean

...

MyCoolBean

...

where

...

we

...

implement

...

the

...

business

...

logic

...

for

...

the

...

Polling

...

Consumer

...

.

...

Here

...

we

...

want

...

to

...

consume

...

all

...

messages

...

from

...

a

...

JMS

...

queue,

...

process

...

the

...

message

...

and

...

send

...

them

...

to

...

the

...

next

...

queue.

...

First

...

we

...

setup

...

our

...

route

...

as:

Wiki Markup

{snippet:id=e1|lang=java|url=camel/tags/camel-2.6.0/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsTimerBasedPollingConsumerTest.java}

And

...

then

...

we

...

have

...

out

...

logic

...

in

...

our

...

bean:

Wiki Markup

{snippet:id=e2|lang=java|url=camel/tags/camel-2.6.0/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsTimerBasedPollingConsumerTest.java}

Scheduled Poll Components

Quite a few inbound Camel endpoints use a scheduled poll pattern to receive messages and push them through the Camel processing routes. That is to say externally from the client the endpoint appears to use an Event Driven Consumer but internally a scheduled poll is used to monitor some kind of state or resource and then fire message exchanges.

Since this a such a common pattern, polling components can extend the ScheduledPollConsumer base class which makes it simpler to implement this pattern.

There is also the Quartz Component which provides scheduled delivery of messages using the Quartz enterprise scheduler.

For more details see:

ScheduledPollConsumer Options

The ScheduledPollConsumer supports the following options:

Wiki Markup



h3. Scheduled Poll Components

Quite a few inbound Camel endpoints use a scheduled poll pattern to receive messages and push them through the Camel processing routes. That is to say externally from the client the endpoint appears to use an [Event Driven Consumer] but internally a scheduled poll is used to monitor some kind of state or resource and then fire message exchanges. 

Since this a such a common pattern, polling components can extend the [ScheduledPollConsumer|http://camel.apache.org/maven/current/camel-core/apidocs/org/apache/camel/impl/ScheduledPollConsumer.html] base class which makes it simpler to implement this pattern.

There is also the [Quartz Component|Quartz] which provides scheduled delivery of messages using the Quartz enterprise scheduler.

For more details see:

* [PollingConsumer|http://camel.apache.org/maven/current/camel-core/apidocs/org/apache/camel/PollingConsumer.html]
* Scheduled Polling Components
** [ScheduledPollConsumer|http://camel.apache.org/maven/current/camel-core/apidocs/org/apache/camel/impl/ScheduledPollConsumer.html]
** [Atom]
** [File|File2]
** [FTP|FTP2]
** [hbase|HBase]
** [iBatis]
** [JPA]
** [Mail]
** [MyBatis]
** [Quartz]
** [SNMP]
** [AWS-S3|AWS-S3]
** [AWS-SQS|AWS-SQS]

h3. ScheduledPollConsumer Options
The ScheduledPollConsumer supports the following options:

{div:class=confluenceTableSmall}
|| Option || Default || Description ||
| {{pollStrategy}} | - | A pluggable {{org.apache.camel.PollingConsumerPollingStrategy}} allowing you to provide your custom implementation to control error handling usually occurred during the {{poll}} operation *before* an [Exchange] have been created and being routed in Camel. In other words the error occurred while the polling was gathering information, for instance access to a file network failed so Camel cannot access it to scan for files. The default implementation will log the caused exception at {{WARN}} level and ignore it. |
| {{sendEmptyMessageWhenIdle}} | {{false}} | *Camel 2.9:* If the polling consumer did not poll any files, you can enable this option to send an empty message (no body) instead. |
| {{startScheduler}} | {{true}} | Whether the scheduler should be auto started. |
| {{initialDelay}} | {{1000}} | Milliseconds before the first poll starts. |
| {{delay}} | {{500}} | Milliseconds before the next poll. |
| {{useFixedDelay}} |  | Controls if fixed delay or fixed rate is used. See [ScheduledExecutorService|http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ScheduledExecutorService.html] in JDK for details. In *Camel 2.7.x* or older the default value is {{false}}. From *Camel 2.8* onwards the default value is {{true}}. |
| {{timeUnit}} | {{TimeUnit.MILLISECONDS}} | time unit for {{initialDelay}} and {{delay}} options. |
| {{runLoggingLevel}} | {{TRACE}} | *Camel 2.8:* The consumer logs a start/complete log line when it polls. This option allows you to configure the logging level for that. |
| {{scheduledExecutorService}} | {{null}} | *Camel 2.10:* Allows for configuring a custom/shared thread pool to use for the consumer. By default each consumer has its own single threaded thread pool. This option allows you to share a thread pool among multiple consumers. |
| {{greedy}} | {{false}} | *Camel 2.10.6/2.11.1:* If greedy is enabled, then the ScheduledPollConsumer will run immediately again, if the previous run polled 1 or more messages. |
| {{scheduler}} | {{null}} | *Camel 2.12:* Allow to plugin a custom {{org.apache.camel.spi.ScheduledPollConsumerScheduler}} to use as the scheduler for firing when the polling consumer runs. The default implementation uses the {{ScheduledExecutorService}} and there is a [Quartz2], and [Spring] based which supports CRON expressions. *Notice:* If using a custom scheduler then the options for {{initialDelay}}, {{useFixedDelay}}, {{timeUnit}}, and {{scheduledExecutorService}} may not be in use. Use the text {{quartz2}} to refer to use the [Quartz2] scheduler; and use the text {{spring}} to use the [Spring] based; and use the text {{#myScheduler}} to refer to a custom scheduler by its id in the [Registry]. See [Quartz2] page for an example. |
| {{scheduler.xxx}} | {{null}} | *Camel 2.12:* To configure additional properties when using a custom {{scheduler}} or any of the [Quartz2], [Spring] based scheduler. | 
| {{backoffMultiplier}} | {{0}} | *Camel 2.12:* To let the scheduled polling consumer backoff if there has been a number of subsequent idles/errors in a row. The multiplier is then the number of polls that will be skipped before the next actual attempt is happening again. When this option is in use then {{backoffIdleThreshold}} and/or {{backoffErrorThreshold}} must also be configured. |
| {{backoffIdleThreshold}} | {{0}} | *Camel 2.12:* The number of subsequent idle polls that should happen before the {{backoffMultipler}} should kick-in. |
| {{backoffErrorThreshold}} | {{0}} | *Camel 2.12:* The number of subsequent error polls (failed due some error) that should happen before the {{backoffMultipler}} should kick-in. |
{div}

h3. 

Using

...

backoff

...

to

...

let

...

the

...

scheduler

...

be

...

less

...

aggressive

...

Available

...

as

...

of

...

Camel

...

2.12

...

The

...

scheduled

...

Polling

...

Consumer

...

is

...

by

...

default

...

static

...

by

...

using

...

the

...

same

...

poll

...

frequency

...

whether

...

or

...

not

...

there

...

is

...

messages

...

to

...

pickup

...

or

...

not.

...

From

...

Camel

...

2.12

...

onwards

...

you

...

can

...

configure

...

the

...

scheduled

...

Polling

...

Consumer

...

to

...

be

...

more

...

dynamic

...

by

...

using

...

backoff.

...

This

...

allows

...

the

...

scheduler

...

to

...

skip

...

N

...

number

...

of

...

polls

...

when

...

it

...

becomes

...

idle,

...

or

...

there

...

has

...

been

...

X

...

number

...

of

...

errors

...

in

...

a

...

row.

...

See

...

more

...

details

...

in

...

the

...

table

...

above

...

for

...

the

...

backoffXXX

...

options.

...

For

...

example

...

to

...

let

...

a

...

FTP

...

consumer

...

backoff

...

if

...

its

...

becoming

...

idle

...

for

...

a

...

while

...

you

...

can

...

do:

{
Code Block
}
    from("ftp://myserver?username=foo&passowrd=secret?delete=true&delay=5s&backoffMultiplier=6&backoffIdleThreshold=5")
      .to("bean:processFile");
{code}

In

...

this

...

example,

...

the

...

FTP

...

consumer

...

will

...

poll

...

for

...

new

...

FTP

...

files

...

evert

...

5th

...

second.

...

But

...

if

...

it

...

has

...

been

...

idle

...

for

...

5

...

attempts

...

in

...

a

...

row,

...

then

...

it

...

will

...


backoff

...

using

...

a

...

multiplier

...

of

...

6,

...

which

...

means

...

it

...

will

...

now

...

poll

...

every

...

5

...

x

...

6

...

=

...

30th

...

second

...

instead.

...

When

...

the

...

consumer

...

eventually

...

pickup

...

a

...

file,

...

then

...

the

...

backoff

...

will

...

reset,

...

and

...

the

...

consumer

...

will

...

go

...

back

...

and

...

poll

...

every

...

5th

...

second

...

again.

...

Camel

...

will

...

log

...

at

...

DEBUG

...

level

...

using

...

org.apache.camel.impl.ScheduledPollConsumer

...

when

...

backoff

...

is

...

kicking-in.

...

About error handling and scheduled polling consumers

ScheduledPollConsumer is scheduled based and its run method is invoked periodically based on schedule settings. But errors can also occur when a poll is being executed. For instance if Camel should poll a file network, and this network resource is not available then a java.io.IOException could occur. As this error happens before any Exchange has been created and prepared for routing, then the regular Error handling in Camel does not apply. So what does the consumer do then? Well the exception is propagated back to the run method where its handled. Camel will by default log the exception at WARN level and then ignore it. At next schedule the error could have been resolved and thus being able to poll the endpoint successfully.

Using a custom scheduler

Available as of Camel 2.12:

The SPI interface org.apache.camel.spi.ScheduledPollConsumerScheduler

...

allows

...

to

...

implement

...

a

...

custom

...

scheduler

...

to

...

control

...

when

...

the

...

Polling

...

Consumer

...

runs.

...

The

...

default

...

implementation

...

is

...

based

...

on

...

the

...

JDKs

...

ScheduledExecutorService

...

with

...

a

...

single

...

thread

...

in

...

the

...

thread

...

pool.

...

There

...

is

...

a

...

CRON

...

based

...

implementation

...

in

...

the

...

Quartz2

...

,

...

and

...

Spring

...

components.

...

For

...

an

...

example

...

of

...

developing

...

and

...

using

...

a

...

custom

...

scheduler,

...

see

...

the

...

unit

...

test

...

org.apache.camel.component.file.FileConsumerCustomSchedulerTest

...

from

...

the

...

source

...

code

...

in

...

camel-core.

Controlling the error handling using PollingConsumerPollStrategy

org.apache.camel.PollingConsumerPollStrategy

...

is

...

a

...

pluggable

...

strategy

...

that

...

you

...

can

...

configure

...

on

...

the

...

ScheduledPollConsumer

...

.

...

The

...

default

...

implementation

...

org.apache.camel.impl.DefaultPollingConsumerPollStrategy

...

will

...

log

...

the

...

caused

...

exception

...

at

...

WARN

...

level

...

and

...

then

...

ignore

...

this

...

issue.

...

The

...

strategy

...

interface

...

provides

...

the

...

following

...

3

...

methods

...

  • begin
    • void begin(Consumer

...

    • consumer,

...

    • Endpoint

...

    • endpoint)

...

  • begin

...

  • (

...

  • Camel

...

  • 2.3

...

  • )
    • boolean begin(Consumer

...

    • consumer,

...

    • Endpoint

...

    • endpoint)

...

  • commit
    • void commit(Consumer

...

    • consumer,

...

    • Endpoint

...

    • endpoint)

...

  • commit

...

  • (

...

  • Camel

...

  • 2.6

...

  • )
    • void commit(Consumer

...

    • consumer,

...

    • Endpoint

...

    • endpoint,

...

    • int

...

    • polledMessages)

...

  • rollback
    • boolean rollback(Consumer

...

    • consumer,

...

    • Endpoint

...

    • endpoint,

...

    • int

...

    • retryCounter,

...

    • Exception

...

    • e)

...

    • throws

...

    • Exception

...

In Camel 2.3

...

onwards

...

the

...

begin

...

method

...

returns

...

a

...

boolean

...

which

...

indicates

...

whether

...

or

...

not

...

to

...

skipping

...

polling.

...

So

...

you

...

can

...

implement

...

your

...

custom

...

logic

...

and

...

return

...

false

...

if

...

you

...

do

...

not

...

want

...

to

...

poll

...

this

...

time.

...

In

...

Camel

...

2.6

...

onwards

...

the

...

commit

...

method

...

has

...

an

...

additional

...

parameter

...

containing

...

the

...

number

...

of

...

message

...

that

...

was

...

actually

...

polled.

...

For

...

example

...

if

...

there

...

was

...

no

...

messages

...

polled,

...

the

...

value

...

would

...

be

...

zero,

...

and

...

you

...

can

...

react

...

accordingly.

...

The

...

most

...

interesting

...

is

...

the

...

rollback

...

as

...

it

...

allows

...

you

...

do

...

handle

...

the

...

caused

...

exception

...

and

...

decide

...

what

...

to

...

do.

...

For

...

instance

...

if

...

we

...

want

...

to

...

provide

...

a

...

retry

...

feature

...

to

...

a

...

scheduled

...

consumer

...

we

...

can

...

implement

...

the

...

PollingConsumerPollStrategy

...

method

...

and

...

put

...

the

...

retry

...

logic

...

in

...

the

...

rollback

...

method.

...

Lets

...

just

...

retry

...

up

...

till

...

3

...

times:

Code Block
java
java

{code:java}
    public boolean rollback(Consumer consumer, Endpoint endpoint, int retryCounter, Exception e) throws Exception {
        if (retryCounter < 3) {
            // return true to tell Camel that it should retry the poll immediately
            return true;
        }
        // okay we give up do not retry anymore
        return false;
    }
{code}

Notice

...

that

...

we

...

are

...

given

...

the

...

Consumer

...

as

...

a

...

parameter.

...

We

...

could

...

use

...

this

...

to

...

restart

...

the

...

consumer

...

as

...

we

...

can

...

invoke

...

stop

...

and

...

start:

{
Code Block
}
    // error occurred lets restart the consumer, that could maybe resolve the issue
    consumer.stop();
    consumer.start();
{code}

*

Notice:

...

If

...

you

...

implement

...

the

...

begin

...

operation

...

make

...

sure

...

to

...

avoid

...

throwing

...

exceptions

...

as

...

in

...

such

...

a

...

case

...

the

...

poll

...

operation

...

is

...

not

...

invoked

...

and

...

Camel

...

will

...

invoke

...

the

...

rollback

...

directly.

...

Configuring

...

an

...

Endpoint

...

to

...

use PollingConsumerPollStrategy

To configure an Endpoint to use a custom PollingConsumerPollStrategy you use the option pollStrategy. For example in the file consumer below we want to use our custom strategy defined in the Registry with the bean id myPoll:

Code Block
 {{PollingConsumerPollStrategy}}
To configure an [Endpoint] to use a custom {{PollingConsumerPollStrategy}} you use the option {{pollStrategy}}. For example in the file consumer below we want to use our custom strategy defined in the [Registry] with the bean id {{myPoll}}:
{code}
    from("file://inbox/?pollStrategy=#myPoll").to("activemq:queue:inbox")
{code}

{include:Using This Pattern}

h3. See Also
- [POJO Consuming]
- [Batch Consumer]
Include Page
Using This Pattern
Using This Pattern

See Also