Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

To work with it, resources (= spring beans) declared in CXF project must be imported using the statement import resource.

Code Block
xml
xml
...
 <import resource="classpath:META-INF/cxf/cxf.xml" />
 <import resource="classpath:META-INF/cxf/cxf-extension-soap.xml" />
 <import resource="classpath:META-INF/cxf/cxf-extension-http.xml" />
 <import resource="classpath:META-INF/cxf/osgi/cxf-extension-osgi.xml" /> 
...

...

The camel CXF endpoint is configurated like this :

Code Block
xml
xml
...
	<!-- webservice endpoint --> 
	<cxf:cxfEndpoint id="reportIncident"
		address="/camel-example/incident" (1)
		serviceClass="org.apache.camel.example.reportincident.ReportIncidentEndpoint" (2)
		xmlns:s="http://reportincident.example.camel.apache.org"> (3)
	</cxf:cxfEndpoint>
...

...

Like CXF, ActiveMq can be installed in the infrastructure using a spring.xml configuration file. So, create the file activemq-broker.xml in the directory src/main/resources/META-INF/spring and add the following lines.

Code Block
xml
xml

{scrollbar}
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:osgi="http://www.springframework.org/schema/osgi"
  xsi:schemaLocation="
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
       http://activemq.apache.org/schema/core
       http://activemq.apache.org/schema/core/activemq-core.xsd
       http://camel.apache.org/schema/spring
       http://camel.apache.org/schema/spring/camel-spring.xsd
       http://www.springframework.org/schema/osgi
       http://www.springframework.org/schema/osgi/spring-osgi.xsd">

    <!-- Allows us to use system properties as variables in this configuration file -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>

    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="default" dataDirectory="${servicemix.base}/data/activemq/default" useShutdownHook="false">

        <!-- Destination specific policies using destination names or wildcards -->
        <destinationPolicy>
            <policyMap>
                <policyEntries>
                    <policyEntry queue=">" memoryLimit="5mb"/>
                    <policyEntry topic=">" memoryLimit="5mb">
                        <subscriptionRecoveryPolicy>
                            <lastImageSubscriptionRecoveryPolicy/>
                        </subscriptionRecoveryPolicy>
                    </policyEntry>
                </policyEntries>
            </policyMap>
        </destinationPolicy>

        <!-- Use the following to configure how ActiveMQ is exposed in JMX -->
        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>

        <!-- The store and forward broker networks ActiveMQ will listen to -->
        <networkConnectors>
            <!-- by default just auto discover the other brokers -->
            <networkConnector name="default-nc" uri="multicast://default"/>
            <!-- Example of a static configuration:
            <networkConnector name="host1 and host2" uri="static://(tcp://host1:61616,tcp://host2:61616)"/>
            -->
        </networkConnectors>

        <persistenceAdapter>
            <amqPersistenceAdapter syncOnWrite="false" directory="${servicemix.base}/data/activemq/default" maxFileLength="20 mb"/>
        </persistenceAdapter>

        <!-- Use the following if you wish to configure the journal with JDBC -->
        <!--
        <persistenceAdapter>
            <journaledJDBC dataDirectory="${activemq.base}/data" dataSource="#postgres-ds"/>
        </persistenceAdapter>
        -->

        <!-- Or if you want to use pure JDBC without a journal -->
        <!--
        <persistenceAdapter>
            <jdbcPersistenceAdapter dataSource="#postgres-ds"/>
        </persistenceAdapter>
        -->

        <!--  The maximum about of space the broker will use before slowing down producers -->
        <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage limit="20 mb"/>
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="1 gb" name="foo"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="100 mb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>


        <!-- The transport connectors ActiveMQ will listen to -->
        <transportConnectors>
            <transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/>
            <transportConnector name="stomp" uri="stomp://localhost:61613"/>
        </transportConnectors>

    </broker>

    <bean id="activemqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616" />
    </bean>

    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactoryBean">
        <property name="maxConnections" value="8" />
        <property name="maximumActive" value="500" />
        <property name="transactionManager" ref="transactionManager" />
        <property name="connectionFactory" ref="activemqConnectionFactory" />
        <property name="resourceName" value="activemq.default" />
    </bean>

    <bean id="resourceManager" class="org.apache.activemq.pool.ActiveMQResourceManager" init-method="recoverResource">
          <property name="transactionManager" ref="transactionManager" />
          <property name="connectionFactory" ref="activemqConnectionFactory" />
          <property name="resourceName" value="activemq.default" />
    </bean>

    <osgi:reference id="transactionManager" interface="javax.transaction.TransactionManager" />

    <osgi:service ref="pooledConnectionFactory">
        <osgi:interfaces>
            <value>javax.jms.ConnectionFactory</value>
        </osgi:interfaces>
        <osgi:service-properties>
            <entry key="name" value="default"/>
        </osgi:service-properties>
    </osgi:service>

