This chapter describes various recipes for working with Camel.

Bean Integration

Camel supports the integration of beans and POJOs in a number of ways

Annotations

If a bean is defined in Spring XML or scanned using the Spring component scanning mechanism, and a <camelContext> or a CamelBeanPostProcessor is used, then Camel processes a number of annotations to do various things such as injecting resources or producing, consuming or routing messages.

The following annotations are supported and injected by Camel's CamelBeanPostProcessor:

||Annotation||Description||
|@EndpointInject|To inject an endpoint, see more details at POJO Producing.|
|@BeanInject|Camel 2.13: To inject a bean obtained from the Registry. See Bean Injection.|
|@PropertyInject|Camel 2.12: To inject a value using property placeholder.|
|@Produce|To inject a producer to send message to an endpoint. See POJO Producing.|
|@Consume|To inject a consumer on a method. See POJO Consuming.|

See more details at:

Example

See the POJO Messaging Example for how to use the annotations for routing and messaging.

Bean Component

The Bean component allows one to invoke a particular method. Alternatively, the Bean component supports the creation of a proxy for a Java interface via ProxyHelper; the proxy implementation simply sends a message containing a BeanInvocation to some Camel endpoint.

Spring Remoting

We support a Spring Remoting provider which uses Camel as the underlying transport mechanism. The nice thing about this approach is we can use any of the Camel transport Components to communicate between beans. It also means we can use Content Based Router and the other Enterprise Integration Patterns in between the beans; in particular we can use Message Translator to be able to convert what the on-the-wire messages look like in addition to adding various headers and so forth.

Bean binding

Whenever Camel invokes a bean method via one of the above mechanisms (Bean component, Spring Remoting or POJO Consuming), the Bean Binding mechanism is used to figure out which method to use (if it is not explicit) and how to bind the Message to the parameters, possibly using the Parameter Binding Annotations or a method name option.

When writing software these days, it's important to try to decouple as much middleware code from your business logic as possible.

This provides a number of benefits...

  • you can choose the right middleware solution for your deployment and switch at any time
  • you don't have to spend a large amount of time learning the specifics of any particular technology, whether it's JMS, JavaSpace, Hibernate, JPA, iBATIS or whatever

For example, if you want to implement some kind of message passing, remoting, reliable load balancing or asynchronous processing in your application, we recommend you use Camel annotations to bind your services and business logic to Camel Components. You can then easily switch between things like

  • in JVM messaging with SEDA
  • using JMS via ActiveMQ or other JMS providers for reliable load balancing, grid or publish and subscribe
  • for low volume, but easier administration since you're probably already using a database you could use
  • use JavaSpace

How to decouple from middleware APIs

The best approach when using remoting is to use Spring Remoting which can then use any messaging or remoting technology under the covers. When using Camel's implementation you can then use any of the Camel Components along with any of the Enterprise Integration Patterns.

Another approach is to bind Java beans to Camel endpoints via the Bean Integration. For example, by using POJO Consuming and POJO Producing you can avoid using any Camel APIs, which decouples your code both from middleware APIs and from Camel APIs.

Visualisation

This functionality is deprecated and will be removed in a future Camel release.

Camel supports the visualisation of your Enterprise Integration Patterns using GraphViz DOT files, which can either be rendered directly via a suitable GraphViz tool or turned into HTML, PNG or SVG files via the Camel Maven Plugin.

Here is a typical example of the kind of thing we can generate

If you click on the actual generated HTML you will see that you can navigate from an EIP node to its pattern page, along with getting hover-over tool tips etc.

How to generate

See Camel Dot Maven Goal or the other Maven goals of the Camel Maven Plugin.

For OS X users

If you are using OS X then you can open the DOT file using GraphViz, which will then automatically re-render if it changes, so you end up with a real-time graphical representation of the topic and queue hierarchies!

Also, if you want to edit the layout a little before adding it to a wiki to distribute to your team, open the DOT file with OmniGraffle and then just edit away.

Business Activity Monitoring

The Camel BAM module provides a Business Activity Monitoring (BAM) framework for testing business processes across multiple message exchanges on different Endpoint instances.

Consider, for example, a simple system in which you submit Purchase Orders into system A and then receive Invoices from system B. You might want to test that, for a given Purchase Order, you receive a matching Invoice from system B within a specific time period.

How Camel BAM Works

Camel BAM uses a Correlation Identifier on an input message to determine the Process Instance to which it belongs. The process instance is an entity bean which can maintain state for each Activity (where an activity typically maps to a single endpoint - such as the submission of Purchase Orders or the receipt of Invoices).

You can then add rules to be triggered when a message is received on any activity - such as to set time expectations or perform real time reconciliation of values across activities.
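To make the idea of a time expectation concrete, here is a minimal sketch in plain Java. It does not use the Camel BAM API: the class BamTimingSketch and its methods are hypothetical names invented for this illustration. It records when activities A and B occur for each correlation identifier and reports whether B is overdue relative to A.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; not part of the Camel BAM module.
public class BamTimingSketch {
    // Arrival times per correlation identifier, in milliseconds.
    private final Map<String, Long> startedAt = new HashMap<>();
    private final Map<String, Long> completedAt = new HashMap<>();

    // Called when a message arrives on activity A (e.g. a Purchase Order).
    public void onActivityA(String correlationId, long timeMillis) {
        startedAt.put(correlationId, timeMillis);
    }

    // Called when a message arrives on activity B (e.g. an Invoice).
    public void onActivityB(String correlationId, long timeMillis) {
        completedAt.put(correlationId, timeMillis);
    }

    // True if B arrived, or is still missing, later than allowedMillis after A.
    public boolean isOverdue(String correlationId, long nowMillis, long allowedMillis) {
        Long start = startedAt.get(correlationId);
        if (start == null) {
            return false; // the process instance has not started yet
        }
        Long end = completedAt.get(correlationId);
        long reference = (end != null) ? end : nowMillis;
        return reference - start > allowedMillis;
    }

    public static void main(String[] args) {
        BamTimingSketch bam = new BamTimingSketch();
        bam.onActivityA("PO-1", 0L);
        System.out.println(bam.isOverdue("PO-1", 500L, 1000L));  // false
        System.out.println(bam.isOverdue("PO-1", 1500L, 1000L)); // true
        bam.onActivityB("PO-1", 800L);
        System.out.println(bam.isOverdue("PO-1", 5000L, 1000L)); // false
    }
}
```

In Camel BAM itself the state lives in a persistent process instance rather than in in-memory maps; the sketch only shows the correlation and timing logic.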

Simple Example

The following example shows how to perform some time based rules on a simple business process of 2 activities - A and B - which correspond with Purchase Orders and Invoices in the example above. If you would like to experiment with this scenario, you may edit this Test Case, which defines the activities and rules, and then tests that they work.

{snippet:id=example|lang=java|url=camel/trunk/components/camel-bam/src/test/java/org/apache/camel/bam/BamRouteTest.java}

As you can see in the above example, we first define two activities, and then rules to specify when we expect them to complete for a process instance and when an error condition should be raised. The ProcessBuilder is a RouteBuilder and can be added to any CamelContext.

Complete Example

For a complete example please see the BAM Example, which is part of the standard Camel Examples

Use Cases

In the world of finance, a common requirement is tracking trades. Often a trader will submit a Front Office Trade which then flows through the Middle Office and Back Office through various systems to settle the trade so that money is exchanged. You may wish to test that the front and back office trades match up within a certain time period; if they don't match or a back office trade does not arrive within a required amount of time, you might signal an alarm.

Extract Transform Load (ETL)

ETL (Extract, Transform, Load) is a mechanism for loading data into systems or databases using some kind of Data Format from a variety of sources, often files, and then using Pipes and Filters, Message Translator and possibly other Enterprise Integration Patterns.

So you could query data from various Camel Components such as File, HTTP or JPA, perform multiple patterns such as Splitter or Message Translator then send the messages to some other Component.

To show how this all fits together, try the ETL Example

Mock Component

The Mock component provides a powerful declarative testing mechanism, which is similar to jMock in that it allows declarative expectations to be created on any Mock endpoint before a test begins. Then the test is run, which typically fires messages to one or more endpoints, and finally the expectations can be asserted in a test case to ensure the system worked as expected.

This allows you to test various things like:

  • The correct number of messages are received on each endpoint,
  • The correct payloads are received, in the right order,
  • Messages arrive on an endpoint in order, using some Expression to create an order testing function,
  • Messages that arrive match some kind of Predicate, such as that specific headers have certain values, or that parts of the messages match some predicate, such as by evaluating an XPath or XQuery Expression.

Note that there is also the Test endpoint which is a Mock endpoint, but which uses a second endpoint to provide the list of expected message bodies and automatically sets up the Mock endpoint assertions. In other words, it's a Mock endpoint that automatically sets up its assertions from some sample messages in a File or database, for example.

Mock endpoints keep received Exchanges in memory indefinitely

Remember that Mock is designed for testing. When you add Mock endpoints to a route, each Exchange sent to the endpoint will be stored (to allow for later validation) in memory until explicitly reset or the JVM is restarted. If you are sending high volume and/or large messages, this may cause excessive memory use. If your goal is to test deployable routes inline, consider using NotifyBuilder or AdviceWith in your tests instead of adding Mock endpoints to routes directly.

From Camel 2.10 onwards there are two new options, retainFirst and retainLast, that can be used to limit the number of messages the Mock endpoints keep in memory.

URI format

mock:someName[?options]

Where someName can be any string that uniquely identifies the endpoint.

You can append query options to the URI in the following format, ?option=value&option=value&...

Options

||Option||Default||Description||
|reportGroup|null|A size to use a throughput logger for reporting.|
|retainFirst| |Camel 2.10: To only keep first X number of messages in memory.|
|retainLast| |Camel 2.10: To only keep last X number of messages in memory.|

Simple Example

Here's a simple example of Mock endpoint in use. First, the endpoint is resolved on the context. Then we set an expectation, and then, after the test has run, we assert that our expectations have been met.

MockEndpoint resultEndpoint = context.resolveEndpoint("mock:foo", MockEndpoint.class);
resultEndpoint.expectedMessageCount(2);

// send some messages
...

// now let's assert that the mock:foo endpoint received 2 messages
resultEndpoint.assertIsSatisfied();

You typically call the assertIsSatisfied() method to test that the expectations were met after running a test.

By default, Camel waits up to 10 seconds when assertIsSatisfied() is invoked. This can be configured by calling the setResultWaitTime(millis) method.

Using assertPeriod

Available as of Camel 2.7

When the assertion is satisfied, Camel stops waiting and continues from the assertIsSatisfied method. That means that if a new message arrives on the mock endpoint just a bit later, that arrival will not affect the outcome of the assertion. If you do want to test that no new messages arrive during a period thereafter, you can do that by calling the setAssertPeriod method, for example:

MockEndpoint resultEndpoint = context.resolveEndpoint("mock:foo", MockEndpoint.class);
resultEndpoint.setAssertPeriod(5000);
resultEndpoint.expectedMessageCount(2);

// send some messages
...

// now let's assert that the mock:foo endpoint received 2 messages
resultEndpoint.assertIsSatisfied();

Setting expectations

You can see from the javadoc of MockEndpoint the various helper methods you can use to set expectations. The main methods are as follows:

||Method||Description||
|expectedMessageCount(int)|To define the expected message count on the endpoint.|
|expectedMinimumMessageCount(int)|To define the minimum number of expected messages on the endpoint.|
|expectedBodiesReceived(...)|To define the expected bodies that should be received (in order).|
|expectedHeaderReceived(...)|To define the expected header that should be received.|
|expectsAscending(Expression)|To add an expectation that messages are received in ascending order, using the given Expression to compare messages.|
|expectsDescending(Expression)|To add an expectation that messages are received in descending order, using the given Expression to compare messages.|
|expectsNoDuplicates(Expression)|To add an expectation that no duplicate messages are received, using an Expression to calculate a unique identifier for each message. This could be something like the JMSMessageID if using JMS, or some unique reference number within the message.|

Here's another example:

resultEndpoint.expectedBodiesReceived("firstMessageBody", "secondMessageBody", "thirdMessageBody");

Adding expectations to specific messages

In addition, you can use the message(int messageIndex) method to add assertions about a specific message that is received.

For example, to add expectations of the headers or body of the first message (using zero-based indexing like java.util.List), you can use the following code:

resultEndpoint.message(0).header("foo").isEqualTo("bar");

There are some examples of the Mock endpoint in use in the camel-core processor tests.

Mocking existing endpoints

Available as of Camel 2.7

Camel now allows you to automatically mock existing endpoints in your Camel routes.

How it works

Important: The endpoints are still in action. What happens differently is that a Mock endpoint is injected and receives the message first and then delegates the message to the target endpoint. You can view this as a kind of intercept and delegate or endpoint listener.

Suppose you have the given route below:

{snippet:id=route|title=Route|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/AdviceWithMockEndpointsTest.java}

You can then use the adviceWith feature in Camel to mock all the endpoints in a given route from your unit test, as shown below:

{snippet:id=e1|title=adviceWith mocking all endpoints|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/AdviceWithMockEndpointsTest.java}

Notice that the mock endpoints are given the URI mock:<endpoint>, for example mock:direct:foo. Camel logs at INFO level the endpoints being mocked:

INFO Adviced endpoint [direct://foo] with mock endpoint [mock:direct:foo]

Mocked endpoints are without parameters

Endpoints which are mocked will have their parameters stripped off. For example the endpoint "log:foo?showAll=true" will be mocked to the following endpoint "mock:log:foo". Notice the parameters have been removed.
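The naming rule can be sketched in plain Java as follows. This is only an illustration of the rule described above, not Camel's own code: the class MockUriSketch and the method toMockUri are hypothetical names, and the sketch does not perform any URI normalisation that Camel may apply.

```java
// Hypothetical illustration of how a mocked endpoint URI is derived.
public class MockUriSketch {

    // Drops the query parameters (if any) and prefixes the URI with "mock:".
    public static String toMockUri(String endpointUri) {
        int query = endpointUri.indexOf('?');
        String withoutParameters = (query >= 0) ? endpointUri.substring(0, query) : endpointUri;
        return "mock:" + withoutParameters;
    }

    public static void main(String[] args) {
        System.out.println(toMockUri("log:foo?showAll=true")); // mock:log:foo
        System.out.println(toMockUri("direct:foo"));           // mock:direct:foo
    }
}
```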

It's also possible to mock only certain endpoints using a pattern. For example, to mock all log endpoints you do as shown:

{snippet:id=e2|lang=java|title=adviceWith mocking only log endpoints using a pattern|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/AdviceWithMockEndpointsTest.java}

The pattern supported can be a wildcard or a regular expression. See more details about this at Intercept, as it's the same matching function used by Camel.
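As a rough sketch of what wildcard and regular expression matching could look like, the plain Java below tries an exact match, then a trailing-* wildcard, then a regular expression. The class EndpointPatternSketch and the order of the checks are assumptions made for this illustration; refer to Intercept for the rules Camel actually applies.

```java
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

// Hypothetical illustration of endpoint pattern matching; not Camel's own code.
public class EndpointPatternSketch {

    public static boolean matches(String uri, String pattern) {
        // 1) exact match
        if (uri.equals(pattern)) {
            return true;
        }
        // 2) wildcard: a pattern ending in * matches any URI with that prefix
        if (pattern.endsWith("*")) {
            String prefix = pattern.substring(0, pattern.length() - 1);
            if (uri.startsWith(prefix)) {
                return true;
            }
        }
        // 3) regular expression; an invalid expression simply does not match
        try {
            return Pattern.matches(pattern, uri);
        } catch (PatternSyntaxException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(matches("log:foo", "log*"));                  // true
        System.out.println(matches("direct:foo", "log*"));               // false
        System.out.println(matches("direct:foo", "*"));                  // true
        System.out.println(matches("direct:foo", "(direct|seda):.*"));   // true
    }
}
```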

Mind that mocking endpoints causes the messages to be copied when they arrive on the mock.
That means Camel will use more memory. This may not be suitable when you send in a lot of messages.

Mocking existing endpoints using the camel-test component

Instead of using the adviceWith to instruct Camel to mock endpoints, you can easily enable this behavior when using the camel-test Test Kit.
The same route can be tested as follows. Notice that we return "*" from the isMockEndpoints method, which tells Camel to mock all endpoints.
If you only want to mock all log endpoints you can return "log*" instead.

{snippet:id=e1|lang=java|title=isMockEndpoints using camel-test kit|url=camel/trunk/components/camel-test/src/test/java/org/apache/camel/test/patterns/IsMockEndpointsJUnit4Test.java}

Mocking existing endpoints with XML DSL

If you do not use the camel-test component for unit testing (as shown above) you can use a different approach when using XML files for routes.
The solution is to create a new XML file used by the unit test and then include the intended XML file which has the route you want to test.

Suppose we have the route in the camel-route.xml file:

{snippet:id=e1|lang=xml|title=camel-route.xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/mock/camel-route.xml}

Then we create a new XML file as follows, where we include the camel-route.xml file and define a spring bean with the class org.apache.camel.impl.InterceptSendToMockEndpointStrategy which tells Camel to mock all endpoints:

{snippet:id=e1|lang=xml|title=test-camel-route.xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/mock/InterceptSendToMockEndpointStrategyTest.xml}

Then in your unit test you load the new XML file (test-camel-route.xml) instead of camel-route.xml.

To only mock all Log endpoints you can define the pattern in the constructor for the bean:

<bean id="mockAllEndpoints" class="org.apache.camel.impl.InterceptSendToMockEndpointStrategy">
    <constructor-arg index="0" value="log*"/>
</bean>

Mocking endpoints and skip sending to original endpoint

Available as of Camel 2.10

Sometimes you want to easily mock certain endpoints and skip sending to them, so that the message is detoured and sent to the mock endpoint only. From Camel 2.10 onwards you can use the mockEndpointsAndSkip method with AdviceWith or the Test Kit. The example below will skip sending to the two endpoints "direct:foo" and "direct:bar".

{snippet:id=e1|lang=java|title=adviceWith mock and skip sending to endpoints|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/AdviceWithMockMultipleEndpointsWithSkipTest.java}

The same example using the Test Kit

{snippet:id=e1|lang=java|title=isMockEndpointsAndSkip using camel-test kit|url=camel/trunk/components/camel-test/src/test/java/org/apache/camel/test/patterns/IsMockEndpointsAndSkipJUnit4Test.java}

Limiting the number of messages to keep

Available as of Camel 2.10

By default, a Mock endpoint keeps a copy of every Exchange that it receives. So if you test with a lot of messages, it will consume memory.
From Camel 2.10 onwards there are two options, retainFirst and retainLast, that can be used to keep only the first N and/or the last N Exchanges.

For example in the code below, we only want to retain a copy of the first 5 and last 5 Exchanges the mock receives.

MockEndpoint mock = getMockEndpoint("mock:data");
mock.setRetainFirst(5);
mock.setRetainLast(5);
mock.expectedMessageCount(2000);

...

mock.assertIsSatisfied();

Using this has some limitations. The getExchanges() and getReceivedExchanges() methods on the MockEndpoint will return only the retained copies of the Exchanges. So in the example above, the list will contain 10 Exchanges; the first five, and the last five.
The retainFirst and retainLast options also have limitations on which expectation methods you can use. For example the expectedXXX methods that work on message bodies, headers, etc. will only operate on the retained messages. In the example above they can test only the expectations on the 10 retained messages.
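The retention behaviour described above can be pictured with the following plain Java sketch. It is not the MockEndpoint implementation; the class RetainSketch and its methods are hypothetical names. It counts every received item but keeps copies of only the first N and the last M.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical illustration of "retain first N and last M"; not Camel's own code.
public class RetainSketch<T> {
    private final int retainFirst;
    private final int retainLast;
    private final List<T> first = new ArrayList<>();
    private final Deque<T> last = new ArrayDeque<>();
    private int receivedCounter;

    public RetainSketch(int retainFirst, int retainLast) {
        this.retainFirst = retainFirst;
        this.retainLast = retainLast;
    }

    public void onReceived(T item) {
        receivedCounter++; // every item is counted, even if it is not retained
        if (first.size() < retainFirst) {
            first.add(item);
            return;
        }
        if (retainLast > 0) {
            last.addLast(item);
            if (last.size() > retainLast) {
                last.removeFirst(); // evict the oldest of the "last" window
            }
        }
    }

    public int getReceivedCounter() {
        return receivedCounter;
    }

    // Returns the retained items: the first N followed by the last M.
    public List<T> getRetained() {
        List<T> all = new ArrayList<>(first);
        all.addAll(last);
        return all;
    }

    public static void main(String[] args) {
        RetainSketch<Integer> mock = new RetainSketch<>(5, 5);
        for (int i = 0; i < 2000; i++) {
            mock.onReceived(i);
        }
        System.out.println(mock.getReceivedCounter()); // 2000
        System.out.println(mock.getRetained());        // [0, 1, 2, 3, 4, 1995, 1996, 1997, 1998, 1999]
    }
}
```

This shows why a count expectation can still see all 2000 messages while body or header expectations can only look at the 10 retained ones.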

Testing with arrival times

Available as of Camel 2.7

The Mock endpoint stores the arrival time of the message as a property on the Exchange.

Date time = exchange.getProperty(Exchange.RECEIVED_TIMESTAMP, Date.class);

You can use this information to know when the message arrived on the mock. It also provides the foundation for knowing the time interval between one message and the previous or next message that arrived on the mock. You can use this to set expectations using the arrives DSL on the Mock endpoint.

For example to say that the first message should arrive between 0-2 seconds before the next you can do:

mock.message(0).arrives().noLaterThan(2).seconds().beforeNext();

You can also express this as the second message (index 1, zero-based) arriving no later than 2 seconds after the previous one:

mock.message(1).arrives().noLaterThan(2).seconds().afterPrevious();

You can also use between to set a lower bound. For example suppose that it should be between 1-4 seconds:

mock.message(1).arrives().between(1, 4).seconds().afterPrevious();

You can also set the expectation on all messages, for example to say that the gap between them should be at most 1 second:

mock.allMessages().arrives().noLaterThan(1).seconds().beforeNext();

Time units

In the example above we use seconds as the time unit, but Camel offers milliseconds and minutes as well.
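What such an arrival expectation checks can be sketched in plain Java as below: given the recorded arrival times, every gap between consecutive messages must lie within a lower and an upper bound. The class ArrivalGapSketch is a hypothetical name for this illustration and is not how the Mock endpoint is implemented.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of checking gaps between arrival times.
public class ArrivalGapSketch {

    // True if every gap between consecutive arrivals is within [minMillis, maxMillis].
    public static boolean gapsBetween(List<Long> arrivalMillis, long minMillis, long maxMillis) {
        for (int i = 1; i < arrivalMillis.size(); i++) {
            long gap = arrivalMillis.get(i) - arrivalMillis.get(i - 1);
            if (gap < minMillis || gap > maxMillis) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Long> arrivals = Arrays.asList(0L, 400L, 1300L);
        // comparable in spirit to noLaterThan(1).seconds() applied to all messages
        System.out.println(gapsBetween(arrivals, 0L, 1000L)); // true
        System.out.println(gapsBetween(arrivals, 0L, 500L));  // false
    }
}
```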

Templating

When you are testing distributed systems, it's a very common requirement to stub out certain external systems so that you can test other parts of the system until a specific system is available or written.

A great way to do this is using some kind of Template system to generate responses to requests generating a dynamic message using a mostly-static body.

There are a number of templating components included in the Camel distribution you could use

or the following external Camel components

Example

Here's a simple example showing how we can respond to InOut requests on the My.Queue queue on ActiveMQ with a template generated response. The reply would be sent back to the JMSReplyTo Destination.

from("activemq:My.Queue").
  to("velocity:com/acme/MyResponse.vm");

If you want to use InOnly and consume the message and send it to another destination you could use

from("activemq:My.Queue").
  to("velocity:com/acme/MyResponse.vm").
  to("activemq:Another.Queue");

See Also

  • Mock for details of mock endpoint testing (as opposed to template based stubs).

Database

Camel can work with databases in a number of different ways. This document tries to outline the most common approaches.

Database endpoints

Camel provides a number of different endpoints for working with databases

  • JPA for working with Hibernate, OpenJPA or TopLink. When consuming from the endpoints, entity beans are read (and deleted/updated to mark them as processed); when producing to the endpoints, they are written to the database (via insert/update).
  • iBATIS similar to the above but using Apache iBATIS
  • JDBC similar though using explicit SQL
  • SQL uses spring-jdbc behind the scenes for the actual SQL handling. The difference between this component and the JDBC component is that with SQL the query is a property of the endpoint, and the message payload is used as the parameters passed to the query

Database pattern implementations

Various patterns can work with databases as follows

Parallel Processing and Ordering

It is a common requirement to want to use parallel processing of messages for throughput and load balancing, while at the same time processing certain kinds of messages in order.

How to achieve parallel processing

You can send messages to a number of Camel Components to achieve parallel processing and load balancing such as

  • SEDA for in-JVM load balancing across a thread pool
  • ActiveMQ or JMS for distributed load balancing and parallel processing
  • JPA for using the database as a poor man's message broker

When processing messages concurrently, you should consider ordering and concurrency issues. These are described below

Concurrency issues

Note that there are no concurrency or locking issues when using ActiveMQ, JMS or SEDA; they are designed for highly concurrent use. However, there are possible concurrency issues in the Processor of the messages, that is, in what the processor does with the message.

For example, if a processor of a message transfers money from one account to another, you probably want to use a database with pessimistic locking to ensure that the operation takes place atomically.

Ordering issues

As soon as you send multiple messages to different threads or processes you will end up with an unknown ordering across the entire message stream as each thread is going to process messages concurrently.

For many use cases the order of messages is not too important. However, for some applications it can be crucial. For example, if a customer submits a purchase order version 1, then amends it and sends version 2, you don't want to process the first version last (so that you lose the update). Your Processor might be clever enough to ignore old messages. If not, you need to preserve order.

Recommendations

This topic is large and diverse with lots of different requirements; but from a high level here are our recommendations on parallel processing, ordering and concurrency

  • for distributed locking, use a database by default; they are very good at it
  • to preserve ordering across a JMS queue consider using Exclusive Consumers in the ActiveMQ component
  • even better are Message Groups, which allow you to preserve ordering across messages while still offering parallelisation, using the JMSXGroupID header to determine what can be parallelized
  • if you receive messages out of order you could use the Resequencer to put them back together again

A good rule of thumb to help reduce ordering problems is to make sure each single message can be processed as an atomic unit in parallel (either without concurrency issues or using, say, database locking); or if it can't, use a Message Group to relate the messages together which need to be processed in order by a single thread.
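The principle behind a Message Group, that messages sharing a group identifier are handled in order by one thread while different groups run in parallel, can be sketched in plain Java. This is not how ActiveMQ or Camel implement it; the class MessageGroupSketch and its methods are hypothetical names for this illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration: one single-threaded lane per hash bucket of group id.
public class MessageGroupSketch {
    private final ExecutorService[] lanes;

    public MessageGroupSketch(int laneCount) {
        lanes = new ExecutorService[laneCount];
        for (int i = 0; i < laneCount; i++) {
            lanes[i] = Executors.newSingleThreadExecutor();
        }
    }

    // Work for the same groupId always lands on the same lane, so it runs in
    // submission order; work for other groups may run concurrently.
    public Future<?> dispatch(String groupId, Runnable work) {
        int lane = Math.floorMod(groupId.hashCode(), lanes.length);
        return lanes[lane].submit(work);
    }

    // Stops accepting work and waits for the queued work to finish.
    public void shutdownAndWait() {
        for (ExecutorService lane : lanes) {
            lane.shutdown();
        }
        try {
            for (ExecutorService lane : lanes) {
                lane.awaitTermination(10, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        MessageGroupSketch groups = new MessageGroupSketch(4);
        for (int i = 0; i < 6; i++) {
            final int version = i;
            final String productCode = "product-" + (i % 2);
            groups.dispatch(productCode,
                () -> System.out.println(productCode + " version " + version));
        }
        groups.shutdownAndWait();
    }
}
```

The output order across the two product codes may vary between runs, but the versions printed for any one product code always appear in ascending order.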

Using Message Groups with Camel

To use a Message Group with Camel you just need to add a header to the output JMS message based on some kind of Correlation Identifier to correlate messages which should be processed in order by a single thread - so that things which don't correlate together can be processed concurrently.

For example the following code shows how to create a message group using an XPath expression taking an invoice's product code as the Correlation Identifier

from("activemq:a").setHeader(JmsConstants.JMS_X_GROUP_ID, xpath("/invoice/productCode")).to("activemq:b");

You can of course use the Xml Configuration if you prefer

Asynchronous Processing

Overview

Supported versions

The information on this page applies for Camel 2.4 or later.

Before Camel 2.4, asynchronous processing was only implemented for JBI, whereas in Camel 2.4 we have implemented it in many other areas. See more at Asynchronous Routing Engine.

Camel supports a more complex asynchronous processing model. The asynchronous processors implement the org.apache.camel.AsyncProcessor interface which is derived from the more synchronous org.apache.camel.Processor interface. There are advantages and disadvantages when using asynchronous processing when compared to using the standard synchronous processing model.

Advantages:

  • Processing routes that are composed fully of asynchronous processors do not use up threads waiting for processors to complete on blocking calls. This can increase the scalability of your system by reducing the number of threads needed to process the same workload.
  • Processing routes can be broken up into SEDA processing stages where different thread pools can process the different stages. This means that your routes can be processed concurrently.

Disadvantages:

  • Implementing asynchronous processors is more complex than implementing the synchronous versions.

When to Use

We recommend that processors and components be implemented using the simpler synchronous APIs unless you identify a performance or scalability requirement that dictates otherwise. A Processor whose process() method blocks for a long time would be a good candidate for being converted into an asynchronous processor.

Interface Details

public interface AsyncProcessor extends Processor {
   boolean process(Exchange exchange, AsyncCallback callback);
}

The AsyncProcessor defines a single process() method which is very similar to its synchronous Processor.process() counterpart.

Here are the differences:

  • A non-null AsyncCallback MUST be supplied which will be notified when the exchange processing is completed.
  • It MUST NOT throw any exceptions that occurred while processing the exchange. Any such exceptions must be stored on the exchange's Exception property.
  • It MUST know if it will complete the processing synchronously or asynchronously. The method will return true if it does complete synchronously, otherwise it returns false.
  • When the processor has completed processing the exchange, it must call the callback.done(boolean sync) method.
  • The sync parameter MUST match the value returned by the process() method.
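The rules above can be sketched in plain Java as follows. To keep the example self-contained it does not use the Camel types: Callback and SimpleExchange are hypothetical local stand-ins for AsyncCallback and Exchange, and the Executor parameter is an assumption made so that the sketch can hand work to another thread.

```java
import java.util.concurrent.Executor;

// Hypothetical illustration of the contract; the nested types are local
// stand-ins, not the Camel interfaces themselves.
public class AsyncContractSketch {

    public interface Callback {
        void done(boolean sync);
    }

    public static class SimpleExchange {
        public volatile String body;
        public volatile Exception exception;
    }

    // Returns true if processing completed synchronously, false otherwise.
    public static boolean process(SimpleExchange exchange, Callback callback, Executor executor) {
        if (exchange.body == null || exchange.body.isEmpty()) {
            // Nothing to do: complete on the caller's thread and report sync=true.
            callback.done(true);
            return true;
        }
        executor.execute(() -> {
            try {
                exchange.body = exchange.body.toUpperCase();
            } catch (RuntimeException e) {
                // Never throw from process(); store the failure on the exchange.
                exchange.exception = e;
            } finally {
                // The argument matches the value returned by process() below.
                callback.done(false);
            }
        });
        return false;
    }

    public static void main(String[] args) {
        SimpleExchange exchange = new SimpleExchange();
        exchange.body = "hello";
        // Runnable::run executes the task on the calling thread, which keeps the demo simple.
        boolean sync = process(exchange, s -> System.out.println("done, sync=" + s), Runnable::run);
        System.out.println(sync + " " + exchange.body);
    }
}
```

A real asynchronous processor would typically pass a thread pool or a non-blocking I/O callback instead of Runnable::run; the contract stays the same.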

Implementing Processors that Use the AsyncProcessor API

All processors, even synchronous processors that do not implement the AsyncProcessor interface, can be coerced to implement the AsyncProcessor interface. This is usually done when you are implementing a Camel component consumer that supports asynchronous completion of the exchanges that it is pushing through the Camel routes. Consumers are provided with a Processor object when created. Any Processor object can be coerced to an AsyncProcessor using the following API:

Processor processor = ...
AsyncProcessor asyncProcessor = AsyncProcessorTypeConverter.convert(processor);

For a route to be fully asynchronous and reap the benefits of lower thread usage, it must start with the consumer implementation making use of the asynchronous processing API. If it called the synchronous process() method instead, the consumer's thread would be blocked and in use for the duration that it takes to process the exchange.

It is important to note that just because you call the asynchronous API, it does not mean that the processing will take place asynchronously. It only allows the possibility that it can be done without tying up the caller's thread. Whether the processing happens asynchronously depends on the configuration of the Camel route.

Normally, the process call is passed an inline inner AsyncCallback class instance which can reference the exchange object that was declared final. This allows it to finish up any post-processing that is needed when the called processor is done processing the exchange.

Example.

final Exchange exchange = ...
AsyncProcessor asyncProcessor = ...
asyncProcessor.process(exchange, new AsyncCallback() {
    public void done(boolean sync) {

        if (exchange.isFailed()) {
            ... // do failure processing.. perhaps rollback etc.
        } else {
            ... // processing completed successfully, finish up 
                // perhaps commit etc.
        }
    }
});

Asynchronous Route Sequence Scenarios

Now that we have understood the interface contract of the AsyncProcessor, and have seen how to make use of it when calling processors, let's look at what the thread model/sequence scenarios will look like for some sample routes.

The Jetty component's consumers support asynchronous processing through the use of continuations. Suffice it to say that it can take an HTTP request and pass it to a Camel route for asynchronous processing. If the processing is indeed asynchronous, it uses a Jetty continuation so that the HTTP request is 'parked' and the thread is released. Once the Camel route finishes processing the request, the Jetty component uses the AsyncCallback to tell Jetty to 'un-park' the request. Jetty un-parks the request, and the HTTP response is returned using the result of the exchange processing.

Notice that the Jetty continuations feature is only used "If the processing is indeed async". This is why AsyncProcessor.process() implementations must accurately report whether the request is completed synchronously or not.

The jhc component's producer allows you to make HTTP requests and implements the AsyncProcessor interface. A route that uses both the jetty asynchronous consumer and the jhc asynchronous producer will be a fully asynchronous route and has some nice attributes that can be seen if we take a look at a sequence diagram of the processing route.

For the route:

from("jetty:http://localhost:8080/service")
    .to("jhc:http://localhost/service-impl");

The sequence diagram would look something like this:

The diagram simplifies things by making it look like processors implement the AsyncCallback interface when in reality the AsyncCallback interfaces are inline inner classes, but it illustrates the processing flow and shows how two separate threads are used to complete the processing of the original HTTP request. The first thread is synchronous up until processing hits the jhc producer, which issues the HTTP request. It then reports that the exchange processing will complete asynchronously, using NIO to get the response back. Once the jhc component has received a full response it uses the AsyncCallback.done() method to notify the caller. These callback notifications continue up the chain until they reach the original Jetty consumer, which then un-parks the HTTP request and completes it by providing the response.

Mixing Synchronous and Asynchronous Processors

It is totally possible and reasonable to mix the use of synchronous and asynchronous processors/components. The pipeline processor is the backbone of a Camel processing route. It glues all the processing steps together. It is implemented as an AsyncProcessor and supports interleaving synchronous and asynchronous processors as the processing steps in the pipeline.

Let's say we have two custom asynchronous processors, namely MyValidator and MyTransformation. We want to load files from the data/in directory, validate them with the MyValidator processor, transform them into JPA Java objects using MyTransformation, and then insert them into the database using the JPA component. Let's say that the transformation process takes quite a bit of time and we want to allocate 20 threads to do parallel transformations of the input files. The solution is to make use of the thread processor. The thread processor is an AsyncProcessor that forces subsequent processing onto an asynchronous thread from a thread pool.

The route might look like:

from("file:data/in")
  .process(new MyValidator())
  .threads(20)
  .process(new MyTransformation())
  .to("jpa:PurchaseOrder");

The sequence diagram would look something like this:

You would actually have multiple threads executing the second part of the thread sequence.
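As a rough model of that hand-off, the sketch below uses only java.util.concurrent rather than Camel. ThreadHandoffSketch, validate and transform are hypothetical stand-ins for the route, MyValidator and MyTransformation; the fixed pool plays the part of threads(20). It illustrates the threading shape, not how Camel implements the thread processor.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ThreadHandoffSketch {

    // Stand-in for MyValidator: runs on the thread that picked up the file.
    static String validate(String file) {
        if (file.isEmpty()) {
            throw new IllegalArgumentException("empty file name");
        }
        return file;
    }

    // Stand-in for MyTransformation: runs on a pool thread.
    static String transform(String file) {
        return "PO[" + file + "]";
    }

    static List<String> run(List<String> files, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<Future<String>> pending = new ArrayList<>();
            for (String file : files) {
                String validated = validate(file);            // first part: calling thread
                Callable<String> task = () -> transform(validated);
                pending.add(pool.submit(task));               // second part: pool thread
            }
            List<String> results = new ArrayList<>();
            for (Future<String> future : pending) {
                results.add(future.get());                    // wait for each transformation
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("order1.xml", "order2.xml"), 20));
    }
}
```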

Staying Synchronous in an AsyncProcessor

Generally speaking you get better throughput when you process things synchronously. This is because starting up an asynchronous thread and doing a context switch to it adds a little bit of overhead. So it is generally encouraged that AsyncProcessors do as much work as they can synchronously. When they get to a step that would block for a long time, they should return from the process call at that point and let the caller know that the call will be completed asynchronously.
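One way to picture this advice is a processor that answers from a cache on the calling thread and only goes asynchronous for the slow path. The sketch below is an illustration under assumptions: DoneCallback and CachingLookupSketch are hypothetical stand-ins rather than Camel types, and slowLookup represents whatever step would block for a long time.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the callback contract described above.
interface DoneCallback {
    void done(boolean sync);
}

class CachingLookupSketch {
    private final Map<String, String> cache = new ConcurrentHashMap<>();
    private final ExecutorService pool;

    CachingLookupSketch(ExecutorService pool) {
        this.pool = pool;
    }

    boolean process(String key, StringBuilder out, DoneCallback callback) {
        String cached = cache.get(key);
        if (cached != null) {
            // Cheap path: finish on the calling thread and say so.
            out.append(cached);
            callback.done(true);
            return true;
        }
        // Slow path: return immediately and complete on another thread.
        pool.execute(() -> {
            String value = slowLookup(key);
            cache.put(key, value);
            out.append(value);
            callback.done(false);
        });
        return false;
    }

    // Placeholder for a long blocking step such as a remote call.
    static String slowLookup(String key) {
        return key.toUpperCase();
    }

    // Runs the same key twice: the first call goes async, the second stays sync.
    static String demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            CachingLookupSketch processor = new CachingLookupSketch(pool);
            CountDownLatch finished = new CountDownLatch(1);
            StringBuilder first = new StringBuilder();
            boolean firstSync = processor.process("a", first, sync -> finished.countDown());
            if (!finished.await(5, TimeUnit.SECONDS)) {
                return "timeout";
            }
            StringBuilder second = new StringBuilder();
            boolean secondSync = processor.process("a", second, sync -> { });
            return firstSync + "," + secondSync + "," + first + "," + second;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```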

Implementing Virtual Topics on other JMS providers

ActiveMQ supports Virtual Topics since durable topic subscriptions kinda suck (see this page for more detail) mostly since they don't support Competing Consumers.

Most folks want Queue semantics when consuming messages; so that you can support Competing Consumers for load balancing along with things like Message Groups and Exclusive Consumers to preserve ordering or partition the queue across consumers.

However, if you are using another JMS provider you can implement Virtual Topics by switching to ActiveMQ :) or you can use the following Camel pattern.

First here's the ActiveMQ approach.

  • send to activemq:topic:VirtualTopic.Orders
  • for consumer A consume from activemq:Consumer.A.VirtualTopic.Orders

When using another message broker, use the following pattern:

  • send to jms:Orders
  • add this route with a to() for each logical durable topic subscriber
    from("jms:Orders").to("jms:Consumer.A", "jms:Consumer.B", ...); 
  • for consumer A consume from jms:Consumer.A
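To make the semantics of this pattern concrete, here is a small in-memory model written in plain Java. It is not Camel or JMS code: VirtualTopicSketch and its methods are hypothetical, and simple in-memory queues stand in for jms:Consumer.A and jms:Consumer.B. It shows that every logical subscriber receives each message once, while consumers sharing one queue compete for its messages.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

class VirtualTopicSketch {
    // One queue per logical durable subscriber, like jms:Consumer.A and jms:Consumer.B.
    private final Map<String, Queue<String>> queues = new LinkedHashMap<>();

    void subscribe(String queueName) {
        queues.put(queueName, new ArrayDeque<>());
    }

    // Models the route: everything sent to Orders is copied to each subscriber queue.
    void send(String message) {
        for (Queue<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    // Models a consumer taking the next message from its queue (null when empty).
    String consume(String queueName) {
        return queues.get(queueName).poll();
    }

    static String demo() {
        VirtualTopicSketch orders = new VirtualTopicSketch();
        orders.subscribe("Consumer.A");
        orders.subscribe("Consumer.B");
        for (int i = 1; i <= 4; i++) {
            orders.send("order-" + i);
        }

        // Two competing consumers share Consumer.A and split its messages.
        List<String> a1 = new ArrayList<>();
        List<String> a2 = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            a1.add(orders.consume("Consumer.A"));
            a2.add(orders.consume("Consumer.A"));
        }

        // A single consumer on Consumer.B still sees every message.
        List<String> b = new ArrayList<>();
        String message;
        while ((message = orders.consume("Consumer.B")) != null) {
            b.add(message);
        }
        return a1 + " " + a2 + " " + b;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```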

What's the Camel Transport for CXF

In CXF you offer or consume a webservice by defining its address. The first part of the address specifies the protocol to use. For example, address="http://localhost:9000" in an endpoint configuration means your service will be offered using the http protocol on port 9000 of localhost. When you integrate the Camel Transport into CXF you get a new transport, "camel". So you can specify address="camel://direct:MyEndpointName" to bind the CXF service address to a camel direct endpoint.

Technically speaking Camel transport for CXF is a component which implements the CXF transport API with the Camel core library. This allows you to easily use Camel's routing engine and integration patterns support together with your CXF services.

Integrate Camel into CXF transport layer

To include the Camel Transport in your CXF bus you use the CamelTransportFactory. You can do this in Java as well as in Spring.

Setting up the Camel Transport in Spring

You can use the following snippet in your application context if you want to configure anything special. If you only want to activate the Camel transport you do not have to do anything in your application context. As soon as you include the camel-cxf-transport jar (or camel-cxf.jar if your Camel version is earlier than 2.7.x) in your app, CXF will scan the jar and load a CamelTransportFactory for you.

<!-- you don't need to specify the CamelTransportFactory configuration as it is auto-loaded by the CXF bus -->
<bean class="org.apache.camel.component.cxf.transport.CamelTransportFactory">
  <property name="bus" ref="cxf" />
  <property name="camelContext" ref="camelContext" />
  <!-- checkException was added in Camel 2.1 and Camel 1.6.2 -->
  <!-- If checkException is true, CamelDestination will check the outMessage's exception
       and set it on the Camel exchange. You can also override this value in
       CamelDestination's configuration. The default value is false. Set this option to
       true when you want to use Camel's error handler to deal with fault messages. -->
  <property name="checkException" value="true" />
  <property name="transportIds">
    <list>
      <value>http://cxf.apache.org/transports/camel</value>
    </list>
  </property>
</bean>

Integrating the Camel Transport in a programmatic way

The Camel transport provides a setCamelContext method that you can use to set the Camel context on the transport factory. For the factory to take effect, you need to register it with the CXF bus. Here is a full example.

import org.apache.camel.component.cxf.transport.CamelTransportFactory;
import org.apache.cxf.Bus;
import org.apache.cxf.BusFactory;
import org.apache.cxf.transport.ConduitInitiatorManager;
import org.apache.cxf.transport.DestinationFactoryManager;
...

BusFactory bf = BusFactory.newInstance();
Bus bus = bf.createBus();
CamelTransportFactory camelTransportFactory = new CamelTransportFactory();
// set up the CamelContext which will be used by the CamelTransportFactory
camelTransportFactory.setCamelContext(context);
// if you are using CXF higher than 2.4.x, set the bus on the factory
camelTransportFactory.setBus(bus);

// if you are using an older CXF, you need to register the factory with the
// ConduitInitiatorManager and DestinationFactoryManager as shown below
// register the conduit initiator
ConduitInitiatorManager cim = bus.getExtension(ConduitInitiatorManager.class);
cim.registerConduitInitiator(CamelTransportFactory.TRANSPORT_ID, camelTransportFactory);
// register the destination factory
DestinationFactoryManager dfm = bus.getExtension(DestinationFactoryManager.class);
dfm.registerDestinationFactory(CamelTransportFactory.TRANSPORT_ID, camelTransportFactory);
// set our bus as the default bus for cxf
BusFactory.setDefaultBus(bus);

Configure the destination and conduit with Spring

Namespace

The elements used to configure a Camel transport endpoint are defined in the namespace http://cxf.apache.org/transports/camel. It is commonly referred to using the prefix camel. In order to use the Camel transport configuration elements, you will need to add the lines shown below to the beans element of your endpoint's configuration file. In addition, you will need to add the configuration elements' namespace to the xsi:schemaLocation attribute.

Adding the Configuration Namespace

<beans ...
       xmlns:camel="http://cxf.apache.org/transports/camel"
       ...
       xsi:schemaLocation="...
                           http://cxf.apache.org/transports/camel
                           http://cxf.apache.org/transports/camel.xsd
                           ...">

The destination element

You configure a Camel transport server endpoint using the camel:destination element and its children. The camel:destination element takes a single attribute, name, that specifies the WSDL port element that corresponds to the endpoint. The value for the name attribute takes the form portQName.camel-destination. The example below shows the camel:destination element that would be used to add configuration for an endpoint that was specified by the WSDL fragment <port binding="widgetSOAPBinding" name="widgetSOAPPort"> if the endpoint's target namespace was http://widgets.widgetvendor.net.

camel:destination Element

...
<camel:destination name="{http://widgets.widgetvendor.net}widgetSOAPPort.camel-destination">
  <camelContext id="context" xmlns="http://activemq.apache.org/camel/schema/spring">
    <route>
      <from uri="direct:EndpointC" />
      <to uri="direct:EndpointD" />
    </route>
  </camelContext>
</camel:destination>

<!-- new feature added in Camel 2.11.x -->
<camel:destination name="{http://widgets.widgetvendor.net}widgetSOAPPort.camel-destination" camelContextId="context" />
...

The camel:destination element for Spring has a number of child elements that specify configuration information. They are described below.

Element

Description

camel-spring:camelContext

You can specify the camel context in the camel destination

camel:camelContextRef

The id of the Camel context that you want to inject into the camel destination

The conduit element

You configure a Camel transport client using the camel:conduit element and its children. The camel:conduit element takes a single attribute, name, that specifies the WSDL port element that corresponds to the endpoint. The value for the name attribute takes the form portQName.camel-conduit. For example, the code below shows the camel:conduit element that would be used to add configuration for an endpoint that was specified by the WSDL fragment <port binding="widgetSOAPBinding" name="widgetSOAPPort"> if the endpoint's target namespace was http://widgets.widgetvendor.net.

camel:conduit Element

...
<camelContext id="conduit_context" xmlns="http://activemq.apache.org/camel/schema/spring">
  <route>
    <from uri="direct:EndpointA" />
    <to uri="direct:EndpointB" />
  </route>
</camelContext>

<camel:conduit name="{http://widgets.widgetvendor.net}widgetSOAPPort.camel-conduit">
  <camel:camelContextRef>conduit_context</camel:camelContextRef>
</camel:conduit>

<!-- new feature added in Camel 2.11.x -->
<camel:conduit name="{http://widgets.widgetvendor.net}widgetSOAPPort.camel-conduit" camelContextId="conduit_context" />

<!-- you can also use a wildcard to specify the camel-conduit that you want to configure -->
<camel:conduit name="*.camel-conduit">
  ...
</camel:conduit>
...
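The name attribute for both elements follows the same portQName plus suffix form, so it can be derived mechanically. The helper below is only an illustration: TransportBeanNames is a hypothetical class, not part of Camel or CXF, and it relies on javax.xml.namespace.QName rendering a qualified name as {namespace}localPart.

```java
import javax.xml.namespace.QName;

class TransportBeanNames {

    // Builds the name used on camel:destination, e.g. {ns}port.camel-destination
    static String destinationName(String targetNamespace, String portName) {
        return new QName(targetNamespace, portName) + ".camel-destination";
    }

    // Builds the name used on camel:conduit, e.g. {ns}port.camel-conduit
    static String conduitName(String targetNamespace, String portName) {
        return new QName(targetNamespace, portName) + ".camel-conduit";
    }

    public static void main(String[] args) {
        String namespace = "http://widgets.widgetvendor.net";
        System.out.println(destinationName(namespace, "widgetSOAPPort"));
        System.out.println(conduitName(namespace, "widgetSOAPPort"));
    }
}
```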

The camel:conduit element has a number of child elements that specify configuration information. They are described below.

Element

Description

camel-spring:camelContext

You can specify the camel context in the camel conduit

camel:camelContextRef

The id of the Camel context that you want to inject into the camel conduit

Configure the destination and conduit with Blueprint

From Camel 2.11.x, the Camel Transport can be configured with Blueprint.

If you are using Blueprint, you should use the namespace http://cxf.apache.org/transports/camel/blueprint and import the schema as shown below.

Adding the Configuration Namespace for blueprint

<beans ...
       xmlns:camel="http://cxf.apache.org/transports/camel/blueprint"
       ...
       xsi:schemaLocation="...
                           http://cxf.apache.org/transports/camel/blueprint
                           http://cxf.apache.org/schemas/blueprint/camel.xsd
                           ...">

In Blueprint, camel:conduit and camel:destination have only one attribute, camelContextId. They do not support specifying the Camel context inside the conduit or destination element.

<camel:conduit id="*.camel-conduit" camelContextId="camel1" />
<camel:destination id="*.camel-destination" camelContextId="camel1" />

Example Using Camel as a load balancer for CXF

This example shows how to use the camel load balancing feature in CXF. You need to load the configuration file in CXF and publish the endpoints on the address "camel://direct:EndpointA" and "camel://direct:EndpointB"

{snippet:id=example|lang=xml|url=camel/trunk/examples/camel-example-cxf/src/main/resources/org/apache/camel/example/camel/transport/CamelDestination.xml}

Complete Howto and Example for attaching Camel to CXF

Better JMS Transport for CXF Webservice using Apache Camel 
