Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Aggregator

This applies for Camel version 2.2 or older. If you use a newer version then the Aggregator has been rewritten from Camel 2.3 onwards and you should use this Aggregator2 link instead.

The Aggregator from the EIP patterns allows you to combine a number of messages together into a single message.

...

Warning
titleBatchTimeout and CompletionPredicate in Camel 2.2 or older

You cannot use both batchTimeout and completionPredicate to trigger a completion based on either on reaching its goal first. The batch timeout will always trigger first, at that given interval.
This has been fixed in Camel 2.3, where either on reaching its goal first triggers the aggregated message to be sent. See also section Restrictions when using completion predicate.

Using the Fluent Builders

Using the Fluent Builders

The following example shows how to aggregate messages so that only the latest message for a specific value of the cheese header are sent.

Wiki Markup
{snippet:id=ex|lang=java|url=camel/trunk/tags/camel-2.2.0/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java}

...

For further examples of this pattern in use you could look at the junit test case

Using the Spring XML Extensions

...

Wiki Markup
{snippet:id=example|lang=xml|url=camel/trunktags/camel-2.2.0/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator.xml}

...

Wiki Markup
{snippet:id=example|lang=xml|url=camel/trunktags/camel-2.2.0/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator-custom-strategy.xml}

...

Option

Default

Description

batchSize

100

The in batch size. This is the number of incoming exchanges that is processed by the aggregator and when this threshold is reached the batch is completed and send. Camel 1.6.2/2.0: You can disable the batch size so the Aggregator is only triggered by timeout by setting the batchSize to 0 (or negative). In Camel 1.6.1 or older you can set the batchSize to a very large number to archive the same.

outBatchSize

0

Camel 1.5: The out batch size. This is the number of exchanges currently aggregated in the AggregationCollection. When this threshold is reached the batch is completed and send. By default this option is disabled. The difference to the batchSize options is that this is for outgoing, so setting this size to e.g. 50 ensures that this batch will at maximum contain 50 exchanges when its sent.

batchTimeout

1000L

Timeout in millis. How long should the aggregator wait before its completed and sends whatever it has currently aggregated.

groupExchanges

false

Camel 2.0: If enabled then Camel will group all aggregated Exchanges into a single combined org.apache.camel.impl.GroupedExchange holder class that holds all the aggregated Exchanges. And as a result only one Exchange is being sent out from the aggregator. Can be used to combine many incomming Exchanges into a single output Exchange without coding a custom AggregationStrategy yourself.

batchConsumer

false

Camel 2.0: This option is if the exchanges is coming from a Batch Consumer. Then when enabled the Aggregator will use the batch size determined by the Batch Consumer in the message header CamelBatchSize. See more details at Batch Consumer. This can be used to aggregate all files consumed from a File endpoint in that given poll.

completionPredicate

null

Allows you to use a Predicate to signal when an aggregation is complete. See warning in top of this page and the section Restrictions when using completion predicate.

AggregationCollection and AggregationStrategy

...

  • DefaultAggregationCollection
  • PredicateAggregationCollection
  • UseLatestAggregationStrategy

Restrictions when using completion predicate

Applies for Camel 2.3

When using completionPredicate there is the following restrictions:

  • The completion predicate is being evaluated before the AggregationStrategy which means it does not have access to data that may have been computer/set within the AggregationStrategy
  • The completion predicate does not have access to the Exchange properties Exchange.AGGREGATED_INDEX, Exchange.AGGREGATED_SIZE as they are set after the predicate has been evaluated.
  • The completion predicate is being evaluated twice per Exchange.

The restrictions is subject to change as we would like to overhaul the Aggregator in the future.

Examples

Default example

Examples

Default example

By default Camel uses DefaultAggregationCollection and UseLatestAggregationStrategy, so this simple example will just keep the latest received exchange for By default Camel uses DefaultAggregationCollection and UseLatestAggregationStrategy, so this simple example will just keep the latest received exchange for the given correlation Expression:

Wiki Markup
{snippet:id=e5|lang=java|url=camel/trunkcamel/tags/camel-2.2.0/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorBatchOptionsTest.java}

...

Wiki Markup
{snippet:id=e1|lang=java|url=camel/trunk/tags/camel-2.2.0/camel-core/src/test/java/org/apache/camel/processor/aggregator/PredicateAggregatorCollectionTest.java}

...

Wiki Markup
{snippet:id=e3|lang=java|url=camel/trunktags/camel-2.2.0/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java}

...

Wiki Markup
{snippet:id=e1|lang=java|url=camel/trunk/tags/camel-2.2.0/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java}

...

Wiki Markup
{snippet:id=e2|lang=java|url=camel/trunktags/camel-2.2.0/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java}

...

Wiki Markup
{snippet:id=e1|lang=java|url=camel/trunk/camel/tags/camel-2.2.0/camel-core/src/test/java/org/apache/camel/processor/aggregator/MyReverseAggregationCollection.java}

...

Wiki Markup
{snippet:id=e1|lang=java|url=camel/trunk/tags/camel-2.2.0/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationCollectionTest.java}

...

Wiki Markup
{snippet:id=e2|lang=java|url=camel/trunk/tags/camel-2.2.0/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationCollectionTest.java}

...

Wiki Markup
{snippet:id=example|lang=xml|url=camel/trunk/tags/camel-2.2.0/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator-custom-collection.xml}

...

Wiki Markup
{snippet:id=e1|lang=java|url=camel/trunk/tags/camel-2.2.0/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java}

...

Wiki Markup
{snippet:id=e2|lang=java|url=camel/trunktags/camel-2.2.0/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java}

...