</beans>
{scrollbar}

At the bundle startup, Spring will instantiate the beans declared and in consequence start the queuing engine. We haven't changed the content of the file corresponding to what is proposed in the ServiceMix distribution but you can use here the same technique described for the Datasource and add properties that you configure through by example a org.apache.activemq.config.etc file.

Code Block
xml
xml
    ...
    <transportConnectors>
        <transportConnector name="${name}" uri="${uri}" discoveryUri="${discoveryUri}"/>
    </transportConnectors>
    
    <!-- here is the list of values defined as default but can be overidded in the file org.apache.activemq.config.etc -->
    <osgix:cm-properties id="confs" persistent-id="org.apache.activemq.config.etc">
        <prop key="name">openwire</prop>
        <prop key="uri">tcp://localhost:61616</prop>
        <prop key="discoveryUri">multicast://default</prop>
    </osgix:cm-properties>
    ...

...

The pom.xml file must be modified to add properties required by Spring blueprint. So add the following lines :

Code Block
xml
xml
...
    <instructions>
      <Bundle-SymbolicName>${pom.artifactId}</Bundle-SymbolicName>
      <DynamicImport-Package>*</DynamicImport-Package>
      <Include-Resource>src/main/resources</Include-Resource> (1)
      <Spring-Context>*;publish-context:=false;create-asynchronously:=true</Spring-Context> (2)
      <Private-Package></Private-Package>
      <Import-Package> (3)
	javax.transaction,
	org.apache.activemq,
        org.apache.activemq.pool,
	org.springframework.beans.factory.config,
	*
       </Import-Package>
    </instructions>
...

...

First, create a spring DSL file osgi-queuingservice.xml in the directory {{src/main/resources/META-INF-spring}}directory containing the following information :

Code Block
xml
xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:osgi="http://www.springframework.org/schema/osgi"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
                      http://www.springframework.org/schema/beans/spring-beans.xsd
                      http://www.springframework.org/schema/osgi
                      http://www.springframework.org/schema/osgi/spring-osgi.xsd
                      http://camel.apache.org/schema/spring
                      http://camel.apache.org/schema/spring/camel-spring.xsd">
      
    <bean id="active-mq" class="org.apache.activemq.camel.component.ActiveMQComponent" /> (1)

    <osgi:service id="osgiqueuingservice" ref="active-mq" interface="org.apache.camel.Component"/> (2)
       
</beans>

...

Next adapt the POM.xml file like this to instruct the felix plugin how to generate the MANIFEST.MF file

Code Block
xml
xml
...
			<plugin>
				<groupId>org.apache.felix</groupId>
				<artifactId>maven-bundle-plugin</artifactId>
				<version>${felix-version}</version>
				<configuration>
					<manifestLocation>META-INF</manifestLocation>
					<instructions>
						<Bundle-SymbolicName>${pom.artifactId}</Bundle-SymbolicName>
						<Import-Package>
						    org.apache.activemq.camel.component,
						    org.apache.camel,
						    *</Import-Package>
					    <Include-Resource>src/main/resources</Include-Resource>
					    <Spring-Context>*;publish-context:=false;create-asynchronously:=true</Spring-Context>
					    <Private-Package></Private-Package>
					</instructions>
				</configuration>
			</plugin>
...

...

