Comment: added forceCompletion to possible values of CamelAggregatedCompletedBy

...

Wiki Markup
{div:class=confluenceTableSmall}
|| header || type || description ||
| {{CamelAggregatedSize}} | int | The total number of Exchanges aggregated into this combined Exchange. | 
| {{CamelAggregatedCompletedBy}} | String | Indicates how the aggregation was completed. The value is one of: {{predicate}}, {{size}}, {{consumer}}, {{timeout}}, {{forceCompletion}} or {{interval}}. |
{div}
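
As a rough illustration of how a downstream step might inspect these two values, the sketch below uses a plain Map as a stand-in for the Exchange properties so that it runs without Camel. The CompletionInfo class and its describe method are hypothetical and not part of the Camel API; only the two property names and the listed values come from the table above.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of Camel. A plain Map stands in for the
// properties of the aggregated Exchange.
class CompletionInfo {

    // The values documented for CamelAggregatedCompletedBy.
    static final Set<String> KNOWN = new HashSet<>(Arrays.asList(
            "predicate", "size", "consumer", "timeout", "forceCompletion", "interval"));

    // Builds a short human-readable summary of how the aggregation completed.
    static String describe(Map<String, Object> properties) {
        Object size = properties.get("CamelAggregatedSize");
        Object completedBy = properties.get("CamelAggregatedCompletedBy");
        if (completedBy == null || !KNOWN.contains(completedBy.toString())) {
            return "unknown completion";
        }
        return size + " exchange(s), completed by " + completedBy;
    }
}
```

In a real route the same two keys would be read from the properties of the outgoing Exchange rather than from a hand-built Map.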

About AggregationStrategy

...

Here are a few example AggregationStrategy implementations that should help you create your own custom strategy.

Code Block

//simply combines Exchange String body values using '+' as a delimiter
class StringAggregationStrategy implements AggregationStrategy {

    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        if (oldExchange == null) {
            return newExchange;
        }

        String oldBody = oldExchange.getIn().getBody(String.class);
        String newBody = newExchange.getIn().getBody(String.class);
        oldExchange.getIn().setBody(oldBody + "+" + newBody);
        return oldExchange;
    }
}

//simply combines Exchange body values into an ArrayList<Object>
class ArrayListAggregationStrategy implements AggregationStrategy {

    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        Object newBody = newExchange.getIn().getBody();
        ArrayList<Object> list = null;
        if (oldExchange == null) {
            list = new ArrayList<Object>();
            list.add(newBody);
            newExchange.getIn().setBody(list);
            return newExchange;
        } else {
            list = oldExchange.getIn().getBody(ArrayList.class);
            list.add(newBody);
            return oldExchange;
        }
    }
}
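
Both strategies above rely on the first call receiving a null oldExchange. The following sketch tries to make that call order visible without a Camel dependency: BodyStrategy is a hypothetical stand-in that works on String bodies only, and a null oldBody plays the role of the null oldExchange. It is an illustration under those assumptions, not how Camel is implemented.

```java
import java.util.List;

// Hypothetical stand-in for aggregate(oldExchange, newExchange),
// reduced to String bodies so the sketch runs without Camel.
interface BodyStrategy {
    String aggregate(String oldBody, String newBody);
}

class AggregateCallOrder {

    // Same logic as StringAggregationStrategy: on the first call there is
    // no old value, so the new one is returned unchanged.
    static final BodyStrategy PLUS = (oldBody, newBody) ->
            oldBody == null ? newBody : oldBody + "+" + newBody;

    // Feeds the bodies to the strategy one at a time, carrying the result
    // of each call into the next one as the "old" value.
    static String run(BodyStrategy strategy, List<String> bodies) {
        String current = null;
        for (String body : bodies) {
            current = strategy.aggregate(current, body);
        }
        return current;
    }
}
```

Calling AggregateCallOrder.run(AggregateCallOrder.PLUS, Arrays.asList("A", "B", "C")) folds the three bodies into a single String joined with '+'.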

...

Note: You can also add a fixed size value, and Camel will fall back to this value if the dynamic value is null or 0.
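
The fallback rule in the note can be sketched in a few lines. The class and method names below are hypothetical and only model the rule as stated (null or 0 means "use the fixed value"); this is not Camel's own code.

```java
// Hypothetical model of the fallback rule described in the note above.
class CompletionSizeFallback {

    // dynamicSize: the value evaluated at runtime, may be null.
    // fixedSize: the fixed value configured as a fallback.
    static int effectiveSize(Integer dynamicSize, int fixedSize) {
        if (dynamicSize == null || dynamicSize == 0) {
            return fixedSize;
        }
        return dynamicSize;
    }
}
```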

Include Page
Using This Pattern

Manually Force the Completion of All Aggregated Exchanges Immediately

...

In the route below we group all the exchanges together using groupExchanges():

Code Block

                from("direct:start")
                    // aggregate all using same expression
                    .aggregate(constant(true))
                    // wait for 0.5 seconds to aggregate
                    .completionTimeout(500L)
                    // group the exchanges so we get one single exchange containing all the others
                    .groupExchanges()
                    .to("mock:result");

As a result we have one outgoing Exchange being routed to the "mock:result" endpoint. The exchange is a holder containing all the incoming Exchanges.
To access these exchanges, read them from a property on the outgoing exchange as shown:

Code Block

List<Exchange> grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, List.class);

From Camel 2.13 onwards these exchanges are stored directly on the message body, which is more intuitive:

Code Block

List<Exchange> grouped = exchange.getIn().getBody(List.class);

...

The method below has only 2 parameters, so the 1st parameter is bound to the body of the oldExchange and the 2nd to the body of the newExchange:

Code Block

public String append(String existing, String next) {
  return existing + next;
}

The method below has 4 parameters, so the 1st parameter is the body of the oldExchange, the 2nd is the Map of the oldExchange headers, the 3rd is the body of the newExchange, and the 4th is the Map of the newExchange headers:

Code Block

public String append(String existing, Map existingHeaders, String next, Map nextHeaders) {
  return existing + next;
}

And finally, if we have 6 parameters then we also have the properties of the Exchanges:

Code Block

public String append(String existing, Map existingHeaders, Map existingProperties, String next, Map nextHeaders, Map nextProperties) {
  return existing + next;
}
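
The 2-, 4- and 6-parameter layouts above follow one pattern: everything from the old exchange comes first, then the same items from the new exchange. The sketch below spells out that ordering as an argument array. ParameterBinding and bind are hypothetical names used for illustration only, and the code makes no claim about how Camel performs the binding internally.

```java
import java.util.Map;

// Hypothetical illustration of the parameter ordering described above.
class ParameterBinding {

    // Returns the arguments in the order a 2-, 4- or 6-parameter
    // aggregation method would receive them.
    static Object[] bind(int parameterCount,
                         Object oldBody, Map<String, Object> oldHeaders, Map<String, Object> oldProperties,
                         Object newBody, Map<String, Object> newHeaders, Map<String, Object> newProperties) {
        switch (parameterCount) {
            case 2:
                // body, body
                return new Object[] {oldBody, newBody};
            case 4:
                // body + headers for each exchange
                return new Object[] {oldBody, oldHeaders, newBody, newHeaders};
            case 6:
                // body + headers + properties for each exchange
                return new Object[] {oldBody, oldHeaders, oldProperties,
                                     newBody, newHeaders, newProperties};
            default:
                throw new IllegalArgumentException(
                        "Expected 2, 4 or 6 parameters but got " + parameterCount);
        }
    }
}
```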

To use this with the Aggregate EIP we can use a POJO with the aggregate logic as follows:

Code Block

public class MyBodyAppender {

    public String append(String existing, String next) {
        return next + existing;
    }

}

And then in the Camel route we create an instance of our bean, and refer to it in the route using the bean method from org.apache.camel.util.toolbox.AggregationStrategies as shown:

Code Block

    private MyBodyAppender appender = new MyBodyAppender();

    public void configure() throws Exception {
        from("direct:start")
            .aggregate(constant(true), AggregationStrategies.bean(appender, "append"))
                .completionSize(3)
                .to("mock:result");
    }

We can also provide the bean type directly:

Code Block

    public void configure() throws Exception {
        from("direct:start")
            .aggregate(constant(true), AggregationStrategies.bean(MyBodyAppender.class, "append"))
                .completionSize(3)
                .to("mock:result");
    }

And if the bean has only one method we do not need to specify the name of the method:

Code Block

    public void configure() throws Exception {
        from("direct:start")
            .aggregate(constant(true), AggregationStrategies.bean(MyBodyAppender.class))
                .completionSize(3)
                .to("mock:result");
    }

And the append method could be static:

Code Block

public class MyBodyAppender {

    public static String append(String existing, String next) {
        return next + existing;
    }

}

If you are using the XML DSL, you need to declare a <bean> with the POJO:

Code Block
xml

    <bean id="myAppender" class="com.foo.MyBodyAppender"/>

And in the Camel route we use strategyRef to refer to the bean by its id, and the strategyMethodName can be used to define the method name to call:

Code Block
xml

    <camelContext xmlns="http://camel.apache.org/schema/spring">
        <route>
            <from uri="direct:start"/>
            <aggregate strategyRef="myAppender" strategyMethodName="append" completionSize="3">
                <correlationExpression>
                    <constant>true</constant>
                </correlationExpression>
                <to uri="mock:result"/>
            </aggregate>
        </route>
    </camelContext>

...

Though with POJOs as AggregationStrategy we made this simpler and only call the method when oldExchange and newExchange are both not null, as that is the most common use-case. If you need to allow oldExchange or newExchange to be null, you can configure this with the POJO using the AggregationStrategyBeanAdapter as shown below. On the bean adapter we call setAllowNullOldExchange and setAllowNullNewExchange to allow the old and the new exchange to be null.

Code Block

    public void configure() throws Exception {
        AggregationStrategyBeanAdapter myStrategy = new AggregationStrategyBeanAdapter(appender, "append");
        myStrategy.setAllowNullOldExchange(true);
        myStrategy.setAllowNullNewExchange(true);

        from("direct:start")
            .pollEnrich("seda:foo", 1000, myStrategy)
                .to("mock:result");
    }

This can be configured more easily using the beanAllowNull method from AggregationStrategies as shown:

Code Block

    public void configure() throws Exception {
        from("direct:start")
            .pollEnrich("seda:foo", 1000, AggregationStrategies.beanAllowNull(appender, "append"))
                .to("mock:result");
    }

Then the append method in the POJO would need to deal with the situation that newExchange can be null:

Code Block

    public class MyBodyAppender {

        public String append(String existing, String next) {
            if (next == null) {
                return "NewWasNull" + existing;
            } else {
                return existing + next;
            }
        }

    }
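
To make the effect of the allow-null setting more concrete, here is a minimal sketch that models it in plain Java. NullAwareAdapter is a hypothetical class and not the real AggregationStrategyBeanAdapter. It assumes that, when nulls are not allowed, the POJO method is skipped and the non-null side is kept as the result.

```java
import java.util.function.BinaryOperator;

// Hypothetical model of the allow-null switch; not Camel's adapter class.
class NullAwareAdapter {

    // Applies 'method' to (existing, next). When allowNull is false and one
    // side is null, the method is not called and the other side is returned
    // (an assumption made for this sketch).
    static String aggregate(BinaryOperator<String> method, boolean allowNull,
                            String existing, String next) {
        if (!allowNull && (existing == null || next == null)) {
            return existing != null ? existing : next;
        }
        return method.apply(existing, next);
    }

    // Same logic as the append method of MyBodyAppender above.
    static String append(String existing, String next) {
        if (next == null) {
            return "NewWasNull" + existing;
        }
        return existing + next;
    }
}
```

With allowNull set to true, a missing new value reaches append and takes the "NewWasNull" branch. With allowNull set to false, append never sees a null argument, so that branch is unreachable.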

...

In XML DSL you would configure the strategyMethodAllowNull option and set it to true as shown below:

Code Block
xml

    <camelContext xmlns="http://camel.apache.org/schema/spring">
        <route>
            <from uri="direct:start"/>
            <aggregate strategyRef="myAppender" strategyMethodName="append" strategyMethodAllowNull="true" completionSize="3">
                <correlationExpression>
                    <constant>true</constant>
                </correlationExpression>
                <to uri="mock:result"/>
            </aggregate>
        </route>
    </camelContext>

...

When strategyMethodAllowNull is set to true, for example, the parameter types of the message bodies do not have to be the same. Suppose we want to aggregate from a com.foo.User type to a List<String> that contains the user names. We could code a POJO doing this as follows:

Code Block

    public static final class MyUserAppender {

        public List addUsers(List names, User user) {
            if (names == null) {
                names = new ArrayList();
            }
            names.add(user.getName());
            return names;
        }
    }
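
The sketch below walks through the call sequence that POJO would see, using generics instead of raw types. The User class here is a hypothetical minimal stand-in for com.foo.User (only getName() is assumed), and TypedUserAppender is an illustrative name. The first call passes null for the list, which is why the null check is needed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal stand-in for com.foo.User; only getName() is assumed.
class User {
    private final String name;

    User(String name) {
        this.name = name;
    }

    String getName() {
        return name;
    }
}

// Generic variant of the addUsers logic shown above.
class TypedUserAppender {

    public List<String> addUsers(List<String> names, User user) {
        if (names == null) {
            // First call: nothing has been aggregated yet.
            names = new ArrayList<>();
        }
        names.add(user.getName());
        return names;
    }
}
```

Passing the returned list back in as the first argument of the next call mimics how the aggregated body is carried from one invocation to the next.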

...