RxJava2 Flowable and Observable support
Introduction
RxJava 2 Flowable and Observable are supported on the client and the server side starting from CXF 3.2.0.
Two dependencies are required, given here as groupId/artifactId/version: org.apache.cxf/cxf-rt-rs-extension-rx/3.2.0 and io.reactivex.rxjava2/rxjava/2.1.3.
Client
On the client side, use FlowableRxInvoker. org.apache.cxf.jaxrs.rx2.client.ObservableRxInvoker can be used instead if needed.
Server
As a method return value
A resource method simply returns io.reactivex.Flowable, and the runtime makes sure the response is finalized once the Flowable flow is complete.
The only requirement is to register a custom JAX-RS invoker, org.apache.cxf.jaxrs.rx2.server.FlowableInvoker. It does everything the default JAXRSInvoker does and additionally checks whether a Flowable is returned; if so, it links the Flowable internally with the JAX-RS AsyncResponse.
If needed, io.reactivex.Observable can be returned instead, with org.apache.cxf.jaxrs.rx2.server.ObservableInvoker registered.
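The idea of finalizing a response only when the flow completes can be modelled with JDK classes alone. The sketch below is a conceptual illustration, not CXF's implementation: java.util.concurrent.Flow stands in for Flowable, a CompletableFuture stands in for the JAX-RS AsyncResponse, and the names FlowToResponseSketch and CollectingSubscriber are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Conceptual sketch only (assumption): CompletableFuture plays the role of
// AsyncResponse, and the JDK Flow API plays the role of an RxJava Flowable.
final class FlowToResponseSketch {

    /** Collects every item and completes the "response" when the flow ends. */
    static final class CollectingSubscriber<T> implements Flow.Subscriber<T> {
        private final List<T> items = new ArrayList<>();
        private final CompletableFuture<List<T>> response;

        CollectingSubscriber(CompletableFuture<List<T>> response) {
            this.response = response;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE); // no back-pressure in this sketch
        }

        @Override
        public void onNext(T item) {
            items.add(item);
        }

        @Override
        public void onError(Throwable error) {
            response.completeExceptionally(error); // analogous to resuming with an error
        }

        @Override
        public void onComplete() {
            response.complete(items); // analogous to resuming with the collected entity
        }
    }

    /** Publishes two greetings and waits for the finalized "response". */
    static List<String> run() {
        CompletableFuture<List<String>> response = new CompletableFuture<>();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new CollectingSubscriber<>(response));
            publisher.submit("Hello");
            publisher.submit("Ciao");
        } // close() signals onComplete once the buffered items are delivered
        return response.orTimeout(5, TimeUnit.SECONDS).join();
    }
}
```

Calling FlowToResponseSketch.run() returns the two greetings in submission order. The caller sees nothing until onComplete fires, which mirrors the statement above that the response is finalized once the flow is complete.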
Combining Flowable with AsyncResponse
JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
sf.setInvoker(new FlowableInvoker());
sf.setProvider(new JacksonJsonProvider());
StreamingResponseProvider<HelloWorldBean> streamProvider = new StreamingResponseProvider<HelloWorldBean>();
streamProvider.setProduceMediaTypes(Collections.singletonList("application/json"));
sf.setProvider(streamProvider);
sf.setResourceClasses(RxJava2FlowableService.class);
sf.setResourceProvider(RxJava2FlowableService.class,
                       new SingletonResourceProvider(new RxJava2FlowableService(), true));
sf.setAddress("http://localhost:" + PORT + "/");
server = sf.create();
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;

import org.apache.cxf.jaxrs.rx2.server.JsonStreamingAsyncSubscriber;

import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

// return a JSON array, write to the output stream as soon as the next JSON object becomes available
@GET
@Produces("application/json")
@Path("textJsonImplicitListAsyncStream")
public void getJsonImplicitListStreamingAsync(@Suspended AsyncResponse ar) {
    Flowable.just("Hello", "Ciao")
        .map(s -> new HelloWorldBean(s))
        .subscribeOn(Schedulers.computation())
        .subscribe(new JsonStreamingAsyncSubscriber<HelloWorldBean>(ar));
}
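The comment in the example above describes writing a JSON array piece by piece as objects become available. A minimal JDK-only sketch of that idea follows. It is an illustration under stated assumptions, not the code of JsonStreamingAsyncSubscriber: a StringBuilder stands in for the response output stream, the items are assumed to be already-serialized JSON objects with a placeholder "text" field, and the class names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Conceptual sketch only (assumption): a StringBuilder replaces the output stream,
// and each item is a JSON object that has already been serialized to a String.
final class JsonArrayStreamingSketch {

    static final class JsonArraySubscriber implements Flow.Subscriber<String> {
        private final StringBuilder out = new StringBuilder();
        private final CompletableFuture<String> done = new CompletableFuture<>();
        private Flow.Subscription subscription;
        private boolean first = true;

        CompletableFuture<String> result() {
            return done;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            out.append('[');          // open the array before any item arrives
            subscription.request(1);  // pull one item at a time
        }

        @Override
        public void onNext(String jsonObject) {
            if (!first) {
                out.append(',');      // separator between consecutive objects
            }
            first = false;
            out.append(jsonObject);   // written as soon as the object is available
            subscription.request(1);
        }

        @Override
        public void onError(Throwable error) {
            done.completeExceptionally(error);
        }

        @Override
        public void onComplete() {
            out.append(']');          // close the array when the flow ends
            done.complete(out.toString());
        }
    }

    /** Streams two placeholder objects and returns the assembled JSON array. */
    static String run() {
        JsonArraySubscriber subscriber = new JsonArraySubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            publisher.submit("{\"text\":\"Hello\"}");
            publisher.submit("{\"text\":\"Ciao\"}");
        } // close() signals onComplete once the buffered items are delivered
        return subscriber.result().orTimeout(5, TimeUnit.SECONDS).join();
    }
}
```

The array brackets and separators are emitted around the items rather than by serializing a complete list, which is what allows output to start before the last object exists.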
RxJava1 rx.Observable support
Introduction
RxJava 1 rx.Observable is supported on the client and the server side starting from CXF 3.2.0.
Two dependencies are required, given here as groupId/artifactId/version: org.apache.cxf/cxf-rt-rs-extension-rx/3.2.0 and io.reactivex/rxjava/1.3.0.
Client
import java.util.Collections;
import java.util.concurrent.ExecutionException;

import javax.ws.rs.NotFoundException;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Invocation;

import org.apache.cxf.jaxrs.client.WebClient;
import org.apache.cxf.jaxrs.rx.client.ObservableRxInvoker;
import org.apache.cxf.jaxrs.rx.client.ObservableRxInvokerProvider;

import rx.Observable;

@Test
public void testObservableWithWebClient() throws Exception {
    String address = "http://localhost:" + PORT + "/observable/textAsync";
    WebClient wc = WebClient.create(address,
        Collections.singletonList(new ObservableRxInvokerProvider()));
    Observable<String> obs = wc.accept("text/plain")
        .rx(ObservableRxInvoker.class)
        .get(String.class);
    // map() returns a new Observable, so keep the result and subscribe to it
    Observable<String> doubled = obs.map(s -> s + s);

    Thread.sleep(3000);

    doubled.subscribe(s -> assertDuplicateResponse(s));
}

@Test
public void testObservableJaxrs21With404Response() throws Exception {
    String address = "http://localhost:" + PORT + "/observable/textAsync404";
    Invocation.Builder b = ClientBuilder.newClient()
        .register(new ObservableRxInvokerProvider())
        .target(address)
        .request();
    // the 404 is delivered to the error callback, wrapped in an ExecutionException
    b.rx(ObservableRxInvoker.class).get(String.class).subscribe(
        s -> {
            fail("Exception expected");
        },
        t -> validateT((ExecutionException)t));
}

private void validateT(ExecutionException t) {
    assertTrue(t.getCause() instanceof NotFoundException);
}

private void assertDuplicateResponse(String s) {
    assertEquals("Hello, world!Hello, world!", s);
}
Server
As a method return value
A resource method simply returns an Observable, and the runtime makes sure the response is finalized once the Observable flow is complete.
The only requirement is to register a custom JAX-RS invoker, org.apache.cxf.jaxrs.rx.server.ObservableInvoker. It does everything the default JAXRSInvoker does and additionally checks whether an Observable is returned; if so, it links the Observable internally with the JAX-RS AsyncResponse.