You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 12 Next »

Routing/Mediation service

The routing/mediation between services/bundles will be created using Camel Spring DSL language. We will describe its creation/genesis step by step.

First, create the file camel-context.xml in the directory src/main/resources/META-INF/spring and add the lines :

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:camel="http://camel.apache.org/schema/spring"
	xmlns:osgi="http://www.springframework.org/schema/osgi"
	xmlns:cxf="http://camel.apache.org/schema/cxf"
	xsi:schemaLocation="
	    http://www.springframework.org/schema/beans
		http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
		http://www.springframework.org/schema/osgi
		http://www.springframework.org/schema/osgi/spring-osgi.xsd
		http://camel.apache.org/schema/osgi
		http://camel.apache.org/schema/osgi/camel-osgi.xsd
		http://camel.apache.org/schema/spring
		http://camel.apache.org/schema/spring/camel-spring.xsd
		http://camel.apache.org/schema/cxf
		http://camel.apache.org/schema/cxf/camel-cxf.xsd">
</beans>

Compare to a simple camel project, the spring beans tag has been enriched with new namespaces :

Now, that the schema/namespaces are declared, we can start to add addtional stuffs like import resources, beans reference, ... that our routing engine will use.

1) Webservice infrastructure : CXF

We will use the CXF framework to deploy the reportincident webservice and run it into the OSGI platform.

To work with it, resources (= spring beans) declared in CXF project must be imported using the statement import resource.

...
 <import resource="classpath:META-INF/cxf/cxf.xml" />
 <import resource="classpath:META-INF/cxf/cxf-extension-soap.xml" />
 <import resource="classpath:META-INF/cxf/cxf-extension-http.xml" />
 <import resource="classpath:META-INF/cxf/osgi/cxf-extension-osgi.xml" /> 
...

These imports will be used by spring at the bundle startup to instantiate the beans defined in these files. These beans are responsible in fact to deploy the architecture of the CXF bus top of the OSGI server and to provide a servlet that we will use to communicate with webservices engine of CXF.

Remark : for the purpose of this tutorial, we have packaged this configuration into the camel-spring file but it could be defined in a separate xml file with by example the component bean that Camel will use to communicate with CXF bus. This allows you to separate routing from parameters to be provided to configure endpoints.

The camel CXF endpoint is configurated like this :

...
	<!-- webservice endpoint --> 
	<cxf:cxfEndpoint id="reportIncident"
		address="/camel-example/incident" (1)
		serviceClass="org.apache.camel.example.reportincident.ReportIncidentEndpoint" (2)
		xmlns:s="http://reportincident.example.camel.apache.org"> (3)
	</cxf:cxfEndpoint>
...

Remarks :
(1) - the address corresponds to the URI address of the web services,
(2) - the serviceClass is the name of the class used work with the webservices and deployed in the bundle reportincident.webservice
(3) - xmlns:s is the namespace of the reportincident webservice (see reportincident.webservice)

2) Queuing engine

No matter if the incidents come from a webservice or a files but before to process and save them in the database, we will put
our messages in a queue. The queue manager used here is ActiveMQ.
Like CXF, we will use spring xml file to deploy ActiveMq into the server and configure it. This will be done in two steps

a) ActiveMQ

Like CXF, ActiveMq can be installed in the infrastructure using a spring.xml configuration file. So, create the file activemq-broker.xml in the directory src/main/resources/META-INF/spring and add the following lines.

