...
Code Block |
---|
|
import org.apache.camel.rx.*;
ReactiveCamel rx = new ReactiveCamel(camelContext);
Observable<Message> observable = rx.toObservable("activemq:MyMessages");
// we can now call filter/map/concat etc
filtered = observable.filter(m -> m.getHeader("foo") != null).map(m -> "Hello " + m.getBody());
|
If you know the type of the body of the message, you can use an overloaded version of toObservable() to pass in the class and get a typesafe Observable<T> back:
Code Block |
---|
|
import org.apache.camel.rx.*;
ReactiveCamel rx = new ReactiveCamel(camelContext);
Observable<Order> observable = rx.toObservable("seda:orders", Order.class);
// now lets filter and map using Java 7
Observable<String> largeOrderIds = observable.filter(new Func1<Order, Boolean>() {
public Boolean call(Order order) {
return order.getAmount() > 100.0;
}
}).map(new Func1<Order, String>() {
public String call(Order order) {
return order.getId();
}
});
|