Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

RxJava 2 Flowable and Observable are supported on the client and the server side starting from CXF 3.2.0. Starting with CXF 3.2.3, RxJava and RxJava2 support was split into separate modules; the Maven coordinates below reflect the state as of CXF 3.2.3.

org.apache.cxf/cxf-rt-rs-extension-rx2/3.2.3 and io.reactivex.rxjava2/rxjava/2.1.3 dependencies are required.

...

Code Block
languagejava
// Configure and start a JAX-RS server that exposes the RxJava2 Flowable-based service.
JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
sf.setProvider(new JacksonJsonProvider());
// NOTE(review): the customizer is assumed to take the place of registering an invoker and a
// StreamingResponseProvider by hand - confirm against the CXF 3.2.3 API.
new ReactiveIOCustomizer().customize(sf); // use a JAXRSServerFactoryCustomizationExtension to customize the server
sf.setResourceClasses(RxJava2FlowableService.class);
sf.setResourceProvider(RxJava2FlowableService.class,
                       new SingletonResourceProvider(new RxJava2FlowableService(), true));
sf.setAddress("http://localhost:" + PORT + "/");
server = sf.create();

...

Code Block
languagejava
import org.apache.cxf.jaxrs.rx2.server.JsonStreamingAsyncSubscriber;
import io.reactive.Flowable;
import io.reactivex.schedulers.Schedulers;

    // return a JSON array, write to the output stream as soon as the next JSON object becomes available
    @GET
    @Produces("application/json")
    @Path("textJsonImplicitListAsyncStream")
    public void getJsonImplicitListStreamingAsync(@Suspended AsyncResponse ar) {
        Flowable.just("Hello", "Ciao")
            .map(s -> new HelloWorldBean(s))
            .subscribeOn(Schedulers.computation())
            .subscribe(new JsonStreamingAsyncSubscriber<HelloWorldBean>(ar));
    }

...

org.apache.cxf/cxf-rt-rs-extension-rx/3.2.3 and io.reactivex/rxjava/1.3.0 dependencies are required.

...

Code Block
languagejava
import org.apache.cxf.jaxrs.rx.client.ObservableRxInvoker;
import org.apache.cxf.jaxrs.rx.client.ObservableRxInvokerProvider;
import rx.Observable;
 
    /**
     * Requests a plain-text resource through {@code WebClient} as an {@code Observable},
     * duplicates the received text with {@code map}, and checks the duplicated value.
     *
     * @throws Exception if the wait before subscribing is interrupted
     */
    @Test
    public void testObservableWithWebClient() throws Exception {
        String address = "http://localhost:" + PORT + "/observable/textAsync";
        WebClient wc = WebClient.create(address,
                                        Collections.singletonList(new ObservableRxInvokerProvider()));
        Observable<String> obs = wc.accept("text/plain")
            .rx(ObservableRxInvoker.class)
            .get(String.class);
        // map() returns a new Observable and leaves the source untouched, so the result
        // has to be kept and subscribed to; otherwise the duplication is never applied.
        Observable<String> doubled = obs.map(s -> s + s);

        Thread.sleep(3000);

        doubled.subscribe(s -> assertDuplicateResponse(s));
    }
    /**
     * Requests a resource path ending in {@code textAsync404} through the JAX-RS 2.1 client API
     * and expects the Observable to signal an error instead of delivering a value.
     */
    @Test
    public void testObservableJaxrs21With404Response() throws Exception {
        Invocation.Builder builder = ClientBuilder.newClient()
            .register(new ObservableRxInvokerProvider())
            .target("http://localhost:" + PORT + "/observable/textAsync404")
            .request();
        Observable<String> response = builder.rx(ObservableRxInvoker.class).get(String.class);
        response.subscribe(
            body -> fail("Exception expected"),
            error -> validateT((ExecutionException)error));
    }

    /** Checks that the given failure was caused by a {@code NotFoundException}. */
    private void validateT(ExecutionException t) {
        final Throwable cause = t.getCause();
        assertTrue(cause instanceof NotFoundException);
    }
    /** Checks that the received text is the greeting repeated twice. */
    private void assertDuplicateResponse(String s) {
        final String expected = "Hello, world!Hello, world!";
        assertEquals(expected, s);
    }

...

The only requirement is that one has to register a custom JAX-RS invoker, org.apache.cxf.jaxrs.rx.server.ObservableInvoker. It does everything the default JAXRSInvoker does and additionally checks whether an Observable is returned; if so, it links it internally with the JAX-RS AsyncResponse.

Code Block
languagejava
// Configure and start a JAX-RS server that exposes the RxJava Observable-based service.
JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
// Register the Jackson JSON provider with the factory.
sf.setProvider(new JacksonJsonProvider());
new ObservableCustomizer().customize(sf); // use a JAXRSServerFactoryCustomizationExtension to customize the server
// Declare the resource class and supply one pre-built instance of it through a
// SingletonResourceProvider. NOTE(review): the meaning of the trailing 'true' flag is not
// visible in this snippet - confirm against the SingletonResourceProvider API.
sf.setResourceClasses(RxJavaObservableService.class);
sf.setResourceProvider(RxJavaObservableService.class,
                       new SingletonResourceProvider(new RxJavaObservableService(), true));
// PORT is defined outside this snippet.
sf.setAddress("http://localhost:" + PORT + "/");
// Create the server from the configured factory and keep the returned reference.
server = sf.create();