Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Firstly we need to have a dummy service with an intermediary interceptor registered. This intermediary interceptor is positioned sits at the very beginning of the interceptor chain, which this is to make sure the intermediary interceptor will be is the first interceptor being invoked in the message pipeline. The intermediary interceptor will scan scans the incoming message for example, detect the schema namespace, then direct directs the message to the desired endpoint according to user programmed strategy.

...

Code Block
titleExample 1: The server - this server has three endpoints: one endpoint for the dummy service, another two endpoints are different vrersions of Greeter service

import javax.xml.ws.Endpoint;

import org.apache.cxf.jaxws.EndpointImpl;
import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
import org.apache.hello_world_mixedstyle.GreeterImplMixedStyle;


public class Server extends AbstractBusTestServerBase {

    protected void run() {
        //implementor1 and implementor2 are published using local transport
        Object implementor1 = new GreeterImplMixedStyle();
        String address1 = "httplocal://localhost:9027SoapContext/SoapContext1version1/SoapPort";
        Endpoint.publish(address1, implementor1);

        Object implementor2 = new GreeterImplMixedStyle();
        String address2 = "httplocal://localhost:9027SoapContext/SoapContext2version2/SoapPort";
        Endpoint.publish(address2, implementor2);
        
        //A dummy service that acts as a routing mediator
        Object implementor = new GreeterImplMixedStyle();
        String address = "http://localhost:9027/SoapContext/SoapPort";
        javax.xml.ws.Endpoint jaxwsEndpoint = Endpoint.publish(address, implementor);  
        
        //Register a MediatorInInterceptor on this dummy service
        EndpointImpl jaxwsEndpointImpl = (EndpointImpl)jaxwsEndpoint;
        org.apache.cxf.endpoint.Server server = jaxwsEndpointImpl.getServer();
        org.apache.cxf.endpoint.Endpoint endpoint = server.getEndpoint();
        endpoint.getInInterceptors().add(new MediatorInInterceptor());
    }

    public static void main(String[] args) {
        try {
            Server s = new Server();
            s.start();
        } catch (Exception ex) {
            ex.printStackTrace();
            System.exit(-1);
        } finally {
            System.out.println("done!");
        }
    }
}
Code Block
titleExample 2: The intermediary interceptor
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.logging.Logger;

import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

import org.apache.cxf.Bus;
import org.apache.cxf.binding.soap.SoapMessage;
import org.apache.cxf.binding.soap.SoapVersion;
import org.apache.cxf.binding.soap.SoapVersionFactory;
import org.apache.cxf.bus.CXFBusFactory;
import org.apache.cxf.endpoint.Server;
import org.apache.cxf.endpoint.ServerRegistry;
import org.apache.cxf.interceptor.InterceptorChain;
import org.apache.cxf.interceptor.StaxInInterceptor;
import org.apache.cxf.message.Message;
import org.apache.cxf.phase.AbstractPhaseInterceptor;
import org.apache.cxf.phase.Phase;
import org.apache.cxf.staxutils.DepthXMLStreamReader;
import org.apache.cxf.staxutils.StaxUtils;
import org.apache.cxf.transport.MessageObserver;


public class MediatorInInterceptor extends AbstractPhaseInterceptor<SoapMessage> {
    private static final Logger LOG = Logger.getLogger(MediatorInInterceptor.class.getName());

    public MediatorInInterceptor() {
        super();
        setPhase(Phase.POST_STREAM);
        addBefore(StaxInInterceptor.class.getName());
    }

    public void handleMessage(SoapMessage message) {
        if (isGET(message)) {
            LOG.info("StaxInInterceptor skipped in HTTP GET method")String schemaNamespace = "";
        InterceptorChain chain   return;
        }

= message.getInterceptorChain();

        String//scan schemaNamespacethe = "";
        InterceptorChain chain = message.getInterceptorChain();
incoming message for its schema namespace
        try {
            //create a buffered stream so that we canget roll back tothe original stream after finishing scaning
            InputStream is = message.getContent(InputStream.class);
            BufferedInputStream pis = new BufferedInputStream(is);
            pis.mark(pis.available());
            message.setContent(InputStream.class, pis);

            //TODO: need to process attachements

            
            //Scan the schema namespace, which is used to indicate the service version
            String encoding = (String)message.get(Message.ENCODING);
            XMLStreamReader reader = XMLInputFactory.newInstance().createXMLStreamReader(pis, encoding);
            DepthXMLStreamReader xmlReader = new DepthXMLStreamReader(reader);

            if (xmlReader.nextTag() == XMLStreamConstants.START_ELEMENT) {
                String ns = xmlReader.getNamespaceURI();
                SoapVersion soapVersion = SoapVersionFactory.getInstance().getSoapVersion(ns);
                //advance just past header
                StaxUtils.toNextTag(xmlReader, soapVersion.getBody());

                // advance just past body.
                xmlReader.nextTag();
            }

            schemaNamespace = xmlReader.getName().getNamespaceURI();

            //Roll back to the original inputStream
            pis.reset();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (XMLStreamException e) {
            e.printStackTrace();
        }

        //Look up for all available endpoints registered on the bus
        Bus bus = CXFBusFactory.getDefaultBus();
        ServerRegistry serverRegistry = bus.getExtension(ServerRegistry.class);
        List<Server> servers = serverRegistry.getServers();

        //if the incoming message has a namespace contained "2007/03/21", we redirect the message
        //to the new version of service on endpoint "local://localhost:9027/SoapContext/version2/SoapPort"
        Server targetServer = null;
        for (Server server : servers) {
            targetServer = server;
            String address = server.getEndpoint().getEndpointInfo().getAddress();
            if (schemaNamespace.indexOf("2007/03/21") != -1) {
                if (address.indexOf("SoapContext2version2") != -1) {
                    break;
                }
            } else if (address.indexOf("SoapContext1version1") != -1) {
                break;
            }
        }

        //Redirect the request
        MessageObserver obmo = targetServer.getMessageObserver();
        obmo.onMessage(message);

        //Now the response has been put in the message, abort the chain 
        chain.abort();
    }

}

A couple of things to note:

  1. The MediatorInInterceptor is for SOAP binding, you can write a similar interceptor for XML binding etc.
  2. We call chain.abort at the end of this interceptor to stop any further processing in the dummy service.
  3. In this example, the MediatorInInterceptor redirects the request to endpoints that are implementor1 and implementor2 are published using local transport. This is achieved by using an address like "local://SoapContext/version1/SoapPort". The MediatorInInterceptor looks up ServerRegistry to find endpoints hosted in the same server by looking up ServerRegistry, this way we do not need to go through transport again. But if you are going to do a transport bridging (eg, jms-to-http) or your services cannot be hosted in a same server with the dummy service, you can initiate a client proxy in the interceptor to invoke the real endpoint. But in this case, you need to figure out how to send the response back through MediatorInInterceptor, obviously it wont work this time by calling chain.abortserver then do the re-dispatch.
  4. The MediatorInInterceptor is set to POST_STREAM phase and before StaxInInterceptor, this makes sure it is the very first interceptor to be invoked.