NMR Component
The nmr component is an adapter to the Normalized Message Router (NMR) in ServiceMix, which is intended for use by Camel applications deployed directly into the OSGi container. With the NMR you can exchange arbitrary objects, not only XML as is the case with the JBI specification. The main benefit of this component is that it lets you interconnect Camel routes deployed in different OSGi bundles.
By contrast, the JBI component is intended for use by Camel applications deployed into the ServiceMix JBI container.
Installing in Apache ServiceMix
The NMR component is provided with Apache ServiceMix. It is not distributed with Camel. To install the NMR component in ServiceMix, enter the following command in the ServiceMix console window:
Code Block |
---|
features:install nmr camel-nmr
|
Installing in plain Apache Karaf
In plain Karaf, the NMR component can also be installed using the ServiceMix artifacts:
Code Block |
---|
features:chooseurl camel <version>
features:addurl mvn:org.apache.servicemix.nmr/apache-servicemix-nmr/1.5.0/xml/features
features:install camel-blueprint nmr camel-nmr
install -s mvn:org.apache.servicemix.camel/org.apache.servicemix.camel.component/4.4.2
|
Configuration
You also need to instantiate the NMR component. You can do this by editing your Spring configuration file, META-INF/spring/*.xml, and adding the following bean instance:
Code Block |
---|
<beans xmlns:osgi="http://www.springframework.org/schema/osgi" ... >
  ...
  <bean id="nmr" class="org.apache.servicemix.camel.nmr.ServiceMixComponent">
    <property name="nmr">
      <osgi:reference interface="org.apache.servicemix.nmr.api.NMR" />
    </property>
  </bean>
  ...
</beans>
|
NMR consumer and producer endpoints
The following code:
Code Block |
---|
from("nmr:MyServiceEndpoint")
|
automatically exposes a new endpoint to the bus with endpoint name MyServiceEndpoint (see URI format). When an NMR endpoint appears at the end of a route, for example:
Code Block |
---|
to("nmr:MyServiceEndpoint")
|
The messages sent by this producer endpoint are sent to the already deployed NMR endpoint.
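The consumer/producer relationship described above can be pictured with a toy model. The sketch below is only an illustration in plain Java: `ToyRouter`, `expose` and `send` are hypothetical names and are not part of the ServiceMix NMR or Camel APIs. It assumes that a consumer registers a handler under an endpoint name and that a producer delivers any object to that name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Toy model of a message router. This is NOT the ServiceMix NMR API.
final class ToyRouter {
    // Endpoint name -> handler registered by the consuming route.
    private final Map<String, Consumer<Object>> endpoints = new ConcurrentHashMap<>();

    // Models from("nmr:<name>"): expose a named endpoint on the bus.
    // Rejecting duplicate names is a design choice of this sketch only.
    void expose(String name, Consumer<Object> handler) {
        if (endpoints.putIfAbsent(name, handler) != null) {
            throw new IllegalStateException("Endpoint already exposed: " + name);
        }
    }

    // Models to("nmr:<name>"): deliver a message to an already exposed endpoint.
    void send(String name, Object body) {
        Consumer<Object> handler = endpoints.get(name);
        if (handler == null) {
            throw new IllegalArgumentException("No endpoint named: " + name);
        }
        handler.accept(body);
    }

    public static void main(String[] args) {
        ToyRouter router = new ToyRouter();
        List<Object> received = new ArrayList<>();

        // "Bundle A" exposes the endpoint, "bundle B" sends to it by name.
        router.expose("MyServiceEndpoint", received::add);
        router.send("MyServiceEndpoint", Map.of("orderId", 42)); // any object, not only XML

        System.out.println(received);
    }
}
```

The two sides share nothing but the endpoint name, which is what allows routes in separate bundles to be wired together.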
URI format
Code Block |
---|
nmr:endpointName
|
URI Options
Option | Default Value | Description |
---|---|---|
runAsSubject | false | Apache ServiceMix 4.4: When this is set to true on a consumer endpoint, the endpoint will be invoked on behalf of the Subject that is set on the Exchange. |
synchronous | false | When this is set to true on a consumer endpoint, an incoming, synchronous NMR Exchange will be handled on the sender's thread instead of being handled on a new thread of the NMR endpoint's thread pool. |
timeout | 0 | Apache ServiceMix 4.4: When this is set to a value greater than 0, the producer endpoint will time out if it doesn't receive a response from the NMR within the given timeout period (in milliseconds). Configuring a timeout value will switch to using synchronous interactions with the NMR instead of the usual asynchronous messaging. |
Examples
Consumer
Code Block |
---|
from("nmr:MyServiceEndpoint") // consume nmr exchanges asynchronously
from("nmr:MyServiceEndpoint?synchronous=true").to() // consume nmr exchanges synchronously and use the same thread as defined by NMR ThreadPool
|
Producer
Code Block |
---|
from()...to("nmr:MyServiceEndpoint") // produce nmr exchanges asynchronously
from()...to("nmr:MyServiceEndpoint?timeout=10000") // produce nmr exchanges synchronously and wait till 10s to receive response
|
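To make the URI format and the option defaults concrete, here is a minimal sketch that splits an `nmr:` URI into its endpoint name and options. `NmrUriSketch` is a hypothetical helper written for illustration; Camel parses endpoint URIs itself and this is not how the component is implemented. The defaults are taken from the URI Options listed above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of Camel or ServiceMix.
final class NmrUriSketch {
    final String endpointName;
    final Map<String, String> options;

    private NmrUriSketch(String endpointName, Map<String, String> options) {
        this.endpointName = endpointName;
        this.options = options;
    }

    static NmrUriSketch parse(String uri) {
        if (!uri.startsWith("nmr:")) {
            throw new IllegalArgumentException("Not an nmr URI: " + uri);
        }
        String rest = uri.substring("nmr:".length());
        int q = rest.indexOf('?');
        String name = q < 0 ? rest : rest.substring(0, q);
        if (name.isEmpty()) {
            throw new IllegalArgumentException("Missing endpoint name: " + uri);
        }

        // Defaults as documented in the URI Options section.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("runAsSubject", "false");
        options.put("synchronous", "false");
        options.put("timeout", "0");

        if (q >= 0 && q + 1 < rest.length()) {
            for (String pair : rest.substring(q + 1).split("&")) {
                int eq = pair.indexOf('=');
                if (eq <= 0) {
                    throw new IllegalArgumentException("Malformed option: " + pair);
                }
                String key = pair.substring(0, eq);
                if (!options.containsKey(key)) {
                    throw new IllegalArgumentException("Unknown option: " + key);
                }
                options.put(key, pair.substring(eq + 1));
            }
        }
        return new NmrUriSketch(name, options);
    }

    // Per the options description, a timeout greater than 0 makes the
    // producer use synchronous interactions with the NMR.
    boolean producerWaitsForResponse() {
        return Long.parseLong(options.get("timeout")) > 0;
    }

    public static void main(String[] args) {
        NmrUriSketch parsed = parse("nmr:MyServiceEndpoint?timeout=10000");
        System.out.println(parsed.endpointName + " " + parsed.options);
    }
}
```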
Using Stream bodies
If you are using a stream type as the message body, you should be aware that a stream is only capable of being read once. So if you enable DEBUG logging, the body is usually logged and thus read. To deal with this, Camel has a streamCaching option that can cache the stream, enabling you to read it multiple times.
Code Block |
---|
from("nmr:MyEndpoint").streamCaching().to("xslt:transform.xsl", "bean:doSomething");
|
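The read-once behaviour that stream caching works around can be reproduced with plain `java.io`, independent of Camel. The sketch below uses a hypothetical `ReadOnceDemo` class: it reads the same stream twice, then repeats the exercise after copying the bytes once, which is the general idea behind caching a stream (not Camel's actual implementation).

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Illustration only; class and method names are hypothetical.
final class ReadOnceDemo {
    private static String readAll(InputStream in) throws IOException {
        return new String(in.readAllBytes(), StandardCharsets.UTF_8);
    }

    // Reads the same stream twice without resetting it,
    // as a logger followed by a processor would.
    static String[] readTwice(InputStream body) throws IOException {
        String first = readAll(body);   // e.g. DEBUG logging consumes the body
        String second = readAll(body);  // the stream is now exhausted
        return new String[] {first, second};
    }

    // Copies the bytes once, then hands out a fresh stream per reader.
    static String[] readTwiceCached(InputStream body) throws IOException {
        byte[] cached = body.readAllBytes();
        String first = readAll(new ByteArrayInputStream(cached));
        String second = readAll(new ByteArrayInputStream(cached));
        return new String[] {first, second};
    }

    public static void main(String[] args) throws IOException {
        byte[] xml = "<order/>".getBytes(StandardCharsets.UTF_8);
        String[] plain = readTwice(new ByteArrayInputStream(xml));
        String[] cached = readTwiceCached(new ByteArrayInputStream(xml));
        System.out.println("uncached second read empty: " + plain[1].isEmpty());
        System.out.println("cached second read: " + cached[1]);
    }
}
```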
Stream caching is enabled by default, so it is not necessary to set the streamCaching() option.
Big input streams (by default, over 64K) are stored in a temp file using CachedOutputStream. When you close the input stream, the temp file will be deleted.
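The idea of keeping small bodies in memory and spilling large ones to a temp file can be sketched as follows. This is a simplified illustration under stated assumptions, not Camel's CachedOutputStream: `SpillingCache` and its methods are hypothetical names, and here the temp file is deleted when the cache itself is closed rather than when an individual input stream is closed.

```java
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Simplified illustration only; not Camel's CachedOutputStream.
final class SpillingCache implements Closeable {
    private final byte[] inMemory; // non-null when the body fits under the threshold
    private final Path tempFile;   // non-null when the body was spilled to disk

    SpillingCache(InputStream source, int thresholdBytes) throws IOException {
        // Read one byte more than the threshold to detect an oversized body.
        byte[] head = source.readNBytes(thresholdBytes + 1);
        if (head.length <= thresholdBytes) {
            inMemory = head;
            tempFile = null;
        } else {
            inMemory = null;
            tempFile = Files.createTempFile("body-cache", ".tmp");
            try (OutputStream out = Files.newOutputStream(tempFile)) {
                out.write(head);
                source.transferTo(out); // copy the remainder of the body
            }
        }
    }

    boolean isSpilled() {
        return tempFile != null;
    }

    // Each call returns a fresh stream, so the body can be read many times.
    InputStream newStream() throws IOException {
        return inMemory != null
                ? new ByteArrayInputStream(inMemory)
                : Files.newInputStream(tempFile);
    }

    // Deletes the temp file, if one was created.
    @Override
    public void close() throws IOException {
        if (tempFile != null) {
            Files.deleteIfExists(tempFile);
        }
    }
}
```

With a threshold of `64 * 1024`, a 10-byte body stays in memory while a 1 MB body is written to disk once and re-read from there.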
Testing
NMR Camel routes can be tested with the usual Camel unit test approach, even if they will later be deployed in different bundles on an OSGi runtime. To do so, extend the ServiceMix NMR mock class org.apache.servicemix.camel.nmr.AbstractComponentTest, which creates an NMR bus and registers the Camel NMR component and the endpoints defined in the Camel routes.
Code Block |
---|
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.servicemix.camel.nmr.AbstractComponentTest;
import org.junit.Test;

public class ExchangeUsingNMRTest extends AbstractComponentTest {

    @Test
    public void testProcessing() throws InterruptedException {
        // Expect the message to arrive at the mock endpoint after crossing the NMR
        MockEndpoint mock = getMockEndpoint("mock:simple");
        mock.expectedBodiesReceived("Simple message body");

        template.sendBody("direct:simple", "Simple message body");

        assertMockEndpointsSatisfied();
    }

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                // Producer route: sends to the NMR endpoint named "simple"
                from("direct:simple").to("nmr:simple");
                // Consumer route: exposes the NMR endpoint and forwards to the mock
                from("nmr:simple?synchronous=true").to("mock:simple");
            }
        };
    }
}
|