Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

The camel-rx library provides Camel support for the Reactive Extensions (RX) using the RxJava library. So Camel users you can use the RxJava API for processing messages on endpoints using a typesafe composable API; for existing RxJava users you get to use all of the Camel transports and protocols with your RxJava code.

You can think of RX as providing an API similar to Java 8 / Groovy / Scala like API collections (methods like filter, forEach, map, flatMap etc) - but which operates on a stream of events rather than a collection. So you could think of RX as like working with asynchronous push based collections (rather than the traditional synchronous pull based collections).

...

You can then create an Observable<Message> from any endpoint using the ReactiveCamel helper class.

Observing events on Camel endpoints

Code Block
java
java
import org.apache.camel.rx.*;

ReactiveCamel rx = new ReactiveCamel(camelContext);
Observable<Message> observable = rx.toObservable("activemq:MyMessages");

// we can now call filter/map/concat etc
filtered = observable.filter(m -> m.getHeader("foo") != null).map(m -> "Hello " + m.getBody()); 

...

Code Block
java
java
import org.apache.camel.rx.*;

ReactiveCamel rx = new ReactiveCamel(camelContext);
Observable<Order> observable = rx.toObservable("seda:orders", Order.class);

// now lets filter and map using Java 7
Observable<String> largeOrderIds = observable.filter(new Func1<Order, Boolean>() {
    public Boolean call(Order order) {
        return order.getAmount() > 100.0;
    }
}).map(new Func1<Order, String>() {
    public String call(Order order) {
        return order.getId();
    }
});

Sending Observable<T> events to Camel endpoints

If you have an Observable<T> from some other library; or have created one from a Future<T> using RxJava and you wish to send the events on the observable to a Camel endpoint you can use the following code:

Code Block
java
java

import org.apache.camel.rx.*;

// take some observable from somewhere
Observable<T> observable = ...;
ReactiveCamel rx = new ReactiveCamel(camelContext);

// lets send the events to a message queue
rx.sendTo(observable, "activemq:MyQueue");