...

Code Block
from("direct:start").resequencer(body()).batch().to("mock:result");

The batch-processing resequencer can be further configured via the size() and timeout() methods.

Code Block
from("direct:start").resequencer(body()).batch().size(300).timeout(4000L).to("mock:result");

This sets the batch size to 300 and the batch timeout to 4000 ms (by default, the batch size is 100 and the timeout is 1000 ms). Alternatively, you can provide a configuration object.

Code Block

from("direct:start").resequencer(body()).batch(new BatchResequencerConfig(300, 4000L)).to("mock:result");

So the above example will reorder messages from the endpoint direct:start by their bodies and send them to the endpoint mock:result. Typically you would order on a header, or perhaps on a part of the body, rather than on the whole body. So you could replace this expression with

...

Wiki Markup
{snippet:id=example|lang=java|url=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java}

The stream-processing resequencer can be further configured via the capacity() and timeout() methods.

Code Block
from("direct:start").resequencer(header("seqnum")).stream().capacity(5000).timeout(4000L).to("mock:result");

This sets the resequencer's capacity to 5000 and the timeout to 4000 ms (by default, the capacity is 100 and the timeout is 1000 ms). Alternatively, you can provide a configuration object.

Code Block

from("direct:start").resequencer(header("seqnum")).stream(new StreamResequencerConfig(5000, 4000L)).to("mock:result");

The stream-processing resequencer algorithm is based on the detection of gaps in a message stream rather than on a fixed batch size. Gap detection in combination with timeouts removes the constraint of having to know the number of messages of a sequence (i.e. the batch size) in advance. Messages must contain a unique sequence number for which a predecessor and a successor are known. For example, a message with the sequence number 3 has a predecessor message with the sequence number 2 and a successor message with the sequence number 4. The message sequence 2, 3, 5 has a gap because the successor of 3 is missing. The resequencer therefore has to retain message 5 until message 4 arrives (or a timeout occurs).
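
The gap-detection idea can be illustrated with a small, self-contained sketch in plain Java. This is not Camel's implementation: the class GapResequencerSketch and its methods are hypothetical names chosen for illustration, the timeout is triggered by hand rather than by a timer, and late messages are simply dropped, which is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative gap-detecting resequencer (hypothetical; not Camel's implementation). */
class GapResequencerSketch {
    // Buffered messages, ordered by sequence number.
    private final TreeMap<Long, String> pending = new TreeMap<>();
    private final List<String> delivered = new ArrayList<>();
    // Sequence number of the most recently delivered message.
    private long lastDelivered;

    GapResequencerSketch(long firstExpected) {
        this.lastDelivered = firstExpected - 1;
    }

    /** Buffers a message, then delivers every buffered message that has no gap before it. */
    void insert(long seqnum, String body) {
        if (seqnum <= lastDelivered) {
            return; // assumption of this sketch: messages arriving too late are dropped
        }
        pending.put(seqnum, body);
        drain();
    }

    /** Simulates a timeout: stop waiting for the missing message and deliver the oldest buffered one. */
    void timeout() {
        Map.Entry<Long, String> first = pending.pollFirstEntry();
        if (first != null) {
            delivered.add(first.getValue());
            lastDelivered = first.getKey();
            drain();
        }
    }

    /** Delivers buffered messages while the next one is the direct successor of the last delivered. */
    private void drain() {
        while (!pending.isEmpty() && pending.firstKey() == lastDelivered + 1) {
            Map.Entry<Long, String> next = pending.pollFirstEntry();
            delivered.add(next.getValue());
            lastDelivered = next.getKey();
        }
    }

    List<String> delivered() {
        return delivered;
    }

    public static void main(String[] args) {
        GapResequencerSketch r = new GapResequencerSketch(2);
        r.insert(2, "m2");
        r.insert(3, "m3");
        r.insert(5, "m5"); // retained: message 4 is missing
        r.insert(4, "m4"); // closes the gap, so m4 and m5 are delivered
        System.out.println(r.delivered()); // prints [m2, m3, m4, m5]
    }
}
```

In this sketch, message 5 stays in the buffer until message 4 arrives or timeout() is called, mirroring the 2, 3, 5 example above.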

...

By default, the stream resequencer expects long sequence numbers, but other sequence number types can be supported as well by providing a custom comparator via the comparator() method

Code Block

ExpressionResultComparator<Exchange> comparator = new MyComparator();
from("direct:start").resequencer(header("seqnum")).stream().comparator(comparator).to("mock:result");

or via a StreamResequencerConfig object.

Code Block
ExpressionResultComparator<Exchange> comparator = new MyComparator();
StreamResequencerConfig config = new StreamResequencerConfig(100, 1000L, comparator);
from("direct:start").resequencer(header("seqnum")).stream(config).to("mock:result");
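
To give a rough idea of what a comparator for non-long sequence numbers has to decide, here is a minimal sketch in plain Java. It assumes that gap detection needs two things from a comparator: an ordering, and a way to recognize immediate neighbours. The interface SequenceComparatorSketch and the class StringSeqnumComparator are hypothetical stand-ins defined here so the example is self-contained; they are not Camel types, and a real MyComparator would implement Camel's ExpressionResultComparator instead.

```java
import java.util.Comparator;

/** Hypothetical stand-in for a sequence comparator contract; not a Camel type. */
interface SequenceComparatorSketch<E> extends Comparator<E> {
    /** Returns true if a is the immediate predecessor of b. */
    boolean predecessor(E a, E b);

    /** Returns true if a is the immediate successor of b. */
    boolean successor(E a, E b);
}

/** Compares sequence numbers that arrive as decimal strings, e.g. "9" and "10". */
class StringSeqnumComparator implements SequenceComparatorSketch<String> {
    @Override
    public int compare(String a, String b) {
        // Compare numerically, so that "9" sorts before "10".
        return Long.compare(Long.parseLong(a), Long.parseLong(b));
    }

    @Override
    public boolean predecessor(String a, String b) {
        return Long.parseLong(a) + 1 == Long.parseLong(b);
    }

    @Override
    public boolean successor(String a, String b) {
        return Long.parseLong(a) - 1 == Long.parseLong(b);
    }
}
```

With this sketch, predecessor("3", "4") is true while predecessor("3", "5") is false, which is the information a resequencer needs to detect the gap between 3 and 5.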

...