THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
RxJava 1 rx.Observable is supported on the client and the server side starting from CXF 3.2.0.
org.apache.cxf/cxf-rt-rs-extension-rx/3.2.0 dependency is required.
Client
Code Block | ||
---|---|---|
| ||
import org.apache.cxf.jaxrs.rx.client.ObservableRxInvoker;
import org.apache.cxf.jaxrs.rx.client.ObservableRxInvokerProvider;
@Test
public void testObservableWithWebClient() throws Exception {
String address = "http://localhost:" + PORT + "/observable/textAsync";
WebClient wc = WebClient.create(address,
Collections.singletonList(new ObservableRxInvokerProvider()));
Observable<String> obs = wc.accept("text/plain")
.rx(ObservableRxInvoker.class)
.get(String.class);
obs.map(s -> {
return s + s;
});
Thread.sleep(3000);
obs.subscribe(s -> assertDuplicateResponse(s));
}
@Test
public void testObservableJaxrs21With404Response() throws Exception {
String address = "http://localhost:" + PORT + "/observable/textAsync404";
Invocation.Builder b = ClientBuilder.newClient().register(new ObservableRxInvokerProvider())
.target(address).request();
b.rx(ObservableRxInvoker.class).get(String.class).subscribe(
s -> {
fail("Exception expected");
},
t -> validateT((ExecutionException)t));
}
private void validateT(ExecutionException t) {
assertTrue(t.getCause() instanceof NotFoundException);
}
private void assertDuplicateResponse(String s) {
assertEquals("Hello, world!Hello, world!", s);
} |
...