...
Info |
---|
|
S stands for Simple and Standard and Springless. Also camel-jms was already taken. |
Warning |
---|
This is a rather new component in the complex world of JMS messaging, so it is still undergoing development and hardening. The classic JMS component, based on Spring JMS, has been hardened and battle-tested extensively. |
Maven users will need to add the following dependency to their pom.xml for this component:
Code Block |
---|
|
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-sjms</artifactId>
<version>x.x.x</version>
<!-- use the same version as your Camel core version -->
</dependency>
|
URI format
Code Block |
---|
sjms:[queue:|topic:]destinationName[?options]
|
Where destinationName is a JMS queue or topic name. By default, the destinationName is interpreted as a queue name. For example, to connect to the queue FOO.BAR, use sjms:FOO.BAR.
You can include the optional queue: prefix, if you prefer:
Code Block |
---|
sjms:queue:FOO.BAR
|
To connect to a topic, you must include the topic: prefix. For example, to connect to the topic Stocks.Prices, use:
Code Block |
---|
sjms:topic:Stocks.Prices
|
You append query options to the URI using the following format, ?option=value&option=value&...
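The URI format above can be assembled mechanically. The following is a minimal sketch in plain Java; `SjmsUriBuilder` is a hypothetical helper written for illustration and is not part of Camel. It always emits the explicit `queue:` or `topic:` prefix and does not URL-encode option values.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper (not a Camel class) that assembles an SJMS endpoint URI. */
final class SjmsUriBuilder {
    private final String destinationName;
    private boolean topic;
    // LinkedHashMap keeps options in the order they were added.
    private final Map<String, String> options = new LinkedHashMap<>();

    SjmsUriBuilder(String destinationName) {
        this.destinationName = destinationName;
    }

    /** Switches the destination from the default queue to a topic. */
    SjmsUriBuilder topic() {
        this.topic = true;
        return this;
    }

    /** Adds one query option; values are appended as-is, without URL encoding. */
    SjmsUriBuilder option(String name, Object value) {
        options.put(name, String.valueOf(value));
        return this;
    }

    /** Builds sjms:[queue:|topic:]destinationName[?option=value&option=value]. */
    String build() {
        StringBuilder uri = new StringBuilder("sjms:");
        uri.append(topic ? "topic:" : "queue:").append(destinationName);
        if (!options.isEmpty()) {
            StringJoiner query = new StringJoiner("&", "?", "");
            options.forEach((name, value) -> query.add(name + "=" + value));
            uri.append(query);
        }
        return uri.toString();
    }
}
```

For example, `new SjmsUriBuilder("Stocks.Prices").topic().option("transacted", true).build()` yields a topic URI with a single query option.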
...
The SJMS Component supports the following configuration options:
Div |
---|
class | confluenceTableSmall |
---|
|
Option | Required | Default Value | Description |
---|---|---|---|
connectionCount | | 1 | The maximum number of connections available to endpoints started under this component. |
connectionFactory | Yes | null | A ConnectionFactory is required to enable the SjmsComponent. It can be set directly or as part of a ConnectionResource. |
connectionResource | | null | A ConnectionResource is an interface that allows for customization and container control of the ConnectionFactory. See Plugable Connection Resource Management for further details. |
headerFilterStrategy | | DefaultJmsKeyFormatStrategy | |
keyFormatStrategy | | DefaultJmsKeyFormatStrategy | Camel 2.15.x or older: See the jmsKeyFormatStrategy option below. |
jmsKeyFormatStrategy | | DefaultJmsKeyFormatStrategy | Camel 2.16: Pluggable strategy for encoding and decoding JMS keys so they can be compliant with the JMS specification. Camel provides two implementations out of the box: default and passthrough. The default strategy safely marshals dots and hyphens (. and -). The passthrough strategy leaves the key as is and can be used for JMS brokers that do not care whether JMS header keys contain illegal characters. You can provide your own implementation of org.apache.camel.component.jms.JmsKeyFormatStrategy and refer to it using the # notation. |
transactionCommitStrategy | | null | |
DestinationCreationStrategy | | DefaultDestinationCreationStrategy | Camel 2.15.0: Sets a custom DestinationCreationStrategy on the SJMS component. |
messageCreatedStrategy | | | Camel 2.16: Uses the given MessageCreatedStrategy, which is invoked when Camel creates new javax.jms.Message instances while sending a JMS message. |
completionPredicate | | | Camel 2.18: The completion predicate, which causes batches to be completed when the predicate evaluates to true. The predicate can also be configured with the simple language using the string syntax. You may want to set the option eagerCheckCompletion to true to let the predicate match the incoming message, as otherwise it matches the aggregated message. |
eagerCheckCompletion | | false | Camel 2.18: Uses eager completion checking, which means the completionPredicate evaluates the incoming Exchange. Without eager completion checking, the completionPredicate evaluates the aggregated Exchange. |
|
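To make the jmsKeyFormatStrategy description more concrete, here is a small sketch of what "safely marshalling dots and hyphens" can look like. `KeyFormatSketch` is an illustrative stand-in, not Camel's class, and the replacement tokens `_DOT_` and `_HYPHEN_` are an assumption modelled on Camel's default strategy; check the source of your Camel version before relying on them.

```java
/** Illustrative stand-in for a JMS key format strategy; not Camel's actual class. */
final class KeyFormatSketch {
    // Assumed replacement tokens; verify against your Camel version.
    private static final String DOT_TOKEN = "_DOT_";
    private static final String HYPHEN_TOKEN = "_HYPHEN_";

    private KeyFormatSketch() {
    }

    /** Encodes a header key so it contains no dots or hyphens. */
    static String encodeKey(String key) {
        return key.replace(".", DOT_TOKEN).replace("-", HYPHEN_TOKEN);
    }

    /** Reverses encodeKey, restoring the original dots and hyphens. */
    static String decodeKey(String key) {
        return key.replace(HYPHEN_TOKEN, "-").replace(DOT_TOKEN, ".");
    }

    /** The passthrough variant leaves the key untouched. */
    static String passthrough(String key) {
        return key;
    }
}
```

A key such as `order.id-v2` is rewritten on the way out and restored on the way in, while the passthrough variant sends it unchanged.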
Below is an example of how to configure the SjmsComponent with its required ConnectionFactory provider. It will create a single connection by default and store it using the component's internal pooling APIs to ensure that it is able to service Session creation requests in a thread-safe manner.
Code Block |
---|
SjmsComponent component = new SjmsComponent();
component.setConnectionFactory(new ActiveMQConnectionFactory("tcp://localhost:61616"));
getContext().addComponent("sjms", component);
|
For a SjmsComponent that is required to support a durable subscription, you can override the default ConnectionFactoryResource instance and set the clientId property.
Code Block |
---|
ConnectionFactoryResource connectionResource = new ConnectionFactoryResource();
connectionResource.setConnectionFactory(new ActiveMQConnectionFactory("tcp://localhost:61616"));
connectionResource.setClientId("myclient-id");
SjmsComponent component = new SjmsComponent();
component.setConnectionResource(connectionResource);
component.setMaxConnections(1);
|
Producer Configuration Options
The SjmsProducer Endpoint supports the following properties:
Div |
---|
class | confluenceTableSmall |
---|
|
Option | Default Value | Description |
---|---|---|
acknowledgementMode | AUTO_ACKNOWLEDGE | The JMS acknowledgement name, which is one of: SESSION_TRANSACTED, AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE. CLIENT_ACKNOWLEDGE is not supported at this time. |
consumerCount | 1 | InOut only. Defines the number of MessageListener instances used for response consumers. |
exchangePattern | InOnly | Sets the producer's message exchange pattern. |
namedReplyTo | null | InOut only. Specifies a named reply-to destination for responses. |
persistent | true | Whether a message should be delivered with persistence enabled. |
producerCount | 1 | Defines the number of MessageProducer instances. |
responseTimeOut | 5000 | InOut only. Specifies the amount of time an InOut Producer will wait for its response. |
synchronous | true | Sets whether the Endpoint will use synchronous or asynchronous processing. |
transacted | false | Whether the endpoint should use a JMS Session transaction. |
ttl | -1 | Disabled by default. Sets the Message time to live header. |
prefillPool | true | Camel 2.14: Whether to prefill the producer connection pool on startup, or create connections lazily when needed. |
allowNullBody | true | Camel 2.15.1: Whether to allow sending messages with no body. If this option is false and the message body is null, then a JMSException is thrown. |
mapJmsMessage | true | Camel 2.16: Specifies whether Camel should auto map the received JMS message to an appropriate payload type, such as javax.jms.TextMessage to a String etc. |
messageCreatedStrategy | | Camel 2.16: Uses the given MessageCreatedStrategy, which is invoked when Camel creates new javax.jms.Message instances while sending a JMS message. |
jmsKeyFormatStrategy | | Camel 2.16: Pluggable strategy for encoding and decoding JMS keys so they can be compliant with the JMS specification. Camel provides two implementations out of the box: default and passthrough. The default strategy safely marshals dots and hyphens (. and -). The passthrough strategy leaves the key as is and can be used for JMS brokers that do not care whether JMS header keys contain illegal characters. You can provide your own implementation of org.apache.camel.component.jms.JmsKeyFormatStrategy and refer to it using the # notation. |
includeAllJMSXProperties | | Camel 2.16: Whether to include all JMSXxxx properties when mapping from JMS to Camel Message. Setting this to true will include properties such as JMSXAppID and JMSXUserID. Note: If you are using a custom headerFilterStrategy then this option does not apply. |
|
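The acknowledgementMode values listed above correspond to the integer constants defined on javax.jms.Session. The sketch below shows one way such an option string could be validated; `AckMode` is a hypothetical enum for illustration and not the parser Camel uses. Falling back to AUTO_ACKNOWLEDGE for a missing value mirrors the documented default.

```java
/** Hypothetical parser for the acknowledgementMode option; not Camel's own code. */
enum AckMode {
    SESSION_TRANSACTED(0),
    AUTO_ACKNOWLEDGE(1),
    DUPS_OK_ACKNOWLEDGE(3);

    /** Same numeric value as the matching javax.jms.Session constant. */
    final int jmsValue;

    AckMode(int jmsValue) {
        this.jmsValue = jmsValue;
    }

    /** Resolves an option value, rejecting the unsupported CLIENT_ACKNOWLEDGE. */
    static AckMode parse(String name) {
        if (name == null || name.isEmpty()) {
            return AUTO_ACKNOWLEDGE; // documented default
        }
        if ("CLIENT_ACKNOWLEDGE".equals(name)) {
            throw new IllegalArgumentException("CLIENT_ACKNOWLEDGE is not supported");
        }
        return valueOf(name); // throws IllegalArgumentException for unknown names
    }
}
```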
Producer Usage
InOnly Producer - (Default)
The InOnly Producer is the default behavior of the SJMS Producer Endpoint.
Code Block |
---|
from("direct:start")
.to("sjms:queue:bar");
|
...
To enable InOut behavior, append the exchangePattern option to the URI. By default it will use a dedicated TemporaryQueue for each consumer.
Code Block |
---|
from("direct:start")
.to("sjms:queue:bar?exchangePattern=InOut");
|
You can, however, specify a namedReplyTo, which can provide a better monitoring point.
Code Block |
---|
from("direct:start")
.to("sjms:queue:bar?exchangePattern=InOut&namedReplyTo=my.reply.to.queue");
|
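When many requests share one named reply queue, each reply has to be matched to the request that is waiting for it, and a request should give up after the configured responseTimeOut. The following plain-Java sketch illustrates that idea only. `RequestReplySketch` and its correlation-id map are assumptions made for illustration and do not describe how SJMS is implemented internally.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Conceptual model of InOut over a shared reply destination; not SJMS code. */
final class RequestReplySketch {
    // Requests still waiting for a reply, keyed by a caller-chosen correlation id.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Registers a request and returns the future its reply will complete. */
    CompletableFuture<String> send(String correlationId) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        pending.put(correlationId, reply);
        return reply;
    }

    /** Called by the reply listener; returns false if nobody is waiting any more. */
    boolean onReply(String correlationId, String body) {
        CompletableFuture<String> reply = pending.remove(correlationId);
        return reply != null && reply.complete(body);
    }

    /** Waits up to timeoutMillis for the reply; returns null when the wait times out. */
    String await(String correlationId, CompletableFuture<String> reply, long timeoutMillis) {
        try {
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            pending.remove(correlationId); // a late reply is now ignored
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }
}
```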
...
The SjmsConsumer Endpoint supports the following properties:
Div |
---|
class | confluenceTableSmall |
---|
|
Option | Default Value | Description |
---|---|---|
acknowledgementMode | AUTO_ACKNOWLEDGE | The JMS acknowledgement name, which is one of: TRANSACTED, AUTO_ACKNOWLEDGE or |
consumerCount | 1 | Defines the number of MessageListener instances. |
durableSubscriptionId | null | Required for a durable subscription. |
exchangePattern | InOnly | Sets the consumer's message exchange pattern. |
messageSelector | null | Sets the message selector. |
synchronous | true | Sets whether the Endpoint will use synchronous or asynchronous processing. |
transacted | false | Whether the endpoint should use a JMS Session transaction. |
transactionBatchCount | 1 | The number of exchanges to process before committing a local JMS transaction. The transacted property must also be set to true or this property will be ignored. |
transactionBatchTimeout | 5000 | The amount of time the transaction will stay open between messages before committing what has already been consumed. Minimum value is 1000ms. |
ttl | -1 | Disabled by default. Sets the Message time to live header. |
asyncStartListener | false | Whether to start the consumer message listener asynchronously when starting a route. For example, if a JmsConsumer cannot get a connection to a remote JMS broker, it may block while retrying and/or failing over. This will cause Camel to block while starting routes. By setting this option to true, you let routes start up while the JmsConsumer connects to the JMS broker using a dedicated thread in asynchronous mode. If this option is used, be aware that if the connection cannot be established, an exception is logged at WARN level and the consumer will not be able to receive messages. You can then restart the route to retry. |
asyncStopListener | false | Whether to stop the consumer message listener asynchronously when stopping a route. |
|
Consumer Usage
InOnly Consumer - (Default)
The InOnly Consumer is the default Exchange behavior of the SJMS Consumer Endpoint.
Code Block |
---|
from("sjms:queue:bar")
.to("mock:result");
|
...
To enable InOut behavior, append the exchangePattern option to the URI.
Code Block |
---|
from("sjms:queue:in.out.test?exchangePattern=InOut")
.transform(constant("Bye Camel"));
|
...
Below is an example of using the pluggable ConnectionResource with the ActiveMQ PooledConnectionFactory:
Code Block |
---|
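import javax.jms.Connection;

import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.camel.component.sjms.jms.ConnectionResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQConnectionResource implements ConnectionResource {
    private static final Logger log = LoggerFactory.getLogger(AMQConnectionResource.class);
    private PooledConnectionFactory pcf;

    public AMQConnectionResource(String connectString, int maxConnections) {
        super();
        // Create and start the ActiveMQ pool that backs this resource
        pcf = new PooledConnectionFactory(connectString);
        pcf.setMaxConnections(maxConnections);
        pcf.start();
    }

    public void stop() {
        pcf.stop();
    }

    @Override
    public Connection borrowConnection() throws Exception {
        // The pool hands out a connection; start it before returning it
        Connection answer = pcf.createConnection();
        answer.start();
        return answer;
    }

    @Override
    public Connection borrowConnection(long timeout) throws Exception {
        // SNIPPED...
    }

    @Override
    public void returnConnection(Connection connection) throws Exception {
        // Do nothing since there isn't a way to return a Connection
        // to the instance of PooledConnectionFactory
        log.info("Connection returned");
    }
}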
|
Then pass in the ConnectionResource to the SjmsComponent:
Code Block |
---|
CamelContext camelContext = new DefaultCamelContext();
AMQConnectionResource pool = new AMQConnectionResource("tcp://localhost:33333", 1);
SjmsComponent component = new SjmsComponent();
component.setConnectionResource(pool);
camelContext.addComponent("sjms", component);
|
...
Below is an example of using the BatchMessage class. First we create a List of BatchMessages:
Code Block |
---|
List<BatchMessage<String>> messages = new ArrayList<BatchMessage<String>>();
for (int i = 1; i <= messageCount; i++) {
    String body = "Hello World " + i;
    BatchMessage<String> message = new BatchMessage<String>(body, null);
    messages.add(message);
}
|
Then publish the List:
Code Block |
---|
template.sendBody("sjms:queue:batch.queue", messages);
|
...
The SjmsConsumer endpoint is a straightforward implementation that processes X messages before committing them with the associated Session. To enable batched transactions on the consumer, first enable transactions by setting the transacted parameter to true, then add the transactionBatchCount parameter and set it to any value greater than 0. For example, the following configuration will commit the Session every 10 messages:
Code Block |
---|
sjms:queue:transacted.batch.consumer?transacted=true&transactionBatchCount=10
|
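The commit-every-N behavior described above can be modelled in a few lines of plain Java. `BatchCommitSketch` is a hypothetical class for illustration; it counts messages in memory and stands in for the Session commit, so it is a sketch of the idea and not the SJMS consumer itself.

```java
import java.util.ArrayList;
import java.util.List;

/** Models a consumer that commits after every batchCount messages; not SJMS code. */
final class BatchCommitSketch {
    private final int batchCount;
    private final List<String> uncommitted = new ArrayList<>();
    private int commits;

    BatchCommitSketch(int batchCount) {
        if (batchCount < 1) {
            throw new IllegalArgumentException("batchCount must be greater than 0");
        }
        this.batchCount = batchCount;
    }

    /** Receives one message and commits once the batch is full. */
    void onMessage(String body) {
        uncommitted.add(body);
        if (uncommitted.size() >= batchCount) {
            commit();
        }
    }

    /** Stand-in for Session.commit(): clears the open batch if it holds anything. */
    void commit() {
        if (!uncommitted.isEmpty()) {
            uncommitted.clear();
            commits++;
        }
    }

    int commits() {
        return commits;
    }

    int pending() {
        return uncommitted.size();
    }
}
```

With a batch count of 10, feeding 25 messages results in two commits and leaves five messages in the open transaction.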
...
A transacted batch consumer also carries an internal timer that waits a set amount of time (5000ms by default) between messages before committing the open transactions on the Session. The default of 5000ms (minimum 1000ms) should be adequate for most use cases, but if further tuning is necessary, set the transactionBatchTimeout parameter.
Code Block |
---|
sjms:queue:transacted.batch.consumer?transacted=true&transactionBatchCount=10&transactionBatchTimeout=2000
|
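The idle timer can be pictured as a periodic check that commits whatever is pending once no message has arrived for the timeout period. The sketch below takes the current time as a parameter so it can be exercised without sleeping. `BatchTimeoutSketch` is hypothetical, and clamping values below 1000ms up to the minimum is an assumption; the text above only states that 1000ms is the minimum.

```java
/** Models the idle-timeout commit of a transacted batch consumer; not SJMS code. */
final class BatchTimeoutSketch {
    static final long MIN_TIMEOUT_MS = 1000;

    private final long timeoutMs;
    private long lastMessageAt;
    private int pending;
    private int commits;

    BatchTimeoutSketch(long timeoutMs, long nowMs) {
        // Assumption: values under the minimum are raised to 1000ms.
        this.timeoutMs = Math.max(timeoutMs, MIN_TIMEOUT_MS);
        this.lastMessageAt = nowMs;
    }

    /** Records a consumed message and restarts the idle clock. */
    void onMessage(long nowMs) {
        pending++;
        lastMessageAt = nowMs;
    }

    /** Called by a timer; commits and returns true if the consumer has been idle too long. */
    boolean tick(long nowMs) {
        if (pending > 0 && nowMs - lastMessageAt >= timeoutMs) {
            pending = 0;
            commits++;
            return true;
        }
        return false;
    }

    int commits() {
        return commits;
    }
}
```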
...
The producer endpoint is handled quite differently. With the producer, after each message is delivered to its destination the Exchange is closed and there is no longer a reference to that message. To make all the messages available for redelivery, you simply enable transactions on a Producer Endpoint that is publishing BatchMessages. The transaction will commit at the conclusion of the exchange, which includes all messages in the batch list. Nothing additional needs to be configured. For example:
Code Block |
---|
List<BatchMessage<String>> messages = new ArrayList<BatchMessage<String>>();
for (int i = 1; i <= messageCount; i++) {
    String body = "Hello World " + i;
    BatchMessage<String> message = new BatchMessage<String>(body, null);
    messages.add(message);
}
|
Now publish the List with transactions enabled:
Code Block |
---|
template.sendBody("sjms:queue:batch.queue?transacted=true", messages);
|
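The all-or-nothing effect of a transacted batch send can be sketched without a broker. In the hypothetical `TransactedBatchSendSketch` below, the `accepts` predicate stands in for a send that may fail, and the `delivered` list stands in for the destination. Messages are staged first and only become visible on commit, which is a simplification of what a JMS Session transaction provides.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Models an all-or-nothing batch send; not SJMS code. */
final class TransactedBatchSendSketch {
    private final Predicate<String> accepts; // stand-in for a send that can fail
    private final List<String> delivered = new ArrayList<>();

    TransactedBatchSendSketch(Predicate<String> accepts) {
        this.accepts = accepts;
    }

    /** Sends every body or none: staged messages become visible only on commit. */
    boolean sendBatch(List<String> bodies) {
        List<String> staged = new ArrayList<>();
        for (String body : bodies) {
            if (!accepts.test(body)) {
                return false; // rollback: staged messages are discarded
            }
            staged.add(body);
        }
        delivered.addAll(staged); // commit at the end of the batch
        return true;
    }

    List<String> delivered() {
        return delivered;
    }
}
```

If any message in the list is rejected, nothing from that batch reaches the destination, so the whole list remains available for redelivery.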
...
Not at all. Below is an example of the SJMS component using the Spring DSL:
Code Block |
---|
<route
id="inout.named.reply.to.producer.route">
<from
uri="direct:invoke.named.reply.to.queue" />
<to
uri="sjms:queue:named.reply.to.queue?namedReplyTo=my.response.queue&amp;exchangePattern=InOut" />
</route>
|
...