
Code Block
java
import org.apache.camel.rx.*;

// take some observable from somewhere
Observable<T> observable = ...;
ReactiveCamel rx = new ReactiveCamel(camelContext);

// let's send the events to a message queue
rx.sendTo(observable, "activemq:MyQueue");

Embedding some RxJava processing inside a Camel route

Sometimes you may wish to use a Camel route to consume messages, perform content-based routing and transformation, deal with data format marshalling and so forth, and then invoke some typesafe RxJava event processing from within the route.

One approach is to send messages from inside the Camel route to an endpoint, then use the toObservable() method on ReactiveCamel to bind that endpoint to an Observable<T>.

However, if you prefer to embed the RxJava processing of messages inside your route, there are two helper classes that wrap the RxJava processing as a Camel Processor, which can then be easily embedded into a Camel route.

You can use the ObservableMessage or ObservableBody classes, both of which have an abstract configure() method, much like RouteBuilder. In the configure() method you can then process the Observable<T> for the Camel Message or for the message body.

For example:

Code Block
java

    public class MyObservableBody extends ObservableBody<String> {
        public MyObservableBody() {
            super(String.class);
        }

        protected void configure(Observable<String> observable) {
            // let's process the messages using the RX API
            observable.map(new Func1<String, String>() {
                public String call(String body) {
                    return "Hello " + body;
                }
            }).subscribe(new Action1<String>() {
                public void call(String body) {
                    // template and resultEndpoint are assumed to be defined in the enclosing scope
                    template.sendBody(resultEndpoint, body);
                }
            });
        }
    }
    ...
    // now let's use this inside a route...
    from("seda:foo").process(new MyObservableBody());
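To make the pattern concrete without any Camel or RxJava dependency, here is a minimal, library-free sketch of the same idea: a processor that is configured once with a pipeline and then pushes each incoming body into it. The names MiniStream, BodyProcessor and ObservableBodySketch are hypothetical stand-ins invented for this illustration. They are not part of camel-rx or RxJava, and the sketch does not claim to reflect how ObservableBody is implemented internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

class ObservableBodySketch {

    /** Hypothetical minimal push-based stream; a stand-in for Observable<T>. */
    static final class MiniStream<T> {
        private final List<Consumer<T>> subscribers = new ArrayList<>();

        /** Registers a callback that receives every value pushed afterwards. */
        void subscribe(Consumer<T> subscriber) {
            subscribers.add(subscriber);
        }

        /** Returns a derived stream whose values are transformed by fn. */
        <R> MiniStream<R> map(Function<T, R> fn) {
            MiniStream<R> mapped = new MiniStream<>();
            subscribe(value -> mapped.push(fn.apply(value)));
            return mapped;
        }

        /** Delivers a value to all current subscribers. */
        void push(T value) {
            for (Consumer<T> subscriber : subscribers) {
                subscriber.accept(value);
            }
        }
    }

    /** Hypothetical stand-in for ObservableBody<T>: configure once, then push each body. */
    abstract static class BodyProcessor<T> {
        private final MiniStream<T> stream = new MiniStream<>();
        private boolean configured;

        /** Subclasses build their pipeline here, as in configure(Observable<T>). */
        protected abstract void configure(MiniStream<T> stream);

        /** Called once per message, the way a route would invoke a Processor. */
        public void process(T body) {
            if (!configured) {
                configure(stream);
                configured = true;
            }
            stream.push(body);
        }
    }

    /** Sends two bodies through a "Hello " mapping and returns what the subscriber saw. */
    static List<String> demo() {
        List<String> results = new ArrayList<>();
        BodyProcessor<String> processor = new BodyProcessor<String>() {
            @Override
            protected void configure(MiniStream<String> stream) {
                stream.map(body -> "Hello " + body).subscribe(results::add);
            }
        };
        processor.process("James");
        processor.process("World");
        return results;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The point of the sketch is the split of responsibilities: the pipeline is declared once in configure(), while the route only ever calls process() per message. In a real route the subscriber would typically forward results to another endpoint rather than collect them in a list.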

Another approach, if you are consuming directly from Camel using the Bean Integration, is to use an RxJava Subject directly:

Code Block
java

import org.apache.camel.Consume;
import rx.subjects.Subject;

public class MyThing {
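    // assumes an RxJava version where Subject.create() is available;
    // in RxJava 1.x, Subject is abstract and PublishSubject.create() is the usual factory
    private final Subject<String> observable = Subject.create();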
  
    public MyThing() {
         // now process the observable somehow....
    }

    @Consume(uri="activemq:myqueue")
    public void onMessageBody(String body) {
        observable.onNext(body);
    }
}

That said, using the toObservable() method on ReactiveCamel is probably a little simpler.