...
The camel-rx library provides Camel support for the Reactive Extensions (RX) using the RxJava library, so that:
- Camel users can use the RxJava API for processing messages on endpoints using a typesafe composable API
- RxJava users get to use all of the Camel transports and protocols from within the RxJava API
Background on RX
You can think of RX as providing an API similar to the Java 8 / Groovy / Scala collection APIs (methods like filter, forEach, map, flatMap etc.), but one that operates on a stream of events rather than on a collection. In other words, RX is like working with asynchronous, push-based collections rather than the traditional synchronous, pull-based collections.
In RX, an Observable<T> behaves much like a Collection<T> in Java 8, so you can filter/map/concat and so forth. The Observable<T> then acts as a typesafe composable API for working with asynchronous events.
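To make the pull versus push distinction concrete, here is a minimal sketch in plain Java. It does not use RxJava: `MiniObservable` and `EventSource` are hypothetical types invented for this illustration, and they deliver events synchronously for simplicity, whereas a real RX source is typically asynchronous. The point is only that the same filter/map pipeline can be applied either to a collection that already holds its elements or to a source that pushes events as they arrive.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class PushVsPull {

    /** Hypothetical, minimal push-based stream; NOT the RxJava Observable API. */
    interface MiniObservable<T> {
        // The source calls the observer once for every event it pushes.
        void subscribe(Consumer<? super T> observer);

        // Forward only the events that match the predicate.
        default MiniObservable<T> filter(Predicate<? super T> predicate) {
            return observer -> subscribe(item -> {
                if (predicate.test(item)) {
                    observer.accept(item);
                }
            });
        }

        // Transform each event before forwarding it.
        default <R> MiniObservable<R> map(Function<? super T, ? extends R> mapper) {
            return observer -> subscribe(item -> observer.accept(mapper.apply(item)));
        }
    }

    /** Hypothetical event source that pushes each published event to its observers. */
    static class EventSource<T> implements MiniObservable<T> {
        private final List<Consumer<? super T>> observers = new ArrayList<>();

        @Override
        public void subscribe(Consumer<? super T> observer) {
            observers.add(observer);
        }

        void publish(T event) {
            for (Consumer<? super T> observer : observers) {
                observer.accept(event);
            }
        }
    }

    // Pull-based: the caller iterates a collection that already holds every element.
    public static List<String> pullBased(List<String> bodies) {
        return bodies.stream()
                .filter(body -> !body.isEmpty())
                .map(body -> "Hello " + body)
                .collect(Collectors.toList());
    }

    // Push-based: the pipeline is wired up first, and events flow through it later.
    public static List<String> pushBased(List<String> bodies) {
        EventSource<String> source = new EventSource<>();
        List<String> received = new ArrayList<>();
        source.filter(body -> !body.isEmpty())
              .map(body -> "Hello " + body)
              .subscribe(received::add);
        for (String body : bodies) {
            source.publish(body); // each event is pushed through filter and map
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> bodies = Arrays.asList("Camel", "", "RX");
        System.out.println(pullBased(bodies));
        System.out.println(pushBased(bodies));
    }
}
```

Both methods yield the same list for the same input; the difference is who drives the iteration. In the pull version the consumer walks the collection, while in the push version the source decides when each event is delivered.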
Observing events on Camel endpoints
You can then create an Observable<Message> from any endpoint using the ReactiveCamel helper class and the toObservable() method.
```java
import org.apache.camel.Message;
import org.apache.camel.rx.*;
import rx.Observable;

ReactiveCamel rx = new ReactiveCamel(camelContext);
Observable<Message> observable = rx.toObservable("activemq:MyMessages");

// we can now call filter/map/concat etc
Observable<String> filtered = observable
    .filter(m -> m.getHeader("foo") != null)
    .map(m -> "Hello " + m.getBody());
```
...
If you have an Observable<T> from some other library, or have created one from a Future<T> using RxJava, and you wish to send the events on the observable to a Camel endpoint, you can use the sendTo() method on ReactiveCamel:
```java
import org.apache.camel.rx.*;

// take some observable from somewhere
Observable<T> observable = ...;
ReactiveCamel rx = new ReactiveCamel(camelContext);

// let's send the events to a message queue
rx.sendTo(observable, "activemq:MyQueue");
```