...
Apache Ignite In-Memory Data Fabric is a high-performance, integrated and distributed in-memory platform for computing and transacting on large-scale data sets in real time, orders of magnitude faster than possible with traditional disk-based or flash technologies. It is designed to deliver uncompromised performance for a wide set of in-memory computing use cases, from high-performance computing to the industry's most advanced data grid, highly available service grid, and streaming. See all features.
This component offers seven endpoints to cover much of Ignite's functionality:
- Ignite Cache.
- Ignite Compute.
- Ignite Messaging.
- Ignite Events.
- Ignite Sets.
- Ignite Queues.
- Ignite ID Generator.
To use this component, add the following dependency to your pom.xml:
Code Block | ||||
---|---|---|---|---|
| ||||
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-ignite</artifactId>
    <version>${camel.version}</version> <!-- use the same version as your Camel core version -->
</dependency>
|
...
Warning | ||
---|---|---|
| ||
If running in an OSGi container, please don't miss the OSGi Support section below. |
CACHE: Ignite Cache endpoint
This endpoint allows you to interact with an Ignite Cache:
Code Block |
---|
ignite:cache:cacheName?option1=value1&option2=value2... |
This endpoint offers both a producer (to invoke cache operations on an Ignite cache) and a consumer (to consume changes from a continuous query).
The cache value is always the body of the message, whereas the cache key is always stored in the IgniteConstants.IGNITE_CACHE_KEY message header.
Even if you configure a fixed operation in the endpoint URI, you can vary it per-exchange by setting the IgniteConstants.IGNITE_CACHE_OPERATION message header.
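The precedence between the URI-configured operation and the per-exchange header can be pictured with a small, Camel-free sketch. The `CacheOperationSketch` class, its `CacheOp` enum and the `resolve` helper are hypothetical stand-ins written for this illustration and are not part of camel-ignite. Only the header name `CamelIgniteCacheOperation` and the operation names are taken from this page.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for illustration only; not the camel-ignite implementation.
final class CacheOperationSketch {

    // Mirrors the operation names documented for the `operation` option.
    enum CacheOp { GET, PUT, REMOVE, SIZE, REBALANCE, QUERY, CLEAR }

    // Header name documented for IgniteConstants.IGNITE_CACHE_OPERATION.
    static final String OPERATION_HEADER = "CamelIgniteCacheOperation";

    // Returns the per-exchange operation if the header carries one,
    // otherwise falls back to the operation fixed in the endpoint URI.
    static CacheOp resolve(CacheOp uriOperation, Map<String, Object> headers) {
        Object override = headers.get(OPERATION_HEADER);
        return (override instanceof CacheOp) ? (CacheOp) override : uriOperation;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        System.out.println(resolve(CacheOp.PUT, headers));   // no header: URI operation applies
        headers.put(OPERATION_HEADER, CacheOp.REMOVE);
        System.out.println(resolve(CacheOp.PUT, headers));   // header present: it takes precedence
    }
}
```

In a real route the header would typically be set on the exchange before it reaches the `ignite:cache:` producer; the sketch only models the resulting choice.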
Options
Option | Type | Default value | Description | Compulsory | Consumer/producer |
---|---|---|---|---|---|
operation | IgniteCacheOperation enum | --- | Cache operation to perform. Possible values: GET, PUT, REMOVE, SIZE, REBALANCE, QUERY, CLEAR. | N | Producer |
failIfInexistentCache | boolean | false | Whether to fail the initialization if the cache doesn't exist. | N | Producer |
cachePeekMode | CachePeekMode (Ignite) enum | ALL | The cache peek mode used for SIZE operations. | N | Producer |
query | Query | --- | The query used for QUERY operations (as a producer) or to launch the continuous query (as a consumer). You can use a reference to a registry bean with a syntax like: ?query=#myQuery. | Y for the consumer / N for the producer | Producer (for QUERY operation) and Consumer |
remoteFilter | CacheEntryEventSerializableFilter | --- | An optional remote filter for the continuous query consumer. You can use a reference to a registry bean with a syntax like: ?remoteFilter=#myRemoteFilter. | N | Consumer |
oneExchangePerUpdate | boolean | true | Whether to send one exchange per cache update, even if multiple changes arrive in a group. | N | Consumer |
fireExistingQueryResults | boolean | false | When starting the continuous query consumer, whether to fire existing cache results. | N | Consumer |
autoUnsubscribe | boolean | ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE | Auto-unsubscribe flag on the Continuous Query (see Ignite docs). | N | Consumer |
pageSize | int | ContinuousQuery.DFLT_PAGE_SIZE | Page size on the Continuous Query (see Ignite docs). | N | Consumer |
timeInterval | long | ContinuousQuery.DFLT_TIME_INTERVAL | Time interval in millis on the Continuous Query (see Ignite docs). | N | Consumer |
Headers used
This endpoint uses the following headers:
Header name | Constant | Expected type | Description |
---|---|---|---|
| IgniteConstants.IGNITE_CACHE_KEY | String | The cache key for the entry value in the message body. |
CamelIgniteCacheQuery | IgniteConstants.IGNITE_CACHE_QUERY | Query | The query to run (producer) when invoking the QUERY operation. |
CamelIgniteCacheOperation | IgniteConstants.IGNITE_CACHE_OPERATION | IgniteCacheOperation enum | Allows you to dynamically change the cache operation to execute (producer). |
CamelIgniteCachePeekMode | IgniteConstants.IGNITE_CACHE_PEEK_MODE | CachePeekMode enum | Allows you to dynamically change the cache peek mode when running the SIZE operation. |
CamelIgniteCacheEventType | IgniteConstants.IGNITE_CACHE_EVENT_TYPE | int (EventType constants) | This header carries the received event type when using the continuous query consumer. |
CamelIgniteCacheName | IgniteConstants.IGNITE_CACHE_NAME | String | This header carries the cache name for which a continuous query event was received (consumer). It does not allow you to dynamically change the cache against which a producer operation is performed. Use EIPs for that (e.g. recipient list, dynamic router). |
CamelIgniteCacheOldValue | IgniteConstants.IGNITE_CACHE_OLD_VALUE | Object | This header carries the old cache value when passed in the incoming cache event (consumer). |
COMPUTE: Ignite Compute endpoint
This endpoint allows you to run compute operations on the cluster by passing in an IgniteCallable, an IgniteRunnable, an IgniteClosure, or collections of them, along with their parameters if necessary.
Code Block |
---|
ignite:compute:endpointId?executionType=...&option1=value1&option2=value2... |
This endpoint only supports producers.
The host part of the endpoint URI is a symbolic endpoint ID; it is not used for any purpose.
The endpoint tries to run the object passed in the body of the IN message as the compute job. It expects different payload types depending on the execution type.
Expected payload types
Each operation expects the indicated types:
Operation | Expected payloads |
---|---|
CALL | Collection of IgniteCallable, or a single IgniteCallable. |
BROADCAST | IgniteCallable, IgniteRunnable, IgniteClosure. |
APPLY | IgniteClosure. |
EXECUTE | ComputeTask, Class<? extends ComputeTask> or an object representing parameters if the taskName option is not null. |
RUN | A Collection of IgniteRunnables, or a single IgniteRunnable. |
AFFINITY_CALL | IgniteCallable. |
AFFINITY_RUN | IgniteRunnable. |
Options
Option | Type | Default value | Description | Compulsory | Consumer/producer |
---|---|---|---|---|---|
| | --- | The compute operation to perform. Possible values: CALL, BROADCAST, APPLY, EXECUTE, RUN, AFFINITY_CALL, AFFINITY_RUN. The component expects different payload types depending on the operation. See above. | Y | Producer |
| ClusterGroupExpression | --- | An expression that returns the Cluster Group for the IgniteCompute instance. | N | Producer |
| String | --- | The task name, only used when performing the EXECUTE operation if a ComputeTask is not passed in the payload. | N | Producer |
| String | --- | The name for the IgniteCompute instances produced by this endpoint. | N | Producer |
| Long | | The timeout for the compute job. | N | Producer |
Headers used
This endpoint uses the following headers:
Header name | Constant | Expected type | Description |
---|---|---|---|
| IgniteConstants.IGNITE_COMPUTE_EXECUTION_TYPE | IgniteComputeExecutionType enum | Allows you to dynamically change the compute operation to perform. |
| IgniteConstants.IGNITE_COMPUTE_PARAMS | Any object or Collection of objects. | Parameters for APPLY, BROADCAST and EXECUTE operations. |
| IgniteConstants.IGNITE_COMPUTE_REDUCER | IgniteReducer | Reducer for the APPLY and CALL operations. |
CamelIgniteComputeAffinityCacheName | IgniteConstants.IGNITE_COMPUTE_AFFINITY_CACHE_NAME | String | Affinity cache name for the AFFINITY_CALL and AFFINITY_RUN operations. |
| IgniteConstants.IGNITE_COMPUTE_AFFINITY_KEY | Object | Affinity key for the AFFINITY_CALL and AFFINITY_RUN operations. |
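To make the interplay between a collection of callables (the CALL payload) and a reducer more concrete, here is a single-JVM analogy that uses only the JDK. It does not touch Ignite or Camel: the `Reducer` interface is a hypothetical stand-in loosely modelled on the collect-then-reduce shape of a reducer, and a local thread pool plays the role of the cluster.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Local analogy only: no Ignite cluster or Camel route is involved.
final class CallWithReducerSketch {

    // Hypothetical stand-in: collect each job result, then reduce to one value.
    interface Reducer<E, R> {
        boolean collect(E element); // return false to stop collecting early
        R reduce();
    }

    // Runs every job, feeds each result to the reducer, and returns the reduced value.
    static <E, R> R call(List<Callable<E>> jobs, Reducer<E, R> reducer) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            for (Future<E> result : pool.invokeAll(jobs)) {
                if (!reducer.collect(result.get())) {
                    break;
                }
            }
            return reducer.reduce();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for jobs", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a job failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    // Example reducer that sums integer results.
    static final class SumReducer implements Reducer<Integer, Integer> {
        private int sum;

        @Override
        public boolean collect(Integer element) {
            sum += element;
            return true;
        }

        @Override
        public Integer reduce() {
            return sum;
        }
    }

    public static void main(String[] args) {
        List<Callable<Integer>> jobs = new ArrayList<>();
        jobs.add(() -> 1);
        jobs.add(() -> 2);
        jobs.add(() -> 3);
        System.out.println(call(jobs, new SumReducer()));
    }
}
```

With the real endpoint, the collection of callables would travel in the message body and the reducer in the IgniteConstants.IGNITE_COMPUTE_REDUCER header; the sketch only shows how the individual results are folded into one return value.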
MESSAGING: Ignite Messaging endpoint
The Ignite Messaging endpoint allows you to send and consume messages from an Ignite topic.
Code Block |
---|
ignite:messaging:topicName?option1=value1&option2=value2... |
This endpoint supports producers (to send messages) and consumers (to receive messages).
Options
Option | Type | Default value | Description | Compulsory | Consumer/producer |
---|---|---|---|---|---|
clusterGroupExpression | | --- | An expression that returns the Cluster Group for the IgniteMessaging instance. | N | Consumer and Producer |
sendMode | | UNORDERED | The send mode to use. Possible values: | N | Producer |
timeout | | | The timeout for the send operation when using ordered messages. | N | Producer |
Headers used
This endpoint uses the following headers:
Header name | Constant | Expected type | Description |
---|---|---|---|
| IgniteConstants.IGNITE_MESSAGING_TOPIC | String | Allows you to dynamically change the topic to send messages to (producer). |
| IgniteConstants.IGNITE_MESSAGING_UUID | UUID | This header is filled in with the UUID of the subscription when a message arrives (consumer). |
EVENTS: Ignite Events endpoint
The Ignite Events endpoint allows you to receive events from the Ignite cluster by creating a local event listener.
Code Block |
---|
ignite:events:endpointId?option1=value1&option2=value2... |
This endpoint only supports consumers.
The Exchanges created by this consumer put the received Event object into the body of the IN message.
Options
Option | Type | Default value | Description | Compulsory | Consumer/producer |
---|---|---|---|---|---|
| Set<Integer> or String | | The event IDs to subscribe to. You can set a | N | Consumer |
| ClusterGroupExpression | | An expression that returns the Cluster Group to be used by the IgniteEvents instance. | N | Consumer |
SETS: Ignite Sets endpoint
The Ignite Sets endpoint allows you to interact with Ignite Set data structures.
Code Block |
---|
ignite:sets:setName?option1=value1&option2=value2... |
This endpoint only supports producers.
Options
Option | Type | Default value | Description | Compulsory | Consumer/producer |
---|---|---|---|---|---|
| | --- | The operation to invoke on the Ignite Set. | N | Producer |
| CollectionConfiguration | empty CollectionConfiguration | The configuration for this data structure. You can also specify inner properties by using the For example: | N | Producer |
Headers used
This endpoint uses the following headers:
Header name | Constant | Expected type | Description |
---|---|---|---|
| IgniteConstants.IGNITE_SETS_OPERATION | IgniteSetOperation enum | Allows you to dynamically change the set operation. |
QUEUE: Ignite Queue endpoint
The Ignite Queue endpoint allows you to interact with Ignite Queue data structures.
Code Block |
---|
ignite:queue:queueName?option1=value1&option2=value2... |
This endpoint only supports producers.
Options
Option | Type | Default value | Description | Compulsory | Consumer/producer |
---|---|---|---|---|---|
| | --- | The operation to invoke on the Ignite Queue. Superseded by the Possible values: | N | Producer |
| CollectionConfiguration | empty CollectionConfiguration | The configuration for this data structure. You can also specify inner properties by using the For example: | N | Producer |
timeoutMillis | Long | --- | The timeout in milliseconds to use when invoking the OFFER or POLL operations. | N | Producer |
capacity | int | --- (unbounded) | The queue capacity if you'd like to use a bounded queue. | N | Producer |
Headers used
This endpoint uses the following headers:
Header name | Constant | Expected type | Description |
---|---|---|---|
| IgniteConstants.IGNITE_QUEUE_OPERATION | IgniteQueueOperation enum | Allows you to dynamically change the queue operation. |
CamelIgniteQueueMaxElements | IgniteConstants.IGNITE_QUEUE_MAX_ELEMENTS | Integer or int | When invoking the DRAIN operation, the maximum number of items to drain. |
CamelIgniteQueueTransferredCount | IgniteConstants.IGNITE_QUEUE_TRANSFERRED_COUNT | Integer or int | The number of items transferred as the result of the DRAIN operation. |
CamelIgniteQueueTimeoutMillis | IgniteConstants.IGNITE_QUEUE_TIMEOUT_MILLIS | Long or long | Dynamically sets the timeout in milliseconds to use when invoking the OFFER or POLL operations. |
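The timeout and drain semantics described above can be tried out locally with a plain JDK queue. This is a sketch under the assumption that the endpoint's OFFER, POLL and DRAIN operations behave like the corresponding `java.util.concurrent.BlockingQueue` methods (an Ignite queue implements that interface). The `QueueOperationsSketch` class and its helper methods are hypothetical names for this illustration, and a bounded `LinkedBlockingQueue` stands in for a bounded Ignite queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// JDK-only analogy: a bounded LinkedBlockingQueue stands in for a bounded Ignite queue.
final class QueueOperationsSketch {

    // OFFER with a timeout: returns false if no space frees up in time.
    static boolean offer(BlockingQueue<String> queue, String item, long timeoutMillis) {
        try {
            return queue.offer(item, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // POLL with a timeout: returns null if no element arrives in time.
    static String poll(BlockingQueue<String> queue, long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // DRAIN limited to maxElements: returns the number of items transferred.
    static int drain(BlockingQueue<String> queue, List<String> target, int maxElements) {
        return queue.drainTo(target, maxElements);
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(2); // capacity of 2
        System.out.println(offer(queue, "a", 10));
        System.out.println(offer(queue, "b", 10));
        System.out.println(offer(queue, "c", 10)); // queue is full, so this offer times out
        List<String> drained = new ArrayList<>();
        System.out.println(drain(queue, drained, 1) + " item(s) drained: " + drained);
    }
}
```

The value returned by `drain` corresponds to what the CamelIgniteQueueTransferredCount header reports, and `maxElements` to the CamelIgniteQueueMaxElements header.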
IDGEN: Ignite ID Generator endpoint
The Ignite ID Generator endpoint allows you to interact with Ignite Atomic Sequences and ID Generators.
Code Block |
---|
ignite:idgen:sequenceName?option1=value1&option2=value2... |
This endpoint only supports producers.
Options
Option | Type | Default value | Description | Compulsory | Consumer/producer |
---|---|---|---|---|---|
| | | The operation to invoke on the Ignite ID Generator. Superseded by the Possible values: | N | Producer |
| CollectionConfiguration | empty CollectionConfiguration | The configuration for this data structure. You can also specify inner properties by using the For example: | N | Producer |
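If atomic sequences are new to you, the following JDK-only sketch illustrates the kind of operations a sequence offers. It is an analogy, not the endpoint's API: `AtomicLong` stands in for a cluster-wide Ignite atomic sequence, and the `SequenceSketch` class and its `reserveBlock` method are hypothetical names for this illustration. The operation names the endpoint accepts are not listed in the table above, so none are assumed here.

```java
import java.util.concurrent.atomic.AtomicLong;

// JDK-only analogy: AtomicLong stands in for a cluster-wide atomic sequence.
final class SequenceSketch {

    private final AtomicLong sequence;

    SequenceSketch(long initialValue) {
        this.sequence = new AtomicLong(initialValue);
    }

    // Advances the sequence by one and returns the NEW value.
    long incrementAndGet() {
        return sequence.incrementAndGet();
    }

    // Advances the sequence by one and returns the PREVIOUS value.
    long getAndIncrement() {
        return sequence.getAndIncrement();
    }

    // Reserves `size` consecutive ids and returns the first id of the block.
    long reserveBlock(int size) {
        return sequence.getAndAdd(size);
    }

    public static void main(String[] args) {
        SequenceSketch ids = new SequenceSketch(0);
        System.out.println(ids.incrementAndGet());
        System.out.println(ids.getAndIncrement());
        System.out.println(ids.reserveBlock(10));
    }
}
```

The difference between the "new value" and "previous value" variants matters when the returned number is used directly as an identifier.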
Initializing the Ignite component
Each instance of the Ignite component is associated with an underlying org.apache.ignite.Ignite instance. You can interact with two Ignite clusters by initializing two instances of the Ignite component and binding them to different IgniteConfigurations. There are 3 ways to initialize the Ignite component:
...
IgniteComponent#fromIgnite(Ignite)
IgniteComponent#fromConfiguration(IgniteConfiguration)
IgniteComponent#fromInputStream(InputStream)
IgniteComponent#fromUrl(URL)
IgniteComponent#fromLocation(String)
Use these factory methods to create an IgniteComponent with your chosen configuration technique.
General options
All endpoints share the following options:
Option | Type | Default value | Description |
---|---|---|---|
propagateIncomingBodyIfNoReturnValue | boolean | true | If the underlying Ignite operation returns void (no return type), this flag determines whether the producer will copy the IN body into the OUT body. |
treatCollectionsAsCacheObjects | boolean | false | Some Ignite operations can deal with multiple elements at once, if passed a Collection. Enabling this option will treat Collections as a single object, invoking the operation variant for cardinality 1. |
Usage
Because the AMQP component inherits from the JMS component, its usage is almost identical to that of the JMS component:
Code Block | ||||
---|---|---|---|---|
| ||||
// Consuming from an AMQP queue
from("amqp:queue:incoming").
    to(...);
// Sending a message to an AMQP topic
from(...).
    to("amqp:topic:notify"); |
Configuring AMQP component
Starting with Camel 2.16.1, you can also use the AMQPComponent#amqp10Component(String connectionURI) factory method to return the AMQP 1.0 component with the pre-configured topic prefix:
Code Block | ||||
---|---|---|---|---|
| ||||
AMQPComponent amqp = AMQPComponent.amqp10Component("amqp://guest:guest@localhost:5672"); |
Keep in mind that starting with Camel 2.17 the AMQPComponent#amqp10Component(String connectionURI) factory method has been deprecated in favor of AMQPComponent#amqpComponent(String connectionURI):
Code Block | ||||
---|---|---|---|---|
| ||||
AMQPComponent amqp = AMQPComponent.amqpComponent("amqp://localhost:5672");
AMQPComponent authorizedAmqp = AMQPComponent.amqpComponent("amqp://localhost:5672", "user", "password"); |
Starting with Camel 2.17, in order to automatically configure the AMQP component, you can also add an instance of org.apache.camel.component.amqp.AMQPConnectionDetails to the registry. For example, in Spring Boot you only need to define a bean:
Code Block | ||||
---|---|---|---|---|
| ||||
@Bean
AMQPConnectionDetails amqpConnection() {
    return new AMQPConnectionDetails("amqp://localhost:5672");
}
@Bean
AMQPConnectionDetails securedAmqpConnection() {
    return new AMQPConnectionDetails("amqp://localhost:5672", "username", "password");
} |
You can also rely on Camel properties to read the AMQP connection details. The factory method AMQPConnectionDetails.discoverAMQP() attempts to read Camel properties in a Kubernetes-like convention, as shown in the snippet below:
Code Block | ||||
---|---|---|---|---|
| ||||
export AMQP_SERVICE_HOST="mybroker.com"
export AMQP_SERVICE_PORT="6666"
export AMQP_SERVICE_USERNAME="username"
export AMQP_SERVICE_PASSWORD="password"
...
@Bean
AMQPConnectionDetails amqpConnection() {
    return AMQPConnectionDetails.discoverAMQP();
}
|
Using topics
To use topics with camel-amqp, you need to configure the component to use topic:// as the topic prefix, as shown below:
Code Block |
---|
<bean id="amqp" class="org.apache.camel.component.amqp.AMQPComponent">
    <property name="connectionFactory">
        <bean class="org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl" factory-method="createFromURL">
            <constructor-arg index="0" type="java.lang.String" value="amqp://localhost:5672" />
            <property name="topicPrefix" value="topic://" /> <!-- only necessary when connecting to ActiveMQ over AMQP 1.0 -->
        </bean>
    </property>
</bean> |
OSGi Support
Info |
---|
Apache Ignite supports OSGi from version 1.5.0.final onwards. |
NOTES when installing on Apache Karaf:
- Installing the camel-ignite feature will require the Ignite feature repository to be present.
- You must export from the JRE (system bundle) some low-level, non-standard packages that Ignite requires.
Please refer to the OSGi section in the Ignite documentation for more information.
Keep in mind that both AMQPComponent#amqpComponent() methods and AMQPConnectionDetails pre-configure the component with the topic prefix, so you don't have to configure it explicitly.