Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

We should support to deploy cxf client(as service consumer) and cxf server (as service provider) easilly into servicemix.
We need a cxf service engine where we can deploy cxf client and server. As a service engine (SE) provides some type of logic inside the JBI environment and only communicates with the NMR. If a Cxf SE needs to communicate outside the JBI environment (e.g. the client inside servicemix but server outside servicemix or vice verse), it must send a message to a binding component (via the NMR), so we also need a cxf binding component to bridge the incoming/outcoming outgoing message. And we should leverage ws-* feature of CXF by cxf binding component.

...

This class just extends servicemix DefaultComponent
Which is JBI spec compitable component. By extends servicemix DefaultComponent, we can only focus cxf specific issue for our cxf service engine since all JBI spec details is handled by servicemix DefaultComponent. Here we just init cxf bus when CxfSeComponent init.

Code Block
langjava
    @Override
    protected void doInit() throws Exception {
        bus = BusFactory.getDefaultBus();
        super.doInit();
    }

...

Code Block
langjava
/**
 * 
 * @org.apache.xbean.XBean element="endpoint"
 */

In this class we init and publish jaxws endpoint with cxf JBI transport and JBI binding, and also register this endpoint to servicemix.

Code Block
langjava

    /*
     * (non-Javadoc)
     * 
     * @see org.apache.servicemix.common.Endpoint#validate()
     */
    @Override
    public void validate() throws DeploymentException {
        if (getPojo() == null) {
            throw new DeploymentException("pojo must be set");
        }
        JaxWsServiceFactoryBean serviceFactory = new JaxWsServiceFactoryBean();
        serviceFactory.setPopulateFromClass(true);
        endpoint = new EndpointImpl(getBus(), getPojo(),
                new JaxWsServerFactoryBean(serviceFactory));
        endpoint.setBindingUri(org.apache.cxf.binding.jbi.JBIConstants.NS_JBI_BINDING);//using cxf jbi binding, so the message use JBI wrapper
        endpoint.setInInterceptors(getInInterceptors());
        endpoint.setInFaultInterceptors(getInFaultInterceptors());
        endpoint.setOutInterceptors(getOutInterceptors());
        endpoint.setOutFaultInterceptors(getOutFaultInterceptors());
        endpoint.getOutInterceptors().add(new WrapperOutInterceptor());
        if (isMtomEnabled()) {
            endpoint.getInInterceptors().add(new AttachmentInInterceptor());
        }
        JaxWsImplementorInfo implInfo = new JaxWsImplementorInfo(getPojo()
                .getClass());
        
        setService(implInfo.getServiceName()); // register servicename to NMR
        setInterfaceName(implInfo.getInterfaceName()); // register interfacename(portType) to NMR 
        setEndpoint(implInfo.getEndpointName().getLocalPart()); // register Endpoint(Port) name to NMR
        super.validate();
    }

We publish the endpoint when endpoint start in servicemix

Code Block
langjava

    /*
     * (non-Javadoc)
     * 
     * @see org.apache.servicemix.common.endpoints.ProviderEndpoint#start()
     */
    @Override
    public void start() throws Exception {
        super.start();
        address = "jbi://" + ID_GENERATOR.generateSanitizedId();//use jbi prefix for the endpoint address to pick up cxf JBI transport
        endpoint.publish(address);
        setService(endpoint.getServer().getEndpoint().getService().getName());
        setEndpoint(endpoint.getServer().getEndpoint().getEndpointInfo()
                .getName().getLocalPart());
    }

We just get MessageExchange from Servicemix NMR and delegate it to cxf JBI transport

Code Block
langjava

    /*
     * (non-Javadoc)
     * 
     * @see org.apache.servicemix.common.endpoints.ProviderEndpoint#process(javax.jbi.messaging.MessageExchange)
     */
    @Override
    public void process(MessageExchange exchange) throws Exception {
        JBITransportFactory jbiTransportFactory = (JBITransportFactory) getBus()
                .getExtension(ConduitInitiatorManager.class)
                .getConduitInitiator(CxfSeComponent.JBI_TRANSPORT_ID);//load JBITransportFactory from cxf bus
        QName serviceName = exchange.getService();
        if (serviceName == null) {
            serviceName = getService();
            exchange.setService(serviceName);
        }
        QName interfaceName = exchange.getInterfaceName();
        if (interfaceName == null) {
            interfaceName = getInterfaceName();
            exchange.setInterfaceName(interfaceName);
        }
        JBIDestination jbiDestination = jbiTransportFactory
                .getDestination(serviceName.toString()
                        + interfaceName.toString());// each endpoint should have its own JBIDestination, so just query it by serviceName and interfaceName
        DeliveryChannel dc = getContext().getDeliveryChannel();
        jbiTransportFactory.setDeliveryChannel(dc);
        jbiDestination.setDeliveryChannel(dc);
        if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
            jbiDestination.getJBIDispatcherUtil().dispatch(exchange);//delegate MessageExchange to cxf JBI transport, cxf in charge of message unmarshalling and service invocation 
        }
        
    }