{scrollbar}
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:osgi="http://www.springframework.org/schema/osgi"
  xsi:schemaLocation="
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
       http://activemq.apache.org/schema/core
       http://activemq.apache.org/schema/core/activemq-core.xsd
       http://camel.apache.org/schema/spring
       http://camel.apache.org/schema/spring/camel-spring.xsd
       http://www.springframework.org/schema/osgi
       http://www.springframework.org/schema/osgi/spring-osgi.xsd">

    <!-- Allows us to use system properties as variables in this configuration file -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>

    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="default" dataDirectory="${servicemix.base}/data/activemq/default" useShutdownHook="false">

        <!-- Destination specific policies using destination names or wildcards -->
        <destinationPolicy>
            <policyMap>
                <policyEntries>
                    <policyEntry queue=">" memoryLimit="5mb"/>
                    <policyEntry topic=">" memoryLimit="5mb">
                        <subscriptionRecoveryPolicy>
                            <lastImageSubscriptionRecoveryPolicy/>
                        </subscriptionRecoveryPolicy>
                    </policyEntry>
                </policyEntries>
            </policyMap>
        </destinationPolicy>

        <!-- Use the following to configure how ActiveMQ is exposed in JMX -->
        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>

        <!-- The store and forward broker networks ActiveMQ will listen to -->
        <networkConnectors>
            <!-- by default just auto discover the other brokers -->
            <networkConnector name="default-nc" uri="multicast://default"/>
            <!-- Example of a static configuration:
            <networkConnector name="host1 and host2" uri="static://(tcp://host1:61616,tcp://host2:61616)"/>
            -->
        </networkConnectors>

        <persistenceAdapter>
            <amqPersistenceAdapter syncOnWrite="false" directory="${servicemix.base}/data/activemq/default" maxFileLength="20 mb"/>
        </persistenceAdapter>

        <!-- Use the following if you wish to configure the journal with JDBC -->
        <!--
        <persistenceAdapter>
            <journaledJDBC dataDirectory="${activemq.base}/data" dataSource="#postgres-ds"/>
        </persistenceAdapter>
        -->

        <!-- Or if you want to use pure JDBC without a journal -->
        <!--
        <persistenceAdapter>
            <jdbcPersistenceAdapter dataSource="#postgres-ds"/>
        </persistenceAdapter>
        -->

        <!--  The maximum about of space the broker will use before slowing down producers -->
        <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage limit="20 mb"/>
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="1 gb" name="foo"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="100 mb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>


        <!-- The transport connectors ActiveMQ will listen to -->
        <transportConnectors>
            <transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/>
            <transportConnector name="stomp" uri="stomp://localhost:61613"/>
        </transportConnectors>

    </broker>

    <bean id="activemqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616" />
    </bean>

    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactoryBean">
        <property name="maxConnections" value="8" />
        <property name="maximumActive" value="500" />
        <property name="transactionManager" ref="transactionManager" />
        <property name="connectionFactory" ref="activemqConnectionFactory" />
        <property name="resourceName" value="activemq.default" />
    </bean>

    <bean id="resourceManager" class="org.apache.activemq.pool.ActiveMQResourceManager" init-method="recoverResource">
          <property name="transactionManager" ref="transactionManager" />
          <property name="connectionFactory" ref="activemqConnectionFactory" />
          <property name="resourceName" value="activemq.default" />
    </bean>

    <osgi:reference id="transactionManager" interface="javax.transaction.TransactionManager" />

    <osgi:service ref="pooledConnectionFactory">
        <osgi:interfaces>
            <value>javax.jms.ConnectionFactory</value>
        </osgi:interfaces>
        <osgi:service-properties>
            <entry key="name" value="default"/>
        </osgi:service-properties>
    </osgi:service>

</beans>
{scrollbar}

At the bundle startup, Spring will instantiate the beans declared and in consequence start the queuing engine. We haven't changed the content of the file corresponding to what is proposed in the ServiceMix distribution but you can use here the same technique described for the Datasource and add properties that you configure through by example a org.apache.activemq.config.etc file.

    ...
    <transportConnectors>
        <transportConnector name="${name}" uri="${uri}" discoveryUri="${discoveryUri}"/>
    </transportConnectors>
    
    <!-- here is the list of values defined as default but can be overidded in the file org.apache.activemq.config.etc -->
    <osgix:cm-properties id="confs" persistent-id="org.apache.activemq.config.etc">
        <prop key="name">openwire</prop>
        <prop key="uri">tcp://localhost:61616</prop>
        <prop key="discoveryUri">multicast://default</prop>
    </osgix:cm-properties>
    ...

The activeMq broker can also be integrated differently because the ServiceMix4 distribution (and not ServiceMix Kernel) proposes it in standard with additional commands that you can use from the console to :

