Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: spelling

...

  • Processing routes that are composed fully of asynchronous processors do not use up threads waiting for processors to complete on blocking calls. This can increase the scalability of your system by reducing the number of threads needed to process the same workload.
  • Processing routes can be broken up into SEDA processing stages where different thread pools can process the different stages. This means that your routes can be processed concurrently.

Disadvantages:

  • Implementing asynchronous processors is more complex than implementing the synchronous versions.

...

We recommend that processors and components be implemented the more simple synchronous APIs unless you identify a performance of scaleability scalability requirement that dictates otherwise. A Processor whose process() method blocks for a long time would be good candidates for being converted into an asynchronous processor.

...

The AsyncProcessor defines a single process() method which is very similar to it's synchronous Processor.process() brethren. Here are the differences:

  • A non-null AsyncCallback MUST be supplied which will be notified when the exchange processing is completed.
  • It MUST not throw any exceptions that occurred while processing the exchange. Any such exceptions must be stored on the exchange's Exception property.
  • It MUST know if it will complete the processing synchronously or asynchronously. The method will return true if it does complete synchronously, otherwise it returns false.
  • When the processor has completed processing the exchange, it must call the callback.done(boolean sync) method. The sync parameter MUST match the value return returned by the process() method.

Implementing Processors that Use the AsyncProcessor API

All processors, even synchronous processors that do not implement that the AsyncProcessor interface, can be coerced to implement the AsyncProcessor interface. This is usually done when you are implementing a Camel component consumer that supports asynchronous completion of the exchanges that it is pushing through the Camel routes. Consumers are provided a Processor object when created. All Processor object can be cast coerced to a AsyncProcessor using the following API:

...

Normally, the the process call is is passed in an inline inner AsyncCallback class AsyncCallback instance which can reference the exchange object that was declared final. This allows it to to finish up any post processing that is needed when the called processor is done processing the exchange. See below for an example.

Code Block
java
java
final Exchange exchange = ...
AsyncProcessor asyncProcessor = ...
asyncProcessor.process(exchange, new AsyncCallback() {
    public void done(boolean sync) {
        if (exchange.isFailed()) {
            ... // Dodo failure processing.. perhaps rollback etc.
        } else {
            ... // processing completed successfully, finish up 
                // perhaps commit etc.
        }
    }
});

Asynchronous Route Sequence Scenarios

Now that we have understand understood the interface contract of the AsyncProcessor, and have seen how to make use of it when calling processors, lets looks a what the thread model/sequence scenarios will look like for some sample routes.

The Jetty component's consumers support async processing by using continuations. Suffice to say it can take an a http request and pass it to a camel route for async processing. If the processing is indeed async, it uses as Jetty continuation so that the http request is 'parked' and the thread is released. Once the camel route finishes processing the request, the jetty component uses the AsyncCallback to tell Jetty to 'un-park' the request. Jetty un-parks the request, the http response returned using the result of the exchange processing.

Notice that the jetty continuations feature is only used "If the processing is indeed async". This is why AsyncProcessor.process() implemenations implementations MUST accurately report if request is completed synchronously or not.

The jhc component's producer allows you to make HTTP requests and implement the AsyncProcessor interface. A route the that uses both the jetty asynchronous consumer and the jhc asynchronous producer will be a fully asynchronous route and has some nice attributes that can be seen if we take a look at a sequence diagram of the processing route. For the route:

...

The diagram simplifies things by making it looks like processors implement the AsyncCallback interface when in reality the AsyncCallback interfaces are inline inner classes, but it illustrates the processing flow and shows how 2 separate threads are used to complete the processing of the original http request. The first thread is synchronous up until processing hits the jhc producer which issues the Http http request. It then reports that the exchange processing will complete async since it will use an a NIO to complete getting the response back. Once the jhc component has received a full response it use uses AsyncCallback.done() method to notify the caller. These callback notifications continue up until it reaches the original jetty consumer which then un-parks the http request and completes it by providing the response.

...

Lets say we have 2 custom processors, MyValidator and MyTransformation, both of which are synchronous processors. Lets say we want to load file from the data/in directory validate them with the MyValidator() processor, Transform them into JPA java objects using MyTransformation and then insert them into the database using the jpa JPA component. Lets say that the transformation process takes quite a bit of time and we want to allocate 20 threads to do parallel transformations of the input files. The solution is to make use of the thread processor. The thread is AsyncProcessor that forces subsequent processing in asynchronous thread from a thread pool.

...

Generally speaking you get better throughput processing when you process things synchronously. This is due to the fact that starting up an asynchronous thread and and doing a context switch to it adds a little bit of of overhead. So it is generally encouraged that AsyncProcessors do as much work as they can synchronously. When they get to a step that would block for a long time, at that point they should return from the process call and let the caller know that it will be completing the call asynchronously.