CxfSeProxyFactoryBean

We use maven-xbean-plugin to generate proxy xbean schema, so we need add comment to specify the xml element name firstly

Code Block
langjava

/**
 * 
 * @org.apache.xbean.XBean element="proxy" description="A CXF proxy"
 * 
 */

We create client proxy exactly the same as we do in cxf

Code Block
langjava

    private Object createProxy() throws Exception {
        JaxWsProxyFactoryBean cf = new JaxWsProxyFactoryBean();
        cf.setServiceName(getService());
        cf.setServiceClass(type);
        cf.setAddress("jbi://" + new IdGenerator().generateSanitizedId());//use jbi prefix for the endpoint address to pick up cxf JBI transport
        cf.setBindingId(org.apache.cxf.binding.jbi.JBIConstants.NS_JBI_BINDING);//use cxf JBI binding
        Bus bus = BusFactory.getDefaultBus();
        JBITransportFactory jbiTransportFactory = (JBITransportFactory) bus
                .getExtension(ConduitInitiatorManager.class)
                .getConduitInitiator(CxfSeComponent.JBI_TRANSPORT_ID);
        if (getInternalContext() != null) { 
            DeliveryChannel dc = getInternalContext().getDeliveryChannel();
            if (dc != null) {
                jbiTransportFactory.setDeliveryChannel(dc);
            }
        }
        return cf.create();
    }

configureation

interceptors configuration

Since cxfse is using apache cxf internally, so customer can configure cxf se endpoint with inteceptors which follow cxf inteceptor api.
example per as below

Code Block
langxml

      <cxfse:endpoint>
        <cxfse:pojo>
          <bean class="org.apache.cxf.calculator.CalculatorImpl">
          </bean>

        </cxfse:pojo>
        <cxfse:inInterceptors>
          <bean class="org.apache.cxf.interceptor.LoggingInInterceptor"/>
        </cxfse:inInterceptors>
        <cxfse:outInterceptors>
          <bean class="org.apache.cxf.interceptor.LoggingOutInterceptor"/>
        </cxfse:outInterceptors>
        <cxfse:inFaultInterceptors>
          <bean class="org.apache.cxf.interceptor.LoggingInInterceptor"/>
        </cxfse:inFaultInterceptors>
        <cxfse:outFaultInterceptors>
          <bean class="org.apache.cxf.interceptor.LoggingOutInterceptor"/>
        </cxfse:outFaultInterceptors>
      </cxfse:endpoint>

proxy configuration

Customer can use proxy from one of the client bean, or from inside another component, and call the JBI endpoint as a plain Java object.

Code Block
langxml

      <bean class="org.apache.servicemix.cxfse.GreeterImplForClientProxy">
       <property name="calculator">
          <cxfse:proxy service="calculator:CalculatorService" type="org.apache.cxf.calculator.CalculatorPortType" />
       </property>
     </bean>

cxf binding component

overview

...

  • JBI compliant Binding Component
  • Usable in a lightweight mode in servicemix.xml configuration files
  • SOAP 1.1 and 1.2 support
  • MIME attachments
  • Support for InOut or InOnly MEPs as consumers or providers
  • SSL support
  • WS-Security support
  • WS-Policy support
  • WS-RM support
  • WS-Addressing support

main class for cxf binding component

CxfBcComponent

We use maven-xbean-plugin to generate component xbean schema, so we need add comment to specify the xml element name firstly

Code Block
langjava

/**
 * 
 * @org.apache.xbean.XBean element="component"
 */

This class just extends servicemix DefaultComponent
Which is JBI spec compitable component. By extends servicemix DefaultComponent, we can only focus cxf specific issue for our cxf binding component since all JBI spec details is handled by servicemix DefaultComponent. Here we just init cxf bus when CxfBcComponent init.

Code Block
langjava

    @Override
    protected void doInit() throws Exception {
        if (getBusConfig() != null) {//for the cxf bc, we need pass in cxf bus configuration file sometimes, e.g. we need configure bus to support ws-*
            SpringBusFactory bf = new SpringBusFactory();
            bus = bf.createBus(getBusConfig());
        } else {
            bus = BusFactory.newInstance().createBus();
        }
        super.doInit();
    }