Remarks :
(1) - We instantiate List and Map classes that we will use to extract objects of our incident model
(2) - Using the method exchange.getIn().getBody(), we extract the objects from the message and put them in a List
(3) - We get the Header field ('Origin') to check the origin of the messages (file or webservice). This info will be persisted in the DB
(4) - The object incident is retrieved from the Map model. In our case, the key is unique because we have only created one model class
(5) - The incident is saved in the database by calling the OSGI service IncidentService.saveIncident
(6) - The field setIncidentService is used by Spring to inject dependency with OSGI service

b) WebService

Code Block

Remarks :
(1) -
(2) -
(3) -
(4) -

c) Feedback

...


/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.example.reportincident.internal;

import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.camel.Exchange;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.camel.example.reportincident.InputReportIncident;
import org.apache.camel.example.reportincident.model.Incident;

public class WebService {

	private static final transient Log LOG = LogFactory.getLog(WebService.class);

	public void process(Exchange exchange) throws ParseException {
        
		InputReportIncident webincident = (InputReportIncident)exchange.getIn().getBody(); (1)
		
		LOG.debug("Incident received from : " + webincident.getFamilyName() + ", " + webincident.getGivenName());
		LOG.debug("Incident info : " + webincident.getIncidentId() + ", at : " + webincident.getIncidentDate());
		LOG.debug("Incident details : " + webincident.getDetails() + ", summary : " + webincident.getSummary());
  
		
		List<Map<String, Incident>> models = new ArrayList<Map<String, Incident>>();
		Map<String, Incident> model = new HashMap<String, Incident>();
			
		// Convert the InputReportIncident into an Incident 
		Incident incident = new Incident(); (2)
		
		DateFormat format = new SimpleDateFormat( "dd-mm-yyyy" );
		incident.setIncidentDate(format.parse(webincident.getIncidentDate()));
		
		incident.setDetails(webincident.getDetails());
		incident.setEmail(webincident.getEmail());
		incident.setFamilyName(webincident.getFamilyName());
		incident.setGivenName(webincident.getGivenName());
		incident.setIncidentRef(webincident.getIncidentId());
		incident.setPhone(webincident.getPhone());
		incident.setSummary(webincident.getSummary());
		
		// Get Header origin from message
		String origin = (String) exchange.getIn().getHeader("origin"); (3)

		// Specify current Date
		format = new SimpleDateFormat( "dd/MM/yyyy HH:mm:ss" );
                String currentDate = format.format( new Date() );
                Date creationDate = format.parse( currentDate );
        
                incident.setCreationDate(creationDate);
                incident.setCreationUser(origin);
		
                LOG.debug("Incident created from web service : " + incident.toString());
        
		// and place it in a model (cfr camel-bindy)
		model.put(Incident.class.getName(), incident); (4)
		models.add(model);
		
	        // replace with our input
		exchange.getOut().setBody(models); (5)
		
		// propagate the header
		exchange.getOut().setHeader("origin", origin); (6)
         
     }

}

Remarks :
(1) - The message coming from the WebService and copied in the InputReportIncident class is retrieved from the body using the method exchange.getIn().getBody()
(2) - We create using the model a class Incident where we will put webservice's InputReportIncident
(3) - We get the Header field ('Origin') to check the origin of the messages (webservice). This info will be persisted in the DB
(4) - The model incident is added to the Map and List objects required by Camel Bindy
(5) - The model is added to the body object of the message that we send it OUT
(6) - Thbe header parameter is also propagated for the next endpoint

c) Feedback

Code Block

package org.apache.camel.example.reportincident.internal;

import org.apache.camel.example.reportincident.OutputReportIncident;

public class Feedback {

	public OutputReportIncident setOk() { (3)
		OutputReportIncident outputReportIncident = new OutputReportIncident(); (1)
		outputReportIncident.setCode("0"); (2)
		return outputReportIncident;
	}

}

Remarks :
(1) - An OutputReportIncident object is created because it will be used to send the message back to the webservice
(2) - The field/property setCode is setted with the value ("OK")
(3) - The method setOk() will be called by Camel routing

4) Routing

Now that evverything is in place, we can create the three routes that we need to implement the architecture that we have presented in the introduction of this tutorial

From File(s) to queue

