Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

RxJava 1 rx.Observable is supported on the client and the server side starting from CXF 3.2.0.

The org.apache.cxf/cxf-rt-rs-extension-rx/3.2.0 dependency is required.

Client

Code Block
language: java
import org.apache.cxf.jaxrs.rx.client.ObservableRxInvoker;
import org.apache.cxf.jaxrs.rx.client.ObservableRxInvokerProvider;
 
@Test
    public void testObservableWithWebClient() throws Exception {
        // Fetches a text/plain resource through a WebClient configured with the
        // RxJava 1 invoker provider, doubles the payload with map(), and checks
        // the doubled value in the subscriber.
        String address = "http://localhost:" + PORT + "/observable/textAsync";
        WebClient wc = WebClient.create(address,
                                        Collections.singletonList(new ObservableRxInvokerProvider()));
        Observable<String> obs = wc.accept("text/plain")
            .rx(ObservableRxInvoker.class)
            .get(String.class);

        // Observable operators do not modify the source; map() returns a new
        // Observable. The mapped instance must be kept and subscribed to,
        // otherwise the doubling step is never applied.
        Observable<String> doubled = obs.map(s -> s + s);

        // NOTE(review): the pause is kept from the original and still precedes
        // the subscription; presumably it gives the asynchronous call time to
        // complete - confirm whether it is actually needed.
        Thread.sleep(3000);

        doubled.subscribe(s -> assertDuplicateResponse(s));
    }
    @Test
    public void testObservableJaxrs21With404Response() throws Exception {
        // Uses the standard JAX-RS 2.1 client API (rather than WebClient) against
        // the "textAsync404" resource and expects the error callback to fire.
        final String address = "http://localhost:" + PORT + "/observable/textAsync404";

        Invocation.Builder builder = ClientBuilder.newClient()
            .register(new ObservableRxInvokerProvider())
            .target(address)
            .request();

        Observable<String> response = builder.rx(ObservableRxInvoker.class).get(String.class);

        // A delivered value means the request unexpectedly succeeded; the
        // failure path hands the throwable to validateT for inspection.
        response.subscribe(
            s -> fail("Exception expected"),
            t -> validateT((ExecutionException)t));
    }

    /**
     * Asserts that the given exception was caused by a {@code NotFoundException}.
     *
     * @param t the exception handed to the subscriber's error callback
     */
    private void validateT(ExecutionException t) {
        Throwable cause = t.getCause();
        assertTrue(cause instanceof NotFoundException);
    }
    /**
     * Asserts that the received payload is the greeting repeated twice.
     *
     * @param s the value delivered to the subscriber
     */
    private void assertDuplicateResponse(String s) {
        final String expected = "Hello, world!Hello, world!";
        assertEquals(expected, s);
    }

...