CxfBcConsumer

We use maven-xbean-plugin to generate consumer xbean schema, so we need add comment to specify the xml element name firstly

Code Block
langjava

/**
 * 
 * @org.apache.xbean.XBean element="consumer"
 */

Consumer of binding component means expose JBI internal endpoint to outside client, for example
standalone cxf client ===> cxf bc consumer===> service in cxf se
CxfBcConsumer play two role in the message process

  • cxf server
  • servicemix consumer
    As cxf server, just init a dummy server without real service class. This cxf server init the proper transport(http or jms) from the service model which is built from the wsdl. Since there is no service class for this server, this server will do no real service invocation, just transform message format between soap binding and jbi binding.
    For a request message, need transform soap message to jbi message, two main interceptors get involved
    JbiInInterceptor
    JbiInInterceptor mainly create JBI NormalizedMessage and copy headers and attachments from soap message to JBI message
    JbiInWsdl1Interceptor
    JbiInWsdl1Interceptor transform soap messasge to jbi message according to the service model
    For a reponse message, need transform jbi message to soap message, also two main interceptors get involved
    JbiOutInterceptor
    JbiOutInterceptor mainly copy attachments and headers from jbi message to soap message
    JbiOutWsdl1Interceptor
    JbiOutWsdl1Interceptor transform jbi mesage to soap message based on the service model
    As servicemix consumer, send jbi message which transformed from the soap message to NMR and need process the response jbi message from the NMR.
    CxfBcConsumer use its own JbiInvokerInterceptor to wire these two role together.
    Main part of JbiInvokerInterceptor
    Code Block
    langjava
    
    ublic class JbiInvokerInterceptor extends
                AbstractPhaseInterceptor<Message> {
    
            public JbiInvokerInterceptor() {
                super(Phase.INVOKE);
            }
    
            public void handleMessage(final Message message) throws Fault {
    
                final Exchange cxfExchange = message.getExchange();
                final Endpoint endpoint = cxfExchange.get(Endpoint.class);
                final Service service = endpoint.getService();
                final Invoker invoker = service.getInvoker();
    
                MessageExchange exchange = message
                        .getContent(MessageExchange.class);
                ComponentContext context = message.getExchange().get(
                        ComponentContext.class);
                CxfBcConsumer.this.configureExchangeTarget(exchange);
                CxfBcConsumer.this.messages.put(exchange.getExchangeId(), message);
                CxfBcConsumer.this.isOneway = message.getExchange().get(
                        BindingOperationInfo.class).getOperationInfo().isOneWay();
                message.getExchange().setOneWay(CxfBcConsumer.this.isOneway);
    
                try {
                    if (CxfBcConsumer.this.synchronous
                            && !CxfBcConsumer.this.isOneway) {
                        message.getInterceptorChain().pause();//just pause the incoming intercepor chain of cxf server, the mesage transformation from soap to jbi is done now
                        context.getDeliveryChannel().sendSync(exchange, 10000);//send exchange which contain jbi message to NMR
                        process(exchange);
                    } else {
                        context.getDeliveryChannel().send(exchange);
    
                    }
                } catch (Exception e) {
                    throw new Fault(e);
                }
            }
    
        }
    
    Also the code for processing response jbi message looks like
    Code Block
    javalang
    
        public void process(MessageExchange exchange) throws Exception {
            Message message = messages.remove(exchange.getExchangeId());
            message.getInterceptorChain().resume();//resume the cxf outgoing interceptor chain, will do the message transformation from jbi to soap
            if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
                exchange.setStatus(ExchangeStatus.DONE);
                message.getExchange().get(ComponentContext.class)
                        .getDeliveryChannel().send(exchange);
            }
        }
    

