Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
Example 2: The intermediary interceptor
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

import org.apache.cxf.Bus;
import org.apache.cxf.binding.soap.SoapMessage;
import org.apache.cxf.binding.soap.SoapVersion;
import org.apache.cxf.binding.soap.SoapVersionFactory;
import org.apache.cxf.bus.CXFBusFactory;
import org.apache.cxf.endpoint.Server;
import org.apache.cxf.endpoint.ServerRegistry;
import org.apache.cxf.interceptor.InterceptorChain;
import org.apache.cxf.interceptor.StaxInInterceptor;
import org.apache.cxf.message.Message;
import org.apache.cxf.phase.AbstractPhaseInterceptor;
import org.apache.cxf.phase.Phase;
import org.apache.cxf.staxutils.DepthXMLStreamReader;
import org.apache.cxf.staxutils.StaxUtils;
import org.apache.cxf.transport.MessageObserver;


public class MediatorInInterceptor extends AbstractPhaseInterceptor<SoapMessage> {
    private static final Logger LOG = Logger.getLogger(MediatorInInterceptor.class.getName());

    public MediatorInInterceptor() {
        super();
        setPhase(Phase.POST_STREAM);
        addBefore(StaxInInterceptor.class.getName());
    }

    public void handleMessage(SoapMessage message) {
        if (isGET(message)) {
            LOG.info("StaxInInterceptor skipped in HTTP GET method");
            return;
        }

        String schemaNamespace = "";
        InterceptorChain chain = message.getInterceptorChain();

        try {
            //backup thecreate a buffered stream so that we can roll back to original inputStreamstream after forfinishing re-dispathingscaning
            InputStream is = message.getContent(InputStream.class);
            BufferedInputStream pis = new BufferedInputStream(is);
            pis.mark(pis.available());
            message.setContent(InputStream.class, pis);

            //TODO: need to process attachements

            
            //Find outScan the schema namespace, which is used to indicate the schemaservice version
            String encoding = (String)message.get(Message.ENCODING);
            XMLStreamReader reader = XMLInputFactory.newInstance().createXMLStreamReader(pis, encoding);
            DepthXMLStreamReader xmlReader = new DepthXMLStreamReader(reader);

            if (xmlReader.nextTag() == XMLStreamConstants.START_ELEMENT) {
                String ns = xmlReader.getNamespaceURI();
                SoapVersion soapVersion = SoapVersionFactory.getInstance().getSoapVersion(ns);
                StaxUtils.toNextTag(xmlReader, soapVersion.getBody());

                // advance just past body.
                xmlReader.nextTag();
            }

            schemaNamespace = xmlReader.getName().getNamespaceURI();

            //Roll back any changes to the original inputStream
            pis.reset();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (XMLStreamException e) {
            e.printStackTrace();
        }

        Bus bus = CXFBusFactory.getDefaultBus();
        ServerRegistry serverRegistry = bus.getExtension(ServerRegistry.class);
        List<Server> servers = serverRegistry.getServers();

        Server targetServer = null;
        for (Server server : servers) {
            targetServer = server;
            String address = server.getEndpoint().getEndpointInfo().getAddress();
            if (schemaNamespace.indexOf("version22007/03/21") != -1) {
                if (address.indexOf("SoapContext2") != -1) {
                    break;
                }
            } else if (address.indexOf("SoapContext1") != -1) {
                break;
            }
        }

        //Redirect the request
        MessageObserver ob = targetServer.getMessageObserver();
        ob.onMessage(message);

        chain.abort();
    }

}

A couple of things to note:

  1. The MediatorInInterceptor is written for the SOAP binding; you can write a similar interceptor for other bindings, such as the XML binding.
  2. We call chain.abort at the end of this interceptor to stop any further processing in the dummy service.
  3. In this example, the MediatorInInterceptor redirects the request to endpoints hosted in the same server by looking them up in the ServerRegistry, so the request does not need to go through the transport again. If you need transport bridging (e.g., JMS-to-HTTP), or your services cannot be hosted in the same server as the dummy service, you can instead create a client proxy in the interceptor to invoke the real endpoint. In that case you need to work out how to send the response back through the MediatorInInterceptor; simply calling chain.abort will not work.
  4. The MediatorInInterceptor is set to the POST_STREAM phase and placed before StaxInInterceptor, which makes sure it is the very first interceptor to be invoked.