Code Block
xml
xml

	<camelContext trace="true" xmlns="http://camel.apache.org/schema/osgi"> (1)

		<camel:route> 
			<camel:from uri="file://d:/temp/data/?move=d:/temp/done/${file:name}" /> (2)
			<camel:setHeader headerName="origin"> (3)
				<camel:constant>file</camel:constant>
			</camel:setHeader>
			<camel:unmarshal ref="bindyDataformat" /> (4)
			<camel:to uri="queuingservice:queue:in" /> (5)
		</camel:route>
...

Remarks :
(1) -
(2) -
(3) -
(4) -
(5) -

From Webservices to queue

Code Block
xml
xml

...		 
		<camel:route>
			<camel:from uri="cxf:bean:reportIncident" />
			<camel:setHeader headerName="origin">
				<camel:constant>webservice</camel:constant>
			</camel:setHeader>
			<camel:convertBodyTo type="org.apache.camel.example.reportincident.InputReportIncident" />
			<camel:to uri="bean:webservice" />
			<camel:inOnly uri="queuingservice:queue:in" />
			<camel:transform>
				<camel:method bean="feedback" method="setOk" />
			</camel:transform>
		</camel:route>
...

Remarks :
(1) -
(2) -
(3) -
(4) -
(5) -

From queue to DB

Code Block
xml
xml

...		 
	
		<camel:route>
			<camel:from uri="queuingservice:queue:in" />
			<camel:to uri="bean:incidentSaver?method=process" />
		</camel:route>
	</camelContext>

Remarks :
(1) -
(2) -
(3) -
(4) -
(5) -

Once that you have finished to configure camel, created your classes, the pom.xml file must be modified :

Code Block
xml
xml

...
					<instructions>
						<Bundle-SymbolicName>${pom.artifactId}</Bundle-SymbolicName>
						<Import-Package> (1)
						    META-INF.cxf,
						    META-INF.cxf.osgi,
						    META-INF.wsdl,
						    org.apache.commons.logging,
						    org.apache.camel,
						    org.apache.camel.component,
						    org.apache.camel.component.cxf,
						    org.apache.camel.component.cxf.converter,
						    org.apache.camel.component.jms,
						    org.apache.camel.converter,
						    org.apache.camel.converter.jaxp,
						    org.apache.camel.converter.stream,
							org.apache.camel.dataformat.bindy,
							org.apache.camel.dataformat.bindy.csv,
							org.apache.camel.example.reportincident,
							org.apache.camel.example.reportincident.model,
							org.apache.camel.example.reportincident.service,
							org.apache.camel.processor,
							org.apache.activemq.camel.component;${activemq.osgi.version},
							org.apache.activemq.camel.converter;${activemq.osgi.version},
							org.apache.activemq.pool,
							org.apache.cxf,
							org.apache.cxf.binding,
							org.apache.cxf.binding.corba,
							org.apache.cxf.binding.soap,
							org.apache.cxf.binding.soap.spring,
							org.apache.cxf.bus,
							org.apache.cxf.bus.resource,
							org.apache.cxf.bus.spring,
							org.apache.cxf.buslifecycle,
							org.apache.cxf.catalog,
							org.apache.cxf.configuration,
							org.apache.cxf.configuration.spring,
							org.apache.cxf.endpoint,
							org.apache.cxf.headers,
							org.apache.cxf.management,
							org.apache.cxf.management.jmx,
							org.apache.cxf.phase,
							org.apache.cxf.resource,
							org.apache.cxf.transport,
							org.apache.cxf.transport.http,
							org.apache.cxf.transport.http.policy,
							org.apache.cxf.transport.http_jetty,
							org.apache.cxf.transport.jms,
							org.apache.cxf.transports.http,
							org.apache.cxf.workqueue,
							org.apache.cxf.wsdl,
							org.apache.cxf.wsdl11,
							org.apache.servicemix.cxf.transport.http_osgi,
							org.springframework.beans.factory.config,
							*
					    </Import-Package>
					    <Private-Package>org.apache.camel.example.reportincident.internal</Private-Package> (2)
					</instructions>
...

Remarks :
(1) - Classes required by Camel, CXF muste be imported.
(2) - Our internal classes are declared as private to avoid that they become available for another bundles of the OSGI server

Conclusion

TODO

Remarks :
(1) -
(2) -
(3) -
(4) -

4) Routing

Web

Packaging and deployment

#Resources

  • Attachments
    patterns.*part2.zip