Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Migrated to Confluence 5.3

...

For a more in depth background on RX check out the RxJava wiki on Observable and the Reactive pattern or the Microsoft RX documentation.

...

You can create an Observable<Message> from any endpoint using the ReactiveCamel helper class and the toObservable() method.

Code Block
java
java

import org.apache.camel.rx.*;

ReactiveCamel rx = new ReactiveCamel(camelContext);
Observable<Message> observable = rx.toObservable("activemq:MyMessages");

// we can now call filter/map/concat etc
filtered = observable.filter(m -> m.getHeader("foo") != null).map(m -> "Hello " + m.getBody()); 

If you know the type of the message payload (its body), you can use an overloaded version of toObservable() to pass in the class and get a typesafe Observable<T> back:

Code Block
java
java

import org.apache.camel.rx.*;

ReactiveCamel rx = new ReactiveCamel(camelContext);
Observable<Order> observable = rx.toObservable("seda:orders", Order.class);

// now lets filter and map using Java 7
Observable<String> largeOrderIds = observable.filter(new Func1<Order, Boolean>() {
    public Boolean call(Order order) {
        return order.getAmount() > 100.0;
    }
}).map(new Func1<Order, String>() {
    public String call(Order order) {
        return order.getId();
    }
});

...

If you have an Observable<T> from some other library; or have created one from a Future<T> using RxJava and you wish to send the events on the observable to a Camel endpoint you can use the sendTo() method on ReactiveCamel:

Code Block
java
java

import org.apache.camel.rx.*;

// take some observable from somewhere
Observable<T> observable = ...;
ReactiveCamel rx = new ReactiveCamel(camelContext);

// lets send the events to a message queue
rx.sendTo(observable, "activemq:MyQueue");

...

You can use the ObservableMessage or ObservableBody classes which both have an abstract configure() method like RouteBuilder. In the configure method you can then process the Observable<T> for the Camel Message or the message body.

e.g.

Code Block
java
java

    public class MyObservableBody extends ObservableBody<String> {
        public MyObservableBody() {
            super(String.class);
        }

        protected void configure(Observable<String> observable) {
            // lets process the messages using the RX API
            observable.map(new Func1<String, String>() {
                public String call(String body) {
                    return "Hello " + body;
                }
            }).subscribe(new Action1<String>() {
                public void call(String body) {
                    template.sendBody(resultEndpoint, body);
                }
            });
        }
    }
    ...
    // now lets use this inside a route...
    from("seda:foo").process(new MyObservableBody());

Another approach, if you are consuming directly from Camel using the Bean Integration is to just use the RxJava Subject directly:

Code Block
java
java

import rx.subjects.Subject;

public class MyThing {
    private final Subject<String> observable = Subject.create();
  
    public MyThing() {
         // now process the observable somehow....
    }

    @Consume(uri="activemq:myqueue")
    public void onMessageBody(String body) {
      subject.onNext(body);
    }
}

...