...
Available as of Camel 2.7: In this section we will use the JDBC based idempotent repository.
Tip | ||
---|---|---|
| ||
...
From Camel 2.9 onwards there is an abstract |
...
class org.apache.camel.processor.idempotent.jdbc.AbstractJdbcMessageIdRepository |
...
you can extend to build custom JDBC idempotent repository. |
...
First we have to create the database table which will be used by the idempotent repository. For Camel 2.7, we use the following schema:
Code Block | ||
---|---|---|
| ||
| ||
CREATE TABLE CAMEL_MESSAGEPROCESSED ( processorName VARCHAR(255), messageId VARCHAR(100) ) |
...
|
In Camel 2.8, we added the createdAt column:
Code Block | ||||
---|---|---|---|---|
| ||||
CREATE TABLE CAMEL_MESSAGEPROCESSED ( processorName VARCHAR(255), messageId VARCHAR(100), createdAt TIMESTAMP ) |
...
|
Warning |
---|
...
The SQL Server TIMESTAMP |
...
type is a fixed-length binary-string type. It does not map to any of the JDBC time types: DATE, TIME, |
...
or TIMESTAMP. |
We recommend to have a unique constraint on the columns processorName and messageId. Because the syntax for this constraint differs for database to database, we do not show it here.
Second we need to setup a javax.sql.DataSource
in the spring XML file:
Code Block | |
---|---|
|
...
|
{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/idempotent/jdbc/spring.xml}
<jdbc:embedded-database id="dataSource" type="DERBY" /> |
And finally we can create And finally we can create our JDBC idempotent repository in the spring XML file as well:
{snippet:id=e2|lang=xml|url=camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/idempotent/jdbc/spring.xml}
...
Code Block | ||
---|---|---|
| ||
<bean id="messageIdRepository" class="org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository">
<constructor-arg ref="dataSource" />
<constructor-arg value="myProcessorName" />
</bean> |
Customize the JdbcMessageIdRepository
Starting with Camel 2.9.1 you have a few options to tune the org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository
for your needs:
...
Parameter |
---|
Default Value |
---|
...
Description |
---|
createTableIfNotExists |
true |
Defines whether or not Camel should try to create the table if it doesn't exist. |
...
...
tableExistsString |
...
...
SELECT 1 FROM CAMEL_MESSAGEPROCESSED WHERE 1 = 0 |
This query is used to figure out whether the table already exists or not. It must throw an exception to indicate the table doesn't exist. |
createString
...
createString | CREATE TABLE CAMEL_MESSAGEPROCESSED (processorName VARCHAR(255), messageId VARCHAR(100), createdAt TIMESTAMP) |
The statement which is used to create the table. |
queryString |
SELECT COUNT(*) FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND messageId = ? |
...
The query which is used to figure out whether the message already exists in the repository (the result is not equals to '0'). It takes two parameters. This first one is the processor name ( |
...
...
insertString |
...
...
INSERT INTO CAMEL_MESSAGEPROCESSED (processorName, messageId, createdAt) VALUES (?, ?, ?) |
The statement which is used to add the entry into the table. It takes three parameter. The first one is the processor name ( |
...
...
deleteString |
DELETE FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND messageId = ? |
The statement which is used to delete the entry from the database. It takes two parameter. This first one is the processor name ( |
A customized org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository
could look like:
{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/idempotent/jdbc/customized-spring.xml}
Code Block | ||
---|---|---|
| ||
<bean id="messageIdRepository" class="org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository">
<constructor-arg ref="dataSource" />
<constructor-arg value="myProcessorName" />
<property name="tableExistsString" value="SELECT 1 FROM CUSTOMIZED_MESSAGE_REPOSITORY WHERE 1 = 0" />
<property name="createString" value="CREATE TABLE CUSTOMIZED_MESSAGE_REPOSITORY (processorName VARCHAR(255), messageId VARCHAR(100), createdAt TIMESTAMP)" />
<property name="queryString" value="SELECT COUNT(*) FROM CUSTOMIZED_MESSAGE_REPOSITORY WHERE processorName = ? AND messageId = ?" />
<property name="insertString" value="INSERT INTO CUSTOMIZED_MESSAGE_REPOSITORY (processorName, messageId, createdAt) VALUES (?, ?, ?)" />
<property name="deleteString" value="DELETE FROM CUSTOMIZED_MESSAGE_REPOSITORY WHERE processorName = ? AND messageId = ?" />
</bean> |
Using the JDBC based aggregation repository
...
Info | ||
---|---|---|
| ||
...
In Camel 2.6, the JdbcAggregationRepository is provided in |
...
the camel-jdbc-aggregator |
...
component. From Camel 2.7 onwards, |
...
the JdbcAggregationRepository |
...
is provided in |
...
the camel-sql |
...
component. |
JdbcAggregationRepository
is an AggregationRepository
which on the fly persists the aggregated messages. This ensures that you will not loose messages, as the default aggregator will use an in memory only AggregationRepository
.
The JdbcAggregationRepository
allows together with Camel to provide persistent support for the Aggregator.
It has the following options:
Option |
---|
Type |
---|
...
Description |
---|
...
...
dataSource |
DataSource |
...
Mandatory: |
...
The javax.sql.DataSource |
...
to use for accessing the database. |
...
...
repositoryName |
String |
...
Mandatory: |
...
The name of the repository. |
...
...
transactionManager |
...
...
TransactionManager |
...
Mandatory: |
...
The |
...
to mange transactions for the database. The TransactionManager must be able to support databases. |
lobHandler |
LobHandler |
...
A org.springframework.jdbc.support.lob.LobHandler |
...
to handle Lob types in the database. Use this option to use a vendor specific LobHandler, for example when using Oracle. |
...
...
returnOldExchange |
boolean |
...
Whether the get operation should return the old existing Exchange if any existed. By default this option |
...
is false |
...
to optimize as we do not need the old exchange when aggregating. |
useRecovery |
boolean |
Whether or not recovery is enabled. This option is by |
...
default true . When enabled the |
...
Camel Aggregator |
...
automatic recover failed aggregated exchange and have them |
...
...
resubmitted. |
recoveryInterval |
...
...
long |
...
If recovery is enabled then a background task is run every x'th time to scan for failed exchanges to recover and resubmit. By default this interval is 5000 millis. |
maximumRedeliveries |
int |
Allows you to limit the maximum number of redelivery attempts for a recovered exchange. If enabled then the Exchange will be moved to the dead letter channel if all redelivery attempts failed. By default this option is disabled. If this option is used then |
...
the deadLetterUri |
...
option must also be provided. |
...
...
|
...
String |
...
An endpoint uri for |
...
...
where exhausted recovered Exchanges will be moved. If this option is used then |
...
the | |
storeBodyAsText | boolean |
storeBodyAsText
boolean
Camel 2.11: Whether to store the message body as String which is human readable. By default this option is false
storing the body in binary format.
headersToStoreAsText
List<String>
...
Camel 2.11: |
...
Whether to store |
...
optimisticLocking
false
Camel 2.12: To turn on optimistic locking, which often would be needed in clustered environments where multiple Camel applications shared the same JDBC based aggregation repository.
jdbcOptimisticLockingExceptionMapper
the message body as String which is human readable. By default this option |
...
is | ||
headersToStoreAsText | List<String> | Camel 2.11: Allows to store headers as String which is human readable. By default this option is disabled, storing the headers in binary format. |
jdbcOptimisticLockingExceptionMapper | jdbcOptimisticLockingExceptionMapper |
Camel 2.12: |
...
Allows to plugin a |
...
custom |
...
to map vendor specific error codes to an optimistick locking error, for Camel to perform |
...
a retry. This requires |
Info | ||
---|---|---|
| ||
Optimistic locking is set to on by default. If two exchanges attempt to insert at the same time an exception will thrown, caught, converted to an OptimisticLockingException, and rethrown. |
What is preserved when persisting
JdbcAggregationRepository
will only preserve any Serializable
compatible data types. If a data type is not such a type its dropped and a WARN
is logged. And it only persists the Message
body and the Message
headers. The Exchange
properties are not persisted.
...
The following headers is set when an Exchange is being recovered/redelivered:
Header |
---|
...
...
Type |
---|
...
Description |
---|
...
...
Exchange.REDELIVERED |
Boolean |
...
Is set to true to indicate |
...
the Exchange |
...
is being redelivered. |
Exchange.REDELIVERY_COUNTER |
...
...
Integer |
...
The redelivery attempt, starting from 1. |
Only when an Exchange has been successfully processed it will be marked as complete which happens when the confirm
method is invoked on the AggregationRepository
. This means if the same Exchange fails again it will be kept retried until it success.
...
Here is the SQL query used to create the tables, just replace "aggregation"
with your aggregator repository name.
Code Block | ||
---|---|---|
| ||
|
...
CREATE TABLE aggregation ( id varchar(255) NOT NULL, exchange blob NOT NULL, constraint aggregation_pk PRIMARY KEY (id) ); CREATE TABLE aggregation_completed ( id varchar(255) NOT NULL, exchange blob NOT NULL, constraint aggregation_completed_pk PRIMARY KEY (id) ); |
...
Storing body and headers as text
...
You can configure the JdbcAggregationRepository
to store message body and select(ed) headers as String in separate columns. For example to store the body, and the following two headers companyName
and accountName
use the following SQL:
Code Block | ||||
---|---|---|---|---|
| ||||
CREATE TABLE aggregationRepo3 ( id varchar(255) NOT NULL, exchange blob NOT NULL, body varchar(1000), companyName varchar(1000), accountName varchar(1000), constraint aggregationRepo3_pk PRIMARY KEY (id) ); CREATE TABLE aggregationRepo3_completed ( id varchar(255) NOT NULL, exchange blob NOT NULL, body varchar(1000), companyName varchar(1000), accountName varchar(1000), constraint aggregationRepo3_completed_pk PRIMARY KEY (id) ); |
...
And then configure the repository to enable this behavior as shown below:
Code Block | ||||
---|---|---|---|---|
| ||||
|
...
<bean id="repo3" class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository"> <property name="repositoryName" value="aggregationRepo3"/> <property name="transactionManager" ref="txManager3"/> <property name="dataSource" ref="dataSource3"/> <!-- configure to store the message body and following headers as text in the repo --> <property name="storeBodyAsText" value="true"/> <property name="headersToStoreAsText"> <list> <value>companyName</value> <value>accountName</value> </list> </property> </bean> |
...
Codec (Serialization)
Since they can contain any type of payload, Exchanges are not serializable by design. It is converted into a byte array to be stored in a database BLOB field. All those conversions are handled by the JdbcCodec
class. One detail of the code requires your attention: the ClassLoadingAwareObjectInputStream
.
...
Here is the declaration for Oracle:
Code Block | ||||
---|---|---|---|---|
| ||||
|
...
<bean id="lobHandler" class="org.springframework.jdbc.support.lob.OracleLobHandler"> <property name="nativeJdbcExtractor" ref="nativeJdbcExtractor"/> </bean> <bean id="nativeJdbcExtractor" class="org.springframework.jdbc.support.nativejdbc.CommonsDbcpNativeJdbcExtractor"/> <bean id="repo" class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository"> <property name="transactionManager" ref="transactionManager"/> <property name="repositoryName" value="aggregation"/> <property name="dataSource" ref="dataSource"/> <!-- Only with Oracle, else use default --> <property name="lobHandler" ref="lobHandler"/> </bean> |
Optimistic locking
From Camel 2.12 onwards you can turn on optimisticLocking
and use this JDBC based aggregation repository in a clustered environment where multiple Camel applications shared the same database for the aggregation repository. If there is a race condition there JDBC driver will throw a vendor specific exception which the JdbcAggregationRepository
can react upon. To know which caused exceptions from the JDBC driver is regarded as an optimistick locking error we need a mapper to do this. Therefore there is a org.apache.camel.processor.aggregate.jdbc.JdbcOptimisticLockingExceptionMapper
allows you to implement your custom logic if needed. There is a default implementation org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper
which works as follows:
...
Here is an example, where we define 2 extra FQN class names from the JDBC vendor.
Code Block | ||||
---|---|---|---|---|
| ||||
|
...
<bean id="repo" class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository"> <property name="transactionManager" ref="transactionManager"/> <property name="repositoryName" value="aggregation"/> <property name="dataSource" ref="dataSource"/> <property name"jdbcOptimisticLockingExceptionMapper" ref="myExceptionMapper"/> </bean> <!-- use the default mapper with extra FQN class names from our JDBC driver --> <bean id="myExceptionMapper" class="org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper"> <property name="classNames"> <util:set> <value>com.foo.sql.MyViolationExceptoion</value> <value>com.foo.sql.MyOtherViolationExceptoion</value> </util:set> </property> </bean> |
Include Page | ||||
---|---|---|---|---|
|
...