Project Reactor support

Introduction

Support for Project Reactor's Flux and Mono types is provided by the Project Reactor extension.  This allows you to develop applications in a reactive fashion on both the client and the server side.

org.apache.cxf/cxf-rt-rs-extension-reactor/3.2.3 and io.projectreactor/reactor-core/3.1.0.RELEASE dependencies are required.

Client

The following simple example uses ReactorInvoker, which becomes available once ReactorInvokerProvider is registered with the client.  Reviewing our reactive systests may help as well.

Code Block
languagejava
String address = "http://localhost:" + PORT + "/reactor/flux/textJson";
List<HelloWorldBean> collector = new ArrayList<>();
ClientBuilder.newClient()
        .register(new JacksonJsonProvider())
        .register(new ReactorInvokerProvider())
        .target(address)
        .request("application/json")
        .rx(ReactorInvoker.class)
        .get(HelloWorldBean.class)
        .doOnNext(collector::add)
        .subscribe();
// subscribe() returns immediately: if you read the collector afterwards, wait for the
// response first (for example with a Thread.sleep or a latch)
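
The closing comment in the example above matters because subscribe() does not wait for the response. The following is a minimal sketch of two ways to wait, using only reactor-core. The fakeRemoteCall method is a hypothetical stand-in for the Mono returned by the reactive client call, and the 50 ms delay and 5 second timeout are arbitrary values chosen for illustration.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import reactor.core.publisher.Mono;

public class AwaitResponseSketch {

    // Hypothetical stand-in for the Mono returned by the reactive client call.
    static Mono<String> fakeRemoteCall() {
        return Mono.just("Hello").delayElement(Duration.ofMillis(50));
    }

    // Option 1: block the calling thread until the value arrives or the timeout expires.
    static String awaitWithBlock() {
        return fakeRemoteCall().block(Duration.ofSeconds(5));
    }

    // Option 2: keep the callback style and release a latch when the flow terminates.
    static List<String> awaitWithLatch() {
        List<String> collector = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        fakeRemoteCall()
                .doOnNext(collector::add)
                .doFinally(signal -> done.countDown()) // runs on complete, error or cancel
                .subscribe();
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("no response within 5 seconds");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return collector;
    }

    public static void main(String[] args) {
        System.out.println(awaitWithBlock());
        System.out.println(awaitWithLatch());
    }
}
```

Blocking is convenient in tests and command line tools. In server code it is usually preferable to stay in the reactive chain rather than block a request thread.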

Server

As a method return value

One simply returns reactor.core.publisher.Mono or reactor.core.publisher.Flux from the method and the runtime will make sure the response is finalized once the Flux/Mono flow is complete.
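
To illustrate what "finalized once the flow is complete" means, here is a minimal sketch built on reactor-core's BaseSubscriber. It is not the CXF implementation: FakeResponse and ArrayWritingSubscriber are hypothetical names, and the response is just an in-memory buffer. The subscriber writes each element as it arrives and only closes the response in the completion hook.

```java
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

public class CompletionSketch {

    // Hypothetical stand-in for a response: buffers a JSON array and records when it is finalized.
    static class FakeResponse {
        final StringBuilder body = new StringBuilder();
        boolean finished;
    }

    // Writes each element as it arrives and closes the response when the flow completes.
    // BaseSubscriber requests an unbounded number of elements by default.
    static class ArrayWritingSubscriber extends BaseSubscriber<String> {
        private final FakeResponse response;
        private boolean first = true;

        ArrayWritingSubscriber(FakeResponse response) {
            this.response = response;
            response.body.append('[');
        }

        @Override
        protected void hookOnNext(String value) {
            if (!first) {
                response.body.append(',');
            }
            // No JSON escaping here; a real implementation would use a JSON provider.
            response.body.append('"').append(value).append('"');
            first = false;
        }

        @Override
        protected void hookOnComplete() {
            response.body.append(']');
            response.finished = true;
        }
    }

    // Subscribes to the flux and returns the response it was rendered into.
    static FakeResponse render(Flux<String> flux) {
        FakeResponse response = new FakeResponse();
        flux.subscribe(new ArrayWritingSubscriber(response));
        return response;
    }

    public static void main(String[] args) {
        FakeResponse response = render(Flux.just("Hello", "Ciao"));
        System.out.println(response.body + " finished=" + response.finished);
    }
}
```

The sketch uses a synchronous source, so the response is already finished when render returns. With an asynchronous source the completion hook would fire later, on whichever thread emits the final signal.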

The only requirement is to register a custom JAX-RS invoker, org.apache.cxf.jaxrs.reactor.server.ReactorInvoker. It does everything the default JAXRSInvoker does and additionally checks whether a Flux or Mono is returned; if so, it links the returned publisher internally with the JAX-RS AsyncResponse.  The invoker can be registered automatically, along with a set of sensible defaults, by using org.apache.cxf.jaxrs.reactor.server.ReactorCustomizer.

The built-in invoker handles Flux and Mono according to their semantics.  For instance, reading a JAX-RS Response object always yields a Mono (0 to 1 elements), while any method with flux in its name yields a Flux, which is meant to contain 0 to n elements.  You would typically use a Flux for a GET that returns an arbitrary data set converted from a JSON array; a Mono is likely the right choice whenever you want to read the Response object.
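
The difference in cardinality can be seen with reactor-core alone. The sketch below does not involve CXF; the class and method names are hypothetical and the values are placeholders.

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class CardinalitySketch {

    // A Flux carries 0 to n elements, for example the items of a JSON array.
    static List<String> manyGreetings() {
        return Flux.just("Hello", "Ciao")
                .map(String::toUpperCase)
                .collectList() // Flux<String> -> Mono<List<String>>
                .block();
    }

    // A Mono carries 0 to 1 elements, for example a single response object.
    static String oneGreeting() {
        return Mono.just("Hello").block();
    }

    // An empty Mono completes without a value; block() then returns null.
    static String noGreeting() {
        return Mono.<String>empty().block();
    }

    public static void main(String[] args) {
        System.out.println(manyGreetings());
        System.out.println(oneGreeting());
        System.out.println(noGreeting());
    }
}
```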

Combining Flux/Mono with AsyncResponse

Code Block
languagejava
JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
sf.setProvider(new JacksonJsonProvider());
new ReactorCustomizer().customize(sf); // use a JAXRSServerFactoryCustomizationExtension to customize the server
sf.setResourceClasses(FluxService.class);
sf.setResourceProvider(FluxService.class,
                       new SingletonResourceProvider(new FluxService(), true));
sf.setAddress("http://localhost:" + PORT + "/");
server = sf.create();

Code Block
languagejava
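import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;

import org.apache.cxf.jaxrs.reactivestreams.server.JsonStreamingAsyncSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;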

@GET
@Produces("application/json")
@Path("textJsonImplicitListAsyncStream")
public void getJsonImplicitListStreamingAsync(@Suspended AsyncResponse ar) {
    Flux.just("Hello", "Ciao")
            .map(HelloWorldBean::new)
            .subscribeOn(Schedulers.parallel())
            .subscribe(new JsonStreamingAsyncSubscriber<>(ar));
}
 
// Alternatively, return the Flux and let the registered ReactorInvoker link it to the AsyncResponse
@GET
@Produces("application/json")
@Path("textJsonImplicitListAsyncStream2")
public Flux<HelloWorldBean> getJsonImplicitListStreamingAsync2() {
    return Flux.just("Hello", "Ciao")
            .map(HelloWorldBean::new)
            .subscribeOn(Schedulers.parallel());
}