...
Code Block | ||
---|---|---|
| ||
from("direct:projects")
    .setBody(constant("ASF"))
    .setProperty("min", constant(123))
    .to("sql:select * from projects where license = :#${body} and id > :#${property.min} order by id") |
Using IN queries with dynamic values
Available as of Camel 2.17
From Camel 2.17 onwards the SQL producer supports SQL queries with IN clauses whose values are computed dynamically, for example from the message body or a header.
To use IN you need to:
- prefix the parameter name with in:
- add ( ) around the parameter
An example explains this better. The following query is used:
Code Block |
---|
-- this is a comment
select *
from projects
where project in (:#in:names)
order by id |
In the following route:
Code Block |
---|
from("direct:query")
.to("sql:classpath:sql/selectProjectsIn.sql")
.to("log:query")
.to("mock:query"); |
The IN query can then take its dynamic values from a header with the key names, for example:
Code Block |
---|
// use an array
template.requestBodyAndHeader("direct:query", "Hi there!", "names", new String[]{"Camel", "AMQ"});
// use a list
List<String> names = new ArrayList<String>();
names.add("Camel");
names.add("AMQ");
template.requestBodyAndHeader("direct:query", "Hi there!", "names", names);
// use a comma-separated string
template.requestBodyAndHeader("direct:query", "Hi there!", "names", "Camel,AMQ"); |
The query can also be specified in the endpoint instead of being externalized (note that externalizing makes the SQL queries easier to maintain):
Code Block |
---|
from("direct:query")
.to("sql:select * from projects where project in (:#in:names) order by id")
.to("log:query")
.to("mock:query"); |
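To make the accepted header shapes concrete, here is a minimal sketch of how an array, a list, or a comma-separated string could be normalized into IN values, with one ? placeholder generated per value. It is an illustration only and not Camel's actual implementation; the class InParameterSketch and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of Camel: it only illustrates the idea.
final class InParameterSketch {

    // Normalizes an array, a collection, or a comma-separated string to a list of values.
    static List<Object> toValues(Object header) {
        List<Object> values = new ArrayList<>();
        if (header instanceof Object[]) {
            values.addAll(Arrays.asList((Object[]) header));
        } else if (header instanceof Collection) {
            values.addAll((Collection<?>) header);
        } else if (header instanceof String) {
            for (String part : ((String) header).split(",")) {
                values.add(part.trim());
            }
        } else if (header != null) {
            values.add(header); // a single value becomes a one-element list
        }
        return values;
    }

    // Expands "(:#in:name)" into one "?" per value, for example "(?, ?)".
    static String expand(String sql, String name, int count) {
        String placeholders = String.join(", ", Collections.nCopies(count, "?"));
        return sql.replace("(:#in:" + name + ")", "(" + placeholders + ")");
    }

    public static void main(String[] args) {
        List<Object> values = toValues("Camel,AMQ");
        String sql = expand("select * from projects where project in (:#in:names) order by id",
                "names", values.size());
        System.out.println(sql + " with values " + values);
    }
}
```

The three calls to requestBodyAndHeader shown earlier would all lead to the same two values in this sketch.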
Using the JDBC based idempotent repository
Available as of Camel 2.7: In this section we will use the JDBC based idempotent repository.
Tip | ||
---|---|---|
| ||
From Camel 2.9 onwards there is an abstract class org.apache.camel.processor.idempotent.jdbc.AbstractJdbcMessageIdRepository that you can extend to build a custom JDBC idempotent repository.
First we have to create the database table which will be used by the idempotent repository. For Camel 2.7, we use the following schema:
Code Block | ||
---|---|---|
| ||
CREATE TABLE CAMEL_MESSAGEPROCESSED (
  processorName VARCHAR(255),
  messageId VARCHAR(100)
)
In Camel 2.8, we added the createdAt column:
Code Block | ||
---|---|---|
| ||
CREATE TABLE CAMEL_MESSAGEPROCESSED (
  processorName VARCHAR(255),
  messageId VARCHAR(100),
  createdAt TIMESTAMP
)
Warning |
---|
The SQL Server TIMESTAMP type is a fixed-length binary-string type. It does not map to any of the JDBC time types: DATE, TIME, or TIMESTAMP.
We recommend a unique constraint on the columns processorName and messageId. Because the syntax for this constraint differs from database to database, we do not show it here.
Second, we need to set up a javax.sql.DataSource in the Spring XML file:
Wiki Markup |
---|
Finally, we create our JDBC idempotent repository in the Spring XML file as well:
Wiki Markup |
---|
Customize the JdbcMessageIdRepository
Starting with Camel 2.9.1 you have a few options to tune the org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository for your needs:
Parameter | Default Value | Description |
---|---|---|
createTableIfNotExists | true | Defines whether Camel should try to create the table if it doesn't exist. |
tableExistsString | SELECT 1 FROM CAMEL_MESSAGEPROCESSED WHERE 1 = 0 | The query used to determine whether the table already exists. It must throw an exception to indicate that the table doesn't exist. |
createString | CREATE TABLE CAMEL_MESSAGEPROCESSED (processorName VARCHAR(255), messageId VARCHAR(100), createdAt TIMESTAMP) | The statement used to create the table. |
queryString | SELECT COUNT(*) FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND messageId = ? | The query used to determine whether the message already exists in the repository (the result is not equal to '0'). It takes two parameters: the processor name (String) and the message id (String). |
insertString | INSERT INTO CAMEL_MESSAGEPROCESSED (processorName, messageId, createdAt) VALUES (?, ?, ?) | The statement used to add the entry to the table. It takes three parameters: the processor name (String), the message id (String), and the timestamp (java.sql.Timestamp) when the entry was added to the repository. |
deleteString | DELETE FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND messageId = ? | The statement used to delete the entry from the database. It takes two parameters: the processor name (String) and the message id (String). |
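The contract behind the query, insert, and delete statements can be sketched with an in-memory stand-in. This is only an illustration under the assumption that each stored row is identified by the pair (processorName, messageId); the class IdempotentSketch is hypothetical and does not touch a database.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical in-memory stand-in, not the JdbcMessageIdRepository itself.
final class IdempotentSketch {

    private final String processorName;
    // Each entry plays the role of one row (processorName, messageId).
    private final Set<String> rows = new HashSet<>();

    IdempotentSketch(String processorName) {
        this.processorName = Objects.requireNonNull(processorName);
    }

    private String key(String messageId) {
        return processorName + ":" + messageId;
    }

    // Mirrors queryString: the message is known when the row count is not 0.
    boolean contains(String messageId) {
        return rows.contains(key(messageId));
    }

    // Mirrors insertString: returns true only for the first occurrence of the message id.
    boolean add(String messageId) {
        return rows.add(key(messageId));
    }

    // Mirrors deleteString: returns true when a row was removed.
    boolean remove(String messageId) {
        return rows.remove(key(messageId));
    }
}
```

A second add with the same message id returns false, which is the signal an idempotent consumer needs to skip a duplicate.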
A customized org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository could look like:
Wiki Markup |
---|
Using the JDBC based aggregation repository
Available as of Camel 2.6
Info | ||
---|---|---|
| ||
In Camel 2.6, the JdbcAggregationRepository is provided in the camel-jdbc-aggregator component. From Camel 2.7 onwards, the JdbcAggregationRepository is provided in the camel-sql component.
JdbcAggregationRepository is an AggregationRepository which persists the aggregated messages on the fly. This ensures that you will not lose messages, as the default aggregator uses an in-memory-only AggregationRepository.
Together with Camel, the JdbcAggregationRepository provides persistent support for the Aggregator.
It has the following options:
Option | Type | Description |
---|---|---|
dataSource | DataSource | Mandatory: The javax.sql.DataSource to use for accessing the database. |
repositoryName | String | Mandatory: The name of the repository. |
transactionManager | TransactionManager | Mandatory: The org.springframework.transaction.PlatformTransactionManager to manage transactions for the database. The TransactionManager must be able to support databases. |
lobHandler | LobHandler | An org.springframework.jdbc.support.lob.LobHandler to handle Lob types in the database. Use this option to use a vendor-specific LobHandler, for example when using Oracle. |
returnOldExchange | boolean | Whether the get operation should return the old existing Exchange if any existed. By default this option is false, as an optimization, because the old exchange is not needed when aggregating. |
useRecovery | boolean | Whether recovery is enabled. This option is true by default. When enabled, the Camel Aggregator automatically recovers failed aggregated exchanges and has them resubmitted. |
recoveryInterval | long | If recovery is enabled, a background task runs at this interval to scan for failed exchanges to recover and resubmit. By default this interval is 5000 millis. |
maximumRedeliveries | int | Allows you to limit the maximum number of redelivery attempts for a recovered exchange. If enabled, the Exchange will be moved to the dead letter channel if all redelivery attempts failed. By default this option is disabled. If this option is used, the deadLetterUri option must also be provided. |
deadLetterUri | String | An endpoint uri for a Dead Letter Channel where exhausted recovered Exchanges will be moved. If this option is used, the maximumRedeliveries option must also be provided. |
storeBodyAsText | boolean | Camel 2.11: Whether to store the message body as a String, which is human readable. By default this option is false, storing the body in binary format. |
headersToStoreAsText | List<String> | Camel 2.11: Allows you to store headers as String, which is human readable. By default this option is disabled, storing the headers in binary format. |
optimisticLocking | boolean | Camel 2.12: Turns on optimistic locking, which is often needed in clustered environments where multiple Camel applications share the same JDBC based aggregation repository. By default this option is false. |
jdbcOptimisticLockingExceptionMapper | JdbcOptimisticLockingExceptionMapper | Camel 2.12: Allows you to plug in a custom org.apache.camel.processor.aggregate.jdbc.JdbcOptimisticLockingExceptionMapper to map vendor-specific error codes to an optimistic locking error, for Camel to perform a retry. This requires optimisticLocking to be enabled. |
What is preserved when persisting
JdbcAggregationRepository will only preserve Serializable compatible data types. If a data type is not such a type, it is dropped and a WARN is logged. It only persists the Message body and the Message headers. The Exchange properties are not persisted.
From Camel 2.11 onwards you can store the message body and selected headers as String in separate columns.
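The Serializable rule can be pictured with a small filter over a header map. This is a sketch of the idea only, not the repository's own code; the class SerializableFilterSketch is a hypothetical name, and a plain System.err line stands in for the WARN log entry.

```java
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: keeps only header values that can be serialized.
final class SerializableFilterSketch {

    static Map<String, Object> keepSerializable(Map<String, Object> headers) {
        Map<String, Object> kept = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : headers.entrySet()) {
            if (entry.getValue() instanceof Serializable) {
                kept.put(entry.getKey(), entry.getValue());
            } else {
                // Stand-in for the WARN log; null values also end up here in this sketch.
                System.err.println("WARN: dropping non-serializable header " + entry.getKey());
            }
        }
        return kept;
    }
}
```

In this sketch a String header survives, while a value such as an open socket or a plain Object would be dropped before persisting.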
Recovery
The JdbcAggregationRepository will by default recover any failed Exchange. It does this with a background task that scans for failed Exchanges in the persistent store. You can use the recoveryInterval option to set how often this task runs. Recovery is transactional, which ensures that Camel will try to recover and redeliver the failed Exchange. Any Exchange found to need recovery is restored from the persistent store, resubmitted, and sent out again.
The following headers are set when an Exchange is being recovered/redelivered:
Header | Type | Description |
---|---|---|
Exchange.REDELIVERED | Boolean | Is set to true to indicate the Exchange is being redelivered. |
Exchange.REDELIVERY_COUNTER | Integer | The redelivery attempt, starting from 1. |
Only when an Exchange has been successfully processed will it be marked as complete, which happens when the confirm method is invoked on the AggregationRepository. This means that if the same Exchange fails again, it will keep being retried until it succeeds.
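How the redelivery counter and the maximumRedeliveries option could interact is sketched below. The exact boundary Camel uses is not stated here, so the rule in this sketch (dead letter once the attempt number exceeds the limit, and a limit of 0 or less meaning disabled) is an assumption, and RecoverySketch is a hypothetical name.

```java
// Hypothetical decision helper, not Camel's recovery task.
final class RecoverySketch {

    enum Action { RESUBMIT, DEAD_LETTER }

    // redeliveryCounter is the attempt number, starting from 1.
    // Assumption: maximumRedeliveries <= 0 means the limit is disabled.
    static Action next(int redeliveryCounter, int maximumRedeliveries) {
        boolean limited = maximumRedeliveries > 0;
        if (limited && redeliveryCounter > maximumRedeliveries) {
            return Action.DEAD_LETTER; // exhausted: move to the deadLetterUri endpoint
        }
        return Action.RESUBMIT; // keep retrying until the exchange is confirmed
    }
}
```

Without a limit the sketch always resubmits, which matches the behavior described above of retrying until the Exchange succeeds.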
...
You can see some examples in the unit tests of camel-sql, for example this test.
Database
To be operational, each aggregator uses two tables: the aggregation table and the completed table. By convention the completed table has the same name as the aggregation table suffixed with "_COMPLETED". The name must be configured in the Spring bean with the repositoryName property. In the following example aggregation will be used.
...
Here is the SQL query used to create the tables; just replace "aggregation" with your aggregator repository name.
Code Block | ||
---|---|---|
| ||
CREATE TABLE aggregation (
  id varchar(255) NOT NULL,
  exchange blob NOT NULL,
  constraint aggregation_pk PRIMARY KEY (id)
);
CREATE TABLE aggregation_completed (
  id varchar(255) NOT NULL,
  exchange blob NOT NULL,
  constraint aggregation_completed_pk PRIMARY KEY (id)
);
Storing body and headers as text
Available as of Camel 2.11
You can configure the JdbcAggregationRepository to store the message body and selected headers as String in separate columns. For example, to store the body and the two headers companyName and accountName, use the following SQL:
Code Block | ||
---|---|---|
| ||
CREATE TABLE aggregationRepo3 (
  id varchar(255) NOT NULL,
  exchange blob NOT NULL,
  body varchar(1000),
  companyName varchar(1000),
  accountName varchar(1000),
  constraint aggregationRepo3_pk PRIMARY KEY (id)
);
CREATE TABLE aggregationRepo3_completed (
  id varchar(255) NOT NULL,
  exchange blob NOT NULL,
  body varchar(1000),
  companyName varchar(1000),
  accountName varchar(1000),
  constraint aggregationRepo3_completed_pk PRIMARY KEY (id)
);
And then configure the repository to enable this behavior as shown below:
Code Block | ||||
---|---|---|---|---|
| ||||
<bean id="repo3" class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
  <property name="repositoryName" value="aggregationRepo3"/>
  <property name="transactionManager" ref="txManager3"/>
  <property name="dataSource" ref="dataSource3"/>
  <!-- configure to store the message body and following headers as text in the repo -->
  <property name="storeBodyAsText" value="true"/>
  <property name="headersToStoreAsText">
    <list>
      <value>companyName</value>
      <value>accountName</value>
    </list>
  </property>
</bean>
Codec (Serialization)
Since they can contain any type of payload, Exchanges are not serializable by design. An Exchange is converted into a byte array to be stored in a database BLOB field. All those conversions are handled by the JdbcCodec class. One detail of the code requires your attention: the ClassLoadingAwareObjectInputStream.
The ClassLoadingAwareObjectInputStream has been reused from the Apache ActiveMQ project. It wraps an ObjectInputStream and uses it with the ContextClassLoader rather than the currentThread one. The benefit is the ability to load classes exposed by other bundles. This allows the exchange body and headers to reference objects of custom types.
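The general technique of a class-loading-aware ObjectInputStream can be sketched with the JDK alone. This is not the JdbcCodec or the ActiveMQ class; CodecSketch is a hypothetical name, and the sketch simply resolves classes through a class loader passed in by the caller, falling back to the default resolution.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical codec: converts a Serializable payload to a byte array (as for a BLOB) and back.
final class CodecSketch {

    static byte[] marshal(Serializable payload) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static Object unmarshal(byte[] data, final ClassLoader loader) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data)) {
            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc)
                    throws IOException, ClassNotFoundException {
                try {
                    // Try the supplied class loader first.
                    return Class.forName(desc.getName(), false, loader);
                } catch (ClassNotFoundException e) {
                    // Fall back to the default lookup (also covers primitive types).
                    return super.resolveClass(desc);
                }
            }
        }) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

A caller would typically pass the class loader that can see the custom body and header types, which is the point of making the stream class-loading aware.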
Transaction
A Spring PlatformTransactionManager is required to orchestrate transactions.
Service (Start/Stop)
The start method verifies the connection to the database and the presence of the required tables. If anything is wrong, it will fail during startup.
Aggregator configuration
Depending on the targeted environment, the aggregator might need some configuration. As you already know, each aggregator should have its own repository (with the corresponding pair of tables created in the database) and a data source. If the default lobHandler is not suited to your database system, another one can be injected with the lobHandler property.
Here is the declaration for Oracle:
Code Block | ||||
---|---|---|---|---|
| ||||
<bean id="lobHandler" class="org.springframework.jdbc.support.lob.OracleLobHandler">
  <property name="nativeJdbcExtractor" ref="nativeJdbcExtractor"/>
</bean>
<bean id="nativeJdbcExtractor" class="org.springframework.jdbc.support.nativejdbc.CommonsDbcpNativeJdbcExtractor"/>
<bean id="repo" class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
  <property name="transactionManager" ref="transactionManager"/>
  <property name="repositoryName" value="aggregation"/>
  <property name="dataSource" ref="dataSource"/>
  <!-- Only with Oracle, else use default -->
  <property name="lobHandler" ref="lobHandler"/>
</bean>
Optimistic locking
From Camel 2.12 onwards you can turn on optimisticLocking and use this JDBC based aggregation repository in a clustered environment where multiple Camel applications share the same database for the aggregation repository. If there is a race condition, the JDBC driver will throw a vendor-specific exception which the JdbcAggregationRepository can react to. To know which caused exceptions from the JDBC driver are regarded as an optimistic locking error, we need a mapper. Therefore there is an org.apache.camel.processor.aggregate.jdbc.JdbcOptimisticLockingExceptionMapper which allows you to implement your custom logic if needed. There is a default implementation, org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper, which works as follows:
The following checks are done:
- If the caused exception is an SQLException, then the SQLState is checked to see whether it starts with 23.
- If the caused exception is a DataIntegrityViolationException.
- If the caused exception class name has "ConstraintViolation" in its name.
- Optional checking for FQN class name matches, if any class names have been configured.
You can in addition add FQN class names, and if any of the caused exceptions (or any nested) equals any of the FQN class names, then it is an optimistic locking error.
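The checks listed above can be sketched in plain Java as a walk over the exception cause chain. This is an approximation for illustration, not the DefaultJdbcOptimisticLockingExceptionMapper source; LockingMapperSketch is a hypothetical name, and the Spring DataIntegrityViolationException check is left out to keep the sketch free of dependencies.

```java
import java.sql.SQLException;
import java.util.Set;

// Hypothetical mapper sketch: decides whether an error counts as an optimistic locking error.
final class LockingMapperSketch {

    static boolean isOptimisticLocking(Throwable error, Set<String> extraClassNames) {
        int depth = 0;
        // Walk the exception and its nested causes; the depth limit guards against cause cycles.
        for (Throwable t = error; t != null && depth < 50; t = t.getCause(), depth++) {
            if (t instanceof SQLException) {
                String state = ((SQLException) t).getSQLState();
                if (state != null && state.startsWith("23")) {
                    return true; // SQLState class 23: integrity constraint violation
                }
            }
            String className = t.getClass().getName();
            if (className.contains("ConstraintViolation")) {
                return true;
            }
            if (extraClassNames.contains(className)) {
                return true; // matches one of the configured FQN class names
            }
        }
        return false;
    }
}
```

Any error for which the sketch returns true would be treated as a lost race between nodes and retried, while other errors would be handled as ordinary failures.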
Here is an example, where we define 2 extra FQN class names from the JDBC vendor.
Code Block | ||||
---|---|---|---|---|
| ||||
<bean id="repo" class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
  <property name="transactionManager" ref="transactionManager"/>
  <property name="repositoryName" value="aggregation"/>
  <property name="dataSource" ref="dataSource"/>
  <property name="jdbcOptimisticLockingExceptionMapper" ref="myExceptionMapper"/>
</bean>
<!-- use the default mapper with extra FQN class names from our JDBC driver -->
<bean id="myExceptionMapper" class="org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper">
  <property name="classNames">
    <util:set>
      <value>com.foo.sql.MyViolationExceptoion</value>
      <value>com.foo.sql.MyOtherViolationExceptoion</value>
    </util:set>
  </property>
</bean>