Available commands in activemq:
browse Display selected messages in a specified destination
bstat Displays useful broker statistics
create-broker Creates a broker instance.
destroy-broker Destroys a broker instance.
list Lists all available brokers in the specified JMX context
purge Delete selected destination's messages that matches the message selector
query Display selected broker component's attributes and statistics

The pom.xml file must be modified to add properties required by Spring blueprint. So add the following lines :

...
    <instructions>
      <Bundle-SymbolicName>${pom.artifactId}</Bundle-SymbolicName>
      <DynamicImport-Package>*</DynamicImport-Package>
      <Include-Resource>src/main/resources</Include-Resource> (1)
      <Spring-Context>*;publish-context:=false;create-asynchronously:=true</Spring-Context> (2)
      <Private-Package></Private-Package>
      <Import-Package> (3)
	javax.transaction,
	org.apache.activemq,
        org.apache.activemq.pool,
	org.springframework.beans.factory.config,
	*
       </Import-Package>
    </instructions>
...

Remarks :
(1) - Include-Resource will allow to add to the jar generated the spring files of the directory src/main/resources/META-INF/spring
(2) - The Spring-Context tag allows to provide specific information used by Spring blueprint service to load the application context and how to load it. The asterisk means that all the spring xml files will be processed
(3) - The list of classes to be used is dependent of the queue engine use as implementation. This list must be reviewed when switching from ActiveMq to IBM WebSphere MQ, Tibco, ...

b) Camel ActiveMq component

To makes Camel independent of the JMS queue engine deployed in the OSGI server, we will implement a proxy using blueprint service between Camel component and the queuing engine used (Apache ActiveMq, IBM Websphere MQ, Oracle Advance Queue, TIBCO, ...)

First, create a spring DSL file osgi-queuingservice.xml in the directory {{src/main/resources/META-INF-spring}}directory containing the following information :

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:osgi="http://www.springframework.org/schema/osgi"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
                      http://www.springframework.org/schema/beans/spring-beans.xsd
                      http://www.springframework.org/schema/osgi
                      http://www.springframework.org/schema/osgi/spring-osgi.xsd
                      http://camel.apache.org/schema/spring
                      http://camel.apache.org/schema/spring/camel-spring.xsd">
      
    <bean id="active-mq" class="org.apache.activemq.camel.component.ActiveMQComponent" /> (1)

    <osgi:service id="osgiqueuingservice" ref="active-mq" interface="org.apache.camel.Component"/> (2)
       
</beans>

Remarks:
(1) Spring will instantiate the ActiveMqComponent to work with the ActiveMq server. If you would like to use another JMS component, then switch this class to org.apache.camel.component.jms.JmsComponent

(2) Our camel component will be exposed on the OSGI registry as an org.apache.camel.Component and has a reference to the ActiveMQComponent, JMSComponent

Next adapt the POM.xml file like this to instruct the felix plugin how to generate the MANIFEST.MF file

...
			<plugin>
				<groupId>org.apache.felix</groupId>
				<artifactId>maven-bundle-plugin</artifactId>
				<version>${felix-version}</version>
				<configuration>
					<manifestLocation>META-INF</manifestLocation>
					<instructions>
						<Bundle-SymbolicName>${pom.artifactId}</Bundle-SymbolicName>
						<Import-Package>
						    org.apache.activemq.camel.component,
						    org.apache.camel,
						    *</Import-Package>
					    <Include-Resource>src/main/resources</Include-Resource>
					    <Spring-Context>*;publish-context:=false;create-asynchronously:=true</Spring-Context>
					    <Private-Package></Private-Package>
					</instructions>
				</configuration>
			</plugin>
...

Remark that we import here the org.apache.activemq.camel.component class and not the ActiveMQComponent java class.

All the infrastructure is in place, so we can start to describe the beans that we will use

3) Beans reference

5 beans will be used by our application :

  • BindyCsvDataFormat : to generate the model, marshall (= parse a CSV file in to incidents objects) or unmarshall (= create a CSV file from incidents objects)
  • IncidentSaver : proxy service who will extract from the body of the message the Incidents objects and call the reportinicident.Service bundle to save in the DB the incidents
  • WebService : service who will receive messages from WebServices, extract them from the body message and transform them into Incident objects
  • Feedback : simple class who will send a message back to the webservice

