...
This applies for Camel version 2.2 or older. If you use a newer version then the Aggregator has been rewritten from Camel 2.3 onwards on and you should use this Aggregator2 link instead.
...
A correlation Expression is used to determine the messages which should be aggregated together. If you want to aggregate all messages into a single message, just use a constant expression. An An AggregationStrategy
is used to combine all the message exchanges for a single correlation key into a single message exchange. The default strategy just chooses the latest message; so its ideal for throttling messages.
...
Info | ||
---|---|---|
| ||
In Camel 2.0 the On the first invocation of the |
Warning | ||
---|---|---|
| ||
You cannot use both both |
...
The following example shows how to aggregate messages so that only the latest message for a specific value of the cheese
header are sent.
Wiki Markup |
---|
{snippet:id=ex|lang=java|url=camel/tags/camel-2.2.0/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java} |
JMSDestination
header as the correlation key; or some custom header for the stock symbol (using the above stock market example).Code Block |
---|
from("activemq:someReallyFastTopic") .aggregator(header("JMSDestination")) .to("activemq:someSlowTopicForGuis"); |
You can of course use many different Expression languages such as XPath, XQuery, SQL or various Scripting Languages.
Here is an example using XPath:
Code Block |
---|
//aggregate based on the message content using an XPath expression
//example assumes an XML document starting with <stockQuote symbol='...'>
//aggregate messages based on their symbol attribute within the <stockQuote> element
from("seda:start").aggregate().xpath("/stockQuote/@symbol", String.class).batchSize(5).to("mock:result");
//this example will aggregate all messages starting with <stockQuote symbol='APACHE'> into
//one exchange and all the other messages (different symbol or different root element) into another exchange.
from("seda:start").aggregate().xpath("name(/stockQuote[@symbol='APACHE'])", String.class).batchSize(5).to("mock:result");
|
...
Using the Spring XML Extensions
Info | ||
---|---|---|
The The
|
The following example shows how to create a simple aggregator using the XML notation; using an Expression for the correlation value used to aggregate messages together.
Wiki Markup |
---|
{snippet:id=example|lang=xml|url=camel/tags/camel-2.2.0/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator.xml} |
AggregationStrategy
if you prefer as shown in the following exampleWiki Markup |
---|
{snippet:id=example|lang=xml|url=camel/tags/camel-2.2.0/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator-custom-strategy.xml} |
strategyRef
attribute is used on the the <aggregator>
element to refer to the custom strategy in Spring....
The aggregator supports the following batch options:
Option | Default | Description |
---|---|---|
|
| The in batch size. This is the number of incoming exchanges that is processed by the aggregator and when this threshold is reached the batch is completed and send. Camel 1.6.2/2.0: You can disable the batch size so the Aggregator is only triggered by timeout by setting the In Camel 1.6.1 or older you can set the |
|
| Camel 1.5: The out batch size. This is the number of exchanges currently aggregated in the |
|
| Timeout in millis. How long should the aggregator wait before its completed and sends whatever it has currently aggregated. |
|
| Camel 2.0: If enabled then Camel will group all aggregated Exchanges into a single combined |
|
| Camel 2.0: This option is if the exchanges is coming from a Batch Consumer. Then when enabled the Aggregator will use the batch size determined by the Batch Consumer in the message header |
|
| Allows you to use a Predicate to signal when an aggregation is complete. See warning in top of this page. |
AggregationCollection
and and AggregationStrategy
This aggregator uses a a AggregationCollection
to store the exchanges that is currently aggregated. The The AggregationCollection
uses a correlation Expression and an AggregationStrategy
.
- The correlation Expression is used to correlate the incoming exchanges. The default implementation will group messages based on the correlation expression. Other implementations could for instance just add all exchanges as a batch.
- The strategy is used for aggregate the old (lookup by its correlation id) and the new exchanges together into a single exchange. Possible implementations include performing some kind of combining or delta processing, such as adding line items together into an invoice or just using the newest exchange and removing old exchanges such as for state tracking or market data prices; where old values are of little use.
Camel provides these implementations:
DefaultAggregationCollection
PredicateAggregationCollection
UseLatestAggregationStrategy
Examples
Default example
By default Camel uses DefaultAggregationCollection
and UseLatestAggregationStrategy
, so this simple example will just keep the latest received exchange for the given correlation Expression:
Wiki Markup |
---|
{snippet:id=e5|lang=java|url=camel/tags/camel-2.2.0/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorBatchOptionsTest.java} |
Using Using PredicateAggregationCollection
The The PredicateAggregationCollection
is an extension to to DefaultAggregationCollection
that uses a Predicate as well to determine the completion. For instance the Predicate can test for a special header value, a number of maximum aggregated so far etc. To use this the routing is a bit more complex as we need to create our our AggregationCollection
object as follows:
Wiki Markup |
---|
{snippet:id=e1|lang=java|url=camel/tags/camel-2.2.0/camel-core/src/test/java/org/apache/camel/processor/aggregator/PredicateAggregatorCollectionTest.java} |
Code Block | ||||
---|---|---|---|---|
| ||||
header(Exchange.AGGREGATED_COUNT).isEqualTo(3)
|
Using this the aggregator will complete if we receive 3 exchanges with the same correlation id or when the specified timeout of 500 msecs has elapsed (whichever criteria is met first).
Using
...
Custom Aggregation Strategy
In this example we will aggregate incoming bids and want to aggregate the highest bid. So we provide our own strategy where we implement the code logic:
Wiki Markup |
---|
{snippet:id=e3|lang=java|url=camel/tags/camel-2.2.0/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java} |
Wiki Markup |
---|
{snippet:id=e1|lang=java|url=camel/tags/camel-2.2.0/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java} |
winners
:Wiki Markup |
---|
{snippet:id=e2|lang=java|url=camel/tags/camel-2.2.0/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java} |
Using
...
Custom Aggregation Collection
In this example we will aggregate incoming bids and want to aggregate the bids in reverse order (this is just an example). So we provide our own collection where we implement the code logic:
Wiki Markup |
---|
{snippet:id=e1|lang=java|url=camel/tags/camel-2.2.0/camel-core/src/test/java/org/apache/camel/processor/aggregator/MyReverseAggregationCollection.java} |
Wiki Markup |
---|
{snippet:id=e1|lang=java|url=camel/tags/camel-2.2.0/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationCollectionTest.java} |
Wiki Markup |
---|
{snippet:id=e2|lang=java|url=camel/tags/camel-2.2.0/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationCollectionTest.java} |
Custom aggregation collection in Spring DSL
You can also specify a custom aggregation collection in the Spring DSL. Here is an example for Camel 2.0
Wiki Markup |
---|
{snippet:id=example|lang=xml|url=camel/tags/camel-2.2.0/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator-custom-collection.xml} |
Code Block |
---|
<aggregator batchTimeout="500" collectionRef="aggregatorCollection">
<expression/>
<to uri="mock:result"/>
</aggregator>
|
...
You can enable grouped exchanges to combine all aggregated exchanges into a single org.apache.camel.impl.GroupedExchange
holder class that contains all the individual aggregated exchanges. This allows you to process a single Exchange containing all the aggregated exchange. Lets start with how to configure this in the router:
Wiki Markup |
---|
{snippet:id=e1|lang=java|url=camel/tags/camel-2.2.0/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java} |
And we will only get 1 exchange out of the aggregator, but we can access all the individual aggregated exchanges from the List which we can extract as a property from the Exchange using the key
Exchange.GROUPED_EXCHANGE
.Wiki Markup |
---|
{snippet:id=e2|lang=java|url=camel/tags/camel-2.2.0/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java} |
Using Batch Consumer
Available as of Camel 2.0
...
For example:
Code Block | ||||
---|---|---|---|---|
| ||||
from("file://inbox")
.aggregate(xpath("//order/@customerId"), new AggregateCustomerOrderStrategy()).batchConsumer().batchTimeout(60000).to("bean:processOrder");
|
When using batchConsumer
Camel will automatic adjust the the batchSize
according to reported by the Batch Consumer in this case the file consumer.
So if we poll in 7 files then the aggregator will aggregate all 7 files before it completes. As the timeout is still in play we set it to 60 seconds.
Include Page | ||||
---|---|---|---|---|
|
See also
- The Loan Broker Example which uses an aggregator
- Blog post by Torsten Mielke about using the aggregator correctly.