...
For more in-depth background on Rx, see the RxJava wiki on Observable and the Reactive pattern, or the Microsoft Rx documentation.
...
You can create an Observable<Message> from any endpoint using the ReactiveCamel helper class and the toObservable() method.
Code Block | ||||
---|---|---|---|---|
| ||||
import org.apache.camel.Message;
import org.apache.camel.rx.*;
import rx.Observable;
ReactiveCamel rx = new ReactiveCamel(camelContext);
Observable<Message> observable = rx.toObservable("activemq:MyMessages");
// we can now call filter/map/concat etc. (Java 8 lambdas shown here)
Observable<String> filtered = observable.filter(m -> m.getHeader("foo") != null).map(m -> "Hello " + m.getBody());
|
If you know the type of the message payload (its body), you can use an overloaded version of toObservable() that takes the class and returns a type-safe Observable<T>:
Code Block | ||||
---|---|---|---|---|
| ||||
import org.apache.camel.rx.*;
import rx.Observable;
ReactiveCamel rx = new ReactiveCamel(camelContext);
Observable<Order> observable = rx.toObservable("seda:orders", Order.class);
// now let's filter and map using Java 7 anonymous classes
Observable<String> largeOrderIds = observable.filter(new Func1<Order, Boolean>() {
    public Boolean call(Order order) {
        return order.getAmount() > 100.0;
    }
}).map(new Func1<Order, String>() {
    public String call(Order order) {
        return order.getId();
    }
});
|
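With Java 8, the same filter-then-map logic can be written with lambdas instead of anonymous classes. The sketch below is only an illustration of that logic: so that it runs without camel-rx or RxJava on the classpath, it applies the predicate and the mapping to a plain List through java.util.stream rather than to an Observable. The `LargeOrders` class and the `Order` type with its `getId()` and `getAmount()` accessors are hypothetical stand-ins, not part of Camel.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class LargeOrders {
    // Hypothetical stand-in for the Order payload type used in the example above.
    public static final class Order {
        private final String id;
        private final double amount;

        public Order(String id, double amount) {
            this.id = id;
            this.amount = amount;
        }

        public String getId() { return id; }
        public double getAmount() { return amount; }
    }

    // Same predicate and mapping as the anonymous classes above, written as
    // a lambda and a method reference (assumption: applied to a List here).
    public static List<String> largeOrderIds(List<Order> orders) {
        return orders.stream()
                .filter(order -> order.getAmount() > 100.0)
                .map(Order::getId)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Order> orders = Arrays.asList(
                new Order("A-1", 50.0),
                new Order("A-2", 250.0),
                new Order("A-3", 100.0));
        System.out.println(largeOrderIds(orders)); // prints [A-2]
    }
}
```

Only the order with an amount strictly greater than 100.0 passes the filter, so an order of exactly 100.0 is dropped.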
...
If you have an Observable<T> from some other library, or have created one from a Future<T> using RxJava, and you wish to send the events on the observable to a Camel endpoint, you can use the sendTo() method on ReactiveCamel:
Code Block | ||||
---|---|---|---|---|
| ||||
import org.apache.camel.rx.*;
import rx.Observable;
// take some observable from somewhere
Observable<T> observable = ...;
ReactiveCamel rx = new ReactiveCamel(camelContext);
// let's send the events to a message queue
rx.sendTo(observable, "activemq:MyQueue");
|
...
You can use the ObservableMessage or ObservableBody classes, which both have an abstract configure() method, as RouteBuilder does. In configure() you can then process the Observable<T> for the Camel Message or the message body. For example:
Code Block | ||||
---|---|---|---|---|
| ||||
public class MyObservableBody extends ObservableBody<String> {
    public MyObservableBody() {
        super(String.class);
    }
    protected void configure(Observable<String> observable) {
        // let's process the messages using the Rx API
        observable.map(new Func1<String, String>() {
            public String call(String body) {
                return "Hello " + body;
            }
        }).subscribe(new Action1<String>() {
            public void call(String body) {
                // template and resultEndpoint are assumed to be defined elsewhere
                template.sendBody(resultEndpoint, body);
            }
        });
    }
}
...
// now let's use this inside a route...
from("seda:foo").process(new MyObservableBody());
|
Another approach, if you are consuming directly from Camel using the Bean Integration, is to use an RxJava Subject directly:
Code Block | ||||
---|---|---|---|---|
| ||||
import org.apache.camel.Consume;
import rx.subjects.Subject;
public class MyThing {
    private final Subject<String> subject = Subject.create();
    public MyThing() {
        // now process the subject (it is also an Observable) somehow....
    }
    @Consume(uri="activemq:myqueue")
    public void onMessageBody(String body) {
        subject.onNext(body);
    }
}
|
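To make the role of the Subject more concrete, here is a minimal sketch of the pattern using only the JDK. `SimpleSubject` and `MyThingSketch` are hypothetical, simplified stand-ins and not RxJava's or Camel's implementation: the constructor subscribes some processing to the subject, and each call to onMessageBody() pushes the body to every subscriber. In a real bean the @Consume annotation would cause Camel to call onMessageBody() for each message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

public class SubjectSketch {
    // Hypothetical stand-in for a Subject: it accepts values via onNext()
    // and forwards each one to the subscribers registered so far.
    public static final class SimpleSubject<T> {
        private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();

        public void subscribe(Consumer<T> subscriber) {
            subscribers.add(subscriber);
        }

        public void onNext(T value) {
            for (Consumer<T> subscriber : subscribers) {
                subscriber.accept(value);
            }
        }
    }

    // Plays the role of MyThing: processing is wired up in the constructor,
    // and onMessageBody() feeds incoming bodies into the subject.
    public static final class MyThingSketch {
        private final SimpleSubject<String> subject = new SimpleSubject<>();
        private final List<String> results = new ArrayList<>();

        public MyThingSketch() {
            subject.subscribe(body -> results.add("Hello " + body));
        }

        // In the real bean this method would carry @Consume(uri="activemq:myqueue").
        public void onMessageBody(String body) {
            subject.onNext(body);
        }

        public List<String> getResults() {
            return results;
        }
    }

    public static void main(String[] args) {
        MyThingSketch thing = new MyThingSketch();
        thing.onMessageBody("world");
        thing.onMessageBody("Camel");
        System.out.println(thing.getResults()); // prints [Hello world, Hello Camel]
    }
}
```

The sketch omits everything a real Subject adds on top of this, such as completion and error signals and unsubscription.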
...