...

Maven users will need to add the following dependency to their pom.xml for this component:

Code Block

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-sjms</artifactId>
    <version>x.x.x</version>
    <!-- use the same version as your Camel core version -->
</dependency>

URI format

Code Block

sjms:[queue:|topic:]destinationName[?options]

Where destinationName is a JMS queue or topic name. By default, the destinationName is interpreted as a queue name. For example, to connect to the queue FOO.BAR, use:

Code Block

sjms:FOO.BAR

You can include the optional queue: prefix, if you prefer:

Code Block

sjms:queue:FOO.BAR

To connect to a topic, you must include the topic: prefix. For example, to connect to the topic, Stocks.Prices, use:

Code Block

sjms:topic:Stocks.Prices

Append query options to the URI using the following format: ?option=value&option=value&...
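
The URI rules above can be illustrated with a small parser. This is only a sketch using the plain JDK: SjmsUriSketch and its fields are hypothetical names invented for this example and are not part of camel-sjms, which does its own URI handling.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper, not part of camel-sjms: splits an sjms URI into its parts. */
final class SjmsUriSketch {
    final boolean topic;
    final String destinationName;
    final Map<String, String> options;

    private SjmsUriSketch(boolean topic, String destinationName, Map<String, String> options) {
        this.topic = topic;
        this.destinationName = destinationName;
        this.options = options;
    }

    static SjmsUriSketch parse(String uri) {
        if (!uri.startsWith("sjms:")) {
            throw new IllegalArgumentException("Not an sjms URI: " + uri);
        }
        String rest = uri.substring("sjms:".length());

        // Split off the optional ?option=value&option=value part.
        Map<String, String> options = new LinkedHashMap<>();
        int q = rest.indexOf('?');
        if (q >= 0) {
            for (String pair : rest.substring(q + 1).split("&")) {
                if (pair.isEmpty()) {
                    continue;
                }
                int eq = pair.indexOf('=');
                String key = eq < 0 ? pair : pair.substring(0, eq);
                String value = eq < 0 ? "" : pair.substring(eq + 1);
                options.put(key, value);
            }
            rest = rest.substring(0, q);
        }

        // The destination is a queue unless the topic: prefix is present.
        boolean topic = false;
        if (rest.startsWith("topic:")) {
            topic = true;
            rest = rest.substring("topic:".length());
        } else if (rest.startsWith("queue:")) {
            rest = rest.substring("queue:".length());
        }
        return new SjmsUriSketch(topic, rest, options);
    }
}
```

With this sketch, sjms:FOO.BAR and sjms:queue:FOO.BAR both parse to the queue FOO.BAR, while sjms:topic:Stocks.Prices parses to a topic.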

...

Below is an example of how to configure the SjmsComponent with its required ConnectionFactory provider. It will create a single connection by default and store it using the component's internal pooling APIs to ensure that it is able to service Session creation requests in a thread-safe manner.

Code Block

SjmsComponent component = new SjmsComponent();
component.setConnectionFactory(new ActiveMQConnectionFactory("tcp://localhost:61616"));
getContext().addComponent("sjms", component);

For a SjmsComponent that is required to support a durable subscription, you can override the default ConnectionFactoryResource instance and set the clientId property.

Code Block

ConnectionFactoryResource connectionResource = new ConnectionFactoryResource();
connectionResource.setConnectionFactory(new ActiveMQConnectionFactory("tcp://localhost:61616"));
connectionResource.setClientId("myclient-id");

SjmsComponent component = new SjmsComponent();
component.setConnectionResource(connectionResource);
component.setMaxConnections(1);

...


| Option | Default Value | Description |
|---|---|---|
| acknowledgementMode | AUTO_ACKNOWLEDGE | The JMS acknowledgement name, which is one of: SESSION_TRANSACTED, AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE. CLIENT_ACKNOWLEDGE is not supported at this time. |
| consumerCount | 1 | InOut only. Defines the number of MessageListener instances used for response consumers. |
| exchangePattern | InOnly | Sets the Producer's message exchange pattern. |
| namedReplyTo | null | InOut only. Specifies a named reply-to destination for responses. |
| persistent | true | Whether a message should be delivered with persistence enabled. |
| producerCount | 1 | Defines the number of MessageProducer instances. |
| responseTimeOut | 5000 | InOut only. Specifies the amount of time an InOut Producer will wait for its response. |
| synchronous | true | Sets whether the Endpoint will use synchronous or asynchronous processing. |
| transacted | false | Whether the endpoint should use a JMS Session transaction. |
| ttl | -1 | Disabled by default. Sets the Message time to live header. |
| prefillPool | true | Camel 2.14: Whether to prefill the producer connection pool on startup, or create connections lazily when needed. |

Producer Usage

InOnly Producer - (Default)

The InOnly Producer is the default behavior of the SJMS Producer Endpoint.

Code Block

from("direct:start")
    .to("sjms:queue:bar");

...

To enable InOut behavior, append the exchangePattern attribute to the URI. By default it will use a dedicated TemporaryQueue for each consumer.

Code Block

from("direct:start")
    .to("sjms:queue:bar?exchangePattern=InOut");

Alternatively, you can specify a namedReplyTo, which can provide a better monitoring point.

Code Block

from("direct:start")
    .to("sjms:queue:bar?exchangePattern=InOut&namedReplyTo=my.reply.to.queue");
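
When several options are combined, as in the InOut examples here, it can help to assemble the endpoint URI from a map rather than by hand. The following is a minimal sketch using only the JDK: SjmsUriBuilderSketch and queueUri are hypothetical names for illustration and are not part of Camel. It does not URL-encode values, so it assumes option values contain no reserved characters.

```java
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper, not part of Camel: builds an sjms queue URI with options. */
final class SjmsUriBuilderSketch {

    /** Joins the options as ?option=value&option=value in map iteration order. */
    static String queueUri(String destinationName, Map<String, String> options) {
        String base = "sjms:queue:" + destinationName;
        if (options.isEmpty()) {
            return base;
        }
        StringJoiner query = new StringJoiner("&", "?", "");
        for (Map.Entry<String, String> option : options.entrySet()) {
            query.add(option.getKey() + "=" + option.getValue());
        }
        return base + query;
    }
}
```

Passing an ordered map (for example a LinkedHashMap) keeps the options in the order they were added.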

...

The InOnly Consumer is the default Exchange behavior of the SJMS Consumer Endpoint.

Code Block

from("sjms:queue:bar")
    .to("mock:result");

...

To enable InOut behavior, append the exchangePattern attribute to the URI.

Code Block

from("sjms:queue:in.out.test?exchangePattern=InOut")
    .transform(constant("Bye Camel"));

...

Below is an example of using the pluggable ConnectionResource with the ActiveMQ PooledConnectionFactory:

Code Block

public class AMQConnectionResource implements ConnectionResource {
    private PooledConnectionFactory pcf;

    public AMQConnectionResource(String connectString, int maxConnections) {
        super();
        pcf = new PooledConnectionFactory(connectString);
        pcf.setMaxConnections(maxConnections);
        pcf.start();
    }

    public void stop() {
        pcf.stop();
    }

    @Override
    public Connection borrowConnection() throws Exception {
        Connection answer = pcf.createConnection();
        answer.start();
        return answer;
    }

    @Override
    public Connection borrowConnection(long timeout) throws Exception {
        // SNIPPED...
    }

    @Override
    public void returnConnection(Connection connection) throws Exception {
        // Do nothing since there isn't a way to return a Connection
        // to the instance of PooledConnectionFactory
        log.info("Connection returned");
    }
}

Then pass in the ConnectionResource to the SjmsComponent:

Code Block

CamelContext camelContext = new DefaultCamelContext();
AMQConnectionResource pool = new AMQConnectionResource("tcp://localhost:33333", 1);
SjmsComponent component = new SjmsComponent();
component.setConnectionResource(pool);
camelContext.addComponent("sjms", component);

...

Below is an example of using the BatchMessage class. First we create a List of BatchMessages:

Code Block

List<BatchMessage<String>> messages = new ArrayList<BatchMessage<String>>();
for (int i = 1; i <= messageCount; i++) {
    String body = "Hello World " + i;
    BatchMessage<String> message = new BatchMessage<String>(body, null);
    messages.add(message);
}

Then publish the List:

Code Block

template.sendBody("sjms:queue:batch.queue", messages);

...

The SjmsConsumer endpoint is a straightforward implementation that processes X messages before committing them with the associated Session. To enable batched transactions on the consumer, first enable transactions by setting the transacted parameter to true, then add the transactionBatchCount parameter and set it to any value greater than 0. For example, the following configuration commits the Session every 10 messages:

Code Block

sjms:queue:transacted.batch.consumer?transacted=true&transactionBatchCount=10
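
The commit-every-N behavior can be pictured with a simple counter. The class below is a hypothetical stand-in written against the plain JDK, not the SJMS implementation: BatchCommitSketch and its methods are invented for illustration, and commit() only counts where a real consumer would commit the JMS Session.

```java
/** Hypothetical stand-in for a transacted batch consumer; not the SJMS implementation. */
final class BatchCommitSketch {
    private final int transactionBatchCount;
    private int uncommitted = 0;
    private int commits = 0;

    BatchCommitSketch(int transactionBatchCount) {
        if (transactionBatchCount <= 0) {
            throw new IllegalArgumentException("transactionBatchCount must be greater than 0");
        }
        this.transactionBatchCount = transactionBatchCount;
    }

    /** Called once per consumed message; commits when the batch is full. */
    void onMessage(String body) {
        uncommitted++;
        if (uncommitted >= transactionBatchCount) {
            commit();
        }
    }

    /** Stands in for committing the Session: counts the commit and resets the batch. */
    void commit() {
        commits++;
        uncommitted = 0;
    }

    int commits() {
        return commits;
    }

    int uncommitted() {
        return uncommitted;
    }
}
```

In this sketch, with transactionBatchCount=10, consuming 25 messages produces two commits and leaves five messages in the open transaction.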

...

A transacted batch consumer also carries an internal timer that waits a default amount of time (5000ms) between messages before committing the open transactions on the Session. The default value of 5000ms (minimum of 1000ms) should be adequate for most use cases, but if further tuning is necessary, set the transactionBatchTimeout parameter.

Code Block

sjms:queue:transacted.batch.consumer?transacted=true&transactionBatchCount=10&transactionBatchTimeout=2000
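
The idle-timeout rule can be expressed as a single check. This is a sketch under stated assumptions, not the component's timer code: BatchTimeoutSketch and shouldCommit are hypothetical names, the caller supplies the timestamps, and the 1000ms minimum mentioned above is not modeled.

```java
/** Hypothetical illustration of the batch timeout rule; not the SJMS timer implementation. */
final class BatchTimeoutSketch {

    /**
     * Returns true when there is uncommitted work and no message has arrived
     * for at least timeoutMs milliseconds.
     */
    static boolean shouldCommit(long lastMessageAtMs, long nowMs, long timeoutMs, int uncommitted) {
        long idleMs = nowMs - lastMessageAtMs;
        return uncommitted > 0 && idleMs >= timeoutMs;
    }
}
```

With transactionBatchTimeout=2000, a batch holding three uncommitted messages would be committed once two seconds pass without a new message, even though the batch count of 10 has not been reached.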

...

The producer endpoint is handled quite differently. With the producer, after each message is delivered to its destination the Exchange is closed and there is no longer a reference to that message. To make all the messages available for redelivery, enable transactions on a Producer Endpoint that is publishing BatchMessages. The transaction commits at the conclusion of the exchange, which includes all messages in the batch list. Nothing additional needs to be configured. For example:

Code Block

List<BatchMessage<String>> messages = new ArrayList<BatchMessage<String>>();
for (int i = 1; i <= messageCount; i++) {
    String body = "Hello World " + i;
    BatchMessage<String> message = new BatchMessage<String>(body, null);
    messages.add(message);
}

Now publish the List with transactions enabled:

Code Block

template.sendBody("sjms:queue:batch.queue?transacted=true", messages);
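
The all-or-nothing effect of a transacted batch can be sketched without a broker. The class below is a hypothetical in-memory model, not the SJMS producer: TransactedBatchSketch is an invented name, and a null body is used as an arbitrary stand-in for a send failure.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory model of a transacted batch send; not the SJMS producer. */
final class TransactedBatchSketch {
    private final List<String> pending = new ArrayList<>();
    private final List<String> delivered = new ArrayList<>();

    /** Sends every body in one transaction: either all are committed or none are. */
    void sendBatch(List<String> bodies) {
        try {
            for (String body : bodies) {
                if (body == null) {
                    // Stand-in for a failure while sending one message of the batch.
                    throw new IllegalArgumentException("Cannot send a null body");
                }
                pending.add(body);
            }
            // Commit: the whole batch becomes visible at once.
            delivered.addAll(pending);
        } finally {
            // After a failure this discards the partial batch (the rollback);
            // after a commit it just resets the buffer for the next batch.
            pending.clear();
        }
    }

    List<String> delivered() {
        return delivered;
    }
}
```

If one message in the batch fails, none of the batch is delivered, which is what leaves the whole list available for redelivery.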

...

Not at all. Below is an example of the SJMS component using the Spring DSL:

Code Block

<route id="inout.named.reply.to.producer.route">
    <from uri="direct:invoke.named.reply.to.queue" />
    <to uri="sjms:queue:named.reply.to.queue?namedReplyTo=my.response.queue&amp;exchangePattern=InOut" />
</route>

...