Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Wiki Markup
{snippet:id=e1|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomListAggregationStrategyTest.java}

Using AggregateController

Available as of Camel 2.16

The org.apache.camel.processor.aggregate.AggregateController allows you to control the aggregate at runtime using Java or JMX API. This can be used to force completing groups of exchanges, or query its current runtime statistics.

The aggregator provides a default implementation if no custom have been configured, which can be accessed using getAggregateController() method. Though it may be easier to configure a controller in the route using aggregateController as shown below:

Code Block
private AggregateController controller = new DefaultAggregateController();
 
from("direct:start")
   .aggregate(header("id"), new MyAggregationStrategy()).completionSize(10).id("myAggregator")
      .aggregateController(controller)
      .to("mock:aggregated");

Then there is API on AggregateController to force completion. For example to complete a group with key foo

Code Block
int groups = controller.forceCompletionOfGroup("foo");

The number return would be the number of groups completed. In this case it would be 1 if the foo group existed and was completed. If foo does not exists then 0 is returned.

There is also an api to complete all groups

Code Block
int groups = controller.forceCompletionOfAllGroups();

 

To configure this from XML DSL

Code Block
<bean id="myController" class="org.apache.camel.processor.aggregate.DefaultAggregateController"/>
 
  <camelContext xmlns="http://camel.apache.org/schema/spring">
        <route>
            <from uri="direct:start"/>
            <aggregate strategyRef="myAppender" completionSize="10" aggregateControllerRef="myController">
                <correlationExpression>
                    <header>id</header>
                </correlationExpression>
                <to uri="mock:result"/>
            </aggregate>
        </route>
    </camelContext>

 

There is also JMX API on the aggregator which is available under the processors node in the Camel JMX tree.

 

Using GroupedExchanges

In the route below we group all the exchanges together using groupExchanges():

...