CxfBcProvider
We use maven-xbean-plugin to generate provider xbean schema, so we need add comment to specify the xml element name firstly
/**

  • @org.apache.xbean.XBean element="provider"
    */
    Provider of binding component means bridge internal request to outside server, for example
    cxf proxy inside servicemix ===> cxf bc provider===> standalone cxf server outside servicemix
    Similiar as CxfBcConsumer, CxfBcProvider also play two role in the message process
  • servicemix provider
  • cxf client
    The message process looks like
    provider get incoming jbi message from NMR==> transform it to soap message==> send out to external service==> in the registered MessageObserver, get response soap message from external service==> transform it to jbi message==> send back to the NMR.
    This process splitted into two classes,
    The first half in CxfBcProvider
    Code Block
    langjava
    
        public void process(MessageExchange exchange) throws Exception {
            NormalizedMessage nm = exchange.getMessage("in");//get incoming jbi message
                   
            CxfBcProviderMessageObserver obs = new CxfBcProviderMessageObserver(exchange, this);
            conduit.setMessageObserver(obs);// register message observer for cxf client
            Message message = ep.getBinding().createMessage();//create outgoing cxf message based on the service model, both soap11 and soap12 is supported
            message.put(MessageExchange.class, exchange);
            Exchange cxfExchange = new ExchangeImpl();
            message.setExchange(cxfExchange);
            cxfExchange.setOutMessage(message);       
              ...
              ...        
            message.setInterceptorChain(outChain);
            InputStream is = JBIMessageHelper.convertMessageToInputStream(nm.getContent());
            
            StreamSource source = new StreamSource(is);
            message.setContent(Source.class, source);
            
            message.setContent(InputStream.class, is);
            
            conduit.prepare(message);
            OutputStream os = message.getContent(OutputStream.class);
            XMLStreamWriter writer = message.getContent(XMLStreamWriter.class);
            
    
            String encoding = getEncoding(message);
            
            try {
                writer = StaxOutInterceptor.getXMLOutputFactory(message).createXMLStreamWriter(os, encoding);
            } catch (XMLStreamException e) {
                //
            }
            message.setContent(XMLStreamWriter.class, writer);
            message.put(org.apache.cxf.message.Message.REQUESTOR_ROLE, true);
            outChain.doIntercept(message);//transform jbi message to soap message
            XMLStreamWriter xtw = message.getContent(XMLStreamWriter.class);
            if (xtw != null) {
                xtw.writeEndDocument();
                xtw.close();
            }
    
            os.flush();
            is.close();
            os.close();//send out message to external service
    
        }
    

The second half in CxfBcProviderMessageObserver

Code Block
langjava

    public void onMessage(Message message) {
        try {
             ...
             ...
            inChain.add(providerEndpoint.getInInterceptors());
            inChain.add(providerEndpoint.getInFaultInterceptors());
            soapMessage.setInterceptorChain(inChain);
            inChain.doIntercept(soapMessage);//transform soap message to jbi message
           
            //send back jbi message to nmr
            if (boi.getOperationInfo().isOneWay()) {
                messageExchange.setStatus(ExchangeStatus.DONE);
            } else if (soapMessage.get("jbiFault") != null
                    && soapMessage.get("jbiFault").equals(true)) {
                ...
                ...
            } else {
                messageExchange.setStatus(ExchangeStatus.DONE);

            }
            boolean txSync = messageExchange.getStatus() == ExchangeStatus.ACTIVE
                    && messageExchange.isTransacted()
                    && Boolean.TRUE.equals(messageExchange
                            .getProperty(JbiConstants.SEND_SYNC));
            if (txSync) {
                providerEndpoint.getContext().getDeliveryChannel().sendSync(
                        messageExchange);
            } else {
                providerEndpoint.getContext().getDeliveryChannel().send(
                        messageExchange);
            }

        } catch (Exception e) {
            e.printStackTrace();
        } 
        ...
        ...
    }

configuration

Since cxfbc is using apache cxf internally, so customer can configure cxf bc provider or consumer with inteceptors which follow cxf inteceptor api. A full example

typical use case

We have test cases almost cover all features of cxf bc and se, such as
*proxy
*ws-policy
*ws-security
*ws-rm
*ws-addressing
*MTOM
*multiple transport support
*oneway
*Exception
Go through these tests for servicemix-cxf-se and servicemix-cxf-bc

some tips

what JBI binding message looks like

Code Block
langxml

<jbi:message xmlns:jbi="http://java.sun.com/xml/ns/jbi/wsdl-11-wrapper" xmlns:msg="http://apache.org/cxf/calculator" name="add" type="msg:add" version="1.0">
    <jbi:part>
        <add xmlns="http://apache.org/cxf/calculator/types">
            <arg0>1</arg0>
            <arg1>2</arg1>
        </add>
    </jbi:part>
</jbi:message>

what SOAP binding message looks like

Code Block
langxml

<soap:Envelope xmlns:soap="http://www.w3.org/2003/05/soap-envelope">
    <soap:Body>
        <add xmlns="http://apache.org/cxf/calculator/types">
            <arg0>1</arg0>
            <arg1>2</arg1>
        </add>
    </soap:Body>
</soap:Envelope>

can cxf component work with other component inside servicemix

Yes, provided those component can hanle the normalized JBI message