Enterprise Integration Patterns

Camel supports most of the Enterprise Integration Patterns from the excellent book of the same name by Gregor Hohpe and Bobby Woolf. It's a highly recommended book, particularly for users of Camel.

The following is a list of the Enterprise Integration Patterns from the book, along with examples of the various patterns using Apache Camel.

Messaging Systems

Message Channel

How does one application communicate with another using messaging?

Message

How can two applications connected by a message channel exchange a piece of information?

Pipes and Filters

How can we perform complex processing on a message while maintaining independence and flexibility?

Message Router

How can you decouple individual processing steps so that messages can be passed to different filters depending on a set of conditions?

Message Translator

How can systems using different data formats communicate with each other using messaging?

Message Endpoint

How does an application connect to a messaging channel to send and receive messages?

Messaging Channels

Point to Point Channel

How can the caller be sure that exactly one receiver will receive the document or perform the call?

Publish Subscribe Channel

How can the sender broadcast an event to all interested receivers?

Dead Letter Channel

What will the messaging system do with a message it cannot deliver?

Guaranteed Delivery

How can the sender make sure that a message will be delivered, even if the messaging system fails?

Message Bus

What is an architecture that enables separate applications to work together, but in a de-coupled fashion such that applications can be easily added or removed without affecting the others?

Message Construction

Event Message

How can messaging be used to transmit events from one application to another?

Request Reply

When an application sends a message, how can it get a response from the receiver?

Correlation Identifier

How does a requestor that has received a reply know which request this is the reply for?

Return Address

How does a replier know where to send the reply?

Message Routing

Content Based Router

How do we handle a situation where the implementation of a single logical function (e.g., inventory check) is spread across multiple physical systems?

Message Filter

How can a component avoid receiving uninteresting messages?

Dynamic Router

How can you avoid the dependency of the router on all possible destinations while maintaining its efficiency?

Recipient List

How do we route a message to a list of (static or dynamically) specified recipients?

Splitter

How can we process a message if it contains multiple elements, each of which may have to be processed in a different way?

Aggregator

How do we combine the results of individual, but related messages so that they can be processed as a whole?

Resequencer

How can we get a stream of related but out-of-sequence messages back into the correct order?

Composed Message Processor

How can you maintain the overall message flow when processing a message consisting of multiple elements, each of which may require different processing?

Scatter-Gather

How do you maintain the overall message flow when a message needs to be sent to multiple recipients, each of which may send a reply?

Routing Slip

How do we route a message consecutively through a series of processing steps when the sequence of steps is not known at design-time and may vary for each message?

Throttler

How can I throttle messages to ensure that a specific endpoint does not get overloaded, or we don't exceed an agreed SLA with some external service?

Sampling

How can I sample one message out of many in a given period so that a downstream route does not get overloaded?

Delayer

How can I delay the sending of a message?

Load Balancer

How can I balance load across a number of endpoints?

 

Hystrix

To use Hystrix Circuit Breaker when calling an external service.

 

Service Call

To call a remote service in a distributed system where the service is looked up from a service registry of some sort.

Multicast

How can I route a message to a number of endpoints at the same time?

Loop

How can I repeat processing a message in a loop?

Message Transformation

Content Enricher

How do we communicate with another system if the message originator does not have all the required data items available?

Content Filter

How do you simplify dealing with a large message, when you are interested only in a few data items?

Claim Check

How can we reduce the data volume of message sent across the system without sacrificing information content?

Normalizer

How do you process messages that are semantically equivalent, but arrive in a different format?

Sort

How can I sort the body of a message?

 

Script

How do I execute a script which may not change the message?

Validate

How can I validate a message?

Messaging Endpoints

Messaging Mapper

How do you move data between domain objects and the messaging infrastructure while keeping the two independent of each other?

Event Driven Consumer

How can an application automatically consume messages as they become available?

Polling Consumer

How can an application consume a message when the application is ready?

Competing Consumers

How can a messaging client process multiple messages concurrently?

Message Dispatcher

How can multiple consumers on a single channel coordinate their message processing?

Selective Consumer

How can a message consumer select which messages it wishes to receive?

Durable Subscriber

How can a subscriber avoid missing messages while it's not listening for them?

Idempotent Consumer

How can a message receiver deal with duplicate messages?

Transactional Client

How can a client control its transactions with the messaging system?

Messaging Gateway

How do you encapsulate access to the messaging system from the rest of the application?

Service Activator

How can an application design a service to be invoked both via various messaging technologies and via non-messaging techniques?

System Management

ControlBus

How can we effectively administer a messaging system that is distributed across multiple platforms and a wide geographic area?

Detour

How can you route a message through intermediate steps to perform validation, testing or debugging functions?

Wire Tap

How do you inspect messages that travel on a point-to-point channel?

Message History

How can we effectively analyze and debug the flow of messages in a loosely coupled system?

Log

How can I log processing a message?

Messaging Systems

Message

Camel supports the Message from the EIP patterns using the Message interface.

To support message exchange patterns such as one-way Event Message and Request Reply, Camel uses an Exchange interface with a pattern property. Set it to InOnly for an Event Message, which has a single inbound Message, or to InOut for a Request Reply, which has both an inbound and an outbound Message.

Here is a basic example of sending a Message to a route in InOnly and InOut modes:

Requestor Code

//InOnly
getContext().createProducerTemplate().sendBody("direct:startInOnly", "Hello World");

//InOut
String result = (String) getContext().createProducerTemplate().requestBody("direct:startInOut", "Hello World");

Route Using the Fluent Builders

from("direct:startInOnly").inOnly("bean:process");

from("direct:startInOut").inOut("bean:process");

Route Using the Spring XML Extensions

<route>
  <from uri="direct:startInOnly"/>
  <inOnly uri="bean:process"/>
</route>

<route>
  <from uri="direct:startInOut"/>
  <inOut uri="bean:process"/>
</route>

Using This Pattern

If you would like to use this EIP pattern, please read Getting Started. You may also find the Architecture useful, particularly the description of Endpoint and URIs. You could then try out some of the Examples before trying this pattern.

Pipes and Filters

Camel supports the Pipes and Filters from the EIP patterns in various ways.

With Camel you can split your processing across multiple independent Endpoint instances which can then be chained together.

Using Routing Logic

You can create pipelines of logic using multiple Endpoint or Message Translator instances as follows:

{snippet:id=example|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/PipelineTest.java}

Pipeline is the default mode of operation when you specify multiple outputs in Camel. The opposite of pipeline is multicast, which fires the same message into each of its outputs (see the example below).

In Spring XML you can use the <pipeline/> element:

<route>
  <from uri="activemq:SomeQueue"/>
  <pipeline>
    <bean ref="foo"/>
    <bean ref="bar"/>
    <to uri="activemq:OutputQueue"/>
  </pipeline>
</route>

In the above, the <pipeline/> element is actually unnecessary; it only makes the pipeline explicit. You could instead use this:

<route>
  <from uri="activemq:SomeQueue"/>
  <bean ref="foo"/>
  <bean ref="bar"/>
  <to uri="activemq:OutputQueue"/>
</route>

However if you wish to use <multicast/> to avoid a pipeline - to send the same message into multiple pipelines - then the <pipeline/> element comes into its own:

<route>
  <from uri="activemq:SomeQueue"/>
  <multicast>
    <pipeline>
      <bean ref="something"/>
      <to uri="log:Something"/>
    </pipeline>
    <pipeline>
      <bean ref="foo"/>
      <bean ref="bar"/>
      <to uri="activemq:OutputQueue"/>
    </pipeline>
  </multicast>
</route>

In the above example we are routing from a single Endpoint to a list of different endpoints specified using URIs. If you find the above a bit confusing, try reading about the Architecture or try the Examples.
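The difference between pipeline and multicast semantics described in this section can be pictured with a small plain-Java sketch. It does not use Camel at all: the class and method names are hypothetical, and each step is modelled as a simple function on the message body.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

// Plain-Java illustration only; these names are hypothetical and are not Camel API.
class PipelineVsMulticastSketch {

    // Pipeline: the output of each step becomes the input of the next step.
    static String pipeline(String message, List<UnaryOperator<String>> steps) {
        String current = message;
        for (UnaryOperator<String> step : steps) {
            current = step.apply(current);
        }
        return current;
    }

    // Multicast: every branch receives the same original message.
    static List<String> multicast(String message, List<UnaryOperator<String>> branches) {
        List<String> results = new ArrayList<>();
        for (UnaryOperator<String> branch : branches) {
            results.add(branch.apply(message));
        }
        return results;
    }

    public static void main(String[] args) {
        UnaryOperator<String> foo = body -> body + "-foo";
        UnaryOperator<String> bar = body -> body + "-bar";

        System.out.println(pipeline("msg", Arrays.asList(foo, bar)));  // prints msg-foo-bar
        System.out.println(multicast("msg", Arrays.asList(foo, bar))); // prints [msg-foo, msg-bar]
    }
}
```

In the pipeline the second step sees the result of the first, whereas in the multicast both branches start from the unmodified message.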

Using This Pattern

Message Translator

Camel supports the Message Translator from the EIP patterns by using an arbitrary Processor in the routing logic, by using a bean to perform the transformation, or by using transform() in the DSL. You can also use a Data Format to marshal and unmarshal messages in different encodings.

Using the Fluent Builders

You can transform a message using Camel's Bean Integration to call any method on a bean in your Registry, such as one defined in your Spring XML configuration file, as follows:

from("activemq:SomeQueue")
  .beanRef("myTransformerBean", "myMethodName")
  .to("mqseries:AnotherQueue");

Here "myTransformerBean" would be defined in a Spring XML file, in JNDI, etc. You can omit the method name parameter from beanRef(), and the Bean Integration will try to deduce the method to invoke from the message exchange.

or you can add your own explicit Processor to do the transformation

{snippet:id=example|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/TransformTest.java}

or you can use the DSL to explicitly configure the transformation

{snippet:id=example|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/TransformProcessorTest.java}

Use Spring XML

You can also use Spring XML Extensions to do a transformation. Basically any Expression language can be substituted inside the transform element as shown below

{snippet:id=example|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/transformWithExpressionContext.xml}

Or you can use the Bean Integration to invoke a bean

<route>
  <from uri="activemq:Input"/>
  <bean ref="myBeanName" method="doTransform"/>
  <to uri="activemq:Output"/>
</route>

You can also use Templating to consume a message from one destination, transform it with something like Velocity or XQuery and then send it on to another destination. For example using InOnly (one way messaging)

from("activemq:My.Queue")
  .to("velocity:com/acme/MyResponse.vm")
  .to("activemq:Another.Queue");

If you want to use InOut (request-reply) semantics to process requests on the My.Queue queue on ActiveMQ with a template-generated response, and then send the responses back to the JMSReplyTo Destination, you could use this:

from("activemq:My.Queue")
  .to("velocity:com/acme/MyResponse.vm");

Using This Pattern

Message Endpoint

Camel supports the Message Endpoint from the EIP patterns using the Endpoint interface.

When using the DSL to create Routes you typically refer to Message Endpoints by their URIs rather than directly using the Endpoint interface. It is then the responsibility of the CamelContext to create and activate the necessary Endpoint instances using the available Component implementations.

Example

The following example route demonstrates the use of a File Consumer Endpoint and a JMS Producer Endpoint:


Using the Fluent Builders

from("file://local/router/messages/foo")
	.to("jms:queue:foo");

 

Using the Spring XML Extensions

<route>
	<from uri="file://local/router/messages/foo"/>
	<to uri="jms:queue:foo"/>
</route>

 

Dynamic To

Available as of Camel 2.16

There is a new <toD> that sends a message to a dynamically computed Endpoint using one or more Expressions that are concatenated together. By default the Simple language is used to compute the endpoint. For example, to send a message to an endpoint defined by a header you can do:

<route>
  <from uri="direct:start"/>
  <toD uri="${header.foo}"/>
</route>

And in Java DSL

from("direct:start")
  .toD("${header.foo}");

 

You can also prefix the uri with a value because by default the uri is evaluated using the Simple language

<route>
  <from uri="direct:start"/>
  <toD uri="mock:${header.foo}"/>
</route>

And in Java DSL

from("direct:start")
  .toD("mock:${header.foo}");

In the example above we compute an endpoint that has prefix "mock:" and then the header foo is appended. So for example if the header foo has value order, then the endpoint is computed as "mock:order".
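To make the "mock:order" computation concrete, here is a rough plain-Java sketch of the substitution. It only mimics the visible effect for `${header.name}` placeholders and is not how Camel's Simple language is implemented; the class name, the `resolve` method, and the choice to throw on a missing header are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Plain-Java illustration only; not Camel API and not Camel's Simple language.
class DynamicUriSketch {

    // Matches placeholders of the form ${header.name}.
    private static final Pattern HEADER = Pattern.compile("\\$\\{header\\.(\\w+)\\}");

    // Replaces each ${header.name} placeholder with the value of that header.
    static String resolve(String template, Map<String, String> headers) {
        Matcher matcher = HEADER.matcher(template);
        StringBuffer uri = new StringBuffer();
        while (matcher.find()) {
            String value = headers.get(matcher.group(1));
            if (value == null) {
                // Assumption of this sketch: a missing header is treated as an error.
                throw new IllegalArgumentException("Missing header: " + matcher.group(1));
            }
            matcher.appendReplacement(uri, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(uri);
        return uri.toString();
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("foo", "order");
        System.out.println(resolve("mock:${header.foo}", headers)); // prints mock:order
    }
}
```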

You can also use languages other than Simple, such as XPath. This requires prefixing the expression with language: as shown below (Simple is the default language). If you do not specify language: then the name is treated as a component name. This matters because in some cases a component and a language share the same name, such as xquery.

<route>
  <from uri="direct:start"/>
  <toD uri="language:xpath:/order/@uri"/>
</route>

This is done by specifying the name of the language followed by a colon.

from("direct:start")
  .toD("language:xpath:/order/@uri");

You can also concatenate multiple languages together using the plus sign +, as shown below:

<route>
  <from uri="direct:start"/>
  <toD uri="jms:${header.base}+language:xpath:/order/@id"/>
</route>

In the example above the uri is a combination of the Simple language and XPath, where the first part is Simple (the default language). The plus sign then separates it from the part in another language, which is specified by the language name followed by a colon.

from("direct:start")
  .toD("jms:${header.base}+language:xpath:/order/@id");

You can concatenate as many languages as you want; just separate them with the plus sign.

The Dynamic To has a few options you can configure:

Name | Default Value | Description
uri | | Mandatory: The uri to use. See above.
pattern | | Sets a specific Exchange Pattern to use when sending to the endpoint. The original MEP is restored afterwards.
cacheSize | | Configures the cache size for the ProducerCache, which caches producers for reuse. By default the default cache size of 1000 is used. Setting the value to -1 turns off the cache altogether.
ignoreInvalidEndpoint | false | Whether to ignore an endpoint URI that could not be resolved. If disabled, Camel will throw an exception identifying the invalid endpoint URI.

 

For more details see

Using This Pattern

If you would like to use this EIP pattern, please read Getting Started. You may also find the Architecture useful, particularly the description of Endpoint and URIs. You could then try out some of the Examples before trying this pattern.

Messaging Channels

Guaranteed Delivery

Camel supports the Guaranteed Delivery from the EIP patterns using among others the following components:

  • File for using file systems as a persistent store of messages
  • JMS when using persistent delivery (the default) for working with JMS Queues and Topics for high performance, clustering and load balancing
  • JPA for using a database as a persistence layer, or any of the many other database components such as SQL, JDBC, iBATIS/MyBatis, Hibernate
  • HawtDB for a lightweight key-value persistent store

Example

The following example illustrates the use of Guaranteed Delivery within the JMS component. By default, a message is not considered successfully delivered until the recipient has persisted the message locally, guaranteeing its receipt in the event the destination becomes unavailable.

Using the Fluent Builders

from("direct:start")
	.to("jms:queue:foo");

 

Using the Spring XML Extensions

<route>
	<from uri="direct:start"/>
	<to uri="jms:queue:foo"/>
</route>

Using This Pattern

If you would like to use this EIP pattern, please read Getting Started. You may also find the Architecture useful, particularly the description of Endpoint and URIs. You could then try out some of the Examples before trying this pattern.

Message Routing

Content Based Router

The Content Based Router from the EIP patterns allows you to route messages to the correct destination based on the contents of the message exchanges.

The following example shows how to route a request from an input direct:a endpoint to either direct:b, direct:c or direct:d depending on the evaluation of various Predicate expressions.

Using the Fluent Builders

 


RouteBuilder builder = new RouteBuilder() {
    public void configure() {
        errorHandler(deadLetterChannel("mock:error"));
 
        from("direct:a")
            .choice()
                .when(header("foo").isEqualTo("bar"))
                    .to("direct:b")
                .when(header("foo").isEqualTo("cheese"))
                    .to("direct:c")
                .otherwise()
                    .to("direct:d");
    }
};

See "Why can I not use when or otherwise in a Java Camel route" if you have problems with the Java DSL accepting when or otherwise.

Using the Spring XML Extensions

 

<camelContext errorHandlerRef="errorHandler" xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="direct:a"/>
        <choice>
            <when>
                <xpath>$foo = 'bar'</xpath>
                <to uri="direct:b"/>
            </when>
            <when>
                <xpath>$foo = 'cheese'</xpath>
                <to uri="direct:c"/>
            </when>
            <otherwise>
                <to uri="direct:d"/>
            </otherwise>
        </choice>
    </route>
</camelContext>

For further examples of this pattern in use you could look at the junit test case.

Using This Pattern

If you would like to use this EIP pattern, please read Getting Started. You may also find the Architecture useful, particularly the description of Endpoint and URIs. You could then try out some of the Examples before trying this pattern.

Message Filter

The Message Filter from the EIP patterns allows you to filter messages

The following example shows how to create a Message Filter route consuming messages from an endpoint called queue:a. Messages for which the Predicate is true are dispatched to queue:b.

Using the Fluent Builders

{snippet:id=e2|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java}

You can, of course, use many different Predicate languages such as XPath, XQuery, SQL or various Scripting Languages. Here is an XPath example:

{snippet:id=example|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/XPathFilterTest.java}

Here's another example of using a bean to define the filter behavior:

from("direct:start")
  .filter().method(MyBean.class, "isGoldCustomer").to("mock:result").end()
  .to("mock:end");

public static class MyBean {
    public boolean isGoldCustomer(@Header("level") String level) {
        // Constant-first comparison avoids a NullPointerException when the header is absent
        return "gold".equals(level);
    }
}

Using the Spring XML Extensions

{snippet:id=example|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/xml/buildSimpleRouteWithHeaderPredicate.xml}

You can also use a method call expression (to call a method on a bean) in the Message Filter, as shown below:

<bean id="myBean" class="com.foo.MyBean"/>

<camelContext xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:a"/>
    <filter>
      <method ref="myBean" method="isGoldCustomer"/>
      <to uri="direct:b"/>
    </filter>
  </route>
</camelContext>

Filtered Endpoint Required Inside </filter> Tag

Ensure you put the endpoint you want to filter (for example <to uri="direct:b"/>) before the closing </filter> tag, or the filter will not be applied. From Camel 2.8 onwards, omitting it results in an error.

For further examples of this pattern in use you could look at the junit test case.

Using stop()

Stop is a bit different from a message filter: it filters out all messages and ends the route entirely, whereas filter only applies to its child processor. Stop is convenient to use in a Content Based Router when, for example, you need to stop further processing in one of the predicates.

In the example below we do not want to route any further those messages that have the word Bye in the message body. Notice how we prevent this in the when() predicate by using .stop().

{snippet:id=e1|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RouteStopTest.java}

How To Determine If An Exchange Was Filtered

Available as of Camel 2.5

The Message Filter EIP will add a property on the Exchange that states if it was filtered or not.

The property has the key Exchange.FILTER_MATCHED, which has the String value CamelFilterMatched. Its value is a boolean. If the value is true then the Exchange was routed in the filter block. This property is visible within the Message Filter block whose Predicate matches (value set to true), and to the steps immediately following the Message Filter, with the value set based on the result of the last Message Filter Predicate evaluated.

Using This Pattern

Resequencer

The Resequencer from the EIP patterns allows you to reorganise messages based on some comparator. By default in Camel we use an Expression to create the comparator, so that you can compare by a message header, the body, a piece of a message, etc.

Change in Camel 2.7

The <batch-config> and <stream-config> tags in the XML DSL of the Resequencer EIP must now be configured at the top, not at the bottom. If you use them, move them up to just below where the <resequence> EIP starts in the XML. If you are using a Camel version older than 2.7, those configs should be at the bottom.

Camel supports two resequencing algorithms:

  • Batch resequencing collects messages into a batch, sorts the messages and sends them to their output.
  • Stream resequencing re-orders (continuous) message streams based on the detection of gaps between messages.

By default the Resequencer does not support duplicate messages: if a message arrives with the same value for the message expression, only the last message is kept. However, in the batch mode you can enable it to allow duplicates.

Batch Resequencing

The following example shows how to use the batch-processing resequencer so that messages are sorted in order of the body() expression. That is, messages are collected into a batch (either by a maximum number of messages per batch or using a timeout), then sorted, and then sent to their output.

Using the Fluent Builders

{snippet:id=example|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java}

This is equivalent to

from("direct:start")
  .resequence(body()).batch()
  .to("mock:result");

The batch-processing resequencer can be further configured via the size() and timeout() methods.

from("direct:start")
  .resequence(body()).batch().size(300).timeout(4000L)
  .to("mock:result");

This sets the batch size to 300 and the batch timeout to 4000 ms (by default, the batch size is 100 and the timeout is 1000 ms). Alternatively, you can provide a configuration object.

from("direct:start")
  .resequence(body()).batch(new BatchResequencerConfig(300, 4000L))
  .to("mock:result");

So the above example will reorder messages from the endpoint direct:start in order of their bodies and send them to the endpoint mock:result.
Typically you'd use a header rather than the body to order things, or maybe a part of the body. So you could replace this expression with

resequence(header("mySeqNo"))

for example, to reorder messages using a custom sequence number in the header mySeqNo.

You can of course use many different Expression languages such as XPath, XQuery, SQL or various Scripting Languages.

Using the Spring XML Extensions

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start" />
    <resequence>
      <simple>body</simple>
      <to uri="mock:result" />
      <!-- batch-config can be omitted for default (batch) resequencer settings -->
      <batch-config batchSize="300" batchTimeout="4000" />
    </resequence>
  </route>
</camelContext>

Allow Duplicates

Available as of Camel 2.4

In the batch mode, you can now allow duplicates. To enable it, use the allowDuplicates() method in the Java DSL or the allowDuplicates=true attribute on <batch-config/> in Spring XML.

Reverse

Available as of Camel 2.4

In the batch mode, you can now reverse the expression ordering. By default the order is based on 0..9,A..Z, which lets messages with low numbers be ordered first and therefore also sent out first. In some cases you want to reverse the order, which is now possible.

To enable it, use the reverse() method in the Java DSL or the reverse=true attribute on <batch-config/> in Spring XML.
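The effect of the two batch options can be sketched in plain Java, following the behaviour described in this section (without duplicates only the last message per sort key is kept; reverse flips the ordering). This is an illustration under those stated assumptions, not Camel's implementation; the Message class and the resequence method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;

// Plain-Java illustration only; these names are hypothetical and are not Camel API.
class BatchResequenceSketch {

    // A message with a sort key (for example a sequence number header) and a body.
    static final class Message {
        final int key;
        final String body;

        Message(int key, String body) {
            this.key = key;
            this.body = body;
        }
    }

    // Sorts one collected batch and returns the bodies in delivery order.
    static List<String> resequence(List<Message> batch, boolean allowDuplicates, boolean reverse) {
        final Comparator<Integer> order = reverse
                ? Comparator.<Integer>reverseOrder()
                : Comparator.<Integer>naturalOrder();
        List<String> bodies = new ArrayList<>();
        if (allowDuplicates) {
            // Stable sort: messages with equal keys keep their arrival order.
            List<Message> sorted = new ArrayList<>(batch);
            sorted.sort((a, b) -> order.compare(a.key, b.key));
            for (Message m : sorted) {
                bodies.add(m.body);
            }
        } else {
            // A later message replaces an earlier one that has the same key.
            TreeMap<Integer, String> lastPerKey = new TreeMap<>(order);
            for (Message m : batch) {
                lastPerKey.put(m.key, m.body);
            }
            bodies.addAll(lastPerKey.values());
        }
        return bodies;
    }

    public static void main(String[] args) {
        List<Message> batch = Arrays.asList(
                new Message(3, "c"), new Message(1, "a"), new Message(3, "c2"), new Message(2, "b"));

        System.out.println(resequence(batch, false, false)); // prints [a, b, c2]
        System.out.println(resequence(batch, true, false));  // prints [a, b, c, c2]
        System.out.println(resequence(batch, true, true));   // prints [c, c2, b, a]
    }
}
```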

Resequence JMS messages based on JMSPriority

Available as of Camel 2.4

It's now much easier to use the Resequencer to resequence messages from JMS queues based on JMSPriority. For that to work you need to use the two new options allowDuplicates and reverse.

{snippet:id=e1|lang=java|url=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsBatchResequencerJMSPriorityTest.java}

Notice this is only possible in the batch mode of the Resequencer.

Ignore invalid exchanges

Available as of Camel 2.9

The Resequencer EIP will from Camel 2.9 onwards throw a CamelExchangeException if the incoming Exchange is not valid for the resequencer, i.e. the expression cannot be evaluated, for example because of a missing header. You can use the option ignoreInvalidExchanges to ignore these exceptions, which means the Resequencer will then skip the invalid Exchange.

{snippet:id=e1|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceBatchIgnoreInvalidExchangesTest.java}

This option is available for both the batch and the stream resequencer.

Reject Old Exchanges

Available as of Camel 2.11

This option can be used to prevent out of order messages from being sent regardless of the event that delivered messages downstream (capacity, timeout, etc). If enabled using rejectOld(), the Resequencer will throw a MessageRejectedException when an incoming Exchange is "older" (based on the Comparator) than the last delivered message. This provides an extra level of control with regards to delayed message ordering.

from("direct:start")
  .onException(MessageRejectedException.class).handled(true).to("mock:error").end()
  .resequence(header("seqno")).stream().timeout(1000).rejectOld()
  .to("mock:result");

This option is available for the stream resequencer only.

Stream Resequencing

The next example shows how to use the stream-processing resequencer. Messages are re-ordered based on their sequence numbers given by a seqnum header using gap detection and timeouts on the level of individual messages.

Using the Fluent Builders

{snippet:id=example|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java}

The stream-processing resequencer can be further configured via the capacity() and timeout() methods.

from("direct:start")
  .resequence(header("seqnum")).stream().capacity(5000).timeout(4000L)
  .to("mock:result");

This sets the resequencer's capacity to 5000 and the timeout to 4000 ms (by default, the capacity is 1000 and the timeout is 1000 ms). Alternatively, you can provide a configuration object.

from("direct:start")
  .resequence(header("seqnum")).stream(new StreamResequencerConfig(5000, 4000L))
  .to("mock:result");

The stream-processing resequencer algorithm is based on the detection of gaps in a message stream rather than on a fixed batch size. Gap detection in combination with timeouts removes the constraint of having to know the number of messages of a sequence (i.e. the batch size) in advance. Messages must contain a unique sequence number for which a predecessor and a successor is known. For example a message with the sequence number 3 has a predecessor message with the sequence number 2 and a successor message with the sequence number 4. The message sequence 2,3,5 has a gap because the successor of 3 is missing. The resequencer therefore has to retain message 5 until message 4 arrives (or a timeout occurs).
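The gap-detection idea can be sketched in plain Java as follows. This is a simplified illustration, not Camel's stream resequencer: the class and method names are hypothetical, the constructor argument (the first sequence number of the stream) is an assumption of the sketch, and timeout() is called by hand here instead of being driven by a timer per message.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.TreeMap;

// Plain-Java illustration only; these names are hypothetical and are not Camel API.
class GapDetectionSketch {

    // Messages that arrived ahead of a gap, ordered by sequence number.
    private final TreeMap<Long, String> pending = new TreeMap<>();
    private long nextExpected;

    // Assumption of this sketch: the caller knows the first sequence number.
    GapDetectionSketch(long firstSequenceNumber) {
        this.nextExpected = firstSequenceNumber;
    }

    // Accepts one message and returns the messages that can now be delivered in order.
    List<String> accept(long sequenceNumber, String body) {
        if (sequenceNumber < nextExpected) {
            // Arrived after its gap was given up on: it is passed through out of sequence.
            return Collections.singletonList(body);
        }
        pending.put(sequenceNumber, body);
        return drain();
    }

    // Stands in for a timeout: stop waiting for the missing predecessor of the oldest held message.
    List<String> timeout() {
        if (pending.isEmpty()) {
            return new ArrayList<>();
        }
        nextExpected = pending.firstKey();
        return drain();
    }

    // Delivers held messages for as long as there is no gap.
    private List<String> drain() {
        List<String> deliverable = new ArrayList<>();
        while (!pending.isEmpty() && pending.firstKey() == nextExpected) {
            deliverable.add(pending.pollFirstEntry().getValue());
            nextExpected++;
        }
        return deliverable;
    }

    public static void main(String[] args) {
        GapDetectionSketch resequencer = new GapDetectionSketch(2);
        System.out.println(resequencer.accept(2, "m2")); // prints [m2]
        System.out.println(resequencer.accept(3, "m3")); // prints [m3]
        System.out.println(resequencer.accept(5, "m5")); // prints [] (gap: 4 is missing)
        System.out.println(resequencer.accept(4, "m4")); // prints [m4, m5]
    }
}
```

The run in main follows the 2,3,5 example from the text: message 5 is held back until message 4 arrives, at which point both are released in order.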

If the maximum time difference between messages (with successor/predecessor relationship with respect to the sequence number) in a message stream is known, then the resequencer's timeout parameter should be set to this value. In this case it is guaranteed that all messages of a stream are delivered in correct order to the next processor. The lower the timeout value is compared to the out-of-sequence time difference the higher is the probability for out-of-sequence messages delivered by this resequencer. Large timeout values should be supported by sufficiently high capacity values. The capacity parameter is used to prevent the resequencer from running out of memory.

By default, the stream resequencer expects long sequence numbers, but other sequence number types can be supported as well by providing a custom expression.

{snippet:id=example|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/MyFileNameExpression.java}

{snippet:id=example|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/ResequencerFileNameTest.java}

or a custom comparator via the comparator() method

ExpressionResultComparator<Exchange> comparator = new MyComparator();
from("direct:start")
  .resequence(header("seqnum")).stream().comparator(comparator)
  .to("mock:result");

or via a StreamResequencerConfig object.

ExpressionResultComparator<Exchange> comparator = new MyComparator();
StreamResequencerConfig config = new StreamResequencerConfig(100, 1000L, comparator);
from("direct:start")
  .resequence(header("seqnum")).stream(config)
  .to("mock:result");

Using the Spring XML Extensions

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start"/>
    <resequence>
      <simple>in.header.seqnum</simple>
      <to uri="mock:result" />
      <stream-config capacity="5000" timeout="4000"/>
    </resequence>
  </route>
</camelContext>

Further Examples

For further examples of this pattern in use you could look at the batch-processing resequencer junit test case and the stream-processing resequencer junit test case.

Using This Pattern

Message Transformation

Content Filter

Camel supports the Content Filter from the EIP patterns using one of the following mechanisms in the routing logic to transform content from the inbound message.

A common way to filter messages is to use an Expression in the DSL like XQuery, SQL or one of the supported Scripting Languages.

Using the Fluent Builders

Here is a simple example using the DSL directly

In this example we add our own Processor

For further examples of this pattern in use you could look at one of the JUnit tests

Using Spring XML

<route>
  <from uri="activemq:Input"/>
  <bean ref="myBeanName" method="doTransform"/>
  <to uri="activemq:Output"/>
</route>

You can also use XPath to filter out part of the message you are interested in:

<route>
  <from uri="activemq:Input"/>
  <setBody><xpath resultType="org.w3c.dom.Document">//foo:bar</xpath></setBody>
  <to uri="activemq:Output"/>
</route> 

Using This Pattern

If you would like to use this EIP pattern, please read Getting Started first. You may also find the Architecture useful, particularly the description of Endpoint and URIs. Then try out some of the Examples before trying this pattern out.

Normalizer

Camel supports the Normalizer from the EIP patterns by using a Message Router in front of a number of Message Translator instances.
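The structure of a router in front of several translators can be sketched in plain Java. This is a minimal illustration, not Camel code: the class NormalizerSketch, the two input formats and the common person format are all assumptions made for the example. Each registered pair consists of a detector (the routing condition) and a translator that produces the common format.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical normalizer: routes each message to the translator for its format.
class NormalizerSketch {
    // Detector (router condition) -> translator into the common format.
    private final Map<Predicate<String>, Function<String, String>> translators = new LinkedHashMap<>();

    void register(Predicate<String> detector, Function<String, String> translator) {
        translators.put(detector, translator);
    }

    /** Translates the message with the first translator whose detector matches. */
    String normalize(String message) {
        for (Map.Entry<Predicate<String>, Function<String, String>> entry : translators.entrySet()) {
            if (entry.getKey().test(message)) {
                return entry.getValue().apply(message);
            }
        }
        throw new IllegalArgumentException("Unknown message format: " + message);
    }

    /** Builds a normalizer for two assumed XML formats, both mapped to <person name="..."/>. */
    static NormalizerSketch example() {
        NormalizerSketch normalizer = new NormalizerSketch();
        normalizer.register(m -> m.startsWith("<employee>"),
                m -> "<person name=\"" + between(m, "<name>", "</name>") + "\"/>");
        normalizer.register(m -> m.startsWith("<customer "),
                m -> "<person name=\"" + between(m, "name=\"", "\"") + "\"/>");
        return normalizer;
    }

    // Returns the text between the first occurrence of start and the next occurrence of end.
    private static String between(String s, String start, String end) {
        int from = s.indexOf(start) + start.length();
        return s.substring(from, s.indexOf(end, from));
    }
}
```

String matching stands in for real XML parsing here to keep the sketch short; in a route you would typically use XPath predicates and bean or XSLT translators instead.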

Example

This example shows a Message Normalizer that converts two types of XML messages into a common format. Messages in this common format are then filtered.

Using the Fluent Builders

{snippet:id=example|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/NormalizerTest.java}

In this case we're using a Java bean as the normalizer. The class looks like this

{snippet:id=example|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MyNormalizer.java}

Using the Spring XML Extensions

The same example in the Spring DSL

{snippet:id=example|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/normalizer.xml}

See Also

Using This Pattern

Messaging Endpoints

Messaging Mapper

Camel supports the Messaging Mapper from the EIP patterns by using either the Message Translator pattern or the Type Converter module.

Example

The following example demonstrates the use of a Bean component to map between two messaging systems

Using the Fluent Builders

from("activemq:foo")
	.beanRef("transformerBean", "transform")
	.to("jms:bar");

 

Using the Spring XML Extensions

<route>
	<from uri="activemq:foo"/>
	<bean ref="transformerBean" method="transform" />
	<to uri="jms:bar"/>
</route>

See also

Using This Pattern

If you would like to use this EIP pattern, please read Getting Started first. You may also find the Architecture useful, particularly the description of Endpoint and URIs. Then try out some of the Examples before trying this pattern out.

Competing Consumers

Camel supports the Competing Consumers from the EIP patterns using a few different components.

You can use the following components to implement competing consumers:

  • Seda for SEDA based concurrent processing using a thread pool
  • JMS for distributed SEDA based concurrent processing with queues which support reliable load balancing, failover and clustering.

Enabling Competing Consumers with JMS

To enable Competing Consumers you just need to set the concurrentConsumers property on the JMS endpoint.

For example

from("jms:MyQueue?concurrentConsumers=5").bean(SomeBean.class);

or in Spring DSL

<route>
  <from uri="jms:MyQueue?concurrentConsumers=5"/>
  <to uri="bean:someBean"/>
</route>

Alternatively, just run the same ActiveMQ or JMS route in multiple JVMs.
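What concurrentConsumers achieves can be sketched with the JDK alone. This is a minimal illustration, not the JMS component: the class CompetingConsumersSketch is hypothetical and an in-memory BlockingQueue stands in for the JMS queue. Several consumers poll the same queue, and because each poll removes the head atomically, every message is handled by exactly one of them.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for several consumers competing on one queue.
class CompetingConsumersSketch {
    /** Drains the queue with the given number of consumers; returns message -> consumer name(s). */
    static Map<String, String> consume(BlockingQueue<String> queue, int concurrentConsumers) {
        Map<String, String> handledBy = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(concurrentConsumers);
        for (int i = 0; i < concurrentConsumers; i++) {
            String name = "consumer-" + i;
            pool.execute(() -> {
                String message;
                // poll() removes the head atomically, so no two consumers get the same message.
                while ((message = queue.poll()) != null) {
                    // A message handled twice would show up as "consumer-x,consumer-y".
                    handledBy.merge(message, name, (a, b) -> a + "," + b);
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handledBy;
    }
}
```

Which consumer handles which message is not deterministic; only the "exactly one receiver per message" property is.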

Using This Pattern

If you would like to use this EIP pattern, please read Getting Started first. You may also find the Architecture useful, particularly the description of Endpoint and URIs. Then try out some of the Examples before trying this pattern out.

Message Dispatcher

Camel supports the Message Dispatcher from the EIP patterns using various approaches.

You can use a component like JMS with selectors to implement a Selective Consumer as the Message Dispatcher implementation. Or you can use an Endpoint as the Message Dispatcher itself and then use a Content Based Router as the Message Dispatcher.

 

Example

The following example demonstrates the Message Dispatcher pattern. It uses the Competing Consumers functionality of the JMS component to offload messages to a Content Based Router and to custom Processors registered in the Camel Registry, which run in threads separate from the originating consumer.

 

Using the Fluent Builders

from("jms:queue:foo?concurrentConsumers=5")
	.threads(5)
	.choice()
		.when(header("type").isEqualTo("A")) 
			.processRef("messageDispatchProcessorA")
		.when(header("type").isEqualTo("B"))
			.processRef("messageDispatchProcessorB")
		.when(header("type").isEqualTo("C"))
			.processRef("messageDispatchProcessorC")		
		.otherwise()
			.to("jms:queue:invalidMessageType");

 

Using the Spring XML Extensions

<route>
	<from uri="jms:queue:foo?concurrentConsumers=5"/>
	<threads poolSize="5">
		<choice>
			<when>
				<simple>${in.header.type} == 'A'</simple>
				<process ref="messageDispatchProcessorA"/>
			</when>
			<when>
				<simple>${in.header.type} == 'B'</simple>
				<process ref="messageDispatchProcessorB"/>
			</when>
			<when>
				<simple>${in.header.type} == 'C'</simple>
				<process ref="messageDispatchProcessorC"/>
			</when>
			<otherwise>
				<to uri="jms:queue:invalidMessageType"/>
			</otherwise>
		</choice>
	</threads>
</route>
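The dispatching step in the routes above can be sketched in plain Java. This is a minimal illustration and not Camel code: the class MessageDispatcherSketch is hypothetical, a String stands in for the message body, and a Consumer stands in for each registered Processor. Messages are handed to the performer registered for their type, and unknown types go to a fallback, like the invalidMessageType queue above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical dispatcher: picks a performer by message type, with a fallback.
class MessageDispatcherSketch {
    private final Map<String, Consumer<String>> performers = new HashMap<>();
    private final Consumer<String> invalidTypeHandler;

    MessageDispatcherSketch(Consumer<String> invalidTypeHandler) {
        this.invalidTypeHandler = invalidTypeHandler;
    }

    /** Registers the performer that handles messages of the given type. */
    void register(String type, Consumer<String> performer) {
        performers.put(type, performer);
    }

    /** Hands the body to the performer for its type, or to the fallback if none is registered. */
    void dispatch(String type, String body) {
        performers.getOrDefault(type, invalidTypeHandler).accept(body);
    }
}
```

A lookup table keeps the dispatcher open for new message types without touching the dispatch logic, which is the same benefit the choice() block gives in the route.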

See Also

 

Using This Pattern

If you would like to use this EIP pattern, please read Getting Started first. You may also find the Architecture useful, particularly the description of Endpoint and URIs. Then try out some of the Examples before trying this pattern out.

Selective Consumer

The Selective Consumer from the EIP patterns can be implemented in two ways

The first solution is to provide a Message Selector to the underlying URIs when creating your consumer. For example when using JMS you can specify a selector parameter so that the message broker will only deliver messages matching your criteria.

The other approach is to apply a Message Filter in front of your consumer. Your consumer is then invoked only if the filter matches the message, as shown in the following example.
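The filter-based approach can be sketched in plain Java. This is a minimal illustration, not Camel code: the class SelectiveConsumerSketch is hypothetical and a map of headers stands in for the message. The selector is checked first and the consumer only sees messages that match.

```java
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical selective consumer: a filter guards the actual consumer.
class SelectiveConsumerSketch {
    private final Predicate<Map<String, Object>> selector;
    private final Consumer<Map<String, Object>> consumer;

    SelectiveConsumerSketch(Predicate<Map<String, Object>> selector,
                            Consumer<Map<String, Object>> consumer) {
        this.selector = selector;
        this.consumer = consumer;
    }

    /** Returns true if the message matched the selector and was handed to the consumer. */
    boolean onMessage(Map<String, Object> headers) {
        if (!selector.test(headers)) {
            return false; // not selected: the consumer never sees this message
        }
        consumer.accept(headers);
        return true;
    }
}
```

Unlike a JMS selector, which the broker evaluates before delivery, a filter like this runs after the message has already been received.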

Using the Fluent Builders

Using the Spring XML Extensions

Using This Pattern

If you would like to use this EIP pattern, please read Getting Started first. You may also find the Architecture useful, particularly the description of Endpoint and URIs. Then try out some of the Examples before trying this pattern out.

Durable Subscriber

Camel supports the Durable Subscriber from the EIP patterns using the JMS component which supports publish & subscribe using Topics with support for non-durable and durable subscribers.

Another alternative is to combine the Message Dispatcher or Content Based Router with the File or JPA components for durable subscribers, and with something like Seda for non-durable subscribers.

Here is a simple example of creating durable subscribers to a JMS topic

Using the Fluent Builders

from("direct:start").to("activemq:topic:foo");

from("activemq:topic:foo?clientId=1&durableSubscriptionName=bar1").to("mock:result1");

from("activemq:topic:foo?clientId=2&durableSubscriptionName=bar2").to("mock:result2");

Using the Spring XML Extensions

<route>
    <from uri="direct:start"/>
    <to uri="activemq:topic:foo"/>
</route>

<route>
    <from uri="activemq:topic:foo?clientId=1&amp;durableSubscriptionName=bar1"/>
    <to uri="mock:result1"/>
</route>

<route>
    <from uri="activemq:topic:foo?clientId=2&amp;durableSubscriptionName=bar2"/>
    <to uri="mock:result2"/>
</route>

Here is another example of JMS durable subscribers, but this time using virtual topics (recommended by AMQ over durable subscriptions)

Using the Fluent Builders

from("direct:start").to("activemq:topic:VirtualTopic.foo");

from("activemq:queue:Consumer.1.VirtualTopic.foo").to("mock:result1");

from("activemq:queue:Consumer.2.VirtualTopic.foo").to("mock:result2");

Using the Spring XML Extensions

<route>
    <from uri="direct:start"/>
    <to uri="activemq:topic:VirtualTopic.foo"/>
</route>

<route>
    <from uri="activemq:queue:Consumer.1.VirtualTopic.foo"/>
    <to uri="mock:result1"/>
</route>

<route>
    <from uri="activemq:queue:Consumer.2.VirtualTopic.foo"/>
    <to uri="mock:result2"/>
</route>

See Also

Using This Pattern

If you would like to use this EIP pattern, please read Getting Started first. You may also find the Architecture useful, particularly the description of Endpoint and URIs. Then try out some of the Examples before trying this pattern out.

Idempotent Consumer

The Idempotent Consumer from the EIP patterns is used to filter out duplicate messages.

This pattern is implemented using the IdempotentConsumer class. It uses an Expression to calculate a unique message ID string for a given message exchange. This ID is then looked up in the IdempotentRepository to see if it has been seen before. If it has, the message is consumed without further processing; if it has not, the message is processed and the ID is added to the repository.

The Idempotent Consumer essentially acts like a Message Filter to filter out duplicates.

Camel will add the message id eagerly to the repository to detect duplication also for Exchanges currently in progress.
On completion Camel will remove the message id from the repository if the Exchange failed, otherwise it stays there.
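The eager add and the removal on failure can be sketched in plain Java. This is a minimal illustration under stated assumptions, not Camel's IdempotentConsumer: the class IdempotentConsumerSketch is hypothetical, the repository is an in-memory set, and a Consumer stands in for the rest of the route.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical idempotent consumer backed by an in-memory ID repository.
class IdempotentConsumerSketch {
    private final Set<String> repository = ConcurrentHashMap.newKeySet();
    private final Consumer<String> processor;

    IdempotentConsumerSketch(Consumer<String> processor) {
        this.processor = processor;
    }

    /** Returns true if the message was processed, false if it was skipped as a duplicate. */
    boolean handle(String messageId, String body) {
        // Eager: register the ID before processing so in-progress duplicates are detected too.
        if (!repository.add(messageId)) {
            return false;
        }
        try {
            processor.accept(body);
            return true;
        } catch (RuntimeException e) {
            // Failed: remove the ID again so a redelivery is not treated as a duplicate.
            repository.remove(messageId);
            throw e;
        }
    }
}
```

Like any in-memory repository, this set grows without bound and is not shared between JVMs, so it only illustrates the control flow.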

Camel provides the following Idempotent Consumer implementations:

Options

The Idempotent Consumer has the following options:

Option

Default

Description

eager

true

Controls whether Camel adds the message to the repository before or after the exchange has been processed. If enabled, the message is added before processing, so Camel can detect duplicate messages even while messages are still in progress. If disabled, Camel only detects duplicates once a message has been processed successfully.

messageIdRepositoryRef

null

A reference to an IdempotentRepository to look up in the registry. This option is mandatory when using XML DSL.

skipDuplicate

true

Camel 2.8: Sets whether to skip duplicate messages. If set to false, the message continues to be routed. However, the Exchange is marked as a duplicate by having the Exchange.DUPLICATE_MESSAGE exchange property set to Boolean.TRUE.

removeOnFailure

true

Camel 2.9: Sets whether to remove the id of an Exchange that failed.

completionEager

false

Camel 2.16: Sets whether to complete the idempotent consumer eagerly or when the exchange is done.

If this option is true (complete eagerly), the idempotent consumer triggers its completion when the exchange reaches the end of the idempotent consumer block. If the exchange continues to be routed after the block ends, whatever happens there does not affect the state.

If this option is false (the default), the idempotent consumer completes when the exchange is done being routed. If the exchange continues to be routed after the block ends, whatever happens there also affects the state. For example, if the exchange fails due to an exception, the state of the idempotent consumer will be a rollback.

Using the Fluent Builders

The following example will use the header myMessageId to filter out duplicates:

{snippet:id=idempotent|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java}

The above example uses an in-memory based MessageIdRepository, which can easily run out of memory and doesn't work in a clustered environment. So you might prefer to use the JPA based implementation, which uses a database to store the IDs of the messages that have been processed:

{snippet:id=idempotent|lang=java|url=camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaIdempotentConsumerTest.java}

In the above example we are using the header messageId to filter out duplicates and the collection myProcessorName to indicate the Message ID Repository to use. This name is important as you could process the same message with many different processors, so each may require its own logical Message ID Repository.

For further examples of this pattern in use you could look at the junit test case

Spring XML example

The following example will use the header myMessageId to filter out duplicates{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringIdempotentConsumerTest.xml}

How to handle duplicate messages in the route

Available as of Camel 2.8

You can now set the skipDuplicate option to false which instructs the idempotent consumer to route duplicate messages as well. However the duplicate message has been marked as duplicate by having a property on the Exchange set to true. We can leverage this fact by using a Content Based Router or Message Filter to detect this and handle duplicate messages.

In the following example we use the Message Filter to send the message to a duplicate endpoint and then stop routing that message.

{snippet:id=e1|lang=java|title=Filter duplicate messages|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java}

The same example in XML DSL would be:

{snippet:id=e1|lang=xml|title=Filter duplicate messages|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringIdempotentConsumerNoSkipDuplicateFilterTest.xml}

How to handle duplicate message in a clustered environment with a data grid

Available as of Camel 2.8

If you are running Camel in a clustered environment, an in-memory idempotent repository doesn't work (see above). You can either set up a central database or use the idempotent consumer implementation based on the Hazelcast data grid. Hazelcast finds the nodes over multicast (which is the default; alternatively configure Hazelcast for tcp-ip) and automatically creates a map based repository:

HazelcastIdempotentRepository idempotentRepo = new HazelcastIdempotentRepository("myrepo");

from("direct:in").idempotentConsumer(header("messageId"), idempotentRepo).to("mock:out");

You have to define how long the repository should hold each message ID (by default it is never deleted). To avoid running out of memory you should create an eviction strategy based on the Hazelcast configuration. For additional information see camel-hazelcast.

See this little tutorial on how to set up such an idempotent repository on two cluster nodes using Apache Karaf.

Available as of Camel 2.13.0

Another option for using Idempotent Consumer in a clustered environment is Infinispan. Infinispan is a data grid with replication and distribution clustering support. For additional information see camel-infinispan.

Using This Pattern

Transactional Client

Camel recommends supporting the Transactional Client from the EIP patterns using Spring transactions.

Transaction Oriented Endpoints like JMS support using a transaction for both inbound and outbound message exchanges. Endpoints that support transactions will participate in the current transaction context that they are called from.

Configuration of Redelivery

Redelivery in transacted mode is not handled by Camel but by the backing system (the transaction manager). In such cases you should consult the documentation of the backing system on how to configure redelivery.

You should use the SpringRouteBuilder to set up the routes, since you will need to set up the Spring context with the TransactionTemplates that define the transaction manager configuration and policies.

For inbound endpoints to be transacted, they normally need to be configured to use a Spring PlatformTransactionManager. In the case of the JMS component, this can be done by looking it up in the Spring context.

You first define the needed objects in the Spring configuration.

<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
  <property name="connectionFactory" ref="jmsConnectionFactory" />
</bean>

<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="tcp://localhost:61616"/>
</bean>

Then you look them up and use them to create the JmsComponent.

PlatformTransactionManager transactionManager = (PlatformTransactionManager) spring.getBean("jmsTransactionManager");
ConnectionFactory connectionFactory = (ConnectionFactory) spring.getBean("jmsConnectionFactory");
JmsComponent component = JmsComponent.jmsComponentTransacted(connectionFactory, transactionManager);
component.getConfiguration().setConcurrentConsumers(1);
ctx.addComponent("activemq", component);

Transaction Policies

Outbound endpoints will automatically enlist in the current transaction context. But what if you do not want your outbound endpoint to enlist in the same transaction as your inbound endpoint? The solution is to add a Transaction Policy to the processing route. You first have to define transaction policies that you will be using. The policies use a spring TransactionTemplate under the covers for declaring the transaction demarcation to use. So you will need to add something like the following to your spring xml:

<bean id="PROPAGATION_REQUIRED" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
  <property name="transactionManager" ref="jmsTransactionManager"/>
</bean>

<bean id="PROPAGATION_REQUIRES_NEW" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
  <property name="transactionManager" ref="jmsTransactionManager"/>
  <property name="propagationBehaviorName" value="PROPAGATION_REQUIRES_NEW"/>
</bean>

Then in your SpringRouteBuilder, you just need to create new SpringTransactionPolicy objects for each of the templates.

public void configure() {
    ...
    Policy required = bean(SpringTransactionPolicy.class, "PROPAGATION_REQUIRED");
    Policy requirenew = bean(SpringTransactionPolicy.class, "PROPAGATION_REQUIRES_NEW");
    ...
}

Once created, you can use the Policy objects in your processing routes:

// Send to bar in a new transaction
from("activemq:queue:foo").policy(requirenew).to("activemq:queue:bar");

// Send to bar without a transaction
// (assumes a notsupported Policy was looked up in the same way as requirenew)
from("activemq:queue:foo").policy(notsupported).to("activemq:queue:bar");

OSGi Blueprint

If you are using OSGi Blueprint then you most likely have to explicitly declare a policy and refer to that policy from the transacted element in the route.

<bean id="required" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
  <property name="transactionManager" ref="jmsTransactionManager"/>
  <property name="propagationBehaviorName" value="PROPAGATION_REQUIRED"/>
</bean>

And then refer to "required" from the route:

<route>
  <from uri="activemq:queue:foo"/>
  <transacted ref="required"/>
  <to uri="activemq:queue:bar"/>
</route>

Database Sample

In this sample we want to ensure that two endpoints are under transaction control. These two endpoints insert data into a database.
The full sample is available as a unit test.

First of all we set up the usual Spring configuration in its configuration file. Here we have defined a DataSource for HSQLDB and, most importantly, the Spring DataSourceTransactionManager, which does the heavy lifting of enforcing our transactional policies. You are of course free to use any of the Spring based TransactionManagers; e.g. if you are in a full blown J2EE container you could use JTA or the WebLogic or WebSphere specific managers.

As we use the new convention over configuration, we do not need to configure a transaction policy bean, so we do not have any PROPAGATION_REQUIRED beans. All the beans that need to be configured are standard Spring beans; there is no Camel specific configuration at all.

{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/interceptor/springTransactionalClientDataSourceMinimalConfiguration.xml}

Then we are ready to define our Camel routes. We have two routes: one for success conditions and one for a forced rollback condition. This is, after all, based on a unit test. Notice that we mark each route as transacted using the transacted tag.

{snippet:id=e2|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/interceptor/springTransactionalClientDataSourceMinimalConfiguration.xml}

That is all that is needed to configure a Camel route as being transacted. Just remember to use the transacted DSL. The rest is standard Spring XML to set up the transaction manager.

JMS Sample

In this sample we want to listen for messages on a queue, process the messages with our business logic Java code and send them along. Since it's based on a unit test, the destination is a mock endpoint.

First we configure the standard Spring XML to declare a JMS connection factory, a JMS transaction manager and our ActiveMQ component that we use in our routing.

{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/tx/TransactionMinimalConfigurationTest.xml}

And then we configure our routes. Notice that all we have to do is mark the route as transacted using the transacted tag.

{snippet:id=e2|lang=xml|url=camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/tx/TransactionMinimalConfigurationTest.xml}

Transaction error handler

When a route is marked as transacted using transacted, Camel will automatically use the TransactionErrorHandler as Error Handler. It supports basically the same feature set as the DefaultErrorHandler, so you can for instance use Exception Clause as well.

Integration Testing with Spring

An Integration Test here means a test runner class annotated @RunWith(SpringJUnit4ClassRunner.class).

When following the Spring Transactions documentation it is tempting to annotate your integration test with @Transactional then seed your database before firing up the route to be tested and sending a message in. This is incorrect as Spring will have an in-progress transaction, and Camel will wait on this before proceeding, leading to the route timing out.

Instead, remove the @Transactional annotation from the test method and seed the test data within a TransactionTemplate execution which will ensure the data is committed to the database before Camel attempts to pick up and use the transaction manager. A simple example can be found on GitHub.

Spring's transactional model ensures each transaction is bound to one thread. A Camel route may invoke additional threads, which is where the blockage may occur. This is not a fault of Camel, but as the programmer you must be aware of the consequences of beginning a transaction in a test thread and expecting a separate thread created by your Camel route to participate, which it cannot. You can, in your test, mock the parts that cause separate threads to avoid this issue.
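Why a second thread cannot join the test's transaction can be sketched with a ThreadLocal. This is only an illustration of thread-bound state, not Spring's API: the class ThreadBoundTransactionSketch and its methods are hypothetical, and a String stands in for the transaction context.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a transaction context that is bound to the current thread.
class ThreadBoundTransactionSketch {
    private static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

    static void begin(String name) {
        CURRENT_TX.set(name);
    }

    static String current() {
        return CURRENT_TX.get();
    }

    static void end() {
        CURRENT_TX.remove();
    }

    /** Returns the transaction visible from a newly started thread (null if there is none). */
    static String currentSeenFromNewThread() {
        AtomicReference<String> seen = new AtomicReference<>();
        Thread worker = new Thread(() -> seen.set(current()));
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.get();
    }
}
```

After begin("test-tx") on the test thread, current() returns the transaction there, while currentSeenFromNewThread() returns null: the worker thread has no transaction of its own and cannot see uncommitted work of the test thread.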

Using multiple routes with different propagation behaviors

Available as of Camel 2.2
Suppose you want to route a message through two routes, where the second route should run in its own transaction. How do you do that? You use propagation behaviors, configured as follows:

  • The first route uses PROPAGATION_REQUIRED
  • The second route uses PROPAGATION_REQUIRES_NEW

This is configured in the Spring XML file:

{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/interceptor/MixedTransactionPropagationTest.xml}

Then in the routes you use the transacted DSL to indicate which of these two propagations each route uses.

{snippet:id=e1|lang=java|url=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/MixedTransactionPropagationTest.java}

Notice how we have configured the onException in the second route to indicate that in case of any exception we should handle it and just roll back this transaction. This is done using markRollbackOnlyLast, which tells Camel to only do it for the current transaction and not globally.

See Also

Using This Pattern

Messaging Gateway

Camel has several endpoint components that support the Messaging Gateway from the EIP patterns.

Components like Bean and CXF provide a way to bind a Java interface to the message exchange.

However you may want to read the Using CamelProxy documentation as a true Messaging Gateway EIP solution.
Another approach is to use @Produce which you can read about in POJO Producing which also can be used as a Messaging Gateway EIP solution.

 

Example

The following example shows how the CXF and Bean components can be used to abstract the developer from the underlying messaging system API


Using the Fluent Builders

from("cxf:bean:soapMessageEndpoint")
	.to("bean:testBean?method=processSOAP");

 

Using the Spring XML Extensions

<route>
	<from uri="cxf:bean:soapMessageEndpoint"/>
	<to uri="bean:testBean?method=processSOAP"/>
</route>

See Also

Using This Pattern

If you would like to use this EIP pattern, please read Getting Started first. You may also find the Architecture useful, particularly the description of Endpoint and URIs. Then try out some of the Examples before trying this pattern out.

Service Activator

Camel has several endpoint components that support the Service Activator from the EIP patterns.

Components like Bean, CXF and Pojo provide a way to bind the message exchange to a Java interface/service where the route defines the endpoints and wires it up to the bean.

In addition you can use the Bean Integration to wire messages to a bean using annotations.

Here is a simple example of using a Direct endpoint to create a messaging interface to a Pojo Bean service.

Using the Fluent Builders

from("direct:invokeMyService").to("bean:myService");

Using the Spring XML Extensions

<route>
    <from uri="direct:invokeMyService"/>
    <to uri="bean:myService"/>
</route>

See Also

Using This Pattern

If you would like to use this EIP pattern, please read Getting Started first. You may also find the Architecture useful, particularly the description of Endpoint and URIs. Then try out some of the Examples before trying this pattern out.

System Management

Wire Tap

Wire Tap (from the EIP patterns) allows you to route messages to a separate location while they are being forwarded to the ultimate destination.

Streams

If you Wire Tap a stream message body then you should consider enabling Stream caching to ensure the message body can be read at each endpoint. See more details at Stream caching.

Options

Name

Default

Description

uri

 

Mandatory: The URI of the endpoint to which the wire-tapped message should be sent.

From Camel 2.16: support for dynamic to URIs is as documented in Message Endpoint.

executorServiceRef

 

Reference ID of a custom Thread Pool to use when processing the wire-tapped messages.

When not set, Camel will use an instance of the default thread pool.

processorRef

 

Reference ID of a custom Processor to use for creating a new message.

See "Sending a New Exchange" below.

copy

true

Camel 2.3: Whether to copy the Exchange before wire-tapping the message.

onPrepareRef

 

Camel 2.8: Reference identifier of a custom Processor to prepare the copy of the Exchange to be wire-tapped. This allows you to do any custom logic, such as deep-cloning the message payload.

cacheSize

 

Camel 2.16: Configures the cache size of the ProducerCache, which caches producers for reuse. By default the default cache size of 1000 is used.

Setting the value to -1 turns the cache off altogether.

ignoreInvalidEndpoint

false

Camel 2.16: Whether to ignore an endpoint URI that could not be resolved.

When false, Camel will throw an exception when it identifies an invalid endpoint URI.

WireTap Threadpool

The Wire Tap uses a thread pool to process the tapped messages. This thread pool will by default use the settings detailed at Threading Model. In particular, when the pool is exhausted (with all threads utilized), further wiretaps will be executed synchronously by the calling thread. To remedy this, you can configure an explicit thread pool on the Wire Tap having either a different rejection policy, a larger worker queue, or more worker threads.
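The fallback to the calling thread can be reproduced with the JDK's ThreadPoolExecutor and its CallerRunsPolicy. This is a minimal sketch, not Camel's thread pool setup: the class WireTapPoolSketch is hypothetical, and the pool is deliberately tiny (one worker, a queue of one) so that the third task is rejected and runs on the caller.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demonstration of an exhausted pool falling back to the calling thread.
class WireTapPoolSketch {
    /** Submits three tap tasks to a saturated pool; returns the names of the threads that ran them. */
    static List<String> runTaps() {
        CountDownLatch release = new CountDownLatch(1);
        List<String> ranOn = new CopyOnWriteArrayList<>();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        Runnable blockingTap = () -> {
            try {
                release.await(); // keep the pool busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            ranOn.add(Thread.currentThread().getName());
        };
        pool.execute(blockingTap); // occupies the only worker thread
        pool.execute(blockingTap); // waits in the queue
        // Worker and queue are full: the rejection policy runs this task on the calling thread.
        pool.execute(() -> ranOn.add(Thread.currentThread().getName()));
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ranOn;
    }
}
```

The first entry of the returned list is the calling thread's name, which shows the tap was executed synchronously. A larger queue, more worker threads or a different rejection policy changes that behavior, which is what configuring an explicit thread pool on the Wire Tap is for.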

WireTap Node

Camel's Wire Tap node supports two flavors when tapping an Exchange:

  • With the traditional Wire Tap, Camel will copy the original Exchange and set its Exchange Pattern to InOnly, as we want the tapped Exchange to be sent in a fire and forget style. The tapped Exchange is then sent in a separate thread so it can run in parallel with the original. Beware that only the Exchange is copied - Wire Tap won't do a deep clone (unless you specify a custom processor via onPrepareRef which does that). So all copies could share objects from the original Exchange.
  • Camel also provides an option of sending a new Exchange allowing you to populate it with new values.
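The difference between copying the Exchange and deep-cloning its payload can be sketched in plain Java. This is an illustration only, not Camel code: the class WireTapCopySketch is hypothetical, a map stands in for the Exchange and a mutable list for the message body. The second method plays the role that a custom processor referenced by onPrepareRef would play.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in: a Map plays the Exchange, a mutable List plays the message body.
class WireTapCopySketch {
    /** Shallow copy: a new map, but the values (such as a mutable body) are shared. */
    static Map<String, Object> shallowCopy(Map<String, Object> exchange) {
        return new HashMap<>(exchange);
    }

    /** Copy with a preparation step that clones the body, so later changes are not shared. */
    static Map<String, Object> copyWithClonedBody(Map<String, Object> exchange) {
        Map<String, Object> copy = new HashMap<>(exchange);
        @SuppressWarnings("unchecked")
        List<String> body = (List<String>) exchange.get("body");
        copy.put("body", new ArrayList<>(body));
        return copy;
    }
}
```

With shallowCopy, adding an element to the original body is visible through the tapped copy as well. With copyWithClonedBody, the tapped copy keeps the body as it was at the time of the tap.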

Sending a Copy (traditional wiretap)

Using the Fluent Builders

{snippet:id=e1|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WireTapTest.java}

Using the Spring XML Extensions

{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapTest.xml}

Sending a New Exchange

Using the Fluent Builders
Camel supports either a processor or an Expression to populate the new Exchange. Using a processor gives you full power over how the Exchange is populated as you can set properties, headers, etc. An Expression can only be used to set the IN body.

From Camel 2.3: the Expression or Processor is pre-populated with a copy of the original Exchange, which allows you to access the original message when you prepare a new Exchange to be sent. You can use the copy option (enabled by default) to indicate whether you want this. If you set copy=false, then it works as in Camel 2.2 or older where the Exchange will be empty.

Below is the processor variation. This example is from Camel 2.3, where we disable copy by passing in false to create a new, empty Exchange.

{snippet:id=e1|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WireTapUsingFireAndForgetTest.java}

Here is the Expression variation. In the following example we disable copying by setting copy=false which results in the creation of a new, empty Exchange.

{snippet:id=e2|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WireTapUsingFireAndForgetTest.java}

Using the Spring XML Extensions

The processor variation, which uses a processorRef attribute to refer to a Spring bean by ID:

{snippet:id=e2|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapUsingFireAndForgetTest.xml}

Here is the Expression variation, where the expression is defined in the body tag:

{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapUsingFireAndForgetTest.xml}

This variation accesses the body of the original message and creates a new Exchange based on the Expression. It will create a new Exchange and have the body contain "Bye ORIGINAL BODY MESSAGE HERE":

{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapUsingFireAndForgetCopyTest.xml}

Further Example

For another example of this pattern, refer to the wire tap test case.

Using Dynamic URIs

Available as of Camel 2.16:

Wire Tap supports the same dynamic URIs as documented in Message Endpoint. For example, to wire tap to a JMS queue where the header ID is part of the queue name:

from("direct:start")
    .wireTap("jms:queue:backup-${header.id}")
    .to("bean:doSomething");

 

Sending a New Exchange and Set Headers in DSL

Available as of Camel 2.8

Before Camel 2.8, when sending a new message using Wire Tap you could only set the message body using an Expression from the DSL. If you also needed to set headers, you had to use a Processor. From Camel 2.8: it's possible to set headers as well using the DSL.

The following example sends a new message which has

  • Bye World as message body.
  • A header with key id with the value 123.
  • A header with key date which has current date as value.

Java DSL

{snippet:id=e1|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WireTapNewExchangeTest.java}

XML DSL

The XML DSL is slightly different than Java DSL in how you configure the message body and headers using <body> and <setHeader>:{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapNewExchangeTest.xml}

Using onPrepare to Execute Custom Logic when Preparing Messages

Available as of Camel 2.8

See details at Multicast

Using This Pattern
