
...

The following example shows how to use the batch-processing resequencer so that messages are sorted in order of the body() expression. That is, messages are collected into a batch (bounded either by a maximum number of messages per batch or by a timeout), sorted, and then sent to their output.
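The collect, sort, and send cycle described above can be sketched in plain Java without Camel. This is only an illustration under stated assumptions: the class BatchResequencerSketch and its add() and flush() methods are hypothetical names, not Camel API, and the timeout is modelled as an explicit flush() call rather than a timer.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration class; not part of Camel.
class BatchResequencerSketch<T> {
    private final int batchSize;
    private final Comparator<? super T> order;
    private final Consumer<List<T>> output;
    private final List<T> batch = new ArrayList<>();

    BatchResequencerSketch(int batchSize, Comparator<? super T> order, Consumer<List<T>> output) {
        this.batchSize = batchSize;
        this.order = order;
        this.output = output;
    }

    // Collect a message and release the batch once it is full.
    void add(T message) {
        batch.add(message);
        if (batch.size() >= batchSize) {
            flush();
        }
    }

    // Sort and send whatever has been collected so far.
    // Assumption: a real resequencer would also trigger this on a batch timeout.
    void flush() {
        if (batch.isEmpty()) {
            return;
        }
        List<T> sorted = new ArrayList<>(batch);
        sorted.sort(order);
        batch.clear();
        output.accept(sorted);
    }
}
```

In this sketch a batch of size 3 that receives "c", "a", "b" releases them as a, b, c once the third message arrives. A partially filled batch is released only when flush() is called.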

Using the Fluent Builders

Wiki Markup
{snippet:id=example|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java}
This is equivalent to

Code Block

from("direct:start")
    .resequence(body()).batch()
    .to("mock:result");

The batch-processing resequencer can be further configured via the size() and timeout() methods.

Code Block

from("direct:start")
    .resequence(body()).batch().size(300).timeout(4000L)
    .to("mock:result");

This sets the batch size to 300 and the batch timeout to 4000 ms (by default, the batch size is 100 and the timeout is 1000 ms). Alternatively, you can provide a configuration object.

Code Block

from("direct:start")
    .resequence(body()).batch(new BatchResequencerConfig(300, 4000L))
    .to("mock:result");

The examples above reorder messages from the direct:start endpoint by their bodies and send them to the mock:result endpoint.
Typically you would order by a header, or by part of the body, rather than by the whole body. In that case you could replace the expression with

Code Block

resequence(header("mySeqNo"))

...

Using the Spring XML Extensions

Code Block

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start" />
    <resequence>
      <simple>body</simple>
      <to uri="mock:result" />
      <!-- 
        batch-config can be omitted for default (batch) resequencer settings
      -->
      <batch-config batchSize="300" batchTimeout="4000" />
    </resequence>
  </route>
</camelContext>

...

It is now much easier to use the Resequencer to resequence messages from JMS queues based on JMSPriority. For that to work, use the two new options allowDuplicates and reverse.

Wiki Markup
{snippet:id=e1|lang=java|url=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsBatchResequencerJMSPriorityTest.java}
Note that this is only possible in the batch mode of the Resequencer.
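To see why both options matter for JMSPriority, consider the following plain-Java sketch of their combined effect on one batch. It is not Camel's implementation: PriorityBatchSketch and Msg are hypothetical names, and a message is reduced to a body plus a priority from 0 to 9. Several messages usually share a priority, so all of them must be kept (the role of allowDuplicates), and the highest priority should come out first (the role of reverse).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical illustration; not Camel's implementation.
class PriorityBatchSketch {

    // Minimal stand-in for a JMS message: a body plus its JMSPriority.
    static final class Msg {
        final String body;
        final int priority;

        Msg(String body, int priority) {
            this.body = body;
            this.priority = priority;
        }
    }

    // Sort one batch so that the highest priority comes first.
    // Sorting a list keeps every message, including those that share a priority.
    // The sort is stable, so in this sketch equal priorities keep arrival order.
    static List<Msg> resequence(List<Msg> batch) {
        List<Msg> sorted = new ArrayList<>(batch);
        sorted.sort(Comparator.comparingInt((Msg m) -> m.priority).reversed());
        return sorted;
    }
}
```

If the batch were instead held in a sorted set keyed only on priority, two messages with the same priority would collapse into one, which is the situation allowDuplicates is meant to avoid.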

...

Available as of Camel 2.9

From Camel 2.9 onwards, the Resequencer EIP throws a CamelExchangeException if the incoming Exchange is not valid for the resequencer, i.e. the expression cannot be evaluated, for example because a header is missing. You can use the ignoreInvalidExchanges option to ignore these exceptions, in which case the Resequencer skips the invalid Exchange.

Wiki Markup
{snippet:id=e1|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceBatchIgnoreInvalidExchangesTest.java}
This option is available for both the batch and the stream resequencer.
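The difference between failing and skipping can be illustrated with a small plain-Java sketch. It makes several assumptions: InvalidExchangeSketch and accept() are hypothetical names, each message is modelled as a map of headers, and IllegalStateException stands in for CamelExchangeException so that the example needs no Camel dependency.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical illustration; not Camel's implementation.
class InvalidExchangeSketch {

    // Return the messages whose sequence header can be read.
    // Each message is modelled as a map of header names to values.
    static List<Map<String, Object>> accept(List<Map<String, Object>> incoming,
                                            String header,
                                            boolean ignoreInvalid) {
        List<Map<String, Object>> accepted = new ArrayList<>();
        for (Map<String, Object> headers : incoming) {
            if (headers.get(header) == null) {
                if (ignoreInvalid) {
                    continue; // skip the invalid message and carry on
                }
                // Stand-in for the CamelExchangeException described above.
                throw new IllegalStateException("Missing header: " + header);
            }
            accepted.add(headers);
        }
        return accepted;
    }
}
```

With ignoreInvalid set to true, a message without the header is silently dropped from resequencing. With false, the first such message aborts processing with an exception.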

...

This option can be used to prevent out-of-order messages from being sent, regardless of the event that delivered messages downstream (capacity, timeout, etc.). If enabled using rejectOld(), the Resequencer throws a MessageRejectedException when an incoming Exchange is "older" (based on the Comparator) than the last delivered message. This provides an extra level of control with regard to delayed message ordering.

Code Block

from("direct:start")
    .onException(MessageRejectedException.class).handled(true).to("mock:error").end()
    .resequence(header("seqno")).stream().timeout(1000).rejectOld()
    .to("mock:result");
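The rule behind rejectOld() can be sketched in plain Java. This is an illustration only, under these assumptions: RejectOldSketch, delivered() and insert() are hypothetical names, sequence numbers are plain long values compared numerically, and IllegalStateException stands in for MessageRejectedException.

```java
// Hypothetical illustration; not Camel's implementation.
class RejectOldSketch {
    private Long lastDelivered; // null until something has been delivered

    // Record that a message was sent downstream (by capacity, timeout, ...).
    void delivered(long seqNo) {
        lastDelivered = seqNo;
    }

    // Check an incoming message against the last delivered one.
    // IllegalStateException is a stand-in for MessageRejectedException.
    void insert(long seqNo) {
        if (lastDelivered != null && seqNo < lastDelivered) {
            throw new IllegalStateException(
                "Rejecting " + seqNo + ": older than last delivered " + lastDelivered);
        }
    }
}
```

For example, if message 3 has already been delivered because the timeout for message 2 expired, a late message 2 is rejected instead of being sent after 3.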

...

The next example shows how to use the stream-processing resequencer. Messages are reordered by the sequence number in their seqnum header, using gap detection and timeouts at the level of individual messages.

Using the Fluent Builders

Wiki Markup
{snippet:id=example|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java}
The stream-processing resequencer can be further configured via the capacity() and timeout() methods.

Code Block

from("direct:start")
    .resequence(header("seqnum")).stream().capacity(5000).timeout(4000L)
    .to("mock:result");

This sets the resequencer's capacity to 5000 and the timeout to 4000 ms (by default, the capacity is 1000 and the timeout is 1000 ms). Alternatively, you can provide a configuration object.

Code Block

from("direct:start")
    .resequence(header("seqnum")).stream(new StreamResequencerConfig(5000, 4000L))
    .to("mock:result");

The stream-processing resequencer algorithm is based on the detection of gaps in a message stream rather than on a fixed batch size. Gap detection in combination with timeouts removes the constraint of having to know the number of messages of a sequence (i.e. the batch size) in advance. Messages must contain a unique sequence number for which a predecessor and a successor are known. For example, a message with the sequence number 3 has a predecessor message with the sequence number 2 and a successor message with the sequence number 4. The message sequence 2, 3, 5 has a gap because the successor of 3 is missing. The resequencer therefore has to retain message 5 until message 4 arrives (or a timeout occurs).
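The 2, 3, 5 example can be traced with a plain-Java sketch of gap detection. It is a simplified illustration, not Camel's algorithm: GapDetectionSketch and its methods are hypothetical names, sequence numbers are long values whose successor is n + 1, and the timeout is modelled as an explicit timeout() call. The sketch also assumes that the very first message waits for a timeout, because no predecessor has been delivered yet.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical illustration; not Camel's implementation.
class GapDetectionSketch {
    private final TreeSet<Long> waiting = new TreeSet<>();
    private final List<Long> delivered = new ArrayList<>();
    private Long lastDelivered; // null until the first message is delivered

    // Insert a sequence number and deliver everything that is now gap-free.
    void insert(long seqNo) {
        waiting.add(seqNo);
        deliverContiguous();
    }

    // Stand-in for a timeout: stop waiting for the gap, deliver the oldest
    // waiting message, then continue with any direct successors.
    void timeout() {
        if (!waiting.isEmpty()) {
            deliver(waiting.pollFirst());
            deliverContiguous();
        }
    }

    List<Long> delivered() {
        return delivered;
    }

    // Deliver while the smallest waiting number directly follows the last delivered one.
    private void deliverContiguous() {
        while (!waiting.isEmpty()
                && lastDelivered != null
                && waiting.first() == lastDelivered + 1) {
            deliver(waiting.pollFirst());
        }
    }

    private void deliver(long seqNo) {
        delivered.add(seqNo);
        lastDelivered = seqNo;
    }
}
```

In this sketch, inserting 2, 3 and 5 delivers nothing at first. A timeout then releases 2 and its successor 3, while 5 stays behind because 4 is missing. Inserting 4 closes the gap and releases 4 and 5 together.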

...

By default, the stream resequencer expects sequence numbers of type long, but other sequence number types can be supported by providing a custom expression.

Wiki Markup
{snippet:id=example|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/MyFileNameExpression.java}
Wiki Markup
{snippet:id=example|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/ResequencerFileNameTest.java}
or a custom comparator via the comparator() method

Code Block

ExpressionResultComparator<Exchange> comparator = new MyComparator();
from("direct:start")
    .resequence(header("seqnum")).stream().comparator(comparator)
    .to("mock:result");

or via a StreamResequencerConfig object.

Code Block

ExpressionResultComparator<Exchange> comparator = new MyComparator();
StreamResequencerConfig config = new StreamResequencerConfig(100, 1000L, comparator);

from("direct:start")
    .resequence(header("seqnum")).stream(config)
    .to("mock:result");

Using the Spring XML Extensions

Code Block

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start"/>
    <resequence>
      <simple>in.header.seqnum</simple>
      <to uri="mock:result" />
      <stream-config capacity="5000" timeout="4000"/>
    </resequence>
  </route>
</camelContext>

...

For further examples of this pattern in use, see the batch-processing resequencer JUnit test case and the stream-processing resequencer JUnit test case.
