...
Wiki Markup |
---|
{snippet:id=e1|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomListAggregationStrategyTest.java} |
Using AggregateController
Available as of Camel 2.16
The org.apache.camel.processor.aggregate.AggregateController allows you to control the aggregator at runtime using the Java or JMX API. This can be used to force completion of groups of exchanges, or to query the aggregator's current runtime statistics.
The aggregator provides a default implementation if no custom controller has been configured, which can be accessed using the getAggregateController() method. However, it may be easier to configure a controller in the route using aggregateController, as shown below:
Code Block |
---|
private AggregateController controller = new DefaultAggregateController();
from("direct:start")
.aggregate(header("id"), new MyAggregationStrategy()).completionSize(10).id("myAggregator")
.aggregateController(controller)
.to("mock:aggregated"); |
The AggregateController then provides an API to force completion. For example, to complete a group with key foo:
Code Block |
---|
int groups = controller.forceCompletionOfGroup("foo"); |
The number returned is the number of groups completed. In this case it is 1 if the foo group existed and was completed. If foo does not exist, 0 is returned.
There is also an API to complete all groups:
Code Block |
---|
int groups = controller.forceCompletionOfAllGroups(); |
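The return values described above can be illustrated with a small plain-Java model. This is only a sketch of the contract, not Camel's implementation: ForceCompletionModel and its add method are hypothetical names. It also assumes that forceCompletionOfAllGroups returns the number of groups that were completed, which the text above does not state explicitly.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for an aggregator; it models only the return values
// of the two force-completion calls, not Camel's real behaviour.
class ForceCompletionModel {

    // Open (not yet completed) groups, keyed by correlation key.
    private final Map<String, List<String>> openGroups = new LinkedHashMap<>();

    // Groups that have been completed and sent onwards.
    private final List<List<String>> completedGroups = new ArrayList<>();

    // Adds a message body to the group for the given correlation key.
    void add(String key, String body) {
        openGroups.computeIfAbsent(key, k -> new ArrayList<>()).add(body);
    }

    // Returns 1 if the group existed and was completed, otherwise 0.
    int forceCompletionOfGroup(String key) {
        List<String> group = openGroups.remove(key);
        if (group == null) {
            return 0;
        }
        completedGroups.add(group);
        return 1;
    }

    // Assumption: returns the number of groups that were completed.
    int forceCompletionOfAllGroups() {
        int count = openGroups.size();
        completedGroups.addAll(openGroups.values());
        openGroups.clear();
        return count;
    }

    public static void main(String[] args) {
        ForceCompletionModel model = new ForceCompletionModel();
        model.add("foo", "a");
        model.add("bar", "b");
        System.out.println(model.forceCompletionOfGroup("foo"));
        System.out.println(model.forceCompletionOfGroup("foo"));
        System.out.println(model.forceCompletionOfAllGroups());
    }
}
```

Completing the same key a second time returns 0 in this model, because the group no longer exists once it has been completed.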
To configure this from the XML DSL:
Code Block |
---|
<bean id="myController" class="org.apache.camel.processor.aggregate.DefaultAggregateController"/>
<camelContext xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="direct:start"/>
<aggregate strategyRef="myAppender" completionSize="10" aggregateControllerRef="myController">
<correlationExpression>
<header>id</header>
</correlationExpression>
<to uri="mock:result"/>
</aggregate>
</route>
</camelContext> |
There is also a JMX API on the aggregator, which is available under the processors node in the Camel JMX tree.
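As a rough sketch of what calling the aggregator over JMX might look like from plain Java, the example below uses only the standard javax.management classes. The ObjectName pattern (the org.apache.camel domain, type=processors, and the quoted processor id as name) and the operation name forceCompletionOfGroup are assumptions modeled on the controller method shown earlier, so check the actual names in your JMX console first. AggregatorJmxSketch and its methods are hypothetical helpers.

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

class AggregatorJmxSketch {

    // Assumption: processors are registered in the "org.apache.camel" domain
    // with type=processors and the quoted processor id as the name key.
    // The trailing ",*" matches any further keys, such as the context name.
    static ObjectName processorPattern(String processorId) {
        try {
            return new ObjectName("org.apache.camel:type=processors,name="
                    + ObjectName.quote(processorId) + ",*");
        } catch (JMException e) {
            throw new IllegalStateException(e);
        }
    }

    // Invokes the assumed "forceCompletionOfGroup" operation on every matching
    // MBean and sums the results. Returns 0 when no MBean matches.
    static int forceCompletionOfGroup(MBeanServer server, String processorId, String key) {
        Set<ObjectName> names = server.queryNames(processorPattern(processorId), null);
        int completed = 0;
        try {
            for (ObjectName name : names) {
                Object result = server.invoke(name, "forceCompletionOfGroup",
                        new Object[] {key}, new String[] {String.class.getName()});
                completed += ((Number) result).intValue();
            }
        } catch (JMException e) {
            throw new IllegalStateException(e);
        }
        return completed;
    }

    public static void main(String[] args) {
        // Uses the MBean server of the current JVM; a remote JVM would need
        // a JMX connector instead.
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        System.out.println(forceCompletionOfGroup(server, "myAggregator", "foo"));
    }
}
```

The processor id myAggregator matches the id set on the aggregator in the Java route above. In a JVM where no matching MBean is registered, the query finds nothing and the method returns 0.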
Using GroupedExchanges
In the route below we group all the exchanges together using groupExchanges():
...