Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

The camel-rx library provides Camel support for the Reactive Extensions (RX) using the RxJava library . So so that:

  • Camel users

...

  • can use the RxJava API for processing messages on endpoints using a typesafe composable API

...

...

...

Background on RX

You can think of RX as providing an API similar to Java 8 / Groovy / Scala like API collections (methods like filter, forEach, map, flatMap etc) - but which operates on a stream of events rather than a collection. So you could think of RX as like working with asynchronous push based collections (rather than the traditional synchronous pull based collections).

In RX, if you have an Observable<T> which behaves quite like a Collection<T> in Java 8 so you can filter/map/concat and so forth. The Observable<T> then acts as a typesafe composable API for working with asynchronous events.

Observing events on Camel endpoints

You can then create an Observable<Message> from any endpoint using the ReactiveCamel helper class and the toObservable() method.

Observing events on Camel endpoints

Code Block
java
java
import org.apache.camel.rx.*;

ReactiveCamel rx = new ReactiveCamel(camelContext);
Observable<Message> observable = rx.toObservable("activemq:MyMessages");

// we can now call filter/map/concat etc
filtered = observable.filter(m -> m.getHeader("foo") != null).map(m -> "Hello " + m.getBody()); 

...

If you have an Observable<T> from some other library; or have created one from a Future<T> using RxJava and you wish to send the events on the observable to a Camel endpoint you can use the following code sendTo() method on ReactiveCamel:

Code Block
java
java
import org.apache.camel.rx.*;

// take some observable from somewhere
Observable<T> observable = ...;
ReactiveCamel rx = new ReactiveCamel(camelContext);

// lets send the events to a message queue
rx.sendTo(observable, "activemq:MyQueue");