Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

RxJava rx.Observable support

Introduction

RxJava 1 rx.Observable is supported on the client and the server side starting from CXF 3.2.0.

Client

Code Block
languagejava
/**
 * Fetches the {@code /observable/textAsync} resource as an {@code Observable<String>}
 * through a {@code WebClient} configured with {@code ObservableRxInvokerProvider},
 * doubles the emitted value with {@code map} and checks the doubled response.
 *
 * <p>Observable operators return a new Observable instead of modifying the one they
 * are called on, so the assertion must be subscribed to the Observable returned by
 * {@code map}, not to the source Observable.
 *
 * @throws Exception if the pause before subscribing is interrupted
 */
@Test
    public void testObservableWithWebClient() throws Exception {
        String address = "http://localhost:" + PORT + "/observable/textAsync";
        WebClient wc = WebClient.create(address,
                                        Collections.singletonList(new ObservableRxInvokerProvider()));
        // Keep the mapped Observable: discarding the result of map() would leave
        // the doubling step out of the pipeline that is subscribed to below.
        Observable<String> obs = wc.accept("text/plain")
            .rx(ObservableRxInvoker.class)
            .get(String.class)
            .map(s -> s + s);

        // NOTE(review): this pause happens before the subscription is made;
        // presumably it gives the asynchronous call time to settle - confirm it is needed.
        Thread.sleep(3000);

        obs.subscribe(s -> assertDuplicateResponse(s));
    }
    /**
     * Calls the {@code /observable/textAsync404} resource through the JAX-RS client
     * {@code rx(ObservableRxInvoker.class)} API. The test fails if any value is
     * emitted; an emitted error is cast to {@code ExecutionException} and handed
     * to {@code validateT}.
     */
    @Test
    public void testObservableJaxrs21With404Response() throws Exception {
        String address = "http://localhost:" + PORT + "/observable/textAsync404";
        Invocation.Builder builder = ClientBuilder.newClient()
            .register(new ObservableRxInvokerProvider())
            .target(address)
            .request();
        builder.rx(ObservableRxInvoker.class)
            .get(String.class)
            .subscribe(
                // A successful value means the expected error did not happen.
                s -> fail("Exception expected"),
                t -> {
                    validateT((ExecutionException)t);
                });
    }

    /**
     * Asserts that the cause of the given exception is a {@code NotFoundException}.
     *
     * @param t the exception whose cause is inspected
     */
    private void validateT(ExecutionException t) {
        Throwable cause = t.getCause();
        assertTrue(cause instanceof NotFoundException);
    }
    /**
     * Asserts that the given response is the greeting text repeated twice.
     *
     * @param s the response text to check
     */
    private void assertDuplicateResponse(String s) {
        String expected = "Hello, world!Hello, world!";
        assertEquals(expected, s);
    }

 

Server

As a method return value

Simply return an Observable from the resource method; the runtime will make sure the response is finalized once the Observable flow is complete.

Combining with AsyncResponse

 

Code Block
languagejava
import org.apache.cxf.jaxrs.rx.server.ListAsyncSubscriber;
import org.apache.cxf.jaxrs.rx.server.JsonStreamingAsyncSubscriber;

/**
 * Starts a new thread that pauses for two seconds and then subscribes a
 * {@code ListAsyncSubscriber} wrapping the suspended response to an Observable
 * emitting two {@code HelloWorldBean} instances.
 *
 * <p>NOTE(review): the subscriber presumably gathers the emitted beans into a list
 * and resumes the response with it - confirm against {@code ListAsyncSubscriber}.
 *
 * @param ar the suspended asynchronous response handed to the subscriber
 */
@GET
    @Produces("application/json")
    @Path("textJsonImplicitListAsync")
    public void getJsonImplicitListAsync(@Suspended AsyncResponse ar) {
        final HelloWorldBean first = new HelloWorldBean();
        final HelloWorldBean second = new HelloWorldBean("Ciao");
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException ex) {
                // ignore: the beans are emitted even when the pause is cut short
            }
            Observable.just(first, second).subscribe(new ListAsyncSubscriber<HelloWorldBean>(ar));
        });
        worker.start();
    }

 // Returns a JSON array; each JSON object is written to the output stream
 // as soon as it becomes available.
 @GET
    @Produces("application/json")
    @Path("textJsonImplicitListAsyncStream")
    public void getJsonImplicitListStreamingAsync(@Suspended AsyncResponse ar) {
        // Turn each greeting into a bean and run the subscription on the computation scheduler.
        Observable<HelloWorldBean> beans = Observable.just("Hello", "Ciao")
            .map(HelloWorldBean::new)
            .subscribeOn(Schedulers.computation());
        beans.subscribe(new JsonStreamingAsyncSubscriber<HelloWorldBean>(ar));
    }