So, adapt the camel-context.xml file :

	<bean id="bindyDataformat" class="org.apache.camel.dataformat.bindy.csv.BindyCsvDataFormat">
		<constructor-arg type="java.lang.String" value="org.apache.camel.example.reportincident.model" /> (1)
	</bean>

	<bean id="incidentSaver" class="org.apache.camel.example.reportincident.internal.IncidentSaver">
		<property name="incidentService">
			<osgi:reference interface="org.apache.camel.example.reportincident.service.IncidentService"/> (2)
		</property>
	</bean>
	
	<bean id="webservice" class="org.apache.camel.example.reportincident.internal.WebService" />
	<bean id="feedback" class="org.apache.camel.example.reportincident.internal.Feedback" />
	
	<osgi:reference id="queuingservice" interface="org.apache.camel.Component" /> (3)

Remarks :

(1) - The name of the package containing the class of the model must be provided as parameter
(2) - We inject into our proxy service using OSGI service reference, the interface IncidentService
(3) - We need also an OSGI service reference to the queue engine using the interface org.apache.camel.Component

We will quickly review the three classes that we will created for our project in the directory org.apache.camel.example.reportincident.internal :

a) IncidentSaver

package org.apache.camel.example.reportincident.internal;

import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.camel.Exchange;
import org.apache.camel.example.reportincident.model.Incident;
import org.apache.camel.example.reportincident.service.IncidentService;

public class IncidentSaver {

	private static final transient Log LOG = LogFactory.getLog(IncidentSaver.class);
	
	private IncidentService incidentService = null;

	public void process(Exchange exchange) throws ParseException {

		int count = 0;

		List<Map<String, Object>> models = new ArrayList<Map<String, Object>>(); (1)
		Map<String, Object> model = new HashMap<String, Object>();

		// Get models from message
		models = (List<Map<String, Object>>) exchange.getIn().getBody(); (2)
		
		// Get Header origin from message
		String origin = (String) exchange.getIn().getHeader("origin"); (3)
		LOG.debug("Header origin : " + origin);

		Iterator<Map<String, Object>> it = models.iterator();
		
		// Specify current Date
                DateFormat format = new SimpleDateFormat( "dd/MM/yyyy HH:mm:ss" );
                String currentDate = format.format( new Date() );
                Date creationDate = format.parse( currentDate );
        
		while (it.hasNext()) {

			model = it.next();
			
			LOG.debug("Model retrieved");

			for (String key : model.keySet()) {
				
				LOG.debug("Object retrieved : " + model.get(key).toString());
				
				// Retrieve incident from model
				Incident incident = (Incident) model.get(key); (4)
				incident.setCreationDate(creationDate);
		                incident.setCreationUser(origin);
		        
				LOG.debug("Count : " + count + ", " + incident.toString());
				
				// Save Incident
				incidentService.saveIncident(incident); (5)
				LOG.debug("Incident saved");
			}

			count++;
		}

		LOG.debug("Nber of CSV records received by the csv bean : " + count);

	}
	
        // Property used to inject service implementation (6)
	public void setIncidentService(IncidentService incidentService) {
		this.incidentService = incidentService;
	}
	
}

Remarks :
(1) - We instantiate List and Map classes that we will use to extract objects of our incident model
(2) - Using the method exchange.getIn().getBody(), we extract the objects from the message and put them in a List
(3) - We get the Header field ('Origin') to check the origin of the messages (file or webservice). This info will be persisted in the DB
(4) - The object incident is retrieved from the Map model. In our case, the key is unique because we have only created one model class
(5) - The incident is saved in the database by calling the OSGI service IncidentService.saveIncident
(6) - The field setIncidentService is used by Spring to inject dependency with OSGI service

b) WebService

 

Remarks :
(1) -
(2) -
(3) -
(4) -

c) Feedback

 

Remarks :
(1) -
(2) -
(3) -
(4) -

4) Routing

Web

Packaging and deployment

#Resources

  • No labels